Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,5 @@ func (instance *Instance[T]) Get(userId string, sessionId string) (*Session[T],
}

func (instance *Instance[T]) GetConnections(userId string) int {
sessionList, ok := instance.sessionsCache.sessions.Load(userId)
if !ok {
return 0
}
sessions := sessionList.(*SessionsList)
sessions.mutex.RLock()
sessionIds := sessions.sessions
sessions.mutex.RUnlock()

return len(sessionIds)
return len(instance.GetSessions(userId))
}
14 changes: 3 additions & 11 deletions neogate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,11 @@ var Log = log.New(log.Writer(), "neogate ", log.Flags())
type Instance[T any] struct {
Config Config[T]
connectionsCache *sync.Map // UserId:sessionId -> *Session
sessionsCache SessionCache
sessions *sync.Map // UserId -> SessionsList
adapters *sync.Map // AdapterId -> *Adapter
routes map[string]func(*Context[T]) Event
}

type SessionCache struct {
mutex *sync.Mutex
sessions *sync.Map // UserId -> SessionsList
}

type SessionsList struct {
mutex *sync.RWMutex
sessions []string // session ids
Expand Down Expand Up @@ -83,11 +78,8 @@ func Setup[T any](config Config[T]) *Instance[T] {
Config: config,
adapters: &sync.Map{},
connectionsCache: &sync.Map{},
sessionsCache: SessionCache{
sessions: &sync.Map{},
mutex: &sync.Mutex{},
},
routes: make(map[string]func(*Context[T]) Event),
sessions: &sync.Map{},
routes: make(map[string]func(*Context[T]) Event),
}
return instance
}
Expand Down
14 changes: 2 additions & 12 deletions send.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,16 @@
package neogate

import (
"errors"

"github.com/bytedance/sonic"
"github.com/gofiber/websocket/v2"
)

// SendEventToUser sends the event to all sessions connected to the userId
func (instance *Instance[T]) SendEventToUser(userId string, event Event) error {

sessionList, ok := instance.sessionsCache.sessions.Load(userId)
if !ok {
return errors.New("no sessions found")
}
sessions := sessionList.(*SessionsList)
sessions.mutex.RLock()
sessionIds := sessions.sessions
sessions.mutex.RUnlock()
sessions := instance.GetSessions(userId)

adapterIds := []string{}
for _, sessionId := range sessionIds {
for _, sessionId := range sessions {
_, sessionAdapterName := instance.Config.SessionAdapterHandler(userId, sessionId)
adapterIds = append(adapterIds, sessionAdapterName)
}
Expand Down
57 changes: 20 additions & 37 deletions sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,36 +92,22 @@ func (instance *Instance[T]) createSession(session *Session[T]) {
},
})

sessionAny, ok := instance.sessionsCache.sessions.Load(session.GetUserId())
if ok {
sessionList := sessionAny.(*SessionsList)

sessionList.mutex.Lock()
defer sessionList.mutex.Unlock()

if sessionAny, ok = instance.sessionsCache.sessions.Load(session.GetUserId()); ok {
sessionList = sessionAny.(*SessionsList)
sessionList.sessions = append(sessionList.sessions, session.GetSessionId())
instance.sessionsCache.sessions.Store(session.GetUserId(), sessionList)
return
}
sessionList := &SessionsList{
mutex: &sync.RWMutex{},
sessions: []string{session.GetSessionId()},
}
sessionAny, loaded := instance.sessions.LoadOrStore(session.GetUserId(), sessionList)
if loaded {
sessionList = sessionAny.(*SessionsList)
}

instance.sessionsCache.mutex.Lock()

sessionAny, ok = instance.sessionsCache.sessions.Load(session.GetUserId())
if ok {
sessionList := sessionAny.(*SessionsList)
instance.sessionsCache.mutex.Unlock()
sessionList.mutex.Lock()
defer sessionList.mutex.Unlock()

sessionList.Add(session.GetSessionId())
} else {
instance.sessionsCache.sessions.Store(session.GetUserId(), &SessionsList{
mutex: &sync.RWMutex{},
sessions: []string{session.GetSessionId()},
})
instance.sessionsCache.mutex.Unlock()
if loaded {
sessionList.sessions = append(sessionList.sessions, session.GetSessionId())
}
instance.sessions.Store(session.GetUserId(), sessionList) // Store here in case the thing was deleted while the mutex was locked
Comment thread
Unbreathable marked this conversation as resolved.
}

func (instance *Instance[T]) removeSession(userId string, session string) {
Expand All @@ -130,10 +116,7 @@ func (instance *Instance[T]) removeSession(userId string, session string) {
_, sessionAdapter := instance.Config.SessionAdapterHandler(userId, session)
instance.RemoveAdapter(sessionAdapter)

instance.sessionsCache.mutex.Lock()
defer instance.sessionsCache.mutex.Unlock()

sessionAny, ok := instance.sessionsCache.sessions.Load(userId)
sessionAny, ok := instance.sessions.Load(userId)
if !ok {
return
}
Expand All @@ -145,22 +128,22 @@ func (instance *Instance[T]) removeSession(userId string, session string) {
sessionList.sessions = slices.DeleteFunc(sessionList.sessions, func(s string) bool {
return s == session
})
instance.sessionsCache.sessions.Store(userId, sessionList)

if len(sessionList.sessions) == 0 {
instance.sessionsCache.sessions.Delete(userId)
instance.sessions.Delete(userId)
}
Comment thread
Unbreathable marked this conversation as resolved.
}

func (instance *Instance[T]) GetSessions(userId string) []string {
sessionList, ok := instance.sessionsCache.sessions.Load(userId)
sessionListAny, ok := instance.sessions.Load(userId)
if !ok {
return []string{}
}
sessions := sessionList.(*SessionsList)
sessions.mutex.RLock()
sessionIds := sessions.sessions
sessions.mutex.RUnlock()
sessionList := sessionListAny.(*SessionsList)
sessionList.mutex.RLock()
sessionIds := make([]string, len(sessionList.sessions))
copy(sessionIds, sessionList.sessions)
sessionList.mutex.RUnlock()

return sessionIds
}