diff --git a/examples/ArgosCollector/CMakeLists.txt b/examples/ArgosCollector/CMakeLists.txt new file mode 100644 index 00000000..bf8bddd0 --- /dev/null +++ b/examples/ArgosCollector/CMakeLists.txt @@ -0,0 +1 @@ +simdb_test(ArgosCollector main.cpp) diff --git a/examples/ArgosCollector/main.cpp b/examples/ArgosCollector/main.cpp new file mode 100644 index 00000000..a9e4e576 --- /dev/null +++ b/examples/ArgosCollector/main.cpp @@ -0,0 +1,343 @@ +#include "SimDBTester.hpp" +#include "simdb/apps/AppManager.hpp" +#include "simdb/apps/argos/CollectionMgr.hpp" + +#include +std::random_device rd; // Seed source for the random number engine +std::mt19937 gen(rd()); // mersenne_twister_engine + +/// This test shows how to use the SimDB data collection system for Argos. +TEST_INIT; + +/// Enum used to verify TinyStrings +enum class Colors { RED = 1, GREEN = 2, BLUE = 3, WHITE = 0, TRANSPARENT = -1 }; + +inline std::ostream& operator<<(std::ostream& os, const Colors& c) +{ + switch (c) + { + case Colors::RED: + os << "RED"; + break; + case Colors::GREEN: + os << "GREEN"; + break; + case Colors::BLUE: + os << "BLUE"; + break; + case Colors::WHITE: + os << "WHITE"; + break; + case Colors::TRANSPARENT: + os << "TRANSPARENT"; + break; + default: + os << "UNKNOWN"; + break; + } + return os; +} + +/// Example struct that contains a wide variety of supported data fields +struct DummyPacket +{ + Colors e_color; + char ch; + int8_t int8; + int16_t int16; + int32_t int32; + int64_t int64; + uint8_t uint8; + uint16_t uint16; + uint32_t uint32; + uint64_t uint64; + float flt; + double dbl; + bool b; + std::string str; +}; + +using DummyPacketPtr = std::shared_ptr; +using DummyPacketPtrVec = std::vector; + +/// Random number/string/struct generators +template T generateRandomInt() +{ + constexpr auto minval = std::numeric_limits::min(); + constexpr auto maxval = std::numeric_limits::max(); + static std::uniform_int_distribution distrib(minval, maxval); + return distrib(gen); +} + +template T generateRandomFloat() +{ + constexpr auto minval = std::numeric_limits::min(); + constexpr auto maxval = std::numeric_limits::max(); + static std::uniform_real_distribution distrib(minval, maxval); + return distrib(gen); +} + +char generateRandomChar() +{ + return 'A' + rand() % 26; +} + +bool generateRandomBool() +{ + return rand() % 2 == 0; +} + +std::string generateRandomString(size_t minchars = 2, size_t maxchars = 8) +{ + EXPECT_TRUE(minchars <= maxchars); + + std::string str; + while (str.size() < minchars) + { + str += generateRandomChar(); + } + + while (str.size() < maxchars) + { + str += generateRandomChar(); + } + + return str; +} + +Colors generateRandomColor() +{ + return static_cast(rand() % 6 - 1); +} + +DummyPacketPtr generateRandomDummyPacket() +{ + auto s = std::make_shared(); + + s->e_color = generateRandomColor(); + s->ch = generateRandomChar(); + s->int8 = generateRandomInt(); + s->int16 = generateRandomInt(); + s->int32 = generateRandomInt(); + s->int64 = generateRandomInt(); + s->uint8 = generateRandomInt(); + s->uint16 = generateRandomInt(); + s->uint32 = generateRandomInt(); + s->uint64 = generateRandomInt(); + s->flt = generateRandomFloat(); + s->dbl = generateRandomFloat(); + s->b = generateRandomBool(); + s->str = generateRandomString(); + + return s; +} + +// Template specializations +namespace simdb { + +template <> void defineEnumMap(std::string& enum_name, std::map& map) +{ + enum_name = "Colors"; + map["RED"] = 1; + map["GREEN"] = 2; + map["BLUE"] = 3; + map["WHITE"] = 0; + map["TRANSPARENT"] = -1; +} + +template <> void defineStructSchema(StructSchema& schema) +{ + schema.addEnum("color"); + schema.addField("ch"); + schema.addField("int8"); + schema.addField("int16"); + schema.addField("int32"); + schema.addField("int64"); + schema.addField("uint8"); + schema.addField("uint16"); + schema.addField("uint32"); + schema.addField("uint64"); + schema.addField("flt"); + schema.addField("dbl"); + schema.addBool("b"); + schema.addString("str"); +} + +template <> void writeStructFields(const DummyPacket* pkt, StructFieldSerializer* serializer) +{ + serializer->writeField(pkt->e_color); + serializer->writeField(pkt->ch); + serializer->writeField(pkt->int8); + serializer->writeField(pkt->int16); + serializer->writeField(pkt->int32); + serializer->writeField(pkt->int64); + serializer->writeField(pkt->uint8); + serializer->writeField(pkt->uint16); + serializer->writeField(pkt->uint32); + serializer->writeField(pkt->uint64); + serializer->writeField(pkt->flt); + serializer->writeField(pkt->dbl); + serializer->writeField(pkt->b); + serializer->writeField(pkt->str); +} + +} // namespace simdb + +/// Example simulator that configures all supported types of collections. +class Sim +{ +public: + void configCollectables(simdb::DatabaseManager& db_mgr, simdb::CollectionMgr* collection_mgr) + { + db_mgr.safeTransaction([&]() { + collection_mgr->addClock("root", 10); + + uint64_collectable_ = collection_mgr->createCollectable("top.uint64", "root"); + bool_collectable_ = collection_mgr->createCollectable("top.bool", "root"); + enum_collectable_ = collection_mgr->createCollectable("top.enum", "root"); + dummy_packet_collectable_ = collection_mgr->createCollectable("top.dummy_packet", "root"); + dummy_collectable_vec_contig_ = collection_mgr->createIterableCollector( + "top.dummy_packet_vec_contig", "root", 32); + dummy_collectable_vec_sparse_ = collection_mgr->createIterableCollector( + "top.dummy_packet_vec_sparse", "root", 32); + }); + } + + void step() + { + randomizeDummyPacketCollectables_(); + auto tick = ++current_tick_; + + // Collect a random uint64_t between ticks 10 and 25 + if (tick == 1000) + { + uint64_collectable_->activate(generateRandomInt()); + } else if (tick == 2000) + { + uint64_collectable_->deactivate(); + } + + // Collect a random bool between ticks 1500 and 2500 + if (tick == 1500) + { + bool_collectable_->activate(rand() % 2 == 0); + } else if (tick == 2500) + { + bool_collectable_->deactivate(); + } + + // Collect a random enum between ticks 1800 and 2800 + if (tick == 1800) + { + enum_collectable_->activate(generateRandomColor()); + } else if (tick == 2800) + { + enum_collectable_->deactivate(); + } + + // Collect a random DummyPacket between ticks 2000 and 3000 + if (tick == 2000) + { + dummy_packet_collectable_->activate(generateRandomDummyPacket()); + } else if (tick == 3000) + { + dummy_packet_collectable_->deactivate(); + } + + // Collect some different values for just one cycle. To do this, we call + // the activate() method, passing in "once=true". + if (tick >= 5000 && tick % 5 == 0) + { + uint64_collectable_->activate(generateRandomInt(), true); + bool_collectable_->activate(rand() % 2 == 0, true); + enum_collectable_->activate(generateRandomColor(), true); + dummy_packet_collectable_->activate(generateRandomDummyPacket(), true); + } + + dummy_collectable_vec_contig_->activate(&dummy_packet_vec_contig_); + dummy_collectable_vec_sparse_->activate(&dummy_packet_vec_sparse_); + } + + uint64_t getCurrentTick() const { return current_tick_; } + +private: + void randomizeDummyPacketCollectables_() + { + dummy_packet_vec_contig_.clear(); + for (int i = 0; i < rand() % 10; ++i) + { + dummy_packet_vec_contig_.push_back(generateRandomDummyPacket()); + } + + dummy_packet_vec_sparse_.clear(); + dummy_packet_vec_sparse_.resize(32); + for (int i = 0; i < rand() % 10; ++i) + { + if (rand() % 2 == 0) + { + dummy_packet_vec_sparse_[i] = generateRandomDummyPacket(); + } + } + } + + uint64_t current_tick_ = 0; + + std::shared_ptr uint64_collectable_; + std::shared_ptr bool_collectable_; + std::shared_ptr enum_collectable_; + std::shared_ptr dummy_packet_collectable_; + + DummyPacketPtrVec dummy_packet_vec_contig_; + std::shared_ptr dummy_collectable_vec_contig_; + + DummyPacketPtrVec dummy_packet_vec_sparse_; + std::shared_ptr dummy_collectable_vec_sparse_; +}; + +int main(int argc, char** argv) +{ + simdb::AppManagers app_mgrs; + app_mgrs.registerApp(); + + // Create the app/db managers + auto& app_mgr = app_mgrs.createAppManager("test.db"); + auto& db_mgr = app_mgrs.getDatabaseManager(); + + // Create the test simulator + Sim sim; + + // Setup... + app_mgr.enableApp(simdb::CollectionMgr::NAME); + app_mgrs.createEnabledApps(); + app_mgrs.createSchemas(); + + auto collection_mgr = app_mgr.getApp(); + sim.configCollectables(db_mgr, collection_mgr); + + app_mgrs.postInit(argc, argv); + app_mgrs.initializePipelines(); + app_mgrs.openPipelines(); + + // Simulate... + while (true) + { + sim.step(); + auto tick = sim.getCurrentTick(); + + // "Sweep" the collection system for the current cycle, + // sending all active values to the database. + collection_mgr->sweep("root", tick); + + // Stop at 10k steps + if (tick == 10000) + { + break; + } + } + + // Finalize... + app_mgrs.postSimLoopTeardown(); + + REPORT_ERROR; + return ERROR_CODE; +} diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index fd6d7a8d..661fceb2 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -8,6 +8,7 @@ add_subdirectory(ConcurrentApps) add_subdirectory(DatabaseWatchdog) add_subdirectory(PipelineSnoopers) add_subdirectory(AppFactory) +add_subdirectory(ArgosCollector) # TODO cnyce: fix sporadic failures #add_subdirectory(MultiPortStages) diff --git a/include/simdb/apps/argos/CollectionBuffer.hpp b/include/simdb/apps/argos/CollectionBuffer.hpp index f6766d44..7cfd0ed9 100644 --- a/include/simdb/apps/argos/CollectionBuffer.hpp +++ b/include/simdb/apps/argos/CollectionBuffer.hpp @@ -2,7 +2,7 @@ #pragma once -#include "simdb/utils/MetaStructs.hpp" +#include "simdb/utils/TypeTraits.hpp" #include #include diff --git a/include/simdb/apps/argos/CollectionMgr.hpp b/include/simdb/apps/argos/CollectionMgr.hpp new file mode 100644 index 00000000..4f28f7cd --- /dev/null +++ b/include/simdb/apps/argos/CollectionMgr.hpp @@ -0,0 +1,408 @@ +#pragma once + +#include "simdb/apps/App.hpp" +#include "simdb/apps/argos/CollectionPoints.hpp" +#include "simdb/apps/argos/TreeNode.hpp" +#include "simdb/pipeline/Pipeline.hpp" +#include "simdb/pipeline/PipelineManager.hpp" +#include "simdb/schema/SchemaDef.hpp" +#include "simdb/utils/Compress.hpp" + +#include + +namespace simdb { + +/*! + * \class CollectionMgr + * + * \brief This class provides an easy way to handle simulation-wide data collection. + * All databases produced by this app are readable by the Argos pipeline viewer front-end. + */ +class CollectionMgr : public App +{ +public: + static constexpr auto NAME = "collection-mgr"; + + //! \brief Construct with the associated DatabaseManager and heartbeat. The heartbeat + //! value dictates the max number of cycles that we employ the optimization "only write + //! to the database if the collected data is different from the last collected data". + //! This prevents Argos from having to go back more than N cycles to find the last + //! known value. + CollectionMgr(DatabaseManager* db_mgr, size_t heartbeat = 100) : + db_mgr_(db_mgr), + heartbeat_(heartbeat), + tiny_strings_(db_mgr) + { + } + + //! \brief Populate the schema with the appropriate tables for all the collections. + static void defineSchema(Schema& schema) + { + using dt = SqlDataType; + + auto& globals_tbl = schema.addTable("CollectionGlobals"); + globals_tbl.addColumn("Heartbeat", dt::int32_t); + globals_tbl.setColumnDefaultValue("Heartbeat", 10); + + auto& clks_tbl = schema.addTable("Clocks"); + clks_tbl.addColumn("Name", dt::string_t); + clks_tbl.addColumn("Period", dt::int32_t); + + auto& elem_tns_tbl = schema.addTable("ElementTreeNodes"); + elem_tns_tbl.addColumn("Name", dt::string_t); + elem_tns_tbl.addColumn("ParentID", dt::int32_t); + + auto& collectable_tns_tbl = schema.addTable("CollectableTreeNodes"); + collectable_tns_tbl.addColumn("ElementTreeNodeID", dt::int32_t); + collectable_tns_tbl.addColumn("ClockID", dt::int32_t); + collectable_tns_tbl.addColumn("DataType", dt::string_t); + collectable_tns_tbl.addColumn("Location", dt::string_t); + collectable_tns_tbl.addColumn("AutoCollected", dt::int32_t); + collectable_tns_tbl.setColumnDefaultValue("AutoCollected", 0); + + auto& struct_fields_tbl = schema.addTable("StructFields"); + struct_fields_tbl.addColumn("StructName", dt::string_t); + struct_fields_tbl.addColumn("FieldName", dt::string_t); + struct_fields_tbl.addColumn("FieldType", dt::string_t); + struct_fields_tbl.addColumn("FormatCode", dt::int32_t); + struct_fields_tbl.addColumn("IsAutoColorizeKey", dt::int32_t); + struct_fields_tbl.addColumn("IsDisplayedByDefault", dt::int32_t); + struct_fields_tbl.setColumnDefaultValue("IsAutoColorizeKey", 0); + struct_fields_tbl.setColumnDefaultValue("IsDisplayedByDefault", 1); + + auto& enum_defns_tbl = schema.addTable("EnumDefns"); + enum_defns_tbl.addColumn("EnumName", dt::string_t); + enum_defns_tbl.addColumn("EnumValStr", dt::string_t); + enum_defns_tbl.addColumn("EnumValBlob", dt::blob_t); + enum_defns_tbl.addColumn("IntType", dt::string_t); + + auto& string_map_tbl = schema.addTable("StringMap"); + string_map_tbl.addColumn("IntVal", dt::int32_t); + string_map_tbl.addColumn("String", dt::string_t); + + auto& collection_records_tbl = schema.addTable("CollectionRecords"); + collection_records_tbl.addColumn("Tick", dt::uint64_t); + collection_records_tbl.addColumn("Data", dt::blob_t); + collection_records_tbl.addColumn("OldestReferredTick", dt::uint64_t); + collection_records_tbl.setColumnDefaultValue("OldestReferredTick", 0); + collection_records_tbl.createIndexOn("Tick"); + + auto& queue_max_sizes_tbl = schema.addTable("QueueMaxSizes"); + queue_max_sizes_tbl.addColumn("CollectableTreeNodeID", dt::int32_t); + queue_max_sizes_tbl.addColumn("MaxSize", dt::int32_t); + } + + //! \brief Add a new clock domain for collection. + void addClock(const std::string& name, const uint32_t period) + { + auto [it, inserted] = clocks_.insert({name, period}); + if (!inserted && it->second != period) + { + throw DBException("Clock '") << name << "' already registered with period " << it->second + << ". Cannot change period to " << period << "."; + } + } + + //! \brief Create a collection point for a POD or struct-like type. + template + std::shared_ptr createCollectable(const std::string& path, const std::string& clock) + { + auto treenode = updateTree_(path, clock); + auto elem_id = treenode->db_id; + auto clk_id = treenode->clk_id; + + using value_type = type_traits::remove_any_pointer_t; + + std::string dtype; + if constexpr (std::is_same_v) + { + dtype = "bool"; + } else if constexpr (std::is_trivial_v) + { + dtype = getFieldDTypeStr(getFieldDTypeEnum()); + } else + { + dtype = demangle(typeid(value_type).name()); + } + + auto collectable = std::make_shared(elem_id, clk_id, heartbeat_, dtype, &tiny_strings_); + + if constexpr (!std::is_trivial::value) + { + StructSerializer::getInstance()->serializeDefn(db_mgr_); + } + + collectables_.push_back(collectable); + collectables_by_path_[path] = collectable.get(); + + return collectable; + } + + // Automatically collect iterable data (non-POD types). + template + std::shared_ptr> + createIterableCollector(const std::string& path, const std::string& clock, const size_t capacity) + { + auto treenode = updateTree_(path, clock); + auto elem_id = treenode->db_id; + auto clk_id = treenode->clk_id; + + using value_type = type_traits::remove_any_pointer_t; + + if constexpr (!std::is_trivial::value) + { + StructSerializer::getInstance()->serializeDefn(db_mgr_); + } + + std::string dtype; + if constexpr (std::is_same_v) + { + dtype = "bool"; + } else if constexpr (std::is_trivial_v) + { + dtype = getFieldDTypeStr(getFieldDTypeEnum()); + } else + { + dtype = demangle(typeid(value_type).name()); + } + + if constexpr (Sparse) + { + dtype += "_sparse"; + } else + { + dtype += "_contig"; + } + + dtype += "_capacity" + std::to_string(capacity); + + using collection_point_type = + std::conditional_t; + auto collectable = + std::make_shared(elem_id, clk_id, heartbeat_, dtype, capacity, &tiny_strings_); + collectables_.push_back(collectable); + collectables_by_path_[path] = collectable.get(); + return collectable; + } + + // Create the pipeline which processes all collected data. + void createPipeline(pipeline::PipelineManager* pipeline_mgr) override + { + auto pipeline = pipeline_mgr->createPipeline(NAME, this); + + pipeline->addStage("compressor"); + pipeline->addStage("db_writer"); + pipeline->noMoreStages(); + + pipeline->bind("compressor.compressed_data", "db_writer.data_to_write"); + pipeline->noMoreBindings(); + + // As soon as we call noMoreBindings(), all input/output queues are available. + pipeline_head_ = pipeline->getInPortQueue("compressor.input_data"); + } + + // Sweep the collection system for all active collectables that exist on + // the given clock, and send their data to the database. + void sweep(const std::string& clk, uint64_t tick) + { + const auto clk_id = clock_db_ids_by_name_.at(clk); + uint64_t oldest_referred_tick = std::numeric_limits::max(); + + std::vector swept_data; + for (auto& collectable : collectables_) + { + if (collectable->getClockId() == clk_id) + { + collectable->sweep(swept_data); + + auto oldest_tick = collectable->getLastCollectedTick(); + if (oldest_tick > 0) + { + oldest_referred_tick = std::min(oldest_referred_tick, oldest_tick); + } + } + } + + if (swept_data.empty()) + { + return; + } + + DatabaseEntry entry; + entry.bytes = std::move(swept_data); + entry.tick = tick; + + if (oldest_referred_tick != std::numeric_limits::max()) + { + entry.oldest_referred_tick = oldest_referred_tick; + } + + pipeline_head_->emplace(std::move(entry)); + } + + // One-time call to write post-simulation metadata to SimDB. + void postTeardown() override + { + for (auto& collectable : collectables_) + { + if (auto iterable_collector = dynamic_cast(collectable.get())) + { + auto elem_id = iterable_collector->getElemId(); + auto queue_max_size = iterable_collector->getQueueMaxSize(); + db_mgr_->INSERT(SQL_TABLE("QueueMaxSizes"), SQL_VALUES((int)elem_id, (int)queue_max_size)); + } + } + + tiny_strings_.serialize(); + } + +private: + /// \class DatabaseEntry + /// \brief Packet type sent down collection pipeline + struct DatabaseEntry + { + std::vector bytes; + uint64_t tick = std::numeric_limits::max(); + uint64_t oldest_referred_tick = std::numeric_limits::max(); + }; + + /// Compress on pipeline thread 1 + class CompressionStage : public pipeline::Stage + { + public: + CompressionStage() + { + addInPort_("input_data", input_queue_); + addOutPort_("compressed_data", output_queue_); + } + + private: + pipeline::PipelineAction run_(bool) override + { + DatabaseEntry entry; + if (input_queue_->try_pop(entry)) + { + DatabaseEntry compressed_entry; + compressed_entry.tick = entry.tick; + compressed_entry.oldest_referred_tick = entry.oldest_referred_tick; + compressData(entry.bytes, compressed_entry.bytes); + + output_queue_->emplace(std::move(compressed_entry)); + return simdb::pipeline::PROCEED; + } + + return simdb::pipeline::SLEEP; + } + + ConcurrentQueue* input_queue_ = nullptr; + ConcurrentQueue* output_queue_ = nullptr; + }; + + /// Write to SQLite on dedicated database thread. + class DatabaseStage : public pipeline::DatabaseStage + { + public: + DatabaseStage() { addInPort_("data_to_write", input_queue_); } + + private: + pipeline::PipelineAction run_(bool) override + { + DatabaseEntry entry; + if (input_queue_->try_pop(entry)) + { + auto inserter = getTableInserter_("CollectionRecords"); + inserter->createRecordWithColValues(entry.tick, entry.bytes, entry.oldest_referred_tick); + + return pipeline::PROCEED; + } + return pipeline::SLEEP; + } + + ConcurrentQueue* input_queue_ = nullptr; + }; + + /// Update the collection tree piecemeal as the manager creates new collection points. + TreeNode* updateTree_(const std::string& path, const std::string& clk) + { + if (!root_) + { + root_ = std::make_unique("root"); + auto record = db_mgr_->INSERT(SQL_TABLE("ElementTreeNodes"), SQL_VALUES("root", 0)); + root_->db_id = record->getId(); + } + + if (clock_db_ids_by_name_.find(clk) == clock_db_ids_by_name_.end()) + { + auto period = clocks_.at(clk); + auto record = db_mgr_->INSERT(SQL_TABLE("Clocks"), SQL_VALUES(clk, period)); + clock_db_ids_by_name_[clk] = record->getId(); + } + + auto node = root_.get(); + std::vector path_parts; + boost::split(path_parts, path, boost::is_any_of(".")); + for (size_t part_idx = 0; part_idx < path_parts.size(); ++part_idx) + { + auto part = path_parts[part_idx]; + auto found = false; + for (const auto& child : node->children) + { + if (child->name == part) + { + node = child.get(); + found = true; + break; + } + } + + if (!found) + { + auto new_node = std::make_unique(part, node); + node->children.push_back(std::move(new_node)); + node = node->children.back().get(); + + auto record = db_mgr_->INSERT(SQL_TABLE("ElementTreeNodes"), SQL_VALUES(part, node->parent->db_id)); + + node->db_id = record->getId(); + if (part_idx == path_parts.size() - 1) + { + node->clk_id = clock_db_ids_by_name_.at(clk); + } + } + } + + return node; + } + + /// The DatabaseManager that we are collecting data for. + DatabaseManager* db_mgr_; + + /// The max number of cycles that we employ the optimization "only write to the + /// database if the collected data is different from the last collected data". + /// This prevents Argos from having to go back more than N cycles to find the + /// last known value. + const size_t heartbeat_; + + /// All registered clocks (name->period). + std::unordered_map clocks_; + + /// The root of the serialized element tree. + std::unique_ptr root_; + + /// Mapping of clock names to clock IDs. + std::unordered_map clock_db_ids_by_name_; + + /// All collectables. + std::vector> collectables_; + + /// Mapping of collectable paths to the collectable objects. + std::unordered_map collectables_by_path_; + + /// Mapping from strings to integer ID. + TinyStrings<> tiny_strings_; + + /// Main entry point to the pipeline. + ConcurrentQueue* pipeline_head_ = nullptr; +}; + +} // namespace simdb diff --git a/include/simdb/apps/argos/CollectionPoints.hpp b/include/simdb/apps/argos/CollectionPoints.hpp index 5e9c8320..3c3ba85c 100644 --- a/include/simdb/apps/argos/CollectionPoints.hpp +++ b/include/simdb/apps/argos/CollectionPoints.hpp @@ -2,7 +2,7 @@ #pragma once -#include "simdb/pipeline/Serialize.hpp" +#include "simdb/apps/argos/Serialize.hpp" #include #include @@ -46,12 +46,14 @@ class TickReader class CollectionPointBase { public: - CollectionPointBase(uint16_t elem_id, uint16_t clk_id, size_t heartbeat, const std::string& dtype) : + CollectionPointBase(uint16_t elem_id, uint16_t clk_id, size_t heartbeat, const std::string& dtype, + TinyStrings<>* tiny_strings) : argos_record_(elem_id), elem_id_(elem_id), clk_id_(clk_id), heartbeat_(heartbeat), - dtype_(dtype) + dtype_(dtype), + tiny_strings_(tiny_strings) { } @@ -94,6 +96,8 @@ class CollectionPointBase bool isAutoCollected() const { return is_auto_collected_; } + uint64_t getLastCollectedTick() const { return last_collected_tick_; } + /// Append the collected data from the black box unless the status is /// DONT_READ. void sweep(std::vector& swept_data) @@ -101,6 +105,14 @@ class CollectionPointBase if (argos_record_.status != ArgosRecord::Status::DONT_READ) { swept_data.insert(swept_data.end(), argos_record_.data.begin(), argos_record_.data.end()); + last_collected_tick_ = getTick_(); + } else if (last_collected_tick_ > 0) + { + CollectionBuffer buffer(reusable_buf_, getElemId()); + static constexpr uint8_t REFER_TICK = std::numeric_limits::max(); + buffer << REFER_TICK; + buffer << last_collected_tick_; + swept_data.insert(swept_data.end(), reusable_buf_.begin(), reusable_buf_.end()); } if (argos_record_.status == ArgosRecord::Status::READ_ONCE) @@ -109,13 +121,6 @@ class CollectionPointBase } } - /// Called at the end of simulation / when the pipeline collector is - /// destroyed. Given the DatabaseManager in case the collectable needs to - /// write any final metadata etc. - /// - /// Note that postSim() is called inside a BEGIN/COMMIT TRANSACTION block. - virtual void postSim(DatabaseManager*) {} - protected: ArgosRecord argos_record_; @@ -129,6 +134,8 @@ class CollectionPointBase uint64_t getTick_() const { return tick_reader_ ? tick_reader_->getTick() : 0; } + TinyStrings<>* getTinyStrings_() const { return tiny_strings_; } + private: const uint16_t elem_id_; const uint16_t clk_id_; @@ -136,6 +143,9 @@ class CollectionPointBase const std::string dtype_; TickReader* tick_reader_ = nullptr; bool is_auto_collected_ = false; + uint64_t last_collected_tick_ = 0; + std::vector reusable_buf_; + TinyStrings<>* tiny_strings_ = nullptr; }; #define LOG_MINIFICATION simdb::CollectionPointBase::minificationLoggingEnabled() @@ -205,7 +215,7 @@ class CollectionPoint : public CollectionPointBase if (LOG_MINIFICATION) std::cout << "\n\n[simdb verbose] tick " << getTick_() << ", cid " << getElemId() << "\n"; - const auto num_bytes = getNumBytes_(val); + const auto num_bytes = getNumBytes_(); if (!isAutoCollected()) { if (LOG_MINIFICATION) @@ -215,7 +225,7 @@ class CollectionPoint : public CollectionPointBase buffer << val; } else { - StructSerializer::getInstance()->extract(&val, curr_data_); + StructSerializer::getInstance()->extract(&val, curr_data_, getTinyStrings_()); buffer << curr_data_; } return; @@ -236,7 +246,7 @@ class CollectionPoint : public CollectionPointBase } } - StructSerializer::getInstance()->extract(&val, curr_data_); + StructSerializer::getInstance()->extract(&val, curr_data_, getTinyStrings_()); if (num_carry_overs_ < getHeartbeat() && curr_data_ == prev_data_) { if (LOG_MINIFICATION) @@ -254,7 +264,7 @@ class CollectionPoint : public CollectionPointBase } } - template size_t getNumBytes_(const T& val) + template size_t getNumBytes_() { if constexpr (std::is_same_v) { @@ -306,9 +316,28 @@ class CollectionPoint : public CollectionPointBase size_t num_carry_overs_ = 0; }; +/// Base class for sparse/contig iterable collectors. +class IterableCollectorBase : public CollectionPointBase +{ +public: + uint16_t getQueueMaxSize() const { return queue_max_size_; } + +protected: + template + IterableCollectorBase(Args&&... args) : + CollectionPointBase(std::forward(args)...) + { + } + + void updateQueueMaxSize_(uint16_t size) { queue_max_size_ = std::max(queue_max_size_, size); } + +private: + uint16_t queue_max_size_ = 0; +}; + /// Collectable for contiguous (non-sparse) iterable data e.g. /// queue/vector/deque/etc. -class ContigIterableCollectionPoint : public CollectionPointBase +class ContigIterableCollectionPoint : public IterableCollectorBase { public: /// Minification for these collectables can do the following: @@ -326,8 +355,8 @@ class ContigIterableCollectionPoint : public CollectionPointBase enum class Action : uint8_t { ARRIVE, DEPART, BOOKENDS, CHANGE, CARRY, FULL }; ContigIterableCollectionPoint(uint16_t elem_id, uint16_t clk_id, size_t heartbeat, const std::string& dtype, - size_t capacity) : - CollectionPointBase(elem_id, clk_id, heartbeat, dtype), + size_t capacity, TinyStrings<>* tiny_strings) : + IterableCollectorBase(elem_id, clk_id, heartbeat, dtype, tiny_strings), curr_snapshot_(capacity), prev_snapshot_(capacity) { @@ -598,7 +627,7 @@ class ContigIterableCollectionPoint : public CollectionPointBase size = prev_snapshot_.capacity(); } - queue_max_size_ = std::max(queue_max_size_, (uint16_t)size); + updateQueueMaxSize_((uint16_t)size); curr_snapshot_.clear(); auto itr = container.begin(); @@ -641,28 +670,21 @@ class ContigIterableCollectionPoint : public CollectionPointBase typename std::enable_if::value, bool>::type writeStruct_(const T& el, IterableSnapshot& snapshot, uint16_t bin_idx) { - StructSerializer::getInstance()->extract(&el, snapshot[bin_idx]); + StructSerializer::getInstance()->extract(&el, snapshot[bin_idx], getTinyStrings_()); return true; } - /// Write the maximum size of this queue during simulation. This is used - /// for Argos' SchedulingLines feature. - /// - /// Note that postSim() is called inside a BEGIN/COMMIT TRANSACTION block. - void postSim(DatabaseManager* db_mgr) override; - IterableSnapshot curr_snapshot_; IterableSnapshot prev_snapshot_; - uint16_t queue_max_size_ = 0; }; /// Collectable for sparse iterable data e.g. queue/vector/deque/etc. -class SparseIterableCollectionPoint : public CollectionPointBase +class SparseIterableCollectionPoint : public IterableCollectorBase { public: SparseIterableCollectionPoint(uint16_t elem_id, uint16_t clk_id, size_t heartbeat, const std::string& dtype, - size_t capacity) : - CollectionPointBase(elem_id, clk_id, heartbeat, dtype), + size_t capacity, TinyStrings<>* tiny_strings) : + IterableCollectorBase(elem_id, clk_id, heartbeat, dtype, tiny_strings), expected_capacity_(capacity) { prev_data_by_bin_.resize(capacity); @@ -723,7 +745,7 @@ class SparseIterableCollectionPoint : public CollectionPointBase } } - queue_max_size_ = std::max(queue_max_size_, num_valid); + updateQueueMaxSize_((uint16_t)num_valid); CollectionBuffer buffer(argos_record_.data, getElemId()); if (LOG_MINIFICATION) @@ -772,7 +794,7 @@ class SparseIterableCollectionPoint : public CollectionPointBase writeStruct_(const T& el, CollectionBuffer& buffer, uint16_t bin_idx) { buffer << bin_idx; - StructSerializer::getInstance()->writeStruct(&el, buffer); + StructSerializer::getInstance()->writeStruct(&el, buffer, getTinyStrings_()); if (LOG_MINIFICATION) std::cout << "[simdb verbose] bin " << bin_idx << ", " @@ -780,16 +802,9 @@ class SparseIterableCollectionPoint : public CollectionPointBase return true; } - /// Write the maximum size of this queue during simulation. This is used - /// for Argos' SchedulingLines feature. - /// - /// Note that postSim() is called inside a BEGIN/COMMIT TRANSACTION block. - void postSim(DatabaseManager* db_mgr) override; - const size_t expected_capacity_; std::vector> prev_data_by_bin_; std::vector num_carry_overs_by_bin_; - uint16_t queue_max_size_ = 0; }; } // namespace simdb diff --git a/include/simdb/apps/argos/Serialize.hpp b/include/simdb/apps/argos/Serialize.hpp index 825f1fb6..41fe08c4 100644 --- a/include/simdb/apps/argos/Serialize.hpp +++ b/include/simdb/apps/argos/Serialize.hpp @@ -3,7 +3,7 @@ #pragma once #include "simdb/Exceptions.hpp" -#include "simdb/pipeline/CollectionBuffer.hpp" +#include "simdb/apps/argos/CollectionBuffer.hpp" #include "simdb/utils/Demangle.hpp" #include "simdb/utils/TinyStrings.hpp" #include "simdb/utils/TypeTraits.hpp" @@ -39,7 +39,7 @@ enum class StructFields { string_t }; -template inline StructFields getFieldDTypeEnum() +template inline constexpr StructFields getFieldDTypeEnum() { if constexpr (std::is_same_v) { @@ -86,7 +86,7 @@ template inline StructFields getFieldDTypeEnum() return getFieldDTypeEnum(); } else { - throw DBException("Unsupported data type: ") << demangle(typeid(FieldT).name()); + static_assert(false, "Unsupported data type"); } } @@ -251,7 +251,31 @@ template class EnumMap const std::string& getEnumName() const { return enum_name_; } - void serializeDefn(DatabaseManager* db_mgr) const; + void serializeDefn(DatabaseManager* db_mgr) const + { + using enum_int_t = typename std::underlying_type::type; + + if (!serialized_) + { + auto dtype = getFieldDTypeEnum(); + auto int_type_str = getFieldDTypeStr(dtype); + + for (const auto& kvp : *map_) + { + auto enum_val_str = kvp.first; + auto enum_val_vec = convertIntToBlob(kvp.second); + + SqlBlob enum_val_blob; + enum_val_blob.data_ptr = enum_val_vec.data(); + enum_val_blob.num_bytes = enum_val_vec.size(); + + db_mgr->INSERT(SQL_TABLE("EnumDefns"), + SQL_VALUES(enum_name_, enum_val_str, enum_val_blob, int_type_str)); + } + + serialized_ = true; + } + } private: EnumMap() @@ -288,7 +312,19 @@ class FieldBase virtual size_t getNumBytes() const { return getDTypeNumBytes(dtype_); } - virtual void serializeDefn(DatabaseManager* db_mgr, const std::string& struct_name) const; + virtual void serializeDefn(DatabaseManager* db_mgr, const std::string& struct_name) const + { + const auto field_dtype_str = getFieldDTypeStr(dtype_); + const auto fmt = static_cast(format_); + const auto is_autocolorize_key = (int)isAutocolorizeKey(); + const auto is_displayed_by_default = (int)isDisplayedByDefault(); + + db_mgr->INSERT( + SQL_TABLE("StructFields"), + SQL_COLUMNS("StructName", "FieldName", "FieldType", "FormatCode", "IsAutoColorizeKey", + "IsDisplayedByDefault"), + SQL_VALUES(struct_name, name_, field_dtype_str, fmt, is_autocolorize_key, is_displayed_by_default)); + } void setIsAutocolorizeKey(bool is_autocolorize_key) { @@ -325,7 +361,18 @@ template class EnumField : public FieldBase { } - virtual void serializeDefn(DatabaseManager* db_mgr, const std::string& struct_name) const override; + void serializeDefn(DatabaseManager* db_mgr, const std::string& struct_name) const override + { + const auto field_name = getName(); + const auto is_autocolorize_key = (int)isAutocolorizeKey(); + const auto is_displayed_by_default = (int)isDisplayedByDefault(); + + db_mgr->INSERT(SQL_TABLE("StructFields"), + SQL_COLUMNS("StructName", "FieldName", "FieldType", "IsAutoColorizeKey", "IsDisplayedByDefault"), + SQL_VALUES(struct_name, field_name, enum_name_, is_autocolorize_key, is_displayed_by_default)); + + EnumMap::instance()->serializeDefn(db_mgr); + } private: const typename EnumMap::enum_map_t map_; @@ -351,6 +398,7 @@ template class StructFieldSerializer; /// the serializer. template void writeStructFields(const StructT* s, StructFieldSerializer* serializer) { + (void)s; (void)serializer; } @@ -381,9 +429,11 @@ template void writeStructFields(const StructT* s, StructField template class StructFieldSerializer { public: - StructFieldSerializer(const std::vector>& fields, CollectionBuffer& buffer) : + StructFieldSerializer(const std::vector>& fields, CollectionBuffer& buffer, + TinyStrings<>* tiny_strings) : fields_(fields), - buffer_(buffer) + buffer_(buffer), + tiny_strings_(tiny_strings) { } @@ -421,7 +471,7 @@ template class StructFieldSerializer { if (dynamic_cast(fields_[current_field_idx_].get())) { - uint32_t string_id = StringMap::instance()->getStringId(val); + uint32_t string_id = tiny_strings_->getStringID(val); writeField(string_id); } else { @@ -442,6 +492,7 @@ template class StructFieldSerializer size_t current_field_idx_ = 0; CollectionBuffer& buffer_; size_t num_bytes_written_ = 0; + TinyStrings<>* tiny_strings_ = nullptr; }; template class StructSerializer; @@ -623,17 +674,17 @@ template class StructSerializer void serializeDefn(DatabaseManager* db_mgr) const { schema_.serializeDefn(db_mgr); } - size_t writeStruct(const StructT* s, CollectionBuffer& buffer) const + size_t writeStruct(const StructT* s, CollectionBuffer& buffer, TinyStrings<>* tiny_strings) const { - StructFieldSerializer field_serializer(schema_.fields_, buffer); + StructFieldSerializer field_serializer(schema_.fields_, buffer, tiny_strings); field_serializer.writeFields(s); return field_serializer.numBytesWritten(); } - void extract(const StructT* s, std::vector& bytes) const + void extract(const StructT* s, std::vector& bytes, TinyStrings<>* tiny_strings) const { CollectionBuffer buffer(bytes); - writeStruct(s, buffer); + writeStruct(s, buffer, tiny_strings); } private: