openspec/lightweight-tasks/task1-2-4-1.md

Implement control request handling and refactor related functions

- Added `handleIncomingControlRequest` method to `StreamJsonController` for processing control requests.
- Created `input.test.ts` and `session.test.ts` to test control request handling.
- Refactored `runStreamJsonSession` to delegate control requests to the controller.
- Moved `extractUserMessageText` and `writeStreamJsonEnvelope` to a new `io.ts` file for better organization.
- Updated tests to ensure proper functionality of control responses and message extraction.
This commit is contained in:
x22x22
2025-10-30 12:59:05 +08:00
parent e25d68afe9
commit f5f378f262
8 changed files with 228 additions and 89 deletions

View File

@@ -5,9 +5,11 @@
*/
import { randomUUID } from 'node:crypto';
import type { Config } from '@qwen-code/qwen-code-core';
import type { StreamJsonWriter } from './writer.js';
import type {
StreamJsonControlCancelRequestEnvelope,
StreamJsonControlRequestEnvelope,
StreamJsonControlResponseEnvelope,
StreamJsonOutputEnvelope,
} from './types.js';
@@ -28,6 +30,43 @@ export class StreamJsonController {
constructor(private readonly writer: StreamJsonWriter) {}
handleIncomingControlRequest(
config: Config,
envelope: StreamJsonControlRequestEnvelope,
): boolean {
const subtype = envelope.request?.subtype;
switch (subtype) {
case 'initialize':
this.writer.emitSystemMessage('session_initialized', {
session_id: config.getSessionId(),
});
this.writer.writeEnvelope({
type: 'control_response',
request_id: envelope.request_id,
success: true,
response: { subtype: 'initialize' },
});
return true;
case 'interrupt':
this.interruptActiveRun();
this.writer.writeEnvelope({
type: 'control_response',
request_id: envelope.request_id,
success: true,
response: { subtype: 'interrupt' },
});
return true;
default:
this.writer.writeEnvelope({
type: 'control_response',
request_id: envelope.request_id,
success: false,
error: `Unsupported control_request subtype: ${subtype ?? 'unknown'}`,
});
return false;
}
}
sendControlRequest(
subtype: string,
payload: Record<string, unknown>,

View File

@@ -0,0 +1,47 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { afterEach, describe, expect, it, vi } from 'vitest';
import { parseStreamJsonInputFromIterable } from './input.js';
import * as ioModule from './io.js';
describe('parseStreamJsonInputFromIterable', () => {
afterEach(() => {
vi.restoreAllMocks();
});
it('uses the shared stream writer for control responses', async () => {
const writeSpy = vi
.spyOn(ioModule, 'writeStreamJsonEnvelope')
.mockImplementation(() => {});
async function* makeLines(): AsyncGenerator<string> {
yield JSON.stringify({
type: 'control_request',
request_id: 'req-init',
request: { subtype: 'initialize' },
});
yield JSON.stringify({
type: 'user',
message: {
role: 'user',
content: [{ type: 'text', text: 'hello world' }],
},
});
}
const result = await parseStreamJsonInputFromIterable(makeLines());
expect(result.prompt).toBe('hello world');
expect(writeSpy).toHaveBeenCalledWith(
expect.objectContaining({
type: 'control_response',
request_id: 'req-init',
success: true,
}),
);
});
});

View File

@@ -8,12 +8,11 @@ import { createInterface } from 'node:readline/promises';
import process from 'node:process';
import {
parseStreamJsonEnvelope,
serializeStreamJsonEnvelope,
type StreamJsonControlRequestEnvelope,
type StreamJsonOutputEnvelope,
type StreamJsonUserEnvelope,
} from './types.js';
import { FatalInputError } from '@qwen-code/qwen-code-core';
import { extractUserMessageText, writeStreamJsonEnvelope } from './io.js';
export interface ParsedStreamJsonInput {
prompt: string;
@@ -35,7 +34,9 @@ export async function readStreamJsonInput(): Promise<ParsedStreamJsonInput> {
export async function parseStreamJsonInputFromIterable(
lines: AsyncIterable<string>,
emitEnvelope: (envelope: StreamJsonOutputEnvelope) => void = writeEnvelope,
emitEnvelope: (
envelope: StreamJsonOutputEnvelope,
) => void = writeStreamJsonEnvelope,
): Promise<ParsedStreamJsonInput> {
const promptParts: string[] = [];
let receivedUserMessage = false;
@@ -104,29 +105,4 @@ function handleControlRequest(
});
}
export function extractUserMessageText(
envelope: StreamJsonUserEnvelope,
): string {
const content = envelope.message?.content;
if (typeof content === 'string') {
return content;
}
if (Array.isArray(content)) {
return content
.map((block) => {
if (block && typeof block === 'object' && 'type' in block) {
if (block.type === 'text' && 'text' in block) {
return block.text ?? '';
}
return JSON.stringify(block);
}
return '';
})
.join('\n');
}
return '';
}
function writeEnvelope(envelope: StreamJsonOutputEnvelope): void {
process.stdout.write(`${serializeStreamJsonEnvelope(envelope)}\n`);
}
export { extractUserMessageText } from './io.js';

View File

@@ -0,0 +1,41 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import process from 'node:process';
import {
serializeStreamJsonEnvelope,
type StreamJsonOutputEnvelope,
type StreamJsonUserEnvelope,
} from './types.js';
export function writeStreamJsonEnvelope(
envelope: StreamJsonOutputEnvelope,
): void {
process.stdout.write(`${serializeStreamJsonEnvelope(envelope)}\n`);
}
export function extractUserMessageText(
envelope: StreamJsonUserEnvelope,
): string {
const content = envelope.message?.content;
if (typeof content === 'string') {
return content;
}
if (Array.isArray(content)) {
return content
.map((block) => {
if (block && typeof block === 'object' && 'type' in block) {
if (block.type === 'text' && 'text' in block) {
return block.text ?? '';
}
return JSON.stringify(block);
}
return '';
})
.join('\n');
}
return '';
}

View File

@@ -0,0 +1,68 @@
/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { PassThrough } from 'node:stream';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import type { Config } from '@qwen-code/qwen-code-core';
import { runStreamJsonSession } from './session.js';
import { StreamJsonController } from './controller.js';
import { StreamJsonWriter } from './writer.js';
import type { LoadedSettings } from '../config/settings.js';
vi.mock('../nonInteractiveCli.js', () => ({
runNonInteractive: vi.fn().mockResolvedValue(undefined),
}));
function createConfig(): Config {
return {
getIncludePartialMessages: () => false,
getSessionId: () => 'session-test',
getModel: () => 'model-test',
} as unknown as Config;
}
describe('runStreamJsonSession', () => {
let settings: LoadedSettings;
beforeEach(() => {
vi.spyOn(process.stdout, 'write').mockImplementation(() => true);
settings = {} as LoadedSettings;
});
afterEach(() => {
vi.restoreAllMocks();
});
it('delegates incoming control requests to the controller', async () => {
const controllerPrototype = StreamJsonController.prototype as unknown as {
handleIncomingControlRequest: (...args: unknown[]) => unknown;
};
const handleSpy = vi.spyOn(
controllerPrototype,
'handleIncomingControlRequest',
);
const inputStream = new PassThrough();
const config = createConfig();
const controlRequest = {
type: 'control_request',
request_id: 'req-1',
request: { subtype: 'initialize' },
};
inputStream.end(`${JSON.stringify(controlRequest)}\n`);
await runStreamJsonSession(config, settings, undefined, {
input: inputStream,
writer: new StreamJsonWriter(config, false),
});
expect(handleSpy).toHaveBeenCalledTimes(1);
const firstCall = handleSpy.mock.calls[0] as unknown[] | undefined;
expect(firstCall?.[1]).toMatchObject(controlRequest);
});
});

View File

@@ -9,10 +9,9 @@ import type { Config } from '@qwen-code/qwen-code-core';
import {
parseStreamJsonEnvelope,
type StreamJsonEnvelope,
type StreamJsonControlRequestEnvelope,
type StreamJsonUserEnvelope,
} from './types.js';
import { extractUserMessageText } from './input.js';
import { extractUserMessageText } from './io.js';
import { StreamJsonWriter } from './writer.js';
import { StreamJsonController } from './controller.js';
import { runNonInteractive } from '../nonInteractiveCli.js';
@@ -124,7 +123,7 @@ export async function runStreamJsonSession(
});
break;
case 'control_request':
await handleControlRequest(config, controller, envelope, writer);
controller.handleIncomingControlRequest(config, envelope);
break;
case 'control_response':
controller.handleControlResponse(envelope);
@@ -174,41 +173,3 @@ async function handleUserPrompt(
userEnvelope: job.envelope,
});
}
async function handleControlRequest(
config: Config,
controller: StreamJsonController,
envelope: StreamJsonControlRequestEnvelope,
writer: StreamJsonWriter,
): Promise<void> {
const subtype = envelope.request?.subtype;
switch (subtype) {
case 'initialize':
writer.emitSystemMessage('session_initialized', {
session_id: config.getSessionId(),
});
controller.handleControlResponse({
type: 'control_response',
request_id: envelope.request_id,
success: true,
response: { subtype: 'initialize' },
});
break;
case 'interrupt':
controller.interruptActiveRun();
controller.handleControlResponse({
type: 'control_response',
request_id: envelope.request_id,
success: true,
response: { subtype: 'interrupt' },
});
break;
default:
controller.handleControlResponse({
type: 'control_response',
request_id: envelope.request_id,
success: false,
error: `Unsupported control_request subtype: ${subtype ?? 'unknown'}`,
});
}
}

