From 68628bf9520d450b7281ec40b319d9c133004d31 Mon Sep 17 00:00:00 2001 From: skyfire Date: Wed, 24 Dec 2025 20:45:17 +0800 Subject: [PATCH] add ProcessTransport --- packages/sdk-java/checkstyle.xml | 12 +- packages/sdk-java/pom.xml | 8 +- .../code/cli/transport/ProcessTransport.java | 10 - .../code/cli/transport/TransportOptions.java | 66 +++---- .../transport/process/ProcessTransport.java | 182 ++++++++++++++++++ .../process/TransportOptionsAdapter.java | 107 ++++++++++ .../process/ProcessTransportTest.java | 18 ++ 7 files changed, 347 insertions(+), 56 deletions(-) delete mode 100644 packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/ProcessTransport.java create mode 100644 packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/ProcessTransport.java create mode 100644 packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/TransportOptionsAdapter.java create mode 100644 packages/sdk-java/src/test/java/com/alibaba/qwen/code/cli/transport/process/ProcessTransportTest.java diff --git a/packages/sdk-java/checkstyle.xml b/packages/sdk-java/checkstyle.xml index fa316ec72..c67c1319f 100644 --- a/packages/sdk-java/checkstyle.xml +++ b/packages/sdk-java/checkstyle.xml @@ -96,12 +96,12 @@ - - - - - - + + + + + + diff --git a/packages/sdk-java/pom.xml b/packages/sdk-java/pom.xml index 0ec6bb888..1d1c7c25a 100644 --- a/packages/sdk-java/pom.xml +++ b/packages/sdk-java/pom.xml @@ -26,7 +26,7 @@ UTF-8 3.6.0 5.14.1 - 1.5.23 + 1.3.16 @@ -45,7 +45,11 @@ ch.qos.logback logback-classic ${logback-classic.version} - provided + + + org.apache.commons + commons-lang3 + 3.20.0 org.junit.jupiter diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/ProcessTransport.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/ProcessTransport.java deleted file mode 100644 index 89f03f1b1..000000000 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/ProcessTransport.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.alibaba.qwen.code.cli.transport; - -public class ProcessTransport { - Process process; - TransportOptions transportOptions; - - public ProcessTransport(TransportOptions transportOptions) { - this.transportOptions = transportOptions; - } -} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/TransportOptions.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/TransportOptions.java index ca4cb83f9..b64df2ec6 100644 --- a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/TransportOptions.java +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/TransportOptions.java @@ -2,24 +2,21 @@ package com.alibaba.qwen.code.cli.transport; import java.util.List; import java.util.Map; -import java.util.function.Consumer; -public class TransportOptions { +public class TransportOptions implements Cloneable { private String pathToQwenExecutable; private String cwd; private String model; private PermissionMode permissionMode; private Map env; - private Object abortController; // AbortController in JavaScript does not have a direct Java equivalent - private Boolean debug; - private Consumer stderr; // Equivalent to (message: string) => void - private String logLevel; // Can be 'debug', 'info', 'warn', or 'error' private Integer maxSessionTurns; private List coreTools; private List excludeTools; private List allowedTools; private String authType; private Boolean includePartialMessages; + private Long turnTimeoutMs; + private Long messageTimeoutMs; public String getPathToQwenExecutable() { return pathToQwenExecutable; @@ -61,38 +58,6 @@ public class TransportOptions { this.env = env; } - public Object getAbortController() { - return abortController; - } - - public void setAbortController(Object abortController) { - this.abortController = abortController; - } - - public Boolean getDebug() { - return debug; - } - - public void setDebug(Boolean debug) { - this.debug = debug; - } - - public Consumer getStderr() { - return stderr; - } - - public void setStderr(Consumer stderr) { - this.stderr = stderr; - } - - public String getLogLevel() { - return logLevel; - } - - public void setLogLevel(String logLevel) { - this.logLevel = logLevel; - } - public Integer getMaxSessionTurns() { return maxSessionTurns; } @@ -140,4 +105,29 @@ public class TransportOptions { public void setIncludePartialMessages(Boolean includePartialMessages) { this.includePartialMessages = includePartialMessages; } + + public Long getTurnTimeoutMs() { + return turnTimeoutMs; + } + + public void setTurnTimeoutMs(Long turnTimeoutMs) { + this.turnTimeoutMs = turnTimeoutMs; + } + + public Long getMessageTimeoutMs() { + return messageTimeoutMs; + } + + public void setMessageTimeoutMs(Long messageTimeoutMs) { + this.messageTimeoutMs = messageTimeoutMs; + } + + @Override + public TransportOptions clone() { + try { + return (TransportOptions) super.clone(); + } catch (CloneNotSupportedException e) { + throw new AssertionError(); + } + } } diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/ProcessTransport.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/ProcessTransport.java new file mode 100644 index 000000000..3adc1072b --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/ProcessTransport.java @@ -0,0 +1,182 @@ +package com.alibaba.qwen.code.cli.transport.process; + +import com.alibaba.qwen.code.cli.transport.TransportOptions; + +import org.apache.commons.lang3.exception.ContextedRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.lang.ProcessBuilder.Redirect; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +public class ProcessTransport { + private static final Logger log = LoggerFactory.getLogger(ProcessTransport.class); + TransportOptionsAdapter transportOptionsAdapter; + + protected final Long turnTimeoutMs; + protected final Long messageTimeoutMs; + + protected Process process; + protected BufferedWriter processInput; + protected BufferedReader processOutput; + protected BufferedReader processError; + + public ProcessTransport(TransportOptions transportOptions) throws IOException { + this.transportOptionsAdapter = new TransportOptionsAdapter(transportOptions); + turnTimeoutMs = transportOptionsAdapter.getHandledTransportOptions().getTurnTimeoutMs(); + messageTimeoutMs = transportOptionsAdapter.getHandledTransportOptions().getMessageTimeoutMs(); + start(); + } + + protected void start() throws IOException { + String[] commandArgs = transportOptionsAdapter.buildCommandArgs(); + log.debug("trans to command args: {}", transportOptionsAdapter); + + ProcessBuilder processBuilder = new ProcessBuilder(commandArgs) + .redirectOutput(Redirect.PIPE) + .redirectInput(Redirect.PIPE) + .redirectError(Redirect.PIPE) + .redirectErrorStream(false) + .directory(new File(transportOptionsAdapter.getCwd())); + + process = processBuilder.start(); + processInput = new BufferedWriter(new OutputStreamWriter(process.getOutputStream())); + processOutput = new BufferedReader(new InputStreamReader(process.getInputStream())); + processError = new BufferedReader(new InputStreamReader(process.getErrorStream())); + startErrorReading(); + } + + public void close() throws IOException { + if (processInput != null) { + processInput.close(); + } + if (processOutput != null) { + processOutput.close(); + } + if (processError != null) { + processError.close(); + } + if (process != null) { + process.destroy(); + } + } + + public String inputWaitForOneLine(String message) throws IOException, ExecutionException, InterruptedException, TimeoutException { + return inputWaitForOneLine(message, turnTimeoutMs); + } + + private String inputWaitForOneLine(String message, long timeOutInMs) + throws IOException, TimeoutException, InterruptedException, ExecutionException { + inputNoWaitResponse(message); + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + try { + return processOutput.readLine(); + } catch (IOException e) { + throw new ContextedRuntimeException("read line error", e) + .addContextValue("message", message); + } + }); + + try { + String line = future.get(timeOutInMs, TimeUnit.MILLISECONDS); + log.info("inputWaitForOneLine result: {}", line); + return line; + } catch (TimeoutException e) { + future.cancel(true); + log.warn("read message timeout {}, canceled readOneLine task", timeOutInMs, e); + throw e; + } catch (InterruptedException e) { + future.cancel(true); + log.warn("interrupted task, canceled task", e); + throw e; + } catch (ExecutionException e) { + future.cancel(true); + log.warn("the readOneLine task execute error", e); + throw e; + } + } + + public void inputWaitForMultiLine(String message, Function callBackFunction) throws IOException { + inputWaitForMultiLine(message, callBackFunction, turnTimeoutMs); + } + + private void inputWaitForMultiLine(String message, Function callBackFunction, long timeOutInMs) throws IOException { + log.debug("input message for multiLine: {}", message); + inputNoWaitResponse(message); + + CompletableFuture future = CompletableFuture.runAsync(() -> iterateOutput(callBackFunction)); + try { + future.get(timeOutInMs, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + future.cancel(true); + log.warn("read message timeout {}, canceled readMultiMessages task", timeOutInMs, e); + } catch (InterruptedException e) { + future.cancel(true); + log.warn("interrupted task, canceled task", e); + } catch (ExecutionException e) { + future.cancel(true); + log.warn("the readMultiMessages task execute error", e); + } catch (Exception e) { + future.cancel(true); + log.warn("other error"); + } + } + + public void inputNoWaitResponse(String message) throws IOException { + log.debug("input message to agent: {}", message); + processInput.write(message); + processInput.newLine(); + processInput.flush(); + } + + private void startErrorReading() { + CompletableFuture.runAsync(() -> { + try { + String line; + while ((line = processError.readLine()) != null) { + System.err.println("错误: " + line); + } + } catch (Exception e) { + System.err.println("错误: " + e.getMessage()); + } + }); + } + + private void iterateOutput(Function callBackFunction) { + CompletableFuture future = CompletableFuture.runAsync(() -> { + try { + for (String line = processOutput.readLine(); line != null; line = processOutput.readLine()) { + log.debug("read a message from agent {}", line); + if (callBackFunction.apply(line)) { + break; + } + } + } catch (IOException e) { + throw new RuntimeException("read process output error", e); + } + }); + + try { + future.get(messageTimeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.warn("read message task interrupted", e); + future.cancel(true); + } catch (TimeoutException e) { + log.warn("Operation timed out", e); + future.cancel(true); + } catch (Exception e) { + future.cancel(true); + log.warn("Operation error", e); + } + } +} diff --git a/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/TransportOptionsAdapter.java b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/TransportOptionsAdapter.java new file mode 100644 index 000000000..e22f2fd27 --- /dev/null +++ b/packages/sdk-java/src/main/java/com/alibaba/qwen/code/cli/transport/process/TransportOptionsAdapter.java @@ -0,0 +1,107 @@ +package com.alibaba.qwen.code.cli.transport.process; + +import com.alibaba.qwen.code.cli.transport.TransportOptions; + +import org.apache.commons.lang3.StringUtils; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +class TransportOptionsAdapter { + TransportOptions transportOptions; + private static final Long DEFAULT_TURN_TIMEOUT_MS = 1000 * 60 * 30L; + private static final Long DEFAULT_MESSAGE_TIMEOUT_MS = 1000 * 60 * 3L; + + TransportOptionsAdapter(TransportOptions userTransportOptions) { + transportOptions = addDefaultTransportOptions(userTransportOptions); + } + + TransportOptions getHandledTransportOptions() { + return transportOptions; + } + + String getCwd() { + return transportOptions.getCwd(); + } + + String[] buildCommandArgs() { + List args = new ArrayList<>( + Arrays.asList(transportOptions.getPathToQwenExecutable(), "--input-format", "stream-json", "--output-format", + "stream-json", "--channel=SDK")); + + if (StringUtils.isNotBlank(transportOptions.getModel())) { + args.add("--model"); + args.add(transportOptions.getModel()); + } + + if (StringUtils.isNotBlank(transportOptions.getCwd())) { + args.add("--cwd"); + args.add(transportOptions.getCwd()); + } + + if (transportOptions.getPermissionMode() != null) { + args.add("--permission-mode"); + args.add(transportOptions.getPermissionMode().getValue()); + } + + if (transportOptions.getMaxSessionTurns() != null) { + args.add("--max-session-turns"); + args.add(transportOptions.getMaxSessionTurns().toString()); + } + + if (transportOptions.getCoreTools() != null && !transportOptions.getCoreTools().isEmpty()) { + args.add("--core-tools"); + args.add(String.join(",", transportOptions.getCoreTools())); + } + + if (transportOptions.getExcludeTools() != null && !transportOptions.getExcludeTools().isEmpty()) { + args.add("--exclude-tools"); + args.add(String.join(",", transportOptions.getExcludeTools())); + } + + if (transportOptions.getAllowedTools() != null && !transportOptions.getAllowedTools().isEmpty()) { + args.add("--allowed-tools"); + args.add(String.join(",", transportOptions.getAllowedTools())); + } + + if (StringUtils.isNotBlank(transportOptions.getAuthType())) { + args.add("--auth-type"); + args.add(transportOptions.getAuthType()); + } + + if (transportOptions.getIncludePartialMessages() != null && transportOptions.getIncludePartialMessages()) { + args.add("--include-partial-messages"); + } + return args.toArray(new String[] {}); + } + + private TransportOptions addDefaultTransportOptions(TransportOptions userTransportOptions) { + TransportOptions transportOptions = userTransportOptions.clone(); + + if (StringUtils.isBlank(transportOptions.getPathToQwenExecutable())) { + transportOptions.setPathToQwenExecutable("qwen"); + } + + if (StringUtils.isBlank(transportOptions.getCwd())) { + transportOptions.setCwd(new File("").getAbsolutePath()); + } + + Map env = new HashMap<>(System.getenv()); + Optional.ofNullable(transportOptions.getEnv()).ifPresent(env::putAll); + transportOptions.setEnv(env); + + if (transportOptions.getTurnTimeoutMs() == null) { + transportOptions.setTurnTimeoutMs(DEFAULT_TURN_TIMEOUT_MS); + } + + if (transportOptions.getMessageTimeoutMs() == null) { + transportOptions.setMessageTimeoutMs(DEFAULT_MESSAGE_TIMEOUT_MS); + } + return transportOptions; + } +} diff --git a/packages/sdk-java/src/test/java/com/alibaba/qwen/code/cli/transport/process/ProcessTransportTest.java b/packages/sdk-java/src/test/java/com/alibaba/qwen/code/cli/transport/process/ProcessTransportTest.java new file mode 100644 index 000000000..af2a8d363 --- /dev/null +++ b/packages/sdk-java/src/test/java/com/alibaba/qwen/code/cli/transport/process/ProcessTransportTest.java @@ -0,0 +1,18 @@ +package com.alibaba.qwen.code.cli.transport.process; + +import java.io.IOException; + +import com.alibaba.qwen.code.cli.transport.TransportOptions; + +import org.junit.jupiter.api.Test; + +class ProcessTransportTest { + + @Test + void shouldStartAndCloseSuccessfully() throws IOException { + TransportOptions transportOptions = new TransportOptions(); + ProcessTransport processTransport = new ProcessTransport(transportOptions); + processTransport.close(); + } + +}