Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/spf13/cobra v1.10.2
github.com/stretchr/testify v1.11.1
github.com/testcontainers/testcontainers-go v0.42.0
golang.org/x/sys v0.45.0
golang.org/x/time v0.15.0
google.golang.org/protobuf v1.36.11
gopkg.in/yaml.v3 v3.0.1
Expand Down Expand Up @@ -131,7 +132,6 @@ require (
golang.org/x/net v0.55.0 // indirect
golang.org/x/oauth2 v0.36.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.45.0 // indirect
golang.org/x/telemetry v0.0.0-20260421165255-392afab6f40e // indirect
golang.org/x/term v0.43.0 // indirect
golang.org/x/text v0.37.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions pkg/auth/store/lock_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
//go:build !unix || aix

package store

// tryFileLock is a no-op on platforms without flock(2). Process-local
// serialization via refreshMu still applies; cross-process coordination is
// simply unavailable, matching the historical behaviour.
func tryFileLock(_ string) (release func(), acquired bool, err error) {
return func() {}, true, nil
}
269 changes: 269 additions & 0 deletions pkg/auth/store/lock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
//go:build unix && !aix

package store

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/sirupsen/logrus"

authclient "github.com/ethpandaops/panda/pkg/auth/client"
)

func TestTryFileLockIsMutuallyExclusive(t *testing.T) {
t.Parallel()

path := filepath.Join(t.TempDir(), "creds.json")

release1, ok1, err1 := tryFileLock(path)
if err1 != nil || !ok1 {
t.Fatalf("first lock should be acquired: ok=%v err=%v", ok1, err1)
}

if _, ok2, _ := tryFileLock(path); ok2 {
t.Fatal("second lock should fail while the first is held")
}

release1()

release3, ok3, err3 := tryFileLock(path)
if err3 != nil || !ok3 {
t.Fatalf("lock should be acquirable after release: ok=%v err=%v", ok3, err3)
}
release3()
}

// TestSaveAndClearFailClosedWhileLockHeld verifies that an interactive write
// refuses to proceed (rather than clobber a rotation) while another process
// holds the credentials lock, and succeeds once it is released.
func TestSaveAndClearFailClosedWhileLockHeld(t *testing.T) {
t.Parallel()

path := filepath.Join(t.TempDir(), "creds.json")
st := New(logrus.New(), Config{
Path: path,
WriteLockWait: 100 * time.Millisecond,
}).(*store)

// Simulate a refresh in another process holding the lock.
release, ok, err := tryFileLock(path)
if err != nil || !ok {
t.Fatalf("failed to take lock: ok=%v err=%v", ok, err)
}

tokens := &authclient.Tokens{
AccessToken: "access",
RefreshToken: "refresh",
ExpiresAt: time.Now().Add(time.Hour),
}

if err := st.Save(tokens); !errors.Is(err, ErrCredentialBusy) {
t.Fatalf("expected ErrCredentialBusy while lock held, got %v", err)
}

if err := st.Clear(); !errors.Is(err, ErrCredentialBusy) {
t.Fatalf("expected ErrCredentialBusy from Clear while lock held, got %v", err)
}

release()

if err := st.Save(tokens); err != nil {
t.Fatalf("Save should succeed after the lock is released: %v", err)
}
}

// TestRefreshReusesTokenRotatedByAnotherProcess verifies the reload-recheck
// step: if another process rotated the on-disk token after we decided to
// refresh but before we acquired the lock, we reuse that token instead of
// refreshing again (which would present a token the other process revoked).
func TestRefreshReusesTokenRotatedByAnotherProcess(t *testing.T) {
t.Parallel()

path := filepath.Join(t.TempDir(), "creds.json")
client := &countingAuthClient{}
store := New(logrus.New(), Config{
Path: path,
AuthClient: client,
RefreshBuffer: 5 * time.Minute,
}).(*store)

// In-memory token is due for refresh (inside the buffer).
store.tokens = &authclient.Tokens{
AccessToken: "stale",
RefreshToken: "stale-rt",
ExpiresAt: time.Now().Add(time.Minute),
}

// Another process has already written a fresh token to the shared file.
writeTokens(t, path, &authclient.Tokens{
AccessToken: "fresh",
RefreshToken: "fresh-rt",
ExpiresIn: 3600,
ExpiresAt: time.Now().Add(time.Hour),
})

token, err := store.GetAccessToken()
if err != nil {
t.Fatalf("GetAccessToken returned error: %v", err)
}

if token != "fresh" {
t.Fatalf("expected the token written by another process, got %q", token)
}

if client.refreshCalls.Load() != 0 {
t.Fatalf("expected no provider refresh, got %d", client.refreshCalls.Load())
}
}

