Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f58d0faf8c | ||
|
|
df3b00621a | ||
|
|
72cb2689e8 | ||
|
|
ade279d1f2 | ||
|
|
9c5ac2927a | ||
|
|
7980f055fa | ||
|
|
eb2549a782 | ||
|
|
c419264a70 | ||
|
|
6b23e2da74 | ||
|
|
5ab0854b5b | ||
|
|
15981aa412 | ||
|
|
ac4f52c532 | ||
|
|
84fa497169 |
@@ -1150,126 +1150,50 @@ func (h *Handler) RequestIFlowToken(c *gin.Context) {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"status": "error", "error": "failed to start callback server"})
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer stopCallbackForwarder(iflowauth.CallbackPort)
|
||||
fmt.Println("Waiting for authentication...")
|
||||
|
||||
waitFile := filepath.Join(h.cfg.AuthDir, fmt.Sprintf(".oauth-iflow-%s.oauth", state))
|
||||
deadline := time.Now().Add(5 * time.Minute)
|
||||
var resultMap map[string]string
|
||||
for {
|
||||
if time.Now().After(deadline) {
|
||||
oauthStatus[state] = "Authentication failed"
|
||||
fmt.Println("Authentication failed: timeout waiting for callback")
|
||||
return
|
||||
}
|
||||
if data, errR := os.ReadFile(waitFile); errR == nil {
|
||||
_ = os.Remove(waitFile)
|
||||
_ = json.Unmarshal(data, &resultMap)
|
||||
break
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
|
||||
if errStr := strings.TrimSpace(resultMap["error"]); errStr != "" {
|
||||
oauthStatus[state] = "Authentication failed"
|
||||
fmt.Printf("Authentication failed: %s\n", errStr)
|
||||
return
|
||||
}
|
||||
if resultState := strings.TrimSpace(resultMap["state"]); resultState != state {
|
||||
oauthStatus[state] = "Authentication failed"
|
||||
fmt.Println("Authentication failed: state mismatch")
|
||||
return
|
||||
}
|
||||
|
||||
code := strings.TrimSpace(resultMap["code"])
|
||||
if code == "" {
|
||||
oauthStatus[state] = "Authentication failed"
|
||||
fmt.Println("Authentication failed: code missing")
|
||||
return
|
||||
}
|
||||
|
||||
tokenData, errExchange := authSvc.ExchangeCodeForTokens(ctx, code, redirectURI)
|
||||
if errExchange != nil {
|
||||
oauthStatus[state] = "Authentication failed"
|
||||
fmt.Printf("Authentication failed: %v\n", errExchange)
|
||||
return
|
||||
}
|
||||
|
||||
tokenStorage := authSvc.CreateTokenStorage(tokenData)
|
||||
identifier := strings.TrimSpace(tokenStorage.Email)
|
||||
if identifier == "" {
|
||||
identifier = fmt.Sprintf("iflow-%d", time.Now().UnixMilli())
|
||||
tokenStorage.Email = identifier
|
||||
}
|
||||
record := &coreauth.Auth{
|
||||
ID: fmt.Sprintf("iflow-%s.json", identifier),
|
||||
Provider: "iflow",
|
||||
FileName: fmt.Sprintf("iflow-%s.json", identifier),
|
||||
Storage: tokenStorage,
|
||||
Metadata: map[string]any{"email": identifier, "api_key": tokenStorage.APIKey},
|
||||
Attributes: map[string]string{"api_key": tokenStorage.APIKey},
|
||||
}
|
||||
|
||||
savedPath, errSave := h.saveTokenRecord(ctx, record)
|
||||
if errSave != nil {
|
||||
oauthStatus[state] = "Failed to save authentication tokens"
|
||||
log.Fatalf("Failed to save authentication tokens: %v", errSave)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Authentication successful! Token saved to %s\n", savedPath)
|
||||
if tokenStorage.APIKey != "" {
|
||||
fmt.Println("API key obtained and saved")
|
||||
}
|
||||
fmt.Println("You can now use iFlow services through this CLI")
|
||||
delete(oauthStatus, state)
|
||||
}()
|
||||
|
||||
oauthStatus[state] = ""
|
||||
c.JSON(http.StatusOK, gin.H{"status": "ok", "url": authURL, "state": state})
|
||||
return
|
||||
}
|
||||
|
||||
oauthServer := iflowauth.NewOAuthServer(iflowauth.CallbackPort)
|
||||
if err := oauthServer.Start(); err != nil {
|
||||
oauthStatus[state] = "Failed to start authentication server"
|
||||
log.Errorf("Failed to start iFlow OAuth server: %v", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"status": "error", "error": "failed to start local oauth server"})
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
if isWebUI {
|
||||
defer stopCallbackForwarder(iflowauth.CallbackPort)
|
||||
}
|
||||
fmt.Println("Waiting for authentication...")
|
||||
defer func() {
|
||||
stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
if err := oauthServer.Stop(stopCtx); err != nil {
|
||||
log.Warnf("Failed to stop iFlow OAuth server: %v", err)
|
||||
|
||||
waitFile := filepath.Join(h.cfg.AuthDir, fmt.Sprintf(".oauth-iflow-%s.oauth", state))
|
||||
deadline := time.Now().Add(5 * time.Minute)
|
||||
var resultMap map[string]string
|
||||
for {
|
||||
if time.Now().After(deadline) {
|
||||
oauthStatus[state] = "Authentication failed"
|
||||
fmt.Println("Authentication failed: timeout waiting for callback")
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
result, err := oauthServer.WaitForCallback(5 * time.Minute)
|
||||
if err != nil {
|
||||
oauthStatus[state] = "Authentication failed"
|
||||
fmt.Printf("Authentication failed: %v\n", err)
|
||||
return
|
||||
if data, errR := os.ReadFile(waitFile); errR == nil {
|
||||
_ = os.Remove(waitFile)
|
||||
_ = json.Unmarshal(data, &resultMap)
|
||||
break
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
|
||||
if result.Error != "" {
|
||||
if errStr := strings.TrimSpace(resultMap["error"]); errStr != "" {
|
||||
oauthStatus[state] = "Authentication failed"
|
||||
fmt.Printf("Authentication failed: %s\n", result.Error)
|
||||
fmt.Printf("Authentication failed: %s\n", errStr)
|
||||
return
|
||||
}
|
||||
|
||||
if result.State != state {
|
||||
if resultState := strings.TrimSpace(resultMap["state"]); resultState != state {
|
||||
oauthStatus[state] = "Authentication failed"
|
||||
fmt.Println("Authentication failed: state mismatch")
|
||||
return
|
||||
}
|
||||
|
||||
tokenData, errExchange := authSvc.ExchangeCodeForTokens(ctx, result.Code, redirectURI)
|
||||
code := strings.TrimSpace(resultMap["code"])
|
||||
if code == "" {
|
||||
oauthStatus[state] = "Authentication failed"
|
||||
fmt.Println("Authentication failed: code missing")
|
||||
return
|
||||
}
|
||||
|
||||
tokenData, errExchange := authSvc.ExchangeCodeForTokens(ctx, code, redirectURI)
|
||||
if errExchange != nil {
|
||||
oauthStatus[state] = "Authentication failed"
|
||||
fmt.Printf("Authentication failed: %v\n", errExchange)
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -37,6 +38,7 @@ type Handler struct {
|
||||
localPassword string
|
||||
allowRemoteOverride bool
|
||||
envSecret string
|
||||
logDir string
|
||||
}
|
||||
|
||||
// NewHandler creates a new management handler instance.
|
||||
@@ -68,6 +70,19 @@ func (h *Handler) SetUsageStatistics(stats *usage.RequestStatistics) { h.usageSt
|
||||
// SetLocalPassword configures the runtime-local password accepted for localhost requests.
|
||||
func (h *Handler) SetLocalPassword(password string) { h.localPassword = password }
|
||||
|
||||
// SetLogDirectory updates the directory where main.log should be looked up.
|
||||
func (h *Handler) SetLogDirectory(dir string) {
|
||||
if dir == "" {
|
||||
return
|
||||
}
|
||||
if !filepath.IsAbs(dir) {
|
||||
if abs, err := filepath.Abs(dir); err == nil {
|
||||
dir = abs
|
||||
}
|
||||
}
|
||||
h.logDir = dir
|
||||
}
|
||||
|
||||
// Middleware enforces access control for management endpoints.
|
||||
// All requests (local and remote) require a valid management key.
|
||||
// Additionally, remote access requires allow-remote-management=true.
|
||||
|
||||
344
internal/api/handlers/management/logs.go
Normal file
344
internal/api/handlers/management/logs.go
Normal file
@@ -0,0 +1,344 @@
|
||||
package management
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultLogFileName = "main.log"
|
||||
logScannerInitialBuffer = 64 * 1024
|
||||
logScannerMaxBuffer = 8 * 1024 * 1024
|
||||
)
|
||||
|
||||
// GetLogs returns log lines with optional incremental loading.
|
||||
func (h *Handler) GetLogs(c *gin.Context) {
|
||||
if h == nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "handler unavailable"})
|
||||
return
|
||||
}
|
||||
if h.cfg == nil {
|
||||
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "configuration unavailable"})
|
||||
return
|
||||
}
|
||||
if !h.cfg.LoggingToFile {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "logging to file disabled"})
|
||||
return
|
||||
}
|
||||
|
||||
logDir := h.logDirectory()
|
||||
if strings.TrimSpace(logDir) == "" {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "log directory not configured"})
|
||||
return
|
||||
}
|
||||
|
||||
files, err := h.collectLogFiles(logDir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
cutoff := parseCutoff(c.Query("after"))
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"lines": []string{},
|
||||
"line-count": 0,
|
||||
"latest-timestamp": cutoff,
|
||||
})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to list log files: %v", err)})
|
||||
return
|
||||
}
|
||||
|
||||
cutoff := parseCutoff(c.Query("after"))
|
||||
acc := newLogAccumulator(cutoff)
|
||||
for i := range files {
|
||||
if errProcess := acc.consumeFile(files[i]); errProcess != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to read log file %s: %v", files[i], errProcess)})
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
lines, total, latest := acc.result()
|
||||
if latest == 0 || latest < cutoff {
|
||||
latest = cutoff
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"lines": lines,
|
||||
"line-count": total,
|
||||
"latest-timestamp": latest,
|
||||
})
|
||||
}
|
||||
|
||||
// DeleteLogs removes all rotated log files and truncates the active log.
|
||||
func (h *Handler) DeleteLogs(c *gin.Context) {
|
||||
if h == nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "handler unavailable"})
|
||||
return
|
||||
}
|
||||
if h.cfg == nil {
|
||||
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "configuration unavailable"})
|
||||
return
|
||||
}
|
||||
if !h.cfg.LoggingToFile {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "logging to file disabled"})
|
||||
return
|
||||
}
|
||||
|
||||
dir := h.logDirectory()
|
||||
if strings.TrimSpace(dir) == "" {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "log directory not configured"})
|
||||
return
|
||||
}
|
||||
|
||||
entries, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "log directory not found"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to list log directory: %v", err)})
|
||||
return
|
||||
}
|
||||
|
||||
removed := 0
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
name := entry.Name()
|
||||
fullPath := filepath.Join(dir, name)
|
||||
if name == defaultLogFileName {
|
||||
if errTrunc := os.Truncate(fullPath, 0); errTrunc != nil && !os.IsNotExist(errTrunc) {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to truncate log file: %v", errTrunc)})
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
if isRotatedLogFile(name) {
|
||||
if errRemove := os.Remove(fullPath); errRemove != nil && !os.IsNotExist(errRemove) {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to remove %s: %v", name, errRemove)})
|
||||
return
|
||||
}
|
||||
removed++
|
||||
}
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"success": true,
|
||||
"message": "Logs cleared successfully",
|
||||
"removed": removed,
|
||||
})
|
||||
}
|
||||
|
||||
func (h *Handler) logDirectory() string {
|
||||
if h == nil {
|
||||
return ""
|
||||
}
|
||||
if h.logDir != "" {
|
||||
return h.logDir
|
||||
}
|
||||
if h.configFilePath != "" {
|
||||
dir := filepath.Dir(h.configFilePath)
|
||||
if dir != "" && dir != "." {
|
||||
return filepath.Join(dir, "logs")
|
||||
}
|
||||
}
|
||||
return "logs"
|
||||
}
|
||||
|
||||
func (h *Handler) collectLogFiles(dir string) ([]string, error) {
|
||||
entries, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
type candidate struct {
|
||||
path string
|
||||
order int64
|
||||
}
|
||||
cands := make([]candidate, 0, len(entries))
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
name := entry.Name()
|
||||
if name == defaultLogFileName {
|
||||
cands = append(cands, candidate{path: filepath.Join(dir, name), order: 0})
|
||||
continue
|
||||
}
|
||||
if order, ok := rotationOrder(name); ok {
|
||||
cands = append(cands, candidate{path: filepath.Join(dir, name), order: order})
|
||||
}
|
||||
}
|
||||
if len(cands) == 0 {
|
||||
return []string{}, nil
|
||||
}
|
||||
sort.Slice(cands, func(i, j int) bool { return cands[i].order < cands[j].order })
|
||||
paths := make([]string, 0, len(cands))
|
||||
for i := len(cands) - 1; i >= 0; i-- {
|
||||
paths = append(paths, cands[i].path)
|
||||
}
|
||||
return paths, nil
|
||||
}
|
||||
|
||||
type logAccumulator struct {
|
||||
cutoff int64
|
||||
lines []string
|
||||
total int
|
||||
latest int64
|
||||
include bool
|
||||
}
|
||||
|
||||
func newLogAccumulator(cutoff int64) *logAccumulator {
|
||||
return &logAccumulator{
|
||||
cutoff: cutoff,
|
||||
lines: make([]string, 0, 256),
|
||||
}
|
||||
}
|
||||
|
||||
func (acc *logAccumulator) consumeFile(path string) error {
|
||||
file, err := os.Open(path)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
scanner := bufio.NewScanner(file)
|
||||
buf := make([]byte, 0, logScannerInitialBuffer)
|
||||
scanner.Buffer(buf, logScannerMaxBuffer)
|
||||
for scanner.Scan() {
|
||||
acc.addLine(scanner.Text())
|
||||
}
|
||||
if errScan := scanner.Err(); errScan != nil {
|
||||
return errScan
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (acc *logAccumulator) addLine(raw string) {
|
||||
line := strings.TrimRight(raw, "\r")
|
||||
acc.total++
|
||||
ts := parseTimestamp(line)
|
||||
if ts > acc.latest {
|
||||
acc.latest = ts
|
||||
}
|
||||
if ts > 0 {
|
||||
acc.include = acc.cutoff == 0 || ts > acc.cutoff
|
||||
if acc.cutoff == 0 || acc.include {
|
||||
acc.lines = append(acc.lines, line)
|
||||
}
|
||||
return
|
||||
}
|
||||
if acc.cutoff == 0 || acc.include {
|
||||
acc.lines = append(acc.lines, line)
|
||||
}
|
||||
}
|
||||
|
||||
func (acc *logAccumulator) result() ([]string, int, int64) {
|
||||
if acc.lines == nil {
|
||||
acc.lines = []string{}
|
||||
}
|
||||
return acc.lines, acc.total, acc.latest
|
||||
}
|
||||
|
||||
func parseCutoff(raw string) int64 {
|
||||
value := strings.TrimSpace(raw)
|
||||
if value == "" {
|
||||
return 0
|
||||
}
|
||||
ts, err := strconv.ParseInt(value, 10, 64)
|
||||
if err != nil || ts <= 0 {
|
||||
return 0
|
||||
}
|
||||
return ts
|
||||
}
|
||||
|
||||
func parseTimestamp(line string) int64 {
|
||||
if strings.HasPrefix(line, "[") {
|
||||
line = line[1:]
|
||||
}
|
||||
if len(line) < 19 {
|
||||
return 0
|
||||
}
|
||||
candidate := line[:19]
|
||||
t, err := time.ParseInLocation("2006-01-02 15:04:05", candidate, time.Local)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return t.Unix()
|
||||
}
|
||||
|
||||
func isRotatedLogFile(name string) bool {
|
||||
if _, ok := rotationOrder(name); ok {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func rotationOrder(name string) (int64, bool) {
|
||||
if order, ok := numericRotationOrder(name); ok {
|
||||
return order, true
|
||||
}
|
||||
if order, ok := timestampRotationOrder(name); ok {
|
||||
return order, true
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
func numericRotationOrder(name string) (int64, bool) {
|
||||
if !strings.HasPrefix(name, defaultLogFileName+".") {
|
||||
return 0, false
|
||||
}
|
||||
suffix := strings.TrimPrefix(name, defaultLogFileName+".")
|
||||
if suffix == "" {
|
||||
return 0, false
|
||||
}
|
||||
n, err := strconv.Atoi(suffix)
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
return int64(n), true
|
||||
}
|
||||
|
||||
func timestampRotationOrder(name string) (int64, bool) {
|
||||
ext := filepath.Ext(defaultLogFileName)
|
||||
base := strings.TrimSuffix(defaultLogFileName, ext)
|
||||
if base == "" {
|
||||
return 0, false
|
||||
}
|
||||
prefix := base + "-"
|
||||
if !strings.HasPrefix(name, prefix) {
|
||||
return 0, false
|
||||
}
|
||||
clean := strings.TrimPrefix(name, prefix)
|
||||
if strings.HasSuffix(clean, ".gz") {
|
||||
clean = strings.TrimSuffix(clean, ".gz")
|
||||
}
|
||||
if ext != "" {
|
||||
if !strings.HasSuffix(clean, ext) {
|
||||
return 0, false
|
||||
}
|
||||
clean = strings.TrimSuffix(clean, ext)
|
||||
}
|
||||
if clean == "" {
|
||||
return 0, false
|
||||
}
|
||||
if idx := strings.IndexByte(clean, '.'); idx != -1 {
|
||||
clean = clean[:idx]
|
||||
}
|
||||
parsed, err := time.ParseInLocation("2006-01-02T15-04-05", clean, time.Local)
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
return math.MaxInt64 - parsed.Unix(), true
|
||||
}
|
||||
@@ -19,7 +19,12 @@ import (
|
||||
func RequestLoggingMiddleware(logger logging.RequestLogger) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
path := c.Request.URL.Path
|
||||
if strings.HasPrefix(path, "/v0/management") || path == "/keep-alive" {
|
||||
shouldLog := false
|
||||
if strings.HasPrefix(path, "/v1") {
|
||||
shouldLog = true
|
||||
}
|
||||
|
||||
if !shouldLog {
|
||||
c.Next()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ import (
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/sdk/api/handlers/openai"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
const oauthCallbackSuccessHTML = `<html><head><meta charset="utf-8"><title>Authentication successful</title><script>setTimeout(function(){window.close();},5000);</script></head><body><h1>Authentication successful!</h1><p>You can close this window.</p><p>This window will close automatically in 5 seconds.</p></body></html>`
|
||||
@@ -116,6 +117,10 @@ type Server struct {
|
||||
// cfg holds the current server configuration.
|
||||
cfg *config.Config
|
||||
|
||||
// oldConfigYaml stores a YAML snapshot of the previous configuration for change detection.
|
||||
// This prevents issues when the config object is modified in place by Management API.
|
||||
oldConfigYaml []byte
|
||||
|
||||
// accessManager handles request authentication providers.
|
||||
accessManager *sdkaccess.Manager
|
||||
|
||||
@@ -220,12 +225,15 @@ func NewServer(cfg *config.Config, authManager *auth.Manager, accessManager *sdk
|
||||
currentPath: wd,
|
||||
envManagementSecret: envManagementSecret,
|
||||
}
|
||||
// Save initial YAML snapshot
|
||||
s.oldConfigYaml, _ = yaml.Marshal(cfg)
|
||||
s.applyAccessConfig(nil, cfg)
|
||||
// Initialize management handler
|
||||
s.mgmt = managementHandlers.NewHandler(cfg, configFilePath, authManager)
|
||||
if optionState.localPassword != "" {
|
||||
s.mgmt.SetLocalPassword(optionState.localPassword)
|
||||
}
|
||||
s.mgmt.SetLogDirectory(filepath.Join(s.currentPath, "logs"))
|
||||
s.localPassword = optionState.localPassword
|
||||
|
||||
// Setup routes
|
||||
@@ -404,6 +412,8 @@ func (s *Server) registerManagementRoutes() {
|
||||
mgmt.PATCH("/generative-language-api-key", s.mgmt.PatchGlKeys)
|
||||
mgmt.DELETE("/generative-language-api-key", s.mgmt.DeleteGlKeys)
|
||||
|
||||
mgmt.GET("/logs", s.mgmt.GetLogs)
|
||||
mgmt.DELETE("/logs", s.mgmt.DeleteLogs)
|
||||
mgmt.GET("/request-log", s.mgmt.GetRequestLog)
|
||||
mgmt.PUT("/request-log", s.mgmt.PutRequestLog)
|
||||
mgmt.PATCH("/request-log", s.mgmt.PutRequestLog)
|
||||
@@ -654,7 +664,11 @@ func (s *Server) applyAccessConfig(oldCfg, newCfg *config.Config) {
|
||||
// - clients: The new slice of AI service clients
|
||||
// - cfg: The new application configuration
|
||||
func (s *Server) UpdateClients(cfg *config.Config) {
|
||||
oldCfg := s.cfg
|
||||
// Reconstruct old config from YAML snapshot to avoid reference sharing issues
|
||||
var oldCfg *config.Config
|
||||
if len(s.oldConfigYaml) > 0 {
|
||||
_ = yaml.Unmarshal(s.oldConfigYaml, &oldCfg)
|
||||
}
|
||||
|
||||
// Update request logger enabled state if it has changed
|
||||
previousRequestLog := false
|
||||
@@ -735,6 +749,8 @@ func (s *Server) UpdateClients(cfg *config.Config) {
|
||||
|
||||
s.applyAccessConfig(oldCfg, cfg)
|
||||
s.cfg = cfg
|
||||
// Save YAML snapshot for next comparison
|
||||
s.oldConfigYaml, _ = yaml.Marshal(cfg)
|
||||
s.handlers.UpdateClients(&cfg.SDKConfig)
|
||||
|
||||
if !cfg.RemoteManagement.DisableControlPanel {
|
||||
|
||||
@@ -8,6 +8,15 @@ import "time"
|
||||
// GetClaudeModels returns the standard Claude model definitions
|
||||
func GetClaudeModels() []*ModelInfo {
|
||||
return []*ModelInfo{
|
||||
|
||||
{
|
||||
ID: "claude-haiku-4-5-20251001",
|
||||
Object: "model",
|
||||
Created: 1759276800, // 2025-10-01
|
||||
OwnedBy: "anthropic",
|
||||
Type: "claude",
|
||||
DisplayName: "Claude 4.5 Haiku",
|
||||
},
|
||||
{
|
||||
ID: "claude-sonnet-4-5-20250929",
|
||||
Object: "model",
|
||||
|
||||
@@ -143,6 +143,31 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
||||
go func() {
|
||||
defer close(out)
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
// If from == to (Claude → Claude), directly forward the SSE stream without translation
|
||||
if from == to {
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
buf := make([]byte, 20_971_520)
|
||||
scanner.Buffer(buf, 20_971_520)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
appendAPIResponseChunk(ctx, e.cfg, line)
|
||||
if detail, ok := parseClaudeStreamUsage(line); ok {
|
||||
reporter.publish(ctx, detail)
|
||||
}
|
||||
// Forward the line as-is to preserve SSE format
|
||||
cloned := make([]byte, len(line)+1)
|
||||
copy(cloned, line)
|
||||
cloned[len(line)] = '\n'
|
||||
out <- cliproxyexecutor.StreamChunk{Payload: cloned}
|
||||
}
|
||||
if err = scanner.Err(); err != nil {
|
||||
out <- cliproxyexecutor.StreamChunk{Err: err}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// For other formats, use translation
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
buf := make([]byte, 20_971_520)
|
||||
scanner.Buffer(buf, 20_971_520)
|
||||
|
||||
@@ -60,7 +60,11 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
|
||||
|
||||
from := opts.SourceFormat
|
||||
to := sdktranslator.FromString("gemini-cli")
|
||||
budgetOverride, includeOverride, hasOverride := util.GeminiThinkingFromMetadata(req.Metadata)
|
||||
basePayload := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), false)
|
||||
if hasOverride {
|
||||
basePayload = util.ApplyGeminiCLIThinkingConfig(basePayload, budgetOverride, includeOverride)
|
||||
}
|
||||
basePayload = fixGeminiCLIImageAspectRatio(req.Model, basePayload)
|
||||
|
||||
action := "generateContent"
|
||||
@@ -149,7 +153,11 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
|
||||
|
||||
from := opts.SourceFormat
|
||||
to := sdktranslator.FromString("gemini-cli")
|
||||
budgetOverride, includeOverride, hasOverride := util.GeminiThinkingFromMetadata(req.Metadata)
|
||||
basePayload := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
|
||||
if hasOverride {
|
||||
basePayload = util.ApplyGeminiCLIThinkingConfig(basePayload, budgetOverride, includeOverride)
|
||||
}
|
||||
basePayload = fixGeminiCLIImageAspectRatio(req.Model, basePayload)
|
||||
|
||||
projectID := strings.TrimSpace(stringValue(auth.Metadata, "project_id"))
|
||||
@@ -292,8 +300,12 @@ func (e *GeminiCLIExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.
|
||||
var lastStatus int
|
||||
var lastBody []byte
|
||||
|
||||
budgetOverride, includeOverride, hasOverride := util.GeminiThinkingFromMetadata(req.Metadata)
|
||||
for _, attemptModel := range models {
|
||||
payload := sdktranslator.TranslateRequest(from, to, attemptModel, bytes.Clone(req.Payload), false)
|
||||
if hasOverride {
|
||||
payload = util.ApplyGeminiCLIThinkingConfig(payload, budgetOverride, includeOverride)
|
||||
}
|
||||
payload = deleteJSONField(payload, "project")
|
||||
payload = deleteJSONField(payload, "model")
|
||||
payload = disableGeminiThinkingConfig(payload, attemptModel)
|
||||
|
||||
@@ -77,6 +77,9 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
|
||||
from := opts.SourceFormat
|
||||
to := sdktranslator.FromString("gemini")
|
||||
body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), false)
|
||||
if budgetOverride, includeOverride, ok := util.GeminiThinkingFromMetadata(req.Metadata); ok {
|
||||
body = util.ApplyGeminiThinkingConfig(body, budgetOverride, includeOverride)
|
||||
}
|
||||
body = disableGeminiThinkingConfig(body, req.Model)
|
||||
body = fixGeminiImageAspectRatio(req.Model, body)
|
||||
|
||||
@@ -136,6 +139,9 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
||||
from := opts.SourceFormat
|
||||
to := sdktranslator.FromString("gemini")
|
||||
body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
|
||||
if budgetOverride, includeOverride, ok := util.GeminiThinkingFromMetadata(req.Metadata); ok {
|
||||
body = util.ApplyGeminiThinkingConfig(body, budgetOverride, includeOverride)
|
||||
}
|
||||
body = disableGeminiThinkingConfig(body, req.Model)
|
||||
body = fixGeminiImageAspectRatio(req.Model, body)
|
||||
|
||||
@@ -208,6 +214,9 @@ func (e *GeminiExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Aut
|
||||
from := opts.SourceFormat
|
||||
to := sdktranslator.FromString("gemini")
|
||||
translatedReq := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), false)
|
||||
if budgetOverride, includeOverride, ok := util.GeminiThinkingFromMetadata(req.Metadata); ok {
|
||||
translatedReq = util.ApplyGeminiThinkingConfig(translatedReq, budgetOverride, includeOverride)
|
||||
}
|
||||
translatedReq = disableGeminiThinkingConfig(translatedReq, req.Model)
|
||||
translatedReq = fixGeminiImageAspectRatio(req.Model, translatedReq)
|
||||
respCtx := context.WithValue(ctx, "alt", opts.Alt)
|
||||
|
||||
@@ -37,6 +37,8 @@ type ConvertOpenAIResponseToAnthropicParams struct {
|
||||
ContentBlocksStopped bool
|
||||
// Track if message_delta has been sent
|
||||
MessageDeltaSent bool
|
||||
// Track if message_start has been sent
|
||||
MessageStarted bool
|
||||
}
|
||||
|
||||
// ToolCallAccumulator holds the state for accumulating tool call data
|
||||
@@ -84,20 +86,12 @@ func ConvertOpenAIResponseToClaude(_ context.Context, _ string, originalRequestR
|
||||
return convertOpenAIDoneToAnthropic((*param).(*ConvertOpenAIResponseToAnthropicParams))
|
||||
}
|
||||
|
||||
root := gjson.ParseBytes(rawJSON)
|
||||
|
||||
// Check if this is a streaming chunk or non-streaming response
|
||||
objectType := root.Get("object").String()
|
||||
|
||||
if objectType == "chat.completion.chunk" {
|
||||
// Handle streaming response
|
||||
return convertOpenAIStreamingChunkToAnthropic(rawJSON, (*param).(*ConvertOpenAIResponseToAnthropicParams))
|
||||
} else if objectType == "chat.completion" {
|
||||
// Handle non-streaming response
|
||||
streamResult := gjson.GetBytes(originalRequestRawJSON, "stream")
|
||||
if !streamResult.Exists() || (streamResult.Exists() && streamResult.Type == gjson.False) {
|
||||
return convertOpenAINonStreamingToAnthropic(rawJSON)
|
||||
} else {
|
||||
return convertOpenAIStreamingChunkToAnthropic(rawJSON, (*param).(*ConvertOpenAIResponseToAnthropicParams))
|
||||
}
|
||||
|
||||
return []string{}
|
||||
}
|
||||
|
||||
// convertOpenAIStreamingChunkToAnthropic converts OpenAI streaming chunk to Anthropic streaming events
|
||||
@@ -118,7 +112,7 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
|
||||
|
||||
// Check if this is the first chunk (has role)
|
||||
if delta := root.Get("choices.0.delta"); delta.Exists() {
|
||||
if role := delta.Get("role"); role.Exists() && role.String() == "assistant" {
|
||||
if role := delta.Get("role"); role.Exists() && role.String() == "assistant" && !param.MessageStarted {
|
||||
// Send message_start event
|
||||
messageStart := map[string]interface{}{
|
||||
"type": "message_start",
|
||||
@@ -138,6 +132,7 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
|
||||
}
|
||||
messageStartJSON, _ := json.Marshal(messageStart)
|
||||
results = append(results, "event: message_start\ndata: "+string(messageStartJSON)+"\n\n")
|
||||
param.MessageStarted = true
|
||||
|
||||
// Don't send content_block_start for text here - wait for actual content
|
||||
}
|
||||
|
||||
@@ -97,8 +97,8 @@ func ConvertOpenAIResponseToGemini(_ context.Context, _ string, originalRequestR
|
||||
var results []string
|
||||
|
||||
choices.ForEach(func(choiceIndex, choice gjson.Result) bool {
|
||||
// Base Gemini response template
|
||||
template := `{"candidates":[{"content":{"parts":[],"role":"model"},"finishReason":"STOP","index":0}]}`
|
||||
// Base Gemini response template without finishReason; set when known
|
||||
template := `{"candidates":[{"content":{"parts":[],"role":"model"},"index":0}]}`
|
||||
|
||||
// Set model if available
|
||||
if model := root.Get("model"); model.Exists() {
|
||||
@@ -514,8 +514,8 @@ func tryParseNumber(s string) (interface{}, bool) {
|
||||
func ConvertOpenAIResponseToGeminiNonStream(_ context.Context, _ string, originalRequestRawJSON, requestRawJSON, rawJSON []byte, _ *any) string {
|
||||
root := gjson.ParseBytes(rawJSON)
|
||||
|
||||
// Base Gemini response template
|
||||
out := `{"candidates":[{"content":{"parts":[],"role":"model"},"finishReason":"STOP","index":0}]}`
|
||||
// Base Gemini response template without finishReason; set when known
|
||||
out := `{"candidates":[{"content":{"parts":[],"role":"model"},"index":0}]}`
|
||||
|
||||
// Set model if available
|
||||
if model := root.Get("model"); model.Exists() {
|
||||
|
||||
@@ -67,9 +67,20 @@ func ConvertOpenAIChatCompletionsResponseToOpenAIResponses(ctx context.Context,
|
||||
rawJSON = bytes.TrimSpace(rawJSON[5:])
|
||||
}
|
||||
|
||||
rawJSON = bytes.TrimSpace(rawJSON)
|
||||
if len(rawJSON) == 0 {
|
||||
return []string{}
|
||||
}
|
||||
if bytes.Equal(rawJSON, []byte("[DONE]")) {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
root := gjson.ParseBytes(rawJSON)
|
||||
obj := root.Get("object").String()
|
||||
if obj != "chat.completion.chunk" {
|
||||
obj := root.Get("object")
|
||||
if obj.Exists() && obj.String() != "" && obj.String() != "chat.completion.chunk" {
|
||||
return []string{}
|
||||
}
|
||||
if !root.Get("choices").Exists() || !root.Get("choices").IsArray() {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
|
||||
181
internal/util/gemini_thinking.go
Normal file
181
internal/util/gemini_thinking.go
Normal file
@@ -0,0 +1,181 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/tidwall/sjson"
|
||||
)
|
||||
|
||||
const (
|
||||
GeminiThinkingBudgetMetadataKey = "gemini_thinking_budget"
|
||||
GeminiIncludeThoughtsMetadataKey = "gemini_include_thoughts"
|
||||
GeminiOriginalModelMetadataKey = "gemini_original_model"
|
||||
)
|
||||
|
||||
func ParseGeminiThinkingSuffix(model string) (string, *int, *bool, bool) {
|
||||
if model == "" {
|
||||
return model, nil, nil, false
|
||||
}
|
||||
lower := strings.ToLower(model)
|
||||
if !strings.HasPrefix(lower, "gemini-") {
|
||||
return model, nil, nil, false
|
||||
}
|
||||
|
||||
if strings.HasSuffix(lower, "-nothinking") {
|
||||
base := model[:len(model)-len("-nothinking")]
|
||||
budgetValue := 0
|
||||
if strings.HasPrefix(lower, "gemini-2.5-pro") {
|
||||
budgetValue = 128
|
||||
}
|
||||
include := false
|
||||
return base, &budgetValue, &include, true
|
||||
}
|
||||
|
||||
idx := strings.LastIndex(lower, "-thinking-")
|
||||
if idx == -1 {
|
||||
return model, nil, nil, false
|
||||
}
|
||||
|
||||
digits := model[idx+len("-thinking-"):]
|
||||
if digits == "" {
|
||||
return model, nil, nil, false
|
||||
}
|
||||
end := len(digits)
|
||||
for i := 0; i < len(digits); i++ {
|
||||
if digits[i] < '0' || digits[i] > '9' {
|
||||
end = i
|
||||
break
|
||||
}
|
||||
}
|
||||
if end == 0 {
|
||||
return model, nil, nil, false
|
||||
}
|
||||
valueStr := digits[:end]
|
||||
value, err := strconv.Atoi(valueStr)
|
||||
if err != nil {
|
||||
return model, nil, nil, false
|
||||
}
|
||||
base := model[:idx]
|
||||
budgetValue := value
|
||||
return base, &budgetValue, nil, true
|
||||
}
|
||||
|
||||
func ApplyGeminiThinkingConfig(body []byte, budget *int, includeThoughts *bool) []byte {
|
||||
if budget == nil && includeThoughts == nil {
|
||||
return body
|
||||
}
|
||||
updated := body
|
||||
if budget != nil {
|
||||
valuePath := "generationConfig.thinkingConfig.thinkingBudget"
|
||||
rewritten, err := sjson.SetBytes(updated, valuePath, *budget)
|
||||
if err == nil {
|
||||
updated = rewritten
|
||||
}
|
||||
}
|
||||
if includeThoughts != nil {
|
||||
valuePath := "generationConfig.thinkingConfig.include_thoughts"
|
||||
rewritten, err := sjson.SetBytes(updated, valuePath, *includeThoughts)
|
||||
if err == nil {
|
||||
updated = rewritten
|
||||
}
|
||||
}
|
||||
return updated
|
||||
}
|
||||
|
||||
func ApplyGeminiCLIThinkingConfig(body []byte, budget *int, includeThoughts *bool) []byte {
|
||||
if budget == nil && includeThoughts == nil {
|
||||
return body
|
||||
}
|
||||
updated := body
|
||||
if budget != nil {
|
||||
valuePath := "request.generationConfig.thinkingConfig.thinkingBudget"
|
||||
rewritten, err := sjson.SetBytes(updated, valuePath, *budget)
|
||||
if err == nil {
|
||||
updated = rewritten
|
||||
}
|
||||
}
|
||||
if includeThoughts != nil {
|
||||
valuePath := "request.generationConfig.thinkingConfig.include_thoughts"
|
||||
rewritten, err := sjson.SetBytes(updated, valuePath, *includeThoughts)
|
||||
if err == nil {
|
||||
updated = rewritten
|
||||
}
|
||||
}
|
||||
return updated
|
||||
}
|
||||
|
||||
func GeminiThinkingFromMetadata(metadata map[string]any) (*int, *bool, bool) {
|
||||
if len(metadata) == 0 {
|
||||
return nil, nil, false
|
||||
}
|
||||
var (
|
||||
budgetPtr *int
|
||||
includePtr *bool
|
||||
matched bool
|
||||
)
|
||||
if rawBudget, ok := metadata[GeminiThinkingBudgetMetadataKey]; ok {
|
||||
switch v := rawBudget.(type) {
|
||||
case int:
|
||||
budget := v
|
||||
budgetPtr = &budget
|
||||
matched = true
|
||||
case int32:
|
||||
budget := int(v)
|
||||
budgetPtr = &budget
|
||||
matched = true
|
||||
case int64:
|
||||
budget := int(v)
|
||||
budgetPtr = &budget
|
||||
matched = true
|
||||
case float64:
|
||||
budget := int(v)
|
||||
budgetPtr = &budget
|
||||
matched = true
|
||||
case json.Number:
|
||||
if val, err := v.Int64(); err == nil {
|
||||
budget := int(val)
|
||||
budgetPtr = &budget
|
||||
matched = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if rawInclude, ok := metadata[GeminiIncludeThoughtsMetadataKey]; ok {
|
||||
switch v := rawInclude.(type) {
|
||||
case bool:
|
||||
include := v
|
||||
includePtr = &include
|
||||
matched = true
|
||||
case string:
|
||||
if parsed, err := strconv.ParseBool(v); err == nil {
|
||||
include := parsed
|
||||
includePtr = &include
|
||||
matched = true
|
||||
}
|
||||
case json.Number:
|
||||
if val, err := v.Int64(); err == nil {
|
||||
include := val != 0
|
||||
includePtr = &include
|
||||
matched = true
|
||||
}
|
||||
case int:
|
||||
include := v != 0
|
||||
includePtr = &include
|
||||
matched = true
|
||||
case int32:
|
||||
include := v != 0
|
||||
includePtr = &include
|
||||
matched = true
|
||||
case int64:
|
||||
include := v != 0
|
||||
includePtr = &include
|
||||
matched = true
|
||||
case float64:
|
||||
include := v != 0
|
||||
includePtr = &include
|
||||
matched = true
|
||||
}
|
||||
}
|
||||
return budgetPtr, includePtr, matched
|
||||
}
|
||||
@@ -7,8 +7,9 @@
|
||||
package claude
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
@@ -197,33 +198,65 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [
|
||||
}
|
||||
|
||||
func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage) {
|
||||
// v6.1: Intelligent Buffered Streamer strategy
|
||||
// Enhanced buffering with larger buffer size (16KB) and longer flush interval (120ms).
|
||||
// Smart flush only when buffer is sufficiently filled (≥50%), dramatically reducing
|
||||
// flush frequency from ~12.5Hz to ~5-8Hz while maintaining low latency.
|
||||
writer := bufio.NewWriterSize(c.Writer, 16*1024) // 4KB → 16KB
|
||||
ticker := time.NewTicker(120 * time.Millisecond) // 80ms → 120ms
|
||||
defer ticker.Stop()
|
||||
|
||||
var chunkIdx int
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.Request.Context().Done():
|
||||
// Context cancelled, flush any remaining data before exit
|
||||
_ = writer.Flush()
|
||||
cancel(c.Request.Context().Err())
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
// Smart flush: only flush when buffer has sufficient data (≥50% full)
|
||||
// This reduces flush frequency while ensuring data flows naturally
|
||||
buffered := writer.Buffered()
|
||||
if buffered >= 8*1024 { // At least 8KB (50% of 16KB buffer)
|
||||
if err := writer.Flush(); err != nil {
|
||||
// Error flushing, cancel and return
|
||||
cancel(err)
|
||||
return
|
||||
}
|
||||
flusher.Flush() // Also flush the underlying http.ResponseWriter
|
||||
}
|
||||
|
||||
case chunk, ok := <-data:
|
||||
if !ok {
|
||||
flusher.Flush()
|
||||
// Stream ended, flush remaining data
|
||||
_ = writer.Flush()
|
||||
cancel(nil)
|
||||
return
|
||||
}
|
||||
|
||||
if bytes.HasPrefix(chunk, []byte("event:")) {
|
||||
_, _ = c.Writer.Write([]byte("\n"))
|
||||
// Forward the complete SSE event block directly (already formatted by the translator).
|
||||
// The translator returns a complete SSE-compliant event block, including event:, data:, and separators.
|
||||
// The handler just needs to forward it without reassembly.
|
||||
if len(chunk) > 0 {
|
||||
_, _ = writer.Write(chunk)
|
||||
}
|
||||
chunkIdx++
|
||||
|
||||
_, _ = c.Writer.Write(chunk)
|
||||
_, _ = c.Writer.Write([]byte("\n"))
|
||||
|
||||
flusher.Flush()
|
||||
case errMsg, ok := <-errs:
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if errMsg != nil {
|
||||
h.WriteErrorResponse(c, errMsg)
|
||||
flusher.Flush()
|
||||
// An error occurred: emit as a proper SSE error event
|
||||
errorBytes, _ := json.Marshal(h.toClaudeError(errMsg))
|
||||
_, _ = writer.WriteString("event: error\n")
|
||||
_, _ = writer.WriteString("data: ")
|
||||
_, _ = writer.Write(errorBytes)
|
||||
_, _ = writer.WriteString("\n\n")
|
||||
_ = writer.Flush()
|
||||
}
|
||||
var execErr error
|
||||
if errMsg != nil {
|
||||
@@ -231,7 +264,26 @@ func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.
|
||||
}
|
||||
cancel(execErr)
|
||||
return
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type claudeErrorDetail struct {
|
||||
Type string `json:"type"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type claudeErrorResponse struct {
|
||||
Type string `json:"type"`
|
||||
Error claudeErrorDetail `json:"error"`
|
||||
}
|
||||
|
||||
func (h *ClaudeCodeAPIHandler) toClaudeError(msg *interfaces.ErrorMessage) claudeErrorResponse {
|
||||
return claudeErrorResponse{
|
||||
Type: "error",
|
||||
Error: claudeErrorDetail{
|
||||
Type: "api_error",
|
||||
Message: msg.Error.Error(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,20 +133,27 @@ func (h *BaseAPIHandler) GetContextWithCancel(handler interfaces.APIHandler, c *
|
||||
// ExecuteWithAuthManager executes a non-streaming request via the core auth manager.
|
||||
// This path is the only supported execution route.
|
||||
func (h *BaseAPIHandler) ExecuteWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) ([]byte, *interfaces.ErrorMessage) {
|
||||
providers := util.GetProviderName(modelName)
|
||||
normalizedModel, metadata := normalizeModelMetadata(modelName)
|
||||
providers := util.GetProviderName(normalizedModel)
|
||||
if len(providers) == 0 {
|
||||
return nil, &interfaces.ErrorMessage{StatusCode: http.StatusBadRequest, Error: fmt.Errorf("unknown provider for model %s", modelName)}
|
||||
}
|
||||
req := coreexecutor.Request{
|
||||
Model: modelName,
|
||||
Model: normalizedModel,
|
||||
Payload: cloneBytes(rawJSON),
|
||||
}
|
||||
if cloned := cloneMetadata(metadata); cloned != nil {
|
||||
req.Metadata = cloned
|
||||
}
|
||||
opts := coreexecutor.Options{
|
||||
Stream: false,
|
||||
Alt: alt,
|
||||
OriginalRequest: cloneBytes(rawJSON),
|
||||
SourceFormat: sdktranslator.FromString(handlerType),
|
||||
}
|
||||
if cloned := cloneMetadata(metadata); cloned != nil {
|
||||
opts.Metadata = cloned
|
||||
}
|
||||
resp, err := h.AuthManager.Execute(ctx, providers, req, opts)
|
||||
if err != nil {
|
||||
return nil, &interfaces.ErrorMessage{StatusCode: http.StatusInternalServerError, Error: err}
|
||||
@@ -157,20 +164,27 @@ func (h *BaseAPIHandler) ExecuteWithAuthManager(ctx context.Context, handlerType
|
||||
// ExecuteCountWithAuthManager executes a non-streaming request via the core auth manager.
|
||||
// This path is the only supported execution route.
|
||||
func (h *BaseAPIHandler) ExecuteCountWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) ([]byte, *interfaces.ErrorMessage) {
|
||||
providers := util.GetProviderName(modelName)
|
||||
normalizedModel, metadata := normalizeModelMetadata(modelName)
|
||||
providers := util.GetProviderName(normalizedModel)
|
||||
if len(providers) == 0 {
|
||||
return nil, &interfaces.ErrorMessage{StatusCode: http.StatusBadRequest, Error: fmt.Errorf("unknown provider for model %s", modelName)}
|
||||
}
|
||||
req := coreexecutor.Request{
|
||||
Model: modelName,
|
||||
Model: normalizedModel,
|
||||
Payload: cloneBytes(rawJSON),
|
||||
}
|
||||
if cloned := cloneMetadata(metadata); cloned != nil {
|
||||
req.Metadata = cloned
|
||||
}
|
||||
opts := coreexecutor.Options{
|
||||
Stream: false,
|
||||
Alt: alt,
|
||||
OriginalRequest: cloneBytes(rawJSON),
|
||||
SourceFormat: sdktranslator.FromString(handlerType),
|
||||
}
|
||||
if cloned := cloneMetadata(metadata); cloned != nil {
|
||||
opts.Metadata = cloned
|
||||
}
|
||||
resp, err := h.AuthManager.ExecuteCount(ctx, providers, req, opts)
|
||||
if err != nil {
|
||||
return nil, &interfaces.ErrorMessage{StatusCode: http.StatusInternalServerError, Error: err}
|
||||
@@ -181,7 +195,8 @@ func (h *BaseAPIHandler) ExecuteCountWithAuthManager(ctx context.Context, handle
|
||||
// ExecuteStreamWithAuthManager executes a streaming request via the core auth manager.
|
||||
// This path is the only supported execution route.
|
||||
func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) (<-chan []byte, <-chan *interfaces.ErrorMessage) {
|
||||
providers := util.GetProviderName(modelName)
|
||||
normalizedModel, metadata := normalizeModelMetadata(modelName)
|
||||
providers := util.GetProviderName(normalizedModel)
|
||||
if len(providers) == 0 {
|
||||
errChan := make(chan *interfaces.ErrorMessage, 1)
|
||||
errChan <- &interfaces.ErrorMessage{StatusCode: http.StatusBadRequest, Error: fmt.Errorf("unknown provider for model %s", modelName)}
|
||||
@@ -189,15 +204,21 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl
|
||||
return nil, errChan
|
||||
}
|
||||
req := coreexecutor.Request{
|
||||
Model: modelName,
|
||||
Model: normalizedModel,
|
||||
Payload: cloneBytes(rawJSON),
|
||||
}
|
||||
if cloned := cloneMetadata(metadata); cloned != nil {
|
||||
req.Metadata = cloned
|
||||
}
|
||||
opts := coreexecutor.Options{
|
||||
Stream: true,
|
||||
Alt: alt,
|
||||
OriginalRequest: cloneBytes(rawJSON),
|
||||
SourceFormat: sdktranslator.FromString(handlerType),
|
||||
}
|
||||
if cloned := cloneMetadata(metadata); cloned != nil {
|
||||
opts.Metadata = cloned
|
||||
}
|
||||
chunks, err := h.AuthManager.ExecuteStream(ctx, providers, req, opts)
|
||||
if err != nil {
|
||||
errChan := make(chan *interfaces.ErrorMessage, 1)
|
||||
@@ -232,6 +253,34 @@ func cloneBytes(src []byte) []byte {
|
||||
return dst
|
||||
}
|
||||
|
||||
func normalizeModelMetadata(modelName string) (string, map[string]any) {
|
||||
baseModel, budget, include, matched := util.ParseGeminiThinkingSuffix(modelName)
|
||||
if !matched {
|
||||
return baseModel, nil
|
||||
}
|
||||
metadata := map[string]any{
|
||||
util.GeminiOriginalModelMetadataKey: modelName,
|
||||
}
|
||||
if budget != nil {
|
||||
metadata[util.GeminiThinkingBudgetMetadataKey] = *budget
|
||||
}
|
||||
if include != nil {
|
||||
metadata[util.GeminiIncludeThoughtsMetadataKey] = *include
|
||||
}
|
||||
return baseModel, metadata
|
||||
}
|
||||
|
||||
func cloneMetadata(src map[string]any) map[string]any {
|
||||
if len(src) == 0 {
|
||||
return nil
|
||||
}
|
||||
dst := make(map[string]any, len(src))
|
||||
for k, v := range src {
|
||||
dst[k] = v
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
// WriteErrorResponse writes an error message to the response writer using the HTTP status embedded in the message.
|
||||
func (h *BaseAPIHandler) WriteErrorResponse(c *gin.Context, msg *interfaces.ErrorMessage) {
|
||||
status := http.StatusInternalServerError
|
||||
|
||||
Reference in New Issue
Block a user