zlm_socket_deadlock_fix#291
…ithin an event callback.
Pull request overview
Addresses ZLMediaKit issue #4732, in which calling Socket::send() from within a user-provided event callback (e.g. on_read, on_err) can deadlock because the callback already holds _mtx_event while send_l paths re-enter the same mutex / poller logic. The fix introduces a reentrancy counter that detects "we are currently inside an event callback" and, in that case, defers the send onto the poller via _poller->async.
Changes:
- Add a `std::atomic<int> _event_reentrancy` member to `Socket` and an RAII `EventReentrancyGuard` that bumps it for the duration of an event callback.
- Wrap the user-callback invocations in `Socket::onRead` and `Socket::emitErr` with the guard, and in `Socket::send_l` reroute the send through `_poller->async` whenever the counter is non-zero.
- Add VS "Open Folder" build output and `CMakeSettings.json` to `.gitignore`.
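
The counter-plus-deferral idea in the change list can be sketched in isolation. This is a toy model, not the ZLToolKit code: `FakePoller`, `ToySocket`, `ReentrancyGuard`, `handle_read`, and `wire` are hypothetical names, and `FakePoller::async` only stands in for `_poller->async` by queueing a task to run later.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the poller: tasks are queued and run later.
struct FakePoller {
    std::deque<std::function<void()>> tasks;
    void async(std::function<void()> task) { tasks.push_back(std::move(task)); }
    void run_pending() {
        while (!tasks.empty()) {
            auto task = std::move(tasks.front());
            tasks.pop_front();
            task();
        }
    }
};

// RAII guard: the counter is non-zero exactly while a callback is running.
class ReentrancyGuard {
    std::atomic<int> &_counter;

public:
    explicit ReentrancyGuard(std::atomic<int> &c) : _counter(c) {
        _counter.fetch_add(1, std::memory_order_relaxed);
    }
    ~ReentrancyGuard() { _counter.fetch_sub(1, std::memory_order_relaxed); }
    ReentrancyGuard(const ReentrancyGuard &) = delete;
    ReentrancyGuard &operator=(const ReentrancyGuard &) = delete;
};

struct ToySocket {
    FakePoller &poller;
    std::atomic<int> event_reentrancy{0};
    std::vector<std::string> wire;             // data that was actually "sent"
    std::function<void(ToySocket &)> on_read;  // user-provided event callback

    explicit ToySocket(FakePoller &p) : poller(p) {}

    void send(std::string data) {
        // Inside an event callback: defer instead of sending inline.
        if (event_reentrancy.load(std::memory_order_relaxed) > 0) {
            poller.async([this, data = std::move(data)]() mutable { send(std::move(data)); });
            return;
        }
        wire.push_back(std::move(data));
    }

    void handle_read() {
        ReentrancyGuard guard(event_reentrancy);  // marks the callback region
        if (on_read) {
            on_read(*this);
        }
    }
};
```

A `send` issued from `on_read` lands in `poller.tasks` and only reaches `wire` once `run_pending()` runs after the callback has returned. The toy captures `this` for brevity; the PR captures a `weak_ptr` so a destroyed socket is skipped.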
Reviewed changes
Copilot reviewed 2 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/Network/Socket.h | Adds the _event_reentrancy atomic counter member on Socket. |
| src/Network/Socket.cpp | Defines EventReentrancyGuard, instruments onRead/emitErr with it, and deflects send_l onto the poller when reentrant. |
| .gitignore | Ignores VS CMake out/build/x64-Debug outputs and CMakeSettings.json. |
    // If this Socket instance is currently inside an event callback, defer the send
    if (_event_reentrancy.load(std::memory_order_relaxed) > 0) {
        weak_ptr<Socket> weak_self = shared_from_this();
        _poller->async([weak_self, buf, is_buf_sock, try_flush]() mutable {
            auto self = weak_self.lock();
            if (self) {
                self->send_l(std::move(buf), is_buf_sock, try_flush);
            }
        });
        return size;
    }
…
    // RAII guard for _event_reentrancy
    class EventReentrancyGuard {
        std::atomic<int> &_counter;

    public:
        explicit EventReentrancyGuard(std::atomic<int> &c)
            : _counter(c) {
            _counter.fetch_add(1, std::memory_order_relaxed);
        }
        ~EventReentrancyGuard() { _counter.fetch_sub(1, std::memory_order_relaxed); }
        EventReentrancyGuard(const EventReentrancyGuard &) = delete;
        EventReentrancyGuard &operator=(const EventReentrancyGuard &) = delete;
    };
Pull request overview
Copilot reviewed 2 out of 3 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (2)
src/Network/Socket.cpp:452
`_in_event_callback` is read here with `memory_order_relaxed` from a potentially different thread than the one that set it (the poller thread running the event callback). The check is intended to detect whether the current thread is inside an event callback (to avoid recursive locking of `_mtx_event`), but a true value seen here only means that some thread is currently in an event callback. Sends issued from unrelated threads will be deferred to the poller unnecessarily, while a same-thread re-entrant send across an unrelated cross-thread store may also race. Consider using a thread-local marker (or `std::this_thread::get_id()` recorded under the guard) so the deferral path is only taken when the calling thread actually holds `_mtx_event`.
    if (_in_event_callback.load(std::memory_order_relaxed)) {
        if (!_async_flush_scheduled.exchange(true, std::memory_order_relaxed)) {
            weak_ptr<Socket> weak_self = shared_from_this();
            _poller->async([weak_self]() {
                auto self = weak_self.lock();
                if (self) {
                    self->flushPendingAsync();
                }
            });
        }
        return size;
    }
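
The thread-local marker suggested in this comment could look roughly like the following. It is a sketch under assumptions, not the PR's implementation: `tl_current_event_owner`, `ThreadLocalEventGuard`, `in_event_callback_of`, and `other_thread_sees_marker` are hypothetical names. The guard saves and restores the previous marker so nested callbacks on the same thread unwind correctly.

```cpp
#include <cassert>
#include <thread>

// Hypothetical marker: the object whose event callback is running on THIS thread.
thread_local const void *tl_current_event_owner = nullptr;

class ThreadLocalEventGuard {
    const void *_prev;  // marker to restore on scope exit (supports nesting)

public:
    explicit ThreadLocalEventGuard(const void *owner) : _prev(tl_current_event_owner) {
        tl_current_event_owner = owner;
    }
    ~ThreadLocalEventGuard() { tl_current_event_owner = _prev; }
    ThreadLocalEventGuard(const ThreadLocalEventGuard &) = delete;
    ThreadLocalEventGuard &operator=(const ThreadLocalEventGuard &) = delete;
};

// True only if the calling thread is inside an event callback of `owner`.
inline bool in_event_callback_of(const void *owner) {
    return tl_current_event_owner == owner;
}

// Asks a freshly started thread whether it sees the marker for `owner`.
inline bool other_thread_sees_marker(const void *owner) {
    bool seen = false;
    std::thread t([&] { seen = in_event_callback_of(owner); });
    t.join();
    return seen;
}
```

Because the marker is `thread_local`, a send issued from an unrelated thread sees `nullptr` and takes the normal path, while only the thread that is actually inside the callback is deferred.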
src/Network/Socket.cpp:457
- When the deferred-flush branch is taken, `send_l` returns `size` to indicate success, but the underlying `flushAll` has not been attempted yet. If that asynchronous flush ultimately fails, only `_pending_flush_error` and an `emitErr` will signal it; the caller of this `send_l` has already been told the bytes were sent. Combined with the single-shot nature of `_pending_flush_error`, this can hide write failures that previously surfaced synchronously as a -1 return. Please document this behavior change (it alters the API contract of `send`/`send_l`) and ensure callers that rely on a synchronous error return are accounted for.
    if (try_flush) {
        // If inside an event callback, defer the flush to avoid deadlock
        if (_in_event_callback.load(std::memory_order_relaxed)) {
            if (!_async_flush_scheduled.exchange(true, std::memory_order_relaxed)) {
                weak_ptr<Socket> weak_self = shared_from_this();
                _poller->async([weak_self]() {
                    auto self = weak_self.lock();
                    if (self) {
                        self->flushPendingAsync();
                    }
                });
            }
            return size;
        }
        if (flushAll()) {
            return -1;
        }
    }
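
The contract change described here can be shown with a toy model. This is a hedged sketch, not the library's code: `DeferredSender`, `in_callback`, `flush_will_fail`, and `run_pending` are hypothetical, and the deferred task only simulates a flush that fails and records it in a single-shot flag the way `_pending_flush_error` is used in the reviewed revision.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

struct DeferredSender {
    std::deque<std::function<void()>> queue;      // stand-in for _poller->async
    std::atomic<bool> pending_flush_error{false}; // single-shot error flag
    bool in_callback = false;                     // hypothetical "inside event callback" state
    bool flush_will_fail = false;                 // hypothetical switch simulating a write failure

    // Returns the number of bytes accepted, or -1 on a known error.
    long send(std::size_t size) {
        // A previously recorded async failure is reported once, then cleared.
        if (pending_flush_error.exchange(false, std::memory_order_relaxed)) {
            return -1;
        }
        if (in_callback) {
            // Deferred path: success is reported before any flush is attempted.
            queue.push_back([this] {
                if (flush_will_fail) {
                    pending_flush_error.store(true, std::memory_order_relaxed);
                }
            });
            return static_cast<long>(size);
        }
        // Synchronous path: a failure is visible to this caller immediately.
        return flush_will_fail ? -1 : static_cast<long>(size);
    }

    void run_pending() {
        while (!queue.empty()) {
            auto task = std::move(queue.front());
            queue.pop_front();
            task();
        }
    }
};
```

In this model the send that actually failed returns its byte count, and the -1 is delivered to whichever later `send` happens to come next, which is the behavior the comment asks to have documented.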
    public:
        explicit EventGuard(Socket *sock)
            : _socket(sock) {
            _socket->_in_event_callback.store(true, std::memory_order_relaxed);
        }
        ~EventGuard() { _socket->_in_event_callback.store(false, std::memory_order_relaxed); }
…
    if (_pending_flush_error.exchange(false, std::memory_order_relaxed)) {
        return -1;
    }
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
    _async_flush_scheduled.store(false, std::memory_order_relaxed);

    if (flushAll()) {
        _pending_flush_error.store(true, std::memory_order_relaxed);
…
                self->flushPendingAsync();
            }
        });
    }
    class EventGuard {
        Socket *_socket;

    public:
        explicit EventGuard(Socket *sock)
            : _socket(sock) {
            _socket->_in_event_callback.store(true, std::memory_order_relaxed);
        }
        ~EventGuard() { _socket->_in_event_callback.store(false, std::memory_order_relaxed); }
        EventGuard(const EventGuard &) = delete;
        EventGuard &operator=(const EventGuard &) = delete;
    };
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Pull request overview
Copilot reviewed 2 out of 3 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (3)
src/Network/Socket.cpp:501
`closeSock()` unconditionally resets `_in_event_callback` to 0, but `closeSock(false)` is invoked from inside the `emitErr` lambda (Socket.cpp:385) while an `EventGuard` is still in scope on the stack (line 376). When that `EventGuard` later destructs, it will `fetch_sub(1)` on the already-zeroed counter, leaving it at -1. Similarly, a user callback can call `shutdown`/`closeSock` from inside `onRead`/`onAccept`, causing the same underflow. The counter then stays negative and the `> 0` check on line 441 will never trigger again for this socket, defeating the deadlock-avoidance logic.
`_in_event_callback` should not be reset in `closeSock`; the `EventGuard` destructor is solely responsible for maintaining the count. The same concern applies to resetting `_async_flush_scheduled` here: if `closeSock(false)` runs between scheduling and execution of `flushPendingAsync`, the flag is cleared but the queued task is still pending, which can then race with a new send that also schedules another task.
_pending_flush_error.store(false, std::memory_order_relaxed);
_async_flush_scheduled.store(false, std::memory_order_relaxed);
_in_event_callback.store(0, std::memory_order_relaxed);
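
The underflow can be reproduced in a few lines. A minimal sketch, assuming a guard that increments on construction and decrements on destruction as described in the comment; `CountingGuard` and `counter_after_reset_inside_guard` are hypothetical names.

```cpp
#include <atomic>
#include <cassert>

// Guard that increments on entry and decrements on exit.
class CountingGuard {
    std::atomic<int> &_counter;

public:
    explicit CountingGuard(std::atomic<int> &c) : _counter(c) {
        _counter.fetch_add(1, std::memory_order_relaxed);
    }
    ~CountingGuard() { _counter.fetch_sub(1, std::memory_order_relaxed); }
    CountingGuard(const CountingGuard &) = delete;
    CountingGuard &operator=(const CountingGuard &) = delete;
};

// Returns the counter value after a guard whose scope contained a reset.
inline int counter_after_reset_inside_guard() {
    std::atomic<int> counter{0};
    {
        CountingGuard guard(counter);                  // counter is now 1
        counter.store(0, std::memory_order_relaxed);   // the reset done while the guard is live
    }                                                  // destructor decrements 0 to -1
    return counter.load(std::memory_order_relaxed);
}
```

Once the counter sits at -1, the next guarded callback only brings it back to 0, so a `> 0` check inside that callback stays false.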
src/Network/Socket.cpp:682
`EventGuard` is added around `_on_before_accept` (line 682) but not around the second `_mtx_event`-protected user callback in this same function, `_on_accept` at line 729, nor around `_on_flush` invoked in `onFlushed()` (line 491). Since the stated goal is to mark all event-callback regions so that nested `send()` calls can detect re-entry and defer flushing, omitting these sites leaves the original deadlock window open whenever the user calls `send()` from within `onAccept` or `onFlush`. Please add `EventGuard guard(this);` (for `onFlushed`) and a matching guard around the second try block in `onAccept` for consistency.
EventGuard guard(this);
src/Network/Socket.cpp:452
- The deferred-flush path returns `size` immediately after enqueuing the buffer, but `flushAll()` may also be reached through other code paths (e.g. when the socket becomes writable). More importantly, the check `_in_event_callback > 0` is local to this `Socket`. The most common usage is calling `peer->send(...)` from inside `this`'s `onRead` callback (e.g. a TCP proxy), in which case `peer->_in_event_callback` is 0 and the deferral never triggers, yet the original deadlock report on the same callback chain would still occur because `peer`'s flush path runs under `this`'s `_mtx_event`. If that scenario is intentionally out of scope please document it; otherwise the fix only addresses the narrower self-send case.
    // If inside an event callback, defer the flush to avoid deadlock
    if (_in_event_callback.load(std::memory_order_relaxed) > 0) {
        if (!_async_flush_scheduled.exchange(true, std::memory_order_relaxed)) {
            weak_ptr<Socket> weak_self = shared_from_this();
            _poller->async([weak_self]() {
                auto self = weak_self.lock();
                if (self) {
                    self->flushPendingAsync();
                }
            });
        }
        return size;
    }
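
The proxy case raised in this comment comes down to the counter being per-object. A small sketch of that observation only, with hypothetical names (`CountedSocket`, `would_defer`, `probe_proxy_case`); it does not model the mutex or the flush path.

```cpp
#include <atomic>
#include <cassert>

struct CountedSocket {
    std::atomic<int> in_event_callback{0};
    // Mirrors the `> 0` check that selects the deferred path.
    bool would_defer() const {
        return in_event_callback.load(std::memory_order_relaxed) > 0;
    }
};

struct ProbeResult {
    bool self_deferred;
    bool peer_deferred;
};

// Simulates being inside `self`'s callback and asks both sockets whether
// a send issued right now would be deferred.
inline ProbeResult probe_proxy_case() {
    CountedSocket self;
    CountedSocket peer;
    self.in_event_callback.fetch_add(1, std::memory_order_relaxed);  // enter self's callback
    ProbeResult result{self.would_defer(), peer.would_defer()};
    self.in_event_callback.fetch_sub(1, std::memory_order_relaxed);  // leave the callback
    return result;
}
```

Only `self` reports that it would defer; a `peer->send(...)` made from the same callback goes down the normal, non-deferred path.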
    if (_pending_flush_error.load(std::memory_order_relaxed)) {
        return -1;
    }
…
        return 0;
    }

    if (_pending_flush_error.load(std::memory_order_relaxed)) {
…
    _async_flush_scheduled.store(false, std::memory_order_relaxed);

    if (flushAll()) {
        _pending_flush_error.store(true, std::memory_order_relaxed);
        emitErr(SockException(Err_other, "flush failed in async"));
…
    if (_in_event_callback.load(std::memory_order_relaxed) > 0) {
        if (!_async_flush_scheduled.exchange(true, std::memory_order_relaxed)) {
            weak_ptr<Socket> weak_self = shared_from_this();
            _poller->async([weak_self]() {
                auto self = weak_self.lock();
                if (self) {
                    self->flushPendingAsync();
                }
            });
        }
        return size;
    }
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
    try {
        EventGuard guard(this);
…
    if (try_flush) {
        // If inside an event callback, defer the flush to avoid deadlock
        if ((tl_event_socket == this) && (_in_event_callback.load(std::memory_order_relaxed) > 0)) {
…
    _async_flush_scheduled.store(false, std::memory_order_relaxed);

    if (flushAll()) {
        _pending_flush_error.store(true, std::memory_order_relaxed);
        emitErr(SockException(Err_other, "flush failed in async"));
    }
…
    if (_pending_flush_error.load(std::memory_order_relaxed)) {
        return -1;
    }
    @@ -866,6 +872,19 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
        //Connection cache address, to prevent TCP reset from causing the inability to obtain the peer's address
        struct sockaddr_storage _local_addr;
        struct sockaddr_storage _peer_addr;

    private:
        class EventGuard {
        private:
            Socket *_socket;
            Socket *_prev;

        public:
            explicit EventGuard(Socket *sock);
            ~EventGuard();
            EventGuard(const EventGuard &) = delete;
            EventGuard &operator=(const EventGuard &) = delete;
        };
ZLMediaKit/ZLMediaKit#4732