Merge branch 'main' of github.com:QwenLM/qwen-code into fix/windows-shell-output-garbled

This commit is contained in:
xuewenjie
2025-12-08 11:35:47 +08:00
71 changed files with 3403 additions and 1019 deletions

View File

@@ -63,6 +63,7 @@ vi.mock('../tools/tool-registry', () => {
ToolRegistryMock.prototype.registerTool = vi.fn();
ToolRegistryMock.prototype.discoverAllTools = vi.fn();
ToolRegistryMock.prototype.getAllTools = vi.fn(() => []); // Mock methods if needed
ToolRegistryMock.prototype.getAllToolNames = vi.fn(() => []);
ToolRegistryMock.prototype.getTool = vi.fn();
ToolRegistryMock.prototype.getFunctionDeclarations = vi.fn(() => []);
return { ToolRegistry: ToolRegistryMock };

View File

@@ -46,6 +46,7 @@ import { ExitPlanModeTool } from '../tools/exitPlanMode.js';
import { GlobTool } from '../tools/glob.js';
import { GrepTool } from '../tools/grep.js';
import { LSTool } from '../tools/ls.js';
import type { SendSdkMcpMessage } from '../tools/mcp-client.js';
import { MemoryTool, setGeminiMdFilename } from '../tools/memoryTool.js';
import { ReadFileTool } from '../tools/read-file.js';
import { ReadManyFilesTool } from '../tools/read-many-files.js';
@@ -239,9 +240,18 @@ export class MCPServerConfig {
readonly targetAudience?: string,
/* targetServiceAccount format: <service-account-name>@<project-num>.iam.gserviceaccount.com */
readonly targetServiceAccount?: string,
// SDK MCP server type - 'sdk' indicates server runs in SDK process
readonly type?: 'sdk',
) {}
}
/**
* Check if an MCP server config represents an SDK server
*/
export function isSdkMcpServerConfig(config: MCPServerConfig): boolean {
return config.type === 'sdk';
}
export enum AuthProviderType {
DYNAMIC_DISCOVERY = 'dynamic_discovery',
GOOGLE_CREDENTIALS = 'google_credentials',
@@ -360,6 +370,17 @@ function normalizeConfigOutputFormat(
}
}
/**
* Options for Config.initialize()
*/
export interface ConfigInitializeOptions {
/**
* Callback for sending MCP messages to SDK servers via control plane.
* Required for SDK MCP server support in SDK mode.
*/
sendSdkMcpMessage?: SendSdkMcpMessage;
}
export class Config {
private sessionId: string;
private sessionData?: ResumedSessionData;
@@ -599,8 +620,9 @@ export class Config {
/**
* Must only be called once, throws if called again.
* @param options Optional initialization options including sendSdkMcpMessage callback
*/
async initialize(): Promise<void> {
async initialize(options?: ConfigInitializeOptions): Promise<void> {
if (this.initialized) {
throw Error('Config was already initialized');
}
@@ -619,7 +641,9 @@ export class Config {
this.subagentManager.loadSessionSubagents(this.sessionSubagents);
}
this.toolRegistry = await this.createToolRegistry();
this.toolRegistry = await this.createToolRegistry(
options?.sendSdkMcpMessage,
);
await this.geminiClient.initialize();
@@ -1261,8 +1285,14 @@ export class Config {
return this.subagentManager;
}
async createToolRegistry(): Promise<ToolRegistry> {
const registry = new ToolRegistry(this, this.eventEmitter);
async createToolRegistry(
sendSdkMcpMessage?: SendSdkMcpMessage,
): Promise<ToolRegistry> {
const registry = new ToolRegistry(
this,
this.eventEmitter,
sendSdkMcpMessage,
);
const coreToolsConfig = this.getCoreTools();
const excludeToolsConfig = this.getExcludeTools();
@@ -1347,6 +1377,7 @@ export class Config {
}
await registry.discoverAllTools();
console.debug('ToolRegistry created', registry.getAllToolNames());
return registry;
}
}

View File

@@ -448,6 +448,7 @@ describe('Gemini Client (client.ts)', () => {
getHistory: mockGetHistory,
addHistory: vi.fn(),
setHistory: vi.fn(),
stripThoughtsFromHistory: vi.fn(),
} as unknown as GeminiChat;
});
@@ -462,6 +463,7 @@ describe('Gemini Client (client.ts)', () => {
const mockOriginalChat: Partial<GeminiChat> = {
getHistory: vi.fn((_curated?: boolean) => chatHistory),
setHistory: vi.fn(),
stripThoughtsFromHistory: vi.fn(),
};
client['chat'] = mockOriginalChat as GeminiChat;
@@ -1080,6 +1082,7 @@ describe('Gemini Client (client.ts)', () => {
const mockChat = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
stripThoughtsFromHistory: vi.fn(),
} as unknown as GeminiChat;
client['chat'] = mockChat;
@@ -1142,6 +1145,7 @@ ${JSON.stringify(
const mockChat: Partial<GeminiChat> = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
stripThoughtsFromHistory: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;
@@ -1197,6 +1201,7 @@ ${JSON.stringify(
const mockChat: Partial<GeminiChat> = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
stripThoughtsFromHistory: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;
@@ -1273,6 +1278,7 @@ ${JSON.stringify(
const mockChat: Partial<GeminiChat> = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
stripThoughtsFromHistory: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;
@@ -1319,6 +1325,7 @@ ${JSON.stringify(
const mockChat: Partial<GeminiChat> = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
stripThoughtsFromHistory: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;
@@ -1363,6 +1370,7 @@ ${JSON.stringify(
const mockChat: Partial<GeminiChat> = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
stripThoughtsFromHistory: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;
@@ -1450,6 +1458,7 @@ ${JSON.stringify(
const mockChat: Partial<GeminiChat> = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
stripThoughtsFromHistory: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;
@@ -1506,6 +1515,7 @@ ${JSON.stringify(
const mockChat: Partial<GeminiChat> = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
stripThoughtsFromHistory: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;
@@ -1586,6 +1596,7 @@ ${JSON.stringify(
.mockReturnValue([
{ role: 'user', parts: [{ text: 'previous message' }] },
]),
stripThoughtsFromHistory: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;
});
@@ -1840,6 +1851,7 @@ ${JSON.stringify(
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]), // Default empty history
setHistory: vi.fn(),
stripThoughtsFromHistory: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;
@@ -2180,6 +2192,7 @@ ${JSON.stringify(
const mockChat: Partial<GeminiChat> = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
stripThoughtsFromHistory: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;
@@ -2216,6 +2229,7 @@ ${JSON.stringify(
const mockChat: Partial<GeminiChat> = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
stripThoughtsFromHistory: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;
@@ -2256,6 +2270,7 @@ ${JSON.stringify(
const mockChat: Partial<GeminiChat> = {
addHistory: vi.fn(),
getHistory: vi.fn().mockReturnValue([]),
stripThoughtsFromHistory: vi.fn(),
};
client['chat'] = mockChat as GeminiChat;

View File

@@ -419,6 +419,9 @@ export class GeminiClient {
// record user message for session management
this.config.getChatRecordingService()?.recordUserMessage(request);
// strip thoughts from history before sending the message
this.stripThoughtsFromHistory();
}
this.sessionTurnCount++;
if (

View File

@@ -1541,10 +1541,10 @@ describe('GeminiChat', () => {
{
role: 'model',
parts: [
{ text: 'thinking...', thoughtSignature: 'thought-123' },
{ text: 'thinking...', thought: true },
{ text: 'hi' },
{
functionCall: { name: 'test', args: {} },
thoughtSignature: 'thought-456',
},
],
},
@@ -1559,10 +1559,7 @@ describe('GeminiChat', () => {
},
{
role: 'model',
parts: [
{ text: 'thinking...' },
{ functionCall: { name: 'test', args: {} } },
],
parts: [{ text: 'hi' }, { functionCall: { name: 'test', args: {} } }],
},
]);
});

View File

@@ -443,20 +443,28 @@ export class GeminiChat {
}
stripThoughtsFromHistory(): void {
this.history = this.history.map((content) => {
const newContent = { ...content };
if (newContent.parts) {
newContent.parts = newContent.parts.map((part) => {
if (part && typeof part === 'object' && 'thoughtSignature' in part) {
const newPart = { ...part };
delete (newPart as { thoughtSignature?: string }).thoughtSignature;
return newPart;
}
return part;
});
}
return newContent;
});
this.history = this.history
.map((content) => {
if (!content.parts) return content;
// Filter out thought parts entirely
const filteredParts = content.parts.filter(
(part) =>
!(
part &&
typeof part === 'object' &&
'thought' in part &&
part.thought
),
);
return {
...content,
parts: filteredParts,
};
})
// Remove Content objects that have no parts left after filtering
.filter((content) => content.parts && content.parts.length > 0);
}
setTools(tools: Tool[]): void {
@@ -497,8 +505,6 @@ export class GeminiChat {
): AsyncGenerator<GenerateContentResponse> {
// Collect ALL parts from the model response (including thoughts for recording)
const allModelParts: Part[] = [];
// Non-thought parts for history (what we send back to the API)
const historyParts: Part[] = [];
let usageMetadata: GenerateContentResponseUsageMetadata | undefined;
let hasToolCall = false;
@@ -516,8 +522,6 @@ export class GeminiChat {
// Collect all parts for recording
allModelParts.push(...content.parts);
// Collect non-thought parts for history
historyParts.push(...content.parts.filter((part) => !part.thought));
}
}
@@ -534,9 +538,15 @@ export class GeminiChat {
yield chunk; // Yield every chunk to the UI immediately.
}
// Consolidate text parts for history (merges adjacent text parts).
const thoughtParts = allModelParts.filter((part) => part.thought);
const thoughtText = thoughtParts
.map((part) => part.text)
.join('')
.trim();
const contentParts = allModelParts.filter((part) => !part.thought);
const consolidatedHistoryParts: Part[] = [];
for (const part of historyParts) {
for (const part of contentParts) {
const lastPart =
consolidatedHistoryParts[consolidatedHistoryParts.length - 1];
if (
@@ -550,20 +560,21 @@ export class GeminiChat {
}
}
const responseText = consolidatedHistoryParts
const contentText = consolidatedHistoryParts
.filter((part) => part.text)
.map((part) => part.text)
.join('')
.trim();
// Record assistant turn with raw Content and metadata
if (responseText || hasToolCall || usageMetadata) {
if (thoughtText || contentText || hasToolCall || usageMetadata) {
this.chatRecordingService?.recordAssistantTurn({
model,
message: [
...(responseText ? [{ text: responseText }] : []),
...(thoughtText ? [{ text: thoughtText, thought: true }] : []),
...(contentText ? [{ text: contentText }] : []),
...(hasToolCall
? historyParts
? contentParts
.filter((part) => part.functionCall)
.map((part) => ({ functionCall: part.functionCall }))
: []),
@@ -579,7 +590,7 @@ export class GeminiChat {
// We throw an error only when there's no tool call AND:
// - No finish reason, OR
// - Empty response text (e.g., only thoughts with no actual content)
if (!hasToolCall && (!hasFinishReason || !responseText)) {
if (!hasToolCall && (!hasFinishReason || !contentText)) {
if (!hasFinishReason) {
throw new InvalidStreamError(
'Model stream ended without a finish reason.',
@@ -593,8 +604,13 @@ export class GeminiChat {
}
}
// Add to history (without thoughts, for API calls)
this.history.push({ role: 'model', parts: consolidatedHistoryParts });
this.history.push({
role: 'model',
parts: [
...(thoughtText ? [{ text: thoughtText, thought: true }] : []),
...consolidatedHistoryParts,
],
});
}
}

View File

@@ -8,6 +8,7 @@ import { describe, it, expect, beforeEach } from 'vitest';
import { OpenAIContentConverter } from './converter.js';
import type { StreamingToolCallParser } from './streamingToolCallParser.js';
import type { GenerateContentParameters, Content } from '@google/genai';
import type OpenAI from 'openai';
describe('OpenAIContentConverter', () => {
let converter: OpenAIContentConverter;
@@ -142,4 +143,63 @@ describe('OpenAIContentConverter', () => {
expect(toolMessage?.content).toBe('{"data":{"value":42}}');
});
});
describe('OpenAI -> Gemini reasoning content', () => {
it('should convert reasoning_content to a thought part for non-streaming responses', () => {
const response = converter.convertOpenAIResponseToGemini({
object: 'chat.completion',
id: 'chatcmpl-1',
created: 123,
model: 'gpt-test',
choices: [
{
index: 0,
message: {
role: 'assistant',
content: 'final answer',
reasoning_content: 'chain-of-thought',
},
finish_reason: 'stop',
logprobs: null,
},
],
} as unknown as OpenAI.Chat.ChatCompletion);
const parts = response.candidates?.[0]?.content?.parts;
expect(parts?.[0]).toEqual(
expect.objectContaining({ thought: true, text: 'chain-of-thought' }),
);
expect(parts?.[1]).toEqual(
expect.objectContaining({ text: 'final answer' }),
);
});
it('should convert streaming reasoning_content delta to a thought part', () => {
const chunk = converter.convertOpenAIChunkToGemini({
object: 'chat.completion.chunk',
id: 'chunk-1',
created: 456,
choices: [
{
index: 0,
delta: {
content: 'visible text',
reasoning_content: 'thinking...',
},
finish_reason: 'stop',
logprobs: null,
},
],
model: 'gpt-test',
} as unknown as OpenAI.Chat.ChatCompletionChunk);
const parts = chunk.candidates?.[0]?.content?.parts;
expect(parts?.[0]).toEqual(
expect.objectContaining({ thought: true, text: 'thinking...' }),
);
expect(parts?.[1]).toEqual(
expect.objectContaining({ text: 'visible text' }),
);
});
});
});

View File

@@ -31,6 +31,25 @@ interface ExtendedCompletionUsage extends OpenAI.CompletionUsage {
cached_tokens?: number;
}
interface ExtendedChatCompletionAssistantMessageParam
extends OpenAI.Chat.ChatCompletionAssistantMessageParam {
reasoning_content?: string | null;
}
type ExtendedChatCompletionMessageParam =
| OpenAI.Chat.ChatCompletionMessageParam
| ExtendedChatCompletionAssistantMessageParam;
export interface ExtendedCompletionMessage
extends OpenAI.Chat.ChatCompletionMessage {
reasoning_content?: string | null;
}
export interface ExtendedCompletionChunkDelta
extends OpenAI.Chat.ChatCompletionChunk.Choice.Delta {
reasoning_content?: string | null;
}
/**
* Tool call accumulator for streaming responses
*/
@@ -44,7 +63,8 @@ export interface ToolCallAccumulator {
* Parsed parts from Gemini content, categorized by type
*/
interface ParsedParts {
textParts: string[];
thoughtParts: string[];
contentParts: string[];
functionCalls: FunctionCall[];
functionResponses: FunctionResponse[];
mediaParts: Array<{
@@ -251,7 +271,7 @@ export class OpenAIContentConverter {
*/
private processContents(
contents: ContentListUnion,
messages: OpenAI.Chat.ChatCompletionMessageParam[],
messages: ExtendedChatCompletionMessageParam[],
): void {
if (Array.isArray(contents)) {
for (const content of contents) {
@@ -267,7 +287,7 @@ export class OpenAIContentConverter {
*/
private processContent(
content: ContentUnion | PartUnion,
messages: OpenAI.Chat.ChatCompletionMessageParam[],
messages: ExtendedChatCompletionMessageParam[],
): void {
if (typeof content === 'string') {
messages.push({ role: 'user' as const, content });
@@ -301,11 +321,19 @@ export class OpenAIContentConverter {
},
}));
messages.push({
const assistantMessage: ExtendedChatCompletionAssistantMessageParam = {
role: 'assistant' as const,
content: parsedParts.textParts.join('') || null,
content: parsedParts.contentParts.join('') || null,
tool_calls: toolCalls,
});
};
// Only include reasoning_content if it has actual content
const reasoningContent = parsedParts.thoughtParts.join('');
if (reasoningContent) {
assistantMessage.reasoning_content = reasoningContent;
}
messages.push(assistantMessage);
return;
}
@@ -322,7 +350,8 @@ export class OpenAIContentConverter {
* Parse Gemini parts into categorized components
*/
private parseParts(parts: Part[]): ParsedParts {
const textParts: string[] = [];
const thoughtParts: string[] = [];
const contentParts: string[] = [];
const functionCalls: FunctionCall[] = [];
const functionResponses: FunctionResponse[] = [];
const mediaParts: Array<{
@@ -334,9 +363,20 @@ export class OpenAIContentConverter {
for (const part of parts) {
if (typeof part === 'string') {
textParts.push(part);
} else if ('text' in part && part.text) {
textParts.push(part.text);
contentParts.push(part);
} else if (
'text' in part &&
part.text &&
!('thought' in part && part.thought)
) {
contentParts.push(part.text);
} else if (
'text' in part &&
part.text &&
'thought' in part &&
part.thought
) {
thoughtParts.push(part.text);
} else if ('functionCall' in part && part.functionCall) {
functionCalls.push(part.functionCall);
} else if ('functionResponse' in part && part.functionResponse) {
@@ -361,7 +401,13 @@ export class OpenAIContentConverter {
}
}
return { textParts, functionCalls, functionResponses, mediaParts };
return {
thoughtParts,
contentParts,
functionCalls,
functionResponses,
mediaParts,
};
}
private extractFunctionResponseContent(response: unknown): string {
@@ -408,14 +454,29 @@ export class OpenAIContentConverter {
*/
private createMultimodalMessage(
role: 'user' | 'assistant',
parsedParts: Pick<ParsedParts, 'textParts' | 'mediaParts'>,
): OpenAI.Chat.ChatCompletionMessageParam | null {
const { textParts, mediaParts } = parsedParts;
const content = textParts.map((text) => ({ type: 'text' as const, text }));
parsedParts: Pick<
ParsedParts,
'contentParts' | 'mediaParts' | 'thoughtParts'
>,
): ExtendedChatCompletionMessageParam | null {
const { contentParts, mediaParts, thoughtParts } = parsedParts;
const reasoningContent = thoughtParts.join('');
const content = contentParts.map((text) => ({
type: 'text' as const,
text,
}));
// If no media parts, return simple text message
if (mediaParts.length === 0) {
return content.length > 0 ? { role, content } : null;
if (content.length === 0) return null;
const message: ExtendedChatCompletionMessageParam = { role, content };
// Only include reasoning_content if it has actual content
if (reasoningContent) {
(
message as ExtendedChatCompletionAssistantMessageParam
).reasoning_content = reasoningContent;
}
return message;
}
// For assistant messages with media, convert to text only
@@ -536,6 +597,13 @@ export class OpenAIContentConverter {
const parts: Part[] = [];
// Handle reasoning content (thoughts)
const reasoningText = (choice.message as ExtendedCompletionMessage)
.reasoning_content;
if (reasoningText) {
parts.push({ text: reasoningText, thought: true });
}
// Handle text content
if (choice.message.content) {
parts.push({ text: choice.message.content });
@@ -632,6 +700,12 @@ export class OpenAIContentConverter {
if (choice) {
const parts: Part[] = [];
const reasoningText = (choice.delta as ExtendedCompletionChunkDelta)
.reasoning_content;
if (reasoningText) {
parts.push({ text: reasoningText, thought: true });
}
// Handle text content
if (choice.delta?.content) {
if (typeof choice.delta.content === 'string') {
@@ -721,6 +795,8 @@ export class OpenAIContentConverter {
const promptTokens = usage.prompt_tokens || 0;
const completionTokens = usage.completion_tokens || 0;
const totalTokens = usage.total_tokens || 0;
const thinkingTokens =
usage.completion_tokens_details?.reasoning_tokens || 0;
// Support both formats: prompt_tokens_details.cached_tokens (OpenAI standard)
// and cached_tokens (some models return it at top level)
const extendedUsage = usage as ExtendedCompletionUsage;
@@ -743,6 +819,7 @@ export class OpenAIContentConverter {
response.usageMetadata = {
promptTokenCount: finalPromptTokens,
candidatesTokenCount: finalCompletionTokens,
thoughtsTokenCount: thinkingTokens,
totalTokenCount: totalTokens,
cachedContentTokenCount: cachedTokens,
};

View File

@@ -561,11 +561,14 @@ describe('DefaultTelemetryService', () => {
choices: [
{
index: 0,
delta: { content: 'Hello' },
delta: {
content: 'Hello',
reasoning_content: 'thinking ',
},
finish_reason: null,
},
],
} as OpenAI.Chat.ChatCompletionChunk,
} as unknown as OpenAI.Chat.ChatCompletionChunk,
{
id: 'test-id',
object: 'chat.completion.chunk',
@@ -574,7 +577,10 @@ describe('DefaultTelemetryService', () => {
choices: [
{
index: 0,
delta: { content: ' world' },
delta: {
content: ' world',
reasoning_content: 'more',
},
finish_reason: 'stop',
},
],
@@ -583,7 +589,7 @@ describe('DefaultTelemetryService', () => {
completion_tokens: 5,
total_tokens: 15,
},
} as OpenAI.Chat.ChatCompletionChunk,
} as unknown as OpenAI.Chat.ChatCompletionChunk,
];
await telemetryService.logStreamingSuccess(
@@ -603,11 +609,11 @@ describe('DefaultTelemetryService', () => {
choices: [
{
index: 0,
message: {
message: expect.objectContaining({
role: 'assistant',
content: 'Hello world',
refusal: null,
},
reasoning_content: 'thinking more',
}),
finish_reason: 'stop',
logprobs: null,
},
@@ -722,11 +728,14 @@ describe('DefaultTelemetryService', () => {
choices: [
{
index: 0,
delta: { content: 'Hello' },
delta: {
content: 'Hello',
reasoning_content: 'thinking ',
},
finish_reason: null,
},
],
} as OpenAI.Chat.ChatCompletionChunk,
} as unknown as OpenAI.Chat.ChatCompletionChunk,
{
id: 'test-id',
object: 'chat.completion.chunk',
@@ -735,7 +744,10 @@ describe('DefaultTelemetryService', () => {
choices: [
{
index: 0,
delta: { content: ' world!' },
delta: {
content: ' world!',
reasoning_content: 'more',
},
finish_reason: 'stop',
},
],
@@ -744,7 +756,7 @@ describe('DefaultTelemetryService', () => {
completion_tokens: 5,
total_tokens: 15,
},
} as OpenAI.Chat.ChatCompletionChunk,
} as unknown as OpenAI.Chat.ChatCompletionChunk,
];
await telemetryService.logStreamingSuccess(
@@ -757,27 +769,14 @@ describe('DefaultTelemetryService', () => {
expect(openaiLogger.logInteraction).toHaveBeenCalledWith(
mockOpenAIRequest,
expect.objectContaining({
id: 'test-id',
object: 'chat.completion',
created: 1234567890,
model: 'gpt-4',
choices: [
{
index: 0,
message: {
role: 'assistant',
expect.objectContaining({
message: expect.objectContaining({
content: 'Hello world!',
refusal: null,
},
finish_reason: 'stop',
logprobs: null,
},
reasoning_content: 'thinking more',
}),
}),
],
usage: {
prompt_tokens: 10,
completion_tokens: 5,
total_tokens: 15,
},
}),
);
});

View File

@@ -10,6 +10,7 @@ import { ApiErrorEvent, ApiResponseEvent } from '../../telemetry/types.js';
import { OpenAILogger } from '../../utils/openaiLogger.js';
import type { GenerateContentResponse } from '@google/genai';
import type OpenAI from 'openai';
import type { ExtendedCompletionChunkDelta } from './converter.js';
export interface RequestContext {
userPromptId: string;
@@ -172,6 +173,7 @@ export class DefaultTelemetryService implements TelemetryService {
| 'content_filter'
| 'function_call'
| null = null;
let combinedReasoning = '';
let usage:
| {
prompt_tokens: number;
@@ -183,6 +185,12 @@ export class DefaultTelemetryService implements TelemetryService {
for (const chunk of chunks) {
const choice = chunk.choices?.[0];
if (choice) {
// Combine reasoning content
const reasoningContent = (choice.delta as ExtendedCompletionChunkDelta)
?.reasoning_content;
if (reasoningContent) {
combinedReasoning += reasoningContent;
}
// Combine text content
if (choice.delta?.content) {
combinedContent += choice.delta.content;
@@ -230,6 +238,11 @@ export class DefaultTelemetryService implements TelemetryService {
content: combinedContent || null,
refusal: null,
};
if (combinedReasoning) {
// Attach reasoning content if any thought tokens were streamed
(message as { reasoning_content?: string }).reasoning_content =
combinedReasoning;
}
// Add tool calls if any
if (toolCalls.length > 0) {

View File

@@ -120,6 +120,97 @@ describe('Turn', () => {
expect(turn.getDebugResponses().length).toBe(2);
});
it('should emit Thought events when a thought part is present', async () => {
const mockResponseStream = (async function* () {
yield {
type: StreamEventType.CHUNK,
value: {
candidates: [
{
content: {
role: 'model',
parts: [
{ thought: true, text: 'reasoning...' },
{ text: 'final answer' },
],
},
},
],
} as GenerateContentResponse,
};
})();
mockSendMessageStream.mockResolvedValue(mockResponseStream);
const events = [];
const reqParts: Part[] = [{ text: 'Hi' }];
for await (const event of turn.run(
'test-model',
reqParts,
new AbortController().signal,
)) {
events.push(event);
}
expect(events).toEqual([
{
type: GeminiEventType.Thought,
value: { subject: '', description: 'reasoning...' },
},
]);
});
it('should emit thought descriptions per incoming chunk', async () => {
const mockResponseStream = (async function* () {
yield {
type: StreamEventType.CHUNK,
value: {
candidates: [
{
content: {
role: 'model',
parts: [{ thought: true, text: 'part1' }],
},
},
],
} as GenerateContentResponse,
};
yield {
type: StreamEventType.CHUNK,
value: {
candidates: [
{
content: {
role: 'model',
parts: [{ thought: true, text: 'part2' }],
},
},
],
} as GenerateContentResponse,
};
})();
mockSendMessageStream.mockResolvedValue(mockResponseStream);
const events = [];
for await (const event of turn.run(
'test-model',
[{ text: 'Hi' }],
new AbortController().signal,
)) {
events.push(event);
}
expect(events).toEqual([
{
type: GeminiEventType.Thought,
value: { subject: '', description: 'part1' },
},
{
type: GeminiEventType.Thought,
value: { subject: '', description: 'part2' },
},
]);
});
it('should yield tool_call_request events for function calls', async () => {
const mockResponseStream = (async function* () {
yield {

View File

@@ -27,7 +27,7 @@ import {
toFriendlyError,
} from '../utils/errors.js';
import type { GeminiChat } from './geminiChat.js';
import { parseThought, type ThoughtSummary } from '../utils/thoughtUtils.js';
import { getThoughtText, type ThoughtSummary } from '../utils/thoughtUtils.js';
// Define a structure for tools passed to the server
export interface ServerTool {
@@ -266,12 +266,11 @@ export class Turn {
this.currentResponseId = resp.responseId;
}
const thoughtPart = resp.candidates?.[0]?.content?.parts?.[0];
if (thoughtPart?.thought) {
const thought = parseThought(thoughtPart.text ?? '');
const thoughtPart = getThoughtText(resp);
if (thoughtPart) {
yield {
type: GeminiEventType.Thought,
value: thought,
value: { subject: '', description: thoughtPart },
};
continue;
}

View File

@@ -102,7 +102,9 @@ export * from './tools/shell.js';
export * from './tools/web-search/index.js';
export * from './tools/read-many-files.js';
export * from './tools/mcp-client.js';
export * from './tools/mcp-client-manager.js';
export * from './tools/mcp-tool.js';
export * from './tools/sdk-control-client-transport.js';
export * from './tools/task.js';
export * from './tools/todoWrite.js';
export * from './tools/exitPlanMode.js';

View File

@@ -542,6 +542,39 @@ export class SessionService {
}
}
/**
* Options for building API history from conversation.
*/
export interface BuildApiHistoryOptions {
/**
* Whether to strip thought parts from the history.
* Thought parts are content parts that have `thought: true`.
* @default true
*/
stripThoughtsFromHistory?: boolean;
}
/**
* Strips thought parts from a Content object.
* Thought parts are identified by having `thought: true`.
* Returns null if the content only contained thought parts.
*/
function stripThoughtsFromContent(content: Content): Content | null {
if (!content.parts) return content;
const filteredParts = content.parts.filter((part) => !(part as Part).thought);
// If all parts were thoughts, remove the entire content
if (filteredParts.length === 0) {
return null;
}
return {
...content,
parts: filteredParts,
};
}
/**
* Builds the model-facing chat history (Content[]) from a reconstructed
* conversation. This keeps UI history intact while applying chat compression
@@ -555,7 +588,9 @@ export class SessionService {
*/
export function buildApiHistoryFromConversation(
conversation: ConversationRecord,
options: BuildApiHistoryOptions = {},
): Content[] {
const { stripThoughtsFromHistory = true } = options;
const { messages } = conversation;
let lastCompressionIndex = -1;
@@ -585,14 +620,26 @@ export function buildApiHistoryFromConversation(
}
}
if (stripThoughtsFromHistory) {
return baseHistory
.map(stripThoughtsFromContent)
.filter((content): content is Content => content !== null);
}
return baseHistory;
}
// Fallback: return linear messages as Content[]
return messages
const result = messages
.map((record) => record.message)
.filter((message): message is Content => message !== undefined)
.map((message) => structuredClone(message));
if (stripThoughtsFromHistory) {
return result
.map(stripThoughtsFromContent)
.filter((content): content is Content => content !== null);
}
return result;
}
/**

View File

@@ -5,6 +5,7 @@
*/
import type { Config, MCPServerConfig } from '../config/config.js';
import { isSdkMcpServerConfig } from '../config/config.js';
import type { ToolRegistry } from './tool-registry.js';
import type { PromptRegistry } from '../prompts/prompt-registry.js';
import {
@@ -12,6 +13,7 @@ import {
MCPDiscoveryState,
populateMcpServerCommand,
} from './mcp-client.js';
import type { SendSdkMcpMessage } from './mcp-client.js';
import { getErrorMessage } from '../utils/errors.js';
import type { EventEmitter } from 'node:events';
import type { WorkspaceContext } from '../utils/workspaceContext.js';
@@ -31,6 +33,7 @@ export class McpClientManager {
private readonly workspaceContext: WorkspaceContext;
private discoveryState: MCPDiscoveryState = MCPDiscoveryState.NOT_STARTED;
private readonly eventEmitter?: EventEmitter;
private readonly sendSdkMcpMessage?: SendSdkMcpMessage;
constructor(
mcpServers: Record<string, MCPServerConfig>,
@@ -40,6 +43,7 @@ export class McpClientManager {
debugMode: boolean,
workspaceContext: WorkspaceContext,
eventEmitter?: EventEmitter,
sendSdkMcpMessage?: SendSdkMcpMessage,
) {
this.mcpServers = mcpServers;
this.mcpServerCommand = mcpServerCommand;
@@ -48,6 +52,7 @@ export class McpClientManager {
this.debugMode = debugMode;
this.workspaceContext = workspaceContext;
this.eventEmitter = eventEmitter;
this.sendSdkMcpMessage = sendSdkMcpMessage;
}
/**
@@ -71,6 +76,11 @@ export class McpClientManager {
this.eventEmitter?.emit('mcp-client-update', this.clients);
const discoveryPromises = Object.entries(servers).map(
async ([name, config]) => {
// For SDK MCP servers, pass the sendSdkMcpMessage callback
const sdkCallback = isSdkMcpServerConfig(config)
? this.sendSdkMcpMessage
: undefined;
const client = new McpClient(
name,
config,
@@ -78,6 +88,7 @@ export class McpClientManager {
this.promptRegistry,
this.workspaceContext,
this.debugMode,
sdkCallback,
);
this.clients.set(name, client);

View File

@@ -13,6 +13,7 @@ import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/
import type { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import type {
GetPromptResult,
JSONRPCMessage,
Prompt,
} from '@modelcontextprotocol/sdk/types.js';
import {
@@ -22,10 +23,11 @@ import {
} from '@modelcontextprotocol/sdk/types.js';
import { parse } from 'shell-quote';
import type { Config, MCPServerConfig } from '../config/config.js';
import { AuthProviderType } from '../config/config.js';
import { AuthProviderType, isSdkMcpServerConfig } from '../config/config.js';
import { GoogleCredentialProvider } from '../mcp/google-auth-provider.js';
import { ServiceAccountImpersonationProvider } from '../mcp/sa-impersonation-provider.js';
import { DiscoveredMCPTool } from './mcp-tool.js';
import { SdkControlClientTransport } from './sdk-control-client-transport.js';
import type { FunctionDeclaration } from '@google/genai';
import { mcpToTool } from '@google/genai';
@@ -42,6 +44,14 @@ import type {
} from '../utils/workspaceContext.js';
import type { ToolRegistry } from './tool-registry.js';
/**
* Callback type for sending MCP messages to SDK servers via control plane
*/
export type SendSdkMcpMessage = (
serverName: string,
message: JSONRPCMessage,
) => Promise<JSONRPCMessage>;
export const MCP_DEFAULT_TIMEOUT_MSEC = 10 * 60 * 1000; // default to 10 minutes
export type DiscoveredMCPPrompt = Prompt & {
@@ -92,6 +102,7 @@ export class McpClient {
private readonly promptRegistry: PromptRegistry,
private readonly workspaceContext: WorkspaceContext,
private readonly debugMode: boolean,
private readonly sendSdkMcpMessage?: SendSdkMcpMessage,
) {
this.client = new Client({
name: `qwen-cli-mcp-client-${this.serverName}`,
@@ -189,7 +200,12 @@ export class McpClient {
}
private async createTransport(): Promise<Transport> {
return createTransport(this.serverName, this.serverConfig, this.debugMode);
return createTransport(
this.serverName,
this.serverConfig,
this.debugMode,
this.sendSdkMcpMessage,
);
}
private async discoverTools(cliConfig: Config): Promise<DiscoveredMCPTool[]> {
@@ -501,6 +517,7 @@ export function populateMcpServerCommand(
* @param mcpServerName The name identifier for this MCP server
* @param mcpServerConfig Configuration object containing connection details
* @param toolRegistry The registry to register discovered tools with
* @param sendSdkMcpMessage Optional callback for SDK MCP servers to route messages via control plane.
* @returns Promise that resolves when discovery is complete
*/
export async function connectAndDiscover(
@@ -511,6 +528,7 @@ export async function connectAndDiscover(
debugMode: boolean,
workspaceContext: WorkspaceContext,
cliConfig: Config,
sendSdkMcpMessage?: SendSdkMcpMessage,
): Promise<void> {
updateMCPServerStatus(mcpServerName, MCPServerStatus.CONNECTING);
@@ -521,6 +539,7 @@ export async function connectAndDiscover(
mcpServerConfig,
debugMode,
workspaceContext,
sendSdkMcpMessage,
);
mcpClient.onerror = (error) => {
@@ -744,6 +763,7 @@ export function hasNetworkTransport(config: MCPServerConfig): boolean {
*
* @param mcpServerName The name of the MCP server, used for logging and identification.
* @param mcpServerConfig The configuration specifying how to connect to the server.
* @param sendSdkMcpMessage Optional callback for SDK MCP servers to route messages via control plane.
* @returns A promise that resolves to a connected MCP `Client` instance.
* @throws An error if the connection fails or the configuration is invalid.
*/
@@ -752,6 +772,7 @@ export async function connectToMcpServer(
mcpServerConfig: MCPServerConfig,
debugMode: boolean,
workspaceContext: WorkspaceContext,
sendSdkMcpMessage?: SendSdkMcpMessage,
): Promise<Client> {
const mcpClient = new Client({
name: 'qwen-code-mcp-client',
@@ -808,6 +829,7 @@ export async function connectToMcpServer(
mcpServerName,
mcpServerConfig,
debugMode,
sendSdkMcpMessage,
);
try {
await mcpClient.connect(transport, {
@@ -1172,7 +1194,21 @@ export async function createTransport(
mcpServerName: string,
mcpServerConfig: MCPServerConfig,
debugMode: boolean,
sendSdkMcpMessage?: SendSdkMcpMessage,
): Promise<Transport> {
if (isSdkMcpServerConfig(mcpServerConfig)) {
if (!sendSdkMcpMessage) {
throw new Error(
`SDK MCP server '${mcpServerName}' requires sendSdkMcpMessage callback`,
);
}
return new SdkControlClientTransport({
serverName: mcpServerName,
sendMcpMessage: sendSdkMcpMessage,
debugMode,
});
}
if (
mcpServerConfig.authProviderType ===
AuthProviderType.SERVICE_ACCOUNT_IMPERSONATION

View File

@@ -0,0 +1,163 @@
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
/**
* SdkControlClientTransport - MCP Client transport for SDK MCP servers
*
* This transport enables CLI's MCP client to connect to SDK MCP servers
* through the control plane. Messages are routed:
*
* CLI MCP Client → SdkControlClientTransport → sendMcpMessage() →
* control_request (mcp_message) → SDK → control_response → onmessage → CLI
*
* Unlike StdioClientTransport which spawns a subprocess, this transport
* communicates with SDK MCP servers running in the SDK process.
*/
import type { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';
/**
* Callback to send MCP messages to SDK via control plane
* Returns the MCP response from the SDK
*/
export type SendMcpMessageCallback = (
serverName: string,
message: JSONRPCMessage,
) => Promise<JSONRPCMessage>;
export interface SdkControlClientTransportOptions {
serverName: string;
sendMcpMessage: SendMcpMessageCallback;
debugMode?: boolean;
}
/**
* MCP Client Transport for SDK MCP servers
*
* Implements the @modelcontextprotocol/sdk Transport interface to enable
* CLI's MCP client to connect to SDK MCP servers via the control plane.
*/
export class SdkControlClientTransport {
private serverName: string;
private sendMcpMessage: SendMcpMessageCallback;
private debugMode: boolean;
private started = false;
// Transport interface callbacks
onmessage?: (message: JSONRPCMessage) => void;
onerror?: (error: Error) => void;
onclose?: () => void;
constructor(options: SdkControlClientTransportOptions) {
this.serverName = options.serverName;
this.sendMcpMessage = options.sendMcpMessage;
this.debugMode = options.debugMode ?? false;
}
/**
* Start the transport
* For SDK transport, this just marks it as ready - no subprocess to spawn
*/
async start(): Promise<void> {
if (this.started) {
return;
}
this.started = true;
if (this.debugMode) {
console.error(
`[SdkControlClientTransport] Started for server '${this.serverName}'`,
);
}
}
/**
* Send a message to the SDK MCP server via control plane
*
* Routes the message through the control plane and delivers
* the response via onmessage callback.
*/
async send(message: JSONRPCMessage): Promise<void> {
if (!this.started) {
throw new Error(
`SdkControlClientTransport (${this.serverName}) not started. Call start() first.`,
);
}
if (this.debugMode) {
console.error(
`[SdkControlClientTransport] Sending message to '${this.serverName}':`,
JSON.stringify(message),
);
}
try {
// Send message to SDK and wait for response
const response = await this.sendMcpMessage(this.serverName, message);
if (this.debugMode) {
console.error(
`[SdkControlClientTransport] Received response from '${this.serverName}':`,
JSON.stringify(response),
);
}
// Deliver response via onmessage callback
if (this.onmessage) {
this.onmessage(response);
}
} catch (error) {
if (this.debugMode) {
console.error(
`[SdkControlClientTransport] Error sending to '${this.serverName}':`,
error,
);
}
if (this.onerror) {
this.onerror(error instanceof Error ? error : new Error(String(error)));
}
throw error;
}
}
/**
* Close the transport
*/
async close(): Promise<void> {
if (!this.started) {
return;
}
this.started = false;
if (this.debugMode) {
console.error(
`[SdkControlClientTransport] Closed for server '${this.serverName}'`,
);
}
if (this.onclose) {
this.onclose();
}
}
/**
* Check if transport is started
*/
isStarted(): boolean {
return this.started;
}
/**
* Get server name
*/
getServerName(): string {
return this.serverName;
}
}

View File

@@ -16,6 +16,7 @@ import type { Config } from '../config/config.js';
import { spawn } from 'node:child_process';
import { StringDecoder } from 'node:string_decoder';
import { connectAndDiscover } from './mcp-client.js';
import type { SendSdkMcpMessage } from './mcp-client.js';
import { McpClientManager } from './mcp-client-manager.js';
import { DiscoveredMCPTool } from './mcp-tool.js';
import { parse } from 'shell-quote';
@@ -173,7 +174,11 @@ export class ToolRegistry {
private config: Config;
private mcpClientManager: McpClientManager;
constructor(config: Config, eventEmitter?: EventEmitter) {
constructor(
config: Config,
eventEmitter?: EventEmitter,
sendSdkMcpMessage?: SendSdkMcpMessage,
) {
this.config = config;
this.mcpClientManager = new McpClientManager(
this.config.getMcpServers() ?? {},
@@ -183,6 +188,7 @@ export class ToolRegistry {
this.config.getDebugMode(),
this.config.getWorkspaceContext(),
eventEmitter,
sendSdkMcpMessage,
);
}

View File

@@ -4,6 +4,8 @@
* SPDX-License-Identifier: Apache-2.0
*/
import type { GenerateContentResponse } from '@google/genai';
export type ThoughtSummary = {
subject: string;
description: string;
@@ -52,3 +54,23 @@ export function parseThought(rawText: string): ThoughtSummary {
return { subject, description };
}
export function getThoughtText(
response: GenerateContentResponse,
): string | null {
if (response.candidates && response.candidates.length > 0) {
const candidate = response.candidates[0];
if (
candidate.content &&
candidate.content.parts &&
candidate.content.parts.length > 0
) {
return candidate.content.parts
.filter((part) => part.thought)
.map((part) => part.text ?? '')
.join('');
}
}
return null;
}