From e23a60dee17478240d839ba679a42b3aa33fc5cf Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Sat, 20 Jun 2026 16:33:02 +0530 Subject: [PATCH] Show clear error for invalid sql_alchemy_conn at startup Users can hit a cryptic unpacking ValueError when sql_alchemy_conn is malformed, especially while diagnosing slow CLI startup in constrained environments. A clearer validation path makes misconfiguration immediately actionable and reduces support/debugging churn. --- airflow-core/src/airflow/settings.py | 22 +++++++- airflow-core/tests/unit/core/test_settings.py | 55 +++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/settings.py b/airflow-core/src/airflow/settings.py index 56fc3d07cedd9..9727cec04c2df 100644 --- a/airflow-core/src/airflow/settings.py +++ b/airflow-core/src/airflow/settings.py @@ -241,8 +241,28 @@ def load_policy_plugins(pm: pluggy.PluginManager): def _get_async_conn_uri_from_sync(sync_uri): + """ + Convert a sync SQLAlchemy URI to its async equivalent. + + Args: + sync_uri: The synchronous SQLAlchemy connection URI + + Returns: + The async URI with appropriate async driver (e.g., aiosqlite, asyncpg, aiomysql), + or the original URI if no async driver mapping exists + + Raises: + ValueError: If the URI is malformed (missing ':' scheme separator) + """ AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"} - """Mapping of sync scheme to async scheme.""" + + if not sync_uri or ":" not in sync_uri: + raise ValueError( + f"Invalid SQLAlchemy connection URI: {sync_uri!r}. " + "The URI must be in the format 'scheme://host/path' or similar with a ':' separator. " + "Check that AIRFLOW__DATABASE__SQL_ALCHEMY_CONN environment variable or " + "sql_alchemy_conn in airflow.cfg contains a valid database URI." + ) scheme, rest = sync_uri.split(":", maxsplit=1) scheme = scheme.split("+", maxsplit=1)[0] diff --git a/airflow-core/tests/unit/core/test_settings.py b/airflow-core/tests/unit/core/test_settings.py index 588567181c52e..1a312290028a1 100644 --- a/airflow-core/tests/unit/core/test_settings.py +++ b/airflow-core/tests/unit/core/test_settings.py @@ -500,3 +500,58 @@ def test_early_return_when_all_none(self): settings.dispose_orm(do_log=False) mock_close.assert_not_called() + + +class TestGetAsyncConnUriFromSync: + """Tests for _get_async_conn_uri_from_sync function.""" + + def test_sqlite_uri_conversion(self): + """Test conversion of SQLite sync URI to async with aiosqlite.""" + result = settings._get_async_conn_uri_from_sync("sqlite:///path/to/db.sqlite") + assert result == "sqlite+aiosqlite:///path/to/db.sqlite" + + def test_postgresql_uri_conversion(self): + """Test conversion of PostgreSQL sync URI to async with asyncpg.""" + result = settings._get_async_conn_uri_from_sync("postgresql://user:pass@localhost/dbname") + assert result == "postgresql+asyncpg://user:pass@localhost/dbname" + + def test_mysql_uri_conversion(self): + """Test conversion of MySQL sync URI to async with aiomysql.""" + result = settings._get_async_conn_uri_from_sync("mysql://user:pass@localhost/dbname") + assert result == "mysql+aiomysql://user:pass@localhost/dbname" + + def test_postgresql_psycopg2_uri_conversion(self): + """Test conversion of PostgreSQL with psycopg2 driver to asyncpg.""" + result = settings._get_async_conn_uri_from_sync("postgresql+psycopg2://user@localhost/db") + assert result == "postgresql+asyncpg://user@localhost/db" + + def test_unsupported_scheme_returns_original_uri(self): + """Test that unsupported schemes return the original URI unchanged.""" + uri = "oracle://user:pass@localhost:1521/dbname" + result = settings._get_async_conn_uri_from_sync(uri) + assert result == uri + + def test_empty_string_raises_value_error(self): + """Test that empty string raises ValueError with helpful message.""" + with pytest.raises(ValueError, match="Invalid SQLAlchemy connection URI"): + settings._get_async_conn_uri_from_sync("") + + def test_none_raises_value_error(self): + """Test that None raises ValueError with helpful message.""" + with pytest.raises(ValueError, match="Invalid SQLAlchemy connection URI"): + settings._get_async_conn_uri_from_sync(None) + + def test_malformed_uri_without_colon_raises_value_error(self): + """Test that URI without ':' separator raises ValueError.""" + with pytest.raises(ValueError, match="Invalid SQLAlchemy connection URI.*':' separator"): + settings._get_async_conn_uri_from_sync("notavaliduri") + + def test_error_message_is_helpful(self): + """Test that error message contains helpful guidance.""" + with pytest.raises(ValueError, match="Invalid SQLAlchemy connection URI") as exc_info: + settings._get_async_conn_uri_from_sync("invalid_value") + + error_msg = str(exc_info.value) + assert "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN" in error_msg + assert "sql_alchemy_conn" in error_msg + assert "airflow.cfg" in error_msg