Skip to content

Add openlineage config spark submit#1

Open
DhirenMhatre wants to merge 2 commits into
mainfrom
add-openlineage-config-spark-submit
Open

Add openlineage config spark submit#1
DhirenMhatre wants to merge 2 commits into
mainfrom
add-openlineage-config-spark-submit

Conversation

@DhirenMhatre
Copy link
Copy Markdown


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
@greptile-apps
Copy link
Copy Markdown

greptile-apps Bot commented May 16, 2026

Greptile Summary

This PR adds OpenLineage configuration injection into spark-submit properties, supporting both HTTP and composite transports. It extends SparkSubmitOperator with two new opt-in flags, refactors the transport-to-Spark-properties conversion to handle composite transports, and updates lineage_run_id in macros for Airflow 3.0+ compatibility.

  • SparkSubmitOperator gains openlineage_inject_parent_job_info and openlineage_inject_transport_info parameters, with config-backed defaults, and calls the new inject utilities in execute().
  • _get_transport_information_as_spark_properties (in both openlineage/utils/spark.py and the common.compat shim) is refactored to fan-out composite transports into per-transport Spark property namespaces, filtering out non-HTTP sub-transports.
  • lineage_run_id in macros.py is updated to prefer dag_run.logical_date and fall back to dag_run.run_after on Airflow 3.0+, with test coverage added for both paths.

Confidence Score: 3/5

Not safe to merge as-is: the None-conf crash and the unbound-variable in lineage_run_id would both trigger in realistic usage scenarios.

Three bugs touch the newly enabled code paths. First, SparkSubmitOperator.execute passes self.conf (None by default) directly into the inject functions, where iterating over None raises a TypeError. Second, the Airflow 3.0+ branch in lineage_run_id uses hasattr(context, 'dag_run') on a plain dict, which never matches, leaving dag_run unbound and causing a NameError when the fallback is needed. Third, the operator's new boolean parameters are evaluated at import time via conf.getboolean(), freezing the config values and making runtime overrides ineffective.

spark_submit.py (None conf + import-time config reads) and macros.py (unbound variable / wrong dict hasattr) need the most attention before merging.

Important Files Changed

Filename Overview
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py Adds OpenLineage injection to SparkSubmitOperator.execute(); has a None-conf TypeError bug and config-evaluated-at-import-time defect
providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py Adds Airflow 3.0+ branch to lineage_run_id(); contains unbound dag_run variable risk and wrong hasattr-on-dict check
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py Refactors transport injection to support composite transports alongside http; logic looks correct
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py Mirrors the composite-transport refactor for backward-compat fallback path; logic is consistent
providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py Adds OpenLineage injection tests; one assertion for the console-transport warning is a bare string literal (always True) and never actually checks caplog
providers/apache/spark/pyproject.toml Promotes apache-airflow-providers-common-compat from optional to required dependency and removes the now-redundant optional extra
providers/openlineage/tests/unit/openlineage/utils/test_spark.py Adds composite-transport test cases; well structured and covers the main happy-path and skip scenarios
providers/openlineage/tests/unit/openlineage/plugins/test_macros.py Splits lineage_run_id test into Airflow-version-specific variants; coverage is adequate but only exercises the happy path
providers/google/tests/unit/google/cloud/operators/test_dataproc.py Adds generate_static_uuid mock to stabilise run-id assertions in Dataproc OL tests; purely test-fixture improvements

Sequence Diagram

sequenceDiagram
    participant User
    participant SparkSubmitOperator
    participant CompatShim as common.compat/utils/spark.py
    participant OLUtils as openlineage/utils/spark.py
    participant OLListener as OpenLineageListener
    participant SparkSubmitHook

    User->>SparkSubmitOperator: execute(context)
    alt "openlineage_inject_parent_job_info=True"
        SparkSubmitOperator->>CompatShim: inject_parent_job_information_into_spark_properties(conf, ctx)
        CompatShim->>OLUtils: delegates if OL provider available
        OLUtils-->>SparkSubmitOperator: conf + parentJobName/Namespace/RunId
    end
    alt "openlineage_inject_transport_info=True"
        SparkSubmitOperator->>CompatShim: inject_transport_information_into_spark_properties(conf, ctx)
        CompatShim->>OLUtils: delegates if OL provider available
        OLUtils->>OLListener: get_openlineage_listener().adapter...transport
        OLListener-->>OLUtils: transport kind
        alt "transport.kind == http"
            OLUtils-->>SparkSubmitOperator: "conf + spark.openlineage.transport.*"
        else "transport.kind == composite"
            OLUtils-->>SparkSubmitOperator: "conf + spark.openlineage.transport.transports.name.*"
        else unsupported
            OLUtils-->>SparkSubmitOperator: conf unchanged, log warning
        end
    end
    SparkSubmitOperator->>SparkSubmitHook: submit(application)
Loading

Reviews (1): Last reviewed commit: "add OpenLineage configuration injection ..." | Re-trigger Greptile

Comment on lines +188 to +193
if self._openlineage_inject_parent_job_info:
self.log.debug("Injecting OpenLineage parent job information into Spark properties.")
self.conf = inject_parent_job_information_into_spark_properties(self.conf, context)
if self._openlineage_inject_transport_info:
self.log.debug("Injecting OpenLineage transport information into Spark properties.")
self.conf = inject_transport_information_into_spark_properties(self.conf, context)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 None conf causes TypeError in inject functions

self.conf defaults to None (the parameter is typed dict[str, Any] | None = None). When a user creates SparkSubmitOperator(task_id="...", openlineage_inject_parent_job_info=True) without providing conf, calling inject_parent_job_information_into_spark_properties(None, context) propagates None into _is_parent_job_information_present_in_spark_properties, which immediately does for key in properties — iterating over None raises TypeError: 'NoneType' object is not iterable. Both injection paths share this defect.

Comment on lines +62 to +68
if AIRFLOW_V_3_0_PLUS:
context = task_instance.get_template_context()
if hasattr(task_instance, "dag_run"):
dag_run = task_instance.dag_run
elif hasattr(context, "dag_run"):
dag_run = context["dag_run"]
if hasattr(dag_run, "logical_date") and dag_run.logical_date:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Unbound dag_run variable and incorrect hasattr on a dict

context returned by get_template_context() is a plain dict, so hasattr(context, "dag_run") checks for an attribute on the dict object itself — not a key — and will virtually always be False. If task_instance also lacks dag_run, the variable dag_run is never assigned, and the subsequent if hasattr(dag_run, ...) line raises NameError: name 'dag_run' is not defined. The correct guard for a dict key is "dag_run" in context.

Suggested change
if AIRFLOW_V_3_0_PLUS:
context = task_instance.get_template_context()
if hasattr(task_instance, "dag_run"):
dag_run = task_instance.dag_run
elif hasattr(context, "dag_run"):
dag_run = context["dag_run"]
if hasattr(dag_run, "logical_date") and dag_run.logical_date:
if AIRFLOW_V_3_0_PLUS:
context = task_instance.get_template_context()
if hasattr(task_instance, "dag_run"):
dag_run = task_instance.dag_run
elif "dag_run" in context:
dag_run = context["dag_run"]
else:
raise RuntimeError("Cannot determine dag_run from task_instance or context")
if hasattr(dag_run, "logical_date") and dag_run.logical_date:

Comment on lines +143 to +148
openlineage_inject_parent_job_info: bool = conf.getboolean(
"openlineage", "spark_inject_parent_job_info", fallback=False
),
openlineage_inject_transport_info: bool = conf.getboolean(
"openlineage", "spark_inject_transport_info", fallback=False
),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Config read at class-definition (import) time, not at instantiation time

Python evaluates default argument expressions exactly once — when the class body is executed on first import. Here conf.getboolean(...) is called at import time and the result is baked in as a bool constant. Any later change to the Airflow configuration (e.g., via conf_vars in tests, or dynamic reconfiguration) will have no effect on new SparkSubmitOperator instances. The idiomatic Airflow pattern is to use None as the sentinel and read the config inside __init__.

Suggested change
openlineage_inject_parent_job_info: bool = conf.getboolean(
"openlineage", "spark_inject_parent_job_info", fallback=False
),
openlineage_inject_transport_info: bool = conf.getboolean(
"openlineage", "spark_inject_transport_info", fallback=False
),
openlineage_inject_parent_job_info: bool | None = None,
openlineage_inject_transport_info: bool | None = None,

Comment on lines +183 to +184
self._openlineage_inject_parent_job_info = openlineage_inject_parent_job_info
self._openlineage_inject_transport_info = openlineage_inject_transport_info
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Complement to the default-argument fix above: read the config inside __init__ so every new operator instance picks up the current Airflow configuration.

Suggested change
self._openlineage_inject_parent_job_info = openlineage_inject_parent_job_info
self._openlineage_inject_transport_info = openlineage_inject_transport_info
self._openlineage_inject_parent_job_info = (
openlineage_inject_parent_job_info
if openlineage_inject_parent_job_info is not None
else conf.getboolean("openlineage", "spark_inject_parent_job_info", fallback=False)
)
self._openlineage_inject_transport_info = (
openlineage_inject_transport_info
if openlineage_inject_transport_info is not None
else conf.getboolean("openlineage", "spark_inject_transport_info", fallback=False)
)

Comment on lines +322 to +324
"spark.openlineage.transport.url": "http://localhost:5000",
"spark.openlineage.transport.endpoint": "api/v2/lineage",
"spark.openlineage.transport.timeoutInMillis": "5050000",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Test assertion is always True — log message is never actually checked

assert "OpenLineage transport type console..." evaluates a non-empty string literal, which is always truthy in Python regardless of what was logged. The test passes unconditionally and would not catch a regression where the warning is no longer emitted. The in caplog.text is missing here, unlike the analogous assertion in test_inject_openlineage_composite_config_wrong_transport_to_spark just above it.

@DhirenMhatre
Copy link
Copy Markdown
Author

@codity review

@codity-ai
Copy link
Copy Markdown

codity-ai Bot commented May 16, 2026

PR Summary

What Changed

  • Added OpenLineage parent job and transport injection support to Spark Submit and Dataproc operators via new parameters and utility functions.
  • Updated transport injection logic to handle composite transports and nested HTTP properties correctly.
  • Fixed Airflow 3.0+ compatibility for the lineage_run_id macro by falling back to run_after when logical_date is unavailable.

Key Changes by Area

Spark Operator: Added openlineage_inject_parent_job_info and openlineage_inject_transport_info parameters to spark_submit.py, defaulting to config values and using common.compat utilities to inject metadata before job submission.

Dataproc Operators: Extended existing utility functions to inject OpenLineage properties into Dataproc jobs, batches, and workflow templates, with new tests covering injection scenarios.

OpenLineage Utils: Updated _get_transport_information_as_spark_properties in spark.py to support composite transports and format nested HTTP properties correctly.

Macros: Modified lineage_run_id macro in macros.py to use run_after when logical_date is missing, ensuring compatibility with Airflow 3.0+.

Files Changed

File Changes Summary
generated/provider_dependencies.json Updated provider dependency metadata
providers/apache/spark/README.rst Added apache-airflow-providers-common-compat>=1.5.0 to dependencies list
providers/apache/spark/pyproject.toml Added apache-airflow-providers-common-compat>=1.5.0 as required dependency
providers/apache/spark/src/airflow/providers/apache/spark/get_provider_info.py Updated provider info (dependency metadata)
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py Added OpenLineage injection parameters and calls to inject parent job and transport info
providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py Added tests for OpenLineage injection in Spark Submit
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py Added utility functions for injecting OpenLineage metadata into Spark properties
providers/google/tests/unit/google/cloud/openlineage/test_utils.py Added tests for Dataproc OpenLineage property injection
providers/google/tests/unit/google/cloud/operators/test_dataproc.py Added tests for Dataproc operators with OpenLineage injection
providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py Fixed lineage_run_id macro to use run_after when logical_date is unavailable
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py Updated _get_transport_information_as_spark_properties to support composite transports
providers/openlineage/tests/unit/openlineage/plugins/test_macros.py Added tests for lineage_run_id macro behavior
providers/openlineage/tests/unit/openlineage/utils/test_spark.py Added tests for composite transport injection

Review Focus Areas

  • Composite transport handling in spark.py:53–115, especially nested HTTP property formatting
  • Dataproc operator integration tests (test_dataproc.py:1565–2038, test_utils.py:533–1146) for edge cases
  • lineage_run_id macro fallback logic (macros.py:58–77) for Airflow 3.0+ compatibility

