Files
qwen-code/packages/a2a-server/src/testing_utils.ts

98 lines
2.3 KiB
TypeScript

/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type {
Task as SDKTask,
TaskStatusUpdateEvent,
SendStreamingMessageSuccessResponse,
} from '@a2a-js/sdk';
import { expect } from 'vitest';
export function createStreamMessageRequest(
text: string,
messageId: string,
taskId?: string,
) {
const request: {
jsonrpc: string;
id: string;
method: string;
params: {
message: {
kind: string;
role: string;
parts: [{ kind: string; text: string }];
messageId: string;
};
metadata: {
coderAgent: {
kind: string;
workspacePath: string;
};
};
taskId?: string;
};
} = {
jsonrpc: '2.0',
id: '1',
method: 'message/stream',
params: {
message: {
kind: 'message',
role: 'user',
parts: [{ kind: 'text', text }],
messageId,
},
metadata: {
coderAgent: {
kind: 'agent-settings',
workspacePath: '/tmp',
},
},
},
};
if (taskId) {
request.params.taskId = taskId;
}
return request;
}
export function assertUniqueFinalEventIsLast(
events: SendStreamingMessageSuccessResponse[],
) {
// Final event is input-required & final
const finalEvent = events[events.length - 1].result as TaskStatusUpdateEvent;
expect(finalEvent.metadata?.['coderAgent']).toMatchObject({
kind: 'state-change',
});
expect(finalEvent.status?.state).toBe('input-required');
expect(finalEvent.final).toBe(true);
// There is only one event with final and its the last
expect(
events.filter((e) => (e.result as TaskStatusUpdateEvent).final).length,
).toBe(1);
expect(
events.findIndex((e) => (e.result as TaskStatusUpdateEvent).final),
).toBe(events.length - 1);
}
export function assertTaskCreationAndWorkingStatus(
events: SendStreamingMessageSuccessResponse[],
) {
// Initial task creation event
const taskEvent = events[0].result as SDKTask;
expect(taskEvent.kind).toBe('task');
expect(taskEvent.status.state).toBe('submitted');
// Status update: working
const workingEvent = events[1].result as TaskStatusUpdateEvent;
expect(workingEvent.kind).toBe('status-update');
expect(workingEvent.status.state).toBe('working');
}