add ProcessTransport

This commit is contained in:
skyfire
2025-12-24 20:45:17 +08:00
parent e5efad89e0
commit 68628bf952
7 changed files with 347 additions and 56 deletions

View File

@@ -96,12 +96,12 @@
<module name="RedundantImport" />
<module name="UnusedImports" />
<module name="ImportOrder">
<property name="groups" value="*,javax,java" />
<property name="separated" value="true" />
<property name="option" value="bottom" />
<property name="sortStaticImportsAlphabetically" value="true" />
</module>
<!-- <module name="ImportOrder">-->
<!-- <property name="groups" value="*,javax,java" />-->
<!-- <property name="separated" value="true" />-->
<!-- <property name="option" value="bottom" />-->
<!-- <property name="sortStaticImportsAlphabetically" value="true" />-->
<!-- </module>-->
<module name="WhitespaceAround">
<property name="allowEmptyConstructors" value="true" />

View File

@@ -26,7 +26,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<checkstyle-maven-plugin.version>3.6.0</checkstyle-maven-plugin.version>
<junit5.version>5.14.1</junit5.version>
<logback-classic.version>1.5.23</logback-classic.version>
<logback-classic.version>1.3.16</logback-classic.version>
</properties>
<dependencyManagement>
@@ -45,7 +45,11 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback-classic.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.20.0</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>

View File

@@ -1,10 +0,0 @@
package com.alibaba.qwen.code.cli.transport;
public class ProcessTransport {
Process process;
TransportOptions transportOptions;
public ProcessTransport(TransportOptions transportOptions) {
this.transportOptions = transportOptions;
}
}

View File

