diff --git a/.github/styles/base/Dictionary.txt b/.github/styles/base/Dictionary.txt index 095289ff87..c4b6a19081 100644 --- a/.github/styles/base/Dictionary.txt +++ b/.github/styles/base/Dictionary.txt @@ -810,6 +810,7 @@ udpingress udpingresses UDPRoute UDPRoutes +uint uid UIs ulimit diff --git a/app/_data/event-gateway-bootstrap-schema/1.2.json b/app/_data/event-gateway-bootstrap-schema/1.2.json new file mode 100644 index 0000000000..637716994f --- /dev/null +++ b/app/_data/event-gateway-bootstrap-schema/1.2.json @@ -0,0 +1,404 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "BootstrapConfig", + "type": "object", + "properties": { + "konnect": { + "description": "Base Konnect configuration.\n\nContains all the necessary information to connect to Konnect,\nincluding TLS configuration for mTLS authentication.", + "type": "object", + "properties": { + "region": { + "description": "The Konnect region to connect to (e.g., \"us\", \"eu\")", + "type": "string" + }, + "domain": { + "description": "The Konnect domain (e.g., \"konghq.com\", \"konghq.tech\")", + "type": "string", + "default": "konghq.com" + }, + "gateway_cluster_id": { + "description": "The Gateway Cluster ID (also known as Control Plane ID)", + "type": "string" + }, + "api_request_timeout": { + "description": "Timeout for API requests to Konnect (in seconds)", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "5s" + }, + "insecure_skip_verify": { + "description": "Skip TLS verification (insecure, for testing only)", + "type": "boolean", + "default": false + }, + "client_cert": { + "description": "Client certificate for mTLS (PEM format)", + "type": [ + "string", + "null" + ] + }, + "client_cert_path": { + "description": "Path to client certificate file", + "type": [ + "string", + "null" + ] + }, + "client_key": { + "description": "Client private key for mTLS (PEM format)", + "anyOf": [ + { + "$ref": "#/$defs/Secret" + }, + { + "type": "null" + } + ] + }, + "client_key_path": { + "description": "Path to client private key file", + "type": [ + "string", + "null" + ] + } + }, + "required": [ + "region", + "gateway_cluster_id" + ], + "allOf": [ + { + "oneOf": [ + { + "required": [ + "client_cert" + ] + }, + { + "required": [ + "client_cert_path" + ] + } + ] + }, + { + "oneOf": [ + { + "required": [ + "client_key" + ] + }, + { + "required": [ + "client_key_path" + ] + } + ] + } + ] + }, + "config_poll_interval": { + "description": "The interval at which to poll for configuration updates\n\nEnvironment variable: `KEG__CONFIG_POLL_INTERVAL`", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "5s" + }, + "runtime": { + "description": "Runtime configuration", + "type": "object", + "properties": { + "drain_duration": { + "description": "Duration to wait for existing connections to drain. It is the time between\n\nEnvironment variable: `KEG__RUNTIME__DRAIN_DURATION`", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "5s" + }, + "shutdown_timeout": { + "description": "Timeout for graceful shutdown. It is the time between sending the shutdown\nsignal and forcefully terminating the process.\n\nEnvironment variable: `KEG__RUNTIME__SHUTDOWN_TIMEOUT`", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "10s" + }, + "health_listener_address_port": { + "description": "Address and port for the health listener.\n\nEnvironment variable: `KEG__RUNTIME__HEALTH_LISTENER_ADDRESS_PORT`", + "type": "array", + "items": { + "type": "string" + }, + "default": [ + "127.0.0.1:8080", + "[::1]:8080" + ] + } + }, + "additionalProperties": false + }, + "observability": { + "description": "Observability configuration", + "type": "object", + "properties": { + "log_flags": { + "description": "Configure the log level. Possible levels are: \"trace\", \"debug\", \"info\", \"warn\", \"error\"\n\nThe format is a comma-separated list of log levels for different modules, e.g., `info,keg=debug,my_module=trace`\nIf only a single level is provided, it applies to all modules.\n\nEnvironment variable: `KEG__OBSERVABILITY__LOG_FLAGS`", + "type": "string", + "default": "info" + }, + "log_format": { + "description": "Configure the log format.\n\nEnvironment variable: `KEG__OBSERVABILITY__LOG_FORMAT`", + "type": "string", + "enum": [ + "logfmt", + "json" + ] + }, + "metrics_rollup_allow_map": { + "description": "Configure the metrics rollup allow map, this is to avoid high cardinality metrics for example `messaging.operation.name`, `messaging.destination.name` or `messaging.destination.partition.id`.\nThe format is `=,|=,`\nNon-allowed values will be mapped to \"other\"\n\nEnvironment variable: `KEG__OBSERVABILITY__METRICS_ROLLUP_ALLOW_MAP`", + "type": "string", + "default": "messaging.operation.name=produce,fetch" + }, + "policy_errors_info_log_interval": { + "description": "Interval for INFO-level logging of policy errors (time-based sampling).\nSet to \"0s\" to disable.\n\nEnvironment variable: `KEG__OBSERVABILITY__POLICY_ERRORS_INFO_LOG_INTERVAL`", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "1s" + }, + "otel": { + "description": "OpenTelemetry configuration", + "$ref": "#/$defs/OtelConfig" + } + }, + "additionalProperties": false + }, + "enable_debug_endpoints": { + "description": "Enables debug endpoints:\n\n- /debug/pprof/allocs endpoint\n Additionally, you need to enable jemalloc profiling like this `MALLOC_CONF=\"prof:true,prof_active:true,lg_prof_sample:19\"`\n\nEnvironment variable: `KEG__ENABLE_DEBUG_ENDPOINTS`", + "type": "boolean", + "default": false + } + }, + "additionalProperties": false, + "$defs": { + "Secret": { + "description": "Wrapper for sensitive values that redacts content in debug output.\n\nThis type wraps the `secrecy` crate's `SecretString` and adds `JsonSchema`\nsupport for configuration schema generation. The underlying value is\nzeroized on drop for memory protection.", + "type": "string" + }, + "OtelConfig": { + "description": "OTLP exporter configuration for all signals.\n\nFollows the [OpenTelemetry OTLP exporter specification](https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/).\nSignal-specific environment variables (e.g., `OTEL_EXPORTER_OTLP_TRACES_*`) take\nprecedence over base variables (`OTEL_EXPORTER_OTLP_*`).\n\nEnvironment variables:\n- Base: `OTEL_EXPORTER_OTLP_ENDPOINT`, `OTEL_EXPORTER_OTLP_PROTOCOL`, `OTEL_EXPORTER_OTLP_TIMEOUT`\n- Traces: `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`, `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`, `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`\n - Sampler: `OTEL_TRACES_SAMPLER`, `OTEL_TRACES_SAMPLER_ARG`\n- Metrics: `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`, `OTEL_EXPORTER_OTLP_METRICS_PROTOCOL`, `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT`\n- Logs: `OTEL_EXPORTER_OTLP_LOGS_ENDPOINT`, `OTEL_EXPORTER_OTLP_LOGS_PROTOCOL`, `OTEL_EXPORTER_OTLP_LOGS_TIMEOUT`", + "type": "object", + "properties": { + "traces": { + "description": "OTLP exporter and sampler configuration for traces.", + "$ref": "#/$defs/OtelTracesConfig" + }, + "metrics": { + "description": "OTLP exporter configuration for metrics.", + "$ref": "#/$defs/OtelMetricsConfig" + }, + "logs": { + "description": "OTLP exporter configuration for logs.", + "$ref": "#/$defs/OtelLogsConfig" + } + } + }, + "OtelTracesConfig": { + "description": "Trace signal configuration: OTLP exporter settings plus sampler configuration.", + "type": "object", + "properties": { + "endpoint": { + "description": "OTLP endpoint URL.\n\nFor HTTP protocols, should include the full path (e.g., `http://localhost:4318/v1/{signal}`).\nFor gRPC, should be the base URL (e.g., `http://localhost:4317`).\n\nIf not set, the signal is considered disabled.", + "type": [ + "string", + "null" + ], + "default": null + }, + "protocol": { + "description": "OTLP transport protocol (grpc, http/binary, http/json).", + "$ref": "#/$defs/OtlpProtocol" + }, + "timeout": { + "description": "Request timeout for the OTLP exporter.", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "10s" + }, + "headers": { + "description": "Additional headers to send with OTLP requests.\n\nHeaders are specified as key-value pairs.", + "type": "object", + "additionalProperties": { + "type": "string" + }, + "default": {} + }, + "traces_sampler": { + "description": "Trace sampler strategy.", + "$ref": "#/$defs/TracesSampler" + }, + "traces_sampler_arg": { + "description": "Trace sampler argument.", + "type": "string", + "default": "1.0" + } + } + }, + "OtlpProtocol": { + "description": "OTLP transport protocol.\n\nWraps `opentelemetry_otlp::Protocol` with serde and schema support.", + "oneOf": [ + { + "description": "gRPC protocol", + "type": "string", + "const": "grpc" + }, + { + "description": "HTTP protocol with binary encoding (default)", + "type": "string", + "const": "http/binary" + }, + { + "description": "HTTP protocol with JSON encoding", + "type": "string", + "const": "http/json" + } + ] + }, + "TracesSampler": { + "description": "Trace sampler strategy.\n\nCorresponds to the `OTEL_TRACES_SAMPLER` environment variable defined in the\n[OpenTelemetry SDK environment variables specification](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration).", + "oneOf": [ + { + "description": "Always-On sampler", + "type": "string", + "const": "always_on" + }, + { + "description": "Always-Off sampler", + "type": "string", + "const": "always_off" + }, + { + "description": "Trace ID ratio-based sampler.", + "type": "string", + "const": "traceidratio" + }, + { + "description": "Parent-based Always-On sampler.", + "type": "string", + "const": "parentbased_always_on" + }, + { + "description": "Parent-based Always-Off sampler.", + "type": "string", + "const": "parentbased_always_off" + }, + { + "description": "Parent-based trace ID ratio sampler.", + "type": "string", + "const": "parentbased_traceidratio" + } + ] + }, + "OtelMetricsConfig": { + "description": "Metrics signal configuration: OTLP exporter settings for metrics.\n\nAll fields from [`OtelSignalConfig`] are available via `#[serde(flatten)]`.", + "type": "object", + "properties": { + "endpoint": { + "description": "OTLP endpoint URL.\n\nFor HTTP protocols, should include the full path (e.g., `http://localhost:4318/v1/{signal}`).\nFor gRPC, should be the base URL (e.g., `http://localhost:4317`).\n\nIf not set, the signal is considered disabled.", + "type": [ + "string", + "null" + ], + "default": null + }, + "protocol": { + "description": "OTLP transport protocol (grpc, http/binary, http/json).", + "$ref": "#/$defs/OtlpProtocol" + }, + "timeout": { + "description": "Request timeout for the OTLP exporter.", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "10s" + }, + "headers": { + "description": "Additional headers to send with OTLP requests.\n\nHeaders are specified as key-value pairs.", + "type": "object", + "additionalProperties": { + "type": "string" + }, + "default": {} + } + } + }, + "OtelLogsConfig": { + "description": "Logs signal configuration: OTLP exporter settings for logs.\n\nAll fields from [`OtelSignalConfig`] are available via `#[serde(flatten)]`.", + "type": "object", + "properties": { + "endpoint": { + "description": "OTLP endpoint URL.\n\nFor HTTP protocols, should include the full path (e.g., `http://localhost:4318/v1/{signal}`).\nFor gRPC, should be the base URL (e.g., `http://localhost:4317`).\n\nIf not set, the signal is considered disabled.", + "type": [ + "string", + "null" + ], + "default": null + }, + "protocol": { + "description": "OTLP transport protocol (grpc, http/binary, http/json).", + "$ref": "#/$defs/OtlpProtocol" + }, + "timeout": { + "description": "Request timeout for the OTLP exporter.", + "type": "string", + "pattern": "^\\d+(\\.\\d+)?(ns|us|µs|ms|s|m|h)$", + "examples": [ + "5s", + "100ms", + "1h30m" + ], + "default": "10s" + }, + "headers": { + "description": "Additional headers to send with OTLP requests.\n\nHeaders are specified as key-value pairs.", + "type": "object", + "additionalProperties": { + "type": "string" + }, + "default": {} + } + } + } + } +} \ No newline at end of file diff --git a/app/_data/products/event-gateway.yml b/app/_data/products/event-gateway.yml index d5f10d05c9..e2144be1d0 100644 --- a/app/_data/products/event-gateway.yml +++ b/app/_data/products/event-gateway.yml @@ -14,4 +14,10 @@ releases: sunset: "2028-03-25" version: "1.1.0" name: "v1" - latest: true \ No newline at end of file + - release: "1.2" + latest: true + release_date: "TBA" + eol: "TBA" + sunset: "TBA" + version: "1.2.0" + name: "v1" \ No newline at end of file diff --git a/app/_event_gateway_policies/decrypt-fields/examples/decrypt-with-aws.yml b/app/_event_gateway_policies/decrypt-fields/examples/decrypt-with-aws.yml new file mode 100644 index 0000000000..d57fbcadad --- /dev/null +++ b/app/_event_gateway_policies/decrypt-fields/examples/decrypt-with-aws.yml @@ -0,0 +1,30 @@ +title: Decrypt fields using an AWS key vault + +description: Decrypt a message field using a specific AWS key vault. + +extended_description: | + Decrypt a message field using a specific AWS key vault. + +weight: 900 + +requirements: + - "A corresponding [Encrypt fields policy](/event-gateway/policies/encrypt-fields/examples/encrypt-with-aws/). + {{site.event_gateway_short}} uses the AWS ARN from the Encrypt fields policy to find the key for the Decrypt fields policy." + +name: decrypt-using-aws +type: decrypt_fields +config: + failure_mode: mark + key_sources: + - type: aws + decrypt_fields: + paths: + - match: personal.ssn + +tools: + - konnect-api + - terraform + - kongctl + +min_version: + event-gateway: '1.2' \ No newline at end of file diff --git a/app/_event_gateway_policies/decrypt-fields/examples/decrypt-with-static-key.yml b/app/_event_gateway_policies/decrypt-fields/examples/decrypt-with-static-key.yml new file mode 100644 index 0000000000..1d20f77f1c --- /dev/null +++ b/app/_event_gateway_policies/decrypt-fields/examples/decrypt-with-static-key.yml @@ -0,0 +1,35 @@ +title: Decrypt fields using a static key + +description: Decrypt a message field using a static key. + + +extended_description: | + Decrypt a message field using a static key. + + The [static key](/event-gateway/entities/static-key/) must be a secret reference to a 256-bit (32-byte) base64-encoded string, or the key itself as a string. + We recommend using secret references to avoid exposing sensitive data in your configuration. + +requirements: + - "A [static key](/event-gateway/entities/static-key/)." + - "A corresponding [Encrypt fields policy](/event-gateway/policies/encrypt-fields/examples/encrypt-with-static-key/) that uses the static key. + {{site.event_gateway_short}} uses the key reference in the message payload set by the Encrypt fields policy and looks for the actual key in `key_sources` to successfully decrypt." + +weight: 900 + +type: decrypt_fields +name: decrypt-static-key +config: + failure_mode: mark + key_sources: + - type: static + decrypt_fields: + paths: + - match: personal.ssn + +tools: + - konnect-api + - terraform + - kongctl + +min_version: + event-gateway: '1.2' \ No newline at end of file diff --git a/app/_event_gateway_policies/decrypt-fields/index.md b/app/_event_gateway_policies/decrypt-fields/index.md new file mode 100644 index 0000000000..085e29651f --- /dev/null +++ b/app/_event_gateway_policies/decrypt-fields/index.md @@ -0,0 +1,85 @@ +--- +title: Decrypt Fields +name: Decrypt Fields +content_type: plugin +description: Decrypt fields of a Kafka message that were previously encrypted using the referenced key. +products: + - event-gateway +works_on: + - konnect +tags: + - event-gateway + +schema: + api: konnect/event-gateway + path: /schemas/EventGatewayParsedRecordDecryptFieldsPolicy + +api_specs: + - konnect/event-gateway + +related_resources: + - text: Encrypt fields policy + url: /event-gateway/policies/encrypt-fields/ + - text: Virtual clusters + url: /event-gateway/entities/virtual-cluster/ + - text: Policies + url: /event-gateway/entities/policy/ + - text: Static keys + url: /event-gateway/entities/static-key/ + - text: Encrypt and decrypt Kafka message fields with {{site.event_gateway}} + url: /event-gateway/encrypt-kafka-message-fields-with-event-gateway/ + +phases: + - consume + +policy_target: virtual_cluster + +categories: + - security + +icon: graph.svg + +min_version: + event-gateway: '1.2' +--- + +The Decrypt Policy decrypts fields of Kafka messages that were previously encrypted using a referenced key. +Use this policy to enforce consistent decryption standards across {{site.event_gateway}} clients. + +This policy uses AES-256-GCM for decryption, therefore keys must be 256 bits long. + +Use this policy together with the [Encrypt fields policy](/event-gateway/policies/encrypt-fields/), which encrypts fields of a message using the same referenced key. + +## Use cases + +Common use cases for the Decrypt fields policy: + + +{% table %} +columns: + - title: Use case + key: use_case + - title: Description + key: description +rows: + - use_case: "[Example: Decrypt a field using a static key](/event-gateway/policies/decrypt-fields/examples/decrypt-with-static-key/)" + description: Decrypt a message field based on a key reference name. + + - use_case: "[Example: Decrypt a field using an AWS key source](/event-gateway/policies/decrypt-fields/examples/decrypt-with-aws/)" + description: Decrypt a message field using an AWS key source. + +{% endtable %} + + +## How it works + +This policy runs during the [consume phase](/event-gateway/entities/policy/#phases), after [schema validation](/event-gateway/policies/schema-validation-consume/) has taken place. + +{% include_cached /knep/encrypt-decrypt-diagram.md %} + +{% include_cached /knep/how-encrypt-works.md %} + +### Key sources + +{% include_cached /knep/key-sources.md name=page.name %} + diff --git a/app/_event_gateway_policies/encrypt-fields/examples/encrypt-with-aws.yml b/app/_event_gateway_policies/encrypt-fields/examples/encrypt-with-aws.yml new file mode 100644 index 0000000000..cafe7ae57e --- /dev/null +++ b/app/_event_gateway_policies/encrypt-fields/examples/encrypt-with-aws.yml @@ -0,0 +1,38 @@ +title: Encrypt fields with AWS Key Vault + +description: Use an AWS Key Vault to encrypt fields of a message value. + +weight: 900 + +requirements: + - "An [AWS KMS key ARN](https://docs.aws.amazon.com/sdk-for-rust/latest/dg/credproviders.html#credproviders-default-credentials-provider-chain)." + - "A corresponding [field decryption policy](/event-gateway/policies/decrypt-fields/examples/decrypt-with-aws/)." + +variables: + policy_id: + description: The UUID of the parent Schema Validation policy. + value: $PARENT_POLICY_ID + key_id: + description: | + The KMS key ARN in this format: `arn:aws:kms:REGION:ACCOUNT_ID:key/KEY_ID` + value: "$AWS_KEY_ARN" + +type: encrypt_fields +name: encrypt-ssn-field +parent_policy_id: ${policy_id} +config: + failure_mode: reject + encrypt_fields: + - paths: + - match: "personal.ssn" + encryption_key: + type: aws + arn: ${key_id} + +tools: + - konnect-api + - terraform + - kongctl + +min_version: + event-gateway: '1.2' \ No newline at end of file diff --git a/app/_event_gateway_policies/encrypt-fields/examples/encrypt-with-static-key.yml b/app/_event_gateway_policies/encrypt-fields/examples/encrypt-with-static-key.yml new file mode 100644 index 0000000000..b16452438b --- /dev/null +++ b/app/_event_gateway_policies/encrypt-fields/examples/encrypt-with-static-key.yml @@ -0,0 +1,41 @@ +title: Encrypt fields using a static key + +description: Encrypt fields of a message value using a static key. + +extended_description: | + Encrypt fields of a message value using a static key. + + The [static key](/event-gateway/entities/static-key/) must be a secret reference to a 256-bit (32-byte) base64-encoded string, or the key itself as a string. + We recommend using secret references to avoid exposing sensitive data in your configuration. + +weight: 900 + +requirements: + - "A [static key](/event-gateway/entities/static-key/) named `my-static-key`." + - "A corresponding [field decryption policy](/event-gateway/policies/decrypt-fields/examples/decrypt-with-static-key/)." + +variables: + policy_id: + description: The UUID of the parent Schema Validation policy. + value: $PARENT_POLICY_ID + +type: encrypt_fields +name: encrypt-ssn-field +parent_policy_id: ${policy_id} +config: + failure_mode: reject + encrypt_fields: + - paths: + - match: "personal.ssn" + encryption_key: + type: static + key: + name: my-static-key + +tools: + - konnect-api + - terraform + - kongctl + +min_version: + event-gateway: '1.2' \ No newline at end of file diff --git a/app/_event_gateway_policies/encrypt-fields/index.md b/app/_event_gateway_policies/encrypt-fields/index.md new file mode 100644 index 0000000000..b8a6e47681 --- /dev/null +++ b/app/_event_gateway_policies/encrypt-fields/index.md @@ -0,0 +1,84 @@ +--- +title: Encrypt Fields +name: Encrypt Fields +content_type: plugin +description: Encrypt fields of Kafka records. +products: + - event-gateway +works_on: + - konnect +tags: + - event-gateway + +schema: + api: konnect/event-gateway + path: /schemas/EventGatewayParsedRecordEncryptFieldsPolicy + +api_specs: + - konnect/event-gateway + +icon: graph.svg + +phases: + - produce + +policy_target: virtual_cluster + +categories: + - security + +related_resources: + - text: Decrypt Fields policy + url: /event-gateway/policies/decrypt-fields/ + - text: Virtual clusters + url: /event-gateway/entities/virtual-cluster/ + - text: Policies + url: /event-gateway/entities/policy/ + - text: Static keys + url: /event-gateway/entities/static-key/ + - text: Encrypt and decrypt Kafka message fields with {{site.event_gateway}} + url: /event-gateway/encrypt-kafka-message-fields-with-event-gateway/ + +min_version: + event-gateway: '1.2' +--- + +The Encrypt Fields policy is used to encrypt fields of Kafka messages that have been validated to conform to a schema. + +This policy uses AES-256-GCM for encryption, therefore keys must be 256 bits long. + +Use the Encrypt Fields policy together with the [Decrypt Fields policy](/event-gateway/policies/decrypt-fields/), which decrypts fields of a message using the same referenced key, to enforce standards for encryption across {{site.event_gateway}} clients. + +## Use cases + +Common use cases for the Encrypt Fields policy: + + +{% table %} +columns: + - title: Use case + key: use_case + - title: Description + key: description +rows: + - use_case: "[Encrypt a message field using a static key](/event-gateway/policies/encrypt-fields/examples/encrypt-with-static-key/)" + description: Encrypt a message field using a static key. + + - use_case: "[Encrypt a message field using an AWS key source](/event-gateway/policies/encrypt-fields/examples/encrypt-with-aws/)" + description: Encrypt a message field using an AWS key source. + +{% endtable %} + + +## How it works + +This policy runs during the [produce phase](/event-gateway/entities/policy/#phases), after [schema validation](/event-gateway/policies/schema-validation-produce/) has taken place. + +{% include_cached /knep/encrypt-decrypt-diagram.md %} + +{% include_cached /knep/how-encrypt-works.md %} + +### Key sources + +{% include_cached /knep/key-sources.md name=page.name %} + diff --git a/app/_event_gateway_policies/encrypt/index.md b/app/_event_gateway_policies/encrypt/index.md index bf678492d1..69ca2e62e3 100644 --- a/app/_event_gateway_policies/encrypt/index.md +++ b/app/_event_gateway_policies/encrypt/index.md @@ -76,4 +76,4 @@ This policy runs during the [produce phase](/event-gateway/entities/policy/#phas ### Key sources -{% include_cached /knep/key-sources.md name=page.name %} \ No newline at end of file +{% include_cached /knep/key-sources.md name=page.name %} diff --git a/app/_event_gateway_policies/modify-headers/index.md b/app/_event_gateway_policies/modify-headers/index.md index c677e91688..56196a492d 100644 --- a/app/_event_gateway_policies/modify-headers/index.md +++ b/app/_event_gateway_policies/modify-headers/index.md @@ -37,6 +37,9 @@ icon: graph.svg The Modify Headers policy can set or remove headers on requests. +{:.warning} +> Headers prefixed with `kong/` are reserved for {{site.event_gateway_short}} internal use. The Modify Headers policy doesn't prevent you from reading, writing, or modifying them, but doing so isn't recommended and can interfere with {{site.event_gateway_short}} behavior. For the full list of reserved headers, see the [{{site.event_gateway}} headers reference](/event-gateway/headers/). + ## Use cases Common use cases for the Modify Headers policy: diff --git a/app/_how-tos/event-gateway/configure-topic-aliases.md b/app/_how-tos/event-gateway/configure-topic-aliases.md new file mode 100644 index 0000000000..7ded10fdc9 --- /dev/null +++ b/app/_how-tos/event-gateway/configure-topic-aliases.md @@ -0,0 +1,251 @@ +--- +title: Configure topic aliases with {{site.event_gateway}} +content_type: how_to +breadcrumbs: + - /event-gateway/ +min_version: + event-gateway: '1.2.0' +permalink: /event-gateway/configure-topic-aliases/ + +products: + - event-gateway + +works_on: + - konnect + +tags: + - event-gateway + - kafka + +description: "Use topic aliases to expose backend Kafka topics under user-friendly names." + +tldr: + q: How can I expose Kafka topics under different names? + a: | + Use topic aliases on a virtual cluster to map client-visible names to backend topic names: + 1. Create a backend cluster connected to your Kafka brokers. + 1. Create a virtual cluster with `topic_aliases` that map friendly names to backend topics. + 1. Create a listener to route traffic to the virtual cluster. + 1. Clients produce and consume using the alias names. + +tools: + - konnect-api + +prereqs: + inline: + - title: Install kafkactl + position: before + include_content: knep/kafkactl + - title: Start a local Kafka cluster + position: before + include_content: knep/docker-compose-start + - title: Configure a kafkactl context + position: before + include_content: knep/kafka-context + +related_resources: + - text: Productize Kafka topics with {{site.event_gateway}} + url: /event-gateway/productize-kafka-topics/ + - text: "{{site.event_gateway_short}} Control Plane API" + url: /api/konnect/event-gateway/ +--- + +## Overview + +Topic aliases let you expose backend Kafka topics under different, client-friendly names. +This is useful when backend topics follow internal naming conventions (like `team-alpha-orders-v2`) that you don't want to expose to clients. + +{% mermaid %} +flowchart LR + A[Kafka client] -->|produces/consumes to 'orders'| B[{{site.event_gateway_short}}] + B -->|resolves to 'team-alpha-orders-v2'| C[Kafka cluster] +{% endmermaid %} + +### Supported operations + +Aliases are a read-only abstraction over physical topics: + +* **Allowed**: produce, fetch, list offsets, consumer group operations, and metadata (`ListTopics`). Both the alias and the original backend topic name appear in metadata responses. +* **Rejected with `InvalidTopicException`**: `CreateTopics`, `DeleteTopics`, `CreatePartitions`, `DeleteRecords`, `AlterPartitionReassignments`, and `ElectLeaders`. Modifying a physical topic through an alias would be surprising because other aliases or clients may also depend on it. + +ACLs are evaluated on the name the client uses, before alias resolution. An ACL on the backend topic does not automatically grant access to its aliases, and vice versa. Each alias name must be granted access explicitly. With `acl_mode: enforce_on_gateway`, a new alias with no matching ACL is blocked by default. + +## Create Kafka topics + +Create the backend topics that we'll expose through aliases: + + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context direct create topic \ + team-alpha-orders-v2 analytics-raw-clicks +expected: + return_code: 0 +render_output: false +{% endvalidation %} + + +## Create a backend cluster + +{% include knep/create-backend-cluster.md insecure=true %} + +## Create a virtual cluster with topic aliases + +Create a virtual cluster that maps friendly alias names to the backend topics: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters +status_code: 201 +method: POST +body: + name: aliased_cluster + dns_label: aliased + destination: + id: $BACKEND_CLUSTER_ID + authentication: + - type: anonymous + acl_mode: passthrough + topic_aliases: + - alias: orders + topic: team-alpha-orders-v2 + - alias: clicks + topic: analytics-raw-clicks +extract_body: + - name: id + variable: VC_ID +capture: + - variable: VC_ID + jq: ".id" +{% endkonnect_api_request %} + + +Clients connecting to this virtual cluster will see `orders` and `clicks` as topic names. The original backend names (`team-alpha-orders-v2`, `analytics-raw-clicks`) also remain accessible. + +## Create a listener + +Create a listener with a port forwarding policy to route traffic to the virtual cluster: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/listeners +status_code: 201 +method: POST +body: + name: alias_listener + addresses: + - 0.0.0.0 + ports: + - 19092-19095 +extract_body: + - name: id + variable: LISTENER_ID +capture: + - variable: LISTENER_ID + jq: ".id" +{% endkonnect_api_request %} + + +Create the port mapping policy: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$LISTENER_ID/policies +status_code: 201 +method: POST +body: + type: forward_to_virtual_cluster + name: forward_to_aliased_cluster + config: + type: port_mapping + advertised_host: localhost + destination: + id: $VC_ID +{% endkonnect_api_request %} + + +## Add a virtual cluster context to kafkactl + +Extend `kafkactl.yaml` with a `vc` context that points at the listener you just created: + + +{% validation custom-command %} +command: | + cat < kafkactl.yaml + contexts: + direct: + brokers: + - localhost:9095 + - localhost:9096 + - localhost:9094 + vc: + brokers: + - localhost:19092 + EOF +expected: + return_code: 0 +render_output: false +{% endvalidation %} + + +## Validate + +### List topics through the virtual cluster + + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context vc list topics +expected: + return_code: 0 + message: orders +render_output: false +{% endvalidation %} + +You should see both the aliases and the original backend topic names: + +```sh +TOPIC PARTITIONS REPLICATION FACTOR +analytics-raw-clicks 1 1 +clicks 1 1 +orders 1 1 +team-alpha-orders-v2 1 1 +``` +{:.no-copy-code} + + +### Produce via alias, consume from backend + +Produce a message using the alias name `orders`: + + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context vc produce orders --value='{"id": 123, "item": "widget"}' +expected: + return_code: 0 +render_output: false +{% endvalidation %} + + +Consume from the backend topic `team-alpha-orders-v2` directly to verify the message arrived: + + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context direct consume team-alpha-orders-v2 --from-beginning --exit +expected: + return_code: 0 + message: '{"id": 123, "item": "widget"}' +render_output: false +{% endvalidation %} + +You should see: + +```sh +{"id": 123, "item": "widget"} +``` +{:.no-copy-code} + + +The message produced to the alias `orders` is stored in the backend topic `team-alpha-orders-v2`. diff --git a/app/_how-tos/event-gateway/encrypt-kafka-message-fields-with-event-gateway.md b/app/_how-tos/event-gateway/encrypt-kafka-message-fields-with-event-gateway.md new file mode 100644 index 0000000000..968924a2e5 --- /dev/null +++ b/app/_how-tos/event-gateway/encrypt-kafka-message-fields-with-event-gateway.md @@ -0,0 +1,395 @@ +--- +title: Encrypt and decrypt Kafka fields in message values with {{site.event_gateway}} +permalink: /event-gateway/encrypt-kafka-message-fields-with-event-gateway/ +content_type: how_to +breadcrumbs: + - /event-gateway/ + +products: + - event-gateway + +works_on: + - konnect + +tags: + - kafka + +description: Use this tutorial to encrypt and decrypt Kafka fields in Kafka message values with {{site.event_gateway}} using a static key. + +tldr: + q: How can I encrypt and decrypt Kafka specific fields of message values with {{site.event_gateway}}? + a: | + Generate a key and create a [static key](/event-gateway/entities/static-key/) entity, then create [field encryption](/event-gateway/policies/encrypt-fields/) and [field decryption](/event-gateway/policies/decrypt-fields/) policies to enable message encryption and decryption. + +tools: + - konnect-api + +prereqs: + inline: + - title: Install kafkactl + position: before + content: | + Install [kafkactl](https://github.com/deviceinsight/kafkactl?tab=readme-ov-file#installation). You'll need it to interact with Kafka clusters. + + - title: Start a local Kafka cluster + position: before + include_content: knep/docker-compose-start + +cleanup: + inline: + - title: Clean up {{site.event_gateway}} resources + include_content: cleanup/products/event-gateway + icon_url: /assets/icons/gateway.svg + +related_resources: + - text: "{{site.event_gateway_short}} Control Plane API" + url: /api/konnect/event-gateway/ + - text: Event Gateway + url: /event-gateway/ + - text: Static keys + url: /event-gateway/entities/static-key/ + - text: Encrypt Fields policy + url: /event-gateway/policies/encrypt-fields/ + - text: Decrypt Fields policy + url: /event-gateway/policies/decrypt-fields/ + - text: Encrypt and decrypt Kafka messages + url: /event-gateway/encrypt-kafka-messages-with-event-gateway/ +--- + +{:.info} +> If you need to encrypt and decrypt whole Kafka messages instead of specific fields, use the Decrypt and Encrypt policies. +See [Encrypt and decrypt Kafka messages](/event-gateway/encrypt-kafka-messages-with-event-gateway/) for a complete how-to guide. + +## Configure a Kafka cluster + +Now that we've configured the proxy, let's make sure the Kafka cluster is ready. + +In your local environment, set up the `kafkactl.yaml` config file for your Kafka cluster: + +{% validation custom-command %} +command: | + cat < kafkactl.yaml + contexts: + direct: + brokers: + - localhost:9095 + - localhost:9096 + - localhost:9094 + vc: + brokers: + - localhost:19092 + EOF +expected: + return_code: 0 +render_output: false +{% endvalidation %} + +## Add a backend cluster + +Run the following command to create a new backend cluster: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/backend-clusters +status_code: 201 +method: POST +body: + name: default_backend_cluster + bootstrap_servers: + - kafka1:9092 + - kafka2:9092 + - kafka3:9092 + authentication: + type: anonymous + insecure_allow_anonymous_virtual_cluster_auth: true + tls: + enabled: false +extract_body: + - name: id + variable: BACKEND_CLUSTER_ID +capture: + - variable: BACKEND_CLUSTER_ID + jq: ".id" +{% endkonnect_api_request %} + + +## Add a virtual cluster + +Run the following command to create a new virtual cluster associated with our backend cluster: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters +status_code: 201 +method: POST +body: + name: example_virtual_cluster + destination: + id: $BACKEND_CLUSTER_ID + dns_label: vcluster-1 + authentication: + - type: anonymous + acl_mode: passthrough +extract_body: + - name: id + variable: VIRTUAL_CLUSTER_ID +capture: + - variable: VIRTUAL_CLUSTER_ID + jq: ".id" +{% endkonnect_api_request %} + + + +## Add a listener + +A [listener](/event-gateway/entities/listener/) represents hostname-port or IP-port combinations that connect to TCP sockets. +In this example, we're going to use port mapping, so we need to expose a range of ports. + +Run the following command to create a new listener: + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/listeners +status_code: 201 +method: POST +body: + name: example_listener + addresses: + - 0.0.0.0 + ports: + - 19092-19095 +extract_body: + - name: id + variable: LISTENER_ID +capture: + - variable: LISTENER_ID + jq: ".id" +{% endkonnect_api_request %} + + +## Add a listener policy + +The listener needs a policy to tell it how to process requests and what to do with them. +In this example, we're going to use the [Forward to Virtual Cluster](/event-gateway/policies/forward-to-virtual-cluster/) policy, +which will forward requests based on a defined mapping to our virtual cluster. + +Run the following command to add the listener policy: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$LISTENER_ID/policies +status_code: 201 +method: POST +body: + type: forward_to_virtual_cluster + name: forward + config: + type: port_mapping + advertised_host: localhost + destination: + id: $VIRTUAL_CLUSTER_ID +{% endkonnect_api_request %} + + +For demo purposes, we're using port mapping, which assigns each Kafka broker to a dedicated port on the {{site.event_gateway_short}}. +In production, we recommend using [SNI routing](/event-gateway/architecture/#hostname-mapping) instead. + +## Generate a key + +Use OpenSSL to generate the key that will be used to encrypt and decrypt messages: + + +{% env_variables %} +MY_KEY: $(openssl rand -base64 32) +{% endenv_variables %} + + +## Add a static key + +Run the following command to create a new [static key](/event-gateway/entities/static-key/) named `my-key` with the key we generated: + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/static-keys +status_code: 201 +method: POST +body: + name: my-key + value: $MY_KEY +{% endkonnect_api_request %} + + +## Create a Schema Validation produce policy + +Create a [Schema Validation policy](/event-gateway/policies/schema-validation-produce/) that validates that all produced values are JSON encoded. + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies +status_code: 201 +method: POST +body: + type: schema_validation + name: produce_validate_json + config: + type: json + value_validation_action: reject +extract_body: + - name: id + variable: PRODUCE_SCHEMA_VALIDATION_ID +capture: + - variable: PRODUCE_SCHEMA_VALIDATION_ID + jq: ".id" +{% endkonnect_api_request %} + + +The `value_validation_action: reject` setting ensures that the entire batch containing an invalid message is rejected, and the producer receives an error. +Alternatively, you can use `mark`, which passes the message to the broker but adds a `kong/sverr-value` header to flag it as invalid. + +## Add a field encryption policy + +Use the following command to create a [field encryption policy](/event-gateway/policies/encrypt-fields/) to enable encryption of one field in the JSON value: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies +status_code: 201 +method: POST +body: + name: encrypt-fields-static-key + parent_policy_id: $PRODUCE_SCHEMA_VALIDATION_ID + type: encrypt_fields + config: + failure_mode: reject + encrypt_fields: + - paths: + - match: "personal.ssn" + encryption_key: + type: static + key: + name: my-key +{% endkonnect_api_request %} + + +## Create a Schema Validation consume policy + +Create a [Schema Validation policy](/event-gateway/policies/schema-validation-consume/) that validates that all consumed values are JSON encoded. + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/consume-policies +status_code: 201 +method: POST +body: + type: schema_validation + name: consume_validate_json + config: + type: json + value_validation_action: mark +extract_body: + - name: id + variable: CONSUME_SCHEMA_VALIDATION_ID +capture: + - variable: CONSUME_SCHEMA_VALIDATION_ID + jq: ".id" +{% endkonnect_api_request %} + + +The `value_validation_action: mark` passes the message to the broker but adds a `kong/sverr-value` header to flag it as invalid. + +## Add a field decryption policy + +Use the following command to create a [field decryption policy](/event-gateway/policies/decrypt-fields/) to enable decryption of one field in the JSON value: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/consume-policies +status_code: 201 +method: POST +body: + name: decrypt-fields-static-key + parent_policy_id: $CONSUME_SCHEMA_VALIDATION_ID + type: decrypt_fields + config: + failure_mode: error + key_sources: + - type: static + decrypt_fields: + paths: + - match: "personal.ssn" +{% endkonnect_api_request %} + + +## Validate + +Let's check that the encryption/decryption works. +First, create a topic using the `direct` context, which is a direct connection to our Kafka cluster: + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context direct create topic my-test-topic +expected: + message: "topic created: my-test-topic" + return_code: 0 +render_output: false +{% endvalidation %} + +Produce a message using the `vc` context which should encrypt the message: +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context vc produce my-test-topic --value='{"personal": {"ssn": "100-00-00001"}}' +expected: + message: "message produced (partition=0 offset=0)" + return_code: 0 +render_output: false +{% endvalidation %} + + +You should see the following response: +```shell +message produced (partition=0 offset=0) +``` +{:.no-copy-code} + +Now let's verify that the message was encrypted by consuming the message directly: + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context direct consume my-test-topic --exit --output json --from-beginning --print-headers +expected: + message: '"kong/enc": "\u0000\u0004\u0000-static://' + return_code: 0 +render_output: false +{% endvalidation %} + +You should see the following response: +```json +{ + "Partition": 0, + "Offset": 0, + "Headers": { + "kong/enc": "\u0000\u0004\u0000-static://" + }, + "Value": "{\"personal\":{\"ssn\":\"AHry69Jl4oJzafOlu/xOjVa37hpfYTAVXoAolj94NoBQSKz7dkEF/gg=\"}}" +} +``` +{:.no-copy-code} + +The field encryption policy appends a `kong/enc` header to each message. This header identifies the encryption key by its ID. + +Now let's verify that the field decryption policy works by consuming the message through the virtual cluster: + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context vc consume my-test-topic --from-beginning --exit +expected: + message: '{"personal": {"ssn": "100-00-00001"}}' + return_code: 0 +render_output: false +{% endvalidation %} + +The output should look like this, with the value decrypted: + +```json +{"personal": {"ssn": "100-00-00001"}} +``` +{:.no-copy-code} + diff --git a/app/_how-tos/event-gateway/encrypt-kafka-messages-with-event-gateway.md b/app/_how-tos/event-gateway/encrypt-kafka-messages-with-event-gateway.md index 570dfb9f65..00084e851d 100644 --- a/app/_how-tos/event-gateway/encrypt-kafka-messages-with-event-gateway.md +++ b/app/_how-tos/event-gateway/encrypt-kafka-messages-with-event-gateway.md @@ -52,8 +52,13 @@ related_resources: url: /event-gateway/policies/encrypt/ - text: Decrypt policy url: /event-gateway/policies/decrypt/ + - text: Encrypt and decrypt Kafka message fields + url: /event-gateway/encrypt-kafka-message-fields-with-event-gateway/ --- +{:.info} +> If you need to encrypt and decrypt Kafka message fields instead of entire messages, use the Decrypt Fields and Encrypt Fields policies. +See [Encrypt and decrypt Kafka message fields](/event-gateway/encrypt-kafka-message-fields-with-event-gateway/) for a complete how-to guide. ## Configure a Kafka cluster diff --git a/app/_indices/event-gateway.yaml b/app/_indices/event-gateway.yaml index ad83777204..64b6abd521 100644 --- a/app/_indices/event-gateway.yaml +++ b/app/_indices/event-gateway.yaml @@ -17,7 +17,7 @@ sections: items: - title: Get started with Event Gateway description: | - Get started with {{site.event_gateway}} by setting up a demo {{site.konnect_short_name}} control plane and data plane, + Get started with {{site.event_gateway}} by setting up a demo {{site.konnect_short_name}} control plane and data plane, then configuring a backend cluster, virtual cluster, listener, and policies with the {{site.event_gateway}} API. url: /event-gateway/get-started/ - title: "Event Gateway Entities" @@ -44,6 +44,7 @@ sections: - path: /event-gateway/configure-sasl-plain-backend-cluster-auth/ - path: /event-gateway/configure-mtls-backend-cluster-auth/ - path: /event-gateway/validate-avro-messages-with-schema-registry/ + - path: /event-gateway/configure-topic-aliases/ - title: "References" items: - title: Event Gateway OpenAPI specification @@ -52,4 +53,5 @@ sections: - path: /event-gateway/configuration/ - path: /event-gateway/expressions/ - path: /event-gateway/metrics/ + - path: /event-gateway/headers/ - path: /event-gateway/known-limitations/ diff --git a/app/_landing_pages/event-gateway.yaml b/app/_landing_pages/event-gateway.yaml index a878dd69fc..7798e1d9c4 100644 --- a/app/_landing_pages/event-gateway.yaml +++ b/app/_landing_pages/event-gateway.yaml @@ -268,10 +268,11 @@ rows: - outcome: | Demonstrate compliance with data privacy regulations through auditable, enforceable controls feature: | - [Encryption policy](/event-gateway/policies/encrypt/) and - [Decryption policy](/event-gateway/policies/decrypt/) + [Encryption policy](/event-gateway/policies/encrypt/), [Decryption policy](/event-gateway/policies/decrypt/), + [Encrypt Fields policy](/event-gateway/policies/encrypt-fields/), [Decrypt Fields policy](/event-gateway/policies/decrypt-fields/) guide: | [Encrypt and decrypt Kafka messages](/event-gateway/encrypt-kafka-messages-with-event-gateway/) + or [encrypt and decrypt Kafka message fields](/event-gateway/encrypt-kafka-message-fields-with-event-gateway/) - usecase: | **Simplify your Kafka deployment** because it's too complicated and expensive outcomes: @@ -414,6 +415,22 @@ rows: url: /event-gateway/entities/ align: end + - blocks: + - type: card + config: + title: "Metrics and headers" + description: | + Reference for the metrics and headers added or exposed by {{site.event_gateway}} + icon: /assets/icons/document-list.svg + ctas: + - text: Metrics reference + url: /event-gateway/metrics/ + align: end + - text: Headers reference + url: /event-gateway/headers/ + align: end + + - blocks: - type: card config: diff --git a/app/_references/event-gateway/1.2/headers.md b/app/_references/event-gateway/1.2/headers.md new file mode 100644 index 0000000000..6d94d287cd --- /dev/null +++ b/app/_references/event-gateway/1.2/headers.md @@ -0,0 +1,63 @@ +--- +# **Auto-generated** - Do not edit manually. See https://github.com/kong-gateway/event-gateway/blob/main/api/headers.md + +title: "{{site.event_gateway}} headers" + +description: Reference for all headers added or interpreted by {{site.event_gateway}}. + +related_resources: + - text: "{{site.event_gateway}}" + url: /event-gateway/ + - text: "{{site.event_gateway}} metrics" + url: /event-gateway/metrics/ +--- + +This document lists every Kafka record header {{site.event_gateway}} may add or interpret. + +## enc + +### `kong/enc` + +Encryption metadata: which of record key/value is encrypted and the key id(s) used. Also carries the (deduplicated) list of key ids referenced by per-field encrypted payloads when the `field_encryption` sub-policy is configured (see MADR 046). The `decrypt` and `field_decryption` policies read this to know which key(s) to fetch. + +|Field |Value | +|:------------|:---------| +|Encoding |`binary` (schema: `EncryptionMetadata`)| +|Max size |_unbounded_| +|Producer |encrypt policy / field_encryption sub-policy| +|Consumer |decrypt policy / field_decryption sub-policy| +|Direction |`both`| +|Visibility |`internal`| + +## policy + +### `kong/policy-failure-{konnect_id}` + +Policy failure marker, written when a policy's failure mode is `mark`. The placeholder is the Konnect id of the failing policy; the value is the failure reason text. + +|Field |Value | +|:------------|:---------| +|Encoding |`utf8`| +|Max size |512 bytes| +|Producer |policy framework (mark failure mode)| +|Consumer |external (downstream consumer)| +|Direction |`both`| +|Visibility |`external`| + +## sverr + +### `kong/sverr-{part}` + +{:.info} +> **Deprecated.** Replaced by the unified policy-failure marker header `kong/policy-failure-{konnect_id}`. This header will be removed in the future. + +Legacy schema-validation failure marker. The placeholder is `key` or `value`; the value is the producer's client id. + +|Field |Value | +|:------------|:---------| +|Encoding |`utf8`| +|Max size |_unbounded_| +|Producer |schema_validation policy (mark_record_on_failure)| +|Consumer |external (downstream consumer)| +|Direction |`both`| +|Visibility |`external`| diff --git a/app/_references/event-gateway/1.2/metrics.md b/app/_references/event-gateway/1.2/metrics.md new file mode 100644 index 0000000000..6c8b0b94a1 --- /dev/null +++ b/app/_references/event-gateway/1.2/metrics.md @@ -0,0 +1,673 @@ +--- +# **Auto-generated** - Do not edit manually. See https://github.com/kong-gateway/event-gateway/blob/main/api/metrics.md + +title: "{{site.event_gateway}} metrics" + +description: Reference for all metrics exposed by {{site.event_gateway}}. + +related_resources: + - text: "{{site.event_gateway}}" + url: /event-gateway/ + - text: Set up observability for {{site.event_gateway_short}} + url: /how-to/event-gateway/configure-observability-with-otel/ + - text: "{{site.event_gateway}} headers" + url: /event-gateway/headers/ +--- + + + +## Config + +### `kong.keg.config.errors` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** Count of errors when loading the config received from the control plane + +**Labels:** + +No labels documented. + +### `kong.keg.config.loaded` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** Version number of the configuration loaded from the control plane. Updated each time the control plane pushes a new config. + +**Labels:** + +No labels documented. + +## Kafka + +### `kong.keg.kafka.acl.attempts` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** Counts the results of every ACL attempt + +**Labels:** + +- `kong.keg.acl.resource_type`: The type of Kafka resource being accessed (Possible values: `transactional_id`, `group`, `topic`, `cluster`) +- `kong.keg.result`: The result of the ACL check (allowed or denied) (Possible values: `allowed`, `denied`) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.auth.virtual_cluster.processing.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The time spent validating client credentials against virtual cluster auth rules. Passthrough rules are always considered successful even if the backend cluster rejects the authentication + +**Labels:** + +- `kong.keg.auth.mechanism`: The authentication mechanism used by the client +- `kong.keg.result`: The result of the operation (success or failure) (Possible values: `success`, `fail`) +- `kong.keg.auth.mediation`: How the proxy mediates authentication between the client and the backend cluster. Empty when no rule matched the requested mechanism. (Possible values: `passthrough`, `validate_forward`, `terminate`, ``) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.backend.connection.errors` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of connection errors to the backend cluster + +**Labels:** + +- `kong.keg.connection.error.origin`: The origin of the connection error (Possible values: `io`, `peer`, `local`) + +### `kong.keg.kafka.backend.roundtrip.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The time spent communicating with backend cluster (send request and receive response) + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.connection.errors` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of proxied connections that resulted in an error + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.listener.id`: The Konnect listener identifier +- `kong.konnect.listener.name`: The Konnect listener name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.connections` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** The number of active proxied connections + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.listener.id`: The Konnect listener identifier +- `kong.konnect.listener.name`: The Konnect listener name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.decrypt.attempts` + +{:.info} +> **Deprecated.** Use `kong.keg.kafka.policy.invocations` instead. + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of attempts to decrypt records. This includes both successful and failed calls + +**Labels:** + +- `kong.keg.result`: The result of the operation (success or failure) (Possible values: `success`, `fail`) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.encrypt.attempts` + +{:.info} +> **Deprecated.** Use `kong.keg.kafka.policy.invocations` instead. + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of attempts to encrypt records. This includes both successful and failed calls + +**Labels:** + +- `kong.keg.result`: The result of the operation (success or failure) (Possible values: `success`, `fail`) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.kscheme.attempts` + +{:.info} +> **Deprecated.** Use `kong.keg.kafka.policy.invocations` instead. + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of attempts to run kscheme scripts. This includes both successful and failed calls + +**Labels:** + +- `kong.keg.result`: The result of the operation (success or failure) (Possible values: `success`, `fail`) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.metadata.update.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The time to update the metadata from the backend broker + +**Labels:** + +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.namespace.topic.conflict` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** Indicates whether the namespace topic mapping encountered conflicts (1) or not (0). + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name + +### `kong.keg.kafka.policy.condition.failures` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of times the policy condition failed to execute due to an error + +**Labels:** + +No labels documented. + +### `kong.keg.kafka.policy.invocation.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The time to process a policy + +**Labels:** + +- `kong.keg.failure_mode`: The configured policy failure mode (Possible values: `error`, `reject`, `passthrough`, `mark`, `skip`) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.policy.invocations` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of policy invocation for policies. This includes both successful and failed invocations + +**Labels:** + +- `kong.keg.failure_mode`: The configured policy failure mode (Possible values: `error`, `reject`, `passthrough`, `mark`, `skip`) +- `kong.keg.result`: The result of the operation (success or failure) (Possible values: `success`, `fail`) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.proxy.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The end-to-end time for the entire proxy operation + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.request.processing.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The time spent processing the received request before forwarding it to the backend cluster + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.request.received` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of requests coming from the client + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.request.sent` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of requests sent to the backend broker + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.response.processing.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The time spent processing the received response before forwarding it to the client + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.response.received` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of responses received from the backend broker + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.response.received.errors` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of response messages received from the backend that contain at least one error. The error_code label represents the lowest error in the response + +**Labels:** + +- `kong.keg.kafka.error_code`: The lowest error code in the response +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.response.sent` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of responses sent by the proxy to the client + +**Labels:** + +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.response.sent.errors` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of response messages sent back to the client that contain at least one error. + +**Labels:** + +- `kong.keg.kafka.error_code`: The lowest error code in the response +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.schema.validation.attempts` + +{:.info} +> **Deprecated.** Use `kong.keg.kafka.policy.invocations` instead. + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** The number of attempts to validate schema. This includes both successful and failed calls + +**Labels:** + +- `kong.keg.record.part`: The part of the record (key or value) (Possible values: `key`, `value`) +- `kong.keg.result`: The result of the operation (success or failure) (Possible values: `success`, `fail`) +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name +- `kong.konnect.backend_cluster.id`: The Konnect backend cluster identifier +- `kong.konnect.backend_cluster.name`: The Konnect backend cluster name +- `kong.keg.component`: The component name + +### `kong.keg.kafka.topic_alias.conflict` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** Emitted (value=1) when a topic alias shadows a physical topic name. + +**Labels:** + +- `kong.keg.kafka.topic_alias.name`: The alias name that caused the conflict +- `kong.keg.kafka.topic_alias.shadowed_topic`: The physical topic name that is shadowed by the alias +- `kong.konnect.virtual_cluster.id`: The Konnect virtual cluster identifier +- `kong.konnect.virtual_cluster.name`: The Konnect virtual cluster name + +## Konnect + +### `kong.keg.konnect.analytics.bytes.sent` + +{:.info} +> **Deprecated.** Use `kong.keg.konnect.analytics.sent` instead: the standard unit `By` belongs in metric metadata, not in the name. + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** Total number of analytics bytes sent in binary websocket messages to the analytics endpoint + +**Labels:** + +No labels documented. + +### `kong.keg.konnect.analytics.messages.sent` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** Total number of analytics messages sent to the analytics endpoint + +**Labels:** + +No labels documented. + +### `kong.keg.konnect.analytics.queue.dropped` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** Number of events dropped from the queue because the max queue size was reached + +**Labels:** + +No labels documented. + +### `kong.keg.konnect.analytics.queue.events` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** Total number of events added to the queue + +**Labels:** + +No labels documented. + +### `kong.keg.konnect.analytics.sent` + +|Type |Unit | +|:---------|:---------| +|Counter |`by` | + + +**Description:** Total bytes sent in binary websocket messages to the analytics endpoint + +**Labels:** + +No labels documented. + +### `kong.keg.konnect.analytics.websocket.errors` + +|Type |Unit | +|:---------|:---------| +|Counter |N/A | + + +**Description:** Number of times an error occurred on the analytics websocket connection while sending or receiving messages + +**Labels:** + +No labels documented. + +### `kong.keg.konnect.request.duration` + +|Type |Unit | +|:---------|:---------| +|Histogram |`seconds` | + + +**Description:** The time sending and receiving the response to a request to the upstream broker + +**Labels:** + +- `kong.keg.konnect.api`: The konnect api operation being performed (Possible values: `fetch_config`, `update_dp_state`) +- `error.type`: The error type encountered on executing http request. Absent if a request was successful. (Possible values: `timeout`, `connect`, `unknown`) +- `http.response.status_code`: The status code of an http response. Absent if a request did not succeed. + +## Lifecycle + +### `kong.keg.lifecycle.component.ready` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** Is a specific component ready, the global service being ready implies that all its components are ready + +**Labels:** + +- `kong.keg.component`: The component name + +### `kong.keg.lifecycle.service.healthy` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** Is the service healthy + +**Labels:** + +No labels documented. + +### `kong.keg.lifecycle.service.ready` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** Is the service ready + +**Labels:** + +No labels documented. + +## Listener + +### `kong.keg.listener.connections.limit` + +|Type |Unit | +|:---------|:---------| +|Gauge |N/A | + + +**Description:** The number of allowed connections to the listener + +**Labels:** + +No labels documented. + +## Common Labels + +The following labels are commonly used across multiple metrics: + +### Resource Identification + +- `messaging.destination.name`: Kafka topic name +### Konnect Integration + +- `kong.keg.policy.type`: Policy type in Konnect +- `kong.konnect.policy.id`: Policy ID in Konnect +- `kong.konnect.policy.name`: Policy name in Konnect +- `kong.konnect.listener.id`: Listener ID in Konnect +- `kong.konnect.listener.name`: Listener name in Konnect +### Operations + +- `kong.keg.policy.chain_type`: Type of policy chain (e.g., produce, consume) +- `result`: Result of operation (success, fail, allowed, denied) +- `kong.keg.record.part`: Part being processed (key, value) + +- `origin`: Connection error origin (io, peer, local) + +### HTTP/Network + +- `status_code`: HTTP status code +- `error_code`: Kafka error code +- `api_key`: Kafka API key + + \ No newline at end of file diff --git a/app/event-gateway/entities/policy.md b/app/event-gateway/entities/policy.md index 877d75751a..3cf225ed58 100644 --- a/app/event-gateway/entities/policy.md +++ b/app/event-gateway/entities/policy.md @@ -159,10 +159,12 @@ rows: - parent: "[Schema Validation produce](/event-gateway/policies/schema-validation-produce/)" nested: | * [Modify Headers](/event-gateway/policies/modify-headers/) + * [Encrypt Fields](/event-gateway/policies/encrypt-fields/) - parent: "[Schema Validation consume](/event-gateway/policies/schema-validation-consume/)" nested: | * [Modify Headers](/event-gateway/policies/modify-headers/) * [Skip Records](/event-gateway/policies/skip-record/) + * [Decrypt Fields](/event-gateway/policies/decrypt-fields/) {% endtable %} diff --git a/app/event-gateway/entities/virtual-cluster.md b/app/event-gateway/entities/virtual-cluster.md index 6ecd7d86c2..765058c093 100644 --- a/app/event-gateway/entities/virtual-cluster.md +++ b/app/event-gateway/entities/virtual-cluster.md @@ -14,6 +14,8 @@ related_resources: url: /event-gateway/entities/backend-cluster/ - text: "Listeners" url: /event-gateway/entities/listener/ + - text: "How-to: Configure topic aliases" + url: /event-gateway/configure-topic-aliases/ tools: - konnect-api @@ -318,6 +320,111 @@ You can also pass an exact list of consumer groups as an array: ] ``` +## Topic aliases {% new_in 1.2 %} + +Topic aliases let you expose a backend Kafka topic under a different, client-facing name. +Clients connecting to the virtual cluster see the alias, while {{site.event_gateway_short}} transparently routes requests to the original backend topic. + +This is useful when you want to: +* Expose internally-named topics (like `team-alpha-orders-v2`) under client-friendly names (like `orders`) +* Migrate clients to renamed backend topics without updating client configuration +* Provide multiple client-facing names for the same backend topic + +Unlike [namespaces](#namespaces), which apply a prefix to every topic and consumer group, topic aliases are explicit one-to-one mappings between an alias and a backend topic. +Both the alias and the original backend topic name remain accessible through the virtual cluster. + +### Map a friendly name to a backend topic + +Define topic aliases on the virtual cluster using the `topic_aliases` field: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters +status_code: 201 +method: POST +body: + name: example-virtual-cluster + destination: + name: example-backend-cluster + authentication: + - type: anonymous + dns_label: virtual-cluster-1 + acl_mode: passthrough + topic_aliases: + - alias: orders + topic: team-alpha-orders-v2 + - alias: clicks + topic: analytics-raw-clicks +{% endkonnect_api_request %} + + +In this example, clients can produce and consume to `orders` and `clicks`. {{site.event_gateway_short}} resolves these to the backend topics `team-alpha-orders-v2` and `analytics-raw-clicks`. + +### Supported operations + +Aliases are a read-only abstraction over physical topics. Read and write operations are forwarded to the backend topic, but topic-modifying operations are rejected. + + +{% table %} +columns: + - title: Operation + key: operation + - title: Behavior + key: behavior +rows: + - operation: "Produce, fetch, list offsets, consumer group operations" + behavior: "Allowed. Requests reference the alias name and are transparently resolved to the backend topic." + - operation: "Metadata (`ListTopics`)" + behavior: "Allowed. Both the alias and the original backend topic name appear in the response." + - operation: "`CreateTopics`, `DeleteTopics`, `CreatePartitions`, `DeleteRecords`, `AlterPartitionReassignments`, `ElectLeaders`" + behavior: "Rejected with `InvalidTopicException` when the request references an alias. This avoids a client unintentionally modifying a physical topic that other aliases or clients depend on." +{% endtable %} + + +[ACLs](/event-gateway/policies/acl/) are evaluated on the name the client uses, before alias resolution. An ACL on the backend topic does not automatically grant access to its aliases, and vice versa. Operators must configure ACLs for each alias name explicitly. With `acl_mode: enforce_on_gateway` (deny-by-default), a new alias with no matching ACL is blocked. + +### Interaction with namespaces + +The `topic` field in `topic_aliases` references namespace-visible names, not raw backend topic names. In the request path, aliases resolve *after* policies but *before* the [namespace](#namespaces) transformer: + +``` +client → policies → alias resolution → namespace → backend +``` +{:.no-copy-code} + +If the virtual cluster has a namespace, the alias must target a name that the namespace already exposes. For example, with `namespace.mode: hide_prefix` and `prefix: analytics_`, `topic: foo` resolves to the backend topic `analytics_foo`. + +A backend topic that the namespace doesn't expose (not matching the prefix and not listed in `additional_topics`) can't be aliased directly. Expose it through the namespace first: + +```yaml +namespace: + mode: hide_prefix + prefix: analytics_ + additional_topics: + - type: exact_list + list: + - team-alpha-orders-v2 +topic_aliases: + - alias: orders + topic: team-alpha-orders-v2 # valid because additional_topics exposes it +``` + +### Policy context + +[Policies](/event-gateway/entities/policy/) attached to a virtual cluster see the alias name, not the backend topic name. In a [CEL match expression](/event-gateway/expressions/), `topic.name` evaluates to the alias name. Policies that match on a backend topic name don't automatically apply to its aliases. Match the alias name explicitly: + +```yaml +condition: topic.name == "orders" +``` + +### Consumer groups + +Avoid consuming from both an alias and its backend topic with the same consumer group. +{{site.event_gateway_short}} treats the alias and the backend topic as distinct topic names that share the same underlying partitions. Mixing them in one consumer group leaves offset commits and partition assignment undefined. +Pick one name per consumer group and stick with it. + +For a full walkthrough, see [Configure topic aliases](/event-gateway/configure-topic-aliases/). + ## Virtual cluster policies Virtual clusters can be modified by policies, which let you do things like modify headers, encrypt and decrypt records, validate record schemas, and much more. diff --git a/app/event-gateway/expressions.md b/app/event-gateway/expressions.md index baa1624454..1bbfc7074f 100644 --- a/app/event-gateway/expressions.md +++ b/app/event-gateway/expressions.md @@ -156,6 +156,79 @@ rows: * `condition` field in Produce and Consume policies used as children of Schema Validation example: | `record.value.validated == true` + - variable: | + `record.value.schema.id` {% new_in 1.2 %} + type: "`uint`" + description: | + Registry-assigned schema ID. Populated when the record value is validated by a Confluent schema registry. + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.value.schema.id == 42` + - variable: | + `record.value.schema.version` {% new_in 1.2 %} + type: "`uint`" + description: | + Registry-assigned schema version, when returned by the registry. + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.value.schema.version == 3` + - variable: | + `record.value.schema.format` {% new_in 1.2 %} + type: "`string`" + description: | + `"avro"` or `"json"`. + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.value.schema.format == 'avro'` + - variable: | + `record.value.schema.avro.name` {% new_in 1.2 %} + type: "`string`" + description: | + Avro record name (Avro only). + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.value.schema.avro.name == 'User'` + - variable: | + `record.value.schema.avro.namespace` {% new_in 1.2 %} + type: "`string`" + description: | + Avro record namespace (Avro only, when declared on the schema). + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.value.schema.avro.namespace == 'com.example'` + - variable: | + `record.value.schema.json.title` {% new_in 1.2 %} + type: "`string`" + description: | + JSON Schema `title` (JSON only, when declared on the schema). + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.value.schema.json.title == 'Order'` + - variable: | + `record.value.schema.json.id` {% new_in 1.2 %} + type: "`string`" + description: | + JSON Schema `$id` (JSON only, when declared on the schema). + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.value.schema.json.id == 'https://example.com/schemas/order.json'` + - variable: | + `record.key.schema.*` {% new_in 1.2 %} + type: | + same as `record.value.schema.*` + description: | + Same shape as `record.value.schema.*`, populated when the record key has `schema_validation` configured. Sub-fields are absent when not applicable; use `has()` to test presence. + availability: | + * `condition` field in Produce and Consume policies used as children of Schema Validation + example: | + `record.key.schema.format == 'json' && record.key.schema.json.title == 'UserKey'` {% endtable %} ### Example expressions diff --git a/app/event-gateway/known-limitations.md b/app/event-gateway/known-limitations.md index a9c3593590..58abeedd9b 100644 --- a/app/event-gateway/known-limitations.md +++ b/app/event-gateway/known-limitations.md @@ -29,4 +29,17 @@ breadcrumbs: * [Compacted topics](https://docs.confluent.io/kafka/design/log_compaction.html#topic-compaction) used with policies and namespaces are untested +## Record headers + +Kafka record headers are multi-valued: the [Kafka protocol](https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers) allows the same header key to appear more than once in a single record, and clients can read every occurrence. + +{{site.event_gateway_short}} models record headers as a single value per key. +When a record is processed by a policy that reads or transforms headers (for example, Encryption, Schema Validation, or Modify Headers policies), records that carry duplicate header keys are collapsed so that only the **last** value for each key is kept. +Earlier values with the same key are dropped. + +This only affects records that {{site.event_gateway_short}} decodes to apply record-level policies. +If your clients rely on multiple headers that share the same key, don't route those topics through header-transforming policies. + +See the [{{site.event_gateway}} headers reference](/event-gateway/headers/) for the headers {{site.event_gateway_short}} adds and interprets. +