diff --git a/pkg/messaging/point2point.go b/pkg/messaging/point2point.go index 225f090..ca65d51 100644 --- a/pkg/messaging/point2point.go +++ b/pkg/messaging/point2point.go @@ -64,9 +64,10 @@ func (d *natsDirectMessaging) SendToOther(topic string, message []byte) error { } return nil }, - retry.Attempts(3), - retry.Delay(50*time.Millisecond), - retry.DelayType(retry.FixedDelay), + retry.Attempts(10), + retry.Delay(100*time.Millisecond), + retry.MaxDelay(1*time.Second), + retry.DelayType(retry.BackOffDelay), retry.OnRetry(func(n uint, err error) { logger.Error("Failed to send direct message", err, "attempt", n+1, "topic", topic) }), diff --git a/pkg/mpc/session.go b/pkg/mpc/session.go index b1a76b5..5438a2d 100644 --- a/pkg/mpc/session.go +++ b/pkg/mpc/session.go @@ -3,6 +3,7 @@ package mpc import ( "encoding/json" "fmt" + "runtime/debug" "strings" "sync" @@ -218,6 +219,17 @@ func (s *session) receiveBroadcastTssMessage(rawMsg []byte) { // update: the logic of receiving message should be modified func (s *session) receiveTssMessage(msg *types.TssMessage) { + defer func() { + if r := recover(); r != nil { + logger.Error("Panic recovered in receiveTssMessage", + fmt.Errorf("%v", r), + "walletID", s.walletID, + "stack", string(debug.Stack()), + ) + s.ErrCh <- fmt.Errorf("panic in receiveTssMessage: %v", r) + } + }() + toIDs := make([]string, len(msg.To)) for i, id := range msg.To { toIDs[i] = id.String() @@ -285,17 +297,15 @@ func (s *session) subscribeFromPeersAsync(fromIDs []string) { } func (s *session) subscribeBroadcastAsync() { - go func() { - topic := s.topicComposer.ComposeBroadcastTopic() - sub, err := s.pubSub.Subscribe(topic, func(natMsg *nats.Msg) { - s.receiveBroadcastTssMessage(natMsg.Data) - }) - if err != nil { - s.ErrCh <- fmt.Errorf("Failed to subscribe to broadcast topic %s: %w", topic, err) - return - } - s.broadcastSub = sub - }() + topic := s.topicComposer.ComposeBroadcastTopic() + sub, err := s.pubSub.Subscribe(topic, func(natMsg *nats.Msg) { + s.receiveBroadcastTssMessage(natMsg.Data) + }) + if err != nil { + s.ErrCh <- fmt.Errorf("Failed to subscribe to broadcast topic %s: %w", topic, err) + return + } + s.broadcastSub = sub } func (s *session) ListenToIncomingMessageAsync() {