From c58106079e6c0725db80d21c0b96afba1ff0e729 Mon Sep 17 00:00:00 2001 From: tanzhenxin Date: Thu, 14 Aug 2025 16:39:26 +0800 Subject: [PATCH 1/2] feat: prevent concurrent query submissions in useGeminiStream hook --- .../cli/src/ui/hooks/useGeminiStream.test.tsx | 263 ++++++++++++++++++ packages/cli/src/ui/hooks/useGeminiStream.ts | 12 + 2 files changed, 275 insertions(+) diff --git a/packages/cli/src/ui/hooks/useGeminiStream.test.tsx b/packages/cli/src/ui/hooks/useGeminiStream.test.tsx index 5f89083a..bf0f4fc0 100644 --- a/packages/cli/src/ui/hooks/useGeminiStream.test.tsx +++ b/packages/cli/src/ui/hooks/useGeminiStream.test.tsx @@ -448,6 +448,7 @@ describe('useGeminiStream', () => { callId: 'call1', responseParts: [{ text: 'tool 1 response' }], error: undefined, + errorType: undefined, resultDisplay: 'Tool 1 success display', }, tool: { @@ -655,6 +656,7 @@ describe('useGeminiStream', () => { ], resultDisplay: undefined, error: undefined, + errorType: undefined, }, responseSubmittedToGemini: false, }; @@ -679,6 +681,7 @@ describe('useGeminiStream', () => { ], resultDisplay: undefined, error: undefined, + errorType: undefined, }, responseSubmittedToGemini: false, }; @@ -775,6 +778,7 @@ describe('useGeminiStream', () => { callId: 'call1', responseParts: toolCallResponseParts, error: undefined, + errorType: undefined, resultDisplay: 'Tool 1 success display', }, endTime: Date.now(), @@ -1128,6 +1132,7 @@ describe('useGeminiStream', () => { responseParts: [{ text: 'Memory saved' }], resultDisplay: 'Success: Memory saved', error: undefined, + errorType: undefined, }, tool: { name: 'save_memory', @@ -1649,4 +1654,262 @@ describe('useGeminiStream', () => { ); }); }); + + describe('Concurrent Execution Prevention', () => { + it('should prevent concurrent submitQuery calls', async () => { + let resolveFirstCall!: () => void; + let resolveSecondCall!: () => void; + + const firstCallPromise = new Promise((resolve) => { + resolveFirstCall = resolve; + }); + + const secondCallPromise = new Promise((resolve) => { + resolveSecondCall = resolve; + }); + + // Mock a long-running stream for the first call + const firstStream = (async function* () { + yield { type: ServerGeminiEventType.Content, value: 'First call content' }; + await firstCallPromise; // Wait until we manually resolve + yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; + })(); + + // Mock a stream for the second call (should not be used) + const secondStream = (async function* () { + yield { type: ServerGeminiEventType.Content, value: 'Second call content' }; + await secondCallPromise; + yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; + })(); + + let callCount = 0; + mockSendMessageStream.mockImplementation(() => { + callCount++; + if (callCount === 1) { + return firstStream; + } else { + return secondStream; + } + }); + + const { result } = renderTestHook(); + + // Start first call + const firstCallResult = act(async () => { + await result.current.submitQuery('First query'); + }); + + // Wait a bit to ensure first call has started + await new Promise(resolve => setTimeout(resolve, 10)); + + // Try to start second call while first is still running + const secondCallResult = act(async () => { + await result.current.submitQuery('Second query'); + }); + + // Resolve both calls + resolveFirstCall(); + resolveSecondCall(); + + await Promise.all([firstCallResult, secondCallResult]); + + // Verify only one call was made to sendMessageStream + expect(mockSendMessageStream).toHaveBeenCalledTimes(1); + expect(mockSendMessageStream).toHaveBeenCalledWith( + 'First query', + expect.any(AbortSignal), + expect.any(String), + ); + + // Verify only the first query was added to history + const userMessages = mockAddItem.mock.calls.filter( + call => call[0].type === MessageType.USER + ); + expect(userMessages).toHaveLength(1); + expect(userMessages[0][0].text).toBe('First query'); + }); + + it('should allow subsequent calls after first call completes', async () => { + // Mock streams that complete immediately + mockSendMessageStream + .mockReturnValueOnce((async function* () { + yield { type: ServerGeminiEventType.Content, value: 'First response' }; + yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; + })()) + .mockReturnValueOnce((async function* () { + yield { type: ServerGeminiEventType.Content, value: 'Second response' }; + yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; + })()); + + const { result } = renderTestHook(); + + // First call + await act(async () => { + await result.current.submitQuery('First query'); + }); + + // Second call after first completes + await act(async () => { + await result.current.submitQuery('Second query'); + }); + + // Both calls should have been made + expect(mockSendMessageStream).toHaveBeenCalledTimes(2); + expect(mockSendMessageStream).toHaveBeenNthCalledWith(1, + 'First query', + expect.any(AbortSignal), + expect.any(String), + ); + expect(mockSendMessageStream).toHaveBeenNthCalledWith(2, + 'Second query', + expect.any(AbortSignal), + expect.any(String), + ); + }); + + it('should reset execution flag even when query preparation fails', async () => { + const { result } = renderTestHook(); + + // First call with empty query (should fail in preparation) + await act(async () => { + await result.current.submitQuery(' '); // Empty trimmed query + }); + + // Second call should work normally + mockSendMessageStream.mockReturnValue((async function* () { + yield { type: ServerGeminiEventType.Content, value: 'Valid response' }; + yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; + })()); + + await act(async () => { + await result.current.submitQuery('Valid query'); + }); + + // The second call should have been made + expect(mockSendMessageStream).toHaveBeenCalledTimes(1); + expect(mockSendMessageStream).toHaveBeenCalledWith( + 'Valid query', + expect.any(AbortSignal), + expect.any(String), + ); + }); + + it('should reset execution flag when user cancels', async () => { + let resolveCancelledStream!: () => void; + const cancelledStreamPromise = new Promise((resolve) => { + resolveCancelledStream = resolve; + }); + + // Mock a stream that can be cancelled + const cancelledStream = (async function* () { + yield { type: ServerGeminiEventType.Content, value: 'Cancelled content' }; + await cancelledStreamPromise; + yield { type: ServerGeminiEventType.UserCancelled }; + })(); + + mockSendMessageStream.mockReturnValueOnce(cancelledStream); + + const { result } = renderTestHook(); + + // Start first call + const firstCallResult = act(async () => { + await result.current.submitQuery('First query'); + }); + + // Wait a bit then resolve to trigger cancellation + await new Promise(resolve => setTimeout(resolve, 10)); + resolveCancelledStream(); + await firstCallResult; + + // Now try a second call - should work + mockSendMessageStream.mockReturnValue((async function* () { + yield { type: ServerGeminiEventType.Content, value: 'Second response' }; + yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; + })()); + + await act(async () => { + await result.current.submitQuery('Second query'); + }); + + // Both calls should have been made + expect(mockSendMessageStream).toHaveBeenCalledTimes(2); + }); + + it('should reset execution flag when an error occurs', async () => { + // Mock a stream that throws an error + mockSendMessageStream.mockReturnValueOnce((async function* () { + yield { type: ServerGeminiEventType.Content, value: 'Error content' }; + throw new Error('Stream error'); + })()); + + const { result } = renderTestHook(); + + // First call that will error + await act(async () => { + await result.current.submitQuery('Error query'); + }); + + // Second call should work normally + mockSendMessageStream.mockReturnValue((async function* () { + yield { type: ServerGeminiEventType.Content, value: 'Success response' }; + yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; + })()); + + await act(async () => { + await result.current.submitQuery('Success query'); + }); + + // Both calls should have been attempted + expect(mockSendMessageStream).toHaveBeenCalledTimes(2); + }); + + it('should handle rapid multiple concurrent calls correctly', async () => { + let resolveStream!: () => void; + const streamPromise = new Promise((resolve) => { + resolveStream = resolve; + }); + + // Mock a long-running stream + const longStream = (async function* () { + yield { type: ServerGeminiEventType.Content, value: 'Long running content' }; + await streamPromise; + yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; + })(); + + mockSendMessageStream.mockReturnValue(longStream); + + const { result } = renderTestHook(); + + // Start multiple concurrent calls + const calls = [ + act(async () => { await result.current.submitQuery('Query 1'); }), + act(async () => { await result.current.submitQuery('Query 2'); }), + act(async () => { await result.current.submitQuery('Query 3'); }), + act(async () => { await result.current.submitQuery('Query 4'); }), + act(async () => { await result.current.submitQuery('Query 5'); }), + ]; + + // Wait a bit then resolve the stream + await new Promise(resolve => setTimeout(resolve, 10)); + resolveStream(); + + // Wait for all calls to complete + await Promise.all(calls); + + // Only the first call should have been made + expect(mockSendMessageStream).toHaveBeenCalledTimes(1); + expect(mockSendMessageStream).toHaveBeenCalledWith( + 'Query 1', + expect.any(AbortSignal), + expect.any(String), + ); + + // Only one user message should have been added + const userMessages = mockAddItem.mock.calls.filter( + call => call[0].type === MessageType.USER + ); + expect(userMessages).toHaveLength(1); + expect(userMessages[0][0].text).toBe('Query 1'); + }); + }); }); diff --git a/packages/cli/src/ui/hooks/useGeminiStream.ts b/packages/cli/src/ui/hooks/useGeminiStream.ts index 85614d3b..43fccdb6 100644 --- a/packages/cli/src/ui/hooks/useGeminiStream.ts +++ b/packages/cli/src/ui/hooks/useGeminiStream.ts @@ -97,6 +97,7 @@ export const useGeminiStream = ( const [initError, setInitError] = useState(null); const abortControllerRef = useRef(null); const turnCancelledRef = useRef(false); + const isSubmittingQueryRef = useRef(false); const [isResponding, setIsResponding] = useState(false); const [thought, setThought] = useState(null); const [pendingHistoryItemRef, setPendingHistoryItem] = @@ -622,6 +623,11 @@ export const useGeminiStream = ( options?: { isContinuation: boolean }, prompt_id?: string, ) => { + // Prevent concurrent executions of submitQuery + if (isSubmittingQueryRef.current) { + return; + } + if ( (streamingState === StreamingState.Responding || streamingState === StreamingState.WaitingForConfirmation) && @@ -629,6 +635,9 @@ export const useGeminiStream = ( ) return; + // Set the flag to indicate we're now executing + isSubmittingQueryRef.current = true; + const userMessageTimestamp = Date.now(); // Reset quota error flag when starting a new query (not a continuation) @@ -653,6 +662,7 @@ export const useGeminiStream = ( ); if (!shouldProceed || queryToSend === null) { + isSubmittingQueryRef.current = false; return; } @@ -677,6 +687,7 @@ export const useGeminiStream = ( ); if (processingStatus === StreamProcessingStatus.UserCancelled) { + isSubmittingQueryRef.current = false; return; } @@ -708,6 +719,7 @@ export const useGeminiStream = ( } } finally { setIsResponding(false); + isSubmittingQueryRef.current = false; } }, [ From 0265b67b9011355eb42085262f13ed27ca2ca5be Mon Sep 17 00:00:00 2001 From: tanzhenxin Date: Thu, 14 Aug 2025 16:48:51 +0800 Subject: [PATCH 2/2] chore: npm run format & lint --- .../cli/src/ui/hooks/useGeminiStream.test.tsx | 135 ++++++++++++------ 1 file changed, 93 insertions(+), 42 deletions(-) diff --git a/packages/cli/src/ui/hooks/useGeminiStream.test.tsx b/packages/cli/src/ui/hooks/useGeminiStream.test.tsx index bf0f4fc0..f4543666 100644 --- a/packages/cli/src/ui/hooks/useGeminiStream.test.tsx +++ b/packages/cli/src/ui/hooks/useGeminiStream.test.tsx @@ -1659,25 +1659,31 @@ describe('useGeminiStream', () => { it('should prevent concurrent submitQuery calls', async () => { let resolveFirstCall!: () => void; let resolveSecondCall!: () => void; - + const firstCallPromise = new Promise((resolve) => { resolveFirstCall = resolve; }); - + const secondCallPromise = new Promise((resolve) => { resolveSecondCall = resolve; }); // Mock a long-running stream for the first call const firstStream = (async function* () { - yield { type: ServerGeminiEventType.Content, value: 'First call content' }; + yield { + type: ServerGeminiEventType.Content, + value: 'First call content', + }; await firstCallPromise; // Wait until we manually resolve yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; })(); // Mock a stream for the second call (should not be used) const secondStream = (async function* () { - yield { type: ServerGeminiEventType.Content, value: 'Second call content' }; + yield { + type: ServerGeminiEventType.Content, + value: 'Second call content', + }; await secondCallPromise; yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; })(); @@ -1700,7 +1706,7 @@ describe('useGeminiStream', () => { }); // Wait a bit to ensure first call has started - await new Promise(resolve => setTimeout(resolve, 10)); + await new Promise((resolve) => setTimeout(resolve, 10)); // Try to start second call while first is still running const secondCallResult = act(async () => { @@ -1723,7 +1729,7 @@ describe('useGeminiStream', () => { // Verify only the first query was added to history const userMessages = mockAddItem.mock.calls.filter( - call => call[0].type === MessageType.USER + (call) => call[0].type === MessageType.USER, ); expect(userMessages).toHaveLength(1); expect(userMessages[0][0].text).toBe('First query'); @@ -1732,14 +1738,24 @@ describe('useGeminiStream', () => { it('should allow subsequent calls after first call completes', async () => { // Mock streams that complete immediately mockSendMessageStream - .mockReturnValueOnce((async function* () { - yield { type: ServerGeminiEventType.Content, value: 'First response' }; - yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; - })()) - .mockReturnValueOnce((async function* () { - yield { type: ServerGeminiEventType.Content, value: 'Second response' }; - yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; - })()); + .mockReturnValueOnce( + (async function* () { + yield { + type: ServerGeminiEventType.Content, + value: 'First response', + }; + yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; + })(), + ) + .mockReturnValueOnce( + (async function* () { + yield { + type: ServerGeminiEventType.Content, + value: 'Second response', + }; + yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; + })(), + ); const { result } = renderTestHook(); @@ -1755,12 +1771,14 @@ describe('useGeminiStream', () => { // Both calls should have been made expect(mockSendMessageStream).toHaveBeenCalledTimes(2); - expect(mockSendMessageStream).toHaveBeenNthCalledWith(1, + expect(mockSendMessageStream).toHaveBeenNthCalledWith( + 1, 'First query', expect.any(AbortSignal), expect.any(String), ); - expect(mockSendMessageStream).toHaveBeenNthCalledWith(2, + expect(mockSendMessageStream).toHaveBeenNthCalledWith( + 2, 'Second query', expect.any(AbortSignal), expect.any(String), @@ -1776,10 +1794,15 @@ describe('useGeminiStream', () => { }); // Second call should work normally - mockSendMessageStream.mockReturnValue((async function* () { - yield { type: ServerGeminiEventType.Content, value: 'Valid response' }; - yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; - })()); + mockSendMessageStream.mockReturnValue( + (async function* () { + yield { + type: ServerGeminiEventType.Content, + value: 'Valid response', + }; + yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; + })(), + ); await act(async () => { await result.current.submitQuery('Valid query'); @@ -1802,7 +1825,10 @@ describe('useGeminiStream', () => { // Mock a stream that can be cancelled const cancelledStream = (async function* () { - yield { type: ServerGeminiEventType.Content, value: 'Cancelled content' }; + yield { + type: ServerGeminiEventType.Content, + value: 'Cancelled content', + }; await cancelledStreamPromise; yield { type: ServerGeminiEventType.UserCancelled }; })(); @@ -1817,15 +1843,20 @@ describe('useGeminiStream', () => { }); // Wait a bit then resolve to trigger cancellation - await new Promise(resolve => setTimeout(resolve, 10)); + await new Promise((resolve) => setTimeout(resolve, 10)); resolveCancelledStream(); await firstCallResult; // Now try a second call - should work - mockSendMessageStream.mockReturnValue((async function* () { - yield { type: ServerGeminiEventType.Content, value: 'Second response' }; - yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; - })()); + mockSendMessageStream.mockReturnValue( + (async function* () { + yield { + type: ServerGeminiEventType.Content, + value: 'Second response', + }; + yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; + })(), + ); await act(async () => { await result.current.submitQuery('Second query'); @@ -1837,10 +1868,12 @@ describe('useGeminiStream', () => { it('should reset execution flag when an error occurs', async () => { // Mock a stream that throws an error - mockSendMessageStream.mockReturnValueOnce((async function* () { - yield { type: ServerGeminiEventType.Content, value: 'Error content' }; - throw new Error('Stream error'); - })()); + mockSendMessageStream.mockReturnValueOnce( + (async function* () { + yield { type: ServerGeminiEventType.Content, value: 'Error content' }; + throw new Error('Stream error'); + })(), + ); const { result } = renderTestHook(); @@ -1850,10 +1883,15 @@ describe('useGeminiStream', () => { }); // Second call should work normally - mockSendMessageStream.mockReturnValue((async function* () { - yield { type: ServerGeminiEventType.Content, value: 'Success response' }; - yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; - })()); + mockSendMessageStream.mockReturnValue( + (async function* () { + yield { + type: ServerGeminiEventType.Content, + value: 'Success response', + }; + yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; + })(), + ); await act(async () => { await result.current.submitQuery('Success query'); @@ -1871,7 +1909,10 @@ describe('useGeminiStream', () => { // Mock a long-running stream const longStream = (async function* () { - yield { type: ServerGeminiEventType.Content, value: 'Long running content' }; + yield { + type: ServerGeminiEventType.Content, + value: 'Long running content', + }; await streamPromise; yield { type: ServerGeminiEventType.Finished, value: 'STOP' }; })(); @@ -1882,15 +1923,25 @@ describe('useGeminiStream', () => { // Start multiple concurrent calls const calls = [ - act(async () => { await result.current.submitQuery('Query 1'); }), - act(async () => { await result.current.submitQuery('Query 2'); }), - act(async () => { await result.current.submitQuery('Query 3'); }), - act(async () => { await result.current.submitQuery('Query 4'); }), - act(async () => { await result.current.submitQuery('Query 5'); }), + act(async () => { + await result.current.submitQuery('Query 1'); + }), + act(async () => { + await result.current.submitQuery('Query 2'); + }), + act(async () => { + await result.current.submitQuery('Query 3'); + }), + act(async () => { + await result.current.submitQuery('Query 4'); + }), + act(async () => { + await result.current.submitQuery('Query 5'); + }), ]; // Wait a bit then resolve the stream - await new Promise(resolve => setTimeout(resolve, 10)); + await new Promise((resolve) => setTimeout(resolve, 10)); resolveStream(); // Wait for all calls to complete @@ -1906,7 +1957,7 @@ describe('useGeminiStream', () => { // Only one user message should have been added const userMessages = mockAddItem.mock.calls.filter( - call => call[0].type === MessageType.USER + (call) => call[0].type === MessageType.USER, ); expect(userMessages).toHaveLength(1); expect(userMessages[0][0].text).toBe('Query 1');