From e17175c3de61cafdf85df3729cfbe75d60744b16 Mon Sep 17 00:00:00 2001 From: elecbug Date: Tue, 14 Oct 2025 18:25:28 +0900 Subject: [PATCH] feat: update broadcast type and refactor code --- p2p/broadcast.go | 9 +++ p2p/network.go | 123 ++++++++++++++++++++++++++++++++++------ p2p/node.go | 144 +++++++++++++++++++++++++++-------------------- p2p/p2p_test.go | 136 +++++++++++++++++--------------------------- 4 files changed, 248 insertions(+), 164 deletions(-) create mode 100644 p2p/broadcast.go diff --git a/p2p/broadcast.go b/p2p/broadcast.go new file mode 100644 index 0000000..a8754c3 --- /dev/null +++ b/p2p/broadcast.go @@ -0,0 +1,9 @@ +package p2p + +// BroadcastProtocol defines the protocol used for broadcasting messages in the P2P network. +type BroadcastProtocol int + +var ( + Flooding BroadcastProtocol = 0 + Gossiping BroadcastProtocol = 1 +) diff --git a/p2p/network.go b/p2p/network.go index 4b06bbe..328a9b5 100644 --- a/p2p/network.go +++ b/p2p/network.go @@ -1,6 +1,8 @@ package p2p import ( + "context" + "fmt" "strconv" "sync" @@ -8,10 +10,28 @@ import ( "github.com/elecbug/netkit/network-graph/node" ) +// Message represents a message sent between nodes in the P2P network. +type Message struct { + From ID + Content string + Protocol BroadcastProtocol +} + +// Config holds configuration parameters for the P2P network. +type Config struct { + GossipFactor float64 // fraction of neighbors to gossip to +} + +// Config holds configuration parameters for the P2P network. +type Network struct { + nodes map[ID]*p2pNode + cfg *Config +} + // GenerateNetwork creates a P2P network from the given graph. // nodeLatency and edgeLatency are functions that generate latencies for nodes and edges respectively. -func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency, queuingLatency func() float64) (map[ID]*Node, error) { - nodes := make(map[ID]*Node) +func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64, cfg *Config) (*Network, error) { + nodes := make(map[ID]*p2pNode) maps := make(map[node.ID]ID) // create nodes @@ -22,14 +42,11 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency, queuingLatency fu return nil, err } - n := &Node{ - ID: ID(num), - ValidationLatency: nodeLatency(), - Edges: make(map[ID]Edge), - } + n := newNode(ID(num), nodeLatency()) + n.edges = make(map[ID]p2pEdge) - nodes[n.ID] = n - maps[gn] = n.ID + nodes[n.id] = n + maps[gn] = n.id } for _, gn := range g.Nodes() { @@ -44,31 +61,101 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency, queuingLatency fu for _, neighbor := range g.Neighbors(gn) { j := maps[neighbor] - edge := Edge{ + edge := p2pEdge{ TargetID: ID(j), Latency: edgeLatency(), } - n.Edges[edge.TargetID] = edge + n.edges[edge.TargetID] = edge } } - return nodes, nil + return &Network{nodes: nodes, cfg: cfg}, nil } // RunNetworkSimulation starts the message handling routines for all nodes in the network. -func RunNetworkSimulation(nodes map[ID]*Node) { +func (n *Network) RunNetworkSimulation(ctx context.Context) { wg := &sync.WaitGroup{} - wg.Add(len(nodes)) + wg.Add(len(n.nodes)) - for _, n := range nodes { - n.eachRun(nodes, wg) + for _, node := range n.nodes { + node.eachRun(n, wg, ctx) } wg.Wait() } +// NodeIDs returns a slice of all node IDs in the network. +func (n *Network) NodeIDs() []ID { + ids := make([]ID, 0, len(n.nodes)) + + for id := range n.nodes { + ids = append(ids, id) + } + + return ids +} + +// GetNode retrieves a node by its ID. +func (n *Network) GetNode(id ID) *p2pNode { + return n.nodes[id] +} + // Publish sends a message to the specified node's message queue. -func Publish(node *Node, msg string) { - node.msgQueue <- Message{From: node.ID, Content: msg} +func (n *Network) Publish(nodeID ID, msg string, protocol BroadcastProtocol) error { + if node, ok := n.nodes[nodeID]; ok { + if !node.alive { + return fmt.Errorf("node %d is not alive", nodeID) + } + + node.msgQueue <- Message{From: nodeID, Content: msg, Protocol: protocol} + return nil + } + + return fmt.Errorf("node %d not found", nodeID) +} + +// Reachability calculates the fraction of nodes that have received the specified message. +func (n *Network) Reachability(msg string) float64 { + total := 0 + reached := 0 + + for _, node := range n.nodes { + total++ + node.mu.Lock() + if _, ok := node.seenAt[msg]; ok { + reached++ + } + node.mu.Unlock() + } + + return float64(reached) / float64(total) +} + +// MessageInfo returns a snapshot of the node's message-related information. +func (n *Network) MessageInfo(nodeID ID, content string) (map[string]any, error) { + node := n.nodes[nodeID] + + if node == nil { + return nil, fmt.Errorf("node %d not found", nodeID) + } + + node.mu.Lock() + defer node.mu.Unlock() + + info := make(map[string]any) + + info["recv"] = make([]ID, 0) + for k := range node.recvFrom[content] { + info["recv"] = append(info["recv"].([]ID), k) + } + + info["sent"] = make([]ID, 0) + for k := range node.sentTo[content] { + info["sent"] = append(info["sent"].([]ID), k) + } + + info["seen"] = node.seenAt[content].String() + + return info, nil } diff --git a/p2p/node.go b/p2p/node.go index a6e056d..5a562a6 100644 --- a/p2p/node.go +++ b/p2p/node.go @@ -1,6 +1,7 @@ package p2p import ( + "context" "sync" "time" ) @@ -8,73 +9,82 @@ import ( // ID represents a unique identifier for a node in the P2P network. type ID uint64 -// Message represents a message sent between nodes in the P2P network. -type Message struct { - From ID - Content string +// p2pNode represents a node in the P2P network. +type p2pNode struct { + id ID + nodeLatency float64 + edges map[ID]p2pEdge + + recvFrom map[string]map[ID]struct{} // content -> set of senders + sentTo map[string]map[ID]struct{} // content -> set of targets + seenAt map[string]time.Time // content -> first arrival time + + msgQueue chan Message + mu sync.Mutex + + alive bool } -// Edge represents a connection from one node to another in the P2P network. -type Edge struct { +// p2pEdge represents a connection from one node to another in the P2P network. +type p2pEdge struct { TargetID ID Latency float64 // in milliseconds } -// Node represents a node in the P2P network. -type Node struct { - ID ID - ValidationLatency float64 - QueuingLatency float64 - Edges map[ID]Edge +// newNode creates a new Node with the given ID and node latency. +func newNode(id ID, nodeLatency float64) *p2pNode { + return &p2pNode{ + id: id, + nodeLatency: nodeLatency, + edges: make(map[ID]p2pEdge), - RecvFrom map[string]map[ID]struct{} // content -> set of senders - SentTo map[string]map[ID]struct{} // content -> set of targets - SeenAt map[string]time.Time // content -> first arrival time + recvFrom: make(map[string]map[ID]struct{}), + sentTo: make(map[string]map[ID]struct{}), + seenAt: make(map[string]time.Time), - msgQueue chan Message - mu sync.Mutex -} - -// Degree returns the number of edges connected to the node. -func (n *Node) Degree() int { - return len(n.Edges) + msgQueue: make(chan Message, 1000), + mu: sync.Mutex{}, + } } // eachRun starts the message handling routine for the node. -func (n *Node) eachRun(network map[ID]*Node, wg *sync.WaitGroup) { +func (n *p2pNode) eachRun(network *Network, wg *sync.WaitGroup, ctx context.Context) { defer wg.Done() - n.msgQueue = make(chan Message, 1000) - n.RecvFrom = make(map[string]map[ID]struct{}) - n.SentTo = make(map[string]map[ID]struct{}) - n.SeenAt = make(map[string]time.Time) + go func(ctx context.Context) { + n.alive = true - go func() { for msg := range n.msgQueue { - first := false - var excludeSnapshot map[ID]struct{} - - n.mu.Lock() - if _, ok := n.RecvFrom[msg.Content]; !ok { - n.RecvFrom[msg.Content] = make(map[ID]struct{}) - } - n.RecvFrom[msg.Content][msg.From] = struct{}{} - - if _, ok := n.SeenAt[msg.Content]; !ok { - n.SeenAt[msg.Content] = time.Now() - first = true - excludeSnapshot = copyIDSet(n.RecvFrom[msg.Content]) - } - n.mu.Unlock() - - if first { - go func(content string, exclude map[ID]struct{}) { - time.Sleep(time.Duration(n.ValidationLatency) * time.Millisecond) - n.publish(network, content, exclude) - }(msg.Content, excludeSnapshot) + select { + case <-ctx.Done(): + n.alive = false + return + default: + first := false + var excludeSnapshot map[ID]struct{} + + n.mu.Lock() + if _, ok := n.recvFrom[msg.Content]; !ok { + n.recvFrom[msg.Content] = make(map[ID]struct{}) + } + n.recvFrom[msg.Content][msg.From] = struct{}{} + + if _, ok := n.seenAt[msg.Content]; !ok { + n.seenAt[msg.Content] = time.Now() + first = true + excludeSnapshot = copyIDSet(n.recvFrom[msg.Content]) + } + n.mu.Unlock() + + if first { + go func(msg Message, exclude map[ID]struct{}) { + time.Sleep(time.Duration(n.nodeLatency) * time.Millisecond) + n.publish(network, msg, exclude) + }(msg, excludeSnapshot) + } } } - }() + }(ctx) } // copyIDSet creates a shallow copy of a set of IDs. @@ -87,33 +97,45 @@ func copyIDSet(src map[ID]struct{}) map[ID]struct{} { } // publish sends the message to neighbors, excluding 'exclude' and already-sent targets. -func (n *Node) publish(network map[ID]*Node, content string, exclude map[ID]struct{}) { +func (n *p2pNode) publish(network *Network, msg Message, exclude map[ID]struct{}) { + content := msg.Content + protocol := msg.Protocol + n.mu.Lock() defer n.mu.Unlock() - time.Sleep(time.Duration(n.QueuingLatency) * time.Millisecond) - - if _, ok := n.SentTo[content]; !ok { - n.SentTo[content] = make(map[ID]struct{}) + if _, ok := n.sentTo[content]; !ok { + n.sentTo[content] = make(map[ID]struct{}) } - for _, edge := range n.Edges { + willSendEdges := make([]p2pEdge, 0) + + for _, edge := range n.edges { if _, wasSender := exclude[edge.TargetID]; wasSender { continue } - if _, already := n.SentTo[content][edge.TargetID]; already { + if _, already := n.sentTo[content][edge.TargetID]; already { continue } - if _, received := n.RecvFrom[content][edge.TargetID]; received { + if _, received := n.recvFrom[content][edge.TargetID]; received { continue } - n.SentTo[content][edge.TargetID] = struct{}{} + n.sentTo[content][edge.TargetID] = struct{}{} + + willSendEdges = append(willSendEdges, edge) + } + + if protocol == Gossiping && len(willSendEdges) > 0 { + k := int(float64(len(willSendEdges)) * network.cfg.GossipFactor) + willSendEdges = willSendEdges[:k] + } + for _, edge := range willSendEdges { edgeCopy := edge - go func(e Edge) { + go func(e p2pEdge) { time.Sleep(time.Duration(e.Latency) * time.Millisecond) - network[e.TargetID].msgQueue <- Message{From: n.ID, Content: content} + network.nodes[e.TargetID].msgQueue <- Message{From: n.id, Content: content, Protocol: protocol} }(edgeCopy) } } diff --git a/p2p/p2p_test.go b/p2p/p2p_test.go index 22fd484..3190d41 100644 --- a/p2p/p2p_test.go +++ b/p2p/p2p_test.go @@ -1,118 +1,84 @@ package p2p_test import ( + "context" "encoding/json" "fmt" "math" "math/rand" "os" - "sync" "testing" "time" - "github.com/elecbug/netkit/network-graph/algorithm" "github.com/elecbug/netkit/network-graph/graph/standard_graph" "github.com/elecbug/netkit/p2p" ) func TestGenerateNetwork(t *testing.T) { - g := standard_graph.ErdosRenyiGraph(1000, 0.05, true) + g := standard_graph.ErdosRenyiGraph(1000, 50.000/1000, true) t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount()) src := rand.NewSource(time.Now().UnixNano()) - nodeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.5, src) } - edgeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.3, src) } - queuingLatency := func() float64 { return p2p.LogNormalRand(5.0, 0.2, src) } + nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.5, src) } + edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.3, src) } - nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, queuingLatency) - t.Logf("Generated network with %d nodes\n", len(nw)) - for id, node := range nw { - t.Logf("Node %d: validation_latency=%.2fms, queuing_latency=%.2fms, edges=%v\n", id, node.ValidationLatency, node.QueuingLatency, node.Edges) + nw, err := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, &p2p.Config{GossipFactor: 0.35}) + if err != nil { + t.Fatalf("Failed to generate network: %v", err) } - msg := "Hello, P2P World!" + t.Logf("Generated network with %d nodes\n", len(nw.NodeIDs())) - p2p.RunNetworkSimulation(nw) - p2p.Publish(nw[0], msg) - time.Sleep(1 * time.Second) + msg1 := "Hello, P2P World!" + msg2 := "Goodbye, P2P World!" + msg3 := "The quick brown fox jumps over the lazy dog." - count := 0 - result := make(map[string]map[string]any) - - for id, node := range nw { - c := len(node.SentTo[msg]) - t.Logf("Node %d sent %d/%d\n", id, c, len(node.Edges)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + nw.RunNetworkSimulation(ctx) - result[fmt.Sprintf("node_%d", id)] = map[string]any{} - result[fmt.Sprintf("node_%d", id)]["recv"] = node.RecvFrom[msg] - result[fmt.Sprintf("node_%d", id)]["sent"] = node.SentTo[msg] - result[fmt.Sprintf("node_%d", id)]["seen"] = node.SeenAt[msg] + t.Logf("Publishing message '%s' from node %d\n", msg1, nw.NodeIDs()[0]) + err = nw.Publish(nw.NodeIDs()[0], msg1, p2p.Flooding) + if err != nil { + t.Fatalf("Failed to publish message: %v", err) + } + time.Sleep(1000 * time.Millisecond) + t.Logf("Reachability of message '%s': %f\n", msg1, nw.Reachability(msg1)) - count += c + t.Logf("Publishing message '%s' from node %d\n", msg2, nw.NodeIDs()[1]) + err = nw.Publish(nw.NodeIDs()[1], msg2, p2p.Gossiping) + if err != nil { + t.Fatalf("Failed to publish message: %v", err) + } + time.Sleep(300 * time.Millisecond) + cancel() + time.Sleep(700 * time.Millisecond) + t.Logf("Reachability of message '%s': %f\n", msg2, nw.Reachability(msg2)) + + nw.RunNetworkSimulation(context.Background()) + t.Logf("Publishing message '%s' from node %d\n", msg3, nw.NodeIDs()[2]) + err = nw.Publish(nw.NodeIDs()[2], msg3, p2p.Gossiping) + if err != nil { + t.Fatalf("Failed to publish message: %v", err) } + time.Sleep(1000 * time.Millisecond) + t.Logf("Reachability of message '%s': %f\n", msg3, nw.Reachability(msg3)) - t.Logf("Total received count: %d\n", count) + result := make(map[string]map[string]any) + + for _, nodeID := range nw.NodeIDs() { + if info, err := nw.MessageInfo(nodeID, msg1); err == nil { + result[fmt.Sprintf("msg_1-node_%d", nodeID)] = info + } + if info, err := nw.MessageInfo(nodeID, msg2); err == nil { + result[fmt.Sprintf("msg_2-node_%d", nodeID)] = info + } + if info, err := nw.MessageInfo(nodeID, msg3); err == nil { + result[fmt.Sprintf("msg_3-node_%d", nodeID)] = info + } + } data, _ := json.Marshal(result) os.WriteFile("p2p_result.log", data, 0644) } - -func TestExpCase(t *testing.T) { - run := false - - if run { - for i := 4; i <= 11; i++ { - wg := &sync.WaitGroup{} - - for j := 0; j < 60; j++ { - wg.Add(1) - go func(j int) { - defer wg.Done() - filename := fmt.Sprintf("temp/p2p_result-%02d-%03d.log", i, j) - - if _, err := os.Stat(filename); err == nil { - t.Logf("File %s already exists, skipping...\n", filename) - return - } - - t.Logf("Experiment case: %02d-%03d\n", i, j) - r := rand.New(rand.NewSource(time.Now().UnixNano())) - - n := r.Int()%20 + 170 - g := standard_graph.ErdosRenyiGraph(n, float64(i)/float64(n), true) - - nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(0), 0.01, rand.NewSource(time.Now().UnixNano())) } - edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(500), 0.01, rand.NewSource(time.Now().UnixNano())) } - queuingLatency := func() float64 { return p2p.LogNormalRand(math.Log(0), 0.01, rand.NewSource(time.Now().UnixNano())) } - - nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, queuingLatency) - msg := "Hello, P2P World!" - - t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount()) - - p2p.RunNetworkSimulation(nw) - p2p.Publish(nw[0], msg) - time.Sleep(5 * time.Second) - - result := make(map[string]map[string]any) - - for id, node := range nw { - result[fmt.Sprintf("node_%d", id)] = map[string]any{} - result[fmt.Sprintf("node_%d", id)]["recv"] = node.RecvFrom[msg] - result[fmt.Sprintf("node_%d", id)]["sent"] = node.SentTo[msg] - result[fmt.Sprintf("node_%d", id)]["seen"] = node.SeenAt[msg] - } - - data, _ := json.Marshal(result) - - os.WriteFile(filename, data, 0644) - - algorithm.CacheClear() - }(j) - } - - wg.Wait() - } - } -}