fix(api): prevent idle TCP connections from blocking the accept loop
Move per-connection protocol detection (TLS handshake, reader.Peek) out of the accept loop and into a per-connection goroutine. An idle TCP connection that never sends bytes would previously block Peek(1) indefinitely, preventing all subsequent connections from being accepted and making the management/API server unresponsive. Closes #3267
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
@@ -48,13 +49,30 @@ func (s *Server) acceptMuxConnections(listener net.Listener, httpListener *muxLi
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Dispatch each connection to a goroutine so that slow/idle clients
|
||||||
|
// cannot block the accept loop. Previously, TLS handshake and
|
||||||
|
// reader.Peek(1) were performed inline; an idle TCP connection that
|
||||||
|
// never sent bytes would block Peek indefinitely, preventing all
|
||||||
|
// subsequent connections from being accepted (issue #3267).
|
||||||
|
go s.routeMuxConnection(conn, httpListener)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// routeMuxConnection performs per-connection protocol detection and routing.
|
||||||
|
func (s *Server) routeMuxConnection(conn net.Conn, httpListener *muxListener) {
|
||||||
|
// Set a read deadline so that idle connections that never send bytes do not
|
||||||
|
// leak goroutines and file descriptors. The deadline is cleared once the
|
||||||
|
// connection is successfully routed to its handler.
|
||||||
|
const muxSniffDeadline = 10 * time.Second
|
||||||
|
_ = conn.SetReadDeadline(time.Now().Add(muxSniffDeadline))
|
||||||
|
|
||||||
tlsConn, ok := conn.(*tls.Conn)
|
tlsConn, ok := conn.(*tls.Conn)
|
||||||
if ok {
|
if ok {
|
||||||
if errHandshake := tlsConn.Handshake(); errHandshake != nil {
|
if errHandshake := tlsConn.Handshake(); errHandshake != nil {
|
||||||
if errClose := conn.Close(); errClose != nil {
|
if errClose := conn.Close(); errClose != nil {
|
||||||
log.Errorf("failed to close connection after TLS handshake error: %v", errClose)
|
log.Errorf("failed to close connection after TLS handshake error: %v", errClose)
|
||||||
}
|
}
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
proto := strings.TrimSpace(tlsConn.ConnectionState().NegotiatedProtocol)
|
proto := strings.TrimSpace(tlsConn.ConnectionState().NegotiatedProtocol)
|
||||||
if proto == "h2" || proto == "http/1.1" {
|
if proto == "h2" || proto == "http/1.1" {
|
||||||
@@ -62,14 +80,16 @@ func (s *Server) acceptMuxConnections(listener net.Listener, httpListener *muxLi
|
|||||||
if errClose := conn.Close(); errClose != nil {
|
if errClose := conn.Close(); errClose != nil {
|
||||||
log.Errorf("failed to close connection: %v", errClose)
|
log.Errorf("failed to close connection: %v", errClose)
|
||||||
}
|
}
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
if errPut := httpListener.Put(tlsConn); errPut != nil {
|
if errPut := httpListener.Put(tlsConn); errPut != nil {
|
||||||
if errClose := conn.Close(); errClose != nil {
|
if errClose := conn.Close(); errClose != nil {
|
||||||
log.Errorf("failed to close connection after HTTP routing failure: %v", errClose)
|
log.Errorf("failed to close connection after HTTP routing failure: %v", errClose)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
_ = conn.SetReadDeadline(time.Time{})
|
||||||
}
|
}
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -79,7 +99,7 @@ func (s *Server) acceptMuxConnections(listener net.Listener, httpListener *muxLi
|
|||||||
if errClose := conn.Close(); errClose != nil {
|
if errClose := conn.Close(); errClose != nil {
|
||||||
log.Errorf("failed to close connection after protocol peek failure: %v", errClose)
|
log.Errorf("failed to close connection after protocol peek failure: %v", errClose)
|
||||||
}
|
}
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if isRedisRESPPrefix(prefix[0]) {
|
if isRedisRESPPrefix(prefix[0]) {
|
||||||
@@ -87,29 +107,30 @@ func (s *Server) acceptMuxConnections(listener net.Listener, httpListener *muxLi
|
|||||||
if errClose := conn.Close(); errClose != nil {
|
if errClose := conn.Close(); errClose != nil {
|
||||||
log.Errorf("failed to close redis connection while home mode is enabled: %v", errClose)
|
log.Errorf("failed to close redis connection while home mode is enabled: %v", errClose)
|
||||||
}
|
}
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
if !s.managementRoutesEnabled.Load() {
|
if !s.managementRoutesEnabled.Load() {
|
||||||
if errClose := conn.Close(); errClose != nil {
|
if errClose := conn.Close(); errClose != nil {
|
||||||
log.Errorf("failed to close redis connection while management is disabled: %v", errClose)
|
log.Errorf("failed to close redis connection while management is disabled: %v", errClose)
|
||||||
}
|
}
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
go s.handleRedisConnection(conn, reader)
|
s.handleRedisConnection(conn, reader)
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if httpListener == nil {
|
if httpListener == nil {
|
||||||
if errClose := conn.Close(); errClose != nil {
|
if errClose := conn.Close(); errClose != nil {
|
||||||
log.Errorf("failed to close connection without HTTP listener: %v", errClose)
|
log.Errorf("failed to close connection without HTTP listener: %v", errClose)
|
||||||
}
|
}
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if errPut := httpListener.Put(&bufferedConn{Conn: conn, reader: reader}); errPut != nil {
|
if errPut := httpListener.Put(&bufferedConn{Conn: conn, reader: reader}); errPut != nil {
|
||||||
if errClose := conn.Close(); errClose != nil {
|
if errClose := conn.Close(); errClose != nil {
|
||||||
log.Errorf("failed to close connection after HTTP routing failure: %v", errClose)
|
log.Errorf("failed to close connection after HTTP routing failure: %v", errClose)
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
|
_ = conn.SetReadDeadline(time.Time{})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,65 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestAcceptMuxNotBlockedByIdleConnection(t *testing.T) {
|
||||||
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to listen: %v", err)
|
||||||
|
}
|
||||||
|
defer listener.Close()
|
||||||
|
|
||||||
|
var routed atomic.Int32
|
||||||
|
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
routed.Add(1)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
})
|
||||||
|
srv := httptest.NewUnstartedServer(handler)
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
muxLn := newMuxListener(listener.Addr(), 1024)
|
||||||
|
server := &Server{managementRoutesEnabled: atomic.Bool{}}
|
||||||
|
server.managementRoutesEnabled.Store(false)
|
||||||
|
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
errCh <- server.acceptMuxConnections(listener, muxLn)
|
||||||
|
}()
|
||||||
|
|
||||||
|
srv.Listener = muxLn
|
||||||
|
srv.Start()
|
||||||
|
|
||||||
|
// Open an idle TCP connection that never sends any bytes.
|
||||||
|
idleConn, err := net.DialTimeout("tcp", listener.Addr().String(), 2*time.Second)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to dial idle connection: %v", err)
|
||||||
|
}
|
||||||
|
defer idleConn.Close()
|
||||||
|
|
||||||
|
// Give the accept loop time to pick up the idle connection.
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
|
||||||
|
// Send a real HTTP request. Before the fix, the accept loop would be
|
||||||
|
// blocked on Peek(1) for the idle connection, causing this request to
|
||||||
|
// time out.
|
||||||
|
client := &http.Client{Timeout: 3 * time.Second}
|
||||||
|
resp, err := client.Get("http://" + listener.Addr().String() + "/")
|
||||||
|
if err != nil {
|
||||||
|
listener.Close()
|
||||||
|
t.Fatalf("HTTP request failed (accept loop may be blocked by idle connection): %v", err)
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
|
||||||
|
listener.Close()
|
||||||
|
|
||||||
|
if routed.Load() == 0 {
|
||||||
|
t.Error("expected at least one request to be routed")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user