Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions elt-common/src/elt_common/iceberg/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""Iceberg catalog configuration.

Reads connection properties from environment variables and provides
a ``connect_catalog()`` helper that returns a connected pyiceberg ``Catalog``.
"""

from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.typedef import Identifier
from pyiceberg.utils.config import Config as IcebergCatalogConfig


def connect_catalog() -> Catalog:
    """Return a pyiceberg ``Catalog`` built from the current configuration.

    A fresh ``IcebergCatalogConfig`` is created on every call instead of using
    the default ``load_catalog`` lookup, which only allows environment
    variables set before the first import of ``pyiceberg.catalog``.
    """
    iceberg_config = IcebergCatalogConfig()
    catalog_name = iceberg_config.get_default_catalog_name()
    catalog_properties = iceberg_config.get_catalog_config(catalog_name)
    return load_catalog(catalog_name, **catalog_properties)  # type: ignore


def table_identifier(namespace: str, table_name: str) -> Identifier:
    """Return the ``(namespace, table_name)`` tuple identifying a table."""
    identifier = (namespace, table_name)
    return identifier
143 changes: 143 additions & 0 deletions elt-common/src/elt_common/iceberg/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
"""Iceberg table io using pyiceberg.

Provides :class:`IcebergIO` which can be used to write pyarrow tables to an
iceberg catalog, and read properties from iceberg tables, whilst handling table,
namespace, and schema creation/evolution
"""

import logging

import pyarrow as pa
from pyiceberg.typedef import Identifier

from elt_common.iceberg.catalog import Catalog
from elt_common.iceberg.schema import create_schema, evolve_schema
from elt_common.iceberg.partition import create_partition_spec
from elt_common.iceberg.sortorder import create_sort_order
from elt_common.typing import (
BaseIO,
PartitionConfig,
SortOrderConfig,
WriteMode,
)
from pyiceberg.table import ALWAYS_TRUE, Table as IcebergTable


LOGGER = logging.getLogger(__name__)


class IcebergIO(BaseIO):
    """Read/write arrow tables to/from Iceberg, handling table creation and schema evolution."""

    # Write modes understood by :meth:`write_table`.
    _WRITE_MODES = ("append", "merge", "replace")

    def __init__(self, catalog: Catalog) -> None:
        """Store the catalog that all subsequent operations run against.

        :param catalog: a connected pyiceberg catalog
        """
        self.catalog = catalog

    def ensure_namespace(self, namespace: str) -> None:
        """Create the namespace if it doesn't already exist.

        :param namespace: name of the namespace to check for and create
        """
        if not self.catalog.namespace_exists(namespace):
            self.catalog.create_namespace(namespace)
            LOGGER.info(f"Created namespace '{namespace}'")

    def read_property(self, table_id: Identifier, key: str) -> str:
        """Read a table property.

        :param table_id: namespaced identifier of the table to read from
        :param key: the key to read the value of
        :raises: KeyError if property does not exist
        """
        table = self.catalog.load_table(table_id)
        return table.properties[key]

    def write_table(
        self,
        table_id: Identifier,
        data: pa.Table,
        write_mode: WriteMode,
        *,
        merge_on: list[str] | None = None,
        partition: PartitionConfig | None = None,
        sort_order: SortOrderConfig | None = None,
        properties: dict[str, str] | None = None,
    ) -> None:
        """Write an Arrow table to an Iceberg table.

        Nothing is written, and no table is created, when ``data`` has no rows.
        Arguments are validated before the catalog is touched, so an invalid
        call never leaves a freshly created table behind.

        :param table_id: namespaced identifier of the table to write to
        :param data: the new data to write to the table
        :param write_mode: 'append' adds the data to the table,
                           'merge' adds new data and modifies existing rows,
                           'replace' completely overwrites the table with the new data
        :param merge_on: field(s) to determine if rows should be merged. Required if write_mode is 'merge'
        :param partition: partition hints, only used if the table has to be created
        :param sort_order: sort order hints, only used if the table has to be created
        :param properties: additional properties to set on the table upon completion. Useful for watermarking
        :raises ValueError: if ``write_mode`` is not supported, or is 'merge' without ``merge_on``
        """
        if data.num_rows == 0:
            LOGGER.info(f"No data to write to {table_id}, skipping.")
            return

        # Fail fast: _ensure_table may create the table, so reject bad
        # arguments before any catalog side effect happens.
        if write_mode not in self._WRITE_MODES:
            raise ValueError(f"Unsupported write mode: {write_mode!r}")
        if write_mode == "merge" and not merge_on:
            # An empty list cannot identify rows any better than None can.
            raise ValueError(
                f"Table '{table_id}': write mode 'merge' requires 'merge_on' property."
            )

        iceberg_table = self._ensure_table(table_id, data.schema, partition, sort_order)

        # The data write and the property update share one transaction.
        with iceberg_table.transaction() as txn:
            if write_mode == "append":
                txn.append(data)
            elif write_mode == "merge":
                txn.upsert(
                    df=data,
                    join_cols=merge_on,
                    when_matched_update_all=True,
                    when_not_matched_insert_all=True,
                    case_sensitive=True,
                )
            else:  # "replace"; any other value was rejected above
                txn.overwrite(data, overwrite_filter=ALWAYS_TRUE, case_sensitive=True)

            if properties is not None:
                txn.set_properties(properties)

        LOGGER.debug(f"Wrote {data.num_rows} rows to {table_id} (mode={write_mode})")

    # private
    def _ensure_table(
        self,
        table_id: Identifier,
        arrow_schema: pa.Schema,
        partition: PartitionConfig | None,
        sort_order: SortOrderConfig | None,
    ) -> IcebergTable:
        """Load an existing table or create a new one.

        For existing tables ensure the schema matches the incoming data;
        ``partition`` and ``sort_order`` are only applied when the table is created.

        :param table_id: namespaced identifier of the table
        :param arrow_schema: schema of the incoming data
        :param partition: partition hints for a newly created table
        :param sort_order: sort order hints for a newly created table
        """
        if self.catalog.table_exists(table_id):
            return _ensure_table_schema(self.catalog.load_table(table_id), arrow_schema)

        iceberg_schema = create_schema(arrow_schema)
        LOGGER.debug(f"Created iceberg schema: {iceberg_schema}")
        partition_spec = create_partition_spec(partition, iceberg_schema)
        LOGGER.debug(f"Created partition spec: {partition_spec}")
        sort_order_spec = create_sort_order(sort_order, iceberg_schema)
        LOGGER.debug(f"Created sort order spec: {sort_order_spec}")

        LOGGER.info(f"Creating table {table_id}")
        return self.catalog.create_table(
            table_id,
            schema=iceberg_schema,
            partition_spec=partition_spec,
            sort_order=sort_order_spec,
        )


def _ensure_table_schema(iceberg_table: IcebergTable, new_schema: pa.Schema) -> IcebergTable:
    """Bring the existing table schema in line with the incoming arrow schema.

    ``evolve_schema`` decides whether anything has to change; when it returns
    a schema, that schema is merged into the table by column name. The same
    table object is returned in either case.
    """
    evolved = evolve_schema(iceberg_table.schema(), new_schema)
    if evolved is None:
        return iceberg_table

    LOGGER.debug(f"Evolving schema. New schema: {evolved}")
    with iceberg_table.update_schema() as schema_update:
        schema_update.union_by_name(evolved)
    return iceberg_table
33 changes: 33 additions & 0 deletions elt-common/src/elt_common/iceberg/partition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from elt_common.typing import PartitionConfig
from pyiceberg.partitioning import (
UNPARTITIONED_PARTITION_SPEC,
PartitionField,
PartitionSpec,
)
from pyiceberg.schema import Schema
import pyiceberg.transforms as transforms


def create_partition_spec(
    partition_config: PartitionConfig | None, iceberg_schema: Schema
) -> PartitionSpec:
    """Create an Iceberg partition spec from the partition hints.

    :param partition_config: mapping of column name to transform string, processed
        in iteration order; ``None`` or an empty mapping gives the unpartitioned spec
    :param iceberg_schema: schema used to resolve each column name to its field id
    """
    if not partition_config:
        return UNPARTITIONED_PARTITION_SPEC

    def field_name(column_name: str, transform: str):
        # Name the partition field "<column>_<transform>", dropping any
        # "[...]" argument suffix from the transform string.
        # NOTE(review): No idea what the norms are for this, but would it make
        # sense to just return the column name if the 'identity' transform is
        # being used?
        transform_prefix = transform.split("[", 1)[0] or transform
        return f"{column_name}_{transform_prefix}"

    # NOTE(review): do we know why field ids start at 1000? The original
    # comment only says "the documentation does this..." -- presumably the
    # Iceberg convention for partition field ids; TODO confirm.
    partition_fields = [
        PartitionField(
            source_id=iceberg_schema.find_field(column_name).field_id,
            field_id=1000 + index,
            transform=transforms.parse_transform(transform),
            name=field_name(column_name, transform),
        )
        for index, (column_name, transform) in enumerate(partition_config.items())
    ]
    return PartitionSpec(*partition_fields)
115 changes: 115 additions & 0 deletions elt-common/src/elt_common/iceberg/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import itertools
from typing import Sequence

import pyarrow as pa
from pyiceberg.schema import Schema
from pyiceberg.types import (
BinaryType,
BooleanType,
DateType,
DecimalType,
DoubleType,
IntegerType,
LongType,
NestedField,
PrimitiveType,
StringType,
TimeType,
TimestampType,
TimestamptzType,
)


def arrow_type_to_iceberg(arrow_type: pa.DataType) -> PrimitiveType:
    """Returns the Iceberg type for the given pyarrow data type.

    Only the primitive types tested below are mapped; anything else is rejected.

    :param arrow_type: the pyarrow type to convert
    :raises TypeError: If the type is unknown or is not supported
    """
    if pa.types.is_boolean(arrow_type):
        return BooleanType()
    if pa.types.is_int32(arrow_type):
        return IntegerType()
    if pa.types.is_int64(arrow_type):
        return LongType()
    if pa.types.is_float64(arrow_type):
        return DoubleType()
    if pa.types.is_decimal(arrow_type):
        return DecimalType(arrow_type.precision, arrow_type.scale)
    if pa.types.is_string(arrow_type) or pa.types.is_large_string(arrow_type):
        return StringType()
    if pa.types.is_date(arrow_type):
        return DateType()
    if pa.types.is_time(arrow_type):
        if arrow_type.unit != "us":
            raise TypeError(
                f"Iceberg time type only supports 'us' precision. Requested precision='{arrow_type.unit}'."
            )
        return TimeType()
    if pa.types.is_timestamp(arrow_type):
        if arrow_type.unit == "ns":
            raise TypeError("Iceberg v1 & v2 does not support timestamps in 'ns' precision.")
        # A timezone on the arrow type selects the tz-aware Iceberg type.
        if arrow_type.tz is not None:
            return TimestamptzType()
        return TimestampType()
    if (
        pa.types.is_binary(arrow_type)
        or pa.types.is_large_binary(arrow_type)
        or pa.types.is_fixed_size_binary(arrow_type)
    ):
        # Fixed-size binary is also stored as plain (variable-length) binary.
        return BinaryType()
    raise TypeError(f"Pyarrow type '{arrow_type}' unknown to type mapper.")


def arrow_field_to_iceberg(column_id: int, arrow_field: pa.Field) -> NestedField:
    """Build the Iceberg ``NestedField`` equivalent of a pyarrow field.

    :param column_id: field id to give the Iceberg field
    :param arrow_field: the pyarrow field supplying name, type and nullability
    :raises TypeError: if the field's type cannot be mapped to an Iceberg type
    """
    iceberg_type = arrow_type_to_iceberg(arrow_field.type)
    # Iceberg tracks "required"; arrow tracks the opposite, "nullable".
    is_required = not arrow_field.nullable
    return NestedField(
        field_id=column_id,
        name=arrow_field.name,
        field_type=iceberg_type,
        required=is_required,
    )


def create_schema(arrow_schema: pa.Schema, identifier_fields: Sequence[str] = ()) -> Schema:
    """Convert a pyarrow schema into an iceberg schema.

    Field ids are assigned from the column position, starting at 1.

    :param arrow_schema: A pyarrow schema.
    :param identifier_fields: An optional list of fields to mark as identifiers
    """
    # NOTE(review): Why is this (and also in evolve_schema) + 1? Does iceberg
    # want indexes to start at 1 rather than 0?
    numbered_fields = list(enumerate(arrow_schema, start=1))
    iceberg_fields = [
        arrow_field_to_iceberg(col_id, arrow_field) for col_id, arrow_field in numbered_fields
    ]
    identifier_field_ids = [
        col_id for col_id, arrow_field in numbered_fields if arrow_field.name in identifier_fields
    ]
    return Schema(*iceberg_fields, identifier_field_ids=identifier_field_ids)


def evolve_schema(iceberg_schema: Schema, new_arrow_schema: pa.Schema) -> Schema | None:
    """Attempt to evolve the schema to match the data.

    Columns present in ``new_arrow_schema`` but not in ``iceberg_schema`` are
    appended, in arrow-schema order, after the existing fields.

    :param iceberg_schema: the current schema of the table
    :param new_arrow_schema: the schema of the incoming data
    :raises TypeError: if a new column's type cannot be mapped to an Iceberg type

    Returns the new schema if updates were applied, else None
    """
    # NOTE(review): This only detects new fields. Do we want to try and handle
    # cases where fields are removed/changed as well, or is that beyond scope?
    existing_columns = set(iceberg_schema.column_names)
    added_names = [name for name in new_arrow_schema.names if name not in existing_columns]
    if not added_names:
        return None

    # Number the new fields after the highest id already in use rather than
    # after the field count, so ids cannot collide with existing ones when the
    # existing ids are not exactly 1..n.
    first_new_id = iceberg_schema.highest_field_id + 1
    added_fields = [
        arrow_field_to_iceberg(field_id, new_arrow_schema.field(name))
        for field_id, name in enumerate(added_names, start=first_new_id)
    ]
    return Schema(*itertools.chain(iceberg_schema.fields, added_fields))
28 changes: 28 additions & 0 deletions elt-common/src/elt_common/iceberg/sortorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from elt_common.typing import SortOrderConfig
import pyiceberg.transforms as transforms
from pyiceberg.schema import Schema
from pyiceberg.table.sorting import (
UNSORTED_SORT_ORDER,
SortOrder,
SortField,
SortDirection,
)


def create_sort_order(
    sort_order_config: SortOrderConfig | None, iceberg_schema: Schema
) -> SortOrder:
    """If a sort order hint is provided, create the appropriate SortOrder instance.

    :param sort_order_config: mapping of column name to sort direction, processed in
        iteration order; ``None`` or an empty mapping gives the unsorted order
    :param iceberg_schema: schema used to resolve each column name to its field id
    """
    if not sort_order_config:
        return UNSORTED_SORT_ORDER

    # Every column is sorted on its own value, hence the identity transform.
    sort_fields = [
        SortField(
            source_id=iceberg_schema.find_field(column_name).field_id,
            direction=SortDirection(direction),
            transform=transforms.parse_transform("identity"),
        )
        for column_name, direction in sort_order_config.items()
    ]
    return SortOrder(*sort_fields)
21 changes: 2 additions & 19 deletions elt-common/src/elt_common/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,10 @@
"""Utilities for capturing and describing information about ELT jobs in a set of elt pipelines."""

import dataclasses as dc
from pathlib import Path

INGEST = "ingest"


@dc.dataclass(frozen=True)
class ELTJobManifest:
"""Parsed representation of an ELT job"""

name: str
domain: str
ingest_job_dir: Path
from .typing import ELTJobManifest

@property
def full_name(self) -> str:
return f"{self.domain}.{self.name}"

@property
def destination_namespace(self) -> str:
"""The destination namespace for this job: ``{domain}_{name}``."""
return f"{self.domain}_{self.name}"
INGEST = "ingest"


class PipelinesProject:
Expand Down
Loading
Loading