Skip to content

Commit bc95654

Browse files
committed
Update pg connector documentation
1 parent 976b488 commit bc95654

2 files changed

Lines changed: 93 additions & 42 deletions

File tree

docs/en/connectors.md

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -364,16 +364,19 @@ sink:
364364

365365
## PostgreSQL
366366

367-
The PostgreSQL connector supports reading from and writing to PostgreSQL tables. It supports custom SQL queries, periodic polling, batch inserts, auto-create tables, and UPSERT mode.
367+
The PostgreSQL connector supports reading from and writing to PostgreSQL tables. It supports custom SQL queries, periodic polling, batch inserts, auto-create tables, UPSERT mode, CDC-style change tracking (inserts and updates), soft delete, and SecretRef for credentials.
368368

369369
### Source
370370

371371
```yaml
372372
source:
373373
type: postgresql
374374
postgresql:
375+
# Connection string (required, or use connectionStringSecretRef)
375376
connectionString: "postgres://user:password@localhost:5432/dbname?sslmode=disable"
377+
# Table to read from (required if query not specified). Supports schema.table (e.g. public.products)
376378
table: source_table
379+
377380
# Custom SQL query (optional)
378381
query: "SELECT * FROM source_table WHERE updated_at > NOW() - INTERVAL '1 hour'"
379382
# Poll interval in seconds (optional, default: 5)
@@ -382,14 +385,32 @@ source:
382385
# Raw mode (optional, default: false)
383386
# When true, wraps each row as JSON: {"value": <row data>, "_metadata": {table, id}}
384387
rawMode: true
388+
389+
# CDC-style options (optional)
390+
readBatchSize: 1000 # Limit rows per poll to reduce DB load (0 = no limit)
391+
changeTrackingColumn: updated_at # Column to track changes (default: updated_at). Not used when query is specified
392+
autoCreateTable: true # Create table if it doesn't exist before reading
393+
394+
# SecretRef (optional) - use instead of direct values
395+
# connectionStringSecretRef:
396+
# name: postgres-credentials
397+
# key: connectionString
398+
# tableSecretRef:
399+
# name: postgres-credentials
400+
# key: table
385401
```
386402

387-
### Features
403+
### Source Features
388404

389405
- **Periodic Polling**: Regularly polls the table for new data
390406
- **Custom Queries**: Support for complex SQL with JOIN, WHERE, etc.
391-
- **Metadata**: Each message contains `table` metadata
407+
- **Metadata**: Each message contains `table` metadata and `operation` (insert/update)
392408
- **Raw Mode**: When enabled, wraps each row as JSON with `value` (row data) and `_metadata` (table, id)
409+
- **Read Batch Size**: Limits rows per poll to reduce database load when many new records appear
410+
- **Change Tracking**: By default tracks changes via `updated_at` column (or `changeTrackingColumn`), captures both INSERTs and UPDATEs
411+
- **Auto-create Table**: Creates the table with CDC-friendly schema (id, created_at, updated_at) if it doesn't exist
412+
- **Schema notation**: Table name supports `schema.table` format (e.g. `public.products`)
413+
- **In-memory state**: Read position (lastReadChangeTime) is stored only in memory. On pod/connector restart, the table is fully re-read. For pg→pg flows, enable `upsertMode: true` in sink to update duplicates instead of inserting them again.
393414

394415
### Sink
395416

