diff --git a/.gitignore b/.gitignore index fb99b84..bf2a116 100644 --- a/.gitignore +++ b/.gitignore @@ -32,4 +32,5 @@ log venv/ __pycache__/ *.pyc -temp \ No newline at end of file +temp +temp/ \ No newline at end of file diff --git a/README.md b/README.md index 46b629b..2afe1f8 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ go get github.com/elecbug/netkit@latest - [`graph`](./network-graph/graph/): Library for creating and building graphs. - [`algorithm`](./network-graph/algorithm/): Library containing various graph algorithms. -# Extensible +### Extensible - [`bimap`](./bimap/): Bidirectional map with O(1) lookups key->value and value->key. - [`slice`](./slice/): Generic helpers: binary search, stable merge sort, parallel sort, and `IsSorted`. diff --git a/p2p/network.go b/p2p/network.go index 8700e63..4b06bbe 100644 --- a/p2p/network.go +++ b/p2p/network.go @@ -2,6 +2,7 @@ package p2p import ( "strconv" + "sync" "github.com/elecbug/netkit/network-graph/graph" "github.com/elecbug/netkit/network-graph/node" @@ -9,7 +10,7 @@ import ( // GenerateNetwork creates a P2P network from the given graph. // nodeLatency and edgeLatency are functions that generate latencies for nodes and edges respectively. -func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64) (map[ID]*Node, error) { +func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency, queuingLatency func() float64) (map[ID]*Node, error) { nodes := make(map[ID]*Node) maps := make(map[node.ID]ID) @@ -22,9 +23,9 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64) (m } n := &Node{ - ID: ID(num), - Latency: nodeLatency(), - Edges: make(map[ID]Edge), + ID: ID(num), + ValidationLatency: nodeLatency(), + Edges: make(map[ID]Edge), } nodes[n.ID] = n @@ -57,9 +58,14 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64) (m // RunNetworkSimulation starts the message handling routines for all nodes in the network. func RunNetworkSimulation(nodes map[ID]*Node) { + wg := &sync.WaitGroup{} + wg.Add(len(nodes)) + for _, n := range nodes { - n.eachRun(nodes) + n.eachRun(nodes, wg) } + + wg.Wait() } // Publish sends a message to the specified node's message queue. diff --git a/p2p/node.go b/p2p/node.go index e8ab8fd..a6e056d 100644 --- a/p2p/node.go +++ b/p2p/node.go @@ -22,9 +22,10 @@ type Edge struct { // Node represents a node in the P2P network. type Node struct { - ID ID - Latency float64 - Edges map[ID]Edge + ID ID + ValidationLatency float64 + QueuingLatency float64 + Edges map[ID]Edge RecvFrom map[string]map[ID]struct{} // content -> set of senders SentTo map[string]map[ID]struct{} // content -> set of targets @@ -40,13 +41,15 @@ func (n *Node) Degree() int { } // eachRun starts the message handling routine for the node. -func (n *Node) eachRun(network map[ID]*Node) { - go func() { - n.msgQueue = make(chan Message, 1000) - n.RecvFrom = make(map[string]map[ID]struct{}) - n.SentTo = make(map[string]map[ID]struct{}) - n.SeenAt = make(map[string]time.Time) +func (n *Node) eachRun(network map[ID]*Node, wg *sync.WaitGroup) { + defer wg.Done() + + n.msgQueue = make(chan Message, 1000) + n.RecvFrom = make(map[string]map[ID]struct{}) + n.SentTo = make(map[string]map[ID]struct{}) + n.SeenAt = make(map[string]time.Time) + go func() { for msg := range n.msgQueue { first := false var excludeSnapshot map[ID]struct{} @@ -66,7 +69,7 @@ func (n *Node) eachRun(network map[ID]*Node) { if first { go func(content string, exclude map[ID]struct{}) { - time.Sleep(time.Duration(n.Latency) * time.Millisecond) + time.Sleep(time.Duration(n.ValidationLatency) * time.Millisecond) n.publish(network, content, exclude) }(msg.Content, excludeSnapshot) } @@ -86,6 +89,10 @@ func copyIDSet(src map[ID]struct{}) map[ID]struct{} { // publish sends the message to neighbors, excluding 'exclude' and already-sent targets. func (n *Node) publish(network map[ID]*Node, content string, exclude map[ID]struct{}) { n.mu.Lock() + defer n.mu.Unlock() + + time.Sleep(time.Duration(n.QueuingLatency) * time.Millisecond) + if _, ok := n.SentTo[content]; !ok { n.SentTo[content] = make(map[ID]struct{}) } @@ -103,10 +110,10 @@ func (n *Node) publish(network map[ID]*Node, content string, exclude map[ID]stru n.SentTo[content][edge.TargetID] = struct{}{} edgeCopy := edge + go func(e Edge) { time.Sleep(time.Duration(e.Latency) * time.Millisecond) network[e.TargetID].msgQueue <- Message{From: n.ID, Content: content} }(edgeCopy) } - n.mu.Unlock() } diff --git a/p2p/p2p_test.go b/p2p/p2p_test.go index 99f44ad..22fd484 100644 --- a/p2p/p2p_test.go +++ b/p2p/p2p_test.go @@ -1,45 +1,118 @@ package p2p_test import ( + "encoding/json" + "fmt" + "math" "math/rand" + "os" + "sync" "testing" "time" + "github.com/elecbug/netkit/network-graph/algorithm" "github.com/elecbug/netkit/network-graph/graph/standard_graph" "github.com/elecbug/netkit/p2p" ) func TestGenerateNetwork(t *testing.T) { - g := standard_graph.ErdosRenyiGraph(1000, 0.005, true) + g := standard_graph.ErdosRenyiGraph(1000, 0.05, true) t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount()) src := rand.NewSource(time.Now().UnixNano()) nodeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.5, src) } edgeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.3, src) } + queuingLatency := func() float64 { return p2p.LogNormalRand(5.0, 0.2, src) } - nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency) + nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, queuingLatency) t.Logf("Generated network with %d nodes\n", len(nw)) for id, node := range nw { - t.Logf("Node %d: latency=%.2fms, edges=%v\n", id, node.Latency, node.Edges) + t.Logf("Node %d: validation_latency=%.2fms, queuing_latency=%.2fms, edges=%v\n", id, node.ValidationLatency, node.QueuingLatency, node.Edges) } - p2p.RunNetworkSimulation(nw) - p2p.Publish(nw[0], "Hello, P2P Network!") + msg := "Hello, P2P World!" - time.Sleep(5 * time.Second) + p2p.RunNetworkSimulation(nw) + p2p.Publish(nw[0], msg) + time.Sleep(1 * time.Second) count := 0 + result := make(map[string]map[string]any) + for id, node := range nw { - c := len(node.SentTo["Hello, P2P Network!"]) + c := len(node.SentTo[msg]) t.Logf("Node %d sent %d/%d\n", id, c, len(node.Edges)) - t.Logf("Node %d data: recv: %v, sent: %v, seen: %v\n", - id, - node.RecvFrom["Hello, P2P Network!"], - node.SentTo["Hello, P2P Network!"], - node.SeenAt["Hello, P2P Network!"], - ) + + result[fmt.Sprintf("node_%d", id)] = map[string]any{} + result[fmt.Sprintf("node_%d", id)]["recv"] = node.RecvFrom[msg] + result[fmt.Sprintf("node_%d", id)]["sent"] = node.SentTo[msg] + result[fmt.Sprintf("node_%d", id)]["seen"] = node.SeenAt[msg] + count += c } t.Logf("Total received count: %d\n", count) + + data, _ := json.Marshal(result) + + os.WriteFile("p2p_result.log", data, 0644) +} + +func TestExpCase(t *testing.T) { + run := false + + if run { + for i := 4; i <= 11; i++ { + wg := &sync.WaitGroup{} + + for j := 0; j < 60; j++ { + wg.Add(1) + go func(j int) { + defer wg.Done() + filename := fmt.Sprintf("temp/p2p_result-%02d-%03d.log", i, j) + + if _, err := os.Stat(filename); err == nil { + t.Logf("File %s already exists, skipping...\n", filename) + return + } + + t.Logf("Experiment case: %02d-%03d\n", i, j) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + n := r.Int()%20 + 170 + g := standard_graph.ErdosRenyiGraph(n, float64(i)/float64(n), true) + + nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(0), 0.01, rand.NewSource(time.Now().UnixNano())) } + edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(500), 0.01, rand.NewSource(time.Now().UnixNano())) } + queuingLatency := func() float64 { return p2p.LogNormalRand(math.Log(0), 0.01, rand.NewSource(time.Now().UnixNano())) } + + nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, queuingLatency) + msg := "Hello, P2P World!" + + t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount()) + + p2p.RunNetworkSimulation(nw) + p2p.Publish(nw[0], msg) + time.Sleep(5 * time.Second) + + result := make(map[string]map[string]any) + + for id, node := range nw { + result[fmt.Sprintf("node_%d", id)] = map[string]any{} + result[fmt.Sprintf("node_%d", id)]["recv"] = node.RecvFrom[msg] + result[fmt.Sprintf("node_%d", id)]["sent"] = node.SentTo[msg] + result[fmt.Sprintf("node_%d", id)]["seen"] = node.SeenAt[msg] + } + + data, _ := json.Marshal(result) + + os.WriteFile(filename, data, 0644) + + algorithm.CacheClear() + }(j) + } + + wg.Wait() + } + } }