Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ log
venv/
__pycache__/
*.pyc
temp
temp
temp/
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ go get github.com/elecbug/netkit@latest
- [`graph`](./network-graph/graph/): Library for creating and building graphs.
- [`algorithm`](./network-graph/algorithm/): Library containing various graph algorithms.

# Extensible
### Extensible

- [`bimap`](./bimap/): Bidirectional map with O(1) lookups key->value and value->key.
- [`slice`](./slice/): Generic helpers: binary search, stable merge sort, parallel sort, and `IsSorted`.
Expand Down
16 changes: 11 additions & 5 deletions p2p/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package p2p

import (
"strconv"
"sync"

"github.com/elecbug/netkit/network-graph/graph"
"github.com/elecbug/netkit/network-graph/node"
)

// GenerateNetwork creates a P2P network from the given graph.
// nodeLatency and edgeLatency are functions that generate latencies for nodes and edges respectively.
func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64) (map[ID]*Node, error) {
func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency, queuingLatency func() float64) (map[ID]*Node, error) {
nodes := make(map[ID]*Node)
maps := make(map[node.ID]ID)

Expand All @@ -22,9 +23,9 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64) (m
}

n := &Node{
ID: ID(num),
Latency: nodeLatency(),
Edges: make(map[ID]Edge),
ID: ID(num),
ValidationLatency: nodeLatency(),
Edges: make(map[ID]Edge),
}

nodes[n.ID] = n
Expand Down Expand Up @@ -57,9 +58,14 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64) (m

// RunNetworkSimulation starts the message handling routines for all nodes in the network.
func RunNetworkSimulation(nodes map[ID]*Node) {
wg := &sync.WaitGroup{}
wg.Add(len(nodes))

for _, n := range nodes {
n.eachRun(nodes)
n.eachRun(nodes, wg)
}

wg.Wait()
}

// Publish sends a message to the specified node's message queue.
Expand Down
29 changes: 18 additions & 11 deletions p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ type Edge struct {

// Node represents a node in the P2P network.
type Node struct {
ID ID
Latency float64
Edges map[ID]Edge
ID ID
ValidationLatency float64
QueuingLatency float64
Edges map[ID]Edge

RecvFrom map[string]map[ID]struct{} // content -> set of senders
SentTo map[string]map[ID]struct{} // content -> set of targets
Expand All @@ -40,13 +41,15 @@ func (n *Node) Degree() int {
}

// eachRun starts the message handling routine for the node.
func (n *Node) eachRun(network map[ID]*Node) {
go func() {
n.msgQueue = make(chan Message, 1000)
n.RecvFrom = make(map[string]map[ID]struct{})
n.SentTo = make(map[string]map[ID]struct{})
n.SeenAt = make(map[string]time.Time)
func (n *Node) eachRun(network map[ID]*Node, wg *sync.WaitGroup) {
defer wg.Done()

n.msgQueue = make(chan Message, 1000)
n.RecvFrom = make(map[string]map[ID]struct{})
n.SentTo = make(map[string]map[ID]struct{})
n.SeenAt = make(map[string]time.Time)

go func() {
for msg := range n.msgQueue {
first := false
var excludeSnapshot map[ID]struct{}
Expand All @@ -66,7 +69,7 @@ func (n *Node) eachRun(network map[ID]*Node) {

if first {
go func(content string, exclude map[ID]struct{}) {
time.Sleep(time.Duration(n.Latency) * time.Millisecond)
time.Sleep(time.Duration(n.ValidationLatency) * time.Millisecond)
n.publish(network, content, exclude)
}(msg.Content, excludeSnapshot)
}
Expand All @@ -86,6 +89,10 @@ func copyIDSet(src map[ID]struct{}) map[ID]struct{} {
// publish sends the message to neighbors, excluding 'exclude' and already-sent targets.
func (n *Node) publish(network map[ID]*Node, content string, exclude map[ID]struct{}) {
n.mu.Lock()
defer n.mu.Unlock()

time.Sleep(time.Duration(n.QueuingLatency) * time.Millisecond)

if _, ok := n.SentTo[content]; !ok {
n.SentTo[content] = make(map[ID]struct{})
}
Expand All @@ -103,10 +110,10 @@ func (n *Node) publish(network map[ID]*Node, content string, exclude map[ID]stru
n.SentTo[content][edge.TargetID] = struct{}{}

edgeCopy := edge

go func(e Edge) {
time.Sleep(time.Duration(e.Latency) * time.Millisecond)
network[e.TargetID].msgQueue <- Message{From: n.ID, Content: content}
}(edgeCopy)
}
n.mu.Unlock()
}
99 changes: 86 additions & 13 deletions p2p/p2p_test.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,118 @@
package p2p_test

import (
"encoding/json"
"fmt"
"math"
"math/rand"
"os"
"sync"
"testing"
"time"

"github.com/elecbug/netkit/network-graph/algorithm"
"github.com/elecbug/netkit/network-graph/graph/standard_graph"
"github.com/elecbug/netkit/p2p"
)

func TestGenerateNetwork(t *testing.T) {
g := standard_graph.ErdosRenyiGraph(1000, 0.005, true)
g := standard_graph.ErdosRenyiGraph(1000, 0.05, true)
t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount())
src := rand.NewSource(time.Now().UnixNano())

nodeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.5, src) }
edgeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.3, src) }
queuingLatency := func() float64 { return p2p.LogNormalRand(5.0, 0.2, src) }

nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency)
nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, queuingLatency)
t.Logf("Generated network with %d nodes\n", len(nw))
for id, node := range nw {
t.Logf("Node %d: latency=%.2fms, edges=%v\n", id, node.Latency, node.Edges)
t.Logf("Node %d: validation_latency=%.2fms, queuing_latency=%.2fms, edges=%v\n", id, node.ValidationLatency, node.QueuingLatency, node.Edges)
}

p2p.RunNetworkSimulation(nw)
p2p.Publish(nw[0], "Hello, P2P Network!")
msg := "Hello, P2P World!"

time.Sleep(5 * time.Second)
p2p.RunNetworkSimulation(nw)
p2p.Publish(nw[0], msg)
time.Sleep(1 * time.Second)

count := 0
result := make(map[string]map[string]any)

for id, node := range nw {
c := len(node.SentTo["Hello, P2P Network!"])
c := len(node.SentTo[msg])
t.Logf("Node %d sent %d/%d\n", id, c, len(node.Edges))
t.Logf("Node %d data: recv: %v, sent: %v, seen: %v\n",
id,
node.RecvFrom["Hello, P2P Network!"],
node.SentTo["Hello, P2P Network!"],
node.SeenAt["Hello, P2P Network!"],
)

result[fmt.Sprintf("node_%d", id)] = map[string]any{}
result[fmt.Sprintf("node_%d", id)]["recv"] = node.RecvFrom[msg]
result[fmt.Sprintf("node_%d", id)]["sent"] = node.SentTo[msg]
result[fmt.Sprintf("node_%d", id)]["seen"] = node.SeenAt[msg]

count += c
}

t.Logf("Total received count: %d\n", count)

data, _ := json.Marshal(result)

os.WriteFile("p2p_result.log", data, 0644)
}

func TestExpCase(t *testing.T) {
run := false

if run {
for i := 4; i <= 11; i++ {
wg := &sync.WaitGroup{}

for j := 0; j < 60; j++ {
wg.Add(1)
go func(j int) {
defer wg.Done()
filename := fmt.Sprintf("temp/p2p_result-%02d-%03d.log", i, j)

if _, err := os.Stat(filename); err == nil {
t.Logf("File %s already exists, skipping...\n", filename)
return
}

t.Logf("Experiment case: %02d-%03d\n", i, j)
r := rand.New(rand.NewSource(time.Now().UnixNano()))

n := r.Int()%20 + 170
g := standard_graph.ErdosRenyiGraph(n, float64(i)/float64(n), true)

nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(0), 0.01, rand.NewSource(time.Now().UnixNano())) }
edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(500), 0.01, rand.NewSource(time.Now().UnixNano())) }
queuingLatency := func() float64 { return p2p.LogNormalRand(math.Log(0), 0.01, rand.NewSource(time.Now().UnixNano())) }

nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, queuingLatency)
msg := "Hello, P2P World!"

t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount())

p2p.RunNetworkSimulation(nw)
p2p.Publish(nw[0], msg)
time.Sleep(5 * time.Second)

result := make(map[string]map[string]any)

for id, node := range nw {
result[fmt.Sprintf("node_%d", id)] = map[string]any{}
result[fmt.Sprintf("node_%d", id)]["recv"] = node.RecvFrom[msg]
result[fmt.Sprintf("node_%d", id)]["sent"] = node.SentTo[msg]
result[fmt.Sprintf("node_%d", id)]["seen"] = node.SeenAt[msg]
}

data, _ := json.Marshal(result)

os.WriteFile(filename, data, 0644)

algorithm.CacheClear()
}(j)
}

wg.Wait()
}
}
}