Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 148 additions & 1 deletion docs/en/task/a2a.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,150 @@ agent.interrupt(Msg.builder()
.build());
```

---
## Achieving Highly Reliable Asynchronous Communication with Apache RocketMQ
Note: Apache RocketMQ must be deployed with either an open-source version or a commercial edition that supports the lightweight consumption model LiteTopic(The open-source version is expected to be released in January — stay tuned!)

### Configure the client to use Apache RocketMQ as the communication channel

```xml
<!-- Add the dependency in pom.xml -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-a2a</artifactId>
<version>${RELEASE.VERSION}</version>
</dependency>
```
The client uses Apache RocketMQ to build A2aAgent

```java
//Construct a RocketMQTransportConfig object to configure RocketMQTransport.
RocketMQTransportConfig rocketMQTransportConfig = new RocketMQTransportConfig();
//Configure the Apache RocketMQ account
rocketMQTransportConfig.setAccessKey(accessKey);
//Configure the Apache RocketMQ password
rocketMQTransportConfig.setSecretKey(secretKey);
//Configure a LiteTopic to receive response messages
rocketMQTransportConfig.setWorkAgentResponseTopic(workAgentResponseTopic);
//Configure a consumer that subscribes to the LiteTopic
rocketMQTransportConfig.setWorkAgentResponseGroupID(workAgentResponseGroupID);
//Configure the namespace for Apache RocketMQ
rocketMQTransportConfig.setRocketMQNamespace(rocketMQNamespace);
rocketMQTransportConfig.setHttpClient(new JdkA2AHttpClient());
//Build A2aAgentConfig using RocketMQTransport and rocketMQTransportConfig
A2aAgentConfig a2aAgentConfig = new A2aAgentConfigBuilder().withTransport(RocketMQTransport.class, rocketMQTransportConfig).build();
//Parse the corresponding Agent service and build an A2aAgent
A2aAgent agent = A2aAgent.builder().a2aAgentConfig(a2aAgentConfig).name(AGENT_NAME).agentCardResolver(WellKnownAgentCardResolver.builder().baseUrl("http://127.0.0.1:10001").build()).build();
```

| Parameter | Type | Description | Required |
|-----------|------|-----------|----|
| `accessKey` | String | RocketMQ AccessKey | NO |
| `secretKey` | String | RocketMQ SecretKey | NO |
| `workAgentResponseTopic` | String | LiteTopic | YES |
| `workAgentResponseGroupID` | String | The CID of the consumer subscribed to LiteTopic | YES |
| `rocketMQNamespace` | String | RocketMQ Namespace | NO |
Comment on lines +329 to +335

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The values in the Required column (YES/NO) are inconsistent with another table in this document (lines 352-356) which uses Yes/NO. To maintain consistency, please use Yes and No in all tables.

Suggested change
| Parameter | Type | Description | Required |
|-----------|------|-----------|----|
| `accessKey` | String | RocketMQ AccessKey | NO |
| `secretKey` | String | RocketMQ SecretKey | NO |
| `workAgentResponseTopic` | String | LiteTopic | YES |
| `workAgentResponseGroupID` | String | The CID of the consumer subscribed to LiteTopic | YES |
| `rocketMQNamespace` | String | RocketMQ Namespace | NO |
| Parameter | Type | Description | Required |
| ------------------------ | ------ | -------------------------------------------- | -------- |
| `accessKey` | String | RocketMQ AccessKey | No |
| `secretKey` | String | RocketMQ SecretKey | No |
| `workAgentResponseTopic` | String | LiteTopic | Yes |
| `workAgentResponseGroupID` | String | The CID of the consumer subscribed to LiteTopic | Yes |
| `rocketMQNamespace` | String | RocketMQ Namespace | No |


### The server exposes an Agent service externally over the Apache RocketMQ communication protocol.

Construct the URL for the Apache RocketMQ communication protocol.

```java
private static String buildRocketMQUrl(String rocketMQEndpoint, String rocketMQNamespace, String bizTopic) {
if (StringUtils.isEmpty(rocketMQEndpoint) || StringUtils.isEmpty(bizTopic)) {
throw new IllegalArgumentException(
"Invalid parameters for building RocketMQ URL: 'rocketMQEndpoint' and 'bizTopic' must not be empty. Please check your RocketMQ configuration."
);
}
return "http://" + rocketMQEndpoint + "/" + rocketMQNamespace + "/" + bizTopic;
}
```

| Parameter | Type | Description | Required |
|-----------|------|-------------|----|
| `rocketmqEndpoint` | String | RocketMQ Endpoint | Yes |
| `rocketMQNamespace` | String | RocketMQ Namespace | NO |
| `bizTopic` | String | Normal Topic | Yes |
Comment on lines +352 to +356

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The values in the Required column are inconsistent. This table uses Yes and NO, while the previous one uses YES and NO. For consistency across the document, please use Yes and No.

Suggested change
| Parameter | Type | Description | Required |
|-----------|------|-------------|----|
| `rocketmqEndpoint` | String | RocketMQ Endpoint | Yes |
| `rocketMQNamespace` | String | RocketMQ Namespace | NO |
| `bizTopic` | String | Normal Topic | Yes |
| Parameter | Type | Description | Required |
| ------------------ | ------ | ------------------- | -------- |
| `rocketmqEndpoint` | String | RocketMQ Endpoint | Yes |
| `rocketMQNamespace`| String | RocketMQ Namespace | No |
| `bizTopic` | String | Normal Topic | Yes |


The server externally exposes the AgentCard service

```java
//Externally expose the AgentCard service based on RocketMQ communication
AgentInterface agentInterface = new AgentInterface(RocketMQA2AConstant.ROCKETMQ_PROTOCOL, buildRocketMQUrl());
ConfigurableAgentCard agentCard = new ConfigurableAgentCard.Builder()
.url(buildRocketMQUrl())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The buildRocketMQUrl() method is called again here. To improve readability and avoid redundant calls, it's better to call it once before line 362, store its result in a local variable, and reuse that variable here and on line 362.