Architecture

Design Decisions: The PR reuses common.compat utilities to avoid duplicating injection logic across providers. Composite transport support was added incrementally to maintain backward compatibility with existing HTTP-only configurations.

Scalability & Extensibility: The injection pattern is provider-agnostic and can extend to other operators that submit Spark jobs. Dataproc integration follows existing utility patterns, limiting scope to job/batch/workflow template types.

Risks: The macro fallback to run_after assumes DAG run context availability; if run_after is also missing, the macro may return None. This is intentional per Airflow 3.0+ behavior and should be tested in real DAG runs.

@codity-ai
Copy link
Copy Markdown

codity-ai Bot commented May 16, 2026

Workflow Diagrams

Automatically generated sequence diagrams showing the workflows in this PR

1. Spark Submit OpenLineage Integration Flow

Medium complexity • Components: SparkSubmitOperator, OpenLineage integration utilities, Common compat OpenLineage Spark utils

sequenceDiagram
    title SparkSubmitOperator OpenLineage Injection and Submit Flow
    participant DAG_Run
    participant SparkSubmitOperator
    participant OpenLineageUtils
    participant SparkSubmitHook
    participant SparkCluster

    DAG_Run->>SparkSubmitOperator: execute(context)
    alt inject_parent_job_info enabled
        SparkSubmitOperator->>OpenLineageUtils: inject_parent_job_information_into_spark_properties(conf, context)
        OpenLineageUtils-->>SparkSubmitOperator: return conf with parent job info
    end
    alt inject_transport_info enabled
        SparkSubmitOperator->>OpenLineageUtils: inject_transport_information_into_spark_properties(conf, context)
        OpenLineageUtils-->>SparkSubmitOperator: return conf with transport info
    end
    SparkSubmitOperator->>SparkSubmitHook: _get_hook(conf, params)
    SparkSubmitOperator->>SparkSubmitHook: submit(application)
    SparkSubmitHook->>SparkCluster: spark-submit with enriched conf
    SparkCluster-->>SparkSubmitHook: job status and completion
    SparkSubmitHook-->>SparkSubmitOperator: report completion
    Note over SparkSubmitOperator,SparkSubmitHook: OpenLineage properties are merged into Spark conf before submission when enabled in airflow configuration
Loading

Note: Diagrams show detected patterns only. Complex workflows may require manual review.

Comment on lines +64 to +71
if hasattr(task_instance, "dag_run"):
dag_run = task_instance.dag_run
elif hasattr(context, "dag_run"):
dag_run = context["dag_run"]
if hasattr(dag_run, "logical_date") and dag_run.logical_date:
date = dag_run.logical_date
else:
date = dag_run.run_after
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional Critical

The code uses hasattr on the context dict, so the dag_run fallback is never set and dag_run remains undefined, leading to a NameError when evaluated.

Suggested fix
        if hasattr(task_instance, "dag_run"):
            dag_run = task_instance.dag_run
        elif "dag_run" in context:
            dag_run = context["dag_run"]
        else:
            dag_run = None
        if dag_run and getattr(dag_run, "logical_date", None):
            date = dag_run.logical_date
        elif dag_run:
            date = dag_run.run_after
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
Lines: 64-71
Issue Type: functional-critical
Severity: critical

Issue Description:
The code uses hasattr on the context dict, so the dag_run fallback is never set and dag_run remains undefined, leading to a NameError when evaluated.

Current Code:
        if hasattr(task_instance, "dag_run"):
            dag_run = task_instance.dag_run
        elif hasattr(context, "dag_run"):
            dag_run = context["dag_run"]
        if hasattr(dag_run, "logical_date") and dag_run.logical_date:
            date = dag_run.logical_date
        else:
            date = dag_run.run_after

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue

@codity-ai
Copy link
Copy Markdown

codity-ai Bot commented May 16, 2026

Security Scan Summary

Metric Value
Vulnerabilities Critical: 0
Overall Risk Clean
Files Scanned 13

No critical security issues detected

Scan completed in 51.8s

Security scan powered by Codity.ai

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Policy Check Failed

✗ 3/3 policy checks failed:

• Need 2 more approval(s) (0/2) — comment LGTM or approve via review
• Missing ticket reference (expected: JIRA-, ENG-, #*)
• 10 code file(s) changed but no test files added


To merge this PR:

  1. Address the failed checks listed above
  2. Ensure branch protection requires the codity/policy-check status

Configure policies in your dashboard

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

PR Summary

What Changed

  • Added openlineage_inject_parent_job_info and openlineage_inject_transport_info parameters to SparkSubmitOperator and Dataproc operators to inject OpenLineage metadata into Spark properties.
  • Implemented injection logic in common.compat and openlineage.utils.spark to support HTTP and composite transports, including nested property generation.
  • Fixed Airflow 3.0+ compatibility for the lineage_run_id macro by falling back to run_after when logical_date is unavailable.

Key Changes by Area

Spark Operator: Added two new injection parameters with config-based defaults and updated tests to cover all injection scenarios and unsupported transport handling.

Dataproc Operators: Extended DataprocSubmitJobOperator and DataprocCreateBatchOperator to accept the same OpenLineage injection flags as SparkSubmitOperator.

OpenLineage Utils: Enhanced _get_transport_information_as_spark_properties to generate nested properties for composite transports with multiple HTTP backends.

Macros: Updated lineage_run_id to use dag_run.run_after when logical_date is missing.

Files Changed

File Changes Summary
generated/provider_dependencies.json Added apache-airflow-providers-common-compat>=1.5.0 dependency
providers/apache/spark/README.rst Documented new OpenLineage injection parameters
providers/apache/spark/pyproject.toml Added apache-airflow-providers-common-compat>=1.5.0 dependency
providers/apache/spark/src/airflow/providers/apache/spark/get_provider_info.py Updated provider info to reflect new dependency
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py Added injection parameters and integration logic
providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py Added tests for injection scenarios and unsupported transport handling
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py Added inject_parent_job_information_into_spark_properties and inject_transport_information_into_spark_properties functions
providers/google/tests/unit/google/cloud/openlineage/test_utils.py Added test utilities for Dataproc OpenLineage injection
providers/google/tests/unit/google/cloud/operators/test_dataproc.py Added tests for Dataproc job/batch OpenLineage injection
providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py:62–75 Fixed lineage_run_id macro for Airflow 3.0+ compatibility
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py:56–112 Enhanced transport injection to support composite and HTTP transports
providers/openlineage/tests/unit/openlineage/plugins/test_macros.py Added tests for lineage_run_id macro fallback behavior
providers/openlineage/tests/unit/openlineage/utils/test_spark.py Added tests for composite transport injection and skip behavior

Review Focus Areas

  • Composite transport property generation logic in utils/spark.py:56–112, especially handling of multiple HTTP backends
  • Skip behavior for unsupported transports (e.g., console, non-HTTP composites) in test_spark_submit.py
  • Airflow 3.0+ fallback in macros.py:62–75 when logical_date is missing

Architecture

Design Decisions: Injection is opt-in via operator parameters to avoid breaking existing jobs. Composite transport support uses nested Spark properties to preserve backend structure, matching OpenLineage’s expected format.

Scalability & Extensibility: Injection logic is centralized in common.compat, enabling reuse across providers. New transport types can be added by extending the mapping in utils/spark.py.

Risks: The composite transport implementation assumes HTTP backends are valid; non-HTTP backends are skipped silently. This is intentional per OpenLineage spec but should be reviewed for unexpected edge cases.

Merge Status

NOT MERGEABLE — PR Score 34/100, below threshold (50)

  • [H4] PR quality score (34) is below merge floor (50)
  • [H6] Code quality raw score (20) is below merge floor (40)

Comment on lines +143 to +148
openlineage_inject_parent_job_info: bool = conf.getboolean(
"openlineage", "spark_inject_parent_job_info", fallback=False
),
openlineage_inject_transport_info: bool = conf.getboolean(
"openlineage", "spark_inject_transport_info", fallback=False
),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional High

Using conf.getboolean() as a default argument value evaluates the configuration at class definition (import) time, not at instantiation time, so runtime changes to the airflow config are not reflected and the call may fail at import if the config subsystem is not ready.

Suggested fix
        openlineage_inject_parent_job_info: bool | None = None,
        openlineage_inject_transport_info: bool | None = None,
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
Lines: 143-148
Issue Type: functional-high
Severity: high

Issue Description:
Using conf.getboolean() as a default argument value evaluates the configuration at class definition (import) time, not at instantiation time, so runtime changes to the airflow config are not reflected and the call may fail at import if the config subsystem is not ready.

Current Code:
        openlineage_inject_parent_job_info: bool = conf.getboolean(
            "openlineage", "spark_inject_parent_job_info", fallback=False
        ),
        openlineage_inject_transport_info: bool = conf.getboolean(
            "openlineage", "spark_inject_transport_info", fallback=False
        ),

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +64 to +71
if hasattr(task_instance, "dag_run"):
dag_run = task_instance.dag_run
elif hasattr(context, "dag_run"):
dag_run = context["dag_run"]
if hasattr(dag_run, "logical_date") and dag_run.logical_date:
date = dag_run.logical_date
else:
date = dag_run.run_after
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional High

If neither task_instance nor context has a dag_run attribute, dag_run is referenced unbound on line 68 causing UnboundLocalError. Also hasattr(context, 'dag_run') checks attribute access on a dict-like Context object then accesses via subscript, which is inconsistent and may fail.

Suggested fix
        dag_run = None
        if hasattr(task_instance, "dag_run"):
            dag_run = task_instance.dag_run
        elif "dag_run" in context:
            dag_run = context["dag_run"]
        if dag_run is None:
            raise AirflowException("Unable to resolve dag_run for task instance")
        if getattr(dag_run, "logical_date", None):
            date = dag_run.logical_date
        else:
            date = dag_run.run_after
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
Lines: 64-71
Issue Type: functional-high
Severity: high

Issue Description:
If neither `task_instance` nor `context` has a `dag_run` attribute, `dag_run` is referenced unbound on line 68 causing UnboundLocalError. Also `hasattr(context, 'dag_run')` checks attribute access on a dict-like Context object then accesses via subscript, which is inconsistent and may fail.

Current Code:
        if hasattr(task_instance, "dag_run"):
            dag_run = task_instance.dag_run
        elif hasattr(context, "dag_run"):
            dag_run = context["dag_run"]
        if hasattr(dag_run, "logical_date") and dag_run.logical_date:
            date = dag_run.logical_date
        else:
            date = dag_run.run_after

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

)
operator.execute(MagicMock())

assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional Medium

The assertion is a bare string expression rather than an assert statement checking that the message is in caplog.text, so it never validates the log output. Wrap it in an assert ... in caplog.text like the sibling test.

Suggested fix
            assert (
                "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
                in caplog.text
            )
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
Lines: 459-459
Issue Type: functional-medium
Severity: medium

Issue Description:
The assertion is a bare string expression rather than an assert statement checking that the message is in caplog.text, so it never validates the log output. Wrap it in an `assert ... in caplog.text` like the sibling test.

Current Code:
            assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

if hasattr(tp, "compression") and tp.compression:
props["compression"] = str(tp.compression)

if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robustness Medium

tp.config.auth is accessed without a hasattr/None check before hasattr(tp.config.auth, "api_key"); transports lacking config.auth (or with auth=None) will raise AttributeError. Guard the access defensively.

Also reported at: providers/openlineage/src/airflow/providers/openlineage/utils/spark.py L69

Suggested fix
                        if (
                            hasattr(tp, "config")
                            and hasattr(tp.config, "auth")
                            and tp.config.auth is not None
                            and hasattr(tp.config.auth, "api_key")
                            and tp.config.auth.get_bearer()
                        ):
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
Lines: 109-109
Issue Type: robustness-medium
Severity: medium

Issue Description:
`tp.config.auth` is accessed without a hasattr/None check before `hasattr(tp.config.auth, "api_key")`; transports lacking `config.auth` (or with `auth=None`) will raise AttributeError. Guard the access defensively.

_Also reported at: `providers/openlineage/src/airflow/providers/openlineage/utils/spark.py` L69_

Current Code:
                        if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +69 to +71
if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
properties["auth.type"] = "api_key"
properties["auth.apiKey"] = tp.config.auth.get_bearer()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Medium

API key bearer token is propagated into Spark properties which are typically logged, persisted in job metadata, and visible in Spark UI; ensure these properties are masked or transmitted through a secure mechanism rather than plain Spark conf.

Suggested fix
        if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
            properties["auth.type"] = "api_key"
            # NOTE: apiKey is sensitive; consumers must ensure Spark properties are masked in logs/UI
            properties["auth.apiKey"] = tp.config.auth.get_bearer()
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
Lines: 69-71
Issue Type: security-medium
Severity: medium

Issue Description:
API key bearer token is propagated into Spark properties which are typically logged, persisted in job metadata, and visible in Spark UI; ensure these properties are masked or transmitted through a secure mechanism rather than plain Spark conf.

Current Code:
        if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
            properties["auth.type"] = "api_key"
            properties["auth.apiKey"] = tp.config.auth.get_bearer()

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Security Scan Summary

Metric Value
Vulnerabilities Critical: 0
Overall Risk Clean
Files Scanned 13

No critical security issues detected

Scan completed in 29.2s

Security scan powered by Codity.ai

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Dependency vulnerability scanning

Metric Value
Vulnerabilities Found 19
Scanner npm audit
Top 10 Vulnerabilities (19 total found)

1. @babel/helpers various

CVE: GHSA-968p-4wvh-cqc8
Severity: MODERATE
Fixed in: True

Babel has inefficient RegExp complexity in generated code with .replace when transpiling named capturing groups


2. @babel/plugin-transform-modules-systemjs various

CVE: GHSA-fv7c-fp4j-7gwp
Severity: HIGH
Fixed in: True

@babel/plugin-transform-modules-systemjs generates arbitrary code when compiling malicious input


3. @babel/runtime various

CVE: GHSA-968p-4wvh-cqc8
Severity: MODERATE
Fixed in: True

Babel has inefficient RegExp complexity in generated code with .replace when transpiling named capturing groups


4. ajv various

CVE: GHSA-2g4f-4pwh-qvx6, GHSA-2g4f-4pwh-qvx6
Severity: MODERATE
Fixed in: True

ajv has ReDoS when using $data option


5. brace-expansion various

CVE: GHSA-v6h2-p8h4-qcjw, GHSA-f886-m6hf-6m8v
Severity: MODERATE
Fixed in: True

brace-expansion Regular Expression Denial of Service vulnerability


6. copy-webpack-plugin various

CVE: N/A
Severity: HIGH
Fixed in: 14.0.0

Vulnerability in copy-webpack-plugin


7. css-minimizer-webpack-plugin various

CVE: N/A
Severity: HIGH
Fixed in: 8.0.0

Vulnerability in css-minimizer-webpack-plugin


8. fast-uri various

CVE: GHSA-q3j6-qgpj-74h6, GHSA-v39h-62p7-jpjc
Severity: HIGH
Fixed in: True

fast-uri vulnerable to path traversal via percent-encoded dot segments


9. flatted various

CVE: GHSA-25h7-pfq9-p65f, GHSA-rf6f-7fwh-wjgh
Severity: HIGH
Fixed in: True

flatted vulnerable to unbounded recursion DoS in parse() revive phase


10. js-yaml various

CVE: GHSA-mh29-5h37-fv8m, GHSA-mh29-5h37-fv8m
Severity: MODERATE
Fixed in: True

js-yaml has prototype pollution in merge (<<)


9 more vulnerabilities not shown. Update dependencies to fix these issues.

Powered by Codity.ai · Docs

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Dependency vulnerability scanning

Metric Value
Vulnerabilities Found 4
Scanner pip-audit
View vulnerability details (4 items)

1. pip 24.0

CVE: GHSA-4xh5-x5gv-qwph
Fixed in: 25.3

When extracting a tar archive pip may not check symbolic links point into the extraction directory if the tarfile module doesn't implement PEP 706. Note that upgrading pip to a "fixed" version for thi


2. pip 24.0

CVE: GHSA-6vgw-5pg2-w6jp
Fixed in: 26.0

When pip is installing and extracting a maliciously crafted wheel archive, files may be extracted outside the installation directory. The path traversal is limited to prefixes of the installation dire


3. pip 24.0

CVE: GHSA-58qw-9mgm-455v
Fixed in:

pip handles concatenated tar and ZIP files as ZIP files regardless of filename or whether a file is both a tar and ZIP file. This behavior could result in confusing installation behavior, such as inst


4. pip 24.0

CVE: GHSA-jp4c-xjxw-mgf9
Fixed in: 26.1

pip prior to version 26.1 would run self-update check functionality after installing wheel files which required importing well-known Python modules names. These module imports were intentionally defer

Powered by Codity.ai · Docs

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

License Compliance Scan

Metric Value
Packages Scanned 1157
High Risk (Strong Copyleft) 0
Medium Risk (Weak Copyleft) 1
Low Risk (Permissive) 1142
Unknown License 14

Weak copyleft licenses found - verify compatibility

Some packages have unknown licenses - manual review required

Medium Risk Licenses - 1 packages

EPL-2.0 (1 packages):

  • elkjs 0.9.3
Unknown Licenses - 14 packages
  • react-syntax-highlighter 15.5.6
  • @pandacss/is-valid-prop 0.41.0
  • jackspeak 3.4.3 (BlueOak-1.0.0)
  • path-scurry 1.11.1 (BlueOak-1.0.0)
  • package-json-from-dist 1.0.1 (BlueOak-1.0.0)
  • string-width-cjs 4.2.3
  • strip-ansi-cjs 6.0.1
  • wrap-ansi-cjs 7.0.0
  • {% if params and params.environ and params.environ 'templated_unit_test' %}
  • pytest 8.2,<9
  • openlineage-airflow 1.29.0
  • requests 2.27.0,<3
  • python-dateutil 2.8.1 # including inline comments
  • com.google.cloud:google-cloud-pubsub ${google.cloud.version}

Powered by Codity.ai · Docs

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Code Quality Report — test-org-codity/airflow1 · PR #1

Scanned: 2026-05-16 08:11 UTC | Score: 20/100 | Provider: github

Executive Summary

Severity Count
Critical 0
High 1
Medium 3
Low 139
Top Findings

[CQ-LLM-002] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:284 (Duplication · HIGH)

Issue: The test cases for injecting OpenLineage configurations have similar setup code, indicating potential duplication.
Suggestion: Extract common setup code into a helper function to adhere to DRY principles.

operator = SparkSubmitOperator(
            task_id="spark_submit_job",
            spark_binary="sparky",
            dag=self.dag,
            openlineage_inject_parent_job_info=False,
            openlineage_inject_transport_info=True,
            **self._config,
        )

[CQ-LLM-001] providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:140 (Complexity · MEDIUM)

Issue: The constructor of SparkSubmitOperator has multiple parameters, increasing cyclomatic complexity.
Suggestion: Consider refactoring the constructor to reduce the number of parameters, possibly by using a configuration object.

openlineage_inject_parent_job_info: bool = conf.getboolean(
            "openlineage", "spark_inject_parent_job_info", fallback=False
        ),
        openlineage_inject_transport_info: bool = conf.getboolean(
            "openlineage", "spark_inject_transport_info", fallback=False
        ),

[CQ-LLM-005] providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:140 (Maintainability · MEDIUM)

Issue: Use of magic strings for configuration keys in the SparkSubmitOperator constructor.
Suggestion: Define constants for configuration keys to improve maintainability and readability.

conf.getboolean(
            "openlineage", "spark_inject_parent_job_info", fallback=False
        )

[CQ-LLM-003] providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:180 (Error_Handling · MEDIUM)

Issue: Potentially missing error handling when injecting OpenLineage properties.
Suggestion: Add error handling to manage exceptions that may arise during the injection of OpenLineage properties.

self.conf = inject_parent_job_information_into_spark_properties(self.conf, context)

[CQ-LLM-004] providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:140 (Documentation · LOW)

Issue: Missing docstring for the new parameters in the SparkSubmitOperator constructor.
Suggestion: Add docstrings to describe the purpose of the new parameters.

openlineage_inject_parent_job_info: bool = conf.getboolean(
            "openlineage", "spark_inject_parent_job_info", fallback=False
        ),

[CQ-007] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:290 (Documentation · LOW)

Issue: Public def 'test_inject_simple_openlineage_config_to_spark' missing docstring
Suggestion: Add a docstring describing purpose and parameters

def test_inject_simple_openlineage_config_to_spark(self, mock_get_openlineage_listener, mock_get_hook):

[CQ-009] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:299 (Style · LOW)

Issue: Line exceeds 120 characters (131 chars)
Suggestion: Break long lines into multiple lines for readability

        mock_get_openlineage_listener.return_value.adapter.get_or_create_openlineage_client.return_value.transport = Htt...

[CQ-008] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:301 (Maintainability · LOW)

Issue: Magic number 5000 in code
Suggestion: Extract to a named constant

url="http://localhost:5000",

[CQ-008] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:303 (Maintainability · LOW)

Issue: Magic number 5050 in code
Suggestion: Extract to a named constant

timeout=5050,

[CQ-008] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:304 (Maintainability · LOW)

Issue: Magic number 12345 in code
Suggestion: Extract to a named constant

auth=ApiKeyTokenProvider({"api_key": "12345"}),

Per-File Breakdown

File Critical High Medium Low Total
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py 0 0 3 1 4
providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py 0 1 0 48 49
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py 0 0 0 64 64
providers/google/tests/unit/google/cloud/openlineage/test_utils.py 0 0 0 1 1
providers/google/tests/unit/google/cloud/operators/test_dataproc.py 0 0 0 14 14
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py 0 0 0 3 3
providers/openlineage/tests/unit/openlineage/plugins/test_macros.py 0 0 0 3 3
providers/openlineage/tests/unit/openlineage/utils/test_spark.py 0 0 0 5 5

Recommendations

  1. Resolve High severity issues, especially error handling gaps and performance bottlenecks.
  • Run automated tests after applying fixes to verify no regressions.

@DhirenMhatre
Copy link
Copy Markdown
Author

@codity review

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Policy Check Failed

✗ 3/3 policy checks failed:

