feat: create draft framework for cli & sdk

This commit is contained in:
mingholy.lmh
2025-11-23 19:36:31 +08:00
parent 6729980b47
commit e1ffaec499
58 changed files with 8982 additions and 668 deletions

View File

@@ -0,0 +1,66 @@
export { query } from './query/createQuery.js';
export { Query } from './query/Query.js';
export type { ExternalMcpServerConfig } from './types/queryOptionsSchema.js';
export type { QueryOptions } from './query/createQuery.js';
export type {
ContentBlock,
TextBlock,
ThinkingBlock,
ToolUseBlock,
ToolResultBlock,
CLIUserMessage,
CLIAssistantMessage,
CLISystemMessage,
CLIResultMessage,
CLIPartialAssistantMessage,
CLIMessage,
} from './types/protocol.js';
export {
isCLIUserMessage,
isCLIAssistantMessage,
isCLISystemMessage,
isCLIResultMessage,
isCLIPartialAssistantMessage,
} from './types/protocol.js';
export { AbortError, isAbortError } from './types/errors.js';
export { ControlRequestType } from './types/protocol.js';
export { ProcessTransport } from './transport/ProcessTransport.js';
export type { Transport } from './transport/Transport.js';
export { Stream } from './utils/Stream.js';
export {
serializeJsonLine,
parseJsonLineSafe,
isValidMessage,
parseJsonLinesStream,
} from './utils/jsonLines.js';
export {
findCliPath,
resolveCliPath,
prepareSpawnInfo,
} from './utils/cliPath.js';
export type { SpawnInfo } from './utils/cliPath.js';
export { createSdkMcpServer } from './mcp/createSdkMcpServer.js';
export {
tool,
createTool,
validateToolName,
validateInputSchema,
} from './mcp/tool.js';
export type {
JSONSchema,
ToolDefinition,
PermissionMode,
CanUseTool,
PermissionResult,
} from './types/types.js';

View File

@@ -0,0 +1,111 @@
/**
* SdkControlServerTransport - bridges MCP Server with Query's control plane
*
* Implements @modelcontextprotocol/sdk Transport interface to enable
* SDK-embedded MCP servers. Messages flow bidirectionally:
*
* MCP Server → send() → Query → control_request (mcp_message) → CLI
* CLI → control_request (mcp_message) → Query → handleMessage() → MCP Server
*/
import type { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';
export type SendToQueryCallback = (message: JSONRPCMessage) => Promise<void>;
export interface SdkControlServerTransportOptions {
sendToQuery: SendToQueryCallback;
serverName: string;
}
export class SdkControlServerTransport {
sendToQuery: SendToQueryCallback;
private serverName: string;
private started = false;
onmessage?: (message: JSONRPCMessage) => void;
onerror?: (error: Error) => void;
onclose?: () => void;
constructor(options: SdkControlServerTransportOptions) {
this.sendToQuery = options.sendToQuery;
this.serverName = options.serverName;
}
async start(): Promise<void> {
this.started = true;
}
async send(message: JSONRPCMessage): Promise<void> {
if (!this.started) {
throw new Error(
`SdkControlServerTransport (${this.serverName}) not started. Call start() first.`,
);
}
try {
// Send via Query's control plane
await this.sendToQuery(message);
} catch (error) {
// Invoke error callback if set
if (this.onerror) {
this.onerror(error instanceof Error ? error : new Error(String(error)));
}
throw error;
}
}
async close(): Promise<void> {
if (!this.started) {
return; // Already closed
}
this.started = false;
// Notify MCP Server
if (this.onclose) {
this.onclose();
}
}
handleMessage(message: JSONRPCMessage): void {
if (!this.started) {
console.warn(
`[SdkControlServerTransport] Received message for closed transport (${this.serverName})`,
);
return;
}
if (this.onmessage) {
this.onmessage(message);
} else {
console.warn(
`[SdkControlServerTransport] No onmessage handler set for ${this.serverName}`,
);
}
}
handleError(error: Error): void {
if (this.onerror) {
this.onerror(error);
} else {
console.error(
`[SdkControlServerTransport] Error for ${this.serverName}:`,
error,
);
}
}
isStarted(): boolean {
return this.started;
}
getServerName(): string {
return this.serverName;
}
}
export function createSdkControlServerTransport(
options: SdkControlServerTransportOptions,
): SdkControlServerTransport {
return new SdkControlServerTransport(options);
}

View File

@@ -0,0 +1,109 @@
/**
* Factory function to create SDK-embedded MCP servers
*
* Creates MCP Server instances that run in the user's Node.js process
* and are proxied to the CLI via the control plane.
*/
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import {
ListToolsRequestSchema,
CallToolRequestSchema,
type CallToolResultSchema,
} from '@modelcontextprotocol/sdk/types.js';
import type { ToolDefinition } from '../types/types.js';
import { formatToolResult, formatToolError } from './formatters.js';
import { validateToolName } from './tool.js';
import type { z } from 'zod';
type CallToolResult = z.infer<typeof CallToolResultSchema>;
export function createSdkMcpServer(
name: string,
version: string,
tools: ToolDefinition[],
): Server {
// Validate server name
if (!name || typeof name !== 'string') {
throw new Error('MCP server name must be a non-empty string');
}
if (!version || typeof version !== 'string') {
throw new Error('MCP server version must be a non-empty string');
}
if (!Array.isArray(tools)) {
throw new Error('Tools must be an array');
}
// Validate tool names are unique
const toolNames = new Set<string>();
for (const tool of tools) {
validateToolName(tool.name);
if (toolNames.has(tool.name)) {
throw new Error(
`Duplicate tool name '${tool.name}' in MCP server '${name}'`,
);
}
toolNames.add(tool.name);
}
// Create MCP Server instance
const server = new Server(
{
name,
version,
},
{
capabilities: {
tools: {},
},
},
);
// Create tool map for fast lookup
const toolMap = new Map<string, ToolDefinition>();
for (const tool of tools) {
toolMap.set(tool.name, tool);
}
// Register list_tools handler
server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: tools.map((tool) => ({
name: tool.name,
description: tool.description,
inputSchema: tool.inputSchema,
})),
}));
// Register call_tool handler
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name: toolName, arguments: toolArgs } = request.params;
// Find tool
const tool = toolMap.get(toolName);
if (!tool) {
return formatToolError(
new Error(`Tool '${toolName}' not found in server '${name}'`),
) as CallToolResult;
}
try {
// Invoke tool handler
const result = await tool.handler(toolArgs);
// Format result
return formatToolResult(result) as CallToolResult;
} catch (error) {
// Handle tool execution error
return formatToolError(
error instanceof Error
? error
: new Error(`Tool '${toolName}' failed: ${String(error)}`),
) as CallToolResult;
}
});
return server;
}

View File

@@ -0,0 +1,194 @@
/**
* Tool result formatting utilities for MCP responses
*
* Converts various output types to MCP content blocks.
*/
export type McpContentBlock =
| { type: 'text'; text: string }
| { type: 'image'; data: string; mimeType: string }
| { type: 'resource'; uri: string; mimeType?: string; text?: string };
export interface ToolResult {
content: McpContentBlock[];
isError?: boolean;
}
export function formatToolResult(result: unknown): ToolResult {
// Handle Error objects
if (result instanceof Error) {
return {
content: [
{
type: 'text',
text: result.message || 'Unknown error',
},
],
isError: true,
};
}
// Handle null/undefined
if (result === null || result === undefined) {
return {
content: [
{
type: 'text',
text: '',
},
],
};
}
// Handle string
if (typeof result === 'string') {
return {
content: [
{
type: 'text',
text: result,
},
],
};
}
// Handle number
if (typeof result === 'number') {
return {
content: [
{
type: 'text',
text: String(result),
},
],
};
}
// Handle boolean
if (typeof result === 'boolean') {
return {
content: [
{
type: 'text',
text: String(result),
},
],
};
}
// Handle object (including arrays)
if (typeof result === 'object') {
try {
return {
content: [
{
type: 'text',
text: JSON.stringify(result, null, 2),
},
],
};
} catch {
// JSON.stringify failed
return {
content: [
{
type: 'text',
text: String(result),
},
],
};
}
}
// Fallback: convert to string
return {
content: [
{
type: 'text',
text: String(result),
},
],
};
}
export function formatToolError(error: Error | string): ToolResult {
const message = error instanceof Error ? error.message : error;
return {
content: [
{
type: 'text',
text: message,
},
],
isError: true,
};
}
export function formatTextResult(text: string): ToolResult {
return {
content: [
{
type: 'text',
text,
},
],
};
}
export function formatJsonResult(data: unknown): ToolResult {
return {
content: [
{
type: 'text',
text: JSON.stringify(data, null, 2),
},
],
};
}
export function mergeToolResults(results: ToolResult[]): ToolResult {
const mergedContent: McpContentBlock[] = [];
let hasError = false;
for (const result of results) {
mergedContent.push(...result.content);
if (result.isError) {
hasError = true;
}
}
return {
content: mergedContent,
isError: hasError,
};
}
export function isValidContentBlock(block: unknown): block is McpContentBlock {
if (!block || typeof block !== 'object') {
return false;
}
const blockObj = block as Record<string, unknown>;
if (!blockObj.type || typeof blockObj.type !== 'string') {
return false;
}
switch (blockObj.type) {
case 'text':
return typeof blockObj.text === 'string';
case 'image':
return (
typeof blockObj.data === 'string' &&
typeof blockObj.mimeType === 'string'
);
case 'resource':
return typeof blockObj.uri === 'string';
default:
return false;
}
}

