Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions p2p/broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package p2p

// BroadcastProtocol defines the protocol used for broadcasting messages in the P2P network.
type BroadcastProtocol int

var (
Flooding BroadcastProtocol = 0
Gossiping BroadcastProtocol = 1
)
123 changes: 105 additions & 18 deletions p2p/network.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,37 @@
package p2p

import (
"context"
"fmt"
"strconv"
"sync"

"github.com/elecbug/netkit/network-graph/graph"
"github.com/elecbug/netkit/network-graph/node"
)

// Message represents a message sent between nodes in the P2P network.
type Message struct {
From ID
Content string
Protocol BroadcastProtocol
}

// Config holds configuration parameters for the P2P network.
type Config struct {
GossipFactor float64 // fraction of neighbors to gossip to
}

// Config holds configuration parameters for the P2P network.
type Network struct {
nodes map[ID]*p2pNode
cfg *Config
}

// GenerateNetwork creates a P2P network from the given graph.
// nodeLatency and edgeLatency are functions that generate latencies for nodes and edges respectively.
func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency, queuingLatency func() float64) (map[ID]*Node, error) {
nodes := make(map[ID]*Node)
func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64, cfg *Config) (*Network, error) {
nodes := make(map[ID]*p2pNode)
maps := make(map[node.ID]ID)

// create nodes
Expand All @@ -22,14 +42,11 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency, queuingLatency fu
return nil, err
}

n := &Node{
ID: ID(num),
ValidationLatency: nodeLatency(),
Edges: make(map[ID]Edge),
}
n := newNode(ID(num), nodeLatency())
n.edges = make(map[ID]p2pEdge)

nodes[n.ID] = n
maps[gn] = n.ID
nodes[n.id] = n
maps[gn] = n.id
}

for _, gn := range g.Nodes() {
Expand All @@ -44,31 +61,101 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency, queuingLatency fu
for _, neighbor := range g.Neighbors(gn) {
j := maps[neighbor]

edge := Edge{
edge := p2pEdge{
TargetID: ID(j),
Latency: edgeLatency(),
}

n.Edges[edge.TargetID] = edge
n.edges[edge.TargetID] = edge
}
}

return nodes, nil
return &Network{nodes: nodes, cfg: cfg}, nil
}

