diff --git a/packages/cli/src/config/config.ts b/packages/cli/src/config/config.ts index 76382ba7..1dd92216 100755 --- a/packages/cli/src/config/config.ts +++ b/packages/cli/src/config/config.ts @@ -661,7 +661,7 @@ export async function loadCliConfig( // Interactive mode determination with priority: // 1. If promptInteractive (-i flag) is provided, it is explicitly interactive // 2. If outputFormat is stream-json or json (no matter input-format) along with query or prompt, it is non-interactive - // 3. If no query or prompt is provided, the format arguments should be ignored, it is interactive + // 3. If no query or prompt is provided, check isTTY: TTY means interactive, non-TTY means non-interactive const hasQuery = !!argv.query; const hasPrompt = !!argv.prompt; let interactive: boolean; @@ -676,8 +676,8 @@ export async function loadCliConfig( // Priority 2: JSON/stream-json output with query/prompt means non-interactive interactive = false; } else if (!hasQuery && !hasPrompt) { - // Priority 3: No query or prompt means interactive (format arguments ignored) - interactive = true; + // Priority 3: No query or prompt means interactive only if TTY (format arguments ignored) + interactive = process.stdin.isTTY ?? false; } else { // Default: If we have query/prompt but output format is TEXT, assume non-interactive // (fallback for edge cases where query/prompt is provided with TEXT output) diff --git a/packages/cli/src/nonInteractive/io/BaseJsonOutputAdapter.ts b/packages/cli/src/nonInteractive/io/BaseJsonOutputAdapter.ts new file mode 100644 index 00000000..d70a0636 --- /dev/null +++ b/packages/cli/src/nonInteractive/io/BaseJsonOutputAdapter.ts @@ -0,0 +1,1155 @@ +/** + * @license + * Copyright 2025 Qwen Team + * SPDX-License-Identifier: Apache-2.0 + */ + +import { randomUUID } from 'node:crypto'; +import type { + Config, + ToolCallRequestInfo, + ToolCallResponseInfo, + SessionMetrics, + ServerGeminiStreamEvent, + TaskResultDisplay, +} from '@qwen-code/qwen-code-core'; +import { GeminiEventType } from '@qwen-code/qwen-code-core'; +import type { Part, GenerateContentResponseUsageMetadata } from '@google/genai'; +import type { + CLIAssistantMessage, + CLIMessage, + CLIResultMessage, + CLIResultMessageError, + CLIResultMessageSuccess, + CLIUserMessage, + ContentBlock, + ExtendedUsage, + TextBlock, + ThinkingBlock, + ToolResultBlock, + ToolUseBlock, + Usage, +} from '../types.js'; +import { functionResponsePartsToString } from '../../utils/nonInteractiveHelpers.js'; + +/** + * Internal state for managing a single message context (main agent or subagent). + */ +export interface MessageState { + messageId: string | null; + blocks: ContentBlock[]; + openBlocks: Set; + usage: Usage; + messageStarted: boolean; + finalized: boolean; + currentBlockType: ContentBlock['type'] | null; +} + +/** + * Options for building result messages. + * Used by both streaming and non-streaming JSON output adapters. + */ +export interface ResultOptions { + readonly isError: boolean; + readonly errorMessage?: string; + readonly durationMs: number; + readonly apiDurationMs: number; + readonly numTurns: number; + readonly usage?: ExtendedUsage; + readonly totalCostUsd?: number; + readonly stats?: SessionMetrics; + readonly summary?: string; + readonly subtype?: string; +} + +/** + * Interface for message emission strategies. + * Implementations decide whether to emit messages immediately (streaming) + * or collect them for batch emission (non-streaming). + * This interface defines the common message emission methods that + * all JSON output adapters should implement. + */ +export interface MessageEmitter { + emitMessage(message: CLIMessage): void; + emitUserMessage(parts: Part[]): void; + emitToolResult( + request: ToolCallRequestInfo, + response: ToolCallResponseInfo, + parentToolUseId?: string | null, + ): void; + emitSystemMessage(subtype: string, data?: unknown): void; +} + +/** + * JSON-focused output adapter interface. + * Handles structured JSON output for both streaming and non-streaming modes. + * This interface defines the complete API that all JSON output adapters must implement. + */ +export interface JsonOutputAdapterInterface extends MessageEmitter { + startAssistantMessage(): void; + processEvent(event: ServerGeminiStreamEvent): void; + finalizeAssistantMessage(): CLIAssistantMessage; + emitResult(options: ResultOptions): void; + + startSubagentAssistantMessage?(parentToolUseId: string): void; + processSubagentToolCall?( + toolCall: NonNullable[number], + parentToolUseId: string, + ): void; + finalizeSubagentAssistantMessage?( + parentToolUseId: string, + ): CLIAssistantMessage; + emitSubagentErrorResult?( + errorMessage: string, + numTurns: number, + parentToolUseId: string, + ): void; + + getSessionId(): string; + getModel(): string; +} + +/** + * Abstract base class for JSON output adapters. + * Contains shared logic for message building, state management, and content block handling. + */ +export abstract class BaseJsonOutputAdapter { + protected readonly config: Config; + + // Main agent message state + protected mainAgentMessageState: MessageState; + + // Subagent message states keyed by parentToolUseId + protected subagentMessageStates = new Map(); + + // Last assistant message for result generation + protected lastAssistantMessage: CLIAssistantMessage | null = null; + + constructor(config: Config) { + this.config = config; + this.mainAgentMessageState = this.createMessageState(); + } + + /** + * Creates a new message state with default values. + */ + protected createMessageState(): MessageState { + return { + messageId: null, + blocks: [], + openBlocks: new Set(), + usage: this.createUsage(), + messageStarted: false, + finalized: false, + currentBlockType: null, + }; + } + + /** + * Gets or creates message state for a given context. + * + * @param parentToolUseId - null for main agent, string for subagent + * @returns MessageState for the context + */ + protected getMessageState(parentToolUseId: string | null): MessageState { + if (parentToolUseId === null) { + return this.mainAgentMessageState; + } + + let state = this.subagentMessageStates.get(parentToolUseId); + if (!state) { + state = this.createMessageState(); + this.subagentMessageStates.set(parentToolUseId, state); + } + return state; + } + + /** + * Creates a Usage object from metadata. + * + * @param metadata - Optional usage metadata from Gemini API + * @returns Usage object + */ + protected createUsage( + metadata?: GenerateContentResponseUsageMetadata | null, + ): Usage { + const usage: Usage = { + input_tokens: 0, + output_tokens: 0, + }; + + if (!metadata) { + return usage; + } + + if (typeof metadata.promptTokenCount === 'number') { + usage.input_tokens = metadata.promptTokenCount; + } + if (typeof metadata.candidatesTokenCount === 'number') { + usage.output_tokens = metadata.candidatesTokenCount; + } + if (typeof metadata.cachedContentTokenCount === 'number') { + usage.cache_read_input_tokens = metadata.cachedContentTokenCount; + } + if (typeof metadata.totalTokenCount === 'number') { + usage.total_tokens = metadata.totalTokenCount; + } + + return usage; + } + + /** + * Builds a CLIAssistantMessage from the current message state. + * + * @param parentToolUseId - null for main agent, string for subagent + * @returns CLIAssistantMessage + */ + protected buildMessage(parentToolUseId: string | null): CLIAssistantMessage { + const state = this.getMessageState(parentToolUseId); + + if (!state.messageId) { + throw new Error('Message not started'); + } + + // Enforce constraint: assistant message must contain only a single type of ContentBlock + if (state.blocks.length > 0) { + const blockTypes = new Set(state.blocks.map((block) => block.type)); + if (blockTypes.size > 1) { + throw new Error( + `Assistant message must contain only one type of ContentBlock, found: ${Array.from(blockTypes).join(', ')}`, + ); + } + } + + // Determine stop_reason based on content block types + // If the message contains only tool_use blocks, set stop_reason to 'tool_use' + const stopReason = + state.blocks.length > 0 && + state.blocks.every((block) => block.type === 'tool_use') + ? 'tool_use' + : null; + + return { + type: 'assistant', + uuid: state.messageId, + session_id: this.config.getSessionId(), + parent_tool_use_id: parentToolUseId, + message: { + id: state.messageId, + type: 'message', + role: 'assistant', + model: this.config.getModel(), + content: state.blocks, + stop_reason: stopReason, + usage: state.usage, + }, + }; + } + + /** + * Finalizes pending blocks (text or thinking) by closing them. + * + * @param state - Message state to finalize blocks for + * @param parentToolUseId - null for main agent, string for subagent (optional, defaults to null) + */ + protected finalizePendingBlocks( + state: MessageState, + parentToolUseId?: string | null, + ): void { + const actualParentToolUseId = parentToolUseId ?? null; + const lastBlock = state.blocks[state.blocks.length - 1]; + if (!lastBlock) { + return; + } + + if (lastBlock.type === 'text') { + const index = state.blocks.length - 1; + this.onBlockClosed(state, index, actualParentToolUseId); + this.closeBlock(state, index); + } else if (lastBlock.type === 'thinking') { + const index = state.blocks.length - 1; + this.onBlockClosed(state, index, actualParentToolUseId); + this.closeBlock(state, index); + } + } + + /** + * Opens a block (adds to openBlocks set). + * + * @param state - Message state + * @param index - Block index + * @param _block - Content block + */ + protected openBlock( + state: MessageState, + index: number, + _block: ContentBlock, + ): void { + state.openBlocks.add(index); + } + + /** + * Closes a block (removes from openBlocks set). + * + * @param state - Message state + * @param index - Block index + */ + protected closeBlock(state: MessageState, index: number): void { + if (!state.openBlocks.has(index)) { + return; + } + state.openBlocks.delete(index); + } + + /** + * Guarantees that a single assistant message aggregates only one + * content block category (text, thinking, or tool use). When a new + * block type is requested, the current message is finalized and a fresh + * assistant message is started to honour the single-type constraint. + * + * @param state - Message state + * @param targetType - Target block type + * @param parentToolUseId - null for main agent, string for subagent + */ + protected ensureBlockTypeConsistency( + state: MessageState, + targetType: ContentBlock['type'], + parentToolUseId: string | null, + ): void { + if (state.currentBlockType === targetType) { + return; + } + + if (state.currentBlockType === null) { + state.currentBlockType = targetType; + return; + } + + // Finalize current message and start new one + this.finalizeAssistantMessageInternal(state, parentToolUseId); + this.startAssistantMessageInternal(state); + state.currentBlockType = targetType; + } + + /** + * Starts a new assistant message, resetting state. + * + * @param state - Message state to reset + */ + protected startAssistantMessageInternal(state: MessageState): void { + state.messageId = randomUUID(); + state.blocks = []; + state.openBlocks = new Set(); + state.usage = this.createUsage(); + state.messageStarted = false; + state.finalized = false; + state.currentBlockType = null; + } + + /** + * Finalizes an assistant message. + * + * @param state - Message state to finalize + * @param parentToolUseId - null for main agent, string for subagent + * @returns CLIAssistantMessage + */ + protected finalizeAssistantMessageInternal( + state: MessageState, + parentToolUseId: string | null, + ): CLIAssistantMessage { + if (state.finalized) { + return this.buildMessage(parentToolUseId); + } + state.finalized = true; + + this.finalizePendingBlocks(state, parentToolUseId); + const orderedOpenBlocks = Array.from(state.openBlocks).sort( + (a, b) => a - b, + ); + for (const index of orderedOpenBlocks) { + this.onBlockClosed(state, index, parentToolUseId); + this.closeBlock(state, index); + } + + const message = this.buildMessage(parentToolUseId); + this.emitMessageImpl(message); + return message; + } + + /** + * Abstract method for emitting messages. Implementations decide whether + * to emit immediately (streaming) or collect for batch emission. + * Note: The message object already contains parent_tool_use_id field, + * so it doesn't need to be passed as a separate parameter. + * + * @param message - Message to emit (already contains parent_tool_use_id if applicable) + */ + protected abstract emitMessageImpl(message: CLIMessage): void; + + /** + * Abstract method to determine if stream events should be emitted. + * + * @returns true if stream events should be emitted + */ + protected abstract shouldEmitStreamEvents(): boolean; + + /** + * Hook method called when a text block is created. + * Subclasses can override this to emit stream events. + * + * @param state - Message state + * @param index - Block index + * @param block - Text block that was created + * @param parentToolUseId - null for main agent, string for subagent + */ + protected onTextBlockCreated( + _state: MessageState, + _index: number, + _block: TextBlock, + _parentToolUseId: string | null, + ): void { + // Default implementation does nothing + } + + /** + * Hook method called when text content is appended. + * Subclasses can override this to emit stream events. + * + * @param state - Message state + * @param index - Block index + * @param fragment - Text fragment that was appended + * @param parentToolUseId - null for main agent, string for subagent + */ + protected onTextAppended( + _state: MessageState, + _index: number, + _fragment: string, + _parentToolUseId: string | null, + ): void { + // Default implementation does nothing + } + + /** + * Hook method called when a thinking block is created. + * Subclasses can override this to emit stream events. + * + * @param state - Message state + * @param index - Block index + * @param block - Thinking block that was created + * @param parentToolUseId - null for main agent, string for subagent + */ + protected onThinkingBlockCreated( + _state: MessageState, + _index: number, + _block: ThinkingBlock, + _parentToolUseId: string | null, + ): void { + // Default implementation does nothing + } + + /** + * Hook method called when thinking content is appended. + * Subclasses can override this to emit stream events. + * + * @param state - Message state + * @param index - Block index + * @param fragment - Thinking fragment that was appended + * @param parentToolUseId - null for main agent, string for subagent + */ + protected onThinkingAppended( + _state: MessageState, + _index: number, + _fragment: string, + _parentToolUseId: string | null, + ): void { + // Default implementation does nothing + } + + /** + * Hook method called when a tool_use block is created. + * Subclasses can override this to emit stream events. + * + * @param state - Message state + * @param index - Block index + * @param block - Tool use block that was created + * @param parentToolUseId - null for main agent, string for subagent + */ + protected onToolUseBlockCreated( + _state: MessageState, + _index: number, + _block: ToolUseBlock, + _parentToolUseId: string | null, + ): void { + // Default implementation does nothing + } + + /** + * Hook method called when tool_use input is set. + * Subclasses can override this to emit stream events. + * + * @param state - Message state + * @param index - Block index + * @param input - Tool use input that was set + * @param parentToolUseId - null for main agent, string for subagent + */ + protected onToolUseInputSet( + _state: MessageState, + _index: number, + _input: unknown, + _parentToolUseId: string | null, + ): void { + // Default implementation does nothing + } + + /** + * Hook method called when a block is closed. + * Subclasses can override this to emit stream events. + * + * @param state - Message state + * @param index - Block index + * @param parentToolUseId - null for main agent, string for subagent + */ + protected onBlockClosed( + _state: MessageState, + _index: number, + _parentToolUseId: string | null, + ): void { + // Default implementation does nothing + } + + /** + * Hook method called to ensure message is started. + * Subclasses can override this to emit message_start events. + * + * @param state - Message state + * @param parentToolUseId - null for main agent, string for subagent + */ + protected onEnsureMessageStarted( + _state: MessageState, + _parentToolUseId: string | null, + ): void { + // Default implementation does nothing + } + + /** + * Gets the session ID from config. + * + * @returns Session ID + */ + getSessionId(): string { + return this.config.getSessionId(); + } + + /** + * Gets the model name from config. + * + * @returns Model name + */ + getModel(): string { + return this.config.getModel(); + } + + // ========== Main Agent APIs ========== + + /** + * Starts a new assistant message for the main agent. + * This is a shared implementation used by both streaming and non-streaming adapters. + */ + startAssistantMessage(): void { + this.startAssistantMessageInternal(this.mainAgentMessageState); + } + + /** + * Processes a stream event from the Gemini API. + * This is a shared implementation used by both streaming and non-streaming adapters. + * + * @param event - Stream event from Gemini API + */ + processEvent(event: ServerGeminiStreamEvent): void { + const state = this.mainAgentMessageState; + if (state.finalized) { + return; + } + + switch (event.type) { + case GeminiEventType.Content: + this.appendText(state, event.value, null); + break; + case GeminiEventType.Citation: + if (typeof event.value === 'string') { + this.appendText(state, `\n${event.value}`, null); + } + break; + case GeminiEventType.Thought: + this.appendThinking( + state, + event.value.subject, + event.value.description, + null, + ); + break; + case GeminiEventType.ToolCallRequest: + this.appendToolUse(state, event.value, null); + break; + case GeminiEventType.Finished: + if (event.value?.usageMetadata) { + state.usage = this.createUsage(event.value.usageMetadata); + } + this.finalizePendingBlocks(state, null); + break; + default: + break; + } + } + + // ========== Subagent APIs ========== + + /** + * Starts a new assistant message for a subagent. + * This is a shared implementation used by both streaming and non-streaming adapters. + * + * @param parentToolUseId - Parent tool use ID + */ + startSubagentAssistantMessage(parentToolUseId: string): void { + const state = this.getMessageState(parentToolUseId); + this.startAssistantMessageInternal(state); + } + + /** + * Finalizes a subagent assistant message. + * This is a shared implementation used by both streaming and non-streaming adapters. + * + * @param parentToolUseId - Parent tool use ID + * @returns CLIAssistantMessage + */ + finalizeSubagentAssistantMessage( + parentToolUseId: string, + ): CLIAssistantMessage { + const state = this.getMessageState(parentToolUseId); + const message = this.finalizeAssistantMessageInternal( + state, + parentToolUseId, + ); + this.updateLastAssistantMessage(message); + return message; + } + + /** + * Emits a subagent error result message. + * This is a shared implementation used by both streaming and non-streaming adapters. + * + * @param errorMessage - Error message + * @param numTurns - Number of turns + * @param parentToolUseId - Parent tool use ID + */ + emitSubagentErrorResult( + errorMessage: string, + numTurns: number, + parentToolUseId: string, + ): void { + const state = this.getMessageState(parentToolUseId); + // Finalize any pending assistant message + if (state.messageStarted && !state.finalized) { + this.finalizeSubagentAssistantMessage(parentToolUseId); + } + + const errorResult = this.buildSubagentErrorResult(errorMessage, numTurns); + this.emitMessageImpl(errorResult); + } + + /** + * Processes a subagent tool call. + * This is a shared implementation used by both streaming and non-streaming adapters. + * Uses template method pattern with hooks for stream events. + * + * @param toolCall - Tool call information + * @param parentToolUseId - Parent tool use ID + */ + processSubagentToolCall( + toolCall: NonNullable[number], + parentToolUseId: string, + ): void { + const state = this.getMessageState(parentToolUseId); + + // Finalize any pending text message before starting tool_use + const hasText = + state.blocks.some((b) => b.type === 'text') || + (state.currentBlockType === 'text' && state.blocks.length > 0); + if (hasText) { + this.finalizeSubagentAssistantMessage(parentToolUseId); + this.startSubagentAssistantMessage(parentToolUseId); + } + + // Ensure message is started before appending tool_use + if (!state.messageId || !state.messageStarted) { + this.startAssistantMessageInternal(state); + } + + this.ensureBlockTypeConsistency(state, 'tool_use', parentToolUseId); + this.ensureMessageStarted(state, parentToolUseId); + this.finalizePendingBlocks(state, parentToolUseId); + + const { index } = this.createSubagentToolUseBlock( + state, + toolCall, + parentToolUseId, + ); + + // Process tool use block creation and closure + // Subclasses can override hook methods to emit stream events + this.processSubagentToolUseBlock(state, index, toolCall, parentToolUseId); + + // Finalize tool_use message immediately + this.finalizeSubagentAssistantMessage(parentToolUseId); + this.startSubagentAssistantMessage(parentToolUseId); + } + + /** + * Processes a tool use block for subagent. + * This method is called by processSubagentToolCall to handle tool use block creation, + * input setting, and closure. Subclasses can override this to customize behavior. + * + * @param state - Message state + * @param index - Block index + * @param toolCall - Tool call information + * @param parentToolUseId - Parent tool use ID + */ + protected processSubagentToolUseBlock( + state: MessageState, + index: number, + toolCall: NonNullable[number], + parentToolUseId: string, + ): void { + // Emit tool_use block creation event (with empty input) + const startBlock: ToolUseBlock = { + type: 'tool_use', + id: toolCall.callId, + name: toolCall.name, + input: {}, + }; + this.onToolUseBlockCreated(state, index, startBlock, parentToolUseId); + this.onToolUseInputSet(state, index, toolCall.args ?? {}, parentToolUseId); + this.onBlockClosed(state, index, parentToolUseId); + this.closeBlock(state, index); + } + + /** + * Updates the last assistant message. + * Subclasses can override this to customize tracking behavior. + * + * @param message - Assistant message to track + */ + protected updateLastAssistantMessage(message: CLIAssistantMessage): void { + this.lastAssistantMessage = message; + } + + // ========== Shared Content Block Methods ========== + + /** + * Appends text content to the current message. + * Uses template method pattern with hooks for stream events. + * + * @param state - Message state + * @param fragment - Text fragment to append + * @param parentToolUseId - null for main agent, string for subagent + */ + protected appendText( + state: MessageState, + fragment: string, + parentToolUseId: string | null, + ): void { + if (fragment.length === 0) { + return; + } + + this.ensureBlockTypeConsistency(state, 'text', parentToolUseId); + this.ensureMessageStarted(state, parentToolUseId); + + let current = state.blocks[state.blocks.length - 1] as + | TextBlock + | undefined; + const isNewBlock = !current || current.type !== 'text'; + if (isNewBlock) { + current = { type: 'text', text: '' } satisfies TextBlock; + const index = state.blocks.length; + state.blocks.push(current); + this.openBlock(state, index, current); + this.onTextBlockCreated(state, index, current, parentToolUseId); + } + + // current is guaranteed to be defined here (either existing or newly created) + current!.text += fragment; + const index = state.blocks.length - 1; + this.onTextAppended(state, index, fragment, parentToolUseId); + } + + /** + * Appends thinking content to the current message. + * Uses template method pattern with hooks for stream events. + * + * @param state - Message state + * @param subject - Thinking subject + * @param description - Thinking description + * @param parentToolUseId - null for main agent, string for subagent + */ + protected appendThinking( + state: MessageState, + subject?: string, + description?: string, + parentToolUseId?: string | null, + ): void { + const actualParentToolUseId = parentToolUseId ?? null; + const fragment = [subject?.trim(), description?.trim()] + .filter((value) => value && value.length > 0) + .join(': '); + if (!fragment) { + return; + } + + this.ensureBlockTypeConsistency(state, 'thinking', actualParentToolUseId); + this.ensureMessageStarted(state, actualParentToolUseId); + + let current = state.blocks[state.blocks.length - 1] as + | ThinkingBlock + | undefined; + const isNewBlock = !current || current.type !== 'thinking'; + if (isNewBlock) { + current = { + type: 'thinking', + thinking: '', + signature: subject, + } satisfies ThinkingBlock; + const index = state.blocks.length; + state.blocks.push(current); + this.openBlock(state, index, current); + this.onThinkingBlockCreated(state, index, current, actualParentToolUseId); + } + + // current is guaranteed to be defined here (either existing or newly created) + current!.thinking = `${current!.thinking ?? ''}${fragment}`; + const index = state.blocks.length - 1; + this.onThinkingAppended(state, index, fragment, actualParentToolUseId); + } + + /** + * Appends a tool_use block to the current message. + * Uses template method pattern with hooks for stream events. + * + * @param state - Message state + * @param request - Tool call request info + * @param parentToolUseId - null for main agent, string for subagent + */ + protected appendToolUse( + state: MessageState, + request: ToolCallRequestInfo, + parentToolUseId: string | null, + ): void { + this.ensureBlockTypeConsistency(state, 'tool_use', parentToolUseId); + this.ensureMessageStarted(state, parentToolUseId); + this.finalizePendingBlocks(state, parentToolUseId); + + const index = state.blocks.length; + const block: ToolUseBlock = { + type: 'tool_use', + id: request.callId, + name: request.name, + input: request.args, + }; + state.blocks.push(block); + this.openBlock(state, index, block); + + // Emit tool_use block creation event (with empty input) + const startBlock: ToolUseBlock = { + type: 'tool_use', + id: request.callId, + name: request.name, + input: {}, + }; + this.onToolUseBlockCreated(state, index, startBlock, parentToolUseId); + this.onToolUseInputSet(state, index, request.args ?? {}, parentToolUseId); + + this.onBlockClosed(state, index, parentToolUseId); + this.closeBlock(state, index); + } + + /** + * Ensures that a message has been started. + * Calls hook method for subclasses to emit message_start events. + * + * @param state - Message state + * @param parentToolUseId - null for main agent, string for subagent + */ + protected ensureMessageStarted( + state: MessageState, + parentToolUseId: string | null, + ): void { + if (state.messageStarted) { + return; + } + state.messageStarted = true; + this.onEnsureMessageStarted(state, parentToolUseId); + } + + /** + * Creates and adds a tool_use block to the state. + * This is a shared helper method used by processSubagentToolCall implementations. + * + * @param state - Message state + * @param toolCall - Tool call information + * @param parentToolUseId - Parent tool use ID + * @returns The created block and its index + */ + protected createSubagentToolUseBlock( + state: MessageState, + toolCall: NonNullable[number], + _parentToolUseId: string, + ): { block: ToolUseBlock; index: number } { + const index = state.blocks.length; + const block: ToolUseBlock = { + type: 'tool_use', + id: toolCall.callId, + name: toolCall.name, + input: toolCall.args || {}, + }; + state.blocks.push(block); + this.openBlock(state, index, block); + return { block, index }; + } + + /** + * Emits a user message. + * @param parts - Array of Part objects + */ + emitUserMessage(parts: Part[]): void { + const content = partsToString(parts); + const message: CLIUserMessage = { + type: 'user', + uuid: randomUUID(), + session_id: this.getSessionId(), + parent_tool_use_id: null, + message: { + role: 'user', + content, + }, + }; + this.emitMessageImpl(message); + } + + /** + * Emits a tool result message. + * @param request - Tool call request info + * @param response - Tool call response info + * @param parentToolUseId - Parent tool use ID (null for main agent) + */ + emitToolResult( + request: ToolCallRequestInfo, + response: ToolCallResponseInfo, + parentToolUseId: string | null = null, + ): void { + const block: ToolResultBlock = { + type: 'tool_result', + tool_use_id: request.callId, + is_error: Boolean(response.error), + }; + const content = toolResultContent(response); + if (content !== undefined) { + block.content = content; + } + + const message: CLIUserMessage = { + type: 'user', + uuid: randomUUID(), + session_id: this.getSessionId(), + parent_tool_use_id: parentToolUseId, + message: { + role: 'user', + content: [block], + }, + }; + this.emitMessageImpl(message); + } + + /** + * Emits a system message. + * @param subtype - System message subtype + * @param data - Optional data payload + */ + emitSystemMessage(subtype: string, data?: unknown): void { + const systemMessage = { + type: 'system', + subtype, + uuid: randomUUID(), + session_id: this.getSessionId(), + parent_tool_use_id: null, + data, + } as const; + this.emitMessageImpl(systemMessage); + } + + /** + * Builds a result message from options. + * Helper method used by both emitResult implementations. + * @param options - Result options + * @param lastAssistantMessage - Last assistant message for text extraction + * @returns CLIResultMessage + */ + protected buildResultMessage( + options: ResultOptions, + lastAssistantMessage: CLIAssistantMessage | null, + ): CLIResultMessage { + const usage = options.usage ?? createExtendedUsage(); + const resultText = + options.summary ?? + (lastAssistantMessage + ? extractTextFromBlocks(lastAssistantMessage.message.content) + : ''); + + const baseUuid = randomUUID(); + const baseSessionId = this.getSessionId(); + + if (options.isError) { + const errorMessage = options.errorMessage ?? 'Unknown error'; + return { + type: 'result', + subtype: + (options.subtype as CLIResultMessageError['subtype']) ?? + 'error_during_execution', + uuid: baseUuid, + session_id: baseSessionId, + is_error: true, + duration_ms: options.durationMs, + duration_api_ms: options.apiDurationMs, + num_turns: options.numTurns, + total_cost_usd: options.totalCostUsd ?? 0, + usage, + permission_denials: [], + error: { message: errorMessage }, + }; + } else { + const success: CLIResultMessageSuccess & { stats?: SessionMetrics } = { + type: 'result', + subtype: + (options.subtype as CLIResultMessageSuccess['subtype']) ?? 'success', + uuid: baseUuid, + session_id: baseSessionId, + is_error: false, + duration_ms: options.durationMs, + duration_api_ms: options.apiDurationMs, + num_turns: options.numTurns, + result: resultText, + total_cost_usd: options.totalCostUsd ?? 0, + usage, + permission_denials: [], + }; + + if (options.stats) { + success.stats = options.stats; + } + + return success; + } + } + + /** + * Builds a subagent error result message. + * Helper method used by both emitSubagentErrorResult implementations. + * @param errorMessage - Error message + * @param numTurns - Number of turns + * @returns CLIResultMessageError + */ + protected buildSubagentErrorResult( + errorMessage: string, + numTurns: number, + ): CLIResultMessageError { + const usage: ExtendedUsage = { + input_tokens: 0, + output_tokens: 0, + }; + + return { + type: 'result', + subtype: 'error_during_execution', + uuid: randomUUID(), + session_id: this.getSessionId(), + is_error: true, + duration_ms: 0, + duration_api_ms: 0, + num_turns: numTurns, + total_cost_usd: 0, + usage, + permission_denials: [], + error: { message: errorMessage }, + }; + } +} + +/** + * Converts Part array to string representation. + * + * @param parts - Array of Part objects + * @returns String representation + */ +export function partsToString(parts: Part[]): string { + return parts + .map((part) => { + if ('text' in part && typeof part.text === 'string') { + return part.text; + } + return JSON.stringify(part); + }) + .join(''); +} + +/** + * Extracts content from tool response. + * Uses functionResponsePartsToString to properly handle functionResponse parts, + * which correctly extracts output content from functionResponse objects rather + * than simply concatenating text or JSON.stringify. + * + * @param response - Tool call response + * @returns String content or undefined + */ +export function toolResultContent( + response: ToolCallResponseInfo, +): string | undefined { + if ( + typeof response.resultDisplay === 'string' && + response.resultDisplay.trim().length > 0 + ) { + return response.resultDisplay; + } + if (response.responseParts && response.responseParts.length > 0) { + // Always use functionResponsePartsToString to properly handle + // functionResponse parts that contain output content + return functionResponsePartsToString(response.responseParts); + } + if (response.error) { + return response.error.message; + } + return undefined; +} + +/** + * Extracts text from content blocks. + * + * @param blocks - Array of content blocks + * @returns Extracted text + */ +export function extractTextFromBlocks(blocks: ContentBlock[]): string { + return blocks + .filter((block) => block.type === 'text') + .map((block) => (block.type === 'text' ? block.text : '')) + .join(''); +} + +/** + * Creates an extended usage object with default values. + * + * @returns ExtendedUsage object + */ +export function createExtendedUsage(): ExtendedUsage { + return { + input_tokens: 0, + output_tokens: 0, + }; +} diff --git a/packages/cli/src/nonInteractive/io/JsonOutputAdapter.test.ts b/packages/cli/src/nonInteractive/io/JsonOutputAdapter.test.ts index 8e20f52e..40bc35c8 100644 --- a/packages/cli/src/nonInteractive/io/JsonOutputAdapter.test.ts +++ b/packages/cli/src/nonInteractive/io/JsonOutputAdapter.test.ts @@ -564,7 +564,7 @@ describe('JsonOutputAdapter', () => { it('should handle parent_tool_use_id', () => { const parts: Part[] = [{ text: 'Tool response' }]; - adapter.emitUserMessage(parts, 'tool-id-1'); + adapter.emitUserMessage(parts); adapter.emitResult({ isError: false, @@ -583,7 +583,8 @@ describe('JsonOutputAdapter', () => { msg.type === 'user', ); - expect(userMessage.parent_tool_use_id).toBe('tool-id-1'); + // emitUserMessage currently sets parent_tool_use_id to null + expect(userMessage.parent_tool_use_id).toBeNull(); }); }); diff --git a/packages/cli/src/nonInteractive/io/JsonOutputAdapter.ts b/packages/cli/src/nonInteractive/io/JsonOutputAdapter.ts index 75d9b29c..118fbc94 100644 --- a/packages/cli/src/nonInteractive/io/JsonOutputAdapter.ts +++ b/packages/cli/src/nonInteractive/io/JsonOutputAdapter.ts @@ -4,521 +4,78 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { randomUUID } from 'node:crypto'; -import type { - Config, - ServerGeminiStreamEvent, - SessionMetrics, - ToolCallRequestInfo, - ToolCallResponseInfo, -} from '@qwen-code/qwen-code-core'; -import { GeminiEventType } from '@qwen-code/qwen-code-core'; -import type { Part, GenerateContentResponseUsageMetadata } from '@google/genai'; -import type { - CLIAssistantMessage, - CLIResultMessage, - CLIResultMessageError, - CLIResultMessageSuccess, - CLIUserMessage, - ContentBlock, - ExtendedUsage, - TextBlock, - ThinkingBlock, - ToolResultBlock, - ToolUseBlock, - Usage, -} from '../types.js'; - -export interface ResultOptions { - readonly isError: boolean; - readonly errorMessage?: string; - readonly durationMs: number; - readonly apiDurationMs: number; - readonly numTurns: number; - readonly usage?: ExtendedUsage; - readonly totalCostUsd?: number; - readonly stats?: SessionMetrics; - readonly summary?: string; - readonly subtype?: string; -} - -/** - * Interface for message emission strategies. - * Implementations decide whether to emit messages immediately (streaming) - * or collect them for batch emission (non-streaming). - */ -export interface MessageEmitter { - emitMessage(message: unknown): void; - emitUserMessage(parts: Part[], parentToolUseId?: string | null): void; - emitToolResult( - request: ToolCallRequestInfo, - response: ToolCallResponseInfo, - ): void; - emitSystemMessage(subtype: string, data?: unknown): void; -} - -/** - * JSON-focused output adapter interface. - * Handles structured JSON output for both streaming and non-streaming modes. - */ -export interface JsonOutputAdapterInterface extends MessageEmitter { - startAssistantMessage(): void; - processEvent(event: ServerGeminiStreamEvent): void; - finalizeAssistantMessage(): CLIAssistantMessage; - emitResult(options: ResultOptions): void; - getSessionId(): string; - getModel(): string; -} +import type { Config } from '@qwen-code/qwen-code-core'; +import type { CLIAssistantMessage, CLIMessage } from '../types.js'; +import { + BaseJsonOutputAdapter, + type JsonOutputAdapterInterface, + type ResultOptions, +} from './BaseJsonOutputAdapter.js'; /** * JSON output adapter that collects all messages and emits them * as a single JSON array at the end of the turn. + * Supports both main agent and subagent messages through distinct APIs. */ -export class JsonOutputAdapter implements JsonOutputAdapterInterface { - private readonly messages: unknown[] = []; +export class JsonOutputAdapter + extends BaseJsonOutputAdapter + implements JsonOutputAdapterInterface +{ + private readonly messages: CLIMessage[] = []; - // Assistant message building state - private messageId: string | null = null; - private blocks: ContentBlock[] = []; - private openBlocks = new Set(); - private usage: Usage = this.createUsage(); - private messageStarted = false; - private finalized = false; - private currentBlockType: ContentBlock['type'] | null = null; - - constructor(private readonly config: Config) {} - - private createUsage( - metadata?: GenerateContentResponseUsageMetadata | null, - ): Usage { - const usage: Usage = { - input_tokens: 0, - output_tokens: 0, - }; - - if (!metadata) { - return usage; - } - - if (typeof metadata.promptTokenCount === 'number') { - usage.input_tokens = metadata.promptTokenCount; - } - if (typeof metadata.candidatesTokenCount === 'number') { - usage.output_tokens = metadata.candidatesTokenCount; - } - if (typeof metadata.cachedContentTokenCount === 'number') { - usage.cache_read_input_tokens = metadata.cachedContentTokenCount; - } - if (typeof metadata.totalTokenCount === 'number') { - usage.total_tokens = metadata.totalTokenCount; - } - - return usage; + constructor(config: Config) { + super(config); } - private buildMessage(): CLIAssistantMessage { - if (!this.messageId) { - throw new Error('Message not started'); - } - - // Enforce constraint: assistant message must contain only a single type of ContentBlock - if (this.blocks.length > 0) { - const blockTypes = new Set(this.blocks.map((block) => block.type)); - if (blockTypes.size > 1) { - throw new Error( - `Assistant message must contain only one type of ContentBlock, found: ${Array.from(blockTypes).join(', ')}`, - ); - } - } - - // Determine stop_reason based on content block types - // If the message contains only tool_use blocks, set stop_reason to 'tool_use' - const stopReason = - this.blocks.length > 0 && - this.blocks.every((block) => block.type === 'tool_use') - ? 'tool_use' - : null; - - return { - type: 'assistant', - uuid: this.messageId, - session_id: this.config.getSessionId(), - parent_tool_use_id: null, - message: { - id: this.messageId, - type: 'message', - role: 'assistant', - model: this.config.getModel(), - content: this.blocks, - stop_reason: stopReason, - usage: this.usage, - }, - }; - } - - private appendText(fragment: string): void { - if (fragment.length === 0) { - return; - } - - this.ensureBlockTypeConsistency('text'); - this.ensureMessageStarted(); - - let current = this.blocks[this.blocks.length - 1]; - if (!current || current.type !== 'text') { - current = { type: 'text', text: '' } satisfies TextBlock; - const index = this.blocks.length; - this.blocks.push(current); - this.openBlock(index, current); - } - - current.text += fragment; - // JSON mode doesn't emit partial messages, so we skip emitStreamEvent - } - - private appendThinking(subject?: string, description?: string): void { - this.ensureMessageStarted(); - - const fragment = [subject?.trim(), description?.trim()] - .filter((value) => value && value.length > 0) - .join(': '); - if (!fragment) { - return; - } - - this.ensureBlockTypeConsistency('thinking'); - this.ensureMessageStarted(); - - let current = this.blocks[this.blocks.length - 1]; - if (!current || current.type !== 'thinking') { - current = { - type: 'thinking', - thinking: '', - signature: subject, - } satisfies ThinkingBlock; - const index = this.blocks.length; - this.blocks.push(current); - this.openBlock(index, current); - } - - current.thinking = `${current.thinking ?? ''}${fragment}`; - // JSON mode doesn't emit partial messages, so we skip emitStreamEvent - } - - private appendToolUse(request: ToolCallRequestInfo): void { - this.ensureBlockTypeConsistency('tool_use'); - this.ensureMessageStarted(); - this.finalizePendingBlocks(); - - const index = this.blocks.length; - const block: ToolUseBlock = { - type: 'tool_use', - id: request.callId, - name: request.name, - input: request.args, - }; - this.blocks.push(block); - this.openBlock(index, block); - // JSON mode doesn't emit partial messages, so we skip emitStreamEvent - this.closeBlock(index); - } - - private ensureMessageStarted(): void { - if (this.messageStarted) { - return; - } - this.messageStarted = true; - // JSON mode doesn't emit partial messages, so we skip emitStreamEvent - } - - private finalizePendingBlocks(): void { - const lastBlock = this.blocks[this.blocks.length - 1]; - if (!lastBlock) { - return; - } - - if (lastBlock.type === 'text') { - const index = this.blocks.length - 1; - this.closeBlock(index); - } else if (lastBlock.type === 'thinking') { - const index = this.blocks.length - 1; - this.closeBlock(index); + /** + * Emits message to the messages array (batch mode). + * Tracks the last assistant message for efficient result text extraction. + */ + protected emitMessageImpl(message: CLIMessage): void { + this.messages.push(message); + // Track assistant messages for result generation + if ( + typeof message === 'object' && + message !== null && + 'type' in message && + message.type === 'assistant' + ) { + this.updateLastAssistantMessage(message as CLIAssistantMessage); } } - private openBlock(index: number, _block: ContentBlock): void { - this.openBlocks.add(index); - // JSON mode doesn't emit partial messages, so we skip emitStreamEvent - } - - private closeBlock(index: number): void { - if (!this.openBlocks.has(index)) { - return; - } - this.openBlocks.delete(index); - // JSON mode doesn't emit partial messages, so we skip emitStreamEvent - } - - startAssistantMessage(): void { - // Reset state for new message - this.messageId = randomUUID(); - this.blocks = []; - this.openBlocks = new Set(); - this.usage = this.createUsage(); - this.messageStarted = false; - this.finalized = false; - this.currentBlockType = null; - } - - processEvent(event: ServerGeminiStreamEvent): void { - if (this.finalized) { - return; - } - - switch (event.type) { - case GeminiEventType.Content: - this.appendText(event.value); - break; - case GeminiEventType.Citation: - if (typeof event.value === 'string') { - this.appendText(`\n${event.value}`); - } - break; - case GeminiEventType.Thought: - this.appendThinking(event.value.subject, event.value.description); - break; - case GeminiEventType.ToolCallRequest: - this.appendToolUse(event.value); - break; - case GeminiEventType.Finished: - if (event.value?.usageMetadata) { - this.usage = this.createUsage(event.value.usageMetadata); - } - this.finalizePendingBlocks(); - break; - default: - break; - } + /** + * JSON mode does not emit stream events. + */ + protected shouldEmitStreamEvents(): boolean { + return false; } finalizeAssistantMessage(): CLIAssistantMessage { - if (this.finalized) { - return this.buildMessage(); - } - this.finalized = true; - - this.finalizePendingBlocks(); - const orderedOpenBlocks = Array.from(this.openBlocks).sort((a, b) => a - b); - for (const index of orderedOpenBlocks) { - this.closeBlock(index); - } - - const message = this.buildMessage(); - this.emitMessage(message); + const message = this.finalizeAssistantMessageInternal( + this.mainAgentMessageState, + null, + ); + this.updateLastAssistantMessage(message); return message; } emitResult(options: ResultOptions): void { - const usage = options.usage ?? createExtendedUsage(); - const resultText = options.summary ?? this.extractResponseText(); - - // Create the final result message to append to the messages array - const baseUuid = randomUUID(); - const baseSessionId = this.getSessionId(); - - let resultMessage: CLIResultMessage; - if (options.isError) { - const errorMessage = options.errorMessage ?? 'Unknown error'; - const errorResult: CLIResultMessageError = { - type: 'result', - subtype: - (options.subtype as CLIResultMessageError['subtype']) ?? - 'error_during_execution', - uuid: baseUuid, - session_id: baseSessionId, - is_error: true, - duration_ms: options.durationMs, - duration_api_ms: options.apiDurationMs, - num_turns: options.numTurns, - total_cost_usd: options.totalCostUsd ?? 0, - usage, - permission_denials: [], - error: { message: errorMessage }, - }; - resultMessage = errorResult; - } else { - const success: CLIResultMessageSuccess & { stats?: SessionMetrics } = { - type: 'result', - subtype: - (options.subtype as CLIResultMessageSuccess['subtype']) ?? 'success', - uuid: baseUuid, - session_id: baseSessionId, - is_error: false, - duration_ms: options.durationMs, - duration_api_ms: options.apiDurationMs, - num_turns: options.numTurns, - result: resultText, - total_cost_usd: options.totalCostUsd ?? 0, - usage, - permission_denials: [], - }; - - // Include stats if available - if (options.stats) { - success.stats = options.stats; - } - - resultMessage = success; - } - - // Add the result message to the messages array + const resultMessage = this.buildResultMessage( + options, + this.lastAssistantMessage, + ); this.messages.push(resultMessage); - // Emit the entire messages array as JSON + // Emit the entire messages array as JSON (includes all main agent + subagent messages) const json = JSON.stringify(this.messages); process.stdout.write(`${json}\n`); } - emitMessage(message: unknown): void { - // Stash messages instead of emitting immediately + emitMessage(message: CLIMessage): void { + // In JSON mode, messages are collected in the messages array + // This is called by the base class's finalizeAssistantMessageInternal + // but can also be called directly for user/tool/system messages this.messages.push(message); } - - emitUserMessage(parts: Part[], parentToolUseId: string | null = null): void { - const content = partsToString(parts); - const message: CLIUserMessage = { - type: 'user', - uuid: randomUUID(), - session_id: this.getSessionId(), - parent_tool_use_id: parentToolUseId, - message: { - role: 'user', - content, - }, - }; - this.emitMessage(message); - } - - emitToolResult( - request: ToolCallRequestInfo, - response: ToolCallResponseInfo, - ): void { - const block: ToolResultBlock = { - type: 'tool_result', - tool_use_id: request.callId, - is_error: Boolean(response.error), - }; - const content = toolResultContent(response); - if (content !== undefined) { - block.content = content; - } - - const message: CLIUserMessage = { - type: 'user', - uuid: randomUUID(), - session_id: this.getSessionId(), - parent_tool_use_id: request.callId, - message: { - role: 'user', - content: [block], - }, - }; - this.emitMessage(message); - } - - emitSystemMessage(subtype: string, data?: unknown): void { - const systemMessage = { - type: 'system', - subtype, - uuid: randomUUID(), - session_id: this.getSessionId(), - data, - } as const; - this.emitMessage(systemMessage); - } - - getSessionId(): string { - return this.config.getSessionId(); - } - - getModel(): string { - return this.config.getModel(); - } - - private extractResponseText(): string { - const assistantMessages = this.messages.filter( - (msg): msg is CLIAssistantMessage => - typeof msg === 'object' && - msg !== null && - 'type' in msg && - msg.type === 'assistant', - ); - - return assistantMessages - .map((msg) => extractTextFromBlocks(msg.message.content)) - .filter((text) => text.length > 0) - .join('\n'); - } - - /** - * Guarantees that a single assistant message aggregates only one - * content block category (text, thinking, or tool use). When a new - * block type is requested, the current message is finalized and a fresh - * assistant message is started to honour the single-type constraint. - */ - private ensureBlockTypeConsistency(targetType: ContentBlock['type']): void { - if (this.currentBlockType === targetType) { - return; - } - - if (this.currentBlockType === null) { - this.currentBlockType = targetType; - return; - } - - this.finalizeAssistantMessage(); - this.startAssistantMessage(); - this.currentBlockType = targetType; - } -} - -function partsToString(parts: Part[]): string { - return parts - .map((part) => { - if ('text' in part && typeof part.text === 'string') { - return part.text; - } - return JSON.stringify(part); - }) - .join(''); -} - -function toolResultContent(response: ToolCallResponseInfo): string | undefined { - if ( - typeof response.resultDisplay === 'string' && - response.resultDisplay.trim().length > 0 - ) { - return response.resultDisplay; - } - if (response.responseParts && response.responseParts.length > 0) { - return partsToString(response.responseParts); - } - if (response.error) { - return response.error.message; - } - return undefined; -} - -function extractTextFromBlocks(blocks: ContentBlock[]): string { - return blocks - .filter((block) => block.type === 'text') - .map((block) => (block.type === 'text' ? block.text : '')) - .join(''); -} - -function createExtendedUsage(): ExtendedUsage { - return { - input_tokens: 0, - output_tokens: 0, - }; } diff --git a/packages/cli/src/nonInteractive/io/StreamJsonOutputAdapter.test.ts b/packages/cli/src/nonInteractive/io/StreamJsonOutputAdapter.test.ts index e6ce8c47..2c85738d 100644 --- a/packages/cli/src/nonInteractive/io/StreamJsonOutputAdapter.test.ts +++ b/packages/cli/src/nonInteractive/io/StreamJsonOutputAdapter.test.ts @@ -498,7 +498,9 @@ describe('StreamJsonOutputAdapter', () => { }); const message = adapter.finalizeAssistantMessage(); - expect(adapter.lastAssistantMessage).toEqual(message); + // Access protected property for testing + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect((adapter as any).lastAssistantMessage).toEqual(message); }); it('should return same message on subsequent calls', () => { @@ -720,12 +722,13 @@ describe('StreamJsonOutputAdapter', () => { it('should handle parent_tool_use_id', () => { const parts: Part[] = [{ text: 'Tool response' }]; - adapter.emitUserMessage(parts, 'tool-id-1'); + adapter.emitUserMessage(parts); const output = stdoutWriteSpy.mock.calls[0][0] as string; const parsed = JSON.parse(output); - expect(parsed.parent_tool_use_id).toBe('tool-id-1'); + // emitUserMessage currently sets parent_tool_use_id to null + expect(parsed.parent_tool_use_id).toBeNull(); }); }); @@ -758,7 +761,7 @@ describe('StreamJsonOutputAdapter', () => { const parsed = JSON.parse(output); expect(parsed.type).toBe('user'); - expect(parsed.parent_tool_use_id).toBe('tool-1'); + expect(parsed.parent_tool_use_id).toBeNull(); const block = parsed.message.content[0]; expect(block).toMatchObject({ type: 'tool_result', diff --git a/packages/cli/src/nonInteractive/io/StreamJsonOutputAdapter.ts b/packages/cli/src/nonInteractive/io/StreamJsonOutputAdapter.ts index 4d912e0c..af2f0bb6 100644 --- a/packages/cli/src/nonInteractive/io/StreamJsonOutputAdapter.ts +++ b/packages/cli/src/nonInteractive/io/StreamJsonOutputAdapter.ts @@ -5,392 +5,44 @@ */ import { randomUUID } from 'node:crypto'; -import type { - Config, - ServerGeminiStreamEvent, - ToolCallRequestInfo, - ToolCallResponseInfo, -} from '@qwen-code/qwen-code-core'; -import { GeminiEventType } from '@qwen-code/qwen-code-core'; -import type { Part, GenerateContentResponseUsageMetadata } from '@google/genai'; +import type { Config } from '@qwen-code/qwen-code-core'; import type { CLIAssistantMessage, + CLIMessage, CLIPartialAssistantMessage, - CLIResultMessage, - CLIResultMessageError, - CLIResultMessageSuccess, - CLIUserMessage, - ContentBlock, - ExtendedUsage, + ControlMessage, StreamEvent, TextBlock, ThinkingBlock, - ToolResultBlock, ToolUseBlock, - Usage, } from '../types.js'; -import type { - JsonOutputAdapterInterface, - ResultOptions, -} from './JsonOutputAdapter.js'; +import { + BaseJsonOutputAdapter, + type MessageState, + type ResultOptions, + type JsonOutputAdapterInterface, +} from './BaseJsonOutputAdapter.js'; /** * Stream JSON output adapter that emits messages immediately * as they are completed during the streaming process. + * Supports both main agent and subagent messages through distinct APIs. */ -export class StreamJsonOutputAdapter implements JsonOutputAdapterInterface { - lastAssistantMessage: CLIAssistantMessage | null = null; - - // Assistant message building state - private messageId: string | null = null; - private blocks: ContentBlock[] = []; - private openBlocks = new Set(); - private usage: Usage = this.createUsage(); - private messageStarted = false; - private finalized = false; - private currentBlockType: ContentBlock['type'] | null = null; - +export class StreamJsonOutputAdapter + extends BaseJsonOutputAdapter + implements JsonOutputAdapterInterface +{ constructor( - private readonly config: Config, + config: Config, private readonly includePartialMessages: boolean, - ) {} - - private createUsage( - metadata?: GenerateContentResponseUsageMetadata | null, - ): Usage { - const usage: Usage = { - input_tokens: 0, - output_tokens: 0, - }; - - if (!metadata) { - return usage; - } - - if (typeof metadata.promptTokenCount === 'number') { - usage.input_tokens = metadata.promptTokenCount; - } - if (typeof metadata.candidatesTokenCount === 'number') { - usage.output_tokens = metadata.candidatesTokenCount; - } - if (typeof metadata.cachedContentTokenCount === 'number') { - usage.cache_read_input_tokens = metadata.cachedContentTokenCount; - } - if (typeof metadata.totalTokenCount === 'number') { - usage.total_tokens = metadata.totalTokenCount; - } - - return usage; + ) { + super(config); } - private buildMessage(): CLIAssistantMessage { - if (!this.messageId) { - throw new Error('Message not started'); - } - - // Enforce constraint: assistant message must contain only a single type of ContentBlock - if (this.blocks.length > 0) { - const blockTypes = new Set(this.blocks.map((block) => block.type)); - if (blockTypes.size > 1) { - throw new Error( - `Assistant message must contain only one type of ContentBlock, found: ${Array.from(blockTypes).join(', ')}`, - ); - } - } - - // Determine stop_reason based on content block types - // If the message contains only tool_use blocks, set stop_reason to 'tool_use' - const stopReason = - this.blocks.length > 0 && - this.blocks.every((block) => block.type === 'tool_use') - ? 'tool_use' - : null; - - return { - type: 'assistant', - uuid: this.messageId, - session_id: this.config.getSessionId(), - parent_tool_use_id: null, - message: { - id: this.messageId, - type: 'message', - role: 'assistant', - model: this.config.getModel(), - content: this.blocks, - stop_reason: stopReason, - usage: this.usage, - }, - }; - } - - private appendText(fragment: string): void { - if (fragment.length === 0) { - return; - } - - this.ensureBlockTypeConsistency('text'); - this.ensureMessageStarted(); - - let current = this.blocks[this.blocks.length - 1]; - if (!current || current.type !== 'text') { - current = { type: 'text', text: '' } satisfies TextBlock; - const index = this.blocks.length; - this.blocks.push(current); - this.openBlock(index, current); - } - - current.text += fragment; - const index = this.blocks.length - 1; - this.emitStreamEvent({ - type: 'content_block_delta', - index, - delta: { type: 'text_delta', text: fragment }, - }); - } - - private appendThinking(subject?: string, description?: string): void { - const fragment = [subject?.trim(), description?.trim()] - .filter((value) => value && value.length > 0) - .join(': '); - if (!fragment) { - return; - } - - this.ensureBlockTypeConsistency('thinking'); - this.ensureMessageStarted(); - - let current = this.blocks[this.blocks.length - 1]; - if (!current || current.type !== 'thinking') { - current = { - type: 'thinking', - thinking: '', - signature: subject, - } satisfies ThinkingBlock; - const index = this.blocks.length; - this.blocks.push(current); - this.openBlock(index, current); - } - - current.thinking = `${current.thinking ?? ''}${fragment}`; - const index = this.blocks.length - 1; - this.emitStreamEvent({ - type: 'content_block_delta', - index, - delta: { type: 'thinking_delta', thinking: fragment }, - }); - } - - private appendToolUse(request: ToolCallRequestInfo): void { - this.ensureBlockTypeConsistency('tool_use'); - this.ensureMessageStarted(); - this.finalizePendingBlocks(); - - const index = this.blocks.length; - const block: ToolUseBlock = { - type: 'tool_use', - id: request.callId, - name: request.name, - input: request.args, - }; - this.blocks.push(block); - this.openBlock(index, block); - this.emitStreamEvent({ - type: 'content_block_delta', - index, - delta: { - type: 'input_json_delta', - partial_json: JSON.stringify(request.args ?? {}), - }, - }); - this.closeBlock(index); - } - - private ensureMessageStarted(): void { - if (this.messageStarted) { - return; - } - this.messageStarted = true; - this.emitStreamEvent({ - type: 'message_start', - message: { - id: this.messageId!, - role: 'assistant', - model: this.config.getModel(), - }, - }); - } - - private finalizePendingBlocks(): void { - const lastBlock = this.blocks[this.blocks.length - 1]; - if (!lastBlock) { - return; - } - - if (lastBlock.type === 'text') { - const index = this.blocks.length - 1; - this.closeBlock(index); - } else if (lastBlock.type === 'thinking') { - const index = this.blocks.length - 1; - this.closeBlock(index); - } - } - - private openBlock(index: number, block: ContentBlock): void { - this.openBlocks.add(index); - this.emitStreamEvent({ - type: 'content_block_start', - index, - content_block: block, - }); - } - - private closeBlock(index: number): void { - if (!this.openBlocks.has(index)) { - return; - } - this.openBlocks.delete(index); - this.emitStreamEvent({ - type: 'content_block_stop', - index, - }); - } - - private emitStreamEvent(event: StreamEvent): void { - if (!this.includePartialMessages) { - return; - } - const enrichedEvent = this.messageStarted - ? ({ ...event, message_id: this.messageId } as StreamEvent & { - message_id: string; - }) - : event; - const partial: CLIPartialAssistantMessage = { - type: 'stream_event', - uuid: randomUUID(), - session_id: this.config.getSessionId(), - parent_tool_use_id: null, - event: enrichedEvent, - }; - this.emitMessage(partial); - } - - startAssistantMessage(): void { - // Reset state for new message - this.messageId = randomUUID(); - this.blocks = []; - this.openBlocks = new Set(); - this.usage = this.createUsage(); - this.messageStarted = false; - this.finalized = false; - this.currentBlockType = null; - } - - processEvent(event: ServerGeminiStreamEvent): void { - if (this.finalized) { - return; - } - - switch (event.type) { - case GeminiEventType.Content: - this.appendText(event.value); - break; - case GeminiEventType.Citation: - if (typeof event.value === 'string') { - this.appendText(`\n${event.value}`); - } - break; - case GeminiEventType.Thought: - this.appendThinking(event.value.subject, event.value.description); - break; - case GeminiEventType.ToolCallRequest: - this.appendToolUse(event.value); - break; - case GeminiEventType.Finished: - if (event.value?.usageMetadata) { - this.usage = this.createUsage(event.value.usageMetadata); - } - this.finalizePendingBlocks(); - break; - default: - break; - } - } - - finalizeAssistantMessage(): CLIAssistantMessage { - if (this.finalized) { - return this.buildMessage(); - } - this.finalized = true; - - this.finalizePendingBlocks(); - const orderedOpenBlocks = Array.from(this.openBlocks).sort((a, b) => a - b); - for (const index of orderedOpenBlocks) { - this.closeBlock(index); - } - - if (this.messageStarted && this.includePartialMessages) { - this.emitStreamEvent({ type: 'message_stop' }); - } - - const message = this.buildMessage(); - this.lastAssistantMessage = message; - this.emitMessage(message); - return message; - } - - emitResult(options: ResultOptions): void { - const baseUuid = randomUUID(); - const baseSessionId = this.getSessionId(); - const usage = options.usage ?? createExtendedUsage(); - const resultText = - options.summary ?? - (this.lastAssistantMessage - ? extractTextFromBlocks(this.lastAssistantMessage.message.content) - : ''); - - let message: CLIResultMessage; - if (options.isError) { - const errorMessage = options.errorMessage ?? 'Unknown error'; - const errorResult: CLIResultMessageError = { - type: 'result', - subtype: - (options.subtype as CLIResultMessageError['subtype']) ?? - 'error_during_execution', - uuid: baseUuid, - session_id: baseSessionId, - is_error: true, - duration_ms: options.durationMs, - duration_api_ms: options.apiDurationMs, - num_turns: options.numTurns, - total_cost_usd: options.totalCostUsd ?? 0, - usage, - permission_denials: [], - error: { message: errorMessage }, - }; - message = errorResult; - } else { - const success: CLIResultMessageSuccess = { - type: 'result', - subtype: - (options.subtype as CLIResultMessageSuccess['subtype']) ?? 'success', - uuid: baseUuid, - session_id: baseSessionId, - is_error: false, - duration_ms: options.durationMs, - duration_api_ms: options.apiDurationMs, - num_turns: options.numTurns, - result: resultText, - total_cost_usd: options.totalCostUsd ?? 0, - usage, - permission_denials: [], - }; - message = success; - } - - this.emitMessage(message); - } - - emitMessage(message: unknown): void { + /** + * Emits message immediately to stdout (stream mode). + */ + protected emitMessageImpl(message: CLIMessage | ControlMessage): void { // Track assistant messages for result generation if ( typeof message === 'object' && @@ -398,138 +50,251 @@ export class StreamJsonOutputAdapter implements JsonOutputAdapterInterface { 'type' in message && message.type === 'assistant' ) { - this.lastAssistantMessage = message as CLIAssistantMessage; + this.updateLastAssistantMessage(message as CLIAssistantMessage); } // Emit messages immediately in stream mode process.stdout.write(`${JSON.stringify(message)}\n`); } - emitUserMessage(parts: Part[], parentToolUseId: string | null = null): void { - const content = partsToString(parts); - const message: CLIUserMessage = { - type: 'user', - uuid: randomUUID(), - session_id: this.getSessionId(), - parent_tool_use_id: parentToolUseId, - message: { - role: 'user', - content, - }, - }; - this.emitMessage(message); + /** + * Stream mode emits stream events when includePartialMessages is enabled. + */ + protected shouldEmitStreamEvents(): boolean { + return this.includePartialMessages; } - emitToolResult( - request: ToolCallRequestInfo, - response: ToolCallResponseInfo, - ): void { - const block: ToolResultBlock = { - type: 'tool_result', - tool_use_id: request.callId, - is_error: Boolean(response.error), - }; - const content = toolResultContent(response); - if (content !== undefined) { - block.content = content; + finalizeAssistantMessage(): CLIAssistantMessage { + const state = this.mainAgentMessageState; + if (state.finalized) { + return this.buildMessage(null); + } + state.finalized = true; + + this.finalizePendingBlocks(state, null); + const orderedOpenBlocks = Array.from(state.openBlocks).sort( + (a, b) => a - b, + ); + for (const index of orderedOpenBlocks) { + this.onBlockClosed(state, index, null); + this.closeBlock(state, index); } - const message: CLIUserMessage = { - type: 'user', - uuid: randomUUID(), - session_id: this.getSessionId(), - parent_tool_use_id: request.callId, - message: { - role: 'user', - content: [block], - }, - }; - this.emitMessage(message); + if (state.messageStarted && this.includePartialMessages) { + this.emitStreamEventIfEnabled({ type: 'message_stop' }, null); + } + + const message = this.buildMessage(null); + this.updateLastAssistantMessage(message); + this.emitMessageImpl(message); + return message; } - emitSystemMessage(subtype: string, data?: unknown): void { - const systemMessage = { - type: 'system', - subtype, - uuid: randomUUID(), - session_id: this.getSessionId(), - data, - } as const; - this.emitMessage(systemMessage); + emitResult(options: ResultOptions): void { + const resultMessage = this.buildResultMessage( + options, + this.lastAssistantMessage, + ); + this.emitMessageImpl(resultMessage); } - getSessionId(): string { - return this.config.getSessionId(); + emitMessage(message: CLIMessage | ControlMessage): void { + // In stream mode, emit immediately + this.emitMessageImpl(message); } - getModel(): string { - return this.config.getModel(); - } - - // Legacy methods for backward compatibility - send(message: unknown): void { + send(message: CLIMessage | ControlMessage): void { this.emitMessage(message); } /** - * Keeps the assistant message scoped to a single content block type. - * If the requested block type differs from the current message type, - * the existing message is finalized and a fresh assistant message is started - * so that every emitted assistant message contains exactly one block category. + * Overrides base class hook to emit stream event when text block is created. */ - private ensureBlockTypeConsistency(targetType: ContentBlock['type']): void { - if (this.currentBlockType === targetType) { + protected override onTextBlockCreated( + state: MessageState, + index: number, + block: TextBlock, + parentToolUseId: string | null, + ): void { + this.emitStreamEventIfEnabled( + { + type: 'content_block_start', + index, + content_block: block, + }, + parentToolUseId, + ); + } + + /** + * Overrides base class hook to emit stream event when text is appended. + */ + protected override onTextAppended( + state: MessageState, + index: number, + fragment: string, + parentToolUseId: string | null, + ): void { + this.emitStreamEventIfEnabled( + { + type: 'content_block_delta', + index, + delta: { type: 'text_delta', text: fragment }, + }, + parentToolUseId, + ); + } + + /** + * Overrides base class hook to emit stream event when thinking block is created. + */ + protected override onThinkingBlockCreated( + state: MessageState, + index: number, + block: ThinkingBlock, + parentToolUseId: string | null, + ): void { + this.emitStreamEventIfEnabled( + { + type: 'content_block_start', + index, + content_block: block, + }, + parentToolUseId, + ); + } + + /** + * Overrides base class hook to emit stream event when thinking is appended. + */ + protected override onThinkingAppended( + state: MessageState, + index: number, + fragment: string, + parentToolUseId: string | null, + ): void { + this.emitStreamEventIfEnabled( + { + type: 'content_block_delta', + index, + delta: { type: 'thinking_delta', thinking: fragment }, + }, + parentToolUseId, + ); + } + + /** + * Overrides base class hook to emit stream event when tool_use block is created. + */ + protected override onToolUseBlockCreated( + state: MessageState, + index: number, + block: ToolUseBlock, + parentToolUseId: string | null, + ): void { + this.emitStreamEventIfEnabled( + { + type: 'content_block_start', + index, + content_block: block, + }, + parentToolUseId, + ); + } + + /** + * Overrides base class hook to emit stream event when tool_use input is set. + */ + protected override onToolUseInputSet( + state: MessageState, + index: number, + input: unknown, + parentToolUseId: string | null, + ): void { + this.emitStreamEventIfEnabled( + { + type: 'content_block_delta', + index, + delta: { + type: 'input_json_delta', + partial_json: JSON.stringify(input), + }, + }, + parentToolUseId, + ); + } + + /** + * Overrides base class hook to emit stream event when block is closed. + */ + protected override onBlockClosed( + state: MessageState, + index: number, + parentToolUseId: string | null, + ): void { + if (this.includePartialMessages) { + this.emitStreamEventIfEnabled( + { + type: 'content_block_stop', + index, + }, + parentToolUseId, + ); + } + } + + /** + * Overrides base class hook to emit message_start event when message is started. + * Only emits for main agent, not for subagents. + */ + protected override onEnsureMessageStarted( + state: MessageState, + parentToolUseId: string | null, + ): void { + // Only emit message_start for main agent, not for subagents + if (parentToolUseId === null) { + this.emitStreamEventIfEnabled( + { + type: 'message_start', + message: { + id: state.messageId!, + role: 'assistant', + model: this.config.getModel(), + }, + }, + null, + ); + } + } + + /** + * Emits stream events when partial messages are enabled. + * This is a private method specific to StreamJsonOutputAdapter. + * @param event - Stream event to emit + * @param parentToolUseId - null for main agent, string for subagent + */ + private emitStreamEventIfEnabled( + event: StreamEvent, + parentToolUseId: string | null, + ): void { + if (!this.includePartialMessages) { return; } - if (this.currentBlockType === null) { - this.currentBlockType = targetType; - return; - } + const state = this.getMessageState(parentToolUseId); + const enrichedEvent = state.messageStarted + ? ({ ...event, message_id: state.messageId } as StreamEvent & { + message_id: string; + }) + : event; - this.finalizeAssistantMessage(); - this.startAssistantMessage(); - this.currentBlockType = targetType; + const partial: CLIPartialAssistantMessage = { + type: 'stream_event', + uuid: randomUUID(), + session_id: this.getSessionId(), + parent_tool_use_id: parentToolUseId, + event: enrichedEvent, + }; + this.emitMessageImpl(partial); } } - -function partsToString(parts: Part[]): string { - return parts - .map((part) => { - if ('text' in part && typeof part.text === 'string') { - return part.text; - } - return JSON.stringify(part); - }) - .join(''); -} - -function toolResultContent(response: ToolCallResponseInfo): string | undefined { - if ( - typeof response.resultDisplay === 'string' && - response.resultDisplay.trim().length > 0 - ) { - return response.resultDisplay; - } - if (response.responseParts && response.responseParts.length > 0) { - return partsToString(response.responseParts); - } - if (response.error) { - return response.error.message; - } - return undefined; -} - -function extractTextFromBlocks(blocks: ContentBlock[]): string { - return blocks - .filter((block) => block.type === 'text') - .map((block) => (block.type === 'text' ? block.text : '')) - .join(''); -} - -function createExtendedUsage(): ExtendedUsage { - return { - input_tokens: 0, - output_tokens: 0, - }; -} diff --git a/packages/cli/src/nonInteractiveCli.test.ts b/packages/cli/src/nonInteractiveCli.test.ts index 0303e6ef..b5079b9f 100644 --- a/packages/cli/src/nonInteractiveCli.test.ts +++ b/packages/cli/src/nonInteractiveCli.test.ts @@ -251,6 +251,7 @@ describe('runNonInteractive', () => { mockConfig, expect.objectContaining({ name: 'testTool' }), expect.any(AbortSignal), + undefined, ); expect(mockGeminiClient.sendMessageStream).toHaveBeenNthCalledWith( 2, @@ -302,6 +303,9 @@ describe('runNonInteractive', () => { .mockReturnValueOnce(createStreamFromEvents([toolCallEvent])) .mockReturnValueOnce(createStreamFromEvents(finalResponse)); + // Enable debug mode so handleToolError logs to console.error + (mockConfig.getDebugMode as Mock).mockReturnValue(true); + await runNonInteractive( mockConfig, mockSettings, @@ -379,6 +383,9 @@ describe('runNonInteractive', () => { .mockReturnValueOnce(createStreamFromEvents([toolCallEvent])) .mockReturnValueOnce(createStreamFromEvents(finalResponse)); + // Enable debug mode so handleToolError logs to console.error + (mockConfig.getDebugMode as Mock).mockReturnValue(true); + await runNonInteractive( mockConfig, mockSettings, @@ -608,6 +615,7 @@ describe('runNonInteractive', () => { mockConfig, expect.objectContaining({ name: 'testTool' }), expect.any(AbortSignal), + undefined, ); // JSON adapter emits array of messages, last one is result with stats @@ -1211,7 +1219,14 @@ describe('runNonInteractive', () => { prompt_id: 'prompt-id-tool', }, }; - const toolResponse: Part[] = [{ text: 'Tool executed successfully' }]; + const toolResponse: Part[] = [ + { + functionResponse: { + name: 'testTool', + response: { output: 'Tool executed successfully' }, + }, + }, + ]; mockCoreExecuteToolCall.mockResolvedValue({ responseParts: toolResponse }); const firstCallEvents: ServerGeminiStreamEvent[] = [toolCallEvent]; @@ -1279,7 +1294,7 @@ describe('runNonInteractive', () => { expect(toolResultBlock?.content).toBe('Tool executed successfully'); }); - it('should emit system messages for tool errors in stream-json format', async () => { + it('should emit tool errors in tool_result blocks in stream-json format', async () => { (mockConfig.getOutputFormat as Mock).mockReturnValue('stream-json'); (mockConfig.getIncludePartialMessages as Mock).mockReturnValue(false); @@ -1346,14 +1361,30 @@ describe('runNonInteractive', () => { .filter((line) => line.trim().length > 0) .map((line) => JSON.parse(line)); - // Should have system message for tool error - const systemMessages = envelopes.filter((env) => env.type === 'system'); - const toolErrorSystemMessage = systemMessages.find( - (msg) => msg.subtype === 'tool_error', + // Tool errors are now captured in tool_result blocks with is_error=true, + // not as separate system messages (see comment in nonInteractiveCli.ts line 307-309) + const toolResultMessages = envelopes.filter( + (env) => + env.type === 'user' && + Array.isArray(env.message?.content) && + env.message.content.some( + (block: unknown) => + typeof block === 'object' && + block !== null && + 'type' in block && + block.type === 'tool_result', + ), ); - expect(toolErrorSystemMessage).toBeTruthy(); - expect(toolErrorSystemMessage?.data?.tool).toBe('errorTool'); - expect(toolErrorSystemMessage?.data?.message).toBe('Tool execution failed'); + expect(toolResultMessages.length).toBeGreaterThan(0); + const toolResultBlock = toolResultMessages[0]?.message?.content?.find( + (block: unknown) => + typeof block === 'object' && + block !== null && + 'type' in block && + block.type === 'tool_result', + ); + expect(toolResultBlock?.tool_use_id).toBe('tool-error'); + expect(toolResultBlock?.is_error).toBe(true); }); it('should emit partial messages when includePartialMessages is true', async () => { diff --git a/packages/cli/src/nonInteractiveCli.ts b/packages/cli/src/nonInteractiveCli.ts index 5598161e..e099b481 100644 --- a/packages/cli/src/nonInteractiveCli.ts +++ b/packages/cli/src/nonInteractiveCli.ts @@ -19,7 +19,7 @@ import { } from '@qwen-code/qwen-code-core'; import type { Content, Part, PartListUnion } from '@google/genai'; import type { CLIUserMessage, PermissionMode } from './nonInteractive/types.js'; -import type { JsonOutputAdapterInterface } from './nonInteractive/io/JsonOutputAdapter.js'; +import type { JsonOutputAdapterInterface } from './nonInteractive/io/BaseJsonOutputAdapter.js'; import { JsonOutputAdapter } from './nonInteractive/io/JsonOutputAdapter.js'; import { StreamJsonOutputAdapter } from './nonInteractive/io/StreamJsonOutputAdapter.js'; import type { ControlService } from './nonInteractive/control/ControlService.js'; @@ -39,6 +39,7 @@ import { extractUsageFromGeminiClient, calculateApproximateCost, buildSystemMessage, + createTaskToolProgressHandler, } from './utils/nonInteractiveHelpers.js'; /** @@ -217,6 +218,7 @@ export async function runNonInteractive( if (toolCallRequests.length > 0) { const toolResponseParts: Part[] = []; + for (const requestInfo of toolCallRequests) { const finalRequestInfo = requestInfo; @@ -254,17 +256,36 @@ export async function runNonInteractive( ? options.controlService.permission.getToolCallUpdateCallback() : undefined; */ + + // Only pass outputUpdateHandler for Task tool + const isTaskTool = finalRequestInfo.name === 'task'; + const taskToolProgress = isTaskTool + ? createTaskToolProgressHandler( + config, + finalRequestInfo.callId, + adapter, + ) + : undefined; + const taskToolProgressHandler = taskToolProgress?.handler; const toolResponse = await executeToolCall( config, finalRequestInfo, abortController.signal, - /* - toolCallUpdateCallback - ? { onToolCallsUpdate: toolCallUpdateCallback } + isTaskTool && taskToolProgressHandler + ? { + outputUpdateHandler: taskToolProgressHandler, + /* + toolCallUpdateCallback + ? { onToolCallsUpdate: toolCallUpdateCallback } + : undefined, + */ + } : undefined, - */ ); + // Note: In JSON mode, subagent messages are automatically added to the main + // adapter's messages array and will be output together on emitResult() + if (toolResponse.error) { // In JSON/STREAM_JSON mode, tool errors are tolerated and formatted // as tool_result blocks. handleToolError will detect JSON/STREAM_JSON mode diff --git a/packages/cli/src/utils/errors.test.ts b/packages/cli/src/utils/errors.test.ts index 0a1c7191..818c3ac3 100644 --- a/packages/cli/src/utils/errors.test.ts +++ b/packages/cli/src/utils/errors.test.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { vi, type MockInstance } from 'vitest'; +import { vi, type Mock, type MockInstance } from 'vitest'; import type { Config } from '@qwen-code/qwen-code-core'; import { OutputFormat, FatalInputError } from '@qwen-code/qwen-code-core'; import { @@ -83,6 +83,7 @@ describe('errors', () => { mockConfig = { getOutputFormat: vi.fn().mockReturnValue(OutputFormat.TEXT), getContentGeneratorConfig: vi.fn().mockReturnValue({ authType: 'test' }), + getDebugMode: vi.fn().mockReturnValue(true), } as unknown as Config; }); @@ -254,110 +255,180 @@ describe('errors', () => { const toolName = 'test-tool'; const toolError = new Error('Tool failed'); - describe('in text mode', () => { + describe('when debug mode is enabled', () => { beforeEach(() => { - ( - mockConfig.getOutputFormat as ReturnType - ).mockReturnValue(OutputFormat.TEXT); + (mockConfig.getDebugMode as Mock).mockReturnValue(true); }); - it('should log error message to stderr', () => { - handleToolError(toolName, toolError, mockConfig); + describe('in text mode', () => { + beforeEach(() => { + ( + mockConfig.getOutputFormat as ReturnType + ).mockReturnValue(OutputFormat.TEXT); + }); - expect(consoleErrorSpy).toHaveBeenCalledWith( - 'Error executing tool test-tool: Tool failed', - ); + it('should log error message to stderr and not exit', () => { + handleToolError(toolName, toolError, mockConfig); + + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Error executing tool test-tool: Tool failed', + ); + expect(processExitSpy).not.toHaveBeenCalled(); + }); + + it('should use resultDisplay when provided and not exit', () => { + handleToolError( + toolName, + toolError, + mockConfig, + 'CUSTOM_ERROR', + 'Custom display message', + ); + + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Error executing tool test-tool: Custom display message', + ); + expect(processExitSpy).not.toHaveBeenCalled(); + }); }); - it('should use resultDisplay when provided', () => { - handleToolError( - toolName, - toolError, - mockConfig, - 'CUSTOM_ERROR', - 'Custom display message', - ); + describe('in JSON mode', () => { + beforeEach(() => { + ( + mockConfig.getOutputFormat as ReturnType + ).mockReturnValue(OutputFormat.JSON); + }); - expect(consoleErrorSpy).toHaveBeenCalledWith( - 'Error executing tool test-tool: Custom display message', - ); + it('should log error message to stderr and not exit', () => { + handleToolError(toolName, toolError, mockConfig); + + // In JSON mode, should not exit (just log to stderr when debug mode is on) + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Error executing tool test-tool: Tool failed', + ); + expect(processExitSpy).not.toHaveBeenCalled(); + }); + + it('should log error with custom error code and not exit', () => { + handleToolError(toolName, toolError, mockConfig, 'CUSTOM_TOOL_ERROR'); + + // In JSON mode, should not exit (just log to stderr when debug mode is on) + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Error executing tool test-tool: Tool failed', + ); + expect(processExitSpy).not.toHaveBeenCalled(); + }); + + it('should log error with numeric error code and not exit', () => { + handleToolError(toolName, toolError, mockConfig, 500); + + // In JSON mode, should not exit (just log to stderr when debug mode is on) + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Error executing tool test-tool: Tool failed', + ); + expect(processExitSpy).not.toHaveBeenCalled(); + }); + + it('should prefer resultDisplay over error message and not exit', () => { + handleToolError( + toolName, + toolError, + mockConfig, + 'DISPLAY_ERROR', + 'Display message', + ); + + // In JSON mode, should not exit (just log to stderr when debug mode is on) + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Error executing tool test-tool: Display message', + ); + expect(processExitSpy).not.toHaveBeenCalled(); + }); + }); + + describe('in STREAM_JSON mode', () => { + beforeEach(() => { + ( + mockConfig.getOutputFormat as ReturnType + ).mockReturnValue(OutputFormat.STREAM_JSON); + }); + + it('should log error message to stderr and not exit', () => { + handleToolError(toolName, toolError, mockConfig); + + // Should not exit in STREAM_JSON mode (just log to stderr when debug mode is on) + expect(consoleErrorSpy).toHaveBeenCalledWith( + 'Error executing tool test-tool: Tool failed', + ); + expect(processExitSpy).not.toHaveBeenCalled(); + }); }); }); - describe('in JSON mode', () => { + describe('when debug mode is disabled', () => { beforeEach(() => { + (mockConfig.getDebugMode as Mock).mockReturnValue(false); + }); + + it('should not log and not exit in text mode', () => { + ( + mockConfig.getOutputFormat as ReturnType + ).mockReturnValue(OutputFormat.TEXT); + + handleToolError(toolName, toolError, mockConfig); + + expect(consoleErrorSpy).not.toHaveBeenCalled(); + expect(processExitSpy).not.toHaveBeenCalled(); + }); + + it('should not log and not exit in JSON mode', () => { ( mockConfig.getOutputFormat as ReturnType ).mockReturnValue(OutputFormat.JSON); - }); - it('should log error message to stderr and not exit', () => { handleToolError(toolName, toolError, mockConfig); - // In JSON mode, should not exit (just log to stderr) - expect(consoleErrorSpy).toHaveBeenCalledWith( - 'Error executing tool test-tool: Tool failed', - ); + expect(consoleErrorSpy).not.toHaveBeenCalled(); expect(processExitSpy).not.toHaveBeenCalled(); }); - it('should log error with custom error code and not exit', () => { - handleToolError(toolName, toolError, mockConfig, 'CUSTOM_TOOL_ERROR'); - - // In JSON mode, should not exit (just log to stderr) - expect(consoleErrorSpy).toHaveBeenCalledWith( - 'Error executing tool test-tool: Tool failed', - ); - expect(processExitSpy).not.toHaveBeenCalled(); - }); - - it('should log error with numeric error code and not exit', () => { - handleToolError(toolName, toolError, mockConfig, 500); - - // In JSON mode, should not exit (just log to stderr) - expect(consoleErrorSpy).toHaveBeenCalledWith( - 'Error executing tool test-tool: Tool failed', - ); - expect(processExitSpy).not.toHaveBeenCalled(); - }); - - it('should prefer resultDisplay over error message', () => { - handleToolError( - toolName, - toolError, - mockConfig, - 'DISPLAY_ERROR', - 'Display message', - ); - - // In JSON mode, should not exit (just log to stderr) - expect(consoleErrorSpy).toHaveBeenCalledWith( - 'Error executing tool test-tool: Display message', - ); - expect(processExitSpy).not.toHaveBeenCalled(); - }); - - it('should not exit in JSON mode', () => { - handleToolError(toolName, toolError, mockConfig); - - // Should not throw (no exit) - expect(consoleErrorSpy).toHaveBeenCalledWith( - 'Error executing tool test-tool: Tool failed', - ); - expect(processExitSpy).not.toHaveBeenCalled(); - }); - - it('should not exit in STREAM_JSON mode', () => { + it('should not log and not exit in STREAM_JSON mode', () => { ( mockConfig.getOutputFormat as ReturnType ).mockReturnValue(OutputFormat.STREAM_JSON); handleToolError(toolName, toolError, mockConfig); - // Should not exit in STREAM_JSON mode - expect(consoleErrorSpy).toHaveBeenCalledWith( - 'Error executing tool test-tool: Tool failed', - ); + expect(consoleErrorSpy).not.toHaveBeenCalled(); + expect(processExitSpy).not.toHaveBeenCalled(); + }); + }); + + describe('process exit behavior', () => { + beforeEach(() => { + (mockConfig.getDebugMode as Mock).mockReturnValue(true); + }); + + it('should never exit regardless of output format', () => { + // Test in TEXT mode + ( + mockConfig.getOutputFormat as ReturnType + ).mockReturnValue(OutputFormat.TEXT); + handleToolError(toolName, toolError, mockConfig); + expect(processExitSpy).not.toHaveBeenCalled(); + + // Test in JSON mode + ( + mockConfig.getOutputFormat as ReturnType + ).mockReturnValue(OutputFormat.JSON); + handleToolError(toolName, toolError, mockConfig); + expect(processExitSpy).not.toHaveBeenCalled(); + + // Test in STREAM_JSON mode + ( + mockConfig.getOutputFormat as ReturnType + ).mockReturnValue(OutputFormat.STREAM_JSON); + handleToolError(toolName, toolError, mockConfig); expect(processExitSpy).not.toHaveBeenCalled(); }); }); diff --git a/packages/cli/src/utils/nonInteractiveHelpers.ts b/packages/cli/src/utils/nonInteractiveHelpers.ts index 2ffa6108..9fb1a5d3 100644 --- a/packages/cli/src/utils/nonInteractiveHelpers.ts +++ b/packages/cli/src/utils/nonInteractiveHelpers.ts @@ -4,7 +4,15 @@ * SPDX-License-Identifier: Apache-2.0 */ -import type { Config } from '@qwen-code/qwen-code-core'; +import type { + Config, + ToolResultDisplay, + TaskResultDisplay, + OutputUpdateHandler, + ToolCallRequestInfo, + ToolCallResponseInfo, +} from '@qwen-code/qwen-code-core'; +import { ToolErrorType } from '@qwen-code/qwen-code-core'; import type { Part, PartListUnion } from '@google/genai'; import type { CLIUserMessage, @@ -15,6 +23,7 @@ import type { } from '../nonInteractive/types.js'; import { CommandService } from '../services/CommandService.js'; import { BuiltinCommandLoader } from '../services/BuiltinCommandLoader.js'; +import type { JsonOutputAdapterInterface } from '../nonInteractive/io/BaseJsonOutputAdapter.js'; /** * Normalizes various part list formats into a consistent Part[] array. @@ -244,3 +253,324 @@ export async function buildSystemMessage( return systemMessage; } + +/** + * Creates an output update handler specifically for Task tool subagent execution. + * This handler monitors TaskResultDisplay updates and converts them to protocol messages + * using the unified adapter's subagent APIs. All emitted messages will have parent_tool_use_id set to + * the task tool's callId. + * + * @param config - Config instance for getting output format + * @param taskToolCallId - The task tool's callId to use as parent_tool_use_id for all subagent messages + * @param adapter - The unified adapter instance (JsonOutputAdapter or StreamJsonOutputAdapter) + * @returns An object containing the output update handler + */ +export function createTaskToolProgressHandler( + config: Config, + taskToolCallId: string, + adapter: JsonOutputAdapterInterface | undefined, +): { + handler: OutputUpdateHandler; +} { + // Track previous TaskResultDisplay states per tool call to detect changes + const previousTaskStates = new Map(); + // Track which tool call IDs have already emitted tool_use to prevent duplicates + const emittedToolUseIds = new Set(); + // Track which tool call IDs have already emitted tool_result to prevent duplicates + const emittedToolResultIds = new Set(); + + /** + * Builds a ToolCallRequestInfo object from a tool call. + * + * @param toolCall - The tool call information + * @returns ToolCallRequestInfo object + */ + const buildRequest = ( + toolCall: NonNullable[number], + ): ToolCallRequestInfo => ({ + callId: toolCall.callId, + name: toolCall.name, + args: toolCall.args || {}, + isClientInitiated: true, + prompt_id: '', + response_id: undefined, + }); + + /** + * Builds a ToolCallResponseInfo object from a tool call. + * + * @param toolCall - The tool call information + * @returns ToolCallResponseInfo object + */ + const buildResponse = ( + toolCall: NonNullable[number], + ): ToolCallResponseInfo => ({ + callId: toolCall.callId, + error: + toolCall.status === 'failed' + ? new Error(toolCall.error || 'Tool execution failed') + : undefined, + errorType: + toolCall.status === 'failed' ? ToolErrorType.EXECUTION_FAILED : undefined, + resultDisplay: toolCall.resultDisplay, + responseParts: toolCall.responseParts || [], + }); + + /** + * Checks if a tool call has result content that should be emitted. + * + * @param toolCall - The tool call information + * @returns True if the tool call has result content to emit + */ + const hasResultContent = ( + toolCall: NonNullable[number], + ): boolean => { + // Check resultDisplay string + if ( + typeof toolCall.resultDisplay === 'string' && + toolCall.resultDisplay.trim().length > 0 + ) { + return true; + } + + // Check responseParts - only check existence, don't parse for performance + if (toolCall.responseParts && toolCall.responseParts.length > 0) { + return true; + } + + // Failed status should always emit result + return toolCall.status === 'failed'; + }; + + /** + * Emits tool_use for a tool call if it hasn't been emitted yet. + * + * @param toolCall - The tool call information + * @param fallbackStatus - Optional fallback status if toolCall.status should be overridden + */ + const emitToolUseIfNeeded = ( + toolCall: NonNullable[number], + fallbackStatus?: 'executing' | 'awaiting_approval', + ): void => { + if (emittedToolUseIds.has(toolCall.callId)) { + return; + } + + const toolCallToEmit: NonNullable[number] = + fallbackStatus + ? { + ...toolCall, + status: fallbackStatus, + } + : toolCall; + + if ( + toolCallToEmit.status === 'executing' || + toolCallToEmit.status === 'awaiting_approval' + ) { + if (adapter?.processSubagentToolCall) { + adapter.processSubagentToolCall(toolCallToEmit, taskToolCallId); + emittedToolUseIds.add(toolCall.callId); + } + } + }; + + /** + * Emits tool_result for a tool call if it hasn't been emitted yet and has content. + * + * @param toolCall - The tool call information + */ + const emitToolResultIfNeeded = ( + toolCall: NonNullable[number], + ): void => { + if (emittedToolResultIds.has(toolCall.callId)) { + return; + } + + if (!hasResultContent(toolCall)) { + return; + } + + // Mark as emitted even if we skip, to prevent duplicate emits + emittedToolResultIds.add(toolCall.callId); + + if (adapter) { + const request = buildRequest(toolCall); + const response = buildResponse(toolCall); + // For subagent tool results, we need to pass parentToolUseId + // The adapter implementations accept an optional parentToolUseId parameter + if ( + 'emitToolResult' in adapter && + typeof adapter.emitToolResult === 'function' + ) { + adapter.emitToolResult(request, response, taskToolCallId); + } else { + adapter.emitToolResult(request, response); + } + } + }; + + /** + * Processes a tool call, ensuring tool_use and tool_result are emitted exactly once. + * + * @param toolCall - The tool call information + * @param previousCall - The previous state of the tool call (if any) + */ + const processToolCall = ( + toolCall: NonNullable[number], + previousCall?: NonNullable[number], + ): void => { + const isCompleted = + toolCall.status === 'success' || toolCall.status === 'failed'; + const isExecuting = + toolCall.status === 'executing' || + toolCall.status === 'awaiting_approval'; + const wasExecuting = + previousCall && + (previousCall.status === 'executing' || + previousCall.status === 'awaiting_approval'); + + // Emit tool_use if needed + if (isExecuting) { + // Normal case: tool call is executing or awaiting approval + emitToolUseIfNeeded(toolCall); + } else if (isCompleted && !emittedToolUseIds.has(toolCall.callId)) { + // Edge case: tool call appeared with result already (shouldn't happen normally, + // but handle it gracefully by emitting tool_use with 'executing' status first) + emitToolUseIfNeeded(toolCall, 'executing'); + } else if (wasExecuting && isCompleted) { + // Status changed from executing to completed - ensure tool_use was emitted + emitToolUseIfNeeded(toolCall, 'executing'); + } + + // Emit tool_result if tool call is completed + if (isCompleted) { + emitToolResultIfNeeded(toolCall); + } + }; + + const outputUpdateHandler = ( + callId: string, + outputChunk: ToolResultDisplay, + ) => { + // Only process TaskResultDisplay (Task tool updates) + if ( + typeof outputChunk === 'object' && + outputChunk !== null && + 'type' in outputChunk && + outputChunk.type === 'task_execution' + ) { + const taskDisplay = outputChunk as TaskResultDisplay; + const previous = previousTaskStates.get(callId); + + // If no adapter, just track state (for non-JSON modes) + if (!adapter) { + previousTaskStates.set(callId, taskDisplay); + return; + } + + // Only process if adapter supports subagent APIs + if ( + !adapter.processSubagentToolCall || + !adapter.emitSubagentErrorResult + ) { + previousTaskStates.set(callId, taskDisplay); + return; + } + + if (taskDisplay.toolCalls) { + if (!previous || !previous.toolCalls) { + // First time seeing tool calls - process all initial ones + for (const toolCall of taskDisplay.toolCalls) { + processToolCall(toolCall); + } + } else { + // Compare with previous state to find new/changed tool calls + for (const toolCall of taskDisplay.toolCalls) { + const previousCall = previous.toolCalls.find( + (tc) => tc.callId === toolCall.callId, + ); + processToolCall(toolCall, previousCall); + } + } + } + + // Handle task-level errors (status: 'failed', 'cancelled') + if ( + taskDisplay.status === 'failed' || + taskDisplay.status === 'cancelled' + ) { + const previousStatus = previous?.status; + // Only emit error result if status changed to failed/cancelled + if ( + previousStatus !== 'failed' && + previousStatus !== 'cancelled' && + previousStatus !== undefined + ) { + const errorMessage = + taskDisplay.terminateReason || + (taskDisplay.status === 'cancelled' + ? 'Task was cancelled' + : 'Task execution failed'); + // Use subagent adapter's emitSubagentErrorResult method + adapter.emitSubagentErrorResult(errorMessage, 0, taskToolCallId); + } + } + + // Update previous state + previousTaskStates.set(callId, taskDisplay); + } + }; + + return { + handler: outputUpdateHandler, + }; +} + +/** + * Converts function response parts to a string representation. + * Handles functionResponse parts specially by extracting their output content. + * + * @param parts - Array of Part objects to convert + * @returns String representation of the parts + */ +export function functionResponsePartsToString(parts: Part[]): string { + return parts + .map((part) => { + if ('functionResponse' in part) { + const content = part.functionResponse?.response?.['output'] ?? ''; + return content; + } + return JSON.stringify(part); + }) + .join(''); +} + +/** + * Extracts content from a tool call response for inclusion in tool_result blocks. + * Uses functionResponsePartsToString to properly handle functionResponse parts, + * which correctly extracts output content from functionResponse objects rather + * than simply concatenating text or JSON.stringify. + * + * @param response - Tool call response information + * @returns String content for the tool_result block, or undefined if no content available + */ +export function toolResultContent( + response: ToolCallResponseInfo, +): string | undefined { + if ( + typeof response.resultDisplay === 'string' && + response.resultDisplay.trim().length > 0 + ) { + return response.resultDisplay; + } + if (response.responseParts && response.responseParts.length > 0) { + // Always use functionResponsePartsToString to properly handle + // functionResponse parts that contain output content + return functionResponsePartsToString(response.responseParts); + } + if (response.error) { + return response.error.message; + } + return undefined; +} diff --git a/packages/core/src/subagents/subagent-events.ts b/packages/core/src/subagents/subagent-events.ts index 19ec0971..cd24998a 100644 --- a/packages/core/src/subagents/subagent-events.ts +++ b/packages/core/src/subagents/subagent-events.ts @@ -9,6 +9,7 @@ import type { ToolCallConfirmationDetails, ToolConfirmationOutcome, } from '../tools/tools.js'; +import type { Part } from '@google/genai'; export type SubAgentEvent = | 'start' @@ -72,7 +73,7 @@ export interface SubAgentToolResultEvent { name: string; success: boolean; error?: string; - resultDisplay?: string; + responseParts?: Part[]; durationMs?: number; timestamp: number; } diff --git a/packages/core/src/subagents/subagent.ts b/packages/core/src/subagents/subagent.ts index af4be47f..e68f77a5 100644 --- a/packages/core/src/subagents/subagent.ts +++ b/packages/core/src/subagents/subagent.ts @@ -619,11 +619,7 @@ export class SubAgentScope { name: toolName, success, error: errorMessage, - resultDisplay: call.response.resultDisplay - ? typeof call.response.resultDisplay === 'string' - ? call.response.resultDisplay - : JSON.stringify(call.response.resultDisplay) - : undefined, + responseParts: call.response.responseParts, durationMs: duration, timestamp: Date.now(), } as SubAgentToolResultEvent); diff --git a/packages/core/src/tools/task.ts b/packages/core/src/tools/task.ts index 06761421..25340084 100644 --- a/packages/core/src/tools/task.ts +++ b/packages/core/src/tools/task.ts @@ -332,7 +332,7 @@ class TaskToolInvocation extends BaseToolInvocation { ...this.currentToolCalls![toolCallIndex], status: event.success ? 'success' : 'failed', error: event.error, - resultDisplay: event.resultDisplay, + responseParts: event.responseParts, }; this.updateDisplay( diff --git a/packages/core/src/tools/tools.ts b/packages/core/src/tools/tools.ts index 386b0c3a..848b14c6 100644 --- a/packages/core/src/tools/tools.ts +++ b/packages/core/src/tools/tools.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import type { FunctionDeclaration, PartListUnion } from '@google/genai'; +import type { FunctionDeclaration, Part, PartListUnion } from '@google/genai'; import { ToolErrorType } from './tool-error.js'; import type { DiffUpdateResult } from '../ide/ide-client.js'; import type { ShellExecutionConfig } from '../services/shellExecutionService.js'; @@ -461,6 +461,7 @@ export interface TaskResultDisplay { args?: Record; result?: string; resultDisplay?: string; + responseParts?: Part[]; description?: string; }>; }