@@ -2,24 +2,21 @@ package com.alibaba.qwen.code.cli.transport;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
public class TransportOptions {
public class TransportOptions implements Cloneable {
private String pathToQwenExecutable;
private String cwd;
private String model;
private PermissionMode permissionMode;
private Map<String, String> env;
private Object abortController; // AbortController in JavaScript does not have a direct Java equivalent
private Boolean debug;
private Consumer<String> stderr; // Equivalent to (message: string) => void
private String logLevel; // Can be 'debug', 'info', 'warn', or 'error'
private Integer maxSessionTurns;
private List<String> coreTools;
private List<String> excludeTools;
private List<String> allowedTools;
private String authType;
private Boolean includePartialMessages;
private Long turnTimeoutMs;
private Long messageTimeoutMs;
public String getPathToQwenExecutable() {
return pathToQwenExecutable;
@@ -61,38 +58,6 @@ public class TransportOptions {
this.env = env;
}
public Object getAbortController() {
return abortController;
}
public void setAbortController(Object abortController) {
this.abortController = abortController;
}
public Boolean getDebug() {
return debug;
}
public void setDebug(Boolean debug) {
this.debug = debug;
}
public Consumer<String> getStderr() {
return stderr;
}
public void setStderr(Consumer<String> stderr) {
this.stderr = stderr;
}
public String getLogLevel() {
return logLevel;
}
public void setLogLevel(String logLevel) {
this.logLevel = logLevel;
}
public Integer getMaxSessionTurns() {
return maxSessionTurns;
}
@@ -140,4 +105,29 @@ public class TransportOptions {
public void setIncludePartialMessages(Boolean includePartialMessages) {
this.includePartialMessages = includePartialMessages;
}
public Long getTurnTimeoutMs() {
return turnTimeoutMs;
}
public void setTurnTimeoutMs(Long turnTimeoutMs) {
this.turnTimeoutMs = turnTimeoutMs;
}
public Long getMessageTimeoutMs() {
return messageTimeoutMs;
}
public void setMessageTimeoutMs(Long messageTimeoutMs) {
this.messageTimeoutMs = messageTimeoutMs;
}
@Override
public TransportOptions clone() {
try {
return (TransportOptions) super.clone();
} catch (CloneNotSupportedException e) {
throw new AssertionError();
}
}
}

View File

@@ -0,0 +1,182 @@
package com.alibaba.qwen.code.cli.transport.process;
import com.alibaba.qwen.code.cli.transport.TransportOptions;
import org.apache.commons.lang3.exception.ContextedRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.lang.ProcessBuilder.Redirect;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
public class ProcessTransport {
private static final Logger log = LoggerFactory.getLogger(ProcessTransport.class);
TransportOptionsAdapter transportOptionsAdapter;
protected final Long turnTimeoutMs;
protected final Long messageTimeoutMs;
protected Process process;
protected BufferedWriter processInput;
protected BufferedReader processOutput;
protected BufferedReader processError;
public ProcessTransport(TransportOptions transportOptions) throws IOException {
this.transportOptionsAdapter = new TransportOptionsAdapter(transportOptions);
turnTimeoutMs = transportOptionsAdapter.getHandledTransportOptions().getTurnTimeoutMs();
messageTimeoutMs = transportOptionsAdapter.getHandledTransportOptions().getMessageTimeoutMs();
start();
}
protected void start() throws IOException {
String[] commandArgs = transportOptionsAdapter.buildCommandArgs();
log.debug("trans to command args: {}", transportOptionsAdapter);
ProcessBuilder processBuilder = new ProcessBuilder(commandArgs)
.redirectOutput(Redirect.PIPE)
.redirectInput(Redirect.PIPE)
.redirectError(Redirect.PIPE)
.redirectErrorStream(false)
.directory(new File(transportOptionsAdapter.getCwd()));
process = processBuilder.start();
processInput = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));
processOutput = new BufferedReader(new InputStreamReader(process.getInputStream()));
processError = new BufferedReader(new InputStreamReader(process.getErrorStream()));
startErrorReading();
}
public void close() throws IOException {
if (processInput != null) {
processInput.close();
}
if (processOutput != null) {
processOutput.close();
}
if (processError != null) {
processError.close();
}
if (process != null) {
process.destroy();
}
}
public String inputWaitForOneLine(String message) throws IOException, ExecutionException, InterruptedException, TimeoutException {
return inputWaitForOneLine(message, turnTimeoutMs);
}
private String inputWaitForOneLine(String message, long timeOutInMs)
throws IOException, TimeoutException, InterruptedException, ExecutionException {
inputNoWaitResponse(message);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
return processOutput.readLine();
} catch (IOException e) {
throw new ContextedRuntimeException("read line error", e)
.addContextValue("message", message);
}
});
try {
String line = future.get(timeOutInMs, TimeUnit.MILLISECONDS);
log.info("inputWaitForOneLine result: {}", line);
return line;
} catch (TimeoutException e) {
future.cancel(true);
log.warn("read message timeout {}, canceled readOneLine task", timeOutInMs, e);
throw e;
} catch (InterruptedException e) {
future.cancel(true);
log.warn("interrupted task, canceled task", e);
throw e;
} catch (ExecutionException e) {
future.cancel(true);
log.warn("the readOneLine task execute error", e);
throw e;
}
}
public void inputWaitForMultiLine(String message, Function<String, Boolean> callBackFunction) throws IOException {
inputWaitForMultiLine(message, callBackFunction, turnTimeoutMs);
}
private void inputWaitForMultiLine(String message, Function<String, Boolean> callBackFunction, long timeOutInMs) throws IOException {
log.debug("input message for multiLine: {}", message);
inputNoWaitResponse(message);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> iterateOutput(callBackFunction));
try {
future.get(timeOutInMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
future.cancel(true);
log.warn("read message timeout {}, canceled readMultiMessages task", timeOutInMs, e);
} catch (InterruptedException e) {
future.cancel(true);
log.warn("interrupted task, canceled task", e);
} catch (ExecutionException e) {
future.cancel(true);
log.warn("the readMultiMessages task execute error", e);
} catch (Exception e) {
future.cancel(true);
log.warn("other error");
}
}
public void inputNoWaitResponse(String message) throws IOException {
log.debug("input message to agent: {}", message);
processInput.write(message);
processInput.newLine();
processInput.flush();
}
private void startErrorReading() {
CompletableFuture.runAsync(() -> {
try {
String line;
while ((line = processError.readLine()) != null) {
System.err.println("错误: " + line);
}
} catch (Exception e) {
System.err.println("错误: " + e.getMessage());
}
});
}
private void iterateOutput(Function<String, Boolean> callBackFunction) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
for (String line = processOutput.readLine(); line != null; line = processOutput.readLine()) {
log.debug("read a message from agent {}", line);
if (callBackFunction.apply(line)) {
break;
}
}
} catch (IOException e) {
throw new RuntimeException("read process output error", e);
}
});
try {
future.get(messageTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("read message task interrupted", e);
future.cancel(true);
} catch (TimeoutException e) {
log.warn("Operation timed out", e);
future.cancel(true);
} catch (Exception e) {
future.cancel(true);
log.warn("Operation error", e);
}
}
}

