Add non-dlt iceberg writer #337
ref #321 Changes taken from various commits on the elt-command-without-dlt branch Co-authored-by: Will Taylor <william.h.taylor@stfc.ac.uk>
Because TimeType has a fixed precision, it doesn't actually have a 'unit' field. This worked because 'cast' just returns the original value, but the type checker was confused.
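To illustrate the point in this commit message, here is a minimal sketch, assuming the 'cast' in question is `typing.cast`. That function is a hint for the type checker only and returns its argument unchanged at runtime. `SourceTime` and `FixedPrecisionTime` are hypothetical stand-ins, not the real PyArrow or Iceberg classes.

```python
from dataclasses import dataclass
from typing import cast


@dataclass(frozen=True)
class SourceTime:
    """Hypothetical stand-in for a source time type that carries a unit."""

    unit: str


@dataclass(frozen=True)
class FixedPrecisionTime:
    """Hypothetical stand-in for a target time type with fixed precision (no unit)."""


source = SourceTime(unit="us")

# cast() performs no conversion and no check at runtime: it returns `source` itself.
casted = cast(FixedPrecisionTime, source)

same_object = casted is source        # the very same object comes back
runtime_type = type(casted).__name__  # still the source type at runtime
# Attribute access works at runtime, but a type checker sees FixedPrecisionTime,
# which declares no `unit` attribute, and reports an error.
unit = getattr(casted, "unit")
```

So code written this way can run correctly while still failing type checking, which matches the situation described above.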
📝 Walkthrough
This PR adds comprehensive Iceberg support to the ELT platform. It introduces type contracts for IO operations, schema translation from PyArrow to Iceberg format, configuration helpers for partitioning and sorting, and a full
Changes
Iceberg integration for ELT destinations
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
WHTaylor left a comment
Should the preexisting trino.py be moved into the maintenance subpackage? I think it's only used in there.
        return Schema(*iceberg_fields, identifier_field_ids=identifier_field_ids)


    def evolve_schema(iceberg_schema: Schema, new_arrow_schema: pa.Schema) -> Schema | None:
This only detects new fields. Do we want to try and handle cases where fields are removed/changed as well, or is that beyond scope?
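As a rough illustration of what handling the other cases would involve, here is a minimal sketch that classifies fields as added, removed, or changed. It uses plain `{column name: type name}` dictionaries as a stand-in for the Iceberg and PyArrow schemas, and `diff_fields` is a hypothetical helper, not part of this PR.

```python
def diff_fields(current: dict[str, str], incoming: dict[str, str]) -> dict[str, list[str]]:
    """Classify differences between two {column name: type name} mappings."""
    # Present in the incoming schema only.
    added = [name for name in incoming if name not in current]
    # Present in the current schema only.
    removed = [name for name in current if name not in incoming]
    # Present in both, but with a different type.
    changed = [
        name for name in incoming if name in current and incoming[name] != current[name]
    ]
    return {"added": added, "removed": removed, "changed": changed}


current = {"id": "long", "name": "string", "created": "timestamp"}
incoming = {"id": "long", "name": "binary", "score": "double"}
result = diff_fields(current, incoming)
```

Detecting the three cases is the easy part; deciding what to do with removed or retyped columns is a policy question the sketch deliberately leaves open.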
    ) -> PartitionSpec:
        """Create an Iceberg partition spec from the partition hints"""

        def field_name(column_name: str, transform: str):
No idea what the norms are for this, but would it make sense to just return the column name if the 'identity' transform is being used?
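A minimal sketch of that suggestion follows. The body of `field_name` is not shown in the diff, so the `{column}_{transform}` pattern used for the non-identity case is an assumption made purely for illustration.

```python
def field_name(column_name: str, transform: str) -> str:
    """Name a partition field; identity partitions keep the bare column name."""
    if transform == "identity":
        return column_name
    # Assumed naming pattern for every other transform (hypothetical).
    return f"{column_name}_{transform}"


identity_name = field_name("region", "identity")
day_name = field_name("created", "day")
```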
        """
        iceberg_fields, identifier_field_ids = [], []
        for index, arrow_field in enumerate(arrow_schema):
            col_id = index + 1
Why is this (and also in evolve_schema) + 1? Does iceberg want indexes to start at 1 rather than 0?
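Whatever the answer on Iceberg's side, the one-based numbering can be written without the explicit `+ 1`. A small sketch with a plain list of column names standing in for the arrow schema:

```python
columns = ["id", "name", "created"]

# As in the snippet above: zero-based index shifted by one.
ids_shifted = {name: index + 1 for index, name in enumerate(columns)}

# Equivalent: let enumerate start counting at 1.
ids_start = {name: col_id for col_id, name in enumerate(columns, start=1)}
```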
    @dc.dataclass(frozen=True)
    class ELTJobManifest:
I moved this here because it's where it ends up on elt-command-without-dlt, to be shared between pipeline and extract, but it feels like a slight misnomer for it to be in typing. I might move it later when implementing the runner functionality.
            *(
                PartitionField(
                    source_id=iceberg_schema.find_field(column_name).field_id,
                    field_id=1000 + index,  # the documentation does this...
Probably not, based on the comment, but do we know why this is + 1000?
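One possible explanation, offered tentatively: the Iceberg table spec appears to note that partition field IDs were historically assigned sequentially starting at 1000, which keeps them apart from column IDs. Below is a minimal sketch of that numbering, using a plain dictionary instead of `PartitionField`; the constant name and helper are hypothetical.

```python
# Hypothetical name for the offset used in the snippet above.
PARTITION_FIELD_ID_START = 1000


def assign_partition_field_ids(column_names: list[str]) -> dict[str, int]:
    """Give each partition column an ID counting up from the start offset."""
    return {
        name: PARTITION_FIELD_ID_START + index
        for index, name in enumerate(column_names)
    }


ids = assign_partition_field_ids(["region", "created"])
```

Naming the constant would at least make the intent of the magic number visible at the call site.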
Actionable comments posted: 2
🧹 Nitpick comments (3)
elt-common/src/elt_common/iceberg/io.py (1)
26-27: ⚡ Quick win
Remove redundant self-assignment.
Line 27 assigns `NoSuchTableError` to itself, which has no effect. The comment suggests the intent is to expose this exception at the module level, but this alias doesn't accomplish anything. Either remove these lines or, if re-exporting is desired, add `NoSuchTableError` to an `__all__` list.
♻️ Proposed fix
Remove the redundant assignment:
    -# Map exception 1:1
    -NoSuchTableError = NoSuchTableError
    -
     LOGGER = logging.getLogger(__name__)
Or, if you intend to re-export it, add an `__all__` declaration:
    +__all__ = ["IcebergIO", "NoSuchTableError"]
    +
     # Map exception 1:1
     NoSuchTableError = NoSuchTableError
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@elt-common/src/elt_common/iceberg/io.py` around lines 26 - 27, The line "NoSuchTableError = NoSuchTableError" is a redundant self-assignment; remove that assignment from the module (or if you intended to re-export the symbol, add "NoSuchTableError" to the module's __all__ list instead). Update the module where NoSuchTableError is declared/used (the iceberg/io.py export area) to either delete the redundant alias or add the symbol name to __all__ so it is properly exported.
elt-common/src/elt_common/typing.py (1)
12-33: ⚡ Quick win
Consider removing redundant `NotImplementedError` raises from abstract methods.
Abstract methods decorated with `@abstractmethod` already prevent instantiation of the abstract class if the methods aren't implemented by subclasses. The `raise NotImplementedError` statements on lines 15-17 and 31-33 are redundant and will never execute. The Pythonic approach is to use `pass` or just a docstring.
♻️ Suggested refactor
     class BaseIO(ABC):
         @abstractmethod
         def ensure_namespace(self, namespace: str) -> None:
    -        raise NotImplementedError(
    -            "Subclass should implement `ensure_namespace` to ensure the namespace exists."
    -        )
    +        """Subclass should implement `ensure_namespace` to ensure the namespace exists."""

         @abstractmethod
         def write_table(
             self,
             table_id: Identifier,
             data: "pa.Table",
             write_mode: "WriteMode",
             *,
             merge_on: list[str] | None = None,
             partition: "PartitionConfig | None" = None,
             sort_order: "SortOrderConfig | None" = None,
             properties: dict[str, str] | None = None,
         ) -> None:
    -        raise NotImplementedError(
    -            "Subclass should implement `write_table` to write a table to the destination."
    -        )
    +        """Subclass should implement `write_table` to write a table to the destination."""
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@elt-common/src/elt_common/typing.py` around lines 12 - 33, The abstract methods in BaseIO (ensure_namespace and write_table) contain redundant raise NotImplementedError blocks; remove those raise statements and replace them with either a simple pass or a docstring while leaving the `@abstractmethod` decorator, method signatures (including return/type hints like Identifier, pa.Table, WriteMode, PartitionConfig, SortOrderConfig), and parameters (merge_on, partition, sort_order, properties) intact so subclassing behavior is preserved.
elt-common/tests/unit_tests/iceberg/test_partition.py (1)
132-137: 💤 Low value
Consider using a more specific exception type.
Line 136 catches the generic `Exception` type. If you know which specific exception pyiceberg raises for missing fields (e.g., `ValueError` or a custom pyiceberg exception), catching that specific type would make the test more precise and prevent it from accidentally passing if a different unexpected error occurs.
♻️ Example if pyiceberg raises ValueError
    -    with pytest.raises(Exception):  # pyiceberg raises an error for missing fields
    +    with pytest.raises(ValueError, match="nonexistent_col"):
             create_partition_spec(partition_hint, sample_schema)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@elt-common/tests/unit_tests/iceberg/test_partition.py` around lines 132 - 137, The test test_create_partition_spec_nonexistent_column_raises_error should catch a specific exception rather than Exception; update the pytest.raises(...) to the concrete exception pyiceberg raises for missing fields (e.g., replace pytest.raises(Exception) with pytest.raises(ValueError) or import and use the pyiceberg-specific exception), and add the necessary import for that exception at the top of the test file so create_partition_spec(partition_hint, sample_schema) is asserted to raise the precise error.
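Returning to the `BaseIO` nitpick on `typing.py`: here is a minimal sketch of how a docstring-only abstract method behaves, using only the standard library `abc` module. `InMemoryIO` is a hypothetical subclass invented for the illustration, and only `ensure_namespace` is reproduced.

```python
from abc import ABC, abstractmethod


class BaseIO(ABC):
    @abstractmethod
    def ensure_namespace(self, namespace: str) -> None:
        """Subclass should implement `ensure_namespace` to ensure the namespace exists."""


class InMemoryIO(BaseIO):
    """Hypothetical subclass used only for this illustration."""

    def __init__(self) -> None:
        self.namespaces: set[str] = set()

    def ensure_namespace(self, namespace: str) -> None:
        self.namespaces.add(namespace)


try:
    BaseIO()  # the abstract method is not implemented, so this fails
    instantiation_error = None
except TypeError as exc:
    instantiation_error = exc

io = InMemoryIO()
io.ensure_namespace("test_ns")
```

The `TypeError` is raised at instantiation, before any method body could run, so the docstring-only form keeps the same guard against incomplete subclasses.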
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@elt-common/src/elt_common/iceberg/schema.py`:
- Around line 37-38: The code incorrectly casts arrow_type to the Iceberg
DecimalType; instead extract precision and scale from the PyArrow decimal type
and pass those to the Iceberg DecimalType constructor. Replace the cast of
arrow_type to DecimalType in the decimal handling code (the variable
decimal_type and the use of DecimalType on return) with a check/cast to the
PyArrow decimal class (e.g., pa.Decimal128Type or pa.Decimal256Type) or simply
read arrow_type.precision and arrow_type.scale, then return
iceberg.schema.DecimalType(precision, scale).
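The shape of that fix can be sketched as follows. To stay dependency-free, the two dataclasses are hypothetical stand-ins for the PyArrow decimal type and the Iceberg `DecimalType`; the real classes are not imported here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ArrowDecimal:
    """Hypothetical stand-in for a PyArrow decimal type (exposes precision and scale)."""

    precision: int
    scale: int


@dataclass(frozen=True)
class IcebergDecimal:
    """Hypothetical stand-in for the Iceberg decimal type built from precision and scale."""

    precision: int
    scale: int


def to_iceberg_decimal(arrow_type: ArrowDecimal) -> IcebergDecimal:
    # Read the two attributes from the source type and construct the target type,
    # rather than casting the source object to the target class.
    return IcebergDecimal(arrow_type.precision, arrow_type.scale)


converted = to_iceberg_decimal(ArrowDecimal(precision=18, scale=4))
```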
In `@elt-common/tests/unit_tests/iceberg/test_io.py`:
- Around line 59-66: The test test_ensure_namespace_noop_when_namespace_exists
incorrectly asserts mock_dependencies.mock_catalog.assert_not_called(); change
the assertion to verify the specific method wasn't invoked: assert that
mock_dependencies.mock_catalog.create_namespace.assert_not_called() after
calling IcebergIO.ensure_namespace("test_ns") so the test confirms
create_namespace is not called when namespace_exists returns True; locate the
call in the test and replace the broad catalog assertion with the
create_namespace-specific assertion.
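Why the broad assertion is too weak can be shown with the standard library alone. In this sketch, `ensure_namespace_buggy` is a deliberately wrong, hypothetical function (not the `IcebergIO` implementation) and the catalog is a bare `MagicMock`.

```python
from unittest.mock import MagicMock


def ensure_namespace_buggy(catalog, namespace: str) -> None:
    """Deliberately wrong: creates the namespace even when it already exists."""
    catalog.namespace_exists(namespace)
    catalog.create_namespace(namespace)


catalog = MagicMock()
catalog.namespace_exists.return_value = True
ensure_namespace_buggy(catalog, "test_ns")

# Passes even though create_namespace was called: this only checks that the
# catalog mock itself was never called like a function.
catalog.assert_not_called()

# The method-specific assertion catches the bug.
try:
    catalog.create_namespace.assert_not_called()
    bug_detected = False
except AssertionError:
    bug_detected = True
```

Calls to child attributes of a mock do not count as calls to the parent, so only the `create_namespace` assertion can fail when the namespace is created unnecessarily.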
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: cbea9581-c44d-49db-92c6-a53578607f39
📒 Files selected for processing (13)
- elt-common/src/elt_common/iceberg/catalog.py
- elt-common/src/elt_common/iceberg/io.py
- elt-common/src/elt_common/iceberg/partition.py
- elt-common/src/elt_common/iceberg/schema.py
- elt-common/src/elt_common/iceberg/sortorder.py
- elt-common/src/elt_common/pipeline.py
- elt-common/src/elt_common/typing.py
- elt-common/tests/unit_tests/iceberg/conftest.py
- elt-common/tests/unit_tests/iceberg/test_catalog.py
- elt-common/tests/unit_tests/iceberg/test_io.py
- elt-common/tests/unit_tests/iceberg/test_partition.py
- elt-common/tests/unit_tests/iceberg/test_schema.py
- elt-common/tests/unit_tests/iceberg/test_sortorder.py
Includes the same kind of change as 4d4455e
ref #321
Adds functionality for writing pyarrow tables to iceberg, similar to what's in `elt_common.dlt_destinations.pyiceberg` without the `dlt` parts.

Again, the vast majority of these changes were taken directly from the `elt-command-without-dlt` branch, compressed into commit 35df000. The commits after that are various small tweaks/additions that are probably easiest to review individually.

There are some outstanding questions I have that I've left as comments below.