diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index bf24ae32e..856a3f9a0 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -20,6 +20,10 @@ set(ICEBERG_INCLUDES "$" set(ICEBERG_SOURCES arrow_c_data_guard_internal.cc catalog/memory/in_memory_catalog.cc + data/data_writer.cc + data/equality_delete_writer.cc + data/file_writer_factory.cc + data/position_delete_writer.cc data/writer.cc delete_file_index.cc expression/aggregate.cc diff --git a/src/iceberg/data/data_writer.cc b/src/iceberg/data/data_writer.cc new file mode 100644 index 000000000..d3648294c --- /dev/null +++ b/src/iceberg/data/data_writer.cc @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+#include "iceberg/data/data_writer.h"
+
+namespace iceberg {
+
+//=============================================================================
+// DataWriter - stub implementation (to be completed in separate PR per #441)
+//=============================================================================
+
+// Private state behind DataWriter: a copy of the creation options plus a flag
+// recording whether Close() has been called.
+class DataWriter::Impl {
+ public:
+  explicit Impl(DataWriterOptions options) : options_(std::move(options)) {}
+  DataWriterOptions options_;  // Stored, but not read anywhere in this stub.
+  bool is_closed_ = false;     // Set by the first Close() call.
+};
+
+// Takes ownership of an already-built implementation object.
+DataWriter::DataWriter(std::unique_ptr impl) : impl_(std::move(impl)) {}
+DataWriter::~DataWriter() = default;
+
+// Stub: validates the call and then reports NotImplemented.
+// A null `data` yields InvalidArgument; a write after Close() yields Invalid.
+// The null check runs first, so a null batch on a closed writer reports
+// InvalidArgument.
+Status DataWriter::Write(ArrowArray* data) {
+  if (!data) {
+    return InvalidArgument("Cannot write null data");
+  }
+  if (impl_->is_closed_) {
+    return Invalid("Writer is already closed");
+  }
+  return NotImplemented("DataWriter not yet implemented - see #441");
+}
+
+// Stub: always reports NotImplemented, regardless of the closed state.
+Result DataWriter::Length() const {
+  return NotImplemented("DataWriter not yet implemented - see #441");
+}
+
+// Stub: the first call marks the writer closed and then reports NotImplemented;
+// every later call takes the early return below.
+// NOTE(review): a retry after the failing first call therefore gets the `{}`
+// result (presumably the OK status) - confirm this is intended once real
+// closing logic lands.
+Status DataWriter::Close() {
+  if (impl_->is_closed_) {
+    return {};  // Close is idempotent
+  }
+  impl_->is_closed_ = true;
+  return NotImplemented("DataWriter not yet implemented - see #441");
+}
+
+// Stub: reports Invalid until Close() has been called, NotImplemented afterwards.
+Result DataWriter::Metadata() {
+  if (!impl_->is_closed_) {
+    return Invalid("Writer must be closed before getting metadata");
+  }
+  return NotImplemented("DataWriter not yet implemented - see #441");
+}
+
+// Internal factory function for FileWriterFactory
+// Copies `options` into a fresh Impl. The writer is created with `new` because
+// the DataWriter constructor is private; this function is declared a friend in
+// data_writer.h.
+std::unique_ptr MakeDataWriterInternal(const DataWriterOptions& options) {
+  auto impl = std::make_unique(options);
+  return std::unique_ptr(new DataWriter(std::move(impl)));
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/data/data_writer.h b/src/iceberg/data/data_writer.h new file mode 100644 index 000000000..cd7e4d978 --- /dev/null +++ b/src/iceberg/data/data_writer.h @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
+
+#pragma once
+
+/// \file iceberg/data/data_writer.h
+/// Data writer for Iceberg tables.
+
+#include
+#include
+#include
+#include
+
+#include "iceberg/arrow_c_data.h"
+#include "iceberg/data/writer.h"
+#include "iceberg/file_format.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Options for creating a DataWriter.
+///
+/// \note The following features from Java DataWriter are not yet supported:
+/// - Encryption key metadata (uses FileIO instead of EncryptedOutputFile)
+/// - Metrics collection and reporting
+/// - Split offsets tracking
+struct ICEBERG_EXPORT DataWriterOptions {
+  /// Output file path. FileWriterFactory::NewDataWriter rejects an empty string.
+  std::string path;
+  /// Table schema. FileWriterFactory::NewDataWriter rejects a null pointer.
+  std::shared_ptr schema;
+  /// Partition spec. FileWriterFactory::NewDataWriter rejects a null pointer.
+  std::shared_ptr spec;
+  /// Partition tuple for the file (presumably matching `spec` - not validated here).
+  PartitionValues partition;
+  /// File format to write; defaults to Parquet.
+  FileFormatType format = FileFormatType::kParquet;
+  /// I/O handle. The current stub writer never dereferences it.
+  std::shared_ptr io;
+  /// Optional sort order id; left unset when the caller does not pass one.
+  std::optional sort_order_id;
+  /// Writer properties; may be null (the factory constructor defaults it to nullptr).
+  std::shared_ptr properties;
+};
+
+/// \brief Writer for Iceberg data files.
+///
+/// Instances are created through FileWriterFactory::NewDataWriter; the constructor
+/// is private. All operations are currently stubs that end in NotImplemented
+/// (see data_writer.cc).
+///
+/// \warning Thread Safety: Writer instances are NOT thread-safe. Each writer should only
+/// be used by a single thread. Do not call Write(), Close(), or Metadata() concurrently.
+class ICEBERG_EXPORT DataWriter : public FileWriter {
+ public:
+  ~DataWriter() override;
+
+  /// \brief Write a batch. Null `data` is rejected with InvalidArgument and a
+  /// write after Close() with Invalid; otherwise the stub returns NotImplemented.
+  Status Write(ArrowArray* data) override;
+  /// \brief Stub: always returns NotImplemented.
+  Result Length() const override;
+  /// \brief Mark the writer closed. The first call returns NotImplemented;
+  /// later calls return a default-constructed Status.
+  Status Close() override;
+  /// \brief Returns Invalid before Close() has been called and NotImplemented after.
+  Result Metadata() override;
+
+ private:
+  friend class FileWriterFactory;
+  // Defined in data_writer.cc; needs access to the private constructor.
+  friend std::unique_ptr MakeDataWriterInternal(const DataWriterOptions&);
+  class Impl;
+  std::unique_ptr impl_;
+  explicit DataWriter(std::unique_ptr impl);
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/data/equality_delete_writer.cc b/src/iceberg/data/equality_delete_writer.cc new file mode 100644 index 000000000..8d3487e5e --- /dev/null +++ b/src/iceberg/data/equality_delete_writer.cc @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+#include "iceberg/data/equality_delete_writer.h"
+
+namespace iceberg {
+
+//=============================================================================
+// EqualityDeleteWriter - stub implementation (to be completed in separate PR per #441)
+//=============================================================================
+
+// Private state behind EqualityDeleteWriter: a copy of the creation options plus
+// a flag recording whether Close() has been called.
+class EqualityDeleteWriter::Impl {
+ public:
+  explicit Impl(EqualityDeleteWriterOptions options) : options_(std::move(options)) {}
+  EqualityDeleteWriterOptions options_;  // Only equality_field_ids is read back.
+  bool is_closed_ = false;               // Set by the first Close() call.
+};
+
+// Takes ownership of an already-built implementation object.
+EqualityDeleteWriter::EqualityDeleteWriter(std::unique_ptr impl)
+    : impl_(std::move(impl)) {}
+EqualityDeleteWriter::~EqualityDeleteWriter() = default;
+
+// Stub: validates the call and then reports NotImplemented.
+// A null `data` yields InvalidArgument; a write after Close() yields Invalid.
+// The null check runs first.
+Status EqualityDeleteWriter::Write(ArrowArray* data) {
+  if (!data) {
+    return InvalidArgument("Cannot write null data");
+  }
+  if (impl_->is_closed_) {
+    return Invalid("Writer is already closed");
+  }
+  return NotImplemented("EqualityDeleteWriter not yet implemented - see #441");
+}
+
+// Stub: always reports NotImplemented, regardless of the closed state.
+Result EqualityDeleteWriter::Length() const {
+  return NotImplemented("EqualityDeleteWriter not yet implemented - see #441");
+}
+
+// Stub: the first call marks the writer closed and then reports NotImplemented;
+// every later call takes the early return below.
+// NOTE(review): a retry after the failing first call therefore gets the `{}`
+// result (presumably the OK status) - confirm this is intended.
+Status EqualityDeleteWriter::Close() {
+  if (impl_->is_closed_) {
+    return {};  // Close is idempotent
+  }
+  impl_->is_closed_ = true;
+  return NotImplemented("EqualityDeleteWriter not yet implemented - see #441");
+}
+
+// Stub: reports Invalid until Close() has been called, NotImplemented afterwards.
+Result EqualityDeleteWriter::Metadata() {
+  if (!impl_->is_closed_) {
+    return Invalid("Writer must be closed before getting metadata");
+  }
+  return NotImplemented("EqualityDeleteWriter not yet implemented - see #441");
+}
+
+// Returns a reference to the field IDs held inside this writer's options, so the
+// reference must not outlive the writer.
+const std::vector& EqualityDeleteWriter::equality_field_ids() const {
+  return impl_->options_.equality_field_ids;
+}
+
+// Internal factory function for FileWriterFactory
+// Copies `options` into a fresh Impl. The writer is created with `new` because
+// the EqualityDeleteWriter constructor is private; this function is declared a
+// friend in equality_delete_writer.h.
+std::unique_ptr MakeEqualityDeleteWriterInternal(
+    const EqualityDeleteWriterOptions& options) {
+  auto impl = std::make_unique(options);
+  return std::unique_ptr(new EqualityDeleteWriter(std::move(impl)));
+}
+
+}  // namespace iceberg
diff --git 
a/src/iceberg/data/equality_delete_writer.h b/src/iceberg/data/equality_delete_writer.h new file mode 100644 index 000000000..100e96194 --- /dev/null +++ b/src/iceberg/data/equality_delete_writer.h @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
+
+#pragma once
+
+/// \file iceberg/data/equality_delete_writer.h
+/// Equality delete writer for Iceberg tables.
+
+#include
+#include
+#include
+#include
+#include
+
+#include "iceberg/arrow_c_data.h"
+#include "iceberg/data/writer.h"
+#include "iceberg/file_format.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Options for creating an EqualityDeleteWriter.
+///
+/// \note The following features from Java EqualityDeleteWriter are not yet supported:
+/// - Encryption key metadata
+/// - Metrics collection and reporting
+/// - Split offsets tracking
+struct ICEBERG_EXPORT EqualityDeleteWriterOptions {
+  /// Output file path. FileWriterFactory::NewEqualityDeleteWriter rejects an
+  /// empty string.
+  std::string path;
+  /// Schema of the delete rows. The factory passes the schema given to
+  /// SetEqualityDeleteConfig, or its own table schema when that one is null.
+  std::shared_ptr schema;
+  /// Partition spec. The factory rejects a null pointer.
+  std::shared_ptr spec;
+  /// Partition tuple for the file (presumably matching `spec` - not validated here).
+  PartitionValues partition;
+  /// File format to write; defaults to Parquet.
+  FileFormatType format = FileFormatType::kParquet;
+  /// I/O handle. The current stub writer never dereferences it.
+  std::shared_ptr io;
+  /// Field IDs that identify deleted rows. The factory rejects an empty list.
+  std::vector equality_field_ids;
+  /// Optional sort order id; left unset when the caller does not pass one.
+  std::optional sort_order_id;
+  /// Writer properties; may be null (the factory constructor defaults it to nullptr).
+  std::shared_ptr properties;
+};
+
+/// \brief Writer for Iceberg equality delete files.
+///
+/// Instances are created through FileWriterFactory::NewEqualityDeleteWriter; the
+/// constructor is private. Write/Length/Close/Metadata are currently stubs that
+/// end in NotImplemented (see equality_delete_writer.cc).
+///
+/// \warning Thread Safety: Writer instances are NOT thread-safe. Each writer should only
+/// be used by a single thread. Do not call Write(), Close(), or Metadata() concurrently.
+class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter {
+ public:
+  ~EqualityDeleteWriter() override;
+
+  /// \brief Write a batch. Null `data` is rejected with InvalidArgument and a
+  /// write after Close() with Invalid; otherwise the stub returns NotImplemented.
+  Status Write(ArrowArray* data) override;
+  /// \brief Stub: always returns NotImplemented.
+  Result Length() const override;
+  /// \brief Mark the writer closed. The first call returns NotImplemented;
+  /// later calls return a default-constructed Status.
+  Status Close() override;
+  /// \brief Returns Invalid before Close() has been called and NotImplemented after.
+  Result Metadata() override;
+
+  /// \brief The equality field IDs this writer was created with. The reference
+  /// points into the writer's own state and must not outlive it.
+  const std::vector& equality_field_ids() const;
+
+ private:
+  friend class FileWriterFactory;
+  // Defined in equality_delete_writer.cc; needs access to the private constructor.
+  friend std::unique_ptr MakeEqualityDeleteWriterInternal(
+      const EqualityDeleteWriterOptions&);
+  class Impl;
+  std::unique_ptr impl_;
+  explicit EqualityDeleteWriter(std::unique_ptr impl);
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/data/file_writer_factory.cc b/src/iceberg/data/file_writer_factory.cc new file mode 100644 index 000000000..69834be8b --- /dev/null +++ b/src/iceberg/data/file_writer_factory.cc @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
+
+#include "iceberg/data/file_writer_factory.h"
+
+#include "iceberg/data/data_writer.h"
+#include "iceberg/data/equality_delete_writer.h"
+#include "iceberg/data/position_delete_writer.h"
+
+namespace iceberg {
+
+// Forward declarations for internal factory functions
+// Each one is defined in the matching *_writer.cc file and is a friend of its
+// writer class, so it can call the writer's private constructor.
+std::unique_ptr MakeDataWriterInternal(const DataWriterOptions& options);
+std::unique_ptr MakePositionDeleteWriterInternal(
+    const PositionDeleteWriterOptions& options);
+std::unique_ptr MakeEqualityDeleteWriterInternal(
+    const EqualityDeleteWriterOptions& options);
+
+//=============================================================================
+// FileWriterFactory::Impl
+//=============================================================================
+
+// Configuration shared by every writer the factory creates.
+class FileWriterFactory::Impl {
+ public:
+  Impl(std::shared_ptr schema, std::shared_ptr spec,
+       std::shared_ptr io, std::shared_ptr properties)
+      : schema_(std::move(schema)),
+        spec_(std::move(spec)),
+        io_(std::move(io)),
+        properties_(std::move(properties)) {}
+
+  // Set once by the constructor and copied into the options of every writer.
+  std::shared_ptr schema_;
+  std::shared_ptr spec_;
+  std::shared_ptr io_;
+  std::shared_ptr properties_;
+
+  // Optional settings, empty until the corresponding Set* call.
+  std::shared_ptr eq_delete_schema_;
+  std::vector equality_field_ids_;
+  std::shared_ptr pos_delete_row_schema_;
+};
+
+//=============================================================================
+// FileWriterFactory
+//=============================================================================
+
+// Stores the arguments as given; null schema/spec are only rejected later, when
+// a New*Writer method is called.
+FileWriterFactory::FileWriterFactory(std::shared_ptr schema,
+                                     std::shared_ptr spec,
+                                     std::shared_ptr io,
+                                     std::shared_ptr properties)
+    : impl_(std::make_unique(std::move(schema), std::move(spec), std::move(io),
+                             std::move(properties))) {}
+
+FileWriterFactory::~FileWriterFactory() = default;
+
+// Replaces both the equality-delete schema and the field-ID list. A null schema
+// is allowed: NewEqualityDeleteWriter then falls back to the table schema.
+void FileWriterFactory::SetEqualityDeleteConfig(std::shared_ptr eq_delete_schema,
+                                                std::vector equality_field_ids) {
+  impl_->eq_delete_schema_ = std::move(eq_delete_schema);
+  impl_->equality_field_ids_ = std::move(equality_field_ids);
+}
+
+// Replaces the row schema that later position delete writers receive as
+// `row_schema`.
+void FileWriterFactory::SetPositionDeleteRowSchema(
+    std::shared_ptr pos_delete_row_schema) {
+  impl_->pos_delete_row_schema_ = std::move(pos_delete_row_schema);
+}
+
+// Builds DataWriterOptions from the factory state plus the arguments and hands
+// them to MakeDataWriterInternal. Returns InvalidArgument when `path` is empty
+// or when the factory's schema or partition spec is null. `io` and `properties`
+// are passed through without a null check.
+Result> FileWriterFactory::NewDataWriter(
+    std::string path, FileFormatType format, PartitionValues partition,
+    std::optional sort_order_id) {
+  // Input validation
+  if (path.empty()) {
+    return InvalidArgument("Path cannot be empty");
+  }
+  if (!impl_->schema_) {
+    return InvalidArgument("Schema cannot be null");
+  }
+  if (!impl_->spec_) {
+    return InvalidArgument("PartitionSpec cannot be null");
+  }
+
+  DataWriterOptions options;
+  options.path = std::move(path);
+  options.schema = impl_->schema_;
+  options.spec = impl_->spec_;
+  options.partition = std::move(partition);
+  options.format = format;
+  options.io = impl_->io_;
+  options.sort_order_id = sort_order_id;
+  options.properties = impl_->properties_;
+
+  return MakeDataWriterInternal(options);
+}
+
+// Same validation as NewDataWriter. The row schema set through
+// SetPositionDeleteRowSchema (possibly null) is forwarded as `row_schema`.
+Result> FileWriterFactory::NewPositionDeleteWriter(
+    std::string path, FileFormatType format, PartitionValues partition) {
+  // Input validation
+  if (path.empty()) {
+    return InvalidArgument("Path cannot be empty");
+  }
+  if (!impl_->schema_) {
+    return InvalidArgument("Schema cannot be null");
+  }
+  if (!impl_->spec_) {
+    return InvalidArgument("PartitionSpec cannot be null");
+  }
+
+  PositionDeleteWriterOptions options;
+  options.path = std::move(path);
+  options.schema = impl_->schema_;
+  options.spec = impl_->spec_;
+  options.partition = std::move(partition);
+  options.format = format;
+  options.io = impl_->io_;
+  options.row_schema = impl_->pos_delete_row_schema_;
+  options.properties = impl_->properties_;
+
+  return MakePositionDeleteWriterInternal(options);
+}
+
+// Same validation as NewDataWriter, plus: the equality field-ID list must be
+// non-empty, i.e. SetEqualityDeleteConfig must have been called with IDs. The
+// table schema must be non-null even when a separate equality-delete schema
+// was configured.
+Result> FileWriterFactory::NewEqualityDeleteWriter(
+    std::string path, FileFormatType format, PartitionValues partition,
+    std::optional sort_order_id) {
+  // Input validation
+  if (path.empty()) {
+    return InvalidArgument("Path cannot be empty");
+  }
+  if (!impl_->schema_) {
+    return InvalidArgument("Schema cannot be null");
+  }
+  if (!impl_->spec_) {
+    return InvalidArgument("PartitionSpec cannot be null");
+  }
+  if (impl_->equality_field_ids_.empty()) {
+    return InvalidArgument(
+        "Equality field IDs cannot be empty - call SetEqualityDeleteConfig first");
+  }
+
+  EqualityDeleteWriterOptions options;
+  options.path = std::move(path);
+  // Prefer the dedicated equality-delete schema; fall back to the table schema.
+  options.schema = impl_->eq_delete_schema_ ? impl_->eq_delete_schema_ : impl_->schema_;
+  options.spec = impl_->spec_;
+  options.partition = std::move(partition);
+  options.format = format;
+  options.io = impl_->io_;
+  options.equality_field_ids = impl_->equality_field_ids_;
+  options.sort_order_id = sort_order_id;
+  options.properties = impl_->properties_;
+
+  return MakeEqualityDeleteWriterInternal(options);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/data/file_writer_factory.h b/src/iceberg/data/file_writer_factory.h new file mode 100644 index 000000000..af0e99c6a --- /dev/null +++ b/src/iceberg/data/file_writer_factory.h @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
+
+#pragma once
+
+/// \file iceberg/data/file_writer_factory.h
+/// Factory for creating Iceberg file writers.
+
+#include
+#include
+#include
+#include
+#include
+
+#include "iceberg/file_format.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+// Forward declarations
+class DataWriter;
+class PositionDeleteWriter;
+class EqualityDeleteWriter;
+
+/// \brief Factory for creating Iceberg file writers.
+///
+/// \warning Thread Safety: This class is NOT thread-safe. Each FileWriterFactory instance
+/// should only be used by a single thread. To use from multiple threads, create separate
+/// factory instances or use external synchronization.
+///
+/// \note Differences from Java FileWriterFactory:
+/// - Java uses EncryptedOutputFile parameter, C++ uses path + FileIO
+/// - C++ factory has state (schema, spec, io) configured once, reused for all writers
+/// - Java FileWriterFactory is an interface, C++ is a concrete class with configuration
+/// - C++ provides SetEqualityDeleteConfig() and SetPositionDeleteRowSchema() for
+///   customization
+class ICEBERG_EXPORT FileWriterFactory {
+ public:
+  /// \brief Store the configuration shared by all writers created later.
+  ///
+  /// Nothing is validated here; a null `schema` or `spec` makes every
+  /// New*Writer call return InvalidArgument. `properties` may be null.
+  FileWriterFactory(std::shared_ptr schema, std::shared_ptr spec,
+                    std::shared_ptr io,
+                    std::shared_ptr properties = nullptr);
+  ~FileWriterFactory();
+
+  /// \brief Configure equality deletes. Must be called with a non-empty ID list
+  /// before NewEqualityDeleteWriter. A null `eq_delete_schema` means "use the
+  /// factory's table schema".
+  void SetEqualityDeleteConfig(std::shared_ptr eq_delete_schema,
+                               std::vector equality_field_ids);
+  /// \brief Set the optional row schema handed to position delete writers.
+  void SetPositionDeleteRowSchema(std::shared_ptr pos_delete_row_schema);
+
+  /// \brief Create a data writer for `path`.
+  /// \return InvalidArgument if `path` is empty or the factory's schema or
+  /// partition spec is null; otherwise the new writer.
+  Result> NewDataWriter(
+      std::string path, FileFormatType format, PartitionValues partition,
+      std::optional sort_order_id = std::nullopt);
+
+  /// \brief Create a position delete writer for `path`.
+  /// \return InvalidArgument if `path` is empty or the factory's schema or
+  /// partition spec is null; otherwise the new writer.
+  Result> NewPositionDeleteWriter(
+      std::string path, FileFormatType format, PartitionValues partition);
+
+  /// \brief Create an equality delete writer for `path`.
+  /// \return InvalidArgument if `path` is empty, the factory's schema or
+  /// partition spec is null, or no equality field IDs have been configured;
+  /// otherwise the new writer.
+  Result> NewEqualityDeleteWriter(
+      std::string path, FileFormatType format, PartitionValues partition,
+      std::optional sort_order_id = std::nullopt);
+
+ private:
+  class Impl;
+  std::unique_ptr impl_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/data/position_delete_writer.cc b/src/iceberg/data/position_delete_writer.cc new file mode 100644 index 000000000..836b47532 --- /dev/null +++ b/src/iceberg/data/position_delete_writer.cc @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
+
+#include "iceberg/data/position_delete_writer.h"
+
+namespace iceberg {
+
+//=============================================================================
+// PositionDeleteWriter - stub implementation (to be completed in separate PR per #441)
+//=============================================================================
+
+// Private state behind PositionDeleteWriter: a copy of the creation options plus
+// a flag recording whether Close() has been called.
+class PositionDeleteWriter::Impl {
+ public:
+  explicit Impl(PositionDeleteWriterOptions options) : options_(std::move(options)) {}
+  PositionDeleteWriterOptions options_;  // Stored, but not read anywhere in this stub.
+  bool is_closed_ = false;               // Set by the first Close() call.
+};
+
+// Takes ownership of an already-built implementation object.
+PositionDeleteWriter::PositionDeleteWriter(std::unique_ptr impl)
+    : impl_(std::move(impl)) {}
+PositionDeleteWriter::~PositionDeleteWriter() = default;
+
+// Stub: validates the call and then reports NotImplemented.
+// A null `data` yields InvalidArgument; a write after Close() yields Invalid.
+// The null check runs first.
+Status PositionDeleteWriter::Write(ArrowArray* data) {
+  if (!data) {
+    return InvalidArgument("Cannot write null data");
+  }
+  if (impl_->is_closed_) {
+    return Invalid("Writer is already closed");
+  }
+  return NotImplemented("PositionDeleteWriter not yet implemented - see #441");
+}
+
+// Stub: validates one (file_path, pos) delete and then reports NotImplemented.
+// Argument checks run before the closed check, matching Write():
+//   - empty `file_path` -> InvalidArgument
+//   - negative `pos`    -> InvalidArgument (a position is a 0-based row ordinal,
+//                          so a negative value can never name a row)
+//   - writer closed     -> Invalid
+Status PositionDeleteWriter::WriteDelete(std::string_view file_path, int64_t pos) {
+  if (file_path.empty()) {
+    return InvalidArgument("File path cannot be empty");
+  }
+  if (pos < 0) {
+    return InvalidArgument("Position cannot be negative");
+  }
+  if (impl_->is_closed_) {
+    return Invalid("Writer is already closed");
+  }
+  return NotImplemented("PositionDeleteWriter not yet implemented - see #441");
+}
+
+// Stub: always reports NotImplemented, regardless of the closed state.
+Result PositionDeleteWriter::Length() const {
+  return NotImplemented("PositionDeleteWriter not yet implemented - see #441");
+}
+
+// Stub: the first call marks the writer closed and then reports NotImplemented;
+// every later call takes the early return below.
+Status PositionDeleteWriter::Close() {
+  if (impl_->is_closed_) {
+    return {};  // Close is idempotent
+  }
+  impl_->is_closed_ = true;
+  return NotImplemented("PositionDeleteWriter not yet implemented - see #441");
+}
+
+// Stub: reports Invalid until Close() has been called, NotImplemented afterwards.
+Result PositionDeleteWriter::Metadata() {
+  if (!impl_->is_closed_) {
+    return Invalid("Writer must be closed before getting metadata");
+  }
+  return NotImplemented("PositionDeleteWriter not yet implemented - see #441");
+}
+
+// Internal factory function for FileWriterFactory
+// Copies `options` into a fresh Impl. The writer is created with `new` because
+// the PositionDeleteWriter constructor is private; this function is declared a
+// friend in position_delete_writer.h.
+std::unique_ptr MakePositionDeleteWriterInternal(
+    const PositionDeleteWriterOptions& options) {
+  auto impl = std::make_unique(options);
+  return std::unique_ptr(new PositionDeleteWriter(std::move(impl)));
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/data/position_delete_writer.h b/src/iceberg/data/position_delete_writer.h new file mode 100644 index 000000000..849d65224 --- /dev/null +++ b/src/iceberg/data/position_delete_writer.h @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/data/position_delete_writer.h +/// Position delete writer for Iceberg tables. 
+
+#include
+#include
+#include
+#include
+
+#include "iceberg/arrow_c_data.h"
+#include "iceberg/data/writer.h"
+#include "iceberg/file_format.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Options for creating a PositionDeleteWriter.
+///
+/// \note The following features from Java PositionDeleteWriter are not yet supported:
+/// - Encryption key metadata
+/// - Referenced data files tracking (CharSequenceSet referencedDataFiles)
+/// - Metrics stripping for multi-file deletes
+/// - Split offsets tracking
+struct ICEBERG_EXPORT PositionDeleteWriterOptions {
+  /// Output file path. FileWriterFactory::NewPositionDeleteWriter rejects an
+  /// empty string.
+  std::string path;
+  /// Table schema. The factory rejects a null pointer.
+  std::shared_ptr schema;
+  /// Partition spec. The factory rejects a null pointer.
+  std::shared_ptr spec;
+  /// Partition tuple for the file (presumably matching `spec` - not validated here).
+  PartitionValues partition;
+  /// File format to write; defaults to Parquet.
+  FileFormatType format = FileFormatType::kParquet;
+  /// I/O handle. The current stub writer never dereferences it.
+  std::shared_ptr io;
+  std::shared_ptr row_schema;  // Optional row data schema
+  /// Writer properties; may be null (the factory constructor defaults it to nullptr).
+  std::shared_ptr properties;
+};
+
+/// \brief Writer for Iceberg position delete files.
+///
+/// Instances are created through FileWriterFactory::NewPositionDeleteWriter; the
+/// constructor is private. All operations are currently stubs that end in
+/// NotImplemented (see position_delete_writer.cc).
+///
+/// \warning Thread Safety: Writer instances are NOT thread-safe. Each writer should only
+/// be used by a single thread. Do not call Write(), Close(), or Metadata() concurrently.
+class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter {
+ public:
+  ~PositionDeleteWriter() override;
+
+  /// \brief Write a batch. Null `data` is rejected with InvalidArgument and a
+  /// write after Close() with Invalid; otherwise the stub returns NotImplemented.
+  Status Write(ArrowArray* data) override;
+  /// \brief Record a single delete of row `pos` in the data file `file_path`.
+  /// Currently a stub: it validates its arguments and the closed state, then
+  /// returns NotImplemented.
+  Status WriteDelete(std::string_view file_path, int64_t pos);
+  /// \brief Stub: always returns NotImplemented.
+  Result Length() const override;
+  /// \brief Mark the writer closed. The first call returns NotImplemented;
+  /// later calls return a default-constructed Status.
+  Status Close() override;
+  /// \brief Returns Invalid before Close() has been called and NotImplemented after.
+  Result Metadata() override;
+
+ private:
+  friend class FileWriterFactory;
+  // Defined in position_delete_writer.cc; needs access to the private constructor.
+  friend std::unique_ptr MakePositionDeleteWriterInternal(
+      const PositionDeleteWriterOptions&);
+  class Impl;
+  std::unique_ptr impl_;
+  explicit PositionDeleteWriter(std::unique_ptr impl);
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/data/writer.h b/src/iceberg/data/writer.h index 6c8400911..371f123df 100644 --- a/src/iceberg/data/writer.h +++ b/src/iceberg/data/writer.h @@ -28,8 +28,8 @@ #include "iceberg/arrow_c_data.h" #include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" #include "iceberg/result.h" -#include "iceberg/type_fwd.h" namespace iceberg { @@ -64,10 +64,21 @@ class ICEBERG_EXPORT FileWriter { virtual Status Close() = 0; /// \brief File metadata for all files produced by the writer. + /// + /// \note The following features from Java are not yet supported: + /// - Encryption key metadata (EncryptionKeyMetadata) + /// - Split offsets for data files + /// - Referenced data files tracking for position deletes struct ICEBERG_EXPORT WriteResult { - /// Usually a writer produces a single data or delete file. - /// Position delete writer may produce multiple file-scoped delete files. - /// In the future, multiple files can be produced if file rolling is supported. + /// Data files produced by this writer. 
+ /// + /// \note When multiple files are produced: + /// - Position delete writers may produce multiple file-scoped delete files (one per + /// referenced data file) to avoid large manifests + /// - Future feature: File rolling will produce multiple files when size/record + /// thresholds + /// are exceeded + /// - Current implementation: Each writer produces exactly one file std::vector> data_files; }; diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc index df7ea9d89..8ba3ab8dc 100644 --- a/src/iceberg/test/data_writer_test.cc +++ b/src/iceberg/test/data_writer_test.cc @@ -17,6 +17,8 @@ * under the License. */ +#include "iceberg/data/data_writer.h" + #include #include @@ -24,10 +26,19 @@ #include #include "iceberg/arrow_c_data.h" +#include "iceberg/data/equality_delete_writer.h" +#include "iceberg/data/file_writer_factory.h" +#include "iceberg/data/position_delete_writer.h" #include "iceberg/data/writer.h" +#include "iceberg/file_format.h" +#include "iceberg/file_io.h" #include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" #include "iceberg/result.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" #include "iceberg/test/matchers.h" +#include "iceberg/transform.h" namespace iceberg { @@ -215,4 +226,706 @@ TEST(FileWriterTest, EmptyWriteResult) { ASSERT_TRUE(result.data_files.empty()); } +// Tests for stub implementations (methods return NotImplemented) +class WriterStubTest : public ::testing::Test { + protected: + void SetUp() override { + schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "data", string())}); + + ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); + io_ = nullptr; // Not needed for stub tests + } + + std::shared_ptr schema_; + std::shared_ptr spec_; + std::shared_ptr io_; +}; + +TEST_F(WriterStubTest, DataWriterMethodsReturnNotImplemented) { + FileWriterFactory factory(schema_, spec_, 
io_); + PartitionValues partition; + + auto writer_result = factory.NewDataWriter("/test/data/file.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + + auto& writer = *writer_result; + ArrowArray dummy_array = {}; + + // All methods should return NotImplemented + ASSERT_THAT(writer->Write(&dummy_array), + HasErrorMessage("DataWriter not yet implemented")); + ASSERT_THAT(writer->Length(), HasErrorMessage("DataWriter not yet implemented")); + ASSERT_THAT(writer->Close(), HasErrorMessage("DataWriter not yet implemented")); + ASSERT_THAT(writer->Metadata(), HasErrorMessage("DataWriter not yet implemented")); +} + +TEST_F(WriterStubTest, PositionDeleteWriterMethodsReturnNotImplemented) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewPositionDeleteWriter( + "/test/deletes/pos_deletes.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + + auto& writer = *writer_result; + ArrowArray dummy_array = {}; + + // All methods should return NotImplemented + ASSERT_THAT(writer->Write(&dummy_array), + HasErrorMessage("PositionDeleteWriter not yet implemented")); + ASSERT_THAT(writer->WriteDelete("/test/file.parquet", 0), + HasErrorMessage("PositionDeleteWriter not yet implemented")); + ASSERT_THAT(writer->Length(), + HasErrorMessage("PositionDeleteWriter not yet implemented")); + ASSERT_THAT(writer->Close(), + HasErrorMessage("PositionDeleteWriter not yet implemented")); + ASSERT_THAT(writer->Metadata(), + HasErrorMessage("PositionDeleteWriter not yet implemented")); +} + +TEST_F(WriterStubTest, EqualityDeleteWriterMethodsReturnNotImplemented) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + + PartitionValues partition; + auto writer_result = factory.NewEqualityDeleteWriter( + "/test/deletes/eq_deletes.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); 
+ + auto& writer = *writer_result; + ArrowArray dummy_array = {}; + + // All methods should return NotImplemented + ASSERT_THAT(writer->Write(&dummy_array), + HasErrorMessage("EqualityDeleteWriter not yet implemented")); + ASSERT_THAT(writer->Length(), + HasErrorMessage("EqualityDeleteWriter not yet implemented")); + ASSERT_THAT(writer->Close(), + HasErrorMessage("EqualityDeleteWriter not yet implemented")); + ASSERT_THAT(writer->Metadata(), + HasErrorMessage("EqualityDeleteWriter not yet implemented")); + + // equality_field_ids should return the configured value + ASSERT_EQ(writer->equality_field_ids(), std::vector({1, 2})); +} + +// Tests for FileWriterFactory +class FileWriterFactoryTest : public ::testing::Test { + protected: + void SetUp() override { + schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "data", string()), + SchemaField::MakeRequired(3, "category", string())}); + + ICEBERG_UNWRAP_OR_FAIL( + spec_, PartitionSpec::Make( + 0, {PartitionField(3, 1000, "category", Transform::Identity())})); + io_ = nullptr; // FileIO not needed for stub tests + } + + std::shared_ptr schema_; + std::shared_ptr spec_; + std::shared_ptr io_; +}; + +TEST_F(FileWriterFactoryTest, ConstructorWithNullProperties) { + FileWriterFactory factory(schema_, spec_, io_, nullptr); + // Should construct successfully without properties +} + +TEST_F(FileWriterFactoryTest, NewDataWriterCreatesWriter) { + FileWriterFactory factory(schema_, spec_, io_); + + PartitionValues partition; + auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, + partition); + + // Factory should successfully create a writer + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewDataWriterWithSortOrder) { + FileWriterFactory factory(schema_, spec_, io_); + + PartitionValues partition; + auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, + partition, + 
/*sort_order_id=*/1); + + // Factory should successfully create a writer + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewPositionDeleteWriterCreatesWriter) { + FileWriterFactory factory(schema_, spec_, io_); + + PartitionValues partition; + auto result = factory.NewPositionDeleteWriter("/test/deletes/pos_deletes.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewPositionDeleteWriterWithRowSchema) { + FileWriterFactory factory(schema_, spec_, io_); + + auto row_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + factory.SetPositionDeleteRowSchema(row_schema); + + PartitionValues partition; + auto result = factory.NewPositionDeleteWriter("/test/deletes/pos_deletes.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterCreatesWriter) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); // Must set config first + + PartitionValues partition; + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterWithConfig) { + FileWriterFactory factory(schema_, spec_, io_); + + auto eq_delete_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "data", string())}); + std::vector equality_field_ids = {1, 2}; + factory.SetEqualityDeleteConfig(eq_delete_schema, equality_field_ids); + + PartitionValues partition; + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterWithSortOrder) { + FileWriterFactory factory(schema_, spec_, io_); + 
factory.SetEqualityDeleteConfig(schema_, {1, 2}); // Must set config first + + PartitionValues partition; + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", + FileFormatType::kParquet, partition, + /*sort_order_id=*/1); + + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterUsesDefaultSchema) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(nullptr, {1, 2}); // Set field IDs but no custom schema + + // Don't set custom equality delete schema - should use default schema + PartitionValues partition; + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterUsesCustomSchema) { + FileWriterFactory factory(schema_, spec_, io_); + + auto custom_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + std::vector equality_field_ids = {1}; + factory.SetEqualityDeleteConfig(custom_schema, equality_field_ids); + + PartitionValues partition; + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, IsOk()); +} + +// Tests for input validation +class FileWriterFactoryInputValidationTest : public ::testing::Test { + protected: + void SetUp() override { + schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "data", string())}); + ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); + io_ = nullptr; + } + + std::shared_ptr schema_; + std::shared_ptr spec_; + std::shared_ptr io_; +}; + +TEST_F(FileWriterFactoryInputValidationTest, NewDataWriterRejectsEmptyPath) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto result = factory.NewDataWriter("", FileFormatType::kParquet, partition); + + 
ASSERT_THAT(result, HasErrorMessage("Path cannot be empty")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewDataWriterRejectsNullSchema) { + FileWriterFactory factory(nullptr, spec_, io_); + PartitionValues partition; + + auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, + partition); + + ASSERT_THAT(result, HasErrorMessage("Schema cannot be null")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewDataWriterRejectsNullSpec) { + FileWriterFactory factory(schema_, nullptr, io_); + PartitionValues partition; + + auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, + partition); + + ASSERT_THAT(result, HasErrorMessage("PartitionSpec cannot be null")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewPositionDeleteWriterRejectsEmptyPath) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto result = factory.NewPositionDeleteWriter("", FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("Path cannot be empty")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewPositionDeleteWriterRejectsNullSchema) { + FileWriterFactory factory(nullptr, spec_, io_); + PartitionValues partition; + + auto result = factory.NewPositionDeleteWriter("/test/deletes/pos.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("Schema cannot be null")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewPositionDeleteWriterRejectsNullSpec) { + FileWriterFactory factory(schema_, nullptr, io_); + PartitionValues partition; + + auto result = factory.NewPositionDeleteWriter("/test/deletes/pos.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("PartitionSpec cannot be null")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewEqualityDeleteWriterRejectsEmptyPath) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); 
+ PartitionValues partition; + + auto result = factory.NewEqualityDeleteWriter("", FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("Path cannot be empty")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewEqualityDeleteWriterRejectsNullSchema) { + FileWriterFactory factory(nullptr, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + PartitionValues partition; + + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("Schema cannot be null")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewEqualityDeleteWriterRejectsNullSpec) { + FileWriterFactory factory(schema_, nullptr, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + PartitionValues partition; + + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("PartitionSpec cannot be null")); +} + +TEST_F(FileWriterFactoryInputValidationTest, + NewEqualityDeleteWriterRejectsEmptyFieldIds) { + FileWriterFactory factory(schema_, spec_, io_); + // Don't set equality config, so field IDs will be empty + PartitionValues partition; + + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("Equality field IDs cannot be empty")); +} + +// Tests for state management in stub writers +class WriterStubStateManagementTest : public ::testing::Test { + protected: + void SetUp() override { + schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "data", string())}); + ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); + io_ = nullptr; + } + + std::shared_ptr schema_; + std::shared_ptr spec_; + std::shared_ptr io_; +}; + +TEST_F(WriterStubStateManagementTest, DataWriterRejectsWriteAfterClose) { + 
FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewDataWriter("/test/data/file.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + // Close the writer + ASSERT_THAT(writer->Close(), HasErrorMessage("DataWriter not yet implemented")); + + // Now try to write - should fail because writer is closed + ArrowArray dummy_array = {}; + auto status = writer->Write(&dummy_array); + ASSERT_THAT(status, HasErrorMessage("Writer is already closed")); +} + +TEST_F(WriterStubStateManagementTest, DataWriterCloseIsIdempotent) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewDataWriter("/test/data/file.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + // Close multiple times - second close should succeed (idempotent) + ASSERT_THAT(writer->Close(), HasErrorMessage("DataWriter not yet implemented")); + ASSERT_THAT(writer->Close(), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); +} + +TEST_F(WriterStubStateManagementTest, DataWriterRejectsNullData) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewDataWriter("/test/data/file.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + auto status = writer->Write(nullptr); + ASSERT_THAT(status, HasErrorMessage("Cannot write null data")); +} + +TEST_F(WriterStubStateManagementTest, PositionDeleteWriterRejectsWriteAfterClose) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewPositionDeleteWriter( + "/test/deletes/pos.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + // Close the writer + 
ASSERT_THAT(writer->Close(), + HasErrorMessage("PositionDeleteWriter not yet implemented")); + + // Try to write - should fail + ArrowArray dummy_array = {}; + auto status = writer->Write(&dummy_array); + ASSERT_THAT(status, HasErrorMessage("Writer is already closed")); +} + +TEST_F(WriterStubStateManagementTest, PositionDeleteWriterRejectsWriteDeleteAfterClose) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewPositionDeleteWriter( + "/test/deletes/pos.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + // Close the writer + ASSERT_THAT(writer->Close(), + HasErrorMessage("PositionDeleteWriter not yet implemented")); + + // Try WriteDelete - should fail + auto status = writer->WriteDelete("/test/file.parquet", 100); + ASSERT_THAT(status, HasErrorMessage("Writer is already closed")); +} + +TEST_F(WriterStubStateManagementTest, PositionDeleteWriterRejectsNullData) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewPositionDeleteWriter( + "/test/deletes/pos.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + auto status = writer->Write(nullptr); + ASSERT_THAT(status, HasErrorMessage("Cannot write null data")); +} + +TEST_F(WriterStubStateManagementTest, PositionDeleteWriterRejectsEmptyFilePath) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewPositionDeleteWriter( + "/test/deletes/pos.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + auto status = writer->WriteDelete("", 100); + ASSERT_THAT(status, HasErrorMessage("File path cannot be empty")); +} + +TEST_F(WriterStubStateManagementTest, EqualityDeleteWriterRejectsWriteAfterClose) { + FileWriterFactory 
factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + PartitionValues partition; + + auto writer_result = factory.NewEqualityDeleteWriter( + "/test/deletes/eq.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + // Close the writer + ASSERT_THAT(writer->Close(), + HasErrorMessage("EqualityDeleteWriter not yet implemented")); + + // Try to write - should fail + ArrowArray dummy_array = {}; + auto status = writer->Write(&dummy_array); + ASSERT_THAT(status, HasErrorMessage("Writer is already closed")); +} + +TEST_F(WriterStubStateManagementTest, EqualityDeleteWriterRejectsNullData) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + PartitionValues partition; + + auto writer_result = factory.NewEqualityDeleteWriter( + "/test/deletes/eq.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + auto status = writer->Write(nullptr); + ASSERT_THAT(status, HasErrorMessage("Cannot write null data")); +} + +// Tests for different file formats +class FileWriterFactoryFileFormatTest : public ::testing::Test { + protected: + void SetUp() override { + schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); + io_ = nullptr; + } + + std::shared_ptr schema_; + std::shared_ptr spec_; + std::shared_ptr io_; +}; + +TEST_F(FileWriterFactoryFileFormatTest, DataWriterSupportsParquet) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, + partition); + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryFileFormatTest, DataWriterSupportsAvro) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto result = + 
factory.NewDataWriter("/test/data/file.avro", FileFormatType::kAvro, partition); + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryFileFormatTest, PositionDeleteWriterSupportsParquet) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto result = factory.NewPositionDeleteWriter("/test/deletes/pos.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryFileFormatTest, PositionDeleteWriterSupportsAvro) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto result = factory.NewPositionDeleteWriter("/test/deletes/pos.avro", + FileFormatType::kAvro, partition); + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryFileFormatTest, EqualityDeleteWriterSupportsParquet) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1}); + PartitionValues partition; + + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryFileFormatTest, EqualityDeleteWriterSupportsAvro) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1}); + PartitionValues partition; + + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.avro", + FileFormatType::kAvro, partition); + ASSERT_THAT(result, IsOk()); +} + +// Edge case tests +class WriterEdgeCaseTest : public ::testing::Test { + protected: + void SetUp() override { + schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "data", string())}); + ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); + io_ = nullptr; + } + + std::shared_ptr schema_; + std::shared_ptr spec_; + std::shared_ptr io_; +}; + +TEST_F(WriterEdgeCaseTest, EqualityDeleteWriterWithSingleFieldId) { + FileWriterFactory factory(schema_, spec_, io_); + 
factory.SetEqualityDeleteConfig(schema_, {1}); // Single field ID + + PartitionValues partition; + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(result, IsOk()); + + auto& writer = *result; + ASSERT_EQ(writer->equality_field_ids(), std::vector({1})); +} + +TEST_F(WriterEdgeCaseTest, EqualityDeleteWriterWithMultipleFieldIds) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); // Multiple field IDs + + PartitionValues partition; + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(result, IsOk()); + + auto& writer = *result; + ASSERT_EQ(writer->equality_field_ids(), std::vector({1, 2})); +} + +TEST_F(WriterEdgeCaseTest, MultipleWritersFromSameFactory) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + + PartitionValues partition; + + // Create multiple writers from the same factory + auto data_writer1 = + factory.NewDataWriter("/test/data1.parquet", FileFormatType::kParquet, partition); + auto data_writer2 = + factory.NewDataWriter("/test/data2.parquet", FileFormatType::kParquet, partition); + auto pos_writer = factory.NewPositionDeleteWriter("/test/pos.parquet", + FileFormatType::kParquet, partition); + auto eq_writer = factory.NewEqualityDeleteWriter("/test/eq.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(data_writer1, IsOk()); + ASSERT_THAT(data_writer2, IsOk()); + ASSERT_THAT(pos_writer, IsOk()); + ASSERT_THAT(eq_writer, IsOk()); +} + +TEST_F(WriterEdgeCaseTest, FactoryWithPartitionedSpec) { + auto partitioned_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "category", string()), + SchemaField::MakeRequired(3, "data", string())}); + + auto partitioned_spec_result = PartitionSpec::Make( + 0, {PartitionField(2, 1000, 
"category", Transform::Identity())}); + ASSERT_THAT(partitioned_spec_result, IsOk()); + auto partitioned_spec = + std::shared_ptr(std::move(*partitioned_spec_result)); + + FileWriterFactory factory(partitioned_schema, partitioned_spec, io_); + + PartitionValues partition; + partition.AddValue(Literal::String("electronics")); + + auto result = factory.NewDataWriter("/test/data/category=electronics/file.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(result, IsOk()); +} + +TEST_F(WriterEdgeCaseTest, ReconfigureEqualityDeleteConfig) { + FileWriterFactory factory(schema_, spec_, io_); + + // Set initial config + factory.SetEqualityDeleteConfig(schema_, {1}); + + PartitionValues partition; + auto writer1 = factory.NewEqualityDeleteWriter("/test/eq1.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer1, IsOk()); + ASSERT_EQ((*writer1)->equality_field_ids(), std::vector({1})); + + // Reconfigure with different field IDs + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + + auto writer2 = factory.NewEqualityDeleteWriter("/test/eq2.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer2, IsOk()); + ASSERT_EQ((*writer2)->equality_field_ids(), std::vector({1, 2})); +} + +TEST_F(WriterEdgeCaseTest, ReconfigurePositionDeleteRowSchema) { + FileWriterFactory factory(schema_, spec_, io_); + + auto row_schema1 = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + factory.SetPositionDeleteRowSchema(row_schema1); + + PartitionValues partition; + auto writer1 = factory.NewPositionDeleteWriter("/test/pos1.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer1, IsOk()); + + // Reconfigure with different row schema + auto row_schema2 = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "data", string())}); + factory.SetPositionDeleteRowSchema(row_schema2); + + auto writer2 = factory.NewPositionDeleteWriter("/test/pos2.parquet", + 
FileFormatType::kParquet, partition); + ASSERT_THAT(writer2, IsOk()); +} + } // namespace iceberg