Skip to content

Commit 2277e25

Browse files
committed
feat: on start fetch onchain state of oracle
1 parent 37de0de commit 2277e25

6 files changed

Lines changed: 161 additions & 62 deletions

File tree

services/bridge/internal/bridge/bridge.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,22 @@ func (b *Bridge) Start(ctx context.Context) error {
365365

366366
logger.Info("Starting bridge service")
367367

368+
// Load chain state per oracle per symbol
369+
if b.routerRegistry != nil {
370+
logger.Info("Fetching router state from on-chain...")
371+
ethClients := make(map[int64]rpc.EthClient)
372+
for chainID, writeClient := range b.writeClients {
373+
ethClients[chainID] = writeClient.GetEthClient()
374+
}
375+
routers := b.routerRegistry.GetActiveRouters()
376+
for _, router := range routers {
377+
if err := router.FetchOracleStateFromOnChain(ctx, ethClients); err != nil {
378+
logger.Warnf("Router state fetch failed for %s: %v", router.ID(), err)
379+
}
380+
}
381+
logger.Info("Router state fetch completed")
382+
}
383+
368384
// Start transaction queue manager
369385
b.queueManager.Start()
370386

@@ -450,6 +466,7 @@ func (b *Bridge) getOrCreateOraclePool(routerID string) *worker.WorkerPool {
450466
b.workerPoolConfig.TaskTimeout.Duration())
451467

452468
pool = worker.NewWorkerPool(
469+
routerID,
453470
b.workerPoolConfig.MaxWorkers,
454471
b.workerPoolConfig.TaskQueueSize,
455472
b.workerPoolConfig.TaskTimeout.Duration(),

services/bridge/internal/leader/onchain_monitor.go

Lines changed: 1 addition & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
11
package leader
22

33
import (
4-
"bytes"
54
"context"
6-
"fmt"
75
"math/big"
86
"strconv"
97
"strings"
108
"sync"
119
"time"
1210

13-
"github.com/ethereum/go-ethereum"
14-
"github.com/ethereum/go-ethereum/accounts/abi"
1511
"github.com/ethereum/go-ethereum/common"
1612

1713
"github.com/diadata.org/Spectra-interoperability/pkg/logger"
@@ -288,58 +284,7 @@ func (m *OnChainMonitor) generateKey(chainID int64, contractAddress common.Addre
288284
}
289285

290286
func (m *OnChainMonitor) getValueFromContract(dest *DestinationMonitor, symbol string) (*big.Int, uint64, error) {
291-
292-
const getValueABI = `[{
293-
"inputs": [{"internalType": "string", "name": "key", "type": "string"}],
294-
"name": "getValue",
295-
"outputs": [
296-
{"internalType": "uint128", "name": "value", "type": "uint128"},
297-
{"internalType": "uint128", "name": "timestamp", "type": "uint128"}
298-
],
299-
"stateMutability": "view",
300-
"type": "function"
301-
}]`
302-
303-
parsedABI, err := abi.JSON(bytes.NewReader([]byte(getValueABI)))
304-
if err != nil {
305-
return nil, 0, fmt.Errorf("failed to parse ABI: %w", err)
306-
}
307-
308-
data, err := parsedABI.Pack("getValue", symbol)
309-
if err != nil {
310-
return nil, 0, fmt.Errorf("failed to pack input data: %w", err)
311-
}
312-
313-
callMsg := ethereum.CallMsg{
314-
To: &dest.ContractAddress,
315-
Data: data,
316-
}
317-
318-
resultBytes, err := dest.Client.CallContract(m.ctx, callMsg, nil)
319-
if err != nil {
320-
return nil, 0, fmt.Errorf("contract call failed: %w", err)
321-
}
322-
323-
outputs, err := parsedABI.Unpack("getValue", resultBytes)
324-
if err != nil {
325-
return nil, 0, fmt.Errorf("failed to unpack result: %w", err)
326-
}
327-
328-
if len(outputs) != 2 {
329-
return nil, 0, fmt.Errorf("unexpected number of outputs: got %d, want 2", len(outputs))
330-
}
331-
332-
value, ok := outputs[0].(*big.Int)
333-
if !ok {
334-
return nil, 0, fmt.Errorf("failed to convert value to big.Int, got type %T: %v", outputs[0], outputs[0])
335-
}
336-
337-
timestamp, ok := outputs[1].(*big.Int)
338-
if !ok {
339-
return nil, 0, fmt.Errorf("failed to convert timestamp to big.Int, got type %T: %v", outputs[1], outputs[1])
340-
}
341-
342-
return value, timestamp.Uint64(), nil
287+
return utils.GetValueFromOracleContract(m.ctx, dest.Client, dest.ContractAddress, symbol)
343288
}
344289

345290
func formatValue(v *big.Int) string {
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package utils
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math/big"
7+
"strings"
8+
9+
"github.com/ethereum/go-ethereum"
10+
"github.com/ethereum/go-ethereum/accounts/abi"
11+
"github.com/ethereum/go-ethereum/common"
12+
13+
"github.com/diadata.org/Spectra-interoperability/pkg/rpc"
14+
)
15+
16+
// GetValueFromOracleContract calls the getValue method on an oracle contract
17+
func GetValueFromOracleContract(ctx context.Context, client rpc.EthClient, contractAddress common.Address, symbol string) (*big.Int, uint64, error) {
18+
const getValueABI = `[{
19+
"inputs": [{"internalType": "string", "name": "key", "type": "string"}],
20+
"name": "getValue",
21+
"outputs": [
22+
{"internalType": "uint128", "name": "value", "type": "uint128"},
23+
{"internalType": "uint128", "name": "timestamp", "type": "uint128"}
24+
],
25+
"stateMutability": "view",
26+
"type": "function"
27+
}]`
28+
29+
parsedABI, err := abi.JSON(strings.NewReader(getValueABI))
30+
if err != nil {
31+
return nil, 0, fmt.Errorf("failed to parse ABI: %w", err)
32+
}
33+
34+
data, err := parsedABI.Pack("getValue", symbol)
35+
if err != nil {
36+
return nil, 0, fmt.Errorf("failed to pack input data: %w", err)
37+
}
38+
39+
callMsg := ethereum.CallMsg{
40+
To: &contractAddress,
41+
Data: data,
42+
}
43+
44+
resultBytes, err := client.CallContract(ctx, callMsg, nil)
45+
if err != nil {
46+
return nil, 0, fmt.Errorf("contract call failed: %w", err)
47+
}
48+
49+
outputs, err := parsedABI.Unpack("getValue", resultBytes)
50+
if err != nil {
51+
return nil, 0, fmt.Errorf("failed to unpack result: %w", err)
52+
}
53+
54+
if len(outputs) != 2 {
55+
return nil, 0, fmt.Errorf("unexpected number of outputs: got %d, want 2", len(outputs))
56+
}
57+
58+
value, ok := outputs[0].(*big.Int)
59+
if !ok {
60+
return nil, 0, fmt.Errorf("failed to convert value to big.Int, got type %T: %v", outputs[0], outputs[0])
61+
}
62+
63+
timestamp, ok := outputs[1].(*big.Int)
64+
if !ok {
65+
return nil, 0, fmt.Errorf("failed to convert timestamp to big.Int, got type %T: %v", outputs[1], outputs[1])
66+
}
67+
68+
return value, timestamp.Uint64(), nil
69+
}

services/bridge/internal/worker/worker_pool.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type WorkerTask struct {
2020

2121
// WorkerPool manages a pool of workers for processing update requests
2222
type WorkerPool struct {
23+
routerID string // Router/oracle identifier for logging
2324
maxWorkers int
2425
taskQueue chan *WorkerTask
2526
workers []*Worker
@@ -43,7 +44,7 @@ type Worker struct {
4344
}
4445

4546
// NewWorkerPool creates a new worker pool
46-
func NewWorkerPool(maxWorkers int, taskQueueSize int, taskTimeout time.Duration) *WorkerPool {
47+
func NewWorkerPool(routerID string, maxWorkers int, taskQueueSize int, taskTimeout time.Duration) *WorkerPool {
4748
// Use taskQueueSize if provided, otherwise fallback to maxWorkers*2
4849
queueSize := taskQueueSize
4950
if queueSize <= 0 {
@@ -54,9 +55,10 @@ func NewWorkerPool(maxWorkers int, taskQueueSize int, taskTimeout time.Duration)
5455
taskTimeout = 6 * time.Minute
5556
}
5657

57-
logger.Infof("Creating worker pool: maxWorkers=%d, taskQueueSize=%d, taskTimeout=%v", maxWorkers, queueSize, taskTimeout)
58+
logger.Infof("Creating worker pool for router %s: maxWorkers=%d, taskQueueSize=%d, taskTimeout=%v", routerID, maxWorkers, queueSize, taskTimeout)
5859

5960
return &WorkerPool{
61+
routerID: routerID,
6062
maxWorkers: maxWorkers,
6163
taskQueue: make(chan *WorkerTask, queueSize),
6264
shutdownChan: make(chan struct{}),
@@ -167,14 +169,14 @@ func (wp *WorkerPool) healthMonitor(ctx context.Context) {
167169

168170
// Log warning if queue is getting full (>80% capacity)
169171
if queueCap > 0 && float64(queueSize)/float64(queueCap) > 0.8 {
170-
logger.Warnf("Worker pool queue nearing capacity: %d/%d (%.1f%%), active workers: %d/%d",
171-
queueSize, queueCap, float64(queueSize)/float64(queueCap)*100, activeCount, wp.maxWorkers)
172+
logger.Warnf("Worker pool queue nearing capacity for %s: %d/%d (%.1f%%), active workers: %d/%d",
173+
wp.routerID, queueSize, queueCap, float64(queueSize)/float64(queueCap)*100, activeCount, wp.maxWorkers)
172174
}
173175

174176
// Log warning if all workers are busy and queue has items
175177
if int(activeCount) >= wp.maxWorkers && queueSize > 0 {
176-
logger.Warnf("All %d workers busy with %d tasks queued - consider increasing worker count",
177-
wp.maxWorkers, queueSize)
178+
logger.Warnf("All %d workers busy for %s with %d tasks queued - consider increasing worker count",
179+
wp.maxWorkers, wp.routerID, queueSize)
178180
}
179181

180182
logger.Debugf("Worker pool health: active=%d/%d, queue=%d/%d",

services/bridge/pkg/router/generic_interface.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package router
22

33
import (
4+
"context"
5+
6+
"github.com/diadata.org/Spectra-interoperability/pkg/rpc"
47
"github.com/diadata.org/Spectra-interoperability/services/bridge/config"
58
)
69

@@ -17,4 +20,5 @@ type GenericRouterInterface interface {
1720
GetStats() GenericRouterStats
1821
UpdateDestinationTime(dest config.RouterDestination, symbol string, data ...*config.ExtractedData)
1922
GetSymbolFromData(data *config.ExtractedData) string
23+
FetchOracleStateFromOnChain(ctx context.Context, clients map[int64]rpc.EthClient) error
2024
}

services/bridge/pkg/router/generic_router.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package router
22

33
import (
4+
"context"
45
"crypto/ecdsa"
56
"fmt"
67
"math/big"
@@ -14,6 +15,7 @@ import (
1415
"github.com/ethereum/go-ethereum/crypto"
1516

1617
"github.com/diadata.org/Spectra-interoperability/pkg/logger"
18+
"github.com/diadata.org/Spectra-interoperability/pkg/rpc"
1719
"github.com/diadata.org/Spectra-interoperability/services/bridge/config"
1820
"github.com/diadata.org/Spectra-interoperability/services/bridge/internal/utils"
1921
)
@@ -778,6 +780,66 @@ func (gr *GenericRouter) GetStats() GenericRouterStats {
778780
return gr.stats
779781
}
780782

783+
// FetchOracleStateFromOnChain fetches the latest state from on-chain contracts
784+
func (gr *GenericRouter) FetchOracleStateFromOnChain(ctx context.Context, clients map[int64]rpc.EthClient) error {
785+
logger.Infof("Fetching oracle state from on-chain for router: %s", gr.config.ID)
786+
787+
startTime := time.Now()
788+
totalDestinations := 0
789+
successfulFetches := 0
790+
791+
// Get symbols from router config
792+
symbols := GetSymbolsFromConfig(gr.config)
793+
if len(symbols) == 0 {
794+
logger.Infof("No symbols configured for router %s, skipping fetch", gr.config.ID)
795+
return nil
796+
}
797+
798+
logger.Infof("Found %d symbols in router config for fetch: %v", len(symbols), symbols)
799+
800+
for _, dest := range gr.config.Destinations {
801+
client, exists := clients[dest.ChainID]
802+
if !exists {
803+
logger.Warnf("No client found for chain %d, skipping fetch", dest.ChainID)
804+
continue
805+
}
806+
807+
totalDestinations += len(symbols)
808+
809+
for _, symbol := range symbols {
810+
value, timestamp, err := utils.GetValueFromOracleContract(ctx, client, common.HexToAddress(dest.Contract), symbol)
811+
if err != nil {
812+
logger.Warnf("Failed to fetch on-chain state for %s on chain %d: %v", symbol, dest.ChainID, err)
813+
continue
814+
}
815+
816+
// Update in-memory cache
817+
destKey := gr.generateDestinationKey(dest, symbol)
818+
state := gr.getOrCreateDestinationState(destKey)
819+
820+
state.mu.Lock()
821+
if timestamp > 0 {
822+
state.lastUpdate = time.Unix(int64(timestamp), 0)
823+
state.lastTimestamp = timestamp
824+
}
825+
if value != nil && value.Sign() != 0 {
826+
state.lastPrice = value.String()
827+
}
828+
state.mu.Unlock()
829+
830+
successfulFetches++
831+
logger.Infof("Fetched state for %s on chain %d: timestamp=%d, price=%s",
832+
symbol, dest.ChainID, timestamp, value.String())
833+
}
834+
}
835+
836+
duration := time.Since(startTime)
837+
logger.Infof("Router state fetch completed for %s: %d/%d states fetched in %v",
838+
gr.config.ID, successfulFetches, totalDestinations, duration)
839+
840+
return nil
841+
}
842+
781843
// ProcessingConfig returns the router's processing configuration
782844
func (gr *GenericRouter) ProcessingConfig() *config.ProcessingConfig {
783845
return &gr.config.Processing

0 commit comments

Comments
 (0)