Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions elt-common/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ dependencies = [
"dlt[parquet,s3]==1.26.0", # Soon to remove, don't make any more updates: https://github.com/ISISNeutronMuon/analytics-data-platform/issues/321
"pyiceberg[pyiceberg-core]>=0.11.1",
"s3fs<2026.2.0", # See https://github.com/ISISNeutronMuon/analytics-data-platform/issues/237
"pydantic_settings>=2.14.1",
"pydantic>=2.12.5",
]


Expand All @@ -21,7 +23,6 @@ iceberg-maintenance = [
"click>=8.4.1",
"sqlalchemy>=2.0.49",
"trino>=0.336.0",
"pydantic_settings>=2.14.1",
]
m365 = ["authlib>=1.7.2", "httpx>=0.28.1", "tenacity>=9.1.2"]

Expand All @@ -36,7 +37,6 @@ iceberg-maintenance = "elt_common.iceberg.maintenance:cli"
dev = [
"minio>=7.2.20",
"prek>=0.4.1",
"pydantic-settings>=2.14.1",
"pytest>=9.0.3",
"pytest-httpx>=0.36.2",
"pytest-mock>=3.15.1",
Expand Down
42 changes: 42 additions & 0 deletions elt-common/src/elt_common/extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import dataclasses as dc
from typing import TYPE_CHECKING, Any, Callable, Iterator, Literal, Optional, get_args

# Write modes accepted by ResourceWriteProperties.write_mode; the allowed
# values are read back with get_args() when that dataclass validates itself.
WriteMode = Literal["append", "merge", "replace"]

if TYPE_CHECKING:
import pyarrow as pa


@dc.dataclass(frozen=True)
class Watermark:
    """Immutable pairing of a column name with a value for that column.

    Extractors receive an optional ``Watermark`` (see the ``extractor`` field
    of :class:`ResourceProperties`).
    """

    # Name of the column the watermark refers to.
    column: str
    # Value associated with ``column``; its type is not constrained here.
    value: Any


@dc.dataclass(frozen=True, kw_only=True)
class ResourceWriteProperties:
    """Properties that control how a resource is written to its destination table.

    These settings are purely about writing to the table; they say nothing
    about how the data is extracted from the source.

    :ivar merge_on: column names to merge on. Must be non-empty when
        ``write_mode`` is ``"merge"``.
    :ivar partition: partitioning specification for the destination table
        (mapping semantics are defined by the writer -- TODO confirm).
    :ivar sort_order: sort-order specification for the destination table
        (mapping semantics are defined by the writer -- TODO confirm).
    :ivar write_mode: one of the values allowed by :data:`WriteMode`.
        Defaults to ``"append"``.
    :raises ValueError: if ``write_mode`` is not an allowed value, or if
        ``write_mode`` is ``"merge"`` and ``merge_on`` is empty.
    """

    # Destination table
    merge_on: list[str] = dc.field(default_factory=list)
    partition: dict[str, str] = dc.field(default_factory=dict)
    sort_order: dict[str, str] = dc.field(default_factory=dict)
    write_mode: WriteMode = "append"

    def __post_init__(self):
        """Validate the combination of ``write_mode`` and ``merge_on``."""
        # Literal annotations are not enforced at runtime, so check explicitly.
        allowed_modes = get_args(WriteMode)
        if self.write_mode not in allowed_modes:
            raise ValueError(
                f"Invalid write mode '{self.write_mode}'. Allowed values: {allowed_modes}"
            )
        if self.write_mode == "merge" and not self.merge_on:
            # Name the real field (write_mode) so the message matches the API.
            raise ValueError("'merge_on' must be provided when write_mode='merge'")


@dc.dataclass(frozen=True, kw_only=True)
class ResourceProperties:
    """Configuration for a single resource to be extracted.

    :ivar extractor: callable that is given an optional :class:`Watermark`
        and returns an iterator of ``pyarrow.Table`` objects.
    :ivar write_properties: how the extracted data is written to the
        destination table.
    :ivar watermark_column: name of the column used for watermarking, or
        ``None`` (the default) when the resource has no watermark column.
    """

    # Required properties
    extractor: Callable[[Optional[Watermark]], "Iterator[pa.Table]"]
    write_properties: ResourceWriteProperties

    # Ingestion properties
    # Optional, so it defaults to None rather than forcing every caller to
    # spell out ``watermark_column=None``. Safe because fields are kw_only.
    watermark_column: Optional[str] = None
Empty file.
167 changes: 167 additions & 0 deletions elt-common/src/elt_common/sources/sqldatabase/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""Support for ingesting data from an SQL database."""

import logging
from abc import ABC, abstractmethod
from typing import Generator, Iterator, NamedTuple, Optional

import pyarrow as pa
import sqlalchemy as sa
from pydantic import SecretStr
from pydantic_settings import BaseSettings

from elt_common.extract import ResourceProperties, ResourceWriteProperties, Watermark

LOGGER = logging.getLogger(__name__)


class SqlDatabaseSourceConfig(BaseSettings):
    """Settings describing how to connect to, and read from, a database."""

    # connection
    drivername: str
    database: str
    database_schema: Optional[str] = None
    port: Optional[int] = None
    host: Optional[str] = None
    username: Optional[str] = None
    password: Optional[SecretStr] = None

    # loading behaviour
    chunk_size: int = 5000

    @property
    def connection_url(self):
        """Build the SQLAlchemy URL from the connection settings.

        The password is only unwrapped from its ``SecretStr`` here, at the
        point where the URL is created; a missing or falsy password is
        passed on as ``None``.
        """
        secret = self.password
        plain_password = secret.get_secret_value() if secret else None
        url_parts = {
            "drivername": self.drivername,
            "username": self.username,
            "password": plain_password,
            "host": self.host,
            "port": self.port,
            "database": self.database,
        }
        return sa.URL.create(**url_parts)


class TableInfo(NamedTuple):
    """Extra information for controlling how a table is ingested.

    Each table in a DB can have non-default write properties, a watermark
    column, both, or neither.

    :ivar write_properties: properties to control how the table is written to
        the destination. If omitted, will default to appending with no
        partitions or sorting.
    :ivar watermark_column: the column to use for watermarking. If omitted,
        the entire table will be queried on every run.
    """

    # Both fields default to None, so ``TableInfo()`` means "all defaults".
    write_properties: Optional[ResourceWriteProperties] = None
    watermark_column: Optional[str] = None


class SqlDatabaseExtract(ABC):
    """Base class for defining SQL ingest Extract classes.

    Subclasses implement :func:`table_info` to declare which tables to read.

    Example usage, for an ingest script that reads from 3 tables::

        class Extract(SqlDatabaseExtract):
            def table_info(self):
                return {
                    "a_table": None,
                    "a_table_that_watermarks_ingest_progress": TableInfo(
                        watermark_column="id"
                    ),
                    "a_table_to_replace_entirely_every_time": TableInfo(
                        write_properties=ResourceWriteProperties(
                            write_mode="replace"
                        )
                    )
                }
    """

    source_config_cls = SqlDatabaseSourceConfig

    def __init__(self, source_config: SqlDatabaseSourceConfig):
        """Create the SQLAlchemy engine and metadata for ``source_config``.

        :param source_config: connection settings and chunk size to use.
        """
        self._source_config = source_config

        # Lazy %-style arguments: the message is only built if DEBUG is enabled.
        LOGGER.debug(
            "Creating engine for %s database at %s:%s/%s",
            source_config.drivername,
            source_config.host,
            source_config.port,
            source_config.database,
        )
        self._engine = sa.create_engine(source_config.connection_url)
        self._metadata = sa.MetaData(schema=source_config.database_schema)

    @property
    def _chunk_size(self) -> int:
        """Number of rows fetched per chunk, taken from the source config."""
        return self._source_config.chunk_size

    @abstractmethod
    def table_info(self) -> dict[str, Optional[TableInfo]]:
        """Define the tables to be extracted from the DB.

        Each key in the returned dict is a table name. Their values can include
        extra properties for controlling ingestion, see :class:`TableInfo`.
        """

    def resource_properties(self) -> Iterator[tuple[str, ResourceProperties]]:
        """Open a connection to the DB and return ingest properties for tables
        defined by :func:`table_info`.

        The extractor functions yielded as part of this function use the DB
        connection which is only active whilst this function is executing.
        This means the extractors must be called, and their results consumed,
        whilst iterating over the results of this function.

        :return: an iterator of ``(table name, ResourceProperties)`` pairs.
        """
        # TODO(review): rename this method. It no longer returns static data
        # but "does" something (opens a connection). Agreed in review to
        # rename when the runner is implemented.
        with self._engine.connect() as conn:
            yield from self._make_table_properties(conn)

    def _make_table_properties(
        self, conn: sa.Connection
    ) -> Iterator[tuple[str, ResourceProperties]]:
        """For each table defined in :func:`table_info`, build a
        :class:`ResourceProperties` which can be used to ingest it.

        :param conn: open connection that the yielded extractors will use.
        :return: an iterator of ``(table name, ResourceProperties)`` pairs.
        """
        # ``Iterator`` is used for the return annotation because a
        # single-argument ``typing.Generator[...]`` is only accepted on
        # Python 3.13+ and raises TypeError at import time before that.
        for name, table_props in self.table_info().items():
            # A ``None`` entry means "all defaults", same as an empty TableInfo.
            info = table_props or TableInfo()
            properties = ResourceProperties(
                extractor=self._make_extractor(name, conn),
                write_properties=info.write_properties or ResourceWriteProperties(),
                watermark_column=info.watermark_column or None,
            )

            yield name, properties

    def _make_extractor(self, name: str, conn: sa.Connection):
        """Return an extractor callable bound to one specific table.

        Building the closure in its own scope pins ``name`` to this table.
        A closure defined directly in the loop body would look ``name`` up
        when called and could read a later table if the caller had already
        advanced the iteration.

        :param name: table the extractor reads from.
        :param conn: open connection the extractor executes on.
        :return: a callable taking an optional :class:`Watermark` and
            returning an iterator of ``pyarrow.Table`` chunks.
        """

        def extractor(watermark: Optional[Watermark]) -> Iterator[pa.Table]:
            return self._extract_table(name, watermark=watermark, conn=conn)

        return extractor

    def _extract_table(
        self,
        name: str,
        *,
        conn: sa.Connection,
        watermark: Optional[Watermark] = None,
    ) -> Iterator[pa.Table]:
        """Yield the rows of table ``name`` as a sequence of Arrow tables.

        This is a generator: reflection and the query only run once the
        returned iterator is consumed, so ``conn`` must still be open then.

        :param name: name of the table to read.
        :param conn: open connection on which the query is executed.
        :param watermark: if given, only rows whose ``watermark.column`` is
            strictly greater than ``watermark.value`` are selected.
        :return: an iterator with one ``pyarrow.Table`` per fetched chunk.
        """
        LOGGER.debug(
            "Extracting table %s in chunks of %s rows.", name, self._chunk_size
        )
        # Table structure is reflected through the engine, not ``conn``.
        table = sa.Table(
            name,
            self._metadata,
            autoload_with=self._engine,
        )
        query = sa.select(table)
        if watermark is not None:
            column, max_value = watermark.column, watermark.value
            LOGGER.debug(
                "Cursor value detected. Limiting query to %s > %s", column, max_value
            )
            query = query.where(sa.column(column) > max_value)

        # yield_per streams the result so partitions() hands back batches of
        # at most ``_chunk_size`` rows instead of loading the whole table.
        result = conn.execution_options(yield_per=self._chunk_size).execute(query)
        for partition in result.mappings().partitions():
            yield pa.Table.from_pylist(partition)
Empty file.
Loading
Loading