From e44ebc6857a6d472ba520bde057175e614d2ab6e Mon Sep 17 00:00:00 2001 From: Brandur Date: Tue, 20 May 2025 19:20:04 -0700 Subject: [PATCH] Push `REINDEX` down into driver so reindexer works on SQLite I realized when #909 that the reindexer isn't currently compatible with the SQLite driver because SQLite's `REINDEX` syntax is a little different, and it doesn't support the `CONCURRENTLY` keyword. The reindexer runs once an hour so nothing caught the problem. Here, push the reindex command down into the driver and add a test for it. This is a good time to do this since we're changing the driver interface by the necessity for the next version anyway so we're not breaking anything that's not already broken. One outstanding issue is that SQLite can't reindex concurrently so the reindexer would fully lock the database while it's reindexing the configured indexes. There's nothing that can be done about this for now, so we might have to recommend that users running a larger database disable the reindexer if they're using SQLite. --- internal/maintenance/reindexer.go | 8 +------- .../riverdrivertest/riverdrivertest.go | 14 ++++++++++++++ riverdriver/river_driver_interface.go | 19 +++++++++++-------- .../river_database_sql_driver.go | 10 ++++++++++ riverdriver/riverpgxv5/river_pgx_v5_driver.go | 10 ++++++++++ .../riversqlite/river_sqlite_driver.go | 10 ++++++++++ 6 files changed, 56 insertions(+), 15 deletions(-) diff --git a/internal/maintenance/reindexer.go b/internal/maintenance/reindexer.go index 9f868f8b..5e379bb3 100644 --- a/internal/maintenance/reindexer.go +++ b/internal/maintenance/reindexer.go @@ -162,13 +162,7 @@ func (s *Reindexer) reindexOne(ctx context.Context, indexName string) error { ctx, cancel := context.WithTimeout(ctx, s.Config.Timeout) defer cancel() - var maybeSchema string - if s.Config.Schema != "" { - maybeSchema = s.Config.Schema + "." - } - - _, err := s.exec.Exec(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+indexName) - if err != nil { + if err := s.exec.Reindex(ctx, &riverdriver.ReindexParams{Schema: s.Config.Schema, Index: indexName}); err != nil { return err } diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 3a862802..9c305e7a 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -3826,6 +3826,20 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.Equal(t, "foo", fieldFoo) }) + t.Run("Reindex", func(t *testing.T) { + t.Parallel() + + // Postgres runs the reindex with `CONCURRENTLY` so this must use a full + // schema rather than a transaction block. + driver, schema := driverWithSchema(ctx, t, nil) + + err := driver.GetExecutor().Reindex(ctx, &riverdriver.ReindexParams{ + Index: "river_job_kind", + Schema: schema, + }) + require.NoError(t, err) + }) + t.Run("SchemaGetExpired", func(t *testing.T) { t.Parallel() diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 6bfb8f27..b84b06a2 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -245,6 +245,12 @@ type Executor interface { QueueResume(ctx context.Context, params *QueueResumeParams) error QueueUpdate(ctx context.Context, params *QueueUpdateParams) (*rivertype.Queue, error) QueryRow(ctx context.Context, sql string, args ...any) Row + + // Reindex reindexes a database index. This abstraction is a little leaky + // right now because Postgres runs this `CONCURRENTLY` and that's not + // possible in SQLite. + Reindex(ctx context.Context, params *ReindexParams) error + SchemaCreate(ctx context.Context, params *SchemaCreateParams) error SchemaDrop(ctx context.Context, params *SchemaDropParams) error SchemaGetExpired(ctx context.Context, params *SchemaGetExpiredParams) ([]string, error) @@ -730,19 +736,16 @@ type Row interface { Scan(dest ...any) error } -type Schema struct { - Name string -} - -type SchemaAttachParams struct { +type ReindexParams struct { + Index string Schema string } -type SchemaCreateParams struct { - Schema string +type Schema struct { + Name string } -type SchemaDetachParams struct { +type SchemaCreateParams struct { Schema string } diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 32bd4ea1..70e8ee57 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -814,6 +814,16 @@ func (e *Executor) QueryRow(ctx context.Context, sql string, args ...any) riverd return e.dbtx.QueryRowContext(ctx, sql, args...) } +func (e *Executor) Reindex(ctx context.Context, params *riverdriver.ReindexParams) error { + var maybeSchema string + if params.Schema != "" { + maybeSchema = params.Schema + "." + } + + _, err := e.dbtx.ExecContext(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+params.Index) + return interpretError(err) +} + func (e *Executor) SchemaCreate(ctx context.Context, params *riverdriver.SchemaCreateParams) error { _, err := e.dbtx.ExecContext(ctx, "CREATE SCHEMA "+params.Schema) return interpretError(err) diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 3173f909..6d87e7d1 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -804,6 +804,16 @@ func (e *Executor) QueryRow(ctx context.Context, sql string, args ...any) riverd return e.dbtx.QueryRow(ctx, sql, args...) } +func (e *Executor) Reindex(ctx context.Context, params *riverdriver.ReindexParams) error { + var maybeSchema string + if params.Schema != "" { + maybeSchema = params.Schema + "." + } + + _, err := e.dbtx.Exec(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+params.Index) + return interpretError(err) +} + func (e *Executor) SchemaCreate(ctx context.Context, params *riverdriver.SchemaCreateParams) error { _, err := e.dbtx.Exec(ctx, "CREATE SCHEMA "+params.Schema) return interpretError(err) diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 1816fd0c..f6ed09a0 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -1131,6 +1131,16 @@ func (e *Executor) QueryRow(ctx context.Context, sql string, args ...any) riverd return e.dbtx.QueryRowContext(ctx, sql, args...) } +func (e *Executor) Reindex(ctx context.Context, params *riverdriver.ReindexParams) error { + var maybeSchema string + if params.Schema != "" { + maybeSchema = params.Schema + "." + } + + _, err := e.dbtx.ExecContext(ctx, "REINDEX "+maybeSchema+params.Index) + return interpretError(err) +} + const sqliteTestDir = "./sqlite" func (e *Executor) SchemaCreate(ctx context.Context, params *riverdriver.SchemaCreateParams) error {