From b86e5e63ec2efd8dd368682a1884ba0d0ea266d5 Mon Sep 17 00:00:00 2001 From: lena-larionova Date: Tue, 19 May 2026 17:22:42 -0700 Subject: [PATCH 01/10] bootstrap event gateway 1.2 --- .../event-gateway-bootstrap-schema/1.2.json | 379 +++++++++++ app/_data/products/event-gateway.yml | 8 +- app/_references/event-gateway/1.2/metrics.md | 643 ++++++++++++++++++ 3 files changed, 1029 insertions(+), 1 deletion(-) create mode 100644 app/_data/event-gateway-bootstrap-schema/1.2.json create mode 100644 app/_references/event-gateway/1.2/metrics.md diff --git a/app/_data/event-gateway-bootstrap-schema/1.2.json b/app/_data/event-gateway-bootstrap-schema/1.2.json new file mode 100644 index 0000000000..f7eb71d2d5 --- /dev/null +++ b/app/_data/event-gateway-bootstrap-schema/1.2.json @@ -0,0 +1,379 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "BootstrapConfig", + "type": "object", + "properties": { + "konnect": { + "description": "Base Konnect configuration.\n\nContains all the necessary information to connect to Konnect,\nincluding TLS configuration for mTLS authentication.", + "type": "object", + "properties": { + "region": { + "description": "The Konnect region to connect to (e.g., \"us\", \"eu\")", + "type": "string" + }, + "domain": { + "description": "The Konnect domain (e.g., \"konghq.com\", \"konghq.tech\")", + "type": "string", + "default": "konghq.com" + }, + "gateway_cluster_id": { + "description": "The Gateway Cluster ID (also known as Control Plane ID)", + "type": "string" + }, + "api_request_timeout": { + "description": "Timeout for API requests to Konnect (in seconds)", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "5s" + }, + "insecure_skip_verify": { + "description": "Skip TLS verification (insecure, for testing only)", + "type": "boolean", + "default": false + }, + "client_cert": { + "description": "Client certificate for mTLS (PEM format)", + "type": [ + "string", + "null" + ] + }, + "client_cert_path": { + "description": "Path to client certificate file", + "type": [ + "string", + "null" + ] + }, + "client_key": { + "description": "Client private key for mTLS (PEM format)", + "anyOf": [ + { + "$ref": "#/$defs/Secret" + }, + { + "type": "null" + } + ] + }, + "client_key_path": { + "description": "Path to client private key file", + "type": [ + "string", + "null" + ] + } + }, + "required": [ + "region", + "gateway_cluster_id" + ], + "allOf": [ + { + "oneOf": [ + { + "required": [ + "client_cert" + ] + }, + { + "required": [ + "client_cert_path" + ] + } + ] + }, + { + "oneOf": [ + { + "required": [ + "client_key" + ] + }, + { + "required": [ + "client_key_path" + ] + } + ] + } + ] + }, + "config_poll_interval": { + "description": "The interval at which to poll for configuration updates\n\nEnvironment variable: `KEG__CONFIG_POLL_INTERVAL`", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "5s" + }, + "runtime": { + "description": "Runtime configuration", + "type": "object", + "properties": { + "drain_duration": { + "description": "Duration to wait for existing connections to drain. It is the time between\n\nEnvironment variable: `KEG__RUNTIME__DRAIN_DURATION`", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "5s" + }, + "shutdown_timeout": { + "description": "Timeout for graceful shutdown. It is the time between sending the shutdown\nsignal and forcefully terminating the process.\n\nEnvironment variable: `KEG__RUNTIME__SHUTDOWN_TIMEOUT`", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "10s" + }, + "health_listener_address_port": { + "description": "Address and port for the health listener.\n\nEnvironment variable: `KEG__RUNTIME__HEALTH_LISTENER_ADDRESS_PORT`", + "type": "array", + "items": { + "type": "string" + }, + "default": [ + "[::1]:8080", + "127.0.0.1:8080" + ] + } + }, + "additionalProperties": false + }, + "observability": { + "description": "Observability configuration", + "type": "object", + "properties": { + "log_flags": { + "description": "Configure the log level. Possible levels are: \"trace\", \"debug\", \"info\", \"warn\", \"error\"\n\nThe format is a comma-separated list of log levels for different modules, e.g., `info,keg=debug,my_module=trace`\nIf only a single level is provided, it applies to all modules.\n\nEnvironment variable: `KEG__OBSERVABILITY__LOG_FLAGS`", + "type": "string", + "default": "info" + }, + "log_format": { + "description": "Configure the log format.\n\nEnvironment variable: `KEG__OBSERVABILITY__LOG_FORMAT`", + "type": "string", + "enum": [ + "logfmt", + "json" + ] + }, + "metrics_rollup_allow_map": { + "description": "Configure the metrics rollup allow map, this is to avoid high cardinality metrics for example `messaging.operation.name`, `messaging.destination.name` or `messaging.destination.partition.id`.\nThe format is `=,|=,`\nNon-allowed values will be mapped to \"other\"\n\nEnvironment variable: `KEG__OBSERVABILITY__METRICS_ROLLUP_ALLOW_MAP`", + "type": "string", + "default": "messaging.operation.name=produce,fetch" + }, + "policy_errors_info_log_interval": { + "description": "Interval for INFO-level logging of policy errors (time-based sampling).\nSet to \"0s\" to disable.\n\nEnvironment variable: `KEG__OBSERVABILITY__POLICY_ERRORS_INFO_LOG_INTERVAL`", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "1s" + }, + "otel": { + "description": "OpenTelemetry configuration", + "$ref": "#/$defs/OtelConfig" + } + }, + "additionalProperties": false + }, + "enable_debug_endpoints": { + "description": "Enables debug endpoints:\n\n- /debug/pprof/allocs endpoint\n Additionally, you need to enable jemalloc profiling like this `MALLOC_CONF=\"prof:true,prof_active:true,lg_prof_sample:19\"`\n\nEnvironment variable: `KEG__ENABLE_DEBUG_ENDPOINTS`", + "type": "boolean", + "default": false + } + }, + "additionalProperties": false, + "$defs": { + "Secret": { + "description": "Wrapper for sensitive values that redacts content in debug output.\n\nThis type wraps the `secrecy` crate's `SecretString` and adds `JsonSchema`\nsupport for configuration schema generation. The underlying value is\nzeroized on drop for memory protection.", + "type": "string" + }, + "OtelConfig": { + "description": "OTLP exporter configuration for all signals.\n\nFollows the [OpenTelemetry OTLP exporter specification](https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/).\nSignal-specific environment variables (e.g., `OTEL_EXPORTER_OTLP_TRACES_*`) take\nprecedence over base variables (`OTEL_EXPORTER_OTLP_*`).\n\nEnvironment variables:\n- Base: `OTEL_EXPORTER_OTLP_ENDPOINT`, `OTEL_EXPORTER_OTLP_PROTOCOL`, `OTEL_EXPORTER_OTLP_TIMEOUT`\n- Traces: `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`, `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`, `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`\n - Sampler: `OTEL_TRACES_SAMPLER`, `OTEL_TRACES_SAMPLER_ARG`\n- Metrics: `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`, `OTEL_EXPORTER_OTLP_METRICS_PROTOCOL`, `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT`\n- Logs: `OTEL_EXPORTER_OTLP_LOGS_ENDPOINT`, `OTEL_EXPORTER_OTLP_LOGS_PROTOCOL`, `OTEL_EXPORTER_OTLP_LOGS_TIMEOUT`", + "type": "object", + "properties": { + "traces": { + "description": "OTLP exporter and sampler configuration for traces.", + "$ref": "#/$defs/OtelTracesConfig" + }, + "metrics": { + "description": "OTLP exporter configuration for metrics.", + "$ref": "#/$defs/OtelMetricsConfig" + }, + "logs": { + "description": "OTLP exporter configuration for logs.", + "$ref": "#/$defs/OtelLogsConfig" + } + } + }, + "OtelTracesConfig": { + "description": "Trace signal configuration: OTLP exporter settings plus sampler configuration.", + "type": "object", + "properties": { + "endpoint": { + "description": "OTLP endpoint URL.\n\nFor HTTP protocols, should include the full path (e.g., `http://localhost:4318/v1/{signal}`).\nFor gRPC, should be the base URL (e.g., `http://localhost:4317`).\n\nIf not set, the signal is considered disabled.", + "type": [ + "string", + "null" + ], + "default": null + }, + "protocol": { + "description": "OTLP transport protocol (grpc, http/binary, http/json).", + "$ref": "#/$defs/OtlpProtocol" + }, + "timeout": { + "description": "Request timeout for the OTLP exporter.", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "10s" + }, + "headers": { + "description": "Additional headers to send with OTLP requests.\n\nHeaders are specified as key-value pairs.", + "type": "object", + "additionalProperties": { + "type": "string" + }, + "default": {} + }, + "traces_sampler": { + "description": "Trace sampler strategy.", + "$ref": "#/$defs/TracesSampler" + }, + "traces_sampler_arg": { + "description": "Trace sampler argument.", + "type": "string", + "default": "1.0" + } + } + }, + "OtlpProtocol": { + "description": "OTLP transport protocol.\n\nWraps `opentelemetry_otlp::Protocol` with serde and schema support.", + "oneOf": [ + { + "description": "gRPC protocol", + "type": "string", + "const": "grpc" + }, + { + "description": "HTTP protocol with binary encoding (default)", + "type": "string", + "const": "http/binary" + }, + { + "description": "HTTP protocol with JSON encoding", + "type": "string", + "const": "http/json" + } + ] + }, + "TracesSampler": { + "description": "Trace sampler strategy.\n\nCorresponds to the `OTEL_TRACES_SAMPLER` environment variable defined in the\n[OpenTelemetry SDK environment variables specification](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration).\n\nCurrently only `parentbased_traceidratio` is supported.\nThis field exists for spec compliance. Support for additional samplers may be added\nin future releases.", + "oneOf": [ + { + "description": "Parent-based trace ID ratio sampler.", + "type": "string", + "const": "parentbased_traceidratio" + } + ] + }, + "OtelMetricsConfig": { + "description": "Metrics signal configuration: OTLP exporter settings for metrics.\n\nAll fields from [`OtelSignalConfig`] are available via `#[serde(flatten)]`.", + "type": "object", + "properties": { + "endpoint": { + "description": "OTLP endpoint URL.\n\nFor HTTP protocols, should include the full path (e.g., `http://localhost:4318/v1/{signal}`).\nFor gRPC, should be the base URL (e.g., `http://localhost:4317`).\n\nIf not set, the signal is considered disabled.", + "type": [ + "string", + "null" + ], + "default": null + }, + "protocol": { + "description": "OTLP transport protocol (grpc, http/binary, http/json).", + "$ref": "#/$defs/OtlpProtocol" + }, + "timeout": { + "description": "Request timeout for the OTLP exporter.", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "10s" + }, + "headers": { + "description": "Additional headers to send with OTLP requests.\n\nHeaders are specified as key-value pairs.", + "type": "object", + "additionalProperties": { + "type": "string" + }, + "default": {} + } + } + }, + "OtelLogsConfig": { + "description": "Logs signal configuration: OTLP exporter settings for logs.\n\nAll fields from [`OtelSignalConfig`] are available via `#[serde(flatten)]`.", + "type": "object", + "properties": { + "endpoint": { + "description": "OTLP endpoint URL.\n\nFor HTTP protocols, should include the full path (e.g., `http://localhost:4318/v1/{signal}`).\nFor gRPC, should be the base URL (e.g., `http://localhost:4317`).\n\nIf not set, the signal is considered disabled.", + "type": [ + "string", + "null" + ], + "default": null + }, + "protocol": { + "description": "OTLP transport protocol (grpc, http/binary, http/json).", + "$ref": "#/$defs/OtlpProtocol" + }, + "timeout": { + "description": "Request timeout for the OTLP exporter.", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "10s" + }, + "headers": { + "description": "Additional headers to send with OTLP requests.\n\nHeaders are specified as key-value pairs.", + "type": "object", + "additionalProperties": { + "type": "string" + }, + "default": {} + } + } + } + } +} \ No newline at end of file diff --git a/app/_data/products/event-gateway.yml b/app/_data/products/event-gateway.yml index d5f10d05c9..e2144be1d0 100644 --- a/app/_data/products/event-gateway.yml +++ b/app/_data/products/event-gateway.yml @@ -14,4 +14,10 @@ releases: sunset: "2028-03-25" version: "1.1.0" name: "v1" - latest: true \ No newline at end of file + - release: "1.2" + latest: true + release_date: "TBA" + eol: "TBA" + sunset: "TBA" + version: "1.2.0" + name: "v1" \ No newline at end of file diff --git a/app/_references/event-gateway/1.2/metrics.md b/app/_references/event-gateway/1.2/metrics.md new file mode 100644 index 0000000000..5cfd16618c --- /dev/null +++ b/app/_references/event-gateway/1.2/metrics.md @@ -0,0 +1,643 @@ +--- +# **Auto-generated** - Do not edit manually. See https://github.com/kong-gateway/event-gateway/blob/main/api/metrics.md + +title: "{{site.event_gateway}} metrics" + +description: Reference for all metrics exposed by {{site.event_gateway}}. + +related_resources: + - text: "{{site.event_gateway}}" + url: /event-gateway/ + - text: Set up observability for {{site.event_gateway_short}} + url: /how-to/event-gateway/configure-observability-with-otel/ +--- + + + +## Config + +### `kong.keg.config.errors` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** Count of errors when loading the config received from the control plane + +**Labels:** + +No labels documented. + +### `kong.keg.config.loaded` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** The version of the configuration loaded from the control plane + +**Labels:** + +No labels documented. + +## Kafka + +### `kong.keg.kafka.acl.attempts` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** Counts the results of every ACL attempt + +**Labels:** + +- `kong.keg.acl.resource_type`: The type of Kafka resource being accessed (Possible values: `transactional_id`, `group`, `topic`, `cluster`) +- `kong.keg.result`: The result of the ACL check (allowed or denied) (Possible values: `allowed`, `denied`) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.auth.virtual_cluster.processing.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The time spent validating client credentials against virtual cluster auth rules. Passthrough rules are always considered successful even if the backend cluster rejects the authentication + +**Labels:** + +- `kong.keg.auth.mechanism`: The authentication mechanism used by the client +- `kong.keg.result`: The result of the operation (success or failure) (Possible values: `success`, `fail`) +- `kong.keg.auth.mediation`: How the proxy mediates authentication between the client and the backend cluster. Empty when no rule matched the requested mechanism. (Possible values: `passthrough`, `validate_forward`, `terminate`, ``) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.backend.connection.errors` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of connection errors to the backend cluster + +**Labels:** + +- `kong.keg.connection.error.origin`: The origin of the connection error (Possible values: `io`, `peer`, `local`) + +### `kong.keg.kafka.backend.roundtrip.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The time spent communicating with backend cluster (send request and receive response) + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.connection.errors` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of proxied connections that resulted in an error + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.listener.id`: The Konnect listener identifier +- `kong.konnect.listener.name`: The Konnect listener name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.connections` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** The number of active proxied connections + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.listener.id`: The Konnect listener identifier +- `kong.konnect.listener.name`: The Konnect listener name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.decrypt.attempts` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** DEPRECATED - Use kong.keg.kafka.policy.invocations instead. The number of attempts to decrypt records. This includes both successful and failed calls + +**Labels:** + +- `kong.keg.result`: The result of the operation (success or failure) (Possible values: `success`, `fail`) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.encrypt.attempts` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** DEPRECATED - Use kong.keg.kafka.policy.invocations instead. The number of attempts to encrypt records. This includes both successful and failed calls + +**Labels:** + +- `kong.keg.result`: The result of the operation (success or failure) (Possible values: `success`, `fail`) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.kscheme.attempts` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** DEPRECATED - Use kong.keg.kafka.policy.invocations instead. The number of attempts to run kscheme scripts. This includes both successful and failed calls + +**Labels:** + +- `kong.keg.result`: The result of the operation (success or failure) (Possible values: `success`, `fail`) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.metadata.update.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The time to update the metadata from the backend broker + +**Labels:** + +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.namespace.topic.conflict` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** Indicates whether the namespace topic mapping encountered conflicts (1) or not (0). + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name + +### `kong.keg.kafka.policy.condition.failures` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of times the policy condition failed to execute due to an error + +**Labels:** + +No labels documented. + +### `kong.keg.kafka.policy.invocation.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The time to process a policy + +**Labels:** + +- `kong.keg.failure_mode`: The configured policy failure mode (Possible values: `error`, `reject`, `passthrough`, `mark`, `skip`) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.policy.invocations` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of policy invocation for policies. This includes both successful and failed invocations + +**Labels:** + +- `kong.keg.failure_mode`: The configured policy failure mode (Possible values: `error`, `reject`, `passthrough`, `mark`, `skip`) +- `kong.keg.result`: The result of the operation (success or failure) (Possible values: `success`, `fail`) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.proxy.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The end-to-end time for the entire proxy operation + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.request.processing.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The time spent processing the received request before forwarding it to the backend cluster + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.request.received` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of requests coming from the client + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.request.sent` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of requests sent to the backend broker + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.response.processing.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The time spent processing the received response before forwarding it to the client + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.response.received` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of responses received from the backend broker + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.response.received.errors` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of response messages received from the backend that contain at least one error. The error_code label represents the lowest error in the response + +**Labels:** + +- `kong.keg.kafka.error_code`: The lowest error code in the response +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.response.sent` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of responses sent by the proxy to the client + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.response.sent.errors` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of response messages sent back to the client that contain at least one error. + +**Labels:** + +- `kong.keg.kafka.error_code`: The lowest error code in the response +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.schema.validation.attempts` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** DEPRECATED - Use kong.keg.kafka.policy.invocations instead. The number of attempts to validate schema. This includes both successful and failed calls + +**Labels:** + +- `kong.keg.record.part`: The part of the record (key or value) (Possible values: `key`, `value`) +- `kong.keg.result`: The result of the operation (success or failure) (Possible values: `success`, `fail`) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.topic_alias.conflict` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** Emitted (value=1) when a topic alias shadows a physical topic name. + +**Labels:** + +- `kong.keg.kafka.topic_alias.name`: The alias name that caused the conflict +- `kong.keg.kafka.topic_alias.shadowed_topic`: The physical topic name that is shadowed by the alias +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name + +## Konnect + +### `kong.keg.konnect.analytics.bytes.sent` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** Total number of analytics bytes sent in binary websocket messages to the analytics endpoint + +**Labels:** + +No labels documented. + +### `kong.keg.konnect.analytics.messages.sent` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** Total number of analytics messages sent to the analytics endpoint + +**Labels:** + +No labels documented. + +### `kong.keg.konnect.analytics.queue.dropped` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** Number of events dropped from the queue because the max queue size was reached + +**Labels:** + +No labels documented. + +### `kong.keg.konnect.analytics.queue.events` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** Total number of events added to the queue + +**Labels:** + +No labels documented. + +### `kong.keg.konnect.analytics.websocket.errors` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** Number of times an error occurred on the analytics websocket connection while sending or receiving messages + +**Labels:** + +No labels documented. + +### `kong.keg.konnect.request.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The time sending and receiving the response to a request to the upstream broker + +**Labels:** + +- `kong.keg.konnect.api`: The konnect api operation being performed (Possible values: `fetch_config`, `update_dp_state`) +- `error.type`: The error type encountered on executing http request. Absent if a request was successful. (Possible values: `timeout`, `connect`, `unknown`) +- `http.response.status_code`: The status code of an http response. Absent if a request did not succeed. + +## Lifecycle + +### `kong.keg.lifecycle.component.ready` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** Is a specific component ready, the global service being ready implies that all its components are ready + +**Labels:** + +- `kong.keg.component`: The component name + +### `kong.keg.lifecycle.service.healthy` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** Is the service healthy + +**Labels:** + +No labels documented. + +### `kong.keg.lifecycle.service.ready` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** Is the service ready + +**Labels:** + +No labels documented. + +## Listener + +### `kong.keg.listener.connections.limit` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** The number of allowed connections to the listener + +**Labels:** + +No labels documented. + +## Common Labels + +The following labels are commonly used across multiple metrics: + +### Resource Identification + +- `messaging.destination.name`: Kafka topic name +### Konnect Integration + +- `kong.keg.policy.type`: Policy type in Konnect +- `kong.konnect.policy.id`: Policy ID in Konnect +- `kong.konnect.policy.name`: Policy name in Konnect +- `kong.konnect.listener.id`: Listener ID in Konnect +- `kong.konnect.listener.name`: Listener name in Konnect +### Operations + +- `kong.keg.policy.chain_type`: Type of policy chain (e.g., produce, consume) +- `result`: Result of operation (success, fail, allowed, denied) +- `kong.keg.record.part`: Part being processed (key, value) + +- `origin`: Connection error origin (io, peer, local) + +### HTTP/Network + +- `status_code`: HTTP status code +- `error_code`: Kafka error code +- `api_key`: Kafka API key + + \ No newline at end of file From 5826c9d1bc6a7f079af54b299c59462fb029ba39 Mon Sep 17 00:00:00 2001 From: lena-larionova <54370747+lena-larionova@users.noreply.github.com> Date: Thu, 21 May 2026 09:19:31 -0700 Subject: [PATCH 02/10] add headers reference (#5318) --- app/_indices/event-gateway.yaml | 1 + app/_landing_pages/event-gateway.yaml | 16 +++++ app/_references/event-gateway/1.2/headers.md | 63 ++++++++++++++++++++ app/_references/event-gateway/1.2/metrics.md | 2 + 4 files changed, 82 insertions(+) create mode 100644 app/_references/event-gateway/1.2/headers.md diff --git a/app/_indices/event-gateway.yaml b/app/_indices/event-gateway.yaml index ad83777204..0b3a8b78f1 100644 --- a/app/_indices/event-gateway.yaml +++ b/app/_indices/event-gateway.yaml @@ -52,4 +52,5 @@ sections: - path: /event-gateway/configuration/ - path: /event-gateway/expressions/ - path: /event-gateway/metrics/ + - path: /event-gateway/headers/ - path: /event-gateway/known-limitations/ diff --git a/app/_landing_pages/event-gateway.yaml b/app/_landing_pages/event-gateway.yaml index a878dd69fc..2839974ca1 100644 --- a/app/_landing_pages/event-gateway.yaml +++ b/app/_landing_pages/event-gateway.yaml @@ -414,6 +414,22 @@ rows: url: /event-gateway/entities/ align: end + - blocks: + - type: card + config: + title: "Metrics and headers" + description: | + Reference for the metrics and headers added or exposed by {{site.event_gateway}} + icon: /assets/icons/document-list.svg + ctas: + - text: Metrics reference + url: /event-gateway/metrics/ + align: end + - text: Headers reference + url: /event-gateway/headers/ + align: end + + - blocks: - type: card config: diff --git a/app/_references/event-gateway/1.2/headers.md b/app/_references/event-gateway/1.2/headers.md new file mode 100644 index 0000000000..6d94d287cd --- /dev/null +++ b/app/_references/event-gateway/1.2/headers.md @@ -0,0 +1,63 @@ +--- +# **Auto-generated** - Do not edit manually. See https://github.com/kong-gateway/event-gateway/blob/main/api/headers.md + +title: "{{site.event_gateway}} headers" + +description: Reference for all headers added or interpreted by {{site.event_gateway}}. + +related_resources: + - text: "{{site.event_gateway}}" + url: /event-gateway/ + - text: "{{site.event_gateway}} metrics" + url: /event-gateway/metrics/ +--- + +This document lists every Kafka record header {{site.event_gateway}} may add or interpret. + +## enc + +### `kong/enc` + +Encryption metadata: which of record key/value is encrypted and the key id(s) used. Also carries the (deduplicated) list of key ids referenced by per-field encrypted payloads when the `field_encryption` sub-policy is configured (see MADR 046). The `decrypt` and `field_decryption` policies read this to know which key(s) to fetch. + +|Field |Value | +|:------------|:---------| +|Encoding |`binary` (schema: `EncryptionMetadata`)| +|Max size |_unbounded_| +|Producer |encrypt policy / field_encryption sub-policy| +|Consumer |decrypt policy / field_decryption sub-policy| +|Direction |`both`| +|Visibility |`internal`| + +## policy + +### `kong/policy-failure-{konnect_id}` + +Policy failure marker, written when a policy's failure mode is `mark`. The placeholder is the Konnect id of the failing policy; the value is the failure reason text. + +|Field |Value | +|:------------|:---------| +|Encoding |`utf8`| +|Max size |512 bytes| +|Producer |policy framework (mark failure mode)| +|Consumer |external (downstream consumer)| +|Direction |`both`| +|Visibility |`external`| + +## sverr + +### `kong/sverr-{part}` + +{:.info} +> **Deprecated.** Replaced by the unified policy-failure marker header `kong/policy-failure-{konnect_id}`. This header will be removed in the future. + +Legacy schema-validation failure marker. The placeholder is `key` or `value`; the value is the producer's client id. + +|Field |Value | +|:------------|:---------| +|Encoding |`utf8`| +|Max size |_unbounded_| +|Producer |schema_validation policy (mark_record_on_failure)| +|Consumer |external (downstream consumer)| +|Direction |`both`| +|Visibility |`external`| diff --git a/app/_references/event-gateway/1.2/metrics.md b/app/_references/event-gateway/1.2/metrics.md index 5cfd16618c..c7b5e68c25 100644 --- a/app/_references/event-gateway/1.2/metrics.md +++ b/app/_references/event-gateway/1.2/metrics.md @@ -10,6 +10,8 @@ related_resources: url: /event-gateway/ - text: Set up observability for {{site.event_gateway_short}} url: /how-to/event-gateway/configure-observability-with-otel/ + - text: "{{site.event_gateway}} headers" + url: /event-gateway/headers/ --- From e6408c57333bd4a90829a038b008a2ab9df730a8 Mon Sep 17 00:00:00 2001 From: Jakub Dyszkiewicz Date: Tue, 2 Jun 2026 21:10:54 +0200 Subject: [PATCH 03/10] feat(event-gw): SASL PLAIN fetch Kong Identity principal (#5365) * feat(event-gw): SASL PLAIN fetch Kong Identity principal * move directory to prereq and make a bit more generic Signed-off-by: Diana <75819066+cloudjumpercat@users.noreply.github.com> * add how to to index, fix validate wording Signed-off-by: Diana <75819066+cloudjumpercat@users.noreply.github.com> * Add identity product to how to Signed-off-by: Diana <75819066+cloudjumpercat@users.noreply.github.com> --------- Signed-off-by: Diana <75819066+cloudjumpercat@users.noreply.github.com> Co-authored-by: Diana <75819066+cloudjumpercat@users.noreply.github.com> --- .../modify-headers/index.md | 2 + .../kong-identity-metadata-integration.md | 382 ++++++++++++++++++ app/_indices/event-gateway.yaml | 1 + 3 files changed, 385 insertions(+) create mode 100644 app/_how-tos/event-gateway/kong-identity-metadata-integration.md diff --git a/app/_event_gateway_policies/modify-headers/index.md b/app/_event_gateway_policies/modify-headers/index.md index c677e91688..e883e90bfb 100644 --- a/app/_event_gateway_policies/modify-headers/index.md +++ b/app/_event_gateway_policies/modify-headers/index.md @@ -55,6 +55,8 @@ rows: description: If a record fits a specific condition, add a custom header of your choice. - use_case: "[Tutorial: Filter Kafka records by classification headers](/event-gateway/filter-records-by-classification/)" description: Use a [Schema Validation policy](/event-gateway/policies/schema-validation-produce/) to parse JSON records, and use a nested Modify Headers policy to add a header to specific records. + - use_case: "[Tutorial: Enrich Kafka SASL PLAIN connections with Kong Identity principal metadata](/event-gateway/kong-identity-metadata-integration/)" + description: Look up the SASL-authenticated principal in a Kong Identity directory, and add a header to records based on the principal's metadata. {% endtable %} diff --git a/app/_how-tos/event-gateway/kong-identity-metadata-integration.md b/app/_how-tos/event-gateway/kong-identity-metadata-integration.md new file mode 100644 index 0000000000..946ce665bf --- /dev/null +++ b/app/_how-tos/event-gateway/kong-identity-metadata-integration.md @@ -0,0 +1,382 @@ +--- +title: Enrich Kafka SASL PLAIN connections with Kong Identity principal metadata +content_type: how_to +breadcrumbs: + - /event-gateway/ + +permalink: /event-gateway/kong-identity-metadata-integration/ + +products: + - event-gateway + - identity + +works_on: + - konnect + +tags: + - event-gateway + - kafka + +description: "Look up Kong Identity principal metadata from a SASL-authenticated Kafka connection and use it to drive {{site.event_gateway}} policies." + +tldr: + q: How do I use Kong Identity principal metadata in {{site.event_gateway_short}} policies? + a: | + 1. Create a Kong Identity directory, principal with metadata, and a `custom` identity keyed by the SASL username. + 1. Configure a virtual cluster with `sasl_plain` `passthrough` authentication and `fetch_kong_identity_principal` pointing at the directory. + 1. Create a Modify Headers policy with a condition on `context.auth.principal.metadata`. + 1. Produce and consume a record to see the policy fire. + +tools: + - konnect-api + +prereqs: + skip_product: true + inline: + - title: Install kafkactl + position: before + include_content: knep/kafkactl + - title: Kong Identity directory + include_content: prereqs/kong-identity-directory + icon_url: /assets/icons/kong-identity.svg + +cleanup: + inline: + - title: Clean up {{site.event_gateway}} resources + include_content: cleanup/products/event-gateway + icon_url: /assets/icons/gateway.svg + +related_resources: + - text: Authenticate {{site.event_gateway}} connections to Kafka using SASL/PLAIN + url: /event-gateway/configure-sasl-plain-backend-cluster-auth/ + - text: Modify Headers policy + url: /event-gateway/policies/modify-headers/ + - text: "{{site.event_gateway_short}} expressions language" + url: /event-gateway/expressions/ + - text: Backend clusters + url: /event-gateway/entities/backend-cluster/ + +min_version: + event-gateway: '1.2.0' + +automated_tests: false +--- + +In this guide, you'll authenticate a Kafka client to a SASL-secured broker through {{site.event_gateway_short}}, look up the connecting principal in a Kong Identity directory by its SASL username, and use the principal's metadata to drive a [Modify Headers policy](/event-gateway/policies/modify-headers/). + +{% mermaid %} +flowchart LR + C[Kafka client] + subgraph EG [" {{site.event_gateway_short}} "] + VC[sasl_plain passthrough
virtual cluster] + end + KI[(Kong Identity
directory)] + subgraph K [Kafka cluster] + L["SASL_PLAINTEXT :9082"] + end + C -->|SASL/PLAIN
user=john| VC + VC -.->|lookup by sasl_username| KI + KI -.->|principal metadata
team=operators| VC + VC -->|SASL/PLAIN passthrough| L + VC -->|record with
x-team header| C +{% endmermaid %} + +## Start the secured Kafka cluster + +Create the JAAS configuration file that defines the SASL/PLAIN credentials: + +```bash +cat <<'EOF' > kafka_server_jaas.conf +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="eventgateway" + password="eventgateway-secret" + user_eventgateway="eventgateway-secret" + user_john="john-secret"; +}; +EOF +``` + +The broker accepts two SASL/PLAIN users: `eventgateway` (used by {{site.event_gateway_short}} itself for broker discovery) and `john` (used by the Kafka client and matched against Kong Identity). + +Create the Docker Compose file: + +```bash +cat <<'EOF' > docker-compose.yaml +{% include_cached _files/event-gateway/docker-compose-sasl.yaml %} +EOF +``` + +The broker exposes a `SASL_PLAINTEXT` listener on port `9082` in the Docker network for {{site.event_gateway_short}} connections, and a `PLAINTEXT` listener on ports `9094`/`9095`/`9096` for direct local access. + +Start the cluster: + +```bash +docker compose up -d +``` + +## Create an {{site.event_gateway_short}} control plane and data plane + +Run the [quickstart script](https://get.konghq.com/event-gateway) to provision a local data plane and configure your environment: + +```bash +curl -Ls https://get.konghq.com/event-gateway | bash -s -- -k $KONNECT_TOKEN -N kafka_event_gateway +``` + +Copy the exported variable into your terminal: + +```bash +export EVENT_GATEWAY_ID=your-gateway-id +``` + +{% include_cached /knep/quickstart-note.md %} + +## Create a principal with team metadata + +Create a principal in the directory and attach the `team` metadata. The Modify Headers policy will read this value at request time: + + +{% konnect_api_request %} +url: /v2/directories/$DIRECTORY_ID/principals +status_code: 201 +method: POST +body: + display_name: john + description: Principal that maps to the john SASL user + metadata: + team: operators +extract_body: + - name: id + variable: PRINCIPAL_ID +capture: + - variable: PRINCIPAL_ID + jq: ".id" +{% endkonnect_api_request %} + + +## Create a custom identity for the SASL username + +Create a `custom` identity that links the principal to the SASL username sent by the Kafka client. {{site.event_gateway_short}} will match the connecting username against the `sasl_username` key: + + +{% konnect_api_request %} +url: /v2/directories/$DIRECTORY_ID/principals/$PRINCIPAL_ID/identities +status_code: 201 +method: POST +body: + type: custom + key: sasl_username + value: john +{% endkonnect_api_request %} + + +## Create the backend cluster + +Create a [backend cluster](/event-gateway/entities/backend-cluster/) configured with the `eventgateway` SASL/PLAIN user. {{site.event_gateway_short}} uses these credentials for its own connection to the broker. Client connections pass through this configuration unchanged because of the virtual cluster's `passthrough` mediation, which you'll configure in the next step: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/backend-clusters +status_code: 201 +method: POST +body: + name: backend_cluster + bootstrap_servers: + - kafka1:9082 + - kafka2:9082 + - kafka3:9082 + authentication: + type: sasl_plain + username: eventgateway + password: eventgateway-secret + tls: + enabled: false +extract_body: + - name: id + variable: BACKEND_CLUSTER_ID +capture: + - variable: BACKEND_CLUSTER_ID + jq: ".id" +{% endkonnect_api_request %} + + +## Create a virtual cluster + +Create a [virtual cluster](/event-gateway/entities/virtual-cluster/) that accepts SASL/PLAIN connections, forwards them unchanged to the broker, and asks {{site.event_gateway_short}} to fetch the principal from the Kong Identity directory by matching the SASL username against the `sasl_username` key: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters +status_code: 201 +method: POST +body: + name: identity_vc + destination: + id: $BACKEND_CLUSTER_ID + dns_label: identity-vc + acl_mode: passthrough + authentication: + - type: sasl_plain + mediation: passthrough + fetch_kong_identity_principal: + directory: kong-identity-directory + fetch_by: + key: sasl_username + failure_mode: error +extract_body: + - name: id + variable: VIRTUAL_CLUSTER_ID +capture: + - variable: VIRTUAL_CLUSTER_ID + jq: ".id" +{% endkonnect_api_request %} + + +The `fetch_kong_identity_principal` block tells {{site.event_gateway_short}} to use the SASL username (in this case, `john`) as the lookup value against identities of key `sasl_username` in the directory. When a match is found, the parent principal's metadata is attached to `context.auth.principal.metadata` for the lifetime of the connection. + +## Create a listener + +Run the following command to create a new [listener](/event-gateway/entities/listener/): + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/listeners +status_code: 201 +method: POST +body: + name: identity_listener + addresses: + - 0.0.0.0 + ports: + - 19092-19095 +extract_body: + - name: id + variable: LISTENER_ID +capture: + - variable: LISTENER_ID + jq: ".id" +{% endkonnect_api_request %} + + +## Create a listener policy + +Add a [Forward to Virtual Cluster](/event-gateway/policies/forward-to-virtual-cluster/) policy that routes the listener to the virtual cluster: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$LISTENER_ID/policies +status_code: 201 +method: POST +body: + type: forward_to_virtual_cluster + name: forward_to_identity_vc + config: + type: port_mapping + advertised_host: localhost + destination: + id: $VIRTUAL_CLUSTER_ID +{% endkonnect_api_request %} + + +## Create the Modify Headers policy + +Add a [Modify Headers](/event-gateway/policies/modify-headers/) policy that sets the `x-team` header on consumed records only when the principal's `team` metadata equals `operators`: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/consume-policies +status_code: 201 +method: POST +body: + type: modify_headers + name: tag-operators-team + condition: context.auth.principal.metadata.team == "operators" + config: + actions: + - op: set + key: x-team + value: operators +{% endkonnect_api_request %} + + +## Configure kafkactl + +Create a `kafkactl.yaml` config file with a `direct` context that talks to the broker's PLAINTEXT listener, and a `vc` context that connects to the virtual cluster using SASL/PLAIN: + + +{% validation custom-command %} +command: | + cat < kafkactl.yaml + contexts: + direct: + brokers: + - localhost:9094 + - localhost:9095 + - localhost:9096 + vc: + brokers: + - localhost:19092 + sasl: + enabled: true + username: john + password: john-secret + EOF +expected: + return_code: 0 +render_output: false +{% endvalidation %} + + +## Create a topic + +Create the `orders` topic using the `direct` context: + + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context direct create topic orders +expected: + return_code: 0 + message: "topic created: orders" +render_output: false +{% endvalidation %} + + +## Validate + +Produce a record through the virtual cluster: + + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context vc produce orders --value="test-message" +expected: + return_code: 0 + message: "message produced (partition=0 offset=0)" +render_output: false +{% endvalidation %} + + +Consume the record back through the virtual cluster with `--print-headers` so you can see the header added by the Modify Headers policy: + + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context vc consume orders --print-headers --from-beginning --exit +expected: + return_code: 0 + message: "x-team:operators#test-message" +render_output: false +{% endvalidation %} + + +The output should contain the `x-team` header: + +```shell +x-team:operators#test-message +``` +{:.no-copy-code} + +{{site.event_gateway_short}} authenticated the client with the broker by passing the SASL/PLAIN credentials straight through, looked up the `john` SASL username in the Kong Identity directory, attached the principal's metadata to the connection, and applied the Modify Headers policy because `context.auth.principal.metadata.team` was `operators`. + +The same principal lookup strategy can be used with all other authentication methods (SASL/SCRAM, SASL/OAUTHBEARER, client certificates). diff --git a/app/_indices/event-gateway.yaml b/app/_indices/event-gateway.yaml index 0b3a8b78f1..c2bb6996a8 100644 --- a/app/_indices/event-gateway.yaml +++ b/app/_indices/event-gateway.yaml @@ -44,6 +44,7 @@ sections: - path: /event-gateway/configure-sasl-plain-backend-cluster-auth/ - path: /event-gateway/configure-mtls-backend-cluster-auth/ - path: /event-gateway/validate-avro-messages-with-schema-registry/ + - path: /event-gateway/kong-identity-metadata-integration/ - title: "References" items: - title: Event Gateway OpenAPI specification From 8db1d34349bd706cc7e38107152a7e3af2d2a9bb Mon Sep 17 00:00:00 2001 From: Joshua Schmid Date: Mon, 8 Jun 2026 19:03:37 +0200 Subject: [PATCH 04/10] feat(keg): topic aliases guide (#5109) --- .github/styles/base/Dictionary.txt | 1 + .../event-gateway/configure-topic-aliases.md | 251 ++++++++++++++++++ app/_indices/event-gateway.yaml | 3 +- app/event-gateway/entities/virtual-cluster.md | 107 ++++++++ app/event-gateway/expressions.md | 20 ++ 5 files changed, 381 insertions(+), 1 deletion(-) create mode 100644 app/_how-tos/event-gateway/configure-topic-aliases.md diff --git a/.github/styles/base/Dictionary.txt b/.github/styles/base/Dictionary.txt index 095289ff87..c4b6a19081 100644 --- a/.github/styles/base/Dictionary.txt +++ b/.github/styles/base/Dictionary.txt @@ -810,6 +810,7 @@ udpingress udpingresses UDPRoute UDPRoutes +uint uid UIs ulimit diff --git a/app/_how-tos/event-gateway/configure-topic-aliases.md b/app/_how-tos/event-gateway/configure-topic-aliases.md new file mode 100644 index 0000000000..7ded10fdc9 --- /dev/null +++ b/app/_how-tos/event-gateway/configure-topic-aliases.md @@ -0,0 +1,251 @@ +--- +title: Configure topic aliases with {{site.event_gateway}} +content_type: how_to +breadcrumbs: + - /event-gateway/ +min_version: + event-gateway: '1.2.0' +permalink: /event-gateway/configure-topic-aliases/ + +products: + - event-gateway + +works_on: + - konnect + +tags: + - event-gateway + - kafka + +description: "Use topic aliases to expose backend Kafka topics under user-friendly names." + +tldr: + q: How can I expose Kafka topics under different names? + a: | + Use topic aliases on a virtual cluster to map client-visible names to backend topic names: + 1. Create a backend cluster connected to your Kafka brokers. + 1. Create a virtual cluster with `topic_aliases` that map friendly names to backend topics. + 1. Create a listener to route traffic to the virtual cluster. + 1. Clients produce and consume using the alias names. + +tools: + - konnect-api + +prereqs: + inline: + - title: Install kafkactl + position: before + include_content: knep/kafkactl + - title: Start a local Kafka cluster + position: before + include_content: knep/docker-compose-start + - title: Configure a kafkactl context + position: before + include_content: knep/kafka-context + +related_resources: + - text: Productize Kafka topics with {{site.event_gateway}} + url: /event-gateway/productize-kafka-topics/ + - text: "{{site.event_gateway_short}} Control Plane API" + url: /api/konnect/event-gateway/ +--- + +## Overview + +Topic aliases let you expose backend Kafka topics under different, client-friendly names. +This is useful when backend topics follow internal naming conventions (like `team-alpha-orders-v2`) that you don't want to expose to clients. + +{% mermaid %} +flowchart LR + A[Kafka client] -->|produces/consumes to 'orders'| B[{{site.event_gateway_short}}] + B -->|resolves to 'team-alpha-orders-v2'| C[Kafka cluster] +{% endmermaid %} + +### Supported operations + +Aliases are a read-only abstraction over physical topics: + +* **Allowed**: produce, fetch, list offsets, consumer group operations, and metadata (`ListTopics`). Both the alias and the original backend topic name appear in metadata responses. +* **Rejected with `InvalidTopicException`**: `CreateTopics`, `DeleteTopics`, `CreatePartitions`, `DeleteRecords`, `AlterPartitionReassignments`, and `ElectLeaders`. Modifying a physical topic through an alias would be surprising because other aliases or clients may also depend on it. + +ACLs are evaluated on the name the client uses, before alias resolution. An ACL on the backend topic does not automatically grant access to its aliases, and vice versa. Each alias name must be granted access explicitly. With `acl_mode: enforce_on_gateway`, a new alias with no matching ACL is blocked by default. + +## Create Kafka topics + +Create the backend topics that we'll expose through aliases: + + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context direct create topic \ + team-alpha-orders-v2 analytics-raw-clicks +expected: + return_code: 0 +render_output: false +{% endvalidation %} + + +## Create a backend cluster + +{% include knep/create-backend-cluster.md insecure=true %} + +## Create a virtual cluster with topic aliases + +Create a virtual cluster that maps friendly alias names to the backend topics: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters +status_code: 201 +method: POST +body: + name: aliased_cluster + dns_label: aliased + destination: + id: $BACKEND_CLUSTER_ID + authentication: + - type: anonymous + acl_mode: passthrough + topic_aliases: + - alias: orders + topic: team-alpha-orders-v2 + - alias: clicks + topic: analytics-raw-clicks +extract_body: + - name: id + variable: VC_ID +capture: + - variable: VC_ID + jq: ".id" +{% endkonnect_api_request %} + + +Clients connecting to this virtual cluster will see `orders` and `clicks` as topic names. The original backend names (`team-alpha-orders-v2`, `analytics-raw-clicks`) also remain accessible. + +## Create a listener + +Create a listener with a port forwarding policy to route traffic to the virtual cluster: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/listeners +status_code: 201 +method: POST +body: + name: alias_listener + addresses: + - 0.0.0.0 + ports: + - 19092-19095 +extract_body: + - name: id + variable: LISTENER_ID +capture: + - variable: LISTENER_ID + jq: ".id" +{% endkonnect_api_request %} + + +Create the port mapping policy: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$LISTENER_ID/policies +status_code: 201 +method: POST +body: + type: forward_to_virtual_cluster + name: forward_to_aliased_cluster + config: + type: port_mapping + advertised_host: localhost + destination: + id: $VC_ID +{% endkonnect_api_request %} + + +## Add a virtual cluster context to kafkactl + +Extend `kafkactl.yaml` with a `vc` context that points at the listener you just created: + + +{% validation custom-command %} +command: | + cat < kafkactl.yaml + contexts: + direct: + brokers: + - localhost:9095 + - localhost:9096 + - localhost:9094 + vc: + brokers: + - localhost:19092 + EOF +expected: + return_code: 0 +render_output: false +{% endvalidation %} + + +## Validate + +### List topics through the virtual cluster + + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context vc list topics +expected: + return_code: 0 + message: orders +render_output: false +{% endvalidation %} + +You should see both the aliases and the original backend topic names: + +```sh +TOPIC PARTITIONS REPLICATION FACTOR +analytics-raw-clicks 1 1 +clicks 1 1 +orders 1 1 +team-alpha-orders-v2 1 1 +``` +{:.no-copy-code} + + +### Produce via alias, consume from backend + +Produce a message using the alias name `orders`: + + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context vc produce orders --value='{"id": 123, "item": "widget"}' +expected: + return_code: 0 +render_output: false +{% endvalidation %} + + +Consume from the backend topic `team-alpha-orders-v2` directly to verify the message arrived: + + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context direct consume team-alpha-orders-v2 --from-beginning --exit +expected: + return_code: 0 + message: '{"id": 123, "item": "widget"}' +render_output: false +{% endvalidation %} + +You should see: + +```sh +{"id": 123, "item": "widget"} +``` +{:.no-copy-code} + + +The message produced to the alias `orders` is stored in the backend topic `team-alpha-orders-v2`. diff --git a/app/_indices/event-gateway.yaml b/app/_indices/event-gateway.yaml index c2bb6996a8..901406bd98 100644 --- a/app/_indices/event-gateway.yaml +++ b/app/_indices/event-gateway.yaml @@ -17,7 +17,7 @@ sections: items: - title: Get started with Event Gateway description: | - Get started with {{site.event_gateway}} by setting up a demo {{site.konnect_short_name}} control plane and data plane, + Get started with {{site.event_gateway}} by setting up a demo {{site.konnect_short_name}} control plane and data plane, then configuring a backend cluster, virtual cluster, listener, and policies with the {{site.event_gateway}} API. url: /event-gateway/get-started/ - title: "Event Gateway Entities" @@ -45,6 +45,7 @@ sections: - path: /event-gateway/configure-mtls-backend-cluster-auth/ - path: /event-gateway/validate-avro-messages-with-schema-registry/ - path: /event-gateway/kong-identity-metadata-integration/ + - path: /event-gateway/configure-topic-aliases/ - title: "References" items: - title: Event Gateway OpenAPI specification diff --git a/app/event-gateway/entities/virtual-cluster.md b/app/event-gateway/entities/virtual-cluster.md index 6ecd7d86c2..765058c093 100644 --- a/app/event-gateway/entities/virtual-cluster.md +++ b/app/event-gateway/entities/virtual-cluster.md @@ -14,6 +14,8 @@ related_resources: url: /event-gateway/entities/backend-cluster/ - text: "Listeners" url: /event-gateway/entities/listener/ + - text: "How-to: Configure topic aliases" + url: /event-gateway/configure-topic-aliases/ tools: - konnect-api @@ -318,6 +320,111 @@ You can also pass an exact list of consumer groups as an array: ] ``` +## Topic aliases {% new_in 1.2 %} + +Topic aliases let you expose a backend Kafka topic under a different, client-facing name. +Clients connecting to the virtual cluster see the alias, while {{site.event_gateway_short}} transparently routes requests to the original backend topic. + +This is useful when you want to: +* Expose internally-named topics (like `team-alpha-orders-v2`) under client-friendly names (like `orders`) +* Migrate clients to renamed backend topics without updating client configuration +* Provide multiple client-facing names for the same backend topic + +Unlike [namespaces](#namespaces), which apply a prefix to every topic and consumer group, topic aliases are explicit one-to-one mappings between an alias and a backend topic. +Both the alias and the original backend topic name remain accessible through the virtual cluster. + +### Map a friendly name to a backend topic + +Define topic aliases on the virtual cluster using the `topic_aliases` field: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters +status_code: 201 +method: POST +body: + name: example-virtual-cluster + destination: + name: example-backend-cluster + authentication: + - type: anonymous + dns_label: virtual-cluster-1 + acl_mode: passthrough + topic_aliases: + - alias: orders + topic: team-alpha-orders-v2 + - alias: clicks + topic: analytics-raw-clicks +{% endkonnect_api_request %} + + +In this example, clients can produce and consume to `orders` and `clicks`. {{site.event_gateway_short}} resolves these to the backend topics `team-alpha-orders-v2` and `analytics-raw-clicks`. + +### Supported operations + +Aliases are a read-only abstraction over physical topics. Read and write operations are forwarded to the backend topic, but topic-modifying operations are rejected. + + +{% table %} +columns: + - title: Operation + key: operation + - title: Behavior + key: behavior +rows: + - operation: "Produce, fetch, list offsets, consumer group operations" + behavior: "Allowed. Requests reference the alias name and are transparently resolved to the backend topic." + - operation: "Metadata (`ListTopics`)" + behavior: "Allowed. Both the alias and the original backend topic name appear in the response." + - operation: "`CreateTopics`, `DeleteTopics`, `CreatePartitions`, `DeleteRecords`, `AlterPartitionReassignments`, `ElectLeaders`" + behavior: "Rejected with `InvalidTopicException` when the request references an alias. This avoids a client unintentionally modifying a physical topic that other aliases or clients depend on." +{% endtable %} + + +[ACLs](/event-gateway/policies/acl/) are evaluated on the name the client uses, before alias resolution. An ACL on the backend topic does not automatically grant access to its aliases, and vice versa. Operators must configure ACLs for each alias name explicitly. With `acl_mode: enforce_on_gateway` (deny-by-default), a new alias with no matching ACL is blocked. + +### Interaction with namespaces + +The `topic` field in `topic_aliases` references namespace-visible names, not raw backend topic names. In the request path, aliases resolve *after* policies but *before* the [namespace](#namespaces) transformer: + +``` +client → policies → alias resolution → namespace → backend +``` +{:.no-copy-code} + +If the virtual cluster has a namespace, the alias must target a name that the namespace already exposes. For example, with `namespace.mode: hide_prefix` and `prefix: analytics_`, `topic: foo` resolves to the backend topic `analytics_foo`. + +A backend topic that the namespace doesn't expose (not matching the prefix and not listed in `additional_topics`) can't be aliased directly. Expose it through the namespace first: + +```yaml +namespace: + mode: hide_prefix + prefix: analytics_ + additional_topics: + - type: exact_list + list: + - team-alpha-orders-v2 +topic_aliases: + - alias: orders + topic: team-alpha-orders-v2 # valid because additional_topics exposes it +``` + +### Policy context + +[Policies](/event-gateway/entities/policy/) attached to a virtual cluster see the alias name, not the backend topic name. In a [CEL match expression](/event-gateway/expressions/), `topic.name` evaluates to the alias name. Policies that match on a backend topic name don't automatically apply to its aliases. Match the alias name explicitly: + +```yaml +condition: topic.name == "orders" +``` + +### Consumer groups + +Avoid consuming from both an alias and its backend topic with the same consumer group. +{{site.event_gateway_short}} treats the alias and the backend topic as distinct topic names that share the same underlying partitions. Mixing them in one consumer group leaves offset commits and partition assignment undefined. +Pick one name per consumer group and stick with it. + +For a full walkthrough, see [Configure topic aliases](/event-gateway/configure-topic-aliases/). + ## Virtual cluster policies Virtual clusters can be modified by policies, which let you do things like modify headers, encrypt and decrypt records, validate record schemas, and much more. diff --git a/app/event-gateway/expressions.md b/app/event-gateway/expressions.md index baa1624454..aac4915a9b 100644 --- a/app/event-gateway/expressions.md +++ b/app/event-gateway/expressions.md @@ -156,6 +156,26 @@ rows: * `condition` field in Produce and Consume policies used as children of Schema Validation example: | `record.value.validated == true` + - variable: | + `record.value.schema` {% new_in 1.2 %} + type: "`object`" + description: | + Registry-supplied schema metadata. Populated when the value was validated by a Confluent schema registry. Fields: + `id` (uint), `version` (uint, when returned by the registry), `format` (`"avro"` or `"json"`), and either + `avro.{name, namespace}` or `json.{title, id}`. Sub-fields are absent when not applicable; use `has()` to test presence. + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.value.schema.format == 'avro' && record.value.schema.avro.name == 'User'` + - variable: | + `record.key.schema` {% new_in 1.2 %} + type: "`object`" + description: | + Same shape as `record.value.schema`, populated when the record key has `schema_validation` configured. + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.key.schema.format == 'json' && record.key.schema.json.title == 'UserKey'` {% endtable %} ### Example expressions From 6cbd79dbe0bee1f72e0dbe6b4c75f00367f8af54 Mon Sep 17 00:00:00 2001 From: Joshua Schmid Date: Tue, 9 Jun 2026 17:43:02 +0200 Subject: [PATCH 05/10] keg: add record schema context (#5502) --- app/event-gateway/expressions.md | 71 ++++++++++++++++++++++++++++---- 1 file changed, 62 insertions(+), 9 deletions(-) diff --git a/app/event-gateway/expressions.md b/app/event-gateway/expressions.md index aac4915a9b..1bbfc7074f 100644 --- a/app/event-gateway/expressions.md +++ b/app/event-gateway/expressions.md @@ -157,21 +157,74 @@ rows: example: | `record.value.validated == true` - variable: | - `record.value.schema` {% new_in 1.2 %} - type: "`object`" + `record.value.schema.id` {% new_in 1.2 %} + type: "`uint`" description: | - Registry-supplied schema metadata. Populated when the value was validated by a Confluent schema registry. Fields: - `id` (uint), `version` (uint, when returned by the registry), `format` (`"avro"` or `"json"`), and either - `avro.{name, namespace}` or `json.{title, id}`. Sub-fields are absent when not applicable; use `has()` to test presence. + Registry-assigned schema ID. Populated when the record value is validated by a Confluent schema registry. availability: | * `condition` field in Produce and Consume policies used as children of Schema Validation example: | - `record.value.schema.format == 'avro' && record.value.schema.avro.name == 'User'` + `record.value.schema.id == 42` - variable: | - `record.key.schema` {% new_in 1.2 %} - type: "`object`" + `record.value.schema.version` {% new_in 1.2 %} + type: "`uint`" description: | - Same shape as `record.value.schema`, populated when the record key has `schema_validation` configured. + Registry-assigned schema version, when returned by the registry. + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.value.schema.version == 3` + - variable: | + `record.value.schema.format` {% new_in 1.2 %} + type: "`string`" + description: | + `"avro"` or `"json"`. + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.value.schema.format == 'avro'` + - variable: | + `record.value.schema.avro.name` {% new_in 1.2 %} + type: "`string`" + description: | + Avro record name (Avro only). + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.value.schema.avro.name == 'User'` + - variable: | + `record.value.schema.avro.namespace` {% new_in 1.2 %} + type: "`string`" + description: | + Avro record namespace (Avro only, when declared on the schema). + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.value.schema.avro.namespace == 'com.example'` + - variable: | + `record.value.schema.json.title` {% new_in 1.2 %} + type: "`string`" + description: | + JSON Schema `title` (JSON only, when declared on the schema). + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.value.schema.json.title == 'Order'` + - variable: | + `record.value.schema.json.id` {% new_in 1.2 %} + type: "`string`" + description: | + JSON Schema `$id` (JSON only, when declared on the schema). + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.value.schema.json.id == 'https://example.com/schemas/order.json'` + - variable: | + `record.key.schema.*` {% new_in 1.2 %} + type: | + same as `record.value.schema.*` + description: | + Same shape as `record.value.schema.*`, populated when the record key has `schema_validation` configured. Sub-fields are absent when not applicable; use `has()` to test presence. availability: | * `condition` field in Produce and Consume policies used as children of Schema Validation example: | From 4fa184ed31de56392fa3f99035462ed14829cbec Mon Sep 17 00:00:00 2001 From: Diana <75819066+cloudjumpercat@users.noreply.github.com> Date: Fri, 12 Jun 2026 10:06:14 -0500 Subject: [PATCH 06/10] Revert "feat(event-gw): SASL PLAIN fetch Kong Identity principal (#5365)" This reverts commit 19ca14964c6957f351de710ff9553393dc6fc44e. --- .../modify-headers/index.md | 2 - .../kong-identity-metadata-integration.md | 382 ------------------ app/_indices/event-gateway.yaml | 1 - 3 files changed, 385 deletions(-) delete mode 100644 app/_how-tos/event-gateway/kong-identity-metadata-integration.md diff --git a/app/_event_gateway_policies/modify-headers/index.md b/app/_event_gateway_policies/modify-headers/index.md index e883e90bfb..c677e91688 100644 --- a/app/_event_gateway_policies/modify-headers/index.md +++ b/app/_event_gateway_policies/modify-headers/index.md @@ -55,8 +55,6 @@ rows: description: If a record fits a specific condition, add a custom header of your choice. - use_case: "[Tutorial: Filter Kafka records by classification headers](/event-gateway/filter-records-by-classification/)" description: Use a [Schema Validation policy](/event-gateway/policies/schema-validation-produce/) to parse JSON records, and use a nested Modify Headers policy to add a header to specific records. - - use_case: "[Tutorial: Enrich Kafka SASL PLAIN connections with Kong Identity principal metadata](/event-gateway/kong-identity-metadata-integration/)" - description: Look up the SASL-authenticated principal in a Kong Identity directory, and add a header to records based on the principal's metadata. {% endtable %} diff --git a/app/_how-tos/event-gateway/kong-identity-metadata-integration.md b/app/_how-tos/event-gateway/kong-identity-metadata-integration.md deleted file mode 100644 index 946ce665bf..0000000000 --- a/app/_how-tos/event-gateway/kong-identity-metadata-integration.md +++ /dev/null @@ -1,382 +0,0 @@ ---- -title: Enrich Kafka SASL PLAIN connections with Kong Identity principal metadata -content_type: how_to -breadcrumbs: - - /event-gateway/ - -permalink: /event-gateway/kong-identity-metadata-integration/ - -products: - - event-gateway - - identity - -works_on: - - konnect - -tags: - - event-gateway - - kafka - -description: "Look up Kong Identity principal metadata from a SASL-authenticated Kafka connection and use it to drive {{site.event_gateway}} policies." - -tldr: - q: How do I use Kong Identity principal metadata in {{site.event_gateway_short}} policies? - a: | - 1. Create a Kong Identity directory, principal with metadata, and a `custom` identity keyed by the SASL username. - 1. Configure a virtual cluster with `sasl_plain` `passthrough` authentication and `fetch_kong_identity_principal` pointing at the directory. - 1. Create a Modify Headers policy with a condition on `context.auth.principal.metadata`. - 1. Produce and consume a record to see the policy fire. - -tools: - - konnect-api - -prereqs: - skip_product: true - inline: - - title: Install kafkactl - position: before - include_content: knep/kafkactl - - title: Kong Identity directory - include_content: prereqs/kong-identity-directory - icon_url: /assets/icons/kong-identity.svg - -cleanup: - inline: - - title: Clean up {{site.event_gateway}} resources - include_content: cleanup/products/event-gateway - icon_url: /assets/icons/gateway.svg - -related_resources: - - text: Authenticate {{site.event_gateway}} connections to Kafka using SASL/PLAIN - url: /event-gateway/configure-sasl-plain-backend-cluster-auth/ - - text: Modify Headers policy - url: /event-gateway/policies/modify-headers/ - - text: "{{site.event_gateway_short}} expressions language" - url: /event-gateway/expressions/ - - text: Backend clusters - url: /event-gateway/entities/backend-cluster/ - -min_version: - event-gateway: '1.2.0' - -automated_tests: false ---- - -In this guide, you'll authenticate a Kafka client to a SASL-secured broker through {{site.event_gateway_short}}, look up the connecting principal in a Kong Identity directory by its SASL username, and use the principal's metadata to drive a [Modify Headers policy](/event-gateway/policies/modify-headers/). - -{% mermaid %} -flowchart LR - C[Kafka client] - subgraph EG [" {{site.event_gateway_short}} "] - VC[sasl_plain passthrough
virtual cluster] - end - KI[(Kong Identity
directory)] - subgraph K [Kafka cluster] - L["SASL_PLAINTEXT :9082"] - end - C -->|SASL/PLAIN
user=john| VC - VC -.->|lookup by sasl_username| KI - KI -.->|principal metadata
team=operators| VC - VC -->|SASL/PLAIN passthrough| L - VC -->|record with
x-team header| C -{% endmermaid %} - -## Start the secured Kafka cluster - -Create the JAAS configuration file that defines the SASL/PLAIN credentials: - -```bash -cat <<'EOF' > kafka_server_jaas.conf -KafkaServer { - org.apache.kafka.common.security.plain.PlainLoginModule required - username="eventgateway" - password="eventgateway-secret" - user_eventgateway="eventgateway-secret" - user_john="john-secret"; -}; -EOF -``` - -The broker accepts two SASL/PLAIN users: `eventgateway` (used by {{site.event_gateway_short}} itself for broker discovery) and `john` (used by the Kafka client and matched against Kong Identity). - -Create the Docker Compose file: - -```bash -cat <<'EOF' > docker-compose.yaml -{% include_cached _files/event-gateway/docker-compose-sasl.yaml %} -EOF -``` - -The broker exposes a `SASL_PLAINTEXT` listener on port `9082` in the Docker network for {{site.event_gateway_short}} connections, and a `PLAINTEXT` listener on ports `9094`/`9095`/`9096` for direct local access. - -Start the cluster: - -```bash -docker compose up -d -``` - -## Create an {{site.event_gateway_short}} control plane and data plane - -Run the [quickstart script](https://get.konghq.com/event-gateway) to provision a local data plane and configure your environment: - -```bash -curl -Ls https://get.konghq.com/event-gateway | bash -s -- -k $KONNECT_TOKEN -N kafka_event_gateway -``` - -Copy the exported variable into your terminal: - -```bash -export EVENT_GATEWAY_ID=your-gateway-id -``` - -{% include_cached /knep/quickstart-note.md %} - -## Create a principal with team metadata - -Create a principal in the directory and attach the `team` metadata. The Modify Headers policy will read this value at request time: - - -{% konnect_api_request %} -url: /v2/directories/$DIRECTORY_ID/principals -status_code: 201 -method: POST -body: - display_name: john - description: Principal that maps to the john SASL user - metadata: - team: operators -extract_body: - - name: id - variable: PRINCIPAL_ID -capture: - - variable: PRINCIPAL_ID - jq: ".id" -{% endkonnect_api_request %} - - -## Create a custom identity for the SASL username - -Create a `custom` identity that links the principal to the SASL username sent by the Kafka client. {{site.event_gateway_short}} will match the connecting username against the `sasl_username` key: - - -{% konnect_api_request %} -url: /v2/directories/$DIRECTORY_ID/principals/$PRINCIPAL_ID/identities -status_code: 201 -method: POST -body: - type: custom - key: sasl_username - value: john -{% endkonnect_api_request %} - - -## Create the backend cluster - -Create a [backend cluster](/event-gateway/entities/backend-cluster/) configured with the `eventgateway` SASL/PLAIN user. {{site.event_gateway_short}} uses these credentials for its own connection to the broker. Client connections pass through this configuration unchanged because of the virtual cluster's `passthrough` mediation, which you'll configure in the next step: - - -{% konnect_api_request %} -url: /v1/event-gateways/$EVENT_GATEWAY_ID/backend-clusters -status_code: 201 -method: POST -body: - name: backend_cluster - bootstrap_servers: - - kafka1:9082 - - kafka2:9082 - - kafka3:9082 - authentication: - type: sasl_plain - username: eventgateway - password: eventgateway-secret - tls: - enabled: false -extract_body: - - name: id - variable: BACKEND_CLUSTER_ID -capture: - - variable: BACKEND_CLUSTER_ID - jq: ".id" -{% endkonnect_api_request %} - - -## Create a virtual cluster - -Create a [virtual cluster](/event-gateway/entities/virtual-cluster/) that accepts SASL/PLAIN connections, forwards them unchanged to the broker, and asks {{site.event_gateway_short}} to fetch the principal from the Kong Identity directory by matching the SASL username against the `sasl_username` key: - - -{% konnect_api_request %} -url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters -status_code: 201 -method: POST -body: - name: identity_vc - destination: - id: $BACKEND_CLUSTER_ID - dns_label: identity-vc - acl_mode: passthrough - authentication: - - type: sasl_plain - mediation: passthrough - fetch_kong_identity_principal: - directory: kong-identity-directory - fetch_by: - key: sasl_username - failure_mode: error -extract_body: - - name: id - variable: VIRTUAL_CLUSTER_ID -capture: - - variable: VIRTUAL_CLUSTER_ID - jq: ".id" -{% endkonnect_api_request %} - - -The `fetch_kong_identity_principal` block tells {{site.event_gateway_short}} to use the SASL username (in this case, `john`) as the lookup value against identities of key `sasl_username` in the directory. When a match is found, the parent principal's metadata is attached to `context.auth.principal.metadata` for the lifetime of the connection. - -## Create a listener - -Run the following command to create a new [listener](/event-gateway/entities/listener/): - - -{% konnect_api_request %} -url: /v1/event-gateways/$EVENT_GATEWAY_ID/listeners -status_code: 201 -method: POST -body: - name: identity_listener - addresses: - - 0.0.0.0 - ports: - - 19092-19095 -extract_body: - - name: id - variable: LISTENER_ID -capture: - - variable: LISTENER_ID - jq: ".id" -{% endkonnect_api_request %} - - -## Create a listener policy - -Add a [Forward to Virtual Cluster](/event-gateway/policies/forward-to-virtual-cluster/) policy that routes the listener to the virtual cluster: - - -{% konnect_api_request %} -url: /v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$LISTENER_ID/policies -status_code: 201 -method: POST -body: - type: forward_to_virtual_cluster - name: forward_to_identity_vc - config: - type: port_mapping - advertised_host: localhost - destination: - id: $VIRTUAL_CLUSTER_ID -{% endkonnect_api_request %} - - -## Create the Modify Headers policy - -Add a [Modify Headers](/event-gateway/policies/modify-headers/) policy that sets the `x-team` header on consumed records only when the principal's `team` metadata equals `operators`: - - -{% konnect_api_request %} -url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/consume-policies -status_code: 201 -method: POST -body: - type: modify_headers - name: tag-operators-team - condition: context.auth.principal.metadata.team == "operators" - config: - actions: - - op: set - key: x-team - value: operators -{% endkonnect_api_request %} - - -## Configure kafkactl - -Create a `kafkactl.yaml` config file with a `direct` context that talks to the broker's PLAINTEXT listener, and a `vc` context that connects to the virtual cluster using SASL/PLAIN: - - -{% validation custom-command %} -command: | - cat < kafkactl.yaml - contexts: - direct: - brokers: - - localhost:9094 - - localhost:9095 - - localhost:9096 - vc: - brokers: - - localhost:19092 - sasl: - enabled: true - username: john - password: john-secret - EOF -expected: - return_code: 0 -render_output: false -{% endvalidation %} - - -## Create a topic - -Create the `orders` topic using the `direct` context: - - -{% validation custom-command %} -command: | - kafkactl -C kafkactl.yaml --context direct create topic orders -expected: - return_code: 0 - message: "topic created: orders" -render_output: false -{% endvalidation %} - - -## Validate - -Produce a record through the virtual cluster: - - -{% validation custom-command %} -command: | - kafkactl -C kafkactl.yaml --context vc produce orders --value="test-message" -expected: - return_code: 0 - message: "message produced (partition=0 offset=0)" -render_output: false -{% endvalidation %} - - -Consume the record back through the virtual cluster with `--print-headers` so you can see the header added by the Modify Headers policy: - - -{% validation custom-command %} -command: | - kafkactl -C kafkactl.yaml --context vc consume orders --print-headers --from-beginning --exit -expected: - return_code: 0 - message: "x-team:operators#test-message" -render_output: false -{% endvalidation %} - - -The output should contain the `x-team` header: - -```shell -x-team:operators#test-message -``` -{:.no-copy-code} - -{{site.event_gateway_short}} authenticated the client with the broker by passing the SASL/PLAIN credentials straight through, looked up the `john` SASL username in the Kong Identity directory, attached the principal's metadata to the connection, and applied the Modify Headers policy because `context.auth.principal.metadata.team` was `operators`. - -The same principal lookup strategy can be used with all other authentication methods (SASL/SCRAM, SASL/OAUTHBEARER, client certificates). diff --git a/app/_indices/event-gateway.yaml b/app/_indices/event-gateway.yaml index 901406bd98..64b6abd521 100644 --- a/app/_indices/event-gateway.yaml +++ b/app/_indices/event-gateway.yaml @@ -44,7 +44,6 @@ sections: - path: /event-gateway/configure-sasl-plain-backend-cluster-auth/ - path: /event-gateway/configure-mtls-backend-cluster-auth/ - path: /event-gateway/validate-avro-messages-with-schema-registry/ - - path: /event-gateway/kong-identity-metadata-integration/ - path: /event-gateway/configure-topic-aliases/ - title: "References" items: From 53c635aac2987b1b56709163030cfe31f164479d Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Mon, 15 Jun 2026 15:13:05 +0200 Subject: [PATCH 07/10] feat(egw): encrypt-fields docs (#5355) * feat(egw): encrypt-fields docs * feat(egw): decrypt-fields docs * feat: howto * chore: links * chore: review fixes * copyedit and add cross-links --------- Co-authored-by: lena-larionova --- .../examples/decrypt-with-aws.yml | 30 ++ .../examples/decrypt-with-static-key.yml | 35 ++ .../decrypt-fields/index.md | 85 ++++ .../examples/encrypt-with-aws.yml | 38 ++ .../examples/encrypt-with-static-key.yml | 41 ++ .../encrypt-fields/index.md | 84 ++++ app/_event_gateway_policies/encrypt/index.md | 2 +- ...kafka-message-fields-with-event-gateway.md | 395 ++++++++++++++++++ ...crypt-kafka-messages-with-event-gateway.md | 5 + app/_landing_pages/event-gateway.yaml | 5 +- app/event-gateway/entities/policy.md | 2 + 11 files changed, 719 insertions(+), 3 deletions(-) create mode 100644 app/_event_gateway_policies/decrypt-fields/examples/decrypt-with-aws.yml create mode 100644 app/_event_gateway_policies/decrypt-fields/examples/decrypt-with-static-key.yml create mode 100644 app/_event_gateway_policies/decrypt-fields/index.md create mode 100644 app/_event_gateway_policies/encrypt-fields/examples/encrypt-with-aws.yml create mode 100644 app/_event_gateway_policies/encrypt-fields/examples/encrypt-with-static-key.yml create mode 100644 app/_event_gateway_policies/encrypt-fields/index.md create mode 100644 app/_how-tos/event-gateway/encrypt-kafka-message-fields-with-event-gateway.md diff --git a/app/_event_gateway_policies/decrypt-fields/examples/decrypt-with-aws.yml b/app/_event_gateway_policies/decrypt-fields/examples/decrypt-with-aws.yml new file mode 100644 index 0000000000..d57fbcadad --- /dev/null +++ b/app/_event_gateway_policies/decrypt-fields/examples/decrypt-with-aws.yml @@ -0,0 +1,30 @@ +title: Decrypt fields using an AWS key vault + +description: Decrypt a message field using a specific AWS key vault. + +extended_description: | + Decrypt a message field using a specific AWS key vault. + +weight: 900 + +requirements: + - "A corresponding [Encrypt fields policy](/event-gateway/policies/encrypt-fields/examples/encrypt-with-aws/). + {{site.event_gateway_short}} uses the AWS ARN from the Encrypt fields policy to find the key for the Decrypt fields policy." + +name: decrypt-using-aws +type: decrypt_fields +config: + failure_mode: mark + key_sources: + - type: aws + decrypt_fields: + paths: + - match: personal.ssn + +tools: + - konnect-api + - terraform + - kongctl + +min_version: + event-gateway: '1.2' \ No newline at end of file diff --git a/app/_event_gateway_policies/decrypt-fields/examples/decrypt-with-static-key.yml b/app/_event_gateway_policies/decrypt-fields/examples/decrypt-with-static-key.yml new file mode 100644 index 0000000000..1d20f77f1c --- /dev/null +++ b/app/_event_gateway_policies/decrypt-fields/examples/decrypt-with-static-key.yml @@ -0,0 +1,35 @@ +title: Decrypt fields using a static key + +description: Decrypt a message field using a static key. + + +extended_description: | + Decrypt a message field using a static key. + + The [static key](/event-gateway/entities/static-key/) must be a secret reference to a 256-bit (32-byte) base64-encoded string, or the key itself as a string. + We recommend using secret references to avoid exposing sensitive data in your configuration. + +requirements: + - "A [static key](/event-gateway/entities/static-key/)." + - "A corresponding [Encrypt fields policy](/event-gateway/policies/encrypt-fields/examples/encrypt-with-static-key/) that uses the static key. + {{site.event_gateway_short}} uses the key reference in the message payload set by the Encrypt fields policy and looks for the actual key in `key_sources` to successfully decrypt." + +weight: 900 + +type: decrypt_fields +name: decrypt-static-key +config: + failure_mode: mark + key_sources: + - type: static + decrypt_fields: + paths: + - match: personal.ssn + +tools: + - konnect-api + - terraform + - kongctl + +min_version: + event-gateway: '1.2' \ No newline at end of file diff --git a/app/_event_gateway_policies/decrypt-fields/index.md b/app/_event_gateway_policies/decrypt-fields/index.md new file mode 100644 index 0000000000..085e29651f --- /dev/null +++ b/app/_event_gateway_policies/decrypt-fields/index.md @@ -0,0 +1,85 @@ +--- +title: Decrypt Fields +name: Decrypt Fields +content_type: plugin +description: Decrypt fields of a Kafka message that were previously encrypted using the referenced key. +products: + - event-gateway +works_on: + - konnect +tags: + - event-gateway + +schema: + api: konnect/event-gateway + path: /schemas/EventGatewayParsedRecordDecryptFieldsPolicy + +api_specs: + - konnect/event-gateway + +related_resources: + - text: Encrypt fields policy + url: /event-gateway/policies/encrypt-fields/ + - text: Virtual clusters + url: /event-gateway/entities/virtual-cluster/ + - text: Policies + url: /event-gateway/entities/policy/ + - text: Static keys + url: /event-gateway/entities/static-key/ + - text: Encrypt and decrypt Kafka message fields with {{site.event_gateway}} + url: /event-gateway/encrypt-kafka-message-fields-with-event-gateway/ + +phases: + - consume + +policy_target: virtual_cluster + +categories: + - security + +icon: graph.svg + +min_version: + event-gateway: '1.2' +--- + +The Decrypt Policy decrypts fields of Kafka messages that were previously encrypted using a referenced key. +Use this policy to enforce consistent decryption standards across {{site.event_gateway}} clients. + +This policy uses AES-256-GCM for decryption, therefore keys must be 256 bits long. + +Use this policy together with the [Encrypt fields policy](/event-gateway/policies/encrypt-fields/), which encrypts fields of a message using the same referenced key. + +## Use cases + +Common use cases for the Decrypt fields policy: + + +{% table %} +columns: + - title: Use case + key: use_case + - title: Description + key: description +rows: + - use_case: "[Example: Decrypt a field using a static key](/event-gateway/policies/decrypt-fields/examples/decrypt-with-static-key/)" + description: Decrypt a message field based on a key reference name. + + - use_case: "[Example: Decrypt a field using an AWS key source](/event-gateway/policies/decrypt-fields/examples/decrypt-with-aws/)" + description: Decrypt a message field using an AWS key source. + +{% endtable %} + + +## How it works + +This policy runs during the [consume phase](/event-gateway/entities/policy/#phases), after [schema validation](/event-gateway/policies/schema-validation-consume/) has taken place. + +{% include_cached /knep/encrypt-decrypt-diagram.md %} + +{% include_cached /knep/how-encrypt-works.md %} + +### Key sources + +{% include_cached /knep/key-sources.md name=page.name %} + diff --git a/app/_event_gateway_policies/encrypt-fields/examples/encrypt-with-aws.yml b/app/_event_gateway_policies/encrypt-fields/examples/encrypt-with-aws.yml new file mode 100644 index 0000000000..cafe7ae57e --- /dev/null +++ b/app/_event_gateway_policies/encrypt-fields/examples/encrypt-with-aws.yml @@ -0,0 +1,38 @@ +title: Encrypt fields with AWS Key Vault + +description: Use an AWS Key Vault to encrypt fields of a message value. + +weight: 900 + +requirements: + - "An [AWS KMS key ARN](https://docs.aws.amazon.com/sdk-for-rust/latest/dg/credproviders.html#credproviders-default-credentials-provider-chain)." + - "A corresponding [field decryption policy](/event-gateway/policies/decrypt-fields/examples/decrypt-with-aws/)." + +variables: + policy_id: + description: The UUID of the parent Schema Validation policy. + value: $PARENT_POLICY_ID + key_id: + description: | + The KMS key ARN in this format: `arn:aws:kms:REGION:ACCOUNT_ID:key/KEY_ID` + value: "$AWS_KEY_ARN" + +type: encrypt_fields +name: encrypt-ssn-field +parent_policy_id: ${policy_id} +config: + failure_mode: reject + encrypt_fields: + - paths: + - match: "personal.ssn" + encryption_key: + type: aws + arn: ${key_id} + +tools: + - konnect-api + - terraform + - kongctl + +min_version: + event-gateway: '1.2' \ No newline at end of file diff --git a/app/_event_gateway_policies/encrypt-fields/examples/encrypt-with-static-key.yml b/app/_event_gateway_policies/encrypt-fields/examples/encrypt-with-static-key.yml new file mode 100644 index 0000000000..b16452438b --- /dev/null +++ b/app/_event_gateway_policies/encrypt-fields/examples/encrypt-with-static-key.yml @@ -0,0 +1,41 @@ +title: Encrypt fields using a static key + +description: Encrypt fields of a message value using a static key. + +extended_description: | + Encrypt fields of a message value using a static key. + + The [static key](/event-gateway/entities/static-key/) must be a secret reference to a 256-bit (32-byte) base64-encoded string, or the key itself as a string. + We recommend using secret references to avoid exposing sensitive data in your configuration. + +weight: 900 + +requirements: + - "A [static key](/event-gateway/entities/static-key/) named `my-static-key`." + - "A corresponding [field decryption policy](/event-gateway/policies/decrypt-fields/examples/decrypt-with-static-key/)." + +variables: + policy_id: + description: The UUID of the parent Schema Validation policy. + value: $PARENT_POLICY_ID + +type: encrypt_fields +name: encrypt-ssn-field +parent_policy_id: ${policy_id} +config: + failure_mode: reject + encrypt_fields: + - paths: + - match: "personal.ssn" + encryption_key: + type: static + key: + name: my-static-key + +tools: + - konnect-api + - terraform + - kongctl + +min_version: + event-gateway: '1.2' \ No newline at end of file diff --git a/app/_event_gateway_policies/encrypt-fields/index.md b/app/_event_gateway_policies/encrypt-fields/index.md new file mode 100644 index 0000000000..b8a6e47681 --- /dev/null +++ b/app/_event_gateway_policies/encrypt-fields/index.md @@ -0,0 +1,84 @@ +--- +title: Encrypt Fields +name: Encrypt Fields +content_type: plugin +description: Encrypt fields of Kafka records. +products: + - event-gateway +works_on: + - konnect +tags: + - event-gateway + +schema: + api: konnect/event-gateway + path: /schemas/EventGatewayParsedRecordEncryptFieldsPolicy + +api_specs: + - konnect/event-gateway + +icon: graph.svg + +phases: + - produce + +policy_target: virtual_cluster + +categories: + - security + +related_resources: + - text: Decrypt Fields policy + url: /event-gateway/policies/decrypt-fields/ + - text: Virtual clusters + url: /event-gateway/entities/virtual-cluster/ + - text: Policies + url: /event-gateway/entities/policy/ + - text: Static keys + url: /event-gateway/entities/static-key/ + - text: Encrypt and decrypt Kafka message fields with {{site.event_gateway}} + url: /event-gateway/encrypt-kafka-message-fields-with-event-gateway/ + +min_version: + event-gateway: '1.2' +--- + +The Encrypt Fields policy is used to encrypt fields of Kafka messages that have been validated to conform to a schema. + +This policy uses AES-256-GCM for encryption, therefore keys must be 256 bits long. + +Use the Encrypt Fields policy together with the [Decrypt Fields policy](/event-gateway/policies/decrypt-fields/), which decrypts fields of a message using the same referenced key, to enforce standards for encryption across {{site.event_gateway}} clients. + +## Use cases + +Common use cases for the Encrypt Fields policy: + + +{% table %} +columns: + - title: Use case + key: use_case + - title: Description + key: description +rows: + - use_case: "[Encrypt a message field using a static key](/event-gateway/policies/encrypt-fields/examples/encrypt-with-static-key/)" + description: Encrypt a message field using a static key. + + - use_case: "[Encrypt a message field using an AWS key source](/event-gateway/policies/encrypt-fields/examples/encrypt-with-aws/)" + description: Encrypt a message field using an AWS key source. + +{% endtable %} + + +## How it works + +This policy runs during the [produce phase](/event-gateway/entities/policy/#phases), after [schema validation](/event-gateway/policies/schema-validation-produce/) has taken place. + +{% include_cached /knep/encrypt-decrypt-diagram.md %} + +{% include_cached /knep/how-encrypt-works.md %} + +### Key sources + +{% include_cached /knep/key-sources.md name=page.name %} + diff --git a/app/_event_gateway_policies/encrypt/index.md b/app/_event_gateway_policies/encrypt/index.md index bf678492d1..69ca2e62e3 100644 --- a/app/_event_gateway_policies/encrypt/index.md +++ b/app/_event_gateway_policies/encrypt/index.md @@ -76,4 +76,4 @@ This policy runs during the [produce phase](/event-gateway/entities/policy/#phas ### Key sources -{% include_cached /knep/key-sources.md name=page.name %} \ No newline at end of file +{% include_cached /knep/key-sources.md name=page.name %} diff --git a/app/_how-tos/event-gateway/encrypt-kafka-message-fields-with-event-gateway.md b/app/_how-tos/event-gateway/encrypt-kafka-message-fields-with-event-gateway.md new file mode 100644 index 0000000000..968924a2e5 --- /dev/null +++ b/app/_how-tos/event-gateway/encrypt-kafka-message-fields-with-event-gateway.md @@ -0,0 +1,395 @@ +--- +title: Encrypt and decrypt Kafka fields in message values with {{site.event_gateway}} +permalink: /event-gateway/encrypt-kafka-message-fields-with-event-gateway/ +content_type: how_to +breadcrumbs: + - /event-gateway/ + +products: + - event-gateway + +works_on: + - konnect + +tags: + - kafka + +description: Use this tutorial to encrypt and decrypt Kafka fields in Kafka message values with {{site.event_gateway}} using a static key. + +tldr: + q: How can I encrypt and decrypt Kafka specific fields of message values with {{site.event_gateway}}? + a: | + Generate a key and create a [static key](/event-gateway/entities/static-key/) entity, then create [field encryption](/event-gateway/policies/encrypt-fields/) and [field decryption](/event-gateway/policies/decrypt-fields/) policies to enable message encryption and decryption. + +tools: + - konnect-api + +prereqs: + inline: + - title: Install kafkactl + position: before + content: | + Install [kafkactl](https://github.com/deviceinsight/kafkactl?tab=readme-ov-file#installation). You'll need it to interact with Kafka clusters. + + - title: Start a local Kafka cluster + position: before + include_content: knep/docker-compose-start + +cleanup: + inline: + - title: Clean up {{site.event_gateway}} resources + include_content: cleanup/products/event-gateway + icon_url: /assets/icons/gateway.svg + +related_resources: + - text: "{{site.event_gateway_short}} Control Plane API" + url: /api/konnect/event-gateway/ + - text: Event Gateway + url: /event-gateway/ + - text: Static keys + url: /event-gateway/entities/static-key/ + - text: Encrypt Fields policy + url: /event-gateway/policies/encrypt-fields/ + - text: Decrypt Fields policy + url: /event-gateway/policies/decrypt-fields/ + - text: Encrypt and decrypt Kafka messages + url: /event-gateway/encrypt-kafka-messages-with-event-gateway/ +--- + +{:.info} +> If you need to encrypt and decrypt whole Kafka messages instead of specific fields, use the Decrypt and Encrypt policies. +See [Encrypt and decrypt Kafka messages](/event-gateway/encrypt-kafka-messages-with-event-gateway/) for a complete how-to guide. + +## Configure a Kafka cluster + +Now that we've configured the proxy, let's make sure the Kafka cluster is ready. + +In your local environment, set up the `kafkactl.yaml` config file for your Kafka cluster: + +{% validation custom-command %} +command: | + cat < kafkactl.yaml + contexts: + direct: + brokers: + - localhost:9095 + - localhost:9096 + - localhost:9094 + vc: + brokers: + - localhost:19092 + EOF +expected: + return_code: 0 +render_output: false +{% endvalidation %} + +## Add a backend cluster + +Run the following command to create a new backend cluster: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/backend-clusters +status_code: 201 +method: POST +body: + name: default_backend_cluster + bootstrap_servers: + - kafka1:9092 + - kafka2:9092 + - kafka3:9092 + authentication: + type: anonymous + insecure_allow_anonymous_virtual_cluster_auth: true + tls: + enabled: false +extract_body: + - name: id + variable: BACKEND_CLUSTER_ID +capture: + - variable: BACKEND_CLUSTER_ID + jq: ".id" +{% endkonnect_api_request %} + + +## Add a virtual cluster + +Run the following command to create a new virtual cluster associated with our backend cluster: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters +status_code: 201 +method: POST +body: + name: example_virtual_cluster + destination: + id: $BACKEND_CLUSTER_ID + dns_label: vcluster-1 + authentication: + - type: anonymous + acl_mode: passthrough +extract_body: + - name: id + variable: VIRTUAL_CLUSTER_ID +capture: + - variable: VIRTUAL_CLUSTER_ID + jq: ".id" +{% endkonnect_api_request %} + + + +## Add a listener + +A [listener](/event-gateway/entities/listener/) represents hostname-port or IP-port combinations that connect to TCP sockets. +In this example, we're going to use port mapping, so we need to expose a range of ports. + +Run the following command to create a new listener: + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/listeners +status_code: 201 +method: POST +body: + name: example_listener + addresses: + - 0.0.0.0 + ports: + - 19092-19095 +extract_body: + - name: id + variable: LISTENER_ID +capture: + - variable: LISTENER_ID + jq: ".id" +{% endkonnect_api_request %} + + +## Add a listener policy + +The listener needs a policy to tell it how to process requests and what to do with them. +In this example, we're going to use the [Forward to Virtual Cluster](/event-gateway/policies/forward-to-virtual-cluster/) policy, +which will forward requests based on a defined mapping to our virtual cluster. + +Run the following command to add the listener policy: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$LISTENER_ID/policies +status_code: 201 +method: POST +body: + type: forward_to_virtual_cluster + name: forward + config: + type: port_mapping + advertised_host: localhost + destination: + id: $VIRTUAL_CLUSTER_ID +{% endkonnect_api_request %} + + +For demo purposes, we're using port mapping, which assigns each Kafka broker to a dedicated port on the {{site.event_gateway_short}}. +In production, we recommend using [SNI routing](/event-gateway/architecture/#hostname-mapping) instead. + +## Generate a key + +Use OpenSSL to generate the key that will be used to encrypt and decrypt messages: + + +{% env_variables %} +MY_KEY: $(openssl rand -base64 32) +{% endenv_variables %} + + +## Add a static key + +Run the following command to create a new [static key](/event-gateway/entities/static-key/) named `my-key` with the key we generated: + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/static-keys +status_code: 201 +method: POST +body: + name: my-key + value: $MY_KEY +{% endkonnect_api_request %} + + +## Create a Schema Validation produce policy + +Create a [Schema Validation policy](/event-gateway/policies/schema-validation-produce/) that validates that all produced values are JSON encoded. + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies +status_code: 201 +method: POST +body: + type: schema_validation + name: produce_validate_json + config: + type: json + value_validation_action: reject +extract_body: + - name: id + variable: PRODUCE_SCHEMA_VALIDATION_ID +capture: + - variable: PRODUCE_SCHEMA_VALIDATION_ID + jq: ".id" +{% endkonnect_api_request %} + + +The `value_validation_action: reject` setting ensures that the entire batch containing an invalid message is rejected, and the producer receives an error. +Alternatively, you can use `mark`, which passes the message to the broker but adds a `kong/sverr-value` header to flag it as invalid. + +## Add a field encryption policy + +Use the following command to create a [field encryption policy](/event-gateway/policies/encrypt-fields/) to enable encryption of one field in the JSON value: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies +status_code: 201 +method: POST +body: + name: encrypt-fields-static-key + parent_policy_id: $PRODUCE_SCHEMA_VALIDATION_ID + type: encrypt_fields + config: + failure_mode: reject + encrypt_fields: + - paths: + - match: "personal.ssn" + encryption_key: + type: static + key: + name: my-key +{% endkonnect_api_request %} + + +## Create a Schema Validation consume policy + +Create a [Schema Validation policy](/event-gateway/policies/schema-validation-consume/) that validates that all consumed values are JSON encoded. + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/consume-policies +status_code: 201 +method: POST +body: + type: schema_validation + name: consume_validate_json + config: + type: json + value_validation_action: mark +extract_body: + - name: id + variable: CONSUME_SCHEMA_VALIDATION_ID +capture: + - variable: CONSUME_SCHEMA_VALIDATION_ID + jq: ".id" +{% endkonnect_api_request %} + + +The `value_validation_action: mark` passes the message to the broker but adds a `kong/sverr-value` header to flag it as invalid. + +## Add a field decryption policy + +Use the following command to create a [field decryption policy](/event-gateway/policies/decrypt-fields/) to enable decryption of one field in the JSON value: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/consume-policies +status_code: 201 +method: POST +body: + name: decrypt-fields-static-key + parent_policy_id: $CONSUME_SCHEMA_VALIDATION_ID + type: decrypt_fields + config: + failure_mode: error + key_sources: + - type: static + decrypt_fields: + paths: + - match: "personal.ssn" +{% endkonnect_api_request %} + + +## Validate + +Let's check that the encryption/decryption works. +First, create a topic using the `direct` context, which is a direct connection to our Kafka cluster: + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context direct create topic my-test-topic +expected: + message: "topic created: my-test-topic" + return_code: 0 +render_output: false +{% endvalidation %} + +Produce a message using the `vc` context which should encrypt the message: +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context vc produce my-test-topic --value='{"personal": {"ssn": "100-00-00001"}}' +expected: + message: "message produced (partition=0 offset=0)" + return_code: 0 +render_output: false +{% endvalidation %} + + +You should see the following response: +```shell +message produced (partition=0 offset=0) +``` +{:.no-copy-code} + +Now let's verify that the message was encrypted by consuming the message directly: + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context direct consume my-test-topic --exit --output json --from-beginning --print-headers +expected: + message: '"kong/enc": "\u0000\u0004\u0000-static://' + return_code: 0 +render_output: false +{% endvalidation %} + +You should see the following response: +```json +{ + "Partition": 0, + "Offset": 0, + "Headers": { + "kong/enc": "\u0000\u0004\u0000-static://" + }, + "Value": "{\"personal\":{\"ssn\":\"AHry69Jl4oJzafOlu/xOjVa37hpfYTAVXoAolj94NoBQSKz7dkEF/gg=\"}}" +} +``` +{:.no-copy-code} + +The field encryption policy appends a `kong/enc` header to each message. This header identifies the encryption key by its ID. + +Now let's verify that the field decryption policy works by consuming the message through the virtual cluster: + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context vc consume my-test-topic --from-beginning --exit +expected: + message: '{"personal": {"ssn": "100-00-00001"}}' + return_code: 0 +render_output: false +{% endvalidation %} + +The output should look like this, with the value decrypted: + +```json +{"personal": {"ssn": "100-00-00001"}} +``` +{:.no-copy-code} + diff --git a/app/_how-tos/event-gateway/encrypt-kafka-messages-with-event-gateway.md b/app/_how-tos/event-gateway/encrypt-kafka-messages-with-event-gateway.md index 570dfb9f65..00084e851d 100644 --- a/app/_how-tos/event-gateway/encrypt-kafka-messages-with-event-gateway.md +++ b/app/_how-tos/event-gateway/encrypt-kafka-messages-with-event-gateway.md @@ -52,8 +52,13 @@ related_resources: url: /event-gateway/policies/encrypt/ - text: Decrypt policy url: /event-gateway/policies/decrypt/ + - text: Encrypt and decrypt Kafka message fields + url: /event-gateway/encrypt-kafka-message-fields-with-event-gateway/ --- +{:.info} +> If you need to encrypt and decrypt Kafka message fields instead of entire messages, use the Decrypt Fields and Encrypt Fields policies. +See [Encrypt and decrypt Kafka message fields](/event-gateway/encrypt-kafka-message-fields-with-event-gateway/) for a complete how-to guide. ## Configure a Kafka cluster diff --git a/app/_landing_pages/event-gateway.yaml b/app/_landing_pages/event-gateway.yaml index 2839974ca1..7798e1d9c4 100644 --- a/app/_landing_pages/event-gateway.yaml +++ b/app/_landing_pages/event-gateway.yaml @@ -268,10 +268,11 @@ rows: - outcome: | Demonstrate compliance with data privacy regulations through auditable, enforceable controls feature: | - [Encryption policy](/event-gateway/policies/encrypt/) and - [Decryption policy](/event-gateway/policies/decrypt/) + [Encryption policy](/event-gateway/policies/encrypt/), [Decryption policy](/event-gateway/policies/decrypt/), + [Encrypt Fields policy](/event-gateway/policies/encrypt-fields/), [Decrypt Fields policy](/event-gateway/policies/decrypt-fields/) guide: | [Encrypt and decrypt Kafka messages](/event-gateway/encrypt-kafka-messages-with-event-gateway/) + or [encrypt and decrypt Kafka message fields](/event-gateway/encrypt-kafka-message-fields-with-event-gateway/) - usecase: | **Simplify your Kafka deployment** because it's too complicated and expensive outcomes: diff --git a/app/event-gateway/entities/policy.md b/app/event-gateway/entities/policy.md index 877d75751a..3cf225ed58 100644 --- a/app/event-gateway/entities/policy.md +++ b/app/event-gateway/entities/policy.md @@ -159,10 +159,12 @@ rows: - parent: "[Schema Validation produce](/event-gateway/policies/schema-validation-produce/)" nested: | * [Modify Headers](/event-gateway/policies/modify-headers/) + * [Encrypt Fields](/event-gateway/policies/encrypt-fields/) - parent: "[Schema Validation consume](/event-gateway/policies/schema-validation-consume/)" nested: | * [Modify Headers](/event-gateway/policies/modify-headers/) * [Skip Records](/event-gateway/policies/skip-record/) + * [Decrypt Fields](/event-gateway/policies/decrypt-fields/) {% endtable %} From 492fa016c97105fa752df0ea3b445f970b43249b Mon Sep 17 00:00:00 2001 From: lena-larionova <54370747+lena-larionova@users.noreply.github.com> Date: Mon, 15 Jun 2026 08:21:26 -0700 Subject: [PATCH 08/10] update headers, metrics, and bootstrap references for KEG 1.2 (#5570) --- .../event-gateway-bootstrap-schema/1.2.json | 31 +++++++++++++-- app/_references/event-gateway/1.2/metrics.md | 38 ++++++++++++++++--- 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/app/_data/event-gateway-bootstrap-schema/1.2.json b/app/_data/event-gateway-bootstrap-schema/1.2.json index f7eb71d2d5..637716994f 100644 --- a/app/_data/event-gateway-bootstrap-schema/1.2.json +++ b/app/_data/event-gateway-bootstrap-schema/1.2.json @@ -148,8 +148,8 @@ "type": "string" }, "default": [ - "[::1]:8080", - "127.0.0.1:8080" + "127.0.0.1:8080", + "[::1]:8080" ] } }, @@ -292,8 +292,33 @@ ] }, "TracesSampler": { - "description": "Trace sampler strategy.\n\nCorresponds to the `OTEL_TRACES_SAMPLER` environment variable defined in the\n[OpenTelemetry SDK environment variables specification](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration).\n\nCurrently only `parentbased_traceidratio` is supported.\nThis field exists for spec compliance. Support for additional samplers may be added\nin future releases.", + "description": "Trace sampler strategy.\n\nCorresponds to the `OTEL_TRACES_SAMPLER` environment variable defined in the\n[OpenTelemetry SDK environment variables specification](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration).", "oneOf": [ + { + "description": "Always-On sampler", + "type": "string", + "const": "always_on" + }, + { + "description": "Always-Off sampler", + "type": "string", + "const": "always_off" + }, + { + "description": "Trace ID ratio-based sampler.", + "type": "string", + "const": "traceidratio" + }, + { + "description": "Parent-based Always-On sampler.", + "type": "string", + "const": "parentbased_always_on" + }, + { + "description": "Parent-based Always-Off sampler.", + "type": "string", + "const": "parentbased_always_off" + }, { "description": "Parent-based trace ID ratio sampler.", "type": "string", diff --git a/app/_references/event-gateway/1.2/metrics.md b/app/_references/event-gateway/1.2/metrics.md index c7b5e68c25..6c8b0b94a1 100644 --- a/app/_references/event-gateway/1.2/metrics.md +++ b/app/_references/event-gateway/1.2/metrics.md @@ -38,7 +38,7 @@ No labels documented. |Gauge |N/A | -**Description:** The version of the configuration loaded from the control plane +**Description:** Version number of the configuration loaded from the control plane. Updated each time the control plane pushes a new config. **Labels:** @@ -155,12 +155,15 @@ No labels documented. ### `kong.keg.kafka.decrypt.attempts` +{:.info} +> **Deprecated.** Use `kong.keg.kafka.policy.invocations` instead. + |Type |Unit | |:---------|:---------| |Counter |N/A | -**Description:** DEPRECATED - Use kong.keg.kafka.policy.invocations instead. The number of attempts to decrypt records. This includes both successful and failed calls +**Description:** The number of attempts to decrypt records. This includes both successful and failed calls **Labels:** @@ -173,12 +176,15 @@ No labels documented. ### `kong.keg.kafka.encrypt.attempts` +{:.info} +> **Deprecated.** Use `kong.keg.kafka.policy.invocations` instead. + |Type |Unit | |:---------|:---------| |Counter |N/A | -**Description:** DEPRECATED - Use kong.keg.kafka.policy.invocations instead. The number of attempts to encrypt records. This includes both successful and failed calls +**Description:** The number of attempts to encrypt records. This includes both successful and failed calls **Labels:** @@ -191,12 +197,15 @@ No labels documented. ### `kong.keg.kafka.kscheme.attempts` +{:.info} +> **Deprecated.** Use `kong.keg.kafka.policy.invocations` instead. + |Type |Unit | |:---------|:---------| |Counter |N/A | -**Description:** DEPRECATED - Use kong.keg.kafka.policy.invocations instead. The number of attempts to run kscheme scripts. This includes both successful and failed calls +**Description:** The number of attempts to run kscheme scripts. This includes both successful and failed calls **Labels:** @@ -443,12 +452,15 @@ No labels documented. ### `kong.keg.kafka.schema.validation.attempts` +{:.info} +> **Deprecated.** Use `kong.keg.kafka.policy.invocations` instead. + |Type |Unit | |:---------|:---------| |Counter |N/A | -**Description:** DEPRECATED - Use kong.keg.kafka.policy.invocations instead. The number of attempts to validate schema. This includes both successful and failed calls +**Description:** The number of attempts to validate schema. This includes both successful and failed calls **Labels:** @@ -480,6 +492,9 @@ No labels documented. ### `kong.keg.konnect.analytics.bytes.sent` +{:.info} +> **Deprecated.** Use `kong.keg.konnect.analytics.sent` instead: the standard unit `By` belongs in metric metadata, not in the name. + |Type |Unit | |:---------|:---------| |Counter |N/A | @@ -530,6 +545,19 @@ No labels documented. No labels documented. +### `kong.keg.konnect.analytics.sent` + +|Type |Unit | +|:---------|:---------| +|Counter |`by` | + + +**Description:** Total bytes sent in binary websocket messages to the analytics endpoint + +**Labels:** + +No labels documented. + ### `kong.keg.konnect.analytics.websocket.errors` |Type |Unit | From 1427eff8e58a3aeadb3409fe9af2aceee78375bc Mon Sep 17 00:00:00 2001 From: Charly Molter Date: Tue, 16 Jun 2026 17:38:43 +0200 Subject: [PATCH 09/10] fix(event-gateway): add a warning about internal headers (#5600) Making this clear to avoid people accidently using the product's reserved namespace EVG-207 Signed-off-by: Charly Molter --- app/_event_gateway_policies/modify-headers/index.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/app/_event_gateway_policies/modify-headers/index.md b/app/_event_gateway_policies/modify-headers/index.md index c677e91688..56196a492d 100644 --- a/app/_event_gateway_policies/modify-headers/index.md +++ b/app/_event_gateway_policies/modify-headers/index.md @@ -37,6 +37,9 @@ icon: graph.svg The Modify Headers policy can set or remove headers on requests. +{:.warning} +> Headers prefixed with `kong/` are reserved for {{site.event_gateway_short}} internal use. The Modify Headers policy doesn't prevent you from reading, writing, or modifying them, but doing so isn't recommended and can interfere with {{site.event_gateway_short}} behavior. For the full list of reserved headers, see the [{{site.event_gateway}} headers reference](/event-gateway/headers/). + ## Use cases Common use cases for the Modify Headers policy: From e9675e3f35cafa8f926ab8878748a0f1d2291f19 Mon Sep 17 00:00:00 2001 From: Charly Molter Date: Thu, 18 Jun 2026 21:00:43 +0200 Subject: [PATCH 10/10] =?UTF-8?q?chore(event-gateway):=20clarify=20that=20?= =?UTF-8?q?we=20don't=20support=20multiple=20header=20v=E2=80=A6=20(#5601)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore(event-gateway): clarify that we don't support multiple header values This is just an existing limitation of our product EVG-208 Signed-off-by: Charly Molter * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * Formatting & clarity edits Co-authored-by: lena-larionova <54370747+lena-larionova@users.noreply.github.com> --------- Signed-off-by: Charly Molter Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Co-authored-by: lena-larionova <54370747+lena-larionova@users.noreply.github.com> --- app/event-gateway/known-limitations.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/app/event-gateway/known-limitations.md b/app/event-gateway/known-limitations.md index a9c3593590..58abeedd9b 100644 --- a/app/event-gateway/known-limitations.md +++ b/app/event-gateway/known-limitations.md @@ -29,4 +29,17 @@ breadcrumbs: * [Compacted topics](https://docs.confluent.io/kafka/design/log_compaction.html#topic-compaction) used with policies and namespaces are untested +## Record headers + +Kafka record headers are multi-valued: the [Kafka protocol](https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers) allows the same header key to appear more than once in a single record, and clients can read every occurrence. + +{{site.event_gateway_short}} models record headers as a single value per key. +When a record is processed by a policy that reads or transforms headers (for example, Encryption, Schema Validation, or Modify Headers policies), records that carry duplicate header keys are collapsed so that only the **last** value for each key is kept. +Earlier values with the same key are dropped. + +This only affects records that {{site.event_gateway_short}} decodes to apply record-level policies. +If your clients rely on multiple headers that share the same key, don't route those topics through header-transforming policies. + +See the [{{site.event_gateway}} headers reference](/event-gateway/headers/) for the headers {{site.event_gateway_short}} adds and interprets. +