diff --git a/Makefile b/Makefile index d37f832..a6a1315 100644 --- a/Makefile +++ b/Makefile @@ -96,10 +96,17 @@ publish: $(P2P_CLIENT) generate-identity ## publish message to p2p topic: make p test: $(P2P_CLIENT) $(PROXY_CLIENT) $(KEYGEN_BINARY) ## Run tests for Go clients -lint: ## Run golangci-lint - @cd $(P2P_CLIENT_DIR) && golangci-lint run --skip-dirs-use-default || echo "Linting issues found in P2P client" - @cd $(PROXY_CLIENT_DIR) && golangci-lint run --skip-dirs-use-default || echo "Linting issues found in Proxy client" - @cd keygen && golangci-lint run --skip-dirs-use-default || echo "Linting issues found in Keygen" +GOLANGCI_LINT := $(shell command -v golangci-lint 2>/dev/null || echo "$(shell go env GOPATH)/bin/golangci-lint") + +install-lint: ## Install golangci-lint if not present + @command -v golangci-lint >/dev/null 2>&1 || (echo "Installing golangci-lint..." && go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest) + +lint: install-lint ## Run golangci-lint + @cd $(P2P_CLIENT_DIR) && $(GOLANGCI_LINT) run || echo "Linting issues found in P2P client" + @cd $(PROXY_CLIENT_DIR) && $(GOLANGCI_LINT) run || echo "Linting issues found in Proxy client" + @cd keygen && $(GOLANGCI_LINT) run || echo "Linting issues found in Keygen" + + test-docker: setup-scripts ## Test Docker Compose setup @./script/generate-identity.sh @@ -149,4 +156,4 @@ clean: ## Clean build artifacts @: .DEFAULT_GOAL := help -.PHONY: help build generate-identity subscribe publish test lint test-docker test-scripts validate ci clean setup-scripts dashboard +.PHONY: help build generate-identity subscribe publish test lint install-lint test-docker test-scripts validate ci clean setup-scripts dashboard diff --git a/grpc_p2p_client/batch-publish b/grpc_p2p_client/batch-publish new file mode 100755 index 0000000..4170f36 Binary files /dev/null and b/grpc_p2p_client/batch-publish differ diff --git a/grpc_p2p_client/batch_publish.ipynb b/grpc_p2p_client/batch_publish.ipynb new file mode 100644 index 0000000..ca75606 --- /dev/null +++ b/grpc_p2p_client/batch_publish.ipynb @@ -0,0 +1,173 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "a54020ef", + "metadata": {}, + "outputs": [], + "source": [ + "import subprocess\n", + "from pathos.multiprocessing import ProcessPool\n", + "import os\n", + "import random\n", + "import requests\n", + "\n", + "if not os.path.exists(\"raw_traces\"):\n", + " os.mkdir(\"raw_traces\")\n", + "else:\n", + " # delete all files in raw_traces\n", + " for file in os.listdir(\"raw_traces\"):\n", + " os.remove(os.path.join(\"raw_traces\", file))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ec52698f", + "metadata": {}, + "outputs": [], + "source": [ + "num_topics = 16" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d6834a18", + "metadata": {}, + "outputs": [], + "source": [ + "topics = [f'/eth2/fulu_fork_digest/data_column_sidecar_{i}/ssz_snappy' for i in range(num_topics)]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "21e027de", + "metadata": {}, + "outputs": [], + "source": [ + "ips = []\n", + "with open(\"ips.txt\", \"r\") as f:\n", + " for line in f:\n", + " ips.append(line.strip().split(\":\")[0])\n", + "\n", + "nodes_down = []\n", + "\n", + "for ip in ips: \n", + " status = requests.get(f\"http://{ip}:9090/api/v1/health\", timeout=10)\n", + " print(status.json())\n", + " if status.json()['status'] != 'ok':\n", + " nodes_down.append(ip)\n", + "\n", + "print(\"Nodes up: \", len(ips) - len(nodes_down))\n", + "print(\"Nodes down: \", len(nodes_down))\n", + "\n", + "if len(nodes_down) > 0:\n", + " print(\"Nodes down: \", nodes_down)\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "24ee83c0", + "metadata": {}, + "outputs": [], + "source": [ + "subscribe_processes = [] # Keep track of all started processes\n", + "trace_files = []\n", + "\n", + "for i,topic in enumerate(topics):\n", + " trace_file = f\"raw_traces/trace-data-col-{i}.tsv\"\n", + " trace_files.append(trace_file)\n", + " # select a random node to publish\n", + " proc = subprocess.Popen([\n", + " \"./p2p-multi-subscribe\",\n", + " \"-topic\", topic,\n", + " \"-ipfile\", \"ips.txt\",\n", + " \"-output-trace\", trace_file,\n", + " ])\n", + " subscribe_processes.append(proc)\n", + "\n", + "print(f\"Started {len(subscribe_processes)} subscriber processes\")\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aca419e7", + "metadata": {}, + "outputs": [], + "source": [ + "cmd = [\n", + " \"./batch-publish\",\n", + " \"-addr\", f\"{ips[0]}:33212\",\n", + " \"-topics\", \",\".join(topics),\n", + " \"-msg\", \"16384\",\n", + " \"-sleep\", \"12s\",\n", + " \"-num_batches\",\"1\",\n", + "]\n", + "\n", + "subprocess.run(cmd)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "64ac941e", + "metadata": {}, + "outputs": [], + "source": [ + "merged_traces_file = \"merged_traces.tsv\"\n", + "with open(merged_traces_file, \"w\") as f:\n", + " for trace_file in trace_files:\n", + " if os.path.exists(trace_file):\n", + " with open(trace_file, \"r\") as t:\n", + " \n", + " f.write(t.read())\n", + " else:\n", + " print(f\"Warning: trace file {trace_file} not found. Skipping.\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "db69cd4d", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18a0e7bd", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/grpc_p2p_client/cmd/batch-publish/main.go b/grpc_p2p_client/cmd/batch-publish/main.go new file mode 100644 index 0000000..31f2884 --- /dev/null +++ b/grpc_p2p_client/cmd/batch-publish/main.go @@ -0,0 +1,215 @@ +package main + +import ( + "context" + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "flag" + "fmt" + "log" + "math" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + "time" + + protobuf "p2p_client/grpc" + "p2p_client/shared" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +var ( + addr = flag.String("addr", "localhost:33212", "sidecar gRPC address") + topics = flag.String("topics", "", "topic names") + messageSize = flag.String("msg", "", "size per message (for publish)") + output = flag.String("output", "", "file to write the outgoing data hashes") + sleep = flag.Duration("sleep", 12*time.Second, "delay between batches (e.g. 12s)") + numBatches = flag.Int("num_batches", 1, "number of batches to publish") +) + +func validateFlags() { + if *topics == "" { + log.Fatal("-topics is required") + } + if *messageSize == "" { + log.Fatal("-msg is required") + } + if *addr == "" { + log.Fatal("-addr is required") + } + if *sleep < 0 { + log.Fatal("-sleep must be >= 0") + } + if *numBatches < 1 { + log.Fatal("-num_batches must be >= 1") + } +} + +func main() { + flag.Parse() + validateFlags() + topics := parseTopics(*topics) + if len(topics) == 0 { + log.Fatal("no topics provided") + } + msgSize, err := parseMessageSize(*messageSize) + if err != nil { + log.Fatalf("invalid message size: %v", err) + } + + fmt.Printf("Connecting to node at: %s…\n", *addr) + conn, err := grpc.NewClient(*addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(math.MaxInt), + grpc.MaxCallSendMsgSize(math.MaxInt), + ), + ) + if err != nil { + log.Fatalf("failed to connect to node %v", err) + } + defer func() { + if err := conn.Close(); err != nil { + log.Printf("error closing connection: %v", err) + } + }() + + client := protobuf.NewCommandStreamClient(conn) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + fmt.Println("\nshutting down…") + cancel() + }() + stream, err := client.ListenCommands(ctx) + if err != nil { + log.Fatalf("ListenCommands: %v", err) + } + + var done chan bool + dataCh := make(chan string, 100) + if *output != "" { + done = make(chan bool) + header := "sender\tsize\tsha256(msg)" + go shared.WriteToFile(ctx, dataCh, done, *output, header) + } + for i := 0; i < *numBatches; i++ { + if err := batchPublish(ctx, stream, topics, msgSize, *output != "", dataCh); err != nil { + fmt.Printf("batch publish error: %v", err) + cancel() + } + time.Sleep(*sleep) + } + if err := stream.CloseSend(); err != nil { + fmt.Printf("failed to close send side of stream: %v", err) + } + fmt.Printf("\nBatch publish completed: %d batches published (%d messages total, %d bytes total)\n", *numBatches, *numBatches*len(topics), *numBatches*len(topics)*msgSize) + + close(dataCh) + if done != nil { + <-done + } +} + +func parseTopics(topicsStr string) []string { + if topicsStr == "" { + return nil + } + topics := strings.Split(topicsStr, ",") + result := make([]string, 0, len(topics)) + for _, t := range topics { + t = strings.TrimSpace(t) + if t != "" { + result = append(result, t) + } + } + return result +} + +func parseMessageSize(msgSizeStr string) (int, error) { + size, err := strconv.Atoi(msgSizeStr) + if err != nil { + return 0, err + } + return size, nil +} + +func batchPublish(ctx context.Context, stream protobuf.CommandStream_ListenCommandsClient, topics []string, messageSize int, write bool, dataCh chan<- string) error { + fmt.Printf("Batch publishing to %d topics: %v\n", len(topics), topics) + select { + case <-ctx.Done(): + fmt.Println("Context canceled, stopping batch publish") + return fmt.Errorf("context canceled") + default: + } + + start := time.Now() + messages := make([]shared.Message, 0, len(topics)) + for _, topic := range topics { + randomBytes := make([]byte, messageSize) + if _, err := rand.Read(randomBytes); err != nil { + return fmt.Errorf("failed to generate random bytes: %v", err) + } + randomSuffix := hex.EncodeToString(randomBytes[:min(len(randomBytes), 8)]) + currentTime := time.Now().UnixNano() + data := []byte(fmt.Sprintf("[%d %d] topic:%s - %s", currentTime, messageSize, topic, randomSuffix)) + + if len(data) < messageSize { + padding := make([]byte, messageSize-len(data)) + _, err := rand.Read(padding) + if err != nil { + return fmt.Errorf("failed to generate random bytes: %v", err) + } + data = append(data, padding...) + } else if len(data) > messageSize { + data = data[:messageSize] + } + + messages = append(messages, shared.Message{ + Topic: topic, + Msg: data, + }) + } + + batch := shared.MessageBatch{ + Messages: messages, + } + batchData, err := json.Marshal(batch) + if err != nil { + return fmt.Errorf("failed to marshal batch: %v", err) + } + batchReq := &protobuf.Request{ + Command: int32(shared.CommandPublishBatch), + Data: batchData, + } + if err := stream.Send(batchReq); err != nil { + return fmt.Errorf("send batch publish: %v", err) + } + + elapsed := time.Since(start) + hash := sha256.Sum256(batchData) + hexHashString := hex.EncodeToString(hash[:]) + if write { + dataToSend := fmt.Sprintf("%d\t%s", len(batchData), hexHashString) + dataCh <- dataToSend + } + fmt.Printf("Published batch to %d topics (%d bytes, took %v)\n", len(topics), len(batchData), elapsed) + + return nil +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/grpc_p2p_client/concurrent_publish.ipynb b/grpc_p2p_client/concurrent_publish.ipynb new file mode 100644 index 0000000..4927893 --- /dev/null +++ b/grpc_p2p_client/concurrent_publish.ipynb @@ -0,0 +1,242 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "1217316c", + "metadata": {}, + "source": [ + "# PeerDAS simulation" + ] + }, + { + "cell_type": "markdown", + "id": "b20602c6", + "metadata": {}, + "source": [ + "## Imports" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "43f7e23e", + "metadata": {}, + "outputs": [], + "source": [ + "import subprocess\n", + "from pathos.multiprocessing import ProcessPool\n", + "import os\n", + "import random\n", + "import requests\n", + "\n", + "if not os.path.exists(\"raw_traces\"):\n", + " os.mkdir(\"raw_traces\")\n", + "else:\n", + " # delete all files in raw_traces\n", + " for file in os.listdir(\"raw_traces\"):\n", + " os.remove(os.path.join(\"raw_traces\", file))\n" + ] + }, + { + "cell_type": "markdown", + "id": "38b76362", + "metadata": {}, + "source": [ + "## Experiment 1: \n", + "Setup" + ] + }, + { + "cell_type": "markdown", + "id": "a69287c4", + "metadata": {}, + "source": [ + "## Setup topics and participants" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3822f0f5", + "metadata": {}, + "outputs": [], + "source": [ + "num_topics = 8" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "08952850", + "metadata": {}, + "outputs": [], + "source": [ + "topics = [f'/eth2/fulu_fork_digest/data_column_sidecar_{i}/ssz_snappy' for i in range(num_topics)]" + ] + }, + { + "cell_type": "markdown", + "id": "96a9b7de", + "metadata": {}, + "source": [ + "## Subscriptions" + ] + }, + { + "cell_type": "markdown", + "id": "a63f80a5", + "metadata": {}, + "source": [ + "### Check node status" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4d62ad6b", + "metadata": {}, + "outputs": [], + "source": [ + "ips = []\n", + "with open(\"ips.txt\", \"r\") as f:\n", + " for line in f:\n", + " ips.append(line.strip().split(\":\")[0])\n", + "\n", + "nodes_down = []\n", + "\n", + "for ip in ips: \n", + " status = requests.get(f\"http://{ip}:9090/api/v1/health\", timeout=10)\n", + " if status.json()['status'] != 'ok':\n", + " nodes_down.append(ip)\n", + "\n", + "print(\"Nodes up: \", len(ips) - len(nodes_down))\n", + "print(\"Nodes down: \", len(nodes_down))\n", + "\n", + "if len(nodes_down) > 0:\n", + " print(\"Nodes down: \", nodes_down)\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ee98b289", + "metadata": {}, + "outputs": [], + "source": [ + "subscribe_processes = [] # Keep track of all started processes\n", + "trace_files = []\n", + "\n", + "for i,topic in enumerate(topics):\n", + " trace_file = f\"raw_traces/trace-data-col-{i}.tsv\"\n", + " trace_files.append(trace_file)\n", + " # select a random node to publish\n", + " proc = subprocess.Popen([\n", + " \"./p2p-multi-subscribe\",\n", + " \"-topic\", topic,\n", + " \"-ipfile\", \"ips.txt\",\n", + " \"-output-trace\", trace_file,\n", + " ])\n", + " subscribe_processes.append(proc)\n", + "\n", + "print(f\"Started {len(subscribe_processes)} subscriber processes\")\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d78339a9", + "metadata": {}, + "outputs": [], + "source": [ + "def publish_data(topic, node_id):\n", + "\n", + " return subprocess.Popen([\n", + " \"./p2p-multi-publish\",\n", + " \"-topic\", topic,\n", + " \"-ipfile\", \"ips.txt\",\n", + " \"-datasize\", \"16384\",\n", + " \"-start-index\", \"0\",\n", + " \"-end-index\", \"1\",\n", + " \"-count\", \"1\",\n", + " \"-sleep\", \"12s\"\n", + " ])\n", + "\n", + "pool = ProcessPool(nodes=len(topics))\n", + "\n", + "def worker_task(topics_and_node_ids):\n", + " publish_data(topics_and_node_ids[0], topics_and_node_ids[1])\n", + "\n", + "ids = [i for i in range(len(topics))]\n", + "\n", + "topics_and_node_ids = list(zip(topics, [ids.pop(random.randint(0, len(ids) - 1)) for _ in range(len(topics))]))\n", + "results = pool.map(worker_task, topics_and_node_ids)\n", + "\n", + "pool.close()\n", + "pool.join()\n", + "pool.clear()\n", + "\n", + "print(\"Done\")" + ] + }, + { + "cell_type": "markdown", + "id": "5316deb4", + "metadata": {}, + "source": [ + "## Merge raw traces" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e4081689", + "metadata": {}, + "outputs": [], + "source": [ + "merged_traces_file = \"merged_traces.tsv\"\n", + "with open(merged_traces_file, \"w\") as f:\n", + " for trace_file in trace_files:\n", + " if os.path.exists(trace_file):\n", + " with open(trace_file, \"r\") as t:\n", + " \n", + " f.write(t.read())\n", + " else:\n", + " print(f\"Warning: trace file {trace_file} not found. Skipping.\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "167be28d", + "metadata": {}, + "outputs": [], + "source": [ + "for proc in subscribe_processes:\n", + " proc.terminate()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/grpc_p2p_client/multi-publish b/grpc_p2p_client/multi-publish new file mode 100755 index 0000000..0649ef1 Binary files /dev/null and b/grpc_p2p_client/multi-publish differ diff --git a/grpc_p2p_client/shared/types.go b/grpc_p2p_client/shared/types.go index b4f5b95..d79cf20 100644 --- a/grpc_p2p_client/shared/types.go +++ b/grpc_p2p_client/shared/types.go @@ -8,6 +8,15 @@ type P2PMessage struct { SourceNodeID string } +type Message struct { + Topic string `json:"topic"` + Msg []byte `json:"msg"` +} + +type MessageBatch struct { + Messages []Message `json:"messages"` +} + // Command represents possible operations that sidecar may perform with p2p node type Command int32 @@ -16,4 +25,7 @@ const ( CommandPublishData CommandSubscribeToTopic CommandUnSubscribeToTopic + CommandSubscribeToTopics + CommandPublishRandomData + CommandPublishBatch )