-
Notifications
You must be signed in to change notification settings - Fork 0
Add non-dlt iceberg writer #337
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
330ef2e
c916056
4d4455e
83eb915
78ad323
d67e8d5
6e89bcb
14b5923
53fd786
717327e
1f113d3
bb47c72
d592aae
f8c2f8e
9d0f026
a1572e3
c70d116
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| """Iceberg catalog configuration. | ||
|
|
||
| Reads connection properties from environment variables and provides | ||
| a ``connect_catalog()`` helper that returns a connected pyiceberg ``Catalog``. | ||
| """ | ||
|
|
||
| from pyiceberg.catalog import Catalog, load_catalog | ||
| from pyiceberg.typedef import Identifier | ||
| from pyiceberg.utils.config import Config as IcebergCatalogConfig | ||
|
|
||
|
|
||
def connect_catalog() -> Catalog:
    """Return a pyiceberg ``Catalog`` for the default catalog name.

    A fresh pyiceberg config object is built on every call and the settings it
    holds for the default catalog are passed to ``load_catalog`` explicitly.
    The stock ``load_catalog`` only honours environment variables that were set
    before the first import of ``pyiceberg.catalog``.
    """
    catalog_config = IcebergCatalogConfig()
    catalog_name = catalog_config.get_default_catalog_name()
    catalog_properties = catalog_config.get_catalog_config(catalog_name)
    return load_catalog(catalog_name, **catalog_properties)  # type: ignore
