From 48cb4110a48eaad5e14e7970db57d3799f83e291 Mon Sep 17 00:00:00 2001
From: crobertson-astro
Date: Tue, 5 May 2026 15:13:41 -0700
Subject: [PATCH] Improve clarity of cluster-policies documentation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Rewrites the cluster policies docs for better clarity:

- Fix heading typo ("How do define" → "How to define")
- Sharpen policy type descriptions to clarify dag_policy vs task_policy vs task_instance_mutation_hook distinctions
- Simplify exception descriptions; remove informal parenthetical
- Fix broken sentence in task policies section ("implement to protect")
- Improve multi-check guidance and entrypoint plugin instructions
---
 .../cluster-policies.rst | 68 +++++++++----------
 1 file changed, 34 insertions(+), 34 deletions(-)

diff --git a/airflow-core/docs/administration-and-deployment/cluster-policies.rst b/airflow-core/docs/administration-and-deployment/cluster-policies.rst
index fd7d0b622f540..ee2f500680f0f 100644
--- a/airflow-core/docs/administration-and-deployment/cluster-policies.rst
+++ b/airflow-core/docs/administration-and-deployment/cluster-policies.rst
@@ -18,8 +18,8 @@ Cluster Policies
 ================
 
-If you want to check or mutate Dags or Tasks on a cluster-wide level, then a Cluster Policy will let you do
-it. They also let you apply cluster-wide settings to Dags based on the dag_id or other properties.
+Cluster policies let you inspect or mutate Dags and tasks at the cluster level. They also let you apply
+cluster-wide settings to Dags based on the ``dag_id`` or other properties.
 
 Common use-cases include:
 
@@ -29,25 +29,22 @@ Common use-cases include:
 
 There are three main types of cluster policy:
 
-* ``dag_policy``: Takes a :class:`~airflow.models.dag.DAG` parameter called ``dag``. Runs at load time of the
-  Dag from DagBag :class:`~airflow.models.dagbag.DagBag`.
-* ``task_policy``: Takes a :class:`~airflow.models.baseoperator.BaseOperator` parameter called ``task``. The
-  policy gets executed when the task is created during parsing of the task from DagBag at load time. This
-  means that the whole task definition can be altered in the task policy. It does not relate to a specific
-  task running in a DagRun. The ``task_policy`` defined is applied to all the task instances that will be
-  executed in the future.
+* ``dag_policy``: Takes a :class:`~airflow.models.dag.DAG` parameter called ``dag``. Runs when the Dag is
+  loaded from :class:`~airflow.models.dagbag.DagBag`.
+* ``task_policy``: Takes a :class:`~airflow.models.baseoperator.BaseOperator` parameter called ``task``. Runs
+  when the task is parsed from DagBag at load time, so the entire task definition can be modified. It is not
+  tied to a specific DagRun — any changes apply to all future task instances of that task.
 * ``task_instance_mutation_hook``: Takes a :class:`~airflow.models.taskinstance.TaskInstance` parameter called
-  ``task_instance``. The ``task_instance_mutation_hook`` applies not to a task but to the instance of a task that
-  relates to a particular DagRun. It is executed in a "worker", not in the Dag file processor, just before the
-  task instance is executed. The policy is only applied to the currently executed run (i.e. instance) of that
-  task.
+  ``task_instance``. Unlike the task-level policies, this hook operates on a single task instance within a
+  specific DagRun. It runs on the worker just before the task instance is executed, so changes apply only to
+  that particular run.
 
-The Dag and Task cluster policies can raise the :class:`~airflow.exceptions.AirflowClusterPolicyViolation`
-exception to indicate that the Dag/task they were passed is not compliant and should not be loaded.
+Dag and task cluster policies can raise :class:`~airflow.exceptions.AirflowClusterPolicyViolation`
+to indicate that the Dag or task is not compliant and should not be loaded.
 
