-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSocketPool.h
More file actions
86 lines (66 loc) · 1.89 KB
/
SocketPool.h
File metadata and controls
86 lines (66 loc) · 1.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#ifndef CHANNELING_SOCKETPOOL_H_
#define CHANNELING_SOCKETPOOL_H_
namespace channeling {
namespace detail {
template <size_t size>
class socket_pool_instance_t
: public std::enable_shared_from_this<socket_pool_instance_t<size>> {
public:
class socket_handle {
friend class socket_pool_instance_t;
std::shared_ptr<socket_pool_instance_t> pool;
int socknum;
socket_handle(std::shared_ptr<socket_pool_instance_t>&& pool, int socknum)
: pool{std::move(pool)}, socknum{socknum} {}
public:
~socket_handle() {
pool->put(socknum);
}
inline zmq::socket_t& socket() { return pool->sockets[socknum]; }
};
socket_pool_instance_t(std::string_view address) {
for (size_t i = 0; i < size; ++i) {
sockets[i] = zmq::socket_t(client_ctx, ZMQ_REQ);
sockets[i].connect(address.data());
free_sockets.push_back(i);
}
}
socket_handle get() {
std::unique_lock lk(mtx);
cv.wait(lk, [&]{return not free_sockets.empty();});
int socknum = std::move(free_sockets.front());
free_sockets.pop_front();
lk.unlock();
cv.notify_one();
return socket_handle{this->shared_from_this(), socknum};
}
private:
void put(int socknum) {
std::unique_lock lk(mtx);
free_sockets.push_back(socknum);
lk.unlock();
cv.notify_one();
}
private:
std::array<zmq::socket_t, size> sockets;
std::deque<int> free_sockets;
std::mutex mtx;
std::condition_variable cv;
friend class socket_handle;
};
}
template <size_t size>
class SocketPool {
std::shared_ptr<detail::socket_pool_instance_t<size>> pool;
public:
using SocketHandle =
typename detail::socket_pool_instance_t<size>::socket_handle;
SocketPool(std::string_view address)
: pool{std::make_shared<detail::socket_pool_instance_t<size>>(address)}
{}
inline SocketHandle get() {
return pool->get();
}
};
}
#endif // CHANNELING_SOCKETPOOL_H_