Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,10 @@ type Config struct {
// Scheduler run interval. Shared between the scheduler and producer/job
// executors, but not currently exposed for configuration.
schedulerInterval time.Duration

// Customize processing order of job. Jobs can be processed in a FIFO/LIFO
// fashion
Fifo bool
}

// WithDefaults returns a copy of the Config with all default values applied.
Expand Down Expand Up @@ -2032,6 +2036,7 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*p
Schema: c.config.Schema,
StaleProducerRetentionPeriod: 5 * time.Minute,
Workers: c.config.Workers,
Fifo: c.config.Fifo,
})
c.producersByQueueName[queueName] = producer
return producer, nil
Expand Down
3 changes: 3 additions & 0 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ type producerConfig struct {
Schema string
StaleProducerRetentionPeriod time.Duration
Workers *Workers

Fifo bool
}

func (c *producerConfig) mustValidate() *producerConfig {
Expand Down Expand Up @@ -754,6 +756,7 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC
Queue: p.config.Queue,
ProducerID: p.id.Load(),
Schema: p.config.Schema,
Fifo: p.config.Fifo,
})
if err != nil {
p.Logger.Error(p.Name+": Error fetching jobs", slog.String("err", err.Error()), slog.String("queue", p.config.Queue))
Expand Down
1 change: 1 addition & 0 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ type JobGetAvailableParams struct {
ProducerID int64
Queue string
Schema string
Fifo bool
}

type JobGetByIDParams struct {
Expand Down
104 changes: 100 additions & 4 deletions riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 24 additions & 10 deletions riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,17 +240,31 @@ func (e *Executor) JobDeleteMany(ctx context.Context, params *riverdriver.JobDel
}

func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
jobs, err := dbsqlc.New().JobGetAvailable(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableParams{
AttemptedBy: params.ClientID,
MaxAttemptedBy: int32(min(params.MaxAttemptedBy, math.MaxInt32)), //nolint:gosec
MaxToLock: int32(min(params.MaxToLock, math.MaxInt32)), //nolint:gosec
Now: params.Now,
Queue: params.Queue,
})
if err != nil {
return nil, interpretError(err)
if params.Fifo {
jobs, err := dbsqlc.New().JobGetAvailableFifo(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableFifoParams{
AttemptedBy: params.ClientID,
MaxAttemptedBy: int32(min(params.MaxAttemptedBy, math.MaxInt32)), //nolint:gosec
MaxToLock: int32(min(params.MaxToLock, math.MaxInt32)), //nolint:gosec
Now: params.Now,
Queue: params.Queue,
})
if err != nil {
return nil, interpretError(err)
}
return sliceutil.MapError(jobs, jobRowFromInternal)
} else {
jobs, err := dbsqlc.New().JobGetAvailableLifo(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobGetAvailableLifoParams{
AttemptedBy: params.ClientID,
MaxAttemptedBy: int32(min(params.MaxAttemptedBy, math.MaxInt32)), //nolint:gosec
MaxToLock: int32(min(params.MaxToLock, math.MaxInt32)), //nolint:gosec
Now: params.Now,
Queue: params.Queue,
})
if err != nil {
return nil, interpretError(err)
}
return sliceutil.MapError(jobs, jobRowFromInternal)
}
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobGetByID(ctx context.Context, params *riverdriver.JobGetByIDParams) (*rivertype.JobRow, error) {
Expand Down
41 changes: 40 additions & 1 deletion riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ FROM /* TEMPLATE: schema */river_job
WHERE id IN (SELECT id FROM deleted_jobs)
ORDER BY /* TEMPLATE_BEGIN: order_by_clause */ id /* TEMPLATE_END */;

-- name: JobGetAvailable :many
-- name: JobGetAvailableLifo :many
WITH locked_jobs AS (
SELECT
*
Expand Down Expand Up @@ -184,6 +184,45 @@ WHERE
RETURNING
river_job.*;

-- name: JobGetAvailableFifo :many
WITH locked_jobs AS (
SELECT
*
FROM
/* TEMPLATE: schema */river_job
WHERE
state = 'available'
AND queue = @queue::text
AND scheduled_at <= coalesce(sqlc.narg('now')::timestamptz, now())
ORDER BY
priority ASC,
scheduled_at DESC,
id ASC
LIMIT @max_to_lock::integer
FOR UPDATE
SKIP LOCKED
)
UPDATE
/* TEMPLATE: schema */river_job
SET
state = 'running',
attempt = river_job.attempt + 1,
attempted_at = coalesce(sqlc.narg('now')::timestamptz, now()),
attempted_by = array_append(
CASE WHEN array_length(river_job.attempted_by, 1) >= @max_attempted_by::int
-- +2 instead of +1 because Postgres array indexing starts at 1, not 0.
THEN river_job.attempted_by[array_length(river_job.attempted_by, 1) + 2 - @max_attempted_by:]
ELSE river_job.attempted_by
END,
@attempted_by::text
)
FROM
locked_jobs
WHERE
river_job.id = locked_jobs.id
RETURNING
river_job.*;

-- name: JobGetByID :one
SELECT *
FROM /* TEMPLATE: schema */river_job
Expand Down
101 changes: 97 additions & 4 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading