diff --git a/packages/cli/src/streamJson/types.ts b/packages/cli/src/streamJson/types.ts new file mode 100644 index 00000000..4d451df4 --- /dev/null +++ b/packages/cli/src/streamJson/types.ts @@ -0,0 +1,183 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +export type StreamJsonFormat = 'text' | 'stream-json'; + +export interface StreamJsonAnnotation { + type: string; + value: string; +} + +export interface StreamJsonTextBlock { + type: 'text'; + text: string; + annotations?: StreamJsonAnnotation[]; +} + +export interface StreamJsonThinkingBlock { + type: 'thinking'; + thinking: string; + signature?: string; + annotations?: StreamJsonAnnotation[]; +} + +export interface StreamJsonToolUseBlock { + type: 'tool_use'; + id: string; + name: string; + input: unknown; + annotations?: StreamJsonAnnotation[]; +} + +export interface StreamJsonToolResultBlock { + type: 'tool_result'; + tool_use_id: string; + content?: StreamJsonContentBlock[] | string; + is_error?: boolean; + annotations?: StreamJsonAnnotation[]; +} + +export type StreamJsonContentBlock = + | StreamJsonTextBlock + | StreamJsonThinkingBlock + | StreamJsonToolUseBlock + | StreamJsonToolResultBlock; + +export interface StreamJsonAssistantEnvelope { + type: 'assistant'; + message: { + role: 'assistant'; + model?: string; + content: StreamJsonContentBlock[]; + }; + parent_tool_use_id?: string; +} + +export interface StreamJsonUserEnvelope { + type: 'user'; + message: { + role?: 'user'; + content: string | StreamJsonContentBlock[]; + }; + parent_tool_use_id?: string; + options?: Record; +} + +export interface StreamJsonSystemEnvelope { + type: 'system'; + subtype?: string; + session_id?: string; + data?: unknown; +} + +export interface StreamJsonUsage { + input_tokens?: number; + output_tokens?: number; + total_tokens?: number; + cache_creation_input_tokens?: number; + cache_read_input_tokens?: number; +} + +export interface StreamJsonResultEnvelope { + type: 'result'; + subtype?: string; + duration_ms?: number; + duration_api_ms?: number; + num_turns?: number; + session_id?: string; + is_error?: boolean; + summary?: string; + usage?: StreamJsonUsage; + total_cost_usd?: number; + error?: { type?: string; message: string; [key: string]: unknown }; + [key: string]: unknown; +} + +export interface StreamJsonMessageStreamEvent { + type: string; + index?: number; + delta?: unknown; + [key: string]: unknown; +} + +export interface StreamJsonStreamEventEnvelope { + type: 'stream_event'; + uuid: string; + session_id?: string; + event: StreamJsonMessageStreamEvent; +} + +export interface StreamJsonControlRequestEnvelope { + type: 'control_request'; + request_id: string; + request: { + subtype: string; + [key: string]: unknown; + }; +} + +export interface StreamJsonControlResponseEnvelope { + type: 'control_response'; + request_id: string; + success?: boolean; + response?: unknown; + error?: string | { message: string; [key: string]: unknown }; +} + +export interface StreamJsonControlCancelRequestEnvelope { + type: 'control_cancel_request'; + request_id?: string; + reason?: string; +} + +export type StreamJsonOutputEnvelope = + | StreamJsonAssistantEnvelope + | StreamJsonUserEnvelope + | StreamJsonSystemEnvelope + | StreamJsonResultEnvelope + | StreamJsonStreamEventEnvelope + | StreamJsonControlRequestEnvelope + | StreamJsonControlResponseEnvelope + | StreamJsonControlCancelRequestEnvelope; + +export type StreamJsonInputEnvelope = + | StreamJsonUserEnvelope + | StreamJsonControlRequestEnvelope + | StreamJsonControlResponseEnvelope + | StreamJsonControlCancelRequestEnvelope; + +export type StreamJsonEnvelope = + | StreamJsonOutputEnvelope + | StreamJsonInputEnvelope; + +export function serializeStreamJsonEnvelope( + envelope: StreamJsonOutputEnvelope, +): string { + return JSON.stringify(envelope); +} + +export class StreamJsonParseError extends Error {} + +export function parseStreamJsonEnvelope(line: string): StreamJsonEnvelope { + let parsed: unknown; + try { + parsed = JSON.parse(line) as StreamJsonEnvelope; + } catch (error) { + throw new StreamJsonParseError( + `Failed to parse stream-json line: ${ + error instanceof Error ? error.message : String(error) + }`, + ); + } + if (!parsed || typeof parsed !== 'object') { + throw new StreamJsonParseError('Parsed value is not an object'); + } + const type = (parsed as { type?: unknown }).type; + if (typeof type !== 'string') { + throw new StreamJsonParseError('Missing required "type" field'); + } + return parsed as StreamJsonEnvelope; +} diff --git a/packages/cli/src/streamJson/writer.test.ts b/packages/cli/src/streamJson/writer.test.ts new file mode 100644 index 00000000..bc598496 --- /dev/null +++ b/packages/cli/src/streamJson/writer.test.ts @@ -0,0 +1,146 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import type { Config, ToolCallRequestInfo } from '@qwen-code/qwen-code-core'; +import { StreamJsonWriter } from './writer.js'; + +function createConfig(): Config { + return { + getSessionId: () => 'session-test', + getModel: () => 'model-test', + } as unknown as Config; +} + +function parseEnvelopes(writes: string[]): unknown[] { + return writes + .join('') + .split('\n') + .filter((line) => line.trim().length > 0) + .map((line) => JSON.parse(line)); +} + +describe('StreamJsonWriter', () => { + let writes: string[]; + + beforeEach(() => { + writes = []; + vi.spyOn(process.stdout, 'write').mockImplementation( + (chunk: string | Uint8Array) => { + if (typeof chunk === 'string') { + writes.push(chunk); + } else { + writes.push(Buffer.from(chunk).toString('utf8')); + } + return true; + }, + ); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('emits result envelopes with usage and cost details', () => { + const writer = new StreamJsonWriter(createConfig(), false); + writer.emitResult({ + isError: false, + numTurns: 2, + durationMs: 1200, + apiDurationMs: 800, + usage: { + input_tokens: 10, + output_tokens: 5, + total_tokens: 15, + cache_read_input_tokens: 2, + }, + totalCostUsd: 0.123, + summary: 'Completed', + subtype: 'session_summary', + }); + + const [envelope] = parseEnvelopes(writes) as Array>; + expect(envelope).toMatchObject({ + type: 'result', + duration_ms: 1200, + duration_api_ms: 800, + usage: { + input_tokens: 10, + output_tokens: 5, + total_tokens: 15, + cache_read_input_tokens: 2, + }, + total_cost_usd: 0.123, + summary: 'Completed', + subtype: 'session_summary', + is_error: false, + }); + }); + + it('emits thinking deltas and assistant messages for thought blocks', () => { + const writer = new StreamJsonWriter(createConfig(), true); + const builder = writer.createAssistantBuilder(); + builder.appendThinking('Reflecting'); + builder.appendThinking(' more'); + builder.finalize(); + + const envelopes = parseEnvelopes(writes) as Array>; + + expect( + envelopes.some( + (env) => + env.type === 'stream_event' && + env.event?.type === 'content_block_delta' && + env.event?.delta?.type === 'thinking_delta', + ), + ).toBe(true); + + const assistantEnvelope = envelopes.find((env) => env.type === 'assistant'); + expect(assistantEnvelope?.message?.content?.[0]).toEqual({ + type: 'thinking', + thinking: 'Reflecting more', + }); + }); + + it('emits input_json_delta events when tool calls are appended', () => { + const writer = new StreamJsonWriter(createConfig(), true); + const builder = writer.createAssistantBuilder(); + const request: ToolCallRequestInfo = { + callId: 'tool-123', + name: 'write_file', + args: { path: 'foo.ts', content: 'console.log(1);' }, + isClientInitiated: false, + prompt_id: 'prompt-1', + }; + + builder.appendToolUse(request); + builder.finalize(); + + const envelopes = parseEnvelopes(writes) as Array>; + + expect( + envelopes.some( + (env) => + env.type === 'stream_event' && + env.event?.type === 'content_block_delta' && + env.event?.delta?.type === 'input_json_delta', + ), + ).toBe(true); + }); + + it('includes session id in system messages', () => { + const writer = new StreamJsonWriter(createConfig(), false); + writer.emitSystemMessage('init', { foo: 'bar' }); + + const [envelope] = parseEnvelopes(writes) as Array>; + expect(envelope).toMatchObject({ + type: 'system', + subtype: 'init', + session_id: 'session-test', + data: { foo: 'bar' }, + }); + }); +}); diff --git a/packages/cli/src/streamJson/writer.ts b/packages/cli/src/streamJson/writer.ts new file mode 100644 index 00000000..acb8bd50 --- /dev/null +++ b/packages/cli/src/streamJson/writer.ts @@ -0,0 +1,357 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { randomUUID } from 'node:crypto'; +import type { + Config, + ToolCallRequestInfo, + ToolCallResponseInfo, +} from '@qwen-code/qwen-code-core'; +import type { Part } from '@google/genai'; +import { + serializeStreamJsonEnvelope, + type StreamJsonAssistantEnvelope, + type StreamJsonContentBlock, + type StreamJsonMessageStreamEvent, + type StreamJsonOutputEnvelope, + type StreamJsonStreamEventEnvelope, + type StreamJsonUsage, + type StreamJsonToolResultBlock, +} from './types.js'; + +export interface StreamJsonResultOptions { + readonly isError: boolean; + readonly errorMessage?: string; + readonly durationMs?: number; + readonly apiDurationMs?: number; + readonly numTurns: number; + readonly usage?: StreamJsonUsage; + readonly totalCostUsd?: number; + readonly summary?: string; + readonly subtype?: string; +} + +export class StreamJsonWriter { + private readonly includePartialMessages: boolean; + private readonly sessionId: string; + private readonly model: string; + + constructor(config: Config, includePartialMessages: boolean) { + this.includePartialMessages = includePartialMessages; + this.sessionId = config.getSessionId(); + this.model = config.getModel(); + } + + createAssistantBuilder(): StreamJsonAssistantMessageBuilder { + return new StreamJsonAssistantMessageBuilder( + this, + this.includePartialMessages, + this.sessionId, + this.model, + ); + } + + emitUserMessageFromParts(parts: Part[], parentToolUseId?: string): void { + const envelope: StreamJsonOutputEnvelope = { + type: 'user', + message: { + role: 'user', + content: this.partsToString(parts), + }, + parent_tool_use_id: parentToolUseId, + }; + this.writeEnvelope(envelope); + } + + emitToolResult( + request: ToolCallRequestInfo, + response: ToolCallResponseInfo, + ): void { + const block: StreamJsonToolResultBlock = { + type: 'tool_result', + tool_use_id: request.callId, + is_error: Boolean(response.error), + }; + const content = this.toolResultContent(response); + if (content !== undefined) { + block.content = content; + } + + const envelope: StreamJsonOutputEnvelope = { + type: 'user', + message: { + content: [block], + }, + parent_tool_use_id: request.callId, + }; + this.writeEnvelope(envelope); + } + + emitResult(options: StreamJsonResultOptions): void { + const envelope: StreamJsonOutputEnvelope = { + type: 'result', + subtype: + options.subtype ?? (options.isError ? 'error' : 'session_summary'), + is_error: options.isError, + session_id: this.sessionId, + num_turns: options.numTurns, + }; + + if (typeof options.durationMs === 'number') { + envelope.duration_ms = options.durationMs; + } + if (typeof options.apiDurationMs === 'number') { + envelope.duration_api_ms = options.apiDurationMs; + } + if (options.summary) { + envelope.summary = options.summary; + } + if (options.usage) { + envelope.usage = options.usage; + } + if (typeof options.totalCostUsd === 'number') { + envelope.total_cost_usd = options.totalCostUsd; + } + if (options.errorMessage) { + envelope.error = { message: options.errorMessage }; + } + + this.writeEnvelope(envelope); + } + + emitSystemMessage(subtype: string, data?: unknown): void { + const envelope: StreamJsonOutputEnvelope = { + type: 'system', + subtype, + session_id: this.sessionId, + data, + }; + this.writeEnvelope(envelope); + } + + emitStreamEvent(event: StreamJsonMessageStreamEvent): void { + if (!this.includePartialMessages) { + return; + } + const envelope: StreamJsonStreamEventEnvelope = { + type: 'stream_event', + uuid: randomUUID(), + session_id: this.sessionId, + event, + }; + this.writeEnvelope(envelope); + } + + writeEnvelope(envelope: StreamJsonOutputEnvelope): void { + const line = serializeStreamJsonEnvelope(envelope); + process.stdout.write(`${line}\n`); + } + + private toolResultContent( + response: ToolCallResponseInfo, + ): string | undefined { + if (typeof response.resultDisplay === 'string') { + return response.resultDisplay; + } + if (response.responseParts && response.responseParts.length > 0) { + return this.partsToString(response.responseParts); + } + if (response.error) { + return response.error.message; + } + return undefined; + } + + private partsToString(parts: Part[]): string { + return parts + .map((part) => { + if ('text' in part && typeof part.text === 'string') { + return part.text; + } + return JSON.stringify(part); + }) + .join(''); + } +} + +class StreamJsonAssistantMessageBuilder { + private readonly blocks: StreamJsonContentBlock[] = []; + private readonly openBlocks = new Set(); + private started = false; + private finalized = false; + private messageId: string | null = null; + + constructor( + private readonly writer: StreamJsonWriter, + private readonly includePartialMessages: boolean, + private readonly sessionId: string, + private readonly model: string, + ) {} + + appendText(fragment: string): void { + if (this.finalized) { + return; + } + this.ensureMessageStarted(); + + let currentBlock = this.blocks[this.blocks.length - 1]; + if (!currentBlock || currentBlock.type !== 'text') { + currentBlock = { type: 'text', text: '' }; + const index = this.blocks.length; + this.blocks.push(currentBlock); + this.openBlock(index, currentBlock); + } + + currentBlock.text += fragment; + const index = this.blocks.length - 1; + this.emitEvent({ + type: 'content_block_delta', + index, + delta: { type: 'text_delta', text: fragment }, + }); + } + + appendThinking(fragment: string): void { + if (this.finalized) { + return; + } + this.ensureMessageStarted(); + + let currentBlock = this.blocks[this.blocks.length - 1]; + if (!currentBlock || currentBlock.type !== 'thinking') { + currentBlock = { type: 'thinking', thinking: '' }; + const index = this.blocks.length; + this.blocks.push(currentBlock); + this.openBlock(index, currentBlock); + } + + currentBlock.thinking = `${currentBlock.thinking ?? ''}${fragment}`; + const index = this.blocks.length - 1; + this.emitEvent({ + type: 'content_block_delta', + index, + delta: { type: 'thinking_delta', thinking: fragment }, + }); + } + + appendToolUse(request: ToolCallRequestInfo): void { + if (this.finalized) { + return; + } + this.ensureMessageStarted(); + const index = this.blocks.length; + const block: StreamJsonContentBlock = { + type: 'tool_use', + id: request.callId, + name: request.name, + input: request.args, + }; + this.blocks.push(block); + this.openBlock(index, block); + this.emitEvent({ + type: 'content_block_delta', + index, + delta: { + type: 'input_json_delta', + partial_json: JSON.stringify(request.args ?? {}), + }, + }); + this.closeBlock(index); + } + + finalize(): StreamJsonAssistantEnvelope { + if (this.finalized) { + return { + type: 'assistant', + message: { + role: 'assistant', + model: this.model, + content: this.blocks, + }, + }; + } + this.finalized = true; + + const orderedOpenBlocks = [...this.openBlocks].sort((a, b) => a - b); + for (const index of orderedOpenBlocks) { + this.closeBlock(index); + } + + if (this.includePartialMessages && this.started) { + this.emitEvent({ + type: 'message_stop', + message: { + type: 'assistant', + role: 'assistant', + model: this.model, + session_id: this.sessionId, + id: this.messageId ?? undefined, + }, + }); + } + + const envelope: StreamJsonAssistantEnvelope = { + type: 'assistant', + message: { + role: 'assistant', + model: this.model, + content: this.blocks, + }, + }; + this.writer.writeEnvelope(envelope); + return envelope; + } + + private ensureMessageStarted(): void { + if (this.started) { + return; + } + this.started = true; + if (!this.messageId) { + this.messageId = randomUUID(); + } + this.emitEvent({ + type: 'message_start', + message: { + type: 'assistant', + role: 'assistant', + model: this.model, + session_id: this.sessionId, + id: this.messageId, + }, + }); + } + + private openBlock(index: number, block: StreamJsonContentBlock): void { + this.openBlocks.add(index); + this.emitEvent({ + type: 'content_block_start', + index, + content_block: block, + }); + } + + private closeBlock(index: number): void { + if (!this.openBlocks.has(index)) { + return; + } + this.openBlocks.delete(index); + this.emitEvent({ + type: 'content_block_stop', + index, + }); + } + + private emitEvent(event: StreamJsonMessageStreamEvent): void { + if (!this.includePartialMessages) { + return; + } + const enriched = this.messageId + ? { ...event, message_id: this.messageId } + : event; + this.writer.emitStreamEvent(enriched); + } +}