refactor: openaiContentGenerator (#501)

* refactor: openaiContentGenerator

* refactor: optimize stream handling

* refactor: re-organize refactored files

* fix: unit test cases

* fix: try to fix parallel tool calls with irregular chunk content
This commit is contained in:
Mingholy
2025-09-09 15:34:08 +08:00
committed by GitHub
parent e63233cefc
commit 51b5947627
27 changed files with 7565 additions and 100 deletions

View File

@@ -4,7 +4,8 @@
* SPDX-License-Identifier: Apache-2.0
*/
import { OpenAIContentGenerator } from '../core/openaiContentGenerator.js';
import { OpenAIContentGenerator } from '../core/openaiContentGenerator/index.js';
import { DashScopeOpenAICompatibleProvider } from '../core/openaiContentGenerator/provider/dashscope.js';
import { IQwenOAuth2Client } from './qwenOAuth2.js';
import { SharedTokenManager } from './sharedTokenManager.js';
import { Config } from '../config/config.js';
@@ -33,15 +34,24 @@ export class QwenContentGenerator extends OpenAIContentGenerator {
constructor(
qwenClient: IQwenOAuth2Client,
contentGeneratorConfig: ContentGeneratorConfig,
config: Config,
cliConfig: Config,
) {
// Initialize with empty API key, we'll override it dynamically
super(contentGeneratorConfig, config);
// Create DashScope provider for Qwen
const dashscopeProvider = new DashScopeOpenAICompatibleProvider(
contentGeneratorConfig,
cliConfig,
);
// Initialize with DashScope provider
super(contentGeneratorConfig, cliConfig, dashscopeProvider);
this.qwenClient = qwenClient;
this.sharedManager = SharedTokenManager.getInstance();
// Set default base URL, will be updated dynamically
this.client.baseURL = DEFAULT_QWEN_BASE_URL;
if (contentGeneratorConfig?.baseUrl && contentGeneratorConfig?.apiKey) {
this.pipeline.client.baseURL = contentGeneratorConfig?.baseUrl;
this.pipeline.client.apiKey = contentGeneratorConfig?.apiKey;
}
}
/**
@@ -106,46 +116,24 @@ export class QwenContentGenerator extends OpenAIContentGenerator {
* Execute an operation with automatic credential management and retry logic.
* This method handles:
* - Dynamic token and endpoint retrieval
* - Temporary client configuration updates
* - Automatic restoration of original configuration
* - Client configuration updates
* - Retry logic on authentication errors with token refresh
*
* @param operation - The operation to execute with updated client configuration
* @param restoreOnCompletion - Whether to restore original config after operation completes
* @returns The result of the operation
*/
private async executeWithCredentialManagement<T>(
operation: () => Promise<T>,
restoreOnCompletion: boolean = true,
): Promise<T> {
// Attempt the operation with credential management and retry logic
const attemptOperation = async (): Promise<T> => {
const { token, endpoint } = await this.getValidToken();
// Store original configuration
const originalApiKey = this.client.apiKey;
const originalBaseURL = this.client.baseURL;
// Apply dynamic configuration
this.client.apiKey = token;
this.client.baseURL = endpoint;
this.pipeline.client.apiKey = token;
this.pipeline.client.baseURL = endpoint;
try {
const result = await operation();
// For streaming operations, we may need to keep the configuration active
if (restoreOnCompletion) {
this.client.apiKey = originalApiKey;
this.client.baseURL = originalBaseURL;
}
return result;
} catch (error) {
// Always restore on error
this.client.apiKey = originalApiKey;
this.client.baseURL = originalBaseURL;
throw error;
}
return await operation();
};
// Execute with retry logic for auth errors
@@ -175,17 +163,14 @@ export class QwenContentGenerator extends OpenAIContentGenerator {
}
/**
* Override to use dynamic token and endpoint with automatic retry.
* Note: For streaming, the client configuration is not restored immediately
* since the generator may continue to be used after this method returns.
* Override to use dynamic token and endpoint with automatic retry
*/
override async generateContentStream(
request: GenerateContentParameters,
userPromptId: string,
): Promise<AsyncGenerator<GenerateContentResponse>> {
return this.executeWithCredentialManagement(
() => super.generateContentStream(request, userPromptId),
false, // Don't restore immediately for streaming
return this.executeWithCredentialManagement(() =>
super.generateContentStream(request, userPromptId),
);
}