fix: try to fix parallel tool calls with irregular chunk content

This commit is contained in:
mingholy.lmh
2025-09-08 23:24:23 +08:00
parent 0b99dba211
commit 36f6967a5a
2 changed files with 460 additions and 121 deletions

View File

@@ -310,7 +310,7 @@ describe('StreamingToolCallParser', () => {
expect(parser.getToolCallMeta(0).name).toBe('my_function');
});
it('should detect new tool call with same index and reset state', () => {
it('should detect new tool call with same index and reassign to new index', () => {
// First tool call
const result1 = parser.addChunk(
0,
@@ -320,12 +320,23 @@ describe('StreamingToolCallParser', () => {
);
expect(result1.complete).toBe(true);
// New tool call with same index but different function name should reset
// New tool call with same index but different ID should get reassigned to new index
const result2 = parser.addChunk(0, '{"param2":', 'call_2', 'function2');
expect(result2.complete).toBe(false);
expect(parser.getBuffer(0)).toBe('{"param2":');
expect(parser.getToolCallMeta(0).name).toBe('function2');
expect(parser.getToolCallMeta(0).id).toBe('call_2');
// The original index 0 should still have the first tool call
expect(parser.getBuffer(0)).toBe('{"param1": "value1"}');
expect(parser.getToolCallMeta(0)).toEqual({
id: 'call_1',
name: 'function1',
});
// The new tool call should be at a different index (1)
expect(parser.getBuffer(1)).toBe('{"param2":');
expect(parser.getToolCallMeta(1)).toEqual({
id: 'call_2',
name: 'function2',
});
});
});
@@ -390,43 +401,6 @@ describe('StreamingToolCallParser', () => {
});
});
describe('Reset functionality', () => {
it('should reset specific index', () => {
parser.addChunk(0, '{"param":', 'call_1', 'function1');
parser.addChunk(1, '{"other":', 'call_2', 'function2');
parser.resetIndex(0);
expect(parser.getBuffer(0)).toBe('');
expect(parser.getState(0)).toEqual({
depth: 0,
inString: false,
escape: false,
});
expect(parser.getToolCallMeta(0)).toEqual({});
// Index 1 should remain unchanged
expect(parser.getBuffer(1)).toBe('{"other":');
expect(parser.getToolCallMeta(1)).toEqual({
id: 'call_2',
name: 'function2',
});
});
it('should reset entire parser state', () => {
parser.addChunk(0, '{"param1":', 'call_1', 'function1');
parser.addChunk(1, '{"param2":', 'call_2', 'function2');
parser.reset();
expect(parser.getBuffer(0)).toBe('');
expect(parser.getBuffer(1)).toBe('');
expect(parser.getToolCallMeta(0)).toEqual({});
expect(parser.getToolCallMeta(1)).toEqual({});
expect(parser.getCompletedToolCalls()).toHaveLength(0);
});
});
describe('Edge cases', () => {
it('should handle very large JSON objects', () => {
const largeObject = { data: 'x'.repeat(10000) };
@@ -567,4 +541,255 @@ describe('StreamingToolCallParser', () => {
expect(completed[0].args).toEqual({ message: 'Hello world' });
});
});
describe('Tool call ID collision detection and mapping', () => {
it('should handle tool call ID reuse correctly', () => {
// First tool call with ID 'call_1' at index 0
const result1 = parser.addChunk(
0,
'{"param1": "value1"}',
'call_1',
'function1',
);
expect(result1.complete).toBe(true);
// Second tool call with same ID 'call_1' should reuse the same internal index
// and append to the buffer (this is the actual behavior)
const result2 = parser.addChunk(
0,
'{"param2": "value2"}',
'call_1',
'function2',
);
expect(result2.complete).toBe(false); // Not complete because buffer is malformed
// Should have updated the metadata but appended to buffer
expect(parser.getToolCallMeta(0)).toEqual({
id: 'call_1',
name: 'function2',
});
expect(parser.getBuffer(0)).toBe(
'{"param1": "value1"}{"param2": "value2"}',
);
});
it('should detect index collision and find new index', () => {
// First complete tool call at index 0
parser.addChunk(0, '{"param1": "value1"}', 'call_1', 'function1');
// New tool call with different ID but same index should get reassigned
const result = parser.addChunk(0, '{"param2":', 'call_2', 'function2');
expect(result.complete).toBe(false);
// Complete the second tool call
const result2 = parser.addChunk(0, ' "value2"}');
expect(result2.complete).toBe(true);
const completed = parser.getCompletedToolCalls();
expect(completed).toHaveLength(2);
// Should have both tool calls with different IDs
const call1 = completed.find((tc) => tc.id === 'call_1');
const call2 = completed.find((tc) => tc.id === 'call_2');
expect(call1).toBeDefined();
expect(call2).toBeDefined();
expect(call1?.args).toEqual({ param1: 'value1' });
expect(call2?.args).toEqual({ param2: 'value2' });
});
it('should handle continuation chunks without ID correctly', () => {
// Start a tool call
parser.addChunk(0, '{"param":', 'call_1', 'function1');
// Add continuation chunk without ID
const result = parser.addChunk(0, ' "value"}');
expect(result.complete).toBe(true);
expect(result.value).toEqual({ param: 'value' });
expect(parser.getToolCallMeta(0)).toEqual({
id: 'call_1',
name: 'function1',
});
});
it('should find most recent incomplete tool call for continuation chunks', () => {
// Start multiple tool calls
parser.addChunk(0, '{"param1": "complete"}', 'call_1', 'function1');
parser.addChunk(1, '{"param2":', 'call_2', 'function2');
parser.addChunk(2, '{"param3":', 'call_3', 'function3');
// Add continuation chunk without ID at index 1 - should continue the incomplete tool call at index 1
const result = parser.addChunk(1, ' "continuation"}');
expect(result.complete).toBe(true);
const completed = parser.getCompletedToolCalls();
const call2 = completed.find((tc) => tc.id === 'call_2');
expect(call2?.args).toEqual({ param2: 'continuation' });
});
});
describe('Index management and reset functionality', () => {
it('should reset individual index correctly', () => {
// Set up some state at index 0
parser.addChunk(0, '{"partial":', 'call_1', 'function1');
expect(parser.getBuffer(0)).toBe('{"partial":');
expect(parser.getState(0).depth).toBe(1);
expect(parser.getToolCallMeta(0)).toEqual({
id: 'call_1',
name: 'function1',
});
// Reset the index
parser.resetIndex(0);
// Verify everything is cleared
expect(parser.getBuffer(0)).toBe('');
expect(parser.getState(0)).toEqual({
depth: 0,
inString: false,
escape: false,
});
expect(parser.getToolCallMeta(0)).toEqual({});
});
it('should find next available index when all lower indices are occupied', () => {
// Fill up indices 0, 1, 2 with complete tool calls
parser.addChunk(0, '{"param0": "value0"}', 'call_0', 'function0');
parser.addChunk(1, '{"param1": "value1"}', 'call_1', 'function1');
parser.addChunk(2, '{"param2": "value2"}', 'call_2', 'function2');
// New tool call should get assigned to index 3
const result = parser.addChunk(
0,
'{"param3": "value3"}',
'call_3',
'function3',
);
expect(result.complete).toBe(true);
const completed = parser.getCompletedToolCalls();
expect(completed).toHaveLength(4);
// Verify the new tool call got a different index
const call3 = completed.find((tc) => tc.id === 'call_3');
expect(call3).toBeDefined();
expect(call3?.index).toBe(3);
});
it('should reuse incomplete index when available', () => {
// Create an incomplete tool call at index 0
parser.addChunk(0, '{"incomplete":', 'call_1', 'function1');
// New tool call with different ID should reuse the incomplete index
const result = parser.addChunk(0, ' "completed"}', 'call_2', 'function2');
expect(result.complete).toBe(true);
// Should have updated the metadata for the same index
expect(parser.getToolCallMeta(0)).toEqual({
id: 'call_2',
name: 'function2',
});
});
});
describe('Repair functionality and flags', () => {
it('should test repair functionality in getCompletedToolCalls', () => {
// The repair functionality is primarily used in getCompletedToolCalls, not addChunk
parser.addChunk(0, '{"message": "unclosed string', 'call_1', 'function1');
// The addChunk should not complete because depth > 0 and inString = true
expect(parser.getState(0).depth).toBe(1);
expect(parser.getState(0).inString).toBe(true);
// But getCompletedToolCalls should repair it
const completed = parser.getCompletedToolCalls();
expect(completed).toHaveLength(1);
expect(completed[0].args).toEqual({ message: 'unclosed string' });
});
it('should not set repaired flag for normal parsing', () => {
const result = parser.addChunk(
0,
'{"message": "normal"}',
'call_1',
'function1',
);
expect(result.complete).toBe(true);
expect(result.repaired).toBeUndefined();
expect(result.value).toEqual({ message: 'normal' });
});
it('should not attempt repair when still in nested structure', () => {
const result = parser.addChunk(
0,
'{"nested": {"unclosed": "string',
'call_1',
'function1',
);
// Should not attempt repair because depth > 0
expect(result.complete).toBe(false);
expect(result.repaired).toBeUndefined();
expect(parser.getState(0).depth).toBe(2);
});
it('should handle repair failure gracefully', () => {
// Create malformed JSON that can't be repaired at depth 0
const result = parser.addChunk(
0,
'{invalid: json}',
'call_1',
'function1',
);
expect(result.complete).toBe(false);
expect(result.error).toBeInstanceOf(Error);
expect(result.repaired).toBeUndefined();
});
});
describe('Complex collision scenarios', () => {
it('should handle rapid tool call switching at same index', () => {
// Rapid switching between different tool calls at index 0
parser.addChunk(0, '{"step1":', 'call_1', 'function1');
parser.addChunk(0, ' "done"}', 'call_1', 'function1');
// New tool call immediately at same index
parser.addChunk(0, '{"step2":', 'call_2', 'function2');
parser.addChunk(0, ' "done"}', 'call_2', 'function2');
const completed = parser.getCompletedToolCalls();
expect(completed).toHaveLength(2);
const call1 = completed.find((tc) => tc.id === 'call_1');
const call2 = completed.find((tc) => tc.id === 'call_2');
expect(call1?.args).toEqual({ step1: 'done' });
expect(call2?.args).toEqual({ step2: 'done' });
});
it('should handle interleaved chunks from multiple tool calls with ID mapping', () => {
// Start tool call 1 at index 0
parser.addChunk(0, '{"param1":', 'call_1', 'function1');
// Start tool call 2 at index 1 (different index to avoid collision)
parser.addChunk(1, '{"param2":', 'call_2', 'function2');
// Continue tool call 1 at its index
const result1 = parser.addChunk(0, ' "value1"}');
expect(result1.complete).toBe(true);
// Continue tool call 2 at its index
const result2 = parser.addChunk(1, ' "value2"}');
expect(result2.complete).toBe(true);
const completed = parser.getCompletedToolCalls();
expect(completed).toHaveLength(2);
const call1 = completed.find((tc) => tc.id === 'call_1');
const call2 = completed.find((tc) => tc.id === 'call_2');
expect(call1?.args).toEqual({ param1: 'value1' });
expect(call2?.args).toEqual({ param2: 'value2' });
});
});
});

