openspec/lightweight-tasks/task1-2-3.md

feat: add StreamJsonWriter and associated types for structured JSON streaming
This commit is contained in:
x22x22
2025-10-30 02:51:50 +08:00
parent eb1247d31e
commit c7ca1d40fd
3 changed files with 686 additions and 0 deletions

View File

@@ -0,0 +1,183 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
export type StreamJsonFormat = 'text' | 'stream-json';
export interface StreamJsonAnnotation {
type: string;
value: string;
}
export interface StreamJsonTextBlock {
type: 'text';
text: string;
annotations?: StreamJsonAnnotation[];
}
export interface StreamJsonThinkingBlock {
type: 'thinking';
thinking: string;
signature?: string;
annotations?: StreamJsonAnnotation[];
}
export interface StreamJsonToolUseBlock {
type: 'tool_use';
id: string;
name: string;
input: unknown;
annotations?: StreamJsonAnnotation[];
}
export interface StreamJsonToolResultBlock {
type: 'tool_result';
tool_use_id: string;
content?: StreamJsonContentBlock[] | string;
is_error?: boolean;
annotations?: StreamJsonAnnotation[];
}
export type StreamJsonContentBlock =
| StreamJsonTextBlock
| StreamJsonThinkingBlock
| StreamJsonToolUseBlock
| StreamJsonToolResultBlock;
export interface StreamJsonAssistantEnvelope {
type: 'assistant';
message: {
role: 'assistant';
model?: string;
content: StreamJsonContentBlock[];
};
parent_tool_use_id?: string;
}
export interface StreamJsonUserEnvelope {
type: 'user';
message: {
role?: 'user';
content: string | StreamJsonContentBlock[];
};
parent_tool_use_id?: string;
options?: Record<string, unknown>;
}
export interface StreamJsonSystemEnvelope {
type: 'system';
subtype?: string;
session_id?: string;
data?: unknown;
}
export interface StreamJsonUsage {
input_tokens?: number;
output_tokens?: number;
total_tokens?: number;
cache_creation_input_tokens?: number;
cache_read_input_tokens?: number;
}
export interface StreamJsonResultEnvelope {
type: 'result';
subtype?: string;
duration_ms?: number;
duration_api_ms?: number;
num_turns?: number;
session_id?: string;
is_error?: boolean;
summary?: string;
usage?: StreamJsonUsage;
total_cost_usd?: number;
error?: { type?: string; message: string; [key: string]: unknown };
[key: string]: unknown;
}
export interface StreamJsonMessageStreamEvent {
type: string;
index?: number;
delta?: unknown;
[key: string]: unknown;
}
export interface StreamJsonStreamEventEnvelope {
type: 'stream_event';
uuid: string;
session_id?: string;
event: StreamJsonMessageStreamEvent;
}
export interface StreamJsonControlRequestEnvelope {
type: 'control_request';
request_id: string;
request: {
subtype: string;
[key: string]: unknown;
};
}
export interface StreamJsonControlResponseEnvelope {
type: 'control_response';
request_id: string;
success?: boolean;
response?: unknown;
error?: string | { message: string; [key: string]: unknown };
}
export interface StreamJsonControlCancelRequestEnvelope {
type: 'control_cancel_request';
request_id?: string;
reason?: string;
}
export type StreamJsonOutputEnvelope =
| StreamJsonAssistantEnvelope
| StreamJsonUserEnvelope
| StreamJsonSystemEnvelope
| StreamJsonResultEnvelope
| StreamJsonStreamEventEnvelope
| StreamJsonControlRequestEnvelope
| StreamJsonControlResponseEnvelope
| StreamJsonControlCancelRequestEnvelope;
export type StreamJsonInputEnvelope =
| StreamJsonUserEnvelope
| StreamJsonControlRequestEnvelope
| StreamJsonControlResponseEnvelope
| StreamJsonControlCancelRequestEnvelope;
export type StreamJsonEnvelope =
| StreamJsonOutputEnvelope
| StreamJsonInputEnvelope;
export function serializeStreamJsonEnvelope(
envelope: StreamJsonOutputEnvelope,
): string {
return JSON.stringify(envelope);
}
export class StreamJsonParseError extends Error {}
export function parseStreamJsonEnvelope(line: string): StreamJsonEnvelope {
let parsed: unknown;
try {
parsed = JSON.parse(line) as StreamJsonEnvelope;
} catch (error) {
throw new StreamJsonParseError(
`Failed to parse stream-json line: ${
error instanceof Error ? error.message : String(error)
}`,
);
}
if (!parsed || typeof parsed !== 'object') {
throw new StreamJsonParseError('Parsed value is not an object');
}
const type = (parsed as { type?: unknown }).type;
if (typeof type !== 'string') {
throw new StreamJsonParseError('Missing required "type" field');
}
return parsed as StreamJsonEnvelope;
}

View File

@@ -0,0 +1,146 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import type { Config, ToolCallRequestInfo } from '@qwen-code/qwen-code-core';
import { StreamJsonWriter } from './writer.js';
function createConfig(): Config {
return {
getSessionId: () => 'session-test',
getModel: () => 'model-test',
} as unknown as Config;
}
function parseEnvelopes(writes: string[]): unknown[] {
return writes
.join('')
.split('\n')
.filter((line) => line.trim().length > 0)
.map((line) => JSON.parse(line));
}
describe('StreamJsonWriter', () => {
let writes: string[];
beforeEach(() => {
writes = [];
vi.spyOn(process.stdout, 'write').mockImplementation(
(chunk: string | Uint8Array) => {
if (typeof chunk === 'string') {
writes.push(chunk);
} else {
writes.push(Buffer.from(chunk).toString('utf8'));
}
return true;
},
);
});
afterEach(() => {
vi.restoreAllMocks();
});
it('emits result envelopes with usage and cost details', () => {
const writer = new StreamJsonWriter(createConfig(), false);
writer.emitResult({
isError: false,
numTurns: 2,
durationMs: 1200,
apiDurationMs: 800,
usage: {
input_tokens: 10,
output_tokens: 5,
total_tokens: 15,
cache_read_input_tokens: 2,
},
totalCostUsd: 0.123,
summary: 'Completed',
subtype: 'session_summary',
});
const [envelope] = parseEnvelopes(writes) as Array<Record<string, unknown>>;
expect(envelope).toMatchObject({
type: 'result',
duration_ms: 1200,
duration_api_ms: 800,
usage: {
input_tokens: 10,
output_tokens: 5,
total_tokens: 15,
cache_read_input_tokens: 2,
},
total_cost_usd: 0.123,
summary: 'Completed',
subtype: 'session_summary',
is_error: false,
});
});
it('emits thinking deltas and assistant messages for thought blocks', () => {
const writer = new StreamJsonWriter(createConfig(), true);
const builder = writer.createAssistantBuilder();
builder.appendThinking('Reflecting');
builder.appendThinking(' more');
builder.finalize();
const envelopes = parseEnvelopes(writes) as Array<Record<string, unknown>>;
expect(
envelopes.some(
(env) =>
env.type === 'stream_event' &&
env.event?.type === 'content_block_delta' &&
env.event?.delta?.type === 'thinking_delta',
),
).toBe(true);
const assistantEnvelope = envelopes.find((env) => env.type === 'assistant');
expect(assistantEnvelope?.message?.content?.[0]).toEqual({
type: 'thinking',
thinking: 'Reflecting more',
});
});
it('emits input_json_delta events when tool calls are appended', () => {
const writer = new StreamJsonWriter(createConfig(), true);
const builder = writer.createAssistantBuilder();
const request: ToolCallRequestInfo = {
callId: 'tool-123',
name: 'write_file',
args: { path: 'foo.ts', content: 'console.log(1);' },
isClientInitiated: false,
prompt_id: 'prompt-1',
};
builder.appendToolUse(request);
builder.finalize();
const envelopes = parseEnvelopes(writes) as Array<Record<string, unknown>>;
expect(
envelopes.some(
(env) =>
env.type === 'stream_event' &&
env.event?.type === 'content_block_delta' &&
env.event?.delta?.type === 'input_json_delta',
),
).toBe(true);
});
it('includes session id in system messages', () => {
const writer = new StreamJsonWriter(createConfig(), false);
writer.emitSystemMessage('init', { foo: 'bar' });
const [envelope] = parseEnvelopes(writes) as Array<Record<string, unknown>>;
expect(envelope).toMatchObject({
type: 'system',
subtype: 'init',
session_id: 'session-test',
data: { foo: 'bar' },
});
});
});

View File

@@ -0,0 +1,357 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { randomUUID } from 'node:crypto';
import type {
Config,
ToolCallRequestInfo,
ToolCallResponseInfo,
} from '@qwen-code/qwen-code-core';
import type { Part } from '@google/genai';
import {
serializeStreamJsonEnvelope,
type StreamJsonAssistantEnvelope,
type StreamJsonContentBlock,
type StreamJsonMessageStreamEvent,
type StreamJsonOutputEnvelope,
type StreamJsonStreamEventEnvelope,
type StreamJsonUsage,
type StreamJsonToolResultBlock,
} from './types.js';
export interface StreamJsonResultOptions {
readonly isError: boolean;
readonly errorMessage?: string;
readonly durationMs?: number;
readonly apiDurationMs?: number;
readonly numTurns: number;
readonly usage?: StreamJsonUsage;
readonly totalCostUsd?: number;
readonly summary?: string;
readonly subtype?: string;
}
export class StreamJsonWriter {
private readonly includePartialMessages: boolean;
private readonly sessionId: string;
private readonly model: string;
constructor(config: Config, includePartialMessages: boolean) {
this.includePartialMessages = includePartialMessages;
this.sessionId = config.getSessionId();
this.model = config.getModel();
}
createAssistantBuilder(): StreamJsonAssistantMessageBuilder {
return new StreamJsonAssistantMessageBuilder(
this,
this.includePartialMessages,
this.sessionId,
this.model,
);
}
emitUserMessageFromParts(parts: Part[], parentToolUseId?: string): void {
const envelope: StreamJsonOutputEnvelope = {
type: 'user',
message: {
role: 'user',
content: this.partsToString(parts),
},
parent_tool_use_id: parentToolUseId,
};
this.writeEnvelope(envelope);
}
emitToolResult(
request: ToolCallRequestInfo,
response: ToolCallResponseInfo,
): void {
const block: StreamJsonToolResultBlock = {
type: 'tool_result',
tool_use_id: request.callId,
is_error: Boolean(response.error),
};
const content = this.toolResultContent(response);
if (content !== undefined) {
block.content = content;
}
const envelope: StreamJsonOutputEnvelope = {
type: 'user',
message: {
content: [block],
},
parent_tool_use_id: request.callId,
};
this.writeEnvelope(envelope);
}
emitResult(options: StreamJsonResultOptions): void {
const envelope: StreamJsonOutputEnvelope = {
type: 'result',
subtype:
options.subtype ?? (options.isError ? 'error' : 'session_summary'),
is_error: options.isError,
session_id: this.sessionId,
num_turns: options.numTurns,
};
if (typeof options.durationMs === 'number') {
envelope.duration_ms = options.durationMs;
}
if (typeof options.apiDurationMs === 'number') {
envelope.duration_api_ms = options.apiDurationMs;
}
if (options.summary) {
envelope.summary = options.summary;
}
if (options.usage) {
envelope.usage = options.usage;
}
if (typeof options.totalCostUsd === 'number') {
envelope.total_cost_usd = options.totalCostUsd;
}
if (options.errorMessage) {
envelope.error = { message: options.errorMessage };
}
this.writeEnvelope(envelope);
}
emitSystemMessage(subtype: string, data?: unknown): void {
const envelope: StreamJsonOutputEnvelope = {
type: 'system',
subtype,
session_id: this.sessionId,
data,
};
this.writeEnvelope(envelope);
}
emitStreamEvent(event: StreamJsonMessageStreamEvent): void {
if (!this.includePartialMessages) {
return;
}
const envelope: StreamJsonStreamEventEnvelope = {
type: 'stream_event',
uuid: randomUUID(),
session_id: this.sessionId,
event,
};
this.writeEnvelope(envelope);
}
writeEnvelope(envelope: StreamJsonOutputEnvelope): void {
const line = serializeStreamJsonEnvelope(envelope);
process.stdout.write(`${line}\n`);
}
private toolResultContent(
response: ToolCallResponseInfo,
): string | undefined {
if (typeof response.resultDisplay === 'string') {
return response.resultDisplay;
}
if (response.responseParts && response.responseParts.length > 0) {
return this.partsToString(response.responseParts);
}
if (response.error) {
return response.error.message;
}
return undefined;
}
private partsToString(parts: Part[]): string {
return parts
.map((part) => {
if ('text' in part && typeof part.text === 'string') {
return part.text;
}
return JSON.stringify(part);
})
.join('');
}
}
class StreamJsonAssistantMessageBuilder {
private readonly blocks: StreamJsonContentBlock[] = [];
private readonly openBlocks = new Set<number>();
private started = false;
private finalized = false;
private messageId: string | null = null;
constructor(
private readonly writer: StreamJsonWriter,
private readonly includePartialMessages: boolean,
private readonly sessionId: string,
private readonly model: string,
) {}
appendText(fragment: string): void {
if (this.finalized) {
return;
}
this.ensureMessageStarted();
let currentBlock = this.blocks[this.blocks.length - 1];
if (!currentBlock || currentBlock.type !== 'text') {
currentBlock = { type: 'text', text: '' };
const index = this.blocks.length;
this.blocks.push(currentBlock);
this.openBlock(index, currentBlock);
}
currentBlock.text += fragment;
const index = this.blocks.length - 1;
this.emitEvent({
type: 'content_block_delta',
index,
delta: { type: 'text_delta', text: fragment },
});
}
appendThinking(fragment: string): void {
if (this.finalized) {
return;
}
this.ensureMessageStarted();
let currentBlock = this.blocks[this.blocks.length - 1];
if (!currentBlock || currentBlock.type !== 'thinking') {
currentBlock = { type: 'thinking', thinking: '' };
const index = this.blocks.length;
this.blocks.push(currentBlock);
this.openBlock(index, currentBlock);
}
currentBlock.thinking = `${currentBlock.thinking ?? ''}${fragment}`;
const index = this.blocks.length - 1;
this.emitEvent({
type: 'content_block_delta',
index,
delta: { type: 'thinking_delta', thinking: fragment },
});
}
appendToolUse(request: ToolCallRequestInfo): void {
if (this.finalized) {
return;
}
this.ensureMessageStarted();
const index = this.blocks.length;
const block: StreamJsonContentBlock = {
type: 'tool_use',
id: request.callId,
name: request.name,
input: request.args,
};
this.blocks.push(block);
this.openBlock(index, block);
this.emitEvent({
type: 'content_block_delta',
index,
delta: {
type: 'input_json_delta',
partial_json: JSON.stringify(request.args ?? {}),
},
});
this.closeBlock(index);
}
finalize(): StreamJsonAssistantEnvelope {
if (this.finalized) {
return {
type: 'assistant',
message: {
role: 'assistant',
model: this.model,
content: this.blocks,
},
};
}
this.finalized = true;
const orderedOpenBlocks = [...this.openBlocks].sort((a, b) => a - b);
for (const index of orderedOpenBlocks) {
this.closeBlock(index);
}
if (this.includePartialMessages && this.started) {
this.emitEvent({
type: 'message_stop',
message: {
type: 'assistant',
role: 'assistant',
model: this.model,
session_id: this.sessionId,
id: this.messageId ?? undefined,
},
});
}
const envelope: StreamJsonAssistantEnvelope = {
type: 'assistant',
message: {
role: 'assistant',
model: this.model,
content: this.blocks,
},
};
this.writer.writeEnvelope(envelope);
return envelope;
}
private ensureMessageStarted(): void {
if (this.started) {
return;
}
this.started = true;
if (!this.messageId) {
this.messageId = randomUUID();
}
this.emitEvent({
type: 'message_start',
message: {
type: 'assistant',
role: 'assistant',
model: this.model,
session_id: this.sessionId,
id: this.messageId,
},
});
}
private openBlock(index: number, block: StreamJsonContentBlock): void {
this.openBlocks.add(index);
this.emitEvent({
type: 'content_block_start',
index,
content_block: block,
});
}
private closeBlock(index: number): void {
if (!this.openBlocks.has(index)) {
return;
}
this.openBlocks.delete(index);
this.emitEvent({
type: 'content_block_stop',
index,
});
}
private emitEvent(event: StreamJsonMessageStreamEvent): void {
if (!this.includePartialMessages) {
return;
}
const enriched = this.messageId
? { ...event, message_id: this.messageId }
: event;
this.writer.emitStreamEvent(enriched);
}
}