This directory provides a Python implementation of the Client-Side Field Level Encryption (CSFLE) demo using Confluent Platform running locally with Docker Compose.
- Docker and Docker Compose
- Python 3.7 or later
- AWS account with KMS access
We will produce personal data to a local Kafka topic in the following format:
```json
{
  "id": "1",
  "name": "Anna",
  "birthday": "2024-02-10",
  "timestamp": "2025-02-10T19:54:21.884Z"
}
```

The `birthday` field will be encrypted using CSFLE with AWS KMS. We'll then consume the data with proper credentials to decrypt it, and simulate unauthorized access to demonstrate the security benefits.
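For reference, a record with this shape can be assembled in Python along the lines of the sketch below (the helper name is made up for illustration; the actual producer script may build records differently):

```python
from datetime import datetime, timezone

def build_personal_data(record_id: str, name: str, birthday: str) -> dict:
    """Assemble a record matching the PersonalData Avro schema.

    `birthday` is a plain string here; with CSFLE, encryption happens
    transparently in the serializer once the PII rule is registered.
    """
    return {
        "id": record_id,
        "name": name,
        "birthday": birthday,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

record = build_personal_data("1", "Anna", "2024-02-10")
```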
Create a virtual environment and install dependencies:
```bash
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```

For detailed instructions on setting up AWS KMS, creating an IAM user, and obtaining credentials, please refer to the AWS KMS Setup section in the parent README.
You'll need:
- AWS KMS Key ARN (e.g., `arn:aws:kms:eu-central-1:123456789:key/xxx-xxx-xxx`)
- AWS Access Key ID
- AWS Secret Access Key
Copy the example environment file from the parent directory and configure it with your credentials:
```bash
cd .. && cp .env.example .env && cd python
```

Edit `.env` with your configuration values:
| Configuration | Environment Variable | Default/Example Value |
|---|---|---|
| Kafka Topic | `KAFKA_TOPIC` | `csfle-demo` |
| Kafka Bootstrap Servers | `KAFKA_BOOTSTRAP_SERVERS` | `localhost:9091` |
| Schema Registry URL | `SCHEMA_REGISTRY_URL` | `http://localhost:8081` |
| AWS KMS Key ARN | `AWS_KMS_KEY_ID` | Your KMS Key ARN |
| AWS KMS Key Name | `AWS_KMS_KEY_NAME` | `csfle-demo-kek` |
| AWS Access Key ID | `AWS_ACCESS_KEY_ID` | Your AWS Access Key |
| AWS Secret Access Key | `AWS_SECRET_ACCESS_KEY` | Your AWS Secret Key |
| Consumer Group ID | `KAFKA_GROUP_ID` | `csfle-demo-consumer-group` |
| Auto Offset Reset | `KAFKA_AUTO_OFFSET_RESET` | `earliest` |
⚠️ Security: Never commit the `.env` file to version control, as it contains sensitive credentials!

💡 Note: Confluent Platform runs locally using the PLAINTEXT protocol (no SASL/SSL) for Kafka and no authentication for Schema Registry. However, AWS KMS is still used for field-level encryption.
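For context, the client configuration this implies is minimal. A sketch (key names follow librdkafka and Schema Registry client conventions; the actual scripts may wire this up differently):

```python
import os

# Minimal client configuration for the local PLAINTEXT setup.
# Variable names mirror the configuration table above; defaults are the
# documented example values.
producer_conf = {
    "bootstrap.servers": os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9091"),
    # No security.protocol / sasl.* keys needed: the local broker uses PLAINTEXT.
}

sr_conf = {
    "url": os.environ.get("SCHEMA_REGISTRY_URL", "http://localhost:8081"),
    # No basic.auth.user.info: the local Schema Registry is unauthenticated.
}
```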
From the parent directory, start Confluent Platform using Docker Compose:
```bash
cd ..
docker compose up -d
```

Wait a few seconds for all services to start. You can check the logs:

```bash
docker compose logs -f
```

You can also access Control Center to monitor the cluster.
Before running the schema registration commands, load your configuration:
```bash
# Make sure you're in the python directory
cd python

# Load environment variables from the parent directory's .env file
set -a
source ../.env
set +a
```

💡 Tip: The `set -a` and `set +a` commands enable and disable automatic export of variables. You only need to source the environment once per shell session. Note: exported variables only affect the current terminal session and don't persist across different terminals.
Register the Avro schema with the PII tag applied to the birthday field:
```bash
curl --location "$SCHEMA_REGISTRY_URL/subjects/$KAFKA_TOPIC-value/versions" \
  --header 'Accept: application/vnd.schemaregistry.v1+json' \
  --header 'Content-Type: application/json' \
  --data '{
    "schemaType": "AVRO",
    "schema": "{ \"name\": \"PersonalData\", \"type\": \"record\", \"namespace\": \"com.csfleExample\", \"fields\": [{\"name\": \"id\", \"type\": \"string\"}, {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"birthday\", \"type\": \"string\", \"confluent:tags\": [\"PII\"]}, {\"name\": \"timestamp\", \"type\": [\"string\", \"null\"]}]}"
  }'
```

💡 Note: Unlike Confluent Cloud, there's no need to create a PII tag in Schema Registry first. The local Schema Registry accepts tags directly in the schema.
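If you'd rather not hand-escape the schema, the same payload can be built programmatically. A sketch in Python (the Schema Registry API expects the `schema` field to be a JSON-encoded *string*, hence the nested `json.dumps`):

```python
import json

# Avro schema with the PII tag on the birthday field.
schema = {
    "name": "PersonalData",
    "type": "record",
    "namespace": "com.csfleExample",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "name", "type": "string"},
        {"name": "birthday", "type": "string", "confluent:tags": ["PII"]},
        {"name": "timestamp", "type": ["string", "null"]},
    ],
}

# The outer payload carries the schema as an escaped JSON string,
# which is exactly what the curl command above encodes by hand.
payload = json.dumps({"schemaType": "AVRO", "schema": json.dumps(schema)})
```

You could then POST `payload` to `$SCHEMA_REGISTRY_URL/subjects/$KAFKA_TOPIC-value/versions` with an HTTP client such as `requests`.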
Define the encryption rule for all fields tagged with PII:
```bash
curl --location "$SCHEMA_REGISTRY_URL/subjects/$KAFKA_TOPIC-value/versions" \
  --header 'Accept: application/vnd.schemaregistry.v1+json' \
  --header 'Content-Type: application/json' \
  --data '{
    "ruleSet": {
      "domainRules": [
        {
          "name": "encryptPII",
          "kind": "TRANSFORM",
          "type": "ENCRYPT",
          "mode": "WRITEREAD",
          "tags": ["PII"],
          "params": {
            "encrypt.kek.name": "'"$AWS_KMS_KEY_NAME"'",
            "encrypt.kms.key.id": "'"$AWS_KMS_KEY_ID"'",
            "encrypt.kms.type": "'"$AWS_KMS_TYPE"'"
          },
          "onFailure": "ERROR,NONE"
        }
      ]
    }
  }'
```

💡 Tip: The pattern `"'"$VARIABLE"'"` is necessary to interpolate shell variables inside JSON strings. It works by ending the single-quoted JSON string, adding a double-quoted variable expansion, then starting the single-quoted string again. Note also the `onFailure` value `ERROR,NONE`: it specifies one action for writes and one for reads, so a producer that cannot encrypt fails with an error, while a consumer that cannot decrypt returns the raw ciphertext (this is what the unauthorized-access test later in this guide demonstrates).
Check that everything is registered correctly:
```bash
curl --request GET \
  --url "$SCHEMA_REGISTRY_URL/subjects/$KAFKA_TOPIC-value/versions/latest" | jq
```

You can also verify in Control Center by navigating to Topics → Your Topic → Schema.
Ensure your environment variables are loaded (if you haven't already):
```bash
set -a
source ../.env
set +a
```

Run the producer to send data with the encrypted birthday field:
```bash
python avro_producer.py
```

✅ Expected output:

```
Producing user records to topic csfle-demo. ^C to exit.
PersonalData record b'1' successfully produced to csfle-demo [0] at offset 0
PersonalData record b'2' successfully produced to csfle-demo [0] at offset 1
PersonalData record b'3' successfully produced to csfle-demo [0] at offset 2
...
```
You can view the encrypted messages in Control Center: the birthday field should appear encrypted.
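Under the hood, the producer must serialize with the schema version registered above (the one carrying the ruleSet) rather than auto-registering a new one. Assuming `avro_producer.py` uses confluent-kafka's `AvroSerializer`, that typically corresponds to settings like the following sketch:

```python
# Serializer settings that let the registered ENCRYPT rule take effect.
# (Assumption: the producer uses confluent-kafka's AvroSerializer.)
serializer_conf = {
    "auto.register.schemas": False,  # don't overwrite the schema + ruleSet registered above
    "use.latest.version": True,      # fetch the latest version, which carries the ENCRYPT rule
}
```

Without these, the serializer could register or pick a schema version without the ENCRYPT rule and produce the `birthday` field in plaintext.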
Run the consumer with valid AWS credentials to see decrypted data:
```bash
python avro_consumer.py
```

✅ Expected output (decrypted birthday):

```
--- Personal Data ---
ID: 1
Name: Anna
Birthday: 2025-02-10
Timestamp: 2025-02-10T15:11:42.477591+00:00
---------------------
--- Personal Data ---
ID: 2
Name: Anna
Birthday: 2024-02-10
Timestamp: 2025-02-10T15:11:42.478123+00:00
---------------------
...
```
Simulate a scenario where a client without access to the KEK tries to consume the encrypted data by temporarily setting invalid AWS credentials:
```bash
# Temporarily override AWS credentials with invalid values
export AWS_SECRET_ACCESS_KEY="invalid_secret_key"

# Change the consumer group ID to re-consume all the messages from the topic
export KAFKA_GROUP_ID="testing-invalid-key"

# Run the consumer - it will fail to decrypt the birthday field
python avro_consumer.py
```

🔴 Expected output (the birthday field remains encrypted):

```
--- Personal Data ---
ID: 1
Name: Anna
Birthday: yabvlkT//S+QDAXP7idIl3wU3pHR8/2oZZA8ORovepAun1eLORo=
Timestamp: 2025-02-10T15:11:42.476313+00:00
---------------------
```

✨ This demonstrates that consumers without access to the KEK cannot decrypt fields protected by CSFLE.
To restore your correct AWS credentials for subsequent operations:
# Re-load environment variables from parent .env to restore correct credentials
```bash
# Re-load environment variables from the parent .env to restore correct credentials
set -a
source ../.env
set +a
```

💡 Remember: Exported variables only affect the current terminal session. If you open a new terminal, you'll need to source `.env` again.
To stop and remove all Confluent Platform containers:
```bash
cd ..
docker compose down -v
```

The `-v` flag removes volumes, which will delete all topic data and Schema Registry data.