[WIP][ISSUE #5249]Refactor/unified runtime pipeline and protocol processors#5250
Open
qqeasonchen wants to merge 22 commits into
Open
[WIP][ISSUE #5249]Refactor/unified runtime pipeline and protocol processors#5250qqeasonchen wants to merge 22 commits into
qqeasonchen wants to merge 22 commits into
Conversation
…ugin configuration
… EgressProcessor - Introduced IngressProcessor (Filter -> Transformer -> Router) and EgressProcessor (Filter -> Transformer) to centralize pipeline logic. - Refactored ClientGroupWrapper (TCP SDK) to use these processors. - Refactored EventMeshConnectorBootstrap (Connectors) to use these processors. - Updated SinkWorker to support embedded mode for unified runtime execution. - Updated Unified Runtime Design documentation to reflect architectural clarity.
…ine architecture
This commit completes the migration of all HTTP and gRPC protocol processors to use
the unified IngressProcessor and EgressProcessor pipeline architecture, ensuring
consistent Filter-Transformer-Router processing across all protocols.
Changes:
- HTTP Processors (5): Migrated SendAsyncMessageProcessor, SendSyncMessageProcessor,
BatchSendMessageProcessor, BatchSendMessageV2Processor, and refactored
SendAsyncEventProcessor to use IngressProcessor
- gRPC Processors (3): Migrated PublishCloudEventsProcessor,
BatchPublishCloudEventProcessor, and RequestCloudEventProcessor with bidirectional
pipeline support (Ingress for requests, Egress for responses)
- Added BatchProcessResult utility class to track success/filtered/failed counts for
batch processing with detailed statistics
- Added comprehensive unit tests: IngressProcessorTest, EgressProcessorTest,
BatchProcessResultTest, and enhanced SendAsyncEventProcessorTest
- Added IngressProcessor and EgressProcessor getters to EventMeshServer and
EventMeshGrpcServer for cross-module access
- Updated design documentation (unified-runtime-design.md) to reflect the new
architecture and migration status
- Updated configuration guide (core-engines-configuration.md) with pipeline key format
Architecture improvements:
- Unified pipeline key format: {producerGroup}-{topic}
- Consistent filter behavior: filtered messages return SUCCESS status
(except request-reply returns error)
- Router support: topic changes tracked with finalTopic variable
- Batch statistics: detailed success/filtered/failed counts with message IDs
- Bidirectional processing: RequestCloudEventProcessor applies Ingress to requests
and Egress to responses
All tests pass with no regressions.
Comment on lines
+20
to
+35
| import static org.mockito.ArgumentMatchers.any; | ||
| import static org.mockito.ArgumentMatchers.anyString; | ||
| import static org.mockito.Mockito.doReturn; | ||
| import static org.mockito.Mockito.lenient; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.spy; | ||
| import static org.mockito.Mockito.verify; | ||
| import static org.mockito.Mockito.when; | ||
|
|
||
| import org.apache.eventmesh.api.SendCallback; | ||
| import org.apache.eventmesh.common.protocol.tcp.Header; | ||
| import org.apache.eventmesh.common.protocol.tcp.UserAgent; | ||
| import org.apache.eventmesh.function.api.Router; | ||
| import org.apache.eventmesh.function.filter.pattern.Pattern; | ||
| import org.apache.eventmesh.function.transformer.Transformer; | ||
| import org.apache.eventmesh.runtime.boot.EventMeshServer; |
Comment on lines
+118
to
+125
| // Let's try to set the internal mqProducerWrapper field using reflection. | ||
| try { | ||
| java.lang.reflect.Field field = ClientGroupWrapper.class.getDeclaredField("mqProducerWrapper"); | ||
| field.setAccessible(true); | ||
| field.set(clientGroupWrapper, mqProducerWrapper); | ||
| } catch (Exception e) { | ||
| e.printStackTrace(); | ||
| } |
Comment on lines
26
to
30
| import org.apache.eventmesh.api.SendResult; | ||
| import org.apache.eventmesh.api.exception.OnExceptionContext; | ||
| import org.apache.eventmesh.api.exception.StorageRuntimeException; | ||
| import org.apache.eventmesh.common.exception.EventMeshException; | ||
| import org.apache.eventmesh.common.protocol.SubscriptionItem; |
Comment on lines
143
to
+155
| this.persistentMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType()); | ||
| this.broadCastMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType()); | ||
| this.mqProducerWrapper = new MQProducerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType()); | ||
|
|
||
| this.ingressProcessor = new IngressProcessor( | ||
| eventMeshTCPServer.getEventMeshServer().getFilterEngine(), | ||
| eventMeshTCPServer.getEventMeshServer().getTransformerEngine(), | ||
| eventMeshTCPServer.getEventMeshServer().getRouterEngine() | ||
| ); | ||
| this.egressProcessor = new EgressProcessor( | ||
| eventMeshTCPServer.getEventMeshServer().getFilterEngine(), | ||
| eventMeshTCPServer.getEventMeshServer().getTransformerEngine() | ||
| ); |
Comment on lines
+46
to
+74
| // 1. Filter | ||
| org.apache.eventmesh.function.filter.pattern.Pattern filterPattern = filterEngine.getFilterPattern(pipelineKey); | ||
| if (filterPattern != null && event.getData() != null) { | ||
| String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8); | ||
| if (!filterPattern.filter(content)) { | ||
| // Filtered out | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| // 2. Transformer | ||
| org.apache.eventmesh.function.transformer.Transformer transformer = transformerEngine.getTransformer(pipelineKey); | ||
| if (transformer != null && event.getData() != null) { | ||
| String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8); | ||
| String transformedContent = transformer.transform(content); | ||
| event = CloudEventBuilder.from(event) | ||
| .withData(transformedContent.getBytes(StandardCharsets.UTF_8)) | ||
| .build(); | ||
| } | ||
|
|
||
| // 3. Router | ||
| org.apache.eventmesh.function.api.Router router = routerEngine.getRouter(pipelineKey); | ||
| if (router != null && event.getData() != null) { | ||
| String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8); | ||
| String newTopic = router.route(content); | ||
| event = CloudEventBuilder.from(event) | ||
| .withSubject(newTopic) | ||
| .build(); | ||
| } |
Comment on lines
+112
to
+117
| @Getter | ||
| private A2APublishSubscribeService a2aPublishSubscribeService; | ||
|
|
||
| public A2APublishSubscribeService getA2APublishSubscribeService() { | ||
| return a2aPublishSubscribeService; | ||
| } |
Comment on lines
+20
to
+28
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertNotNull; | ||
| import static org.junit.jupiter.api.Assertions.assertNull; | ||
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||
| import static org.mockito.ArgumentMatchers.anyString; | ||
| import static org.mockito.Mockito.lenient; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.verify; | ||
| import static org.mockito.Mockito.when; |
Comment on lines
+20
to
+28
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertNotNull; | ||
| import static org.junit.jupiter.api.Assertions.assertNull; | ||
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||
| import static org.mockito.ArgumentMatchers.anyString; | ||
| import static org.mockito.Mockito.lenient; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.verify; | ||
| import static org.mockito.Mockito.when; |
Comment on lines
+20
to
+25
| import static org.mockito.ArgumentMatchers.any; | ||
| import static org.mockito.ArgumentMatchers.anyBoolean; | ||
| import static org.mockito.ArgumentMatchers.eq; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.Mockito.when; | ||
|
|
Comment on lines
+48
to
+55
| import org.apache.eventmesh.runtime.metrics.http.EventMeshHttpMetricsManager; | ||
| import org.apache.eventmesh.runtime.metrics.http.HttpMetrics; | ||
| import org.apache.eventmesh.runtime.util.RemotingHelper; | ||
|
|
||
| import java.net.InetSocketAddress; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.HashMap; | ||
| import java.util.Map; |
…5246) * update architecture * update architecture image * Test: Add unit test for SendAsyncEventProcessor to verify V1 and V2 logic integration * Test & Docs: Add unit tests for core engines and documentation for plugin configuration * 1.12.0-prepare (apache#5222) * Update copyright year to 2025 * update release version * feat: add A2A Agent Card Registry based on EMQX reference implementation - Add Agent Card Java model classes matching A2A spec (AgentCard, AgentInterface, AgentProvider, AgentCapabilities, AgentSkill, SecurityScheme, etc.) - Add Agent Card JSON Schema from EMQX for validation - Add AgentCardValidator with JSON Schema validation support - Add AgentIdentity with hierarchical ID (org_id/unit_id/agent_id) and discovery topic construction/parsing - Update A2AProtocolConstants with Agent Card operations, status constants, CE extension keys, and ID validation pattern - Update EnhancedA2AProtocolAdaptor for Agent Card operation routing and discovery topic support - Implement A2APublishSubscribeService with full Card Registry (CRUD, status tracking, event metadata augmentation) - Add A2ACardHttpHandler REST API for card management - Add AgentCardDemo example - Add json-schema-validator and protocol-a2a dependencies * fix: remove test files that reference refactoring code moved to separate PR Remove RouterEngineTest.java and SendAsyncEventProcessorTest.java which test RouterEngine/IngressProcessor pipeline code that was moved to the refactor/unified-runtime-pipeline branch. These files were added as part of the refactoring commits that have been separated into their own PR. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: resolve checkstyle violations blocking CI - Remove unused import java.nio.charset.StandardCharsets in A2APublishSubscribeService - Use try-with-resources for EventMeshTCPClient in example files to fix resource leak checkstyle warnings (AsyncPublish, SyncRequest, cloudevents AsyncPublish) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: resolve checkstyle import ordering and PMD violations - Remove unused java.util.Collections import and reorder imports to match checkstyle groups (org.apache.eventmesh, java, io, com, lombok) in A2APublishSubscribeService - Reorder java.* imports before io.* in A2ACardHttpHandler to match checkstyle ImportOrder rule - Inline EventMeshTCPClientConfig.builder() into factory calls in example files to avoid PMD DU anomaly warnings from standalone config variables Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: remove redundant and unused imports in AgentCardDemo - Remove redundant import for A2AAbstractDemo (same package) - Remove unused import for AgentIdentity Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: reorder imports in AgentCardValidator to match checkstyle ImportOrder Move java.* imports before com.* and lombok.* to comply with the project's import group ordering (org.apache.eventmesh, java, com, lombok). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: update FilterEngineTest and TransformerEngineTest for new constructors FilterEngine and TransformerEngine now require ProducerManager and ConsumerManager in addition to MetaStorage. Update test files to mock these additional dependencies. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> --------- Co-authored-by: mike_xwm <mike_xwm@126.com> Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
The eventmesh-runtime-v2 module was removed from settings.gradle but still referenced in build.gradle's includedProjects list, causing Gradle build failures.
Resolve conflicts: - docs/plugins/core-engines-configuration.md: Keep PR changes (Pipeline Key feature) - A2APublishSubscribeService.java: Keep upstream (Agent Card Registry) - FilterEngineTest.java: Keep upstream (ProducerManager/ConsumerManager) - TransformerEngineTest.java: Keep upstream (ProducerManager/ConsumerManager)
469ea79 to
e1e5692
Compare
P0: Fix NPE in EventMeshConnectorBootstrap when message filtered to null - Add null check after IngressProcessor.process() in SourceWorker publisher lambda - Skip send and log when message is filtered by pipeline P1.1: Eliminate duplicate IngressProcessor/EgressProcessor instances - EventMeshConnectorBootstrap and ClientGroupWrapper now use shared instances from EventMeshServer instead of creating their own P1.2: Fix SourceWorker synchronous blocking without timeout - Add 10s timeout to CountDownLatch.await() in embedded publisher mode - Throw EventMeshException on timeout instead of blocking indefinitely P1.3: Improve Router to support dynamic routing strategies - RouterBuilder now supports static, regex, and jsonpath routing - DefaultRouter renamed to StaticRouter; added RegexRouter and JsonPathRouter - Router config parsed as JSON with 'type' and 'targetTopic' fields P1.4: Fix missing trailing newline in EventMeshServer.java P2.1: Fix Transformer input semantic change - IngressProcessor and EgressProcessor now pass full CloudEvent JSON to Transformer.transform() instead of just raw payload data - Ensures backward compatibility with existing transformer rules P2.2: Integrate Pipeline into batch aggregation mode - BatchSendMessageProcessor batchEnabled branch now applies IngressProcessor per-event before aggregation - Filtered messages tracked in BatchProcessResult P2.3: Add periodic scan compensation to RouterEngine - Add ScheduledExecutorService scanning every 30s for new router configs - Consistent with FilterEngine and TransformerEngine patterns P2.4: Make A2APublishSubscribeService conditional - Add 'a2a.enabled' config field to CommonConfiguration (default: false) - Only initialize/start/shutdown A2A service when enabled Tests: - Update IngressProcessorTest and EgressProcessorTest to use anyString() matcher for Transformer/Router mocks, adapting to full CloudEvent JSON input Signed-off-by: Eason Chen <easonchen@example.com>
This reverts commit 03963a1.
When IngressProcessor filters a Source connector event, it returns null. The previous code still dereferenced event.getSubject() and event.getId(), which causes a NullPointerException. Keep the original topic and message id before pipeline processing, and use those values to complete the callback successfully when the event is filtered. This is a minimal correctness fix and does not change pipeline semantics. Signed-off-by: Eason Chen <easonchen@example.com>
…and admin client Architecture blueprint implementation per architecture-review.md: Pipeline Core (Phase 1): - PipelineFilter/PipelineTransformer/PipelineRouter interfaces - PipelineResult with 5 explicit actions (CONTINUE/DROP/RETRY/DLQ/FAIL) - PipelineContext with direction, protocol, trace, attributes - AuthFilter (non-bypassable, token/AK-SK validation) - RateLimitFilter (per-topic + per-client, bypassable) - ProtocolFilter (non-bypassable, CloudEvents spec compliance) - RuleFilter (topic allowlist/denylist + content rules, bypassable) - AclFilter (non-bypassable, IP deny/allow + delegate to existing Acl) - SizeLimitFilter (body size enforcement, bypassable) - Updated IngressProcessor/EgressProcessor with pipeline filter chain Connector Runtime (Phase 2): - ConnectorConfig (name, type, pluginClass, ThreadPoolMode, limits) - ConnectorRuntimeConfig (global: pool mode, max connectors, intervals) - ConnectorRuntimeService with DEDICATED/SHARED pool strategies - ConnectorLimitExceededException for max.count enforcement - ConnectorStatus with state, uptime, message counter, error tracking - JobInfo model for Admin Server compatibility - Fault isolation: per-connector try-catch + exponential backoff + auto-pause Offset & Admin (Phase 3): - OffsetStore interface + InMemoryOffsetStore implementation - AdminClient with heartbeat/monitor/offset-sync periodic tasks - JobApiController: full CRUD REST API (/admin/jobs/*) Tests (Phase 4): - PipelineAndConnectorTest: 35 tests covering all 6 filters, ConnectorRuntimeService lifecycle, OffsetStore CRUD, JobApiController, AdminClient, and PipelineResult edge cases
- 新增 UnifiedRuntimeIntegrationTest (10个测试模块, 覆盖全链路) - 新增 PipelineExtendedTest 补充边界测试用例 - 新增 Router/Transformer 实现 (StaticRoute/HeaderRoute/BroadcastRoute, Protocol/Enrichment Transformer) - 新增 ConnectorClassLoader/PluginLoader 支持插件隔离加载 - 新增 FilePersistentOffsetStore 支持 Offset 文件持久化 - 修复 AdminClient Job API 集成与状态机转换 - 修复 IngressProcessor 错误处理与指标采集 - 新增 ConnectorMonitor/PipelineMonitor 监控埋点 - 完善 CommonConfiguration 配置项与测试用例
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
#5249 This PR consolidates EventMesh's fragmented runtime architecture into a single unified runtime with standardized processing pipelines.
Commit 1: Unify Connector, Function, and Core Runtime into eventmesh-runtime
eventmesh-runtime-v2module entirely (33 files, ~1700 lines deleted), including its separate runtime lifecycle, connector/function managers, health/monitor/status services, meta storage, and configurationRouter,RouterBuilder,ConnectorEventPublisher,SourceWorker) intoeventmesh-runtimeandeventmesh-functionmodulesRouterEngine,FilterEngine,TransformerEngineas core processing engines ineventmesh-runtimeEventMeshConnectorBootstrapto manage connector lifecycle within the unified runtimeEventMeshServerto integrate engines and A2A service initializationClientGroupWrapper(TCP) to delegate to the new enginesCommit 2: Unified Ingress/Egress Pipelines
IngressProcessor— a centralized pipeline implementing Filter -> Transformer -> Router for incoming messagesEgressProcessor— a centralized pipeline implementing Filter -> Transformer for outgoing messagesClientGroupWrapper(TCP SDK) to useIngressProcessorandEgressProcessorEventMeshConnectorBootstrap(Connectors) to use the new processorsSinkWorkerto support embedded mode for unified runtime executionCommit 3: Complete HTTP and gRPC Processor Migration
SendAsyncMessageProcessor,SendSyncMessageProcessor,BatchSendMessageProcessor,BatchSendMessageV2Processor, andSendAsyncEventProcessorto useIngressProcessorPublishCloudEventsProcessor,BatchPublishCloudEventProcessor, andRequestCloudEventProcessorwith bidirectional pipeline supportBatchProcessResult— utility class for tracking success/filtered/failed countsIngressProcessorTest,EgressProcessorTest,BatchProcessResultTestKey Changes
eventmesh-runtime-v2/(entire module)eventmesh-runtime/.../core/protocol/IngressProcessor.javaeventmesh-runtime/.../core/protocol/EgressProcessor.javaeventmesh-runtime/.../core/protocol/BatchProcessResult.javaeventmesh-runtime/.../boot/RouterEngine.javaeventmesh-runtime/.../boot/EventMeshConnectorBootstrap.javaTest Plan
./gradlew :eventmesh-runtime:testpasses