Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``9ff64e1c35d3`` (head) | ``dd5f3a8e2b91`` | ``3.3.0`` | Add indexes on dag_run.created_dag_version_id and |
| ``623bce373cdf`` (head) | ``9ff64e1c35d3`` | ``3.3.0`` | Add is_refresh to AssetPartitionDagRun. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``9ff64e1c35d3`` | ``dd5f3a8e2b91`` | ``3.3.0`` | Add indexes on dag_run.created_dag_version_id and |
| | | | task_instance.dag_version_id. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``dd5f3a8e2b91`` | ``c20871fbf23a`` | ``3.3.0`` | Add rollup_fingerprint to AssetPartitionDagRun and index |
Expand Down
40 changes: 39 additions & 1 deletion airflow-core/src/airflow/assets/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
TaskOutletAssetReference,
)
from airflow.models.log import Log
from airflow.partition_mappers.base import is_rollup
from airflow.partition_mappers.rerun_policy import RerunPolicy
from airflow.timetables.base import compute_rollup_fingerprint
from airflow.utils.helpers import is_container, prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -643,14 +645,22 @@ def _queue_partitioned_dags(
)
continue

# rerun_policy only applies to rollups (it decides what happens when an
# upstream key re-arrives after the window already fired). Non-rollup
# mappers have no such field; ``None`` keeps their legacy behavior.
rerun_policy = mapper.rerun_policy if is_rollup(mapper) else None
for target_key in target_keys:
apdr = cls._get_or_create_apdr(
target_key=target_key,
target_dag=target_dag,
rollup_fingerprint=fingerprint,
asset_id=asset_id,
rerun_policy=rerun_policy,
session=session,
)
# IGNORE dropped this late event for an already-fired window.
if apdr is None:
continue
log_record = PartitionedAssetKeyLog(
asset_id=asset_id,
asset_event_id=event.id,
Expand All @@ -669,8 +679,9 @@ def _get_or_create_apdr(
target_dag: DagModel,
rollup_fingerprint: dict,
asset_id: int,
rerun_policy: RerunPolicy | None = None,
session: Session,
) -> AssetPartitionDagRun:
) -> AssetPartitionDagRun | None:
"""
Get or create an APDR.

Expand All @@ -683,6 +694,19 @@ def _get_or_create_apdr(
``rollup_fingerprint`` is the serialized mapper / window definition for all partitioned
assets in the timetable at creation time; the scheduler discards APDRs whose stamp no
longer matches the current timetable's fingerprint (mapper / window may have changed).

``rerun_policy`` governs what happens when the latest APDR for this (key, dag) has
already fired and a new upstream event arrives (an upstream partition was cleared and
re-run). It is set only for rollup mappers; ``None`` (non-rollup) keeps the legacy
"always create a fresh APDR" behavior, where the new APDR fires on the next tick.

- :attr:`RerunPolicy.IGNORE` returns ``None`` so the caller drops the late event.
- :attr:`RerunPolicy.REFRESH` creates a new APDR flagged ``is_refresh`` so the
scheduler fires it immediately rather than waiting for the whole window again.
- :attr:`RerunPolicy.HOLD` (and non-rollup) creates a plain new APDR that waits.

A pending (not-yet-fired) latest APDR is always reused regardless of policy, so events
accumulating toward a window's first firing are unaffected.
"""
with _lock_asset_model(session=session, asset_id=asset_id):
latest_apdr: AssetPartitionDagRun | None = session.scalar(
Expand All @@ -703,11 +727,25 @@ def _get_or_create_apdr(
)
return latest_apdr

is_refresh = False
if latest_apdr is not None:
# The latest APDR already fired, so this event re-arrives for an
# already-materialized window. Apply the rollup's rerun policy.
if rerun_policy is RerunPolicy.IGNORE:
cls.logger().debug(
"Dropping re-arrived event for fired window key %s dag_id %s (IGNORE)",
target_key,
target_dag.dag_id,
)
return None
is_refresh = rerun_policy is RerunPolicy.REFRESH

apdr = AssetPartitionDagRun(
target_dag_id=target_dag.dag_id,
created_dag_run_id=None,
partition_key=target_key,
rollup_fingerprint=rollup_fingerprint,
is_refresh=is_refresh,
)
session.add(apdr)
session.flush()
Expand Down
46 changes: 29 additions & 17 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2261,23 +2261,35 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st

source_key_by_asset = source_key_by_asset_per_apdr[apdr.id]
timetable = dag.timetable
statuses: dict[SerializedAssetUniqueKey, bool] = {}
for asset_id, (name, uri) in asset_info_per_apdr[apdr.id].items():
key = SerializedAssetUniqueKey(name=name, uri=uri)
if timetable.partitioned:
statuses[key] = self._resolve_asset_partition_status(
session=session,
asset_id=asset_id,
name=name,
uri=uri,
apdr=apdr,
timetable=timetable,
actual_by_asset=source_key_by_asset,
)
else:
statuses[key] = True
if not evaluator.run(timetable.asset_condition, statuses=statuses):
continue
contributing_assets = asset_info_per_apdr[apdr.id]
if apdr.is_refresh:
# A refresh APDR supersedes an already-fired window
# (RerunPolicy.REFRESH): the rest of the window is still
# materialized, so fire immediately with the re-arrived events
# rather than re-satisfying the wait policy. Still require at
# least one active contributing asset, mirroring the
# freeze-on-inactive behavior below: an APDR whose only assets
# are inactive stays pending until they reactivate.
if not contributing_assets:
continue
else:
statuses: dict[SerializedAssetUniqueKey, bool] = {}
for asset_id, (name, uri) in contributing_assets.items():
key = SerializedAssetUniqueKey(name=name, uri=uri)
if timetable.partitioned:
statuses[key] = self._resolve_asset_partition_status(
session=session,
asset_id=asset_id,
name=name,
uri=uri,
apdr=apdr,
timetable=timetable,
actual_by_asset=source_key_by_asset,
)
else:
statuses[key] = True
if not evaluator.run(timetable.asset_condition, statuses=statuses):
continue

partition_dag_ids.add(apdr.target_dag_id)
run_after = timezone.utcnow()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add is_refresh to AssetPartitionDagRun.

The ``is_refresh`` flag marks a provisional partition Dag run that re-fires an
already-materialized rollup window after an upstream partition was cleared and
re-run under ``RerunPolicy.REFRESH``. The scheduler fires such a run immediately
with the current events instead of waiting for the whole window to re-arrive.

Revision ID: 623bce373cdf
Revises: 9ff64e1c35d3
Create Date: 2026-06-17 00:00:00.000000

"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

from airflow.migrations.utils import disable_sqlite_fkeys

revision = "623bce373cdf"
down_revision = "9ff64e1c35d3"
branch_labels = None
depends_on = None
airflow_version = "3.3.0"


def upgrade():
"""Add ``is_refresh`` to ``asset_partition_dag_run``."""
with disable_sqlite_fkeys(op):
with op.batch_alter_table("asset_partition_dag_run", schema=None) as batch_op:
batch_op.add_column(sa.Column("is_refresh", sa.Boolean(), nullable=False, server_default="0"))


def downgrade():
"""Drop the APDR ``is_refresh`` column."""
with disable_sqlite_fkeys(op):
with op.batch_alter_table("asset_partition_dag_run", schema=None) as batch_op:
batch_op.drop_column("is_refresh")
6 changes: 6 additions & 0 deletions airflow-core/src/airflow/models/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import sqlalchemy as sa
from sqlalchemy import (
JSON,
Boolean,
Column,
ForeignKey,
ForeignKeyConstraint,
Expand Down Expand Up @@ -931,6 +932,11 @@ class AssetPartitionDagRun(Base):
# legacy rows that pre-date the column; they are treated as stale on the
# next scheduler tick.
rollup_fingerprint: Mapped[dict | None] = mapped_column(JSON, nullable=True)
# Set when this APDR is a refresh of an already-materialized window (an
# upstream partition was cleared and re-run under RerunPolicy.REFRESH). The
# window already fired once, so the scheduler fires a refresh immediately
# with the current events instead of waiting for the whole window again.
is_refresh: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default="0")
created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
Expand Down
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/partition_mappers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, ClassVar, TypeGuard

from airflow.partition_mappers.rerun_policy import RerunPolicy
from airflow.partition_mappers.wait_policy import WaitForAll, WaitPolicy

if TYPE_CHECKING:
Expand Down Expand Up @@ -137,6 +138,11 @@ class RollupMapper(PartitionMapper):
``wait_policy`` that decides when the downstream Dag run fires given
the expected window and the upstream keys that have actually arrived.
The default policy waits for every expected upstream key.

``rerun_policy`` decides what happens when an upstream partition that the
fired window already consumed is cleared and re-run; see :class:`RerunPolicy`.
The default :attr:`RerunPolicy.HOLD` waits for the whole window to
re-materialize (the historical behavior).
"""

is_rollup: ClassVar[bool] = True
Expand All @@ -147,6 +153,7 @@ def __init__(
upstream_mapper: PartitionMapper,
window: Window,
wait_policy: WaitPolicy | None = None,
rerun_policy: RerunPolicy | str = RerunPolicy.HOLD,
max_downstream_keys: int | None = None,
) -> None:
decode_overridden = type(upstream_mapper).decode_downstream is not PartitionMapper.decode_downstream
Expand All @@ -166,6 +173,7 @@ def __init__(
self.upstream_mapper = upstream_mapper
self.window = window
self.wait_policy = wait_policy
self.rerun_policy = RerunPolicy(rerun_policy)

def to_downstream(self, key: str) -> str | Iterable[str]:
return self.upstream_mapper.to_downstream(key)
Expand Down Expand Up @@ -198,6 +206,7 @@ def serialize(self) -> dict[str, Any]:
"upstream_mapper": encode_partition_mapper(self.upstream_mapper),
"window": encode_window(self.window),
"wait_policy": encode_wait_policy(self.wait_policy),
"rerun_policy": self.rerun_policy.value,
}
if self.max_downstream_keys is not None:
data["max_downstream_keys"] = self.max_downstream_keys
Expand All @@ -215,6 +224,9 @@ def deserialize(cls, data: dict[str, Any]) -> PartitionMapper:
upstream_mapper=decode_partition_mapper(data["upstream_mapper"]),
window=decode_window(data["window"]),
wait_policy=decode_wait_policy(data["wait_policy"]),
# Default for serialized Dags written before rerun_policy existed —
# HOLD reproduces their pre-feature behavior (wait for the full window).
rerun_policy=data.get("rerun_policy", RerunPolicy.HOLD),
max_downstream_keys=data.get("max_downstream_keys"),
)

Expand Down
43 changes: 43 additions & 0 deletions airflow-core/src/airflow/partition_mappers/rerun_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from enum import Enum


class RerunPolicy(str, Enum):
"""
Core-side mirror of the SDK :class:`airflow.sdk.RerunPolicy`.

Decides what the scheduler does when an upstream partition is cleared and
re-run after a rollup's downstream window has already fired. See the SDK
class for the authoring-facing documentation; the two are serialized by
string value so they round-trip across the Dag-parse / scheduler boundary.

``HOLD`` (default): queue a provisional run that waits for the entire window
to re-materialize before firing again. This is the historical behavior, so it
is the default to keep existing Dags unchanged.

``REFRESH``: re-fire the downstream Dag run immediately so it reprocesses with
the corrected upstream data.

``IGNORE``: drop the late upstream event; do not re-fire.
"""

REFRESH = "refresh"
HOLD = "hold"
IGNORE = "ignore"
2 changes: 2 additions & 0 deletions airflow-core/src/airflow/serialization/encoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
PartitionMapper,
ProductMapper,
QuarterWindow,
RerunPolicy,
RollupMapper,
SegmentWindow,
StartOfDayMapper,
Expand Down Expand Up @@ -530,6 +531,7 @@ def _(self, partition_mapper: RollupMapper) -> dict[str, Any]:
"upstream_mapper": encode_partition_mapper(partition_mapper.upstream_mapper),
"window": encode_window(partition_mapper.window),
"wait_policy": encode_wait_policy(partition_mapper.wait_policy),
"rerun_policy": RerunPolicy(partition_mapper.rerun_policy).value,
}
if partition_mapper.max_downstream_keys is not None:
data["max_downstream_keys"] = partition_mapper.max_downstream_keys
Expand Down
Loading
Loading