diff --git a/client.go b/client.go index 3d7a8155..2277068d 100644 --- a/client.go +++ b/client.go @@ -909,8 +909,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error { // available, the client appears to have started even though it's completely // non-functional. Here we try to make an initial assessment of health and // return quickly in case of an apparent problem. - _, err := c.driver.GetExecutor().Exec(fetchCtx, "SELECT 1") - if err != nil { + if err := c.driver.GetExecutor().Exec(fetchCtx, "SELECT 1"); err != nil { return fmt.Errorf("error making initial connection to database: %w", err) } diff --git a/cmd/river/riverbench/river_bench.go b/cmd/river/riverbench/river_bench.go index aa2e1863..e5a0d868 100644 --- a/cmd/river/riverbench/river_bench.go +++ b/cmd/river/riverbench/river_bench.go @@ -486,14 +486,14 @@ func (b *Benchmarker[TTx]) resetJobsTable(ctx context.Context) error { switch b.driver.DatabaseName() { case "postgres": - if _, err := b.driver.GetExecutor().Exec(ctx, "VACUUM FULL river_job"); err != nil { + if err := b.driver.GetExecutor().Exec(ctx, "VACUUM FULL river_job"); err != nil { return fmt.Errorf("error vacuuming: %w", err) } case "sqlite": // SQLite doesn't support `VACUUM FULL`, nor does it support vacuuming // on a per-table basis. `VACUUM` vacuums the entire schema, which is // okay in this case. - if _, err := b.driver.GetExecutor().Exec(ctx, "VACUUM"); err != nil { + if err := b.driver.GetExecutor().Exec(ctx, "VACUUM"); err != nil { return fmt.Errorf("error vacuuming: %w", err) } default: diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 9c305e7a..810a203f 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -459,8 +459,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, exec, _ := setup(ctx, t) - _, err := exec.Exec(ctx, "SELECT 1 + 2") - require.NoError(t, err) + require.NoError(t, exec.Exec(ctx, "SELECT 1 + 2")) }) t.Run("WithArgs", func(t *testing.T) { @@ -468,8 +467,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, exec, _ := setup(ctx, t) - _, err := exec.Exec(ctx, "SELECT $1 || $2", "foo", "bar") - require.NoError(t, err) + require.NoError(t, exec.Exec(ctx, "SELECT $1 || $2", "foo", "bar")) }) }) diff --git a/internal/util/dbutil/db_util_test.go b/internal/util/dbutil/db_util_test.go index 7f16404a..2c9f9fa3 100644 --- a/internal/util/dbutil/db_util_test.go +++ b/internal/util/dbutil/db_util_test.go @@ -20,9 +20,7 @@ func TestWithTx(t *testing.T) { driver := riverpgxv5.New(nil) err := dbutil.WithTx(ctx, driver.UnwrapExecutor(tx), func(ctx context.Context, execTx riverdriver.ExecutorTx) error { - _, err := execTx.Exec(ctx, "SELECT 1") - require.NoError(t, err) - + require.NoError(t, execTx.Exec(ctx, "SELECT 1")) return nil }) require.NoError(t, err) @@ -36,9 +34,7 @@ func TestWithTxV(t *testing.T) { driver := riverpgxv5.New(nil) ret, err := dbutil.WithTxV(ctx, driver.UnwrapExecutor(tx), func(ctx context.Context, execTx riverdriver.ExecutorTx) (int, error) { - _, err := execTx.Exec(ctx, "SELECT 1") - require.NoError(t, err) - + require.NoError(t, execTx.Exec(ctx, "SELECT 1")) return 7, nil }) require.NoError(t, err) diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index b84b06a2..c5d4bb86 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -181,7 +181,7 @@ type Executor interface { ColumnExists(ctx context.Context, params *ColumnExistsParams) (bool, error) // Exec executes raw SQL. Used for migrations. - Exec(ctx context.Context, sql string, args ...any) (struct{}, error) + Exec(ctx context.Context, sql string, args ...any) error JobCancel(ctx context.Context, params *JobCancelParams) (*rivertype.JobRow, error) JobCountByState(ctx context.Context, params *JobCountByStateParams) (int, error) diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 70e8ee57..6313b98c 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -134,9 +134,9 @@ func (e *Executor) ColumnExists(ctx context.Context, params *riverdriver.ColumnE return exists, interpretError(err) } -func (e *Executor) Exec(ctx context.Context, sql string, args ...any) (struct{}, error) { +func (e *Executor) Exec(ctx context.Context, sql string, args ...any) error { _, err := e.dbtx.ExecContext(ctx, sql, args...) - return struct{}{}, interpretError(err) + return interpretError(err) } func (e *Executor) JobCancel(ctx context.Context, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) { @@ -909,8 +909,7 @@ func (t *ExecutorSubTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, erro } nextSavepointNum := t.savepointNum + 1 - _, err := t.Exec(ctx, fmt.Sprintf("SAVEPOINT %s%02d", savepointPrefix, nextSavepointNum)) - if err != nil { + if err := t.Exec(ctx, fmt.Sprintf("SAVEPOINT %s%02d", savepointPrefix, nextSavepointNum)); err != nil { return nil, err } @@ -926,8 +925,7 @@ func (t *ExecutorSubTx) Commit(ctx context.Context) error { // Release destroys a savepoint, keeping all the effects of commands that // were run within it (so it's effectively COMMIT for savepoints). - _, err := t.Exec(ctx, fmt.Sprintf("RELEASE %s%02d", savepointPrefix, t.savepointNum)) - if err != nil { + if err := t.Exec(ctx, fmt.Sprintf("RELEASE %s%02d", savepointPrefix, t.savepointNum)); err != nil { return err } @@ -941,8 +939,7 @@ func (t *ExecutorSubTx) Rollback(ctx context.Context) error { return errors.New("tx is closed") // mirrors pgx's behavior for this condition } - _, err := t.Exec(ctx, fmt.Sprintf("ROLLBACK TO %s%02d", savepointPrefix, t.savepointNum)) - if err != nil { + if err := t.Exec(ctx, fmt.Sprintf("ROLLBACK TO %s%02d", savepointPrefix, t.savepointNum)); err != nil { return err } diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 6d87e7d1..f816b855 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -143,9 +143,9 @@ func (e *Executor) ColumnExists(ctx context.Context, params *riverdriver.ColumnE return exists, interpretError(err) } -func (e *Executor) Exec(ctx context.Context, sql string, args ...any) (struct{}, error) { +func (e *Executor) Exec(ctx context.Context, sql string, args ...any) error { _, err := e.dbtx.Exec(ctx, sql, args...) - return struct{}{}, interpretError(err) + return interpretError(err) } func (e *Executor) JobCancel(ctx context.Context, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) { diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index f6ed09a0..138c9e7d 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -182,9 +182,9 @@ func (e *Executor) ColumnExists(ctx context.Context, params *riverdriver.ColumnE return exists > 0, nil } -func (e *Executor) Exec(ctx context.Context, sql string, args ...any) (struct{}, error) { +func (e *Executor) Exec(ctx context.Context, sql string, args ...any) error { _, err := e.dbtx.ExecContext(ctx, sql, args...) - return struct{}{}, interpretError(err) + return interpretError(err) } func (e *Executor) JobCancel(ctx context.Context, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) { @@ -1256,8 +1256,7 @@ func (t *ExecutorSubTx) Begin(ctx context.Context) (riverdriver.ExecutorTx, erro } nextSavepointNum := t.savepointNum + 1 - _, err := t.Exec(ctx, fmt.Sprintf("SAVEPOINT %s%02d", savepointPrefix, nextSavepointNum)) - if err != nil { + if err := t.Exec(ctx, fmt.Sprintf("SAVEPOINT %s%02d", savepointPrefix, nextSavepointNum)); err != nil { return nil, err } @@ -1276,8 +1275,7 @@ func (t *ExecutorSubTx) Commit(ctx context.Context) error { // Release destroys a savepoint, keeping all the effects of commands that // were run within it (so it's effectively COMMIT for savepoints). - _, err := t.Exec(ctx, fmt.Sprintf("RELEASE %s%02d", savepointPrefix, t.savepointNum)) - if err != nil { + if err := t.Exec(ctx, fmt.Sprintf("RELEASE %s%02d", savepointPrefix, t.savepointNum)); err != nil { return err } @@ -1291,8 +1289,7 @@ func (t *ExecutorSubTx) Rollback(ctx context.Context) error { return errors.New("tx is closed") // mirrors pgx's behavior for this condition } - _, err := t.Exec(ctx, fmt.Sprintf("ROLLBACK TO %s%02d", savepointPrefix, t.savepointNum)) - if err != nil { + if err := t.Exec(ctx, fmt.Sprintf("ROLLBACK TO %s%02d", savepointPrefix, t.savepointNum)); err != nil { return err } diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index fb70afc1..503e8b5d 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -602,8 +602,7 @@ func (m *Migrator[TTx]) applyMigrations(ctx context.Context, exec riverdriver.Ex // a commit on a preexisting operation (such as adding an enum value to be // used in an immutable function) cannot succeed. err := dbutil.WithTx(ctx, exec, func(ctx context.Context, exec riverdriver.ExecutorTx) error { - _, err := exec.Exec(ctx, sql) - if err != nil { + if err := exec.Exec(ctx, sql); err != nil { return fmt.Errorf("error applying version %03d [%s]: %w", versionBundle.Version, strings.ToUpper(string(direction)), err) } diff --git a/rivermigrate/river_migrate_test.go b/rivermigrate/river_migrate_test.go index 276834a4..d9588c46 100644 --- a/rivermigrate/river_migrate_test.go +++ b/rivermigrate/river_migrate_test.go @@ -972,8 +972,7 @@ func buildTestMigrationsBundle(t *testing.T) *testMigrationsBundle { // continue to use the original transaction. func dbExecError(ctx context.Context, exec riverdriver.Executor, sql string) error { return dbutil.WithTx(ctx, exec, func(ctx context.Context, exec riverdriver.ExecutorTx) error { - _, err := exec.Exec(ctx, sql) - return err + return exec.Exec(ctx, sql) }) }