@@ -398,22 +419,38 @@ sink:
398419
type: postgresql
399420
postgresql:
400421
connectionString: "postgres://user:password@localhost:5432/dbname?sslmode=disable"
422+
# Table to write to. Supports schema.table (e.g. public.products_clone)
401423
table: target_table
424+
402425
# Batch size (optional, default: 1). 0 = flush only by timer
403426
batchSize: 100
404427
# Flush interval in seconds (optional, default: 10). 0 = disable timer
405428
batchFlushIntervalSeconds: 10
406429
autoCreateTable: true
430+
431+
# Raw mode (optional, default: false)
432+
# When true, expects messages in format {"value": <data>, "_metadata": {...}}
433+
# Table is created with value JSONB and _metadata JSONB columns
434+
rawMode: false
435+
407436
# UPSERT mode (optional, default: false)
408437
upsertMode: true
409438
conflictKey: "id"
439+
# Soft delete column (optional). When set, DELETE operations UPDATE this column instead of physical delete
440+
softDeleteColumn: "deleted_at"
441+
442+
# SecretRef (optional)
443+
# connectionStringSecretRef: { name: postgres-credentials, key: connectionString }
444+
# tableSecretRef: { name: postgres-credentials, key: table }
410445
```
411446

412-
### Features
447+
### Sink Features
413448

414-
- **Batch Inserts**: Groups messages for efficient writing
415-
- **Auto-create Tables**: Creates tables with JSONB field and GIN index
449+
- **Batch Inserts**: Groups messages for efficient writing. Flush when `batchSize` reached or on timer. Use `batchFlushIntervalSeconds: 0` for size-only flush; `batchSize: 0` for timer-only flush.
450+
- **Auto-create Tables**: When `rawMode: true` — creates table with `value` JSONB, `_metadata` JSONB, created_at, updated_at, deleted_at and GIN index. When `rawMode: false` — infers table structure from the first message (replicates source schema).
451+
- **Raw Mode**: When true, expects `{"value": <data>, "_metadata": {...}}` and creates table with value/_metadata columns. When false, uses columnar format from message.
416452
- **UPSERT Mode**: Updates existing records on conflict (PRIMARY KEY or `conflictKey`)
453+
- **Soft Delete**: When `softDeleteColumn` is set and message has `metadata.operation=delete`, performs `UPDATE ... SET deleted_at = NOW()` instead of physical DELETE
417454

418455
## ClickHouse
419456

docs/ru/connectors.md

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ sink:
210210

211211
## PostgreSQL
212212

213-
PostgreSQL коннектор поддерживает чтение из таблиц и запись в таблицы PostgreSQL. Поддерживает кастомные SQL запросы, периодический опрос и батч-вставки.
213+
PostgreSQL коннектор поддерживает чтение из таблиц и запись в таблицы PostgreSQL. Поддерживает кастомные SQL запросы, периодический опрос, батч-вставки, CDC-стиль отслеживания изменений (INSERT и UPDATE), soft delete и SecretRef для учётных данных.
214214

215215
### Источник (Source)
216216

@@ -220,34 +220,45 @@ PostgreSQL коннектор поддерживает чтение из таб
220220
source:
221221
type: postgresql
222222
postgresql:
223-
# Connection string (обязательно)
223+
# Connection string (обязательно, или connectionStringSecretRef)
224224
# Формат: postgres://user:password@host:port/dbname?sslmode=mode
225225
connectionString: "postgres://user:password@localhost:5432/dbname?sslmode=disable"
226226
227-
# Таблица для чтения (обязательно, если не указан query)
227+
# Таблица для чтения (обязательно, если не указан query). Поддерживает schema.table (напр. public.products)
228228
table: source_table
229229
230230
# Кастомный SQL запрос (опционально)
231231
# Если указан, используется вместо чтения всей таблицы
232232
query: "SELECT * FROM source_table WHERE updated_at > NOW() - INTERVAL '1 hour'"
233233
234234
# Интервал опроса в секундах (опционально, по умолчанию: 5)
235-
# Используется для периодического чтения новых данных
236235
pollInterval: 60
237236
238237
# Режим сырой записи (опционально, по умолчанию: false)
239238
# При true каждая строка оборачивается в JSON: {"value": <данные строки>, "_metadata": {table, id}}
240239
rawMode: true
240+
241+
# Опции CDC-стиля (опционально)
242+
readBatchSize: 1000 # Ограничение строк за один опрос (0 = без лимита)
243+
changeTrackingColumn: updated_at # Колонка для отслеживания изменений (по умолчанию: updated_at). Не используется при заданном query
244+
autoCreateTable: true # Создать таблицу, если не существует, перед чтением
245+
246+
# SecretRef (опционально)
247+
# connectionStringSecretRef: { name: postgres-credentials, key: connectionString }
248+
# tableSecretRef: { name: postgres-credentials, key: table }
241249
```
242250

243251
#### Особенности PostgreSQL источника
244252

245253
- **Периодический опрос**: Регулярно опрашивает таблицу на наличие новых данных
246254
- **Кастомные запросы**: Поддержка сложных SQL запросов с JOIN, WHERE, и т.д.
247-
- **Метаданные**: Каждое сообщение содержит метаданные:
248-
- `table` - название таблицы
249-
- **Формат данных**: Данные преобразуются в JSON формат
250-
- **Режим сырой записи (rawMode)**: При включении каждая строка оборачивается в JSON с полями `value` (данные строки) и `_metadata` (table, id)
255+
- **Метаданные**: Каждое сообщение содержит метаданные `table` и `operation` (insert/update)
256+
- **Режим rawMode**: При включении каждая строка оборачивается в JSON с полями `value` (данные строки) и `_metadata` (table, id)
257+
- **Размер батча чтения**: Ограничивает количество строк за один опрос для снижения нагрузки на БД
258+
- **Отслеживание обновлений**: По умолчанию отслеживает изменения по колонке `updated_at` (или `changeTrackingColumn`), захватывает INSERT и UPDATE
259+
- **Автосоздание таблицы**: Создаёт таблицу с CDC-совместимой схемой (id, created_at, updated_at), если не существует
260+
- **Схема таблицы**: Имя таблицы поддерживает формат `schema.table` (напр. `public.products`)
261+
- **Состояние в памяти**: Позиция чтения (lastReadChangeTime) хранится только в памяти. При перезапуске пода/коннектора происходит полное перечитывание таблицы. Для pg→pg включите `upsertMode: true` в sink, чтобы дубликаты обновлялись, а не вставлялись повторно.
251262

252263
#### Пример с кастомным запросом
253264

