From c3032db8b832318f097640600fa9ad94dc2ec677 Mon Sep 17 00:00:00 2001
From: Mingholy
Date: Fri, 12 Sep 2025 13:25:48 +0800
Subject: [PATCH] fix: relax chunk validation to avoid unnecessary retry (#584)

* fix: relax chunk validation to avoid unnecessary retry

* fix: merge tail chunks for stable finishReason and usageMetadata
---
 packages/core/src/core/geminiChat.ts          |  10 +-
 .../core/openaiContentGenerator/converter.ts  |  27 +-
 .../openaiContentGenerator/pipeline.test.ts   | 427 +++++++++++++++++-
 .../core/openaiContentGenerator/pipeline.ts   | 117 ++++-
 4 files changed, 550 insertions(+), 31 deletions(-)

diff --git a/packages/core/src/core/geminiChat.ts b/packages/core/src/core/geminiChat.ts
index 6b4e9bde..5db2a286 100644
--- a/packages/core/src/core/geminiChat.ts
+++ b/packages/core/src/core/geminiChat.ts
@@ -50,9 +50,13 @@ const INVALID_CONTENT_RETRY_OPTIONS: ContentRetryOptions = {
 };

 /**
  * Returns true if the response is valid, false otherwise.
+ *
+ * The DashScope provider may return the last two chunks as:
+ * 1. A choice (candidate) with finishReason and empty content
+ * 2. Empty choices with usage metadata
+ * We check for both of these cases separately.
  */
 function isValidResponse(response: GenerateContentResponse): boolean {
-  // The Dashscope provider returns empty content with usage metadata at the end of the stream
   if (response.usageMetadata) {
     return true;
   }
@@ -61,6 +65,10 @@ function isValidResponse(response: GenerateContentResponse): boolean {
     return false;
   }

+  if (response.candidates.some((candidate) => candidate.finishReason)) {
+    return true;
+  }
+
   const content = response.candidates[0]?.content;
   return content !== undefined && isValidContent(content);
 }
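For context, the two tail chunks described in the comment above look roughly like this once converted to Gemini format (an illustrative sketch of the shapes the new checks accept, written as plain objects rather than captured provider output; the token counts mirror the test fixtures below):

    // 1. Finish chunk: a candidate carrying finishReason but no content parts.
    //    Previously this failed the content check and triggered a retry; it is
    //    now accepted by the candidates.some((candidate) => candidate.finishReason) branch.
    const finishChunk = {
      candidates: [
        { content: { parts: [], role: 'model' }, finishReason: 'STOP', index: 0 },
      ],
    };

    // 2. Usage chunk: empty candidates plus usageMetadata, accepted by the
    //    existing response.usageMetadata branch.
    const usageChunk = {
      candidates: [],
      usageMetadata: { promptTokenCount: 10, candidatesTokenCount: 20, totalTokenCount: 30 },
    };
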
diff --git a/packages/core/src/core/openaiContentGenerator/converter.ts b/packages/core/src/core/openaiContentGenerator/converter.ts
index e4b9c220..b70f65f6 100644
--- a/packages/core/src/core/openaiContentGenerator/converter.ts
+++ b/packages/core/src/core/openaiContentGenerator/converter.ts
@@ -18,6 +18,7 @@ import {
   ContentListUnion,
   ContentUnion,
   PartUnion,
+  Candidate,
 } from '@google/genai';
 import OpenAI from 'openai';
 import { safeJsonParse } from '../../utils/safeJsonParse.js';
@@ -652,19 +653,21 @@ export class OpenAIContentConverter {
         this.streamingToolCallParser.reset();
       }

-      response.candidates = [
-        {
-          content: {
-            parts,
-            role: 'model' as const,
-          },
-          finishReason: choice.finish_reason
-            ? this.mapOpenAIFinishReasonToGemini(choice.finish_reason)
-            : FinishReason.FINISH_REASON_UNSPECIFIED,
-          index: 0,
-          safetyRatings: [],
-        },
-      ];
+      // Only include finishReason key if finish_reason is present
+      const candidate: Candidate = {
+        content: {
+          parts,
+          role: 'model' as const,
+        },
+        index: 0,
+        safetyRatings: [],
+      };
+      if (choice.finish_reason) {
+        candidate.finishReason = this.mapOpenAIFinishReasonToGemini(
+          choice.finish_reason,
+        );
+      }
+      response.candidates = [candidate];
     } else {
       response.candidates = [];
     }
diff --git a/packages/core/src/core/openaiContentGenerator/pipeline.test.ts b/packages/core/src/core/openaiContentGenerator/pipeline.test.ts
index 20c53e90..db778166 100644
--- a/packages/core/src/core/openaiContentGenerator/pipeline.test.ts
+++ b/packages/core/src/core/openaiContentGenerator/pipeline.test.ts
@@ -4,20 +4,21 @@
  * SPDX-License-Identifier: Apache-2.0
  */

-import { describe, it, expect, beforeEach, vi, Mock } from 'vitest';
+import { describe, it, expect, beforeEach, vi, type Mock } from 'vitest';
 import OpenAI from 'openai';
 import {
-  GenerateContentParameters,
+  type GenerateContentParameters,
   GenerateContentResponse,
   Type,
+  FinishReason,
 } from '@google/genai';
-import { ContentGenerationPipeline, PipelineConfig } from './pipeline.js';
+import { ContentGenerationPipeline, type PipelineConfig } from './pipeline.js';
 import { OpenAIContentConverter } from './converter.js';
 import { Config } from '../../config/config.js';
-import { ContentGeneratorConfig, AuthType } from '../contentGenerator.js';
-import { OpenAICompatibleProvider } from './provider/index.js';
-import { TelemetryService } from './telemetryService.js';
-import { ErrorHandler } from './errorHandler.js';
+import { type ContentGeneratorConfig, AuthType } from '../contentGenerator.js';
+import { type OpenAICompatibleProvider } from './provider/index.js';
+import { type TelemetryService } from './telemetryService.js';
+import { type ErrorHandler } from './errorHandler.js';

 // Mock dependencies
 vi.mock('./converter.js');
@@ -470,6 +471,418 @@ describe('ContentGenerationPipeline', () => {
         request,
       );
     });
+
+    it('should merge finishReason and usageMetadata from separate chunks', async () => {
+      // Arrange
+      const request: GenerateContentParameters = {
+        model: 'test-model',
+        contents: [{ parts: [{ text: 'Hello' }], role: 'user' }],
+      };
+      const userPromptId = 'test-prompt-id';
+
+      // Content chunk
+      const mockChunk1 = {
+        id: 'chunk-1',
+        choices: [
+          { delta: { content: 'Hello response' }, finish_reason: null },
+        ],
+      } as OpenAI.Chat.ChatCompletionChunk;
+
+      // Finish reason chunk (empty content, has finish_reason)
+      const mockChunk2 = {
+        id: 'chunk-2',
+        choices: [{ delta: { content: '' }, finish_reason: 'stop' }],
+      } as OpenAI.Chat.ChatCompletionChunk;
+
+      // Usage metadata chunk (empty candidates, has usage)
+      const mockChunk3 = {
+        id: 'chunk-3',
+        object: 'chat.completion.chunk',
+        created: Date.now(),
+        model: 'test-model',
+        choices: [],
+        usage: { prompt_tokens: 10, completion_tokens: 20, total_tokens: 30 },
+      } as OpenAI.Chat.ChatCompletionChunk;
+
+      const mockStream = {
+        async *[Symbol.asyncIterator]() {
+          yield mockChunk1;
+          yield mockChunk2;
+          yield mockChunk3;
+        },
+      };
+
+      // Mock converter responses
+      const mockContentResponse = new GenerateContentResponse();
+      mockContentResponse.candidates = [
+        { content: { parts: [{ text: 'Hello response' }], role: 'model' } },
+      ];
+
+      const mockFinishResponse = new GenerateContentResponse();
+      mockFinishResponse.candidates = [
+        {
+          content: { parts: [], role: 'model' },
+          finishReason: FinishReason.STOP,
+        },
+      ];
+
+      const mockUsageResponse = new GenerateContentResponse();
+      mockUsageResponse.candidates = [];
+      mockUsageResponse.usageMetadata = {
+        promptTokenCount: 10,
+        candidatesTokenCount: 20,
+        totalTokenCount: 30,
+      };
+
+      // Expected merged response (finishReason + usageMetadata combined)
+      const mockMergedResponse = new GenerateContentResponse();
+      mockMergedResponse.candidates = [
+        {
+          content: { parts: [], role: 'model' },
+          finishReason: FinishReason.STOP,
+        },
+      ];
+      mockMergedResponse.usageMetadata = {
+        promptTokenCount: 10,
+        candidatesTokenCount: 20,
+        totalTokenCount: 30,
+      };
+
+      (mockConverter.convertGeminiRequestToOpenAI as Mock).mockReturnValue([]);
+      (mockConverter.convertOpenAIChunkToGemini as Mock)
+        .mockReturnValueOnce(mockContentResponse)
+        .mockReturnValueOnce(mockFinishResponse)
+        .mockReturnValueOnce(mockUsageResponse);
+      (mockClient.chat.completions.create as Mock).mockResolvedValue(
+        mockStream,
+      );
+
+      // Act
+      const resultGenerator = await pipeline.executeStream(
+        request,
+        userPromptId,
+      );
+      const results = [];
+      for await (const result of resultGenerator) {
+        results.push(result);
+      }
+
+      // Assert
+      expect(results).toHaveLength(2); // Content chunk + merged finish/usage chunk
+      expect(results[0]).toBe(mockContentResponse);
+
+      // The last result should have both finishReason and usageMetadata
+      const lastResult = results[1];
+      expect(lastResult.candidates?.[0]?.finishReason).toBe(FinishReason.STOP);
+      expect(lastResult.usageMetadata).toEqual({
+        promptTokenCount: 10,
+        candidatesTokenCount: 20,
+        totalTokenCount: 30,
+      });
+
+      expect(mockTelemetryService.logStreamingSuccess).toHaveBeenCalledWith(
+        expect.objectContaining({
+          userPromptId,
+          model: 'test-model',
+          authType: 'openai',
+          isStreaming: true,
+        }),
+        results,
+        expect.any(Object),
+        [mockChunk1, mockChunk2, mockChunk3],
+      );
+    });
+
+    it('should handle ideal case where last chunk has both finishReason and usageMetadata', async () => {
+      // Arrange
+      const request: GenerateContentParameters = {
+        model: 'test-model',
+        contents: [{ parts: [{ text: 'Hello' }], role: 'user' }],
+      };
+      const userPromptId = 'test-prompt-id';
+
+      // Content chunk
+      const mockChunk1 = {
+        id: 'chunk-1',
+        choices: [
+          { delta: { content: 'Hello response' }, finish_reason: null },
+        ],
+      } as OpenAI.Chat.ChatCompletionChunk;
+
+      // Final chunk with both finish_reason and usage (ideal case)
+      const mockChunk2 = {
+        id: 'chunk-2',
+        choices: [{ delta: { content: '' }, finish_reason: 'stop' }],
+        usage: { prompt_tokens: 10, completion_tokens: 20, total_tokens: 30 },
+      } as OpenAI.Chat.ChatCompletionChunk;
+
+      const mockStream = {
+        async *[Symbol.asyncIterator]() {
+          yield mockChunk1;
+          yield mockChunk2;
+        },
+      };
+
+      // Mock converter responses
+      const mockContentResponse = new GenerateContentResponse();
+      mockContentResponse.candidates = [
+        { content: { parts: [{ text: 'Hello response' }], role: 'model' } },
+      ];
+
+      const mockFinalResponse = new GenerateContentResponse();
+      mockFinalResponse.candidates = [
+        {
+          content: { parts: [], role: 'model' },
+          finishReason: FinishReason.STOP,
+        },
+      ];
+      mockFinalResponse.usageMetadata = {
+        promptTokenCount: 10,
+        candidatesTokenCount: 20,
+        totalTokenCount: 30,
+      };
+
+      (mockConverter.convertGeminiRequestToOpenAI as Mock).mockReturnValue([]);
+      (mockConverter.convertOpenAIChunkToGemini as Mock)
+        .mockReturnValueOnce(mockContentResponse)
+        .mockReturnValueOnce(mockFinalResponse);
+      (mockClient.chat.completions.create as Mock).mockResolvedValue(
+        mockStream,
+      );
+
+      // Act
+      const resultGenerator = await pipeline.executeStream(
+        request,
+        userPromptId,
+      );
+      const results = [];
+      for await (const result of resultGenerator) {
+        results.push(result);
+      }
+
+      // Assert
+      expect(results).toHaveLength(2);
+      expect(results[0]).toBe(mockContentResponse);
+      expect(results[1]).toBe(mockFinalResponse);
+
+      // The last result should have both finishReason and usageMetadata
+      const lastResult = results[1];
+      expect(lastResult.candidates?.[0]?.finishReason).toBe(FinishReason.STOP);
+      expect(lastResult.usageMetadata).toEqual({
+        promptTokenCount: 10,
+        candidatesTokenCount: 20,
+        totalTokenCount: 30,
+      });
+    });
+
+    it('should handle providers that send zero usage in finish chunk (like modelscope)', async () => {
+      // Arrange
+      const request: GenerateContentParameters = {
+        model: 'test-model',
+        contents: [{ parts: [{ text: 'Hello' }], role: 'user' }],
+      };
+      const userPromptId = 'test-prompt-id';
+
+      // Content chunk with zero usage (typical for modelscope)
+      const mockChunk1 = {
+        id: 'chunk-1',
+        choices: [
+          { delta: { content: 'Hello response' }, finish_reason: null },
+        ],
+        usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 },
+      } as OpenAI.Chat.ChatCompletionChunk;
+
+      // Finish chunk with zero usage (has finishReason but usage is all zeros)
+      const mockChunk2 = {
+        id: 'chunk-2',
+        choices: [{ delta: { content: '' }, finish_reason: 'stop' }],
+        usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 },
+      } as OpenAI.Chat.ChatCompletionChunk;
+
+      // Final usage chunk with actual usage data
+      const mockChunk3 = {
+        id: 'chunk-3',
+        object: 'chat.completion.chunk',
+        created: Date.now(),
+        model: 'test-model',
+        choices: [],
+        usage: { prompt_tokens: 10, completion_tokens: 20, total_tokens: 30 },
+      } as OpenAI.Chat.ChatCompletionChunk;
+
+      const mockStream = {
+        async *[Symbol.asyncIterator]() {
+          yield mockChunk1;
+          yield mockChunk2;
+          yield mockChunk3;
+        },
+      };
+
+      // Mock converter responses
+      const mockContentResponse = new GenerateContentResponse();
+      mockContentResponse.candidates = [
+        { content: { parts: [{ text: 'Hello response' }], role: 'model' } },
+      ];
+      // Content chunk has zero usage metadata (should be filtered or ignored)
+      mockContentResponse.usageMetadata = {
+        promptTokenCount: 0,
+        candidatesTokenCount: 0,
+        totalTokenCount: 0,
+      };
+
+      const mockFinishResponseWithZeroUsage = new GenerateContentResponse();
+      mockFinishResponseWithZeroUsage.candidates = [
+        {
+          content: { parts: [], role: 'model' },
+          finishReason: FinishReason.STOP,
+        },
+      ];
+      // Finish chunk has zero usage metadata (should be treated as no usage)
+      mockFinishResponseWithZeroUsage.usageMetadata = {
+        promptTokenCount: 0,
+        candidatesTokenCount: 0,
+        totalTokenCount: 0,
+      };
+
+      const mockUsageResponse = new GenerateContentResponse();
+      mockUsageResponse.candidates = [];
+      mockUsageResponse.usageMetadata = {
+        promptTokenCount: 10,
+        candidatesTokenCount: 20,
+        totalTokenCount: 30,
+      };
+
+      (mockConverter.convertGeminiRequestToOpenAI as Mock).mockReturnValue([]);
+      (mockConverter.convertOpenAIChunkToGemini as Mock)
+        .mockReturnValueOnce(mockContentResponse)
+        .mockReturnValueOnce(mockFinishResponseWithZeroUsage)
+        .mockReturnValueOnce(mockUsageResponse);
+      (mockClient.chat.completions.create as Mock).mockResolvedValue(
+        mockStream,
+      );
+
+      // Act
+      const resultGenerator = await pipeline.executeStream(
+        request,
+        userPromptId,
+      );
+      const results = [];
+      for await (const result of resultGenerator) {
+        results.push(result);
+      }
+
+      // Assert
+      expect(results).toHaveLength(2); // Content chunk + merged finish/usage chunk
+      expect(results[0]).toBe(mockContentResponse);
+
+      // The last result should have both finishReason and valid usageMetadata
+      const lastResult = results[1];
+      expect(lastResult.candidates?.[0]?.finishReason).toBe(FinishReason.STOP);
+      expect(lastResult.usageMetadata).toEqual({
+        promptTokenCount: 10,
+        candidatesTokenCount: 20,
+        totalTokenCount: 30,
+      });
+
+      expect(mockTelemetryService.logStreamingSuccess).toHaveBeenCalledWith(
+        expect.objectContaining({
+          userPromptId,
+          model: 'test-model',
+          authType: 'openai',
+          isStreaming: true,
+        }),
+        results,
+        expect.any(Object),
+        [mockChunk1, mockChunk2, mockChunk3],
+      );
+    });
+
+    it('should handle providers that send finishReason and valid usage in same chunk', async () => {
+      // Arrange
+      const request: GenerateContentParameters = {
+        model: 'test-model',
+        contents: [{ parts: [{ text: 'Hello' }], role: 'user' }],
+      };
+      const userPromptId = 'test-prompt-id';
+
+      // Content chunk with zero usage
+      const mockChunk1 = {
+        id: 'chunk-1',
+        choices: [
+          { delta: { content: 'Hello response' }, finish_reason: null },
+        ],
+        usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 },
+      } as OpenAI.Chat.ChatCompletionChunk;
+
+      // Finish chunk with both finishReason and valid usage in same chunk
+      const mockChunk2 = {
+        id: 'chunk-2',
+        choices: [{ delta: { content: '' }, finish_reason: 'stop' }],
+        usage: { prompt_tokens: 10, completion_tokens: 20, total_tokens: 30 },
+      } as OpenAI.Chat.ChatCompletionChunk;
+
+      const mockStream = {
+        async *[Symbol.asyncIterator]() {
+          yield mockChunk1;
+          yield mockChunk2;
+        },
+      };
+
+      // Mock converter responses
+      const mockContentResponse = new GenerateContentResponse();
+      mockContentResponse.candidates = [
+        { content: { parts: [{ text: 'Hello response' }], role: 'model' } },
+      ];
+      mockContentResponse.usageMetadata = {
+        promptTokenCount: 0,
+        candidatesTokenCount: 0,
+        totalTokenCount: 0,
+      };
+
+      const mockFinalResponse = new GenerateContentResponse();
+      mockFinalResponse.candidates = [
+        {
+          content: { parts: [], role: 'model' },
+          finishReason: FinishReason.STOP,
+        },
+      ];
+      mockFinalResponse.usageMetadata = {
+        promptTokenCount: 10,
+        candidatesTokenCount: 20,
+        totalTokenCount: 30,
+      };
+
+      (mockConverter.convertGeminiRequestToOpenAI as Mock).mockReturnValue([]);
+      (mockConverter.convertOpenAIChunkToGemini as Mock)
+        .mockReturnValueOnce(mockContentResponse)
+        .mockReturnValueOnce(mockFinalResponse);
+      (mockClient.chat.completions.create as Mock).mockResolvedValue(
+        mockStream,
+      );
+
+      // Act
+      const resultGenerator = await pipeline.executeStream(
+        request,
+        userPromptId,
+      );
+      const results = [];
+      for await (const result of resultGenerator) {
+        results.push(result);
+      }
+
+      // Assert
+      expect(results).toHaveLength(2);
+      expect(results[0]).toBe(mockContentResponse);
+      expect(results[1]).toBe(mockFinalResponse);
+
+      // The last result should have both finishReason and valid usageMetadata
+      const lastResult = results[1];
+      expect(lastResult.candidates?.[0]?.finishReason).toBe(FinishReason.STOP);
+      expect(lastResult.usageMetadata).toEqual({
+        promptTokenCount: 10,
+        candidatesTokenCount: 20,
+        totalTokenCount: 30,
+      });
+    });
   });

   describe('buildRequest', () => {
diff --git a/packages/core/src/core/openaiContentGenerator/pipeline.ts b/packages/core/src/core/openaiContentGenerator/pipeline.ts
index 306344a3..d2922fab 100644
--- a/packages/core/src/core/openaiContentGenerator/pipeline.ts
+++ b/packages/core/src/core/openaiContentGenerator/pipeline.ts
@@ -6,15 +6,18 @@

 import OpenAI from 'openai';
 import {
-  GenerateContentParameters,
+  type GenerateContentParameters,
   GenerateContentResponse,
 } from '@google/genai';
 import { Config } from '../../config/config.js';
-import { ContentGeneratorConfig } from '../contentGenerator.js';
+import { type ContentGeneratorConfig } from '../contentGenerator.js';
 import { type OpenAICompatibleProvider } from './provider/index.js';
 import { OpenAIContentConverter } from './converter.js';
-import { TelemetryService, RequestContext } from './telemetryService.js';
-import { ErrorHandler } from './errorHandler.js';
+import {
+  type TelemetryService,
+  type RequestContext,
+} from './telemetryService.js';
+import { type ErrorHandler } from './errorHandler.js';

 export interface PipelineConfig {
   cliConfig: Config;
@@ -96,8 +99,9 @@
    * This method handles the complete stream processing pipeline:
    * 1. Convert OpenAI chunks to Gemini format while preserving original chunks
    * 2. Filter empty responses
-   * 3. Collect both formats for logging
-   * 4. Handle success/error logging with original OpenAI format
+   * 3. Handle chunk merging for providers that send finishReason and usageMetadata separately
+   * 4. Collect both formats for logging
+   * 5. Handle success/error logging with original OpenAI format
    */
   private async *processStreamWithLogging(
     stream: AsyncIterable<OpenAI.Chat.ChatCompletionChunk>,
@@ -111,6 +115,9 @@
     // Reset streaming tool calls to prevent data pollution from previous streams
     this.converter.resetStreamingToolCalls();

+    // State for handling chunk merging
+    let pendingFinishResponse: GenerateContentResponse | null = null;
+
     try {
       // Stage 2a: Convert and yield each chunk while preserving original
       for await (const chunk of stream) {
@@ -119,18 +126,40 @@
         // Stage 2b: Filter empty responses to avoid downstream issues
         if (
           response.candidates?.[0]?.content?.parts?.length === 0 &&
+          !response.candidates?.[0]?.finishReason &&
           !response.usageMetadata
         ) {
           continue;
         }

-        // Stage 2c: Collect both formats and yield Gemini format to consumer
-        collectedGeminiResponses.push(response);
-        collectedOpenAIChunks.push(chunk);
-        yield response;
+        // Stage 2c: Handle chunk merging for providers that send finishReason and usageMetadata separately
+        const shouldYield = this.handleChunkMerging(
+          response,
+          chunk,
+          collectedGeminiResponses,
+          collectedOpenAIChunks,
+          (mergedResponse) => {
+            pendingFinishResponse = mergedResponse;
+          },
+        );
+
+        if (shouldYield) {
+          // If we have a pending finish response, yield it instead
+          if (pendingFinishResponse) {
+            yield pendingFinishResponse;
+            pendingFinishResponse = null;
+          } else {
+            yield response;
+          }
+        }
       }

-      // Stage 2d: Stream completed successfully - perform logging with original OpenAI chunks
+      // Stage 2d: If there's still a pending finish response at the end, yield it
+      if (pendingFinishResponse) {
+        yield pendingFinishResponse;
+      }
+
+      // Stage 2e: Stream completed successfully - perform logging with original OpenAI chunks
       context.duration = Date.now() - context.startTime;

       await this.config.telemetryService.logStreamingSuccess(
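To make the new merging path concrete, here is a rough trace of how the DashScope-style tail from the tests flows through the handleChunkMerging helper added in the next hunk (an illustrative sketch, not actual log output):

    // Converted tail of the stream:
    //   finish chunk: { candidates: [{ content: { parts: [] }, finishReason: STOP }] }
    //   usage chunk:  { candidates: [], usageMetadata: { promptTokenCount: 10, ... } }
    //
    // The finish chunk is collected but held back (shouldYield === false and
    // pendingFinishResponse is set). When the usage chunk arrives, its
    // usageMetadata is copied onto the held response and a single merged
    // response is yielded downstream:
    //   { candidates: [{ content: { parts: [] }, finishReason: STOP }],
    //     usageMetadata: { promptTokenCount: 10, candidatesTokenCount: 20, totalTokenCount: 30 } }
    //
    // If the finish chunk happens to be the last chunk (it already carries
    // usage), the pending response is simply yielded after the loop in Stage 2d.
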
@@ -156,6 +185,72 @@
     }
   }

+  /**
+   * Handle chunk merging for providers that send finishReason and usageMetadata separately.
+   *
+   * Strategy: When we encounter a finishReason chunk, we hold it and merge all subsequent
+   * chunks into it until the stream ends. This ensures the final chunk contains both
+   * finishReason and the most up-to-date usage information from any provider pattern.
+   *
+   * @param response Current Gemini response
+   * @param chunk Current OpenAI chunk
+   * @param collectedGeminiResponses Array to collect responses for logging
+   * @param collectedOpenAIChunks Array to collect chunks for logging
+   * @param setPendingFinish Callback to set pending finish response
+   * @returns true if the response should be yielded, false if it should be held for merging
+   */
+  private handleChunkMerging(
+    response: GenerateContentResponse,
+    chunk: OpenAI.Chat.ChatCompletionChunk,
+    collectedGeminiResponses: GenerateContentResponse[],
+    collectedOpenAIChunks: OpenAI.Chat.ChatCompletionChunk[],
+    setPendingFinish: (response: GenerateContentResponse) => void,
+  ): boolean {
+    const isFinishChunk = response.candidates?.[0]?.finishReason;
+
+    // Check if we have a pending finish response from previous chunks
+    const hasPendingFinish =
+      collectedGeminiResponses.length > 0 &&
+      collectedGeminiResponses[collectedGeminiResponses.length - 1]
+        .candidates?.[0]?.finishReason;
+
+    if (isFinishChunk) {
+      // This is a finish reason chunk
+      collectedGeminiResponses.push(response);
+      collectedOpenAIChunks.push(chunk);
+      setPendingFinish(response);
+      return false; // Don't yield yet, wait for potential subsequent chunks to merge
+    } else if (hasPendingFinish) {
+      // We have a pending finish chunk, merge this chunk's data into it
+      const lastResponse =
+        collectedGeminiResponses[collectedGeminiResponses.length - 1];
+      const mergedResponse = new GenerateContentResponse();
+
+      // Keep the finish reason from the previous chunk
+      mergedResponse.candidates = lastResponse.candidates;
+
+      // Merge usage metadata if this chunk has it
+      if (response.usageMetadata) {
+        mergedResponse.usageMetadata = response.usageMetadata;
+      } else {
+        mergedResponse.usageMetadata = lastResponse.usageMetadata;
+      }
+
+      // Update the collected responses with the merged response
+      collectedGeminiResponses[collectedGeminiResponses.length - 1] =
+        mergedResponse;
+      collectedOpenAIChunks.push(chunk);
+
+      setPendingFinish(mergedResponse);
+      return true; // Yield the merged response
+    }
+
+    // Normal chunk - collect and yield
+    collectedGeminiResponses.push(response);
+    collectedOpenAIChunks.push(chunk);
+    return true;
+  }
+
   private async buildRequest(
     request: GenerateContentParameters,
     userPromptId: string,