From df14d264be75a9c8246af67c06fa6c06bb928879 Mon Sep 17 00:00:00 2001 From: swarnabhasinha Date: Thu, 27 Nov 2025 12:35:05 +0530 Subject: [PATCH 1/4] fix: refactor grpc_p2p_client to resolve multiple main() conflict --- .gitignore | 17 + docs/guide.md | 8 +- .../multi-publish/main.go} | 138 +---- grpc_p2p_client/cmd/multi-subscribe/main.go | 178 +++++++ grpc_p2p_client/cmd/single/main.go | 169 ++++++ grpc_p2p_client/go.sum | 46 ++ grpc_p2p_client/p2p_client.go | 219 -------- .../p2p_client_multi_streams_subscribe.go | 481 ------------------ grpc_p2p_client/shared/types.go | 19 + grpc_p2p_client/shared/utils.go | 244 +++++++++ 10 files changed, 687 insertions(+), 832 deletions(-) rename grpc_p2p_client/{p2p_client_multi_streams_publish.go => cmd/multi-publish/main.go} (53%) create mode 100644 grpc_p2p_client/cmd/multi-subscribe/main.go create mode 100644 grpc_p2p_client/cmd/single/main.go delete mode 100644 grpc_p2p_client/p2p_client.go delete mode 100644 grpc_p2p_client/p2p_client_multi_streams_subscribe.go create mode 100644 grpc_p2p_client/shared/types.go create mode 100644 grpc_p2p_client/shared/utils.go diff --git a/.gitignore b/.gitignore index 6b2e388..affe363 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,20 @@ .env .idea/* identity + +# macOS +.DS_Store + + +# Build binaries +grpc_p2p_client/p2p-client +grpc_p2p_client/p2p-multi-publish +grpc_p2p_client/p2p-multi-subscribe + +# Test files +grpc_p2p_client/test-ips.txt +grpc_p2p_client/remote-ips.txt + +# Output files +*.txt +!grpc_p2p_client/local.ips.tsv diff --git a/docs/guide.md b/docs/guide.md index aea3c64..fc51d25 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -268,8 +268,8 @@ The `.env.example` file contains: ```bash BOOTSTRAP_PEER_ID=12D3KooWD5RtEPmMR9Yb2ku5VuxqK7Yj1Y5Gv8DmffJ6Ei8maU44 CLUSTER_ID=docker-dev-cluster -PROXY_VERSION=v0.0.1-rc13 -P2P_NODE_VERSION=v0.0.1-rc13 +PROXY_VERSION=v0.0.1-rc16 +P2P_NODE_VERSION=v0.0.1-rc16 ``` **Variables explained:** @@ -572,7 +572,7 @@ curl http://localhost:8081/api/v1/version ```json { - "version": "v0.0.1-rc13", + "version": "v0.0.1-rc16", "commit_hash": "8f3057d" } ``` @@ -1081,7 +1081,7 @@ response: ```json { - "version": "v0.0.1-rc13", + "version": "v0.0.1-rc16", "commit_hash": "8f3057d" } ``` diff --git a/grpc_p2p_client/p2p_client_multi_streams_publish.go b/grpc_p2p_client/cmd/multi-publish/main.go similarity index 53% rename from grpc_p2p_client/p2p_client_multi_streams_publish.go rename to grpc_p2p_client/cmd/multi-publish/main.go index c43d713..d54fb3f 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_publish.go +++ b/grpc_p2p_client/cmd/multi-publish/main.go @@ -1,12 +1,10 @@ package main import ( - "bufio" "context" "crypto/rand" "crypto/sha256" "encoding/hex" - "encoding/json" "flag" "fmt" "log" @@ -14,43 +12,22 @@ import ( mathrand "math/rand" "os" "os/signal" - "strings" "sync" - "sync/atomic" "syscall" "time" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" protobuf "p2p_client/grpc" -) + "p2p_client/shared" -// P2PMessage represents a message structure used in P2P communication -type P2PMessage struct { - MessageID string // Unique identifier for the message - Topic string // Topic name where the message was published - Message []byte // Actual message data - SourceNodeID string // ID of the node that sent the message (we don't need it in future, it is just for debug purposes) -} - -// Command possible operation that sidecar may perform with p2p node -type Command int32 - -const ( - CommandUnknown Command = iota - CommandPublishData - CommandSubscribeToTopic - CommandUnSubscribeToTopic + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) var ( - topic = flag.String("topic", "", "topic name") - - // optional: number of messages to publish (for stress testing or batch sending) + topic = flag.String("topic", "", "topic name") count = flag.Int("count", 1, "number of messages to publish") poisson = flag.Bool("poisson", false, "Enable Poisson arrival") dataSize = flag.Int("datasize", 100, "size of random of messages to publish") - // optional: sleep duration between publishes sleep = flag.Duration("sleep", 50*time.Millisecond, "optional delay between publishes (e.g., 1s, 500ms)") ipfile = flag.String("ipfile", "", "file with a list of IP addresses") startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") @@ -64,7 +41,7 @@ func main() { log.Fatalf("−topic is required") } - _ips, err := readIPsFromFile(*ipfile) + _ips, err := shared.ReadIPsFromFile(*ipfile) if err != nil { fmt.Printf("Error: %v\n", err) return @@ -84,21 +61,19 @@ func main() { cancel() }() - // Buffered channel to prevent blocking dataCh := make(chan string, 100) *dataSize = int(float32(*dataSize) / 2.0) var done chan bool var wg sync.WaitGroup - // Start writing the has of the published data + if *output != "" { done = make(chan bool) go func() { header := fmt.Sprintf("sender\tsize\tsha256(msg)") - go writeHashToFile(dataCh, done, *output, header) + go shared.WriteToFile(ctx, dataCh, done, *output, header) }() } - // Launch goroutines with synchronization for _, ip := range ips { wg.Add(1) go func(ip string) { @@ -112,11 +87,9 @@ func main() { if done != nil { <-done } - } func sendMessages(ctx context.Context, ip string, datasize int, write bool, dataCh chan<- string) error { - // connect with simple gRPC settings for i := 0; i < *count; i++ { select { case <-ctx.Done(): @@ -135,11 +108,9 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data if err != nil { log.Fatalf("failed to connect to node %v", err) } - //defer conn.Close() println(fmt.Sprintf("Connected to node at: %s…", ip)) client := protobuf.NewCommandStreamClient(conn) - stream, err := client.ListenCommands(ctx) if err != nil { @@ -147,18 +118,15 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data } start := time.Now() - var data []byte - //currentTime := time.Now().UnixNano() randomBytes := make([]byte, datasize) if _, err := rand.Read(randomBytes); err != nil { return fmt.Errorf("[%s] failed to generate random bytes: %w", ip, err) - } randomSuffix := hex.EncodeToString(randomBytes) - data = []byte(fmt.Sprintf("%s-%s", ip, randomSuffix)) + data := []byte(fmt.Sprintf("%s-%s", ip, randomSuffix)) pubReq := &protobuf.Request{ - Command: int32(CommandPublishData), + Command: int32(shared.CommandPublishData), Topic: *topic, Data: data, } @@ -169,11 +137,10 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data fmt.Printf("Published data size %d\n", len(data)) elapsed := time.Since(start) - hash := sha256.Sum256(data) hexHashString := hex.EncodeToString(hash[:]) var dataToSend string - if write == true { + if write { dataToSend = fmt.Sprintf("%s\t%d\t%s", ip, len(data), hexHashString) dataCh <- dataToSend } @@ -186,95 +153,10 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data time.Sleep(waitTime) } else { time.Sleep(*sleep) - } conn.Close() } return nil - -} - -func readIPsFromFile(filename string) ([]string, error) { - file, err := os.Open(filename) - if err != nil { - return nil, fmt.Errorf("failed to open file: %w", err) - } - defer file.Close() - - var ips []string - scanner := bufio.NewScanner(file) - - for scanner.Scan() { - line := strings.TrimSpace(scanner.Text()) - // Skip empty lines and comments - if line == "" || strings.HasPrefix(line, "#") { - continue - } - ips = append(ips, line) - } - - if err := scanner.Err(); err != nil { - return nil, fmt.Errorf("error reading file: %w", err) - } - - return ips, nil -} - -func handleResponse(resp *protobuf.Response, counter *int32) { - switch resp.GetCommand() { - case protobuf.ResponseType_Message: - var p2pMessage P2PMessage - if err := json.Unmarshal(resp.GetData(), &p2pMessage); err != nil { - log.Printf("Error unmarshalling message: %v", err) - return - } - n := atomic.AddInt32(counter, 1) - - currentTime := time.Now().UnixNano() - messageSize := len(p2pMessage.Message) - - //fmt.Printf("Recv message: [%d] [%d %d] %s\n\n",n, currentTime, messageSize, string(p2pMessage.Message)[0:100]) - fmt.Printf("Recv message: [%d] [%d %d] %s\n\n", n, currentTime, messageSize, string(p2pMessage.Message)) - default: - log.Println("Unknown response command:", resp.GetCommand()) - } -} - -func headHex(b []byte, n int) string { - if len(b) > n { - b = b[:n] - } - return hex.EncodeToString(b) -} - -func writeHashToFile(dataCh <-chan string, done chan<- bool, filename string, header string) { - file, err := os.Create(filename) - if err != nil { - log.Fatal(err) - } - defer file.Close() - - writer := bufio.NewWriter(file) - defer writer.Flush() - - // write the header - if header != "" { - _, err := writer.WriteString(header + "\n") - if err != nil { - log.Printf("Write error: %v", err) - } - } - - // Process until channel is closed - for data := range dataCh { - _, err := writer.WriteString(data + "\n") - if err != nil { - log.Printf("Write error: %v", err) - } - } - done <- true - fmt.Println("All data flushed to disk") - } diff --git a/grpc_p2p_client/cmd/multi-subscribe/main.go b/grpc_p2p_client/cmd/multi-subscribe/main.go new file mode 100644 index 0000000..fe3283b --- /dev/null +++ b/grpc_p2p_client/cmd/multi-subscribe/main.go @@ -0,0 +1,178 @@ +package main + +import ( + "context" + "flag" + "fmt" + "io" + "log" + "math" + "os" + "os/signal" + "sync" + "sync/atomic" + "syscall" + + protobuf "p2p_client/grpc" + "p2p_client/shared" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +var ( + topic = flag.String("topic", "", "topic name") + ipfile = flag.String("ipfile", "", "file with a list of IP addresses") + startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") + endIdx = flag.Int("end-index", 10000, "index-1") + outputTrace = flag.String("output-trace", "", "file to write the outgoing data hashes") + outputData = flag.String("output-data", "", "file to write the outgoing data hashes") +) + +func main() { + flag.Parse() + if *topic == "" { + log.Fatalf("−topic is required") + } + + _ips, err := shared.ReadIPsFromFile(*ipfile) + if err != nil { + fmt.Printf("Error: %v\n", err) + return + } + fmt.Printf("numip %d index %d\n", len(_ips), *endIdx) + *endIdx = min(len(_ips), *endIdx) + if *startIdx < 0 || *startIdx >= *endIdx || *startIdx >= len(_ips) { + log.Fatalf("invalid index range: start-index=%d end-index=%d (num IPs=%d)", *startIdx, *endIdx, len(_ips)) + } + + ips := _ips[*startIdx:*endIdx] + fmt.Printf("Found %d IPs: %v\n", len(ips), ips) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + fmt.Println("\nShutting down gracefully…") + cancel() + }() + + dataCh := make(chan string, 100) + traceCh := make(chan string, 100) + var dataDone chan bool + var traceDone chan bool + + var wg sync.WaitGroup + if *outputData != "" { + dataDone = make(chan bool) + go func() { + header := fmt.Sprintf("receiver\tsender\tsize\tsha256(msg)") + go shared.WriteToFile(ctx, dataCh, dataDone, *outputData, header) + }() + } + + if *outputTrace != "" { + traceDone = make(chan bool) + go func() { + header := "" + go shared.WriteToFile(ctx, traceCh, traceDone, *outputTrace, header) + }() + } + + for _, ip := range ips { + wg.Add(1) + go func(ip string) { + defer wg.Done() + receiveMessages(ctx, ip, *outputData != "", dataCh, *outputTrace != "", traceCh) + }(ip) + } + + wg.Wait() + close(dataCh) + close(traceCh) + if dataDone != nil { + <-dataDone + } + if traceDone != nil { + <-traceDone + } +} + +func receiveMessages(ctx context.Context, ip string, writeData bool, dataCh chan<- string, + writeTrace bool, traceCh chan<- string) error { + + select { + case <-ctx.Done(): + log.Printf("[%s] context canceled, stopping", ip) + return ctx.Err() + default: + } + + conn, err := grpc.NewClient(ip, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(math.MaxInt), + grpc.MaxCallSendMsgSize(math.MaxInt), + ), + ) + + fmt.Printf("IP - %v\n", ip) + if err != nil { + log.Fatalf("failed to connect to node %v", err) + } + defer conn.Close() + + client := protobuf.NewCommandStreamClient(conn) + stream, err := client.ListenCommands(ctx) + if err != nil { + log.Fatalf("ListenCommands: %v", err) + } + + println(fmt.Sprintf("Connected to node at: %s…", ip)) + println(fmt.Sprintf("Trying to subscribe to topic %s…", *topic)) + subReq := &protobuf.Request{ + Command: int32(shared.CommandSubscribeToTopic), + Topic: *topic, + } + if err := stream.Send(subReq); err != nil { + log.Fatalf("send subscribe: %v", err) + } + fmt.Printf("Subscribed to topic %q, waiting for messages…\n", *topic) + + var receivedCount int32 + msgChan := make(chan *protobuf.Response, 10000) + + go func() { + for { + resp, err := stream.Recv() + if err == io.EOF { + close(msgChan) + return + } + if err != nil { + log.Printf("recv error: %v", err) + close(msgChan) + return + } + msgChan <- resp + } + }() + + for { + select { + case <-ctx.Done(): + log.Printf("Context canceled. Total messages received: %d", atomic.LoadInt32(&receivedCount)) + return nil + case resp, ok := <-msgChan: + if !ok { + log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount)) + return nil + } + go func(resp *protobuf.Response) { + shared.HandleResponseWithTracking(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh) + }(resp) + } + } +} diff --git a/grpc_p2p_client/cmd/single/main.go b/grpc_p2p_client/cmd/single/main.go new file mode 100644 index 0000000..61131e2 --- /dev/null +++ b/grpc_p2p_client/cmd/single/main.go @@ -0,0 +1,169 @@ +package main + +import ( + "context" + "crypto/rand" + "encoding/hex" + "flag" + "fmt" + "io" + "log" + "math" + "os" + "os/signal" + "sync/atomic" + "syscall" + "time" + + protobuf "p2p_client/grpc" + "p2p_client/shared" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +var ( + addr = flag.String("addr", "localhost:33212", "sidecar gRPC address") + mode = flag.String("mode", "subscribe", "mode: subscribe | publish") + topic = flag.String("topic", "", "topic name") + message = flag.String("msg", "", "message data (for publish)") + count = flag.Int("count", 1, "number of messages to publish (for publish mode)") + sleep = flag.Duration("sleep", 0, "optional delay between publishes (e.g., 1s, 500ms)") +) + +func main() { + flag.Parse() + if *topic == "" { + log.Fatalf("−topic is required") + } + + println(fmt.Sprintf("Connecting to node at: %s…", *addr)) + conn, err := grpc.NewClient(*addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(math.MaxInt), + grpc.MaxCallSendMsgSize(math.MaxInt), + ), + ) + if err != nil { + log.Fatalf("failed to connect to node %v", err) + } + defer conn.Close() + + client := protobuf.NewCommandStreamClient(conn) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.ListenCommands(ctx) + if err != nil { + log.Fatalf("ListenCommands: %v", err) + } + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + fmt.Println("\nshutting down…") + cancel() + os.Exit(0) + }() + + switch *mode { + case "subscribe": + subscribe(ctx, stream, *topic) + case "publish": + publish(ctx, stream, *topic, *message, *count, *sleep) + default: + log.Fatalf("unknown mode %q", *mode) + } +} + +func subscribe(ctx context.Context, stream protobuf.CommandStream_ListenCommandsClient, topic string) { + println(fmt.Sprintf("Trying to subscribe to topic %s…", topic)) + subReq := &protobuf.Request{ + Command: int32(shared.CommandSubscribeToTopic), + Topic: topic, + } + if err := stream.Send(subReq); err != nil { + log.Fatalf("send subscribe: %v", err) + } + fmt.Printf("Subscribed to topic %q, waiting for messages…\n", topic) + + var receivedCount int32 + msgChan := make(chan *protobuf.Response, 10000) + + go func() { + for { + resp, err := stream.Recv() + if err == io.EOF { + close(msgChan) + return + } + if err != nil { + log.Printf("recv error: %v", err) + close(msgChan) + return + } + msgChan <- resp + } + }() + + for { + select { + case <-ctx.Done(): + log.Printf("Context canceled. Total messages received: %d", atomic.LoadInt32(&receivedCount)) + return + case resp, ok := <-msgChan: + if !ok { + log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount)) + return + } + go func(resp *protobuf.Response) { + shared.HandleResponse(resp, &receivedCount) + }(resp) + } + } +} + +func publish(ctx context.Context, stream protobuf.CommandStream_ListenCommandsClient, + topic, msg string, count int, sleep time.Duration) { + + if msg == "" && count == 1 { + log.Fatalf("−msg is required in publish mode") + } + + for i := 0; i < count; i++ { + start := time.Now() + var data []byte + currentTime := time.Now().UnixNano() + + if count == 1 { + prefix := fmt.Sprintf("[%d %d] ", currentTime, len(msg)) + prefixBytes := []byte(prefix) + data = append(prefixBytes, msg...) + } else { + randomBytes := make([]byte, 4) + if _, err := rand.Read(randomBytes); err != nil { + log.Fatalf("failed to generate random bytes: %v", err) + } + randomSuffix := hex.EncodeToString(randomBytes) + data = []byte(fmt.Sprintf("[%d %d] %d - %s XXX", currentTime, len(randomSuffix), i+1, randomSuffix)) + } + + pubReq := &protobuf.Request{ + Command: int32(shared.CommandPublishData), + Topic: topic, + Data: data, + } + if err := stream.Send(pubReq); err != nil { + log.Fatalf("send publish: %v", err) + } + + elapsed := time.Since(start) + fmt.Printf("Published %q to %q (took %v)\n", string(data), topic, elapsed) + + if sleep > 0 { + time.Sleep(sleep) + } + } +} diff --git a/grpc_p2p_client/go.sum b/grpc_p2p_client/go.sum index e44889f..dd15b76 100644 --- a/grpc_p2p_client/go.sum +++ b/grpc_p2p_client/go.sum @@ -1,3 +1,9 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y= +github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnNEcHYvcCuK6dPZSg= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -10,10 +16,42 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/ipfs/go-cid v0.5.0 h1:goEKKhaGm0ul11IHA7I6p1GmKz8kEYniqFopaB5Otwg= +github.com/ipfs/go-cid v0.5.0/go.mod h1:0L7vmeNXpQpUS9vt+yEARkJ8rOg43DF3iPgn4GIN0mk= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY= +github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8= +github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= +github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= +github.com/libp2p/go-libp2p v0.39.1 h1:1Ur6rPCf3GR+g8jkrnaQaM0ha2IGespsnNlCqJLLALE= +github.com/libp2p/go-libp2p v0.39.1/go.mod h1:3zicI8Lp7Isun+Afo/JOACUbbJqqR2owK6RQWFsVAbI= github.com/libp2p/go-libp2p-pubsub v0.14.2 h1:nT5lFHPQOFJcp9CW8hpKtvbpQNdl2udJuzLQWbgRum8= github.com/libp2p/go-libp2p-pubsub v0.14.2/go.mod h1:MKPU5vMI8RRFyTP0HfdsF9cLmL1nHAeJm44AxJGJx44= +github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= +github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= +github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= +github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= +github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aGkbLYxPE= +github.com/multiformats/go-base32 v0.1.0/go.mod h1:Kj3tFY6zNr+ABYMqeUNeGvkIC/UYgtWibDcT0rExnbI= +github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0= +github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4= +github.com/multiformats/go-multiaddr v0.14.0 h1:bfrHrJhrRuh/NXH5mCnemjpbGjzRw/b+tJFOD41g2tU= +github.com/multiformats/go-multiaddr v0.14.0/go.mod h1:6EkVAxtznq2yC3QT5CM1UTAwG0GTP3EWAIcjHuzQ+r4= +github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= +github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk= +github.com/multiformats/go-multicodec v0.9.0 h1:pb/dlPnzee/Sxv/j4PmkDRxCOi3hXTz3IbPKOXWJkmg= +github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k= +github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= +github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= +github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= +github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= @@ -31,6 +69,10 @@ go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= +golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc= +golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -65,3 +107,7 @@ google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok= google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc= google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +lukechampine.com/blake3 v1.3.0 h1:sJ3XhFINmHSrYCgl958hscfIa3bw8x4DqMP3u1YvoYE= +lukechampine.com/blake3 v1.3.0/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k= diff --git a/grpc_p2p_client/p2p_client.go b/grpc_p2p_client/p2p_client.go deleted file mode 100644 index 0a6cdd8..0000000 --- a/grpc_p2p_client/p2p_client.go +++ /dev/null @@ -1,219 +0,0 @@ -package main - -import ( - "context" - "crypto/rand" - "encoding/hex" - "encoding/json" - "flag" - "fmt" - "io" - "log" - "math" - "os" - "os/signal" - "sync/atomic" - "syscall" - "time" - - protobuf "p2p_client/grpc" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -// P2PMessage represents a message structure used in P2P communication -type P2PMessage struct { - MessageID string // Unique identifier for the message - Topic string // Topic name where the message was published - Message []byte // Actual message data - SourceNodeID string // ID of the node that sent the message (we don't need it in future, it is just for debug purposes) -} - -// Command possible operation that sidecar may perform with p2p node -type Command int32 - -const ( - CommandUnknown Command = iota - CommandPublishData - CommandSubscribeToTopic - CommandUnSubscribeToTopic -) - -var ( - addr = flag.String("addr", "localhost:33212", "sidecar gRPC address") - mode = flag.String("mode", "subscribe", "mode: subscribe | publish") - topic = flag.String("topic", "", "topic name") - message = flag.String("msg", "", "message data (for publish)") - - // optional: number of messages to publish (for stress testing or batch sending) - count = flag.Int("count", 1, "number of messages to publish (for publish mode)") - // optional: sleep duration between publishes - sleep = flag.Duration("sleep", 0, "optional delay between publishes (e.g., 1s, 500ms)") -) - -func main() { - flag.Parse() - if *topic == "" { - log.Fatalf("−topic is required") - } - - // connect with simple gRPC settings - println(fmt.Sprintf("Connecting to node at: %s…", *addr)) - conn, err := grpc.NewClient(*addr, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(math.MaxInt), - grpc.MaxCallSendMsgSize(math.MaxInt), - ), - ) - if err != nil { - log.Fatalf("failed to connect to node %v", err) - } - defer conn.Close() - - client := protobuf.NewCommandStreamClient(conn) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - stream, err := client.ListenCommands(ctx) - if err != nil { - log.Fatalf("ListenCommands: %v", err) - } - - // intercept CTRL+C for clean shutdown - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - <-c - fmt.Println("\nshutting down…") - cancel() - os.Exit(0) - }() - - switch *mode { - case "subscribe": - println(fmt.Sprintf("Trying to subscribe to topic %s…", *topic)) - subReq := &protobuf.Request{ - Command: int32(CommandSubscribeToTopic), - Topic: *topic, - } - if err := stream.Send(subReq); err != nil { - log.Fatalf("send subscribe: %v", err) - } - fmt.Printf("Subscribed to topic %q, waiting for messages…\n", *topic) - - var receivedCount int32 - msgChan := make(chan *protobuf.Response, 10000) - - // recv goroutine - go func() { - for { - resp, err := stream.Recv() - if err == io.EOF { - close(msgChan) - return - } - if err != nil { - log.Printf("recv error: %v", err) - close(msgChan) - return - } - msgChan <- resp - } - }() - - // message handler loop - for { - select { - case <-ctx.Done(): - log.Printf("Context canceled. Total messages received: %d", atomic.LoadInt32(&receivedCount)) - return - case resp, ok := <-msgChan: - if !ok { - log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount)) - return - } - go func(resp *protobuf.Response) { - handleResponse(resp, &receivedCount) - }(resp) - } - } - - case "publish": - if *message == "" && *count == 1 { - log.Fatalf("−msg is required in publish mode") - } - for i := 0; i < *count; i++ { - start := time.Now() - var data []byte - currentTime := time.Now().UnixNano() - if *count == 1 { - // Create the prefix string and convert it to bytes - prefix := fmt.Sprintf("[%d %d] ", currentTime, len(*message)) - prefixBytes := []byte(prefix) - // Prepend the prefixBytes to your existing data - data = append(prefixBytes, *message...) - } else { - // generate secure random 4-byte hex - randomBytes := make([]byte, 4) - if _, err := rand.Read(randomBytes); err != nil { - log.Fatalf("failed to generate random bytes: %v", err) - } - randomSuffix := hex.EncodeToString(randomBytes) - data = []byte(fmt.Sprintf("[%d %d] %d - %s XXX", currentTime, len(randomSuffix), i+1, randomSuffix)) - } - - pubReq := &protobuf.Request{ - Command: int32(CommandPublishData), - Topic: *topic, - Data: data, - } - if err := stream.Send(pubReq); err != nil { - log.Fatalf("send publish: %v", err) - } - - elapsed := time.Since(start) - fmt.Printf("Published %q to %q (took %v)\n", string(data), *topic, elapsed) - - if *sleep > 0 { - time.Sleep(*sleep) - } - } - - default: - log.Fatalf("unknown mode %q", *mode) - } -} - -func handleResponse(resp *protobuf.Response, counter *int32) { - switch resp.GetCommand() { - case protobuf.ResponseType_Message: - var p2pMessage P2PMessage - if err := json.Unmarshal(resp.GetData(), &p2pMessage); err != nil { - log.Printf("Error unmarshalling message: %v", err) - return - } - messageSize := len(p2pMessage.Message) - n := atomic.AddInt32(counter, 1) - - //fmt.Printf("Recv message: [%d] [%d %d] %s\n\n",n, currentTime, messageSize, string(p2pMessage.Message)[0:100]) - fmt.Printf("Recv message: [%d] [%d] %s\n\n", n, messageSize, string(p2pMessage.Message)) - case protobuf.ResponseType_MessageTraceGossipSub: - // Note: These trace handlers are not implemented in this file - log.Printf("GossipSub trace received but handler not implemented") - case protobuf.ResponseType_MessageTraceMumP2P: - // Note: These trace handlers are not implemented in this file - log.Printf("MumP2P trace received but handler not implemented") - case protobuf.ResponseType_Unknown: - default: - log.Println("Unknown response command:", resp.GetCommand()) - } -} - -func headHex(b []byte, n int) string { - if len(b) > n { - b = b[:n] - } - return hex.EncodeToString(b) -} diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go deleted file mode 100644 index 79c0d72..0000000 --- a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go +++ /dev/null @@ -1,481 +0,0 @@ -package main - -import ( - "bufio" - "context" - "crypto/sha256" - "encoding/hex" - "encoding/json" - "flag" - "fmt" - "github.com/mr-tron/base58" - "io" - "log" - "math" - "os" - "os/signal" - "strings" - "sync" - "sync/atomic" - "syscall" - - "github.com/gogo/protobuf/proto" - pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" - "github.com/libp2p/go-libp2p/core/peer" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - protobuf "p2p_client/grpc" - optsub "p2p_client/grpc/mump2p_trace" -) - -// P2PMessage represents a message structure used in P2P communication -type P2PMessage struct { - MessageID string // Unique identifier for the message - Topic string // Topic name where the message was published - Message []byte // Actual message data - SourceNodeID string // ID of the node that sent the message (we don't need it in future, it is just for debug purposes) -} - -// Command possible operation that sidecar may perform with p2p node -type Command int32 - -const ( - CommandUnknown Command = iota - CommandPublishData - CommandSubscribeToTopic - CommandUnSubscribeToTopic -) - -var ( - topic = flag.String("topic", "", "topic name") - ipfile = flag.String("ipfile", "", "file with a list of IP addresses") - startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") - endIdx = flag.Int("end-index", 10000, "index-1") - outputTrace = flag.String("output-trace", "", "file to write the outgoing data hashes") - outputData = flag.String("output-data", "", "file to write the outgoing data hashes") -) - -func main() { - flag.Parse() - if *topic == "" { - log.Fatalf("−topic is required") - } - - _ips, err := readIPsFromFile(*ipfile) - if err != nil { - fmt.Printf("Error: %v\n", err) - return - } - fmt.Printf("numip %d index %d\n", len(_ips), *endIdx) - *endIdx = min(len(_ips), *endIdx) - if *startIdx < 0 || *startIdx >= *endIdx || *startIdx >= len(_ips) { - log.Fatalf("invalid index range: start-index=%d end-index=%d (num IPs=%d)", *startIdx, *endIdx, len(_ips)) - } - - ips := _ips[*startIdx:*endIdx] - fmt.Printf("Found %d IPs: %v\n", len(ips), ips) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - <-c - fmt.Println("\nShutting down gracefully…") - cancel() - }() - - // Buffered channel to prevent blocking - dataCh := make(chan string, 100) - traceCh := make(chan string, 100) - var dataDone chan bool - var traceDone chan bool - - // Launch goroutines with synchronization - var wg sync.WaitGroup - if *outputData != "" { - dataDone = make(chan bool) - go func() { - header := fmt.Sprintf("receiver\tsender\tsize\tsha256(msg)") - go writeToFile(ctx, dataCh, dataDone, *outputData, header) - }() - } - - if *outputTrace != "" { - traceDone = make(chan bool) - go func() { - header := "" //fmt.Sprintf("sender\tsize\tsha256(msg)") - go writeToFile(ctx, traceCh, traceDone, *outputTrace, header) - }() - } - - for _, ip := range ips { - wg.Add(1) - go func(ip string) { - defer wg.Done() - receiveMessages(ctx, ip, *outputData != "", dataCh, *outputTrace != "", traceCh) - }(ip) - } - - wg.Wait() - close(dataCh) - close(traceCh) - if dataDone != nil { - <-dataDone - } - if traceDone != nil { - <-traceDone - } -} - -func receiveMessages(ctx context.Context, ip string, writeData bool, dataCh chan<- string, - writeTrace bool, traceCh chan<- string) error { - // connect with simple gRPC settings - //fmt.Println("Starting ", ip) - select { - case <-ctx.Done(): - log.Printf("[%s] context canceled, stopping", ip) - return ctx.Err() - default: - } - - conn, err := grpc.NewClient(ip, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(math.MaxInt), - grpc.MaxCallSendMsgSize(math.MaxInt), - ), - ) - - fmt.Printf("IP - %v\n", ip) - if err != nil { - log.Fatalf("failed to connect to node %v", err) - } - defer conn.Close() - - client := protobuf.NewCommandStreamClient(conn) - - stream, err := client.ListenCommands(ctx) - if err != nil { - log.Fatalf("ListenCommands: %v", err) - } - - println(fmt.Sprintf("Connected to node at: %s…", ip)) - println(fmt.Sprintf("Trying to subscribe to topic %s…", *topic)) - subReq := &protobuf.Request{ - Command: int32(CommandSubscribeToTopic), - Topic: *topic, - } - if err := stream.Send(subReq); err != nil { - log.Fatalf("send subscribe: %v", err) - } - fmt.Printf("Subscribed to topic %q, waiting for messages…\n", *topic) - - var receivedCount int32 - msgChan := make(chan *protobuf.Response, 10000) - - // recv goroutine - go func() { - for { - resp, err := stream.Recv() - if err == io.EOF { - close(msgChan) - return - } - if err != nil { - log.Printf("recv error: %v", err) - close(msgChan) - return - } - msgChan <- resp - } - }() - - // message handler loop - for { - select { - case <-ctx.Done(): - log.Printf("Context canceled. Total messages received: %d", atomic.LoadInt32(&receivedCount)) - return nil - case resp, ok := <-msgChan: - if !ok { - log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount)) - return nil - } - go func(resp *protobuf.Response) { - handleResponse(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh) - }(resp) - } - } - -} - -func readIPsFromFile(filename string) ([]string, error) { - file, err := os.Open(filename) - if err != nil { - return nil, fmt.Errorf("failed to open file: %w", err) - } - defer file.Close() - - var ips []string - scanner := bufio.NewScanner(file) - - for scanner.Scan() { - line := strings.TrimSpace(scanner.Text()) - // Skip empty lines and comments - if line == "" || strings.HasPrefix(line, "#") { - continue - } - ips = append(ips, line) - } - - if err := scanner.Err(); err != nil { - return nil, fmt.Errorf("error reading file: %w", err) - } - - return ips, nil -} - -func handleResponse(ip string, resp *protobuf.Response, counter *int32, - writedata bool, dataCh chan<- string, writetrace bool, traceCh chan<- string) { - - switch resp.GetCommand() { - case protobuf.ResponseType_Message: - var p2pMessage P2PMessage - if err := json.Unmarshal(resp.GetData(), &p2pMessage); err != nil { - log.Printf("Error unmarshalling message: %v", err) - return - } - _ = atomic.AddInt32(counter, 1) - - hash := sha256.Sum256(p2pMessage.Message) - hexHashString := hex.EncodeToString(hash[:]) - - parts := strings.Split(string(p2pMessage.Message), "-") - if len(parts) > 0 { - publisher := parts[0] - var dataToSend string - if writedata == true { - dataToSend = fmt.Sprintf("%s\t%s\t%d\t%s", ip, publisher, len(p2pMessage.Message), hexHashString) - dataCh <- dataToSend - } - } - - //fmt.Printf("Recv message: %s %d %s\n", ip, messageSize, string(p2pMessage.Message)) - case protobuf.ResponseType_MessageTraceMumP2P: - handleOptimumP2PTrace(resp.GetData(), writetrace, traceCh) - case protobuf.ResponseType_MessageTraceGossipSub: - handleGossipSubTrace(resp.GetData(), writetrace, traceCh) - default: - log.Println("Unknown response command:", resp.GetCommand()) - } -} - -func headHex(b []byte, n int) string { - if len(b) > n { - b = b[:n] - } - return hex.EncodeToString(b) -} - -func handleGossipSubTrace(data []byte, writetrace bool, traceCh chan<- string) { - evt := &pubsubpb.TraceEvent{} - if err := proto.Unmarshal(data, evt); err != nil { - fmt.Printf("[TRACE] GossipSub decode error: %v raw=%dB head=%s\n", - err, len(data), headHex(data, 64)) - return - } - typeStr := optsub.TraceEvent_Type_name[int32(evt.GetType())] - //fmt.Printf("[TRACE] GossipSub type=%s ts=%s size=%dB\n", evt.GetType().String(), ts, len(data)) - //fmt.Printf("[TRACE] GossipSub JSON (%dB): %s\n", len(jb), string(jb)) - - rawBytes := []byte{} - var peerID peer.ID - if evt.PeerID != nil { - rawBytes := []byte(evt.PeerID) - peerID = peer.ID(rawBytes) - // fmt.Printf("peerID: %s\n", peerID) - } - - recvID := "" - if evt.DeliverMessage != nil && evt.DeliverMessage.ReceivedFrom != nil { - rawBytes = []byte(evt.DeliverMessage.ReceivedFrom) - recvID = base58.Encode(rawBytes) - // fmt.Printf("Receiv: %s\n", recvID) - } - - msgID := "" - topic := "" - if evt.DeliverMessage != nil { - rawBytes = []byte(evt.DeliverMessage.MessageID) - msgID = base58.Encode(rawBytes) - // fmt.Printf("MsgID: %s\n", msgID) - topic = string(*evt.DeliverMessage.Topic) - //fmt.Printf("Topic: %q\n", topic) - } - if evt.PublishMessage != nil { - rawBytes = []byte(evt.PublishMessage.MessageID) - msgID = base58.Encode(rawBytes) - //fmt.Printf("MsgID: %s\n", msgID) - topic = string(*evt.PublishMessage.Topic) - //fmt.Printf("Topic: %q\n", topic) - } - - timestamp := int64(0) - if evt.Timestamp != nil { - timestamp = *evt.Timestamp - // fmt.Printf("Timestamp: %d\n", timestamp) - } - - //jb, _ := json.Marshal(evt) - //fmt.Printf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) - if writetrace { - //dataToSend := fmt.Sprintf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) - dataToSend := fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp) - traceCh <- dataToSend - } else { - //fmt.Printf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) - fmt.Print("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp) - } - -} - -func handleOptimumP2PTrace(data []byte, writetrace bool, traceCh chan<- string) { - evt := &optsub.TraceEvent{} - if err := proto.Unmarshal(data, evt); err != nil { - fmt.Printf("[TRACE] OptimumP2P decode error: %v\n", err) - return - } - - // print type - typeStr := optsub.TraceEvent_Type_name[int32(evt.GetType())] - //fmt.Printf("[TRACE] OptimumP2P type=%s ts=%s size=%dB\n", typeStr, ts, len(data)) - //fmt.Printf("[TRACE] OptimumP2P type=%s msg_id=%x time=%d, recvr_id=%s, size=%dB\n", - // typeStr, evt.GetDuplicateShard().GetMessageID(), time.Unix(0, evt.GetTimestamp()), evt.GetPeerID(), len(data)) - - // if shard-related - /* - switch evt.GetType() { - case optsub.TraceEvent_NEW_SHARD: - fmt.Printf(" NEW_SHARD id=%x coeff=%x\n", evt.GetNewShard().GetMessageID(), evt.GetNewShard().GetCoefficients()) - case optsub.TraceEvent_DUPLICATE_SHARD: - fmt.Printf(" DUPLICATE_SHARD id=%x\n", evt.GetDuplicateShard().GetMessageID()) - case optsub.TraceEvent_UNHELPFUL_SHARD: - fmt.Printf(" UNHELPFUL_SHARD id=%x\n", evt.GetUnhelpfulShard().GetMessageID()) - case optsub.TraceEvent_UNNECESSARY_SHARD: - fmt.Printf(" UNNECESSARY_SHARD id=%x\n", evt.GetUnnecessaryShard().GetMessageID()) - } - */ - - /*if evt.PeerID != nil { - fmt.Printf("PeerID: %s\n", string(evt.PeerID)) - } - */ - - rawBytes := []byte{} - var peerID peer.ID - if evt.PeerID != nil { - rawBytes := []byte(evt.PeerID) - peerID = peer.ID(rawBytes) - // fmt.Printf("peerID: %s\n", peerID) - } - - recvID := "" - if evt.DeliverMessage != nil && evt.DeliverMessage.ReceivedFrom != nil { - rawBytes = []byte(evt.DeliverMessage.ReceivedFrom) - recvID = base58.Encode(rawBytes) - // fmt.Printf("Receiv: %s\n", recvID) - } - - if evt.NewShard != nil && evt.NewShard.ReceivedFrom != nil { - rawBytes = []byte(evt.NewShard.ReceivedFrom) - recvID = base58.Encode(rawBytes) - // fmt.Printf("Receiv: %s\n", recvID) - } - - msgID := "" - topic := "" - if evt.DeliverMessage != nil { - rawBytes = []byte(evt.DeliverMessage.MessageID) - msgID = base58.Encode(rawBytes) - // fmt.Printf("MsgID: %s\n", msgID) - topic = string(*evt.DeliverMessage.Topic) - //fmt.Printf("Topic: %q\n", topic) - } - if evt.PublishMessage != nil { - rawBytes = []byte(evt.PublishMessage.MessageID) - msgID = base58.Encode(rawBytes) - //fmt.Printf("MsgID: %s\n", msgID) - topic = string(*evt.PublishMessage.Topic) - //fmt.Printf("Topic: %q\n", topic) - } - if evt.NewShard != nil { - rawBytes = []byte(evt.NewShard.MessageID) - msgID = base58.Encode(rawBytes) - //fmt.Printf("MsgID: %s\n", msgID) - //fmt.Printf("Topic: %q\n", topic) - } - - timestamp := int64(0) - if evt.Timestamp != nil { - timestamp = *evt.Timestamp - // fmt.Printf("Timestamp: %d\n", timestamp) - } - - //jb, _ := json.Marshal(evt) - - if writetrace { - //dataToSend := fmt.Sprintf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) - // fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) - dataToSend := fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp) - traceCh <- dataToSend - } else { - //fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) - fmt.Print("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp) - } - /* - message_type <- systems information - message_id <- application layer - time_stamp <- event occuring the event publish, new shard, duplicate shard - receiver_id - sender_id - - */ - -} - -func writeToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, filename string, header string) { - file, err := os.Create(filename) - if err != nil { - log.Fatal(err) - } - defer file.Close() - - writer := bufio.NewWriter(file) - defer writer.Flush() - - // write the header - if header != "" { - _, err := writer.WriteString(header + "\n") - if err != nil { - log.Printf("Write error: %v", err) - } - } - - // Process until channel is closed - for data := range dataCh { - select { - case <-ctx.Done(): - return - default: - - } - _, err := writer.WriteString(data + "\n") - writer.Flush() - if err != nil { - log.Printf("Write error: %v", err) - } - } - done <- true - fmt.Println("All data flushed to disk") -} diff --git a/grpc_p2p_client/shared/types.go b/grpc_p2p_client/shared/types.go new file mode 100644 index 0000000..b4f5b95 --- /dev/null +++ b/grpc_p2p_client/shared/types.go @@ -0,0 +1,19 @@ +package shared + +// P2PMessage represents a message structure used in P2P communication +type P2PMessage struct { + MessageID string + Topic string + Message []byte + SourceNodeID string +} + +// Command represents possible operations that sidecar may perform with p2p node +type Command int32 + +const ( + CommandUnknown Command = iota + CommandPublishData + CommandSubscribeToTopic + CommandUnSubscribeToTopic +) diff --git a/grpc_p2p_client/shared/utils.go b/grpc_p2p_client/shared/utils.go new file mode 100644 index 0000000..bed2e1f --- /dev/null +++ b/grpc_p2p_client/shared/utils.go @@ -0,0 +1,244 @@ +package shared + +import ( + "bufio" + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "log" + "os" + "strings" + "sync/atomic" + + protobuf "p2p_client/grpc" + optsub "p2p_client/grpc/mump2p_trace" + + "github.com/gogo/protobuf/proto" + pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/mr-tron/base58" +) + +func ReadIPsFromFile(filename string) ([]string, error) { + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + var ips []string + scanner := bufio.NewScanner(file) + + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + ips = append(ips, line) + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error reading file: %w", err) + } + + return ips, nil +} + +func HeadHex(b []byte, n int) string { + if len(b) > n { + b = b[:n] + } + return hex.EncodeToString(b) +} + +func HandleResponse(resp *protobuf.Response, counter *int32) { + switch resp.GetCommand() { + case protobuf.ResponseType_Message: + var p2pMessage P2PMessage + if err := json.Unmarshal(resp.GetData(), &p2pMessage); err != nil { + log.Printf("Error unmarshalling message: %v", err) + return + } + n := atomic.AddInt32(counter, 1) + messageSize := len(p2pMessage.Message) + currentTime := atomic.LoadInt64(&timeNow) + fmt.Printf("Recv message: [%d] [%d %d] %s\n\n", n, currentTime, messageSize, string(p2pMessage.Message)) + case protobuf.ResponseType_MessageTraceGossipSub: + log.Printf("GossipSub trace received but handler not implemented") + case protobuf.ResponseType_MessageTraceMumP2P: + log.Printf("MumP2P trace received but handler not implemented") + case protobuf.ResponseType_Unknown: + default: + log.Println("Unknown response command:", resp.GetCommand()) + } +} + +var timeNow int64 + +func HandleResponseWithTracking(ip string, resp *protobuf.Response, counter *int32, + writeData bool, dataCh chan<- string, writeTrace bool, traceCh chan<- string) { + + switch resp.GetCommand() { + case protobuf.ResponseType_Message: + var p2pMessage P2PMessage + if err := json.Unmarshal(resp.GetData(), &p2pMessage); err != nil { + log.Printf("Error unmarshalling message: %v", err) + return + } + _ = atomic.AddInt32(counter, 1) + + hash := sha256.Sum256(p2pMessage.Message) + hexHashString := hex.EncodeToString(hash[:]) + + parts := strings.Split(string(p2pMessage.Message), "-") + if len(parts) > 0 && writeData { + publisher := parts[0] + dataToSend := fmt.Sprintf("%s\t%s\t%d\t%s", ip, publisher, len(p2pMessage.Message), hexHashString) + dataCh <- dataToSend + } + + case protobuf.ResponseType_MessageTraceMumP2P: + HandleOptimumP2PTrace(resp.GetData(), writeTrace, traceCh) + case protobuf.ResponseType_MessageTraceGossipSub: + HandleGossipSubTrace(resp.GetData(), writeTrace, traceCh) + default: + log.Println("Unknown response command:", resp.GetCommand()) + } +} + +func HandleGossipSubTrace(data []byte, writeTrace bool, traceCh chan<- string) { + evt := &pubsubpb.TraceEvent{} + if err := proto.Unmarshal(data, evt); err != nil { + fmt.Printf("[TRACE] GossipSub decode error: %v raw=%dB head=%s\n", + err, len(data), HeadHex(data, 64)) + return + } + + typeStr := optsub.TraceEvent_Type_name[int32(evt.GetType())] + var peerID peer.ID + if evt.PeerID != nil { + rawBytes := []byte(evt.PeerID) + peerID = peer.ID(rawBytes) + } + + recvID := "" + if evt.DeliverMessage != nil && evt.DeliverMessage.ReceivedFrom != nil { + rawBytes := []byte(evt.DeliverMessage.ReceivedFrom) + recvID = base58.Encode(rawBytes) + } + + msgID := "" + topic := "" + if evt.DeliverMessage != nil { + rawBytes := []byte(evt.DeliverMessage.MessageID) + msgID = base58.Encode(rawBytes) + topic = string(*evt.DeliverMessage.Topic) + } + if evt.PublishMessage != nil { + rawBytes := []byte(evt.PublishMessage.MessageID) + msgID = base58.Encode(rawBytes) + topic = string(*evt.PublishMessage.Topic) + } + + timestamp := int64(0) + if evt.Timestamp != nil { + timestamp = *evt.Timestamp + } + + if writeTrace { + dataToSend := fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp) + traceCh <- dataToSend + } else { + fmt.Printf("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp) + } +} + +func HandleOptimumP2PTrace(data []byte, writeTrace bool, traceCh chan<- string) { + evt := &optsub.TraceEvent{} + if err := proto.Unmarshal(data, evt); err != nil { + fmt.Printf("[TRACE] OptimumP2P decode error: %v\n", err) + return + } + + typeStr := optsub.TraceEvent_Type_name[int32(evt.GetType())] + + var peerID peer.ID + if evt.PeerID != nil { + rawBytes := []byte(evt.PeerID) + peerID = peer.ID(rawBytes) + } + + recvID := "" + if evt.DeliverMessage != nil && evt.DeliverMessage.ReceivedFrom != nil { + rawBytes := []byte(evt.DeliverMessage.ReceivedFrom) + recvID = base58.Encode(rawBytes) + } + if evt.NewShard != nil && evt.NewShard.ReceivedFrom != nil { + rawBytes := []byte(evt.NewShard.ReceivedFrom) + recvID = base58.Encode(rawBytes) + } + + msgID := "" + topic := "" + if evt.DeliverMessage != nil { + rawBytes := []byte(evt.DeliverMessage.MessageID) + msgID = base58.Encode(rawBytes) + topic = string(*evt.DeliverMessage.Topic) + } + if evt.PublishMessage != nil { + rawBytes := []byte(evt.PublishMessage.MessageID) + msgID = base58.Encode(rawBytes) + topic = string(*evt.PublishMessage.Topic) + } + if evt.NewShard != nil { + rawBytes := []byte(evt.NewShard.MessageID) + msgID = base58.Encode(rawBytes) + } + + timestamp := int64(0) + if evt.Timestamp != nil { + timestamp = *evt.Timestamp + } + + if writeTrace { + dataToSend := fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp) + traceCh <- dataToSend + } else { + fmt.Printf("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp) + } +} + +func WriteToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, filename string, header string) { + file, err := os.Create(filename) + if err != nil { + log.Fatal(err) + } + defer file.Close() + + writer := bufio.NewWriter(file) + defer writer.Flush() + + if header != "" { + _, err := writer.WriteString(header + "\n") + if err != nil { + log.Printf("Write error: %v", err) + } + } + + for data := range dataCh { + select { + case <-ctx.Done(): + return + default: + } + _, err := writer.WriteString(data + "\n") + writer.Flush() + if err != nil { + log.Printf("Write error: %v", err) + } + } + done <- true + fmt.Println("All data flushed to disk") +} From 3b654046899c4e82a7b9705e1c0f645f2c37bac1 Mon Sep 17 00:00:00 2001 From: swarnabhasinha Date: Thu, 27 Nov 2025 12:40:44 +0530 Subject: [PATCH 2/4] fix: update CI and docs for refactored grpc_p2p_client structure --- .github/workflows/ci.yml | 32 +++++--------------------------- Makefile | 4 +++- docs/guide.md | 2 +- readme.md | 10 +++++++--- 4 files changed, 16 insertions(+), 32 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3eb1e5e..900afd6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,11 +32,13 @@ jobs: cache: true - uses: actions/checkout@v4 - - name: Build P2P Client + - name: Build P2P Clients run: | cd grpc_p2p_client - go build -o p2p-client ./p2p_client.go - echo "P2P client built successfully" + go build -o p2p-client ./cmd/single/ + go build -o p2p-multi-publish ./cmd/multi-publish/ + go build -o p2p-multi-subscribe ./cmd/multi-subscribe/ + echo "All P2P clients built successfully" - name: Build Proxy Client run: | @@ -218,27 +220,3 @@ jobs: cd ../grpc_proxy_client && go mod verify cd ../keygen && go mod verify echo "All Go modules are valid" - - # This job ensures all checks pass - useful for branch protection rules - ci-success: - name: CI Success - runs-on: ubuntu-latest - needs: [test-go-clients, golangci, test-docker-setup, test-scripts, validate-config] - if: always() - steps: - - name: Check all jobs succeeded - run: | - if [[ "${{ needs.test-go-clients.result }}" != "success" || - "${{ needs.golangci.result }}" != "success" || - "${{ needs.test-docker-setup.result }}" != "success" || - "${{ needs.test-scripts.result }}" != "success" || - "${{ needs.validate-config.result }}" != "success" ]]; then - echo "CI failed:" - echo " - Go Clients: ${{ needs.test-go-clients.result }}" - echo " - Lint: ${{ needs.golangci.result }}" - echo " - Docker Setup: ${{ needs.test-docker-setup.result }}" - echo " - Scripts: ${{ needs.test-scripts.result }}" - echo " - Config: ${{ needs.validate-config.result }}" - exit 1 - fi - echo "All CI checks passed successfully!" diff --git a/Makefile b/Makefile index 63de6e2..e473647 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,9 @@ SCRIPTS := ./script/generate-identity.sh ./script/proxy_client.sh ./test_suite.s .PHONY: $(P2P_CLIENT) $(PROXY_CLIENT) $(KEYGEN_BINARY) setup-scripts $(P2P_CLIENT): - @cd $(P2P_CLIENT_DIR) && go build -o p2p-client ./p2p_client.go + @cd $(P2P_CLIENT_DIR) && go build -o p2p-client ./cmd/single/ + @cd $(P2P_CLIENT_DIR) && go build -o p2p-multi-publish ./cmd/multi-publish/ + @cd $(P2P_CLIENT_DIR) && go build -o p2p-multi-subscribe ./cmd/multi-subscribe/ $(PROXY_CLIENT): @cd $(PROXY_CLIENT_DIR) && go build -o proxy-client ./proxy_client.go diff --git a/docs/guide.md b/docs/guide.md index fc51d25..190056f 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -1155,7 +1155,7 @@ The client recognizes these OptimumP2P trace events (observed in practice): #### Implementation Details -The trace parsing is implemented in `grpc_p2p_client/p2p_client.go`: +The trace parsing is implemented in `grpc_p2p_client/shared/utils.go`: ```go func handleGossipSubTrace(data []byte) { diff --git a/readme.md b/readme.md index 80e8c1b..096ba7d 100644 --- a/readme.md +++ b/readme.md @@ -130,9 +130,13 @@ optimum-dev-setup-guide/ │ ├── guide.md # Complete setup guide │ └── intro.png # Architecture diagram ├── grpc_p2p_client/ # P2P client implementation -│ ├── grpc/ # Generated gRPC files -│ ├── proto/ # Protocol definitions -│ └── p2p_client.go # Main P2P client +│ ├── cmd/ # Client binaries +│ │ ├── single/ # Single node client +│ │ ├── multi-publish/ # Multi-node publisher +│ │ └── multi-subscribe/ # Multi-node subscriber +│ ├── shared/ # Shared types and utilities +│ ├── grpc/ # Generated gRPC files +│ └── proto/ # Protocol definitions ├── grpc_proxy_client/ # Proxy client implementation │ ├── grpc/ # Generated gRPC files │ ├── proto/ # Protocol definitions From e44f43ecdbe63327f1b50dfec89137610b340188 Mon Sep 17 00:00:00 2001 From: swarnabhasinha Date: Thu, 27 Nov 2025 13:23:11 +0530 Subject: [PATCH 3/4] fix: address critical CodeRabbit issues --- grpc_p2p_client/cmd/multi-subscribe/main.go | 11 +++++++---- grpc_p2p_client/cmd/single/main.go | 1 - grpc_p2p_client/shared/utils.go | 5 ++--- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/grpc_p2p_client/cmd/multi-subscribe/main.go b/grpc_p2p_client/cmd/multi-subscribe/main.go index fe3283b..4986970 100644 --- a/grpc_p2p_client/cmd/multi-subscribe/main.go +++ b/grpc_p2p_client/cmd/multi-subscribe/main.go @@ -68,7 +68,7 @@ func main() { if *outputData != "" { dataDone = make(chan bool) go func() { - header := fmt.Sprintf("receiver\tsender\tsize\tsha256(msg)") + header := "receiver\tsender\tsize\tsha256(msg)" go shared.WriteToFile(ctx, dataCh, dataDone, *outputData, header) }() } @@ -120,14 +120,16 @@ func receiveMessages(ctx context.Context, ip string, writeData bool, dataCh chan fmt.Printf("IP - %v\n", ip) if err != nil { - log.Fatalf("failed to connect to node %v", err) + log.Printf("[%s] failed to connect to node: %v", ip, err) + return fmt.Errorf("failed to connect to node %s: %w", ip, err) } defer conn.Close() client := protobuf.NewCommandStreamClient(conn) stream, err := client.ListenCommands(ctx) if err != nil { - log.Fatalf("ListenCommands: %v", err) + log.Printf("[%s] ListenCommands failed: %v", ip, err) + return fmt.Errorf("ListenCommands failed for %s: %w", ip, err) } println(fmt.Sprintf("Connected to node at: %s…", ip)) @@ -137,7 +139,8 @@ func receiveMessages(ctx context.Context, ip string, writeData bool, dataCh chan Topic: *topic, } if err := stream.Send(subReq); err != nil { - log.Fatalf("send subscribe: %v", err) + log.Printf("[%s] send subscribe failed: %v", ip, err) + return fmt.Errorf("send subscribe failed for %s: %w", ip, err) } fmt.Printf("Subscribed to topic %q, waiting for messages…\n", *topic) diff --git a/grpc_p2p_client/cmd/single/main.go b/grpc_p2p_client/cmd/single/main.go index 61131e2..e0ff1d7 100644 --- a/grpc_p2p_client/cmd/single/main.go +++ b/grpc_p2p_client/cmd/single/main.go @@ -65,7 +65,6 @@ func main() { <-c fmt.Println("\nshutting down…") cancel() - os.Exit(0) }() switch *mode { diff --git a/grpc_p2p_client/shared/utils.go b/grpc_p2p_client/shared/utils.go index bed2e1f..30676f1 100644 --- a/grpc_p2p_client/shared/utils.go +++ b/grpc_p2p_client/shared/utils.go @@ -11,6 +11,7 @@ import ( "os" "strings" "sync/atomic" + "time" protobuf "p2p_client/grpc" optsub "p2p_client/grpc/mump2p_trace" @@ -63,7 +64,7 @@ func HandleResponse(resp *protobuf.Response, counter *int32) { } n := atomic.AddInt32(counter, 1) messageSize := len(p2pMessage.Message) - currentTime := atomic.LoadInt64(&timeNow) + currentTime := time.Now().UnixNano() fmt.Printf("Recv message: [%d] [%d %d] %s\n\n", n, currentTime, messageSize, string(p2pMessage.Message)) case protobuf.ResponseType_MessageTraceGossipSub: log.Printf("GossipSub trace received but handler not implemented") @@ -75,8 +76,6 @@ func HandleResponse(resp *protobuf.Response, counter *int32) { } } -var timeNow int64 - func HandleResponseWithTracking(ip string, resp *protobuf.Response, counter *int32, writeData bool, dataCh chan<- string, writeTrace bool, traceCh chan<- string) { From a1679f4bc67f62f1788876971ee89e13ee15bcec Mon Sep 17 00:00:00 2001 From: swarnabhasinha Date: Thu, 27 Nov 2025 15:48:47 +0530 Subject: [PATCH 4/4] fix --- .github/workflows/ci.yml | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 900afd6..e46b959 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,31 +32,20 @@ jobs: cache: true - uses: actions/checkout@v4 - - name: Build P2P Clients - run: | - cd grpc_p2p_client - go build -o p2p-client ./cmd/single/ - go build -o p2p-multi-publish ./cmd/multi-publish/ - go build -o p2p-multi-subscribe ./cmd/multi-subscribe/ - echo "All P2P clients built successfully" - - name: Build Proxy Client run: | cd grpc_proxy_client go build -o proxy-client ./proxy_client.go - echo "Proxy client built successfully" - name: Build Key Generator run: | cd keygen go build -o generate-p2p-key ./generate_p2p_key.go - echo "Key generator built successfully" - name: Test Key Generation run: | cd keygen go run ./generate_p2p_key.go - echo "Key generation test passed" # Lint Go code golangci: