Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ log

venv/
__pycache__/
*.pyc
*.pyc
temp
4 changes: 4 additions & 0 deletions network-graph/graph/standard_graph/erdos_reyni.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ func ErdosRenyiGraph(n int, p float64, isUndirected bool) *graph.Graph {

g := graph.New(isUndirected)

for i := 0; i < n; i++ {
g.AddNode(node.ID(toString(i)))
}

if isUndirected {
for i := 0; i < n; i++ {
for j := i + 1; j < n; j++ {
Expand Down
18 changes: 18 additions & 0 deletions p2p/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package p2p

import (
"math"
"math/rand"
)

// LogNormalRand generates a log-normally distributed random number
// with given mu and sigma parameters.
func LogNormalRand(mu, sigma float64, src rand.Source) float64 {
r := rand.New(src)

u1 := r.Float64()
u2 := r.Float64()
z := math.Sqrt(-2.0*math.Log(u1)) * math.Cos(2*math.Pi*u2)

return math.Exp(mu + sigma*z)
}
68 changes: 68 additions & 0 deletions p2p/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package p2p

import (
"strconv"

"github.com/elecbug/netkit/network-graph/graph"
"github.com/elecbug/netkit/network-graph/node"
)

// GenerateNetwork creates a P2P network from the given graph.
// nodeLatency and edgeLatency are functions that generate latencies for nodes and edges respectively.
func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64) (map[ID]*Node, error) {
nodes := make(map[ID]*Node)
maps := make(map[node.ID]ID)

// create nodes
for _, gn := range g.Nodes() {
num, err := strconv.Atoi(gn.String())

if err != nil {
return nil, err
}

n := &Node{
ID: ID(num),
Latency: nodeLatency(),
Edges: make(map[ID]Edge),
}

nodes[n.ID] = n
maps[gn] = n.ID
}

for _, gn := range g.Nodes() {
num, err := strconv.Atoi(gn.String())

if err != nil {
return nil, err
}

n := nodes[ID(num)]

for _, neighbor := range g.Neighbors(gn) {
j := maps[neighbor]

edge := Edge{
TargetID: ID(j),
Latency: edgeLatency(),
}

n.Edges[edge.TargetID] = edge
}
}

return nodes, nil
}

// RunNetworkSimulation starts the message handling routines for all nodes in the network.
func RunNetworkSimulation(nodes map[ID]*Node) {
for _, n := range nodes {
n.eachRun(nodes)
}
}

// Publish sends a message to the specified node's message queue.
func Publish(node *Node, msg string) {
node.msgQueue <- Message{From: node.ID, Content: msg}
}
112 changes: 112 additions & 0 deletions p2p/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package p2p

import (
"sync"
"time"
)

// ID represents a unique identifier for a node in the P2P network.
type ID uint64

// Message represents a message sent between nodes in the P2P network.
type Message struct {
From ID
Content string
}

// Edge represents a connection from one node to another in the P2P network.
type Edge struct {
TargetID ID
Latency float64 // in milliseconds
}

// Node represents a node in the P2P network.
type Node struct {
ID ID
Latency float64
Edges map[ID]Edge

RecvFrom map[string]map[ID]struct{} // content -> set of senders
SentTo map[string]map[ID]struct{} // content -> set of targets
SeenAt map[string]time.Time // content -> first arrival time

msgQueue chan Message
mu sync.Mutex
}

// Degree returns the number of edges connected to the node.
func (n *Node) Degree() int {
return len(n.Edges)
}

// eachRun starts the message handling routine for the node.
func (n *Node) eachRun(network map[ID]*Node) {
go func() {
n.msgQueue = make(chan Message, 1000)
n.RecvFrom = make(map[string]map[ID]struct{})
n.SentTo = make(map[string]map[ID]struct{})
n.SeenAt = make(map[string]time.Time)

for msg := range n.msgQueue {
first := false
var excludeSnapshot map[ID]struct{}

n.mu.Lock()
if _, ok := n.RecvFrom[msg.Content]; !ok {
n.RecvFrom[msg.Content] = make(map[ID]struct{})
}
n.RecvFrom[msg.Content][msg.From] = struct{}{}

if _, ok := n.SeenAt[msg.Content]; !ok {
n.SeenAt[msg.Content] = time.Now()
first = true
excludeSnapshot = copyIDSet(n.RecvFrom[msg.Content])
}
n.mu.Unlock()

if first {
go func(content string, exclude map[ID]struct{}) {
time.Sleep(time.Duration(n.Latency) * time.Millisecond)
n.publish(network, content, exclude)
}(msg.Content, excludeSnapshot)
}
}
}()
}

// copyIDSet creates a shallow copy of a set of IDs.
func copyIDSet(src map[ID]struct{}) map[ID]struct{} {
dst := make(map[ID]struct{}, len(src))
for k := range src {
dst[k] = struct{}{}
}
return dst
}

// publish sends the message to neighbors, excluding 'exclude' and already-sent targets.
func (n *Node) publish(network map[ID]*Node, content string, exclude map[ID]struct{}) {
n.mu.Lock()
if _, ok := n.SentTo[content]; !ok {
n.SentTo[content] = make(map[ID]struct{})
}

for _, edge := range n.Edges {
if _, wasSender := exclude[edge.TargetID]; wasSender {
continue
}
if _, already := n.SentTo[content][edge.TargetID]; already {
continue
}
if _, received := n.RecvFrom[content][edge.TargetID]; received {
continue
}
n.SentTo[content][edge.TargetID] = struct{}{}

edgeCopy := edge
go func(e Edge) {
time.Sleep(time.Duration(e.Latency) * time.Millisecond)
network[e.TargetID].msgQueue <- Message{From: n.ID, Content: content}
}(edgeCopy)
}
n.mu.Unlock()
}
45 changes: 45 additions & 0 deletions p2p/p2p_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package p2p_test

import (
"math/rand"
"testing"
"time"

"github.com/elecbug/netkit/network-graph/graph/standard_graph"
"github.com/elecbug/netkit/p2p"
)

func TestGenerateNetwork(t *testing.T) {
g := standard_graph.ErdosRenyiGraph(1000, 0.005, true)
t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount())
src := rand.NewSource(time.Now().UnixNano())

nodeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.5, src) }
edgeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.3, src) }

nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency)
t.Logf("Generated network with %d nodes\n", len(nw))
for id, node := range nw {
t.Logf("Node %d: latency=%.2fms, edges=%v\n", id, node.Latency, node.Edges)
}

p2p.RunNetworkSimulation(nw)
p2p.Publish(nw[0], "Hello, P2P Network!")

time.Sleep(5 * time.Second)

count := 0
for id, node := range nw {
c := len(node.SentTo["Hello, P2P Network!"])
t.Logf("Node %d sent %d/%d\n", id, c, len(node.Edges))
t.Logf("Node %d data: recv: %v, sent: %v, seen: %v\n",
id,
node.RecvFrom["Hello, P2P Network!"],
node.SentTo["Hello, P2P Network!"],
node.SeenAt["Hello, P2P Network!"],
)
count += c
}

t.Logf("Total received count: %d\n", count)
}