Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions internal/maintenance/reindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,7 @@ func (s *Reindexer) reindexOne(ctx context.Context, indexName string) error {
ctx, cancel := context.WithTimeout(ctx, s.Config.Timeout)
defer cancel()

var maybeSchema string
if s.Config.Schema != "" {
maybeSchema = s.Config.Schema + "."
}

_, err := s.exec.Exec(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+indexName)
if err != nil {
if err := s.exec.Reindex(ctx, &riverdriver.ReindexParams{Schema: s.Config.Schema, Index: indexName}); err != nil {
return err
}

Expand Down
14 changes: 14 additions & 0 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -3826,6 +3826,20 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
require.Equal(t, "foo", fieldFoo)
})

t.Run("Reindex", func(t *testing.T) {
t.Parallel()

// Postgres runs the reindex with `CONCURRENTLY` so this must use a full
// schema rather than a transaction block.
driver, schema := driverWithSchema(ctx, t, nil)

err := driver.GetExecutor().Reindex(ctx, &riverdriver.ReindexParams{
Index: "river_job_kind",
Schema: schema,
})
require.NoError(t, err)
})

t.Run("SchemaGetExpired", func(t *testing.T) {
t.Parallel()

Expand Down
19 changes: 11 additions & 8 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@ type Executor interface {
QueueResume(ctx context.Context, params *QueueResumeParams) error
QueueUpdate(ctx context.Context, params *QueueUpdateParams) (*rivertype.Queue, error)
QueryRow(ctx context.Context, sql string, args ...any) Row

// Reindex reindexes a database index. This abstraction is a little leaky
// right now because Postgres runs this `CONCURRENTLY` and that's not
// possible in SQLite.
Reindex(ctx context.Context, params *ReindexParams) error

SchemaCreate(ctx context.Context, params *SchemaCreateParams) error
SchemaDrop(ctx context.Context, params *SchemaDropParams) error
SchemaGetExpired(ctx context.Context, params *SchemaGetExpiredParams) ([]string, error)
Expand Down Expand Up @@ -730,19 +736,16 @@ type Row interface {
Scan(dest ...any) error
}

type Schema struct {
Name string
}

type SchemaAttachParams struct {
type ReindexParams struct {
Index string
Schema string
}

type SchemaCreateParams struct {
Schema string
type Schema struct {
Name string
}

type SchemaDetachParams struct {
type SchemaCreateParams struct {
Schema string
}

Expand Down
10 changes: 10 additions & 0 deletions riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,16 @@ func (e *Executor) QueryRow(ctx context.Context, sql string, args ...any) riverd
return e.dbtx.QueryRowContext(ctx, sql, args...)
}

func (e *Executor) Reindex(ctx context.Context, params *riverdriver.ReindexParams) error {
var maybeSchema string
if params.Schema != "" {
maybeSchema = params.Schema + "."
}

_, err := e.dbtx.ExecContext(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+params.Index)
return interpretError(err)
}

func (e *Executor) SchemaCreate(ctx context.Context, params *riverdriver.SchemaCreateParams) error {
_, err := e.dbtx.ExecContext(ctx, "CREATE SCHEMA "+params.Schema)
return interpretError(err)
Expand Down
10 changes: 10 additions & 0 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,16 @@ func (e *Executor) QueryRow(ctx context.Context, sql string, args ...any) riverd
return e.dbtx.QueryRow(ctx, sql, args...)
}

func (e *Executor) Reindex(ctx context.Context, params *riverdriver.ReindexParams) error {
var maybeSchema string
if params.Schema != "" {
maybeSchema = params.Schema + "."
}

_, err := e.dbtx.Exec(ctx, "REINDEX INDEX CONCURRENTLY "+maybeSchema+params.Index)
return interpretError(err)
}

func (e *Executor) SchemaCreate(ctx context.Context, params *riverdriver.SchemaCreateParams) error {
_, err := e.dbtx.Exec(ctx, "CREATE SCHEMA "+params.Schema)
return interpretError(err)
Expand Down
10 changes: 10 additions & 0 deletions riverdriver/riversqlite/river_sqlite_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,16 @@ func (e *Executor) QueryRow(ctx context.Context, sql string, args ...any) riverd
return e.dbtx.QueryRowContext(ctx, sql, args...)
}

func (e *Executor) Reindex(ctx context.Context, params *riverdriver.ReindexParams) error {
var maybeSchema string
if params.Schema != "" {
maybeSchema = params.Schema + "."
}

_, err := e.dbtx.ExecContext(ctx, "REINDEX "+maybeSchema+params.Index)
return interpretError(err)
}

const sqliteTestDir = "./sqlite"

func (e *Executor) SchemaCreate(ctx context.Context, params *riverdriver.SchemaCreateParams) error {
Expand Down
Loading