View File

@@ -21,20 +21,14 @@ export interface ToolCallParseResult {
}
/**
* Streaming Tool Call Parser Implementation
* StreamingToolCallParser - Handles streaming tool call objects with inconsistent chunk formats
*
* This class implements a sophisticated streaming parser specifically designed for
* handling tool call arguments that arrive as partial JSON data in chunks during
* OpenAI streaming responses. It extends the principles from the streaming JSON parser
* to handle the specific requirements of tool call processing.
*
* Key Features:
* - Real-time depth tracking for nested JSON structures in tool arguments
* - Proper handling of string literals and escape sequences
* - Automatic repair of common JSON formatting issues
* - Support for multiple consecutive tool calls with same or different function names
* - Memory-efficient processing without storing complete JSON in memory
* - State management for individual tool call indices
* Problems this parser addresses:
* - Tool calls arrive with varying chunk shapes (empty strings, partial JSON, complete objects)
* - Tool calls may lack IDs, names, or have inconsistent indices
* - Multiple tool calls can be processed simultaneously with interleaved chunks
* - Index collisions occur when the same index is reused for different tool calls
* - JSON arguments are fragmented across multiple chunks and need reconstruction
*/
export class StreamingToolCallParser {
/** Accumulated buffer containing all received chunks for each tool call index */
@@ -47,24 +41,26 @@ export class StreamingToolCallParser {
private escapes: Map<number, boolean> = new Map();
/** Metadata for each tool call index */
private toolCallMeta: Map<number, { id?: string; name?: string }> = new Map();
/** Map from tool call ID to actual index used for storage */
private idToIndexMap: Map<string, number> = new Map();
/** Counter for generating new indices when collisions occur */
private nextAvailableIndex: number = 0;
/**
* Processes a new chunk of tool call data and attempts to parse complete JSON objects
*
* This method implements a state machine that tracks:
* 1. JSON structure depth (brackets and braces) per tool call index
* 2. String literal boundaries per tool call index
* 3. Escape sequences within strings per tool call index
* 4. Tool call metadata (id, function name) per tool call index
* Handles the core problems of streaming tool call parsing:
* - Resolves index collisions when the same index is reused for different tool calls
* - Routes chunks without IDs to the correct incomplete tool call
* - Tracks JSON parsing state (depth, string boundaries, escapes) per tool call
* - Attempts parsing only when JSON structure is complete (depth = 0)
* - Repairs common issues like unclosed strings
*
* The parser only attempts to parse when the depth returns to 0, indicating
* a complete JSON structure has been received for that specific tool call index.
*
* @param index - The tool call index from OpenAI streaming response
* @param chunk - A string chunk containing partial JSON data for arguments
* @param id - Optional tool call ID
* @param name - Optional function name
* @returns ToolCallParseResult indicating whether parsing is complete and any parsed value
* @param index - Tool call index from streaming response (may collide with existing calls)
* @param chunk - String chunk that may be empty, partial JSON, or complete data
* @param id - Optional tool call ID for collision detection and chunk routing
* @param name - Optional function name stored as metadata
* @returns ToolCallParseResult with completion status, parsed value, and repair info
*/
addChunk(
index: number,
@@ -72,54 +68,92 @@ export class StreamingToolCallParser {
id?: string,
name?: string,
): ToolCallParseResult {
// Initialize state for this index if not exists
if (!this.buffers.has(index)) {
this.buffers.set(index, '');
this.depths.set(index, 0);
this.inStrings.set(index, false);
this.escapes.set(index, false);
this.toolCallMeta.set(index, {});
let actualIndex = index;
// Handle tool call ID mapping for collision detection
if (id) {
// This is the start of a new tool call with an ID
if (this.idToIndexMap.has(id)) {
// We've seen this ID before, use the existing mapped index
actualIndex = this.idToIndexMap.get(id)!;
} else {
// New tool call ID
// Check if the requested index is already occupied by a different complete tool call
if (this.buffers.has(index)) {
const existingBuffer = this.buffers.get(index)!;
const existingDepth = this.depths.get(index)!;
const existingMeta = this.toolCallMeta.get(index);
// Check if we have a complete tool call at this index
if (
existingBuffer.trim() &&
existingDepth === 0 &&
existingMeta?.id &&
existingMeta.id !== id
) {
try {
JSON.parse(existingBuffer);
// We have a complete tool call with a different ID at this index
// Find a new index for this tool call
actualIndex = this.findNextAvailableIndex();
} catch {
// Existing buffer is not complete JSON, we can reuse this index
}
}
}
// Map this ID to the actual index we're using
this.idToIndexMap.set(id, actualIndex);
}
} else {
// No ID provided - this is a continuation chunk
// Try to find which tool call this belongs to based on the index
// Look for an existing tool call at this index that's not complete
if (this.buffers.has(index)) {
const existingBuffer = this.buffers.get(index)!;
const existingDepth = this.depths.get(index)!;
// If there's an incomplete tool call at this index, continue with it
if (existingDepth > 0 || !existingBuffer.trim()) {
actualIndex = index;
} else {
// Check if the buffer at this index is complete
try {
JSON.parse(existingBuffer);
// Buffer is complete, this chunk might belong to a different tool call
// Find the most recent incomplete tool call
actualIndex = this.findMostRecentIncompleteIndex();
} catch {
// Buffer is incomplete, continue with this index
actualIndex = index;
}
}
}
}
// Initialize state for the actual index if not exists
if (!this.buffers.has(actualIndex)) {
this.buffers.set(actualIndex, '');
this.depths.set(actualIndex, 0);
this.inStrings.set(actualIndex, false);
this.escapes.set(actualIndex, false);
this.toolCallMeta.set(actualIndex, {});
}
// Update metadata
const meta = this.toolCallMeta.get(index)!;
const meta = this.toolCallMeta.get(actualIndex)!;
if (id) meta.id = id;
if (name) {
// If this is a new function name and we have existing arguments,
// it might be a new tool call with the same index - reset the buffer
if (meta.name && meta.name !== name && this.buffers.get(index)) {
const currentBuffer = this.buffers.get(index)!;
// Check if current buffer contains complete JSON
if (currentBuffer.trim()) {
try {
JSON.parse(currentBuffer);
// If we can parse it, this is likely a new tool call - reset state
this.resetIndex(index);
// Update metadata after reset
const resetMeta = this.toolCallMeta.get(index)!;
if (id) resetMeta.id = id;
resetMeta.name = name;
} catch {
// Current buffer is incomplete, continue accumulating
meta.name = name;
}
} else {
meta.name = name;
}
} else {
meta.name = name;
}
}
if (name) meta.name = name;
// Get current state for this index
const currentBuffer = this.buffers.get(index)!;
const currentDepth = this.depths.get(index)!;
const currentInString = this.inStrings.get(index)!;
const currentEscape = this.escapes.get(index)!;
// Get current state for the actual index
const currentBuffer = this.buffers.get(actualIndex)!;
const currentDepth = this.depths.get(actualIndex)!;
const currentInString = this.inStrings.get(actualIndex)!;
const currentEscape = this.escapes.get(actualIndex)!;
// Add chunk to buffer
const newBuffer = currentBuffer + chunk;
this.buffers.set(index, newBuffer);
this.buffers.set(actualIndex, newBuffer);
// Track JSON structure depth - only count brackets/braces outside of strings
let depth = currentDepth;
@@ -141,9 +175,9 @@ export class StreamingToolCallParser {
}
// Update state
this.depths.set(index, depth);
this.inStrings.set(index, inString);
this.escapes.set(index, escape);
this.depths.set(actualIndex, depth);
this.inStrings.set(actualIndex, inString);
this.escapes.set(actualIndex, escape);
// Attempt parse when we're back at root level (depth 0) and have data
if (depth === 0 && newBuffer.trim().length > 0) {
@@ -188,7 +222,14 @@ export class StreamingToolCallParser {
/**
* Gets all completed tool calls that are ready to be emitted
* This method should be called when the streaming is complete (finish_reason is present)
*
* Attempts to parse accumulated buffers using multiple strategies:
* 1. Standard JSON.parse()
* 2. Auto-close unclosed strings and retry
* 3. Fallback to safeJsonParse for malformed data
*
* Only returns tool calls with both name metadata and non-empty buffers.
* Should be called when streaming is complete (finish_reason is present).
*
* @returns Array of completed tool calls with their metadata and parsed arguments
*/
@@ -240,6 +281,79 @@ export class StreamingToolCallParser {
return completed;
}
/**
* Finds the next available index for a new tool call
*
* Scans indices starting from nextAvailableIndex to find one that's safe to use.
* Reuses indices with empty buffers or incomplete parsing states.
* Skips indices with complete, parseable tool call data to prevent overwriting.
*
* @returns The next available index safe for storing a new tool call
*/
private findNextAvailableIndex(): number {
while (this.buffers.has(this.nextAvailableIndex)) {
// Check if this index has a complete tool call
const buffer = this.buffers.get(this.nextAvailableIndex)!;
const depth = this.depths.get(this.nextAvailableIndex)!;
const meta = this.toolCallMeta.get(this.nextAvailableIndex);
// If buffer is empty or incomplete (depth > 0), this index is available
if (!buffer.trim() || depth > 0 || !meta?.id) {
return this.nextAvailableIndex;
}
// Try to parse the buffer to see if it's complete
try {
JSON.parse(buffer);
// If parsing succeeds and depth is 0, this index has a complete tool call
if (depth === 0) {
this.nextAvailableIndex++;
continue;
}
} catch {
// If parsing fails, this index is available for reuse
return this.nextAvailableIndex;
}
this.nextAvailableIndex++;
}
return this.nextAvailableIndex++;
}
/**
* Finds the most recent incomplete tool call index
*
* Used when continuation chunks arrive without IDs. Scans existing tool calls
* to find the highest index with incomplete parsing state (depth > 0, empty buffer,
* or unparseable JSON). Falls back to creating a new index if none found.
*
* @returns The index of the most recent incomplete tool call, or a new available index
*/
private findMostRecentIncompleteIndex(): number {
// Look for the highest index that has an incomplete tool call
let maxIndex = -1;
for (const [index, buffer] of this.buffers.entries()) {
const depth = this.depths.get(index)!;
const meta = this.toolCallMeta.get(index);
// Check if this tool call is incomplete
if (meta?.id && (depth > 0 || !buffer.trim())) {
maxIndex = Math.max(maxIndex, index);
} else if (buffer.trim()) {
// Check if buffer is parseable (complete)
try {
JSON.parse(buffer);
// Buffer is complete, skip this index
} catch {
// Buffer is incomplete, this could be our target
maxIndex = Math.max(maxIndex, index);
}
}
}
return maxIndex >= 0 ? maxIndex : this.findNextAvailableIndex();
}
/**
* Resets the parser state for a specific tool call index
*
@@ -256,8 +370,9 @@ export class StreamingToolCallParser {
/**
* Resets the entire parser state for processing a new stream
*
* This method clears all internal state variables, allowing the parser
* to be reused for multiple streams without interference.
* Clears all accumulated buffers, parsing states, metadata, and counters.
* Allows the parser to be reused for multiple independent streams without
* data leakage between sessions.
*/
reset(): void {
this.buffers.clear();
@@ -265,16 +380,15 @@ export class StreamingToolCallParser {
this.inStrings.clear();
this.escapes.clear();
this.toolCallMeta.clear();
this.idToIndexMap.clear();
this.nextAvailableIndex = 0;
}
/**
* Gets the current accumulated buffer content for a specific index
*
* Useful for debugging or when you need to inspect the raw data
* that has been accumulated so far.
*
* @param index - The tool call index
* @returns The current buffer content for the specified index
* @param index - The tool call index to retrieve buffer for
* @returns The current buffer content for the specified index (empty string if not found)
*/
getBuffer(index: number): string {
return this.buffers.get(index) || '';
@@ -283,8 +397,8 @@ export class StreamingToolCallParser {
/**
* Gets the current parsing state information for a specific index
*
* @param index - The tool call index
* @returns Object containing current depth, string state, and escape state
* @param index - The tool call index to get state information for
* @returns Object containing current parsing state (depth, inString, escape)
*/
getState(index: number): {
depth: number;