Files
qwen-code/packages/core/src/tools/mcp-client.ts
2025-06-13 16:18:06 -04:00

354 lines
11 KiB
TypeScript

/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js';
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
import { parse } from 'shell-quote';
import { MCPServerConfig } from '../config/config.js';
import { DiscoveredMCPTool } from './mcp-tool.js';
import { CallableTool, FunctionDeclaration, mcpToTool } from '@google/genai';
import { ToolRegistry } from './tool-registry.js';
export const MCP_DEFAULT_TIMEOUT_MSEC = 10 * 60 * 1000; // default to 10 minutes
/**
* Enum representing the connection status of an MCP server
*/
export enum MCPServerStatus {
/** Server is disconnected or experiencing errors */
DISCONNECTED = 'disconnected',
/** Server is in the process of connecting */
CONNECTING = 'connecting',
/** Server is connected and ready to use */
CONNECTED = 'connected',
}
/**
* Enum representing the overall MCP discovery state
*/
export enum MCPDiscoveryState {
/** Discovery has not started yet */
NOT_STARTED = 'not_started',
/** Discovery is currently in progress */
IN_PROGRESS = 'in_progress',
/** Discovery has completed (with or without errors) */
COMPLETED = 'completed',
}
/**
* Map to track the status of each MCP server within the core package
*/
const mcpServerStatusesInternal: Map<string, MCPServerStatus> = new Map();
/**
* Track the overall MCP discovery state
*/
let mcpDiscoveryState: MCPDiscoveryState = MCPDiscoveryState.NOT_STARTED;
/**
* Event listeners for MCP server status changes
*/
type StatusChangeListener = (
serverName: string,
status: MCPServerStatus,
) => void;
const statusChangeListeners: StatusChangeListener[] = [];
/**
* Add a listener for MCP server status changes
*/
export function addMCPStatusChangeListener(
listener: StatusChangeListener,
): void {
statusChangeListeners.push(listener);
}
/**
* Remove a listener for MCP server status changes
*/
export function removeMCPStatusChangeListener(
listener: StatusChangeListener,
): void {
const index = statusChangeListeners.indexOf(listener);
if (index !== -1) {
statusChangeListeners.splice(index, 1);
}
}
/**
* Update the status of an MCP server
*/
function updateMCPServerStatus(
serverName: string,
status: MCPServerStatus,
): void {
mcpServerStatusesInternal.set(serverName, status);
// Notify all listeners
for (const listener of statusChangeListeners) {
listener(serverName, status);
}
}
/**
* Get the current status of an MCP server
*/
export function getMCPServerStatus(serverName: string): MCPServerStatus {
return (
mcpServerStatusesInternal.get(serverName) || MCPServerStatus.DISCONNECTED
);
}
/**
* Get all MCP server statuses
*/
export function getAllMCPServerStatuses(): Map<string, MCPServerStatus> {
return new Map(mcpServerStatusesInternal);
}
/**
* Get the current MCP discovery state
*/
export function getMCPDiscoveryState(): MCPDiscoveryState {
return mcpDiscoveryState;
}
export async function discoverMcpTools(
mcpServers: Record<string, MCPServerConfig>,
mcpServerCommand: string | undefined,
toolRegistry: ToolRegistry,
): Promise<void> {
// Set discovery state to in progress
mcpDiscoveryState = MCPDiscoveryState.IN_PROGRESS;
try {
if (mcpServerCommand) {
const cmd = mcpServerCommand;
const args = parse(cmd, process.env) as string[];
if (args.some((arg) => typeof arg !== 'string')) {
throw new Error('failed to parse mcpServerCommand: ' + cmd);
}
// use generic server name 'mcp'
mcpServers['mcp'] = {
command: args[0],
args: args.slice(1),
};
}
const discoveryPromises = Object.entries(mcpServers).map(
([mcpServerName, mcpServerConfig]) =>
connectAndDiscover(mcpServerName, mcpServerConfig, toolRegistry),
);
await Promise.all(discoveryPromises);
// Mark discovery as completed
mcpDiscoveryState = MCPDiscoveryState.COMPLETED;
} catch (error) {
// Still mark as completed even with errors
mcpDiscoveryState = MCPDiscoveryState.COMPLETED;
throw error;
}
}
async function connectAndDiscover(
mcpServerName: string,
mcpServerConfig: MCPServerConfig,
toolRegistry: ToolRegistry,
): Promise<void> {
// Initialize the server status as connecting
updateMCPServerStatus(mcpServerName, MCPServerStatus.CONNECTING);
let transport;
if (mcpServerConfig.httpUrl) {
transport = new StreamableHTTPClientTransport(
new URL(mcpServerConfig.httpUrl),
);
} else if (mcpServerConfig.url) {
transport = new SSEClientTransport(new URL(mcpServerConfig.url));
} else if (mcpServerConfig.command) {
transport = new StdioClientTransport({
command: mcpServerConfig.command,
args: mcpServerConfig.args || [],
env: {
...process.env,
...(mcpServerConfig.env || {}),
} as Record<string, string>,
cwd: mcpServerConfig.cwd,
stderr: 'pipe',
});
} else {
console.error(
`MCP server '${mcpServerName}' has invalid configuration: missing httpUrl (for Streamable HTTP), url (for SSE), and command (for stdio). Skipping.`,
);
// Update status to disconnected
updateMCPServerStatus(mcpServerName, MCPServerStatus.DISCONNECTED);
return;
}
const mcpClient = new Client({
name: 'gemini-cli-mcp-client',
version: '0.0.1',
});
// patch Client.callTool to use request timeout as genai McpCallTool.callTool does not do it
// TODO: remove this hack once GenAI SDK does callTool with request options
if ('callTool' in mcpClient) {
const origCallTool = mcpClient.callTool.bind(mcpClient);
mcpClient.callTool = function (params, resultSchema, options) {
return origCallTool(params, resultSchema, {
...options,
timeout: mcpServerConfig.timeout ?? MCP_DEFAULT_TIMEOUT_MSEC,
});
};
}
try {
await mcpClient.connect(transport, {
timeout: mcpServerConfig.timeout ?? MCP_DEFAULT_TIMEOUT_MSEC,
});
// Connection successful
updateMCPServerStatus(mcpServerName, MCPServerStatus.CONNECTED);
} catch (error) {
// Create a safe config object that excludes sensitive information
const safeConfig = {
command: mcpServerConfig.command,
url: mcpServerConfig.url,
cwd: mcpServerConfig.cwd,
timeout: mcpServerConfig.timeout,
trust: mcpServerConfig.trust,
// Exclude args and env which may contain sensitive data
};
console.error(
`failed to start or connect to MCP server '${mcpServerName}' ` +
`${JSON.stringify(safeConfig)}; \n${error}`,
);
// Update status to disconnected
updateMCPServerStatus(mcpServerName, MCPServerStatus.DISCONNECTED);
return;
}
mcpClient.onerror = (error) => {
console.error(`MCP ERROR (${mcpServerName}):`, error.toString());
// Update status to disconnected on error
updateMCPServerStatus(mcpServerName, MCPServerStatus.DISCONNECTED);
};
if (transport instanceof StdioClientTransport && transport.stderr) {
transport.stderr.on('data', (data) => {
const stderrStr = data.toString();
// Filter out verbose INFO logs from some MCP servers
if (!stderrStr.includes('] INFO')) {
console.debug(`MCP STDERR (${mcpServerName}):`, stderrStr);
}
});
}
try {
const mcpCallableTool: CallableTool = mcpToTool(mcpClient);
const discoveredToolFunctions = await mcpCallableTool.tool();
if (
!discoveredToolFunctions ||
!Array.isArray(discoveredToolFunctions.functionDeclarations)
) {
console.error(
`MCP server '${mcpServerName}' did not return valid tool function declarations. Skipping.`,
);
if (
transport instanceof StdioClientTransport ||
transport instanceof SSEClientTransport ||
transport instanceof StreamableHTTPClientTransport
) {
await transport.close();
}
// Update status to disconnected
updateMCPServerStatus(mcpServerName, MCPServerStatus.DISCONNECTED);
return;
}
for (const funcDecl of discoveredToolFunctions.functionDeclarations) {
if (!funcDecl.name) {
console.warn(
`Discovered a function declaration without a name from MCP server '${mcpServerName}'. Skipping.`,
);
continue;
}
let toolNameForModel = funcDecl.name;
// Replace invalid characters (based on 400 error message from Gemini API) with underscores
toolNameForModel = toolNameForModel.replace(/[^a-zA-Z0-9_.-]/g, '_');
const existingTool = toolRegistry.getTool(toolNameForModel);
if (existingTool) {
toolNameForModel = mcpServerName + '__' + toolNameForModel;
}
// If longer than 63 characters, replace middle with '___'
// (Gemini API says max length 64, but actual limit seems to be 63)
if (toolNameForModel.length > 63) {
toolNameForModel =
toolNameForModel.slice(0, 28) + '___' + toolNameForModel.slice(-32);
}
// Ensure parameters is a valid JSON schema object, default to empty if not.
const parameterSchema: Record<string, unknown> =
funcDecl.parameters && typeof funcDecl.parameters === 'object'
? { ...(funcDecl.parameters as FunctionDeclaration) }
: { type: 'object', properties: {} };
toolRegistry.registerTool(
new DiscoveredMCPTool(
mcpCallableTool,
mcpServerName,
toolNameForModel,
funcDecl.description ?? '',
parameterSchema,
funcDecl.name,
mcpServerConfig.timeout ?? MCP_DEFAULT_TIMEOUT_MSEC,
mcpServerConfig.trust,
),
);
}
} catch (error) {
console.error(
`Failed to list or register tools for MCP server '${mcpServerName}': ${error}`,
);
// Ensure transport is cleaned up on error too
if (
transport instanceof StdioClientTransport ||
transport instanceof SSEClientTransport ||
transport instanceof StreamableHTTPClientTransport
) {
await transport.close();
}
// Update status to disconnected
updateMCPServerStatus(mcpServerName, MCPServerStatus.DISCONNECTED);
}
// If no tools were registered from this MCP server, the following 'if' block
// will close the connection. This is done to conserve resources and prevent
// an orphaned connection to a server that isn't providing any usable
// functionality. Connections to servers that did provide tools are kept
// open, as those tools will require the connection to function.
if (toolRegistry.getToolsByServer(mcpServerName).length === 0) {
console.log(
`No tools registered from MCP server '${mcpServerName}'. Closing connection.`,
);
if (
transport instanceof StdioClientTransport ||
transport instanceof SSEClientTransport ||
transport instanceof StreamableHTTPClientTransport
) {
await transport.close();
// Update status to disconnected
updateMCPServerStatus(mcpServerName, MCPServerStatus.DISCONNECTED);
}
}
}