.preferredTransport(RocketMQA2AConstant.ROCKETMQ_PROTOCOL)
.additionalInterfaces(List.of(agentInterface))
.description("An intelligent assistant enabling highly reliable asynchronous communication based on Apache RocketMQ.")
.build();
//Configure the DASHSCOPE_API_KEY to invoke the LLM service
AgentApp agentApp = new AgentApp(agent(agentBuilder(dashScopeChatModel(DASHSCOPE_API_KEY))));
agentApp.deployManager(LocalDeployManager.builder()
.protocolConfigs(List.of(new A2aProtocolConfig(agentCard, 60, 10)))
.port(10001)
.build());
```
```java
//Build a DashScopeChatModel to invoke the LLM service.
private static DashScopeChatModel dashScopeChatModel(String dashScopeApiKey) {
if (StringUtils.isEmpty(dashScopeApiKey)) {
throw new IllegalArgumentException(
"DashScope API key is empty. Please set the environment variable `AI_DASHSCOPE_API_KEY`."
);
}
return DashScopeChatModel.builder()
.apiKey(dashScopeApiKey)
.modelName("qwen-max")
.stream(true)
.enableThinking(true)
.build();
}
```

```java
//Build ReActAgent.Builder
private static ReActAgent.Builder agentBuilder(DashScopeChatModel model) {
return ReActAgent.builder().model(model).name(AGENT_NAME).sysPrompt("You are an example implementation of the A2A (Agent-to-Agent) protocol using RocketMQTransport. You can answer simple questions based on your internal knowledge.");
}
```
```java
//Build AgentScopeAgentHandler
private static AgentScopeAgentHandler agent(ReActAssistant.Builder builder) {
return new AgentScopeAgentHandler() {
@Override
public boolean isHealthy() {
return true;
}
@Override
public Flux<?> streamQuery(AgentRequest request, Object messages) {
ReActAgent agent = builder.build();
StreamOptions streamOptions = StreamOptions.builder()
.eventTypes(EventType.REASONING, EventType.TOOL_RESULT)
.incremental(true)
.build();

if (messages instanceof List<?>) {
return agent.stream((List<Msg>) messages, streamOptions);
} else if (messages instanceof Msg) {
return agent.stream((Msg) messages, streamOptions);
} else {
Msg msg = Msg.builder().role(MsgRole.USER).build();
return agent.stream(msg, streamOptions);
}
}

@Override
public String getName() {
return builder.build().getName();
}

@Override
public String getDescription() {
return builder.build().getDescription();
}
};
}
Comment on lines +401 to +435

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There are a couple of issues in this agent helper method:

  1. The parameter type ReActAssistant.Builder seems to be a typo and should likely be ReActAgent.Builder.
  2. The builder.build() method is called in getName() and getDescription(), which is inefficient as it creates a new agent instance just to retrieve a property. It's better to build the agent once if its properties are needed in multiple places within the handler.

