Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql

- Fix `JobCancel` having no effect on running jobs when using a poll-only driver (e.g. `riverdatabasesql`). The `controlActionCancel` event was silently dropped in `fetchAndRunLoop`'s `queueControlCh` handler instead of being forwarded to `maybeCancelJob`. Note: this fix only works within a single process; cross-process cancels in poll-only setups must wait for the next poll cycle. [PR #1245](https://github.com/riverqueue/river/pull/1245).
- Ensure jobs that return a custom timeout of -1 (no timeout) are never rescued. [PR #1288](https://github.com/riverqueue/river/pull/1288).
- Detect numbered PostgreSQL `REINDEX INDEX CONCURRENTLY` artifacts like `_ccnew1` and `_ccold2` so the reindexer does not keep accumulating failed artifact indexes. Fixes [#1296](https://github.com/riverqueue/river/issues/1296). [PR #1297](https://github.com/riverqueue/river/pull/1297).

## [0.39.0] - 2026-06-03

Expand Down
29 changes: 17 additions & 12 deletions internal/maintenance/reindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, err
// exist before trying to reindex. When using `CONCURRENTLY`, Postgres
// creates a new index suffixed with `_ccnew` before swapping it in as the
// new index. The existing index is renamed `_ccold` before being dropped
// concurrently.
// concurrently. If multiple failed artifacts exist, Postgres may add a
// numeric suffix like `_ccnew1` or `_ccold2` to keep names unique.
//
// If one of these artifacts exists, it probably means that a previous
// reindex attempt timed out, and attempting to reindex again is likely
Expand All @@ -234,16 +235,15 @@ func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, err
//
// https://www.postgresql.org/docs/current/sql-reindex.html#SQL-REINDEX-CONCURRENTLY
if !s.skipReindexArtifactCheck {
for _, reindexArtifactName := range []string{indexName + "_ccnew", indexName + "_ccold"} {
reindexArtifactExists, err := s.exec.IndexExists(ctx, &riverdriver.IndexExistsParams{Index: reindexArtifactName, Schema: s.Config.Schema})
if err != nil {
return false, err
}
if reindexArtifactExists {
s.Logger.WarnContext(ctx, s.Name+": Found reindex artifact likely resulting from previous partially completed reindex attempt; skipping reindex",
slog.String("artifact_name", reindexArtifactName), slog.String("index_name", indexName), slog.Duration("timeout", s.Config.Timeout))
return false, nil
}
reindexArtifactNames, err := s.exec.IndexReindexArtifacts(ctx, &riverdriver.IndexReindexArtifactsParams{Index: indexName, Schema: s.Config.Schema})
if err != nil {
return false, err
}

if len(reindexArtifactNames) > 0 {
s.Logger.WarnContext(ctx, s.Name+": Found reindex artifact likely resulting from previous partially completed reindex attempt; skipping reindex",
slog.Any("artifact_names", reindexArtifactNames), slog.String("index_name", indexName), slog.Duration("timeout", s.Config.Timeout))
return false, nil
}
}

Expand All @@ -268,7 +268,12 @@ func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, err

s.Logger.InfoContext(ctx, s.Name+": Signaled to stop during index build; attempting to clean up concurrent artifacts")

for _, reindexArtifactName := range []string{indexName + "_ccnew", indexName + "_ccold"} {
reindexArtifactNames, err := s.exec.IndexReindexArtifacts(ctx, &riverdriver.IndexReindexArtifactsParams{Index: indexName, Schema: s.Config.Schema})
if err != nil {
s.Logger.ErrorContext(ctx, s.Name+": Error listing reindex artifacts", slog.String("error", err.Error()))
}

for _, reindexArtifactName := range reindexArtifactNames {
if err := s.exec.IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{Index: reindexArtifactName, Schema: s.Config.Schema}); err != nil {
s.Logger.ErrorContext(ctx, s.Name+": Error dropping reindex artifact", slog.String("artifact_name", reindexArtifactName), slog.String("error", err.Error()))
}
Expand Down
64 changes: 44 additions & 20 deletions internal/maintenance/reindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ func TestReindexer(t *testing.T) {
schema string
}

setup := func(t *testing.T) (*Reindexer, *testBundle) {
setupWithOpts := func(t *testing.T, testSchemaOpts *riverdbtest.TestSchemaOpts) (*Reindexer, *testBundle) {
t.Helper()

var (
dbPool = riversharedtest.DBPool(ctx, t)
driver = riverpgxv5.New(dbPool)
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
schema = riverdbtest.TestSchema(ctx, t, driver, testSchemaOpts)
)

bundle := &testBundle{
Expand Down Expand Up @@ -100,6 +100,12 @@ func TestReindexer(t *testing.T) {
return svc, bundle
}

setup := func(t *testing.T) (*Reindexer, *testBundle) {
t.Helper()

return setupWithOpts(t, nil)
}

runImmediatelyThenOnceAnHour := func() func(time.Time) time.Time {
alreadyRan := false
return func(t time.Time) time.Time {
Expand Down Expand Up @@ -141,7 +147,13 @@ func TestReindexer(t *testing.T) {
t.Run("ReindexSkippedWithReindexArtifact", func(t *testing.T) {
t.Parallel()

svc, bundle := setup(t)
svc, bundle := setupWithOpts(t, &riverdbtest.TestSchemaOpts{DisableReuse: true})

mockExec := newReindexerExecutorMock(bundle.exec)
mockExec.indexReindexFunc = func(ctx context.Context, params *riverdriver.IndexReindexParams) error {
return nil
}
svc.exec = mockExec

requireReindexOne := func(indexName string) bool {
didReindex, err := svc.reindexOne(ctx, indexName)
Expand All @@ -151,20 +163,20 @@ func TestReindexer(t *testing.T) {

indexName := svc.Config.IndexNames[0]

// With a `_ccnew` index in place, the reindexer refuses to run.
require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s_ccnew ON %s.river_job (id)", indexName, bundle.schema)))
require.False(t, requireReindexOne(indexName))

// With the index dropped again, reindexing can now occur.
require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("DROP INDEX %s.%s_ccnew", bundle.schema, indexName)))
require.True(t, requireReindexOne(indexName))

// `_ccold` also prevents reindexing.
require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s_ccold ON %s.river_job (id)", indexName, bundle.schema)))
require.False(t, requireReindexOne(indexName))
for _, artifactSuffix := range []string{"_ccnew", "_ccnew1", "_ccnew31", "_ccold", "_ccold1"} {
artifactName := indexName + artifactSuffix
require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", artifactName, bundle.schema)))
require.False(t, requireReindexOne(indexName))
require.NoError(t, bundle.exec.IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{Index: artifactName, Schema: bundle.schema}))
}

// And with `_ccold` dropped, reindexing can proceed.
require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("DROP INDEX %s.%s_ccold", bundle.schema, indexName)))
for _, artifactSuffix := range []string{"_ccnewa", "_ccnew_1", "_ccoldx", "_ccold_1", "x_ccnew1"} {
artifactName := indexName + artifactSuffix
require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", artifactName, bundle.schema)))
t.Cleanup(func() {
require.NoError(t, bundle.exec.IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{Index: artifactName, Schema: bundle.schema}))
})
}
require.True(t, requireReindexOne(indexName))
})

Expand Down Expand Up @@ -249,7 +261,7 @@ func TestReindexer(t *testing.T) {
t.Run("ReindexDeletesArtifactsWhenCancelledWithStop", func(t *testing.T) {
t.Parallel()

svc, bundle := setup(t)
svc, bundle := setupWithOpts(t, &riverdbtest.TestSchemaOpts{DisableReuse: true})
svc.skipReindexArtifactCheck = true

requireIndexExists := func(indexName string) bool {
Expand All @@ -259,16 +271,25 @@ func TestReindexer(t *testing.T) {
}

var (
indexName = svc.Config.IndexNames[0]
indexNameNew = indexName + "_ccnew"
indexNameOld = indexName + "_ccold"
indexName = svc.Config.IndexNames[0]
indexNameNew = indexName + "_ccnew"
indexNameNewNumber = indexName + "_ccnew1"
indexNameNonArtifact = indexName + "_ccnewa"
indexNameOld = indexName + "_ccold"
indexNameOldNumber = indexName + "_ccold2"
)

require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameNew, bundle.schema)))
require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameNewNumber, bundle.schema)))
require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameNonArtifact, bundle.schema)))
require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameOld, bundle.schema)))
require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameOldNumber, bundle.schema)))

