feat: create draft framework for SDK-support CLI

This commit is contained in:
mingholy.lmh
2025-10-30 18:18:41 +08:00
parent 567b73e6e0
commit 1aa282c054
48 changed files with 11714 additions and 56 deletions

View File

@@ -0,0 +1,111 @@
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
/**
* Message Router
*
* Routes incoming messages to appropriate handlers based on message type.
* Provides classification for control messages vs data messages.
*/
import type { Config } from '@qwen-code/qwen-code-core';
import type {
CLIMessage,
CLIControlRequest,
CLIControlResponse,
ControlCancelRequest,
} from '../types/protocol.js';
import {
isCLIUserMessage,
isCLIAssistantMessage,
isCLISystemMessage,
isCLIResultMessage,
isCLIPartialAssistantMessage,
isControlRequest,
isControlResponse,
isControlCancel,
} from '../types/protocol.js';
export type MessageType =
| 'control_request'
| 'control_response'
| 'control_cancel'
| 'user'
| 'assistant'
| 'system'
| 'result'
| 'stream_event'
| 'unknown';
export interface RoutedMessage {
type: MessageType;
message:
| CLIMessage
| CLIControlRequest
| CLIControlResponse
| ControlCancelRequest;
}
/**
* Message Router
*
* Classifies incoming messages and routes them to appropriate handlers.
*/
export class MessageRouter {
private debugMode: boolean;
constructor(config: Config) {
this.debugMode = config.getDebugMode();
}
/**
* Route a message to the appropriate handler based on its type
*/
route(
message:
| CLIMessage
| CLIControlRequest
| CLIControlResponse
| ControlCancelRequest,
): RoutedMessage {
// Check control messages first
if (isControlRequest(message)) {
return { type: 'control_request', message };
}
if (isControlResponse(message)) {
return { type: 'control_response', message };
}
if (isControlCancel(message)) {
return { type: 'control_cancel', message };
}
// Check data messages
if (isCLIUserMessage(message)) {
return { type: 'user', message };
}
if (isCLIAssistantMessage(message)) {
return { type: 'assistant', message };
}
if (isCLISystemMessage(message)) {
return { type: 'system', message };
}
if (isCLIResultMessage(message)) {
return { type: 'result', message };
}
if (isCLIPartialAssistantMessage(message)) {
return { type: 'stream_event', message };
}
// Unknown message type
if (this.debugMode) {
console.error(
'[MessageRouter] Unknown message type:',
JSON.stringify(message, null, 2),
);
}
return { type: 'unknown', message };
}
}

View File

