-
Notifications
You must be signed in to change notification settings - Fork 1
refactor: move federated server logic to its own service #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,20 +2,9 @@ package cli | |||||||||||||||||
|
|
||||||||||||||||||
| import ( | ||||||||||||||||||
| "context" | ||||||||||||||||||
| "errors" | ||||||||||||||||||
| "fmt" | ||||||||||||||||||
| "io" | ||||||||||||||||||
| "net" | ||||||||||||||||||
| "time" | ||||||||||||||||||
|
|
||||||||||||||||||
| "math/rand/v2" | ||||||||||||||||||
|
|
||||||||||||||||||
| cliContext "github.com/mudler/LocalAI/core/cli/context" | ||||||||||||||||||
| "github.com/mudler/LocalAI/core/p2p" | ||||||||||||||||||
| "github.com/mudler/edgevpn/pkg/node" | ||||||||||||||||||
| "github.com/mudler/edgevpn/pkg/protocol" | ||||||||||||||||||
| "github.com/mudler/edgevpn/pkg/types" | ||||||||||||||||||
| "github.com/rs/zerolog/log" | ||||||||||||||||||
| ) | ||||||||||||||||||
|
|
||||||||||||||||||
| type FederatedCLI struct { | ||||||||||||||||||
|
|
@@ -24,107 +13,7 @@ type FederatedCLI struct { | |||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| func (f *FederatedCLI) Run(ctx *cliContext.Context) error { | ||||||||||||||||||
| fs := p2p.NewFederatedServer(f.Address, p2p.FederatedID, f.Peer2PeerToken) | ||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In a non-p2p build, running the federated CLI now always fails with "not implemented" because this caller was switched to FederatedServer.Start while the new implementation exists only in the p2p-only file; trigger: build without the Either move the real FederatedServer.Start implementation into an untagged file, add the same build tag to the CLI command so it is unavailable without Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: |
||||||||||||||||||
|
|
||||||||||||||||||
| n, err := p2p.NewNode(f.Peer2PeerToken) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return fmt.Errorf("creating a new node: %w", err) | ||||||||||||||||||
| } | ||||||||||||||||||
| err = n.Start(context.Background()) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return fmt.Errorf("creating a new node: %w", err) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, nil); err != nil { | ||||||||||||||||||
| return err | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| return Proxy(context.Background(), n, f.Address, p2p.FederatedID) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| func Proxy(ctx context.Context, node *node.Node, listenAddr, service string) error { | ||||||||||||||||||
|
|
||||||||||||||||||
| log.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr) | ||||||||||||||||||
| // Open local port for listening | ||||||||||||||||||
| l, err := net.Listen("tcp", listenAddr) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| log.Error().Err(err).Msg("Error listening") | ||||||||||||||||||
| return err | ||||||||||||||||||
| } | ||||||||||||||||||
| // ll.Info("Binding local port on", srcaddr) | ||||||||||||||||||
|
|
||||||||||||||||||
| ledger, _ := node.Ledger() | ||||||||||||||||||
|
|
||||||||||||||||||
| // Announce ourselves so nodes accepts our connection | ||||||||||||||||||
| ledger.Announce( | ||||||||||||||||||
| ctx, | ||||||||||||||||||
| 10*time.Second, | ||||||||||||||||||
| func() { | ||||||||||||||||||
| // Retrieve current ID for ip in the blockchain | ||||||||||||||||||
| //_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String()) | ||||||||||||||||||
| // If mismatch, update the blockchain | ||||||||||||||||||
| //if !found { | ||||||||||||||||||
| updatedMap := map[string]interface{}{} | ||||||||||||||||||
| updatedMap[node.Host().ID().String()] = &types.User{ | ||||||||||||||||||
| PeerID: node.Host().ID().String(), | ||||||||||||||||||
| Timestamp: time.Now().String(), | ||||||||||||||||||
| } | ||||||||||||||||||
| ledger.Add(protocol.UsersLedgerKey, updatedMap) | ||||||||||||||||||
| // } | ||||||||||||||||||
| }, | ||||||||||||||||||
| ) | ||||||||||||||||||
|
|
||||||||||||||||||
| defer l.Close() | ||||||||||||||||||
| for { | ||||||||||||||||||
| select { | ||||||||||||||||||
| case <-ctx.Done(): | ||||||||||||||||||
| return errors.New("context canceled") | ||||||||||||||||||
| default: | ||||||||||||||||||
| log.Debug().Msg("New for connection") | ||||||||||||||||||
| // Listen for an incoming connection. | ||||||||||||||||||
| conn, err := l.Accept() | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| fmt.Println("Error accepting: ", err.Error()) | ||||||||||||||||||
| continue | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Handle connections in a new goroutine, forwarding to the p2p service | ||||||||||||||||||
| go func() { | ||||||||||||||||||
| var tunnelAddresses []string | ||||||||||||||||||
| for _, v := range p2p.GetAvailableNodes(p2p.FederatedID) { | ||||||||||||||||||
| if v.IsOnline() { | ||||||||||||||||||
| tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) | ||||||||||||||||||
| } else { | ||||||||||||||||||
| log.Info().Msgf("Node %s is offline", v.ID) | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // open a TCP stream to one of the tunnels | ||||||||||||||||||
| // chosen randomly | ||||||||||||||||||
| // TODO: optimize this and track usage | ||||||||||||||||||
| tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))] | ||||||||||||||||||
|
|
||||||||||||||||||
| tunnelConn, err := net.Dial("tcp", tunnelAddr) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| log.Error().Err(err).Msg("Error connecting to tunnel") | ||||||||||||||||||
| return | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String()) | ||||||||||||||||||
| closer := make(chan struct{}, 2) | ||||||||||||||||||
| go copyStream(closer, tunnelConn, conn) | ||||||||||||||||||
| go copyStream(closer, conn, tunnelConn) | ||||||||||||||||||
| <-closer | ||||||||||||||||||
|
|
||||||||||||||||||
| tunnelConn.Close() | ||||||||||||||||||
| conn.Close() | ||||||||||||||||||
| // ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String()) | ||||||||||||||||||
| }() | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) { | ||||||||||||||||||
| defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy | ||||||||||||||||||
| io.Copy(dst, src) | ||||||||||||||||||
| return fs.Start(context.Background()) | ||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new code passes
Suggested change
Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new code passes
Suggested change
Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This ignores the CLI context and starts the server with a detached background context, so shutdown and cancellation will not propagate; pass the provided context instead.
Suggested change
Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: |
||||||||||||||||||
| } | ||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| package p2p | ||
|
|
||
| type FederatedServer struct { | ||
| listenAddr, service, p2ptoken string | ||
| } | ||
|
|
||
| func NewFederatedServer(listenAddr, service, p2pToken string) *FederatedServer { | ||
| return &FederatedServer{ | ||
| listenAddr: listenAddr, | ||
| service: service, | ||
| p2ptoken: p2pToken, | ||
| } | ||
| } | ||
|
Comment on lines
+7
to
+13
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The p2p token is stored as a plain string field with no validation or sanitization; an empty or malformed token passed to NewFederatedServer will propagate silently and be used to initialize a network node, potentially allowing unauthenticated federation. Add a non-empty check and return an error or panic early. Suggested fixfunc NewFederatedServer(listenAddr, service, p2pToken string) (*FederatedServer, error) {
if p2pToken == "" {
return nil, fmt.Errorf("p2pToken must not be empty")
}
if listenAddr == "" {
return nil, fmt.Errorf("listenAddr must not be empty")
}
return &FederatedServer{
listenAddr: listenAddr,
service: service,
p2ptoken: p2pToken,
}, nil
}Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: |
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,127 @@ | ||||||||||||||||||||||||||||
| //go:build p2p | ||||||||||||||||||||||||||||
| // +build p2p | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| package p2p | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||||||
| "errors" | ||||||||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||||||||
| "io" | ||||||||||||||||||||||||||||
| "net" | ||||||||||||||||||||||||||||
| "time" | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| "github.com/rs/zerolog/log" | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| "math/rand/v2" | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| "github.com/mudler/edgevpn/pkg/node" | ||||||||||||||||||||||||||||
| "github.com/mudler/edgevpn/pkg/protocol" | ||||||||||||||||||||||||||||
| "github.com/mudler/edgevpn/pkg/types" | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| func (fs *FederatedServer) Start(ctx context.Context) error { | ||||||||||||||||||||||||||||
| n, err := NewNode(fs.p2ptoken) | ||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||
| return fmt.Errorf("creating a new node: %w", err) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| err = n.Start(ctx) | ||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||
| return fmt.Errorf("starting a new node: %w", err) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| if err := ServiceDiscoverer(ctx, n, fs.p2ptoken, FederatedID, nil); err != nil { | ||||||||||||||||||||||||||||
|
Comment on lines
+28
to
+33
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| return fs.proxy(ctx, n) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| log.Info().Msgf("Allocating service '%s' on: %s", fs.service, fs.listenAddr) | ||||||||||||||||||||||||||||
| // Open local port for listening | ||||||||||||||||||||||||||||
| l, err := net.Listen("tcp", fs.listenAddr) | ||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||
| log.Error().Err(err).Msg("Error listening") | ||||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| // ll.Info("Binding local port on", srcaddr) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| ledger, _ := node.Ledger() | ||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error returned by Suggested fix ledger, err := node.Ledger()
if err != nil {
log.Error().Err(err).Msg("Error getting ledger")
return err
}Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error from Suggested fix ledger, err := node.Ledger()
if err != nil {
log.Error().Err(err).Msg("Error getting ledger")
return err
}Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error return from Suggested fix ledger, err := node.Ledger()
if err != nil {
return fmt.Errorf("getting ledger: %w", err)
}Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: |
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Announce ourselves so nodes accepts our connection | ||||||||||||||||||||||||||||
| ledger.Announce( | ||||||||||||||||||||||||||||
| ctx, | ||||||||||||||||||||||||||||
| 10*time.Second, | ||||||||||||||||||||||||||||
| func() { | ||||||||||||||||||||||||||||
| // Retrieve current ID for ip in the blockchain | ||||||||||||||||||||||||||||
| //_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String()) | ||||||||||||||||||||||||||||
| // If mismatch, update the blockchain | ||||||||||||||||||||||||||||
| //if !found { | ||||||||||||||||||||||||||||
| updatedMap := map[string]interface{}{} | ||||||||||||||||||||||||||||
| updatedMap[node.Host().ID().String()] = &types.User{ | ||||||||||||||||||||||||||||
| PeerID: node.Host().ID().String(), | ||||||||||||||||||||||||||||
| Timestamp: time.Now().String(), | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| ledger.Add(protocol.UsersLedgerKey, updatedMap) | ||||||||||||||||||||||||||||
| // } | ||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| defer l.Close() | ||||||||||||||||||||||||||||
| for { | ||||||||||||||||||||||||||||
| select { | ||||||||||||||||||||||||||||
| case <-ctx.Done(): | ||||||||||||||||||||||||||||
| return errors.New("context canceled") | ||||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||||
| log.Debug().Msg("New for connection") | ||||||||||||||||||||||||||||
| // Listen for an incoming connection. | ||||||||||||||||||||||||||||
| conn, err := l.Accept() | ||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||
| fmt.Println("Error accepting: ", err.Error()) | ||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Handle connections in a new goroutine, forwarding to the p2p service | ||||||||||||||||||||||||||||
| go handleConn(conn) | ||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Spawning an unbounded goroutine for every accepted connection allows trivial resource exhaustion, so enforce a concurrency limit or backpressure before launching handleConn. Suggested fix select {
case sem <- struct{}{}:
go func() {
defer func() { <-sem }()
handleConn(conn)
}()
default:
log.Error().Msg("Too many concurrent connections")
conn.Close()
}Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: |
||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
Comment on lines
+73
to
+89
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Suggested fix go func() {
<-ctx.Done()
l.Close()
}()
for {
conn, err := l.Accept()
if err != nil {
select {
case <-ctx.Done():
return errors.New("context canceled")
default:
log.Error().Err(err).Msg("Error accepting")
continue
}
}
go handleConn(conn)
}Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM:
Comment on lines
+73
to
+89
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When Suggested fix for {
select {
case <-ctx.Done():
return errors.New("context canceled")
default:
}
log.Debug().Msg("New for connection")
conn, err := l.Accept()
if err != nil {
select {
case <-ctx.Done():
return errors.New("context canceled")
default:
}
log.Error().Err(err).Msg("Error accepting connection")
continue
}
go handleConn(conn)
}Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM:
Comment on lines
+73
to
+89
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested fix go func() {
<-ctx.Done()
l.Close()
}()
for {
conn, err := l.Accept()
if err != nil {
select {
case <-ctx.Done():
return errors.New("context canceled")
default:
log.Error().Err(err).Msg("Error accepting")
continue
}
}
go handleConn(conn)
}Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: |
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| func handleConn(conn net.Conn) { | ||||||||||||||||||||||||||||
| var tunnelAddresses []string | ||||||||||||||||||||||||||||
| for _, v := range GetAvailableNodes(FederatedID) { | ||||||||||||||||||||||||||||
| if v.IsOnline() { | ||||||||||||||||||||||||||||
| tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) | ||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||
| log.Info().Msgf("Node %s is offline", v.ID) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // open a TCP stream to one of the tunnels | ||||||||||||||||||||||||||||
| // chosen randomly | ||||||||||||||||||||||||||||
| // TODO: optimize this and track usage | ||||||||||||||||||||||||||||
| tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))] | ||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If all federated nodes are offline,
Comment on lines
+103
to
+106
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If Suggested fix if len(tunnelAddresses) == 0 {
log.Error().Msg("No online tunnel nodes available")
conn.Close()
return
}
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))]Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If Suggested fix if len(tunnelAddresses) == 0 {
log.Error().Msg("No online tunnel nodes available")
conn.Close()
return
}
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))]Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested fix if len(tunnelAddresses) == 0 {
log.Error().Msg("No online tunnel nodes available")
conn.Close()
return
}
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))]Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Selecting a random tunnel without checking for an empty list can panic, so return early when no online federated nodes are available. Suggested fix if len(tunnelAddresses) == 0 {
log.Error().Msg("No online federated nodes available")
conn.Close()
return
}
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))]Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Selecting a random tunnel without checking for an empty slice will panic, so return early when no online nodes are available. Suggested fix if len(tunnelAddresses) == 0 {
log.Error().Msg("No online federated nodes available")
conn.Close()
return
}
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))]Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Selecting a random tunnel from an empty slice panics, so return early when no online federated nodes are available. Suggested fix if len(tunnelAddresses) == 0 {
log.Error().Msg("No online federated nodes available")
conn.Close()
return
}
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))]Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Selecting a tunnel with rand.IntN(len(tunnelAddresses)) panics when no online federated nodes are available, so guard the empty slice and close the client connection cleanly. Suggested fix if len(tunnelAddresses) == 0 {
log.Error().Msg("No online federated nodes available")
conn.Close()
return
}
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))]Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: |
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| tunnelConn, err := net.Dial("tcp", tunnelAddr) | ||||||||||||||||||||||||||||
|
Comment on lines
+97
to
+108
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested fix // Validate that tunnelAddr resolves to an expected loopback/VPN range before dialing.
// Example: parse host and verify it is within the allowed CIDR before appending.
host, _, err := net.SplitHostPort(v.TunnelAddress)
if err != nil {
log.Warn().Msgf("Skipping invalid tunnel address %s", v.TunnelAddress)
continue
}
ip := net.ParseIP(host)
if ip == nil || !allowedCIDR.Contains(ip) {
log.Warn().Msgf("Tunnel address %s outside allowed range, skipping", v.TunnelAddress)
continue
}
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: |
||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||
| log.Error().Err(err).Msg("Error connecting to tunnel") | ||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||
|
Comment on lines
+108
to
+111
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The accepted client connection is left open on dial failure, so close it before returning. Suggested fix tunnelConn, err := net.Dial("tcp", tunnelAddr)
if err != nil {
log.Error().Err(err).Msg("Error connecting to tunnel")
conn.Close()
return
}Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM:
Comment on lines
+108
to
+111
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The accepted client connection is leaked when net.Dial fails, so close conn before returning on this error path. Suggested fix tunnelConn, err := net.Dial("tcp", tunnelAddr)
if err != nil {
log.Error().Err(err).Msg("Error connecting to tunnel")
conn.Close()
return
}Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM:
Comment on lines
+108
to
+111
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The accepted client connection is leaked on dial failure, so close conn before returning. Suggested fix tunnelConn, err := net.Dial("tcp", tunnelAddr)
if err != nil {
log.Error().Err(err).Msg("Error connecting to tunnel")
conn.Close()
return
}Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM:
Comment on lines
+108
to
+111
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The accepted client connection is leaked on tunnel dial failure, so close conn before returning from the error path. Suggested fix tunnelConn, err := net.Dial("tcp", tunnelAddr)
if err != nil {
log.Error().Err(err).Msg("Error connecting to tunnel")
conn.Close()
return
}Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: |
||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
Comment on lines
+108
to
+112
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When Suggested fix tunnelConn, err := net.Dial("tcp", tunnelAddr)
if err != nil {
log.Error().Err(err).Msg("Error connecting to tunnel")
conn.Close()
return
}Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: |
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String()) | ||||||||||||||||||||||||||||
| closer := make(chan struct{}, 2) | ||||||||||||||||||||||||||||
| go copyStream(closer, tunnelConn, conn) | ||||||||||||||||||||||||||||
| go copyStream(closer, conn, tunnelConn) | ||||||||||||||||||||||||||||
| <-closer | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| tunnelConn.Close() | ||||||||||||||||||||||||||||
| conn.Close() | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) { | ||||||||||||||||||||||||||||
| defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy | ||||||||||||||||||||||||||||
| io.Copy(dst, src) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
Comment on lines
+124
to
+127
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested fixfunc copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
if _, err := io.Copy(dst, src); err != nil {
log.Debug().Err(err).Msg("copyStream error")
}
}Prompt for AI assistanceCopy the prompt below and paste it into ChatGPT, Claude, or any LLM: |
||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In
federation.gohandleConn, iftunnelAddressesis empty (no online nodes),rand.IntN(0)panics with a divide-by-zero, crashing the goroutine and potentially the process; add a length check and close the connection before returning.Suggested fix
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM: