From c867eba7ae3667519d61cb4f00bbfac9678bbdc8 Mon Sep 17 00:00:00 2001 From: singhhp1069 Date: Wed, 4 Feb 2026 17:19:29 +0400 Subject: [PATCH 1/2] fix: script leak and shut down and remove per-message goroutine --- grpc_p2p_client/cmd/multi-publish/main.go | 46 +++++++++----- grpc_p2p_client/cmd/multi-subscribe/main.go | 69 +++++++++------------ grpc_p2p_client/cmd/single/main.go | 42 ++++--------- grpc_p2p_client/shared/utils.go | 32 ++++++---- 4 files changed, 97 insertions(+), 92 deletions(-) diff --git a/grpc_p2p_client/cmd/multi-publish/main.go b/grpc_p2p_client/cmd/multi-publish/main.go index 1043d47..f59e048 100644 --- a/grpc_p2p_client/cmd/multi-publish/main.go +++ b/grpc_p2p_client/cmd/multi-publish/main.go @@ -38,7 +38,13 @@ var ( func main() { flag.Parse() if *topic == "" { - log.Fatalf("−topic is required") + log.Fatal("-topic is required") + } + if *count < 1 { + log.Fatal("-count must be >= 1") + } + if *dataSize < 1 { + log.Fatal("-datasize must be >= 1") } _ips, err := shared.ReadIPsFromFile(*ipfile) @@ -48,6 +54,10 @@ func main() { } fmt.Printf("numip %d index %d\n", len(_ips), *endIdx) *endIdx = min(len(_ips), *endIdx) + if *startIdx < 0 || *startIdx >= *endIdx || *startIdx >= len(_ips) { + log.Fatalf("invalid index range: start-index=%d end-index=%d (num IPs=%d)", *startIdx, *endIdx, len(_ips)) + } + ips := _ips[*startIdx:*endIdx] fmt.Printf("Found %d IPs: %v\n", len(ips), ips) @@ -62,31 +72,42 @@ func main() { }() dataCh := make(chan string, 100) - *dataSize = int(float32(*dataSize) / 2.0) + randomByteLen := max(1, *dataSize/2) var done chan bool var wg sync.WaitGroup + errCh := make(chan error, len(ips)) if *output != "" { done = make(chan bool) - go func() { - header := "sender\tsize\tsha256(msg)" - go shared.WriteToFile(ctx, dataCh, done, *output, header) - }() + header := "sender\tsize\tsha256(msg)" + go shared.WriteToFile(ctx, dataCh, done, *output, header) } for _, ip := range ips { wg.Add(1) go func(ip string) { defer wg.Done() - datasize := *dataSize - sendMessages(ctx, ip, datasize, *output != "", dataCh) + if err := sendMessages(ctx, ip, randomByteLen, *output != "", dataCh); err != nil { + errCh <- err + cancel() + } }(ip) } wg.Wait() + close(errCh) close(dataCh) if done != nil { <-done } + + hasErrors := false + for err := range errCh { + hasErrors = true + log.Printf("publish worker error: %v", err) + } + if hasErrors { + os.Exit(1) + } } func sendMessages(ctx context.Context, ip string, datasize int, write bool, dataCh chan<- string) error { @@ -114,8 +135,7 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data for i := 0; i < *count; i++ { select { case <-ctx.Done(): - log.Printf("[%s] context canceled, stopping", ip) - return ctx.Err() + return nil default: } @@ -136,17 +156,15 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data if err := stream.Send(pubReq); err != nil { return fmt.Errorf("[%s] send publish: %w", ip, err) } - fmt.Printf("Published data size %d\n", len(data)) elapsed := time.Since(start) hash := sha256.Sum256(data) hexHashString := hex.EncodeToString(hash[:]) - var dataToSend string if write { - dataToSend = fmt.Sprintf("%s\t%d\t%s", ip, len(data), hexHashString) + dataToSend := fmt.Sprintf("%s\t%d\t%s", ip, len(data), hexHashString) dataCh <- dataToSend } - fmt.Printf("Published %s to %q (took %v)\n", dataToSend, *topic, elapsed) + fmt.Printf("[%s] published %d bytes to %q (took %v)\n", ip, len(data), *topic, elapsed) if *poisson { lambda := 1.0 / (*sleep).Seconds() diff --git a/grpc_p2p_client/cmd/multi-subscribe/main.go b/grpc_p2p_client/cmd/multi-subscribe/main.go index 4986970..a1ce868 100644 --- a/grpc_p2p_client/cmd/multi-subscribe/main.go +++ b/grpc_p2p_client/cmd/multi-subscribe/main.go @@ -32,7 +32,7 @@ var ( func main() { flag.Parse() if *topic == "" { - log.Fatalf("−topic is required") + log.Fatal("-topic is required") } _ips, err := shared.ReadIPsFromFile(*ipfile) @@ -63,33 +63,34 @@ func main() { traceCh := make(chan string, 100) var dataDone chan bool var traceDone chan bool + errCh := make(chan error, len(ips)) var wg sync.WaitGroup if *outputData != "" { dataDone = make(chan bool) - go func() { - header := "receiver\tsender\tsize\tsha256(msg)" - go shared.WriteToFile(ctx, dataCh, dataDone, *outputData, header) - }() + header := "receiver\tsender\tsize\tsha256(msg)" + go shared.WriteToFile(ctx, dataCh, dataDone, *outputData, header) } if *outputTrace != "" { traceDone = make(chan bool) - go func() { - header := "" - go shared.WriteToFile(ctx, traceCh, traceDone, *outputTrace, header) - }() + header := "" + go shared.WriteToFile(ctx, traceCh, traceDone, *outputTrace, header) } for _, ip := range ips { wg.Add(1) go func(ip string) { defer wg.Done() - receiveMessages(ctx, ip, *outputData != "", dataCh, *outputTrace != "", traceCh) + if err := receiveMessages(ctx, ip, *outputData != "", dataCh, *outputTrace != "", traceCh); err != nil { + errCh <- err + cancel() + } }(ip) } wg.Wait() + close(errCh) close(dataCh) close(traceCh) if dataDone != nil { @@ -98,6 +99,15 @@ func main() { if traceDone != nil { <-traceDone } + + hasErrors := false + for err := range errCh { + hasErrors = true + log.Printf("subscribe worker error: %v", err) + } + if hasErrors { + os.Exit(1) + } } func receiveMessages(ctx context.Context, ip string, writeData bool, dataCh chan<- string, @@ -106,7 +116,7 @@ func receiveMessages(ctx context.Context, ip string, writeData bool, dataCh chan select { case <-ctx.Done(): log.Printf("[%s] context canceled, stopping", ip) - return ctx.Err() + return nil default: } @@ -145,37 +155,20 @@ func receiveMessages(ctx context.Context, ip string, writeData bool, dataCh chan fmt.Printf("Subscribed to topic %q, waiting for messages…\n", *topic) var receivedCount int32 - msgChan := make(chan *protobuf.Response, 10000) - - go func() { - for { - resp, err := stream.Recv() - if err == io.EOF { - close(msgChan) - return - } - if err != nil { - log.Printf("recv error: %v", err) - close(msgChan) - return - } - msgChan <- resp - } - }() - for { - select { - case <-ctx.Done(): - log.Printf("Context canceled. Total messages received: %d", atomic.LoadInt32(&receivedCount)) + resp, err := stream.Recv() + if err == io.EOF { + log.Printf("[%s] stream closed. Total messages received: %d", ip, atomic.LoadInt32(&receivedCount)) return nil - case resp, ok := <-msgChan: - if !ok { - log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount)) + } + if err != nil { + if ctx.Err() != nil { + log.Printf("[%s] context canceled. Total messages received: %d", ip, atomic.LoadInt32(&receivedCount)) return nil } - go func(resp *protobuf.Response) { - shared.HandleResponseWithTracking(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh) - }(resp) + return fmt.Errorf("[%s] recv error: %w", ip, err) } + + shared.HandleResponseWithTracking(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh) } } diff --git a/grpc_p2p_client/cmd/single/main.go b/grpc_p2p_client/cmd/single/main.go index e0ff1d7..1b26ac1 100644 --- a/grpc_p2p_client/cmd/single/main.go +++ b/grpc_p2p_client/cmd/single/main.go @@ -34,7 +34,7 @@ var ( func main() { flag.Parse() if *topic == "" { - log.Fatalf("−topic is required") + log.Fatal("-topic is required") } println(fmt.Sprintf("Connecting to node at: %s…", *addr)) @@ -89,38 +89,22 @@ func subscribe(ctx context.Context, stream protobuf.CommandStream_ListenCommands fmt.Printf("Subscribed to topic %q, waiting for messages…\n", topic) var receivedCount int32 - msgChan := make(chan *protobuf.Response, 10000) - - go func() { - for { - resp, err := stream.Recv() - if err == io.EOF { - close(msgChan) - return - } - if err != nil { - log.Printf("recv error: %v", err) - close(msgChan) - return - } - msgChan <- resp - } - }() - for { - select { - case <-ctx.Done(): - log.Printf("Context canceled. Total messages received: %d", atomic.LoadInt32(&receivedCount)) + resp, err := stream.Recv() + if err == io.EOF { + log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount)) return - case resp, ok := <-msgChan: - if !ok { - log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount)) + } + if err != nil { + if ctx.Err() != nil { + log.Printf("Context canceled. Total messages received: %d", atomic.LoadInt32(&receivedCount)) return } - go func(resp *protobuf.Response) { - shared.HandleResponse(resp, &receivedCount) - }(resp) + log.Printf("recv error: %v", err) + return } + + shared.HandleResponse(resp, &receivedCount) } } @@ -128,7 +112,7 @@ func publish(ctx context.Context, stream protobuf.CommandStream_ListenCommandsCl topic, msg string, count int, sleep time.Duration) { if msg == "" && count == 1 { - log.Fatalf("−msg is required in publish mode") + log.Fatal("-msg is required in publish mode") } for i := 0; i < count; i++ { diff --git a/grpc_p2p_client/shared/utils.go b/grpc_p2p_client/shared/utils.go index 2821978..d967dff 100644 --- a/grpc_p2p_client/shared/utils.go +++ b/grpc_p2p_client/shared/utils.go @@ -23,6 +23,10 @@ import ( ) func ReadIPsFromFile(filename string) ([]string, error) { + if strings.TrimSpace(filename) == "" { + return nil, fmt.Errorf("-ipfile is required") + } + file, err := os.Open(filename) if err != nil { return nil, fmt.Errorf("failed to open file: %w", err) @@ -215,6 +219,7 @@ func WriteToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, fi log.Fatal(err) } defer file.Close() + defer close(done) writer := bufio.NewWriter(file) defer writer.Flush() @@ -226,18 +231,23 @@ func WriteToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, fi } } - for data := range dataCh { + ctxDone := ctx.Done() + for { select { - case <-ctx.Done(): - return - default: - } - _, err := writer.WriteString(data + "\n") - writer.Flush() - if err != nil { - log.Printf("Write error: %v", err) + case <-ctxDone: + // Continue draining channel so producers don't block on shutdown. + ctxDone = nil + case data, ok := <-dataCh: + if !ok { + fmt.Println("All data flushed to disk") + return + } + + _, err := writer.WriteString(data + "\n") + writer.Flush() + if err != nil { + log.Printf("Write error: %v", err) + } } } - done <- true - fmt.Println("All data flushed to disk") } From bb8e73582a2176c4e70e8b53f25e2950217b89d3 Mon Sep 17 00:00:00 2001 From: swarnabhasinha Date: Wed, 4 Feb 2026 19:37:03 +0530 Subject: [PATCH 2/2] fix: docs --- docs/guide.md | 7 ++++--- readme.md | 8 +++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/docs/guide.md b/docs/guide.md index d5d3113..c72f7b4 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -1059,7 +1059,7 @@ For testing and stress testing across multiple P2P nodes simultaneously, the pro Publish messages to multiple nodes simultaneously: -Create a text file (`ips.txt`) with IP addresses, one per line: +Create a text file (`ips.txt`) with IP addresses, one per line. For the Docker Compose stack, host ports are 33221–33224: ``` localhost:33221 @@ -1079,7 +1079,8 @@ Then publish to all nodes in the file: - `-ipfile`: File containing IP addresses, one per line (required) - `-start-index`: Starting index in IP file for selecting a subset of IPs (default: 0) - `-end-index`: Ending index in IP file (exclusive, default: 10000) -- `-count`: Number of messages to publish per node (default: 1) +- `-count`: Number of messages to publish per node (default: 1, must be >= 1) +- `-datasize`: Size in bytes of random message payload (default: 100, must be >= 1) - `-sleep`: Delay between messages (e.g., `500ms`, `1s`) **Index Range Selection (`-start-index` and `-end-index`):** @@ -1136,7 +1137,7 @@ Published to "test-topic" (took 263.709µs) Subscribe to multiple nodes and collect messages: ```sh -# Subscribe to all nodes in the file +# Subscribe to all nodes in the file (Docker: use ips with ports 33221–33224, start-index=0, end-index=4) ./grpc_p2p_client/p2p-multi-subscribe -topic=test-topic -ipfile=ips.txt # With output files for data analysis diff --git a/readme.md b/readme.md index 48e8765..9d55bf4 100644 --- a/readme.md +++ b/readme.md @@ -111,12 +111,14 @@ make publish 127.0.0.1:33221 testtopic random 10 1s **Multi-Node Clients (for stress testing):** +For the Docker Compose stack, P2P sidecar ports on the host are 33221–33224. Use an `ips.txt` with those addresses and `-start-index=0 -end-index=4` for four nodes: + ```sh # Publish to multiple nodes simultaneously -./grpc_p2p_client/p2p-multi-publish -topic=testtopic -ipfile=ips.txt -count=10 -sleep=500ms +./grpc_p2p_client/p2p-multi-publish -topic=testtopic -ipfile=ips.txt -count=10 -start-index=0 -end-index=4 -sleep=500ms -# Subscribe to multiple nodes and collect data -./grpc_p2p_client/p2p-multi-subscribe -topic=testtopic -ipfile=ips.txt -output-data=data.tsv +# Subscribe to multiple nodes (run in one terminal) +./grpc_p2p_client/p2p-multi-subscribe -topic=testtopic -ipfile=ips.txt -start-index=0 -end-index=4 ``` See the [Complete Setup Guide](./docs/guide.md#multi-node-client-tools) for detailed multi-node client usage.