Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions examples/ConcurrentApps/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ int main()
auto& app_mgr = app_mgrs.createAppManager("test.db");
auto& db_mgr = app_mgrs.getDatabaseManager();

// Disable pipeline messages; it clutters up stdout with so many running apps
app_mgr.disableMessageLog();
app_mgr.disableErrorLog();

// Create 4 instances of the SimplePipeline app
app_mgr.enableApp(SimplePipeline::NAME, 4);

Expand Down
3 changes: 0 additions & 3 deletions examples/DatabaseWatchdog/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,6 @@ int main()
app_mgrs.registerApp<DatabaseWatchdog>();
auto& app_mgr = app_mgrs.createAppManager("test.db");

app_mgr.disableMessageLog();
app_mgr.disableErrorLog();

// Setup...
app_mgr.enableApp(WatchedPipeline::NAME);
app_mgr.enableApp(DatabaseWatchdog::NAME);
Expand Down
24 changes: 19 additions & 5 deletions include/simdb/apps/App.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class AsyncDatabaseAccessor;
class PipelineManager;
} // namespace pipeline

class AppManager;
class ThreadSafeLogger;

/// Base class for SimDB applications. Note that app subclasses are given
/// the DatabaseManager instance as a constructor argument, so they can
/// access the database and perform operations like appending schemas,
Expand All @@ -57,19 +60,30 @@ class App
virtual ~App() = default;
void setInstance(size_t instance) { instance_ = instance; }
size_t getInstance() const { return instance_; }
virtual void postInit(int argc, char** argv)
{
(void)argc;
(void)argv;
}
virtual void postInit([[maybe_unused]] int argc, [[maybe_unused]] char** argv) {}
virtual void createPipeline(pipeline::PipelineManager*) {}
virtual void preTeardown() {}
virtual void postTeardown() {}

ThreadSafeLogger* getStdoutLogger() const { return stdout_logger_; }
ThreadSafeLogger* getStderrLogger() const { return stderr_logger_; }
ThreadSafeLogger* getFileLogger() const { return file_logger_; }

protected:
void setStdoutLogger_(ThreadSafeLogger* logger) { stdout_logger_ = logger; }
void setStderrLogger_(ThreadSafeLogger* logger) { stderr_logger_ = logger; }
void setFileLogger_(ThreadSafeLogger* logger) { file_logger_ = logger; }

private:
/// Instance number for multi-instance apps (1-based).
/// If zero, then this is a single-instance app.
size_t instance_ = 0;

/// Thread-safe loggers.
ThreadSafeLogger* stdout_logger_ = nullptr;
ThreadSafeLogger* stderr_logger_ = nullptr;
ThreadSafeLogger* file_logger_ = nullptr;
friend class AppManager;
};

class AppFactoryBase
Expand Down
194 changes: 84 additions & 110 deletions include/simdb/apps/AppManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
#include "simdb/apps/App.hpp"
#include "simdb/pipeline/PipelineManager.hpp"
#include "simdb/sqlite/DatabaseManager.hpp"
#include "simdb/utils/ThreadSafeLogger.hpp"

#include <filesystem>
#include <iostream>
#include <map>
#include <set>

#define PROFILE_APP_PHASE [[maybe_unused]] ScopedTimer timer(getDatabaseManager(), __FUNCTION__, msg_log_);
#define PROFILE_APP_PHASE [[maybe_unused]] ScopedTimer timer(getDatabaseManager(), __FUNCTION__);

namespace simdb {

Expand Down Expand Up @@ -173,18 +174,6 @@ class AppManager
}
}

/// Disable all logged messages.
void disableMessageLog() { msg_log_.disable(); }

/// Disable all logged errors.
void disableErrorLog() { err_log_.disable(); }

/// Redirect messages (defaults to stdout)
void redirectMessageLog(std::ostream* msg_log) { msg_log_.redirectMessages(msg_log); }

/// Redirect errors (defaults to stderr)
void redirectErrorsLog(std::ostream* err_log) { err_log_.redirectErrors(err_log); }

/// After parsing command line arguments or configuration files,
/// enable an app by its name. This will allow the app to be instantiated
/// and run during the simulation lifecycle.
Expand Down Expand Up @@ -331,10 +320,12 @@ class AppManager

private:
/// AppManagers are associated 1-to-1 with a DatabaseManager.
AppManager(DatabaseManager* db_mgr) :
AppManager(DatabaseManager* db_mgr, ThreadSafeLogger* stdout_logger = nullptr,
ThreadSafeLogger* stderr_logger = nullptr, ThreadSafeLogger* file_logger = nullptr) :
db_mgr_(db_mgr),
msg_log_(&std::cout),
err_log_(&std::cerr)
stdout_logger_(stdout_logger),
stderr_logger_(stderr_logger),
file_logger_(file_logger)
{
}

Expand Down Expand Up @@ -397,9 +388,13 @@ class AppManager
}

App* app = factory->createApp(db_mgr_);
app->setInstance(instance_num);
app->instance_ = instance_num;
std::string instance_name = app_name + std::string("-") + std::to_string(instance_num);
apps_[instance_name] = std::unique_ptr<App>(app);

app->stdout_logger_ = stdout_logger_;
app->stderr_logger_ = stderr_logger_;
app->file_logger_ = file_logger_;
}
}
}
Expand Down Expand Up @@ -459,17 +454,17 @@ class AppManager
}

// Print final pipeline configurations.
msg_log_ << "\nSimDB app pipeline configuration for database '" << db_mgr_->getDatabaseFilePath() << "':\n";
std::cout << "\nSimDB app pipeline configuration for database '" << db_mgr_->getDatabaseFilePath() << "':\n";
for (auto pipeline : pipeline_mgr_->getPipelines())
{
msg_log_ << "---- Pipeline: " << pipeline->getName() << "\n";
std::cout << "---- Pipeline: " << pipeline->getName() << "\n";
for (auto& [stage_name, stage] : pipeline->getOrderedStages())
{
msg_log_ << "------ Stage: " << stage_name << "\n";
std::cout << "------ Stage: " << stage_name << "\n";
}
}

msg_log_ << std::endl;
std::cout << std::endl;
}

/// Call this once after initializePipelines() (and after minimizeThreads()
Expand Down Expand Up @@ -500,7 +495,7 @@ class AppManager

if (pipeline_mgr_)
{
pipeline_mgr_->postSimLoopTeardown(msg_log_);
pipeline_mgr_->postSimLoopTeardown();
}

db_mgr_->safeTransaction([&]() {
Expand Down Expand Up @@ -664,123 +659,55 @@ class AppManager
/// All pipelines and threads are managed by PipelineManager.
std::unique_ptr<pipeline::PipelineManager> pipeline_mgr_;

/// Stdout logger (thread-safe). Owned by AppManagers.
ThreadSafeLogger* stdout_logger_ = nullptr;

/// Stderr logger (thread-safe). Owned by AppManagers.
ThreadSafeLogger* stderr_logger_ = nullptr;

/// File logger (thread-safe). Owned by AppManagers.
ThreadSafeLogger* file_logger_ = nullptr;

/// RAII timer to measure the performance of various app setup/teardown
/// phases.
class ScopedTimer
{
public:
ScopedTimer(const DatabaseManager* db_mgr, const std::string& block_name, std::ostream* msg_out = &std::cout) :
ScopedTimer(const DatabaseManager* db_mgr, const std::string& block_name) :
start_(std::chrono::high_resolution_clock::now()),
block_name_(block_name),
msg_out_(msg_out)
block_name_(block_name)
{
if (msg_out_)
{
auto db_filepath = db_mgr->getDatabaseFilePath();
*msg_out_ << "SimDB: Entering " << block_name << " for database: " << db_filepath << "\n";
}
auto db_filepath = db_mgr->getDatabaseFilePath();
std::cout << "SimDB: Entering " << block_name << " for database: " << db_filepath << "\n";
}

~ScopedTimer()
{
if (!msg_out_)
{
return;
}

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> dur = end - start_;
auto us = std::chrono::duration_cast<std::chrono::microseconds>(dur).count();
if (us >= 1000000)
{
auto sec = (double)us / 1000000;
*msg_out_ << "SimDB: Completed " << block_name_ << " in ";
*msg_out_ << std::fixed << std::setprecision(2) << sec << " seconds.\n";
std::cout << "SimDB: Completed " << block_name_ << " in ";
std::cout << std::fixed << std::setprecision(2) << sec << " seconds.\n";
} else if (us >= 1000)
{
auto milli = (double)us / 1000;
*msg_out_ << "SimDB: Completed " << block_name_ << " in ";
*msg_out_ << std::fixed << std::setprecision(0) << milli << " milliseconds.\n";
std::cout << "SimDB: Completed " << block_name_ << " in ";
std::cout << std::fixed << std::setprecision(0) << milli << " milliseconds.\n";
} else
{
auto micro = (double)us;
*msg_out_ << "SimDB: Completed " << block_name_ << " in ";
*msg_out_ << std::fixed << std::setprecision(0) << micro << " microseconds.\n";
std::cout << "SimDB: Completed " << block_name_ << " in ";
std::cout << std::fixed << std::setprecision(0) << micro << " microseconds.\n";
}
}

private:
std::chrono::high_resolution_clock::time_point start_;
std::string block_name_;
std::ostream* msg_out_ = nullptr;
};

/// Simple wrapper around std::ostream* for conditional logging
class Logger
{
public:
Logger(std::ostream* out) :
out_(out)
{
}

template <typename T> Logger& operator<<(const T& msg)
{
if (out_ && enabled_)
{
*out_ << msg;
out_->flush();
}
return *this;
}

Logger& operator<<(const char* msg)
{
if (out_ && enabled_)
{
*out_ << msg;
out_->flush();
}
return *this;
}

Logger& operator<<(std::ostream& (*manip)(std::ostream&))
{
if (out_ && enabled_)
{
manip(*out_);
out_->flush();
}
return *this;
}

operator std::ostream*() { return enabled_ ? out_ : nullptr; }

void disable() { enabled_ = false; }

void enable() { enabled_ = true; }

void enable(bool enabled) { enabled_ = enabled; }

void redirectMessages(std::ostream* msg_log)
{
out_ = msg_log;
enable(out_ != nullptr);
}

void redirectErrors(std::ostream* err_log)
{
out_ = err_log;
enable(out_ != nullptr);
}

private:
std::ostream* out_ = nullptr;
bool enabled_ = true;
};

Logger msg_log_;
Logger err_log_;
};

template <typename AppT> void AppRegistration<AppT>::registerApp(AppManager* app_manager) const
Expand All @@ -806,6 +733,45 @@ class AppManagers
app_registrations_.emplace_back(new AppRegistration<AppT>());
}

