Skip to content

Commit ea5ea56

Browse files
committed
Add High-Throughput Kafka Pipeline
1 parent ab9ebf3 commit ea5ea56

File tree

6 files changed

+94
-4
lines changed

6 files changed

+94
-4
lines changed

docs/en/architecture.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ Transformations are applied in order: `timestamp`, `flatten`, `filter`, `mask`,
5555
- **Resources**: optional CPU/memory for the processor pod.
5656
- **Scheduling**: optional `nodeSelector`, `affinity`, `tolerations`.
5757
- **CheckpointPersistence**: optional; defaults to `true`. When enabled, polling sources (PostgreSQL, ClickHouse, Trino) persist read position to a ConfigMap, reducing duplicates on restart. Set to `false` to disable.
58+
- **ChannelBufferSize**: optional; defaults to `100`. Buffer size for message channels between source, processor, and sink. Use 500–1000 for high Kafka throughput to reduce blocking when the sink is slower than the source.
5859

5960
Secrets can be referenced via `SecretRef` in the spec; the operator resolves them before writing the spec into the ConfigMap.
6061

@@ -77,6 +78,7 @@ flowchart TB
7778
Resources["resources (optional)"]
7879
Scheduling["scheduling (optional)"]
7980
Checkpoint["checkpointPersistence (optional)"]
81+
ChannelBuffer["channelBufferSize (optional)"]
8082
Image["processorImage / processorVersion (optional)"]
8183
end
8284
@@ -91,6 +93,7 @@ flowchart TB
9193
Spec --> Resources
9294
Spec --> Scheduling
9395
Spec --> Checkpoint
96+
Spec --> ChannelBuffer
9497
Spec --> Image
9598
```
9699

docs/en/connectors.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -905,3 +905,38 @@ DataFlow Operator supports configuring a separate sink for messages that failed
905905

906906
For configuration, error message structure, and error types, see [Error Handling](errors.md).
907907

908+
## Performance Recommendations
909+
910+
### Spec-level settings
911+
912+
- **channelBufferSize** (default 100): Buffer size for message channels between source, processor, and sink. For high Kafka throughput (tens of thousands msg/s), increase to 500–1000 to reduce blocking when the sink is slower than the source.
913+
914+
### Kafka
915+
916+
- Use multiple brokers for fault tolerance
917+
- Configure an appropriate consumer group size for parallel processing
918+
- Use batch writes for higher throughput
919+
920+
### PostgreSQL
921+
922+
- Increase `batchSize` for the sink (recommended 50–100)
923+
- Add indexes on frequently queried columns
924+
- Tune `pollInterval` based on data update frequency
925+
926+
## Troubleshooting
927+
928+
### Connection issues
929+
930+
1. Verify data source accessibility from the cluster
931+
2. Ensure credentials are correct
932+
3. Check Kubernetes network policies
933+
4. For TLS, verify certificates
934+
935+
### Performance issues
936+
937+
1. Increase `channelBufferSize` (500–1000) for high Kafka load
938+
2. Increase batch sizes for sinks
939+
3. Tune `pollInterval` for sources
940+
4. Scale operator instances if needed
941+
5. Monitor message processing metrics
942+

docs/en/examples.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,28 @@ kubectl apply -f dataflow/config/samples/kafka-to-postgres-secrets.yaml
240240

241241
For supported fields, TLS certificates, and troubleshooting, see [Using Kubernetes Secrets](connectors.md#using-kubernetes-secrets).
242242

243+
## High-Throughput Kafka Pipeline
244+
245+
For high Kafka message rates (tens of thousands msg/s), increase `channelBufferSize` and sink `batchSize`:
246+
247+
```yaml
248+
spec:
249+
channelBufferSize: 500 # default 100; reduces blocking when sink is slower than source
250+
source:
251+
type: kafka
252+
config:
253+
brokers: [localhost:9092]
254+
topic: high-volume-topic
255+
consumerGroup: dataflow-group
256+
sink:
257+
type: postgresql
258+
config:
259+
connectionString: "..."
260+
table: events
261+
batchSize: 500
262+
batchFlushIntervalSeconds: 2
263+
```
264+
243265
## Configuring Pod Resources and Placement
244266

245267
Each DataFlow resource creates a separate pod (Deployment) for processing. You can configure resources, node selection, affinity, and tolerations for these pods.

docs/ru/architecture.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ flowchart LR
5555
- **Resources**: опциональные CPU/память для пода процессора.
5656
- **Scheduling**: опционально `nodeSelector`, `affinity`, `tolerations`.
5757
- **CheckpointPersistence**: опционально; по умолчанию `true`. При включении polling-источники (PostgreSQL, ClickHouse, Trino) сохраняют позицию чтения в ConfigMap, уменьшая дубликаты при перезапуске. Задайте `false` для отключения.
58+
- **ChannelBufferSize**: опционально; по умолчанию `100`. Размер буфера каналов между source, processor и sink. Используйте 500–1000 при высокой нагрузке Kafka, чтобы снизить блокировки, когда sink медленнее source.
5859

5960
Секреты задаются через `SecretRef` в spec; оператор подставляет их перед записью spec в ConfigMap.
6061

@@ -77,6 +78,7 @@ flowchart TB
7778
Resources["resources (опционально)"]
7879
Scheduling["scheduling (опционально)"]
7980
Checkpoint["checkpointPersistence (опционально)"]
81+
ChannelBuffer["channelBufferSize (опционально)"]
8082
Image["processorImage / processorVersion (опционально)"]
8183
end
8284
@@ -91,6 +93,7 @@ flowchart TB
9193
Spec --> Resources
9294
Spec --> Scheduling
9395
Spec --> Checkpoint
96+
Spec --> ChannelBuffer
9497
Spec --> Image
9598
```
9699

