diff --git a/agents/example/agent.manifest.json b/agents/example/agent.manifest.json index 14550d6..4dba462 100644 --- a/agents/example/agent.manifest.json +++ b/agents/example/agent.manifest.json @@ -2,7 +2,8 @@ "capabilities": { "clock": { "version": 1 }, "rand": { "version": 1 }, - "log": { "version": 1 } + "log": { "version": 1 }, + "pricing": { "version": 1 } }, "resource_limits": { "max_memory_bytes": 67108864 diff --git a/cmd/igord/main.go b/cmd/igord/main.go index 5dcab44..08fd8cf 100644 --- a/cmd/igord/main.go +++ b/cmd/igord/main.go @@ -16,9 +16,11 @@ import ( "github.com/simonovic86/igor/internal/logging" "github.com/simonovic86/igor/internal/migration" "github.com/simonovic86/igor/internal/p2p" + "github.com/simonovic86/igor/internal/pricing" "github.com/simonovic86/igor/internal/replay" "github.com/simonovic86/igor/internal/runner" "github.com/simonovic86/igor/internal/runtime" + "github.com/simonovic86/igor/internal/settlement" "github.com/simonovic86/igor/internal/simulator" "github.com/simonovic86/igor/internal/storage" "github.com/simonovic86/igor/pkg/budget" @@ -119,6 +121,9 @@ func main() { // Initialize migration service migrationSvc := migration.NewService(node.Host, engine, storageProvider, cfg.ReplayMode, cfg.ReplayCostLog, cfg.PricePerSecond, logger) + // Initialize pricing service for inter-node price discovery + _ = pricing.NewService(node.Host, cfg.PricePerSecond, logger) + // If --migrate-agent flag is provided, perform migration if *migrateAgent != "" { if *targetPeer == "" { @@ -203,6 +208,9 @@ func runLocalAgent( // Configure replay window size instance.SetReplayWindowSize(cfg.ReplayWindowSize) + // Wire budget adapter for settlement validation (EI-6) + instance.BudgetAdapter = settlement.NewMockAdapter(logger) + // Register agent with migration service migrationSvc.RegisterAgent("local-agent", instance) diff --git a/internal/agent/instance.go b/internal/agent/instance.go index 4d9f529..5502717 100644 --- a/internal/agent/instance.go +++ b/internal/agent/instance.go @@ -16,6 +16,7 @@ import ( "github.com/simonovic86/igor/internal/eventlog" "github.com/simonovic86/igor/internal/hostcall" "github.com/simonovic86/igor/internal/runtime" + "github.com/simonovic86/igor/internal/settlement" "github.com/simonovic86/igor/internal/storage" "github.com/simonovic86/igor/internal/wasmutil" "github.com/simonovic86/igor/pkg/budget" @@ -78,11 +79,12 @@ type Instance struct { replayWindowMax int // Maximum snapshots retained (0 = use DefaultReplayWindowSize) // Receipt tracking (Phase 4: Economics) - Receipts []receipt.Receipt // Accumulated payment receipts - lastReceiptTick uint64 // Tick number of the last receipt's epoch end - epochCost int64 // Accumulated cost since last receipt - signingKey ed25519.PrivateKey // Node's signing key; nil = receipts disabled - nodeID string // Node's peer ID string + Receipts []receipt.Receipt // Accumulated payment receipts + lastReceiptTick uint64 // Tick number of the last receipt's epoch end + epochCost int64 // Accumulated cost since last receipt + signingKey ed25519.PrivateKey // Node's signing key; nil = receipts disabled + nodeID string // Node's peer ID string + BudgetAdapter settlement.BudgetAdapter // optional; nil = no external budget validation } // walletStateRef is an indirection layer that lets wallet hostcall closures @@ -100,6 +102,14 @@ func (w *walletStateRef) GetReceiptBytes(index int) ([]byte, error) { return w.instance.GetReceiptBytes(index) } +// pricingStateRef is an indirection layer that lets pricing hostcall closures +// reference the Instance before it is fully constructed. Same pattern as walletStateRef. +type pricingStateRef struct { + instance *Instance +} + +func (p *pricingStateRef) GetNodePrice() int64 { return p.instance.PricePerSecond } + // GetBudget returns the current budget (implements hostcall.WalletState). func (i *Instance) GetBudget() int64 { return i.Budget @@ -229,9 +239,11 @@ func loadAgent( // The wallet hostcall closures capture this ref; it is only dereferenced during // agent_tick (not during loading), so the nil instance is safe at this point. wsRef := &walletStateRef{} + psRef := &pricingStateRef{} registry := hostcall.NewRegistry(logger, el) registry.SetWalletState(wsRef) + registry.SetPricingState(psRef) if err := registry.RegisterHostModule(ctx, engine.Runtime(), capManifest); err != nil { return nil, fmt.Errorf("failed to register host module: %w", err) } @@ -273,9 +285,10 @@ func loadAgent( nodeID: nodeID, } - // Now that the instance exists, wire the wallet state ref so wallet - // hostcalls can access budget and receipts during agent_tick. + // Now that the instance exists, wire state refs so hostcall closures + // can access budget, receipts, and pricing during agent_tick. wsRef.instance = instance + psRef.instance = instance if err := instance.verifyExports(); err != nil { return nil, fmt.Errorf("agent lifecycle validation failed: %w", err) @@ -325,6 +338,13 @@ func (i *Instance) Tick(ctx context.Context) (bool, error) { return false, fmt.Errorf("budget exhausted: %s", budget.Format(i.Budget)) } + // Budget adapter validation (EI-6: Safety Over Liveness) + if i.BudgetAdapter != nil { + if err := i.BudgetAdapter.ValidateBudget(ctx, i.AgentID, i.Budget); err != nil { + return false, fmt.Errorf("budget validation failed: %w", err) + } + } + // Capture pre-tick state for replay verification preState, err := i.captureState(ctx) if err != nil { @@ -566,6 +586,13 @@ func (i *Instance) SaveCheckpointToStorage(ctx context.Context) error { if err := i.Storage.SaveReceipts(ctx, i.AgentID, data); err != nil { i.logger.Error("Failed to save receipts", "error", err) } + // Record latest receipt with budget adapter for settlement (non-fatal). + if i.BudgetAdapter != nil { + latestReceipt := i.Receipts[len(i.Receipts)-1] + if err := i.BudgetAdapter.RecordSettlement(ctx, latestReceipt); err != nil { + i.logger.Error("Failed to record settlement", "error", err) + } + } } return nil diff --git a/internal/eventlog/eventlog.go b/internal/eventlog/eventlog.go index fd4db20..66d333e 100644 --- a/internal/eventlog/eventlog.go +++ b/internal/eventlog/eventlog.go @@ -15,6 +15,7 @@ const ( WalletBalance HostcallID = 4 WalletReceiptCount HostcallID = 5 WalletReceipt HostcallID = 6 + NodePrice HostcallID = 7 ) // Entry is a single observation recorded during a tick. diff --git a/internal/hostcall/pricing.go b/internal/hostcall/pricing.go new file mode 100644 index 0000000..a23ea38 --- /dev/null +++ b/internal/hostcall/pricing.go @@ -0,0 +1,27 @@ +package hostcall + +import ( + "context" + "encoding/binary" + + "github.com/simonovic86/igor/internal/eventlog" + "github.com/tetratelabs/wazero" +) + +// PricingState provides pricing hostcalls with access to node price configuration. +type PricingState interface { + GetNodePrice() int64 // price per second in microcents +} + +// registerPricing adds node_price to the host module builder. +// node_price is an observation hostcall recorded in the event log (CM-4). +func (r *Registry) registerPricing(builder wazero.HostModuleBuilder, ps PricingState) { + builder.NewFunctionBuilder(). + WithFunc(func(_ context.Context) int64 { + price := ps.GetNodePrice() + payload := binary.LittleEndian.AppendUint64(nil, uint64(price)) + r.eventLog.Record(eventlog.NodePrice, payload) + return price + }). + Export("node_price") +} diff --git a/internal/hostcall/registry.go b/internal/hostcall/registry.go index 005cb40..5472eab 100644 --- a/internal/hostcall/registry.go +++ b/internal/hostcall/registry.go @@ -16,9 +16,10 @@ import ( // Registry builds and manages the igor host module for a single agent. type Registry struct { - logger *slog.Logger - eventLog *eventlog.EventLog - walletState WalletState // optional; nil = wallet hostcalls not available + logger *slog.Logger + eventLog *eventlog.EventLog + walletState WalletState // optional; nil = wallet hostcalls not available + pricingState PricingState // optional; nil = pricing hostcalls not available } // NewRegistry creates a hostcall registry bound to the given event log. @@ -35,6 +36,12 @@ func (r *Registry) SetWalletState(ws WalletState) { r.walletState = ws } +// SetPricingState installs the pricing state provider for pricing hostcalls. +// Must be called before RegisterHostModule if the agent declares "pricing" capability. +func (r *Registry) SetPricingState(ps PricingState) { + r.pricingState = ps +} + // RegisterHostModule builds and instantiates the "igor" WASM host module // with only the capabilities declared in the manifest. // Must be called after WASI instantiation and before agent module instantiation. @@ -76,6 +83,11 @@ func (r *Registry) RegisterHostModule( registered++ } + if m.Has("pricing") && r.pricingState != nil { + r.registerPricing(builder, r.pricingState) + registered++ + } + // Only instantiate if at least one capability was registered. // If the agent has an empty manifest, skip module creation entirely. // If the agent's WASM imports from "igor", instantiation will fail diff --git a/internal/pricing/service.go b/internal/pricing/service.go new file mode 100644 index 0000000..9995afc --- /dev/null +++ b/internal/pricing/service.go @@ -0,0 +1,133 @@ +// Package pricing implements the /igor/price/1.0.0 protocol for inter-node +// price discovery. Nodes respond to price queries with their current execution +// pricing, enabling agents to make cost-aware migration decisions. +package pricing + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/multiformats/go-multiaddr" +) + +// PriceProtocol is the Igor price query protocol identifier. +const PriceProtocol protocol.ID = "/igor/price/1.0.0" + +// PriceRequest is sent by the querying node. +type PriceRequest struct { + // AgentID is optional: future use for agent-specific pricing. + AgentID string `json:"agent_id,omitempty"` +} + +// PriceResponse is returned by the responding node. +type PriceResponse struct { + PricePerSecond int64 `json:"price_per_second"` // microcents/sec + NodeID string `json:"node_id"` // peer ID of responding node +} + +// Service handles price advertisement and queries over libp2p streams. +type Service struct { + host host.Host + pricePerSecond int64 + logger *slog.Logger +} + +// NewService creates and registers the pricing service on the given host. +func NewService(h host.Host, pricePerSecond int64, logger *slog.Logger) *Service { + svc := &Service{ + host: h, + pricePerSecond: pricePerSecond, + logger: logger, + } + h.SetStreamHandler(PriceProtocol, svc.handlePriceQuery) + logger.Info("Pricing service initialized", + "price_per_second", pricePerSecond, + ) + return svc +} + +// handlePriceQuery responds to incoming price queries. +func (s *Service) handlePriceQuery(stream network.Stream) { + defer stream.Close() + + if err := stream.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil { + s.logger.Error("Failed to set price query read deadline", "error", err) + return + } + + remotePeer := stream.Conn().RemotePeer() + + var req PriceRequest + if err := json.NewDecoder(stream).Decode(&req); err != nil { + s.logger.Error("Failed to decode price request", + "from_peer", remotePeer.String(), + "error", err, + ) + return + } + + resp := PriceResponse{ + PricePerSecond: s.pricePerSecond, + NodeID: s.host.ID().String(), + } + + if err := json.NewEncoder(stream).Encode(resp); err != nil { + s.logger.Error("Failed to encode price response", + "to_peer", remotePeer.String(), + "error", err, + ) + return + } + + s.logger.Info("Served price query", + "from_peer", remotePeer.String(), + "price", s.pricePerSecond, + ) +} + +// QueryPeerPrice queries a remote peer's execution price. +func (s *Service) QueryPeerPrice(ctx context.Context, peerAddr string) (*PriceResponse, error) { + maddr, err := multiaddr.NewMultiaddr(peerAddr) + if err != nil { + return nil, fmt.Errorf("invalid peer address: %w", err) + } + + addrInfo, err := peer.AddrInfoFromP2pAddr(maddr) + if err != nil { + return nil, fmt.Errorf("failed to extract peer info: %w", err) + } + + if err := s.host.Connect(ctx, *addrInfo); err != nil { + return nil, fmt.Errorf("failed to connect to peer: %w", err) + } + + stream, err := s.host.NewStream(ctx, addrInfo.ID, PriceProtocol) + if err != nil { + return nil, fmt.Errorf("failed to open price stream: %w", err) + } + defer stream.Close() + + req := PriceRequest{} + if err := json.NewEncoder(stream).Encode(req); err != nil { + return nil, fmt.Errorf("failed to send price request: %w", err) + } + + var resp PriceResponse + if err := json.NewDecoder(stream).Decode(&resp); err != nil { + return nil, fmt.Errorf("failed to read price response: %w", err) + } + + s.logger.Info("Received peer price", + "peer_id", addrInfo.ID.String(), + "price_per_second", resp.PricePerSecond, + ) + + return &resp, nil +} diff --git a/internal/pricing/service_test.go b/internal/pricing/service_test.go new file mode 100644 index 0000000..47a1dd2 --- /dev/null +++ b/internal/pricing/service_test.go @@ -0,0 +1,93 @@ +package pricing + +import ( + "context" + "fmt" + "log/slog" + "os" + "testing" + "time" + + "github.com/libp2p/go-libp2p" +) + +func testLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn})) +} + +func TestPriceQuery_RoundTrip(t *testing.T) { + ctx := context.Background() + + // Create two libp2p hosts + hostA, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) + if err != nil { + t.Fatalf("create host A: %v", err) + } + defer hostA.Close() + + hostB, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) + if err != nil { + t.Fatalf("create host B: %v", err) + } + defer hostB.Close() + + // Register pricing service on host B with price 5000 microcents/sec + _ = NewService(hostB, 5000, testLogger()) + + // Create pricing service on host A to query B + svcA := NewService(hostA, 1000, testLogger()) + + // Build multiaddr for host B + addrs := hostB.Addrs() + if len(addrs) == 0 { + t.Fatal("host B has no addresses") + } + peerAddr := fmt.Sprintf("%s/p2p/%s", addrs[0].String(), hostB.ID().String()) + + // Query B's price from A + resp, err := svcA.QueryPeerPrice(ctx, peerAddr) + if err != nil { + t.Fatalf("QueryPeerPrice: %v", err) + } + + if resp.PricePerSecond != 5000 { + t.Errorf("expected price 5000, got %d", resp.PricePerSecond) + } + if resp.NodeID != hostB.ID().String() { + t.Errorf("expected node ID %s, got %s", hostB.ID().String(), resp.NodeID) + } +} + +func TestPriceQuery_Timeout(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + hostA, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) + if err != nil { + t.Fatalf("create host A: %v", err) + } + defer hostA.Close() + + svcA := NewService(hostA, 1000, testLogger()) + + // Query an unreachable peer — should fail + _, err = svcA.QueryPeerPrice(ctx, "/ip4/127.0.0.1/tcp/59999/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN") + if err == nil { + t.Fatal("expected error for unreachable peer") + } +} + +func TestPriceQuery_InvalidAddress(t *testing.T) { + hostA, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) + if err != nil { + t.Fatalf("create host A: %v", err) + } + defer hostA.Close() + + svcA := NewService(hostA, 1000, testLogger()) + + _, err = svcA.QueryPeerPrice(context.Background(), "not-a-valid-multiaddr") + if err == nil { + t.Fatal("expected error for invalid address") + } +} diff --git a/internal/replay/engine.go b/internal/replay/engine.go index 7bd84c0..b15a17c 100644 --- a/internal/replay/engine.go +++ b/internal/replay/engine.go @@ -277,6 +277,11 @@ func registerReplayHostModule( registered++ } + if m.Has("pricing") { + registerReplayPricing(builder, iter, repErr) + registered++ + } + if registered == 0 { return nil } @@ -419,6 +424,30 @@ func registerReplayWallet( Export("wallet_receipt") } +// registerReplayPricing registers node_price that returns the recorded value. +func registerReplayPricing( + builder wazero.HostModuleBuilder, + iter *entryIterator, + repErr *replayError, +) { + builder.NewFunctionBuilder(). + WithFunc(func(_ context.Context) int64 { + entry, err := iter.next(eventlog.NodePrice) + if err != nil { + repErr.err = err + return 0 + } + if len(entry.Payload) != 8 { + repErr.err = fmt.Errorf( + "node_price payload length %d, expected 8", len(entry.Payload), + ) + return 0 + } + return int64(binary.LittleEndian.Uint64(entry.Payload)) + }). + Export("node_price") +} + // replayResume restores agent state in the replay module. func replayResume(ctx context.Context, mod api.Module, state []byte) error { return wasmutil.ResumeAgent(ctx, mod, state) @@ -664,6 +693,11 @@ func registerChainReplayHostModule( registered++ } + if m.Has("pricing") { + registerChainReplayPricing(builder, holder, repErr) + registered++ + } + if registered == 0 { return nil } @@ -732,6 +766,29 @@ func registerChainReplayWallet( Export("wallet_receipt") } +// registerChainReplayPricing registers node_price replay that reads from +// the iteratorHolder, allowing the iterator to be swapped between ticks. +func registerChainReplayPricing( + builder wazero.HostModuleBuilder, + holder *iteratorHolder, + repErr *replayError, +) { + builder.NewFunctionBuilder(). + WithFunc(func(_ context.Context) int64 { + entry, err := holder.iter.next(eventlog.NodePrice) + if err != nil { + repErr.err = err + return 0 + } + if len(entry.Payload) != 8 { + repErr.err = fmt.Errorf("node_price payload length %d, expected 8", len(entry.Payload)) + return 0 + } + return int64(binary.LittleEndian.Uint64(entry.Payload)) + }). + Export("node_price") +} + // firstDiff returns the index of the first differing byte, or -1 if equal. func firstDiff(a, b []byte) int { n := len(a) diff --git a/internal/settlement/adapter.go b/internal/settlement/adapter.go new file mode 100644 index 0000000..bbf5e1a --- /dev/null +++ b/internal/settlement/adapter.go @@ -0,0 +1,27 @@ +// Package settlement provides pluggable budget validation and settlement +// recording. The BudgetAdapter interface abstracts payment infrastructure +// so the runtime can gate tick execution on budget validity (EI-6) and +// record receipts for external settlement systems. +package settlement + +import ( + "context" + + "github.com/simonovic86/igor/pkg/receipt" +) + +// BudgetAdapter validates agent budgets and records settlement events. +// Implementations range from a no-op mock (testing) to an EVM L2 +// settlement adapter (future). The runtime calls ValidateBudget before +// each tick and RecordSettlement after each checkpoint receipt. +type BudgetAdapter interface { + // ValidateBudget checks whether the agent's budget is valid for execution. + // Returns nil if valid, an error otherwise. Called before each tick to + // gate execution on budget validity per EI-6 (Safety Over Liveness). + ValidateBudget(ctx context.Context, agentID string, budget int64) error + + // RecordSettlement records a payment receipt for audit and settlement. + // Called after each checkpoint epoch when a new receipt is created. + // Errors are non-fatal: the runtime logs them but does not halt execution. + RecordSettlement(ctx context.Context, r receipt.Receipt) error +} diff --git a/internal/settlement/adapter_test.go b/internal/settlement/adapter_test.go new file mode 100644 index 0000000..fd798af --- /dev/null +++ b/internal/settlement/adapter_test.go @@ -0,0 +1,94 @@ +package settlement + +import ( + "context" + "log/slog" + "sync" + "testing" + + "github.com/simonovic86/igor/pkg/receipt" +) + +func TestMockAdapter_ValidateBudget_AlwaysNil(t *testing.T) { + adapter := NewMockAdapter(slog.Default()) + + // Should always return nil regardless of inputs. + if err := adapter.ValidateBudget(context.Background(), "agent-1", 0); err != nil { + t.Errorf("expected nil, got %v", err) + } + if err := adapter.ValidateBudget(context.Background(), "agent-2", -100); err != nil { + t.Errorf("expected nil for negative budget, got %v", err) + } + if err := adapter.ValidateBudget(context.Background(), "", 1_000_000); err != nil { + t.Errorf("expected nil for empty agent ID, got %v", err) + } +} + +func TestMockAdapter_RecordSettlement_Accumulates(t *testing.T) { + adapter := NewMockAdapter(slog.Default()) + ctx := context.Background() + + receipts := []receipt.Receipt{ + {AgentID: "a1", NodeID: "n1", EpochStart: 1, EpochEnd: 5, CostMicrocents: 100}, + {AgentID: "a1", NodeID: "n1", EpochStart: 6, EpochEnd: 10, CostMicrocents: 200}, + {AgentID: "a2", NodeID: "n2", EpochStart: 1, EpochEnd: 3, CostMicrocents: 50}, + } + + for _, r := range receipts { + if err := adapter.RecordSettlement(ctx, r); err != nil { + t.Fatalf("RecordSettlement failed: %v", err) + } + } + + got := adapter.Settlements() + if len(got) != 3 { + t.Fatalf("expected 3 settlements, got %d", len(got)) + } + for i, r := range got { + if r.AgentID != receipts[i].AgentID || r.CostMicrocents != receipts[i].CostMicrocents { + t.Errorf("settlement %d mismatch: got %+v, want %+v", i, r, receipts[i]) + } + } +} + +func TestMockAdapter_Settlements_ReturnsCopy(t *testing.T) { + adapter := NewMockAdapter(slog.Default()) + ctx := context.Background() + + _ = adapter.RecordSettlement(ctx, receipt.Receipt{AgentID: "a1", CostMicrocents: 100}) + + s1 := adapter.Settlements() + s1[0].CostMicrocents = 999 + + s2 := adapter.Settlements() + if s2[0].CostMicrocents != 100 { + t.Error("Settlements() did not return a copy; mutation leaked") + } +} + +func TestMockAdapter_ConcurrentSafe(t *testing.T) { + adapter := NewMockAdapter(slog.Default()) + ctx := context.Background() + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(2) + go func() { + defer wg.Done() + _ = adapter.ValidateBudget(ctx, "agent", 1000) + }() + go func() { + defer wg.Done() + _ = adapter.RecordSettlement(ctx, receipt.Receipt{AgentID: "agent"}) + }() + } + wg.Wait() + + got := adapter.Settlements() + if len(got) != 100 { + t.Errorf("expected 100 settlements, got %d", len(got)) + } +} + +// Compile-time interface check. +var _ BudgetAdapter = (*MockAdapter)(nil) diff --git a/internal/settlement/mock.go b/internal/settlement/mock.go new file mode 100644 index 0000000..33bb522 --- /dev/null +++ b/internal/settlement/mock.go @@ -0,0 +1,50 @@ +package settlement + +import ( + "context" + "fmt" + "log/slog" + "sync" + + "github.com/simonovic86/igor/pkg/receipt" +) + +// MockAdapter always validates budgets as valid and records settlements +// in memory. Used for testing and development without external payment rails. +type MockAdapter struct { + mu sync.Mutex + settlements []receipt.Receipt + logger *slog.Logger +} + +// NewMockAdapter creates a mock settlement adapter. +func NewMockAdapter(logger *slog.Logger) *MockAdapter { + return &MockAdapter{logger: logger} +} + +// ValidateBudget always returns nil (budget always valid in mock mode). +func (m *MockAdapter) ValidateBudget(_ context.Context, _ string, _ int64) error { + return nil +} + +// RecordSettlement stores the receipt in memory for inspection. +func (m *MockAdapter) RecordSettlement(_ context.Context, r receipt.Receipt) error { + m.mu.Lock() + defer m.mu.Unlock() + m.settlements = append(m.settlements, r) + m.logger.Info("Settlement recorded", + "agent_id", r.AgentID, + "cost", r.CostMicrocents, + "epoch", fmt.Sprintf("%d-%d", r.EpochStart, r.EpochEnd), + ) + return nil +} + +// Settlements returns a copy of all recorded settlements. +func (m *MockAdapter) Settlements() []receipt.Receipt { + m.mu.Lock() + defer m.mu.Unlock() + out := make([]receipt.Receipt, len(m.settlements)) + copy(out, m.settlements) + return out +} diff --git a/internal/simulator/hostcalls.go b/internal/simulator/hostcalls.go index 26b3054..652ca4c 100644 --- a/internal/simulator/hostcalls.go +++ b/internal/simulator/hostcalls.go @@ -23,11 +23,13 @@ type deterministicHostcalls struct { eventLog *eventlog.EventLog logger *slog.Logger budget int64 + nodePrice int64 } func newDeterministicHostcalls( clockStart, clockDelta int64, randSeed uint64, + nodePrice int64, el *eventlog.EventLog, logger *slog.Logger, ) *deterministicHostcalls { @@ -37,6 +39,7 @@ func newDeterministicHostcalls( randSrc: rand.New(rand.NewPCG(randSeed, randSeed)), eventLog: el, logger: logger, + nodePrice: nodePrice, } } @@ -68,6 +71,11 @@ func (d *deterministicHostcalls) registerHostModule( registered++ } + if m.Has("pricing") { + d.registerPricing(builder) + registered++ + } + if registered == 0 { return nil } @@ -157,3 +165,17 @@ func (d *deterministicHostcalls) registerWallet(builder wazero.HostModuleBuilder }). Export("wallet_receipt") } + +func (d *deterministicHostcalls) registerPricing(builder wazero.HostModuleBuilder) { + // node_price() -> i64 + builder.NewFunctionBuilder(). + WithFunc(func(_ context.Context) int64 { + d.mu.Lock() + price := d.nodePrice + d.mu.Unlock() + payload := binary.LittleEndian.AppendUint64(nil, uint64(price)) + d.eventLog.Record(eventlog.NodePrice, payload) + return price + }). + Export("node_price") +} diff --git a/internal/simulator/simulator.go b/internal/simulator/simulator.go index e1d55a2..903cb5c 100644 --- a/internal/simulator/simulator.go +++ b/internal/simulator/simulator.go @@ -194,7 +194,7 @@ func registerHostcalls( if clockDelta == 0 { clockDelta = 1_000_000_000 } - dhc := newDeterministicHostcalls(clockStart, clockDelta, cfg.RandSeed, el, logger) + dhc := newDeterministicHostcalls(clockStart, clockDelta, cfg.RandSeed, budget.FromFloat(cfg.PricePerSecond), el, logger) if err := dhc.registerHostModule(ctx, rt, capManifest); err != nil { return fmt.Errorf("register deterministic hostcalls: %w", err) } diff --git a/pkg/manifest/parse.go b/pkg/manifest/parse.go index a6bd1f0..d7bd5b7 100644 --- a/pkg/manifest/parse.go +++ b/pkg/manifest/parse.go @@ -10,7 +10,7 @@ import ( ) // NodeCapabilities lists capabilities available on the current node. -var NodeCapabilities = []string{"clock", "rand", "log", "wallet"} +var NodeCapabilities = []string{"clock", "rand", "log", "wallet", "pricing"} // ParseCapabilityManifest parses a capability manifest from JSON bytes. // An empty or nil input returns an empty manifest (no capabilities declared). diff --git a/sdk/igor/hostcalls_pricing_wasm.go b/sdk/igor/hostcalls_pricing_wasm.go new file mode 100644 index 0000000..ccf9e1a --- /dev/null +++ b/sdk/igor/hostcalls_pricing_wasm.go @@ -0,0 +1,14 @@ +//go:build tinygo || wasip1 + +package igor + +// Raw WASM import for pricing hostcall from the igor host module. + +//go:wasmimport igor node_price +func nodePrice() int64 + +// NodePrice returns the current node's execution price in microcents per second. +// Requires the "pricing" capability in the agent manifest. +func NodePrice() int64 { + return nodePrice() +} diff --git a/sdk/igor/hostcalls_wrappers_stub.go b/sdk/igor/hostcalls_wrappers_stub.go index f2f29be..bba1755 100644 --- a/sdk/igor/hostcalls_wrappers_stub.go +++ b/sdk/igor/hostcalls_wrappers_stub.go @@ -70,3 +70,12 @@ func WalletReceipt(index int) ([]byte, error) { } panic("igor: WalletReceipt requires WASM runtime or mock (see sdk/igor/mock)") } + +// NodePrice returns the current node's execution price in microcents per second. +// In non-WASM builds, dispatches to the registered MockBackend. +func NodePrice() int64 { + if activeMock != nil { + return activeMock.NodePrice() + } + panic("igor: NodePrice requires WASM runtime or mock (see sdk/igor/mock)") +} diff --git a/sdk/igor/mock/mock.go b/sdk/igor/mock/mock.go index 6f6e1d4..3c3ea61 100644 --- a/sdk/igor/mock/mock.go +++ b/sdk/igor/mock/mock.go @@ -28,12 +28,13 @@ import ( // Runtime provides mock implementations of Igor hostcalls for native testing. type Runtime struct { - mu sync.Mutex - clock func() int64 - randSrc *rand.Rand - logs []string - budget int64 - receipts [][]byte + mu sync.Mutex + clock func() int64 + randSrc *rand.Rand + logs []string + budget int64 + receipts [][]byte + nodePrice int64 } // New creates a mock runtime using the real system clock and crypto-seeded rand. @@ -167,3 +168,17 @@ func (r *Runtime) AddReceipt(data []byte) { defer r.mu.Unlock() r.receipts = append(r.receipts, data) } + +// NodePrice implements MockBackend. +func (r *Runtime) NodePrice() int64 { + r.mu.Lock() + defer r.mu.Unlock() + return r.nodePrice +} + +// SetNodePrice sets the mock node price returned by NodePrice. +func (r *Runtime) SetNodePrice(p int64) { + r.mu.Lock() + defer r.mu.Unlock() + r.nodePrice = p +} diff --git a/sdk/igor/mock_backend.go b/sdk/igor/mock_backend.go index 6408671..279ebd3 100644 --- a/sdk/igor/mock_backend.go +++ b/sdk/igor/mock_backend.go @@ -9,6 +9,7 @@ type MockBackend interface { WalletBalance() int64 WalletReceiptCount() int WalletReceipt(index int) ([]byte, error) + NodePrice() int64 } // activeMock is set by mock.Enable() and cleared by mock.Disable().