Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@
[submodule "cpp/third-party/googletest"]
path = cpp/third-party/googletest
url = git@github.com:google/googletest.git
[submodule "cpp/FlameGraph"]
path = cpp/FlameGraph
url = git@github.com:brendangregg/FlameGraph.git
[submodule "cpp/third-party/flatbuffers"]
path = cpp/third-party/flatbuffers
url = https://github.com/google/flatbuffers.git
5 changes: 5 additions & 0 deletions cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,8 @@ plot
**/cmake-build-debug
**/CMakeCache.txt
**/CMakeFiles
# ignore generated perf SVG files
cpp/testcase/perf-*/*.svg/*.svg
*.csv
*.txt
*.svg
26 changes: 12 additions & 14 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
cmake_minimum_required(VERSION 3.19)
set(CMAKE_CXX_STANDARD 14)
# Set extension name here

set(CMAKE_CXX_STANDARD 20)
set(TARGET_NAME pixels)
set(DCMAKE_EXPORT_COMPILE_COMMANDS=1)
set(EXTENSION_NAME ${TARGET_NAME}_extension)
project(${TARGET_NAME})

add_definitions(-DDUCKDB_EXTENSION_LIBRARY)

include_directories(include)
include_directories(${CMAKE_CURRENT_BINARY_DIR})

set(EXTENSION_SOURCES
pixels-duckdb/pixels_extension.cpp
pixels-duckdb/PixelsScanFunction.cpp
)
add_library(${EXTENSION_NAME} STATIC ${EXTENSION_SOURCES})

find_package(Protobuf REQUIRED)
include_directories(${Protobuf_INCLUDE_DIRS})

include_directories(${CMAKE_CURRENT_BINARY_DIR})

add_subdirectory(pixels-common)
add_subdirectory(pixels-core)
Expand All @@ -29,18 +27,18 @@ include_directories(pixels-core/include)
include_directories(${CMAKE_CURRENT_BINARY_DIR})
include_directories(${CMAKE_CURRENT_BINARY_DIR}/pixels-common/liburing/src/include)

target_link_libraries(
${EXTENSION_NAME}
build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES})
set(PARAMETERS "-warnings")
build_loadable_extension(${TARGET_NAME} ${PARAMETERS} ${EXTENSION_SOURCES})

target_link_libraries(${EXTENSION_NAME}
pixels-common
pixels-core
)

# Add the subdirectory that contains the build_loadable_extension definition

set(PARAMETERS "-warnings")
build_loadable_extension(${TARGET_NAME} ${PARAMETERS} ${EXTENSION_SOURCES})

message("duckdb export set: ${DUCKDB_EXPORT_SET}" )
message("TARGET NAME: ${TARGET_NAME} EXTENSION NAME: ${EXTENSION_NAME}")

install(
TARGETS ${EXTENSION_NAME} pixels-core pixels-common
Expand Down
1 change: 1 addition & 0 deletions cpp/FlameGraph
Submodule FlameGraph added at 41fee1
52 changes: 26 additions & 26 deletions cpp/Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: all clean debug release pull update deps
.PHONY: all clean debug release pull update fb-release fb-debug

all: release

Expand All @@ -14,21 +14,22 @@ ifeq (${STATIC_LIBCPP}, 1)
endif

ifeq ($(GEN),ninja)
GENERATOR=-G "Ninja"
FORCE_COLOR=-DFORCE_COLORED_OUTPUT=1
GENERATOR=-G "Ninja"
FORCE_COLOR=-DFORCE_COLORED_OUTPUT=1
endif

PROTOBUF_DIR=third-party/protobuf
# Protobuf has been removed; FlatBuffers is used instead
BUILD_FLAGS=-DEXTENSION_STATIC_BUILD=1 -DBUILD_TPCH_EXTENSION=1 -DBUILD_BENCHMARKS=1 -DBUILD_PARQUET_EXTENSION=1 \
${OSX_BUILD_UNIVERSAL_FLAG} ${STATIC_LIBCPP}

CLIENT_FLAGS :=
PIXELS_BASE_DIR := $(shell dirname $(shell pwd))

# These flags will make DuckDB build the extension

EXTENSION_FLAGS=-DDUCKDB_EXTENSION_NAMES="pixels" -DDUCKDB_EXTENSION_PIXELS_PATH="$(PROJ_DIR)" \
-DDUCKDB_EXTENSION_PIXELS_SHOULD_LINK="TRUE" -DDUCKDB_EXTENSION_PIXELS_INCLUDE_PATH="$(PROJ_DIR)include" \
-DCMAKE_PREFIX_PATH=$(PROJ_DIR)third-party/protobuf/cmake/build -DPIXELS_SRC="$(dirname $(pwd))"
FB_FLAGS=-DUSE_FLATBUFFERS=ON \
-DDUCKDB_EXTENSION_NAMES="pixels" \
-DDUCKDB_EXTENSION_PIXELS_PATH="$(PROJ_DIR)" \
-DDUCKDB_EXTENSION_PIXELS_SHOULD_LINK="TRUE" \
-DDUCKDB_EXTENSION_PIXELS_INCLUDE_PATH="$(PROJ_DIR)include" \
-DPIXELS_SRC="$(PIXELS_BASE_DIR)"

pull:
git submodule init
Expand All @@ -37,24 +38,23 @@ pull:
update:
git submodule update --remote --merge pixels-duckdb/duckdb
git -C third-party/googletest checkout v1.15.2
git -C third-party/protobuf checkout v3.21.6

deps:
mkdir -p "${PROTOBUF_DIR}/cmake/build" && cd "third-party/protobuf/cmake/build" && \
cmake -Dprotobuf_BUILD_TESTS=OFF -DCMAKE_BUILD_TYPE=Release ../.. -DCMAKE_POSITION_INDEPENDENT_CODE=ON \
-Dprotobuf_BUILD_SHARED_LIBS=ON -DCMAKE_INSTALL_PREFIX=./ && \
make -j install

clean:
rm -rf build/release
rm -rf build/debug
rm -rf build/fb-release
rm -rf build/fb-debug
cd pixels-duckdb/duckdb && make clean

# Main build
debug: deps
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Debug ${BUILD_FLAGS} -S pixels-duckdb/duckdb -B build/debug && \
cmake --build build/debug --config Debug
debug: fb-debug
release: fb-release

# Configure the DuckDB source tree (pixels-duckdb/duckdb) into build/release
# with CMAKE_BUILD_TYPE=Release, passing FB_FLAGS, CLIENT_FLAGS and BUILD_FLAGS,
# then build that tree. The build step only runs if configuration succeeds (&&).
# NOTE(review): `clean` also removes build/fb-release, but this target writes
# to build/release - confirm which directory name is intended.
fb-release:
cmake $(GENERATOR) $(FORCE_COLOR) $(FB_FLAGS) ${CLIENT_FLAGS} \
-DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Release ${BUILD_FLAGS} \
-S pixels-duckdb/duckdb -B build/release && \
cmake --build build/release --config Release

release: deps
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Release ${BUILD_FLAGS} -S pixels-duckdb/duckdb -B build/release && \
cmake --build build/release --config Release
# Configure the DuckDB source tree (pixels-duckdb/duckdb) into build/debug
# with CMAKE_BUILD_TYPE=Debug, passing FB_FLAGS, CLIENT_FLAGS and BUILD_FLAGS,
# then build that tree. The build step only runs if configuration succeeds (&&).
# NOTE(review): `clean` also removes build/fb-debug, but this target writes
# to build/debug - confirm which directory name is intended.
fb-debug:
cmake $(GENERATOR) $(FORCE_COLOR) $(FB_FLAGS) ${CLIENT_FLAGS} \
-DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Debug ${BUILD_FLAGS} \
-S pixels-duckdb/duckdb -B build/debug && \
cmake --build build/debug --config Debug
2 changes: 1 addition & 1 deletion cpp/include/PixelsScanFunction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ namespace duckdb
PixelsScanInitLocal(ExecutionContext &context, TableFunctionInitInput &input,
GlobalTableFunctionState *gstate_p);

static bool PixelsParallelStateNext(ClientContext &context, const PixelsReadBindData &bind_data,
static bool PixelsParallelStateNext(ClientContext &context, PixelsReadBindData &bind_data,
PixelsReadLocalState &scan_data, PixelsReadGlobalState &parallel_state,
bool is_init_state = false);

Expand Down
2 changes: 1 addition & 1 deletion cpp/pixels-cli/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
project(pixels-cli)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD 20)

include(ExternalProject)
include(ProcessorCount)
Expand Down
2 changes: 1 addition & 1 deletion cpp/pixels-cli/include/executor/LoadExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ class LoadExecutor : public CommandExecutor

private:
bool startConsumers(const std::vector <std::string> &inputFiles, Parameters parameters,
const std::vector <std::string> &loadedFiles);
const std::vector <std::string> &loadedFiles, int concurrency);
};
#endif //PIXELS_LOADEXECUTOR_H
2 changes: 2 additions & 0 deletions cpp/pixels-cli/include/load/PixelsConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <vector>
#include <string>
#include <mutex>
#include <load/Parameters.h>

class PixelsConsumer
Expand All @@ -39,6 +40,7 @@ class PixelsConsumer

private:
static int GlobalTargetPathId;
static std::mutex globalMutex; // Mutex to protect GlobalTargetPathId
std::vector <std::string> queue;
Parameters parameters;
std::vector <std::string> loadedFiles;
Expand Down
59 changes: 53 additions & 6 deletions cpp/pixels-cli/lib/executor/LoadExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,35 @@
*/
#include <executor/LoadExecutor.h>
#include <iostream>
#include <fstream>
#include <filesystem>
#include <encoding/EncodingLevel.h>
#include <physical/storage/LocalFS.h>
#include <load/Parameters.h>
#include <chrono>
#include <thread>
#include <load/PixelsConsumer.h>

