Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ For testing and stress testing across multiple P2P nodes simultaneously, the pro

Publish messages to multiple nodes simultaneously:

Create a text file (`ips.txt`) with IP addresses, one per line:
Create a text file (`ips.txt`) with IP addresses, one per line. For the Docker Compose stack, host ports are 33221–33224:

```
localhost:33221
Expand All @@ -1079,7 +1079,8 @@ Then publish to all nodes in the file:
- `-ipfile`: File containing IP addresses, one per line (required)
- `-start-index`: Starting index in IP file for selecting a subset of IPs (default: 0)
- `-end-index`: Ending index in IP file (exclusive, default: 10000)
- `-count`: Number of messages to publish per node (default: 1)
- `-count`: Number of messages to publish per node (default: 1, must be >= 1)
- `-datasize`: Size in bytes of random message payload (default: 100, must be >= 1)
- `-sleep`: Delay between messages (e.g., `500ms`, `1s`)

**Index Range Selection (`-start-index` and `-end-index`):**
Expand Down Expand Up @@ -1136,7 +1137,7 @@ Published to "test-topic" (took 263.709µs)
Subscribe to multiple nodes and collect messages:

```sh
# Subscribe to all nodes in the file
# Subscribe to all nodes in the file (Docker: use ips with ports 33221–33224, start-index=0, end-index=4)
./grpc_p2p_client/p2p-multi-subscribe -topic=test-topic -ipfile=ips.txt

# With output files for data analysis
Expand Down
46 changes: 32 additions & 14 deletions grpc_p2p_client/cmd/multi-publish/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ var (
func main() {
flag.Parse()
if *topic == "" {
log.Fatalf("−topic is required")
log.Fatal("-topic is required")
}
if *count < 1 {
log.Fatal("-count must be >= 1")
}
if *dataSize < 1 {
log.Fatal("-datasize must be >= 1")
}

_ips, err := shared.ReadIPsFromFile(*ipfile)
Expand All @@ -48,6 +54,10 @@ func main() {
}
fmt.Printf("numip %d index %d\n", len(_ips), *endIdx)
*endIdx = min(len(_ips), *endIdx)
if *startIdx < 0 || *startIdx >= *endIdx || *startIdx >= len(_ips) {
log.Fatalf("invalid index range: start-index=%d end-index=%d (num IPs=%d)", *startIdx, *endIdx, len(_ips))
}

ips := _ips[*startIdx:*endIdx]
fmt.Printf("Found %d IPs: %v\n", len(ips), ips)

Expand All @@ -62,31 +72,42 @@ func main() {
}()

dataCh := make(chan string, 100)
*dataSize = int(float32(*dataSize) / 2.0)
randomByteLen := max(1, *dataSize/2)
var done chan bool
var wg sync.WaitGroup
errCh := make(chan error, len(ips))

if *output != "" {
done = make(chan bool)
go func() {
header := "sender\tsize\tsha256(msg)"
go shared.WriteToFile(ctx, dataCh, done, *output, header)
}()
header := "sender\tsize\tsha256(msg)"
go shared.WriteToFile(ctx, dataCh, done, *output, header)
}

for _, ip := range ips {
wg.Add(1)
go func(ip string) {
defer wg.Done()
datasize := *dataSize
sendMessages(ctx, ip, datasize, *output != "", dataCh)
if err := sendMessages(ctx, ip, randomByteLen, *output != "", dataCh); err != nil {
errCh <- err
cancel()
}
}(ip)
}
wg.Wait()
close(errCh)
close(dataCh)
if done != nil {
<-done
}

hasErrors := false
for err := range errCh {
hasErrors = true
log.Printf("publish worker error: %v", err)
}
if hasErrors {
os.Exit(1)
}
}

func sendMessages(ctx context.Context, ip string, datasize int, write bool, dataCh chan<- string) error {
Expand Down Expand Up @@ -114,8 +135,7 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data
for i := 0; i < *count; i++ {
select {
case <-ctx.Done():
log.Printf("[%s] context canceled, stopping", ip)
return ctx.Err()
return nil
default:
}

Expand All @@ -136,17 +156,15 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data
if err := stream.Send(pubReq); err != nil {
return fmt.Errorf("[%s] send publish: %w", ip, err)
}
fmt.Printf("Published data size %d\n", len(data))

elapsed := time.Since(start)
hash := sha256.Sum256(data)
hexHashString := hex.EncodeToString(hash[:])
var dataToSend string
if write {
dataToSend = fmt.Sprintf("%s\t%d\t%s", ip, len(data), hexHashString)
dataToSend := fmt.Sprintf("%s\t%d\t%s", ip, len(data), hexHashString)
dataCh <- dataToSend
}
fmt.Printf("Published %s to %q (took %v)\n", dataToSend, *topic, elapsed)
fmt.Printf("[%s] published %d bytes to %q (took %v)\n", ip, len(data), *topic, elapsed)

if *poisson {
lambda := 1.0 / (*sleep).Seconds()
Expand Down
69 changes: 31 additions & 38 deletions grpc_p2p_client/cmd/multi-subscribe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
func main() {
flag.Parse()
if *topic == "" {
log.Fatalf("−topic is required")
log.Fatal("-topic is required")
}

_ips, err := shared.ReadIPsFromFile(*ipfile)
Expand Down Expand Up @@ -63,33 +63,34 @@ func main() {
traceCh := make(chan string, 100)
var dataDone chan bool
var traceDone chan bool
errCh := make(chan error, len(ips))

var wg sync.WaitGroup
if *outputData != "" {
dataDone = make(chan bool)
go func() {
header := "receiver\tsender\tsize\tsha256(msg)"
go shared.WriteToFile(ctx, dataCh, dataDone, *outputData, header)
}()
header := "receiver\tsender\tsize\tsha256(msg)"
go shared.WriteToFile(ctx, dataCh, dataDone, *outputData, header)
}

if *outputTrace != "" {
traceDone = make(chan bool)
go func() {
header := ""
go shared.WriteToFile(ctx, traceCh, traceDone, *outputTrace, header)
}()
header := ""
go shared.WriteToFile(ctx, traceCh, traceDone, *outputTrace, header)
}

for _, ip := range ips {
wg.Add(1)
go func(ip string) {
defer wg.Done()
receiveMessages(ctx, ip, *outputData != "", dataCh, *outputTrace != "", traceCh)
if err := receiveMessages(ctx, ip, *outputData != "", dataCh, *outputTrace != "", traceCh); err != nil {
errCh <- err
cancel()
}
}(ip)
}

wg.Wait()
close(errCh)
close(dataCh)
close(traceCh)
if dataDone != nil {
Expand All @@ -98,6 +99,15 @@ func main() {
if traceDone != nil {
<-traceDone
}

hasErrors := false
for err := range errCh {
hasErrors = true
log.Printf("subscribe worker error: %v", err)
}
if hasErrors {
os.Exit(1)
}
}

func receiveMessages(ctx context.Context, ip string, writeData bool, dataCh chan<- string,
Expand All @@ -106,7 +116,7 @@ func receiveMessages(ctx context.Context, ip string, writeData bool, dataCh chan
select {
case <-ctx.Done():
log.Printf("[%s] context canceled, stopping", ip)
return ctx.Err()
return nil
default:
}

Expand Down Expand Up @@ -145,37 +155,20 @@ func receiveMessages(ctx context.Context, ip string, writeData bool, dataCh chan
fmt.Printf("Subscribed to topic %q, waiting for messages…\n", *topic)

var receivedCount int32
msgChan := make(chan *protobuf.Response, 10000)

go func() {
for {
resp, err := stream.Recv()
if err == io.EOF {
close(msgChan)
return
}
if err != nil {
log.Printf("recv error: %v", err)
close(msgChan)
return
}
msgChan <- resp
}
}()

for {
select {
case <-ctx.Done():
log.Printf("Context canceled. Total messages received: %d", atomic.LoadInt32(&receivedCount))
resp, err := stream.Recv()
if err == io.EOF {
log.Printf("[%s] stream closed. Total messages received: %d", ip, atomic.LoadInt32(&receivedCount))
return nil
case resp, ok := <-msgChan:
if !ok {
log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount))
}
if err != nil {
if ctx.Err() != nil {
log.Printf("[%s] context canceled. Total messages received: %d", ip, atomic.LoadInt32(&receivedCount))
return nil
}
go func(resp *protobuf.Response) {
shared.HandleResponseWithTracking(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh)
}(resp)
return fmt.Errorf("[%s] recv error: %w", ip, err)
}

shared.HandleResponseWithTracking(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh)
}
}
42 changes: 13 additions & 29 deletions grpc_p2p_client/cmd/single/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var (
func main() {
flag.Parse()
if *topic == "" {
log.Fatalf("−topic is required")
log.Fatal("-topic is required")
}

println(fmt.Sprintf("Connecting to node at: %s…", *addr))
Expand Down Expand Up @@ -89,46 +89,30 @@ func subscribe(ctx context.Context, stream protobuf.CommandStream_ListenCommands
fmt.Printf("Subscribed to topic %q, waiting for messages…\n", topic)

var receivedCount int32
msgChan := make(chan *protobuf.Response, 10000)

go func() {
for {
resp, err := stream.Recv()
if err == io.EOF {
close(msgChan)
return
}
if err != nil {
log.Printf("recv error: %v", err)
close(msgChan)
return
}
msgChan <- resp
}
}()

for {
select {
case <-ctx.Done():
log.Printf("Context canceled. Total messages received: %d", atomic.LoadInt32(&receivedCount))
resp, err := stream.Recv()
if err == io.EOF {
log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount))
return
case resp, ok := <-msgChan:
if !ok {
log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount))
}
if err != nil {
if ctx.Err() != nil {
log.Printf("Context canceled. Total messages received: %d", atomic.LoadInt32(&receivedCount))
return
}
go func(resp *protobuf.Response) {
shared.HandleResponse(resp, &receivedCount)
}(resp)
log.Printf("recv error: %v", err)
return
}

shared.HandleResponse(resp, &receivedCount)
}
}

func publish(ctx context.Context, stream protobuf.CommandStream_ListenCommandsClient,
topic, msg string, count int, sleep time.Duration) {

if msg == "" && count == 1 {
log.Fatalf("−msg is required in publish mode")
log.Fatal("-msg is required in publish mode")
}

for i := 0; i < count; i++ {
Expand Down
32 changes: 21 additions & 11 deletions grpc_p2p_client/shared/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
)

func ReadIPsFromFile(filename string) ([]string, error) {
if strings.TrimSpace(filename) == "" {
return nil, fmt.Errorf("-ipfile is required")
}

file, err := os.Open(filename)
if err != nil {
return nil, fmt.Errorf("failed to open file: %w", err)
Expand Down Expand Up @@ -215,6 +219,7 @@ func WriteToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, fi
log.Fatal(err)
}
defer file.Close()
defer close(done)

writer := bufio.NewWriter(file)
defer writer.Flush()
Expand All @@ -226,18 +231,23 @@ func WriteToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, fi
}
}

for data := range dataCh {
ctxDone := ctx.Done()
for {
select {
case <-ctx.Done():
return
default:
}
_, err := writer.WriteString(data + "\n")
writer.Flush()
if err != nil {
log.Printf("Write error: %v", err)
case <-ctxDone:
// Continue draining channel so producers don't block on shutdown.
ctxDone = nil
case data, ok := <-dataCh:
if !ok {
fmt.Println("All data flushed to disk")
return
}

_, err := writer.WriteString(data + "\n")
writer.Flush()
if err != nil {
log.Printf("Write error: %v", err)
}
}
}
done <- true
fmt.Println("All data flushed to disk")
}
8 changes: 5 additions & 3 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,14 @@ make publish 127.0.0.1:33221 testtopic random 10 1s

**Multi-Node Clients (for stress testing):**

For the Docker Compose stack, P2P sidecar ports on the host are 33221–33224. Use an `ips.txt` with those addresses and `-start-index=0 -end-index=4` for four nodes:

```sh
# Publish to multiple nodes simultaneously
./grpc_p2p_client/p2p-multi-publish -topic=testtopic -ipfile=ips.txt -count=10 -sleep=500ms
./grpc_p2p_client/p2p-multi-publish -topic=testtopic -ipfile=ips.txt -count=10 -start-index=0 -end-index=4 -sleep=500ms

# Subscribe to multiple nodes and collect data
./grpc_p2p_client/p2p-multi-subscribe -topic=testtopic -ipfile=ips.txt -output-data=data.tsv
# Subscribe to multiple nodes (run in one terminal)
./grpc_p2p_client/p2p-multi-subscribe -topic=testtopic -ipfile=ips.txt -start-index=0 -end-index=4
```

See the [Complete Setup Guide](./docs/guide.md#multi-node-client-tools) for detailed multi-node client usage.
Expand Down
Loading