• Need 2 more approval(s) (0/2) — comment LGTM or approve via review
• Missing ticket reference (expected: JIRA-, ENG-, #*)
• 10 code file(s) changed but no test files added


To merge this PR:

  1. Address the failed checks listed above
  2. Ensure branch protection requires the codity/policy-check status

Configure policies in your dashboard

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

PR Summary

What Changed

  • Added two new optional parameters to SparkSubmitOperator and Dataproc operators to inject OpenLineage parent job and transport configuration into Spark properties.
  • Updated transport injection logic to support composite transports, skip unsupported types (e.g., console), and nest HTTP transports under spark.openlineage.transport.transports.*.
  • Made apache-airflow-providers-common-compat>=1.5.0 a required dependency for the Spark provider and fixed Airflow 3.0+ compatibility in the lineage_run_id macro.

Key Changes by Area

Spark Operator: Added openlineage_inject_parent_job_info and openlineage_inject_transport_info parameters with config-driven defaults; injection delegates to common-compat utilities.

Transport Handling: Enhanced _get_transport_information_as_spark_properties to support composite transports and format nested HTTP transports under spark.openlineage.transport.transports.<name>.*.

Dataproc Tests: Updated tests to mock static UUIDs for reproducible run IDs and added coverage for composite transport injection and unsupported type handling.

Macros: Modified lineage_run_id to use dag_run.run_after when logical_date is unavailable, with version-gated logic.

Files Changed

File Changes Summary
generated/provider_dependencies.json Updated Spark provider dependencies to include common-compat as required
providers/apache/spark/README.rst Documented new OpenLineage injection parameters
providers/apache/spark/pyproject.toml Added apache-airflow-providers-common-compat>=1.5.0 as required dependency
providers/apache/spark/src/airflow/providers/apache/spark/get_provider_info.py Updated provider info to reflect new dependency
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py Added openlineage_inject_parent_job_info and openlineage_inject_transport_info parameters
providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py Added tests for new injection parameters
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py Implemented transport injection logic (composite, HTTP nesting, skip unsupported)
providers/google/tests/unit/google/cloud/openlineage/test_utils.py Added static UUID mocking helper
providers/google/tests/unit/google/cloud/operators/test_dataproc.py Updated tests with static UUID mock and composite transport coverage
providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py Fixed lineage_run_id macro for Airflow 3.0+ compatibility
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py Updated _get_transport_information_as_spark_properties with composite transport support
providers/openlineage/tests/unit/openlineage/plugins/test_macros.py Added tests for lineage_run_id Airflow 3.0+ behavior
providers/openlineage/tests/unit/openlineage/utils/test_spark.py Added tests for composite transport injection and unsupported type handling

Review Focus Areas

  • Verify transport injection logic handles edge cases: composite transports, nested HTTP, and skipped types (e.g., console).
  • Confirm lineage_run_id macro fallback to run_after works correctly across Airflow versions.
  • Check that static UUID mocking in Dataproc tests ensures deterministic run IDs without side effects.

Architecture

Design Decisions: Transport injection delegates to common-compat to avoid duplicating logic; composite transport support follows OpenLineage’s nested transport spec. Skipping unsupported types (e.g., console) avoids invalid Spark properties.

Scalability & Extensibility: The common-compat utility layer allows future transport types without modifying Spark-specific code. Composite transport formatting is generic and extensible.

Risks: Moving common-compat to required dependencies increases Spark provider’s runtime footprint. This is intentional to ensure OpenLineage injection works reliably.

Merge Status

NOT MERGEABLE — PR Score 12/100, below threshold (50)

  • [H4] PR quality score (12) is below merge floor (50)
  • [H5] 4 HIGH-severity inline review findings need resolution (threshold: 3)
  • [H6] Code quality raw score (20) is below merge floor (40)

Comment on lines +143 to +148
openlineage_inject_parent_job_info: bool = conf.getboolean(
"openlineage", "spark_inject_parent_job_info", fallback=False
),
openlineage_inject_transport_info: bool = conf.getboolean(
"openlineage", "spark_inject_transport_info", fallback=False
),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional High

Using conf.getboolean() as a default argument value evaluates it once at class definition/import time, not per-instantiation, so runtime config changes will not be picked up and config errors at import time will break the module.

Suggested fix
        openlineage_inject_parent_job_info: bool | None = None,
        openlineage_inject_transport_info: bool | None = None,
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
Lines: 143-148
Issue Type: functional-high
Severity: high

Issue Description:
Using conf.getboolean() as a default argument value evaluates it once at class definition/import time, not per-instantiation, so runtime config changes will not be picked up and config errors at import time will break the module.

Current Code:
        openlineage_inject_parent_job_info: bool = conf.getboolean(
            "openlineage", "spark_inject_parent_job_info", fallback=False
        ),
        openlineage_inject_transport_info: bool = conf.getboolean(
            "openlineage", "spark_inject_transport_info", fallback=False
        ),

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +62 to +71
if AIRFLOW_V_3_0_PLUS:
context = task_instance.get_template_context()
if hasattr(task_instance, "dag_run"):
dag_run = task_instance.dag_run
elif hasattr(context, "dag_run"):
dag_run = context["dag_run"]
if hasattr(dag_run, "logical_date") and dag_run.logical_date:
date = dag_run.logical_date
else:
date = dag_run.run_after
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional High

When AIRFLOW_V_3_0_PLUS is True and neither task_instance nor context has a 'dag_run' attribute, 'dag_run' is never assigned, causing UnboundLocalError on the subsequent hasattr(dag_run, ...) check. Initialize dag_run or restructure the conditional to guarantee assignment.

Suggested fix
    if AIRFLOW_V_3_0_PLUS:
        context = task_instance.get_template_context()
        if hasattr(task_instance, "dag_run"):
            dag_run = task_instance.dag_run
        else:
            dag_run = context["dag_run"]
        if hasattr(dag_run, "logical_date") and dag_run.logical_date:
            date = dag_run.logical_date
        else:
            date = dag_run.run_after
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
Lines: 62-71
Issue Type: functional-high
Severity: high

Issue Description:
When AIRFLOW_V_3_0_PLUS is True and neither task_instance nor context has a 'dag_run' attribute, 'dag_run' is never assigned, causing UnboundLocalError on the subsequent hasattr(dag_run, ...) check. Initialize dag_run or restructure the conditional to guarantee assignment.

Current Code:
    if AIRFLOW_V_3_0_PLUS:
        context = task_instance.get_template_context()
        if hasattr(task_instance, "dag_run"):
            dag_run = task_instance.dag_run
        elif hasattr(context, "dag_run"):
            dag_run = context["dag_run"]
        if hasattr(dag_run, "logical_date") and dag_run.logical_date:
            date = dag_run.logical_date
        else:
            date = dag_run.run_after

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

)
operator.execute(MagicMock())

assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional Medium

The assert statement only evaluates a string literal (always truthy) rather than checking its presence in caplog.text, so this assertion never validates the log message.

Suggested fix
            assert (
                "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
                in caplog.text
            )
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
Lines: 459-459
Issue Type: functional-medium
Severity: medium

Issue Description:
The assert statement only evaluates a string literal (always truthy) rather than checking its presence in caplog.text, so this assertion never validates the log message.

Current Code:
            assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +109 to +111
if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
props["auth.type"] = "api_key"
props["auth.apiKey"] = tp.config.auth.get_bearer()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Medium

API key (bearer token) is injected into Spark properties as plaintext, which are commonly logged, persisted in Spark UI/event logs, and visible to anyone with cluster access. Consider warning users or supporting a secret reference instead of embedding the raw credential.

Also reported at: providers/openlineage/src/airflow/providers/openlineage/utils/spark.py L69–L71

Suggested fix
                        if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
                            log.warning(
                                "Injecting OpenLineage API key into Spark properties in plaintext; "
                                "these may be exposed via Spark UI/event logs."
                            )
                            props["auth.type"] = "api_key"
                            props["auth.apiKey"] = tp.config.auth.get_bearer()
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
Lines: 109-111
Issue Type: security-medium
Severity: medium

Issue Description:
API key (bearer token) is injected into Spark properties as plaintext, which are commonly logged, persisted in Spark UI/event logs, and visible to anyone with cluster access. Consider warning users or supporting a secret reference instead of embedding the raw credential.

_Also reported at: `providers/openlineage/src/airflow/providers/openlineage/utils/spark.py` L69–L71_

Current Code:
                        if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
                            props["auth.type"] = "api_key"
                            props["auth.apiKey"] = tp.config.auth.get_bearer()

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +101 to +104
"timeoutInMillis": str(
int(tp.timeout) * 1000
# convert to milliseconds, as required by Spark integration
),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robustness Medium

int(tp.timeout) * 1000 truncates fractional seconds (e.g., 0.5s becomes 0ms), which differs from the previous int(tp.timeout * 1000) behavior and can yield a 0 timeout. Multiply first, then cast.

Suggested fix
                            "timeoutInMillis": str(
                                int(tp.timeout * 1000)
                                # convert to milliseconds, as required by Spark integration
                            ),
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
Lines: 101-104
Issue Type: robustness-medium
Severity: medium

Issue Description:
`int(tp.timeout) * 1000` truncates fractional seconds (e.g., 0.5s becomes 0ms), which differs from the previous `int(tp.timeout * 1000)` behavior and can yield a 0 timeout. Multiply first, then cast.

Current Code:
                            "timeoutInMillis": str(
                                int(tp.timeout) * 1000
                                # convert to milliseconds, as required by Spark integration
                            ),

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

if hasattr(tp, "compression") and tp.compression:
props["compression"] = str(tp.compression)

if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robustness Medium

tp.config.auth may not exist on all transport configs; accessing tp.config.auth.api_key will raise AttributeError before hasattr on api_key is evaluated. Guard with hasattr(tp.config, 'auth') first.

Suggested fix
                        if (
                            hasattr(tp, "config")
                            and hasattr(tp.config, "auth")
                            and hasattr(tp.config.auth, "api_key")
                            and tp.config.auth.get_bearer()
                        ):
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
Lines: 109-109
Issue Type: robustness-medium
Severity: medium

Issue Description:
`tp.config.auth` may not exist on all transport configs; accessing `tp.config.auth.api_key` will raise AttributeError before `hasattr` on `api_key` is evaluated. Guard with `hasattr(tp.config, 'auth')` first.

Current Code:
                        if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +66 to +67
elif hasattr(context, "dag_run"):
dag_run = context["dag_run"]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional Medium

hasattr(context, "dag_run") checks the context object's attributes, but context is a dict-like mapping accessed via context["dag_run"]; use 'in' instead of hasattr.

Suggested fix
        elif "dag_run" in context:
            dag_run = context["dag_run"]
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
Lines: 66-67
Issue Type: functional-medium
Severity: medium

Issue Description:
hasattr(context, "dag_run") checks the context object's attributes, but context is a dict-like mapping accessed via context["dag_run"]; use 'in' instead of hasattr.

Current Code:
        elif hasattr(context, "dag_run"):
            dag_run = context["dag_run"]

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +91 to +92
if nested_transport.kind == "http":
http_transports[nested_transport.name] = _get_transport_information(nested_transport)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robustness Medium

nested_transport.name is accessed without a hasattr check inside the http branch, but the else branch guards with hasattr; if an http nested transport lacks name, this raises AttributeError. Guard both branches consistently.

Suggested fix
            if nested_transport.kind == "http":
                name = nested_transport.name if hasattr(nested_transport, "name") else "no-name"
                http_transports[name] = _get_transport_information(nested_transport)
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
Lines: 91-92
Issue Type: robustness-medium
Severity: medium

Issue Description:
`nested_transport.name` is accessed without a hasattr check inside the http branch, but the else branch guards with hasattr; if an http nested transport lacks `name`, this raises AttributeError. Guard both branches consistently.

Current Code:
            if nested_transport.kind == "http":
                http_transports[nested_transport.name] = _get_transport_information(nested_transport)

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Security Scan Summary

Metric Value
Vulnerabilities Critical: 0
Overall Risk Clean
Files Scanned 13

No critical security issues detected

Scan completed in 28.0s

Security scan powered by Codity.ai

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Dependency vulnerability scanning

Metric Value
Vulnerabilities Found 19
Scanner npm audit
Top 10 Vulnerabilities (19 total found)

1. @babel/helpers various

CVE: GHSA-968p-4wvh-cqc8
Severity: MODERATE
Fixed in: True

Babel has inefficient RegExp complexity in generated code with .replace when transpiling named capturing groups


2. @babel/plugin-transform-modules-systemjs various

CVE: GHSA-fv7c-fp4j-7gwp
Severity: HIGH
Fixed in: True

@babel/plugin-transform-modules-systemjs generates arbitrary code when compiling malicious input


3. @babel/runtime various

CVE: GHSA-968p-4wvh-cqc8
Severity: MODERATE
Fixed in: True

Babel has inefficient RegExp complexity in generated code with .replace when transpiling named capturing groups


4. ajv various

CVE: GHSA-2g4f-4pwh-qvx6, GHSA-2g4f-4pwh-qvx6
Severity: MODERATE
Fixed in: True

ajv has ReDoS when using $data option


5. brace-expansion various

CVE: GHSA-v6h2-p8h4-qcjw, GHSA-f886-m6hf-6m8v
Severity: MODERATE
Fixed in: True

brace-expansion Regular Expression Denial of Service vulnerability


6. copy-webpack-plugin various

CVE: N/A
Severity: HIGH
Fixed in: 14.0.0

Vulnerability in copy-webpack-plugin


7. css-minimizer-webpack-plugin various

CVE: N/A
Severity: HIGH
Fixed in: 8.0.0

Vulnerability in css-minimizer-webpack-plugin


8. fast-uri various

CVE: GHSA-q3j6-qgpj-74h6, GHSA-v39h-62p7-jpjc
Severity: HIGH
Fixed in: True

fast-uri vulnerable to path traversal via percent-encoded dot segments


9. flatted various

CVE: GHSA-25h7-pfq9-p65f, GHSA-rf6f-7fwh-wjgh
Severity: HIGH
Fixed in: True

flatted vulnerable to unbounded recursion DoS in parse() revive phase


10. js-yaml various

CVE: GHSA-mh29-5h37-fv8m, GHSA-mh29-5h37-fv8m
Severity: MODERATE
Fixed in: True

js-yaml has prototype pollution in merge (<<)


9 more vulnerabilities not shown. Update dependencies to fix these issues.

Powered by Codity.ai · Docs

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Dependency vulnerability scanning

Metric Value
Vulnerabilities Found 4
Scanner pip-audit
View vulnerability details (4 items)

1. pip 24.0

CVE: GHSA-4xh5-x5gv-qwph
Fixed in: 25.3

When extracting a tar archive pip may not check symbolic links point into the extraction directory if the tarfile module doesn't implement PEP 706. Note that upgrading pip to a "fixed" version for thi


2. pip 24.0

CVE: GHSA-6vgw-5pg2-w6jp
Fixed in: 26.0

When pip is installing and extracting a maliciously crafted wheel archive, files may be extracted outside the installation directory. The path traversal is limited to prefixes of the installation dire


3. pip 24.0

CVE: GHSA-58qw-9mgm-455v
Fixed in:

pip handles concatenated tar and ZIP files as ZIP files regardless of filename or whether a file is both a tar and ZIP file. This behavior could result in confusing installation behavior, such as inst


4. pip 24.0

CVE: GHSA-jp4c-xjxw-mgf9
Fixed in: 26.1

pip prior to version 26.1 would run self-update check functionality after installing wheel files which required importing well-known Python modules names. These module imports were intentionally defer

Powered by Codity.ai · Docs

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

License Compliance Scan

Metric Value
Packages Scanned 1157
High Risk (Strong Copyleft) 0
Medium Risk (Weak Copyleft) 1
Low Risk (Permissive) 1142
Unknown License 14

Weak copyleft licenses found - verify compatibility

Some packages have unknown licenses - manual review required

Medium Risk Licenses - 1 packages

EPL-2.0 (1 packages):

  • elkjs 0.9.3
Unknown Licenses - 14 packages
  • react-syntax-highlighter 15.5.6
  • @pandacss/is-valid-prop 0.41.0
  • jackspeak 3.4.3 (BlueOak-1.0.0)
  • package-json-from-dist 1.0.1 (BlueOak-1.0.0)
  • path-scurry 1.11.1 (BlueOak-1.0.0)
  • string-width-cjs 4.2.3
  • strip-ansi-cjs 6.0.1
  • wrap-ansi-cjs 7.0.0
  • openlineage-airflow 1.29.0
  • {% if params and params.environ and params.environ 'templated_unit_test' %}
  • pytest 8.2,<9
  • requests 2.27.0,<3
  • python-dateutil 2.8.1 # including inline comments
  • com.google.cloud:google-cloud-pubsub ${google.cloud.version}

Powered by Codity.ai · Docs

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Code Quality Report — test-org-codity/airflow1 · PR #1

Scanned: 2026-05-16 09:34 UTC | Score: 20/100 | Provider: github

Executive Summary

Severity Count
Critical 0
High 1
Medium 2
Low 139
Top Findings

[CQ-LLM-003] providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:180 (Error_Handling · HIGH)

Issue: The code does not handle potential exceptions that may arise from the OpenLineage injection functions.
Suggestion: Wrap the calls to inject_parent_job_information_into_spark_properties and inject_transport_information_into_spark_properties in try-except blocks to handle exceptions gracefully.

self.conf = inject_parent_job_information_into_spark_properties(self.conf, context)

[CQ-LLM-002] providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:140 (Documentation · MEDIUM)

Issue: The new parameters for OpenLineage injection in the constructor lack docstrings explaining their purpose.
Suggestion: Add docstrings for the new parameters in the constructor to improve documentation.

openlineage_inject_parent_job_info: bool = conf.getboolean(...)

[CQ-LLM-001] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:284 (Duplication · MEDIUM)

Issue: The test cases for injecting OpenLineage configuration into Spark have similar setup code, which could lead to duplication.
Suggestion: Consider refactoring the common setup code into a helper function to reduce duplication.

mock_get_openlineage_listener.return_value.adapter.get_or_create_openlineage_client.return_value.transport = ...

[CQ-LLM-004] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:284 (Maintainability · LOW)

Issue: The test case uses hard-coded values for configuration which may lead to maintenance issues.
Suggestion: Consider using constants or configuration files for hard-coded values to improve maintainability.

url='http://localhost:5000'

[CQ-007] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:290 (Documentation · LOW)

Issue: Public def 'test_inject_simple_openlineage_config_to_spark' missing docstring
Suggestion: Add a docstring describing purpose and parameters

def test_inject_simple_openlineage_config_to_spark(self, mock_get_openlineage_listener, mock_get_hook):

[CQ-009] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:299 (Style · LOW)

Issue: Line exceeds 120 characters (131 chars)
Suggestion: Break long lines into multiple lines for readability

        mock_get_openlineage_listener.return_value.adapter.get_or_create_openlineage_client.return_value.transport = Htt...

[CQ-008] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:301 (Maintainability · LOW)

Issue: Magic number 5000 in code
Suggestion: Extract to a named constant

url="http://localhost:5000",

[CQ-008] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:303 (Maintainability · LOW)

Issue: Magic number 5050 in code
Suggestion: Extract to a named constant

timeout=5050,

[CQ-008] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:304 (Maintainability · LOW)

Issue: Magic number 12345 in code
Suggestion: Extract to a named constant

auth=ApiKeyTokenProvider({"api_key": "12345"}),

[CQ-008] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:322 (Maintainability · LOW)

Issue: Magic number 5000 in code
Suggestion: Extract to a named constant

"spark.openlineage.transport.url": "http://localhost:5000",

Per-File Breakdown

File Critical High Medium Low Total
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py 0 1 1 0 2
providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py 0 0 1 49 50
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py 0 0 0 64 64
providers/google/tests/unit/google/cloud/openlineage/test_utils.py 0 0 0 1 1
providers/google/tests/unit/google/cloud/operators/test_dataproc.py 0 0 0 14 14
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py 0 0 0 3 3
providers/openlineage/tests/unit/openlineage/plugins/test_macros.py 0 0 0 3 3
providers/openlineage/tests/unit/openlineage/utils/test_spark.py 0 0 0 5 5

Recommendations

  1. Resolve High severity issues, especially error handling gaps and performance bottlenecks.
  • Run automated tests after applying fixes to verify no regressions.

@DhirenMhatre
Copy link
Copy Markdown
Author

@codity review

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Policy Check Failed

✗ 3/3 policy checks failed:

• Need 2 more approval(s) (0/2) — comment LGTM or approve via review
• Missing ticket reference (expected: JIRA-, ENG-, #*)
• 10 code file(s) changed but no test files added


To merge this PR:

  1. Address the failed checks listed above
  2. Ensure branch protection requires the codity/policy-check status

Configure policies in your dashboard

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

PR Summary

What Changed

  • Added two new optional configuration flags (openlineage_inject_parent_job_info and openlineage_inject_transport_info) to Spark Submit and Dataproc operators to inject OpenLineage metadata into Spark properties.
  • Updated OpenLineage transport injection logic to support composite transports (e.g., nested HTTP transports) and added Airflow 3.0+ compatibility for the lineage_run_id macro.
  • Bumped apache-airflow-providers-common-compat to ≥1.5.0 as a required dependency for Spark provider.

Key Changes by Area

Spark Operator: Added openlineage_inject_parent_job_info and openlineage_inject_transport_info parameters to SparkSubmitOperator, with config-based defaults and logging for unsupported transport types.

Dataproc Operator: Extended existing OpenLineage injection to support both job and batch operations using the same two injection flags.

OpenLineage Utils: Enhanced inject_transport_information_into_spark_properties in spark.py to parse composite transports and format nested HTTP transport keys (e.g., spark.openlineage.transport.transports.http.*).

Macros: Updated lineage_run_id macro in macros.py to fall back to run_after when logical_date is unavailable (Airflow 3.0+).

Files Changed

File Changes Summary
generated/provider_dependencies.json Added apache-airflow-providers-common-compat>=1.5.0 dependency for Spark provider
providers/apache/spark/README.rst Documented new OpenLineage injection parameters
providers/apache/spark/pyproject.toml Added apache-airflow-providers-common-compat>=1.5.0 dependency
providers/apache/spark/src/airflow/providers/apache/spark/get_provider_info.py Updated provider info to reflect new config defaults
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py Added injection parameters and logic for parent job and transport info
providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py Added tests for injection scenarios and unsupported transport logging
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py Enhanced transport injection to handle composite transports
providers/google/tests/unit/google/cloud/openlineage/test_utils.py Added tests for composite transport handling in Dataproc
providers/google/tests/unit/google/cloud/operators/test_dataproc.py Updated Dataproc tests to cover new injection flags
providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py Fixed lineage_run_id macro for Airflow 3.0+ compatibility
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py Updated transport injection logic for Spark integration
providers/openlineage/tests/unit/openlineage/plugins/test_macros.py Added tests for lineage_run_id with run_after
providers/openlineage/tests/unit/openlineage/utils/test_spark.py Added tests for composite transport handling

Review Focus Areas

  • Verify composite transport parsing and nested key formatting in spark.py (common-compat and openlineage)
  • Confirm lineage_run_id macro fallback logic works correctly for Airflow 3.0+ DAG runs
  • Check test coverage for skipped injection when OpenLineage is not accessible

Architecture

Design Decisions: The injection flags default to False to avoid breaking existing behavior. Composite transport support uses nested keys to avoid key collisions and align with Spark’s property hierarchy.

Scalability & Extensibility: The approach keeps transport handling generic via common-compat, allowing future transport types without modifying operator code. Dataproc reuse of the same flags ensures consistency across providers.

Risks: Logging unsupported transports (e.g., console) is informational only—no action required. The macro change is low risk as it only adds a fallback path.

Merge Status

NOT MERGEABLE — PR Score 13/100, below threshold (50)

  • [H4] PR quality score (13) is below merge floor (50)
  • [H5] 8 HIGH-severity inline review findings need resolution (threshold: 3)
  • [H6] Code quality raw score (21) is below merge floor (40)

Comment on lines +143 to +148
openlineage_inject_parent_job_info: bool = conf.getboolean(
"openlineage", "spark_inject_parent_job_info", fallback=False
),
openlineage_inject_transport_info: bool = conf.getboolean(
"openlineage", "spark_inject_transport_info", fallback=False
),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional High

Using conf.getboolean() as a default parameter value evaluates it once at class definition time, not per instantiation, so runtime changes to the Airflow config are ignored. Move the lookup into init body with a sentinel default.

Suggested fix
        openlineage_inject_parent_job_info: bool | None = None,
        openlineage_inject_transport_info: bool | None = None,
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
Lines: 143-148
Issue Type: functional-high
Severity: high

Issue Description:
Using conf.getboolean() as a default parameter value evaluates it once at class definition time, not per instantiation, so runtime changes to the Airflow config are ignored. Move the lookup into __init__ body with a sentinel default.

Current Code:
        openlineage_inject_parent_job_info: bool = conf.getboolean(
            "openlineage", "spark_inject_parent_job_info", fallback=False
        ),
        openlineage_inject_transport_info: bool = conf.getboolean(
            "openlineage", "spark_inject_transport_info", fallback=False
        ),

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +109 to +111
if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
props["auth.type"] = "api_key"
props["auth.apiKey"] = tp.config.auth.get_bearer()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security High

API key and bearer token are written into Spark properties which are typically logged or visible in Spark UI/process listings, exposing credentials. Mask or pass via a secure file/secret reference instead.

Also reported at: providers/openlineage/src/airflow/providers/openlineage/utils/spark.py L69–L71

Suggested fix
                        if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
                            props["auth.type"] = "api_key"
                            # NOTE: Spark properties are commonly logged/exposed in the Spark UI and process args.
                            # Avoid embedding the bearer token directly; prefer referencing a secret file or env var.
                            props["auth.apiKey"] = tp.config.auth.get_bearer()
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
Lines: 109-111
Issue Type: security-high
Severity: high

Issue Description:
API key and bearer token are written into Spark properties which are typically logged or visible in Spark UI/process listings, exposing credentials. Mask or pass via a secure file/secret reference instead.

_Also reported at: `providers/openlineage/src/airflow/providers/openlineage/utils/spark.py` L69–L71_

Current Code:
                        if hasattr(tp.config.auth, "api_key") and tp.config.auth.get_bearer():
                            props["auth.type"] = "api_key"
                            props["auth.apiKey"] = tp.config.auth.get_bearer()

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +62 to +71
if AIRFLOW_V_3_0_PLUS:
context = task_instance.get_template_context()
if hasattr(task_instance, "dag_run"):
dag_run = task_instance.dag_run
elif hasattr(context, "dag_run"):
dag_run = context["dag_run"]
if hasattr(dag_run, "logical_date") and dag_run.logical_date:
date = dag_run.logical_date
else:
date = dag_run.run_after
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional High

If task_instance lacks a dag_run attribute and context also lacks dag_run, dag_run is never assigned and the subsequent hasattr(dag_run, ...) check raises UnboundLocalError.

Suggested fix
    if AIRFLOW_V_3_0_PLUS:
        context = task_instance.get_template_context()
        if hasattr(task_instance, "dag_run"):
            dag_run = task_instance.dag_run
        else:
            dag_run = context["dag_run"]
        if hasattr(dag_run, "logical_date") and dag_run.logical_date:
            date = dag_run.logical_date
        else:
            date = dag_run.run_after
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
Lines: 62-71
Issue Type: functional-high
Severity: high

Issue Description:
If task_instance lacks a dag_run attribute and context also lacks dag_run, dag_run is never assigned and the subsequent hasattr(dag_run, ...) check raises UnboundLocalError.

Current Code:
    if AIRFLOW_V_3_0_PLUS:
        context = task_instance.get_template_context()
        if hasattr(task_instance, "dag_run"):
            dag_run = task_instance.dag_run
        elif hasattr(context, "dag_run"):
            dag_run = context["dag_run"]
        if hasattr(dag_run, "logical_date") and dag_run.logical_date:
            date = dag_run.logical_date
        else:
            date = dag_run.run_after

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +66 to +67
elif hasattr(context, "dag_run"):
dag_run = context["dag_run"]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional High

hasattr(context, "dag_run") checks attributes on the context object but context is a dict-like Context; this check is incorrect and should use "dag_run" in context.

Suggested fix
        elif "dag_run" in context:
            dag_run = context["dag_run"]
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py
Lines: 66-67
Issue Type: functional-high
Severity: high

Issue Description:
hasattr(context, "dag_run") checks attributes on the context object but context is a dict-like Context; this check is incorrect and should use "dag_run" in context.

Current Code:
        elif hasattr(context, "dag_run"):
            dag_run = context["dag_run"]

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

)
operator.execute(MagicMock())

assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional Medium

Assertion on a bare string literal always evaluates truthy and never verifies the log message. Use in caplog.text like the previous test.

Suggested fix
            assert (
                "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
                in caplog.text
            )
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
Lines: 459-459
Issue Type: functional-medium
Severity: medium

Issue Description:
Assertion on a bare string literal always evaluates truthy and never verifies the log message. Use `in caplog.text` like the previous test.

Current Code:
            assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +90 to +92
for nested_transport in transport.transports:
if nested_transport.kind == "http":
http_transports[nested_transport.name] = _get_transport_information(nested_transport)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robustness Medium

nested_transport.name is accessed without a hasattr guard inside the http branch, while the else-branch defensively checks for it; if a composite http transport lacks name, this raises AttributeError. Use a consistent guarded access.

Suggested fix
        for nested_transport in transport.transports:
            if nested_transport.kind == "http":
                name = nested_transport.name if hasattr(nested_transport, "name") else "no-name"
                http_transports[name] = _get_transport_information(nested_transport)
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
Lines: 90-92
Issue Type: robustness-medium
Severity: medium

Issue Description:
`nested_transport.name` is accessed without a `hasattr` guard inside the http branch, while the else-branch defensively checks for it; if a composite http transport lacks `name`, this raises AttributeError. Use a consistent guarded access.

Current Code:
        for nested_transport in transport.transports:
            if nested_transport.kind == "http":
                http_transports[nested_transport.name] = _get_transport_information(nested_transport)

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Security Scan Summary

Metric Value
Vulnerabilities Critical: 0
Overall Risk Clean
Files Scanned 13

No critical security issues detected

Scan completed in 26.4s

Security scan powered by Codity.ai

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Dependency vulnerability scanning

Metric Value
Vulnerabilities Found 19
Scanner npm audit
Top 10 Vulnerabilities (19 total found)

1. @babel/helpers various

CVE: GHSA-968p-4wvh-cqc8
Severity: MODERATE
Fixed in: True

Babel has inefficient RegExp complexity in generated code with .replace when transpiling named capturing groups


2. @babel/plugin-transform-modules-systemjs various

CVE: GHSA-fv7c-fp4j-7gwp
Severity: HIGH
Fixed in: True

@babel/plugin-transform-modules-systemjs generates arbitrary code when compiling malicious input


3. @babel/runtime various

CVE: GHSA-968p-4wvh-cqc8
Severity: MODERATE
Fixed in: True

Babel has inefficient RegExp complexity in generated code with .replace when transpiling named capturing groups


4. ajv various

CVE: GHSA-2g4f-4pwh-qvx6, GHSA-2g4f-4pwh-qvx6
Severity: MODERATE
Fixed in: True

ajv has ReDoS when using $data option


5. brace-expansion various

CVE: GHSA-v6h2-p8h4-qcjw, GHSA-f886-m6hf-6m8v
Severity: MODERATE
Fixed in: True

brace-expansion Regular Expression Denial of Service vulnerability


6. copy-webpack-plugin various

CVE: N/A
Severity: HIGH
Fixed in: 14.0.0

Vulnerability in copy-webpack-plugin


7. css-minimizer-webpack-plugin various

CVE: N/A
Severity: HIGH
Fixed in: 8.0.0

Vulnerability in css-minimizer-webpack-plugin


8. fast-uri various

CVE: GHSA-q3j6-qgpj-74h6, GHSA-v39h-62p7-jpjc
Severity: HIGH
Fixed in: True

fast-uri vulnerable to path traversal via percent-encoded dot segments


9. flatted various

CVE: GHSA-25h7-pfq9-p65f, GHSA-rf6f-7fwh-wjgh
Severity: HIGH
Fixed in: True

flatted vulnerable to unbounded recursion DoS in parse() revive phase


10. js-yaml various

CVE: GHSA-mh29-5h37-fv8m, GHSA-mh29-5h37-fv8m
Severity: MODERATE
Fixed in: True

js-yaml has prototype pollution in merge (<<)


9 more vulnerabilities not shown. Update dependencies to fix these issues.

Powered by Codity.ai · Docs

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Dependency vulnerability scanning

Metric Value
Vulnerabilities Found 4
Scanner pip-audit
View vulnerability details (4 items)

1. pip 24.0

CVE: GHSA-4xh5-x5gv-qwph
Fixed in: 25.3

When extracting a tar archive pip may not check symbolic links point into the extraction directory if the tarfile module doesn't implement PEP 706. Note that upgrading pip to a "fixed" version for thi


2. pip 24.0

CVE: GHSA-6vgw-5pg2-w6jp
Fixed in: 26.0

When pip is installing and extracting a maliciously crafted wheel archive, files may be extracted outside the installation directory. The path traversal is limited to prefixes of the installation dire


3. pip 24.0

CVE: GHSA-58qw-9mgm-455v
Fixed in:

pip handles concatenated tar and ZIP files as ZIP files regardless of filename or whether a file is both a tar and ZIP file. This behavior could result in confusing installation behavior, such as inst


4. pip 24.0

CVE: GHSA-jp4c-xjxw-mgf9
Fixed in: 26.1

pip prior to version 26.1 would run self-update check functionality after installing wheel files which required importing well-known Python modules names. These module imports were intentionally defer

Powered by Codity.ai · Docs

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

License Compliance Scan

Metric Value
Packages Scanned 1157
High Risk (Strong Copyleft) 0
Medium Risk (Weak Copyleft) 1
Low Risk (Permissive) 1142
Unknown License 14

Weak copyleft licenses found - verify compatibility

Some packages have unknown licenses - manual review required

Medium Risk Licenses - 1 packages

EPL-2.0 (1 packages):

  • elkjs 0.9.3
Unknown Licenses - 14 packages
  • react-syntax-highlighter 15.5.6
  • @pandacss/is-valid-prop 0.41.0
  • jackspeak 3.4.3 (BlueOak-1.0.0)
  • path-scurry 1.11.1 (BlueOak-1.0.0)
  • package-json-from-dist 1.0.1 (BlueOak-1.0.0)
  • string-width-cjs 4.2.3
  • strip-ansi-cjs 6.0.1
  • wrap-ansi-cjs 7.0.0
  • openlineage-airflow 1.29.0
  • {% if params and params.environ and params.environ 'templated_unit_test' %}
  • pytest 8.2,<9
  • requests 2.27.0,<3
  • python-dateutil 2.8.1 # including inline comments
  • com.google.cloud:google-cloud-pubsub ${google.cloud.version}

Powered by Codity.ai · Docs

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Code Quality Report — test-org-codity/airflow1 · PR #1

Scanned: 2026-05-16 10:25 UTC | Score: 21/100 | Provider: github

Executive Summary

Severity Count
Critical 0
High 0
Medium 3
Low 140
Top Findings

[CQ-LLM-002] providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:141 (Documentation · MEDIUM)

Issue: Missing docstring for the constructor of SparkSubmitOperator.
Suggestion: Add a docstring to describe the parameters and purpose of the constructor.

def __init__(self, ...):

[CQ-LLM-003] providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:145 (Error_Handling · MEDIUM)

Issue: Potentially missing error handling for configuration injection.
Suggestion: Add error handling to manage cases where the configuration injection fails.

self.conf = inject_parent_job_information_into_spark_properties(self.conf, context)

[CQ-LLM-004] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:284 (Testability · MEDIUM)

Issue: Test is tightly coupled to the implementation of SparkSubmitOperator.
Suggestion: Consider using dependency injection to make the operator more testable.

operator = SparkSubmitOperator(task_id="spark_submit_job", ...)

[CQ-LLM-001] providers/apache/spark/src/airflow/providers/apache/spark/get_provider_info.py:126 (Duplication · LOW)

Issue: The dependencies list is duplicated in multiple places.
Suggestion: Consider creating a single source of truth for dependencies to avoid duplication.

"dependencies": ["apache-airflow>=2.9.0", "apache-airflow-providers-common-compat>=1.5.0", "pyspark>=3.1.3", "grpcio-status>=1.59.0"]

[CQ-LLM-005] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:285 (Maintainability · LOW)

Issue: Magic numbers used in the test for timeout.
Suggestion: Define constants for magic numbers to improve readability and maintainability.

timeout=5050

[CQ-007] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:290 (Documentation · LOW)

Issue: Public def 'test_inject_simple_openlineage_config_to_spark' missing docstring
Suggestion: Add a docstring describing purpose and parameters

def test_inject_simple_openlineage_config_to_spark(self, mock_get_openlineage_listener, mock_get_hook):

[CQ-009] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:299 (Style · LOW)

Issue: Line exceeds 120 characters (131 chars)
Suggestion: Break long lines into multiple lines for readability

        mock_get_openlineage_listener.return_value.adapter.get_or_create_openlineage_client.return_value.transport = Htt...

[CQ-008] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:301 (Maintainability · LOW)

Issue: Magic number 5000 in code
Suggestion: Extract to a named constant

url="http://localhost:5000",

[CQ-008] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:303 (Maintainability · LOW)

Issue: Magic number 5050 in code
Suggestion: Extract to a named constant

timeout=5050,

[CQ-008] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:304 (Maintainability · LOW)

Issue: Magic number 12345 in code
Suggestion: Extract to a named constant

auth=ApiKeyTokenProvider({"api_key": "12345"}),

Per-File Breakdown

File Critical High Medium Low Total
providers/apache/spark/src/airflow/providers/apache/spark/get_provider_info.py 0 0 0 1 1
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py 0 0 2 0 2
providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py 0 0 1 49 50
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py 0 0 0 64 64
providers/google/tests/unit/google/cloud/openlineage/test_utils.py 0 0 0 1 1
providers/google/tests/unit/google/cloud/operators/test_dataproc.py 0 0 0 14 14
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py 0 0 0 3 3
providers/openlineage/tests/unit/openlineage/plugins/test_macros.py 0 0 0 3 3
providers/openlineage/tests/unit/openlineage/utils/test_spark.py 0 0 0 5 5

Recommendations

  • Run automated tests after applying fixes to verify no regressions.

@DhirenMhatre
Copy link
Copy Markdown
Author

@codity review

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Policy Check Failed

✗ 3/3 policy checks failed:

