Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

### Added

- Add support for airflow 3.1.6 ([#742]).
- Add support for airflow 3.1.6 ([#742], [#757]).
- Add operator versioning ([#725]).
- GitSync considered for v1alpha1 and v1alpha2
- Support objectOverrides using `.spec.objectOverrides`.
Expand Down Expand Up @@ -32,6 +32,7 @@
[#742]: https://github.com/stackabletech/airflow-operator/pull/742
[#752]: https://github.com/stackabletech/airflow-operator/pull/752
[#756]: https://github.com/stackabletech/airflow-operator/pull/756
[#757]: https://github.com/stackabletech/airflow-operator/pull/757

## [25.11.0] - 2025-11-07

Expand Down
110 changes: 110 additions & 0 deletions rust/operator-binary/src/product_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ fn create_airflow_config(
log_config: &AutomaticContainerLogConfig,
log_dir: &str,
resolved_product_image: &ResolvedProductImage,
) -> String {
if resolved_product_image.product_version.starts_with("2.")
|| resolved_product_image.product_version.starts_with("3.0.")
{
create_airflow_stdlib_config(log_config, log_dir, resolved_product_image)
} else {
create_airflow_structlog_config(log_config, log_dir)
}
}

fn create_airflow_stdlib_config(
log_config: &AutomaticContainerLogConfig,
log_dir: &str,
resolved_product_image: &ResolvedProductImage,
) -> String {
let loggers_config = log_config
.loggers
Expand Down Expand Up @@ -176,3 +190,99 @@ LOGGING_CONFIG['root'] = {{
.to_python_expression(),
)
}

fn create_airflow_structlog_config(
log_config: &AutomaticContainerLogConfig,
log_dir: &str,
) -> String {
let loggers_config = log_config
.loggers
.iter()
.filter(|(name, _)| name.as_str() != AutomaticContainerLogConfig::ROOT_LOGGER)
.fold(String::new(), |mut output, (name, config)| {
let _ = writeln!(
output,
"
LOGGING_CONFIG['loggers'].setdefault('{name}', {{ 'propagate': True }})
LOGGING_CONFIG['loggers']['{name}']['level'] = {level}
",
level = config.level.to_python_expression()
);
output
});

format!(
"\
import logging
import os
from airflow.config_templates import airflow_local_settings
os.makedirs('{log_dir}', exist_ok=True)
LOGGING_CONFIG = {{
'filters': {{
'mask_secrets_core': {{
'()': 'airflow._shared.secrets_masker._secrets_masker',
}}
}},
'formatters': {{
'airflow': {{
'format': '%(asctime)s logLevel=%(levelname)s logger=%(name)s - %(message)s',
'class': 'airflow.utils.log.timezone_aware.TimezoneAware',
}},
'json': {{
'()': 'airflow.utils.log.json_formatter.JSONFormatter',
'json_fields': ['asctime', 'levelname', 'message', 'name']
}}
}},
'handlers': {{
'default': {{
'level': {console_log_level}
}},
'file': {{
'class': 'logging.handlers.RotatingFileHandler',
'level': {file_log_level},
'formatter': 'json',
'filename': '{log_dir}/{LOG_FILE}',
'maxBytes': 1048576,
'backupCount': 1
}},
'task': {{
'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
'formatter': 'airflow',
'base_log_folder': '{log_dir}',
'filters': ['mask_secrets_core']
}}
}},
'loggers': {{
'airflow.task': {{
'handlers': ['task'],
'level': logging.INFO,
'propagate': True,
'filters': ['mask_secrets_core']
}}
}},
'root': {{
'handlers': ['default', 'file'],
'level': {root_log_level},
'propagate': True
}}
}}
{loggers_config}
REMOTE_TASK_LOG = airflow_local_settings.REMOTE_TASK_LOG
",
console_log_level = log_config
.console
.as_ref()
.and_then(|console| console.level)
.unwrap_or_default()
.to_python_expression(),
file_log_level = log_config
.file
.as_ref()
.and_then(|file| file.level)
.unwrap_or_default()
.to_python_expression(),
root_log_level = log_config.root_log_level().to_python_expression(),
)
}
6 changes: 3 additions & 3 deletions tests/templates/kuttl/commons/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ def metrics_v3(role_group: str) -> None:
# allow a few moments for the DAGs to be registered to all roles
time.sleep(10)

rest_url = "http://airflow-webserver:8080/api/v2"
token_url = "http://airflow-webserver:8080/auth/token"
rest_url = f"http://airflow-webserver-{role_group}-headless:8080/api/v2"
token_url = f"http://airflow-webserver-{role_group}-headless:8080/auth/token"

data = {"username": "airflow", "password": "airflow"}

Expand Down Expand Up @@ -110,7 +110,7 @@ def metrics_v2(role_group: str) -> None:
dag_id = "example_trigger_target_dag"
dag_conf = {"message": "Hello World"}

rest_url = "http://airflow-webserver:8080/api/v1"
rest_url = f"http://airflow-webserver-{role_group}-headless:8080/api/v1"
auth = ("airflow", "airflow")

# allow a few moments for the DAGs to be registered to all roles
Expand Down
11 changes: 11 additions & 0 deletions tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ data:
'filename': '/stackable/log/airflow/airflow.py.json',
}

LOGGING_CONFIG['loggers']['airflow'] = {
'level': logging.DEBUG,
}
LOGGING_CONFIG['loggers']['sqlalchemy.engine'] = {
'level': logging.DEBUG,
}

LOGGING_CONFIG['root'] = {
'level': logging.DEBUG,
'handlers': ['file'],
Expand Down Expand Up @@ -159,6 +166,10 @@ spec:
loggers:
ROOT:
level: DEBUG
airflow:
level: DEBUG
sqlalchemy.engine:
level: DEBUG
git-sync:
console:
level: INFO
Expand Down
1 change: 0 additions & 1 deletion tests/templates/kuttl/logging/52-assert.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@ commands:
kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --role-group automatic-log-config --airflow-version "{{ test_scenario['values']['airflow'] }}"
kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --role-group custom-log-config --airflow-version "{{ test_scenario['values']['airflow'] }}"
{% endif %}

Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,3 @@ commands:
{% else %}
- script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}"
{% endif %}

4 changes: 2 additions & 2 deletions tests/test-definition.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ dimensions:
values:
- 2.9.3
- 3.0.6
- 3.1.6
- 3.1.6,oci.stackable.tech/sandbox/airflow:3.1.6-stackable0.0.0-dev
# To use a custom image, add a comma and the full name after the product version
# - x.x.x,oci.stackable.tech/sandbox/airflow:x.x.x-stackable0.0.0-dev
- name: airflow-latest
values:
- 3.1.6
- 3.1.6,oci.stackable.tech/sandbox/airflow:3.1.6-stackable0.0.0-dev
# To use a custom image, add a comma and the full name after the product version
# - x.x.x,oci.stackable.tech/sandbox/airflow:x.x.x-stackable0.0.0-dev
- name: opa-latest
Expand Down