
zlm_socket_deadlock_fix #291

Open
PioLing wants to merge 11 commits into ZLMediaKit:master from PioLing:coding

Conversation

Contributor

@PioLing PioLing commented May 16, 2026

Copilot AI review requested due to automatic review settings May 16, 2026 07:49

Copilot AI left a comment


Pull request overview

Addresses ZLMediaKit issue #4732, in which calling Socket::send() from within a user-provided event callback (e.g. on_read, on_err) can deadlock because the callback already holds _mtx_event while send_l paths re-enter the same mutex / poller logic. The fix introduces a reentrancy counter that detects "we are currently inside an event callback" and, in that case, defers the send onto the poller via _poller->async.

Changes:

  • Add std::atomic<int> _event_reentrancy member to Socket and an RAII EventReentrancyGuard that bumps it for the duration of an event callback.
  • Wrap the user-callback invocations in Socket::onRead and Socket::emitErr with the guard, and in Socket::send_l reroute the send through _poller->async whenever the counter is non-zero.
  • Add VS "Open Folder" build output and CMakeSettings.json to .gitignore.
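
The mechanism summarized above can be sketched in a few lines. This is only a minimal illustration under stated assumptions: `FakePoller`, `MiniSocket`, and `ReentrancyGuard` are hypothetical stand-ins rather than ZLMediaKit types, the sketch captures `this` where the PR captures a `weak_ptr`, and `send` returns a bool instead of a byte count.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the poller's task queue (not ZLMediaKit's EventPoller).
struct FakePoller {
    std::queue<std::function<void()>> tasks;

    void async(std::function<void()> task) {
        assert(task && "task must be callable");
        tasks.push(std::move(task));
    }

    // Runs queued tasks; a task may enqueue further tasks.
    void runPending() {
        while (!tasks.empty()) {
            auto task = std::move(tasks.front());
            tasks.pop();
            task();
        }
    }
};

// RAII guard: the counter is non-zero for as long as a callback is running.
class ReentrancyGuard {
public:
    explicit ReentrancyGuard(std::atomic<int> &c) : _counter(c) {
        _counter.fetch_add(1, std::memory_order_relaxed);
    }
    ~ReentrancyGuard() { _counter.fetch_sub(1, std::memory_order_relaxed); }
    ReentrancyGuard(const ReentrancyGuard &) = delete;
    ReentrancyGuard &operator=(const ReentrancyGuard &) = delete;

private:
    std::atomic<int> &_counter;
};

class MiniSocket {
public:
    explicit MiniSocket(FakePoller &poller) : _poller(poller) {}

    // Simulates dispatching a read event to a user callback.
    void onRead(const std::function<void()> &cb) {
        ReentrancyGuard guard(_event_reentrancy);
        cb();
    }

    // Returns true if the data was written immediately, false if it was deferred.
    bool send(std::string data) {
        if (_event_reentrancy.load(std::memory_order_relaxed) > 0) {
            // Inside a callback: hand the send to the poller instead of re-entering.
            _poller.async([this, data = std::move(data)]() mutable { send(std::move(data)); });
            return false;
        }
        sent.push_back(std::move(data));
        return true;
    }

    std::vector<std::string> sent; // what "reached the wire" in this sketch

private:
    FakePoller &_poller;
    std::atomic<int> _event_reentrancy{0};
};
```

In this sketch a `send` issued from inside the `onRead` callback is queued and only takes effect once `runPending()` is called, whereas a `send` issued outside any callback is written immediately.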

Reviewed changes

Copilot reviewed 2 out of 3 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| src/Network/Socket.h | Adds the _event_reentrancy atomic counter member on Socket. |
| src/Network/Socket.cpp | Defines EventReentrancyGuard, instruments onRead/emitErr with it, and deflects send_l onto the poller when reentrant. |
| .gitignore | Ignores VS CMake out/build/x64-Debug outputs and CMakeSettings.json. |


Comment thread src/Network/Socket.cpp Outdated
Comment on lines +444 to +454
// If this Socket instance is currently inside an event callback, defer the send
if (_event_reentrancy.load(std::memory_order_relaxed) > 0) {
    weak_ptr<Socket> weak_self = shared_from_this();
    _poller->async([weak_self, buf, is_buf_sock, try_flush]() mutable {
        auto self = weak_self.lock();
        if (self) {
            self->send_l(std::move(buf), is_buf_sock, try_flush);
        }
    });
    return size;
}
Comment thread src/Network/Socket.cpp Outdated
Comment on lines +28 to +40
// RAII guard for _event_reentrancy
class EventReentrancyGuard {
    std::atomic<int> &_counter;

public:
    explicit EventReentrancyGuard(std::atomic<int> &c)
        : _counter(c) {
        _counter.fetch_add(1, std::memory_order_relaxed);
    }
    ~EventReentrancyGuard() { _counter.fetch_sub(1, std::memory_order_relaxed); }
    EventReentrancyGuard(const EventReentrancyGuard &) = delete;
    EventReentrancyGuard &operator=(const EventReentrancyGuard &) = delete;
};

Copilot AI left a comment


Pull request overview

Copilot reviewed 2 out of 3 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (2)

src/Network/Socket.cpp:452

  • _in_event_callback is read here with memory_order_relaxed, potentially from a different thread than the one that set it (the poller thread running the event callback). The check is meant to detect whether the current thread is inside an event callback (to avoid recursive locking of _mtx_event), but a true value seen here only means that some thread is currently in one. As a result, sends issued from unrelated threads are deferred to the poller unnecessarily, and a genuinely re-entrant send on the callback thread can race with a store made by another thread. Consider using a thread-local marker (or std::this_thread::get_id() recorded under the guard) so that the deferral path is taken only when the calling thread actually holds _mtx_event.
        if (_in_event_callback.load(std::memory_order_relaxed)) {
            if (!_async_flush_scheduled.exchange(true, std::memory_order_relaxed)) {
                weak_ptr<Socket> weak_self = shared_from_this();
                _poller->async([weak_self]() {
                    auto self = weak_self.lock();
                    if (self) {
                        self->flushPendingAsync();
                    }
                });
            }
            return size;
        }
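
The thread-local marker suggested in this comment might look roughly like the sketch below. It is not the PR's implementation: `Sock`, `tl_event_sock`, and `inOwnEventCallback` are hypothetical names, chosen only to show a guard that saves the previous marker and restores it on destruction.

```cpp
#include <cassert>

class Sock; // hypothetical stand-in for Socket

// Per-thread marker: the socket whose event callback is running on THIS thread.
thread_local const Sock *tl_event_sock = nullptr;

class Sock {
public:
    class EventGuard {
    public:
        // Remember the previous marker so nested guards unwind correctly.
        explicit EventGuard(const Sock *sock) : _prev(tl_event_sock) {
            assert(sock != nullptr);
            tl_event_sock = sock;
        }
        ~EventGuard() { tl_event_sock = _prev; }
        EventGuard(const EventGuard &) = delete;
        EventGuard &operator=(const EventGuard &) = delete;

    private:
        const Sock *_prev;
    };

    // True only when the calling thread is inside this socket's callback.
    bool inOwnEventCallback() const { return tl_event_sock == this; }
};
```

Because the marker is `thread_local`, a send issued from an unrelated thread never sees it set. One limitation of a single marker is that it remembers only the innermost socket: while a nested guard for another socket is active, the outer socket reports false even though its callback is still on the stack.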

src/Network/Socket.cpp:457

  • When the deferred-flush branch is taken, send_l returns size to indicate success, but the underlying flushAll has not been attempted yet. If that asynchronous flush ultimately fails, only _pending_flush_error and an emitErr will signal it — the caller of this send_l has already been told the bytes were sent. Combined with the single-shot nature of _pending_flush_error, this can hide write failures that previously surfaced synchronously as a -1 return. Please document this behavior change (it alters the API contract of send/send_l) and ensure callers that rely on a synchronous error return are accounted for.
    if (try_flush) {
        // If inside an event callback, defer the flush to avoid a deadlock
        if (_in_event_callback.load(std::memory_order_relaxed)) {
            if (!_async_flush_scheduled.exchange(true, std::memory_order_relaxed)) {
                weak_ptr<Socket> weak_self = shared_from_this();
                _poller->async([weak_self]() {
                    auto self = weak_self.lock();
                    if (self) {
                        self->flushPendingAsync();
                    }
                });
            }
            return size;
        }
        
        if (flushAll()) {
            return -1;
        }
    }
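
The "single-shot" concern can be made concrete with a small sketch. `PendingError` and its two methods are hypothetical and only contrast the two ways an atomic error flag can be read: consuming it with `exchange(false)` versus observing it with `load`.

```cpp
#include <atomic>

// Hypothetical model of a deferred-flush error flag.
struct PendingError {
    std::atomic<bool> flag{false};

    // Single-shot: the first caller consumes the error, later callers see none.
    int checkSingleShot() {
        return flag.exchange(false, std::memory_order_relaxed) ? -1 : 0;
    }

    // Sticky: every caller keeps seeing the error until it is cleared explicitly.
    int checkSticky() const {
        return flag.load(std::memory_order_relaxed) ? -1 : 0;
    }
};
```

With the single-shot variant, a failed asynchronous flush is reported to exactly one later caller, so any other caller that was already told its bytes were sent never learns of the failure.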

Comment thread src/Network/Socket.cpp Outdated
Comment thread src/Network/Socket.h Outdated
Comment on lines +879 to +885

public:
    explicit EventGuard(Socket *sock)
        : _socket(sock) {
        _socket->_in_event_callback.store(true, std::memory_order_relaxed);
    }
    ~EventGuard() { _socket->_in_event_callback.store(false, std::memory_order_relaxed); }
Comment thread src/Network/Socket.cpp Outdated
Comment on lines +430 to +432
if (_pending_flush_error.exchange(false, std::memory_order_relaxed)) {
    return -1;
}
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

Copilot AI left a comment


Pull request overview

Copilot reviewed 2 out of 3 changed files in this pull request and generated 4 comments.

Comment thread src/Network/Socket.cpp
_async_flush_scheduled.store(false, std::memory_order_relaxed);

if (flushAll()) {
_pending_flush_error.store(true, std::memory_order_relaxed);
Comment thread src/Network/Socket.cpp
self->flushPendingAsync();
}
});
}
Comment thread src/Network/Socket.h
Comment on lines +877 to +888
class EventGuard {
    Socket *_socket;

public:
    explicit EventGuard(Socket *sock)
        : _socket(sock) {
        _socket->_in_event_callback.store(true, std::memory_order_relaxed);
    }
    ~EventGuard() { _socket->_in_event_callback.store(false, std::memory_order_relaxed); }
    EventGuard(const EventGuard &) = delete;
    EventGuard &operator=(const EventGuard &) = delete;
};
Comment thread src/Network/Socket.cpp Outdated

Copilot AI left a comment


Pull request overview

Copilot reviewed 2 out of 3 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (3)

src/Network/Socket.cpp:501

  • closeSock() unconditionally resets _in_event_callback to 0, but closeSock(false) is invoked from inside the emitErr lambda (Socket.cpp:385) while an EventGuard is still in scope on the stack (line 376). When that EventGuard later destructs, it will fetch_sub(1) on the already-zeroed counter, leaving it at -1. Similarly, a user callback can call shutdown/closeSock from inside onRead/onAccept, causing the same underflow. The counter then stays negative and the > 0 check on line 441 will never trigger again for this socket, defeating the deadlock-avoidance logic.

_in_event_callback should not be reset in closeSock; the EventGuard destructor is solely responsible for maintaining the count. The same concern applies to resetting _async_flush_scheduled here — if closeSock(false) runs between scheduling and execution of flushPendingAsync, the flag is cleared but the queued task is still pending, which can then race with a new send that also schedules another task.

    _pending_flush_error.store(false, std::memory_order_relaxed);
    _async_flush_scheduled.store(false, std::memory_order_relaxed);
    _in_event_callback.store(0, std::memory_order_relaxed);
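
The underflow described in this comment can be reproduced in isolation. The sketch below assumes only a counting RAII guard like the one in the PR; `CountingGuard` and `counterAfterResetInsideGuard` are hypothetical names.

```cpp
#include <atomic>
#include <cassert>

// Counting guard in the style of the PR's EventGuard (hypothetical name).
class CountingGuard {
public:
    explicit CountingGuard(std::atomic<int> &c) : _counter(c) {
        _counter.fetch_add(1, std::memory_order_relaxed);
    }
    ~CountingGuard() { _counter.fetch_sub(1, std::memory_order_relaxed); }
    CountingGuard(const CountingGuard &) = delete;
    CountingGuard &operator=(const CountingGuard &) = delete;

private:
    std::atomic<int> &_counter;
};

// Mimics the sequence: guard active, a close path zeroes the counter, guard destructs.
inline int counterAfterResetInsideGuard() {
    std::atomic<int> in_event_callback{0};
    {
        CountingGuard guard(in_event_callback);
        assert(in_event_callback.load(std::memory_order_relaxed) == 1);
        // Stand-in for the unconditional reset performed while the guard is alive.
        in_event_callback.store(0, std::memory_order_relaxed);
    } // destructor subtracts 1 from the already-zeroed counter
    return in_event_callback.load(std::memory_order_relaxed);
}
```

The function returns a negative count, which is why a subsequent `> 0` check would stop detecting re-entry for that counter.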

src/Network/Socket.cpp:682

  • EventGuard is added around _on_before_accept (line 682) but not around the second _mtx_event-protected user callback in this same function — _on_accept at line 729 — nor around _on_flush invoked in onFlushed() (line 491). Since the stated goal is to mark all event-callback regions so that nested send() calls can detect re-entry and defer flushing, omitting these sites leaves the original deadlock window open whenever the user calls send() from within onAccept or onFlush. Please add EventGuard guard(this); (for onFlushed) and a matching guard around the second try block in onAccept for consistency.
                EventGuard guard(this);

src/Network/Socket.cpp:452

  • The deferred-flush path returns size immediately after enqueuing the buffer, but flushAll() may also be reached through other code paths (e.g. when the socket becomes writable). More importantly, the check _in_event_callback > 0 is local to this Socket. The most common usage is calling peer->send(...) from inside this's onRead callback (e.g. a TCP proxy), in which case peer->_in_event_callback is 0 and the deferral never triggers — yet the original deadlock report on the same callback chain would still occur because peer's flush path runs under this's _mtx_event. If that scenario is intentionally out of scope please document it; otherwise the fix only addresses the narrower self-send case.
        // If inside an event callback, defer the flush to avoid a deadlock
        if (_in_event_callback.load(std::memory_order_relaxed) > 0) {
            if (!_async_flush_scheduled.exchange(true, std::memory_order_relaxed)) {
                weak_ptr<Socket> weak_self = shared_from_this();
                _poller->async([weak_self]() {
                    auto self = weak_self.lock();
                    if (self) {
                        self->flushPendingAsync();
                    }
                });
            }
            return size;
        }

Comment thread src/Network/Socket.cpp
Comment on lines +430 to +432
if (_pending_flush_error.load(std::memory_order_relaxed)) {
    return -1;
}

Copilot AI left a comment


Pull request overview

Copilot reviewed 2 out of 3 changed files in this pull request and generated 4 comments.

Comment thread src/Network/Socket.cpp Outdated
Comment thread src/Network/Socket.cpp
return 0;
}

if (_pending_flush_error.load(std::memory_order_relaxed)) {
Comment thread src/Network/Socket.cpp
Comment on lines +604 to +608
_async_flush_scheduled.store(false, std::memory_order_relaxed);

if (flushAll()) {
    _pending_flush_error.store(true, std::memory_order_relaxed);
    emitErr(SockException(Err_other, "flush failed in async"));
Comment thread src/Network/Socket.cpp Outdated
Comment on lines +441 to +452
if (_in_event_callback.load(std::memory_order_relaxed) > 0) {
    if (!_async_flush_scheduled.exchange(true, std::memory_order_relaxed)) {
        weak_ptr<Socket> weak_self = shared_from_this();
        _poller->async([weak_self]() {
            auto self = weak_self.lock();
            if (self) {
                self->flushPendingAsync();
            }
        });
    }
    return size;
}
PioLing and others added 2 commits May 16, 2026 22:51
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

Copilot AI left a comment


Pull request overview

Copilot reviewed 2 out of 3 changed files in this pull request and generated 5 comments.

Comment thread src/Network/Socket.cpp
Comment on lines 694 to +695
try {
EventGuard guard(this);
Comment thread src/Network/Socket.cpp

if (try_flush) {
    // If inside an event callback, defer the flush to avoid a deadlock
    if ((tl_event_socket == this) && (_in_event_callback.load(std::memory_order_relaxed) > 0)) {
Comment thread src/Network/Socket.cpp
Comment on lines +617 to +622
_async_flush_scheduled.store(false, std::memory_order_relaxed);

if (flushAll()) {
    _pending_flush_error.store(true, std::memory_order_relaxed);
    emitErr(SockException(Err_other, "flush failed in async"));
}
Comment thread src/Network/Socket.cpp
Comment on lines +444 to +446
if (_pending_flush_error.load(std::memory_order_relaxed)) {
    return -1;
}
Comment thread src/Network/Socket.h
Comment on lines 821 to +887
@@ -866,6 +872,19 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
    // Cached connection addresses, so the peer address is still available after a TCP reset
    struct sockaddr_storage _local_addr;
    struct sockaddr_storage _peer_addr;

private:
    class EventGuard {
    private:
        Socket *_socket;
        Socket *_prev;

    public:
        explicit EventGuard(Socket *sock);
        ~EventGuard();
        EventGuard(const EventGuard &) = delete;
        EventGuard &operator=(const EventGuard &) = delete;
    };


2 participants