From 5e28bbb538cffc8cd0bcf012886ab924c18fa438 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 30 Dec 2025 22:07:27 +0800 Subject: [PATCH] feat: add snapshot update --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/snapshot.cc | 134 +++++++ src/iceberg/snapshot.h | 85 ++++ src/iceberg/table.h | 20 +- src/iceberg/table_metadata.cc | 173 ++++++++- src/iceberg/table_metadata.h | 2 + src/iceberg/table_properties.h | 3 + src/iceberg/table_update.cc | 4 +- src/iceberg/transaction.cc | 27 ++ src/iceberg/transaction.h | 4 +- src/iceberg/type_fwd.h | 1 + src/iceberg/update/meson.build | 1 + src/iceberg/update/pending_update.h | 1 + src/iceberg/update/snapshot_update.cc | 452 ++++++++++++++++++++++ src/iceberg/update/snapshot_update.h | 221 +++++++++++ src/iceberg/util/snapshot_util.cc | 14 + src/iceberg/util/snapshot_util_internal.h | 9 + src/iceberg/util/uuid.cc | 12 + src/iceberg/util/uuid.h | 3 + 19 files changed, 1151 insertions(+), 16 deletions(-) create mode 100644 src/iceberg/update/snapshot_update.cc create mode 100644 src/iceberg/update/snapshot_update.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index a6b836c4c..a2c5c58a3 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -81,6 +81,7 @@ set(ICEBERG_SOURCES transform_function.cc type.cc update/pending_update.cc + update/snapshot_update.cc update/update_partition_spec.cc update/update_properties.cc update/update_schema.cc diff --git a/src/iceberg/snapshot.cc b/src/iceberg/snapshot.cc index f421e8381..e71f2f7c6 100644 --- a/src/iceberg/snapshot.cc +++ b/src/iceberg/snapshot.cc @@ -19,6 +19,8 @@ #include "iceberg/snapshot.h" +#include + #include "iceberg/file_io.h" #include "iceberg/manifest/manifest_list.h" #include "iceberg/manifest/manifest_reader.h" @@ -75,6 +77,39 @@ std::optional Snapshot::operation() const { return std::nullopt; } +std::optional Snapshot::FirstRowId() const { + auto it = summary.find("first-row-id"); + if (it == summary.end()) { + return std::nullopt; + } + + int64_t first_row_id = 0; + const std::string& first_row_id_str = it->second; + auto [_, ec] = + std::from_chars(first_row_id_str.data(), + first_row_id_str.data() + first_row_id_str.size(), first_row_id); + if (ec != std::errc()) { + return std::nullopt; + } + return first_row_id; +} + +std::optional Snapshot::AddedRows() const { + auto it = summary.find("added-rows"); + if (it == summary.end()) { + return std::nullopt; + } + + int64_t added_rows = 0; + const std::string& added_rows_str = it->second; + auto [_, ec] = std::from_chars( + added_rows_str.data(), added_rows_str.data() + added_rows_str.size(), added_rows); + if (ec != std::errc()) { + return std::nullopt; + } + return added_rows; +} + bool Snapshot::Equals(const Snapshot& other) const { if (this == &other) { return true; @@ -141,4 +176,103 @@ Result> CachedSnapshot::DeleteManifests( return std::span(cache.first.data() + delete_start, delete_count); } +// SnapshotRef::Builder implementation + +SnapshotRef::Builder::Builder(SnapshotRefType type, int64_t snapshot_id) + : type_(type), snapshot_id_(snapshot_id) {} + +SnapshotRef::Builder SnapshotRef::Builder::TagBuilder(int64_t snapshot_id) { + return Builder(SnapshotRefType::kTag, snapshot_id); +} + +SnapshotRef::Builder SnapshotRef::Builder::BranchBuilder(int64_t snapshot_id) { + return Builder(SnapshotRefType::kBranch, snapshot_id); +} + +SnapshotRef::Builder SnapshotRef::Builder::BuilderFor(int64_t snapshot_id, + SnapshotRefType type) { + return Builder(type, snapshot_id); +} + +SnapshotRef::Builder SnapshotRef::Builder::BuilderFrom(const SnapshotRef& ref) { + Builder builder(ref.type(), ref.snapshot_id); + if (ref.type() == SnapshotRefType::kBranch) { + const auto& branch = std::get(ref.retention); + builder.min_snapshots_to_keep_ = branch.min_snapshots_to_keep; + builder.max_snapshot_age_ms_ = branch.max_snapshot_age_ms; + builder.max_ref_age_ms_ = branch.max_ref_age_ms; + } else { + const auto& tag = std::get(ref.retention); + builder.max_ref_age_ms_ = tag.max_ref_age_ms; + } + return builder; +} + +SnapshotRef::Builder SnapshotRef::Builder::BuilderFrom(const SnapshotRef& ref, + int64_t snapshot_id) { + Builder builder(ref.type(), snapshot_id); + if (ref.type() == SnapshotRefType::kBranch) { + const auto& branch = std::get(ref.retention); + builder.min_snapshots_to_keep_ = branch.min_snapshots_to_keep; + builder.max_snapshot_age_ms_ = branch.max_snapshot_age_ms; + builder.max_ref_age_ms_ = branch.max_ref_age_ms; + } else { + const auto& tag = std::get(ref.retention); + builder.max_ref_age_ms_ = tag.max_ref_age_ms; + } + return builder; +} + +SnapshotRef::Builder& SnapshotRef::Builder::MinSnapshotsToKeep( + std::optional value) { + if (type_ == SnapshotRefType::kTag && value.has_value()) { + return AddError(ErrorKind::kInvalidArgument, + "Tags do not support setting minSnapshotsToKeep"); + } + if (value.has_value() && value.value() <= 0) { + return AddError(ErrorKind::kInvalidArgument, + "Min snapshots to keep must be greater than 0"); + } + min_snapshots_to_keep_ = value; + return *this; +} + +SnapshotRef::Builder& SnapshotRef::Builder::MaxSnapshotAgeMs( + std::optional value) { + if (type_ == SnapshotRefType::kTag && value.has_value()) { + return AddError(ErrorKind::kInvalidArgument, + "Tags do not support setting maxSnapshotAgeMs"); + } + if (value.has_value() && value.value() <= 0) { + return AddError(ErrorKind::kInvalidArgument, + "Max snapshot age must be greater than 0 ms"); + } + max_snapshot_age_ms_ = value; + return *this; +} + +SnapshotRef::Builder& SnapshotRef::Builder::MaxRefAgeMs(std::optional value) { + if (value.has_value() && value.value() <= 0) { + return AddError(ErrorKind::kInvalidArgument, + "Max reference age must be greater than 0"); + } + max_ref_age_ms_ = value; + return *this; +} + +Result SnapshotRef::Builder::Build() const { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + if (type_ == SnapshotRefType::kBranch) { + return SnapshotRef{ + .snapshot_id = snapshot_id_, + .retention = SnapshotRef::Branch{.min_snapshots_to_keep = min_snapshots_to_keep_, + .max_snapshot_age_ms = max_snapshot_age_ms_, + .max_ref_age_ms = max_ref_age_ms_}}; + } else { + return SnapshotRef{.snapshot_id = snapshot_id_, + .retention = SnapshotRef::Tag{.max_ref_age_ms = max_ref_age_ms_}}; + } +} + } // namespace iceberg diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h index 3889607f9..206f99bc0 100644 --- a/src/iceberg/snapshot.h +++ b/src/iceberg/snapshot.h @@ -32,6 +32,7 @@ #include "iceberg/manifest/manifest_list.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" +#include "iceberg/util/error_collector.h" #include "iceberg/util/lazy.h" #include "iceberg/util/timepoint.h" @@ -119,6 +120,67 @@ struct ICEBERG_EXPORT SnapshotRef { return lhs.Equals(rhs); } + /// \brief Builder class for constructing SnapshotRef objects + class ICEBERG_EXPORT Builder : public ErrorCollector { + public: + /// \brief Create a builder for a tag reference + /// \param snapshot_id The snapshot ID for the tag + /// \return A new Builder instance for a tag + static Builder TagBuilder(int64_t snapshot_id); + + /// \brief Create a builder for a branch reference + /// \param snapshot_id The snapshot ID for the branch + /// \return A new Builder instance for a branch + static Builder BranchBuilder(int64_t snapshot_id); + + /// \brief Create a builder from an existing SnapshotRef + /// \param ref The existing reference to copy properties from + /// \return A new Builder instance with properties from the existing ref + static Builder BuilderFrom(const SnapshotRef& ref); + + /// \brief Create a builder from an existing SnapshotRef with a new snapshot ID + /// \param ref The existing reference to copy properties from + /// \param snapshot_id The new snapshot ID to use + /// \return A new Builder instance with properties from the existing ref but new + /// snapshot ID + static Builder BuilderFrom(const SnapshotRef& ref, int64_t snapshot_id); + + /// \brief Create a builder for a specific type + /// \param snapshot_id The snapshot ID + /// \param type The type of reference (branch or tag) + /// \return A new Builder instance + static Builder BuilderFor(int64_t snapshot_id, SnapshotRefType type); + + /// \brief Set the minimum number of snapshots to keep (branch only) + /// \param value The minimum number of snapshots to keep, or nullopt for default + /// \return Reference to this builder for method chaining + Builder& MinSnapshotsToKeep(std::optional value); + + /// \brief Set the maximum snapshot age in milliseconds (branch only) + /// \param value The maximum snapshot age in milliseconds, or nullopt for default + /// \return Reference to this builder for method chaining + Builder& MaxSnapshotAgeMs(std::optional value); + + /// \brief Set the maximum reference age in milliseconds + /// \param value The maximum reference age in milliseconds, or nullopt for default + /// \return Reference to this builder for method chaining + Builder& MaxRefAgeMs(std::optional value); + + /// \brief Build the SnapshotRef + /// \return A Result containing the SnapshotRef instance, or an error if validation + /// failed + Result Build() const; + + private: + explicit Builder(SnapshotRefType type, int64_t snapshot_id); + + SnapshotRefType type_; + int64_t snapshot_id_; + std::optional min_snapshots_to_keep_; + std::optional max_snapshot_age_ms_; + std::optional max_ref_age_ms_; + }; + private: /// \brief Compare two snapshot refs for equality. bool Equals(const SnapshotRef& other) const; @@ -253,6 +315,29 @@ struct ICEBERG_EXPORT Snapshot { /// unknown. std::optional operation() const; + /// \brief The row-id of the first newly added row in this snapshot. + /// + /// All rows added in this snapshot will have a row-id assigned to them greater than + /// this value. All rows with a row-id less than this value were created in a snapshot + /// that was added to the table (but not necessarily committed to this branch) in the + /// past. + /// + /// \return the first row-id to be used in this snapshot or nullopt when row lineage + /// is not supported + std::optional FirstRowId() const; + + /// \brief The upper bound of number of rows with assigned row IDs in this snapshot. + /// + /// It can be used safely to increment the table's `next-row-id` during a commit. It + /// can be more than the number of rows added in this snapshot and include some + /// existing rows. + /// + /// This field is optional but is required when the table version supports row lineage. + /// + /// \return the upper bound of number of rows with assigned row IDs in this snapshot + /// or nullopt if the value was not stored. + std::optional AddedRows() const; + /// \brief Compare two snapshots for equality. friend bool operator==(const Snapshot& lhs, const Snapshot& rhs) { return lhs.Equals(rhs); diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 311395856..77fe763ff 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -50,7 +50,7 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { virtual ~Table(); - /// \brief Return the identifier of this table + /// \brief Returns the identifier of this table const TableIdentifier& name() const { return identifier_; } /// \brief Returns the UUID of the table @@ -59,40 +59,40 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ /// \brief Return the schema for this table, return NotFoundError if not found Result> schema() const; - /// \brief Return a map of schema for this table + /// \brief Returns a map of schema for this table Result< std::reference_wrapper>>> schemas() const; - /// \brief Return the partition spec for this table, return NotFoundError if not found + /// \brief Returns the partition spec for this table, return NotFoundError if not found Result> spec() const; - /// \brief Return a map of partition specs for this table + /// \brief Returns a map of partition specs for this table Result>>> specs() const; - /// \brief Return the sort order for this table, return NotFoundError if not found + /// \brief Returns the sort order for this table, return NotFoundError if not found Result> sort_order() const; - /// \brief Return a map of sort order IDs to sort orders for this table + /// \brief Returns a map of sort order IDs to sort orders for this table Result>>> sort_orders() const; - /// \brief Return a map of string properties for this table + /// \brief Returns the properties of this table const TableProperties& properties() const; - /// \brief Return the table's metadata file location + /// \brief Returns the table's metadata file location std::string_view metadata_file_location() const; - /// \brief Return the table's base location + /// \brief Returns the table's base location std::string_view location() const; /// \brief Returns the time when this table was last updated TimePointMs last_updated_ms() const; - /// \brief Return the table's current snapshot, return NotFoundError if not found + /// \brief Returns the table's current snapshot, return NotFoundError if not found Result> current_snapshot() const; /// \brief Get the snapshot of this table with the given id diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 851048b30..9672a37c2 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -277,6 +277,10 @@ Result> TableMetadata::SnapshotById(int64_t snapshot_i return *iter; } +int64_t TableMetadata::NextSequenceNumber() const { + return format_version > 1 ? last_sequence_number + 1 : kInitialSequenceNumber; +} + namespace { template @@ -555,6 +559,10 @@ class TableMetadataBuilder::Impl { sort_orders_by_id_.emplace(order->order_id(), order); } + for (const auto& snapshot : metadata_.snapshots) { + snapshots_by_id_.emplace(snapshot->snapshot_id, snapshot); + } + metadata_.last_updated_ms = kInvalidLastUpdatedMs; } @@ -591,6 +599,9 @@ class TableMetadataBuilder::Impl { Status RemoveSchemas(const std::unordered_set& schema_ids); Result AddSchema(const Schema& schema, int32_t new_last_column_id); void SetLocation(std::string_view location); + Status AddSnapshot(std::shared_ptr snapshot); + Status SetBranchSnapshot(int64_t snapshot_id, const std::string& branch); + Status SetRef(const std::string& name, std::shared_ptr ref); Result> Build(); @@ -634,6 +645,7 @@ class TableMetadataBuilder::Impl { std::unordered_map> schemas_by_id_; std::unordered_map> specs_by_id_; std::unordered_map> sort_orders_by_id_; + std::unordered_map> snapshots_by_id_; }; Status TableMetadataBuilder::Impl::AssignUUID(std::string_view uuid) { @@ -982,6 +994,158 @@ void TableMetadataBuilder::Impl::SetLocation(std::string_view location) { changes_.push_back(std::make_unique(std::string(location))); } +Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr snapshot) { + if (snapshot == nullptr) { + // change is a noop + return {}; + } + ICEBERG_CHECK(!metadata_.schemas.empty(), + "Attempting to add a snapshot before a schema is added"); + ICEBERG_CHECK(!metadata_.partition_specs.empty(), + "Attempting to add a snapshot before a partition spec is added"); + ICEBERG_CHECK(!metadata_.sort_orders.empty(), + "Attempting to add a snapshot before a sort order is added"); + + ICEBERG_CHECK(!snapshots_by_id_.contains(snapshot->snapshot_id), + "Snapshot already exists for id: {}", snapshot->snapshot_id); + + ICEBERG_CHECK( + metadata_.format_version == 1 || + snapshot->sequence_number > metadata_.last_sequence_number || + !snapshot->parent_snapshot_id.has_value(), + "Cannot add snapshot with sequence number {} older than last sequence number {}", + snapshot->sequence_number, metadata_.last_sequence_number); + + metadata_.last_updated_ms = snapshot->timestamp_ms; + metadata_.last_sequence_number = snapshot->sequence_number; + + metadata_.snapshots.push_back(snapshot); + snapshots_by_id_.emplace(snapshot->snapshot_id, snapshot); + + changes_.push_back(std::make_unique(snapshot)); + + // Handle row lineage for format version >= 3 + if (metadata_.format_version >= TableMetadata::kMinFormatVersionRowLineage) { + auto first_row_id = snapshot->FirstRowId(); + ICEBERG_CHECK(first_row_id.has_value(), + "Cannot add a snapshot: first-row-id is null"); + ICEBERG_CHECK( + first_row_id.value() >= metadata_.next_row_id, + "Cannot add a snapshot, first-row-id is behind table next-row-id: {} < {}", + first_row_id.value(), metadata_.next_row_id); + + metadata_.next_row_id += snapshot->AddedRows().value_or(0); + } + + return {}; +} + +Status TableMetadataBuilder::Impl::SetBranchSnapshot(int64_t snapshot_id, + const std::string& branch) { + // Check if ref already exists with the same snapshot ID + auto ref_it = metadata_.refs.find(branch); + if (ref_it != metadata_.refs.end() && ref_it->second->snapshot_id == snapshot_id) { + return {}; + } + + auto snapshot_it = snapshots_by_id_.find(snapshot_id); + ICEBERG_PRECHECK(snapshot_it != snapshots_by_id_.end(), + "Cannot set {} to unknown snapshot: {}", branch, snapshot_id); + const auto& snapshot = snapshot_it->second; + + // If ref exists, validate it's a branch and check if snapshot ID matches + if (ref_it != metadata_.refs.end()) { + const auto& ref = ref_it->second; + ICEBERG_CHECK(ref->type() == SnapshotRefType::kBranch, + "Cannot update branch: {} is a tag", branch); + if (ref->snapshot_id == snapshot_id) { + return {}; + } + } + + ICEBERG_CHECK( + metadata_.format_version == 1 || + snapshot->sequence_number <= metadata_.last_sequence_number, + "Last sequence number {} is less than existing snapshot sequence number {}", + metadata_.last_sequence_number, snapshot->sequence_number); + + // Create new ref: either from existing ref or create new branch ref + std::shared_ptr new_ref; + if (ref_it != metadata_.refs.end()) { + ICEBERG_ASSIGN_OR_RAISE( + auto ref_result, + SnapshotRef::Builder::BuilderFrom(*ref_it->second, snapshot_id).Build()); + new_ref = std::make_shared(std::move(ref_result)); + } else { + ICEBERG_ASSIGN_OR_RAISE(auto ref_result, + SnapshotRef::Builder::BranchBuilder(snapshot_id).Build()); + new_ref = std::make_shared(std::move(ref_result)); + } + + return SetRef(branch, std::move(new_ref)); +} + +Status TableMetadataBuilder::Impl::SetRef(const std::string& name, + std::shared_ptr ref) { + auto existing_ref_it = metadata_.refs.find(name); + if (existing_ref_it != metadata_.refs.end() && *existing_ref_it->second == *ref) { + return {}; + } + + int64_t snapshot_id = ref->snapshot_id; + auto snapshot_it = snapshots_by_id_.find(snapshot_id); + ICEBERG_CHECK(snapshot_it != snapshots_by_id_.end(), + "Cannot set {} to unknown snapshot: {}", name, snapshot_id); + const auto& snapshot = snapshot_it->second; + + // If snapshot was added in this set of changes, update last_updated_ms + if (std::ranges::any_of(changes_, [snapshot_id](const auto& change) { + if (change->kind() != TableUpdate::Kind::kAddSnapshot) { + return false; + } + const auto* add_snapshot = + internal::checked_cast(change.get()); + return add_snapshot->snapshot()->snapshot_id == snapshot_id; + })) { + metadata_.last_updated_ms = snapshot->timestamp_ms; + } + + // If it's MAIN_BRANCH, update currentSnapshotId and add to snapshotLog + if (name == SnapshotRef::kMainBranch) { + metadata_.current_snapshot_id = ref->snapshot_id; + if (metadata_.last_updated_ms == kInvalidLastUpdatedMs) { + metadata_.last_updated_ms = + TimePointMs{std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch())}; + } + metadata_.snapshot_log.emplace_back(metadata_.last_updated_ms, ref->snapshot_id); + } + + // Set the ref + metadata_.refs[name] = ref; + + // Extract properties from SnapshotRef for the change + std::optional min_snapshots_to_keep = std::nullopt; + std::optional max_snapshot_age_ms = std::nullopt; + std::optional max_ref_age_ms = std::nullopt; + + if (ref->type() == SnapshotRefType::kBranch) { + const auto& branch = std::get(ref->retention); + min_snapshots_to_keep = branch.min_snapshots_to_keep; + max_snapshot_age_ms = branch.max_snapshot_age_ms; + max_ref_age_ms = branch.max_ref_age_ms; + } else { + const auto& tag = std::get(ref->retention); + max_ref_age_ms = tag.max_ref_age_ms; + } + + changes_.push_back(std::make_unique( + name, ref->snapshot_id, ref->type(), min_snapshots_to_keep, max_snapshot_age_ms, + max_ref_age_ms)); + + return {}; +} + Result> TableMetadataBuilder::Impl::Build() { // 1. Validate metadata consistency through TableMetadata#Validate @@ -1207,17 +1371,20 @@ TableMetadataBuilder& TableMetadataBuilder::AddSortOrder( TableMetadataBuilder& TableMetadataBuilder::AddSnapshot( std::shared_ptr snapshot) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSnapshot(snapshot)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetBranchSnapshot(int64_t snapshot_id, const std::string& branch) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetBranchSnapshot(snapshot_id, branch)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetRef(const std::string& name, std::shared_ptr ref) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetRef(name, std::move(ref))); + return *this; } TableMetadataBuilder& TableMetadataBuilder::RemoveRef(const std::string& name) { diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index 3e3eb9c70..f402b680c 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -145,6 +145,8 @@ struct ICEBERG_EXPORT TableMetadata { Result> Snapshot() const; /// \brief Get the snapshot of this table with the given id Result> SnapshotById(int64_t snapshot_id) const; + /// \brief Get the next sequence number for + int64_t NextSequenceNumber() const; ICEBERG_EXPORT friend bool operator==(const TableMetadata& lhs, const TableMetadata& rhs); diff --git a/src/iceberg/table_properties.h b/src/iceberg/table_properties.h index feb4a2001..e9599bcdd 100644 --- a/src/iceberg/table_properties.h +++ b/src/iceberg/table_properties.h @@ -244,6 +244,9 @@ class ICEBERG_EXPORT TableProperties : public ConfigBase { inline static Entry kDeleteTargetFileSizeBytes{ "write.delete.target-file-size-bytes", int64_t{64} * 1024 * 1024}; // 64 MB + inline static Entry kSnapshotIdInheritanceEnabled{ + "compatibility.snapshot-id-inheritance.enabled", false}; + // Garbage collection properties inline static Entry kGcEnabled{"gc.enabled", true}; diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc index 91578977c..5697fcade 100644 --- a/src/iceberg/table_update.cc +++ b/src/iceberg/table_update.cc @@ -134,7 +134,7 @@ void SetDefaultSortOrder::GenerateRequirements(TableUpdateContext& context) cons // AddSnapshot void AddSnapshot::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.AddSnapshot(snapshot_); } void AddSnapshot::GenerateRequirements(TableUpdateContext& context) const { @@ -162,7 +162,7 @@ void RemoveSnapshotRef::GenerateRequirements(TableUpdateContext& context) const // SetSnapshotRef void SetSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.SetBranchSnapshot(snapshot_id_, ref_name_); } void SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const { diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 6641a1afd..511ba622e 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -29,6 +29,7 @@ #include "iceberg/table_requirements.h" #include "iceberg/table_update.h" #include "iceberg/update/pending_update.h" +#include "iceberg/update/snapshot_update.h" #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" #include "iceberg/update/update_schema.h" @@ -113,6 +114,23 @@ Status Transaction::Apply(PendingUpdate& update) { metadata_builder_->SetCurrentSchema(std::move(result.schema), result.new_last_column_id); } break; + case PendingUpdate::Kind::kUpdateSnapshot: { + auto& update_snapshot = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto result, update_snapshot.Apply()); + if (metadata_builder_->current() + .SnapshotById(result.snapshot->snapshot_id) + .has_value()) { + metadata_builder_->SetBranchSnapshot(result.snapshot->snapshot_id, + result.target_branch); + } else if (result.stage_only) { + metadata_builder_->AddSnapshot(result.snapshot); + } else { + // Normal commit - add snapshot first, then set as branch snapshot + metadata_builder_->AddSnapshot(result.snapshot); + metadata_builder_->SetBranchSnapshot(result.snapshot->snapshot_id, + result.target_branch); + } + } break; default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); @@ -158,6 +176,15 @@ Result> Transaction::Commit() { ICEBERG_ASSIGN_OR_RAISE(auto updated_table, table_->catalog()->UpdateTable( table_->name(), requirements, updates)); + for (const auto& update : pending_updates_) { + if (auto update_ptr = update.lock()) { + if (auto snapshot_update = + internal::checked_cast(update_ptr.get())) { + std::ignore = snapshot_update->Finalize(); + } + } + } + // Mark as committed and update table reference committed_ = true; table_ = std::move(updated_table); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index ea918a173..3008d105f 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -81,7 +81,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this +#include +#include + +#include "iceberg/file_io.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/manifest/rolling_manifest_writer.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/uuid.h" + +namespace iceberg { + +SnapshotUpdate::SnapshotUpdate(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) { + target_manifest_size_bytes_ = + transaction_->current().properties.Get(TableProperties::kManifestTargetSizeBytes); + + // For format version 1, check if snapshot ID inheritance is enabled + if (transaction_->current().format_version == 1) { + can_inherit_snapshot_id_ = transaction_->current().properties.Get( + TableProperties::kSnapshotIdInheritanceEnabled); + } + + // Generate commit UUID + commit_uuid_ = Uuid::GenerateV7().ToString(); + + // Initialize delete function if not set + if (!delete_func_) { + delete_func_ = [this](const std::string& path) { + return transaction_->table()->io()->DeleteFile(path); + }; + } +} + +Result> SnapshotUpdate::WriteDataManifests( + const std::vector& data_files, const std::shared_ptr& spec) { + if (data_files.empty()) { + return std::vector{}; + } + + ICEBERG_ASSIGN_OR_RAISE(auto current_schema, transaction_->current().Schema()); + + int8_t format_version = transaction_->current().format_version; + std::optional snapshot_id = + snapshot_id_ ? std::make_optional(*snapshot_id_) : std::nullopt; + + // Create factory function for rolling manifest writer + RollingManifestWriter::ManifestWriterFactory factory = + [this, spec, current_schema, format_version, + snapshot_id]() -> Result> { + std::string manifest_path = ManifestPath(); + + if (format_version == 1) { + return ManifestWriter::MakeV1Writer( + snapshot_id, manifest_path, transaction_->table()->io(), spec, current_schema); + } else if (format_version == 2) { + return ManifestWriter::MakeV2Writer(snapshot_id, manifest_path, + transaction_->table()->io(), spec, + current_schema, ManifestContent::kData); + } else { // format_version == 3 + std::optional first_row_id = + transaction_->table()->metadata()->next_row_id; + return ManifestWriter::MakeV3Writer(snapshot_id, first_row_id, manifest_path, + transaction_->table()->io(), spec, + current_schema, ManifestContent::kData); + } + }; + + // Create rolling manifest writer + RollingManifestWriter rolling_writer(factory, target_manifest_size_bytes_); + + // Write all files + for (const auto& file : data_files) { + ICEBERG_RETURN_UNEXPECTED( + rolling_writer.WriteAddedEntry(std::make_shared(file))); + } + + // Close the rolling writer + ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); + + // Get all manifest files + ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, rolling_writer.ToManifestFiles()); + + return manifest_files; +} + +Result> SnapshotUpdate::WriteDeleteManifests( + const std::vector& delete_files, + const std::shared_ptr& spec) { + if (delete_files.empty()) { + return std::vector{}; + } + + int8_t format_version = transaction_->current().format_version; + if (format_version < 2) { + // Delete manifests are only supported in format version 2+ + return std::vector{}; + } + + ICEBERG_ASSIGN_OR_RAISE(auto current_schema, transaction_->current().Schema()); + + std::optional snapshot_id = + snapshot_id_ ? std::make_optional(*snapshot_id_) : std::nullopt; + + // Create factory function for rolling manifest writer + RollingManifestWriter::ManifestWriterFactory factory = + [this, spec, current_schema, format_version, + snapshot_id]() -> Result> { + std::string manifest_path = ManifestPath(); + + if (format_version == 2) { + return ManifestWriter::MakeV2Writer(snapshot_id, manifest_path, + transaction_->table()->io(), spec, + current_schema, ManifestContent::kDeletes); + } else { // format_version == 3 + std::optional first_row_id = + transaction_->table()->metadata()->next_row_id; + return ManifestWriter::MakeV3Writer(snapshot_id, first_row_id, manifest_path, + transaction_->table()->io(), spec, + current_schema, ManifestContent::kDeletes); + } + }; + + // Create rolling manifest writer + RollingManifestWriter rolling_writer(factory, target_manifest_size_bytes_); + + // Write all delete files + for (const auto& file : delete_files) { + ICEBERG_RETURN_UNEXPECTED( + rolling_writer.WriteAddedEntry(std::make_shared(file))); + } + + // Close the rolling writer + ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); + + // Get all manifest files + ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, rolling_writer.ToManifestFiles()); + + return manifest_files; +} + +int64_t SnapshotUpdate::SnapshotId() { + if (snapshot_id_.has_value()) { + return *snapshot_id_; + } + snapshot_id_ = SnapshotUtil::GenerateSnapshotId(transaction_->current()); + return *snapshot_id_; +} + +Result SnapshotUpdate::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + // Get the latest snapshot for the target branch + std::shared_ptr parent_snapshot; + std::optional parent_snapshot_id; + auto parent_snapshot_result = + SnapshotUtil::LatestSnapshot(transaction_->current(), target_branch_); + if (!parent_snapshot_result.has_value()) [[unlikely]] { + if (parent_snapshot_result.error().kind == ErrorKind::kNotFound) { + parent_snapshot_id = std::nullopt; + } + return std::unexpected(parent_snapshot_result.error()); + } else { + parent_snapshot = *parent_snapshot_result; + parent_snapshot_id = parent_snapshot->snapshot_id; + } + int64_t sequence_number = transaction_->current().NextSequenceNumber(); + + ICEBERG_RETURN_UNEXPECTED(Validate(transaction_->current(), parent_snapshot)); + + std::vector manifests = Apply(transaction_->current(), parent_snapshot); + + std::string manifest_list_path = ManifestListPath(); + manifest_lists_.push_back(manifest_list_path); + + // Create manifest list writer based on format version + int8_t format_version = transaction_->current().format_version; + int64_t snapshot_id = SnapshotId(); + std::unique_ptr writer; + + if (format_version == 1) { + ICEBERG_ASSIGN_OR_RAISE(writer, ManifestListWriter::MakeV1Writer( + snapshot_id, parent_snapshot_id, + manifest_list_path, transaction_->table()->io())); + } else if (format_version == 2) { + ICEBERG_ASSIGN_OR_RAISE(writer, ManifestListWriter::MakeV2Writer( + snapshot_id, parent_snapshot_id, sequence_number, + manifest_list_path, transaction_->table()->io())); + } else { // format_version == 3 + int64_t first_row_id = transaction_->current().next_row_id; + ICEBERG_ASSIGN_OR_RAISE( + writer, ManifestListWriter::MakeV3Writer( + snapshot_id, parent_snapshot_id, sequence_number, first_row_id, + manifest_list_path, transaction_->table()->io())); + } + + ICEBERG_RETURN_UNEXPECTED(writer->AddAll(manifests)); + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + + // Get nextRowId and assignedRows for format version 3 + std::optional next_row_id; + std::optional assigned_rows; + if (format_version >= 3) { + next_row_id = transaction_->current().next_row_id; + if (writer->next_row_id().has_value()) { + assigned_rows = writer->next_row_id().value() - next_row_id.value(); + } + } + + std::unordered_map summary = Summary(); + std::string op = operation(); + + if (!op.empty() && op == DataOperation::kReplace) { + auto added_records_it = summary.find(SnapshotSummaryFields::kAddedRecords); + auto replaced_records_it = summary.find(SnapshotSummaryFields::kDeletedRecords); + if (added_records_it != summary.end() && replaced_records_it != summary.end()) { + int64_t added_records = 0; + int64_t replaced_records = 0; + auto [_, ec1] = std::from_chars( + added_records_it->second.data(), + added_records_it->second.data() + added_records_it->second.size(), + added_records); + auto [__, ec2] = std::from_chars( + replaced_records_it->second.data(), + replaced_records_it->second.data() + replaced_records_it->second.size(), + replaced_records); + if (ec1 == std::errc() && ec2 == std::errc() && added_records > replaced_records) { + return InvalidArgument( + "Invalid REPLACE operation: {} added records > {} replaced records", + added_records, replaced_records); + } + } + } + + summary = ComputeSummary(transaction_->current()); + + // Get current time + auto now = std::chrono::system_clock::now(); + auto duration_since_epoch = now.time_since_epoch(); + TimePointMs timestamp_ms = std::chrono::time_point_cast( + std::chrono::system_clock::time_point(duration_since_epoch)); + + // Get schema ID + std::optional schema_id = transaction_->current().current_schema_id; + + // Create snapshot + staged_snapshot_ = + std::make_shared(Snapshot{.snapshot_id = snapshot_id, + .parent_snapshot_id = parent_snapshot_id, + .sequence_number = sequence_number, + .timestamp_ms = timestamp_ms, + .manifest_list = manifest_list_path, + .summary = std::move(summary), + .schema_id = schema_id}); + + // Return the new snapshot + return ApplyResult{.snapshot = staged_snapshot_, + .target_branch = target_branch_, + .stage_only = stage_only_}; +} + +Status SnapshotUpdate::Finalize() { + // Cleanup after successful commit + if (cleanup_after_commit()) { + auto cached_snapshot = CachedSnapshot(*staged_snapshot_); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, + cached_snapshot.Manifests(transaction_->table()->io())); + std::unordered_set manifest_paths; + for (const auto& manifest : manifests) { + manifest_paths.insert(manifest.manifest_path); + } + CleanUncommitted(manifest_paths); + // Clean up unused manifest lists + for (const auto& manifest_list : manifest_lists_) { + if (manifest_list != staged_snapshot_->manifest_list) { + std::ignore = DeleteFile(manifest_list); + } + } + } + + return {}; +} + +void SnapshotUpdate::SetTargetBranch(const std::string& branch) { + if (branch.empty()) { + AddError(ErrorKind::kInvalidArgument, "Invalid branch name: empty"); + return; + } + + auto ref_it = transaction_->current().refs.find(branch); + if (ref_it != transaction_->current().refs.end()) { + if (ref_it->second->type() != SnapshotRefType::kBranch) { + AddError( + ErrorKind::kInvalidArgument, + "{} is a tag, not a branch. Tags cannot be targets for producing snapshots", + branch); + return; + } + } + + target_branch_ = branch; +} + +std::unordered_map SnapshotUpdate::ComputeSummary( + const TableMetadata& previous) { + std::unordered_map summary = Summary(); + + if (summary.empty()) { + return std::unordered_map{}; + } + + // Get previous summary from the target branch + std::unordered_map previous_summary; + if (auto ref_it = previous.refs.find(target_branch_); ref_it != previous.refs.end()) { + auto snapshot_result = previous.SnapshotById(ref_it->second->snapshot_id); + if (snapshot_result.has_value() && (*snapshot_result)->summary.size() > 0) { + previous_summary = (*snapshot_result)->summary; + } + } + + // If no previous summary, initialize with zeros + if (previous_summary.empty()) { + previous_summary[SnapshotSummaryFields::kTotalRecords] = "0"; + previous_summary[SnapshotSummaryFields::kTotalFileSize] = "0"; + previous_summary[SnapshotSummaryFields::kTotalDataFiles] = "0"; + previous_summary[SnapshotSummaryFields::kTotalDeleteFiles] = "0"; + previous_summary[SnapshotSummaryFields::kTotalPosDeletes] = "0"; + previous_summary[SnapshotSummaryFields::kTotalEqDeletes] = "0"; + } + + // Update totals + UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalRecords, + SnapshotSummaryFields::kAddedRecords, + SnapshotSummaryFields::kDeletedRecords); + UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalFileSize, + SnapshotSummaryFields::kAddedFileSize, + SnapshotSummaryFields::kRemovedFileSize); + UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalDataFiles, + SnapshotSummaryFields::kAddedDataFiles, + SnapshotSummaryFields::kDeletedDataFiles); + UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalDeleteFiles, + SnapshotSummaryFields::kAddedDeleteFiles, + SnapshotSummaryFields::kRemovedDeleteFiles); + UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalPosDeletes, + SnapshotSummaryFields::kAddedPosDeletes, + SnapshotSummaryFields::kRemovedPosDeletes); + UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalEqDeletes, + SnapshotSummaryFields::kAddedEqDeletes, + SnapshotSummaryFields::kRemovedEqDeletes); + + return summary; +} + +void SnapshotUpdate::CleanAll() { + for (const auto& manifest_list : manifest_lists_) { + DeleteFile(manifest_list); + } + manifest_lists_.clear(); + // Pass empty set - subclasses will implement CleanUncommitted + CleanUncommitted(std::unordered_set{}); +} + +Status SnapshotUpdate::DeleteFile(const std::string& path) { return delete_func_(path); } + +std::string SnapshotUpdate::ManifestListPath() { + // Generate manifest list path + // Format: {metadata_location}/snap-{snapshot_id}-{attempt}-{uuid}.avro + int64_t snapshot_id = SnapshotId(); + std::string filename = std::format("snap-{}-{}-{}.avro", snapshot_id, + attempt_.fetch_add(1) + 1, commit_uuid_); + return std::format("{}/metadata/{}", transaction_->table()->location(), filename); +} + +std::string SnapshotUpdate::ManifestPath() { + // Generate manifest path + // Format: {metadata_location}/{uuid}-m{manifest_count}.avro + int32_t count = manifest_count_.fetch_add(1); + std::string filename = std::format("{}-m{}.avro", commit_uuid_, count); + return std::format("{}/metadata/{}", transaction_->table()->location(), filename); +} + +void SnapshotUpdate::UpdateTotal( + std::unordered_map& summary, + const std::unordered_map& previous_summary, + const std::string& total_property, const std::string& added_property, + const std::string& deleted_property) { + auto total_it = previous_summary.find(total_property); + if (total_it != previous_summary.end()) { + int64_t new_total; + auto [_, ec] = + std::from_chars(total_it->second.data(), + total_it->second.data() + total_it->second.size(), new_total); + if (ec != std::errc()) [[unlikely]] { + // Ignore and do not add total + return; + } + + auto added_it = summary.find(added_property); + if (new_total >= 0 && added_it != summary.end()) { + int64_t added_value; + auto [_, ec] = + std::from_chars(added_it->second.data(), + added_it->second.data() + added_it->second.size(), added_value); + if (ec == std::errc()) [[unlikely]] { + new_total += added_value; + } + } + + auto deleted_it = summary.find(deleted_property); + if (new_total >= 0 && deleted_it != summary.end()) { + int64_t deleted_value; + auto [_, ec] = std::from_chars( + deleted_it->second.data(), + deleted_it->second.data() + deleted_it->second.size(), deleted_value); + if (ec == std::errc()) [[unlikely]] { + new_total -= deleted_value; + } + } + + if (new_total >= 0) { + summary[total_property] = std::to_string(new_total); + } + } +} + +} // namespace iceberg diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h new file mode 100644 index 000000000..7d51ce6da --- /dev/null +++ b/src/iceberg/update/snapshot_update.h @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/catalog.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +namespace iceberg { + +/// \brief Base class for operations that produce snapshots. +/// +/// This class provides common functionality for creating new snapshots, +/// including manifest list writing, commit retries, and cleanup. +/// +class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { + public: + /// \brief Result of applying a snapshot update + struct ApplyResult { + /// \brief The new snapshot + std::shared_ptr snapshot; + std::string target_branch; + bool stage_only = false; + }; + + ~SnapshotUpdate() override = default; + + /// \brief Set a callback to delete files instead of the table's default. + /// + /// \param delete_func A function used to delete file locations + /// \return Reference to this for method chaining + /// \tparam T The concrete subclass type + template + requires std::is_base_of_v + T& DeleteWith(std::function delete_func) { + delete_func_ = std::move(delete_func); + return static_cast(*this); + } + + /// \brief Stage a snapshot in table metadata, but not update the current snapshot id. + /// + /// \return Reference to this for method chaining + /// \tparam T The concrete subclass type + template + requires std::is_base_of_v + T& StageOnly() { + stage_only_ = true; + return static_cast(*this); + } + + /// \brief Apply the update's changes to create a new snapshot. + /// + /// This method validates the changes, applies them to the metadata, + /// and creates a new snapshot without committing it. The snapshot + /// is stored internally and can be accessed after Apply() succeeds. + /// + /// \return A result containing the new snapshot, or an error + Result Apply(); + + /// \brief Finalizes the snapshot, cleaning up any uncommitted files. + /// + /// \return Status indicating success or failure + Status Finalize(); + + protected: + explicit SnapshotUpdate(std::shared_ptr transaction); + + /// \brief Write data manifests for the given data files + /// + /// \param data_files The data files to write + /// \param spec The partition spec to use + /// \return A vector of manifest files + Result> WriteDataManifests( + const std::vector& data_files, + const std::shared_ptr& spec); + + /// \brief Write delete manifests for the given delete files + /// + /// \param delete_files The delete files to write + /// \param spec The partition spec to use + /// \return A vector of manifest files + Result> WriteDeleteManifests( + const std::vector& delete_files, + const std::shared_ptr& spec); + + void SetTargetBranch(const std::string& branch); + + const std::string& target_branch() const { return target_branch_; } + + bool can_inherit_snapshot_id() const { return can_inherit_snapshot_id_; } + + int64_t target_manifest_size_bytes() const { return target_manifest_size_bytes_; } + + const std::string& commit_uuid() const { return commit_uuid_; } + + int32_t attempt() const { return attempt_.load(); } + + int32_t manifest_count() const { return manifest_count_.load(); } + + /// \brief Clean up any uncommitted manifests that were created. + /// + /// Manifests may not be committed if apply is called multiple times + /// because a commit conflict has occurred. Implementations may keep + /// around manifests because the same changes will be made by both + /// apply calls. This method instructs the implementation to clean up + /// those manifests and passes the paths of the manifests that were + /// actually committed. + /// + /// \param committed A set of manifest paths that were actually committed + virtual void CleanUncommitted(const std::unordered_set& committed) = 0; + + /// \brief A string that describes the action that produced the new snapshot. + /// + /// \return A string operation name + virtual std::string operation() = 0; + + /// \brief Validate the current metadata. + /// + /// Child operations can override this to add custom validation. + /// + /// \param current_metadata Current table metadata to validate + /// \param snapshot Ending snapshot on the lineage which is being validated + virtual Status Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) = 0; + + /// \brief Apply the update's changes to the given metadata and snapshot. + /// + /// \param metadata_to_update The base table metadata to apply changes to + /// \param snapshot Snapshot to apply the changes to + /// \return A vector of manifest files for the new snapshot + virtual std::vector Apply(const TableMetadata& metadata_to_update, + const std::shared_ptr& snapshot) = 0; + + /// \brief Get the summary map for this operation. + /// + /// \return A map of summary properties + virtual std::unordered_map Summary() = 0; + + /// \brief Check if cleanup should happen after commit + /// + /// \return True if cleanup should happen after commit + virtual bool cleanup_after_commit() const { return true; } + + int64_t SnapshotId(); + + private: + /// \brief Returns the snapshot summary from the implementation and updates totals. + std::unordered_map ComputeSummary( + const TableMetadata& previous); + + /// \brief Clean up all uncommitted files + void CleanAll(); + + /// \brief Delete a file using the configured delete function + Status DeleteFile(const std::string& path); + + /// \brief Get the path for a manifest list file + std::string ManifestListPath(); + + /// \brief Get the path for a manifest file + std::string ManifestPath(); + + /// \brief Update a total property in the summary + void UpdateTotal(std::unordered_map& summary, + const std::unordered_map& previous_summary, + const std::string& total_property, const std::string& added_property, + const std::string& deleted_property); + + int32_t format_version_; + std::shared_ptr spec_; + std::shared_ptr schema_; + + std::function delete_func_; + bool stage_only_ = false; + std::string target_branch_ = std::string(SnapshotRef::kMainBranch); + + std::optional snapshot_id_{std::nullopt}; + std::atomic attempt_{0}; + std::atomic manifest_count_{0}; + std::vector manifest_lists_; + std::string commit_uuid_; + std::shared_ptr staged_snapshot_; + + int64_t target_manifest_size_bytes_; + // For format version > 1, inheritance is enabled by default + bool can_inherit_snapshot_id_{true}; +}; + +} // namespace iceberg diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc index e76426f3c..0b9a47f25 100644 --- a/src/iceberg/util/snapshot_util.cc +++ b/src/iceberg/util/snapshot_util.cc @@ -26,6 +26,7 @@ #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/timepoint.h" +#include "iceberg/util/uuid.h" namespace iceberg { @@ -320,4 +321,17 @@ Result> SnapshotUtil::LatestSnapshot( return metadata.SnapshotById(it->second->snapshot_id); } +int64_t SnapshotUtil::GenerateSnapshotId() { + auto uuid = Uuid::GenerateV7(); + return (uuid.highbits() ^ uuid.lowbits()) & std::numeric_limits::max(); +} + +int64_t SnapshotUtil::GenerateSnapshotId(const TableMetadata& metadata) { + auto snapshot_id = GenerateSnapshotId(); + while (metadata.SnapshotById(snapshot_id).has_value()) { + snapshot_id = GenerateSnapshotId(); + } + return snapshot_id; +} + } // namespace iceberg diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h index e0d8830ff..4d6012f24 100644 --- a/src/iceberg/util/snapshot_util_internal.h +++ b/src/iceberg/util/snapshot_util_internal.h @@ -247,6 +247,15 @@ class ICEBERG_EXPORT SnapshotUtil { static Result> LatestSnapshot(const TableMetadata& metadata, const std::string& branch); + /// \brief Generate a new snapshot ID. + static int64_t GenerateSnapshotId(); + + /// \brief Generate a new snapshot ID for the given metadata. + /// + /// \param metadata The table metadata + /// \return A new snapshot ID + static int64_t GenerateSnapshotId(const TableMetadata& metadata); + private: /// \brief Helper function to traverse ancestors of a snapshot. /// diff --git a/src/iceberg/util/uuid.cc b/src/iceberg/util/uuid.cc index 9322deb93..8fea859c2 100644 --- a/src/iceberg/util/uuid.cc +++ b/src/iceberg/util/uuid.cc @@ -217,4 +217,16 @@ std::string Uuid::ToString() const { data_[15]); } +int64_t Uuid::highbits() const { + int64_t result; + std::memcpy(&result, data_.data(), 8); + return result; +} + +int64_t Uuid::lowbits() const { + int64_t result; + std::memcpy(&result, data_.data() + 8, 8); + return result; +} + } // namespace iceberg diff --git a/src/iceberg/util/uuid.h b/src/iceberg/util/uuid.h index 64db7c5d6..923736ecc 100644 --- a/src/iceberg/util/uuid.h +++ b/src/iceberg/util/uuid.h @@ -78,6 +78,9 @@ class ICEBERG_EXPORT Uuid : public util::Formattable { return lhs.data_ == rhs.data_; } + int64_t highbits() const; + int64_t lowbits() const; + private: std::array data_; };