@@ -276,66 +287,69 @@ source:
276287
sink:
277288
type: postgresql
278289
postgresql:
279-
# Connection string (обязательно)
280290
connectionString: "postgres://user:password@localhost:5432/dbname?sslmode=disable"
281-
282-
# Таблица для записи (обязательно)
291+
# Таблица для записи. Поддерживает schema.table (напр. public.products_clone)
283292
table: target_table
284293
285-
# Размер батча для вставки (опционально, по умолчанию: 1)
286-
# Увеличьте для повышения производительности. 0 — сброс только по таймеру
294+
# Размер батча (опционально, по умолчанию: 1). 0 — сброс только по таймеру
287295
batchSize: 100
288-
289-
# Интервал сброса батча в секундах (опционально, по умолчанию: 10)
290-
# Батч сбрасывается при достижении batchSize или по истечении интервала. 0 — отключить сброс по таймеру
296+
# Интервал сброса в секундах (опционально, по умолчанию: 10). 0 — отключить сброс по таймеру
291297
batchFlushIntervalSeconds: 10
292-
293-
# Автоматическое создание таблицы (опционально, по умолчанию: false)
294-
# Если true, создает таблицу с JSONB полем для гибкой схемы
295298
autoCreateTable: true
296299
300+
# Режим rawMode (опционально, по умолчанию: false)
301+
# При true ожидает сообщения в формате {"value": <данные>, "_metadata": {...}}
302+
# Таблица создаётся с колонками value JSONB и _metadata JSONB
303+
rawMode: false
304+
297305
# Режим UPSERT (опционально, по умолчанию: false)
298-
# Если true, существующие записи будут обновляться вместо пропуска
299-
# Используется ON CONFLICT DO UPDATE для обновления записей
300306
upsertMode: true
301-
302-
# Ключ конфликта для UPSERT (опционально, по умолчанию: PRIMARY KEY)
303-
# Указывает колонку(ы) для определения конфликта при UPSERT
304-
# Если не указан, используется PRIMARY KEY таблицы
305307
conflictKey: "id"
308+
# Колонка для soft delete (опционально)
309+
softDeleteColumn: "deleted_at"
310+
311+
# SecretRef (опционально)
312+
# connectionStringSecretRef: { name: postgres-credentials, key: connectionString }
313+
# tableSecretRef: { name: postgres-credentials, key: table }
306314
```
307315

308316
#### Особенности PostgreSQL приемника
309317

310-
- **Батч-вставки**: Группирует сообщения для эффективной записи. По умолчанию сброс при достижении `batchSize` или по таймеру (10 с). Только по размеру: `batchFlushIntervalSeconds: 0`. Только по времени: `batchSize: 0`
311-
- **Автосоздание таблиц**: Автоматически создает таблицы с JSONB полем
312-
- **Гибкая схема**: Поддерживает как JSONB (для автосозданных таблиц), так и колоночный формат
313-
- **Индексы**: Автоматически создает GIN индекс на JSONB поле для быстрого поиска
314-
- **UPSERT режим**: Поддерживает обновление существующих записей при конфликте по PRIMARY KEY или указанному ключу
318+
- **Батч-вставки**: Группирует сообщения для эффективной записи. Сброс при достижении `batchSize` или по таймеру. Только по размеру: `batchFlushIntervalSeconds: 0`. Только по времени: `batchSize: 0`
319+
- **Автосоздание таблиц**: При `rawMode: true` — создаёт таблицу с колонками `value` JSONB, `_metadata` JSONB, created_at, updated_at, deleted_at и GIN индексом. При `rawMode: false` — выводит структуру таблицы из первого сообщения (реплицирует схему источника)
320+
- **Режим rawMode**: При true ожидает `{"value": <данные>, "_metadata": {...}}` и создаёт таблицу с колонками value/_metadata. При false использует колоночный формат из сообщения
321+
- **UPSERT режим**: Обновляет существующие записи при конфликте по PRIMARY KEY или указанному `conflictKey`
322+
- **Soft delete**: При заданном `softDeleteColumn` и `metadata.operation=delete` выполняет `UPDATE ... SET deleted_at = NOW()` вместо физического DELETE
315323

316-
#### Пример с автосозданием таблицы
324+
#### Пример с автосозданием таблицы (rawMode)
317325

318326
```yaml
319327
sink:
320328
type: postgresql
321329
postgresql:
322330
connectionString: "postgres://user:password@localhost:5432/analytics"
323331
table: events
324-
autoCreateTable: true # Создаст таблицу с JSONB полем
332+
autoCreateTable: true
333+
rawMode: true # Таблица с value и _metadata
325334
batchSize: 50
326335
```
327336

328-
Автоматически созданная таблица будет иметь структуру:
337+
При `rawMode: true` и `autoCreateTable: true` таблица создаётся со структурой:
329338

330339
```sql
331340
CREATE TABLE events (
332341
id SERIAL PRIMARY KEY,
333-
data JSONB NOT NULL,
334-
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
342+
value JSONB NOT NULL,
343+
_metadata JSONB,
344+
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
345+
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
346+
deleted_at TIMESTAMP
335347
);
336-
CREATE INDEX idx_events_data ON events USING GIN (data);
348+
CREATE INDEX idx_events_value ON events USING GIN (value);
337349
```
338350

351+
При `rawMode: false` структура таблицы выводится из первого сообщения (колоночный формат).
352+
339353
#### Пример с UPSERT для обновления существующих записей
340354

341355
```yaml

0 commit comments

Comments
 (0)