Add openlineage config spark submit #1
Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
Greptile Summary
This PR adds OpenLineage configuration injection into
Confidence Score: 3/5
Not safe to merge as-is: the None-conf crash and the unbound variable in lineage_run_id would both trigger in realistic usage scenarios. Three bugs touch the newly enabled code paths. First, SparkSubmitOperator.execute passes self.conf (None by default) directly into the inject functions, where iterating over None raises a TypeError. Second, the Airflow 3.0+ branch in lineage_run_id uses hasattr(context, 'dag_run') on a plain dict, which never matches, leaving dag_run unbound and raising UnboundLocalError (a NameError subclass) when the fallback is needed. Third, the operator's new boolean parameters take their defaults from conf.getboolean() at import time, which freezes the config values so that later configuration changes have no effect on the defaults. spark_submit.py (None conf and import-time config reads) and macros.py (unbound variable and hasattr on a dict) need the most attention before merging.
Important Files Changed
Sequence Diagram
sequenceDiagram
participant User
participant SparkSubmitOperator
participant CompatShim as common.compat/utils/spark.py
participant OLUtils as openlineage/utils/spark.py
participant OLListener as OpenLineageListener
participant SparkSubmitHook
User->>SparkSubmitOperator: execute(context)
alt "openlineage_inject_parent_job_info=True"
SparkSubmitOperator->>CompatShim: inject_parent_job_information_into_spark_properties(conf, ctx)
CompatShim->>OLUtils: delegates if OL provider available
OLUtils-->>SparkSubmitOperator: conf + parentJobName/Namespace/RunId
end
alt "openlineage_inject_transport_info=True"
SparkSubmitOperator->>CompatShim: inject_transport_information_into_spark_properties(conf, ctx)
CompatShim->>OLUtils: delegates if OL provider available
OLUtils->>OLListener: get_openlineage_listener().adapter...transport
OLListener-->>OLUtils: transport kind
alt "transport.kind == http"
OLUtils-->>SparkSubmitOperator: "conf + spark.openlineage.transport.*"
else "transport.kind == composite"
OLUtils-->>SparkSubmitOperator: "conf + spark.openlineage.transport.transports.name.*"
else unsupported
OLUtils-->>SparkSubmitOperator: conf unchanged, log warning
end
end
SparkSubmitOperator->>SparkSubmitHook: submit(application)
Reviews (1): Last reviewed commit: "add OpenLineage configuration injection ..."
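The transport branch of the diagram above can be sketched as a key-prefixing step. This is an illustration only: the function names are hypothetical, and the `type` key and the exact property layout are assumptions based on the `spark.openlineage.transport.*` and `spark.openlineage.transport.transports.name.*` patterns named in the diagram, not the provider's actual implementation.

```python
from __future__ import annotations


def http_transport_properties(
    config: dict[str, str], prefix: str = "spark.openlineage.transport"
) -> dict[str, str]:
    # Prefix every transport setting with the Spark property namespace.
    return {f"{prefix}.{key}": value for key, value in config.items()}


def composite_transport_properties(transports: dict[str, dict[str, str]]) -> dict[str, str]:
    # Assumption: a composite transport is marked with a top-level `type` key.
    properties = {"spark.openlineage.transport.type": "composite"}
    for name, config in transports.items():
        # Each named child transport gets its own `transports.<name>.` sub-namespace.
        child_prefix = f"spark.openlineage.transport.transports.{name}"
        properties.update(http_transport_properties(config, child_prefix))
    return properties


http_props = http_transport_properties(
    {"url": "http://localhost:5000", "endpoint": "api/v2/lineage"}
)
composite_props = composite_transport_properties(
    {"primary": {"url": "http://localhost:5000"}}
)
```

Keeping the prefixing in one helper means the HTTP and composite branches cannot drift apart in how they name keys.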
| if self._openlineage_inject_parent_job_info: | ||
| self.log.debug("Injecting OpenLineage parent job information into Spark properties.") | ||
| self.conf = inject_parent_job_information_into_spark_properties(self.conf, context) | ||
| if self._openlineage_inject_transport_info: | ||
| self.log.debug("Injecting OpenLineage transport information into Spark properties.") | ||
| self.conf = inject_transport_information_into_spark_properties(self.conf, context) |
None conf causes TypeError in inject functions
self.conf defaults to None (the parameter is typed dict[str, Any] | None = None). When a user creates SparkSubmitOperator(task_id="...", openlineage_inject_parent_job_info=True) without providing conf, calling inject_parent_job_information_into_spark_properties(None, context) propagates None into _is_parent_job_information_present_in_spark_properties, which immediately does for key in properties — iterating over None raises TypeError: 'NoneType' object is not iterable. Both injection paths share this defect.
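A minimal sketch of the failure mode and one possible guard. Both functions are hypothetical stand-ins for the real inject helpers, and the property key is illustrative; the point is only that normalizing `None` to an empty dict before iterating avoids the `TypeError`.

```python
from __future__ import annotations

from typing import Any


def is_parent_info_present(properties: dict[str, Any]) -> bool:
    # Hypothetical stand-in: iterates the mapping, so passing None raises TypeError.
    return any(key.startswith("spark.openlineage.parent") for key in properties)


def inject_parent_info(properties: dict[str, Any] | None, run_id: str) -> dict[str, Any]:
    # Guard: treat a missing conf as empty, and copy so the caller's dict is not mutated.
    properties = dict(properties or {})
    if is_parent_info_present(properties):
        return properties
    properties["spark.openlineage.parentRunId"] = run_id  # illustrative key
    return properties


result = inject_parent_info(None, "run-123")
```

The same normalization could equally live in the operator (for example `self.conf or {}` before the call); which side owns the guard is a design choice for the PR author.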
| if AIRFLOW_V_3_0_PLUS: | ||
| context = task_instance.get_template_context() | ||
| if hasattr(task_instance, "dag_run"): | ||
| dag_run = task_instance.dag_run | ||
| elif hasattr(context, "dag_run"): | ||
| dag_run = context["dag_run"] | ||
| if hasattr(dag_run, "logical_date") and dag_run.logical_date: |
Unbound dag_run variable and incorrect hasattr on a dict
context returned by get_template_context() is a plain dict, so hasattr(context, "dag_run") checks for an attribute on the dict object itself, not a key, and is False for any ordinary dict. If task_instance also lacks dag_run, the variable dag_run is never assigned, and the subsequent if hasattr(dag_run, ...) line raises UnboundLocalError (a subclass of NameError), because dag_run is a local variable with no value bound. The correct guard for a dict key is "dag_run" in context.
| if AIRFLOW_V_3_0_PLUS: | |
| context = task_instance.get_template_context() | |
| if hasattr(task_instance, "dag_run"): | |
| dag_run = task_instance.dag_run | |
| elif "dag_run" in context: | |
| dag_run = context["dag_run"] | |
| else: | |
| raise RuntimeError("Cannot determine dag_run from task_instance or context") | |
| if hasattr(dag_run, "logical_date") and dag_run.logical_date: |
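To make the attribute-versus-key distinction concrete, here is a small self-contained sketch. `resolve_dag_run` is a hypothetical helper, not a function from the provider; it simply mirrors the guarded lookup suggested above.

```python
def resolve_dag_run(task_instance: object, context: dict) -> object:
    # Attribute lookup for objects, key lookup for dicts.
    if hasattr(task_instance, "dag_run"):
        return task_instance.dag_run
    if "dag_run" in context:
        return context["dag_run"]
    # Failing loudly here avoids an unbound local further down.
    raise RuntimeError("Cannot determine dag_run from task_instance or context")


context = {"dag_run": "run-1"}
print(hasattr(context, "dag_run"))  # False: a dict has no `dag_run` attribute
print("dag_run" in context)         # True: the key exists
```

Returning from each branch, rather than assigning and falling through, makes it impossible to reach later code without a value.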
| openlineage_inject_parent_job_info: bool = conf.getboolean( | ||
| "openlineage", "spark_inject_parent_job_info", fallback=False | ||
| ), | ||
| openlineage_inject_transport_info: bool = conf.getboolean( | ||
| "openlineage", "spark_inject_transport_info", fallback=False | ||
| ), |
Config read at class-definition (import) time, not at instantiation time
Python evaluates default argument expressions exactly once — when the class body is executed on first import. Here conf.getboolean(...) is called at import time and the result is baked in as a bool constant. Any later change to the Airflow configuration (e.g., via conf_vars in tests, or dynamic reconfiguration) will have no effect on new SparkSubmitOperator instances. The idiomatic Airflow pattern is to use None as the sentinel and read the config inside __init__.
| openlineage_inject_parent_job_info: bool | None = None, | |
| openlineage_inject_transport_info: bool | None = None, |
| self._openlineage_inject_parent_job_info = openlineage_inject_parent_job_info | ||
| self._openlineage_inject_transport_info = openlineage_inject_transport_info |
Complement to the default-argument fix above: read the config inside __init__ so every new operator instance picks up the current Airflow configuration.
| self._openlineage_inject_parent_job_info = ( | |
| openlineage_inject_parent_job_info | |
| if openlineage_inject_parent_job_info is not None | |
| else conf.getboolean("openlineage", "spark_inject_parent_job_info", fallback=False) | |
| ) | |
| self._openlineage_inject_transport_info = ( | |
| openlineage_inject_transport_info | |
| if openlineage_inject_transport_info is not None | |
| else conf.getboolean("openlineage", "spark_inject_transport_info", fallback=False) | |
| ) |
| "spark.openlineage.transport.url": "http://localhost:5000", | ||
| "spark.openlineage.transport.endpoint": "api/v2/lineage", | ||
| "spark.openlineage.transport.timeoutInMillis": "5050000", |
Test assertion is always True: log message is never actually checked
assert "OpenLineage transport type console..." evaluates a non-empty string literal, which is always truthy in Python regardless of what was logged. The test passes unconditionally and would not catch a regression where the warning is no longer emitted. The in caplog.text is missing here, unlike the analogous assertion in test_inject_openlineage_composite_config_wrong_transport_to_spark just above it.
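A short sketch of why the bare-string assertion cannot fail. It uses the standard `logging` module with an in-memory stream as a stand-in for pytest's `caplog.text`; the logger name and the shortened message are illustrative.

```python
import io
import logging

# Capture log output in memory, standing in for pytest's `caplog.text`.
stream = io.StringIO()
logger = logging.getLogger("sketch.openlineage")
logger.addHandler(logging.StreamHandler(stream))
logger.setLevel(logging.WARNING)
logger.propagate = False

logger.warning("something unrelated happened")
captured = stream.getvalue()

message = "transport type `console` does not support automatic injection"

assert message              # passes: any non-empty string is truthy
print(message in captured)  # False: the membership check would have failed
```

Only the membership test ties the assertion to what was actually logged.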
|
@codity review |
PR Summary
What Changed
Key Changes by Area
Spark Operator: Added
Dataproc Operators: Extended existing utility functions to inject OpenLineage properties into Dataproc jobs, batches, and workflow templates, with new tests covering injection scenarios.
OpenLineage Utils: Updated
Macros: Modified
Files Changed
Review Focus Areas
Architecture
Design Decisions: The PR reuses
Scalability & Extensibility: The injection pattern is provider-agnostic and can extend to other operators that submit Spark jobs. Dataproc integration follows existing utility patterns, limiting scope to job/batch/workflow template types.
Risks: The macro fallback to
Workflow Diagrams
Automatically generated sequence diagrams showing the workflows in this PR
1. Spark Submit OpenLineage Integration Flow
Medium complexity • Components: SparkSubmitOperator, OpenLineage integration utilities, Common compat OpenLineage Spark utils
sequenceDiagram
title SparkSubmitOperator OpenLineage Injection and Submit Flow
participant DAG_Run
participant SparkSubmitOperator
participant OpenLineageUtils
participant SparkSubmitHook
participant SparkCluster
DAG_Run->>SparkSubmitOperator: execute(context)
alt inject_parent_job_info enabled
SparkSubmitOperator->>OpenLineageUtils: inject_parent_job_information_into_spark_properties(conf, context)
OpenLineageUtils-->>SparkSubmitOperator: return conf with parent job info
end
alt inject_transport_info enabled
SparkSubmitOperator->>OpenLineageUtils: inject_transport_information_into_spark_properties(conf, context)
OpenLineageUtils-->>SparkSubmitOperator: return conf with transport info
end
SparkSubmitOperator->>SparkSubmitHook: _get_hook(conf, params)
SparkSubmitOperator->>SparkSubmitHook: submit(application)
SparkSubmitHook->>SparkCluster: spark-submit with enriched conf
SparkCluster-->>SparkSubmitHook: job status and completion
SparkSubmitHook-->>SparkSubmitOperator: report completion
Note over SparkSubmitOperator,SparkSubmitHook: OpenLineage properties are merged into Spark conf before submission when enabled in airflow configuration
Note: Diagrams show detected patterns only. Complex workflows may require manual review. |
| if hasattr(task_instance, "dag_run"): | ||
| dag_run = task_instance.dag_run | ||
| elif hasattr(context, "dag_run"): | ||
| dag_run = context["dag_run"] | ||
| if hasattr(dag_run, "logical_date") and dag_run.logical_date: | ||
| date = dag_run.logical_date | ||
| else: | ||
| date = dag_run.run_after |
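The code uses hasattr on the context dict, which checks for an attribute rather than a key, so the dag_run fallback is never taken. When task_instance also lacks dag_run, the variable stays unassigned and the next line raises UnboundLocalError (a subclass of NameError).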
Suggested fix
if hasattr(task_instance, "dag_run"):
    dag_run = task_instance.dag_run
elif "dag_run" in context:
    dag_run = context["dag_run"]
else:
    dag_run = None
if dag_run and getattr(dag_run, "logical_date", None):
    date = dag_run.logical_date
elif dag_run:
    date = dag_run.run_after
else:
    # Without this branch, date would be left unbound when no dag_run is found.
    raise RuntimeError("Cannot determine dag_run from task_instance or context")
File: providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
Lines: 64-71
Issue Type: functional-critical
Severity: critical
Security Scan Summary
No critical security issues detected. Scan completed in 51.8s.
Security scan powered by Codity.ai
Policy Check Failed
✗ 3/3 policy checks failed:
• Need 2 more approval(s) (0/2): comment LGTM or approve via review
To merge this PR:
|
PR Summary
What Changed
Key Changes by Area
Spark Operator: Added two new injection parameters with config-based defaults and updated tests to cover all injection scenarios and unsupported transport handling.
Dataproc Operators: Extended
OpenLineage Utils: Enhanced
Macros: Updated
Files Changed
Review Focus Areas
Architecture
Design Decisions: Injection is opt-in via operator parameters to avoid breaking existing jobs. Composite transport support uses nested Spark properties to preserve backend structure, matching OpenLineage’s expected format.
Scalability & Extensibility: Injection logic is centralized in
Risks: The composite transport implementation assumes HTTP backends are valid; non-HTTP backends are skipped silently. This is intentional per OpenLineage spec but should be reviewed for unexpected edge cases.
Merge Status
NOT MERGEABLE: PR Score 34/100, below threshold (50)
|
| openlineage_inject_parent_job_info: bool = conf.getboolean( | ||
| "openlineage", "spark_inject_parent_job_info", fallback=False | ||
| ), | ||
| openlineage_inject_transport_info: bool = conf.getboolean( | ||
| "openlineage", "spark_inject_transport_info", fallback=False | ||
| ), |
Using conf.getboolean() as a default argument value evaluates the configuration at class definition (import) time, not at instantiation time, so runtime changes to the airflow config are not reflected and the call may fail at import if the config subsystem is not ready.
Suggested fix
openlineage_inject_parent_job_info: bool | None = None,
openlineage_inject_transport_info: bool | None = None,
File: providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
Lines: 143-148
Issue Type: functional-high
Severity: high
| if hasattr(task_instance, "dag_run"): | ||
| dag_run = task_instance.dag_run | ||
| elif hasattr(context, "dag_run"): | ||
| dag_run = context["dag_run"] | ||
| if hasattr(dag_run, "logical_date") and dag_run.logical_date: | ||
| date = dag_run.logical_date | ||
| else: | ||
| date = dag_run.run_after |
If neither task_instance nor context has a dag_run attribute, dag_run is referenced unbound on line 68 causing UnboundLocalError. Also hasattr(context, 'dag_run') checks attribute access on a dict-like Context object then accesses via subscript, which is inconsistent and may fail.
Suggested fix
dag_run = None
if hasattr(task_instance, "dag_run"):
    dag_run = task_instance.dag_run
elif "dag_run" in context:
    dag_run = context["dag_run"]
if dag_run is None:
    raise AirflowException("Unable to resolve dag_run for task instance")
if getattr(dag_run, "logical_date", None):
    date = dag_run.logical_date
else:
    date = dag_run.run_after
File: providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
Lines: 64-71
Issue Type: functional-high
Severity: high
| ) | ||
| operator.execute(MagicMock()) | ||
|
|
||
| assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties." |
The assert statement tests only a string literal, which is always truthy, rather than checking that the message is in caplog.text, so it never validates the log output. Wrap it in an assert ... in caplog.text like the sibling test.
Suggested fix
assert (
    "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
    in caplog.text
)
File: providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
Lines: 459-459
Issue Type: functional-medium
Severity: medium
| if hasattr(tp, "compression") and tp.compression: | ||
| props["compression"] = str(tp.compression) | ||
|
|
||
| if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer(): |
tp.config.auth is accessed without first checking that tp.config and tp.config.auth exist; transports lacking config or config.auth will raise AttributeError. (auth=None by itself does not raise, because hasattr(None, "api_key") is simply False.) Guard the access defensively.
Also reported at: providers/openlineage/src/airflow/providers/openlineage/utils/spark.py L69
Suggested fix
if (
    hasattr(tp, "config")
    and hasattr(tp.config, "auth")
    and tp.config.auth is not None
    and hasattr(tp.config.auth, "api_key")
    and tp.config.auth.get_bearer()
):
File: providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
Lines: 109-109
Issue Type: robustness-medium
Severity: medium
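The defensive guard can also be written with chained `getattr` defaults instead of a long `hasattr` chain. The sketch below is an assumption-laden illustration: `get_api_key` and `FakeApiKeyAuth` are hypothetical names, and the fake auth object only imitates the `api_key` attribute and `get_bearer()` method that the reviewed code relies on.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any


class FakeApiKeyAuth:
    # Hypothetical stand-in for a token provider exposing `api_key` and `get_bearer()`.
    def __init__(self, api_key: str) -> None:
        self.api_key = api_key

    def get_bearer(self) -> str:
        return f"Bearer {self.api_key}"


def get_api_key(transport: Any) -> str | None:
    # Walk config -> auth with getattr defaults so a missing attribute yields None.
    auth = getattr(getattr(transport, "config", None), "auth", None)
    if auth is None or not hasattr(auth, "api_key"):
        return None
    return auth.get_bearer() or None


no_config = SimpleNamespace()
no_auth = SimpleNamespace(config=SimpleNamespace(auth=None))
with_auth = SimpleNamespace(config=SimpleNamespace(auth=FakeApiKeyAuth("secret")))
```

Calling `get_bearer()` once and reusing the result also avoids invoking it twice, as the original condition-then-assignment does.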
| if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer(): | ||
| properties["auth.type"] = "api_key" | ||
| properties["auth.apiKey"] = tp.config.auth.get_bearer() |
API key bearer token is propagated into Spark properties which are typically logged, persisted in job metadata, and visible in Spark UI; ensure these properties are masked or transmitted through a secure mechanism rather than plain Spark conf.
Suggested fix
if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
    properties["auth.type"] = "api_key"
    # NOTE: apiKey is sensitive; consumers must ensure Spark properties are masked in logs/UI
    properties["auth.apiKey"] = tp.config.auth.get_bearer()
File: providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
Lines: 69-71
Issue Type: security-medium
Severity: medium
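One partial mitigation on the Airflow side is to mask sensitive values before any properties are written to logs. The helper below is a hypothetical sketch, not an existing Airflow or OpenLineage function, and the suffix list is an assumption; it does not address exposure in the Spark UI or event logs.

```python
from __future__ import annotations

# Assumption: keys ending in these suffixes carry credentials.
SENSITIVE_SUFFIXES = ("auth.apiKey",)


def mask_sensitive_properties(properties: dict[str, str]) -> dict[str, str]:
    # Return a copy that is safe to log; the original dict is left untouched.
    return {
        key: "***" if key.endswith(SENSITIVE_SUFFIXES) else value
        for key, value in properties.items()
    }


props = {
    "spark.openlineage.transport.url": "http://localhost:5000",
    "spark.openlineage.transport.auth.apiKey": "Bearer secret",
}
safe_to_log = mask_sensitive_properties(props)
```

Because the function returns a copy, the unmasked dict can still be passed to the job submission while only the masked one reaches the logger.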
Security Scan Summary
No critical security issues detected. Scan completed in 29.2s.
Dependency vulnerability scanning
Top 10 Vulnerabilities (19 total found)
1. @babel/helpers various CVE: GHSA-968p-4wvh-cqc8
2. @babel/plugin-transform-modules-systemjs various CVE: GHSA-fv7c-fp4j-7gwp
3. @babel/runtime various CVE: GHSA-968p-4wvh-cqc8
4. ajv various CVE: GHSA-2g4f-4pwh-qvx6, GHSA-2g4f-4pwh-qvx6
5. brace-expansion various CVE: GHSA-v6h2-p8h4-qcjw, GHSA-f886-m6hf-6m8v
6. copy-webpack-plugin various CVE: N/A
7. css-minimizer-webpack-plugin various CVE: N/A
8. fast-uri various CVE: GHSA-q3j6-qgpj-74h6, GHSA-v39h-62p7-jpjc
9. flatted various CVE: GHSA-25h7-pfq9-p65f, GHSA-rf6f-7fwh-wjgh
10. js-yaml various CVE: GHSA-mh29-5h37-fv8m, GHSA-mh29-5h37-fv8m
9 more vulnerabilities not shown. Update dependencies to fix these issues. Powered by Codity.ai · Docs |
Dependency vulnerability scanning
View vulnerability details (4 items)
1. pip 24.0 CVE: GHSA-4xh5-x5gv-qwph
2. pip 24.0 CVE: GHSA-6vgw-5pg2-w6jp
3. pip 24.0 CVE: GHSA-58qw-9mgm-455v
4. pip 24.0 CVE: GHSA-jp4c-xjxw-mgf9
License Compliance Scan
Weak copyleft licenses found - verify compatibility
Some packages have unknown licenses - manual review required
Medium Risk Licenses - 1 package
EPL-2.0 (1 package):
Unknown Licenses - 14 packages
Code Quality Report: test-org-codity/airflow1 · PR #1
Scanned: 2026-05-16 08:11 UTC | Score: 20/100 | Provider: github
Executive Summary
Top Findings
[CQ-LLM-002]
| File | Critical | High | Medium | Low | Total |
|---|---|---|---|---|---|
| providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py | 0 | 0 | 3 | 1 | 4 |
| providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py | 0 | 1 | 0 | 48 | 49 |
| providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py | 0 | 0 | 0 | 64 | 64 |
| providers/google/tests/unit/google/cloud/openlineage/test_utils.py | 0 | 0 | 0 | 1 | 1 |
| providers/google/tests/unit/google/cloud/operators/test_dataproc.py | 0 | 0 | 0 | 14 | 14 |
| providers/openlineage/src/airflow/providers/openlineage/utils/spark.py | 0 | 0 | 0 | 3 | 3 |
| providers/openlineage/tests/unit/openlineage/plugins/test_macros.py | 0 | 0 | 0 | 3 | 3 |
| providers/openlineage/tests/unit/openlineage/utils/test_spark.py | 0 | 0 | 0 | 5 | 5 |
Recommendations
- Resolve High severity issues, especially error handling gaps and performance bottlenecks.
- Run automated tests after applying fixes to verify no regressions.
|
@codity review |
Policy Check Failed
✗ 3/3 policy checks failed:
• Need 2 more approval(s) (0/2): comment LGTM or approve via review
To merge this PR:
|
PR Summary
What Changed
Key Changes by Area
Spark Operator: Added
Transport Handling: Enhanced
Dataproc Tests: Updated tests to mock static UUIDs for reproducible run IDs and added coverage for composite transport injection and unsupported type handling.
Macros: Modified
Files Changed
Review Focus Areas
Architecture
Design Decisions: Transport injection delegates to
Scalability & Extensibility: The
Risks: Moving
Merge Status
NOT MERGEABLE: PR Score 12/100, below threshold (50)
|
| openlineage_inject_parent_job_info: bool = conf.getboolean( | ||
| "openlineage", "spark_inject_parent_job_info", fallback=False | ||
| ), | ||
| openlineage_inject_transport_info: bool = conf.getboolean( | ||
| "openlineage", "spark_inject_transport_info", fallback=False | ||
| ), |
Using conf.getboolean() as a default argument value evaluates it once at class definition/import time, not per-instantiation, so runtime config changes will not be picked up and config errors at import time will break the module.
Suggested fix
openlineage_inject_parent_job_info: bool | None = None,
openlineage_inject_transport_info: bool | None = None,
File: providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
Lines: 143-148
Issue Type: functional-high
Severity: high
| if AIRFLOW_V_3_0_PLUS: | ||
| context = task_instance.get_template_context() | ||
| if hasattr(task_instance, "dag_run"): | ||
| dag_run = task_instance.dag_run | ||
| elif hasattr(context, "dag_run"): | ||
| dag_run = context["dag_run"] | ||
| if hasattr(dag_run, "logical_date") and dag_run.logical_date: | ||
| date = dag_run.logical_date | ||
| else: | ||
| date = dag_run.run_after |
When AIRFLOW_V_3_0_PLUS is True and neither task_instance nor context has a 'dag_run' attribute, 'dag_run' is never assigned, causing UnboundLocalError on the subsequent hasattr(dag_run, ...) check. Initialize dag_run or restructure the conditional to guarantee assignment.
Suggested fix
if AIRFLOW_V_3_0_PLUS:
    context = task_instance.get_template_context()
    if hasattr(task_instance, "dag_run"):
        dag_run = task_instance.dag_run
    else:
        dag_run = context["dag_run"]
    if hasattr(dag_run, "logical_date") and dag_run.logical_date:
        date = dag_run.logical_date
    else:
        date = dag_run.run_after
File: providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
Lines: 62-71
Issue Type: functional-high
Severity: high
)
operator.execute(MagicMock())

assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
The assert statement only evaluates a string literal (always truthy) rather than checking its presence in caplog.text, so this assertion never validates the log message.
Suggested fix
assert (
    "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
    in caplog.text
)
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
Lines: 459-459
Issue Type: functional-medium
Severity: medium
Issue Description:
The assert statement only evaluates a string literal (always truthy) rather than checking its presence in caplog.text, so this assertion never validates the log message.
Current Code:
assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
---
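The difference between the two assertions can be shown in isolation. In this minimal sketch a plain string stands in for pytest's `caplog.text`, and `check_buggy` / `check_fixed` are hypothetical helper names used only for illustration.

```python
EXPECTED = "does not support automatic injection"


def check_buggy(log_text: str) -> bool:
    # A non-empty string literal is always truthy, so this assert can never fail,
    # regardless of what was logged.
    assert "does not support automatic injection"
    return True


def check_fixed(log_text: str) -> bool:
    # Membership test against the captured log text actually verifies the message.
    assert EXPECTED in log_text
    return True


empty_log = ""
real_log = "WARNING transport type `console` does not support automatic injection"

buggy_passes_on_empty_log = check_buggy(empty_log)

try:
    check_fixed(empty_log)
    fixed_fails_on_empty_log = False
except AssertionError:
    fixed_fails_on_empty_log = True

fixed_passes_on_real_log = check_fixed(real_log)
```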
if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
    props["auth.type"] = "api_key"
    props["auth.apiKey"] = tp.config.auth.get_bearer()
API key (bearer token) is injected into Spark properties as plaintext, which are commonly logged, persisted in Spark UI/event logs, and visible to anyone with cluster access. Consider warning users or supporting a secret reference instead of embedding the raw credential.
Also reported at: providers/openlineage/src/airflow/providers/openlineage/utils/spark.py L69–L71
Suggested fix
if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
    log.warning(
        "Injecting OpenLineage API key into Spark properties in plaintext; "
        "these may be exposed via Spark UI/event logs."
    )
    props["auth.type"] = "api_key"
    props["auth.apiKey"] = tp.config.auth.get_bearer()
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
Lines: 109-111
Issue Type: security-medium
Severity: medium
Issue Description:
API key (bearer token) is injected into Spark properties as plaintext, which are commonly logged, persisted in Spark UI/event logs, and visible to anyone with cluster access. Consider warning users or supporting a secret reference instead of embedding the raw credential.
_Also reported at: `providers/openlineage/src/airflow/providers/openlineage/utils/spark.py` L69–L71_
Current Code:
if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
    props["auth.type"] = "api_key"
    props["auth.apiKey"] = tp.config.auth.get_bearer()
---
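One partial mitigation on the Airflow side is to mask credential values before any properties are written to task logs. The sketch below is an assumption, not something the PR implements: `redact_properties` and `SENSITIVE_SUFFIXES` are hypothetical names, and the property keys are illustrative. It does not address exposure through the Spark UI or event logs, which would still need a secret reference on the Spark side.

```python
# Hypothetical list of key suffixes that carry credentials.
SENSITIVE_SUFFIXES = ("auth.apiKey",)


def redact_properties(props: dict) -> dict:
    """Return a copy of props that is safe to log, with credential values masked."""
    return {
        key: ("***" if key.endswith(SENSITIVE_SUFFIXES) else value)
        for key, value in props.items()
    }


# Illustrative property names; the real keys depend on the transport configuration.
props = {
    "spark.openlineage.transport.type": "http",
    "spark.openlineage.transport.auth.type": "api_key",
    "spark.openlineage.transport.auth.apiKey": "Bearer secret-token",
}
safe_to_log = redact_properties(props)
```

The original dictionary is left untouched, so the unmasked values can still be passed to the submit call while only the redacted copy reaches the logger.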
"timeoutInMillis": str(
    int(tp.timeout) * 1000
    # convert to milliseconds, as required by Spark integration
),
int(tp.timeout) * 1000 truncates fractional seconds (e.g., 0.5s becomes 0ms), which differs from the previous int(tp.timeout * 1000) behavior and can yield a 0 timeout. Multiply first, then cast.
Suggested fix
"timeoutInMillis": str(
    int(tp.timeout * 1000)
    # convert to milliseconds, as required by Spark integration
),
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
Lines: 101-104
Issue Type: robustness-medium
Severity: medium
Issue Description:
`int(tp.timeout) * 1000` truncates fractional seconds (e.g., 0.5s becomes 0ms), which differs from the previous `int(tp.timeout * 1000)` behavior and can yield a 0 timeout. Multiply first, then cast.
Current Code:
"timeoutInMillis": str(
    int(tp.timeout) * 1000
    # convert to milliseconds, as required by Spark integration
),
---
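The effect of the cast order is easy to check on its own. The two functions below are hypothetical helpers written only to contrast the expressions from the diff; they are not part of the provider code.

```python
def timeout_ms_truncating(timeout_s: float) -> int:
    # Cast first, then scale: fractional seconds are discarded before multiplying.
    return int(timeout_s) * 1000


def timeout_ms(timeout_s: float) -> int:
    # Scale first, then cast: sub-second timeouts survive the conversion.
    return int(timeout_s * 1000)


half_second_truncated = timeout_ms_truncating(0.5)  # 0
half_second_scaled = timeout_ms(0.5)                # 500
whole_seconds_truncated = timeout_ms_truncating(5.0)
whole_seconds_scaled = timeout_ms(5.0)
```

For whole-second values both forms agree. Because binary floating point can land slightly below the intended integer for some fractional inputs, `round(timeout_s * 1000)` may be worth considering as an alternative to `int(...)`.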
if hasattr(tp, "compression") and tp.compression:
    props["compression"] = str(tp.compression)

if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
tp.config.auth may not exist on all transport configs; if it is missing, evaluating tp.config.auth raises AttributeError before the hasattr check for api_key can run. Guard with hasattr(tp.config, 'auth') first.
Suggested fix
if (
    hasattr(tp, "config")
    and hasattr(tp.config, "auth")
    and hasattr(tp.config.auth, "api_key")
    and tp.config.auth.get_bearer()
):
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
Lines: 109-109
Issue Type: robustness-medium
Severity: medium
Issue Description:
`tp.config.auth` may not exist on all transport configs; if it is missing, evaluating `tp.config.auth` raises AttributeError before the `hasattr` check for `api_key` can run. Guard with `hasattr(tp.config, 'auth')` first.
Current Code:
if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
---
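An equivalent way to write the guard is with chained `getattr` calls and a `None` default. This is a sketch under assumptions: `get_bearer_or_none` is a hypothetical helper, and `SimpleNamespace` objects stand in for real transport and auth objects, of which only the `api_key` attribute and `get_bearer()` method seen in the diff are used.

```python
from types import SimpleNamespace


def get_bearer_or_none(tp):
    # Walk tp.config.auth defensively; any missing level yields None instead of raising.
    auth = getattr(getattr(tp, "config", None), "auth", None)
    if auth is not None and hasattr(auth, "api_key"):
        return auth.get_bearer() or None
    return None


# Stand-in transports covering each missing-attribute case.
no_config = SimpleNamespace()
no_auth = SimpleNamespace(config=SimpleNamespace())
with_key = SimpleNamespace(
    config=SimpleNamespace(
        auth=SimpleNamespace(api_key="abc", get_bearer=lambda: "Bearer abc")
    )
)

results = [get_bearer_or_none(tp) for tp in (no_config, no_auth, with_key)]
```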
elif hasattr(context, "dag_run"):
    dag_run = context["dag_run"]
hasattr(context, "dag_run") checks the context object's attributes, but context is a dict-like mapping accessed via context["dag_run"]; use 'in' instead of hasattr.
Suggested fix
elif "dag_run" in context:
    dag_run = context["dag_run"]
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
Lines: 66-67
Issue Type: functional-medium
Severity: medium
Issue Description:
hasattr(context, "dag_run") checks the context object's attributes, but context is a dict-like mapping accessed via context["dag_run"]; use 'in' instead of hasattr.
Current Code:
elif hasattr(context, "dag_run"):
    dag_run = context["dag_run"]
---
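The mismatch between attribute lookup and key lookup can be seen with a plain dictionary. This sketch assumes the context behaves like a `dict`; the real Airflow context type may differ, but the review comment treats it as a mapping.

```python
# A dict-based stand-in for the template context.
context = {"dag_run": "run-1"}

# hasattr inspects object attributes (methods such as .keys), not mapping keys.
found_by_hasattr = hasattr(context, "dag_run")

# The membership operator checks the keys, which is what the lookup below relies on.
found_by_membership = "dag_run" in context

dag_run = context["dag_run"] if found_by_membership else None
```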
if nested_transport.kind == "http":
    http_transports[nested_transport.name] = _get_transport_information(nested_transport)
nested_transport.name is accessed without a hasattr check inside the http branch, but the else branch guards with hasattr; if an http nested transport lacks name, this raises AttributeError. Guard both branches consistently.
Suggested fix
if nested_transport.kind == "http":
    name = nested_transport.name if hasattr(nested_transport, "name") else "no-name"
    http_transports[name] = _get_transport_information(nested_transport)
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
Lines: 91-92
Issue Type: robustness-medium
Severity: medium
Issue Description:
`nested_transport.name` is accessed without a hasattr check inside the http branch, but the else branch guards with hasattr; if an http nested transport lacks `name`, this raises AttributeError. Guard both branches consistently.
Current Code:
if nested_transport.kind == "http":
    http_transports[nested_transport.name] = _get_transport_information(nested_transport)
---
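The guarded access can also be written with `getattr` and a default. In the sketch below, `collect_http_transport_names` is a hypothetical helper and `SimpleNamespace` objects stand in for nested transports; the `"no-name"` fallback is taken from the suggested fix.

```python
from types import SimpleNamespace


def collect_http_transport_names(transports):
    # Use the same fallback for every http transport, named or not.
    names = []
    for nested in transports:
        if nested.kind == "http":
            names.append(getattr(nested, "name", "no-name"))
    return names


transports = [
    SimpleNamespace(kind="http", name="primary"),
    SimpleNamespace(kind="http"),  # no name attribute
    SimpleNamespace(kind="console", name="debug"),
]
names = collect_http_transport_names(transports)
```

One consequence of a fixed fallback is that two unnamed http transports would map to the same dictionary key, so the later one would overwrite the earlier; a per-index fallback is a possible refinement.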
Security Scan Summary
No critical security issues detected.
Scan completed in 28.0s.
Security scan powered by Codity.ai
Dependency vulnerability scanning
Top 10 Vulnerabilities (19 total found)
1. @babel/helpers various CVE: GHSA-968p-4wvh-cqc8
2. @babel/plugin-transform-modules-systemjs various CVE: GHSA-fv7c-fp4j-7gwp
3. @babel/runtime various CVE: GHSA-968p-4wvh-cqc8
4. ajv various CVE: GHSA-2g4f-4pwh-qvx6, GHSA-2g4f-4pwh-qvx6
5. brace-expansion various CVE: GHSA-v6h2-p8h4-qcjw, GHSA-f886-m6hf-6m8v
6. copy-webpack-plugin various CVE: N/A
7. css-minimizer-webpack-plugin various CVE: N/A
8. fast-uri various CVE: GHSA-q3j6-qgpj-74h6, GHSA-v39h-62p7-jpjc
9. flatted various CVE: GHSA-25h7-pfq9-p65f, GHSA-rf6f-7fwh-wjgh
10. js-yaml various CVE: GHSA-mh29-5h37-fv8m, GHSA-mh29-5h37-fv8m
9 more vulnerabilities not shown. Update dependencies to fix these issues.
Powered by Codity.ai
Dependency vulnerability scanning
Vulnerability details (4 items)
1. pip 24.0 CVE: GHSA-4xh5-x5gv-qwph
2. pip 24.0 CVE: GHSA-6vgw-5pg2-w6jp
3. pip 24.0 CVE: GHSA-58qw-9mgm-455v
4. pip 24.0 CVE: GHSA-jp4c-xjxw-mgf9
Powered by Codity.ai
License Compliance Scan
Weak copyleft licenses found - verify compatibility.
Some packages have unknown licenses - manual review required.
Medium Risk Licenses - 1 package
EPL-2.0 (1 package):
Unknown Licenses - 14 packages
Powered by Codity.ai
Code Quality Report - test-org-codity/airflow1 · PR #1
Scanned: 2026-05-16 09:34 UTC | Score: 20/100 | Provider: github
Executive Summary
Top Findings
[CQ-LLM-003]
| File | Critical | High | Medium | Low | Total |
|---|---|---|---|---|---|
| providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py | 0 | 1 | 1 | 0 | 2 |
| providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py | 0 | 0 | 1 | 49 | 50 |
| providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py | 0 | 0 | 0 | 64 | 64 |
| providers/google/tests/unit/google/cloud/openlineage/test_utils.py | 0 | 0 | 0 | 1 | 1 |
| providers/google/tests/unit/google/cloud/operators/test_dataproc.py | 0 | 0 | 0 | 14 | 14 |
| providers/openlineage/src/airflow/providers/openlineage/utils/spark.py | 0 | 0 | 0 | 3 | 3 |
| providers/openlineage/tests/unit/openlineage/plugins/test_macros.py | 0 | 0 | 0 | 3 | 3 |
| providers/openlineage/tests/unit/openlineage/utils/test_spark.py | 0 | 0 | 0 | 5 | 5 |
Recommendations
- Resolve High severity issues, especially error handling gaps and performance bottlenecks.
- Run automated tests after applying fixes to verify no regressions.
@codity review
Policy Check Failed
✗ 3/3 policy checks failed:
• Need 2 more approval(s) (0/2): comment LGTM or approve via review
To merge this PR:
PR Summary
What Changed
Key Changes by Area
Spark Operator: Added [...]
Dataproc Operator: Extended existing OpenLineage injection to support both job and batch operations using the same two injection flags.
OpenLineage Utils: Enhanced [...]
Macros: Updated [...]
Files Changed
Review Focus Areas
Architecture
Design Decisions: The injection flags default to [...]
Scalability & Extensibility: The approach keeps transport handling generic via [...]
Risks: Logging unsupported transports (e.g., console) is informational only; no action required. The macro change is low risk as it only adds a fallback path.
Merge Status
NOT MERGEABLE: PR Score 13/100, below threshold (50)
openlineage_inject_parent_job_info: bool = conf.getboolean(
    "openlineage", "spark_inject_parent_job_info", fallback=False
),
openlineage_inject_transport_info: bool = conf.getboolean(
    "openlineage", "spark_inject_transport_info", fallback=False
),
Using conf.getboolean() as a default parameter value evaluates it once, when the function is defined at import time, not on each instantiation, so later changes to the Airflow config are ignored. Move the lookup into the __init__ body and use None as a sentinel default.
Suggested fix
openlineage_inject_parent_job_info: bool | None = None,
openlineage_inject_transport_info: bool | None = None,
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
Lines: 143-148
Issue Type: functional-high
Severity: high
Issue Description:
Using conf.getboolean() as a default parameter value evaluates it once at class definition time, not per instantiation, so runtime changes to the Airflow config are ignored. Move the lookup into __init__ body with a sentinel default.
Current Code:
openlineage_inject_parent_job_info: bool = conf.getboolean(
    "openlineage", "spark_inject_parent_job_info", fallback=False
),
openlineage_inject_transport_info: bool = conf.getboolean(
    "openlineage", "spark_inject_transport_info", fallback=False
),
---
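The suggested signature change only shows the new defaults; the lookup itself still has to move into the constructor body. The sketch below illustrates the difference with a plain dictionary standing in for the Airflow config. `settings`, `get_flag`, `EagerOperator`, and `LazyOperator` are hypothetical names used for illustration, not the operator's real code.

```python
from typing import Optional

# Stand-in for the Airflow configuration object.
settings = {"spark_inject_parent_job_info": False}


def get_flag(name: str) -> bool:
    return settings.get(name, False)


class EagerOperator:
    # The default is computed once, when this def statement executes.
    def __init__(self, inject: bool = get_flag("spark_inject_parent_job_info")):
        self.inject = inject


class LazyOperator:
    # None is a sentinel meaning "not set by the caller"; the config is read per instance.
    def __init__(self, inject: Optional[bool] = None):
        if inject is None:
            inject = get_flag("spark_inject_parent_job_info")
        self.inject = inject


# Simulate a config change after the classes have been defined.
settings["spark_inject_parent_job_info"] = True

eager = EagerOperator()
lazy = LazyOperator()
explicit = LazyOperator(inject=False)
```

Using `None` rather than `False` as the sentinel keeps an explicit `False` from the caller distinguishable from "use whatever the config says".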
if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
    props["auth.type"] = "api_key"
    props["auth.apiKey"] = tp.config.auth.get_bearer()
API key and bearer token are written into Spark properties which are typically logged or visible in Spark UI/process listings, exposing credentials. Mask or pass via a secure file/secret reference instead.
Also reported at: providers/openlineage/src/airflow/providers/openlineage/utils/spark.py L69–L71
Suggested fix
if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
    props["auth.type"] = "api_key"
    # NOTE: Spark properties are commonly logged/exposed in the Spark UI and process args.
    # Avoid embedding the bearer token directly; prefer referencing a secret file or env var.
    props["auth.apiKey"] = tp.config.auth.get_bearer()
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
Lines: 109-111
Issue Type: security-high
Severity: high
Issue Description:
API key and bearer token are written into Spark properties which are typically logged or visible in Spark UI/process listings, exposing credentials. Mask or pass via a secure file/secret reference instead.
_Also reported at: `providers/openlineage/src/airflow/providers/openlineage/utils/spark.py` L69–L71_
Current Code:
if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
    props["auth.type"] = "api_key"
    props["auth.apiKey"] = tp.config.auth.get_bearer()
---
if AIRFLOW_V_3_0_PLUS:
    context = task_instance.get_template_context()
    if hasattr(task_instance, "dag_run"):
        dag_run = task_instance.dag_run
    elif hasattr(context, "dag_run"):
        dag_run = context["dag_run"]
    if hasattr(dag_run, "logical_date") and dag_run.logical_date:
        date = dag_run.logical_date
    else:
        date = dag_run.run_after
If task_instance lacks a dag_run attribute and context also lacks dag_run, dag_run is never assigned and the subsequent hasattr(dag_run, ...) check raises UnboundLocalError.
Suggested fix
if AIRFLOW_V_3_0_PLUS:
    context = task_instance.get_template_context()
    if hasattr(task_instance, "dag_run"):
        dag_run = task_instance.dag_run
    else:
        dag_run = context["dag_run"]
    if hasattr(dag_run, "logical_date") and dag_run.logical_date:
        date = dag_run.logical_date
    else:
        date = dag_run.run_after
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
Lines: 62-71
Issue Type: functional-high
Severity: high
Issue Description:
If task_instance lacks a dag_run attribute and context also lacks dag_run, dag_run is never assigned and the subsequent hasattr(dag_run, ...) check raises UnboundLocalError.
Current Code:
if AIRFLOW_V_3_0_PLUS:
    context = task_instance.get_template_context()
    if hasattr(task_instance, "dag_run"):
        dag_run = task_instance.dag_run
    elif hasattr(context, "dag_run"):
        dag_run = context["dag_run"]
    if hasattr(dag_run, "logical_date") and dag_run.logical_date:
        date = dag_run.logical_date
    else:
        date = dag_run.run_after
---
elif hasattr(context, "dag_run"):
    dag_run = context["dag_run"]
hasattr(context, "dag_run") checks attributes on the context object but context is a dict-like Context; this check is incorrect and should use "dag_run" in context.
Suggested fix
elif "dag_run" in context:
    dag_run = context["dag_run"]
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
Lines: 66-67
Issue Type: functional-high
Severity: high
Issue Description:
hasattr(context, "dag_run") checks attributes on the context object but context is a dict-like Context; this check is incorrect and should use "dag_run" in context.
Current Code:
elif hasattr(context, "dag_run"):
    dag_run = context["dag_run"]
---
)
operator.execute(MagicMock())

assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
Assertion on a bare string literal always evaluates truthy and never verifies the log message. Check for the message with `in caplog.text`, as the previous test does.
Suggested fix
assert (
    "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
    in caplog.text
)
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
Lines: 459-459
Issue Type: functional-medium
Severity: medium
Issue Description:
Assertion on a bare string literal always evaluates truthy and never verifies the log message. Use `in caplog.text` like the previous test.
Current Code:
assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
---
for nested_transport in transport.transports:
    if nested_transport.kind == "http":
        http_transports[nested_transport.name] = _get_transport_information(nested_transport)
nested_transport.name is accessed without a hasattr guard inside the http branch, while the else-branch defensively checks for it; if a composite http transport lacks name, this raises AttributeError. Use a consistent guarded access.
Suggested fix
for nested_transport in transport.transports:
    if nested_transport.kind == "http":
        name = nested_transport.name if hasattr(nested_transport, "name") else "no-name"
        http_transports[name] = _get_transport_information(nested_transport)
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
Lines: 90-92
Issue Type: robustness-medium
Severity: medium
Issue Description:
`nested_transport.name` is accessed without a `hasattr` guard inside the http branch, while the else-branch defensively checks for it; if a composite http transport lacks `name`, this raises AttributeError. Use a consistent guarded access.
Current Code:
for nested_transport in transport.transports:
    if nested_transport.kind == "http":
        http_transports[nested_transport.name] = _get_transport_information(nested_transport)
---
Security Scan Summary
No critical security issues detected.
Scan completed in 26.4s.
Security scan powered by Codity.ai
Code Quality Report - test-org-codity/airflow1 · PR #1
Scanned: 2026-05-16 10:25 UTC | Score: 21/100 | Provider: github
Executive Summary
Top Findings
[CQ-LLM-002]
| File | Critical | High | Medium | Low | Total |
|---|---|---|---|---|---|
| providers/apache/spark/src/airflow/providers/apache/spark/get_provider_info.py | 0 | 0 | 0 | 1 | 1 |
| providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py | 0 | 0 | 2 | 0 | 2 |
| providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py | 0 | 0 | 1 | 49 | 50 |
| providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py | 0 | 0 | 0 | 64 | 64 |
| providers/google/tests/unit/google/cloud/openlineage/test_utils.py | 0 | 0 | 0 | 1 | 1 |
| providers/google/tests/unit/google/cloud/operators/test_dataproc.py | 0 | 0 | 0 | 14 | 14 |
| providers/openlineage/src/airflow/providers/openlineage/utils/spark.py | 0 | 0 | 0 | 3 | 3 |
| providers/openlineage/tests/unit/openlineage/plugins/test_macros.py | 0 | 0 | 0 | 3 | 3 |
| providers/openlineage/tests/unit/openlineage/utils/test_spark.py | 0 | 0 | 0 | 5 | 5 |
Recommendations
- Run automated tests after applying fixes to verify no regressions.
@codity review
PR Summary
What Changed
Key Changes by Area
Spark Operator: Added [...]
Dataproc Operators: Extended existing OpenLineage injection to support both job and batch operations with parent and transport info, validated via updated tests.
OpenLineage Utilities: Enhanced [...]
Macros: Modified [...]
Files Changed
Review Focus Areas
Architecture
Design Decisions: Composite transport support uses nested keys to avoid key collisions and align with Spark's property hierarchy. The [...]
Scalability & Extensibility: Transport injection is delegated to [...]
Risks: The dependency move from optional to required may affect users who previously disabled OpenLineage. This is intentional but should be called out in release notes. No architectural risks beyond that.
Security Scan Summary
No critical security issues detected.
Scan completed in 25.1s.
Security scan powered by Codity.ai
@codity review
PR review started! Estimated time: 5-10 minutes.
@codity review
PR Summary
What Changed
Key Changes by Area
Spark Operator: Added injection flags and logging for unsupported transport types in [...]
Files Changed
Review Focus Areas
Architecture
Design Decisions: Composite transports are flattened into nested Spark properties (e.g., [...]
Scalability & Extensibility: The injection flags are optional and config-driven, allowing gradual adoption. The approach supports future transport types via the same pattern.
Risks: The nested transport key format ( [...]
Security Scan Summary
No critical security issues detected.
Scan completed in 24.1s.
Security scan powered by Codity.ai
@codity review
PR Summary
What Changed
Key Changes by Area
Spark Operator: Added [...]
Transport Handling: Enhanced [...]
Dataproc Tests: Patched static UUID generation to ensure reproducible run IDs in OpenLineage injection tests.
Macros: Modified [...]
Dependencies: Promoted [...]
Files Changed
Review Focus Areas
Architecture
Design Decisions: The PR delegates transport parsing to [...]
Scalability & Extensibility: The approach keeps transport handling centralized in [...]
Risks: The dependency promotion to required may affect users who previously disabled OpenLineage. This is intentional but should be called out in release notes. The fallback to [...]
Merge Status
NOT MERGEABLE: PR Score 13/100, below threshold (50)
openlineage_inject_parent_job_info: bool = conf.getboolean(
    "openlineage", "spark_inject_parent_job_info", fallback=False
),
openlineage_inject_transport_info: bool = conf.getboolean(
    "openlineage", "spark_inject_transport_info", fallback=False
),
Default values for openlineage_inject_parent_job_info and openlineage_inject_transport_info are evaluated once at module import time via conf.getboolean(...), so configuration changes after import are not reflected; use None as the default and resolve the config value inside __init__.
Also reported at: providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py L183–L184
Suggested fix
openlineage_inject_parent_job_info: bool | None = None,
openlineage_inject_transport_info: bool | None = None,
File: providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
Lines: 143-148
Issue Type: functional-high
Severity: high
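The import-time evaluation described in this comment can be reproduced without Airflow. The following is a minimal sketch, assuming a standard-library `configparser` object as a stand-in for Airflow's `conf`; the class names `FrozenDefaultOperator` and `LazyDefaultOperator` are hypothetical and exist only for illustration. It shows that a default computed in the signature keeps the value read when the function was defined, while a `None` sentinel resolved inside `__init__` picks up later configuration changes.

```python
from __future__ import annotations

import configparser

# Stand-in for Airflow's configuration object (assumption: not the real `conf`).
conf = configparser.ConfigParser()
conf.read_dict({"openlineage": {"spark_inject_parent_job_info": "false"}})


class FrozenDefaultOperator:
    """Default is computed once, when the `def` statement is executed."""

    def __init__(
        self,
        openlineage_inject_parent_job_info: bool = conf.getboolean(
            "openlineage", "spark_inject_parent_job_info", fallback=False
        ),
    ) -> None:
        self.inject = openlineage_inject_parent_job_info


class LazyDefaultOperator:
    """Default is resolved on every instantiation via a None sentinel."""

    def __init__(self, openlineage_inject_parent_job_info: bool | None = None) -> None:
        if openlineage_inject_parent_job_info is None:
            # Read the configuration only when the caller did not decide.
            openlineage_inject_parent_job_info = conf.getboolean(
                "openlineage", "spark_inject_parent_job_info", fallback=False
            )
        self.inject = openlineage_inject_parent_job_info


# Simulate a configuration change that happens after the module was imported.
conf.set("openlineage", "spark_inject_parent_job_info", "true")

frozen = FrozenDefaultOperator()  # still sees the value read at definition time
lazy = LazyDefaultOperator()      # sees the updated value
```

An explicit argument still wins in the lazy variant, so callers who pass `False` are not overridden by the configuration.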
    if nested_transport.kind == "http":
        http_transports[nested_transport.name] = _get_transport_information(
            nested_transport
        )
When iterating composite transports, http_transports[nested_transport.name] is used without checking hasattr(nested_transport, 'name'), which will raise AttributeError if an http transport has no name attribute, unlike the else branch which guards with hasattr.
Suggested fix
if nested_transport.kind == "http":
    transport_name = (
        nested_transport.name
        if hasattr(nested_transport, "name")
        else "http"
    )
    http_transports[transport_name] = _get_transport_information(
        nested_transport
    )
File: providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
Lines: 133-136
Issue Type: functional-high
Severity: high
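The guard requested in this comment can be sketched in isolation. This is a hedged illustration, not the provider's implementation: `collect_http_transports` is a hypothetical helper, the transports are modelled with `types.SimpleNamespace`, and the index-based fallback name (`http_<index>`) is an assumption chosen so that two unnamed transports do not overwrite each other, which a single shared fallback such as `"http"` would allow.

```python
from types import SimpleNamespace


def collect_http_transports(nested_transports):
    """Return a dict of HTTP transports keyed by name, tolerating missing names."""
    http_transports = {}
    for index, nested_transport in enumerate(nested_transports):
        if getattr(nested_transport, "kind", None) != "http":
            continue  # only HTTP transports are collected in this sketch
        # getattr with a default avoids AttributeError when `name` is absent.
        transport_name = getattr(nested_transport, "name", None) or f"http_{index}"
        http_transports[transport_name] = {"url": nested_transport.url}
    return http_transports


example = collect_http_transports(
    [
        SimpleNamespace(kind="http", name="primary", url="http://a"),
        SimpleNamespace(kind="http", url="http://b"),  # no `name` attribute
        SimpleNamespace(kind="console"),
    ]
)
```

Whether an unnamed transport should be skipped, given a fallback name, or reported as an error is a design decision for the PR author; the sketch only shows that the lookup itself need not raise.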
    "timeoutInMillis": str(
        int(tp.timeout) * 1000  # convert to milliseconds, as required by Spark integration
    ),
The timeout conversion changed from int(tp.timeout * 1000) (multiply then truncate) to int(tp.timeout) * 1000 (truncate then multiply), producing incorrect millisecond values for fractional-second timeouts (e.g., 1.5s yields 1000ms instead of 1500ms).
Suggested fix
"timeoutInMillis": str(
int(tp.timeout * 1000) # convert to milliseconds, as required by Spark integration
),
File: providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
Lines: 62-64
Issue Type: functional-high
Severity: high
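The arithmetic behind this comment is easy to check directly. The two helper names below are hypothetical; each mirrors one of the two expressions quoted in the comment.

```python
def millis_truncate_first(timeout_seconds: float) -> int:
    """Mirrors int(tp.timeout) * 1000: the fractional part is dropped first."""
    return int(timeout_seconds) * 1000


def millis_multiply_first(timeout_seconds: float) -> int:
    """Mirrors int(tp.timeout * 1000): scale to milliseconds, then truncate."""
    return int(timeout_seconds * 1000)


# Whole-second timeouts agree; fractional ones do not.
whole = (millis_truncate_first(5), millis_multiply_first(5))
fractional = (millis_truncate_first(1.5), millis_multiply_first(1.5))
sub_second = (millis_truncate_first(0.5), millis_multiply_first(0.5))
```

The sub-second case is the most harmful one: truncating first turns a 0.5 s timeout into 0 ms.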
    )
    operator.execute(MagicMock())

    assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
The assert on line 459 is asserting a non-empty string literal, which is always truthy and never actually checks that the log message appears in caplog.text, so the test will pass even if the log message is never emitted.
Suggested fix
assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties." in caplog.text
File: providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
Lines: 459-459
Issue Type: functional-medium
Severity: medium
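Why the bare-string assertion can never fail is shown in the sketch below. It avoids pytest so that it runs on its own: `ListHandler` and `captured_text` are hypothetical stand-ins for what the `caplog` fixture provides, and the logger name and the shortened message are assumptions made for illustration.

```python
import logging

MESSAGE = "OpenLineage transport type `console` does not support automatic injection"


class ListHandler(logging.Handler):
    """Collects formatted log messages, roughly like caplog.text does."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


def captured_text(emit_message: bool) -> str:
    """Run a fake 'operator' and return everything it logged."""
    logger = logging.getLogger("sketch.spark_submit")
    logger.setLevel(logging.INFO)
    handler = ListHandler()
    logger.addHandler(handler)
    try:
        if emit_message:
            logger.info(MESSAGE)
    finally:
        logger.removeHandler(handler)
    return "\n".join(handler.messages)


silent_run = captured_text(emit_message=False)
logging_run = captured_text(emit_message=True)

# A non-empty string literal is truthy, so this passes even for the silent run.
bare_assert_passes = bool(MESSAGE)
# The membership check is the one that distinguishes the two runs.
membership_silent = MESSAGE in silent_run
membership_logging = MESSAGE in logging_run
```

In the real test the same membership check would be written against `caplog.text`, as the suggested fix does.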
    "timeoutInMillis": str(
        int(tp.timeout) * 1000
        # convert to milliseconds, as required by Spark integration
    ),
In _get_transport_information, tp.timeout is cast with int() before multiplying, but if tp.timeout is None or not set, this raises a TypeError; add a guard or default value (e.g., int(tp.timeout or 0) * 1000).
Suggested fix
"timeoutInMillis": str(
int(tp.timeout) * 1000 if tp.timeout is not None else "0"
# convert to milliseconds, as required by Spark integration
),
File: providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
Lines: 101-104
Issue Type: robustness-medium
Severity: medium
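One way to make the conversion tolerate a missing timeout is sketched below. The helper names are hypothetical, and two choices here are assumptions rather than anything the PR states: the key is omitted when the timeout is `None` (instead of emitting `"0"`, which a client might read as "no time allowed"), and the value is scaled to milliseconds before being rounded, so fractional seconds survive.

```python
from __future__ import annotations


def timeout_to_millis(timeout_seconds: float | None) -> str | None:
    """Return the timeout as a millisecond string, or None when unset."""
    if timeout_seconds is None:
        return None
    # Scale first, then round, so 1.5 s becomes 1500 ms rather than 1000 ms.
    return str(round(float(timeout_seconds) * 1000))


def build_http_properties(timeout_seconds: float | None) -> dict[str, str]:
    """Assemble a reduced, illustrative property dict for an HTTP transport."""
    properties = {"type": "http"}
    millis = timeout_to_millis(timeout_seconds)
    if millis is not None:
        # Only set the key when a timeout was actually configured.
        properties["timeoutInMillis"] = millis
    return properties


without_timeout = build_http_properties(None)
with_timeout = build_http_properties(1.5)
```

If the Spark integration requires the key to be present, a documented default would be the alternative to omitting it; that decision belongs to the PR author.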
Security Scan Summary
No critical security issues detected. Scan completed in 26.8s. Security scan powered by Codity.ai
Dependency vulnerability scanning
Top 10 Vulnerabilities (19 total found)
1. @babel/helpers various CVE: GHSA-968p-4wvh-cqc8
2. @babel/plugin-transform-modules-systemjs various CVE: GHSA-fv7c-fp4j-7gwp
3. @babel/runtime various CVE: GHSA-968p-4wvh-cqc8
4. ajv various CVE: GHSA-2g4f-4pwh-qvx6, GHSA-2g4f-4pwh-qvx6
5. brace-expansion various CVE: GHSA-v6h2-p8h4-qcjw, GHSA-f886-m6hf-6m8v
6. copy-webpack-plugin various CVE: N/A
7. css-minimizer-webpack-plugin various CVE: N/A
8. fast-uri various CVE: GHSA-q3j6-qgpj-74h6, GHSA-v39h-62p7-jpjc
9. flatted various CVE: GHSA-25h7-pfq9-p65f, GHSA-rf6f-7fwh-wjgh
10. js-yaml various CVE: GHSA-mh29-5h37-fv8m, GHSA-mh29-5h37-fv8m
9 more vulnerabilities not shown. Update dependencies to fix these issues.
Powered by Codity.ai · Docs
Dependency vulnerability scanning
Vulnerability details (4 items)
1. pip 24.0 CVE: GHSA-4xh5-x5gv-qwph
2. pip 24.0 CVE: GHSA-6vgw-5pg2-w6jp
3. pip 24.0 CVE: GHSA-58qw-9mgm-455v
4. pip 24.0 CVE: GHSA-jp4c-xjxw-mgf9
License Compliance Scan
Weak copyleft licenses found - verify compatibility
Some packages have unknown licenses - manual review required
Medium Risk Licenses - 1 package
EPL-2.0 (1 package):
Unknown Licenses - 14 packages
Code Quality Report: test-org-codity/airflow1 · PR #1
Scanned: 2026-05-16 11:38 UTC | Score: 21/100 | Provider: github
Executive Summary
Top Findings
[CQ-LLM-001]
| File | Critical | High | Medium | Low | Total |
|---|---|---|---|---|---|
| providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py | 0 | 0 | 2 | 1 | 3 |
| providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py | 0 | 0 | 1 | 48 | 49 |
| providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py | 0 | 0 | 0 | 64 | 64 |
| providers/google/tests/unit/google/cloud/openlineage/test_utils.py | 0 | 0 | 0 | 1 | 1 |
| providers/google/tests/unit/google/cloud/operators/test_dataproc.py | 0 | 0 | 0 | 14 | 14 |
| providers/openlineage/src/airflow/providers/openlineage/utils/spark.py | 0 | 0 | 0 | 3 | 3 |
| providers/openlineage/tests/unit/openlineage/plugins/test_macros.py | 0 | 0 | 0 | 3 | 3 |
| providers/openlineage/tests/unit/openlineage/utils/test_spark.py | 0 | 0 | 0 | 5 | 5 |
Recommendations
- Run automated tests after applying fixes to verify no regressions.
Was generative AI tooling used to co-author this PR?
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.