diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index a09851e5e24ff..6270b0b97fd35 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -1518,6 +1518,13 @@ class GroupCommand(NamedTuple): func=lazy_load_command("airflow.cli.commands.team_command.team_list"), args=(ARG_OUTPUT, ARG_VERBOSE), ), + ActionCommand( + name="sync", + help="Sync teams", + description=("Sync missing teams from the dag bundle config into the database.\n"), + func=lazy_load_command("airflow.cli.commands.team_command.team_sync"), + args=(ARG_VERBOSE,), + ), ) DB_COMMANDS = ( ActionCommand( diff --git a/airflow-core/src/airflow/cli/commands/team_command.py b/airflow-core/src/airflow/cli/commands/team_command.py index 13702d516c45d..244396f3d89b5 100644 --- a/airflow-core/src/airflow/cli/commands/team_command.py +++ b/airflow-core/src/airflow/cli/commands/team_command.py @@ -25,6 +25,7 @@ from sqlalchemy.exc import IntegrityError from airflow.cli.simple_table import AirflowConsole +from airflow.dag_processing.bundles.manager import DagBundlesManager from airflow.models.connection import Connection from airflow.models.pool import Pool from airflow.models.team import Team, dag_bundle_team_association_table @@ -155,3 +156,33 @@ def team_list(args, session=NEW_SESSION): print(NO_TEAMS_LIST_MSG) else: _show_teams(teams=teams, output=args.output) + + +@cli_utils.action_cli +@providers_configuration_loaded +@provide_session +def team_sync(args, session=NEW_SESSION): + """Sync missing teams from the dag bundle config.""" + existing_teams = {t.name for t in session.scalars(select(Team).order_by(Team.name)).all()} + + dag_bundle_teams = { + bundle.team_name + for bundle in DagBundlesManager()._bundle_config.values() + if bundle.team_name is not None + } + + teams_added = 0 + + try: + for team_name in dag_bundle_teams: + if team_name not in existing_teams: + team = Team(name=team_name) + session.add(team) + teams_added += 1 + session.commit() + except Exception as e: + session.rollback() + raise SystemExit(f"Failed to sync teams: {e}") + + if teams_added > 0: + print(f"{teams_added} teams added.") diff --git a/airflow-core/tests/unit/cli/commands/test_team_command.py b/airflow-core/tests/unit/cli/commands/test_team_command.py index 55892489f8b77..a54e333eea279 100644 --- a/airflow-core/tests/unit/cli/commands/test_team_command.py +++ b/airflow-core/tests/unit/cli/commands/test_team_command.py @@ -19,6 +19,7 @@ from unittest.mock import patch +import json import pytest from sqlalchemy import select @@ -30,6 +31,7 @@ from airflow.models.team import Team, dag_bundle_team_association_table from airflow.settings import Session +from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import ( clear_db_connections, clear_db_dag_bundles, @@ -362,3 +364,37 @@ def test_team_operations_integration(self): assert "integration-1" in team_names assert "integration-2" not in team_names assert "integration-3" in team_names + + def test_team_sync(self): + bundle_config = [ + { + "name": "bundleone", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": "/dev/null", "refresh_interval": 0}, + "team_name": "team1", + }, + { + "name": "bundletwo", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": "/dev/null", "refresh_interval": 300}, + "team_name": "team2", + }, + { + "name": "bundlethree", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"path": "/dev/null", "refresh_interval": 300}, + }, + ] + + teams = self.session.scalars(select(Team)).all() + assert len(teams) == 0 + + with conf_vars({("dag_processing", "dag_bundle_config_list"): json.dumps(bundle_config)}): + team_command.team_sync(self.parser.parse_args(["teams", "sync"])) + + teams = self.session.scalars(select(Team)).all() + assert len(teams) == 2 + + team_names = [team.name for team in teams] + assert "team1" in team_names + assert "team2" in team_names