From 8ade8c85523960a1b21a8af28a41c61c8fe699fa Mon Sep 17 00:00:00 2001 From: Brandur Date: Tue, 27 May 2025 18:55:18 -0600 Subject: [PATCH 1/2] Add pilot hooks for periodic jobs Add pilot hooks for periodic jobs, thereby giving River plugins the capability to extend certain parts of the periodic job enqueuer. --- client.go | 18 +- client_test.go | 22 +- internal/maintenance/periodic_job_enqueuer.go | 180 ++++++++++-- .../maintenance/periodic_job_enqueuer_test.go | 264 ++++++++++++++++-- internal/maintenance/queue_maintainer_test.go | 23 +- internal/rivercommon/river_common.go | 7 + internal/rivercommon/river_common_test.go | 27 ++ periodic_job.go | 32 ++- periodic_job_test.go | 45 ++- rivershared/riverpilot/pilot.go | 50 ++++ rivershared/riverpilot/standard.go | 12 + 11 files changed, 589 insertions(+), 91 deletions(-) create mode 100644 internal/rivercommon/river_common_test.go diff --git a/client.go b/client.go index 117ed4e0..ae5ac01e 100644 --- a/client.go +++ b/client.go @@ -485,14 +485,14 @@ func (c *Config) validate() error { if c.Workers != nil { for _, workerInfo := range c.Workers.workersMap { kind := workerInfo.jobArgs.Kind() - if !jobKindRE.MatchString(kind) { + if !rivercommon.UserSpecifiedIDOrKindRE.MatchString(kind) { if c.SkipJobKindValidation { c.Logger.Warn("job kind should match regex; this will be an error in future versions", slog.String("kind", kind), - slog.String("regex", jobKindRE.String()), + slog.String("regex", rivercommon.UserSpecifiedIDOrKindRE.String()), ) } else { - return fmt.Errorf("job kind %q should match regex %q", kind, jobKindRE.String()) + return fmt.Errorf("job kind %q should match regex %s", kind, rivercommon.UserSpecifiedIDOrKindRE.String()) } } } @@ -828,13 +828,18 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client } { - periodicJobEnqueuer := maintenance.NewPeriodicJobEnqueuer(archetype, &maintenance.PeriodicJobEnqueuerConfig{ + periodicJobEnqueuer, err := maintenance.NewPeriodicJobEnqueuer(archetype, &maintenance.PeriodicJobEnqueuerConfig{ AdvisoryLockPrefix: config.AdvisoryLockPrefix, Insert: func(ctx context.Context, execTx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) error { _, err := client.insertMany(ctx, execTx, insertParams) return err }, + Pilot: client.pilot, + Schema: config.Schema, }, driver.GetExecutor()) + if err != nil { + return nil, err + } maintenanceServices = append(maintenanceServices, periodicJobEnqueuer) client.testSignals.periodicJobEnqueuer = &periodicJobEnqueuer.TestSignals @@ -2521,11 +2526,6 @@ func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error { return nil } -// Regular expression to which the format of job kinds must comply. Mainly, -// minimal special characters, and excluding spaces and commas which are -// problematic for the search UI. -var jobKindRE = regexp.MustCompile(`\A[\w][\w\-\[\]<>\/.·:+]+\z`) - // Generates a default client ID using the current hostname and time. func defaultClientID(startedAt time.Time) string { host, _ := os.Hostname() diff --git a/client_test.go b/client_test.go index 0ff72247..60a9df1f 100644 --- a/client_test.go +++ b/client_test.go @@ -6892,7 +6892,7 @@ func Test_NewClient_Validations(t *testing.T) { configFunc: func(config *Config) { AddWorker(config.Workers, &invalidKindWorker{}) }, - wantErr: fmt.Errorf("job kind %q should match regex %q", "this kind is invalid", jobKindRE.String()), + wantErr: fmt.Errorf("job kind %q should match regex %s", "this kind is invalid", rivercommon.UserSpecifiedIDOrKindRE.String()), }, { name: "Job kind validation skipped with SkipJobKindValidation", @@ -7606,23 +7606,3 @@ func TestDefaultClientIDWithHost(t *testing.T) { require.Equal(t, strings.Repeat("a", 60)+"_2024_03_07T04_39_12_123456", defaultClientIDWithHost(startedAt, strings.Repeat("a", 60))) require.Equal(t, strings.Repeat("a", 60)+"_2024_03_07T04_39_12_123456", defaultClientIDWithHost(startedAt, strings.Repeat("a", 61))) } - -func TestJobKindRE(t *testing.T) { - t.Parallel() - - require.Regexp(t, jobKindRE, "kind") - require.Regexp(t, jobKindRE, "kind123") - require.Regexp(t, jobKindRE, "with.dot") - require.Regexp(t, jobKindRE, "with:colon") - require.Regexp(t, jobKindRE, "with+plus") - require.Regexp(t, jobKindRE, "with-hyphen") - require.Regexp(t, jobKindRE, "with_underscore") - require.Regexp(t, jobKindRE, "with[brackets]") - require.Regexp(t, jobKindRE, "with") - require.Regexp(t, jobKindRE, "with/slash") - require.Regexp(t, jobKindRE, "JobArgsReflectKind[github.com/riverqueue/river.JobArgs·12]") - - require.NotRegexp(t, jobKindRE, "with space") - require.NotRegexp(t, jobKindRE, "with,comma") - require.NotRegexp(t, jobKindRE, ":no_leading_special_characters") -} diff --git a/internal/maintenance/periodic_job_enqueuer.go b/internal/maintenance/periodic_job_enqueuer.go index fe11a834..2343f566 100644 --- a/internal/maintenance/periodic_job_enqueuer.go +++ b/internal/maintenance/periodic_job_enqueuer.go @@ -3,17 +3,21 @@ package maintenance import ( "context" "errors" + "fmt" "slices" "sync" "time" "github.com/tidwall/sjson" + "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/riverpilot" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/testsignal" "github.com/riverqueue/river/rivershared/util/maputil" + "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivershared/util/testutil" "github.com/riverqueue/river/rivertype" ) @@ -39,6 +43,7 @@ func (ts *PeriodicJobEnqueuerTestSignals) Init(tb testutil.TestingTB) { // river.PeriodicJobArgs, but needs a separate type because the enqueuer is in a // subpackage. type PeriodicJob struct { + ID string ConstructorFunc func() (*rivertype.JobInsertParams, error) RunOnStart bool ScheduleFunc func(time.Time) time.Time @@ -47,14 +52,29 @@ type PeriodicJob struct { } func (j *PeriodicJob) mustValidate() *PeriodicJob { - if j.ScheduleFunc == nil { - panic("PeriodicJob.ScheduleFunc must be set") + if err := j.validate(); err != nil { + panic(err) + } + return j +} + +func (j *PeriodicJob) validate() error { + if j.ID != "" { + if len(j.ID) >= 128 { + return errors.New("PeriodicJob.ID must be less than 128 characters") + } + if !rivercommon.UserSpecifiedIDOrKindRE.MatchString(j.ID) { + return fmt.Errorf("PeriodicJob.ID %q should match regex %s", j.ID, rivercommon.UserSpecifiedIDOrKindRE.String()) + } } if j.ConstructorFunc == nil { - panic("PeriodicJob.ConstructorFunc must be set") + return errors.New("PeriodicJob.ConstructorFunc must be set") + } + if j.ScheduleFunc == nil { + return errors.New("PeriodicJob.ScheduleFunc must be set") } - return j + return nil } type PeriodicJobEnqueuerConfig struct { @@ -65,9 +85,17 @@ type PeriodicJobEnqueuerConfig struct { // PeriodicJobs are the periodic jobs with which to configure the enqueuer. PeriodicJobs []*PeriodicJob + + // Pilot is a plugin module providing additional non-standard functionality. + Pilot riverpilot.PilotPeriodicJob + + // Schema where River tables are located. Empty string omits schema, causing + // Postgres to default to `search_path`. + Schema string } func (c *PeriodicJobEnqueuerConfig) mustValidate() *PeriodicJobEnqueuerConfig { + // no validations currently return c } @@ -84,47 +112,69 @@ type PeriodicJobEnqueuer struct { exec riverdriver.Executor mu sync.RWMutex nextHandle rivertype.PeriodicJobHandle + periodicJobIDs map[string]struct{} periodicJobs map[rivertype.PeriodicJobHandle]*PeriodicJob recalculateNextRun chan struct{} } -func NewPeriodicJobEnqueuer(archetype *baseservice.Archetype, config *PeriodicJobEnqueuerConfig, exec riverdriver.Executor) *PeriodicJobEnqueuer { +func NewPeriodicJobEnqueuer(archetype *baseservice.Archetype, config *PeriodicJobEnqueuerConfig, exec riverdriver.Executor) (*PeriodicJobEnqueuer, error) { var ( - nextHandle rivertype.PeriodicJobHandle - periodicJobs = make(map[rivertype.PeriodicJobHandle]*PeriodicJob, len(config.PeriodicJobs)) + nextHandle rivertype.PeriodicJobHandle + periodicJobIDs = make(map[string]struct{}) + periodicJobs = make(map[rivertype.PeriodicJobHandle]*PeriodicJob, len(config.PeriodicJobs)) ) for _, periodicJob := range config.PeriodicJobs { - periodicJob.mustValidate() + if err := periodicJob.validate(); err != nil { + return nil, err + } + + if err := addUniqueID(periodicJobIDs, periodicJob.ID); err != nil { + return nil, err + } periodicJobs[nextHandle] = periodicJob nextHandle++ } + pilot := config.Pilot + if pilot == nil { + pilot = &riverpilot.StandardPilot{} + } + svc := baseservice.Init(archetype, &PeriodicJobEnqueuer{ Config: (&PeriodicJobEnqueuerConfig{ AdvisoryLockPrefix: config.AdvisoryLockPrefix, Insert: config.Insert, PeriodicJobs: config.PeriodicJobs, + Pilot: pilot, + Schema: config.Schema, }).mustValidate(), exec: exec, nextHandle: nextHandle, + periodicJobIDs: periodicJobIDs, periodicJobs: periodicJobs, recalculateNextRun: make(chan struct{}, 1), }) - return svc + return svc, nil } -// Add adds a new periodic job to the enqueuer. The service's run loop is woken -// immediately so that the job is scheduled appropriately, and inserted if its -// RunOnStart flag is set to true. -func (s *PeriodicJobEnqueuer) Add(periodicJob *PeriodicJob) rivertype.PeriodicJobHandle { +// AddSafely adds a new periodic job to the enqueuer. The service's run loop is +// woken immediately so that the job is scheduled appropriately, and inserted if +// its RunOnStart flag is set to true. +func (s *PeriodicJobEnqueuer) AddSafely(periodicJob *PeriodicJob) (rivertype.PeriodicJobHandle, error) { s.mu.Lock() defer s.mu.Unlock() - periodicJob.mustValidate() + if err := periodicJob.validate(); err != nil { + return 0, err + } + + if err := addUniqueID(s.periodicJobIDs, periodicJob.ID); err != nil { + return 0, err + } handle := s.nextHandle s.periodicJobs[handle] = periodicJob @@ -135,20 +185,26 @@ func (s *PeriodicJobEnqueuer) Add(periodicJob *PeriodicJob) rivertype.PeriodicJo default: } - return handle + return handle, nil } -// AddMany adds many new periodic job to the enqueuer. The service's run loop is +// AddManySafely adds many new periodic job to the enqueuer. The service's run loop is // woken immediately so that the job is scheduled appropriately, and inserted if // any RunOnStart flags are set to true. -func (s *PeriodicJobEnqueuer) AddMany(periodicJobs []*PeriodicJob) []rivertype.PeriodicJobHandle { +func (s *PeriodicJobEnqueuer) AddManySafely(periodicJobs []*PeriodicJob) ([]rivertype.PeriodicJobHandle, error) { s.mu.Lock() defer s.mu.Unlock() handles := make([]rivertype.PeriodicJobHandle, len(periodicJobs)) for i, periodicJob := range periodicJobs { - periodicJob.mustValidate() + if err := periodicJob.validate(); err != nil { + return nil, err + } + + if err := addUniqueID(s.periodicJobIDs, periodicJob.ID); err != nil { + return nil, err + } handles[i] = s.nextHandle s.periodicJobs[handles[i]] = periodicJob @@ -160,7 +216,7 @@ func (s *PeriodicJobEnqueuer) AddMany(periodicJobs []*PeriodicJob) []rivertype.P default: } - return handles + return handles, nil } // Clear clears all periodic jobs from the enqueuer. @@ -216,6 +272,21 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { default: } + // Initial set of periodic job IDs mapped to next run at times fetched + // from a configured pilot. Not used in most cases. + initialPeriodicJobsMap := func() map[string]time.Time { + initialPeriodicJobs, err := s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{ + Schema: s.Config.Schema, + }) + if err != nil { + s.Logger.ErrorContext(ctx, s.Name+": Error fetching initial periodic jobs", "error", err) + return make(map[string]time.Time) + } + + return sliceutil.KeyBy(initialPeriodicJobs, + func(j *riverpilot.PeriodicJob) (string, time.Time) { return j.ID, j.NextRunAt }) + }() + var lastHandleSeen rivertype.PeriodicJobHandle = -1 // so handle 0 is considered validateInsertRunOnStartAndScheduleNewlyAdded := func() { @@ -223,8 +294,9 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { defer s.mu.RUnlock() var ( - insertParamsMany []*rivertype.JobInsertParams - now = s.Time.NowUTC() + insertParamsMany []*rivertype.JobInsertParams + now = s.Time.NowUTC() + periodicJobUpsertParams = &riverpilot.PeriodicJobUpsertManyParams{Schema: s.Config.Schema} ) // Handle periodic jobs in sorted order so we can correctly account @@ -241,7 +313,12 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { periodicJob := s.periodicJobs[handle].mustValidate() - periodicJob.nextRunAt = periodicJob.ScheduleFunc(now) + if nextRunAt, ok := initialPeriodicJobsMap[periodicJob.ID]; periodicJob.ID != "" && ok { + periodicJob.nextRunAt = nextRunAt + delete(initialPeriodicJobsMap, periodicJob.ID) + } else { + periodicJob.nextRunAt = periodicJob.ScheduleFunc(now) + } if !periodicJob.RunOnStart { continue @@ -250,9 +327,16 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { if insertParams, ok := s.insertParamsFromConstructor(ctx, periodicJob.ConstructorFunc, now); ok { insertParamsMany = append(insertParamsMany, insertParams) } + + if periodicJob.ID != "" { + periodicJobUpsertParams.Jobs = append(periodicJobUpsertParams.Jobs, &riverpilot.PeriodicJobUpsertParams{ + ID: periodicJob.ID, + NextRunAt: periodicJob.nextRunAt, + }) + } } - s.insertBatch(ctx, insertParamsMany) + s.insertBatch(ctx, insertParamsMany, periodicJobUpsertParams) if len(insertParamsMany) > 0 { s.Logger.DebugContext(ctx, s.Name+": Inserted RunOnStart jobs", "num_jobs", len(insertParamsMany)) @@ -262,6 +346,17 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { // Run any jobs that need to run on start and calculate initial runs. validateInsertRunOnStartAndScheduleNewlyAdded() + // Delete any periodic jobs that were in the database but no longer + // present in the client's configured periodic jobs. + if len(initialPeriodicJobsMap) > 0 { + if _, err := s.Config.Pilot.PeriodicJobDeleteByIDMany(ctx, s.exec, &riverpilot.PeriodicJobDeleteByIDManyParams{ + ID: maputil.Keys(initialPeriodicJobsMap), + Schema: s.Config.Schema, + }); err != nil { + s.Logger.ErrorContext(ctx, s.Name+": Error deleting periodic jobs", "error", err) + } + } + s.TestSignals.EnteredLoop.Signal(struct{}{}) timerUntilNextRun := time.NewTimer(s.timeUntilNextRun()) @@ -269,7 +364,10 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { for { select { case <-timerUntilNextRun.C: - var insertParamsMany []*rivertype.JobInsertParams + var ( + insertParamsMany []*rivertype.JobInsertParams + periodicJobUpsertParams = &riverpilot.PeriodicJobUpsertManyParams{Schema: s.Config.Schema} + ) now := s.Time.NowUTC() @@ -296,10 +394,17 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { // as true as possible to the original schedule by using the // original run time when calculating the next one. periodicJob.nextRunAt = periodicJob.ScheduleFunc(periodicJob.nextRunAt) + + if periodicJob.ID != "" { + periodicJobUpsertParams.Jobs = append(periodicJobUpsertParams.Jobs, &riverpilot.PeriodicJobUpsertParams{ + ID: periodicJob.ID, + NextRunAt: periodicJob.nextRunAt, + }) + } } }() - s.insertBatch(ctx, insertParamsMany) + s.insertBatch(ctx, insertParamsMany, periodicJobUpsertParams) case <-s.recalculateNextRun: if !timerUntilNextRun.Stop() { @@ -329,8 +434,8 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { return nil } -func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany []*rivertype.JobInsertParams) { - if len(insertParamsMany) == 0 { +func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany []*rivertype.JobInsertParams, periodicJobUpsertParams *riverpilot.PeriodicJobUpsertManyParams) { + if len(insertParamsMany) < 1 { return } @@ -345,6 +450,13 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany if err := s.Config.Insert(ctx, tx, insertParamsMany); err != nil { s.Logger.ErrorContext(ctx, s.Name+": Error inserting periodic jobs", "error", err.Error(), "num_jobs", len(insertParamsMany)) + } + } + + if len(periodicJobUpsertParams.Jobs) > 0 { + if _, err = s.Config.Pilot.PeriodicJobUpsertMany(ctx, tx, periodicJobUpsertParams); err != nil { + s.Logger.ErrorContext(ctx, s.Name+": Error upserting periodic job next run times", + "error", err.Error(), "num_jobs", len(insertParamsMany), "num_next_run_at_upserts", len(periodicJobUpsertParams.Jobs)) return } } @@ -427,3 +539,17 @@ func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration { return firstNextRunAt.Sub(now) } + +// Adds a unique ID to known periodic job IDs, erroring in case of a duplicate. +func addUniqueID(periodicJobIDs map[string]struct{}, id string) error { + if id == "" { + return nil + } + + if _, ok := periodicJobIDs[id]; ok { + return errors.New("periodic job with ID already registered: " + id) + } + + periodicJobIDs[id] = struct{}{} + return nil +} diff --git a/internal/maintenance/periodic_job_enqueuer_test.go b/internal/maintenance/periodic_job_enqueuer_test.go index 22a84707..2dd86977 100644 --- a/internal/maintenance/periodic_job_enqueuer_test.go +++ b/internal/maintenance/periodic_job_enqueuer_test.go @@ -3,6 +3,7 @@ package maintenance import ( "context" "fmt" + "strings" "sync" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riverpilot" "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/startstoptest" @@ -22,6 +24,55 @@ import ( "github.com/riverqueue/river/rivertype" ) +func TestPeriodicJob(t *testing.T) { + t.Parallel() + + validPeriodicJob := func() *PeriodicJob { + return &PeriodicJob{ + ConstructorFunc: func() (*rivertype.JobInsertParams, error) { return nil, nil }, + ScheduleFunc: func(t time.Time) time.Time { return time.Time{} }, + } + } + + t.Run("Valid", func(t *testing.T) { + t.Parallel() + + require.NoError(t, validPeriodicJob().validate()) + }) + + t.Run("IDTooLong", func(t *testing.T) { + t.Parallel() + + periodicJob := validPeriodicJob() + periodicJob.ID = strings.Repeat("a", 128) + require.EqualError(t, periodicJob.validate(), "PeriodicJob.ID must be less than 128 characters") + }) + + t.Run("IDIllegalCharacters", func(t *testing.T) { + t.Parallel() + + periodicJob := validPeriodicJob() + periodicJob.ID = "shouldn't have spaces and stuff" + require.EqualError(t, periodicJob.validate(), `PeriodicJob.ID "shouldn't have spaces and stuff" should match regex `+rivercommon.UserSpecifiedIDOrKindRE.String()) + }) + + t.Run("ConstructorFuncMissing", func(t *testing.T) { + t.Parallel() + + periodicJob := validPeriodicJob() + periodicJob.ConstructorFunc = nil + require.EqualError(t, periodicJob.validate(), "PeriodicJob.ConstructorFunc must be set") + }) + + t.Run("ScheduleFuncMissing", func(t *testing.T) { + t.Parallel() + + periodicJob := validPeriodicJob() + periodicJob.ScheduleFunc = nil + require.EqualError(t, periodicJob.validate(), "PeriodicJob.ScheduleFunc must be set") + }) +} + type noOpArgs struct{} func (noOpArgs) Kind() string { return "no_op" } @@ -34,6 +85,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) { type testBundle struct { exec riverdriver.Executor notificationsByQueue map[string]int + pilotMock *PilotPeriodicJobMock schema string waitChan chan (struct{}) } @@ -103,11 +155,17 @@ func TestPeriodicJobEnqueuer(t *testing.T) { bundle := &testBundle{ exec: riverpgxv5.New(dbPool).GetExecutor(), notificationsByQueue: make(map[string]int), + pilotMock: NewPilotPeriodicJobMock(), schema: schema, waitChan: make(chan struct{}), } - svc := NewPeriodicJobEnqueuer(riversharedtest.BaseServiceArchetype(t), &PeriodicJobEnqueuerConfig{Insert: makeInsertFunc(schema)}, bundle.exec) + svc, err := NewPeriodicJobEnqueuer(riversharedtest.BaseServiceArchetype(t), &PeriodicJobEnqueuerConfig{ + Insert: makeInsertFunc(schema), + Pilot: bundle.pilotMock, + Schema: schema, + }, bundle.exec) + require.NoError(t, err) svc.StaggerStartupDisable(true) svc.TestSignals.Init(t) @@ -160,10 +218,11 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc, bundle := setup(t) - svc.AddMany([]*PeriodicJob{ + _, err := svc.AddManySafely([]*PeriodicJob{ {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)}, {ScheduleFunc: periodicIntervalSchedule(1500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_1500ms", false)}, }) + require.NoError(t, err) startService(t, svc) @@ -197,12 +256,13 @@ func TestPeriodicJobEnqueuer(t *testing.T) { } } - svc.AddMany([]*PeriodicJob{ + _, err := svc.AddManySafely([]*PeriodicJob{ {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorWithMetadata("p_md_nil", nil)}, {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorWithMetadata("p_md_empty_string", []byte(""))}, {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorWithMetadata("p_md_empty_obj", []byte("{}"))}, {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorWithMetadata("p_md_existing", []byte(`{"key": "value"}`))}, }) + require.NoError(t, err) startService(t, svc) @@ -224,9 +284,10 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc, bundle := setup(t) - svc.AddMany([]*PeriodicJob{ + _, err := svc.AddManySafely([]*PeriodicJob{ {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false), RunOnStart: true}, }) + require.NoError(t, err) startService(t, svc) @@ -251,9 +312,10 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc, bundle := setup(t) - svc.AddMany([]*PeriodicJob{ + _, err := svc.AddManySafely([]*PeriodicJob{ {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("unique_periodic_job_500ms", true)}, }) + require.NoError(t, err) startService(t, svc) @@ -277,10 +339,11 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc, bundle := setup(t) - svc.AddMany([]*PeriodicJob{ + _, err := svc.AddManySafely([]*PeriodicJob{ {ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s", false), RunOnStart: true}, {ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("unique_periodic_job_5s", true), RunOnStart: true}, }) + require.NoError(t, err) start := time.Now() startService(t, svc) @@ -298,12 +361,13 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc, _ := setup(t) - svc.AddMany([]*PeriodicJob{ + _, err := svc.AddManySafely([]*PeriodicJob{ // skip this insert when it returns nil: {ScheduleFunc: periodicIntervalSchedule(time.Second), ConstructorFunc: func() (*rivertype.JobInsertParams, error) { return nil, ErrNoJobToInsert }, RunOnStart: true}, }) + require.NoError(t, err) startService(t, svc) @@ -318,7 +382,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) { now := svc.Time.StubNowUTC(time.Now()) svc.periodicJobs = make(map[rivertype.PeriodicJobHandle]*PeriodicJob) - periodicJobHandles := svc.AddMany([]*PeriodicJob{ + periodicJobHandles, err := svc.AddManySafely([]*PeriodicJob{ {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)}, {ScheduleFunc: periodicIntervalSchedule(1500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_1500ms", false)}, {ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s", false)}, @@ -326,6 +390,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) { {ScheduleFunc: periodicIntervalSchedule(3 * time.Hour), ConstructorFunc: jobConstructorFunc("periodic_job_3h", false)}, {ScheduleFunc: periodicIntervalSchedule(7 * 24 * time.Hour), ConstructorFunc: jobConstructorFunc("periodic_job_7d", false)}, }) + require.NoError(t, err) startService(t, svc) @@ -379,13 +444,16 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc, _ := setup(t) - svc.Add(&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(time.Microsecond), ConstructorFunc: jobConstructorFunc("periodic_job_1us", false)}) + _, err := svc.AddSafely(&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(time.Microsecond), ConstructorFunc: jobConstructorFunc("periodic_job_1us", false)}) + require.NoError(t, err) + // make a longer list of jobs so the loop has to run for longer for i := 1; i < 100; i++ { - svc.Add(&PeriodicJob{ + _, err := svc.AddSafely(&PeriodicJob{ ScheduleFunc: periodicIntervalSchedule(time.Duration(i) * time.Hour), ConstructorFunc: jobConstructorFunc(fmt.Sprintf("periodic_job_%dh", i), false), }) + require.NoError(t, err) } startService(t, svc) @@ -402,7 +470,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) { _, bundle := setup(t) - svc := NewPeriodicJobEnqueuer( + svc, err := NewPeriodicJobEnqueuer( riversharedtest.BaseServiceArchetype(t), &PeriodicJobEnqueuerConfig{ Insert: makeInsertFunc(bundle.schema), @@ -411,6 +479,7 @@ func TestPeriodicJobEnqueuer(t *testing.T) { {ScheduleFunc: periodicIntervalSchedule(1500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_1500ms", false), RunOnStart: true}, }, }, bundle.exec) + require.NoError(t, err) svc.StaggerStartupDisable(true) svc.TestSignals.Init(t) @@ -428,12 +497,14 @@ func TestPeriodicJobEnqueuer(t *testing.T) { startService(t, svc) - svc.Add( + _, err := svc.AddSafely( &PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)}, ) - svc.Add( + require.NoError(t, err) + _, err = svc.AddSafely( &PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true}, ) + require.NoError(t, err) svc.TestSignals.InsertedJobs.WaitOrTimeout() requireNJobs(t, bundle, "periodic_job_500ms", 0) @@ -447,10 +518,11 @@ func TestPeriodicJobEnqueuer(t *testing.T) { startService(t, svc) - svc.AddMany([]*PeriodicJob{ + _, err := svc.AddManySafely([]*PeriodicJob{ {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)}, {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true}, }) + require.NoError(t, err) svc.TestSignals.InsertedJobs.WaitOrTimeout() requireNJobs(t, bundle, "periodic_job_500ms", 0) @@ -464,10 +536,11 @@ func TestPeriodicJobEnqueuer(t *testing.T) { startService(t, svc) - handles := svc.AddMany([]*PeriodicJob{ + handles, err := svc.AddManySafely([]*PeriodicJob{ {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)}, {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true}, }) + require.NoError(t, err) svc.TestSignals.InsertedJobs.WaitOrTimeout() requireNJobs(t, bundle, "periodic_job_500ms", 0) @@ -477,9 +550,10 @@ func TestPeriodicJobEnqueuer(t *testing.T) { require.Empty(t, svc.periodicJobs) - handleAfterClear := svc.Add( + handleAfterClear, err := svc.AddSafely( &PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_new", false)}, ) + require.NoError(t, err) // Handles are not reused. require.NotEqual(t, handles[0], handleAfterClear) @@ -493,10 +567,11 @@ func TestPeriodicJobEnqueuer(t *testing.T) { startService(t, svc) - handles := svc.AddMany([]*PeriodicJob{ + handles, err := svc.AddManySafely([]*PeriodicJob{ {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)}, {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true}, }) + require.NoError(t, err) svc.TestSignals.InsertedJobs.WaitOrTimeout() requireNJobs(t, bundle, "periodic_job_500ms", 0) @@ -514,11 +589,12 @@ func TestPeriodicJobEnqueuer(t *testing.T) { startService(t, svc) - handles := svc.AddMany([]*PeriodicJob{ + handles, err := svc.AddManySafely([]*PeriodicJob{ {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)}, {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_other", false)}, {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true}, }) + require.NoError(t, err) svc.TestSignals.InsertedJobs.WaitOrTimeout() requireNJobs(t, bundle, "periodic_job_500ms", 0) @@ -552,10 +628,12 @@ func TestPeriodicJobEnqueuer(t *testing.T) { defer wg.Done() for range 50 { - handle := svc.Add(&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(time.Millisecond), ConstructorFunc: jobConstructorFunc(jobBaseName, false)}) + handle, err := svc.AddSafely(&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(time.Millisecond), ConstructorFunc: jobConstructorFunc(jobBaseName, false)}) + require.NoError(t, err) randomSleep() - svc.Add(&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(time.Millisecond), ConstructorFunc: jobConstructorFunc(jobBaseName+"_second", false)}) + _, err = svc.AddSafely(&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(time.Millisecond), ConstructorFunc: jobConstructorFunc(jobBaseName+"_second", false)}) + require.NoError(t, err) randomSleep() svc.Remove(handle) @@ -653,4 +731,150 @@ func TestPeriodicJobEnqueuer(t *testing.T) { // pick job with soonest next run amongst some not scheduled yet require.Equal(t, 1*time.Hour, svc.timeUntilNextRun()) }) + + t.Run("InvokesPilot", func(t *testing.T) { + t.Parallel() + + svc, bundle := setup(t) + + now := time.Now() + + bundle.pilotMock.PeriodicJobGetAllMock = func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobGetAllParams) ([]*riverpilot.PeriodicJob, error) { + require.Equal(t, bundle.schema, params.Schema) + return []*riverpilot.PeriodicJob{ + {ID: "periodic_job_500ms", NextRunAt: now.Add(1 * time.Hour)}, + {ID: "periodic_job_1500ms", NextRunAt: now.Add(2 * time.Hour)}, + {ID: "periodic_job_999ms", NextRunAt: now.Add(3 * time.Hour)}, + }, nil + } + + var periodicJobDeleteByIDManyMockCalled bool + bundle.pilotMock.PeriodicJobDeleteByIDManyMock = func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobDeleteByIDManyParams) ([]*riverpilot.PeriodicJob, error) { + periodicJobDeleteByIDManyMockCalled = true + require.Equal(t, []string{"periodic_job_999ms"}, params.ID) + require.Equal(t, bundle.schema, params.Schema) + return nil, nil + } + + var PeriodicJobUpsertManyMockCalled bool + bundle.pilotMock.PeriodicJobUpsertManyMock = func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobUpsertManyParams) ([]*riverpilot.PeriodicJob, error) { + PeriodicJobUpsertManyMockCalled = true + require.Equal(t, []string{"periodic_job_100ms"}, sliceutil.Map(params.Jobs, func(j *riverpilot.PeriodicJobUpsertParams) string { return j.ID })) + require.Equal(t, bundle.schema, params.Schema) + return nil, nil + } + + handles, err := svc.AddManySafely([]*PeriodicJob{ + {ID: "periodic_job_100ms", ScheduleFunc: periodicIntervalSchedule(100 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_100ms", false)}, + {ID: "periodic_job_500ms", ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)}, + {ID: "periodic_job_1500ms", ScheduleFunc: periodicIntervalSchedule(1500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_1500ms", false)}, + }) + require.NoError(t, err) + + startService(t, svc) + + svc.TestSignals.InsertedJobs.WaitOrTimeout() + requireNJobs(t, bundle, "periodic_job_100ms", 1) + requireNJobs(t, bundle, "periodic_job_500ms", 0) + requireNJobs(t, bundle, "periodic_job_1500ms", 0) + + require.True(t, periodicJobDeleteByIDManyMockCalled) + require.True(t, PeriodicJobUpsertManyMockCalled) + + svc.Stop() + + require.WithinDuration(t, now.Add(1*time.Hour), svc.periodicJobs[handles[1]].nextRunAt, time.Microsecond) + require.WithinDuration(t, now.Add(2*time.Hour), svc.periodicJobs[handles[2]].nextRunAt, time.Microsecond) + }) + + t.Run("PilotNotInvokedWithoutID", func(t *testing.T) { + t.Parallel() + + svc, bundle := setup(t) + + bundle.pilotMock.PeriodicJobGetAllMock = func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobGetAllParams) ([]*riverpilot.PeriodicJob, error) { + return nil, nil + } + + var periodicJobDeleteByIDManyMockCalled bool + bundle.pilotMock.PeriodicJobDeleteByIDManyMock = func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobDeleteByIDManyParams) ([]*riverpilot.PeriodicJob, error) { + periodicJobDeleteByIDManyMockCalled = true + return nil, nil + } + + var PeriodicJobUpsertManyMockCalled bool + bundle.pilotMock.PeriodicJobUpsertManyMock = func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobUpsertManyParams) ([]*riverpilot.PeriodicJob, error) { + PeriodicJobUpsertManyMockCalled = true + return nil, nil + } + + _, err := svc.AddManySafely([]*PeriodicJob{ + {ScheduleFunc: periodicIntervalSchedule(100 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_100ms", false)}, + {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)}, + {ScheduleFunc: periodicIntervalSchedule(1500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_1500ms", false)}, + }) + require.NoError(t, err) + + startService(t, svc) + + svc.TestSignals.InsertedJobs.WaitOrTimeout() + requireNJobs(t, bundle, "periodic_job_100ms", 1) + + require.False(t, periodicJobDeleteByIDManyMockCalled) + require.False(t, PeriodicJobUpsertManyMockCalled) + }) + + t.Run("DuplicateIDError", func(t *testing.T) { + t.Parallel() + + svc, bundle := setup(t) + + periodicJobs := []*PeriodicJob{ + {ID: "periodic_job_100ms", ScheduleFunc: periodicIntervalSchedule(100 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_100ms", false)}, + {ID: "periodic_job_100ms", ScheduleFunc: periodicIntervalSchedule(100 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)}, + } + + _, err := NewPeriodicJobEnqueuer(riversharedtest.BaseServiceArchetype(t), &PeriodicJobEnqueuerConfig{ + Insert: makeInsertFunc(bundle.schema), + PeriodicJobs: periodicJobs, + Pilot: bundle.pilotMock, + Schema: bundle.schema, + }, bundle.exec) + require.EqualError(t, err, "periodic job with ID already registered: periodic_job_100ms") + + _, err = svc.AddManySafely(periodicJobs) + require.EqualError(t, err, "periodic job with ID already registered: periodic_job_100ms") + }) +} + +type PilotPeriodicJobMock struct { + PeriodicJobDeleteByIDManyMock func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobDeleteByIDManyParams) ([]*riverpilot.PeriodicJob, error) + PeriodicJobGetAllMock func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobGetAllParams) ([]*riverpilot.PeriodicJob, error) + PeriodicJobUpsertManyMock func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobUpsertManyParams) ([]*riverpilot.PeriodicJob, error) +} + +func NewPilotPeriodicJobMock() *PilotPeriodicJobMock { + return &PilotPeriodicJobMock{ + PeriodicJobDeleteByIDManyMock: func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobDeleteByIDManyParams) ([]*riverpilot.PeriodicJob, error) { + return nil, nil + }, + PeriodicJobGetAllMock: func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobGetAllParams) ([]*riverpilot.PeriodicJob, error) { + return nil, nil + }, + PeriodicJobUpsertManyMock: func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobUpsertManyParams) ([]*riverpilot.PeriodicJob, error) { + return nil, nil + }, + } +} + +func (p *PilotPeriodicJobMock) PeriodicJobDeleteByIDMany(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobDeleteByIDManyParams) ([]*riverpilot.PeriodicJob, error) { + return p.PeriodicJobDeleteByIDManyMock(ctx, exec, params) +} + +func (p *PilotPeriodicJobMock) PeriodicJobGetAll(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobGetAllParams) ([]*riverpilot.PeriodicJob, error) { + return p.PeriodicJobGetAllMock(ctx, exec, params) +} + +func (p *PilotPeriodicJobMock) PeriodicJobUpsertMany(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobUpsertManyParams) ([]*riverpilot.PeriodicJob, error) { + return p.PeriodicJobUpsertManyMock(ctx, exec, params) } diff --git a/internal/maintenance/queue_maintainer_test.go b/internal/maintenance/queue_maintainer_test.go index 75e688e6..256b6853 100644 --- a/internal/maintenance/queue_maintainer_test.go +++ b/internal/maintenance/queue_maintainer_test.go @@ -101,20 +101,23 @@ func TestQueueMaintainer(t *testing.T) { driver := riverpgxv5.New(nil).UnwrapExecutor(sharedTx) + periodicJobEnqueuer, err := NewPeriodicJobEnqueuer(archetype, &PeriodicJobEnqueuerConfig{ + PeriodicJobs: []*PeriodicJob{ + { + ConstructorFunc: func() (*rivertype.JobInsertParams, error) { + return nil, ErrNoJobToInsert + }, + ScheduleFunc: cron.Every(15 * time.Minute).Next, + }, + }, + }, driver) + require.NoError(t, err) + // Use realistic services in this one so we can verify stress not only // on the queue maintainer, but it and all its subservices together. maintainer := setup(t, []startstop.Service{ NewJobCleaner(archetype, &JobCleanerConfig{}, driver), - NewPeriodicJobEnqueuer(archetype, &PeriodicJobEnqueuerConfig{ - PeriodicJobs: []*PeriodicJob{ - { - ConstructorFunc: func() (*rivertype.JobInsertParams, error) { - return nil, ErrNoJobToInsert - }, - ScheduleFunc: cron.Every(15 * time.Minute).Next, - }, - }, - }, driver), + periodicJobEnqueuer, NewQueueCleaner(archetype, &QueueCleanerConfig{}, driver), NewJobScheduler(archetype, &JobSchedulerConfig{}, driver), }) diff --git a/internal/rivercommon/river_common.go b/internal/rivercommon/river_common.go index 4f3519c6..063abe7d 100644 --- a/internal/rivercommon/river_common.go +++ b/internal/rivercommon/river_common.go @@ -2,6 +2,7 @@ package rivercommon import ( "errors" + "regexp" ) // These constants are made available in rivercommon so that they're accessible @@ -27,3 +28,9 @@ type ContextKeyClient struct{} // CancelCauseFuncs when it's stopping. It may be used by components for such // cases like avoiding logging an error during a normal shutdown procedure. var ErrStop = errors.New("stop initiated") + +// UserSpecifiedIDOrKindRE is a regular expression to which the format of job +// kinds and some other user-specified IDs (e.g. periodic job names) must +// comply. Mainly, minimal special characters, and excluding spaces and commas +// which are problematic for the search UI. +var UserSpecifiedIDOrKindRE = regexp.MustCompile(`\A[\w][\w\-\[\]<>\/.·:+]+\z`) diff --git a/internal/rivercommon/river_common_test.go b/internal/rivercommon/river_common_test.go new file mode 100644 index 00000000..a492386f --- /dev/null +++ b/internal/rivercommon/river_common_test.go @@ -0,0 +1,27 @@ +package rivercommon + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestJobKindRE(t *testing.T) { + t.Parallel() + + require.Regexp(t, UserSpecifiedIDOrKindRE, "kind") + require.Regexp(t, UserSpecifiedIDOrKindRE, "kind123") + require.Regexp(t, UserSpecifiedIDOrKindRE, "with.dot") + require.Regexp(t, UserSpecifiedIDOrKindRE, "with:colon") + require.Regexp(t, UserSpecifiedIDOrKindRE, "with+plus") + require.Regexp(t, UserSpecifiedIDOrKindRE, "with-hyphen") + require.Regexp(t, UserSpecifiedIDOrKindRE, "with_underscore") + require.Regexp(t, UserSpecifiedIDOrKindRE, "with[brackets]") + require.Regexp(t, UserSpecifiedIDOrKindRE, "with") + require.Regexp(t, UserSpecifiedIDOrKindRE, "with/slash") + require.Regexp(t, UserSpecifiedIDOrKindRE, "JobArgsReflectKind[github.com/riverqueue/river.JobArgs·12]") + + require.NotRegexp(t, UserSpecifiedIDOrKindRE, "with space") + require.NotRegexp(t, UserSpecifiedIDOrKindRE, "with,comma") + require.NotRegexp(t, UserSpecifiedIDOrKindRE, ":no_leading_special_characters") +} diff --git a/periodic_job.go b/periodic_job.go index fe3d254b..ba615505 100644 --- a/periodic_job.go +++ b/periodic_job.go @@ -33,6 +33,11 @@ type PeriodicJob struct { // PeriodicJobOpts are options for a periodic job. type PeriodicJobOpts struct { + // ID is an optional identifier for the job. Identifiers must be unique + // between all periodic jobs and adding a periodic job will error if they're + // not. + ID string + // RunOnStart can be used to indicate that a periodic job should insert an // initial job as a new scheduler is started. This can be used as a hedge // for jobs with longer scheduled durations that may not get to expiry @@ -114,7 +119,7 @@ func newPeriodicJobBundle(config *Config, periodicJobEnqueuer *maintenance.Perio } } -// Adds a new periodic job to the client. The job is queued immediately if +// Add adds a new periodic job to the client. The job is queued immediately if // RunOnStart is enabled, and then scheduled normally. // // Returns a periodic job handle which can be used to subsequently remove the @@ -125,7 +130,17 @@ func newPeriodicJobBundle(config *Config, periodicJobEnqueuer *maintenance.Perio // new periodic job is fully enabled or disabled, it should be added or removed // from _every_ active River client across all processes. func (b *PeriodicJobBundle) Add(periodicJob *PeriodicJob) rivertype.PeriodicJobHandle { - return b.periodicJobEnqueuer.Add(b.toInternal(periodicJob)) + handle, err := b.periodicJobEnqueuer.AddSafely(b.toInternal(periodicJob)) + if err != nil { + panic(err) + } + return handle +} + +// AddSafely is the same as Add, but it returns an error in the case of a +// validation problem or duplicate ID instead of panicking. +func (b *PeriodicJobBundle) AddSafely(periodicJob *PeriodicJob) (rivertype.PeriodicJobHandle, error) { + return b.periodicJobEnqueuer.AddSafely(b.toInternal(periodicJob)) } // AddMany adds many new periodic jobs to the client. The jobs are queued @@ -139,7 +154,17 @@ func (b *PeriodicJobBundle) Add(periodicJob *PeriodicJob) rivertype.PeriodicJobH // new periodic job is fully enabled or disabled, it should be added or removed // from _every_ active River client across all processes. func (b *PeriodicJobBundle) AddMany(periodicJobs []*PeriodicJob) []rivertype.PeriodicJobHandle { - return b.periodicJobEnqueuer.AddMany(sliceutil.Map(periodicJobs, b.toInternal)) + handles, err := b.periodicJobEnqueuer.AddManySafely(sliceutil.Map(periodicJobs, b.toInternal)) + if err != nil { + panic(err) + } + return handles +} + +// AddManySafely is the same as AddMany, but it returns an error in the case of +// a validation problem or duplicate ID instead of panicking. +func (b *PeriodicJobBundle) AddManySafely(periodicJobs []*PeriodicJob) ([]rivertype.PeriodicJobHandle, error) { + return b.periodicJobEnqueuer.AddManySafely(sliceutil.Map(periodicJobs, b.toInternal)) } // Clear clears all periodic jobs, cancelling all scheduled runs. @@ -191,6 +216,7 @@ func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.Pe opts = periodicJob.opts } return &maintenance.PeriodicJob{ + ID: opts.ID, ConstructorFunc: func() (*rivertype.JobInsertParams, error) { args, options := periodicJob.constructorFunc() if args == nil { diff --git a/periodic_job_test.go b/periodic_job_test.go index 70eda012..94aaf7b5 100644 --- a/periodic_job_test.go +++ b/periodic_job_test.go @@ -36,11 +36,12 @@ func TestPeriodicJobBundle(t *testing.T) { setup := func(t *testing.T) (*PeriodicJobBundle, *testBundle) { //nolint:unparam t.Helper() - periodicJobEnqueuer := maintenance.NewPeriodicJobEnqueuer( + periodicJobEnqueuer, err := maintenance.NewPeriodicJobEnqueuer( riversharedtest.BaseServiceArchetype(t), &maintenance.PeriodicJobEnqueuerConfig{}, nil, ) + require.NoError(t, err) return newPeriodicJobBundle(newTestConfig(t, ""), periodicJobEnqueuer), &testBundle{} } @@ -96,6 +97,48 @@ func TestPeriodicJobBundle(t *testing.T) { _, err := internalPeriodicJob.ConstructorFunc() require.ErrorIs(t, err, maintenance.ErrNoJobToInsert) }) + + t.Run("AddError", func(t *testing.T) { + t.Parallel() + + periodicJobBundle, _ := setup(t) + + periodicJob := NewPeriodicJob( + PeriodicInterval(15*time.Minute), + func() (JobArgs, *InsertOpts) { return nil, nil }, + &PeriodicJobOpts{ID: "periodic_job_id"}, + ) + + periodicJobBundle.Add(periodicJob) + + require.PanicsWithError(t, "periodic job with ID already registered: periodic_job_id", func() { + periodicJobBundle.Add(periodicJob) + }) + + _, err := periodicJobBundle.AddSafely(periodicJob) + require.EqualError(t, err, "periodic job with ID already registered: periodic_job_id") + }) + + t.Run("AddManyError", func(t *testing.T) { + t.Parallel() + + periodicJobBundle, _ := setup(t) + + periodicJob := NewPeriodicJob( + PeriodicInterval(15*time.Minute), + func() (JobArgs, *InsertOpts) { return nil, nil }, + &PeriodicJobOpts{ID: "periodic_job_id"}, + ) + + periodicJobBundle.Add(periodicJob) + + require.PanicsWithError(t, "periodic job with ID already registered: periodic_job_id", func() { + periodicJobBundle.AddMany([]*PeriodicJob{periodicJob}) + }) + + _, err := periodicJobBundle.AddManySafely([]*PeriodicJob{periodicJob}) + require.EqualError(t, err, "periodic job with ID already registered: periodic_job_id") + }) } func mustUnmarshalJSON[T any](t *testing.T, data []byte) *T { diff --git a/rivershared/riverpilot/pilot.go b/rivershared/riverpilot/pilot.go index 04a1c634..42711ad9 100644 --- a/rivershared/riverpilot/pilot.go +++ b/rivershared/riverpilot/pilot.go @@ -2,6 +2,7 @@ package riverpilot import ( "context" + "time" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" @@ -16,6 +17,8 @@ import ( // This should be considered a River internal API and its stability is not // guaranteed. DO NOT USE. type Pilot interface { + PilotPeriodicJob + JobGetAvailable( ctx context.Context, exec riverdriver.Executor, @@ -45,6 +48,53 @@ type Pilot interface { QueueMetadataChanged(ctx context.Context, exec riverdriver.Executor, params *QueueMetadataChangedParams) error } +// PilotPeriodicJob contains pilot functions related to periodic jobs. This is +// extracted as its own interface so there's less surface area to mock in places +// like the periodic job enqueuer where that's needed. +type PilotPeriodicJob interface { + // PeriodicJobDeleteByIDMany deletes many periodic jobs by ID. + // + // API is not stable. DO NOT USE. + PeriodicJobDeleteByIDMany(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobDeleteByIDManyParams) ([]*PeriodicJob, error) + + // PeriodicJobGetAll gets all currently known periodic jobs. + // + // API is not stable. DO NOT USE. + PeriodicJobGetAll(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobGetAllParams) ([]*PeriodicJob, error) + + // PeriodicJobUpsertMany upserts many periodic jobs. + // + // API is not stable. DO NOT USE. + PeriodicJobUpsertMany(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobUpsertManyParams) ([]*PeriodicJob, error) +} + +type PeriodicJob struct { + ID string + CreatedAt time.Time + NextRunAt time.Time + UpdatedAt time.Time +} + +type PeriodicJobDeleteByIDManyParams struct { + ID []string + Schema string +} + +type PeriodicJobGetAllParams struct { + Schema string +} + +type PeriodicJobUpsertManyParams struct { + Jobs []*PeriodicJobUpsertParams + Schema string +} + +type PeriodicJobUpsertParams struct { + ID string + NextRunAt time.Time + UpdatedAt time.Time +} + type ProducerState interface { JobFinish(job *rivertype.JobRow) } diff --git a/rivershared/riverpilot/standard.go b/rivershared/riverpilot/standard.go index ea12b207..37bbf29d 100644 --- a/rivershared/riverpilot/standard.go +++ b/rivershared/riverpilot/standard.go @@ -32,6 +32,18 @@ func (p *StandardPilot) JobSetStateIfRunningMany(ctx context.Context, execTx riv return execTx.JobSetStateIfRunningMany(ctx, params) } +func (p *StandardPilot) PeriodicJobDeleteByIDMany(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobDeleteByIDManyParams) ([]*PeriodicJob, error) { + return nil, nil +} + +func (p *StandardPilot) PeriodicJobGetAll(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobGetAllParams) ([]*PeriodicJob, error) { + return nil, nil +} + +func (p *StandardPilot) PeriodicJobUpsertMany(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobUpsertManyParams) ([]*PeriodicJob, error) { + return nil, nil +} + func (p *StandardPilot) PilotInit(archetype *baseservice.Archetype) { // No-op } From a1852d328f9d338f769afd1a62c945a95ad5ccb4 Mon Sep 17 00:00:00 2001 From: Brandur Date: Wed, 4 Jun 2025 10:57:07 +0200 Subject: [PATCH 2/2] Change periodic jobs to a keep alive + reap model Change durable periodic jobs so that instead of doing a single removal pass after initial leader election, the leader periodically bumps the `updated_at` timestamp of known periodic jobs while simultaneously removing any jobs that haven't been seen in a configurable period, making it very similar to how the producer keep alives work. --- internal/maintenance/periodic_job_enqueuer.go | 68 +++++++++++++++---- .../maintenance/periodic_job_enqueuer_test.go | 50 +++++++------- producer.go | 1 - rivershared/riverpilot/pilot.go | 16 +++-- rivershared/riverpilot/standard.go | 2 +- 5 files changed, 90 insertions(+), 47 deletions(-) diff --git a/internal/maintenance/periodic_job_enqueuer.go b/internal/maintenance/periodic_job_enqueuer.go index 2343f566..a034ce46 100644 --- a/internal/maintenance/periodic_job_enqueuer.go +++ b/internal/maintenance/periodic_job_enqueuer.go @@ -19,6 +19,7 @@ import ( "github.com/riverqueue/river/rivershared/util/maputil" "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivershared/util/testutil" + "github.com/riverqueue/river/rivershared/util/timeutil" "github.com/riverqueue/river/rivertype" ) @@ -28,14 +29,16 @@ var ErrNoJobToInsert = errors.New("a nil job was returned, nothing to insert") // Test-only properties. type PeriodicJobEnqueuerTestSignals struct { - EnteredLoop testsignal.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop - InsertedJobs testsignal.TestSignal[struct{}] // notifies when a batch of jobs is inserted - SkippedJob testsignal.TestSignal[struct{}] // notifies when a job is skipped because of nil JobInsertParams + EnteredLoop testsignal.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop + InsertedJobs testsignal.TestSignal[struct{}] // notifies when a batch of jobs is inserted + PeriodicJobKeepAliveAndReap testsignal.TestSignal[struct{}] // notifies when the background services that runs keep alive and reap on periodic jobs ticks + SkippedJob testsignal.TestSignal[struct{}] // notifies when a job is skipped because of nil JobInsertParams } func (ts *PeriodicJobEnqueuerTestSignals) Init(tb testutil.TestingTB) { ts.EnteredLoop.Init(tb) ts.InsertedJobs.Init(tb) + ts.PeriodicJobKeepAliveAndReap.Init(tb) ts.SkippedJob.Init(tb) } @@ -257,6 +260,13 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { s.StaggerStart(ctx) + subServices := []startstop.Service{ + startstop.StartStopFunc(s.periodicJobKeepAliveAndReapPeriodically), + } + if err := startstop.StartAll(ctx, subServices...); err != nil { + return err + } + go func() { started() defer stopped() // this defer should come first so it's last out @@ -264,6 +274,8 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { s.Logger.DebugContext(ctx, s.Name+logPrefixRunLoopStarted) defer s.Logger.DebugContext(ctx, s.Name+logPrefixRunLoopStopped) + defer startstop.StopAllParallel(subServices...) + // Drain the signal to recalculate next run if it's been sent (i.e. Add // or AddMany called before Start). We're about to schedule jobs from // scratch, and therefore don't need to immediately do so again. @@ -346,17 +358,6 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { // Run any jobs that need to run on start and calculate initial runs. validateInsertRunOnStartAndScheduleNewlyAdded() - // Delete any periodic jobs that were in the database but no longer - // present in the client's configured periodic jobs. - if len(initialPeriodicJobsMap) > 0 { - if _, err := s.Config.Pilot.PeriodicJobDeleteByIDMany(ctx, s.exec, &riverpilot.PeriodicJobDeleteByIDManyParams{ - ID: maputil.Keys(initialPeriodicJobsMap), - Schema: s.Config.Schema, - }); err != nil { - s.Logger.ErrorContext(ctx, s.Name+": Error deleting periodic jobs", "error", err) - } - } - s.TestSignals.EnteredLoop.Signal(struct{}{}) timerUntilNextRun := time.NewTimer(s.timeUntilNextRun()) @@ -493,6 +494,45 @@ func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, c return insertParams, true } +func (s *PeriodicJobEnqueuer) periodicJobKeepAliveAndReapPeriodically(ctx context.Context, shouldStart bool, started, stopped func()) error { + if !shouldStart { + return nil + } + + go func() { + started() + defer stopped() // this defer should come first so it's last out + + ticker := timeutil.NewTickerWithInitialTick(ctx, 10*time.Minute) + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + func() { + s.mu.RLock() + defer s.mu.RUnlock() + + if len(s.periodicJobIDs) > 0 { + if _, err := s.Config.Pilot.PeriodicJobKeepAliveAndReap(ctx, s.exec, &riverpilot.PeriodicJobKeepAliveAndReapParams{ + ID: maputil.Keys(s.periodicJobIDs), + Schema: s.Config.Schema, + }); err != nil { + s.Logger.ErrorContext(ctx, s.Name+": Error executing periodic job keep alive and reap", "error", err.Error()) + return + } + } + + s.TestSignals.PeriodicJobKeepAliveAndReap.Signal(struct{}{}) + }() + } + } + }() + + return nil +} + const periodicJobEnqueuerVeryLongDuration = 24 * time.Hour func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration { diff --git a/internal/maintenance/periodic_job_enqueuer_test.go b/internal/maintenance/periodic_job_enqueuer_test.go index 2dd86977..19050f77 100644 --- a/internal/maintenance/periodic_job_enqueuer_test.go +++ b/internal/maintenance/periodic_job_enqueuer_test.go @@ -748,17 +748,17 @@ func TestPeriodicJobEnqueuer(t *testing.T) { }, nil } - var periodicJobDeleteByIDManyMockCalled bool - bundle.pilotMock.PeriodicJobDeleteByIDManyMock = func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobDeleteByIDManyParams) ([]*riverpilot.PeriodicJob, error) { - periodicJobDeleteByIDManyMockCalled = true - require.Equal(t, []string{"periodic_job_999ms"}, params.ID) + var periodicJobKeepAliveAndReapMockCalled bool + bundle.pilotMock.PeriodicJobKeepAliveAndReapMock = func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobKeepAliveAndReapParams) ([]*riverpilot.PeriodicJob, error) { + periodicJobKeepAliveAndReapMockCalled = true + require.ElementsMatch(t, []string{"periodic_job_100ms", "periodic_job_500ms", "periodic_job_1500ms"}, params.ID) require.Equal(t, bundle.schema, params.Schema) return nil, nil } - var PeriodicJobUpsertManyMockCalled bool + var periodicJobUpsertManyMockCalled bool bundle.pilotMock.PeriodicJobUpsertManyMock = func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobUpsertManyParams) ([]*riverpilot.PeriodicJob, error) { - PeriodicJobUpsertManyMockCalled = true + periodicJobUpsertManyMockCalled = true require.Equal(t, []string{"periodic_job_100ms"}, sliceutil.Map(params.Jobs, func(j *riverpilot.PeriodicJobUpsertParams) string { return j.ID })) require.Equal(t, bundle.schema, params.Schema) return nil, nil @@ -777,9 +777,10 @@ func TestPeriodicJobEnqueuer(t *testing.T) { requireNJobs(t, bundle, "periodic_job_100ms", 1) requireNJobs(t, bundle, "periodic_job_500ms", 0) requireNJobs(t, bundle, "periodic_job_1500ms", 0) + require.True(t, periodicJobUpsertManyMockCalled) - require.True(t, periodicJobDeleteByIDManyMockCalled) - require.True(t, PeriodicJobUpsertManyMockCalled) + svc.TestSignals.PeriodicJobKeepAliveAndReap.WaitOrTimeout() + require.True(t, periodicJobKeepAliveAndReapMockCalled) svc.Stop() @@ -796,15 +797,15 @@ func TestPeriodicJobEnqueuer(t *testing.T) { return nil, nil } - var periodicJobDeleteByIDManyMockCalled bool - bundle.pilotMock.PeriodicJobDeleteByIDManyMock = func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobDeleteByIDManyParams) ([]*riverpilot.PeriodicJob, error) { - periodicJobDeleteByIDManyMockCalled = true + var periodicJobKeepAliveAndReapMockCalled bool + bundle.pilotMock.PeriodicJobKeepAliveAndReapMock = func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobKeepAliveAndReapParams) ([]*riverpilot.PeriodicJob, error) { + periodicJobKeepAliveAndReapMockCalled = true return nil, nil } - var PeriodicJobUpsertManyMockCalled bool + var periodicJobUpsertManyMockCalled bool bundle.pilotMock.PeriodicJobUpsertManyMock = func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobUpsertManyParams) ([]*riverpilot.PeriodicJob, error) { - PeriodicJobUpsertManyMockCalled = true + periodicJobUpsertManyMockCalled = true return nil, nil } @@ -819,9 +820,10 @@ func TestPeriodicJobEnqueuer(t *testing.T) { svc.TestSignals.InsertedJobs.WaitOrTimeout() requireNJobs(t, bundle, "periodic_job_100ms", 1) + require.False(t, periodicJobUpsertManyMockCalled) - require.False(t, periodicJobDeleteByIDManyMockCalled) - require.False(t, PeriodicJobUpsertManyMockCalled) + svc.TestSignals.PeriodicJobKeepAliveAndReap.WaitOrTimeout() + require.False(t, periodicJobKeepAliveAndReapMockCalled) }) t.Run("DuplicateIDError", func(t *testing.T) { @@ -848,17 +850,17 @@ func TestPeriodicJobEnqueuer(t *testing.T) { } type PilotPeriodicJobMock struct { - PeriodicJobDeleteByIDManyMock func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobDeleteByIDManyParams) ([]*riverpilot.PeriodicJob, error) - PeriodicJobGetAllMock func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobGetAllParams) ([]*riverpilot.PeriodicJob, error) - PeriodicJobUpsertManyMock func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobUpsertManyParams) ([]*riverpilot.PeriodicJob, error) + PeriodicJobGetAllMock func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobGetAllParams) ([]*riverpilot.PeriodicJob, error) + PeriodicJobKeepAliveAndReapMock func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobKeepAliveAndReapParams) ([]*riverpilot.PeriodicJob, error) + PeriodicJobUpsertManyMock func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobUpsertManyParams) ([]*riverpilot.PeriodicJob, error) } func NewPilotPeriodicJobMock() *PilotPeriodicJobMock { return &PilotPeriodicJobMock{ - PeriodicJobDeleteByIDManyMock: func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobDeleteByIDManyParams) ([]*riverpilot.PeriodicJob, error) { + PeriodicJobGetAllMock: func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobGetAllParams) ([]*riverpilot.PeriodicJob, error) { return nil, nil }, - PeriodicJobGetAllMock: func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobGetAllParams) ([]*riverpilot.PeriodicJob, error) { + PeriodicJobKeepAliveAndReapMock: func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobKeepAliveAndReapParams) ([]*riverpilot.PeriodicJob, error) { return nil, nil }, PeriodicJobUpsertManyMock: func(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobUpsertManyParams) ([]*riverpilot.PeriodicJob, error) { @@ -867,14 +869,14 @@ func NewPilotPeriodicJobMock() *PilotPeriodicJobMock { } } -func (p *PilotPeriodicJobMock) PeriodicJobDeleteByIDMany(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobDeleteByIDManyParams) ([]*riverpilot.PeriodicJob, error) { - return p.PeriodicJobDeleteByIDManyMock(ctx, exec, params) -} - func (p *PilotPeriodicJobMock) PeriodicJobGetAll(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobGetAllParams) ([]*riverpilot.PeriodicJob, error) { return p.PeriodicJobGetAllMock(ctx, exec, params) } +func (p *PilotPeriodicJobMock) PeriodicJobKeepAliveAndReap(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobKeepAliveAndReapParams) ([]*riverpilot.PeriodicJob, error) { + return p.PeriodicJobKeepAliveAndReapMock(ctx, exec, params) +} + func (p *PilotPeriodicJobMock) PeriodicJobUpsertMany(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobUpsertManyParams) ([]*riverpilot.PeriodicJob, error) { return p.PeriodicJobUpsertManyMock(ctx, exec, params) } diff --git a/producer.go b/producer.go index 1fba620a..3c2ac4f1 100644 --- a/producer.go +++ b/producer.go @@ -448,7 +448,6 @@ func (p *producer) TriggerJobFetch() { func (p *producer) TriggerQueueControlEvent(controlEvent *controlEventPayload) { p.queueControlCh <- controlEvent p.testSignals.QueueControlEventTriggered.Signal(controlEvent) - } type controlAction string diff --git a/rivershared/riverpilot/pilot.go b/rivershared/riverpilot/pilot.go index 42711ad9..81a237bc 100644 --- a/rivershared/riverpilot/pilot.go +++ b/rivershared/riverpilot/pilot.go @@ -52,15 +52,17 @@ type Pilot interface { // extracted as its own interface so there's less surface area to mock in places // like the periodic job enqueuer where that's needed. type PilotPeriodicJob interface { - // PeriodicJobDeleteByIDMany deletes many periodic jobs by ID. + // PeriodicJobGetAll gets all currently known periodic jobs. // // API is not stable. DO NOT USE. - PeriodicJobDeleteByIDMany(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobDeleteByIDManyParams) ([]*PeriodicJob, error) + PeriodicJobGetAll(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobGetAllParams) ([]*PeriodicJob, error) - // PeriodicJobGetAll gets all currently known periodic jobs. + // PeriodicJobTouchMany updates the `updated_at` timestamp on many jobs at + // once to keep them alive and reaps any jobs that haven't been seen in some + // time. // // API is not stable. DO NOT USE. - PeriodicJobGetAll(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobGetAllParams) ([]*PeriodicJob, error) + PeriodicJobKeepAliveAndReap(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobKeepAliveAndReapParams) ([]*PeriodicJob, error) // PeriodicJobUpsertMany upserts many periodic jobs. // @@ -75,12 +77,12 @@ type PeriodicJob struct { UpdatedAt time.Time } -type PeriodicJobDeleteByIDManyParams struct { - ID []string +type PeriodicJobGetAllParams struct { Schema string } -type PeriodicJobGetAllParams struct { +type PeriodicJobKeepAliveAndReapParams struct { + ID []string Schema string } diff --git a/rivershared/riverpilot/standard.go b/rivershared/riverpilot/standard.go index 37bbf29d..dafc8f32 100644 --- a/rivershared/riverpilot/standard.go +++ b/rivershared/riverpilot/standard.go @@ -32,7 +32,7 @@ func (p *StandardPilot) JobSetStateIfRunningMany(ctx context.Context, execTx riv return execTx.JobSetStateIfRunningMany(ctx, params) } -func (p *StandardPilot) PeriodicJobDeleteByIDMany(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobDeleteByIDManyParams) ([]*PeriodicJob, error) { +func (p *StandardPilot) PeriodicJobKeepAliveAndReap(ctx context.Context, exec riverdriver.Executor, params *PeriodicJobKeepAliveAndReapParams) ([]*PeriodicJob, error) { return nil, nil }