diff --git a/p2p/network.go b/p2p/network.go index 9a19a91..1409243 100644 --- a/p2p/network.go +++ b/p2p/network.go @@ -50,11 +50,11 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64, cf j := maps[neighbor] edge := p2pEdge{ - TargetID: PeerID(j), - Latency: edgeLatency(), + targetID: PeerID(j), + edgeLatency: edgeLatency(), } - n.edges[edge.TargetID] = edge + n.edges[edge.targetID] = edge } } diff --git a/p2p/node.go b/p2p/node.go index 9ae851c..6ad51c9 100644 --- a/p2p/node.go +++ b/p2p/node.go @@ -25,8 +25,8 @@ type p2pNode struct { // p2pEdge represents a connection from one node to another in the P2P network. type p2pEdge struct { - TargetID PeerID - Latency float64 // in milliseconds + targetID PeerID + edgeLatency float64 // in milliseconds } // newNode creates a new Node with the given ID and node latency. @@ -58,7 +58,6 @@ func (n *p2pNode) eachRun(network *Network, wg *sync.WaitGroup, ctx context.Cont return default: first := false - var excludeSnapshot map[PeerID]struct{} n.mu.Lock() if _, ok := n.recvFrom[msg.Content]; !ok { @@ -69,32 +68,31 @@ func (n *p2pNode) eachRun(network *Network, wg *sync.WaitGroup, ctx context.Cont if _, ok := n.seenAt[msg.Content]; !ok { n.seenAt[msg.Content] = time.Now() first = true - excludeSnapshot = copyIDSet(n.recvFrom[msg.Content]) } n.mu.Unlock() if first { - go func(msg Message, exclude map[PeerID]struct{}) { + go func(msg Message) { time.Sleep(time.Duration(n.nodeLatency) * time.Millisecond) - n.publish(network, msg, exclude) - }(msg, excludeSnapshot) + n.publish(network, msg) + }(msg) } } } }(ctx, wg) } -// copyIDSet creates a shallow copy of a set of IDs. -func copyIDSet(src map[PeerID]struct{}) map[PeerID]struct{} { - dst := make(map[PeerID]struct{}, len(src)) - for k := range src { - dst[k] = struct{}{} - } - return dst -} +// // copyIDSet creates a shallow copy of a set of IDs. +// func copyIDSet(src map[PeerID]struct{}) map[PeerID]struct{} { +// dst := make(map[PeerID]struct{}, len(src)) +// for k := range src { +// dst[k] = struct{}{} +// } +// return dst +// } // publish sends the message to neighbors, excluding 'exclude' and already-sent targets. -func (n *p2pNode) publish(network *Network, msg Message, exclude map[PeerID]struct{}) { +func (n *p2pNode) publish(network *Network, msg Message) { content := msg.Content protocol := msg.Protocol hopCount := msg.HopCount @@ -105,40 +103,46 @@ func (n *p2pNode) publish(network *Network, msg Message, exclude map[PeerID]stru if _, ok := n.sentTo[content]; !ok { n.sentTo[content] = make(map[PeerID]struct{}) } + if _, ok := n.recvFrom[content]; !ok { + n.recvFrom[content] = make(map[PeerID]struct{}) + } willSendEdges := make([]p2pEdge, 0) - for _, edge := range n.edges { - if _, wasSender := exclude[edge.TargetID]; wasSender { - continue - } - if _, already := n.sentTo[content][edge.TargetID]; already { - continue - } - if _, received := n.recvFrom[content][edge.TargetID]; received { - continue + if protocol == Flooding || protocol == Gossiping { + for _, edge := range n.edges { + if _, already := n.sentTo[content][edge.targetID]; already { + continue + } + if _, received := n.recvFrom[content][edge.targetID]; received { + continue + } + n.sentTo[content][edge.targetID] = struct{}{} + + willSendEdges = append(willSendEdges, edge) } - n.sentTo[content][edge.TargetID] = struct{}{} - willSendEdges = append(willSendEdges, edge) - } + if protocol == Gossiping && len(willSendEdges) > 0 { + rand.Shuffle(len(willSendEdges), func(i, j int) { + willSendEdges[i], willSendEdges[j] = willSendEdges[j], willSendEdges[i] + }) - if protocol == Gossiping && len(willSendEdges) > 0 { - rand.Shuffle(len(willSendEdges), func(i, j int) { - willSendEdges[i], willSendEdges[j] = willSendEdges[j], willSendEdges[i] - }) + k := int(float64(len(willSendEdges)) * network.cfg.GossipFactor) + willSendEdges = willSendEdges[:k] + } + } else if protocol == Custom { - k := int(float64(len(willSendEdges)) * network.cfg.GossipFactor) - willSendEdges = willSendEdges[:k] + } else { + return } for _, edge := range willSendEdges { edgeCopy := edge go func(e p2pEdge) { - time.Sleep(time.Duration(e.Latency) * time.Millisecond) + time.Sleep(time.Duration(e.edgeLatency) * time.Millisecond) - network.nodes[e.TargetID].msgQueue <- Message{ + network.nodes[e.targetID].msgQueue <- Message{ From: n.id, Content: content, Protocol: protocol,