diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index f6d398a5..42d538c7 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -235,7 +235,7 @@ func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobG if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobGetByID(ctx context.Context, params *riverdriver.JobGetByIDParams) (*rivertype.JobRow, error) { @@ -251,7 +251,7 @@ func (e *Executor) JobGetByIDMany(ctx context.Context, params *riverdriver.JobGe if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.JobGetByKindManyParams) ([]*rivertype.JobRow, error) { @@ -259,7 +259,7 @@ func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.Job if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) { @@ -270,7 +270,7 @@ func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetSt if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.JobInsertFastManyParams) ([]*riverdriver.JobInsertFastResult, error) { @@ -327,7 +327,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.Jo return nil, interpretError(err) } - return mapSliceError(items, func(row *dbsqlc.JobInsertFastManyRow) (*riverdriver.JobInsertFastResult, error) { + return sliceutil.MapError(items, func(row *dbsqlc.JobInsertFastManyRow) (*riverdriver.JobInsertFastResult, error) { job, err := jobRowFromInternal(&row.RiverJob) if err != nil { return nil, err @@ -469,7 +469,7 @@ func (e *Executor) JobInsertFullMany(ctx context.Context, params *riverdriver.Jo return nil, interpretError(err) } - return mapSliceError(items, jobRowFromInternal) + return sliceutil.MapError(items, jobRowFromInternal) } func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParams) ([]*rivertype.JobRow, error) { @@ -482,7 +482,7 @@ func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParam if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobRescueMany(ctx context.Context, params *riverdriver.JobRescueManyParams) (*struct{}, error) { @@ -517,7 +517,7 @@ func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobSched if err != nil { return nil, interpretError(err) } - return mapSliceError(scheduleResults, func(result *dbsqlc.JobScheduleRow) (*riverdriver.JobScheduleResult, error) { + return sliceutil.MapError(scheduleResults, func(result *dbsqlc.JobScheduleRow) (*riverdriver.JobScheduleResult, error) { job, err := jobRowFromInternal(&result.RiverJob) if err != nil { return nil, err @@ -576,7 +576,7 @@ func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdr if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateParams) (*rivertype.JobRow, error) { @@ -1085,27 +1085,6 @@ func leaderFromInternal(internal *dbsqlc.RiverLeader) *riverdriver.Leader { } } -// mapSliceError manipulates a slice and transforms it to a slice of another -// type, returning the first error that occurred invoking the map function, if -// there was one. -func mapSliceError[T any, R any](collection []T, mapFunc func(T) (R, error)) ([]R, error) { - if collection == nil { - return nil, nil - } - - result := make([]R, len(collection)) - - for i, item := range collection { - var err error - result[i], err = mapFunc(item) - if err != nil { - return nil, err - } - } - - return result, nil -} - func migrationFromInternal(internal *dbsqlc.RiverMigration) *riverdriver.Migration { return &riverdriver.Migration{ CreatedAt: internal.CreatedAt.UTC(), diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 6ce51930..6b531257 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -240,7 +240,7 @@ func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobG if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobGetByID(ctx context.Context, params *riverdriver.JobGetByIDParams) (*rivertype.JobRow, error) { @@ -256,7 +256,7 @@ func (e *Executor) JobGetByIDMany(ctx context.Context, params *riverdriver.JobGe if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.JobGetByKindManyParams) ([]*rivertype.JobRow, error) { @@ -264,7 +264,7 @@ func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.Job if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) { @@ -275,7 +275,7 @@ func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetSt if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.JobInsertFastManyParams) ([]*riverdriver.JobInsertFastResult, error) { @@ -333,7 +333,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.Jo return nil, interpretError(err) } - return mapSliceError(items, func(row *dbsqlc.JobInsertFastManyRow) (*riverdriver.JobInsertFastResult, error) { + return sliceutil.MapError(items, func(row *dbsqlc.JobInsertFastManyRow) (*riverdriver.JobInsertFastResult, error) { job, err := jobRowFromInternal(&row.RiverJob) if err != nil { return nil, err @@ -464,7 +464,7 @@ func (e *Executor) JobInsertFullMany(ctx context.Context, params *riverdriver.Jo return nil, interpretError(err) } - return mapSliceError(items, jobRowFromInternal) + return sliceutil.MapError(items, jobRowFromInternal) } func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParams) ([]*rivertype.JobRow, error) { @@ -477,7 +477,7 @@ func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParam if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobRescueMany(ctx context.Context, params *riverdriver.JobRescueManyParams) (*struct{}, error) { @@ -513,7 +513,7 @@ func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobSched if err != nil { return nil, interpretError(err) } - return mapSliceError(scheduleResults, func(result *dbsqlc.JobScheduleRow) (*riverdriver.JobScheduleResult, error) { + return sliceutil.MapError(scheduleResults, func(result *dbsqlc.JobScheduleRow) (*riverdriver.JobScheduleResult, error) { job, err := jobRowFromInternal(&result.RiverJob) if err != nil { return nil, err @@ -566,7 +566,7 @@ func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdr if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateParams) (*rivertype.JobRow, error) { @@ -1123,27 +1123,6 @@ func leaderFromInternal(internal *dbsqlc.RiverLeader) *riverdriver.Leader { } } -// mapSliceError manipulates a slice and transforms it to a slice of another -// type, returning the first error that occurred invoking the map function, if -// there was one. -func mapSliceError[T any, R any](collection []T, mapFunc func(T) (R, error)) ([]R, error) { - if collection == nil { - return nil, nil - } - - result := make([]R, len(collection)) - - for i, item := range collection { - var err error - result[i], err = mapFunc(item) - if err != nil { - return nil, err - } - } - - return result, nil -} - func migrationFromInternal(internal *dbsqlc.RiverMigration) *riverdriver.Migration { return &riverdriver.Migration{ CreatedAt: internal.CreatedAt.UTC(), diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index abfd8fae..72ba2506 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -313,7 +313,7 @@ func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobG if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobGetByID(ctx context.Context, params *riverdriver.JobGetByIDParams) (*rivertype.JobRow, error) { @@ -329,7 +329,7 @@ func (e *Executor) JobGetByIDMany(ctx context.Context, params *riverdriver.JobGe if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.JobGetByKindManyParams) ([]*rivertype.JobRow, error) { @@ -337,7 +337,7 @@ func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.Job if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) { @@ -348,7 +348,7 @@ func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetSt if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.JobInsertFastManyParams) ([]*riverdriver.JobInsertFastResult, error) { @@ -608,7 +608,7 @@ func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParam if err != nil { return nil, interpretError(err) } - return mapSliceError(jobs, jobRowFromInternal) + return sliceutil.MapError(jobs, jobRowFromInternal) } func (e *Executor) JobRescueMany(ctx context.Context, params *riverdriver.JobRescueManyParams) (*struct{}, error) { @@ -1438,27 +1438,6 @@ func leaderFromInternal(internal *dbsqlc.RiverLeader) *riverdriver.Leader { } } -// mapSliceError manipulates a slice and transforms it to a slice of another -// type, returning the first error that occurred invoking the map function, if -// there was one. -func mapSliceError[T any, R any](collection []T, mapFunc func(T) (R, error)) ([]R, error) { - if collection == nil { - return nil, nil - } - - result := make([]R, len(collection)) - - for i, item := range collection { - var err error - result[i], err = mapFunc(item) - if err != nil { - return nil, err - } - } - - return result, nil -} - func migrationFromInternal(internal *dbsqlc.RiverMigration) *riverdriver.Migration { return &riverdriver.Migration{ CreatedAt: internal.CreatedAt.UTC(), diff --git a/rivershared/util/sliceutil/slice_util.go b/rivershared/util/sliceutil/slice_util.go index 945de5af..c6fac024 100644 --- a/rivershared/util/sliceutil/slice_util.go +++ b/rivershared/util/sliceutil/slice_util.go @@ -55,6 +55,23 @@ func Map[T any, R any](collection []T, mapFunc func(T) R) []R { return result } +// MapError manipulates a slice and transforms it to a slice of another type, +// returning the first error that occurred invoking the map function, if there +// was one. +func MapError[T any, R any](collection []T, mapFunc func(T) (R, error)) ([]R, error) { + result := make([]R, len(collection)) + + for i, item := range collection { + var err error + result[i], err = mapFunc(item) + if err != nil { + return nil, err + } + } + + return result, nil +} + // Uniq returns a duplicate-free version of an array, in which only the first occurrence of each element is kept. // The order of result values is determined by the order they occur in the array. func Uniq[T comparable](collection []T) []T { diff --git a/rivershared/util/sliceutil/slice_util_test.go b/rivershared/util/sliceutil/slice_util_test.go index bdac4bb1..2cd09de3 100644 --- a/rivershared/util/sliceutil/slice_util_test.go +++ b/rivershared/util/sliceutil/slice_util_test.go @@ -1,6 +1,7 @@ package sliceutil import ( + "errors" "fmt" "strconv" "testing" @@ -90,6 +91,24 @@ func TestMap(t *testing.T) { require.Equal(t, []string{"1", "2", "3", "4"}, result2) } +func TestMapError(t *testing.T) { + t.Parallel() + + result1, err := MapError([]int64{1, 2, 3, 4}, func(x int64) (string, error) { + return strconv.FormatInt(x, 10), nil + }) + require.NoError(t, err) + require.Equal(t, []string{"1", "2", "3", "4"}, result1) + + _, err = MapError([]int64{1, 2, 3, 4}, func(x int64) (string, error) { + if x == 4 { + return "", errors.New("error on element 4") + } + return strconv.FormatInt(x, 10), nil + }) + require.EqualError(t, err, "error on element 4") +} + func TestUniq(t *testing.T) { t.Parallel()