View File

@@ -0,0 +1,91 @@
/**
* Tool definition helper for SDK-embedded MCP servers
*
* Provides type-safe tool definitions with generic input/output types.
*/
import type { ToolDefinition } from '../types/types.js';
export function tool<TInput = unknown, TOutput = unknown>(
def: ToolDefinition<TInput, TOutput>,
): ToolDefinition<TInput, TOutput> {
// Validate tool definition
if (!def.name || typeof def.name !== 'string') {
throw new Error('Tool definition must have a name (string)');
}
if (!def.description || typeof def.description !== 'string') {
throw new Error(
`Tool definition for '${def.name}' must have a description (string)`,
);
}
if (!def.inputSchema || typeof def.inputSchema !== 'object') {
throw new Error(
`Tool definition for '${def.name}' must have an inputSchema (object)`,
);
}
if (!def.handler || typeof def.handler !== 'function') {
throw new Error(
`Tool definition for '${def.name}' must have a handler (function)`,
);
}
// Return definition (pass-through for type safety)
return def;
}
export function validateToolName(name: string): void {
if (!name) {
throw new Error('Tool name cannot be empty');
}
if (name.length > 64) {
throw new Error(
`Tool name '${name}' is too long (max 64 characters): ${name.length}`,
);
}
if (!/^[a-zA-Z][a-zA-Z0-9_]*$/.test(name)) {
throw new Error(
`Tool name '${name}' is invalid. Must start with a letter and contain only letters, numbers, and underscores.`,
);
}
}
export function validateInputSchema(schema: unknown): void {
if (!schema || typeof schema !== 'object') {
throw new Error('Input schema must be an object');
}
const schemaObj = schema as Record<string, unknown>;
if (!schemaObj.type) {
throw new Error('Input schema must have a type field');
}
// For object schemas, validate properties
if (schemaObj.type === 'object') {
if (schemaObj.properties && typeof schemaObj.properties !== 'object') {
throw new Error('Input schema properties must be an object');
}
if (schemaObj.required && !Array.isArray(schemaObj.required)) {
throw new Error('Input schema required must be an array');
}
}
}
export function createTool<TInput = unknown, TOutput = unknown>(
def: ToolDefinition<TInput, TOutput>,
): ToolDefinition<TInput, TOutput> {
// Validate via tool() function
const validated = tool(def);
// Additional validation
validateToolName(validated.name);
validateInputSchema(validated.inputSchema);
return validated;
}

View File

@@ -0,0 +1,738 @@
/**
* Query class - Main orchestrator for SDK
*
* Manages SDK workflow, routes messages, and handles lifecycle.
* Implements AsyncIterator protocol for message consumption.
*/
const PERMISSION_CALLBACK_TIMEOUT = 30000;
const MCP_REQUEST_TIMEOUT = 30000;
const CONTROL_REQUEST_TIMEOUT = 30000;
const STREAM_CLOSE_TIMEOUT = 10000;
import { randomUUID } from 'node:crypto';
import type {
CLIMessage,
CLIUserMessage,
CLIControlRequest,
CLIControlResponse,
ControlCancelRequest,
PermissionSuggestion,
} from '../types/protocol.js';
import {
isCLIUserMessage,
isCLIAssistantMessage,
isCLISystemMessage,
isCLIResultMessage,
isCLIPartialAssistantMessage,
isControlRequest,
isControlResponse,
isControlCancel,
} from '../types/protocol.js';
import type { Transport } from '../transport/Transport.js';
import { type QueryOptions } from '../types/queryOptionsSchema.js';
import { Stream } from '../utils/Stream.js';
import { serializeJsonLine } from '../utils/jsonLines.js';
import { AbortError } from '../types/errors.js';
import type { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';
import type { SdkControlServerTransport } from '../mcp/SdkControlServerTransport.js';
import { ControlRequestType } from '../types/protocol.js';
interface PendingControlRequest {
resolve: (response: Record<string, unknown> | null) => void;
reject: (error: Error) => void;
timeout: NodeJS.Timeout;
abortController: AbortController;
}
interface TransportWithEndInput extends Transport {
endInput(): void;
}
export class Query implements AsyncIterable<CLIMessage> {
private transport: Transport;
private options: QueryOptions;
private sessionId: string;
private inputStream: Stream<CLIMessage>;
private sdkMessages: AsyncGenerator<CLIMessage>;
private abortController: AbortController;
private pendingControlRequests: Map<string, PendingControlRequest> =
new Map();
private sdkMcpTransports: Map<string, SdkControlServerTransport> = new Map();
readonly initialized: Promise<void>;
private closed = false;
private messageRouterStarted = false;
private firstResultReceivedPromise?: Promise<void>;
private firstResultReceivedResolve?: () => void;
private readonly isSingleTurn: boolean;
constructor(
transport: Transport,
options: QueryOptions,
singleTurn: boolean = false,
) {
this.transport = transport;
this.options = options;
this.sessionId = randomUUID();
this.inputStream = new Stream<CLIMessage>();
this.abortController = options.abortController ?? new AbortController();
this.isSingleTurn = singleTurn;
/**
* Create async generator proxy to ensure stream.next() is called at least once.
* The generator will start iterating when the user begins iteration.
* This ensures readResolve/readReject are set up as soon as iteration starts.
* If errors occur before iteration starts, they'll be stored in hasError and
* properly rejected when the user starts iterating.
*/
this.sdkMessages = this.readSdkMessages();
this.firstResultReceivedPromise = new Promise((resolve) => {
this.firstResultReceivedResolve = resolve;
});
/**
* Handle abort signal if controller is provided and already aborted or will be aborted.
* If already aborted, set error immediately. Otherwise, listen for abort events
* and set abort error on the stream before closing.
*/
if (this.abortController.signal.aborted) {
this.inputStream.error(new AbortError('Query aborted by user'));
this.close().catch((err) => {
console.error('[Query] Error during abort cleanup:', err);
});
} else {
this.abortController.signal.addEventListener('abort', () => {
this.inputStream.error(new AbortError('Query aborted by user'));
this.close().catch((err) => {
console.error('[Query] Error during abort cleanup:', err);
});
});
}
this.initialized = this.initialize();
this.initialized.catch(() => {});
this.startMessageRouter();
}
private async initialize(): Promise<void> {
try {
await this.setupSdkMcpServers();
const sdkMcpServerNames = Array.from(this.sdkMcpTransports.keys());
await this.sendControlRequest(ControlRequestType.INITIALIZE, {
hooks: null,
sdkMcpServers:
sdkMcpServerNames.length > 0 ? sdkMcpServerNames : undefined,
mcpServers: this.options.mcpServers,
});
} catch (error) {
console.error('[Query] Initialization error:', error);
throw error;
}
}
private async setupSdkMcpServers(): Promise<void> {
if (!this.options.sdkMcpServers) {
return;
}
const externalNames = Object.keys(this.options.mcpServers ?? {});
const sdkNames = Object.keys(this.options.sdkMcpServers);
const conflicts = sdkNames.filter((name) => externalNames.includes(name));
if (conflicts.length > 0) {
throw new Error(
`MCP server name conflicts between mcpServers and sdkMcpServers: ${conflicts.join(', ')}`,
);
}
/**
* Import SdkControlServerTransport dynamically to avoid circular dependencies.
* Create transport for each server that sends MCP messages via control plane.
*/
const { SdkControlServerTransport } = await import(
'../mcp/SdkControlServerTransport.js'
);
for (const [name, server] of Object.entries(this.options.sdkMcpServers)) {
const transport = new SdkControlServerTransport({
serverName: name,
sendToQuery: async (message: JSONRPCMessage) => {
await this.sendControlRequest(ControlRequestType.MCP_MESSAGE, {
server_name: name,
message,
});
},
});
await transport.start();
await server.connect(transport);
this.sdkMcpTransports.set(name, transport);
}
}
private startMessageRouter(): void {
if (this.messageRouterStarted) {
return;
}
this.messageRouterStarted = true;
(async () => {
try {
for await (const message of this.transport.readMessages()) {
await this.routeMessage(message);
if (this.closed) {
break;
}
}
if (this.abortController.signal.aborted) {
this.inputStream.error(new AbortError('Query aborted'));
} else {
this.inputStream.done();
}
} catch (error) {
this.inputStream.error(
error instanceof Error ? error : new Error(String(error)),
);
}
})();
}
private async routeMessage(message: unknown): Promise<void> {
if (isControlRequest(message)) {
await this.handleControlRequest(message);
return;
}
if (isControlResponse(message)) {
this.handleControlResponse(message);
return;
}
if (isControlCancel(message)) {
this.handleControlCancelRequest(message);
return;
}
if (isCLISystemMessage(message)) {
/**
* SystemMessage contains session info (cwd, tools, model, etc.)
* that should be passed to user.
*/
this.inputStream.enqueue(message);
return;
}
if (isCLIResultMessage(message)) {
if (this.firstResultReceivedResolve) {
this.firstResultReceivedResolve();
}
/**
* In single-turn mode, automatically close input after receiving result
* to signal completion to the CLI.
*/
if (this.isSingleTurn && 'endInput' in this.transport) {
(this.transport as TransportWithEndInput).endInput();
}
this.inputStream.enqueue(message);
return;
}
if (
isCLIAssistantMessage(message) ||
isCLIUserMessage(message) ||
isCLIPartialAssistantMessage(message)
) {
this.inputStream.enqueue(message);
return;
}
if (process.env['DEBUG']) {
console.warn('[Query] Unknown message type:', message);
}
this.inputStream.enqueue(message as CLIMessage);
}
private async handleControlRequest(
request: CLIControlRequest,
): Promise<void> {
const { request_id, request: payload } = request;
const requestAbortController = new AbortController();
try {
let response: Record<string, unknown> | null = null;
switch (payload.subtype) {
case 'can_use_tool':
response = await this.handlePermissionRequest(
payload.tool_name,
payload.input as Record<string, unknown>,
payload.permission_suggestions,
requestAbortController.signal,
);
break;
case 'mcp_message':
response = await this.handleMcpMessage(
payload.server_name,
payload.message as unknown as JSONRPCMessage,
);
break;
default:
throw new Error(
`Unknown control request subtype: ${payload.subtype}`,
);
}
await this.sendControlResponse(request_id, true, response);
} catch (error) {
const errorMessage =
error instanceof Error ? error.message : String(error);
await this.sendControlResponse(request_id, false, errorMessage);
}
}
private async handlePermissionRequest(
toolName: string,
toolInput: Record<string, unknown>,
permissionSuggestions: PermissionSuggestion[] | null,
signal: AbortSignal,
): Promise<Record<string, unknown>> {
/* Default deny all wildcard tool requests */
if (!this.options.canUseTool) {
return { behavior: 'deny', message: 'Denied' };
}
try {
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(
() => reject(new Error('Permission callback timeout')),
PERMISSION_CALLBACK_TIMEOUT,
);
});
const result = await Promise.race([
Promise.resolve(
this.options.canUseTool(toolName, toolInput, {
signal,
suggestions: permissionSuggestions,
}),
),
timeoutPromise,
]);
// Handle boolean return (backward compatibility)
if (typeof result === 'boolean') {
return result
? { behavior: 'allow', updatedInput: toolInput }
: { behavior: 'deny', message: 'Denied' };
}
// Handle PermissionResult format
const permissionResult = result as {
behavior: 'allow' | 'deny';
updatedInput?: Record<string, unknown>;
message?: string;
interrupt?: boolean;
};
if (permissionResult.behavior === 'allow') {
return {
behavior: 'allow',
updatedInput: permissionResult.updatedInput ?? toolInput,
};
} else {
return {
behavior: 'deny',
message: permissionResult.message ?? 'Denied',
...(permissionResult.interrupt !== undefined
? { interrupt: permissionResult.interrupt }
: {}),
};
}
} catch (error) {
/**
* Timeout or error → deny (fail-safe).
* This ensures that any issues with the permission callback
* result in a safe default of denying access.
*/
const errorMessage =
error instanceof Error ? error.message : String(error);
console.warn(
'[Query] Permission callback error (denying by default):',
errorMessage,
);
return {
behavior: 'deny',
message: `Permission check failed: ${errorMessage}`,
};
}
}
private async handleMcpMessage(
serverName: string,
message: JSONRPCMessage,
): Promise<Record<string, unknown>> {
const transport = this.sdkMcpTransports.get(serverName);
if (!transport) {
throw new Error(
`MCP server '${serverName}' not found in SDK-embedded servers`,
);
}
/**
* Check if this is a request (has method and id) or notification.
* Requests need to wait for a response, while notifications are just routed.
*/
const isRequest =
'method' in message && 'id' in message && message.id !== null;
if (isRequest) {
const response = await this.handleMcpRequest(
serverName,
message,
transport,
);
return { mcp_response: response };
} else {
transport.handleMessage(message);
return { mcp_response: { jsonrpc: '2.0', result: {}, id: 0 } };
}
}
private handleMcpRequest(
_serverName: string,
message: JSONRPCMessage,
transport: SdkControlServerTransport,
): Promise<JSONRPCMessage> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('MCP request timeout'));
}, MCP_REQUEST_TIMEOUT);
const messageId = 'id' in message ? message.id : null;
/**
* Hook into transport to capture response.
* Temporarily replace sendToQuery to intercept the response message
* matching this request's ID, then restore the original handler.
*/
const originalSend = transport.sendToQuery;
transport.sendToQuery = async (responseMessage: JSONRPCMessage) => {
if ('id' in responseMessage && responseMessage.id === messageId) {
clearTimeout(timeout);
transport.sendToQuery = originalSend;
resolve(responseMessage);
}
return originalSend(responseMessage);
};
transport.handleMessage(message);
});
}
private handleControlResponse(response: CLIControlResponse): void {
const { response: payload } = response;
const request_id = payload.request_id;
const pending = this.pendingControlRequests.get(request_id);
if (!pending) {
console.warn(
'[Query] Received response for unknown request:',
request_id,
);
return;
}
clearTimeout(pending.timeout);
this.pendingControlRequests.delete(request_id);
if (payload.subtype === 'success') {
pending.resolve(payload.response as Record<string, unknown> | null);
} else {
/**
* Extract error message from error field.
* Error can be either a string or an object with a message property.
*/
const errorMessage =
typeof payload.error === 'string'
? payload.error
: (payload.error?.message ?? 'Unknown error');
pending.reject(new Error(errorMessage));
}
}
private handleControlCancelRequest(request: ControlCancelRequest): void {
const { request_id } = request;
if (!request_id) {
console.warn('[Query] Received cancel request without request_id');
return;
}
const pending = this.pendingControlRequests.get(request_id);
if (pending) {
pending.abortController.abort();
clearTimeout(pending.timeout);
this.pendingControlRequests.delete(request_id);
pending.reject(new AbortError('Request cancelled'));
}
}
private async sendControlRequest(
subtype: string,
data: Record<string, unknown> = {},
): Promise<Record<string, unknown> | null> {
const requestId = randomUUID();
const request: CLIControlRequest = {
type: 'control_request',
request_id: requestId,
request: {
subtype: subtype as never,
...data,
} as CLIControlRequest['request'],
};
const responsePromise = new Promise<Record<string, unknown> | null>(
(resolve, reject) => {
const abortController = new AbortController();
const timeout = setTimeout(() => {
this.pendingControlRequests.delete(requestId);
reject(new Error(`Control request timeout: ${subtype}`));
}, CONTROL_REQUEST_TIMEOUT);
this.pendingControlRequests.set(requestId, {
resolve,
reject,
timeout,
abortController,
});
},
);
this.transport.write(serializeJsonLine(request));
return responsePromise;
}
private async sendControlResponse(
requestId: string,
success: boolean,
responseOrError: Record<string, unknown> | null | string,
): Promise<void> {
const response: CLIControlResponse = {
type: 'control_response',
response: success
? {
subtype: 'success',
request_id: requestId,
response: responseOrError as Record<string, unknown> | null,
}
: {
subtype: 'error',
request_id: requestId,
error: responseOrError as string,
},
};
this.transport.write(serializeJsonLine(response));
}
async close(): Promise<void> {
if (this.closed) {
return;
}
this.closed = true;
for (const pending of this.pendingControlRequests.values()) {
pending.abortController.abort();
clearTimeout(pending.timeout);
}
this.pendingControlRequests.clear();
await this.transport.close();
/**
* Complete input stream - check if aborted first.
* Only set error/done if stream doesn't already have an error state.
*/
if (this.inputStream.hasError === undefined) {
if (this.abortController.signal.aborted) {
this.inputStream.error(new AbortError('Query aborted'));
} else {
this.inputStream.done();
}
}
for (const transport of this.sdkMcpTransports.values()) {
try {
await transport.close();
} catch (error) {
console.error('[Query] Error closing MCP transport:', error);
}
}
this.sdkMcpTransports.clear();
}
private async *readSdkMessages(): AsyncGenerator<CLIMessage> {
for await (const message of this.inputStream) {
yield message;
}
}
async next(...args: [] | [unknown]): Promise<IteratorResult<CLIMessage>> {
return this.sdkMessages.next(...args);
}
async return(value?: unknown): Promise<IteratorResult<CLIMessage>> {
return this.sdkMessages.return(value);
}
async throw(e?: unknown): Promise<IteratorResult<CLIMessage>> {
return this.sdkMessages.throw(e);
}
[Symbol.asyncIterator](): AsyncIterator<CLIMessage> {
return this.sdkMessages;
}
async streamInput(messages: AsyncIterable<CLIUserMessage>): Promise<void> {
if (this.closed) {
throw new Error('Query is closed');
}
try {
/**
* Wait for initialization to complete before sending messages.
* This prevents "write after end" errors when streamInput is called
* with an empty iterable before initialization finishes.
*/
await this.initialized;
for await (const message of messages) {
if (this.abortController.signal.aborted) {
break;
}
this.transport.write(serializeJsonLine(message));
}
/**
* In multi-turn mode with MCP servers, wait for first result
* to ensure MCP servers have time to process before next input.
* This prevents race conditions where the next input arrives before
* MCP servers have finished processing the current request.
*/
if (
!this.isSingleTurn &&
this.sdkMcpTransports.size > 0 &&
this.firstResultReceivedPromise
) {
await Promise.race([
this.firstResultReceivedPromise,
new Promise<void>((resolve) => {
setTimeout(() => {
resolve();
}, STREAM_CLOSE_TIMEOUT);
}),
]);
}
this.endInput();
} catch (error) {
if (this.abortController.signal.aborted) {
console.log('[Query] Aborted during input streaming');
this.inputStream.error(
new AbortError('Query aborted during input streaming'),
);
return;
}
throw error;
}
}
endInput(): void {
if (this.closed) {
throw new Error('Query is closed');
}
if (
'endInput' in this.transport &&
typeof this.transport.endInput === 'function'
) {
(this.transport as TransportWithEndInput).endInput();
}
}
async interrupt(): Promise<void> {
if (this.closed) {
throw new Error('Query is closed');
}
await this.sendControlRequest(ControlRequestType.INTERRUPT);
}
async setPermissionMode(mode: string): Promise<void> {
if (this.closed) {
throw new Error('Query is closed');
}
await this.sendControlRequest(ControlRequestType.SET_PERMISSION_MODE, {
mode,
});
}
async setModel(model: string): Promise<void> {
if (this.closed) {
throw new Error('Query is closed');
}
await this.sendControlRequest(ControlRequestType.SET_MODEL, { model });
}
/**
* Get list of control commands supported by the CLI
*
* @returns Promise resolving to list of supported command names
* @throws Error if query is closed
*/
async supportedCommands(): Promise<Record<string, unknown> | null> {
if (this.closed) {
throw new Error('Query is closed');
}
return this.sendControlRequest(ControlRequestType.SUPPORTED_COMMANDS);
}
/**
* Get the status of MCP servers
*
* @returns Promise resolving to MCP server status information
* @throws Error if query is closed
*/
async mcpServerStatus(): Promise<Record<string, unknown> | null> {
if (this.closed) {
throw new Error('Query is closed');
}
return this.sendControlRequest(ControlRequestType.MCP_SERVER_STATUS);
}
getSessionId(): string {
return this.sessionId;
}
isClosed(): boolean {
return this.closed;
}
}

