Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions core/capabilities/mocks/executable_and_trigger_capability.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions core/capabilities/mocks/trigger_capability.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions core/capabilities/mocks/trigger_executable.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions core/capabilities/remote/combined_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ func (c *combinedClient) Info(ctx context.Context) (capabilities.CapabilityInfo,
return c.info, nil
}

func (c *combinedClient) AckEvent(ctx context.Context, eventId string) error {
// TODO: Do we need triggerID to match the triggerSubscriber? And then call AckEvent on that?
return nil
}

func (c *combinedClient) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
c.mu.RLock()
subscriber, ok := c.triggerSubscribers[request.Method]
Expand Down
47 changes: 46 additions & 1 deletion core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ type triggerPublisher struct {

messageCache *messagecache.MessageCache[registrationKey, p2ptypes.PeerID]
registrations map[registrationKey]*pubRegState
mu sync.RWMutex // protects messageCache and registrations
ackCache *messagecache.MessageCache[ackKey, p2ptypes.PeerID]
mu sync.RWMutex // protects messageCache, ackCache, and registrations

batchingQueue map[[32]byte]*batchedResponse
bqMu sync.Mutex // protects batchingQueue
stopCh services.StopChan
Expand All @@ -57,6 +59,11 @@ type registrationKey struct {
workflowID string
}

type ackKey struct {
callerDonID uint32
triggerEventID string
}

type pubRegState struct {
callback <-chan commoncap.TriggerResponse
request commoncap.TriggerRegistrationRequest
Expand Down Expand Up @@ -86,6 +93,7 @@ func NewTriggerPublisher(capabilityID string, capMethodName string, dispatcher t
capMethodName: capMethodName,
dispatcher: dispatcher,
messageCache: messagecache.NewMessageCache[registrationKey, p2ptypes.PeerID](),
ackCache: messagecache.NewMessageCache[ackKey, p2ptypes.PeerID](),
registrations: make(map[registrationKey]*pubRegState),
batchingQueue: make(map[[32]byte]*batchedResponse),
stopCh: make(services.StopChan),
Expand Down Expand Up @@ -245,6 +253,43 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
case types.MethodTriggerEvent:
p.lggr.Errorw("trigger request failed with error",
"method", SanitizeLogString(msg.Method), "sender", sender, "errorMsg", SanitizeLogString(msg.ErrorMsg))
case types.MethodTriggerEventAck:
triggerMetadata := msg.GetTriggerEventMetadata()
if triggerMetadata == nil {
p.lggr.Errorw("recieved empty trigger event ack metadata", "sender", sender)
break
}
triggerEventID := triggerMetadata.TriggerEventId
p.lggr.Debugw("received trigger event ACK", "sender", sender, "trigger event ID", triggerEventID)

p.mu.Lock()
defer p.mu.Unlock()
callerDon, ok := cfg.workflowDONs[msg.CallerDonId]
if !ok {
p.lggr.Errorw("received a message from unsupported workflow DON", "callerDonId", msg.CallerDonId)
return
}
if !cfg.membersCache[msg.CallerDonId][sender] {
p.lggr.Errorw("sender not a member of its workflow DON", "callerDonId", msg.CallerDonId, "sender", sender)
return
}

key := ackKey{msg.CallerDonId, triggerEventID}
nowMs := time.Now().UnixMilli()
p.ackCache.Insert(key, sender, nowMs, msg.Payload) // TODO: Payload is empty..
minRequired := uint32(2*callerDon.F + 1)
ready, _ := p.ackCache.Ready(key, minRequired, nowMs-cfg.remoteConfig.EventTimeout.Milliseconds(), false)
if !ready {
p.lggr.Debugw("not ready to ACK trigger event yet", "triggerEventId", triggerEventID, "minRequired", minRequired)
return
}

ctx, cancel := p.stopCh.NewCtx()
defer cancel()
err = cfg.underlying.AckEvent(ctx, triggerEventID)
if err != nil {
p.lggr.Errorw("failed to AckEvent on underlying trigger capability", "err", err)
}
default:
p.lggr.Errorw("received message with unknown method",
"method", SanitizeLogString(msg.Method), "sender", sender)
Expand Down
33 changes: 33 additions & 0 deletions core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,20 @@ func TestTriggerPublisher_ReceiveTriggerEvents_BatchingEnabled(t *testing.T) {
require.NoError(t, publisher.Close())
}

func TestTriggerPublisher_RecieveTriggerEventAcks(t *testing.T) {
ctx := testutils.Context(t)
capabilityDONID, workflowDONID := uint32(1), uint32(2)
underlyingTriggerCap, publisher, _, peers := newServices(t, capabilityDONID, workflowDONID, 2)
eventId := "123"
regEvent := newAckEventMessage(t, eventId, workflowDONID, peers[1])
publisher.Receive(ctx, regEvent)

require.True(t, underlyingTriggerCap.eventAckd)
require.NoError(t, publisher.Close())

// TODO: Increase event ACK test coverage
}

func TestTriggerPublisher_SetConfig_Basic(t *testing.T) {
t.Parallel()
lggr := logger.Test(t)
Expand Down Expand Up @@ -266,10 +280,24 @@ func newRegisterTriggerMessage(t *testing.T, callerDonID uint32, sender p2ptypes
}
}

func newAckEventMessage(t *testing.T, eventId string, callerDonID uint32, sender p2ptypes.PeerID) *remotetypes.MessageBody {
return &remotetypes.MessageBody{
Sender: sender[:],
Method: remotetypes.MethodTriggerEventAck,
CallerDonId: callerDonID,
Metadata: &remotetypes.MessageBody_TriggerEventMetadata{
TriggerEventMetadata: &remotetypes.TriggerEventMetadata{
TriggerEventId: eventId,
},
},
}
}

type testTrigger struct {
info commoncap.CapabilityInfo
registrationsCh chan commoncap.TriggerRegistrationRequest
eventCh chan commoncap.TriggerResponse
eventAckd bool
}

func (tr *testTrigger) Info(_ context.Context) (commoncap.CapabilityInfo, error) {
Expand All @@ -284,3 +312,8 @@ func (tr *testTrigger) RegisterTrigger(_ context.Context, request commoncap.Trig
func (tr *testTrigger) UnregisterTrigger(_ context.Context, request commoncap.TriggerRegistrationRequest) error {
return nil
}

func (tr *testTrigger) AckEvent(_ context.Context, eventId string) error {
tr.eventAckd = true
return nil
}
25 changes: 25 additions & 0 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,31 @@ func (s *triggerSubscriber) Info(ctx context.Context) (commoncap.CapabilityInfo,
return cfg.capInfo, nil
}

func (s *triggerSubscriber) AckEvent(ctx context.Context, eventId string) error {
s.mu.RLock()
cfg := s.cfg.Load()
for _, peerID := range cfg.capDonInfo.Members {
m := &types.MessageBody{
CapabilityId: cfg.capInfo.ID,
CapabilityDonId: cfg.capDonInfo.ID,
CallerDonId: cfg.localDonID,
Method: types.MethodTriggerEventAck,
CapabilityMethod: s.capMethodName,
Metadata: &types.MessageBody_TriggerEventMetadata{
TriggerEventMetadata: &types.TriggerEventMetadata{
TriggerEventId: eventId,
},
},
}
err := s.dispatcher.Send(peerID, m)
if err != nil {
s.lggr.Errorw("failed to send message", "donId", cfg.capDonInfo.ID, "peerId", peerID, "err", err)
}
}
s.mu.RUnlock()
return nil
}

func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commoncap.TriggerRegistrationRequest) (<-chan commoncap.TriggerResponse, error) {
rawRequest, err := pb.MarshalTriggerRegistrationRequest(request)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions core/capabilities/remote/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
MethodUnRegisterTrigger = "UnregisterTrigger"
MethodTriggerEvent = "TriggerEvent"
MethodExecute = "Execute"
MethodTriggerEventAck = "TriggerEventACK"
)

type Dispatcher interface {
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-ccip v0.1.1-solana.0.20251128020529-88d93b01d749
github.com/smartcontractkit/chainlink-common v0.9.6-0.20251219154553-3688afcb0761
github.com/smartcontractkit/chainlink-common v0.9.6-0.20251224190747-4a1ca5d561d6
github.com/smartcontractkit/chainlink-data-streams v0.1.9
github.com/smartcontractkit/chainlink-deployments-framework v0.70.0
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251210110629-10c56e8d2cec
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1634,8 +1634,8 @@ github.com/smartcontractkit/chainlink-ccip/deployment v0.0.0-20251027185542-babb
github.com/smartcontractkit/chainlink-ccip/deployment v0.0.0-20251027185542-babb09e5363e/go.mod h1:IaoLCQE1miX3iUlQNxOPcVrXrshcO/YsFpxnFuhG9DM=
github.com/smartcontractkit/chainlink-ccv v0.0.0-20251215155942-4daf59a09b5a h1:GIy+bmMlABnWEvAIe7Jiae2UHk/FIN7hRYavZdm6+f8=
github.com/smartcontractkit/chainlink-ccv v0.0.0-20251215155942-4daf59a09b5a/go.mod h1:2BhunX29Hx2aL31X7rSg5HT8nmmy5fvNsQaZ7wP3c/o=
github.com/smartcontractkit/chainlink-common v0.9.6-0.20251219154553-3688afcb0761 h1:K5uuKFGylvfxWEvaNcXHdXXNAjwhwz9+6FwTTX7ppGs=
github.com/smartcontractkit/chainlink-common v0.9.6-0.20251219154553-3688afcb0761/go.mod h1:ra9yvW8HbLgtXY0fHgnVdA5SjZ06v2/TNyTfPEJzsqo=
github.com/smartcontractkit/chainlink-common v0.9.6-0.20251224190747-4a1ca5d561d6 h1:S04uSTz2vy8lJJXN3+h0PKievspLWt1CU25OALKG8p8=
github.com/smartcontractkit/chainlink-common v0.9.6-0.20251224190747-4a1ca5d561d6/go.mod h1:ra9yvW8HbLgtXY0fHgnVdA5SjZ06v2/TNyTfPEJzsqo=
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg=
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY=
github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw=
Expand Down
5 changes: 5 additions & 0 deletions core/services/llo/cre/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ func (t *transmitter) processNewEvent(ctx context.Context, event *capabilities.O
return nil
}

func (t *transmitter) AckEvent(ctx context.Context, eventId string) error {
// TODO
return nil
}

func (t *transmitter) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down
Loading