HYPERFLEET-618: add PostgreSQL advisory locks for migration coordination #72
ldornele wants to merge 7 commits into openshift-hyperfleet:main
Conversation
Walkthrough

Adds PostgreSQL advisory lock support and integrates it into migrations. Introduces LockType and AdvisoryLock types, context-scoped lock management via NewAdvisoryLockContext and Unlock, and a public MigrateWithLock(ctx, factory) that acquires/releases an advisory lock around migrations. Adds advisory lock timeout configuration and accessors on session factories, new logging fields for lock metadata, integration tests exercising concurrent locking and migrations, documentation describing the coordination approach, and updates the migrate command to use MigrateWithLock (removing the previous final success log).

Sequence Diagram

sequenceDiagram
participant Pod as Hyperfleet Pod
participant Ctx as Context Manager
participant Lock as Advisory Lock
participant TX as DB Transaction
participant Migration as Migration Engine
participant PG as PostgreSQL
Pod->>Ctx: call db.MigrateWithLock(ctx, factory)
Ctx->>Lock: NewAdvisoryLockContext(ctx, factory, id, Migrations)
Lock->>TX: open session & BEGIN (txid_current())
Lock->>PG: pg_advisory_xact_lock(hash(id), hash(Migrations)) (blocking)
PG-->>Lock: lock acquired (or callers wait)
Lock-->>Ctx: return locked context + lockOwnerID
Ctx->>Migration: run migrations using locked session
Migration->>PG: execute migration statements
PG-->>Migration: statements complete
Ctx->>Lock: Unlock(ctx, lockOwnerID)
Lock->>TX: COMMIT (releases advisory lock)
TX->>PG: COMMIT
Lock-->>Ctx: lock released
Ctx-->>Pod: return success / error
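The diagram's `hash(id)` / `hash(Migrations)` step can be sketched as follows. This is a minimal illustration only: it assumes an FNV-32a hash, which may differ from what the PR actually uses; `advisoryLockKey` is a hypothetical helper. The two-argument form `pg_advisory_xact_lock(key1 int4, key2 int4)` takes two signed 32-bit keys.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// advisoryLockKey maps a string lock ID to a signed 32-bit key, since
// the two-argument pg_advisory_xact_lock takes two int4 values.
// (Assumed FNV-32a; the PR may hash differently.)
func advisoryLockKey(s string) int32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int32(h.Sum32()) // deliberate wrap into the signed range
}

func main() {
	id, lockType := "migrations", "Migrations"
	fmt.Printf("SELECT pg_advisory_xact_lock(%d, %d)\n",
		advisoryLockKey(id), advisoryLockKey(lockType))
}
```

Because the lock is transaction-scoped (`_xact_`), COMMIT or ROLLBACK of the holding transaction releases it automatically, which is why the diagram shows no explicit unlock statement.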
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Pre-merge checks: ❌ 1 failed (1 warning) | ✅ 2 passed
🧹 Nitpick comments (3)
pkg/db/context.go (1)
118-149: Consider documenting the context usage pattern.

The advisoryLockMap is shared by reference when stored in context. The implementation is safe when each goroutine starts with its own context.Background() (as the tests demonstrate), but could lead to subtle map race conditions if callers share a single context across goroutines that concurrently call Unlock. Consider adding a doc comment clarifying that each concurrent operation should derive from a fresh context.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/db/context.go` around lines 118 - 149, The advisoryLockMap stored in context is shared by reference which can cause map race conditions if a single context is reused across goroutines; add a doc comment near Unlock (and/or the advisoryLockMap type) explaining the usage pattern: callers must not share the same context with advisory locks across concurrent goroutines and should instead derive a fresh context (e.g., context.Background() or context.WithCancel/WithTimeout) per concurrent operation to avoid concurrent mutation of the map; reference advisoryLockMap and Unlock in the comment so callers know which API has this constraint.

test/integration/advisory_locks_test.go (1)
288-297: Minor: Consider using an error channel instead of t.Errorf in a goroutine.

Calling t.Errorf from a goroutine can be fragile if the test function exits before the goroutine completes. While this test correctly synchronizes via channels, using an error channel would make the pattern more robust.

♻️ Alternative pattern using error channel
  // Track when the second goroutine acquires the lock
  acquired := make(chan bool, 1)
  released := make(chan bool, 1)
+ errChan := make(chan error, 1)

  // Second goroutine tries to acquire the same lock
  go func() {
      ctx2, lockOwnerID2, err := db.NewAdvisoryLockContext(context.Background(), helper.DBFactory, "blocking-test", db.Migrations)
      if err != nil {
-         t.Errorf("Failed to acquire second lock: %v", err)
+         errChan <- err
          return
      }
      defer db.Unlock(ctx2, lockOwnerID2)
+     errChan <- nil
      acquired <- true
      <-released // Wait for signal to release
  }()

Then check errChan after the test completes to report any errors.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/integration/advisory_locks_test.go` around lines 288 - 297, Replace the goroutine's direct t.Errorf calls with sending errors into an error channel (e.g., errChan := make(chan error, 1)) so the main test goroutine can assert them after synchronization; specifically, in the block that calls db.NewAdvisoryLockContext (producing ctx2, lockOwnerID2) send any non-nil err into errChan instead of calling t.Errorf, defer db.Unlock(ctx2, lockOwnerID2) as before, and after the goroutine finishes (after receiving on released) read from errChan and fail the test with t.Fatal or t.Errorf if an error was reported.

pkg/db/advisory_locks.go (1)
56-66: Consider returning nil on error for defensive clarity.

Returning a non-nil AdvisoryLock alongside an error is an unusual pattern. While the caller (context.go) correctly checks the error before using the lock, this could lead to accidental misuse if future callers forget to check.

♻️ Proposed fix
  var txid struct{ ID int64 }
  err := tx.Raw("select txid_current() as id").Scan(&txid).Error
+ if err != nil {
+     _ = tx.Rollback() // Clean up on error
+     return nil, err
+ }
  return &AdvisoryLock{
      txid:      txid.ID,
      ownerUUID: ownerUUID,
      id:        id,
      lockType:  locktype,
      g2:        tx,
      startTime: time.Now(),
  }, err
  }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/db/advisory_locks.go` around lines 56 - 66, The function is returning a non-nil *AdvisoryLock together with err (created via tx.Raw("select txid_current() as id").Scan(&txid).Error), which is fragile; instead, after executing tx.Raw(...).Scan(&txid) check if err != nil and immediately return nil, err; only construct and return &AdvisoryLock{...} (using txid.ID, ownerUUID, id, locktype, g2: tx, startTime: time.Now()) when err is nil so callers never receive a non-nil lock on error.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@pkg/db/advisory_locks.go`:
- Around line 56-66: The function is returning a non-nil *AdvisoryLock together
with err (created via tx.Raw("select txid_current() as id").Scan(&txid).Error),
which is fragile; instead, after executing tx.Raw(...).Scan(&txid) check if err
!= nil and immediately return nil, err; only construct and return
&AdvisoryLock{...} (using txid.ID, ownerUUID, id, locktype, g2: tx, startTime:
time.Now()) when err is nil so callers never receive a non-nil lock on error.
In `@pkg/db/context.go`:
- Around line 118-149: The advisoryLockMap stored in context is shared by
reference which can cause map race conditions if a single context is reused
across goroutines; add a doc comment near Unlock (and/or the advisoryLockMap
type) explaining the usage pattern: callers must not share the same context with
advisory locks across concurrent goroutines and should instead derive a fresh
context (e.g., context.Background() or context.WithCancel/WithTimeout) per
concurrent operation to avoid concurrent mutation of the map; reference
advisoryLockMap and Unlock in the comment so callers know which API has this
constraint.
In `@test/integration/advisory_locks_test.go`:
- Around line 288-297: Replace the goroutine's direct t.Errorf calls with
sending errors into an error channel (e.g., errChan := make(chan error, 1)) so
the main test goroutine can assert them after synchronization; specifically, in
the block that calls db.NewAdvisoryLockContext (producing ctx2, lockOwnerID2)
send any non-nil err into errChan instead of calling t.Errorf, defer
db.Unlock(ctx2, lockOwnerID2) as before, and after the goroutine finishes (after
receiving on released) read from errChan and fail the test with t.Fatal or
t.Errorf if an error was reported.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a77cb951-fff5-462c-b81b-486f29aee60b
📒 Files selected for processing (6)
- cmd/hyperfleet-api/migrate/cmd.go
- docs/database.md
- pkg/db/advisory_locks.go
- pkg/db/context.go
- pkg/db/migrations.go
- test/integration/advisory_locks_test.go
locks = make(advisoryLockMap)
}

lock, err := newAdvisoryLock(ctx, connection, &lockOwnerID, &id, &lockType)
Priority: Bug
The two error paths in NewAdvisoryLockContext (lines 97-100 and 104-107) leak the GORM
transaction created inside newAdvisoryLock. When either newAdvisoryLock or lock.lock() fails,
lock.g2 holds an open transaction that is never committed or rolled back.
The cleanest fix is to handle it inside newAdvisoryLock itself — roll back the transaction if
txid_current() fails:
var txid struct{ ID int64 }
err := tx.Raw("select txid_current() as id").Scan(&txid).Error
if err != nil {
tx.Rollback()
return nil, err
}
return &AdvisoryLock{
txid: txid.ID,
ownerUUID: ownerUUID,
id: id,
lockType: locktype,
g2: tx,
startTime: time.Now(),
}, nil

And in NewAdvisoryLockContext, roll back if lock.lock() fails:
err = lock.lock()
if err != nil {
logger.WithError(ctx, err).Error("Failed to acquire advisory lock")
lock.g2.Rollback() // clean up the open transaction
return ctx, lockOwnerID, err
}
pkg/db/context.go
Outdated
logger.With(ctx, "lock_id", lock.id).Warn("lockOwnerID could not be found in AdvisoryLock")
} else if *lock.ownerUUID == callerUUID {
lockID := "<missing>"
lockType := *lock.lockType
Priority: Bug
Line 131 dereferences lock.lockType without a nil check, but both lock.id (line 132) and
lock.ownerUUID (line 127) have nil guards. Should be consistent — if these fields are
pointers, guard them all:
lockID := "<missing>"
lockTypeStr := LockType("<missing>")
if lock.lockType != nil {
lockTypeStr = *lock.lockType
}
if lock.id != nil {
lockID = *lock.id
}

Minor, but a panic in Unlock during cleanup would be hard to debug in production.
pkg/db/migrations.go
Outdated
// MigrateWithLock runs migrations with an advisory lock to prevent concurrent migrations
func MigrateWithLock(ctx context.Context, factory SessionFactory) error {
// Acquire advisory lock for migrations
ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, "migrations", Migrations)
Priority: Pattern
The lock ID "migrations" on line 30 is a bare string, while the lock type already uses a
constant (Migrations). Since advisory_locks.go already has a constants block for lock types,
consider adding the lock ID there too:
const (
// Migrations lock type for database migrations
Migrations LockType = "Migrations"
// MigrationsLockID is the advisory lock ID used for migration coordination
MigrationsLockID = "migrations"
)Then in migrations.go:
- ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, "migrations", Migrations)
+ ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, MigrationsLockID, Migrations)
if l.lockType == nil {
return errors.New("AdvisoryLock: lockType is missing")
}
`pg_advisory_xact_lock` blocks indefinitely — if a pod hangs holding the migration lock (stuck migration, network partition), every other pod's init container blocks forever waiting.
The Helm chart doesn't set `activeDeadlineSeconds` on the pod spec either, so there's no Kubernetes-level safety net.
A couple of options:
- Use `pg_try_advisory_xact_lock()` in a retry loop with a configurable timeout
- `SET LOCAL statement_timeout = '300s'` on the lock session before acquiring
- At minimum, add `activeDeadlineSeconds` to the pod spec as a backstop
I'd lean toward option 2 — it's a one-liner and gives a clean error if the lock can't be acquired within 5 minutes.
We've implemented option 2 with an enhancement.
We added SET LOCAL statement_timeout before acquiring the lock (see pkg/db/advisory_locks.go:87-90). This prevents indefinite blocking with a clean timeout error.
Additionally, we made the timeout configurable:
- Default: 300 seconds (5 minutes)
- Command-line flag: --db-advisory-lock-timeout
- Environment variable: DB_ADVISORY_LOCK_TIMEOUT
- Config field: DatabaseConfig.AdvisoryLockTimeoutSeconds
Regarding timeout strategy:
I noticed uhc-account-manager uses a different approach — they set driver-level timeouts via pq-timeouts parameter in the connection string, which applies a global timeout to all database operations (SELECT,
INSERT, UPDATE, advisory locks, etc.).
Our current implementation uses statement-level timeout (SET LOCAL), which only protects the migration lock acquisition. Regular database queries (reads/writes) have no timeout protection.
Question: Should we create a follow-up issue to align with uhc-account-manager's strategy and implement driver-level timeouts for all database operations? This would provide timeout protection across the board, not just for migrations.
The trade-off:
- Current approach: Simple, no dependencies, targets only migration locks
- uhc-account-manager approach: Broader protection, but requires pq-timeouts driver support and affects all queries
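The statement sequence described in the reply comes down to two SQL statements run inside the lock's transaction. A minimal sketch under stated assumptions: `lockAcquisitionSQL` is a hypothetical helper (not the PR's code), and the default of 300 seconds follows the reply above. `SET LOCAL` scopes the timeout to the current transaction only, so other sessions' regular queries are unaffected.

```go
package main

import "fmt"

// lockAcquisitionSQL returns the statements run inside the lock's
// transaction. SET LOCAL limits only this transaction, so if the
// advisory lock cannot be acquired within the timeout, PostgreSQL
// aborts the waiting statement with a clean error instead of
// blocking forever.
func lockAcquisitionSQL(timeoutSeconds int, key1, key2 int32) []string {
	if timeoutSeconds <= 0 {
		timeoutSeconds = 300 // default per the reply: 5 minutes
	}
	return []string{
		fmt.Sprintf("SET LOCAL statement_timeout = '%ds'", timeoutSeconds),
		fmt.Sprintf("SELECT pg_advisory_xact_lock(%d, %d)", key1, key2),
	}
}

func main() {
	for _, stmt := range lockAcquisitionSQL(0, 1, 2) {
		fmt.Println(stmt)
	}
}
```

One design note: because `statement_timeout` also applies to the migration statements that run in the same transaction, a real implementation may want to `SET LOCAL statement_timeout = 0` (or the previous value) after the lock is acquired.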
locks = make(advisoryLockMap)
}

lock, err := newAdvisoryLock(ctx, connection, &lockOwnerID, &id, &lockType)
`advisoryLockMap` is a plain Go map stored via `context.WithValue`. The map is retrieved and mutated in place here — since maps are reference types, this mutation is visible to anything holding the same context. Go maps aren't safe for concurrent access and the runtime will panic if two goroutines hit the same map.
Current usage is fine (each init container starts from `context.Background()`), but since this is an exported API, a future caller sharing contexts across goroutines could hit a data race.
Worth either adding a `sync.Mutex` to the map wrapper or a doc comment on `NewAdvisoryLockContext` noting that the returned context must not be shared across goroutines.
Excellent point! You're right that this could race if contexts are shared across goroutines.
This PR: Added a doc comment warning about concurrent usage (see NewAdvisoryLockContext). Current usage (init containers) is safe since each starts from isolated context.Background().
Follow-up: I'll create a separate Jira issue to harden this with sync.RWMutex + race detector tests. Keeping this PR focused on the migration coordination feature makes sense IMO, but happy to add the mutex here if you prefer.
WDYT?
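The mutex-based hardening proposed for the follow-up could look like the sketch below. This is a hypothetical `lockRegistry`, not the PR's `advisoryLockMap`; the real type maps owner UUIDs to `*AdvisoryLock` values, simplified to strings here.

```go
package main

import (
	"fmt"
	"sync"
)

// lockRegistry is a goroutine-safe alternative to a plain map stored
// in context: every read and write goes through the mutex, so sharing
// one context across goroutines cannot trigger the runtime's
// concurrent map write panic.
type lockRegistry struct {
	mu    sync.RWMutex
	locks map[string]string // ownerUUID -> lock ID (simplified)
}

func newLockRegistry() *lockRegistry {
	return &lockRegistry{locks: make(map[string]string)}
}

func (r *lockRegistry) add(owner, id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.locks[owner] = id
}

func (r *lockRegistry) remove(owner string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.locks, owner)
}

func (r *lockRegistry) len() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.locks)
}

func main() {
	reg := newLockRegistry()
	var wg sync.WaitGroup
	// Concurrent add/remove is safe through the mutex; with a bare
	// map this pattern can panic under the race detector.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			owner := fmt.Sprintf("owner-%d", i)
			reg.add(owner, "migrations")
			reg.remove(owner)
		}(i)
	}
	wg.Wait()
	fmt.Println(reg.len()) // 0
}
```

Storing a pointer to such a registry in the context keeps the existing `context.WithValue` pattern while making concurrent `Unlock` calls safe.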
go acquireLock(helper, &total, &waiter)
}

// Wait for all goroutines to complete
This test reads and writes `*total` from multiple goroutines without any Go-level synchronization. The advisory lock serializes execution at the DB level, but Go's memory model still requires explicit synchronization (mutex, atomic, channel) for visibility guarantees across goroutines. Running with `-race` would flag this.
Two options:
- Add a `sync.Mutex` around the `*total` read/write (quick fix, still proves DB-level serialization)
- Move the "work" into the database — read a row, sleep, update the row, verify final value. This would be a true end-to-end proof of lock serialization.
We've implemented option 2 — moving the work into the database.
Changes made:
Both TestAdvisoryLocksConcurrently and TestAdvisoryLocksWithTransactions now:
- Create a temporary counter table with a single row initialized to 0
- Each goroutine acquires the advisory lock, reads the current value from the DB, sleeps, then increments and writes back
- Verify the final counter value equals the expected total
- Clean up the table with defer DROP TABLE
Benefits:
- ✅ Race detector clean: Tests pass with -race flag (verified)
- ✅ True end-to-end validation: Actually proves the advisory lock serializes database operations, not just Go memory
- ✅ More realistic: Mirrors the actual migration use case where the lock protects database state, not in-memory variables
- ✅ Eliminates Go-level synchronization concerns: No mutex/atomic needed since all shared state is in the database
Test results:
$ make test-integration TESTFLAGS="-race -run 'TestAdvisory|TestConcurrentMigrations'"
✅ PASS test/integration.TestAdvisoryLocksConcurrently (0.27s)
✅ PASS test/integration.TestAdvisoryLocksWithTransactions (0.31s)
✅ PASS test/integration.TestConcurrentMigrations (0.09s)
✅ PASS test/integration.TestAdvisoryLockBlocking (0.10s)
No data races detected. The tests now provide robust proof that advisory locks correctly serialize concurrent database operations.
lockType *LockType
startTime time.Time
}
`txid` and `startTime` are set but never read anywhere. If `startTime` is intended for future lock-duration logging, it'd be nice to wire it into `unlock()` now (e.g., log the hold duration). Otherwise these are dead fields and the `txid_current()` query is a wasted round-trip per lock.
Great point! Fixed in latest commit:

Removed:
- txid field (never used)
- SELECT txid_current() query (wasted round-trip)

Added:
- Lock duration logging in unlock() using startTime
- Field constants (FieldLockID, FieldLockType, FieldLockDurationMs)

Now logs lock_duration_ms on every lock release — useful for monitoring migration performance and detecting contention.
logger.WithError(ctx, err).Error("Failed to acquire advisory lock")
return ctx, lockOwnerID, err
}
`Unlock` returns `context.Context` but every caller discards it (`defer Unlock(ctx, lockOwnerID)`). Since the map is mutated in place via `delete()`, the returned context is the exact same object. The return type suggests immutable context semantics but doesn't deliver them.
Simplest fix: drop the return value to match the `defer` usage pattern.
We've dropped the return value from Unlock to match the actual usage pattern and semantics.
}

// Track when the second goroutine acquires the lock
acquired := make(chan bool, 1)
The 100ms sleep here assumes the second goroutine has reached the `pg_advisory_xact_lock` call by that point. In CI with slow testcontainers under load, the goroutine might not have even started the lock call yet — the test would still pass but wouldn't actually be testing blocking behavior.
A more reliable approach: query `pg_locks` to confirm the second connection is actively waiting on the advisory lock before checking the `acquired` channel.
There was a problem hiding this comment.
We've replaced the 100ms sleep with active polling of pg_locks to verify the second goroutine is actually waiting on the advisory lock before proceeding with the test.
case <-time.After(5 * time.Second):
t.Error("Second goroutine did not acquire lock after first was released")
}
}
A couple of edge cases worth covering:

- Context cancellation while waiting — the current impl passes `ctx` to `connection.New(ctx)` but not to the blocking `pg_advisory_xact_lock` call. So context cancellation has no effect on a waiting lock. Even if we don't fix that now, a test documenting this behavior would be useful.
- Migration failure under lock — `TestConcurrentMigrations` only tests the happy path. A test that injects a migration error and verifies the lock is released (so other waiters proceed) would strengthen confidence in the defer cleanup.
We've added both edge case tests to strengthen test coverage.
- TestAdvisoryLockContextCancellation
Tests context cancellation behavior while waiting for an advisory lock.
Key finding: Context cancellation DOES work properly! When a goroutine is blocked waiting on pg_advisory_xact_lock and its context is cancelled, it receives a "pq: canceling statement due to user request"
error and exits gracefully.
Test flow:
- First goroutine acquires the lock
- Second goroutine tries to acquire with a cancellable context
- Verify second goroutine is actively waiting (via pg_locks)
- Cancel the context
- Verify second goroutine exits with cancellation error (not stuck indefinitely)
- TestMigrationFailureUnderLock
Tests lock cleanup when migrations fail, ensuring other waiters can proceed.
Test flow:
- First goroutine acquires lock and runs a failing migration
- Verify the lock is released via defer even though migration failed
- Second goroutine successfully acquires the same lock afterward
- Validates the cleanup path works correctly
Test results:
$ make test-integration TESTFLAGS="-run 'TestAdvisory|TestConcurrentMigrations'"
✅ PASS test/integration.TestAdvisoryLocksConcurrently (0.22s)
✅ PASS test/integration.TestAdvisoryLocksWithTransactions (0.25s)
✅ PASS test/integration.TestConcurrentMigrations (0.04s)
✅ PASS test/integration.TestAdvisoryLockBlocking (0.10s)
✅ PASS test/integration.TestAdvisoryLockContextCancellation (0.00s)
✅ PASS test/integration.TestMigrationFailureUnderLock (0.10s)
DONE 6 tests in 3.263s
These tests now cover both happy and error paths, giving confidence that the lock mechanism is robust even under failure conditions.
Actionable comments posted: 4
♻️ Duplicate comments (1)
test/integration/advisory_locks_test.go (1)
483-492: ⚠️ Potential issue | 🟠 Major

This test never exercises MigrateWithLock's failure cleanup path. Both goroutines manually acquire and release advisory locks, so a regression in db.MigrateWithLock's defer/unlock logic would still pass here. If the goal is to validate migration-failure cleanup, the failing path needs to be driven through MigrateWithLock itself.

Also applies to: 508-513
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/integration/advisory_locks_test.go` around lines 483 - 492, The test currently grabs advisory locks directly using db.NewAdvisoryLockContext/db.Unlock and then calls failingMigration, so it never exercises the failure cleanup in db.MigrateWithLock; change the test to drive the failing path through db.MigrateWithLock instead of manually acquiring locks: invoke db.MigrateWithLock (passing the same migration name/context and a migration function that calls failingMigration(helper.DBFactory.New(ctx))) so that MigrateWithLock's defer/unlock logic runs on error and the test can assert that locks are released; update both locations (the current block and the similar block at lines 508-513) to use db.MigrateWithLock rather than manual lock management.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/db/context.go`:
- Around line 145-150: The code currently calls delete(locks, k) regardless of
unlock outcome, which drops the only handle if lock.unlock(ctx) fails; change
the control flow so the bookkeeping entry in the locks map is removed only when
unlock succeeds — i.e., move or restrict delete(locks, k) to the success branch
(the else after lock.unlock) and keep the map entry on the error path so callers
can retry or cleanup; ensure you reference the lock.unlock call, the locks map,
and the key k when making this change.
In `@test/integration/advisory_locks_test.go`:
- Around line 3-15: Replace the ad-hoc test setup that calls test.NewHelper(t)
and raw testing assertions with the standard integration harness: call
test.RegisterIntegration(t) at the top of the test file and use Gomega's
RegisterTestingT(t) to enable Gomega assertions; update imports to include
"github.com/onsi/gomega" (or use . "github.com/onsi/gomega" if desired) and
change any plain testing asserts to Expect(...) style; locate and update the
top-level test setup and any functions referencing NewHelper, testing.T
assertions, or plain t.Fatalf/t.Errorf so they instead rely on the registered
integration context and Gomega's Expect API.
- Around line 411-417: The test currently treats any non-nil error from
db.NewAdvisoryLockContext as the expected cancellation; update the goroutine to
only send on gotCancelError when the error is a cancellation-type error: check
for context.Canceled or context.DeadlineExceeded (compare to context.Canceled /
context.DeadlineExceeded), and also accept the DB driver cancellation message
(e.g., error text contains "canceling statement due to user request") before
signaling success; leave other errors to fail the test (t.Fatalf or t.Errorf) so
unrelated DB failures don't incorrectly pass. Ensure you reference the existing
call to NewAdvisoryLockContext (ctx2, helper.DBFactory, "cancel-test",
db.Migrations) and the gotCancelError channel when making the conditional.
- Around line 195-198: After acquiring the lock via db.NewAdvisoryLockContext
(ctx, lockOwnerID, err := ...), add an immediate deferred cleanup to release the
transaction-scoped lock so a test failure can’t leak it; e.g. right after the
successful return from db.NewAdvisoryLockContext add a defer that calls the
corresponding release function with the same ctx and lockOwnerID (mirror the
usage pattern used elsewhere for releasing advisory locks) to ensure the primary
lock is always released on test exit; apply the same change to the other
acquisition site used in TestAdvisoryLockBlocking.
---
Duplicate comments:
In `@test/integration/advisory_locks_test.go`:
- Around line 483-492: The test currently grabs advisory locks directly using
db.NewAdvisoryLockContext/db.Unlock and then calls failingMigration, so it never
exercises the failure cleanup in db.MigrateWithLock; change the test to drive
the failing path through db.MigrateWithLock instead of manually acquiring locks:
invoke db.MigrateWithLock (passing the same migration name/context and a
migration function that calls failingMigration(helper.DBFactory.New(ctx))) so
that MigrateWithLock's defer/unlock logic runs on error and the test can assert
that locks are released; update both locations (the current block and the
similar block at lines 508-513) to use db.MigrateWithLock rather than manual
lock management.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 388900b1-3b1d-42a0-9396-a83ccb4a26f6
📒 Files selected for processing (12)
- docs/database.md
- pkg/config/db.go
- pkg/db/advisory_locks.go
- pkg/db/context.go
- pkg/db/db_session/default.go
- pkg/db/db_session/test.go
- pkg/db/db_session/testcontainer.go
- pkg/db/migrations.go
- pkg/db/mocks/session_factory.go
- pkg/db/session.go
- pkg/logger/fields.go
- test/integration/advisory_locks_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/db/advisory_locks.go
🧹 Nitpick comments (2)
test/integration/advisory_locks_test.go (2)
243: Variable errors shadows the imported package.

Naming this slice errors shadows the errors package imported on line 5. While the scope is limited to this function, it could cause confusion or compile errors if someone later adds errors.Is() or errors.As() calls.

Suggested rename:

- errors := make([]error, 0)
+ errs := make([]error, 0)

And update the append on line 257:

- errors = append(errors, err)
+ errs = append(errs, err)

And the assertion on line 267:

- Expect(errors).To(BeEmpty(), "Expected no errors during concurrent migrations")
+ Expect(errs).To(BeEmpty(), "Expected no errors during concurrent migrations")

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/integration/advisory_locks_test.go` at line 243, The local slice named `errors` shadows the imported `errors` package; rename the variable (e.g., `errs` or `gotErrors`) wherever it is declared (the current `errors := make([]error, 0)`), update the subsequent append call that pushes into this slice, and update the final assertion that inspects the slice to use the new name so code can still call `errors.Is`/`errors.As` without conflict. Ensure all references in this test function are renamed consistently.
445-446: Consider using a channel for synchronization instead of sleep.The 100ms sleep assumes the first goroutine will acquire, fail, and release the lock within that window. While the test logic is resilient to ordering variations (both outcomes are valid), using a channel to signal when the first goroutine has acquired the lock would make the test more deterministic and faster.
Optional improvement for deterministic ordering
+ lockAcquired := make(chan struct{})
+
  // First goroutine: acquire lock and fail migration
  wg.Add(1)
  go func() {
      defer wg.Done()
      ctx := context.Background()
      ctx, lockOwnerID, err := db.NewAdvisoryLockContext(ctx, h.DBFactory, "migration-fail-test", db.Migrations)
      Expect(err).NotTo(HaveOccurred(), "Failed to acquire lock")
      defer db.Unlock(ctx, lockOwnerID)
+
+     close(lockAcquired) // Signal that lock is held
+
      // Simulate migration failure
      if err := failingMigration(h.DBFactory.New(ctx)); err != nil {
          mu.Lock()
          failureCount++
          mu.Unlock()
      }
      // Lock should be released via defer even though migration failed
  }()

- // Give first goroutine time to acquire lock and fail
- time.Sleep(100 * time.Millisecond)
+ // Wait for first goroutine to acquire lock before starting second
+ <-lockAcquired

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/integration/advisory_locks_test.go` around lines 445 - 446, Replace the brittle time.Sleep(100 * time.Millisecond) synchronization with a channel signal: create a startedCh (chan struct{}) that the first goroutine closes or sends to immediately after it successfully acquires the advisory lock, and have the main test (or the second goroutine) block on <-startedCh before proceeding; remove the sleep and use this startedCh to guarantee the first goroutine has the lock before continuing.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@test/integration/advisory_locks_test.go`:
- Line 243: The local slice named `errors` shadows the imported `errors`
package; rename the variable (e.g., `errs` or `gotErrors`) wherever it is
declared (the current `errors := make([]error, 0)`), update the subsequent
append call that pushes into this slice, and update the final assertion that
inspects the slice to use the new name so code can still call
`errors.Is`/`errors.As` without conflict. Ensure all references in this test
function are renamed consistently.
- Around line 445-446: Replace the brittle time.Sleep(100 * time.Millisecond)
synchronization with a channel signal: create a startedCh (chan struct{}) that
the first goroutine closes or sends to immediately after it successfully
acquires the advisory lock, and have the main test (or the second goroutine)
block on <-startedCh before proceeding; remove the sleep and use this startedCh
to guarantee the first goroutine has the lock before continuing.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ecee3479-c6bc-4e5c-a1d1-51d3be0e9d2e
📒 Files selected for processing (2)
pkg/db/context.go
test/integration/advisory_locks_test.go
/retest
```go
// It should have 1 lock
g2 := h.DBFactory.New(ctx)
var pgLocks []struct{ Granted bool }
g2.Raw("select granted from pg_locks WHERE locktype = 'advisory' and granted = true").Scan(&pgLocks)
```
Priority: Bug
The `g2.Raw(...).Scan(&pgLocks)` calls on lines 177, 188, 196, 207, 214, and 222 don't check `.Error`. If a query fails, `pgLocks` stays empty and the length assertion could pass or fail for the wrong reason.
The same pattern is already handled correctly in TestAdvisoryLockBlocking (lines 306-307). Apply it consistently:
```diff
-g2.Raw("select granted from pg_locks WHERE locktype = 'advisory' and granted = true").Scan(&pgLocks)
+err = g2.Raw("select granted from pg_locks WHERE locktype = 'advisory' and granted = true").Scan(&pgLocks).Error
+Expect(err).NotTo(HaveOccurred(), "Failed to query pg_locks")
```
pkg/db/mocks/session_factory.go
```go
}

func (m *MockSessionFactory) GetAdvisoryLockTimeout() int {
	return 300 // 5 minutes default
```
GetAdvisoryLockTimeout() returns a hardcoded 300 here, duplicating the default
from NewDatabaseConfig(). The other implementations (Default, Test,
Testcontainer) all read from f.config.AdvisoryLockTimeoutSeconds, so they stay
in sync -- but the mock won't if the default changes.
Consider either:
- Adding a config field to the mock and initializing it from `NewDatabaseConfig()`:
```go
func (m *MockSessionFactory) GetAdvisoryLockTimeout() int {
	return config.NewDatabaseConfig().AdvisoryLockTimeoutSeconds
}
```
- Or extracting a package-level constant (`DefaultAdvisoryLockTimeoutSeconds = 300`) and referencing it in both places.
Done. NewDatabaseConfig() has been added.
```go
	"Maximum open DB connections for this instance",
)

fs.IntVar(
```
Priority: Pattern
The new --db-advisory-lock-timeout flag (line 80) and DB_ADVISORY_LOCK_TIMEOUT
env var binding (line 107) have no unit test coverage. The env var logic
silently ignores invalid values (0, -1, "abc") which could surprise operators.
The repo already has thorough AddFlags/BindEnv tests for LoggingConfig in
logging_test.go. Consider adding equivalent tests for DatabaseConfig covering:
default value, flag override, env var override, flag-over-env precedence, and
invalid env var handling.
We've added comprehensive test coverage for DatabaseConfig in pkg/config/db_test.go following the same patterns established in logging_test.go.
Tests added:
- ✅ Default value: `TestNewDatabaseConfig_Defaults`
  - Validates `AdvisoryLockTimeoutSeconds = 300` (5 minutes)
  - Validates `Debug = false`
- ✅ Flag override: `TestDatabaseConfig_AddFlags`
  - Verifies `--db-advisory-lock-timeout` flag registration
  - Tests flag parsing with custom values (e.g., `--db-advisory-lock-timeout=600`)
- ✅ Env var override: `TestDatabaseConfig_BindEnv` + `TestDatabaseConfig_EnvOverridesDefaults`
  - Tests `DB_ADVISORY_LOCK_TIMEOUT` and `DB_DEBUG` env var binding
  - Validates env vars override defaults when no flag is set
- ✅ Flag-over-env precedence: `TestDatabaseConfig_FlagsOverrideEnv` + `TestDatabaseConfig_PriorityMixed`
  - Verifies flags take priority over env vars
  - Tests mixed scenarios (flag for timeout, env for debug)
- ✅ Invalid env var handling: `TestDatabaseConfig_BindEnv` + `TestDatabaseConfig_InvalidEnvHandling`
  - Tests exactly the cases you mentioned: `0`, `-1`, `"abc"`
  - Documents silent fallback behavior with `t.Logf()`
  - Invalid values keep the default (300 seconds)
Coverage highlights:
- 6 test functions covering all requested scenarios
- 30+ sub-tests using table-driven approach
- Follows the exact syntax/semantic patterns from logging_test.go
- All tests passing ✅
```go
ctx2, cancel := context.WithCancel(context.Background())

// Second goroutine tries to acquire the same lock with cancellable context
go func() {
```
Priority: Improvement
The goroutine at line 359 has no sync.WaitGroup to guarantee it finishes
before the test returns. If gotCancelError is never sent (e.g., the t.Errorf
path fires instead), the test exits after the 2s timeout but the goroutine may
still be blocked inside NewAdvisoryLockContext, leaking the DB transaction.
This is inconsistent with TestConcurrentMigrations and
TestMigrationFailureUnderLock which use sync.WaitGroup. Consider adding one
here too:
```go
var wg sync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	// ... existing goroutine body ...
}()
// ... existing test logic ...
wg.Wait() // ensure goroutine exits before test cleanup
```
Done. The improvement has been added.
Co-authored-by: Rafael Benevides <rafabene@gmail.com>
Actionable comments posted: 2
♻️ Duplicate comments (1)
test/integration/advisory_locks_test.go (1)
177-222: ⚠️ Potential issue | 🟡 Minor

Check the `pg_locks` query errors before asserting on the counts. Lines 177-222 still ignore `.Error` on several `Scan(&pgLocks)` calls. If one of those queries fails, the subsequent `len(pgLocks)` assertion can report the wrong lock state.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/integration/advisory_locks_test.go` around lines 177 - 222, The Raw(...).Scan(&pgLocks) calls ignore the returned error; update each invocation in this test (the Scan calls using g2 and variable pgLocks around NewAdvisoryLockContext/Unlock) to capture the result (e.g., res := g2.Raw(...).Scan(&pgLocks)) and assert res.Error is nil (or Expect(res.Error).NotTo(HaveOccurred()) ) before using len(pgLocks), so any DB query failure fails the test rather than producing an incorrect lock count.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@test/integration/advisory_locks_test.go`:
- Around line 120-123: Replace the non-deterministic random selection (r,
txBeforeLock, txAfterLock) with explicit, deterministic cases: define a small
table or three t.Run subtests named "tx_before_lock", "tx_after_lock", and
"no_tx" and set txBeforeLock/txAfterLock accordingly for each case; then execute
the same test logic for each case so the three transaction-order paths are
reliably and reproducibly exercised (locate the variables r, txBeforeLock,
txAfterLock in advisory_locks_test.go and replace the rand.Intn branch with the
table/subtest loop).
- Around line 423-457: The test currently acquires/releases the advisory lock
directly (using NewAdvisoryLockContext/Unlock) and uses a no-op failingMigration
plus a fixed sleep, so it doesn't exercise the production path; change the test
to call db.MigrateWithLock (passing the failingMigration that returns an error
and optionally blocks until signaled) instead of manually using
NewAdvisoryLockContext/Unlock, so the codepath that must defer/unlock on error
is exercised; coordinate the two goroutines with channels (or WaitGroup) rather
than a fixed sleep so the second goroutine attempts to MigrateWithLock while the
first has the lock and failed, and assert that the lock is released and the
second migration acquires it and proceeds.
---
Duplicate comments:
In `@test/integration/advisory_locks_test.go`:
- Around line 177-222: The Raw(...).Scan(&pgLocks) calls ignore the returned
error; update each invocation in this test (the Scan calls using g2 and variable
pgLocks around NewAdvisoryLockContext/Unlock) to capture the result (e.g., res
:= g2.Raw(...).Scan(&pgLocks)) and assert res.Error is nil (or
Expect(res.Error).NotTo(HaveOccurred()) ) before using len(pgLocks), so any DB
query failure fails the test rather than producing an incorrect lock count.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e5db4fdd-27f1-4e95-adec-e81aee72b173
📒 Files selected for processing (1)
test/integration/advisory_locks_test.go
…dvisory lock tests
/retest
@ldornele: The following tests failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
…ch, remove unused parameters, mark intentionally ignored errors with underscore
/retest
@ldornele: Cannot trigger testing until a trusted user reviews the PR and leaves an DetailsIn response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
/ok-to-test
PR needs rebase. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
Summary
Adds PostgreSQL advisory locks to coordinate database migrations during rolling deployments, preventing race conditions when multiple pods start simultaneously.
Problem
During rolling deployments (e.g., scaling from 0→3 replicas), each pod's init container attempts to run database migrations concurrently. The current implementation using gormigrate lacks coordination for concurrent execution, leading to race conditions between the competing migration runs.
Solution: PostgreSQL Advisory Locks
We chose PostgreSQL advisory locks over other coordination mechanisms for the following reasons:
Why Advisory Locks?
✅ Zero infrastructure overhead
✅ Automatic cleanup
✅ Simple implementation: a single `pg_advisory_xact_lock(id, type)` call
✅ Battle-tested pattern
Why NOT Other Approaches?
❌ Kubernetes Job
❌ Helm Hooks (pre-upgrade/pre-install)
❌ Leader Election
References
Summary by CodeRabbit
New Features
Documentation
Tests
Logging