From 640a03a75700d32997aa79f6b76cb57b37bb82dd Mon Sep 17 00:00:00 2001 From: EB Date: Sat, 4 Oct 2025 20:54:40 +0900 Subject: [PATCH 1/6] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 46b629b..2afe1f8 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ go get github.com/elecbug/netkit@latest - [`graph`](./network-graph/graph/): Library for creating and building graphs. - [`algorithm`](./network-graph/algorithm/): Library containing various graph algorithms. -# Extensible +### Extensible - [`bimap`](./bimap/): Bidirectional map with O(1) lookups key->value and value->key. - [`slice`](./slice/): Generic helpers: binary search, stable merge sort, parallel sort, and `IsSorted`. From c194f245c49d31a13e85d4d78559f8aaa76525e0 Mon Sep 17 00:00:00 2001 From: elecbug Date: Tue, 7 Oct 2025 13:22:54 +0900 Subject: [PATCH 2/6] feat: fix test --- p2p/p2p_test.go | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/p2p/p2p_test.go b/p2p/p2p_test.go index 99f44ad..2975c8c 100644 --- a/p2p/p2p_test.go +++ b/p2p/p2p_test.go @@ -1,7 +1,10 @@ package p2p_test import ( + "encoding/json" + "fmt" "math/rand" + "os" "testing" "time" @@ -23,23 +26,31 @@ func TestGenerateNetwork(t *testing.T) { t.Logf("Node %d: latency=%.2fms, edges=%v\n", id, node.Latency, node.Edges) } + msg := "Hello, P2P World!" + p2p.RunNetworkSimulation(nw) - p2p.Publish(nw[0], "Hello, P2P Network!") + p2p.Publish(nw[0], msg) time.Sleep(5 * time.Second) count := 0 + result := make(map[string]map[string]any) + for id, node := range nw { - c := len(node.SentTo["Hello, P2P Network!"]) + c := len(node.SentTo[msg]) t.Logf("Node %d sent %d/%d\n", id, c, len(node.Edges)) - t.Logf("Node %d data: recv: %v, sent: %v, seen: %v\n", - id, - node.RecvFrom["Hello, P2P Network!"], - node.SentTo["Hello, P2P Network!"], - node.SeenAt["Hello, P2P Network!"], - ) + + result[fmt.Sprintf("node_%d", id)] = map[string]any{} + result[fmt.Sprintf("node_%d", id)]["recv"] = node.RecvFrom[msg] + result[fmt.Sprintf("node_%d", id)]["sent"] = node.SentTo[msg] + result[fmt.Sprintf("node_%d", id)]["seen"] = node.SeenAt[msg] + count += c } t.Logf("Total received count: %d\n", count) + + data, _ := json.Marshal(result) + + os.WriteFile("p2p_result.log", data, 0644) } From dac872828ead53496b416ce70d5ef132d7913273 Mon Sep 17 00:00:00 2001 From: elecbug Date: Tue, 7 Oct 2025 15:01:55 +0900 Subject: [PATCH 3/6] feat: fix test --- p2p/p2p_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/p2p/p2p_test.go b/p2p/p2p_test.go index 2975c8c..2a082f8 100644 --- a/p2p/p2p_test.go +++ b/p2p/p2p_test.go @@ -29,8 +29,8 @@ func TestGenerateNetwork(t *testing.T) { msg := "Hello, P2P World!" p2p.RunNetworkSimulation(nw) + time.Sleep(1 * time.Second) p2p.Publish(nw[0], msg) - time.Sleep(5 * time.Second) count := 0 @@ -54,3 +54,51 @@ func TestGenerateNetwork(t *testing.T) { os.WriteFile("p2p_result.log", data, 0644) } + +// func TestExpCase(t *testing.T) { +// for i := 4; i < 10; i++ { +// for j := 0; j < 100; j++ { +// filename := fmt.Sprintf("data/p2p_result-%02d-%03d.log", i, j) + +// if _, err := os.Stat(filename); err == nil { +// t.Logf("File %s already exists, skipping...\n", filename) +// continue +// } + +// t.Logf("Experiment case: %02d-%03d\n", i, j) +// src := rand.NewSource(time.Now().UnixNano()) +// r := rand.New(src) + +// n := r.Int()%(128-119) + 119 +// g := standard_graph.ErdosRenyiGraph(n, float64(i)/float64(n), true) + +// nodeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.5, src) } +// edgeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.3, src) } + +// nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency) +// msg := "Hello, P2P World!" + +// t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount()) + +// p2p.RunNetworkSimulation(nw) +// time.Sleep(1 * time.Second) +// p2p.Publish(nw[0], msg) +// time.Sleep(5 * time.Second) + +// result := make(map[string]map[string]any) + +// for id, node := range nw { +// result[fmt.Sprintf("node_%d", id)] = map[string]any{} +// result[fmt.Sprintf("node_%d", id)]["recv"] = node.RecvFrom[msg] +// result[fmt.Sprintf("node_%d", id)]["sent"] = node.SentTo[msg] +// result[fmt.Sprintf("node_%d", id)]["seen"] = node.SeenAt[msg] +// } + +// data, _ := json.Marshal(result) + +// os.WriteFile(filename, data, 0644) + +// algorithm.CacheClear() +// } +// } +// } From b122ef35862f3c62e4295ca999710e2cb22d64a4 Mon Sep 17 00:00:00 2001 From: elecbug Date: Mon, 13 Oct 2025 10:54:25 +0900 Subject: [PATCH 4/6] feat: update wg --- .gitignore | 3 +- p2p/network.go | 16 +++++++--- p2p/node.go | 29 ++++++++++------- p2p/p2p_test.go | 83 ++++++++++++++++++++++++++----------------------- 4 files changed, 75 insertions(+), 56 deletions(-) diff --git a/.gitignore b/.gitignore index fb99b84..bf2a116 100644 --- a/.gitignore +++ b/.gitignore @@ -32,4 +32,5 @@ log venv/ __pycache__/ *.pyc -temp \ No newline at end of file +temp +temp/ \ No newline at end of file diff --git a/p2p/network.go b/p2p/network.go index 8700e63..4b06bbe 100644 --- a/p2p/network.go +++ b/p2p/network.go @@ -2,6 +2,7 @@ package p2p import ( "strconv" + "sync" "github.com/elecbug/netkit/network-graph/graph" "github.com/elecbug/netkit/network-graph/node" @@ -9,7 +10,7 @@ import ( // GenerateNetwork creates a P2P network from the given graph. // nodeLatency and edgeLatency are functions that generate latencies for nodes and edges respectively. -func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64) (map[ID]*Node, error) { +func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency, queuingLatency func() float64) (map[ID]*Node, error) { nodes := make(map[ID]*Node) maps := make(map[node.ID]ID) @@ -22,9 +23,9 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64) (m } n := &Node{ - ID: ID(num), - Latency: nodeLatency(), - Edges: make(map[ID]Edge), + ID: ID(num), + ValidationLatency: nodeLatency(), + Edges: make(map[ID]Edge), } nodes[n.ID] = n @@ -57,9 +58,14 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64) (m // RunNetworkSimulation starts the message handling routines for all nodes in the network. func RunNetworkSimulation(nodes map[ID]*Node) { + wg := &sync.WaitGroup{} + wg.Add(len(nodes)) + for _, n := range nodes { - n.eachRun(nodes) + n.eachRun(nodes, wg) } + + wg.Wait() } // Publish sends a message to the specified node's message queue. diff --git a/p2p/node.go b/p2p/node.go index e8ab8fd..a6e056d 100644 --- a/p2p/node.go +++ b/p2p/node.go @@ -22,9 +22,10 @@ type Edge struct { // Node represents a node in the P2P network. type Node struct { - ID ID - Latency float64 - Edges map[ID]Edge + ID ID + ValidationLatency float64 + QueuingLatency float64 + Edges map[ID]Edge RecvFrom map[string]map[ID]struct{} // content -> set of senders SentTo map[string]map[ID]struct{} // content -> set of targets @@ -40,13 +41,15 @@ func (n *Node) Degree() int { } // eachRun starts the message handling routine for the node. -func (n *Node) eachRun(network map[ID]*Node) { - go func() { - n.msgQueue = make(chan Message, 1000) - n.RecvFrom = make(map[string]map[ID]struct{}) - n.SentTo = make(map[string]map[ID]struct{}) - n.SeenAt = make(map[string]time.Time) +func (n *Node) eachRun(network map[ID]*Node, wg *sync.WaitGroup) { + defer wg.Done() + + n.msgQueue = make(chan Message, 1000) + n.RecvFrom = make(map[string]map[ID]struct{}) + n.SentTo = make(map[string]map[ID]struct{}) + n.SeenAt = make(map[string]time.Time) + go func() { for msg := range n.msgQueue { first := false var excludeSnapshot map[ID]struct{} @@ -66,7 +69,7 @@ func (n *Node) eachRun(network map[ID]*Node) { if first { go func(content string, exclude map[ID]struct{}) { - time.Sleep(time.Duration(n.Latency) * time.Millisecond) + time.Sleep(time.Duration(n.ValidationLatency) * time.Millisecond) n.publish(network, content, exclude) }(msg.Content, excludeSnapshot) } @@ -86,6 +89,10 @@ func copyIDSet(src map[ID]struct{}) map[ID]struct{} { // publish sends the message to neighbors, excluding 'exclude' and already-sent targets. func (n *Node) publish(network map[ID]*Node, content string, exclude map[ID]struct{}) { n.mu.Lock() + defer n.mu.Unlock() + + time.Sleep(time.Duration(n.QueuingLatency) * time.Millisecond) + if _, ok := n.SentTo[content]; !ok { n.SentTo[content] = make(map[ID]struct{}) } @@ -103,10 +110,10 @@ func (n *Node) publish(network map[ID]*Node, content string, exclude map[ID]stru n.SentTo[content][edge.TargetID] = struct{}{} edgeCopy := edge + go func(e Edge) { time.Sleep(time.Duration(e.Latency) * time.Millisecond) network[e.TargetID].msgQueue <- Message{From: n.ID, Content: content} }(edgeCopy) } - n.mu.Unlock() } diff --git a/p2p/p2p_test.go b/p2p/p2p_test.go index 2a082f8..b1b63c4 100644 --- a/p2p/p2p_test.go +++ b/p2p/p2p_test.go @@ -3,33 +3,35 @@ package p2p_test import ( "encoding/json" "fmt" + "math" "math/rand" "os" "testing" "time" + "github.com/elecbug/netkit/network-graph/algorithm" "github.com/elecbug/netkit/network-graph/graph/standard_graph" "github.com/elecbug/netkit/p2p" ) func TestGenerateNetwork(t *testing.T) { - g := standard_graph.ErdosRenyiGraph(1000, 0.005, true) + g := standard_graph.ErdosRenyiGraph(1000, 0.05, true) t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount()) src := rand.NewSource(time.Now().UnixNano()) nodeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.5, src) } edgeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.3, src) } + queuingLatency := func() float64 { return p2p.LogNormalRand(5.0, 0.2, src) } - nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency) + nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, queuingLatency) t.Logf("Generated network with %d nodes\n", len(nw)) for id, node := range nw { - t.Logf("Node %d: latency=%.2fms, edges=%v\n", id, node.Latency, node.Edges) + t.Logf("Node %d: validation_latency=%.2fms, queuing_latency=%.2fms, edges=%v\n", id, node.ValidationLatency, node.QueuingLatency, node.Edges) } msg := "Hello, P2P World!" p2p.RunNetworkSimulation(nw) - time.Sleep(1 * time.Second) p2p.Publish(nw[0], msg) time.Sleep(5 * time.Second) @@ -55,50 +57,53 @@ func TestGenerateNetwork(t *testing.T) { os.WriteFile("p2p_result.log", data, 0644) } -// func TestExpCase(t *testing.T) { -// for i := 4; i < 10; i++ { -// for j := 0; j < 100; j++ { -// filename := fmt.Sprintf("data/p2p_result-%02d-%03d.log", i, j) +func TestExpCase(t *testing.T) { + run := false -// if _, err := os.Stat(filename); err == nil { -// t.Logf("File %s already exists, skipping...\n", filename) -// continue -// } + if run { + for i := 4; i < 10; i++ { + for j := 0; j < 100; j++ { + filename := fmt.Sprintf("temp/p2p_result-%02d-%03d.log", i, j) -// t.Logf("Experiment case: %02d-%03d\n", i, j) -// src := rand.NewSource(time.Now().UnixNano()) -// r := rand.New(src) + if _, err := os.Stat(filename); err == nil { + t.Logf("File %s already exists, skipping...\n", filename) + continue + } -// n := r.Int()%(128-119) + 119 -// g := standard_graph.ErdosRenyiGraph(n, float64(i)/float64(n), true) + t.Logf("Experiment case: %02d-%03d\n", i, j) + r := rand.New(rand.NewSource(time.Now().UnixNano())) -// nodeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.5, src) } -// edgeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.3, src) } + n := r.Int()%(128-119) + 119 + g := standard_graph.ErdosRenyiGraph(n, float64(i)/float64(n), true) -// nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency) -// msg := "Hello, P2P World!" + nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.1, rand.NewSource(time.Now().UnixNano())) } + edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(50), 0.1, rand.NewSource(time.Now().UnixNano())) } + queuingLatency := func() float64 { return p2p.LogNormalRand(math.Log(50), 0.1, rand.NewSource(time.Now().UnixNano())) } -// t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount()) + nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, queuingLatency) + msg := "Hello, P2P World!" -// p2p.RunNetworkSimulation(nw) -// time.Sleep(1 * time.Second) -// p2p.Publish(nw[0], msg) -// time.Sleep(5 * time.Second) + t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount()) -// result := make(map[string]map[string]any) + p2p.RunNetworkSimulation(nw) + p2p.Publish(nw[0], msg) + time.Sleep(4 * time.Second) -// for id, node := range nw { -// result[fmt.Sprintf("node_%d", id)] = map[string]any{} -// result[fmt.Sprintf("node_%d", id)]["recv"] = node.RecvFrom[msg] -// result[fmt.Sprintf("node_%d", id)]["sent"] = node.SentTo[msg] -// result[fmt.Sprintf("node_%d", id)]["seen"] = node.SeenAt[msg] -// } + result := make(map[string]map[string]any) -// data, _ := json.Marshal(result) + for id, node := range nw { + result[fmt.Sprintf("node_%d", id)] = map[string]any{} + result[fmt.Sprintf("node_%d", id)]["recv"] = node.RecvFrom[msg] + result[fmt.Sprintf("node_%d", id)]["sent"] = node.SentTo[msg] + result[fmt.Sprintf("node_%d", id)]["seen"] = node.SeenAt[msg] + } -// os.WriteFile(filename, data, 0644) + data, _ := json.Marshal(result) -// algorithm.CacheClear() -// } -// } -// } + os.WriteFile(filename, data, 0644) + + algorithm.CacheClear() + } + } + } +} From 39e6c0f28c3aac607420a133e160d7c3913fa5d1 Mon Sep 17 00:00:00 2001 From: elecbug Date: Mon, 13 Oct 2025 14:04:13 +0900 Subject: [PATCH 5/6] chore: update script --- p2p/p2p_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/p2p_test.go b/p2p/p2p_test.go index b1b63c4..09893d2 100644 --- a/p2p/p2p_test.go +++ b/p2p/p2p_test.go @@ -61,7 +61,7 @@ func TestExpCase(t *testing.T) { run := false if run { - for i := 4; i < 10; i++ { + for i := 4; i < 16; i++ { for j := 0; j < 100; j++ { filename := fmt.Sprintf("temp/p2p_result-%02d-%03d.log", i, j) @@ -73,7 +73,7 @@ func TestExpCase(t *testing.T) { t.Logf("Experiment case: %02d-%03d\n", i, j) r := rand.New(rand.NewSource(time.Now().UnixNano())) - n := r.Int()%(128-119) + 119 + n := r.Int()%(144-126) + 126 g := standard_graph.ErdosRenyiGraph(n, float64(i)/float64(n), true) nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.1, rand.NewSource(time.Now().UnixNano())) } From 6b7db8df617247d0b1aff0991e2476bcee330b2c Mon Sep 17 00:00:00 2001 From: elecbug Date: Mon, 13 Oct 2025 17:39:59 +0900 Subject: [PATCH 6/6] chore: update script --- p2p/p2p_test.go | 71 ++++++++++++++++++++++++++++--------------------- 1 file changed, 40 insertions(+), 31 deletions(-) diff --git a/p2p/p2p_test.go b/p2p/p2p_test.go index 09893d2..22fd484 100644 --- a/p2p/p2p_test.go +++ b/p2p/p2p_test.go @@ -6,6 +6,7 @@ import ( "math" "math/rand" "os" + "sync" "testing" "time" @@ -33,7 +34,7 @@ func TestGenerateNetwork(t *testing.T) { p2p.RunNetworkSimulation(nw) p2p.Publish(nw[0], msg) - time.Sleep(5 * time.Second) + time.Sleep(1 * time.Second) count := 0 result := make(map[string]map[string]any) @@ -61,49 +62,57 @@ func TestExpCase(t *testing.T) { run := false if run { - for i := 4; i < 16; i++ { - for j := 0; j < 100; j++ { - filename := fmt.Sprintf("temp/p2p_result-%02d-%03d.log", i, j) + for i := 4; i <= 11; i++ { + wg := &sync.WaitGroup{} - if _, err := os.Stat(filename); err == nil { - t.Logf("File %s already exists, skipping...\n", filename) - continue - } + for j := 0; j < 60; j++ { + wg.Add(1) + go func(j int) { + defer wg.Done() + filename := fmt.Sprintf("temp/p2p_result-%02d-%03d.log", i, j) - t.Logf("Experiment case: %02d-%03d\n", i, j) - r := rand.New(rand.NewSource(time.Now().UnixNano())) + if _, err := os.Stat(filename); err == nil { + t.Logf("File %s already exists, skipping...\n", filename) + return + } - n := r.Int()%(144-126) + 126 - g := standard_graph.ErdosRenyiGraph(n, float64(i)/float64(n), true) + t.Logf("Experiment case: %02d-%03d\n", i, j) + r := rand.New(rand.NewSource(time.Now().UnixNano())) - nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.1, rand.NewSource(time.Now().UnixNano())) } - edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(50), 0.1, rand.NewSource(time.Now().UnixNano())) } - queuingLatency := func() float64 { return p2p.LogNormalRand(math.Log(50), 0.1, rand.NewSource(time.Now().UnixNano())) } + n := r.Int()%20 + 170 + g := standard_graph.ErdosRenyiGraph(n, float64(i)/float64(n), true) - nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, queuingLatency) - msg := "Hello, P2P World!" + nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(0), 0.01, rand.NewSource(time.Now().UnixNano())) } + edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(500), 0.01, rand.NewSource(time.Now().UnixNano())) } + queuingLatency := func() float64 { return p2p.LogNormalRand(math.Log(0), 0.01, rand.NewSource(time.Now().UnixNano())) } - t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount()) + nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, queuingLatency) + msg := "Hello, P2P World!" - p2p.RunNetworkSimulation(nw) - p2p.Publish(nw[0], msg) - time.Sleep(4 * time.Second) + t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount()) - result := make(map[string]map[string]any) + p2p.RunNetworkSimulation(nw) + p2p.Publish(nw[0], msg) + time.Sleep(5 * time.Second) - for id, node := range nw { - result[fmt.Sprintf("node_%d", id)] = map[string]any{} - result[fmt.Sprintf("node_%d", id)]["recv"] = node.RecvFrom[msg] - result[fmt.Sprintf("node_%d", id)]["sent"] = node.SentTo[msg] - result[fmt.Sprintf("node_%d", id)]["seen"] = node.SeenAt[msg] - } + result := make(map[string]map[string]any) - data, _ := json.Marshal(result) + for id, node := range nw { + result[fmt.Sprintf("node_%d", id)] = map[string]any{} + result[fmt.Sprintf("node_%d", id)]["recv"] = node.RecvFrom[msg] + result[fmt.Sprintf("node_%d", id)]["sent"] = node.SentTo[msg] + result[fmt.Sprintf("node_%d", id)]["seen"] = node.SeenAt[msg] + } - os.WriteFile(filename, data, 0644) + data, _ := json.Marshal(result) - algorithm.CacheClear() + os.WriteFile(filename, data, 0644) + + algorithm.CacheClear() + }(j) } + + wg.Wait() } } }