Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 9 additions & 30 deletions riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobG
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobGetByID(ctx context.Context, params *riverdriver.JobGetByIDParams) (*rivertype.JobRow, error) {
Expand All @@ -251,15 +251,15 @@ func (e *Executor) JobGetByIDMany(ctx context.Context, params *riverdriver.JobGe
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.JobGetByKindManyParams) ([]*rivertype.JobRow, error) {
jobs, err := dbsqlc.New().JobGetByKindMany(schemaTemplateParam(ctx, params.Schema), e.dbtx, params.Kind)
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) {
Expand All @@ -270,7 +270,7 @@ func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetSt
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.JobInsertFastManyParams) ([]*riverdriver.JobInsertFastResult, error) {
Expand Down Expand Up @@ -327,7 +327,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.Jo
return nil, interpretError(err)
}

return mapSliceError(items, func(row *dbsqlc.JobInsertFastManyRow) (*riverdriver.JobInsertFastResult, error) {
return sliceutil.MapError(items, func(row *dbsqlc.JobInsertFastManyRow) (*riverdriver.JobInsertFastResult, error) {
job, err := jobRowFromInternal(&row.RiverJob)
if err != nil {
return nil, err
Expand Down Expand Up @@ -469,7 +469,7 @@ func (e *Executor) JobInsertFullMany(ctx context.Context, params *riverdriver.Jo
return nil, interpretError(err)
}

return mapSliceError(items, jobRowFromInternal)
return sliceutil.MapError(items, jobRowFromInternal)
}

func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParams) ([]*rivertype.JobRow, error) {
Expand All @@ -482,7 +482,7 @@ func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParam
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobRescueMany(ctx context.Context, params *riverdriver.JobRescueManyParams) (*struct{}, error) {
Expand Down Expand Up @@ -517,7 +517,7 @@ func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobSched
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(scheduleResults, func(result *dbsqlc.JobScheduleRow) (*riverdriver.JobScheduleResult, error) {
return sliceutil.MapError(scheduleResults, func(result *dbsqlc.JobScheduleRow) (*riverdriver.JobScheduleResult, error) {
job, err := jobRowFromInternal(&result.RiverJob)
if err != nil {
return nil, err
Expand Down Expand Up @@ -576,7 +576,7 @@ func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdr
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateParams) (*rivertype.JobRow, error) {
Expand Down Expand Up @@ -1085,27 +1085,6 @@ func leaderFromInternal(internal *dbsqlc.RiverLeader) *riverdriver.Leader {
}
}

// mapSliceError manipulates a slice and transforms it to a slice of another
// type, returning the first error that occurred invoking the map function, if
// there was one.
func mapSliceError[T any, R any](collection []T, mapFunc func(T) (R, error)) ([]R, error) {
if collection == nil {
return nil, nil
}

result := make([]R, len(collection))

for i, item := range collection {
var err error
result[i], err = mapFunc(item)
if err != nil {
return nil, err
}
}

return result, nil
}

func migrationFromInternal(internal *dbsqlc.RiverMigration) *riverdriver.Migration {
return &riverdriver.Migration{
CreatedAt: internal.CreatedAt.UTC(),
Expand Down
39 changes: 9 additions & 30 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobG
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobGetByID(ctx context.Context, params *riverdriver.JobGetByIDParams) (*rivertype.JobRow, error) {
Expand All @@ -256,15 +256,15 @@ func (e *Executor) JobGetByIDMany(ctx context.Context, params *riverdriver.JobGe
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.JobGetByKindManyParams) ([]*rivertype.JobRow, error) {
jobs, err := dbsqlc.New().JobGetByKindMany(schemaTemplateParam(ctx, params.Schema), e.dbtx, params.Kind)
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) {
Expand All @@ -275,7 +275,7 @@ func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetSt
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.JobInsertFastManyParams) ([]*riverdriver.JobInsertFastResult, error) {
Expand Down Expand Up @@ -333,7 +333,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.Jo
return nil, interpretError(err)
}

return mapSliceError(items, func(row *dbsqlc.JobInsertFastManyRow) (*riverdriver.JobInsertFastResult, error) {
return sliceutil.MapError(items, func(row *dbsqlc.JobInsertFastManyRow) (*riverdriver.JobInsertFastResult, error) {
job, err := jobRowFromInternal(&row.RiverJob)
if err != nil {
return nil, err
Expand Down Expand Up @@ -464,7 +464,7 @@ func (e *Executor) JobInsertFullMany(ctx context.Context, params *riverdriver.Jo
return nil, interpretError(err)
}

return mapSliceError(items, jobRowFromInternal)
return sliceutil.MapError(items, jobRowFromInternal)
}

func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParams) ([]*rivertype.JobRow, error) {
Expand All @@ -477,7 +477,7 @@ func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParam
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobRescueMany(ctx context.Context, params *riverdriver.JobRescueManyParams) (*struct{}, error) {
Expand Down Expand Up @@ -513,7 +513,7 @@ func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobSched
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(scheduleResults, func(result *dbsqlc.JobScheduleRow) (*riverdriver.JobScheduleResult, error) {
return sliceutil.MapError(scheduleResults, func(result *dbsqlc.JobScheduleRow) (*riverdriver.JobScheduleResult, error) {
job, err := jobRowFromInternal(&result.RiverJob)
if err != nil {
return nil, err
Expand Down Expand Up @@ -566,7 +566,7 @@ func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdr
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateParams) (*rivertype.JobRow, error) {
Expand Down Expand Up @@ -1123,27 +1123,6 @@ func leaderFromInternal(internal *dbsqlc.RiverLeader) *riverdriver.Leader {
}
}

// mapSliceError manipulates a slice and transforms it to a slice of another
// type, returning the first error that occurred invoking the map function, if
// there was one.
func mapSliceError[T any, R any](collection []T, mapFunc func(T) (R, error)) ([]R, error) {
if collection == nil {
return nil, nil
}

result := make([]R, len(collection))

for i, item := range collection {
var err error
result[i], err = mapFunc(item)
if err != nil {
return nil, err
}
}

return result, nil
}

func migrationFromInternal(internal *dbsqlc.RiverMigration) *riverdriver.Migration {
return &riverdriver.Migration{
CreatedAt: internal.CreatedAt.UTC(),
Expand Down
31 changes: 5 additions & 26 deletions riverdriver/riversqlite/river_sqlite_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (e *Executor) JobGetAvailable(ctx context.Context, params *riverdriver.JobG
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobGetByID(ctx context.Context, params *riverdriver.JobGetByIDParams) (*rivertype.JobRow, error) {
Expand All @@ -329,15 +329,15 @@ func (e *Executor) JobGetByIDMany(ctx context.Context, params *riverdriver.JobGe
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobGetByKindMany(ctx context.Context, params *riverdriver.JobGetByKindManyParams) ([]*rivertype.JobRow, error) {
jobs, err := dbsqlc.New().JobGetByKindMany(schemaTemplateParam(ctx, params.Schema), e.dbtx, params.Kind)
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetStuckParams) ([]*rivertype.JobRow, error) {
Expand All @@ -348,7 +348,7 @@ func (e *Executor) JobGetStuck(ctx context.Context, params *riverdriver.JobGetSt
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobInsertFastMany(ctx context.Context, params *riverdriver.JobInsertFastManyParams) ([]*riverdriver.JobInsertFastResult, error) {
Expand Down Expand Up @@ -608,7 +608,7 @@ func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParam
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return sliceutil.MapError(jobs, jobRowFromInternal)
}

func (e *Executor) JobRescueMany(ctx context.Context, params *riverdriver.JobRescueManyParams) (*struct{}, error) {
Expand Down Expand Up @@ -1438,27 +1438,6 @@ func leaderFromInternal(internal *dbsqlc.RiverLeader) *riverdriver.Leader {
}
}

// mapSliceError manipulates a slice and transforms it to a slice of another
// type, returning the first error that occurred invoking the map function, if
// there was one.
func mapSliceError[T any, R any](collection []T, mapFunc func(T) (R, error)) ([]R, error) {
if collection == nil {
return nil, nil
}

result := make([]R, len(collection))

for i, item := range collection {
var err error
result[i], err = mapFunc(item)
if err != nil {
return nil, err
}
}

return result, nil
}

func migrationFromInternal(internal *dbsqlc.RiverMigration) *riverdriver.Migration {
return &riverdriver.Migration{
CreatedAt: internal.CreatedAt.UTC(),
Expand Down
17 changes: 17 additions & 0 deletions rivershared/util/sliceutil/slice_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,23 @@ func Map[T any, R any](collection []T, mapFunc func(T) R) []R {
return result
}

// MapError manipulates a slice and transforms it to a slice of another type,
// returning the first error that occurred invoking the map function, if there
// was one.
func MapError[T any, R any](collection []T, mapFunc func(T) (R, error)) ([]R, error) {
result := make([]R, len(collection))

for i, item := range collection {
var err error
result[i], err = mapFunc(item)
if err != nil {
return nil, err
}
}

return result, nil
}

// Uniq returns a duplicate-free version of an array, in which only the first occurrence of each element is kept.
// The order of result values is determined by the order they occur in the array.
func Uniq[T comparable](collection []T) []T {
Expand Down
19 changes: 19 additions & 0 deletions rivershared/util/sliceutil/slice_util_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sliceutil

import (
"errors"
"fmt"
"strconv"
"testing"
Expand Down Expand Up @@ -90,6 +91,24 @@ func TestMap(t *testing.T) {
require.Equal(t, []string{"1", "2", "3", "4"}, result2)
}

func TestMapError(t *testing.T) {
t.Parallel()

result1, err := MapError([]int64{1, 2, 3, 4}, func(x int64) (string, error) {
return strconv.FormatInt(x, 10), nil
})
require.NoError(t, err)
require.Equal(t, []string{"1", "2", "3", "4"}, result1)

_, err = MapError([]int64{1, 2, 3, 4}, func(x int64) (string, error) {
if x == 4 {
return "", errors.New("error on element 4")
}
return strconv.FormatInt(x, 10), nil
})
require.EqualError(t, err, "error on element 4")
}

func TestUniq(t *testing.T) {
t.Parallel()

Expand Down
Loading