View File

@@ -0,0 +1,139 @@
/**
* Factory function for creating Query instances.
*/
import type { CLIUserMessage } from '../types/protocol.js';
import { serializeJsonLine } from '../utils/jsonLines.js';
import { ProcessTransport } from '../transport/ProcessTransport.js';
import { parseExecutableSpec } from '../utils/cliPath.js';
import { Query } from './Query.js';
import {
QueryOptionsSchema,
type QueryOptions,
} from '../types/queryOptionsSchema.js';
export type { QueryOptions };
export function query({
prompt,
options = {},
}: {
prompt: string | AsyncIterable<CLIUserMessage>;
options?: QueryOptions;
}): Query {
// Validate options and obtain normalized executable metadata
const parsedExecutable = validateOptions(options);
// Determine if this is a single-turn or multi-turn query
// Single-turn: string prompt (simple Q&A)
// Multi-turn: AsyncIterable prompt (streaming conversation)
const isSingleTurn = typeof prompt === 'string';
// Resolve CLI specification while preserving explicit runtime directives
const pathToQwenExecutable =
options.pathToQwenExecutable ?? parsedExecutable.executablePath;
// Use provided abortController or create a new one
const abortController = options.abortController ?? new AbortController();
// Create transport with abortController
const transport = new ProcessTransport({
pathToQwenExecutable,
cwd: options.cwd,
model: options.model,
permissionMode: options.permissionMode,
mcpServers: options.mcpServers,
env: options.env,
abortController,
debug: options.debug,
stderr: options.stderr,
maxSessionTurns: options.maxSessionTurns,
coreTools: options.coreTools,
excludeTools: options.excludeTools,
authType: options.authType,
});
// Build query options with abortController
const queryOptions: QueryOptions = {
...options,
abortController,
};
// Create Query
const queryInstance = new Query(transport, queryOptions, isSingleTurn);
// Handle prompt based on type
if (isSingleTurn) {
// For single-turn queries, send the prompt directly via transport
const stringPrompt = prompt as string;
const message: CLIUserMessage = {
type: 'user',
session_id: queryInstance.getSessionId(),
message: {
role: 'user',
content: stringPrompt,
},
parent_tool_use_id: null,
};
(async () => {
try {
await queryInstance.initialized;
transport.write(serializeJsonLine(message));
} catch (err) {
console.error('[query] Error sending single-turn prompt:', err);
}
})();
} else {
queryInstance
.streamInput(prompt as AsyncIterable<CLIUserMessage>)
.catch((err) => {
console.error('[query] Error streaming input:', err);
});
}
return queryInstance;
}
/**
* Backward compatibility alias
* @deprecated Use query() instead
*/
export const createQuery = query;
function validateOptions(
options: QueryOptions,
): ReturnType<typeof parseExecutableSpec> {
// Validate options using Zod schema
const validationResult = QueryOptionsSchema.safeParse(options);
if (!validationResult.success) {
const errors = validationResult.error.errors
.map((err) => `${err.path.join('.')}: ${err.message}`)
.join('; ');
throw new Error(`Invalid QueryOptions: ${errors}`);
}
// Validate executable path early to provide clear error messages
let parsedExecutable: ReturnType<typeof parseExecutableSpec>;
try {
parsedExecutable = parseExecutableSpec(options.pathToQwenExecutable);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
throw new Error(`Invalid pathToQwenExecutable: ${errorMessage}`);
}
// Validate no MCP server name conflicts (cross-field validation not easily expressible in Zod)
if (options.mcpServers && options.sdkMcpServers) {
const externalNames = Object.keys(options.mcpServers);
const sdkNames = Object.keys(options.sdkMcpServers);
const conflicts = externalNames.filter((name) => sdkNames.includes(name));
if (conflicts.length > 0) {
throw new Error(
`MCP server name conflicts between mcpServers and sdkMcpServers: ${conflicts.join(', ')}`,
);
}
}
return parsedExecutable;
}

View File

