diff --git a/app/_includes/plugins/confluent-kafka-consume/record-headers.md b/app/_includes/plugins/confluent-kafka-consume/record-headers.md new file mode 100644 index 0000000000..328dc711f6 --- /dev/null +++ b/app/_includes/plugins/confluent-kafka-consume/record-headers.md @@ -0,0 +1,28 @@ +The {{include.name}} plugin can forward HTTP request headers as [Kafka record headers](https://kafka.apache.org/documentation/#recordheader), which are per-record key/value metadata that lives alongside the message key and value. +This lets consumers read routing, tracing, or tenancy context without parsing the message payload. + +Configure the [`config.headers`](./reference/#schema--config-headers) block to control which headers are forwarded: + +{% table %} +columns: + - title: Mode + key: mode + - title: Description + key: description + - title: Example + key: example +rows: + - mode: "Allowlist ([`forward_all_by_default: false`](./reference/#schema--config-headers-forward-all-by-default))" + description: "Only forward headers listed in [`include_headers`](./reference/#schema--config-headers-include-headers)." + example: "[Forward HTTP headers as Kafka record headers (allowlist mode)](./examples/record-headers-allowlist/)" + - mode: "Blocklist ([`forward_all_by_default: true`](./reference/#schema--config-headers-forward-all-by-default))" + description: "Forward all headers except those listed in [`exclude_headers`](./reference/#schema--config-headers-exclude-headers)." + example: "[Forward HTTP headers as Kafka record headers (blocklist mode)](./examples/record-headers-blocklist/)" +{% endtable %} + +Use [`config.headers.name_mappings`](./reference/#schema--config-headers-name-mappings) to rename an HTTP header to a different Kafka record header key. + +Use [`config.headers.repeated_headers_behavior`](./reference/#schema--config-headers-repeated-headers-behavior) to control how duplicate HTTP headers are handled: `retain_duplicates` (default) creates a separate record header per value, `take_first` uses only the first value, and `concatenate_by_comma` joins all values with a comma. + +{:.info} +> **Note**: The [`config.forward_headers`](./reference/#schema--config-forward-headers) setting embeds request headers inside the message body. `config.headers` is a separate configuration block that sets native Kafka record headers on the produced record. diff --git a/app/_kong_plugins/confluent/examples/record-headers-allowlist.yaml b/app/_kong_plugins/confluent/examples/record-headers-allowlist.yaml new file mode 100644 index 0000000000..d25761b96a --- /dev/null +++ b/app/_kong_plugins/confluent/examples/record-headers-allowlist.yaml @@ -0,0 +1,53 @@ +description: 'Forward selected HTTP request headers as Kafka record headers.' +extended_description: | + Use `config.headers` to copy HTTP request headers onto the produced Kafka record as native record headers. + This makes routing, tracing, and tenancy context available to consumers without parsing the message payload. + + In allowlist mode (`forward_all_by_default: false`), only headers listed in `include_headers` are forwarded. + Use `name_mappings` to rename an HTTP header to a different record header key, for example renaming `x-request-id` to `trace-id`. + +title: 'Forward HTTP headers as Kafka record headers (allowlist mode)' + +weight: 890 + +min_version: + gateway: '3.15' + +requirements: +- "[Create a Kafka cluster in Confluent Cloud](https://docs.confluent.io/cloud/current/get-started/index.html#step-1-create-a-ak-cluster-in-ccloud)" +- "[Create a Kafka topic in the cluster](https://docs.confluent.io/cloud/current/get-started/index.html#step-2-create-a-ak-topic)" +- "Create an API key and secret in the cluster" + +variables: + topic: + description: The name of your Kafka topic. + value: $KAFKA_TOPIC + cluster_api_key: + value: $CONFLUENT_CLUSTER_API_KEY + description: Your Confluent cluster API key. + cluster_api_secret: + value: $CONFLUENT_CLUSTER_API_SECRET + description: Your Confluent cluster API secret. + +config: + topic: ${topic} + cluster_api_key: ${cluster_api_key} + cluster_api_secret: ${cluster_api_secret} + headers: + forward_http_headers_as_record_headers: true + forward_all_by_default: false + include_headers: + - x-request-id + - x-tenant-id + - x-correlation-id + name_mappings: + x-request-id: trace-id + x-tenant-id: tenant + repeated_headers_behavior: concatenate_by_comma + +tools: + - deck + - admin-api + - konnect-api + - kic + - terraform diff --git a/app/_kong_plugins/confluent/examples/record-headers-blocklist.yaml b/app/_kong_plugins/confluent/examples/record-headers-blocklist.yaml new file mode 100644 index 0000000000..fdb5c80408 --- /dev/null +++ b/app/_kong_plugins/confluent/examples/record-headers-blocklist.yaml @@ -0,0 +1,47 @@ +description: 'Forward all HTTP request headers as Kafka record headers, except those on an exclusion list.' +extended_description: | + Use blocklist mode (`forward_all_by_default: true`) to forward every incoming HTTP header as a Kafka record header, + except for headers listed in `exclude_headers`. + This is useful for stripping sensitive headers like `authorization` or `cookie` while preserving all others. + +title: 'Forward HTTP headers as Kafka record headers (blocklist mode)' + +weight: 889 + +min_version: + gateway: '3.15' + +requirements: +- "[Create a Kafka cluster in Confluent Cloud](https://docs.confluent.io/cloud/current/get-started/index.html#step-1-create-a-ak-cluster-in-ccloud)" +- "[Create a Kafka topic in the cluster](https://docs.confluent.io/cloud/current/get-started/index.html#step-2-create-a-ak-topic)" +- "Create an API key and secret in the cluster" + +variables: + topic: + description: The name of your Kafka topic. + value: $KAFKA_TOPIC + cluster_api_key: + value: $CONFLUENT_CLUSTER_API_KEY + description: Your Confluent cluster API key. + cluster_api_secret: + value: $CONFLUENT_CLUSTER_API_SECRET + description: Your Confluent cluster API secret. + +config: + topic: ${topic} + cluster_api_key: ${cluster_api_key} + cluster_api_secret: ${cluster_api_secret} + headers: + forward_http_headers_as_record_headers: true + forward_all_by_default: true + exclude_headers: + - authorization + - cookie + repeated_headers_behavior: retain_duplicates + +tools: + - deck + - admin-api + - konnect-api + - kic + - terraform diff --git a/app/_kong_plugins/confluent/index.md b/app/_kong_plugins/confluent/index.md index 6310f07589..8d2c78e4b7 100644 --- a/app/_kong_plugins/confluent/index.md +++ b/app/_kong_plugins/confluent/index.md @@ -70,4 +70,8 @@ With Kafka at its core, [Confluent](https://confluent.io) offers complete, fully ## Schema registry support {% new_in 3.11 %} -{% include_cached /plugins/confluent-kafka-consume/schema-registry.md name=page.name slug=page.slug workflow='producer' %} \ No newline at end of file +{% include_cached /plugins/confluent-kafka-consume/schema-registry.md name=page.name slug=page.slug workflow='producer' %} + +## Kafka record headers {% new_in 3.15 %} + +{% include_cached /plugins/confluent-kafka-consume/record-headers.md name=page.name %} \ No newline at end of file diff --git a/app/_kong_plugins/kafka-upstream/examples/record-headers-allowlist.yaml b/app/_kong_plugins/kafka-upstream/examples/record-headers-allowlist.yaml new file mode 100644 index 0000000000..7ee53c7d37 --- /dev/null +++ b/app/_kong_plugins/kafka-upstream/examples/record-headers-allowlist.yaml @@ -0,0 +1,44 @@ +description: 'Forward HTTP request headers as Kafka record headers.' +extended_description: | + Use `config.headers` to copy HTTP request headers onto the produced Kafka record as native record headers. + This makes routing, tracing, and tenancy context available to consumers without parsing the message payload. + + In allowlist mode (`forward_all_by_default: false`), only headers listed in `include_headers` are forwarded. + Use `name_mappings` to rename an HTTP header to a different record header key, for example renaming `x-request-id` to `trace-id`. + +title: 'Forward HTTP headers as Kafka record headers (allowlist mode)' + +weight: 890 + +min_version: + gateway: '3.15' + +requirements: + - "[Kafka installed](https://kafka.apache.org/quickstart#quickstart_download) and running" + - "[Create a Kafka topic](https://kafka.apache.org/quickstart#quickstart_createtopic)" + +variables: + topic: + description: The name of your Kafka topic. + value: $KAFKA_TOPIC + +config: + topic: ${topic} + headers: + forward_http_headers_as_record_headers: true + forward_all_by_default: false + include_headers: + - x-request-id + - x-tenant-id + - x-correlation-id + name_mappings: + x-request-id: trace-id + x-tenant-id: tenant + repeated_headers_behavior: concatenate_by_comma + +tools: + - deck + - admin-api + - konnect-api + - kic + - terraform diff --git a/app/_kong_plugins/kafka-upstream/examples/record-headers-blocklist.yaml b/app/_kong_plugins/kafka-upstream/examples/record-headers-blocklist.yaml new file mode 100644 index 0000000000..993f502045 --- /dev/null +++ b/app/_kong_plugins/kafka-upstream/examples/record-headers-blocklist.yaml @@ -0,0 +1,38 @@ +description: 'Forward all HTTP request headers as Kafka record headers, except those on an exclusion list.' +extended_description: | + Use blocklist mode (`forward_all_by_default: true`) to forward every incoming HTTP header as a Kafka record header, + except for headers listed in `exclude_headers`. + This is useful for stripping sensitive headers like `authorization` or `cookie` while preserving all others. + +title: 'Forward HTTP headers as Kafka record headers (blocklist mode)' + +weight: 889 + +min_version: + gateway: '3.15' + +requirements: + - "[Kafka installed](https://kafka.apache.org/quickstart#quickstart_download) and running" + - "[Create a Kafka topic](https://kafka.apache.org/quickstart#quickstart_createtopic)" + +variables: + topic: + description: The name of your Kafka topic. + value: $KAFKA_TOPIC + +config: + topic: ${topic} + headers: + forward_http_headers_as_record_headers: true + forward_all_by_default: true + exclude_headers: + - authorization + - cookie + repeated_headers_behavior: retain_duplicates + +tools: + - deck + - admin-api + - konnect-api + - kic + - terraform diff --git a/app/_kong_plugins/kafka-upstream/index.md b/app/_kong_plugins/kafka-upstream/index.md index 030a1fba1a..f28a44fd47 100644 --- a/app/_kong_plugins/kafka-upstream/index.md +++ b/app/_kong_plugins/kafka-upstream/index.md @@ -66,6 +66,10 @@ When encoding request bodies, several things happen: {% include_cached /plugins/confluent-kafka-consume/schema-registry.md name=page.name slug=page.slug workflow='producer' %} +## Kafka record headers {% new_in 3.15 %} + +{% include_cached /plugins/confluent-kafka-consume/record-headers.md name=page.name %} + ## Debugging {% new_in 3.15 %} By default, when producing a message fails, the plugin returns a generic error to the HTTP client and logs the real cause in the {{site.base_gateway}} logs: