Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion airflow-core/src/airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,28 @@ def load_policy_plugins(pm: pluggy.PluginManager):


def _get_async_conn_uri_from_sync(sync_uri):
"""
Convert a sync SQLAlchemy URI to its async equivalent.

Args:
sync_uri: The synchronous SQLAlchemy connection URI

Returns:
The async URI with appropriate async driver (e.g., aiosqlite, asyncpg, aiomysql),
or the original URI if no async driver mapping exists

Raises:
ValueError: If the URI is malformed (missing ':' scheme separator)
"""
AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
"""Mapping of sync scheme to async scheme."""

if not sync_uri or ":" not in sync_uri:
raise ValueError(
f"Invalid SQLAlchemy connection URI: {sync_uri!r}. "
"The URI must be in the format 'scheme://host/path' or similar with a ':' separator. "
"Check that AIRFLOW__DATABASE__SQL_ALCHEMY_CONN environment variable or "
"sql_alchemy_conn in airflow.cfg contains a valid database URI."
)

scheme, rest = sync_uri.split(":", maxsplit=1)
scheme = scheme.split("+", maxsplit=1)[0]
Expand Down
55 changes: 55 additions & 0 deletions airflow-core/tests/unit/core/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,3 +500,58 @@ def test_early_return_when_all_none(self):
settings.dispose_orm(do_log=False)

mock_close.assert_not_called()


class TestGetAsyncConnUriFromSync:
"""Tests for _get_async_conn_uri_from_sync function."""

def test_sqlite_uri_conversion(self):
"""Test conversion of SQLite sync URI to async with aiosqlite."""
result = settings._get_async_conn_uri_from_sync("sqlite:///path/to/db.sqlite")
assert result == "sqlite+aiosqlite:///path/to/db.sqlite"

def test_postgresql_uri_conversion(self):
"""Test conversion of PostgreSQL sync URI to async with asyncpg."""
result = settings._get_async_conn_uri_from_sync("postgresql://user:pass@localhost/dbname")
assert result == "postgresql+asyncpg://user:pass@localhost/dbname"

def test_mysql_uri_conversion(self):
"""Test conversion of MySQL sync URI to async with aiomysql."""
result = settings._get_async_conn_uri_from_sync("mysql://user:pass@localhost/dbname")
assert result == "mysql+aiomysql://user:pass@localhost/dbname"

def test_postgresql_psycopg2_uri_conversion(self):
"""Test conversion of PostgreSQL with psycopg2 driver to asyncpg."""
result = settings._get_async_conn_uri_from_sync("postgresql+psycopg2://user@localhost/db")
assert result == "postgresql+asyncpg://user@localhost/db"
Comment on lines +508 to +526

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it maybe make sense to parameterize these tests? It could be tightened up to something like:

@pytest.mark.parametrize(
    ("sync_uri", expected"),
    [
        ("sqlite:///path/to/db.sqlite", "sqlite+aiosqlite:///path/to/db.sqlite"),
        ("postgresql://user:pass@localhost/dbname", "postgresql+asyncpg://user:pass@localhost/dbname"),
        ("mysql://user:pass@localhost/dbname", "mysql+aiomysql://user:pass@localhost/dbname"),
        ("postgresql+psycopg2://user@localhost/db", "postgresql+asyncpg://user@localhost/db"),
    ],
)
    def test_supported_scheme_conversion(self, sync_uri, expected):
        assert settings._get_async_conn_uri_from_sync(sync_uri) == expected


def test_unsupported_scheme_returns_original_uri(self):
"""Test that unsupported schemes return the original URI unchanged."""
uri = "oracle://user:pass@localhost:1521/dbname"
result = settings._get_async_conn_uri_from_sync(uri)
assert result == uri

def test_empty_string_raises_value_error(self):
"""Test that empty string raises ValueError with helpful message."""
with pytest.raises(ValueError, match="Invalid SQLAlchemy connection URI"):
settings._get_async_conn_uri_from_sync("")

def test_none_raises_value_error(self):
"""Test that None raises ValueError with helpful message."""
with pytest.raises(ValueError, match="Invalid SQLAlchemy connection URI"):
settings._get_async_conn_uri_from_sync(None)

def test_malformed_uri_without_colon_raises_value_error(self):
"""Test that URI without ':' separator raises ValueError."""
with pytest.raises(ValueError, match="Invalid SQLAlchemy connection URI.*':' separator"):
settings._get_async_conn_uri_from_sync("notavaliduri")
Comment on lines +534 to +547

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can do the same with these tests as well.


def test_error_message_is_helpful(self):
"""Test that error message contains helpful guidance."""
with pytest.raises(ValueError, match="Invalid SQLAlchemy connection URI") as exc_info:
settings._get_async_conn_uri_from_sync("invalid_value")

error_msg = str(exc_info.value)
assert "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN" in error_msg
assert "sql_alchemy_conn" in error_msg
assert "airflow.cfg" in error_msg
Loading