// TestSharedCredentialRefreshNeverReusesRevokedToken hammers several stores
// that share one credentials file against a provider that rotates and revokes
// like authentik. The file lock must ensure no store ever presents a
// rotated-away token (which would return invalid_grant).
func TestSharedCredentialRefreshNeverReusesRevokedToken(t *testing.T) {
t.Parallel()

path := filepath.Join(t.TempDir(), "creds.json")
client := newRotatingAuthClient("rt-0")
writeTokens(t, path, &authclient.Tokens{
AccessToken: "at-0",
RefreshToken: "rt-0",
ExpiresIn: 3600,
ExpiresAt: time.Now().Add(time.Hour),
RefreshTokenIssuedAt: time.Now(),
})

const (
stores = 4
iterations = 50
)

newSharedStore := func() *store {
return New(logrus.New(), Config{
Path: path,
AuthClient: client,
RefreshBuffer: 5 * time.Minute,
RefreshTokenTTL: time.Nanosecond, // always past half-life: forces a refresh every call
}).(*store)
}

start := make(chan struct{})

var wg sync.WaitGroup

wg.Add(stores)

for range stores {
s := newSharedStore()

go func() {
defer wg.Done()

<-start // release all goroutines together to maximise contention

for range iterations {
token, err := s.GetAccessToken()
if err != nil {
t.Errorf("GetAccessToken returned error: %v", err)

return
}

if token == "" {
t.Error("GetAccessToken returned an empty token")

return
}
}
}()
}

close(start)
wg.Wait()

if got := client.invalidGrants.Load(); got != 0 {
t.Fatalf("expected zero invalid_grant (revoked-token reuse), got %d", got)
}

if got := client.refreshCalls.Load(); got == 0 {
t.Fatal("expected at least one provider refresh")
}

// The shared credential must end in a usable, authenticated state.
final := New(logrus.New(), Config{Path: path, AuthClient: client}).(*store)
if !final.IsAuthenticated() {
t.Fatal("shared credential is not authenticated after concurrent refreshes")
}
}

func writeTokens(t *testing.T, path string, tokens *authclient.Tokens) {
t.Helper()

data, err := json.MarshalIndent(tokens, "", " ")
if err != nil {
t.Fatalf("marshaling tokens: %v", err)
}

if err := os.WriteFile(path, data, 0o600); err != nil {
t.Fatalf("writing tokens: %v", err)
}
}

// rotatingAuthClient models authentik: each refresh issues a new refresh token
// and revokes the presented one. Presenting a revoked token returns
// invalid_grant.
type rotatingAuthClient struct {
mu sync.Mutex
counter int
valid map[string]struct{}
refreshCalls atomic.Int64
invalidGrants atomic.Int64
}

func newRotatingAuthClient(initial string) *rotatingAuthClient {
return &rotatingAuthClient{valid: map[string]struct{}{initial: {}}}
}

func (c *rotatingAuthClient) Login(_ context.Context) (*authclient.Tokens, error) {
return nil, errors.New("not implemented")
}

func (c *rotatingAuthClient) ClientCredentials(_ context.Context) (*authclient.Tokens, error) {
return nil, errors.New("not implemented")
}

func (c *rotatingAuthClient) Refresh(_ context.Context, refreshToken string) (*authclient.Tokens, error) {
c.refreshCalls.Add(1)

c.mu.Lock()
defer c.mu.Unlock()

if _, ok := c.valid[refreshToken]; !ok {
c.invalidGrants.Add(1)

return nil, fmt.Errorf("token endpoint returned status 400: {\"error\": \"invalid_grant\"}")
}

delete(c.valid, refreshToken)
c.counter++
next := fmt.Sprintf("rt-%d", c.counter)
c.valid[next] = struct{}{}

return &authclient.Tokens{
AccessToken: fmt.Sprintf("at-%d", c.counter),
RefreshToken: next,
TokenType: "Bearer",
ExpiresIn: 3600,
ExpiresAt: time.Now().Add(time.Hour),
RefreshTokenIssuedAt: time.Now(),
}, nil
}
56 changes: 56 additions & 0 deletions pkg/auth/store/lock_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//go:build unix && !aix

package store

import (
"errors"
"fmt"
"os"
"path/filepath"

"golang.org/x/sys/unix"
)

// tryFileLock attempts a non-blocking exclusive lock on path+".lock" so that
// only one process drives a credential refresh at a time when several panda
// processes share one credentials file.
//
// It returns (release, true, nil) when this process holds the lock,
// (nil, false, nil) when another process currently holds it, and
// (no-op release, true, err) when the lock file itself cannot be created — in
// which case the caller proceeds relying on process-local serialization, so a
// single process is never blocked by lock-file errors.
//
// The lock is advisory and released automatically by the OS when the holding
// process exits, so a crashed holder never leaves a stale lock.
func tryFileLock(path string) (release func(), acquired bool, err error) {
lockPath := path + ".lock"

if mkErr := os.MkdirAll(filepath.Dir(lockPath), 0o700); mkErr != nil {
return func() {}, true, fmt.Errorf("creating lock directory: %w", mkErr)
}

f, openErr := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0o600)
if openErr != nil {
return func() {}, true, fmt.Errorf("opening lock file: %w", openErr)
}

if flockErr := unix.Flock(int(f.Fd()), unix.LOCK_EX|unix.LOCK_NB); flockErr != nil {
// EWOULDBLOCK/EAGAIN is genuine contention: another process holds the
// lock, so back off and reuse what it writes. Any other flock error is
// unexpected — fail open and refresh without cross-process coordination
// rather than risk being unable to refresh at all.
if errors.Is(flockErr, unix.EWOULDBLOCK) || errors.Is(flockErr, unix.EAGAIN) {
_ = f.Close()

return nil, false, nil
}

return func() { _ = f.Close() }, true, fmt.Errorf("locking credentials file: %w", flockErr)
}

return func() {
_ = unix.Flock(int(f.Fd()), unix.LOCK_UN)
_ = f.Close()
}, true, nil
}
Loading
Loading