Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/server/ongoing-tasks/cdc-sink/_category_.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"position": 5,
"label": "CDC Sink"
}
176 changes: 176 additions & 0 deletions docs/server/ongoing-tasks/cdc-sink/api-reference.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
---
title: "Server: Ongoing Tasks: CDC Sink: API Reference"
sidebar_label: API Reference
description: "Client API operations for creating, updating, toggling, and deleting CDC Sink tasks programmatically."
sidebar_position: 11
---

import Admonition from '@theme/Admonition';
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
import CodeBlock from '@theme/CodeBlock';
import LanguageSwitcher from "@site/src/components/LanguageSwitcher";
import LanguageContent from "@site/src/components/LanguageContent";

# Server: Ongoing Tasks: CDC Sink: API Reference

<Admonition type="note" title="">

* CDC Sink tasks can be created, updated, and managed programmatically using
the RavenDB Client API.

* In this page:
* [Add a CDC Sink Task](#add-a-cdc-sink-task)
* [Update a CDC Sink Task](#update-a-cdc-sink-task)
* [Get Task Info](#get-task-info)
* [Toggle Task State](#toggle-task-state)
* [Delete a Task](#delete-a-task)

</Admonition>

---

## Add a CDC Sink Task

Use `AddCdcSinkOperation` to create a new CDC Sink task:

```csharp
var config = new CdcSinkConfiguration
{
Name = "OrdersSync",
ConnectionStringName = "MyPostgresConnection",
Tables = new List<CdcSinkTableConfig>
{
new CdcSinkTableConfig
{
Name = "Orders",
SourceTableName = "orders",
PrimaryKeyColumns = new List<string> { "id" },
ColumnsMapping = new Dictionary<string, string>
{
{ "id", "Id" },
{ "customer_name", "CustomerName" },
{ "total", "Total" }
}
}
}
};

var result = await store.Maintenance.SendAsync(
new AddCdcSinkOperation(config));

long taskId = result.TaskId;
```

`AddCdcSinkOperationResult`:

| Property | Type | Description |
|----------|------|-------------|
| `TaskId` | `long` | Assigned task ID |
| `RaftCommandIndex` | `long` | Raft index of the command |

---

## Update a CDC Sink Task

Use `UpdateCdcSinkOperation` to modify an existing task.
Pass the full updated configuration including the `TaskId`:

```csharp
config.TaskId = taskId; // Must be set
config.Tables.Add(new CdcSinkTableConfig
{
Name = "Customers",
SourceTableName = "customers",
PrimaryKeyColumns = new List<string> { "id" },
ColumnsMapping = new Dictionary<string, string>
{
{ "id", "Id" },
{ "name", "Name" },
{ "email", "Email" }
}
});

await store.Maintenance.SendAsync(
new UpdateCdcSinkOperation(taskId, config));
```

<Admonition type="warning" title="">

**PostgreSQL — table changes affect the replication slot:**

* **Adding tables** to the task requires updating the publication to include the
new tables. If the slot and publication were auto-named (hash-based), this causes
a new slot/publication to be created under a new hash, and the old ones become
orphaned.
* **Removing tables** from the task means the old publication includes tables that
are no longer needed. The slot keeps receiving updates for those tables
unnecessarily.

If you specified explicit `SlotName` and `PublicationName` in `CdcSinkPostgresSettings`,
you can update the publication manually to add/remove tables without affecting the slot.
See [Initial Setup](./postgres/initial-setup.mdx) for
guidance on manual slot management.

Orphaned slots and publications must be dropped by the database administrator.
See [Cleanup and Maintenance](./postgres/cleanup-and-maintenance.mdx).

</Admonition>

---

## Get Task Info

Use `GetOngoingTaskInfoOperation` to retrieve the current state of a CDC Sink task:

```csharp
var taskInfo = await store.Maintenance.SendAsync(
new GetOngoingTaskInfoOperation(taskId, OngoingTaskType.CdcSink));
```

---

## Toggle Task State

Pause or resume a CDC Sink task using `ToggleOngoingTaskStateOperation`:

```csharp
// Pause the task
await store.Maintenance.SendAsync(
new ToggleOngoingTaskStateOperation(taskId, OngoingTaskType.CdcSink, disable: true));

// Resume the task
await store.Maintenance.SendAsync(
new ToggleOngoingTaskStateOperation(taskId, OngoingTaskType.CdcSink, disable: false));
```

<Admonition type="warning" title="">

**PostgreSQL:** Pausing a CDC Sink task stops the replication slot from being consumed.
PostgreSQL retains WAL segments for unconsumed slots, so pausing for an extended period
causes WAL to accumulate on disk. Monitor disk usage if a task is paused for more than
a short time.
See [Monitoring PostgreSQL](./postgres/monitoring-postgres.mdx).

</Admonition>

---

## Delete a Task

Use `DeleteOngoingTaskOperation` to delete a CDC Sink task:

```csharp
await store.Maintenance.SendAsync(
new DeleteOngoingTaskOperation(taskId, OngoingTaskType.CdcSink));
```

<Admonition type="warning" title="">

Deleting the task in RavenDB does **not** drop the replication slot or publication
in the source database. These must be cleaned up manually by the database administrator.
See [Cleanup and Maintenance](./postgres/cleanup-and-maintenance.mdx).

</Admonition>

---
131 changes: 131 additions & 0 deletions docs/server/ongoing-tasks/cdc-sink/attachment-handling.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
---
title: "Server: Ongoing Tasks: CDC Sink: Attachment Handling"
sidebar_label: Attachment Handling
description: "How to store binary SQL columns as RavenDB attachments using AttachmentNameMapping on root and embedded tables."
sidebar_position: 9
---

import Admonition from '@theme/Admonition';
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
import CodeBlock from '@theme/CodeBlock';
import LanguageSwitcher from "@site/src/components/LanguageSwitcher";
import LanguageContent from "@site/src/components/LanguageContent";

# Server: Ongoing Tasks: CDC Sink: Attachment Handling

<Admonition type="note" title="">

* Binary SQL columns can be stored as RavenDB **attachments** instead of document
properties using `AttachmentNameMapping`.

* This applies to both root tables and embedded tables.

* In this page:
* [Root Table Attachments](#root-table-attachments)
* [Embedded Table Attachments](#embedded-table-attachments)
* [Attachment Naming](#attachment-naming)
* [Attachment Lifecycle](#attachment-lifecycle)

</Admonition>

---

## Root Table Attachments

Use `AttachmentNameMapping` to map a binary SQL column to a RavenDB attachment:

```csharp
new CdcSinkTableConfig
{
Name = "Files",
SourceTableName = "files",
PrimaryKeyColumns = ["id"],
ColumnsMapping = new Dictionary<string, string>
{
{ "id", "Id" },
{ "filename", "Filename" },
{ "mime_type","MimeType" }
},
AttachmentNameMapping = new Dictionary<string, string>
{
{ "content", "file" } // SQL column "content" → attachment named "file"
}
}
```

The binary `content` column is stored as an attachment named `"file"` on the document.
The attachment is stored with content type `application/octet-stream`.

---

## Embedded Table Attachments

Binary columns on embedded tables are stored as attachments on the **parent** document.
The attachment name is automatically prefixed to ensure uniqueness:

```csharp
new CdcSinkEmbeddedTableConfig
{
SourceTableName = "photos",
PropertyName = "Photos",
PrimaryKeyColumns = ["photo_num"],
JoinColumns = ["product_id"],
ColumnsMapping = new Dictionary<string, string>
{
{ "photo_num", "PhotoNum" },
{ "caption", "Caption" }
},
AttachmentNameMapping = new Dictionary<string, string>
{
{ "thumbnail", "thumb" }
}
}
```

A photo with `photo_num = 1` creates an attachment named `"Photos/1/thumb"` on the
parent document. The prefix `"Photos/1/"` is generated from the `PropertyName` and
the primary key value.

---

## Attachment Naming

**Root table attachments:**

The attachment name is exactly the value you specify in `AttachmentNameMapping`.

```
AttachmentNameMapping = { ["content"] = "file" }
→ Attachment name: "file"
```

**Embedded table attachments:**

The attachment name is prefixed with `{PropertyName}/{pkValue}/`:

```
PropertyName = "Photos"
PrimaryKeyColumns = ["photo_num"] → photo_num = 1
AttachmentNameMapping = { { "thumbnail", "thumb" } }
→ Attachment name: "Photos/1/thumb"
```

For composite primary keys, all key values are joined:

```
PrimaryKeyColumns = ["date", "seq"] → date='2024-01', seq=3
→ Attachment name: "Photos/2024-01/3/thumb"
```

---

## Attachment Lifecycle

* **INSERT** — Attachment is created on the document
* **UPDATE** — Attachment is replaced with the new binary data
* **DELETE (embedded item)** — All attachments for that item are automatically removed
from the parent document
* **DELETE (root document)** — Document and all its attachments are deleted

---
Loading
Loading