33
44#include " simple_socket/TCPSocket.hpp"
55#include " simple_socket/mqtt/mqtt_common.hpp"
6+ #include " simple_socket/ws/WebSocket.hpp"
67
78#include < iostream>
89#include < mutex>
910#include < thread>
1011#include < unordered_map>
1112#include < unordered_set>
1213#include < vector>
14+ #include < queue>
1315
1416
1517using namespace simple_socket ;
@@ -18,16 +20,19 @@ using namespace simple_socket;
1820struct MQTTBroker ::Impl {
1921
2022 explicit Impl (int port)
21- : server_(port), stop_(false ) {}
23+ : server_(port), ws_(port + 1 ), stop_(false ) {}
2224
2325 void start () {
2426 listener_ = std::thread ([this ] { acceptLoop (); });
27+ wsListener_ = std::thread ([this ] { wsAcceptLoop (); });
2528 }
2629
2730 void stop () {
2831 stop_ = true ;
2932 server_.close ();
33+ ws_.stop ();
3034 if (listener_.joinable ()) listener_.join ();
35+ if (wsListener_.joinable ()) wsListener_.join ();
3136 }
3237
3338private:
@@ -38,8 +43,10 @@ struct MQTTBroker::Impl {
3843 };
3944
4045 TCPServer server_;
46+ WebSocket ws_;
4147 std::atomic<bool > stop_;
4248 std::thread listener_;
49+ std::thread wsListener_;
4350
4451 std::mutex subsMutex_;
4552 std::unordered_map<std::string, std::vector<Client*>> subscribers_;
@@ -64,6 +71,88 @@ struct MQTTBroker::Impl {
6471 }
6572 }
6673
74+ void wsAcceptLoop () {
75+
76+ struct WsWrapper : SimpleConnection {
77+
78+ WebSocketConnection* connection;
79+
80+ explicit WsWrapper (WebSocketConnection* c): connection(c) {}
81+
82+ int read (uint8_t * buffer, size_t size) override {
83+ std::unique_lock lock (m_);
84+ cv_.wait (lock, [&]{ return closed_ || !queue_.empty (); });
85+ if (queue_.empty ()) return 0 ; // closed and no data
86+
87+ std::string msg = std::move (queue_.front ());
88+ queue_.pop_front ();
89+
90+ size_t toCopy = std::min (size, msg.size ());
91+ std::memcpy (buffer, msg.data (), toCopy);
92+
93+ if (toCopy < msg.size ()) {
94+ // put the remainder back to the front so next read continues it
95+ queue_.push_front (msg.substr (toCopy));
96+ }
97+
98+ return static_cast <int >(toCopy);
99+ }
100+
101+ bool write (const uint8_t * data, size_t size) override {
102+ return connection->send (data, size);
103+ }
104+
105+ void close () override {
106+ {
107+ std::lock_guard lock (m_);
108+ closed_ = true ;
109+ }
110+ cv_.notify_all ();
111+ }
112+
113+ void push_back (const std::string& msg) {
114+ {
115+ std::lock_guard lock (m_);
116+ queue_.push_back (msg);
117+ }
118+ cv_.notify_one ();
119+ }
120+
121+ private:
122+ bool closed_{false };
123+ std::deque<std::string> queue_;
124+ std::mutex m_;
125+ std::condition_variable cv_;
126+ };
127+
128+ std::unordered_map<WebSocketConnection*, WsWrapper*> connections;
129+
130+ ws_.onOpen = [this , &connections](WebSocketConnection* conn) {
131+ std::cout << " MQTTBroker: new WebSocket connection" << std::endl;
132+ auto client = std::make_unique<Client>();
133+ Client* clientPtr = client.get ();
134+ clients_.push_back (clientPtr);
135+ auto wrapper = std::make_unique<WsWrapper>(conn);
136+ connections[conn] = wrapper.get ();
137+ client->conn = std::move (wrapper);
138+
139+ std::thread (&Impl::handleClient, this , std::move (client)).detach ();
140+ };
141+ ws_.onMessage = [&connections](WebSocketConnection* conn, const std::string& msg) {
142+ std::cout << " MQTTBroker: new message on WebSocket connection: " << msg << std::endl;
143+ connections[conn]->push_back (msg);
144+ };
145+ ws_.onClose = [](WebSocketConnection* conn) {
146+ std::cout << " MQTTBroker: WebSocket connection closed" << std::endl;
147+ };
148+ ws_.start ();
149+
150+ while (!stop_) {
151+ std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
152+ }
153+ ws_.stop ();
154+ }
155+
67156 void handleClient (std::unique_ptr<Client> c) {
68157 try {
69158 // CONNECT
0 commit comments