From 270dda4aa7be61a707da15cc7bb257d575f7f437 Mon Sep 17 00:00:00 2001
From: tanzhenxin
Date: Mon, 13 Oct 2025 09:25:31 +0800
Subject: [PATCH] fix: invalid tool_calls request due to improper cancellation
 (#790)

---
 .../openaiContentGenerator/pipeline.test.ts | 78 +++++++++++++++++++
 .../core/openaiContentGenerator/pipeline.ts |  6 ++
 2 files changed, 84 insertions(+)

diff --git a/packages/core/src/core/openaiContentGenerator/pipeline.test.ts b/packages/core/src/core/openaiContentGenerator/pipeline.test.ts
index ce5b9d6d..c92e7a79 100644
--- a/packages/core/src/core/openaiContentGenerator/pipeline.test.ts
+++ b/packages/core/src/core/openaiContentGenerator/pipeline.test.ts
@@ -161,6 +161,9 @@ describe('ContentGenerationPipeline', () => {
         top_p: 0.9,
         max_tokens: 1000,
       }),
+      expect.objectContaining({
+        signal: undefined,
+      }),
     );
     expect(mockConverter.convertOpenAIResponseToGemini).toHaveBeenCalledWith(
       mockOpenAIResponse,
@@ -238,6 +241,9 @@ describe('ContentGenerationPipeline', () => {
       expect.objectContaining({
         tools: mockTools,
       }),
+      expect.objectContaining({
+        signal: undefined,
+      }),
     );
   });

@@ -274,6 +280,30 @@ describe('ContentGenerationPipeline', () => {
       request,
     );
   });
+
+  it('should pass abort signal to OpenAI client when provided', async () => {
+    const abortController = new AbortController();
+    const request: GenerateContentParameters = {
+      model: 'test-model',
+      contents: [{ parts: [{ text: 'Hello' }], role: 'user' }],
+      config: { abortSignal: abortController.signal },
+    };
+
+    (mockConverter.convertGeminiRequestToOpenAI as Mock).mockReturnValue([]);
+    (mockConverter.convertOpenAIResponseToGemini as Mock).mockReturnValue(
+      new GenerateContentResponse(),
+    );
+    (mockClient.chat.completions.create as Mock).mockResolvedValue({
+      choices: [{ message: { content: 'response' } }],
+    });
+
+    await pipeline.execute(request, 'test-id');
+
+    expect(mockClient.chat.completions.create).toHaveBeenCalledWith(
+      expect.any(Object),
+      expect.objectContaining({ signal: abortController.signal }),
+    );
+  });
 });

 describe('executeStream', () => {
@@ -338,6 +368,9 @@ describe('ContentGenerationPipeline', () => {
         stream: true,
         stream_options: { include_usage: true },
       }),
+      expect.objectContaining({
+        signal: undefined,
+      }),
     );
     expect(mockTelemetryService.logStreamingSuccess).toHaveBeenCalledWith(
       expect.objectContaining({
@@ -470,6 +503,42 @@ describe('ContentGenerationPipeline', () => {
     );
   });

+  it('should pass abort signal to OpenAI client for streaming requests', async () => {
+    const abortController = new AbortController();
+    const request: GenerateContentParameters = {
+      model: 'test-model',
+      contents: [{ parts: [{ text: 'Hello' }], role: 'user' }],
+      config: { abortSignal: abortController.signal },
+    };
+
+    const mockStream = {
+      async *[Symbol.asyncIterator]() {
+        yield {
+          id: 'chunk-1',
+          choices: [{ delta: { content: 'Hello' }, finish_reason: 'stop' }],
+        };
+      },
+    };
+
+    (mockConverter.convertGeminiRequestToOpenAI as Mock).mockReturnValue([]);
+    (mockConverter.convertOpenAIChunkToGemini as Mock).mockReturnValue(
+      new GenerateContentResponse(),
+    );
+    (mockClient.chat.completions.create as Mock).mockResolvedValue(
+      mockStream,
+    );
+
+    const resultGenerator = await pipeline.executeStream(request, 'test-id');
+    for await (const _result of resultGenerator) {
+      // Consume stream
+    }
+
+    expect(mockClient.chat.completions.create).toHaveBeenCalledWith(
+      expect.any(Object),
+      expect.objectContaining({ signal: abortController.signal }),
+    );
+  });
+
   it('should merge finishReason and usageMetadata from separate chunks', async () => {
     // Arrange
     const request: GenerateContentParameters = {
@@ -924,6 +993,9 @@ describe('ContentGenerationPipeline', () => {
         top_p: 0.9, // Config parameter used since request overrides are not being applied in current implementation
         max_tokens: 1000, // Config parameter used since request overrides are not being applied in current implementation
       }),
+      expect.objectContaining({
+        signal: undefined,
+      }),
     );
   });

@@ -960,6 +1032,9 @@ describe('ContentGenerationPipeline', () => {
         top_p: 0.9, // From config
         max_tokens: 1000, // From config
       }),
+      expect.objectContaining({
+        signal: undefined,
+      }),
     );
   });

@@ -1009,6 +1084,9 @@ describe('ContentGenerationPipeline', () => {
       expect.objectContaining({
         metadata: { promptId: userPromptId },
       }),
+      expect.objectContaining({
+        signal: undefined,
+      }),
     );
   });
 });
diff --git a/packages/core/src/core/openaiContentGenerator/pipeline.ts b/packages/core/src/core/openaiContentGenerator/pipeline.ts
index 6e67d8f0..587afda4 100644
--- a/packages/core/src/core/openaiContentGenerator/pipeline.ts
+++ b/packages/core/src/core/openaiContentGenerator/pipeline.ts
@@ -48,6 +48,9 @@ export class ContentGenerationPipeline {
       async (openaiRequest, context) => {
         const openaiResponse = (await this.client.chat.completions.create(
           openaiRequest,
+          {
+            signal: request.config?.abortSignal,
+          },
         )) as OpenAI.Chat.ChatCompletion;

         const geminiResponse =
@@ -78,6 +81,9 @@ export class ContentGenerationPipeline {
       // Stage 1: Create OpenAI stream
       const stream = (await this.client.chat.completions.create(
         openaiRequest,
+        {
+          signal: request.config?.abortSignal,
+        },
       )) as AsyncIterable<OpenAI.Chat.ChatCompletionChunk>;

       // Stage 2: Process stream with conversion and logging
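
Reviewer note: the fix works because the OpenAI Node SDK accepts per-request
options, including an AbortSignal, as a second argument to
chat.completions.create(); forwarding request.config?.abortSignal there lets a
caller-initiated cancellation abort the underlying HTTP request instead of
leaving a half-built tool_calls payload behind. Below is a minimal standalone
sketch of that pattern, not code from this repository: the
generateWithCancellation helper and the model name are hypothetical, and it
assumes OPENAI_API_KEY is set in the environment.

import OpenAI from 'openai';

// Sketch: forward a caller-supplied AbortSignal through the SDK's
// per-request options (second argument). Aborting the controller cancels
// the in-flight HTTP request.
async function generateWithCancellation(
  prompt: string,
  signal?: AbortSignal, // e.g. request.config?.abortSignal in the pipeline
): Promise<string> {
  const client = new OpenAI(); // reads OPENAI_API_KEY from the environment
  const completion = await client.chat.completions.create(
    {
      model: 'gpt-4o-mini', // hypothetical model choice for illustration
      messages: [{ role: 'user', content: prompt }],
    },
    { signal }, // per-request options; an undefined signal means "no cancellation"
  );
  return completion.choices[0]?.message?.content ?? '';
}

// Usage: aborting rejects the pending call (the SDK raises an abort error)
// rather than silently dropping the response mid-flight.
const controller = new AbortController();
setTimeout(() => controller.abort(), 100);
generateWithCancellation('Hello', controller.signal).catch((err) => {
  console.error('request ended:', err);
});

This is also why the tests assert signal: undefined for requests without an
abortSignal: the pipeline now always passes an options object, and an
undefined signal is simply a no-op for the SDK.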