From f5318c6ab8de3de389b17aa333e3ee96622a4420 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Thu, 2 Jul 2026 09:38:19 -0500 Subject: [PATCH] reindex numbered concurrent artifacts Postgres can leave invalid `_ccnew` and `_ccold` indexes after a failed concurrent reindex. When artifacts already existed and Postgres had to append digits to keep names unique, River only checked the unnumbered forms and could keep trying the same reindex, adding more large artifacts. Add a driver-level catalog lookup for artifacts belonging to a configured index. It matches the exact configured index name followed by `_ccnew` or `_ccold` with optional digits, returns the names for logging and cleanup, and lets stop-time cleanup drop the same numbered artifacts. SQLite keeps no-op semantics because it does not create concurrent reindex artifacts. Extend the maintenance and driver conformance coverage for numbered forms and near-miss names so similar user-created indexes are not treated as PostgreSQL artifacts. Make `riverdbtest.TestSchema`'s `DisableReuse` option use a fresh schema and avoid returning it to the idle pool, which keeps schema-mutating tests from leaking ad hoc indexes into later test cases. Document the user-facing fix in the changelog. Fixes #1296. --- CHANGELOG.md | 1 + internal/maintenance/reindexer.go | 29 ++++--- internal/maintenance/reindexer_test.go | 64 ++++++++++----- riverdbtest/riverdbtest.go | 78 ++++++++++--------- riverdbtest/riverdbtest_test.go | 16 ++++ riverdriver/river_driver_interface.go | 6 ++ .../internal/dbsqlc/schema.sql.go | 45 +++++++++++ .../river_database_sql_driver.go | 15 +++- .../riverdrivertest/schema_introspection.go | 57 +++++++++++++- .../riverpgxv5/internal/dbsqlc/schema.sql | 16 ++++ .../riverpgxv5/internal/dbsqlc/schema.sql.go | 42 ++++++++++ riverdriver/riverpgxv5/river_pgx_v5_driver.go | 15 +++- .../riversqlite/river_sqlite_driver.go | 8 +- 13 files changed, 316 insertions(+), 76 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c976cbd..5ee5e58c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql - Fix `JobCancel` having no effect on running jobs when using a poll-only driver (e.g. `riverdatabasesql`). The `controlActionCancel` event was silently dropped in `fetchAndRunLoop`'s `queueControlCh` handler instead of being forwarded to `maybeCancelJob`. Note: this fix only works within a single process; cross-process cancels in poll-only setups must wait for the next poll cycle. [PR #1245](https://github.com/riverqueue/river/pull/1245). - Ensure jobs that return a custom timeout of -1 (no timeout) are never rescued. [PR #1288](https://github.com/riverqueue/river/pull/1288). +- Detect numbered PostgreSQL `REINDEX INDEX CONCURRENTLY` artifacts like `_ccnew1` and `_ccold2` so the reindexer does not keep accumulating failed artifact indexes. Fixes [#1296](https://github.com/riverqueue/river/issues/1296). [PR #1297](https://github.com/riverqueue/river/pull/1297). ## [0.39.0] - 2026-06-03 diff --git a/internal/maintenance/reindexer.go b/internal/maintenance/reindexer.go index dbfdcbe6..cc167164 100644 --- a/internal/maintenance/reindexer.go +++ b/internal/maintenance/reindexer.go @@ -221,7 +221,8 @@ func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, err // exist before trying to reindex. When using `CONCURRENTLY`, Postgres // creates a new index suffixed with `_ccnew` before swapping it in as the // new index. The existing index is renamed `_ccold` before being dropped - // concurrently. + // concurrently. If multiple failed artifacts exist, Postgres may add a + // numeric suffix like `_ccnew1` or `_ccold2` to keep names unique. // // If one of these artifacts exists, it probably means that a previous // reindex attempt timed out, and attempting to reindex again is likely @@ -234,16 +235,15 @@ func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, err // // https://www.postgresql.org/docs/current/sql-reindex.html#SQL-REINDEX-CONCURRENTLY if !s.skipReindexArtifactCheck { - for _, reindexArtifactName := range []string{indexName + "_ccnew", indexName + "_ccold"} { - reindexArtifactExists, err := s.exec.IndexExists(ctx, &riverdriver.IndexExistsParams{Index: reindexArtifactName, Schema: s.Config.Schema}) - if err != nil { - return false, err - } - if reindexArtifactExists { - s.Logger.WarnContext(ctx, s.Name+": Found reindex artifact likely resulting from previous partially completed reindex attempt; skipping reindex", - slog.String("artifact_name", reindexArtifactName), slog.String("index_name", indexName), slog.Duration("timeout", s.Config.Timeout)) - return false, nil - } + reindexArtifactNames, err := s.exec.IndexReindexArtifacts(ctx, &riverdriver.IndexReindexArtifactsParams{Index: indexName, Schema: s.Config.Schema}) + if err != nil { + return false, err + } + + if len(reindexArtifactNames) > 0 { + s.Logger.WarnContext(ctx, s.Name+": Found reindex artifact likely resulting from previous partially completed reindex attempt; skipping reindex", + slog.Any("artifact_names", reindexArtifactNames), slog.String("index_name", indexName), slog.Duration("timeout", s.Config.Timeout)) + return false, nil } } @@ -268,7 +268,12 @@ func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, err s.Logger.InfoContext(ctx, s.Name+": Signaled to stop during index build; attempting to clean up concurrent artifacts") - for _, reindexArtifactName := range []string{indexName + "_ccnew", indexName + "_ccold"} { + reindexArtifactNames, err := s.exec.IndexReindexArtifacts(ctx, &riverdriver.IndexReindexArtifactsParams{Index: indexName, Schema: s.Config.Schema}) + if err != nil { + s.Logger.ErrorContext(ctx, s.Name+": Error listing reindex artifacts", slog.String("error", err.Error())) + } + + for _, reindexArtifactName := range reindexArtifactNames { if err := s.exec.IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{Index: reindexArtifactName, Schema: s.Config.Schema}); err != nil { s.Logger.ErrorContext(ctx, s.Name+": Error dropping reindex artifact", slog.String("artifact_name", reindexArtifactName), slog.String("error", err.Error())) } diff --git a/internal/maintenance/reindexer_test.go b/internal/maintenance/reindexer_test.go index 2d4817a5..3c6f5499 100644 --- a/internal/maintenance/reindexer_test.go +++ b/internal/maintenance/reindexer_test.go @@ -64,13 +64,13 @@ func TestReindexer(t *testing.T) { schema string } - setup := func(t *testing.T) (*Reindexer, *testBundle) { + setupWithOpts := func(t *testing.T, testSchemaOpts *riverdbtest.TestSchemaOpts) (*Reindexer, *testBundle) { t.Helper() var ( dbPool = riversharedtest.DBPool(ctx, t) driver = riverpgxv5.New(dbPool) - schema = riverdbtest.TestSchema(ctx, t, driver, nil) + schema = riverdbtest.TestSchema(ctx, t, driver, testSchemaOpts) ) bundle := &testBundle{ @@ -100,6 +100,12 @@ func TestReindexer(t *testing.T) { return svc, bundle } + setup := func(t *testing.T) (*Reindexer, *testBundle) { + t.Helper() + + return setupWithOpts(t, nil) + } + runImmediatelyThenOnceAnHour := func() func(time.Time) time.Time { alreadyRan := false return func(t time.Time) time.Time { @@ -141,7 +147,13 @@ func TestReindexer(t *testing.T) { t.Run("ReindexSkippedWithReindexArtifact", func(t *testing.T) { t.Parallel() - svc, bundle := setup(t) + svc, bundle := setupWithOpts(t, &riverdbtest.TestSchemaOpts{DisableReuse: true}) + + mockExec := newReindexerExecutorMock(bundle.exec) + mockExec.indexReindexFunc = func(ctx context.Context, params *riverdriver.IndexReindexParams) error { + return nil + } + svc.exec = mockExec requireReindexOne := func(indexName string) bool { didReindex, err := svc.reindexOne(ctx, indexName) @@ -151,20 +163,20 @@ func TestReindexer(t *testing.T) { indexName := svc.Config.IndexNames[0] - // With a `_ccnew` index in place, the reindexer refuses to run. - require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s_ccnew ON %s.river_job (id)", indexName, bundle.schema))) - require.False(t, requireReindexOne(indexName)) - - // With the index dropped again, reindexing can now occur. - require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("DROP INDEX %s.%s_ccnew", bundle.schema, indexName))) - require.True(t, requireReindexOne(indexName)) - - // `_ccold` also prevents reindexing. - require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s_ccold ON %s.river_job (id)", indexName, bundle.schema))) - require.False(t, requireReindexOne(indexName)) + for _, artifactSuffix := range []string{"_ccnew", "_ccnew1", "_ccnew31", "_ccold", "_ccold1"} { + artifactName := indexName + artifactSuffix + require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", artifactName, bundle.schema))) + require.False(t, requireReindexOne(indexName)) + require.NoError(t, bundle.exec.IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{Index: artifactName, Schema: bundle.schema})) + } - // And with `_ccold` dropped, reindexing can proceed. - require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("DROP INDEX %s.%s_ccold", bundle.schema, indexName))) + for _, artifactSuffix := range []string{"_ccnewa", "_ccnew_1", "_ccoldx", "_ccold_1", "x_ccnew1"} { + artifactName := indexName + artifactSuffix + require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", artifactName, bundle.schema))) + t.Cleanup(func() { + require.NoError(t, bundle.exec.IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{Index: artifactName, Schema: bundle.schema})) + }) + } require.True(t, requireReindexOne(indexName)) }) @@ -249,7 +261,7 @@ func TestReindexer(t *testing.T) { t.Run("ReindexDeletesArtifactsWhenCancelledWithStop", func(t *testing.T) { t.Parallel() - svc, bundle := setup(t) + svc, bundle := setupWithOpts(t, &riverdbtest.TestSchemaOpts{DisableReuse: true}) svc.skipReindexArtifactCheck = true requireIndexExists := func(indexName string) bool { @@ -259,16 +271,25 @@ func TestReindexer(t *testing.T) { } var ( - indexName = svc.Config.IndexNames[0] - indexNameNew = indexName + "_ccnew" - indexNameOld = indexName + "_ccold" + indexName = svc.Config.IndexNames[0] + indexNameNew = indexName + "_ccnew" + indexNameNewNumber = indexName + "_ccnew1" + indexNameNonArtifact = indexName + "_ccnewa" + indexNameOld = indexName + "_ccold" + indexNameOldNumber = indexName + "_ccold2" ) require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameNew, bundle.schema))) + require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameNewNumber, bundle.schema))) + require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameNonArtifact, bundle.schema))) require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameOld, bundle.schema))) + require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameOldNumber, bundle.schema))) require.True(t, requireIndexExists(indexNameNew)) + require.True(t, requireIndexExists(indexNameNewNumber)) + require.True(t, requireIndexExists(indexNameNonArtifact)) require.True(t, requireIndexExists(indexNameOld)) + require.True(t, requireIndexExists(indexNameOldNumber)) { // Pre-cancel context to simulate a reindexer being stopped while @@ -286,7 +307,10 @@ func TestReindexer(t *testing.T) { } require.False(t, requireIndexExists(indexNameNew)) + require.False(t, requireIndexExists(indexNameNewNumber)) + require.True(t, requireIndexExists(indexNameNonArtifact)) require.False(t, requireIndexExists(indexNameOld)) + require.False(t, requireIndexExists(indexNameOldNumber)) }) t.Run("StopsImmediately", func(t *testing.T) { diff --git a/riverdbtest/riverdbtest.go b/riverdbtest/riverdbtest.go index eb1e6df1..acdb3db7 100644 --- a/riverdbtest/riverdbtest.go +++ b/riverdbtest/riverdbtest.go @@ -51,9 +51,10 @@ var ( // TestSchemaOpts are options for TestSchema. Most of the time these can be left // as nil. type TestSchemaOpts struct { - // DisableReuse specifies that schema will not be checked in for reuse at - // the end of tests. This is desirable in certain like cases like where a - // test case is making modifications to schema. + // DisableReuse specifies that a schema will not be checked out from the + // idle schema pool or checked in for reuse at the end of tests. This is + // desirable in cases like where a test case is making modifications to + // schema. // // Not being able to reuse the schema introduces overhead to tests because // it means more schemas will need to be generated to replace one not @@ -320,44 +321,47 @@ func TestSchema[TTx any](ctx context.Context, tb testutil.TestingTB, driver rive return userFacingSchema } - // See if there are any idle schemas that were previously generated during - // this run and have since been checked back into the pool. If so, pop it - // off and run cleanup on it. If not, continue on to generating a new schema - // below. This function never blocks, so we'll prefer generating extra - // schemas rather than optimizing amongst a minimal set that's already there. - if schema := func() string { - idleSchemasMu.Lock() - defer idleSchemasMu.Unlock() - - linesIdleSchemas := idleSchemas[databaseAndLinesKey] + if !opts.DisableReuse { + // See if there are any idle schemas that were previously generated + // during this run and have since been checked back into the pool. If + // so, pop it off and run cleanup on it. If not, continue on to + // generating a new schema below. This function never blocks, so we'll + // prefer generating extra schemas rather than optimizing amongst a + // minimal set that's already there. + if schema := func() string { + idleSchemasMu.Lock() + defer idleSchemasMu.Unlock() + + linesIdleSchemas := idleSchemas[databaseAndLinesKey] + + if len(linesIdleSchemas) < 1 { + return "" + } - if len(linesIdleSchemas) < 1 { - return "" - } + schema := linesIdleSchemas[0] + idleSchemas[databaseAndLinesKey] = linesIdleSchemas[1:] + return schema + }(); schema != "" { + // Should be called BEFORE maybeProcurePool. maybeProcurePool may open a + // pool, and in case it does, we want a cleanup in it that closes the pool + // to run before this cleanup hook that checks the test schema back in. + // Cleanup is FILO, so clean up must appear first to run last. + addCleanupHook(schema) + + var ( + start = time.Now() + userFacingSchema = maybeProcurePool(schema) + ) + + if len(truncateTables) > 0 { + require.NoError(tb, driver.GetExecutor().TableTruncate(ctx, &riverdriver.TableTruncateParams{Schema: userFacingSchema, Table: truncateTables})) + } - schema := linesIdleSchemas[0] - idleSchemas[databaseAndLinesKey] = linesIdleSchemas[1:] - return schema - }(); schema != "" { - // Should be called BEFORE maybeProcurePool. maybeProcurePool may open a - // pool, and in case it does, we want a cleanup in it that closes the pool - // to run before this cleanup hook that checks the test schema back in. - // Cleanup is FILO, so clean up must appear first to run last. - addCleanupHook(schema) + tb.Logf("Reusing idle %s schema %q [user facing: %q] after cleaning in %s [%d generated] [%d reused]", + driver.DatabaseName(), schema, userFacingSchema, time.Since(start), stats.numGenerated.Load(), stats.numReused.Add(1)) - var ( - start = time.Now() - userFacingSchema = maybeProcurePool(schema) - ) - - if len(truncateTables) > 0 { - require.NoError(tb, driver.GetExecutor().TableTruncate(ctx, &riverdriver.TableTruncateParams{Schema: userFacingSchema, Table: truncateTables})) + return userFacingSchema } - - tb.Logf("Reusing idle %s schema %q [user facing: %q] after cleaning in %s [%d generated] [%d reused]", - driver.DatabaseName(), schema, userFacingSchema, time.Since(start), stats.numGenerated.Load(), stats.numReused.Add(1)) - - return userFacingSchema } // e.g. river_2025_04_14t22_13_58_schema_10 diff --git a/riverdbtest/riverdbtest_test.go b/riverdbtest/riverdbtest_test.go index 138e38df..9ddb499b 100644 --- a/riverdbtest/riverdbtest_test.go +++ b/riverdbtest/riverdbtest_test.go @@ -230,6 +230,22 @@ func TestTestSchemaReuse(t *testing.T) { //nolint:paralleltest // they also may not be. requireIdle(t, schema1) requireIdle(t, schema2) + + var ( + previouslyIdleSchemasForDisableReuse = append([]string(nil), idleSchemas[databaseAndLinesKey]...) + schema3 string + ) + + t.Run("DisableReuse", func(t *testing.T) { //nolint:paralleltest + schema3 = TestSchema(ctx, t, driver, &TestSchemaOpts{DisableReuse: true}) + requireNotIdle(t, schema3) + require.NotContains(t, previouslyIdleSchemasForDisableReuse, schema3) + }) + + requireNotIdle(t, schema3) + for _, schema := range previouslyIdleSchemasForDisableReuse { + requireIdle(t, schema) + } } func TestPackageFromFunc(t *testing.T) { diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 7d92ab32..20b92c0c 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -206,6 +206,7 @@ type Executor interface { // // API is not stable. DO NOT USE. IndexReindex(ctx context.Context, params *IndexReindexParams) error + IndexReindexArtifacts(ctx context.Context, params *IndexReindexArtifactsParams) ([]string, error) JobCancel(ctx context.Context, params *JobCancelParams) (*rivertype.JobRow, error) JobCountByAllStates(ctx context.Context, params *JobCountByAllStatesParams) (map[rivertype.JobState]int, error) @@ -863,6 +864,11 @@ type IndexReindexParams struct { Schema string } +type IndexReindexArtifactsParams struct { + Index string + Schema string +} + type Schema struct { Name string } diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/schema.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/schema.sql.go index e734ca4a..3632abc6 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/schema.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/schema.sql.go @@ -57,6 +57,51 @@ func (q *Queries) IndexExists(ctx context.Context, db DBTX, arg *IndexExistsPara return exists, err } +const indexReindexArtifacts = `-- name: IndexReindexArtifacts :many +WITH index_artifacts AS ( + SELECT + c.relname::text AS index_name, + substring(c.relname FROM length($1::text) + 1) AS suffix + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = coalesce($2::text, current_schema()) + AND c.relkind = 'i' + AND left(c.relname, length($1::text)) = $1::text +) +SELECT index_name +FROM index_artifacts +WHERE suffix ~ '^_cc(new|old)[0-9]*$' +ORDER BY index_name +` + +type IndexReindexArtifactsParams struct { + Index string + Schema sql.NullString +} + +func (q *Queries) IndexReindexArtifacts(ctx context.Context, db DBTX, arg *IndexReindexArtifactsParams) ([]string, error) { + rows, err := db.QueryContext(ctx, indexReindexArtifacts, arg.Index, arg.Schema) + if err != nil { + return nil, err + } + defer rows.Close() + var items []string + for rows.Next() { + var index_name string + if err := rows.Scan(&index_name); err != nil { + return nil, err + } + items = append(items, index_name) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const indexesExist = `-- name: IndexesExist :many WITH index_names AS ( SELECT unnest($2::text[]) as index_name diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 32733e89..211a4374 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -147,7 +147,7 @@ func (e *Executor) IndexDropIfExists(ctx context.Context, params *riverdriver.In maybeSchema = dbutil.SafeIdentifier(params.Schema) + "." } - _, err := e.dbtx.ExecContext(ctx, "DROP INDEX CONCURRENTLY IF EXISTS "+maybeSchema+params.Index) + _, err := e.dbtx.ExecContext(ctx, "DROP INDEX CONCURRENTLY IF EXISTS "+maybeSchema+dbutil.SafeIdentifier(params.Index)) return interpretError(err) } @@ -168,10 +168,21 @@ func (e *Executor) IndexReindex(ctx context.Context, params *riverdriver.IndexRe maybeSchema = dbutil.SafeIdentifier(params.Schema) + "." } - _, err := e.dbtx.ExecContext(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+params.Index) + _, err := e.dbtx.ExecContext(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+dbutil.SafeIdentifier(params.Index)) return interpretError(err) } +func (e *Executor) IndexReindexArtifacts(ctx context.Context, params *riverdriver.IndexReindexArtifactsParams) ([]string, error) { + artifacts, err := dbsqlc.New().IndexReindexArtifacts(ctx, e.dbtx, &dbsqlc.IndexReindexArtifactsParams{ + Index: params.Index, + Schema: sql.NullString{String: params.Schema, Valid: params.Schema != ""}, + }) + if err != nil { + return nil, interpretError(err) + } + return artifacts, nil +} + func (e *Executor) IndexesExist(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error) { rows, err := dbsqlc.New().IndexesExist(ctx, e.dbtx, &dbsqlc.IndexesExistParams{ IndexNames: params.IndexNames, diff --git a/riverdriver/riverdrivertest/schema_introspection.go b/riverdriver/riverdrivertest/schema_introspection.go index 14eca2a7..8ebe32dc 100644 --- a/riverdriver/riverdrivertest/schema_introspection.go +++ b/riverdriver/riverdrivertest/schema_introspection.go @@ -97,7 +97,7 @@ func exerciseSchemaIntrospection[TTx any](ctx context.Context, t *testing.T, } err := driver.GetExecutor().IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{ - Index: "river_job_index_drop_if_exists ", + Index: "river_job_index_drop_if_exists", Schema: schema, }) require.NoError(t, err) @@ -177,6 +177,61 @@ func exerciseSchemaIntrospection[TTx any](ctx context.Context, t *testing.T, require.NoError(t, err) }) + t.Run("IndexReindexArtifacts", func(t *testing.T) { + t.Parallel() + + driver, schema := driverWithSchema(ctx, t, nil) + exec := driver.GetExecutor() + + baseIndexName := "river_job_reindex_artifacts_index" + artifactNames := []string{ + baseIndexName + "_ccnew", + baseIndexName + "_ccnew1", + baseIndexName + "_ccnew31", + baseIndexName + "_ccold", + baseIndexName + "_ccold2", + } + nonArtifactNames := []string{ + baseIndexName + "_ccnewa", + baseIndexName + "_ccnew_1", + baseIndexName + "_ccoldx", + baseIndexName + "_ccold_2", + baseIndexName + "x_ccnew1", + } + + indexNames := make([]string, 0, len(artifactNames)+len(nonArtifactNames)) + indexNames = append(indexNames, artifactNames...) + indexNames = append(indexNames, nonArtifactNames...) + for _, indexName := range indexNames { + require.NoError(t, exec.IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{Index: indexName, Schema: schema})) + } + t.Cleanup(func() { + for _, indexName := range indexNames { + require.NoError(t, exec.IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{Index: indexName, Schema: schema})) + } + }) + + for _, indexName := range indexNames { + if driver.DatabaseName() == riverdriver.DatabaseNameSQLite { + require.NoError(t, exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON river_job (id)", indexName))) + } else { + require.NoError(t, exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexName, schema))) + } + } + + reindexArtifactNames, err := exec.IndexReindexArtifacts(ctx, &riverdriver.IndexReindexArtifactsParams{ + Index: baseIndexName, + Schema: schema, + }) + require.NoError(t, err) + + if driver.DatabaseName() == riverdriver.DatabaseNameSQLite { + require.Empty(t, reindexArtifactNames) + } else { + require.Equal(t, artifactNames, reindexArtifactNames) + } + }) + t.Run("IndexesExist", func(t *testing.T) { t.Parallel() diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql b/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql index e60ece66..a9953a44 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql @@ -17,6 +17,22 @@ SELECT EXISTS ( AND pg_class.relkind = 'i' ); +-- name: IndexReindexArtifacts :many +WITH index_artifacts AS ( + SELECT + c.relname::text AS index_name, + substring(c.relname FROM length(@index::text) + 1) AS suffix + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = coalesce(sqlc.narg('schema')::text, current_schema()) + AND c.relkind = 'i' + AND left(c.relname, length(@index::text)) = @index::text +) +SELECT index_name +FROM index_artifacts +WHERE suffix ~ '^_cc(new|old)[0-9]*$' +ORDER BY index_name; + -- name: IndexesExist :many WITH index_names AS ( SELECT unnest(@index_names::text[]) as index_name diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql.go index 5860284e..25000997 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql.go @@ -56,6 +56,48 @@ func (q *Queries) IndexExists(ctx context.Context, db DBTX, arg *IndexExistsPara return exists, err } +const indexReindexArtifacts = `-- name: IndexReindexArtifacts :many +WITH index_artifacts AS ( + SELECT + c.relname::text AS index_name, + substring(c.relname FROM length($1::text) + 1) AS suffix + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = coalesce($2::text, current_schema()) + AND c.relkind = 'i' + AND left(c.relname, length($1::text)) = $1::text +) +SELECT index_name +FROM index_artifacts +WHERE suffix ~ '^_cc(new|old)[0-9]*$' +ORDER BY index_name +` + +type IndexReindexArtifactsParams struct { + Index string + Schema pgtype.Text +} + +func (q *Queries) IndexReindexArtifacts(ctx context.Context, db DBTX, arg *IndexReindexArtifactsParams) ([]string, error) { + rows, err := db.Query(ctx, indexReindexArtifacts, arg.Index, arg.Schema) + if err != nil { + return nil, err + } + defer rows.Close() + var items []string + for rows.Next() { + var index_name string + if err := rows.Scan(&index_name); err != nil { + return nil, err + } + items = append(items, index_name) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const indexesExist = `-- name: IndexesExist :many WITH index_names AS ( SELECT unnest($2::text[]) as index_name diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 54c3f567..41b510eb 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -155,7 +155,7 @@ func (e *Executor) IndexDropIfExists(ctx context.Context, params *riverdriver.In maybeSchema = dbutil.SafeIdentifier(params.Schema) + "." } - _, err := e.dbtx.Exec(ctx, "DROP INDEX CONCURRENTLY IF EXISTS "+maybeSchema+params.Index) + _, err := e.dbtx.Exec(ctx, "DROP INDEX CONCURRENTLY IF EXISTS "+maybeSchema+dbutil.SafeIdentifier(params.Index)) return interpretError(err) } @@ -176,10 +176,21 @@ func (e *Executor) IndexReindex(ctx context.Context, params *riverdriver.IndexRe maybeSchema = dbutil.SafeIdentifier(params.Schema) + "." } - _, err := e.dbtx.Exec(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+params.Index) + _, err := e.dbtx.Exec(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+dbutil.SafeIdentifier(params.Index)) return interpretError(err) } +func (e *Executor) IndexReindexArtifacts(ctx context.Context, params *riverdriver.IndexReindexArtifactsParams) ([]string, error) { + artifacts, err := dbsqlc.New().IndexReindexArtifacts(ctx, e.dbtx, &dbsqlc.IndexReindexArtifactsParams{ + Index: params.Index, + Schema: pgtype.Text{String: params.Schema, Valid: params.Schema != ""}, + }) + if err != nil { + return nil, interpretError(err) + } + return artifacts, nil +} + func (e *Executor) IndexesExist(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error) { rows, err := dbsqlc.New().IndexesExist(ctx, e.dbtx, &dbsqlc.IndexesExistParams{ IndexNames: params.IndexNames, diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index b479b728..c7df4964 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -209,7 +209,7 @@ func (e *Executor) IndexDropIfExists(ctx context.Context, params *riverdriver.In maybeSchema = dbutil.SafeIdentifier(params.Schema) + "." } - _, err := e.dbtx.ExecContext(ctx, "DROP INDEX IF EXISTS "+maybeSchema+params.Index) + _, err := e.dbtx.ExecContext(ctx, "DROP INDEX IF EXISTS "+maybeSchema+dbutil.SafeIdentifier(params.Index)) return interpretError(err) } @@ -224,10 +224,14 @@ func (e *Executor) IndexReindex(ctx context.Context, params *riverdriver.IndexRe maybeSchema = dbutil.SafeIdentifier(params.Schema) + "." } - _, err := e.dbtx.ExecContext(ctx, "REINDEX "+maybeSchema+params.Index) + _, err := e.dbtx.ExecContext(ctx, "REINDEX "+maybeSchema+dbutil.SafeIdentifier(params.Index)) return interpretError(err) } +func (e *Executor) IndexReindexArtifacts(ctx context.Context, params *riverdriver.IndexReindexArtifactsParams) ([]string, error) { + return nil, nil +} + func (e *Executor) IndexesExist(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error) { exists := make(map[string]bool) for _, index := range params.IndexNames {