diff --git a/providers/celery/tests/unit/celery/executors/test_celery_executor.py b/providers/celery/tests/unit/celery/executors/test_celery_executor.py index 309eba50cbd2c..3dcd32e84ef0b 100644 --- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py +++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py @@ -885,6 +885,43 @@ def test_celery_tasks_registered_on_import(): ) +@pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="ExecuteCallback requires Airflow 3.2+") +@pytest.mark.parametrize( + ("callback_data", "expected_queue"), + [ + pytest.param({"path": "tests.callbacks.test_callback", "kwargs": {}}, "default", id="default_queue"), + pytest.param( + {"path": "tests.callbacks.test_callback", "kwargs": {}, "queue": "callback_queue"}, + "callback_queue", + id="callback_queue", + ), + ], +) +@mock.patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor._send_workloads") +def test_process_workloads_routes_execute_callback(mock_send_workloads, callback_data, expected_queue): + """CeleryExecutor routes callback workloads to Celery with the expected queue.""" + from airflow.executors import workloads + from airflow.executors.workloads.callback import CallbackDTO + + callback_id = "00000000-0000-0000-0000-000000000003" + workload = workloads.ExecuteCallback( + callback=CallbackDTO( + id=callback_id, + fetch_method=workloads.CallbackFetchMethod.IMPORT_PATH, + data=callback_data, + ), + dag_rel_path="test_dag.py", + bundle_info=workloads.BundleInfo(name="test-bundle", version=None), + token="test-token", + log_path="callback.log", + ) + + executor = celery_executor.CeleryExecutor() + executor._process_workloads([workload]) + + mock_send_workloads.assert_called_once_with([(callback_id, workload, expected_queue, None)]) + + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="execute_workload is only used for Airflow 3+") @pytest.mark.skipif(AIRFLOW_V_3_3_PLUS, reason="pre-3.3 compatibility path only applies before Airflow 3.3") def test_execute_workload_runs_execute_task_before_airflow_3_3():