/// If you want all apps to share a global thread-safe stdout logger,
/// call this method before createAppManager().
///
/// Pass in prefix=true to see "[log]" before each line written to the logger's output.
void useThreadSafeStdoutLogger(bool prefix = false)
{
if (!accepting_logger_requests_)
{
throw DBException("No longer accepting thread-safe logger requests");
}
stdout_logger_ = std::make_unique<ThreadSafeLogger>(std::cout, prefix);
}

/// If you want all apps to share a global thread-safe stderr logger,
/// call this method before createAppManager().
///
/// Pass in prefix=true to see "[log]" before each line written to the logger's output.
void useThreadSafeStderrLogger(bool prefix = false)
{
if (!accepting_logger_requests_)
{
throw DBException("No longer accepting thread-safe logger requests");
}
stderr_logger_ = std::make_unique<ThreadSafeLogger>(std::cerr, prefix);
}

/// If you want all apps to share a global thread-safe file logger,
/// call this method before createAppManager().
///
/// Pass in prefix=true to see "[log]" before each line written to the logger's output.
void useThreadSafeFileLogger(const std::string& filename, bool prefix = false)
{
if (!accepting_logger_requests_)
{
throw DBException("No longer accepting thread-safe logger requests");
}
file_logger_ = std::make_unique<ThreadSafeLogger>(filename, prefix);
}

/// Create a new AppManager with a new database.
///
/// Pass in new_db=true to overwrite existing database, or new_db=false to use
Expand All @@ -815,6 +781,8 @@ class AppManagers
/// Throws if an AppManager for this database already exists.
AppManager& createAppManager(const std::string& db_file, bool new_db = true)
{
accepting_logger_requests_ = false;

if (app_mgrs_by_db_file_.find(db_file) != app_mgrs_by_db_file_.end())
{
throw DBException("AppManager already exists for database: ") << db_file;
Expand All @@ -826,7 +794,8 @@ class AppManagers
}

std::shared_ptr<DatabaseManager> db_mgr(new DatabaseManager(db_file, new_db));
std::shared_ptr<AppManager> app_mgr(new AppManager(db_mgr.get()));
std::shared_ptr<AppManager> app_mgr(
new AppManager(db_mgr.get(), stdout_logger_.get(), stderr_logger_.get(), file_logger_.get()));

db_mgrs_by_db_file_[db_file] = db_mgr;
app_mgrs_by_db_file_[db_file] = app_mgr;
Expand Down Expand Up @@ -1004,6 +973,11 @@ class AppManagers
std::map<std::string, std::shared_ptr<AppManager>> app_mgrs_by_db_file_;
std::map<DatabaseManager*, std::shared_ptr<AppManager>> app_mgrs_by_db_mgr_;
std::map<AppManager*, std::shared_ptr<DatabaseManager>> db_mgrs_by_app_mgr_;

std::unique_ptr<ThreadSafeLogger> stdout_logger_;
std::unique_ptr<ThreadSafeLogger> stderr_logger_;
std::unique_ptr<ThreadSafeLogger> file_logger_;
bool accepting_logger_requests_ = true;
};

/// Helper class to only expose AppManagers::registerApp() api.
Expand Down
Loading