From 09f7185773477058763ece20554ddc7c7d9000c9 Mon Sep 17 00:00:00 2001 From: Colby Nyce Date: Sun, 1 Mar 2026 15:18:54 -0800 Subject: [PATCH 1/9] Blob.hpp doxygen --- include/simdb/schema/Blob.hpp | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/include/simdb/schema/Blob.hpp b/include/simdb/schema/Blob.hpp index d8f73c5a..ecb7756f 100644 --- a/include/simdb/schema/Blob.hpp +++ b/include/simdb/schema/Blob.hpp @@ -9,13 +9,23 @@ namespace simdb { -/// Blob descriptor used for writing and reading raw bytes -/// to/from the database. +/*! + * \struct SqlBlob + * + * \brief Blob descriptor used for writing and reading raw bytes to/from the + * database. Holds a pointer and byte count; used with SQL_VALUES() and + * blob columns (e.g. getPropertyBlob / setPropertyBlob on SqlRecord). + */ struct SqlBlob { + /// Pointer to the raw bytes to store or that were read. const void* data_ptr = nullptr; + /// Number of bytes at \p data_ptr. size_t num_bytes = 0; + /// Construct from a contiguous container (e.g. std::vector). + /// \tparam T Element type; sizeof(T) * vals.size() gives num_bytes. + /// \param vals Container whose data() and size() define the blob. template SqlBlob(const std::vector& vals) : data_ptr(vals.data()), @@ -27,6 +37,7 @@ struct SqlBlob SqlBlob(const SqlBlob&) = default; }; +/// Write a short debug representation of the blob to \p os (e.g. blob(addr, n=size)). 
inline std::ostream& operator<<(std::ostream& os, const SqlBlob& blob) { if (!blob.data_ptr || blob.num_bytes == 0) From 40e72e9c073ec9b111429d887d16db0673b175dd Mon Sep 17 00:00:00 2001 From: Colby Nyce Date: Sun, 1 Mar 2026 15:25:10 -0800 Subject: [PATCH 2/9] Doxygen: PreparedINSERT.hpp and ValueContainer.hpp --- docs/doxygen-burndown.md | 130 ++++++++++++++++++++++++ include/simdb/sqlite/PreparedINSERT.hpp | 84 +++++++-------- include/simdb/sqlite/ValueContainer.hpp | 49 +++++++-- 3 files changed, 214 insertions(+), 49 deletions(-) create mode 100644 docs/doxygen-burndown.md diff --git a/docs/doxygen-burndown.md b/docs/doxygen-burndown.md new file mode 100644 index 00000000..d38fc228 --- /dev/null +++ b/docs/doxygen-burndown.md @@ -0,0 +1,130 @@ +# Doxygen documentation burndown list + +Classes/structs below have minimal or no Doxygen compared to the rest of the codebase (e.g. no `\class` block or only a one-line `///`). Target: add a `/*! \class Name \brief ... */` (or `\struct`) block and, where useful, `\param` / `\return` on public API. + +**Reference style** (well-documented): `DatabaseManager`, `Column`, `Table`, `Schema`, `Connection`, `Transaction`, `SqlQuery`, `SqlRecord`, `ResultWriterBase` and all `ResultWriter*` in Iterator.hpp, `CollectionBuffer`, `InterruptException`, `SqlBlob` (Blob.hpp). 
+ +--- + +## include/simdb/sqlite + +| Class / struct | File | Current docs | +|----------------|------|--------------| +| `PreparedINSERT` | PreparedINSERT.hpp | One-line `///` only; no `\class` block | +| `ValueContainerBase` | ValueContainer.hpp | Has full block ✓ | +| `Integral32ValueContainer` | ValueContainer.hpp | One-line `///` only | +| `IntegralU32ValueContainer` | ValueContainer.hpp | One-line `///` only | +| `Integral64ValueContainer` | ValueContainer.hpp | One-line `///` only | +| `IntegralU64ValueContainer` | ValueContainer.hpp | One-line `///` only | +| `FloatingPointValueContainer` | ValueContainer.hpp | One-line `///` only | +| `StringValueContainer` | ValueContainer.hpp | One-line `///` only | +| `BlobValueContainer` | ValueContainer.hpp | One-line `///` only | +| `VectorValueContainer` | ValueContainer.hpp | One-line `///` only | + +--- + +## include/simdb/schema + +| Class / struct | File | Current docs | +|----------------|------|--------------| +| *(none remaining)* | | | + +--- + +## include/simdb/utils + +| Class / struct | File | Current docs | +|----------------|------|--------------| +| `ThreadSafeLogger` | ThreadSafeLogger.hpp | No Doxygen; `//` comments only. 
Inner `Guard` also undocumented | +| `RunningMean` | RunningMean.hpp | `///` on class and methods; no `\class` block | +| `SelfProfiler` | TickTock.hpp | No Doxygen at all | + +--- + +## include/simdb/pipeline + +| Class / struct | File | Current docs | +|----------------|------|--------------| +| `PollingThread` | PollingThread.hpp | `///` only; no `\class` block | +| `PipelineManager` | PipelineManager.hpp | `///` only; no `\class` block | +| `Stage` | Stage.hpp | No class-level Doxygen | +| `DatabaseStageBase` | Stage.hpp | No class-level Doxygen | +| `DatabaseStage` | Stage.hpp | No class-level Doxygen | +| `Runnable` | Runnable.hpp | `///` only; no `\class` block | +| `ScopedRunnableDisabler` | Runnable.hpp | `///` only; no `\class` block | +| `QueueBase` | Queue.hpp | `///` only; no `\class` block | +| `Queue` | Queue.hpp | `///` only; no `\class` block | +| `QueuePlaceholder` | QueueRepo.hpp | No class-level docs | +| `InputQueuePlaceholder` | QueueRepo.hpp | Only constructor `///` | +| `OutputQueuePlaceholder` | QueueRepo.hpp | Only constructor `///` | +| `StageQueueRepo` | QueueRepo.hpp | No class-level docs | +| `PipelineQueueRepo` | QueueRepo.hpp | No class-level docs | +| `Pipeline` | Pipeline.hpp | Multi-line `///` but no `\class` block | +| `Flusher` | Flusher.hpp | `///` only; no `\class` block | +| `FlusherWithTransaction` | Flusher.hpp | `///` only; no `\class` block | +| `DatabaseThread` | DatabaseThread.hpp | Multi-line `///`; no `\class` block | +| `DatabaseAccessor` | DatabaseAccessor.hpp | `///` only; no `\class` block | +| `AsyncDatabaseTask` | AsyncDatabaseAccessor.hpp | `///` only; no `\struct` block | +| `AsyncDatabaseAccessHandler` | AsyncDatabaseAccessor.hpp | `///` only; no `\class` block | +| `AsyncDatabaseAccessor` | AsyncDatabaseAccessor.hpp | `///` only; no `\class` block | +| `ThreadMerger` | ThreadMerger.hpp | `///` only; no `\class` block | +| `PipelineSnooper` | PipelineSnooper.hpp | No Doxygen | + +--- + +## 
include/simdb/Exceptions.hpp + +| Class / struct | File | Current docs | +|----------------|------|--------------| +| `DBException` | Exceptions.hpp | One-line `///` only | +| `SafeTransactionSilentException` | Exceptions.hpp | One-line `///` only | + +--- + +## include/simdb/apps + +| Class / struct | File | Current docs | +|----------------|------|--------------| +| `App` | App.hpp | `///` only; no `\class` block | +| `AppFactoryBase` | App.hpp | No Doxygen | +| `AppFactory` | App.hpp | No Doxygen | +| `AppRegistrationBase` | AppManager.hpp | No Doxygen | +| `AppRegistration` | AppManager.hpp | No Doxygen | +| `AppManager` | AppManager.hpp | Long method docs; class has only `///` | +| `AppManagers` | AppManager.hpp | `///` only; no `\class` block | +| `AppRegistrations` | AppManager.hpp | One-line `///` only | + +--- + +## include/simdb/apps/argos + +| Class / struct | File | Current docs | +|----------------|------|--------------| +| `TreeNode` | TreeNode.hpp | `///` multi-line; could add `\struct` block | +| `ArgosRecord` | CollectionPoints.hpp | `///` on struct; no `\struct` block | +| `TickReader` | CollectionPoints.hpp | No class-level Doxygen | +| `CollectionPointBase` | CollectionPoints.hpp | `///` only; no `\class` block | +| `CollectionPoint` | CollectionPoints.hpp | No class-level Doxygen (inherits from CollectionPointBase) | +| `ContigIterableCollectionPoint` | CollectionPoints.hpp | No class-level Doxygen | +| `SparseIterableCollectionPoint` | CollectionPoints.hpp | No class-level Doxygen | + +--- + +## Table.hpp copy-paste fix + +In **include/simdb/sqlite/Table.hpp**, the block before `SqlColumns` and the block before `SqlValues` both say `\class SqlTable`. Those should be corrected to `\class SqlColumns` and `\class SqlValues` respectively. + +--- + +## Summary counts + +- **Completed:** 1 (SqlBlob). +- **sqlite:** 1 class with full block; 9 with minimal (PreparedINSERT + ValueContainer family). +- **schema:** 0 remaining. 
+- **utils:** 3 (ThreadSafeLogger, RunningMean, SelfProfiler). +- **pipeline:** 24 classes/templates with minimal or no class-level docs. +- **Exceptions:** 2 with one-liner only. +- **apps:** 8 (App, AppFactoryBase, AppFactory, AppRegistrationBase, AppRegistration, AppManager, AppManagers, AppRegistrations). +- **apps/argos:** 7 (TreeNode, ArgosRecord, TickReader, CollectionPointBase, CollectionPoint, ContigIterableCollectionPoint, SparseIterableCollectionPoint). + +**Total remaining:** ~55 class/struct entries (excluding the Table.hpp copy-paste fix). diff --git a/include/simdb/sqlite/PreparedINSERT.hpp b/include/simdb/sqlite/PreparedINSERT.hpp index 33b9214f..ce4f0681 100644 --- a/include/simdb/sqlite/PreparedINSERT.hpp +++ b/include/simdb/sqlite/PreparedINSERT.hpp @@ -8,11 +8,18 @@ namespace simdb { -/// This class is used for high-volume table inserts with reusable prepared -/// statements. +/*! + * \class PreparedINSERT + * + * \brief Reusable prepared INSERT for high-volume table inserts. Obtained from + * DatabaseManager::prepareINSERT(); use setColumnValue() for each + * column (by 0-based index) then createRecord() to insert a row. + * Column types must match the table schema. + */ class PreparedINSERT { public: + /// \brief Internal constructor; use DatabaseManager::prepareINSERT() to obtain instances. PreparedINSERT(SQLitePreparedStatement&& stmt, const std::vector& col_dtypes, std::shared_ptr db_conn) : prepared_stmt_(std::move(stmt)), @@ -22,9 +29,9 @@ class PreparedINSERT { } - /// @brief Set the value for the given column index. - /// @param col_idx 0-based column index. - /// @param ival Value to set for the column. + /// \brief Set the value for the given column index (int32_t). + /// \param col_idx 0-based column index. + /// \param ival Value to set for the column. 
void setColumnValue(uint32_t col_idx, const int32_t ival) { if (col_dtypes_.at(col_idx) != SqlDataType::int32_t) @@ -35,9 +42,9 @@ class PreparedINSERT sqlite3_bind_int(stmt_, (int32_t)col_idx + 1, ival); } - /// @brief Set the value for the given column index. - /// @param col_idx 0-based column index. - /// @param ival Value to set for the column. + /// \brief Set the value for the given column index (uint32_t). + /// \param col_idx 0-based column index. + /// \param ival Value to set for the column. void setColumnValue(uint32_t col_idx, const uint32_t ival) { if (col_dtypes_.at(col_idx) != SqlDataType::uint32_t) @@ -48,9 +55,9 @@ class PreparedINSERT sqlite3_bind_int64(stmt_, (int32_t)col_idx + 1, static_cast(ival)); } - /// @brief Set the value for the given column index. - /// @param col_idx 0-based column index. - /// @param ival Value to set for the column. + /// \brief Set the value for the given column index (int64_t). + /// \param col_idx 0-based column index. + /// \param ival Value to set for the column. void setColumnValue(uint32_t col_idx, const int64_t ival) { if (col_dtypes_.at(col_idx) != SqlDataType::int64_t) @@ -61,9 +68,9 @@ class PreparedINSERT sqlite3_bind_int64(stmt_, (int32_t)col_idx + 1, ival); } - /// @brief Set the value for the given column index. - /// @param col_idx 0-based column index. - /// @param ival Value to set for the column. + /// \brief Set the value for the given column index (uint64_t). + /// \param col_idx 0-based column index. + /// \param ival Value to set for the column. void setColumnValue(uint32_t col_idx, const uint64_t ival) { if (col_dtypes_.at(col_idx) != SqlDataType::uint64_t) @@ -75,10 +82,10 @@ class PreparedINSERT sqlite3_bind_text16(stmt_, (int32_t)col_idx + 1, u16.data(), 40, SQLITE_TRANSIENT); } - /// @brief Set the value for the given column index. - /// @tparam ColumnT double or float - /// @param col_idx 0-based column index. - /// @param dval Value to set for the column. 
+ /// \brief Set the value for the given column index (double/float). + /// \tparam ColumnT double or float + /// \param col_idx 0-based column index. + /// \param dval Value to set for the column. template typename std::enable_if, void>::type setColumnValue(uint32_t col_idx, const ColumnT dval) @@ -91,10 +98,10 @@ class PreparedINSERT sqlite3_bind_double(stmt_, (int32_t)col_idx + 1, dval); } - /// @brief Set the value for the given column index. - /// @tparam T blob element type - /// @param col_idx 0-based column index. - /// @param blobval Value to set for the column. + /// \brief Set the value for the given column index (blob from std::vector). + /// \tparam T Blob element type. + /// \param col_idx 0-based column index. + /// \param blobval Value to set for the column. template void setColumnValue(uint32_t col_idx, const std::vector& blobval) { if (col_dtypes_.at(col_idx) != SqlDataType::blob_t) @@ -105,9 +112,9 @@ class PreparedINSERT sqlite3_bind_blob(stmt_, (int32_t)col_idx + 1, blobval.data(), blobval.size() * sizeof(T), 0); } - /// @brief Set the value for the given column index. - /// @param col_idx 0-based column index. - /// @param blobval Value to set for the column. + /// \brief Set the value for the given column index (SqlBlob). + /// \param col_idx 0-based column index. + /// \param blobval Value to set for the column. void setColumnValue(uint32_t col_idx, const SqlBlob& blobval) { if (col_dtypes_.at(col_idx) != SqlDataType::blob_t) @@ -118,9 +125,9 @@ class PreparedINSERT sqlite3_bind_blob(stmt_, (int32_t)col_idx + 1, blobval.data_ptr, (int32_t)blobval.num_bytes, 0); } - /// @brief Set the value for the given column index. - /// @param col_idx 0-based column index. - /// @param strval Value to set for the column. + /// \brief Set the value for the given column index (std::string). + /// \param col_idx 0-based column index. + /// \param strval Value to set for the column. 
void setColumnValue(uint32_t col_idx, const std::string& strval) { if (col_dtypes_.at(col_idx) != SqlDataType::string_t) @@ -131,9 +138,9 @@ class PreparedINSERT sqlite3_bind_text(stmt_, (int32_t)col_idx + 1, strval.c_str(), -1, SQLITE_TRANSIENT); } - /// @brief Set the value for the given column index. - /// @param col_idx 0-based column index. - /// @param strval Value to set for the column. + /// \brief Set the value for the given column index (const char*). + /// \param col_idx 0-based column index. + /// \param strval Value to set for the column. void setColumnValue(uint32_t col_idx, const char* strval) { if (col_dtypes_.at(col_idx) != SqlDataType::string_t) @@ -144,16 +151,11 @@ class PreparedINSERT sqlite3_bind_text(stmt_, (int32_t)col_idx + 1, strval, -1, SQLITE_STATIC); } - /// @brief Execute the prepared INSERT statement and create a new record. - /// @param clear_bindings If true, clears all bound values after executing - /// the INSERT. If false, the bound values remain for the next INSERT unless - /// explicitly changed by calling setColumnValue() again. For any previously - /// bound values that are not changed, the same values will be used for the - /// next INSERT, which requires that the user is careful to ensure that the - /// bound values are valid for each INSERT. If they went out of scope or - /// were otherwise invalidated, the INSERT will likely fail if not cause a - /// crash. - /// @return The row ID of the newly inserted record. + /// \brief Execute the prepared INSERT and return the new row ID. + /// \param clear_bindings If true, clears all bound values after the INSERT. + /// If false, bound values are reused for the next createRecord(); + /// ensure they remain valid (e.g. not out of scope). + /// \return The row ID of the newly inserted record. 
int createRecord(bool clear_bindings = true) { auto rc = SQLiteReturnCode(sqlite3_step(stmt_)); diff --git a/include/simdb/sqlite/ValueContainer.hpp b/include/simdb/sqlite/ValueContainer.hpp index 41078f9d..f22e677c 100644 --- a/include/simdb/sqlite/ValueContainer.hpp +++ b/include/simdb/sqlite/ValueContainer.hpp @@ -25,7 +25,11 @@ class ValueContainerBase virtual int32_t bind(sqlite3_stmt* stmt, int32_t col_idx) const = 0; }; -/// Bind an int32_t to an INSERT prepared statement. +/*! + * \class Integral32ValueContainer + * + * \brief Binds an int32_t value to an INSERT prepared statement (SQL_VALUES / PreparedINSERT). + */ class Integral32ValueContainer : public ValueContainerBase { public: @@ -40,7 +44,11 @@ class Integral32ValueContainer : public ValueContainerBase int32_t val_; }; -/// Bind a uint32_t to an INSERT prepared statement. +/*! + * \class IntegralU32ValueContainer + * + * \brief Binds a uint32_t value to an INSERT prepared statement (SQL_VALUES / PreparedINSERT). + */ class IntegralU32ValueContainer : public ValueContainerBase { public: @@ -58,7 +66,11 @@ class IntegralU32ValueContainer : public ValueContainerBase uint32_t val_; }; -/// Bind an int64_t to an INSERT prepared statement. +/*! + * \class Integral64ValueContainer + * + * \brief Binds an int64_t value to an INSERT prepared statement (SQL_VALUES / PreparedINSERT). + */ class Integral64ValueContainer : public ValueContainerBase { public: @@ -73,7 +85,11 @@ class Integral64ValueContainer : public ValueContainerBase int64_t val_; }; -/// Bind a uint64_t to an INSERT prepared statement. +/*! + * \class IntegralU64ValueContainer + * + * \brief Binds a uint64_t value to an INSERT prepared statement (SQL_VALUES / PreparedINSERT). + */ class IntegralU64ValueContainer : public ValueContainerBase { public: @@ -91,7 +107,11 @@ class IntegralU64ValueContainer : public ValueContainerBase std::u16string u16_; }; -/// Bind a double to an INSERT prepared statement. +/*! 
+ * \class FloatingPointValueContainer + * + * \brief Binds a double value to an INSERT prepared statement (SQL_VALUES / PreparedINSERT). + */ class FloatingPointValueContainer : public ValueContainerBase { public: @@ -109,7 +129,11 @@ class FloatingPointValueContainer : public ValueContainerBase double val_; }; -/// Bind a string to an INSERT prepared statement. +/*! + * \class StringValueContainer + * + * \brief Binds a std::string value to an INSERT prepared statement (SQL_VALUES / PreparedINSERT). + */ class StringValueContainer : public ValueContainerBase { public: @@ -127,7 +151,11 @@ class StringValueContainer : public ValueContainerBase std::string val_; }; -/// Bind a blob to an INSERT prepared statement. +/*! + * \class BlobValueContainer + * + * \brief Binds a SqlBlob value to an INSERT prepared statement (SQL_VALUES / PreparedINSERT). + */ class BlobValueContainer : public ValueContainerBase { public: @@ -145,7 +173,12 @@ class BlobValueContainer : public ValueContainerBase SqlBlob val_; }; -/// Bind a blob to an INSERT prepared statement. +/*! + * \class VectorValueContainer + * + * \brief Binds a std::vector as a blob to an INSERT prepared statement (SQL_VALUES / PreparedINSERT). + * \tparam T Element type of the vector; stored as raw bytes in the BLOB column. 
+ */ template class VectorValueContainer : public ValueContainerBase { public: From 1598a2b4d172cb7685d9849bf41715b0c9e29910 Mon Sep 17 00:00:00 2001 From: Colby Nyce Date: Sun, 1 Mar 2026 15:29:04 -0800 Subject: [PATCH 3/9] Doxygen: RunningMean.hpp, ThreadSafeLogger.hpp, and TickTock.hpp --- include/simdb/utils/RunningMean.hpp | 18 ++++++++++------- include/simdb/utils/ThreadSafeLogger.hpp | 25 ++++++++++++++++++++++-- include/simdb/utils/TickTock.hpp | 23 +++++++++++++++++++++- 3 files changed, 56 insertions(+), 10 deletions(-) diff --git a/include/simdb/utils/RunningMean.hpp b/include/simdb/utils/RunningMean.hpp index b9d1ba53..4c78362e 100644 --- a/include/simdb/utils/RunningMean.hpp +++ b/include/simdb/utils/RunningMean.hpp @@ -6,24 +6,28 @@ namespace simdb { -/// This classes uses Welford's method to compute a running -/// mean of a series of values given one at a time (without -/// requiring all values to be stored in memory). +/*! + * \class RunningMean + * + * \brief Computes a running mean of a stream of values using Welford's method. + * Values are supplied one at a time via add(); no need to store all + * values in memory. + */ class RunningMean { public: - /// Update the running average with a new value + /// \brief Update the running average with a new value (Welford's method). + /// \param value Next value to include in the mean. void add(double value) { - // Welford's method to update the mean count_++; mean_ += (value - mean_) / count_; } - /// Get the current running average + /// \brief Return the current running average. double mean() const { return mean_; } - /// Get the number of values added + /// \brief Return the number of values added so far. 
uint64_t count() const { return count_; } private: diff --git a/include/simdb/utils/ThreadSafeLogger.hpp b/include/simdb/utils/ThreadSafeLogger.hpp index 2d5917a3..959fb41a 100644 --- a/include/simdb/utils/ThreadSafeLogger.hpp +++ b/include/simdb/utils/ThreadSafeLogger.hpp @@ -11,17 +11,29 @@ namespace simdb { +/*! + * \class ThreadSafeLogger + * + * \brief Thread-safe logger that serializes writes to an ostream or file. Use + * protect() to obtain a Guard; stream into the Guard, then on destruction + * the line is written under a lock. Optional "[log] " prefix per line. + */ class ThreadSafeLogger { public: - // Construct from existing ostream (cout, cerr, custom stream) + /// \brief Construct from an existing ostream (e.g. std::cout, std::cerr). + /// \param os Output stream to write to (caller keeps ownership). + /// \param prefix If true, prepend "[log] " to each line. explicit ThreadSafeLogger(std::ostream& os, bool prefix = false) : out_(&os), prefix_(prefix ? "[log] " : "") { } - // Construct from file name (logger owns the file) + /// \brief Construct from a file path; the logger owns and opens the file. + /// \param filename Path to the log file. + /// \param prefix If true, prepend "[log] " to each line. + /// \throws std::runtime_error if the file cannot be opened. explicit ThreadSafeLogger(const std::string& filename, bool prefix = false) : owned_file_(std::make_unique(filename)), out_(owned_file_.get()), @@ -33,6 +45,12 @@ class ThreadSafeLogger } } + /*! + * \class Guard + * + * \brief RAII handle that buffers streamed output and flushes it to the + * logger (under lock) on destruction. Use via ThreadSafeLogger::protect(). + */ class Guard { public: @@ -48,12 +66,14 @@ class ThreadSafeLogger logger_.out_->flush(); } + /// \brief Stream a value into this line's buffer. template Guard& operator<<(T&& value) { buffer_ << std::forward(value); return *this; } + /// \brief Stream an ostream manipulator (e.g. std::endl) into this line's buffer. 
Guard& operator<<(std::ostream& (*manip)(std::ostream&)) { buffer_ << manip; @@ -65,6 +85,7 @@ class ThreadSafeLogger std::ostringstream buffer_; }; + /// \brief Return a Guard that buffers one line; when it is destroyed, the line is written under lock. Guard protect() const { return Guard(*this); } private: diff --git a/include/simdb/utils/TickTock.hpp b/include/simdb/utils/TickTock.hpp index 692b8545..379aa34f 100644 --- a/include/simdb/utils/TickTock.hpp +++ b/include/simdb/utils/TickTock.hpp @@ -11,15 +11,24 @@ namespace simdb::utils { +/*! + * \class SelfProfiler + * + * \brief Singleton that records per-method elapsed time (seconds) and prints + * a summary on destruction. Use profile(method_name) to get a MethodTimer + * that records one call, or PROFILE_METHOD / PROFILE_BLOCK() in code. + */ class SelfProfiler { public: + /// \brief Return the singleton instance. static SelfProfiler* getInstance() { static SelfProfiler profiler; return &profiler; } + /// \brief On destruction, print a report of method names and average times (sorted by avg). ~SelfProfiler() { if (results_.empty()) @@ -54,15 +63,23 @@ class SelfProfiler } } + /*! + * \class MethodTimer + * + * \brief RAII timer that records elapsed time for one invocation of a named + * method/block and reports it to the SelfProfiler on destruction. + */ class MethodTimer { public: - MethodTimer(const char* method_name) : + /// \brief Start timing; \p method_name is used in the final report. + explicit MethodTimer(const char* method_name) : method_name_(method_name), start_time_(now_()) { } + /// \brief Record elapsed time since construction with SelfProfiler. ~MethodTimer() { auto end_time = now_(); @@ -80,6 +97,8 @@ class SelfProfiler TimeT start_time_; }; + /// \brief Create a timer for the given method/block name; hold until scope exit to record. + /// \param method_name Label for this timing in the profiler report. 
MethodTimer profile(const char* method_name) const { return MethodTimer(method_name); } private: @@ -110,7 +129,9 @@ class SelfProfiler #define CONCAT(a, b) CONCAT_INNER(a, b) #define CONCAT_INNER(a, b) a##b +/// \brief Profile a named block; declare at start of block; time is recorded when scope exits. #define PROFILE_BLOCK(block_name) \ auto CONCAT(__block_timer_, __COUNTER__) = simdb::utils::SelfProfiler::getInstance()->profile(block_name); +/// \brief Profile the current function (uses __FUNCTION__ as the name). #define PROFILE_METHOD PROFILE_BLOCK(__FUNCTION__) From 50dea369754f07fb2173649dcd2f0e45755179a3 Mon Sep 17 00:00:00 2001 From: Colby Nyce Date: Sun, 1 Mar 2026 15:39:31 -0800 Subject: [PATCH 4/9] Doxygen: PipelineManager.hpp, PollingThread.hpp, Runnable.hpp, and Stage.hpp --- include/simdb/pipeline/PipelineManager.hpp | 36 ++++++++++++---- include/simdb/pipeline/PollingThread.hpp | 31 +++++++++++--- include/simdb/pipeline/Runnable.hpp | 50 ++++++++++++---------- include/simdb/pipeline/Stage.hpp | 32 +++++++++++--- 4 files changed, 106 insertions(+), 43 deletions(-) diff --git a/include/simdb/pipeline/PipelineManager.hpp b/include/simdb/pipeline/PipelineManager.hpp index 1d0f5a0d..450e3c05 100644 --- a/include/simdb/pipeline/PipelineManager.hpp +++ b/include/simdb/pipeline/PipelineManager.hpp @@ -16,16 +16,25 @@ class App; namespace simdb::pipeline { -/// This class manages all pipelines and their threads for an AppManager (or -/// unit test). +/*! + * \class PipelineManager + * + * \brief Manages all Pipeline instances and their PollingThreads for an + * AppManager (or unit test). Creates pipelines, merges threads + * (minimizeThreads), opens threads, and provides async DB access. + */ class PipelineManager { public: + /// \brief Construct with the DatabaseManager to be used by the pipelines. + /// \param db_mgr Non-null DatabaseManager to be used by the pipelines. 
PipelineManager(DatabaseManager* db_mgr) : db_mgr_(db_mgr) { } + /// \brief Return the AsyncDatabaseAccessor for async DB work; only valid after openPipelines(). + /// \throws DBException if called before openPipelines(). AsyncDatabaseAccessor* getAsyncDatabaseAccessor() { checkOpen_(); @@ -37,6 +46,10 @@ class PipelineManager return async_db_accessor_; } + /// \brief Create and own a new Pipeline with the given name and owning App. + /// \param name Pipeline name. Only used for reporting purposes. + /// \param app The App that owns this pipeline. + /// \return Raw pointer to the new Pipeline (manager retains ownership). Pipeline* createPipeline(const std::string& name, const App* app) { checkOpen_(); @@ -45,6 +58,7 @@ class PipelineManager return pipelines_.back().get(); } + /// \brief Return pointers to all created pipelines. std::vector getPipelines() { checkOpen_(); @@ -57,12 +71,16 @@ class PipelineManager return pipelines; } + /// \brief Create a snooper for iterating stages with a key and snooped object type. + /// \return Unique_ptr to a PipelineSnooper. template std::unique_ptr> createSnooper() { return std::make_unique>(this); } + /// \brief Merge all apps' pipeline threads into a minimal set; call at most once. + /// \throws DBException if called more than once. void minimizeThreads() { if (thread_merger_) @@ -74,6 +92,7 @@ class PipelineManager thread_merger_->mergeAllAppThreads(); } + /// \brief Mark one app's pipeline threads for merging (call before openPipelines()). void minimizeThreads(const App* app) { if (!thread_merger_) @@ -83,6 +102,7 @@ class PipelineManager thread_merger_->addAppForMerging(app); } + /// \brief Mark multiple apps' pipeline threads for merging (variadic). template void minimizeThreads(const App* app, Apps&&... rest) { if (!thread_merger_) @@ -93,6 +113,7 @@ class PipelineManager minimizeThreads(std::forward(rest)...); } + /// \brief Create and open all polling threads (after stages are added and optionally minimizeThreads). 
void openPipelines() { checkOpen_(); @@ -139,13 +160,9 @@ class PipelineManager threads_opened_ = true; } - /// Use this API to temporarily disable all pipeline tasks. - /// The pipelines will be re-enabled when the returned object - /// goes out of scope. - /// - /// Note that recursive calls to this method are no-ops. Only - /// the first call will disable the runnables; nested calls - /// will return a nullptr. + /// \brief Temporarily disable all pipeline runnables (and optionally pause threads); re-enabled when the returned object is destroyed. + /// \param disable_threads_too If true, also pause polling threads; if false, only disable runnables. + /// \return A ScopedRunnableDisabler, or nullptr if a disabler is already active (nested calls are no-ops). std::unique_ptr scopedDisableAll(bool disable_threads_too = true) { if (disabler_active_) @@ -169,6 +186,7 @@ class PipelineManager return disabler; } + /// \brief Close all threads, flush runnables, and print performance reports; marks manager as closed. void postSimLoopTeardown() { checkOpen_(); diff --git a/include/simdb/pipeline/PollingThread.hpp b/include/simdb/pipeline/PollingThread.hpp index 3f47c280..0c906af0 100644 --- a/include/simdb/pipeline/PollingThread.hpp +++ b/include/simdb/pipeline/PollingThread.hpp @@ -17,14 +17,18 @@ namespace simdb::pipeline { -/// Timer thread which "polls" its runnables for any activity. Goes back -/// to sleep for a fixed amount of time before polling again. +/*! + * \class PollingThread + * + * \brief Thread that repeatedly polls its Runnables for work; when none do + * work, it sleeps for a fixed interval before polling again. Supports + * pause/resume and performance reporting. Base for DatabaseThread. + */ class PollingThread { public: - /// Create a thread with an "interval" in milliseconds. This value says - /// how long the thread should sleep if none of its Runnables had any - /// work to do. 
+ /// \brief Create a thread with the given sleep interval when no work is done. + /// \param interval_milliseconds How long to sleep when no Runnable had work (default 100ms). PollingThread(const size_t interval_milliseconds = 100) : interval_ms_(interval_milliseconds) { @@ -32,8 +36,11 @@ class PollingThread virtual ~PollingThread() noexcept = default; + /// \brief Return the sleep interval in milliseconds (when no work is done). size_t getIntervalMilliseconds() const { return interval_ms_; } + /// \brief Add a Runnable to this thread; must not be called while the thread is running. + /// \throws DBException if called while the thread is running. void addRunnable(Runnable* runnable) { if (is_running_) @@ -43,10 +50,14 @@ class PollingThread runnables_.emplace_back(runnable); } + /// \brief Return the Runnables on this thread. const std::vector& getRunnables() const { return runnables_; } + /// \brief Return the number of Runnables on this thread. size_t getNumRunnables() const { return runnables_.size(); } + /// \brief Reorder this thread's Runnables to match the order in \p runnables (only those + /// that belong to this thread). void ensureRelativeOrder(const std::vector& runnables) { const std::set my_runnables(runnables_.begin(), runnables_.end()); @@ -61,6 +72,7 @@ class PollingThread std::swap(ordered_runnables, runnables_); } + /// \brief Call processAll(true) on all enabled Runnables; return true if any did work. virtual bool flushRunnables() { bool did_work = false; @@ -79,6 +91,7 @@ class PollingThread return did_work; } + /// \brief Start the polling thread (must have at least one Runnable). virtual void open() { if (runnables_.empty()) @@ -96,6 +109,8 @@ class PollingThread } } + /// \brief Stop the thread and join. + /// \note Meant to be called from the main thread. 
virtual void close() noexcept { if (!thread_) @@ -118,6 +133,7 @@ class PollingThread thread_.reset(); } + /// \brief Pause the polling loop; blocks until the thread has acknowledged it is paused. void pause() { if (!is_running_ || paused_) @@ -140,8 +156,10 @@ class PollingThread paused_promise_.get_future().wait(); } + /// \brief Return true if the thread is currently paused. bool paused() { return paused_; } + /// \brief Resume the polling loop after a pause. void resume() { if (!is_running_ || !paused_) @@ -154,9 +172,10 @@ class PollingThread paused_ = false; } - pause_cv_.notify_all(); // Wake the thread to resume + pause_cv_.notify_all(); } + /// \brief Print a performance report (sleep vs work %) for this thread. void printPerfReport() const noexcept { if (runnables_.empty()) diff --git a/include/simdb/pipeline/Runnable.hpp b/include/simdb/pipeline/Runnable.hpp index efb82ac6..8f036b68 100644 --- a/include/simdb/pipeline/Runnable.hpp +++ b/include/simdb/pipeline/Runnable.hpp @@ -16,51 +16,52 @@ namespace simdb::pipeline { class PipelineManager; class PollingThread; -/// Various outcomes for each processOne/processAll calls to a runnable: +/// \brief Outcome of processOne() / processAll(): whether the runnable did work or the thread may sleep. enum PipelineAction { - // Return if the runnable (task) pushed any data to its output queue, - // or otherwise should leave the pipeline tasks greedily executing - // as normal. + /// Runnable did work (or should keep the thread busy); thread continues without sleeping. PROCEED, - - // Return if the runnable had no data to consume or otherwise should - // tell the pipeline thread to go back to sleep for a bit. Note that - // ALL runnables on a PollingThread need to return SLEEP in order for - // the thread to go back to sleep. If any runnable returns PROCEED, - // the thread will continue processing as normal without sleeping. 
+ /// Runnable had nothing to do; if all runnables on the thread return SLEEP, the thread sleeps. SLEEP }; -/// Base class for all things that can be run on a pipeline thread. +/*! + * \class Runnable + * + * \brief Base class for units of work on a pipeline thread (e.g. Stage). PollingThread + * calls processOne() / processAll(); PROCEED means work was done, SLEEP means + * none. Supports enable/disable and a human-readable description for reporting. + */ class Runnable { public: virtual ~Runnable() = default; - /// Get this runnable's description. + /// \brief Return the runnable's description (for logging/reports); uses set value or getDescription_(). std::string getDescription() const { return !description_.empty() ? description_ : getDescription_(); } - /// Set/overwrite the this runnable's description. + /// \brief Set or overwrite the runnable's description. + /// \param desc Description string. void setDescription(const std::string& desc) { description_ = desc; } - /// Process one item from the input queue, returning true - /// if this runnable did anything. + /// \brief Process one unit of work; return PROCEED if work was done, SLEEP otherwise. + /// \param force If true, run even when there is no input (e.g. for flushing). virtual PipelineAction processOne(bool force) = 0; - /// Flush and process everything from the input queue, - /// returning true if this runnable did anything. + /// \brief Process all pending work; return PROCEED if any work was done, SLEEP otherwise. + /// \param force If true, run until no more work (e.g. for flushing). virtual PipelineAction processAll(bool force) = 0; - /// Print info about this runnable for reporting purposes. + /// \brief Print a one-line description to \p os with \p indent spaces (for perf reports). virtual void print(std::ostream& os, int indent) const { os << std::string(indent, ' ') << getDescription() << "\n"; } - /// Check if this runnable is enabled. 
+ /// \brief Return true if this runnable is enabled (disabled runnables are skipped by the thread). bool enabled() const { return enabled_; } - /// Disable/re-enable this runnable. + /// \brief Enable or disable this runnable. + /// \param enable true to enable, false to disable. void enable(bool enable = true) { enabled_ = enable; } private: @@ -69,8 +70,13 @@ class Runnable bool enabled_ = true; }; -/// RAII utility used to disable all runnables while in scope, -/// re-enabling them when going out of scope. +/*! + * \class ScopedRunnableDisabler + * + * \brief RAII guard that disables the given runnables (and optionally pauses + * the given polling threads) on construction and re-enables/resumes + * them on destruction. Obtained from PipelineManager::scopedDisableAll(). + */ class ScopedRunnableDisabler { public: diff --git a/include/simdb/pipeline/Stage.hpp b/include/simdb/pipeline/Stage.hpp index 116214b3..a45d33c0 100644 --- a/include/simdb/pipeline/Stage.hpp +++ b/include/simdb/pipeline/Stage.hpp @@ -13,15 +13,20 @@ namespace simdb::pipeline { +/*! + * \class Stage + * + * \brief Base class for pipeline stages (Runnables with input/output ports + * and an optional polling interval). Subclass to implement run_(); use + * addInPort_/addOutPort_ in derived constructors to define queues. + * The interval is the PollingThread sleep time when no work is done. + */ class Stage : public Runnable { protected: - /// Note that the interval_milliseconds will inform the associated - /// PollingThread to sleep for this amount of time when there is nothing to - /// do on the thread. Overriding the interval is only available for - /// non-database stages. If you intend to call - /// AppManager::minimizeThreads(), then all non-database stages must agree - /// on the interval for their shared PollingThread. + /// \brief Construct with the polling interval (ms) for the thread when no work is done. 
+ /// \param interval_milliseconds Sleep time for the PollingThread; non-database stages + /// that share a thread must use the same interval. Stage(size_t interval_milliseconds = 100) : interval_milliseconds_(interval_milliseconds) { @@ -90,6 +95,13 @@ class Stage : public Runnable friend class Flusher; }; +/*! + * \class DatabaseStageBase + * + * \brief Base for stages that run on the dedicated DatabaseThread and use + * DatabaseAccessor (getDatabaseManager_(), getTableInserter_()). Do not + * use getAsyncDatabaseAccessor_() from a DatabaseStage. + */ class DatabaseStageBase : public Stage { protected: @@ -101,6 +113,14 @@ class DatabaseStageBase : public Stage } }; +/*! + * \class DatabaseStage + * + * \brief Concrete base for app-specific database stages. Provides + * getDatabaseManager_() and getTableInserter_() keyed by \p AppT. + * Derive from DatabaseStage and implement run_(). + * \tparam AppT The App type (for schema and table inserters). + */ template class DatabaseStage : public DatabaseStageBase { protected: From e51b6c16bfcbe9a6b0e381ae9bb33f463aab392a Mon Sep 17 00:00:00 2001 From: Colby Nyce Date: Sun, 1 Mar 2026 16:31:17 -0800 Subject: [PATCH 5/9] Doxygen: Flusher.hpp, Pipeline.hpp, Queue.hpp, and QueueRepo.hpp --- docs/doxygen-burndown.md | 130 --------------------------- include/simdb/pipeline/Flusher.hpp | 26 ++++-- include/simdb/pipeline/Pipeline.hpp | 48 ++++++++-- include/simdb/pipeline/Queue.hpp | 30 ++++--- include/simdb/pipeline/QueueRepo.hpp | 81 +++++++++++++++-- 5 files changed, 155 insertions(+), 160 deletions(-) delete mode 100644 docs/doxygen-burndown.md diff --git a/docs/doxygen-burndown.md b/docs/doxygen-burndown.md deleted file mode 100644 index d38fc228..00000000 --- a/docs/doxygen-burndown.md +++ /dev/null @@ -1,130 +0,0 @@ -# Doxygen documentation burndown list - -Classes/structs below have minimal or no Doxygen compared to the rest of the codebase (e.g. no `\class` block or only a one-line `///`). Target: add a `/*! 
\class Name \brief ... */` (or `\struct`) block and, where useful, `\param` / `\return` on public API. - -**Reference style** (well-documented): `DatabaseManager`, `Column`, `Table`, `Schema`, `Connection`, `Transaction`, `SqlQuery`, `SqlRecord`, `ResultWriterBase` and all `ResultWriter*` in Iterator.hpp, `CollectionBuffer`, `InterruptException`, `SqlBlob` (Blob.hpp). - ---- - -## include/simdb/sqlite - -| Class / struct | File | Current docs | -|----------------|------|--------------| -| `PreparedINSERT` | PreparedINSERT.hpp | One-line `///` only; no `\class` block | -| `ValueContainerBase` | ValueContainer.hpp | Has full block ✓ | -| `Integral32ValueContainer` | ValueContainer.hpp | One-line `///` only | -| `IntegralU32ValueContainer` | ValueContainer.hpp | One-line `///` only | -| `Integral64ValueContainer` | ValueContainer.hpp | One-line `///` only | -| `IntegralU64ValueContainer` | ValueContainer.hpp | One-line `///` only | -| `FloatingPointValueContainer` | ValueContainer.hpp | One-line `///` only | -| `StringValueContainer` | ValueContainer.hpp | One-line `///` only | -| `BlobValueContainer` | ValueContainer.hpp | One-line `///` only | -| `VectorValueContainer` | ValueContainer.hpp | One-line `///` only | - ---- - -## include/simdb/schema - -| Class / struct | File | Current docs | -|----------------|------|--------------| -| *(none remaining)* | | | - ---- - -## include/simdb/utils - -| Class / struct | File | Current docs | -|----------------|------|--------------| -| `ThreadSafeLogger` | ThreadSafeLogger.hpp | No Doxygen; `//` comments only. 
Inner `Guard` also undocumented | -| `RunningMean` | RunningMean.hpp | `///` on class and methods; no `\class` block | -| `SelfProfiler` | TickTock.hpp | No Doxygen at all | - ---- - -## include/simdb/pipeline - -| Class / struct | File | Current docs | -|----------------|------|--------------| -| `PollingThread` | PollingThread.hpp | `///` only; no `\class` block | -| `PipelineManager` | PipelineManager.hpp | `///` only; no `\class` block | -| `Stage` | Stage.hpp | No class-level Doxygen | -| `DatabaseStageBase` | Stage.hpp | No class-level Doxygen | -| `DatabaseStage` | Stage.hpp | No class-level Doxygen | -| `Runnable` | Runnable.hpp | `///` only; no `\class` block | -| `ScopedRunnableDisabler` | Runnable.hpp | `///` only; no `\class` block | -| `QueueBase` | Queue.hpp | `///` only; no `\class` block | -| `Queue` | Queue.hpp | `///` only; no `\class` block | -| `QueuePlaceholder` | QueueRepo.hpp | No class-level docs | -| `InputQueuePlaceholder` | QueueRepo.hpp | Only constructor `///` | -| `OutputQueuePlaceholder` | QueueRepo.hpp | Only constructor `///` | -| `StageQueueRepo` | QueueRepo.hpp | No class-level docs | -| `PipelineQueueRepo` | QueueRepo.hpp | No class-level docs | -| `Pipeline` | Pipeline.hpp | Multi-line `///` but no `\class` block | -| `Flusher` | Flusher.hpp | `///` only; no `\class` block | -| `FlusherWithTransaction` | Flusher.hpp | `///` only; no `\class` block | -| `DatabaseThread` | DatabaseThread.hpp | Multi-line `///`; no `\class` block | -| `DatabaseAccessor` | DatabaseAccessor.hpp | `///` only; no `\class` block | -| `AsyncDatabaseTask` | AsyncDatabaseAccessor.hpp | `///` only; no `\struct` block | -| `AsyncDatabaseAccessHandler` | AsyncDatabaseAccessor.hpp | `///` only; no `\class` block | -| `AsyncDatabaseAccessor` | AsyncDatabaseAccessor.hpp | `///` only; no `\class` block | -| `ThreadMerger` | ThreadMerger.hpp | `///` only; no `\class` block | -| `PipelineSnooper` | PipelineSnooper.hpp | No Doxygen | - ---- - -## 
include/simdb/Exceptions.hpp - -| Class / struct | File | Current docs | -|----------------|------|--------------| -| `DBException` | Exceptions.hpp | One-line `///` only | -| `SafeTransactionSilentException` | Exceptions.hpp | One-line `///` only | - ---- - -## include/simdb/apps - -| Class / struct | File | Current docs | -|----------------|------|--------------| -| `App` | App.hpp | `///` only; no `\class` block | -| `AppFactoryBase` | App.hpp | No Doxygen | -| `AppFactory` | App.hpp | No Doxygen | -| `AppRegistrationBase` | AppManager.hpp | No Doxygen | -| `AppRegistration` | AppManager.hpp | No Doxygen | -| `AppManager` | AppManager.hpp | Long method docs; class has only `///` | -| `AppManagers` | AppManager.hpp | `///` only; no `\class` block | -| `AppRegistrations` | AppManager.hpp | One-line `///` only | - ---- - -## include/simdb/apps/argos - -| Class / struct | File | Current docs | -|----------------|------|--------------| -| `TreeNode` | TreeNode.hpp | `///` multi-line; could add `\struct` block | -| `ArgosRecord` | CollectionPoints.hpp | `///` on struct; no `\struct` block | -| `TickReader` | CollectionPoints.hpp | No class-level Doxygen | -| `CollectionPointBase` | CollectionPoints.hpp | `///` only; no `\class` block | -| `CollectionPoint` | CollectionPoints.hpp | No class-level Doxygen (inherits from CollectionPointBase) | -| `ContigIterableCollectionPoint` | CollectionPoints.hpp | No class-level Doxygen | -| `SparseIterableCollectionPoint` | CollectionPoints.hpp | No class-level Doxygen | - ---- - -## Table.hpp copy-paste fix - -In **include/simdb/sqlite/Table.hpp**, the block before `SqlColumns` and the block before `SqlValues` both say `\class SqlTable`. Those should be corrected to `\class SqlColumns` and `\class SqlValues` respectively. - ---- - -## Summary counts - -- **Completed:** 1 (SqlBlob). -- **sqlite:** 1 class with full block; 10 with minimal (PreparedINSERT + ValueContainer family). -- **schema:** 0 remaining. 
-- **utils:** 3 (ThreadSafeLogger, RunningMean, SelfProfiler). -- **pipeline:** 24 classes/templates with minimal or no class-level docs. -- **Exceptions:** 2 with one-liner only. -- **apps:** 8 (App, AppFactoryBase, AppFactory, AppRegistrationBase, AppRegistration, AppManager, AppManagers, AppRegistrations). -- **apps/argos:** 7 (TreeNode, ArgosRecord, TickReader, CollectionPointBase, CollectionPoint, ContigIterableCollectionPoint, SparseIterableCollectionPoint). - -**Total remaining:** ~55 class/struct entries (excluding the Table.hpp copy-paste fix). diff --git a/include/simdb/pipeline/Flusher.hpp b/include/simdb/pipeline/Flusher.hpp index 7dd8cf95..0bec1ce9 100644 --- a/include/simdb/pipeline/Flusher.hpp +++ b/include/simdb/pipeline/Flusher.hpp @@ -7,11 +7,17 @@ namespace simdb::pipeline { -/// This class serves as a utility to flush pipeline stages -/// in a specific order. Create with Pipeline::createFlusher(). +/*! + * \class Flusher + * + * \brief Utility to run pipeline stages in order until no more work (processAll + * with force). Obtain from Pipeline::createFlusher(). Use when you need + * to drain queues or sync stages outside the normal polling loop. + */ class Flusher { protected: + /// \brief Construct with the ordered list of stages to flush (internal; use Pipeline::createFlusher()). Flusher(const std::vector& stages) : stages_(stages) { @@ -26,6 +32,8 @@ class Flusher public: virtual ~Flusher() = default; + /// \brief Run all stages with processAll(true) in order until none return PROCEED. + /// \return PROCEED if any stage did work, SLEEP otherwise. virtual PipelineAction flush() { PipelineAction outcome = PipelineAction::SLEEP; @@ -52,13 +60,17 @@ class Flusher std::vector stages_; }; -/// This subclass is instantiated by Pipeline::createFlusher() when -/// any of the stages that need flushing are database stages. It only -/// serves to put the flush() inside a BEGIN/COMMIT TRANSACTION block -/// for performance (avoid many small transactions). 
+/*! + * \class FlusherWithTransaction + * + * \brief Flusher that runs flush() inside a single safeTransaction() when any + * of the stages are DatabaseStages. Used by Pipeline::createFlusher() + * automatically when needed; reduces many small transactions to one. + */ class FlusherWithTransaction : public Flusher { private: + /// \brief Internal constructor (Pipeline::createFlusher() uses this when a database stage is present). FlusherWithTransaction(const std::vector& stages, DatabaseManager* db_mgr) : Flusher(stages), db_mgr_(db_mgr) @@ -82,6 +94,8 @@ class FlusherWithTransaction : public Flusher friend class Pipeline; public: + /// \brief Run flush() inside a single BEGIN/COMMIT transaction. + /// \return PROCEED if any stage did work, SLEEP otherwise. PipelineAction flush() override { auto outcome = PipelineAction::SLEEP; diff --git a/include/simdb/pipeline/Pipeline.hpp b/include/simdb/pipeline/Pipeline.hpp index 53052eaf..1bebded1 100644 --- a/include/simdb/pipeline/Pipeline.hpp +++ b/include/simdb/pipeline/Pipeline.hpp @@ -13,14 +13,23 @@ class DatabaseManager; namespace simdb::pipeline { -/// SimDB pipelines are used to create high-performance multi-stage -/// data processors en route to the database. Unlike other pipeline -/// libraries, SimDB enforces move-only semantics for performance. -/// There are no limitations regarding I/O data type changes from -/// one stage/filter/transform to the next. +/*! + * \class Pipeline + * + * \brief Multi-stage data processor with typed input/output ports between + * stages. Add stages, declare port bindings, then finalize; stages + * run on PollingThreads. Move-only semantics; no restriction on + * I/O type changes between stages. + * + * \note Create via PipelineManager::createPipeline(). + */ class Pipeline { public: + /// \brief Construct a pipeline (typically via PipelineManager::createPipeline()). + /// \param db_mgr DatabaseManager for database stages. + /// \param name Pipeline name. 
+ /// \param app The App that owns this pipeline. Pipeline(DatabaseManager* db_mgr, const std::string& name, const App* app) : db_mgr_(db_mgr), pipeline_name_(name), @@ -28,12 +37,21 @@ class Pipeline { } + /// \brief Return the App that owns this pipeline. const App* getOwningApp() const { return app_; } + /// \brief Return the DatabaseManager used by database stages. DatabaseManager* getDatabaseManager() const { return db_mgr_; } + /// \brief Return the pipeline name. std::string getName() const { return pipeline_name_; } + /// \brief Add a stage; only valid before noMoreStages(). Returns pointer to the new stage. + /// \tparam StageType Stage class (must derive from Stage). + /// \tparam StageCtorArgs Constructor argument types. + /// \param name Unique stage name within this pipeline. + /// \param args Arguments forwarded to the stage constructor. + /// \throws DBException if not accepting stages or stage name already exists. template StageType* addStage(const std::string& name, StageCtorArgs&&... args) { @@ -54,6 +72,8 @@ class Pipeline return static_cast(stage.get()); } + /// \brief Finalize stages and start accepting bindings. + /// \throws DBException if not accepting stages. void noMoreStages() { if (state_ != State::ACCEPTING_STAGES) @@ -69,6 +89,8 @@ class Pipeline state_ = State::ACCEPTING_BINDINGS; } + /// \brief Bind an output port to an input port (full names: "StageName.port"). + /// \throws DBException if not accepting bindings. void bind(const std::string& output_port_full_name, const std::string& input_port_full_name) { if (state_ != State::ACCEPTING_BINDINGS) @@ -78,6 +100,8 @@ class Pipeline queue_repo_.bind(output_port_full_name, input_port_full_name); } + /// \brief Finalize bindings and create queues; required before getInPortQueue/getOutPortQueue. + /// \throws DBException if not accepting bindings. 
void noMoreBindings() { if (state_ != State::ACCEPTING_BINDINGS) @@ -89,6 +113,8 @@ class Pipeline state_ = State::BINDINGS_COMPLETE; } + /// \brief Return the input port queue; only valid after noMoreBindings(). + /// \tparam T Element type of the queue. template simdb::ConcurrentQueue* getInPortQueue(const std::string& port_full_name) { if (state_ != State::BINDINGS_COMPLETE && state_ != State::FINALIZED) @@ -98,6 +124,8 @@ class Pipeline return queue_repo_.getInPortQueue(port_full_name); } + /// \brief Return the output port queue; only valid after noMoreBindings(). + /// \tparam T Element type of the queue. template simdb::ConcurrentQueue* getOutPortQueue(const std::string& port_full_name) { if (state_ != State::BINDINGS_COMPLETE && state_ != State::FINALIZED) @@ -107,6 +135,9 @@ class Pipeline return queue_repo_.getOutPortQueue(port_full_name); } + /// \brief Assign each stage to a PollingThread (or the shared DatabaseThread); call after noMoreBindings(). + /// \param threads Vector to which new PollingThreads may be appended. + /// \param database_thread Single shared DatabaseThread for all DatabaseStages (created if null). void assignStageThreads(std::vector>& threads, std::unique_ptr& database_thread) { @@ -126,6 +157,10 @@ class Pipeline state_ = State::FINALIZED; } + /// \brief Create a Flusher that runs the given stages (or all in order); wraps in transaction if any stage is a DatabaseStage. + /// \param stage_names Stage names in flush order; if empty, use add order. + /// \return Flusher or FlusherWithTransaction; caller owns. + /// \throws DBException if a stage name does not exist. std::unique_ptr createFlusher(const std::vector& stage_names = {}) { std::vector stages; @@ -153,8 +188,10 @@ class Pipeline : std::unique_ptr(new Flusher(stages)); } + /// \brief Return the AsyncDatabaseAccessor for this pipeline (set by PipelineManager after openPipelines()). 
AsyncDatabaseAccessor* getAsyncDatabaseAccessor() const { return async_db_accessor_; } + /// \brief Return a map of stage name to Stage* (all stages in this pipeline). std::map getStages() { std::map stages; @@ -165,6 +202,7 @@ class Pipeline return stages; } + /// \brief Return stages in the order they were added (name, Stage* pairs). std::vector> getOrderedStages() { std::vector> ordered_stages; diff --git a/include/simdb/pipeline/Queue.hpp b/include/simdb/pipeline/Queue.hpp index 7813baae..2dd6030b 100644 --- a/include/simdb/pipeline/Queue.hpp +++ b/include/simdb/pipeline/Queue.hpp @@ -9,38 +9,48 @@ namespace simdb::pipeline { -/// Base class for all concurrent queues marshalling -/// data between pipeline threads. +/*! + * \class QueueBase + * + * \brief Type-erased base for concurrent queues that move data between + * pipeline threads. Use Queue for typed access. + */ class QueueBase { public: virtual ~QueueBase() = default; + /// \brief Return a string representation of the queue's value type (e.g. for diagnostics). virtual std::string stringifiedType() const = 0; + /// \brief Return the number of elements currently in the queue. virtual size_t size() const = 0; }; -/// Wrapper around a concurrent queue which is used by -/// pipeline tasks that know their specific input/output -/// types. +/*! + * \class Queue + * + * \brief Typed wrapper around ConcurrentQueue for pipeline input/output + * ports. + * \tparam T Element type of the queue. + */ template class Queue : public QueueBase { public: + /// \brief Mutable access to the underlying ConcurrentQueue. ConcurrentQueue& get() { return queue_; } + /// \brief Const access to the underlying ConcurrentQueue. const ConcurrentQueue& get() const { return queue_; } + /// \brief Return a string representation of the queue's value type (e.g. for diagnostics). std::string stringifiedType() const override { return demangle_type(); } + /// \brief Return the number of elements currently in the queue. 
size_t size() const override { return queue_.size(); } private: ConcurrentQueue queue_; }; +/// \brief Unique_ptr to a Queue (convenience alias). template using InputQueuePtr = std::unique_ptr>; -template inline InputQueuePtr makeQueue() -{ - return std::make_unique>(); -} - } // namespace simdb::pipeline diff --git a/include/simdb/pipeline/QueueRepo.hpp b/include/simdb/pipeline/QueueRepo.hpp index a3a78b4c..beffad0a 100644 --- a/include/simdb/pipeline/QueueRepo.hpp +++ b/include/simdb/pipeline/QueueRepo.hpp @@ -6,26 +6,40 @@ #include "simdb/pipeline/Queue.hpp" #include -/// The classes in this file are used to create all the required -/// simdb::ConcurrentQueue(s) needed for all apps' pipeline stages. - namespace simdb::pipeline { +/*! + * \class QueuePlaceholder + * + * \brief Abstract placeholder for a pipeline port queue. Used by StageQueueRepo + * and PipelineQueueRepo to create and bind ConcurrentQueues when stages + * are finalized and bindings are applied. + */ class QueuePlaceholder { public: virtual ~QueuePlaceholder() = default; + /// \brief Create a new Queue and return ownership. virtual std::unique_ptr createQueue() = 0; + /// \brief Assign an existing queue to this placeholder (e.g. after binding). virtual void assignQueue(QueueBase* queue) = 0; + /// \brief Return true if a queue has been created or assigned. virtual bool hasQueue() const = 0; }; +/*! + * \class InputQueuePlaceholder + * + * \brief Placeholder for an input port queue; holds a reference to the stage's + * ConcurrentQueue* so the queue can be assigned when bound. + * \tparam T Element type of the queue. + */ template class InputQueuePlaceholder : public QueuePlaceholder { public: - /// Create placeholder with a backpointer to the stage's queue member - /// variable. We will assign the queue to the stage variable when the queue - /// is created. + /// \brief Construct with a reference to the stage's queue pointer (must be null initially). 
+ /// \param queue Reference to the stage's ConcurrentQueue*; set when createQueue() or assignQueue() is called on this placeholder. + /// \throws DBException if \p queue is non-null. InputQueuePlaceholder(ConcurrentQueue*& queue) : queue_(queue) { @@ -53,20 +67,29 @@ template class InputQueuePlaceholder : public QueuePlaceholder queue_ = &typed_queue->get(); } + /// \brief Return true if a queue has been created or assigned. bool hasQueue() const override { return queue_ != nullptr; } + /// \brief Return the assigned queue pointer (null until createQueue or assignQueue). ConcurrentQueue* getQueue() { return queue_; } private: ConcurrentQueue*& queue_; }; +/*! + * \class OutputQueuePlaceholder + * + * \brief Placeholder for an output port queue; holds a reference to the stage's + * ConcurrentQueue* so the queue can be assigned when bound. + * \tparam T Element type of the queue. + */ template class OutputQueuePlaceholder : public QueuePlaceholder { public: - /// Create placeholder with a backpointer to the stage's queue member - /// variable. We will assign the queue to the stage variable when the queue - /// is created. + /// \brief Construct with a reference to the stage's queue pointer (must be null initially). + /// \param queue Reference to the stage's ConcurrentQueue*; set when createQueue() or assignQueue() is called on this placeholder. + /// \throws DBException if \p queue is non-null. OutputQueuePlaceholder(ConcurrentQueue*& queue) : queue_(queue) { @@ -77,6 +100,7 @@ template class OutputQueuePlaceholder : public QueuePlaceholder } } + /// \brief Create a new Queue and return ownership. std::unique_ptr createQueue() override { auto queue = std::make_unique>(); @@ -84,6 +108,7 @@ template class OutputQueuePlaceholder : public QueuePlaceholder return queue; } + /// \brief Assign an existing queue to this placeholder (e.g. after binding).
void assignQueue(QueueBase* queue) override { auto typed_queue = dynamic_cast*>(queue); @@ -96,15 +121,25 @@ template class OutputQueuePlaceholder : public QueuePlaceholder bool hasQueue() const override { return queue_ != nullptr; } + /// \brief Return the assigned queue pointer (null until createQueue or assignQueue). ConcurrentQueue* getQueue() { return queue_; } private: ConcurrentQueue*& queue_; }; +/*! + * \class StageQueueRepo + * + * \brief Per-stage registry of input and output port placeholders. Used in + * Stage subclasses (addInPort_/addOutPort_) before the stage name is set. + * Keys become "stage_name.port_name" after setStageName(). + */ class StageQueueRepo { public: + /// \brief Register an input port placeholder; only in Stage subclass constructors (before name is set). + /// \throws DBException if stage name already set, or port name already exists. template void addInPortPlaceholder(const std::string& port_name, ConcurrentQueue*& queue) { if (!stage_name_.empty()) @@ -120,6 +155,8 @@ class StageQueueRepo placeholder = std::make_unique>(queue); } + /// \brief Register an output port placeholder; only in Stage subclass constructors (before name is set). + /// \throws DBException if stage name already set, or port name already exists. template void addOutPortPlaceholder(const std::string& port_name, ConcurrentQueue*& queue) { if (!stage_name_.empty()) @@ -135,6 +172,8 @@ class StageQueueRepo placeholder = std::make_unique>(queue); } + /// \brief Set the stage name; rekeys all placeholders to "stage_name.port_name". Call at most once. + /// \throws DBException if renaming or if name already set differently. void setStageName(const std::string& stage_name) { if (stage_name_ != stage_name && !stage_name_.empty()) @@ -178,9 +217,19 @@ class StageQueueRepo friend class PipelineQueueRepo; }; +/*! + * \class PipelineQueueRepo + * + * \brief Aggregates StageQueueRepos, manages port bindings, and creates/assigns + * queues in finalizeBindings(). 
Lifecycle: add stages via merge(), then + * noMoreStages(), then bind() pairs, then finalizeBindings(); after that + * getInPortQueue/getOutPortQueue and validateQueues() are valid. + */ class PipelineQueueRepo { public: + /// \brief Merge another stage's placeholders into this repo; only while accepting stages. + /// \throws DBException if not accepting stages or if a port name collides. void merge(StageQueueRepo& other) { if (state_ != RepoState::ACCEPTING_STAGES) @@ -211,6 +260,8 @@ class PipelineQueueRepo other.output_placeholders_.clear(); } + /// \brief Signal that no more stages will be merged; switch to accepting bindings. + /// \throws DBException if not accepting stages. void noMoreStages() { if (state_ != RepoState::ACCEPTING_STAGES) @@ -221,6 +272,8 @@ class PipelineQueueRepo state_ = RepoState::ACCEPTING_BINDINGS; } + /// \brief Bind an output port to an input port (both full names, e.g. "StageA.out" -> "StageB.in"). + /// \throws DBException if not accepting bindings. void bind(const std::string& output_port_full_name, const std::string& input_port_full_name) { if (state_ != RepoState::ACCEPTING_BINDINGS) @@ -230,6 +283,8 @@ class PipelineQueueRepo port_bindings_[output_port_full_name] = input_port_full_name; } + /// \brief Create queues from bindings and unbound ports; switch to bindings complete. + /// \throws DBException if not accepting bindings or if a port is missing. void finalizeBindings() { if (state_ != RepoState::ACCEPTING_BINDINGS) @@ -279,6 +334,9 @@ class PipelineQueueRepo state_ = RepoState::BINDINGS_COMPLETE; } + /// \brief Return the input port queue for the given full port name; only after finalizeBindings(). + /// \tparam T Element type of the queue. + /// \throws DBException if bindings not finalized, port not found, or type mismatch. 
template ConcurrentQueue* getInPortQueue(const std::string& port_full_name) { if (state_ != RepoState::BINDINGS_COMPLETE) @@ -301,6 +359,9 @@ class PipelineQueueRepo return typed_placeholder->getQueue(); } + /// \brief Return the output port queue for the given full port name; only after finalizeBindings(). + /// \tparam T Element type of the queue. + /// \throws DBException if bindings not finalized, port not found, or type mismatch. template ConcurrentQueue* getOutPortQueue(const std::string& port_full_name) { if (state_ != RepoState::BINDINGS_COMPLETE) @@ -323,6 +384,8 @@ class PipelineQueueRepo return typed_placeholder->getQueue(); } + /// \brief Throw if any input or output port is unbound (not connected and not explicitly unbound). + /// \throws DBException with a message listing unbound ports. void validateQueues() { std::ostringstream oss; From 69957a74162e8136793af5ec9e347e92b426a2db Mon Sep 17 00:00:00 2001 From: Colby Nyce Date: Sun, 1 Mar 2026 16:45:03 -0800 Subject: [PATCH 6/9] Doxygen: AsyncDatabaseAccessor.hpp, DatabaseAccessor.hpp, DatabaseThread.hpp, PipelineSnooper.hpp, and ThreadMerger.hpp --- .../simdb/pipeline/AsyncDatabaseAccessor.hpp | 53 +++++++++++-------- include/simdb/pipeline/DatabaseAccessor.hpp | 17 +++++- include/simdb/pipeline/DatabaseThread.hpp | 39 ++++++++++---- include/simdb/pipeline/PipelineSnooper.hpp | 24 ++++++++- include/simdb/pipeline/ThreadMerger.hpp | 21 ++++++-- 5 files changed, 114 insertions(+), 40 deletions(-) diff --git a/include/simdb/pipeline/AsyncDatabaseAccessor.hpp b/include/simdb/pipeline/AsyncDatabaseAccessor.hpp index c274b260..aba9c2b1 100644 --- a/include/simdb/pipeline/AsyncDatabaseAccessor.hpp +++ b/include/simdb/pipeline/AsyncDatabaseAccessor.hpp @@ -13,18 +13,21 @@ class DatabaseManager; namespace simdb::pipeline { -/// Function that is queued by any pipeline thread (or the main -/// thread) and invoked on the dedicated database thread.
+/// \brief Callable queued by any thread and invoked on the dedicated database thread (receives DatabaseManager*). using AsyncDbAccessFunc = std::function; -/// This struct is used to wrap a DB access function and -/// an exception that is suitable for std::future/promise. +/*! + * \struct AsyncDatabaseTask + * + * \brief Wraps an AsyncDbAccessFunc and a std::promise for the exception message; + * used to run the function on the DB thread and propagate exceptions to the caller. + */ struct AsyncDatabaseTask { AsyncDbAccessFunc func; std::promise exception_reason; - AsyncDatabaseTask(AsyncDbAccessFunc func) : + explicit AsyncDatabaseTask(AsyncDbAccessFunc func) : func(func) { } @@ -33,34 +36,40 @@ struct AsyncDatabaseTask using AsyncDatabaseTaskPtr = std::shared_ptr; -/// Base class to be implemented by the DatabaseThread. +/*! + * \class AsyncDatabaseAccessHandler + * + * \brief Interface implemented by DatabaseThread. Receives tasks and runs them + * on the DB thread inside safeTransaction(); callers block until the + * task completes (or timeout). + */ class AsyncDatabaseAccessHandler { public: virtual ~AsyncDatabaseAccessHandler() = default; - /// Put a task on the DB thread for evaluation, and BLOCK - /// until it is called. The DB thread will complete its - /// current transaction (INSERTs) immediately and evaluate - /// this task inside a separate safeTransaction(). - /// - /// If a nonzero timeout is given, throws a DBException - /// if the task is not completed within the timeout. + /// \brief Run the task on the DB thread; block until done (or timeout). + /// \param task Task to run inside a safeTransaction(). + /// \param timeout_seconds If > 0, throw if not completed within this many seconds. + /// \throws DBException on timeout or if the task throws. virtual void eval(AsyncDatabaseTaskPtr&& task, double timeout_seconds = 0) = 0; }; -/// This class is used by SimDB apps and pipeline elements to -/// asynchronously access the database. 
It supports async data -/// writes and enqueuing general-purpose std::functions. +/*! + * \class AsyncDatabaseAccessor + * + * \brief Handle for pipeline stages and apps to run code on the dedicated + * database thread. eval(func) blocks the caller until func(DatabaseManager*) + * has run. Obtained from PipelineManager::getAsyncDatabaseAccessor() or + * DatabaseThread::getAsyncDatabaseAccessor(). + */ class AsyncDatabaseAccessor { public: - /// Invoke a std::function on the DB thread. This BLOCKS the - /// calling thread until the function is processed. - /// (Uses std::future/promise). - /// - /// If a nonzero timeout is given, throws a DBException - /// if the task is not completed within the timeout. + /// \brief Run \p func on the DB thread; block until it completes (or timeout). + /// \param func Callable that receives the DatabaseManager*. + /// \param timeout_seconds If > 0, throw if not completed within this many seconds. + /// \throws DBException on timeout or if \p func throws. void eval(const AsyncDbAccessFunc& func, double timeout_seconds = 0) { auto task = std::make_shared(func); diff --git a/include/simdb/pipeline/DatabaseAccessor.hpp b/include/simdb/pipeline/DatabaseAccessor.hpp index 23936313..0b400093 100644 --- a/include/simdb/pipeline/DatabaseAccessor.hpp +++ b/include/simdb/pipeline/DatabaseAccessor.hpp @@ -9,18 +9,31 @@ namespace simdb::pipeline { -/// This class provides access to the DatabaseManager as well as prepared -/// INSERT objects that are specific to a particular App's schema. +/*! + * \class DatabaseAccessor + * + * \brief Provides DatabaseManager access and per-App prepared INSERT objects + * for DatabaseStage. Used by DatabaseStage to get the manager + * and getTableInserter(table_name) for high-volume inserts. + */ class DatabaseAccessor { public: + /// \brief Construct with the DatabaseManager used for all operations. + /// \param db_mgr Non-null DatabaseManager. 
DatabaseAccessor(DatabaseManager* db_mgr) : db_mgr_(db_mgr) { } + /// \brief Return the DatabaseManager. DatabaseManager* getDatabaseManager() const { return db_mgr_; } + /// \brief Return a prepared INSERT for the given table; lazily builds inserters from App::defineSchema(). + /// \tparam App App type (must have NAME and defineSchema()). + /// \param tbl_name Table name in the App's schema. + /// \return Raw pointer to the PreparedINSERT (owned by this accessor). + /// \throws DBException if the table is not in the App's schema. template PreparedINSERT* getTableInserter(const std::string& tbl_name) { auto& inserters = tbl_inserters_by_app_[App::NAME]; diff --git a/include/simdb/pipeline/DatabaseThread.hpp b/include/simdb/pipeline/DatabaseThread.hpp index a6b3f773..90671bb3 100644 --- a/include/simdb/pipeline/DatabaseThread.hpp +++ b/include/simdb/pipeline/DatabaseThread.hpp @@ -14,15 +14,20 @@ class DatabaseManager; namespace simdb::pipeline { -/// The database thread is used to ensure that Runnable::run() -/// methods are grouped inside BEGIN/COMMIT TRANSACTION blocks -/// for much better performance. -/// -/// It also provides asynchronous access to the database for -/// async pipeline queries or serialized data writes. +/*! + * \class DatabaseThread + * + * \brief PollingThread dedicated to database stages. Runs all its Runnables + * inside batched BEGIN/COMMIT transactions for performance, and + * provides AsyncDatabaseAccessor for other threads to submit work + * (eval) that runs on this thread. Exactly one DatabaseThread exists + * for every pipeline associated with the same DatabaseManager (.db file). + */ class DatabaseThread : public PollingThread, private AsyncDatabaseAccessHandler { public: + /// \brief Construct the database thread for the given DatabaseManager. + /// \param db_mgr DatabaseManager used for safeTransaction and async tasks. 
DatabaseThread(DatabaseManager* db_mgr) : db_mgr_(db_mgr), dormant_thread_(db_mgr) @@ -31,6 +36,7 @@ class DatabaseThread : public PollingThread, private AsyncDatabaseAccessHandler ~DatabaseThread() noexcept = default; + /// \brief Return the AsyncDatabaseAccessor for submitting work to this thread. AsyncDatabaseAccessor* getAsyncDatabaseAccessor() { return &db_accessor_; } private: @@ -106,19 +112,25 @@ class DatabaseThread : public PollingThread, private AsyncDatabaseAccessHandler return did_work; } - /// Unlike the "always-on" PollingThread, the DormantThread - /// class uses a condition variable to wake up and service - /// async DB accesses before going back to sleep. + /*! + * \class DormantThread + * + * \brief Thread that sleeps until an async DB task is enqueued; then runs + * the task in a safeTransaction() and goes back to sleep. Used to + * service AsyncDatabaseAccessor::eval() without blocking the main + * DB polling loop. + */ class DormantThread { public: - DormantThread(DatabaseManager* db_mgr) : + explicit DormantThread(DatabaseManager* db_mgr) : db_mgr_(db_mgr) { } ~DormantThread() noexcept { close(); } + /// \brief Start the dormant thread (waits on condition variable for tasks). void open() { if (!thread_) @@ -128,6 +140,7 @@ class DatabaseThread : public PollingThread, private AsyncDatabaseAccessHandler } } + /// \brief Stop the thread and join. void close() noexcept { { @@ -146,6 +159,10 @@ class DatabaseThread : public PollingThread, private AsyncDatabaseAccessHandler } } + /// \brief Enqueue a task and optionally block until it completes (or timeout). + /// \param task Task to run on this thread inside safeTransaction(). + /// \param timeout_seconds If > 0, block up to this many seconds; throw on timeout. + /// \throws DBException on timeout or if the task throws. 
void eval(AsyncDatabaseTaskPtr&& task, double timeout_seconds = 0) { if (stop_) @@ -176,9 +193,11 @@ class DatabaseThread : public PollingThread, private AsyncDatabaseAccessHandler } } + /// \brief Return true if there are pending async DB tasks in the queue. bool hasTasks() const { return !pending_async_db_tasks_.empty(); } private: + /// \brief Main loop that runs until the thread is stopped. void loop_() { while (true) diff --git a/include/simdb/pipeline/PipelineSnooper.hpp b/include/simdb/pipeline/PipelineSnooper.hpp index 6d981c12..469e640e 100644 --- a/include/simdb/pipeline/PipelineSnooper.hpp +++ b/include/simdb/pipeline/PipelineSnooper.hpp @@ -13,14 +13,30 @@ namespace simdb::pipeline { class PipelineManager; class Stage; +/*! + * \class PipelineSnooper + * + * \brief Collects Stage::snoop(key, snooped_obj) callbacks and runs them + * via snoopAllStages(). Create with PipelineManager::createSnooper(), + * addStage() for each stage that implements snoop(), then call + * snoopAllStages() to query all stages (optionally with pipeline disabled). + * \tparam KeyType Type of the key passed to snoop(). + * \tparam SnoopedType Type of the object that stages fill in (e.g. aggregate result). + */ template class PipelineSnooper { public: - PipelineSnooper(PipelineManager* pipeline_mgr) : + /// \brief Construct with the PipelineManager (used for snoopAllStages to optionally disable pipeline). + /// \param pipeline_mgr Non-null PipelineManager. + explicit PipelineSnooper(PipelineManager* pipeline_mgr) : pipeline_mgr_(pipeline_mgr) { } + /// \brief Register a stage's snoop method; StageType must have void snoop(const KeyType&, SnoopedType&). + /// \tparam StageType Stage-derived type that implements snoop(). + /// \param stage Stage to call snoop() on. + /// \throws DBException if this stage was already added. 
template void addStage(StageType* stage) { static_assert(std::is_base_of::value); @@ -33,7 +49,11 @@ template class PipelineSnooper callbacks_.push_back(cb); } - /// Implemented in PipelineManager.hpp to avoid circular includes + /// \brief Call each stage's snoop(key, snooped_obj) until one returns true or all are tried. + /// \param key Key to pass to snoop (e.g. lookup key). + /// \param snooped_obj Object that stages may fill; passed by reference to each snoop(). + /// \param disable_pipeline If true, use scopedDisableAll() while snooping so pipeline is paused. + /// \return true if any stage's snoop() returned true, false otherwise. bool snoopAllStages(const KeyType& key, SnoopedType& snooped_obj, bool disable_pipeline = true); private: diff --git a/include/simdb/pipeline/ThreadMerger.hpp b/include/simdb/pipeline/ThreadMerger.hpp index 46c0e1ff..c00a6a61 100644 --- a/include/simdb/pipeline/ThreadMerger.hpp +++ b/include/simdb/pipeline/ThreadMerger.hpp @@ -13,18 +13,28 @@ namespace simdb::pipeline { -/// This class is used when finalizing all apps' pipeline stages, -/// merging non-database threads appropriately when a call was made -/// to AppManager::minimizeThreads() before openPipelines(). - +/*! + * \class ThreadMerger + * + * \brief Merges non-database PollingThreads across pipelines when + * PipelineManager::minimizeThreads() or minimizeThreads(app,...) was + * called before openPipelines(). Creates a minimal set of threads. + * Exactly one DatabaseThread is shared by all pipeline(s) DatabaseStages + * if any. + */ class ThreadMerger { public: + /// \brief Construct with the pipelines that will contribute stages. + /// \param pipelines All pipelines (from PipelineManager). ThreadMerger(const std::vector>& pipelines) : pipelines_(pipelines) { } + /// \brief Mark one app's non-database stages for merging; cannot mix with mergeAllAppThreads(). + /// \param app App whose pipeline threads should be merged. 
+ /// \throws DBException if already finalized or if mergeAllAppThreads() was used. void addAppForMerging(const App* app) { if (!accepting_apps_) @@ -46,8 +56,11 @@ class ThreadMerger apps_to_merge_.push_back(app); } + /// \brief Merge all apps' non-database threads into a minimal set; cannot mix with addAppForMerging(). void mergeAllAppThreads() { merge_all_apps_ = true; } + /// \brief Build the final list of PollingThreads (and one DatabaseThread); call once from PipelineManager::openPipelines(). + /// \param polling_threads Output vector; merged threads are appended (caller takes ownership). void performMerge(std::vector>& polling_threads) { if (apps_to_merge_.empty() && !merge_all_apps_) From a36b9bd5c1be4c57cffe7e99265f30133e941b0a Mon Sep 17 00:00:00 2001 From: Colby Nyce Date: Sun, 1 Mar 2026 16:52:28 -0800 Subject: [PATCH 7/9] Doxygen: Exceptions.hpp, App.hpp, AppManager.hpp, and Table.hpp --- include/simdb/Exceptions.hpp | 36 ++++++++++++----------- include/simdb/apps/App.hpp | 41 ++++++++++++++++++++++++--- include/simdb/apps/AppManager.hpp | 47 +++++++++++++++++++++++++------ include/simdb/sqlite/Table.hpp | 4 +-- 4 files changed, 97 insertions(+), 31 deletions(-) diff --git a/include/simdb/Exceptions.hpp b/include/simdb/Exceptions.hpp index 8503a5c3..765c3637 100644 --- a/include/simdb/Exceptions.hpp +++ b/include/simdb/Exceptions.hpp @@ -9,34 +9,34 @@ namespace simdb { -/// Used to construct and throw a standard C++ exception +/*! + * \class DBException + * + * \brief SimDB exception type; stream additional context with operator<< before + * throwing. what() returns the full message built from the initial + * reason and any appended values. + */ class DBException : public std::exception { public: DBException() = default; - /// Construct a DBException object - DBException(const std::string& reason) { reason_ << reason; } + /// \brief Construct with an initial reason string. 
+ explicit DBException(const std::string& reason) { reason_ << reason; } - /// Copy construct a DBException object + /// \brief Copy constructor; copies the accumulated message. DBException(const DBException& rhs) { reason_ << rhs.reason_.str(); } - /// Destroy! virtual ~DBException() noexcept override {} - /** - * \brief Overload from std::exception - * \return Const char * of the exception reason - */ + /// \brief Override from std::exception; returns the full exception message. virtual const char* what() const noexcept override { reason_str_ = reason_.str(); return reason_str_.c_str(); } - /** - * \brief Append additional information to the message. - */ + /// \brief Append a value to the exception message (stream-style). template DBException& operator<<(const T& msg) { reason_ << msg; @@ -52,13 +52,17 @@ class DBException : public std::exception mutable std::string reason_str_; }; -/// Used in order to signal to safeTransaction() that the transaction -/// must be retried. Since SimDB is multi-threaded, we expect the database -/// to encounter locked tables etc. which should not be thrown out of -/// calls to safeTransaction(). +/*! + * \class SafeTransactionSilentException + * + * \brief Internal exception signaling that safeTransaction() should retry (e.g. + * SQLITE_BUSY, SQLITE_LOCKED). Not propagated out of safeTransaction(); + * the transaction is retried instead. + */ class SafeTransactionSilentException : public std::exception { public: + /// \brief Construct with the SQLite return code (used in the message). explicit SafeTransactionSilentException(int rc) : msg_("The database is locked (return code " + std::to_string(rc) + ")") { diff --git a/include/simdb/apps/App.hpp b/include/simdb/apps/App.hpp index 1587ba3d..ba351a9d 100644 --- a/include/simdb/apps/App.hpp +++ b/include/simdb/apps/App.hpp @@ -50,23 +50,37 @@ class PipelineManager; class AppManager; class ThreadSafeLogger; -/// Base class for SimDB applications. 
Note that app subclasses are given -/// the DatabaseManager instance as a constructor argument, so they can -/// access the database and perform operations like appending schemas, -/// inserting records, etc.) +/*! + * \class App + * + * \brief Base class for SimDB applications. Subclasses receive a + * DatabaseManager in the constructor and can append schemas, insert + * records, and create pipelines. Lifecycle hooks: postInit(), + * createPipeline(), preTeardown(), postTeardown(). Use AppManager + * to register, enable, and instantiate apps. + */ class App { public: virtual ~App() = default; + /// \brief Set the instance number (1-based; 0 = single-instance). Called by AppManager. void setInstance(size_t instance) { instance_ = instance; } + /// \brief Get the instance number (0 if single-instance). size_t getInstance() const { return instance_; } + /// \brief Hook called after command-line parsing, before simulation starts. virtual void postInit([[maybe_unused]] int argc, [[maybe_unused]] char** argv) {} + /// \brief Hook to create and configure this app's pipeline(s) on the given manager. virtual void createPipeline(pipeline::PipelineManager*) {} + /// \brief Hook called before simulation teardown. virtual void preTeardown() {} + /// \brief Hook called after simulation teardown (resource cleanup). virtual void postTeardown() {} + /// \brief Return the stdout logger (set by AppManager); may be null. ThreadSafeLogger* getStdoutLogger() const { return stdout_logger_; } + /// \brief Return the stderr logger (set by AppManager); may be null. ThreadSafeLogger* getStderrLogger() const { return stderr_logger_; } + /// \brief Return the file logger (set by AppManager); may be null. ThreadSafeLogger* getFileLogger() const { return file_logger_; } protected: @@ -86,14 +100,33 @@ class App friend class AppManager; }; +/*! + * \class AppFactoryBase + * + * \brief Abstract factory for creating App instances and defining their schema. 
+ * Each app type exposes a nested AppFactory that implements this interface; + * AppManager uses it to instantiate apps and register their tables. + * + * \note It is optional to define a custom AppFactory nested in your app class. + * If you don't, the default AppFactory will be used, which creates the app + * with the default constructor (DatabaseManager*). + */ class AppFactoryBase { public: virtual ~AppFactoryBase() = default; + /// \brief Create an App instance for the given DatabaseManager. virtual App* createApp(DatabaseManager*) = 0; + /// \brief Define this app's schema (tables, columns) on the given Schema. virtual void defineSchema(Schema& schema) const = 0; }; +/*! + * \class AppFactory + * + * \brief Default factory that creates AppT and delegates defineSchema to AppT::defineSchema. + * \tparam AppT App subclass (must have defineSchema(Schema&) and a constructor taking DatabaseManager*). + */ template class AppFactory : public AppFactoryBase { public: diff --git a/include/simdb/apps/AppManager.hpp b/include/simdb/apps/AppManager.hpp index 17a07a90..f8172964 100644 --- a/include/simdb/apps/AppManager.hpp +++ b/include/simdb/apps/AppManager.hpp @@ -29,13 +29,22 @@ template struct has_nested_factory(). template class AppRegistration : public AppRegistrationBase { private: @@ -44,16 +53,20 @@ template class AppRegistration : public AppRegistrationBase friend class AppManagers; }; -/// This class is responsible for registering, enabling, instantiating, -/// and managing the lifecycle of all SimDB applications running in a -/// simulation. +/*! + * \class AppManager + * + * \brief Registers, enables, instantiates, and manages the lifecycle of SimDB + * apps for one database. Obtain from AppManagers::createAppManager() or + * AppManagers::getAppManager(). + */ class AppManager { public: - /// Get our associated DatabaseManager. + /// \brief Return the associated DatabaseManager.
DatabaseManager* getDatabaseManager() const { return db_mgr_; } - /// Parameterize an app factory. Call this before createEnabledApps(). + /// \brief Parameterize an app factory. Must be called before createEnabledApps(). /// Your app subclass must have a public nested class called "AppFactory", /// inheriting publicly from simdb::AppFactoryBase. Your nested AppFactory /// can have any signature it needs to accept all required app constructor @@ -715,11 +728,20 @@ template void AppRegistration::registerApp(AppManager* app app_manager->registerApp_(); } -/// This class holds onto all DatabaseManagers and their AppManagers. +/*! + * \class AppManagers + * + * \brief Singleton-like holder for all DatabaseManagers and AppManagers. Call + * registerApp() for each app type before createAppManager(); then + * createAppManager(db_file), createEnabledApps(), createSchemas(), + * postInit(), initializePipelines(), openPipelines(). Use getAppManager() + * or getDatabaseManager() to access managers. Obtain from + * AppManagers::getInstance() or AppRegistrations::getInstance(). + */ class AppManagers { public: - /// Register an app as early as possible (doesn't create it yet). + /// \brief Register an app type; must be called before createAppManager() (call once per app type). /// You need to call this method for all apps you might end up /// creating **before** calling createAppManager(). As soon as /// createAppManager() is called the first time, you can no @@ -980,15 +1002,22 @@ class AppManagers bool accepting_logger_requests_ = true; }; -/// Helper class to only expose AppManagers::registerApp() api. +/*! + * \class AppRegistrations + * + * \brief Thin wrapper that exposes only registerApp() on an AppManagers + * instance. Use when you want to restrict API surface to registration only. + */ class AppRegistrations { public: - AppRegistrations(AppManagers* app_managers) : + /// \brief Construct with the AppManagers instance to wrap. 
+ explicit AppRegistrations(AppManagers* app_managers) : app_managers_(app_managers) { } + /// \brief Register an app type (forwards to AppManagers::registerApp()). template void registerApp() { app_managers_->registerApp(); } private: diff --git a/include/simdb/sqlite/Table.hpp b/include/simdb/sqlite/Table.hpp index 1297c2d3..2b63e7c4 100644 --- a/include/simdb/sqlite/Table.hpp +++ b/include/simdb/sqlite/Table.hpp @@ -34,7 +34,7 @@ class SqlTable }; /*! - * \class SqlTable + * \class SqlColumns * * \brief Helper class that is used under the hood for db_mgr.INSERT(..., * SQL_COLUMNS("ColA", "ColB"), ...) @@ -91,7 +91,7 @@ class SqlColumns }; /*! - * \class SqlTable + * \class SqlValues * * \brief Helper class that is used together with SQL_VALUES() in order to bind * data values to a prepared statement for INSERT's. From 85aa4f2ed9fa11e3ebce1ef6952e9d50d73b5c91 Mon Sep 17 00:00:00 2001 From: Colby Nyce Date: Sun, 1 Mar 2026 16:52:59 -0800 Subject: [PATCH 8/9] Run clang-format --- include/simdb/pipeline/Pipeline.hpp | 3 ++- include/simdb/pipeline/PipelineManager.hpp | 3 ++- include/simdb/pipeline/QueueRepo.hpp | 2 +- include/simdb/pipeline/ThreadMerger.hpp | 3 ++- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/include/simdb/pipeline/Pipeline.hpp b/include/simdb/pipeline/Pipeline.hpp index 1bebded1..426ecabc 100644 --- a/include/simdb/pipeline/Pipeline.hpp +++ b/include/simdb/pipeline/Pipeline.hpp @@ -157,7 +157,8 @@ class Pipeline state_ = State::FINALIZED; } - /// \brief Create a Flusher that runs the given stages (or all in order); wraps in transaction if any stage is a DatabaseStage. + /// \brief Create a Flusher that runs the given stages (or all in order); wraps in transaction if any stage is a + /// DatabaseStage. /// \param stage_names Stage names in flush order; if empty, use add order. /// \return Flusher or FlusherWithTransaction; caller owns. /// \throws DBException if a stage name does not exist. 
diff --git a/include/simdb/pipeline/PipelineManager.hpp b/include/simdb/pipeline/PipelineManager.hpp index 450e3c05..66780170 100644 --- a/include/simdb/pipeline/PipelineManager.hpp +++ b/include/simdb/pipeline/PipelineManager.hpp @@ -160,7 +160,8 @@ class PipelineManager threads_opened_ = true; } - /// \brief Temporarily disable all pipeline runnables (and optionally pause threads); re-enabled when the returned object is destroyed. + /// \brief Temporarily disable all pipeline runnables (and optionally pause threads); re-enabled when the returned + /// object is destroyed. /// \param disable_threads_too If true, also pause polling threads; if false, only disable runnables. /// \return A ScopedRunnableDisabler, or nullptr if a disabler is already active (nested calls are no-ops). std::unique_ptr scopedDisableAll(bool disable_threads_too = true) diff --git a/include/simdb/pipeline/QueueRepo.hpp b/include/simdb/pipeline/QueueRepo.hpp index beffad0a..13ddfab4 100644 --- a/include/simdb/pipeline/QueueRepo.hpp +++ b/include/simdb/pipeline/QueueRepo.hpp @@ -38,7 +38,7 @@ template class InputQueuePlaceholder : public QueuePlaceholder { public: /// \brief Construct with a reference to the stage's queue pointer (must be null initially). - /// \param queue Reference to the stage's ConcurrentQueue*; set when + /// \param queue Reference to the stage's ConcurrentQueue*; set when createQueue() or assignQueue() is called. /// \throws DBException if \p queue is non-null. InputQueuePlaceholder(ConcurrentQueue*& queue) : queue_(queue) diff --git a/include/simdb/pipeline/ThreadMerger.hpp b/include/simdb/pipeline/ThreadMerger.hpp index c00a6a61..195c9f9b 100644 --- a/include/simdb/pipeline/ThreadMerger.hpp +++ b/include/simdb/pipeline/ThreadMerger.hpp @@ -59,7 +59,8 @@ class ThreadMerger /// \brief Merge all apps' non-database threads into a minimal set; cannot mix with addAppForMerging().
void mergeAllAppThreads() { merge_all_apps_ = true; } - /// \brief Build the final list of PollingThreads (and one DatabaseThread); call once from PipelineManager::openPipelines(). + /// \brief Build the final list of PollingThreads (and one DatabaseThread); call once from + /// PipelineManager::openPipelines(). /// \param polling_threads Output vector; merged threads are appended (caller takes ownership). void performMerge(std::vector>& polling_threads) { From 8d099c62024dfa9f7a3a190ce05086ff1e7a093a Mon Sep 17 00:00:00 2001 From: Colby Nyce Date: Sun, 1 Mar 2026 16:59:30 -0800 Subject: [PATCH 9/9] PR self-review --- include/simdb/apps/App.hpp | 10 ++++++++++ include/simdb/apps/AppManager.hpp | 10 +++++----- include/simdb/pipeline/AsyncDatabaseAccessor.hpp | 2 +- include/simdb/pipeline/Pipeline.hpp | 4 ++-- include/simdb/pipeline/PipelineManager.hpp | 12 +++++++----- include/simdb/pipeline/Queue.hpp | 1 + 6 files changed, 26 insertions(+), 13 deletions(-) diff --git a/include/simdb/apps/App.hpp b/include/simdb/apps/App.hpp index ba351a9d..a72b0388 100644 --- a/include/simdb/apps/App.hpp +++ b/include/simdb/apps/App.hpp @@ -63,23 +63,31 @@ class App { public: virtual ~App() = default; + /// \brief Set the instance number (1-based; 0 = single-instance). Called by AppManager. void setInstance(size_t instance) { instance_ = instance; } + /// \brief Get the instance number (0 if single-instance). size_t getInstance() const { return instance_; } + /// \brief Hook called after command-line parsing, before simulation starts. virtual void postInit([[maybe_unused]] int argc, [[maybe_unused]] char** argv) {} + /// \brief Hook to create and configure this app's pipeline(s) on the given manager. virtual void createPipeline(pipeline::PipelineManager*) {} + /// \brief Hook called before simulation teardown. virtual void preTeardown() {} + /// \brief Hook called after simulation teardown (resource cleanup). 
virtual void postTeardown() {} /// \brief Return the stdout logger (set by AppManager); may be null. ThreadSafeLogger* getStdoutLogger() const { return stdout_logger_; } + /// \brief Return the stderr logger (set by AppManager); may be null. ThreadSafeLogger* getStderrLogger() const { return stderr_logger_; } + /// \brief Return the file logger (set by AppManager); may be null. ThreadSafeLogger* getFileLogger() const { return file_logger_; } @@ -115,8 +123,10 @@ class AppFactoryBase { public: virtual ~AppFactoryBase() = default; + /// \brief Create an App instance for the given DatabaseManager. virtual App* createApp(DatabaseManager*) = 0; + /// \brief Define this app's schema (tables, columns) on the given Schema. virtual void defineSchema(Schema& schema) const = 0; }; diff --git a/include/simdb/apps/AppManager.hpp b/include/simdb/apps/AppManager.hpp index f8172964..a66cc514 100644 --- a/include/simdb/apps/AppManager.hpp +++ b/include/simdb/apps/AppManager.hpp @@ -741,11 +741,11 @@ template void AppRegistration::registerApp(AppManager* app class AppManagers { public: - /// \brief Register an app type; must be called before createAppManager() (call once per app type). - /// You need to call this method for all apps you might end up - /// creating **before** calling createAppManager(). As soon as - /// createAppManager() is called the first time, you can no - /// longer call registerApp(). + /// \brief Register an app type; must be called before createAppManager() + /// \note You need to call this method for all apps you might end up + /// creating **before** calling createAppManager(). As soon as + /// createAppManager() is called the first time, you can no + /// longer call registerApp(). 
template void registerApp() { if (app_registration_locked_) diff --git a/include/simdb/pipeline/AsyncDatabaseAccessor.hpp b/include/simdb/pipeline/AsyncDatabaseAccessor.hpp index aba9c2b1..6367e8d3 100644 --- a/include/simdb/pipeline/AsyncDatabaseAccessor.hpp +++ b/include/simdb/pipeline/AsyncDatabaseAccessor.hpp @@ -13,7 +13,7 @@ class DatabaseManager; namespace simdb::pipeline { -/// \brief Callable queued by any thread and invoked on the dedicated database thread (receives DatabaseManager*). +/// \brief Callable queued by any thread and invoked on the dedicated database thread. using AsyncDbAccessFunc = std::function; /*! diff --git a/include/simdb/pipeline/Pipeline.hpp b/include/simdb/pipeline/Pipeline.hpp index 426ecabc..f6f36594 100644 --- a/include/simdb/pipeline/Pipeline.hpp +++ b/include/simdb/pipeline/Pipeline.hpp @@ -157,8 +157,8 @@ class Pipeline state_ = State::FINALIZED; } - /// \brief Create a Flusher that runs the given stages (or all in order); wraps in transaction if any stage is a - /// DatabaseStage. + /// \brief Create a Flusher that runs the given stages (or all in order); wraps in transaction + /// if any stage is a DatabaseStage. /// \param stage_names Stage names in flush order; if empty, use add order. /// \return Flusher or FlusherWithTransaction; caller owns. /// \throws DBException if a stage name does not exist. diff --git a/include/simdb/pipeline/PipelineManager.hpp b/include/simdb/pipeline/PipelineManager.hpp index 66780170..2e1e73f3 100644 --- a/include/simdb/pipeline/PipelineManager.hpp +++ b/include/simdb/pipeline/PipelineManager.hpp @@ -160,10 +160,12 @@ class PipelineManager threads_opened_ = true; } - /// \brief Temporarily disable all pipeline runnables (and optionally pause threads); re-enabled when the returned - /// object is destroyed. - /// \param disable_threads_too If true, also pause polling threads; if false, only disable runnables. 
- /// \return A ScopedRunnableDisabler, or nullptr if a disabler is already active (nested calls are no-ops). + /// \brief Temporarily disable all pipeline runnables (and optionally pause threads); + /// re-enabled when the returned object is destroyed. + /// \param disable_threads_too If true, also pause polling threads; if false, only + /// disable runnables. + /// \return A ScopedRunnableDisabler, or nullptr if a disabler is already active (nested + /// calls are no-ops). std::unique_ptr scopedDisableAll(bool disable_threads_too = true) { if (disabler_active_) @@ -187,7 +189,7 @@ class PipelineManager return disabler; } - /// \brief Close all threads, flush runnables, and print performance reports; marks manager as closed. + /// \brief Close all threads, flush runnables, and print performance reports. void postSimLoopTeardown() { checkOpen_(); diff --git a/include/simdb/pipeline/Queue.hpp b/include/simdb/pipeline/Queue.hpp index 2dd6030b..ba437e49 100644 --- a/include/simdb/pipeline/Queue.hpp +++ b/include/simdb/pipeline/Queue.hpp @@ -37,6 +37,7 @@ template class Queue : public QueueBase public: /// \brief Mutable access to the underlying ConcurrentQueue. ConcurrentQueue& get() { return queue_; } + /// \brief Const access to the underlying ConcurrentQueue. const ConcurrentQueue& get() const { return queue_; }