View File

@@ -0,0 +1,107 @@
package com.alibaba.qwen.code.cli.transport.process;
import com.alibaba.qwen.code.cli.transport.TransportOptions;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
class TransportOptionsAdapter {
TransportOptions transportOptions;
private static final Long DEFAULT_TURN_TIMEOUT_MS = 1000 * 60 * 30L;
private static final Long DEFAULT_MESSAGE_TIMEOUT_MS = 1000 * 60 * 3L;
TransportOptionsAdapter(TransportOptions userTransportOptions) {
transportOptions = addDefaultTransportOptions(userTransportOptions);
}
TransportOptions getHandledTransportOptions() {
return transportOptions;
}
String getCwd() {
return transportOptions.getCwd();
}
String[] buildCommandArgs() {
List<String> args = new ArrayList<>(
Arrays.asList(transportOptions.getPathToQwenExecutable(), "--input-format", "stream-json", "--output-format",
"stream-json", "--channel=SDK"));
if (StringUtils.isNotBlank(transportOptions.getModel())) {
args.add("--model");
args.add(transportOptions.getModel());
}
if (StringUtils.isNotBlank(transportOptions.getCwd())) {
args.add("--cwd");
args.add(transportOptions.getCwd());
}
if (transportOptions.getPermissionMode() != null) {
args.add("--permission-mode");
args.add(transportOptions.getPermissionMode().getValue());
}
if (transportOptions.getMaxSessionTurns() != null) {
args.add("--max-session-turns");
args.add(transportOptions.getMaxSessionTurns().toString());
}
if (transportOptions.getCoreTools() != null && !transportOptions.getCoreTools().isEmpty()) {
args.add("--core-tools");
args.add(String.join(",", transportOptions.getCoreTools()));
}
if (transportOptions.getExcludeTools() != null && !transportOptions.getExcludeTools().isEmpty()) {
args.add("--exclude-tools");
args.add(String.join(",", transportOptions.getExcludeTools()));
}
if (transportOptions.getAllowedTools() != null && !transportOptions.getAllowedTools().isEmpty()) {
args.add("--allowed-tools");
args.add(String.join(",", transportOptions.getAllowedTools()));
}
if (StringUtils.isNotBlank(transportOptions.getAuthType())) {
args.add("--auth-type");
args.add(transportOptions.getAuthType());
}
if (transportOptions.getIncludePartialMessages() != null && transportOptions.getIncludePartialMessages()) {
args.add("--include-partial-messages");
}
return args.toArray(new String[] {});
}
private TransportOptions addDefaultTransportOptions(TransportOptions userTransportOptions) {
TransportOptions transportOptions = userTransportOptions.clone();
if (StringUtils.isBlank(transportOptions.getPathToQwenExecutable())) {
transportOptions.setPathToQwenExecutable("qwen");
}
if (StringUtils.isBlank(transportOptions.getCwd())) {
transportOptions.setCwd(new File("").getAbsolutePath());
}
Map<String, String> env = new HashMap<>(System.getenv());
Optional.ofNullable(transportOptions.getEnv()).ifPresent(env::putAll);
transportOptions.setEnv(env);
if (transportOptions.getTurnTimeoutMs() == null) {
transportOptions.setTurnTimeoutMs(DEFAULT_TURN_TIMEOUT_MS);
}
if (transportOptions.getMessageTimeoutMs() == null) {
transportOptions.setMessageTimeoutMs(DEFAULT_MESSAGE_TIMEOUT_MS);
}
return transportOptions;
}
}

View File

@@ -0,0 +1,18 @@
package com.alibaba.qwen.code.cli.transport.process;
import java.io.IOException;
import com.alibaba.qwen.code.cli.transport.TransportOptions;
import org.junit.jupiter.api.Test;
class ProcessTransportTest {
@Test
void shouldStartAndCloseSuccessfully() throws IOException {
TransportOptions transportOptions = new TransportOptions();
ProcessTransport processTransport = new ProcessTransport(transportOptions);
processTransport.close();
}
}