mirror of
https://github.com/QwenLM/qwen-code.git
synced 2025-12-19 09:33:53 +00:00
chore(usage telemetry): Freshen up Clearcut logging (#6013)
Co-authored-by: christine betts <chrstn@uw.edu> Co-authored-by: Jacob Richman <jacob314@gmail.com> Co-authored-by: matt korwel <matt.korwel@gmail.com>
This commit is contained in:
@@ -13,6 +13,25 @@ import {
|
||||
ConfigParameters,
|
||||
ContentGeneratorConfig,
|
||||
} from '@google/gemini-cli-core';
|
||||
import { http, HttpResponse } from 'msw';
|
||||
import { setupServer } from 'msw/node';
|
||||
|
||||
export const server = setupServer();
|
||||
|
||||
// TODO(richieforeman): Consider moving this to test setup globally.
|
||||
beforeAll(() => {
|
||||
server.listen({});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
server.resetHandlers();
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
server.close();
|
||||
});
|
||||
|
||||
const CLEARCUT_URL = 'https://play.googleapis.com/log';
|
||||
|
||||
const TEST_CONTENT_GENERATOR_CONFIG: ContentGeneratorConfig = {
|
||||
apiKey: 'test-key',
|
||||
@@ -37,6 +56,8 @@ describe('Configuration Integration Tests', () => {
|
||||
let originalEnv: NodeJS.ProcessEnv;
|
||||
|
||||
beforeEach(() => {
|
||||
server.resetHandlers(http.post(CLEARCUT_URL, () => HttpResponse.text()));
|
||||
|
||||
tempDir = fs.mkdtempSync(path.join(tmpdir(), 'gemini-cli-test-'));
|
||||
originalEnv = { ...process.env };
|
||||
process.env.GEMINI_API_KEY = 'test-api-key';
|
||||
|
||||
@@ -4,7 +4,8 @@
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { describe, it, expect, vi, beforeEach, Mock } from 'vitest';
|
||||
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||
import { Mock } from 'vitest';
|
||||
import { Config, ConfigParameters, SandboxConfig } from './config.js';
|
||||
import * as path from 'path';
|
||||
import { setGeminiMdFilename as mockSetGeminiMdFilename } from '../tools/memoryTool.js';
|
||||
@@ -18,6 +19,7 @@ import {
|
||||
} from '../core/contentGenerator.js';
|
||||
import { GeminiClient } from '../core/client.js';
|
||||
import { GitService } from '../services/gitService.js';
|
||||
import { ClearcutLogger } from '../telemetry/clearcut-logger/clearcut-logger.js';
|
||||
|
||||
vi.mock('fs', async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import('fs')>();
|
||||
@@ -119,11 +121,16 @@ describe('Server Config (config.ts)', () => {
|
||||
telemetry: TELEMETRY_SETTINGS,
|
||||
sessionId: SESSION_ID,
|
||||
model: MODEL,
|
||||
usageStatisticsEnabled: false,
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
// Reset mocks if necessary
|
||||
vi.clearAllMocks();
|
||||
vi.spyOn(
|
||||
ClearcutLogger.prototype,
|
||||
'logStartSessionEvent',
|
||||
).mockImplementation(() => undefined);
|
||||
});
|
||||
|
||||
describe('initialize', () => {
|
||||
@@ -372,6 +379,39 @@ describe('Server Config (config.ts)', () => {
|
||||
expect(fileService).toBeDefined();
|
||||
});
|
||||
|
||||
describe('Usage Statistics', () => {
|
||||
it('defaults usage statistics to enabled if not specified', () => {
|
||||
const config = new Config({
|
||||
...baseParams,
|
||||
usageStatisticsEnabled: undefined,
|
||||
});
|
||||
|
||||
expect(config.getUsageStatisticsEnabled()).toBe(true);
|
||||
});
|
||||
|
||||
it.each([{ enabled: true }, { enabled: false }])(
|
||||
'sets usage statistics based on the provided value (enabled: $enabled)',
|
||||
({ enabled }) => {
|
||||
const config = new Config({
|
||||
...baseParams,
|
||||
usageStatisticsEnabled: enabled,
|
||||
});
|
||||
expect(config.getUsageStatisticsEnabled()).toBe(enabled);
|
||||
},
|
||||
);
|
||||
|
||||
it('logs the session start event', () => {
|
||||
new Config({
|
||||
...baseParams,
|
||||
usageStatisticsEnabled: true,
|
||||
});
|
||||
|
||||
expect(
|
||||
ClearcutLogger.prototype.logStartSessionEvent,
|
||||
).toHaveBeenCalledOnce();
|
||||
});
|
||||
});
|
||||
|
||||
describe('Telemetry Settings', () => {
|
||||
it('should return default telemetry target if not provided', () => {
|
||||
const params: ConfigParameters = {
|
||||
|
||||
9
packages/core/src/mocks/msw.ts
Normal file
9
packages/core/src/mocks/msw.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
/**
|
||||
* @license
|
||||
* Copyright 2025 Google LLC
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { setupServer } from 'msw/node';
|
||||
|
||||
export const server = setupServer();
|
||||
@@ -4,33 +4,49 @@
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { vi, describe, it, expect, beforeEach, afterEach } from 'vitest';
|
||||
import * as https from 'https';
|
||||
import { ClientRequest, IncomingMessage } from 'http';
|
||||
import { Readable, Writable } from 'stream';
|
||||
|
||||
import {
|
||||
ClearcutLogger,
|
||||
LogResponse,
|
||||
LogEventEntry,
|
||||
} from './clearcut-logger.js';
|
||||
import { Config } from '../../config/config.js';
|
||||
vi,
|
||||
describe,
|
||||
it,
|
||||
expect,
|
||||
afterEach,
|
||||
beforeAll,
|
||||
afterAll,
|
||||
} from 'vitest';
|
||||
|
||||
import { ClearcutLogger, LogEventEntry, TEST_ONLY } from './clearcut-logger.js';
|
||||
import { ConfigParameters } from '../../config/config.js';
|
||||
import * as userAccount from '../../utils/user_account.js';
|
||||
import * as userId from '../../utils/user_id.js';
|
||||
import { EventMetadataKey } from './event-metadata-key.js';
|
||||
import { makeFakeConfig } from '../../test-utils/config.js';
|
||||
import { http, HttpResponse } from 'msw';
|
||||
import { server } from '../../mocks/msw.js';
|
||||
|
||||
// Mock dependencies
|
||||
vi.mock('https-proxy-agent');
|
||||
vi.mock('https');
|
||||
vi.mock('../../utils/user_account');
|
||||
vi.mock('../../utils/user_id');
|
||||
|
||||
const mockHttps = vi.mocked(https);
|
||||
const mockUserAccount = vi.mocked(userAccount);
|
||||
const mockUserId = vi.mocked(userId);
|
||||
|
||||
// TODO(richieforeman): Consider moving this to test setup globally.
|
||||
beforeAll(() => {
|
||||
server.listen({});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
server.resetHandlers();
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
server.close();
|
||||
});
|
||||
|
||||
describe('ClearcutLogger', () => {
|
||||
let mockConfig: Config;
|
||||
let logger: ClearcutLogger | undefined;
|
||||
const NEXT_WAIT_MS = 1234;
|
||||
const CLEARCUT_URL = 'https://play.googleapis.com/log';
|
||||
const MOCK_DATE = new Date('2025-01-02T00:00:00.000Z');
|
||||
const EXAMPLE_RESPONSE = `["${NEXT_WAIT_MS}",null,[[["ANDROID_BACKUP",0],["BATTERY_STATS",0],["SMART_SETUP",0],["TRON",0]],-3334737594024971225],[]]`;
|
||||
|
||||
// A helper to get the internal events array for testing
|
||||
const getEvents = (l: ClearcutLogger): LogEventEntry[][] =>
|
||||
@@ -38,32 +54,37 @@ describe('ClearcutLogger', () => {
|
||||
|
||||
const getEventsSize = (l: ClearcutLogger): number => l['events'].size;
|
||||
|
||||
const getMaxEvents = (l: ClearcutLogger): number => l['max_events'];
|
||||
|
||||
const getMaxRetryEvents = (l: ClearcutLogger): number =>
|
||||
l['max_retry_events'];
|
||||
|
||||
const requeueFailedEvents = (l: ClearcutLogger, events: LogEventEntry[][]) =>
|
||||
l['requeueFailedEvents'](events);
|
||||
|
||||
beforeEach(() => {
|
||||
function setup({
|
||||
config = {} as Partial<ConfigParameters>,
|
||||
lifetimeGoogleAccounts = 1,
|
||||
cachedGoogleAccount = 'test@google.com',
|
||||
installationId = 'test-installation-id',
|
||||
} = {}) {
|
||||
server.resetHandlers(
|
||||
http.post(CLEARCUT_URL, () => HttpResponse.text(EXAMPLE_RESPONSE)),
|
||||
);
|
||||
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date());
|
||||
vi.setSystemTime(MOCK_DATE);
|
||||
|
||||
mockConfig = {
|
||||
getUsageStatisticsEnabled: vi.fn().mockReturnValue(true),
|
||||
getDebugMode: vi.fn().mockReturnValue(false),
|
||||
getSessionId: vi.fn().mockReturnValue('test-session-id'),
|
||||
getProxy: vi.fn().mockReturnValue(undefined),
|
||||
} as unknown as Config;
|
||||
const loggerConfig = makeFakeConfig({
|
||||
...config,
|
||||
});
|
||||
ClearcutLogger.clearInstance();
|
||||
|
||||
mockUserAccount.getCachedGoogleAccount.mockReturnValue('test@google.com');
|
||||
mockUserAccount.getLifetimeGoogleAccounts.mockReturnValue(1);
|
||||
mockUserId.getInstallationId.mockReturnValue('test-installation-id');
|
||||
mockUserAccount.getCachedGoogleAccount.mockReturnValue(cachedGoogleAccount);
|
||||
mockUserAccount.getLifetimeGoogleAccounts.mockReturnValue(
|
||||
lifetimeGoogleAccounts,
|
||||
);
|
||||
mockUserId.getInstallationId.mockReturnValue(installationId);
|
||||
|
||||
logger = ClearcutLogger.getInstance(mockConfig);
|
||||
expect(logger).toBeDefined();
|
||||
});
|
||||
const logger = ClearcutLogger.getInstance(loggerConfig);
|
||||
|
||||
return { logger, loggerConfig };
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
ClearcutLogger.clearInstance();
|
||||
@@ -71,109 +92,131 @@ describe('ClearcutLogger', () => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it('should not return an instance if usage statistics are disabled', () => {
|
||||
ClearcutLogger.clearInstance();
|
||||
vi.spyOn(mockConfig, 'getUsageStatisticsEnabled').mockReturnValue(false);
|
||||
const disabledLogger = ClearcutLogger.getInstance(mockConfig);
|
||||
expect(disabledLogger).toBeUndefined();
|
||||
describe('getInstance', () => {
|
||||
it.each([
|
||||
{ usageStatisticsEnabled: false, expectedValue: undefined },
|
||||
{
|
||||
usageStatisticsEnabled: true,
|
||||
expectedValue: expect.any(ClearcutLogger),
|
||||
},
|
||||
])(
|
||||
'returns an instance if usage statistics are enabled',
|
||||
({ usageStatisticsEnabled, expectedValue }) => {
|
||||
ClearcutLogger.clearInstance();
|
||||
const { logger } = setup({
|
||||
config: {
|
||||
usageStatisticsEnabled,
|
||||
},
|
||||
});
|
||||
expect(logger).toEqual(expectedValue);
|
||||
},
|
||||
);
|
||||
|
||||
it('is a singleton', () => {
|
||||
ClearcutLogger.clearInstance();
|
||||
const { loggerConfig } = setup();
|
||||
const logger1 = ClearcutLogger.getInstance(loggerConfig);
|
||||
const logger2 = ClearcutLogger.getInstance(loggerConfig);
|
||||
expect(logger1).toBe(logger2);
|
||||
});
|
||||
});
|
||||
|
||||
describe('createLogEvent', () => {
|
||||
it('logs the total number of google accounts', () => {
|
||||
const { logger } = setup({
|
||||
lifetimeGoogleAccounts: 9001,
|
||||
});
|
||||
|
||||
const event = logger?.createLogEvent('abc', []);
|
||||
|
||||
expect(event?.event_metadata[0][0]).toEqual({
|
||||
gemini_cli_key: EventMetadataKey.GEMINI_CLI_GOOGLE_ACCOUNTS_COUNT,
|
||||
value: '9001',
|
||||
});
|
||||
});
|
||||
|
||||
it('logs the current surface', () => {
|
||||
const { logger } = setup({});
|
||||
|
||||
const event = logger?.createLogEvent('abc', []);
|
||||
|
||||
expect(event?.event_metadata[0][1]).toEqual({
|
||||
gemini_cli_key: EventMetadataKey.GEMINI_CLI_SURFACE,
|
||||
value: 'SURFACE_NOT_SET',
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('enqueueLogEvent', () => {
|
||||
it('should add events to the queue', () => {
|
||||
const { logger } = setup();
|
||||
logger!.enqueueLogEvent({ test: 'event1' });
|
||||
expect(getEventsSize(logger!)).toBe(1);
|
||||
});
|
||||
|
||||
it('should evict the oldest event when the queue is full', () => {
|
||||
const maxEvents = getMaxEvents(logger!);
|
||||
const { logger } = setup();
|
||||
|
||||
for (let i = 0; i < maxEvents; i++) {
|
||||
for (let i = 0; i < TEST_ONLY.MAX_EVENTS; i++) {
|
||||
logger!.enqueueLogEvent({ event_id: i });
|
||||
}
|
||||
|
||||
expect(getEventsSize(logger!)).toBe(maxEvents);
|
||||
expect(getEventsSize(logger!)).toBe(TEST_ONLY.MAX_EVENTS);
|
||||
const firstEvent = JSON.parse(
|
||||
getEvents(logger!)[0][0].source_extension_json,
|
||||
);
|
||||
expect(firstEvent.event_id).toBe(0);
|
||||
|
||||
// This should push out the first event
|
||||
logger!.enqueueLogEvent({ event_id: maxEvents });
|
||||
logger!.enqueueLogEvent({ event_id: TEST_ONLY.MAX_EVENTS });
|
||||
|
||||
expect(getEventsSize(logger!)).toBe(maxEvents);
|
||||
expect(getEventsSize(logger!)).toBe(TEST_ONLY.MAX_EVENTS);
|
||||
const newFirstEvent = JSON.parse(
|
||||
getEvents(logger!)[0][0].source_extension_json,
|
||||
);
|
||||
expect(newFirstEvent.event_id).toBe(1);
|
||||
const lastEvent = JSON.parse(
|
||||
getEvents(logger!)[maxEvents - 1][0].source_extension_json,
|
||||
getEvents(logger!)[TEST_ONLY.MAX_EVENTS - 1][0].source_extension_json,
|
||||
);
|
||||
expect(lastEvent.event_id).toBe(maxEvents);
|
||||
expect(lastEvent.event_id).toBe(TEST_ONLY.MAX_EVENTS);
|
||||
});
|
||||
});
|
||||
|
||||
describe('flushToClearcut', () => {
|
||||
let mockRequest: Writable;
|
||||
let mockResponse: Readable & Partial<IncomingMessage>;
|
||||
|
||||
beforeEach(() => {
|
||||
mockRequest = new Writable({
|
||||
write(chunk, encoding, callback) {
|
||||
callback();
|
||||
it('allows for usage with a configured proxy agent', async () => {
|
||||
const { logger } = setup({
|
||||
config: {
|
||||
proxy: 'http://mycoolproxy.whatever.com:3128',
|
||||
},
|
||||
});
|
||||
vi.spyOn(mockRequest, 'on');
|
||||
vi.spyOn(mockRequest, 'end').mockReturnThis();
|
||||
vi.spyOn(mockRequest, 'destroy').mockReturnThis();
|
||||
|
||||
mockResponse = new Readable({ read() {} }) as Readable &
|
||||
Partial<IncomingMessage>;
|
||||
logger!.enqueueLogEvent({ event_id: 1 });
|
||||
|
||||
mockHttps.request.mockImplementation(
|
||||
(
|
||||
_options: string | https.RequestOptions | URL,
|
||||
...args: unknown[]
|
||||
): ClientRequest => {
|
||||
const callback = args.find((arg) => typeof arg === 'function') as
|
||||
| ((res: IncomingMessage) => void)
|
||||
| undefined;
|
||||
const response = await logger!.flushToClearcut();
|
||||
|
||||
if (callback) {
|
||||
callback(mockResponse as IncomingMessage);
|
||||
}
|
||||
return mockRequest as ClientRequest;
|
||||
},
|
||||
);
|
||||
expect(response.nextRequestWaitMs).toBe(NEXT_WAIT_MS);
|
||||
});
|
||||
|
||||
it('should clear events on successful flush', async () => {
|
||||
mockResponse.statusCode = 200;
|
||||
const mockResponseBody = { nextRequestWaitMs: 1000 };
|
||||
// Encoded protobuf for {nextRequestWaitMs: 1000} which is `08 E8 07`
|
||||
const encodedResponse = Buffer.from([8, 232, 7]);
|
||||
const { logger } = setup();
|
||||
|
||||
logger!.enqueueLogEvent({ event_id: 1 });
|
||||
const flushPromise = logger!.flushToClearcut();
|
||||
const response = await logger!.flushToClearcut();
|
||||
|
||||
mockResponse.push(encodedResponse);
|
||||
mockResponse.push(null); // End the stream
|
||||
|
||||
const response: LogResponse = await flushPromise;
|
||||
|
||||
expect(getEventsSize(logger!)).toBe(0);
|
||||
expect(response.nextRequestWaitMs).toBe(
|
||||
mockResponseBody.nextRequestWaitMs,
|
||||
);
|
||||
expect(getEvents(logger!)).toEqual([]);
|
||||
expect(response.nextRequestWaitMs).toBe(NEXT_WAIT_MS);
|
||||
});
|
||||
|
||||
it('should handle a network error and requeue events', async () => {
|
||||
const { logger } = setup();
|
||||
|
||||
server.resetHandlers(http.post(CLEARCUT_URL, () => HttpResponse.error()));
|
||||
logger!.enqueueLogEvent({ event_id: 1 });
|
||||
logger!.enqueueLogEvent({ event_id: 2 });
|
||||
expect(getEventsSize(logger!)).toBe(2);
|
||||
|
||||
const flushPromise = logger!.flushToClearcut();
|
||||
mockRequest.emit('error', new Error('Network error'));
|
||||
await flushPromise;
|
||||
const x = logger!.flushToClearcut();
|
||||
await x;
|
||||
|
||||
expect(getEventsSize(logger!)).toBe(2);
|
||||
const events = getEvents(logger!);
|
||||
@@ -181,18 +224,28 @@ describe('ClearcutLogger', () => {
|
||||
});
|
||||
|
||||
it('should handle an HTTP error and requeue events', async () => {
|
||||
mockResponse.statusCode = 500;
|
||||
mockResponse.statusMessage = 'Internal Server Error';
|
||||
const { logger } = setup();
|
||||
|
||||
server.resetHandlers(
|
||||
http.post(
|
||||
CLEARCUT_URL,
|
||||
() =>
|
||||
new HttpResponse(
|
||||
{ 'the system is down': true },
|
||||
{
|
||||
status: 500,
|
||||
},
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
logger!.enqueueLogEvent({ event_id: 1 });
|
||||
logger!.enqueueLogEvent({ event_id: 2 });
|
||||
expect(getEventsSize(logger!)).toBe(2);
|
||||
|
||||
const flushPromise = logger!.flushToClearcut();
|
||||
mockResponse.emit('end'); // End the response to trigger promise resolution
|
||||
await flushPromise;
|
||||
expect(getEvents(logger!).length).toBe(2);
|
||||
await logger!.flushToClearcut();
|
||||
|
||||
expect(getEventsSize(logger!)).toBe(2);
|
||||
expect(getEvents(logger!).length).toBe(2);
|
||||
const events = getEvents(logger!);
|
||||
expect(JSON.parse(events[0][0].source_extension_json).event_id).toBe(1);
|
||||
});
|
||||
@@ -200,7 +253,8 @@ describe('ClearcutLogger', () => {
|
||||
|
||||
describe('requeueFailedEvents logic', () => {
|
||||
it('should limit the number of requeued events to max_retry_events', () => {
|
||||
const maxRetryEvents = getMaxRetryEvents(logger!);
|
||||
const { logger } = setup();
|
||||
const maxRetryEvents = TEST_ONLY.MAX_RETRY_EVENTS;
|
||||
const eventsToLogCount = maxRetryEvents + 5;
|
||||
const eventsToSend: LogEventEntry[][] = [];
|
||||
for (let i = 0; i < eventsToLogCount; i++) {
|
||||
@@ -225,7 +279,8 @@ describe('ClearcutLogger', () => {
|
||||
});
|
||||
|
||||
it('should not requeue more events than available space in the queue', () => {
|
||||
const maxEvents = getMaxEvents(logger!);
|
||||
const { logger } = setup();
|
||||
const maxEvents = TEST_ONLY.MAX_EVENTS;
|
||||
const spaceToLeave = 5;
|
||||
const initialEventCount = maxEvents - spaceToLeave;
|
||||
for (let i = 0; i < initialEventCount; i++) {
|
||||
|
||||
@@ -4,10 +4,7 @@
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { Buffer } from 'buffer';
|
||||
import * as https from 'https';
|
||||
import { HttpsProxyAgent } from 'https-proxy-agent';
|
||||
|
||||
import {
|
||||
StartSessionEvent,
|
||||
EndSessionEvent,
|
||||
@@ -56,19 +53,25 @@ export interface LogEventEntry {
|
||||
source_extension_json: string;
|
||||
}
|
||||
|
||||
export type EventValue = {
|
||||
export interface EventValue {
|
||||
gemini_cli_key: EventMetadataKey | string;
|
||||
value: string;
|
||||
};
|
||||
}
|
||||
|
||||
export type LogEvent = {
|
||||
console_type: string;
|
||||
export interface LogEvent {
|
||||
console_type: 'GEMINI_CLI';
|
||||
application: number;
|
||||
event_name: string;
|
||||
event_metadata: EventValue[][];
|
||||
client_email?: string;
|
||||
client_install_id?: string;
|
||||
};
|
||||
}
|
||||
|
||||
export interface LogRequest {
|
||||
log_source_name: 'CONCORD';
|
||||
request_time_ms: number;
|
||||
log_event: LogEventEntry[][];
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the surface that the user is currently using. Surface is effectively the
|
||||
@@ -89,22 +92,59 @@ function determineSurface(): string {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clearcut URL to send logging events to.
|
||||
*/
|
||||
const CLEARCUT_URL = 'https://play.googleapis.com/log?format=json&hasfast=true';
|
||||
|
||||
/**
|
||||
* Interval in which buffered events are sent to clearcut.
|
||||
*/
|
||||
const FLUSH_INTERVAL_MS = 1000 * 60;
|
||||
|
||||
/**
|
||||
* Maximum amount of events to keep in memory. Events added after this amount
|
||||
* are dropped until the next flush to clearcut, which happens periodically as
|
||||
* defined by {@link FLUSH_INTERVAL_MS}.
|
||||
*/
|
||||
const MAX_EVENTS = 1000;
|
||||
|
||||
/**
|
||||
* Maximum events to retry after a failed clearcut flush
|
||||
*/
|
||||
const MAX_RETRY_EVENTS = 100;
|
||||
|
||||
// Singleton class for batch posting log events to Clearcut. When a new event comes in, the elapsed time
|
||||
// is checked and events are flushed to Clearcut if at least a minute has passed since the last flush.
|
||||
export class ClearcutLogger {
|
||||
private static instance: ClearcutLogger;
|
||||
private config?: Config;
|
||||
|
||||
/**
|
||||
* Queue of pending events that need to be flushed to the server. New events
|
||||
* are added to this queue and then flushed on demand (via `flushToClearcut`)
|
||||
*/
|
||||
private readonly events: FixedDeque<LogEventEntry[]>;
|
||||
private last_flush_time: number = Date.now();
|
||||
private flush_interval_ms: number = 1000 * 60; // Wait at least a minute before flushing events.
|
||||
private readonly max_events: number = 1000; // Maximum events to keep in memory
|
||||
private readonly max_retry_events: number = 100; // Maximum failed events to retry
|
||||
private flushing: boolean = false; // Prevent concurrent flush operations
|
||||
private pendingFlush: boolean = false; // Track if a flush was requested during an ongoing flush
|
||||
|
||||
/**
|
||||
* The last time that the events were successfully flushed to the server.
|
||||
*/
|
||||
private lastFlushTime: number = Date.now();
|
||||
|
||||
/**
|
||||
* the value is true when there is a pending flush happening. This prevents
|
||||
* concurrent flush operations.
|
||||
*/
|
||||
private flushing: boolean = false;
|
||||
|
||||
/**
|
||||
* This value is true when a flush was requested during an ongoing flush.
|
||||
*/
|
||||
private pendingFlush: boolean = false;
|
||||
|
||||
private constructor(config?: Config) {
|
||||
this.config = config;
|
||||
this.events = new FixedDeque<LogEventEntry[]>(Array, this.max_events);
|
||||
this.events = new FixedDeque<LogEventEntry[]>(Array, MAX_EVENTS);
|
||||
}
|
||||
|
||||
static getInstance(config?: Config): ClearcutLogger | undefined {
|
||||
@@ -125,7 +165,7 @@ export class ClearcutLogger {
|
||||
enqueueLogEvent(event: object): void {
|
||||
try {
|
||||
// Manually handle overflow for FixedDeque, which throws when full.
|
||||
const wasAtCapacity = this.events.size >= this.max_events;
|
||||
const wasAtCapacity = this.events.size >= MAX_EVENTS;
|
||||
|
||||
if (wasAtCapacity) {
|
||||
this.events.shift(); // Evict oldest element to make space.
|
||||
@@ -150,31 +190,14 @@ export class ClearcutLogger {
|
||||
}
|
||||
}
|
||||
|
||||
addDefaultFields(data: EventValue[]): void {
|
||||
const totalAccounts = getLifetimeGoogleAccounts();
|
||||
const surface = determineSurface();
|
||||
const defaultLogMetadata = [
|
||||
{
|
||||
gemini_cli_key: EventMetadataKey.GEMINI_CLI_GOOGLE_ACCOUNTS_COUNT,
|
||||
value: totalAccounts.toString(),
|
||||
},
|
||||
{
|
||||
gemini_cli_key: EventMetadataKey.GEMINI_CLI_SURFACE,
|
||||
value: surface,
|
||||
},
|
||||
];
|
||||
data.push(...defaultLogMetadata);
|
||||
}
|
||||
|
||||
createLogEvent(name: string, data: EventValue[]): LogEvent {
|
||||
const email = getCachedGoogleAccount();
|
||||
|
||||
// Add default fields that should exist for all logs
|
||||
this.addDefaultFields(data);
|
||||
data = addDefaultFields(data);
|
||||
|
||||
const logEvent: LogEvent = {
|
||||
console_type: 'GEMINI_CLI',
|
||||
application: 102,
|
||||
application: 102, // GEMINI_CLI
|
||||
event_name: name,
|
||||
event_metadata: [data],
|
||||
};
|
||||
@@ -190,7 +213,7 @@ export class ClearcutLogger {
|
||||
}
|
||||
|
||||
flushIfNeeded(): void {
|
||||
if (Date.now() - this.last_flush_time < this.flush_interval_ms) {
|
||||
if (Date.now() - this.lastFlushTime < FLUSH_INTERVAL_MS) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -217,140 +240,67 @@ export class ClearcutLogger {
|
||||
const eventsToSend = this.events.toArray() as LogEventEntry[][];
|
||||
this.events.clear();
|
||||
|
||||
return new Promise<{ buffer: Buffer; statusCode?: number }>(
|
||||
(resolve, reject) => {
|
||||
const request = [
|
||||
{
|
||||
log_source_name: 'CONCORD',
|
||||
request_time_ms: Date.now(),
|
||||
log_event: eventsToSend,
|
||||
},
|
||||
];
|
||||
const body = safeJsonStringify(request);
|
||||
const options = {
|
||||
hostname: 'play.googleapis.com',
|
||||
path: '/log',
|
||||
method: 'POST',
|
||||
headers: { 'Content-Length': Buffer.byteLength(body) },
|
||||
timeout: 30000, // 30-second timeout
|
||||
};
|
||||
const bufs: Buffer[] = [];
|
||||
const req = https.request(
|
||||
{
|
||||
...options,
|
||||
agent: this.getProxyAgent(),
|
||||
},
|
||||
(res) => {
|
||||
res.on('error', reject); // Handle stream errors
|
||||
res.on('data', (buf) => bufs.push(buf));
|
||||
res.on('end', () => {
|
||||
try {
|
||||
const buffer = Buffer.concat(bufs);
|
||||
// Check if we got a successful response
|
||||
if (
|
||||
res.statusCode &&
|
||||
res.statusCode >= 200 &&
|
||||
res.statusCode < 300
|
||||
) {
|
||||
resolve({ buffer, statusCode: res.statusCode });
|
||||
} else {
|
||||
// HTTP error - reject with status code for retry handling
|
||||
reject(
|
||||
new Error(`HTTP ${res.statusCode}: ${res.statusMessage}`),
|
||||
);
|
||||
}
|
||||
} catch (e) {
|
||||
reject(e);
|
||||
}
|
||||
});
|
||||
},
|
||||
);
|
||||
req.on('error', (e) => {
|
||||
// Network-level error
|
||||
reject(e);
|
||||
});
|
||||
req.on('timeout', () => {
|
||||
if (!req.destroyed) {
|
||||
req.destroy(new Error('Request timeout after 30 seconds'));
|
||||
}
|
||||
});
|
||||
req.end(body);
|
||||
const request: LogRequest[] = [
|
||||
{
|
||||
log_source_name: 'CONCORD',
|
||||
request_time_ms: Date.now(),
|
||||
log_event: eventsToSend,
|
||||
},
|
||||
)
|
||||
.then(({ buffer }) => {
|
||||
try {
|
||||
this.last_flush_time = Date.now();
|
||||
return this.decodeLogResponse(buffer) || {};
|
||||
} catch (error: unknown) {
|
||||
console.error('Error decoding log response:', error);
|
||||
return {};
|
||||
}
|
||||
})
|
||||
.catch((error: unknown) => {
|
||||
// Handle both network-level and HTTP-level errors
|
||||
];
|
||||
|
||||
let result: LogResponse = {};
|
||||
|
||||
try {
|
||||
const response = await fetch(CLEARCUT_URL, {
|
||||
method: 'POST',
|
||||
body: safeJsonStringify(request),
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
});
|
||||
|
||||
const responseBody = await response.text();
|
||||
|
||||
if (response.status >= 200 && response.status < 300) {
|
||||
this.lastFlushTime = Date.now();
|
||||
const nextRequestWaitMs = Number(JSON.parse(responseBody)[0]);
|
||||
result = {
|
||||
...result,
|
||||
nextRequestWaitMs,
|
||||
};
|
||||
} else {
|
||||
if (this.config?.getDebugMode()) {
|
||||
console.error('Error flushing log events:', error);
|
||||
console.error(
|
||||
`Error flushing log events: HTTP ${response.status}: ${response.statusText}`,
|
||||
);
|
||||
}
|
||||
|
||||
// Re-queue failed events for retry
|
||||
this.requeueFailedEvents(eventsToSend);
|
||||
}
|
||||
} catch (e: unknown) {
|
||||
if (this.config?.getDebugMode()) {
|
||||
console.error('Error flushing log events:', e as Error);
|
||||
}
|
||||
|
||||
// Return empty response to maintain the Promise<LogResponse> contract
|
||||
return {};
|
||||
})
|
||||
.finally(() => {
|
||||
this.flushing = false;
|
||||
// Re-queue failed events for retry
|
||||
this.requeueFailedEvents(eventsToSend);
|
||||
}
|
||||
|
||||
// If a flush was requested while we were flushing, flush again
|
||||
if (this.pendingFlush) {
|
||||
this.pendingFlush = false;
|
||||
// Fire and forget the pending flush
|
||||
this.flushToClearcut().catch((error) => {
|
||||
if (this.config?.getDebugMode()) {
|
||||
console.debug('Error in pending flush to Clearcut:', error);
|
||||
}
|
||||
});
|
||||
this.flushing = false;
|
||||
|
||||
// If a flush was requested while we were flushing, flush again
|
||||
if (this.pendingFlush) {
|
||||
this.pendingFlush = false;
|
||||
// Fire and forget the pending flush
|
||||
this.flushToClearcut().catch((error) => {
|
||||
if (this.config?.getDebugMode()) {
|
||||
console.debug('Error in pending flush to Clearcut:', error);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Visible for testing. Decodes protobuf-encoded response from Clearcut server.
|
||||
decodeLogResponse(buf: Buffer): LogResponse | undefined {
|
||||
// TODO(obrienowen): return specific errors to facilitate debugging.
|
||||
if (buf.length < 1) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
// The first byte of the buffer is `field<<3 | type`. We're looking for field
|
||||
// 1, with type varint, represented by type=0. If the first byte isn't 8, that
|
||||
// means field 1 is missing or the message is corrupted. Either way, we return
|
||||
// undefined.
|
||||
if (buf.readUInt8(0) !== 8) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
let ms = BigInt(0);
|
||||
let cont = true;
|
||||
|
||||
// In each byte, the most significant bit is the continuation bit. If it's
|
||||
// set, we keep going. The lowest 7 bits, are data bits. They are concatenated
|
||||
// in reverse order to form the final number.
|
||||
for (let i = 1; cont && i < buf.length; i++) {
|
||||
const byte = buf.readUInt8(i);
|
||||
ms |= BigInt(byte & 0x7f) << BigInt(7 * (i - 1));
|
||||
cont = (byte & 0x80) !== 0;
|
||||
}
|
||||
|
||||
if (cont) {
|
||||
// We have fallen off the buffer without seeing a terminating byte. The
|
||||
// message is corrupted.
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const returnVal = {
|
||||
nextRequestWaitMs: Number(ms),
|
||||
};
|
||||
return returnVal;
|
||||
return result;
|
||||
}
|
||||
|
||||
logStartSessionEvent(event: StartSessionEvent): void {
|
||||
@@ -752,24 +702,21 @@ export class ClearcutLogger {
|
||||
|
||||
private requeueFailedEvents(eventsToSend: LogEventEntry[][]): void {
|
||||
// Add the events back to the front of the queue to be retried, but limit retry queue size
|
||||
const eventsToRetry = eventsToSend.slice(-this.max_retry_events); // Keep only the most recent events
|
||||
const eventsToRetry = eventsToSend.slice(-MAX_RETRY_EVENTS); // Keep only the most recent events
|
||||
|
||||
// Log a warning if we're dropping events
|
||||
if (
|
||||
eventsToSend.length > this.max_retry_events &&
|
||||
this.config?.getDebugMode()
|
||||
) {
|
||||
if (eventsToSend.length > MAX_RETRY_EVENTS && this.config?.getDebugMode()) {
|
||||
console.warn(
|
||||
`ClearcutLogger: Dropping ${
|
||||
eventsToSend.length - this.max_retry_events
|
||||
eventsToSend.length - MAX_RETRY_EVENTS
|
||||
} events due to retry queue limit. Total events: ${
|
||||
eventsToSend.length
|
||||
}, keeping: ${this.max_retry_events}`,
|
||||
}, keeping: ${MAX_RETRY_EVENTS}`,
|
||||
);
|
||||
}
|
||||
|
||||
// Determine how many events can be re-queued
|
||||
const availableSpace = this.max_events - this.events.size;
|
||||
const availableSpace = MAX_EVENTS - this.events.size;
|
||||
const numEventsToRequeue = Math.min(eventsToRetry.length, availableSpace);
|
||||
|
||||
if (numEventsToRequeue === 0) {
|
||||
@@ -792,7 +739,7 @@ export class ClearcutLogger {
|
||||
this.events.unshift(eventsToRequeue[i]);
|
||||
}
|
||||
// Clear any potential overflow
|
||||
while (this.events.size > this.max_events) {
|
||||
while (this.events.size > MAX_EVENTS) {
|
||||
this.events.pop();
|
||||
}
|
||||
|
||||
@@ -803,3 +750,28 @@ export class ClearcutLogger {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds default fields to data, and returns a new data array. This fields
|
||||
* should exist on all log events.
|
||||
*/
|
||||
function addDefaultFields(data: EventValue[]): EventValue[] {
|
||||
const totalAccounts = getLifetimeGoogleAccounts();
|
||||
const surface = determineSurface();
|
||||
const defaultLogMetadata: EventValue[] = [
|
||||
{
|
||||
gemini_cli_key: EventMetadataKey.GEMINI_CLI_GOOGLE_ACCOUNTS_COUNT,
|
||||
value: `${totalAccounts}`,
|
||||
},
|
||||
{
|
||||
gemini_cli_key: EventMetadataKey.GEMINI_CLI_SURFACE,
|
||||
value: surface,
|
||||
},
|
||||
];
|
||||
return [...data, ...defaultLogMetadata];
|
||||
}
|
||||
|
||||
export const TEST_ONLY = {
|
||||
MAX_RETRY_EVENTS,
|
||||
MAX_EVENTS,
|
||||
};
|
||||
|
||||
36
packages/core/src/test-utils/config.ts
Normal file
36
packages/core/src/test-utils/config.ts
Normal file
@@ -0,0 +1,36 @@
|
||||
/**
|
||||
* @license
|
||||
* Copyright 2025 Google LLC
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import { Config, ConfigParameters } from '../config/config.js';
|
||||
|
||||
/**
|
||||
* Default parameters used for {@link FAKE_CONFIG}
|
||||
*/
|
||||
export const DEFAULT_CONFIG_PARAMETERS: ConfigParameters = {
|
||||
usageStatisticsEnabled: true,
|
||||
debugMode: false,
|
||||
sessionId: 'test-session-id',
|
||||
proxy: undefined,
|
||||
model: 'gemini-9001-super-duper',
|
||||
targetDir: '/',
|
||||
cwd: '/',
|
||||
};
|
||||
|
||||
/**
|
||||
* Produces a config. Default paramters are set to
|
||||
* {@link DEFAULT_CONFIG_PARAMETERS}, optionally, fields can be specified to
|
||||
* override those defaults.
|
||||
*/
|
||||
export function makeFakeConfig(
|
||||
config: Partial<ConfigParameters> = {
|
||||
...DEFAULT_CONFIG_PARAMETERS,
|
||||
},
|
||||
): Config {
|
||||
return new Config({
|
||||
...DEFAULT_CONFIG_PARAMETERS,
|
||||
...config,
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user