diff --git a/CHANGELOG.md b/CHANGELOG.md index db2936e8..ce7dc6d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ ### Added -- Add support for airflow 3.1.6 ([#742]). +- Add support for airflow 3.1.6 ([#742], [#757]). - Add operator versioning ([#725]). - GitSync considered for v1alpha1 and v1alpha2 - Support objectOverrides using `.spec.objectOverrides`. @@ -32,6 +32,7 @@ [#742]: https://github.com/stackabletech/airflow-operator/pull/742 [#752]: https://github.com/stackabletech/airflow-operator/pull/752 [#756]: https://github.com/stackabletech/airflow-operator/pull/756 +[#757]: https://github.com/stackabletech/airflow-operator/pull/757 ## [25.11.0] - 2025-11-07 diff --git a/rust/operator-binary/src/product_logging.rs b/rust/operator-binary/src/product_logging.rs index b2431a18..51572729 100644 --- a/rust/operator-binary/src/product_logging.rs +++ b/rust/operator-binary/src/product_logging.rs @@ -84,6 +84,20 @@ fn create_airflow_config( log_config: &AutomaticContainerLogConfig, log_dir: &str, resolved_product_image: &ResolvedProductImage, +) -> String { + if resolved_product_image.product_version.starts_with("2.") + || resolved_product_image.product_version.starts_with("3.0.") + { + create_airflow_stdlib_config(log_config, log_dir, resolved_product_image) + } else { + create_airflow_structlog_config(log_config, log_dir) + } +} + +fn create_airflow_stdlib_config( + log_config: &AutomaticContainerLogConfig, + log_dir: &str, + resolved_product_image: &ResolvedProductImage, ) -> String { let loggers_config = log_config .loggers @@ -176,3 +190,99 @@ LOGGING_CONFIG['root'] = {{ .to_python_expression(), ) } + +fn create_airflow_structlog_config( + log_config: &AutomaticContainerLogConfig, + log_dir: &str, +) -> String { + let loggers_config = log_config + .loggers + .iter() + .filter(|(name, _)| name.as_str() != AutomaticContainerLogConfig::ROOT_LOGGER) + .fold(String::new(), |mut output, (name, config)| { + let _ = writeln!( + output, + " +LOGGING_CONFIG['loggers'].setdefault('{name}', {{ 'propagate': True }}) +LOGGING_CONFIG['loggers']['{name}']['level'] = {level} +", + level = config.level.to_python_expression() + ); + output + }); + + format!( + "\ +import logging +import os +from airflow.config_templates import airflow_local_settings + +os.makedirs('{log_dir}', exist_ok=True) + +LOGGING_CONFIG = {{ + 'filters': {{ + 'mask_secrets_core': {{ + '()': 'airflow._shared.secrets_masker._secrets_masker', + }} + }}, + 'formatters': {{ + 'airflow': {{ + 'format': '%(asctime)s logLevel=%(levelname)s logger=%(name)s - %(message)s', + 'class': 'airflow.utils.log.timezone_aware.TimezoneAware', + }}, + 'json': {{ + '()': 'airflow.utils.log.json_formatter.JSONFormatter', + 'json_fields': ['asctime', 'levelname', 'message', 'name'] + }} + }}, + 'handlers': {{ + 'default': {{ + 'level': {console_log_level} + }}, + 'file': {{ + 'class': 'logging.handlers.RotatingFileHandler', + 'level': {file_log_level}, + 'formatter': 'json', + 'filename': '{log_dir}/{LOG_FILE}', + 'maxBytes': 1048576, + 'backupCount': 1 + }}, + 'task': {{ + 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', + 'formatter': 'airflow', + 'base_log_folder': '{log_dir}', + 'filters': ['mask_secrets_core'] + }} + }}, + 'loggers': {{ + 'airflow.task': {{ + 'handlers': ['task'], + 'level': logging.INFO, + 'propagate': True, + 'filters': ['mask_secrets_core'] + }} + }}, + 'root': {{ + 'handlers': ['default', 'file'], + 'level': {root_log_level}, + 'propagate': True + }} +}} +{loggers_config} +REMOTE_TASK_LOG = airflow_local_settings.REMOTE_TASK_LOG +", + console_log_level = log_config + .console + .as_ref() + .and_then(|console| console.level) + .unwrap_or_default() + .to_python_expression(), + file_log_level = log_config + .file + .as_ref() + .and_then(|file| file.level) + .unwrap_or_default() + .to_python_expression(), + root_log_level = log_config.root_log_level().to_python_expression(), + ) +} diff --git a/tests/templates/kuttl/commons/metrics.py b/tests/templates/kuttl/commons/metrics.py index fda935bf..526cac11 100755 --- a/tests/templates/kuttl/commons/metrics.py +++ b/tests/templates/kuttl/commons/metrics.py @@ -38,8 +38,8 @@ def metrics_v3(role_group: str) -> None: # allow a few moments for the DAGs to be registered to all roles time.sleep(10) - rest_url = "http://airflow-webserver:8080/api/v2" - token_url = "http://airflow-webserver:8080/auth/token" + rest_url = f"http://airflow-webserver-{role_group}-headless:8080/api/v2" + token_url = f"http://airflow-webserver-{role_group}-headless:8080/auth/token" data = {"username": "airflow", "password": "airflow"} @@ -110,7 +110,7 @@ def metrics_v2(role_group: str) -> None: dag_id = "example_trigger_target_dag" dag_conf = {"message": "Hello World"} - rest_url = "http://airflow-webserver:8080/api/v1" + rest_url = f"http://airflow-webserver-{role_group}-headless:8080/api/v1" auth = ("airflow", "airflow") # allow a few moments for the DAGs to be registered to all roles diff --git a/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 index 498f6db0..c60495da 100644 --- a/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 @@ -49,6 +49,13 @@ data: 'filename': '/stackable/log/airflow/airflow.py.json', } + LOGGING_CONFIG['loggers']['airflow'] = { + 'level': logging.DEBUG, + } + LOGGING_CONFIG['loggers']['sqlalchemy.engine'] = { + 'level': logging.DEBUG, + } + LOGGING_CONFIG['root'] = { 'level': logging.DEBUG, 'handlers': ['file'], @@ -159,6 +166,10 @@ spec: loggers: ROOT: level: DEBUG + airflow: + level: DEBUG + sqlalchemy.engine: + level: DEBUG git-sync: console: level: INFO diff --git a/tests/templates/kuttl/logging/52-assert.yaml.j2 b/tests/templates/kuttl/logging/52-assert.yaml.j2 index 3e093e4f..35442128 100644 --- a/tests/templates/kuttl/logging/52-assert.yaml.j2 +++ b/tests/templates/kuttl/logging/52-assert.yaml.j2 @@ -14,4 +14,3 @@ commands: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --role-group automatic-log-config --airflow-version "{{ test_scenario['values']['airflow'] }}" kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --role-group custom-log-config --airflow-version "{{ test_scenario['values']['airflow'] }}" {% endif %} - diff --git a/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 b/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 index 91c14877..b85052aa 100644 --- a/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 @@ -10,4 +10,3 @@ commands: {% else %} - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" {% endif %} - diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 512e237a..c8656c40 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -9,12 +9,12 @@ dimensions: values: - 2.9.3 - 3.0.6 - - 3.1.6 + - 3.1.6,oci.stackable.tech/sandbox/airflow:3.1.6-stackable0.0.0-dev # To use a custom image, add a comma and the full name after the product version # - x.x.x,oci.stackable.tech/sandbox/airflow:x.x.x-stackable0.0.0-dev - name: airflow-latest values: - - 3.1.6 + - 3.1.6,oci.stackable.tech/sandbox/airflow:3.1.6-stackable0.0.0-dev # To use a custom image, add a comma and the full name after the product version # - x.x.x,oci.stackable.tech/sandbox/airflow:x.x.x-stackable0.0.0-dev - name: opa-latest