mirror of
https://github.com/QwenLM/qwen-code.git
synced 2025-12-19 09:33:53 +00:00
🔧 Miscellaneous Improvements and Refactoring (#466)
This commit is contained in:
@@ -7,7 +7,6 @@
|
||||
import { Buffer } from 'buffer';
|
||||
import * as https from 'https';
|
||||
import { HttpsProxyAgent } from 'https-proxy-agent';
|
||||
import { randomUUID } from 'crypto';
|
||||
|
||||
import {
|
||||
StartSessionEvent,
|
||||
@@ -22,6 +21,8 @@ import {
|
||||
NextSpeakerCheckEvent,
|
||||
SlashCommandEvent,
|
||||
MalformedJsonResponseEvent,
|
||||
IdeConnectionEvent,
|
||||
KittySequenceOverflowEvent,
|
||||
} from '../types.js';
|
||||
import {
|
||||
RumEvent,
|
||||
@@ -31,12 +32,12 @@ import {
|
||||
RumExceptionEvent,
|
||||
RumPayload,
|
||||
} from './event-types.js';
|
||||
// Removed unused EventMetadataKey import
|
||||
import { Config } from '../../config/config.js';
|
||||
import { safeJsonStringify } from '../../utils/safeJsonStringify.js';
|
||||
// Removed unused import
|
||||
import { HttpError, retryWithBackoff } from '../../utils/retry.js';
|
||||
import { getInstallationId } from '../../utils/user_id.js';
|
||||
import { FixedDeque } from 'mnemonist';
|
||||
import { AuthType } from '../../core/contentGenerator.js';
|
||||
|
||||
// Usage statistics collection endpoint
|
||||
const USAGE_STATS_HOSTNAME = 'gb4w8c3ygj-default-sea.rum.aliyuncs.com';
|
||||
@@ -44,6 +45,23 @@ const USAGE_STATS_PATH = '/';
|
||||
|
||||
const RUN_APP_ID = 'gb4w8c3ygj@851d5d500f08f92';
|
||||
|
||||
/**
|
||||
* Interval in which buffered events are sent to RUM.
|
||||
*/
|
||||
const FLUSH_INTERVAL_MS = 1000 * 60;
|
||||
|
||||
/**
|
||||
* Maximum amount of events to keep in memory. Events added after this amount
|
||||
* are dropped until the next flush to RUM, which happens periodically as
|
||||
* defined by {@link FLUSH_INTERVAL_MS}.
|
||||
*/
|
||||
const MAX_EVENTS = 1000;
|
||||
|
||||
/**
|
||||
* Maximum events to retry after a failed RUM flush
|
||||
*/
|
||||
const MAX_RETRY_EVENTS = 100;
|
||||
|
||||
export interface LogResponse {
|
||||
nextRequestWaitMs?: number;
|
||||
}
|
||||
@@ -53,23 +71,42 @@ export interface LogResponse {
|
||||
export class QwenLogger {
|
||||
private static instance: QwenLogger;
|
||||
private config?: Config;
|
||||
private readonly events: RumEvent[] = [];
|
||||
private last_flush_time: number = Date.now();
|
||||
private flush_interval_ms: number = 1000 * 60; // Wait at least a minute before flushing events.
|
||||
|
||||
/**
|
||||
* Queue of pending events that need to be flushed to the server. New events
|
||||
* are added to this queue and then flushed on demand (via `flushToRum`)
|
||||
*/
|
||||
private readonly events: FixedDeque<RumEvent>;
|
||||
|
||||
/**
|
||||
* The last time that the events were successfully flushed to the server.
|
||||
*/
|
||||
private lastFlushTime: number = Date.now();
|
||||
|
||||
private userId: string;
|
||||
private sessionId: string;
|
||||
private viewId: string;
|
||||
|
||||
/**
|
||||
* The value is true when there is a pending flush happening. This prevents
|
||||
* concurrent flush operations.
|
||||
*/
|
||||
private isFlushInProgress: boolean = false;
|
||||
|
||||
/**
|
||||
* This value is true when a flush was requested during an ongoing flush.
|
||||
*/
|
||||
private pendingFlush: boolean = false;
|
||||
|
||||
private isShutdown: boolean = false;
|
||||
|
||||
private constructor(config?: Config) {
|
||||
this.config = config;
|
||||
this.events = new FixedDeque<RumEvent>(Array, MAX_EVENTS);
|
||||
this.userId = this.generateUserId();
|
||||
this.sessionId =
|
||||
typeof this.config?.getSessionId === 'function'
|
||||
? this.config.getSessionId()
|
||||
: '';
|
||||
this.viewId = randomUUID();
|
||||
}
|
||||
|
||||
private generateUserId(): string {
|
||||
@@ -92,7 +129,26 @@ export class QwenLogger {
|
||||
}
|
||||
|
||||
enqueueLogEvent(event: RumEvent): void {
|
||||
this.events.push(event);
|
||||
try {
|
||||
// Manually handle overflow for FixedDeque, which throws when full.
|
||||
const wasAtCapacity = this.events.size >= MAX_EVENTS;
|
||||
|
||||
if (wasAtCapacity) {
|
||||
this.events.shift(); // Evict oldest element to make space.
|
||||
}
|
||||
|
||||
this.events.push(event);
|
||||
|
||||
if (wasAtCapacity && this.config?.getDebugMode()) {
|
||||
console.debug(
|
||||
`QwenLogger: Dropped old event to prevent memory leak (queue size: ${this.events.size})`,
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
if (this.config?.getDebugMode()) {
|
||||
console.error('QwenLogger: Failed to enqueue log event.', error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
createRumEvent(
|
||||
@@ -143,6 +199,7 @@ export class QwenLogger {
|
||||
}
|
||||
|
||||
async createRumPayload(): Promise<RumPayload> {
|
||||
const authType = this.config?.getAuthType();
|
||||
const version = this.config?.getCliVersion() || 'unknown';
|
||||
|
||||
return {
|
||||
@@ -159,40 +216,59 @@ export class QwenLogger {
|
||||
id: this.sessionId,
|
||||
},
|
||||
view: {
|
||||
id: this.viewId,
|
||||
id: this.sessionId,
|
||||
name: 'qwen-code-cli',
|
||||
},
|
||||
events: [...this.events],
|
||||
|
||||
events: this.events.toArray() as RumEvent[],
|
||||
properties: {
|
||||
auth_type: authType,
|
||||
model: this.config?.getModel(),
|
||||
base_url:
|
||||
authType === AuthType.USE_OPENAI ? process.env.OPENAI_BASE_URL : '',
|
||||
},
|
||||
_v: `qwen-code@${version}`,
|
||||
};
|
||||
}
|
||||
|
||||
flushIfNeeded(): void {
|
||||
if (Date.now() - this.last_flush_time < this.flush_interval_ms) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Prevent concurrent flush operations
|
||||
if (this.isFlushInProgress) {
|
||||
if (Date.now() - this.lastFlushTime < FLUSH_INTERVAL_MS) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.flushToRum().catch((error) => {
|
||||
console.debug('Error flushing to RUM:', error);
|
||||
if (this.config?.getDebugMode()) {
|
||||
console.debug('Error flushing to RUM:', error);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async flushToRum(): Promise<LogResponse> {
|
||||
if (this.isFlushInProgress) {
|
||||
if (this.config?.getDebugMode()) {
|
||||
console.debug(
|
||||
'QwenLogger: Flush already in progress, marking pending flush.',
|
||||
);
|
||||
}
|
||||
this.pendingFlush = true;
|
||||
return Promise.resolve({});
|
||||
}
|
||||
this.isFlushInProgress = true;
|
||||
|
||||
if (this.config?.getDebugMode()) {
|
||||
console.log('Flushing log events to RUM.');
|
||||
}
|
||||
if (this.events.length === 0) {
|
||||
if (this.events.size === 0) {
|
||||
this.isFlushInProgress = false;
|
||||
return {};
|
||||
}
|
||||
|
||||
this.isFlushInProgress = true;
|
||||
const eventsToSend = this.events.toArray() as RumEvent[];
|
||||
this.events.clear();
|
||||
|
||||
const rumPayload = await this.createRumPayload();
|
||||
// Override events with the ones we're sending
|
||||
rumPayload.events = eventsToSend;
|
||||
const flushFn = () =>
|
||||
new Promise<Buffer>((resolve, reject) => {
|
||||
const body = safeJsonStringify(rumPayload);
|
||||
@@ -246,16 +322,29 @@ export class QwenLogger {
|
||||
},
|
||||
});
|
||||
|
||||
this.events.splice(0, this.events.length);
|
||||
this.last_flush_time = Date.now();
|
||||
this.lastFlushTime = Date.now();
|
||||
return {};
|
||||
} catch (error) {
|
||||
if (this.config?.getDebugMode()) {
|
||||
console.error('RUM flush failed after multiple retries.', error);
|
||||
}
|
||||
|
||||
// Re-queue failed events for retry
|
||||
this.requeueFailedEvents(eventsToSend);
|
||||
return {};
|
||||
} finally {
|
||||
this.isFlushInProgress = false;
|
||||
|
||||
// If a flush was requested while we were flushing, flush again
|
||||
if (this.pendingFlush) {
|
||||
this.pendingFlush = false;
|
||||
// Fire and forget the pending flush
|
||||
this.flushToRum().catch((error) => {
|
||||
if (this.config?.getDebugMode()) {
|
||||
console.debug('Error in pending flush to RUM:', error);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -282,7 +371,9 @@ export class QwenLogger {
|
||||
// Flush start event immediately
|
||||
this.enqueueLogEvent(applicationEvent);
|
||||
this.flushToRum().catch((error: unknown) => {
|
||||
console.debug('Error flushing to RUM:', error);
|
||||
if (this.config?.getDebugMode()) {
|
||||
console.debug('Error flushing to RUM:', error);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -451,13 +542,41 @@ export class QwenLogger {
|
||||
this.flushIfNeeded();
|
||||
}
|
||||
|
||||
logIdeConnectionEvent(event: IdeConnectionEvent): void {
|
||||
const rumEvent = this.createActionEvent('connection', 'ide_connection', {
|
||||
snapshots: JSON.stringify({ connection_type: event.connection_type }),
|
||||
});
|
||||
|
||||
this.enqueueLogEvent(rumEvent);
|
||||
this.flushIfNeeded();
|
||||
}
|
||||
|
||||
logKittySequenceOverflowEvent(event: KittySequenceOverflowEvent): void {
|
||||
const rumEvent = this.createExceptionEvent(
|
||||
'overflow',
|
||||
'kitty_sequence_overflow',
|
||||
{
|
||||
subtype: 'kitty_sequence_overflow',
|
||||
snapshots: JSON.stringify({
|
||||
sequence_length: event.sequence_length,
|
||||
truncated_sequence: event.truncated_sequence,
|
||||
}),
|
||||
},
|
||||
);
|
||||
|
||||
this.enqueueLogEvent(rumEvent);
|
||||
this.flushIfNeeded();
|
||||
}
|
||||
|
||||
logEndSessionEvent(_event: EndSessionEvent): void {
|
||||
const applicationEvent = this.createViewEvent('session', 'session_end', {});
|
||||
|
||||
// Flush immediately on session end.
|
||||
this.enqueueLogEvent(applicationEvent);
|
||||
this.flushToRum().catch((error: unknown) => {
|
||||
console.debug('Error flushing to RUM:', error);
|
||||
if (this.config?.getDebugMode()) {
|
||||
console.debug('Error flushing to RUM:', error);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -480,4 +599,60 @@ export class QwenLogger {
|
||||
const event = new EndSessionEvent(this.config);
|
||||
this.logEndSessionEvent(event);
|
||||
}
|
||||
|
||||
private requeueFailedEvents(eventsToSend: RumEvent[]): void {
|
||||
// Add the events back to the front of the queue to be retried, but limit retry queue size
|
||||
const eventsToRetry = eventsToSend.slice(-MAX_RETRY_EVENTS); // Keep only the most recent events
|
||||
|
||||
// Log a warning if we're dropping events
|
||||
if (eventsToSend.length > MAX_RETRY_EVENTS && this.config?.getDebugMode()) {
|
||||
console.warn(
|
||||
`QwenLogger: Dropping ${
|
||||
eventsToSend.length - MAX_RETRY_EVENTS
|
||||
} events due to retry queue limit. Total events: ${
|
||||
eventsToSend.length
|
||||
}, keeping: ${MAX_RETRY_EVENTS}`,
|
||||
);
|
||||
}
|
||||
|
||||
// Determine how many events can be re-queued
|
||||
const availableSpace = MAX_EVENTS - this.events.size;
|
||||
const numEventsToRequeue = Math.min(eventsToRetry.length, availableSpace);
|
||||
|
||||
if (numEventsToRequeue === 0) {
|
||||
if (this.config?.getDebugMode()) {
|
||||
console.debug(
|
||||
`QwenLogger: No events re-queued (queue size: ${this.events.size})`,
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Get the most recent events to re-queue
|
||||
const eventsToRequeue = eventsToRetry.slice(
|
||||
eventsToRetry.length - numEventsToRequeue,
|
||||
);
|
||||
|
||||
// Prepend events to the front of the deque to be retried first.
|
||||
// We iterate backwards to maintain the original order of the failed events.
|
||||
for (let i = eventsToRequeue.length - 1; i >= 0; i--) {
|
||||
this.events.unshift(eventsToRequeue[i]);
|
||||
}
|
||||
// Clear any potential overflow
|
||||
while (this.events.size > MAX_EVENTS) {
|
||||
this.events.pop();
|
||||
}
|
||||
|
||||
if (this.config?.getDebugMode()) {
|
||||
console.debug(
|
||||
`QwenLogger: Re-queued ${numEventsToRequeue} events for retry (queue size: ${this.events.size})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const TEST_ONLY = {
|
||||
MAX_RETRY_EVENTS,
|
||||
MAX_EVENTS,
|
||||
FLUSH_INTERVAL_MS,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user