From 3097be1c2ad177289581fec4fe72ddadc92834f7 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Mon, 7 Jul 2025 09:48:18 -0500 Subject: [PATCH] add JobRetry to Pilot interface Allow Pro pilot to further customize this behavior. --- client.go | 12 ++++++------ rivershared/riverpilot/pilot.go | 2 ++ rivershared/riverpilot/standard.go | 4 ++++ 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/client.go b/client.go index 2a5e6f63..c5710b8a 100644 --- a/client.go +++ b/client.go @@ -1412,11 +1412,7 @@ func (c *Client[TTx]) JobGetTx(ctx context.Context, tx TTx, id int64) (*rivertyp // MaxAttempts is also incremented by one if the job has already exhausted its // max attempts. func (c *Client[TTx]) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow, error) { - return c.driver.GetExecutor().JobRetry(ctx, &riverdriver.JobRetryParams{ - ID: id, - Now: c.baseService.Time.NowUTCOrNil(), - Schema: c.config.Schema, - }) + return c.jobRetry(ctx, c.driver.GetExecutor(), id) } // JobRetryTx updates the job with the given ID to make it immediately available @@ -1433,7 +1429,11 @@ func (c *Client[TTx]) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow // MaxAttempts is also incremented by one if the job has already exhausted its // max attempts. func (c *Client[TTx]) JobRetryTx(ctx context.Context, tx TTx, id int64) (*rivertype.JobRow, error) { - return c.driver.UnwrapExecutor(tx).JobRetry(ctx, &riverdriver.JobRetryParams{ + return c.jobRetry(ctx, c.driver.UnwrapExecutor(tx), id) +} + +func (c *Client[TTx]) jobRetry(ctx context.Context, exec riverdriver.Executor, id int64) (*rivertype.JobRow, error) { + return c.pilot.JobRetry(ctx, exec, &riverdriver.JobRetryParams{ ID: id, Now: c.baseService.Time.NowUTCOrNil(), Schema: c.config.Schema, diff --git a/rivershared/riverpilot/pilot.go b/rivershared/riverpilot/pilot.go index b95075d2..c2edc97c 100644 --- a/rivershared/riverpilot/pilot.go +++ b/rivershared/riverpilot/pilot.go @@ -32,6 +32,8 @@ type Pilot interface { params *riverdriver.JobInsertFastManyParams, ) ([]*riverdriver.JobInsertFastResult, error) + JobRetry(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobRetryParams) (*rivertype.JobRow, error) + JobSetStateIfRunningMany(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) PilotInit(archetype *baseservice.Archetype) diff --git a/rivershared/riverpilot/standard.go b/rivershared/riverpilot/standard.go index 2e1c576a..3db36b8e 100644 --- a/rivershared/riverpilot/standard.go +++ b/rivershared/riverpilot/standard.go @@ -28,6 +28,10 @@ func (p *StandardPilot) JobInsertMany( return exec.JobInsertFastMany(ctx, params) } +func (p *StandardPilot) JobRetry(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobRetryParams) (*rivertype.JobRow, error) { + return exec.JobRetry(ctx, params) +} + func (p *StandardPilot) JobSetStateIfRunningMany(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { return exec.JobSetStateIfRunningMany(ctx, params) }