Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions engine/iouring/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ const (
acceptMultishot = 1 << 0
)

// Recv flags.
const (
recvMultishot = 1 << 1
msgPeek = 0x2
)

// Mmap offsets for ring buffers.
const (
offSQRing = 0
Expand Down
1 change: 0 additions & 1 deletion engine/iouring/cqe.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ const (
udRecv uint64 = 0x02 << 56
udSend uint64 = 0x03 << 56
udClose uint64 = 0x04 << 56
udPeek uint64 = 0x05 << 56
udProvide uint64 = 0x06 << 56
udMask uint64 = 0xFF << 56
fdMask uint64 = (1 << 56) - 1
Expand Down
11 changes: 0 additions & 11 deletions engine/iouring/sqe.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,6 @@ func prepProvideBuffers(sqePtr unsafe.Pointer, addr unsafe.Pointer, bufLen int,
*(*uint16)(unsafe.Pointer(&sqe[28])) = bufID
}

func prepRecvPeek(sqePtr unsafe.Pointer, fd int, buf []byte) {
sqe := (*[sqeSize]byte)(sqePtr)
sqe[0] = opRECV
*(*int32)(unsafe.Pointer(&sqe[4])) = int32(fd)
if len(buf) > 0 {
*(*uint64)(unsafe.Pointer(&sqe[16])) = uint64(uintptr(unsafe.Pointer(&buf[0])))
*(*uint32)(unsafe.Pointer(&sqe[24])) = uint32(len(buf))
}
*(*uint32)(unsafe.Pointer(&sqe[28])) = msgPeek // msg_flags at offset 28
}

func setSQEUserData(sqePtr unsafe.Pointer, data uint64) {
*(*uint64)(unsafe.Pointer(uintptr(sqePtr) + 32)) = data
}
62 changes: 24 additions & 38 deletions engine/iouring/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,6 @@ func (w *Worker) processCQE(ctx context.Context, c *completionEntry) {
w.handleSend(c, fd)
case udClose:
w.handleClose(fd)
case udPeek:
w.handlePeek(c, fd)
case udProvide:
// Buffer provide completion, no action needed
}
Expand Down Expand Up @@ -196,57 +194,36 @@ func (w *Worker) handleAccept(ctx context.Context, c *completionEntry, listenFD
w.conns[newFD] = cs
w.activeConns.Add(1)

if w.cfg.Protocol == engine.Auto {
sqe := w.ring.GetSQE()
if sqe != nil {
prepRecvPeek(sqe, newFD, cs.buf[:detect.MinPeekBytes])
setSQEUserData(sqe, encodeUserData(udPeek, newFD))
}
} else {
if w.cfg.Protocol != engine.Auto {
cs.protocol = w.cfg.Protocol
cs.detected = true
w.initProtocol(cs)
w.tier.PrepareRecv(w.ring, newFD, cs.buf)
}
// For Auto mode, cs.detected is false; the first handleRecv will
// detect the protocol from the received data before processing it.
w.tier.PrepareRecv(w.ring, newFD, cs.buf)

if !cqeHasMore(c.Flags) && !w.tier.SupportsMultishotAccept() {
w.tier.PrepareAccept(w.ring, listenFD)
}
}

func (w *Worker) handlePeek(c *completionEntry, fd int) {
cs, ok := w.conns[fd]
if !ok {
return
}

if c.Res < int32(detect.MinPeekBytes) {
sqe := w.ring.GetSQE()
if sqe != nil {
prepRecvPeek(sqe, fd, cs.buf[:24])
setSQEUserData(sqe, encodeUserData(udPeek, fd))
}
return
}

proto, err := detect.Detect(cs.buf[:c.Res])
if err == detect.ErrInsufficientData {
sqe := w.ring.GetSQE()
if sqe != nil {
prepRecvPeek(sqe, fd, cs.buf[:24])
setSQEUserData(sqe, encodeUserData(udPeek, fd))
}
return
}
// detectProtocol performs protocol detection on the first received bytes.
// Returns true if detection succeeded and the data should be processed.
func (w *Worker) detectProtocol(cs *connState, data []byte) bool {
proto, err := detect.Detect(data)
if err != nil {
w.closeConn(fd)
return
if err == detect.ErrInsufficientData {
// Need more data — re-arm recv. The data is already in cs.buf
// so we don't lose it; the next recv appends after it.
return false
}
return false
}

cs.protocol = proto
cs.detected = true
w.initProtocol(cs)
w.tier.PrepareRecv(w.ring, fd, cs.buf)
return true
}

func (w *Worker) initProtocol(cs *connState) {
Expand Down Expand Up @@ -285,6 +262,15 @@ func (w *Worker) handleRecv(c *completionEntry, fd int) {
}
}

// Auto protocol detection on first recv (no MSG_PEEK needed).
if !cs.detected {
if !w.detectProtocol(cs, data) {
// Need more data or unknown protocol — re-arm recv.
w.tier.PrepareRecv(w.ring, fd, cs.buf)
return
}
}

writeFn := w.makeWriteFn(fd)

var processErr error
Expand Down
39 changes: 12 additions & 27 deletions protocol/h2/stream/flowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"sync/atomic"
)

const windowUpdateThreshold = 16384
const windowUpdateThreshold = 1024

// ConsumeSendWindow decrements connection and stream windows after sending DATA.
func (m *Manager) ConsumeSendWindow(streamID uint32, n int32) {
Expand Down Expand Up @@ -51,39 +51,24 @@ func (m *Manager) AccumulateWindowUpdate(streamID uint32, increment uint32) {
// Returns true if updates were sent.
func (m *Manager) FlushWindowUpdates(writer FrameWriter, force bool) bool {
m.windowUpdateMu.Lock()
defer m.windowUpdateMu.Unlock()

connUpdate := m.pendingConnWindowUpdate
needsConnFlush := force || connUpdate >= windowUpdateThreshold

streamUpdates := make(map[uint32]uint32)
for sid, pending := range m.pendingStreamUpdates {
if pending != nil && *pending > 0 {
if force || *pending >= windowUpdateThreshold {
streamUpdates[sid] = *pending
}
}
}
flushed := false

if needsConnFlush {
connUpdate := m.pendingConnWindowUpdate
if connUpdate > 0 && (force || connUpdate >= windowUpdateThreshold) {
m.pendingConnWindowUpdate = 0
}
for sid := range streamUpdates {
if m.pendingStreamUpdates[sid] != nil {
*m.pendingStreamUpdates[sid] = 0
}
}

m.windowUpdateMu.Unlock()

flushed := false
if needsConnFlush && connUpdate > 0 {
_ = writer.WriteWindowUpdate(0, connUpdate)
flushed = true
}

for sid, increment := range streamUpdates {
_ = writer.WriteWindowUpdate(sid, increment)
flushed = true
for sid, pending := range m.pendingStreamUpdates {
if pending != nil && *pending > 0 && (force || *pending >= windowUpdateThreshold) {
val := *pending
*pending = 0
_ = writer.WriteWindowUpdate(sid, val)
flushed = true
}
}

return flushed
Expand Down
4 changes: 4 additions & 0 deletions protocol/h2/stream/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func (m *Manager) DeleteStream(id uint32) {
}
delete(m.streams, id)
m.priorityTree.RemoveStream(id)

m.windowUpdateMu.Lock()
delete(m.pendingStreamUpdates, id)
m.windowUpdateMu.Unlock()
}

// StreamCount returns the number of streams in the manager.
Expand Down