Commit ba1210c

Add fault-tolerance
1 parent 067ed5b commit ba1210c

6 files changed

Lines changed: 372 additions & 0 deletions

docs/en/connectors.md

Lines changed: 64 additions & 0 deletions
@@ -8,6 +8,7 @@ DataFlow Operator supports various connectors for data sources and sinks. Each c
|-----------|--------|------|----------|
| Kafka | ✅ | ✅ | Consumer groups, TLS, SASL, Avro, Schema Registry |
| PostgreSQL | ✅ | ✅ | SQL queries, batch inserts, auto-create tables, UPSERT mode |
| postgresFull | ✅ | ✅ | Full DB sync: schema (tables, views, indexes, functions) + data |
| Trino | ✅ | ✅ | SQL queries, Keycloak OAuth2 authentication, batch inserts |
| ClickHouse | ✅ | ✅ | Polling, batch inserts, auto-create MergeTree tables |
| Nessie | ✅ | ✅ | Iceberg tables via Nessie catalog, branches, Basic/Bearer auth, polling, batch appends |

@@ -54,6 +55,9 @@ All connectors support secret references for the following fields:
- `connectionStringSecretRef` - connection string
- `tableSecretRef` - table name

#### postgresFull
- `connectionStringSecretRef` - connection string (source and sink)

#### ClickHouse
- `connectionStringSecretRef` - connection string
- `tableSecretRef` - table name

@@ -454,6 +458,66 @@ sink:
- **UPSERT Mode**: Updates existing records on conflict (PRIMARY KEY or `conflictKey`)
- **Soft Delete**: When `softDeleteColumn` is set and the message has `metadata.operation=delete`, performs `UPDATE ... SET deleted_at = NOW()` instead of a physical DELETE

## postgresFull

The postgresFull connector performs **full database synchronization** from a source PostgreSQL database to a target PostgreSQL database. It replicates the schema (tables, views, materialized views, indexes, sequences, triggers, functions) and, optionally, the data.

### Features

- **Schema sync**: Schemas, tables, views, materialized views, indexes, sequences, triggers, functions
- **Data sync**: Optional data copy (`schema_only` or `schema_and_data`)
- **ExcludeObjects**: Filter out object types (view, matview, function, trigger, index, sequence)
- **Databases filter**: Sync only specific objects, given in `schema.object` format
- **ConnectionStringSecretRef**: Use Kubernetes secrets for credentials

### Example

```yaml
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: postgres-full-sync
spec:
  source:
    type: postgresFull
    postgresFull:
      connectionString: "postgres://user:pass@source-pg:5432/db?sslmode=disable"
      syncMode: full
      dataMode: schema_and_data  # or schema_only
      # databases: ["public.users", "analytics.mv_report"]  # optional filter
      # excludeObjects: ["view", "function"]  # optional exclude
  sink:
    type: postgresFull
    postgresFull:
      connectionString: "postgres://user:pass@target-pg:5432/db?sslmode=disable"
```

### Source Options

| Option | Description |
|--------|-------------|
| connectionString | PostgreSQL connection string (required, or use connectionStringSecretRef) |
| syncMode | `full` (default) or `incremental` |
| dataMode | `schema_only` or `schema_and_data` (default) |
| databases | List of `schema.object` to sync; empty = all |
| excludeObjects | Exclude types: view, matview, function, trigger, index, sequence |
| syncUsers | Sync roles (CREATE ROLE, without passwords) |
| syncGrants | Sync grants (GRANT on objects) |

### Sink Options

| Option | Description |
|--------|-------------|
| connectionString | Target PostgreSQL connection string |
| dropTarget | Drop objects on the target before applying (use with caution) |

### Requirements

- PostgreSQL 12+ on both source and target
- Source: SELECT on pg_catalog, USAGE on schemas, SELECT on tables (see the sketch after this list)
- Target: CREATE, INSERT, and DDL privileges
- For data sync with foreign keys: sets `session_replication_role = replica` during inserts
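
The source-side privileges could be provisioned roughly like this. A minimal sketch, assuming a dedicated `dataflow_sync` role and the `public` schema; neither name comes from the connector itself:

```sql
-- Hypothetical role used in the source connection string.
CREATE ROLE dataflow_sync LOGIN PASSWORD 'change-me';

-- Let the connector enumerate and read the objects it syncs.
GRANT USAGE ON SCHEMA public TO dataflow_sync;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO dataflow_sync;

-- Also cover tables created after the grant was issued.
ALTER DEFAULT PRIVILEGES IN SCHEMA public
    GRANT SELECT ON TABLES TO dataflow_sync;
```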

## ClickHouse

The ClickHouse connector supports reading from and writing to ClickHouse tables. It supports polling for incremental reads, custom SQL queries, batch inserts, and auto-creation of MergeTree tables.

docs/en/fault-tolerance.md

Lines changed: 122 additions & 0 deletions
@@ -0,0 +1,122 @@
# Fault Tolerance and Data Consistency

DataFlow Operator processes messages with **at-least-once** delivery semantics. When the processor pod crashes or restarts, some messages may be re-read and written again. This document explains the behavior, the risks of data desynchronization, and how to configure idempotent sinks to prevent duplicates.

## Delivery Semantics

- **At-least-once**: Each message is delivered at least once. Duplicates are possible on processor restart or crash.
- **Exactly-once**: Not supported natively. Use idempotent sinks to achieve effectively-once semantics.

## Source Behavior on Restart

| Source | State storage | On restart |
|--------|---------------|------------|
| **Kafka** | Consumer group (Kafka) | Resumes from the last committed offset; only messages written to the sink but not yet acked may be re-delivered. |
| **PostgreSQL** | In-memory (lastReadChangeTime) | State lost. Re-reads from the beginning. Duplicates or gaps possible. |
| **ClickHouse** | In-memory (lastReadID, lastReadTime) | State lost. Re-reads from the beginning. Duplicates possible. |
| **Trino** | In-memory (lastReadID) | State lost. Re-reads from the beginning. Duplicates possible. |

### Kafka Source

The Kafka consumer commits offset **only after** the message is successfully written to the sink (via `msg.Ack()`). If the processor crashes:

- **Before sink write**: Offset not committed. On restart, message is re-read. No duplicate in sink.
- **After sink write, before Ack**: Data may be in sink, offset not committed. On restart, re-read → duplicate in sink.
- **After Ack**: Offset committed. On restart, resume from next message. No duplicate.

### Polling Sources (PostgreSQL, ClickHouse, Trino)

The read position (lastReadID, lastReadChangeTime) is stored **only in memory**. On pod crash:

- State is lost.
- On restart, the source re-reads from the beginning (or from a wrong position).
- **Duplicates** or **gaps** are possible depending on when the crash occurred.

!!! warning "Idempotent sink required"
    For polling sources, always configure an **idempotent sink** (UPSERT, ReplacingMergeTree) to handle duplicates safely.

## Batch Sink Behavior

PostgreSQL, ClickHouse, and Trino sinks write in batches. The flow is:

1. Accumulate messages in a batch
2. Execute `Commit` (transaction)
3. Call `Ack()` for each message (commits the Kafka offset, if applicable)

If the processor crashes **between Commit and the last Ack**:

- Data is already in the sink
- The Kafka offset may not be committed
- On restart: re-read from Kafka → **duplicate writes to the sink**

!!! tip "Reduce duplicate window"
    Use a smaller `batchSize` to reduce the number of messages at risk of duplication on crash.

## Idempotent Sink Configuration

### PostgreSQL Sink

Enable UPSERT mode so that duplicate inserts update existing rows instead of failing:

```yaml
sink:
  type: postgresql
  postgresql:
    connectionString: "postgres://..."
    table: output_table
    upsertMode: true
    conflictKey: ["id"]  # Optional; defaults to PRIMARY KEY
```

Requires the table to have a PRIMARY KEY or UNIQUE constraint on the conflict columns.
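
Under the hood, an UPSERT of this kind corresponds to PostgreSQL's standard conflict-handling insert. A minimal sketch of what a re-delivered message resolves to, with illustrative column names (`id`, `data`) that are not part of the connector:

```sql
-- A duplicate insert hits the existing row and updates it
-- instead of failing with a unique-constraint violation.
INSERT INTO output_table (id, data)
VALUES (42, 'payload')
ON CONFLICT (id) DO UPDATE
SET data = EXCLUDED.data;
```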

### ClickHouse Sink

Use `ReplacingMergeTree` engine for automatic deduplication by a version column:

```sql
CREATE TABLE output_table (
    id UInt64,
    data String,
    created_at DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(created_at)
ORDER BY id;
```

Or create the table with `autoCreateTable: true` and `rawMode: false` — the connector infers column types. For deduplication, create the table manually with `ReplacingMergeTree(version_column)` and `ORDER BY` on the deduplication key.
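
Keep in mind that `ReplacingMergeTree` collapses duplicates at merge time, in the background, so a just-written duplicate can remain visible for a while. To read deduplicated results regardless of merge state, standard ClickHouse `FINAL` can be used (this is ClickHouse behavior, not a connector feature):

```sql
-- FINAL forces deduplication at query time; fine for point reads,
-- more expensive for large scans.
SELECT *
FROM output_table FINAL
WHERE id = 42;
```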

### Kafka Sink

The Kafka producer uses `RequiredAcks = WaitForAll` and `Producer.Idempotent = true` for durability and to prevent duplicate messages on retry. Consumers should still handle potential duplicates (e.g., by idempotent processing or deduplication by key) for end-to-end exactly-once semantics.

## Best Practices

1. **Use idempotent sinks** for PostgreSQL (UPSERT) and ClickHouse (ReplacingMergeTree) when using polling sources or when duplicates are possible.
2. **Kafka source**: Consumer group stores offset; at-least-once is preserved. Idempotent sink recommended for batch sinks.
3. **batchSize**: Smaller batches reduce the duplicate window on crash. Balance with throughput.
4. **batchFlushIntervalSeconds**: Shorter intervals flush more frequently, reducing in-flight data at risk.
5. **Error sink**: Configure `spec.errors` to capture failed messages for replay or analysis.

## Graceful Shutdown

On SIGTERM (e.g., pod eviction, node drain):

1. The processor receives the signal and cancels the context.
2. Sinks flush in-flight batches before exiting.
3. `PreStop: sleep 5` gives time for the load balancer to stop routing traffic.

Ensure `terminationGracePeriodSeconds` is sufficient for large batches to flush (default: 600 seconds).

## Checkpoint Persistence (Future)

Persisting the source checkpoint (lastReadID, lastReadChangeTime) to external storage (a ConfigMap or a sink table) would allow polling sources to resume from the last committed position after a processor restart, reducing duplicates. This is planned for a future release. Until then, use idempotent sinks to handle duplicates safely.
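
To make the idea concrete, a checkpoint table on the sink side might look like the following. This is purely illustrative; the table and column names are assumptions, not a shipped feature:

```sql
-- One row per flow: the processor would upsert it after each committed batch
-- and read it back on startup to resume polling from the saved position.
CREATE TABLE dataflow_checkpoints (
    flow_name      TEXT PRIMARY KEY,
    last_read_id   BIGINT,
    last_read_time TIMESTAMPTZ,
    updated_at     TIMESTAMPTZ NOT NULL DEFAULT now()
);
```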

## Summary Checklist

| Scenario | Recommendation |
|----------|-----------------|
| PostgreSQL sink | Enable `upsertMode: true` with PRIMARY KEY or `conflictKey` |
| ClickHouse sink | Use `ReplacingMergeTree` with `ORDER BY` on deduplication key |
| Kafka source | Consumer group persists offset; idempotent sink recommended |
| Polling sources | **Always** use idempotent sink; state is lost on crash |
| batchSize | Consider smaller values to reduce duplicate window |

docs/en/index.md

Lines changed: 1 addition & 0 deletions
@@ -132,6 +132,7 @@ See [Metrics](metrics.md) for more details.
- [Transformations](transformations.md) — message transformations
- [Examples](examples.md) — practical examples
- [Errors](errors.md) — error handling and error sink
- [Fault Tolerance](fault-tolerance.md) — at-least-once semantics, idempotent sinks, data consistency
- [Metrics](metrics.md) — Prometheus metrics
- [Development](development.md) — developer guide

docs/ru/connectors.md

Lines changed: 61 additions & 0 deletions
@@ -8,6 +8,7 @@ DataFlow Operator supports various connectors
|-----------|----------|----------|-------------|
| Kafka | ✅ | ✅ | Consumer groups, TLS, SASL, Avro, Schema Registry |
| PostgreSQL | ✅ | ✅ | SQL queries, batch inserts, auto-create tables, UPSERT mode |
| postgresFull | ✅ | ✅ | Full DB sync: schema (tables, views, indexes, functions) + data |
| Trino | ✅ | ✅ | SQL queries, Keycloak OAuth2 authentication, batch inserts |
| ClickHouse | ✅ | ✅ | Table polling, batch inserts, auto-create MergeTree tables |
| Nessie | ✅ | ✅ | Iceberg tables via Nessie catalog, branches, Basic/Bearer auth, polling, batch appends |

@@ -372,6 +373,66 @@ sink:

**Important:** For UPSERT to work, the table must have a PRIMARY KEY or a UNIQUE constraint on the specified `conflictKey`.

## postgresFull

The postgresFull connector performs **full database synchronization** from a source PostgreSQL database to a target PostgreSQL database. It replicates the schema (tables, views, materialized views, indexes, sequences, triggers, functions) and, optionally, the data.

### Features

- **Schema sync**: Schemas, tables, views, materialized views, indexes, sequences, triggers, functions
- **Data sync**: Optional (`schema_only` or `schema_and_data`)
- **ExcludeObjects**: Exclude object types (view, matview, function, trigger, index, sequence)
- **Databases filter**: Sync only specific objects, given in `schema.object` format
- **ConnectionStringSecretRef**: Use Kubernetes secrets for credentials

### Example

```yaml
apiVersion: dataflow.dataflow.io/v1
kind: DataFlow
metadata:
  name: postgres-full-sync
spec:
  source:
    type: postgresFull
    postgresFull:
      connectionString: "postgres://user:pass@source-pg:5432/db?sslmode=disable"
      syncMode: full
      dataMode: schema_and_data  # or schema_only
      # databases: ["public.users", "analytics.mv_report"]  # optional filter
      # excludeObjects: ["view", "function"]  # optional exclude
  sink:
    type: postgresFull
    postgresFull:
      connectionString: "postgres://user:pass@target-pg:5432/db?sslmode=disable"
```

### Source Options

| Option | Description |
|--------|-------------|
| connectionString | PostgreSQL connection string (required, or use connectionStringSecretRef) |
| syncMode | `full` (default) or `incremental` |
| dataMode | `schema_only` or `schema_and_data` (default) |
| databases | List of `schema.object` to sync; empty = all |
| excludeObjects | Exclude types: view, matview, function, trigger, index, sequence |
| syncUsers | Sync roles (CREATE ROLE, without passwords) |
| syncGrants | Sync grants (GRANT on objects) |

### Sink Options

| Option | Description |
|--------|-------------|
| connectionString | Target PostgreSQL connection string |
| dropTarget | Drop objects on the target before applying (use with caution) |

### Requirements

- PostgreSQL 12+ on both source and target
- Source: SELECT on pg_catalog, USAGE on schemas, SELECT on tables
- Target: CREATE, INSERT, and DDL privileges
- For data sync with foreign keys: sets `session_replication_role = replica` during inserts

## ClickHouse

The ClickHouse connector supports reading from and writing to ClickHouse tables. It supports polling for incremental reads, custom SQL queries, batch inserts, and auto-creation of MergeTree tables.
