Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Preliminary River driver for SQLite (`riverdriver/riversqlite`). This driver seems to produce good results as judged by the test suite, but so far has minimal real world vetting. Try it and let us know how it works out. [PR #870](https://github.com/riverqueue/river/pull/870).
- CLI `river migrate-get` now takes a `--schema` option to inject a custom schema into dumped migrations and schema comments are hidden if `--schema` option isn't provided. [PR #903](https://github.com/riverqueue/river/pull/903).
- Added `riverlog.NewMiddlewareCustomContext` that makes the use of `riverlog` job-persisted logging possible with non-slog loggers. [PR #919](https://github.com/riverqueue/river/pull/919).
- Added `RequireInsertedOpts.Schema`, allowing an explicit schema to be set when asserting on job inserts with `rivertest`. [PR #926](https://github.com/riverqueue/river/pull/926).

### Changed

Expand Down
30 changes: 12 additions & 18 deletions rivertest/example_require_inserted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ func Example_requireInserted() {
workers := river.NewWorkers()
river.AddWorker(workers, &RequiredWorker{})

schema := riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil)
var (
schema = riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil)
schemaOpts = &rivertest.RequireInsertedOpts{Schema: schema}
)

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Expand Down Expand Up @@ -72,32 +75,23 @@ func Example_requireInserted() {
// *testing.T that comes from a test's argument.
t := &testing.T{}

// This is needed because rivertest does not yet support an injected schema.
if _, err := tx.Exec(ctx, "SET search_path TO "+schema); err != nil {
panic(err)
}

job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, nil)
job := rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, schemaOpts)
fmt.Printf("Test passed with message: %s\n", job.Args.Message)

// Verify the same job again, and this time that it was inserted at the
// default priority and default queue.
_ = rivertest.RequireInsertedTx[*riverpgxv5.Driver](ctx, t, tx, &RequiredArgs{}, &rivertest.RequireInsertedOpts{
Priority: 1,
Queue: river.QueueDefault,
Schema: schema,
})

// Due to some refactoring to make schemas injectable, we don't yet have a
// way of injecting a schema at the pool level. The rivertest API will need
// to be expanded to allow it.
/*
// Insert and verify one on a pool instead of transaction.
_, err = riverClient.Insert(ctx, &RequiredArgs{Message: "Hello from pool."}, nil)
if err != nil {
panic(err)
}
_ = rivertest.RequireInserted(ctx, t, riverpgxv5.New(dbPool), &RequiredArgs{}, nil)
*/
// Insert and verify one on a pool instead of transaction.
_, err = riverClient.Insert(ctx, &RequiredArgs{Message: "Hello from pool."}, nil)
if err != nil {
panic(err)
}
_ = rivertest.RequireInserted(ctx, t, riverpgxv5.New(dbPool), &RequiredArgs{}, schemaOpts)

// Output:
// Test passed with message: Hello.
Expand Down
34 changes: 14 additions & 20 deletions rivertest/example_require_many_inserted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ func Example_requireManyInserted() {
river.AddWorker(workers, &FirstRequiredWorker{})
river.AddWorker(workers, &SecondRequiredWorker{})

schema := riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil)
var (
schema = riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil)
schemaOpts = &rivertest.RequireInsertedOpts{Schema: schema}
)

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Expand Down Expand Up @@ -97,13 +100,8 @@ func Example_requireManyInserted() {
// *testing.T that comes from a test's argument.
t := &testing.T{}

// This is needed because rivertest does not yet support an injected schema.
if _, err := tx.Exec(ctx, "SET search_path TO "+schema); err != nil {
panic(err)
}

jobs := rivertest.RequireManyInsertedTx[*riverpgxv5.Driver](ctx, t, tx, []rivertest.ExpectedJob{
{Args: &FirstRequiredArgs{}},
{Args: &FirstRequiredArgs{}, Opts: schemaOpts},
{Args: &SecondRequiredArgs{}},
{Args: &FirstRequiredArgs{}},
})
Expand All @@ -117,22 +115,18 @@ func Example_requireManyInserted() {
{Args: &SecondRequiredArgs{}, Opts: &rivertest.RequireInsertedOpts{
Priority: 1,
Queue: river.QueueDefault,
Schema: schema,
}},
})

