Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions sdk/adapters/supernodeservice/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,28 +168,28 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
func toSdkEvent(e cascade.SupernodeEventType) event.EventType {
switch e {
case cascade.SupernodeEventType_ACTION_RETRIEVED:
return event.TaskProgressActionRetrievedBySupernode
return event.SupernodeActionRetrieved
case cascade.SupernodeEventType_ACTION_FEE_VERIFIED:
return event.TaskProgressActionFeeValidated
return event.SupernodeActionFeeVerified
case cascade.SupernodeEventType_TOP_SUPERNODE_CHECK_PASSED:
return event.TaskProgressTopSupernodeCheckValidated
return event.SupernodeTopCheckPassed
case cascade.SupernodeEventType_METADATA_DECODED:
return event.TaskProgressCascadeMetadataDecoded
return event.SupernodeMetadataDecoded
case cascade.SupernodeEventType_DATA_HASH_VERIFIED:
return event.TaskProgressDataHashVerified
return event.SupernodeDataHashVerified
case cascade.SupernodeEventType_INPUT_ENCODED:
return event.TaskProgressInputDataEncoded
return event.SupernodeInputEncoded
case cascade.SupernodeEventType_SIGNATURE_VERIFIED:
return event.TaskProgressSignatureVerified
return event.SupernodeSignatureVerified
case cascade.SupernodeEventType_RQID_GENERATED:
return event.TaskProgressRQIDFilesGenerated
return event.SupernodeRQIDGenerated
case cascade.SupernodeEventType_RQID_VERIFIED:
return event.TaskProgressRQIDsVerified
return event.SupernodeRQIDVerified
case cascade.SupernodeEventType_ARTEFACTS_STORED:
return event.TaskProgressArtefactsStored
return event.SupernodeArtefactsStored
case cascade.SupernodeEventType_ACTION_FINALIZED:
return event.TaskProgressActionFinalized
return event.SupernodeActionFinalized
default:
return event.EventType("task.progress.unknown")
return event.SupernodeUnknown
}
}
23 changes: 11 additions & 12 deletions sdk/adapters/supernodeservice/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@ func TestTranslateSupernodeEvent(t *testing.T) {
input cascade.SupernodeEventType
expected event.EventType
}{
{"ACTION_RETRIEVED", cascade.SupernodeEventType_ACTION_RETRIEVED, event.TaskProgressActionRetrievedBySupernode},
{"ACTION_FEE_VERIFIED", cascade.SupernodeEventType_ACTION_FEE_VERIFIED, event.TaskProgressActionFeeValidated},
{"TOP_SUPERNODE_CHECK_PASSED", cascade.SupernodeEventType_TOP_SUPERNODE_CHECK_PASSED, event.TaskProgressTopSupernodeCheckValidated},
{"METADATA_DECODED", cascade.SupernodeEventType_METADATA_DECODED, event.TaskProgressCascadeMetadataDecoded},
{"DATA_HASH_VERIFIED", cascade.SupernodeEventType_DATA_HASH_VERIFIED, event.TaskProgressDataHashVerified},
{"INPUT_ENCODED", cascade.SupernodeEventType_INPUT_ENCODED, event.TaskProgressInputDataEncoded},
{"SIGNATURE_VERIFIED", cascade.SupernodeEventType_SIGNATURE_VERIFIED, event.TaskProgressSignatureVerified},
{"RQID_GENERATED", cascade.SupernodeEventType_RQID_GENERATED, event.TaskProgressRQIDFilesGenerated},
{"RQID_VERIFIED", cascade.SupernodeEventType_RQID_VERIFIED, event.TaskProgressRQIDsVerified},
{"ARTEFACTS_STORED", cascade.SupernodeEventType_ARTEFACTS_STORED, event.TaskProgressArtefactsStored},
{"ACTION_FINALIZED", cascade.SupernodeEventType_ACTION_FINALIZED, event.TaskProgressActionFinalized},
{"UNKNOWN_TYPE", cascade.SupernodeEventType(999), event.EventType("task.progress.unknown")},
{"ACTION_RETRIEVED", cascade.SupernodeEventType_ACTION_RETRIEVED, event.SupernodeActionRetrieved},
{"ACTION_FEE_VERIFIED", cascade.SupernodeEventType_ACTION_FEE_VERIFIED, event.SupernodeActionFeeVerified},
{"TOP_SUPERNODE_CHECK_PASSED", cascade.SupernodeEventType_TOP_SUPERNODE_CHECK_PASSED, event.SupernodeTopCheckPassed},
{"METADATA_DECODED", cascade.SupernodeEventType_METADATA_DECODED, event.SupernodeMetadataDecoded},
{"DATA_HASH_VERIFIED", cascade.SupernodeEventType_DATA_HASH_VERIFIED, event.SupernodeDataHashVerified},
{"INPUT_ENCODED", cascade.SupernodeEventType_INPUT_ENCODED, event.SupernodeInputEncoded},
{"SIGNATURE_VERIFIED", cascade.SupernodeEventType_SIGNATURE_VERIFIED, event.SupernodeSignatureVerified},
{"RQID_GENERATED", cascade.SupernodeEventType_RQID_GENERATED, event.SupernodeRQIDGenerated},
{"RQID_VERIFIED", cascade.SupernodeEventType_RQID_VERIFIED, event.SupernodeRQIDVerified},
{"ARTEFACTS_STORED", cascade.SupernodeEventType_ARTEFACTS_STORED, event.SupernodeArtefactsStored},
{"ACTION_FINALIZED", cascade.SupernodeEventType_ACTION_FINALIZED, event.SupernodeActionFinalized},
}