require.True(t, requireIndexExists(indexNameNew))
require.True(t, requireIndexExists(indexNameNewNumber))
require.True(t, requireIndexExists(indexNameNonArtifact))
require.True(t, requireIndexExists(indexNameOld))
require.True(t, requireIndexExists(indexNameOldNumber))

{
// Pre-cancel context to simulate a reindexer being stopped while
Expand All @@ -286,7 +307,10 @@ func TestReindexer(t *testing.T) {
}

require.False(t, requireIndexExists(indexNameNew))
require.False(t, requireIndexExists(indexNameNewNumber))
require.True(t, requireIndexExists(indexNameNonArtifact))
require.False(t, requireIndexExists(indexNameOld))
require.False(t, requireIndexExists(indexNameOldNumber))
})

t.Run("StopsImmediately", func(t *testing.T) {
Expand Down
78 changes: 41 additions & 37 deletions riverdbtest/riverdbtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ var (
// TestSchemaOpts are options for TestSchema. Most of the time these can be left
// as nil.
type TestSchemaOpts struct {
// DisableReuse specifies that schema will not be checked in for reuse at
// the end of tests. This is desirable in certain like cases like where a
// test case is making modifications to schema.
// DisableReuse specifies that a schema will not be checked out from the
// idle schema pool or checked in for reuse at the end of tests. This is
// desirable in cases where a test case makes modifications to the
// schema.
//
// Not being able to reuse the schema introduces overhead to tests because
// it means more schemas will need to be generated to replace one not
Expand Down Expand Up @@ -320,44 +321,47 @@ func TestSchema[TTx any](ctx context.Context, tb testutil.TestingTB, driver rive
return userFacingSchema
}

// See if there are any idle schemas that were previously generated during
// this run and have since been checked back into the pool. If so, pop it
// off and run cleanup on it. If not, continue on to generating a new schema
// below. This function never blocks, so we'll prefer generating extra
// schemas rather than optimizing amongst a minimal set that's already there.
if schema := func() string {
idleSchemasMu.Lock()
defer idleSchemasMu.Unlock()

linesIdleSchemas := idleSchemas[databaseAndLinesKey]
if !opts.DisableReuse {
// See if there are any idle schemas that were previously generated
// during this run and have since been checked back into the pool. If
// so, pop it off and run cleanup on it. If not, continue on to
// generating a new schema below. This function never blocks, so we'll
// prefer generating extra schemas rather than optimizing amongst a
// minimal set that's already there.
if schema := func() string {
idleSchemasMu.Lock()
defer idleSchemasMu.Unlock()

linesIdleSchemas := idleSchemas[databaseAndLinesKey]

if len(linesIdleSchemas) < 1 {
return ""
}

if len(linesIdleSchemas) < 1 {
return ""
}
schema := linesIdleSchemas[0]
idleSchemas[databaseAndLinesKey] = linesIdleSchemas[1:]
return schema
}(); schema != "" {
// Should be called BEFORE maybeProcurePool. maybeProcurePool may open a
// pool, and in case it does, we want a cleanup in it that closes the pool
// to run before this cleanup hook that checks the test schema back in.
// Cleanup is FILO, so clean up must appear first to run last.
addCleanupHook(schema)

var (
start = time.Now()
userFacingSchema = maybeProcurePool(schema)
)

if len(truncateTables) > 0 {
require.NoError(tb, driver.GetExecutor().TableTruncate(ctx, &riverdriver.TableTruncateParams{Schema: userFacingSchema, Table: truncateTables}))
}

schema := linesIdleSchemas[0]
idleSchemas[databaseAndLinesKey] = linesIdleSchemas[1:]
return schema
}(); schema != "" {
// Should be called BEFORE maybeProcurePool. maybeProcurePool may open a
// pool, and in case it does, we want a cleanup in it that closes the pool
// to run before this cleanup hook that checks the test schema back in.
// Cleanup is FILO, so clean up must appear first to run last.
addCleanupHook(schema)
tb.Logf("Reusing idle %s schema %q [user facing: %q] after cleaning in %s [%d generated] [%d reused]",
driver.DatabaseName(), schema, userFacingSchema, time.Since(start), stats.numGenerated.Load(), stats.numReused.Add(1))

var (
start = time.Now()
userFacingSchema = maybeProcurePool(schema)
)

if len(truncateTables) > 0 {
require.NoError(tb, driver.GetExecutor().TableTruncate(ctx, &riverdriver.TableTruncateParams{Schema: userFacingSchema, Table: truncateTables}))
return userFacingSchema
}

tb.Logf("Reusing idle %s schema %q [user facing: %q] after cleaning in %s [%d generated] [%d reused]",
driver.DatabaseName(), schema, userFacingSchema, time.Since(start), stats.numGenerated.Load(), stats.numReused.Add(1))

return userFacingSchema
}

// e.g. river_2025_04_14t22_13_58_schema_10
Expand Down
16 changes: 16 additions & 0 deletions riverdbtest/riverdbtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,22 @@ func TestTestSchemaReuse(t *testing.T) { //nolint:paralleltest
// they also may not be.
requireIdle(t, schema1)
requireIdle(t, schema2)

var (
previouslyIdleSchemasForDisableReuse = append([]string(nil), idleSchemas[databaseAndLinesKey]...)
schema3 string
)

t.Run("DisableReuse", func(t *testing.T) { //nolint:paralleltest
schema3 = TestSchema(ctx, t, driver, &TestSchemaOpts{DisableReuse: true})
requireNotIdle(t, schema3)
require.NotContains(t, previouslyIdleSchemasForDisableReuse, schema3)
})

requireNotIdle(t, schema3)
for _, schema := range previouslyIdleSchemasForDisableReuse {
requireIdle(t, schema)
}
}

func TestPackageFromFunc(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ type Executor interface {
//
// API is not stable. DO NOT USE.
IndexReindex(ctx context.Context, params *IndexReindexParams) error
IndexReindexArtifacts(ctx context.Context, params *IndexReindexArtifactsParams) ([]string, error)

JobCancel(ctx context.Context, params *JobCancelParams) (*rivertype.JobRow, error)
JobCountByAllStates(ctx context.Context, params *JobCountByAllStatesParams) (map[rivertype.JobState]int, error)
Expand Down Expand Up @@ -863,6 +864,11 @@ type IndexReindexParams struct {
Schema string
}

// IndexReindexArtifactsParams are parameters for Executor.IndexReindexArtifacts,
// which returns a list of index names as strings. The reindexer uses it to find
// leftover `REINDEX INDEX CONCURRENTLY` artifacts for an index before
// reindexing, and to list artifacts to drop after being stopped mid-build.
type IndexReindexArtifactsParams struct {
// Index is the name of the base index whose reindex artifacts are looked
// up (the index itself, not an artifact name).
Index string
// Schema is the schema passed along with the index name; the reindexer
// supplies its configured schema here.
// NOTE(review): presumably an empty value means the default search path,
// as with other params structs — confirm against the driver implementation.
Schema string
}

type Schema struct {
Name string
}
Expand Down
45 changes: 45 additions & 0 deletions riverdriver/riverdatabasesql/internal/dbsqlc/schema.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading