diff --git a/engine/iouring/consts.go b/engine/iouring/consts.go
index 7c055f9..09693fe 100644
--- a/engine/iouring/consts.go
+++ b/engine/iouring/consts.go
@@ -52,12 +52,6 @@ const (
 	acceptMultishot = 1 << 0
 )
 
-// Recv flags.
-const (
-	recvMultishot = 1 << 1
-	msgPeek       = 0x2
-)
-
 // Mmap offsets for ring buffers.
 const (
 	offSQRing = 0
diff --git a/engine/iouring/cqe.go b/engine/iouring/cqe.go
index d453aaf..0cc1cb9 100644
--- a/engine/iouring/cqe.go
+++ b/engine/iouring/cqe.go
@@ -15,7 +15,6 @@ const (
 	udRecv    uint64 = 0x02 << 56
 	udSend    uint64 = 0x03 << 56
 	udClose   uint64 = 0x04 << 56
-	udPeek    uint64 = 0x05 << 56
 	udProvide uint64 = 0x06 << 56
 	udMask    uint64 = 0xFF << 56
 	fdMask    uint64 = (1 << 56) - 1
diff --git a/engine/iouring/sqe.go b/engine/iouring/sqe.go
index c289ddd..879597b 100644
--- a/engine/iouring/sqe.go
+++ b/engine/iouring/sqe.go
@@ -63,17 +63,6 @@ func prepProvideBuffers(sqePtr unsafe.Pointer, addr unsafe.Pointer, bufLen int,
 	*(*uint16)(unsafe.Pointer(&sqe[28])) = bufID
 }
 
-func prepRecvPeek(sqePtr unsafe.Pointer, fd int, buf []byte) {
-	sqe := (*[sqeSize]byte)(sqePtr)
-	sqe[0] = opRECV
-	*(*int32)(unsafe.Pointer(&sqe[4])) = int32(fd)
-	if len(buf) > 0 {
-		*(*uint64)(unsafe.Pointer(&sqe[16])) = uint64(uintptr(unsafe.Pointer(&buf[0])))
-		*(*uint32)(unsafe.Pointer(&sqe[24])) = uint32(len(buf))
-	}
-	*(*uint32)(unsafe.Pointer(&sqe[28])) = msgPeek // msg_flags at offset 28
-}
-
 func setSQEUserData(sqePtr unsafe.Pointer, data uint64) {
 	*(*uint64)(unsafe.Pointer(uintptr(sqePtr) + 32)) = data
 }
diff --git a/engine/iouring/worker.go b/engine/iouring/worker.go
index 7d8db2d..953aa7f 100644
--- a/engine/iouring/worker.go
+++ b/engine/iouring/worker.go
@@ -164,8 +164,6 @@ func (w *Worker) processCQE(ctx context.Context, c *completionEntry) {
 		w.handleSend(c, fd)
 	case udClose:
 		w.handleClose(fd)
-	case udPeek:
-		w.handlePeek(c, fd)
 	case udProvide:
 		// Buffer provide completion, no action needed
 	}
@@ -196,57 +194,33 @@ func (w *Worker) handleAccept(ctx context.Context, c *completionEntry, listenFD
 	w.conns[newFD] = cs
 	w.activeConns.Add(1)
 
-	if w.cfg.Protocol == engine.Auto {
-		sqe := w.ring.GetSQE()
-		if sqe != nil {
-			prepRecvPeek(sqe, newFD, cs.buf[:detect.MinPeekBytes])
-			setSQEUserData(sqe, encodeUserData(udPeek, newFD))
-		}
-	} else {
+	if w.cfg.Protocol != engine.Auto {
 		cs.protocol = w.cfg.Protocol
 		cs.detected = true
 		w.initProtocol(cs)
-		w.tier.PrepareRecv(w.ring, newFD, cs.buf)
 	}
+	// For Auto mode, cs.detected is false; the first handleRecv will
+	// detect the protocol from the received data before processing it.
+	w.tier.PrepareRecv(w.ring, newFD, cs.buf)
 
 	if !cqeHasMore(c.Flags) && !w.tier.SupportsMultishotAccept() {
 		w.tier.PrepareAccept(w.ring, listenFD)
 	}
 }
 
-func (w *Worker) handlePeek(c *completionEntry, fd int) {
-	cs, ok := w.conns[fd]
-	if !ok {
-		return
-	}
-
-	if c.Res < int32(detect.MinPeekBytes) {
-		sqe := w.ring.GetSQE()
-		if sqe != nil {
-			prepRecvPeek(sqe, fd, cs.buf[:24])
-			setSQEUserData(sqe, encodeUserData(udPeek, fd))
-		}
-		return
-	}
-
-	proto, err := detect.Detect(cs.buf[:c.Res])
-	if err == detect.ErrInsufficientData {
-		sqe := w.ring.GetSQE()
-		if sqe != nil {
-			prepRecvPeek(sqe, fd, cs.buf[:24])
-			setSQEUserData(sqe, encodeUserData(udPeek, fd))
-		}
-		return
-	}
+// detectProtocol runs protocol detection on the bytes received so far.
+// It returns nil once the protocol is known and initialised,
+// detect.ErrInsufficientData when more bytes are required, and any other
+// error reported by detect.Detect unchanged.
+func (w *Worker) detectProtocol(cs *connState, data []byte) error {
+	proto, err := detect.Detect(data)
 	if err != nil {
-		w.closeConn(fd)
-		return
+		return err
 	}
-
 	cs.protocol = proto
 	cs.detected = true
 	w.initProtocol(cs)
-	w.tier.PrepareRecv(w.ring, fd, cs.buf)
+	return nil
 }
 
 func (w *Worker) initProtocol(cs *connState) {
@@ -285,6 +259,24 @@ func (w *Worker) handleRecv(c *completionEntry, fd int) {
 		}
 	}
 
+	// Auto protocol detection on first recv (no MSG_PEEK needed).
+	if !cs.detected {
+		if err := w.detectProtocol(cs, data); err != nil {
+			if err != detect.ErrInsufficientData {
+				// Detection failed for good: close the connection, as the
+				// removed peek path did, instead of re-arming recv forever.
+				w.closeConn(fd)
+				return
+			}
+			// Need more data: re-arm recv and retry detection on the next
+			// completion.
+			// NOTE(review): recv is re-armed over all of cs.buf, so confirm
+			// the bytes in data are still available to the next attempt.
+			w.tier.PrepareRecv(w.ring, fd, cs.buf)
+			return
+		}
+	}
+
 	writeFn := w.makeWriteFn(fd)
 
 	var processErr error
diff --git a/protocol/h2/stream/flowcontrol.go b/protocol/h2/stream/flowcontrol.go
index 93a4300..53d0a53 100644
--- a/protocol/h2/stream/flowcontrol.go
+++ b/protocol/h2/stream/flowcontrol.go
@@ -4,7 +4,7 @@ import (
 	"sync/atomic"
 )
 
-const windowUpdateThreshold = 16384
+const windowUpdateThreshold = 1024
 
 // ConsumeSendWindow decrements connection and stream windows after sending DATA.
 func (m *Manager) ConsumeSendWindow(streamID uint32, n int32) {
@@ -51,39 +51,42 @@ func (m *Manager) AccumulateWindowUpdate(streamID uint32, increment uint32) {
 // Returns true if updates were sent.
 func (m *Manager) FlushWindowUpdates(writer FrameWriter, force bool) bool {
+	// streamUpdate is one WINDOW_UPDATE that is due for a stream.
+	type streamUpdate struct {
+		id        uint32
+		increment uint32
+	}
+
 	m.windowUpdateMu.Lock()
 
 	connUpdate := m.pendingConnWindowUpdate
-	needsConnFlush := force || connUpdate >= windowUpdateThreshold
-
-	streamUpdates := make(map[uint32]uint32)
-	for sid, pending := range m.pendingStreamUpdates {
-		if pending != nil && *pending > 0 {
-			if force || *pending >= windowUpdateThreshold {
-				streamUpdates[sid] = *pending
-			}
-		}
-	}
-
+	needsConnFlush := connUpdate > 0 && (force || connUpdate >= windowUpdateThreshold)
 	if needsConnFlush {
 		m.pendingConnWindowUpdate = 0
 	}
-	for sid := range streamUpdates {
-		if m.pendingStreamUpdates[sid] != nil {
-			*m.pendingStreamUpdates[sid] = 0
+
+	// Snapshot and reset the due stream counters under the lock. The slice
+	// stays nil, so nothing is allocated, when no stream update is due.
+	var due []streamUpdate
+	for sid, pending := range m.pendingStreamUpdates {
+		if pending != nil && *pending > 0 && (force || *pending >= windowUpdateThreshold) {
+			due = append(due, streamUpdate{id: sid, increment: *pending})
+			*pending = 0
 		}
 	}
 
 	m.windowUpdateMu.Unlock()
 
+	// Write only after the lock is released so the writer never runs while
+	// windowUpdateMu is held. Write errors are ignored, as before.
 	flushed := false
-	if needsConnFlush && connUpdate > 0 {
+	if needsConnFlush {
 		_ = writer.WriteWindowUpdate(0, connUpdate)
 		flushed = true
 	}
 
-	for sid, increment := range streamUpdates {
-		_ = writer.WriteWindowUpdate(sid, increment)
+	for _, u := range due {
+		_ = writer.WriteWindowUpdate(u.id, u.increment)
 		flushed = true
 	}
 
 	return flushed
diff --git a/protocol/h2/stream/manager.go b/protocol/h2/stream/manager.go
index 6a609b2..56f212b 100644
--- a/protocol/h2/stream/manager.go
+++ b/protocol/h2/stream/manager.go
@@ -102,6 +102,11 @@ func (m *Manager) DeleteStream(id uint32) {
 	}
 	delete(m.streams, id)
 	m.priorityTree.RemoveStream(id)
+
+	// Drop the stream's pending window-update counter along with the stream.
+	m.windowUpdateMu.Lock()
+	delete(m.pendingStreamUpdates, id)
+	m.windowUpdateMu.Unlock()
 }
 
 // StreamCount returns the number of streams in the manager.