void LoadExecutor::execute(const bpo::variables_map &ns, const std::string &command)
{
std::string schema = ns["schema"].as<std::string>();
if (std::filesystem::exists(schema) && std::filesystem::is_regular_file(schema))
{
std::ifstream ifs(schema);
if (ifs.is_open())
{
std::stringstream buffer;
buffer << ifs.rdbuf();
schema = buffer.str();
}
}
std::string origin = ns["origin"].as<std::string>();
std::string target = ns["target"].as<std::string>();
int rowNum = ns["row_num"].as<int>();
std::string regex = ns["row_regex"].as<std::string>();
EncodingLevel encodingLevel = EncodingLevel::from(ns["encoding_level"].as<int>());
bool nullPadding = ns["nulls_padding"].as<bool>();
int concurrency = ns["concurrency"].as<int>();

if (origin.back() != '/')
{
Expand All @@ -55,7 +69,7 @@ void LoadExecutor::execute(const bpo::variables_map &ns, const std::string &comm
}

auto startTime = std::chrono::system_clock::now();
if (startConsumers(inputFiles, parameters, loadedFiles))
if (startConsumers(inputFiles, parameters, loadedFiles, concurrency))
{
std::cout << command << " is successful" << std::endl;
}
Expand All @@ -65,14 +79,47 @@ void LoadExecutor::execute(const bpo::variables_map &ns, const std::string &comm
}
auto endTime = std::chrono::system_clock::now();
std::chrono::duration<double> elapsedSeconds = endTime - startTime;
std::cout << "Text file in " << origin << " are loaded by 1 thread in "
std::cout << "Text file in " << origin << " are loaded by " << concurrency << " thread(s) in "
<< elapsedSeconds.count() << " seconds." << std::endl;
}

bool LoadExecutor::startConsumers(const std::vector <std::string> &inputFiles, Parameters parameters,
const std::vector <std::string> &loadedFiles)
const std::vector <std::string> &loadedFiles, int concurrency)
{
PixelsConsumer consumer(inputFiles, parameters, loadedFiles);
consumer.run();
if (concurrency <= 1 || inputFiles.size() <= 1)
{
// Single-threaded mode
PixelsConsumer consumer(inputFiles, parameters, loadedFiles);
consumer.run();
}
else
{
// Multi-threaded mode: input files are distributed round-robin across the worker threads
std::vector<std::thread> threads;
int numThreads = std::min(concurrency, static_cast<int>(inputFiles.size()));
std::vector<std::vector<std::string>> inputfilesQueue(numThreads);
int currentThread=0;
for (int i = 0; i < inputFiles.size(); ++i)
{
inputfilesQueue[(currentThread++)%numThreads].push_back(inputFiles[i]);
}
// Each thread gets one file queue to process
for (int i=0;i<numThreads;i++)
{
auto queue=inputfilesQueue[i];
threads.emplace_back([queue, parameters, loadedFiles]() {
PixelsConsumer consumer(queue, parameters, loadedFiles);
consumer.run();
});
}
// Wait for all threads to complete
for (auto &thread : threads)
{
if (thread.joinable())
{
thread.join();
}
}
}
return true;
}
}
14 changes: 9 additions & 5 deletions cpp/pixels-cli/lib/load/PixelsConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
#include <fstream>
#include <sstream>
#include <chrono>
#include <mutex>

