From c96852dc56494cbc0eb7457eb7d283858f6de295 Mon Sep 17 00:00:00 2001 From: tanzhenxin Date: Mon, 11 Aug 2025 22:13:56 +0800 Subject: [PATCH 1/3] feat: add usage statistics logging for Qwen integration --- docs/cli/configuration.md | 2 + packages/core/src/config/config.ts | 4 +- .../clearcut-logger/event-metadata-key.ts | 2 +- .../telemetry/integration.test.circular.ts | 15 +- packages/core/src/telemetry/loggers.ts | 22 +- .../src/telemetry/qwen-logger/event-types.ts | 91 ++++ .../src/telemetry/qwen-logger/qwen-logger.ts | 490 ++++++++++++++++++ packages/core/src/telemetry/sdk.ts | 4 +- 8 files changed, 608 insertions(+), 22 deletions(-) create mode 100644 packages/core/src/telemetry/qwen-logger/event-types.ts create mode 100644 packages/core/src/telemetry/qwen-logger/qwen-logger.ts diff --git a/docs/cli/configuration.md b/docs/cli/configuration.md index 52824519..9c16b440 100644 --- a/docs/cli/configuration.md +++ b/docs/cli/configuration.md @@ -523,3 +523,5 @@ You can opt out of usage statistics collection at any time by setting the `usage "usageStatisticsEnabled": false } ``` + +Note: When usage statistics are enabled, events are sent to an Alibaba Cloud RUM collection endpoint. diff --git a/packages/core/src/config/config.ts b/packages/core/src/config/config.ts index 684f1fdd..e854de31 100644 --- a/packages/core/src/config/config.ts +++ b/packages/core/src/config/config.ts @@ -43,7 +43,7 @@ import { DEFAULT_GEMINI_EMBEDDING_MODEL, DEFAULT_GEMINI_FLASH_MODEL, } from './models.js'; -import { ClearcutLogger } from '../telemetry/clearcut-logger/clearcut-logger.js'; +import { QwenLogger } from '../telemetry/qwen-logger/qwen-logger.js'; import { shouldAttemptBrowserLaunch } from '../utils/browser.js'; import { MCPOAuthConfig } from '../mcp/oauth-provider.js'; import { IdeClient } from '../ide/ide-client.js'; @@ -360,7 +360,7 @@ export class Config { } if (this.getUsageStatisticsEnabled()) { - ClearcutLogger.getInstance(this)?.logStartSessionEvent( + QwenLogger.getInstance(this)?.logStartSessionEvent( new StartSessionEvent(this), ); } else { diff --git a/packages/core/src/telemetry/clearcut-logger/event-metadata-key.ts b/packages/core/src/telemetry/clearcut-logger/event-metadata-key.ts index 0fc35894..0c270f63 100644 --- a/packages/core/src/telemetry/clearcut-logger/event-metadata-key.ts +++ b/packages/core/src/telemetry/clearcut-logger/event-metadata-key.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -// Defines valid event metadata keys for Clearcut logging. +// Defines valid event metadata keys for Qwen logging. export enum EventMetadataKey { GEMINI_CLI_KEY_UNKNOWN = 0, diff --git a/packages/core/src/telemetry/integration.test.circular.ts b/packages/core/src/telemetry/integration.test.circular.ts index 958ec3cb..0d639925 100644 --- a/packages/core/src/telemetry/integration.test.circular.ts +++ b/packages/core/src/telemetry/integration.test.circular.ts @@ -9,11 +9,12 @@ */ import { describe, it, expect } from 'vitest'; -import { ClearcutLogger } from './clearcut-logger/clearcut-logger.js'; +import { QwenLogger } from './qwen-logger/qwen-logger.js'; +import { RumEvent } from './qwen-logger/event-types.js'; import { Config } from '../config/config.js'; describe('Circular Reference Integration Test', () => { - it('should handle HttpsProxyAgent-like circular references in clearcut logging', () => { + it('should handle HttpsProxyAgent-like circular references in qwen logging', () => { // Create a mock config with proxy const mockConfig = { getTelemetryEnabled: () => true, @@ -44,16 +45,18 @@ describe('Circular Reference Integration Test', () => { proxyAgentLike.sockets['cloudcode-pa.googleapis.com:443'] = [socketLike]; // Create an event that would contain this circular structure - const problematicEvent = { + const problematicEvent: RumEvent = { + timestamp: Date.now(), + event_type: 'exception', error: new Error('Network error'), function_args: { filePath: '/test/file.txt', httpAgent: proxyAgentLike, // This would cause the circular reference }, - }; + } as RumEvent; - // Test that ClearcutLogger can handle this - const logger = ClearcutLogger.getInstance(mockConfig); + // Test that QwenLogger can handle this + const logger = QwenLogger.getInstance(mockConfig); expect(() => { logger?.enqueueLogEvent(problematicEvent); diff --git a/packages/core/src/telemetry/loggers.ts b/packages/core/src/telemetry/loggers.ts index 2aa0d86a..d93580e5 100644 --- a/packages/core/src/telemetry/loggers.ts +++ b/packages/core/src/telemetry/loggers.ts @@ -39,7 +39,7 @@ import { } from './metrics.js'; import { isTelemetrySdkInitialized } from './sdk.js'; import { uiTelemetryService, UiEvent } from './uiTelemetry.js'; -import { ClearcutLogger } from './clearcut-logger/clearcut-logger.js'; +import { QwenLogger } from './qwen-logger/qwen-logger.js'; import { safeJsonStringify } from '../utils/safeJsonStringify.js'; const shouldLogUserPrompts = (config: Config): boolean => @@ -55,7 +55,7 @@ export function logCliConfiguration( config: Config, event: StartSessionEvent, ): void { - ClearcutLogger.getInstance(config)?.logStartSessionEvent(event); + QwenLogger.getInstance(config)?.logStartSessionEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -84,7 +84,7 @@ export function logCliConfiguration( } export function logUserPrompt(config: Config, event: UserPromptEvent): void { - ClearcutLogger.getInstance(config)?.logNewPromptEvent(event); + QwenLogger.getInstance(config)?.logNewPromptEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -113,7 +113,7 @@ export function logToolCall(config: Config, event: ToolCallEvent): void { 'event.timestamp': new Date().toISOString(), } as UiEvent; uiTelemetryService.addEvent(uiEvent); - ClearcutLogger.getInstance(config)?.logToolCallEvent(event); + QwenLogger.getInstance(config)?.logToolCallEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -146,7 +146,7 @@ export function logToolCall(config: Config, event: ToolCallEvent): void { } export function logApiRequest(config: Config, event: ApiRequestEvent): void { - ClearcutLogger.getInstance(config)?.logApiRequestEvent(event); + QwenLogger.getInstance(config)?.logApiRequestEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -168,7 +168,7 @@ export function logFlashFallback( config: Config, event: FlashFallbackEvent, ): void { - ClearcutLogger.getInstance(config)?.logFlashFallbackEvent(event); + QwenLogger.getInstance(config)?.logFlashFallbackEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -193,7 +193,7 @@ export function logApiError(config: Config, event: ApiErrorEvent): void { 'event.timestamp': new Date().toISOString(), } as UiEvent; uiTelemetryService.addEvent(uiEvent); - ClearcutLogger.getInstance(config)?.logApiErrorEvent(event); + QwenLogger.getInstance(config)?.logApiErrorEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -235,7 +235,7 @@ export function logApiResponse(config: Config, event: ApiResponseEvent): void { 'event.timestamp': new Date().toISOString(), } as UiEvent; uiTelemetryService.addEvent(uiEvent); - ClearcutLogger.getInstance(config)?.logApiResponseEvent(event); + QwenLogger.getInstance(config)?.logApiResponseEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { ...getCommonAttributes(config), @@ -298,7 +298,7 @@ export function logLoopDetected( config: Config, event: LoopDetectedEvent, ): void { - ClearcutLogger.getInstance(config)?.logLoopDetectedEvent(event); + QwenLogger.getInstance(config)?.logLoopDetectedEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -318,7 +318,7 @@ export function logNextSpeakerCheck( config: Config, event: NextSpeakerCheckEvent, ): void { - ClearcutLogger.getInstance(config)?.logNextSpeakerCheck(event); + QwenLogger.getInstance(config)?.logNextSpeakerCheck(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { @@ -339,7 +339,7 @@ export function logSlashCommand( config: Config, event: SlashCommandEvent, ): void { - ClearcutLogger.getInstance(config)?.logSlashCommandEvent(event); + QwenLogger.getInstance(config)?.logSlashCommandEvent(event); if (!isTelemetrySdkInitialized()) return; const attributes: LogAttributes = { diff --git a/packages/core/src/telemetry/qwen-logger/event-types.ts b/packages/core/src/telemetry/qwen-logger/event-types.ts new file mode 100644 index 00000000..14e0de83 --- /dev/null +++ b/packages/core/src/telemetry/qwen-logger/event-types.ts @@ -0,0 +1,91 @@ +// RUM Protocol Data Structures +export interface RumApp { + id: string; + env: string; + version: string; + type: 'cli' | 'extension'; +} + +export interface RumUser { + id: string; +} + +export interface RumSession { + id: string; +} + +export interface RumView { + id: string; + name: string; +} + +export interface RumEvent { + timestamp?: number; + event_type?: 'view' | 'action' | 'exception' | 'resource'; + // [key: string]: unknown; +} + +export interface RumViewEvent extends RumEvent { + type?: string; // View event type: pv, perf + name?: string; // View event name + view_type?: string; // View rendering type + time_spent?: number; // Time spent on current view in ms + snapshots?: string; // View snapshots JSON string, mainly for native apps +} + +export interface RumActionEvent extends RumEvent { + type?: string; // User action type + name?: string; // Semantic name, e.g.: click#checkout + target_name?: string; // Element user interacted with (for auto-collected actions only) + duration?: number; // Action duration in ms + snapshots?: string; // Action snapshots + method_info?: string; // Action callback, e.g.: onClick() +} + +export interface RumExceptionEvent extends RumEvent { + source?: string; // Error source, e.g.: console, event + file?: string; // Error file + type?: string; // Error type: crash, custom, error + subtype?: string; // Secondary classification of error type + name?: string; // Error name + message?: string; // Concise, readable message explaining the event + stack?: string; // Stack trace or supplemental information about the error + caused_by?: string; // Exception cause + line?: number; // Line number where exception occurred + column?: number; // Column number where exception occurred + thread_id?: string; // Thread ID + binary_images?: string; // Error source + snapshots?: string; // Error snapshots +} + +export interface RumResourceEvent extends RumEvent { + type?: string; // Resource type: css, javascript, media, XHR, image, navigation (XHR/fetch will be considered as API) + method?: string; // HTTP request method: POST, GET, etc. + status_code?: string; // Resource status code + message?: string; // Error message content, corresponds to resource.error_msg + url?: string; // Resource URL + name?: string; // Default is URL path part, can be matched by rules or user configuration + provider_type?: string; // Resource provider type: first-party, cdn, ad, analytics + trace_id?: string; // Resource request TraceID + success?: number; // Resource loading success: 1 (default) success, 0 failure + duration?: number; // Total time spent loading resource in ms (responseEnd - redirectStart) + size?: number; // Resource size in bytes, corresponds to decodedBodySize + connect_duration?: number; // Time spent establishing connection to server in ms (connectEnd - connectStart) + ssl_duration?: number; // Time spent on TLS handshake in ms (connectEnd - secureConnectionStart), 0 if no SSL + dns_duration?: number; // Time spent resolving DNS name in ms (domainLookupEnd - domainLookupStart) + redirect_duration?: number; // Time spent on HTTP redirects in ms (redirectEnd - redirectStart) + first_byte_duration?: number; // Time waiting for first byte of response in ms (responseStart - requestStart) + download_duration?: number; // Time spent downloading response in ms (responseEnd - responseStart) + timing_data?: string; // JSON string of PerformanceResourceTiming + trace_data?: string; // Trace information snapshot JSON string + snapshots?: string; // View snapshots, mainly for native apps +} + +export interface RumPayload { + app: RumApp; + user: RumUser; + session: RumSession; + view: RumView; + events: RumEvent[]; + _v: string; +} diff --git a/packages/core/src/telemetry/qwen-logger/qwen-logger.ts b/packages/core/src/telemetry/qwen-logger/qwen-logger.ts new file mode 100644 index 00000000..0f67c5fa --- /dev/null +++ b/packages/core/src/telemetry/qwen-logger/qwen-logger.ts @@ -0,0 +1,490 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Buffer } from 'buffer'; +import * as https from 'https'; +import { HttpsProxyAgent } from 'https-proxy-agent'; +import { randomUUID } from 'crypto'; + +import { + StartSessionEvent, + EndSessionEvent, + UserPromptEvent, + ToolCallEvent, + ApiRequestEvent, + ApiResponseEvent, + ApiErrorEvent, + FlashFallbackEvent, + LoopDetectedEvent, + NextSpeakerCheckEvent, + SlashCommandEvent, + MalformedJsonResponseEvent, +} from '../types.js'; +import { + RumEvent, + RumViewEvent, + RumActionEvent, + RumResourceEvent, + RumExceptionEvent, + RumPayload, +} from './event-types.js'; +// Removed unused EventMetadataKey import +import { Config } from '../../config/config.js'; +import { safeJsonStringify } from '../../utils/safeJsonStringify.js'; +// Removed unused import +import { HttpError, retryWithBackoff } from '../../utils/retry.js'; +import { getInstallationId } from '../../utils/user_id.js'; + +// Usage statistics collection endpoint +const USAGE_STATS_HOSTNAME = 'gb4w8c3ygj-default-sea.rum.aliyuncs.com'; +const USAGE_STATS_PATH = '/'; + +const RUN_APP_ID = 'gb4w8c3ygj@851d5d500f08f92'; + +export interface LogResponse { + nextRequestWaitMs?: number; +} + +// Singleton class for batch posting log events to RUM. When a new event comes in, the elapsed time +// is checked and events are flushed to RUM if at least a minute has passed since the last flush. +export class QwenLogger { + private static instance: QwenLogger; + private config?: Config; + private readonly events: RumEvent[] = []; + private last_flush_time: number = Date.now(); + private flush_interval_ms: number = 1000 * 60; // Wait at least a minute before flushing events. + private userId: string; + private sessionId: string; + private viewId: string; + private isFlushInProgress: boolean = false; + + private constructor(config?: Config) { + this.config = config; + this.userId = this.generateUserId(); + this.sessionId = this.config?.getSessionId() ?? ''; + this.viewId = randomUUID(); + } + + private generateUserId(): string { + // Use installation ID as user ID for consistency + return `user-${getInstallationId()}`; + } + + static getInstance(config?: Config): QwenLogger | undefined { + if (config === undefined || !config?.getUsageStatisticsEnabled()) + return undefined; + if (!QwenLogger.instance) { + QwenLogger.instance = new QwenLogger(config); + } + return QwenLogger.instance; + } + + enqueueLogEvent(event: RumEvent): void { + this.events.push(event); + } + + createRumEvent( + eventType: 'view' | 'action' | 'exception' | 'resource', + properties: RumEvent, + ): RumEvent { + return { + timestamp: Date.now(), + event_type: eventType, + ...properties, + }; + } + + createViewEvent(properties: RumViewEvent): RumEvent { + return this.createRumEvent('view', properties); + } + + createActionEvent(properties: RumActionEvent): RumEvent { + return this.createRumEvent('action', properties); + } + + createResourceEvent(properties: RumResourceEvent): RumEvent { + return this.createRumEvent('resource', properties); + } + + createExceptionEvent(properties: RumExceptionEvent): RumEvent { + return this.createRumEvent('exception', properties); + } + + createRumPayload(): RumPayload { + const version = process.env.CLI_VERSION || process.version; + + return { + app: { + id: RUN_APP_ID, + env: process.env.DEBUG ? 'dev' : 'prod', + version, + type: 'cli', + }, + user: { + id: this.userId, + }, + session: { + id: this.sessionId, + }, + view: { + id: this.viewId, + name: 'qwen-code-cli', + }, + events: [...this.events], + _v: `qwen-code@${version}`, + }; + } + + flushIfNeeded(): void { + if (Date.now() - this.last_flush_time < this.flush_interval_ms) { + return; + } + + // Prevent concurrent flush operations + if (this.isFlushInProgress) { + return; + } + + this.flushToRum().catch((error) => { + console.debug('Error flushing to RUM:', error); + }); + } + + async flushToRum(): Promise { + if (this.config?.getDebugMode()) { + console.log('Flushing log events to RUM.'); + } + if (this.events.length === 0) { + return {}; + } + + this.isFlushInProgress = true; + + const rumPayload = this.createRumPayload(); + const flushFn = () => + new Promise((resolve, reject) => { + const body = safeJsonStringify(rumPayload); + const options = { + hostname: USAGE_STATS_HOSTNAME, + path: USAGE_STATS_PATH, + method: 'POST', + headers: { + 'Content-Length': Buffer.byteLength(body), + 'Content-Type': 'text/plain;charset=UTF-8', + }, + }; + const bufs: Buffer[] = []; + const req = https.request( + { + ...options, + agent: this.getProxyAgent(), + }, + (res) => { + if ( + res.statusCode && + (res.statusCode < 200 || res.statusCode >= 300) + ) { + const err: HttpError = new Error( + `Request failed with status ${res.statusCode}`, + ); + err.status = res.statusCode; + res.resume(); + return reject(err); + } + res.on('data', (buf) => bufs.push(buf)); + res.on('end', () => resolve(Buffer.concat(bufs))); + }, + ); + req.on('error', reject); + req.end(body); + }); + + try { + await retryWithBackoff(flushFn, { + maxAttempts: 3, + initialDelayMs: 200, + shouldRetry: (err: unknown) => { + if (!(err instanceof Error)) return false; + const status = (err as HttpError).status as number | undefined; + // If status is not available, it's likely a network error + if (status === undefined) return true; + + // Retry on 429 (Too many Requests) and 5xx server errors. + return status === 429 || (status >= 500 && status < 600); + }, + }); + + this.events.splice(0, this.events.length); + this.last_flush_time = Date.now(); + return {}; + } catch (error) { + if (this.config?.getDebugMode()) { + console.error('RUM flush failed after multiple retries.', error); + } + return {}; + } finally { + this.isFlushInProgress = false; + } + } + + // Visible for testing. Decodes protobuf-encoded response from Qwen server. + decodeLogResponse(buf: Buffer): LogResponse | undefined { + // TODO(obrienowen): return specific errors to facilitate debugging. + if (buf.length < 1) { + return undefined; + } + + // The first byte of the buffer is `field<<3 | type`. We're looking for field + // 1, with type varint, represented by type=0. If the first byte isn't 8, that + // means field 1 is missing or the message is corrupted. Either way, we return + // undefined. + if (buf.readUInt8(0) !== 8) { + return undefined; + } + + let ms = BigInt(0); + let cont = true; + + // In each byte, the most significant bit is the continuation bit. If it's + // set, we keep going. The lowest 7 bits, are data bits. They are concatenated + // in reverse order to form the final number. + for (let i = 1; cont && i < buf.length; i++) { + const byte = buf.readUInt8(i); + ms |= BigInt(byte & 0x7f) << BigInt(7 * (i - 1)); + cont = (byte & 0x80) !== 0; + } + + if (cont) { + // We have fallen off the buffer without seeing a terminating byte. The + // message is corrupted. + return undefined; + } + + const returnVal = { + nextRequestWaitMs: Number(ms), + }; + return returnVal; + } + + logStartSessionEvent(event: StartSessionEvent): void { + const applicationEvent = this.createViewEvent({ + type: 'session', + name: 'session_start', + snapshots: JSON.stringify({ + model: event.model, + embedding_model: event.embedding_model, + sandbox_enabled: event.sandbox_enabled, + core_tools_enabled: event.core_tools_enabled, + approval_mode: event.approval_mode, + api_key_enabled: event.api_key_enabled, + vertex_ai_enabled: event.vertex_ai_enabled, + debug_enabled: event.debug_enabled, + mcp_servers: event.mcp_servers, + telemetry_enabled: event.telemetry_enabled, + telemetry_log_user_prompts_enabled: + event.telemetry_log_user_prompts_enabled, + file_filtering_respect_git_ignore: + event.file_filtering_respect_git_ignore, + }), + }); + + // Flush start event immediately + this.enqueueLogEvent(applicationEvent); + this.flushToRum().catch((error: unknown) => { + console.debug('Error flushing to RUM:', error); + }); + } + + logNewPromptEvent(event: UserPromptEvent): void { + const rumEvent = this.createActionEvent({ + type: 'user_prompt', + name: 'user_prompt', + snapshots: JSON.stringify({ + prompt_length: event.prompt_length, + prompt_id: event.prompt_id, + auth_type: event.auth_type, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logToolCallEvent(event: ToolCallEvent): void { + const rumEvent = this.createActionEvent({ + type: 'tool_call', + name: `tool_call#${event.function_name}`, + snapshots: JSON.stringify({ + function_name: event.function_name, + prompt_id: event.prompt_id, + decision: event.decision, + success: event.success, + duration_ms: event.duration_ms, + error: event.error, + error_type: event.error_type, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logApiRequestEvent(event: ApiRequestEvent): void { + const rumEvent = this.createResourceEvent({ + type: 'api', + name: 'api_request', + snapshots: JSON.stringify({ + model: event.model, + prompt_id: event.prompt_id, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logApiResponseEvent(event: ApiResponseEvent): void { + const rumEvent = this.createResourceEvent({ + type: 'api', + name: 'api_response', + status_code: event.status_code?.toString() ?? '', + duration: event.duration_ms, + success: event.status_code === 200 ? 1 : 0, + message: event.error, + snapshots: JSON.stringify({ + model: event.model, + prompt_id: event.prompt_id, + input_token_count: event.input_token_count, + output_token_count: event.output_token_count, + cached_content_token_count: event.cached_content_token_count, + thoughts_token_count: event.thoughts_token_count, + tool_token_count: event.tool_token_count, + auth_type: event.auth_type, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logApiErrorEvent(event: ApiErrorEvent): void { + const rumEvent = this.createExceptionEvent({ + type: 'error', + subtype: 'api_error', + message: event.error, + snapshots: JSON.stringify({ + model: event.model, + prompt_id: event.prompt_id, + error_type: event.error_type, + status_code: event.status_code, + duration_ms: event.duration_ms, + auth_type: event.auth_type, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logFlashFallbackEvent(event: FlashFallbackEvent): void { + const rumEvent = this.createActionEvent({ + type: 'fallback', + name: 'flash_fallback', + snapshots: JSON.stringify({ + auth_type: event.auth_type, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logLoopDetectedEvent(event: LoopDetectedEvent): void { + const rumEvent = this.createExceptionEvent({ + type: 'error', + subtype: 'loop_detected', + snapshots: JSON.stringify({ + prompt_id: event.prompt_id, + loop_type: event.loop_type, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logNextSpeakerCheck(event: NextSpeakerCheckEvent): void { + const rumEvent = this.createActionEvent({ + type: 'check', + name: 'next_speaker_check', + snapshots: JSON.stringify({ + prompt_id: event.prompt_id, + finish_reason: event.finish_reason, + result: event.result, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logSlashCommandEvent(event: SlashCommandEvent): void { + const rumEvent = this.createActionEvent({ + type: 'command', + name: 'slash_command', + snapshots: JSON.stringify({ + command: event.command, + subcommand: event.subcommand, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logMalformedJsonResponseEvent(event: MalformedJsonResponseEvent): void { + const rumEvent = this.createExceptionEvent({ + type: 'error', + subtype: 'malformed_json_response', + snapshots: JSON.stringify({ + model: event.model, + }), + }); + + this.enqueueLogEvent(rumEvent); + this.flushIfNeeded(); + } + + logEndSessionEvent(_event: EndSessionEvent): void { + const applicationEvent = this.createViewEvent({ + type: 'session', + name: 'session_end', + }); + + // Flush immediately on session end. + this.enqueueLogEvent(applicationEvent); + this.flushToRum().catch((error: unknown) => { + console.debug('Error flushing to RUM:', error); + }); + } + + getProxyAgent() { + const proxyUrl = this.config?.getProxy(); + if (!proxyUrl) return undefined; + // undici which is widely used in the repo can only support http & https proxy protocol, + // https://github.com/nodejs/undici/issues/2224 + if (proxyUrl.startsWith('http')) { + return new HttpsProxyAgent(proxyUrl); + } else { + throw new Error('Unsupported proxy type'); + } + } + + shutdown() { + const event = new EndSessionEvent(this.config); + this.logEndSessionEvent(event); + } +} diff --git a/packages/core/src/telemetry/sdk.ts b/packages/core/src/telemetry/sdk.ts index 1167750a..16c378b8 100644 --- a/packages/core/src/telemetry/sdk.ts +++ b/packages/core/src/telemetry/sdk.ts @@ -28,7 +28,7 @@ import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'; import { Config } from '../config/config.js'; import { SERVICE_NAME } from './constants.js'; import { initializeMetrics } from './metrics.js'; -import { ClearcutLogger } from './clearcut-logger/clearcut-logger.js'; +import { QwenLogger } from './qwen-logger/qwen-logger.js'; import { FileLogExporter, FileMetricExporter, @@ -141,7 +141,7 @@ export async function shutdownTelemetry(): Promise { return; } try { - ClearcutLogger.getInstance()?.shutdown(); + QwenLogger.getInstance()?.shutdown(); await sdk.shutdown(); console.log('OpenTelemetry SDK shut down successfully.'); } catch (error) { From 807844fb575e94654b98f3f4d5cb9b2734f9ed9b Mon Sep 17 00:00:00 2001 From: tanzhenxin Date: Tue, 12 Aug 2025 12:16:38 +0800 Subject: [PATCH 2/3] feat: implement usage stats logging with telemetry refactoring --- .../__tests__/openaiTimeoutHandling.test.ts | 19 +- packages/core/src/core/geminiChat.ts | 24 +- .../src/core/openaiContentGenerator.test.ts | 1 + .../core/src/core/openaiContentGenerator.ts | 124 +++------ .../src/services/loopDetectionService.test.ts | 2 + .../telemetry/integration.test.circular.ts | 2 + packages/core/src/telemetry/loggers.test.ts | 3 + .../src/telemetry/qwen-logger/event-types.ts | 16 +- .../src/telemetry/qwen-logger/qwen-logger.ts | 241 +++++++++--------- packages/core/src/telemetry/sdk.ts | 1 - packages/core/src/telemetry/types.ts | 6 + packages/core/test-setup.ts | 6 + 12 files changed, 203 insertions(+), 242 deletions(-) diff --git a/packages/core/src/core/__tests__/openaiTimeoutHandling.test.ts b/packages/core/src/core/__tests__/openaiTimeoutHandling.test.ts index 702c1326..158b0c33 100644 --- a/packages/core/src/core/__tests__/openaiTimeoutHandling.test.ts +++ b/packages/core/src/core/__tests__/openaiTimeoutHandling.test.ts @@ -15,6 +15,7 @@ vi.mock('openai'); // Mock logger modules vi.mock('../../telemetry/loggers.js', () => ({ logApiResponse: vi.fn(), + logApiError: vi.fn(), })); vi.mock('../../utils/openaiLogger.js', () => ({ @@ -290,28 +291,18 @@ describe('OpenAIContentGenerator Timeout Handling', () => { }); describe('token estimation on timeout', () => { - it('should estimate tokens even when request times out', async () => { + it('should surface a clear timeout error when request times out', async () => { const timeoutError = new Error('Request timeout'); mockOpenAIClient.chat.completions.create.mockRejectedValue(timeoutError); - // Mock countTokens to return a value - const mockCountTokens = vi.spyOn(generator, 'countTokens'); - mockCountTokens.mockResolvedValue({ totalTokens: 100 }); - const request = { contents: [{ role: 'user' as const, parts: [{ text: 'Hello world' }] }], model: 'gpt-4', }; - try { - await generator.generateContent(request, 'test-prompt-id'); - } catch (_error) { - // Verify that countTokens was called for estimation - expect(mockCountTokens).toHaveBeenCalledWith({ - contents: request.contents, - model: 'gpt-4', - }); - } + await expect( + generator.generateContent(request, 'test-prompt-id'), + ).rejects.toThrow(/Request timeout after \d+s/); }); it('should fall back to character-based estimation if countTokens fails', async () => { diff --git a/packages/core/src/core/geminiChat.ts b/packages/core/src/core/geminiChat.ts index 50f15b72..c66237c4 100644 --- a/packages/core/src/core/geminiChat.ts +++ b/packages/core/src/core/geminiChat.ts @@ -158,14 +158,23 @@ export class GeminiChat { prompt_id: string, usageMetadata?: GenerateContentResponseUsageMetadata, responseText?: string, + responseId?: string, ): Promise { + const authType = this.config.getContentGeneratorConfig()?.authType; + + // Don't log API responses for openaiContentGenerator + if (authType === AuthType.QWEN_OAUTH || authType === AuthType.USE_OPENAI) { + return; + } + logApiResponse( this.config, new ApiResponseEvent( + responseId || `gemini-${Date.now()}`, this.config.getModel(), durationMs, prompt_id, - this.config.getContentGeneratorConfig()?.authType, + authType, usageMetadata, responseText, ), @@ -176,18 +185,27 @@ export class GeminiChat { durationMs: number, error: unknown, prompt_id: string, + responseId?: string, ): void { const errorMessage = error instanceof Error ? error.message : String(error); const errorType = error instanceof Error ? error.name : 'unknown'; + const authType = this.config.getContentGeneratorConfig()?.authType; + + // Don't log API errors for openaiContentGenerator + if (authType === AuthType.QWEN_OAUTH || authType === AuthType.USE_OPENAI) { + return; + } + logApiError( this.config, new ApiErrorEvent( + responseId, this.config.getModel(), errorMessage, durationMs, prompt_id, - this.config.getContentGeneratorConfig()?.authType, + authType, errorType, ), ); @@ -320,6 +338,7 @@ export class GeminiChat { prompt_id, response.usageMetadata, JSON.stringify(response), + response.responseId, ); this.sendPromise = (async () => { @@ -563,6 +582,7 @@ export class GeminiChat { prompt_id, this.getFinalUsageMetadata(chunks), JSON.stringify(chunks), + chunks[chunks.length - 1]?.responseId, ); } this.recordHistory(inputContent, outputContent); diff --git a/packages/core/src/core/openaiContentGenerator.test.ts b/packages/core/src/core/openaiContentGenerator.test.ts index 6dc0690a..a24ce622 100644 --- a/packages/core/src/core/openaiContentGenerator.test.ts +++ b/packages/core/src/core/openaiContentGenerator.test.ts @@ -23,6 +23,7 @@ vi.mock('openai'); // Mock logger modules vi.mock('../telemetry/loggers.js', () => ({ logApiResponse: vi.fn(), + logApiError: vi.fn(), })); vi.mock('../utils/openaiLogger.js', () => ({ diff --git a/packages/core/src/core/openaiContentGenerator.ts b/packages/core/src/core/openaiContentGenerator.ts index 4e9e73da..ffb5b1a3 100644 --- a/packages/core/src/core/openaiContentGenerator.ts +++ b/packages/core/src/core/openaiContentGenerator.ts @@ -22,8 +22,8 @@ import { } from '@google/genai'; import { ContentGenerator } from './contentGenerator.js'; import OpenAI from 'openai'; -import { logApiResponse } from '../telemetry/loggers.js'; -import { ApiResponseEvent } from '../telemetry/types.js'; +import { logApiError, logApiResponse } from '../telemetry/loggers.js'; +import { ApiErrorEvent, ApiResponseEvent } from '../telemetry/types.js'; import { Config } from '../config/config.js'; import { openaiLogger } from '../utils/openaiLogger.js'; @@ -226,6 +226,7 @@ export class OpenAIContentGenerator implements ContentGenerator { // Log API response event for UI telemetry const responseEvent = new ApiResponseEvent( + response.responseId || 'unknown', this.model, durationMs, userPromptId, @@ -254,41 +255,21 @@ export class OpenAIContentGenerator implements ContentGenerator { ? error.message : String(error); - // Estimate token usage even when there's an error - // This helps track costs and usage even for failed requests - let estimatedUsage; - try { - const tokenCountResult = await this.countTokens({ - contents: request.contents, - model: this.model, - }); - estimatedUsage = { - promptTokenCount: tokenCountResult.totalTokens, - candidatesTokenCount: 0, // No completion tokens since request failed - totalTokenCount: tokenCountResult.totalTokens, - }; - } catch { - // If token counting also fails, provide a minimal estimate - const contentStr = JSON.stringify(request.contents); - const estimatedTokens = Math.ceil(contentStr.length / 4); - estimatedUsage = { - promptTokenCount: estimatedTokens, - candidatesTokenCount: 0, - totalTokenCount: estimatedTokens, - }; - } - - // Log API error event for UI telemetry with estimated usage - const errorEvent = new ApiResponseEvent( + // Log API error event for UI telemetry + const errorEvent = new ApiErrorEvent( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (error as any).requestID || 'unknown', this.model, + errorMessage, durationMs, userPromptId, this.config.getContentGeneratorConfig()?.authType, - estimatedUsage, - undefined, - errorMessage, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (error as any).type, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (error as any).code, ); - logApiResponse(this.config, errorEvent); + logApiError(this.config, errorEvent); // Log error interaction if enabled if (this.config.getContentGeneratorConfig()?.enableOpenAILogging) { @@ -380,6 +361,7 @@ export class OpenAIContentGenerator implements ContentGenerator { // Log API response event for UI telemetry const responseEvent = new ApiResponseEvent( + responses[responses.length - 1]?.responseId || 'unknown', this.model, durationMs, userPromptId, @@ -411,40 +393,21 @@ export class OpenAIContentGenerator implements ContentGenerator { ? error.message : String(error); - // Estimate token usage even when there's an error in streaming - let estimatedUsage; - try { - const tokenCountResult = await this.countTokens({ - contents: request.contents, - model: this.model, - }); - estimatedUsage = { - promptTokenCount: tokenCountResult.totalTokens, - candidatesTokenCount: 0, // No completion tokens since request failed - totalTokenCount: tokenCountResult.totalTokens, - }; - } catch { - // If token counting also fails, provide a minimal estimate - const contentStr = JSON.stringify(request.contents); - const estimatedTokens = Math.ceil(contentStr.length / 4); - estimatedUsage = { - promptTokenCount: estimatedTokens, - candidatesTokenCount: 0, - totalTokenCount: estimatedTokens, - }; - } - - // Log API error event for UI telemetry with estimated usage - const errorEvent = new ApiResponseEvent( + // Log API error event for UI telemetry + const errorEvent = new ApiErrorEvent( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (error as any).requestID || 'unknown', this.model, + errorMessage, durationMs, userPromptId, this.config.getContentGeneratorConfig()?.authType, - estimatedUsage, - undefined, - errorMessage, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (error as any).type, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (error as any).code, ); - logApiResponse(this.config, errorEvent); + logApiError(this.config, errorEvent); // Log error interaction if enabled if (this.config.getContentGeneratorConfig()?.enableOpenAILogging) { @@ -484,40 +447,21 @@ export class OpenAIContentGenerator implements ContentGenerator { ? error.message : String(error); - // Estimate token usage even when there's an error in streaming setup - let estimatedUsage; - try { - const tokenCountResult = await this.countTokens({ - contents: request.contents, - model: this.model, - }); - estimatedUsage = { - promptTokenCount: tokenCountResult.totalTokens, - candidatesTokenCount: 0, // No completion tokens since request failed - totalTokenCount: tokenCountResult.totalTokens, - }; - } catch { - // If token counting also fails, provide a minimal estimate - const contentStr = JSON.stringify(request.contents); - const estimatedTokens = Math.ceil(contentStr.length / 4); - estimatedUsage = { - promptTokenCount: estimatedTokens, - candidatesTokenCount: 0, - totalTokenCount: estimatedTokens, - }; - } - - // Log API error event for UI telemetry with estimated usage - const errorEvent = new ApiResponseEvent( + // Log API error event for UI telemetry + const errorEvent = new ApiErrorEvent( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (error as any).requestID || 'unknown', this.model, + errorMessage, durationMs, userPromptId, this.config.getContentGeneratorConfig()?.authType, - estimatedUsage, - undefined, - errorMessage, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (error as any).type, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (error as any).code, ); - logApiResponse(this.config, errorEvent); + logApiError(this.config, errorEvent); // Allow subclasses to suppress error logging for specific scenarios if (!this.shouldSuppressErrorLogging(error, request)) { diff --git a/packages/core/src/services/loopDetectionService.test.ts b/packages/core/src/services/loopDetectionService.test.ts index 2ec32ae7..f3fe3f79 100644 --- a/packages/core/src/services/loopDetectionService.test.ts +++ b/packages/core/src/services/loopDetectionService.test.ts @@ -19,6 +19,8 @@ import { LoopDetectionService } from './loopDetectionService.js'; vi.mock('../telemetry/loggers.js', () => ({ logLoopDetected: vi.fn(), + logApiError: vi.fn(), + logApiResponse: vi.fn(), })); const TOOL_CALL_LOOP_THRESHOLD = 5; diff --git a/packages/core/src/telemetry/integration.test.circular.ts b/packages/core/src/telemetry/integration.test.circular.ts index 0d639925..614f5e02 100644 --- a/packages/core/src/telemetry/integration.test.circular.ts +++ b/packages/core/src/telemetry/integration.test.circular.ts @@ -48,6 +48,8 @@ describe('Circular Reference Integration Test', () => { const problematicEvent: RumEvent = { timestamp: Date.now(), event_type: 'exception', + type: 'error', + name: 'api_error', error: new Error('Network error'), function_args: { filePath: '/test/file.txt', diff --git a/packages/core/src/telemetry/loggers.test.ts b/packages/core/src/telemetry/loggers.test.ts index 3d8116cc..d37227e5 100644 --- a/packages/core/src/telemetry/loggers.test.ts +++ b/packages/core/src/telemetry/loggers.test.ts @@ -212,6 +212,7 @@ describe('loggers', () => { toolUsePromptTokenCount: 2, }; const event = new ApiResponseEvent( + 'test-response-id', 'test-model', 100, 'prompt-id-1', @@ -229,6 +230,7 @@ describe('loggers', () => { 'event.name': EVENT_API_RESPONSE, 'event.timestamp': '2025-01-01T00:00:00.000Z', [SemanticAttributes.HTTP_STATUS_CODE]: 200, + response_id: 'test-response-id', model: 'test-model', status_code: 200, duration_ms: 100, @@ -275,6 +277,7 @@ describe('loggers', () => { toolUsePromptTokenCount: 2, }; const event = new ApiResponseEvent( + 'test-response-id-2', 'test-model', 100, 'prompt-id-1', diff --git a/packages/core/src/telemetry/qwen-logger/event-types.ts b/packages/core/src/telemetry/qwen-logger/event-types.ts index 14e0de83..1549d2ba 100644 --- a/packages/core/src/telemetry/qwen-logger/event-types.ts +++ b/packages/core/src/telemetry/qwen-logger/event-types.ts @@ -22,32 +22,28 @@ export interface RumView { export interface RumEvent { timestamp?: number; event_type?: 'view' | 'action' | 'exception' | 'resource'; + type: string; // Event type + name: string; // Event name + snapshots?: string; // JSON string of event snapshots + properties?: Record; // [key: string]: unknown; } export interface RumViewEvent extends RumEvent { - type?: string; // View event type: pv, perf - name?: string; // View event name view_type?: string; // View rendering type time_spent?: number; // Time spent on current view in ms - snapshots?: string; // View snapshots JSON string, mainly for native apps } export interface RumActionEvent extends RumEvent { - type?: string; // User action type - name?: string; // Semantic name, e.g.: click#checkout target_name?: string; // Element user interacted with (for auto-collected actions only) duration?: number; // Action duration in ms - snapshots?: string; // Action snapshots method_info?: string; // Action callback, e.g.: onClick() } export interface RumExceptionEvent extends RumEvent { source?: string; // Error source, e.g.: console, event file?: string; // Error file - type?: string; // Error type: crash, custom, error subtype?: string; // Secondary classification of error type - name?: string; // Error name message?: string; // Concise, readable message explaining the event stack?: string; // Stack trace or supplemental information about the error caused_by?: string; // Exception cause @@ -55,16 +51,13 @@ export interface RumExceptionEvent extends RumEvent { column?: number; // Column number where exception occurred thread_id?: string; // Thread ID binary_images?: string; // Error source - snapshots?: string; // Error snapshots } export interface RumResourceEvent extends RumEvent { - type?: string; // Resource type: css, javascript, media, XHR, image, navigation (XHR/fetch will be considered as API) method?: string; // HTTP request method: POST, GET, etc. status_code?: string; // Resource status code message?: string; // Error message content, corresponds to resource.error_msg url?: string; // Resource URL - name?: string; // Default is URL path part, can be matched by rules or user configuration provider_type?: string; // Resource provider type: first-party, cdn, ad, analytics trace_id?: string; // Resource request TraceID success?: number; // Resource loading success: 1 (default) success, 0 failure @@ -78,7 +71,6 @@ export interface RumResourceEvent extends RumEvent { download_duration?: number; // Time spent downloading response in ms (responseEnd - responseStart) timing_data?: string; // JSON string of PerformanceResourceTiming trace_data?: string; // Trace information snapshot JSON string - snapshots?: string; // View snapshots, mainly for native apps } export interface RumPayload { diff --git a/packages/core/src/telemetry/qwen-logger/qwen-logger.ts b/packages/core/src/telemetry/qwen-logger/qwen-logger.ts index 0f67c5fa..e03f45cd 100644 --- a/packages/core/src/telemetry/qwen-logger/qwen-logger.ts +++ b/packages/core/src/telemetry/qwen-logger/qwen-logger.ts @@ -60,11 +60,15 @@ export class QwenLogger { private sessionId: string; private viewId: string; private isFlushInProgress: boolean = false; + private isShutdown: boolean = false; private constructor(config?: Config) { this.config = config; this.userId = this.generateUserId(); - this.sessionId = this.config?.getSessionId() ?? ''; + this.sessionId = + typeof this.config?.getSessionId === 'function' + ? this.config.getSessionId() + : ''; this.viewId = randomUUID(); } @@ -79,6 +83,9 @@ export class QwenLogger { if (!QwenLogger.instance) { QwenLogger.instance = new QwenLogger(config); } + + process.on('exit', QwenLogger.instance.shutdown.bind(QwenLogger.instance)); + return QwenLogger.instance; } @@ -88,29 +95,49 @@ export class QwenLogger { createRumEvent( eventType: 'view' | 'action' | 'exception' | 'resource', - properties: RumEvent, + type: string, + name: string, + properties: Partial, ): RumEvent { return { timestamp: Date.now(), event_type: eventType, - ...properties, + type, + name, + ...(properties || {}), }; } - createViewEvent(properties: RumViewEvent): RumEvent { - return this.createRumEvent('view', properties); + createViewEvent( + type: string, + name: string, + properties: Partial, + ): RumEvent { + return this.createRumEvent('view', type, name, properties); } - createActionEvent(properties: RumActionEvent): RumEvent { - return this.createRumEvent('action', properties); + createActionEvent( + type: string, + name: string, + properties: Partial, + ): RumEvent { + return this.createRumEvent('action', type, name, properties); } - createResourceEvent(properties: RumResourceEvent): RumEvent { - return this.createRumEvent('resource', properties); + createResourceEvent( + type: string, + name: string, + properties: Partial, + ): RumEvent { + return this.createRumEvent('resource', type, name, properties); } - createExceptionEvent(properties: RumExceptionEvent): RumEvent { - return this.createRumEvent('exception', properties); + createExceptionEvent( + type: string, + name: string, + properties: Partial, + ): RumEvent { + return this.createRumEvent('exception', type, name, properties); } createRumPayload(): RumPayload { @@ -230,51 +257,12 @@ export class QwenLogger { } } - // Visible for testing. Decodes protobuf-encoded response from Qwen server. - decodeLogResponse(buf: Buffer): LogResponse | undefined { - // TODO(obrienowen): return specific errors to facilitate debugging. - if (buf.length < 1) { - return undefined; - } - - // The first byte of the buffer is `field<<3 | type`. We're looking for field - // 1, with type varint, represented by type=0. If the first byte isn't 8, that - // means field 1 is missing or the message is corrupted. Either way, we return - // undefined. - if (buf.readUInt8(0) !== 8) { - return undefined; - } - - let ms = BigInt(0); - let cont = true; - - // In each byte, the most significant bit is the continuation bit. If it's - // set, we keep going. The lowest 7 bits, are data bits. They are concatenated - // in reverse order to form the final number. - for (let i = 1; cont && i < buf.length; i++) { - const byte = buf.readUInt8(i); - ms |= BigInt(byte & 0x7f) << BigInt(7 * (i - 1)); - cont = (byte & 0x80) !== 0; - } - - if (cont) { - // We have fallen off the buffer without seeing a terminating byte. The - // message is corrupted. - return undefined; - } - - const returnVal = { - nextRequestWaitMs: Number(ms), - }; - return returnVal; - } - logStartSessionEvent(event: StartSessionEvent): void { - const applicationEvent = this.createViewEvent({ - type: 'session', - name: 'session_start', - snapshots: JSON.stringify({ + const applicationEvent = this.createViewEvent('session', 'session_start', { + properties: { model: event.model, + }, + snapshots: JSON.stringify({ embedding_model: event.embedding_model, sandbox_enabled: event.sandbox_enabled, core_tools_enabled: event.core_tools_enabled, @@ -286,8 +274,6 @@ export class QwenLogger { telemetry_enabled: event.telemetry_enabled, telemetry_log_user_prompts_enabled: event.telemetry_log_user_prompts_enabled, - file_filtering_respect_git_ignore: - event.file_filtering_respect_git_ignore, }), }); @@ -299,13 +285,13 @@ export class QwenLogger { } logNewPromptEvent(event: UserPromptEvent): void { - const rumEvent = this.createActionEvent({ - type: 'user_prompt', - name: 'user_prompt', + const rumEvent = this.createActionEvent('user_prompt', 'user_prompt', { + properties: { + auth_type: event.auth_type, + prompt_id: event.prompt_id, + }, snapshots: JSON.stringify({ prompt_length: event.prompt_length, - prompt_id: event.prompt_id, - auth_type: event.auth_type, }), }); @@ -314,55 +300,62 @@ export class QwenLogger { } logToolCallEvent(event: ToolCallEvent): void { - const rumEvent = this.createActionEvent({ - type: 'tool_call', - name: `tool_call#${event.function_name}`, - snapshots: JSON.stringify({ - function_name: event.function_name, - prompt_id: event.prompt_id, - decision: event.decision, - success: event.success, - duration_ms: event.duration_ms, - error: event.error, - error_type: event.error_type, - }), - }); + const rumEvent = this.createActionEvent( + 'tool_call', + `tool_call#${event.function_name}`, + { + properties: { + prompt_id: event.prompt_id, + }, + snapshots: JSON.stringify({ + function_name: event.function_name, + decision: event.decision, + success: event.success, + duration_ms: event.duration_ms, + error: event.error, + error_type: event.error_type, + }), + }, + ); this.enqueueLogEvent(rumEvent); this.flushIfNeeded(); } logApiRequestEvent(event: ApiRequestEvent): void { - const rumEvent = this.createResourceEvent({ - type: 'api', - name: 'api_request', - snapshots: JSON.stringify({ - model: event.model, - prompt_id: event.prompt_id, - }), - }); + // ignore for now + console.log('logApiRequestEvent', event); + return; - this.enqueueLogEvent(rumEvent); - this.flushIfNeeded(); + // const rumEvent = this.createResourceEvent('api', 'api_request', { + // properties: { + // model: event.model, + // prompt_id: event.prompt_id, + // }, + // }); + + // this.enqueueLogEvent(rumEvent); + // this.flushIfNeeded(); } logApiResponseEvent(event: ApiResponseEvent): void { - const rumEvent = this.createResourceEvent({ - type: 'api', - name: 'api_response', + const rumEvent = this.createResourceEvent('api', 'api_response', { status_code: event.status_code?.toString() ?? '', duration: event.duration_ms, - success: event.status_code === 200 ? 1 : 0, + success: 1, message: event.error, - snapshots: JSON.stringify({ + trace_id: event.response_id, + properties: { + auth_type: event.auth_type, model: event.model, prompt_id: event.prompt_id, + }, + snapshots: JSON.stringify({ input_token_count: event.input_token_count, output_token_count: event.output_token_count, cached_content_token_count: event.cached_content_token_count, thoughts_token_count: event.thoughts_token_count, tool_token_count: event.tool_token_count, - auth_type: event.auth_type, }), }); @@ -371,17 +364,19 @@ export class QwenLogger { } logApiErrorEvent(event: ApiErrorEvent): void { - const rumEvent = this.createExceptionEvent({ - type: 'error', - subtype: 'api_error', + const rumEvent = this.createResourceEvent('api', 'api_error', { + status_code: event.status_code?.toString() ?? '', + duration: event.duration_ms, + success: 0, message: event.error, - snapshots: JSON.stringify({ + trace_id: event.response_id, + properties: { + auth_type: event.auth_type, model: event.model, prompt_id: event.prompt_id, + }, + snapshots: JSON.stringify({ error_type: event.error_type, - status_code: event.status_code, - duration_ms: event.duration_ms, - auth_type: event.auth_type, }), }); @@ -390,12 +385,10 @@ export class QwenLogger { } logFlashFallbackEvent(event: FlashFallbackEvent): void { - const rumEvent = this.createActionEvent({ - type: 'fallback', - name: 'flash_fallback', - snapshots: JSON.stringify({ + const rumEvent = this.createActionEvent('fallback', 'flash_fallback', { + properties: { auth_type: event.auth_type, - }), + }, }); this.enqueueLogEvent(rumEvent); @@ -403,11 +396,12 @@ export class QwenLogger { } logLoopDetectedEvent(event: LoopDetectedEvent): void { - const rumEvent = this.createExceptionEvent({ - type: 'error', + const rumEvent = this.createExceptionEvent('error', 'loop_detected', { subtype: 'loop_detected', - snapshots: JSON.stringify({ + properties: { prompt_id: event.prompt_id, + }, + snapshots: JSON.stringify({ loop_type: event.loop_type, }), }); @@ -417,11 +411,11 @@ export class QwenLogger { } logNextSpeakerCheck(event: NextSpeakerCheckEvent): void { - const rumEvent = this.createActionEvent({ - type: 'check', - name: 'next_speaker_check', - snapshots: JSON.stringify({ + const rumEvent = this.createActionEvent('check', 'next_speaker_check', { + properties: { prompt_id: event.prompt_id, + }, + snapshots: JSON.stringify({ finish_reason: event.finish_reason, result: event.result, }), @@ -432,9 +426,7 @@ export class QwenLogger { } logSlashCommandEvent(event: SlashCommandEvent): void { - const rumEvent = this.createActionEvent({ - type: 'command', - name: 'slash_command', + const rumEvent = this.createActionEvent('command', 'slash_command', { snapshots: JSON.stringify({ command: event.command, subcommand: event.subcommand, @@ -446,23 +438,23 @@ export class QwenLogger { } logMalformedJsonResponseEvent(event: MalformedJsonResponseEvent): void { - const rumEvent = this.createExceptionEvent({ - type: 'error', - subtype: 'malformed_json_response', - snapshots: JSON.stringify({ - model: event.model, - }), - }); + const rumEvent = this.createExceptionEvent( + 'error', + 'malformed_json_response', + { + subtype: 'malformed_json_response', + properties: { + model: event.model, + }, + }, + ); this.enqueueLogEvent(rumEvent); this.flushIfNeeded(); } logEndSessionEvent(_event: EndSessionEvent): void { - const applicationEvent = this.createViewEvent({ - type: 'session', - name: 'session_end', - }); + const applicationEvent = this.createViewEvent('session', 'session_end', {}); // Flush immediately on session end. this.enqueueLogEvent(applicationEvent); @@ -484,6 +476,9 @@ export class QwenLogger { } shutdown() { + if (this.isShutdown) return; + + this.isShutdown = true; const event = new EndSessionEvent(this.config); this.logEndSessionEvent(event); } diff --git a/packages/core/src/telemetry/sdk.ts b/packages/core/src/telemetry/sdk.ts index 16c378b8..ea3408a0 100644 --- a/packages/core/src/telemetry/sdk.ts +++ b/packages/core/src/telemetry/sdk.ts @@ -141,7 +141,6 @@ export async function shutdownTelemetry(): Promise { return; } try { - QwenLogger.getInstance()?.shutdown(); await sdk.shutdown(); console.log('OpenTelemetry SDK shut down successfully.'); } catch (error) { diff --git a/packages/core/src/telemetry/types.ts b/packages/core/src/telemetry/types.ts index 9d1fd77a..22a0c44b 100644 --- a/packages/core/src/telemetry/types.ts +++ b/packages/core/src/telemetry/types.ts @@ -161,6 +161,7 @@ export class ApiRequestEvent { export class ApiErrorEvent { 'event.name': 'api_error'; 'event.timestamp': string; // ISO 8601 + response_id?: string; model: string; error: string; error_type?: string; @@ -170,6 +171,7 @@ export class ApiErrorEvent { auth_type?: string; constructor( + response_id: string | undefined, model: string, error: string, duration_ms: number, @@ -180,6 +182,7 @@ export class ApiErrorEvent { ) { this['event.name'] = 'api_error'; this['event.timestamp'] = new Date().toISOString(); + this.response_id = response_id; this.model = model; this.error = error; this.error_type = error_type; @@ -193,6 +196,7 @@ export class ApiErrorEvent { export class ApiResponseEvent { 'event.name': 'api_response'; 'event.timestamp': string; // ISO 8601 + response_id: string; model: string; status_code?: number | string; duration_ms: number; @@ -208,6 +212,7 @@ export class ApiResponseEvent { auth_type?: string; constructor( + response_id: string, model: string, duration_ms: number, prompt_id: string, @@ -218,6 +223,7 @@ export class ApiResponseEvent { ) { this['event.name'] = 'api_response'; this['event.timestamp'] = new Date().toISOString(); + this.response_id = response_id; this.model = model; this.duration_ms = duration_ms; this.status_code = 200; diff --git a/packages/core/test-setup.ts b/packages/core/test-setup.ts index ed06d5f1..44c46c47 100644 --- a/packages/core/test-setup.ts +++ b/packages/core/test-setup.ts @@ -8,3 +8,9 @@ import { setSimulate429 } from './src/utils/testUtils.js'; // Disable 429 simulation globally for all tests setSimulate429(false); + +// Some dependencies (e.g., undici) expect a global File constructor in Node. +// Provide a minimal shim for test environment if missing. +if (typeof (globalThis as unknown as { File?: unknown }).File === 'undefined') { + (globalThis as unknown as { File: unknown }).File = class {} as unknown; +} From 2655af079aa5e88015a176ec6356089d80186ea5 Mon Sep 17 00:00:00 2001 From: tanzhenxin Date: Tue, 12 Aug 2025 12:21:39 +0800 Subject: [PATCH 3/3] chore: npm run format & lint --- packages/core/src/telemetry/qwen-logger/qwen-logger.ts | 3 +-- packages/core/src/telemetry/sdk.ts | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/core/src/telemetry/qwen-logger/qwen-logger.ts b/packages/core/src/telemetry/qwen-logger/qwen-logger.ts index e03f45cd..e3ec55d8 100644 --- a/packages/core/src/telemetry/qwen-logger/qwen-logger.ts +++ b/packages/core/src/telemetry/qwen-logger/qwen-logger.ts @@ -322,9 +322,8 @@ export class QwenLogger { this.flushIfNeeded(); } - logApiRequestEvent(event: ApiRequestEvent): void { + logApiRequestEvent(_event: ApiRequestEvent): void { // ignore for now - console.log('logApiRequestEvent', event); return; // const rumEvent = this.createResourceEvent('api', 'api_request', { diff --git a/packages/core/src/telemetry/sdk.ts b/packages/core/src/telemetry/sdk.ts index ea3408a0..531c905f 100644 --- a/packages/core/src/telemetry/sdk.ts +++ b/packages/core/src/telemetry/sdk.ts @@ -28,7 +28,6 @@ import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'; import { Config } from '../config/config.js'; import { SERVICE_NAME } from './constants.js'; import { initializeMetrics } from './metrics.js'; -import { QwenLogger } from './qwen-logger/qwen-logger.js'; import { FileLogExporter, FileMetricExporter,