Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion agents/example/agent.manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"capabilities": {
"clock": { "version": 1 },
"rand": { "version": 1 },
"log": { "version": 1 }
"log": { "version": 1 },
"pricing": { "version": 1 }
},
"resource_limits": {
"max_memory_bytes": 67108864
Expand Down
8 changes: 8 additions & 0 deletions cmd/igord/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import (
"github.com/simonovic86/igor/internal/logging"
"github.com/simonovic86/igor/internal/migration"
"github.com/simonovic86/igor/internal/p2p"
"github.com/simonovic86/igor/internal/pricing"
"github.com/simonovic86/igor/internal/replay"
"github.com/simonovic86/igor/internal/runner"
"github.com/simonovic86/igor/internal/runtime"
"github.com/simonovic86/igor/internal/settlement"
"github.com/simonovic86/igor/internal/simulator"
"github.com/simonovic86/igor/internal/storage"
"github.com/simonovic86/igor/pkg/budget"
Expand Down Expand Up @@ -119,6 +121,9 @@ func main() {
// Initialize migration service
migrationSvc := migration.NewService(node.Host, engine, storageProvider, cfg.ReplayMode, cfg.ReplayCostLog, cfg.PricePerSecond, logger)

// Initialize pricing service for inter-node price discovery
_ = pricing.NewService(node.Host, cfg.PricePerSecond, logger)

// If --migrate-agent flag is provided, perform migration
if *migrateAgent != "" {
if *targetPeer == "" {
Expand Down Expand Up @@ -203,6 +208,9 @@ func runLocalAgent(
// Configure replay window size
instance.SetReplayWindowSize(cfg.ReplayWindowSize)

// Wire budget adapter for settlement validation (EI-6)
instance.BudgetAdapter = settlement.NewMockAdapter(logger)

// Register agent with migration service
migrationSvc.RegisterAgent("local-agent", instance)

Expand Down
41 changes: 34 additions & 7 deletions internal/agent/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/simonovic86/igor/internal/eventlog"
"github.com/simonovic86/igor/internal/hostcall"
"github.com/simonovic86/igor/internal/runtime"
"github.com/simonovic86/igor/internal/settlement"
"github.com/simonovic86/igor/internal/storage"
"github.com/simonovic86/igor/internal/wasmutil"
"github.com/simonovic86/igor/pkg/budget"
Expand Down Expand Up @@ -78,11 +79,12 @@ type Instance struct {
replayWindowMax int // Maximum snapshots retained (0 = use DefaultReplayWindowSize)

// Receipt tracking (Phase 4: Economics)
Receipts []receipt.Receipt // Accumulated payment receipts
lastReceiptTick uint64 // Tick number of the last receipt's epoch end
epochCost int64 // Accumulated cost since last receipt
signingKey ed25519.PrivateKey // Node's signing key; nil = receipts disabled
nodeID string // Node's peer ID string
Receipts []receipt.Receipt // Accumulated payment receipts
lastReceiptTick uint64 // Tick number of the last receipt's epoch end
epochCost int64 // Accumulated cost since last receipt
signingKey ed25519.PrivateKey // Node's signing key; nil = receipts disabled
nodeID string // Node's peer ID string
BudgetAdapter settlement.BudgetAdapter // optional; nil = no external budget validation
}

// walletStateRef is an indirection layer that lets wallet hostcall closures
Expand All @@ -100,6 +102,14 @@ func (w *walletStateRef) GetReceiptBytes(index int) ([]byte, error) {
return w.instance.GetReceiptBytes(index)
}

// pricingStateRef is an indirection layer that lets pricing hostcall closures
// reference the Instance before it is fully constructed. Same pattern as walletStateRef.
type pricingStateRef struct {
instance *Instance
}

func (p *pricingStateRef) GetNodePrice() int64 { return p.instance.PricePerSecond }

// GetBudget returns the current budget (implements hostcall.WalletState).
func (i *Instance) GetBudget() int64 {
return i.Budget
Expand Down Expand Up @@ -229,9 +239,11 @@ func loadAgent(
// The wallet hostcall closures capture this ref; it is only dereferenced during
// agent_tick (not during loading), so the nil instance is safe at this point.
wsRef := &walletStateRef{}
psRef := &pricingStateRef{}

registry := hostcall.NewRegistry(logger, el)
registry.SetWalletState(wsRef)
registry.SetPricingState(psRef)
if err := registry.RegisterHostModule(ctx, engine.Runtime(), capManifest); err != nil {
return nil, fmt.Errorf("failed to register host module: %w", err)
}
Expand Down Expand Up @@ -273,9 +285,10 @@ func loadAgent(
nodeID: nodeID,
}

// Now that the instance exists, wire the wallet state ref so wallet
// hostcalls can access budget and receipts during agent_tick.
// Now that the instance exists, wire state refs so hostcall closures
// can access budget, receipts, and pricing during agent_tick.
wsRef.instance = instance
psRef.instance = instance

if err := instance.verifyExports(); err != nil {
return nil, fmt.Errorf("agent lifecycle validation failed: %w", err)
Expand Down Expand Up @@ -325,6 +338,13 @@ func (i *Instance) Tick(ctx context.Context) (bool, error) {
return false, fmt.Errorf("budget exhausted: %s", budget.Format(i.Budget))
}

// Budget adapter validation (EI-6: Safety Over Liveness)
if i.BudgetAdapter != nil {
if err := i.BudgetAdapter.ValidateBudget(ctx, i.AgentID, i.Budget); err != nil {
return false, fmt.Errorf("budget validation failed: %w", err)
}
}

// Capture pre-tick state for replay verification
preState, err := i.captureState(ctx)
if err != nil {
Expand Down Expand Up @@ -566,6 +586,13 @@ func (i *Instance) SaveCheckpointToStorage(ctx context.Context) error {
if err := i.Storage.SaveReceipts(ctx, i.AgentID, data); err != nil {
i.logger.Error("Failed to save receipts", "error", err)
}
// Record latest receipt with budget adapter for settlement (non-fatal).
if i.BudgetAdapter != nil {
latestReceipt := i.Receipts[len(i.Receipts)-1]
if err := i.BudgetAdapter.RecordSettlement(ctx, latestReceipt); err != nil {
i.logger.Error("Failed to record settlement", "error", err)
}
}
}

return nil
Expand Down
1 change: 1 addition & 0 deletions internal/eventlog/eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
WalletBalance HostcallID = 4
WalletReceiptCount HostcallID = 5
WalletReceipt HostcallID = 6
NodePrice HostcallID = 7
)

// Entry is a single observation recorded during a tick.
Expand Down
27 changes: 27 additions & 0 deletions internal/hostcall/pricing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package hostcall

import (
"context"
"encoding/binary"

"github.com/simonovic86/igor/internal/eventlog"
"github.com/tetratelabs/wazero"
)

// PricingState provides pricing hostcalls with access to node price configuration.
type PricingState interface {
GetNodePrice() int64 // price per second in microcents
}

// registerPricing adds node_price to the host module builder.
// node_price is an observation hostcall recorded in the event log (CM-4).
func (r *Registry) registerPricing(builder wazero.HostModuleBuilder, ps PricingState) {
builder.NewFunctionBuilder().
WithFunc(func(_ context.Context) int64 {
price := ps.GetNodePrice()
payload := binary.LittleEndian.AppendUint64(nil, uint64(price))
r.eventLog.Record(eventlog.NodePrice, payload)
return price
}).
Export("node_price")
}
18 changes: 15 additions & 3 deletions internal/hostcall/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (

// Registry builds and manages the igor host module for a single agent.
type Registry struct {
logger *slog.Logger
eventLog *eventlog.EventLog
walletState WalletState // optional; nil = wallet hostcalls not available
logger *slog.Logger
eventLog *eventlog.EventLog
walletState WalletState // optional; nil = wallet hostcalls not available
pricingState PricingState // optional; nil = pricing hostcalls not available
}

// NewRegistry creates a hostcall registry bound to the given event log.
Expand All @@ -35,6 +36,12 @@ func (r *Registry) SetWalletState(ws WalletState) {
r.walletState = ws
}

// SetPricingState installs the pricing state provider for pricing hostcalls.
// Must be called before RegisterHostModule if the agent declares "pricing" capability.
func (r *Registry) SetPricingState(ps PricingState) {
r.pricingState = ps
}

// RegisterHostModule builds and instantiates the "igor" WASM host module
// with only the capabilities declared in the manifest.
// Must be called after WASI instantiation and before agent module instantiation.
Expand Down Expand Up @@ -76,6 +83,11 @@ func (r *Registry) RegisterHostModule(
registered++
}

if m.Has("pricing") && r.pricingState != nil {
r.registerPricing(builder, r.pricingState)
registered++
}

// Only instantiate if at least one capability was registered.
// If the agent has an empty manifest, skip module creation entirely.
// If the agent's WASM imports from "igor", instantiation will fail
Expand Down
133 changes: 133 additions & 0 deletions internal/pricing/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Package pricing implements the /igor/price/1.0.0 protocol for inter-node
// price discovery. Nodes respond to price queries with their current execution
// pricing, enabling agents to make cost-aware migration decisions.
package pricing

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
)

// PriceProtocol is the Igor price query protocol identifier.
const PriceProtocol protocol.ID = "/igor/price/1.0.0"

// PriceRequest is sent by the querying node.
type PriceRequest struct {
// AgentID is optional: future use for agent-specific pricing.
AgentID string `json:"agent_id,omitempty"`
}

// PriceResponse is returned by the responding node.
type PriceResponse struct {
PricePerSecond int64 `json:"price_per_second"` // microcents/sec
NodeID string `json:"node_id"` // peer ID of responding node
}

// Service handles price advertisement and queries over libp2p streams.
type Service struct {
host host.Host
pricePerSecond int64
logger *slog.Logger
}

// NewService creates and registers the pricing service on the given host.
func NewService(h host.Host, pricePerSecond int64, logger *slog.Logger) *Service {
svc := &Service{
host: h,
pricePerSecond: pricePerSecond,
logger: logger,
}
h.SetStreamHandler(PriceProtocol, svc.handlePriceQuery)
logger.Info("Pricing service initialized",
"price_per_second", pricePerSecond,
)
return svc
}

// handlePriceQuery responds to incoming price queries.
func (s *Service) handlePriceQuery(stream network.Stream) {
defer stream.Close()

if err := stream.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
s.logger.Error("Failed to set price query read deadline", "error", err)
return
}

remotePeer := stream.Conn().RemotePeer()

var req PriceRequest
if err := json.NewDecoder(stream).Decode(&req); err != nil {
s.logger.Error("Failed to decode price request",
"from_peer", remotePeer.String(),
"error", err,
)
return
}

resp := PriceResponse{
PricePerSecond: s.pricePerSecond,
NodeID: s.host.ID().String(),
}

if err := json.NewEncoder(stream).Encode(resp); err != nil {
s.logger.Error("Failed to encode price response",
"to_peer", remotePeer.String(),
"error", err,
)
return
}

s.logger.Info("Served price query",
"from_peer", remotePeer.String(),
"price", s.pricePerSecond,
)
}

// QueryPeerPrice queries a remote peer's execution price.
func (s *Service) QueryPeerPrice(ctx context.Context, peerAddr string) (*PriceResponse, error) {
maddr, err := multiaddr.NewMultiaddr(peerAddr)
if err != nil {
return nil, fmt.Errorf("invalid peer address: %w", err)
}

addrInfo, err := peer.AddrInfoFromP2pAddr(maddr)
if err != nil {
return nil, fmt.Errorf("failed to extract peer info: %w", err)
}

if err := s.host.Connect(ctx, *addrInfo); err != nil {
return nil, fmt.Errorf("failed to connect to peer: %w", err)
}

stream, err := s.host.NewStream(ctx, addrInfo.ID, PriceProtocol)
if err != nil {
return nil, fmt.Errorf("failed to open price stream: %w", err)
}
defer stream.Close()

req := PriceRequest{}
if err := json.NewEncoder(stream).Encode(req); err != nil {
return nil, fmt.Errorf("failed to send price request: %w", err)
}

var resp PriceResponse
if err := json.NewDecoder(stream).Decode(&resp); err != nil {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Bound QueryPeerPrice stream reads to the request context

After NewStream succeeds, the code does json.NewDecoder(stream).Decode(&resp) without setting a read deadline, so a peer that accepts the stream but never sends a response can block this call indefinitely even if the caller passed a timed context. This makes price discovery hang in partially-failing or adversarial networks instead of respecting the caller’s timeout budget.

Useful? React with 👍 / 👎.

return nil, fmt.Errorf("failed to read price response: %w", err)
}

s.logger.Info("Received peer price",
"peer_id", addrInfo.ID.String(),
"price_per_second", resp.PricePerSecond,
)

return &resp, nil
}
Loading