Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ set(ICEBERG_SOURCES
file_writer.cc
inheritable_metadata.cc
json_internal.cc
location_provider.cc
manifest/manifest_adapter.cc
manifest/manifest_entry.cc
manifest/manifest_list.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/expression/literal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ std::string Literal::ToString() const {
.value_or("invalid literal of type decimal");
}
case TypeId::kString: {
// TODO(zhuo.wang): escape string?
return "\"" + std::get<std::string>(value_) + "\"";
}
case TypeId::kUuid: {
Expand Down
176 changes: 176 additions & 0 deletions src/iceberg/location_provider.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/location_provider.h"

#include "iceberg/partition_spec.h"
#include "iceberg/table_properties.h"
#include "iceberg/util/location_util.h"
#include "iceberg/util/murmurhash3_internal.h"

namespace iceberg {

namespace {

constexpr uint8_t kEntropyDirMask = 0x0f;
constexpr uint8_t kRestDirMask = 0xff;
constexpr int32_t kHashBits = 20;
constexpr int32_t kEntropyDirLength = 4;
constexpr int32_t kEntropyDirDepth = 3;

std::string DataLocation(const TableProperties& properties,
const std::string& table_location) {
auto data_location = properties.Get(TableProperties::kWriteDataLocation);
if (data_location.empty()) {
data_location = std::format("{}/data", table_location);
}
return data_location;
}

std::string PathContext(std::string_view table_location) {
std::string path = LocationUtil::StripTrailingSlash(table_location);

size_t last_slash = path.find_last_of('/');
if (last_slash != std::string::npos && last_slash < path.length() - 1) {
std::string_view data_path(path.data(), path.size() - last_slash - 1);
std::string_view parent_path(path.data(), last_slash);
size_t parent_last_slash = parent_path.find_last_of('/');

if (parent_last_slash != std::string::npos) {
std::string_view parent_name = parent_path.substr(parent_last_slash + 1);
return std::format("{}/{}", parent_name, data_path);
} else {
return std::format("{}/{}", parent_path, data_path);
}
}

return std::string(table_location);
}

/// \brief Divides hash into directories for optimized orphan removal operation using
/// kEntropyDirDepth and kEntropyDirLength.
///
/// If the low `kHashBits = 20` of `hash` is '10011001100110011001', then return
/// '1001/1001/1001/10011001' with depth 3 and length 4.
///
/// \param hash The hash value to be divided.
/// \return The path according to the `hash` value.
std::string DirsFromHash(int32_t hash) {
std::string hash_with_dirs;

for (int i = 0; i < kEntropyDirDepth * kEntropyDirLength; i += kEntropyDirLength) {
if (i > 0) {
hash_with_dirs += "/";
}
uint8_t dir_bits = kEntropyDirMask & (hash >> (kHashBits - i - kEntropyDirLength));
hash_with_dirs += std::format("{:04b}", dir_bits);
}

hash_with_dirs += "/";
uint8_t rest_bits = kRestDirMask & hash;
hash_with_dirs += std::format("{:08b}", rest_bits);

return hash_with_dirs;
}

std::string ComputeHash(std::string_view file_name) {
int32_t hash_value = 0;
MurmurHash3_x86_32(file_name.data(), file_name.size(), 0, &hash_value);
return DirsFromHash(hash_value);
}

} // namespace

Result<std::unique_ptr<LocationProvider>> LocationProviderFactory::For(
const std::string& input_location, const TableProperties& properties) {
std::string location = LocationUtil::StripTrailingSlash(input_location);

// Note: Not support dynamic constructor according to kWriteLocationProviderImpl

properties.Get(TableProperties::kObjectStoreEnabled);

if (properties.Get(TableProperties::kObjectStoreEnabled)) {
return std::make_unique<ObjectStoreLocationProvider>(location, properties);
} else {
return std::make_unique<DefaultLocationProvider>(location, properties);
}
}

// Implementation of DefaultLocationProvider
DefaultLocationProvider::DefaultLocationProvider(const std::string& table_location,
const TableProperties& properties)
: data_location_(
LocationUtil::StripTrailingSlash(DataLocation(properties, table_location))) {}

std::string DefaultLocationProvider::NewDataLocation(const std::string& filename) {
return std::format("{}/{}", data_location_, filename);
}

Result<std::string> DefaultLocationProvider::NewDataLocation(
const PartitionSpec& spec, const PartitionValues& partition_data,
const std::string& filename) {
ICEBERG_ASSIGN_OR_RAISE(auto partition_path, spec.PartitionPath(partition_data));
return std::format("{}/{}/{}", data_location_, partition_path, filename);
}

// Implementation of ObjectStoreLocationProvider
ObjectStoreLocationProvider::ObjectStoreLocationProvider(
const std::string& table_location, const TableProperties& properties)
: include_partition_paths_(
properties.Get(TableProperties::kWriteObjectStorePartitionedPaths)) {
storage_location_ =
LocationUtil::StripTrailingSlash(DataLocation(properties, table_location));

// If the storage location is within the table prefix, don't add table and database name
// context
if (!storage_location_.starts_with(table_location)) {
context_ = PathContext(table_location);
}
}

std::string ObjectStoreLocationProvider::NewDataLocation(const std::string& filename) {
std::string hash = ComputeHash(filename);

if (!context_.empty()) {
return std::format("{}/{}/{}/{}", storage_location_, hash, context_, filename);
} else {
// If partition paths are included, add last part of entropy as dir before partition
// names
if (include_partition_paths_) {
return std::format("{}/{}/{}", storage_location_, hash, filename);
} else {
// If partition paths are not included, append last part of entropy with `-` to file
// name
return std::format("{}/{}-{}", storage_location_, hash, filename);
}
}
}

Result<std::string> ObjectStoreLocationProvider::NewDataLocation(
const PartitionSpec& spec, const PartitionValues& partition_data,
const std::string& filename) {
if (include_partition_paths_) {
ICEBERG_ASSIGN_OR_RAISE(auto partition_path, spec.PartitionPath(partition_data));
return NewDataLocation(std::format("{}/{}", partition_path, filename));
} else {
return NewDataLocation(filename);
}
}

} // namespace iceberg
56 changes: 53 additions & 3 deletions src/iceberg/location_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

#pragma once

#include <memory>
#include <string>

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"

namespace iceberg {
Expand All @@ -48,9 +50,57 @@ class ICEBERG_EXPORT LocationProvider {
///
/// TODO(wgtmac): StructLike is not well thought yet, we may wrap an ArrowArray
/// with single row in StructLike.
virtual std::string NewDataLocation(const PartitionSpec& spec,
const StructLike& partition_data,
const std::string& filename) = 0;
virtual Result<std::string> NewDataLocation(const PartitionSpec& spec,
const PartitionValues& partition_data,
const std::string& filename) = 0;
};

class ICEBERG_EXPORT LocationProviderFactory {
public:
virtual ~LocationProviderFactory() = default;

/// \brief Create a LocationProvider for the given table location and properties.
///
/// \param input_location the table location
/// \param properties the table properties
/// \return a LocationProvider instance
static Result<std::unique_ptr<LocationProvider>> For(const std::string& input_location,
const TableProperties& properties);
};

/// \brief DefaultLocationProvider privides default location provider for local file
/// system.
class ICEBERG_EXPORT DefaultLocationProvider : public LocationProvider {
public:
DefaultLocationProvider(const std::string& table_location,
const TableProperties& properties);

std::string NewDataLocation(const std::string& filename) override;

Result<std::string> NewDataLocation(const PartitionSpec& spec,
const PartitionValues& partition_data,
const std::string& filename) override;

private:
std::string data_location_;
};

/// \brief ObjectStoreLocationProvider provides location provider for object stores.
class ICEBERG_EXPORT ObjectStoreLocationProvider : public LocationProvider {
public:
ObjectStoreLocationProvider(const std::string& table_location,
const TableProperties& properties);

std::string NewDataLocation(const std::string& filename) override;

Result<std::string> NewDataLocation(const PartitionSpec& spec,
const PartitionValues& partition_data,
const std::string& filename) override;

private:
std::string storage_location_;
std::string context_;
bool include_partition_paths_;
};

} // namespace iceberg
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ iceberg_sources = files(
'file_writer.cc',
'inheritable_metadata.cc',
'json_internal.cc',
'location_provider.cc',
'manifest/manifest_adapter.cc',
'manifest/manifest_entry.cc',
'manifest/manifest_list.cc',
Expand Down
18 changes: 18 additions & 0 deletions src/iceberg/partition_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
#include <map>
#include <memory>
#include <ranges>
#include <sstream>
#include <unordered_map>
#include <utility>

#include "iceberg/result.h"
#include "iceberg/row/partition_values.h"
#include "iceberg/schema.h"
#include "iceberg/schema_field.h"
#include "iceberg/transform.h"
Expand Down Expand Up @@ -98,6 +100,22 @@ Result<std::unique_ptr<StructType>> PartitionSpec::PartitionType(
return std::make_unique<StructType>(std::move(partition_fields));
}

Result<std::string> PartitionSpec::PartitionPath(const PartitionValues& data) const {
std::stringstream ss;
for (int32_t i = 0; i < fields_.size(); ++i) {
ICEBERG_ASSIGN_OR_RAISE(auto value, data.ValueAt(i));
if (i > 0) {
ss << "/";
}
// TODO(zhuo.wang): json parse string literal?
std::string partition_value = value.get().ToString();
// TODO(zhuo.wang): UrlEncoder::Encode for partition name and value
ss << fields_[i].name() << "="
<< std::string_view(partition_value.data() + 1, partition_value.size() - 2);
}
return ss.str();
}

bool PartitionSpec::CompatibleWith(const PartitionSpec& other) const {
if (Equals(other)) {
return true;
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/partition_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
/// \brief Get the partition type binding to the input schema.
Result<std::unique_ptr<StructType>> PartitionType(const Schema& schema) const;

/// \brief Get the partition path for the given partition data.
Result<std::string> PartitionPath(const PartitionValues& data) const;

/// \brief Returns true if this spec is equivalent to the other, with partition field
/// ids ignored. That is, if both specs have the same number of fields, field order,
/// field name, source columns, and transforms.
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ add_iceberg_test(schema_test

add_iceberg_test(table_test
SOURCES
location_provider_test.cc
metrics_config_test.cc
snapshot_test.cc
snapshot_util_test.cc
Expand Down
Loading
Loading