View File

@@ -90,14 +90,18 @@ describe('StreamJsonWriter', () => {
const envelopes = parseEnvelopes(writes);
expect(
envelopes.some(
(env) =>
env.type === 'stream_event' &&
env.event?.type === 'content_block_delta' &&
env.event?.delta?.type === 'thinking_delta',
),
).toBe(true);
const hasThinkingDelta = envelopes.some((env) => {
if (env.type !== 'stream_event') {
return false;
}
if (env.event?.type !== 'content_block_delta') {
return false;
}
const delta = env.event.delta as { type?: string } | undefined;
return delta?.type === 'thinking_delta';
});
expect(hasThinkingDelta).toBe(true);
const assistantEnvelope = envelopes.find((env) => env.type === 'assistant');
expect(assistantEnvelope?.message.content?.[0]).toEqual({
@@ -122,14 +126,18 @@ describe('StreamJsonWriter', () => {
const envelopes = parseEnvelopes(writes);
expect(
envelopes.some(
(env) =>
env.type === 'stream_event' &&
env.event?.type === 'content_block_delta' &&
env.event?.delta?.type === 'input_json_delta',
),
).toBe(true);
const hasInputJsonDelta = envelopes.some((env) => {
if (env.type !== 'stream_event') {
return false;
}
if (env.event?.type !== 'content_block_delta') {
return false;
}
const delta = env.event.delta as { type?: string } | undefined;
return delta?.type === 'input_json_delta';
});
expect(hasInputJsonDelta).toBe(true);
});
it('includes session id in system messages', () => {

View File

@@ -12,7 +12,6 @@ import type {
} from '@qwen-code/qwen-code-core';
import type { Part } from '@google/genai';
import {
serializeStreamJsonEnvelope,
type StreamJsonAssistantEnvelope,
type StreamJsonContentBlock,
type StreamJsonMessageStreamEvent,
@@ -21,6 +20,7 @@ import {
type StreamJsonUsage,
type StreamJsonToolResultBlock,
} from './types.js';
import { writeStreamJsonEnvelope } from './io.js';
export interface StreamJsonResultOptions {
readonly isError: boolean;
@@ -146,8 +146,7 @@ export class StreamJsonWriter {
}
writeEnvelope(envelope: StreamJsonOutputEnvelope): void {
const line = serializeStreamJsonEnvelope(envelope);
process.stdout.write(`${line}\n`);
writeStreamJsonEnvelope(envelope);
}
private toolResultContent(