Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,33 @@ nats consumer sub transfer transaction-consumer

---

## 🌐 HTTP API Examples

```bash
# set this to your configured services.port
export INDEXER_PORT=8080
```

### Health Check

```bash
curl -s "http://localhost:${INDEXER_PORT}/health" | jq
```

### Reload TON Jetton Registry (all TON chains)

```bash
curl -s -X POST "http://localhost:${INDEXER_PORT}/ton/jettons/reload" | jq
```

### Reload TON Jetton Registry For One Chain

```bash
curl -s -X POST "http://localhost:${INDEXER_PORT}/ton/jettons/reload?chain=ton_mainnet" | jq
```

---

## 📝 Example `configs/config.yaml` (chains section)

```yaml
Expand Down
157 changes: 151 additions & 6 deletions cmd/indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -199,8 +201,10 @@ func runIndexer(chains []string, configPath string, debug, manual, catchup, from
redisClient,
managerCfg,
)
tonWalletReloadService := worker.NewTonWalletReloadServiceFromManager(manager)
tonJettonReloadService := worker.NewTonJettonReloadServiceFromManager(manager)

healthServer := startHealthServer(cfg.Services.Port, cfg)
healthServer := startHealthServer(cfg.Services.Port, cfg, tonWalletReloadService, tonJettonReloadService)

// Start all workers
logger.Info("Starting all workers")
Expand Down Expand Up @@ -232,7 +236,34 @@ type HealthResponse struct {
Version string `json:"version"`
}

func startHealthServer(port int, cfg *config.Config) *http.Server {
type APIErrorResponse struct {
Status string `json:"status"`
Error string `json:"error"`
Timestamp time.Time `json:"timestamp"`
}

type TonWalletReloadResponse struct {
Status string `json:"status"`
Source worker.WalletReloadSource `json:"source"`
Chain string `json:"chain,omitempty"`
Results []worker.TonWalletReloadResult `json:"results"`
TriggeredAtUTC time.Time `json:"triggered_at_utc"`
SupportedSources []worker.WalletReloadSource `json:"supported_sources"`
}

type TonJettonReloadResponse struct {
Status string `json:"status"`
Chain string `json:"chain,omitempty"`
Results []worker.TonJettonReloadResult `json:"results"`
TriggeredAtUTC time.Time `json:"triggered_at_utc"`
}

func startHealthServer(
port int,
cfg *config.Config,
tonWalletReloadService *worker.TonWalletReloadService,
tonJettonReloadService *worker.TonJettonReloadService,
) *http.Server {
mux := http.NewServeMux()

version := cfg.Version
Expand All @@ -246,10 +277,84 @@ func startHealthServer(port int, cfg *config.Config) *http.Server {
Timestamp: time.Now().UTC(),
Version: version,
}
writeJSON(w, http.StatusOK, response)
})

mux.HandleFunc("/ton/wallets/reload", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
writeErrorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}

if tonWalletReloadService == nil {
writeErrorJSON(w, http.StatusNotFound, worker.ErrNoTonWorkerConfigured.Error())
return
}

source := worker.WalletReloadSource(r.URL.Query().Get("source")).Normalize()
if !source.IsValid() {
writeErrorJSON(w, http.StatusBadRequest, "invalid source (supported: kv, db)")
return
}

req := worker.TonWalletReloadRequest{
Source: source,
ChainFilter: strings.TrimSpace(r.URL.Query().Get("chain")),
}

results, err := tonWalletReloadService.ReloadTonWallets(r.Context(), req)
if err != nil {
statusCode := http.StatusInternalServerError
if errors.Is(err, worker.ErrTonWorkerNotFound) || errors.Is(err, worker.ErrNoTonWorkerConfigured) {
statusCode = http.StatusNotFound
}
writeErrorJSON(w, statusCode, err.Error())
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(response)
response := TonWalletReloadResponse{
Status: reloadWalletStatus(results),
Source: source,
Chain: req.ChainFilter,
Results: results,
TriggeredAtUTC: time.Now().UTC(),
SupportedSources: []worker.WalletReloadSource{worker.WalletReloadSourceKV, worker.WalletReloadSourceDB},
}
writeJSON(w, http.StatusOK, response)
})

mux.HandleFunc("/ton/jettons/reload", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
writeErrorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}

if tonJettonReloadService == nil {
writeErrorJSON(w, http.StatusNotFound, worker.ErrNoTonWorkerConfigured.Error())
return
}

req := worker.TonJettonReloadRequest{
ChainFilter: r.URL.Query().Get("chain"),
}

results, err := tonJettonReloadService.ReloadTonJettons(r.Context(), req)
if err != nil {
statusCode := http.StatusInternalServerError
if errors.Is(err, worker.ErrTonWorkerNotFound) || errors.Is(err, worker.ErrNoTonWorkerConfigured) {
statusCode = http.StatusNotFound
}
writeErrorJSON(w, statusCode, err.Error())
return
}

response := TonJettonReloadResponse{
Status: reloadJettonStatus(results),
Chain: req.ChainFilter,
Results: results,
TriggeredAtUTC: time.Now().UTC(),
}
writeJSON(w, http.StatusOK, response)
})

server := &http.Server{
Expand All @@ -258,7 +363,13 @@ func startHealthServer(port int, cfg *config.Config) *http.Server {
}

go func() {
logger.Info("Health check server started", "port", port, "endpoint", "/health")
logger.Info(
"Health check server started",
"port", port,
"health_endpoint", "/health",
"wallet_reload_endpoint", "/ton/wallets/reload",
"jetton_reload_endpoint", "/ton/jettons/reload",
)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("Health server failed to start", "error", err)
}
Expand All @@ -267,6 +378,40 @@ func startHealthServer(port int, cfg *config.Config) *http.Server {
return server
}

func reloadWalletStatus(results []worker.TonWalletReloadResult) string {
for _, result := range results {
if result.Error != "" {
return "partial_error"
}
}
return "ok"
}

func reloadJettonStatus(results []worker.TonJettonReloadResult) string {
for _, result := range results {
if result.Error != "" {
return "partial_error"
}
}
return "ok"
}

func writeJSON(w http.ResponseWriter, statusCode int, payload any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
if err := json.NewEncoder(w).Encode(payload); err != nil {
logger.Error("Failed to encode response", "status", statusCode, "err", err)
}
}

func writeErrorJSON(w http.ResponseWriter, statusCode int, message string) {
writeJSON(w, statusCode, APIErrorResponse{
Status: "error",
Error: message,
Timestamp: time.Now().UTC(),
})
}

func waitForShutdown() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
Expand Down
35 changes: 35 additions & 0 deletions configs/config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,41 @@ chains:
- url: "fullnode.mainnet.sui.io:443" # e.g. 127.0.0.1:9000
- url: "sui-mainnet.nodeinfra.com:443"

ton_mainnet:
network_id: "ton"
internal_code: "TON_MAINNET"
type: "ton"
poll_interval: "5s"
nodes:
- url: "https://ton.org/global.config.json"
throttle:
concurrency: 10
batch_size: 50
jettons:
- master_address: "EQCxE6mUtQJKFnGfaROTgt1lZbDiiX1Gw83iAV82Cn_0opUb" # USDT
symbol: "USDT"
decimals: 6

ton_testnet:
network_id: "ton_testnet"
internal_code: "TON_TESTNET"
type: "ton"
start_block: 0
poll_interval: "4s"
nodes:
- url: "https://ton.org/testnet-global-config.json"
client:
timeout: "15s"
throttle:
rps: 10
burst: 20
batch_size: 1
concurrency: 4
jettons:
- master_address: "0:1de7d5cc59be344d17f1a93274a2dad44ea0fc66ec4b1e10bc4f8d35039f8aaf"
symbol: "USDT"
decimals: 6

# Infrastructure services
services:
port: 8080 # Health check and monitoring server port
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/shopspring/decimal v1.4.0
github.com/stretchr/testify v1.11.1
github.com/tyler-smith/go-bip39 v1.1.0
github.com/xssnick/tonutils-go v1.15.5
golang.org/x/crypto v0.44.0
golang.org/x/sync v0.18.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda
Expand All @@ -34,6 +35,7 @@ require (
replace github.com/imdario/mergo => github.com/imdario/mergo v0.3.16

require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8=
dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/alecthomas/assert/v2 v2.11.0 h1:2Q9r3ki8+JYXvGsDyBXwH3LcJ+WK5D0gc5E8vS6K3D0=
Expand Down Expand Up @@ -281,6 +283,8 @@ github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg=
github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/tyler-smith/go-bip39 v1.1.0 h1:5eUemwrMargf3BSLRRCalXT93Ns6pQJIjYQN2nyfOP8=
github.com/tyler-smith/go-bip39 v1.1.0/go.mod h1:gUYDtqQw1JS3ZJ8UWVcGTGqqr6YIN3CWg+kkNaLt55U=
github.com/xssnick/tonutils-go v1.15.5 h1:yAcHnDaY5QW0aIQE47lT0PuDhhHYE+N+NyZssdPKR0s=
github.com/xssnick/tonutils-go v1.15.5/go.mod h1:3/B8mS5IWLTd1xbGbFbzRem55oz/Q86HG884bVsTqZ8=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
Expand Down
2 changes: 1 addition & 1 deletion internal/indexer/sui.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (s *SuiIndexer) convertCheckpoint(cp *sui.Checkpoint) *types.Block {
func (s *SuiIndexer) convertTransaction(execTx *v2.ExecutedTransaction, blockNumber, blockTs uint64) types.Transaction {
t := types.Transaction{
TxHash: execTx.GetDigest(),
NetworkId: s.cfg.InternalCode,
NetworkId: s.cfg.NetworkId,
BlockNumber: blockNumber,
Timestamp: blockTs,
}
Expand Down
Loading