Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/68791.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Warn in the UI when the same dag_id is registered from multiple files.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auto-regenerated by scripts/in_container/run_generate_openapi_spec.py to reflect the new DUPLICATE_DAG_ID value added to DagWarningType.

Original file line number Diff line number Diff line change
Expand Up @@ -14249,6 +14249,7 @@ components:
type: string
enum:
- asset conflict
- duplicate dag id
- non-existent pool
- runtime varying value
title: DagWarningType
Expand Down
67 changes: 64 additions & 3 deletions airflow-core/src/airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@
from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar

import structlog
from sqlalchemy import delete, func, insert, select, tuple_, update
from sqlalchemy import delete, false, func, insert, select, tuple_, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import joinedload, load_only

from airflow._shared.observability.metrics import stats
from airflow._shared.timezones.timezone import utcnow
from airflow.assets.manager import asset_manager
from airflow.configuration import conf
Expand All @@ -51,7 +52,7 @@
)
from airflow.models.dag import DagModel, DagOwnerAttributes, DagTag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarningType
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.trigger import Trigger
Expand All @@ -75,7 +76,6 @@
from sqlalchemy.orm import Session
from sqlalchemy.sql import Select

from airflow.models.dagwarning import DagWarning
from airflow.models.serialized_dag import DagWriteMetadata
from airflow.typing_compat import Self, Unpack

Expand Down Expand Up @@ -325,6 +325,59 @@ def _sync_dag_perms(dag: LazyDeserializedDAG, session: Session):
security_manager.sync_perm_for_dag(dag_id, dag.access_control)


def _build_duplicate_dag_id_warnings(
dags: Collection[LazyDeserializedDAG],
bundle_name: str,
session: Session,
) -> set[DagWarning]:
"""
Detect dag_ids that are defined in multiple files and return DagWarning objects for each.
A warning is emitted whenever the incoming file differs from the file already recorded in
DagModel. This covers both accidental duplicates and file renames while leaving the final
interpretation to the user.
"""
dag_by_id = {dag.dag_id: dag for dag in dags}
if not dag_by_id:
return set()

existing_rows = session.execute(
select(DagModel.dag_id, DagModel.bundle_name, DagModel.relative_fileloc).where(
DagModel.dag_id.in_(dag_by_id),
DagModel.is_stale == false(),
)
)

warnings: set[DagWarning] = set()
for existing in existing_rows:
dag = dag_by_id[existing.dag_id]
if bundle_name == existing.bundle_name and dag.relative_fileloc == existing.relative_fileloc:
continue

message = (
f"dag_id '{dag.dag_id}' is now served from '{dag.relative_fileloc}' "
f"(bundle: '{bundle_name}'), previously registered from '{existing.relative_fileloc}' "
f"(bundle: '{existing.bundle_name}'). "
f"If '{existing.relative_fileloc}' was renamed or moved, this notice will clear on "
"the next parse cycle once the old file is no longer observed. "
f"If both files coexist with the same dag_id, rename one of them to avoid "
"non-deterministic behavior."
)
log.warning(
"Duplicate dag_id '%s' detected: incoming file '%s' (bundle '%s') conflicts with "
"existing file '%s' (bundle '%s')",
dag.dag_id,
dag.relative_fileloc,
bundle_name,
existing.relative_fileloc,
existing.bundle_name,
)
stats.incr("dag_processing.duplicate_dag_id", tags={"dag_id": dag.dag_id})
warnings.add(DagWarning(dag.dag_id, DagWarningType.DUPLICATE_DAG_ID, message))

return warnings


def _update_dag_warnings(
dag_ids: list[str],
warnings: set[DagWarning],
Expand Down Expand Up @@ -446,6 +499,7 @@ def update_dag_parsing_results_in_db(
*,
version_data: dict | None = None,
warning_types: tuple[DagWarningType, ...] = (
DagWarningType.DUPLICATE_DAG_ID,
DagWarningType.NONEXISTENT_POOL,
DagWarningType.RUNTIME_VARYING_VALUE,
),
Expand Down Expand Up @@ -475,6 +529,13 @@ def update_dag_parsing_results_in_db(
# Retry 'DAG.bulk_write_to_db' & 'SerializedDagModel.bulk_sync_to_db' in case
# of any Operational Errors
# In case of failures, provide_session handles rollback
try:
duplicate_warnings = _build_duplicate_dag_id_warnings(dags, bundle_name, session)
except Exception:
log.exception("Error building duplicate dag_id warnings.")
else:
warnings = set(warnings) | duplicate_warnings

for attempt in run_with_db_retries(logger=log):
with attempt:
serialize_errors = []
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/models/dagwarning.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,6 @@ class DagWarningType(str, Enum):
"""

ASSET_CONFLICT = "asset conflict"
DUPLICATE_DAG_ID = "duplicate dag id"
NONEXISTENT_POOL = "non-existent pool"
RUNTIME_VARYING_VALUE = "runtime varying value"

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auto-regenerated by pnpm codegen to reflect the new duplicate dag id value added to DagWarningType.

Original file line number Diff line number Diff line change
Expand Up @@ -4243,7 +4243,7 @@ export const $DagVersionResponse = {

export const $DagWarningType = {
type: 'string',
enum: ['asset conflict', 'non-existent pool', 'runtime varying value'],
enum: ['asset conflict', 'duplicate dag id', 'non-existent pool', 'runtime varying value'],
title: 'DagWarningType',
description: `Enum for DAG warning types.
Expand Down

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auto-regenerated by pnpm codegen to reflect the new duplicate dag id value added to DagWarningType.

Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,7 @@ export type DagVersionResponse = {
* This is the set of allowable values for the ``warning_type`` field
* in the DagWarning model.
*/
export type DagWarningType = 'asset conflict' | 'non-existent pool' | 'runtime varying value';
export type DagWarningType = 'asset conflict' | 'duplicate dag id' | 'non-existent pool' | 'runtime varying value';

/**
* Backfill collection serializer for responses in dry-run mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,5 @@ def test_get_dag_warnings_bad_request(self, test_client):
assert response.status_code == 422
assert (
response_json["detail"][0]["msg"]
== "Input should be 'asset conflict', 'non-existent pool' or 'runtime varying value'"
== "Input should be 'asset conflict', 'duplicate dag id', 'non-existent pool' or 'runtime varying value'"
)
89 changes: 89 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
)
from airflow.models.dag import DagTag
from airflow.models.dagbundle import DagBundleModel
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.trigger import Trigger
Expand Down Expand Up @@ -772,6 +773,94 @@ def test_serialized_dags_are_written_to_db_on_sync(self, testing_dag_bundle, ses
new_serialized_dags_count = session.scalar(select(func.count(SerializedDagModel.dag_id)))
assert new_serialized_dags_count == 1

@patch("airflow.dag_processing.collection.stats.incr")
@pytest.mark.usefixtures("clean_db")
def test_duplicate_dag_id_creates_dag_warning(self, mock_stats_incr, testing_dag_bundle, session):
    """A dag_id arriving from a different file than the recorded one produces a warning and a metric."""
    dag_id = "duplicated_dag"

    # Seed the DB with a live DagModel that points at a different file in the same bundle.
    recorded = DagModel(
        dag_id=dag_id,
        bundle_name="testing",
        fileloc="/opt/airflow/dags/existing.py",
        relative_fileloc="existing.py",
        is_stale=False,
    )
    session.add(recorded)
    session.flush()

    incoming = DAG(dag_id=dag_id)
    incoming.fileloc = "/opt/airflow/dags/current.py"
    incoming.relative_fileloc = "current.py"

    update_dag_parsing_results_in_db(
        bundle_name="testing",
        bundle_version=None,
        dags=[LazyDeserializedDAG.from_dag(incoming)],
        import_errors={},
        parse_duration=None,
        warnings=set(),
        session=session,
    )

    lookup = select(DagWarning).where(
        DagWarning.dag_id == dag_id,
        DagWarning.warning_type == DagWarningType.DUPLICATE_DAG_ID,
    )
    stored = session.scalar(lookup)

    assert stored is not None
    # The message must name the dag, both files, and both remediation hints.
    expected_fragments = (
        "duplicated_dag",
        "current.py",
        "existing.py",
        "renamed or moved",
        "non-deterministic behavior",
    )
    for fragment in expected_fragments:
        assert fragment in stored.message
    mock_stats_incr.assert_called_once_with("dag_processing.duplicate_dag_id", tags={"dag_id": dag_id})

@pytest.mark.usefixtures("clean_db")
def test_duplicate_dag_id_warning_is_removed_when_dag_file_matches(self, testing_dag_bundle, session):
    """A stale DUPLICATE_DAG_ID warning is gone after the dag is parsed from its recorded file."""
    dag_id = "same_file_dag"

    # Seed a live DagModel plus a leftover duplicate warning for the same dag_id.
    recorded = DagModel(
        dag_id=dag_id,
        bundle_name="testing",
        fileloc="/opt/airflow/dags/current.py",
        relative_fileloc="current.py",
        is_stale=False,
    )
    leftover = DagWarning(
        dag_id=dag_id,
        warning_type=DagWarningType.DUPLICATE_DAG_ID,
        message="Previous duplicate dag_id warning",
    )
    session.add(recorded)
    session.add(leftover)
    session.flush()

    # The incoming dag comes from exactly the file that is already recorded.
    incoming = DAG(dag_id=dag_id)
    incoming.fileloc = "/opt/airflow/dags/current.py"
    incoming.relative_fileloc = "current.py"

    update_dag_parsing_results_in_db(
        bundle_name="testing",
        bundle_version=None,
        dags=[LazyDeserializedDAG.from_dag(incoming)],
        import_errors={},
        parse_duration=None,
        warnings=set(),
        session=session,
    )

    lookup = select(DagWarning).where(
        DagWarning.dag_id == dag_id,
        DagWarning.warning_type == DagWarningType.DUPLICATE_DAG_ID,
    )
    remaining = session.scalar(lookup)

    assert remaining is None

def test_parse_time_written_to_db_on_sync(self, testing_dag_bundle, session):
"""Test that the parse time is correctly written to the DB after parsing"""

Expand Down
1 change: 1 addition & 0 deletions airflow-ctl/src/airflowctl/api/datamodels/generated.py

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auto-regenerated by datamodel-codegen to reflect the new DUPLICATE_DAG_ID value added to DagWarningType.

Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ class DagWarningType(str, Enum):
"""

ASSET_CONFLICT = "asset conflict"
DUPLICATE_DAG_ID = "duplicate dag id"
NON_EXISTENT_POOL = "non-existent pool"
RUNTIME_VARYING_VALUE = "runtime varying value"

Expand Down
Loading