@@ -0,0 +1,633 @@
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
/* eslint-disable @typescript-eslint/no-explicit-any */
/**
* Transport-agnostic JSON Lines protocol handler for bidirectional communication.
* Works with any Readable/Writable stream (stdin/stdout, HTTP, WebSocket, etc.)
*/
import * as readline from 'node:readline';
import { randomUUID } from 'node:crypto';
import type { Readable, Writable } from 'node:stream';
import type {
CLIMessage,
CLIUserMessage,
ContentBlock,
CLIControlRequest,
CLIControlResponse,
ControlCancelRequest,
CLIAssistantMessage,
CLIPartialAssistantMessage,
StreamEvent,
TextBlock,
ThinkingBlock,
ToolUseBlock,
Usage,
} from '../types/protocol.js';
import type { ServerGeminiStreamEvent } from '@qwen-code/qwen-code-core';
import { GeminiEventType } from '@qwen-code/qwen-code-core';
/**
* ============================================================================
* Stream JSON I/O Class
* ============================================================================
*/
export interface StreamJsonOptions {
input?: Readable;
output?: Writable;
onError?: (error: Error) => void;
}
/**
* Handles JSON Lines communication over arbitrary streams.
*/
export class StreamJson {
private input: Readable;
private output: Writable;
private rl?: readline.Interface;
private onError?: (error: Error) => void;
constructor(options: StreamJsonOptions = {}) {
this.input = options.input || process.stdin;
this.output = options.output || process.stdout;
this.onError = options.onError;
}
/**
* Read messages from input stream as async generator.
*/
async *readMessages(): AsyncGenerator<
CLIMessage | CLIControlRequest | CLIControlResponse | ControlCancelRequest,
void,
unknown
> {
this.rl = readline.createInterface({
input: this.input,
crlfDelay: Infinity,
terminal: false,
});
try {
for await (const line of this.rl) {
if (!line.trim()) {
continue; // Skip empty lines
}
try {
const message = JSON.parse(line);
yield message;
} catch (error) {
console.error(
'[StreamJson] Failed to parse message:',
line.substring(0, 100),
error,
);
// Continue processing (skip bad line)
}
}
} finally {
// Cleanup on exit
}
}
/**
* Send a message to output stream.
*/
send(message: CLIMessage | CLIControlResponse | CLIControlRequest): void {
try {
const line = JSON.stringify(message) + '\n';
this.output.write(line);
} catch (error) {
console.error('[StreamJson] Failed to send message:', error);
if (this.onError) {
this.onError(error as Error);
}
}
}
/**
* Create an assistant message builder.
*/
createAssistantBuilder(
sessionId: string,
parentToolUseId: string | null,
model: string,
includePartialMessages: boolean = false,
): AssistantMessageBuilder {
return new AssistantMessageBuilder({
sessionId,
parentToolUseId,
includePartialMessages,
model,
streamJson: this,
});
}
/**
* Cleanup resources.
*/
cleanup(): void {
if (this.rl) {
this.rl.close();
this.rl = undefined;
}
}
}
/**
* ============================================================================
* Assistant Message Builder
* ============================================================================
*/
export interface AssistantMessageBuilderOptions {
sessionId: string;
parentToolUseId: string | null;
includePartialMessages: boolean;
model: string;
streamJson: StreamJson;
}
/**
* Builds assistant messages from Gemini stream events.
* Accumulates content blocks and emits streaming events in real-time.
*/
export class AssistantMessageBuilder {
private sessionId: string;
private parentToolUseId: string | null;
private includePartialMessages: boolean;
private model: string;
private streamJson: StreamJson;
private messageId: string;
private contentBlocks: ContentBlock[] = [];
private openBlocks = new Set<number>();
private messageStarted: boolean = false;
private finalized: boolean = false;
private usage: Usage | null = null;
// Current block state
private currentBlockType: 'text' | 'thinking' | null = null;
private currentTextContent: string = '';
private currentThinkingContent: string = '';
private currentThinkingSignature: string = '';
constructor(options: AssistantMessageBuilderOptions) {
this.sessionId = options.sessionId;
this.parentToolUseId = options.parentToolUseId;
this.includePartialMessages = options.includePartialMessages;
this.model = options.model;
this.streamJson = options.streamJson;
this.messageId = randomUUID();
}
/**
* Process a Gemini stream event and update internal state.
*/
processEvent(event: ServerGeminiStreamEvent): void {
if (this.finalized) {
return;
}
switch (event.type) {
case GeminiEventType.Content:
this.handleContentEvent(event.value);
break;
case GeminiEventType.Thought:
this.handleThoughtEvent(event.value.subject, event.value.description);
break;
case GeminiEventType.ToolCallRequest:
this.handleToolCallRequest(event.value);
break;
case GeminiEventType.Finished:
this.finalizePendingBlocks();
break;
default:
// Ignore other event types
break;
}
}
/**
* Handle text content event.
*/
private handleContentEvent(content: string): void {
if (!content) {
return;
}
this.ensureMessageStarted();
// If we're not in a text block, switch to text mode
if (this.currentBlockType !== 'text') {
this.switchToTextBlock();
}
// Accumulate content
this.currentTextContent += content;
// Emit delta for streaming updates
const currentIndex = this.contentBlocks.length;
this.emitContentBlockDelta(currentIndex, {
type: 'text_delta',
text: content,
});
}
/**
* Handle thinking event.
*/
private handleThoughtEvent(subject: string, description: string): void {
this.ensureMessageStarted();
const thinkingFragment = `${subject}: ${description}`;
// If we're not in a thinking block, switch to thinking mode
if (this.currentBlockType !== 'thinking') {
this.switchToThinkingBlock(subject);
}
// Accumulate thinking content
this.currentThinkingContent += thinkingFragment;
// Emit delta for streaming updates
const currentIndex = this.contentBlocks.length;
this.emitContentBlockDelta(currentIndex, {
type: 'thinking_delta',
thinking: thinkingFragment,
});
}
/**
* Handle tool call request.
*/
private handleToolCallRequest(request: any): void {
this.ensureMessageStarted();
// Finalize any open blocks first
this.finalizePendingBlocks();
// Create and add tool use block
const index = this.contentBlocks.length;
const toolUseBlock: ToolUseBlock = {
type: 'tool_use',
id: request.callId,
name: request.name,
input: request.args,
};
this.contentBlocks.push(toolUseBlock);
this.openBlock(index, toolUseBlock);
this.closeBlock(index);
}
/**
* Finalize any pending content blocks.
*/
private finalizePendingBlocks(): void {
if (this.currentBlockType === 'text' && this.currentTextContent) {
this.finalizeTextBlock();
} else if (
this.currentBlockType === 'thinking' &&
this.currentThinkingContent
) {
this.finalizeThinkingBlock();
}
}
/**
* Switch to text block mode.
*/
private switchToTextBlock(): void {
this.finalizePendingBlocks();
this.currentBlockType = 'text';
this.currentTextContent = '';
const index = this.contentBlocks.length;
const textBlock: TextBlock = {
type: 'text',
text: '',
};
this.openBlock(index, textBlock);
}
/**
* Switch to thinking block mode.
*/
private switchToThinkingBlock(signature: string): void {
this.finalizePendingBlocks();
this.currentBlockType = 'thinking';
this.currentThinkingContent = '';
this.currentThinkingSignature = signature;
const index = this.contentBlocks.length;
const thinkingBlock: ThinkingBlock = {
type: 'thinking',
thinking: '',
signature,
};
this.openBlock(index, thinkingBlock);
}
/**
* Finalize current text block.
*/
private finalizeTextBlock(): void {
if (!this.currentTextContent) {
return;
}
const index = this.contentBlocks.length;
const textBlock: TextBlock = {
type: 'text',
text: this.currentTextContent,
};
this.contentBlocks.push(textBlock);
this.closeBlock(index);
this.currentBlockType = null;
this.currentTextContent = '';
}
/**
* Finalize current thinking block.
*/
private finalizeThinkingBlock(): void {
if (!this.currentThinkingContent) {
return;
}
const index = this.contentBlocks.length;
const thinkingBlock: ThinkingBlock = {
type: 'thinking',
thinking: this.currentThinkingContent,
signature: this.currentThinkingSignature,
};
this.contentBlocks.push(thinkingBlock);
this.closeBlock(index);
this.currentBlockType = null;
this.currentThinkingContent = '';
this.currentThinkingSignature = '';
}
/**
* Set usage information for the final message.
*/
setUsage(usage: Usage): void {
this.usage = usage;
}
/**
* Build and return the final assistant message.
*/
finalize(): CLIAssistantMessage {
if (this.finalized) {
return this.buildFinalMessage();
}
this.finalized = true;
// Finalize any pending blocks
this.finalizePendingBlocks();
// Close all open blocks in order
const orderedOpenBlocks = [...this.openBlocks].sort((a, b) => a - b);
for (const index of orderedOpenBlocks) {
this.closeBlock(index);
}
// Emit message stop event
if (this.messageStarted) {
this.emitMessageStop();
}
return this.buildFinalMessage();
}
/**
* Build the final message structure.
*/
private buildFinalMessage(): CLIAssistantMessage {
return {
type: 'assistant',
uuid: this.messageId,
session_id: this.sessionId,
parent_tool_use_id: this.parentToolUseId,
message: {
id: this.messageId,
type: 'message',
role: 'assistant',
model: this.model,
content: this.contentBlocks,
stop_reason: null,
usage: this.usage || {
input_tokens: 0,
output_tokens: 0,
},
},
};
}
/**
* Ensure message has been started.
*/
private ensureMessageStarted(): void {
if (this.messageStarted) {
return;
}
this.messageStarted = true;
this.emitMessageStart();
}
/**
* Open a content block and emit start event.
*/
private openBlock(index: number, block: ContentBlock): void {
this.openBlocks.add(index);
this.emitContentBlockStart(index, block);
}
/**
* Close a content block and emit stop event.
*/
private closeBlock(index: number): void {
if (!this.openBlocks.has(index)) {
return;
}
this.openBlocks.delete(index);
this.emitContentBlockStop(index);
}
/**
* Emit message_start stream event.
*/
private emitMessageStart(): void {
const event: StreamEvent = {
type: 'message_start',
message: {
id: this.messageId,
role: 'assistant',
model: this.model,
},
};
this.emitStreamEvent(event);
}
/**
* Emit content_block_start stream event.
*/
private emitContentBlockStart(
index: number,
contentBlock: ContentBlock,
): void {
const event: StreamEvent = {
type: 'content_block_start',
index,
content_block: contentBlock,
};
this.emitStreamEvent(event);
}
/**
* Emit content_block_delta stream event.
*/
private emitContentBlockDelta(
index: number,
delta: {
type: 'text_delta' | 'thinking_delta';
text?: string;
thinking?: string;
},
): void {
const event: StreamEvent = {
type: 'content_block_delta',
index,
delta,
};
this.emitStreamEvent(event);
}
/**
* Emit content_block_stop stream event
*/
private emitContentBlockStop(index: number): void {
const event: StreamEvent = {
type: 'content_block_stop',
index,
};
this.emitStreamEvent(event);
}
/**
* Emit message_stop stream event
*/
private emitMessageStop(): void {
const event: StreamEvent = {
type: 'message_stop',
};
this.emitStreamEvent(event);
}
/**
* Emit a stream event as SDKPartialAssistantMessage
*/
private emitStreamEvent(event: StreamEvent): void {
if (!this.includePartialMessages) return;
const message: CLIPartialAssistantMessage = {
type: 'stream_event',
uuid: randomUUID(),
session_id: this.sessionId,
event,
parent_tool_use_id: this.parentToolUseId,
};
this.streamJson.send(message);
}
}
/**
* Extract text content from user message
*/
export function extractUserMessageText(message: CLIUserMessage): string[] {
const texts: string[] = [];
const content = message.message.content;
if (typeof content === 'string') {
texts.push(content);
} else if (Array.isArray(content)) {
for (const block of content) {
if ('content' in block && typeof block.content === 'string') {
texts.push(block.content);
}
}
}
return texts;
}
/**
* Extract text content from content blocks
*/
export function extractTextFromContent(content: ContentBlock[]): string {
return content
.filter((block) => block.type === 'text')
.map((block) => (block.type === 'text' ? block.text : ''))
.join('');
}
/**
* Create text content block
*/
export function createTextContent(text: string): ContentBlock {
return {
type: 'text',
text,
};
}
/**
* Create tool use content block
*/
export function createToolUseContent(
id: string,
name: string,
input: Record<string, any>,
): ContentBlock {
return {
type: 'tool_use',
id,
name,
input,
};
}
/**
* Create tool result content block
*/
export function createToolResultContent(
tool_use_id: string,
content: string | Array<Record<string, any>> | null,
is_error?: boolean,
): ContentBlock {
return {
type: 'tool_result',
tool_use_id,
content,
is_error,
};
}