int PixelsConsumer::GlobalTargetPathId = 0;
std::mutex PixelsConsumer::globalMutex;

PixelsConsumer::PixelsConsumer(const std::vector <std::string> &queue, const Parameters &parameters,
const std::vector <std::string> &loadedFiles)
Expand Down Expand Up @@ -106,8 +108,13 @@ void PixelsConsumer::run()
if (initPixelsFile)
{
LocalFS targetStorage;
int fileId;
{
std::lock_guard<std::mutex> lock(globalMutex);
fileId = GlobalTargetPathId++;
}
targetFileName = std::to_string(std::chrono::system_clock::to_time_t(std::chrono::system_clock::now())) + \
"_" + std::to_string(this->loadedFiles.size()) + ".pxl";
"_" + std::to_string(fileId) + ".pxl";
targetFilePath = targetPath + targetFileName;
pixelsWriter = std::make_shared<PixelsWriterImpl>(schema, pixelsStride, rowGroupSize,
targetFilePath, blockSize,
Expand Down Expand Up @@ -137,10 +144,7 @@ void PixelsConsumer::run()

if (rowBatch->rowCount == rowBatch->getMaxSize())
{
std::cout << "writing row group to file: " << targetFilePath << " rowCount:" << rowBatch->rowCount
<< std::endl;
pixelsWriter->addRowBatch(rowBatch);

rowBatch->reset();
}

Expand Down Expand Up @@ -173,4 +177,4 @@ void PixelsConsumer::run()
this->loadedFiles.push_back(targetFilePath);
}
std::cout << "Exit PixelsConsumer" << std::endl;
}
}
11 changes: 6 additions & 5 deletions cpp/pixels-cli/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ int main()
("encoding_level,e", bpo::value<int>()->default_value(2),
"specify the encoding level for data loading")
("nulls_padding,p", bpo::value<bool>()->default_value(false),
"specify whether nulls padding is enabled");
"specify whether nulls padding is enabled")
("concurrency,c", bpo::value<int>()->default_value(1),
"specify the number of threads for data loading");

bpo::variables_map vm;
try
Expand All @@ -127,10 +129,8 @@ int main()
{
std::cerr << "Error parsing options: " << e.what() << "\n";
}
// try {
LoadExecutor *loadExecutor = new LoadExecutor();
std::unique_ptr<LoadExecutor> loadExecutor = std::make_unique<LoadExecutor>();
loadExecutor->execute(vm, command);
// } catch
}
else if (command == "QUERY")
{
Expand Down Expand Up @@ -160,6 +160,7 @@ int main()
{
std::cout << "Command " << command << " not found" << std::endl;
}
for (char* p : argv) free(p);
} // end of while loop
return 0;
}
}
Loading