Skip to content

Fix manual state change not use fork#2

Open
DhirenMhatre wants to merge 4 commits into
mainfrom
fix-manual-state-change-not-use-fork
Open

Fix manual state change not use fork#2
DhirenMhatre wants to merge 4 commits into
mainfrom
fix-manual-state-change-not-use-fork

Conversation

@DhirenMhatre

Copy link
Copy Markdown

No description provided.

Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
- Assigning isoformat() string to datetime-typed variable confused mypy
  now that the assignment is in the outer method scope (previously inside
  a nested closure where inference behaved differently).
- ti.operator is str | None; guard against None before .lower().

Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Policy Check Failed

✗ 3/3 policy checks failed:

• Need 2 more approval(s) (0/2) — comment LGTM or approve via review
• Missing ticket reference (expected: JIRA-, ENG-, #*)
• 2 code file(s) changed but no test files added


To merge this PR:

  1. Address the failed checks listed above
  2. Ensure branch protection requires the codity/policy-check status

Configure policies in your dashboard

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

PR Summary

What Changed

  • Manual task state change emissions now use ProcessPoolExecutor instead of _fork_execute to avoid per-event process forking overhead.
  • Worker processes now reinitialize Stats via _executor_initializer to ensure metrics emit correctly.
  • Tests added to confirm _fork_execute is not used for manual state changes and to validate the new execution path.

Key Changes by Area

OpenLineage Listener: Replaced _fork_execute with submit_callable using ProcessPoolExecutor for manual state change events.
Metrics: Added _executor_initializer to reinitialize Stats in worker processes.

Files Changed

File Changes Summary
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py Replaced _fork_execute with ProcessPoolExecutor for manual state changes; added _emit_manual_state_change_event helper and _executor_initializer for Stats reinitialization.
providers/openlineage/tests/unit/openlineage/plugins/test_listener.py Added regression test to ensure _fork_execute is not called for manual state changes; added direct_submit_call helper to bypass pickling in tests.

Review Focus Areas

  • Verify _executor_initializer correctly reinitializes Stats in all worker processes.
  • Confirm _emit_manual_state_change_event is fully picklable and handles edge cases (e.g., missing context).
  • Check that direct_submit_call in tests accurately simulates the new execution path without masking pickling issues.

Architecture

Design Decisions: Using ProcessPoolExecutor avoids repeated fork() calls, which improves performance and avoids issues with non-fork-safe resources. The _executor_initializer pattern ensures Stats is properly set up in workers without relying on process inheritance. The direct_submit_call test helper is a temporary workaround for pickling limitations in test environments.

Scalability & Extensibility: This change improves scalability by reducing per-event process creation overhead. The ProcessPoolExecutor approach allows future reuse of worker processes across multiple events.

Risks: The _emit_manual_state_change_event function must remain picklable; any future state captured in closures or non-picklable objects could break this. This is intentional and acceptable as long as the helper stays simple. The test helper direct_submit_call bypasses real pickling, so it may miss issues that only appear in production—this risk is mitigated by the regression guard.

Merge Status

MERGEABLE — PR Score 71/100, above threshold (50). All gates passed.

@greptile-apps

greptile-apps Bot commented May 16, 2026

Copy link
Copy Markdown

Greptile Summary

This PR refactors _on_task_instance_manual_state_change to route through the existing ProcessPoolExecutor via a new module-level _emit_manual_state_change_event helper instead of _fork_execute, eliminating a fork-per-event. It also re-initializes Stats in _executor_initializer so metrics gauges work in worker processes.

  • The three dag-run hooks now use if self._executor is None: instead of if not self.executor: — the old form lazily created the pool as a side-effect; the new form is a real guard that fires on every call since nothing proactively initializes _executor, silently dropping all dag-run OL events.
  • _on_task_instance_manual_state_change drops _fork_execute in favour of submit_callable, but the _executor is None guard causes silent no-ops when called from the API server on Airflow 3.

Confidence Score: 2/5

The dag-run guard change silently disables all OpenLineage dag-run event emission on the scheduler, making the core functionality of the plugin non-operational after merge.

The change from if not self.executor: to if self._executor is None: in all three dag-run hooks eliminates the lazy pool creation that was the only mechanism for initializing _executor. Every dag-run event will be dropped on a fresh scheduler start, and existing tests should expose this regression.

providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py — the three if self._executor is None: guards replacing if not self.executor: in the dag-run hooks, and the missing executor initialization in on_starting.

Important Files Changed

Filename Overview
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py Refactors _on_task_instance_manual_state_change to route through ProcessPoolExecutor instead of fork; also changes if not self.executor: to if self._executor is None: in three dag-run hooks, breaking lazy pool creation and silently dropping all dag-run events
providers/openlineage/tests/unit/openlineage/plugins/test_listener.py Adds direct_submit_call stub, sets listener._executor = MagicMock() in three manual-state-change tests, and adds regression guards asserting _fork_execute is never called

Sequence Diagram

sequenceDiagram
    participant Sched as Scheduler
    participant OLL as OpenLineageListener
    participant PPE as ProcessPoolExecutor
    participant Worker as Worker Process
    participant Adapter as OpenLineageAdapter

    Note over Sched,OLL: Dag-run event path (broken by this PR)
    Sched->>OLL: on_dag_run_running(dag_run)
    OLL->>OLL: if self._executor is None return (pool never created!)

    Note over Sched,OLL: Manual state change scheduler path (new, working)
    Sched->>OLL: _on_task_instance_manual_state_change(ti, FAILED)
    OLL->>PPE: submit_callable(_emit_manual_state_change_event, adapter.fail_task, stats_key)
    PPE->>Worker: spawn with configure_orm + Stats.initialize
    Worker->>Adapter: "fail_task(**kwargs)"

    Note over Sched,OLL: Manual state change API server path (silently dropped)
    Sched->>OLL: _on_task_instance_manual_state_change(ti, SUCCESS)
    OLL->>OLL: _executor is None return (no event emitted)
Loading

Reviews (1): Last reviewed commit: "statsd fix" | Re-trigger Greptile

Comment on lines +882 to 884
if self._executor is None:
self.log.debug("Executor have not started before `on_dag_run_running`")
return

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Guard change breaks lazy pool initialization

Before this PR, the check was if not self.executor: (the property), which lazily created the ProcessPoolExecutor on its first call via the property's side-effect, so the guard was always False and never skipped. Replacing it with if self._executor is None: makes the guard real — but since nothing proactively creates the pool, _executor is always None when the scheduler processes its first dag run. All three dag-run hooks now return early and the pool is never created, silently dropping every dag-run OpenLineage event. The same regression applies at lines 934 and 986.

Comment on lines +714 to +717
try:
if self._executor is None:
self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`")
return

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Silent event drop on API-server path

_on_task_instance_manual_state_change is also invoked from the API server (Airflow 3 manual state changes via UI/API). On the API server, _executor is always None because no scheduler lifecycle hooks initialize the pool there. The new guard returns early and silently drops the OpenLineage event, whereas the previous _fork_execute-based path would have attempted emission. The docstring says "only reached on the scheduler" but the calling code explicitly describes the API-server case as a second trigger site.

Comment on lines +802 to +805
except BaseException as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robustness High

Catching BaseException instead of Exception swallows KeyboardInterrupt, SystemExit, and GeneratorExit, which can prevent clean process shutdown in the scheduler.

Suggested fix
        except Exception as e:
            self.log.warning(
                "OpenLineage received exception in method `_on_task_instance_manual_state_change`",
                exc_info=e,
            )
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 802-805
Issue Type: robustness-high
Severity: high

Issue Description:
Catching `BaseException` instead of `Exception` swallows `KeyboardInterrupt`, `SystemExit`, and `GeneratorExit`, which can prevent clean process shutdown in the scheduler.

Current Code:
        except BaseException as e:
            self.log.warning(
                "OpenLineage received exception in method `_on_task_instance_manual_state_change`",
                exc_info=e,
            )

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Security Scan Summary

Metric Value
Vulnerabilities Critical: 0
Overall Risk Clean
Files Scanned 2

No critical security issues detected

Scan completed in 17.9s

Security scan powered by Codity.ai

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

License Compliance Scan

Metric Value
Packages Scanned 231
High Risk (Strong Copyleft) 0
Medium Risk (Weak Copyleft) 3
Low Risk (Permissive) 225
Unknown License 3

Weak copyleft licenses found - verify compatibility

Some packages have unknown licenses - manual review required

Medium Risk Licenses - 3 packages

EPL-2.0 (1 packages):

  • elkjs 0.11.1

MPL-2.0 (2 packages):

  • github.com/hashicorp/go-plugin 1.7.0
  • github.com/hashicorp/yamux 0.1.2
Unknown Licenses - 3 packages
  • {% if params and params.environ and params.environ 'templated_unit_test' %}
  • python-dateutil 2.8.1 # including inline comments
  • com.google.cloud:google-cloud-pubsub ${google.cloud.version}

Powered by Codity.ai · Docs

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Code Quality Report — test-org-codity/airflow1 · PR #2

Scanned: 2026-05-16 11:52 UTC | Score: 61/100 | Provider: github

Executive Summary

Severity Count
Critical 0
High 1
Medium 4
Low 0
Top Findings

[CQ-LLM-001] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:37 (Complexity · HIGH)

Issue: The function _on_task_instance_manual_state_change has multiple nested try-except blocks and conditionals, increasing cyclomatic complexity.
Suggestion: Refactor the function to reduce nesting and complexity, possibly by extracting some logic into smaller helper functions.

try:
    if self._executor is None:
        self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`")
        return
    ...

[CQ-LLM-003] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:12 (Documentation · MEDIUM)

Issue: Missing docstring for the function _emit_manual_state_change_event, which is a public API.
Suggestion: Add a docstring to describe the purpose, parameters, and return value of the function.

def _emit_manual_state_change_event(adapter_method, stats_key, **kwargs):

[CQ-LLM-002] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:38 (Error_Handling · MEDIUM)

Issue: Swallowed exceptions in the import statement for stats_utils without any logging or handling.
Suggestion: Consider logging the exception or handling it in a way that informs the user of the failure.

except ImportError:
    pass

[CQ-LLM-004] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:38 (Maintainability · MEDIUM)

Issue: Use of magic strings like 'NoStatsLogger' and 'unknown' without explanation.
Suggestion: Define these strings as constants at the top of the file for better maintainability.

self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`")

[CQ-LLM-005] providers/openlineage/tests/unit/openlineage/plugins/test_listener.py:86 (Testability · MEDIUM)

Issue: The function direct_submit_call has hard-coded dependencies on specific implementation details of _emit_manual_state_change_event.
Suggestion: Consider using dependency injection to make the function more testable and decoupled from specific implementations.

if callable is _emit_manual_state_change_event:

Per-File Breakdown

File Critical High Medium Low Total
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py 0 1 3 0 4
providers/openlineage/tests/unit/openlineage/plugins/test_listener.py 0 0 1 0 1

Recommendations

  1. Resolve High severity issues, especially error handling gaps and performance bottlenecks.
  • Run automated tests after applying fixes to verify no regressions.

@DhirenMhatre

Copy link
Copy Markdown
Author

@codity review

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Policy Check Failed

✗ 3/3 policy checks failed:

• Need 2 more approval(s) (0/2) — comment LGTM or approve via review
• Missing ticket reference (expected: JIRA-, ENG-, #*)
• 2 code file(s) changed but no test files added


To merge this PR:

  1. Address the failed checks listed above
  2. Ensure branch protection requires the codity/policy-check status

Configure policies in your dashboard

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

PR Summary

What Changed

  • Manual task state change emissions now use ProcessPoolExecutor instead of _fork_execute to avoid per-event process forking overhead.
  • Worker processes now reinitialize Stats via _executor_initializer to ensure metrics emit correctly.
  • Tests were updated to verify _fork_execute is not used for manual state changes and to test the new execution path.

Key Changes by Area

OpenLineage Listener: Replaced _fork_execute with submit_callable using ProcessPoolExecutor for manual state change events.
Stats Initialization: Added _executor_initializer to reset Stats in worker processes, fixing metric emission issues.

Files Changed

File Changes Summary
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py Replaced _fork_execute with ProcessPoolExecutor for manual state changes; added _emit_manual_state_change_event helper and _executor_initializer for Stats reinitialization.
providers/openlineage/tests/unit/openlineage/plugins/test_listener.py Added regression test to confirm _fork_execute is not called for manual state changes; added direct_submit_call helper to bypass pickling in tests.

Review Focus Areas

  • Verify _executor_initializer correctly resets Stats without side effects across workers.
  • Confirm _emit_manual_state_change_event is fully picklable and handles edge cases (e.g., None inputs).
  • Check that direct_submit_call in tests accurately simulates the new execution path.

Architecture

Design Decisions: Using ProcessPoolExecutor avoids repeated fork() calls, which improves performance and avoids issues with shared state. The _executor_initializer pattern ensures workers start with clean Stats state, matching the original intent of _fork_execute but in a reusable way.

Scalability & Extensibility: The change supports scaling manual state changes across more workers without per-event overhead. The helper function design keeps the execution path modular but does not yet support dynamic worker pool resizing.

Risks: The _emit_manual_state_change_event function must remain picklable; any future state captured in closures could break this. This is intentional for now but needs monitoring.

Merge Status

MERGEABLE — PR Score 61/100, above threshold (50). All gates passed.

Comment on lines +802 to +805
except BaseException as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robustness High

Catching BaseException swallows SystemExit, KeyboardInterrupt, and GeneratorExit; use Exception instead to avoid masking process-termination signals.

Suggested fix
        except Exception as e:
            self.log.warning(
                "OpenLineage received exception in method `_on_task_instance_manual_state_change`",
                exc_info=e,
            )
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 802-805
Issue Type: robustness-high
Severity: high

Issue Description:
Catching `BaseException` swallows `SystemExit`, `KeyboardInterrupt`, and `GeneratorExit`; use `Exception` instead to avoid masking process-termination signals.

Current Code:
        except BaseException as e:
            self.log.warning(
                "OpenLineage received exception in method `_on_task_instance_manual_state_change`",
                exc_info=e,
            )

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +714 to +726
try:
if self._executor is None:
self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`")
return

if ti_state == TaskInstanceState.FAILED:
adapter_method = self.adapter.fail_task
event_type = RunState.FAIL.value.lower()
elif ti_state in (TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED):
adapter_method = self.adapter.complete_task
event_type = RunState.COMPLETE.value.lower()
else:
raise ValueError(f"Unsupported ti_state: `{ti_state}`.")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional Medium

The try/except block wrapping the executor-null check and the submit_callable call also catches the ValueError raised for unsupported ti_state values and logs it as a warning instead of propagating it; callers will silently receive no event and no error signal for genuinely unsupported states.

Suggested fix
        # Guard: executor must be running
        if self._executor is None:
            self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`")
            return

        if ti_state == TaskInstanceState.FAILED:
            adapter_method = self.adapter.fail_task
            event_type = RunState.FAIL.value.lower()
        elif ti_state in (TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED):
            adapter_method = self.adapter.complete_task
            event_type = RunState.COMPLETE.value.lower()
        else:
            raise ValueError(f"Unsupported ti_state: `{ti_state}`.")

        try:
            # ... rest of the extraction and submit_callable call ...
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 714-726
Issue Type: functional-medium
Severity: medium

Issue Description:
The `try/except` block wrapping the executor-null check and the `submit_callable` call also catches the `ValueError` raised for unsupported `ti_state` values and logs it as a warning instead of propagating it; callers will silently receive no event and no error signal for genuinely unsupported states.

Current Code:
        try:
            if self._executor is None:
                ...
            if ti_state == TaskInstanceState.FAILED:
                ...
            elif ti_state in (TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED):
                ...
            else:
                raise ValueError(f"Unsupported ti_state: `{ti_state}`.")

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +1537 to +1538
# Regression guard: manual state-change emission must not go through _fork_execute.
mock_fork_execute.assert_not_called()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional Medium

assert mock_emit.assert_called_once always evaluates to True because assert_called_once is a method object (truthy), not a call; replace with mock_emit.assert_called_once() (with parentheses) to actually assert the mock was called exactly once.

Also reported at: providers/openlineage/tests/unit/openlineage/plugins/test_listener.py L1726–L1727, L1944–L1945

Suggested change
# Regression guard: manual state-change emission must not go through _fork_execute.
mock_fork_execute.assert_not_called()
mock_emit.assert_called_once()
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
Lines: 1537-1538
Issue Type: functional-medium
Severity: medium

Issue Description:
`assert mock_emit.assert_called_once` always evaluates to `True` because `assert_called_once` is a method object (truthy), not a call; replace with `mock_emit.assert_called_once()` (with parentheses) to actually assert the mock was called exactly once.

_Also reported at: `providers/openlineage/tests/unit/openlineage/plugins/test_listener.py` L1726–L1727, L1944–L1945_

Current Code:
        assert mock_emit.assert_called_once

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Security Scan Summary

Metric Value
Vulnerabilities Critical: 0
Overall Risk Clean
Files Scanned 2

No critical security issues detected

Scan completed in 16.4s

Security scan powered by Codity.ai

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

License Compliance Scan

Metric Value
Packages Scanned 231
High Risk (Strong Copyleft) 0
Medium Risk (Weak Copyleft) 3
Low Risk (Permissive) 225
Unknown License 3

Weak copyleft licenses found - verify compatibility

Some packages have unknown licenses - manual review required

Medium Risk Licenses - 3 packages

EPL-2.0 (1 packages):

  • elkjs 0.11.1

MPL-2.0 (2 packages):

  • github.com/hashicorp/go-plugin 1.7.0
  • github.com/hashicorp/yamux 0.1.2
Unknown Licenses - 3 packages
  • {% if params and params.environ and params.environ 'templated_unit_test' %}
  • python-dateutil 2.8.1 # including inline comments
  • com.google.cloud:google-cloud-pubsub ${google.cloud.version}

Powered by Codity.ai · Docs

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Code Quality Report — test-org-codity/airflow1 · PR #2

Scanned: 2026-05-16 12:22 UTC | Score: 61/100 | Provider: github

Executive Summary

Severity Count
Critical 0
High 1
Medium 4
Low 0
Top Findings

[CQ-LLM-001] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:37 (Complexity · HIGH)

Issue: The function _on_task_instance_manual_state_change has deep nesting due to multiple try-except blocks and if-else conditions.
Suggestion: Consider refactoring the function to reduce nesting and improve readability.

try:
    if self._executor is None:
        self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`")
        return

    if ti_state == TaskInstanceState.FAILED:
        adapter_method = self.adapter.fail_task
        event_type = RunState.FAIL.value.lower()
    elif ti_state in (TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED):
        adapter_method = self.adapter.complete_task
        event_type = RunState.COMPLETE.value.lower()
    else:
        raise ValueError(f"Unsupported ti_state: `{ti_state}`.")

[CQ-LLM-003] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:0 (Documentation · MEDIUM)

Issue: Missing docstring for the function _emit_manual_state_change_event which is a public API.
Suggestion: Add a docstring to describe the function's purpose, parameters, and return value.

def _emit_manual_state_change_event(adapter_method, stats_key, **kwargs):

[CQ-LLM-004] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:0 (Maintainability · MEDIUM)

Issue: Use of magic strings like 'unknown' in the operator_name assignment can lead to maintenance issues.
Suggestion: Define constants for such strings to improve maintainability.

operator_name = (ti.operator or "unknown").lower()

[CQ-LLM-002] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:37 (Error_Handling · MEDIUM)

Issue: Swallowed exceptions in the import statement for stats_utils may lead to silent failures without logging.
Suggestion: Log the exception or handle it in a way that informs the user of the failure.

except ImportError:
    # ``stats_utils`` lives under ``airflow.observability.metrics`` in current Airflow; if the
    # import path changes or is unavailable, fall through silently — gauge calls will simply
    # land on ``NoStatsLogger`` as before, which is no worse than current behavior.
    pass

[CQ-LLM-005] providers/openlineage/tests/unit/openlineage/plugins/test_listener.py:0 (Testability · MEDIUM)

Issue: The direct_submit_call function introduces a hard-coded dependency on _emit_manual_state_change_event, reducing testability.
Suggestion: Consider using dependency injection to pass the callable instead of hard-coding it.

if callable is _emit_manual_state_change_event:

Per-File Breakdown

File Critical High Medium Low Total
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py 0 1 3 0 4
providers/openlineage/tests/unit/openlineage/plugins/test_listener.py 0 0 1 0 1

Recommendations

  1. Resolve High severity issues, especially error handling gaps and performance bottlenecks.
  • Run automated tests after applying fixes to verify no regressions.

@DhirenMhatre

Copy link
Copy Markdown
Author

@codity review

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Policy Check Failed

✗ 3/3 policy checks failed:

• Need 2 more approval(s) (0/2) — comment LGTM or approve via review
• Missing ticket reference (expected: JIRA-, ENG-, #*)
• 2 code file(s) changed but no test files added


To merge this PR:

  1. Address the failed checks listed above
  2. Ensure branch protection requires the codity/policy-check status

Configure policies in your dashboard

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

PR Summary

What Changed

  • Manual task state changes now use ProcessPoolExecutor instead of _fork_execute to avoid per-event forking and ensure proper ORM and Stats initialization in workers.
  • A new picklable helper _emit_manual_state_change_event was added to support execution via the executor.
  • Tests were updated to mock _fork_execute and submit_callable, and to assert _fork_execute is not called for manual state changes.

Key Changes by Area

State Change Emission: Manual state changes now route through submit_callableProcessPoolExecutor instead of _fork_execute.
Executor Initialization: _executor_initializer now reinitializes Stats to ensure metrics emit correctly in worker processes.
Testing: Added mocks and a direct_submit_call helper to enable synchronous test execution and regression prevention.

Files Changed

File Changes Summary
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py Replaced _fork_execute with ProcessPoolExecutor for manual state changes; added _emit_manual_state_change_event; updated _executor_initializer to reinit Stats; fixed getattr to use default None for ti.task at lines 693 and 763
providers/openlineage/tests/unit/openlineage/plugins/test_listener.py Added mocks for _fork_execute and submit_callable; added direct_submit_call helper; added assertions to verify _fork_execute is not called for manual state changes

Review Focus Areas

  • getattr(ti, "task") usage at lines 693 and 763: ensure None default is used to match surrounding if task: guard.
  • Correctness of ProcessPoolExecutor initialization and pickling of _emit_manual_state_change_event.
  • Test coverage for manual state change paths, especially regression guard against _fork_execute.

Architecture

Design Decisions: Using ProcessPoolExecutor avoids per-event forking overhead and ensures consistent ORM and Stats setup in workers. The _emit_manual_state_change_event helper is picklable to support cross-process execution. The getattr fix aligns with defensive patterns used elsewhere in the code.

Scalability & Extensibility: ProcessPoolExecutor improves reuse across manual state change events. The initializer pattern supports future extensions needing per-process setup.

Risks: The getattr omission (now fixed) could cause AttributeError if ti.task is missing. This was unintentional and should be reviewed. No other architectural risks are present.

Merge Status

NOT MERGEABLE — PR Score 53/100, below threshold (50)

  • [H5] 3 HIGH-severity inline review findings need resolution (threshold: 3)

Comment on lines +802 to +805
except BaseException as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robustness High

Catching BaseException swallows SystemExit, KeyboardInterrupt, and GeneratorExit; use Exception instead to let process-termination signals propagate normally.

Suggested fix
        except Exception as e:
            self.log.warning(
                "OpenLineage received exception in method `_on_task_instance_manual_state_change`",
                exc_info=e,
            )
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 802-805
Issue Type: robustness-high
Severity: high

Issue Description:
Catching `BaseException` swallows `SystemExit`, `KeyboardInterrupt`, and `GeneratorExit`; use `Exception` instead to let process-termination signals propagate normally.

Current Code:
        except BaseException as e:
            self.log.warning(
                "OpenLineage received exception in method `_on_task_instance_manual_state_change`",
                exc_info=e,
            )

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +714 to +726
try:
if self._executor is None:
self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`")
return

if ti_state == TaskInstanceState.FAILED:
adapter_method = self.adapter.fail_task
event_type = RunState.FAIL.value.lower()
elif ti_state in (TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED):
adapter_method = self.adapter.complete_task
event_type = RunState.COMPLETE.value.lower()
else:
raise ValueError(f"Unsupported ti_state: `{ti_state}`.")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional Medium

The try/except block wraps the self._executor is None early-return check, so a ValueError raised for an unsupported ti_state is silently swallowed and logged as a warning instead of propagating; the caller receives no signal that the state was unhandled.

Suggested fix
        if self._executor is None:
            self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`")
            return

        if ti_state == TaskInstanceState.FAILED:
            adapter_method = self.adapter.fail_task
            event_type = RunState.FAIL.value.lower()
        elif ti_state in (TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED):
            adapter_method = self.adapter.complete_task
            event_type = RunState.COMPLETE.value.lower()
        else:
            raise ValueError(f"Unsupported ti_state: `{ti_state}`.")

        try:
            # ... rest of the extraction and submission ...
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 714-726
Issue Type: functional-medium
Severity: medium

Issue Description:
The `try/except` block wraps the `self._executor is None` early-return check, so a `ValueError` raised for an unsupported `ti_state` is silently swallowed and logged as a warning instead of propagating; the caller receives no signal that the state was unhandled.

Current Code:
        try:
            if self._executor is None:
                ...
            ...
            else:
                raise ValueError(f"Unsupported ti_state: `{ti_state}`.")

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Nitpicks (Low Priority)

Found 1 low-priority suggestions for code improvement

Click to expand nitpicks

providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py (line 693)

Robustness Low

getattr(ti, "task") without a default raises AttributeError when the attribute is absent; use getattr(ti, "task", None) to match the intent expressed in the surrounding if task: guard.

Also reported at: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py L763

Code Suggestion or Comments
task = getattr(ti, "task", None)  # on scheduler, we should have access to task
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 693-693
Issue Type: robustness-low
Severity: low

Issue Description:
`getattr(ti, "task")` without a default raises `AttributeError` when the attribute is absent; use `getattr(ti, "task", None)` to match the intent expressed in the surrounding `if task:` guard.

_Also reported at: `providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py` L763_

Current Code:
        task = getattr(ti, "task")  # on scheduler, we should have access to task

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---



Like Dislike

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Security Scan Summary

Metric Value
Vulnerabilities Critical: 0
Overall Risk Clean
Files Scanned 2

No critical security issues detected

Scan completed in 19.4s

Security scan powered by Codity.ai

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

License Compliance Scan

Metric Value
Packages Scanned 231
High Risk (Strong Copyleft) 0
Medium Risk (Weak Copyleft) 3
Low Risk (Permissive) 225
Unknown License 3

Weak copyleft licenses found - verify compatibility

Some packages have unknown licenses - manual review required

Medium Risk Licenses - 3 packages

EPL-2.0 (1 packages):

  • elkjs 0.11.1

MPL-2.0 (2 packages):

  • github.com/hashicorp/go-plugin 1.7.0
  • github.com/hashicorp/yamux 0.1.2
Unknown Licenses - 3 packages
  • {% if params and params.environ and params.environ 'templated_unit_test' %}
  • python-dateutil 2.8.1 # including inline comments
  • com.google.cloud:google-cloud-pubsub ${google.cloud.version}

Powered by Codity.ai · Docs

@DhirenMhatre

Copy link
Copy Markdown
Author

@codity review

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Hold up! This PR is already under review, no double-booking allowed 😄

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Code Quality Report — test-org-codity/airflow1 · PR #2

Scanned: 2026-05-16 12:40 UTC | Score: 61/100 | Provider: github

Executive Summary

Severity Count
Critical 0
High 1
Medium 4
Low 0
Top Findings

[CQ-LLM-001] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:37 (Complexity · HIGH)

Issue: The function _on_task_instance_manual_state_change has deep nesting due to multiple try-except blocks and if-else conditions.
Suggestion: Consider refactoring the function to reduce nesting and improve readability.

if self._executor is None:
    self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`")
    return

if ti_state == TaskInstanceState.FAILED:
    adapter_method = self.adapter.fail_task
    event_type = RunState.FAIL.value.lower()
elif ti_state in (TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED):
    adapter_method = self.adapter.complete_task
    event_type = RunState.COMPLETE.value.lower()
else:
    raise ValueError(f"Unsupported ti_state: `{ti_state}`.")

[CQ-LLM-003] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:12 (Documentation · MEDIUM)

Issue: Missing docstring for the function _emit_manual_state_change_event.
Suggestion: Add a docstring to describe the function's purpose, parameters, and return value.

def _emit_manual_state_change_event(adapter_method, stats_key, **kwargs):

[CQ-LLM-002] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:38 (Error_Handling · MEDIUM)

Issue: Swallowed exceptions in the import statement for stats_utils without any logging or handling.
Suggestion: Log the exception or handle it appropriately to avoid silent failures.

except ImportError:
    pass

[CQ-LLM-004] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:38 (Maintainability · MEDIUM)

Issue: Use of magic strings for event types (e.g., RunState.FAIL.value.lower()).
Suggestion: Consider defining constants for event types to improve maintainability.

event_type = RunState.FAIL.value.lower()

[CQ-LLM-005] providers/openlineage/tests/unit/openlineage/plugins/test_listener.py:86 (Testability · MEDIUM)

Issue: Directly calling _emit_manual_state_change_event in tests can lead to tight coupling and reduced test isolation.
Suggestion: Consider using dependency injection or mocking to isolate tests from implementation details.

if callable is _emit_manual_state_change_event:

Per-File Breakdown

File Critical High Medium Low Total
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py 0 1 3 0 4
providers/openlineage/tests/unit/openlineage/plugins/test_listener.py 0 0 1 0 1

Recommendations

  1. Resolve High severity issues, especially error handling gaps and performance bottlenecks.
  • Run automated tests after applying fixes to verify no regressions.

@DhirenMhatre

Copy link
Copy Markdown
Author

@codity review

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Policy Check Failed

✗ 3/3 policy checks failed:

• Need 2 more approval(s) (0/2) — comment LGTM or approve via review
• Missing ticket reference (expected: JIRA-, ENG-, #*)
• 2 code file(s) changed but no test files added


To merge this PR:

  1. Address the failed checks listed above
  2. Ensure branch protection requires the codity/policy-check status

Configure policies in your dashboard

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

PR Summary

What Changed

  • Manual task state change emissions now use ProcessPoolExecutor instead of _fork_execute to avoid per-event process forking overhead.
  • Worker processes now reinitialize Stats via _executor_initializer to ensure metrics emit correctly.
  • Tests were updated to verify the new execution path and prevent accidental use of _fork_execute.

Key Changes by Area

OpenLineage Listener: Replaced _fork_execute with submit_callable using ProcessPoolExecutor for manual state change events.
Metrics: Added _executor_initializer to reinitialize Stats in worker processes.

Files Changed

File Changes Summary
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py Replaced _fork_execute with ProcessPoolExecutor for manual state changes; added _emit_manual_state_change_event helper and _executor_initializer for Stats reinitialization.
providers/openlineage/tests/unit/openlineage/plugins/test_listener.py Updated mocks to use submit_callable; added guards to confirm _fork_execute is not called; added test for new execution path.

Review Focus Areas

  • Verify _executor_initializer correctly resets Stats without side effects across workers.
  • Confirm _emit_manual_state_change_event is fully picklable and works in all test scenarios.

Architecture

Design Decisions: Using ProcessPoolExecutor avoids repeated fork() calls, which improves performance and avoids issues with uninitialized state in workers. The synchronous helper _emit_manual_state_change_event keeps the API simple while enabling pickling.
Risks: The new approach assumes ProcessPoolExecutor is available and behaves consistently across environments. If Stats initialization has hidden dependencies, they may surface in workers. This is acceptable for now but worth monitoring.

Comment on lines +802 to +805
except BaseException as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robustness High

Catching BaseException swallows SystemExit, KeyboardInterrupt, and GeneratorExit; use Exception instead to avoid masking process-termination signals.

Suggested fix
        except Exception as e:
            self.log.warning(
                "OpenLineage received exception in method `_on_task_instance_manual_state_change`",
                exc_info=e,
            )
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 802-805
Issue Type: robustness-high
Severity: high

Issue Description:
Catching `BaseException` swallows `SystemExit`, `KeyboardInterrupt`, and `GeneratorExit`; use `Exception` instead to avoid masking process-termination signals.

Current Code:
        except BaseException as e:
            self.log.warning(
                "OpenLineage received exception in method `_on_task_instance_manual_state_change`",
                exc_info=e,
            )

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Security Scan Summary

Metric Value
Vulnerabilities Critical: 0
Overall Risk Clean
Files Scanned 2

No critical security issues detected

Scan completed in 18.2s

Security scan powered by Codity.ai

@DhirenMhatre

Copy link
Copy Markdown
Author

@codity review

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Policy Check Failed

✗ 3/3 policy checks failed:

• Need 2 more approval(s) (0/2) — comment LGTM or approve via review
• Missing ticket reference (expected: JIRA-, ENG-, #*)
• 2 code file(s) changed but no test files added


To merge this PR:

  1. Address the failed checks listed above
  2. Ensure branch protection requires the codity/policy-check status

Configure policies in your dashboard

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

PR Summary

What Changed

  • Manual task state change emissions now use ProcessPoolExecutor instead of _fork_execute to avoid per-event forking and ensure proper initialization in workers.
  • The _executor_initializer now reinitializes Stats so metrics emit correctly in worker processes.
  • Tests were updated to verify _fork_execute is no longer called and submit_callable is used instead.

Key Changes by Area

State Change Emission: Manual state changes now route through submit_callableProcessPoolExecutor instead of _fork_execute.
Worker Initialization: _executor_initializer reinitializes Stats to ensure metrics work in worker processes.
Testing: Added mocks and assertions to confirm the new emission path and prevent regression.

Files Changed

File Changes Summary
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py Replaced _fork_execute with submit_callableProcessPoolExecutor for manual state changes; updated _executor_initializer to reinitialize Stats.
providers/openlineage/tests/unit/openlineage/plugins/test_listener.py Added mocks and assertions to verify _fork_execute is not called and submit_callable is used.

Review Focus Areas

  • Check that Stats reinitialization in _executor_initializer works correctly across worker restarts.
  • Verify submit_callable handles all manual state change scenarios without side effects.
  • Confirm test mocks accurately reflect the new execution path and cover edge cases.

Architecture

Design Decisions: Using ProcessPoolExecutor avoids repeated forking overhead and ensures consistent ORM and Stats setup. The _emit_manual_state_change_event helper is picklable to support multiprocessing. This deviates from _fork_execute intentionally to improve reliability in worker processes.

Scalability & Extensibility: ProcessPoolExecutor allows reuse of worker processes across multiple state changes, reducing per-event overhead. The change does not affect extensibility of other emission paths.

Risks: The reliance on picklable helpers (_emit_manual_state_change_event) could break if dependencies change. This is acceptable if the helper remains simple and stable. Test coverage must be maintained to catch such issues early.

Merge Status

NOT MERGEABLE — PR Score 39/100, below threshold (50)

  • [H4] PR quality score (39) is below merge floor (50)
  • [H5] 5 HIGH-severity inline review findings need resolution (threshold: 3)

Comment on lines +802 to +805
except BaseException as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robustness High

Catching BaseException swallows SystemExit, KeyboardInterrupt, and GeneratorExit; use Exception instead to avoid masking process-termination signals.

Suggested fix
        except Exception as e:
            self.log.warning(
                "OpenLineage received exception in method `_on_task_instance_manual_state_change`",
                exc_info=e,
            )
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 802-805
Issue Type: robustness-high
Severity: high

Issue Description:
Catching `BaseException` swallows `SystemExit`, `KeyboardInterrupt`, and `GeneratorExit`; use `Exception` instead to avoid masking process-termination signals.

Current Code:
        except BaseException as e:
            self.log.warning(
                "OpenLineage received exception in method `_on_task_instance_manual_state_change`",
                exc_info=e,
            )

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Comment on lines +1537 to +1538
# Regression guard: manual state-change emission must not go through _fork_execute.
mock_fork_execute.assert_not_called()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional Medium

assert mock_emit.assert_called_once always evaluates to True because assert_called_once is a method object (truthy), not a call; replace with mock_emit.assert_called_once() (with parentheses) to actually assert the mock was called exactly once.

Also reported at: providers/openlineage/tests/unit/openlineage/plugins/test_listener.py L1726–L1727, L1944–L1945

Suggested change
# Regression guard: manual state-change emission must not go through _fork_execute.
mock_fork_execute.assert_not_called()
mock_emit.assert_called_once()
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert python developer with deep knowledge of security, performance, and best practices.

### Context

File: providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
Lines: 1537-1538
Issue Type: functional-medium
Severity: medium

Issue Description:
`assert mock_emit.assert_called_once` always evaluates to `True` because `assert_called_once` is a method object (truthy), not a call; replace with `mock_emit.assert_called_once()` (with parentheses) to actually assert the mock was called exactly once.

_Also reported at: `providers/openlineage/tests/unit/openlineage/plugins/test_listener.py` L1726–L1727, L1944–L1945_

Current Code:
        assert mock_emit.assert_called_once

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Security Scan Summary

Metric Value
Vulnerabilities Critical: 0
Overall Risk Clean
Files Scanned 2

No critical security issues detected

Scan completed in 24.9s

Security scan powered by Codity.ai

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

License Compliance Scan

Metric Value
Packages Scanned 231
High Risk (Strong Copyleft) 0
Medium Risk (Weak Copyleft) 3
Low Risk (Permissive) 225
Unknown License 3

Weak copyleft licenses found - verify compatibility

Some packages have unknown licenses - manual review required

Medium Risk Licenses - 3 packages

EPL-2.0 (1 packages):

  • elkjs 0.11.1

MPL-2.0 (2 packages):

  • github.com/hashicorp/go-plugin 1.7.0
  • github.com/hashicorp/yamux 0.1.2
Unknown Licenses - 3 packages
  • {% if params and params.environ and params.environ 'templated_unit_test' %}
  • python-dateutil 2.8.1 # including inline comments
  • com.google.cloud:google-cloud-pubsub ${google.cloud.version}

Powered by Codity.ai · Docs

@codity-dm

codity-dm Bot commented May 16, 2026

Copy link
Copy Markdown

Code Quality Report — test-org-codity/airflow1 · PR #2

Scanned: 2026-05-16 13:02 UTC | Score: 61/100 | Provider: github

Executive Summary

Severity Count
Critical 0
High 1
Medium 4
Low 0
Top Findings

[CQ-LLM-001] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:37 (Complexity · HIGH)

Issue: The function _on_task_instance_manual_state_change has multiple nested try-except blocks and conditionals, increasing cyclomatic complexity.
Suggestion: Refactor the function to reduce nesting and complexity, possibly by extracting some logic into smaller helper functions.

try:
    if self._executor is None:
        self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`")
        return
    ...

[CQ-LLM-003] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:11 (Documentation · MEDIUM)

Issue: Missing docstring for the function _emit_manual_state_change_event, which is a public API.
Suggestion: Add a docstring that describes the function's purpose, parameters, and return value.

def _emit_manual_state_change_event(adapter_method, stats_key, **kwargs):

[CQ-LLM-002] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:38 (Error_Handling · MEDIUM)

Issue: Swallowed exceptions in the import statement for stats_utils without any logging or handling.
Suggestion: Consider logging the exception or handling it in a way that informs the user of the failure.

except ImportError:
    pass

[CQ-LLM-004] providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:690 (Maintainability · MEDIUM)

Issue: Use of magic strings for event types (e.g., RunState.FAIL.value.lower()) can lead to maintenance issues.
Suggestion: Define constants for event types to improve readability and maintainability.

event_type = RunState.FAIL.value.lower()

[CQ-LLM-005] providers/openlineage/tests/unit/openlineage/plugins/test_listener.py:85 (Testability · MEDIUM)

Issue: The direct_submit_call function introduces a hard-coded dependency on _emit_manual_state_change_event, making it less flexible for testing.
Suggestion: Consider using dependency injection to pass the callable instead of hard-coding it.

if callable is _emit_manual_state_change_event:

Per-File Breakdown

File Critical High Medium Low Total
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py 0 1 3 0 4
providers/openlineage/tests/unit/openlineage/plugins/test_listener.py 0 0 1 0 1

Recommendations

  1. Resolve High severity issues, especially error handling gaps and performance bottlenecks.
  • Run automated tests after applying fixes to verify no regressions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants