Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,17 @@ publish: $(P2P_CLIENT) generate-identity ## publish message to p2p topic: make p

test: $(P2P_CLIENT) $(PROXY_CLIENT) $(KEYGEN_BINARY) ## Run tests for Go clients

lint: ## Run golangci-lint
@cd $(P2P_CLIENT_DIR) && golangci-lint run --skip-dirs-use-default || echo "Linting issues found in P2P client"
@cd $(PROXY_CLIENT_DIR) && golangci-lint run --skip-dirs-use-default || echo "Linting issues found in Proxy client"
@cd keygen && golangci-lint run --skip-dirs-use-default || echo "Linting issues found in Keygen"
GOLANGCI_LINT := $(shell command -v golangci-lint 2>/dev/null || echo "$(shell go env GOPATH)/bin/golangci-lint")

install-lint: ## Install golangci-lint if not present
@command -v golangci-lint >/dev/null 2>&1 || (echo "Installing golangci-lint..." && go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest)

lint: install-lint ## Run golangci-lint
@cd $(P2P_CLIENT_DIR) && $(GOLANGCI_LINT) run || echo "Linting issues found in P2P client"
@cd $(PROXY_CLIENT_DIR) && $(GOLANGCI_LINT) run || echo "Linting issues found in Proxy client"
@cd keygen && $(GOLANGCI_LINT) run || echo "Linting issues found in Keygen"



test-docker: setup-scripts ## Test Docker Compose setup
@./script/generate-identity.sh
Expand Down Expand Up @@ -149,4 +156,4 @@ clean: ## Clean build artifacts
@:

.DEFAULT_GOAL := help
.PHONY: help build generate-identity subscribe publish test lint test-docker test-scripts validate ci clean setup-scripts dashboard
.PHONY: help build generate-identity subscribe publish test lint install-lint test-docker test-scripts validate ci clean setup-scripts dashboard
Binary file added grpc_p2p_client/batch-publish
Binary file not shown.
173 changes: 173 additions & 0 deletions grpc_p2p_client/batch_publish.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "a54020ef",
"metadata": {},
"outputs": [],
"source": [
"import subprocess\n",
"from pathos.multiprocessing import ProcessPool\n",
"import os\n",
"import random\n",
"import requests\n",
"\n",
"if not os.path.exists(\"raw_traces\"):\n",
" os.mkdir(\"raw_traces\")\n",
"else:\n",
" # delete all files in raw_traces\n",
" for file in os.listdir(\"raw_traces\"):\n",
" os.remove(os.path.join(\"raw_traces\", file))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ec52698f",
"metadata": {},
"outputs": [],
"source": [
"num_topics = 16"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d6834a18",
"metadata": {},
"outputs": [],
"source": [
"topics = [f'/eth2/fulu_fork_digest/data_column_sidecar_{i}/ssz_snappy' for i in range(num_topics)]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "21e027de",
"metadata": {},
"outputs": [],
"source": [
"ips = []\n",
"with open(\"ips.txt\", \"r\") as f:\n",
" for line in f:\n",
" ips.append(line.strip().split(\":\")[0])\n",
"\n",
"nodes_down = []\n",
"\n",
"for ip in ips: \n",
" status = requests.get(f\"http://{ip}:9090/api/v1/health\", timeout=10)\n",
" print(status.json())\n",
" if status.json()['status'] != 'ok':\n",
" nodes_down.append(ip)\n",
"\n",
"print(\"Nodes up: \", len(ips) - len(nodes_down))\n",
"print(\"Nodes down: \", len(nodes_down))\n",
"\n",
"if len(nodes_down) > 0:\n",
" print(\"Nodes down: \", nodes_down)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "24ee83c0",
"metadata": {},
"outputs": [],
"source": [
"subscribe_processes = [] # Keep track of all started processes\n",
"trace_files = []\n",
"\n",
"for i,topic in enumerate(topics):\n",
" trace_file = f\"raw_traces/trace-data-col-{i}.tsv\"\n",
" trace_files.append(trace_file)\n",
" # select a random node to publish\n",
" proc = subprocess.Popen([\n",
" \"./p2p-multi-subscribe\",\n",
" \"-topic\", topic,\n",
" \"-ipfile\", \"ips.txt\",\n",
" \"-output-trace\", trace_file,\n",
" ])\n",
" subscribe_processes.append(proc)\n",
"\n",
"print(f\"Started {len(subscribe_processes)} subscriber processes\")\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "aca419e7",
"metadata": {},
"outputs": [],
"source": [
"cmd = [\n",
" \"./batch-publish\",\n",
" \"-addr\", f\"{ips[0]}:33212\",\n",
" \"-topics\", \",\".join(topics),\n",
" \"-msg\", \"16384\",\n",
" \"-sleep\", \"12s\",\n",
" \"-num_batches\",\"1\",\n",
"]\n",
"\n",
"subprocess.run(cmd)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "64ac941e",
"metadata": {},
"outputs": [],
"source": [
"merged_traces_file = \"merged_traces.tsv\"\n",
"with open(merged_traces_file, \"w\") as f:\n",
" for trace_file in trace_files:\n",
" if os.path.exists(trace_file):\n",
" with open(trace_file, \"r\") as t:\n",
" \n",
" f.write(t.read())\n",
" else:\n",
" print(f\"Warning: trace file {trace_file} not found. Skipping.\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "db69cd4d",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "18a0e7bd",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Loading
Loading