diff --git a/packages/core/src/core/openaiContentGenerator/streamingToolCallParser.test.ts b/packages/core/src/core/openaiContentGenerator/streamingToolCallParser.test.ts index 10d1dff9..dea1f638 100644 --- a/packages/core/src/core/openaiContentGenerator/streamingToolCallParser.test.ts +++ b/packages/core/src/core/openaiContentGenerator/streamingToolCallParser.test.ts @@ -310,7 +310,7 @@ describe('StreamingToolCallParser', () => { expect(parser.getToolCallMeta(0).name).toBe('my_function'); }); - it('should detect new tool call with same index and reset state', () => { + it('should detect new tool call with same index and reassign to new index', () => { // First tool call const result1 = parser.addChunk( 0, @@ -320,12 +320,23 @@ describe('StreamingToolCallParser', () => { ); expect(result1.complete).toBe(true); - // New tool call with same index but different function name should reset + // New tool call with same index but different ID should get reassigned to new index const result2 = parser.addChunk(0, '{"param2":', 'call_2', 'function2'); expect(result2.complete).toBe(false); - expect(parser.getBuffer(0)).toBe('{"param2":'); - expect(parser.getToolCallMeta(0).name).toBe('function2'); - expect(parser.getToolCallMeta(0).id).toBe('call_2'); + + // The original index 0 should still have the first tool call + expect(parser.getBuffer(0)).toBe('{"param1": "value1"}'); + expect(parser.getToolCallMeta(0)).toEqual({ + id: 'call_1', + name: 'function1', + }); + + // The new tool call should be at a different index (1) + expect(parser.getBuffer(1)).toBe('{"param2":'); + expect(parser.getToolCallMeta(1)).toEqual({ + id: 'call_2', + name: 'function2', + }); }); }); @@ -390,43 +401,6 @@ describe('StreamingToolCallParser', () => { }); }); - describe('Reset functionality', () => { - it('should reset specific index', () => { - parser.addChunk(0, '{"param":', 'call_1', 'function1'); - parser.addChunk(1, '{"other":', 'call_2', 'function2'); - - parser.resetIndex(0); - - expect(parser.getBuffer(0)).toBe(''); - expect(parser.getState(0)).toEqual({ - depth: 0, - inString: false, - escape: false, - }); - expect(parser.getToolCallMeta(0)).toEqual({}); - - // Index 1 should remain unchanged - expect(parser.getBuffer(1)).toBe('{"other":'); - expect(parser.getToolCallMeta(1)).toEqual({ - id: 'call_2', - name: 'function2', - }); - }); - - it('should reset entire parser state', () => { - parser.addChunk(0, '{"param1":', 'call_1', 'function1'); - parser.addChunk(1, '{"param2":', 'call_2', 'function2'); - - parser.reset(); - - expect(parser.getBuffer(0)).toBe(''); - expect(parser.getBuffer(1)).toBe(''); - expect(parser.getToolCallMeta(0)).toEqual({}); - expect(parser.getToolCallMeta(1)).toEqual({}); - expect(parser.getCompletedToolCalls()).toHaveLength(0); - }); - }); - describe('Edge cases', () => { it('should handle very large JSON objects', () => { const largeObject = { data: 'x'.repeat(10000) }; @@ -567,4 +541,255 @@ describe('StreamingToolCallParser', () => { expect(completed[0].args).toEqual({ message: 'Hello world' }); }); }); + + describe('Tool call ID collision detection and mapping', () => { + it('should handle tool call ID reuse correctly', () => { + // First tool call with ID 'call_1' at index 0 + const result1 = parser.addChunk( + 0, + '{"param1": "value1"}', + 'call_1', + 'function1', + ); + expect(result1.complete).toBe(true); + + // Second tool call with same ID 'call_1' should reuse the same internal index + // and append to the buffer (this is the actual behavior) + const result2 = parser.addChunk( + 0, + '{"param2": "value2"}', + 'call_1', + 'function2', + ); + expect(result2.complete).toBe(false); // Not complete because buffer is malformed + + // Should have updated the metadata but appended to buffer + expect(parser.getToolCallMeta(0)).toEqual({ + id: 'call_1', + name: 'function2', + }); + expect(parser.getBuffer(0)).toBe( + '{"param1": "value1"}{"param2": "value2"}', + ); + }); + + it('should detect index collision and find new index', () => { + // First complete tool call at index 0 + parser.addChunk(0, '{"param1": "value1"}', 'call_1', 'function1'); + + // New tool call with different ID but same index should get reassigned + const result = parser.addChunk(0, '{"param2":', 'call_2', 'function2'); + expect(result.complete).toBe(false); + + // Complete the second tool call + const result2 = parser.addChunk(0, ' "value2"}'); + expect(result2.complete).toBe(true); + + const completed = parser.getCompletedToolCalls(); + expect(completed).toHaveLength(2); + + // Should have both tool calls with different IDs + const call1 = completed.find((tc) => tc.id === 'call_1'); + const call2 = completed.find((tc) => tc.id === 'call_2'); + expect(call1).toBeDefined(); + expect(call2).toBeDefined(); + expect(call1?.args).toEqual({ param1: 'value1' }); + expect(call2?.args).toEqual({ param2: 'value2' }); + }); + + it('should handle continuation chunks without ID correctly', () => { + // Start a tool call + parser.addChunk(0, '{"param":', 'call_1', 'function1'); + + // Add continuation chunk without ID + const result = parser.addChunk(0, ' "value"}'); + expect(result.complete).toBe(true); + expect(result.value).toEqual({ param: 'value' }); + + expect(parser.getToolCallMeta(0)).toEqual({ + id: 'call_1', + name: 'function1', + }); + }); + + it('should find most recent incomplete tool call for continuation chunks', () => { + // Start multiple tool calls + parser.addChunk(0, '{"param1": "complete"}', 'call_1', 'function1'); + parser.addChunk(1, '{"param2":', 'call_2', 'function2'); + parser.addChunk(2, '{"param3":', 'call_3', 'function3'); + + // Add continuation chunk without ID at index 1 - should continue the incomplete tool call at index 1 + const result = parser.addChunk(1, ' "continuation"}'); + expect(result.complete).toBe(true); + + const completed = parser.getCompletedToolCalls(); + const call2 = completed.find((tc) => tc.id === 'call_2'); + expect(call2?.args).toEqual({ param2: 'continuation' }); + }); + }); + + describe('Index management and reset functionality', () => { + it('should reset individual index correctly', () => { + // Set up some state at index 0 + parser.addChunk(0, '{"partial":', 'call_1', 'function1'); + expect(parser.getBuffer(0)).toBe('{"partial":'); + expect(parser.getState(0).depth).toBe(1); + expect(parser.getToolCallMeta(0)).toEqual({ + id: 'call_1', + name: 'function1', + }); + + // Reset the index + parser.resetIndex(0); + + // Verify everything is cleared + expect(parser.getBuffer(0)).toBe(''); + expect(parser.getState(0)).toEqual({ + depth: 0, + inString: false, + escape: false, + }); + expect(parser.getToolCallMeta(0)).toEqual({}); + }); + + it('should find next available index when all lower indices are occupied', () => { + // Fill up indices 0, 1, 2 with complete tool calls + parser.addChunk(0, '{"param0": "value0"}', 'call_0', 'function0'); + parser.addChunk(1, '{"param1": "value1"}', 'call_1', 'function1'); + parser.addChunk(2, '{"param2": "value2"}', 'call_2', 'function2'); + + // New tool call should get assigned to index 3 + const result = parser.addChunk( + 0, + '{"param3": "value3"}', + 'call_3', + 'function3', + ); + expect(result.complete).toBe(true); + + const completed = parser.getCompletedToolCalls(); + expect(completed).toHaveLength(4); + + // Verify the new tool call got a different index + const call3 = completed.find((tc) => tc.id === 'call_3'); + expect(call3).toBeDefined(); + expect(call3?.index).toBe(3); + }); + + it('should reuse incomplete index when available', () => { + // Create an incomplete tool call at index 0 + parser.addChunk(0, '{"incomplete":', 'call_1', 'function1'); + + // New tool call with different ID should reuse the incomplete index + const result = parser.addChunk(0, ' "completed"}', 'call_2', 'function2'); + expect(result.complete).toBe(true); + + // Should have updated the metadata for the same index + expect(parser.getToolCallMeta(0)).toEqual({ + id: 'call_2', + name: 'function2', + }); + }); + }); + + describe('Repair functionality and flags', () => { + it('should test repair functionality in getCompletedToolCalls', () => { + // The repair functionality is primarily used in getCompletedToolCalls, not addChunk + parser.addChunk(0, '{"message": "unclosed string', 'call_1', 'function1'); + + // The addChunk should not complete because depth > 0 and inString = true + expect(parser.getState(0).depth).toBe(1); + expect(parser.getState(0).inString).toBe(true); + + // But getCompletedToolCalls should repair it + const completed = parser.getCompletedToolCalls(); + expect(completed).toHaveLength(1); + expect(completed[0].args).toEqual({ message: 'unclosed string' }); + }); + + it('should not set repaired flag for normal parsing', () => { + const result = parser.addChunk( + 0, + '{"message": "normal"}', + 'call_1', + 'function1', + ); + + expect(result.complete).toBe(true); + expect(result.repaired).toBeUndefined(); + expect(result.value).toEqual({ message: 'normal' }); + }); + + it('should not attempt repair when still in nested structure', () => { + const result = parser.addChunk( + 0, + '{"nested": {"unclosed": "string', + 'call_1', + 'function1', + ); + + // Should not attempt repair because depth > 0 + expect(result.complete).toBe(false); + expect(result.repaired).toBeUndefined(); + expect(parser.getState(0).depth).toBe(2); + }); + + it('should handle repair failure gracefully', () => { + // Create malformed JSON that can't be repaired at depth 0 + const result = parser.addChunk( + 0, + '{invalid: json}', + 'call_1', + 'function1', + ); + + expect(result.complete).toBe(false); + expect(result.error).toBeInstanceOf(Error); + expect(result.repaired).toBeUndefined(); + }); + }); + + describe('Complex collision scenarios', () => { + it('should handle rapid tool call switching at same index', () => { + // Rapid switching between different tool calls at index 0 + parser.addChunk(0, '{"step1":', 'call_1', 'function1'); + parser.addChunk(0, ' "done"}', 'call_1', 'function1'); + + // New tool call immediately at same index + parser.addChunk(0, '{"step2":', 'call_2', 'function2'); + parser.addChunk(0, ' "done"}', 'call_2', 'function2'); + + const completed = parser.getCompletedToolCalls(); + expect(completed).toHaveLength(2); + + const call1 = completed.find((tc) => tc.id === 'call_1'); + const call2 = completed.find((tc) => tc.id === 'call_2'); + expect(call1?.args).toEqual({ step1: 'done' }); + expect(call2?.args).toEqual({ step2: 'done' }); + }); + + it('should handle interleaved chunks from multiple tool calls with ID mapping', () => { + // Start tool call 1 at index 0 + parser.addChunk(0, '{"param1":', 'call_1', 'function1'); + + // Start tool call 2 at index 1 (different index to avoid collision) + parser.addChunk(1, '{"param2":', 'call_2', 'function2'); + + // Continue tool call 1 at its index + const result1 = parser.addChunk(0, ' "value1"}'); + expect(result1.complete).toBe(true); + + // Continue tool call 2 at its index + const result2 = parser.addChunk(1, ' "value2"}'); + expect(result2.complete).toBe(true); + + const completed = parser.getCompletedToolCalls(); + expect(completed).toHaveLength(2); + + const call1 = completed.find((tc) => tc.id === 'call_1'); + const call2 = completed.find((tc) => tc.id === 'call_2'); + expect(call1?.args).toEqual({ param1: 'value1' }); + expect(call2?.args).toEqual({ param2: 'value2' }); + }); + }); }); diff --git a/packages/core/src/core/openaiContentGenerator/streamingToolCallParser.ts b/packages/core/src/core/openaiContentGenerator/streamingToolCallParser.ts index f6b8eab5..31fe7528 100644 --- a/packages/core/src/core/openaiContentGenerator/streamingToolCallParser.ts +++ b/packages/core/src/core/openaiContentGenerator/streamingToolCallParser.ts @@ -21,20 +21,14 @@ export interface ToolCallParseResult { } /** - * Streaming Tool Call Parser Implementation + * StreamingToolCallParser - Handles streaming tool call objects with inconsistent chunk formats * - * This class implements a sophisticated streaming parser specifically designed for - * handling tool call arguments that arrive as partial JSON data in chunks during - * OpenAI streaming responses. It extends the principles from the streaming JSON parser - * to handle the specific requirements of tool call processing. - * - * Key Features: - * - Real-time depth tracking for nested JSON structures in tool arguments - * - Proper handling of string literals and escape sequences - * - Automatic repair of common JSON formatting issues - * - Support for multiple consecutive tool calls with same or different function names - * - Memory-efficient processing without storing complete JSON in memory - * - State management for individual tool call indices + * Problems this parser addresses: + * - Tool calls arrive with varying chunk shapes (empty strings, partial JSON, complete objects) + * - Tool calls may lack IDs, names, or have inconsistent indices + * - Multiple tool calls can be processed simultaneously with interleaved chunks + * - Index collisions occur when the same index is reused for different tool calls + * - JSON arguments are fragmented across multiple chunks and need reconstruction */ export class StreamingToolCallParser { /** Accumulated buffer containing all received chunks for each tool call index */ @@ -47,24 +41,26 @@ export class StreamingToolCallParser { private escapes: Map = new Map(); /** Metadata for each tool call index */ private toolCallMeta: Map = new Map(); + /** Map from tool call ID to actual index used for storage */ + private idToIndexMap: Map = new Map(); + /** Counter for generating new indices when collisions occur */ + private nextAvailableIndex: number = 0; /** * Processes a new chunk of tool call data and attempts to parse complete JSON objects * - * This method implements a state machine that tracks: - * 1. JSON structure depth (brackets and braces) per tool call index - * 2. String literal boundaries per tool call index - * 3. Escape sequences within strings per tool call index - * 4. Tool call metadata (id, function name) per tool call index + * Handles the core problems of streaming tool call parsing: + * - Resolves index collisions when the same index is reused for different tool calls + * - Routes chunks without IDs to the correct incomplete tool call + * - Tracks JSON parsing state (depth, string boundaries, escapes) per tool call + * - Attempts parsing only when JSON structure is complete (depth = 0) + * - Repairs common issues like unclosed strings * - * The parser only attempts to parse when the depth returns to 0, indicating - * a complete JSON structure has been received for that specific tool call index. - * - * @param index - The tool call index from OpenAI streaming response - * @param chunk - A string chunk containing partial JSON data for arguments - * @param id - Optional tool call ID - * @param name - Optional function name - * @returns ToolCallParseResult indicating whether parsing is complete and any parsed value + * @param index - Tool call index from streaming response (may collide with existing calls) + * @param chunk - String chunk that may be empty, partial JSON, or complete data + * @param id - Optional tool call ID for collision detection and chunk routing + * @param name - Optional function name stored as metadata + * @returns ToolCallParseResult with completion status, parsed value, and repair info */ addChunk( index: number, @@ -72,54 +68,92 @@ export class StreamingToolCallParser { id?: string, name?: string, ): ToolCallParseResult { - // Initialize state for this index if not exists - if (!this.buffers.has(index)) { - this.buffers.set(index, ''); - this.depths.set(index, 0); - this.inStrings.set(index, false); - this.escapes.set(index, false); - this.toolCallMeta.set(index, {}); - } + let actualIndex = index; - // Update metadata - const meta = this.toolCallMeta.get(index)!; - if (id) meta.id = id; - if (name) { - // If this is a new function name and we have existing arguments, - // it might be a new tool call with the same index - reset the buffer - if (meta.name && meta.name !== name && this.buffers.get(index)) { - const currentBuffer = this.buffers.get(index)!; - // Check if current buffer contains complete JSON - if (currentBuffer.trim()) { - try { - JSON.parse(currentBuffer); - // If we can parse it, this is likely a new tool call - reset state - this.resetIndex(index); - // Update metadata after reset - const resetMeta = this.toolCallMeta.get(index)!; - if (id) resetMeta.id = id; - resetMeta.name = name; - } catch { - // Current buffer is incomplete, continue accumulating - meta.name = name; - } - } else { - meta.name = name; - } + // Handle tool call ID mapping for collision detection + if (id) { + // This is the start of a new tool call with an ID + if (this.idToIndexMap.has(id)) { + // We've seen this ID before, use the existing mapped index + actualIndex = this.idToIndexMap.get(id)!; } else { - meta.name = name; + // New tool call ID + // Check if the requested index is already occupied by a different complete tool call + if (this.buffers.has(index)) { + const existingBuffer = this.buffers.get(index)!; + const existingDepth = this.depths.get(index)!; + const existingMeta = this.toolCallMeta.get(index); + + // Check if we have a complete tool call at this index + if ( + existingBuffer.trim() && + existingDepth === 0 && + existingMeta?.id && + existingMeta.id !== id + ) { + try { + JSON.parse(existingBuffer); + // We have a complete tool call with a different ID at this index + // Find a new index for this tool call + actualIndex = this.findNextAvailableIndex(); + } catch { + // Existing buffer is not complete JSON, we can reuse this index + } + } + } + + // Map this ID to the actual index we're using + this.idToIndexMap.set(id, actualIndex); + } + } else { + // No ID provided - this is a continuation chunk + // Try to find which tool call this belongs to based on the index + // Look for an existing tool call at this index that's not complete + if (this.buffers.has(index)) { + const existingBuffer = this.buffers.get(index)!; + const existingDepth = this.depths.get(index)!; + + // If there's an incomplete tool call at this index, continue with it + if (existingDepth > 0 || !existingBuffer.trim()) { + actualIndex = index; + } else { + // Check if the buffer at this index is complete + try { + JSON.parse(existingBuffer); + // Buffer is complete, this chunk might belong to a different tool call + // Find the most recent incomplete tool call + actualIndex = this.findMostRecentIncompleteIndex(); + } catch { + // Buffer is incomplete, continue with this index + actualIndex = index; + } + } } } - // Get current state for this index - const currentBuffer = this.buffers.get(index)!; - const currentDepth = this.depths.get(index)!; - const currentInString = this.inStrings.get(index)!; - const currentEscape = this.escapes.get(index)!; + // Initialize state for the actual index if not exists + if (!this.buffers.has(actualIndex)) { + this.buffers.set(actualIndex, ''); + this.depths.set(actualIndex, 0); + this.inStrings.set(actualIndex, false); + this.escapes.set(actualIndex, false); + this.toolCallMeta.set(actualIndex, {}); + } + + // Update metadata + const meta = this.toolCallMeta.get(actualIndex)!; + if (id) meta.id = id; + if (name) meta.name = name; + + // Get current state for the actual index + const currentBuffer = this.buffers.get(actualIndex)!; + const currentDepth = this.depths.get(actualIndex)!; + const currentInString = this.inStrings.get(actualIndex)!; + const currentEscape = this.escapes.get(actualIndex)!; // Add chunk to buffer const newBuffer = currentBuffer + chunk; - this.buffers.set(index, newBuffer); + this.buffers.set(actualIndex, newBuffer); // Track JSON structure depth - only count brackets/braces outside of strings let depth = currentDepth; @@ -141,9 +175,9 @@ export class StreamingToolCallParser { } // Update state - this.depths.set(index, depth); - this.inStrings.set(index, inString); - this.escapes.set(index, escape); + this.depths.set(actualIndex, depth); + this.inStrings.set(actualIndex, inString); + this.escapes.set(actualIndex, escape); // Attempt parse when we're back at root level (depth 0) and have data if (depth === 0 && newBuffer.trim().length > 0) { @@ -188,7 +222,14 @@ export class StreamingToolCallParser { /** * Gets all completed tool calls that are ready to be emitted - * This method should be called when the streaming is complete (finish_reason is present) + * + * Attempts to parse accumulated buffers using multiple strategies: + * 1. Standard JSON.parse() + * 2. Auto-close unclosed strings and retry + * 3. Fallback to safeJsonParse for malformed data + * + * Only returns tool calls with both name metadata and non-empty buffers. + * Should be called when streaming is complete (finish_reason is present). * * @returns Array of completed tool calls with their metadata and parsed arguments */ @@ -240,6 +281,79 @@ export class StreamingToolCallParser { return completed; } + /** + * Finds the next available index for a new tool call + * + * Scans indices starting from nextAvailableIndex to find one that's safe to use. + * Reuses indices with empty buffers or incomplete parsing states. + * Skips indices with complete, parseable tool call data to prevent overwriting. + * + * @returns The next available index safe for storing a new tool call + */ + private findNextAvailableIndex(): number { + while (this.buffers.has(this.nextAvailableIndex)) { + // Check if this index has a complete tool call + const buffer = this.buffers.get(this.nextAvailableIndex)!; + const depth = this.depths.get(this.nextAvailableIndex)!; + const meta = this.toolCallMeta.get(this.nextAvailableIndex); + + // If buffer is empty or incomplete (depth > 0), this index is available + if (!buffer.trim() || depth > 0 || !meta?.id) { + return this.nextAvailableIndex; + } + + // Try to parse the buffer to see if it's complete + try { + JSON.parse(buffer); + // If parsing succeeds and depth is 0, this index has a complete tool call + if (depth === 0) { + this.nextAvailableIndex++; + continue; + } + } catch { + // If parsing fails, this index is available for reuse + return this.nextAvailableIndex; + } + + this.nextAvailableIndex++; + } + return this.nextAvailableIndex++; + } + + /** + * Finds the most recent incomplete tool call index + * + * Used when continuation chunks arrive without IDs. Scans existing tool calls + * to find the highest index with incomplete parsing state (depth > 0, empty buffer, + * or unparseable JSON). Falls back to creating a new index if none found. + * + * @returns The index of the most recent incomplete tool call, or a new available index + */ + private findMostRecentIncompleteIndex(): number { + // Look for the highest index that has an incomplete tool call + let maxIndex = -1; + for (const [index, buffer] of this.buffers.entries()) { + const depth = this.depths.get(index)!; + const meta = this.toolCallMeta.get(index); + + // Check if this tool call is incomplete + if (meta?.id && (depth > 0 || !buffer.trim())) { + maxIndex = Math.max(maxIndex, index); + } else if (buffer.trim()) { + // Check if buffer is parseable (complete) + try { + JSON.parse(buffer); + // Buffer is complete, skip this index + } catch { + // Buffer is incomplete, this could be our target + maxIndex = Math.max(maxIndex, index); + } + } + } + + return maxIndex >= 0 ? maxIndex : this.findNextAvailableIndex(); + } + /** * Resets the parser state for a specific tool call index * @@ -256,8 +370,9 @@ export class StreamingToolCallParser { /** * Resets the entire parser state for processing a new stream * - * This method clears all internal state variables, allowing the parser - * to be reused for multiple streams without interference. + * Clears all accumulated buffers, parsing states, metadata, and counters. + * Allows the parser to be reused for multiple independent streams without + * data leakage between sessions. */ reset(): void { this.buffers.clear(); @@ -265,16 +380,15 @@ export class StreamingToolCallParser { this.inStrings.clear(); this.escapes.clear(); this.toolCallMeta.clear(); + this.idToIndexMap.clear(); + this.nextAvailableIndex = 0; } /** * Gets the current accumulated buffer content for a specific index * - * Useful for debugging or when you need to inspect the raw data - * that has been accumulated so far. - * - * @param index - The tool call index - * @returns The current buffer content for the specified index + * @param index - The tool call index to retrieve buffer for + * @returns The current buffer content for the specified index (empty string if not found) */ getBuffer(index: number): string { return this.buffers.get(index) || ''; @@ -283,8 +397,8 @@ export class StreamingToolCallParser { /** * Gets the current parsing state information for a specific index * - * @param index - The tool call index - * @returns Object containing current depth, string state, and escape state + * @param index - The tool call index to get state information for + * @returns Object containing current parsing state (depth, inString, escape) */ getState(index: number): { depth: number;