for partial message processing and event timeout processing

This commit is contained in:
skyfire
2025-12-31 20:15:51 +08:00
parent ac7ba95d65
commit e4caa7a856
30 changed files with 934 additions and 254 deletions

View File

@@ -25,6 +25,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<checkstyle-maven-plugin.version>3.6.0</checkstyle-maven-plugin.version>
<jacoco-maven-plugin.version>0.8.12</jacoco-maven-plugin.version>
<junit5.version>5.14.1</junit5.version>
<logback-classic.version>1.3.16</logback-classic.version>
<fastjson2.version>2.0.60</fastjson2.version>
@@ -81,6 +82,25 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco-maven-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<execution>
<id>report</id>
<phase>test</phase>
<goals>
<goal>report</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

View File

@@ -1,6 +0,0 @@
package com.alibaba.qwen.code.cli;
import com.alibaba.qwen.code.cli.transport.TransportOptions;
public class Options extends TransportOptions {
}

View File

@@ -1,54 +0,0 @@
package com.alibaba.qwen.code.cli;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.qwen.code.cli.protocol.message.Message;
import com.alibaba.qwen.code.cli.protocol.message.SDKSystemMessage;
import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKAssistantMessage;
import com.alibaba.qwen.code.cli.session.Session;
import com.alibaba.qwen.code.cli.session.event.SessionEventSimpleConsumers;
import com.alibaba.qwen.code.cli.transport.Transport;
import com.alibaba.qwen.code.cli.transport.process.ProcessTransport;
public class QwenCli {
public static List<Message> query(String prompt) {
Transport transport;
try {
transport = new ProcessTransport();
} catch (Exception e) {
throw new RuntimeException("initialized ProcessTransport error!", e);
}
Session session;
try {
session = new Session(transport);
} catch (Exception e) {
throw new RuntimeException("initialized Session error!", e);
}
final List<Message> response = new ArrayList<>();
try {
session.sendPrompt(prompt, new SessionEventSimpleConsumers() {
@Override
public void onSystemMessage(Session session, SDKSystemMessage systemMessage) {
response.add(systemMessage);
}
@Override
public void onAssistantMessage(Session session, SDKAssistantMessage assistantMessage) {
response.add(assistantMessage);
}
});
} catch (Exception e) {
throw new RuntimeException("sendPrompt error!", e);
}
try {
session.close();
} catch (Exception e) {
throw new RuntimeException("close Session error!", e);
}
return response;
}
}

View File

@@ -0,0 +1,70 @@
package com.alibaba.qwen.code.cli;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import com.alibaba.fastjson2.JSON;
import com.alibaba.qwen.code.cli.protocol.data.AssistantContent;
import com.alibaba.qwen.code.cli.protocol.data.behavior.Behavior.Operation;
import com.alibaba.qwen.code.cli.session.Session;
import com.alibaba.qwen.code.cli.session.event.SessionEventSimpleConsumers;
import com.alibaba.qwen.code.cli.transport.Transport;
import com.alibaba.qwen.code.cli.transport.TransportOptions;
import com.alibaba.qwen.code.cli.transport.process.ProcessTransport;
import com.alibaba.qwen.code.cli.utils.MyConcurrentUtils;
import com.alibaba.qwen.code.cli.utils.Timeout;
public class QwenCodeCli {
public static List<String> simpleQuery(String prompt) {
final List<String> response = new ArrayList<>();
MyConcurrentUtils.runAndWait(() -> simpleQuery(prompt, response::add), Timeout.TIMEOUT_30_MINUTES);
return response;
}
public static void simpleQuery(String prompt, Consumer<String> messageConsumer) {
Session session = newSessionWithProcessTransport(new TransportOptions());
try {
session.sendPrompt(prompt, new SessionEventSimpleConsumers() {
@Override
public void onAssistantMessageIncludePartial(Session session, List<AssistantContent> assistantContents, AssistantMessageOutputType assistantMessageOutputType) {
messageConsumer.accept(assistantContents.stream()
.map(AssistantContent::getContent)
.map(content -> {
if (content instanceof String) {
return (String) content;
} else {
return JSON.toJSONString(content);
}
}).collect(Collectors.joining()));
}
}.setDefaultPermissionOperation(Operation.allow));
} catch (Exception e) {
throw new RuntimeException("sendPrompt error!", e);
}
try {
session.close();
} catch (Exception e) {
throw new RuntimeException("close Session error!", e);
}
}
public static Session newSessionWithProcessTransport(TransportOptions transportOptions) {
Transport transport;
try {
transport = new ProcessTransport(transportOptions);
} catch (Exception e) {
throw new RuntimeException("initialized ProcessTransport error!", e);
}
Session session;
try {
session = new Session(transport);
} catch (Exception e) {
throw new RuntimeException("initialized Session error!", e);
}
return session;
}
}

View File

@@ -0,0 +1,6 @@
package com.alibaba.qwen.code.cli.protocol.data;
public interface AssistantContent {
String getType();
Object getContent();
}

View File

