refactor: nonInteractive mode framework

This commit is contained in:
mingholy.lmh
2025-11-02 09:14:02 +08:00
parent 8034fd5f82
commit f0e0c13300
46 changed files with 7013 additions and 3558 deletions

View File

@@ -101,7 +101,8 @@ export class Query implements AsyncIterable<CLIMessage> {
this.options = options;
this.sessionId = randomUUID();
this.inputStream = new Stream<CLIMessage>();
this.abortController = new AbortController();
// Use provided abortController or create a new one
this.abortController = options.abortController ?? new AbortController();
this.isSingleTurn = options.singleTurn ?? false;
// Setup first result tracking
@@ -109,10 +110,16 @@ export class Query implements AsyncIterable<CLIMessage> {
this.firstResultReceivedResolve = resolve;
});
// Handle external abort signal
if (options.signal) {
options.signal.addEventListener('abort', () => {
this.abortController.abort();
// Handle abort signal if controller is provided and already aborted or will be aborted
if (this.abortController.signal.aborted) {
// Already aborted - set error immediately
this.inputStream.setError(new AbortError('Query aborted by user'));
this.close().catch((err) => {
console.error('[Query] Error during abort cleanup:', err);
});
} else {
// Listen for abort events on the controller's signal
this.abortController.signal.addEventListener('abort', () => {
// Set abort error on the stream before closing
this.inputStream.setError(new AbortError('Query aborted by user'));
this.close().catch((err) => {
@@ -350,7 +357,7 @@ export class Query implements AsyncIterable<CLIMessage> {
case 'can_use_tool':
response = (await this.handlePermissionRequest(
payload.tool_name,
payload.input,
payload.input as Record<string, unknown>,
payload.permission_suggestions,
requestAbortController.signal,
)) as unknown as Record<string, unknown>;
@@ -530,9 +537,14 @@ export class Query implements AsyncIterable<CLIMessage> {
// Resolve or reject based on response type
if (payload.subtype === 'success') {
pending.resolve(payload.response);
pending.resolve(payload.response as Record<string, unknown> | null);
} else {
pending.reject(new Error(payload.error ?? 'Unknown error'));
// Extract error message from error field (can be string or object)
const errorMessage =
typeof payload.error === 'string'
? payload.error
: (payload.error?.message ?? 'Unknown error');
pending.reject(new Error(errorMessage));
}
}
@@ -764,6 +776,7 @@ export class Query implements AsyncIterable<CLIMessage> {
} catch (error) {
// Check if aborted - if so, set abort error on stream
if (this.abortController.signal.aborted) {
console.log('[Query] Aborted during input streaming');
this.inputStream.setError(
new AbortError('Query aborted during input streaming'),
);

View File

@@ -11,7 +11,7 @@ import type {
ExternalMcpServerConfig,
} from '../types/config.js';
import { ProcessTransport } from '../transport/ProcessTransport.js';
import { resolveCliPath, parseExecutableSpec } from '../utils/cliPath.js';
import { parseExecutableSpec } from '../utils/cliPath.js';
import { Query } from './Query.js';
/**
@@ -29,7 +29,7 @@ export type QueryOptions = {
string,
{ connect: (transport: unknown) => Promise<void> }
>;
signal?: AbortSignal;
abortController?: AbortController;
debug?: boolean;
stderr?: (message: string) => void;
};
@@ -60,8 +60,8 @@ export function query({
prompt: string | AsyncIterable<CLIUserMessage>;
options?: QueryOptions;
}): Query {
// Validate options
validateOptions(options);
// Validate options and obtain normalized executable metadata
const parsedExecutable = validateOptions(options);
// Determine if this is a single-turn or multi-turn query
// Single-turn: string prompt (simple Q&A)
@@ -74,13 +74,14 @@ export function query({
singleTurn: isSingleTurn,
};
// Resolve CLI path (auto-detect if not provided)
const pathToQwenExecutable = resolveCliPath(options.pathToQwenExecutable);
// Resolve CLI specification while preserving explicit runtime directives
const pathToQwenExecutable =
options.pathToQwenExecutable ?? parsedExecutable.executablePath;
// Pass signal to transport (it will handle AbortController internally)
const signal = options.signal;
// Use provided abortController or create a new one
const abortController = options.abortController ?? new AbortController();
// Create transport
// Create transport with abortController
const transport = new ProcessTransport({
pathToQwenExecutable,
cwd: options.cwd,
@@ -88,13 +89,19 @@ export function query({
permissionMode: options.permissionMode,
mcpServers: options.mcpServers,
env: options.env,
signal,
abortController,
debug: options.debug,
stderr: options.stderr,
});
// Build query options with abortController
const finalQueryOptions: CreateQueryOptions = {
...queryOptions,
abortController,
};
// Create Query
const queryInstance = new Query(transport, queryOptions);
const queryInstance = new Query(transport, finalQueryOptions);
// Handle prompt based on type
if (isSingleTurn) {
@@ -110,10 +117,8 @@ export function query({
parent_tool_use_id: null,
};
// Send message after query is initialized
(async () => {
try {
// Wait a bit for initialization to complete
await new Promise((resolve) => setTimeout(resolve, 0));
transport.write(serializeJsonLine(message));
} catch (err) {
@@ -139,9 +144,20 @@ export function query({
export const createQuery = query;
/**
* Validates query configuration options.
* Validate query configuration options and normalize CLI executable details.
*
* Performs strict validation for each supported option, including
* permission mode, callbacks, AbortController usage, and executable spec.
* Returns the parsed executable description so callers can retain
* explicit runtime directives (e.g., `bun:/path/to/cli.js`) while still
* benefiting from early validation and auto-detection fallbacks when the
* specification is omitted.
*/
function validateOptions(options: QueryOptions): void {
function validateOptions(
options: QueryOptions,
): ReturnType<typeof parseExecutableSpec> {
let parsedExecutable: ReturnType<typeof parseExecutableSpec>;
// Validate permission mode if provided
if (options.permissionMode) {
const validModes = ['default', 'plan', 'auto-edit', 'yolo'];
@@ -157,14 +173,17 @@ function validateOptions(options: QueryOptions): void {
throw new Error('canUseTool must be a function');
}
// Validate signal is AbortSignal if provided
if (options.signal && !(options.signal instanceof AbortSignal)) {
throw new Error('signal must be an AbortSignal instance');
// Validate abortController is AbortController if provided
if (
options.abortController &&
!(options.abortController instanceof AbortController)
) {
throw new Error('abortController must be an AbortController instance');
}
// Validate executable path early to provide clear error messages
try {
parseExecutableSpec(options.pathToQwenExecutable);
parsedExecutable = parseExecutableSpec(options.pathToQwenExecutable);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
throw new Error(`Invalid pathToQwenExecutable: ${errorMessage}`);
@@ -182,4 +201,6 @@ function validateOptions(options: QueryOptions): void {
);
}
}
return parsedExecutable;
}

View File

@@ -43,7 +43,6 @@ export class ProcessTransport implements Transport {
private cleanupCallbacks: Array<() => void> = [];
private closed = false;
private abortController: AbortController | null = null;
private abortHandler: (() => void) | null = null;
private exitListeners: ExitListener[] = [];
constructor(options: TransportOptions) {
@@ -58,26 +57,26 @@ export class ProcessTransport implements Transport {
return; // Already started
}
// Use provided abortController or create a new one
this.abortController =
this.options.abortController ?? new AbortController();
// Check if already aborted
if (this.options.signal?.aborted) {
throw new AbortError('Transport start aborted by signal');
if (this.abortController.signal.aborted) {
throw new AbortError('Transport start aborted');
}
const cliArgs = this.buildCliArguments();
const cwd = this.options.cwd ?? process.cwd();
const env = { ...process.env, ...this.options.env };
// Setup internal AbortController if signal provided
if (this.options.signal) {
this.abortController = new AbortController();
this.abortHandler = () => {
this.logForDebugging('Transport aborted by user signal');
this._exitError = new AbortError('Operation aborted by user');
this._isReady = false;
void this.close();
};
this.options.signal.addEventListener('abort', this.abortHandler);
}
// Setup abort handler
this.abortController.signal.addEventListener('abort', () => {
this.logForDebugging('Transport aborted by user');
this._exitError = new AbortError('Operation aborted by user');
this._isReady = false;
void this.close();
});
// Create exit promise
this.exitPromise = new Promise<void>((resolve) => {
@@ -103,8 +102,8 @@ export class ProcessTransport implements Transport {
cwd,
env,
stdio: ['pipe', 'pipe', stderrMode],
// Use internal AbortController signal if available
signal: this.abortController?.signal,
// Use AbortController signal
signal: this.abortController.signal,
},
);
@@ -138,10 +137,7 @@ export class ProcessTransport implements Transport {
// Handle process errors
this.childProcess.on('error', (error) => {
if (
this.options.signal?.aborted ||
this.abortController?.signal.aborted
) {
if (this.abortController?.signal.aborted) {
this._exitError = new AbortError('CLI process aborted by user');
} else {
this._exitError = new Error(`CLI process error: ${error.message}`);
@@ -155,10 +151,7 @@ export class ProcessTransport implements Transport {
this._isReady = false;
// Check if aborted
if (
this.options.signal?.aborted ||
this.abortController?.signal.aborted
) {
if (this.abortController?.signal.aborted) {
this._exitError = new AbortError('CLI process aborted by user');
} else if (code !== null && code !== 0 && !this.closed) {
this._exitError = new Error(`CLI process exited with code ${code}`);
@@ -243,12 +236,6 @@ export class ProcessTransport implements Transport {
this.closed = true;
this._isReady = false;
// Clean up abort handler
if (this.abortHandler && this.options.signal) {
this.options.signal.removeEventListener('abort', this.abortHandler);
this.abortHandler = null;
}
// Clean up exit listeners
for (const { handler } of this.exitListeners) {
this.childProcess?.off('exit', handler);
@@ -292,7 +279,7 @@ export class ProcessTransport implements Transport {
*/
write(message: string): void {
// Check abort status
if (this.options.signal?.aborted) {
if (this.abortController?.signal.aborted) {
throw new AbortError('Cannot write: operation aborted');
}
@@ -423,10 +410,7 @@ export class ProcessTransport implements Transport {
const handler = (code: number | null, signal: NodeJS.Signals | null) => {
let error: Error | undefined;
if (
this.options.signal?.aborted ||
this.abortController?.signal.aborted
) {
if (this.abortController?.signal.aborted) {
error = new AbortError('Process aborted by user');
} else if (code !== null && code !== 0) {
error = new Error(`Process exited with code ${code}`);

View File

@@ -112,8 +112,8 @@ export type CreateQueryOptions = {
singleTurn?: boolean;
// Advanced options
/** AbortSignal for cancellation support */
signal?: AbortSignal;
/** AbortController for cancellation support */
abortController?: AbortController;
/** Enable debug output (inherits stderr) */
debug?: boolean;
/** Callback for stderr output */
@@ -136,8 +136,8 @@ export type TransportOptions = {
mcpServers?: Record<string, ExternalMcpServerConfig>;
/** Environment variables */
env?: Record<string, string>;
/** AbortSignal for cancellation support */
signal?: AbortSignal;
/** AbortController for cancellation support */
abortController?: AbortController;
/** Enable debug output */
debug?: boolean;
/** Callback for stderr output */

View File

@@ -34,16 +34,16 @@ describe('AbortController and Process Lifecycle (E2E)', () => {
async () => {
const controller = new AbortController();
// Abort after 2 seconds
// Abort after 5 seconds
setTimeout(() => {
controller.abort();
}, 2000);
}, 5000);
const q = query({
prompt: 'Write a very long story about TypeScript programming',
options: {
...SHARED_TEST_OPTIONS,
signal: controller.signal,
abortController: controller,
debug: false,
},
});
@@ -84,13 +84,16 @@ describe('AbortController and Process Lifecycle (E2E)', () => {
prompt: 'Write a very long essay',
options: {
...SHARED_TEST_OPTIONS,
signal: controller.signal,
debug: false,
abortController: controller,
debug: true,
},
});
// Abort immediately
setTimeout(() => controller.abort(), 100);
setTimeout(() => {
controller.abort();
console.log('Aborted!');
}, 300);
try {
for await (const _message of q) {
@@ -266,7 +269,7 @@ describe('AbortController and Process Lifecycle (E2E)', () => {
prompt: 'Write a long story',
options: {
...SHARED_TEST_OPTIONS,
signal: controller.signal,
abortController: controller,
debug: false,
},
});
@@ -369,7 +372,7 @@ describe('AbortController and Process Lifecycle (E2E)', () => {
prompt: 'Write a very long essay about programming',
options: {
...SHARED_TEST_OPTIONS,
signal: controller.signal,
abortController: controller,
debug: false,
},
});
@@ -404,7 +407,7 @@ describe('AbortController and Process Lifecycle (E2E)', () => {
prompt: 'Count to 100',
options: {
...SHARED_TEST_OPTIONS,
signal: controller.signal,
abortController: controller,
debug: false,
},
});
@@ -464,7 +467,7 @@ describe('AbortController and Process Lifecycle (E2E)', () => {
prompt: 'Hello',
options: {
...SHARED_TEST_OPTIONS,
signal: controller.signal,
abortController: controller,
debug: false,
},
});

View File

@@ -63,56 +63,50 @@ function getMessageType(message: CLIMessage | ControlMessage): string {
describe('Basic Usage (E2E)', () => {
describe('Message Type Recognition', () => {
it(
'should correctly identify message types using type guards',
async () => {
const q = query({
prompt:
'What files are in the current directory? List only the top-level files and folders.',
options: {
...SHARED_TEST_OPTIONS,
cwd: process.cwd(),
debug: false,
},
});
it('should correctly identify message types using type guards', async () => {
const q = query({
prompt:
'What files are in the current directory? List only the top-level files and folders.',
options: {
...SHARED_TEST_OPTIONS,
cwd: process.cwd(),
debug: true,
},
});
const messages: CLIMessage[] = [];
const messageTypes: string[] = [];
const messages: CLIMessage[] = [];
const messageTypes: string[] = [];
try {
for await (const message of q) {
messages.push(message);
const messageType = getMessageType(message);
messageTypes.push(messageType);
try {
for await (const message of q) {
messages.push(message);
const messageType = getMessageType(message);
messageTypes.push(messageType);
if (isCLIResultMessage(message)) {
break;
}
if (isCLIResultMessage(message)) {
break;
}
expect(messages.length).toBeGreaterThan(0);
expect(messageTypes.length).toBe(messages.length);
// Should have at least assistant and result messages
expect(messageTypes.some((type) => type.includes('ASSISTANT'))).toBe(
true,
);
expect(messageTypes.some((type) => type.includes('RESULT'))).toBe(
true,
);
// Verify type guards work correctly
const assistantMessages = messages.filter(isCLIAssistantMessage);
const resultMessages = messages.filter(isCLIResultMessage);
expect(assistantMessages.length).toBeGreaterThan(0);
expect(resultMessages.length).toBeGreaterThan(0);
} finally {
await q.close();
}
},
TEST_TIMEOUT,
);
expect(messages.length).toBeGreaterThan(0);
expect(messageTypes.length).toBe(messages.length);
// Should have at least assistant and result messages
expect(messageTypes.some((type) => type.includes('ASSISTANT'))).toBe(
true,
);
expect(messageTypes.some((type) => type.includes('RESULT'))).toBe(true);
// Verify type guards work correctly
const assistantMessages = messages.filter(isCLIAssistantMessage);
const resultMessages = messages.filter(isCLIResultMessage);
expect(assistantMessages.length).toBeGreaterThan(0);
expect(resultMessages.length).toBeGreaterThan(0);
} finally {
await q.close();
}
});
it(
'should handle message content extraction',
@@ -121,7 +115,7 @@ describe('Basic Usage (E2E)', () => {
prompt: 'Say hello and explain what you are',
options: {
...SHARED_TEST_OPTIONS,
debug: false,
debug: true,
},
});

View File

@@ -135,8 +135,6 @@ describe('Multi-Turn Conversations (E2E)', () => {
if (isCLIAssistantMessage(message)) {
assistantMessages.push(message);
const text = extractText(message.message.content);
expect(text.length).toBeGreaterThan(0);
turnCount++;
}
}

View File

@@ -141,7 +141,7 @@ describe('Simple Query Execution (E2E)', () => {
'should complete iteration after result',
async () => {
const q = query({
prompt: 'Test completion',
prompt: 'Hello, who are you?',
options: {
...SHARED_TEST_OPTIONS,
debug: false,
@@ -475,7 +475,7 @@ describe('Simple Query Execution (E2E)', () => {
prompt: 'Write a very long story about TypeScript',
options: {
...SHARED_TEST_OPTIONS,
signal: controller.signal,
abortController: controller,
debug: false,
},
});
@@ -505,7 +505,7 @@ describe('Simple Query Execution (E2E)', () => {
prompt: 'Write a very long essay',
options: {
...SHARED_TEST_OPTIONS,
signal: controller.signal,
abortController: controller,
debug: false,
},
});