Pulse is a lightweight and fast reactive C++ library.
It allows you to connect parts of your application with data streams (observable), transform them with operators, and easily propagate events between classes.
- `observable<T>`: data stream of any type
- `topic<T>`: event bus for publishing values
- Rich set of operators: `map`, `filter`, `debounce`, `distinct_until_changed`, `combine_latest`, `switch_map`, `take`, `zip`, `publish`, `ref_count`, `timeout`, `throttle`, `throttle_latest`, `buffer`, `window`, `merge`, `concat_map`, `observe_on`, and more
- Timers and intervals: `timer()`, `interval()`
- Subscription management (`subscription`)
- Hot and cold observables (`publish`, `ref_count`, `ref_count(grace)`)
- Simple executors (`inline_executor`, `thread_pool`)
- C++ standard: C++20
- CMake: ≥ 3.21
- Compilers (tested / recommended):
  - GCC ≥ 12
  - Clang ≥ 14 / AppleClang ≥ 14
  - MSVC ≥ 19.36 (Visual Studio 2022 17.6)
Pulse is header-only. No linking required, only include paths.
Optional dependencies:
- Qt: only if `-DPULSE_WITH_QT=ON` (for `adapters/qt.hpp`)
- CTest: only if `-DPULSE_BUILD_TESTS=ON`
- Google Benchmark: only if `-DPULSE_BUILD_BENCHMARKS=ON`
```bash
git clone https://github.com/danrom11/Pulse.git
cd Pulse
cmake -S . -B build
cmake --build build -j
cmake --install build --prefix /your/install/path
```

You can customize the build with the following CMake options:

| Option | Default | Description |
|---|---|---|
| `PULSE_WITH_QT` | `OFF` | Enable Qt adapters (`adapters/qt.hpp`). Requires Qt. |
| `PULSE_BUILD_TESTS` | `ON` | Build unit tests. |
| `PULSE_BUILD_EXAMPLES` | `ON` | Build example programs. |
| `PULSE_BUILD_BENCHMARKS` | `OFF` | Build benchmarks (requires Google Benchmark). |
| `PULSE_TRACE` | `OFF` | Enable tracing hooks (experimental, not yet implemented). |

Install Pulse, then consume it from your CMake project:

```bash
cmake --install build --prefix /your/install/path
```

```cmake
find_package(Pulse REQUIRED)
target_link_libraries(your_target PRIVATE Pulse::pulse)
```

Alternatively, fetch it at configure time with FetchContent:

```cmake
include(FetchContent)
FetchContent_Declare(
  pulse
  GIT_REPOSITORY https://github.com/danrom11/Pulse.git
  GIT_TAG main
)
FetchContent_MakeAvailable(pulse)
target_link_libraries(your_target PRIVATE Pulse::pulse)
```

```cpp
#include <pulse/pulse.hpp>
#include <iostream>
using namespace pulse;

int main() {
    inline_executor ui;
    topic<int> numbers;
    auto obs = as_observable(numbers, ui);

    // Keep only even values and print them.
    auto sub = (obs | filter([](int x){ return x % 2 == 0; }))
        .subscribe([](int x){ std::cout << "even: " << x << "\n"; });

    numbers.publish(1);
    numbers.publish(2);
    numbers.publish(3);
    // => even: 2
}
```

```cpp
#include <pulse/pulse.hpp>
#include <iostream>
#include <string>
using namespace pulse;

// Publishes messages on an internal topic and exposes them as a stream.
class Producer {
public:
    void send(std::string msg) { bus_.publish(msg); }
    observable<std::string> stream(executor& ui) { return as_observable(bus_, ui); }
private:
    topic<std::string> bus_;
};

// Subscribes on construction; the subscription lives as long as the Consumer.
class Consumer {
public:
    Consumer(observable<std::string> input) {
        sub_ = input.subscribe([](const std::string& s){
            std::cout << "[Consumer] got: " << s << "\n";
        });
    }
private:
    subscription sub_;
};

int main() {
    inline_executor ui;
    Producer p;
    Consumer c(p.stream(ui));
    p.send("hello");
    p.send("world");
}
```

Output:

```
[Consumer] got: hello
[Consumer] got: world
```

```cpp
// SearchBox and SearchService are defined elsewhere (not shown here).
SearchBox box;
SearchService service(box.stream(ui), ui, io);
box.type("que");
box.type("query");
// => [SearchService] [result] query
```

By default, an observable is "cold": each subscriber restarts it.
To share an upstream source:

```cpp
auto cold = interval(100ms, io);               // the 100ms literal needs std::chrono_literals
auto shared = ref_count(publish(cold), 250ms); // 250ms grace period
auto a = shared.subscribe([](auto v){ std::cout << "[A] " << v << "\n"; });
auto b = shared.subscribe([](auto v){ std::cout << "[B] " << v << "\n"; });
```

- `map(f)`: transformation
- `filter(f)`: filtering
- `debounce(ms, exec)`: debounce (suppress intermediate events)
- `distinct_until_changed()`: only propagate changes
- `combine_latest(a, b, f)`: combine streams
- `switch_map(f)`: switch to a new stream
- `take(n)`: first N values
- `zip(a,b)`: pairwise merge
- `timeout(ms, exec)`: fail if no event within time
- `throttle(ms, exec)`: emit first value per window
- `throttle_latest(ms, exec)`: emit first + last value per window
- `buffer(n)`: group N values into vectors
- `window(n)`: sliding windows as nested observables
- `merge(a,b)`: merge multiple streams
- `concat_map(f)`: sequential map/flatten
- `observe_on(exec)`: deliver on specified executor
- `interval(period, exec, delay)`: periodic events
- `timer(delay, exec)`: one-shot event

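To make two of the operator descriptions above more concrete, here is a small library-independent sketch that models `distinct_until_changed()` and `buffer(n)` on plain `std::vector` input. It is not Pulse's implementation and does not use the Pulse API; the function names `distinct_until_changed_model` and `buffer_model` are hypothetical, and emitting a trailing partial group from `buffer_model` is an assumption that Pulse may or may not share.

```cpp
#include <cstddef>
#include <vector>

// Model of distinct_until_changed(): drop a value when it equals the
// value immediately before it. Hypothetical helper, not part of Pulse.
template <class T>
std::vector<T> distinct_until_changed_model(const std::vector<T>& in) {
    std::vector<T> out;
    for (const T& v : in) {
        if (out.empty() || !(out.back() == v)) out.push_back(v);
    }
    return out;
}

// Model of buffer(n): group consecutive values into chunks of n.
// ASSUMPTION: a trailing partial chunk is emitted when the input ends;
// the real operator may behave differently.
template <class T>
std::vector<std::vector<T>> buffer_model(const std::vector<T>& in, std::size_t n) {
    std::vector<std::vector<T>> out;
    if (n == 0) return out;  // no sensible grouping for n == 0
    std::vector<T> chunk;
    for (const T& v : in) {
        chunk.push_back(v);
        if (chunk.size() == n) {
            out.push_back(chunk);
            chunk.clear();
        }
    }
    if (!chunk.empty()) out.push_back(chunk);
    return out;
}
```

Under these assumptions, the input `1, 1, 2, 2, 2, 3, 1` passes through the first model as `1, 2, 3, 1`, and `buffer_model` with `n = 2` turns `1, 2, 3, 4, 5` into the groups `{1, 2}`, `{3, 4}`, `{5}`.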
Every call to `subscribe` returns a `subscription` object.
When it is destroyed or reset, events stop flowing:

```cpp
auto sub = obs.subscribe(...);
sub.reset(); // unsubscribe
```

- `observable`: stream declaration
- `subscription`: subscription management
- `topic`: event bus
- `executor` / `thread_pool`: execution context
- `publish` / `ref_count`: hot sharing
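
The lifetime rule for subscriptions (events stop when the handle is destroyed or reset) can be illustrated with a minimal standard-library sketch. This is only a model of the idea, not how Pulse implements `subscription` or `topic`: `mini_bus` and `subscription_handle` are hypothetical names, and the sketch is single-threaded by assumption.

```cpp
#include <functional>
#include <map>
#include <memory>
#include <utility>

// Hypothetical stand-in for an event bus; NOT pulse::topic. Not thread-safe.
template <class T>
class mini_bus {
public:
    using handler = std::function<void(const T&)>;
    using slot_map = std::map<int, handler>;

    // Move-only RAII handle: removes its handler on reset() or destruction.
    class subscription_handle {
    public:
        subscription_handle() = default;
        subscription_handle(std::weak_ptr<slot_map> slots, int id)
            : slots_(std::move(slots)), id_(id) {}
        subscription_handle(subscription_handle&& o) noexcept
            : slots_(std::move(o.slots_)), id_(o.id_) { o.slots_.reset(); }
        subscription_handle& operator=(subscription_handle&& o) noexcept {
            if (this != &o) {
                reset();  // drop the handler we currently own, if any
                slots_ = std::move(o.slots_);
                id_ = o.id_;
                o.slots_.reset();
            }
            return *this;
        }
        subscription_handle(const subscription_handle&) = delete;
        subscription_handle& operator=(const subscription_handle&) = delete;
        ~subscription_handle() { reset(); }

        // Unsubscribe; safe to call repeatedly or after the bus is gone.
        void reset() {
            if (auto s = slots_.lock()) s->erase(id_);
            slots_.reset();
        }

    private:
        std::weak_ptr<slot_map> slots_;
        int id_ = 0;
    };

    subscription_handle subscribe(handler h) {
        const int id = next_id_++;
        (*slots_)[id] = std::move(h);
        return subscription_handle(slots_, id);
    }

    void publish(const T& v) {
        slot_map snapshot = *slots_;  // copy so handlers may unsubscribe during delivery
        for (auto& entry : snapshot) entry.second(v);
    }

private:
    std::shared_ptr<slot_map> slots_ = std::make_shared<slot_map>();
    int next_id_ = 0;
};
```

In this model the handle keeps only a weak reference to the handler table, so a handle that outlives its bus does nothing on destruction. Storing such a handle as a class member, as `Consumer` does with `sub_`, ties delivery to the owning object's lifetime.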
The project includes unit tests (CTest + assert).
```bash
cmake -S . -B build -DPULSE_BUILD_TESTS=ON
cmake --build build -j
cd build && ctest --output-on-failure
```

The tests cover correctness of operators (map, filter, window, buffer, merge, take, timeout, …), unsubscription, and error propagation.

- Minimal overhead: only lambda captures and a few `shared_ptr`s.
- No extra allocations in hot paths (operators are inline-friendly).
- Multithreading supported via executors.
- Comparable to or faster than RxCpp in common cases.

Usage tips:
- Use `auto` with operators to avoid verbose types.
- Manage `subscription` lifetimes with RAII.
- For async scenarios, use `thread_pool` or your own executor.
- Use `merge` and `combine_latest` for multi-stream composition.

| | Pulse | RxCpp |
|---|---|---|
| API complexity | Minimal, focused | Full Rx standard |
| Language req. | C++20 | C++11 |
| Executors | Built-in | None (external) |
| Performance | Low overhead | Sometimes heavy |
| Code size | ~10 files | >100 files |

| | Pulse | `std::execution` policies (C++17 and later) |
|---|---|---|
| Paradigm | Reactive streams (push model) | Bulk execution, parallel loops (pull model) |
| Data | Events over time (`observable<T>`) | Containers / ranges |
| Operators | Reactive ops (`map`, `filter`, `zip`, …) | Parallel policies (`par`, `par_unseq`) |
| Asynchrony | Built-in executors (`thread_pool`, …) | Delegated to the implementation |
| Goal | Event composition + reactive pipelines | Efficient parallel algorithms |

The library provides `pulse/version.hpp`:

```cpp
#include <pulse/version.hpp>
#include <iostream>
int main() {
    std::cout << "Pulse version: " << pulse::version::string << "\n";
}
```

You can also check the version macros:

```cpp
#if PULSE_VERSION_CODE >= 0x00020000
// code for version >= 2.0.0
#endif
```

We measured operator performance using Google Benchmark.
Tests run in Release on macOS (AppleClang 15, 8 cores).
| Operator | Size | Time (ns) | Per element (ns) | Events/sec (approx) | Notes |
|---|---|---|---|---|---|
| `filter` | 1000 | ~102 000 | ~100 | ~10 M/sec | Simple filter pass |
| `map_chain` (3×) | 1000 | ~170 000 | ~170 | ~6 M/sec | Sequential 3 maps |
| `throttle_latest` | 1000 | ~222 000 | ~222 | ~4.5 M/sec | Timer + latest logic |
| `thread_pool(filter)` | 1000 | ~216 000 | ~216 | ~4.6 M/sec | Cross-thread overhead |

✨ Takeaways:
- All operators scale linearly (O(n)) with input size.
- `inline_executor` achieves ~100 ns per element.
- `thread_pool` and `throttle_latest` are ~2× heavier but provide async and backpressure control.
- Even with heavy operators, Pulse processes millions of events per second.

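
The per-element and events-per-second columns follow from the total time by simple arithmetic. As a sketch of that conversion (the helper names `ns_per_element` and `events_per_second` are hypothetical and exist only for this illustration):

```cpp
#include <cstddef>

// Average cost of one element, in nanoseconds.
constexpr double ns_per_element(double total_ns, std::size_t elements) {
    return total_ns / static_cast<double>(elements);
}

// Throughput implied by a per-element cost: 1 second = 1e9 nanoseconds.
constexpr double events_per_second(double ns_per_elem) {
    return 1e9 / ns_per_elem;
}
```

For the `filter` row, 102 000 ns over 1000 elements is 102 ns per element, which corresponds to roughly 9.8 million events per second, consistent with the rounded ~10 M/sec in the table.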
Pull requests are welcome!
1. Fork the repo
2. Create a branch (git checkout -b feature/awesome-thing)
3. Commit changes
4. Run tests (ctest)
5. Open a PR
- Support for custom executors (asio, libuv)
- Support for the stdexec.hpp adapter
- Tracing hooks
- Additional operators (`group_by`, `replay`, and others)
- Doxygen-style documentation
- More examples of integration with the Qt UI framework
Apache-2.0 License. Free to use and embed in your projects.