diff --git a/sdk/adapters/supernodeservice/adapter.go b/sdk/adapters/supernodeservice/adapter.go index f7013907..e3405413 100644 --- a/sdk/adapters/supernodeservice/adapter.go +++ b/sdk/adapters/supernodeservice/adapter.go @@ -168,28 +168,28 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca func toSdkEvent(e cascade.SupernodeEventType) event.EventType { switch e { case cascade.SupernodeEventType_ACTION_RETRIEVED: - return event.TaskProgressActionRetrievedBySupernode + return event.SupernodeActionRetrieved case cascade.SupernodeEventType_ACTION_FEE_VERIFIED: - return event.TaskProgressActionFeeValidated + return event.SupernodeActionFeeVerified case cascade.SupernodeEventType_TOP_SUPERNODE_CHECK_PASSED: - return event.TaskProgressTopSupernodeCheckValidated + return event.SupernodeTopCheckPassed case cascade.SupernodeEventType_METADATA_DECODED: - return event.TaskProgressCascadeMetadataDecoded + return event.SupernodeMetadataDecoded case cascade.SupernodeEventType_DATA_HASH_VERIFIED: - return event.TaskProgressDataHashVerified + return event.SupernodeDataHashVerified case cascade.SupernodeEventType_INPUT_ENCODED: - return event.TaskProgressInputDataEncoded + return event.SupernodeInputEncoded case cascade.SupernodeEventType_SIGNATURE_VERIFIED: - return event.TaskProgressSignatureVerified + return event.SupernodeSignatureVerified case cascade.SupernodeEventType_RQID_GENERATED: - return event.TaskProgressRQIDFilesGenerated + return event.SupernodeRQIDGenerated case cascade.SupernodeEventType_RQID_VERIFIED: - return event.TaskProgressRQIDsVerified + return event.SupernodeRQIDVerified case cascade.SupernodeEventType_ARTEFACTS_STORED: - return event.TaskProgressArtefactsStored + return event.SupernodeArtefactsStored case cascade.SupernodeEventType_ACTION_FINALIZED: - return event.TaskProgressActionFinalized + return event.SupernodeActionFinalized default: - return event.EventType("task.progress.unknown") + return event.SupernodeUnknown } } diff --git a/sdk/adapters/supernodeservice/types_test.go b/sdk/adapters/supernodeservice/types_test.go index ea18d042..359cac15 100644 --- a/sdk/adapters/supernodeservice/types_test.go +++ b/sdk/adapters/supernodeservice/types_test.go @@ -14,18 +14,17 @@ func TestTranslateSupernodeEvent(t *testing.T) { input cascade.SupernodeEventType expected event.EventType }{ - {"ACTION_RETRIEVED", cascade.SupernodeEventType_ACTION_RETRIEVED, event.TaskProgressActionRetrievedBySupernode}, - {"ACTION_FEE_VERIFIED", cascade.SupernodeEventType_ACTION_FEE_VERIFIED, event.TaskProgressActionFeeValidated}, - {"TOP_SUPERNODE_CHECK_PASSED", cascade.SupernodeEventType_TOP_SUPERNODE_CHECK_PASSED, event.TaskProgressTopSupernodeCheckValidated}, - {"METADATA_DECODED", cascade.SupernodeEventType_METADATA_DECODED, event.TaskProgressCascadeMetadataDecoded}, - {"DATA_HASH_VERIFIED", cascade.SupernodeEventType_DATA_HASH_VERIFIED, event.TaskProgressDataHashVerified}, - {"INPUT_ENCODED", cascade.SupernodeEventType_INPUT_ENCODED, event.TaskProgressInputDataEncoded}, - {"SIGNATURE_VERIFIED", cascade.SupernodeEventType_SIGNATURE_VERIFIED, event.TaskProgressSignatureVerified}, - {"RQID_GENERATED", cascade.SupernodeEventType_RQID_GENERATED, event.TaskProgressRQIDFilesGenerated}, - {"RQID_VERIFIED", cascade.SupernodeEventType_RQID_VERIFIED, event.TaskProgressRQIDsVerified}, - {"ARTEFACTS_STORED", cascade.SupernodeEventType_ARTEFACTS_STORED, event.TaskProgressArtefactsStored}, - {"ACTION_FINALIZED", cascade.SupernodeEventType_ACTION_FINALIZED, event.TaskProgressActionFinalized}, - {"UNKNOWN_TYPE", cascade.SupernodeEventType(999), event.EventType("task.progress.unknown")}, + {"ACTION_RETRIEVED", cascade.SupernodeEventType_ACTION_RETRIEVED, event.SupernodeActionRetrieved}, + {"ACTION_FEE_VERIFIED", cascade.SupernodeEventType_ACTION_FEE_VERIFIED, event.SupernodeActionFeeVerified}, + {"TOP_SUPERNODE_CHECK_PASSED", cascade.SupernodeEventType_TOP_SUPERNODE_CHECK_PASSED, event.SupernodeTopCheckPassed}, + {"METADATA_DECODED", cascade.SupernodeEventType_METADATA_DECODED, event.SupernodeMetadataDecoded}, + {"DATA_HASH_VERIFIED", cascade.SupernodeEventType_DATA_HASH_VERIFIED, event.SupernodeDataHashVerified}, + {"INPUT_ENCODED", cascade.SupernodeEventType_INPUT_ENCODED, event.SupernodeInputEncoded}, + {"SIGNATURE_VERIFIED", cascade.SupernodeEventType_SIGNATURE_VERIFIED, event.SupernodeSignatureVerified}, + {"RQID_GENERATED", cascade.SupernodeEventType_RQID_GENERATED, event.SupernodeRQIDGenerated}, + {"RQID_VERIFIED", cascade.SupernodeEventType_RQID_VERIFIED, event.SupernodeRQIDVerified}, + {"ARTEFACTS_STORED", cascade.SupernodeEventType_ARTEFACTS_STORED, event.SupernodeArtefactsStored}, + {"ACTION_FINALIZED", cascade.SupernodeEventType_ACTION_FINALIZED, event.SupernodeActionFinalized}, } for _, tt := range tests { diff --git a/sdk/event/progress.go b/sdk/event/progress.go new file mode 100644 index 00000000..b1b7a27e --- /dev/null +++ b/sdk/event/progress.go @@ -0,0 +1,94 @@ +package event + +// ProgressInfo provides comprehensive progress information +type ProgressInfo struct { + Percentage int // Percentage complete (0-100) + Status TaskStatus // Current status (pending, active, completed, failed) + CurrentEvent EventType // The current event type + IsErrorState bool // Whether this is an error state + +} + +// TaskStatus represents the current status of a task +type TaskStatus string + +const ( + StatusPending TaskStatus = "PENDING" + StatusActive TaskStatus = "ACTIVE" + StatusCompleted TaskStatus = "COMPLETED" + StatusFailed TaskStatus = "FAILED" +) + +// eventProgressMap maps event types to their progress percentages +var eventProgressMap = map[EventType]int{ + //SDK + SDKTaskStarted: 0, + SDKSupernodesUnavailable: 5, + SDKSupernodesFound: 10, + SDKRegistrationAttempt: 12, + SDKRegistrationFailure: 12, + //Supernode + SupernodeActionRetrieved: 15, + SupernodeActionFeeVerified: 20, + SupernodeTopCheckPassed: 25, + SupernodeMetadataDecoded: 30, + SupernodeDataHashVerified: 35, + SupernodeInputEncoded: 40, + SupernodeSignatureVerified: 45, + SupernodeRQIDGenerated: 50, + SupernodeRQIDVerified: 55, + SupernodeArtefactsStored: 60, + SupernodeActionFinalized: 80, + // SDK + SDKTaskTxHashReceived: 97, + SDKRegistrationSuccessful: 99, + SDKTaskCompleted: 100, + SDKTaskFailed: 100, +} + +// GetProgressFromEvent calculates progress information from a single event +func GetProgressFromEvent(e EventType) ProgressInfo { + // Determine percentage + percentage := 0 + if p, exists := eventProgressMap[e]; exists { + percentage = p + } + + // Determine status + var status TaskStatus + isError := false + + switch e { + case SDKTaskStarted: + status = StatusActive + case SDKTaskCompleted: + status = StatusCompleted + case SDKTaskFailed: + status = StatusFailed + isError = true + default: + status = StatusActive + } + + return ProgressInfo{ + Percentage: percentage, + Status: status, + CurrentEvent: e, + IsErrorState: isError, + } +} + +// GetLatestProgress calculates progress info from the most recent event in a list +func GetLatestProgress(events []Event) ProgressInfo { + if len(events) == 0 { + return ProgressInfo{ + Percentage: 0, + Status: StatusPending, + CurrentEvent: "", + IsErrorState: false, + } + } + + // Return progress based on the latest event + return GetProgressFromEvent(events[len(events)-1].Type) +} diff --git a/sdk/event/types.go b/sdk/event/types.go index d0cac2b2..ce782617 100644 --- a/sdk/event/types.go +++ b/sdk/event/types.go @@ -14,45 +14,31 @@ type EventType string // These events are used to track the progress of tasks // and to notify subscribers about important changes in the system. const ( - TaskStarted EventType = "task.started" - TaskProgressActionVerified EventType = "task.progress.action_verified" - TaskProgressActionVerificationFailed EventType = "task.progress.action_verification_failed" - TaskProgressSupernodesFound EventType = "task.progress.supernode_found" - TaskProgressSupernodesUnavailable EventType = "task.progress.supernodes_unavailable" - TaskProgressActionRetrievedBySupernode EventType = "task.progress.action_retrieved_by_supernode" - TaskProgressActionFeeValidated EventType = "task.progress.action_fee_validated" - TaskProgressTopSupernodeCheckValidated EventType = "task.progress.top_sn_check_validated" - TaskProgressArtefactsStored EventType = "task.progress.artefacts_stored" - TaskProgressCascadeMetadataDecoded EventType = "task.progress.cascade_metadata_decoded" - TaskProgressDataHashVerified EventType = "task.progress.data_hash_verified" - TaskProgressInputDataEncoded EventType = "task.progress.input_data_encoded" - TaskProgressSignatureVerified EventType = "task.progress.signature_verified" - TaskProgressRQIDFilesGenerated EventType = "task.progress.rq_id_files_generated" - TaskProgressRQIDsVerified EventType = "task.progress.rq_ids_verified" - TaskProgressActionFinalized EventType = "task.progress.action_finalized" - TaskProgressRegistrationInProgress EventType = "task.progress.registration_in_progress" - TaskProgressRegistrationFailure EventType = "task.progress.registration_failure" - TaskProgressRegistrationSuccessful EventType = "task.progress.registration_successful" - TaskCompleted EventType = "task.completed" - TxhasReceived EventType = "txhash.received" - TaskFailed EventType = "task.failed" + SDKTaskStarted EventType = "sdk:started" + SDKSupernodesUnavailable EventType = "sdk:supernodes_unavailable" + SDKSupernodesFound EventType = "sdk:supernodes_found" + SDKRegistrationAttempt EventType = "sdk:registration_attempt" + SDKRegistrationFailure EventType = "sdk:registration_failure" + SDKRegistrationSuccessful EventType = "sdk:registration_successful" + SDKTaskTxHashReceived EventType = "sdk:txhash_received" + SDKTaskCompleted EventType = "sdk:completed" + SDKTaskFailed EventType = "sdk:failed" ) -// Task progress steps in order -// This is the order in which events are expected to occur -// during the task lifecycle. It is used to track progress. -// The order of events in this slice should match the order -// in which they are expected to occur in the task lifecycle. -// The index of each event in this slice represents its -// position in the task lifecycle. The first event in the slice is the -// first event that should be emitted when a task starts. -var taskProgressSteps = []EventType{ - TaskStarted, - TaskProgressActionVerified, - TaskProgressSupernodesFound, - TaskProgressRegistrationInProgress, - TaskCompleted, -} +const ( + SupernodeActionRetrieved EventType = "supernode:action_retrieved" + SupernodeActionFeeVerified EventType = "supernode:action_fee_verified" + SupernodeTopCheckPassed EventType = "supernode:top_check_passed" + SupernodeMetadataDecoded EventType = "supernode:metadata_decoded" + SupernodeDataHashVerified EventType = "supernode:data_hash_verified" + SupernodeInputEncoded EventType = "supernode:input_encoded" + SupernodeSignatureVerified EventType = "supernode:signature_verified" + SupernodeRQIDGenerated EventType = "supernode:rqid_generated" + SupernodeRQIDVerified EventType = "supernode:rqid_verified" + SupernodeArtefactsStored EventType = "supernode:artefacts_stored" + SupernodeActionFinalized EventType = "supernode:action_finalized" + SupernodeUnknown EventType = "supernode:unknown" +) // EventData is a map of event data attributes using standardized keys type EventData map[EventDataKey]any @@ -87,14 +73,3 @@ func NewEvent(ctx context.Context, eventType EventType, taskID, taskType string, ActionID: actionID, } } - -// GetTaskProgress returns current progress as (y, x), where y = current step number, x = total steps. -func GetTaskProgress(current EventType) (int, int) { - for idx, step := range taskProgressSteps { - if step == current { - return idx + 1, len(taskProgressSteps) - } - } - // Unknown event, treat as 0 progress - return 0, len(taskProgressSteps) -} diff --git a/sdk/task/cache.go b/sdk/task/cache.go index dbce2087..4395efad 100644 --- a/sdk/task/cache.go +++ b/sdk/task/cache.go @@ -7,8 +7,8 @@ import ( "time" "github.com/LumeraProtocol/supernode/sdk/event" + eventspkg "github.com/LumeraProtocol/supernode/sdk/event" "github.com/LumeraProtocol/supernode/sdk/log" - "github.com/dgraph-io/ristretto/v2" ) @@ -17,7 +17,7 @@ type TaskEntry struct { TaskID string ActionID string TaskType TaskType - Status TaskStatus + Status eventspkg.TaskStatus TxHash string Error error Events []event.Event @@ -78,7 +78,7 @@ func (tc *TaskCache) Set(ctx context.Context, taskID string, task Task, taskType TaskID: taskID, ActionID: actionID, TaskType: taskType, - Status: StatusPending, + Status: eventspkg.StatusPending, Events: make([]event.Event, 0), CreatedAt: now, LastUpdatedAt: now, @@ -101,8 +101,13 @@ func (tc *TaskCache) Get(ctx context.Context, taskID string) (*TaskEntry, bool) return entry, found } +// GetProgress returns the current progress information for the task +func (t *TaskEntry) GetProgress() eventspkg.ProgressInfo { + return eventspkg.GetLatestProgress(t.Events) +} + // UpdateStatus updates the status of a task in the cache atomically -func (tc *TaskCache) UpdateStatus(ctx context.Context, taskID string, status TaskStatus, err error) bool { +func (tc *TaskCache) UpdateStatus(ctx context.Context, taskID string, status eventspkg.TaskStatus, err error) bool { mu := tc.getOrCreateMutex(taskID) mu.Lock() defer mu.Unlock() diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go index a3f7bcd2..2daae71c 100644 --- a/sdk/task/cascade.go +++ b/sdk/task/cascade.go @@ -38,36 +38,27 @@ func NewCascadeTask(base BaseTask, filePath string, actionId string) *CascadeTas // Run executes the full cascade‐task lifecycle. func (t *CascadeTask) Run(ctx context.Context) error { - t.LogEvent(ctx, event.TaskStarted, "Running cascade task", nil) - // Use the already validated Action directly to get the height + t.LogEvent(ctx, event.SDKTaskStarted, "Running cascade task", nil) + + // 1 - Fetch the supernodes supernodes, err := t.fetchSupernodes(ctx, t.Action.Height) + if err != nil { - t.logger.Error(ctx, "Task failed", "taskID", t.TaskID, "actionID", t.ActionID, "error", err) - t.EmitEvent(ctx, event.TaskProgressSupernodesUnavailable, event.EventData{ - event.KeyError: err.Error(), - }) - t.EmitEvent(ctx, event.TaskFailed, event.EventData{ - event.KeyError: err.Error(), - }) + t.LogEvent(ctx, event.SDKSupernodesUnavailable, "Supernodes unavailable", event.EventData{event.KeyError: err.Error()}) + t.LogEvent(ctx, event.SDKTaskFailed, "Task failed", event.EventData{event.KeyError: err.Error()}) return err } - t.LogEvent(ctx, event.TaskProgressSupernodesFound, "Supernodes found.", event.EventData{ - event.KeyCount: len(supernodes), - }) + t.LogEvent(ctx, event.SDKSupernodesFound, "Supernodes found.", event.EventData{event.KeyCount: len(supernodes)}) + + // 2 - Register with the supernodes if err := t.registerWithSupernodes(ctx, supernodes); err != nil { - t.logger.Error(ctx, "Task failed", "taskID", t.TaskID, "actionID", t.ActionID, "error", err) - t.EmitEvent(ctx, event.TaskProgressRegistrationFailure, event.EventData{ - event.KeyError: err.Error(), - }) - t.EmitEvent(ctx, event.TaskFailed, event.EventData{ - event.KeyError: err.Error(), - }) + t.LogEvent(ctx, event.SDKTaskFailed, "Task failed", event.EventData{event.KeyError: err.Error()}) return err } - t.LogEvent(ctx, event.TaskCompleted, "Cascade task completed successfully", nil) + t.LogEvent(ctx, event.SDKTaskCompleted, "Cascade task completed successfully", nil) return nil } @@ -77,7 +68,6 @@ func (t *CascadeTask) fetchSupernodes(ctx context.Context, height int64) (lumera if err != nil { return nil, fmt.Errorf("fetch supernodes: %w", err) } - t.logger.Info(ctx, "Supernodes fetched", "count", len(sns)) if len(sns) == 0 { return nil, errors.New("no supernodes found") @@ -110,7 +100,6 @@ func (t *CascadeTask) fetchSupernodes(ctx context.Context, height int64) (lumera if len(healthy) == 0 { return nil, errors.New("no healthy supernodes found") } - t.logger.Info(ctx, "Healthy supernodes", "count", len(healthy)) return healthy, nil } @@ -124,8 +113,7 @@ func (t *CascadeTask) isServing(parent context.Context, sn lumera.Supernode) boo LocalCosmosAddress: t.config.Account.LocalCosmosAddress, }).CreateClient(ctx, sn) if err != nil { - logtrace.Info(ctx, "Failed to create client for supernode", logtrace.Fields{ - logtrace.FieldMethod: "isServing"}) + logtrace.Info(ctx, "Failed to create client for supernode", logtrace.Fields{logtrace.FieldMethod: "isServing"}) return false } defer client.Close(ctx) @@ -148,23 +136,35 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum var lastErr error for idx, sn := range supernodes { + // 1 + t.LogEvent(ctx, event.SDKRegistrationAttempt, "attempting registration with supernode", event.EventData{ + event.KeySupernode: sn.GrpcEndpoint, + event.KeySupernodeAddress: sn.CosmosAddress, + event.KeyIteration: idx + 1, + }) if err := t.attemptRegistration(ctx, idx, sn, clientFactory, req); err != nil { + // + t.LogEvent(ctx, event.SDKRegistrationFailure, "registration with supernode failed", event.EventData{ + event.KeySupernode: sn.GrpcEndpoint, + event.KeySupernodeAddress: sn.CosmosAddress, + event.KeyIteration: idx + 1, + event.KeyError: err.Error(), + }) lastErr = err continue } + t.LogEvent(ctx, event.SDKRegistrationSuccessful, "successfully registratered with supernode", event.EventData{ + event.KeySupernode: sn.GrpcEndpoint, + event.KeySupernodeAddress: sn.CosmosAddress, + event.KeyIteration: idx + 1, + }) return nil // success } return fmt.Errorf("failed to upload to all supernodes: %w", lastErr) } -func (t *CascadeTask) attemptRegistration(ctx context.Context, index int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest) error { - t.LogEvent(ctx, event.TaskProgressRegistrationInProgress, "attempting registration with supernode", event.EventData{ - event.KeySupernode: sn.GrpcEndpoint, - event.KeySupernodeAddress: sn.CosmosAddress, - event.KeyIteration: index + 1, - }) - +func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest) error { client, err := factory.CreateClient(ctx, sn) if err != nil { return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err) @@ -185,14 +185,10 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, index int, sn lum return fmt.Errorf("upload rejected by %s: %s", sn.CosmosAddress, resp.Message) } - // Use txhash directly without cleaning - t.LogEvent(ctx, event.TxhasReceived, "txhash received", event.EventData{ + t.LogEvent(ctx, event.SDKTaskTxHashReceived, "txhash received", event.EventData{ event.KeyTxHash: resp.TxHash, event.KeySupernode: sn.CosmosAddress, }) - t.logger.Info(ctx, "upload OK", "taskID", t.TaskID, "address", sn.CosmosAddress) return nil } - -// logEvent writes a structured log entry **and** emits the SDK event. diff --git a/sdk/task/manager.go b/sdk/task/manager.go index a59c729a..327a3eef 100644 --- a/sdk/task/manager.go +++ b/sdk/task/manager.go @@ -7,8 +7,8 @@ import ( "github.com/LumeraProtocol/supernode/sdk/adapters/lumera" "github.com/LumeraProtocol/supernode/sdk/config" "github.com/LumeraProtocol/supernode/sdk/event" + taskstatus "github.com/LumeraProtocol/supernode/sdk/event" "github.com/LumeraProtocol/supernode/sdk/log" - "github.com/cosmos/cosmos-sdk/crypto/keyring" "github.com/google/uuid" ) @@ -121,7 +121,6 @@ func (m *ManagerImpl) CreateCascadeTask(ctx context.Context, filePath string, ac // Error handling is done via events in the task.Run method // This is just a failsafe in case something goes wrong m.logger.Error(ctx, "Cascade task failed with error", "taskID", taskID, "error", err) - m.taskCache.UpdateStatus(ctx, taskID, StatusFailed, err) } }() @@ -182,13 +181,13 @@ func (m *ManagerImpl) handleEvent(ctx context.Context, e event.Event) { // Update the task status based on event type switch e.Type { - case event.TaskStarted: + case event.SDKTaskStarted: m.logger.Info(ctx, "Task started", "taskID", e.TaskID, "taskType", e.TaskType) - m.taskCache.UpdateStatus(ctx, e.TaskID, StatusProcessing, nil) - case event.TaskCompleted: + m.taskCache.UpdateStatus(ctx, e.TaskID, taskstatus.StatusActive, nil) + case event.SDKTaskCompleted: m.logger.Info(ctx, "Task completed", "taskID", e.TaskID, "taskType", e.TaskType) - m.taskCache.UpdateStatus(ctx, e.TaskID, StatusCompleted, nil) - case event.TaskFailed: + m.taskCache.UpdateStatus(ctx, e.TaskID, taskstatus.StatusCompleted, nil) + case event.SDKTaskFailed: var err error if errMsg, ok := e.Data[event.KeyError].(string); ok { err = fmt.Errorf("%s", errMsg) @@ -196,8 +195,8 @@ func (m *ManagerImpl) handleEvent(ctx context.Context, e event.Event) { } else { m.logger.Error(ctx, "Task failed with unknown error", "taskID", e.TaskID, "taskType", e.TaskType) } - m.taskCache.UpdateStatus(ctx, e.TaskID, StatusFailed, err) - case event.TxhasReceived: + m.taskCache.UpdateStatus(ctx, e.TaskID, taskstatus.StatusFailed, err) + case event.SDKTaskTxHashReceived: // Capture and store transaction hash from event if txHash, ok := e.Data[event.KeyTxHash].(string); ok && txHash != "" { m.logger.Info(ctx, "Transaction hash received", "taskID", e.TaskID, "txHash", txHash) diff --git a/sdk/task/task.go b/sdk/task/task.go index aa158613..1b738d68 100644 --- a/sdk/task/task.go +++ b/sdk/task/task.go @@ -18,16 +18,6 @@ const ( TaskTypeCascade TaskType = "CASCADE" ) -// TaskStatus represents the possible states of a task -type TaskStatus string - -const ( - StatusPending TaskStatus = "PENDING" - StatusProcessing TaskStatus = "PROCESSING" - StatusCompleted TaskStatus = "COMPLETED" - StatusFailed TaskStatus = "FAILED" -) - // EventCallback is a function that processes events from tasks type EventCallback func(ctx context.Context, e event.Event) @@ -52,7 +42,7 @@ type BaseTask struct { } // EmitEvent creates and sends an event with the specified type and data -func (t *BaseTask) EmitEvent(ctx context.Context, eventType event.EventType, data event.EventData) { +func (t *BaseTask) emitEvent(ctx context.Context, eventType event.EventType, data event.EventData) { if t.onEvent != nil { // Create event with the provided context e := event.NewEvent(ctx, eventType, t.TaskID, string(t.TaskType), t.ActionID, data) @@ -75,5 +65,5 @@ func (t *BaseTask) LogEvent(ctx context.Context, evt event.EventType, msg string } t.logger.Info(ctx, msg, kvs...) - t.EmitEvent(ctx, evt, additionalInfo) + t.emitEvent(ctx, evt, additionalInfo) } diff --git a/tests/system/e2e_cascade_test.go b/tests/system/e2e_cascade_test.go index 9c18358c..be389337 100644 --- a/tests/system/e2e_cascade_test.go +++ b/tests/system/e2e_cascade_test.go @@ -431,15 +431,15 @@ func TestCascadeE2E(t *testing.T) { // Subscribe to ALL events err = actionClient.SubscribeToAllEvents(ctx, func(ctx context.Context, e event.Event) { // Only capture TxhasReceived events - if e.Type == event.TxhasReceived { - if txHash, ok := e.Data["txhash"].(string); ok && txHash != "" { + if e.Type == event.SDKTaskTxHashReceived { + if txHash, ok := e.Data[event.KeyTxHash].(string); ok && txHash != "" { // Send the hash to our channel txHashCh <- txHash } } // Also monitor for task completion - if e.Type == event.TaskCompleted { + if e.Type == event.SDKTaskCompleted { completionCh <- true } })