Here is a suggested refactoring to address these points:

//Build AgentScopeAgentHandler
private static AgentScopeAgentHandler agent(ReActAgent.Builder builder) {
    // Build the agent once to get its name and description.
    ReActAgent agentForInfo = builder.build();
    return new AgentScopeAgentHandler() {
        @Override
        public boolean isHealthy() {
            return true;
        }
        @Override
        public Flux<?> streamQuery(AgentRequest request, Object messages) {
            // A new agent instance is created for each query to ensure isolation.
            ReActAgent agent = builder.build();
            StreamOptions streamOptions = StreamOptions.builder()
                .eventTypes(EventType.REASONING, EventType.TOOL_RESULT)
                .incremental(true)
                .build();

            if (messages instanceof List<?>) {
                return agent.stream((List<Msg>) messages, streamOptions);
            } else if (messages instanceof Msg) {
                return agent.stream((Msg) messages, streamOptions);
            } else {
                Msg msg = Msg.builder().role(MsgRole.USER).build();
                return agent.stream(msg, streamOptions);
            }
        }

        @Override
        public String getName() {
            return agentForInfo.getName();
        }

        @Override
        public String getDescription() {
            return agentForInfo.getDescription();
        }
    };
}

```
---

## More Resources
Expand All @@ -299,4 +443,7 @@ agent.interrupt(Msg.builder()
- **Nacos Quick Start**: https://nacos.io/docs/latest/quickstart/quick-start
- **Nacos Java SDK**: https://nacos.io/docs/latest/manual/user/java-sdk/usage
- **Nacos Java SDK Additional Configuration Parameters**: https://nacos.io/docs/latest/manual/user/java-sdk/properties
- **Nacos Community**: https://github.com/alibaba/nacos
- **Nacos Community**: https://github.com/alibaba/nacos
- **A demonstration of an AgentScope intelligent agent application based on Apache RocketMQ**: https://github.com/agentscope-ai/agentscope-runtime-java/tree/main/examples/simple_agent_use_rocketmq_example
- **Apache RocketMQ Community** : https://github.com/apache/rocketmq
- **Apache RocketMQ A2A Asynchronous Communication Component** : https://github.com/apache/rocketmq-a2a
Comment on lines +448 to +449

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There are extra spaces before the colons in these list items. For consistency with the other items in this list and standard markdown formatting, please remove the spaces.

Suggested change
- **Apache RocketMQ Community** : https://github.com/apache/rocketmq
- **Apache RocketMQ A2A Asynchronous Communication Component** : https://github.com/apache/rocketmq-a2a
- **Apache RocketMQ Community**: https://github.com/apache/rocketmq
- **Apache RocketMQ A2A Asynchronous Communication Component**: https://github.com/apache/rocketmq-a2a

146 changes: 145 additions & 1 deletion docs/zh/task/a2a.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,147 @@ agent.interrupt(Msg.builder()
.build());
```

## 基于Apache RocketMQ实现高可靠的异步通信
注: Apache RocketMQ 需支持轻量级消费模型LiteTopic的开源版本或商业版本(开源版本预计将在1月发布,敬请期待)

### 客户端配置Apache RocketMQ作为通信通道

```xml
<!--在pom.xml 中添加依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-a2a</artifactId>
<version>${RELEASE.VERSION}</version>
</dependency>
```
客户端使用Apache RocketMQ 构建A2aAgent