docs/ru/connectors.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1290,6 +1290,10 @@ kubectl logs -l app.kubernetes.io/name=dataflow-operator | grep -i secret
12901290

12911291
## Рекомендации по производительности
12921292

1293+
### Общие настройки spec
1294+
1295+
- **channelBufferSize** (по умолчанию 100): размер буфера каналов между source, processor и sink. При высокой нагрузке Kafka (десятки тысяч msg/s) увеличьте до 500–1000, чтобы снизить блокировки, когда sink медленнее source.
1296+
12931297
### Kafka
12941298

12951299
- Используйте несколько брокеров для отказоустойчивости
@@ -1313,10 +1317,11 @@ kubectl logs -l app.kubernetes.io/name=dataflow-operator | grep -i secret
13131317

13141318
### Проблемы с производительностью
13151319

1316-
1. Увеличьте размер батчей для приемников
1317-
2. Настройте правильный `pollInterval` для источников
1318-
3. Используйте несколько инстансов оператора для масштабирования
1319-
4. Мониторьте метрики обработки сообщений
1320+
1. Увеличьте `channelBufferSize` (500–1000) при высокой нагрузке Kafka
1321+
2. Увеличьте размер батчей для приемников
1322+
3. Настройте правильный `pollInterval` для источников
1323+
4. Используйте несколько инстансов оператора для масштабирования
1324+
5. Мониторьте метрики обработки сообщений
13201325

13211326
### Логирование
13221327

docs/ru/examples.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,28 @@ status:
644644
- Настройте правильные consumer groups для Kafka
645645
- Мониторьте статус DataFlow ресурсов
646646

647+
## Высоконагруженный Kafka-пайплайн
648+
649+
При высокой скорости сообщений Kafka (десятки тысяч msg/s) увеличьте `channelBufferSize` и `batchSize` в sink:
650+
651+
```yaml
652+
spec:
653+
channelBufferSize: 500 # по умолчанию 100; снижает блокировки, когда sink медленнее source
654+
source:
655+
type: kafka
656+
config:
657+
brokers: [localhost:9092]
658+
topic: high-volume-topic
659+
consumerGroup: dataflow-group
660+
sink:
661+
type: postgresql
662+
config:
663+
connectionString: "..."
664+
table: events
665+
batchSize: 500
666+
batchFlushIntervalSeconds: 2
667+
```
668+
647669
## Настройка ресурсов и размещения подов
648670

649671
Каждый ресурс DataFlow создает отдельный под (Deployment) для обработки данных. Вы можете настроить ресурсы, выбор нод, affinity и tolerations для этих подов.

0 commit comments

Comments
 (0)