diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index ca0253458..5626e1fdd 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -40,6 +40,7 @@ set(ICEBERG_SOURCES file_writer.cc inheritable_metadata.cc json_internal.cc + location_provider.cc manifest/manifest_adapter.cc manifest/manifest_entry.cc manifest/manifest_list.cc diff --git a/src/iceberg/expression/literal.cc b/src/iceberg/expression/literal.cc index cb0a4c6d0..dce7b5663 100644 --- a/src/iceberg/expression/literal.cc +++ b/src/iceberg/expression/literal.cc @@ -488,6 +488,7 @@ std::string Literal::ToString() const { .value_or("invalid literal of type decimal"); } case TypeId::kString: { + // TODO(zhuo.wang): escape string? return "\"" + std::get(value_) + "\""; } case TypeId::kUuid: { diff --git a/src/iceberg/location_provider.cc b/src/iceberg/location_provider.cc new file mode 100644 index 000000000..d6e5cba90 --- /dev/null +++ b/src/iceberg/location_provider.cc @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
namespace iceberg {

namespace {

/// Mask selecting the 4 bits that form one entropy directory name.
constexpr uint8_t kEntropyDirMask = 0x0f;
/// Mask selecting the low 8 bits that form the final entropy component.
constexpr uint8_t kRestDirMask = 0xff;
/// Number of low hash bits that are rendered into the path.
constexpr int32_t kHashBits = 20;
/// Number of bits per entropy directory.
constexpr int32_t kEntropyDirLength = 4;
/// Number of entropy directories emitted before the final component.
constexpr int32_t kEntropyDirDepth = 3;

/// \brief Resolve the location under which data files are written.
///
/// \param properties Table properties; `kWriteDataLocation` is used when non-empty.
/// \param table_location Table root, used to build the `<table_location>/data` fallback.
/// \return The configured data location, or `<table_location>/data`.
std::string DataLocation(const TableProperties& properties,
                         const std::string& table_location) {
  auto data_location = properties.Get(TableProperties::kWriteDataLocation);
  if (data_location.empty()) {
    data_location = std::format("{}/data", table_location);
  }
  return data_location;
}

/// \brief Build the `<parent name>/<last name>` context of a table location.
///
/// For `a/b/c` (or `a/b/c/`) this returns `b/c`. When the location has a single
/// separator (`a/b`) the whole parent is used, and when there is no usable last
/// component the input is returned unchanged.
///
/// \param table_location The table location; trailing slashes are ignored.
/// \return The context string described above.
std::string PathContext(std::string_view table_location) {
  const std::string path = LocationUtil::StripTrailingSlash(table_location);
  const std::string_view path_view(path);

  const size_t last_slash = path_view.find_last_of('/');
  if (last_slash != std::string_view::npos && last_slash + 1 < path_view.size()) {
    // The last component starts right after the last slash. The previous code built
    // this view from the beginning of the path and therefore returned a prefix of the
    // location instead of its last component.
    const std::string_view data_path = path_view.substr(last_slash + 1);
    const std::string_view parent_path = path_view.substr(0, last_slash);
    const size_t parent_last_slash = parent_path.find_last_of('/');

    if (parent_last_slash != std::string_view::npos) {
      const std::string_view parent_name = parent_path.substr(parent_last_slash + 1);
      return std::format("{}/{}", parent_name, data_path);
    }
    return std::format("{}/{}", parent_path, data_path);
  }

  return std::string(table_location);
}

/// \brief Divides hash into directories for optimized orphan removal operation using
/// kEntropyDirDepth and kEntropyDirLength.
///
/// If the low `kHashBits = 20` of `hash` is '10011001100110011001', then return
/// '1001/1001/1001/10011001' with depth 3 and length 4.
///
/// \param hash The hash value to be divided.
/// \return The path according to the `hash` value.
std::string DirsFromHash(int32_t hash) {
  std::string hash_with_dirs;

  for (int i = 0; i < kEntropyDirDepth * kEntropyDirLength; i += kEntropyDirLength) {
    if (i > 0) {
      hash_with_dirs += "/";
    }
    // Walk the 20-bit window from its most significant 4-bit group downwards.
    uint8_t dir_bits = kEntropyDirMask & (hash >> (kHashBits - i - kEntropyDirLength));
    hash_with_dirs += std::format("{:04b}", dir_bits);
  }

  hash_with_dirs += "/";
  uint8_t rest_bits = kRestDirMask & hash;
  hash_with_dirs += std::format("{:08b}", rest_bits);

  return hash_with_dirs;
}

/// \brief Hash `file_name` with MurmurHash3 (x86, 32-bit, seed 0) and render it as
/// entropy directories via DirsFromHash.
std::string ComputeHash(std::string_view file_name) {
  int32_t hash_value = 0;
  MurmurHash3_x86_32(file_name.data(), file_name.size(), 0, &hash_value);
  return DirsFromHash(hash_value);
}

}  // namespace

/// Creates an ObjectStoreLocationProvider when `kObjectStoreEnabled` is set and a
/// DefaultLocationProvider otherwise. Trailing slashes of `input_location` are removed
/// before the provider is constructed.
Result<std::unique_ptr<LocationProvider>> LocationProviderFactory::For(
    const std::string& input_location, const TableProperties& properties) {
  std::string location = LocationUtil::StripTrailingSlash(input_location);

  // Note: Not support dynamic constructor according to kWriteLocationProviderImpl

  if (properties.Get(TableProperties::kObjectStoreEnabled)) {
    return std::make_unique<ObjectStoreLocationProvider>(location, properties);
  }
  return std::make_unique<DefaultLocationProvider>(location, properties);
}

// Implementation of DefaultLocationProvider
DefaultLocationProvider::DefaultLocationProvider(const std::string& table_location,
                                                 const TableProperties& properties)
    : data_location_(
          LocationUtil::StripTrailingSlash(DataLocation(properties, table_location))) {}

/// Returns `<data location>/<filename>`.
std::string DefaultLocationProvider::NewDataLocation(const std::string& filename) {
  return std::format("{}/{}", data_location_, filename);
}

/// Returns `<data location>/<partition path>/<filename>`, or the error produced while
/// building the partition path.
Result<std::string> DefaultLocationProvider::NewDataLocation(
    const PartitionSpec& spec, const PartitionValues& partition_data,
    const std::string& filename) {
  ICEBERG_ASSIGN_OR_RAISE(auto partition_path, spec.PartitionPath(partition_data));
  return std::format("{}/{}/{}", data_location_, partition_path, filename);
}

// Implementation of ObjectStoreLocationProvider
ObjectStoreLocationProvider::ObjectStoreLocationProvider(
    const std::string& table_location, const TableProperties& properties)
    : include_partition_paths_(
          properties.Get(TableProperties::kWriteObjectStorePartitionedPaths)) {
  storage_location_ =
      LocationUtil::StripTrailingSlash(DataLocation(properties, table_location));

  // If the storage location is within the table prefix, don't add table and database name
  // context
  if (!storage_location_.starts_with(table_location)) {
    context_ = PathContext(table_location);
  }
}

/// Returns a location under the storage location that embeds the hash directories
/// computed from `filename`. With a context the layout is
/// `<storage>/<hash>/<context>/<filename>`; without one it is
/// `<storage>/<hash>/<filename>` when partition paths are included and
/// `<storage>/<hash>-<filename>` otherwise.
std::string ObjectStoreLocationProvider::NewDataLocation(const std::string& filename) {
  std::string hash = ComputeHash(filename);

  if (!context_.empty()) {
    return std::format("{}/{}/{}/{}", storage_location_, hash, context_, filename);
  }

  // If partition paths are included, add last part of entropy as dir before partition
  // names
  if (include_partition_paths_) {
    return std::format("{}/{}/{}", storage_location_, hash, filename);
  }

  // If partition paths are not included, append last part of entropy with `-` to file
  // name
  return std::format("{}/{}-{}", storage_location_, hash, filename);
}

/// When partition paths are included, the hash is computed over
/// `<partition path>/<filename>`; otherwise the partition data is ignored.
Result<std::string> ObjectStoreLocationProvider::NewDataLocation(
    const PartitionSpec& spec, const PartitionValues& partition_data,
    const std::string& filename) {
  if (include_partition_paths_) {
    ICEBERG_ASSIGN_OR_RAISE(auto partition_path, spec.PartitionPath(partition_data));
    return NewDataLocation(std::format("{}/{}", partition_path, filename));
  }
  return NewDataLocation(filename);
}

}  // namespace iceberg
{ @@ -48,9 +50,57 @@ class ICEBERG_EXPORT LocationProvider { /// /// TODO(wgtmac): StructLike is not well thought yet, we may wrap an ArrowArray /// with single row in StructLike. - virtual std::string NewDataLocation(const PartitionSpec& spec, - const StructLike& partition_data, - const std::string& filename) = 0; + virtual Result NewDataLocation(const PartitionSpec& spec, + const PartitionValues& partition_data, + const std::string& filename) = 0; +}; + +class ICEBERG_EXPORT LocationProviderFactory { + public: + virtual ~LocationProviderFactory() = default; + + /// \brief Create a LocationProvider for the given table location and properties. + /// + /// \param input_location the table location + /// \param properties the table properties + /// \return a LocationProvider instance + static Result> For(const std::string& input_location, + const TableProperties& properties); +}; + +/// \brief DefaultLocationProvider privides default location provider for local file +/// system. +class ICEBERG_EXPORT DefaultLocationProvider : public LocationProvider { + public: + DefaultLocationProvider(const std::string& table_location, + const TableProperties& properties); + + std::string NewDataLocation(const std::string& filename) override; + + Result NewDataLocation(const PartitionSpec& spec, + const PartitionValues& partition_data, + const std::string& filename) override; + + private: + std::string data_location_; +}; + +/// \brief ObjectStoreLocationProvider provides location provider for object stores. 
+class ICEBERG_EXPORT ObjectStoreLocationProvider : public LocationProvider { + public: + ObjectStoreLocationProvider(const std::string& table_location, + const TableProperties& properties); + + std::string NewDataLocation(const std::string& filename) override; + + Result NewDataLocation(const PartitionSpec& spec, + const PartitionValues& partition_data, + const std::string& filename) override; + + private: + std::string storage_location_; + std::string context_; + bool include_partition_paths_; }; } // namespace iceberg diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 3929e1803..fa3381de6 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -61,6 +61,7 @@ iceberg_sources = files( 'file_writer.cc', 'inheritable_metadata.cc', 'json_internal.cc', + 'location_provider.cc', 'manifest/manifest_adapter.cc', 'manifest/manifest_entry.cc', 'manifest/manifest_list.cc', diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc index 3bfd0ffbb..4c364bc3d 100644 --- a/src/iceberg/partition_spec.cc +++ b/src/iceberg/partition_spec.cc @@ -26,10 +26,12 @@ #include #include #include +#include #include #include #include "iceberg/result.h" +#include "iceberg/row/partition_values.h" #include "iceberg/schema.h" #include "iceberg/schema_field.h" #include "iceberg/transform.h" @@ -98,6 +100,22 @@ Result> PartitionSpec::PartitionType( return std::make_unique(std::move(partition_fields)); } +Result PartitionSpec::PartitionPath(const PartitionValues& data) const { + std::stringstream ss; + for (int32_t i = 0; i < fields_.size(); ++i) { + ICEBERG_ASSIGN_OR_RAISE(auto value, data.ValueAt(i)); + if (i > 0) { + ss << "/"; + } + // TODO(zhuo.wang): json parse string literal? 
+ std::string partition_value = value.get().ToString(); + // TODO(zhuo.wang): UrlEncoder::Encode for partition name and value + ss << fields_[i].name() << "=" + << std::string_view(partition_value.data() + 1, partition_value.size() - 2); + } + return ss.str(); +} + bool PartitionSpec::CompatibleWith(const PartitionSpec& other) const { if (Equals(other)) { return true; diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h index ae10dfccf..0fb8814b8 100644 --- a/src/iceberg/partition_spec.h +++ b/src/iceberg/partition_spec.h @@ -64,6 +64,9 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable { /// \brief Get the partition type binding to the input schema. Result> PartitionType(const Schema& schema) const; + /// \brief Get the partition path for the given partition data. + Result PartitionPath(const PartitionValues& data) const; + /// \brief Returns true if this spec is equivalent to the other, with partition field /// ids ignored. That is, if both specs have the same number of fields, field order, /// field name, source columns, and transforms. diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 731fe0af6..b54a33592 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -69,6 +69,7 @@ add_iceberg_test(schema_test add_iceberg_test(table_test SOURCES + location_provider_test.cc metrics_config_test.cc snapshot_test.cc snapshot_util_test.cc diff --git a/src/iceberg/test/location_provider_test.cc b/src/iceberg/test/location_provider_test.cc new file mode 100644 index 000000000..0e71ba7ed --- /dev/null +++ b/src/iceberg/test/location_provider_test.cc @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
// Splits `str` at every `delimiter`. Empty pieces between delimiters are kept, but no
// trailing empty piece is produced when `str` is empty or ends with the delimiter.
std::vector<std::string> SplitString(const std::string& str, char delimiter) {
  std::vector<std::string> pieces;
  std::size_t begin = 0;

  while (begin < str.size()) {
    const std::size_t end = str.find(delimiter, begin);
    if (end == std::string::npos) {
      // Last piece: everything that is left.
      pieces.push_back(str.substr(begin));
      break;
    }
    pieces.push_back(str.substr(begin, end - begin));
    begin = end + 1;
  }

  return pieces;
}
// Checks the shape of a location produced with the object store provider enabled:
// relative to the table location it must consist of an empty leading piece, "data",
// four non-empty hash pieces and finally the file name.
TEST_F(LocationProviderTest, ObjectStorageLocationProvider) {
  std::ignore = properties_.Set(TableProperties::kObjectStoreEnabled, true);
  ICEBERG_UNWRAP_OR_FAIL(auto provider,
                         LocationProviderFactory::For(table_location_, properties_));

  auto location = provider->NewDataLocation("test.parquet");
  // Drop the table location prefix so that only the generated part is inspected.
  std::string relative_location = location;
  if (relative_location.starts_with(table_location_)) {
    relative_location = relative_location.substr(table_location_.size());
  }

  std::vector parts = SplitString(relative_location, '/');
  ASSERT_EQ(7, parts.size());
  // The relative location starts with '/', hence the empty first piece.
  EXPECT_EQ("", parts[0]);
  EXPECT_EQ("data", parts[1]);
  // Pieces 2..5 are the hash directories; only their presence is checked here.
  for (int i = 2; i <= 5; i++) {
    EXPECT_FALSE(parts[i].empty());
  }
  EXPECT_EQ("test.parquet", parts[6]);
}
// Pins the exact hash directories generated for a few single-character file names, so
// that any change to the hashing or to the directory layout is detected.
TEST_F(LocationProviderTest, HashInjection) {
  std::ignore = properties_.Set(TableProperties::kObjectStoreEnabled, true);
  ICEBERG_UNWRAP_OR_FAIL(auto provider,
                         LocationProviderFactory::For(table_location_, properties_));

  // Each expectation is "/data/" + three 4-bit directories + one 8-bit directory +
  // the file name.
  auto location_a = provider->NewDataLocation("a");
  EXPECT_THAT(location_a, testing::EndsWith("/data/0101/0110/1001/10110010/a"));

  auto location_b = provider->NewDataLocation("b");
  EXPECT_THAT(location_b, testing::EndsWith("/data/1110/0111/1110/00000011/b"));

  auto location_c = provider->NewDataLocation("c");
  EXPECT_THAT(location_c, testing::EndsWith("/data/0010/1101/0110/01011111/c"));

  auto location_d = provider->NewDataLocation("d");
  EXPECT_THAT(location_d, testing::EndsWith("/data/1001/0001/0100/01110011/d"));
}
std::string StripTrailingSlash(const std::string& path) { + static std::string StripTrailingSlash(std::string_view path) { if (path.empty()) { return ""; } - std::string_view result = path; - while (result.ends_with("/") && !result.ends_with("://")) { - result.remove_suffix(1); + while (path.ends_with("/") && !path.ends_with("://")) { + path.remove_suffix(1); } - return std::string(result); + return std::string(path); } };