diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c976cbd..5ee5e58c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql - Fix `JobCancel` having no effect on running jobs when using a poll-only driver (e.g. `riverdatabasesql`). The `controlActionCancel` event was silently dropped in `fetchAndRunLoop`'s `queueControlCh` handler instead of being forwarded to `maybeCancelJob`. Note: this fix only works within a single process; cross-process cancels in poll-only setups must wait for the next poll cycle. [PR #1245](https://github.com/riverqueue/river/pull/1245). - Ensure jobs that return a custom timeout of -1 (no timeout) are never rescued. [PR #1288](https://github.com/riverqueue/river/pull/1288). +- Detect numbered PostgreSQL `REINDEX INDEX CONCURRENTLY` artifacts like `_ccnew1` and `_ccold2` so the reindexer does not keep accumulating failed artifact indexes. Fixes [#1296](https://github.com/riverqueue/river/issues/1296). [PR #1297](https://github.com/riverqueue/river/pull/1297). ## [0.39.0] - 2026-06-03 diff --git a/internal/maintenance/reindexer.go b/internal/maintenance/reindexer.go index dbfdcbe6..cc167164 100644 --- a/internal/maintenance/reindexer.go +++ b/internal/maintenance/reindexer.go @@ -221,7 +221,8 @@ func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, err // exist before trying to reindex. When using `CONCURRENTLY`, Postgres // creates a new index suffixed with `_ccnew` before swapping it in as the // new index. The existing index is renamed `_ccold` before being dropped - // concurrently. + // concurrently. If multiple failed artifacts exist, Postgres may add a + // numeric suffix like `_ccnew1` or `_ccold2` to keep names unique. // // If one of these artifacts exists, it probably means that a previous // reindex attempt timed out, and attempting to reindex again is likely @@ -234,16 +235,15 @@ func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, err // // https://www.postgresql.org/docs/current/sql-reindex.html#SQL-REINDEX-CONCURRENTLY if !s.skipReindexArtifactCheck { - for _, reindexArtifactName := range []string{indexName + "_ccnew", indexName + "_ccold"} { - reindexArtifactExists, err := s.exec.IndexExists(ctx, &riverdriver.IndexExistsParams{Index: reindexArtifactName, Schema: s.Config.Schema}) - if err != nil { - return false, err - } - if reindexArtifactExists { - s.Logger.WarnContext(ctx, s.Name+": Found reindex artifact likely resulting from previous partially completed reindex attempt; skipping reindex", - slog.String("artifact_name", reindexArtifactName), slog.String("index_name", indexName), slog.Duration("timeout", s.Config.Timeout)) - return false, nil - } + reindexArtifactNames, err := s.exec.IndexReindexArtifacts(ctx, &riverdriver.IndexReindexArtifactsParams{Index: indexName, Schema: s.Config.Schema}) + if err != nil { + return false, err + } + + if len(reindexArtifactNames) > 0 { + s.Logger.WarnContext(ctx, s.Name+": Found reindex artifact likely resulting from previous partially completed reindex attempt; skipping reindex", + slog.Any("artifact_names", reindexArtifactNames), slog.String("index_name", indexName), slog.Duration("timeout", s.Config.Timeout)) + return false, nil } } @@ -268,7 +268,12 @@ func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, err s.Logger.InfoContext(ctx, s.Name+": Signaled to stop during index build; attempting to clean up concurrent artifacts") - for _, reindexArtifactName := range []string{indexName + "_ccnew", indexName + "_ccold"} { + reindexArtifactNames, err := s.exec.IndexReindexArtifacts(ctx, &riverdriver.IndexReindexArtifactsParams{Index: indexName, Schema: s.Config.Schema}) + if err != nil { + s.Logger.ErrorContext(ctx, s.Name+": Error listing reindex artifacts", slog.String("error", err.Error())) + } + + for _, reindexArtifactName := range reindexArtifactNames { if err := s.exec.IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{Index: reindexArtifactName, Schema: s.Config.Schema}); err != nil { s.Logger.ErrorContext(ctx, s.Name+": Error dropping reindex artifact", slog.String("artifact_name", reindexArtifactName), slog.String("error", err.Error())) } diff --git a/internal/maintenance/reindexer_test.go b/internal/maintenance/reindexer_test.go index 2d4817a5..3c6f5499 100644 --- a/internal/maintenance/reindexer_test.go +++ b/internal/maintenance/reindexer_test.go @@ -64,13 +64,13 @@ func TestReindexer(t *testing.T) { schema string } - setup := func(t *testing.T) (*Reindexer, *testBundle) { + setupWithOpts := func(t *testing.T, testSchemaOpts *riverdbtest.TestSchemaOpts) (*Reindexer, *testBundle) { t.Helper() var ( dbPool = riversharedtest.DBPool(ctx, t) driver = riverpgxv5.New(dbPool) - schema = riverdbtest.TestSchema(ctx, t, driver, nil) + schema = riverdbtest.TestSchema(ctx, t, driver, testSchemaOpts) ) bundle := &testBundle{ @@ -100,6 +100,12 @@ func TestReindexer(t *testing.T) { return svc, bundle } + setup := func(t *testing.T) (*Reindexer, *testBundle) { + t.Helper() + + return setupWithOpts(t, nil) + } + runImmediatelyThenOnceAnHour := func() func(time.Time) time.Time { alreadyRan := false return func(t time.Time) time.Time { @@ -141,7 +147,13 @@ func TestReindexer(t *testing.T) { t.Run("ReindexSkippedWithReindexArtifact", func(t *testing.T) { t.Parallel() - svc, bundle := setup(t) + svc, bundle := setupWithOpts(t, &riverdbtest.TestSchemaOpts{DisableReuse: true}) + + mockExec := newReindexerExecutorMock(bundle.exec) + mockExec.indexReindexFunc = func(ctx context.Context, params *riverdriver.IndexReindexParams) error { + return nil + } + svc.exec = mockExec requireReindexOne := func(indexName string) bool { didReindex, err := svc.reindexOne(ctx, indexName) @@ -151,20 +163,20 @@ func TestReindexer(t *testing.T) { indexName := svc.Config.IndexNames[0] - // With a `_ccnew` index in place, the reindexer refuses to run. - require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s_ccnew ON %s.river_job (id)", indexName, bundle.schema))) - require.False(t, requireReindexOne(indexName)) - - // With the index dropped again, reindexing can now occur. - require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("DROP INDEX %s.%s_ccnew", bundle.schema, indexName))) - require.True(t, requireReindexOne(indexName)) - - // `_ccold` also prevents reindexing. - require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s_ccold ON %s.river_job (id)", indexName, bundle.schema))) - require.False(t, requireReindexOne(indexName)) + for _, artifactSuffix := range []string{"_ccnew", "_ccnew1", "_ccnew31", "_ccold", "_ccold1"} { + artifactName := indexName + artifactSuffix + require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", artifactName, bundle.schema))) + require.False(t, requireReindexOne(indexName)) + require.NoError(t, bundle.exec.IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{Index: artifactName, Schema: bundle.schema})) + } - // And with `_ccold` dropped, reindexing can proceed. - require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("DROP INDEX %s.%s_ccold", bundle.schema, indexName))) + for _, artifactSuffix := range []string{"_ccnewa", "_ccnew_1", "_ccoldx", "_ccold_1", "x_ccnew1"} { + artifactName := indexName + artifactSuffix + require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", artifactName, bundle.schema))) + t.Cleanup(func() { + require.NoError(t, bundle.exec.IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{Index: artifactName, Schema: bundle.schema})) + }) + } require.True(t, requireReindexOne(indexName)) }) @@ -249,7 +261,7 @@ func TestReindexer(t *testing.T) { t.Run("ReindexDeletesArtifactsWhenCancelledWithStop", func(t *testing.T) { t.Parallel() - svc, bundle := setup(t) + svc, bundle := setupWithOpts(t, &riverdbtest.TestSchemaOpts{DisableReuse: true}) svc.skipReindexArtifactCheck = true requireIndexExists := func(indexName string) bool { @@ -259,16 +271,25 @@ func TestReindexer(t *testing.T) { } var ( - indexName = svc.Config.IndexNames[0] - indexNameNew = indexName + "_ccnew" - indexNameOld = indexName + "_ccold" + indexName = svc.Config.IndexNames[0] + indexNameNew = indexName + "_ccnew" + indexNameNewNumber = indexName + "_ccnew1" + indexNameNonArtifact = indexName + "_ccnewa" + indexNameOld = indexName + "_ccold" + indexNameOldNumber = indexName + "_ccold2" ) require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameNew, bundle.schema))) + require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameNewNumber, bundle.schema))) + require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameNonArtifact, bundle.schema))) require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameOld, bundle.schema))) + require.NoError(t, bundle.exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexNameOldNumber, bundle.schema))) require.True(t, requireIndexExists(indexNameNew)) + require.True(t, requireIndexExists(indexNameNewNumber)) + require.True(t, requireIndexExists(indexNameNonArtifact)) require.True(t, requireIndexExists(indexNameOld)) + require.True(t, requireIndexExists(indexNameOldNumber)) { // Pre-cancel context to simulate a reindexer being stopped while @@ -286,7 +307,10 @@ func TestReindexer(t *testing.T) { } require.False(t, requireIndexExists(indexNameNew)) + require.False(t, requireIndexExists(indexNameNewNumber)) + require.True(t, requireIndexExists(indexNameNonArtifact)) require.False(t, requireIndexExists(indexNameOld)) + require.False(t, requireIndexExists(indexNameOldNumber)) }) t.Run("StopsImmediately", func(t *testing.T) { diff --git a/riverdbtest/riverdbtest.go b/riverdbtest/riverdbtest.go index eb1e6df1..acdb3db7 100644 --- a/riverdbtest/riverdbtest.go +++ b/riverdbtest/riverdbtest.go @@ -51,9 +51,10 @@ var ( // TestSchemaOpts are options for TestSchema. Most of the time these can be left // as nil. type TestSchemaOpts struct { - // DisableReuse specifies that schema will not be checked in for reuse at - // the end of tests. This is desirable in certain like cases like where a - // test case is making modifications to schema. + // DisableReuse specifies that a schema will not be checked out from the + // idle schema pool or checked in for reuse at the end of tests. This is + // desirable in cases like where a test case is making modifications to + // schema. // // Not being able to reuse the schema introduces overhead to tests because // it means more schemas will need to be generated to replace one not @@ -320,44 +321,47 @@ func TestSchema[TTx any](ctx context.Context, tb testutil.TestingTB, driver rive return userFacingSchema } - // See if there are any idle schemas that were previously generated during - // this run and have since been checked back into the pool. If so, pop it - // off and run cleanup on it. If not, continue on to generating a new schema - // below. This function never blocks, so we'll prefer generating extra - // schemas rather than optimizing amongst a minimal set that's already there. - if schema := func() string { - idleSchemasMu.Lock() - defer idleSchemasMu.Unlock() - - linesIdleSchemas := idleSchemas[databaseAndLinesKey] + if !opts.DisableReuse { + // See if there are any idle schemas that were previously generated + // during this run and have since been checked back into the pool. If + // so, pop it off and run cleanup on it. If not, continue on to + // generating a new schema below. This function never blocks, so we'll + // prefer generating extra schemas rather than optimizing amongst a + // minimal set that's already there. + if schema := func() string { + idleSchemasMu.Lock() + defer idleSchemasMu.Unlock() + + linesIdleSchemas := idleSchemas[databaseAndLinesKey] + + if len(linesIdleSchemas) < 1 { + return "" + } - if len(linesIdleSchemas) < 1 { - return "" - } + schema := linesIdleSchemas[0] + idleSchemas[databaseAndLinesKey] = linesIdleSchemas[1:] + return schema + }(); schema != "" { + // Should be called BEFORE maybeProcurePool. maybeProcurePool may open a + // pool, and in case it does, we want a cleanup in it that closes the pool + // to run before this cleanup hook that checks the test schema back in. + // Cleanup is FILO, so clean up must appear first to run last. + addCleanupHook(schema) + + var ( + start = time.Now() + userFacingSchema = maybeProcurePool(schema) + ) + + if len(truncateTables) > 0 { + require.NoError(tb, driver.GetExecutor().TableTruncate(ctx, &riverdriver.TableTruncateParams{Schema: userFacingSchema, Table: truncateTables})) + } - schema := linesIdleSchemas[0] - idleSchemas[databaseAndLinesKey] = linesIdleSchemas[1:] - return schema - }(); schema != "" { - // Should be called BEFORE maybeProcurePool. maybeProcurePool may open a - // pool, and in case it does, we want a cleanup in it that closes the pool - // to run before this cleanup hook that checks the test schema back in. - // Cleanup is FILO, so clean up must appear first to run last. - addCleanupHook(schema) + tb.Logf("Reusing idle %s schema %q [user facing: %q] after cleaning in %s [%d generated] [%d reused]", + driver.DatabaseName(), schema, userFacingSchema, time.Since(start), stats.numGenerated.Load(), stats.numReused.Add(1)) - var ( - start = time.Now() - userFacingSchema = maybeProcurePool(schema) - ) - - if len(truncateTables) > 0 { - require.NoError(tb, driver.GetExecutor().TableTruncate(ctx, &riverdriver.TableTruncateParams{Schema: userFacingSchema, Table: truncateTables})) + return userFacingSchema } - - tb.Logf("Reusing idle %s schema %q [user facing: %q] after cleaning in %s [%d generated] [%d reused]", - driver.DatabaseName(), schema, userFacingSchema, time.Since(start), stats.numGenerated.Load(), stats.numReused.Add(1)) - - return userFacingSchema } // e.g. river_2025_04_14t22_13_58_schema_10 diff --git a/riverdbtest/riverdbtest_test.go b/riverdbtest/riverdbtest_test.go index 138e38df..9ddb499b 100644 --- a/riverdbtest/riverdbtest_test.go +++ b/riverdbtest/riverdbtest_test.go @@ -230,6 +230,22 @@ func TestTestSchemaReuse(t *testing.T) { //nolint:paralleltest // they also may not be. requireIdle(t, schema1) requireIdle(t, schema2) + + var ( + previouslyIdleSchemasForDisableReuse = append([]string(nil), idleSchemas[databaseAndLinesKey]...) + schema3 string + ) + + t.Run("DisableReuse", func(t *testing.T) { //nolint:paralleltest + schema3 = TestSchema(ctx, t, driver, &TestSchemaOpts{DisableReuse: true}) + requireNotIdle(t, schema3) + require.NotContains(t, previouslyIdleSchemasForDisableReuse, schema3) + }) + + requireNotIdle(t, schema3) + for _, schema := range previouslyIdleSchemasForDisableReuse { + requireIdle(t, schema) + } } func TestPackageFromFunc(t *testing.T) { diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 7d92ab32..20b92c0c 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -206,6 +206,7 @@ type Executor interface { // // API is not stable. DO NOT USE. IndexReindex(ctx context.Context, params *IndexReindexParams) error + IndexReindexArtifacts(ctx context.Context, params *IndexReindexArtifactsParams) ([]string, error) JobCancel(ctx context.Context, params *JobCancelParams) (*rivertype.JobRow, error) JobCountByAllStates(ctx context.Context, params *JobCountByAllStatesParams) (map[rivertype.JobState]int, error) @@ -863,6 +864,11 @@ type IndexReindexParams struct { Schema string } +type IndexReindexArtifactsParams struct { + Index string + Schema string +} + type Schema struct { Name string } diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/schema.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/schema.sql.go index e734ca4a..3632abc6 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/schema.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/schema.sql.go @@ -57,6 +57,51 @@ func (q *Queries) IndexExists(ctx context.Context, db DBTX, arg *IndexExistsPara return exists, err } +const indexReindexArtifacts = `-- name: IndexReindexArtifacts :many +WITH index_artifacts AS ( + SELECT + c.relname::text AS index_name, + substring(c.relname FROM length($1::text) + 1) AS suffix + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = coalesce($2::text, current_schema()) + AND c.relkind = 'i' + AND left(c.relname, length($1::text)) = $1::text +) +SELECT index_name +FROM index_artifacts +WHERE suffix ~ '^_cc(new|old)[0-9]*$' +ORDER BY index_name +` + +type IndexReindexArtifactsParams struct { + Index string + Schema sql.NullString +} + +func (q *Queries) IndexReindexArtifacts(ctx context.Context, db DBTX, arg *IndexReindexArtifactsParams) ([]string, error) { + rows, err := db.QueryContext(ctx, indexReindexArtifacts, arg.Index, arg.Schema) + if err != nil { + return nil, err + } + defer rows.Close() + var items []string + for rows.Next() { + var index_name string + if err := rows.Scan(&index_name); err != nil { + return nil, err + } + items = append(items, index_name) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const indexesExist = `-- name: IndexesExist :many WITH index_names AS ( SELECT unnest($2::text[]) as index_name diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 32733e89..211a4374 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -147,7 +147,7 @@ func (e *Executor) IndexDropIfExists(ctx context.Context, params *riverdriver.In maybeSchema = dbutil.SafeIdentifier(params.Schema) + "." } - _, err := e.dbtx.ExecContext(ctx, "DROP INDEX CONCURRENTLY IF EXISTS "+maybeSchema+params.Index) + _, err := e.dbtx.ExecContext(ctx, "DROP INDEX CONCURRENTLY IF EXISTS "+maybeSchema+dbutil.SafeIdentifier(params.Index)) return interpretError(err) } @@ -168,10 +168,21 @@ func (e *Executor) IndexReindex(ctx context.Context, params *riverdriver.IndexRe maybeSchema = dbutil.SafeIdentifier(params.Schema) + "." } - _, err := e.dbtx.ExecContext(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+params.Index) + _, err := e.dbtx.ExecContext(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+dbutil.SafeIdentifier(params.Index)) return interpretError(err) } +func (e *Executor) IndexReindexArtifacts(ctx context.Context, params *riverdriver.IndexReindexArtifactsParams) ([]string, error) { + artifacts, err := dbsqlc.New().IndexReindexArtifacts(ctx, e.dbtx, &dbsqlc.IndexReindexArtifactsParams{ + Index: params.Index, + Schema: sql.NullString{String: params.Schema, Valid: params.Schema != ""}, + }) + if err != nil { + return nil, interpretError(err) + } + return artifacts, nil +} + func (e *Executor) IndexesExist(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error) { rows, err := dbsqlc.New().IndexesExist(ctx, e.dbtx, &dbsqlc.IndexesExistParams{ IndexNames: params.IndexNames, diff --git a/riverdriver/riverdrivertest/schema_introspection.go b/riverdriver/riverdrivertest/schema_introspection.go index 14eca2a7..8ebe32dc 100644 --- a/riverdriver/riverdrivertest/schema_introspection.go +++ b/riverdriver/riverdrivertest/schema_introspection.go @@ -97,7 +97,7 @@ func exerciseSchemaIntrospection[TTx any](ctx context.Context, t *testing.T, } err := driver.GetExecutor().IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{ - Index: "river_job_index_drop_if_exists ", + Index: "river_job_index_drop_if_exists", Schema: schema, }) require.NoError(t, err) @@ -177,6 +177,61 @@ func exerciseSchemaIntrospection[TTx any](ctx context.Context, t *testing.T, require.NoError(t, err) }) + t.Run("IndexReindexArtifacts", func(t *testing.T) { + t.Parallel() + + driver, schema := driverWithSchema(ctx, t, nil) + exec := driver.GetExecutor() + + baseIndexName := "river_job_reindex_artifacts_index" + artifactNames := []string{ + baseIndexName + "_ccnew", + baseIndexName + "_ccnew1", + baseIndexName + "_ccnew31", + baseIndexName + "_ccold", + baseIndexName + "_ccold2", + } + nonArtifactNames := []string{ + baseIndexName + "_ccnewa", + baseIndexName + "_ccnew_1", + baseIndexName + "_ccoldx", + baseIndexName + "_ccold_2", + baseIndexName + "x_ccnew1", + } + + indexNames := make([]string, 0, len(artifactNames)+len(nonArtifactNames)) + indexNames = append(indexNames, artifactNames...) + indexNames = append(indexNames, nonArtifactNames...) + for _, indexName := range indexNames { + require.NoError(t, exec.IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{Index: indexName, Schema: schema})) + } + t.Cleanup(func() { + for _, indexName := range indexNames { + require.NoError(t, exec.IndexDropIfExists(ctx, &riverdriver.IndexDropIfExistsParams{Index: indexName, Schema: schema})) + } + }) + + for _, indexName := range indexNames { + if driver.DatabaseName() == riverdriver.DatabaseNameSQLite { + require.NoError(t, exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON river_job (id)", indexName))) + } else { + require.NoError(t, exec.Exec(ctx, fmt.Sprintf("CREATE INDEX %s ON %s.river_job (id)", indexName, schema))) + } + } + + reindexArtifactNames, err := exec.IndexReindexArtifacts(ctx, &riverdriver.IndexReindexArtifactsParams{ + Index: baseIndexName, + Schema: schema, + }) + require.NoError(t, err) + + if driver.DatabaseName() == riverdriver.DatabaseNameSQLite { + require.Empty(t, reindexArtifactNames) + } else { + require.Equal(t, artifactNames, reindexArtifactNames) + } + }) + t.Run("IndexesExist", func(t *testing.T) { t.Parallel() diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql b/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql index e60ece66..a9953a44 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql @@ -17,6 +17,22 @@ SELECT EXISTS ( AND pg_class.relkind = 'i' ); +-- name: IndexReindexArtifacts :many +WITH index_artifacts AS ( + SELECT + c.relname::text AS index_name, + substring(c.relname FROM length(@index::text) + 1) AS suffix + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = coalesce(sqlc.narg('schema')::text, current_schema()) + AND c.relkind = 'i' + AND left(c.relname, length(@index::text)) = @index::text +) +SELECT index_name +FROM index_artifacts +WHERE suffix ~ '^_cc(new|old)[0-9]*$' +ORDER BY index_name; + -- name: IndexesExist :many WITH index_names AS ( SELECT unnest(@index_names::text[]) as index_name diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql.go index 5860284e..25000997 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/schema.sql.go @@ -56,6 +56,48 @@ func (q *Queries) IndexExists(ctx context.Context, db DBTX, arg *IndexExistsPara return exists, err } +const indexReindexArtifacts = `-- name: IndexReindexArtifacts :many +WITH index_artifacts AS ( + SELECT + c.relname::text AS index_name, + substring(c.relname FROM length($1::text) + 1) AS suffix + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = coalesce($2::text, current_schema()) + AND c.relkind = 'i' + AND left(c.relname, length($1::text)) = $1::text +) +SELECT index_name +FROM index_artifacts +WHERE suffix ~ '^_cc(new|old)[0-9]*$' +ORDER BY index_name +` + +type IndexReindexArtifactsParams struct { + Index string + Schema pgtype.Text +} + +func (q *Queries) IndexReindexArtifacts(ctx context.Context, db DBTX, arg *IndexReindexArtifactsParams) ([]string, error) { + rows, err := db.Query(ctx, indexReindexArtifacts, arg.Index, arg.Schema) + if err != nil { + return nil, err + } + defer rows.Close() + var items []string + for rows.Next() { + var index_name string + if err := rows.Scan(&index_name); err != nil { + return nil, err + } + items = append(items, index_name) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const indexesExist = `-- name: IndexesExist :many WITH index_names AS ( SELECT unnest($2::text[]) as index_name diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 54c3f567..41b510eb 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -155,7 +155,7 @@ func (e *Executor) IndexDropIfExists(ctx context.Context, params *riverdriver.In maybeSchema = dbutil.SafeIdentifier(params.Schema) + "." } - _, err := e.dbtx.Exec(ctx, "DROP INDEX CONCURRENTLY IF EXISTS "+maybeSchema+params.Index) + _, err := e.dbtx.Exec(ctx, "DROP INDEX CONCURRENTLY IF EXISTS "+maybeSchema+dbutil.SafeIdentifier(params.Index)) return interpretError(err) } @@ -176,10 +176,21 @@ func (e *Executor) IndexReindex(ctx context.Context, params *riverdriver.IndexRe maybeSchema = dbutil.SafeIdentifier(params.Schema) + "." } - _, err := e.dbtx.Exec(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+params.Index) + _, err := e.dbtx.Exec(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+dbutil.SafeIdentifier(params.Index)) return interpretError(err) } +func (e *Executor) IndexReindexArtifacts(ctx context.Context, params *riverdriver.IndexReindexArtifactsParams) ([]string, error) { + artifacts, err := dbsqlc.New().IndexReindexArtifacts(ctx, e.dbtx, &dbsqlc.IndexReindexArtifactsParams{ + Index: params.Index, + Schema: pgtype.Text{String: params.Schema, Valid: params.Schema != ""}, + }) + if err != nil { + return nil, interpretError(err) + } + return artifacts, nil +} + func (e *Executor) IndexesExist(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error) { rows, err := dbsqlc.New().IndexesExist(ctx, e.dbtx, &dbsqlc.IndexesExistParams{ IndexNames: params.IndexNames, diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index b479b728..c7df4964 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -209,7 +209,7 @@ func (e *Executor) IndexDropIfExists(ctx context.Context, params *riverdriver.In maybeSchema = dbutil.SafeIdentifier(params.Schema) + "." } - _, err := e.dbtx.ExecContext(ctx, "DROP INDEX IF EXISTS "+maybeSchema+params.Index) + _, err := e.dbtx.ExecContext(ctx, "DROP INDEX IF EXISTS "+maybeSchema+dbutil.SafeIdentifier(params.Index)) return interpretError(err) } @@ -224,10 +224,14 @@ func (e *Executor) IndexReindex(ctx context.Context, params *riverdriver.IndexRe maybeSchema = dbutil.SafeIdentifier(params.Schema) + "." } - _, err := e.dbtx.ExecContext(ctx, "REINDEX "+maybeSchema+params.Index) + _, err := e.dbtx.ExecContext(ctx, "REINDEX "+maybeSchema+dbutil.SafeIdentifier(params.Index)) return interpretError(err) } +func (e *Executor) IndexReindexArtifacts(ctx context.Context, params *riverdriver.IndexReindexArtifactsParams) ([]string, error) { + return nil, nil +} + func (e *Executor) IndexesExist(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error) { exists := make(map[string]bool) for _, index := range params.IndexNames {