-They can also raise the :class:`~airflow.exceptions.AirflowClusterPolicySkipDag` exception
-when skipping that Dag is needed intentionally. Unlike :class:`~airflow.exceptions.AirflowClusterPolicyViolation`,
-this exception is not displayed on the Airflow web UI (Internally, it's not recorded on ``import_error`` table on meta database.)
+They can also raise :class:`~airflow.exceptions.AirflowClusterPolicySkipDag` to intentionally skip a Dag.
+Unlike :class:`~airflow.exceptions.AirflowClusterPolicyViolation`, this exception is not shown in the
+Airflow UI and is not recorded in the ``import_error`` table in the metadata database.
 
 Any extra attributes set by a cluster policy take priority over those defined in your Dag file; for
 example, if you set an ``sla`` on your Task in the Dag file, and then your cluster policy also sets an
 ``sla``, the
@@ -55,26 +52,24 @@ cluster policy's value will take precedence.
 
 .. _administration-and-deployment:cluster-policies-define:
 
-How do define a policy function
+How to define a policy function
 -------------------------------
 
 There are two ways to configure cluster policies:
 
-1. create an ``airflow_local_settings.py`` file somewhere in the python search path (the ``config/`` folder
-   under your $AIRFLOW_HOME is a good "default" location) and then add callables to the file matching one or more
-   of the cluster policy names above (e.g. ``dag_policy``).
-
-See :ref:`Configuring local settings ` for details on how to
-configure local settings.
+1. Create an ``airflow_local_settings.py`` file somewhere on the Python path (the ``config/`` folder
+   under ``$AIRFLOW_HOME`` is the recommended location), then add callables matching one or more of the
+   cluster policy names above (e.g. ``dag_policy``).
+   See :ref:`Configuring local settings ` for details.
 
-2. By using a
+2. Use a
    `setuptools entrypoint <https://setuptools.pypa.io/en/latest/userguide/entry_point.html>`_ in a custom
    module using the `Pluggy <https://pluggy.readthedocs.io/en/stable/>`_ interface.
 
    .. versionadded:: 2.6
 
-   This method is more advanced and for people who are already comfortable with python packaging.
+   This method is more advanced and assumes familiarity with Python packaging.
 
    First create your policy function in a module:
 
@@ -106,13 +101,16 @@ configure local settings.
       [project.entry-points.'airflow.policy']
      _ = 'my_airflow_plugin.policies'
 
-   The entrypoint group must be ``airflow.policy``, and the name must be unique per entry, otherwise duplicate entries will be ignored by pluggy. The value should be your module (or class) decorated with the ``@hookimpl`` marker.
+   The entrypoint group must be ``airflow.policy``. Each entry name must be unique — pluggy silently
+   ignores duplicates. The value must be the module (or class) decorated with the ``@hookimpl`` marker.
 
-   Once you have done that, and you have installed your distribution into your Airflow env, the policy functions will get called by the various Airflow components. (The exact call order is undefined, so don't rely on any particular calling order if you have multiple plugins).
+   After installing the distribution into your Airflow environment, the policy functions will be called by
+   the relevant Airflow components. If you register multiple plugins, the call order between them is
+   undefined, so do not rely on a specific ordering.
 
-One important thing to note (for either means of defining policy functions) is that the argument names must
-exactly match as documented below.
+Regardless of which method you use, argument names in your policy functions must exactly match the
+signatures documented below.
 Available Policy Functions
 --------------------------
@@ -154,14 +152,16 @@ Here's an example of enforcing a maximum timeout policy on every task:
    :start-after: [START example_task_cluster_policy]
    :end-before: [END example_task_cluster_policy]
 
-You could also implement to protect against common errors, rather than as technical security controls. For example, don't run tasks without Airflow owners:
+You can also use a policy to guard against common authoring errors rather than enforce security controls. For example, requiring all tasks to have an Airflow owner:
 
 .. literalinclude:: /../tests/unit/cluster_policies/__init__.py
    :language: python
    :start-after: [START example_cluster_policy_rule]
    :end-before: [END example_cluster_policy_rule]
 
-If you have multiple checks to apply, it is best practice to curate these rules in a separate python module and have a single policy / task mutation hook that performs multiple of these custom checks and aggregates the various error messages so that a single ``AirflowClusterPolicyViolation`` can be reported in the UI (and import errors table in the database).
+If you have multiple checks to apply, define each rule in a separate function and call them all from a
+single policy function. This lets you aggregate error messages and raise a single
+``AirflowClusterPolicyViolation`` — which is what gets shown in the UI and recorded in the import errors table.
 
 For example, your ``airflow_local_settings.py`` might follow this pattern:
 
@@ -184,7 +184,7 @@ Here's an example of re-routing tasks that are on their second (or greater) retr
    :start-after: [START example_task_mutation_hook]
    :end-before: [END example_task_mutation_hook]
 
-Note that since priority weight is determined dynamically using weight rules, you cannot alter the ``priority_weight`` of a task instance within the mutation hook.
+Note that ``priority_weight`` cannot be altered in the mutation hook — it is determined dynamically by weight rules.
 
 Metadata Engine Hooks