@@ -0,0 +1,392 @@
import { spawn, type ChildProcess } from 'node:child_process';
import * as readline from 'node:readline';
import type { Writable, Readable } from 'node:stream';
import type { TransportOptions } from '../types/types.js';
import type { Transport } from './Transport.js';
import { parseJsonLinesStream } from '../utils/jsonLines.js';
import { prepareSpawnInfo } from '../utils/cliPath.js';
import { AbortError } from '../types/errors.js';
type ExitListener = {
callback: (error?: Error) => void;
handler: (code: number | null, signal: NodeJS.Signals | null) => void;
};
export class ProcessTransport implements Transport {
private childProcess: ChildProcess | null = null;
private childStdin: Writable | null = null;
private childStdout: Readable | null = null;
private options: TransportOptions;
private ready = false;
private _exitError: Error | null = null;
private closed = false;
private abortController: AbortController;
private exitListeners: ExitListener[] = [];
private processExitHandler: (() => void) | null = null;
private abortHandler: (() => void) | null = null;
constructor(options: TransportOptions) {
this.options = options;
this.abortController =
this.options.abortController ?? new AbortController();
this.initialize();
}
private initialize(): void {
try {
if (this.abortController.signal.aborted) {
throw new AbortError('Transport start aborted');
}
const cliArgs = this.buildCliArguments();
const cwd = this.options.cwd ?? process.cwd();
const env = { ...process.env, ...this.options.env };
const spawnInfo = prepareSpawnInfo(this.options.pathToQwenExecutable);
const stderrMode =
this.options.debug || this.options.stderr ? 'pipe' : 'ignore';
this.logForDebugging(
`Spawning CLI (${spawnInfo.type}): ${spawnInfo.command} ${[...spawnInfo.args, ...cliArgs].join(' ')}`,
);
this.childProcess = spawn(
spawnInfo.command,
[...spawnInfo.args, ...cliArgs],
{
cwd,
env,
stdio: ['pipe', 'pipe', stderrMode],
signal: this.abortController.signal,
},
);
this.childStdin = this.childProcess.stdin;
this.childStdout = this.childProcess.stdout;
if (this.options.debug || this.options.stderr) {
this.childProcess.stderr?.on('data', (data) => {
this.logForDebugging(data.toString());
});
}
const cleanup = (): void => {
if (this.childProcess && !this.childProcess.killed) {
this.childProcess.kill('SIGTERM');
}
};
this.processExitHandler = cleanup;
this.abortHandler = cleanup;
process.on('exit', this.processExitHandler);
this.abortController.signal.addEventListener('abort', this.abortHandler);
this.setupEventHandlers();
this.ready = true;
} catch (error) {
this.ready = false;
throw error;
}
}
private setupEventHandlers(): void {
if (!this.childProcess) return;
this.childProcess.on('error', (error) => {
this.ready = false;
if (this.abortController.signal.aborted) {
this._exitError = new AbortError('CLI process aborted by user');
} else {
this._exitError = new Error(`CLI process error: ${error.message}`);
this.logForDebugging(this._exitError.message);
}
});
this.childProcess.on('close', (code, signal) => {
this.ready = false;
if (this.abortController.signal.aborted) {
this._exitError = new AbortError('CLI process aborted by user');
} else {
const error = this.getProcessExitError(code, signal);
if (error) {
this._exitError = error;
this.logForDebugging(error.message);
}
}
const error = this._exitError;
for (const listener of this.exitListeners) {
try {
listener.callback(error || undefined);
} catch (err) {
this.logForDebugging(`Exit listener error: ${err}`);
}
}
});
}
private getProcessExitError(
code: number | null,
signal: NodeJS.Signals | null,
): Error | undefined {
if (code !== 0 && code !== null) {
return new Error(`CLI process exited with code ${code}`);
} else if (signal) {
return new Error(`CLI process terminated by signal ${signal}`);
}
return undefined;
}
private buildCliArguments(): string[] {
const args: string[] = [
'--input-format',
'stream-json',
'--output-format',
'stream-json',
];
if (this.options.model) {
args.push('--model', this.options.model);
}
if (this.options.permissionMode) {
args.push('--approval-mode', this.options.permissionMode);
}
if (this.options.maxSessionTurns !== undefined) {
args.push('--max-session-turns', String(this.options.maxSessionTurns));
}
if (this.options.coreTools && this.options.coreTools.length > 0) {
args.push('--core-tools', this.options.coreTools.join(','));
}
if (this.options.excludeTools && this.options.excludeTools.length > 0) {
args.push('--exclude-tools', this.options.excludeTools.join(','));
}
if (this.options.authType) {
args.push('--auth-type', this.options.authType);
}
return args;
}
async close(): Promise<void> {
if (this.childStdin) {
this.childStdin.end();
this.childStdin = null;
}
if (this.processExitHandler) {
process.off('exit', this.processExitHandler);
this.processExitHandler = null;
}
if (this.abortHandler) {
this.abortController.signal.removeEventListener(
'abort',
this.abortHandler,
);
this.abortHandler = null;
}
for (const { handler } of this.exitListeners) {
this.childProcess?.off('close', handler);
}
this.exitListeners = [];
if (this.childProcess && !this.childProcess.killed) {
this.childProcess.kill('SIGTERM');
setTimeout(() => {
if (this.childProcess && !this.childProcess.killed) {
this.childProcess.kill('SIGKILL');
}
}, 5000);
}
this.ready = false;
this.closed = true;
}
async waitForExit(): Promise<void> {
if (!this.childProcess) {
if (this._exitError) {
throw this._exitError;
}
return;
}
if (this.childProcess.exitCode !== null || this.childProcess.killed) {
if (this._exitError) {
throw this._exitError;
}
return;
}
return new Promise<void>((resolve, reject) => {
const exitHandler = (
code: number | null,
signal: NodeJS.Signals | null,
) => {
if (this.abortController.signal.aborted) {
reject(new AbortError('Operation aborted'));
return;
}
const error = this.getProcessExitError(code, signal);
if (error) {
reject(error);
} else {
resolve();
}
};
this.childProcess!.once('close', exitHandler);
const errorHandler = (error: Error) => {
this.childProcess!.off('close', exitHandler);
reject(error);
};
this.childProcess!.once('error', errorHandler);
this.childProcess!.once('close', () => {
this.childProcess!.off('error', errorHandler);
});
});
}
write(message: string): void {
if (this.abortController.signal.aborted) {
throw new AbortError('Cannot write: operation aborted');
}
if (!this.ready || !this.childStdin) {
throw new Error('Transport not ready for writing');
}
if (this.closed) {
throw new Error('Cannot write to closed transport');
}
if (this.childStdin.writableEnded) {
throw new Error('Cannot write to ended stream');
}
if (this.childProcess?.killed || this.childProcess?.exitCode !== null) {
throw new Error('Cannot write to terminated process');
}
if (this._exitError) {
throw new Error(
`Cannot write to process that exited with error: ${this._exitError.message}`,
);
}
if (process.env['DEBUG']) {
this.logForDebugging(
`[ProcessTransport] Writing to stdin (${message.length} bytes): ${message.substring(0, 100)}`,
);
}
try {
const written = this.childStdin.write(message);
if (!written) {
this.logForDebugging(
`[ProcessTransport] Write buffer full (${message.length} bytes), data queued. Waiting for drain event...`,
);
} else if (process.env['DEBUG']) {
this.logForDebugging(
`[ProcessTransport] Write successful (${message.length} bytes)`,
);
}
} catch (error) {
this.ready = false;
throw new Error(
`Failed to write to stdin: ${error instanceof Error ? error.message : String(error)}`,
);
}
}
async *readMessages(): AsyncGenerator<unknown, void, unknown> {
if (!this.childStdout) {
throw new Error('Cannot read messages: process not started');
}
const rl = readline.createInterface({
input: this.childStdout,
crlfDelay: Infinity,
terminal: false,
});
try {
for await (const message of parseJsonLinesStream(
rl,
'ProcessTransport',
)) {
yield message;
}
await this.waitForExit();
} finally {
rl.close();
}
}
get isReady(): boolean {
return this.ready;
}
get exitError(): Error | null {
return this._exitError;
}
onExit(callback: (error?: Error) => void): () => void {
if (!this.childProcess) {
return () => {};
}
const handler = (code: number | null, signal: NodeJS.Signals | null) => {
const error = this.getProcessExitError(code, signal);
callback(error);
};
this.childProcess.on('close', handler);
this.exitListeners.push({ callback, handler });
return () => {
if (this.childProcess) {
this.childProcess.off('close', handler);
}
const index = this.exitListeners.findIndex((l) => l.handler === handler);
if (index !== -1) {
this.exitListeners.splice(index, 1);
}
};
}
endInput(): void {
if (this.childStdin) {
this.childStdin.end();
}
}
getInputStream(): Writable | undefined {
return this.childStdin || undefined;
}
getOutputStream(): Readable | undefined {
return this.childStdout || undefined;
}
private logForDebugging(message: string): void {
if (this.options.debug || process.env['DEBUG']) {
process.stderr.write(`[ProcessTransport] ${message}\n`);
}
if (this.options.stderr) {
this.options.stderr(message);
}
}
}

View File

@@ -0,0 +1,22 @@
/**
* Transport interface for SDK-CLI communication
*
* The Transport abstraction enables communication between SDK and CLI via different mechanisms:
* - ProcessTransport: Local subprocess via stdin/stdout (initial implementation)
* - HttpTransport: Remote CLI via HTTP (future)
* - WebSocketTransport: Remote CLI via WebSocket (future)
*/
export interface Transport {
close(): Promise<void>;
waitForExit(): Promise<void>;
write(message: string): void;
readMessages(): AsyncGenerator<unknown, void, unknown>;
readonly isReady: boolean;
readonly exitError: Error | null;
}

View File

@@ -0,0 +1,17 @@
export class AbortError extends Error {
constructor(message = 'Operation aborted') {
super(message);
this.name = 'AbortError';
Object.setPrototypeOf(this, AbortError.prototype);
}
}
export function isAbortError(error: unknown): error is AbortError {
return (
error instanceof AbortError ||
(typeof error === 'object' &&
error !== null &&
'name' in error &&
error.name === 'AbortError')
);
}

View File

