Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from airflow._shared.timezones import timezone
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.api_fastapi.core_api.datamodels.dag_tags import DagTagResponse
from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
from airflow.timetables.base import DataInterval
from airflow.utils.state import DagRunState
Expand Down Expand Up @@ -111,6 +112,7 @@ class DAGRunResponse(BaseModel):
last_scheduling_decision: datetime | None
run_type: DagRunType
state: DagRunState
tags: list[DagTagResponse] | None = Field(validation_alias=AliasPath("dag_model", "tags"))
triggered_by: DagRunTriggeredByType | None
triggering_user_name: str | None
conf: dict | None
Expand Down Expand Up @@ -159,7 +161,6 @@ class TriggerDAGRunPostBody(StrictBaseModel):
data_interval_end: AwareDatetime | None = None
logical_date: AwareDatetime | None
run_after: datetime | None = Field(default_factory=timezone.utcnow)

conf: dict | None = Field(default_factory=dict)
note: str | None = None
partition_key: str | None = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2498,6 +2498,14 @@ paths:
items:
type: string
title: State
- name: tags
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Tags
- name: dag_version
in: query
required: false
Expand Down Expand Up @@ -13636,6 +13644,13 @@ components:
$ref: '#/components/schemas/DagRunType'
state:
$ref: '#/components/schemas/DagRunState'
tags:
anyOf:
- items:
$ref: '#/components/schemas/DagTagResponse'
type: array
- type: 'null'
title: Tags
triggered_by:
anyOf:
- $ref: '#/components/schemas/DagRunTriggeredByType'
Expand Down Expand Up @@ -13689,6 +13704,7 @@ components:
- last_scheduling_decision
- run_type
- state
- tags
- triggered_by
- triggering_user_name
- conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
from airflow.exceptions import ParamValidationError
from airflow.models import DagModel, DagRun
from airflow.models.asset import AssetEvent
from airflow.models.dag import DagTag
from airflow.models.dag_version import DagVersion
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
Expand Down Expand Up @@ -436,6 +437,10 @@ def get_dag_runs(
],
run_type: QueryDagRunRunTypesFilter,
state: QueryDagRunStateFilter,
tags: Annotated[
FilterParam[str | None],
Depends(filter_param_factory(DagTag.name, str | None, FilterOptionEnum.EQUAL, "tags")),
],
dag_version: QueryDagRunVersionFilter,
bundle_version: Annotated[
FilterParam[str | None], Depends(filter_param_factory(DagRun.bundle_version, str | None))
Expand Down Expand Up @@ -516,6 +521,9 @@ def get_dag_runs(
get_latest_version_of_dag(dag_bag, dag_id, session) # Check if the Dag exists.
query = query.filter(DagRun.dag_id == dag_id).options()

if tags != "":
query = query.join(DagTag, DagRun.dag_id == DagTag.dag_id)

# Add join with DagVersion if dag_version filter is active
if dag_version.value:
query = query.join(DagVersion, DagRun.created_dag_version_id == DagVersion.id)
Expand All @@ -529,6 +537,7 @@ def get_dag_runs(
duration_range,
conf_contains,
state,
tags,
run_type,
dag_version,
bundle_version,
Expand Down
5 changes: 3 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ export const UseDagRunServiceGetDagRunKeyFn = ({ dagId, dagRunId }: {
export type DagRunServiceGetDagRunsDefaultResponse = Awaited<ReturnType<typeof DagRunService.getDagRuns>>;
export type DagRunServiceGetDagRunsQueryResult<TData = DagRunServiceGetDagRunsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagRunServiceGetDagRunsKey = "DagRunServiceGetDagRuns";
export const UseDagRunServiceGetDagRunsKeyFn = ({ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
export const UseDagRunServiceGetDagRunsKeyFn = ({ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, tags }: {
bundleVersion?: string;
confContains?: string;
consumingAssetPattern?: string;
Expand Down Expand Up @@ -191,7 +191,8 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({ bundleVersion, confContains, c
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
}, queryKey?: Array<unknown>) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])];
tags?: string;
}, queryKey?: Array<unknown>) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, tags }])];
export type DagRunServiceGetUpstreamAssetEventsDefaultResponse = Awaited<ReturnType<typeof DagRunService.getUpstreamAssetEvents>>;
export type DagRunServiceGetUpstreamAssetEventsQueryResult<TData = DagRunServiceGetUpstreamAssetEventsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagRunServiceGetUpstreamAssetEventsKey = "DagRunServiceGetUpstreamAssetEvents";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ export const ensureUseDagRunServiceGetDagRunData = (queryClient: QueryClient, {
* @returns DAGRunCollectionResponse Successful Response
* @throws ApiError
*/
export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, tags }: {
bundleVersion?: string;
confContains?: string;
consumingAssetPattern?: string;
Expand Down Expand Up @@ -387,7 +387,8 @@ export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, {
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) });
tags?: string;
}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, tags }), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, tags }) });
/**
* Get Upstream Asset Events
* If dag run is asset-triggered, return the asset events that triggered it.
Expand Down
5 changes: 3 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ export const prefetchUseDagRunServiceGetDagRun = (queryClient: QueryClient, { da
* @returns DAGRunCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, tags }: {
bundleVersion?: string;
confContains?: string;
consumingAssetPattern?: string;
Expand Down Expand Up @@ -387,7 +387,8 @@ export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { b
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) });
tags?: string;
}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, tags }), queryFn: () => DagRunService.getDagRuns({ bundleVersion, confContains, consumingAssetPattern, cursor, dagId, dagIdPattern, dagIdPrefixPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern, partitionKeyPrefixPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runIdPrefixPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, triggeringUserNamePrefixPattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, tags }) });
/**
* Get Upstream Asset Events
* If dag run is asset-triggered, return the asset events that triggered it.
Expand Down
Loading