Add leadership "domains" so multiple Rivers can operate in one schema #1113
```diff
@@ -9,6 +9,7 @@ import (
 	"log/slog"
 	"os"
 	"regexp"
+	"slices"
 	"strings"
 	"sync"
 	"time"
```
```diff
@@ -209,6 +210,47 @@ type Config struct {
 	// Jobs may have their own specific hooks by implementing JobArgsWithHooks.
 	Hooks []rivertype.Hook
 
+	// LeaderDomain is an optional "domain" string to use for leader election.
+	// Different clients sharing the same River schema can elect multiple
+	// leaders as long as they're using different domains, with one leader
+	// elected per domain.
+	//
+	// Setting this value also triggers the related behavior that maintenance
+	// services start to operate only on the queues their client is configured
+	// for. For example, given client1 handling queue_a and queue_b and client2
+	// handling queue_c and queue_d, whichever client is elected leader would
+	// normally run all maintenance services for all queues (queue_a, queue_b,
+	// queue_c, and queue_d). But if client1 is using domain "domain1" and
+	// client2 is using domain "domain2", then client1 (elected in domain1)
+	// will only run maintenance services on queue_a and queue_b, while client2
+	// (elected in domain2) will run maintenance services on queue_c and
+	// queue_d.
+	//
+	// Be warned that River *does not protect against configuration mistakes*.
+	// If client1 on domain1 is configured for queue_a and queue_b, and client2
+	// on domain2 is *also* configured for queue_a and queue_b, then both
+	// clients may end up running maintenance services on the same queues at
+	// the same time. It's the caller's responsibility to ensure that doesn't
+	// happen.
+	//
+	// Leaving this value empty or using the special value "default" causes the
+	// client to operate on all queues. When setting a non-empty, non-"default"
+	// value, no other client should be left empty or use "default", because
+	// the default client(s) will infringe on the domains of the non-default
+	// one(s).
+	//
+	// Certain maintenance services that aren't queue-related, like the
+	// indexer, will continue to run on all leaders regardless of domain. If
+	// using this feature, it's a good idea to configure ReindexerSchedule to
+	// river.NeverSchedule() on all but a single leader domain.
+	//
+	// In general, most River users should not need LeaderDomain, and when
+	// running multiple Rivers may want to consider using multiple databases
+	// and multiple schemas instead.
+	//
+	// Defaults to "default".
+	LeaderDomain string
 
 	// Logger is the structured logger to use for logging purposes. If none is
 	// specified, logs will be emitted to STDOUT with messages at warn level
 	// or higher.
```

Contributor comment on lines +247 to +249 ("In general, most River users should not need LeaderDomain…"): I would probably try to get some version of this warning into the first paragraph to dissuade people from using this feature. Could describe it as "an advanced option" or something like that. I would just like to ensure people don't start using this setting automatically just because it's there, because it has some major footguns.
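To make the intended setup concrete, here's a minimal sketch of one of two such clients, written against the public River API plus this PR's LeaderDomain field. The connection string, queue names, and worker registration are placeholders; a second process would mirror this with "domain2" and queue_c/queue_d.

```go
package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

func main() {
	ctx := context.Background()

	// Placeholder connection string; both clients point at the same schema.
	dbPool, err := pgxpool.New(ctx, "postgres://localhost/river_shared")
	if err != nil {
		log.Fatal(err)
	}

	workers := river.NewWorkers()
	// river.AddWorker(workers, &SomeWorker{}) for each job kind worked here.

	// client1 contends for leadership only within "domain1", so its
	// maintenance services touch only queue_a and queue_b.
	client1, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		LeaderDomain: "domain1",
		Queues: map[string]river.QueueConfig{
			"queue_a": {MaxWorkers: 10},
			"queue_b": {MaxWorkers: 10},
		},
		// Per the doc comment above, all but one domain should disable the
		// reindexer, since it isn't queue-scoped.
		ReindexerSchedule: river.NeverSchedule(),
		Workers:           workers,
	})
	if err != nil {
		log.Fatal(err)
	}

	if err := client1.Start(ctx); err != nil {
		log.Fatal(err)
	}
}
```

A second client with `LeaderDomain: "domain2"`, queues queue_c/queue_d, and the reindexer left on its default schedule could then hold leadership concurrently in the same schema.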
```diff
@@ -415,6 +457,7 @@ func (c *Config) WithDefaults() *Config {
 		Hooks:               c.Hooks,
 		JobInsertMiddleware: c.JobInsertMiddleware,
 		JobTimeout:          cmp.Or(c.JobTimeout, JobTimeoutDefault),
+		LeaderDomain:        c.LeaderDomain,
 		Logger:              logger,
 		MaxAttempts:         cmp.Or(c.MaxAttempts, MaxAttemptsDefault),
 		Middleware:          c.Middleware,
```
```diff
@@ -840,6 +883,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 	client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
 		ClientID: config.ID,
+		Domain:   config.LeaderDomain,
 		Schema:   config.Schema,
 	})
 	client.services = append(client.services, client.elector)
```
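For intuition about what per-domain election could mean at the storage level, here's a conceptual sketch. It assumes domains map onto the river_leader table's name column (historically fixed to "default"); this illustrates the idea only and is not the PR's actual query.

```go
package main

import "fmt"

// Conceptual sketch of per-domain election: with one river_leader row per
// domain, an election attempt succeeds only if no live leader currently
// holds that domain, while elections in other domains proceed independently.
const electQuery = `
INSERT INTO river_leader (name, leader_id, elected_at, expires_at)
VALUES (@domain, @client_id, now(), now() + @ttl::interval)
ON CONFLICT (name) DO NOTHING`

func main() {
	// Hypothetical usage: run with @domain = "domain1" and "domain2" from two
	// different clients; both inserts can succeed because they don't conflict.
	fmt.Println(electQuery)
}
```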
```diff
@@ -860,6 +904,14 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 		client.services = append(client.services, pluginPilot.PluginServices()...)
 	}
 
+	// It's important for queuesIncluded to stay `nil` when it's not in use so
+	// that the various driver queries work correctly.
+	var queuesIncluded []string
+	if config.LeaderDomain != "" && config.LeaderDomain != leadership.DomainDefault && len(config.Queues) > 0 {
+		queuesIncluded = maputil.Keys(config.Queues)
+		slices.Sort(queuesIncluded)
+	}
+
 	//
 	// Maintenance services
 	//
```

Contributor comment on lines +907 to +908: maybe the driver layer should handle …
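To make the nil-versus-populated distinction concrete, here's a small self-contained sketch of the same derivation using stdlib equivalents of River's internal maputil.Keys helper (the queuesIncludedFor function and queue names are hypothetical):

```go
package main

import (
	"fmt"
	"maps"
	"slices"
)

// queuesIncludedFor mirrors the logic above: nil (not an empty slice) means
// "no queue filter", and otherwise the client's queue names are returned in
// sorted order.
func queuesIncludedFor(leaderDomain string, queues map[string]struct{}) []string {
	if leaderDomain == "" || leaderDomain == "default" || len(queues) == 0 {
		return nil
	}
	// Stdlib equivalent of maputil.Keys + slices.Sort (Go 1.23+).
	return slices.Sorted(maps.Keys(queues))
}

func main() {
	queues := map[string]struct{}{"queue_b": {}, "queue_a": {}}
	fmt.Println(queuesIncludedFor("default", queues)) // [] (nil: no filtering)
	fmt.Println(queuesIncludedFor("domain1", queues)) // [queue_a queue_b]
}
```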
```diff
@@ -872,6 +924,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 			CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod,
 			DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod,
 			QueuesExcluded:              client.pilot.JobCleanerQueuesExcluded(),
+			QueuesIncluded:              queuesIncluded,
 			Schema:                      config.Schema,
 			Timeout:                     config.JobCleanerTimeout,
 		}, driver.GetExecutor())
```
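The same QueuesIncluded value is threaded into each maintenance service below. As a rough illustration of its effect (not the PR's actual driver query), a cleaner-style query might add a predicate only when the filter is non-nil:

```go
package main

import "fmt"

// Rough illustration (not the PR's actual driver query): a nil QueuesIncluded
// adds no predicate, while a non-nil slice restricts maintenance work to the
// given queues.
func jobCleanerQuery(queuesIncluded []string) string {
	query := `DELETE FROM river_job
WHERE state IN ('cancelled', 'completed', 'discarded')
  AND finalized_at < @horizon`
	if queuesIncluded != nil {
		query += `
  AND queue = any(@queues_included)`
	}
	return query
}

func main() {
	fmt.Println(jobCleanerQuery(nil))                            // cleans all queues
	fmt.Println(jobCleanerQuery([]string{"queue_a", "queue_b"})) // only this client's queues
}
```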
```diff
@@ -882,6 +935,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 	{
 		jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{
 			ClientRetryPolicy:   config.RetryPolicy,
+			QueuesIncluded:      queuesIncluded,
 			RescueAfter:         config.RescueStuckJobsAfter,
 			Schema:              config.Schema,
 			WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
```
```diff
@@ -897,9 +951,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 	{
 		jobScheduler := maintenance.NewJobScheduler(archetype, &maintenance.JobSchedulerConfig{
-			Interval:     config.schedulerInterval,
-			NotifyInsert: client.maybeNotifyInsertForQueues,
-			Schema:       config.Schema,
+			Interval:       config.schedulerInterval,
+			NotifyInsert:   client.maybeNotifyInsertForQueues,
+			QueuesIncluded: queuesIncluded,
+			Schema:         config.Schema,
 		}, driver.GetExecutor())
 		maintenanceServices = append(maintenanceServices, jobScheduler)
 		client.testSignals.jobScheduler = &jobScheduler.TestSignals
```
```diff
@@ -925,6 +980,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 	{
 		queueCleaner := maintenance.NewQueueCleaner(archetype, &maintenance.QueueCleanerConfig{
+			QueuesIncluded:  queuesIncluded,
 			RetentionPeriod: maintenance.QueueRetentionPeriodDefault,
 			Schema:          config.Schema,
 		}, driver.GetExecutor())
```
Review comment: Gotta say, this definitely feels dangerous. I'm wondering what else can break if we allow breaking one of the main promises of leader election (that there can only be one), including Pro features.