mirror of
https://github.com/QwenLM/qwen-code.git
synced 2025-12-20 16:57:46 +00:00
Headless enhancement: add stream-json as input-format/output-format to support programmatically use (#926)
This commit is contained in:
@@ -15,14 +15,16 @@ import {
|
||||
FatalInputError,
|
||||
promptIdContext,
|
||||
OutputFormat,
|
||||
JsonFormatter,
|
||||
uiTelemetryService,
|
||||
} from '@qwen-code/qwen-code-core';
|
||||
|
||||
import type { Content, Part } from '@google/genai';
|
||||
import type { Content, Part, PartListUnion } from '@google/genai';
|
||||
import type { CLIUserMessage, PermissionMode } from './nonInteractive/types.js';
|
||||
import type { JsonOutputAdapterInterface } from './nonInteractive/io/BaseJsonOutputAdapter.js';
|
||||
import { JsonOutputAdapter } from './nonInteractive/io/JsonOutputAdapter.js';
|
||||
import { StreamJsonOutputAdapter } from './nonInteractive/io/StreamJsonOutputAdapter.js';
|
||||
import type { ControlService } from './nonInteractive/control/ControlService.js';
|
||||
|
||||
import { handleSlashCommand } from './nonInteractiveCliCommands.js';
|
||||
import { ConsolePatcher } from './ui/utils/ConsolePatcher.js';
|
||||
import { handleAtCommand } from './ui/hooks/atCommandProcessor.js';
|
||||
import {
|
||||
handleError,
|
||||
@@ -30,73 +32,144 @@ import {
|
||||
handleCancellationError,
|
||||
handleMaxTurnsExceededError,
|
||||
} from './utils/errors.js';
|
||||
import {
|
||||
normalizePartList,
|
||||
extractPartsFromUserMessage,
|
||||
buildSystemMessage,
|
||||
createTaskToolProgressHandler,
|
||||
computeUsageFromMetrics,
|
||||
} from './utils/nonInteractiveHelpers.js';
|
||||
|
||||
/**
|
||||
* Provides optional overrides for `runNonInteractive` execution.
|
||||
*
|
||||
* @param abortController - Optional abort controller for cancellation.
|
||||
* @param adapter - Optional JSON output adapter for structured output formats.
|
||||
* @param userMessage - Optional CLI user message payload for preformatted input.
|
||||
* @param controlService - Optional control service for future permission handling.
|
||||
*/
|
||||
export interface RunNonInteractiveOptions {
|
||||
abortController?: AbortController;
|
||||
adapter?: JsonOutputAdapterInterface;
|
||||
userMessage?: CLIUserMessage;
|
||||
controlService?: ControlService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the non-interactive CLI flow for a single request.
|
||||
*/
|
||||
export async function runNonInteractive(
|
||||
config: Config,
|
||||
settings: LoadedSettings,
|
||||
input: string,
|
||||
prompt_id: string,
|
||||
options: RunNonInteractiveOptions = {},
|
||||
): Promise<void> {
|
||||
return promptIdContext.run(prompt_id, async () => {
|
||||
const consolePatcher = new ConsolePatcher({
|
||||
stderr: true,
|
||||
debugMode: config.getDebugMode(),
|
||||
});
|
||||
// Create output adapter based on format
|
||||
let adapter: JsonOutputAdapterInterface | undefined;
|
||||
const outputFormat = config.getOutputFormat();
|
||||
|
||||
if (options.adapter) {
|
||||
adapter = options.adapter;
|
||||
} else if (outputFormat === OutputFormat.JSON) {
|
||||
adapter = new JsonOutputAdapter(config);
|
||||
} else if (outputFormat === OutputFormat.STREAM_JSON) {
|
||||
adapter = new StreamJsonOutputAdapter(
|
||||
config,
|
||||
config.getIncludePartialMessages(),
|
||||
);
|
||||
}
|
||||
|
||||
// Get readonly values once at the start
|
||||
const sessionId = config.getSessionId();
|
||||
const permissionMode = config.getApprovalMode() as PermissionMode;
|
||||
|
||||
let turnCount = 0;
|
||||
let totalApiDurationMs = 0;
|
||||
const startTime = Date.now();
|
||||
|
||||
const stdoutErrorHandler = (err: NodeJS.ErrnoException) => {
|
||||
if (err.code === 'EPIPE') {
|
||||
process.stdout.removeListener('error', stdoutErrorHandler);
|
||||
process.exit(0);
|
||||
}
|
||||
};
|
||||
|
||||
const geminiClient = config.getGeminiClient();
|
||||
const abortController = options.abortController ?? new AbortController();
|
||||
|
||||
// Setup signal handlers for graceful shutdown
|
||||
const shutdownHandler = () => {
|
||||
if (config.getDebugMode()) {
|
||||
console.error('[runNonInteractive] Shutdown signal received');
|
||||
}
|
||||
abortController.abort();
|
||||
};
|
||||
|
||||
try {
|
||||
consolePatcher.patch();
|
||||
// Handle EPIPE errors when the output is piped to a command that closes early.
|
||||
process.stdout.on('error', (err: NodeJS.ErrnoException) => {
|
||||
if (err.code === 'EPIPE') {
|
||||
// Exit gracefully if the pipe is closed.
|
||||
process.exit(0);
|
||||
}
|
||||
});
|
||||
process.stdout.on('error', stdoutErrorHandler);
|
||||
|
||||
const geminiClient = config.getGeminiClient();
|
||||
process.on('SIGINT', shutdownHandler);
|
||||
process.on('SIGTERM', shutdownHandler);
|
||||
|
||||
const abortController = new AbortController();
|
||||
let initialPartList: PartListUnion | null = extractPartsFromUserMessage(
|
||||
options.userMessage,
|
||||
);
|
||||
|
||||
let query: Part[] | undefined;
|
||||
|
||||
if (isSlashCommand(input)) {
|
||||
const slashCommandResult = await handleSlashCommand(
|
||||
input,
|
||||
abortController,
|
||||
config,
|
||||
settings,
|
||||
);
|
||||
// If a slash command is found and returns a prompt, use it.
|
||||
// Otherwise, slashCommandResult fall through to the default prompt
|
||||
// handling.
|
||||
if (slashCommandResult) {
|
||||
query = slashCommandResult as Part[];
|
||||
}
|
||||
}
|
||||
|
||||
if (!query) {
|
||||
const { processedQuery, shouldProceed } = await handleAtCommand({
|
||||
query: input,
|
||||
config,
|
||||
addItem: (_item, _timestamp) => 0,
|
||||
onDebugMessage: () => {},
|
||||
messageId: Date.now(),
|
||||
signal: abortController.signal,
|
||||
});
|
||||
|
||||
if (!shouldProceed || !processedQuery) {
|
||||
// An error occurred during @include processing (e.g., file not found).
|
||||
// The error message is already logged by handleAtCommand.
|
||||
throw new FatalInputError(
|
||||
'Exiting due to an error processing the @ command.',
|
||||
if (!initialPartList) {
|
||||
let slashHandled = false;
|
||||
if (isSlashCommand(input)) {
|
||||
const slashCommandResult = await handleSlashCommand(
|
||||
input,
|
||||
abortController,
|
||||
config,
|
||||
settings,
|
||||
);
|
||||
if (slashCommandResult) {
|
||||
// A slash command can replace the prompt entirely; fall back to @-command processing otherwise.
|
||||
initialPartList = slashCommandResult as PartListUnion;
|
||||
slashHandled = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!slashHandled) {
|
||||
const { processedQuery, shouldProceed } = await handleAtCommand({
|
||||
query: input,
|
||||
config,
|
||||
addItem: (_item, _timestamp) => 0,
|
||||
onDebugMessage: () => {},
|
||||
messageId: Date.now(),
|
||||
signal: abortController.signal,
|
||||
});
|
||||
|
||||
if (!shouldProceed || !processedQuery) {
|
||||
// An error occurred during @include processing (e.g., file not found).
|
||||
// The error message is already logged by handleAtCommand.
|
||||
throw new FatalInputError(
|
||||
'Exiting due to an error processing the @ command.',
|
||||
);
|
||||
}
|
||||
initialPartList = processedQuery as PartListUnion;
|
||||
}
|
||||
query = processedQuery as Part[];
|
||||
}
|
||||
|
||||
let currentMessages: Content[] = [{ role: 'user', parts: query }];
|
||||
if (!initialPartList) {
|
||||
initialPartList = [{ text: input }];
|
||||
}
|
||||
|
||||
const initialParts = normalizePartList(initialPartList);
|
||||
let currentMessages: Content[] = [{ role: 'user', parts: initialParts }];
|
||||
|
||||
if (adapter) {
|
||||
const systemMessage = await buildSystemMessage(
|
||||
config,
|
||||
sessionId,
|
||||
permissionMode,
|
||||
);
|
||||
adapter.emitMessage(systemMessage);
|
||||
}
|
||||
|
||||
let turnCount = 0;
|
||||
while (true) {
|
||||
turnCount++;
|
||||
if (
|
||||
@@ -105,43 +178,124 @@ export async function runNonInteractive(
|
||||
) {
|
||||
handleMaxTurnsExceededError(config);
|
||||
}
|
||||
const toolCallRequests: ToolCallRequestInfo[] = [];
|
||||
|
||||
const toolCallRequests: ToolCallRequestInfo[] = [];
|
||||
const apiStartTime = Date.now();
|
||||
const responseStream = geminiClient.sendMessageStream(
|
||||
currentMessages[0]?.parts || [],
|
||||
abortController.signal,
|
||||
prompt_id,
|
||||
);
|
||||
|
||||
let responseText = '';
|
||||
// Start assistant message for this turn
|
||||
if (adapter) {
|
||||
adapter.startAssistantMessage();
|
||||
}
|
||||
|
||||
for await (const event of responseStream) {
|
||||
if (abortController.signal.aborted) {
|
||||
handleCancellationError(config);
|
||||
}
|
||||
|
||||
if (event.type === GeminiEventType.Content) {
|
||||
if (config.getOutputFormat() === OutputFormat.JSON) {
|
||||
responseText += event.value;
|
||||
} else {
|
||||
process.stdout.write(event.value);
|
||||
if (adapter) {
|
||||
// Use adapter for all event processing
|
||||
adapter.processEvent(event);
|
||||
if (event.type === GeminiEventType.ToolCallRequest) {
|
||||
toolCallRequests.push(event.value);
|
||||
}
|
||||
} else {
|
||||
// Text output mode - direct stdout
|
||||
if (event.type === GeminiEventType.Content) {
|
||||
process.stdout.write(event.value);
|
||||
} else if (event.type === GeminiEventType.ToolCallRequest) {
|
||||
toolCallRequests.push(event.value);
|
||||
}
|
||||
} else if (event.type === GeminiEventType.ToolCallRequest) {
|
||||
toolCallRequests.push(event.value);
|
||||
}
|
||||
}
|
||||
|
||||
// Finalize assistant message
|
||||
if (adapter) {
|
||||
adapter.finalizeAssistantMessage();
|
||||
}
|
||||
totalApiDurationMs += Date.now() - apiStartTime;
|
||||
|
||||
if (toolCallRequests.length > 0) {
|
||||
const toolResponseParts: Part[] = [];
|
||||
|
||||
for (const requestInfo of toolCallRequests) {
|
||||
const finalRequestInfo = requestInfo;
|
||||
|
||||
/*
|
||||
if (options.controlService) {
|
||||
const permissionResult =
|
||||
await options.controlService.permission.shouldAllowTool(
|
||||
requestInfo,
|
||||
);
|
||||
if (!permissionResult.allowed) {
|
||||
if (config.getDebugMode()) {
|
||||
console.error(
|
||||
`[runNonInteractive] Tool execution denied: ${requestInfo.name}`,
|
||||
permissionResult.message ?? '',
|
||||
);
|
||||
}
|
||||
if (adapter && permissionResult.message) {
|
||||
adapter.emitSystemMessage('tool_denied', {
|
||||
tool: requestInfo.name,
|
||||
message: permissionResult.message,
|
||||
});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (permissionResult.updatedArgs) {
|
||||
finalRequestInfo = {
|
||||
...requestInfo,
|
||||
args: permissionResult.updatedArgs,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
const toolCallUpdateCallback = options.controlService
|
||||
? options.controlService.permission.getToolCallUpdateCallback()
|
||||
: undefined;
|
||||
*/
|
||||
|
||||
// Only pass outputUpdateHandler for Task tool
|
||||
const isTaskTool = finalRequestInfo.name === 'task';
|
||||
const taskToolProgress = isTaskTool
|
||||
? createTaskToolProgressHandler(
|
||||
config,
|
||||
finalRequestInfo.callId,
|
||||
adapter,
|
||||
)
|
||||
: undefined;
|
||||
const taskToolProgressHandler = taskToolProgress?.handler;
|
||||
const toolResponse = await executeToolCall(
|
||||
config,
|
||||
requestInfo,
|
||||
finalRequestInfo,
|
||||
abortController.signal,
|
||||
isTaskTool && taskToolProgressHandler
|
||||
? {
|
||||
outputUpdateHandler: taskToolProgressHandler,
|
||||
/*
|
||||
toolCallUpdateCallback
|
||||
? { onToolCallsUpdate: toolCallUpdateCallback }
|
||||
: undefined,
|
||||
*/
|
||||
}
|
||||
: undefined,
|
||||
);
|
||||
|
||||
// Note: In JSON mode, subagent messages are automatically added to the main
|
||||
// adapter's messages array and will be output together on emitResult()
|
||||
|
||||
if (toolResponse.error) {
|
||||
// In JSON/STREAM_JSON mode, tool errors are tolerated and formatted
|
||||
// as tool_result blocks. handleToolError will detect JSON/STREAM_JSON mode
|
||||
// from config and allow the session to continue so the LLM can decide what to do next.
|
||||
// In text mode, we still log the error.
|
||||
handleToolError(
|
||||
requestInfo.name,
|
||||
finalRequestInfo.name,
|
||||
toolResponse.error,
|
||||
config,
|
||||
toolResponse.errorType || 'TOOL_EXECUTION_ERROR',
|
||||
@@ -149,6 +303,13 @@ export async function runNonInteractive(
|
||||
? toolResponse.resultDisplay
|
||||
: undefined,
|
||||
);
|
||||
// Note: We no longer emit a separate system message for tool errors
|
||||
// in JSON/STREAM_JSON mode, as the error is already captured in the
|
||||
// tool_result block with is_error=true.
|
||||
}
|
||||
|
||||
if (adapter) {
|
||||
adapter.emitToolResult(finalRequestInfo, toolResponse);
|
||||
}
|
||||
|
||||
if (toolResponse.responseParts) {
|
||||
@@ -157,20 +318,57 @@ export async function runNonInteractive(
|
||||
}
|
||||
currentMessages = [{ role: 'user', parts: toolResponseParts }];
|
||||
} else {
|
||||
if (config.getOutputFormat() === OutputFormat.JSON) {
|
||||
const formatter = new JsonFormatter();
|
||||
const stats = uiTelemetryService.getMetrics();
|
||||
process.stdout.write(formatter.format(responseText, stats));
|
||||
// For JSON and STREAM_JSON modes, compute usage from metrics
|
||||
if (adapter) {
|
||||
const metrics = uiTelemetryService.getMetrics();
|
||||
const usage = computeUsageFromMetrics(metrics);
|
||||
// Get stats for JSON format output
|
||||
const stats =
|
||||
outputFormat === OutputFormat.JSON
|
||||
? uiTelemetryService.getMetrics()
|
||||
: undefined;
|
||||
adapter.emitResult({
|
||||
isError: false,
|
||||
durationMs: Date.now() - startTime,
|
||||
apiDurationMs: totalApiDurationMs,
|
||||
numTurns: turnCount,
|
||||
usage,
|
||||
stats,
|
||||
});
|
||||
} else {
|
||||
process.stdout.write('\n'); // Ensure a final newline
|
||||
// Text output mode - no usage needed
|
||||
process.stdout.write('\n');
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
// For JSON and STREAM_JSON modes, compute usage from metrics
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
if (adapter) {
|
||||
const metrics = uiTelemetryService.getMetrics();
|
||||
const usage = computeUsageFromMetrics(metrics);
|
||||
// Get stats for JSON format output
|
||||
const stats =
|
||||
outputFormat === OutputFormat.JSON
|
||||
? uiTelemetryService.getMetrics()
|
||||
: undefined;
|
||||
adapter.emitResult({
|
||||
isError: true,
|
||||
durationMs: Date.now() - startTime,
|
||||
apiDurationMs: totalApiDurationMs,
|
||||
numTurns: turnCount,
|
||||
errorMessage: message,
|
||||
usage,
|
||||
stats,
|
||||
});
|
||||
}
|
||||
handleError(error, config);
|
||||
} finally {
|
||||
consolePatcher.cleanup();
|
||||
process.stdout.removeListener('error', stdoutErrorHandler);
|
||||
// Cleanup signal handlers
|
||||
process.removeListener('SIGINT', shutdownHandler);
|
||||
process.removeListener('SIGTERM', shutdownHandler);
|
||||
if (isTelemetrySdkInitialized()) {
|
||||
await shutdownTelemetry(config);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user