@@ -15,6 +15,11 @@ public class SDKAssistantMessage extends MessageBase {
@JSONField(name = "parent_tool_use_id")
private String parentToolUseId;
public SDKAssistantMessage() {
super();
this.type = "assistant";
}
public String getUuid() {
return uuid;
}

View File

@@ -0,0 +1,55 @@
package com.alibaba.qwen.code.cli.protocol.message.assistant;
import com.alibaba.fastjson2.annotation.JSONField;
import com.alibaba.fastjson2.annotation.JSONType;
import com.alibaba.qwen.code.cli.protocol.message.MessageBase;
import com.alibaba.qwen.code.cli.protocol.message.assistant.event.StreamEvent;
@JSONType(typeKey = "type", typeName = "stream_event")
public class SDKPartialAssistantMessage extends MessageBase {
private String uuid;
@JSONField(name = "session_id")
private String sessionId;
private StreamEvent event;
@JSONField(name = "parent_tool_use_id")
private String parentToolUseId;
public SDKPartialAssistantMessage() {
super();
this.type = "stream_event";
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
public String getSessionId() {
return sessionId;
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
public StreamEvent getEvent() {
return event;
}
public void setEvent(StreamEvent event) {
this.event = event;
}
public String getParentToolUseId() {
return parentToolUseId;
}
public void setParentToolUseId(String parentToolUseId) {
this.parentToolUseId = parentToolUseId;
}
}

View File

@@ -4,9 +4,10 @@ import java.util.List;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.annotation.JSONType;
import com.alibaba.qwen.code.cli.protocol.data.AssistantContent;
@JSONType(typeKey = "type", typeName = "ContentBlock", seeAlso = { TextBlock.class, ToolResultBlock.class, ThinkingBlock.class, ToolUseBlock.class })
public class ContentBlock {
public abstract class ContentBlock implements AssistantContent {
protected String type;
protected List<Annotation> annotations;

View File

@@ -13,4 +13,9 @@ public class TextBlock extends ContentBlock {
public void setText(String text) {
this.text = text;
}
@Override
public Object getContent() {
return text;
}
}

View File

@@ -22,4 +22,9 @@ public class ThinkingBlock extends ContentBlock{
public void setSignature(String signature) {
this.signature = signature;
}
@Override
public Object getContent() {
return thinking;
}
}

View File

@@ -46,4 +46,9 @@ public class ToolUseBlock extends ContentBlock {
public void setAnnotations(List<Annotation> annotations) {
this.annotations = annotations;
}
@Override
public Object getContent() {
return input;
}
}

View File

@@ -0,0 +1,96 @@
package com.alibaba.qwen.code.cli.protocol.message.assistant.event;
import com.alibaba.fastjson2.annotation.JSONField;
import com.alibaba.fastjson2.annotation.JSONType;
import com.alibaba.qwen.code.cli.protocol.data.AssistantContent;
@JSONType(typeKey = "type", typeName = "content_block_delta")
public class ContentBlockDeltaEvent extends StreamEvent {
private int index;
private ContentBlockDelta delta;
public int getIndex() {
return index;
}
public void setIndex(int index) {
this.index = index;
}
public ContentBlockDelta getDelta() {
return delta;
}
public void setDelta(ContentBlockDelta delta) {
this.delta = delta;
}
@JSONType(typeKey = "type", typeName = "ContentBlockDelta",
seeAlso = {ContentBlockDeltaText.class, ContentBlockDeltaThinking.class, ContentBlockDeltaInputJson.class})
public abstract static class ContentBlockDelta implements AssistantContent {
private String type;
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}
@JSONType(typeKey = "type", typeName = "text_delta")
public static class ContentBlockDeltaText extends ContentBlockDelta {
private String text;
public String getText() {
return text;
}
public void setText(String text) {
this.text = text;
}
@Override
public Object getContent() {
return text;
}
}
@JSONType(typeKey = "type", typeName = "thinking_delta")
public static class ContentBlockDeltaThinking extends ContentBlockDelta {
private String thinking;
public String getThinking() {
return thinking;
}
public void setThinking(String thinking) {
this.thinking = thinking;
}
@Override
public Object getContent() {
return thinking;
}
}
@JSONType(typeKey = "type", typeName = "input_json_delta")
public static class ContentBlockDeltaInputJson extends ContentBlockDelta {
@JSONField(name = "partial_json")
private String partialJson;
public String getPartialJson() {
return partialJson;
}
public void setPartialJson(String partialJson) {
this.partialJson = partialJson;
}
@Override
public Object getContent() {
return partialJson;
}
}
}

View File

@@ -0,0 +1,13 @@
package com.alibaba.qwen.code.cli.protocol.message.assistant.event;
import com.alibaba.fastjson2.annotation.JSONField;
import com.alibaba.fastjson2.annotation.JSONType;
import com.alibaba.qwen.code.cli.protocol.message.assistant.block.ContentBlock;
@JSONType(typeKey = "type", typeName = "content_block_start")
public class ContentBlockStartEvent extends StreamEvent{
private int index;
@JSONField(name = "content_block")
private ContentBlock contentBlock;
}

View File

@@ -0,0 +1,16 @@
package com.alibaba.qwen.code.cli.protocol.message.assistant.event;
import com.alibaba.fastjson2.annotation.JSONType;
@JSONType(typeKey = "type", typeName = "content_block_stop")
public class ContentBlockStopEvent extends StreamEvent{
Long index;
public Long getIndex() {
return index;
}
public void setIndex(Long index) {
this.index = index;
}
}

View File

@@ -0,0 +1,47 @@
package com.alibaba.qwen.code.cli.protocol.message.assistant.event;
import com.alibaba.fastjson2.annotation.JSONType;
@JSONType(typeName = "message_start")
public class MessageStartStreamEvent extends StreamEvent{
private Message message;
public static class Message {
private String id;
private String role;
private String model;
// Getters and setters
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getRole() {
return role;
}
public void setRole(String role) {
this.role = role;
}
public String getModel() {
return model;
}
public void setModel(String model) {
this.model = model;
}
}
public Message getMessage() {
return message;
}
public void setMessage(Message message) {
this.message = message;
}
}

View File

@@ -0,0 +1,7 @@
package com.alibaba.qwen.code.cli.protocol.message.assistant.event;
import com.alibaba.fastjson2.annotation.JSONType;
@JSONType(typeName = "message_stop")
public class MessageStopStreamEvent extends StreamEvent{
}

View File

@@ -0,0 +1,18 @@
package com.alibaba.qwen.code.cli.protocol.message.assistant.event;
import com.alibaba.fastjson2.annotation.JSONType;
@JSONType(typeKey = "type", typeName = "StreamEvent",
seeAlso = {MessageStartStreamEvent.class, MessageStopStreamEvent.class, ContentBlockStartEvent.class, ContentBlockStopEvent.class,
ContentBlockDeltaEvent.class})
public class StreamEvent {
protected String type;
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}

View File

@@ -0,0 +1,22 @@
package com.alibaba.qwen.code.cli.protocol.message.control;
public class CLIControlSetModelResponse {
String subtype = "set_model";
String model;
public String getSubtype() {
return subtype;
}
public void setSubtype(String subtype) {
this.subtype = subtype;
}
public String getModel() {
return model;
}
public void setModel(String model) {
this.model = model;
}
}

View File

@@ -2,6 +2,8 @@ package com.alibaba.qwen.code.cli.session;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
@@ -15,6 +17,7 @@ import com.alibaba.qwen.code.cli.protocol.message.SDKResultMessage;
import com.alibaba.qwen.code.cli.protocol.message.SDKSystemMessage;
import com.alibaba.qwen.code.cli.protocol.message.SDKUserMessage;
import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKAssistantMessage;
import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKPartialAssistantMessage;
import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlInitializeRequest;
import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlInitializeResponse;
import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlInterruptRequest;
@@ -28,16 +31,19 @@ import com.alibaba.qwen.code.cli.session.event.SessionEventConsumers;
import com.alibaba.qwen.code.cli.session.exception.SessionControlException;
import com.alibaba.qwen.code.cli.session.exception.SessionSendPromptException;
import com.alibaba.qwen.code.cli.transport.Transport;
import com.alibaba.qwen.code.cli.utils.MyConcurrentUtils;
import com.alibaba.qwen.code.cli.utils.Timeout;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Session {
private static final Logger log = LoggerFactory.getLogger(Session.class);
private final Transport transport;
private CLIControlInitializeResponse lastCliControlInitializeResponse;
private SDKSystemMessage lastSdkSystemMessage;
private static final Logger log = LoggerFactory.getLogger(Session.class);
private final Timeout defaultEventTimeout = Timeout.TIMEOUT_60_SECONDS;
public Session(Transport transport) throws SessionControlException {
if (transport == null || !transport.isAvailable()) {
@@ -61,43 +67,44 @@ public class Session {
}
}
public void interrupt() throws SessionControlException {
if (!isAvailable()) {
throw new SessionControlException("Session is not available");
}
public void close() throws SessionControlException {
try {
transport.inputNoWaitResponse(
new CLIControlRequest<CLIControlInterruptRequest>().setRequest(new CLIControlInterruptRequest()).toString());
transport.close();
} catch (Exception e) {
throw new SessionControlException("Failed to interrupt the session", e);
throw new SessionControlException("Failed to close the session", e);
}
}
public void setModel(String modelName) throws SessionControlException {
if (!isAvailable()) {
throw new SessionControlException("Session is not available");
}
public Optional<Boolean> interrupt() throws SessionControlException {
checkAvailable();
return processControlRequest(new CLIControlRequest<CLIControlInterruptRequest>().setRequest(new CLIControlInterruptRequest()).toString());
}
public Optional<Boolean> setModel(String modelName) throws SessionControlException {
checkAvailable();
CLIControlSetModelRequest cliControlSetModelRequest = new CLIControlSetModelRequest();
cliControlSetModelRequest.setModel(modelName);
try {
transport.inputNoWaitResponse(new CLIControlRequest<CLIControlSetModelRequest>().setRequest(cliControlSetModelRequest).toString());
} catch (Exception e) {
throw new SessionControlException("Failed to set model", e);
}
return processControlRequest(new CLIControlRequest<CLIControlSetModelRequest>().setRequest(cliControlSetModelRequest).toString());
}
public void setPermissionMode(PermissionMode permissionMode) throws SessionControlException {
if (!isAvailable()) {
throw new SessionControlException("Session is not available");
}
public Optional<Boolean> setPermissionMode(PermissionMode permissionMode) throws SessionControlException {
checkAvailable();
CLIControlSetPermissionModeRequest cliControlSetPermissionModeRequest = new CLIControlSetPermissionModeRequest();
cliControlSetPermissionModeRequest.setMode(permissionMode.getValue());
return processControlRequest(
new CLIControlRequest<CLIControlSetPermissionModeRequest>().setRequest(cliControlSetPermissionModeRequest).toString());
}
private Optional<Boolean> processControlRequest(String request) throws SessionControlException {
try {
transport.inputNoWaitResponse(
new CLIControlRequest<CLIControlSetPermissionModeRequest>().setRequest(cliControlSetPermissionModeRequest).toString());
if (transport.isReading()) {
transport.inputNoWaitResponse(request);
return Optional.empty();
} else {
String response = transport.inputWaitForOneLine(request);
CLIControlResponse<?> cliControlResponse = JSON.parseObject(response, new TypeReference<CLIControlResponse<?>>() {});
return Optional.of("success".equals(cliControlResponse.getResponse().getSubtype()));
}
} catch (Exception e) {
throw new SessionControlException("Failed to set model", e);
}
@@ -108,61 +115,43 @@ public class Session {
}
public void resumeSession(String sessionId) throws SessionControlException {
if (!isAvailable()) {
throw new SessionControlException("Session is not available");
}
if (StringUtils.isNotBlank(sessionId)) {
transport.getTransportOptions().setResumeSessionId(sessionId);
}
this.start();
}
public String getSessionId() {
return Optional.ofNullable(lastSdkSystemMessage).map(SDKSystemMessage::getSessionId).orElse(null);
}
public void close() throws SessionControlException {
try {
transport.close();
} catch (Exception e) {
throw new SessionControlException("Failed to close the session", e);
}
}
public boolean isAvailable() {
return transport.isAvailable();
}
public Capabilities getCapabilities() {
return Optional.ofNullable(lastCliControlInitializeResponse).map(CLIControlInitializeResponse::getCapabilities).orElse(new Capabilities());
}
public void sendPrompt(String prompt, SessionEventConsumers sessionEventConsumers) throws SessionSendPromptException {
if (!transport.isAvailable()) {
throw new SessionSendPromptException("Session is not available");
}
public void sendPrompt(String prompt, SessionEventConsumers sessionEventConsumers) throws SessionSendPromptException, SessionControlException {
checkAvailable();
try {
transport.inputWaitForMultiLine(new SDKUserMessage().setContent(prompt).toString(), (line) -> {
log.debug("read a message from agent {}", line);
JSONObject jsonObject = JSON.parseObject(line);
String messageType = jsonObject.getString("type");
if ("system".equals(messageType)) {
lastSdkSystemMessage = jsonObject.to(SDKSystemMessage.class);
sessionEventConsumers.onSystemMessage(this, lastSdkSystemMessage);
MyConcurrentUtils.runAndWait(() -> sessionEventConsumers.onSystemMessage(this, lastSdkSystemMessage),
Optional.ofNullable(sessionEventConsumers.onSystemMessageTimeout(this)).orElse(defaultEventTimeout));
return false;
} else if ("assistant".equals(messageType)) {
sessionEventConsumers.onAssistantMessage(this, jsonObject.to(SDKAssistantMessage.class));
MyConcurrentUtils.runAndWait(() -> sessionEventConsumers.onAssistantMessage(this, jsonObject.to(SDKAssistantMessage.class)),
Optional.ofNullable(sessionEventConsumers.onAssistantMessageTimeout(this)).orElse(defaultEventTimeout));
return false;
} else if ("stream_event".equals(messageType)) {
MyConcurrentUtils.runAndWait(() -> sessionEventConsumers.onPartialAssistantMessage(this, jsonObject.to(SDKPartialAssistantMessage.class)),
Optional.ofNullable(sessionEventConsumers.onPartialAssistantMessageTimeout(this)).orElse(defaultEventTimeout));
return false;
} else if ("user".equals(messageType)) {
sessionEventConsumers.onUserMessage(this, jsonObject.to(SDKUserMessage.class, Feature.FieldBased));
MyConcurrentUtils.runAndWait(
() -> sessionEventConsumers.onUserMessage(this, jsonObject.to(SDKUserMessage.class, Feature.FieldBased)),
Optional.ofNullable(sessionEventConsumers.onUserMessageTimeout(this)).orElse(defaultEventTimeout));
return false;
} else if ("result".equals(messageType)) {
sessionEventConsumers.onResultMessage(this, jsonObject.to(SDKResultMessage.class));
MyConcurrentUtils.runAndWait(() -> sessionEventConsumers.onResultMessage(this, jsonObject.to(SDKResultMessage.class)),
Optional.ofNullable(sessionEventConsumers.onResultMessageTimeout(this)).orElse(defaultEventTimeout));
return true;
} else if ("control_response".equals(messageType)) {
sessionEventConsumers.onControlResponse(this, jsonObject.to(CLIControlResponse.class));
MyConcurrentUtils.runAndWait(() -> sessionEventConsumers.onControlResponse(this, jsonObject.to(CLIControlResponse.class)),
Optional.ofNullable(sessionEventConsumers.onControlResponseTimeout(this)).orElse(defaultEventTimeout));
if (!"error".equals(jsonObject.getString("subtype"))) {
return false;
} else {
@@ -170,10 +159,11 @@ public class Session {
return "error".equals(jsonObject.getString("subtype"));
}
} else if ("control_request".equals(messageType)) {
return processControlRequest(jsonObject, sessionEventConsumers);
return processControlRequestInThePrompting(jsonObject, sessionEventConsumers);
} else {
log.warn("unknown message type: {}", messageType);
sessionEventConsumers.onOtherMessage(this, line);
MyConcurrentUtils.runAndWait(() -> sessionEventConsumers.onOtherMessage(this, line),
Optional.ofNullable(sessionEventConsumers.onOtherMessageTimeout(this)).orElse(defaultEventTimeout));
return false;
}
});
@@ -182,7 +172,7 @@ public class Session {
}
}
private boolean processControlRequest(JSONObject jsonObject, SessionEventConsumers sessionEventConsumers) {
private boolean processControlRequestInThePrompting(JSONObject jsonObject, SessionEventConsumers sessionEventConsumers) {
String subType = Optional.of(jsonObject)
.map(cr -> cr.getJSONObject("request"))
.map(r -> r.getString("subtype"))
@@ -190,13 +180,21 @@ public class Session {
if ("can_use_tool".equals(subType)) {
try {
return processPermissionResponse(jsonObject, sessionEventConsumers);
} catch (IOException e) {
} catch (IOException | ExecutionException | InterruptedException | TimeoutException e) {
log.error("Failed to process permission response", e);
return false;
}
} else {
CLIControlResponse<?> cliControlResponse = sessionEventConsumers.onControlRequest(this,
jsonObject.to(new TypeReference<CLIControlRequest<?>>() {}));
CLIControlResponse<?> cliControlResponse;
try {
cliControlResponse = MyConcurrentUtils.runAndWait(
() -> sessionEventConsumers.onControlRequest(this, jsonObject.to(new TypeReference<CLIControlRequest<?>>() {})),
Optional.ofNullable(sessionEventConsumers.onControlRequestTimeout(this)).orElse(defaultEventTimeout));
} catch (Exception e) {
log.error("Failed to process control request", e);
return false;
}
if (cliControlResponse != null) {
try {
transport.inputNoWaitResponse(cliControlResponse.toString());
@@ -209,9 +207,13 @@ public class Session {
}
}
private boolean processPermissionResponse(JSONObject jsonObject, SessionEventConsumers sessionEventConsumers) throws IOException {
CLIControlRequest<CLIControlPermissionRequest> permissionRequest = jsonObject.to(new TypeReference<CLIControlRequest<CLIControlPermissionRequest>>() {});
Behavior behavior = Optional.ofNullable(sessionEventConsumers.onPermissionRequest(this, permissionRequest))
private boolean processPermissionResponse(JSONObject jsonObject, SessionEventConsumers sessionEventConsumers)
throws IOException, ExecutionException, InterruptedException, TimeoutException {
CLIControlRequest<CLIControlPermissionRequest> permissionRequest = jsonObject.to(
new TypeReference<CLIControlRequest<CLIControlPermissionRequest>>() {});
Behavior behavior = Optional.ofNullable(MyConcurrentUtils.runAndWait(() -> sessionEventConsumers.onPermissionRequest(this, permissionRequest),
Optional.ofNullable(sessionEventConsumers.onPermissionRequestTimeout(this)).orElse(defaultEventTimeout)))
.map(b -> {
if (b instanceof Allow) {
Allow allow = (Allow) b;
@@ -223,11 +225,30 @@ public class Session {
})
.orElse(Behavior.defaultBehavior());
CLIControlResponse<CLIControlPermissionResponse> permissionResponse = new CLIControlResponse<>();
permissionResponse.createResponse().setResponse(new CLIControlPermissionResponse().setBehavior(behavior)).setRequestId(permissionRequest.getRequestId());
permissionResponse.createResponse().setResponse(new CLIControlPermissionResponse().setBehavior(behavior)).setRequestId(
permissionRequest.getRequestId());
String permissionMessage = permissionResponse.toString();
log.debug("send permission message to agent: {}", permissionMessage);
transport.inputNoWaitResponse(permissionMessage);
return false;
}
public String getSessionId() {
return Optional.ofNullable(lastSdkSystemMessage).map(SDKSystemMessage::getSessionId).orElse(null);
}
public boolean isAvailable() {
return transport.isAvailable();
}
public Capabilities getCapabilities() {
return Optional.ofNullable(lastCliControlInitializeResponse).map(CLIControlInitializeResponse::getCapabilities).orElse(new Capabilities());
}
private void checkAvailable() throws SessionControlException {
if (!isAvailable()) {
throw new SessionControlException("Session is not available");
}
}
}

View File

@@ -5,10 +5,12 @@ import com.alibaba.qwen.code.cli.protocol.message.SDKResultMessage;
import com.alibaba.qwen.code.cli.protocol.message.SDKSystemMessage;
import com.alibaba.qwen.code.cli.protocol.message.SDKUserMessage;
import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKAssistantMessage;
import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKPartialAssistantMessage;
import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlPermissionRequest;
import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlRequest;
import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlResponse;
import com.alibaba.qwen.code.cli.session.Session;
import com.alibaba.qwen.code.cli.utils.Timeout;
public interface SessionEventConsumers {
void onSystemMessage(Session session, SDKSystemMessage systemMessage);
@@ -17,6 +19,8 @@ public interface SessionEventConsumers {
void onAssistantMessage(Session session, SDKAssistantMessage assistantMessage);
void onPartialAssistantMessage(Session session, SDKPartialAssistantMessage partialAssistantMessage);
void onUserMessage(Session session, SDKUserMessage userMessage);
void onOtherMessage(Session session, String message);
@@ -26,4 +30,22 @@ public interface SessionEventConsumers {
CLIControlResponse<?> onControlRequest(Session session, CLIControlRequest<?> cliControlRequest);
Behavior onPermissionRequest(Session session, CLIControlRequest<CLIControlPermissionRequest> permissionRequest);
Timeout onSystemMessageTimeout(Session session);
Timeout onResultMessageTimeout(Session session);
Timeout onAssistantMessageTimeout(Session session);
Timeout onPartialAssistantMessageTimeout(Session session);
Timeout onUserMessageTimeout(Session session);
Timeout onOtherMessageTimeout(Session session);
Timeout onControlResponseTimeout(Session session);
Timeout onControlRequestTimeout(Session session);
Timeout onPermissionRequestTimeout(Session session);
}

View File

@@ -1,14 +1,28 @@
package com.alibaba.qwen.code.cli.session.event;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import com.alibaba.qwen.code.cli.protocol.data.AssistantContent;
import com.alibaba.qwen.code.cli.protocol.data.behavior.Allow;
import com.alibaba.qwen.code.cli.protocol.data.behavior.Behavior;
import com.alibaba.qwen.code.cli.protocol.data.behavior.Behavior.Operation;
import com.alibaba.qwen.code.cli.protocol.data.behavior.Deny;
import com.alibaba.qwen.code.cli.protocol.message.SDKResultMessage;
import com.alibaba.qwen.code.cli.protocol.message.SDKSystemMessage;
import com.alibaba.qwen.code.cli.protocol.message.SDKUserMessage;
import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKAssistantMessage;
import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKPartialAssistantMessage;
import com.alibaba.qwen.code.cli.protocol.message.assistant.event.ContentBlockDeltaEvent;
import com.alibaba.qwen.code.cli.protocol.message.assistant.event.StreamEvent;
import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlPermissionRequest;
import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlRequest;
import com.alibaba.qwen.code.cli.protocol.message.control.CLIControlResponse;
import com.alibaba.qwen.code.cli.session.Session;
import com.alibaba.qwen.code.cli.utils.Timeout;
public class SessionEventSimpleConsumers implements SessionEventConsumers {
@Override
@@ -21,6 +35,22 @@ public class SessionEventSimpleConsumers implements SessionEventConsumers {
@Override
public void onAssistantMessage(Session session, SDKAssistantMessage assistantMessage) {
onAssistantMessageIncludePartial(session, Optional.ofNullable(assistantMessage.getMessage().getContent())
.map(cbs -> cbs.stream().map(cb -> (AssistantContent) cb).collect(Collectors.toList()))
.orElse(new ArrayList<>()), AssistantMessageOutputType.entire);
}
@Override
public void onPartialAssistantMessage(Session session, SDKPartialAssistantMessage partialAssistantMessage) {
StreamEvent event = partialAssistantMessage.getEvent();
if (!(event instanceof ContentBlockDeltaEvent)) {
return;
}
onAssistantMessageIncludePartial(session, Collections.singletonList(((ContentBlockDeltaEvent) event).getDelta()), AssistantMessageOutputType.partial);
}
public void onAssistantMessageIncludePartial(Session session, List<AssistantContent> assistantContents,
AssistantMessageOutputType assistantMessageOutputType) {
}
@Override
@@ -42,6 +72,89 @@ public class SessionEventSimpleConsumers implements SessionEventConsumers {
@Override
public Behavior onPermissionRequest(Session session, CLIControlRequest<CLIControlPermissionRequest> permissionRequest) {
return Behavior.defaultBehavior();
if (Operation.deny.equals(this.defaultPermissionOperation)) {
return new Deny().setMessage("Permission denied.");
} else {
return new Allow().setUpdatedInput(permissionRequest.getRequest().getInput());
}
}
@Override
public Timeout onSystemMessageTimeout(Session session) {
return defaultEventTimeout;
}
@Override
public Timeout onResultMessageTimeout(Session session) {
return defaultEventTimeout;
}
@Override
public Timeout onAssistantMessageTimeout(Session session) {
return defaultEventTimeout;
}
@Override
public Timeout onPartialAssistantMessageTimeout(Session session) {
return defaultEventTimeout;
}
@Override
public Timeout onUserMessageTimeout(Session session) {
return defaultEventTimeout;
}
@Override
public Timeout onOtherMessageTimeout(Session session) {
return defaultEventTimeout;
}
@Override
public Timeout onControlResponseTimeout(Session session) {
return defaultEventTimeout;
}
@Override
public Timeout onControlRequestTimeout(Session session) {
return defaultEventTimeout;
}
@Override
public Timeout onPermissionRequestTimeout(Session session) {
return defaultEventTimeout;
}
public Timeout getDefaultEventTimeout() {
return defaultEventTimeout;
}
public SessionEventSimpleConsumers setDefaultEventTimeout(Timeout defaultEventTimeout) {
this.defaultEventTimeout = defaultEventTimeout;
return this;
}
public Operation getDefaultPermissionOperation() {
return defaultPermissionOperation;
}
public SessionEventSimpleConsumers setDefaultPermissionOperation(Operation defaultPermissionOperation) {
this.defaultPermissionOperation = defaultPermissionOperation;
return this;
}
public SessionEventSimpleConsumers() {
}
public SessionEventSimpleConsumers(Operation defaultPermissionOperation, Timeout defaultEventTimeout) {
this.defaultPermissionOperation = defaultPermissionOperation;
this.defaultEventTimeout = defaultEventTimeout;
}
private Operation defaultPermissionOperation = Operation.deny;
protected Timeout defaultEventTimeout = Timeout.TIMEOUT_60_SECONDS;
public enum AssistantMessageOutputType {
entire,
partial
}
}

View File

@@ -8,6 +8,8 @@ import java.util.function.Function;
public interface Transport {
TransportOptions getTransportOptions();
boolean isReading();
void start() throws IOException;
void close() throws IOException;

View File

@@ -4,6 +4,7 @@ import java.util.List;
import java.util.Map;
import com.alibaba.qwen.code.cli.protocol.data.PermissionMode;
import com.alibaba.qwen.code.cli.utils.Timeout;
public class TransportOptions implements Cloneable {
private String pathToQwenExecutable;
@@ -17,120 +18,154 @@ public class TransportOptions implements Cloneable {
private List<String> allowedTools;
private String authType;
private Boolean includePartialMessages;
private Long turnTimeoutMs;
private Long messageTimeoutMs;
private Boolean skillsEnable;
private Timeout turnTimeout;
private Timeout messageTimeout;
private String resumeSessionId;
private List<String> otherOptions;
public String getPathToQwenExecutable() {
return pathToQwenExecutable;
}
public void setPathToQwenExecutable(String pathToQwenExecutable) {
public TransportOptions setPathToQwenExecutable(String pathToQwenExecutable) {
this.pathToQwenExecutable = pathToQwenExecutable;
return this;
}
public String getCwd() {
return cwd;
}
public void setCwd(String cwd) {
public TransportOptions setCwd(String cwd) {
this.cwd = cwd;
return this;
}
public String getModel() {
return model;
}
public void setModel(String model) {
public TransportOptions setModel(String model) {
this.model = model;
return this;
}
public PermissionMode getPermissionMode() {
return permissionMode;
}
public void setPermissionMode(PermissionMode permissionMode) {
public TransportOptions setPermissionMode(PermissionMode permissionMode) {
this.permissionMode = permissionMode;
return this;
}
public Map<String, String> getEnv() {
return env;
}
public void setEnv(Map<String, String> env) {
public TransportOptions setEnv(Map<String, String> env) {
this.env = env;
return this;
}
public Integer getMaxSessionTurns() {
return maxSessionTurns;
}
public void setMaxSessionTurns(Integer maxSessionTurns) {
public TransportOptions setMaxSessionTurns(Integer maxSessionTurns) {
this.maxSessionTurns = maxSessionTurns;
return this;
}
public List<String> getCoreTools() {
return coreTools;
}
public void setCoreTools(List<String> coreTools) {
public TransportOptions setCoreTools(List<String> coreTools) {
this.coreTools = coreTools;
return this;
}
public List<String> getExcludeTools() {
return excludeTools;
}
public void setExcludeTools(List<String> excludeTools) {
public TransportOptions setExcludeTools(List<String> excludeTools) {
this.excludeTools = excludeTools;
return this;
}
public List<String> getAllowedTools() {
return allowedTools;
}
public void setAllowedTools(List<String> allowedTools) {
public TransportOptions setAllowedTools(List<String> allowedTools) {
this.allowedTools = allowedTools;
return this;
}
public String getAuthType() {
return authType;
}
public void setAuthType(String authType) {
public TransportOptions setAuthType(String authType) {
this.authType = authType;
return this;
}
public Boolean getIncludePartialMessages() {
return includePartialMessages;
}
public void setIncludePartialMessages(Boolean includePartialMessages) {
public TransportOptions setIncludePartialMessages(Boolean includePartialMessages) {
this.includePartialMessages = includePartialMessages;
return this;
}
public Long getTurnTimeoutMs() {
return turnTimeoutMs;
public Boolean getSkillsEnable() {
return skillsEnable;
}
public void setTurnTimeoutMs(Long turnTimeoutMs) {
this.turnTimeoutMs = turnTimeoutMs;
public TransportOptions setSkillsEnable(Boolean skillsEnable) {
this.skillsEnable = skillsEnable;
return this;
}
public Long getMessageTimeoutMs() {
return messageTimeoutMs;
public Timeout getTurnTimeout() {
return turnTimeout;
}
public void setMessageTimeoutMs(Long messageTimeoutMs) {
this.messageTimeoutMs = messageTimeoutMs;
public TransportOptions setTurnTimeout(Timeout turnTimeout) {
this.turnTimeout = turnTimeout;
return this;
}
public Timeout getMessageTimeout() {
return messageTimeout;
}
public TransportOptions setMessageTimeout(Timeout messageTimeout) {
this.messageTimeout = messageTimeout;
return this;
}
public String getResumeSessionId() {
return resumeSessionId;
}
public void setResumeSessionId(String resumeSessionId) {
public TransportOptions setResumeSessionId(String resumeSessionId) {
this.resumeSessionId = resumeSessionId;
return this;
}
public List<String> getOtherOptions() {
return otherOptions;
}
public TransportOptions setOtherOptions(List<String> otherOptions) {
this.otherOptions = otherOptions;
return this;
}
@Override

View File

@@ -2,6 +2,8 @@ package com.alibaba.qwen.code.cli.transport.process;
import com.alibaba.qwen.code.cli.transport.Transport;
import com.alibaba.qwen.code.cli.transport.TransportOptions;
import com.alibaba.qwen.code.cli.utils.MyConcurrentUtils;
import com.alibaba.qwen.code.cli.utils.Timeout;
import org.apache.commons.lang3.exception.ContextedRuntimeException;
import org.slf4j.Logger;
@@ -14,29 +16,37 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.lang.ProcessBuilder.Redirect;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
public class ProcessTransport implements Transport {
private static final Logger log = LoggerFactory.getLogger(ProcessTransport.class);
private final TransportOptions transportOptions;
protected Long turnTimeoutMs;
protected Long messageTimeoutMs;
protected Timeout turnTimeout;
protected Timeout messageTimeout;
protected Process process;
protected BufferedWriter processInput;
protected BufferedReader processOutput;
protected BufferedReader processError;
protected final Consumer<String> errorHandler;
private final AtomicBoolean reading = new AtomicBoolean(false);
public ProcessTransport() throws IOException {
this(new TransportOptions());
}
public ProcessTransport(TransportOptions transportOptions) throws IOException {
this(transportOptions, (line) -> log.error("process error: {}", line));
}
public ProcessTransport(TransportOptions transportOptions, Consumer<String> errorHandler) throws IOException {
this.transportOptions = transportOptions;
this.errorHandler = errorHandler;
start();
}
@@ -45,11 +55,16 @@ public class ProcessTransport implements Transport {
return transportOptions;
}
@Override
public boolean isReading() {
return reading.get();
}
@Override
public void start() throws IOException {
TransportOptionsAdapter transportOptionsAdapter = new TransportOptionsAdapter(transportOptions);
this.turnTimeoutMs = transportOptionsAdapter.getHandledTransportOptions().getTurnTimeoutMs();
this.messageTimeoutMs = transportOptionsAdapter.getHandledTransportOptions().getMessageTimeoutMs();
this.turnTimeout = transportOptionsAdapter.getHandledTransportOptions().getTurnTimeout();
this.messageTimeout = transportOptionsAdapter.getHandledTransportOptions().getMessageTimeout();
String[] commandArgs = transportOptionsAdapter.buildCommandArgs();
log.debug("trans to command args: {}", transportOptionsAdapter);
@@ -91,113 +106,87 @@ public class ProcessTransport implements Transport {
@Override
public String inputWaitForOneLine(String message) throws IOException, ExecutionException, InterruptedException, TimeoutException {
return inputWaitForOneLine(message, turnTimeoutMs);
return inputWaitForOneLine(message, turnTimeout);
}
private String inputWaitForOneLine(String message, long timeOutInMs)
private String inputWaitForOneLine(String message, Timeout timeOut)
throws IOException, TimeoutException, InterruptedException, ExecutionException {
inputNoWaitResponse(message);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
return processOutput.readLine();
} catch (IOException e) {
throw new ContextedRuntimeException("read line error", e)
.addContextValue("message", message);
}
});
try {
String line = future.get(timeOutInMs, TimeUnit.MILLISECONDS);
reading.set(true);
String line = MyConcurrentUtils.runAndWait(() -> {
try {
return processOutput.readLine();
} catch (IOException e) {
throw new ContextedRuntimeException("read line error", e)
.addContextValue("message", message);
}
}, timeOut);
log.info("inputWaitForOneLine result: {}", line);
return line;
} catch (TimeoutException e) {
future.cancel(true);
log.warn("read message timeout {}, canceled readOneLine task", timeOutInMs, e);
throw e;
} catch (InterruptedException e) {
future.cancel(true);
log.warn("interrupted task, canceled task", e);
throw e;
} catch (ExecutionException e) {
future.cancel(true);
log.warn("the readOneLine task execute error", e);
throw e;
} finally {
reading.set(false);
}
}
@Override
public void inputWaitForMultiLine(String message, Function<String, Boolean> callBackFunction) throws IOException {
inputWaitForMultiLine(message, callBackFunction, turnTimeoutMs);
inputWaitForMultiLine(message, callBackFunction, turnTimeout);
}
private void inputWaitForMultiLine(String message, Function<String, Boolean> callBackFunction, long timeOutInMs) throws IOException {
private void inputWaitForMultiLine(String message, Function<String, Boolean> callBackFunction, Timeout timeOut) throws IOException {
log.debug("input message for multiLine: {}", message);
inputNoWaitResponse(message);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> iterateOutput(callBackFunction));
try {
future.get(timeOutInMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
future.cancel(true);
log.warn("read message timeout {}, canceled readMultiMessages task", timeOutInMs, e);
} catch (InterruptedException e) {
future.cancel(true);
log.warn("interrupted task, canceled task", e);
} catch (ExecutionException e) {
future.cancel(true);
log.warn("the readMultiMessages task execute error", e);
} catch (Exception e) {
future.cancel(true);
log.warn("other error");
}
MyConcurrentUtils.runAndWait(() -> iterateOutput(callBackFunction), timeOut);
}
@Override
public void inputNoWaitResponse(String message) throws IOException {
log.debug("input message to agent: {}", message);
log.debug("input message to process: {}", message);
processInput.write(message);
processInput.newLine();
processInput.flush();
}
private void startErrorReading() {
CompletableFuture.runAsync(() -> {
MyConcurrentUtils.asyncRun(() -> {
try {
String line;
while ((line = processError.readLine()) != null) {
System.err.println("错误: " + line);
}
} catch (Exception e) {
System.err.println("错误: " + e.getMessage());
}
});
}
private void iterateOutput(Function<String, Boolean> callBackFunction) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
for (String line = processOutput.readLine(); line != null; line = processOutput.readLine()) {
log.debug("read a message from agent {}", line);
if (callBackFunction.apply(line)) {
for (;;) {
final String line = processError.readLine();
if (line == null) {
break;
}
if (errorHandler != null) {
try {
MyConcurrentUtils.runAndWait(() -> errorHandler.accept(line), messageTimeout);
} catch (Exception e) {
log.warn("error handler error", e);
}
}
}
} catch (IOException e) {
throw new RuntimeException("read process output error", e);
log.warn("Failed read error {}, caused by {}", e.getMessage(), e.getCause(), e);
}
});
}, (e, t) -> log.warn("read error {}", t.getMessage(), t));
}
private void iterateOutput(Function<String, Boolean> callBackFunction) {
try {
future.get(messageTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("read message task interrupted", e);
future.cancel(true);
} catch (TimeoutException e) {
log.warn("Operation timed out", e);
future.cancel(true);
} catch (Exception e) {
future.cancel(true);
log.warn("Operation error", e);
reading.set(true);
MyConcurrentUtils.runAndWait(() -> {
try {
for (String line = processOutput.readLine(); line != null; line = processOutput.readLine()) {
log.debug("read a message from process {}", line);
if (callBackFunction.apply(line)) {
break;
}
}
} catch (IOException e) {
throw new RuntimeException("read process output error", e);
}
}, messageTimeout);
} finally {
reading.set(false);
}
}
}

View File

@@ -1,6 +1,7 @@
package com.alibaba.qwen.code.cli.transport.process;
import com.alibaba.qwen.code.cli.transport.TransportOptions;
import com.alibaba.qwen.code.cli.utils.Timeout;
import org.apache.commons.lang3.StringUtils;
@@ -11,11 +12,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
class TransportOptionsAdapter {
TransportOptions transportOptions;
private static final Long DEFAULT_TURN_TIMEOUT_MS = 1000 * 60 * 30L;
private static final Long DEFAULT_MESSAGE_TIMEOUT_MS = 1000 * 60 * 3L;
private static final Timeout DEFAULT_TURN_TIMEOUT = new Timeout(1000 * 60 * 30L, TimeUnit.MILLISECONDS);
private static final Timeout DEFAULT_MESSAGE_TIMEOUT = new Timeout(1000 * 60 * 3L, TimeUnit.MILLISECONDS);
TransportOptionsAdapter(TransportOptions userTransportOptions) {
transportOptions = addDefaultTransportOptions(userTransportOptions);
@@ -73,10 +75,18 @@ class TransportOptionsAdapter {
args.add("--include-partial-messages");
}
if (transportOptions.getSkillsEnable() != null && transportOptions.getSkillsEnable()) {
args.add("--experimental-skills");
}
if (StringUtils.isNotBlank(transportOptions.getResumeSessionId())) {
args.add("--resume");
args.add(transportOptions.getResumeSessionId());
}
if (transportOptions.getOtherOptions() != null) {
args.addAll(transportOptions.getOtherOptions());
}
return args.toArray(new String[] {});
}
@@ -97,12 +107,12 @@ class TransportOptionsAdapter {
Optional.ofNullable(transportOptions.getEnv()).ifPresent(env::putAll);
transportOptions.setEnv(env);
if (transportOptions.getTurnTimeoutMs() == null) {
transportOptions.setTurnTimeoutMs(DEFAULT_TURN_TIMEOUT_MS);
if (transportOptions.getTurnTimeout() == null) {
transportOptions.setTurnTimeout(DEFAULT_TURN_TIMEOUT);
}
if (transportOptions.getMessageTimeoutMs() == null) {
transportOptions.setMessageTimeoutMs(DEFAULT_MESSAGE_TIMEOUT_MS);
if (transportOptions.getMessageTimeout() == null) {
transportOptions.setMessageTimeout(DEFAULT_MESSAGE_TIMEOUT);
}
return transportOptions;
}

View File

@@ -0,0 +1,65 @@
package com.alibaba.qwen.code.cli.utils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyConcurrentUtils {
private static final Logger log = LoggerFactory.getLogger(MyConcurrentUtils.class);
public static void runAndWait(Runnable runnable, Timeout timeOut) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
runnable.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, ThreadPoolConfig.getExecutor());
try {
future.get(timeOut.getValue(), timeOut.getUnit());
} catch (InterruptedException e) {
log.warn("task interrupted", e);
future.cancel(true);
} catch (TimeoutException e) {
log.warn("Operation timed out", e);
future.cancel(true);
} catch (Exception e) {
future.cancel(true);
log.warn("Operation error", e);
}
}
public static <T> T runAndWait(Supplier<T> supplier, Timeout timeOut)
throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
try {
return supplier.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, ThreadPoolConfig.getExecutor());
try {
return future.get(timeOut.getValue(), timeOut.getUnit());
} catch (TimeoutException | InterruptedException | ExecutionException e) {
future.cancel(true);
throw e;
}
}
public static void asyncRun(Runnable runnable, BiConsumer<Void, Throwable> errorCallback) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
runnable.run();
} catch (Exception e) {
log.warn("async task error", e);
}
}, ThreadPoolConfig.getExecutor());
future.whenComplete(errorCallback);
}
}

View File

@@ -0,0 +1,43 @@
package com.alibaba.qwen.code.cli.utils;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
public class ThreadPoolConfig {
private static final ThreadPoolExecutor defaultExecutor = new ThreadPoolExecutor(
10, 30, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(300),
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "qwen_code_cli-pool-" + threadNumber.getAndIncrement());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
private static Supplier<ThreadPoolExecutor> executorSupplier;
public static void setExecutorSupplier(Supplier<ThreadPoolExecutor> executorSupplier) {
ThreadPoolConfig.executorSupplier = executorSupplier;
}
public static ExecutorService getExecutor() {
return Optional.ofNullable(executorSupplier).map(s -> {
try {
return s.get();
} catch (Exception e) {
return defaultExecutor;
}
}).orElse(defaultExecutor);
}
}

View File

@@ -0,0 +1,27 @@
package com.alibaba.qwen.code.cli.utils;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.Validate;
public class Timeout {
private final Long value;
private final TimeUnit unit;
public Timeout(Long value, TimeUnit unit) {
Validate.notNull(value, "value can not be null");
Validate.notNull(unit, "unit can not be null");
this.value = value;
this.unit = unit;
}
public Long getValue() {
return value;
}
public TimeUnit getUnit() {
return unit;
}
public static final Timeout TIMEOUT_60_SECONDS = new Timeout(60L, TimeUnit.SECONDS);
public static final Timeout TIMEOUT_30_MINUTES = new Timeout(60L, TimeUnit.MINUTES);
}

View File

@@ -2,21 +2,18 @@ package com.alibaba.qwen.code.cli;
import java.util.List;
import com.alibaba.qwen.code.cli.protocol.message.Message;
import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKAssistantMessage;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.*;
class QwenCliTest {
class QwenCodeCliTest {
private static final Logger log = LoggerFactory.getLogger(QwenCliTest.class);
private static final Logger log = LoggerFactory.getLogger(QwenCodeCliTest.class);
@Test
void query() {
List<Message> result = QwenCli.query("hello world");
void simpleQuery() {
List<String> result = QwenCodeCli.simpleQuery("hello world");
log.info("result: {}", result);
assertNotNull(result);
}

View File

@@ -1,11 +1,14 @@
package com.alibaba.qwen.code.cli.session;
import java.io.IOException;
import java.util.List;
import com.alibaba.fastjson2.JSON;
import com.alibaba.qwen.code.cli.protocol.data.AssistantContent;
import com.alibaba.qwen.code.cli.protocol.data.PermissionMode;
import com.alibaba.qwen.code.cli.protocol.data.behavior.Allow;
import com.alibaba.qwen.code.cli.protocol.data.behavior.Behavior;
import com.alibaba.qwen.code.cli.protocol.data.behavior.Behavior.Operation;
import com.alibaba.qwen.code.cli.protocol.message.SDKResultMessage;
import com.alibaba.qwen.code.cli.protocol.message.SDKSystemMessage;
import com.alibaba.qwen.code.cli.protocol.message.assistant.SDKAssistantMessage;
@@ -17,6 +20,7 @@ import com.alibaba.qwen.code.cli.session.event.SessionEventSimpleConsumers;
import com.alibaba.qwen.code.cli.session.exception.SessionControlException;
import com.alibaba.qwen.code.cli.session.exception.SessionSendPromptException;
import com.alibaba.qwen.code.cli.transport.Transport;
import com.alibaba.qwen.code.cli.transport.TransportOptions;
import com.alibaba.qwen.code.cli.transport.process.ProcessTransport;
import org.apache.commons.lang3.StringUtils;
@@ -28,18 +32,34 @@ class SessionTest {
private static final Logger log = LoggerFactory.getLogger(SessionTest.class);
@Test
void partialSendPromptSuccessfully() throws IOException, SessionControlException, SessionSendPromptException {
Transport transport = new ProcessTransport(new TransportOptions().setIncludePartialMessages(true));
Session session = new Session(transport);
session.sendPrompt("in the dir src/test/temp/, create file empty file test.touch", new SessionEventSimpleConsumers() {
@Override
public void onAssistantMessageIncludePartial(Session session, List<AssistantContent> assistantContents,
AssistantMessageOutputType assistantMessageOutputType) {
log.info("onAssistantMessageIncludePartial: {}", JSON.toJSONString(assistantContents));
}
}.setDefaultPermissionOperation(Operation.allow));
}
@Test
void setPermissionModeSuccessfully() throws IOException, SessionControlException, SessionSendPromptException {
Transport transport = new ProcessTransport();
Session session = new Session(transport);
session.setPermissionMode(PermissionMode.YOLO);
log.info(session.setPermissionMode(PermissionMode.YOLO).map(s -> s ? "setPermissionMode 1 success" : "setPermissionMode 1 error")
.orElse("setPermissionMode 1 unknown"));
session.sendPrompt("in the dir src/test/temp/, create file empty file test.touch", new SessionEventSimpleConsumers());
session.setPermissionMode(PermissionMode.PLAN);
log.info(session.setPermissionMode(PermissionMode.PLAN).map(s -> s ? "setPermissionMode 2 success" : "setPermissionMode 2 error")
.orElse("setPermissionMode 2 unknown"));
session.sendPrompt("rename test.touch to test_rename.touch", new SessionEventSimpleConsumers());
session.setPermissionMode(PermissionMode.AUTO_EDIT);
log.info(session.setPermissionMode(PermissionMode.AUTO_EDIT).map(s -> s ? "setPermissionMode 3 success" : "setPermissionMode 3 error")
.orElse("setPermissionMode 3 unknown"));
session.sendPrompt("rename test.touch to test_rename.touch", new SessionEventSimpleConsumers());
session.sendPrompt("rename test.touch to test_rename.touch again user will allow", new SessionEventSimpleConsumers() {
@@ -57,19 +77,19 @@ class SessionTest {
Transport transport = new ProcessTransport();
Session session = new Session(transport);
session.setModel("qwen3-coder-flash");
log.info(session.setModel("qwen3-coder-flash").map(s -> s ? "setModel 1 success" : "setModel 1 error").orElse("setModel 1 unknown"));
writeSplitLine("setModel 1 end");
session.sendPrompt("hello world", new SessionEventSimpleConsumers());
writeSplitLine("prompt 1 end");
session.setModel("qwen3-coder-plus");
log.info(session.setModel("qwen3-coder-plus").map(s -> s ? "setModel 2 success" : "setModel 2 error").orElse("setModel 2 unknown"));
writeSplitLine("setModel 1 end");
session.sendPrompt("查看下当前目录有多少个文件", new SessionEventSimpleConsumers());
writeSplitLine("prompt 2 end");
session.setModel("qwen3-max");
log.info(session.setModel("qwen3-max").map(s -> s ? "setModel 3 success" : "setModel 3 error").orElse("setModel 3 unknown"));
writeSplitLine("setModel 1 end");
session.sendPrompt("查看下当前目录有多少个xml文件", new SessionEventSimpleConsumers());
@@ -129,12 +149,17 @@ class SessionTest {
}
public void writeSplitLine(String line) {
log.info("{} {}",line, StringUtils.repeat("=", 300));
log.info("{} {}", line, StringUtils.repeat("=", 300));
}
@Test
void testJSON() {
String json = "{\"type\":\"assistant\",\"uuid\":\"ed8374fe-a4eb-4fc0-9780-9bd2fd831cda\",\"session_id\":\"166badc0-e6d3-4978-ae47-4ccd51c468ef\",\"message\":{\"content\":[{\"text\":\"Hello! How can I help you with the Qwen Code SDK for Java today?\",\"type\":\"text\"}],\"id\":\"ed8374fe-a4eb-4fc0-9780-9bd2fd831cda\",\"model\":\"qwen3-coder-plus\",\"role\":\"assistant\",\"type\":\"message\",\"usage\":{\"cache_read_input_tokens\":12766,\"input_tokens\":12770,\"output_tokens\":17,\"total_tokens\":12787}}}";
String json
= "{\"type\":\"assistant\",\"uuid\":\"ed8374fe-a4eb-4fc0-9780-9bd2fd831cda\","
+ "\"session_id\":\"166badc0-e6d3-4978-ae47-4ccd51c468ef\",\"message\":{\"content\":[{\"text\":\"Hello! How can I help you with the"
+ " Qwen Code SDK for Java today?\",\"type\":\"text\"}],\"id\":\"ed8374fe-a4eb-4fc0-9780-9bd2fd831cda\","
+ "\"model\":\"qwen3-coder-plus\",\"role\":\"assistant\",\"type\":\"message\",\"usage\":{\"cache_read_input_tokens\":12766,"
+ "\"input_tokens\":12770,\"output_tokens\":17,\"total_tokens\":12787}}}";
SDKAssistantMessage assistantMessage = JSON.parseObject(json, SDKAssistantMessage.class);
log.info("the assistantMessage: {}", assistantMessage);
}