Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions app/_includes/plugins/confluent-kafka-consume/record-headers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
The {{include.name}} plugin can forward HTTP request headers as [Kafka record headers](https://kafka.apache.org/documentation/#recordheader), which are per-record key/value metadata that lives alongside the message key and value.
This lets consumers read routing, tracing, or tenancy context without parsing the message payload.

Configure the [`config.headers`](./reference/#schema--config-headers) block to control which headers are forwarded:

{% table %}
columns:
- title: Mode
key: mode
- title: Description
key: description
- title: Example
key: example
rows:
- mode: "Allowlist ([`forward_all_by_default: false`](./reference/#schema--config-headers-forward-all-by-default))"
description: "Only forward headers listed in [`include_headers`](./reference/#schema--config-headers-include-headers)."
example: "[Forward HTTP headers as Kafka record headers (allowlist mode)](./examples/record-headers-allowlist/)"
- mode: "Blocklist ([`forward_all_by_default: true`](./reference/#schema--config-headers-forward-all-by-default))"
description: "Forward all headers except those listed in [`exclude_headers`](./reference/#schema--config-headers-exclude-headers)."
example: "[Forward HTTP headers as Kafka record headers (blocklist mode)](./examples/record-headers-blocklist/)"
{% endtable %}

Use [`config.headers.name_mappings`](./reference/#schema--config-headers-name-mappings) to rename an HTTP header to a different Kafka record header key.

Use [`config.headers.repeated_headers_behavior`](./reference/#schema--config-headers-repeated-headers-behavior) to control how duplicate HTTP headers are handled: `retain_duplicates` (default) creates a separate record header per value, `take_first` uses only the first value, and `concatenate_by_comma` joins all values with a comma.

{:.info}
> **Note**: The [`config.forward_headers`](./reference/#schema--config-forward-headers) setting embeds request headers inside the message body. `config.headers` is a separate configuration block that sets native Kafka record headers on the produced record.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
description: 'Forward selected HTTP request headers as Kafka record headers.'
extended_description: |
Use `config.headers` to copy HTTP request headers onto the produced Kafka record as native record headers.
This makes routing, tracing, and tenancy context available to consumers without parsing the message payload.

In allowlist mode (`forward_all_by_default: false`), only headers listed in `include_headers` are forwarded.
Use `name_mappings` to rename an HTTP header to a different record header key, for example renaming `x-request-id` to `trace-id`.

title: 'Forward HTTP headers as Kafka record headers (allowlist mode)'

weight: 890

min_version:
gateway: '3.15'

requirements:
- "[Create a Kafka cluster in Confluent Cloud](https://docs.confluent.io/cloud/current/get-started/index.html#step-1-create-a-ak-cluster-in-ccloud)"
- "[Create a Kafka topic in the cluster](https://docs.confluent.io/cloud/current/get-started/index.html#step-2-create-a-ak-topic)"
- "Create an API key and secret in the cluster"

variables:
topic:
description: The name of your Kafka topic.
value: $KAFKA_TOPIC
cluster_api_key:
value: $CONFLUENT_CLUSTER_API_KEY
description: Your Confluent cluster API key.
cluster_api_secret:
value: $CONFLUENT_CLUSTER_API_SECRET
description: Your Confluent cluster API secret.

config:
topic: ${topic}
cluster_api_key: ${cluster_api_key}
cluster_api_secret: ${cluster_api_secret}
headers:
forward_http_headers_as_record_headers: true
forward_all_by_default: false
include_headers:
- x-request-id
- x-tenant-id
- x-correlation-id
name_mappings:
x-request-id: trace-id
x-tenant-id: tenant
repeated_headers_behavior: concatenate_by_comma

tools:
- deck
- admin-api
- konnect-api
- kic
- terraform
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
description: 'Forward all HTTP request headers as Kafka record headers, except those on an exclusion list.'
extended_description: |
Use blocklist mode (`forward_all_by_default: true`) to forward every incoming HTTP header as a Kafka record header,
except for headers listed in `exclude_headers`.
This is useful for stripping sensitive headers like `authorization` or `cookie` while preserving all others.

title: 'Forward HTTP headers as Kafka record headers (blocklist mode)'

weight: 889

min_version:
gateway: '3.15'

requirements:
- "[Create a Kafka cluster in Confluent Cloud](https://docs.confluent.io/cloud/current/get-started/index.html#step-1-create-a-ak-cluster-in-ccloud)"
- "[Create a Kafka topic in the cluster](https://docs.confluent.io/cloud/current/get-started/index.html#step-2-create-a-ak-topic)"
- "Create an API key and secret in the cluster"

variables:
topic:
description: The name of your Kafka topic.
value: $KAFKA_TOPIC
cluster_api_key:
value: $CONFLUENT_CLUSTER_API_KEY
description: Your Confluent cluster API key.
cluster_api_secret:
value: $CONFLUENT_CLUSTER_API_SECRET
description: Your Confluent cluster API secret.

config:
topic: ${topic}
cluster_api_key: ${cluster_api_key}
cluster_api_secret: ${cluster_api_secret}
headers:
forward_http_headers_as_record_headers: true
forward_all_by_default: true
exclude_headers:
- authorization
- cookie
repeated_headers_behavior: retain_duplicates

tools:
- deck
- admin-api
- konnect-api
- kic
- terraform
6 changes: 5 additions & 1 deletion app/_kong_plugins/confluent/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,8 @@ With Kafka at its core, [Confluent](https://confluent.io) offers complete, fully

## Schema registry support {% new_in 3.11 %}

{% include_cached /plugins/confluent-kafka-consume/schema-registry.md name=page.name slug=page.slug workflow='producer' %}
{% include_cached /plugins/confluent-kafka-consume/schema-registry.md name=page.name slug=page.slug workflow='producer' %}

## Kafka record headers {% new_in 3.15 %}

{% include_cached /plugins/confluent-kafka-consume/record-headers.md name=page.name %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
description: 'Forward HTTP request headers as Kafka record headers.'
extended_description: |
Use `config.headers` to copy HTTP request headers onto the produced Kafka record as native record headers.
This makes routing, tracing, and tenancy context available to consumers without parsing the message payload.

In allowlist mode (`forward_all_by_default: false`), only headers listed in `include_headers` are forwarded.
Use `name_mappings` to rename an HTTP header to a different record header key, for example renaming `x-request-id` to `trace-id`.

title: 'Forward HTTP headers as Kafka record headers (allowlist mode)'

weight: 890

min_version:
gateway: '3.15'

requirements:
- "[Kafka installed](https://kafka.apache.org/quickstart#quickstart_download) and running"
- "[Create a Kafka topic](https://kafka.apache.org/quickstart#quickstart_createtopic)"

variables:
topic:
description: The name of your Kafka topic.
value: $KAFKA_TOPIC

config:
topic: ${topic}
headers:
forward_http_headers_as_record_headers: true
forward_all_by_default: false
include_headers:
- x-request-id
- x-tenant-id
- x-correlation-id
name_mappings:
x-request-id: trace-id
x-tenant-id: tenant
repeated_headers_behavior: concatenate_by_comma

tools:
- deck
- admin-api
- konnect-api
- kic
- terraform
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
description: 'Forward all HTTP request headers as Kafka record headers, except those on an exclusion list.'
extended_description: |
Use blocklist mode (`forward_all_by_default: true`) to forward every incoming HTTP header as a Kafka record header,
except for headers listed in `exclude_headers`.
This is useful for stripping sensitive headers like `authorization` or `cookie` while preserving all others.

title: 'Forward HTTP headers as Kafka record headers (blocklist mode)'

weight: 889

min_version:
gateway: '3.15'

requirements:
- "[Kafka installed](https://kafka.apache.org/quickstart#quickstart_download) and running"
- "[Create a Kafka topic](https://kafka.apache.org/quickstart#quickstart_createtopic)"

variables:
topic:
description: The name of your Kafka topic.
value: $KAFKA_TOPIC

config:
topic: ${topic}
headers:
forward_http_headers_as_record_headers: true
forward_all_by_default: true
exclude_headers:
- authorization
- cookie
repeated_headers_behavior: retain_duplicates

tools:
- deck
- admin-api
- konnect-api
- kic
- terraform
4 changes: 4 additions & 0 deletions app/_kong_plugins/kafka-upstream/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ When encoding request bodies, several things happen:

{% include_cached /plugins/confluent-kafka-consume/schema-registry.md name=page.name slug=page.slug workflow='producer' %}

## Kafka record headers {% new_in 3.15 %}

{% include_cached /plugins/confluent-kafka-consume/record-headers.md name=page.name %}

## Debugging {% new_in 3.15 %}

By default, when producing a message fails, the plugin returns a generic error to the HTTP client and logs the real cause in the {{site.base_gateway}} logs:
Expand Down
Loading