View File

@@ -0,0 +1,73 @@
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
/**
* Control Context
*
* Shared context for control plane communication, providing access to
* session state, configuration, and I/O without prop drilling.
*/
import type { Config, MCPServerConfig } from '@qwen-code/qwen-code-core';
import type { Client } from '@modelcontextprotocol/sdk/client/index.js';
import type { StreamJson } from '../StreamJson.js';
import type { PermissionMode } from '../../types/protocol.js';
/**
* Control Context interface
*
* Provides shared access to session-scoped resources and mutable state
* for all controllers.
*/
export interface IControlContext {
readonly config: Config;
readonly streamJson: StreamJson;
readonly sessionId: string;
readonly abortSignal: AbortSignal;
readonly debugMode: boolean;
permissionMode: PermissionMode;
sdkMcpServers: Set<string>;
mcpClients: Map<string, { client: Client; config: MCPServerConfig }>;
onInterrupt?: () => void;
}
/**
* Control Context implementation
*/
export class ControlContext implements IControlContext {
readonly config: Config;
readonly streamJson: StreamJson;
readonly sessionId: string;
readonly abortSignal: AbortSignal;
readonly debugMode: boolean;
permissionMode: PermissionMode;
sdkMcpServers: Set<string>;
mcpClients: Map<string, { client: Client; config: MCPServerConfig }>;
onInterrupt?: () => void;
constructor(options: {
config: Config;
streamJson: StreamJson;
sessionId: string;
abortSignal: AbortSignal;
permissionMode?: PermissionMode;
onInterrupt?: () => void;
}) {
this.config = options.config;
this.streamJson = options.streamJson;
this.sessionId = options.sessionId;
this.abortSignal = options.abortSignal;
this.debugMode = options.config.getDebugMode();
this.permissionMode = options.permissionMode || 'default';
this.sdkMcpServers = new Set();
this.mcpClients = new Map();
this.onInterrupt = options.onInterrupt;
}
}

View File

