Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ This indexer is designed to be used in a multi-chain environment, where each cha
- Bitcoin
- Solana
- Sui
- Cosmos (Osmosis, Celestia, Cosmos Hub)

---

Expand Down Expand Up @@ -236,6 +237,33 @@ nats consumer sub transfer transaction-consumer

---

## 🌐 HTTP API Examples

```bash
# set this to your configured services.port
export INDEXER_PORT=8080
```

### Health Check

```bash
curl -s "http://localhost:${INDEXER_PORT}/health" | jq
```

### Reload TON Jetton Registry (all TON chains)

```bash
curl -s -X POST "http://localhost:${INDEXER_PORT}/ton/jettons/reload" | jq
```

### Reload TON Jetton Registry For One Chain

```bash
curl -s -X POST "http://localhost:${INDEXER_PORT}/ton/jettons/reload?chain=ton_mainnet" | jq
```

---

## 📝 Example `configs/config.yaml` (chains section)

```yaml
Expand Down Expand Up @@ -277,6 +305,22 @@ chains:
throttle:
rps: 5
burst: 8

cosmoshub_mainnet:
type: "cosmos"
network_id: "cosmoshub-4"
native_denom: "uatom"
nodes:
- url: "https://rpc.cosmos.directory/cosmoshub"
- url: "https://cosmos-rpc.publicnode.com"
poll_interval: "5s"
client:
timeout: "20s"
max_retries: 3
retry_delay: "2s"
throttle:
rps: 20
burst: 40
```

## 📡 Consuming Transaction Events
Expand Down
157 changes: 151 additions & 6 deletions cmd/indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -199,8 +201,10 @@ func runIndexer(chains []string, configPath string, debug, manual, catchup, from
redisClient,
managerCfg,
)
tonWalletReloadService := worker.NewTonWalletReloadServiceFromManager(manager)
tonJettonReloadService := worker.NewTonJettonReloadServiceFromManager(manager)

healthServer := startHealthServer(cfg.Services.Port, cfg)
healthServer := startHealthServer(cfg.Services.Port, cfg, tonWalletReloadService, tonJettonReloadService)

// Start all workers
logger.Info("Starting all workers")
Expand Down Expand Up @@ -232,7 +236,34 @@ type HealthResponse struct {
Version string `json:"version"`
}

func startHealthServer(port int, cfg *config.Config) *http.Server {
type APIErrorResponse struct {
Status string `json:"status"`
Error string `json:"error"`
Timestamp time.Time `json:"timestamp"`
}

type TonWalletReloadResponse struct {
Status string `json:"status"`
Source worker.WalletReloadSource `json:"source"`
Chain string `json:"chain,omitempty"`
Results []worker.TonWalletReloadResult `json:"results"`
TriggeredAtUTC time.Time `json:"triggered_at_utc"`
SupportedSources []worker.WalletReloadSource `json:"supported_sources"`
}

type TonJettonReloadResponse struct {
Status string `json:"status"`
Chain string `json:"chain,omitempty"`
Results []worker.TonJettonReloadResult `json:"results"`
TriggeredAtUTC time.Time `json:"triggered_at_utc"`
}

func startHealthServer(
port int,
cfg *config.Config,
tonWalletReloadService *worker.TonWalletReloadService,
tonJettonReloadService *worker.TonJettonReloadService,
) *http.Server {
mux := http.NewServeMux()

version := cfg.Version
Expand All @@ -246,10 +277,84 @@ func startHealthServer(port int, cfg *config.Config) *http.Server {
Timestamp: time.Now().UTC(),
Version: version,
}
writeJSON(w, http.StatusOK, response)
})

mux.HandleFunc("/ton/wallets/reload", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
writeErrorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}

if tonWalletReloadService == nil {
writeErrorJSON(w, http.StatusNotFound, worker.ErrNoTonWorkerConfigured.Error())
return
}

source := worker.WalletReloadSource(r.URL.Query().Get("source")).Normalize()
if !source.IsValid() {
writeErrorJSON(w, http.StatusBadRequest, "invalid source (supported: kv, db)")
return
}

req := worker.TonWalletReloadRequest{
Source: source,
ChainFilter: strings.TrimSpace(r.URL.Query().Get("chain")),
}

results, err := tonWalletReloadService.ReloadTonWallets(r.Context(), req)
if err != nil {
statusCode := http.StatusInternalServerError
if errors.Is(err, worker.ErrTonWorkerNotFound) || errors.Is(err, worker.ErrNoTonWorkerConfigured) {
statusCode = http.StatusNotFound
}
writeErrorJSON(w, statusCode, err.Error())
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(response)
response := TonWalletReloadResponse{
Status: reloadWalletStatus(results),
Source: source,
Chain: req.ChainFilter,
Results: results,
TriggeredAtUTC: time.Now().UTC(),
SupportedSources: []worker.WalletReloadSource{worker.WalletReloadSourceKV, worker.WalletReloadSourceDB},
}
writeJSON(w, http.StatusOK, response)
})

mux.HandleFunc("/ton/jettons/reload", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
writeErrorJSON(w, http.StatusMethodNotAllowed, "method not allowed")
return
}

if tonJettonReloadService == nil {
writeErrorJSON(w, http.StatusNotFound, worker.ErrNoTonWorkerConfigured.Error())
return
}

req := worker.TonJettonReloadRequest{
ChainFilter: r.URL.Query().Get("chain"),
}

results, err := tonJettonReloadService.ReloadTonJettons(r.Context(), req)
if err != nil {
statusCode := http.StatusInternalServerError
if errors.Is(err, worker.ErrTonWorkerNotFound) || errors.Is(err, worker.ErrNoTonWorkerConfigured) {
statusCode = http.StatusNotFound
}
writeErrorJSON(w, statusCode, err.Error())
return
}

response := TonJettonReloadResponse{
Status: reloadJettonStatus(results),
Chain: req.ChainFilter,
Results: results,
TriggeredAtUTC: time.Now().UTC(),
}
writeJSON(w, http.StatusOK, response)
})

server := &http.Server{
Expand All @@ -258,7 +363,13 @@ func startHealthServer(port int, cfg *config.Config) *http.Server {
}

go func() {
logger.Info("Health check server started", "port", port, "endpoint", "/health")
logger.Info(
"Health check server started",
"port", port,
"health_endpoint", "/health",
"wallet_reload_endpoint", "/ton/wallets/reload",
"jetton_reload_endpoint", "/ton/jettons/reload",
)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("Health server failed to start", "error", err)
}
Expand All @@ -267,6 +378,40 @@ func startHealthServer(port int, cfg *config.Config) *http.Server {
return server
}

func reloadWalletStatus(results []worker.TonWalletReloadResult) string {
for _, result := range results {
if result.Error != "" {
return "partial_error"
}
}
return "ok"
}

func reloadJettonStatus(results []worker.TonJettonReloadResult) string {
for _, result := range results {
if result.Error != "" {
return "partial_error"
}
}
return "ok"
}

func writeJSON(w http.ResponseWriter, statusCode int, payload any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
if err := json.NewEncoder(w).Encode(payload); err != nil {
logger.Error("Failed to encode response", "status", statusCode, "err", err)
}
}

func writeErrorJSON(w http.ResponseWriter, statusCode int, message string) {
writeJSON(w, statusCode, APIErrorResponse{
Status: "error",
Error: message,
Timestamp: time.Now().UTC(),
})
}

func waitForShutdown() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
Expand Down
Loading