Fix(core): Fix stream validation logic (#7832)

This commit is contained in:
Sandy Tao
2025-09-05 16:22:54 -07:00
parent a2a3c66e28
commit ddd4659d10
2 changed files with 65 additions and 55 deletions

View File

@@ -383,9 +383,9 @@ describe('GeminiChat', () => {
expect(modelTurn?.parts![0]!.functionCall).toBeDefined();
});
it('should succeed if the stream ends with an empty part but has a valid finishReason', async () => {
// 1. Mock a stream that ends with an invalid part but has a 'STOP' finish reason.
const streamWithValidFinish = (async function* () {
it('should fail if the stream ends with an empty part and has no finishReason', async () => {
// 1. Mock a stream that ends with an invalid part and has no finish reason.
const streamWithNoFinish = (async function* () {
yield {
candidates: [
{
@@ -396,7 +396,7 @@ describe('GeminiChat', () => {
},
],
} as unknown as GenerateContentResponse;
// This second chunk is invalid, but the finishReason should save it from retrying.
// This second chunk is invalid and has no finishReason, so it should fail.
yield {
candidates: [
{
@@ -404,21 +404,19 @@ describe('GeminiChat', () => {
role: 'model',
parts: [{ text: '' }],
},
finishReason: 'STOP',
},
],
} as unknown as GenerateContentResponse;
})();
vi.mocked(mockModelsModule.generateContentStream).mockResolvedValue(
streamWithValidFinish,
streamWithNoFinish,
);
// 2. Action & Assert: The stream should complete successfully because the valid
// finishReason overrides the invalid final chunk.
// 2. Action & Assert: The stream should fail because there's no finish reason.
const stream = await chat.sendMessageStream(
{ message: 'test message' },
'prompt-id-valid-finish-empty-end',
'prompt-id-no-finish-empty-end',
);
await expect(
(async () => {
@@ -426,14 +424,7 @@ describe('GeminiChat', () => {
/* consume stream */
}
})(),
).resolves.not.toThrow();
// 3. Verify history was recorded correctly
const history = chat.getHistory();
expect(history.length).toBe(2);
const modelTurn = history[1]!;
expect(modelTurn?.parts?.length).toBe(1); // The empty part is discarded
expect(modelTurn?.parts![0]!.text).toBe('Initial content...');
).rejects.toThrow(EmptyStreamError);
});
it('should not consolidate text into a part that also contains a functionCall', async () => {
// 1. Mock the API to stream a malformed part followed by a valid text part.
@@ -509,7 +500,10 @@ describe('GeminiChat', () => {
// as the important part is consolidating what comes after.
yield {
candidates: [
{ content: { role: 'model', parts: [{ text: ' World!' }] } },
{
content: { role: 'model', parts: [{ text: ' World!' }] },
finishReason: 'STOP',
},
],
} as unknown as GenerateContentResponse;
})();
@@ -612,6 +606,7 @@ describe('GeminiChat', () => {
{ text: 'This is the visible text that should not be lost.' },
],
},
finishReason: 'STOP',
},
],
} as unknown as GenerateContentResponse;
@@ -672,7 +667,10 @@ describe('GeminiChat', () => {
const emptyStreamResponse = (async function* () {
yield {
candidates: [
{ content: { role: 'model', parts: [{ thought: true }] } },
{
content: { role: 'model', parts: [{ thought: true }] },
finishReason: 'STOP',
},
],
} as unknown as GenerateContentResponse;
})();
@@ -942,7 +940,12 @@ describe('GeminiChat', () => {
// Second attempt (the retry): A minimal valid stream.
(async function* () {
yield {
candidates: [{ content: { parts: [{ text: 'Success' }] } }],
candidates: [
{
content: { parts: [{ text: 'Success' }] },
finishReason: 'STOP',
},
],
} as unknown as GenerateContentResponse;
})(),
);
@@ -979,7 +982,10 @@ describe('GeminiChat', () => {
(async function* () {
yield {
candidates: [
{ content: { parts: [{ text: 'Successful response' }] } },
{
content: { parts: [{ text: 'Successful response' }] },
finishReason: 'STOP',
},
],
} as unknown as GenerateContentResponse;
})(),
@@ -1090,7 +1096,12 @@ describe('GeminiChat', () => {
// Second attempt succeeds
(async function* () {
yield {
candidates: [{ content: { parts: [{ text: 'Second answer' }] } }],
candidates: [
{
content: { parts: [{ text: 'Second answer' }] },
finishReason: 'STOP',
},
],
} as unknown as GenerateContentResponse;
})(),
);
@@ -1239,6 +1250,7 @@ describe('GeminiChat', () => {
content: {
parts: [{ text: 'Successful response after empty' }],
},
finishReason: 'STOP',
},
],
} as unknown as GenerateContentResponse;
@@ -1300,13 +1312,23 @@ describe('GeminiChat', () => {
} as unknown as GenerateContentResponse;
await firstStreamContinuePromise; // Pause the stream
yield {
candidates: [{ content: { parts: [{ text: ' part 2' }] } }],
candidates: [
{
content: { parts: [{ text: ' part 2' }] },
finishReason: 'STOP',
},
],
} as unknown as GenerateContentResponse;
})();
const secondStreamGenerator = (async function* () {
yield {
candidates: [{ content: { parts: [{ text: 'second response' }] } }],
candidates: [
{
content: { parts: [{ text: 'second response' }] },
finishReason: 'STOP',
},
],
} as unknown as GenerateContentResponse;
})();
@@ -1391,6 +1413,7 @@ describe('GeminiChat', () => {
content: {
parts: [{ text: 'Successful final response' }],
},
finishReason: 'STOP',
},
],
} as unknown as GenerateContentResponse;

View File

@@ -581,21 +581,14 @@ export class GeminiChat {
let hasReceivedAnyChunk = false;
let hasToolCall = false;
let lastChunk: GenerateContentResponse | null = null;
let isStreamInvalid = false;
let firstInvalidChunkEncountered = false;
let validChunkAfterInvalidEncountered = false;
let lastChunkIsInvalid = false;
for await (const chunk of streamResponse) {
hasReceivedAnyChunk = true;
lastChunk = chunk;
if (isValidResponse(chunk)) {
if (firstInvalidChunkEncountered) {
// A valid chunk appeared *after* an invalid one.
validChunkAfterInvalidEncountered = true;
}
lastChunkIsInvalid = false;
const content = chunk.candidates?.[0]?.content;
if (content?.parts) {
modelResponseParts.push(...content.parts);
@@ -608,8 +601,7 @@ export class GeminiChat {
this.config,
new InvalidChunkEvent('Invalid chunk received from stream.'),
);
isStreamInvalid = true;
firstInvalidChunkEncountered = true;
lastChunkIsInvalid = true;
}
yield chunk;
}
@@ -618,27 +610,22 @@ export class GeminiChat {
throw new EmptyStreamError('Model stream completed without any chunks.');
}
// --- FIX: The entire validation block was restructured for clarity and correctness ---
// Only apply complex validation if an invalid chunk was actually found.
if (isStreamInvalid) {
// Fail immediately if an invalid chunk was not the absolute last chunk.
if (validChunkAfterInvalidEncountered) {
throw new EmptyStreamError(
'Model stream had invalid intermediate chunks without a tool call.',
);
}
const hasFinishReason = lastChunk?.candidates?.some(
(candidate) => candidate.finishReason,
);
if (!hasToolCall) {
// If the *only* invalid part was the last chunk, we still check its finish reason.
const finishReason = lastChunk?.candidates?.[0]?.finishReason;
const isSuccessfulFinish =
finishReason === 'STOP' || finishReason === 'MAX_TOKENS';
if (!isSuccessfulFinish) {
throw new EmptyStreamError(
'Model stream ended with an invalid chunk and a failed finish reason.',
);
}
}
// --- FIX: The entire validation block was restructured for clarity and correctness ---
// Stream validation logic: A stream is considered successful if:
// 1. There's a tool call (tool calls can end without explicit finish reasons), OR
// 2. Both conditions are met: last chunk is valid AND any candidate has a finish reason
//
// We throw an error only when there's no tool call AND either:
// - The last chunk is invalid, OR
// - No candidate in the last chunk has a finish reason
if (!hasToolCall && (lastChunkIsInvalid || !hasFinishReason)) {
throw new EmptyStreamError(
'Model stream ended with an invalid chunk or missing finish reason.',
);
}
// Bundle all streamed parts into a single Content object