feat(logging): add home request-log forwarding support
- Introduced `SetHomeEnabled` to enable/disable request-log forwarding to the home control plane. - Implemented `forwardRequestLogToHome` for non-streaming logs and `homeStreamingLogWriter` for real-time streaming logs. - Enhanced `FileRequestLogger` to bypass local logging when home forwarding is enabled. - Updated server configuration to dynamically toggle home request-log forwarding based on changes. - Added corresponding unit tests to ensure correct forwarding behavior and fallback mechanisms.
This commit is contained in:
@@ -8,6 +8,8 @@ import (
|
||||
"bytes"
|
||||
"compress/flate"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
@@ -23,12 +25,22 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/router-for-me/CLIProxyAPI/v7/internal/buildinfo"
|
||||
"github.com/router-for-me/CLIProxyAPI/v7/internal/home"
|
||||
"github.com/router-for-me/CLIProxyAPI/v7/internal/interfaces"
|
||||
"github.com/router-for-me/CLIProxyAPI/v7/internal/util"
|
||||
)
|
||||
|
||||
var requestLogID atomic.Uint64
|
||||
|
||||
type homeRequestLogClient interface {
|
||||
HeartbeatOK() bool
|
||||
RPushRequestLog(ctx context.Context, payload []byte) error
|
||||
}
|
||||
|
||||
var currentHomeRequestLogClient = func() homeRequestLogClient {
|
||||
return home.Current()
|
||||
}
|
||||
|
||||
// RequestLogger defines the interface for logging HTTP requests and responses.
|
||||
// It provides methods for logging both regular and streaming HTTP request/response cycles.
|
||||
type RequestLogger interface {
|
||||
@@ -148,6 +160,58 @@ type FileRequestLogger struct {
|
||||
|
||||
// errorLogsMaxFiles limits the number of error log files retained.
|
||||
errorLogsMaxFiles int
|
||||
|
||||
homeEnabled bool
|
||||
}
|
||||
|
||||
type homeRequestLogPayload struct {
|
||||
Headers map[string][]string `json:"headers,omitempty"`
|
||||
RequestLog string `json:"request_log,omitempty"`
|
||||
}
|
||||
|
||||
func cloneHeaders(headers map[string][]string) map[string][]string {
|
||||
if len(headers) == 0 {
|
||||
return nil
|
||||
}
|
||||
out := make(map[string][]string, len(headers))
|
||||
for key, values := range headers {
|
||||
if strings.TrimSpace(key) == "" {
|
||||
continue
|
||||
}
|
||||
if values == nil {
|
||||
out[key] = nil
|
||||
continue
|
||||
}
|
||||
copied := make([]string, len(values))
|
||||
copy(copied, values)
|
||||
out[key] = copied
|
||||
}
|
||||
if len(out) == 0 {
|
||||
return nil
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (l *FileRequestLogger) forwardRequestLogToHome(ctx context.Context, headers map[string][]string, logText string) error {
|
||||
if l == nil || !l.homeEnabled {
|
||||
return nil
|
||||
}
|
||||
client := currentHomeRequestLogClient()
|
||||
if client == nil || !client.HeartbeatOK() {
|
||||
return nil
|
||||
}
|
||||
payload := homeRequestLogPayload{
|
||||
Headers: cloneHeaders(headers),
|
||||
RequestLog: logText,
|
||||
}
|
||||
raw, errMarshal := json.Marshal(&payload)
|
||||
if errMarshal != nil {
|
||||
return errMarshal
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
return client.RPushRequestLog(ctx, raw)
|
||||
}
|
||||
|
||||
// NewFileRequestLogger creates a new file-based request logger.
|
||||
@@ -173,9 +237,19 @@ func NewFileRequestLogger(enabled bool, logsDir string, configDir string, errorL
|
||||
enabled: enabled,
|
||||
logsDir: logsDir,
|
||||
errorLogsMaxFiles: errorLogsMaxFiles,
|
||||
homeEnabled: false,
|
||||
}
|
||||
}
|
||||
|
||||
// SetHomeEnabled toggles home request-log forwarding.
|
||||
// When enabled, request logs are not written to disk and are instead forwarded to home via Redis RESP.
|
||||
func (l *FileRequestLogger) SetHomeEnabled(enabled bool) {
|
||||
if l == nil {
|
||||
return
|
||||
}
|
||||
l.homeEnabled = enabled
|
||||
}
|
||||
|
||||
// IsEnabled returns whether request logging is currently enabled.
|
||||
//
|
||||
// Returns:
|
||||
@@ -231,6 +305,38 @@ func (l *FileRequestLogger) logRequest(url, method string, requestHeaders map[st
|
||||
return nil
|
||||
}
|
||||
|
||||
if l.homeEnabled && l.enabled {
|
||||
responseToWrite, decompressErr := l.decompressResponse(responseHeaders, response)
|
||||
if decompressErr != nil {
|
||||
responseToWrite = response
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
writeErr := l.writeNonStreamingLog(
|
||||
&buf,
|
||||
url,
|
||||
method,
|
||||
requestHeaders,
|
||||
body,
|
||||
"",
|
||||
websocketTimeline,
|
||||
apiRequest,
|
||||
apiResponse,
|
||||
apiWebsocketTimeline,
|
||||
apiResponseErrors,
|
||||
statusCode,
|
||||
responseHeaders,
|
||||
responseToWrite,
|
||||
decompressErr,
|
||||
requestTimestamp,
|
||||
apiResponseTimestamp,
|
||||
)
|
||||
if writeErr != nil {
|
||||
return fmt.Errorf("failed to build request log content: %w", writeErr)
|
||||
}
|
||||
return l.forwardRequestLogToHome(context.Background(), requestHeaders, buf.String())
|
||||
}
|
||||
|
||||
// Ensure logs directory exists
|
||||
if errEnsure := l.ensureLogsDir(); errEnsure != nil {
|
||||
return fmt.Errorf("failed to create logs directory: %w", errEnsure)
|
||||
@@ -321,6 +427,14 @@ func (l *FileRequestLogger) LogStreamingRequest(url, method string, headers map[
|
||||
return &NoOpStreamingLogWriter{}, nil
|
||||
}
|
||||
|
||||
if l.homeEnabled {
|
||||
client := home.Current()
|
||||
if client == nil || !client.HeartbeatOK() {
|
||||
return &NoOpStreamingLogWriter{}, nil
|
||||
}
|
||||
return newHomeStreamingLogWriter(url, method, headers, body, requestID), nil
|
||||
}
|
||||
|
||||
// Ensure logs directory exists
|
||||
if err := l.ensureLogsDir(); err != nil {
|
||||
return nil, fmt.Errorf("failed to create logs directory: %w", err)
|
||||
@@ -1498,3 +1612,165 @@ func (w *NoOpStreamingLogWriter) SetFirstChunkTimestamp(_ time.Time) {}
|
||||
// Returns:
|
||||
// - error: Always returns nil
|
||||
func (w *NoOpStreamingLogWriter) Close() error { return nil }
|
||||
|
||||
type homeStreamingLogWriter struct {
|
||||
url string
|
||||
method string
|
||||
timestamp time.Time
|
||||
|
||||
requestHeaders map[string][]string
|
||||
requestBody []byte
|
||||
|
||||
chunkChan chan []byte
|
||||
doneChan chan struct{}
|
||||
|
||||
responseStatus int
|
||||
statusWritten bool
|
||||
responseHeaders map[string][]string
|
||||
responseBody bytes.Buffer
|
||||
apiRequest []byte
|
||||
apiResponse []byte
|
||||
apiWebsocketTime []byte
|
||||
apiResponseTS time.Time
|
||||
firstChunkTS time.Time
|
||||
}
|
||||
|
||||
func newHomeStreamingLogWriter(url, method string, headers map[string][]string, body []byte, _ string) *homeStreamingLogWriter {
|
||||
requestHeaders := make(map[string][]string, len(headers))
|
||||
for key, values := range headers {
|
||||
headerValues := make([]string, len(values))
|
||||
copy(headerValues, values)
|
||||
requestHeaders[key] = headerValues
|
||||
}
|
||||
|
||||
writer := &homeStreamingLogWriter{
|
||||
url: url,
|
||||
method: method,
|
||||
timestamp: time.Now(),
|
||||
requestHeaders: requestHeaders,
|
||||
requestBody: append([]byte(nil), body...),
|
||||
chunkChan: make(chan []byte, 100),
|
||||
doneChan: make(chan struct{}),
|
||||
}
|
||||
|
||||
go writer.asyncWriter()
|
||||
return writer
|
||||
}
|
||||
|
||||
func (w *homeStreamingLogWriter) asyncWriter() {
|
||||
defer close(w.doneChan)
|
||||
for chunk := range w.chunkChan {
|
||||
if len(chunk) == 0 {
|
||||
continue
|
||||
}
|
||||
_, _ = w.responseBody.Write(chunk)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *homeStreamingLogWriter) WriteChunkAsync(chunk []byte) {
|
||||
if w == nil || w.chunkChan == nil || len(chunk) == 0 {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case w.chunkChan <- append([]byte(nil), chunk...):
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (w *homeStreamingLogWriter) WriteStatus(status int, headers map[string][]string) error {
|
||||
if w == nil || status == 0 {
|
||||
return nil
|
||||
}
|
||||
w.responseStatus = status
|
||||
w.statusWritten = true
|
||||
if headers != nil {
|
||||
w.responseHeaders = make(map[string][]string, len(headers))
|
||||
for key, values := range headers {
|
||||
copied := make([]string, len(values))
|
||||
copy(copied, values)
|
||||
w.responseHeaders[key] = copied
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *homeStreamingLogWriter) WriteAPIRequest(apiRequest []byte) error {
|
||||
if w == nil || len(apiRequest) == 0 {
|
||||
return nil
|
||||
}
|
||||
w.apiRequest = bytes.Clone(apiRequest)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *homeStreamingLogWriter) WriteAPIResponse(apiResponse []byte) error {
|
||||
if w == nil || len(apiResponse) == 0 {
|
||||
return nil
|
||||
}
|
||||
w.apiResponse = bytes.Clone(apiResponse)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *homeStreamingLogWriter) WriteAPIWebsocketTimeline(apiWebsocketTimeline []byte) error {
|
||||
if w == nil || len(apiWebsocketTimeline) == 0 {
|
||||
return nil
|
||||
}
|
||||
w.apiWebsocketTime = bytes.Clone(apiWebsocketTimeline)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *homeStreamingLogWriter) SetFirstChunkTimestamp(timestamp time.Time) {
|
||||
if w == nil {
|
||||
return
|
||||
}
|
||||
if !timestamp.IsZero() {
|
||||
w.firstChunkTS = timestamp
|
||||
w.apiResponseTS = timestamp
|
||||
}
|
||||
}
|
||||
|
||||
func (w *homeStreamingLogWriter) Close() error {
|
||||
if w == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
client := currentHomeRequestLogClient()
|
||||
if client == nil || !client.HeartbeatOK() {
|
||||
return nil
|
||||
}
|
||||
|
||||
if w.chunkChan != nil {
|
||||
close(w.chunkChan)
|
||||
<-w.doneChan
|
||||
w.chunkChan = nil
|
||||
}
|
||||
|
||||
responsePayload := w.responseBody.Bytes()
|
||||
|
||||
var buf bytes.Buffer
|
||||
upstreamTransport := inferUpstreamTransport(w.apiRequest, w.apiResponse, w.apiWebsocketTime, nil)
|
||||
if errWrite := writeRequestInfoWithBody(&buf, w.url, w.method, w.requestHeaders, w.requestBody, "", w.timestamp, "http", upstreamTransport, true); errWrite != nil {
|
||||
return errWrite
|
||||
}
|
||||
if errWrite := writeAPISection(&buf, "=== API WEBSOCKET TIMELINE ===\n", "=== API WEBSOCKET TIMELINE", w.apiWebsocketTime, time.Time{}); errWrite != nil {
|
||||
return errWrite
|
||||
}
|
||||
if errWrite := writeAPISection(&buf, "=== API REQUEST ===\n", "=== API REQUEST", w.apiRequest, time.Time{}); errWrite != nil {
|
||||
return errWrite
|
||||
}
|
||||
if errWrite := writeAPISection(&buf, "=== API RESPONSE ===\n", "=== API RESPONSE", w.apiResponse, w.apiResponseTS); errWrite != nil {
|
||||
return errWrite
|
||||
}
|
||||
if errWrite := writeResponseSection(&buf, w.responseStatus, w.statusWritten, w.responseHeaders, bytes.NewReader(responsePayload), nil, false); errWrite != nil {
|
||||
return errWrite
|
||||
}
|
||||
|
||||
payload := homeRequestLogPayload{
|
||||
Headers: cloneHeaders(w.requestHeaders),
|
||||
RequestLog: buf.String(),
|
||||
}
|
||||
raw, errMarshal := json.Marshal(&payload)
|
||||
if errMarshal != nil {
|
||||
return errMarshal
|
||||
}
|
||||
return client.RPushRequestLog(context.Background(), raw)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user