|
|
||
|
|
||
def table_identifier(namespace: str, table_name: str) -> Identifier:
    """Build the ``(namespace, table_name)`` identifier tuple for a table."""
    identifier = (namespace, table_name)
    return identifier
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,143 @@ | ||
| """Iceberg table io using pyiceberg. | ||
|
|
||
| Provides :class:`IcebergIO` which can be used to write pyarrow tables to an | ||
| iceberg catalog, and read properties from iceberg tables, whilst handling table, | ||
| namespace, and schema creation/evolution | ||
| """ | ||
|
|
||
| import logging | ||
|
|
||
| import pyarrow as pa | ||
| from pyiceberg.typedef import Identifier | ||
|
|
||
| from elt_common.iceberg.catalog import Catalog | ||
| from elt_common.iceberg.schema import create_schema, evolve_schema | ||
| from elt_common.iceberg.partition import create_partition_spec | ||
| from elt_common.iceberg.sortorder import create_sort_order | ||
| from elt_common.typing import ( | ||
| BaseIO, | ||
| PartitionConfig, | ||
| SortOrderConfig, | ||
| WriteMode, | ||
| ) | ||
| from pyiceberg.table import ALWAYS_TRUE, Table as IcebergTable | ||
|
|
||
|
|
||
| LOGGER = logging.getLogger(__name__) | ||
|
|
||
|
|
||
class IcebergIO(BaseIO):
    """Read/write arrow tables to/from Iceberg, handling table creation and schema evolution."""

    def __init__(self, catalog: Catalog) -> None:
        """Store the catalog that every operation on this instance runs against."""
        self.catalog = catalog

    def ensure_namespace(self, namespace: str) -> None:
        """Create the namespace if it doesn't already exist."""
        if not self.catalog.namespace_exists(namespace):
            self.catalog.create_namespace(namespace)
            LOGGER.info(f"Created namespace '{namespace}'")

    def read_property(self, table_id: Identifier, key: str) -> str:
        """Read a table property.

        :param table_id: namespaced identifier of the table to read from
        :param key: the key to read the value of
        :raises: KeyError if property does not exist
        """
        table = self.catalog.load_table(table_id)
        return table.properties[key]

    def write_table(
        self,
        table_id: Identifier,
        data: pa.Table,
        write_mode: WriteMode,
        *,
        merge_on: list[str] | None = None,
        partition: PartitionConfig | None = None,
        sort_order: SortOrderConfig | None = None,
        properties: dict[str, str] | None = None,
    ) -> None:
        """Write an Arrow table to an Iceberg table.

        Nothing is written (and no table is created) when ``data`` has no rows.
        The write and the optional property update share one transaction.

        :param table_id: namespaced identifier of the table to write to
        :param data: the new data to write to the table
        :param write_mode: 'append' adds the data to the table,
                           'merge' adds new data and modifies existing rows,
                           'replace' completely overwrites the table with the new data
        :param merge_on: field(s) to determine if rows should be merged. Required if write_mode is 'merge'
        :param partition: mapping of table names to the column(s) they should be partitioned by
        :param sort_order: mapping of table names to the sort direction of their column(s)
        :param properties: additional properties to set on the table upon completion. Useful for watermarking
        :raises ValueError: if ``write_mode`` is not supported, or is 'merge' without ``merge_on``
        """
        if data.num_rows == 0:
            LOGGER.info(f"No data to write to {table_id}, skipping.")
            return

        # Validate the request before touching the catalog, so an invalid call
        # cannot leave behind a newly created table or an evolved schema.
        if write_mode not in ("append", "merge", "replace"):
            raise ValueError(f"Unsupported write mode: {write_mode!r}")
        if write_mode == "merge" and merge_on is None:
            raise ValueError(
                f"Table '{table_id}': write mode 'merge' requires 'merge_on' property."
            )

        iceberg_table = self._ensure_table(table_id, data.schema, partition, sort_order)

        with iceberg_table.transaction() as txn:
            if write_mode == "append":
                txn.append(data)
            elif write_mode == "merge":
                txn.upsert(
                    df=data,
                    join_cols=merge_on,
                    when_matched_update_all=True,
                    when_not_matched_insert_all=True,
                    case_sensitive=True,
                )
            else:
                # Only 'replace' can reach this branch after the validation above.
                txn.overwrite(data, overwrite_filter=ALWAYS_TRUE, case_sensitive=True)

            if properties is not None:
                txn.set_properties(properties)

        LOGGER.debug(f"Wrote {data.num_rows} rows to {table_id} (mode={write_mode})")

    # private
    def _ensure_table(
        self,
        table_id: Identifier,
        arrow_schema: pa.Schema,
        partition: PartitionConfig | None,
        sort_order: SortOrderConfig | None,
    ) -> IcebergTable:
        """Load an existing table or create a new one.

        For existing tables ensure the schema matches the incoming data. The
        partition and sort order settings are only used when the table is created.
        """
        if self.catalog.table_exists(table_id):
            return _ensure_table_schema(self.catalog.load_table(table_id), arrow_schema)

        iceberg_schema = create_schema(arrow_schema)
        LOGGER.debug(f"Created iceberg schema: {iceberg_schema}")
        partition_spec = create_partition_spec(partition, iceberg_schema)
        LOGGER.debug(f"Created partition spec: {partition_spec}")
        sort_order_spec = create_sort_order(sort_order, iceberg_schema)
        LOGGER.debug(f"Created sort order spec: {sort_order_spec}")

        LOGGER.info(f"Creating table {table_id}")
        return self.catalog.create_table(
            table_id,
            schema=iceberg_schema,
            partition_spec=partition_spec,
            sort_order=sort_order_spec,
        )
