Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
9c2da86
Beginning of DB unit tests.
jchavannes Jul 13, 2024
fa9ab93
Test get by prefixes
jchavannes Jul 14, 2024
34883c4
Test with multiple prefixes.
jchavannes Jul 14, 2024
6723e0c
Add multiple GetByPrefixes tests.
jchavannes Jul 14, 2024
854adbd
Newest prefix messages test.
jchavannes Jul 15, 2024
1af629e
New store get by prefixes interface.
jchavannes Jul 16, 2024
bb10c55
Close DBs after tests.
jchavannes Jul 18, 2024
3b360e1
Add GetByPrefixes to queue protobuf interface. Implement interface. U…
jchavannes Jul 18, 2024
e7da508
New client GetByPrefixes, convert some uses.
jchavannes Jul 22, 2024
bd452dc
Convert GetByPrefixes uses to GetByPrefixesNew.
jchavannes Jul 23, 2024
0577549
Update GetByPrefix to use new prefixes. Rename GetShardPrefixesNew an…
jchavannes Jul 23, 2024
c407734
Remove some old client functions. Move prefix and options into new fi…
jchavannes Jul 23, 2024
be76f25
Always pass context with GetSingle.
jchavannes Jul 25, 2024
92f092a
Rename max to limit and newest to order descending.
jchavannes Jul 25, 2024
2d9ead2
Remove wait option from queue get messages.
jchavannes Jul 26, 2024
aaccdc0
Remove timeout option.
jchavannes Jul 26, 2024
45ff52d
Start converting calls to GetWOpts to new functions.
jchavannes Jul 26, 2024
f7e93e0
Convert some GetWOpts calls to new interface.
jchavannes Jul 26, 2024
73ad584
GraphQL Playground.
jchavannes Jul 27, 2024
b93b6ce
Convert addr GetWOpts uses.
jchavannes Jul 29, 2024
0b9d506
Get specific items by uids to continue removing GetWOpts usage.
jchavannes Aug 1, 2024
4304569
Finish converting uses of GetWOpts.
jchavannes Aug 2, 2024
8f26561
Merge branch 'master' into search_master
jchavannes Feb 18, 2026
06122e5
Merge branch 'master' into search_master
jchavannes Feb 18, 2026
cfe431a
Architecture diagram
jchavannes Feb 19, 2026
27a8eef
Fix panic where connection is used before initialization
jchavannes Feb 19, 2026
d532ad4
Rename serve all to serve dev
jchavannes Feb 19, 2026
9480bfc
Fix tx hash byte reversal in shard queries
jchavannes Feb 20, 2026
9d23019
Fix hash length validation logic in script packages
jchavannes Feb 20, 2026
6fd759d
Backfill and scan headers maint commands. Add sync status to admin to…
jchavannes Feb 20, 2026
5ce4e91
Move header scanning into lead processor with auto-resume. Remove har…
jchavannes Mar 3, 2026
381a8de
Add tx hash subscription and WaitForTx websocket client. Rename Addrs…
jchavannes Mar 3, 2026
edb217d
Add GetTxProcessed query and check for already-processed tx before su…
jchavannes Mar 3, 2026
8ae7e7a
Simplify Request proto to only support uid lookups. Remove unused sta…
jchavannes Mar 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,28 @@ go build
./index serve live
```

## Architecture

```mermaid
graph TD
BCH[BCH Node] <-->|P2P| Lead[Lead Processor]
Lead -->|Blocks/Txs| CS[Cluster Shards]

subgraph Shard["Each Shard (0..N)"]
CS --> Queue[Queue Server]
Queue --> DB[(LevelDB)]
end

GraphQL[GraphQL Server] -->|gRPC| Queue
Admin[Admin Server] -->|gRPC| Queue
Network[Network Server] -->|gRPC| Queue

