fix: subagent tool call messages

Author: mingholy.lmh
Date: 2025-11-04 20:29:38 +08:00
parent 14ad26f27e
commit a962e10406
14 changed files with 2011 additions and 1079 deletions

File diff suppressed because it is too large.


@@ -564,7 +564,7 @@ describe('JsonOutputAdapter', () => {
it('should handle parent_tool_use_id', () => {
const parts: Part[] = [{ text: 'Tool response' }];
adapter.emitUserMessage(parts, 'tool-id-1');
adapter.emitUserMessage(parts);
adapter.emitResult({
isError: false,
@@ -583,7 +583,8 @@ describe('JsonOutputAdapter', () => {
msg.type === 'user',
);
expect(userMessage.parent_tool_use_id).toBe('tool-id-1');
// emitUserMessage currently sets parent_tool_use_id to null
expect(userMessage.parent_tool_use_id).toBeNull();
});
});
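A minimal sketch of the user message the adapter now emits for these parts, assuming the CLIUserMessage shape shown later in this diff; uuid and session_id are illustrative placeholders, not real output:

// Sketch only: values other than parent_tool_use_id and content are placeholders.
const emitted = {
  type: 'user',
  uuid: '<random-uuid>',
  session_id: '<session-id>',
  parent_tool_use_id: null, // no longer derived from a caller-supplied tool id
  message: { role: 'user', content: 'Tool response' },
};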


@@ -4,521 +4,78 @@
* SPDX-License-Identifier: Apache-2.0
*/
import { randomUUID } from 'node:crypto';
import type {
Config,
ServerGeminiStreamEvent,
SessionMetrics,
ToolCallRequestInfo,
ToolCallResponseInfo,
} from '@qwen-code/qwen-code-core';
import { GeminiEventType } from '@qwen-code/qwen-code-core';
import type { Part, GenerateContentResponseUsageMetadata } from '@google/genai';
import type {
CLIAssistantMessage,
CLIResultMessage,
CLIResultMessageError,
CLIResultMessageSuccess,
CLIUserMessage,
ContentBlock,
ExtendedUsage,
TextBlock,
ThinkingBlock,
ToolResultBlock,
ToolUseBlock,
Usage,
} from '../types.js';
export interface ResultOptions {
readonly isError: boolean;
readonly errorMessage?: string;
readonly durationMs: number;
readonly apiDurationMs: number;
readonly numTurns: number;
readonly usage?: ExtendedUsage;
readonly totalCostUsd?: number;
readonly stats?: SessionMetrics;
readonly summary?: string;
readonly subtype?: string;
}
/**
* Interface for message emission strategies.
* Implementations decide whether to emit messages immediately (streaming)
* or collect them for batch emission (non-streaming).
*/
export interface MessageEmitter {
emitMessage(message: unknown): void;
emitUserMessage(parts: Part[], parentToolUseId?: string | null): void;
emitToolResult(
request: ToolCallRequestInfo,
response: ToolCallResponseInfo,
): void;
emitSystemMessage(subtype: string, data?: unknown): void;
}
/**
* JSON-focused output adapter interface.
* Handles structured JSON output for both streaming and non-streaming modes.
*/
export interface JsonOutputAdapterInterface extends MessageEmitter {
startAssistantMessage(): void;
processEvent(event: ServerGeminiStreamEvent): void;
finalizeAssistantMessage(): CLIAssistantMessage;
emitResult(options: ResultOptions): void;
getSessionId(): string;
getModel(): string;
}
import type { Config } from '@qwen-code/qwen-code-core';
import type { CLIAssistantMessage, CLIMessage } from '../types.js';
import {
BaseJsonOutputAdapter,
type JsonOutputAdapterInterface,
type ResultOptions,
} from './BaseJsonOutputAdapter.js';
/**
* JSON output adapter that collects all messages and emits them
* as a single JSON array at the end of the turn.
* Supports both main agent and subagent messages through distinct APIs.
*/
export class JsonOutputAdapter implements JsonOutputAdapterInterface {
private readonly messages: unknown[] = [];
export class JsonOutputAdapter
extends BaseJsonOutputAdapter
implements JsonOutputAdapterInterface
{
private readonly messages: CLIMessage[] = [];
// Assistant message building state
private messageId: string | null = null;
private blocks: ContentBlock[] = [];
private openBlocks = new Set<number>();
private usage: Usage = this.createUsage();
private messageStarted = false;
private finalized = false;
private currentBlockType: ContentBlock['type'] | null = null;
constructor(private readonly config: Config) {}
private createUsage(
metadata?: GenerateContentResponseUsageMetadata | null,
): Usage {
const usage: Usage = {
input_tokens: 0,
output_tokens: 0,
};
if (!metadata) {
return usage;
}
if (typeof metadata.promptTokenCount === 'number') {
usage.input_tokens = metadata.promptTokenCount;
}
if (typeof metadata.candidatesTokenCount === 'number') {
usage.output_tokens = metadata.candidatesTokenCount;
}
if (typeof metadata.cachedContentTokenCount === 'number') {
usage.cache_read_input_tokens = metadata.cachedContentTokenCount;
}
if (typeof metadata.totalTokenCount === 'number') {
usage.total_tokens = metadata.totalTokenCount;
}
return usage;
constructor(config: Config) {
super(config);
}
private buildMessage(): CLIAssistantMessage {
if (!this.messageId) {
throw new Error('Message not started');
}
// Enforce constraint: assistant message must contain only a single type of ContentBlock
if (this.blocks.length > 0) {
const blockTypes = new Set(this.blocks.map((block) => block.type));
if (blockTypes.size > 1) {
throw new Error(
`Assistant message must contain only one type of ContentBlock, found: ${Array.from(blockTypes).join(', ')}`,
);
}
}
// Determine stop_reason based on content block types
// If the message contains only tool_use blocks, set stop_reason to 'tool_use'
const stopReason =
this.blocks.length > 0 &&
this.blocks.every((block) => block.type === 'tool_use')
? 'tool_use'
: null;
return {
type: 'assistant',
uuid: this.messageId,
session_id: this.config.getSessionId(),
parent_tool_use_id: null,
message: {
id: this.messageId,
type: 'message',
role: 'assistant',
model: this.config.getModel(),
content: this.blocks,
stop_reason: stopReason,
usage: this.usage,
},
};
}
private appendText(fragment: string): void {
if (fragment.length === 0) {
return;
}
this.ensureBlockTypeConsistency('text');
this.ensureMessageStarted();
let current = this.blocks[this.blocks.length - 1];
if (!current || current.type !== 'text') {
current = { type: 'text', text: '' } satisfies TextBlock;
const index = this.blocks.length;
this.blocks.push(current);
this.openBlock(index, current);
}
current.text += fragment;
// JSON mode doesn't emit partial messages, so we skip emitStreamEvent
}
private appendThinking(subject?: string, description?: string): void {
this.ensureMessageStarted();
const fragment = [subject?.trim(), description?.trim()]
.filter((value) => value && value.length > 0)
.join(': ');
if (!fragment) {
return;
}
this.ensureBlockTypeConsistency('thinking');
this.ensureMessageStarted();
let current = this.blocks[this.blocks.length - 1];
if (!current || current.type !== 'thinking') {
current = {
type: 'thinking',
thinking: '',
signature: subject,
} satisfies ThinkingBlock;
const index = this.blocks.length;
this.blocks.push(current);
this.openBlock(index, current);
}
current.thinking = `${current.thinking ?? ''}${fragment}`;
// JSON mode doesn't emit partial messages, so we skip emitStreamEvent
}
private appendToolUse(request: ToolCallRequestInfo): void {
this.ensureBlockTypeConsistency('tool_use');
this.ensureMessageStarted();
this.finalizePendingBlocks();
const index = this.blocks.length;
const block: ToolUseBlock = {
type: 'tool_use',
id: request.callId,
name: request.name,
input: request.args,
};
this.blocks.push(block);
this.openBlock(index, block);
// JSON mode doesn't emit partial messages, so we skip emitStreamEvent
this.closeBlock(index);
}
private ensureMessageStarted(): void {
if (this.messageStarted) {
return;
}
this.messageStarted = true;
// JSON mode doesn't emit partial messages, so we skip emitStreamEvent
}
private finalizePendingBlocks(): void {
const lastBlock = this.blocks[this.blocks.length - 1];
if (!lastBlock) {
return;
}
if (lastBlock.type === 'text') {
const index = this.blocks.length - 1;
this.closeBlock(index);
} else if (lastBlock.type === 'thinking') {
const index = this.blocks.length - 1;
this.closeBlock(index);
/**
* Emits a message to the messages array (batch mode).
* Tracks the last assistant message for efficient result text extraction.
*/
protected emitMessageImpl(message: CLIMessage): void {
this.messages.push(message);
// Track assistant messages for result generation
if (
typeof message === 'object' &&
message !== null &&
'type' in message &&
message.type === 'assistant'
) {
this.updateLastAssistantMessage(message as CLIAssistantMessage);
}
}
private openBlock(index: number, _block: ContentBlock): void {
this.openBlocks.add(index);
// JSON mode doesn't emit partial messages, so we skip emitStreamEvent
}
private closeBlock(index: number): void {
if (!this.openBlocks.has(index)) {
return;
}
this.openBlocks.delete(index);
// JSON mode doesn't emit partial messages, so we skip emitStreamEvent
}
startAssistantMessage(): void {
// Reset state for new message
this.messageId = randomUUID();
this.blocks = [];
this.openBlocks = new Set<number>();
this.usage = this.createUsage();
this.messageStarted = false;
this.finalized = false;
this.currentBlockType = null;
}
processEvent(event: ServerGeminiStreamEvent): void {
if (this.finalized) {
return;
}
switch (event.type) {
case GeminiEventType.Content:
this.appendText(event.value);
break;
case GeminiEventType.Citation:
if (typeof event.value === 'string') {
this.appendText(`\n${event.value}`);
}
break;
case GeminiEventType.Thought:
this.appendThinking(event.value.subject, event.value.description);
break;
case GeminiEventType.ToolCallRequest:
this.appendToolUse(event.value);
break;
case GeminiEventType.Finished:
if (event.value?.usageMetadata) {
this.usage = this.createUsage(event.value.usageMetadata);
}
this.finalizePendingBlocks();
break;
default:
break;
}
/**
* JSON mode does not emit stream events.
*/
protected shouldEmitStreamEvents(): boolean {
return false;
}
finalizeAssistantMessage(): CLIAssistantMessage {
if (this.finalized) {
return this.buildMessage();
}
this.finalized = true;
this.finalizePendingBlocks();
const orderedOpenBlocks = Array.from(this.openBlocks).sort((a, b) => a - b);
for (const index of orderedOpenBlocks) {
this.closeBlock(index);
}
const message = this.buildMessage();
this.emitMessage(message);
const message = this.finalizeAssistantMessageInternal(
this.mainAgentMessageState,
null,
);
this.updateLastAssistantMessage(message);
return message;
}
emitResult(options: ResultOptions): void {
const usage = options.usage ?? createExtendedUsage();
const resultText = options.summary ?? this.extractResponseText();
// Create the final result message to append to the messages array
const baseUuid = randomUUID();
const baseSessionId = this.getSessionId();
let resultMessage: CLIResultMessage;
if (options.isError) {
const errorMessage = options.errorMessage ?? 'Unknown error';
const errorResult: CLIResultMessageError = {
type: 'result',
subtype:
(options.subtype as CLIResultMessageError['subtype']) ??
'error_during_execution',
uuid: baseUuid,
session_id: baseSessionId,
is_error: true,
duration_ms: options.durationMs,
duration_api_ms: options.apiDurationMs,
num_turns: options.numTurns,
total_cost_usd: options.totalCostUsd ?? 0,
usage,
permission_denials: [],
error: { message: errorMessage },
};
resultMessage = errorResult;
} else {
const success: CLIResultMessageSuccess & { stats?: SessionMetrics } = {
type: 'result',
subtype:
(options.subtype as CLIResultMessageSuccess['subtype']) ?? 'success',
uuid: baseUuid,
session_id: baseSessionId,
is_error: false,
duration_ms: options.durationMs,
duration_api_ms: options.apiDurationMs,
num_turns: options.numTurns,
result: resultText,
total_cost_usd: options.totalCostUsd ?? 0,
usage,
permission_denials: [],
};
// Include stats if available
if (options.stats) {
success.stats = options.stats;
}
resultMessage = success;
}
// Add the result message to the messages array
const resultMessage = this.buildResultMessage(
options,
this.lastAssistantMessage,
);
this.messages.push(resultMessage);
// Emit the entire messages array as JSON
// Emit the entire messages array as JSON (includes all main agent + subagent messages)
const json = JSON.stringify(this.messages);
process.stdout.write(`${json}\n`);
}
emitMessage(message: unknown): void {
// Stash messages instead of emitting immediately
emitMessage(message: CLIMessage): void {
// In JSON mode, messages are collected in the messages array
// This is called by the base class's finalizeAssistantMessageInternal
// but can also be called directly for user/tool/system messages
this.messages.push(message);
}
emitUserMessage(parts: Part[], parentToolUseId: string | null = null): void {
const content = partsToString(parts);
const message: CLIUserMessage = {
type: 'user',
uuid: randomUUID(),
session_id: this.getSessionId(),
parent_tool_use_id: parentToolUseId,
message: {
role: 'user',
content,
},
};
this.emitMessage(message);
}
emitToolResult(
request: ToolCallRequestInfo,
response: ToolCallResponseInfo,
): void {
const block: ToolResultBlock = {
type: 'tool_result',
tool_use_id: request.callId,
is_error: Boolean(response.error),
};
const content = toolResultContent(response);
if (content !== undefined) {
block.content = content;
}
const message: CLIUserMessage = {
type: 'user',
uuid: randomUUID(),
session_id: this.getSessionId(),
parent_tool_use_id: request.callId,
message: {
role: 'user',
content: [block],
},
};
this.emitMessage(message);
}
emitSystemMessage(subtype: string, data?: unknown): void {
const systemMessage = {
type: 'system',
subtype,
uuid: randomUUID(),
session_id: this.getSessionId(),
data,
} as const;
this.emitMessage(systemMessage);
}
getSessionId(): string {
return this.config.getSessionId();
}
getModel(): string {
return this.config.getModel();
}
private extractResponseText(): string {
const assistantMessages = this.messages.filter(
(msg): msg is CLIAssistantMessage =>
typeof msg === 'object' &&
msg !== null &&
'type' in msg &&
msg.type === 'assistant',
);
return assistantMessages
.map((msg) => extractTextFromBlocks(msg.message.content))
.filter((text) => text.length > 0)
.join('\n');
}
/**
* Guarantees that a single assistant message aggregates only one
* content block category (text, thinking, or tool use). When a new
* block type is requested, the current message is finalized and a fresh
* assistant message is started to honour the single-type constraint.
*/
private ensureBlockTypeConsistency(targetType: ContentBlock['type']): void {
if (this.currentBlockType === targetType) {
return;
}
if (this.currentBlockType === null) {
this.currentBlockType = targetType;
return;
}
this.finalizeAssistantMessage();
this.startAssistantMessage();
this.currentBlockType = targetType;
}
}
function partsToString(parts: Part[]): string {
return parts
.map((part) => {
if ('text' in part && typeof part.text === 'string') {
return part.text;
}
return JSON.stringify(part);
})
.join('');
}
function toolResultContent(response: ToolCallResponseInfo): string | undefined {
if (
typeof response.resultDisplay === 'string' &&
response.resultDisplay.trim().length > 0
) {
return response.resultDisplay;
}
if (response.responseParts && response.responseParts.length > 0) {
return partsToString(response.responseParts);
}
if (response.error) {
return response.error.message;
}
return undefined;
}
function extractTextFromBlocks(blocks: ContentBlock[]): string {
return blocks
.filter((block) => block.type === 'text')
.map((block) => (block.type === 'text' ? block.text : ''))
.join('');
}
function createExtendedUsage(): ExtendedUsage {
return {
input_tokens: 0,
output_tokens: 0,
};
}
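
As a rough orientation for the refactor above, a hedged usage sketch of the batch adapter; the import path and the Config wiring are assumptions based on this diff, and the event literal is cast because the full ServerGeminiStreamEvent shape is not shown here:

import { GeminiEventType } from '@qwen-code/qwen-code-core';
import type { Config, ServerGeminiStreamEvent } from '@qwen-code/qwen-code-core';
import { JsonOutputAdapter } from './JsonOutputAdapter.js'; // assumed path

declare const config: Config; // provided by the CLI bootstrap in practice

const adapter = new JsonOutputAdapter(config);
adapter.startAssistantMessage();
// Feed one content event; the cast papers over fields this sketch omits.
adapter.processEvent({
  type: GeminiEventType.Content,
  value: 'Hello',
} as ServerGeminiStreamEvent);
adapter.finalizeAssistantMessage();
// Nothing is written until emitResult: the whole turn (main agent plus any
// subagent messages) is flushed to stdout as a single JSON array.
adapter.emitResult({
  isError: false,
  durationMs: 1200,
  apiDurationMs: 900,
  numTurns: 1,
});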


@@ -498,7 +498,9 @@ describe('StreamJsonOutputAdapter', () => {
});
const message = adapter.finalizeAssistantMessage();
expect(adapter.lastAssistantMessage).toEqual(message);
// Access protected property for testing
// eslint-disable-next-line @typescript-eslint/no-explicit-any
expect((adapter as any).lastAssistantMessage).toEqual(message);
});
it('should return same message on subsequent calls', () => {
@@ -720,12 +722,13 @@ describe('StreamJsonOutputAdapter', () => {
it('should handle parent_tool_use_id', () => {
const parts: Part[] = [{ text: 'Tool response' }];
adapter.emitUserMessage(parts, 'tool-id-1');
adapter.emitUserMessage(parts);
const output = stdoutWriteSpy.mock.calls[0][0] as string;
const parsed = JSON.parse(output);
expect(parsed.parent_tool_use_id).toBe('tool-id-1');
// emitUserMessage currently sets parent_tool_use_id to null
expect(parsed.parent_tool_use_id).toBeNull();
});
});
@@ -758,7 +761,7 @@ describe('StreamJsonOutputAdapter', () => {
const parsed = JSON.parse(output);
expect(parsed.type).toBe('user');
expect(parsed.parent_tool_use_id).toBe('tool-1');
expect(parsed.parent_tool_use_id).toBeNull();
const block = parsed.message.content[0];
expect(block).toMatchObject({
type: 'tool_result',


@@ -5,392 +5,44 @@
*/
import { randomUUID } from 'node:crypto';
import type {
Config,
ServerGeminiStreamEvent,
ToolCallRequestInfo,
ToolCallResponseInfo,
} from '@qwen-code/qwen-code-core';
import { GeminiEventType } from '@qwen-code/qwen-code-core';
import type { Part, GenerateContentResponseUsageMetadata } from '@google/genai';
import type { Config } from '@qwen-code/qwen-code-core';
import type {
CLIAssistantMessage,
CLIMessage,
CLIPartialAssistantMessage,
CLIResultMessage,
CLIResultMessageError,
CLIResultMessageSuccess,
CLIUserMessage,
ContentBlock,
ExtendedUsage,
ControlMessage,
StreamEvent,
TextBlock,
ThinkingBlock,
ToolResultBlock,
ToolUseBlock,
Usage,
} from '../types.js';
import type {
JsonOutputAdapterInterface,
ResultOptions,
} from './JsonOutputAdapter.js';
import {
BaseJsonOutputAdapter,
type MessageState,
type ResultOptions,
type JsonOutputAdapterInterface,
} from './BaseJsonOutputAdapter.js';
/**
* Stream JSON output adapter that emits messages immediately
* as they are completed during the streaming process.
* Supports both main agent and subagent messages through distinct APIs.
*/
export class StreamJsonOutputAdapter implements JsonOutputAdapterInterface {
lastAssistantMessage: CLIAssistantMessage | null = null;
// Assistant message building state
private messageId: string | null = null;
private blocks: ContentBlock[] = [];
private openBlocks = new Set<number>();
private usage: Usage = this.createUsage();
private messageStarted = false;
private finalized = false;
private currentBlockType: ContentBlock['type'] | null = null;
export class StreamJsonOutputAdapter
extends BaseJsonOutputAdapter
implements JsonOutputAdapterInterface
{
constructor(
private readonly config: Config,
config: Config,
private readonly includePartialMessages: boolean,
) {}
private createUsage(
metadata?: GenerateContentResponseUsageMetadata | null,
): Usage {
const usage: Usage = {
input_tokens: 0,
output_tokens: 0,
};
if (!metadata) {
return usage;
}
if (typeof metadata.promptTokenCount === 'number') {
usage.input_tokens = metadata.promptTokenCount;
}
if (typeof metadata.candidatesTokenCount === 'number') {
usage.output_tokens = metadata.candidatesTokenCount;
}
if (typeof metadata.cachedContentTokenCount === 'number') {
usage.cache_read_input_tokens = metadata.cachedContentTokenCount;
}
if (typeof metadata.totalTokenCount === 'number') {
usage.total_tokens = metadata.totalTokenCount;
}
return usage;
) {
super(config);
}
private buildMessage(): CLIAssistantMessage {
if (!this.messageId) {
throw new Error('Message not started');
}
// Enforce constraint: assistant message must contain only a single type of ContentBlock
if (this.blocks.length > 0) {
const blockTypes = new Set(this.blocks.map((block) => block.type));
if (blockTypes.size > 1) {
throw new Error(
`Assistant message must contain only one type of ContentBlock, found: ${Array.from(blockTypes).join(', ')}`,
);
}
}
// Determine stop_reason based on content block types
// If the message contains only tool_use blocks, set stop_reason to 'tool_use'
const stopReason =
this.blocks.length > 0 &&
this.blocks.every((block) => block.type === 'tool_use')
? 'tool_use'
: null;
return {
type: 'assistant',
uuid: this.messageId,
session_id: this.config.getSessionId(),
parent_tool_use_id: null,
message: {
id: this.messageId,
type: 'message',
role: 'assistant',
model: this.config.getModel(),
content: this.blocks,
stop_reason: stopReason,
usage: this.usage,
},
};
}
private appendText(fragment: string): void {
if (fragment.length === 0) {
return;
}
this.ensureBlockTypeConsistency('text');
this.ensureMessageStarted();
let current = this.blocks[this.blocks.length - 1];
if (!current || current.type !== 'text') {
current = { type: 'text', text: '' } satisfies TextBlock;
const index = this.blocks.length;
this.blocks.push(current);
this.openBlock(index, current);
}
current.text += fragment;
const index = this.blocks.length - 1;
this.emitStreamEvent({
type: 'content_block_delta',
index,
delta: { type: 'text_delta', text: fragment },
});
}
private appendThinking(subject?: string, description?: string): void {
const fragment = [subject?.trim(), description?.trim()]
.filter((value) => value && value.length > 0)
.join(': ');
if (!fragment) {
return;
}
this.ensureBlockTypeConsistency('thinking');
this.ensureMessageStarted();
let current = this.blocks[this.blocks.length - 1];
if (!current || current.type !== 'thinking') {
current = {
type: 'thinking',
thinking: '',
signature: subject,
} satisfies ThinkingBlock;
const index = this.blocks.length;
this.blocks.push(current);
this.openBlock(index, current);
}
current.thinking = `${current.thinking ?? ''}${fragment}`;
const index = this.blocks.length - 1;
this.emitStreamEvent({
type: 'content_block_delta',
index,
delta: { type: 'thinking_delta', thinking: fragment },
});
}
private appendToolUse(request: ToolCallRequestInfo): void {
this.ensureBlockTypeConsistency('tool_use');
this.ensureMessageStarted();
this.finalizePendingBlocks();
const index = this.blocks.length;
const block: ToolUseBlock = {
type: 'tool_use',
id: request.callId,
name: request.name,
input: request.args,
};
this.blocks.push(block);
this.openBlock(index, block);
this.emitStreamEvent({
type: 'content_block_delta',
index,
delta: {
type: 'input_json_delta',
partial_json: JSON.stringify(request.args ?? {}),
},
});
this.closeBlock(index);
}
private ensureMessageStarted(): void {
if (this.messageStarted) {
return;
}
this.messageStarted = true;
this.emitStreamEvent({
type: 'message_start',
message: {
id: this.messageId!,
role: 'assistant',
model: this.config.getModel(),
},
});
}
private finalizePendingBlocks(): void {
const lastBlock = this.blocks[this.blocks.length - 1];
if (!lastBlock) {
return;
}
if (lastBlock.type === 'text') {
const index = this.blocks.length - 1;
this.closeBlock(index);
} else if (lastBlock.type === 'thinking') {
const index = this.blocks.length - 1;
this.closeBlock(index);
}
}
private openBlock(index: number, block: ContentBlock): void {
this.openBlocks.add(index);
this.emitStreamEvent({
type: 'content_block_start',
index,
content_block: block,
});
}
private closeBlock(index: number): void {
if (!this.openBlocks.has(index)) {
return;
}
this.openBlocks.delete(index);
this.emitStreamEvent({
type: 'content_block_stop',
index,
});
}
private emitStreamEvent(event: StreamEvent): void {
if (!this.includePartialMessages) {
return;
}
const enrichedEvent = this.messageStarted
? ({ ...event, message_id: this.messageId } as StreamEvent & {
message_id: string;
})
: event;
const partial: CLIPartialAssistantMessage = {
type: 'stream_event',
uuid: randomUUID(),
session_id: this.config.getSessionId(),
parent_tool_use_id: null,
event: enrichedEvent,
};
this.emitMessage(partial);
}
startAssistantMessage(): void {
// Reset state for new message
this.messageId = randomUUID();
this.blocks = [];
this.openBlocks = new Set<number>();
this.usage = this.createUsage();
this.messageStarted = false;
this.finalized = false;
this.currentBlockType = null;
}
processEvent(event: ServerGeminiStreamEvent): void {
if (this.finalized) {
return;
}
switch (event.type) {
case GeminiEventType.Content:
this.appendText(event.value);
break;
case GeminiEventType.Citation:
if (typeof event.value === 'string') {
this.appendText(`\n${event.value}`);
}
break;
case GeminiEventType.Thought:
this.appendThinking(event.value.subject, event.value.description);
break;
case GeminiEventType.ToolCallRequest:
this.appendToolUse(event.value);
break;
case GeminiEventType.Finished:
if (event.value?.usageMetadata) {
this.usage = this.createUsage(event.value.usageMetadata);
}
this.finalizePendingBlocks();
break;
default:
break;
}
}
finalizeAssistantMessage(): CLIAssistantMessage {
if (this.finalized) {
return this.buildMessage();
}
this.finalized = true;
this.finalizePendingBlocks();
const orderedOpenBlocks = Array.from(this.openBlocks).sort((a, b) => a - b);
for (const index of orderedOpenBlocks) {
this.closeBlock(index);
}
if (this.messageStarted && this.includePartialMessages) {
this.emitStreamEvent({ type: 'message_stop' });
}
const message = this.buildMessage();
this.lastAssistantMessage = message;
this.emitMessage(message);
return message;
}
emitResult(options: ResultOptions): void {
const baseUuid = randomUUID();
const baseSessionId = this.getSessionId();
const usage = options.usage ?? createExtendedUsage();
const resultText =
options.summary ??
(this.lastAssistantMessage
? extractTextFromBlocks(this.lastAssistantMessage.message.content)
: '');
let message: CLIResultMessage;
if (options.isError) {
const errorMessage = options.errorMessage ?? 'Unknown error';
const errorResult: CLIResultMessageError = {
type: 'result',
subtype:
(options.subtype as CLIResultMessageError['subtype']) ??
'error_during_execution',
uuid: baseUuid,
session_id: baseSessionId,
is_error: true,
duration_ms: options.durationMs,
duration_api_ms: options.apiDurationMs,
num_turns: options.numTurns,
total_cost_usd: options.totalCostUsd ?? 0,
usage,
permission_denials: [],
error: { message: errorMessage },
};
message = errorResult;
} else {
const success: CLIResultMessageSuccess = {
type: 'result',
subtype:
(options.subtype as CLIResultMessageSuccess['subtype']) ?? 'success',
uuid: baseUuid,
session_id: baseSessionId,
is_error: false,
duration_ms: options.durationMs,
duration_api_ms: options.apiDurationMs,
num_turns: options.numTurns,
result: resultText,
total_cost_usd: options.totalCostUsd ?? 0,
usage,
permission_denials: [],
};
message = success;
}
this.emitMessage(message);
}
emitMessage(message: unknown): void {
/**
* Emits a message immediately to stdout (stream mode).
*/
protected emitMessageImpl(message: CLIMessage | ControlMessage): void {
// Track assistant messages for result generation
if (
typeof message === 'object' &&
@@ -398,138 +50,251 @@ export class StreamJsonOutputAdapter implements JsonOutputAdapterInterface {
'type' in message &&
message.type === 'assistant'
) {
this.lastAssistantMessage = message as CLIAssistantMessage;
this.updateLastAssistantMessage(message as CLIAssistantMessage);
}
// Emit messages immediately in stream mode
process.stdout.write(`${JSON.stringify(message)}\n`);
}
emitUserMessage(parts: Part[], parentToolUseId: string | null = null): void {
const content = partsToString(parts);
const message: CLIUserMessage = {
type: 'user',
uuid: randomUUID(),
session_id: this.getSessionId(),
parent_tool_use_id: parentToolUseId,
message: {
role: 'user',
content,
},
};
this.emitMessage(message);
/**
* Stream mode emits stream events when includePartialMessages is enabled.
*/
protected shouldEmitStreamEvents(): boolean {
return this.includePartialMessages;
}
emitToolResult(
request: ToolCallRequestInfo,
response: ToolCallResponseInfo,
): void {
const block: ToolResultBlock = {
type: 'tool_result',
tool_use_id: request.callId,
is_error: Boolean(response.error),
};
const content = toolResultContent(response);
if (content !== undefined) {
block.content = content;
finalizeAssistantMessage(): CLIAssistantMessage {
const state = this.mainAgentMessageState;
if (state.finalized) {
return this.buildMessage(null);
}
state.finalized = true;
this.finalizePendingBlocks(state, null);
const orderedOpenBlocks = Array.from(state.openBlocks).sort(
(a, b) => a - b,
);
for (const index of orderedOpenBlocks) {
this.onBlockClosed(state, index, null);
this.closeBlock(state, index);
}
const message: CLIUserMessage = {
type: 'user',
uuid: randomUUID(),
session_id: this.getSessionId(),
parent_tool_use_id: request.callId,
message: {
role: 'user',
content: [block],
},
};
this.emitMessage(message);
if (state.messageStarted && this.includePartialMessages) {
this.emitStreamEventIfEnabled({ type: 'message_stop' }, null);
}
const message = this.buildMessage(null);
this.updateLastAssistantMessage(message);
this.emitMessageImpl(message);
return message;
}
emitSystemMessage(subtype: string, data?: unknown): void {
const systemMessage = {
type: 'system',
subtype,
uuid: randomUUID(),
session_id: this.getSessionId(),
data,
} as const;
this.emitMessage(systemMessage);
emitResult(options: ResultOptions): void {
const resultMessage = this.buildResultMessage(
options,
this.lastAssistantMessage,
);
this.emitMessageImpl(resultMessage);
}
getSessionId(): string {
return this.config.getSessionId();
emitMessage(message: CLIMessage | ControlMessage): void {
// In stream mode, emit immediately
this.emitMessageImpl(message);
}
getModel(): string {
return this.config.getModel();
}
// Legacy methods for backward compatibility
send(message: unknown): void {
send(message: CLIMessage | ControlMessage): void {
this.emitMessage(message);
}
/**
* Keeps the assistant message scoped to a single content block type.
* If the requested block type differs from the current message type,
* the existing message is finalized and a fresh assistant message is started
* so that every emitted assistant message contains exactly one block category.
* Overrides base class hook to emit stream event when text block is created.
*/
private ensureBlockTypeConsistency(targetType: ContentBlock['type']): void {
if (this.currentBlockType === targetType) {
protected override onTextBlockCreated(
state: MessageState,
index: number,
block: TextBlock,
parentToolUseId: string | null,
): void {
this.emitStreamEventIfEnabled(
{
type: 'content_block_start',
index,
content_block: block,
},
parentToolUseId,
);
}
/**
* Overrides base class hook to emit stream event when text is appended.
*/
protected override onTextAppended(
state: MessageState,
index: number,
fragment: string,
parentToolUseId: string | null,
): void {
this.emitStreamEventIfEnabled(
{
type: 'content_block_delta',
index,
delta: { type: 'text_delta', text: fragment },
},
parentToolUseId,
);
}
/**
* Overrides base class hook to emit stream event when thinking block is created.
*/
protected override onThinkingBlockCreated(
state: MessageState,
index: number,
block: ThinkingBlock,
parentToolUseId: string | null,
): void {
this.emitStreamEventIfEnabled(
{
type: 'content_block_start',
index,
content_block: block,
},
parentToolUseId,
);
}
/**
* Overrides base class hook to emit stream event when thinking is appended.
*/
protected override onThinkingAppended(
state: MessageState,
index: number,
fragment: string,
parentToolUseId: string | null,
): void {
this.emitStreamEventIfEnabled(
{
type: 'content_block_delta',
index,
delta: { type: 'thinking_delta', thinking: fragment },
},
parentToolUseId,
);
}
/**
* Overrides base class hook to emit stream event when tool_use block is created.
*/
protected override onToolUseBlockCreated(
state: MessageState,
index: number,
block: ToolUseBlock,
parentToolUseId: string | null,
): void {
this.emitStreamEventIfEnabled(
{
type: 'content_block_start',
index,
content_block: block,
},
parentToolUseId,
);
}
/**
* Overrides base class hook to emit stream event when tool_use input is set.
*/
protected override onToolUseInputSet(
state: MessageState,
index: number,
input: unknown,
parentToolUseId: string | null,
): void {
this.emitStreamEventIfEnabled(
{
type: 'content_block_delta',
index,
delta: {
type: 'input_json_delta',
partial_json: JSON.stringify(input),
},
},
parentToolUseId,
);
}
/**
* Overrides base class hook to emit stream event when block is closed.
*/
protected override onBlockClosed(
state: MessageState,
index: number,
parentToolUseId: string | null,
): void {
if (this.includePartialMessages) {
this.emitStreamEventIfEnabled(
{
type: 'content_block_stop',
index,
},
parentToolUseId,
);
}
}
/**
* Overrides base class hook to emit message_start event when message is started.
* Only emits for main agent, not for subagents.
*/
protected override onEnsureMessageStarted(
state: MessageState,
parentToolUseId: string | null,
): void {
// Only emit message_start for main agent, not for subagents
if (parentToolUseId === null) {
this.emitStreamEventIfEnabled(
{
type: 'message_start',
message: {
id: state.messageId!,
role: 'assistant',
model: this.config.getModel(),
},
},
null,
);
}
}
/**
* Emits stream events when partial messages are enabled.
* This is a private method specific to StreamJsonOutputAdapter.
* @param event - Stream event to emit
* @param parentToolUseId - null for main agent, string for subagent
*/
private emitStreamEventIfEnabled(
event: StreamEvent,
parentToolUseId: string | null,
): void {
if (!this.includePartialMessages) {
return;
}
if (this.currentBlockType === null) {
this.currentBlockType = targetType;
return;
}
const state = this.getMessageState(parentToolUseId);
const enrichedEvent = state.messageStarted
? ({ ...event, message_id: state.messageId } as StreamEvent & {
message_id: string;
})
: event;
this.finalizeAssistantMessage();
this.startAssistantMessage();
this.currentBlockType = targetType;
const partial: CLIPartialAssistantMessage = {
type: 'stream_event',
uuid: randomUUID(),
session_id: this.getSessionId(),
parent_tool_use_id: parentToolUseId,
event: enrichedEvent,
};
this.emitMessageImpl(partial);
}
}
function partsToString(parts: Part[]): string {
return parts
.map((part) => {
if ('text' in part && typeof part.text === 'string') {
return part.text;
}
return JSON.stringify(part);
})
.join('');
}
function toolResultContent(response: ToolCallResponseInfo): string | undefined {
if (
typeof response.resultDisplay === 'string' &&
response.resultDisplay.trim().length > 0
) {
return response.resultDisplay;
}
if (response.responseParts && response.responseParts.length > 0) {
return partsToString(response.responseParts);
}
if (response.error) {
return response.error.message;
}
return undefined;
}
function extractTextFromBlocks(blocks: ContentBlock[]): string {
return blocks
.filter((block) => block.type === 'text')
.map((block) => (block.type === 'text' ? block.text : ''))
.join('');
}
function createExtendedUsage(): ExtendedUsage {
return {
input_tokens: 0,
output_tokens: 0,
};
}
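
For context on what the parent_tool_use_id plumbing buys a downstream consumer, a minimal sketch (not part of this commit) that splits the newline-delimited JSON written by StreamJsonOutputAdapter into main-agent and per-subagent buckets; the field name comes from this diff, the stdin piping setup is an assumption:

import process from 'node:process';
import { createInterface } from 'node:readline';

const rl = createInterface({ input: process.stdin });
const byParent = new Map<string | null, unknown[]>();

rl.on('line', (line) => {
  if (!line.trim()) return;
  const msg = JSON.parse(line) as { parent_tool_use_id?: string | null };
  // null => main agent; a string => the tool call whose subagent produced it.
  const key = msg.parent_tool_use_id ?? null;
  const bucket = byParent.get(key) ?? [];
  bucket.push(msg);
  byParent.set(key, bucket);
});

rl.on('close', () => {
  console.log(`main agent messages: ${byParent.get(null)?.length ?? 0}`);
});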