Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion cmd/igord/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/simonovic86/igor/internal/agent"
"github.com/simonovic86/igor/internal/authority"
"github.com/simonovic86/igor/internal/config"
"github.com/simonovic86/igor/internal/inspector"
"github.com/simonovic86/igor/internal/logging"
Expand Down Expand Up @@ -46,6 +47,8 @@ func main() {
simVerify := flag.Bool("verify", false, "Per-tick replay verification during simulation")
simDeterministic := flag.Bool("deterministic", false, "Use fixed clock and seeded rand for reproducible simulation")
simSeed := flag.Uint64("seed", 0, "Random seed for deterministic simulation")
leaseDuration := flag.Duration("lease-duration", 60*time.Second, "Lease validity period (0 = disabled)")
leaseGrace := flag.Duration("lease-grace", 10*time.Second, "Grace period after lease expiry")
flag.Parse()

// Checkpoint inspector — standalone, no config/P2P/engine needed
Expand Down Expand Up @@ -87,6 +90,8 @@ func main() {
if *replayOnDivergence != "" {
cfg.ReplayOnDivergence = *replayOnDivergence
}
cfg.LeaseDuration = *leaseDuration
cfg.LeaseGracePeriod = *leaseGrace

// Initialize logging
logger := logging.NewLogger()
Expand Down Expand Up @@ -119,7 +124,12 @@ func main() {
defer engine.Close(ctx)

// Initialize migration service
migrationSvc := migration.NewService(node.Host, engine, storageProvider, cfg.ReplayMode, cfg.ReplayCostLog, cfg.PricePerSecond, logger)
leaseCfg := authority.LeaseConfig{
Duration: cfg.LeaseDuration,
RenewalWindow: cfg.LeaseRenewalWindow,
GracePeriod: cfg.LeaseGracePeriod,
}
migrationSvc := migration.NewService(node.Host, engine, storageProvider, cfg.ReplayMode, cfg.ReplayCostLog, cfg.PricePerSecond, leaseCfg, logger)

// Initialize pricing service for inter-node price discovery
_ = pricing.NewService(node.Host, cfg.PricePerSecond, logger)
Expand Down Expand Up @@ -230,6 +240,21 @@ func runLocalAgent(
// Continue anyway with fresh state
}

// Grant initial lease if leases are enabled
if cfg.LeaseDuration > 0 {
leaseCfg := authority.LeaseConfig{
Duration: cfg.LeaseDuration,
RenewalWindow: cfg.LeaseRenewalWindow,
GracePeriod: cfg.LeaseGracePeriod,
}
instance.Lease = authority.NewLease(leaseCfg)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve checkpoint epoch when initializing local lease

This always creates a fresh lease with NewLease, which resets authority to epoch (0,0) even when LoadCheckpointFromStorage has just loaded a v0x03 checkpoint containing a higher epoch from prior renewals or migrations. After a restart, the node can therefore emit regressed epochs on later migrations, which breaks monotonic authority ordering and weakens the anti-clone guarantees introduced by this change. Initialize the lease from the checkpoint epoch instead of unconditionally bootstrapping a new one.

Useful? React with 👍 / 👎.

logger.Info("Lease granted",
"epoch", instance.Lease.Epoch,
"expiry", instance.Lease.Expiry,
"duration", cfg.LeaseDuration,
)
}

// Setup signal handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
Expand Down Expand Up @@ -277,6 +302,11 @@ func runLocalAgent(
return nil

case <-tickTimer.C:
// Pre-tick lease validation (EI-6: safety over liveness)
if leaseErr := runner.CheckAndRenewLease(instance, logger); leaseErr != nil {
return runner.HandleLeaseExpiry(ctx, instance, leaseErr, logger)
}

hasMoreWork, tickErr := runner.SafeTick(ctx, instance)
if tickErr != nil {
return runner.HandleTickFailure(ctx, instance, tickErr, logger)
Expand Down
77 changes: 61 additions & 16 deletions internal/agent/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"time"

"github.com/simonovic86/igor/internal/authority"
"github.com/simonovic86/igor/internal/config"
"github.com/simonovic86/igor/internal/eventlog"
"github.com/simonovic86/igor/internal/hostcall"
Expand All @@ -27,8 +28,12 @@ import (
)

const (
checkpointVersion byte = 0x02
checkpointHeaderLen int = 57 // 1 (version) + 8 (budget) + 8 (pricePerSecond) + 8 (tickNumber) + 32 (wasmHash)
checkpointVersionV2 byte = 0x02
checkpointVersionV3 byte = 0x03
checkpointVersion byte = checkpointVersionV3 // current version for writing
checkpointHeaderLenV2 int = 57 // 1 + 8 + 8 + 8 + 32
checkpointHeaderLenV3 int = 81 // 57 + 8 (majorVersion) + 8 (leaseGeneration) + 8 (leaseExpiry)
checkpointHeaderLen int = checkpointHeaderLenV3

// DefaultReplayWindowSize is the number of recent tick snapshots retained
// for sliding replay verification (CM-4).
Expand Down Expand Up @@ -85,6 +90,9 @@ type Instance struct {
signingKey ed25519.PrivateKey // Node's signing key; nil = receipts disabled
nodeID string // Node's peer ID string
BudgetAdapter settlement.BudgetAdapter // optional; nil = no external budget validation

// Lease-based authority (Phase 5: Hardening)
Lease *authority.Lease // Lease state; nil = leases disabled
}

// walletStateRef is an indirection layer that lets wallet hostcall closures
Expand Down Expand Up @@ -563,14 +571,25 @@ func (i *Instance) SaveCheckpointToStorage(ctx context.Context) error {
return fmt.Errorf("failed to checkpoint agent: %w", err)
}

// Format: [version:1][budget:8][price:8][tick:8][wasmHash:32][state:N]
// Format v0x03: [version:1][budget:8][price:8][tick:8][wasmHash:32][majorVersion:8][leaseGeneration:8][leaseExpiry:8][state:N]
checkpoint := make([]byte, checkpointHeaderLen+len(state))
checkpoint[0] = checkpointVersion
binary.LittleEndian.PutUint64(checkpoint[1:9], uint64(i.Budget))
binary.LittleEndian.PutUint64(checkpoint[9:17], uint64(i.PricePerSecond))
binary.LittleEndian.PutUint64(checkpoint[17:25], i.TickNumber)
copy(checkpoint[25:57], i.WASMHash[:])
copy(checkpoint[57:], state)
// Lease epoch metadata (zero values if leases disabled)
var majorVersion, leaseGeneration uint64
var leaseExpiry int64
if i.Lease != nil {
majorVersion = i.Lease.Epoch.MajorVersion
leaseGeneration = i.Lease.Epoch.LeaseGeneration
leaseExpiry = i.Lease.Expiry.UnixNano()
}
binary.LittleEndian.PutUint64(checkpoint[57:65], majorVersion)
binary.LittleEndian.PutUint64(checkpoint[65:73], leaseGeneration)
binary.LittleEndian.PutUint64(checkpoint[73:81], uint64(leaseExpiry))
copy(checkpoint[81:], state)

// Save to storage provider
if err := i.Storage.SaveCheckpoint(ctx, i.AgentID, checkpoint); err != nil {
Expand Down Expand Up @@ -612,7 +631,7 @@ func (i *Instance) LoadCheckpointFromStorage(ctx context.Context) error {
return fmt.Errorf("failed to load checkpoint: %w", err)
}

restoredBudget, restoredPrice, restoredTick, storedHash, state, err := ParseCheckpointHeader(checkpoint)
restoredBudget, restoredPrice, restoredTick, storedHash, epoch, _, state, err := ParseCheckpointHeader(checkpoint)
if err != nil {
return fmt.Errorf("invalid checkpoint: %w", err)
}
Expand All @@ -630,6 +649,7 @@ func (i *Instance) LoadCheckpointFromStorage(ctx context.Context) error {
"budget", budget.Format(restoredBudget),
"price_per_second", budget.Format(restoredPrice),
"tick_number", restoredTick,
"epoch", epoch,
)

// Load receipts from storage (non-fatal: missing receipts is normal for old checkpoints).
Expand All @@ -655,36 +675,61 @@ func (i *Instance) LoadCheckpointFromStorage(ctx context.Context) error {
return nil
}

// ParseCheckpointHeader parses a checkpoint header.
// Returns budget, pricePerSecond, tickNumber, wasmHash, agentState, and any error.
func ParseCheckpointHeader(checkpoint []byte) (budgetVal int64, price int64, tick uint64, wasmHash [32]byte, state []byte, err error) {
if len(checkpoint) < checkpointHeaderLen {
return 0, 0, 0, [32]byte{}, nil, fmt.Errorf("checkpoint too short: %d bytes (need %d)", len(checkpoint), checkpointHeaderLen)
// ParseCheckpointHeader parses a checkpoint header (v0x02 or v0x03).
// Returns budget, pricePerSecond, tickNumber, wasmHash, epoch, leaseExpiry (unix nanos), agentState, and any error.
// v0x02 checkpoints return a zero epoch and zero leaseExpiry.
func ParseCheckpointHeader(checkpoint []byte) (budgetVal int64, price int64, tick uint64, wasmHash [32]byte, epoch authority.Epoch, leaseExpiry int64, state []byte, err error) {
if len(checkpoint) < 1 {
return 0, 0, 0, [32]byte{}, authority.Epoch{}, 0, nil, fmt.Errorf("checkpoint empty")
}
if checkpoint[0] != checkpointVersion {
return 0, 0, 0, [32]byte{}, nil, fmt.Errorf("unsupported checkpoint version: %d", checkpoint[0])

var headerLen int
switch checkpoint[0] {
case checkpointVersionV2:
headerLen = checkpointHeaderLenV2
case checkpointVersionV3:
headerLen = checkpointHeaderLenV3
default:
return 0, 0, 0, [32]byte{}, authority.Epoch{}, 0, nil, fmt.Errorf("unsupported checkpoint version: %d", checkpoint[0])
}

if len(checkpoint) < headerLen {
return 0, 0, 0, [32]byte{}, authority.Epoch{}, 0, nil, fmt.Errorf("checkpoint too short: %d bytes (need %d)", len(checkpoint), headerLen)
}

budgetParsed := int64(binary.LittleEndian.Uint64(checkpoint[1:9]))
priceParsed := int64(binary.LittleEndian.Uint64(checkpoint[9:17]))
if budgetParsed < 0 {
return 0, 0, 0, [32]byte{}, nil, fmt.Errorf("checkpoint contains negative budget: %d", budgetParsed)
return 0, 0, 0, [32]byte{}, authority.Epoch{}, 0, nil, fmt.Errorf("checkpoint contains negative budget: %d", budgetParsed)
}
if priceParsed < 0 {
return 0, 0, 0, [32]byte{}, nil, fmt.Errorf("checkpoint contains negative price: %d", priceParsed)
return 0, 0, 0, [32]byte{}, authority.Epoch{}, 0, nil, fmt.Errorf("checkpoint contains negative price: %d", priceParsed)
}

var hash [32]byte
copy(hash[:], checkpoint[25:57])

var parsedEpoch authority.Epoch
var parsedLeaseExpiry int64
if checkpoint[0] == checkpointVersionV3 {
parsedEpoch.MajorVersion = binary.LittleEndian.Uint64(checkpoint[57:65])
parsedEpoch.LeaseGeneration = binary.LittleEndian.Uint64(checkpoint[65:73])
parsedLeaseExpiry = int64(binary.LittleEndian.Uint64(checkpoint[73:81]))
}

return budgetParsed,
priceParsed,
binary.LittleEndian.Uint64(checkpoint[17:25]),
hash,
checkpoint[checkpointHeaderLen:],
parsedEpoch,
parsedLeaseExpiry,
checkpoint[headerLen:],
nil
}

// ExtractAgentState extracts the agent state portion from a checkpoint.
func ExtractAgentState(checkpoint []byte) ([]byte, error) {
_, _, _, _, state, err := ParseCheckpointHeader(checkpoint)
_, _, _, _, _, _, state, err := ParseCheckpointHeader(checkpoint)
return state, err
}

Expand Down
72 changes: 61 additions & 11 deletions internal/agent/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,12 +368,12 @@ func TestCheckpointAndResume(t *testing.T) {
t.Fatalf("LoadCheckpoint: %v", err)
}

// Verify checkpoint format: [version:1][budget:8][price:8][tick:8][wasmHash:32][state:N]
if len(rawCheckpoint) < 57 {
// Verify checkpoint format v0x03: [version:1][budget:8][price:8][tick:8][wasmHash:32][majorVer:8][leaseGen:8][leaseExpiry:8][state:N]
if len(rawCheckpoint) < 81 {
t.Fatalf("checkpoint too short: %d bytes", len(rawCheckpoint))
}
if rawCheckpoint[0] != 0x02 {
t.Fatalf("checkpoint version: got %d, want 2", rawCheckpoint[0])
if rawCheckpoint[0] != 0x03 {
t.Fatalf("checkpoint version: got %d, want 3", rawCheckpoint[0])
}

storedBudget := int64(binary.LittleEndian.Uint64(rawCheckpoint[1:9]))
Expand All @@ -397,8 +397,8 @@ func TestCheckpointAndResume(t *testing.T) {
t.Error("stored WASM hash should not be zero")
}

// State portion should be 8 bytes (uint64 counter)
state := rawCheckpoint[57:]
// State portion should be 8 bytes (uint64 counter) — starts at offset 81 for v0x03
state := rawCheckpoint[81:]
if len(state) != 8 {
t.Errorf("state size: got %d, want 8", len(state))
}
Expand Down Expand Up @@ -559,7 +559,7 @@ func TestParseCheckpointHeader_Golden(t *testing.T) {
t.Fatalf("read golden fixture: %v", err)
}

budgetVal, price, tick, wasmHash, state, err := ParseCheckpointHeader(data)
budgetVal, price, tick, wasmHash, epoch, _, state, err := ParseCheckpointHeader(data)
if err != nil {
t.Fatalf("ParseCheckpointHeader: %v", err)
}
Expand All @@ -579,6 +579,11 @@ func TestParseCheckpointHeader_Golden(t *testing.T) {
t.Errorf("wasmHash mismatch")
}

// v0x02 fixture should return zero epoch
if epoch.MajorVersion != 0 || epoch.LeaseGeneration != 0 {
t.Errorf("v0x02 epoch: got %s, want (0,0)", epoch)
}

if len(state) != 8 {
t.Fatalf("state length: got %d, want 8", len(state))
}
Expand All @@ -588,13 +593,58 @@ func TestParseCheckpointHeader_Golden(t *testing.T) {
}
}

// TestParseCheckpointHeader_V3Golden parses the checked-in v0x03 golden
// fixture and checks each decoded header field (budget, price, tick, WASM
// hash, lease epoch and lease expiry) followed by the trailing agent state.
func TestParseCheckpointHeader_V3Golden(t *testing.T) {
	raw, readErr := os.ReadFile("testdata/checkpoint_v3.bin")
	if readErr != nil {
		t.Fatalf("read golden fixture: %v", readErr)
	}

	gotBudget, gotPrice, gotTick, gotHash, gotEpoch, gotExpiry, agentState, parseErr := ParseCheckpointHeader(raw)
	if parseErr != nil {
		t.Fatalf("ParseCheckpointHeader: %v", parseErr)
	}

	// Economic and progress fields.
	if gotBudget != 2000000 {
		t.Errorf("budget: got %d, want 2000000", gotBudget)
	}
	if gotPrice != 1500 {
		t.Errorf("price: got %d, want 1500", gotPrice)
	}
	if gotTick != 10 {
		t.Errorf("tick: got %d, want 10", gotTick)
	}

	// The stored hash is expected to equal the SHA-256 of the placeholder
	// string below.
	if wantHash := sha256.Sum256([]byte("known-wasm-binary-for-golden-test")); gotHash != wantHash {
		t.Errorf("wasmHash mismatch")
	}

	// Lease metadata carried by the v0x03 header.
	if major := gotEpoch.MajorVersion; major != 3 {
		t.Errorf("majorVersion: got %d, want 3", major)
	}
	if generation := gotEpoch.LeaseGeneration; generation != 7 {
		t.Errorf("leaseGeneration: got %d, want 7", generation)
	}
	if gotExpiry != 1700000000000000000 {
		t.Errorf("leaseExpiry: got %d, want 1700000000000000000", gotExpiry)
	}

	// The state payload must be exactly one little-endian uint64 counter;
	// abort before decoding if the length is wrong.
	if n := len(agentState); n != 8 {
		t.Fatalf("state length: got %d, want 8", n)
	}
	if counter := binary.LittleEndian.Uint64(agentState); counter != 5 {
		t.Errorf("counter: got %d, want 5", counter)
	}
}

func TestParseCheckpointHeader_EmptyState(t *testing.T) {
data, err := os.ReadFile("testdata/checkpoint_empty_state.bin")
if err != nil {
t.Fatalf("read golden fixture: %v", err)
}

budgetVal, price, tick, _, state, err := ParseCheckpointHeader(data)
budgetVal, price, tick, _, _, _, state, err := ParseCheckpointHeader(data)
if err != nil {
t.Fatalf("ParseCheckpointHeader: %v", err)
}
Expand All @@ -621,7 +671,7 @@ func TestParseCheckpointHeader_NegativeBudget(t *testing.T) {
binary.LittleEndian.PutUint64(checkpoint[1:9], uint64(negBudget))
binary.LittleEndian.PutUint64(checkpoint[9:17], 1000)

_, _, _, _, _, err := ParseCheckpointHeader(checkpoint)
_, _, _, _, _, _, _, err := ParseCheckpointHeader(checkpoint)
if err == nil {
t.Error("expected error for negative budget in checkpoint")
}
Expand All @@ -634,7 +684,7 @@ func TestParseCheckpointHeader_NegativePrice(t *testing.T) {
negPrice := int64(-500)
binary.LittleEndian.PutUint64(checkpoint[9:17], uint64(negPrice))

_, _, _, _, _, err := ParseCheckpointHeader(checkpoint)
_, _, _, _, _, _, _, err := ParseCheckpointHeader(checkpoint)
if err == nil {
t.Error("expected error for negative price in checkpoint")
}
Expand All @@ -658,7 +708,7 @@ func TestParseCheckpointHeader_Corruption(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, _, _, _, _, err := ParseCheckpointHeader(tt.input)
_, _, _, _, _, _, _, err := ParseCheckpointHeader(tt.input)
if err == nil {
t.Error("expected error for corrupted checkpoint")
}
Expand Down
Binary file added internal/agent/testdata/checkpoint_v3.bin
Binary file not shown.
14 changes: 14 additions & 0 deletions internal/agent/testdata/gen_fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,18 @@ func main() {
binary.LittleEndian.PutUint64(empty[17:25], 0)
copy(empty[25:57], hash[:])
os.WriteFile("checkpoint_empty_state.bin", empty, 0644)

// checkpoint_v3.bin: 81-byte v0x03 header + 8-byte counter state
// budget=2000000, price=1500, tick=10, epoch=(3,7), leaseExpiry=1700000000000000000, state=counter(5)
v3 := make([]byte, 81+8)
v3[0] = 0x03
binary.LittleEndian.PutUint64(v3[1:9], 2000000)
binary.LittleEndian.PutUint64(v3[9:17], 1500)
binary.LittleEndian.PutUint64(v3[17:25], 10)
copy(v3[25:57], hash[:])
binary.LittleEndian.PutUint64(v3[57:65], 3) // majorVersion
binary.LittleEndian.PutUint64(v3[65:73], 7) // leaseGeneration
binary.LittleEndian.PutUint64(v3[73:81], 1700000000000000000) // leaseExpiry (unix nanos)
binary.LittleEndian.PutUint64(v3[81:89], 5) // counter=5
os.WriteFile("checkpoint_v3.bin", v3, 0644)
}
Loading