Add non-dlt iceberg writer #337

Open
WHTaylor wants to merge 17 commits into main from 321-iceberg

@WHTaylor WHTaylor commented Jun 5, 2026

Contributor

ref #321

Adds functionality for writing pyarrow tables to iceberg, similar to what's in elt_common.dlt_destinations.pyiceberg without the dlt parts.

Again, the vast majority of these changes were taken directly from the elt-command-without-dlt branch, compressed into commit 35df000. The commits after that are various small tweaks/additions that are probably easiest to review individually.

There are some outstanding questions I have that I've left as comments below.

Summary by CodeRabbit

  • New Features

    • Added Apache Iceberg data warehouse integration with support for connecting to Iceberg catalogues.
    • Implemented table read/write functionality supporting append, merge, and replace write modes.
    • Added partitioning and sort order configuration capabilities.
    • Integrated automatic schema translation and evolution from PyArrow formats.
  • Tests

    • Comprehensive unit test coverage for Iceberg operations and utilities.

coderabbitai Bot commented Jun 5, 2026

Contributor

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 987d7c32-7293-4ea0-9f20-6a9a18b1767b

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

This PR adds comprehensive Iceberg support to the ELT platform. It introduces type contracts for IO operations, schema translation from PyArrow to Iceberg format, configuration helpers for partitioning and sorting, and a full IcebergIO implementation supporting append, merge, and replace write modes with schema evolution and transaction management.

Changes

Iceberg integration for ELT destinations

| Layer | File(s) | Summary |
|---|---|---|
| Type contracts and ELTJobManifest centralisation | src/elt_common/typing.py, src/elt_common/pipeline.py, tests/unit_tests/iceberg/conftest.py | BaseIO abstract interface defines ensure_namespace and write_table contracts. ELTJobManifest dataclass with full_name and destination_namespace properties is centralised in typing.py and imported by pipeline.py. Test conftest provides a sample Iceberg schema fixture with required/optional fields for downstream tests. |
| PyArrow to Iceberg schema conversion and evolution | src/elt_common/iceberg/schema.py, tests/unit_tests/iceberg/test_schema.py | arrow_type_to_iceberg maps PyArrow primitives to Iceberg types with precision validation; arrow_field_to_iceberg constructs Iceberg NestedFields preserving nullability; create_schema assigns sequential field IDs and marks identifier fields; evolve_schema detects new columns and returns an evolved schema or None. Tests cover all type mappings, precision error handling, identifier fields, and schema evolution paths. |
| Partition and sort order specification helpers | src/elt_common/iceberg/partition.py, src/elt_common/iceberg/sortorder.py, tests/unit_tests/iceberg/test_partition.py, tests/unit_tests/iceberg/test_sortorder.py | create_partition_spec converts configuration dicts to Iceberg PartitionSpecs with transform parsing and field naming; create_sort_order builds SortOrder instances from column/direction pairs. Tests validate transform types (identity, date, hour, truncate, bucket), field ID sequencing, multi-column specs, and exception paths for missing columns or invalid directions. |
| Iceberg catalog connectivity and IO implementation | src/elt_common/iceberg/catalog.py, src/elt_common/iceberg/io.py, tests/unit_tests/iceberg/test_catalog.py, tests/unit_tests/iceberg/test_io.py | connect_catalog loads Iceberg catalog from pyiceberg environment config; table_identifier forms qualified table names. IcebergIO implements BaseIO for namespace creation, property reads, and write_table supporting append (direct insert), merge (upsert with required merge_on columns), and replace (overwrite with always-true filter). Schema evolution is applied to existing tables. Optional partitioning, sort orders, and table properties are configured per write. Tests cover all write modes, schema evolution, empty data handling, property setting, and merge validation. |

Suggested labels

enhancement

Poem

🐰 From Arrow schemas to Iceberg tables so grand,
Partitions and sorts now align with a plan,
Append, merge, replace—each mode finds its way,
Through catalog's streams to the warehouse they stay!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 54.24% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'Add non-dlt iceberg writer' accurately summarizes the main change: introducing Iceberg writing functionality that operates independently of DLT, as evidenced by the new IcebergIO class, schema utilities, and related modules. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Comment @coderabbitai help to get the list of available commands and usage tips.

@WHTaylor WHTaylor left a comment

Contributor Author

Should the preexisting trino.py be moved into the maintenance subpackage? I think it's only used in there.

    return Schema(*iceberg_fields, identifier_field_ids=identifier_field_ids)


def evolve_schema(iceberg_schema: Schema, new_arrow_schema: pa.Schema) -> Schema | None:


This only detects new fields. Do we want to try and handle cases where fields are removed/changed as well, or is that beyond scope?
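
To make the question concrete, here is a minimal sketch of add-only schema evolution. It uses plain dicts of column name to type name rather than pyiceberg or PyArrow objects, and `evolve_columns` is a hypothetical helper, not the `evolve_schema` in this PR. It only illustrates that removed or retyped columns go unnoticed under an add-only approach.

```python
from typing import Dict, Optional


def evolve_columns(
    existing: Dict[str, str], incoming: Dict[str, str]
) -> Optional[Dict[str, str]]:
    """Return the existing columns plus any new ones, or None if nothing was added.

    Hypothetical stand-in for add-only evolution: columns that were removed
    from, or changed type in, `incoming` are ignored.
    """
    added = {name: dtype for name, dtype in incoming.items() if name not in existing}
    if not added:
        return None
    # Existing columns keep their original types; new columns are appended.
    return {**existing, **added}


existing = {"id": "long", "name": "string"}

# A new column ("age") is picked up.
with_new_column = evolve_columns(
    existing, {"id": "long", "name": "string", "age": "int"}
)

# A dropped column ("name") and a retyped column ("id") are both invisible.
dropped_and_retyped = evolve_columns(existing, {"id": "string"})
```

Handling removals or type changes would need an explicit comparison of the two column sets in both directions, which is the scope question raised above.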

) -> PartitionSpec:
    """Create an Iceberg partition spec from the partition hints"""

    def field_name(column_name: str, transform: str):


No idea what the norms are for this, but would it make sense to just return the column name if the 'identity' transform is being used?

"""
iceberg_fields, identifier_field_ids = [], []
for index, arrow_field in enumerate(arrow_schema):
    col_id = index + 1


Why is this (and also in evolve_schema) + 1? Does iceberg want indexes to start at 1 rather than 0?



@dc.dataclass(frozen=True)
class ELTJobManifest:


I moved this here because it's where it ends up on elt-command-without-dlt, to be shared between pipeline and extract, but it feels like a slight misnomer for it to be in typing. I might move it later when implementing the runner functionality.

*(
    PartitionField(
        source_id=iceberg_schema.find_field(column_name).field_id,
        field_id=1000 + index,  # the documentation does this...


Probably not, based on the comment, but do we know why this is + 1000?

@WHTaylor WHTaylor marked this pull request as ready for review June 5, 2026 15:19
@WHTaylor WHTaylor requested a review from a team as a code owner June 5, 2026 15:19

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 2

🧹 Nitpick comments (3)
elt-common/src/elt_common/iceberg/io.py (1)

26-27: ⚡ Quick win

Remove redundant self-assignment.

Line 27 assigns NoSuchTableError to itself, which has no effect. The comment suggests the intent is to expose this exception at the module level, but this alias doesn't accomplish anything. Either remove these lines or, if re-exporting is desired, add NoSuchTableError to an __all__ list.

♻️ Proposed fix

Remove the redundant assignment:

-# Map exception 1:1
-NoSuchTableError = NoSuchTableError
-
 LOGGER = logging.getLogger(__name__)

Or, if you intend to re-export it, add an __all__ declaration:

+__all__ = ["IcebergIO", "NoSuchTableError"]
+
 # Map exception 1:1
 NoSuchTableError = NoSuchTableError
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@elt-common/src/elt_common/iceberg/io.py` around lines 26 - 27, The line
"NoSuchTableError = NoSuchTableError" is a redundant self-assignment; remove
that assignment from the module (or if you intended to re-export the symbol, add
"NoSuchTableError" to the module's __all__ list instead). Update the module
where NoSuchTableError is declared/used (the iceberg/io.py export area) to
either delete the redundant alias or add the symbol name to __all__ so it is
properly exported.
elt-common/src/elt_common/typing.py (1)

12-33: ⚡ Quick win

Consider removing redundant NotImplementedError raises from abstract methods.

Abstract methods decorated with @abstractmethod already prevent instantiation of any subclass that doesn't override them. The raise NotImplementedError statements on lines 15-17 and 31-33 are therefore redundant for enforcing the contract; they would only execute if a subclass explicitly called the base implementation through super(). The Pythonic approach is to use pass or just a docstring.

♻️ Suggested refactor
 class BaseIO(ABC):
     @abstractmethod
     def ensure_namespace(self, namespace: str) -> None:
-        raise NotImplementedError(
-            "Subclass should implement `ensure_namespace` to ensure the namespace exists."
-        )
+        """Subclass should implement `ensure_namespace` to ensure the namespace exists."""

     @abstractmethod
     def write_table(
         self,
         table_id: Identifier,
         data: "pa.Table",
         write_mode: "WriteMode",
         *,
         merge_on: list[str] | None = None,
         partition: "PartitionConfig | None" = None,
         sort_order: "SortOrderConfig | None" = None,
         properties: dict[str, str] | None = None,
     ) -> None:
-        raise NotImplementedError(
-            "Subclass should implement `write_table` to write a table to the destination."
-        )
+        """Subclass should implement `write_table` to write a table to the destination."""
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@elt-common/src/elt_common/typing.py` around lines 12 - 33, The abstract
methods in BaseIO—ensure_namespace and write_table—contain redundant raise
NotImplementedError blocks; remove those raise statements and replace them with
either a simple pass or a docstring while leaving the `@abstractmethod` decorator,
method signatures (including return/type hints like Identifier, pa.Table,
WriteMode, PartitionConfig, SortOrderConfig), and parameters (merge_on,
partition, sort_order, properties) intact so subclassing behavior is preserved.
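
As a rough illustration of the typing.py point, the sketch below compares a docstring-only abstract method with one that raises NotImplementedError. All class names here are hypothetical and cut down to a single method; they are not the PR's `BaseIO`. In both cases instantiation is blocked by `ABC` itself, and the raising body is only reachable through an explicit `super()` call.

```python
from abc import ABC, abstractmethod
from typing import List


class DocstringOnlyIO(ABC):
    @abstractmethod
    def ensure_namespace(self, namespace: str) -> None:
        """Ensure the namespace exists in the destination."""


class RaisingIO(ABC):
    @abstractmethod
    def ensure_namespace(self, namespace: str) -> None:
        raise NotImplementedError("Subclass should implement `ensure_namespace`.")


class Incomplete(DocstringOnlyIO):
    """Forgets to implement ensure_namespace."""


class Recording(DocstringOnlyIO):
    """Implements the contract by recording the requested namespaces."""

    def __init__(self) -> None:
        self.namespaces: List[str] = []

    def ensure_namespace(self, namespace: str) -> None:
        self.namespaces.append(namespace)


class CallsSuper(RaisingIO):
    def ensure_namespace(self, namespace: str) -> None:
        # The abstract body is only reachable through an explicit super() call.
        super().ensure_namespace(namespace)


def can_instantiate(cls: type) -> bool:
    """Return True if cls() succeeds, False if ABC rejects it with TypeError."""
    try:
        cls()
    except TypeError:
        return False
    return True
```

Here `can_instantiate(Incomplete)` is False regardless of which style the base class uses, so the choice between a docstring and a raise is mostly about what happens when a subclass deliberately delegates to the base method.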
elt-common/tests/unit_tests/iceberg/test_partition.py (1)

132-137: 💤 Low value

Consider using a more specific exception type.

Line 136 catches the generic Exception type. If you know which specific exception pyiceberg raises for missing fields (e.g., ValueError or a custom pyiceberg exception), catching that specific type would make the test more precise and prevent it from accidentally passing if a different unexpected error occurs.

♻️ Example if pyiceberg raises ValueError
-    with pytest.raises(Exception):  # pyiceberg raises an error for missing fields
+    with pytest.raises(ValueError, match="nonexistent_col"):
         create_partition_spec(partition_hint, sample_schema)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@elt-common/tests/unit_tests/iceberg/test_partition.py` around lines 132 -
137, The test test_create_partition_spec_nonexistent_column_raises_error should
catch a specific exception rather than Exception; update the pytest.raises(...)
to the concrete exception pyiceberg raises for missing fields (e.g., replace
pytest.raises(Exception) with pytest.raises(ValueError) or import and use the
pyiceberg-specific exception), and add the necessary import for that exception
at the top of the test file so create_partition_spec(partition_hint,
sample_schema) is asserted to raise the precise error.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@elt-common/src/elt_common/iceberg/schema.py`:
- Around line 37-38: The code incorrectly casts arrow_type to the Iceberg
DecimalType; instead extract precision and scale from the PyArrow decimal type
and pass those to the Iceberg DecimalType constructor. Replace the cast of
arrow_type to DecimalType in the decimal handling code (the variable
decimal_type and the use of DecimalType on return) with a check/cast to the
PyArrow decimal class (e.g., pa.Decimal128Type or pa.Decimal256Type) or simply
read arrow_type.precision and arrow_type.scale, then return
DecimalType(precision, scale) using the DecimalType from pyiceberg.types.

In `@elt-common/tests/unit_tests/iceberg/test_io.py`:
- Around line 59-66: The test test_ensure_namespace_noop_when_namespace_exists
incorrectly asserts mock_dependencies.mock_catalog.assert_not_called(); change
the assertion to verify the specific method wasn't invoked: assert that
mock_dependencies.mock_catalog.create_namespace.assert_not_called() after
calling IcebergIO.ensure_namespace("test_ns") so the test confirms
create_namespace is not called when namespace_exists returns True; locate the
call in the test and replace the broad catalog assertion with the
create_namespace-specific assertion.
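
The test_io.py finding can be reproduced with the standard library alone. The sketch below is an illustration, not the PR's test: `ensure_namespace` and `buggy_ensure_namespace` are hypothetical stand-ins for `IcebergIO.ensure_namespace`, and the catalog is a bare `MagicMock`. It shows that `assert_not_called()` on the parent mock only counts calls to the mock object itself, so it passes even when a child method such as `create_namespace` was invoked.

```python
from unittest.mock import MagicMock


def ensure_namespace(catalog, namespace: str) -> None:
    """Hypothetical stand-in: create the namespace only if it is missing."""
    if not catalog.namespace_exists(namespace):
        catalog.create_namespace(namespace)


def buggy_ensure_namespace(catalog, namespace: str) -> None:
    """Deliberately wrong: always creates the namespace."""
    catalog.create_namespace(namespace)


catalog = MagicMock()
catalog.namespace_exists.return_value = True
buggy_ensure_namespace(catalog, "test_ns")

# Passes despite the bug: only calls to `catalog(...)` itself are counted,
# not calls to its child attributes.
catalog.assert_not_called()

# The method-specific assertion catches the bug.
try:
    catalog.create_namespace.assert_not_called()
    bug_detected = False
except AssertionError:
    bug_detected = True

# With the correct implementation the specific assertion passes.
good_catalog = MagicMock()
good_catalog.namespace_exists.return_value = True
ensure_namespace(good_catalog, "test_ns")
good_catalog.create_namespace.assert_not_called()
good_catalog.namespace_exists.assert_called_once_with("test_ns")
```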


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: cbea9581-c44d-49db-92c6-a53578607f39

📥 Commits

Reviewing files that changed from the base of the PR and between fd4bb45 and 9d0f026.

📒 Files selected for processing (13)
  • elt-common/src/elt_common/iceberg/catalog.py
  • elt-common/src/elt_common/iceberg/io.py
  • elt-common/src/elt_common/iceberg/partition.py
  • elt-common/src/elt_common/iceberg/schema.py
  • elt-common/src/elt_common/iceberg/sortorder.py
  • elt-common/src/elt_common/pipeline.py
  • elt-common/src/elt_common/typing.py
  • elt-common/tests/unit_tests/iceberg/conftest.py
  • elt-common/tests/unit_tests/iceberg/test_catalog.py
  • elt-common/tests/unit_tests/iceberg/test_io.py
  • elt-common/tests/unit_tests/iceberg/test_partition.py
  • elt-common/tests/unit_tests/iceberg/test_schema.py
  • elt-common/tests/unit_tests/iceberg/test_sortorder.py

Comment thread elt-common/src/elt_common/iceberg/schema.py Outdated
Comment thread elt-common/tests/unit_tests/iceberg/test_io.py