From c96852dc56494cbc0eb7457eb7d283858f6de295 Mon Sep 17 00:00:00 2001 From: tanzhenxin Date: Mon, 11 Aug 2025 22:13:56 +0800 Subject: [PATCH] feat: add usage statistics logging for Qwen integration --- docs/cli/configuration.md | 2 + packages/core/src/config/config.ts | 4 +- .../clearcut-logger/event-metadata-key.ts | 2 +- .../telemetry/integration.test.circular.ts | 15 +- packages/core/src/telemetry/loggers.ts | 22 +- .../src/telemetry/qwen-logger/event-types.ts | 91 ++++ .../src/telemetry/qwen-logger/qwen-logger.ts | 490 ++++++++++++++++++ packages/core/src/telemetry/sdk.ts | 4 +- 8 files changed, 608 insertions(+), 22 deletions(-) create mode 100644 packages/core/src/telemetry/qwen-logger/event-types.ts create mode 100644 packages/core/src/telemetry/qwen-logger/qwen-logger.ts diff --git a/docs/cli/configuration.md b/docs/cli/configuration.md index 52824519..9c16b440 100644 --- a/docs/cli/configuration.md +++ b/docs/cli/configuration.md @@ -523,3 +523,5 @@ You can opt out of usage statistics collection at any time by setting the `usage "usageStatisticsEnabled": false } ``` + +Note: When usage statistics are enabled, events are sent to an Alibaba Cloud RUM collection endpoint. diff --git a/packages/core/src/config/config.ts b/packages/core/src/config/config.ts index 684f1fdd..e854de31 100644 --- a/packages/core/src/config/config.ts +++ b/packages/core/src/config/config.ts @@ -43,7 +43,7 @@ import { DEFAULT_GEMINI_EMBEDDING_MODEL, DEFAULT_GEMINI_FLASH_MODEL, } from './models.js'; -import { ClearcutLogger } from '../telemetry/clearcut-logger/clearcut-logger.js'; +import { QwenLogger } from '../telemetry/qwen-logger/qwen-logger.js'; import { shouldAttemptBrowserLaunch } from '../utils/browser.js'; import { MCPOAuthConfig } from '../mcp/oauth-provider.js'; import { IdeClient } from '../ide/ide-client.js'; @@ -360,7 +360,7 @@ export class Config { } if (this.getUsageStatisticsEnabled()) { - ClearcutLogger.getInstance(this)?.logStartSessionEvent( + QwenLogger.getInstance(this)?.logStartSessionEvent( new StartSessionEvent(this), ); } else { diff --git a/packages/core/src/telemetry/clearcut-logger/event-metadata-key.ts b/packages/core/src/telemetry/clearcut-logger/event-metadata-key.ts index 0fc35894..0c270f63 100644 --- a/packages/core/src/telemetry/clearcut-logger/event-metadata-key.ts +++ b/packages/core/src/telemetry/clearcut-logger/event-metadata-key.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -// Defines valid event metadata keys for Clearcut logging. +// Defines valid event metadata keys for Qwen logging. export enum EventMetadataKey { GEMINI_CLI_KEY_UNKNOWN = 0, diff --git a/packages/core/src/telemetry/integration.test.circular.ts b/packages/core/src/telemetry/integration.test.circular.ts index 958ec3cb..0d639925 100644 --- a/packages/core/src/telemetry/integration.test.circular.ts +++ b/packages/core/src/telemetry/integration.test.circular.ts @@ -9,11 +9,12 @@ */ import { describe, it, expect } from 'vitest'; -import { ClearcutLogger } from './clearcut-logger/clearcut-logger.js'; +import { QwenLogger } from './qwen-logger/qwen-logger.js'; +import { RumEvent } from './qwen-logger/event-types.js'; import { Config } from '../config/config.js'; describe('Circular Reference Integration Test', () => { - it('should handle HttpsProxyAgent-like circular references in clearcut logging', () => { + it('should handle HttpsProxyAgent-like circular references in qwen logging', () => { // Create a mock config with proxy const mockConfig = { getTelemetryEnabled: () => true, @@ -44,16 +45,18 @@ describe('Circular Reference Integration Test', () => { proxyAgentLike.sockets['cloudcode-pa.googleapis.com:443'] = [socketLike]; // Create an event that would contain this circular structure - const problematicEvent = { + const problematicEvent: RumEvent = { + timestamp: Date.now(), + event_type: 'exception', error: new Error('Network error'), function_args: { filePath: '/test/file.txt', httpAgent: proxyAgentLike, // This would cause the circular reference }, - }; + } as RumEvent; - // Test that ClearcutLogger can handle this - const logger = ClearcutLogger.getInstance(mockConfig); + // Test that QwenLogger can handle this + const logger = QwenLogger.getInstance(mockConfig); expect(() => { logger?.enqueueLogEvent(problematicEvent); diff --git a/packages/core/src/telemetry/loggers.ts b/packages/core/src/telemetry/loggers.ts index 2aa0d86a..d93580e5 100644 --- a/packages/core/src/telemetry/loggers.ts +++ b/packages/core/src/telemetry/loggers.ts @@ -39,7 +39,7 @@ import { } from './metrics.js'; import { isTelemetrySdkInitialized } from './sdk.js'; import { uiTelemetryService, UiEvent } from './uiTelemetry.js'; -import { ClearcutLogger } from './clearcut-logger/clearcut-logger.js'; +import { QwenLogger } from './qwen-logger/qwen-logger.js'; import { safeJsonStringify } from '../utils/safeJsonStringify.js'; const shouldLogUserPrompts = (config: Config): boolean => @@ -55,7 +55,7 @@ export function logCliConfiguration( config: Config, event: StartSessionEvent, ): void { - ClearcutLogger.getInstance(config)?.logStartSessionEvent(event); + QwenLogger.getInstance(config)?.logStartSessionEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -84,7 +84,7 @@ export function logCliConfiguration( } export function logUserPrompt(config: Config, event: UserPromptEvent): void { - ClearcutLogger.getInstance(config)?.logNewPromptEvent(event); + QwenLogger.getInstance(config)?.logNewPromptEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -113,7 +113,7 @@ export function logToolCall(config: Config, event: ToolCallEvent): void { 'event.timestamp': new Date().toISOString(), } as UiEvent; uiTelemetryService.addEvent(uiEvent); - ClearcutLogger.getInstance(config)?.logToolCallEvent(event); + QwenLogger.getInstance(config)?.logToolCallEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -146,7 +146,7 @@ export function logToolCall(config: Config, event: ToolCallEvent): void { } export function logApiRequest(config: Config, event: ApiRequestEvent): void { - ClearcutLogger.getInstance(config)?.logApiRequestEvent(event); + QwenLogger.getInstance(config)?.logApiRequestEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -168,7 +168,7 @@ export function logFlashFallback( config: Config, event: FlashFallbackEvent, ): void { - ClearcutLogger.getInstance(config)?.logFlashFallbackEvent(event); + QwenLogger.getInstance(config)?.logFlashFallbackEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -193,7 +193,7 @@ export function logApiError(config: Config, event: ApiErrorEvent): void { 'event.timestamp': new Date().toISOString(), } as UiEvent; uiTelemetryService.addEvent(uiEvent); - ClearcutLogger.getInstance(config)?.logApiErrorEvent(event); + QwenLogger.getInstance(config)?.logApiErrorEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -235,7 +235,7 @@ export function logApiResponse(config: Config, event: ApiResponseEvent): void { 'event.timestamp': new Date().toISOString(), } as UiEvent; uiTelemetryService.addEvent(uiEvent); - ClearcutLogger.getInstance(config)?.logApiResponseEvent(event); + QwenLogger.getInstance(config)?.logApiResponseEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { ...getCommonAttributes(config), @@ -298,7 +298,7 @@ export function logLoopDetected( config: Config, event: LoopDetectedEvent, ): void { - ClearcutLogger.getInstance(config)?.logLoopDetectedEvent(event); + QwenLogger.getInstance(config)?.logLoopDetectedEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -318,7 +318,7 @@ export function logNextSpeakerCheck( config: Config, event: NextSpeakerCheckEvent, ): void { - ClearcutLogger.getInstance(config)?.logNextSpeakerCheck(event); + QwenLogger.getInstance(config)?.logNextSpeakerCheck(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -339,7 +339,7 @@ export function logSlashCommand( config: Config, event: SlashCommandEvent, ): void { - ClearcutLogger.getInstance(config)?.logSlashCommandEvent(event); + QwenLogger.getInstance(config)?.logSlashCommandEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { diff --git a/packages/core/src/telemetry/qwen-logger/event-types.ts b/packages/core/src/telemetry/qwen-logger/event-types.ts new file mode 100644 index 00000000..14e0de83 --- /dev/null +++ b/packages/core/src/telemetry/qwen-logger/event-types.ts @@ -0,0 +1,91 @@ +// RUM Protocol Data Structures +export interface RumApp { + id: string; + env: string; + version: string; + type: 'cli' | 'extension'; +} + +export interface RumUser { + id: string; +} + +export interface RumSession { + id: string; +} + +export interface RumView { + id: string; + name: string; +} + +export interface RumEvent { + timestamp?: number; + event_type?: 'view' | 'action' | 'exception' | 'resource'; + // [key: string]: unknown; +} + +export interface RumViewEvent extends RumEvent { + type?: string; // View event type: pv, perf + name?: string; // View event name + view_type?: string; // View rendering type + time_spent?: number; // Time spent on current view in ms + snapshots?: string; // View snapshots JSON string, mainly for native apps +} + +export interface RumActionEvent extends RumEvent { + type?: string; // User action type + name?: string; // Semantic name, e.g.: click#checkout + target_name?: string; // Element user interacted with (for auto-collected actions only) + duration?: number; // Action duration in ms + snapshots?: string; // Action snapshots + method_info?: string; // Action callback, e.g.: onClick() +} + +export interface RumExceptionEvent extends RumEvent { + source?: string; // Error source, e.g.: console, event + file?: string; // Error file + type?: string; // Error type: crash, custom, error + subtype?: string; // Secondary classification of error type + name?: string; // Error name + message?: string; // Concise, readable message explaining the event + stack?: string; // Stack trace or supplemental information about the error + caused_by?: string; // Exception cause + line?: number; // Line number where exception occurred + column?: number; // Column number where exception occurred + thread_id?: string; // Thread ID + binary_images?: string; // Error source + snapshots?: string; // Error snapshots +} + +export interface RumResourceEvent extends RumEvent { + type?: string; // Resource type: css, javascript, media, XHR, image, navigation (XHR/fetch will be considered as API) + method?: string; // HTTP request method: POST, GET, etc. + status_code?: string; // Resource status code + message?: string; // Error message content, corresponds to resource.error_msg + url?: string; // Resource URL + name?: string; // Default is URL path part, can be matched by rules or user configuration + provider_type?: string; // Resource provider type: first-party, cdn, ad, analytics + trace_id?: string; // Resource request TraceID + success?: number; // Resource loading success: 1 (default) success, 0 failure + duration?: number; // Total time spent loading resource in ms (responseEnd - redirectStart) + size?: number; // Resource size in bytes, corresponds to decodedBodySize + connect_duration?: number; // Time spent establishing connection to server in ms (connectEnd - connectStart) + ssl_duration?: number; // Time spent on TLS handshake in ms (connectEnd - secureConnectionStart), 0 if no SSL + dns_duration?: number; // Time spent resolving DNS name in ms (domainLookupEnd - domainLookupStart) + redirect_duration?: number; // Time spent on HTTP redirects in ms (redirectEnd - redirectStart) + first_byte_duration?: number; // Time waiting for first byte of response in ms (responseStart - requestStart) + download_duration?: number; // Time spent downloading response in ms (responseEnd - responseStart) + timing_data?: string; // JSON string of PerformanceResourceTiming + trace_data?: string; // Trace information snapshot JSON string + snapshots?: string; // View snapshots, mainly for native apps +} + +export interface RumPayload { + app: RumApp; + user: RumUser; + session: RumSession; + view: RumView; + events: RumEvent[]; + _v: string; +} diff --git a/packages/core/src/telemetry/qwen-logger/qwen-logger.ts b/packages/core/src/telemetry/qwen-logger/qwen-logger.ts new file mode 100644 index 00000000..0f67c5fa --- /dev/null +++ b/packages/core/src/telemetry/qwen-logger/qwen-logger.ts @@ -0,0 +1,490 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Buffer } from 'buffer'; +import * as https from 'https'; +import { HttpsProxyAgent } from 'https-proxy-agent'; +import { randomUUID } from 'crypto'; + +import { + StartSessionEvent, + EndSessionEvent, + UserPromptEvent, + ToolCallEvent, + ApiRequestEvent, + ApiResponseEvent, + ApiErrorEvent, + FlashFallbackEvent, + LoopDetectedEvent, + NextSpeakerCheckEvent, + SlashCommandEvent, + MalformedJsonResponseEvent, +} from '../types.js'; +import { + RumEvent, + RumViewEvent, + RumActionEvent, + RumResourceEvent, + RumExceptionEvent, + RumPayload, +} from './event-types.js'; +// Removed unused EventMetadataKey import +import { Config } from '../../config/config.js'; +import { safeJsonStringify } from '../../utils/safeJsonStringify.js'; +// Removed unused import +import { HttpError, retryWithBackoff } from '../../utils/retry.js'; +import { getInstallationId } from '../../utils/user_id.js'; + +// Usage statistics collection endpoint +const USAGE_STATS_HOSTNAME = 'gb4w8c3ygj-default-sea.rum.aliyuncs.com'; +const USAGE_STATS_PATH = '/'; + +const RUN_APP_ID = 'gb4w8c3ygj@851d5d500f08f92'; + +export interface LogResponse { + nextRequestWaitMs?: number; +} + +// Singleton class for batch posting log events to RUM. When a new event comes in, the elapsed time +// is checked and events are flushed to RUM if at least a minute has passed since the last flush. +export class QwenLogger { + private static instance: QwenLogger; + private config?: Config; + private readonly events: RumEvent[] = []; + private last_flush_time: number = Date.now(); + private flush_interval_ms: number = 1000 * 60; // Wait at least a minute before flushing events. + private userId: string; + private sessionId: string; + private viewId: string; + private isFlushInProgress: boolean = false; + + private constructor(config?: Config) { + this.config = config; + this.userId = this.generateUserId(); + this.sessionId = this.config?.getSessionId() ?? ''; + this.viewId = randomUUID(); + } + + private generateUserId(): string { + // Use installation ID as user ID for consistency + return `user-${getInstallationId()}`; + } + + static getInstance(config?: Config): QwenLogger | undefined { + if (config === undefined || !config?.getUsageStatisticsEnabled()) + return undefined; + if (!QwenLogger.instance) { + QwenLogger.instance = new QwenLogger(config); + } + return QwenLogger.instance; + } + + enqueueLogEvent(event: RumEvent): void { + this.events.push(event); + } + + createRumEvent( + eventType: 'view' | 'action' | 'exception' | 'resource', + properties: RumEvent, + ): RumEvent { + return { + timestamp: Date.now(), + event_type: eventType, + ...properties, + }; + } + + createViewEvent(properties: RumViewEvent): RumEvent { + return this.createRumEvent('view', properties); + } + + createActionEvent(properties: RumActionEvent): RumEvent { + return this.createRumEvent('action', properties); + } + + createResourceEvent(properties: RumResourceEvent): RumEvent { + return this.createRumEvent('resource', properties); + } + + createExceptionEvent(properties: RumExceptionEvent): RumEvent { + return this.createRumEvent('exception', properties); + } + + createRumPayload(): RumPayload { + const version = process.env.CLI_VERSION || process.version; + + return { + app: { + id: RUN_APP_ID, + env: process.env.DEBUG ? 'dev' : 'prod', + version, + type: 'cli', + }, + user: { + id: this.userId, + }, + session: { + id: this.sessionId, + }, + view: { + id: this.viewId, + name: 'qwen-code-cli', + }, + events: [...this.events], + _v: `qwen-code@${version}`, + }; + } + + flushIfNeeded(): void { + if (Date.now() - this.last_flush_time < this.flush_interval_ms) { + return; + } + + // Prevent concurrent flush operations + if (this.isFlushInProgress) { + return; + } + + this.flushToRum().catch((error) => { + console.debug('Error flushing to RUM:', error); + }); + } + + async flushToRum(): Promise { + if (this.config?.getDebugMode()) { + console.log('Flushing log events to RUM.'); + } + if (this.events.length === 0) { + return {}; + } + + this.isFlushInProgress = true; + + const rumPayload = this.createRumPayload(); + const flushFn = () => + new Promise((resolve, reject) => { + const body = safeJsonStringify(rumPayload); + const options = { + hostname: USAGE_STATS_HOSTNAME, + path: USAGE_STATS_PATH, + method: 'POST', + headers: { + 'Content-Length': Buffer.byteLength(body), + 'Content-Type': 'text/plain;charset=UTF-8', + }, + }; + const bufs: Buffer[] = []; + const req = https.request( + { + ...options, + agent: this.getProxyAgent(), + }, + (res) => { + if ( + res.statusCode && + (res.statusCode < 200 || res.statusCode >= 300) + ) { + const err: HttpError = new Error( + `Request failed with status ${res.statusCode}`, + ); + err.status = res.statusCode; + res.resume(); + return reject(err); + } + res.on('data', (buf) => bufs.push(buf)); + res.on('end', () => resolve(Buffer.concat(bufs))); + }, + ); + req.on('error', reject); + req.end(body); + }); + + try { + await retryWithBackoff(flushFn, { + maxAttempts: 3, + initialDelayMs: 200, + shouldRetry: (err: unknown) => { + if (!(err instanceof Error)) return false; + const status = (err as HttpError).status as number | undefined; + // If status is not available, it's likely a network error + if (status === undefined) return true; + + // Retry on 429 (Too many Requests) and 5xx server errors. + return status === 429 || (status >= 500 && status < 600); + }, + }); + + this.events.splice(0, this.events.length); + this.last_flush_time = Date.now(); + return {}; + } catch (error) { + if (this.config?.getDebugMode()) { + console.error('RUM flush failed after multiple retries.', error); + } + return {}; + } finally { + this.isFlushInProgress = false; + } + } + + // Visible for testing. Decodes protobuf-encoded response from Qwen server. + decodeLogResponse(buf: Buffer): LogResponse | undefined { + // TODO(obrienowen): return specific errors to facilitate debugging. + if (buf.length < 1) { + return undefined; + } + + // The first byte of the buffer is `field<<3 | type`. We're looking for field + // 1, with type varint, represented by type=0. If the first byte isn't 8, that + // means field 1 is missing or the message is corrupted. Either way, we return + // undefined. + if (buf.readUInt8(0) !== 8) { + return undefined; + } + + let ms = BigInt(0); + let cont = true; + + // In each byte, the most significant bit is the continuation bit. If it's + // set, we keep going. The lowest 7 bits, are data bits. They are concatenated + // in reverse order to form the final number. + for (let i = 1; cont && i < buf.length; i++) { + const byte = buf.readUInt8(i); + ms |= BigInt(byte & 0x7f) << BigInt(7 * (i - 1)); + cont = (byte & 0x80) !== 0; + } + + if (cont) { + // We have fallen off the buffer without seeing a terminating byte. The + // message is corrupted. + return undefined; + } + + const returnVal = { + nextRequestWaitMs: Number(ms), + }; + return returnVal; + } + + logStartSessionEvent(event: StartSessionEvent): void { + const applicationEvent = this.createViewEvent({ + type: 'session', + name: 'session_start', + snapshots: JSON.stringify({ + model: event.model, + embedding_model: event.embedding_model, + sandbox_enabled: event.sandbox_enabled, + core_tools_enabled: event.core_tools_enabled, + approval_mode: event.approval_mode, + api_key_enabled: event.api_key_enabled, + vertex_ai_enabled: event.vertex_ai_enabled, + debug_enabled: event.debug_enabled, + mcp_servers: event.mcp_servers, + telemetry_enabled: event.telemetry_enabled, + telemetry_log_user_prompts_enabled: + event.telemetry_log_user_prompts_enabled, + file_filtering_respect_git_ignore: + event.file_filtering_respect_git_ignore, + }), + }); + + // Flush start event immediately + this.enqueueLogEvent(applicationEvent); + this.flushToRum().catch((error: unknown) => { + console.debug('Error flushing to RUM:', error); + }); + } + + logNewPromptEvent(event: UserPromptEvent): void { + const rumEvent = this.createActionEvent({ + type: 'user_prompt', + name: 'user_prompt', + snapshots: JSON.stringify({ + prompt_length: event.prompt_length, + prompt_id: event.prompt_id, + auth_type: event.auth_type, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logToolCallEvent(event: ToolCallEvent): void { + const rumEvent = this.createActionEvent({ + type: 'tool_call', + name: `tool_call#${event.function_name}`, + snapshots: JSON.stringify({ + function_name: event.function_name, + prompt_id: event.prompt_id, + decision: event.decision, + success: event.success, + duration_ms: event.duration_ms, + error: event.error, + error_type: event.error_type, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logApiRequestEvent(event: ApiRequestEvent): void { + const rumEvent = this.createResourceEvent({ + type: 'api', + name: 'api_request', + snapshots: JSON.stringify({ + model: event.model, + prompt_id: event.prompt_id, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logApiResponseEvent(event: ApiResponseEvent): void { + const rumEvent = this.createResourceEvent({ + type: 'api', + name: 'api_response', + status_code: event.status_code?.toString() ?? '', + duration: event.duration_ms, + success: event.status_code === 200 ? 1 : 0, + message: event.error, + snapshots: JSON.stringify({ + model: event.model, + prompt_id: event.prompt_id, + input_token_count: event.input_token_count, + output_token_count: event.output_token_count, + cached_content_token_count: event.cached_content_token_count, + thoughts_token_count: event.thoughts_token_count, + tool_token_count: event.tool_token_count, + auth_type: event.auth_type, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logApiErrorEvent(event: ApiErrorEvent): void { + const rumEvent = this.createExceptionEvent({ + type: 'error', + subtype: 'api_error', + message: event.error, + snapshots: JSON.stringify({ + model: event.model, + prompt_id: event.prompt_id, + error_type: event.error_type, + status_code: event.status_code, + duration_ms: event.duration_ms, + auth_type: event.auth_type, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logFlashFallbackEvent(event: FlashFallbackEvent): void { + const rumEvent = this.createActionEvent({ + type: 'fallback', + name: 'flash_fallback', + snapshots: JSON.stringify({ + auth_type: event.auth_type, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logLoopDetectedEvent(event: LoopDetectedEvent): void { + const rumEvent = this.createExceptionEvent({ + type: 'error', + subtype: 'loop_detected', + snapshots: JSON.stringify({ + prompt_id: event.prompt_id, + loop_type: event.loop_type, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logNextSpeakerCheck(event: NextSpeakerCheckEvent): void { + const rumEvent = this.createActionEvent({ + type: 'check', + name: 'next_speaker_check', + snapshots: JSON.stringify({ + prompt_id: event.prompt_id, + finish_reason: event.finish_reason, + result: event.result, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logSlashCommandEvent(event: SlashCommandEvent): void { + const rumEvent = this.createActionEvent({ + type: 'command', + name: 'slash_command', + snapshots: JSON.stringify({ + command: event.command, + subcommand: event.subcommand, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logMalformedJsonResponseEvent(event: MalformedJsonResponseEvent): void { + const rumEvent = this.createExceptionEvent({ + type: 'error', + subtype: 'malformed_json_response', + snapshots: JSON.stringify({ + model: event.model, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logEndSessionEvent(_event: EndSessionEvent): void { + const applicationEvent = this.createViewEvent({ + type: 'session', + name: 'session_end', + }); + + // Flush immediately on session end. + this.enqueueLogEvent(applicationEvent); + this.flushToRum().catch((error: unknown) => { + console.debug('Error flushing to RUM:', error); + }); + } + + getProxyAgent() { + const proxyUrl = this.config?.getProxy(); + if (!proxyUrl) return undefined; + // undici which is widely used in the repo can only support http & https proxy protocol, + // https://github.com/nodejs/undici/issues/2224 + if (proxyUrl.startsWith('http')) { + return new HttpsProxyAgent(proxyUrl); + } else { + throw new Error('Unsupported proxy type'); + } + } + + shutdown() { + const event = new EndSessionEvent(this.config); + this.logEndSessionEvent(event); + } +} diff --git a/packages/core/src/telemetry/sdk.ts b/packages/core/src/telemetry/sdk.ts index 1167750a..16c378b8 100644 --- a/packages/core/src/telemetry/sdk.ts +++ b/packages/core/src/telemetry/sdk.ts @@ -28,7 +28,7 @@ import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'; import { Config } from '../config/config.js'; import { SERVICE_NAME } from './constants.js'; import { initializeMetrics } from './metrics.js'; -import { ClearcutLogger } from './clearcut-logger/clearcut-logger.js'; +import { QwenLogger } from './qwen-logger/qwen-logger.js'; import { FileLogExporter, FileMetricExporter, @@ -141,7 +141,7 @@ export async function shutdownTelemetry(): Promise { return; } try { - ClearcutLogger.getInstance()?.shutdown(); + QwenLogger.getInstance()?.shutdown(); await sdk.shutdown(); console.log('OpenTelemetry SDK shut down successfully.'); } catch (error) {