File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -58,11 +58,16 @@ func NewServer(policyEngine *policy.Engine, providerMap map[string]providers.Com
5858
5959// Publish handles incoming events from clients
6060func (s * Server ) Publish (ctx context.Context , event * eventsv1.Event ) (* busv1.PublishResponse , error ) {
61- // Store the incoming event immediately
62- if err := s .storage .StoreEvent (ctx , event , nil ); err != nil {
63- log .Printf ("Failed to store incoming event: %v" , err )
64- // Continue processing even if storage fails
65- }
61+ // Store the incoming event immediately
62+ if err := s .storage .StoreEvent (ctx , event , nil ); err != nil {
63+ log .Printf ("Failed to store incoming event: %v" , err )
64+ // Continue processing even if storage fails
65+ }
66+
67+ // Broadcast the incoming event to all subscribers so external D-Apps can react
68+ // This enables event-driven providers to consume request events and publish
69+ // their own response/callback events with correlation_id.
70+ s .broadcastEvent (event )
6671
6772 // Start vectorization in background if providers are available
6873 // Only vectorize fact events
You can’t perform that action at this time.
0 commit comments