@@ -0,0 +1,560 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
export interface Annotation {
type: string;
value: string;
}
export interface Usage {
input_tokens: number;
output_tokens: number;
cache_creation_input_tokens?: number;
cache_read_input_tokens?: number;
total_tokens?: number;
}
export interface ExtendedUsage extends Usage {
server_tool_use?: {
web_search_requests: number;
};
service_tier?: string;
cache_creation?: {
ephemeral_1h_input_tokens: number;
ephemeral_5m_input_tokens: number;
};
}
export interface ModelUsage {
inputTokens: number;
outputTokens: number;
cacheReadInputTokens: number;
cacheCreationInputTokens: number;
webSearchRequests: number;
contextWindow: number;
}
export interface CLIPermissionDenial {
tool_name: string;
tool_use_id: string;
tool_input: unknown;
}
export interface TextBlock {
type: 'text';
text: string;
annotations?: Annotation[];
}
export interface ThinkingBlock {
type: 'thinking';
thinking: string;
signature?: string;
annotations?: Annotation[];
}
export interface ToolUseBlock {
type: 'tool_use';
id: string;
name: string;
input: unknown;
annotations?: Annotation[];
}
export interface ToolResultBlock {
type: 'tool_result';
tool_use_id: string;
content?: string | ContentBlock[];
is_error?: boolean;
annotations?: Annotation[];
}
export type ContentBlock =
| TextBlock
| ThinkingBlock
| ToolUseBlock
| ToolResultBlock;
export interface APIUserMessage {
role: 'user';
content: string | ContentBlock[];
}
export interface APIAssistantMessage {
id: string;
type: 'message';
role: 'assistant';
model: string;
content: ContentBlock[];
stop_reason?: string | null;
usage: Usage;
}
export interface CLIUserMessage {
type: 'user';
uuid?: string;
session_id: string;
message: APIUserMessage;
parent_tool_use_id: string | null;
options?: Record<string, unknown>;
}
export interface CLIAssistantMessage {
type: 'assistant';
uuid: string;
session_id: string;
message: APIAssistantMessage;
parent_tool_use_id: string | null;
}
export interface CLISystemMessage {
type: 'system';
subtype: string;
uuid: string;
session_id: string;
data?: unknown;
cwd?: string;
tools?: string[];
mcp_servers?: Array<{
name: string;
status: string;
}>;
model?: string;
permissionMode?: string;
slash_commands?: string[];
apiKeySource?: string;
qwen_code_version?: string;
output_style?: string;
agents?: string[];
skills?: string[];
capabilities?: Record<string, unknown>;
compact_metadata?: {
trigger: 'manual' | 'auto';
pre_tokens: number;
};
}
export interface CLIResultMessageSuccess {
type: 'result';
subtype: 'success';
uuid: string;
session_id: string;
is_error: false;
duration_ms: number;
duration_api_ms: number;
num_turns: number;
result: string;
usage: ExtendedUsage;
modelUsage?: Record<string, ModelUsage>;
permission_denials: CLIPermissionDenial[];
[key: string]: unknown;
}
export interface CLIResultMessageError {
type: 'result';
subtype: 'error_max_turns' | 'error_during_execution';
uuid: string;
session_id: string;
is_error: true;
duration_ms: number;
duration_api_ms: number;
num_turns: number;
usage: ExtendedUsage;
modelUsage?: Record<string, ModelUsage>;
permission_denials: CLIPermissionDenial[];
error?: {
type?: string;
message: string;
[key: string]: unknown;
};
[key: string]: unknown;
}
export type CLIResultMessage = CLIResultMessageSuccess | CLIResultMessageError;
export interface MessageStartStreamEvent {
type: 'message_start';
message: {
id: string;
role: 'assistant';
model: string;
};
}
export interface ContentBlockStartEvent {
type: 'content_block_start';
index: number;
content_block: ContentBlock;
}
export type ContentBlockDelta =
| {
type: 'text_delta';
text: string;
}
| {
type: 'thinking_delta';
thinking: string;
}
| {
type: 'input_json_delta';
partial_json: string;
};
export interface ContentBlockDeltaEvent {
type: 'content_block_delta';
index: number;
delta: ContentBlockDelta;
}
export interface ContentBlockStopEvent {
type: 'content_block_stop';
index: number;
}
export interface MessageStopStreamEvent {
type: 'message_stop';
}
export type StreamEvent =
| MessageStartStreamEvent
| ContentBlockStartEvent
| ContentBlockDeltaEvent
| ContentBlockStopEvent
| MessageStopStreamEvent;
export interface CLIPartialAssistantMessage {
type: 'stream_event';
uuid: string;
session_id: string;
event: StreamEvent;
parent_tool_use_id: string | null;
}
export type PermissionMode = 'default' | 'plan' | 'auto-edit' | 'yolo';
/**
* TODO: Align with `ToolCallConfirmationDetails`
*/
export interface PermissionSuggestion {
type: 'allow' | 'deny' | 'modify';
label: string;
description?: string;
modifiedInput?: unknown;
}
export interface HookRegistration {
event: string;
callback_id: string;
}
export interface HookCallbackResult {
shouldSkip?: boolean;
shouldInterrupt?: boolean;
suppressOutput?: boolean;
message?: string;
}
export interface CLIControlInterruptRequest {
subtype: 'interrupt';
}
export interface CLIControlPermissionRequest {
subtype: 'can_use_tool';
tool_name: string;
tool_use_id: string;
input: unknown;
permission_suggestions: PermissionSuggestion[] | null;
blocked_path: string | null;
}
export enum AuthProviderType {
DYNAMIC_DISCOVERY = 'dynamic_discovery',
GOOGLE_CREDENTIALS = 'google_credentials',
SERVICE_ACCOUNT_IMPERSONATION = 'service_account_impersonation',
}
export interface MCPServerConfig {
command?: string;
args?: string[];
env?: Record<string, string>;
cwd?: string;
url?: string;
httpUrl?: string;
headers?: Record<string, string>;
tcp?: string;
timeout?: number;
trust?: boolean;
description?: string;
includeTools?: string[];
excludeTools?: string[];
extensionName?: string;
oauth?: Record<string, unknown>;
authProviderType?: AuthProviderType;
targetAudience?: string;
targetServiceAccount?: string;
}
export interface CLIControlInitializeRequest {
subtype: 'initialize';
hooks?: HookRegistration[] | null;
sdkMcpServers?: Record<string, MCPServerConfig>;
mcpServers?: Record<string, MCPServerConfig>;
agents?: SubagentConfig[];
}
export interface CLIControlSetPermissionModeRequest {
subtype: 'set_permission_mode';
mode: PermissionMode;
}
export interface CLIHookCallbackRequest {
subtype: 'hook_callback';
callback_id: string;
input: unknown;
tool_use_id: string | null;
}
export interface CLIControlMcpMessageRequest {
subtype: 'mcp_message';
server_name: string;
message: {
jsonrpc?: string;
method: string;
params?: Record<string, unknown>;
id?: string | number | null;
};
}
export interface CLIControlSetModelRequest {
subtype: 'set_model';
model: string;
}
export interface CLIControlMcpStatusRequest {
subtype: 'mcp_server_status';
}
export interface CLIControlSupportedCommandsRequest {
subtype: 'supported_commands';
}
export type ControlRequestPayload =
| CLIControlInterruptRequest
| CLIControlPermissionRequest
| CLIControlInitializeRequest
| CLIControlSetPermissionModeRequest
| CLIHookCallbackRequest
| CLIControlMcpMessageRequest
| CLIControlSetModelRequest
| CLIControlMcpStatusRequest
| CLIControlSupportedCommandsRequest;
export interface CLIControlRequest {
type: 'control_request';
request_id: string;
request: ControlRequestPayload;
}
export interface PermissionApproval {
allowed: boolean;
reason?: string;
modifiedInput?: unknown;
}
export interface ControlResponse {
subtype: 'success';
request_id: string;
response: unknown;
}
export interface ControlErrorResponse {
subtype: 'error';
request_id: string;
error: string | { message: string; [key: string]: unknown };
}
export interface CLIControlResponse {
type: 'control_response';
response: ControlResponse | ControlErrorResponse;
}
export interface ControlCancelRequest {
type: 'control_cancel_request';
request_id?: string;
}
export type ControlMessage =
| CLIControlRequest
| CLIControlResponse
| ControlCancelRequest;
/**
* Union of all CLI message types
*/
export type CLIMessage =
| CLIUserMessage
| CLIAssistantMessage
| CLISystemMessage
| CLIResultMessage
| CLIPartialAssistantMessage;
export function isCLIUserMessage(msg: any): msg is CLIUserMessage {
return (
msg && typeof msg === 'object' && msg.type === 'user' && 'message' in msg
);
}
export function isCLIAssistantMessage(msg: any): msg is CLIAssistantMessage {
return (
msg &&
typeof msg === 'object' &&
msg.type === 'assistant' &&
'uuid' in msg &&
'message' in msg &&
'session_id' in msg &&
'parent_tool_use_id' in msg
);
}
export function isCLISystemMessage(msg: any): msg is CLISystemMessage {
return (
msg &&
typeof msg === 'object' &&
msg.type === 'system' &&
'subtype' in msg &&
'uuid' in msg &&
'session_id' in msg
);
}
export function isCLIResultMessage(msg: any): msg is CLIResultMessage {
return (
msg &&
typeof msg === 'object' &&
msg.type === 'result' &&
'subtype' in msg &&
'duration_ms' in msg &&
'is_error' in msg &&
'uuid' in msg &&
'session_id' in msg
);
}
export function isCLIPartialAssistantMessage(
msg: any,
): msg is CLIPartialAssistantMessage {
return (
msg &&
typeof msg === 'object' &&
msg.type === 'stream_event' &&
'uuid' in msg &&
'session_id' in msg &&
'event' in msg &&
'parent_tool_use_id' in msg
);
}
export function isControlRequest(msg: any): msg is CLIControlRequest {
return (
msg &&
typeof msg === 'object' &&
msg.type === 'control_request' &&
'request_id' in msg &&
'request' in msg
);
}
export function isControlResponse(msg: any): msg is CLIControlResponse {
return (
msg &&
typeof msg === 'object' &&
msg.type === 'control_response' &&
'response' in msg
);
}
export function isControlCancel(msg: any): msg is ControlCancelRequest {
return (
msg &&
typeof msg === 'object' &&
msg.type === 'control_cancel_request' &&
'request_id' in msg
);
}
export function isTextBlock(block: any): block is TextBlock {
return block && typeof block === 'object' && block.type === 'text';
}
export function isThinkingBlock(block: any): block is ThinkingBlock {
return block && typeof block === 'object' && block.type === 'thinking';
}
export function isToolUseBlock(block: any): block is ToolUseBlock {
return block && typeof block === 'object' && block.type === 'tool_use';
}
export function isToolResultBlock(block: any): block is ToolResultBlock {
return block && typeof block === 'object' && block.type === 'tool_result';
}
export type SubagentLevel = 'session';
export interface ModelConfig {
model?: string;
temp?: number;
top_p?: number;
}
export interface RunConfig {
max_time_minutes?: number;
max_turns?: number;
}
export interface SubagentConfig {
name: string;
description: string;
tools?: string[];
systemPrompt: string;
level: SubagentLevel;
filePath: string;
modelConfig?: Partial<ModelConfig>;
runConfig?: Partial<RunConfig>;
color?: string;
readonly isBuiltin?: boolean;
}
/**
* @license
* Copyright 2025 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
/**
* Control Request Types
*
* Centralized enum for all control request subtypes supported by the CLI.
* This enum should be kept in sync with the controllers in:
* - packages/cli/src/services/control/controllers/systemController.ts
* - packages/cli/src/services/control/controllers/permissionController.ts
* - packages/cli/src/services/control/controllers/mcpController.ts
* - packages/cli/src/services/control/controllers/hookController.ts
*/
export enum ControlRequestType {
// SystemController requests
INITIALIZE = 'initialize',
INTERRUPT = 'interrupt',
SET_MODEL = 'set_model',
SUPPORTED_COMMANDS = 'supported_commands',
// PermissionController requests
CAN_USE_TOOL = 'can_use_tool',
SET_PERMISSION_MODE = 'set_permission_mode',
// MCPController requests
MCP_MESSAGE = 'mcp_message',
MCP_SERVER_STATUS = 'mcp_server_status',
// HookController requests
HOOK_CALLBACK = 'hook_callback',
}

View File

