diff --git a/internal/maintenance/reindexer.go b/internal/maintenance/reindexer.go index 9f868f8b..5e379bb3 100644 --- a/internal/maintenance/reindexer.go +++ b/internal/maintenance/reindexer.go @@ -162,13 +162,7 @@ func (s *Reindexer) reindexOne(ctx context.Context, indexName string) error { ctx, cancel := context.WithTimeout(ctx, s.Config.Timeout) defer cancel() - var maybeSchema string - if s.Config.Schema != "" { - maybeSchema = s.Config.Schema + "." - } - - _, err := s.exec.Exec(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+indexName) - if err != nil { + if err := s.exec.Reindex(ctx, &riverdriver.ReindexParams{Schema: s.Config.Schema, Index: indexName}); err != nil { return err } diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 3a862802..9c305e7a 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -3826,6 +3826,20 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.Equal(t, "foo", fieldFoo) }) + t.Run("Reindex", func(t *testing.T) { + t.Parallel() + + // Postgres runs the reindex with `CONCURRENTLY` so this must use a full + // schema rather than a transaction block. + driver, schema := driverWithSchema(ctx, t, nil) + + err := driver.GetExecutor().Reindex(ctx, &riverdriver.ReindexParams{ + Index: "river_job_kind", + Schema: schema, + }) + require.NoError(t, err) + }) + t.Run("SchemaGetExpired", func(t *testing.T) { t.Parallel() diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 6bfb8f27..b84b06a2 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -245,6 +245,12 @@ type Executor interface { QueueResume(ctx context.Context, params *QueueResumeParams) error QueueUpdate(ctx context.Context, params *QueueUpdateParams) (*rivertype.Queue, error) QueryRow(ctx context.Context, sql string, args ...any) Row + + // Reindex reindexes a database index. This abstraction is a little leaky + // right now because Postgres runs this `CONCURRENTLY` and that's not + // possible in SQLite. + Reindex(ctx context.Context, params *ReindexParams) error + SchemaCreate(ctx context.Context, params *SchemaCreateParams) error SchemaDrop(ctx context.Context, params *SchemaDropParams) error SchemaGetExpired(ctx context.Context, params *SchemaGetExpiredParams) ([]string, error) @@ -730,19 +736,16 @@ type Row interface { Scan(dest ...any) error } -type Schema struct { - Name string -} - -type SchemaAttachParams struct { +type ReindexParams struct { + Index string Schema string } -type SchemaCreateParams struct { - Schema string +type Schema struct { + Name string } -type SchemaDetachParams struct { +type SchemaCreateParams struct { Schema string } diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 32bd4ea1..70e8ee57 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -814,6 +814,16 @@ func (e *Executor) QueryRow(ctx context.Context, sql string, args ...any) riverd return e.dbtx.QueryRowContext(ctx, sql, args...) } +func (e *Executor) Reindex(ctx context.Context, params *riverdriver.ReindexParams) error { + var maybeSchema string + if params.Schema != "" { + maybeSchema = params.Schema + "." + } + + _, err := e.dbtx.ExecContext(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+params.Index) + return interpretError(err) +} + func (e *Executor) SchemaCreate(ctx context.Context, params *riverdriver.SchemaCreateParams) error { _, err := e.dbtx.ExecContext(ctx, "CREATE SCHEMA "+params.Schema) return interpretError(err) diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 3173f909..6d87e7d1 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -804,6 +804,16 @@ func (e *Executor) QueryRow(ctx context.Context, sql string, args ...any) riverd return e.dbtx.QueryRow(ctx, sql, args...) } +func (e *Executor) Reindex(ctx context.Context, params *riverdriver.ReindexParams) error { + var maybeSchema string + if params.Schema != "" { + maybeSchema = params.Schema + "." + } + + _, err := e.dbtx.Exec(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+params.Index) + return interpretError(err) +} + func (e *Executor) SchemaCreate(ctx context.Context, params *riverdriver.SchemaCreateParams) error { _, err := e.dbtx.Exec(ctx, "CREATE SCHEMA "+params.Schema) return interpretError(err) diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 1816fd0c..f6ed09a0 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -1131,6 +1131,16 @@ func (e *Executor) QueryRow(ctx context.Context, sql string, args ...any) riverd return e.dbtx.QueryRowContext(ctx, sql, args...) } +func (e *Executor) Reindex(ctx context.Context, params *riverdriver.ReindexParams) error { + var maybeSchema string + if params.Schema != "" { + maybeSchema = params.Schema + "." + } + + _, err := e.dbtx.ExecContext(ctx, "REINDEX "+maybeSchema+params.Index) + return interpretError(err) +} + const sqliteTestDir = "./sqlite" func (e *Executor) SchemaCreate(ctx context.Context, params *riverdriver.SchemaCreateParams) error {