From f5f378f262657dfa5940657a2cb1bc9d34d14ca3 Mon Sep 17 00:00:00 2001 From: x22x22 Date: Thu, 30 Oct 2025 12:59:05 +0800 Subject: [PATCH] openspec/lightweight-tasks/task1-2-4-1.md Implement control request handling and refactor related functions - Added `handleIncomingControlRequest` method to `StreamJsonController` for processing control requests. - Created `input.test.ts` and `session.test.ts` to test control request handling. - Refactored `runStreamJsonSession` to delegate control requests to the controller. - Moved `extractUserMessageText` and `writeStreamJsonEnvelope` to a new `io.ts` file for better organization. - Updated tests to ensure proper functionality of control responses and message extraction. --- packages/cli/src/streamJson/controller.ts | 39 ++++++++++++ packages/cli/src/streamJson/input.test.ts | 47 ++++++++++++++ packages/cli/src/streamJson/input.ts | 34 ++--------- packages/cli/src/streamJson/io.ts | 41 +++++++++++++ packages/cli/src/streamJson/session.test.ts | 68 +++++++++++++++++++++ packages/cli/src/streamJson/session.ts | 43 +------------ packages/cli/src/streamJson/writer.test.ts | 40 +++++++----- packages/cli/src/streamJson/writer.ts | 5 +- 8 files changed, 228 insertions(+), 89 deletions(-) create mode 100644 packages/cli/src/streamJson/input.test.ts create mode 100644 packages/cli/src/streamJson/io.ts create mode 100644 packages/cli/src/streamJson/session.test.ts diff --git a/packages/cli/src/streamJson/controller.ts b/packages/cli/src/streamJson/controller.ts index 4214b89b..7ed8fe71 100644 --- a/packages/cli/src/streamJson/controller.ts +++ b/packages/cli/src/streamJson/controller.ts @@ -5,9 +5,11 @@ */ import { randomUUID } from 'node:crypto'; +import type { Config } from '@qwen-code/qwen-code-core'; import type { StreamJsonWriter } from './writer.js'; import type { StreamJsonControlCancelRequestEnvelope, + StreamJsonControlRequestEnvelope, StreamJsonControlResponseEnvelope, StreamJsonOutputEnvelope, } from './types.js'; @@ -28,6 +30,43 @@ export class StreamJsonController { constructor(private readonly writer: StreamJsonWriter) {} + handleIncomingControlRequest( + config: Config, + envelope: StreamJsonControlRequestEnvelope, + ): boolean { + const subtype = envelope.request?.subtype; + switch (subtype) { + case 'initialize': + this.writer.emitSystemMessage('session_initialized', { + session_id: config.getSessionId(), + }); + this.writer.writeEnvelope({ + type: 'control_response', + request_id: envelope.request_id, + success: true, + response: { subtype: 'initialize' }, + }); + return true; + case 'interrupt': + this.interruptActiveRun(); + this.writer.writeEnvelope({ + type: 'control_response', + request_id: envelope.request_id, + success: true, + response: { subtype: 'interrupt' }, + }); + return true; + default: + this.writer.writeEnvelope({ + type: 'control_response', + request_id: envelope.request_id, + success: false, + error: `Unsupported control_request subtype: ${subtype ?? 'unknown'}`, + }); + return false; + } + } + sendControlRequest( subtype: string, payload: Record, diff --git a/packages/cli/src/streamJson/input.test.ts b/packages/cli/src/streamJson/input.test.ts new file mode 100644 index 00000000..107b485e --- /dev/null +++ b/packages/cli/src/streamJson/input.test.ts @@ -0,0 +1,47 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { afterEach, describe, expect, it, vi } from 'vitest'; +import { parseStreamJsonInputFromIterable } from './input.js'; +import * as ioModule from './io.js'; + +describe('parseStreamJsonInputFromIterable', () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('uses the shared stream writer for control responses', async () => { + const writeSpy = vi + .spyOn(ioModule, 'writeStreamJsonEnvelope') + .mockImplementation(() => {}); + + async function* makeLines(): AsyncGenerator { + yield JSON.stringify({ + type: 'control_request', + request_id: 'req-init', + request: { subtype: 'initialize' }, + }); + yield JSON.stringify({ + type: 'user', + message: { + role: 'user', + content: [{ type: 'text', text: 'hello world' }], + }, + }); + } + + const result = await parseStreamJsonInputFromIterable(makeLines()); + + expect(result.prompt).toBe('hello world'); + expect(writeSpy).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'control_response', + request_id: 'req-init', + success: true, + }), + ); + }); +}); diff --git a/packages/cli/src/streamJson/input.ts b/packages/cli/src/streamJson/input.ts index 0da040f7..946e3a74 100644 --- a/packages/cli/src/streamJson/input.ts +++ b/packages/cli/src/streamJson/input.ts @@ -8,12 +8,11 @@ import { createInterface } from 'node:readline/promises'; import process from 'node:process'; import { parseStreamJsonEnvelope, - serializeStreamJsonEnvelope, type StreamJsonControlRequestEnvelope, type StreamJsonOutputEnvelope, - type StreamJsonUserEnvelope, } from './types.js'; import { FatalInputError } from '@qwen-code/qwen-code-core'; +import { extractUserMessageText, writeStreamJsonEnvelope } from './io.js'; export interface ParsedStreamJsonInput { prompt: string; @@ -35,7 +34,9 @@ export async function readStreamJsonInput(): Promise { export async function parseStreamJsonInputFromIterable( lines: AsyncIterable, - emitEnvelope: (envelope: StreamJsonOutputEnvelope) => void = writeEnvelope, + emitEnvelope: ( + envelope: StreamJsonOutputEnvelope, + ) => void = writeStreamJsonEnvelope, ): Promise { const promptParts: string[] = []; let receivedUserMessage = false; @@ -104,29 +105,4 @@ function handleControlRequest( }); } -export function extractUserMessageText( - envelope: StreamJsonUserEnvelope, -): string { - const content = envelope.message?.content; - if (typeof content === 'string') { - return content; - } - if (Array.isArray(content)) { - return content - .map((block) => { - if (block && typeof block === 'object' && 'type' in block) { - if (block.type === 'text' && 'text' in block) { - return block.text ?? ''; - } - return JSON.stringify(block); - } - return ''; - }) - .join('\n'); - } - return ''; -} - -function writeEnvelope(envelope: StreamJsonOutputEnvelope): void { - process.stdout.write(`${serializeStreamJsonEnvelope(envelope)}\n`); -} +export { extractUserMessageText } from './io.js'; diff --git a/packages/cli/src/streamJson/io.ts b/packages/cli/src/streamJson/io.ts new file mode 100644 index 00000000..dd0e1299 --- /dev/null +++ b/packages/cli/src/streamJson/io.ts @@ -0,0 +1,41 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import process from 'node:process'; +import { + serializeStreamJsonEnvelope, + type StreamJsonOutputEnvelope, + type StreamJsonUserEnvelope, +} from './types.js'; + +export function writeStreamJsonEnvelope( + envelope: StreamJsonOutputEnvelope, +): void { + process.stdout.write(`${serializeStreamJsonEnvelope(envelope)}\n`); +} + +export function extractUserMessageText( + envelope: StreamJsonUserEnvelope, +): string { + const content = envelope.message?.content; + if (typeof content === 'string') { + return content; + } + if (Array.isArray(content)) { + return content + .map((block) => { + if (block && typeof block === 'object' && 'type' in block) { + if (block.type === 'text' && 'text' in block) { + return block.text ?? ''; + } + return JSON.stringify(block); + } + return ''; + }) + .join('\n'); + } + return ''; +} diff --git a/packages/cli/src/streamJson/session.test.ts b/packages/cli/src/streamJson/session.test.ts new file mode 100644 index 00000000..a9fa3dd9 --- /dev/null +++ b/packages/cli/src/streamJson/session.test.ts @@ -0,0 +1,68 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { PassThrough } from 'node:stream'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import type { Config } from '@qwen-code/qwen-code-core'; +import { runStreamJsonSession } from './session.js'; +import { StreamJsonController } from './controller.js'; +import { StreamJsonWriter } from './writer.js'; +import type { LoadedSettings } from '../config/settings.js'; + +vi.mock('../nonInteractiveCli.js', () => ({ + runNonInteractive: vi.fn().mockResolvedValue(undefined), +})); + +function createConfig(): Config { + return { + getIncludePartialMessages: () => false, + getSessionId: () => 'session-test', + getModel: () => 'model-test', + } as unknown as Config; +} + +describe('runStreamJsonSession', () => { + let settings: LoadedSettings; + + beforeEach(() => { + vi.spyOn(process.stdout, 'write').mockImplementation(() => true); + settings = {} as LoadedSettings; + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('delegates incoming control requests to the controller', async () => { + const controllerPrototype = StreamJsonController.prototype as unknown as { + handleIncomingControlRequest: (...args: unknown[]) => unknown; + }; + const handleSpy = vi.spyOn( + controllerPrototype, + 'handleIncomingControlRequest', + ); + + const inputStream = new PassThrough(); + const config = createConfig(); + + const controlRequest = { + type: 'control_request', + request_id: 'req-1', + request: { subtype: 'initialize' }, + }; + + inputStream.end(`${JSON.stringify(controlRequest)}\n`); + + await runStreamJsonSession(config, settings, undefined, { + input: inputStream, + writer: new StreamJsonWriter(config, false), + }); + + expect(handleSpy).toHaveBeenCalledTimes(1); + const firstCall = handleSpy.mock.calls[0] as unknown[] | undefined; + expect(firstCall?.[1]).toMatchObject(controlRequest); + }); +}); diff --git a/packages/cli/src/streamJson/session.ts b/packages/cli/src/streamJson/session.ts index de9ac9c6..187b4bde 100644 --- a/packages/cli/src/streamJson/session.ts +++ b/packages/cli/src/streamJson/session.ts @@ -9,10 +9,9 @@ import type { Config } from '@qwen-code/qwen-code-core'; import { parseStreamJsonEnvelope, type StreamJsonEnvelope, - type StreamJsonControlRequestEnvelope, type StreamJsonUserEnvelope, } from './types.js'; -import { extractUserMessageText } from './input.js'; +import { extractUserMessageText } from './io.js'; import { StreamJsonWriter } from './writer.js'; import { StreamJsonController } from './controller.js'; import { runNonInteractive } from '../nonInteractiveCli.js'; @@ -124,7 +123,7 @@ export async function runStreamJsonSession( }); break; case 'control_request': - await handleControlRequest(config, controller, envelope, writer); + controller.handleIncomingControlRequest(config, envelope); break; case 'control_response': controller.handleControlResponse(envelope); @@ -174,41 +173,3 @@ async function handleUserPrompt( userEnvelope: job.envelope, }); } - -async function handleControlRequest( - config: Config, - controller: StreamJsonController, - envelope: StreamJsonControlRequestEnvelope, - writer: StreamJsonWriter, -): Promise { - const subtype = envelope.request?.subtype; - switch (subtype) { - case 'initialize': - writer.emitSystemMessage('session_initialized', { - session_id: config.getSessionId(), - }); - controller.handleControlResponse({ - type: 'control_response', - request_id: envelope.request_id, - success: true, - response: { subtype: 'initialize' }, - }); - break; - case 'interrupt': - controller.interruptActiveRun(); - controller.handleControlResponse({ - type: 'control_response', - request_id: envelope.request_id, - success: true, - response: { subtype: 'interrupt' }, - }); - break; - default: - controller.handleControlResponse({ - type: 'control_response', - request_id: envelope.request_id, - success: false, - error: `Unsupported control_request subtype: ${subtype ?? 'unknown'}`, - }); - } -} diff --git a/packages/cli/src/streamJson/writer.test.ts b/packages/cli/src/streamJson/writer.test.ts index 6c9ece11..7e7639a8 100644 --- a/packages/cli/src/streamJson/writer.test.ts +++ b/packages/cli/src/streamJson/writer.test.ts @@ -90,14 +90,18 @@ describe('StreamJsonWriter', () => { const envelopes = parseEnvelopes(writes); - expect( - envelopes.some( - (env) => - env.type === 'stream_event' && - env.event?.type === 'content_block_delta' && - env.event?.delta?.type === 'thinking_delta', - ), - ).toBe(true); + const hasThinkingDelta = envelopes.some((env) => { + if (env.type !== 'stream_event') { + return false; + } + if (env.event?.type !== 'content_block_delta') { + return false; + } + const delta = env.event.delta as { type?: string } | undefined; + return delta?.type === 'thinking_delta'; + }); + + expect(hasThinkingDelta).toBe(true); const assistantEnvelope = envelopes.find((env) => env.type === 'assistant'); expect(assistantEnvelope?.message.content?.[0]).toEqual({ @@ -122,14 +126,18 @@ describe('StreamJsonWriter', () => { const envelopes = parseEnvelopes(writes); - expect( - envelopes.some( - (env) => - env.type === 'stream_event' && - env.event?.type === 'content_block_delta' && - env.event?.delta?.type === 'input_json_delta', - ), - ).toBe(true); + const hasInputJsonDelta = envelopes.some((env) => { + if (env.type !== 'stream_event') { + return false; + } + if (env.event?.type !== 'content_block_delta') { + return false; + } + const delta = env.event.delta as { type?: string } | undefined; + return delta?.type === 'input_json_delta'; + }); + + expect(hasInputJsonDelta).toBe(true); }); it('includes session id in system messages', () => { diff --git a/packages/cli/src/streamJson/writer.ts b/packages/cli/src/streamJson/writer.ts index acb8bd50..2f1f3da4 100644 --- a/packages/cli/src/streamJson/writer.ts +++ b/packages/cli/src/streamJson/writer.ts @@ -12,7 +12,6 @@ import type { } from '@qwen-code/qwen-code-core'; import type { Part } from '@google/genai'; import { - serializeStreamJsonEnvelope, type StreamJsonAssistantEnvelope, type StreamJsonContentBlock, type StreamJsonMessageStreamEvent, @@ -21,6 +20,7 @@ import { type StreamJsonUsage, type StreamJsonToolResultBlock, } from './types.js'; +import { writeStreamJsonEnvelope } from './io.js'; export interface StreamJsonResultOptions { readonly isError: boolean; @@ -146,8 +146,7 @@ export class StreamJsonWriter { } writeEnvelope(envelope: StreamJsonOutputEnvelope): void { - const line = serializeStreamJsonEnvelope(envelope); - process.stdout.write(`${line}\n`); + writeStreamJsonEnvelope(envelope); } private toolResultContent(