// Due to some refactoring to make schemas injectable, we don't yet have a
// way of injecting a schema at the pool level. The rivertest API will need
// to be expanded to allow it.
/*
// Insert and verify one on a pool instead of transaction.
_, err = riverClient.Insert(ctx, &FirstRequiredArgs{Message: "Hello from pool."}, nil)
if err != nil {
panic(err)
}
_ = rivertest.RequireManyInserted(ctx, t, riverpgxv5.New(dbPool), []rivertest.ExpectedJob{
{Args: &FirstRequiredArgs{}},
})
*/
// Insert and verify one on a pool instead of transaction.
_, err = riverClient.Insert(ctx, &FirstRequiredArgs{Message: "Hello from pool."}, nil)
if err != nil {
panic(err)
}
_ = rivertest.RequireManyInserted(ctx, t, riverpgxv5.New(dbPool), []rivertest.ExpectedJob{
{Args: &FirstRequiredArgs{}, Opts: schemaOpts},
})

// Output:
// Job 0 args: {"message": "Hello from first."}
Expand Down
99 changes: 74 additions & 25 deletions rivertest/rivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ import (
"github.com/riverqueue/river/rivertype"
)

// A placeholder for empty schema placeholders that'll need to be fixed to
// something better at some point.
const emptySchema = ""

// testingT is an interface wrapper around *testing.T that's implemented by all
// of *testing.T, *testing.F, and *testing.B.
//
Expand Down Expand Up @@ -72,6 +68,13 @@ type RequireInsertedOpts struct {
// No assertion is made if left the zero value.
ScheduledAt time.Time

// Schema is a non-standard Schema where River tables are located. All table
// references in assertion queries will use this value as a prefix.
//
// Defaults to empty, which causes the client to look for tables using the
// setting of Postgres `search_path`.
Schema string

// State is the expected state of the inserted job.
//
// No assertion is made if left the zero value.
Expand Down Expand Up @@ -101,12 +104,12 @@ type RequireInsertedOpts struct {
// to cover that case instead.
func RequireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] {
tb.Helper()
return requireInserted(ctx, tb, driver, emptySchema, expectedJob, opts)
return requireInserted(ctx, tb, driver, expectedJob, opts)
}

func requireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, driver TDriver, schema string, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] {
func requireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] {
t.Helper()
actualArgs, err := requireInsertedErr[TDriver](ctx, t, driver.GetExecutor(), schema, expectedJob, opts)
actualArgs, err := requireInsertedErr[TDriver](ctx, t, driver.GetExecutor(), expectedJob, opts)
if err != nil {
failure(t, "Internal failure: %s", err)
}
Expand All @@ -131,27 +134,32 @@ func requireInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobAr
// to cover that case instead.
func RequireInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] {
tb.Helper()
return requireInsertedTx[TDriver](ctx, tb, tx, emptySchema, expectedJob, opts)
return requireInsertedTx[TDriver](ctx, tb, tx, expectedJob, opts)
}

// Internal function used by the tests so that the exported version can take
// `testing.TB` instead of `testing.T`.
//
// Also takes a schema for testing purposes, which I haven't quite figured out
// how to get into the public API yet.
func requireInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, tx TTx, schema string, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] {
func requireInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts) *river.Job[TArgs] {
t.Helper()
var driver TDriver
actualArgs, err := requireInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), schema, expectedJob, opts)
actualArgs, err := requireInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), expectedJob, opts)
if err != nil {
failure(t, "Internal failure: %s", err)
}
return actualArgs
}

func requireInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, exec riverdriver.Executor, schema string, expectedJob TArgs, opts *RequireInsertedOpts) (*river.Job[TArgs], error) {
func requireInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, exec riverdriver.Executor, expectedJob TArgs, opts *RequireInsertedOpts) (*river.Job[TArgs], error) {
t.Helper()

var schema string
if opts != nil {
schema = opts.Schema
}

// Returned ordered by ID.
jobRows, err := exec.JobGetByKindMany(ctx, &riverdriver.JobGetByKindManyParams{
Kind: []string{expectedJob.Kind()},
Expand Down Expand Up @@ -205,12 +213,12 @@ func requireInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.Jo
// the given opts.
func RequireNotInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts) {
tb.Helper()
requireNotInserted(ctx, tb, driver, emptySchema, expectedJob, opts)
requireNotInserted(ctx, tb, driver, expectedJob, opts)
}

func requireNotInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, driver TDriver, schema string, expectedJob TArgs, opts *RequireInsertedOpts) {
func requireNotInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, driver TDriver, expectedJob TArgs, opts *RequireInsertedOpts) {
t.Helper()
err := requireNotInsertedErr[TDriver](ctx, t, driver.GetExecutor(), schema, expectedJob, opts)
err := requireNotInsertedErr[TDriver](ctx, t, driver.GetExecutor(), expectedJob, opts)
if err != nil {
failure(t, "Internal failure: %s", err)
}
Expand All @@ -234,26 +242,31 @@ func requireNotInserted[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.Jo
// the given opts.
func RequireNotInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, tb testing.TB, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts) {
tb.Helper()
requireNotInsertedTx[TDriver](ctx, tb, tx, emptySchema, expectedJob, opts)
requireNotInsertedTx[TDriver](ctx, tb, tx, expectedJob, opts)
}

// Internal function used by the tests so that the exported version can take
// `testing.TB` instead of `testing.T`.
//
// Also takes a schema for testing purposes, which I haven't quite figured out
// how to get into the public API yet.
func requireNotInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, tx TTx, schema string, expectedJob TArgs, opts *RequireInsertedOpts) {
func requireNotInsertedTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, tx TTx, expectedJob TArgs, opts *RequireInsertedOpts) {
t.Helper()
var driver TDriver
err := requireNotInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), schema, expectedJob, opts)
err := requireNotInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), expectedJob, opts)
if err != nil {
failure(t, "Internal failure: %s", err)
}
}

func requireNotInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, exec riverdriver.Executor, schema string, expectedJob TArgs, opts *RequireInsertedOpts) error {
func requireNotInsertedErr[TDriver riverdriver.Driver[TTx], TTx any, TArgs river.JobArgs](ctx context.Context, t testingT, exec riverdriver.Executor, expectedJob TArgs, opts *RequireInsertedOpts) error {
t.Helper()

var schema string
if opts != nil {
schema = opts.Schema
}

// Returned ordered by ID.
jobRows, err := exec.JobGetByKindMany(ctx, &riverdriver.JobGetByKindManyParams{
Kind: []string{expectedJob.Kind()},
Expand Down Expand Up @@ -322,14 +335,20 @@ type ExpectedJob struct {
// the number specified, and will fail in case this expectation isn't met. So if
// a job of a certain kind is emitted multiple times, it must be expected
// multiple times.
//
// If RequireInsertedOpts.Schema is used, it may be set only in the first
// expectation's options (and all expectations will use that schema), or the
// same schema may be set in every expectation. Setting a non-empty schema after
// the first expectation if the first's was empty is not allowed, and neither is
// mixing and matching schemas between options.
func RequireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, driver TDriver, expectedJobs []ExpectedJob) []*rivertype.JobRow {
tb.Helper()
return requireManyInserted(ctx, tb, driver, string(emptySchema), expectedJobs)
return requireManyInserted(ctx, tb, driver, expectedJobs)
}

func requireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, driver TDriver, schema string, expectedJobs []ExpectedJob) []*rivertype.JobRow {
func requireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, driver TDriver, expectedJobs []ExpectedJob) []*rivertype.JobRow {
t.Helper()
actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.GetExecutor(), schema, expectedJobs)
actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.GetExecutor(), expectedJobs)
if err != nil {
failure(t, "Internal failure: %s", err)
}
Expand All @@ -356,31 +375,61 @@ func requireManyInserted[TDriver riverdriver.Driver[TTx], TTx any](ctx context.C
// the number specified, and will fail in case this expectation isn't met. So if
// a job of a certain kind is emitted multiple times, it must be expected
// multiple times.
//
// If RequireInsertedOpts.Schema is used, it may be set only in the first
// expectation's options (and all expectations will use that schema), or the
// same schema may be set in every expectation. Setting a non-empty schema after
// the first expectation if the first's was empty is not allowed, and neither is
// mixing and matching schemas between options.
func RequireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, tb testing.TB, tx TTx, expectedJobs []ExpectedJob) []*rivertype.JobRow {
tb.Helper()
return requireManyInsertedTx[TDriver](ctx, tb, tx, emptySchema, expectedJobs)
return requireManyInsertedTx[TDriver](ctx, tb, tx, expectedJobs)
}

// Internal function used by the tests so that the exported version can take
// `testing.TB` instead of `testing.T`.
//
// Also takes a schema for testing purposes, which I haven't quite figured out
// how to get into the public API yet.
func requireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, tx TTx, schema string, expectedJobs []ExpectedJob) []*rivertype.JobRow {
func requireManyInsertedTx[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, tx TTx, expectedJobs []ExpectedJob) []*rivertype.JobRow {
t.Helper()
var driver TDriver
actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), schema, expectedJobs)
actualArgs, err := requireManyInsertedErr[TDriver](ctx, t, driver.UnwrapExecutor(tx), expectedJobs)
if err != nil {
failure(t, "Internal failure: %s", err)
}
return actualArgs
}

func requireManyInsertedErr[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, exec riverdriver.Executor, schema string, expectedJobs []ExpectedJob) ([]*rivertype.JobRow, error) {
func requireManyInsertedErr[TDriver riverdriver.Driver[TTx], TTx any](ctx context.Context, t testingT, exec riverdriver.Executor, expectedJobs []ExpectedJob) ([]*rivertype.JobRow, error) {
t.Helper()

expectedArgsKinds := sliceutil.Map(expectedJobs, func(j ExpectedJob) string { return j.Args.Kind() })

var schema string
if len(expectedJobs) > 0 && expectedJobs[0].Opts != nil {
schema = expectedJobs[0].Opts.Schema
}

// For simplicity (and because I can't think of any reason anyone would need
// to do otherwise), require that if an explicit schema is being set that
// it's the same explicit schema for all options. Callers may specify the
// schema only once in the first expectation's options, or specify the same
// schema for all expectations' options, but they're not allowed to set a
// schema after the first expectation's options if it wasn't set in the
// first, and not allowed to mix and match schemas between options.
for i, expectedJob := range expectedJobs {
if opts := expectedJob.Opts; opts != nil {
if schema == "" && opts.Schema != "" ||
schema != "" && opts.Schema != "" && schema != opts.Schema {
return nil, fmt.Errorf(
"when setting RequireInsertedOpts.Schema with RequireMany schema should be set only at index 0 or the same schema set for all options; "+
"expectedJobs[0].Opts.Schema = %q, expectedJobs[%d].Opts.Schema = %q",
schema, i, opts.Schema)
}
}
}

// Returned ordered by ID.
jobRows, err := exec.JobGetByKindMany(ctx, &riverdriver.JobGetByKindManyParams{
Kind: expectedArgsKinds,
Expand Down
Loading
Loading