diff --git a/pkg/capabilities/base_trigger.go b/pkg/capabilities/base_trigger.go new file mode 100644 index 0000000000..fd8a6a595f --- /dev/null +++ b/pkg/capabilities/base_trigger.go @@ -0,0 +1,152 @@ +package capabilities + +import ( + "context" + log "github.com/smartcontractkit/chainlink-common/pkg/logger" + "sync" + "time" +) + +type PendingEvent struct { + TriggerId string + WorkflowId string + EventId string + Payload []byte + FirstAt time.Time + LastSentAt time.Time + Attempts int +} + +type EventStore interface { + Insert(ctx context.Context, rec PendingEvent) error + Delete(ctx context.Context, triggerId, workflowId, eventId string) error + List(ctx context.Context) ([]PendingEvent, error) +} + +type OutboundSend func(ctx context.Context, ev TriggerEvent, workflowId string) error +type LostHook func(ctx context.Context, rec PendingEvent) + +// TODO Implement BaseTriggerCapability - CRE-1523 +type BaseTriggerCapability struct { + /* + Keeps track of workflow registrations (similar to LLO streams trigger). + Handles retransmits based on T_retransmit and T_max. + Persists pending events in the DB to be resilient to node restarts. + */ + tRetransmit time.Duration // time window for an event being ACKd before we retransmit + tMax time.Duration // timeout before events are considered lost if not ACKd + + store EventStore + send OutboundSend + lost LostHook + lggr *log.Logger + + mu sync.Mutex + pending map[string]*PendingEvent // key(triggerID|workflowID|eventID) + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +func (b *BaseTriggerCapability) Start(ctx context.Context) error { + b.ctx, b.cancel = context.WithCancel(ctx) + + recs, err := b.store.List(ctx) + if err != nil { + return err + } + + // Initialize in-memory persistence + b.pending = make(map[string]*PendingEvent) + for i := range recs { + r := recs[i] + b.pending[key(r.TriggerId, r.WorkflowId, r.EventId)] = &r + } + + b.wg.Add(1) + go func() { + defer b.wg.Done() + b.retransmitLoop() + }() + + for _, r := range recs { + _ = b.trySend(ctx, r.TriggerId, r.WorkflowId, r.EventId) + } + return nil +} + +func (b *BaseTriggerCapability) deliverEvent( + ctx context.Context, + ev TriggerEvent, + workflowIds []string, +) error { + /* + Base Trigger Capability can interact with the Don2Don layer (in the remote capability setting) + as well as directly with a consumer (in the local setting). + */ + now := time.Now() + + for _, workflowId := range workflowIds { + rec := PendingEvent{ + TriggerId: ev.TriggerId, + WorkflowId: workflowId, + EventId: ev.EventId, + Payload: ev.Payload, + FirstAt: now, + } + + if err := b.store.Insert(ctx, rec); err != nil { + return err + } + + b.mu.Lock() + b.pending[key(ev.TriggerId, workflowId, ev.EventId)] = &rec + b.mu.Unlock() + + _ = b.trySend(ctx, ev.TriggerId, workflowId, ev.EventId) + } + return nil // only when the event is successfully persisted and ready to be relaibly delivered +} + +func (b *BaseTriggerCapability) AckEvent( + ctx context.Context, + triggerId, workflowId, eventId string, +) error { + k := key(triggerId, workflowId, eventId) // NOTE: WorkflowID we want to start ;P + + b.mu.Lock() + delete(b.pending, k) + b.mu.Unlock() + + return b.store.Delete(ctx, triggerId, workflowId, eventId) +} + +func (b *BaseTriggerCapability) retransmitLoop() { + ticker := time.NewTicker(b.tRetransmit / 2) + defer ticker.Stop() + + for { + select { + case <-b.ctx.Done(): + return + case <-ticker.C: + b.scanPending() + } + } +} + +func (b *BaseTriggerCapability) scanPending() { + now := time.Now() + + for _, rec := range b.pending { + if now.Sub(rec.FirstAt) >= b.tMax { + _ = b.AckEvent(b.ctx, rec.TriggerId, rec.WorkflowId, rec.EventId) + b.lost(b.ctx, *rec) + continue + } + if rec.LastSentAt.IsZero() || now.Sub(rec.LastSentAt) >= b.tRetransmit { + _ = b.trySend(b.ctx, rec.TriggerId, rec.WorkflowId, rec.EventId) + } + } +} diff --git a/pkg/capabilities/capabilities.go b/pkg/capabilities/capabilities.go index ce5074678d..3ec8afdced 100644 --- a/pkg/capabilities/capabilities.go +++ b/pkg/capabilities/capabilities.go @@ -338,6 +338,7 @@ type OCRAttributedOnchainSignature struct { type TriggerExecutable interface { RegisterTrigger(ctx context.Context, request TriggerRegistrationRequest) (<-chan TriggerResponse, error) UnregisterTrigger(ctx context.Context, request TriggerRegistrationRequest) error + AckEvent(ctx context.Context, eventId string) error } // TriggerCapability interface needs to be implemented by all trigger capabilities. @@ -551,6 +552,7 @@ func MustNewRemoteCapabilityInfo( const ( DefaultRegistrationRefresh = 30 * time.Second DefaultRegistrationExpiry = 2 * time.Minute + DefaultEventTimeout = 2 * time.Minute // TODO: determine best value DefaultMessageExpiry = 2 * time.Minute DefaultBatchSize = 100 DefaultBatchCollectionPeriod = 100 * time.Millisecond @@ -561,6 +563,7 @@ const ( type RemoteTriggerConfig struct { RegistrationRefresh time.Duration RegistrationExpiry time.Duration + EventTimeout time.Duration MinResponsesToAggregate uint32 MessageExpiry time.Duration MaxBatchSize uint32 @@ -594,6 +597,9 @@ func (c *RemoteTriggerConfig) ApplyDefaults() { if c.RegistrationExpiry == 0 { c.RegistrationExpiry = DefaultRegistrationExpiry } + if c.EventTimeout == 0 { + c.EventTimeout = DefaultEventTimeout + } if c.MessageExpiry == 0 { c.MessageExpiry = DefaultMessageExpiry } diff --git a/pkg/capabilities/pb/registry.pb.go b/pkg/capabilities/pb/registry.pb.go index d6b2eef92c..b75df4c8d1 100644 --- a/pkg/capabilities/pb/registry.pb.go +++ b/pkg/capabilities/pb/registry.pb.go @@ -165,10 +165,11 @@ type RemoteTriggerConfig struct { state protoimpl.MessageState `protogen:"open.v1"` RegistrationRefresh *durationpb.Duration `protobuf:"bytes,1,opt,name=registrationRefresh,proto3" json:"registrationRefresh,omitempty"` RegistrationExpiry *durationpb.Duration `protobuf:"bytes,2,opt,name=registrationExpiry,proto3" json:"registrationExpiry,omitempty"` - MinResponsesToAggregate uint32 `protobuf:"varint,3,opt,name=minResponsesToAggregate,proto3" json:"minResponsesToAggregate,omitempty"` - MessageExpiry *durationpb.Duration `protobuf:"bytes,4,opt,name=messageExpiry,proto3" json:"messageExpiry,omitempty"` - MaxBatchSize uint32 `protobuf:"varint,5,opt,name=maxBatchSize,proto3" json:"maxBatchSize,omitempty"` - BatchCollectionPeriod *durationpb.Duration `protobuf:"bytes,6,opt,name=batchCollectionPeriod,proto3" json:"batchCollectionPeriod,omitempty"` + EventTimeout *durationpb.Duration `protobuf:"bytes,3,opt,name=eventTimeout,proto3" json:"eventTimeout,omitempty"` + MinResponsesToAggregate uint32 `protobuf:"varint,4,opt,name=minResponsesToAggregate,proto3" json:"minResponsesToAggregate,omitempty"` + MessageExpiry *durationpb.Duration `protobuf:"bytes,5,opt,name=messageExpiry,proto3" json:"messageExpiry,omitempty"` + MaxBatchSize uint32 `protobuf:"varint,6,opt,name=maxBatchSize,proto3" json:"maxBatchSize,omitempty"` + BatchCollectionPeriod *durationpb.Duration `protobuf:"bytes,7,opt,name=batchCollectionPeriod,proto3" json:"batchCollectionPeriod,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -217,6 +218,13 @@ func (x *RemoteTriggerConfig) GetRegistrationExpiry() *durationpb.Duration { return nil } +func (x *RemoteTriggerConfig) GetEventTimeout() *durationpb.Duration { + if x != nil { + return x.EventTimeout + } + return nil +} + func (x *RemoteTriggerConfig) GetMinResponsesToAggregate() uint32 { if x != nil { return x.MinResponsesToAggregate @@ -658,14 +666,15 @@ var File_registry_proto protoreflect.FileDescriptor const file_registry_proto_rawDesc = "" + "\n" + - "\x0eregistry.proto\x12\x04loop\x1a\x16values/v1/values.proto\x1a\x1egoogle/protobuf/duration.proto\"\x9d\x03\n" + + "\x0eregistry.proto\x12\x04loop\x1a\x16values/v1/values.proto\x1a\x1egoogle/protobuf/duration.proto\"\xdc\x03\n" + "\x13RemoteTriggerConfig\x12K\n" + "\x13registrationRefresh\x18\x01 \x01(\v2\x19.google.protobuf.DurationR\x13registrationRefresh\x12I\n" + - "\x12registrationExpiry\x18\x02 \x01(\v2\x19.google.protobuf.DurationR\x12registrationExpiry\x128\n" + - "\x17minResponsesToAggregate\x18\x03 \x01(\rR\x17minResponsesToAggregate\x12?\n" + - "\rmessageExpiry\x18\x04 \x01(\v2\x19.google.protobuf.DurationR\rmessageExpiry\x12\"\n" + - "\fmaxBatchSize\x18\x05 \x01(\rR\fmaxBatchSize\x12O\n" + - "\x15batchCollectionPeriod\x18\x06 \x01(\v2\x19.google.protobuf.DurationR\x15batchCollectionPeriod\"Z\n" + + "\x12registrationExpiry\x18\x02 \x01(\v2\x19.google.protobuf.DurationR\x12registrationExpiry\x12=\n" + + "\feventTimeout\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\feventTimeout\x128\n" + + "\x17minResponsesToAggregate\x18\x04 \x01(\rR\x17minResponsesToAggregate\x12?\n" + + "\rmessageExpiry\x18\x05 \x01(\v2\x19.google.protobuf.DurationR\rmessageExpiry\x12\"\n" + + "\fmaxBatchSize\x18\x06 \x01(\rR\fmaxBatchSize\x12O\n" + + "\x15batchCollectionPeriod\x18\a \x01(\v2\x19.google.protobuf.DurationR\x15batchCollectionPeriod\"Z\n" + "\x12RemoteTargetConfig\x12D\n" + "\x1drequestHashExcludedAttributes\x18\x01 \x03(\tR\x1drequestHashExcludedAttributes\"\xc5\x03\n" + "\x16RemoteExecutableConfig\x12D\n" + @@ -740,28 +749,29 @@ var file_registry_proto_goTypes = []any{ var file_registry_proto_depIdxs = []int32{ 10, // 0: loop.RemoteTriggerConfig.registrationRefresh:type_name -> google.protobuf.Duration 10, // 1: loop.RemoteTriggerConfig.registrationExpiry:type_name -> google.protobuf.Duration - 10, // 2: loop.RemoteTriggerConfig.messageExpiry:type_name -> google.protobuf.Duration - 10, // 3: loop.RemoteTriggerConfig.batchCollectionPeriod:type_name -> google.protobuf.Duration - 0, // 4: loop.RemoteExecutableConfig.transmission_schedule:type_name -> loop.TransmissionSchedule - 10, // 5: loop.RemoteExecutableConfig.delta_stage:type_name -> google.protobuf.Duration - 10, // 6: loop.RemoteExecutableConfig.request_timeout:type_name -> google.protobuf.Duration - 1, // 7: loop.RemoteExecutableConfig.request_hasher_type:type_name -> loop.RequestHasherType - 2, // 8: loop.AggregatorConfig.aggregator_type:type_name -> loop.AggregatorType - 3, // 9: loop.CapabilityMethodConfig.remote_trigger_config:type_name -> loop.RemoteTriggerConfig - 5, // 10: loop.CapabilityMethodConfig.remote_executable_config:type_name -> loop.RemoteExecutableConfig - 6, // 11: loop.CapabilityMethodConfig.aggregator_config:type_name -> loop.AggregatorConfig - 11, // 12: loop.CapabilityConfig.default_config:type_name -> values.v1.Map - 3, // 13: loop.CapabilityConfig.remote_trigger_config:type_name -> loop.RemoteTriggerConfig - 4, // 14: loop.CapabilityConfig.remote_target_config:type_name -> loop.RemoteTargetConfig - 5, // 15: loop.CapabilityConfig.remote_executable_config:type_name -> loop.RemoteExecutableConfig - 11, // 16: loop.CapabilityConfig.restricted_config:type_name -> values.v1.Map - 9, // 17: loop.CapabilityConfig.method_configs:type_name -> loop.CapabilityConfig.MethodConfigsEntry - 7, // 18: loop.CapabilityConfig.MethodConfigsEntry.value:type_name -> loop.CapabilityMethodConfig - 19, // [19:19] is the sub-list for method output_type - 19, // [19:19] is the sub-list for method input_type - 19, // [19:19] is the sub-list for extension type_name - 19, // [19:19] is the sub-list for extension extendee - 0, // [0:19] is the sub-list for field type_name + 10, // 2: loop.RemoteTriggerConfig.eventTimeout:type_name -> google.protobuf.Duration + 10, // 3: loop.RemoteTriggerConfig.messageExpiry:type_name -> google.protobuf.Duration + 10, // 4: loop.RemoteTriggerConfig.batchCollectionPeriod:type_name -> google.protobuf.Duration + 0, // 5: loop.RemoteExecutableConfig.transmission_schedule:type_name -> loop.TransmissionSchedule + 10, // 6: loop.RemoteExecutableConfig.delta_stage:type_name -> google.protobuf.Duration + 10, // 7: loop.RemoteExecutableConfig.request_timeout:type_name -> google.protobuf.Duration + 1, // 8: loop.RemoteExecutableConfig.request_hasher_type:type_name -> loop.RequestHasherType + 2, // 9: loop.AggregatorConfig.aggregator_type:type_name -> loop.AggregatorType + 3, // 10: loop.CapabilityMethodConfig.remote_trigger_config:type_name -> loop.RemoteTriggerConfig + 5, // 11: loop.CapabilityMethodConfig.remote_executable_config:type_name -> loop.RemoteExecutableConfig + 6, // 12: loop.CapabilityMethodConfig.aggregator_config:type_name -> loop.AggregatorConfig + 11, // 13: loop.CapabilityConfig.default_config:type_name -> values.v1.Map + 3, // 14: loop.CapabilityConfig.remote_trigger_config:type_name -> loop.RemoteTriggerConfig + 4, // 15: loop.CapabilityConfig.remote_target_config:type_name -> loop.RemoteTargetConfig + 5, // 16: loop.CapabilityConfig.remote_executable_config:type_name -> loop.RemoteExecutableConfig + 11, // 17: loop.CapabilityConfig.restricted_config:type_name -> values.v1.Map + 9, // 18: loop.CapabilityConfig.method_configs:type_name -> loop.CapabilityConfig.MethodConfigsEntry + 7, // 19: loop.CapabilityConfig.MethodConfigsEntry.value:type_name -> loop.CapabilityMethodConfig + 20, // [20:20] is the sub-list for method output_type + 20, // [20:20] is the sub-list for method input_type + 20, // [20:20] is the sub-list for extension type_name + 20, // [20:20] is the sub-list for extension extendee + 0, // [0:20] is the sub-list for field type_name } func init() { file_registry_proto_init() } diff --git a/pkg/capabilities/pb/registry.proto b/pkg/capabilities/pb/registry.proto index d4adbd2c39..3f46be5908 100644 --- a/pkg/capabilities/pb/registry.proto +++ b/pkg/capabilities/pb/registry.proto @@ -10,10 +10,11 @@ import "google/protobuf/duration.proto"; message RemoteTriggerConfig { google.protobuf.Duration registrationRefresh = 1; google.protobuf.Duration registrationExpiry = 2; - uint32 minResponsesToAggregate = 3; - google.protobuf.Duration messageExpiry = 4; - uint32 maxBatchSize = 5; - google.protobuf.Duration batchCollectionPeriod = 6; + google.protobuf.Duration eventTimeout =3; + uint32 minResponsesToAggregate = 4; + google.protobuf.Duration messageExpiry = 5; + uint32 maxBatchSize = 6; + google.protobuf.Duration batchCollectionPeriod = 7; } // deprecated - v1 only diff --git a/pkg/capabilities/registry/base.go b/pkg/capabilities/registry/base.go index 39b864186b..e344957940 100644 --- a/pkg/capabilities/registry/base.go +++ b/pkg/capabilities/registry/base.go @@ -244,6 +244,14 @@ func (a *atomicTriggerCapability) GetState() connectivity.State { return connectivity.State(-1) // unknown } +func (a *atomicTriggerCapability) AckEvent(ctx context.Context, eventId string) error { + c := a.Load() + if c == nil { + return errors.New("capability unavailable") + } + return (*c).AckEvent(ctx, eventId) +} + func (a *atomicTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { c := a.Load() if c == nil { @@ -360,6 +368,14 @@ func (a *atomicExecuteAndTriggerCapability) GetState() connectivity.State { return connectivity.State(-1) // unknown } +func (a *atomicExecuteAndTriggerCapability) AckEvent(ctx context.Context, eventId string) error { + c := a.Load() + if c == nil { + return errors.New("capability unavailable") + } + return (*c).AckEvent(ctx, eventId) +} + func (a *atomicExecuteAndTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { c := a.Load() if c == nil { diff --git a/pkg/capabilities/triggers/mercury_trigger.go b/pkg/capabilities/triggers/mercury_trigger.go index 0853482bd8..68643c3f53 100644 --- a/pkg/capabilities/triggers/mercury_trigger.go +++ b/pkg/capabilities/triggers/mercury_trigger.go @@ -91,6 +91,11 @@ func (o *MercuryTriggerService) ProcessReport(reports []datastreams.FeedReport) return nil } +func (o *MercuryTriggerService) AckEvent(ctx context.Context, eventId string) error { + // TODO Implement? + return nil +} + func (o *MercuryTriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { wid := req.Metadata.WorkflowID diff --git a/pkg/capabilities/triggers/on_demand_trigger.go b/pkg/capabilities/triggers/on_demand_trigger.go index b3ffd04b95..3e91bd6976 100644 --- a/pkg/capabilities/triggers/on_demand_trigger.go +++ b/pkg/capabilities/triggers/on_demand_trigger.go @@ -72,6 +72,11 @@ func (o *OnDemand) SendEvent(ctx context.Context, wid string, event capabilities return nil } +func (o *OnDemand) AckEvent(ctx context.Context, eventId string) error { + //TODO Implement? + return nil +} + func (o *OnDemand) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { wid := req.Metadata.WorkflowID o.mu.Lock() diff --git a/pkg/capabilities/v2/actions/confidentialhttp/server/client_server_gen.go b/pkg/capabilities/v2/actions/confidentialhttp/server/client_server_gen.go index be42c71b12..51e6db9507 100644 --- a/pkg/capabilities/v2/actions/confidentialhttp/server/client_server_gen.go +++ b/pkg/capabilities/v2/actions/confidentialhttp/server/client_server_gen.go @@ -107,6 +107,10 @@ func (c *clientCapability) UnregisterTrigger(ctx context.Context, request capabi return fmt.Errorf("trigger %s not found", request.Method) } +func (c *clientCapability) AckEvent(ctx context.Context, triggerId string, eventId string) error { + return fmt.Errorf("trigger %s not found", triggerId) +} + func (c *clientCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } diff --git a/pkg/capabilities/v2/actions/http/server/client_server_gen.go b/pkg/capabilities/v2/actions/http/server/client_server_gen.go index 5eca8955ec..16d17c8bbb 100644 --- a/pkg/capabilities/v2/actions/http/server/client_server_gen.go +++ b/pkg/capabilities/v2/actions/http/server/client_server_gen.go @@ -107,6 +107,10 @@ func (c *clientCapability) UnregisterTrigger(ctx context.Context, request capabi return fmt.Errorf("trigger %s not found", request.Method) } +func (c *clientCapability) AckEvent(ctx context.Context, triggerId string, eventId string) error { + return fmt.Errorf("trigger %s not found", triggerId) +} + func (c *clientCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } diff --git a/pkg/capabilities/v2/chain-capabilities/evm/server/client_server_gen.go b/pkg/capabilities/v2/chain-capabilities/evm/server/client_server_gen.go index c44e1c707b..06a974ef18 100644 --- a/pkg/capabilities/v2/chain-capabilities/evm/server/client_server_gen.go +++ b/pkg/capabilities/v2/chain-capabilities/evm/server/client_server_gen.go @@ -36,6 +36,7 @@ type ClientCapability interface { RegisterLogTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *evm.FilterLogTriggerRequest) (<-chan capabilities.TriggerAndId[*evm.Log], error) UnregisterLogTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *evm.FilterLogTriggerRequest) error + AckEvent(ctx context.Context, triggerId string, eventId string) error WriteReport(ctx context.Context, metadata capabilities.RequestMetadata, input *evm.WriteReportRequest) (*capabilities.ResponseAndMetadata[*evm.WriteReportReply], caperrors.Error) @@ -143,6 +144,10 @@ func (c *clientCapability) UnregisterTrigger(ctx context.Context, request capabi } } +func (c *clientCapability) AckEvent(ctx context.Context, triggerId string, eventId string) error { + return c.ClientCapability.AckEvent(ctx, triggerId, eventId) +} + func (c *clientCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } diff --git a/pkg/capabilities/v2/chain-capabilities/solana/server/client_server_gen.go b/pkg/capabilities/v2/chain-capabilities/solana/server/client_server_gen.go index 086ef5058e..920992a5cd 100644 --- a/pkg/capabilities/v2/chain-capabilities/solana/server/client_server_gen.go +++ b/pkg/capabilities/v2/chain-capabilities/solana/server/client_server_gen.go @@ -38,6 +38,7 @@ type ClientCapability interface { RegisterLogTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *solana.FilterLogTriggerRequest) (<-chan capabilities.TriggerAndId[*solana.Log], error) UnregisterLogTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *solana.FilterLogTriggerRequest) error + AckEvent(ctx context.Context, triggerId string, eventId string) error WriteReport(ctx context.Context, metadata capabilities.RequestMetadata, input *solana.WriteReportRequest) (*capabilities.ResponseAndMetadata[*solana.WriteReportReply], caperrors.Error) @@ -145,6 +146,10 @@ func (c *clientCapability) UnregisterTrigger(ctx context.Context, request capabi } } +func (c *clientCapability) AckEvent(ctx context.Context, triggerId string, eventId string) error { + return c.ClientCapability.AckEvent(ctx, triggerId, eventId) +} + func (c *clientCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } diff --git a/pkg/capabilities/v2/consensus/server/consensus_server_gen.go b/pkg/capabilities/v2/consensus/server/consensus_server_gen.go index 8fd928cf46..3c253ed1b3 100644 --- a/pkg/capabilities/v2/consensus/server/consensus_server_gen.go +++ b/pkg/capabilities/v2/consensus/server/consensus_server_gen.go @@ -110,6 +110,10 @@ func (c *consensusCapability) UnregisterTrigger(ctx context.Context, request cap return fmt.Errorf("trigger %s not found", request.Method) } +func (c *consensusCapability) AckEvent(ctx context.Context, triggerId string, eventId string) error { + return fmt.Errorf("trigger %s not found", triggerId) +} + func (c *consensusCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } diff --git a/pkg/capabilities/v2/protoc/pkg/templates/server.go.tmpl b/pkg/capabilities/v2/protoc/pkg/templates/server.go.tmpl index 6b1c6ba9f1..0208950c5f 100644 --- a/pkg/capabilities/v2/protoc/pkg/templates/server.go.tmpl +++ b/pkg/capabilities/v2/protoc/pkg/templates/server.go.tmpl @@ -38,6 +38,7 @@ type {{.GoName}}Capability interface { {{ $hasTriggers = true }} Register{{.GoName}}(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *{{ImportAlias .Input.GoIdent.GoImportPath}}.{{.Input.GoIdent.GoName}}) (<- chan capabilities.TriggerAndId[*{{ImportAlias .Output.GoIdent.GoImportPath}}.{{.Output.GoIdent.GoName}}], error) Unregister{{.GoName}}(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *{{ImportAlias .Input.GoIdent.GoImportPath}}.{{.Input.GoIdent.GoName}}) error + AckEvent(ctx context.Context, triggerId string, eventId string) error {{- else }} {{ $hasActions = true }} {{.GoName}}(ctx context.Context, metadata capabilities.RequestMetadata, input *{{ImportAlias .Input.GoIdent.GoImportPath}}.{{.Input.GoIdent.GoName}} {{if ne "emptypb.Empty" (ConfigType $service)}}, {{(ConfigType $service)}}{{ end }}) (*capabilities.ResponseAndMetadata[*{{ImportAlias .Output.GoIdent.GoImportPath}}.{{.Output.GoIdent.GoName}}], caperrors.Error) @@ -169,6 +170,14 @@ func (c *{{.GoName|LowerFirst}}Capability) UnregisterTrigger(ctx context.Context {{- end }} } +func (c *{{.GoName|LowerFirst}}Capability) AckEvent(ctx context.Context, triggerId string, eventId string) error { +{{- if $hasTriggers }} + return c.{{.GoName}}Capability.AckEvent(ctx, triggerId, eventId) +{{- else }} + return fmt.Errorf("trigger %s not found", triggerId) +{{- end }} +} + func (c *{{.GoName|LowerFirst}}Capability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } diff --git a/pkg/capabilities/v2/protoc/pkg/test_capabilities/actionandtrigger/server/action_and_trigger_server_gen.go b/pkg/capabilities/v2/protoc/pkg/test_capabilities/actionandtrigger/server/action_and_trigger_server_gen.go index 7ea875038c..a17bb40127 100644 --- a/pkg/capabilities/v2/protoc/pkg/test_capabilities/actionandtrigger/server/action_and_trigger_server_gen.go +++ b/pkg/capabilities/v2/protoc/pkg/test_capabilities/actionandtrigger/server/action_and_trigger_server_gen.go @@ -23,6 +23,7 @@ type BasicCapability interface { RegisterTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *actionandtrigger.Config) (<-chan capabilities.TriggerAndId[*actionandtrigger.TriggerEvent], error) UnregisterTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *actionandtrigger.Config) error + AckEvent(ctx context.Context, triggerId string, eventId string) error Start(ctx context.Context) error Close() error @@ -126,6 +127,10 @@ func (c *basicCapability) UnregisterTrigger(ctx context.Context, request capabil } } +func (c *basicCapability) AckEvent(ctx context.Context, triggerId string, eventId string) error { + return c.BasicCapability.AckEvent(ctx, triggerId, eventId) +} + func (c *basicCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } diff --git a/pkg/capabilities/v2/protoc/pkg/test_capabilities/basicaction/server/basic_action_server_gen.go b/pkg/capabilities/v2/protoc/pkg/test_capabilities/basicaction/server/basic_action_server_gen.go index 1984f13d5c..77f149d4d2 100644 --- a/pkg/capabilities/v2/protoc/pkg/test_capabilities/basicaction/server/basic_action_server_gen.go +++ b/pkg/capabilities/v2/protoc/pkg/test_capabilities/basicaction/server/basic_action_server_gen.go @@ -107,6 +107,10 @@ func (c *basicActionCapability) UnregisterTrigger(ctx context.Context, request c return fmt.Errorf("trigger %s not found", request.Method) } +func (c *basicActionCapability) AckEvent(ctx context.Context, triggerId string, eventId string) error { + return fmt.Errorf("trigger %s not found", triggerId) +} + func (c *basicActionCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } diff --git a/pkg/capabilities/v2/protoc/pkg/test_capabilities/basictrigger/server/basic_trigger_server_gen.go b/pkg/capabilities/v2/protoc/pkg/test_capabilities/basictrigger/server/basic_trigger_server_gen.go index 74545198f5..a54bfd0006 100644 --- a/pkg/capabilities/v2/protoc/pkg/test_capabilities/basictrigger/server/basic_trigger_server_gen.go +++ b/pkg/capabilities/v2/protoc/pkg/test_capabilities/basictrigger/server/basic_trigger_server_gen.go @@ -20,6 +20,7 @@ var _ = emptypb.Empty{} type BasicCapability interface { RegisterTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *basictrigger.Config) (<-chan capabilities.TriggerAndId[*basictrigger.Outputs], error) UnregisterTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *basictrigger.Config) error + AckEvent(ctx context.Context, triggerId string, eventId string) error Start(ctx context.Context) error Close() error @@ -123,6 +124,10 @@ func (c *basicCapability) UnregisterTrigger(ctx context.Context, request capabil } } +func (c *basicCapability) AckEvent(ctx context.Context, triggerId string, eventId string) error { + return c.BasicCapability.AckEvent(ctx, triggerId, eventId) +} + func (c *basicCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } diff --git a/pkg/capabilities/v2/protoc/pkg/test_capabilities/consensus/server/consensus_server_gen.go b/pkg/capabilities/v2/protoc/pkg/test_capabilities/consensus/server/consensus_server_gen.go index 8fd928cf46..3c253ed1b3 100644 --- a/pkg/capabilities/v2/protoc/pkg/test_capabilities/consensus/server/consensus_server_gen.go +++ b/pkg/capabilities/v2/protoc/pkg/test_capabilities/consensus/server/consensus_server_gen.go @@ -110,6 +110,10 @@ func (c *consensusCapability) UnregisterTrigger(ctx context.Context, request cap return fmt.Errorf("trigger %s not found", request.Method) } +func (c *consensusCapability) AckEvent(ctx context.Context, triggerId string, eventId string) error { + return fmt.Errorf("trigger %s not found", triggerId) +} + func (c *consensusCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } diff --git a/pkg/capabilities/v2/protoc/pkg/test_capabilities/importclash/server/clash_server_gen.go b/pkg/capabilities/v2/protoc/pkg/test_capabilities/importclash/server/clash_server_gen.go index d1076bcc7f..b953cbe6b2 100644 --- a/pkg/capabilities/v2/protoc/pkg/test_capabilities/importclash/server/clash_server_gen.go +++ b/pkg/capabilities/v2/protoc/pkg/test_capabilities/importclash/server/clash_server_gen.go @@ -108,6 +108,10 @@ func (c *basicActionCapability) UnregisterTrigger(ctx context.Context, request c return fmt.Errorf("trigger %s not found", request.Method) } +func (c *basicActionCapability) AckEvent(ctx context.Context, triggerId string, eventId string) error { + return fmt.Errorf("trigger %s not found", triggerId) +} + func (c *basicActionCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } diff --git a/pkg/capabilities/v2/protoc/pkg/test_capabilities/nodeaction/server/node_action_server_gen.go b/pkg/capabilities/v2/protoc/pkg/test_capabilities/nodeaction/server/node_action_server_gen.go index d90fcb8273..aafa70992d 100644 --- a/pkg/capabilities/v2/protoc/pkg/test_capabilities/nodeaction/server/node_action_server_gen.go +++ b/pkg/capabilities/v2/protoc/pkg/test_capabilities/nodeaction/server/node_action_server_gen.go @@ -107,6 +107,10 @@ func (c *basicActionCapability) UnregisterTrigger(ctx context.Context, request c return fmt.Errorf("trigger %s not found", request.Method) } +func (c *basicActionCapability) AckEvent(ctx context.Context, triggerId string, eventId string) error { + return fmt.Errorf("trigger %s not found", triggerId) +} + func (c *basicActionCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } diff --git a/pkg/capabilities/v2/triggers/cron/server/trigger_server_gen.go b/pkg/capabilities/v2/triggers/cron/server/trigger_server_gen.go index a74d49b8c2..5178f85804 100644 --- a/pkg/capabilities/v2/triggers/cron/server/trigger_server_gen.go +++ b/pkg/capabilities/v2/triggers/cron/server/trigger_server_gen.go @@ -20,9 +20,11 @@ var _ = emptypb.Empty{} type CronCapability interface { RegisterTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *cron.Config) (<-chan capabilities.TriggerAndId[*cron.Payload], error) UnregisterTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *cron.Config) error + AckEvent(ctx context.Context, triggerId string, eventId string) error RegisterLegacyTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *cron.Config) (<-chan capabilities.TriggerAndId[*cron.LegacyPayload], error) UnregisterLegacyTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *cron.Config) error + AckEvent(ctx context.Context, triggerId string, eventId string) error Start(ctx context.Context) error Close() error @@ -136,6 +138,10 @@ func (c *cronCapability) UnregisterTrigger(ctx context.Context, request capabili } } +func (c *cronCapability) AckEvent(ctx context.Context, triggerId string, eventId string) error { + return c.CronCapability.AckEvent(ctx, triggerId, eventId) +} + func (c *cronCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } diff --git a/pkg/capabilities/v2/triggers/http/server/trigger_server_gen.go b/pkg/capabilities/v2/triggers/http/server/trigger_server_gen.go index 0d6fd2dd40..b59f21f006 100644 --- a/pkg/capabilities/v2/triggers/http/server/trigger_server_gen.go +++ b/pkg/capabilities/v2/triggers/http/server/trigger_server_gen.go @@ -20,6 +20,7 @@ var _ = emptypb.Empty{} type HTTPCapability interface { RegisterTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *http.Config) (<-chan capabilities.TriggerAndId[*http.Payload], error) UnregisterTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *http.Config) error + AckEvent(ctx context.Context, triggerId string, eventId string) error Start(ctx context.Context) error Close() error @@ -123,6 +124,10 @@ func (c *hTTPCapability) UnregisterTrigger(ctx context.Context, request capabili } } +func (c *hTTPCapability) AckEvent(ctx context.Context, triggerId string, eventId string) error { + return c.HTTPCapability.AckEvent(ctx, triggerId, eventId) +} + func (c *hTTPCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { return nil } diff --git a/pkg/loop/internal/core/services/capability/capabilities.go b/pkg/loop/internal/core/services/capability/capabilities.go index 92637e529f..8884a90567 100644 --- a/pkg/loop/internal/core/services/capability/capabilities.go +++ b/pkg/loop/internal/core/services/capability/capabilities.go @@ -287,6 +287,11 @@ type triggerExecutableClient struct { cancelFuncs map[string]func() } +func (t *triggerExecutableClient) AckEvent(ctx context.Context, eventId string) error { + //TODO implement + return nil +} + func (t *triggerExecutableClient) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { ch, cancel, err := t.registerTrigger(ctx, req) if err != nil { diff --git a/pkg/loop/internal/core/services/capability/capabilities_registry.go b/pkg/loop/internal/core/services/capability/capabilities_registry.go index 8c9cee12ba..21164408f0 100644 --- a/pkg/loop/internal/core/services/capability/capabilities_registry.go +++ b/pkg/loop/internal/core/services/capability/capabilities_registry.go @@ -187,6 +187,7 @@ func decodeRemoteTriggerConfig(prtc *capabilitiespb.RemoteTriggerConfig) *capabi remoteTriggerConfig := &capabilities.RemoteTriggerConfig{} remoteTriggerConfig.RegistrationRefresh = prtc.RegistrationRefresh.AsDuration() remoteTriggerConfig.RegistrationExpiry = prtc.RegistrationExpiry.AsDuration() + remoteTriggerConfig.EventTimeout = prtc.EventTimeout.AsDuration() remoteTriggerConfig.MinResponsesToAggregate = prtc.MinResponsesToAggregate remoteTriggerConfig.MessageExpiry = prtc.MessageExpiry.AsDuration() remoteTriggerConfig.MaxBatchSize = prtc.MaxBatchSize