Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b293231
update architecture
qqeasonchen Dec 24, 2025
1649391
update architecture image
qqeasonchen Dec 24, 2025
be74a2b
Merge branch 'apache:master' into master
qqeasonchen Dec 24, 2025
cb123a5
Test: Add unit test for SendAsyncEventProcessor to verify V1 and V2 l…
qqeasonchen Dec 24, 2025
1ace3f1
Test & Docs: Add unit tests for core engines and documentation for pl…
qqeasonchen Dec 24, 2025
6f28b79
feat: unify Connector, Function, and Core Runtime into eventmesh-runtime
qqeasonchen Dec 25, 2025
7f1e413
Refactor: Unified Ingress/Egress pipelines using IngressProcessor and…
qqeasonchen Dec 25, 2025
9fc61e2
Refactor: Complete HTTP and gRPC processor migration to unified Pipel…
qqeasonchen Dec 29, 2025
90f93d4
Merge branch 'apache:master' into refactor/unified-runtime-pipeline
qqeasonchen May 9, 2026
0fb26b7
[ISSUE #5247] Add A2A Agent Card Registry to EventMesh (#5246)
qqeasonchen May 11, 2026
83c3acc
Fix CI build: remove eventmesh-runtime-v2 from includedProjects list
qqeasonchen May 12, 2026
e1e5692
Merge upstream/master into refactor/unified-runtime-pipeline
qqeasonchen May 12, 2026
d0868c5
Resolve conflict: keep PR pipeline key feature
May 12, 2026
69829c6
Merge branch 'apache:master' into refactor/unified-runtime-pipeline
qqeasonchen Jun 30, 2026
03963a1
[ISSUE] Fix all review issues in unified-runtime-pipeline
qqeasonchen Jun 30, 2026
3422903
Revert "[ISSUE] Fix all review issues in unified-runtime-pipeline"
qqeasonchen Jun 30, 2026
d419b28
Fix Source connector filtered-event NPE
qqeasonchen Jun 30, 2026
c2e2c3d
[ISSUE #XXXX] Implement unified runtime pipeline, connector service, …
qqeasonchen Jul 1, 2026
884503b
fix(runtime): 完善 UnifiedRuntime 集成测试与 Pipeline 重构
qqeasonchen Jul 1, 2026
341cd53
docs: update unified-runtime-design to v2.0 Chinese architecture review
Jul 2, 2026
0054aaa
docs: add code-review results and implementation status to unified-ru…
qqeasonchen Jul 2, 2026
81f0779
docs: add current architecture problems review
Jul 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ tasks.register('dist') {
"eventmesh-registry:eventmesh-registry-api",
"eventmesh-retry:eventmesh-retry-api",
"eventmesh-runtime",
"eventmesh-runtime-v2",
"eventmesh-security-plugin:eventmesh-security-api",
"eventmesh-spi",
"eventmesh-starter",
Expand Down
612 changes: 612 additions & 0 deletions docs/eventmesh-current-architecture-problems.md

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion docs/plugins/core-engines-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@ The configuration is not in local property files but distributed via the MetaSto

- **Data Source**: Configured via `eventMesh.metaStorage.plugin.type`.
- **Loading Mechanism**: Lazy loading & Hot-reloading.
- **Key Format**: `{EnginePrefix}-{GroupName}`.
- **Key Format**: `{EnginePrefix}-{GroupName}-{TopicName}`.
- **Value Format**: JSON Array.
- **Pipeline Key**: The engines are invoked using a pipeline key of format `{GroupName}-{TopicName}`, which is used to look up configurations with the prefix.
Comment on lines +29 to +31

| Engine | Prefix | Scope | Description |
| :--- | :--- | :--- | :--- |
| **Router** | `router-` | Pub Only | Routes messages to different topics. |
| **Filter** | `filter-` | Pub & Sub | Filters messages based on CloudEvent attributes. |
| **Transformer** | `transformer-` | Pub & Sub | Transforms message content (Payload/Header). |

**Note**: All protocol processors (TCP, HTTP, gRPC) now use unified `IngressProcessor` (for publishing) and `EgressProcessor` (for consuming) to consistently apply these engines.

---

## 2. Router (Routing)
Expand Down
1,107 changes: 1,107 additions & 0 deletions docs/unified-runtime-design.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,113 @@ public class CommonConfiguration {
@ConfigField(field = "registry.plugin.enabled")
private boolean eventMeshRegistryPluginEnabled = false;

@ConfigField(field = "connector.plugin.type")
private String eventMeshConnectorPluginType;

@ConfigField(field = "connector.plugin.name")
private String eventMeshConnectorPluginName;

@ConfigField(field = "connector.plugin.enabled")
private boolean eventMeshConnectorPluginEnable = false;

// ========== Unified Runtime: Connector Runtime ==========

@ConfigField(field = "connector.plugin.config.path")
private String eventMeshConnectorConfigPath = "conf/connectors/";

@ConfigField(field = "connector.thread.pool.size")
private int eventMeshConnectorThreadPoolSize = 4;

@ConfigField(field = "connector.max.retry")
private int eventMeshConnectorMaxRetry = 3;

@ConfigField(field = "connector.max.count")
private int eventMeshConnectorMaxCount = 16;

@ConfigField(field = "connector.pool.mode")
private String eventMeshConnectorPoolMode = "DEDICATED";

@ConfigField(field = "connector.verify.enabled")
private boolean eventMeshConnectorVerifyEnabled = false;

// ========== Unified Runtime: Admin Server ==========

@ConfigField(field = "admin.server.enabled")
private boolean eventMeshAdminServerEnabled = true;

@ConfigField(field = "admin.server.required")
private boolean eventMeshAdminServerRequired = false;

@ConfigField(field = "admin.server.address")
private String eventMeshAdminServerAddress = "localhost:50051";

@ConfigField(field = "admin.server.registry.type")
private String eventMeshAdminServerRegistryType = "static";

@ConfigField(field = "admin.server.heartbeat.interval.seconds")
private int eventMeshAdminHeartbeatIntervalSeconds = 5;

@ConfigField(field = "admin.server.monitor.report.interval.seconds")
private int eventMeshAdminMonitorReportIntervalSeconds = 30;

// ========== Unified Runtime: Offset Management ==========

@ConfigField(field = "offset.local.enabled")
private boolean eventMeshOffsetLocalEnabled = true;

@ConfigField(field = "offset.local.path")
private String eventMeshOffsetLocalPath = "data/offset/";

@ConfigField(field = "offset.remote.enabled")
private boolean eventMeshOffsetRemoteEnabled = false;

@ConfigField(field = "offset.remote.sync.interval.seconds")
private int eventMeshOffsetRemoteSyncIntervalSeconds = 60;

// ========== Unified Runtime: Pipeline ==========

@ConfigField(field = "pipeline.ingress.filters")
private String eventMeshPipelineIngressFilters = "auth,ratelimit,protocol";

@ConfigField(field = "pipeline.ingress.transformers")
private String eventMeshPipelineIngressTransformers = "protocol,enrichment";

@ConfigField(field = "pipeline.egress.filters")
private String eventMeshPipelineEgressFilters = "acl,sizelimit";

@ConfigField(field = "pipeline.egress.transformers")
private String eventMeshPipelineEgressTransformers = "protocol";

@ConfigField(field = "pipeline.dlq.enabled")
private boolean eventMeshPipelineDlqEnabled = true;

@ConfigField(field = "pipeline.dlq.topic")
private String eventMeshPipelineDlqTopic = "eventmesh-dlq";

// ========== Unified Runtime: A2A ==========

@ConfigField(field = "a2a.enabled")
private boolean eventMeshA2aEnabled = false;

@ConfigField(field = "a2a.gateway.port")
private int eventMeshA2aGatewayPort = 8080;

@ConfigField(field = "a2a.registry.ttl.seconds")
private int eventMeshA2aRegistryTtlSeconds = 30;

@ConfigField(field = "a2a.sse.max.connections")
private int eventMeshA2aSseMaxConnections = 1000;

// ========== Unified Runtime: FilePersistentOffsetStore ==========

@ConfigField(field = "file.offset.store.flush.interval.seconds")
private int eventMeshFileOffsetStoreFlushIntervalSeconds = 10;

// ========== Unified Runtime: Trace Context ==========

@ConfigField(field = "pipeline.trace.enabled")
private boolean eventMeshPipelineTraceEnabled = true;

public void reload() {

if (Strings.isNullOrEmpty(this.eventMeshServerIp)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.protocol.pipeline;

import java.util.HashMap;
import java.util.Map;

/**
* Context passed through every pipeline stage.
* Carries metadata, trace info, and stage-specific configuration.
*/
public class PipelineContext {

/** Pipeline direction */
public enum Direction { INGRESS, EGRESS }

private final Direction direction;
private final String entryProtocol; // tcp / http / grpc / a2a / connector
private final Map<String, Object> attributes;
private String traceId;
private long startTimeMs;

public PipelineContext(Direction direction, String entryProtocol) {
this.direction = direction;
this.entryProtocol = entryProtocol;
this.attributes = new HashMap<>();
this.startTimeMs = System.currentTimeMillis();
}

// ---- Accessors ----

public Direction getDirection() { return direction; }
public String getEntryProtocol() { return entryProtocol; }
public String getTraceId() { return traceId; }

public void setTraceId(String traceId) { this.traceId = traceId; }

public long getStartTimeMs() { return startTimeMs; }
public long getElapsedMs() { return System.currentTimeMillis() - startTimeMs; }

// ---- Attribute helpers ----

public void setAttribute(String key, Object value) {
attributes.put(key, value);
}

public Object getAttribute(String key) {
return attributes.get(key);
}

@SuppressWarnings("unchecked")
public <T> T getAttribute(String key, Class<T> type) {
Object v = attributes.get(key);
if (v == null) return null;
if (type.isInstance(v)) return (T) v;
return null;
}

public Map<String, Object> getAttributes() {
return new HashMap<>(attributes);
}

@Override
public String toString() {
return "PipelineContext{direction=" + direction
+ ", protocol=" + entryProtocol
+ ", traceId=" + traceId
+ ", elapsed=" + getElapsedMs() + "ms}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.protocol.pipeline;

import java.util.HashMap;
import java.util.Map;

import io.cloudevents.CloudEvent;

/**
* Unified pipeline result with explicit action semantics.
* Replaces the ambiguous {@code null} return value with a clear
* {@link Action} enum that every pipeline stage mutates.
*/
public class PipelineResult {

public enum Action {
/** Continue to next stage normally */
CONTINUE,
/** Silently drop the event */
DROP,
/** Retry the event (with retry count in metadata) */
RETRY,
/** Route to dead-letter queue */
DLQ,
/** Fatal failure — raise alert */
FAIL
}

private Action action;
private CloudEvent event;
private Throwable cause;
private final Map<String, String> metadata;

private PipelineResult(Action action, CloudEvent event, Throwable cause) {
this.action = action;
this.event = event;
this.cause = cause;
this.metadata = new HashMap<>();
}

// ---- Factory methods ----

public static PipelineResult cont(CloudEvent event) {
return new PipelineResult(Action.CONTINUE, event, null);
}

public static PipelineResult drop(CloudEvent event) {
return new PipelineResult(Action.DROP, event, null);
}

public static PipelineResult retry(CloudEvent event, int retryCount) {
PipelineResult r = new PipelineResult(Action.RETRY, event, null);
r.metadata.put("retryCount", String.valueOf(retryCount));
return r;
}

public static PipelineResult dlq(CloudEvent event, Throwable cause) {
return new PipelineResult(Action.DLQ, event, cause);
}

public static PipelineResult fail(CloudEvent event, Throwable cause) {
return new PipelineResult(Action.FAIL, event, cause);
}

// ---- Accessors ----

public Action getAction() { return action; }
public void setAction(Action a) { this.action = a; }

public CloudEvent getEvent() { return event; }
public void setEvent(CloudEvent e) { this.event = e; }

public Throwable getCause() { return cause; }
public void setCause(Throwable c) { this.cause = c; }

public Map<String, String> getMetadata() { return metadata; }

public void addMeta(String key, String value) {
this.metadata.put(key, value);
}

public String getMeta(String key) {
return metadata.get(key);
}

/** Convenience: was this stage passed? */
public boolean passed() {
return action == Action.CONTINUE;
}

@Override
public String toString() {
return "PipelineResult{action=" + action + ", event="
+ (event != null ? event.getId() : "null") + ", cause=" + cause + '}';
}
}
Loading
Loading