Skip to content

Commit 69a8df5

Browse files
committed
DPL Analysis: use shm metadata in CCDB tables instead of binary view
1 parent 27d833f commit 69a8df5

8 files changed

Lines changed: 242 additions & 19 deletions

File tree

Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#include "AnalysisCCDBHelpers.h"
1313
#include "CCDBFetcherHelper.h"
14+
#include "Framework/ArrowTypes.h"
1415
#include "Framework/DataProcessingStats.h"
1516
#include "Framework/DeviceSpec.h"
1617
#include "Framework/TimingInfo.h"
@@ -22,6 +23,7 @@
2223
#include "Framework/DanglingEdgesContext.h"
2324
#include "Framework/ConfigContext.h"
2425
#include "Framework/ConfigParamsHelper.h"
26+
#include <fairmq/Version.h>
2527
#include <arrow/array/builder_binary.h>
2628
#include <arrow/type.h>
2729
#include <arrow/type_fwd.h>
@@ -109,20 +111,45 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
109111
auto it = ccdbUrls.find(m.name);
110112
fieldMetadata->Append("url", it != ccdbUrls.end() ? it->second : m.defaultValue.asString());
111113
auto columnName = m.name.substr(strlen("ccdb:"));
114+
#if (FAIRMQ_VERSION_DEC >= 111000)
115+
fields.emplace_back(std::make_shared<arrow::Field>(columnName, soa::asArrowDataType<int64_t[3]>(), false, fieldMetadata));
116+
#else
112117
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, fieldMetadata));
118+
#endif
113119
}
114120
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
115121
}
116122

123+
std::vector<std::pair<uint32_t, std::shared_ptr<arrow::FixedSizeListBuilder>>> allbuilders;
124+
allbuilders.resize([&schemas]() { size_t size = 0; for (auto& schema : schemas) { size += schema->num_fields(); }; return size; }());
125+
auto* pool = arrow::default_memory_pool();
126+
127+
int idx = 0;
128+
int sidx = 0;
129+
for (auto const& schema : schemas) {
130+
for (auto const& _ : schema->fields()) {
131+
#if (FAIRMQ_VERSION_DEC >= 111000)
132+
auto value_builder = std::make_shared<arrow::Int64Builder>();
133+
allbuilders[idx] = std::make_pair(sidx, std::make_shared<arrow::FixedSizeListBuilder>(pool, std::move(value_builder), 3));
134+
#else
135+
allbuilders[idx] = std::make_pair(sidx, std::make_shared<arrow::BinaryViewBuilder>());
136+
#endif
137+
++idx;
138+
}
139+
++sidx;
140+
}
141+
117142
std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
118143
CCDBFetcherHelper::initialiseHelper(*helper, options);
119144
std::unordered_map<std::string, int> bindings;
120145
fillValidRoutes(*helper, spec.outputs, bindings);
121146

