From 85de7b7fbe6bd8cb1aaf4d173e1e60ff2bf025cf Mon Sep 17 00:00:00 2001 From: Takayoshi Makabe Date: Sun, 21 Jun 2026 13:29:20 +0900 Subject: [PATCH 1/3] Add duplicate dag id warning --- .../src/airflow/dag_processing/collection.py | 67 +++++++++++++- airflow-core/src/airflow/models/dagwarning.py | 1 + .../unit/dag_processing/test_collection.py | 89 +++++++++++++++++++ 3 files changed, 154 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index 003a4da016e60..af8428fb4b887 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -31,10 +31,11 @@ from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar import structlog -from sqlalchemy import delete, func, insert, select, tuple_, update +from sqlalchemy import delete, false, func, insert, select, tuple_, update from sqlalchemy.exc import OperationalError from sqlalchemy.orm import joinedload, load_only +from airflow._shared.observability.metrics import stats from airflow._shared.timezones.timezone import utcnow from airflow.assets.manager import asset_manager from airflow.configuration import conf @@ -51,7 +52,7 @@ ) from airflow.models.dag import DagModel, DagOwnerAttributes, DagTag from airflow.models.dagrun import DagRun -from airflow.models.dagwarning import DagWarningType +from airflow.models.dagwarning import DagWarning, DagWarningType from airflow.models.errors import ParseImportError from airflow.models.serialized_dag import SerializedDagModel from airflow.models.trigger import Trigger @@ -75,7 +76,6 @@ from sqlalchemy.orm import Session from sqlalchemy.sql import Select - from airflow.models.dagwarning import DagWarning from airflow.models.serialized_dag import DagWriteMetadata from airflow.typing_compat import Self, Unpack @@ -325,6 +325,59 @@ def _sync_dag_perms(dag: LazyDeserializedDAG, session: Session): 
security_manager.sync_perm_for_dag(dag_id, dag.access_control) +def _build_duplicate_dag_id_warnings( + dags: Collection[LazyDeserializedDAG], + bundle_name: str, + session: Session, +) -> set[DagWarning]: + """ + Detect dag_ids that are defined in multiple files and return DagWarning objects for each. + + A warning is emitted whenever the incoming file differs from the file already recorded in + DagModel. This covers both accidental duplicates and file renames while leaving the final + interpretation to the user. + """ + dag_by_id = {dag.dag_id: dag for dag in dags} + if not dag_by_id: + return set() + + existing_rows = session.execute( + select(DagModel.dag_id, DagModel.bundle_name, DagModel.relative_fileloc).where( + DagModel.dag_id.in_(dag_by_id), + DagModel.is_stale == false(), + ) + ) + + warnings: set[DagWarning] = set() + for existing in existing_rows: + dag = dag_by_id[existing.dag_id] + if bundle_name == existing.bundle_name and dag.relative_fileloc == existing.relative_fileloc: + continue + + message = ( + f"dag_id '{dag.dag_id}' is now served from '{dag.relative_fileloc}' " + f"(bundle: '{bundle_name}'), previously registered from '{existing.relative_fileloc}' " + f"(bundle: '{existing.bundle_name}'). " + f"If '{existing.relative_fileloc}' was renamed or moved, this notice will clear on " + "the next parse cycle once the old file is no longer observed. " + f"If both files coexist with the same dag_id, rename one of them to avoid " + "non-deterministic behavior." 
+ ) + log.warning( + "Duplicate dag_id '%s' detected: incoming file '%s' (bundle '%s') conflicts with " + "existing file '%s' (bundle '%s')", + dag.dag_id, + dag.relative_fileloc, + bundle_name, + existing.relative_fileloc, + existing.bundle_name, + ) + stats.incr("dag_processing.duplicate_dag_id", tags={"dag_id": dag.dag_id}) + warnings.add(DagWarning(dag.dag_id, DagWarningType.DUPLICATE_DAG_ID, message)) + + return warnings + + def _update_dag_warnings( dag_ids: list[str], warnings: set[DagWarning], @@ -446,6 +499,7 @@ def update_dag_parsing_results_in_db( *, version_data: dict | None = None, warning_types: tuple[DagWarningType, ...] = ( + DagWarningType.DUPLICATE_DAG_ID, DagWarningType.NONEXISTENT_POOL, DagWarningType.RUNTIME_VARYING_VALUE, ), @@ -475,6 +529,13 @@ def update_dag_parsing_results_in_db( # Retry 'DAG.bulk_write_to_db' & 'SerializedDagModel.bulk_sync_to_db' in case # of any Operational Errors # In case of failures, provide_session handles rollback + try: + duplicate_warnings = _build_duplicate_dag_id_warnings(dags, bundle_name, session) + except Exception: + log.exception("Error building duplicate dag_id warnings.") + else: + warnings = set(warnings) | duplicate_warnings + for attempt in run_with_db_retries(logger=log): with attempt: serialize_errors = [] diff --git a/airflow-core/src/airflow/models/dagwarning.py b/airflow-core/src/airflow/models/dagwarning.py index d411a3b938643..b0a5a3c93e1ef 100644 --- a/airflow-core/src/airflow/models/dagwarning.py +++ b/airflow-core/src/airflow/models/dagwarning.py @@ -102,5 +102,6 @@ class DagWarningType(str, Enum): """ ASSET_CONFLICT = "asset conflict" + DUPLICATE_DAG_ID = "duplicate dag id" NONEXISTENT_POOL = "non-existent pool" RUNTIME_VARYING_VALUE = "runtime varying value" diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py index fe15045598ff5..359c315cf444c 100644 --- a/airflow-core/tests/unit/dag_processing/test_collection.py 
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py @@ -51,6 +51,7 @@ ) from airflow.models.dag import DagTag from airflow.models.dagbundle import DagBundleModel +from airflow.models.dagwarning import DagWarning, DagWarningType from airflow.models.errors import ParseImportError from airflow.models.serialized_dag import SerializedDagModel from airflow.models.trigger import Trigger @@ -772,6 +773,94 @@ def test_serialized_dags_are_written_to_db_on_sync(self, testing_dag_bundle, ses new_serialized_dags_count = session.scalar(select(func.count(SerializedDagModel.dag_id))) assert new_serialized_dags_count == 1 + @patch("airflow.dag_processing.collection.stats.incr") + @pytest.mark.usefixtures("clean_db") + def test_duplicate_dag_id_creates_dag_warning(self, mock_stats_incr, testing_dag_bundle, session): + session.add( + DagModel( + dag_id="duplicated_dag", + bundle_name="testing", + fileloc="/opt/airflow/dags/existing.py", + relative_fileloc="existing.py", + is_stale=False, + ) + ) + session.flush() + + dag = DAG(dag_id="duplicated_dag") + dag.fileloc = "/opt/airflow/dags/current.py" + dag.relative_fileloc = "current.py" + + update_dag_parsing_results_in_db( + bundle_name="testing", + bundle_version=None, + dags=[LazyDeserializedDAG.from_dag(dag)], + import_errors={}, + parse_duration=None, + warnings=set(), + session=session, + ) + + warning = session.scalar( + select(DagWarning).where( + DagWarning.dag_id == "duplicated_dag", + DagWarning.warning_type == DagWarningType.DUPLICATE_DAG_ID, + ) + ) + + assert warning is not None + assert "duplicated_dag" in warning.message + assert "current.py" in warning.message + assert "existing.py" in warning.message + assert "renamed or moved" in warning.message + assert "non-deterministic behavior" in warning.message + mock_stats_incr.assert_called_once_with( + "dag_processing.duplicate_dag_id", tags={"dag_id": "duplicated_dag"} + ) + + @pytest.mark.usefixtures("clean_db") + def 
test_duplicate_dag_id_warning_is_removed_when_dag_file_matches(self, testing_dag_bundle, session): + session.add( + DagModel( + dag_id="same_file_dag", + bundle_name="testing", + fileloc="/opt/airflow/dags/current.py", + relative_fileloc="current.py", + is_stale=False, + ) + ) + session.add( + DagWarning( + dag_id="same_file_dag", + warning_type=DagWarningType.DUPLICATE_DAG_ID, + message="Previous duplicate dag_id warning", + ) + ) + session.flush() + + dag = DAG(dag_id="same_file_dag") + dag.fileloc = "/opt/airflow/dags/current.py" + dag.relative_fileloc = "current.py" + + update_dag_parsing_results_in_db( + bundle_name="testing", + bundle_version=None, + dags=[LazyDeserializedDAG.from_dag(dag)], + import_errors={}, + parse_duration=None, + warnings=set(), + session=session, + ) + + warning = session.scalar( + select(DagWarning).where( + DagWarning.dag_id == "same_file_dag", + DagWarning.warning_type == DagWarningType.DUPLICATE_DAG_ID, + ) + ) + + assert warning is None + def test_parse_time_written_to_db_on_sync(self, testing_dag_bundle, session): """Test that the parse time is correctly written to the DB after parsing""" From 36af15e489a39c1c4bef8eb14509c65391c2839e Mon Sep 17 00:00:00 2001 From: Takayoshi Makabe Date: Sun, 21 Jun 2026 13:58:45 +0900 Subject: [PATCH 2/3] Modify related files --- airflow-core/newsfragments/68791.improvement.rst | 1 + .../api_fastapi/core_api/openapi/v2-rest-api-generated.yaml | 1 + airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts | 2 +- airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- .../unit/api_fastapi/core_api/routes/public/test_dag_warning.py | 2 +- airflow-ctl/src/airflowctl/api/datamodels/generated.py | 1 + 6 files changed, 6 insertions(+), 3 deletions(-) create mode 100644 airflow-core/newsfragments/68791.improvement.rst diff --git a/airflow-core/newsfragments/68791.improvement.rst b/airflow-core/newsfragments/68791.improvement.rst new file mode 100644 index 0000000000000..2c98be3c8acfd 
--- /dev/null +++ b/airflow-core/newsfragments/68791.improvement.rst @@ -0,0 +1 @@ +Warn in the UI when the same dag_id is registered from multiple files. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index f82e665c9bea9..4cfd0b1b41219 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -14249,6 +14249,7 @@ components: type: string enum: - asset conflict + - duplicate dag id - non-existent pool - runtime varying value title: DagWarningType diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 413dbef3d7e42..fe928a0723f46 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4243,7 +4243,7 @@ export const $DagVersionResponse = { export const $DagWarningType = { type: 'string', - enum: ['asset conflict', 'non-existent pool', 'runtime varying value'], + enum: ['asset conflict', 'duplicate dag id', 'non-existent pool', 'runtime varying value'], title: 'DagWarningType', description: `Enum for DAG warning types. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 31f716d85f885..0259f59ec2e36 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1106,7 +1106,7 @@ export type DagVersionResponse = { * This is the set of allowable values for the ``warning_type`` field * in the DagWarning model. 
*/ -export type DagWarningType = 'asset conflict' | 'non-existent pool' | 'runtime varying value'; +export type DagWarningType = 'asset conflict' | 'duplicate dag id' | 'non-existent pool' | 'runtime varying value'; /** * Backfill collection serializer for responses in dry-run mode. diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py index 91efee48df797..ade87cccf6076 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py @@ -117,5 +117,5 @@ def test_get_dag_warnings_bad_request(self, test_client): assert response.status_code == 422 assert ( response_json["detail"][0]["msg"] - == "Input should be 'asset conflict', 'non-existent pool' or 'runtime varying value'" + == "Input should be 'asset conflict', 'duplicate dag id', 'non-existent pool' or 'runtime varying value'" ) diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 779bb42319193..9897e0bd7b8c6 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -553,6 +553,7 @@ class DagWarningType(str, Enum): """ ASSET_CONFLICT = "asset conflict" + DUPLICATE_DAG_ID = "duplicate dag id" NON_EXISTENT_POOL = "non-existent pool" RUNTIME_VARYING_VALUE = "runtime varying value" From fed0d5e1d7f7a5f01c35ef4996017bc41dce1def Mon Sep 17 00:00:00 2001 From: Takayoshi Makabe Date: Mon, 22 Jun 2026 19:16:01 +0900 Subject: [PATCH 3/3] Add dag_processing.duplicate_dag_id to metrics registry --- .../observability/metrics/metrics_template.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml 
b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index 0ba1563116fe4..7ef3d3f4d27ac 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -399,6 +399,12 @@ metrics: legacy_name: "-" name_variables: [] + - name: "dag_processing.duplicate_dag_id" + description: "Number of times a parsed Dag's dag_id was found already registered from a different file or bundle" + type: "counter" + legacy_name: "-" + name_variables: [] + - name: "dag_processing.total_parse_time" description: "Seconds taken to scan and import ``dag_processing.file_path_queue_size`` Dag files" type: "gauge"