Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

⚠️ Internal APIs used for communication between River and River Pro have changed. If using River Pro, make sure to update River and River Pro to latest at the same time to get compatible versions. River v0.23.0 is compatible with River Pro v0.15.0.

**Terminal UI:** @almottier wrote a very cool [terminal UI for River](https://github.com/almottier/rivertui) featuring real-time job monitoring with automatic refresh, job filtering, a job details view providing detailed information (plus look up by ID in the UI or by command line argument), and job actions like retry and cancellation. And as good as all that might sound, go take a look because it's even better in person.

### Added
Expand All @@ -23,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Optimized the job completer's query `JobSetStateIfRunningMany`, resulting in an approximately 15% reduction in its duration when completing 2000 jobs, and around a 15-20% increase in `riverbench` throughput. [PR #904](https://github.com/riverqueue/river/pull/904).
- `TimeStub` has been removed from the `rivertest` package. Its original inclusion was entirely accidentally and it should be considered entirely an internal API. [PR #912](https://github.com/riverqueue/river/pull/912).
- When storing job-persisted logging with `riverlog`, if a work run's logging was completely empty, no metadata value is stored at all (previously, an empty value was stored). [PR #919](https://github.com/riverqueue/river/pull/919).
- Changed the internal integration APIs for River Pro. River Pro users must upgrade both libraries as part of this update. [PR #929](https://github.com/riverqueue/river/pull/929).

### Fixed

Expand All @@ -33,6 +36,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Return specific explanatory error when attempting to use `JobListParams.Metadata` with `JobListTx` on SQLite. [PR #924](https://github.com/riverqueue/river/pull/924).
- The reindexer now skips work if artifacts from a failed reindex are present under the assumption that if they are, a new reindex build is likely to fail again. Context cancel timeout is increased from 15 seconds to 1 minute, allowing more time for reindexes to finish. Timeout becomes configurable with `Config.ReindexerTimeout`. [PR #935](https://github.com/riverqueue/river/pull/935).
- Accessing `Client.PeriodicJobs()` on an insert-only client now panics with a more helpful explanatory error message rather than an unhelpful nil pointer panic. [PR #938](https://github.com/riverqueue/river/pull/938).
- Return an error when adding a new queue at runtime via the `QueueBundle` if that queue was already added. [PR #929](https://github.com/riverqueue/river/pull/929).

## [0.22.0] - 2025-05-10

Expand Down
28 changes: 19 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,11 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
testSignals: clientTestSignals{},
workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up
}
client.queues = &QueueBundle{addProducer: client.addProducer, clientWillExecuteJobs: config.willExecuteJobs()}

client.queues = &QueueBundle{
addProducer: client.addProducer,
clientWillExecuteJobs: config.willExecuteJobs(),
}

baseservice.Init(archetype, &client.baseService)
client.baseService.Name = "Client" // Have to correct the name because base service isn't embedded like it usually is
Expand Down Expand Up @@ -764,7 +768,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
client.services = append(client.services, client.elector)

for queue, queueConfig := range config.Queues {
client.addProducer(queue, queueConfig)
if _, err := client.addProducer(queue, queueConfig); err != nil {
return nil, err
}
}

client.services = append(client.services,
Expand Down Expand Up @@ -1988,7 +1994,11 @@ func (c *Client[TTx]) validateJobArgs(args JobArgs) error {
return nil
}

func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *producer {
func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*producer, error) {
if _, alreadyExists := c.producersByQueueName[queueName]; alreadyExists {
return nil, &QueueAlreadyAddedError{Name: queueName}
}

producer := newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), c.pilot, &producerConfig{
ClientID: c.config.ID,
Completer: c.completer,
Expand All @@ -2011,7 +2021,7 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *pr
Workers: c.config.Workers,
})
c.producersByQueueName[queueName] = producer
return producer
return producer, nil
}

var nameRegex = regexp.MustCompile(`^(?:[a-z0-9])+(?:[_|\-]?[a-z0-9]+)*$`)
Expand Down Expand Up @@ -2467,7 +2477,7 @@ func (c *Client[TTx]) queueUpdate(ctx context.Context, executorTx riverdriver.Ex
// through Client.Queues.
type QueueBundle struct {
// Function that adds a producer to the associated client.
addProducer func(queueName string, queueConfig QueueConfig) *producer
addProducer func(queueName string, queueConfig QueueConfig) (*producer, error)

clientWillExecuteJobs bool

Expand All @@ -2484,9 +2494,6 @@ type QueueBundle struct {
// Add adds a new queue to the client. If the client is already started, a
// producer for the queue is started. Context is inherited from the one given to
// Client.Start.
//
// TODO: there is no way for this to work at runtime using a separate pro queue
// config, unless we put pro configs like concurrency within QueueConfig.
func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error {
if !b.clientWillExecuteJobs {
return errors.New("client is not configured to execute jobs, cannot add queue")
Expand All @@ -2499,7 +2506,10 @@ func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error {
b.startStopMu.Lock()
defer b.startStopMu.Unlock()

producer := b.addProducer(queueName, queueConfig)
producer, err := b.addProducer(queueName, queueConfig)
if err != nil {
return err
}

// Start the queue if the client is already started.
if b.fetchCtx != nil && b.fetchCtx.Err() == nil {
Expand Down
31 changes: 28 additions & 3 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,31 @@ func Test_Client(t *testing.T) {
riversharedtest.WaitOrTimeout(t, workedChan)
})

t.Run("Queues_Add_AlreadyAdded", func(t *testing.T) {
t.Parallel()

// Test two scenarios: when the queue was already added before the client
// was created, and when the queue was added after the client was created.
// Both should error.

config, bundle := setupConfig(t)
config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 2}}
client := newTestClient(t, bundle.dbPool, config)

err := client.Queues().Add(QueueDefault, QueueConfig{MaxWorkers: 2})
require.Error(t, err)
var alreadyAddedErr *QueueAlreadyAddedError
require.ErrorAs(t, err, &alreadyAddedErr)
require.Equal(t, QueueDefault, alreadyAddedErr.Name)

require.NoError(t, client.Queues().Add("new_queue", QueueConfig{MaxWorkers: 2}))

err = client.Queues().Add("new_queue", QueueConfig{MaxWorkers: 2})
require.Error(t, err)
require.ErrorAs(t, err, &alreadyAddedErr)
require.Equal(t, "new_queue", alreadyAddedErr.Name)
})

t.Run("Queues_Add_Stress", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -6398,7 +6423,7 @@ func Test_NewClient_Defaults(t *testing.T) {
workers := NewWorkers()
AddWorker(workers, &noOpWorker{})

client, err := NewClient(riverpgxv5.New(dbPool), &Config{
client, err := NewClient(driver, &Config{
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
Schema: schema,
Workers: workers,
Expand Down Expand Up @@ -6469,7 +6494,7 @@ func Test_NewClient_Overrides(t *testing.T) {
WorkerMiddlewareDefaults
}

client, err := NewClient(riverpgxv5.New(dbPool), &Config{
client, err := NewClient(driver, &Config{
AdvisoryLockPrefix: 123_456,
CancelledJobRetentionPeriod: 1 * time.Hour,
CompletedJobRetentionPeriod: 2 * time.Hour,
Expand Down Expand Up @@ -6903,7 +6928,7 @@ func Test_NewClient_Validations(t *testing.T) {
}
tt.configFunc(config)

client, err := NewClient(riverpgxv5.New(dbPool), config)
client, err := NewClient(driver, config)
if tt.wantErr != nil {
require.Error(t, err)
require.ErrorContains(t, err, tt.wantErr.Error())
Expand Down
16 changes: 16 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package river

import (
"fmt"
"time"

"github.com/riverqueue/river/rivertype"
Expand Down Expand Up @@ -36,6 +37,21 @@ func JobSnooze(duration time.Duration) error {
return &rivertype.JobSnoozeError{Duration: duration}
}

// QueueAlreadyAddedError is returned when attempting to add a queue that has
// already been added to the Client.
type QueueAlreadyAddedError struct {
Name string
}

func (e *QueueAlreadyAddedError) Error() string {
return fmt.Sprintf("queue %q already added", e.Name)
}

func (e *QueueAlreadyAddedError) Is(target error) bool {
_, ok := target.(*QueueAlreadyAddedError)
return ok
}

// UnknownJobKindError is returned when a Client fetches and attempts to
// work a job that has not been registered on the Client's Workers bundle (using AddWorker).
type UnknownJobKindError = rivertype.UnknownJobKindError
20 changes: 12 additions & 8 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,16 +307,21 @@ func (p *producer) StartWorkContext(fetchCtx, workCtx context.Context) error {
initialMetadata := []byte("{}")
if fetchedQueue != nil {
initialMetadata = fetchedQueue.Metadata
if err := p.pilot.QueueMetadataChanged(fetchCtx, p.exec, &riverpilot.QueueMetadataChangedParams{
Queue: p.config.Queue,
Metadata: initialMetadata,
}); err != nil {
p.Logger.ErrorContext(fetchCtx, p.Name+": Error setting fetched queue metadata with pilot", slog.String("queue", p.config.Queue), slog.String("err", err.Error()))
}
}
p.paused = initiallyPaused

id := p.id.Load()
id, p.state, err = p.pilot.ProducerInit(fetchCtx, p.exec, &riverpilot.ProducerInitParams{
ClientID: p.config.ClientID,
ProducerID: id,
Queue: p.config.Queue,
QueueMetadata: initialMetadata,
Schema: p.config.Schema,
ClientID: p.config.ClientID,
ProducerID: id,
Queue: p.config.Queue,
Schema: p.config.Schema,
})
if err != nil {
stopped()
Expand Down Expand Up @@ -539,9 +544,8 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context) {
p.Logger.DebugContext(workCtx, p.Name+": Queue metadata changed", slog.String("queue", p.config.Queue), slog.String("queue_in_message", msg.Queue))
p.testSignals.MetadataChanged.Signal(struct{}{})
if err := p.pilot.QueueMetadataChanged(workCtx, p.exec, &riverpilot.QueueMetadataChangedParams{
Queue: p.config.Queue,
Metadata: msg.Metadata,
Schema: p.config.Schema,
State: p.state,
}); err != nil {
p.Logger.ErrorContext(workCtx, p.Name+": Error updating queue metadata with pilot", slog.String("queue", p.config.Queue), slog.String("err", err.Error()))
}
Expand Down Expand Up @@ -658,8 +662,8 @@ func (p *producer) finalizeShutdown(ctx context.Context) {

if err := p.pilot.ProducerShutdown(ctx, p.exec, &riverpilot.ProducerShutdownParams{
ProducerID: p.id.Load(),
Queue: p.config.Queue,
Schema: p.config.Schema,
State: p.state,
}); err != nil {
// Don't retry on these errors:
// - context.Canceled: parent context is canceled, so retrying with a new timeout won't help
Expand Down
14 changes: 6 additions & 8 deletions rivershared/riverpilot/pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,19 @@ type ProducerState interface {
}

type ProducerInitParams struct {
ClientID string
ProducerID int64
Queue string
QueueMetadata []byte
Schema string
ClientID string
ProducerID int64
Queue string
Schema string
}

type ProducerShutdownParams struct {
ProducerID int64
Queue string
Schema string
State ProducerState
}

type QueueMetadataChangedParams struct {
Queue string
Metadata []byte
Schema string
State ProducerState
}