for _, tt := range tests {
Expand Down
94 changes: 94 additions & 0 deletions sdk/event/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package event

// ProgressInfo provides comprehensive progress information
type ProgressInfo struct {
Percentage int // Percentage complete (0-100)
Status TaskStatus // Current status (pending, active, completed, failed)
CurrentEvent EventType // The current event type
IsErrorState bool // Whether this is an error state

}

// TaskStatus represents the current status of a task
type TaskStatus string

const (
StatusPending TaskStatus = "PENDING"
StatusActive TaskStatus = "ACTIVE"
StatusCompleted TaskStatus = "COMPLETED"
StatusFailed TaskStatus = "FAILED"
)

// eventProgressMap maps event types to their progress percentages
var eventProgressMap = map[EventType]int{
//SDK
SDKTaskStarted: 0,
SDKSupernodesUnavailable: 5,
SDKSupernodesFound: 10,
SDKRegistrationAttempt: 12,
SDKRegistrationFailure: 12,
//Supernode
SupernodeActionRetrieved: 15,
SupernodeActionFeeVerified: 20,
SupernodeTopCheckPassed: 25,
SupernodeMetadataDecoded: 30,
SupernodeDataHashVerified: 35,
SupernodeInputEncoded: 40,
SupernodeSignatureVerified: 45,
SupernodeRQIDGenerated: 50,
SupernodeRQIDVerified: 55,
SupernodeArtefactsStored: 60,
SupernodeActionFinalized: 80,
// SDK
SDKTaskTxHashReceived: 97,
SDKRegistrationSuccessful: 99,
SDKTaskCompleted: 100,
SDKTaskFailed: 100,
}

// GetProgressFromEvent calculates progress information from a single event
func GetProgressFromEvent(e EventType) ProgressInfo {
// Determine percentage
percentage := 0
if p, exists := eventProgressMap[e]; exists {
percentage = p
}

// Determine status
var status TaskStatus
isError := false

switch e {
case SDKTaskStarted:
status = StatusActive
case SDKTaskCompleted:
status = StatusCompleted
case SDKTaskFailed:
status = StatusFailed
isError = true
default:
status = StatusActive
}

return ProgressInfo{
Percentage: percentage,
Status: status,
CurrentEvent: e,
IsErrorState: isError,
}
}

// GetLatestProgress calculates progress info from the most recent event in a list
func GetLatestProgress(events []Event) ProgressInfo {
if len(events) == 0 {
return ProgressInfo{
Percentage: 0,
Status: StatusPending,
CurrentEvent: "",
IsErrorState: false,
}
}

// Return progress based on the latest event
return GetProgressFromEvent(events[len(events)-1].Type)
}
71 changes: 23 additions & 48 deletions sdk/event/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,45 +14,31 @@ type EventType string
// These events are used to track the progress of tasks
// and to notify subscribers about important changes in the system.
const (
TaskStarted EventType = "task.started"
TaskProgressActionVerified EventType = "task.progress.action_verified"
TaskProgressActionVerificationFailed EventType = "task.progress.action_verification_failed"
TaskProgressSupernodesFound EventType = "task.progress.supernode_found"
TaskProgressSupernodesUnavailable EventType = "task.progress.supernodes_unavailable"
TaskProgressActionRetrievedBySupernode EventType = "task.progress.action_retrieved_by_supernode"
TaskProgressActionFeeValidated EventType = "task.progress.action_fee_validated"
TaskProgressTopSupernodeCheckValidated EventType = "task.progress.top_sn_check_validated"
TaskProgressArtefactsStored EventType = "task.progress.artefacts_stored"
TaskProgressCascadeMetadataDecoded EventType = "task.progress.cascade_metadata_decoded"
TaskProgressDataHashVerified EventType = "task.progress.data_hash_verified"
TaskProgressInputDataEncoded EventType = "task.progress.input_data_encoded"
TaskProgressSignatureVerified EventType = "task.progress.signature_verified"
TaskProgressRQIDFilesGenerated EventType = "task.progress.rq_id_files_generated"
TaskProgressRQIDsVerified EventType = "task.progress.rq_ids_verified"
TaskProgressActionFinalized EventType = "task.progress.action_finalized"
TaskProgressRegistrationInProgress EventType = "task.progress.registration_in_progress"
TaskProgressRegistrationFailure EventType = "task.progress.registration_failure"
TaskProgressRegistrationSuccessful EventType = "task.progress.registration_successful"
TaskCompleted EventType = "task.completed"
TxhasReceived EventType = "txhash.received"
TaskFailed EventType = "task.failed"
SDKTaskStarted EventType = "sdk:started"
SDKSupernodesUnavailable EventType = "sdk:supernodes_unavailable"
SDKSupernodesFound EventType = "sdk:supernodes_found"
SDKRegistrationAttempt EventType = "sdk:registration_attempt"
SDKRegistrationFailure EventType = "sdk:registration_failure"
SDKRegistrationSuccessful EventType = "sdk:registration_successful"
SDKTaskTxHashReceived EventType = "sdk:txhash_received"
SDKTaskCompleted EventType = "sdk:completed"
SDKTaskFailed EventType = "sdk:failed"
)

// Task progress steps in order
// This is the order in which events are expected to occur
// during the task lifecycle. It is used to track progress.
// The order of events in this slice should match the order
// in which they are expected to occur in the task lifecycle.
// The index of each event in this slice represents its
// position in the task lifecycle. The first event in the slice is the
// first event that should be emitted when a task starts.
var taskProgressSteps = []EventType{
TaskStarted,
TaskProgressActionVerified,
TaskProgressSupernodesFound,
TaskProgressRegistrationInProgress,
TaskCompleted,
}
const (
SupernodeActionRetrieved EventType = "supernode:action_retrieved"
SupernodeActionFeeVerified EventType = "supernode:action_fee_verified"
SupernodeTopCheckPassed EventType = "supernode:top_check_passed"
SupernodeMetadataDecoded EventType = "supernode:metadata_decoded"
SupernodeDataHashVerified EventType = "supernode:data_hash_verified"
SupernodeInputEncoded EventType = "supernode:input_encoded"
SupernodeSignatureVerified EventType = "supernode:signature_verified"
SupernodeRQIDGenerated EventType = "supernode:rqid_generated"
SupernodeRQIDVerified EventType = "supernode:rqid_verified"
SupernodeArtefactsStored EventType = "supernode:artefacts_stored"
SupernodeActionFinalized EventType = "supernode:action_finalized"
SupernodeUnknown EventType = "supernode:unknown"
)

// EventData is a map of event data attributes using standardized keys
type EventData map[EventDataKey]any
Expand Down Expand Up @@ -87,14 +73,3 @@ func NewEvent(ctx context.Context, eventType EventType, taskID, taskType string,
ActionID: actionID,
}
}

