Skip to content

Commit 3f9a596

Browse files
committed
Add pilot hooks for periodic jobs
Add pilot hooks for periodic jobs, thereby giving River plugins the capability to extend certain parts of the periodic job enqueuer.
1 parent 7f2c72f commit 3f9a596

11 files changed

Lines changed: 589 additions & 91 deletions

File tree

client.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -485,14 +485,14 @@ func (c *Config) validate() error {
485485
if c.Workers != nil {
486486
for _, workerInfo := range c.Workers.workersMap {
487487
kind := workerInfo.jobArgs.Kind()
488-
if !jobKindRE.MatchString(kind) {
488+
if !rivercommon.UserSpecifiedIDOrKindRE.MatchString(kind) {
489489
if c.SkipJobKindValidation {
490490
c.Logger.Warn("job kind should match regex; this will be an error in future versions",
491491
slog.String("kind", kind),
492-
slog.String("regex", jobKindRE.String()),
492+
slog.String("regex", rivercommon.UserSpecifiedIDOrKindRE.String()),
493493
)
494494
} else {
495-
return fmt.Errorf("job kind %q should match regex %q", kind, jobKindRE.String())
495+
return fmt.Errorf("job kind %q should match regex %s", kind, rivercommon.UserSpecifiedIDOrKindRE.String())
496496
}
497497
}
498498
}
@@ -828,13 +828,18 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
828828
}
829829

830830
{
831-
periodicJobEnqueuer := maintenance.NewPeriodicJobEnqueuer(archetype, &maintenance.PeriodicJobEnqueuerConfig{
831+
periodicJobEnqueuer, err := maintenance.NewPeriodicJobEnqueuer(archetype, &maintenance.PeriodicJobEnqueuerConfig{
832832
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
833833
Insert: func(ctx context.Context, execTx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) error {
834834
_, err := client.insertMany(ctx, execTx, insertParams)
835835
return err
836836
},
837+
Pilot: client.pilot,
838+
Schema: config.Schema,
837839
}, driver.GetExecutor())
840+
if err != nil {
841+
return nil, err
842+
}
838843
maintenanceServices = append(maintenanceServices, periodicJobEnqueuer)
839844
client.testSignals.periodicJobEnqueuer = &periodicJobEnqueuer.TestSignals
840845

@@ -2521,11 +2526,6 @@ func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error {
25212526
return nil
25222527
}
25232528

2524-
// Regular expression to which the format of job kinds must comply. Mainly,
2525-
// minimal special characters, and excluding spaces and commas which are
2526-
// problematic for the search UI.
2527-
var jobKindRE = regexp.MustCompile(`\A[\w][\w\-\[\]<>\/.·:+]+\z`)
2528-
25292529
// Generates a default client ID using the current hostname and time.
25302530
func defaultClientID(startedAt time.Time) string {
25312531
host, _ := os.Hostname()

client_test.go

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6892,7 +6892,7 @@ func Test_NewClient_Validations(t *testing.T) {
68926892
configFunc: func(config *Config) {
68936893
AddWorker(config.Workers, &invalidKindWorker{})
68946894
},
6895-
wantErr: fmt.Errorf("job kind %q should match regex %q", "this kind is invalid", jobKindRE.String()),
6895+
wantErr: fmt.Errorf("job kind %q should match regex %s", "this kind is invalid", rivercommon.UserSpecifiedIDOrKindRE.String()),
68966896
},
68976897
{
68986898
name: "Job kind validation skipped with SkipJobKindValidation",
@@ -7606,23 +7606,3 @@ func TestDefaultClientIDWithHost(t *testing.T) {
76067606
require.Equal(t, strings.Repeat("a", 60)+"_2024_03_07T04_39_12_123456", defaultClientIDWithHost(startedAt, strings.Repeat("a", 60)))
76077607
require.Equal(t, strings.Repeat("a", 60)+"_2024_03_07T04_39_12_123456", defaultClientIDWithHost(startedAt, strings.Repeat("a", 61)))
76087608
}
7609-
7610-
func TestJobKindRE(t *testing.T) {
7611-
t.Parallel()
7612-
7613-
require.Regexp(t, jobKindRE, "kind")
7614-
require.Regexp(t, jobKindRE, "kind123")
7615-
require.Regexp(t, jobKindRE, "with.dot")
7616-
require.Regexp(t, jobKindRE, "with:colon")
7617-
require.Regexp(t, jobKindRE, "with+plus")
7618-
require.Regexp(t, jobKindRE, "with-hyphen")
7619-
require.Regexp(t, jobKindRE, "with_underscore")
7620-
require.Regexp(t, jobKindRE, "with[brackets]")
7621-
require.Regexp(t, jobKindRE, "with<triangle_brackets>")
7622-
require.Regexp(t, jobKindRE, "with/slash")
7623-
require.Regexp(t, jobKindRE, "JobArgsReflectKind[github.com/riverqueue/river.JobArgs·12]")
7624-
7625-
require.NotRegexp(t, jobKindRE, "with space")
7626-
require.NotRegexp(t, jobKindRE, "with,comma")
7627-
require.NotRegexp(t, jobKindRE, ":no_leading_special_characters")
7628-
}

0 commit comments

Comments
 (0)