Fix manual state change not use fork#2
Conversation
Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
- Assigning isoformat() string to datetime-typed variable confused mypy now that the assignment is in the outer method scope (previously inside a nested closure where inference behaved differently). - ti.operator is str | None; guard against None before .lower(). Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com>
Policy Check Failed✗ 3/3 policy checks failed: • Need 2 more approval(s) (0/2) — comment LGTM or approve via review To merge this PR:
|
PR SummaryWhat Changed
Key Changes by AreaOpenLineage Listener: Replaced Files Changed
Review Focus Areas
ArchitectureDesign Decisions: Using Scalability & Extensibility: This change improves scalability by reducing per-event process creation overhead. The Risks: The Merge StatusMERGEABLE — PR Score 71/100, above threshold (50). All gates passed. |
Greptile SummaryThis PR refactors
Confidence Score: 2/5The dag-run guard change silently disables all OpenLineage dag-run event emission on the scheduler, making the core functionality of the plugin non-operational after merge. The change from providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py — the three Important Files Changed
Sequence DiagramsequenceDiagram
participant Sched as Scheduler
participant OLL as OpenLineageListener
participant PPE as ProcessPoolExecutor
participant Worker as Worker Process
participant Adapter as OpenLineageAdapter
Note over Sched,OLL: Dag-run event path (broken by this PR)
Sched->>OLL: on_dag_run_running(dag_run)
OLL->>OLL: if self._executor is None return (pool never created!)
Note over Sched,OLL: Manual state change scheduler path (new, working)
Sched->>OLL: _on_task_instance_manual_state_change(ti, FAILED)
OLL->>PPE: submit_callable(_emit_manual_state_change_event, adapter.fail_task, stats_key)
PPE->>Worker: spawn with configure_orm + Stats.initialize
Worker->>Adapter: "fail_task(**kwargs)"
Note over Sched,OLL: Manual state change API server path (silently dropped)
Sched->>OLL: _on_task_instance_manual_state_change(ti, SUCCESS)
OLL->>OLL: _executor is None return (no event emitted)
Reviews (1): Last reviewed commit: "statsd fix" | Re-trigger Greptile |
| if self._executor is None: | ||
| self.log.debug("Executor have not started before `on_dag_run_running`") | ||
| return |
There was a problem hiding this comment.
Guard change breaks lazy pool initialization
Before this PR, the check was if not self.executor: (the property), which lazily created the ProcessPoolExecutor on its first call via the property's side-effect, so the guard was always False and never skipped. Replacing it with if self._executor is None: makes the guard real — but since nothing proactively creates the pool, _executor is always None when the scheduler processes its first dag run. All three dag-run hooks now return early and the pool is never created, silently dropping every dag-run OpenLineage event. The same regression applies at lines 934 and 986.
| try: | ||
| if self._executor is None: | ||
| self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`") | ||
| return |
There was a problem hiding this comment.
Silent event drop on API-server path
_on_task_instance_manual_state_change is also invoked from the API server (Airflow 3 manual state changes via UI/API). On the API server, _executor is always None because no scheduler lifecycle hooks initialize the pool there. The new guard returns early and silently drops the OpenLineage event, whereas the previous _fork_execute-based path would have attempted emission. The docstring says "only reached on the scheduler" but the calling code explicitly describes the API-server case as a second trigger site.
| except BaseException as e: | ||
| self.log.warning( | ||
| "OpenLineage received exception in method `_on_task_instance_manual_state_change`", | ||
| exc_info=e, |
There was a problem hiding this comment.
Catching BaseException instead of Exception swallows KeyboardInterrupt, SystemExit, and GeneratorExit, which can prevent clean process shutdown in the scheduler.
Suggested fix
except Exception as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,
)Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 802-805
Issue Type: robustness-high
Severity: high
Issue Description:
Catching `BaseException` instead of `Exception` swallows `KeyboardInterrupt`, `SystemExit`, and `GeneratorExit`, which can prevent clean process shutdown in the scheduler.
Current Code:
except BaseException as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,
)
---
### Instructions
1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed
### Constraints
- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready
---
Security Scan Summary
No critical security issues detected Scan completed in 17.9sSecurity scan powered by Codity.ai |
License Compliance Scan
Weak copyleft licenses found - verify compatibility Some packages have unknown licenses - manual review required Medium Risk Licenses - 3 packagesEPL-2.0 (1 packages):
MPL-2.0 (2 packages):
Unknown Licenses - 3 packages
Powered by Codity.ai · Docs |
Code Quality Report — test-org-codity/airflow1 · PR #2Scanned: 2026-05-16 11:52 UTC | Score: 61/100 | Provider: github Executive Summary
Top Findings[CQ-LLM-001]
|
| File | Critical | High | Medium | Low | Total |
|---|---|---|---|---|---|
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py |
0 | 1 | 3 | 0 | 4 |
providers/openlineage/tests/unit/openlineage/plugins/test_listener.py |
0 | 0 | 1 | 0 | 1 |
Recommendations
- Resolve High severity issues, especially error handling gaps and performance bottlenecks.
- Run automated tests after applying fixes to verify no regressions.
|
@codity review |
Policy Check Failed✗ 3/3 policy checks failed: • Need 2 more approval(s) (0/2) — comment LGTM or approve via review To merge this PR:
|
PR SummaryWhat Changed
Key Changes by AreaOpenLineage Listener: Replaced Files Changed
Review Focus Areas
ArchitectureDesign Decisions: Using Scalability & Extensibility: The change supports scaling manual state changes across more workers without per-event overhead. The helper function design keeps the execution path modular but does not yet support dynamic worker pool resizing. Risks: The Merge StatusMERGEABLE — PR Score 61/100, above threshold (50). All gates passed. |
| except BaseException as e: | ||
| self.log.warning( | ||
| "OpenLineage received exception in method `_on_task_instance_manual_state_change`", | ||
| exc_info=e, |
There was a problem hiding this comment.
Catching BaseException swallows SystemExit, KeyboardInterrupt, and GeneratorExit; use Exception instead to avoid masking process-termination signals.
Suggested fix
except Exception as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,
)Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 802-805
Issue Type: robustness-high
Severity: high
Issue Description:
Catching `BaseException` swallows `SystemExit`, `KeyboardInterrupt`, and `GeneratorExit`; use `Exception` instead to avoid masking process-termination signals.
Current Code:
except BaseException as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,
)
---
### Instructions
1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed
### Constraints
- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready
---
| try: | ||
| if self._executor is None: | ||
| self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`") | ||
| return | ||
|
|
||
| if ti_state == TaskInstanceState.FAILED: | ||
| adapter_method = self.adapter.fail_task | ||
| event_type = RunState.FAIL.value.lower() | ||
| elif ti_state in (TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED): | ||
| adapter_method = self.adapter.complete_task | ||
| event_type = RunState.COMPLETE.value.lower() | ||
| else: | ||
| raise ValueError(f"Unsupported ti_state: `{ti_state}`.") |
There was a problem hiding this comment.
The try/except block wrapping the executor-null check and the submit_callable call also catches the ValueError raised for unsupported ti_state values and logs it as a warning instead of propagating it; callers will silently receive no event and no error signal for genuinely unsupported states.
Suggested fix
# Guard: executor must be running
if self._executor is None:
self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`")
return
if ti_state == TaskInstanceState.FAILED:
adapter_method = self.adapter.fail_task
event_type = RunState.FAIL.value.lower()
elif ti_state in (TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED):
adapter_method = self.adapter.complete_task
event_type = RunState.COMPLETE.value.lower()
else:
raise ValueError(f"Unsupported ti_state: `{ti_state}`.")
try:
# ... rest of the extraction and submit_callable call ...Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 714-726
Issue Type: functional-medium
Severity: medium
Issue Description:
The `try/except` block wrapping the executor-null check and the `submit_callable` call also catches the `ValueError` raised for unsupported `ti_state` values and logs it as a warning instead of propagating it; callers will silently receive no event and no error signal for genuinely unsupported states.
Current Code:
try:
if self._executor is None:
...
if ti_state == TaskInstanceState.FAILED:
...
elif ti_state in (TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED):
...
else:
raise ValueError(f"Unsupported ti_state: `{ti_state}`.")
---
### Instructions
1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed
### Constraints
- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready
---
| # Regression guard: manual state-change emission must not go through _fork_execute. | ||
| mock_fork_execute.assert_not_called() |
There was a problem hiding this comment.
assert mock_emit.assert_called_once always evaluates to True because assert_called_once is a method object (truthy), not a call; replace with mock_emit.assert_called_once() (with parentheses) to actually assert the mock was called exactly once.
Also reported at: providers/openlineage/tests/unit/openlineage/plugins/test_listener.py L1726–L1727, L1944–L1945
| # Regression guard: manual state-change emission must not go through _fork_execute. | |
| mock_fork_execute.assert_not_called() | |
| mock_emit.assert_called_once() |
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
Lines: 1537-1538
Issue Type: functional-medium
Severity: medium
Issue Description:
`assert mock_emit.assert_called_once` always evaluates to `True` because `assert_called_once` is a method object (truthy), not a call; replace with `mock_emit.assert_called_once()` (with parentheses) to actually assert the mock was called exactly once.
_Also reported at: `providers/openlineage/tests/unit/openlineage/plugins/test_listener.py` L1726–L1727, L1944–L1945_
Current Code:
assert mock_emit.assert_called_once
---
### Instructions
1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed
### Constraints
- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready
---
Security Scan Summary
No critical security issues detected Scan completed in 16.4sSecurity scan powered by Codity.ai |
License Compliance Scan
Weak copyleft licenses found - verify compatibility Some packages have unknown licenses - manual review required Medium Risk Licenses - 3 packagesEPL-2.0 (1 packages):
MPL-2.0 (2 packages):
Unknown Licenses - 3 packages
Powered by Codity.ai · Docs |
Code Quality Report — test-org-codity/airflow1 · PR #2Scanned: 2026-05-16 12:22 UTC | Score: 61/100 | Provider: github Executive Summary
Top Findings[CQ-LLM-001]
|
| File | Critical | High | Medium | Low | Total |
|---|---|---|---|---|---|
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py |
0 | 1 | 3 | 0 | 4 |
providers/openlineage/tests/unit/openlineage/plugins/test_listener.py |
0 | 0 | 1 | 0 | 1 |
Recommendations
- Resolve High severity issues, especially error handling gaps and performance bottlenecks.
- Run automated tests after applying fixes to verify no regressions.
|
@codity review |
Policy Check Failed✗ 3/3 policy checks failed: • Need 2 more approval(s) (0/2) — comment LGTM or approve via review To merge this PR:
|
PR SummaryWhat Changed
Key Changes by AreaState Change Emission: Manual state changes now route through Files Changed
Review Focus Areas
ArchitectureDesign Decisions: Using Scalability & Extensibility: Risks: The Merge StatusNOT MERGEABLE — PR Score 53/100, below threshold (50)
|
| except BaseException as e: | ||
| self.log.warning( | ||
| "OpenLineage received exception in method `_on_task_instance_manual_state_change`", | ||
| exc_info=e, |
There was a problem hiding this comment.
Catching BaseException swallows SystemExit, KeyboardInterrupt, and GeneratorExit; use Exception instead to let process-termination signals propagate normally.
Suggested fix
except Exception as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,
)Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 802-805
Issue Type: robustness-high
Severity: high
Issue Description:
Catching `BaseException` swallows `SystemExit`, `KeyboardInterrupt`, and `GeneratorExit`; use `Exception` instead to let process-termination signals propagate normally.
Current Code:
except BaseException as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,
)
---
### Instructions
1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed
### Constraints
- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready
---
| try: | ||
| if self._executor is None: | ||
| self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`") | ||
| return | ||
|
|
||
| if ti_state == TaskInstanceState.FAILED: | ||
| adapter_method = self.adapter.fail_task | ||
| event_type = RunState.FAIL.value.lower() | ||
| elif ti_state in (TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED): | ||
| adapter_method = self.adapter.complete_task | ||
| event_type = RunState.COMPLETE.value.lower() | ||
| else: | ||
| raise ValueError(f"Unsupported ti_state: `{ti_state}`.") |
There was a problem hiding this comment.
The try/except block wraps the self._executor is None early-return check, so a ValueError raised for an unsupported ti_state is silently swallowed and logged as a warning instead of propagating; the caller receives no signal that the state was unhandled.
Suggested fix
if self._executor is None:
self.log.debug("Executor has not started before `_on_task_instance_manual_state_change`")
return
if ti_state == TaskInstanceState.FAILED:
adapter_method = self.adapter.fail_task
event_type = RunState.FAIL.value.lower()
elif ti_state in (TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED):
adapter_method = self.adapter.complete_task
event_type = RunState.COMPLETE.value.lower()
else:
raise ValueError(f"Unsupported ti_state: `{ti_state}`.")
try:
# ... rest of the extraction and submission ...Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 714-726
Issue Type: functional-medium
Severity: medium
Issue Description:
The `try/except` block wraps the `self._executor is None` early-return check, so a `ValueError` raised for an unsupported `ti_state` is silently swallowed and logged as a warning instead of propagating; the caller receives no signal that the state was unhandled.
Current Code:
try:
if self._executor is None:
...
...
else:
raise ValueError(f"Unsupported ti_state: `{ti_state}`.")
---
### Instructions
1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed
### Constraints
- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready
---
Security Scan Summary
No critical security issues detected Scan completed in 19.4sSecurity scan powered by Codity.ai |
License Compliance Scan
Weak copyleft licenses found - verify compatibility Some packages have unknown licenses - manual review required Medium Risk Licenses - 3 packagesEPL-2.0 (1 packages):
MPL-2.0 (2 packages):
Unknown Licenses - 3 packages
Powered by Codity.ai · Docs |
|
@codity review |
|
Hold up! This PR is already under review, no double-booking allowed 😄 |
Code Quality Report — test-org-codity/airflow1 · PR #2Scanned: 2026-05-16 12:40 UTC | Score: 61/100 | Provider: github Executive Summary
Top Findings[CQ-LLM-001]
|
| File | Critical | High | Medium | Low | Total |
|---|---|---|---|---|---|
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py |
0 | 1 | 3 | 0 | 4 |
providers/openlineage/tests/unit/openlineage/plugins/test_listener.py |
0 | 0 | 1 | 0 | 1 |
Recommendations
- Resolve High severity issues, especially error handling gaps and performance bottlenecks.
- Run automated tests after applying fixes to verify no regressions.
|
@codity review |
Policy Check Failed✗ 3/3 policy checks failed: • Need 2 more approval(s) (0/2) — comment LGTM or approve via review To merge this PR:
|
PR SummaryWhat Changed
Key Changes by AreaOpenLineage Listener: Replaced Files Changed
Review Focus Areas
ArchitectureDesign Decisions: Using |
| except BaseException as e: | ||
| self.log.warning( | ||
| "OpenLineage received exception in method `_on_task_instance_manual_state_change`", | ||
| exc_info=e, |
There was a problem hiding this comment.
Catching BaseException swallows SystemExit, KeyboardInterrupt, and GeneratorExit; use Exception instead to avoid masking process-termination signals.
Suggested fix
except Exception as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,
)Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 802-805
Issue Type: robustness-high
Severity: high
Issue Description:
Catching `BaseException` swallows `SystemExit`, `KeyboardInterrupt`, and `GeneratorExit`; use `Exception` instead to avoid masking process-termination signals.
Current Code:
except BaseException as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,
)
---
### Instructions
1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed
### Constraints
- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready
---
Security Scan Summary
No critical security issues detected Scan completed in 18.2sSecurity scan powered by Codity.ai |
|
@codity review |
Policy Check Failed✗ 3/3 policy checks failed: • Need 2 more approval(s) (0/2) — comment LGTM or approve via review To merge this PR:
|
PR SummaryWhat Changed
Key Changes by AreaState Change Emission: Manual state changes now route through Files Changed
Review Focus Areas
ArchitectureDesign Decisions: Using Scalability & Extensibility: Risks: The reliance on picklable helpers ( Merge StatusNOT MERGEABLE — PR Score 39/100, below threshold (50)
|
| except BaseException as e: | ||
| self.log.warning( | ||
| "OpenLineage received exception in method `_on_task_instance_manual_state_change`", | ||
| exc_info=e, |
There was a problem hiding this comment.
Catching BaseException swallows SystemExit, KeyboardInterrupt, and GeneratorExit; use Exception instead to avoid masking process-termination signals.
Suggested fix
except Exception as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,
)Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
Lines: 802-805
Issue Type: robustness-high
Severity: high
Issue Description:
Catching `BaseException` swallows `SystemExit`, `KeyboardInterrupt`, and `GeneratorExit`; use `Exception` instead to avoid masking process-termination signals.
Current Code:
except BaseException as e:
self.log.warning(
"OpenLineage received exception in method `_on_task_instance_manual_state_change`",
exc_info=e,
)
---
### Instructions
1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed
### Constraints
- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready
---
| # Regression guard: manual state-change emission must not go through _fork_execute. | ||
| mock_fork_execute.assert_not_called() |
There was a problem hiding this comment.
assert mock_emit.assert_called_once always evaluates to True because assert_called_once is a method object (truthy), not a call; replace with mock_emit.assert_called_once() (with parentheses) to actually assert the mock was called exactly once.
Also reported at: providers/openlineage/tests/unit/openlineage/plugins/test_listener.py L1726–L1727, L1944–L1945
| # Regression guard: manual state-change emission must not go through _fork_execute. | |
| mock_fork_execute.assert_not_called() | |
| mock_emit.assert_called_once() |
Prompt for AI assistance
Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:
You are an expert python developer with deep knowledge of security, performance, and best practices.
### Context
File: providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
Lines: 1537-1538
Issue Type: functional-medium
Severity: medium
Issue Description:
`assert mock_emit.assert_called_once` always evaluates to `True` because `assert_called_once` is a method object (truthy), not a call; replace with `mock_emit.assert_called_once()` (with parentheses) to actually assert the mock was called exactly once.
_Also reported at: `providers/openlineage/tests/unit/openlineage/plugins/test_listener.py` L1726–L1727, L1944–L1945_
Current Code:
assert mock_emit.assert_called_once
---
### Instructions
1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow python best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed
### Constraints
- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready
---
Security Scan Summary
No critical security issues detected Scan completed in 24.9sSecurity scan powered by Codity.ai |
License Compliance Scan
Weak copyleft licenses found - verify compatibility Some packages have unknown licenses - manual review required Medium Risk Licenses - 3 packagesEPL-2.0 (1 packages):
MPL-2.0 (2 packages):
Unknown Licenses - 3 packages
Powered by Codity.ai · Docs |
Code Quality Report — test-org-codity/airflow1 · PR #2Scanned: 2026-05-16 13:02 UTC | Score: 61/100 | Provider: github Executive Summary
Top Findings[CQ-LLM-001]
|
| File | Critical | High | Medium | Low | Total |
|---|---|---|---|---|---|
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py |
0 | 1 | 3 | 0 | 4 |
providers/openlineage/tests/unit/openlineage/plugins/test_listener.py |
0 | 0 | 1 | 0 | 1 |
Recommendations
- Resolve High severity issues, especially error handling gaps and performance bottlenecks.
- Run automated tests after applying fixes to verify no regressions.
No description provided.