-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsubscriber.cpp
More file actions
114 lines (97 loc) · 3.44 KB
/
subscriber.cpp
File metadata and controls
114 lines (97 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Minimal Subscriber using Eclipse S-Core MW / LoLA IPC
// Finds a service, subscribes to events, and prints motor angle samples.
#include "datatype.h"
#include "score/mw/com/runtime.h"
#include "score/mw/com/types.h"
#include <chrono>
#include <csignal>
#include <cstdlib>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;
static volatile bool g_running = true;
static void signal_handler(int /*sig*/)
{
g_running = false;
}
int main(int argc, const char** argv)
{
std::signal(SIGINT, signal_handler);
std::signal(SIGTERM, signal_handler);
// Initialize LoLA runtime with the service instance manifest
if (argc > 1)
{
const char* runtime_args[2u] = {"--service_instance_manifest", argv[1]};
score::mw::com::runtime::InitializeRuntime(2, runtime_args);
}
else
{
std::cerr << "Usage: subscriber <path/to/mw_com_config.json>\n";
return EXIT_FAILURE;
}
const auto instance_specifier_result =
score::mw::com::InstanceSpecifier::Create(std::string{"score/examples/MotorAngle"});
if (!instance_specifier_result.has_value())
{
std::cerr << "Invalid instance specifier\n";
return EXIT_FAILURE;
}
const auto& instance_specifier = instance_specifier_result.value();
// Find the service (wait until available)
std::cout << "[Subscriber] Looking for service...\n";
score::mw::com::ServiceHandleContainer<score::mw::com::HandleType> handles;
do
{
auto result = score::mw::com::MotorAngleProxy::FindService(instance_specifier);
if (result.has_value())
{
handles = std::move(result).value();
}
if (handles.empty())
{
std::this_thread::sleep_for(500ms);
}
} while (handles.empty() && g_running);
if (handles.empty())
{
return EXIT_SUCCESS;
}
std::cout << "[Subscriber] Service found. Connecting...\n";
auto proxy_result = score::mw::com::MotorAngleProxy::Create(std::move(handles.front()));
if (!proxy_result.has_value())
{
std::cerr << "[Subscriber] Failed to create proxy: " << proxy_result.error() << "\n";
return EXIT_FAILURE;
}
auto& proxy = proxy_result.value();
// Subscribe to the event
constexpr std::size_t kMaxSamples = 5U;
if (!proxy.motor_angle_.Subscribe(kMaxSamples).has_value())
{
std::cerr << "[Subscriber] Failed to subscribe\n";
return EXIT_FAILURE;
}
// Event-driven callback: triggered by middleware when new samples arrive
proxy.motor_angle_.SetReceiveHandler([&proxy]() noexcept {
constexpr std::size_t kBatch = 10U;
const auto result = proxy.motor_angle_.GetNewSamples(
[](score::mw::com::SamplePtr<score::mw::com::MotorAngle> sample) noexcept {
std::cout << "[Subscriber] Received motor angle [deg]: " << sample->angle_deg << "\n";
},
kBatch);
if (!result.has_value())
{
std::cerr << "[Subscriber] GetNewSamples failed: " << result.error() << "\n";
}
});
std::cout << "[Subscriber] Subscribed. Waiting for events...\n";
// Keep process alive; no polling of GetNewSamples in a timed loop
while (g_running)
{
std::this_thread::sleep_for(1s);
}
proxy.motor_angle_.UnsetReceiveHandler();
proxy.motor_angle_.Unsubscribe();
std::cout << "[Subscriber] Unsubscribed. Bye!\n";
return EXIT_SUCCESS;
}