mirror of
https://github.com/QwenLM/qwen-code.git
synced 2025-12-20 08:47:44 +00:00
refactor(telemetry): enhance flushToClearcut method with retry logic and early return for empty events (#1601)
Co-authored-by: Scott Densmore <scottdensmore@mac.com>
This commit is contained in:
@@ -22,12 +22,13 @@ import {
|
|||||||
} from '../types.js';
|
} from '../types.js';
|
||||||
import { EventMetadataKey } from './event-metadata-key.js';
|
import { EventMetadataKey } from './event-metadata-key.js';
|
||||||
import { Config } from '../../config/config.js';
|
import { Config } from '../../config/config.js';
|
||||||
import { getInstallationId } from '../../utils/user_id.js';
|
import { safeJsonStringify } from '../../utils/safeJsonStringify.js';
|
||||||
import {
|
import {
|
||||||
getCachedGoogleAccount,
|
getCachedGoogleAccount,
|
||||||
getLifetimeGoogleAccounts,
|
getLifetimeGoogleAccounts,
|
||||||
} from '../../utils/user_account.js';
|
} from '../../utils/user_account.js';
|
||||||
import { safeJsonStringify } from '../../utils/safeJsonStringify.js';
|
import { HttpError, retryWithBackoff } from '../../utils/retry.js';
|
||||||
|
import { getInstallationId } from '../../utils/user_id.js';
|
||||||
|
|
||||||
const start_session_event_name = 'start_session';
|
const start_session_event_name = 'start_session';
|
||||||
const new_prompt_event_name = 'new_prompt';
|
const new_prompt_event_name = 'new_prompt';
|
||||||
@@ -113,66 +114,81 @@ export class ClearcutLogger {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
flushToClearcut(): Promise<LogResponse> {
|
async flushToClearcut(): Promise<LogResponse> {
|
||||||
if (this.config?.getDebugMode()) {
|
if (this.config?.getDebugMode()) {
|
||||||
console.log('Flushing log events to Clearcut.');
|
console.log('Flushing log events to Clearcut.');
|
||||||
}
|
}
|
||||||
const eventsToSend = [...this.events];
|
const eventsToSend = [...this.events];
|
||||||
this.events.length = 0;
|
if (eventsToSend.length === 0) {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
return new Promise<Buffer>((resolve, reject) => {
|
const flushFn = () =>
|
||||||
const request = [
|
new Promise<Buffer>((resolve, reject) => {
|
||||||
{
|
const request = [
|
||||||
log_source_name: 'CONCORD',
|
{
|
||||||
request_time_ms: Date.now(),
|
log_source_name: 'CONCORD',
|
||||||
log_event: eventsToSend,
|
request_time_ms: Date.now(),
|
||||||
},
|
log_event: eventsToSend,
|
||||||
];
|
},
|
||||||
const body = safeJsonStringify(request);
|
];
|
||||||
const options = {
|
const body = safeJsonStringify(request);
|
||||||
hostname: 'play.googleapis.com',
|
const options = {
|
||||||
path: '/log',
|
hostname: 'play.googleapis.com',
|
||||||
method: 'POST',
|
path: '/log',
|
||||||
headers: { 'Content-Length': Buffer.byteLength(body) },
|
method: 'POST',
|
||||||
};
|
headers: { 'Content-Length': Buffer.byteLength(body) },
|
||||||
const bufs: Buffer[] = [];
|
};
|
||||||
const req = https.request(
|
const bufs: Buffer[] = [];
|
||||||
{
|
const req = https.request(
|
||||||
...options,
|
{
|
||||||
agent: this.getProxyAgent(),
|
...options,
|
||||||
},
|
agent: this.getProxyAgent(),
|
||||||
(res) => {
|
},
|
||||||
res.on('data', (buf) => bufs.push(buf));
|
(res) => {
|
||||||
res.on('end', () => {
|
if (
|
||||||
resolve(Buffer.concat(bufs));
|
res.statusCode &&
|
||||||
});
|
(res.statusCode < 200 || res.statusCode >= 300)
|
||||||
},
|
) {
|
||||||
);
|
const err: HttpError = new Error(
|
||||||
req.on('error', (e) => {
|
`Request failed with status ${res.statusCode}`,
|
||||||
if (this.config?.getDebugMode()) {
|
);
|
||||||
console.log('Clearcut POST request error: ', e);
|
err.status = res.statusCode;
|
||||||
}
|
res.resume();
|
||||||
// Add the events back to the front of the queue to be retried.
|
return reject(err);
|
||||||
this.events.unshift(...eventsToSend);
|
}
|
||||||
reject(e);
|
res.on('data', (buf) => bufs.push(buf));
|
||||||
|
res.on('end', () => resolve(Buffer.concat(bufs)));
|
||||||
|
},
|
||||||
|
);
|
||||||
|
req.on('error', reject);
|
||||||
|
req.end(body);
|
||||||
});
|
});
|
||||||
req.end(body);
|
|
||||||
})
|
try {
|
||||||
.then((buf: Buffer) => {
|
const responseBuffer = await retryWithBackoff(flushFn, {
|
||||||
try {
|
maxAttempts: 3,
|
||||||
this.last_flush_time = Date.now();
|
initialDelayMs: 200,
|
||||||
return this.decodeLogResponse(buf) || {};
|
shouldRetry: (err: unknown) => {
|
||||||
} catch (error: unknown) {
|
if (!(err instanceof Error)) return false;
|
||||||
console.error('Error flushing log events:', error);
|
const status = (err as HttpError).status as number | undefined;
|
||||||
return {};
|
// If status is not available, it's likely a network error
|
||||||
}
|
if (status === undefined) return true;
|
||||||
})
|
|
||||||
.catch((error: unknown) => {
|
// Retry on 429 (Too many Requests) and 5xx server errors.
|
||||||
// Handle all errors to prevent unhandled promise rejections
|
return status === 429 || (status >= 500 && status < 600);
|
||||||
console.error('Error flushing log events:', error);
|
},
|
||||||
// Return empty response to maintain the Promise<LogResponse> contract
|
|
||||||
return {};
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
this.events.splice(0, eventsToSend.length);
|
||||||
|
this.last_flush_time = Date.now();
|
||||||
|
return this.decodeLogResponse(responseBuffer) || {};
|
||||||
|
} catch (error) {
|
||||||
|
if (this.config?.getDebugMode()) {
|
||||||
|
console.error('Clearcut flush failed after multiple retries.', error);
|
||||||
|
}
|
||||||
|
return {};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Visible for testing. Decodes protobuf-encoded response from Clearcut server.
|
// Visible for testing. Decodes protobuf-encoded response from Clearcut server.
|
||||||
|
|||||||
@@ -6,14 +6,9 @@
|
|||||||
|
|
||||||
/* eslint-disable @typescript-eslint/no-explicit-any */
|
/* eslint-disable @typescript-eslint/no-explicit-any */
|
||||||
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
|
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
|
||||||
import { retryWithBackoff } from './retry.js';
|
import { retryWithBackoff, HttpError } from './retry.js';
|
||||||
import { setSimulate429 } from './testUtils.js';
|
import { setSimulate429 } from './testUtils.js';
|
||||||
|
|
||||||
// Define an interface for the error with a status property
|
|
||||||
interface HttpError extends Error {
|
|
||||||
status?: number;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper to create a mock function that fails a certain number of times
|
// Helper to create a mock function that fails a certain number of times
|
||||||
const createFailingFunction = (
|
const createFailingFunction = (
|
||||||
failures: number,
|
failures: number,
|
||||||
|
|||||||
@@ -10,6 +10,10 @@ import {
|
|||||||
isGenericQuotaExceededError,
|
isGenericQuotaExceededError,
|
||||||
} from './quotaErrorDetection.js';
|
} from './quotaErrorDetection.js';
|
||||||
|
|
||||||
|
export interface HttpError extends Error {
|
||||||
|
status?: number;
|
||||||
|
}
|
||||||
|
|
||||||
export interface RetryOptions {
|
export interface RetryOptions {
|
||||||
maxAttempts: number;
|
maxAttempts: number;
|
||||||
initialDelayMs: number;
|
initialDelayMs: number;
|
||||||
|
|||||||
Reference in New Issue
Block a user