Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- CLI `river migrate-get` now takes a `--schema` option to inject a custom schema into dumped migrations and schema comments are hidden if `--schema` option isn't provided. [PR #903](https://github.com/riverqueue/river/pull/903).
- Added `riverlog.NewMiddlewareCustomContext` that makes the use of `riverlog` job-persisted logging possible with non-slog loggers. [PR #919](https://github.com/riverqueue/river/pull/919).
- Added `RequireInsertedOpts.Schema`, allowing an explicit schema to be set when asserting on job inserts with `rivertest`. [PR #926](https://github.com/riverqueue/river/pull/926).
- When using a driver that doesn't support listen/notify, producers within same process are notified immediately of new job inserts and queue changes (e.g. pause/resume) without having to poll when non-transactional variants are used (i.e. `Insert` instead of `InsertTx`). [PR #928](https://github.com/riverqueue/river/pull/928).
- Added `JobListParams.Where`, which provides an escape hatch for job listing that runs arbitrary SQL with named parameters. [PR #933](https://github.com/riverqueue/river/pull/933).

### Changed
Expand Down
292 changes: 182 additions & 110 deletions client.go

Large diffs are not rendered by default.

201 changes: 199 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverdatabasesql"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/riverdriver/riversqlite"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/startstoptest"
Expand Down Expand Up @@ -134,7 +135,6 @@ func newTestConfig(t *testing.T, schema string) *Config {
},
TestOnly: true, // disables staggered start in maintenance services
Workers: workers,
queuePollInterval: 50 * time.Millisecond,
schedulerInterval: riverinternaltest.SchedulerShortInterval,
}
}
Expand Down Expand Up @@ -608,6 +608,69 @@ func Test_Client(t *testing.T) {
require.NoError(t, err)
})

t.Run("CancelProducerControlEventSent", func(t *testing.T) {
t.Parallel()

var (
driver = riversqlite.New(nil)
schema = riverdbtest.TestSchema(ctx, t, driver, &riverdbtest.TestSchemaOpts{
ProcurePool: func(ctx context.Context, schema string) (any, string) {
return riversharedtest.DBPoolSQLite(ctx, t, schema), "" // could also be `main` instead of empty string
},
})
config = newTestConfig(t, schema)
)

client, err := NewClient(driver, config)
require.NoError(t, err)
client.producersByQueueName[QueueDefault].testSignals.Init(t)

type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return nil
}))

startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
require.NoError(t, err)

_, err = client.JobCancel(ctx, insertRes.Job.ID)
require.NoError(t, err)

controlEvent := client.producersByQueueName[QueueDefault].testSignals.QueueControlEventTriggered.WaitOrTimeout()
require.NotNil(t, controlEvent)
require.Equal(t, controlActionCancel, controlEvent.Action)
})

t.Run("CancelProducerControlEventNotSent", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)
client.producersByQueueName[QueueDefault].testSignals.Init(t)

type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return nil
}))

startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
require.NoError(t, err)

_, err = client.JobCancel(ctx, insertRes.Job.ID)
require.NoError(t, err)

client.producersByQueueName[QueueDefault].testSignals.QueueControlEventTriggered.RequireEmpty()
})

t.Run("AlternateSchema", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1161,6 +1224,55 @@ func Test_Client(t *testing.T) {
}
})

t.Run("QueuePauseAndResumeProducerControlEventSent", func(t *testing.T) {
t.Parallel()

var (
driver = riversqlite.New(nil)
schema = riverdbtest.TestSchema(ctx, t, driver, &riverdbtest.TestSchemaOpts{
ProcurePool: func(ctx context.Context, schema string) (any, string) {
return riversharedtest.DBPoolSQLite(ctx, t, schema), "" // could also be `main` instead of empty string
},
})
config = newTestConfig(t, schema)
)

client, err := NewClient(driver, config)
require.NoError(t, err)
client.producersByQueueName[QueueDefault].testSignals.Init(t)

startClient(ctx, t, client)

require.NoError(t, client.QueuePause(ctx, QueueDefault, nil))

controlEvent := client.producersByQueueName[QueueDefault].testSignals.QueueControlEventTriggered.WaitOrTimeout()
require.NotNil(t, controlEvent)
require.Equal(t, controlActionPause, controlEvent.Action)

require.NoError(t, client.QueueResume(ctx, QueueDefault, nil))

controlEvent = client.producersByQueueName[QueueDefault].testSignals.QueueControlEventTriggered.WaitOrTimeout()
require.NotNil(t, controlEvent)
require.Equal(t, controlActionResume, controlEvent.Action)
})

t.Run("QueuePauseAndResumeProducerControlEventNotSent", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)
client.producersByQueueName[QueueDefault].testSignals.Init(t)

startClient(ctx, t, client)

require.NoError(t, client.QueuePause(ctx, QueueDefault, nil))

client.producersByQueueName[QueueDefault].testSignals.QueueControlEventTriggered.RequireEmpty()

require.NoError(t, client.QueueResume(ctx, QueueDefault, nil))

client.producersByQueueName[QueueDefault].testSignals.QueueControlEventTriggered.RequireEmpty()
})

t.Run("PollOnlyDriver", func(t *testing.T) {
t.Parallel()

Expand All @@ -1172,7 +1284,6 @@ func Test_Client(t *testing.T) {

client, err := NewClient(riverdatabasesql.New(stdPool), config)
require.NoError(t, err)

client.testSignals.Init(t)

// Notifier should not have been initialized at all.
Expand Down Expand Up @@ -2018,6 +2129,46 @@ func Test_Client_Insert(t *testing.T) {
require.Equal(t, []string{}, jobRow.Tags)
})

t.Run("ProducerFetchLimiterCalled", func(t *testing.T) {
t.Parallel()

var (
driver = riversqlite.New(nil)
schema = riverdbtest.TestSchema(ctx, t, driver, &riverdbtest.TestSchemaOpts{
ProcurePool: func(ctx context.Context, schema string) (any, string) {
return riversharedtest.DBPoolSQLite(ctx, t, schema), "" // could also be `main` instead of empty string
},
})
config = newTestConfig(t, schema)
)

client, err := NewClient(driver, config)
require.NoError(t, err)
client.producersByQueueName[QueueDefault].testSignals.Init(t)

startClient(ctx, t, client)

_, err = client.Insert(ctx, &noOpArgs{}, nil)
require.NoError(t, err)

client.producersByQueueName[QueueDefault].testSignals.JobFetchTriggered.WaitOrTimeout()
})

// Not called for drivers that support a listener.
t.Run("ProducerFetchLimiterNotCalled", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)
client.producersByQueueName[QueueDefault].testSignals.Init(t)

startClient(ctx, t, client)

_, err := client.Insert(ctx, &noOpArgs{}, nil)
require.NoError(t, err)

client.producersByQueueName[QueueDefault].testSignals.JobFetchTriggered.RequireEmpty()
})

t.Run("WithInsertOpts", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -4948,6 +5099,52 @@ func Test_Client_QueueUpdate(t *testing.T) {
case <-time.After(100 * time.Millisecond):
}
})

t.Run("ProducerControlEventSent", func(t *testing.T) {
t.Parallel()

var (
driver = riversqlite.New(nil)
schema = riverdbtest.TestSchema(ctx, t, driver, &riverdbtest.TestSchemaOpts{
ProcurePool: func(ctx context.Context, schema string) (any, string) {
return riversharedtest.DBPoolSQLite(ctx, t, schema), "" // could also be `main` instead of empty string
},
})
config = newTestConfig(t, schema)
)

client, err := NewClient(driver, config)
require.NoError(t, err)
client.producersByQueueName[QueueDefault].testSignals.Init(t)

startClient(ctx, t, client)

_, err = client.QueueUpdate(ctx, QueueDefault, &QueueUpdateParams{
Metadata: []byte(`{"foo":"baz"}`),
})
require.NoError(t, err)

controlEvent := client.producersByQueueName[QueueDefault].testSignals.QueueControlEventTriggered.WaitOrTimeout()
require.NotNil(t, controlEvent)
require.Equal(t, controlActionMetadataChanged, controlEvent.Action)
})

t.Run("ProducerControlEventNotSent", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

client.producersByQueueName[QueueDefault].testSignals.Init(t)

startClient(ctx, t, client)

_, err := client.QueueUpdate(ctx, QueueDefault, &QueueUpdateParams{
Metadata: []byte(`{"foo":"baz"}`),
})
require.NoError(t, err)

client.producersByQueueName[QueueDefault].testSignals.QueueControlEventTriggered.RequireEmpty()
})
}