@@ -0,0 +1,86 @@
import { z } from 'zod';
import type { CanUseTool } from './types.js';
import type { SubagentConfig } from './protocol.js';
export const ExternalMcpServerConfigSchema = z.object({
command: z.string().min(1, 'Command must be a non-empty string'),
args: z.array(z.string()).optional(),
env: z.record(z.string(), z.string()).optional(),
});
export const SdkMcpServerConfigSchema = z.object({
connect: z.custom<(transport: unknown) => Promise<void>>(
(val) => typeof val === 'function',
{ message: 'connect must be a function' },
),
});
export const ModelConfigSchema = z.object({
model: z.string().optional(),
temp: z.number().optional(),
top_p: z.number().optional(),
});
export const RunConfigSchema = z.object({
max_time_minutes: z.number().optional(),
max_turns: z.number().optional(),
});
export const SubagentConfigSchema = z.object({
name: z.string().min(1, 'Name must be a non-empty string'),
description: z.string().min(1, 'Description must be a non-empty string'),
tools: z.array(z.string()).optional(),
systemPrompt: z.string().min(1, 'System prompt must be a non-empty string'),
filePath: z.string().min(1, 'File path must be a non-empty string'),
modelConfig: ModelConfigSchema.partial().optional(),
runConfig: RunConfigSchema.partial().optional(),
color: z.string().optional(),
isBuiltin: z.boolean().optional(),
});
export const QueryOptionsSchema = z
.object({
cwd: z.string().optional(),
model: z.string().optional(),
pathToQwenExecutable: z.string().optional(),
env: z.record(z.string(), z.string()).optional(),
permissionMode: z.enum(['default', 'plan', 'auto-edit', 'yolo']).optional(),
canUseTool: z
.custom<CanUseTool>((val) => typeof val === 'function', {
message: 'canUseTool must be a function',
})
.optional(),
mcpServers: z.record(z.string(), ExternalMcpServerConfigSchema).optional(),
sdkMcpServers: z.record(z.string(), SdkMcpServerConfigSchema).optional(),
abortController: z.instanceof(AbortController).optional(),
debug: z.boolean().optional(),
stderr: z
.custom<
(message: string) => void
>((val) => typeof val === 'function', { message: 'stderr must be a function' })
.optional(),
maxSessionTurns: z.number().optional(),
coreTools: z.array(z.string()).optional(),
excludeTools: z.array(z.string()).optional(),
authType: z.enum(['openai', 'qwen-oauth']).optional(),
agents: z
.array(
z.custom<SubagentConfig>(
(val) =>
val &&
typeof val === 'object' &&
'name' in val &&
'description' in val &&
'systemPrompt' in val &&
'filePath' in val,
{ message: 'agents must be an array of SubagentConfig objects' },
),
)
.optional(),
})
.strict();
export type ExternalMcpServerConfig = z.infer<
typeof ExternalMcpServerConfigSchema
>;
export type QueryOptions = z.infer<typeof QueryOptionsSchema>;

View File

@@ -0,0 +1,57 @@
import type { PermissionMode, PermissionSuggestion } from './protocol.js';
import type { ExternalMcpServerConfig } from './queryOptionsSchema.js';
export type { PermissionMode };
export type JSONSchema = {
type: string;
properties?: Record<string, unknown>;
required?: string[];
description?: string;
[key: string]: unknown;
};
export type ToolDefinition<TInput = unknown, TOutput = unknown> = {
name: string;
description: string;
inputSchema: JSONSchema;
handler: (input: TInput) => Promise<TOutput>;
};
export type TransportOptions = {
pathToQwenExecutable: string;
cwd?: string;
model?: string;
permissionMode?: PermissionMode;
mcpServers?: Record<string, ExternalMcpServerConfig>;
env?: Record<string, string>;
abortController?: AbortController;
debug?: boolean;
stderr?: (message: string) => void;
maxSessionTurns?: number;
coreTools?: string[];
excludeTools?: string[];
authType?: string;
};
type ToolInput = Record<string, unknown>;
export type CanUseTool = (
toolName: string,
input: ToolInput,
options: {
signal: AbortSignal;
suggestions?: PermissionSuggestion[] | null;
},
) => Promise<PermissionResult>;
export type PermissionResult =
| {
behavior: 'allow';
updatedInput: ToolInput;
}
| {
behavior: 'deny';
message: string;
interrupt?: boolean;
};

View File

@@ -0,0 +1,91 @@
/**
* Async iterable queue for streaming messages between producer and consumer.
*/
export class Stream<T> implements AsyncIterable<T> {
private returned: (() => void) | undefined;
private queue: T[] = [];
private readResolve: ((result: IteratorResult<T>) => void) | undefined;
private readReject: ((error: Error) => void) | undefined;
private isDone = false;
hasError: Error | undefined;
private started = false;
constructor(returned?: () => void) {
this.returned = returned;
}
[Symbol.asyncIterator](): AsyncIterator<T> {
if (this.started) {
throw new Error('Stream can only be iterated once');
}
this.started = true;
return this;
}
async next(): Promise<IteratorResult<T>> {
// Check queue first - if there are queued items, return immediately
if (this.queue.length > 0) {
return Promise.resolve({
done: false,
value: this.queue.shift()!,
});
}
// Check if stream is done
if (this.isDone) {
return Promise.resolve({ done: true, value: undefined });
}
// Check for errors that occurred before next() was called
// This ensures errors set via error() before iteration starts are properly rejected
if (this.hasError) {
return Promise.reject(this.hasError);
}
// No queued items, not done, no error - set up promise for next value/error
return new Promise<IteratorResult<T>>((resolve, reject) => {
this.readResolve = resolve;
this.readReject = reject;
});
}
enqueue(value: T): void {
if (this.readResolve) {
const resolve = this.readResolve;
this.readResolve = undefined;
this.readReject = undefined;
resolve({ done: false, value });
} else {
this.queue.push(value);
}
}
done(): void {
this.isDone = true;
if (this.readResolve) {
const resolve = this.readResolve;
this.readResolve = undefined;
this.readReject = undefined;
resolve({ done: true, value: undefined });
}
}
error(error: Error): void {
this.hasError = error;
// If readReject exists (next() has been called), reject immediately
if (this.readReject) {
const reject = this.readReject;
this.readResolve = undefined;
this.readReject = undefined;
reject(error);
}
// Otherwise, error is stored in hasError and will be rejected when next() is called
// This handles the case where error() is called before the first next() call
}
return(): Promise<IteratorResult<T>> {
this.isDone = true;
if (this.returned) {
this.returned();
}
return Promise.resolve({ done: true, value: undefined });
}
}

View File