Client([Client]) -->|Query| GraphQL
Client -->|Manage| Admin
Client -->|Submit Tx| Broadcast[Broadcast Server]
Broadcast -->|Raw Tx| Lead
```

## Configuration

Two options for setting config values.
Expand Down
2 changes: 1 addition & 1 deletion admin/server/node/found_peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var foundPeersRoute = admin.Route{
var shard uint32
FoundPeersLoop:
for {
foundPeers, err := item.GetFoundPeers(shard, startId, request.Ip, request.Port)
foundPeers, err := item.GetFoundPeers(r.Request.Context(), shard, startId, request.Ip, request.Port)
if err != nil {
log.Fatalf("fatal error getting found peers; %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion admin/server/node/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var historyRoute = admin.Route{
var shard uint32
PeerConnectionsLoop:
for {
peerConnections, err := item.GetPeerConnections(item.PeerConnectionsRequest{
peerConnections, err := item.GetPeerConnections(r.Request.Context(), item.PeerConnectionsRequest{
Shard: shard,
StartId: startId,
Ip: net.ParseIP(historyRequest.Ip),
Expand Down
2 changes: 1 addition & 1 deletion admin/server/node/peer_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var peerReportRoute = admin.Route{
var peerInfo = new(PeerInfo)
for shard := uint32(0); shard < config.GetTotalShards(); shard++ {
for startId := []byte{}; ; {
peerConnections, err := item.GetPeerConnections(item.PeerConnectionsRequest{
peerConnections, err := item.GetPeerConnections(r.Request.Context(), item.PeerConnectionsRequest{
Shard: shard,
StartId: startId,
Max: client.LargeLimit,
Expand Down
2 changes: 1 addition & 1 deletion admin/server/node/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var peersRoute = admin.Route{
r.Error(fmt.Errorf("error unmarshalling peers request; %w", err))
return
}
peerList := peer.NewList()
peerList := peer.NewList(r.Request.Context())
if err := peerList.GetPeers(request.Filter); err != nil {
r.Error(fmt.Errorf("error getting list of peers with filter; %w", err))
return
Expand Down
2 changes: 1 addition & 1 deletion admin/server/topic/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var itemRoute = admin.Route{
var topicItemResponse = new(admin.TopicItemResponse)
shardConfig := config.GetShardConfig(uint32(topicItemRequest.Shard), config.GetQueueShards())
dbClient := client.NewClient(shardConfig.GetHost())
if err := dbClient.GetSingle(topicItemRequest.Topic, uid); err != nil {
if err := dbClient.GetSingle(r.Request.Context(), topicItemRequest.Topic, uid); err != nil {
log.Printf("error getting topic item for admin view; %v", err)
return
}
Expand Down
2 changes: 1 addition & 1 deletion admin/server/topic/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var viewRoute = admin.Route{
continue
}
db := client.NewClient(shardConfig.GetHost())
if err := db.Get(topicViewRequest.Topic, start, false); err != nil {
if err := db.GetByPrefix(r.Request.Context(), topicViewRequest.Topic, client.NewStart(start)); err != nil {
log.Printf("error getting topic items for admin view; %v", err)
return
}
Expand Down
70 changes: 70 additions & 0 deletions client/lib/graph/tx_processed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package graph

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/gorilla/websocket"
)

func WaitForTx(ctx context.Context, graphUrl, txHash string) error {
wsUrl := strings.Replace(graphUrl, "https://", "wss://", 1)
wsUrl = strings.Replace(wsUrl, "http://", "ws://", 1)
dialer := websocket.Dialer{
HandshakeTimeout: 10 * time.Second,
Subprotocols: []string{"graphql-transport-ws"},
}
conn, _, err := dialer.Dial(wsUrl, nil)
if err != nil {
return fmt.Errorf("error connecting websocket for wait tx; %w", err)
}
defer conn.Close()
go func() {
<-ctx.Done()
conn.Close()
}()
if err := conn.WriteJSON(map[string]string{"type": "connection_init"}); err != nil {
return fmt.Errorf("error sending connection_init for wait tx; %w", err)
}
var ack wsMessage
if err := conn.ReadJSON(&ack); err != nil {
return fmt.Errorf("error reading connection_ack for wait tx; %w", err)
}
if ack.Type != "connection_ack" {
return fmt.Errorf("error expected connection_ack, got %s", ack.Type)
}
subscribeMsg := map[string]interface{}{
"id": "1",
"type": "subscribe",
"payload": map[string]interface{}{
"query": `subscription ($hash: Hash!) { tx(hash: $hash) { hash } }`,
"variables": map[string]string{"hash": txHash},
},
}
if err := conn.WriteJSON(subscribeMsg); err != nil {
return fmt.Errorf("error sending subscribe for wait tx; %w", err)
}
for {
var msg wsMessage
if err := conn.ReadJSON(&msg); err != nil {
return fmt.Errorf("error reading message for wait tx; %w", err)
}
switch msg.Type {
case "next":
return nil
case "error":
return fmt.Errorf("error subscription error for wait tx: %s", string(msg.Payload))
case "complete":
return nil
}
}
}

type wsMessage struct {
ID string `json:"id,omitempty"`
Type string `json:"type"`
Payload json.RawMessage `json:"payload,omitempty"`
}
33 changes: 33 additions & 0 deletions cmd/maint/backfill.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package maint

import (
"log"

"github.com/memocash/index/node/act/maint"
"github.com/spf13/cobra"
)

const (
FlagStart = "start"
FlagEnd = "end"
)

var backfillCmd = &cobra.Command{
Use: "backfill",
Short: "Process full blocks in a height range through the saver pipeline",
Run: func(c *cobra.Command, args []string) {
start, _ := c.Flags().GetInt64(FlagStart)
end, _ := c.Flags().GetInt64(FlagEnd)
if start <= 0 || end <= 0 {
log.Fatalf("both --start and --end flags are required and must be positive")
}
if start > end {
log.Fatalf("--start (%d) must be less than or equal to --end (%d)", start, end)
}
backfill := maint.NewBackfill(start, end)
log.Printf("Starting backfill from %d to %d...\n", start, end)
if err := backfill.Run(); err != nil {
log.Fatalf("error running backfill; %v", err)
}
},
}
2 changes: 1 addition & 1 deletion cmd/maint/check_follows.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var checkFollowsCmd = &cobra.Command{
deleteItems, _ := c.Flags().GetBool(FlagDelete)
checkFollows := maint.NewCheckFollows(deleteItems)
log.Printf("Starting check follows (delete flag: %t)...\n", deleteItems)
if err := checkFollows.Check(); err != nil {
if err := checkFollows.Check(c.Context()); err != nil {
log.Fatalf("error maint check follows; %v", err)
}
log.Printf("Checked follows: %d, bad: %d\n", checkFollows.Processed, checkFollows.BadFollows)
Expand Down
2 changes: 1 addition & 1 deletion cmd/maint/double_spends.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var doubleSpendCmd = &cobra.Command{
Run: func(c *cobra.Command, args []string) {
doubleSpends := new(maint.DoubleSpends)
log.Println("Counting double spends...")
if err := doubleSpends.Check(); err != nil {
if err := doubleSpends.Check(c.Context()); err != nil {
log.Fatalf("error checking double spends; %v", err)
}
log.Printf("Total entries: %d\n", doubleSpends.TotalEntries)
Expand Down
6 changes: 6 additions & 0 deletions cmd/maint/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ func GetCommand() *cobra.Command {
populateP2shDirectCmd.Flags().BoolP(FlagRestart, "", false, "Restart from beginning")
populateAddrOutputsCmd.Flags().BoolP(FlagRestart, "", false, "Restart from beginning")
populateAddrInputsCmd.Flags().BoolP(FlagRestart, "", false, "Restart from beginning")
backfillCmd.Flags().Int64(FlagStart, 0, "Start height (required)")
backfillCmd.Flags().Int64(FlagEnd, 0, "End height (required)")
backfillCmd.MarkFlagRequired(FlagStart)
backfillCmd.MarkFlagRequired(FlagEnd)
maintCommand.AddCommand(
queueProfileCmd,
checkFollowsCmd,
Expand All @@ -29,6 +33,8 @@ func GetCommand() *cobra.Command {
populateSeenPostsCmd,
doubleSpendCmd,
randomDoubleSpendCmd,
rescanHeadersCmd,
backfillCmd,
)
return maintCommand
}
4 changes: 2 additions & 2 deletions cmd/maint/populate_addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var populateAddrOutputsCmd = &cobra.Command{
Short: "populate-addr-outputs",
Run: func(c *cobra.Command, args []string) {
restart, _ := c.Flags().GetBool(FlagRestart)
populateAddr := maint.NewPopulateAddr(false)
populateAddr := maint.NewPopulateAddr(c.Context(), false)
log.Printf("Starting populate addr outputs...\n")
if err := populateAddr.Populate(restart); err != nil {
log.Fatalf("error populate addr outputs; %v", err)
Expand All @@ -25,7 +25,7 @@ var populateAddrInputsCmd = &cobra.Command{
Short: "populate-addr-inputs",
Run: func(c *cobra.Command, args []string) {
restart, _ := c.Flags().GetBool(FlagRestart)
populateAddr := maint.NewPopulateAddr(true)
populateAddr := maint.NewPopulateAddr(c.Context(), true)
log.Printf("Starting populate addr inputs...\n")
if err := populateAddr.Populate(restart); err != nil {
log.Fatalf("error populate addr inputs; %v", err)
Expand Down
8 changes: 4 additions & 4 deletions cmd/maint/queue_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var queueProfileCmd = &cobra.Command{
startHeight = jutil.GetInt64FromString(args[0])
}
const Shard = 0
heightBlocks, err := chain.GetHeightBlocks(Shard, startHeight, false)
heightBlocks, err := chain.GetHeightBlocks(cmd.Context(), Shard, startHeight, false)
if err != nil {
log.Fatalf("fatal error getting height blocks; %v", err)
}
Expand All @@ -37,14 +37,14 @@ var queueProfileCmd = &cobra.Command{
if err != nil {
log.Fatalf("fatal error getting block txs; %v", err)
}
var uids [][]byte
var prefixes []client.Prefix
for _, blockTx := range blockTxs {
if db.GetShardIdFromByte(blockTx.TxHash[:]) == Shard {
uids = append(uids, jutil.ByteReverse(blockTx.TxHash[:]))
prefixes = append(prefixes, client.NewPrefix(jutil.ByteReverse(blockTx.TxHash[:])))
}
}
dbClient := client.NewClient(config.GetShardConfig(Shard, config.GetQueueShards()).GetHost())
if err := dbClient.GetByPrefixes(db.TopicChainTxOutput, uids); err != nil {
if err := dbClient.GetByPrefixes(cmd.Context(), db.TopicChainTxOutput, prefixes); err != nil {
log.Fatalf("fatal error getting db message tx outputs; %v", err)
}
log.Printf("%d outputs retrieved for shard 0 (height: %d, txs: %d)\n",
Expand Down
2 changes: 1 addition & 1 deletion cmd/maint/random_double_spend.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var randomDoubleSpendCmd = &cobra.Command{
Short: "Find a random double spend in output_inputs topic",
Run: func(c *cobra.Command, args []string) {
r := new(maint.RandomDoubleSpend)
if err := r.Find(); err != nil {
if err := r.Find(c.Context()); err != nil {
log.Fatalf("error finding random double spend; %v", err)
}
for _, doubleSpend := range r.DoubleSpends {
Expand Down
21 changes: 21 additions & 0 deletions cmd/maint/rescan_headers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package maint

import (
"log"

"github.com/memocash/index/ref/cluster/lead"
"github.com/spf13/cobra"
)

var rescanHeadersCmd = &cobra.Command{
Use: "rescan-headers",
Short: "Rescan block headers from genesis, overwriting existing height/hash mappings",
Run: func(c *cobra.Command, args []string) {
scanHeaders := lead.NewScanHeaders()
scanHeaders.Rescan = true
log.Println("Starting full header rescan from genesis...")
if err := scanHeaders.Run(); err != nil {
log.Fatalf("error rescanning headers; %v", err)
}
},
}
8 changes: 4 additions & 4 deletions cmd/peer/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var listCmd = &cobra.Command{
if len(args) > 0 {
shard = jutil.GetUInt32FromString(args[0])
}
peers, err := item.GetPeers(shard, nil)
peers, err := item.GetPeers(cmd.Context(), shard, nil)
if err != nil {
log.Fatalf("fatal error getting peers; %v", err)
}
Expand All @@ -34,7 +34,7 @@ var listFoundPeersCmd = &cobra.Command{
if len(args) > 0 {
shard = jutil.GetUInt32FromString(args[0])
}
foundPeers, err := item.GetFoundPeers(shard, nil, nil, 0)
foundPeers, err := item.GetFoundPeers(cmd.Context(), shard, nil, nil, 0)
if err != nil {
log.Fatalf("fatal error getting found peers; %v", err)
}
Expand All @@ -53,7 +53,7 @@ var listPeerFoundsCmd = &cobra.Command{
if len(args) > 0 {
shard = jutil.GetUInt32FromString(args[0])
}
foundPeers, err := item.GetPeerFounds(shard, nil)
foundPeers, err := item.GetPeerFounds(cmd.Context(), shard, nil)
if err != nil {
log.Fatalf("fatal error getting peer founds; %v", err)
}
Expand All @@ -80,7 +80,7 @@ var listAttemptsCmd = &cobra.Command{
log.Fatalf("error parsing host ip")
}
port := jutil.GetUInt16FromString(portString)
lastPeerConnection, err := item.GetPeerConnectionLast(ip, port)
lastPeerConnection, err := item.GetPeerConnectionLast(cmd.Context(), ip, port)
if err != nil {
log.Fatalf("fatal error last peer connection; %v", err)
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/serve/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"log"
)

var allCmd = &cobra.Command{
Use: "all",
var devCmd = &cobra.Command{
Use: "dev",
Run: func(c *cobra.Command, args []string) {
verbose, _ := c.Flags().GetBool(FlagVerbose)
server := run.NewServer(true, verbose)
server := run.NewServer(c.Context(), true, verbose)
log.Fatalf("fatal memo server error encountered (dev); %v", server.Run())
},
}
Expand All @@ -19,7 +19,7 @@ var liveCmd = &cobra.Command{
Use: "live",
Run: func(c *cobra.Command, args []string) {
verbose, _ := c.Flags().GetBool(FlagVerbose)
server := run.NewServer(false, verbose)
server := run.NewServer(c.Context(), false, verbose)
log.Fatalf("fatal memo server error encountered (live); %v", server.Run())
},
}
2 changes: 1 addition & 1 deletion cmd/serve/lead.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var leadCmd = &cobra.Command{
Short: "lead",
Run: func(c *cobra.Command, args []string) {
verbose, _ := c.Flags().GetBool(FlagVerbose)
p := lead.NewProcessor(verbose)
p := lead.NewProcessor(c.Context(), verbose)
log.Fatalf("fatal error running lead processor; %v", p.Run())
},
}
4 changes: 2 additions & 2 deletions cmd/serve/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ var serveCmd = &cobra.Command{
}

func GetCommand() *cobra.Command {
allCmd.Flags().BoolP(FlagVerbose, "v", false, "Additional logging")
devCmd.Flags().BoolP(FlagVerbose, "v", false, "Additional logging")
liveCmd.Flags().BoolP(FlagVerbose, "v", false, "Additional logging")
leadCmd.Flags().BoolP(FlagVerbose, "v", false, "Additional logging")
networkCmd.Flags().BoolP(FlagVerbose, "v", false, "Additional logging")
shardCmd.Flags().BoolP(FlagVerbose, "v", false, "Additional logging")
serveCmd.AddCommand(
allCmd,
devCmd,
liveCmd,
dbCmd,
adminCmd,
Expand Down
Loading