|
|
||
|
|
||
def _ensure_table_schema(iceberg_table: IcebergTable, new_schema: pa.Schema) -> IcebergTable:
    """Evolve the table's schema when the incoming Arrow schema requires it.

    ``evolve_schema`` returns ``None`` when there is nothing to change; in that
    case the table is returned untouched. Otherwise the evolved schema is
    merged into the table by name. The same table object is returned either way.
    """
    evolved = evolve_schema(iceberg_table.schema(), new_schema)
    if evolved is None:
        return iceberg_table

    LOGGER.debug(f"Evolving schema. New schema: {evolved}")
    with iceberg_table.update_schema() as update:
        update.union_by_name(evolved)
    return iceberg_table
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| from elt_common.typing import PartitionConfig | ||
| from pyiceberg.partitioning import ( | ||
| UNPARTITIONED_PARTITION_SPEC, | ||
| PartitionField, | ||
| PartitionSpec, | ||
| ) | ||
| from pyiceberg.schema import Schema | ||
| import pyiceberg.transforms as transforms | ||
|
|
||
|
|
||
def create_partition_spec(
    partition_config: PartitionConfig | None, iceberg_schema: Schema
) -> PartitionSpec:
    """Create an Iceberg partition spec from the partition hints.

    :param partition_config: mapping of column name to the transform string to
        partition that column by; ``None`` or empty means no partitioning
    :param iceberg_schema: schema used to resolve each column name to its field id
    :return: a spec with one partition field per configured column, in mapping
        order, or ``UNPARTITIONED_PARTITION_SPEC`` when nothing is configured
    """
    # Imported here so the block is self-contained. Partition field ids start at
    # pyiceberg's own constant (1000) rather than a hand-written magic number.
    from pyiceberg.partitioning import PARTITION_FIELD_ID_START

    def field_name(column_name: str, transform: str) -> str:
        # Drop a trailing "[...]" argument from the transform so the partition
        # field is named "<column>_<transform>".
        bracket_index = transform.find("[")
        transform_name = transform[:bracket_index] if bracket_index > 0 else transform
        return f"{column_name}_{transform_name}"

    if not partition_config:
        return UNPARTITIONED_PARTITION_SPEC

    return PartitionSpec(
        *(
            PartitionField(
                source_id=iceberg_schema.find_field(column_name).field_id,
                field_id=field_id,
                transform=transforms.parse_transform(transform),
                name=field_name(column_name, transform),
            )
            for field_id, (column_name, transform) in enumerate(
                partition_config.items(), start=PARTITION_FIELD_ID_START
            )
        )
    )
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,115 @@ | ||
| import itertools | ||
| from typing import Sequence | ||
|
|
||
| import pyarrow as pa | ||
| from pyiceberg.schema import Schema | ||
| from pyiceberg.types import ( | ||
| BinaryType, | ||
| BooleanType, | ||
| DateType, | ||
| DecimalType, | ||
| DoubleType, | ||
| IntegerType, | ||
| LongType, | ||
| NestedField, | ||
| PrimitiveType, | ||
| StringType, | ||
| TimeType, | ||
| TimestampType, | ||
| TimestamptzType, | ||
| ) | ||
|
|
||
|
|
||
def arrow_type_to_iceberg(arrow_type: pa.DataType) -> PrimitiveType:
    """Returns the Iceberg type for the given pyarrow data type.

    Only the primitive types handled below are mapped; anything else,
    including nested types, is rejected.

    :param arrow_type: the pyarrow type to convert
    :return: a new instance of the matching Iceberg primitive type
    :raises TypeError: If the type is unknown or is not supported
    """
    if pa.types.is_boolean(arrow_type):
        return BooleanType()
    if pa.types.is_int32(arrow_type):
        return IntegerType()
    if pa.types.is_int64(arrow_type):
        return LongType()
    if pa.types.is_float64(arrow_type):
        return DoubleType()
    if pa.types.is_decimal(arrow_type):
        return DecimalType(arrow_type.precision, arrow_type.scale)
    if pa.types.is_string(arrow_type) or pa.types.is_large_string(arrow_type):
        return StringType()
    if pa.types.is_date(arrow_type):
        return DateType()
    if pa.types.is_time(arrow_type):
        if arrow_type.unit != "us":
            raise TypeError(
                f"Iceberg time type only supports 'us' precision. Requested precision='{arrow_type.unit}'."
            )
        return TimeType()
    if pa.types.is_timestamp(arrow_type):
        if arrow_type.unit == "ns":
            raise TypeError("Iceberg v1 & v2 does not support timestamps in 'ns' precision.")
        # A timezone on the Arrow type selects the tz-aware Iceberg timestamp.
        return TimestamptzType() if arrow_type.tz is not None else TimestampType()
    if (
        pa.types.is_binary(arrow_type)
        or pa.types.is_large_binary(arrow_type)
        or pa.types.is_fixed_size_binary(arrow_type)
    ):
        return BinaryType()

    raise TypeError(f"Pyarrow type '{arrow_type}' unknown to type mapper.")