// GetTaskProgress returns current progress as (y, x), where y = current step number, x = total steps.
func GetTaskProgress(current EventType) (int, int) {
for idx, step := range taskProgressSteps {
if step == current {
return idx + 1, len(taskProgressSteps)
}
}
// Unknown event, treat as 0 progress
return 0, len(taskProgressSteps)
}
13 changes: 9 additions & 4 deletions sdk/task/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"time"

"github.com/LumeraProtocol/supernode/sdk/event"
eventspkg "github.com/LumeraProtocol/supernode/sdk/event"
"github.com/LumeraProtocol/supernode/sdk/log"

"github.com/dgraph-io/ristretto/v2"
)

Expand All @@ -17,7 +17,7 @@ type TaskEntry struct {
TaskID string
ActionID string
TaskType TaskType
Status TaskStatus
Status eventspkg.TaskStatus
TxHash string
Error error
Events []event.Event
Expand Down Expand Up @@ -78,7 +78,7 @@ func (tc *TaskCache) Set(ctx context.Context, taskID string, task Task, taskType
TaskID: taskID,
ActionID: actionID,
TaskType: taskType,
Status: StatusPending,
Status: eventspkg.StatusPending,
Events: make([]event.Event, 0),
CreatedAt: now,
LastUpdatedAt: now,
Expand All @@ -101,8 +101,13 @@ func (tc *TaskCache) Get(ctx context.Context, taskID string) (*TaskEntry, bool)
return entry, found
}

// GetProgress returns the current progress information for the task
func (t *TaskEntry) GetProgress() eventspkg.ProgressInfo {
return eventspkg.GetLatestProgress(t.Events)
}

// UpdateStatus updates the status of a task in the cache atomically
func (tc *TaskCache) UpdateStatus(ctx context.Context, taskID string, status TaskStatus, err error) bool {
func (tc *TaskCache) UpdateStatus(ctx context.Context, taskID string, status eventspkg.TaskStatus, err error) bool {
mu := tc.getOrCreateMutex(taskID)
mu.Lock()
defer mu.Unlock()
Expand Down
Loading