refactor(logging): centralize websocket handshake recording
This commit is contained in:
@@ -570,6 +570,9 @@ func (l *FileRequestLogger) writeNonStreamingLog(
|
|||||||
return errWrite
|
return errWrite
|
||||||
}
|
}
|
||||||
if isWebsocketTranscript {
|
if isWebsocketTranscript {
|
||||||
|
// Intentionally omit the generic downstream HTTP response section for websocket
|
||||||
|
// transcripts. The durable session exchange is captured in WEBSOCKET TIMELINE,
|
||||||
|
// and appending a one-off upgrade response snapshot would dilute that transcript.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return writeResponseSection(w, statusCode, true, responseHeaders, bytes.NewReader(response), decompressErr, true)
|
return writeResponseSection(w, statusCode, true, responseHeaders, bytes.NewReader(response), decompressErr, true)
|
||||||
@@ -949,6 +952,8 @@ func (l *FileRequestLogger) formatLogContent(url, method string, headers map[str
|
|||||||
}
|
}
|
||||||
|
|
||||||
if isWebsocketTranscript {
|
if isWebsocketTranscript {
|
||||||
|
// Mirror writeNonStreamingLog: websocket transcripts end with the dedicated
|
||||||
|
// timeline sections instead of a generic downstream HTTP response block.
|
||||||
return content.String()
|
return content.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -247,10 +247,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
|
|||||||
helps.RecordAPIWebsocketError(ctx, e.cfg, "dial", errDial)
|
helps.RecordAPIWebsocketError(ctx, e.cfg, "dial", errDial)
|
||||||
return resp, errDial
|
return resp, errDial
|
||||||
}
|
}
|
||||||
if respHS != nil {
|
recordAPIWebsocketHandshake(ctx, e.cfg, respHS)
|
||||||
helps.RecordAPIWebsocketHandshake(ctx, e.cfg, respHS.StatusCode, respHS.Header.Clone())
|
|
||||||
}
|
|
||||||
closeHTTPResponseBody(respHS, "codex websockets executor: close handshake response body error")
|
|
||||||
if sess == nil {
|
if sess == nil {
|
||||||
logCodexWebsocketConnected(executionSessionID, authID, wsURL)
|
logCodexWebsocketConnected(executionSessionID, authID, wsURL)
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -279,7 +276,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
|
|||||||
// Retry once with a fresh websocket connection. This is mainly to handle
|
// Retry once with a fresh websocket connection. This is mainly to handle
|
||||||
// upstream closing the socket between sequential requests within the same
|
// upstream closing the socket between sequential requests within the same
|
||||||
// execution session.
|
// execution session.
|
||||||
connRetry, _, errDialRetry := e.ensureUpstreamConn(ctx, auth, sess, authID, wsURL, wsHeaders)
|
connRetry, respHSRetry, errDialRetry := e.ensureUpstreamConn(ctx, auth, sess, authID, wsURL, wsHeaders)
|
||||||
if errDialRetry == nil && connRetry != nil {
|
if errDialRetry == nil && connRetry != nil {
|
||||||
wsReqBodyRetry := buildCodexWebsocketRequestBody(body)
|
wsReqBodyRetry := buildCodexWebsocketRequestBody(body)
|
||||||
helps.RecordAPIWebsocketRequest(ctx, e.cfg, helps.UpstreamRequestLog{
|
helps.RecordAPIWebsocketRequest(ctx, e.cfg, helps.UpstreamRequestLog{
|
||||||
@@ -293,6 +290,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
|
|||||||
AuthType: authType,
|
AuthType: authType,
|
||||||
AuthValue: authValue,
|
AuthValue: authValue,
|
||||||
})
|
})
|
||||||
|
recordAPIWebsocketHandshake(ctx, e.cfg, respHSRetry)
|
||||||
if errSendRetry := writeCodexWebsocketMessage(sess, connRetry, wsReqBodyRetry); errSendRetry == nil {
|
if errSendRetry := writeCodexWebsocketMessage(sess, connRetry, wsReqBodyRetry); errSendRetry == nil {
|
||||||
conn = connRetry
|
conn = connRetry
|
||||||
wsReqBody = wsReqBodyRetry
|
wsReqBody = wsReqBodyRetry
|
||||||
@@ -302,6 +300,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut
|
|||||||
return resp, errSendRetry
|
return resp, errSendRetry
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
closeHTTPResponseBody(respHSRetry, "codex websockets executor: close handshake response body error")
|
||||||
helps.RecordAPIWebsocketError(ctx, e.cfg, "dial_retry", errDialRetry)
|
helps.RecordAPIWebsocketError(ctx, e.cfg, "dial_retry", errDialRetry)
|
||||||
return resp, errDialRetry
|
return resp, errDialRetry
|
||||||
}
|
}
|
||||||
@@ -449,10 +448,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
|
|||||||
}
|
}
|
||||||
return nil, errDial
|
return nil, errDial
|
||||||
}
|
}
|
||||||
if respHS != nil {
|
recordAPIWebsocketHandshake(ctx, e.cfg, respHS)
|
||||||
helps.RecordAPIWebsocketHandshake(ctx, e.cfg, respHS.StatusCode, respHS.Header.Clone())
|
|
||||||
}
|
|
||||||
closeHTTPResponseBody(respHS, "codex websockets executor: close handshake response body error")
|
|
||||||
|
|
||||||
if sess == nil {
|
if sess == nil {
|
||||||
logCodexWebsocketConnected(executionSessionID, authID, wsURL)
|
logCodexWebsocketConnected(executionSessionID, authID, wsURL)
|
||||||
@@ -470,8 +466,9 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
|
|||||||
e.invalidateUpstreamConn(sess, conn, "send_error", errSend)
|
e.invalidateUpstreamConn(sess, conn, "send_error", errSend)
|
||||||
|
|
||||||
// Retry once with a new websocket connection for the same execution session.
|
// Retry once with a new websocket connection for the same execution session.
|
||||||
connRetry, _, errDialRetry := e.ensureUpstreamConn(ctx, auth, sess, authID, wsURL, wsHeaders)
|
connRetry, respHSRetry, errDialRetry := e.ensureUpstreamConn(ctx, auth, sess, authID, wsURL, wsHeaders)
|
||||||
if errDialRetry != nil || connRetry == nil {
|
if errDialRetry != nil || connRetry == nil {
|
||||||
|
closeHTTPResponseBody(respHSRetry, "codex websockets executor: close handshake response body error")
|
||||||
helps.RecordAPIWebsocketError(ctx, e.cfg, "dial_retry", errDialRetry)
|
helps.RecordAPIWebsocketError(ctx, e.cfg, "dial_retry", errDialRetry)
|
||||||
sess.clearActive(readCh)
|
sess.clearActive(readCh)
|
||||||
sess.reqMu.Unlock()
|
sess.reqMu.Unlock()
|
||||||
@@ -489,6 +486,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
|
|||||||
AuthType: authType,
|
AuthType: authType,
|
||||||
AuthValue: authValue,
|
AuthValue: authValue,
|
||||||
})
|
})
|
||||||
|
recordAPIWebsocketHandshake(ctx, e.cfg, respHSRetry)
|
||||||
if errSendRetry := writeCodexWebsocketMessage(sess, connRetry, wsReqBodyRetry); errSendRetry != nil {
|
if errSendRetry := writeCodexWebsocketMessage(sess, connRetry, wsReqBodyRetry); errSendRetry != nil {
|
||||||
helps.RecordAPIWebsocketError(ctx, e.cfg, "send_retry", errSendRetry)
|
helps.RecordAPIWebsocketError(ctx, e.cfg, "send_retry", errSendRetry)
|
||||||
e.invalidateUpstreamConn(sess, connRetry, "send_error", errSendRetry)
|
e.invalidateUpstreamConn(sess, connRetry, "send_error", errSendRetry)
|
||||||
@@ -1044,6 +1042,14 @@ func websocketUpgradeRequestLog(info helps.UpstreamRequestLog) helps.UpstreamReq
|
|||||||
return upgradeInfo
|
return upgradeInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func recordAPIWebsocketHandshake(ctx context.Context, cfg *config.Config, resp *http.Response) {
|
||||||
|
if resp == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
helps.RecordAPIWebsocketHandshake(ctx, cfg, resp.StatusCode, resp.Header.Clone())
|
||||||
|
closeHTTPResponseBody(resp, "codex websockets executor: close handshake response body error")
|
||||||
|
}
|
||||||
|
|
||||||
func websocketHandshakeBody(resp *http.Response) []byte {
|
func websocketHandshakeBody(resp *http.Response) []byte {
|
||||||
if resp == nil || resp.Body == nil {
|
if resp == nil || resp.Body == nil {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
Reference in New Issue
Block a user