Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,8 +909,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// available, the client appears to have started even though it's completely
// non-functional. Here we try to make an initial assessment of health and
// return quickly in case of an apparent problem.
_, err := c.driver.GetExecutor().Exec(fetchCtx, "SELECT 1")
if err != nil {
if err := c.driver.GetExecutor().Exec(fetchCtx, "SELECT 1"); err != nil {
return fmt.Errorf("error making initial connection to database: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/river/riverbench/river_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,14 +486,14 @@ func (b *Benchmarker[TTx]) resetJobsTable(ctx context.Context) error {

switch b.driver.DatabaseName() {
case "postgres":
if _, err := b.driver.GetExecutor().Exec(ctx, "VACUUM FULL river_job"); err != nil {
if err := b.driver.GetExecutor().Exec(ctx, "VACUUM FULL river_job"); err != nil {
return fmt.Errorf("error vacuuming: %w", err)
}
case "sqlite":
// SQLite doesn't support `VACUUM FULL`, nor does it support vacuuming
// on a per-table basis. `VACUUM` vacuums the entire schema, which is
// okay in this case.
if _, err := b.driver.GetExecutor().Exec(ctx, "VACUUM"); err != nil {
if err := b.driver.GetExecutor().Exec(ctx, "VACUUM"); err != nil {
return fmt.Errorf("error vacuuming: %w", err)
}
default:
Expand Down
6 changes: 2 additions & 4 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,17 +459,15 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,

exec, _ := setup(ctx, t)

_, err := exec.Exec(ctx, "SELECT 1 + 2")
require.NoError(t, err)
require.NoError(t, exec.Exec(ctx, "SELECT 1 + 2"))
})

t.Run("WithArgs", func(t *testing.T) {
t.Parallel()

exec, _ := setup(ctx, t)

_, err := exec.Exec(ctx, "SELECT $1 || $2", "foo", "bar")
require.NoError(t, err)
require.NoError(t, exec.Exec(ctx, "SELECT $1 || $2", "foo", "bar"))
})
})

Expand Down
8 changes: 2 additions & 6 deletions internal/util/dbutil/db_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ func TestWithTx(t *testing.T) {
driver := riverpgxv5.New(nil)

err := dbutil.WithTx(ctx, driver.UnwrapExecutor(tx), func(ctx context.Context, execTx riverdriver.ExecutorTx) error {
_, err := execTx.Exec(ctx, "SELECT 1")
require.NoError(t, err)

require.NoError(t, execTx.Exec(ctx, "SELECT 1"))
return nil
})
require.NoError(t, err)
Expand All @@ -36,9 +34,7 @@ func TestWithTxV(t *testing.T) {
driver := riverpgxv5.New(nil)

ret, err := dbutil.WithTxV(ctx, driver.UnwrapExecutor(tx), func(ctx context.Context, execTx riverdriver.ExecutorTx) (int, error) {
_, err := execTx.Exec(ctx, "SELECT 1")
require.NoError(t, err)

require.NoError(t, execTx.Exec(ctx, "SELECT 1"))
return 7, nil
})
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ type Executor interface {
ColumnExists(ctx context.Context, params *ColumnExistsParams) (bool, error)

// Exec executes raw SQL. Used for migrations.
Exec(ctx context.Context, sql string, args ...any) (struct{}, error)
Exec(ctx context.Context, sql string, args ...any) error

JobCancel(ctx context.Context, params *JobCancelParams) (*rivertype.JobRow, error)
JobCountByState(ctx context.Context, params *JobCountByStateParams) (int, error)
Expand Down
13 changes: 5 additions & 8 deletions riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ func (e *Executor) ColumnExists(ctx context.Context, params *riverdriver.ColumnE
return exists, interpretError(err)
}

func (e *Executor) Exec(ctx context.Context, sql string, args ...any) (struct{}, error) {
func (e *Executor) Exec(ctx context.Context, sql string, args ...any) error {
_, err := e.dbtx.ExecContext(ctx, sql, args...)
return struct{}{}, interpretError(err)
return interpretError(err)
}

func (e *Executor) JobCancel(ctx context.Context, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) {
Expand Down Expand Up @@ -909,8 +909,7 @@ func (t *ExecutorSubTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, erro
}

nextSavepointNum := t.savepointNum + 1
_, err := t.Exec(ctx, fmt.Sprintf("SAVEPOINT %s%02d", savepointPrefix, nextSavepointNum))
if err != nil {
if err := t.Exec(ctx, fmt.Sprintf("SAVEPOINT %s%02d", savepointPrefix, nextSavepointNum)); err != nil {
return nil, err
}

Expand All @@ -926,8 +925,7 @@ func (t *ExecutorSubTx) Commit(ctx context.Context) error {

// Release destroys a savepoint, keeping all the effects of commands that
// were run within it (so it's effectively COMMIT for savepoints).
_, err := t.Exec(ctx, fmt.Sprintf("RELEASE %s%02d", savepointPrefix, t.savepointNum))
if err != nil {
if err := t.Exec(ctx, fmt.Sprintf("RELEASE %s%02d", savepointPrefix, t.savepointNum)); err != nil {
return err
}

Expand All @@ -941,8 +939,7 @@ func (t *ExecutorSubTx) Rollback(ctx context.Context) error {
return errors.New("tx is closed") // mirrors pgx's behavior for this condition
}

_, err := t.Exec(ctx, fmt.Sprintf("ROLLBACK TO %s%02d", savepointPrefix, t.savepointNum))
if err != nil {
if err := t.Exec(ctx, fmt.Sprintf("ROLLBACK TO %s%02d", savepointPrefix, t.savepointNum)); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ func (e *Executor) ColumnExists(ctx context.Context, params *riverdriver.ColumnE
return exists, interpretError(err)
}

func (e *Executor) Exec(ctx context.Context, sql string, args ...any) (struct{}, error) {
func (e *Executor) Exec(ctx context.Context, sql string, args ...any) error {
_, err := e.dbtx.Exec(ctx, sql, args...)
return struct{}{}, interpretError(err)
return interpretError(err)
}

func (e *Executor) JobCancel(ctx context.Context, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) {
Expand Down
13 changes: 5 additions & 8 deletions riverdriver/riversqlite/river_sqlite_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ func (e *Executor) ColumnExists(ctx context.Context, params *riverdriver.ColumnE
return exists > 0, nil
}

func (e *Executor) Exec(ctx context.Context, sql string, args ...any) (struct{}, error) {
func (e *Executor) Exec(ctx context.Context, sql string, args ...any) error {
_, err := e.dbtx.ExecContext(ctx, sql, args...)
return struct{}{}, interpretError(err)
return interpretError(err)
}

func (e *Executor) JobCancel(ctx context.Context, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) {
Expand Down Expand Up @@ -1256,8 +1256,7 @@ func (t *ExecutorSubTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, erro
}

nextSavepointNum := t.savepointNum + 1
_, err := t.Exec(ctx, fmt.Sprintf("SAVEPOINT %s%02d", savepointPrefix, nextSavepointNum))
if err != nil {
if err := t.Exec(ctx, fmt.Sprintf("SAVEPOINT %s%02d", savepointPrefix, nextSavepointNum)); err != nil {
return nil, err
}

Expand All @@ -1276,8 +1275,7 @@ func (t *ExecutorSubTx) Commit(ctx context.Context) error {

// Release destroys a savepoint, keeping all the effects of commands that
// were run within it (so it's effectively COMMIT for savepoints).
_, err := t.Exec(ctx, fmt.Sprintf("RELEASE %s%02d", savepointPrefix, t.savepointNum))
if err != nil {
if err := t.Exec(ctx, fmt.Sprintf("RELEASE %s%02d", savepointPrefix, t.savepointNum)); err != nil {
return err
}

Expand All @@ -1291,8 +1289,7 @@ func (t *ExecutorSubTx) Rollback(ctx context.Context) error {
return errors.New("tx is closed") // mirrors pgx's behavior for this condition
}

_, err := t.Exec(ctx, fmt.Sprintf("ROLLBACK TO %s%02d", savepointPrefix, t.savepointNum))
if err != nil {
if err := t.Exec(ctx, fmt.Sprintf("ROLLBACK TO %s%02d", savepointPrefix, t.savepointNum)); err != nil {
return err
}

Expand Down
3 changes: 1 addition & 2 deletions rivermigrate/river_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,7 @@ func (m *Migrator[TTx]) applyMigrations(ctx context.Context, exec riverdriver.Ex
// a commit on a preexisting operation (such as adding an enum value to be
// used in an immutable function) cannot succeed.
err := dbutil.WithTx(ctx, exec, func(ctx context.Context, exec riverdriver.ExecutorTx) error {
_, err := exec.Exec(ctx, sql)
if err != nil {
if err := exec.Exec(ctx, sql); err != nil {
return fmt.Errorf("error applying version %03d [%s]: %w",
versionBundle.Version, strings.ToUpper(string(direction)), err)
}
Expand Down
3 changes: 1 addition & 2 deletions rivermigrate/river_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,8 +972,7 @@ func buildTestMigrationsBundle(t *testing.T) *testMigrationsBundle {
// continue to use the original transaction.
func dbExecError(ctx context.Context, exec riverdriver.Executor, sql string) error {
return dbutil.WithTx(ctx, exec, func(ctx context.Context, exec riverdriver.ExecutorTx) error {
_, err := exec.Exec(ctx, sql)
return err
return exec.Exec(ctx, sql)
})
}

Expand Down