|
|
||
|
|
||
def arrow_field_to_iceberg(column_id: int, arrow_field: pa.Field) -> NestedField:
    """Convert one pyarrow field into an Iceberg ``NestedField``.

    :param column_id: the Iceberg field id to assign to the new field
    :param arrow_field: source field; its name is kept, its type is mapped
        through ``arrow_type_to_iceberg`` and it is marked required when the
        Arrow field is not nullable
    """
    iceberg_type = arrow_type_to_iceberg(arrow_field.type)
    is_required = not arrow_field.nullable
    return NestedField(
        field_id=column_id,
        name=arrow_field.name,
        field_type=iceberg_type,
        required=is_required,
    )
|
|
||
|
|
||
def create_schema(arrow_schema: pa.Schema, identifier_fields: Sequence[str] = ()) -> Schema:
    """Build an Iceberg schema from a pyarrow schema.

    Field ids are assigned sequentially, starting at 1, in the order the
    fields appear in ``arrow_schema``.

    :param arrow_schema: A pyarrow schema.
    :param identifier_fields: Optional names of fields to mark as identifiers
    """
    iceberg_fields = [
        arrow_field_to_iceberg(field_id, arrow_field)
        for field_id, arrow_field in enumerate(arrow_schema, start=1)
    ]
    identifier_field_ids = [
        field_id
        for field_id, arrow_field in enumerate(arrow_schema, start=1)
        if arrow_field.name in identifier_fields
    ]
    return Schema(*iceberg_fields, identifier_field_ids=identifier_field_ids)
|
|
||
|
|
||
def evolve_schema(iceberg_schema: Schema, new_arrow_schema: pa.Schema) -> Schema | None:
    """Attempt to evolve the schema to match the data.

    Only columns that are present in ``new_arrow_schema`` but missing from
    ``iceberg_schema`` are detected; removed columns and changed types are not
    handled here.

    :param iceberg_schema: the table's current schema
    :param new_arrow_schema: schema of the incoming data
    :return: the existing fields followed by the newly added ones if updates
        were applied, else None
    :raises TypeError: if an added column has an unsupported pyarrow type
    """
    existing_columns = set(iceberg_schema.column_names)
    added_fields = [
        arrow_field
        for arrow_field in new_arrow_schema
        if arrow_field.name not in existing_columns
    ]
    if not added_fields:
        return None

    # Allocate ids after the highest id already in use. Counting the top-level
    # fields is not enough: existing ids need not be exactly 1..n, and a
    # count-based id could then collide with one that is already taken.
    first_new_id = iceberg_schema.highest_field_id + 1
    return Schema(
        *iceberg_schema.fields,
        *(
            arrow_field_to_iceberg(field_id, arrow_field)
            for field_id, arrow_field in enumerate(added_fields, start=first_new_id)
        ),
    )
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| from elt_common.typing import SortOrderConfig | ||
| import pyiceberg.transforms as transforms | ||
| from pyiceberg.schema import Schema | ||
| from pyiceberg.table.sorting import ( | ||
| UNSORTED_SORT_ORDER, | ||
| SortOrder, | ||
| SortField, | ||
| SortDirection, | ||
| ) | ||
|
|
||
|
|
||
def create_sort_order(
    sort_order_config: SortOrderConfig | None, iceberg_schema: Schema
) -> SortOrder:
    """If a sort order hint is provided, create the appropriate SortOrder instance.

    :param sort_order_config: mapping of column name to sort direction;
        ``None`` or empty yields ``UNSORTED_SORT_ORDER``
    :param iceberg_schema: schema used to resolve each column name to its field id
    :return: a sort order with one identity-transform sort field per configured
        column, in mapping order
    """
    if not sort_order_config:
        return UNSORTED_SORT_ORDER

    sort_fields = []
    for column_name, direction in sort_order_config.items():
        source_field = iceberg_schema.find_field(column_name)
        sort_fields.append(
            SortField(
                source_id=source_field.field_id,
                direction=SortDirection(direction),
                transform=transforms.parse_transform("identity"),
            )
        )
    return SortOrder(*sort_fields)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No idea what the norms are for this, but would it make sense to just return the column name if the 'identity' transform is being used?