From 8c33958f29ae5d2b5f62f03cc5644db147216576 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Mon, 9 Mar 2026 14:42:04 +0100 Subject: [PATCH 1/5] fix: Allow overriding the logging configuration in Airflow 3.1 --- rust/operator-binary/src/product_logging.rs | 101 ++++++++++++++++++++ tests/test-definition.yaml | 4 +- 2 files changed, 103 insertions(+), 2 deletions(-) diff --git a/rust/operator-binary/src/product_logging.rs b/rust/operator-binary/src/product_logging.rs index b2431a18..085bebb5 100644 --- a/rust/operator-binary/src/product_logging.rs +++ b/rust/operator-binary/src/product_logging.rs @@ -84,6 +84,20 @@ fn create_airflow_config( log_config: &AutomaticContainerLogConfig, log_dir: &str, resolved_product_image: &ResolvedProductImage, +) -> String { + if resolved_product_image.product_version.starts_with("2.") + || resolved_product_image.product_version.starts_with("3.0.") + { + create_airflow_stdlib_config(log_config, log_dir, resolved_product_image) + } else { + create_airflow_structlog_config(log_config, log_dir) + } +} + +fn create_airflow_stdlib_config( + log_config: &AutomaticContainerLogConfig, + log_dir: &str, + resolved_product_image: &ResolvedProductImage, ) -> String { let loggers_config = log_config .loggers @@ -176,3 +190,90 @@ LOGGING_CONFIG['root'] = {{ .to_python_expression(), ) } + +fn create_airflow_structlog_config( + log_config: &AutomaticContainerLogConfig, + log_dir: &str, +) -> String { + let loggers_config = log_config + .loggers + .iter() + .filter(|(name, _)| name.as_str() != AutomaticContainerLogConfig::ROOT_LOGGER) + .fold(String::new(), |mut output, (name, config)| { + let _ = writeln!( + output, + " +LOGGING_CONFIG['loggers'].setdefault('{name}', {{ 'propagate': True }}) +LOGGING_CONFIG['loggers']['{name}']['level'] = {level} +", + level = config.level.to_python_expression() + ); + output + }); + + format!( + "\ +import logging +import os +from airflow.config_templates import airflow_local_settings + +os.makedirs('{log_dir}', exist_ok=True) + +LOGGING_CONFIG = {{ + 'filters': {{ + 'mask_secrets_core': {{ + '()': 'airflow._shared.secrets_masker._secrets_masker', + }} + }}, + 'formatters': {{ + 'airflow': {{ + 'format': '%(asctime)s logLevel=%(levelname)s logger=%(name)s - %(message)s', + 'class': 'airflow.utils.log.timezone_aware.TimezoneAware', + }}, + 'json': {{ + '()': 'airflow.utils.log.json_formatter.JSONFormatter', + 'json_fields': ['asctime', 'levelname', 'message', 'name'] + }} + }}, + 'handlers': {{ + 'task': {{ + 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', + 'formatter': 'airflow', + 'base_log_folder': '{log_dir}', + 'filters': ['mask_secrets_core'] + }}, + 'file': {{ + 'class': 'logging.handlers.RotatingFileHandler', + 'level': {file_log_level}, + 'formatter': 'json', + 'filename': '{log_dir}/{LOG_FILE}', + 'maxBytes': 1048576, + 'backupCount': 1 + }} + }}, + 'loggers': {{ + 'airflow.task': {{ + 'handlers': ['task'], + 'level': logging.INFO, + 'propagate': True, + 'filters': ['mask_secrets_core'] + }} + }}, + 'root': {{ + 'handlers': ['default', 'file'], + 'level': {root_log_level}, + 'propagate': True + }} +}} +{loggers_config} +REMOTE_TASK_LOG = airflow_local_settings.REMOTE_TASK_LOG +", + file_log_level = log_config + .file + .as_ref() + .and_then(|file| file.level) + .unwrap_or_default() + .to_python_expression(), + root_log_level = log_config.root_log_level().to_python_expression(), + ) +} diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 512e237a..c8656c40 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -9,12 +9,12 @@ dimensions: values: - 2.9.3 - 3.0.6 - - 3.1.6 + - 3.1.6,oci.stackable.tech/sandbox/airflow:3.1.6-stackable0.0.0-dev # To use a custom image, add a comma and the full name after the product version # - x.x.x,oci.stackable.tech/sandbox/airflow:x.x.x-stackable0.0.0-dev - name: airflow-latest values: - - 3.1.6 + - 3.1.6,oci.stackable.tech/sandbox/airflow:3.1.6-stackable0.0.0-dev # To use a custom image, add a comma and the full name after the product version # - x.x.x,oci.stackable.tech/sandbox/airflow:x.x.x-stackable0.0.0-dev - name: opa-latest From cd306fa2d6ad3b004095e4a6cee94d0ccc8a90d0 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Mon, 9 Mar 2026 14:52:05 +0100 Subject: [PATCH 2/5] chore: Update changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e5b3f2c..4fb140c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ ### Added -- Add support for airflow 3.1.6 ([#742]). +- Add support for airflow 3.1.6 ([#742, #757]). - Add operator versioning ([#725]). - GitSync considered for v1alpha1 and v1alpha2 - Support objectOverrides using `.spec.objectOverrides`. @@ -30,6 +30,7 @@ [#741]: https://github.com/stackabletech/airflow-operator/pull/741 [#742]: https://github.com/stackabletech/airflow-operator/pull/742 [#752]: https://github.com/stackabletech/airflow-operator/pull/752 +[#757]: https://github.com/stackabletech/airflow-operator/pull/757 ## [25.11.0] - 2025-11-07 From cca4d6dbeb4575cd531adb5ecedeb96377a3d168 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Mon, 9 Mar 2026 15:02:09 +0100 Subject: [PATCH 3/5] chore: Run pre-commit --- tests/templates/kuttl/logging/52-assert.yaml.j2 | 1 - tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 | 1 - 2 files changed, 2 deletions(-) diff --git a/tests/templates/kuttl/logging/52-assert.yaml.j2 b/tests/templates/kuttl/logging/52-assert.yaml.j2 index 3e093e4f..35442128 100644 --- a/tests/templates/kuttl/logging/52-assert.yaml.j2 +++ b/tests/templates/kuttl/logging/52-assert.yaml.j2 @@ -14,4 +14,3 @@ commands: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --role-group automatic-log-config --airflow-version "{{ test_scenario['values']['airflow'] }}" kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --role-group custom-log-config --airflow-version "{{ test_scenario['values']['airflow'] }}" {% endif %} - diff --git a/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 b/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 index 91c14877..b85052aa 100644 --- a/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 @@ -10,4 +10,3 @@ commands: {% else %} - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" {% endif %} - From bf3aa31fd002c095808fe186d17062e356ed1593 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Mon, 9 Mar 2026 15:16:34 +0100 Subject: [PATCH 4/5] chore: Fix changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c7a5d01..ce7dc6d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ ### Added -- Add support for airflow 3.1.6 ([#742, #757]). +- Add support for airflow 3.1.6 ([#742], [#757]). - Add operator versioning ([#725]). - GitSync considered for v1alpha1 and v1alpha2 - Support objectOverrides using `.spec.objectOverrides`. From 50aaaf17b1c72bb440ea2e51b639526abc75cca1 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Tue, 10 Mar 2026 14:49:45 +0100 Subject: [PATCH 5/5] test(logging): Fix custom logging config --- rust/operator-binary/src/product_logging.rs | 19 ++++++++++++++----- tests/templates/kuttl/commons/metrics.py | 6 +++--- .../41-install-airflow-cluster.yaml.j2 | 11 +++++++++++ 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/rust/operator-binary/src/product_logging.rs b/rust/operator-binary/src/product_logging.rs index 085bebb5..51572729 100644 --- a/rust/operator-binary/src/product_logging.rs +++ b/rust/operator-binary/src/product_logging.rs @@ -236,11 +236,8 @@ LOGGING_CONFIG = {{ }} }}, 'handlers': {{ - 'task': {{ - 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', - 'formatter': 'airflow', - 'base_log_folder': '{log_dir}', - 'filters': ['mask_secrets_core'] + 'default': {{ + 'level': {console_log_level} }}, 'file': {{ 'class': 'logging.handlers.RotatingFileHandler', @@ -249,6 +246,12 @@ LOGGING_CONFIG = {{ 'filename': '{log_dir}/{LOG_FILE}', 'maxBytes': 1048576, 'backupCount': 1 + }}, + 'task': {{ + 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', + 'formatter': 'airflow', + 'base_log_folder': '{log_dir}', + 'filters': ['mask_secrets_core'] }} }}, 'loggers': {{ @@ -268,6 +271,12 @@ LOGGING_CONFIG = {{ {loggers_config} REMOTE_TASK_LOG = airflow_local_settings.REMOTE_TASK_LOG ", + console_log_level = log_config + .console + .as_ref() + .and_then(|console| console.level) + .unwrap_or_default() + .to_python_expression(), file_log_level = log_config .file .as_ref() diff --git a/tests/templates/kuttl/commons/metrics.py b/tests/templates/kuttl/commons/metrics.py index fda935bf..526cac11 100755 --- a/tests/templates/kuttl/commons/metrics.py +++ b/tests/templates/kuttl/commons/metrics.py @@ -38,8 +38,8 @@ def metrics_v3(role_group: str) -> None: # allow a few moments for the DAGs to be registered to all roles time.sleep(10) - rest_url = "http://airflow-webserver:8080/api/v2" - token_url = "http://airflow-webserver:8080/auth/token" + rest_url = f"http://airflow-webserver-{role_group}-headless:8080/api/v2" + token_url = f"http://airflow-webserver-{role_group}-headless:8080/auth/token" data = {"username": "airflow", "password": "airflow"} @@ -110,7 +110,7 @@ def metrics_v2(role_group: str) -> None: dag_id = "example_trigger_target_dag" dag_conf = {"message": "Hello World"} - rest_url = "http://airflow-webserver:8080/api/v1" + rest_url = f"http://airflow-webserver-{role_group}-headless:8080/api/v1" auth = ("airflow", "airflow") # allow a few moments for the DAGs to be registered to all roles diff --git a/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 index 498f6db0..c60495da 100644 --- a/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 @@ -49,6 +49,13 @@ data: 'filename': '/stackable/log/airflow/airflow.py.json', } + LOGGING_CONFIG['loggers']['airflow'] = { + 'level': logging.DEBUG, + } + LOGGING_CONFIG['loggers']['sqlalchemy.engine'] = { + 'level': logging.DEBUG, + } + LOGGING_CONFIG['root'] = { 'level': logging.DEBUG, 'handlers': ['file'], @@ -159,6 +166,10 @@ spec: loggers: ROOT: level: DEBUG + airflow: + level: DEBUG + sqlalchemy.engine: + level: DEBUG git-sync: console: level: INFO