122-
return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) {
147+
return adaptStateless([schemas, bindings, helper, allbuilders](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) {
123148
O2_SIGNPOST_ID_GENERATE(sid, ccdb);
124149
O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice);
125-
for (auto& schema : schemas) {
150+
std::ranges::for_each(allbuilders, [](auto& builder) { builder.second->Reset(); });
151+
for (auto i = 0U; i < schemas.size(); ++i) {
152+
auto& schema = schemas[i];
126153
std::vector<CCDBFetcherHelper::FetchOp> ops;
127154
auto inputBinding = *schema->metadata()->Get("sourceTable");
128155
auto inputMatcher = DataSpecUtils::fromString(*schema->metadata()->Get("sourceMatcher"));
@@ -134,6 +161,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
134161
auto table = inputs.get<TableConsumer>(inputMatcher)->asArrowTable();
135162
// FIXME: make the fTimestamp column configurable.
136163
auto timestampColumn = table->GetColumnByName("fTimestamp");
164+
auto reserveSize = timestampColumn->length();
137165
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
138166
"There are %zu bindings available", bindings.size());
139167
for (auto& binding : bindings) {
@@ -143,9 +171,16 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
143171
}
144172
int outputRouteIndex = bindings.at(outRouteDesc);
145173
auto& spec = helper->routes[outputRouteIndex].matcher;
146-
std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
147-
for (auto const& _ : schema->fields()) {
148-
builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
174+
auto builders = allbuilders | std::views::filter([&i](auto const& builder) { return builder.first == i; });
175+
unsigned int numBuilders = std::ranges::count_if(allbuilders, [&i](auto const& builder) { return builder.first == i; });
176+
arrow::Status status;
177+
std::ranges::for_each(builders, [&status, &reserveSize](auto& builder) {
178+
if (reserveSize > builder.second->capacity()) {
179+
status &= builder.second->Reserve(reserveSize - builder.second->capacity());
180+
}
181+
});
182+
if (!status.ok()) {
183+
throw framework::runtime_error_f("Failed to reserve arrays: ", status.ToString().c_str());
149184
}
150185

151186
for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
@@ -171,15 +206,25 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
171206
O2_SIGNPOST_START(ccdb, sid, "handlingResponses",
172207
"Got %zu responses from server.",
173208
responses.size());
174-
if (builders.size() != responses.size()) {
175-
LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size());
209+
if (numBuilders != responses.size()) {
210+
LOGP(fatal, "Not enough responses (expected {}, found {})", numBuilders, responses.size());
176211
}
177212
arrow::Status result;
178-
for (size_t bi = 0; bi < responses.size(); bi++) {
179-
auto& builder = builders[bi];
213+
214+
int bi = 0;
215+
for (auto& builder : builders) {
180216
auto& response = responses[bi];
217+
#if (FAIRMQ_VERSION_DEC >= 111000)
218+
result &= builder.second->Append();
219+
auto* value_builder = dynamic_cast<arrow::Int64Builder*>(builder.second->value_builder());
220+
result &= value_builder->Append(response.id.handle);
221+
result &= value_builder->Append(response.id.segment);
222+
result &= value_builder->Append(response.size);
223+
#else
181224
char const* address = reinterpret_cast<char const*>(response.id.value);
182225
result &= builder->Append(std::string_view(address, response.size));
226+
#endif
227+
++bi;
183228
}
184229
if (!result.ok()) {
185230
LOGP(fatal, "Error adding results from CCDB");
@@ -188,9 +233,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
188233
}
189234
}
190235
arrow::ArrayVector arrays;
191-
for (auto& builder : builders) {
192-
arrays.push_back(*builder->Finish());
193-
}
236+
std::ranges::for_each(builders, [&arrays](auto& builder) { arrays.push_back(*builder.second->Finish()); });
194237
auto outTable = arrow::Table::Make(schema, arrays);
195238
auto concrete = DataSpecUtils::asConcreteDataMatcher(spec);
196239
allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable);

Framework/Core/include/Framework/ASoA.h

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "Framework/ArrowTableSlicingCache.h" // IWYU pragma: export
2525
#include "Framework/SliceCache.h" // IWYU pragma: export
2626
#include "Framework/VariantHelpers.h" // IWYU pragma: export
27+
#include <fairmq/Version.h>
2728
#include <arrow/array/array_binary.h>
2829
#include <arrow/table.h> // IWYU pragma: export
2930
#include <arrow/array.h> // IWYU pragma: export
@@ -36,9 +37,16 @@
3637
#include <cstring>
3738
#include <gsl/span> // IWYU pragma: export
3839

40+
namespace fair::mq::shmem {
41+
struct MetaHeader;
42+
}
43+
3944
namespace o2::framework
4045
{
4146
using ListVector = std::vector<std::vector<int64_t>>;
47+
#if (FAIRMQ_VERSION_DEC >= 111000)
48+
using PointerReconstructor = std::function<std::byte*(fair::mq::shmem::MetaHeader&&)>;
49+
#endif
4250

4351
std::string cutString(std::string&& str);
4452
std::string strToUpper(std::string&& str);
@@ -1081,6 +1089,9 @@ concept can_bind = requires(T&& t) {
10811089
template <typename... C>
10821090
concept has_index = (is_indexing_column<C> || ...);
10831091

1092+
template <typename C>
1093+
concept needs_ptr_rec = C::needs_ptr_rec;
1094+
10841095
template <typename D, typename O, typename IP, typename... C>
10851096
struct TableIterator : IP, C... {
10861097
public:
@@ -1248,6 +1259,21 @@ struct TableIterator : IP, C... {
12481259
{
12491260
doSetCurrentInternal(internal_index_columns_t{}, table);
12501261
}
1262+
#if (FAIRMQ_VERSION_DEC >= 111000)
1263+
void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor)
1264+
{
1265+
[&pointerReconstructor, this]<typename... Cs>(framework::pack<Cs...>) {
1266+
([&pointerReconstructor, this]<typename CC>() {
1267+
if constexpr (needs_ptr_rec<CC>) {
1268+
if (pointerReconstructor) {
1269+
CC::ptrRec = &pointerReconstructor;
1270+
}
1271+
}
1272+
}.template operator()<Cs>(),
1273+
...);
1274+
}(all_columns{});
1275+
}
1276+
#endif
12511277

12521278
private:
12531279
/// Helper to move at the end of columns which actually have an iterator.
@@ -2285,7 +2311,12 @@ class Table
22852311
{
22862312
return self_t{mTable->Slice(0, 0), 0};
22872313
}
2288-
2314+
#if (FAIRMQ_VERSION_DEC >= 111000)
2315+
void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor)
2316+
{
2317+
mBegin.setPointerReconstructor(pointerReconstructor);
2318+
}
2319+
#endif
22892320
private:
22902321
template <typename T>
22912322
arrow::ChunkedArray* lookupColumn()
@@ -2464,6 +2495,56 @@ consteval static std::string_view namespace_prefix()
24642495
}; \
24652496
[[maybe_unused]] static constexpr o2::framework::expressions::BindingNode _Getter_ { _Label_, _Name_::hash, o2::framework::expressions::selectArrowType<_Type_>() }
24662497

2498+
#if (FAIRMQ_VERSION_DEC >= 111000)
2499+
#define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \
2500+
struct _Name_ : o2::soa::Column<int64_t[3], _Name_> { \
2501+
static constexpr const char* mLabel = _Label_; \
2502+
static constexpr const char* query = _CCDBQuery_; \
2503+
static constexpr const uint32_t hash = crc32(namespace_prefix<_Name_>(), std::string_view{#_Getter_}); \
2504+
static constexpr bool needs_ptr_rec = true; \
2505+
std::function<std::byte*(fair::mq::shmem::MetaHeader&&)> const* ptrRec = nullptr; \
2506+
using base = o2::soa::Column<int64_t[3], _Name_>; \
2507+
using type = int64_t[3]; \
2508+
using column_t = _Name_; \
2509+
_Name_(arrow::ChunkedArray const* column) \
2510+
: o2::soa::Column<int64_t[3], _Name_>(o2::soa::ColumnIterator<int64_t[3]>(column)) \
2511+
{ \
2512+
} \
2513+
\
2514+
_Name_() = default; \
2515+
_Name_(_Name_ const& other) = default; \
2516+
_Name_& operator=(_Name_ const& other) = default; \
2517+
\
2518+
decltype(auto) _Getter_() const \
2519+
{ \
2520+
auto& [handle, segment, size] = *mColumnIterator; \
2521+
auto span = std::span<std::byte>{(*ptrRec)(fair::mq::shmem::MetaHeader{ \
2522+
static_cast<size_t>(size), \
2523+
0, handle, 0, 0, \
2524+
static_cast<uint16_t>(segment), true}), static_cast<size_t>(size)}; \
2525+
if constexpr (std::same_as<_ConcreteType_, std::span<std::byte>>) { \
2526+
return span; \
2527+
} else { \
2528+
static std::byte* payload = nullptr; \
2529+
static _ConcreteType_* deserialised = nullptr; \
2530+
static TClass* c = TClass::GetClass(#_ConcreteType_); \
2531+
if (payload != (std::byte*)span.data()) { \
2532+
payload = (std::byte*)span.data(); \
2533+
delete deserialised; \
2534+
TBufferFile f(TBufferFile::EMode::kRead, span.size(), (char*)span.data(), kFALSE); \
2535+
deserialised = (_ConcreteType_*)soa::extractCCDBPayload((char*)payload, span.size(), c, "ccdb_object"); \
2536+
} \
2537+
return *deserialised; \
2538+
} \
2539+
} \
2540+
\
2541+
decltype(auto) \
2542+
get() const \
2543+
{ \
2544+
return _Getter_(); \
2545+
} \
2546+
};
2547+
#else
24672548
#define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \
24682549
struct _Name_ : o2::soa::Column<std::span<std::byte>, _Name_> { \
24692550
static constexpr const char* mLabel = _Label_; \
@@ -2506,6 +2587,7 @@ consteval static std::string_view namespace_prefix()
25062587
return _Getter_(); \
25072588
} \
25082589
};
2590+
#endif
25092591

25102592
#define DECLARE_SOA_CCDB_COLUMN(_Name_, _Getter_, _ConcreteType_, _CCDBQuery_) \
25112593
DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, "f" #_Name_, _Getter_, _ConcreteType_, _CCDBQuery_)
@@ -3849,7 +3931,12 @@ class FilteredBase : public T
38493931
{
38503932
return mCached;
38513933
}
3852-
3934+
#if (FAIRMQ_VERSION_DEC >= 111000)
3935+
void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor)
3936+
{
3937+
mFilteredBegin.setPointerReconstructor(pointerReconstructor);
3938+
}
3939+
#endif
38533940
private:
38543941
void resetRanges()
38553942
{

Framework/Core/include/Framework/AnalysisDataModel.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
#include "SimulationDataFormat/MCGenProperties.h"
2727
#include "Framework/PID.h"
2828

29+
#include <fairmq/Version.h>
30+
#if (FAIRMQ_VERSION_DEC >= 111000)
31+
#include <fairmq/shmem/Common.h>
32+
#endif
33+
2934
namespace o2
3035
{
3136
namespace aod

0 commit comments

Comments
 (0)