diff --git a/CHANGELOG.md b/CHANGELOG.md index aeb6d064..3c19ed4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,10 @@ Types of changes - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## [3.7.0] + +- `Added` flag `--commit-timeout` to `lino push` command. This flag allows to trigger a commit if no new row is received within the specified duration + ## [3.6.2] - `Fixed` quoting column names in UPDATE during push (PostgreSQL) diff --git a/README.md b/README.md index 8f2c4355..aec54cc6 100755 --- a/README.md +++ b/README.md @@ -345,6 +345,14 @@ Each line is a filter and `lino` apply it to the start table to extract data. The `push` sub-command import a **json** line stream (jsonline format http://jsonlines.org/) in each table, following the ingress descriptor defined in current directory. +### Commit Timeout + +Use the `--commit-timeout` flag to specify a duration (e.g. `5s`, `1m`) after which a commit is triggered if no new row is received. This is useful for streaming applications where data might arrive slowly. + +```bash +$ lino push insert dest --commit-timeout 5s +``` + ### Autotruncate values Use the `autotruncate` flag to automatically truncate string values that overflows the maximum length accepted by the database. diff --git a/internal/app/push/cli.go b/internal/app/push/cli.go index c5ccb9d5..74449272 100755 --- a/internal/app/push/cli.go +++ b/internal/app/push/cli.go @@ -99,6 +99,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra autoTruncate bool watch bool logSQLTo string + commitTimeout time.Duration ) cmd := &cobra.Command{ @@ -179,7 +180,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra observers = append(observers, observer) } - e3 := push.Push(rowIteratorFactory(in), datadestination, plan, mode, commitSize, disableConstraints, rowExporter, translator, whereField, savepoint, autoTruncate, observers...) + e3 := push.Push(rowIteratorFactory(in), datadestination, plan, mode, commitSize, commitTimeout, disableConstraints, rowExporter, translator, whereField, savepoint, autoTruncate, observers...) if e3 != nil { log.Fatal().AnErr("error", e3).Msg("Fatal error stop the push command") os.Exit(1) @@ -193,6 +194,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra }, } cmd.Flags().UintVarP(&commitSize, "commitSize", "c", 500, "Commit size") + cmd.Flags().DurationVar(&commitTimeout, "commit-timeout", 0, "Commit timeout (e.g. 5s, 1m). If set, a commit is triggered if no new row is received within this duration.") cmd.Flags().BoolVarP(&disableConstraints, "disable-constraints", "d", false, "Disable constraint during push") cmd.Flags().StringVarP(&catchErrors, "catch-errors", "e", "", "Catch errors and write line in file") cmd.Flags().StringVarP(&table, "table", "t", "", "Table to writes json") diff --git a/internal/app/push/http.go b/internal/app/push/http.go index 92b732f3..88cf6830 100644 --- a/internal/app/push/http.go +++ b/internal/app/push/http.go @@ -146,7 +146,7 @@ func Handler(w http.ResponseWriter, r *http.Request, mode push.Mode, ingressDesc log.Debug().Msg(fmt.Sprintf("call Push with mode %s", mode)) - e3 := push.Push(rowIteratorFactory(r.Body), datadestination, plan, mode, commitSize, disableConstraints, push.NoErrorCaptureRowWriter{}, nil, query.Get("using-pk-field"), "", false) + e3 := push.Push(rowIteratorFactory(r.Body), datadestination, plan, mode, commitSize, 0, disableConstraints, push.NoErrorCaptureRowWriter{}, nil, query.Get("using-pk-field"), "", false) if e3 != nil { log.Error().Err(e3).Msg("") w.WriteHeader(http.StatusNotFound) diff --git a/internal/infra/push/datadestination_sql.go b/internal/infra/push/datadestination_sql.go index dd1e1a6c..5db2a0fe 100644 --- a/internal/infra/push/datadestination_sql.go +++ b/internal/infra/push/datadestination_sql.go @@ -382,8 +382,8 @@ func (rw *SQLRowWriter) Write(row push.Row, where push.Row) *push.Error { rw.sqlLogger.Write(values) _, err2 := rw.statement.Exec(values...) - log.Trace().AnErr("error", err2).Msg("push error") if err2 != nil { + log.Trace().AnErr("error", err2).Msg("push error") // reset statement after error if err := rw.close(); err != nil { return &push.Error{Description: err.Error() + "\noriginal error :\n" + err2.Error()} diff --git a/pkg/push/driven_test.go b/pkg/push/driven_test.go index 04a07348..091a79f8 100755 --- a/pkg/push/driven_test.go +++ b/pkg/push/driven_test.go @@ -55,6 +55,7 @@ type memoryDataDestination struct { closed bool committed bool opened bool + commits int } func (mdd *memoryDataDestination) SafeUrl() string { @@ -76,6 +77,7 @@ func (mdd *memoryDataDestination) Open(pla push.Plan, mode push.Mode, disableCon func (mdd *memoryDataDestination) Commit() *push.Error { mdd.committed = true + mdd.commits++ return nil } diff --git a/pkg/push/driver.go b/pkg/push/driver.go index b4bf0e84..b0bf0d42 100755 --- a/pkg/push/driver.go +++ b/pkg/push/driver.go @@ -19,108 +19,290 @@ package push import ( "encoding/json" + "errors" "fmt" "os" + "time" "github.com/rs/zerolog/log" ) +// PushConfig holds configuration for the push operation +type PushConfig struct { + CommitSize uint + CommitTimeout time.Duration + DisableConstraints bool + WhereField string + SavepointPath string + AutoTruncate bool +} + +// pushContext encapsulates the state of a push operation +type pushContext struct { + cfg PushConfig + destination DataDestination + plan Plan + mode Mode + catchError RowWriter + translator Translator + observers []Observer + + committed []Row + inputCount uint +} + // Push write rows to target table -func Push(ri RowIterator, destination DataDestination, plan Plan, mode Mode, commitSize uint, disableConstraints bool, catchError RowWriter, translator Translator, whereField string, savepointPath string, autotruncate bool, observers ...Observer) (err *Error) { - defer func() { - for _, observer := range observers { - if observer != nil { - observer.Close() - } - } - }() +func Push(ri RowIterator, destination DataDestination, plan Plan, mode Mode, commitSize uint, commitTimeout time.Duration, disableConstraints bool, catchError RowWriter, translator Translator, whereField string, savepointPath string, autotruncate bool, observers ...Observer) *Error { + cfg := PushConfig{ + CommitSize: commitSize, + CommitTimeout: commitTimeout, + DisableConstraints: disableConstraints, + WhereField: whereField, + SavepointPath: savepointPath, + AutoTruncate: autotruncate, + } + + ctx := &pushContext{ + cfg: cfg, + destination: destination, + plan: plan, + mode: mode, + catchError: catchError, + translator: translator, + observers: observers, + committed: make([]Row, 0, commitSize), + } + + return ctx.Run(ri) +} + +func (ctx *pushContext) Run(ri RowIterator) (err *Error) { + defer ctx.closeObservers() log.Info(). - Str("url", destination.SafeUrl()). + Str("url", ctx.destination.SafeUrl()). Msg("Open database") - err1 := destination.Open(plan, mode, disableConstraints) - if err1 != nil { - return err1 + if err := ctx.destination.Open(ctx.plan, ctx.mode, ctx.cfg.DisableConstraints); err != nil { + return err } defer func() { - er1 := destination.Close() - er2 := ri.Close() - - switch { - case er1 != nil && er2 == nil && err == nil: - err = er1 - case er2 != nil && er1 == nil && err == nil: - err = er2 - case err != nil && er1 == nil && er2 == nil: - // err = err - case err != nil || er1 != nil || er2 != nil: - err = &Error{Description: fmt.Sprintf("multiple errors: [%s], [%s], [%s]", err, er1, er2)} - } + err = ctx.cleanup(ri, err) }() Reset() - committed := make([]Row, 0, commitSize) - + // Handle savepoint on exit defer func() { - if savepointPath != "" { - if err := savepoint(savepointPath, committed); err != nil { - log.Error().Msgf("Savepoint failure, %d lines committed unsaved", len(committed)) - for _, unsaved := range committed { + if ctx.cfg.SavepointPath != "" { + if err := savepoint(ctx.cfg.SavepointPath, ctx.committed); err != nil { + log.Error().Msgf("Savepoint failure, %d lines committed unsaved", len(ctx.committed)) + for _, unsaved := range ctx.committed { log.Warn().Interface("value", unsaved).Msg("Unsaved committed value") } } } }() - i := uint(0) - for ri.Next() { - row := ri.Value() + return ctx.processLoop(ri) +} - err2 := pushRow(*row, destination, plan.FirstTable(), plan, mode, translator, whereField) - if err2 != nil { - err4 := catchError.Write(*row, nil) - if err4 != nil { - return &Error{Description: fmt.Sprintf("%s (%s)", err2.Error(), err4.Error())} +func (ctx *pushContext) closeObservers() { + for _, observer := range ctx.observers { + if observer != nil { + observer.Close() + } + } +} + +func (ctx *pushContext) cleanup(ri RowIterator, err *Error) *Error { + er1 := ctx.destination.Close() + er2 := ri.Close() + + // Use helper that aggregates multiple *Error into a single *Error using errors.Join + return combineErrors(err, er1, er2) +} + +func (ctx *pushContext) processLoop(ri RowIterator) *Error { + rowChan, errChan, quit := ctx.startRowReader(ri) + defer close(quit) + + var timer *time.Timer + var timerCh <-chan time.Time + + if ctx.cfg.CommitTimeout > 0 { + timer = time.NewTimer(ctx.cfg.CommitTimeout) + // Ensure timer is stopped when we exit to avoid leaks, though we are exiting anyway. + defer func() { + if timer != nil { + timer.Stop() + } + }() + timerCh = timer.C + } + +loop: + for { + select { + case row, ok := <-rowChan: + if !ok { + select { + case e := <-errChan: + return e + default: + } + break loop + } + + if ctx.cfg.CommitTimeout > 0 { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(ctx.cfg.CommitTimeout) + } + + if err := ctx.processRow(row); err != nil { + return err + } + + case <-timerCh: + if err := ctx.handleTimeout(); err != nil { + return err } - log.Warn().Msg(fmt.Sprintf("Error catched : %s", err2.Error())) } - i++ - if savepointPath != "" { - committed = append(committed, extractValues(*row, plan.FirstTable().PrimaryKey())) + } + + // Final commit for any remaining uncommitted rows + if ctx.inputCount%ctx.cfg.CommitSize != 0 { + log.Info().Msg("Final commit") + if err := ctx.commit(); err != nil { + return err } - if i%commitSize == 0 { - log.Info().Msg("Intermediate commit") - errCommit := destination.Commit() - if errCommit != nil { - return errCommit + } + + log.Info().Msg("End of stream") + return nil +} + +func (ctx *pushContext) startRowReader(ri RowIterator) (<-chan *Row, <-chan *Error, chan struct{}) { + rowChan := make(chan *Row) + errChan := make(chan *Error, 1) + quit := make(chan struct{}) + + go func() { + defer close(rowChan) + for ri.Next() { + val := ri.Value() + // Shallow copy to avoid race conditions + newRow := make(Row, len(*val)) + for k, v := range *val { + newRow[k] = v } - if savepointPath != "" { - if err := savepoint(savepointPath, committed); err != nil { - return err - } - committed = committed[:0] // clear slice without releasing memory + select { + case rowChan <- &newRow: + case <-quit: + return } - IncCommitsCount() } - IncInputLinesCount() - for _, observer := range observers { - if observer != nil { - observer.Pushed() + if e := ri.Error(); e != nil { + select { + case errChan <- e: + case <-quit: + return } } + }() + + return rowChan, errChan, quit +} + +func (ctx *pushContext) processRow(row *Row) *Error { + err := pushRow(*row, ctx.destination, ctx.plan.FirstTable(), ctx.plan, ctx.mode, ctx.translator, ctx.cfg.WhereField) + if err != nil { + if errWrite := ctx.catchError.Write(*row, nil); errWrite != nil { + return &Error{Description: fmt.Sprintf("%s (%s)", err.Error(), errWrite.Error())} + } + log.Warn().Msg(fmt.Sprintf("Error catched : %s", err.Error())) } - if ri.Error() != nil { - return ri.Error() + ctx.inputCount++ + if ctx.cfg.SavepointPath != "" { + ctx.committed = append(ctx.committed, extractValues(*row, ctx.plan.FirstTable().PrimaryKey())) } - log.Info().Msg("End of stream") + if ctx.inputCount%ctx.cfg.CommitSize == 0 { + log.Info().Msg("Intermediate commit") + if err := ctx.commit(); err != nil { + return err + } + } + + IncInputLinesCount() + for _, observer := range ctx.observers { + if observer != nil { + observer.Pushed() + } + } + return nil +} + +func (ctx *pushContext) handleTimeout() *Error { + if ctx.inputCount%ctx.cfg.CommitSize != 0 { + log.Info().Msg("Timeout commit") + return ctx.commit() + } + return nil +} + +func (ctx *pushContext) commit() *Error { + if err := ctx.destination.Commit(); err != nil { + return err + } + if ctx.cfg.SavepointPath != "" { + if err := savepoint(ctx.cfg.SavepointPath, ctx.committed); err != nil { + // Restore previous behavior: log savepoint failures but do not make them fatal. + log.Error().Msgf("Savepoint failure, %d lines committed unsaved: %v", len(ctx.committed), err) + for _, unsaved := range ctx.committed { + log.Warn().Interface("value", unsaved).Msg("Unsaved committed value") + } + // clear committed slice (we consider them committed to the destination even if savepoint failed) + ctx.committed = ctx.committed[:0] + } else { + ctx.committed = ctx.committed[:0] + } + } + IncCommitsCount() return nil } +// combineErrors aggregates multiple *Error values into a single *Error. +// It uses errors.Join for multi-error aggregation and preserves single errors. +func combineErrors(errs ...*Error) *Error { + var nonNil []error + for _, e := range errs { + if e != nil { + nonNil = append(nonNil, e) + } + } + switch len(nonNil) { + case 0: + return nil + case 1: + // If it's already our *Error type, return it as-is + if single, ok := nonNil[0].(*Error); ok { + return single + } + return &Error{Description: nonNil[0].Error()} + default: + joined := errors.Join(nonNil...) + return &Error{Description: joined.Error()} + } +} + // FilterRelation split values and relations to follow func FilterRelation(row Row, relations map[string]Relation, whereField string) (Row, Row, map[string]Row, map[string][]Row, *Error) { frow := Row{} diff --git a/pkg/push/driver_internal_test.go b/pkg/push/driver_internal_test.go new file mode 100644 index 00000000..0479b954 --- /dev/null +++ b/pkg/push/driver_internal_test.go @@ -0,0 +1,31 @@ +// NOTE: this test file introduces a unit test for combineErrors helper. +package push + +import ( + "strings" + "testing" +) + +func TestCombineErrorsAggregatesMultipleErrors(t *testing.T) { + e1 := &Error{Description: "first error"} + e2 := &Error{Description: "second error"} + + combined := combineErrors(e1, e2, nil) + if combined == nil { + t.Fatal("expected combined error, got nil") + } + if !strings.Contains(combined.Description, "first error") || !strings.Contains(combined.Description, "second error") { + t.Fatalf("combined Description did not contain both parts; got: %q", combined.Description) + } +} + +func TestCombineErrorsReturnsSingle(t *testing.T) { + e1 := &Error{Description: "only one"} + combined := combineErrors(nil, e1, nil) + if combined == nil { + t.Fatal("expected single error returned, got nil") + } + if combined != e1 { + t.Fatalf("expected original *Error back, got different value: %v", combined) + } +} diff --git a/pkg/push/driver_test.go b/pkg/push/driver_test.go index 0aaef714..2e0b5c40 100755 --- a/pkg/push/driver_test.go +++ b/pkg/push/driver_test.go @@ -18,7 +18,11 @@ package push_test import ( + "fmt" + "os" + "strings" "testing" + "time" "github.com/cgi-fr/lino/pkg/push" "github.com/stretchr/testify/assert" @@ -47,9 +51,9 @@ func TestSimplePush(t *testing.T) { B.Name(): {}, C.Name(): {}, } - dest := memoryDataDestination{tables, false, false, false} + dest := memoryDataDestination{tables, false, false, false, 0} - err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + err := push.Push(&ri, &dest, plan, push.Insert, 2, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) assert.Nil(t, err) assert.Equal(t, true, dest.closed) @@ -86,9 +90,9 @@ func TestRelationPush(t *testing.T) { B.Name(): {}, C.Name(): {}, } - dest := memoryDataDestination{tables, false, false, false} + dest := memoryDataDestination{tables, false, false, false, 0} - err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + err := push.Push(&ri, &dest, plan, push.Insert, 2, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) // no error assert.Nil(t, err) @@ -135,9 +139,9 @@ func TestRelationPushWithEmptyRelation(t *testing.T) { B.Name(): {}, C.Name(): {}, } - dest := memoryDataDestination{tables, false, false, false} + dest := memoryDataDestination{tables, false, false, false, 0} - err := push.Push(&ri, &dest, plan, push.Insert, 2, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + err := push.Push(&ri, &dest, plan, push.Insert, 2, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) // no error assert.Nil(t, err) @@ -186,9 +190,9 @@ func TestInversseRelationPush(t *testing.T) { B.Name(): {}, C.Name(): {}, } - dest := memoryDataDestination{tables, false, false, false} + dest := memoryDataDestination{tables, false, false, false, 0} - err := push.Push(&ri, &dest, plan, push.Insert, 5, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + err := push.Push(&ri, &dest, plan, push.Insert, 5, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) // no error assert.Nil(t, err) @@ -209,3 +213,880 @@ func TestInversseRelationPush(t *testing.T) { assert.Equal(t, 1, len(dest.tables[C.Name()].rows[0])) assert.Equal(t, "1", dest.tables[C.Name()].rows[0]["history"]) } + +func TestPushWithCommitTimeout(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + + // Iterator that returns 2 rows with a delay between them + ri := &delayedRowIterator{ + rows: []push.Row{ + {"name": "Row1"}, + {"name": "Row2"}, + }, + delay: 200 * time.Millisecond, + } + + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + // Commit size 10, but timeout 100ms. Should trigger commit after first row due to delay. + err := push.Push(ri, dest, plan, push.Insert, 10, 100*time.Millisecond, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.Nil(t, err) + // Should have 2 commits: 1 for timeout after first row, 1 final commit for second row + assert.Equal(t, 2, dest.commits, "Expected 2 commits: 1 timeout + 1 final") + assert.Equal(t, 2, len(dest.tables[A.Name()].rows)) +} + +func TestPushWithSavepoint(t *testing.T) { + tmpfile, err := os.CreateTemp("", "savepoint") + assert.Nil(t, err) + defer os.Remove(tmpfile.Name()) + tmpfile.Close() + + A := push.NewTable("A", []string{"id"}, nil) + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 5, row: push.Row{"id": 1, "name": "John"}} + + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + // Commit size 2, total 5 rows -> 2 intermediate commits + err = push.Push(&ri, dest, plan, push.Insert, 2, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", tmpfile.Name(), false) + + assert.Nil(t, err) + + content, err := os.ReadFile(tmpfile.Name()) + assert.Nil(t, err) + lines := strings.Split(strings.TrimSpace(string(content)), "\n") + + assert.GreaterOrEqual(t, len(lines), 4) +} + +func TestPushWithObservers(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 5, row: push.Row{"name": "John"}} + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + obs := &mockObserver{} + err := push.Push(&ri, dest, plan, push.Insert, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false, obs) + + assert.Nil(t, err) + assert.Equal(t, 5, obs.pushedCount) + assert.True(t, obs.closed) +} + +type delayedRowIterator struct { + rows []push.Row + index int + delay time.Duration +} + +func (i *delayedRowIterator) Next() bool { + if i.index >= len(i.rows) { + return false + } + if i.index > 0 { + time.Sleep(i.delay) + } + i.index++ + return true +} + +func (i *delayedRowIterator) Value() *push.Row { + return &i.rows[i.index-1] +} + +func (i *delayedRowIterator) Error() *push.Error { + return nil +} + +func (i *delayedRowIterator) Close() *push.Error { + return nil +} + +type mockObserver struct { + pushedCount int + closed bool +} + +func (m *mockObserver) Pushed() { + m.pushedCount++ +} + +func (m *mockObserver) Close() { + m.closed = true +} + +// Test: Empty iterator (no rows) +func TestPushWithEmptyIterator(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 0, row: push.Row{"name": "John"}} + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + err := push.Push(&ri, dest, plan, push.Insert, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.Nil(t, err) + assert.Equal(t, 0, len(dest.tables[A.Name()].rows)) + assert.Equal(t, 0, dest.commits, "No commits should occur for empty iterator") +} + +// Test: Exactly commitSize rows (boundary condition) +func TestPushWithExactCommitSize(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 5, row: push.Row{"name": "John"}} + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + err := push.Push(&ri, dest, plan, push.Insert, 5, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.Nil(t, err) + assert.Equal(t, 5, len(dest.tables[A.Name()].rows)) + // Should have 1 intermediate commit (at row 5) + no final commit since count % commitSize == 0 + assert.Equal(t, 1, dest.commits, "Expected 1 commit for exact commitSize") +} + +// Test: commitSize = 1 (commit every row) +func TestPushWithCommitSizeOne(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 3, row: push.Row{"name": "John"}} + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + err := push.Push(&ri, dest, plan, push.Insert, 1, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.Nil(t, err) + assert.Equal(t, 3, len(dest.tables[A.Name()].rows)) + assert.Equal(t, 3, dest.commits, "Expected 3 commits (one per row)") +} + +// Test: Iterator error handling +func TestPushWithIteratorError(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := &errorRowIterator{ + rows: []push.Row{{"name": "Row1"}}, + errorAfter: 1, + } + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + err := push.Push(ri, dest, plan, push.Insert, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.NotNil(t, err) + assert.Equal(t, "iterator error", err.Description) +} + +// Test: Destination open error +func TestPushWithDestinationOpenError(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 5, row: push.Row{"name": "John"}} + dest := &errorDataDestination{failOnOpen: true} + + err := push.Push(&ri, dest, plan, push.Insert, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.NotNil(t, err) + assert.Equal(t, "open error", err.Description) +} + +// Test: Destination commit error +func TestPushWithCommitError(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 3, row: push.Row{"name": "John"}} + dest := &errorDataDestination{ + tables: map[string]*rowWriter{A.Name(): {}}, + failOnCommit: true, + commitToFail: 1, + } + + err := push.Push(&ri, dest, plan, push.Insert, 2, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.NotNil(t, err) + assert.Equal(t, "commit error", err.Description) +} + +// Test: Multiple timeouts +func TestPushWithMultipleTimeouts(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + + ri := &delayedRowIterator{ + rows: []push.Row{ + {"name": "Row1"}, + {"name": "Row2"}, + {"name": "Row3"}, + }, + delay: 150 * time.Millisecond, + } + + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + err := push.Push(ri, dest, plan, push.Insert, 10, 100*time.Millisecond, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.Nil(t, err) + assert.Equal(t, 3, len(dest.tables[A.Name()].rows)) + // Should have 2 timeout commits (after row 1 and row 2) + 1 final commit (for row 3) + assert.Equal(t, 3, dest.commits, "Expected 3 commits: 2 timeouts + 1 final") +} + +// Test: Timeout with exact commitSize +func TestPushTimeoutWithExactCommitSize(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + + ri := &delayedRowIterator{ + rows: []push.Row{ + {"name": "Row1"}, + {"name": "Row2"}, + }, + delay: 150 * time.Millisecond, + } + + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + // commitSize = 2, so after 2 rows we hit the size limit + err := push.Push(ri, dest, plan, push.Insert, 2, 100*time.Millisecond, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.Nil(t, err) + assert.Equal(t, 2, len(dest.tables[A.Name()].rows)) + // First row -> timeout commit, second row -> intermediate commit (size reached) + assert.Equal(t, 2, dest.commits, "Expected 2 commits: 1 timeout + 1 intermediate") +} + +// Helper types for error testing +type errorRowIterator struct { + rows []push.Row + index int + errorAfter int + failOnClose bool +} + +func (i *errorRowIterator) Next() bool { + if i.index >= len(i.rows) { + return false + } + i.index++ + return true +} + +func (i *errorRowIterator) Value() *push.Row { + return &i.rows[i.index-1] +} + +func (i *errorRowIterator) Error() *push.Error { + if i.index >= i.errorAfter { + return &push.Error{Description: "iterator error"} + } + return nil +} + +func (i *errorRowIterator) Close() *push.Error { + if i.failOnClose { + return &push.Error{Description: "close error"} + } + return nil +} + +type errorDataDestination struct { + tables map[string]*rowWriter + failOnOpen bool + failOnCommit bool + failOnClose bool + commitToFail int + commitCount int + opened bool + closed bool +} + +func (d *errorDataDestination) SafeUrl() string { + return "mem://error-test" +} + +func (d *errorDataDestination) Open(plan push.Plan, mode push.Mode, disableConstraints bool) *push.Error { + if d.failOnOpen { + return &push.Error{Description: "open error"} + } + d.opened = true + return nil +} + +func (d *errorDataDestination) Commit() *push.Error { + d.commitCount++ + if d.failOnCommit && d.commitCount >= d.commitToFail { + return &push.Error{Description: "commit error"} + } + return nil +} + +func (d *errorDataDestination) Close() *push.Error { + d.closed = true + if d.failOnClose { + return &push.Error{Description: "close error"} + } + return nil +} + +func (d *errorDataDestination) RowWriter(table push.Table) (push.RowWriter, *push.Error) { + if d.tables == nil { + d.tables = map[string]*rowWriter{} + } + if _, ok := d.tables[table.Name()]; !ok { + d.tables[table.Name()] = &rowWriter{} + } + return d.tables[table.Name()], nil +} + +func (d *errorDataDestination) OpenSQLLogger(string) error { + return nil +} + +// Test: Update mode with translator (covers computeTranslatedKeys) +func TestPushUpdateModeWithTranslator(t *testing.T) { + A := push.NewTable("A", []string{"id"}, nil) + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 2, row: push.Row{"id": 1, "name": "Updated"}} + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + // Mock translator + translator := &mockTranslator{ + translations: map[string]interface{}{ + "A.id.1": 100, // old value for new value 1 + }, + } + + err := push.Push(&ri, dest, plan, push.Update, 10, 0, true, push.NoErrorCaptureRowWriter{}, translator, "", "", false) + + assert.Nil(t, err) + assert.Equal(t, 2, len(dest.tables[A.Name()].rows)) +} + +// Test: Delete mode +func TestPushDeleteMode(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 3, row: push.Row{"name": "ToDelete"}} + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + err := push.Push(&ri, dest, plan, push.Delete, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.Nil(t, err) + assert.Equal(t, 3, len(dest.tables[A.Name()].rows)) +} + +// Test: Destination close error +func TestPushWithDestinationCloseError(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 2, row: push.Row{"name": "John"}} + dest := &errorDataDestination{ + tables: map[string]*rowWriter{A.Name(): {}}, + failOnClose: true, + } + + err := push.Push(&ri, dest, plan, push.Insert, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.NotNil(t, err) + assert.Equal(t, "close error", err.Description) +} + +// Test: Iterator close error +func TestPushWithIteratorCloseError(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := &errorRowIterator{ + rows: []push.Row{{"name": "Row1"}}, + errorAfter: 10, // No error during iteration + failOnClose: true, + } + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + err := push.Push(ri, dest, plan, push.Insert, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.NotNil(t, err) + assert.Equal(t, "close error", err.Description) +} + +// Test: Multiple errors (destination close + iterator close) +func TestPushWithMultipleCloseErrors(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := &errorRowIterator{ + rows: []push.Row{{"name": "Row1"}}, + errorAfter: 10, + failOnClose: true, + } + dest := &errorDataDestination{ + tables: map[string]*rowWriter{A.Name(): {}}, + failOnClose: true, + } + + err := push.Push(ri, dest, plan, push.Insert, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.NotNil(t, err) + assert.Contains(t, err.Description, "close error") +} + +// Test: Row write error with catch error writer +func TestPushWithRowWriteError(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 3, row: push.Row{"name": "John"}} + + // Destination with error row writer + dest := &errorWriterDataDestination{ + tables: map[string]*errorRowWriter{A.Name(): {failAfter: 1}}, + } + + // Catch error writer + errorWriter := &captureRowWriter{} + + err := push.Push(&ri, dest, plan, push.Insert, 10, 0, true, errorWriter, nil, "", "", false) + + assert.Nil(t, err) // Should not fail, errors are caught + assert.Equal(t, 2, len(errorWriter.rows), "Should have caught 2 errors") +} + +// Test: Savepoint write error +func TestPushWithSavepointWriteError(t *testing.T) { + A := push.NewTable("A", []string{"id"}, nil) + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 3, row: push.Row{"id": 1, "name": "John"}} + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + // Use invalid path to trigger savepoint error + err := push.Push(&ri, dest, plan, push.Insert, 2, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "/invalid/path/savepoint.json", false) + + assert.Nil(t, err) // Savepoint failure should be non-fatal +} + +// Helper types for new tests +type mockTranslator struct { + translations map[string]interface{} +} + +func (m *mockTranslator) FindValue(key push.Key, value push.Value) push.Value { + keyStr := fmt.Sprintf("%s.%s.%v", key.TableName, key.ColumnName, value) + if oldValue, ok := m.translations[keyStr]; ok { + return oldValue + } + return value +} + +func (m *mockTranslator) Load(keys []push.Key, rows push.RowIterator) *push.Error { + return nil +} + +type errorRowWriter struct { + rows []push.Row + failAfter int +} + +func (rw *errorRowWriter) Write(row push.Row, where push.Row) *push.Error { + if len(rw.rows) >= rw.failAfter { + return &push.Error{Description: "write error"} + } + rw.rows = append(rw.rows, row) + return nil +} + +type captureRowWriter struct { + rows []push.Row +} + +func (c *captureRowWriter) Write(row push.Row, where push.Row) *push.Error { + c.rows = append(c.rows, row) + return nil +} + +// errorWriterDataDestination is a destination that uses errorRowWriter +type errorWriterDataDestination struct { + tables map[string]*errorRowWriter + opened bool + closed bool +} + +func (d *errorWriterDataDestination) SafeUrl() string { + return "mem://error-writer-test" +} + +func (d *errorWriterDataDestination) Open(plan push.Plan, mode push.Mode, disableConstraints bool) *push.Error { + d.opened = true + return nil +} + +func (d *errorWriterDataDestination) Commit() *push.Error { + return nil +} + +func (d *errorWriterDataDestination) Close() *push.Error { + d.closed = true + return nil +} + +func (d *errorWriterDataDestination) RowWriter(table push.Table) (push.RowWriter, *push.Error) { + if d.tables == nil { + d.tables = map[string]*errorRowWriter{} + } + if _, ok := d.tables[table.Name()]; !ok { + d.tables[table.Name()] = &errorRowWriter{} + } + return d.tables[table.Name()], nil +} + +func (d *errorWriterDataDestination) OpenSQLLogger(string) error { + return nil +} + +// Test: FilterRelation with invalid array element (not a map) +func TestPushWithInvalidRelationArrayElement(t *testing.T) { + A := makeTable("A") + B := makeTable("B") + AB := makeRel(A, B) + + plan := push.NewPlan(A, []push.Relation{AB}) + + // Invalid relation: array element is not a map + ri := &singleRowIterator{ + row: push.Row{ + "name": "John", + "A->B": []interface{}{"invalid", "not a map"}, + }, + } + + tables := map[string]*rowWriter{A.Name(): {}, B.Name(): {}} + dest := &memoryDataDestination{tables: tables} + errorWriter := &captureRowWriter{} + + err := push.Push(ri, dest, plan, push.Insert, 10, 0, true, errorWriter, nil, "", "", false) + + assert.Nil(t, err) // Error should be caught + assert.Equal(t, 1, len(errorWriter.rows)) +} + +// Test: FilterRelation with invalid relation type (not map or array) +func TestPushWithInvalidRelationType(t *testing.T) { + A := makeTable("A") + B := makeTable("B") + AB := makeRel(A, B) + + plan := push.NewPlan(A, []push.Relation{AB}) + + // Invalid relation: not a map or array + ri := &singleRowIterator{ + row: push.Row{ + "name": "John", + "A->B": "invalid string value", + }, + } + + tables := map[string]*rowWriter{A.Name(): {}, B.Name(): {}} + dest := &memoryDataDestination{tables: tables} + errorWriter := &captureRowWriter{} + + err := push.Push(ri, dest, plan, push.Insert, 10, 0, true, errorWriter, nil, "", "", false) + + assert.Nil(t, err) // Error should be caught + assert.Equal(t, 1, len(errorWriter.rows)) +} + +// Test: Push with whereField +func TestPushWithWhereField(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + + ri := &singleRowIterator{ + row: push.Row{ + "name": "John", + "__usingpk__": map[string]interface{}{ + "old_id": 100, + }, + }, + } + + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + err := push.Push(ri, dest, plan, push.Update, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "__usingpk__", "", false) + + assert.Nil(t, err) + assert.Equal(t, 1, len(dest.tables[A.Name()].rows)) +} + +// Test: Push with table columns (import="no") +func TestPushWithNoImportColumns(t *testing.T) { + // Create table with columns + columns := []push.Column{ + push.NewColumn("id", "yes", "yes", 0, false, false, ""), + push.NewColumn("name", "yes", "yes", 0, false, false, ""), + push.NewColumn("internal", "yes", "no", 0, false, false, ""), // Should not be imported + } + A := push.NewTable("A", []string{"id"}, push.NewColumnList(columns)) + plan := push.NewPlan(A, []push.Relation{}) + + ri := &singleRowIterator{ + row: push.Row{ + "id": 1, + "name": "John", + "internal": "should_be_removed", + }, + } + + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + err := push.Push(ri, dest, plan, push.Insert, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.Nil(t, err) + assert.Equal(t, 1, len(dest.tables[A.Name()].rows)) + // The "internal" field should have been removed + _, hasInternal := dest.tables[A.Name()].rows[0]["internal"] + assert.False(t, hasInternal, "internal field should be removed") +} + +// Test: Truncate mode +func TestPushTruncateMode(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 2, row: push.Row{"name": "John"}} + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + err := push.Push(&ri, dest, plan, push.Truncate, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.Nil(t, err) + assert.Equal(t, 2, len(dest.tables[A.Name()].rows)) +} + +// Test: Update mode with relations +func TestPushUpdateModeWithRelations(t *testing.T) { + A := makeTable("A") + B := makeTable("B") + AB := makeRel(A, B) + + plan := push.NewPlan(A, []push.Relation{AB}) + + ri := &singleRowIterator{ + row: push.Row{ + "name": "John", + "A->B": map[string]interface{}{ + "age": 42, + }, + }, + } + + tables := map[string]*rowWriter{A.Name(): {}, B.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + err := push.Push(ri, dest, plan, push.Update, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.Nil(t, err) + assert.Equal(t, 1, len(dest.tables[A.Name()].rows)) + assert.Equal(t, 1, len(dest.tables[B.Name()].rows)) +} + +// Test: Delete mode with relations +func TestPushDeleteModeWithRelations(t *testing.T) { + A := makeTable("A") + B := makeTable("B") + AB := makeRel(A, B) + + plan := push.NewPlan(A, []push.Relation{AB}) + + ri := &singleRowIterator{ + row: push.Row{ + "name": "John", + "A->B": map[string]interface{}{ + "age": 42, + }, + }, + } + + tables := map[string]*rowWriter{A.Name(): {}, B.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + err := push.Push(ri, dest, plan, push.Delete, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.Nil(t, err) + assert.Equal(t, 1, len(dest.tables[A.Name()].rows)) + assert.Equal(t, 1, len(dest.tables[B.Name()].rows)) +} + +// Test: Update mode with inverse relations +func TestPushUpdateModeWithInverseRelations(t *testing.T) { + A := makeTable("A") + C := makeTable("C") + B := makeTable("B") + AB := makeRel(A, B) + CB := makeRel(C, B) + + plan := push.NewPlan(A, []push.Relation{AB, CB}) + + ri := &singleRowIterator{ + row: push.Row{ + "name": "John", + "A->B": map[string]interface{}{ + "age": 42, + "C->B": []interface{}{ + map[string]interface{}{"history": "1"}, + }, + }, + }, + } + + tables := map[string]*rowWriter{A.Name(): {}, B.Name(): {}, C.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + err := push.Push(ri, dest, plan, push.Update, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.Nil(t, err) + assert.Equal(t, 1, len(dest.tables[A.Name()].rows)) + assert.Equal(t, 1, len(dest.tables[B.Name()].rows)) + assert.Equal(t, 1, len(dest.tables[C.Name()].rows)) +} + +// Helper: single row iterator +type singleRowIterator struct { + row push.Row + done bool +} + +func (i *singleRowIterator) Next() bool { + if i.done { + return false + } + i.done = true + return true +} + +func (i *singleRowIterator) Value() *push.Row { + return &i.row +} + +func (i *singleRowIterator) Error() *push.Error { + return nil +} + +func (i *singleRowIterator) Close() *push.Error { + return nil +} + +// Test: Stats computation +func TestStatsComputation(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 5, row: push.Row{"name": "John"}} + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + push.Reset() + + err := push.Push(&ri, dest, plan, push.Insert, 2, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.Nil(t, err) + + stats := push.Compute() + assert.Equal(t, 5, stats.GetInputLinesCount()) + assert.Equal(t, 5, stats.GetCreatedLinesCount()[A.Name()]) + assert.Equal(t, 3, stats.GetCommitsCount()) // 2 intermediate + 1 final + + // Test ToJSON + jsonBytes := stats.ToJSON() + assert.NotNil(t, jsonBytes) + assert.Greater(t, len(jsonBytes), 0) +} + +// Test: Delete stats +func TestDeleteStats(t *testing.T) { + A := makeTable("A") + plan := push.NewPlan(A, []push.Relation{}) + ri := rowIterator{limit: 3, row: push.Row{"name": "John"}} + tables := map[string]*rowWriter{A.Name(): {}} + dest := &memoryDataDestination{tables: tables} + + push.Reset() + + err := push.Push(&ri, dest, plan, push.Delete, 10, 0, true, push.NoErrorCaptureRowWriter{}, nil, "", "", false) + + assert.Nil(t, err) + + stats := push.Compute() + assert.Equal(t, 3, stats.GetDeletedLinesCount()[A.Name()]) +} + +// Test: Mode parsing +func TestModeParsing(t *testing.T) { + mode, err := push.ParseMode("insert") + assert.Nil(t, err) + assert.Equal(t, push.Insert, mode) + assert.Equal(t, "insert", mode.String()) + + mode, err = push.ParseMode("update") + assert.Nil(t, err) + assert.Equal(t, push.Update, mode) + + mode, err = push.ParseMode("delete") + assert.Nil(t, err) + assert.Equal(t, push.Delete, mode) + + mode, err = push.ParseMode("truncate") + assert.Nil(t, err) + assert.Equal(t, push.Truncate, mode) + + _, err = push.ParseMode("invalid") + assert.NotNil(t, err) +} + +// Test: Plan Tables +func TestPlanTables(t *testing.T) { + A := makeTable("A") + B := makeTable("B") + AB := makeRel(A, B) + + plan := push.NewPlan(A, []push.Relation{AB}) + + tables := plan.Tables() + assert.Equal(t, 2, len(tables)) +} + +// Test: Column methods +func TestColumnMethods(t *testing.T) { + col := push.NewColumn("test", "yes", "yes", 100, false, true, "preserve_value") + + assert.Equal(t, "test", col.Name()) + assert.Equal(t, "yes", col.Export()) + assert.Equal(t, int64(100), col.Length()) + assert.False(t, col.LengthInBytes()) + assert.True(t, col.Truncate()) + assert.Equal(t, "preserve_value", col.Preserve()) +} + +// Test: Table GetColumn +func TestTableGetColumn(t *testing.T) { + columns := []push.Column{ + push.NewColumn("id", "yes", "yes", 0, false, false, ""), + push.NewColumn("name", "yes", "yes", 0, false, false, ""), + } + A := push.NewTable("A", []string{"id"}, push.NewColumnList(columns)) + + col := A.GetColumn("name") + assert.NotNil(t, col) + assert.Equal(t, "name", col.Name()) + + col = A.GetColumn("nonexistent") + assert.Nil(t, col) +} diff --git a/tests/suites/push/stats.yml b/tests/suites/push/stats.yml index 6455bfe9..cdaa31ba 100644 --- a/tests/suites/push/stats.yml +++ b/tests/suites/push/stats.yml @@ -39,7 +39,7 @@ testcases: assertions: - result.code ShouldEqual 0 - result.systemout ShouldBeEmpty - - result.systemerr ShouldContainSubstring {"level":"info","stats":{"inputLinesCount":1,"createdLinesCount":{},"deletedLinesCount":{"staff":1,"store":1},"commitsCount":0 + - result.systemerr ShouldContainSubstring {"level":"info","stats":{"inputLinesCount":1,"createdLinesCount":{},"deletedLinesCount":{"staff":1,"store":1},"commitsCount":1 - script: lino pull dest --limit 1 -f store_id=2 assertions: @@ -58,12 +58,12 @@ testcases: assertions: - result.code ShouldEqual 0 - result.systemout ShouldBeEmpty - - result.systemerr ShouldContainSubstring {"level":"info","stats":{"inputLinesCount":1,"createdLinesCount":{},"deletedLinesCount":{"staff":1,"store":1},"commitsCount":0,"duration" + - result.systemerr ShouldContainSubstring {"level":"info","stats":{"inputLinesCount":1,"createdLinesCount":{},"deletedLinesCount":{"staff":1,"store":1},"commitsCount":1,"duration" - result.systemerr ShouldContainSubstring },"return":0 - script: cat lino-push-delete-stats.json assertions: - - result.systemout ShouldContainSubstring {"inputLinesCount":1,"createdLinesCount":{},"deletedLinesCount":{"staff":1,"store":1},"commitsCount":0,"duration" + - result.systemout ShouldContainSubstring {"inputLinesCount":1,"createdLinesCount":{},"deletedLinesCount":{"staff":1,"store":1},"commitsCount":1,"duration" - name: restore with stats steps: diff --git a/tests/suites/push/timeout.yml b/tests/suites/push/timeout.yml new file mode 100644 index 00000000..2db94533 --- /dev/null +++ b/tests/suites/push/timeout.yml @@ -0,0 +1,23 @@ +name: push timeout +testcases: + - name: prepare test + steps: + # Clean working directory + - script: rm -f * + - script: lino dataconnector add --read-only source postgresql://postgres:sakila@source:5432/postgres?sslmode=disable + - script: lino relation extract source + - script: lino table extract source --only-tables + - script: lino id create store + - script: lino dataconnector add dest postgresql://postgres:sakila@dest:5432/postgres?sslmode=disable + - script: sed -i "s/true/false/g" ingress-descriptor.yaml + - script: lino id set-child-lookup staff_store_id_fkey true + + - name: push with timeout + steps: + # We use a subshell to send a line, wait, and send another line. + # Commit timeout is 1s. We wait 2s. We expect a timeout commit. + - script: | + (echo '{"address_id":2,"last_update":"2006-02-15T09:57:12Z","manager_staff_id":2,"staff_store_id_fkey":[{"active":true,"address_id":4,"email":"Jon.Stephens@sakilastaff.com","first_name":"Jon","last_name":"Stephens","last_update":"2006-05-16T16:13:11.79328Z","password":"8cb2237d0679ca88db6464eac60da96345513964","picture":null,"staff_id":2,"store_id":2,"username":"Jon"}],"store_id":2}'; sleep 2; echo '{"address_id":1,"last_update":"2006-02-15T09:57:12Z","manager_staff_id":1,"staff_store_id_fkey":[{"active":true,"address_id":3,"email":"Mike.Hillyer@sakilastaff.com","first_name":"Mike","last_name":"Hillyer","last_update":"2006-05-16T16:13:11.79328Z","password":"8cb2237d0679ca88db6464eac60da96345513964","picture":null,"staff_id":1,"store_id":1,"username":"Mike"}],"store_id":1}') | lino push -d dest --commit-timeout 1s -v 5 --log-json + assertions: + - result.code ShouldEqual 0 + - result.systemerr ShouldContainSubstring "Timeout commit"