// RunNetworkSimulation starts the message handling routines for all nodes in the network.
func RunNetworkSimulation(nodes map[ID]*Node) {
func (n *Network) RunNetworkSimulation(ctx context.Context) {
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
wg.Add(len(n.nodes))

for _, n := range nodes {
n.eachRun(nodes, wg)
for _, node := range n.nodes {
node.eachRun(n, wg, ctx)
}

wg.Wait()
}

// NodeIDs returns a slice of all node IDs in the network.
func (n *Network) NodeIDs() []ID {
ids := make([]ID, 0, len(n.nodes))

for id := range n.nodes {
ids = append(ids, id)
}

return ids
}

// GetNode retrieves a node by its ID.
func (n *Network) GetNode(id ID) *p2pNode {
return n.nodes[id]
}

// Publish sends a message to the specified node's message queue.
func Publish(node *Node, msg string) {
node.msgQueue <- Message{From: node.ID, Content: msg}
func (n *Network) Publish(nodeID ID, msg string, protocol BroadcastProtocol) error {
if node, ok := n.nodes[nodeID]; ok {
if !node.alive {
return fmt.Errorf("node %d is not alive", nodeID)
}

node.msgQueue <- Message{From: nodeID, Content: msg, Protocol: protocol}
return nil
}

return fmt.Errorf("node %d not found", nodeID)
}

// Reachability calculates the fraction of nodes that have received the specified message.
func (n *Network) Reachability(msg string) float64 {
total := 0
reached := 0

for _, node := range n.nodes {
total++
node.mu.Lock()
if _, ok := node.seenAt[msg]; ok {
reached++
}
node.mu.Unlock()
}

return float64(reached) / float64(total)
}

// MessageInfo returns a snapshot of the node's message-related information.
func (n *Network) MessageInfo(nodeID ID, content string) (map[string]any, error) {
node := n.nodes[nodeID]

if node == nil {
return nil, fmt.Errorf("node %d not found", nodeID)
}

node.mu.Lock()
defer node.mu.Unlock()

info := make(map[string]any)

info["recv"] = make([]ID, 0)
for k := range node.recvFrom[content] {
info["recv"] = append(info["recv"].([]ID), k)
}

info["sent"] = make([]ID, 0)
for k := range node.sentTo[content] {
info["sent"] = append(info["sent"].([]ID), k)
}

info["seen"] = node.seenAt[content].String()

return info, nil
}
144 changes: 83 additions & 61 deletions p2p/node.go
Original file line number Diff line number Diff line change
@@ -1,80 +1,90 @@
package p2p

import (
"context"
"sync"
"time"
)

// ID represents a unique identifier for a node in the P2P network.
type ID uint64

// Message represents a message sent between nodes in the P2P network.
type Message struct {
From ID
Content string
// p2pNode represents a node in the P2P network.
type p2pNode struct {
id ID
nodeLatency float64
edges map[ID]p2pEdge

recvFrom map[string]map[ID]struct{} // content -> set of senders
sentTo map[string]map[ID]struct{} // content -> set of targets
seenAt map[string]time.Time // content -> first arrival time

msgQueue chan Message
mu sync.Mutex

alive bool
}

// Edge represents a connection from one node to another in the P2P network.
type Edge struct {
// p2pEdge represents a connection from one node to another in the P2P network.
type p2pEdge struct {
TargetID ID
Latency float64 // in milliseconds
}

// Node represents a node in the P2P network.
type Node struct {
ID ID
ValidationLatency float64
QueuingLatency float64
Edges map[ID]Edge
// newNode creates a new Node with the given ID and node latency.
func newNode(id ID, nodeLatency float64) *p2pNode {
return &p2pNode{
id: id,
nodeLatency: nodeLatency,
edges: make(map[ID]p2pEdge),

RecvFrom map[string]map[ID]struct{} // content -> set of senders
SentTo map[string]map[ID]struct{} // content -> set of targets
SeenAt map[string]time.Time // content -> first arrival time
recvFrom: make(map[string]map[ID]struct{}),
sentTo: make(map[string]map[ID]struct{}),
seenAt: make(map[string]time.Time),

msgQueue chan Message
mu sync.Mutex
}

// Degree returns the number of edges connected to the node.
func (n *Node) Degree() int {
return len(n.Edges)
msgQueue: make(chan Message, 1000),
mu: sync.Mutex{},
}
}

// eachRun starts the message handling routine for the node.
func (n *Node) eachRun(network map[ID]*Node, wg *sync.WaitGroup) {
func (n *p2pNode) eachRun(network *Network, wg *sync.WaitGroup, ctx context.Context) {
defer wg.Done()

n.msgQueue = make(chan Message, 1000)
n.RecvFrom = make(map[string]map[ID]struct{})
n.SentTo = make(map[string]map[ID]struct{})
n.SeenAt = make(map[string]time.Time)
go func(ctx context.Context) {
n.alive = true

go func() {
for msg := range n.msgQueue {
first := false
var excludeSnapshot map[ID]struct{}

n.mu.Lock()
if _, ok := n.RecvFrom[msg.Content]; !ok {
n.RecvFrom[msg.Content] = make(map[ID]struct{})
}
n.RecvFrom[msg.Content][msg.From] = struct{}{}

if _, ok := n.SeenAt[msg.Content]; !ok {
n.SeenAt[msg.Content] = time.Now()
first = true
excludeSnapshot = copyIDSet(n.RecvFrom[msg.Content])
}
n.mu.Unlock()

if first {
go func(content string, exclude map[ID]struct{}) {
time.Sleep(time.Duration(n.ValidationLatency) * time.Millisecond)
n.publish(network, content, exclude)
}(msg.Content, excludeSnapshot)
select {
case <-ctx.Done():
n.alive = false
return
default:
first := false
var excludeSnapshot map[ID]struct{}

n.mu.Lock()
if _, ok := n.recvFrom[msg.Content]; !ok {
n.recvFrom[msg.Content] = make(map[ID]struct{})
}
n.recvFrom[msg.Content][msg.From] = struct{}{}

if _, ok := n.seenAt[msg.Content]; !ok {
n.seenAt[msg.Content] = time.Now()
first = true
excludeSnapshot = copyIDSet(n.recvFrom[msg.Content])
}
n.mu.Unlock()

if first {
go func(msg Message, exclude map[ID]struct{}) {
time.Sleep(time.Duration(n.nodeLatency) * time.Millisecond)
n.publish(network, msg, exclude)
}(msg, excludeSnapshot)
}
}
}
}()
}(ctx)
}

// copyIDSet creates a shallow copy of a set of IDs.
Expand All @@ -87,33 +97,45 @@ func copyIDSet(src map[ID]struct{}) map[ID]struct{} {
}

// publish sends the message to neighbors, excluding 'exclude' and already-sent targets.
func (n *Node) publish(network map[ID]*Node, content string, exclude map[ID]struct{}) {
func (n *p2pNode) publish(network *Network, msg Message, exclude map[ID]struct{}) {
content := msg.Content
protocol := msg.Protocol

n.mu.Lock()
defer n.mu.Unlock()

time.Sleep(time.Duration(n.QueuingLatency) * time.Millisecond)

if _, ok := n.SentTo[content]; !ok {
n.SentTo[content] = make(map[ID]struct{})
if _, ok := n.sentTo[content]; !ok {
n.sentTo[content] = make(map[ID]struct{})
}

for _, edge := range n.Edges {
willSendEdges := make([]p2pEdge, 0)

for _, edge := range n.edges {
if _, wasSender := exclude[edge.TargetID]; wasSender {
continue
}
if _, already := n.SentTo[content][edge.TargetID]; already {
if _, already := n.sentTo[content][edge.TargetID]; already {
continue
}
if _, received := n.RecvFrom[content][edge.TargetID]; received {
if _, received := n.recvFrom[content][edge.TargetID]; received {
continue
}
n.SentTo[content][edge.TargetID] = struct{}{}
n.sentTo[content][edge.TargetID] = struct{}{}

willSendEdges = append(willSendEdges, edge)
}

if protocol == Gossiping && len(willSendEdges) > 0 {
k := int(float64(len(willSendEdges)) * network.cfg.GossipFactor)
willSendEdges = willSendEdges[:k]
}

for _, edge := range willSendEdges {
edgeCopy := edge

go func(e Edge) {
go func(e p2pEdge) {
time.Sleep(time.Duration(e.Latency) * time.Millisecond)
network[e.TargetID].msgQueue <- Message{From: n.ID, Content: content}
network.nodes[e.TargetID].msgQueue <- Message{From: n.id, Content: content, Protocol: protocol}
}(edgeCopy)
}
}
Loading
Loading