func Test_Client_QueueUpdateTx(t *testing.T) {
Expand Down
5 changes: 1 addition & 4 deletions example_sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"log/slog"
"time"

"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver"
Expand Down Expand Up @@ -36,9 +35,7 @@ func Example_sqlite() {
river.AddWorker(workers, &SortWorker{})

riverClient, err := river.NewClient(driver, &river.Config{
FetchCooldown: 20 * time.Millisecond,
FetchPollInterval: 50 * time.Millisecond, // this driver is poll only, so speed up poll interval so the test runs fater

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most immediate advantage of this change for the test suite is that when testing SQLite, we don't make the tests super slow if we accidentally omit configuration for increased polling frequency. It can now be safely removed and we can be sure the producer will pick changes up right away.

Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Expand Down
7 changes: 4 additions & 3 deletions internal/maintenance/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivershared/util/testutil"
"github.com/riverqueue/river/rivershared/util/timeutil"
"github.com/riverqueue/river/rivertype"
Expand All @@ -35,11 +36,11 @@ func (ts *JobSchedulerTestSignals) Init(tb testutil.TestingTB) {
ts.ScheduledBatch.Init(tb)
}

type InsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error)
type InsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) error

// NotifyInsert is a function to call to emit notifications for queues where
// jobs were scheduled.
type NotifyInsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error
type NotifyInsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, queuesDeduped []string) error

type JobSchedulerConfig struct {
// Interval is the amount of time between periodic checks for jobs to
Expand Down Expand Up @@ -186,7 +187,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er
}

if len(queues) > 0 {
if err := s.config.NotifyInsert(ctx, tx, queues); err != nil {
if err := s.config.NotifyInsert(ctx, tx, sliceutil.Uniq(queues)); err != nil {
return 0, fmt.Errorf("error notifying insert: %w", err)
}
s.TestSignals.NotifiedQueues.Signal(queues)
Expand Down
12 changes: 6 additions & 6 deletions internal/maintenance/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ func TestJobScheduler(t *testing.T) {

scheduler, _ := setup(t, &testOpts{exec: exec, schema: schema})
scheduler.config.Interval = time.Minute // should only trigger once for the initial run
scheduler.config.NotifyInsert = func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error {
notifyCh <- queues
scheduler.config.NotifyInsert = func(ctx context.Context, tx riverdriver.ExecutorTx, queuesDeduped []string) error {
notifyCh <- queuesDeduped
return nil
}
now := time.Now().UTC()
Expand Down Expand Up @@ -362,10 +362,10 @@ func TestJobScheduler(t *testing.T) {
require.NoError(t, scheduler.Start(ctx))
scheduler.TestSignals.ScheduledBatch.WaitOrTimeout()

expectedQueues := []string{"queue1", "queue2", "queue2", "queue3", "queue4"}
expectedQueues := []string{"queue1", "queue2", "queue3", "queue4"}

notifiedQueues := riversharedtest.WaitOrTimeout(t, notifyCh)
sort.Strings(notifiedQueues)
require.Equal(t, expectedQueues, notifiedQueues)
notifiedQueuesDeduped := riversharedtest.WaitOrTimeout(t, notifyCh)
sort.Strings(notifiedQueuesDeduped)
require.Equal(t, expectedQueues, notifiedQueuesDeduped)
})
}
3 changes: 1 addition & 2 deletions internal/maintenance/periodic_job_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,7 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany
defer tx.Rollback(ctx)

if len(insertParamsMany) > 0 {
_, err := s.Config.Insert(ctx, tx, insertParamsMany)
if err != nil {
if err := s.Config.Insert(ctx, tx, insertParamsMany); err != nil {
s.Logger.ErrorContext(ctx, s.Name+": Error inserting periodic jobs",
"error", err.Error(), "num_jobs", len(insertParamsMany))
return
Expand Down
15 changes: 4 additions & 11 deletions internal/maintenance/periodic_job_enqueuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,22 +79,15 @@ func TestPeriodicJobEnqueuer(t *testing.T) {

// A simplified version of `Client.insertMany` that only inserts jobs directly
// via the driver instead of using the pilot.
makeInsertFunc := func(schema string) func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error) {
return func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error) {
results, err := tx.JobInsertFastMany(ctx, &riverdriver.JobInsertFastManyParams{
makeInsertFunc := func(schema string) func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) error {
return func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) error {
_, err := tx.JobInsertFastMany(ctx, &riverdriver.JobInsertFastManyParams{
Jobs: sliceutil.Map(insertParams, func(params *rivertype.JobInsertParams) *riverdriver.JobInsertFastParams {
return (*riverdriver.JobInsertFastParams)(params)
}),
Schema: schema,
})
if err != nil {
return nil, err
}
return sliceutil.Map(results,
func(result *riverdriver.JobInsertFastResult) *rivertype.JobInsertResult {
return (*rivertype.JobInsertResult)(result)
},
), nil
return err
}
}

Expand Down
Loading
Loading