diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a9669af..8e33948c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Preliminary River driver for SQLite (`riverdriver/riversqlite`). This driver seems to produce good results as judged by the test suite, but so far has minimal real world vetting. Try it and let us know how it works out. [PR #870](https://github.com/riverqueue/river/pull/870). - CLI `river migrate-get` now takes a `--schema` option to inject a custom schema into dumped migrations and schema comments are hidden if `--schema` option isn't provided. [PR #903](https://github.com/riverqueue/river/pull/903). - Added `riverlog.NewMiddlewareCustomContext` that makes the use of `riverlog` job-persisted logging possible with non-slog loggers. [PR #919](https://github.com/riverqueue/river/pull/919). +- Added `RequireInsertedOpts.Schema`, allowing an explicit schema to be set when asserting on job inserts with `rivertest`. [PR #926](https://github.com/riverqueue/river/pull/926). ### Changed diff --git a/rivertest/example_require_inserted_test.go b/rivertest/example_require_inserted_test.go index 28be7a14..7bb9d2e9 100644 --- a/rivertest/example_require_inserted_test.go +++ b/rivertest/example_require_inserted_test.go @@ -43,7 +43,10 @@ func Example_requireInserted() { workers := river.NewWorkers() river.AddWorker(workers, &RequiredWorker{}) - schema := riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil) + var ( + schema = riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil) + schemaOpts = &rivertest.RequireInsertedOpts{Schema: schema} + ) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), @@ -72,12 +75,7 @@ func Example_requireInserted() { // *testing.T that comes from a test's argument. t := &testing.T{} - // This is needed because rivertest does not yet support an injected schema. - if _, err := tx.Exec(ctx, "SET search_path TO "+schema); err != nil { - panic(err) - } - - job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil) + job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, schemaOpts) fmt.Printf("Test passed with message: %s\n", job.Args.Message) // Verify the same job again, and this time that it was inserted at the @@ -85,19 +83,15 @@ func Example_requireInserted() { _ = rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, &rivertest.RequireInsertedOpts{ Priority: 1, Queue: river.QueueDefault, + Schema: schema, }) - // Due to some refactoring to make schemas injectable, we don't yet have a - // way of injecting a schema at the pool level. The rivertest API will need - // to be expanded to allow it. - /* - // Insert and verify one on a pool instead of transaction. - _, err = riverClient.Insert(ctx, &RequiredArgs{Message: "Hello from pool."}, nil) - if err != nil { - panic(err) - } - _ = rivertest.RequireInserted(ctx, t, riverpgxv5.New(dbPool), &RequiredArgs{}, nil) - */ + // Insert and verify one on a pool instead of transaction. + _, err = riverClient.Insert(ctx, &RequiredArgs{Message: "Hello from pool."}, nil) + if err != nil { + panic(err) + } + _ = rivertest.RequireInserted(ctx, t, riverpgxv5.New(dbPool), &RequiredArgs{}, schemaOpts) // Output: // Test passed with message: Hello. diff --git a/rivertest/example_require_many_inserted_test.go b/rivertest/example_require_many_inserted_test.go index d295595c..7c193410 100644 --- a/rivertest/example_require_many_inserted_test.go +++ b/rivertest/example_require_many_inserted_test.go @@ -61,7 +61,10 @@ func Example_requireManyInserted() { river.AddWorker(workers, &FirstRequiredWorker{}) river.AddWorker(workers, &SecondRequiredWorker{}) - schema := riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil) + var ( + schema = riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil) + schemaOpts = &rivertest.RequireInsertedOpts{Schema: schema} + ) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), @@ -97,13 +100,8 @@ func Example_requireManyInserted() { // *testing.T that comes from a test's argument. t := &testing.T{} - // This is needed because rivertest does not yet support an injected schema. - if _, err := tx.Exec(ctx, "SET search_path TO "+schema); err != nil { - panic(err) - } - jobs := rivertest.RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{ - {Args: &FirstRequiredArgs{}}, + {Args: &FirstRequiredArgs{}, Opts: schemaOpts}, {Args: &SecondRequiredArgs{}}, {Args: &FirstRequiredArgs{}}, }) @@ -117,22 +115,18 @@ func Example_requireManyInserted() { {Args: &SecondRequiredArgs{}, Opts: &rivertest.RequireInsertedOpts{ Priority: 1, Queue: river.QueueDefault, + Schema: schema, }}, }) - // Due to some refactoring to make schemas injectable, we don't yet have a - // way of injecting a schema at the pool level. The rivertest API will need - // to be expanded to allow it. - /* - // Insert and verify one on a pool instead of transaction. - _, err = riverClient.Insert(ctx, &FirstRequiredArgs{Message: "Hello from pool."}, nil) - if err != nil { - panic(err) - } - _ = rivertest.RequireManyInserted(ctx, t, riverpgxv5.New(dbPool), []rivertest.ExpectedJob{ - {Args: &FirstRequiredArgs{}}, - }) - */ + // Insert and verify one on a pool instead of transaction. + _, err = riverClient.Insert(ctx, &FirstRequiredArgs{Message: "Hello from pool."}, nil) + if err != nil { + panic(err) + } + _ = rivertest.RequireManyInserted(ctx, t, riverpgxv5.New(dbPool), []rivertest.ExpectedJob{ + {Args: &FirstRequiredArgs{}, Opts: schemaOpts}, + }) // Output: // Job 0 args: {"message": "Hello from first."} diff --git a/rivertest/rivertest.go b/rivertest/rivertest.go index 5170534f..33f9d864 100644 --- a/rivertest/rivertest.go +++ b/rivertest/rivertest.go @@ -18,10 +18,6 @@ import ( "github.com/riverqueue/river/rivertype" ) -// A placeholder for empty schema placeholders that'll need to be fixed to -// something better at some point. -const emptySchema = "" - // testingT is an interface wrapper around *testing.T that's implemented by all // of *testing.T, *testing.F, and *testing.B. // @@ -72,6 +68,13 @@ type RequireInsertedOpts struct { // No assertion is made if left the zero value. ScheduledAt time.Time + // Schema is a non-standard Schema where River tables are located. All table + // references in assertion queries will use this value as a prefix. + // + // Defaults to empty, which causes the client to look for tables using the + // setting of Postgres `search_path`. + Schema string + // State is the expected state of the inserted job. // // No assertion is made if left the zero value. @@ -101,12 +104,12 @@ type RequireInsertedOpts struct { // to cover that case instead. func RequireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] { tb.Helper() - return requireInserted(ctx, tb, driver, emptySchema, expectedJob, opts) + return requireInserted(ctx, tb, driver, expectedJob, opts) } -func requireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, driver TDriver, schema string, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] { +func requireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] { t.Helper() - actualArgs, err := requireInsertedErr[TDriver](ctx, t, driver.GetExecutor(), schema, expectedJob, opts) + actualArgs, err := requireInsertedErr[TDriver](ctx, t, driver.GetExecutor(), expectedJob, opts) if err != nil { failure(t, "Internal failure: %s", err) } @@ -131,7 +134,7 @@ func requireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobAr // to cover that case instead. func RequireInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] { tb.Helper() - return requireInsertedTx[TDriver](ctx, tb, tx, emptySchema, expectedJob, opts) + return requireInsertedTx[TDriver](ctx, tb, tx, expectedJob, opts) } // Internal function used by the tests so that the exported version can take @@ -139,19 +142,24 @@ func RequireInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.Job // // Also takes a schema for testing purposes, which I haven't quite figured out // how to get into the public API yet. -func requireInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, tx TTx, schema string, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] { +func requireInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] { t.Helper() var driver TDriver - actualArgs, err := requireInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), schema, expectedJob, opts) + actualArgs, err := requireInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), expectedJob, opts) if err != nil { failure(t, "Internal failure: %s", err) } return actualArgs } -func requireInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, exec riverdriver.Executor, schema string, expectedJob TArgs, opts *RequireInsertedOpts) (*river.Job[TArgs], error) { +func requireInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, exec riverdriver.Executor, expectedJob TArgs, opts *RequireInsertedOpts) (*river.Job[TArgs], error) { t.Helper() + var schema string + if opts != nil { + schema = opts.Schema + } + // Returned ordered by ID. jobRows, err := exec.JobGetByKindMany(ctx, &riverdriver.JobGetByKindManyParams{ Kind: []string{expectedJob.Kind()}, @@ -205,12 +213,12 @@ func requireInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.Jo // the given opts. func RequireNotInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts) { tb.Helper() - requireNotInserted(ctx, tb, driver, emptySchema, expectedJob, opts) + requireNotInserted(ctx, tb, driver, expectedJob, opts) } -func requireNotInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, driver TDriver, schema string, expectedJob TArgs, opts *RequireInsertedOpts) { +func requireNotInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts) { t.Helper() - err := requireNotInsertedErr[TDriver](ctx, t, driver.GetExecutor(), schema, expectedJob, opts) + err := requireNotInsertedErr[TDriver](ctx, t, driver.GetExecutor(), expectedJob, opts) if err != nil { failure(t, "Internal failure: %s", err) } @@ -234,7 +242,7 @@ func requireNotInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.Jo // the given opts. func RequireNotInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts) { tb.Helper() - requireNotInsertedTx[TDriver](ctx, tb, tx, emptySchema, expectedJob, opts) + requireNotInsertedTx[TDriver](ctx, tb, tx, expectedJob, opts) } // Internal function used by the tests so that the exported version can take @@ -242,18 +250,23 @@ func RequireNotInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river. // // Also takes a schema for testing purposes, which I haven't quite figured out // how to get into the public API yet. -func requireNotInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, tx TTx, schema string, expectedJob TArgs, opts *RequireInsertedOpts) { +func requireNotInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts) { t.Helper() var driver TDriver - err := requireNotInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), schema, expectedJob, opts) + err := requireNotInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), expectedJob, opts) if err != nil { failure(t, "Internal failure: %s", err) } } -func requireNotInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, exec riverdriver.Executor, schema string, expectedJob TArgs, opts *RequireInsertedOpts) error { +func requireNotInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, exec riverdriver.Executor, expectedJob TArgs, opts *RequireInsertedOpts) error { t.Helper() + var schema string + if opts != nil { + schema = opts.Schema + } + // Returned ordered by ID. jobRows, err := exec.JobGetByKindMany(ctx, &riverdriver.JobGetByKindManyParams{ Kind: []string{expectedJob.Kind()}, @@ -322,14 +335,20 @@ type ExpectedJob struct { // the number specified, and will fail in case this expectation isn't met. So if // a job of a certain kind is emitted multiple times, it must be expected // multiple times. +// +// If RequireInsertedOpts.Schema is used, it may be set only in the first +// expectation's options (and all expectations will use that schema), or the +// same schema may be set in every expectation. Setting a non-empty schema after +// the first expectation if the first's was empty is not allowed, and neither is +// mixing and matching schemas between options. func RequireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, driver TDriver, expectedJobs []ExpectedJob) []*rivertype.JobRow { tb.Helper() - return requireManyInserted(ctx, tb, driver, string(emptySchema), expectedJobs) + return requireManyInserted(ctx, tb, driver, expectedJobs) } -func requireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, driver TDriver, schema string, expectedJobs []ExpectedJob) []*rivertype.JobRow { +func requireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, driver TDriver, expectedJobs []ExpectedJob) []*rivertype.JobRow { t.Helper() - actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.GetExecutor(), schema, expectedJobs) + actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.GetExecutor(), expectedJobs) if err != nil { failure(t, "Internal failure: %s", err) } @@ -356,9 +375,15 @@ func requireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.C // the number specified, and will fail in case this expectation isn't met. So if // a job of a certain kind is emitted multiple times, it must be expected // multiple times. +// +// If RequireInsertedOpts.Schema is used, it may be set only in the first +// expectation's options (and all expectations will use that schema), or the +// same schema may be set in every expectation. Setting a non-empty schema after +// the first expectation if the first's was empty is not allowed, and neither is +// mixing and matching schemas between options. func RequireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, tx TTx, expectedJobs []ExpectedJob) []*rivertype.JobRow { tb.Helper() - return requireManyInsertedTx[TDriver](ctx, tb, tx, emptySchema, expectedJobs) + return requireManyInsertedTx[TDriver](ctx, tb, tx, expectedJobs) } // Internal function used by the tests so that the exported version can take @@ -366,21 +391,45 @@ func RequireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context // // Also takes a schema for testing purposes, which I haven't quite figured out // how to get into the public API yet. -func requireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, tx TTx, schema string, expectedJobs []ExpectedJob) []*rivertype.JobRow { +func requireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, tx TTx, expectedJobs []ExpectedJob) []*rivertype.JobRow { t.Helper() var driver TDriver - actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), schema, expectedJobs) + actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), expectedJobs) if err != nil { failure(t, "Internal failure: %s", err) } return actualArgs } -func requireManyInsertedErr[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, exec riverdriver.Executor, schema string, expectedJobs []ExpectedJob) ([]*rivertype.JobRow, error) { +func requireManyInsertedErr[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, exec riverdriver.Executor, expectedJobs []ExpectedJob) ([]*rivertype.JobRow, error) { t.Helper() expectedArgsKinds := sliceutil.Map(expectedJobs, func(j ExpectedJob) string { return j.Args.Kind() }) + var schema string + if len(expectedJobs) > 0 && expectedJobs[0].Opts != nil { + schema = expectedJobs[0].Opts.Schema + } + + // For simplicity (and because I can't think of any reason anyone would need + // to do otherwise), require that if an explicit schema is being set that + // it's the same explicit schema for all options. Callers may specify the + // schema only once in the first expectation's options, or specify the same + // schema for all expectations' options, but they're not allowed to set a + // schema after the first expectation's options if it wasn't set in the + // first, and not allowed to mix and match schemas between options. + for i, expectedJob := range expectedJobs { + if opts := expectedJob.Opts; opts != nil { + if schema == "" && opts.Schema != "" || + schema != "" && opts.Schema != "" && schema != opts.Schema { + return nil, fmt.Errorf( + "when setting RequireInsertedOpts.Schema with RequireMany schema should be set only at index 0 or the same schema set for all options; "+ + "expectedJobs[0].Opts.Schema = %q, expectedJobs[%d].Opts.Schema = %q", + schema, i, opts.Schema) + } + } + } + // Returned ordered by ID. jobRows, err := exec.JobGetByKindMany(ctx, &riverdriver.JobGetByKindManyParams{ Kind: expectedArgsKinds, diff --git a/rivertest/rivertest_test.go b/rivertest/rivertest_test.go index 645c7b9a..e801e235 100644 --- a/rivertest/rivertest_test.go +++ b/rivertest/rivertest_test.go @@ -40,10 +40,11 @@ func TestRequireInserted(t *testing.T) { ctx := context.Background() type testBundle struct { - dbPool *pgxpool.Pool - driver *riverpgxv5.Driver - mockT *testutil.MockT - schema string + dbPool *pgxpool.Pool + driver *riverpgxv5.Driver + mockT *testutil.MockT + schema string + schemaOpts *RequireInsertedOpts } setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) { @@ -61,10 +62,11 @@ func TestRequireInserted(t *testing.T) { require.NoError(t, err) return riverClient, &testBundle{ - dbPool: dbPool, - driver: driver, - mockT: testutil.NewMockT(t), - schema: schema, + dbPool: dbPool, + driver: driver, + mockT: testutil.NewMockT(t), + schema: schema, + schemaOpts: &RequireInsertedOpts{Schema: schema}, } } @@ -76,7 +78,7 @@ func TestRequireInserted(t *testing.T) { _, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil) require.NoError(t, err) - job := requireInserted(ctx, t, bundle.driver, bundle.schema, &Job1Args{}, nil) + job := requireInserted(ctx, t, bundle.driver, &Job1Args{}, bundle.schemaOpts) require.False(t, bundle.mockT.Failed) require.Equal(t, "foo", job.Args.String) }) @@ -112,7 +114,7 @@ func TestRequireInsertedTx(t *testing.T) { _, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil) require.NoError(t, err) - job := requireInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, emptySchema, &Job1Args{}, nil) + job := requireInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, &Job1Args{}, nil) require.False(t, bundle.mockT.Failed) require.Equal(t, "foo", job.Args.String) }) @@ -128,11 +130,11 @@ func TestRequireInsertedTx(t *testing.T) { _, err = riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, nil) require.NoError(t, err) - job1 := requireInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, emptySchema, &Job1Args{}, nil) + job1 := requireInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, &Job1Args{}, nil) require.False(t, bundle.mockT.Failed) require.Equal(t, "foo", job1.Args.String) - job2 := requireInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, emptySchema, &Job2Args{}, nil) + job2 := requireInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, &Job2Args{}, nil) require.False(t, bundle.mockT.Failed) require.Equal(t, 123, job2.Args.Int) }) @@ -149,12 +151,12 @@ func TestRequireInsertedTx(t *testing.T) { require.NoError(t, err) // Visible in the original transaction. - job := requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, &Job1Args{}, nil) + job := requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job1Args{}, nil) require.False(t, bundle.mockT.Failed) require.Equal(t, "foo", job.Args.String) // Not visible in the second transaction. - _ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, otherTx, emptySchema, &Job1Args{}, nil) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, otherTx, &Job1Args{}, nil) require.True(t, bundle.mockT.Failed) }) @@ -166,7 +168,7 @@ func TestRequireInsertedTx(t *testing.T) { _, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil) require.NoError(t, err) - _ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, &Job2Args{}, nil) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job2Args{}, nil) require.True(t, bundle.mockT.Failed) require.Equal(t, failureString("No jobs found with kind: job2")+"\n", @@ -184,7 +186,7 @@ func TestRequireInsertedTx(t *testing.T) { }) require.NoError(t, err) - _ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, &Job1Args{}, nil) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job1Args{}, nil) require.True(t, bundle.mockT.Failed) require.Equal(t, failureString("More than one job found with kind: job1 (you might want RequireManyInserted instead)")+"\n", @@ -199,7 +201,7 @@ func TestRequireInsertedTx(t *testing.T) { _, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, nil) require.NoError(t, err) - _ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, &Job1Args{}, nil) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job1Args{}, nil) require.True(t, bundle.mockT.Failed) require.Equal(t, failureString("No jobs found with kind: job1")+"\n", @@ -239,7 +241,7 @@ func TestRequireInsertedTx(t *testing.T) { mockT := testutil.NewMockT(t) opts := sameOpts() opts.MaxAttempts = 77 - _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' max attempts 78 not equal to expected 77")+"\n", @@ -250,7 +252,7 @@ func TestRequireInsertedTx(t *testing.T) { mockT := testutil.NewMockT(t) opts := sameOpts() opts.Priority = 3 - _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' priority 2 not equal to expected 3")+"\n", @@ -261,7 +263,7 @@ func TestRequireInsertedTx(t *testing.T) { mockT := testutil.NewMockT(t) opts := sameOpts() opts.Queue = "wrong_queue" - _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' queue 'another_queue' not equal to expected 'wrong_queue'")+"\n", @@ -272,7 +274,7 @@ func TestRequireInsertedTx(t *testing.T) { mockT := testutil.NewMockT(t) opts := sameOpts() opts.ScheduledAt = testTime.Add(3*time.Minute + 23*time.Second + 123*time.Microsecond) - _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' scheduled at 2023-10-30T10:45:23.000123Z not equal to expected 2023-10-30T10:48:46.000246Z")+"\n", @@ -283,7 +285,7 @@ func TestRequireInsertedTx(t *testing.T) { mockT := testutil.NewMockT(t) opts := sameOpts() opts.State = rivertype.JobStateCancelled - _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' state 'scheduled' not equal to expected 'cancelled'")+"\n", @@ -294,7 +296,7 @@ func TestRequireInsertedTx(t *testing.T) { mockT := testutil.NewMockT(t) opts := sameOpts() opts.Tags = []string{"tag2"} - _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' tags [tag1] not equal to expected [tag2]")+"\n", @@ -306,7 +308,7 @@ func TestRequireInsertedTx(t *testing.T) { opts := emptyOpts() opts.MaxAttempts = job.MaxAttempts opts.Priority = job.Priority - _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.False(t, mockT.Failed, "Should have succeeded, but failed with: "+mockT.LogOutput()) }) @@ -315,7 +317,7 @@ func TestRequireInsertedTx(t *testing.T) { opts := sameOpts() opts.MaxAttempts = 77 opts.Priority = 3 - _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + _ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' max attempts 78 not equal to expected 77, priority 2 not equal to expected 3")+"\n", @@ -325,7 +327,7 @@ func TestRequireInsertedTx(t *testing.T) { t.Run("AllSameSucceeds", func(t *testing.T) { mockT := testutil.NewMockT(t) opts := sameOpts() - requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.False(t, mockT.Failed) }) }) @@ -339,10 +341,11 @@ func TestRequireNotInserted(t *testing.T) { ctx := context.Background() type testBundle struct { - dbPool *pgxpool.Pool - driver *riverpgxv5.Driver - mockT *testutil.MockT - schema string + dbPool *pgxpool.Pool + driver *riverpgxv5.Driver + mockT *testutil.MockT + schema string + schemaOpts *RequireInsertedOpts } setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) { @@ -360,10 +363,11 @@ func TestRequireNotInserted(t *testing.T) { require.NoError(t, err) return riverClient, &testBundle{ - dbPool: dbPool, - driver: driver, - mockT: testutil.NewMockT(t), - schema: schema, + dbPool: dbPool, + driver: driver, + mockT: testutil.NewMockT(t), + schema: schema, + schemaOpts: &RequireInsertedOpts{Schema: schema}, } } @@ -375,7 +379,7 @@ func TestRequireNotInserted(t *testing.T) { _, err := riverClient.Insert(ctx, Job2Args{Int: 123}, nil) require.NoError(t, err) - requireNotInserted(ctx, t, bundle.driver, bundle.schema, &Job1Args{}, nil) + requireNotInserted(ctx, t, bundle.driver, &Job1Args{}, bundle.schemaOpts) require.False(t, bundle.mockT.Failed) }) } @@ -410,7 +414,7 @@ func TestRequireNotInsertedTx(t *testing.T) { _, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, nil) require.NoError(t, err) - requireNotInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, emptySchema, &Job1Args{}, nil) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, &Job1Args{}, nil) require.False(t, bundle.mockT.Failed) }) @@ -419,10 +423,10 @@ func TestRequireNotInsertedTx(t *testing.T) { _, bundle := setup(t) - requireNotInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, emptySchema, &Job1Args{}, nil) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, &Job1Args{}, nil) require.False(t, bundle.mockT.Failed) - requireNotInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, emptySchema, &Job2Args{}, nil) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, t, bundle.tx, &Job2Args{}, nil) require.False(t, bundle.mockT.Failed) }) @@ -438,11 +442,11 @@ func TestRequireNotInsertedTx(t *testing.T) { require.NoError(t, err) // Not visible in the second transaction. - requireNotInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, otherTx, emptySchema, &Job1Args{}, nil) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, otherTx, &Job1Args{}, nil) require.False(t, bundle.mockT.Failed) // Visible in the original transaction. - requireNotInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, &Job1Args{}, nil) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job1Args{}, nil) require.True(t, bundle.mockT.Failed) }) @@ -451,7 +455,7 @@ func TestRequireNotInsertedTx(t *testing.T) { _, bundle := setup(t) - requireNotInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, &Job2Args{}, nil) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job2Args{}, nil) require.False(t, bundle.mockT.Failed) }) @@ -466,7 +470,7 @@ func TestRequireNotInsertedTx(t *testing.T) { }) require.NoError(t, err) - requireNotInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, &Job1Args{}, nil) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job1Args{}, nil) require.True(t, bundle.mockT.Failed) require.Equal(t, failureString("2 jobs found with kind, but expected to find none: job1")+"\n", @@ -481,7 +485,7 @@ func TestRequireNotInsertedTx(t *testing.T) { _, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, nil) require.NoError(t, err) - requireNotInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, &Job1Args{}, nil) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, &Job1Args{}, nil) require.False(t, bundle.mockT.Failed) }) @@ -518,7 +522,7 @@ func TestRequireNotInsertedTx(t *testing.T) { mockT := testutil.NewMockT(t) opts := emptyOpts() opts.MaxAttempts = job.MaxAttempts - requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' max attempts equal to excluded %d", job.MaxAttempts)+"\n", @@ -529,7 +533,7 @@ func TestRequireNotInsertedTx(t *testing.T) { mockT := testutil.NewMockT(t) opts := emptyOpts() opts.Priority = job.Priority - requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' priority equal to excluded %d", job.Priority)+"\n", @@ -540,7 +544,7 @@ func TestRequireNotInsertedTx(t *testing.T) { mockT := testutil.NewMockT(t) opts := emptyOpts() opts.Queue = job.Queue - requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' queue equal to excluded '%s'", job.Queue)+"\n", @@ -551,7 +555,7 @@ func TestRequireNotInsertedTx(t *testing.T) { mockT := testutil.NewMockT(t) opts := emptyOpts() opts.ScheduledAt = job.ScheduledAt - requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' scheduled at equal to excluded %s", opts.ScheduledAt.Format(rfc3339Micro))+"\n", @@ -562,7 +566,7 @@ func TestRequireNotInsertedTx(t *testing.T) { mockT := testutil.NewMockT(t) opts := emptyOpts() opts.State = job.State - requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' state equal to excluded '%s'", job.State)+"\n", @@ -573,7 +577,7 @@ func TestRequireNotInsertedTx(t *testing.T) { mockT := testutil.NewMockT(t) opts := emptyOpts() opts.Tags = job.Tags - requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' tags equal to excluded %+v", job.Tags)+"\n", @@ -585,7 +589,7 @@ func TestRequireNotInsertedTx(t *testing.T) { opts := emptyOpts() opts.MaxAttempts = job.MaxAttempts // one property matches job, but the other does not opts.Priority = 3 - requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.False(t, mockT.Failed, "Should have succeeded, but failed with: "+mockT.LogOutput()) }) @@ -594,7 +598,7 @@ func TestRequireNotInsertedTx(t *testing.T) { opts := emptyOpts() opts.MaxAttempts = job.MaxAttempts opts.Priority = job.Priority - requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' max attempts equal to excluded %d, priority equal to excluded %d", job.MaxAttempts, job.Priority)+"\n", @@ -604,7 +608,7 @@ func TestRequireNotInsertedTx(t *testing.T) { t.Run("AllSameFails", func(t *testing.T) { mockT := testutil.NewMockT(t) opts := sameOpts() - requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' max attempts equal to excluded %d, priority equal to excluded %d, queue equal to excluded '%s', scheduled at equal to excluded %s, state equal to excluded '%s', tags equal to excluded %+v", job.MaxAttempts, job.Priority, job.Queue, job.ScheduledAt.Format(rfc3339Micro), job.State, job.Tags)+"\n", @@ -620,7 +624,7 @@ func TestRequireNotInsertedTx(t *testing.T) { mockT := testutil.NewMockT(t) opts := emptyOpts() opts.Priority = 3 - requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, &Job2Args{}, opts) + requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts) require.True(t, mockT.Failed) require.Equal(t, failureString("Job with kind 'job2' priority equal to excluded %d", 3)+"\n", @@ -637,10 +641,11 @@ func TestRequireManyInserted(t *testing.T) { ctx := context.Background() type testBundle struct { - dbPool *pgxpool.Pool - driver *riverpgxv5.Driver - mockT *testutil.MockT - schema string + dbPool *pgxpool.Pool + driver *riverpgxv5.Driver + mockT *testutil.MockT + schema string + schemaOpts *RequireInsertedOpts } setup := func(t *testing.T) (*river.Client[pgx.Tx], *testBundle) { @@ -658,10 +663,11 @@ func TestRequireManyInserted(t *testing.T) { require.NoError(t, err) return riverClient, &testBundle{ - dbPool: dbPool, - driver: driver, - mockT: testutil.NewMockT(t), - schema: schema, + dbPool: dbPool, + driver: driver, + mockT: testutil.NewMockT(t), + schema: schema, + schemaOpts: &RequireInsertedOpts{Schema: schema}, } } @@ -673,12 +679,76 @@ func TestRequireManyInserted(t *testing.T) { _, err := riverClient.Insert(ctx, Job1Args{String: "foo"}, nil) require.NoError(t, err) - jobs := requireManyInserted(ctx, bundle.mockT, bundle.driver, bundle.schema, []ExpectedJob{ - {Args: &Job1Args{}}, + jobs := requireManyInserted(ctx, bundle.mockT, bundle.driver, []ExpectedJob{ + {Args: &Job1Args{}, Opts: bundle.schemaOpts}, }) require.False(t, bundle.mockT.Failed) require.Equal(t, "job1", jobs[0].Kind) }) + + t.Run("SchemaInAllOptsOkay", func(t *testing.T) { + t.Parallel() + + riverClient, bundle := setup(t) + + _, err := riverClient.InsertMany(ctx, []river.InsertManyParams{ + {Args: Job1Args{String: "foo"}}, + {Args: Job1Args{String: "bar"}}, + }) + require.NoError(t, err) + + jobs := requireManyInserted(ctx, bundle.mockT, bundle.driver, []ExpectedJob{ + {Args: &Job1Args{String: "foo"}, Opts: bundle.schemaOpts}, + {Args: &Job1Args{String: "bar"}, Opts: bundle.schemaOpts}, + }) + require.False(t, bundle.mockT.Failed) + require.Equal(t, "job1", jobs[0].Kind) + }) + + t.Run("SchemaOnlyInFirstOptsPositionOkay", func(t *testing.T) { + t.Parallel() + + riverClient, bundle := setup(t) + + _, err := riverClient.InsertMany(ctx, []river.InsertManyParams{ + {Args: Job1Args{String: "foo"}}, + {Args: Job1Args{String: "bar"}}, + }) + require.NoError(t, err) + + jobs := requireManyInserted(ctx, bundle.mockT, bundle.driver, []ExpectedJob{ + {Args: &Job1Args{String: "foo"}, Opts: bundle.schemaOpts}, + {Args: &Job1Args{String: "bar"}}, + }) + require.False(t, bundle.mockT.Failed) + require.Equal(t, "job1", jobs[0].Kind) + }) + + t.Run("SchemaBeyondFirstPositionError", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + _ = requireManyInserted(ctx, bundle.mockT, bundle.driver, []ExpectedJob{ + {Args: &Job1Args{}, Opts: &RequireInsertedOpts{}}, + {Args: &Job2Args{}, Opts: &RequireInsertedOpts{Schema: bundle.schema}}, + }) + require.True(t, bundle.mockT.Failed) + require.Equal(t, bundle.mockT.LogOutput(), failureString("Internal failure: when setting RequireInsertedOpts.Schema with RequireMany schema should be set only at index 0 or the same schema set for all options; expectedJobs[0].Opts.Schema = %q, expectedJobs[%d].Opts.Schema = %q", "", 1, bundle.schema)+"\n") + }) + + t.Run("DifferingSchemaError", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + _ = requireManyInserted(ctx, bundle.mockT, bundle.driver, []ExpectedJob{ + {Args: &Job1Args{}, Opts: &RequireInsertedOpts{Schema: bundle.schema}}, + {Args: &Job2Args{}, Opts: &RequireInsertedOpts{Schema: "other_schema"}}, + }) + require.True(t, bundle.mockT.Failed) + require.Equal(t, bundle.mockT.LogOutput(), failureString("Internal failure: when setting RequireInsertedOpts.Schema with RequireMany schema should be set only at index 0 or the same schema set for all options; expectedJobs[0].Opts.Schema = %q, expectedJobs[%d].Opts.Schema = %q", bundle.schema, 1, "other_schema")+"\n") + }) } func TestRequireManyInsertedTx(t *testing.T) { @@ -711,7 +781,7 @@ func TestRequireManyInsertedTx(t *testing.T) { _, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil) require.NoError(t, err) - jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, []ExpectedJob{ + jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, }) require.False(t, bundle.mockT.Failed) @@ -730,14 +800,14 @@ func TestRequireManyInsertedTx(t *testing.T) { require.NoError(t, err) // Visible in the original transaction. - jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, []ExpectedJob{ + jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, }) require.False(t, bundle.mockT.Failed) require.Equal(t, "job1", jobs[0].Kind) // Not visible in the second transaction. - _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, otherTx, emptySchema, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, otherTx, []ExpectedJob{ {Args: &Job1Args{}}, }) require.True(t, bundle.mockT.Failed) @@ -754,7 +824,7 @@ func TestRequireManyInsertedTx(t *testing.T) { _, err = riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, nil) require.NoError(t, err) - jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, []ExpectedJob{ + jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, {Args: &Job2Args{}}, }) @@ -774,7 +844,7 @@ func TestRequireManyInsertedTx(t *testing.T) { }) require.NoError(t, err) - jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, []ExpectedJob{ + jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, {Args: &Job1Args{}}, }) @@ -797,7 +867,7 @@ func TestRequireManyInsertedTx(t *testing.T) { }) require.NoError(t, err) - jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, []ExpectedJob{ + jobs := requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, {Args: &Job1Args{}}, {Args: &Job2Args{}}, @@ -821,7 +891,7 @@ func TestRequireManyInsertedTx(t *testing.T) { _, err := riverClient.InsertTx(ctx, bundle.tx, Job1Args{String: "foo"}, nil) require.NoError(t, err) - _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ { Args: &Job1Args{}, Opts: &RequireInsertedOpts{ @@ -843,7 +913,7 @@ func TestRequireManyInsertedTx(t *testing.T) { }) require.NoError(t, err) - _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ { Args: &Job2Args{}, Opts: &RequireInsertedOpts{ @@ -863,7 +933,7 @@ func TestRequireManyInsertedTx(t *testing.T) { _, bundle := setup(t) - _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, }) require.True(t, bundle.mockT.Failed) @@ -883,7 +953,7 @@ func TestRequireManyInsertedTx(t *testing.T) { }) require.NoError(t, err) - _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, }) require.True(t, bundle.mockT.Failed) @@ -903,7 +973,7 @@ func TestRequireManyInsertedTx(t *testing.T) { }) require.NoError(t, err) - _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, {Args: &Job2Args{}}, }) @@ -927,7 +997,7 @@ func TestRequireManyInsertedTx(t *testing.T) { }) require.NoError(t, err) - _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, emptySchema, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, bundle.mockT, bundle.tx, []ExpectedJob{ {Args: &Job1Args{}}, {Args: &Job1Args{}}, {Args: &Job2Args{}}, @@ -958,7 +1028,7 @@ func TestRequireManyInsertedTx(t *testing.T) { // Max attempts { mockT := testutil.NewMockT(t) - _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, []ExpectedJob{ { Args: &Job2Args{}, Opts: &RequireInsertedOpts{ @@ -980,7 +1050,7 @@ func TestRequireManyInsertedTx(t *testing.T) { // Priority { mockT := testutil.NewMockT(t) - _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, []ExpectedJob{ { Args: &Job2Args{}, Opts: &RequireInsertedOpts{ @@ -1002,7 +1072,7 @@ func TestRequireManyInsertedTx(t *testing.T) { // Queue { mockT := testutil.NewMockT(t) - _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, []ExpectedJob{ { Args: &Job2Args{}, Opts: &RequireInsertedOpts{ @@ -1024,7 +1094,7 @@ func TestRequireManyInsertedTx(t *testing.T) { // Scheduled at { mockT := testutil.NewMockT(t) - _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, []ExpectedJob{ { Args: &Job2Args{}, Opts: &RequireInsertedOpts{ @@ -1046,7 +1116,7 @@ func TestRequireManyInsertedTx(t *testing.T) { // State { mockT := testutil.NewMockT(t) - _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, []ExpectedJob{ { Args: &Job2Args{}, Opts: &RequireInsertedOpts{ @@ -1068,7 +1138,7 @@ func TestRequireManyInsertedTx(t *testing.T) { // Tags { mockT := testutil.NewMockT(t) - _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, emptySchema, []ExpectedJob{ + _ = requireManyInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, []ExpectedJob{ { Args: &Job2Args{}, Opts: &RequireInsertedOpts{