Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Types of changes
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [3.7.0]

- `Added` flag `--commit-timeout` to `lino push` command. This flag allows to trigger a commit if no new row is received within the specified duration

## [3.6.2]

- `Fixed` quoting column names in UPDATE during push (PostgreSQL)
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,14 @@ Each line is a filter and `lino` apply it to the start table to extract data.

The `push` sub-command import a **json** line stream (jsonline format http://jsonlines.org/) in each table, following the ingress descriptor defined in current directory.

### Commit Timeout

Use the `--commit-timeout` flag to specify a duration (e.g. `5s`, `1m`) after which a commit is triggered if no new row is received. This is useful for streaming applications where data might arrive slowly.

```bash
$ lino push insert dest --commit-timeout 5s
```

### Autotruncate values

Use the `autotruncate` flag to automatically truncate string values that overflows the maximum length accepted by the database.
Expand Down
4 changes: 3 additions & 1 deletion internal/app/push/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
autoTruncate bool
watch bool
logSQLTo string
commitTimeout time.Duration
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -179,7 +180,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
observers = append(observers, observer)
}

e3 := push.Push(rowIteratorFactory(in), datadestination, plan, mode, commitSize, disableConstraints, rowExporter, translator, whereField, savepoint, autoTruncate, observers...)
e3 := push.Push(rowIteratorFactory(in), datadestination, plan, mode, commitSize, commitTimeout, disableConstraints, rowExporter, translator, whereField, savepoint, autoTruncate, observers...)
if e3 != nil {
log.Fatal().AnErr("error", e3).Msg("Fatal error stop the push command")
os.Exit(1)
Expand All @@ -193,6 +194,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
},
}
cmd.Flags().UintVarP(&commitSize, "commitSize", "c", 500, "Commit size")
cmd.Flags().DurationVar(&commitTimeout, "commit-timeout", 0, "Commit timeout (e.g. 5s, 1m). If set, a commit is triggered if no new row is received within this duration.")
cmd.Flags().BoolVarP(&disableConstraints, "disable-constraints", "d", false, "Disable constraint during push")
cmd.Flags().StringVarP(&catchErrors, "catch-errors", "e", "", "Catch errors and write line in file")
cmd.Flags().StringVarP(&table, "table", "t", "", "Table to writes json")
Expand Down
2 changes: 1 addition & 1 deletion internal/app/push/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func Handler(w http.ResponseWriter, r *http.Request, mode push.Mode, ingressDesc

log.Debug().Msg(fmt.Sprintf("call Push with mode %s", mode))

e3 := push.Push(rowIteratorFactory(r.Body), datadestination, plan, mode, commitSize, disableConstraints, push.NoErrorCaptureRowWriter{}, nil, query.Get("using-pk-field"), "", false)
e3 := push.Push(rowIteratorFactory(r.Body), datadestination, plan, mode, commitSize, 0, disableConstraints, push.NoErrorCaptureRowWriter{}, nil, query.Get("using-pk-field"), "", false)
if e3 != nil {
log.Error().Err(e3).Msg("")
w.WriteHeader(http.StatusNotFound)
Expand Down
2 changes: 1 addition & 1 deletion internal/infra/push/datadestination_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,8 @@ func (rw *SQLRowWriter) Write(row push.Row, where push.Row) *push.Error {
rw.sqlLogger.Write(values)

_, err2 := rw.statement.Exec(values...)
log.Trace().AnErr("error", err2).Msg("push error")
if err2 != nil {
log.Trace().AnErr("error", err2).Msg("push error")
// reset statement after error
if err := rw.close(); err != nil {
return &push.Error{Description: err.Error() + "\noriginal error :\n" + err2.Error()}
Expand Down
2 changes: 2 additions & 0 deletions pkg/push/driven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type memoryDataDestination struct {
closed bool
committed bool
opened bool
commits int
}

func (mdd *memoryDataDestination) SafeUrl() string {
Expand All @@ -76,6 +77,7 @@ func (mdd *memoryDataDestination) Open(pla push.Plan, mode push.Mode, disableCon

func (mdd *memoryDataDestination) Commit() *push.Error {
mdd.committed = true
mdd.commits++
return nil
}

Expand Down
Loading
Loading