feat: implement usage stats logging with telemetry refactoring

This commit is contained in:
tanzhenxin
2025-08-12 12:16:38 +08:00
parent c96852dc56
commit 807844fb57
12 changed files with 203 additions and 242 deletions

View File

@@ -60,11 +60,15 @@ export class QwenLogger {
private sessionId: string;
private viewId: string;
private isFlushInProgress: boolean = false;
private isShutdown: boolean = false;
private constructor(config?: Config) {
this.config = config;
this.userId = this.generateUserId();
this.sessionId = this.config?.getSessionId() ?? '';
this.sessionId =
typeof this.config?.getSessionId === 'function'
? this.config.getSessionId()
: '';
this.viewId = randomUUID();
}
@@ -79,6 +83,9 @@ export class QwenLogger {
if (!QwenLogger.instance) {
QwenLogger.instance = new QwenLogger(config);
}
process.on('exit', QwenLogger.instance.shutdown.bind(QwenLogger.instance));
return QwenLogger.instance;
}
@@ -88,29 +95,49 @@ export class QwenLogger {
createRumEvent(
eventType: 'view' | 'action' | 'exception' | 'resource',
properties: RumEvent,
type: string,
name: string,
properties: Partial<RumEvent>,
): RumEvent {
return {
timestamp: Date.now(),
event_type: eventType,
...properties,
type,
name,
...(properties || {}),
};
}
createViewEvent(properties: RumViewEvent): RumEvent {
return this.createRumEvent('view', properties);
createViewEvent(
type: string,
name: string,
properties: Partial<RumViewEvent>,
): RumEvent {
return this.createRumEvent('view', type, name, properties);
}
createActionEvent(properties: RumActionEvent): RumEvent {
return this.createRumEvent('action', properties);
createActionEvent(
type: string,
name: string,
properties: Partial<RumActionEvent>,
): RumEvent {
return this.createRumEvent('action', type, name, properties);
}
createResourceEvent(properties: RumResourceEvent): RumEvent {
return this.createRumEvent('resource', properties);
createResourceEvent(
type: string,
name: string,
properties: Partial<RumResourceEvent>,
): RumEvent {
return this.createRumEvent('resource', type, name, properties);
}
createExceptionEvent(properties: RumExceptionEvent): RumEvent {
return this.createRumEvent('exception', properties);
createExceptionEvent(
type: string,
name: string,
properties: Partial<RumExceptionEvent>,
): RumEvent {
return this.createRumEvent('exception', type, name, properties);
}
createRumPayload(): RumPayload {
@@ -230,51 +257,12 @@ export class QwenLogger {
}
}
// Visible for testing. Decodes protobuf-encoded response from Qwen server.
decodeLogResponse(buf: Buffer): LogResponse | undefined {
// TODO(obrienowen): return specific errors to facilitate debugging.
if (buf.length < 1) {
return undefined;
}
// The first byte of the buffer is `field<<3 | type`. We're looking for field
// 1, with type varint, represented by type=0. If the first byte isn't 8, that
// means field 1 is missing or the message is corrupted. Either way, we return
// undefined.
if (buf.readUInt8(0) !== 8) {
return undefined;
}
let ms = BigInt(0);
let cont = true;
// In each byte, the most significant bit is the continuation bit. If it's
// set, we keep going. The lowest 7 bits, are data bits. They are concatenated
// in reverse order to form the final number.
for (let i = 1; cont && i < buf.length; i++) {
const byte = buf.readUInt8(i);
ms |= BigInt(byte & 0x7f) << BigInt(7 * (i - 1));
cont = (byte & 0x80) !== 0;
}
if (cont) {
// We have fallen off the buffer without seeing a terminating byte. The
// message is corrupted.
return undefined;
}
const returnVal = {
nextRequestWaitMs: Number(ms),
};
return returnVal;
}
logStartSessionEvent(event: StartSessionEvent): void {
const applicationEvent = this.createViewEvent({
type: 'session',
name: 'session_start',
snapshots: JSON.stringify({
const applicationEvent = this.createViewEvent('session', 'session_start', {
properties: {
model: event.model,
},
snapshots: JSON.stringify({
embedding_model: event.embedding_model,
sandbox_enabled: event.sandbox_enabled,
core_tools_enabled: event.core_tools_enabled,
@@ -286,8 +274,6 @@ export class QwenLogger {
telemetry_enabled: event.telemetry_enabled,
telemetry_log_user_prompts_enabled:
event.telemetry_log_user_prompts_enabled,
file_filtering_respect_git_ignore:
event.file_filtering_respect_git_ignore,
}),
});
@@ -299,13 +285,13 @@ export class QwenLogger {
}
logNewPromptEvent(event: UserPromptEvent): void {
const rumEvent = this.createActionEvent({
type: 'user_prompt',
name: 'user_prompt',
const rumEvent = this.createActionEvent('user_prompt', 'user_prompt', {
properties: {
auth_type: event.auth_type,
prompt_id: event.prompt_id,
},
snapshots: JSON.stringify({
prompt_length: event.prompt_length,
prompt_id: event.prompt_id,
auth_type: event.auth_type,
}),
});
@@ -314,55 +300,62 @@ export class QwenLogger {
}
logToolCallEvent(event: ToolCallEvent): void {
const rumEvent = this.createActionEvent({
type: 'tool_call',
name: `tool_call#${event.function_name}`,
snapshots: JSON.stringify({
function_name: event.function_name,
prompt_id: event.prompt_id,
decision: event.decision,
success: event.success,
duration_ms: event.duration_ms,
error: event.error,
error_type: event.error_type,
}),
});
const rumEvent = this.createActionEvent(
'tool_call',
`tool_call#${event.function_name}`,
{
properties: {
prompt_id: event.prompt_id,
},
snapshots: JSON.stringify({
function_name: event.function_name,
decision: event.decision,
success: event.success,
duration_ms: event.duration_ms,
error: event.error,
error_type: event.error_type,
}),
},
);
this.enqueueLogEvent(rumEvent);
this.flushIfNeeded();
}
logApiRequestEvent(event: ApiRequestEvent): void {
const rumEvent = this.createResourceEvent({
type: 'api',
name: 'api_request',
snapshots: JSON.stringify({
model: event.model,
prompt_id: event.prompt_id,
}),
});
// ignore for now
console.log('logApiRequestEvent', event);
return;
this.enqueueLogEvent(rumEvent);
this.flushIfNeeded();
// const rumEvent = this.createResourceEvent('api', 'api_request', {
// properties: {
// model: event.model,
// prompt_id: event.prompt_id,
// },
// });
// this.enqueueLogEvent(rumEvent);
// this.flushIfNeeded();
}
logApiResponseEvent(event: ApiResponseEvent): void {
const rumEvent = this.createResourceEvent({
type: 'api',
name: 'api_response',
const rumEvent = this.createResourceEvent('api', 'api_response', {
status_code: event.status_code?.toString() ?? '',
duration: event.duration_ms,
success: event.status_code === 200 ? 1 : 0,
success: 1,
message: event.error,
snapshots: JSON.stringify({
trace_id: event.response_id,
properties: {
auth_type: event.auth_type,
model: event.model,
prompt_id: event.prompt_id,
},
snapshots: JSON.stringify({
input_token_count: event.input_token_count,
output_token_count: event.output_token_count,
cached_content_token_count: event.cached_content_token_count,
thoughts_token_count: event.thoughts_token_count,
tool_token_count: event.tool_token_count,
auth_type: event.auth_type,
}),
});
@@ -371,17 +364,19 @@ export class QwenLogger {
}
logApiErrorEvent(event: ApiErrorEvent): void {
const rumEvent = this.createExceptionEvent({
type: 'error',
subtype: 'api_error',
const rumEvent = this.createResourceEvent('api', 'api_error', {
status_code: event.status_code?.toString() ?? '',
duration: event.duration_ms,
success: 0,
message: event.error,
snapshots: JSON.stringify({
trace_id: event.response_id,
properties: {
auth_type: event.auth_type,
model: event.model,
prompt_id: event.prompt_id,
},
snapshots: JSON.stringify({
error_type: event.error_type,
status_code: event.status_code,
duration_ms: event.duration_ms,
auth_type: event.auth_type,
}),
});
@@ -390,12 +385,10 @@ export class QwenLogger {
}
logFlashFallbackEvent(event: FlashFallbackEvent): void {
const rumEvent = this.createActionEvent({
type: 'fallback',
name: 'flash_fallback',
snapshots: JSON.stringify({
const rumEvent = this.createActionEvent('fallback', 'flash_fallback', {
properties: {
auth_type: event.auth_type,
}),
},
});
this.enqueueLogEvent(rumEvent);
@@ -403,11 +396,12 @@ export class QwenLogger {
}
logLoopDetectedEvent(event: LoopDetectedEvent): void {
const rumEvent = this.createExceptionEvent({
type: 'error',
const rumEvent = this.createExceptionEvent('error', 'loop_detected', {
subtype: 'loop_detected',
snapshots: JSON.stringify({
properties: {
prompt_id: event.prompt_id,
},
snapshots: JSON.stringify({
loop_type: event.loop_type,
}),
});
@@ -417,11 +411,11 @@ export class QwenLogger {
}
logNextSpeakerCheck(event: NextSpeakerCheckEvent): void {
const rumEvent = this.createActionEvent({
type: 'check',
name: 'next_speaker_check',
snapshots: JSON.stringify({
const rumEvent = this.createActionEvent('check', 'next_speaker_check', {
properties: {
prompt_id: event.prompt_id,
},
snapshots: JSON.stringify({
finish_reason: event.finish_reason,
result: event.result,
}),
@@ -432,9 +426,7 @@ export class QwenLogger {
}
logSlashCommandEvent(event: SlashCommandEvent): void {
const rumEvent = this.createActionEvent({
type: 'command',
name: 'slash_command',
const rumEvent = this.createActionEvent('command', 'slash_command', {
snapshots: JSON.stringify({
command: event.command,
subcommand: event.subcommand,
@@ -446,23 +438,23 @@ export class QwenLogger {
}
logMalformedJsonResponseEvent(event: MalformedJsonResponseEvent): void {
const rumEvent = this.createExceptionEvent({
type: 'error',
subtype: 'malformed_json_response',
snapshots: JSON.stringify({
model: event.model,
}),
});
const rumEvent = this.createExceptionEvent(
'error',
'malformed_json_response',
{
subtype: 'malformed_json_response',
properties: {
model: event.model,
},
},
);
this.enqueueLogEvent(rumEvent);
this.flushIfNeeded();
}
logEndSessionEvent(_event: EndSessionEvent): void {
const applicationEvent = this.createViewEvent({
type: 'session',
name: 'session_end',
});
const applicationEvent = this.createViewEvent('session', 'session_end', {});
// Flush immediately on session end.
this.enqueueLogEvent(applicationEvent);
@@ -484,6 +476,9 @@ export class QwenLogger {
}
shutdown() {
if (this.isShutdown) return;
this.isShutdown = true;
const event = new EndSessionEvent(this.config);
this.logEndSessionEvent(event);
}