
HYPERFLEET-618: add PostgreSQL advisory locks for migration coordination#72

Open
ldornele wants to merge 7 commits into openshift-hyperfleet:main from ldornele:HYPERFLEET-618

Conversation


@ldornele ldornele commented Mar 8, 2026

Summary

Adds PostgreSQL advisory locks to coordinate database migrations during rolling deployments, preventing race conditions when multiple pods start simultaneously.

Problem

During rolling deployments (e.g., scaling from 0→3 replicas), each pod's init container attempts to run database migrations concurrently. The current implementation using gormigrate lacks coordination for concurrent execution, leading to:

  • Race conditions between migration attempts
  • Duplicate migration executions causing database errors
  • Failed deployments and pod crashes
  • Inconsistent database state

Solution: PostgreSQL Advisory Locks

We chose PostgreSQL advisory locks over other coordination mechanisms for the following reasons:

Why Advisory Locks?

Zero infrastructure overhead

  • No additional Kubernetes resources (Jobs, ConfigMaps, leader election)
  • No external coordination service required
  • Pure PostgreSQL native functionality

Automatic cleanup

  • Locks automatically released on transaction commit/rollback
  • Pod crashes don't leave stale locks
  • No manual cleanup or timeout management needed

Simple implementation

  • Single pg_advisory_xact_lock(id, type) call
  • Transaction-scoped lock lifecycle
  • Minimal code complexity (~180 lines total)
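The whole lifecycle can be sketched in plain SQL (using PostgreSQL's built-in hashtext to derive the two int keys is an assumption for illustration; the PR's actual key derivation lives in pkg/db/advisory_locks.go):

```sql
BEGIN;
-- Blocks until no other transaction holds the same (id, type) key pair:
SELECT pg_advisory_xact_lock(hashtext('migrations'), hashtext('Migrations'));
-- ... run migrations here ...
COMMIT; -- COMMIT or ROLLBACK releases the advisory lock automatically
```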

Battle-tested pattern

  • Used successfully in uhc-account-manager for years
  • PostgreSQL advisory locks are well-documented and stable
  • Proven solution for this exact use case

Why NOT Other Approaches?

Kubernetes Job

  • Requires separate Job definition and lifecycle management
  • Adds complexity to deployment pipeline (pre-install hooks, job cleanup)
  • Need to handle Job failures, retries, and cleanup
  • Tightly couples migration to deployment orchestration

Helm Hooks (pre-upgrade/pre-install)

  • Only works with Helm deployments (limits flexibility)
  • Hooks run sequentially, slowing down deployments
  • Debugging hook failures is more complex
  • Still needs locking if the hook itself can run from multiple replicas

Leader Election

  • Requires additional RBAC permissions for pods
  • More complex implementation (leader election libraries, health checks)
  • Potential leader churn during network issues
  • Overkill for simple migration coordination

Summary by CodeRabbit

  • New Features

    • Migrations now acquire a cluster-wide advisory lock so only one instance runs at a time.
    • Configurable advisory lock timeout (default 5 minutes) via flag/env.
  • Documentation

    • Added Migration Coordination guide describing locking behavior, timeout, and testing.
  • Tests

    • New integration tests for concurrent migrations, lock blocking, cancellation, transactional behavior, and lock ownership.
  • Logging

    • Added lock-related structured log fields (lock id, type, duration).

@openshift-ci openshift-ci bot requested review from aredenba-rh and vkareh March 8, 2026 00:37

openshift-ci bot commented Mar 8, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign tirthct for approval. For more information see the Code Review Process.

The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment


openshift-ci bot commented Mar 8, 2026

Hi @ldornele. Thanks for your PR.

I'm waiting for an openshift-hyperfleet member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work.

Tip

We noticed you've done this a few times! Consider joining the org to skip this step and gain /lgtm and other bot rights. We recommend asking approvers on your previous PRs to sponsor you.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.


coderabbitai bot commented Mar 8, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds PostgreSQL advisory lock support and integrates it into migrations. Introduces LockType and AdvisoryLock types, context-scoped lock management via NewAdvisoryLockContext and Unlock, and a public MigrateWithLock(ctx, factory) that acquires/releases an advisory lock around migrations. Adds advisory lock timeout configuration and accessors on session factories, new logging fields for lock metadata, integration tests exercising concurrent locking and migrations, documentation describing the coordination approach, and updates the migrate command to use MigrateWithLock (removing the previous final success log).

Sequence Diagram(s)

sequenceDiagram
    participant Pod as Hyperfleet Pod
    participant Ctx as Context Manager
    participant Lock as Advisory Lock
    participant TX as DB Transaction
    participant Migration as Migration Engine
    participant PG as PostgreSQL

    Pod->>Ctx: call db.MigrateWithLock(ctx, factory)
    Ctx->>Lock: NewAdvisoryLockContext(ctx, factory, id, Migrations)
    Lock->>TX: open session & BEGIN (txid_current())
    Lock->>PG: pg_advisory_xact_lock(hash(id), hash(Migrations)) (blocking)
    PG-->>Lock: lock acquired (or callers wait)
    Lock-->>Ctx: return locked context + lockOwnerID
    Ctx->>Migration: run migrations using locked session
    Migration->>PG: execute migration statements
    PG-->>Migration: statements complete
    Ctx->>Lock: Unlock(ctx, lockOwnerID)
    Lock->>TX: COMMIT (releases advisory lock)
    TX->>PG: COMMIT
    Lock-->>Ctx: lock released
    Ctx-->>Pod: return success / error

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): docstring coverage is 78.95%, below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.

✅ Passed checks (2 passed)

  • Description Check (✅ Passed): check skipped; CodeRabbit’s high-level summary is enabled.
  • Title check (✅ Passed): the title clearly and specifically describes the main change: adding PostgreSQL advisory locks for migration coordination.


@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (3)
pkg/db/context.go (1)

118-149: Consider documenting the context usage pattern.

The advisoryLockMap is shared by reference when stored in context. The implementation is safe when each goroutine starts with its own context.Background() (as the tests demonstrate), but could lead to subtle map race conditions if callers share a single context across goroutines that concurrently call Unlock.

Consider adding a doc comment clarifying that each concurrent operation should derive from a fresh context.

test/integration/advisory_locks_test.go (1)

288-297: Minor: Consider using error channel instead of t.Errorf in goroutine.

Calling t.Errorf from a goroutine can be fragile if the test function exits before the goroutine completes. While this test correctly synchronizes via channels, using an error channel would make the pattern more robust.

♻️ Alternative pattern using error channel
 	// Track when the second goroutine acquires the lock
 	acquired := make(chan bool, 1)
 	released := make(chan bool, 1)
+	errChan := make(chan error, 1)

 	// Second goroutine tries to acquire the same lock
 	go func() {
 		ctx2, lockOwnerID2, err := db.NewAdvisoryLockContext(context.Background(), helper.DBFactory, "blocking-test", db.Migrations)
 		if err != nil {
-			t.Errorf("Failed to acquire second lock: %v", err)
+			errChan <- err
 			return
 		}
 		defer db.Unlock(ctx2, lockOwnerID2)

+		errChan <- nil
 		acquired <- true
 		<-released // Wait for signal to release
 	}()

Then check errChan after the test completes to report any errors.

pkg/db/advisory_locks.go (1)

56-66: Consider returning nil on error for defensive clarity.

Returning a non-nil AdvisoryLock alongside an error is an unusual pattern. While the caller (context.go) correctly checks the error before using the lock, this could lead to accidental misuse if future callers forget to check.

♻️ Proposed fix
 	var txid struct{ ID int64 }
 	err := tx.Raw("select txid_current() as id").Scan(&txid).Error
+	if err != nil {
+		_ = tx.Rollback() // Clean up on error
+		return nil, err
+	}
 
 	return &AdvisoryLock{
 		txid:      txid.ID,
 		ownerUUID: ownerUUID,
 		id:        id,
 		lockType:  locktype,
 		g2:        tx,
 		startTime: time.Now(),
 	}, err
 }

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a77cb951-fff5-462c-b81b-486f29aee60b

📥 Commits

Reviewing files that changed from the base of the PR and between 97284d9 and d0aa71f.

📒 Files selected for processing (6)
  • cmd/hyperfleet-api/migrate/cmd.go
  • docs/database.md
  • pkg/db/advisory_locks.go
  • pkg/db/context.go
  • pkg/db/migrations.go
  • test/integration/advisory_locks_test.go

locks = make(advisoryLockMap)
}

lock, err := newAdvisoryLock(ctx, connection, &lockOwnerID, &id, &lockType)
Contributor

Priority: Bug

The two error paths in NewAdvisoryLockContext (lines 97-100 and 104-107) leak the GORM
transaction created inside newAdvisoryLock. When either newAdvisoryLock or lock.lock() fails,
lock.g2 holds an open transaction that is never committed or rolled back.

The cleanest fix is to handle it inside newAdvisoryLock itself — roll back the transaction if
txid_current() fails:

  var txid struct{ ID int64 }
  err := tx.Raw("select txid_current() as id").Scan(&txid).Error
  if err != nil {
      tx.Rollback()
      return nil, err
  }

  return &AdvisoryLock{
      txid:      txid.ID,
      ownerUUID: ownerUUID,
      id:        id,
      lockType:  locktype,
      g2:        tx,
      startTime: time.Now(),
  }, nil

And in NewAdvisoryLockContext, roll back if lock.lock() fails:

err = lock.lock()
if err != nil {
    logger.WithError(ctx, err).Error("Failed to acquire advisory lock")
    lock.g2.Rollback() // clean up the open transaction
    return ctx, lockOwnerID, err
}

Author

Done!

logger.With(ctx, "lock_id", lock.id).Warn("lockOwnerID could not be found in AdvisoryLock")
} else if *lock.ownerUUID == callerUUID {
lockID := "<missing>"
lockType := *lock.lockType
Contributor

Priority: Bug

Line 131 dereferences lock.lockType without a nil check, but both lock.id (line 132) and
lock.ownerUUID (line 127) have nil guards. Should be consistent — if these fields are
pointers, guard them all:

  lockID := "<missing>"
  lockTypeStr := LockType("<missing>")
  if lock.lockType != nil {
      lockTypeStr = *lock.lockType
  }
  if lock.id != nil {
      lockID = *lock.id
  }

Minor, but a panic in Unlock during cleanup would be hard to debug in production.

Author

Done!

// MigrateWithLock runs migrations with an advisory lock to prevent concurrent migrations
func MigrateWithLock(ctx context.Context, factory SessionFactory) error {
// Acquire advisory lock for migrations
ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, "migrations", Migrations)
Contributor

Priority: Pattern

The lock ID "migrations" on line 30 is a bare string, while the lock type already uses a
constant (Migrations). Since advisory_locks.go already has a constants block for lock types,
consider adding the lock ID there too:

  const (
      // Migrations lock type for database migrations
      Migrations LockType = "Migrations"

      // MigrationsLockID is the advisory lock ID used for migration coordination
      MigrationsLockID = "migrations"
  )

Then in migrations.go:

Suggested change
- ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, "migrations", Migrations)
+ ctx, lockOwnerID, err := NewAdvisoryLockContext(ctx, factory, MigrationsLockID, Migrations)

Author

Done!

if l.lockType == nil {
return errors.New("AdvisoryLock: lockType is missing")
}

Contributor

`pg_advisory_xact_lock` blocks indefinitely — if a pod hangs holding the migration lock (stuck migration, network partition), every other pod's init container blocks forever waiting.

The Helm chart doesn't set `activeDeadlineSeconds` on the pod spec either, so there's no Kubernetes-level safety net.

A couple of options:

  1. Use `pg_try_advisory_xact_lock()` in a retry loop with a configurable timeout
  2. `SET LOCAL statement_timeout = '300s'` on the lock session before acquiring
  3. At minimum, add `activeDeadlineSeconds` to the pod spec as a backstop

I'd lean toward option 2 — it's a one-liner and gives a clean error if the lock can't be acquired within 5 minutes.

Author

We've implemented option 2 with an enhancement.

We added SET LOCAL statement_timeout before acquiring the lock (see pkg/db/advisory_locks.go:87-90). This prevents indefinite blocking with a clean timeout error.

Additionally, we made the timeout configurable:

  • Default: 300 seconds (5 minutes)
  • Command-line flag: --db-advisory-lock-timeout
  • Environment variable: DB_ADVISORY_LOCK_TIMEOUT
  • Config field: DatabaseConfig.AdvisoryLockTimeoutSeconds

Regarding timeout strategy:

I noticed uhc-account-manager uses a different approach — they set driver-level timeouts via pq-timeouts parameter in the connection string, which applies a global timeout to all database operations (SELECT,
INSERT, UPDATE, advisory locks, etc.).

Our current implementation uses statement-level timeout (SET LOCAL), which only protects the migration lock acquisition. Regular database queries (reads/writes) have no timeout protection.

Question: Should we create a follow-up issue to align with uhc-account-manager's strategy and implement driver-level timeouts for all database operations? This would provide timeout protection across the board, not just for migrations.

The trade-off:

  • Current approach: Simple, no dependencies, targets only migration locks
  • uhc-account-manager approach: Broader protection, but requires pq-timeouts driver support and affects all queries

locks = make(advisoryLockMap)
}

lock, err := newAdvisoryLock(ctx, connection, &lockOwnerID, &id, &lockType)
Contributor

`advisoryLockMap` is a plain Go map stored via `context.WithValue`. The map is retrieved and mutated in place here — since maps are reference types, this mutation is visible to anything holding the same context. Go maps aren't safe for concurrent access and the runtime will panic if two goroutines hit the same map.

Current usage is fine (each init container starts from `context.Background()`), but since this is an exported API, a future caller sharing contexts across goroutines could hit a data race.

Worth either adding a `sync.Mutex` to the map wrapper or a doc comment on `NewAdvisoryLockContext` noting that the returned context must not be shared across goroutines.

Author

Excellent point! You're right that this could race if contexts are shared across goroutines.

This PR: Added a doc comment warning about concurrent usage (see NewAdvisoryLockContext). Current usage (init containers) is safe since each starts from isolated context.Background().

Follow-up: I'll create a separate Jira issue to harden this with sync.RWMutex + race detector tests. Keeping this PR focused on the migration coordination feature makes sense IMO, but happy to add the mutex here if you prefer.

WDYT?
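A minimal sketch of the mutex-guarded wrapper such a follow-up would add (the type and method names here are hypothetical, not the PR's):

```go
package main

import (
	"fmt"
	"sync"
)

// lockMap is a hypothetical mutex-guarded replacement for the plain
// advisoryLockMap stored in the context; names are illustrative only.
type lockMap struct {
	mu sync.Mutex
	m  map[string]string // ownerUUID -> lock id (simplified)
}

func (l *lockMap) add(owner, id string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.m[owner] = id
}

func (l *lockMap) remove(owner string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.m, owner)
}

func main() {
	lm := &lockMap{m: make(map[string]string)}
	var wg sync.WaitGroup
	// Safe even when many goroutines share the same map, unlike a bare map.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			owner := fmt.Sprintf("owner-%d", n)
			lm.add(owner, "migrations")
			lm.remove(owner)
		}(i)
	}
	wg.Wait()
	fmt.Println(len(lm.m)) // 0
}
```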

go acquireLock(helper, &total, &waiter)
}

// Wait for all goroutines to complete
Contributor

This test reads and writes `*total` from multiple goroutines without any Go-level synchronization. The advisory lock serializes execution at the DB level, but Go's memory model still requires explicit synchronization (mutex, atomic, channel) for visibility guarantees across goroutines. Running with `-race` would flag this.

Two options:

  1. Add a `sync.Mutex` around the `*total` read/write (quick fix, still proves DB-level serialization)
  2. Move the "work" into the database — read a row, sleep, update the row, verify final value. This would be a true end-to-end proof of lock serialization.

Author

We've implemented option 2 — moving the work into the database.

Changes made:

Both TestAdvisoryLocksConcurrently and TestAdvisoryLocksWithTransactions now:

  1. Create a temporary counter table with a single row initialized to 0
  2. Each goroutine acquires the advisory lock, reads the current value from the DB, sleeps, then increments and writes back
  3. Verify the final counter value equals the expected total
  4. Clean up the table with defer DROP TABLE

Benefits:

  • ✅ Race detector clean: Tests pass with -race flag (verified)
  • ✅ True end-to-end validation: Actually proves the advisory lock serializes database operations, not just Go memory
  • ✅ More realistic: Mirrors the actual migration use case where the lock protects database state, not in-memory variables
  • ✅ Eliminates Go-level synchronization concerns: No mutex/atomic needed since all shared state is in the database

Test results:
$ make test-integration TESTFLAGS="-race -run 'TestAdvisory|TestConcurrentMigrations'"
✅ PASS test/integration.TestAdvisoryLocksConcurrently (0.27s)
✅ PASS test/integration.TestAdvisoryLocksWithTransactions (0.31s)
✅ PASS test/integration.TestConcurrentMigrations (0.09s)
✅ PASS test/integration.TestAdvisoryLockBlocking (0.10s)

No data races detected. The tests now provide robust proof that advisory locks correctly serialize concurrent database operations.

lockType *LockType
startTime time.Time
}

Contributor

`txid` and `startTime` are set but never read anywhere. If `startTime` is intended for future lock-duration logging, it'd be nice to wire it into `unlock()` now (e.g., log the hold duration). Otherwise these are dead fields and the `txid_current()` query is a wasted round-trip per lock.

Author

Great point! Fixed in latest commit:

Removed:

  • txid field (never used)
  • SELECT txid_current() query (wasted round-trip)

Added:

  • Lock duration logging in unlock() using startTime
  • Field constants (FieldLockID, FieldLockType, FieldLockDurationMs)

Now logs lock_duration_ms on every lock release — useful for monitoring migration performance and detecting contention.

logger.WithError(ctx, err).Error("Failed to acquire advisory lock")
return ctx, lockOwnerID, err
}

Contributor

`Unlock` returns `context.Context` but every caller discards it (`defer Unlock(ctx, lockOwnerID)`). Since the map is mutated in place via `delete()`, the returned context is the exact same object. The return type suggests immutable context semantics but doesn't deliver them.

Simplest fix: drop the return value to match the `defer` usage pattern.

Author

We've dropped the return value from Unlock to match the actual usage pattern and semantics.

}

// Track when the second goroutine acquires the lock
acquired := make(chan bool, 1)
Contributor

The 100ms sleep here assumes the second goroutine has reached the `pg_advisory_xact_lock` call by that point. In CI with slow testcontainers under load, the goroutine might not have even started the lock call yet — the test would still pass but wouldn't actually be testing blocking behavior.

A more reliable approach: query `pg_locks` to confirm the second connection is actively waiting on the advisory lock before checking the `acquired` channel.

Author

We've replaced the 100ms sleep with active polling of pg_locks to verify the second goroutine is actually waiting on the advisory lock before proceeding with the test.
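The poll can be sketched as a query like this (the columns are from PostgreSQL's standard pg_locks view; the test's actual query may differ):

```sql
-- True once some backend is queued waiting on an ungranted advisory lock:
SELECT EXISTS (
    SELECT 1
    FROM pg_locks
    WHERE locktype = 'advisory'
      AND NOT granted
);
```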

case <-time.After(5 * time.Second):
t.Error("Second goroutine did not acquire lock after first was released")
}
}
Contributor

A couple of edge cases worth covering:

  1. Context cancellation while waiting — the current impl passes `ctx` to `connection.New(ctx)` but not to the blocking `pg_advisory_xact_lock` call. So context cancellation has no effect on a waiting lock. Even if we don't fix that now, a test documenting this behavior would be useful.

  2. Migration failure under lock — `TestConcurrentMigrations` only tests the happy path. A test that injects a migration error and verifies the lock is released (so other waiters proceed) would strengthen confidence in the defer cleanup.

Author

We've added both edge case tests to strengthen test coverage.

  1. TestAdvisoryLockContextCancellation

Tests context cancellation behavior while waiting for an advisory lock.

Key finding: Context cancellation DOES work properly! When a goroutine is blocked waiting on pg_advisory_xact_lock and its context is cancelled, it receives a "pq: canceling statement due to user request"
error and exits gracefully.

Test flow:

  • First goroutine acquires the lock
  • Second goroutine tries to acquire with a cancellable context
  • Verify second goroutine is actively waiting (via pg_locks)
  • Cancel the context
  • Verify second goroutine exits with cancellation error (not stuck indefinitely)
  2. TestMigrationFailureUnderLock

Tests lock cleanup when migrations fail, ensuring other waiters can proceed.

Test flow:

  • First goroutine acquires lock and runs a failing migration
  • Verify the lock is released via defer even though migration failed
  • Second goroutine successfully acquires the same lock afterward
  • Validates the cleanup path works correctly

Test results:
$ make test-integration TESTFLAGS="-run 'TestAdvisory|TestConcurrentMigrations'"
✅ PASS test/integration.TestAdvisoryLocksConcurrently (0.22s)
✅ PASS test/integration.TestAdvisoryLocksWithTransactions (0.25s)
✅ PASS test/integration.TestConcurrentMigrations (0.04s)
✅ PASS test/integration.TestAdvisoryLockBlocking (0.10s)
✅ PASS test/integration.TestAdvisoryLockContextCancellation (0.00s)
✅ PASS test/integration.TestMigrationFailureUnderLock (0.10s)

DONE 6 tests in 3.263s

These tests now cover both happy and error paths, giving confidence that the lock mechanism is robust even under failure conditions.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

♻️ Duplicate comments (1)
test/integration/advisory_locks_test.go (1)

483-492: ⚠️ Potential issue | 🟠 Major

This test never exercises MigrateWithLock’s failure cleanup path.

Both goroutines manually acquire and release advisory locks, so a regression in db.MigrateWithLock’s defer/unlock logic would still pass here. If the goal is to validate migration-failure cleanup, the failing path needs to be driven through MigrateWithLock itself.

Also applies to: 508-513

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/db/context.go`:
- Around line 145-150: The code currently calls delete(locks, k) regardless of
unlock outcome, which drops the only handle if lock.unlock(ctx) fails; change
the control flow so the bookkeeping entry in the locks map is removed only when
unlock succeeds — i.e., move or restrict delete(locks, k) to the success branch
(the else after lock.unlock) and keep the map entry on the error path so callers
can retry or cleanup; ensure you reference the lock.unlock call, the locks map,
and the key k when making this change.

In `@test/integration/advisory_locks_test.go`:
- Around line 3-15: Replace the ad-hoc test setup that calls test.NewHelper(t)
and raw testing assertions with the standard integration harness: call
test.RegisterIntegration(t) at the top of the test file and use Gomega's
RegisterTestingT(t) to enable Gomega assertions; update imports to include
"github.com/onsi/gomega" (or use . "github.com/onsi/gomega" if desired) and
change any plain testing asserts to Expect(...) style; locate and update the
top-level test setup and any functions referencing NewHelper, testing.T
assertions, or plain t.Fatalf/t.Errorf so they instead rely on the registered
integration context and Gomega's Expect API.
- Around line 411-417: The test currently treats any non-nil error from
db.NewAdvisoryLockContext as the expected cancellation; update the goroutine to
only send on gotCancelError when the error is a cancellation-type error: check
for context.Canceled or context.DeadlineExceeded (compare to context.Canceled /
context.DeadlineExceeded), and also accept the DB driver cancellation message
(e.g., error text contains "canceling statement due to user request") before
signaling success; leave other errors to fail the test (t.Fatalf or t.Errorf) so
unrelated DB failures don't incorrectly pass. Ensure you reference the existing
call to NewAdvisoryLockContext (ctx2, helper.DBFactory, "cancel-test",
db.Migrations) and the gotCancelError channel when making the conditional.
- Around line 195-198: After acquiring the lock via db.NewAdvisoryLockContext
(ctx, lockOwnerID, err := ...), add an immediate deferred cleanup to release the
transaction-scoped lock so a test failure can’t leak it; e.g. right after the
successful return from db.NewAdvisoryLockContext add a defer that calls the
corresponding release function with the same ctx and lockOwnerID (mirror the
usage pattern used elsewhere for releasing advisory locks) to ensure the primary
lock is always released on test exit; apply the same change to the other
acquisition site used in TestAdvisoryLockBlocking.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 388900b1-3b1d-42a0-9396-a83ccb4a26f6

📥 Commits

Reviewing files that changed from the base of the PR and between d0aa71f and eda930a.

📒 Files selected for processing (12)
  • docs/database.md
  • pkg/config/db.go
  • pkg/db/advisory_locks.go
  • pkg/db/context.go
  • pkg/db/db_session/default.go
  • pkg/db/db_session/test.go
  • pkg/db/db_session/testcontainer.go
  • pkg/db/migrations.go
  • pkg/db/mocks/session_factory.go
  • pkg/db/session.go
  • pkg/logger/fields.go
  • test/integration/advisory_locks_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • pkg/db/advisory_locks.go

@coderabbitai coderabbitai bot left a comment
🧹 Nitpick comments (2)
test/integration/advisory_locks_test.go (2)

243-243: Variable errors shadows the imported package.

Naming this slice errors shadows the errors package imported on line 5. While the scope is limited to this function, it could cause confusion or compile errors if someone later adds errors.Is() or errors.As() calls.

Suggested rename
-	errors := make([]error, 0)
+	errs := make([]error, 0)

And update the append on line 257:

-			errors = append(errors, err)
+			errs = append(errs, err)

And the assertion on line 267:

-	Expect(errors).To(BeEmpty(), "Expected no errors during concurrent migrations")
+	Expect(errs).To(BeEmpty(), "Expected no errors during concurrent migrations")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/integration/advisory_locks_test.go` at line 243, The local slice named
`errors` shadows the imported `errors` package; rename the variable (e.g.,
`errs` or `gotErrors`) wherever it is declared (the current `errors :=
make([]error, 0)`), update the subsequent append call that pushes into this
slice, and update the final assertion that inspects the slice to use the new
name so code can still call `errors.Is`/`errors.As` without conflict. Ensure all
references in this test function are renamed consistently.

445-446: Consider using a channel for synchronization instead of sleep.

The 100ms sleep assumes the first goroutine will acquire, fail, and release the lock within that window. While the test logic is resilient to ordering variations (both outcomes are valid), using a channel to signal when the first goroutine has acquired the lock would make the test more deterministic and faster.

Optional improvement for deterministic ordering
+	lockAcquired := make(chan struct{})
+
 	// First goroutine: acquire lock and fail migration
 	wg.Add(1)
 	go func() {
 		defer wg.Done()

 		ctx := context.Background()
 		ctx, lockOwnerID, err := db.NewAdvisoryLockContext(ctx, h.DBFactory, "migration-fail-test", db.Migrations)
 		Expect(err).NotTo(HaveOccurred(), "Failed to acquire lock")
 		defer db.Unlock(ctx, lockOwnerID)

+		close(lockAcquired) // Signal that lock is held
+
 		// Simulate migration failure
 		if err := failingMigration(h.DBFactory.New(ctx)); err != nil {
 			mu.Lock()
 			failureCount++
 			mu.Unlock()
 		}
 		// Lock should be released via defer even though migration failed
 	}()

-	// Give first goroutine time to acquire lock and fail
-	time.Sleep(100 * time.Millisecond)
+	// Wait for first goroutine to acquire lock before starting second
+	<-lockAcquired
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/integration/advisory_locks_test.go` around lines 445 - 446, Replace the
brittle time.Sleep(100 * time.Millisecond) synchronization with a channel
signal: create a startedCh (chan struct{}) that the first goroutine closes or
sends to immediately after it successfully acquires the advisory lock, and have
the main test (or the second goroutine) block on <-startedCh before proceeding;
remove the sleep and use this startedCh to guarantee the first goroutine has the
lock before continuing.

ℹ️ Review info

Run ID: ecee3479-c6bc-4e5c-a1d1-51d3be0e9d2e

📥 Commits

Reviewing files that changed from the base of the PR and between eda930a and 64c865a.

📒 Files selected for processing (2)
  • pkg/db/context.go
  • test/integration/advisory_locks_test.go

@rafabene
Contributor

/retest

// It should have 1 lock
g2 := h.DBFactory.New(ctx)
var pgLocks []struct{ Granted bool }
g2.Raw("select granted from pg_locks WHERE locktype = 'advisory' and granted = true").Scan(&pgLocks)
Contributor

Priority: Bug

The g2.Raw(...).Scan(&pgLocks) calls on lines 177, 188, 196, 207, 214, and 222
don't check .Error. If the query fails, pgLocks stays empty and the length
assertion could pass or fail for the wrong reason.

The same pattern is already done correctly in TestAdvisoryLockBlocking (line
306-307). Apply it consistently:

Suggested change
g2.Raw("select granted from pg_locks WHERE locktype = 'advisory' and granted = true").Scan(&pgLocks)
err = g2.Raw("select granted from pg_locks WHERE locktype = 'advisory' and granted = true").Scan(&pgLocks).Error
Expect(err).NotTo(HaveOccurred(), "Failed to query pg_locks")

Author

Done

}

func (m *MockSessionFactory) GetAdvisoryLockTimeout() int {
return 300 // 5 minutes default
Contributor

GetAdvisoryLockTimeout() returns a hardcoded 300 here, duplicating the default
from NewDatabaseConfig(). The other implementations (Default, Test,
Testcontainer) all read from f.config.AdvisoryLockTimeoutSeconds, so they stay
in sync -- but the mock won't if the default changes.

Consider either:

  1. Adding a config field to the mock and initializing it from
     NewDatabaseConfig():

     func (m *MockSessionFactory) GetAdvisoryLockTimeout() int {
         return config.NewDatabaseConfig().AdvisoryLockTimeoutSeconds
     }

  2. Or extracting a package-level constant (DefaultAdvisoryLockTimeoutSeconds = 300) and referencing it in both places.

Author

Done. NewDatabaseConfig() has been added.

"Maximum open DB connections for this instance",
)

fs.IntVar(
Contributor

Priority: Pattern

The new --db-advisory-lock-timeout flag (line 80) and DB_ADVISORY_LOCK_TIMEOUT
env var binding (line 107) have no unit test coverage. The env var logic
silently ignores invalid values (0, -1, "abc") which could surprise operators.

The repo already has thorough AddFlags/BindEnv tests for LoggingConfig in
logging_test.go. Consider adding equivalent tests for DatabaseConfig covering:
default value, flag override, env var override, flag-over-env precedence, and
invalid env var handling.

Author

We've added comprehensive test coverage for DatabaseConfig in pkg/config/db_test.go following the same patterns established in logging_test.go.

Tests added:

  1. ✅ Default value - TestNewDatabaseConfig_Defaults
    - Validates AdvisoryLockTimeoutSeconds = 300 (5 minutes)
    - Validates Debug = false
  2. ✅ Flag override - TestDatabaseConfig_AddFlags
    - Verifies --db-advisory-lock-timeout flag registration
    - Tests flag parsing with custom values (e.g., --db-advisory-lock-timeout=600)
  3. ✅ Env var override - TestDatabaseConfig_BindEnv + TestDatabaseConfig_EnvOverridesDefaults
    - Tests DB_ADVISORY_LOCK_TIMEOUT and DB_DEBUG env var binding
    - Validates env vars override defaults when no flag is set
  4. ✅ Flag-over-env precedence - TestDatabaseConfig_FlagsOverrideEnv + TestDatabaseConfig_PriorityMixed
    - Verifies flags take priority over env vars
    - Tests mixed scenarios (flag for timeout, env for debug)
  5. ✅ Invalid env var handling - TestDatabaseConfig_BindEnv + TestDatabaseConfig_InvalidEnvHandling
    - Tests exactly the cases you mentioned: 0, -1, "abc"
    - Documents silent fallback behavior with t.Logf()
    - Invalid values keep the default (300 seconds)

Coverage highlights:

  • 6 test functions covering all requested scenarios
  • 30+ sub-tests using table-driven approach
  • Follows the exact syntax/semantic patterns from logging_test.go
  • All tests passing ✅

ctx2, cancel := context.WithCancel(context.Background())

// Second goroutine tries to acquire the same lock with cancellable context
go func() {
Contributor

Priority: Improvement

The goroutine at line 359 has no sync.WaitGroup to guarantee it finishes
before the test returns. If gotCancelError is never sent (e.g., the t.Errorf
path fires instead), the test exits after the 2s timeout but the goroutine may
still be blocked inside NewAdvisoryLockContext, leaking the DB transaction.

This is inconsistent with TestConcurrentMigrations and
TestMigrationFailureUnderLock which use sync.WaitGroup. Consider adding one
here too:

  var wg sync.WaitGroup
  wg.Add(1)
  go func() {
      defer wg.Done()
      // ... existing goroutine body ...
  }()
  // ... existing test logic ...
  wg.Wait() // ensure goroutine exits before test cleanup

Author

Done. The improvement has been added.

Co-authored-by: Rafael Benevides <rafabene@gmail.com>
@coderabbitai coderabbitai bot left a comment
Actionable comments posted: 2

♻️ Duplicate comments (1)
test/integration/advisory_locks_test.go (1)

177-222: ⚠️ Potential issue | 🟡 Minor

Check the pg_locks query errors before asserting on the counts.

Lines 177-222 still ignore .Error on several Scan(&pgLocks) calls. If one of those queries fails, the subsequent len(pgLocks) assertion can report the wrong lock state.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/integration/advisory_locks_test.go` around lines 177 - 222, The
Raw(...).Scan(&pgLocks) calls ignore the returned error; update each invocation
in this test (the Scan calls using g2 and variable pgLocks around
NewAdvisoryLockContext/Unlock) to capture the result (e.g., res :=
g2.Raw(...).Scan(&pgLocks)) and assert res.Error is nil (or
Expect(res.Error).NotTo(HaveOccurred()) ) before using len(pgLocks), so any DB
query failure fails the test rather than producing an incorrect lock count.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@test/integration/advisory_locks_test.go`:
- Around line 120-123: Replace the non-deterministic random selection (r,
txBeforeLock, txAfterLock) with explicit, deterministic cases: define a small
table or three t.Run subtests named "tx_before_lock", "tx_after_lock", and
"no_tx" and set txBeforeLock/txAfterLock accordingly for each case; then execute
the same test logic for each case so the three transaction-order paths are
reliably and reproducibly exercised (locate the variables r, txBeforeLock,
txAfterLock in advisory_locks_test.go and replace the rand.Intn branch with the
table/subtest loop).
- Around line 423-457: The test currently acquires/releases the advisory lock
directly (using NewAdvisoryLockContext/Unlock) and uses a no-op failingMigration
plus a fixed sleep, so it doesn't exercise the production path; change the test
to call db.MigrateWithLock (passing the failingMigration that returns an error
and optionally blocks until signaled) instead of manually using
NewAdvisoryLockContext/Unlock, so the codepath that must defer/unlock on error
is exercised; coordinate the two goroutines with channels (or WaitGroup) rather
than a fixed sleep so the second goroutine attempts to MigrateWithLock while the
first has the lock and failed, and assert that the lock is released and the
second migration acquires it and proceeds.


ℹ️ Review info

Run ID: e5db4fdd-27f1-4e95-adec-e81aee72b173

📥 Commits

Reviewing files that changed from the base of the PR and between 6b019f1 and 601e80a.

📒 Files selected for processing (1)
  • test/integration/advisory_locks_test.go

@ciaranRoche
Contributor

/retest

@openshift-ci

openshift-ci bot commented Mar 12, 2026

@ldornele: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name | Commit | Details | Required | Rerun command
ci/prow/lint | a951454 | link | true | /test lint
ci/prow/presubmits-integration | a951454 | link | true | /test presubmits-integration

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

…ch, remove unused parameters, mark intentionally ignored errors with underscore
@ldornele
Author

/retest

@openshift-ci

openshift-ci bot commented Mar 12, 2026

@ldornele: Cannot trigger testing until a trusted user reviews the PR and leaves an /ok-to-test message.


@rafabene
Contributor

/ok-to-test

@openshift-merge-robot

PR needs rebase.