@@ -0,0 +1,351 @@
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
/**
* Control Dispatcher
*
* Routes control requests between SDK and CLI to appropriate controllers.
* Manages pending request registry and handles cancellation/cleanup.
*
* Controllers:
* - SystemController: initialize, interrupt, set_model, supported_commands
* - PermissionController: can_use_tool, set_permission_mode
* - MCPController: mcp_message, mcp_server_status
* - HookController: hook_callback
*
* Note: Control request types are centrally defined in the ControlRequestType
* enum in packages/sdk/typescript/src/types/controlRequests.ts
*/
import type { IControlContext } from './ControlContext.js';
import type { IPendingRequestRegistry } from './controllers/baseController.js';
import { SystemController } from './controllers/systemController.js';
import { PermissionController } from './controllers/permissionController.js';
import { MCPController } from './controllers/mcpController.js';
import { HookController } from './controllers/hookController.js';
import type {
CLIControlRequest,
CLIControlResponse,
ControlResponse,
ControlRequestPayload,
} from '../../types/protocol.js';
/**
* Tracks an incoming request from SDK awaiting CLI response
*/
interface PendingIncomingRequest {
controller: string;
abortController: AbortController;
timeoutId: NodeJS.Timeout;
}
/**
* Tracks an outgoing request from CLI awaiting SDK response
*/
interface PendingOutgoingRequest {
controller: string;
resolve: (response: ControlResponse) => void;
reject: (error: Error) => void;
timeoutId: NodeJS.Timeout;
}
/**
* Central coordinator for control plane communication.
* Routes requests to controllers and manages request lifecycle.
*/
export class ControlDispatcher implements IPendingRequestRegistry {
private context: IControlContext;
// Make controllers publicly accessible
readonly systemController: SystemController;
readonly permissionController: PermissionController;
readonly mcpController: MCPController;
readonly hookController: HookController;
// Central pending request registries
private pendingIncomingRequests: Map<string, PendingIncomingRequest> =
new Map();
private pendingOutgoingRequests: Map<string, PendingOutgoingRequest> =
new Map();
constructor(context: IControlContext) {
this.context = context;
// Create domain controllers with context and registry
this.systemController = new SystemController(
context,
this,
'SystemController',
);
this.permissionController = new PermissionController(
context,
this,
'PermissionController',
);
this.mcpController = new MCPController(context, this, 'MCPController');
this.hookController = new HookController(context, this, 'HookController');
// Listen for main abort signal
this.context.abortSignal.addEventListener('abort', () => {
this.shutdown();
});
}
/**
* Routes an incoming request to the appropriate controller and sends response
*/
async dispatch(request: CLIControlRequest): Promise<void> {
const { request_id, request: payload } = request;
try {
// Route to appropriate controller
const controller = this.getControllerForRequest(payload.subtype);
const response = await controller.handleRequest(payload, request_id);
// Send success response
this.sendSuccessResponse(request_id, response);
// Special handling for initialize: send SystemMessage after success response
if (payload.subtype === 'initialize') {
this.systemController.sendSystemMessage();
}
} catch (error) {
// Send error response
const errorMessage =
error instanceof Error ? error.message : String(error);
this.sendErrorResponse(request_id, errorMessage);
}
}
/**
* Processes response from SDK for an outgoing request
*/
handleControlResponse(response: CLIControlResponse): void {
const responsePayload = response.response;
const requestId = responsePayload.request_id;
const pending = this.pendingOutgoingRequests.get(requestId);
if (!pending) {
// No pending request found - may have timed out or been cancelled
if (this.context.debugMode) {
console.error(
`[ControlDispatcher] No pending outgoing request for: ${requestId}`,
);
}
return;
}
// Deregister
this.deregisterOutgoingRequest(requestId);
// Resolve or reject based on response type
if (responsePayload.subtype === 'success') {
pending.resolve(responsePayload);
} else {
pending.reject(new Error(responsePayload.error));
}
}
/**
* Sends a control request to SDK and waits for response
*/
async sendControlRequest(
payload: ControlRequestPayload,
timeoutMs?: number,
): Promise<ControlResponse> {
// Delegate to system controller (or any controller, they all have the same method)
return this.systemController.sendControlRequest(payload, timeoutMs);
}
/**
* Cancels a specific request or all pending requests
*/
handleCancel(requestId?: string): void {
if (requestId) {
// Cancel specific incoming request
const pending = this.pendingIncomingRequests.get(requestId);
if (pending) {
pending.abortController.abort();
this.deregisterIncomingRequest(requestId);
this.sendErrorResponse(requestId, 'Request cancelled');
if (this.context.debugMode) {
console.error(
`[ControlDispatcher] Cancelled incoming request: ${requestId}`,
);
}
}
} else {
// Cancel ALL pending incoming requests
const requestIds = Array.from(this.pendingIncomingRequests.keys());
for (const id of requestIds) {
const pending = this.pendingIncomingRequests.get(id);
if (pending) {
pending.abortController.abort();
this.deregisterIncomingRequest(id);
this.sendErrorResponse(id, 'All requests cancelled');
}
}
if (this.context.debugMode) {
console.error(
`[ControlDispatcher] Cancelled all ${requestIds.length} pending incoming requests`,
);
}
}
}
/**
* Stops all pending requests and cleans up all controllers
*/
shutdown(): void {
if (this.context.debugMode) {
console.error('[ControlDispatcher] Shutting down');
}
// Cancel all incoming requests
for (const [
_requestId,
pending,
] of this.pendingIncomingRequests.entries()) {
pending.abortController.abort();
clearTimeout(pending.timeoutId);
}
this.pendingIncomingRequests.clear();
// Cancel all outgoing requests
for (const [
_requestId,
pending,
] of this.pendingOutgoingRequests.entries()) {
clearTimeout(pending.timeoutId);
pending.reject(new Error('Dispatcher shutdown'));
}
this.pendingOutgoingRequests.clear();
// Cleanup controllers (MCP controller will close all clients)
this.systemController.cleanup();
this.permissionController.cleanup();
this.mcpController.cleanup();
this.hookController.cleanup();
}
/**
* Registers an incoming request in the pending registry
*/
registerIncomingRequest(
requestId: string,
controller: string,
abortController: AbortController,
timeoutId: NodeJS.Timeout,
): void {
this.pendingIncomingRequests.set(requestId, {
controller,
abortController,
timeoutId,
});
}
/**
* Removes an incoming request from the pending registry
*/
deregisterIncomingRequest(requestId: string): void {
const pending = this.pendingIncomingRequests.get(requestId);
if (pending) {
clearTimeout(pending.timeoutId);
this.pendingIncomingRequests.delete(requestId);
}
}
/**
* Registers an outgoing request in the pending registry
*/
registerOutgoingRequest(
requestId: string,
controller: string,
resolve: (response: ControlResponse) => void,
reject: (error: Error) => void,
timeoutId: NodeJS.Timeout,
): void {
this.pendingOutgoingRequests.set(requestId, {
controller,
resolve,
reject,
timeoutId,
});
}
/**
* Removes an outgoing request from the pending registry
*/
deregisterOutgoingRequest(requestId: string): void {
const pending = this.pendingOutgoingRequests.get(requestId);
if (pending) {
clearTimeout(pending.timeoutId);
this.pendingOutgoingRequests.delete(requestId);
}
}
/**
* Returns the controller that handles the given request subtype
*/
private getControllerForRequest(subtype: string) {
switch (subtype) {
case 'initialize':
case 'interrupt':
case 'set_model':
case 'supported_commands':
return this.systemController;
case 'can_use_tool':
case 'set_permission_mode':
return this.permissionController;
case 'mcp_message':
case 'mcp_server_status':
return this.mcpController;
case 'hook_callback':
return this.hookController;
default:
throw new Error(`Unknown control request subtype: ${subtype}`);
}
}
/**
* Sends a success response back to SDK
*/
private sendSuccessResponse(
requestId: string,
response: Record<string, unknown>,
): void {
const controlResponse: CLIControlResponse = {
type: 'control_response',
response: {
subtype: 'success',
request_id: requestId,
response,
},
};
this.context.streamJson.send(controlResponse);
}
/**
* Sends an error response back to SDK
*/
private sendErrorResponse(requestId: string, error: string): void {
const controlResponse: CLIControlResponse = {
type: 'control_response',
response: {
subtype: 'error',
request_id: requestId,
error,
},
};
this.context.streamJson.send(controlResponse);
}
}

View File

@@ -0,0 +1,180 @@
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
/**
* Base Controller
*
* Abstract base class for domain-specific control plane controllers.
* Provides common functionality for:
* - Handling incoming control requests (SDK -> CLI)
* - Sending outgoing control requests (CLI -> SDK)
* - Request lifecycle management with timeout and cancellation
* - Integration with central pending request registry
*/
import { randomUUID } from 'node:crypto';
import type { IControlContext } from '../ControlContext.js';
import type {
ControlRequestPayload,
ControlResponse,
CLIControlRequest,
} from '../../../types/protocol.js';
const DEFAULT_REQUEST_TIMEOUT_MS = 30000; // 30 seconds
/**
* Registry interface for controllers to register/deregister pending requests
*/
export interface IPendingRequestRegistry {
registerIncomingRequest(
requestId: string,
controller: string,
abortController: AbortController,
timeoutId: NodeJS.Timeout,
): void;
deregisterIncomingRequest(requestId: string): void;
registerOutgoingRequest(
requestId: string,
controller: string,
resolve: (response: ControlResponse) => void,
reject: (error: Error) => void,
timeoutId: NodeJS.Timeout,
): void;
deregisterOutgoingRequest(requestId: string): void;
}
/**
* Abstract base controller class
*
* Subclasses should implement handleRequestPayload() to process specific
* control request types.
*/
export abstract class BaseController {
protected context: IControlContext;
protected registry: IPendingRequestRegistry;
protected controllerName: string;
constructor(
context: IControlContext,
registry: IPendingRequestRegistry,
controllerName: string,
) {
this.context = context;
this.registry = registry;
this.controllerName = controllerName;
}
/**
* Handle an incoming control request
*
* Manages lifecycle: register -> process -> deregister
*/
async handleRequest(
payload: ControlRequestPayload,
requestId: string,
): Promise<Record<string, unknown>> {
const requestAbortController = new AbortController();
// Setup timeout
const timeoutId = setTimeout(() => {
requestAbortController.abort();
this.registry.deregisterIncomingRequest(requestId);
if (this.context.debugMode) {
console.error(`[${this.controllerName}] Request timeout: ${requestId}`);
}
}, DEFAULT_REQUEST_TIMEOUT_MS);
// Register with central registry
this.registry.registerIncomingRequest(
requestId,
this.controllerName,
requestAbortController,
timeoutId,
);
try {
const response = await this.handleRequestPayload(
payload,
requestAbortController.signal,
);
// Success - deregister
this.registry.deregisterIncomingRequest(requestId);
return response;
} catch (error) {
// Error - deregister
this.registry.deregisterIncomingRequest(requestId);
throw error;
}
}
/**
* Send an outgoing control request to SDK
*
* Manages lifecycle: register -> send -> wait for response -> deregister
*/
async sendControlRequest(
payload: ControlRequestPayload,
timeoutMs: number = DEFAULT_REQUEST_TIMEOUT_MS,
): Promise<ControlResponse> {
const requestId = randomUUID();
return new Promise<ControlResponse>((resolve, reject) => {
// Setup timeout
const timeoutId = setTimeout(() => {
this.registry.deregisterOutgoingRequest(requestId);
reject(new Error('Control request timeout'));
if (this.context.debugMode) {
console.error(
`[${this.controllerName}] Outgoing request timeout: ${requestId}`,
);
}
}, timeoutMs);
// Register with central registry
this.registry.registerOutgoingRequest(
requestId,
this.controllerName,
resolve,
reject,
timeoutId,
);
// Send control request
const request: CLIControlRequest = {
type: 'control_request',
request_id: requestId,
request: payload,
};
try {
this.context.streamJson.send(request);
} catch (error) {
this.registry.deregisterOutgoingRequest(requestId);
reject(error);
}
});
}
/**
* Abstract method: Handle specific request payload
*
* Subclasses must implement this to process their domain-specific requests.
*/
protected abstract handleRequestPayload(
payload: ControlRequestPayload,
signal: AbortSignal,
): Promise<Record<string, unknown>>;
/**
* Cleanup resources
*/
cleanup(): void {
// Subclasses can override to add cleanup logic
}
}

View File

@@ -0,0 +1,56 @@
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
/**
* Hook Controller
*
* Handles hook-related control requests:
* - hook_callback: Process hook callbacks (placeholder for future)
*/
import { BaseController } from './baseController.js';
import type {
ControlRequestPayload,
CLIHookCallbackRequest,
} from '../../../types/protocol.js';
export class HookController extends BaseController {
/**
* Handle hook control requests
*/
protected async handleRequestPayload(
payload: ControlRequestPayload,
_signal: AbortSignal,
): Promise<Record<string, unknown>> {
switch (payload.subtype) {
case 'hook_callback':
return this.handleHookCallback(payload as CLIHookCallbackRequest);
default:
throw new Error(`Unsupported request subtype in HookController`);
}
}
/**
* Handle hook_callback request
*
* Processes hook callbacks (placeholder implementation)
*/
private async handleHookCallback(
payload: CLIHookCallbackRequest,
): Promise<Record<string, unknown>> {
if (this.context.debugMode) {
console.error(`[HookController] Hook callback: ${payload.callback_id}`);
}
// Hook callback processing not yet implemented
return {
result: 'Hook callback processing not yet implemented',
callback_id: payload.callback_id,
tool_use_id: payload.tool_use_id,
};
}
}

View File