```java
//构建RocketMQTransportConfig对象用于配置RocketMQTransport
RocketMQTransportConfig rocketMQTransportConfig = new RocketMQTransportConfig();
//配置Apache RocketMQ账号
rocketMQTransportConfig.setAccessKey(accessKey);
//配置Apache RocketMQ密码
rocketMQTransportConfig.setSecretKey(secretKey);
//配置接收响应结果的轻量级LiteTopic
rocketMQTransportConfig.setWorkAgentResponseTopic(workAgentResponseTopic);
//配置订阅轻量级LiteTopic的消费者CID
rocketMQTransportConfig.setWorkAgentResponseGroupID(workAgentResponseGroupID);
//配置Apache RocketMQ的命名空间
rocketMQTransportConfig.setRocketMQNamespace(rocketMQNamespace);
rocketMQTransportConfig.setHttpClient(new JdkA2AHttpClient());
//使用RocketMQTransport和rocketMQTransportConfig 构建A2aAgentConfig
A2aAgentConfig a2aAgentConfig = new A2aAgentConfigBuilder().withTransport(RocketMQTransport.class, rocketMQTransportConfig).build();
//解析对应的Agent服务构建A2aAgent
A2aAgent agent = A2aAgent.builder().a2aAgentConfig(a2aAgentConfig).name(AGENT_NAME).agentCardResolver(WellKnownAgentCardResolver.builder().baseUrl("http://127.0.0.1:10001").build()).build();
```
| 参数 | 类型 | 描述 | 是否必填 |
|-------------------------------|--------|-------------------|------|
| `accessKey` | String | Apache RocketMQ账号 | 否 |
| `secretKey` | String | Apache RocketMQ密码 | 否 |
| `workAgentResponseTopic` | String | 轻量级LiteTopic | 是 |
| `workAgentResponseGroupID` | String | 轻量级消费者CID | 是 |
| `rocketMQNamespace` | String | Apache RocketMQ命名空间 | 否 |

### 服务端对外开放基于Apache RocketMQ通信协议的Agent服务
构建Apache RocketMQ通信协议的URL

```java
private static String buildRocketMQUrl(String rocketMQEndpoint, String rocketMQNamespace, String bizTopic) {
if (StringUtils.isEmpty(rocketMQEndpoint) || StringUtils.isEmpty(bizTopic)) {
throw new IllegalArgumentException(
"Invalid parameters for building RocketMQ URL: 'rocketMQEndpoint' and 'bizTopic' must not be empty. Please check your RocketMQ configuration."
);
}
return "http://" + rocketMQEndpoint + "/" + rocketMQNamespace + "/" + bizTopic;
}
```

| 参数 | 类型 | 描述 | 是否必填 |
|---------------------|--------|----------------------|------|
| `rocketMQEndpoint` | String | Apache RocketMQ服务接入点 | 是 |
| `rocketMQNamespace` | String | Apache RocketMQ命名空间 | 否 |
| `bizTopic` | String | 普通Topic | 是 |

服务端对外开放Agent服务

```java
//对外开放基于Apache RocketMQ通信的AgentCard服务
AgentInterface agentInterface = new AgentInterface(RocketMQA2AConstant.ROCKETMQ_PROTOCOL, buildRocketMQUrl());
ConfigurableAgentCard agentCard = new ConfigurableAgentCard.Builder()
.url(buildRocketMQUrl())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The buildRocketMQUrl() method is called again here. To improve readability and avoid redundant calls, it's better to call it once before line 359, store its result in a local variable, and reuse that variable here and on line 359.

.preferredTransport(RocketMQA2AConstant.ROCKETMQ_PROTOCOL)
.additionalInterfaces(List.of(agentInterface))
.description("基于Apache RocketMQ进行高可靠异步通信的智能助手")
.build();
//配置DASHSCOPE_API_KEY以调用LLM服务
AgentApp agentApp = new AgentApp(agent(agentBuilder(dashScopeChatModel(DASHSCOPE_API_KEY))));
agentApp.deployManager(LocalDeployManager.builder()
.protocolConfigs(List.of(new A2aProtocolConfig(agentCard, 60, 10)))
.port(10001)
.build());
```