• Need 2 more approval(s) (0/2) — comment LGTM or approve via review
• Missing ticket reference (expected: JIRA-, ENG-, #*)
• 10 code file(s) changed but no test files added


To merge this PR:

  1. Address the failed checks listed above
  2. Ensure branch protection requires the codity/policy-check status

Configure policies in your dashboard

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

PR Summary

What Changed

  • Added two new optional parameters to SparkSubmitOperator and Dataproc operators to inject OpenLineage parent job and transport info into Spark properties.
  • Updated OpenLineage transport injection logic in common-compat to support composite transports (e.g., nested HTTP sub-transports).
  • Made apache-airflow-providers-common-compat>=1.5.0 a required dependency for the Spark provider and fixed Airflow 3.0+ compatibility in the lineage_run_id macro.

Key Changes by Area

Spark Operator: Added openlineage_inject_parent_job_info and openlineage_inject_transport_info parameters; defaults pull from Airflow config; injection delegates to common-compat utilities.

Dataproc Operators: Extended existing OpenLineage injection to support both job and batch operations with parent and transport info, validated via updated tests.

OpenLineage Utilities: Enhanced _get_transport_information_as_spark_properties to handle composite transports by extracting HTTP sub-transports and nesting keys (e.g., spark.openlineage.transport.transports.http.*).

Macros: Modified lineage_run_id in macros.py to use dag_run.run_after when logical_date is unavailable, with version-gated logic.

Files Changed

File Changes Summary
generated/provider_dependencies.json Updated Spark provider dependencies to require common-compat>=1.5.0
providers/apache/spark/README.rst Documented new OpenLineage injection parameters
providers/apache/spark/pyproject.toml Made common-compat a required dependency
providers/apache/spark/src/airflow/providers/apache/spark/get_provider_info.py Updated provider metadata to reflect new dependency
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py Added openlineage_inject_parent_job_info and openlineage_inject_transport_info parameters
providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py Added tests for new injection parameters and composite transport handling
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py Enhanced transport injection to support composite transports
providers/google/tests/unit/google/cloud/openlineage/test_utils.py Added tests for composite transport fallbacks
providers/google/tests/unit/google/cloud/operators/test_dataproc.py Extended tests to validate parent and transport info injection
providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py Fixed lineage_run_id macro for Airflow 3.0+ compatibility
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py Updated transport injection logic (mirrors common-compat changes)
providers/openlineage/tests/unit/openlineage/plugins/test_macros.py Added tests for lineage_run_id with run_after fallback
providers/openlineage/tests/unit/openlineage/utils/test_spark.py Added tests for composite transport handling

Review Focus Areas

  • Verify composite transport injection logic handles edge cases (e.g., missing HTTP sub-transport, unsupported transport types).
  • Confirm lineage_run_id macro fallback to run_after works correctly across Airflow versions (2.x vs 3.x).
  • Check that common-compat is correctly required (not optional) and version-pinned in pyproject.toml.

Architecture

Design Decisions: Composite transport support uses nested keys to avoid key collisions and align with Spark’s property hierarchy. The lineage_run_id macro uses version-gated logic to maintain backward compatibility without breaking existing DAGs.

Scalability & Extensibility: Transport injection is delegated to common-compat, enabling reuse across providers. The macro change is isolated and does not affect lineage collection logic.

Risks: The dependency move from optional to required may affect users who previously disabled OpenLineage. This is intentional but should be called out in release notes. No architectural risks beyond that.

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Security Scan Summary

Metric Value
Vulnerabilities Critical: 0
Overall Risk Clean
Files Scanned 13

No critical security issues detected

Scan completed in 25.1s

Security scan powered by Codity.ai

@DhirenMhatre
Copy link
Copy Markdown
Author

@codity review

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Policy Check Failed

✗ 3/3 policy checks failed:

• Need 2 more approval(s) (0/2) — comment LGTM or approve via review
• Missing ticket reference (expected: JIRA-, ENG-, #*)
• 10 code file(s) changed but no test files added


To merge this PR:

  1. Address the failed checks listed above
  2. Ensure branch protection requires the codity/policy-check status

Configure policies in your dashboard

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

PR review started! Estimated time: 5-10 minutes.
Using default review instructions (no custom configuration found)

Learn More

View Analytics Dashboard

Ask Codity questions: Mention @codity {your question} in a comment to get answers about the code.

Trigger a manual review: Comment @codity review on a PR or MR.

Generate unit tests: Comment /generate-tests to auto-generate tests for Go, Python, Ruby, JavaScript, TypeScript, and Java files.

Run security scan again: Comment /security-scan to run SAST and dependency vulnerability scans for all major languages in your repo.

View Full Docs

@DhirenMhatre
Copy link
Copy Markdown
Author

@codity review

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Policy Check Failed

✗ 3/3 policy checks failed:

• Need 2 more approval(s) (0/2) — comment LGTM or approve via review
• Missing ticket reference (expected: JIRA-, ENG-, #*)
• 10 code file(s) changed but no test files added


To merge this PR:

  1. Address the failed checks listed above
  2. Ensure branch protection requires the codity/policy-check status

Configure policies in your dashboard

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

PR Summary

What Changed

  • Added two new optional boolean parameters (openlineage_inject_parent_job_info and openlineage_inject_transport_info) to Spark Submit and Dataproc operators to inject OpenLineage metadata into Spark properties.
  • Updated OpenLineage transport injection logic to support composite transports (e.g., HTTP nested under transports.http.*) and added Airflow 3.0+ compatibility for the lineage_run_id macro.
  • Added apache-airflow-providers-common-compat>=1.5.0 as a required dependency for the Spark provider.

Key Changes by Area

Spark Operator: Added injection flags and logging for unsupported transport types in spark_submit.py.
Dataproc Operator: Extended existing OpenLineage injection to support both job and batch operations with the same flags.
OpenLineage Utils: Enhanced inject_transport_information_into_spark_properties in spark.py to parse composite transports and extract HTTP sub-transports.
Macros: Updated lineage_run_id in macros.py to fall back to run_after when logical_date is unavailable (Airflow 3.0+).

Files Changed

File Changes Summary
generated/provider_dependencies.json Added apache-airflow-providers-common-compat>=1.5.0 dependency
providers/apache/spark/README.rst Documented new injection parameters
providers/apache/spark/pyproject.toml Added common-compat dependency
providers/apache/spark/src/airflow/providers/apache/spark/get_provider_info.py Updated provider info to reflect new dependency
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py Added injection flags and logging for transport types
providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py Added tests for injection scenarios and unsupported transports
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py Enhanced transport injection to handle composite transports
providers/google/tests/unit/google/cloud/openlineage/test_utils.py Added tests for composite transport handling
providers/google/tests/unit/google/cloud/operators/test_dataproc.py Added tests for Dataproc job/batch OpenLineage injection
providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py Fixed lineage_run_id macro for Airflow 3.0+
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py Updated transport injection logic
providers/openlineage/tests/unit/openlineage/plugins/test_macros.py Added tests for lineage_run_id with run_after
providers/openlineage/tests/unit/openlineage/utils/test_spark.py Added tests for composite transport handling

Review Focus Areas

  • Transport injection logic in spark.py (especially composite transport parsing and nested key formatting)
  • lineage_run_id macro fallback behavior for Airflow 3.0+ DAG runs
  • Logging of unsupported transport types in spark_submit.py

Architecture

Design Decisions: Composite transports are flattened into nested Spark properties (e.g., spark.openlineage.transport.transports.http.*) to preserve structure while remaining compatible with Spark’s flat property model. The common-compat dependency is used to centralize shared OpenLineage utilities and avoid duplication.

Scalability & Extensibility: The injection flags are optional and config-driven, allowing gradual adoption. The approach supports future transport types via the same pattern.

Risks: The nested transport key format (transports.http.*) is new and untested in production Spark environments. If Spark rejects nested keys, this may require runtime fallback logic. This is intentional for now but should be monitored.

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Security Scan Summary

Metric Value
Vulnerabilities Critical: 0
Overall Risk Clean
Files Scanned 13

No critical security issues detected

Scan completed in 24.1s

Security scan powered by Codity.ai

@DhirenMhatre
Copy link
Copy Markdown
Author

@codity review

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Policy Check Failed

✗ 3/3 policy checks failed:

• Need 2 more approval(s) (0/2) — comment LGTM or approve via review
• Missing ticket reference (expected: JIRA-, ENG-, #*)
• 10 code file(s) changed but no test files added


To merge this PR:

  1. Address the failed checks listed above
  2. Ensure branch protection requires the codity/policy-check status

Configure policies in your dashboard

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

PR Summary

What Changed

  • Added two new optional parameters to Spark Submit and Dataproc operators to inject OpenLineage parent job and transport info into Spark properties.
  • Updated transport injection logic to support composite transports (including nested HTTP) and skip unsupported types like console.
  • Fixed Airflow 3.0+ compatibility for the lineage_run_id macro by falling back to run_after when logical_date is missing.

Key Changes by Area

Spark Operator: Added openlineage_inject_parent_job_info and openlineage_inject_transport_info parameters; injection logic delegates to common-compat utilities.

Transport Handling: Enhanced _get_transport_information_as_spark_properties to parse composite transports, extract HTTP sub-transports, and log skipped types.

Dataproc Tests: Patched static UUID generation to ensure reproducible run IDs in OpenLineage injection tests.

Macros: Modified lineage_run_id in macros.py to use dag_run.run_after when logical_date is unavailable.

Dependencies: Promoted apache-airflow-providers-common-compat from optional to required in Spark provider.

Files Changed

File Changes Summary
generated/provider_dependencies.json Updated Spark provider dependencies to include common-compat as required
providers/apache/spark/README.rst Documented new OpenLineage injection parameters
providers/apache/spark/pyproject.toml Made apache-airflow-providers-common-compat a required dependency
providers/apache/spark/src/airflow/providers/apache/spark/get_provider_info.py Updated provider info to reflect new dependency
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py Added injection parameters and transport handling logic
providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py Added tests for composite transport injection
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py Implemented robust transport parsing and injection
providers/google/tests/unit/google/cloud/openlineage/test_utils.py Added mock_static_uuid helper for deterministic test IDs
providers/google/tests/unit/google/cloud/operators/test_dataproc.py Updated tests to use mock_static_uuid for reproducibility
providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py Fixed lineage_run_id macro for Airflow 3.0+ compatibility
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py Updated transport injection logic to match common-compat behavior
providers/openlineage/tests/unit/openlineage/plugins/test_macros.py Added test for lineage_run_id fallback to run_after
providers/openlineage/tests/unit/openlineage/utils/test_spark.py Added tests for composite transport handling

Review Focus Areas

  • Transport injection logic in spark.py (especially composite transport parsing and HTTP sub-transport extraction)
  • Behavior of lineage_run_id macro when logical_date is absent (Airflow 3.0+ compatibility)
  • Consistency of OpenLineage test mocks across Spark and Dataproc operators

Architecture

Design Decisions: The PR delegates transport parsing to common-compat to avoid duplication and ensure consistency across providers. Composite transport support is implemented by recursively extracting HTTP sub-transports, which aligns with OpenLineage’s expected Spark property format.

Scalability & Extensibility: The approach keeps transport handling centralized in common-compat, making future transport types easier to add without modifying operator code. Dataproc tests now use shared mocking utilities, improving test maintainability.

Risks: The dependency promotion to required may affect users who previously disabled OpenLineage. This is intentional but should be called out in release notes. The fallback to run_after is safe for Airflow 3.0+ but assumes run_after is always set when logical_date is missing—this is true in current Airflow behavior but worth monitoring.

Merge Status

NOT MERGEABLE — PR Score 13/100, below threshold (50)

  • [H4] PR quality score (13) is below merge floor (50)
  • [H5] 11 HIGH-severity inline review findings need resolution (threshold: 3)
  • [H6] Code quality raw score (21) is below merge floor (40)

Comment on lines +143 to +148
openlineage_inject_parent_job_info: bool = conf.getboolean(
"openlineage", "spark_inject_parent_job_info", fallback=False
),
openlineage_inject_transport_info: bool = conf.getboolean(
"openlineage", "spark_inject_transport_info", fallback=False
),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional High

Default values for openlineage_inject_parent_job_info and openlineage_inject_transport_info are evaluated once at module import time via conf.getboolean(...), so configuration changes after import are not reflected; use None as the default and resolve the config value inside __init__.

Also reported at: providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py L183–L184

Suggested fix
        openlineage_inject_parent_job_info: bool | None = None,
        openlineage_inject_transport_info: bool | None = None,
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py
Lines: 143-148
Issue Type: functional-high
Severity: high

Issue Description:
Default values for `openlineage_inject_parent_job_info` and `openlineage_inject_transport_info` are evaluated once at module import time via `conf.getboolean(...)`, so configuration changes after import are not reflected; use `None` as the default and resolve the config value inside `__init__`.

_Also reported at: `providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py` L183–L184_

Current Code:
        openlineage_inject_parent_job_info: bool = conf.getboolean(
            "openlineage", "spark_inject_parent_job_info", fallback=False
        ),
        openlineage_inject_transport_info: bool = conf.getboolean(
            "openlineage", "spark_inject_transport_info", fallback=False
        ),

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +133 to +136
if nested_transport.kind == "http":
http_transports[nested_transport.name] = _get_transport_information(
nested_transport
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional High

When iterating composite transports, http_transports[nested_transport.name] is used without checking hasattr(nested_transport, 'name'), which will raise AttributeError if an http transport has no name attribute, unlike the else branch which guards with hasattr.

Suggested fix
                            if nested_transport.kind == "http":
                                transport_name = (
                                    nested_transport.name
                                    if hasattr(nested_transport, "name")
                                    else "http"
                                )
                                http_transports[transport_name] = _get_transport_information(
                                    nested_transport
                                )
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
Lines: 133-136
Issue Type: functional-high
Severity: high

Issue Description:
When iterating composite transports, `http_transports[nested_transport.name]` is used without checking `hasattr(nested_transport, 'name')`, which will raise `AttributeError` if an http transport has no `name` attribute, unlike the `else` branch which guards with `hasattr`.

Current Code:
                            if nested_transport.kind == "http":
                                http_transports[nested_transport.name] = _get_transport_information(
                                    nested_transport
                                )

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +62 to +64
"timeoutInMillis": str(
int(tp.timeout) * 1000 # convert to milliseconds, as required by Spark integration
),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional High

The timeout conversion changed from int(tp.timeout * 1000) (multiply then truncate) to int(tp.timeout) * 1000 (truncate then multiply), producing incorrect millisecond values for fractional-second timeouts (e.g., 1.5s yields 1000ms instead of 1500ms).

Suggested fix
            "timeoutInMillis": str(
                int(tp.timeout * 1000)  # convert to milliseconds, as required by Spark integration
            ),
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/utils/spark.py
Lines: 62-64
Issue Type: functional-high
Severity: high

Issue Description:
The timeout conversion changed from `int(tp.timeout * 1000)` (multiply then truncate) to `int(tp.timeout) * 1000` (truncate then multiply), producing incorrect millisecond values for fractional-second timeouts (e.g., 1.5s yields 1000ms instead of 1500ms).

Current Code:
            "timeoutInMillis": str(
                int(tp.timeout) * 1000  # convert to milliseconds, as required by Spark integration
            ),

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

)
operator.execute(MagicMock())

assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional Medium

The assert on line 459 is asserting a non-empty string literal, which is always truthy and never actually checks that the log message appears in caplog.text, so the test will pass even if the log message is never emitted.

Suggested fix
            assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties." in caplog.text
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py
Lines: 459-459
Issue Type: functional-medium
Severity: medium

Issue Description:
The assert on line 459 is asserting a non-empty string literal, which is always truthy and never actually checks that the log message appears in caplog.text, so the test will pass even if the log message is never emitted.

Current Code:
            assert "OpenLineage transport type `console` does not support automatic injection of OpenLineage transport information into Spark properties."

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +101 to +104
"timeoutInMillis": str(
int(tp.timeout) * 1000
# convert to milliseconds, as required by Spark integration
),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robustness Medium

In _get_transport_information, tp.timeout is cast with int() before multiplying, but if tp.timeout is None or not set, this raises a TypeError; add a guard or default value (e.g., int(tp.timeout or 0) * 1000).

Suggested fix
                            "timeoutInMillis": str(
                                int(tp.timeout) * 1000 if tp.timeout is not None else "0"
                                # convert to milliseconds, as required by Spark integration
                            ),
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py
Lines: 101-104
Issue Type: robustness-medium
Severity: medium

Issue Description:
In `_get_transport_information`, `tp.timeout` is cast with `int()` before multiplying, but if `tp.timeout` is `None` or not set, this raises a `TypeError`; add a guard or default value (e.g., `int(tp.timeout or 0) * 1000`).

Current Code:
                            "timeoutInMillis": str(
                                int(tp.timeout) * 1000
                                # convert to milliseconds, as required by Spark integration
                            ),

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Security Scan Summary

Metric Value
Vulnerabilities Critical: 0
Overall Risk Clean
Files Scanned 13

No critical security issues detected

Scan completed in 26.8s

Security scan powered by Codity.ai

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Dependency vulnerability scanning

Metric Value
Vulnerabilities Found 19
Scanner npm audit
Top 10 Vulnerabilities (19 total found)

1. @babel/helpers various

CVE: GHSA-968p-4wvh-cqc8
Severity: MODERATE
Fixed in: True

Babel has inefficient RegExp complexity in generated code with .replace when transpiling named capturing groups


2. @babel/plugin-transform-modules-systemjs various

CVE: GHSA-fv7c-fp4j-7gwp
Severity: HIGH
Fixed in: True

@babel/plugin-transform-modules-systemjs generates arbitrary code when compiling malicious input


3. @babel/runtime various

CVE: GHSA-968p-4wvh-cqc8
Severity: MODERATE
Fixed in: True

Babel has inefficient RegExp complexity in generated code with .replace when transpiling named capturing groups


4. ajv various

CVE: GHSA-2g4f-4pwh-qvx6, GHSA-2g4f-4pwh-qvx6
Severity: MODERATE
Fixed in: True

ajv has ReDoS when using $data option


5. brace-expansion various

CVE: GHSA-v6h2-p8h4-qcjw, GHSA-f886-m6hf-6m8v
Severity: MODERATE
Fixed in: True

brace-expansion Regular Expression Denial of Service vulnerability


6. copy-webpack-plugin various

CVE: N/A
Severity: HIGH
Fixed in: 14.0.0

Vulnerability in copy-webpack-plugin


7. css-minimizer-webpack-plugin various

CVE: N/A
Severity: HIGH
Fixed in: 8.0.0

Vulnerability in css-minimizer-webpack-plugin


8. fast-uri various

CVE: GHSA-q3j6-qgpj-74h6, GHSA-v39h-62p7-jpjc
Severity: HIGH
Fixed in: True

fast-uri vulnerable to path traversal via percent-encoded dot segments


9. flatted various

CVE: GHSA-25h7-pfq9-p65f, GHSA-rf6f-7fwh-wjgh
Severity: HIGH
Fixed in: True

flatted vulnerable to unbounded recursion DoS in parse() revive phase


10. js-yaml various

CVE: GHSA-mh29-5h37-fv8m, GHSA-mh29-5h37-fv8m
Severity: MODERATE
Fixed in: True

js-yaml has prototype pollution in merge (<<)


9 more vulnerabilities not shown. Update dependencies to fix these issues.

Powered by Codity.ai · Docs

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Dependency vulnerability scanning

Metric Value
Vulnerabilities Found 4
Scanner pip-audit
View vulnerability details (4 items)

1. pip 24.0

CVE: GHSA-4xh5-x5gv-qwph
Fixed in: 25.3

When extracting a tar archive pip may not check symbolic links point into the extraction directory if the tarfile module doesn't implement PEP 706. Note that upgrading pip to a "fixed" version for thi


2. pip 24.0

CVE: GHSA-6vgw-5pg2-w6jp
Fixed in: 26.0

When pip is installing and extracting a maliciously crafted wheel archive, files may be extracted outside the installation directory. The path traversal is limited to prefixes of the installation dire


3. pip 24.0

CVE: GHSA-58qw-9mgm-455v
Fixed in:

pip handles concatenated tar and ZIP files as ZIP files regardless of filename or whether a file is both a tar and ZIP file. This behavior could result in confusing installation behavior, such as inst


4. pip 24.0

CVE: GHSA-jp4c-xjxw-mgf9
Fixed in: 26.1

pip prior to version 26.1 would run self-update check functionality after installing wheel files which required importing well-known Python modules names. These module imports were intentionally defer

Powered by Codity.ai · Docs

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

License Compliance Scan

Metric Value
Packages Scanned 1157
High Risk (Strong Copyleft) 0
Medium Risk (Weak Copyleft) 1
Low Risk (Permissive) 1142
Unknown License 14

Weak copyleft licenses found - verify compatibility

Some packages have unknown licenses - manual review required

Medium Risk Licenses - 1 packages

EPL-2.0 (1 packages):

  • elkjs 0.9.3
Unknown Licenses - 14 packages
  • react-syntax-highlighter 15.5.6
  • @pandacss/is-valid-prop 0.41.0
  • jackspeak 3.4.3 (BlueOak-1.0.0)
  • package-json-from-dist 1.0.1 (BlueOak-1.0.0)
  • path-scurry 1.11.1 (BlueOak-1.0.0)
  • string-width-cjs 4.2.3
  • strip-ansi-cjs 6.0.1
  • wrap-ansi-cjs 7.0.0
  • openlineage-airflow 1.29.0
  • {% if params and params.environ and params.environ 'templated_unit_test' %}
  • pytest 8.2,<9
  • requests 2.27.0,<3
  • python-dateutil 2.8.1 # including inline comments
  • com.google.cloud:google-cloud-pubsub ${google.cloud.version}

Powered by Codity.ai · Docs

@codity-dm
Copy link
Copy Markdown

codity-dm Bot commented May 16, 2026

Code Quality Report — test-org-codity/airflow1 · PR #1

Scanned: 2026-05-16 11:38 UTC | Score: 21/100 | Provider: github

Executive Summary

Severity Count
Critical 0
High 0
Medium 3
Low 139
Top Findings

[CQ-LLM-001] providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:140 (Complexity · MEDIUM)

Issue: The constructor of SparkSubmitOperator has multiple parameters, which may lead to complexity in understanding and maintaining the code.
Suggestion: Consider using a configuration object or builder pattern to simplify the constructor.

openlineage_inject_parent_job_info: bool = conf.getboolean(
            "openlineage", "spark_inject_parent_job_info", fallback=False
        ),

[CQ-LLM-002] providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:180 (Error_Handling · MEDIUM)

Issue: The code does not handle potential exceptions that may arise from injecting OpenLineage information into Spark properties.
Suggestion: Add try-except blocks around the injection logic to handle potential errors gracefully.

self.conf = inject_parent_job_information_into_spark_properties(self.conf, context)

[CQ-LLM-004] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:284 (Testability · MEDIUM)

Issue: The test cases are tightly coupled with the implementation details of SparkSubmitOperator, making them less maintainable.
Suggestion: Consider using dependency injection or mocking to decouple tests from the implementation.

operator = SparkSubmitOperator(
            task_id="spark_submit_job",
            spark_binary="sparky",
            dag=self.dag,
            openlineage_inject_parent_job_info=False,
            openlineage_inject_transport_info=True,
            **self._config,
        )

[CQ-LLM-003] providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:135 (Documentation · LOW)

Issue: Missing docstring for the constructor of SparkSubmitOperator, which is a public API.
Suggestion: Add a docstring to describe the parameters and their purpose.

def __init__(self, ...):

[CQ-007] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:290 (Documentation · LOW)

Issue: Public def 'test_inject_simple_openlineage_config_to_spark' missing docstring
Suggestion: Add a docstring describing purpose and parameters

def test_inject_simple_openlineage_config_to_spark(self, mock_get_openlineage_listener, mock_get_hook):

[CQ-009] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:299 (Style · LOW)

Issue: Line exceeds 120 characters (131 chars)
Suggestion: Break long lines into multiple lines for readability

        mock_get_openlineage_listener.return_value.adapter.get_or_create_openlineage_client.return_value.transport = Htt...

[CQ-008] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:301 (Maintainability · LOW)

Issue: Magic number 5000 in code
Suggestion: Extract to a named constant

url="http://localhost:5000",

[CQ-008] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:303 (Maintainability · LOW)

Issue: Magic number 5050 in code
Suggestion: Extract to a named constant

timeout=5050,

[CQ-008] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:304 (Maintainability · LOW)

Issue: Magic number 12345 in code
Suggestion: Extract to a named constant

auth=ApiKeyTokenProvider({"api_key": "12345"}),

[CQ-008] providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py:322 (Maintainability · LOW)

Issue: Magic number 5000 in code
Suggestion: Extract to a named constant

"spark.openlineage.transport.url": "http://localhost:5000",

Per-File Breakdown

File Critical High Medium Low Total
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py 0 0 2 1 3
providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py 0 0 1 48 49
providers/common/compat/src/airflow/providers/common/compat/openlineage/utils/spark.py 0 0 0 64 64
providers/google/tests/unit/google/cloud/openlineage/test_utils.py 0 0 0 1 1
providers/google/tests/unit/google/cloud/operators/test_dataproc.py 0 0 0 14 14
providers/openlineage/src/airflow/providers/openlineage/utils/spark.py 0 0 0 3 3
providers/openlineage/tests/unit/openlineage/plugins/test_macros.py 0 0 0 3 3
providers/openlineage/tests/unit/openlineage/utils/test_spark.py 0 0 0 5 5

Recommendations

  • Run automated tests after applying fixes to verify no regressions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants