Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions p2p/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64, cf
j := maps[neighbor]

edge := p2pEdge{
TargetID: PeerID(j),
Latency: edgeLatency(),
targetID: PeerID(j),
edgeLatency: edgeLatency(),
}

n.edges[edge.TargetID] = edge
n.edges[edge.targetID] = edge
}
}

Expand Down
76 changes: 40 additions & 36 deletions p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type p2pNode struct {

// p2pEdge represents a connection from one node to another in the P2P network.
type p2pEdge struct {
TargetID PeerID
Latency float64 // in milliseconds
targetID PeerID
edgeLatency float64 // in milliseconds
}

// newNode creates a new Node with the given ID and node latency.
Expand Down Expand Up @@ -58,7 +58,6 @@ func (n *p2pNode) eachRun(network *Network, wg *sync.WaitGroup, ctx context.Cont
return
default:
first := false
var excludeSnapshot map[PeerID]struct{}

n.mu.Lock()
if _, ok := n.recvFrom[msg.Content]; !ok {
Expand All @@ -69,32 +68,31 @@ func (n *p2pNode) eachRun(network *Network, wg *sync.WaitGroup, ctx context.Cont
if _, ok := n.seenAt[msg.Content]; !ok {
n.seenAt[msg.Content] = time.Now()
first = true
excludeSnapshot = copyIDSet(n.recvFrom[msg.Content])
}
n.mu.Unlock()

if first {
go func(msg Message, exclude map[PeerID]struct{}) {
go func(msg Message) {
time.Sleep(time.Duration(n.nodeLatency) * time.Millisecond)
n.publish(network, msg, exclude)
}(msg, excludeSnapshot)
n.publish(network, msg)
}(msg)
}
}
}
}(ctx, wg)
}

// copyIDSet creates a shallow copy of a set of IDs.
func copyIDSet(src map[PeerID]struct{}) map[PeerID]struct{} {
dst := make(map[PeerID]struct{}, len(src))
for k := range src {
dst[k] = struct{}{}
}
return dst
}
// // copyIDSet creates a shallow copy of a set of IDs.
// func copyIDSet(src map[PeerID]struct{}) map[PeerID]struct{} {
// dst := make(map[PeerID]struct{}, len(src))
// for k := range src {
// dst[k] = struct{}{}
// }
// return dst
// }

// publish sends the message to neighbors, excluding 'exclude' and already-sent targets.
func (n *p2pNode) publish(network *Network, msg Message, exclude map[PeerID]struct{}) {
func (n *p2pNode) publish(network *Network, msg Message) {
content := msg.Content
protocol := msg.Protocol
hopCount := msg.HopCount
Expand All @@ -105,40 +103,46 @@ func (n *p2pNode) publish(network *Network, msg Message, exclude map[PeerID]stru
if _, ok := n.sentTo[content]; !ok {
n.sentTo[content] = make(map[PeerID]struct{})
}
if _, ok := n.recvFrom[content]; !ok {
n.recvFrom[content] = make(map[PeerID]struct{})
}

willSendEdges := make([]p2pEdge, 0)

for _, edge := range n.edges {
if _, wasSender := exclude[edge.TargetID]; wasSender {
continue
}
if _, already := n.sentTo[content][edge.TargetID]; already {
continue
}
if _, received := n.recvFrom[content][edge.TargetID]; received {
continue
if protocol == Flooding || protocol == Gossiping {
for _, edge := range n.edges {
if _, already := n.sentTo[content][edge.targetID]; already {
continue
}
if _, received := n.recvFrom[content][edge.targetID]; received {
continue
}
n.sentTo[content][edge.targetID] = struct{}{}

willSendEdges = append(willSendEdges, edge)
}
n.sentTo[content][edge.TargetID] = struct{}{}

willSendEdges = append(willSendEdges, edge)
}
if protocol == Gossiping && len(willSendEdges) > 0 {
rand.Shuffle(len(willSendEdges), func(i, j int) {
willSendEdges[i], willSendEdges[j] = willSendEdges[j], willSendEdges[i]
})

if protocol == Gossiping && len(willSendEdges) > 0 {
rand.Shuffle(len(willSendEdges), func(i, j int) {
willSendEdges[i], willSendEdges[j] = willSendEdges[j], willSendEdges[i]
})
k := int(float64(len(willSendEdges)) * network.cfg.GossipFactor)
willSendEdges = willSendEdges[:k]
}
} else if protocol == Custom {

k := int(float64(len(willSendEdges)) * network.cfg.GossipFactor)
willSendEdges = willSendEdges[:k]
} else {
return
}

for _, edge := range willSendEdges {
edgeCopy := edge

go func(e p2pEdge) {
time.Sleep(time.Duration(e.Latency) * time.Millisecond)
time.Sleep(time.Duration(e.edgeLatency) * time.Millisecond)

network.nodes[e.TargetID].msgQueue <- Message{
network.nodes[e.targetID].msgQueue <- Message{
From: n.id,
Content: content,
Protocol: protocol,
Expand Down