diff --git a/CHANGELOG.md b/CHANGELOG.md index 33031e34..6597ff70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +⚠️ Internal APIs used for communication between River and River Pro have changed. If using River Pro, make sure to update River and River Pro to latest at the same time to get compatible versions. River v0.23.0 is compatible with River Pro v0.15.0. + **Terminal UI:** @almottier wrote a very cool [terminal UI for River](https://github.com/almottier/rivertui) featuring real-time job monitoring with automatic refresh, job filtering, a job details view providing detailed information (plus look up by ID in the UI or by command line argument), and job actions like retry and cancellation. And as good as all that might sound, go take a look because it's even better in person. ### Added @@ -23,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Optimized the job completer's query `JobSetStateIfRunningMany`, resulting in an approximately 15% reduction in its duration when completing 2000 jobs, and around a 15-20% increase in `riverbench` throughput. [PR #904](https://github.com/riverqueue/river/pull/904). - `TimeStub` has been removed from the `rivertest` package. Its original inclusion was entirely accidentally and it should be considered entirely an internal API. [PR #912](https://github.com/riverqueue/river/pull/912). - When storing job-persisted logging with `riverlog`, if a work run's logging was completely empty, no metadata value is stored at all (previously, an empty value was stored). [PR #919](https://github.com/riverqueue/river/pull/919). +- Changed the internal integration APIs for River Pro. River Pro users must upgrade both libraries as part of this update. [PR #929](https://github.com/riverqueue/river/pull/929). ### Fixed @@ -33,6 +36,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Return specific explanatory error when attempting to use `JobListParams.Metadata` with `JobListTx` on SQLite. [PR #924](https://github.com/riverqueue/river/pull/924). - The reindexer now skips work if artifacts from a failed reindex are present under the assumption that if they are, a new reindex build is likely to fail again. Context cancel timeout is increased from 15 seconds to 1 minute, allowing more time for reindexes to finish. Timeout becomes configurable with `Config.ReindexerTimeout`. [PR #935](https://github.com/riverqueue/river/pull/935). - Accessing `Client.PeriodicJobs()` on an insert-only client now panics with a more helpful explanatory error message rather than an unhelpful nil pointer panic. [PR #938](https://github.com/riverqueue/river/pull/938). +- Return an error when adding a new queue at runtime via the `QueueBundle` if that queue was already added. [PR #929](https://github.com/riverqueue/river/pull/929). ## [0.22.0] - 2025-05-10 diff --git a/client.go b/client.go index 8a432444..117ed4e0 100644 --- a/client.go +++ b/client.go @@ -678,7 +678,11 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client testSignals: clientTestSignals{}, workCancel: func(cause error) {}, // replaced on start, but here in case StopAndCancel is called before start up } - client.queues = &QueueBundle{addProducer: client.addProducer, clientWillExecuteJobs: config.willExecuteJobs()} + + client.queues = &QueueBundle{ + addProducer: client.addProducer, + clientWillExecuteJobs: config.willExecuteJobs(), + } baseservice.Init(archetype, &client.baseService) client.baseService.Name = "Client" // Have to correct the name because base service isn't embedded like it usually is @@ -764,7 +768,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.services = append(client.services, client.elector) for queue, queueConfig := range config.Queues { - client.addProducer(queue, queueConfig) + if _, err := client.addProducer(queue, queueConfig); err != nil { + return nil, err + } } client.services = append(client.services, @@ -1988,7 +1994,11 @@ func (c *Client[TTx]) validateJobArgs(args JobArgs) error { return nil } -func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *producer { +func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*producer, error) { + if _, alreadyExists := c.producersByQueueName[queueName]; alreadyExists { + return nil, &QueueAlreadyAddedError{Name: queueName} + } + producer := newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), c.pilot, &producerConfig{ ClientID: c.config.ID, Completer: c.completer, @@ -2011,7 +2021,7 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *pr Workers: c.config.Workers, }) c.producersByQueueName[queueName] = producer - return producer + return producer, nil } var nameRegex = regexp.MustCompile(`^(?:[a-z0-9])+(?:[_|\-]?[a-z0-9]+)*$`) @@ -2467,7 +2477,7 @@ func (c *Client[TTx]) queueUpdate(ctx context.Context, executorTx riverdriver.Ex // through Client.Queues. type QueueBundle struct { // Function that adds a producer to the associated client. - addProducer func(queueName string, queueConfig QueueConfig) *producer + addProducer func(queueName string, queueConfig QueueConfig) (*producer, error) clientWillExecuteJobs bool @@ -2484,9 +2494,6 @@ type QueueBundle struct { // Add adds a new queue to the client. If the client is already started, a // producer for the queue is started. Context is inherited from the one given to // Client.Start. -// -// TODO: there is no way for this to work at runtime using a separate pro queue -// config, unless we put pro configs like concurrency within QueueConfig. func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error { if !b.clientWillExecuteJobs { return errors.New("client is not configured to execute jobs, cannot add queue") @@ -2499,7 +2506,10 @@ func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error { b.startStopMu.Lock() defer b.startStopMu.Unlock() - producer := b.addProducer(queueName, queueConfig) + producer, err := b.addProducer(queueName, queueConfig) + if err != nil { + return err + } // Start the queue if the client is already started. if b.fetchCtx != nil && b.fetchCtx.Err() == nil { diff --git a/client_test.go b/client_test.go index c18267ba..0ff72247 100644 --- a/client_test.go +++ b/client_test.go @@ -335,6 +335,31 @@ func Test_Client(t *testing.T) { riversharedtest.WaitOrTimeout(t, workedChan) }) + t.Run("Queues_Add_AlreadyAdded", func(t *testing.T) { + t.Parallel() + + // Test two scenarios: when the queue was already added before the client + // was created, and when the queue was added after the client was created. + // Both should error. + + config, bundle := setupConfig(t) + config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 2}} + client := newTestClient(t, bundle.dbPool, config) + + err := client.Queues().Add(QueueDefault, QueueConfig{MaxWorkers: 2}) + require.Error(t, err) + var alreadyAddedErr *QueueAlreadyAddedError + require.ErrorAs(t, err, &alreadyAddedErr) + require.Equal(t, QueueDefault, alreadyAddedErr.Name) + + require.NoError(t, client.Queues().Add("new_queue", QueueConfig{MaxWorkers: 2})) + + err = client.Queues().Add("new_queue", QueueConfig{MaxWorkers: 2}) + require.Error(t, err) + require.ErrorAs(t, err, &alreadyAddedErr) + require.Equal(t, "new_queue", alreadyAddedErr.Name) + }) + t.Run("Queues_Add_Stress", func(t *testing.T) { t.Parallel() @@ -6398,7 +6423,7 @@ func Test_NewClient_Defaults(t *testing.T) { workers := NewWorkers() AddWorker(workers, &noOpWorker{}) - client, err := NewClient(riverpgxv5.New(dbPool), &Config{ + client, err := NewClient(driver, &Config{ Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}}, Schema: schema, Workers: workers, @@ -6469,7 +6494,7 @@ func Test_NewClient_Overrides(t *testing.T) { WorkerMiddlewareDefaults } - client, err := NewClient(riverpgxv5.New(dbPool), &Config{ + client, err := NewClient(driver, &Config{ AdvisoryLockPrefix: 123_456, CancelledJobRetentionPeriod: 1 * time.Hour, CompletedJobRetentionPeriod: 2 * time.Hour, @@ -6903,7 +6928,7 @@ func Test_NewClient_Validations(t *testing.T) { } tt.configFunc(config) - client, err := NewClient(riverpgxv5.New(dbPool), config) + client, err := NewClient(driver, config) if tt.wantErr != nil { require.Error(t, err) require.ErrorContains(t, err, tt.wantErr.Error()) diff --git a/error.go b/error.go index ecee59fa..a768d7f0 100644 --- a/error.go +++ b/error.go @@ -1,6 +1,7 @@ package river import ( + "fmt" "time" "github.com/riverqueue/river/rivertype" @@ -36,6 +37,21 @@ func JobSnooze(duration time.Duration) error { return &rivertype.JobSnoozeError{Duration: duration} } +// QueueAlreadyAddedError is returned when attempting to add a queue that has +// already been added to the Client. +type QueueAlreadyAddedError struct { + Name string +} + +func (e *QueueAlreadyAddedError) Error() string { + return fmt.Sprintf("queue %q already added", e.Name) +} + +func (e *QueueAlreadyAddedError) Is(target error) bool { + _, ok := target.(*QueueAlreadyAddedError) + return ok +} + // UnknownJobKindError is returned when a Client fetches and attempts to // work a job that has not been registered on the Client's Workers bundle (using AddWorker). type UnknownJobKindError = rivertype.UnknownJobKindError diff --git a/producer.go b/producer.go index 86fce3ad..93db03e6 100644 --- a/producer.go +++ b/producer.go @@ -307,16 +307,21 @@ func (p *producer) StartWorkContext(fetchCtx, workCtx context.Context) error { initialMetadata := []byte("{}") if fetchedQueue != nil { initialMetadata = fetchedQueue.Metadata + if err := p.pilot.QueueMetadataChanged(fetchCtx, p.exec, &riverpilot.QueueMetadataChangedParams{ + Queue: p.config.Queue, + Metadata: initialMetadata, + }); err != nil { + p.Logger.ErrorContext(fetchCtx, p.Name+": Error setting fetched queue metadata with pilot", slog.String("queue", p.config.Queue), slog.String("err", err.Error())) + } } p.paused = initiallyPaused id := p.id.Load() id, p.state, err = p.pilot.ProducerInit(fetchCtx, p.exec, &riverpilot.ProducerInitParams{ - ClientID: p.config.ClientID, - ProducerID: id, - Queue: p.config.Queue, - QueueMetadata: initialMetadata, - Schema: p.config.Schema, + ClientID: p.config.ClientID, + ProducerID: id, + Queue: p.config.Queue, + Schema: p.config.Schema, }) if err != nil { stopped() @@ -539,9 +544,8 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context) { p.Logger.DebugContext(workCtx, p.Name+": Queue metadata changed", slog.String("queue", p.config.Queue), slog.String("queue_in_message", msg.Queue)) p.testSignals.MetadataChanged.Signal(struct{}{}) if err := p.pilot.QueueMetadataChanged(workCtx, p.exec, &riverpilot.QueueMetadataChangedParams{ + Queue: p.config.Queue, Metadata: msg.Metadata, - Schema: p.config.Schema, - State: p.state, }); err != nil { p.Logger.ErrorContext(workCtx, p.Name+": Error updating queue metadata with pilot", slog.String("queue", p.config.Queue), slog.String("err", err.Error())) } @@ -658,8 +662,8 @@ func (p *producer) finalizeShutdown(ctx context.Context) { if err := p.pilot.ProducerShutdown(ctx, p.exec, &riverpilot.ProducerShutdownParams{ ProducerID: p.id.Load(), + Queue: p.config.Queue, Schema: p.config.Schema, - State: p.state, }); err != nil { // Don't retry on these errors: // - context.Canceled: parent context is canceled, so retrying with a new timeout won't help diff --git a/rivershared/riverpilot/pilot.go b/rivershared/riverpilot/pilot.go index a6cee157..04a1c634 100644 --- a/rivershared/riverpilot/pilot.go +++ b/rivershared/riverpilot/pilot.go @@ -50,21 +50,19 @@ type ProducerState interface { } type ProducerInitParams struct { - ClientID string - ProducerID int64 - Queue string - QueueMetadata []byte - Schema string + ClientID string + ProducerID int64 + Queue string + Schema string } type ProducerShutdownParams struct { ProducerID int64 + Queue string Schema string - State ProducerState } type QueueMetadataChangedParams struct { + Queue string Metadata []byte - Schema string - State ProducerState }