diff --git a/client.go b/client.go index 9c78f58a..a3067279 100644 --- a/client.go +++ b/client.go @@ -23,13 +23,13 @@ import ( "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/notifylimiter" "github.com/riverqueue/river/internal/rivercommon" - "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/internal/workunit" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/riverpilot" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/testsignal" + "github.com/riverqueue/river/rivershared/util/dbutil" "github.com/riverqueue/river/rivershared/util/maputil" "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivershared/util/testutil" @@ -796,9 +796,11 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client if client.pilot == nil { client.pilot = &riverpilot.StandardPilot{} } - client.pilot.PilotInit(archetype, &riverpilot.PilotInitParams{ - WorkerMetadata: workerMetadata, - }) + client.pilot.PilotInit(archetype, (&riverpilot.PilotInitParams{ + Insert: client.insertMany, + NotifyNonTxJobInsert: client.notifyProducerWithoutListenerJobFetch, + WorkerMetadata: workerMetadata, + }).Validate()) pluginPilot, _ := client.pilot.(pilotPlugin) if withBaseService, ok := config.RetryPolicy.(baseservice.WithBaseService); ok { @@ -898,12 +900,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { periodicJobEnqueuer, err := maintenance.NewPeriodicJobEnqueuer(archetype, &maintenance.PeriodicJobEnqueuerConfig{ AdvisoryLockPrefix: config.AdvisoryLockPrefix, - Insert: func(ctx context.Context, execTx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) error { - _, err := client.insertMany(ctx, execTx, insertParams) - return err - }, - Pilot: client.pilot, - Schema: config.Schema, + Insert: client.insertMany, + Pilot: client.pilot, + Schema: config.Schema, }, driver.GetExecutor()) if err != nil { return nil, err @@ -1632,16 +1631,16 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts return nil, errNoDriverDBPool } - res, err := dbutil.WithTxV(ctx, c.driver.GetExecutor(), func(ctx context.Context, execTx riverdriver.ExecutorTx) (*insertManySharedResult, error) { + res, err := dbutil.WithTxV(ctx, c.driver.GetExecutor(), func(ctx context.Context, execTx riverdriver.ExecutorTx) ([]*rivertype.JobInsertResult, error) { return c.validateParamsAndInsertMany(ctx, execTx, []InsertManyParams{{Args: args, InsertOpts: opts}}) }) if err != nil { return nil, err } - c.notifyProducerWithoutListenerJobFetch(res.QueuesDeduped) + c.notifyProducerWithoutListenerJobFetch(ctx, res) - return res.InsertResults[0], nil + return res[0], nil } // InsertTx inserts a new job with the provided args on the given transaction. @@ -1666,7 +1665,7 @@ func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts * if err != nil { return nil, err } - return res.InsertResults[0], nil + return res[0], nil } // InsertManyParams encapsulates a single job combined with insert options for @@ -1698,16 +1697,16 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) return nil, errNoDriverDBPool } - res, err := dbutil.WithTxV(ctx, c.driver.GetExecutor(), func(ctx context.Context, execTx riverdriver.ExecutorTx) (*insertManySharedResult, error) { + res, err := dbutil.WithTxV(ctx, c.driver.GetExecutor(), func(ctx context.Context, execTx riverdriver.ExecutorTx) ([]*rivertype.JobInsertResult, error) { return c.validateParamsAndInsertMany(ctx, execTx, params) }) if err != nil { return nil, err } - c.notifyProducerWithoutListenerJobFetch(res.QueuesDeduped) + c.notifyProducerWithoutListenerJobFetch(ctx, res) - return res.InsertResults, nil + return res, nil } // InsertManyTx inserts many jobs at once. Each job is inserted as an @@ -1733,14 +1732,14 @@ func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertM if err != nil { return nil, err } - return res.InsertResults, nil + return res, nil } // validateParamsAndInsertMany is a helper method that wraps the insertMany // method to provide param validation and conversion prior to calling the actual // insertMany method. This allows insertMany to be reused by the // PeriodicJobEnqueuer which cannot reference top-level river package types. -func (c *Client[TTx]) validateParamsAndInsertMany(ctx context.Context, execTx riverdriver.ExecutorTx, params []InsertManyParams) (*insertManySharedResult, error) { +func (c *Client[TTx]) validateParamsAndInsertMany(ctx context.Context, execTx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) { insertParams, err := c.insertManyParams(params) if err != nil { return nil, err @@ -1751,7 +1750,7 @@ func (c *Client[TTx]) validateParamsAndInsertMany(ctx context.Context, execTx ri // insertMany is a shared code path for InsertMany and InsertManyTx, also used // by the PeriodicJobEnqueuer. -func (c *Client[TTx]) insertMany(ctx context.Context, execTx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) (*insertManySharedResult, error) { +func (c *Client[TTx]) insertMany(ctx context.Context, execTx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error) { return c.insertManyShared(ctx, execTx, insertParams, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) { results, err := c.pilot.JobInsertMany(ctx, execTx, &riverdriver.JobInsertFastManyParams{ Jobs: insertParams, @@ -1769,11 +1768,6 @@ func (c *Client[TTx]) insertMany(ctx context.Context, execTx riverdriver.Executo }) } -type insertManySharedResult struct { - InsertResults []*rivertype.JobInsertResult - QueuesDeduped []string -} - // The shared code path for all Insert and InsertMany methods. It takes a // function that executes the actual insert operation and allows for different // implementations of the insert query to be passed in, each mapping their @@ -1783,9 +1777,7 @@ func (c *Client[TTx]) insertManyShared( tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams, execute func(context.Context, []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error), -) (*insertManySharedResult, error) { - var queuesDeduped []string - +) ([]*rivertype.JobInsertResult, error) { doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) { for _, params := range insertParams { for _, hook := range append( @@ -1814,9 +1806,7 @@ func (c *Client[TTx]) insertManyShared( } } - queuesDeduped = sliceutil.Uniq(queues) - - if err = c.maybeNotifyInsertForQueues(ctx, tx, queuesDeduped); err != nil { + if err = c.maybeNotifyInsertForQueues(ctx, tx, queues); err != nil { return nil, err } @@ -1836,15 +1826,7 @@ func (c *Client[TTx]) insertManyShared( } } - insertResults, err := doInner(ctx) - if err != nil { - return nil, err - } - - return &insertManySharedResult{ - InsertResults: insertResults, - QueuesDeduped: queuesDeduped, - }, nil + return doInner(ctx) } // Validates input parameters for a batch insert operation and generates a set @@ -1878,13 +1860,30 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*rivertype. // Should only ever be invoked *outside* a transaction. If invoked within a // transaction, the producer wouldn't yet be able to access the new jobs that // triggered the notification because they're not committed yet. -func (c *Client[TTx]) notifyProducerWithoutListenerJobFetch(queuesDeduped []string) { +func (c *Client[TTx]) notifyProducerWithoutListenerJobFetch(_ context.Context, res []*rivertype.JobInsertResult) { if c.driver.SupportsListener() || len(c.producersByQueueName) < 1 { return } - for _, queue := range queuesDeduped { - if producer, ok := c.producersByQueueName[queue]; ok { + // Special case for when we were handling exactly one job, which is a very + // common case. Acts as a minor optimization by avoiding the map allocation. + if len(res) == 1 { + if producer, ok := c.producersByQueueName[res[0].Job.Queue]; ok { + producer.TriggerJobFetch() + } + + return + } + + queuesTriggered := make(map[string]struct{}) + + for _, insertRes := range res { + if _, ok := queuesTriggered[insertRes.Job.Queue]; ok { + continue + } + queuesTriggered[insertRes.Job.Queue] = struct{}{} + + if producer, ok := c.producersByQueueName[insertRes.Job.Queue]; ok { producer.TriggerJobFetch() } } @@ -1914,16 +1913,16 @@ func (c *Client[TTx]) InsertManyFast(ctx context.Context, params []InsertManyPar } // Wrap in a transaction in case we need to notify about inserts. - res, err := dbutil.WithTxV(ctx, c.driver.GetExecutor(), func(ctx context.Context, execTx riverdriver.ExecutorTx) (*insertManySharedResult, error) { + res, err := dbutil.WithTxV(ctx, c.driver.GetExecutor(), func(ctx context.Context, execTx riverdriver.ExecutorTx) ([]*rivertype.JobInsertResult, error) { return c.insertManyFast(ctx, execTx, params) }) if err != nil { return 0, err } - c.notifyProducerWithoutListenerJobFetch(res.QueuesDeduped) + c.notifyProducerWithoutListenerJobFetch(ctx, res) - return len(res.InsertResults), nil + return len(res), nil } // InsertManyTx inserts many jobs at once using Postgres' `COPY FROM` mechanism, @@ -1954,10 +1953,10 @@ func (c *Client[TTx]) InsertManyFastTx(ctx context.Context, tx TTx, params []Ins if err != nil { return 0, err } - return len(res.InsertResults), nil + return len(res), nil } -func (c *Client[TTx]) insertManyFast(ctx context.Context, execTx riverdriver.ExecutorTx, params []InsertManyParams) (*insertManySharedResult, error) { +func (c *Client[TTx]) insertManyFast(ctx context.Context, execTx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) { insertParams, err := c.insertManyParams(params) if err != nil { return nil, err @@ -1978,12 +1977,13 @@ func (c *Client[TTx]) insertManyFast(ctx context.Context, execTx riverdriver.Exe // Notify the given queues that new jobs are available. The queues list will be // deduplicated and each will be checked to see if it is due for an insert // notification from this client. -func (c *Client[TTx]) maybeNotifyInsertForQueues(ctx context.Context, tx riverdriver.ExecutorTx, queuesDeduped []string) error { - if len(queuesDeduped) < 1 { +func (c *Client[TTx]) maybeNotifyInsertForQueues(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error { + if len(queues) < 1 { return nil } var ( + queuesDeduped = sliceutil.Uniq(queues) payloads = make([]string, 0, len(queuesDeduped)) queuesTriggered = make([]string, 0, len(queuesDeduped)) ) diff --git a/client_test.go b/client_test.go index 38216bad..dcf1d811 100644 --- a/client_test.go +++ b/client_test.go @@ -30,7 +30,6 @@ import ( "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" "github.com/riverqueue/river/internal/riverinternaltest/retrypolicytest" - "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" @@ -38,6 +37,7 @@ import ( "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstoptest" "github.com/riverqueue/river/rivershared/testfactory" + "github.com/riverqueue/river/rivershared/util/dbutil" "github.com/riverqueue/river/rivershared/util/ptrutil" "github.com/riverqueue/river/rivershared/util/randutil" "github.com/riverqueue/river/rivershared/util/serviceutil" diff --git a/internal/leadership/elector.go b/internal/leadership/elector.go index 8d3592ed..55666b2c 100644 --- a/internal/leadership/elector.go +++ b/internal/leadership/elector.go @@ -11,11 +11,11 @@ import ( "time" "github.com/riverqueue/river/internal/notifier" - "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/testsignal" + "github.com/riverqueue/river/rivershared/util/dbutil" "github.com/riverqueue/river/rivershared/util/randutil" "github.com/riverqueue/river/rivershared/util/serviceutil" "github.com/riverqueue/river/rivershared/util/testutil" diff --git a/internal/maintenance/job_scheduler.go b/internal/maintenance/job_scheduler.go index ac1eb9dc..96051da6 100644 --- a/internal/maintenance/job_scheduler.go +++ b/internal/maintenance/job_scheduler.go @@ -14,10 +14,8 @@ import ( "github.com/riverqueue/river/rivershared/testsignal" "github.com/riverqueue/river/rivershared/util/randutil" "github.com/riverqueue/river/rivershared/util/serviceutil" - "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivershared/util/testutil" "github.com/riverqueue/river/rivershared/util/timeutil" - "github.com/riverqueue/river/rivertype" ) const ( @@ -36,11 +34,9 @@ func (ts *JobSchedulerTestSignals) Init(tb testutil.TestingTB) { ts.ScheduledBatch.Init(tb) } -type InsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) error - // NotifyInsert is a function to call to emit notifications for queues where // jobs were scheduled. -type NotifyInsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, queuesDeduped []string) error +type NotifyInsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error type JobSchedulerConfig struct { // Interval is the amount of time between periodic checks for jobs to @@ -187,7 +183,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er } if len(queues) > 0 { - if err := s.config.NotifyInsert(ctx, tx, sliceutil.Uniq(queues)); err != nil { + if err := s.config.NotifyInsert(ctx, tx, queues); err != nil { return 0, fmt.Errorf("error notifying insert: %w", err) } s.TestSignals.NotifiedQueues.Signal(queues) diff --git a/internal/maintenance/job_scheduler_test.go b/internal/maintenance/job_scheduler_test.go index 5944d3d4..1abefb6b 100644 --- a/internal/maintenance/job_scheduler_test.go +++ b/internal/maintenance/job_scheduler_test.go @@ -320,8 +320,8 @@ func TestJobScheduler(t *testing.T) { scheduler, _ := setup(t, &testOpts{exec: exec, schema: schema}) scheduler.config.Interval = time.Minute // should only trigger once for the initial run - scheduler.config.NotifyInsert = func(ctx context.Context, tx riverdriver.ExecutorTx, queuesDeduped []string) error { - notifyCh <- queuesDeduped + scheduler.config.NotifyInsert = func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error { + notifyCh <- queues return nil } now := time.Now().UTC() @@ -362,10 +362,8 @@ func TestJobScheduler(t *testing.T) { require.NoError(t, scheduler.Start(ctx)) scheduler.TestSignals.ScheduledBatch.WaitOrTimeout() - expectedQueues := []string{"queue1", "queue2", "queue3", "queue4"} - - notifiedQueuesDeduped := riversharedtest.WaitOrTimeout(t, notifyCh) - sort.Strings(notifiedQueuesDeduped) - require.Equal(t, expectedQueues, notifiedQueuesDeduped) + notifiedQueues := riversharedtest.WaitOrTimeout(t, notifyCh) + sort.Strings(notifiedQueues) + require.Equal(t, []string{"queue1", "queue2", "queue2", "queue3", "queue4"}, notifiedQueues) }) } diff --git a/internal/maintenance/periodic_job_enqueuer.go b/internal/maintenance/periodic_job_enqueuer.go index 38935a00..5e1f97c4 100644 --- a/internal/maintenance/periodic_job_enqueuer.go +++ b/internal/maintenance/periodic_job_enqueuer.go @@ -82,6 +82,8 @@ func (j *PeriodicJob) validate() error { return nil } +type InsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error) + type PeriodicJobEnqueuerConfig struct { AdvisoryLockPrefix int32 @@ -450,7 +452,7 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany defer tx.Rollback(ctx) if len(insertParamsMany) > 0 { - if err := s.Config.Insert(ctx, tx, insertParamsMany); err != nil { + if _, err := s.Config.Insert(ctx, tx, insertParamsMany); err != nil { s.Logger.ErrorContext(ctx, s.Name+": Error inserting periodic jobs", "error", err.Error(), "num_jobs", len(insertParamsMany)) } diff --git a/internal/maintenance/periodic_job_enqueuer_test.go b/internal/maintenance/periodic_job_enqueuer_test.go index d7530b71..ef5e0628 100644 --- a/internal/maintenance/periodic_job_enqueuer_test.go +++ b/internal/maintenance/periodic_job_enqueuer_test.go @@ -132,15 +132,15 @@ func TestPeriodicJobEnqueuer(t *testing.T) { // A simplified version of `Client.insertMany` that only inserts jobs directly // via the driver instead of using the pilot. - makeInsertFunc := func(schema string) func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) error { - return func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) error { + makeInsertFunc := func(schema string) func(ctx context.Context, execTx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error) { + return func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error) { _, err := tx.JobInsertFastMany(ctx, &riverdriver.JobInsertFastManyParams{ Jobs: sliceutil.Map(insertParams, func(params *rivertype.JobInsertParams) *riverdriver.JobInsertFastParams { return (*riverdriver.JobInsertFastParams)(params) }), Schema: schema, }) - return err + return nil, err } } diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index a29b71a1..972dc86a 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -424,6 +424,7 @@ type JobGetStuckParams struct { } type JobInsertFastParams struct { + ID *int64 // Args contains the raw underlying job arguments struct. It has already been // encoded into EncodedArgs, but the original is kept here for to leverage its // struct tags and interfaces, such as for use in unique key generation. diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index c48b7ba2..fdd91514 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -649,7 +649,24 @@ func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckPara } const jobInsertFastMany = `-- name: JobInsertFastMany :many +WITH raw_job_data AS ( + SELECT + unnest($1::bigint[]) AS id, + unnest($2::jsonb[]) AS args, + unnest($3::timestamptz[]) AS created_at, + unnest($4::text[]) AS kind, + unnest($5::smallint[]) AS max_attempts, + unnest($6::jsonb[]) AS metadata, + unnest($7::smallint[]) AS priority, + unnest($8::text[]) AS queue, + unnest($9::timestamptz[]) AS scheduled_at, + unnest($10::text[]) AS state, + unnest($11::text[]) AS tags, + unnest($12::bytea[]) AS unique_key, + unnest($13::integer[]) AS unique_states +) INSERT INTO /* TEMPLATE: schema */river_job( + id, args, created_at, kind, @@ -663,24 +680,23 @@ INSERT INTO /* TEMPLATE: schema */river_job( unique_key, unique_states ) SELECT - unnest($1::jsonb[]), - unnest($2::timestamptz[]), - unnest($3::text[]), - unnest($4::smallint[]), - unnest($5::jsonb[]), - unnest($6::smallint[]), - unnest($7::text[]), - unnest($8::timestamptz[]), - -- To avoid requiring pgx users to register the OID of the river_job_state[] - -- type, we cast the array to text[] and then to river_job_state. - unnest($9::text[])::/* TEMPLATE: schema */river_job_state, - -- Unnest on a multi-dimensional array will fully flatten the array, so we - -- encode the tag list as a comma-separated string and split it in the - -- query. - string_to_array(unnest($10::text[]), ','), - - nullif(unnest($11::bytea[]), ''), - nullif(unnest($12::integer[]), 0)::bit(8) + coalesce(nullif(id, 0), nextval('/* TEMPLATE: schema */river_job_id_seq'::regclass)), + args, + coalesce(nullif(created_at, '0001-01-01 00:00:00 +0000'), now()) AS created_at, + kind, + max_attempts, + coalesce(metadata, '{}'::jsonb) AS metadata, + priority, + queue, + coalesce(nullif(scheduled_at, '0001-01-01 00:00:00 +0000'), now()) AS scheduled_at, + state::/* TEMPLATE: schema */river_job_state, + string_to_array(tags, ',')::varchar(255)[], + -- ` + "`" + `nullif` + "`" + ` is required for ` + "`" + `lib/pq` + "`" + `, which doesn't do a good job of reading + -- ` + "`" + `nil` + "`" + ` into ` + "`" + `bytea` + "`" + `. We use ` + "`" + `text` + "`" + ` because otherwise ` + "`" + `lib/pq` + "`" + ` will encode + -- to Postgres binary like ` + "`" + `\xAAAA` + "`" + `. + nullif(unique_key, '')::bytea, + nullif(unique_states::integer, 0)::bit(8) +FROM raw_job_data ON CONFLICT (unique_key) WHERE unique_key IS NOT NULL AND unique_states IS NOT NULL @@ -691,6 +707,7 @@ RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_a ` type JobInsertFastManyParams struct { + ID []int64 Args []string CreatedAt []time.Time Kind []string @@ -712,6 +729,7 @@ type JobInsertFastManyRow struct { func (q *Queries) JobInsertFastMany(ctx context.Context, db DBTX, arg *JobInsertFastManyParams) ([]*JobInsertFastManyRow, error) { rows, err := db.QueryContext(ctx, jobInsertFastMany, + pq.Array(arg.ID), pq.Array(arg.Args), pq.Array(arg.CreatedAt), pq.Array(arg.Kind), diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 9b8ce7d4..a7457737 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -340,6 +340,7 @@ func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetSt func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.JobInsertFastManyParams) ([]*riverdriver.JobInsertFastResult, error) { insertJobsParams := &dbsqlc.JobInsertFastManyParams{ + ID: make([]int64, len(params.Jobs)), Args: make([]string, len(params.Jobs)), CreatedAt: make([]time.Time, len(params.Jobs)), Kind: make([]string, len(params.Jobs)), @@ -373,6 +374,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.Jo tags = []string{} } + insertJobsParams.ID[i] = ptrutil.ValOrDefault(params.ID, 0) insertJobsParams.Args[i] = cmp.Or(string(params.EncodedArgs), "{}") insertJobsParams.CreatedAt[i] = createdAt insertJobsParams.Kind[i] = params.Kind diff --git a/riverdriver/riverdrivertest/riverdrivertest.go b/riverdriver/riverdrivertest/riverdrivertest.go index c8b69d30..c244005c 100644 --- a/riverdriver/riverdrivertest/riverdrivertest.go +++ b/riverdriver/riverdrivertest/riverdrivertest.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "math/rand/v2" "reflect" "slices" "sort" @@ -1478,11 +1479,15 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, exec, bundle := setup(ctx, t) - now := time.Now().UTC() + var ( + idStart = rand.Int64() + now = time.Now().UTC() + ) insertParams := make([]*riverdriver.JobInsertFastParams, 10) for i := 0; i < len(insertParams); i++ { insertParams[i] = &riverdriver.JobInsertFastParams{ + ID: ptrutil.Ptr(idStart + int64(i)), CreatedAt: ptrutil.Ptr(now.Add(time.Duration(i) * 5 * time.Second)), EncodedArgs: []byte(`{"encoded": "args"}`), Kind: "test_kind", @@ -1514,6 +1519,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, job.Metadata, err = sjson.DeleteBytes(job.Metadata, rivercommon.MetadataKeyUniqueNonce) require.NoError(t, err) + require.Equal(t, idStart+int64(i), job.ID) require.Equal(t, 0, job.Attempt) require.Nil(t, job.AttemptedAt) require.Empty(t, job.AttemptedBy) @@ -1588,6 +1594,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.NoError(t, err) require.Len(t, jobsAfter, len(insertParams)) for _, job := range jobsAfter { + require.NotZero(t, job.ID) require.WithinDuration(t, time.Now().UTC(), job.CreatedAt, 2*time.Second) require.WithinDuration(t, time.Now().UTC(), job.ScheduledAt, 2*time.Second) diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index a6e26fdb..3932b2ee 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -254,7 +254,24 @@ ORDER BY id LIMIT @max; -- name: JobInsertFastMany :many +WITH raw_job_data AS ( + SELECT + unnest(@id::bigint[]) AS id, + unnest(@args::jsonb[]) AS args, + unnest(@created_at::timestamptz[]) AS created_at, + unnest(@kind::text[]) AS kind, + unnest(@max_attempts::smallint[]) AS max_attempts, + unnest(@metadata::jsonb[]) AS metadata, + unnest(@priority::smallint[]) AS priority, + unnest(@queue::text[]) AS queue, + unnest(@scheduled_at::timestamptz[]) AS scheduled_at, + unnest(@state::text[]) AS state, + unnest(@tags::text[]) AS tags, + unnest(@unique_key::bytea[]) AS unique_key, + unnest(@unique_states::integer[]) AS unique_states +) INSERT INTO /* TEMPLATE: schema */river_job( + id, args, created_at, kind, @@ -268,24 +285,23 @@ INSERT INTO /* TEMPLATE: schema */river_job( unique_key, unique_states ) SELECT - unnest(@args::jsonb[]), - unnest(@created_at::timestamptz[]), - unnest(@kind::text[]), - unnest(@max_attempts::smallint[]), - unnest(@metadata::jsonb[]), - unnest(@priority::smallint[]), - unnest(@queue::text[]), - unnest(@scheduled_at::timestamptz[]), - -- To avoid requiring pgx users to register the OID of the river_job_state[] - -- type, we cast the array to text[] and then to river_job_state. - unnest(@state::text[])::/* TEMPLATE: schema */river_job_state, - -- Unnest on a multi-dimensional array will fully flatten the array, so we - -- encode the tag list as a comma-separated string and split it in the - -- query. - string_to_array(unnest(@tags::text[]), ','), - - nullif(unnest(@unique_key::bytea[]), ''), - nullif(unnest(@unique_states::integer[]), 0)::bit(8) + coalesce(nullif(id, 0), nextval('/* TEMPLATE: schema */river_job_id_seq'::regclass)), + args, + coalesce(nullif(created_at, '0001-01-01 00:00:00 +0000'), now()) AS created_at, + kind, + max_attempts, + coalesce(metadata, '{}'::jsonb) AS metadata, + priority, + queue, + coalesce(nullif(scheduled_at, '0001-01-01 00:00:00 +0000'), now()) AS scheduled_at, + state::/* TEMPLATE: schema */river_job_state, + string_to_array(tags, ',')::varchar(255)[], + -- `nullif` is required for `lib/pq`, which doesn't do a good job of reading + -- `nil` into `bytea`. We use `text` because otherwise `lib/pq` will encode + -- to Postgres binary like `\xAAAA`. + nullif(unique_key, '')::bytea, + nullif(unique_states::integer, 0)::bit(8) +FROM raw_job_data ON CONFLICT (unique_key) WHERE unique_key IS NOT NULL AND unique_states IS NOT NULL diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 3716faa5..d8a7ca3c 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -628,7 +628,24 @@ func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckPara } const jobInsertFastMany = `-- name: JobInsertFastMany :many +WITH raw_job_data AS ( + SELECT + unnest($1::bigint[]) AS id, + unnest($2::jsonb[]) AS args, + unnest($3::timestamptz[]) AS created_at, + unnest($4::text[]) AS kind, + unnest($5::smallint[]) AS max_attempts, + unnest($6::jsonb[]) AS metadata, + unnest($7::smallint[]) AS priority, + unnest($8::text[]) AS queue, + unnest($9::timestamptz[]) AS scheduled_at, + unnest($10::text[]) AS state, + unnest($11::text[]) AS tags, + unnest($12::bytea[]) AS unique_key, + unnest($13::integer[]) AS unique_states +) INSERT INTO /* TEMPLATE: schema */river_job( + id, args, created_at, kind, @@ -642,24 +659,23 @@ INSERT INTO /* TEMPLATE: schema */river_job( unique_key, unique_states ) SELECT - unnest($1::jsonb[]), - unnest($2::timestamptz[]), - unnest($3::text[]), - unnest($4::smallint[]), - unnest($5::jsonb[]), - unnest($6::smallint[]), - unnest($7::text[]), - unnest($8::timestamptz[]), - -- To avoid requiring pgx users to register the OID of the river_job_state[] - -- type, we cast the array to text[] and then to river_job_state. - unnest($9::text[])::/* TEMPLATE: schema */river_job_state, - -- Unnest on a multi-dimensional array will fully flatten the array, so we - -- encode the tag list as a comma-separated string and split it in the - -- query. - string_to_array(unnest($10::text[]), ','), - - nullif(unnest($11::bytea[]), ''), - nullif(unnest($12::integer[]), 0)::bit(8) + coalesce(nullif(id, 0), nextval('/* TEMPLATE: schema */river_job_id_seq'::regclass)), + args, + coalesce(nullif(created_at, '0001-01-01 00:00:00 +0000'), now()) AS created_at, + kind, + max_attempts, + coalesce(metadata, '{}'::jsonb) AS metadata, + priority, + queue, + coalesce(nullif(scheduled_at, '0001-01-01 00:00:00 +0000'), now()) AS scheduled_at, + state::/* TEMPLATE: schema */river_job_state, + string_to_array(tags, ',')::varchar(255)[], + -- ` + "`" + `nullif` + "`" + ` is required for ` + "`" + `lib/pq` + "`" + `, which doesn't do a good job of reading + -- ` + "`" + `nil` + "`" + ` into ` + "`" + `bytea` + "`" + `. We use ` + "`" + `text` + "`" + ` because otherwise ` + "`" + `lib/pq` + "`" + ` will encode + -- to Postgres binary like ` + "`" + `\xAAAA` + "`" + `. + nullif(unique_key, '')::bytea, + nullif(unique_states::integer, 0)::bit(8) +FROM raw_job_data ON CONFLICT (unique_key) WHERE unique_key IS NOT NULL AND unique_states IS NOT NULL @@ -670,6 +686,7 @@ RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_a ` type JobInsertFastManyParams struct { + ID []int64 Args [][]byte CreatedAt []time.Time Kind []string @@ -691,6 +708,7 @@ type JobInsertFastManyRow struct { func (q *Queries) JobInsertFastMany(ctx context.Context, db DBTX, arg *JobInsertFastManyParams) ([]*JobInsertFastManyRow, error) { rows, err := db.Query(ctx, jobInsertFastMany, + arg.ID, arg.Args, arg.CreatedAt, arg.Kind, diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index ebc02972..e0c1d9b2 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -344,6 +344,7 @@ func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetSt func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.JobInsertFastManyParams) ([]*riverdriver.JobInsertFastResult, error) { insertJobsParams := &dbsqlc.JobInsertFastManyParams{ + ID: make([]int64, len(params.Jobs)), Args: make([][]byte, len(params.Jobs)), CreatedAt: make([]time.Time, len(params.Jobs)), Kind: make([]string, len(params.Jobs)), @@ -378,6 +379,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.Jo defaultObject := []byte("{}") + insertJobsParams.ID[i] = ptrutil.ValOrDefault(params.ID, 0) insertJobsParams.Args[i] = sliceutil.FirstNonEmpty(params.EncodedArgs, defaultObject) insertJobsParams.CreatedAt[i] = createdAt insertJobsParams.Kind[i] = params.Kind diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql index fa65cc94..1ad76f17 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql @@ -182,6 +182,7 @@ LIMIT @max; -- a network, looping over operations is probably okay performance-wise. -- name: JobInsertFast :one INSERT INTO /* TEMPLATE: schema */river_job( + id, args, created_at, kind, @@ -195,6 +196,7 @@ INSERT INTO /* TEMPLATE: schema */river_job( unique_key, unique_states ) VALUES ( + cast(sqlc.narg('id') AS integer), @args, coalesce(cast(sqlc.narg('created_at') AS text), datetime('now', 'subsec')), @kind, diff --git a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go index abca6a30..089482d3 100644 --- a/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riversqlite/internal/dbsqlc/river_job.sql.go @@ -585,6 +585,7 @@ func (q *Queries) JobGetStuck(ctx context.Context, db DBTX, arg *JobGetStuckPara const jobInsertFast = `-- name: JobInsertFast :one INSERT INTO /* TEMPLATE: schema */river_job( + id, args, created_at, kind, @@ -598,18 +599,19 @@ INSERT INTO /* TEMPLATE: schema */river_job( unique_key, unique_states ) VALUES ( - ?1, - coalesce(cast(?2 AS text), datetime('now', 'subsec')), - ?3, + cast(?1 AS integer), + ?2, + coalesce(cast(?3 AS text), datetime('now', 'subsec')), ?4, - json(cast(?5 AS blob)), - ?6, + ?5, + json(cast(?6 AS blob)), ?7, - coalesce(cast(?8 AS text), datetime('now', 'subsec')), - ?9, - json(cast(?10 AS blob)), - CASE WHEN length(cast(?11 AS blob)) = 0 THEN NULL ELSE ?11 END, - ?12 + ?8, + coalesce(cast(?9 AS text), datetime('now', 'subsec')), + ?10, + json(cast(?11 AS blob)), + CASE WHEN length(cast(?12 AS blob)) = 0 THEN NULL ELSE ?12 END, + ?13 ) ON CONFLICT (unique_key) WHERE unique_key IS NOT NULL @@ -631,6 +633,7 @@ RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, fin ` type JobInsertFastParams struct { + ID *int64 Args []byte CreatedAt *string Kind string @@ -656,6 +659,7 @@ type JobInsertFastParams struct { // a network, looping over operations is probably okay performance-wise. func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFastParams) (*RiverJob, error) { row := db.QueryRowContext(ctx, jobInsertFast, + arg.ID, arg.Args, arg.CreatedAt, arg.Kind, diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 2469d408..9618fde1 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -42,11 +42,11 @@ import ( "github.com/tidwall/sjson" "github.com/riverqueue/river/internal/rivercommon" - "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riversqlite/internal/dbsqlc" "github.com/riverqueue/river/rivershared/sqlctemplate" "github.com/riverqueue/river/rivershared/uniquestates" + "github.com/riverqueue/river/rivershared/util/dbutil" "github.com/riverqueue/river/rivershared/util/ptrutil" "github.com/riverqueue/river/rivershared/util/randutil" "github.com/riverqueue/river/rivershared/util/savepointutil" @@ -511,6 +511,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.Jo } internal, err := dbsqlc.New().JobInsertFast(ctx, dbtx, &dbsqlc.JobInsertFastParams{ + ID: params.ID, Args: params.EncodedArgs, CreatedAt: timeStringNullable(params.CreatedAt), Kind: params.Kind, diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 503e8b5d..e248ac2c 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -17,11 +17,11 @@ import ( "strings" "time" - "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/levenshtein" "github.com/riverqueue/river/rivershared/sqlctemplate" + "github.com/riverqueue/river/rivershared/util/dbutil" "github.com/riverqueue/river/rivershared/util/maputil" "github.com/riverqueue/river/rivershared/util/sliceutil" ) diff --git a/rivermigrate/river_migrate_test.go b/rivermigrate/river_migrate_test.go index 26e78fa9..8f12791e 100644 --- a/rivermigrate/river_migrate_test.go +++ b/rivermigrate/river_migrate_test.go @@ -16,11 +16,11 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/require" - "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/sqlctemplate" + "github.com/riverqueue/river/rivershared/util/dbutil" "github.com/riverqueue/river/rivershared/util/randutil" "github.com/riverqueue/river/rivershared/util/sliceutil" ) diff --git a/rivershared/riverpilot/pilot.go b/rivershared/riverpilot/pilot.go index 98e1a217..b16c1fd6 100644 --- a/rivershared/riverpilot/pilot.go +++ b/rivershared/riverpilot/pilot.go @@ -54,6 +54,18 @@ type Pilot interface { // // API is not stable. DO NOT USE. type PilotInitParams struct { + // Insert is the insert implementation from the main client. This is + // used as a low-level insert that shouldn't be accessible via public API, + // but should be accessible to deep integrations. + Insert func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error) + + // NotifyNonTxJobInsert is a special function that should be invoked when a + // client knows that a job has become available and the transaction that + // committed it has finished so that it's possible for a producer to fetch + // it. This is used in special cases like poll-only clients to improve + // latency between job insert and when a job is worked. + NotifyNonTxJobInsert func(ctx context.Context, res []*rivertype.JobInsertResult) + // WorkerMetadata is metadata about registered workers as received from the // client's worker bundle. Only available when a client will work jobs (i.e. // has Workers configured), so while it's safe to assume the presence of @@ -61,6 +73,16 @@ type PilotInitParams struct { WorkerMetadata []*rivertype.WorkerMetadata } +func (p *PilotInitParams) Validate() *PilotInitParams { + if p.Insert == nil { + panic("need PilotInitParams.Insert") + } + if p.NotifyNonTxJobInsert == nil { + panic("need PilotInitParams.NotifyNonTxJobInsert ") + } + return p +} + // PilotPeriodicJob contains pilot functions related to periodic jobs. This is // extracted as its own interface so there's less surface area to mock in places // like the periodic job enqueuer where that's needed. diff --git a/rivershared/riverpilot/standard.go b/rivershared/riverpilot/standard_pilot.go similarity index 100% rename from rivershared/riverpilot/standard.go rename to rivershared/riverpilot/standard_pilot.go diff --git a/internal/util/dbutil/db_util.go b/rivershared/util/dbutil/db_util.go similarity index 73% rename from internal/util/dbutil/db_util.go rename to rivershared/util/dbutil/db_util.go index 73c5f7b4..652aeee6 100644 --- a/internal/util/dbutil/db_util.go +++ b/rivershared/util/dbutil/db_util.go @@ -9,7 +9,7 @@ import ( // WithTx starts and commits a transaction on a driver executor around // the given function, allowing the return of a generic value. -func WithTx(ctx context.Context, exec riverdriver.Executor, innerFunc func(ctx context.Context, execTx riverdriver.ExecutorTx) error) error { +func WithTx[TExec riverdriver.Executor](ctx context.Context, exec TExec, innerFunc func(ctx context.Context, execTx riverdriver.ExecutorTx) error) error { _, err := WithTxV(ctx, exec, func(ctx context.Context, tx riverdriver.ExecutorTx) (struct{}, error) { return struct{}{}, innerFunc(ctx, tx) }) @@ -18,7 +18,7 @@ func WithTx(ctx context.Context, exec riverdriver.Executor, innerFunc func(ctx c // WithTxV starts and commits a transaction on a driver executor around // the given function, allowing the return of a generic value. -func WithTxV[T any](ctx context.Context, exec riverdriver.Executor, innerFunc func(ctx context.Context, execTx riverdriver.ExecutorTx) (T, error)) (T, error) { +func WithTxV[TExec riverdriver.Executor, T any](ctx context.Context, exec TExec, innerFunc func(ctx context.Context, execTx riverdriver.ExecutorTx) (T, error)) (T, error) { var defaultRes T tx, err := exec.Begin(ctx) diff --git a/internal/util/dbutil/db_util_test.go b/rivershared/util/dbutil/db_util_test.go similarity index 94% rename from internal/util/dbutil/db_util_test.go rename to rivershared/util/dbutil/db_util_test.go index 2c9f9fa3..8a57df56 100644 --- a/internal/util/dbutil/db_util_test.go +++ b/rivershared/util/dbutil/db_util_test.go @@ -6,10 +6,10 @@ import ( "github.com/stretchr/testify/require" - "github.com/riverqueue/river/internal/util/dbutil" "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/util/dbutil" ) func TestWithTx(t *testing.T) { diff --git a/internal/util/dbutil/main_test.go b/rivershared/util/dbutil/main_test.go similarity index 100% rename from internal/util/dbutil/main_test.go rename to rivershared/util/dbutil/main_test.go diff --git a/rivertype/river_type.go b/rivertype/river_type.go index a946a460..32473217 100644 --- a/rivertype/river_type.go +++ b/rivertype/river_type.go @@ -255,6 +255,7 @@ type AttemptError struct { } type JobInsertParams struct { + ID *int64 Args JobArgs CreatedAt *time.Time EncodedArgs []byte