Skip to content

Commit a7db9f1

Browse files
committed
Improve clickhouse documentation
1 parent 688cb3f commit a7db9f1

20 files changed

Lines changed: 1299 additions & 769 deletions

api/v1/dataflow_types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,11 @@ type ClickHouseSinkSpec struct {
647647
// +optional
648648
AutoCreateTable *bool `json:"autoCreateTable,omitempty"`
649649

650+
// RawMode when true, creates table with data String and created_at columns (JSON storage).
651+
// When false (default), creates table from message structure (columnar, replicates source schema).
652+
// +optional
653+
RawMode *bool `json:"rawMode,omitempty"`
654+
650655
// ConnectionStringSecretRef references a Kubernetes secret for connection string
651656
// +optional
652657
ConnectionStringSecretRef *SecretRef `json:"connectionStringSecretRef,omitempty"`

api/v1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/clickhouse/README.md

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# Конфигурация ClickHouse
2+
3+
Конфигурация ClickHouse для локальной разработки и тестирования DataFlow operator.
4+
5+
## Запуск
6+
7+
ClickHouse запускается через `docker-compose` из корня проекта `dataflow`:
8+
9+
```bash
10+
cd dataflow
11+
docker-compose up -d clickhouse
12+
```
13+
14+
## Параметры подключения
15+
16+
| Параметр | Значение |
17+
|----------|----------|
18+
| **Host** | localhost |
19+
| **HTTP порт** | 8123 |
20+
| **Native порт** | 9000 |
21+
| **База данных** | dataflow |
22+
| **Пользователь** | dataflow |
23+
| **Пароль** | dataflow |
24+
25+
**Connection string (Native):**
26+
```
27+
clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s
28+
```
29+
30+
**JDBC URL (HTTP):**
31+
```
32+
jdbc:ch:http://localhost:8123/dataflow
33+
```
34+
35+
## Структура конфигурации
36+
37+
- `users.d/default-user.xml` — пользователь `dataflow` с доступом из любой сети
38+
- `users.d/session-settings.xml` — лимиты сессий и конкурентных запросов (см. SESSION_IS_LOCKED)
39+
40+
Конфиг монтируется в контейнер: `./config/clickhouse/users.d``/etc/clickhouse-server/users.d`
41+
42+
---
43+
44+
## Ошибка SESSION_IS_LOCKED (Code 373)
45+
46+
Ошибка возникает, когда **один и тот же session_id** используется несколькими запросами одновременно. ClickHouse блокирует сессию на время выполнения запроса.
47+
48+
### Решение 1: Настройка SQL-клиента (рекомендуется)
49+
50+
**DBeaver:**
51+
1. Правый клик по подключению ClickHouse → **Edit Connection**
52+
2. Вкладка **Connection settings****SQL Editor**
53+
3. **Open separate connection for each editor** = **Always**
54+
4. Вкладка **Metadata****Open separate connection for metadata read** = **Always**
55+
56+
**DataGrip:** Аналогично — настройте отдельное соединение для каждой вкладки редактора.
57+
58+
**Альтернатива — HTTP вместо Native:**
59+
- Подключение через HTTP (порт 8123) без `session_id` — каждый запрос выполняется в отдельном контексте
60+
- JDBC URL: `jdbc:ch:http://localhost:8123/dataflow` (драйвер ClickHouse)
61+
62+
### Решение 2: Серверная конфигурация
63+
64+
В `users.d/session-settings.xml` заданы:
65+
- `max_sessions_for_user: 50` — больше одновременных сессий на пользователя
66+
- `max_concurrent_queries_for_user: 50` — больше конкурентных запросов
67+
68+
Это помогает, когда несколько **разных** сессий работают параллельно. Не устраняет проблему переиспользования одной сессии в клиенте.
69+
70+
### Решение 3: Поведение при работе
71+
72+
- Не запускайте несколько запросов одновременно в одной вкладке
73+
- Дождитесь завершения текущего запроса перед запуском следующего
74+
- Используйте отдельные вкладки с отдельными подключениями
75+
76+
---
77+
78+
## Примеры DataFlow
79+
80+
Примеры конфигураций ClickHouse-to-ClickHouse: `config/samples/clickhouse-to-clickhouse.yaml`, `config/samples/clickhouse-to-clickhouse2.yaml`
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<clickhouse>
2+
<!-- Docs: <https://clickhouse.com/docs/en/operations/settings/settings_users/> -->
3+
<users>
4+
<!-- Remove default user -->
5+
<default remove="remove">
6+
</default>
7+
8+
<dataflow>
9+
<profile>default</profile>
10+
<networks>
11+
<ip>::/0</ip>
12+
</networks>
13+
<password><![CDATA[dataflow]]></password>
14+
<quota>default</quota>
15+
<access_management>1</access_management>
16+
</dataflow>
17+
</users>
18+
</clickhouse>
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
Настройки для уменьшения ошибок SESSION_IS_LOCKED.
4+
Увеличиваем лимиты сессий и конкурентных запросов.
5+
Важно: ошибка чаще всего возникает из-за переиспользования session_id в SQL-клиенте.
6+
Настройте клиент: отдельное соединение для каждой вкладки или отключите session_id.
7+
Примечание: секция <default> убрана — при CLICKHOUSE_USER=dataflow пользователь default
8+
может отсутствовать в конфиге Docker-образа.
9+
-->
10+
<clickhouse>
11+
<profiles>
12+
<default>
13+
<max_concurrent_queries_for_user>50</max_concurrent_queries_for_user>
14+
</default>
15+
</profiles>
16+
<users>
17+
<dataflow>
18+
<max_sessions_for_user>50</max_sessions_for_user>
19+
</dataflow>
20+
</users>
21+
</clickhouse>

config/crd/bases/dataflow.dataflow.io_dataflows.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -994,6 +994,11 @@ spec:
994994
- key
995995
- name
996996
type: object
997+
rawMode:
998+
description: |-
999+
RawMode when true, creates table with data String and created_at columns (JSON storage).
1000+
When false (default), creates table from message structure (columnar, replicates source schema).
1001+
type: boolean
9971002
table:
9981003
description: Table to write to
9991004
type: string
@@ -1855,6 +1860,11 @@ spec:
18551860
- key
18561861
- name
18571862
type: object
1863+
rawMode:
1864+
description: |-
1865+
RawMode when true, creates table with data String and created_at columns (JSON storage).
1866+
When false (default), creates table from message structure (columnar, replicates source schema).
1867+
type: boolean
18581868
table:
18591869
description: Table to write to
18601870
type: string
@@ -3701,6 +3711,11 @@ spec:
37013711
- key
37023712
- name
37033713
type: object
3714+
rawMode:
3715+
description: |-
3716+
RawMode when true, creates table with data String and created_at columns (JSON storage).
3717+
When false (default), creates table from message structure (columnar, replicates source schema).
3718+
type: boolean
37043719
table:
37053720
description: Table to write to
37063721
type: string
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
-- 1. Очистка (на случай повторного запуска) и создание таблицы
2+
DROP TABLE IF EXISTS products;
3+
4+
CREATE TABLE products (
5+
id UInt64,
6+
name String,
7+
category String,
8+
price Decimal(10, 2),
9+
stock_quantity Int32 DEFAULT 0,
10+
sku String,
11+
is_available UInt8 DEFAULT 1,
12+
created_at DateTime DEFAULT now(),
13+
updated_at DateTime DEFAULT now()
14+
) ENGINE = MergeTree()
15+
ORDER BY (id, created_at);
16+
17+
-- 2. Вставка 100 записей
18+
-- В ClickHouse numbers(n) возвращает колонку number (0..n-1), используем подзапрос для 1..100
19+
INSERT INTO products (name, category, price, stock_quantity, sku, created_at, updated_at)
20+
SELECT
21+
'Товар ' || toString(i),
22+
multiIf(i % 5 = 0, 'Электроника', i % 5 = 1, 'Одежда', i % 5 = 2, 'Дом', i % 5 = 3, 'Спорт', 'Книги'),
23+
round(rand() * 5000 + 100, 2),
24+
toInt32(floor(rand() * 1000)),
25+
'ART-' || toString(round(rand() * 5000 + 100)),
26+
now(),
27+
now()
28+
FROM (SELECT number + 1 AS i FROM numbers(100));
29+
30+
-- 3. Обновление 17 случайных записей (ClickHouse mutations)
31+
-- Сначала получаем ID через подзапрос (mutations выполняются асинхронно)
32+
ALTER TABLE products UPDATE
33+
price = round(price * 1.3, 2),
34+
stock_quantity = greatest(0, stock_quantity - toInt32(floor(rand() * 10))),
35+
updated_at = now()
36+
WHERE id IN (
37+
SELECT id FROM products ORDER BY rand() LIMIT 17
38+
);
39+
40+
-- Дождаться завершения мутации (опционально, для проверки)
41+
-- SELECT * FROM system.mutations WHERE table = 'products' AND is_done = 0;
42+
43+
-- 4. Проверка результатов (обновлённые записи)
44+
SELECT id, name, price, stock_quantity, created_at, updated_at
45+
FROM products
46+
WHERE updated_at > created_at
47+
ORDER BY price DESC
48+
LIMIT 50;
49+
50+
-- 4. Проверка products_clone (если dataflow уже скопировал данные)
51+
-- Примечание: dataflow по умолчанию создаёт таблицу с колонками (data String, created_at).
52+
-- Для такой же схемы, как products, создайте products_clone вручную или настройте трансформации.
53+
SELECT id, name, price, stock_quantity, created_at, updated_at
54+
FROM products_clone
55+
WHERE updated_at > created_at
56+
ORDER BY price DESC
57+
LIMIT 50;
58+
59+
-- 4. Проверка products_raw_clone (raw-режим с _metadata)
60+
SELECT id, value, _metadata, created_at, updated_at, deleted_at
61+
FROM products_raw_clone
62+
WHERE updated_at > created_at
63+
ORDER BY id DESC
64+
LIMIT 50;
65+
66+
-- Подсчёт записей
67+
SELECT count(*) FROM products;
68+
SELECT count(*) FROM products_clone;
69+
SELECT count(*) FROM products_raw_clone;
70+
71+
-- 5. Удаление дубликатов в products_clone, оставляя последнюю запись по id
72+
-- В ClickHouse нет ctid; используем ReplacingMergeTree или пересоздание таблицы
73+
74+
-- Вариант A: Если products_clone — ReplacingMergeTree(updated_at), достаточно:
75+
-- OPTIMIZE TABLE products_clone FINAL;
76+
77+
-- Вариант B: Пересоздание таблицы с дедупликацией через argMax
78+
DROP TABLE IF EXISTS products_clone_dedup;
79+
CREATE TABLE products_clone_dedup AS products_clone;
80+
81+
INSERT INTO products_clone_dedup
82+
SELECT
83+
id,
84+
argMax(name, updated_at) AS name,
85+
argMax(category, updated_at) AS category,
86+
argMax(price, updated_at) AS price,
87+
argMax(stock_quantity, updated_at) AS stock_quantity,
88+
argMax(sku, updated_at) AS sku,
89+
argMax(is_available, updated_at) AS is_available,
90+
argMax(created_at, updated_at) AS created_at,
91+
argMax(updated_at, updated_at) AS updated_at
92+
FROM products_clone
93+
GROUP BY id;
94+
95+
RENAME TABLE products_clone TO products_clone_old;
96+
RENAME TABLE products_clone_dedup TO products_clone;
97+
DROP TABLE products_clone_old;
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
apiVersion: dataflow.dataflow.io/v1
2+
kind: DataFlow
3+
metadata:
4+
name: clickhouse-to-clickhouse
5+
spec:
6+
source:
7+
type: clickhouse
8+
clickhouse:
9+
connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
10+
table: products
11+
pollInterval: 5
12+
sink:
13+
type: clickhouse
14+
clickhouse:
15+
connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
16+
table: products_clone
17+
batchSize: 100
18+
autoCreateTable: true
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
apiVersion: dataflow.dataflow.io/v1
2+
kind: DataFlow
3+
metadata:
4+
name: clickhouse-to-clickhouse
5+
spec:
6+
source:
7+
type: clickhouse
8+
clickhouse:
9+
connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
10+
table: products
11+
pollInterval: 5
12+
sink:
13+
type: clickhouse
14+
clickhouse:
15+
connectionString: "clickhouse://dataflow:dataflow@localhost:9000/dataflow?dial_timeout=10s"
16+
table: products_row
17+
batchSize: 100
18+
autoCreateTable: true
19+
rawMode: true

docker-compose.yml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,30 @@ services:
7272
networks:
7373
- dataflow-network
7474

75+
clickhouse:
76+
image: clickhouse/clickhouse-server:24
77+
container_name: clickhouse
78+
cap_add:
79+
- SYS_NICE
80+
- IPC_LOCK
81+
ulimits:
82+
nofile:
83+
soft: 262144
84+
hard: 262144
85+
ports:
86+
- "8123:8123"
87+
- "9000:9000"
88+
environment:
89+
CLICKHOUSE_DB: dataflow
90+
CLICKHOUSE_USER: dataflow
91+
CLICKHOUSE_PASSWORD: dataflow
92+
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
93+
volumes:
94+
- clickhouse-data:/var/lib/clickhouse
95+
- ./config/clickhouse/users.d:/etc/clickhouse-server/users.d
96+
networks:
97+
- dataflow-network
98+
7599
# rest:
76100
# image: apache/iceberg-rest-fixture:1.9.1
77101
# container_name: iceberg-rest
@@ -129,6 +153,7 @@ volumes:
129153
postgres-data:
130154
# pgadmin-data:
131155
rabbitmq-data:
156+
clickhouse-data:
132157
# minio-data:
133158

134159
networks:

0 commit comments

Comments
 (0)