diff --git a/docs/en/task/a2a.md b/docs/en/task/a2a.md index 4b4c5a712..f8d1d5e0a 100644 --- a/docs/en/task/a2a.md +++ b/docs/en/task/a2a.md @@ -290,6 +290,150 @@ agent.interrupt(Msg.builder() .build()); ``` +--- +## Achieving Highly Reliable Asynchronous Communication with Apache RocketMQ +Note: Apache RocketMQ must be deployed with either an open-source version or a commercial edition that supports the lightweight consumption model LiteTopic(The open-source version is expected to be released in January — stay tuned!) + +### Configure the client to use Apache RocketMQ as the communication channel + +```xml + + + org.apache.rocketmq + rocketmq-a2a + ${RELEASE.VERSION} + +``` +The client uses Apache RocketMQ to build A2aAgent + +```java +//Construct a RocketMQTransportConfig object to configure RocketMQTransport. +RocketMQTransportConfig rocketMQTransportConfig = new RocketMQTransportConfig(); +//Configure the Apache RocketMQ account +rocketMQTransportConfig.setAccessKey(accessKey); +//Configure the Apache RocketMQ password +rocketMQTransportConfig.setSecretKey(secretKey); +//Configure a LiteTopic to receive response messages +rocketMQTransportConfig.setWorkAgentResponseTopic(workAgentResponseTopic); +//Configure a consumer that subscribes to the LiteTopic +rocketMQTransportConfig.setWorkAgentResponseGroupID(workAgentResponseGroupID); +//Configure the namespace for Apache RocketMQ +rocketMQTransportConfig.setRocketMQNamespace(rocketMQNamespace); +rocketMQTransportConfig.setHttpClient(new JdkA2AHttpClient()); +//Build A2aAgentConfig using RocketMQTransport and rocketMQTransportConfig +A2aAgentConfig a2aAgentConfig = new A2aAgentConfigBuilder().withTransport(RocketMQTransport.class, rocketMQTransportConfig).build(); +//Parse the corresponding Agent service and build an A2aAgent +A2aAgent agent = A2aAgent.builder().a2aAgentConfig(a2aAgentConfig).name(AGENT_NAME).agentCardResolver(WellKnownAgentCardResolver.builder().baseUrl("http://127.0.0.1:10001").build()).build(); +``` + +| Parameter | Type | Description | Required | +|-----------|------|-----------|----| +| `accessKey` | String | RocketMQ AccessKey | NO | +| `secretKey` | String | RocketMQ SecretKey | NO | +| `workAgentResponseTopic` | String | LiteTopic | YES | +| `workAgentResponseGroupID` | String | The CID of the consumer subscribed to LiteTopic | YES | +| `rocketMQNamespace` | String | RocketMQ Namespace | NO | + +### The server exposes an Agent service externally over the Apache RocketMQ communication protocol. + +Construct the URL for the Apache RocketMQ communication protocol. + +```java +private static String buildRocketMQUrl(String rocketMQEndpoint, String rocketMQNamespace, String bizTopic) { + if (StringUtils.isEmpty(rocketMQEndpoint) || StringUtils.isEmpty(bizTopic)) { + throw new IllegalArgumentException( + "Invalid parameters for building RocketMQ URL: 'rocketMQEndpoint' and 'bizTopic' must not be empty. Please check your RocketMQ configuration." + ); + } + return "http://" + rocketMQEndpoint + "/" + rocketMQNamespace + "/" + bizTopic; +} +``` + +| Parameter | Type | Description | Required | +|-----------|------|-------------|----| +| `rocketmqEndpoint` | String | RocketMQ Endpoint | Yes | +| `rocketMQNamespace` | String | RocketMQ Namespace | NO | +| `bizTopic` | String | Normal Topic | Yes | + +The server externally exposes the AgentCard service + +```java +//Externally expose the AgentCard service based on RocketMQ communication +AgentInterface agentInterface = new AgentInterface(RocketMQA2AConstant.ROCKETMQ_PROTOCOL, buildRocketMQUrl()); +ConfigurableAgentCard agentCard = new ConfigurableAgentCard.Builder() + .url(buildRocketMQUrl()) + .preferredTransport(RocketMQA2AConstant.ROCKETMQ_PROTOCOL) + .additionalInterfaces(List.of(agentInterface)) + .description("An intelligent assistant enabling highly reliable asynchronous communication based on Apache RocketMQ.") + .build(); +//Configure the DASHSCOPE_API_KEY to invoke the LLM service +AgentApp agentApp = new AgentApp(agent(agentBuilder(dashScopeChatModel(DASHSCOPE_API_KEY)))); +agentApp.deployManager(LocalDeployManager.builder() + .protocolConfigs(List.of(new A2aProtocolConfig(agentCard, 60, 10))) + .port(10001) + .build()); +``` +```java +//Build a DashScopeChatModel to invoke the LLM service. +private static DashScopeChatModel dashScopeChatModel(String dashScopeApiKey) { + if (StringUtils.isEmpty(dashScopeApiKey)) { + throw new IllegalArgumentException( + "DashScope API key is empty. Please set the environment variable `AI_DASHSCOPE_API_KEY`." + ); + } + return DashScopeChatModel.builder() + .apiKey(dashScopeApiKey) + .modelName("qwen-max") + .stream(true) + .enableThinking(true) + .build(); +} +``` + +```java +//Build ReActAgent.Builder +private static ReActAgent.Builder agentBuilder(DashScopeChatModel model) { + return ReActAgent.builder().model(model).name(AGENT_NAME).sysPrompt("You are an example implementation of the A2A (Agent-to-Agent) protocol using RocketMQTransport. You can answer simple questions based on your internal knowledge."); +} +``` +```java +//Build AgentScopeAgentHandler +private static AgentScopeAgentHandler agent(ReActAssistant.Builder builder) { + return new AgentScopeAgentHandler() { + @Override + public boolean isHealthy() { + return true; + } + @Override + public Flux streamQuery(AgentRequest request, Object messages) { + ReActAgent agent = builder.build(); + StreamOptions streamOptions = StreamOptions.builder() + .eventTypes(EventType.REASONING, EventType.TOOL_RESULT) + .incremental(true) + .build(); + + if (messages instanceof List) { + return agent.stream((List) messages, streamOptions); + } else if (messages instanceof Msg) { + return agent.stream((Msg) messages, streamOptions); + } else { + Msg msg = Msg.builder().role(MsgRole.USER).build(); + return agent.stream(msg, streamOptions); + } + } + + @Override + public String getName() { + return builder.build().getName(); + } + + @Override + public String getDescription() { + return builder.build().getDescription(); + } + }; +} +``` --- ## More Resources @@ -299,4 +443,7 @@ agent.interrupt(Msg.builder() - **Nacos Quick Start**: https://nacos.io/docs/latest/quickstart/quick-start - **Nacos Java SDK**: https://nacos.io/docs/latest/manual/user/java-sdk/usage - **Nacos Java SDK Additional Configuration Parameters**: https://nacos.io/docs/latest/manual/user/java-sdk/properties -- **Nacos Community**: https://github.com/alibaba/nacos \ No newline at end of file +- **Nacos Community**: https://github.com/alibaba/nacos +- **A demonstration of an AgentScope intelligent agent application based on Apache RocketMQ**: https://github.com/agentscope-ai/agentscope-runtime-java/tree/main/examples/simple_agent_use_rocketmq_example +- **Apache RocketMQ Community** : https://github.com/apache/rocketmq +- **Apache RocketMQ A2A Asynchronous Communication Component** : https://github.com/apache/rocketmq-a2a diff --git a/docs/zh/task/a2a.md b/docs/zh/task/a2a.md index 52d6ad81e..061dd0ba3 100644 --- a/docs/zh/task/a2a.md +++ b/docs/zh/task/a2a.md @@ -290,6 +290,147 @@ agent.interrupt(Msg.builder() .build()); ``` +## 基于Apache RocketMQ实现高可靠的异步通信 +注: Apache RocketMQ 需支持轻量级消费模型LiteTopic的开源版本或商业版本(开源版本预计将在1月发布,敬请期待) + +### 客户端配置Apache RocketMQ作为通信通道 + +```xml + + + org.apache.rocketmq + rocketmq-a2a + ${RELEASE.VERSION} + +``` +客户端使用Apache RocketMQ 构建A2aAgent + +```java +//构建RocketMQTransportConfig对象用于配置RocketMQTransport +RocketMQTransportConfig rocketMQTransportConfig = new RocketMQTransportConfig(); +//配置Apache RocketMQ账号 +rocketMQTransportConfig.setAccessKey(accessKey); +//配置Apache RocketMQ密码 +rocketMQTransportConfig.setSecretKey(secretKey); +//配置接收响应结果的轻量级LiteTopic +rocketMQTransportConfig.setWorkAgentResponseTopic(workAgentResponseTopic); +//配置订阅轻量级LiteTopic的消费者CID +rocketMQTransportConfig.setWorkAgentResponseGroupID(workAgentResponseGroupID); +//配置Apache RocketMQ的命名空间 +rocketMQTransportConfig.setRocketMQNamespace(rocketMQNamespace); +rocketMQTransportConfig.setHttpClient(new JdkA2AHttpClient()); +//使用RocketMQTransport和rocketMQTransportConfig 构建A2aAgentConfig +A2aAgentConfig a2aAgentConfig = new A2aAgentConfigBuilder().withTransport(RocketMQTransport.class, rocketMQTransportConfig).build(); +//解析对应的Agent服务构建A2aAgent +A2aAgent agent = A2aAgent.builder().a2aAgentConfig(a2aAgentConfig).name(AGENT_NAME).agentCardResolver(WellKnownAgentCardResolver.builder().baseUrl("http://127.0.0.1:10001").build()).build(); +``` +| 参数 | 类型 | 描述 | 是否必填 | +|-------------------------------|--------|-------------------|------| +| `accessKey` | String | Apache RocketMQ账号 | 否 | +| `secretKey` | String | Apache RocketMQ密码 | 否 | +| `workAgentResponseTopic` | String | 轻量级LiteTopic | 是 | +| `workAgentResponseGroupID` | String | 轻量级消费者CID | 是 | +| `rocketMQNamespace` | String | Apache RocketMQ命名空间 | 否 | + +### 服务端对外开放基于Apache RocketMQ通信协议的Agent服务 +构建Apache RocketMQ通信协议的URL + +```java +private static String buildRocketMQUrl(String rocketMQEndpoint, String rocketMQNamespace, String bizTopic) { + if (StringUtils.isEmpty(rocketMQEndpoint) || StringUtils.isEmpty(bizTopic)) { + throw new IllegalArgumentException( + "Invalid parameters for building RocketMQ URL: 'rocketMQEndpoint' and 'bizTopic' must not be empty. Please check your RocketMQ configuration." + ); + } + return "http://" + rocketMQEndpoint + "/" + rocketMQNamespace + "/" + bizTopic; +} +``` + +| 参数 | 类型 | 描述 | 是否必填 | +|---------------------|--------|----------------------|------| +| `rocketMQEndpoint` | String | Apache RocketMQ服务接入点 | 是 | +| `rocketMQNamespace` | String | Apache RocketMQ命名空间 | 否 | +| `bizTopic` | String | 普通Topic | 是 | + +服务端对外开放Agent服务 + +```java +//对外开放基于Apache RocketMQ通信的AgentCard服务 +AgentInterface agentInterface = new AgentInterface(RocketMQA2AConstant.ROCKETMQ_PROTOCOL, buildRocketMQUrl()); +ConfigurableAgentCard agentCard = new ConfigurableAgentCard.Builder() + .url(buildRocketMQUrl()) + .preferredTransport(RocketMQA2AConstant.ROCKETMQ_PROTOCOL) + .additionalInterfaces(List.of(agentInterface)) + .description("基于Apache RocketMQ进行高可靠异步通信的智能助手") + .build(); +//配置DASHSCOPE_API_KEY以调用LLM服务 +AgentApp agentApp = new AgentApp(agent(agentBuilder(dashScopeChatModel(DASHSCOPE_API_KEY)))); +agentApp.deployManager(LocalDeployManager.builder() + .protocolConfigs(List.of(new A2aProtocolConfig(agentCard, 60, 10))) + .port(10001) + .build()); +``` + +```java +//构建DashScopeChatModel 用于调用LLM服务 +public static DashScopeChatModel dashScopeChatModel(String dashScopeApiKey) { + if (StringUtils.isEmpty(dashScopeApiKey)) { + throw new IllegalArgumentException( + "DashScope API key is empty. Please set the environment variable `AI_DASHSCOPE_API_KEY`." + ); + } + return DashScopeChatModel.builder() + .apiKey(dashScopeApiKey) + .modelName("qwen-max") + .stream(true) + .enableThinking(true) + .build(); +} +``` +```java +//构建ReActAgent.Builder +public static ReActAgent.Builder agentBuilder(DashScopeChatModel model) { + return ReActAgent.builder().model(model).name(AGENT_NAME).sysPrompt("你是一个基于 RocketMQTransport 实现的 A2A(Agent-to-Agent,智能体间)协议的示例。你可以根据自身内置知识回答简单问题。"); +} +``` +```java +//构建AgentScopeAgentHandler +public static AgentScopeAgentHandler agent(ReActAssistant.Builder builder) { + return new AgentScopeAgentHandler() { + @Override + public boolean isHealthy() { + return true; + } + @Override + public Flux streamQuery(AgentRequest request, Object messages) { + ReActAgent agent = builder.build(); + StreamOptions streamOptions = StreamOptions.builder() + .eventTypes(EventType.REASONING, EventType.TOOL_RESULT) + .incremental(true) + .build(); + + if (messages instanceof List) { + return agent.stream((List) messages, streamOptions); + } else if (messages instanceof Msg) { + return agent.stream((Msg) messages, streamOptions); + } else { + Msg msg = Msg.builder().role(MsgRole.USER).build(); + return agent.stream(msg, streamOptions); + } + } + + @Override + public String getName() { + return builder.build().getName(); + } + + @Override + public String getDescription() { + return builder.build().getDescription(); + } + }; +} +``` --- ## 更多资源 @@ -299,4 +440,7 @@ agent.interrupt(Msg.builder() - **Nacos 快速开始** : https://nacos.io/docs/latest/quickstart/quick-start - **Nacos Java SDK** : https://nacos.io/docs/latest/manual/user/java-sdk/usage - **Nacos Java SDK 更多配置参数** : https://nacos.io/docs/latest/manual/user/java-sdk/properties -- **Nacos 社区** : https://github.com/alibaba/nacos \ No newline at end of file +- **Nacos 社区** : https://github.com/alibaba/nacos +- **基于Apache RocketMQ 实现高可靠异步通信的AgentScope智能体应用演示案例** : https://github.com/agentscope-ai/agentscope-runtime-java/tree/main/examples/simple_agent_use_rocketmq_example +- **Apache RocketMQ 社区** : https://github.com/apache/rocketmq +- **Apache RocketMQ A2A异步通信组件** : https://github.com/apache/rocketmq-a2a