```java
//构建DashScopeChatModel 用于调用LLM服务
public static DashScopeChatModel dashScopeChatModel(String dashScopeApiKey) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This method is declared public, but the corresponding method in the English documentation is private. To maintain consistency and encapsulate helper methods, it's better to use private. This also applies to agentBuilder (line 392) and agent (line 398) methods that follow.

Suggested change
public static DashScopeChatModel dashScopeChatModel(String dashScopeApiKey) {
private static DashScopeChatModel dashScopeChatModel(String dashScopeApiKey) {

if (StringUtils.isEmpty(dashScopeApiKey)) {
throw new IllegalArgumentException(
"DashScope API key is empty. Please set the environment variable `AI_DASHSCOPE_API_KEY`."
);
}
return DashScopeChatModel.builder()
.apiKey(dashScopeApiKey)
.modelName("qwen-max")
.stream(true)
.enableThinking(true)
.build();
}
```
```java
//构建ReActAgent.Builder
public static ReActAgent.Builder agentBuilder(DashScopeChatModel model) {
return ReActAgent.builder().model(model).name(AGENT_NAME).sysPrompt("你是一个基于 RocketMQTransport 实现的 A2A(Agent-to-Agent,智能体间)协议的示例。你可以根据自身内置知识回答简单问题。");
}
```
```java
//构建AgentScopeAgentHandler
public static AgentScopeAgentHandler agent(ReActAssistant.Builder builder) {
return new AgentScopeAgentHandler() {
@Override
public boolean isHealthy() {
return true;
}
@Override
public Flux<?> streamQuery(AgentRequest request, Object messages) {
ReActAgent agent = builder.build();
StreamOptions streamOptions = StreamOptions.builder()
.eventTypes(EventType.REASONING, EventType.TOOL_RESULT)
.incremental(true)
.build();

if (messages instanceof List<?>) {
return agent.stream((List<Msg>) messages, streamOptions);
} else if (messages instanceof Msg) {
return agent.stream((Msg) messages, streamOptions);
} else {
Msg msg = Msg.builder().role(MsgRole.USER).build();
return agent.stream(msg, streamOptions);
}
}

@Override
public String getName() {
return builder.build().getName();
}

@Override
public String getDescription() {
return builder.build().getDescription();
}
};
}
Comment on lines +398 to +432

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There are a couple of issues in this agent helper method:

  1. The parameter type ReActAssistant.Builder seems to be a typo and should likely be ReActAgent.Builder.
  2. The builder.build() method is called in getName() and getDescription(), which is inefficient as it creates a new agent instance just to retrieve a property. It's better to build the agent once if its properties are needed in multiple places within the handler.

Here is a suggested refactoring to address these points:

//构建AgentScopeAgentHandler
private static AgentScopeAgentHandler agent(ReActAgent.Builder builder) {
    // Build the agent once to get its name and description.
    ReActAgent agentForInfo = builder.build();
    return new AgentScopeAgentHandler() {
        @Override
        public boolean isHealthy() {
            return true;
        }
        @Override
        public Flux<?> streamQuery(AgentRequest request, Object messages) {
            // A new agent instance is created for each query to ensure isolation.
            ReActAgent agent = builder.build();
            StreamOptions streamOptions = StreamOptions.builder()
                .eventTypes(EventType.REASONING, EventType.TOOL_RESULT)
                .incremental(true)
                .build();

            if (messages instanceof List<?>) {
                return agent.stream((List<Msg>) messages, streamOptions);
            } else if (messages instanceof Msg) {
                return agent.stream((Msg) messages, streamOptions);
            } else {
                Msg msg = Msg.builder().role(MsgRole.USER).build();
                return agent.stream(msg, streamOptions);
            }
        }

        @Override
        public String getName() {
            return agentForInfo.getName();
        }

        @Override
        public String getDescription() {
            return agentForInfo.getDescription();
        }
    };
}

```
---

## 更多资源
Expand All @@ -299,4 +440,7 @@ agent.interrupt(Msg.builder()
- **Nacos 快速开始** : https://nacos.io/docs/latest/quickstart/quick-start
- **Nacos Java SDK** : https://nacos.io/docs/latest/manual/user/java-sdk/usage
- **Nacos Java SDK 更多配置参数** : https://nacos.io/docs/latest/manual/user/java-sdk/properties
- **Nacos 社区** : https://github.com/alibaba/nacos
- **Nacos 社区** : https://github.com/alibaba/nacos
- **基于Apache RocketMQ 实现高可靠异步通信的AgentScope智能体应用演示案例** : https://github.com/agentscope-ai/agentscope-runtime-java/tree/main/examples/simple_agent_use_rocketmq_example
- **Apache RocketMQ 社区** : https://github.com/apache/rocketmq
- **Apache RocketMQ A2A异步通信组件** : https://github.com/apache/rocketmq-a2a
Comment on lines +445 to +446

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There are extra spaces before the colons in these list items. For consistency with the other items in this list and standard markdown formatting, please remove the spaces.

Suggested change
- **Apache RocketMQ 社区** : https://github.com/apache/rocketmq
- **Apache RocketMQ A2A异步通信组件** : https://github.com/apache/rocketmq-a2a
- **Apache RocketMQ 社区**: https://github.com/apache/rocketmq
- **Apache RocketMQ A2A异步通信组件**: https://github.com/apache/rocketmq-a2a

Loading