@@ -0,0 +1,287 @@
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
/**
* MCP Controller
*
* Handles MCP-related control requests:
* - mcp_message: Route MCP messages
* - mcp_server_status: Return MCP server status
*/
import { BaseController } from './baseController.js';
import type { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { ResultSchema } from '@modelcontextprotocol/sdk/types.js';
import type {
ControlRequestPayload,
CLIControlMcpMessageRequest,
} from '../../../types/protocol.js';
import type {
MCPServerConfig,
WorkspaceContext,
} from '@qwen-code/qwen-code-core';
import {
connectToMcpServer,
MCP_DEFAULT_TIMEOUT_MSEC,
} from '@qwen-code/qwen-code-core';
export class MCPController extends BaseController {
/**
* Handle MCP control requests
*/
protected async handleRequestPayload(
payload: ControlRequestPayload,
_signal: AbortSignal,
): Promise<Record<string, unknown>> {
switch (payload.subtype) {
case 'mcp_message':
return this.handleMcpMessage(payload as CLIControlMcpMessageRequest);
case 'mcp_server_status':
return this.handleMcpStatus();
default:
throw new Error(`Unsupported request subtype in MCPController`);
}
}
/**
* Handle mcp_message request
*
* Routes JSON-RPC messages to MCP servers
*/
private async handleMcpMessage(
payload: CLIControlMcpMessageRequest,
): Promise<Record<string, unknown>> {
const serverNameRaw = payload.server_name;
if (
typeof serverNameRaw !== 'string' ||
serverNameRaw.trim().length === 0
) {
throw new Error('Missing server_name in mcp_message request');
}
const message = payload.message;
if (!message || typeof message !== 'object') {
throw new Error(
'Missing or invalid message payload for mcp_message request',
);
}
// Get or create MCP client
let clientEntry: { client: Client; config: MCPServerConfig };
try {
clientEntry = await this.getOrCreateMcpClient(serverNameRaw.trim());
} catch (error) {
throw new Error(
error instanceof Error
? error.message
: 'Failed to connect to MCP server',
);
}
const method = message.method;
if (typeof method !== 'string' || method.trim().length === 0) {
throw new Error('Invalid MCP message: missing method');
}
const jsonrpcVersion =
typeof message.jsonrpc === 'string' ? message.jsonrpc : '2.0';
const messageId = message.id;
const params = message.params;
const timeout =
typeof clientEntry.config.timeout === 'number'
? clientEntry.config.timeout
: MCP_DEFAULT_TIMEOUT_MSEC;
try {
// Handle notification (no id)
if (messageId === undefined) {
await clientEntry.client.notification({
method,
params,
});
return {
subtype: 'mcp_message',
mcp_response: {
jsonrpc: jsonrpcVersion,
id: null,
result: { success: true, acknowledged: true },
},
};
}
// Handle request (with id)
const result = await clientEntry.client.request(
{
method,
params,
},
ResultSchema,
{ timeout },
);
return {
subtype: 'mcp_message',
mcp_response: {
jsonrpc: jsonrpcVersion,
id: messageId,
result,
},
};
} catch (error) {
// If connection closed, remove from cache
if (error instanceof Error && /closed/i.test(error.message)) {
this.context.mcpClients.delete(serverNameRaw.trim());
}
const errorCode =
typeof (error as { code?: unknown })?.code === 'number'
? ((error as { code: number }).code as number)
: -32603;
const errorMessage =
error instanceof Error
? error.message
: 'Failed to execute MCP request';
const errorData = (error as { data?: unknown })?.data;
const errorBody: Record<string, unknown> = {
code: errorCode,
message: errorMessage,
};
if (errorData !== undefined) {
errorBody['data'] = errorData;
}
return {
subtype: 'mcp_message',
mcp_response: {
jsonrpc: jsonrpcVersion,
id: messageId ?? null,
error: errorBody,
},
};
}
}
/**
* Handle mcp_server_status request
*
* Returns status of registered MCP servers
*/
private async handleMcpStatus(): Promise<Record<string, unknown>> {
const status: Record<string, string> = {};
// Include SDK MCP servers
for (const serverName of this.context.sdkMcpServers) {
status[serverName] = 'connected';
}
// Include CLI-managed MCP clients
for (const serverName of this.context.mcpClients.keys()) {
status[serverName] = 'connected';
}
if (this.context.debugMode) {
console.error(
`[MCPController] MCP status: ${Object.keys(status).length} servers`,
);
}
return status;
}
/**
* Get or create MCP client for a server
*
* Implements lazy connection and caching
*/
private async getOrCreateMcpClient(
serverName: string,
): Promise<{ client: Client; config: MCPServerConfig }> {
// Check cache first
const cached = this.context.mcpClients.get(serverName);
if (cached) {
return cached;
}
// Get server configuration
const provider = this.context.config as unknown as {
getMcpServers?: () => Record<string, MCPServerConfig> | undefined;
getDebugMode?: () => boolean;
getWorkspaceContext?: () => unknown;
};
if (typeof provider.getMcpServers !== 'function') {
throw new Error(`MCP server "${serverName}" is not configured`);
}
const servers = provider.getMcpServers() ?? {};
const serverConfig = servers[serverName];
if (!serverConfig) {
throw new Error(`MCP server "${serverName}" is not configured`);
}
const debugMode =
typeof provider.getDebugMode === 'function'
? provider.getDebugMode()
: false;
const workspaceContext =
typeof provider.getWorkspaceContext === 'function'
? provider.getWorkspaceContext()
: undefined;
if (!workspaceContext) {
throw new Error('Workspace context is not available for MCP connection');
}
// Connect to MCP server
const client = await connectToMcpServer(
serverName,
serverConfig,
debugMode,
workspaceContext as WorkspaceContext,
);
// Cache the client
const entry = { client, config: serverConfig };
this.context.mcpClients.set(serverName, entry);
if (this.context.debugMode) {
console.error(`[MCPController] Connected to MCP server: ${serverName}`);
}
return entry;
}
/**
* Cleanup MCP clients
*/
override cleanup(): void {
if (this.context.debugMode) {
console.error(
`[MCPController] Cleaning up ${this.context.mcpClients.size} MCP clients`,
);
}
// Close all MCP clients
for (const [serverName, { client }] of this.context.mcpClients.entries()) {
try {
client.close();
} catch (error) {
if (this.context.debugMode) {
console.error(
`[MCPController] Failed to close MCP client ${serverName}:`,
error,
);
}
}
}
this.context.mcpClients.clear();
}
}

View File

@@ -0,0 +1,480 @@
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
/**
* Permission Controller
*
* Handles permission-related control requests:
* - can_use_tool: Check if tool usage is allowed
* - set_permission_mode: Change permission mode at runtime
*
* Abstracts all permission logic from the session manager to keep it clean.
*/
import type {
ToolCallRequestInfo,
WaitingToolCall,
} from '@qwen-code/qwen-code-core';
import { ToolConfirmationOutcome } from '@qwen-code/qwen-code-core';
import type {
CLIControlPermissionRequest,
CLIControlSetPermissionModeRequest,
ControlRequestPayload,
PermissionMode,
PermissionSuggestion,
} from '../../../types/protocol.js';
import { BaseController } from './baseController.js';
// Import ToolCallConfirmationDetails types for type alignment
type ToolConfirmationType = 'edit' | 'exec' | 'mcp' | 'info' | 'plan';
export class PermissionController extends BaseController {
private pendingOutgoingRequests = new Set<string>();
/**
* Handle permission control requests
*/
protected async handleRequestPayload(
payload: ControlRequestPayload,
_signal: AbortSignal,
): Promise<Record<string, unknown>> {
switch (payload.subtype) {
case 'can_use_tool':
return this.handleCanUseTool(payload as CLIControlPermissionRequest);
case 'set_permission_mode':
return this.handleSetPermissionMode(
payload as CLIControlSetPermissionModeRequest,
);
default:
throw new Error(`Unsupported request subtype in PermissionController`);
}
}
/**
* Handle can_use_tool request
*
* Comprehensive permission evaluation based on:
* - Permission mode (approval level)
* - Tool registry validation
* - Error handling with safe defaults
*/
private async handleCanUseTool(
payload: CLIControlPermissionRequest,
): Promise<Record<string, unknown>> {
const toolName = payload.tool_name;
if (
!toolName ||
typeof toolName !== 'string' ||
toolName.trim().length === 0
) {
return {
subtype: 'can_use_tool',
behavior: 'deny',
message: 'Missing or invalid tool_name in can_use_tool request',
};
}
let behavior: 'allow' | 'deny' = 'allow';
let message: string | undefined;
try {
// Check permission mode first
const permissionResult = this.checkPermissionMode();
if (!permissionResult.allowed) {
behavior = 'deny';
message = permissionResult.message;
}
// Check tool registry if permission mode allows
if (behavior === 'allow') {
const registryResult = this.checkToolRegistry(toolName);
if (!registryResult.allowed) {
behavior = 'deny';
message = registryResult.message;
}
}
} catch (error) {
behavior = 'deny';
message =
error instanceof Error
? `Failed to evaluate tool permission: ${error.message}`
: 'Failed to evaluate tool permission';
}
const response: Record<string, unknown> = {
subtype: 'can_use_tool',
behavior,
};
if (message) {
response['message'] = message;
}
return response;
}
/**
* Check permission mode for tool execution
*/
private checkPermissionMode(): { allowed: boolean; message?: string } {
const mode = this.context.permissionMode;
// Map permission modes to approval logic (aligned with VALID_APPROVAL_MODE_VALUES)
switch (mode) {
case 'yolo': // Allow all tools
case 'auto-edit': // Auto-approve edit operations
case 'plan': // Auto-approve planning operations
return { allowed: true };
case 'default': // TODO: allow all tools for test
default:
return {
allowed: false,
message:
'Tool execution requires manual approval. Update permission mode or approve via host.',
};
}
}
/**
* Check if tool exists in registry
*/
private checkToolRegistry(toolName: string): {
allowed: boolean;
message?: string;
} {
try {
// Access tool registry through config
const config = this.context.config;
const registryProvider = config as unknown as {
getToolRegistry?: () => {
getTool?: (name: string) => unknown;
};
};
if (typeof registryProvider.getToolRegistry === 'function') {
const registry = registryProvider.getToolRegistry();
if (
registry &&
typeof registry.getTool === 'function' &&
!registry.getTool(toolName)
) {
return {
allowed: false,
message: `Tool "${toolName}" is not registered.`,
};
}
}
return { allowed: true };
} catch (error) {
return {
allowed: false,
message: `Failed to check tool registry: ${error instanceof Error ? error.message : 'Unknown error'}`,
};
}
}
/**
* Handle set_permission_mode request
*
* Updates the permission mode in the context
*/
private async handleSetPermissionMode(
payload: CLIControlSetPermissionModeRequest,
): Promise<Record<string, unknown>> {
const mode = payload.mode;
const validModes: PermissionMode[] = [
'default',
'plan',
'auto-edit',
'yolo',
];
if (!validModes.includes(mode)) {
throw new Error(
`Invalid permission mode: ${mode}. Valid values are: ${validModes.join(', ')}`,
);
}
this.context.permissionMode = mode;
if (this.context.debugMode) {
console.error(
`[PermissionController] Permission mode updated to: ${mode}`,
);
}
return { status: 'updated', mode };
}
/**
* Build permission suggestions for tool confirmation UI
*
* This method creates UI suggestions based on tool confirmation details,
* helping the host application present appropriate permission options.
*/
buildPermissionSuggestions(
confirmationDetails: unknown,
): PermissionSuggestion[] | null {
if (
!confirmationDetails ||
typeof confirmationDetails !== 'object' ||
!('type' in confirmationDetails)
) {
return null;
}
const details = confirmationDetails as Record<string, unknown>;
const type = String(details['type'] ?? '');
const title =
typeof details['title'] === 'string' ? details['title'] : undefined;
// Ensure type matches ToolCallConfirmationDetails union
const confirmationType = type as ToolConfirmationType;
switch (confirmationType) {
case 'exec': // ToolExecuteConfirmationDetails
return [
{
type: 'allow',
label: 'Allow Command',
description: `Execute: ${details['command']}`,
},
{
type: 'deny',
label: 'Deny',
description: 'Block this command execution',
},
];
case 'edit': // ToolEditConfirmationDetails
return [
{
type: 'allow',
label: 'Allow Edit',
description: `Edit file: ${details['fileName']}`,
},
{
type: 'deny',
label: 'Deny',
description: 'Block this file edit',
},
{
type: 'modify',
label: 'Review Changes',
description: 'Review the proposed changes before applying',
},
];
case 'plan': // ToolPlanConfirmationDetails
return [
{
type: 'allow',
label: 'Approve Plan',
description: title || 'Execute the proposed plan',
},
{
type: 'deny',
label: 'Reject Plan',
description: 'Do not execute this plan',
},
];
case 'mcp': // ToolMcpConfirmationDetails
return [
{
type: 'allow',
label: 'Allow MCP Call',
description: `${details['serverName']}: ${details['toolName']}`,
},
{
type: 'deny',
label: 'Deny',
description: 'Block this MCP server call',
},
];
case 'info': // ToolInfoConfirmationDetails
return [
{
type: 'allow',
label: 'Allow Info Request',
description: title || 'Allow information request',
},
{
type: 'deny',
label: 'Deny',
description: 'Block this information request',
},
];
default:
// Fallback for unknown types
return [
{
type: 'allow',
label: 'Allow',
description: title || `Allow ${type} operation`,
},
{
type: 'deny',
label: 'Deny',
description: `Block ${type} operation`,
},
];
}
}
/**
* Check if a tool should be executed based on current permission settings
*
* This is a convenience method for direct tool execution checks without
* going through the control request flow.
*/
async shouldAllowTool(
toolRequest: ToolCallRequestInfo,
confirmationDetails?: unknown,
): Promise<{
allowed: boolean;
message?: string;
updatedArgs?: Record<string, unknown>;
}> {
// Check permission mode
const modeResult = this.checkPermissionMode();
if (!modeResult.allowed) {
return {
allowed: false,
message: modeResult.message,
};
}
// Check tool registry
const registryResult = this.checkToolRegistry(toolRequest.name);
if (!registryResult.allowed) {
return {
allowed: false,
message: registryResult.message,
};
}
// If we have confirmation details, we could potentially modify args
// This is a hook for future enhancement
if (confirmationDetails) {
// Future: handle argument modifications based on confirmation details
}
return { allowed: true };
}
/**
* Get callback for monitoring tool calls and handling outgoing permission requests
* This is passed to executeToolCall to hook into CoreToolScheduler updates
*/
getToolCallUpdateCallback(): (toolCalls: unknown[]) => void {
return (toolCalls: unknown[]) => {
for (const call of toolCalls) {
if (
call &&
typeof call === 'object' &&
(call as { status?: string }).status === 'awaiting_approval'
) {
const awaiting = call as WaitingToolCall;
if (
typeof awaiting.confirmationDetails?.onConfirm === 'function' &&
!this.pendingOutgoingRequests.has(awaiting.request.callId)
) {
this.pendingOutgoingRequests.add(awaiting.request.callId);
void this.handleOutgoingPermissionRequest(awaiting);
}
}
}
};
}
/**
* Handle outgoing permission request
*
* Behavior depends on input format:
* - stream-json mode: Send can_use_tool to SDK and await response
* - Other modes: Check local approval mode and decide immediately
*/
private async handleOutgoingPermissionRequest(
toolCall: WaitingToolCall,
): Promise<void> {
try {
const inputFormat = this.context.config.getInputFormat?.();
const isStreamJsonMode = inputFormat === 'stream-json';
if (!isStreamJsonMode) {
// No SDK available - use local permission check
const modeCheck = this.checkPermissionMode();
const outcome = modeCheck.allowed
? ToolConfirmationOutcome.ProceedOnce
: ToolConfirmationOutcome.Cancel;
await toolCall.confirmationDetails.onConfirm(outcome);
return;
}
// Stream-json mode: ask SDK for permission
const permissionSuggestions = this.buildPermissionSuggestions(
toolCall.confirmationDetails,
);
const response = await this.sendControlRequest(
{
subtype: 'can_use_tool',
tool_name: toolCall.request.name,
tool_use_id: toolCall.request.callId,
input: toolCall.request.args,
permission_suggestions: permissionSuggestions,
blocked_path: null,
} as CLIControlPermissionRequest,
30000,
);
if (response.subtype !== 'success') {
await toolCall.confirmationDetails.onConfirm(
ToolConfirmationOutcome.Cancel,
);
return;
}
const payload = (response.response || {}) as Record<string, unknown>;
const behavior = String(payload['behavior'] || '').toLowerCase();
if (behavior === 'allow') {
// Handle updated input if provided
const updatedInput = payload['updatedInput'];
if (updatedInput && typeof updatedInput === 'object') {
toolCall.request.args = updatedInput as Record<string, unknown>;
}
await toolCall.confirmationDetails.onConfirm(
ToolConfirmationOutcome.ProceedOnce,
);
} else {
await toolCall.confirmationDetails.onConfirm(
ToolConfirmationOutcome.Cancel,
);
}
} catch (error) {
if (this.context.debugMode) {
console.error(
'[PermissionController] Outgoing permission failed:',
error,
);
}
await toolCall.confirmationDetails.onConfirm(
ToolConfirmationOutcome.Cancel,
);
} finally {
this.pendingOutgoingRequests.delete(toolCall.request.callId);
}
}
}

View File

@@ -0,0 +1,292 @@
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
/**
* System Controller
*
* Handles system-level control requests:
* - initialize: Setup session and return system info
* - interrupt: Cancel current operations
* - set_model: Switch model (placeholder)
*/
import { BaseController } from './baseController.js';
import { CommandService } from '../../CommandService.js';
import { BuiltinCommandLoader } from '../../BuiltinCommandLoader.js';
import type {
ControlRequestPayload,
CLIControlInitializeRequest,
CLIControlSetModelRequest,
CLISystemMessage,
} from '../../../types/protocol.js';
export class SystemController extends BaseController {
/**
* Handle system control requests
*/
protected async handleRequestPayload(
payload: ControlRequestPayload,
_signal: AbortSignal,
): Promise<Record<string, unknown>> {
switch (payload.subtype) {
case 'initialize':
return this.handleInitialize(payload as CLIControlInitializeRequest);
case 'interrupt':
return this.handleInterrupt();
case 'set_model':
return this.handleSetModel(payload as CLIControlSetModelRequest);
case 'supported_commands':
return this.handleSupportedCommands();
default:
throw new Error(`Unsupported request subtype in SystemController`);
}
}
/**
* Handle initialize request
*
* Registers SDK MCP servers and returns capabilities
*/
private async handleInitialize(
payload: CLIControlInitializeRequest,
): Promise<Record<string, unknown>> {
// Register SDK MCP servers if provided
if (payload.sdkMcpServers && Array.isArray(payload.sdkMcpServers)) {
for (const serverName of payload.sdkMcpServers) {
this.context.sdkMcpServers.add(serverName);
}
}
// Build capabilities for response
const capabilities = this.buildControlCapabilities();
if (this.context.debugMode) {
console.error(
`[SystemController] Initialized with ${this.context.sdkMcpServers.size} SDK MCP servers`,
);
}
return {
subtype: 'initialize',
capabilities,
};
}
/**
* Send system message to SDK
*
* Called after successful initialize response is sent
*/
async sendSystemMessage(): Promise<void> {
const toolRegistry = this.context.config.getToolRegistry();
const tools = toolRegistry ? toolRegistry.getAllToolNames() : [];
const mcpServers = this.context.config.getMcpServers();
const mcpServerList = mcpServers
? Object.keys(mcpServers).map((name) => ({
name,
status: 'connected',
}))
: [];
// Load slash commands
const slashCommands = await this.loadSlashCommandNames();
// Build capabilities
const capabilities = this.buildControlCapabilities();
const systemMessage: CLISystemMessage = {
type: 'system',
subtype: 'init',
uuid: this.context.sessionId,
session_id: this.context.sessionId,
cwd: this.context.config.getTargetDir(),
tools,
mcp_servers: mcpServerList,
model: this.context.config.getModel(),
permissionMode: this.context.permissionMode,
slash_commands: slashCommands,
apiKeySource: 'none',
qwen_code_version: this.context.config.getCliVersion() || 'unknown',
output_style: 'default',
agents: [],
skills: [],
capabilities,
};
this.context.streamJson.send(systemMessage);
if (this.context.debugMode) {
console.error('[SystemController] System message sent');
}
}
/**
* Build control capabilities for initialize response
*/
private buildControlCapabilities(): Record<string, unknown> {
const capabilities: Record<string, unknown> = {
can_handle_can_use_tool: true,
can_handle_hook_callback: true,
can_set_permission_mode:
typeof this.context.config.setApprovalMode === 'function',
can_set_model: typeof this.context.config.setModel === 'function',
};
// Check if MCP message handling is available
try {
const mcpProvider = this.context.config as unknown as {
getMcpServers?: () => Record<string, unknown> | undefined;
};
if (typeof mcpProvider.getMcpServers === 'function') {
const servers = mcpProvider.getMcpServers();
capabilities['can_handle_mcp_message'] = Boolean(
servers && Object.keys(servers).length > 0,
);
} else {
capabilities['can_handle_mcp_message'] = false;
}
} catch (error) {
if (this.context.debugMode) {
console.error(
'[SystemController] Failed to determine MCP capability:',
error,
);
}
capabilities['can_handle_mcp_message'] = false;
}
return capabilities;
}
/**
* Handle interrupt request
*
* Triggers the interrupt callback to cancel current operations
*/
private async handleInterrupt(): Promise<Record<string, unknown>> {
// Trigger interrupt callback if available
if (this.context.onInterrupt) {
this.context.onInterrupt();
}
// Abort the main signal to cancel ongoing operations
if (this.context.abortSignal && !this.context.abortSignal.aborted) {
// Note: We can't directly abort the signal, but the onInterrupt callback should handle this
if (this.context.debugMode) {
console.error('[SystemController] Interrupt signal triggered');
}
}
if (this.context.debugMode) {
console.error('[SystemController] Interrupt handled');
}
return { subtype: 'interrupt' };
}
/**
* Handle set_model request
*
* Implements actual model switching with validation and error handling
*/
private async handleSetModel(
payload: CLIControlSetModelRequest,
): Promise<Record<string, unknown>> {
const model = payload.model;
// Validate model parameter
if (typeof model !== 'string' || model.trim() === '') {
throw new Error('Invalid model specified for set_model request');
}
try {
// Attempt to set the model using config
await this.context.config.setModel(model);
if (this.context.debugMode) {
console.error(`[SystemController] Model switched to: ${model}`);
}
return {
subtype: 'set_model',
model,
};
} catch (error) {
const errorMessage =
error instanceof Error ? error.message : 'Failed to set model';
if (this.context.debugMode) {
console.error(
`[SystemController] Failed to set model ${model}:`,
error,
);
}
throw new Error(errorMessage);
}
}
/**
* Handle supported_commands request
*
* Returns list of supported control commands
*
* Note: This list should match the ControlRequestType enum in
* packages/sdk/typescript/src/types/controlRequests.ts
*/
private async handleSupportedCommands(): Promise<Record<string, unknown>> {
const commands = [
'initialize',
'interrupt',
'set_model',
'supported_commands',
'can_use_tool',
'set_permission_mode',
'mcp_message',
'mcp_server_status',
'hook_callback',
];
return {
subtype: 'supported_commands',
commands,
};
}
/**
* Load slash command names using CommandService
*/
private async loadSlashCommandNames(): Promise<string[]> {
const controller = new AbortController();
try {
const service = await CommandService.create(
[new BuiltinCommandLoader(this.context.config)],
controller.signal,
);
const names = new Set<string>();
const commands = service.getCommands();
for (const command of commands) {
names.add(command.name);
}
return Array.from(names).sort();
} catch (error) {
if (this.context.debugMode) {
console.error(
'[SystemController] Failed to load slash commands:',
error,
);
}
return [];
} finally {
controller.abort();
}
}
}