fix: relax chunk validation to avoid unnecessary retry (#584)

* fix: relax chunk validation to avoid unnecessary retry

* fix: merge tail chunks for stable finishReason and usageMetadata
Mingholy
2025-09-12 13:25:48 +08:00
committed by GitHub
parent 38d9ee64ca
commit c3032db8b8
4 changed files with 550 additions and 31 deletions

View File

@@ -50,9 +50,13 @@ const INVALID_CONTENT_RETRY_OPTIONS: ContentRetryOptions = {
};
/**
 * Returns true if the response is valid, false otherwise.
+ *
+ * The DashScope provider may return the last 2 chunks as:
+ * 1. A choice(candidate) with finishReason and empty content
+ * 2. Empty choices with usage metadata
+ * We'll check separately for both of these cases.
 */
function isValidResponse(response: GenerateContentResponse): boolean {
-  // The Dashscope provider returns empty content with usage metadata at the end of the stream
  if (response.usageMetadata) {
    return true;
  }
@@ -61,6 +65,10 @@ function isValidResponse(response: GenerateContentResponse): boolean {
    return false;
  }
+  if (response.candidates.some((candidate) => candidate.finishReason)) {
+    return true;
+  }
  const content = response.candidates[0]?.content;
  return content !== undefined && isValidContent(content);
}
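For reference, here is a minimal sketch of the two tail-chunk shapes the relaxed check is meant to accept without triggering a retry. The values are illustrative (mirroring the test fixtures later in this commit), not captured from a real DashScope stream.

import { FinishReason, GenerateContentResponse } from '@google/genai';

// Tail chunk 1: a single candidate carrying only a finishReason and empty content.
// Before this change it failed the content check and caused a needless retry.
const finishOnlyChunk = new GenerateContentResponse();
finishOnlyChunk.candidates = [
  {
    content: { parts: [], role: 'model' },
    finishReason: FinishReason.STOP,
    index: 0,
  },
];

// Tail chunk 2: no candidates at all, only usage metadata.
// Accepted by the existing usageMetadata short-circuit at the top of the function.
const usageOnlyChunk = new GenerateContentResponse();
usageOnlyChunk.candidates = [];
usageOnlyChunk.usageMetadata = {
  promptTokenCount: 10,
  candidatesTokenCount: 20,
  totalTokenCount: 30,
};

// With the relaxed validation, both should now pass:
// isValidResponse(finishOnlyChunk) === true  (finishReason branch)
// isValidResponse(usageOnlyChunk)  === true  (usageMetadata branch)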

View File

@@ -18,6 +18,7 @@ import {
  ContentListUnion,
  ContentUnion,
  PartUnion,
+  Candidate,
} from '@google/genai';
import OpenAI from 'openai';
import { safeJsonParse } from '../../utils/safeJsonParse.js';
@@ -652,19 +653,21 @@ export class OpenAIContentConverter {
        this.streamingToolCallParser.reset();
      }

-      response.candidates = [
-        {
-          content: {
-            parts,
-            role: 'model' as const,
-          },
-          finishReason: choice.finish_reason
-            ? this.mapOpenAIFinishReasonToGemini(choice.finish_reason)
-            : FinishReason.FINISH_REASON_UNSPECIFIED,
-          index: 0,
-          safetyRatings: [],
-        },
-      ];
+      // Only include finishReason key if finish_reason is present
+      const candidate: Candidate = {
+        content: {
+          parts,
+          role: 'model' as const,
+        },
+        index: 0,
+        safetyRatings: [],
+      };
+      if (choice.finish_reason) {
+        candidate.finishReason = this.mapOpenAIFinishReasonToGemini(
+          choice.finish_reason,
+        );
+      }
+      response.candidates = [candidate];
    } else {
      response.candidates = [];
    }
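In practical terms, this converter change means an ordinary mid-stream delta no longer carries a placeholder finishReason key, so presence checks on candidates[0].finishReason only fire for genuine tail chunks. A rough sketch with illustrative values:

import { FinishReason, type Candidate } from '@google/genai';

// Before: every streamed candidate included the finishReason key, defaulting to
// FinishReason.FINISH_REASON_UNSPECIFIED when finish_reason was null, so a
// presence check on the key could not tell a tail chunk apart.

// After: a mid-stream delta (finish_reason: null) produces a candidate
// without the key at all...
const midStreamCandidate: Candidate = {
  content: { parts: [{ text: 'partial text' }], role: 'model' },
  index: 0,
  safetyRatings: [],
};

// ...while a real tail chunk (finish_reason: 'stop') gets it set explicitly.
const tailCandidate: Candidate = {
  content: { parts: [], role: 'model' },
  index: 0,
  safetyRatings: [],
  finishReason: FinishReason.STOP,
};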

View File

@@ -4,20 +4,21 @@
 * SPDX-License-Identifier: Apache-2.0
 */

-import { describe, it, expect, beforeEach, vi, Mock } from 'vitest';
+import { describe, it, expect, beforeEach, vi, type Mock } from 'vitest';
import OpenAI from 'openai';
import {
-  GenerateContentParameters,
+  type GenerateContentParameters,
  GenerateContentResponse,
  Type,
+  FinishReason,
} from '@google/genai';
-import { ContentGenerationPipeline, PipelineConfig } from './pipeline.js';
+import { ContentGenerationPipeline, type PipelineConfig } from './pipeline.js';
import { OpenAIContentConverter } from './converter.js';
import { Config } from '../../config/config.js';
-import { ContentGeneratorConfig, AuthType } from '../contentGenerator.js';
-import { OpenAICompatibleProvider } from './provider/index.js';
-import { TelemetryService } from './telemetryService.js';
-import { ErrorHandler } from './errorHandler.js';
+import { type ContentGeneratorConfig, AuthType } from '../contentGenerator.js';
+import { type OpenAICompatibleProvider } from './provider/index.js';
+import { type TelemetryService } from './telemetryService.js';
+import { type ErrorHandler } from './errorHandler.js';

// Mock dependencies
vi.mock('./converter.js');
@@ -470,6 +471,418 @@ describe('ContentGenerationPipeline', () => {
request,
);
});
it('should merge finishReason and usageMetadata from separate chunks', async () => {
// Arrange
const request: GenerateContentParameters = {
model: 'test-model',
contents: [{ parts: [{ text: 'Hello' }], role: 'user' }],
};
const userPromptId = 'test-prompt-id';
// Content chunk
const mockChunk1 = {
id: 'chunk-1',
choices: [
{ delta: { content: 'Hello response' }, finish_reason: null },
],
} as OpenAI.Chat.ChatCompletionChunk;
// Finish reason chunk (empty content, has finish_reason)
const mockChunk2 = {
id: 'chunk-2',
choices: [{ delta: { content: '' }, finish_reason: 'stop' }],
} as OpenAI.Chat.ChatCompletionChunk;
// Usage metadata chunk (empty candidates, has usage)
const mockChunk3 = {
id: 'chunk-3',
object: 'chat.completion.chunk',
created: Date.now(),
model: 'test-model',
choices: [],
usage: { prompt_tokens: 10, completion_tokens: 20, total_tokens: 30 },
} as OpenAI.Chat.ChatCompletionChunk;
const mockStream = {
async *[Symbol.asyncIterator]() {
yield mockChunk1;
yield mockChunk2;
yield mockChunk3;
},
};
// Mock converter responses
const mockContentResponse = new GenerateContentResponse();
mockContentResponse.candidates = [
{ content: { parts: [{ text: 'Hello response' }], role: 'model' } },
];
const mockFinishResponse = new GenerateContentResponse();
mockFinishResponse.candidates = [
{
content: { parts: [], role: 'model' },
finishReason: FinishReason.STOP,
},
];
const mockUsageResponse = new GenerateContentResponse();
mockUsageResponse.candidates = [];
mockUsageResponse.usageMetadata = {
promptTokenCount: 10,
candidatesTokenCount: 20,
totalTokenCount: 30,
};
// Expected merged response (finishReason + usageMetadata combined)
const mockMergedResponse = new GenerateContentResponse();
mockMergedResponse.candidates = [
{
content: { parts: [], role: 'model' },
finishReason: FinishReason.STOP,
},
];
mockMergedResponse.usageMetadata = {
promptTokenCount: 10,
candidatesTokenCount: 20,
totalTokenCount: 30,
};
(mockConverter.convertGeminiRequestToOpenAI as Mock).mockReturnValue([]);
(mockConverter.convertOpenAIChunkToGemini as Mock)
.mockReturnValueOnce(mockContentResponse)
.mockReturnValueOnce(mockFinishResponse)
.mockReturnValueOnce(mockUsageResponse);
(mockClient.chat.completions.create as Mock).mockResolvedValue(
mockStream,
);
// Act
const resultGenerator = await pipeline.executeStream(
request,
userPromptId,
);
const results = [];
for await (const result of resultGenerator) {
results.push(result);
}
// Assert
expect(results).toHaveLength(2); // Content chunk + merged finish/usage chunk
expect(results[0]).toBe(mockContentResponse);
// The last result should have both finishReason and usageMetadata
const lastResult = results[1];
expect(lastResult.candidates?.[0]?.finishReason).toBe(FinishReason.STOP);
expect(lastResult.usageMetadata).toEqual({
promptTokenCount: 10,
candidatesTokenCount: 20,
totalTokenCount: 30,
});
expect(mockTelemetryService.logStreamingSuccess).toHaveBeenCalledWith(
expect.objectContaining({
userPromptId,
model: 'test-model',
authType: 'openai',
isStreaming: true,
}),
results,
expect.any(Object),
[mockChunk1, mockChunk2, mockChunk3],
);
});
it('should handle ideal case where last chunk has both finishReason and usageMetadata', async () => {
// Arrange
const request: GenerateContentParameters = {
model: 'test-model',
contents: [{ parts: [{ text: 'Hello' }], role: 'user' }],
};
const userPromptId = 'test-prompt-id';
// Content chunk
const mockChunk1 = {
id: 'chunk-1',
choices: [
{ delta: { content: 'Hello response' }, finish_reason: null },
],
} as OpenAI.Chat.ChatCompletionChunk;
// Final chunk with both finish_reason and usage (ideal case)
const mockChunk2 = {
id: 'chunk-2',
choices: [{ delta: { content: '' }, finish_reason: 'stop' }],
usage: { prompt_tokens: 10, completion_tokens: 20, total_tokens: 30 },
} as OpenAI.Chat.ChatCompletionChunk;
const mockStream = {
async *[Symbol.asyncIterator]() {
yield mockChunk1;
yield mockChunk2;
},
};
// Mock converter responses
const mockContentResponse = new GenerateContentResponse();
mockContentResponse.candidates = [
{ content: { parts: [{ text: 'Hello response' }], role: 'model' } },
];
const mockFinalResponse = new GenerateContentResponse();
mockFinalResponse.candidates = [
{
content: { parts: [], role: 'model' },
finishReason: FinishReason.STOP,
},
];
mockFinalResponse.usageMetadata = {
promptTokenCount: 10,
candidatesTokenCount: 20,
totalTokenCount: 30,
};
(mockConverter.convertGeminiRequestToOpenAI as Mock).mockReturnValue([]);
(mockConverter.convertOpenAIChunkToGemini as Mock)
.mockReturnValueOnce(mockContentResponse)
.mockReturnValueOnce(mockFinalResponse);
(mockClient.chat.completions.create as Mock).mockResolvedValue(
mockStream,
);
// Act
const resultGenerator = await pipeline.executeStream(
request,
userPromptId,
);
const results = [];
for await (const result of resultGenerator) {
results.push(result);
}
// Assert
expect(results).toHaveLength(2);
expect(results[0]).toBe(mockContentResponse);
expect(results[1]).toBe(mockFinalResponse);
// The last result should have both finishReason and usageMetadata
const lastResult = results[1];
expect(lastResult.candidates?.[0]?.finishReason).toBe(FinishReason.STOP);
expect(lastResult.usageMetadata).toEqual({
promptTokenCount: 10,
candidatesTokenCount: 20,
totalTokenCount: 30,
});
});
it('should handle providers that send zero usage in finish chunk (like modelscope)', async () => {
// Arrange
const request: GenerateContentParameters = {
model: 'test-model',
contents: [{ parts: [{ text: 'Hello' }], role: 'user' }],
};
const userPromptId = 'test-prompt-id';
// Content chunk with zero usage (typical for modelscope)
const mockChunk1 = {
id: 'chunk-1',
choices: [
{ delta: { content: 'Hello response' }, finish_reason: null },
],
usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 },
} as OpenAI.Chat.ChatCompletionChunk;
// Finish chunk with zero usage (has finishReason but usage is all zeros)
const mockChunk2 = {
id: 'chunk-2',
choices: [{ delta: { content: '' }, finish_reason: 'stop' }],
usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 },
} as OpenAI.Chat.ChatCompletionChunk;
// Final usage chunk with actual usage data
const mockChunk3 = {
id: 'chunk-3',
object: 'chat.completion.chunk',
created: Date.now(),
model: 'test-model',
choices: [],
usage: { prompt_tokens: 10, completion_tokens: 20, total_tokens: 30 },
} as OpenAI.Chat.ChatCompletionChunk;
const mockStream = {
async *[Symbol.asyncIterator]() {
yield mockChunk1;
yield mockChunk2;
yield mockChunk3;
},
};
// Mock converter responses
const mockContentResponse = new GenerateContentResponse();
mockContentResponse.candidates = [
{ content: { parts: [{ text: 'Hello response' }], role: 'model' } },
];
// Content chunk has zero usage metadata (should be filtered or ignored)
mockContentResponse.usageMetadata = {
promptTokenCount: 0,
candidatesTokenCount: 0,
totalTokenCount: 0,
};
const mockFinishResponseWithZeroUsage = new GenerateContentResponse();
mockFinishResponseWithZeroUsage.candidates = [
{
content: { parts: [], role: 'model' },
finishReason: FinishReason.STOP,
},
];
// Finish chunk has zero usage metadata (should be treated as no usage)
mockFinishResponseWithZeroUsage.usageMetadata = {
promptTokenCount: 0,
candidatesTokenCount: 0,
totalTokenCount: 0,
};
const mockUsageResponse = new GenerateContentResponse();
mockUsageResponse.candidates = [];
mockUsageResponse.usageMetadata = {
promptTokenCount: 10,
candidatesTokenCount: 20,
totalTokenCount: 30,
};
(mockConverter.convertGeminiRequestToOpenAI as Mock).mockReturnValue([]);
(mockConverter.convertOpenAIChunkToGemini as Mock)
.mockReturnValueOnce(mockContentResponse)
.mockReturnValueOnce(mockFinishResponseWithZeroUsage)
.mockReturnValueOnce(mockUsageResponse);
(mockClient.chat.completions.create as Mock).mockResolvedValue(
mockStream,
);
// Act
const resultGenerator = await pipeline.executeStream(
request,
userPromptId,
);
const results = [];
for await (const result of resultGenerator) {
results.push(result);
}
// Assert
expect(results).toHaveLength(2); // Content chunk + merged finish/usage chunk
expect(results[0]).toBe(mockContentResponse);
// The last result should have both finishReason and valid usageMetadata
const lastResult = results[1];
expect(lastResult.candidates?.[0]?.finishReason).toBe(FinishReason.STOP);
expect(lastResult.usageMetadata).toEqual({
promptTokenCount: 10,
candidatesTokenCount: 20,
totalTokenCount: 30,
});
expect(mockTelemetryService.logStreamingSuccess).toHaveBeenCalledWith(
expect.objectContaining({
userPromptId,
model: 'test-model',
authType: 'openai',
isStreaming: true,
}),
results,
expect.any(Object),
[mockChunk1, mockChunk2, mockChunk3],
);
});
it('should handle providers that send finishReason and valid usage in same chunk', async () => {
// Arrange
const request: GenerateContentParameters = {
model: 'test-model',
contents: [{ parts: [{ text: 'Hello' }], role: 'user' }],
};
const userPromptId = 'test-prompt-id';
// Content chunk with zero usage
const mockChunk1 = {
id: 'chunk-1',
choices: [
{ delta: { content: 'Hello response' }, finish_reason: null },
],
usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 },
} as OpenAI.Chat.ChatCompletionChunk;
// Finish chunk with both finishReason and valid usage in same chunk
const mockChunk2 = {
id: 'chunk-2',
choices: [{ delta: { content: '' }, finish_reason: 'stop' }],
usage: { prompt_tokens: 10, completion_tokens: 20, total_tokens: 30 },
} as OpenAI.Chat.ChatCompletionChunk;
const mockStream = {
async *[Symbol.asyncIterator]() {
yield mockChunk1;
yield mockChunk2;
},
};
// Mock converter responses
const mockContentResponse = new GenerateContentResponse();
mockContentResponse.candidates = [
{ content: { parts: [{ text: 'Hello response' }], role: 'model' } },
];
mockContentResponse.usageMetadata = {
promptTokenCount: 0,
candidatesTokenCount: 0,
totalTokenCount: 0,
};
const mockFinalResponse = new GenerateContentResponse();
mockFinalResponse.candidates = [
{
content: { parts: [], role: 'model' },
finishReason: FinishReason.STOP,
},
];
mockFinalResponse.usageMetadata = {
promptTokenCount: 10,
candidatesTokenCount: 20,
totalTokenCount: 30,
};
(mockConverter.convertGeminiRequestToOpenAI as Mock).mockReturnValue([]);
(mockConverter.convertOpenAIChunkToGemini as Mock)
.mockReturnValueOnce(mockContentResponse)
.mockReturnValueOnce(mockFinalResponse);
(mockClient.chat.completions.create as Mock).mockResolvedValue(
mockStream,
);
// Act
const resultGenerator = await pipeline.executeStream(
request,
userPromptId,
);
const results = [];
for await (const result of resultGenerator) {
results.push(result);
}
// Assert
expect(results).toHaveLength(2);
expect(results[0]).toBe(mockContentResponse);
expect(results[1]).toBe(mockFinalResponse);
// The last result should have both finishReason and valid usageMetadata
const lastResult = results[1];
expect(lastResult.candidates?.[0]?.finishReason).toBe(FinishReason.STOP);
expect(lastResult.usageMetadata).toEqual({
promptTokenCount: 10,
candidatesTokenCount: 20,
totalTokenCount: 30,
});
});
});
describe('buildRequest', () => {

View File

@@ -6,15 +6,18 @@
import OpenAI from 'openai';
import {
-  GenerateContentParameters,
+  type GenerateContentParameters,
  GenerateContentResponse,
} from '@google/genai';
import { Config } from '../../config/config.js';
-import { ContentGeneratorConfig } from '../contentGenerator.js';
+import { type ContentGeneratorConfig } from '../contentGenerator.js';
import { type OpenAICompatibleProvider } from './provider/index.js';
import { OpenAIContentConverter } from './converter.js';
-import { TelemetryService, RequestContext } from './telemetryService.js';
-import { ErrorHandler } from './errorHandler.js';
+import {
+  type TelemetryService,
+  type RequestContext,
+} from './telemetryService.js';
+import { type ErrorHandler } from './errorHandler.js';

export interface PipelineConfig {
  cliConfig: Config;
@@ -96,8 +99,9 @@ export class ContentGenerationPipeline {
   * This method handles the complete stream processing pipeline:
   * 1. Convert OpenAI chunks to Gemini format while preserving original chunks
   * 2. Filter empty responses
-   * 3. Collect both formats for logging
-   * 4. Handle success/error logging with original OpenAI format
+   * 3. Handle chunk merging for providers that send finishReason and usageMetadata separately
+   * 4. Collect both formats for logging
+   * 5. Handle success/error logging with original OpenAI format
   */
  private async *processStreamWithLogging(
    stream: AsyncIterable<OpenAI.Chat.ChatCompletionChunk>,
@@ -111,6 +115,9 @@ export class ContentGenerationPipeline {
    // Reset streaming tool calls to prevent data pollution from previous streams
    this.converter.resetStreamingToolCalls();

+    // State for handling chunk merging
+    let pendingFinishResponse: GenerateContentResponse | null = null;
+
    try {
      // Stage 2a: Convert and yield each chunk while preserving original
      for await (const chunk of stream) {
@@ -119,18 +126,40 @@ export class ContentGenerationPipeline {
        // Stage 2b: Filter empty responses to avoid downstream issues
        if (
          response.candidates?.[0]?.content?.parts?.length === 0 &&
+          !response.candidates?.[0]?.finishReason &&
          !response.usageMetadata
        ) {
          continue;
        }

-        // Stage 2c: Collect both formats and yield Gemini format to consumer
-        collectedGeminiResponses.push(response);
-        collectedOpenAIChunks.push(chunk);
-        yield response;
+        // Stage 2c: Handle chunk merging for providers that send finishReason and usageMetadata separately
+        const shouldYield = this.handleChunkMerging(
+          response,
+          chunk,
+          collectedGeminiResponses,
+          collectedOpenAIChunks,
+          (mergedResponse) => {
+            pendingFinishResponse = mergedResponse;
+          },
+        );
+
+        if (shouldYield) {
+          // If we have a pending finish response, yield it instead
+          if (pendingFinishResponse) {
+            yield pendingFinishResponse;
+            pendingFinishResponse = null;
+          } else {
+            yield response;
+          }
+        }
      }

-      // Stage 2d: Stream completed successfully - perform logging with original OpenAI chunks
+      // Stage 2d: If there's still a pending finish response at the end, yield it
+      if (pendingFinishResponse) {
+        yield pendingFinishResponse;
+      }
+
+      // Stage 2e: Stream completed successfully - perform logging with original OpenAI chunks
      context.duration = Date.now() - context.startTime;

      await this.config.telemetryService.logStreamingSuccess(
@@ -156,6 +185,72 @@ export class ContentGenerationPipeline {
}
}
/**
* Handle chunk merging for providers that send finishReason and usageMetadata separately.
*
* Strategy: When we encounter a finishReason chunk, we hold it and merge all subsequent
* chunks into it until the stream ends. This ensures the final chunk contains both
* finishReason and the most up-to-date usage information from any provider pattern.
*
* @param response Current Gemini response
* @param chunk Current OpenAI chunk
* @param collectedGeminiResponses Array to collect responses for logging
* @param collectedOpenAIChunks Array to collect chunks for logging
* @param setPendingFinish Callback to set pending finish response
* @returns true if the response should be yielded, false if it should be held for merging
*/
private handleChunkMerging(
response: GenerateContentResponse,
chunk: OpenAI.Chat.ChatCompletionChunk,
collectedGeminiResponses: GenerateContentResponse[],
collectedOpenAIChunks: OpenAI.Chat.ChatCompletionChunk[],
setPendingFinish: (response: GenerateContentResponse) => void,
): boolean {
const isFinishChunk = response.candidates?.[0]?.finishReason;
// Check if we have a pending finish response from previous chunks
const hasPendingFinish =
collectedGeminiResponses.length > 0 &&
collectedGeminiResponses[collectedGeminiResponses.length - 1]
.candidates?.[0]?.finishReason;
if (isFinishChunk) {
// This is a finish reason chunk
collectedGeminiResponses.push(response);
collectedOpenAIChunks.push(chunk);
setPendingFinish(response);
return false; // Don't yield yet, wait for potential subsequent chunks to merge
} else if (hasPendingFinish) {
// We have a pending finish chunk, merge this chunk's data into it
const lastResponse =
collectedGeminiResponses[collectedGeminiResponses.length - 1];
const mergedResponse = new GenerateContentResponse();
// Keep the finish reason from the previous chunk
mergedResponse.candidates = lastResponse.candidates;
// Merge usage metadata if this chunk has it
if (response.usageMetadata) {
mergedResponse.usageMetadata = response.usageMetadata;
} else {
mergedResponse.usageMetadata = lastResponse.usageMetadata;
}
// Update the collected responses with the merged response
collectedGeminiResponses[collectedGeminiResponses.length - 1] =
mergedResponse;
collectedOpenAIChunks.push(chunk);
setPendingFinish(mergedResponse);
return true; // Yield the merged response
}
// Normal chunk - collect and yield
collectedGeminiResponses.push(response);
collectedOpenAIChunks.push(chunk);
return true;
}
private async buildRequest(
request: GenerateContentParameters,
userPromptId: string,
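To see the merge strategy above in one place, here is a hypothetical, simplified restatement that works on an already-collected array instead of the live stream; the real handleChunkMerging also keeps the telemetry collections in sync and yields incrementally, and the helper name mergeTailChunks is made up for illustration.

import { type GenerateContentResponse } from '@google/genai';

// Simplified sketch: hold the finish-reason chunk, fold later usage metadata
// into it, and emit it last so the final response carries both fields.
function mergeTailChunks(
  responses: GenerateContentResponse[],
): GenerateContentResponse[] {
  const merged: GenerateContentResponse[] = [];
  let pendingFinish: GenerateContentResponse | null = null;

  for (const response of responses) {
    if (response.candidates?.[0]?.finishReason) {
      // Finish chunk: hold it instead of emitting immediately.
      pendingFinish = response;
    } else if (pendingFinish) {
      // Subsequent chunk (e.g. a usage-only chunk): adopt its usage metadata
      // when it provides any, otherwise keep what the finish chunk had.
      pendingFinish.usageMetadata =
        response.usageMetadata ?? pendingFinish.usageMetadata;
    } else {
      // Ordinary content chunk: pass through unchanged.
      merged.push(response);
    }
  }
  if (pendingFinish) {
    merged.push(pendingFinish);
  }
  return merged;
}

// With the DashScope pattern (content chunk, finish-only chunk, usage-only
// chunk), this produces two responses: the content chunk plus a final chunk
// carrying both finishReason and usageMetadata, matching the tests above.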