Skip to content

Commit 6fe71cc

Browse files
committed
Update Flush logic
1 parent 8a23e6f commit 6fe71cc

12 files changed

Lines changed: 731 additions & 275 deletions

File tree

api/v1/dataflow_types.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,10 @@ type NessieSinkSpec struct {
445445
// +optional
446446
BatchSize *int32 `json:"batchSize,omitempty"`
447447

448+
// BatchFlushIntervalSeconds flushes the batch after this many seconds even if BatchSize is not reached (default: 10; 0 disables timer).
449+
// +optional
450+
BatchFlushIntervalSeconds *int32 `json:"batchFlushIntervalSeconds,omitempty"`
451+
448452
// AutoCreateTable creates the table if it does not exist.
449453
// +optional
450454
AutoCreateTable *bool `json:"autoCreateTable,omitempty"`
@@ -511,6 +515,10 @@ type PostgreSQLSinkSpec struct {
511515
// +optional
512516
BatchSize *int32 `json:"batchSize,omitempty"`
513517

518+
// BatchFlushIntervalSeconds flushes the batch after this many seconds even if BatchSize is not reached (default: 10; 0 disables timer).
519+
// +optional
520+
BatchFlushIntervalSeconds *int32 `json:"batchFlushIntervalSeconds,omitempty"`
521+
514522
// AutoCreateTable automatically creates the table if it doesn't exist
515523
// +optional
516524
AutoCreateTable *bool `json:"autoCreateTable,omitempty"`
@@ -552,6 +560,10 @@ type TrinoSinkSpec struct {
552560
// +optional
553561
BatchSize *int32 `json:"batchSize,omitempty"`
554562

563+
// BatchFlushIntervalSeconds flushes the batch after this many seconds even if BatchSize is not reached (default: 10; 0 disables timer).
564+
// +optional
565+
BatchFlushIntervalSeconds *int32 `json:"batchFlushIntervalSeconds,omitempty"`
566+
555567
// AutoCreateTable automatically creates the table if it doesn't exist
556568
// +optional
557569
AutoCreateTable *bool `json:"autoCreateTable,omitempty"`
@@ -589,7 +601,7 @@ type ClickHouseSinkSpec struct {
589601
// +optional
590602
BatchSize *int32 `json:"batchSize,omitempty"`
591603

592-
// BatchFlushIntervalSeconds flushes the batch after this many seconds even if BatchSize is not reached (default: 10)
604+
// BatchFlushIntervalSeconds flushes the batch after this many seconds even if BatchSize is not reached (default: 10; 0 disables timer).
593605
// +optional
594606
BatchFlushIntervalSeconds *int32 `json:"batchFlushIntervalSeconds,omitempty"`
595607

api/v1/zz_generated.deepcopy.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/dataflow.dataflow.io_dataflows.yaml

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -965,7 +965,7 @@ spec:
965965
batchFlushIntervalSeconds:
966966
description: 'BatchFlushIntervalSeconds flushes the batch
967967
after this many seconds even if BatchSize is not reached
968-
(default: 10)'
968+
(default: 10; 0 disables timer).'
969969
format: int32
970970
type: integer
971971
batchSize:
@@ -1270,6 +1270,12 @@ spec:
12701270
- name
12711271
type: object
12721272
type: object
1273+
batchFlushIntervalSeconds:
1274+
description: 'BatchFlushIntervalSeconds flushes the batch
1275+
after this many seconds even if BatchSize is not reached
1276+
(default: 10; 0 disables timer).'
1277+
format: int32
1278+
type: integer
12731279
batchSize:
12741280
description: BatchSize for batch appends.
12751281
format: int32
@@ -1358,6 +1364,12 @@ spec:
13581364
description: AutoCreateTable automatically creates the table
13591365
if it doesn't exist
13601366
type: boolean
1367+
batchFlushIntervalSeconds:
1368+
description: 'BatchFlushIntervalSeconds flushes the batch
1369+
after this many seconds even if BatchSize is not reached
1370+
(default: 10; 0 disables timer).'
1371+
format: int32
1372+
type: integer
13611373
batchSize:
13621374
description: BatchSize for batch inserts
13631375
format: int32
@@ -1425,6 +1437,12 @@ spec:
14251437
description: AutoCreateTable automatically creates the table
14261438
if it doesn't exist
14271439
type: boolean
1440+
batchFlushIntervalSeconds:
1441+
description: 'BatchFlushIntervalSeconds flushes the batch
1442+
after this many seconds even if BatchSize is not reached
1443+
(default: 10; 0 disables timer).'
1444+
format: int32
1445+
type: integer
14281446
batchSize:
14291447
description: BatchSize for batch inserts
14301448
format: int32
@@ -1766,7 +1784,7 @@ spec:
17661784
batchFlushIntervalSeconds:
17671785
description: 'BatchFlushIntervalSeconds flushes the batch
17681786
after this many seconds even if BatchSize is not reached
1769-
(default: 10)'
1787+
(default: 10; 0 disables timer).'
17701788
format: int32
17711789
type: integer
17721790
batchSize:
@@ -2071,6 +2089,12 @@ spec:
20712089
- name
20722090
type: object
20732091
type: object
2092+
batchFlushIntervalSeconds:
2093+
description: 'BatchFlushIntervalSeconds flushes the batch
2094+
after this many seconds even if BatchSize is not reached
2095+
(default: 10; 0 disables timer).'
2096+
format: int32
2097+
type: integer
20742098
batchSize:
20752099
description: BatchSize for batch appends.
20762100
format: int32
@@ -2159,6 +2183,12 @@ spec:
21592183
description: AutoCreateTable automatically creates the table
21602184
if it doesn't exist
21612185
type: boolean
2186+
batchFlushIntervalSeconds:
2187+
description: 'BatchFlushIntervalSeconds flushes the batch
2188+
after this many seconds even if BatchSize is not reached
2189+
(default: 10; 0 disables timer).'
2190+
format: int32
2191+
type: integer
21622192
batchSize:
21632193
description: BatchSize for batch inserts
21642194
format: int32
@@ -2226,6 +2256,12 @@ spec:
22262256
description: AutoCreateTable automatically creates the table
22272257
if it doesn't exist
22282258
type: boolean
2259+
batchFlushIntervalSeconds:
2260+
description: 'BatchFlushIntervalSeconds flushes the batch
2261+
after this many seconds even if BatchSize is not reached
2262+
(default: 10; 0 disables timer).'
2263+
format: int32
2264+
type: integer
22292265
batchSize:
22302266
description: BatchSize for batch inserts
22312267
format: int32
@@ -3566,7 +3602,8 @@ spec:
35663602
batchFlushIntervalSeconds:
35673603
description: 'BatchFlushIntervalSeconds flushes
35683604
the batch after this many seconds even if
3569-
BatchSize is not reached (default: 10)'
3605+
BatchSize is not reached (default: 10; 0
3606+
disables timer).'
35703607
format: int32
35713608
type: integer
35723609
batchSize:
@@ -3897,6 +3934,13 @@ spec:
38973934
- name
38983935
type: object
38993936
type: object
3937+
batchFlushIntervalSeconds:
3938+
description: 'BatchFlushIntervalSeconds flushes
3939+
the batch after this many seconds even if
3940+
BatchSize is not reached (default: 10; 0
3941+
disables timer).'
3942+
format: int32
3943+
type: integer
39003944
batchSize:
39013945
description: BatchSize for batch appends.
39023946
format: int32
@@ -3989,6 +4033,13 @@ spec:
39894033
description: AutoCreateTable automatically
39904034
creates the table if it doesn't exist
39914035
type: boolean
4036+
batchFlushIntervalSeconds:
4037+
description: 'BatchFlushIntervalSeconds flushes
4038+
the batch after this many seconds even if
4039+
BatchSize is not reached (default: 10; 0
4040+
disables timer).'
4041+
format: int32
4042+
type: integer
39924043
batchSize:
39934044
description: BatchSize for batch inserts
39944045
format: int32
@@ -4059,6 +4110,13 @@ spec:
40594110
description: AutoCreateTable automatically
40604111
creates the table if it doesn't exist
40614112
type: boolean
4113+
batchFlushIntervalSeconds:
4114+
description: 'BatchFlushIntervalSeconds flushes
4115+
the batch after this many seconds even if
4116+
BatchSize is not reached (default: 10; 0
4117+
disables timer).'
4118+
format: int32
4119+
type: integer
40624120
batchSize:
40634121
description: BatchSize for batch inserts
40644122
format: int32

0 commit comments

Comments
 (0)