@@ -0,0 +1,365 @@
/**
* CLI path auto-detection and subprocess spawning utilities
*
* Supports multiple execution modes:
* 1. Native binary: 'qwen' (production)
* 2. Node.js bundle: 'node /path/to/cli.js' (production validation)
* 3. Bun bundle: 'bun /path/to/cli.js' (alternative runtime)
* 4. TypeScript source: 'tsx /path/to/index.ts' (development)
*
* Auto-detection locations for native binary:
* 1. QWEN_CODE_CLI_PATH environment variable
* 2. ~/.volta/bin/qwen
* 3. ~/.npm-global/bin/qwen
* 4. /usr/local/bin/qwen
* 5. ~/.local/bin/qwen
* 6. ~/node_modules/.bin/qwen
* 7. ~/.yarn/bin/qwen
*/
import * as fs from 'node:fs';
import * as path from 'node:path';
import { execSync } from 'node:child_process';
/**
* Executable types supported by the SDK
*/
export type ExecutableType = 'native' | 'node' | 'bun' | 'tsx' | 'deno';
/**
* Spawn information for CLI process
*/
export type SpawnInfo = {
/** Command to execute (e.g., 'qwen', 'node', 'bun', 'tsx') */
command: string;
/** Arguments to pass to command */
args: string[];
/** Type of executable detected */
type: ExecutableType;
/** Original input that was resolved */
originalInput: string;
};
export function findNativeCliPath(): string {
const homeDir = process.env['HOME'] || process.env['USERPROFILE'] || '';
const candidates: Array<string | undefined> = [
// 1. Environment variable (highest priority)
process.env['QWEN_CODE_CLI_PATH'],
// 2. Volta bin
path.join(homeDir, '.volta', 'bin', 'qwen'),
// 3. Global npm installations
path.join(homeDir, '.npm-global', 'bin', 'qwen'),
// 4. Common Unix binary locations
'/usr/local/bin/qwen',
// 5. User local bin
path.join(homeDir, '.local', 'bin', 'qwen'),
// 6. Node modules bin in home directory
path.join(homeDir, 'node_modules', '.bin', 'qwen'),
// 7. Yarn global bin
path.join(homeDir, '.yarn', 'bin', 'qwen'),
];
// Find first existing candidate
for (const candidate of candidates) {
if (candidate && fs.existsSync(candidate)) {
return path.resolve(candidate);
}
}
// Not found - throw helpful error
throw new Error(
'qwen CLI not found. Please:\n' +
' 1. Install qwen globally: npm install -g qwen\n' +
' 2. Or provide explicit executable: query({ pathToQwenExecutable: "/path/to/qwen" })\n' +
' 3. Or set environment variable: QWEN_CODE_CLI_PATH="/path/to/qwen"\n' +
'\n' +
'For development/testing, you can also use:\n' +
' • TypeScript source: query({ pathToQwenExecutable: "/path/to/index.ts" })\n' +
' • Node.js bundle: query({ pathToQwenExecutable: "/path/to/cli.js" })\n' +
' • Force specific runtime: query({ pathToQwenExecutable: "bun:/path/to/cli.js" })',
);
}
function isCommandAvailable(command: string): boolean {
try {
// Use 'which' on Unix-like systems, 'where' on Windows
const whichCommand = process.platform === 'win32' ? 'where' : 'which';
execSync(`${whichCommand} ${command}`, {
stdio: 'ignore',
timeout: 5000, // 5 second timeout
});
return true;
} catch {
return false;
}
}
function validateRuntimeAvailability(runtime: string): boolean {
// Node.js is always available since we're running in Node.js
if (runtime === 'node') {
return true;
}
// Check if the runtime command is available in PATH
return isCommandAvailable(runtime);
}
function validateFileExtensionForRuntime(
filePath: string,
runtime: string,
): boolean {
const ext = path.extname(filePath).toLowerCase();
switch (runtime) {
case 'node':
case 'bun':
return ['.js', '.mjs', '.cjs'].includes(ext);
case 'tsx':
return ['.ts', '.tsx'].includes(ext);
case 'deno':
return ['.ts', '.tsx', '.js', '.mjs'].includes(ext);
default:
return true; // Unknown runtime, let it pass
}
}
/**
* Parse executable specification into components with comprehensive validation
*
* Supports multiple formats:
* - 'qwen' -> native binary (auto-detected)
* - '/path/to/qwen' -> native binary (explicit path)
* - '/path/to/cli.js' -> Node.js bundle (default for .js files)
* - '/path/to/index.ts' -> TypeScript source (requires tsx)
*
* Advanced runtime specification (for overriding defaults):
* - 'bun:/path/to/cli.js' -> Force Bun runtime
* - 'node:/path/to/cli.js' -> Force Node.js runtime
* - 'tsx:/path/to/index.ts' -> Force tsx runtime
* - 'deno:/path/to/cli.ts' -> Force Deno runtime
*
* @param executableSpec - Executable specification
* @returns Parsed executable information
* @throws Error if specification is invalid or files don't exist
*/
export function parseExecutableSpec(executableSpec?: string): {
runtime?: string;
executablePath: string;
isExplicitRuntime: boolean;
} {
// Handle empty string case first (before checking for undefined/null)
if (
executableSpec === '' ||
(executableSpec && executableSpec.trim() === '')
) {
throw new Error('Command name cannot be empty');
}
if (!executableSpec) {
// Auto-detect native CLI
return {
executablePath: findNativeCliPath(),
isExplicitRuntime: false,
};
}
// Check for runtime prefix (e.g., 'bun:/path/to/cli.js')
const runtimeMatch = executableSpec.match(/^([^:]+):(.+)$/);
if (runtimeMatch) {
const [, runtime, filePath] = runtimeMatch;
if (!runtime || !filePath) {
throw new Error(`Invalid runtime specification: '${executableSpec}'`);
}
// Validate runtime is supported
const supportedRuntimes = ['node', 'bun', 'tsx', 'deno'];
if (!supportedRuntimes.includes(runtime)) {
throw new Error(
`Unsupported runtime '${runtime}'. Supported runtimes: ${supportedRuntimes.join(', ')}`,
);
}
// Validate runtime availability
if (!validateRuntimeAvailability(runtime)) {
throw new Error(
`Runtime '${runtime}' is not available on this system. Please install it first.`,
);
}
const resolvedPath = path.resolve(filePath);
// Validate file exists
if (!fs.existsSync(resolvedPath)) {
throw new Error(
`Executable file not found at '${resolvedPath}' for runtime '${runtime}'. ` +
'Please check the file path and ensure the file exists.',
);
}
// Validate file extension matches runtime
if (!validateFileExtensionForRuntime(resolvedPath, runtime)) {
const ext = path.extname(resolvedPath);
throw new Error(
`File extension '${ext}' is not compatible with runtime '${runtime}'. ` +
`Expected extensions for ${runtime}: ${getExpectedExtensions(runtime).join(', ')}`,
);
}
return {
runtime,
executablePath: resolvedPath,
isExplicitRuntime: true,
};
}
// Check if it's a command name (no path separators) or a file path
const isCommandName =
!executableSpec.includes('/') && !executableSpec.includes('\\');
if (isCommandName) {
// It's a command name like 'qwen' - validate it's a reasonable command name
if (!executableSpec || executableSpec.trim() === '') {
throw new Error('Command name cannot be empty');
}
// Basic validation for command names
if (!/^[a-zA-Z0-9._-]+$/.test(executableSpec)) {
throw new Error(
`Invalid command name '${executableSpec}'. Command names should only contain letters, numbers, dots, hyphens, and underscores.`,
);
}
return {
executablePath: executableSpec,
isExplicitRuntime: false,
};
}
// It's a file path - validate and resolve
const resolvedPath = path.resolve(executableSpec);
if (!fs.existsSync(resolvedPath)) {
throw new Error(
`Executable file not found at '${resolvedPath}'. ` +
'Please check the file path and ensure the file exists. ' +
'You can also:\n' +
' • Set QWEN_CODE_CLI_PATH environment variable\n' +
' • Install qwen globally: npm install -g qwen\n' +
' • For TypeScript files, ensure tsx is installed: npm install -g tsx\n' +
' • Force specific runtime: bun:/path/to/cli.js or tsx:/path/to/index.ts',
);
}
// Additional validation for file paths
const stats = fs.statSync(resolvedPath);
if (!stats.isFile()) {
throw new Error(
`Path '${resolvedPath}' exists but is not a file. Please provide a path to an executable file.`,
);
}
return {
executablePath: resolvedPath,
isExplicitRuntime: false,
};
}
function getExpectedExtensions(runtime: string): string[] {
switch (runtime) {
case 'node':
case 'bun':
return ['.js', '.mjs', '.cjs'];
case 'tsx':
return ['.ts', '.tsx'];
case 'deno':
return ['.ts', '.tsx', '.js', '.mjs'];
default:
return [];
}
}
/**
* @deprecated Use parseExecutableSpec and prepareSpawnInfo instead
*/
export function resolveCliPath(explicitPath?: string): string {
const parsed = parseExecutableSpec(explicitPath);
return parsed.executablePath;
}
function detectRuntimeFromExtension(filePath: string): string | undefined {
const ext = path.extname(filePath).toLowerCase();
if (['.js', '.mjs', '.cjs'].includes(ext)) {
// Default to Node.js for JavaScript files
return 'node';
}
if (['.ts', '.tsx'].includes(ext)) {
// Check if tsx is available for TypeScript files
if (isCommandAvailable('tsx')) {
return 'tsx';
}
// If tsx is not available, suggest it in error message
throw new Error(
`TypeScript file '${filePath}' requires 'tsx' runtime, but it's not available. ` +
'Please install tsx: npm install -g tsx, or use explicit runtime: tsx:/path/to/file.ts',
);
}
// Native executable or unknown extension
return undefined;
}
export function prepareSpawnInfo(executableSpec?: string): SpawnInfo {
const parsed = parseExecutableSpec(executableSpec);
const { runtime, executablePath, isExplicitRuntime } = parsed;
// If runtime is explicitly specified, use it
if (isExplicitRuntime && runtime) {
const runtimeCommand = runtime === 'node' ? process.execPath : runtime;
return {
command: runtimeCommand,
args: [executablePath],
type: runtime as ExecutableType,
originalInput: executableSpec || '',
};
}
// If no explicit runtime, try to detect from file extension
const detectedRuntime = detectRuntimeFromExtension(executablePath);
if (detectedRuntime) {
const runtimeCommand =
detectedRuntime === 'node' ? process.execPath : detectedRuntime;
return {
command: runtimeCommand,
args: [executablePath],
type: detectedRuntime as ExecutableType,
originalInput: executableSpec || '',
};
}
// Native executable or command name - use it directly
return {
command: executablePath,
args: [],
type: 'native',
originalInput: executableSpec || '',
};
}
/**
* @deprecated Use prepareSpawnInfo() instead
*/
export function findCliPath(): string {
return findNativeCliPath();
}

View File

@@ -0,0 +1,65 @@
export function serializeJsonLine(message: unknown): string {
try {
return JSON.stringify(message) + '\n';
} catch (error) {
throw new Error(
`Failed to serialize message to JSON: ${error instanceof Error ? error.message : String(error)}`,
);
}
}
export function parseJsonLineSafe(
line: string,
context = 'JsonLines',
): unknown | null {
try {
return JSON.parse(line);
} catch (error) {
console.warn(
`[${context}] Failed to parse JSON line, skipping:`,
line.substring(0, 100),
error instanceof Error ? error.message : String(error),
);
return null;
}
}
export function isValidMessage(message: unknown): boolean {
return (
message !== null &&
typeof message === 'object' &&
'type' in message &&
typeof (message as { type: unknown }).type === 'string'
);
}
export async function* parseJsonLinesStream(
lines: AsyncIterable<string>,
context = 'JsonLines',
): AsyncGenerator<unknown, void, unknown> {
for await (const line of lines) {
// Skip empty lines
if (line.trim().length === 0) {
continue;
}
// Parse with error handling
const message = parseJsonLineSafe(line, context);
// Skip malformed messages
if (message === null) {
continue;
}
// Validate message structure
if (!isValidMessage(message)) {
console.warn(
`[${context}] Invalid message structure (missing 'type' field), skipping:`,
line.substring(0, 100),
);
continue;
}
yield message;
}
}