Compare commits

...

18 Commits

Author SHA1 Message Date
Luis Pater
3569e5779a feat: enhance quota management with backoff levels and cooldown logic
Some checks failed
docker-image / docker (push) Has been cancelled
goreleaser / goreleaser (push) Has been cancelled
2025-10-21 18:44:28 +08:00
Luis Pater
20985d1a10 Refactor executor error handling and usage reporting
- Updated the Execute methods in various executors (GeminiCLIExecutor, GeminiExecutor, IFlowExecutor, OpenAICompatExecutor, QwenExecutor) to return a response and error as named return values for improved clarity.
- Enhanced error handling by deferring failure tracking in usage reporters, ensuring that failures are reported correctly.
- Improved response body handling by ensuring proper closure and error logging for HTTP responses across all executors.
- Added failure tracking and reporting in the usage reporter to capture unsuccessful requests.
- Updated the usage logging structure to include a 'Failed' field for better tracking of request outcomes.
- Adjusted the logic in the RequestStatistics and Record methods to accommodate the new failure tracking mechanism.
2025-10-21 11:22:24 +08:00
Luis Pater
67f553806b feat: implement management asset configuration and auto-updater 2025-10-21 09:01:58 +08:00
Luis Pater
29044312a4 docs: add Subtitle Translator tool to README files 2025-10-21 02:48:08 +08:00
Luis Pater
5b3fc092ee Merge pull request #151 from VjayC/add-subtitle-translator
docs: add Subtitle Translator to projects list
2025-10-21 02:44:50 +08:00
Vijay Chimmi
792e8d09d7 docs: add Subtitle Translator to projects list 2025-10-20 11:29:18 -07:00
Luis Pater
eadccb229f Fixed: #148
Some checks failed
docker-image / docker (push) Has been cancelled
goreleaser / goreleaser (push) Has been cancelled
feat(executor): add initial cache_helpers.go file
2025-10-20 10:17:29 +08:00
Luis Pater
fed6f3ecd7 Merge pull request #147 from router-for-me/config
Some checks failed
docker-image / docker (push) Has been cancelled
goreleaser / goreleaser (push) Has been cancelled
feat(mgmt): support YAML config retrieval and updates via /config.yaml
2025-10-19 22:26:38 +08:00
hkfires
f8dcd707a6 feat(mgmt): support YAML config retrieval and updates via /config.yaml 2025-10-19 21:56:29 +08:00
Luis Pater
0e91e95287 Merge pull request #145 from router-for-me/path
Some checks failed
docker-image / docker (push) Has been cancelled
goreleaser / goreleaser (push) Has been cancelled
feat: prefer util.WritablePath() for logs and local storage
2025-10-19 20:50:44 +08:00
Luis Pater
c5dcbc1c1a Merge pull request #146 from router-for-me/iflow
feat(iflow): add masked token logs; increase refresh lead to 24h
2025-10-19 20:49:40 +08:00
hkfires
4504ba5329 feat(iflow): add masked token logs; increase refresh lead to 24h 2025-10-19 10:56:29 +08:00
hkfires
d16599fa1d feat: prefer util.WritablePath() for logs and local storage 2025-10-19 10:19:55 +08:00
Luis Pater
674393ec12 Merge pull request #139 from router-for-me/log
Some checks failed
docker-image / docker (push) Has been cancelled
goreleaser / goreleaser (push) Has been cancelled
feat(logging): centralize sensitive header masking
2025-10-18 22:25:28 +08:00
hkfires
9f45806106 feat(logging): centralize sensitive header masking 2025-10-18 17:16:00 +08:00
Luis Pater
307ae76ed4 refactor: streamline ConvertCodexResponseToGeminiNonStream by removing unnecessary buffer and improving response handling
Some checks failed
docker-image / docker (push) Has been cancelled
goreleaser / goreleaser (push) Has been cancelled
2025-10-18 16:08:30 +08:00
Luis Pater
735b21394c Fixed: #137
Some checks failed
docker-image / docker (push) Has been cancelled
goreleaser / goreleaser (push) Has been cancelled
refactor: simplify ConvertCodexResponseToClaudeNonStream by removing bufio.Scanner usage and restructuring response parsing logic
2025-10-18 06:22:42 +08:00
Luis Pater
9cdef937af fix: initialize contentBlocks with an empty slice and improve content handling in ConvertOpenAIResponseToClaudeNonStream
Some checks failed
docker-image / docker (push) Has been cancelled
goreleaser / goreleaser (push) Has been cancelled
2025-10-17 08:47:09 +08:00
30 changed files with 1154 additions and 603 deletions

View File

@@ -797,6 +797,10 @@ Those projects are based on CLIProxyAPI:
Native macOS menu bar app to use your Claude Code & ChatGPT subscriptions with AI coding tools - no API keys needed Native macOS menu bar app to use your Claude Code & ChatGPT subscriptions with AI coding tools - no API keys needed
### [Subtitle Translator](https://github.com/VjayC/SRT-Subtitle-Translator-Validator)
Browser-based tool to translate SRT subtitles using your Gemini subscription via CLIProxyAPI with automatic validation/error correction - no API keys needed
> [!NOTE] > [!NOTE]
> If you developed a project based on CLIProxyAPI, please open a PR to add it to this list. > If you developed a project based on CLIProxyAPI, please open a PR to add it to this list.

View File

@@ -807,6 +807,10 @@ docker run --rm -p 8317:8317 -v /path/to/your/config.yaml:/CLIProxyAPI/config.ya
一个原生 macOS 菜单栏应用,让您可以使用 Claude Code & ChatGPT 订阅服务和 AI 编程工具,无需 API 密钥。 一个原生 macOS 菜单栏应用,让您可以使用 Claude Code & ChatGPT 订阅服务和 AI 编程工具,无需 API 密钥。
### [Subtitle Translator](https://github.com/VjayC/SRT-Subtitle-Translator-Validator)
一款基于浏览器的 SRT 字幕翻译工具,可通过 CLI 代理 API 使用您的 Gemini 订阅。内置自动验证与错误修正功能,无需 API 密钥。
> [!NOTE] > [!NOTE]
> 如果你开发了基于 CLIProxyAPI 的项目,请提交一个 PR拉取请求将其添加到此列表中。 > 如果你开发了基于 CLIProxyAPI 的项目,请提交一个 PR拉取请求将其添加到此列表中。

View File

@@ -20,6 +20,7 @@ import (
"github.com/router-for-me/CLIProxyAPI/v6/internal/cmd" "github.com/router-for-me/CLIProxyAPI/v6/internal/cmd"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config" "github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"github.com/router-for-me/CLIProxyAPI/v6/internal/logging" "github.com/router-for-me/CLIProxyAPI/v6/internal/logging"
"github.com/router-for-me/CLIProxyAPI/v6/internal/managementasset"
"github.com/router-for-me/CLIProxyAPI/v6/internal/misc" "github.com/router-for-me/CLIProxyAPI/v6/internal/misc"
"github.com/router-for-me/CLIProxyAPI/v6/internal/store" "github.com/router-for-me/CLIProxyAPI/v6/internal/store"
_ "github.com/router-for-me/CLIProxyAPI/v6/internal/translator" _ "github.com/router-for-me/CLIProxyAPI/v6/internal/translator"
@@ -147,6 +148,7 @@ func main() {
} }
return "", false return "", false
} }
writableBase := util.WritablePath()
if value, ok := lookupEnv("PGSTORE_DSN", "pgstore_dsn"); ok { if value, ok := lookupEnv("PGSTORE_DSN", "pgstore_dsn"); ok {
usePostgresStore = true usePostgresStore = true
pgStoreDSN = value pgStoreDSN = value
@@ -158,6 +160,13 @@ func main() {
if value, ok := lookupEnv("PGSTORE_LOCAL_PATH", "pgstore_local_path"); ok { if value, ok := lookupEnv("PGSTORE_LOCAL_PATH", "pgstore_local_path"); ok {
pgStoreLocalPath = value pgStoreLocalPath = value
} }
if pgStoreLocalPath == "" {
if writableBase != "" {
pgStoreLocalPath = writableBase
} else {
pgStoreLocalPath = wd
}
}
useGitStore = false useGitStore = false
} }
if value, ok := lookupEnv("GITSTORE_GIT_URL", "gitstore_git_url"); ok { if value, ok := lookupEnv("GITSTORE_GIT_URL", "gitstore_git_url"); ok {
@@ -229,11 +238,14 @@ func main() {
log.Infof("postgres-backed token store enabled, workspace path: %s", pgStoreInst.WorkDir()) log.Infof("postgres-backed token store enabled, workspace path: %s", pgStoreInst.WorkDir())
} }
} else if useObjectStore { } else if useObjectStore {
objectStoreRoot := objectStoreLocalPath if objectStoreLocalPath == "" {
if objectStoreRoot == "" { if writableBase != "" {
objectStoreRoot = wd objectStoreLocalPath = writableBase
} else {
objectStoreLocalPath = wd
}
} }
objectStoreRoot = filepath.Join(objectStoreRoot, "objectstore") objectStoreRoot := filepath.Join(objectStoreLocalPath, "objectstore")
resolvedEndpoint := strings.TrimSpace(objectStoreEndpoint) resolvedEndpoint := strings.TrimSpace(objectStoreEndpoint)
useSSL := true useSSL := true
if strings.Contains(resolvedEndpoint, "://") { if strings.Contains(resolvedEndpoint, "://") {
@@ -289,7 +301,11 @@ func main() {
} }
} else if useGitStore { } else if useGitStore {
if gitStoreLocalPath == "" { if gitStoreLocalPath == "" {
gitStoreLocalPath = wd if writableBase != "" {
gitStoreLocalPath = writableBase
} else {
gitStoreLocalPath = wd
}
} }
gitStoreRoot = filepath.Join(gitStoreLocalPath, "gitstore") gitStoreRoot = filepath.Join(gitStoreLocalPath, "gitstore")
authDir := filepath.Join(gitStoreRoot, "auths") authDir := filepath.Join(gitStoreRoot, "auths")
@@ -376,6 +392,7 @@ func main() {
} else { } else {
cfg.AuthDir = resolvedAuthDir cfg.AuthDir = resolvedAuthDir
} }
managementasset.SetCurrentConfig(cfg)
// Create login options to be used in authentication flows. // Create login options to be used in authentication flows.
options := &cmd.LoginOptions{ options := &cmd.LoginOptions{
@@ -419,6 +436,7 @@ func main() {
return return
} }
// Start the main proxy service // Start the main proxy service
managementasset.StartAutoUpdater(context.Background(), configFilePath)
cmd.StartService(cfg, configFilePath, password) cmd.StartService(cfg, configFilePath, password)
} }
} }

View File

@@ -1,13 +1,126 @@
package management package management
import ( import (
"io"
"net/http"
"os"
"path/filepath"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"gopkg.in/yaml.v3"
) )
func (h *Handler) GetConfig(c *gin.Context) { func (h *Handler) GetConfig(c *gin.Context) {
c.JSON(200, h.cfg) c.JSON(200, h.cfg)
} }
func (h *Handler) GetConfigYAML(c *gin.Context) {
data, err := os.ReadFile(h.configFilePath)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "read_failed", "message": err.Error()})
return
}
var node yaml.Node
if err := yaml.Unmarshal(data, &node); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "parse_failed", "message": err.Error()})
return
}
c.Header("Content-Type", "application/yaml; charset=utf-8")
c.Header("Vary", "format, Accept")
enc := yaml.NewEncoder(c.Writer)
enc.SetIndent(2)
_ = enc.Encode(&node)
_ = enc.Close()
}
func WriteConfig(path string, data []byte) error {
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
if _, err := f.Write(data); err != nil {
f.Close()
return err
}
if err := f.Sync(); err != nil {
f.Close()
return err
}
return f.Close()
}
func (h *Handler) PutConfigYAML(c *gin.Context) {
body, err := io.ReadAll(c.Request.Body)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid_yaml", "message": "cannot read request body"})
return
}
var cfg config.Config
if err := yaml.Unmarshal(body, &cfg); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid_yaml", "message": err.Error()})
return
}
// Validate config using LoadConfigOptional with optional=false to enforce parsing
tmpDir := filepath.Dir(h.configFilePath)
tmpFile, err := os.CreateTemp(tmpDir, "config-validate-*.yaml")
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "write_failed", "message": err.Error()})
return
}
tempFile := tmpFile.Name()
if _, err := tmpFile.Write(body); err != nil {
tmpFile.Close()
os.Remove(tempFile)
c.JSON(http.StatusInternalServerError, gin.H{"error": "write_failed", "message": err.Error()})
return
}
if err := tmpFile.Close(); err != nil {
os.Remove(tempFile)
c.JSON(http.StatusInternalServerError, gin.H{"error": "write_failed", "message": err.Error()})
return
}
defer os.Remove(tempFile)
_, err = config.LoadConfigOptional(tempFile, false)
if err != nil {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": "invalid_config", "message": err.Error()})
return
}
h.mu.Lock()
defer h.mu.Unlock()
if WriteConfig(h.configFilePath, body) != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "write_failed", "message": "failed to write config"})
return
}
// Reload into handler to keep memory in sync
newCfg, err := config.LoadConfig(h.configFilePath)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "reload_failed", "message": err.Error()})
return
}
h.cfg = newCfg
c.JSON(http.StatusOK, gin.H{"ok": true, "changed": []string{"config"}})
}
// GetConfigFile returns the raw config.yaml file bytes without re-encoding.
// It preserves comments and original formatting/styles.
func (h *Handler) GetConfigFile(c *gin.Context) {
data, err := os.ReadFile(h.configFilePath)
if err != nil {
if os.IsNotExist(err) {
c.JSON(http.StatusNotFound, gin.H{"error": "not_found", "message": "config file not found"})
return
}
c.JSON(http.StatusInternalServerError, gin.H{"error": "read_failed", "message": err.Error()})
return
}
c.Header("Content-Type", "application/yaml; charset=utf-8")
c.Header("Cache-Control", "no-store")
c.Header("X-Content-Type-Options", "nosniff")
// Write raw bytes as-is
_, _ = c.Writer.Write(data)
}
// Debug // Debug
func (h *Handler) GetDebug(c *gin.Context) { c.JSON(200, gin.H{"debug": h.cfg.Debug}) } func (h *Handler) GetDebug(c *gin.Context) { c.JSON(200, gin.H{"debug": h.cfg.Debug}) }
func (h *Handler) PutDebug(c *gin.Context) { h.updateBoolField(c, func(v bool) { h.cfg.Debug = v }) } func (h *Handler) PutDebug(c *gin.Context) { h.updateBoolField(c, func(v bool) { h.cfg.Debug = v }) }

View File

@@ -13,6 +13,7 @@ import (
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
) )
const ( const (
@@ -145,6 +146,9 @@ func (h *Handler) logDirectory() string {
if h.logDir != "" { if h.logDir != "" {
return h.logDir return h.logDir
} }
if base := util.WritablePath(); base != "" {
return filepath.Join(base, "logs")
}
if h.configFilePath != "" { if h.configFilePath != "" {
dir := filepath.Dir(h.configFilePath) dir := filepath.Dir(h.configFilePath)
if dir != "" && dir != "." { if dir != "" && dir != "." {

View File

@@ -13,5 +13,8 @@ func (h *Handler) GetUsageStatistics(c *gin.Context) {
if h != nil && h.usageStats != nil { if h != nil && h.usageStats != nil {
snapshot = h.usageStats.Snapshot() snapshot = h.usageStats.Snapshot()
} }
c.JSON(http.StatusOK, gin.H{"usage": snapshot}) c.JSON(http.StatusOK, gin.H{
"usage": snapshot,
"failed_requests": snapshot.FailureCount,
})
} }

View File

@@ -52,7 +52,11 @@ type serverOptionConfig struct {
type ServerOption func(*serverOptionConfig) type ServerOption func(*serverOptionConfig)
func defaultRequestLoggerFactory(cfg *config.Config, configPath string) logging.RequestLogger { func defaultRequestLoggerFactory(cfg *config.Config, configPath string) logging.RequestLogger {
return logging.NewFileRequestLogger(cfg.RequestLog, "logs", filepath.Dir(configPath)) configDir := filepath.Dir(configPath)
if base := util.WritablePath(); base != "" {
return logging.NewFileRequestLogger(cfg.RequestLog, filepath.Join(base, "logs"), configDir)
}
return logging.NewFileRequestLogger(cfg.RequestLog, "logs", configDir)
} }
// WithMiddleware appends additional Gin middleware during server construction. // WithMiddleware appends additional Gin middleware during server construction.
@@ -228,12 +232,17 @@ func NewServer(cfg *config.Config, authManager *auth.Manager, accessManager *sdk
// Save initial YAML snapshot // Save initial YAML snapshot
s.oldConfigYaml, _ = yaml.Marshal(cfg) s.oldConfigYaml, _ = yaml.Marshal(cfg)
s.applyAccessConfig(nil, cfg) s.applyAccessConfig(nil, cfg)
managementasset.SetCurrentConfig(cfg)
// Initialize management handler // Initialize management handler
s.mgmt = managementHandlers.NewHandler(cfg, configFilePath, authManager) s.mgmt = managementHandlers.NewHandler(cfg, configFilePath, authManager)
if optionState.localPassword != "" { if optionState.localPassword != "" {
s.mgmt.SetLocalPassword(optionState.localPassword) s.mgmt.SetLocalPassword(optionState.localPassword)
} }
s.mgmt.SetLogDirectory(filepath.Join(s.currentPath, "logs")) logDir := filepath.Join(s.currentPath, "logs")
if base := util.WritablePath(); base != "" {
logDir = filepath.Join(base, "logs")
}
s.mgmt.SetLogDirectory(logDir)
s.localPassword = optionState.localPassword s.localPassword = optionState.localPassword
// Setup routes // Setup routes
@@ -376,6 +385,8 @@ func (s *Server) registerManagementRoutes() {
{ {
mgmt.GET("/usage", s.mgmt.GetUsageStatistics) mgmt.GET("/usage", s.mgmt.GetUsageStatistics)
mgmt.GET("/config", s.mgmt.GetConfig) mgmt.GET("/config", s.mgmt.GetConfig)
mgmt.PUT("/config.yaml", s.mgmt.PutConfigYAML)
mgmt.GET("/config.yaml", s.mgmt.GetConfigFile)
mgmt.GET("/debug", s.mgmt.GetDebug) mgmt.GET("/debug", s.mgmt.GetDebug)
mgmt.PUT("/debug", s.mgmt.PutDebug) mgmt.PUT("/debug", s.mgmt.PutDebug)
@@ -749,6 +760,7 @@ func (s *Server) UpdateClients(cfg *config.Config) {
s.applyAccessConfig(oldCfg, cfg) s.applyAccessConfig(oldCfg, cfg)
s.cfg = cfg s.cfg = cfg
managementasset.SetCurrentConfig(cfg)
// Save YAML snapshot for next comparison // Save YAML snapshot for next comparison
s.oldConfigYaml, _ = yaml.Marshal(cfg) s.oldConfigYaml, _ = yaml.Marshal(cfg)
s.handlers.UpdateClients(&cfg.SDKConfig) s.handlers.UpdateClients(&cfg.SDKConfig)

View File

@@ -10,6 +10,7 @@ import (
"sync" "sync"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"gopkg.in/natefinch/lumberjack.v2" "gopkg.in/natefinch/lumberjack.v2"
) )
@@ -72,7 +73,10 @@ func ConfigureLogOutput(loggingToFile bool) error {
defer writerMu.Unlock() defer writerMu.Unlock()
if loggingToFile { if loggingToFile {
const logDir = "logs" logDir := "logs"
if base := util.WritablePath(); base != "" {
logDir = filepath.Join(base, "logs")
}
if err := os.MkdirAll(logDir, 0o755); err != nil { if err := os.MkdirAll(logDir, 0o755); err != nil {
return fmt.Errorf("logging: failed to create log directory: %w", err) return fmt.Errorf("logging: failed to create log directory: %w", err)
} }

View File

@@ -16,6 +16,7 @@ import (
"time" "time"
"github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces" "github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
) )
// RequestLogger defines the interface for logging HTTP requests and responses. // RequestLogger defines the interface for logging HTTP requests and responses.
@@ -485,7 +486,8 @@ func (l *FileRequestLogger) formatRequestInfo(url, method string, headers map[st
content.WriteString("=== HEADERS ===\n") content.WriteString("=== HEADERS ===\n")
for key, values := range headers { for key, values := range headers {
for _, value := range values { for _, value := range values {
content.WriteString(fmt.Sprintf("%s: %s\n", key, value)) masked := util.MaskSensitiveHeaderValue(key, value)
content.WriteString(fmt.Sprintf("%s: %s\n", key, masked))
} }
} }
content.WriteString("\n") content.WriteString("\n")

View File

@@ -13,8 +13,10 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util" "github.com/router-for-me/CLIProxyAPI/v6/internal/util"
sdkconfig "github.com/router-for-me/CLIProxyAPI/v6/sdk/config" sdkconfig "github.com/router-for-me/CLIProxyAPI/v6/sdk/config"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@@ -33,8 +35,83 @@ const ManagementFileName = managementAssetName
var ( var (
lastUpdateCheckMu sync.Mutex lastUpdateCheckMu sync.Mutex
lastUpdateCheckTime time.Time lastUpdateCheckTime time.Time
currentConfigPtr atomic.Pointer[config.Config]
disableControlPanel atomic.Bool
schedulerOnce sync.Once
schedulerConfigPath atomic.Value
) )
// SetCurrentConfig stores the latest configuration snapshot for management asset decisions.
func SetCurrentConfig(cfg *config.Config) {
if cfg == nil {
currentConfigPtr.Store(nil)
return
}
prevDisabled := disableControlPanel.Load()
currentConfigPtr.Store(cfg)
disableControlPanel.Store(cfg.RemoteManagement.DisableControlPanel)
if prevDisabled && !cfg.RemoteManagement.DisableControlPanel {
lastUpdateCheckMu.Lock()
lastUpdateCheckTime = time.Time{}
lastUpdateCheckMu.Unlock()
}
}
// StartAutoUpdater launches a background goroutine that periodically ensures the management asset is up to date.
// It respects the disable-control-panel flag on every iteration and supports hot-reloaded configurations.
func StartAutoUpdater(ctx context.Context, configFilePath string) {
configFilePath = strings.TrimSpace(configFilePath)
if configFilePath == "" {
log.Debug("management asset auto-updater skipped: empty config path")
return
}
schedulerConfigPath.Store(configFilePath)
schedulerOnce.Do(func() {
go runAutoUpdater(ctx)
})
}
func runAutoUpdater(ctx context.Context) {
if ctx == nil {
ctx = context.Background()
}
ticker := time.NewTicker(updateCheckInterval)
defer ticker.Stop()
runOnce := func() {
cfg := currentConfigPtr.Load()
if cfg == nil {
log.Debug("management asset auto-updater skipped: config not yet available")
return
}
if disableControlPanel.Load() {
log.Debug("management asset auto-updater skipped: control panel disabled")
return
}
configPath, _ := schedulerConfigPath.Load().(string)
staticDir := StaticDir(configPath)
EnsureLatestManagementHTML(ctx, staticDir, cfg.ProxyURL)
}
runOnce()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
runOnce()
}
}
}
func newHTTPClient(proxyURL string) *http.Client { func newHTTPClient(proxyURL string) *http.Client {
client := &http.Client{Timeout: 15 * time.Second} client := &http.Client{Timeout: 15 * time.Second}
@@ -64,6 +141,10 @@ func StaticDir(configFilePath string) string {
return cleaned return cleaned
} }
if writable := util.WritablePath(); writable != "" {
return filepath.Join(writable, "static")
}
configFilePath = strings.TrimSpace(configFilePath) configFilePath = strings.TrimSpace(configFilePath)
if configFilePath == "" { if configFilePath == "" {
return "" return ""
@@ -105,6 +186,11 @@ func EnsureLatestManagementHTML(ctx context.Context, staticDir string, proxyURL
ctx = context.Background() ctx = context.Background()
} }
if disableControlPanel.Load() {
log.Debug("management asset sync skipped: control panel disabled by configuration")
return
}
staticDir = strings.TrimSpace(staticDir) staticDir = strings.TrimSpace(staticDir)
if staticDir == "" { if staticDir == "" {
log.Debug("management asset sync skipped: empty static directory") log.Debug("management asset sync skipped: empty static directory")

View File

@@ -0,0 +1,10 @@
package executor
import "time"
type codexCache struct {
ID string
Expire time.Time
}
var codexCacheMap = map[string]codexCache{}

View File

@@ -36,13 +36,14 @@ func (e *ClaudeExecutor) Identifier() string { return "claude" }
func (e *ClaudeExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } func (e *ClaudeExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
apiKey, baseURL := claudeCreds(auth) apiKey, baseURL := claudeCreds(auth)
if baseURL == "" { if baseURL == "" {
baseURL = "https://api.anthropic.com" baseURL = "https://api.anthropic.com"
} }
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat from := opts.SourceFormat
to := sdktranslator.FromString("claude") to := sdktranslator.FromString("claude")
// Use streaming translation to preserve function calling, except for claude. // Use streaming translation to preserve function calling, except for claude.
@@ -56,7 +57,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL) url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL)
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil { if err != nil {
return cliproxyexecutor.Response{}, err return resp, err
} }
applyClaudeHeaders(httpReq, apiKey, false) applyClaudeHeaders(httpReq, apiKey, false)
var authID, authLabel, authType, authValue string var authID, authLabel, authType, authValue string
@@ -78,30 +79,31 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
}) })
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq) httpResp, err := httpClient.Do(httpReq)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err return resp, err
} }
defer func() { defer func() {
if errClose := resp.Body.Close(); errClose != nil { if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose) log.Errorf("response body close error: %v", errClose)
} }
}() }()
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(resp.Body) b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b) appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
} }
reader := io.Reader(resp.Body) reader := io.Reader(httpResp.Body)
var decoder *zstd.Decoder var decoder *zstd.Decoder
if hasZSTDEcoding(resp.Header.Get("Content-Encoding")) { if hasZSTDEcoding(httpResp.Header.Get("Content-Encoding")) {
decoder, err = zstd.NewReader(resp.Body) decoder, err = zstd.NewReader(httpResp.Body)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, fmt.Errorf("failed to initialize zstd decoder: %w", err) return resp, fmt.Errorf("failed to initialize zstd decoder: %w", err)
} }
reader = decoder reader = decoder
defer decoder.Close() defer decoder.Close()
@@ -109,7 +111,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
data, err := io.ReadAll(reader) data, err := io.ReadAll(reader)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err return resp, err
} }
appendAPIResponseChunk(ctx, e.cfg, data) appendAPIResponseChunk(ctx, e.cfg, data)
if stream { if stream {
@@ -124,16 +126,18 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
} }
var param any var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param) out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
} }
func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
apiKey, baseURL := claudeCreds(auth) apiKey, baseURL := claudeCreds(auth)
if baseURL == "" { if baseURL == "" {
baseURL = "https://api.anthropic.com" baseURL = "https://api.anthropic.com"
} }
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat from := opts.SourceFormat
to := sdktranslator.FromString("claude") to := sdktranslator.FromString("claude")
body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true) body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
@@ -164,27 +168,35 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
}) })
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq) httpResp, err := httpClient.Do(httpReq)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return nil, err return nil, err
} }
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
defer func() { _ = resp.Body.Close() }() b, _ := io.ReadAll(httpResp.Body)
b, _ := io.ReadAll(resp.Body)
appendAPIResponseChunk(ctx, e.cfg, b) appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
return nil, statusErr{code: resp.StatusCode, msg: string(b)} if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
}
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return nil, err
} }
out := make(chan cliproxyexecutor.StreamChunk) out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func() { go func() {
defer close(out) defer close(out)
defer func() { _ = resp.Body.Close() }() defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
}
}()
// If from == to (Claude → Claude), directly forward the SSE stream without translation // If from == to (Claude → Claude), directly forward the SSE stream without translation
if from == to { if from == to {
scanner := bufio.NewScanner(resp.Body) scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520) buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520) scanner.Buffer(buf, 20_971_520)
for scanner.Scan() { for scanner.Scan() {
@@ -199,15 +211,16 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
cloned[len(line)] = '\n' cloned[len(line)] = '\n'
out <- cliproxyexecutor.StreamChunk{Payload: cloned} out <- cliproxyexecutor.StreamChunk{Payload: cloned}
} }
if err = scanner.Err(); err != nil { if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, errScan)
out <- cliproxyexecutor.StreamChunk{Err: err} reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
} }
return return
} }
// For other formats, use translation // For other formats, use translation
scanner := bufio.NewScanner(resp.Body) scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520) buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520) scanner.Buffer(buf, 20_971_520)
var param any var param any
@@ -222,12 +235,13 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
} }
} }
if err = scanner.Err(); err != nil { if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, errScan)
out <- cliproxyexecutor.StreamChunk{Err: err} reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
} }
}() }()
return out, nil return stream, nil
} }
func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {

View File

@@ -39,13 +39,14 @@ func (e *CodexExecutor) Identifier() string { return "codex" }
func (e *CodexExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } func (e *CodexExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
apiKey, baseURL := codexCreds(auth) apiKey, baseURL := codexCreds(auth)
if baseURL == "" { if baseURL == "" {
baseURL = "https://chatgpt.com/backend-api/codex" baseURL = "https://chatgpt.com/backend-api/codex"
} }
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat from := opts.SourceFormat
to := sdktranslator.FromString("codex") to := sdktranslator.FromString("codex")
@@ -78,10 +79,30 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
body, _ = sjson.SetBytes(body, "stream", true) body, _ = sjson.SetBytes(body, "stream", true)
body, _ = sjson.DeleteBytes(body, "previous_response_id") body, _ = sjson.DeleteBytes(body, "previous_response_id")
additionalHeaders := make(map[string]string)
if from == "claude" {
userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id")
if userIDResult.Exists() {
var cache codexCache
var hasKey bool
key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String())
if cache, hasKey = codexCacheMap[key]; !hasKey || cache.Expire.Before(time.Now()) {
cache = codexCache{
ID: uuid.New().String(),
Expire: time.Now().Add(1 * time.Hour),
}
codexCacheMap[key] = cache
}
additionalHeaders["Conversation_id"] = cache.ID
additionalHeaders["Session_id"] = cache.ID
body, _ = sjson.SetBytes(body, "prompt_cache_key", cache.ID)
}
}
url := strings.TrimSuffix(baseURL, "/") + "/responses" url := strings.TrimSuffix(baseURL, "/") + "/responses"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil { if err != nil {
return cliproxyexecutor.Response{}, err return resp, err
} }
applyCodexHeaders(httpReq, auth, apiKey) applyCodexHeaders(httpReq, auth, apiKey)
var authID, authLabel, authType, authValue string var authID, authLabel, authType, authValue string
@@ -90,6 +111,9 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
authLabel = auth.Label authLabel = auth.Label
authType, authValue = auth.AccountInfo() authType, authValue = auth.AccountInfo()
} }
for k, v := range additionalHeaders {
httpReq.Header.Set(k, v)
}
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
URL: url, URL: url,
Method: http.MethodPost, Method: http.MethodPost,
@@ -101,25 +125,29 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
AuthType: authType, AuthType: authType,
AuthValue: authValue, AuthValue: authValue,
}) })
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq) httpResp, err := httpClient.Do(httpReq)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err return resp, err
} }
defer func() { _ = resp.Body.Close() }() defer func() {
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) if errClose := httpResp.Body.Close(); errClose != nil {
if resp.StatusCode < 200 || resp.StatusCode >= 300 { log.Errorf("codex executor: close response body error: %v", errClose)
b, _ := io.ReadAll(resp.Body) }
}()
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b) appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
} }
data, err := io.ReadAll(resp.Body) data, err := io.ReadAll(httpResp.Body)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err return resp, err
} }
appendAPIResponseChunk(ctx, e.cfg, data) appendAPIResponseChunk(ctx, e.cfg, data)
@@ -140,18 +168,21 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
var param any var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, line, &param) out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, line, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
} }
return cliproxyexecutor.Response{}, statusErr{code: 408, msg: "stream error: stream disconnected before completion: stream closed before response.completed"} err = statusErr{code: 408, msg: "stream error: stream disconnected before completion: stream closed before response.completed"}
return resp, err
} }
func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
apiKey, baseURL := codexCreds(auth) apiKey, baseURL := codexCreds(auth)
if baseURL == "" { if baseURL == "" {
baseURL = "https://chatgpt.com/backend-api/codex" baseURL = "https://chatgpt.com/backend-api/codex"
} }
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat from := opts.SourceFormat
to := sdktranslator.FromString("codex") to := sdktranslator.FromString("codex")
@@ -183,6 +214,26 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
body, _ = sjson.DeleteBytes(body, "previous_response_id") body, _ = sjson.DeleteBytes(body, "previous_response_id")
additionalHeaders := make(map[string]string)
if from == "claude" {
userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id")
if userIDResult.Exists() {
var cache codexCache
var hasKey bool
key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String())
if cache, hasKey = codexCacheMap[key]; !hasKey || cache.Expire.Before(time.Now()) {
cache = codexCache{
ID: uuid.New().String(),
Expire: time.Now().Add(1 * time.Hour),
}
codexCacheMap[key] = cache
}
additionalHeaders["Conversation_id"] = cache.ID
additionalHeaders["Session_id"] = cache.ID
body, _ = sjson.SetBytes(body, "prompt_cache_key", cache.ID)
}
}
url := strings.TrimSuffix(baseURL, "/") + "/responses" url := strings.TrimSuffix(baseURL, "/") + "/responses"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil { if err != nil {
@@ -195,6 +246,9 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
authLabel = auth.Label authLabel = auth.Label
authType, authValue = auth.AccountInfo() authType, authValue = auth.AccountInfo()
} }
for k, v := range additionalHeaders {
httpReq.Header.Set(k, v)
}
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{ recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
URL: url, URL: url,
Method: http.MethodPost, Method: http.MethodPost,
@@ -208,28 +262,36 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
}) })
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq) httpResp, err := httpClient.Do(httpReq)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return nil, err return nil, err
} }
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
defer func() { _ = resp.Body.Close() }() data, readErr := io.ReadAll(httpResp.Body)
b, readErr := io.ReadAll(resp.Body) if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("codex executor: close response body error: %v", errClose)
}
if readErr != nil { if readErr != nil {
recordAPIResponseError(ctx, e.cfg, readErr) recordAPIResponseError(ctx, e.cfg, readErr)
return nil, readErr return nil, readErr
} }
appendAPIResponseChunk(ctx, e.cfg, b) appendAPIResponseChunk(ctx, e.cfg, data)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(data))
return nil, statusErr{code: resp.StatusCode, msg: string(b)} err = statusErr{code: httpResp.StatusCode, msg: string(data)}
return nil, err
} }
out := make(chan cliproxyexecutor.StreamChunk) out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func() { go func() {
defer close(out) defer close(out)
defer func() { _ = resp.Body.Close() }() defer func() {
scanner := bufio.NewScanner(resp.Body) if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("codex executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520) buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520) scanner.Buffer(buf, 20_971_520)
var param any var param any
@@ -251,12 +313,13 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
} }
} }
if err = scanner.Err(); err != nil { if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, errScan)
out <- cliproxyexecutor.StreamChunk{Err: err} reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
} }
}() }()
return out, nil return stream, nil
} }
func (e *CodexExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { func (e *CodexExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {

View File

@@ -51,12 +51,13 @@ func (e *GeminiCLIExecutor) Identifier() string { return "gemini-cli" }
func (e *GeminiCLIExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } func (e *GeminiCLIExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
tokenSource, baseTokenData, err := prepareGeminiCLITokenSource(ctx, e.cfg, auth) tokenSource, baseTokenData, err := prepareGeminiCLITokenSource(ctx, e.cfg, auth)
if err != nil { if err != nil {
return cliproxyexecutor.Response{}, err return resp, err
} }
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat from := opts.SourceFormat
to := sdktranslator.FromString("gemini-cli") to := sdktranslator.FromString("gemini-cli")
@@ -104,7 +105,8 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
tok, errTok := tokenSource.Token() tok, errTok := tokenSource.Token()
if errTok != nil { if errTok != nil {
return cliproxyexecutor.Response{}, errTok err = errTok
return resp, err
} }
updateGeminiCLITokenMetadata(auth, baseTokenData, tok) updateGeminiCLITokenMetadata(auth, baseTokenData, tok)
@@ -115,7 +117,8 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
reqHTTP, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload)) reqHTTP, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
if errReq != nil { if errReq != nil {
return cliproxyexecutor.Response{}, errReq err = errReq
return resp, err
} }
reqHTTP.Header.Set("Content-Type", "application/json") reqHTTP.Header.Set("Content-Type", "application/json")
reqHTTP.Header.Set("Authorization", "Bearer "+tok.AccessToken) reqHTTP.Header.Set("Authorization", "Bearer "+tok.AccessToken)
@@ -133,44 +136,60 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
AuthValue: authValue, AuthValue: authValue,
}) })
resp, errDo := httpClient.Do(reqHTTP) httpResp, errDo := httpClient.Do(reqHTTP)
if errDo != nil { if errDo != nil {
recordAPIResponseError(ctx, e.cfg, errDo) recordAPIResponseError(ctx, e.cfg, errDo)
return cliproxyexecutor.Response{}, errDo err = errDo
return resp, err
} }
data, errRead := io.ReadAll(resp.Body)
_ = resp.Body.Close() data, errRead := io.ReadAll(httpResp.Body)
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("gemini cli executor: close response body error: %v", errClose)
}
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if errRead != nil { if errRead != nil {
recordAPIResponseError(ctx, e.cfg, errRead) recordAPIResponseError(ctx, e.cfg, errRead)
return cliproxyexecutor.Response{}, errRead err = errRead
return resp, err
} }
appendAPIResponseChunk(ctx, e.cfg, data) appendAPIResponseChunk(ctx, e.cfg, data)
if resp.StatusCode >= 200 && resp.StatusCode < 300 { if httpResp.StatusCode >= 200 && httpResp.StatusCode < 300 {
reporter.publish(ctx, parseGeminiCLIUsage(data)) reporter.publish(ctx, parseGeminiCLIUsage(data))
var param any var param any
out := sdktranslator.TranslateNonStream(respCtx, to, from, attemptModel, bytes.Clone(opts.OriginalRequest), payload, data, &param) out := sdktranslator.TranslateNonStream(respCtx, to, from, attemptModel, bytes.Clone(opts.OriginalRequest), payload, data, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
} }
lastStatus = resp.StatusCode
lastStatus = httpResp.StatusCode
lastBody = append([]byte(nil), data...) lastBody = append([]byte(nil), data...)
if resp.StatusCode != 429 { log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(data))
break if httpResp.StatusCode == 429 {
continue
} }
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
return resp, err
} }
if len(lastBody) > 0 { if len(lastBody) > 0 {
appendAPIResponseChunk(ctx, e.cfg, lastBody) appendAPIResponseChunk(ctx, e.cfg, lastBody)
} }
return cliproxyexecutor.Response{}, statusErr{code: lastStatus, msg: string(lastBody)} if lastStatus == 0 {
lastStatus = 429
}
err = statusErr{code: lastStatus, msg: string(lastBody)}
return resp, err
} }
func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
tokenSource, baseTokenData, err := prepareGeminiCLITokenSource(ctx, e.cfg, auth) tokenSource, baseTokenData, err := prepareGeminiCLITokenSource(ctx, e.cfg, auth)
if err != nil { if err != nil {
return nil, err return nil, err
} }
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat from := opts.SourceFormat
to := sdktranslator.FromString("gemini-cli") to := sdktranslator.FromString("gemini-cli")
@@ -207,7 +226,8 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
tok, errTok := tokenSource.Token() tok, errTok := tokenSource.Token()
if errTok != nil { if errTok != nil {
return nil, errTok err = errTok
return nil, err
} }
updateGeminiCLITokenMetadata(auth, baseTokenData, tok) updateGeminiCLITokenMetadata(auth, baseTokenData, tok)
@@ -220,7 +240,8 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
reqHTTP, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload)) reqHTTP, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
if errReq != nil { if errReq != nil {
return nil, errReq err = errReq
return nil, err
} }
reqHTTP.Header.Set("Content-Type", "application/json") reqHTTP.Header.Set("Content-Type", "application/json")
reqHTTP.Header.Set("Authorization", "Bearer "+tok.AccessToken) reqHTTP.Header.Set("Authorization", "Bearer "+tok.AccessToken)
@@ -238,33 +259,43 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
AuthValue: authValue, AuthValue: authValue,
}) })
resp, errDo := httpClient.Do(reqHTTP) httpResp, errDo := httpClient.Do(reqHTTP)
if errDo != nil { if errDo != nil {
recordAPIResponseError(ctx, e.cfg, errDo) recordAPIResponseError(ctx, e.cfg, errDo)
return nil, errDo err = errDo
return nil, err
} }
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
data, errRead := io.ReadAll(resp.Body) data, errRead := io.ReadAll(httpResp.Body)
_ = resp.Body.Close() if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("gemini cli executor: close response body error: %v", errClose)
}
if errRead != nil { if errRead != nil {
recordAPIResponseError(ctx, e.cfg, errRead) recordAPIResponseError(ctx, e.cfg, errRead)
return nil, errRead err = errRead
return nil, err
} }
appendAPIResponseChunk(ctx, e.cfg, data) appendAPIResponseChunk(ctx, e.cfg, data)
lastStatus = resp.StatusCode lastStatus = httpResp.StatusCode
lastBody = append([]byte(nil), data...) lastBody = append([]byte(nil), data...)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(data)) log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(data))
if resp.StatusCode == 429 { if httpResp.StatusCode == 429 {
continue continue
} }
return nil, statusErr{code: resp.StatusCode, msg: string(data)} err = statusErr{code: httpResp.StatusCode, msg: string(data)}
return nil, err
} }
out := make(chan cliproxyexecutor.StreamChunk) out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func(resp *http.Response, reqBody []byte, attempt string) { go func(resp *http.Response, reqBody []byte, attempt string) {
defer close(out) defer close(out)
defer func() { _ = resp.Body.Close() }() defer func() {
if errClose := resp.Body.Close(); errClose != nil {
log.Errorf("gemini cli executor: close response body error: %v", errClose)
}
}()
if opts.Alt == "" { if opts.Alt == "" {
scanner := bufio.NewScanner(resp.Body) scanner := bufio.NewScanner(resp.Body)
buf := make([]byte, 20_971_520) buf := make([]byte, 20_971_520)
@@ -290,6 +321,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
} }
if errScan := scanner.Err(); errScan != nil { if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, errScan) recordAPIResponseError(ctx, e.cfg, errScan)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan} out <- cliproxyexecutor.StreamChunk{Err: errScan}
} }
return return
@@ -298,6 +330,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
data, errRead := io.ReadAll(resp.Body) data, errRead := io.ReadAll(resp.Body)
if errRead != nil { if errRead != nil {
recordAPIResponseError(ctx, e.cfg, errRead) recordAPIResponseError(ctx, e.cfg, errRead)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errRead} out <- cliproxyexecutor.StreamChunk{Err: errRead}
return return
} }
@@ -313,15 +346,19 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
for i := range segments { for i := range segments {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(segments[i])} out <- cliproxyexecutor.StreamChunk{Payload: []byte(segments[i])}
} }
}(resp, append([]byte(nil), payload...), attemptModel) }(httpResp, append([]byte(nil), payload...), attemptModel)
return out, nil return stream, nil
} }
if len(lastBody) > 0 {
appendAPIResponseChunk(ctx, e.cfg, lastBody)
}
if lastStatus == 0 { if lastStatus == 0 {
lastStatus = 429 lastStatus = 429
} }
return nil, statusErr{code: lastStatus, msg: string(lastBody)} err = statusErr{code: lastStatus, msg: string(lastBody)}
return nil, err
} }
func (e *GeminiCLIExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { func (e *GeminiCLIExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {

View File

@@ -68,10 +68,11 @@ func (e *GeminiExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) e
// Returns: // Returns:
// - cliproxyexecutor.Response: The response from the API // - cliproxyexecutor.Response: The response from the API
// - error: An error if the request fails // - error: An error if the request fails
func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
apiKey, bearer := geminiCreds(auth) apiKey, bearer := geminiCreds(auth)
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
// Official Gemini API via API key or OAuth bearer // Official Gemini API via API key or OAuth bearer
from := opts.SourceFormat from := opts.SourceFormat
@@ -98,7 +99,7 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil { if err != nil {
return cliproxyexecutor.Response{}, err return resp, err
} }
httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Content-Type", "application/json")
if apiKey != "" { if apiKey != "" {
@@ -125,35 +126,42 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
}) })
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq) httpResp, err := httpClient.Do(httpReq)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err return resp, err
} }
defer func() { _ = resp.Body.Close() }() defer func() {
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) if errClose := httpResp.Body.Close(); errClose != nil {
if resp.StatusCode < 200 || resp.StatusCode >= 300 { log.Errorf("gemini executor: close response body error: %v", errClose)
b, _ := io.ReadAll(resp.Body) }
}()
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b) appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
} }
data, err := io.ReadAll(resp.Body) data, err := io.ReadAll(httpResp.Body)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err return resp, err
} }
appendAPIResponseChunk(ctx, e.cfg, data) appendAPIResponseChunk(ctx, e.cfg, data)
reporter.publish(ctx, parseGeminiUsage(data)) reporter.publish(ctx, parseGeminiUsage(data))
var param any var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param) out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
} }
func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
apiKey, bearer := geminiCreds(auth) apiKey, bearer := geminiCreds(auth)
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat from := opts.SourceFormat
to := sdktranslator.FromString("gemini") to := sdktranslator.FromString("gemini")
@@ -202,24 +210,32 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
}) })
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq) httpResp, err := httpClient.Do(httpReq)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return nil, err return nil, err
} }
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
defer func() { _ = resp.Body.Close() }() b, _ := io.ReadAll(httpResp.Body)
b, _ := io.ReadAll(resp.Body)
appendAPIResponseChunk(ctx, e.cfg, b) appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
return nil, statusErr{code: resp.StatusCode, msg: string(b)} if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("gemini executor: close response body error: %v", errClose)
}
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return nil, err
} }
out := make(chan cliproxyexecutor.StreamChunk) out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func() { go func() {
defer close(out) defer close(out)
defer func() { _ = resp.Body.Close() }() defer func() {
scanner := bufio.NewScanner(resp.Body) if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("gemini executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520) buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520) scanner.Buffer(buf, 20_971_520)
var param any var param any
@@ -238,12 +254,13 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
for i := range lines { for i := range lines {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(lines[i])} out <- cliproxyexecutor.StreamChunk{Payload: []byte(lines[i])}
} }
if err = scanner.Err(); err != nil { if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, errScan)
out <- cliproxyexecutor.StreamChunk{Err: err} reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
} }
}() }()
return out, nil return stream, nil
} }
func (e *GeminiExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { func (e *GeminiExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {

View File

@@ -12,6 +12,7 @@ import (
iflowauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/iflow" iflowauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/iflow"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config" "github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
@@ -40,16 +41,18 @@ func (e *IFlowExecutor) Identifier() string { return "iflow" }
func (e *IFlowExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } func (e *IFlowExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
// Execute performs a non-streaming chat completion request. // Execute performs a non-streaming chat completion request.
func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
apiKey, baseURL := iflowCreds(auth) apiKey, baseURL := iflowCreds(auth)
if strings.TrimSpace(apiKey) == "" { if strings.TrimSpace(apiKey) == "" {
return cliproxyexecutor.Response{}, fmt.Errorf("iflow executor: missing api key") err = fmt.Errorf("iflow executor: missing api key")
return resp, err
} }
if baseURL == "" { if baseURL == "" {
baseURL = iflowauth.DefaultAPIBaseURL baseURL = iflowauth.DefaultAPIBaseURL
} }
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat from := opts.SourceFormat
to := sdktranslator.FromString("openai") to := sdktranslator.FromString("openai")
@@ -59,7 +62,7 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
if err != nil { if err != nil {
return cliproxyexecutor.Response{}, err return resp, err
} }
applyIFlowHeaders(httpReq, apiKey, false) applyIFlowHeaders(httpReq, apiKey, false)
var authID, authLabel, authType, authValue string var authID, authLabel, authType, authValue string
@@ -81,45 +84,53 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
}) })
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq) httpResp, err := httpClient.Do(httpReq)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err return resp, err
} }
defer func() { _ = resp.Body.Close() }() defer func() {
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("iflow executor: close response body error: %v", errClose)
}
}()
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(resp.Body) b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b) appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("iflow request error: status %d body %s", resp.StatusCode, string(b)) log.Debugf("iflow request error: status %d body %s", httpResp.StatusCode, string(b))
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
} }
data, err := io.ReadAll(resp.Body) data, err := io.ReadAll(httpResp.Body)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err return resp, err
} }
appendAPIResponseChunk(ctx, e.cfg, data) appendAPIResponseChunk(ctx, e.cfg, data)
reporter.publish(ctx, parseOpenAIUsage(data)) reporter.publish(ctx, parseOpenAIUsage(data))
var param any var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param) out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
} }
// ExecuteStream performs a streaming chat completion request. // ExecuteStream performs a streaming chat completion request.
func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
apiKey, baseURL := iflowCreds(auth) apiKey, baseURL := iflowCreds(auth)
if strings.TrimSpace(apiKey) == "" { if strings.TrimSpace(apiKey) == "" {
return nil, fmt.Errorf("iflow executor: missing api key") err = fmt.Errorf("iflow executor: missing api key")
return nil, err
} }
if baseURL == "" { if baseURL == "" {
baseURL = iflowauth.DefaultAPIBaseURL baseURL = iflowauth.DefaultAPIBaseURL
} }
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat from := opts.SourceFormat
to := sdktranslator.FromString("openai") to := sdktranslator.FromString("openai")
@@ -157,27 +168,35 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
}) })
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq) httpResp, err := httpClient.Do(httpReq)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return nil, err return nil, err
} }
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
defer func() { _ = resp.Body.Close() }() data, _ := io.ReadAll(httpResp.Body)
b, _ := io.ReadAll(resp.Body) if errClose := httpResp.Body.Close(); errClose != nil {
appendAPIResponseChunk(ctx, e.cfg, b) log.Errorf("iflow executor: close response body error: %v", errClose)
log.Debugf("iflow streaming error: status %d body %s", resp.StatusCode, string(b)) }
return nil, statusErr{code: resp.StatusCode, msg: string(b)} appendAPIResponseChunk(ctx, e.cfg, data)
log.Debugf("iflow streaming error: status %d body %s", httpResp.StatusCode, string(data))
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
return nil, err
} }
out := make(chan cliproxyexecutor.StreamChunk) out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func() { go func() {
defer close(out) defer close(out)
defer func() { _ = resp.Body.Close() }() defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("iflow executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(resp.Body) scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520) buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520) scanner.Buffer(buf, 20_971_520)
var param any var param any
@@ -192,13 +211,14 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
} }
} }
if err := scanner.Err(); err != nil { if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, errScan)
out <- cliproxyexecutor.StreamChunk{Err: err} reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
} }
}() }()
return out, nil return stream, nil
} }
// CountTokens is not implemented for iFlow. // CountTokens is not implemented for iFlow.
@@ -214,18 +234,28 @@ func (e *IFlowExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*
} }
refreshToken := "" refreshToken := ""
oldAccessToken := ""
if auth.Metadata != nil { if auth.Metadata != nil {
if v, ok := auth.Metadata["refresh_token"].(string); ok { if v, ok := auth.Metadata["refresh_token"].(string); ok {
refreshToken = strings.TrimSpace(v) refreshToken = strings.TrimSpace(v)
} }
if v, ok := auth.Metadata["access_token"].(string); ok {
oldAccessToken = strings.TrimSpace(v)
}
} }
if refreshToken == "" { if refreshToken == "" {
return auth, nil return auth, nil
} }
// Log the old access token (masked) before refresh
if oldAccessToken != "" {
log.Debugf("iflow executor: refreshing access token, old: %s", util.HideAPIKey(oldAccessToken))
}
svc := iflowauth.NewIFlowAuth(e.cfg) svc := iflowauth.NewIFlowAuth(e.cfg)
tokenData, err := svc.RefreshTokens(ctx, refreshToken) tokenData, err := svc.RefreshTokens(ctx, refreshToken)
if err != nil { if err != nil {
log.Errorf("iflow executor: token refresh failed: %v", err)
return nil, err return nil, err
} }
@@ -243,6 +273,9 @@ func (e *IFlowExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*
auth.Metadata["type"] = "iflow" auth.Metadata["type"] = "iflow"
auth.Metadata["last_refresh"] = time.Now().Format(time.RFC3339) auth.Metadata["last_refresh"] = time.Now().Format(time.RFC3339)
// Log the new access token (masked) after successful refresh
log.Debugf("iflow executor: token refresh successful, new: %s", util.HideAPIKey(tokenData.AccessToken))
if auth.Attributes == nil { if auth.Attributes == nil {
auth.Attributes = make(map[string]string) auth.Attributes = make(map[string]string)
} }

View File

@@ -275,7 +275,8 @@ func writeHeaders(builder *strings.Builder, headers http.Header) {
continue continue
} }
for _, value := range values { for _, value := range values {
builder.WriteString(fmt.Sprintf("%s: %s\n", key, sanitizeHeaderValue(key, value))) masked := util.MaskSensitiveHeaderValue(key, value)
builder.WriteString(fmt.Sprintf("%s: %s\n", key, masked))
} }
} }
} }
@@ -319,18 +320,3 @@ func formatAuthInfo(info upstreamRequestLog) string {
return strings.Join(parts, ", ") return strings.Join(parts, ", ")
} }
func sanitizeHeaderValue(key, value string) string {
trimmedValue := strings.TrimSpace(value)
lowerKey := strings.ToLower(strings.TrimSpace(key))
switch {
case strings.Contains(lowerKey, "authorization"),
strings.Contains(lowerKey, "api-key"),
strings.Contains(lowerKey, "apikey"),
strings.Contains(lowerKey, "token"),
strings.Contains(lowerKey, "secret"):
return util.HideAPIKey(trimmedValue)
default:
return trimmedValue
}
}

View File

@@ -38,12 +38,15 @@ func (e *OpenAICompatExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.A
return nil return nil
} }
func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
baseURL, apiKey := e.resolveCredentials(auth) baseURL, apiKey := e.resolveCredentials(auth)
if baseURL == "" { if baseURL == "" {
return cliproxyexecutor.Response{}, statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"} err = statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"}
return
} }
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
// Translate inbound request to OpenAI format // Translate inbound request to OpenAI format
from := opts.SourceFormat from := opts.SourceFormat
@@ -56,7 +59,7 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" url := strings.TrimSuffix(baseURL, "/") + "/chat/completions"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated)) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated))
if err != nil { if err != nil {
return cliproxyexecutor.Response{}, err return resp, err
} }
httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Content-Type", "application/json")
if apiKey != "" { if apiKey != "" {
@@ -82,38 +85,47 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
}) })
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq) httpResp, err := httpClient.Do(httpReq)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err return resp, err
} }
defer func() { _ = resp.Body.Close() }() defer func() {
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) if errClose := httpResp.Body.Close(); errClose != nil {
if resp.StatusCode < 200 || resp.StatusCode >= 300 { log.Errorf("openai compat executor: close response body error: %v", errClose)
b, _ := io.ReadAll(resp.Body) }
}()
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b) appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
} }
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(httpResp.Body)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err return resp, err
} }
appendAPIResponseChunk(ctx, e.cfg, body) appendAPIResponseChunk(ctx, e.cfg, body)
reporter.publish(ctx, parseOpenAIUsage(body)) reporter.publish(ctx, parseOpenAIUsage(body))
// Translate response back to source format when needed // Translate response back to source format when needed
var param any var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, body, &param) out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, body, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
} }
func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
baseURL, apiKey := e.resolveCredentials(auth) baseURL, apiKey := e.resolveCredentials(auth)
if baseURL == "" { if baseURL == "" {
return nil, statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"} err = statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"}
return nil, err
} }
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
from := opts.SourceFormat from := opts.SourceFormat
to := sdktranslator.FromString("openai") to := sdktranslator.FromString("openai")
translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true) translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
@@ -152,24 +164,32 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
}) })
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq) httpResp, err := httpClient.Do(httpReq)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return nil, err return nil, err
} }
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
defer func() { _ = resp.Body.Close() }() b, _ := io.ReadAll(httpResp.Body)
b, _ := io.ReadAll(resp.Body)
appendAPIResponseChunk(ctx, e.cfg, b) appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
return nil, statusErr{code: resp.StatusCode, msg: string(b)} if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("openai compat executor: close response body error: %v", errClose)
}
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return nil, err
} }
out := make(chan cliproxyexecutor.StreamChunk) out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func() { go func() {
defer close(out) defer close(out)
defer func() { _ = resp.Body.Close() }() defer func() {
scanner := bufio.NewScanner(resp.Body) if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("openai compat executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520) buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520) scanner.Buffer(buf, 20_971_520)
var param any var param any
@@ -189,12 +209,13 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
} }
} }
if err = scanner.Err(); err != nil { if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, errScan)
out <- cliproxyexecutor.StreamChunk{Err: err} reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
} }
}() }()
return out, nil return stream, nil
} }
func (e *OpenAICompatExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { func (e *OpenAICompatExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {

View File

@@ -38,13 +38,14 @@ func (e *QwenExecutor) Identifier() string { return "qwen" }
func (e *QwenExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil } func (e *QwenExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
token, baseURL := qwenCreds(auth) token, baseURL := qwenCreds(auth)
if baseURL == "" { if baseURL == "" {
baseURL = "https://portal.qwen.ai/v1" baseURL = "https://portal.qwen.ai/v1"
} }
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat from := opts.SourceFormat
to := sdktranslator.FromString("openai") to := sdktranslator.FromString("openai")
@@ -53,7 +54,7 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" url := strings.TrimSuffix(baseURL, "/") + "/chat/completions"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil { if err != nil {
return cliproxyexecutor.Response{}, err return resp, err
} }
applyQwenHeaders(httpReq, token, false) applyQwenHeaders(httpReq, token, false)
var authID, authLabel, authType, authValue string var authID, authLabel, authType, authValue string
@@ -75,38 +76,45 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
}) })
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq) httpResp, err := httpClient.Do(httpReq)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err return resp, err
} }
defer func() { _ = resp.Body.Close() }() defer func() {
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) if errClose := httpResp.Body.Close(); errClose != nil {
if resp.StatusCode < 200 || resp.StatusCode >= 300 { log.Errorf("qwen executor: close response body error: %v", errClose)
b, _ := io.ReadAll(resp.Body) }
}()
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b) appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)} err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
} }
data, err := io.ReadAll(resp.Body) data, err := io.ReadAll(httpResp.Body)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err return resp, err
} }
appendAPIResponseChunk(ctx, e.cfg, data) appendAPIResponseChunk(ctx, e.cfg, data)
reporter.publish(ctx, parseOpenAIUsage(data)) reporter.publish(ctx, parseOpenAIUsage(data))
var param any var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param) out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
} }
func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
token, baseURL := qwenCreds(auth) token, baseURL := qwenCreds(auth)
if baseURL == "" { if baseURL == "" {
baseURL = "https://portal.qwen.ai/v1" baseURL = "https://portal.qwen.ai/v1"
} }
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth) reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat from := opts.SourceFormat
to := sdktranslator.FromString("openai") to := sdktranslator.FromString("openai")
@@ -145,24 +153,32 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
}) })
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0) httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq) httpResp, err := httpClient.Do(httpReq)
if err != nil { if err != nil {
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return nil, err return nil, err
} }
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone()) recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
defer func() { _ = resp.Body.Close() }() b, _ := io.ReadAll(httpResp.Body)
b, _ := io.ReadAll(resp.Body)
appendAPIResponseChunk(ctx, e.cfg, b) appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b)) log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
return nil, statusErr{code: resp.StatusCode, msg: string(b)} if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("qwen executor: close response body error: %v", errClose)
}
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return nil, err
} }
out := make(chan cliproxyexecutor.StreamChunk) out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func() { go func() {
defer close(out) defer close(out)
defer func() { _ = resp.Body.Close() }() defer func() {
scanner := bufio.NewScanner(resp.Body) if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("qwen executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520) buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520) scanner.Buffer(buf, 20_971_520)
var param any var param any
@@ -177,12 +193,17 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])} out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
} }
} }
if err = scanner.Err(); err != nil { doneChunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone([]byte("[DONE]")), &param)
recordAPIResponseError(ctx, e.cfg, err) for i := range doneChunks {
out <- cliproxyexecutor.StreamChunk{Err: err} out <- cliproxyexecutor.StreamChunk{Payload: []byte(doneChunks[i])}
}
if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, errScan)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
} }
}() }()
return out, nil return stream, nil
} }
func (e *QwenExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { func (e *QwenExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {

View File

@@ -41,6 +41,23 @@ func newUsageReporter(ctx context.Context, provider, model string, auth *cliprox
} }
func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) { func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) {
r.publishWithOutcome(ctx, detail, false)
}
func (r *usageReporter) publishFailure(ctx context.Context) {
r.publishWithOutcome(ctx, usage.Detail{}, true)
}
func (r *usageReporter) trackFailure(ctx context.Context, errPtr *error) {
if r == nil || errPtr == nil {
return
}
if *errPtr != nil {
r.publishFailure(ctx)
}
}
func (r *usageReporter) publishWithOutcome(ctx context.Context, detail usage.Detail, failed bool) {
if r == nil { if r == nil {
return return
} }
@@ -50,7 +67,7 @@ func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) {
detail.TotalTokens = total detail.TotalTokens = total
} }
} }
if detail.InputTokens == 0 && detail.OutputTokens == 0 && detail.ReasoningTokens == 0 && detail.CachedTokens == 0 && detail.TotalTokens == 0 { if detail.InputTokens == 0 && detail.OutputTokens == 0 && detail.ReasoningTokens == 0 && detail.CachedTokens == 0 && detail.TotalTokens == 0 && !failed {
return return
} }
r.once.Do(func() { r.once.Do(func() {
@@ -61,6 +78,7 @@ func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) {
APIKey: r.apiKey, APIKey: r.apiKey,
AuthID: r.authID, AuthID: r.authID,
RequestedAt: r.requestedAt, RequestedAt: r.requestedAt,
Failed: failed,
Detail: detail, Detail: detail,
}) })
}) })

View File

@@ -7,7 +7,6 @@
package claude package claude
import ( import (
"bufio"
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
@@ -180,56 +179,58 @@ func ConvertCodexResponseToClaude(_ context.Context, _ string, originalRequestRa
// Returns: // Returns:
// - string: A Claude Code-compatible JSON response containing all message content and metadata // - string: A Claude Code-compatible JSON response containing all message content and metadata
func ConvertCodexResponseToClaudeNonStream(_ context.Context, _ string, originalRequestRawJSON, _ []byte, rawJSON []byte, _ *any) string { func ConvertCodexResponseToClaudeNonStream(_ context.Context, _ string, originalRequestRawJSON, _ []byte, rawJSON []byte, _ *any) string {
scanner := bufio.NewScanner(bytes.NewReader(rawJSON))
buffer := make([]byte, 20_971_520)
scanner.Buffer(buffer, 20_971_520)
revNames := buildReverseMapFromClaudeOriginalShortToOriginal(originalRequestRawJSON) revNames := buildReverseMapFromClaudeOriginalShortToOriginal(originalRequestRawJSON)
for scanner.Scan() { rootResult := gjson.ParseBytes(rawJSON)
line := scanner.Bytes() if rootResult.Get("type").String() != "response.completed" {
if !bytes.HasPrefix(line, dataTag) { return ""
continue }
}
payload := bytes.TrimSpace(line[len(dataTag):])
if len(payload) == 0 {
continue
}
rootResult := gjson.ParseBytes(payload) responseData := rootResult.Get("response")
if rootResult.Get("type").String() != "response.completed" { if !responseData.Exists() {
continue return ""
} }
responseData := rootResult.Get("response") response := map[string]interface{}{
if !responseData.Exists() { "id": responseData.Get("id").String(),
continue "type": "message",
} "role": "assistant",
"model": responseData.Get("model").String(),
"content": []interface{}{},
"stop_reason": nil,
"stop_sequence": nil,
"usage": map[string]interface{}{
"input_tokens": responseData.Get("usage.input_tokens").Int(),
"output_tokens": responseData.Get("usage.output_tokens").Int(),
},
}
response := map[string]interface{}{ var contentBlocks []interface{}
"id": responseData.Get("id").String(), hasToolCall := false
"type": "message",
"role": "assistant",
"model": responseData.Get("model").String(),
"content": []interface{}{},
"stop_reason": nil,
"stop_sequence": nil,
"usage": map[string]interface{}{
"input_tokens": responseData.Get("usage.input_tokens").Int(),
"output_tokens": responseData.Get("usage.output_tokens").Int(),
},
}
var contentBlocks []interface{} if output := responseData.Get("output"); output.Exists() && output.IsArray() {
hasToolCall := false output.ForEach(func(_, item gjson.Result) bool {
switch item.Get("type").String() {
if output := responseData.Get("output"); output.Exists() && output.IsArray() { case "reasoning":
output.ForEach(func(_, item gjson.Result) bool { thinkingBuilder := strings.Builder{}
switch item.Get("type").String() { if summary := item.Get("summary"); summary.Exists() {
case "reasoning": if summary.IsArray() {
thinkingBuilder := strings.Builder{} summary.ForEach(func(_, part gjson.Result) bool {
if summary := item.Get("summary"); summary.Exists() { if txt := part.Get("text"); txt.Exists() {
if summary.IsArray() { thinkingBuilder.WriteString(txt.String())
summary.ForEach(func(_, part gjson.Result) bool { } else {
thinkingBuilder.WriteString(part.String())
}
return true
})
} else {
thinkingBuilder.WriteString(summary.String())
}
}
if thinkingBuilder.Len() == 0 {
if content := item.Get("content"); content.Exists() {
if content.IsArray() {
content.ForEach(func(_, part gjson.Result) bool {
if txt := part.Get("text"); txt.Exists() { if txt := part.Get("text"); txt.Exists() {
thinkingBuilder.WriteString(txt.String()) thinkingBuilder.WriteString(txt.String())
} else { } else {
@@ -238,114 +239,96 @@ func ConvertCodexResponseToClaudeNonStream(_ context.Context, _ string, original
return true return true
}) })
} else { } else {
thinkingBuilder.WriteString(summary.String()) thinkingBuilder.WriteString(content.String())
} }
} }
if thinkingBuilder.Len() == 0 {
if content := item.Get("content"); content.Exists() {
if content.IsArray() {
content.ForEach(func(_, part gjson.Result) bool {
if txt := part.Get("text"); txt.Exists() {
thinkingBuilder.WriteString(txt.String())
} else {
thinkingBuilder.WriteString(part.String())
}
return true
})
} else {
thinkingBuilder.WriteString(content.String())
}
}
}
if thinkingBuilder.Len() > 0 {
contentBlocks = append(contentBlocks, map[string]interface{}{
"type": "thinking",
"thinking": thinkingBuilder.String(),
})
}
case "message":
if content := item.Get("content"); content.Exists() {
if content.IsArray() {
content.ForEach(func(_, part gjson.Result) bool {
if part.Get("type").String() == "output_text" {
text := part.Get("text").String()
if text != "" {
contentBlocks = append(contentBlocks, map[string]interface{}{
"type": "text",
"text": text,
})
}
}
return true
})
} else {
text := content.String()
if text != "" {
contentBlocks = append(contentBlocks, map[string]interface{}{
"type": "text",
"text": text,
})
}
}
}
case "function_call":
hasToolCall = true
name := item.Get("name").String()
if original, ok := revNames[name]; ok {
name = original
}
toolBlock := map[string]interface{}{
"type": "tool_use",
"id": item.Get("call_id").String(),
"name": name,
"input": map[string]interface{}{},
}
if argsStr := item.Get("arguments").String(); argsStr != "" {
var args interface{}
if err := json.Unmarshal([]byte(argsStr), &args); err == nil {
toolBlock["input"] = args
}
}
contentBlocks = append(contentBlocks, toolBlock)
} }
return true if thinkingBuilder.Len() > 0 {
}) contentBlocks = append(contentBlocks, map[string]interface{}{
} "type": "thinking",
"thinking": thinkingBuilder.String(),
})
}
case "message":
if content := item.Get("content"); content.Exists() {
if content.IsArray() {
content.ForEach(func(_, part gjson.Result) bool {
if part.Get("type").String() == "output_text" {
text := part.Get("text").String()
if text != "" {
contentBlocks = append(contentBlocks, map[string]interface{}{
"type": "text",
"text": text,
})
}
}
return true
})
} else {
text := content.String()
if text != "" {
contentBlocks = append(contentBlocks, map[string]interface{}{
"type": "text",
"text": text,
})
}
}
}
case "function_call":
hasToolCall = true
name := item.Get("name").String()
if original, ok := revNames[name]; ok {
name = original
}
if len(contentBlocks) > 0 { toolBlock := map[string]interface{}{
response["content"] = contentBlocks "type": "tool_use",
} "id": item.Get("call_id").String(),
"name": name,
"input": map[string]interface{}{},
}
if stopReason := responseData.Get("stop_reason"); stopReason.Exists() && stopReason.String() != "" { if argsStr := item.Get("arguments").String(); argsStr != "" {
response["stop_reason"] = stopReason.String() var args interface{}
} else if hasToolCall { if err := json.Unmarshal([]byte(argsStr), &args); err == nil {
response["stop_reason"] = "tool_use" toolBlock["input"] = args
} else { }
response["stop_reason"] = "end_turn" }
}
if stopSequence := responseData.Get("stop_sequence"); stopSequence.Exists() && stopSequence.String() != "" { contentBlocks = append(contentBlocks, toolBlock)
response["stop_sequence"] = stopSequence.Value()
}
if responseData.Get("usage.input_tokens").Exists() || responseData.Get("usage.output_tokens").Exists() {
response["usage"] = map[string]interface{}{
"input_tokens": responseData.Get("usage.input_tokens").Int(),
"output_tokens": responseData.Get("usage.output_tokens").Int(),
} }
} return true
})
responseJSON, err := json.Marshal(response)
if err != nil {
return ""
}
return string(responseJSON)
} }
return "" if len(contentBlocks) > 0 {
response["content"] = contentBlocks
}
if stopReason := responseData.Get("stop_reason"); stopReason.Exists() && stopReason.String() != "" {
response["stop_reason"] = stopReason.String()
} else if hasToolCall {
response["stop_reason"] = "tool_use"
} else {
response["stop_reason"] = "end_turn"
}
if stopSequence := responseData.Get("stop_sequence"); stopSequence.Exists() && stopSequence.String() != "" {
response["stop_sequence"] = stopSequence.Value()
}
if responseData.Get("usage.input_tokens").Exists() || responseData.Get("usage.output_tokens").Exists() {
response["usage"] = map[string]interface{}{
"input_tokens": responseData.Get("usage.input_tokens").Int(),
"output_tokens": responseData.Get("usage.output_tokens").Int(),
}
}
responseJSON, err := json.Marshal(response)
if err != nil {
return ""
}
return string(responseJSON)
} }
// buildReverseMapFromClaudeOriginalShortToOriginal builds a map[short]original from original Claude request tools. // buildReverseMapFromClaudeOriginalShortToOriginal builds a map[short]original from original Claude request tools.

View File

@@ -5,7 +5,6 @@
package gemini package gemini
import ( import (
"bufio"
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
@@ -152,159 +151,146 @@ func ConvertCodexResponseToGemini(_ context.Context, modelName string, originalR
// Returns: // Returns:
// - string: A Gemini-compatible JSON response containing all message content and metadata // - string: A Gemini-compatible JSON response containing all message content and metadata
func ConvertCodexResponseToGeminiNonStream(_ context.Context, modelName string, originalRequestRawJSON, requestRawJSON, rawJSON []byte, _ *any) string { func ConvertCodexResponseToGeminiNonStream(_ context.Context, modelName string, originalRequestRawJSON, requestRawJSON, rawJSON []byte, _ *any) string {
scanner := bufio.NewScanner(bytes.NewReader(rawJSON)) rootResult := gjson.ParseBytes(rawJSON)
buffer := make([]byte, 20_971_520)
scanner.Buffer(buffer, 20_971_520)
for scanner.Scan() {
line := scanner.Bytes()
// log.Debug(string(line))
if !bytes.HasPrefix(line, dataTag) {
continue
}
rawJSON = bytes.TrimSpace(rawJSON[5:])
rootResult := gjson.ParseBytes(rawJSON) // Verify this is a response.completed event
if rootResult.Get("type").String() != "response.completed" {
// Verify this is a response.completed event return ""
if rootResult.Get("type").String() != "response.completed" {
continue
}
// Base Gemini response template for non-streaming
template := `{"candidates":[{"content":{"role":"model","parts":[]},"finishReason":"STOP"}],"usageMetadata":{"trafficType":"PROVISIONED_THROUGHPUT"},"modelVersion":"","createTime":"","responseId":""}`
// Set model version
template, _ = sjson.Set(template, "modelVersion", modelName)
// Set response metadata from the completed response
responseData := rootResult.Get("response")
if responseData.Exists() {
// Set response ID
if responseId := responseData.Get("id"); responseId.Exists() {
template, _ = sjson.Set(template, "responseId", responseId.String())
}
// Set creation time
if createdAt := responseData.Get("created_at"); createdAt.Exists() {
template, _ = sjson.Set(template, "createTime", time.Unix(createdAt.Int(), 0).Format(time.RFC3339Nano))
}
// Set usage metadata
if usage := responseData.Get("usage"); usage.Exists() {
inputTokens := usage.Get("input_tokens").Int()
outputTokens := usage.Get("output_tokens").Int()
totalTokens := inputTokens + outputTokens
template, _ = sjson.Set(template, "usageMetadata.promptTokenCount", inputTokens)
template, _ = sjson.Set(template, "usageMetadata.candidatesTokenCount", outputTokens)
template, _ = sjson.Set(template, "usageMetadata.totalTokenCount", totalTokens)
}
// Process output content to build parts array
var parts []interface{}
hasToolCall := false
var pendingFunctionCalls []interface{}
flushPendingFunctionCalls := func() {
if len(pendingFunctionCalls) > 0 {
// Add all pending function calls as individual parts
// This maintains the original Gemini API format while ensuring consecutive calls are grouped together
for _, fc := range pendingFunctionCalls {
parts = append(parts, fc)
}
pendingFunctionCalls = nil
}
}
if output := responseData.Get("output"); output.Exists() && output.IsArray() {
output.ForEach(func(key, value gjson.Result) bool {
itemType := value.Get("type").String()
switch itemType {
case "reasoning":
// Flush any pending function calls before adding non-function content
flushPendingFunctionCalls()
// Add thinking content
if content := value.Get("content"); content.Exists() {
part := map[string]interface{}{
"thought": true,
"text": content.String(),
}
parts = append(parts, part)
}
case "message":
// Flush any pending function calls before adding non-function content
flushPendingFunctionCalls()
// Add regular text content
if content := value.Get("content"); content.Exists() && content.IsArray() {
content.ForEach(func(_, contentItem gjson.Result) bool {
if contentItem.Get("type").String() == "output_text" {
if text := contentItem.Get("text"); text.Exists() {
part := map[string]interface{}{
"text": text.String(),
}
parts = append(parts, part)
}
}
return true
})
}
case "function_call":
// Collect function call for potential merging with consecutive ones
hasToolCall = true
functionCall := map[string]interface{}{
"functionCall": map[string]interface{}{
"name": func() string {
n := value.Get("name").String()
rev := buildReverseMapFromGeminiOriginal(originalRequestRawJSON)
if orig, ok := rev[n]; ok {
return orig
}
return n
}(),
"args": map[string]interface{}{},
},
}
// Parse and set arguments
if argsStr := value.Get("arguments").String(); argsStr != "" {
argsResult := gjson.Parse(argsStr)
if argsResult.IsObject() {
var args map[string]interface{}
if err := json.Unmarshal([]byte(argsStr), &args); err == nil {
functionCall["functionCall"].(map[string]interface{})["args"] = args
}
}
}
pendingFunctionCalls = append(pendingFunctionCalls, functionCall)
}
return true
})
// Handle any remaining pending function calls at the end
flushPendingFunctionCalls()
}
// Set the parts array
if len(parts) > 0 {
template, _ = sjson.SetRaw(template, "candidates.0.content.parts", mustMarshalJSON(parts))
}
// Set finish reason based on whether there were tool calls
if hasToolCall {
template, _ = sjson.Set(template, "candidates.0.finishReason", "STOP")
} else {
template, _ = sjson.Set(template, "candidates.0.finishReason", "STOP")
}
}
return template
} }
return ""
// Base Gemini response template for non-streaming
template := `{"candidates":[{"content":{"role":"model","parts":[]},"finishReason":"STOP"}],"usageMetadata":{"trafficType":"PROVISIONED_THROUGHPUT"},"modelVersion":"","createTime":"","responseId":""}`
// Set model version
template, _ = sjson.Set(template, "modelVersion", modelName)
// Set response metadata from the completed response
responseData := rootResult.Get("response")
if responseData.Exists() {
// Set response ID
if responseId := responseData.Get("id"); responseId.Exists() {
template, _ = sjson.Set(template, "responseId", responseId.String())
}
// Set creation time
if createdAt := responseData.Get("created_at"); createdAt.Exists() {
template, _ = sjson.Set(template, "createTime", time.Unix(createdAt.Int(), 0).Format(time.RFC3339Nano))
}
// Set usage metadata
if usage := responseData.Get("usage"); usage.Exists() {
inputTokens := usage.Get("input_tokens").Int()
outputTokens := usage.Get("output_tokens").Int()
totalTokens := inputTokens + outputTokens
template, _ = sjson.Set(template, "usageMetadata.promptTokenCount", inputTokens)
template, _ = sjson.Set(template, "usageMetadata.candidatesTokenCount", outputTokens)
template, _ = sjson.Set(template, "usageMetadata.totalTokenCount", totalTokens)
}
// Process output content to build parts array
var parts []interface{}
hasToolCall := false
var pendingFunctionCalls []interface{}
flushPendingFunctionCalls := func() {
if len(pendingFunctionCalls) > 0 {
// Add all pending function calls as individual parts
// This maintains the original Gemini API format while ensuring consecutive calls are grouped together
for _, fc := range pendingFunctionCalls {
parts = append(parts, fc)
}
pendingFunctionCalls = nil
}
}
if output := responseData.Get("output"); output.Exists() && output.IsArray() {
output.ForEach(func(key, value gjson.Result) bool {
itemType := value.Get("type").String()
switch itemType {
case "reasoning":
// Flush any pending function calls before adding non-function content
flushPendingFunctionCalls()
// Add thinking content
if content := value.Get("content"); content.Exists() {
part := map[string]interface{}{
"thought": true,
"text": content.String(),
}
parts = append(parts, part)
}
case "message":
// Flush any pending function calls before adding non-function content
flushPendingFunctionCalls()
// Add regular text content
if content := value.Get("content"); content.Exists() && content.IsArray() {
content.ForEach(func(_, contentItem gjson.Result) bool {
if contentItem.Get("type").String() == "output_text" {
if text := contentItem.Get("text"); text.Exists() {
part := map[string]interface{}{
"text": text.String(),
}
parts = append(parts, part)
}
}
return true
})
}
case "function_call":
// Collect function call for potential merging with consecutive ones
hasToolCall = true
functionCall := map[string]interface{}{
"functionCall": map[string]interface{}{
"name": func() string {
n := value.Get("name").String()
rev := buildReverseMapFromGeminiOriginal(originalRequestRawJSON)
if orig, ok := rev[n]; ok {
return orig
}
return n
}(),
"args": map[string]interface{}{},
},
}
// Parse and set arguments
if argsStr := value.Get("arguments").String(); argsStr != "" {
argsResult := gjson.Parse(argsStr)
if argsResult.IsObject() {
var args map[string]interface{}
if err := json.Unmarshal([]byte(argsStr), &args); err == nil {
functionCall["functionCall"].(map[string]interface{})["args"] = args
}
}
}
pendingFunctionCalls = append(pendingFunctionCalls, functionCall)
}
return true
})
// Handle any remaining pending function calls at the end
flushPendingFunctionCalls()
}
// Set the parts array
if len(parts) > 0 {
template, _ = sjson.SetRaw(template, "candidates.0.content.parts", mustMarshalJSON(parts))
}
// Set finish reason based on whether there were tool calls
if hasToolCall {
template, _ = sjson.Set(template, "candidates.0.finishReason", "STOP")
} else {
template, _ = sjson.Set(template, "candidates.0.finishReason", "STOP")
}
}
return template
} }
// buildReverseMapFromGeminiOriginal builds a map[short]original from original Gemini request tools. // buildReverseMapFromGeminiOriginal builds a map[short]original from original Gemini request tools.

View File

@@ -466,7 +466,7 @@ func ConvertOpenAIResponseToClaudeNonStream(_ context.Context, _ string, origina
}, },
} }
var contentBlocks []interface{} contentBlocks := make([]interface{}, 0)
hasToolCall := false hasToolCall := false
if choices := root.Get("choices"); choices.Exists() && choices.IsArray() && len(choices.Array()) > 0 { if choices := root.Get("choices"); choices.Exists() && choices.IsArray() && len(choices.Array()) > 0 {
@@ -477,80 +477,90 @@ func ConvertOpenAIResponseToClaudeNonStream(_ context.Context, _ string, origina
} }
if message := choice.Get("message"); message.Exists() { if message := choice.Get("message"); message.Exists() {
if contentArray := message.Get("content"); contentArray.Exists() && contentArray.IsArray() { if contentResult := message.Get("content"); contentResult.Exists() {
var textBuilder strings.Builder if contentResult.IsArray() {
var thinkingBuilder strings.Builder var textBuilder strings.Builder
var thinkingBuilder strings.Builder
flushText := func() { flushText := func() {
if textBuilder.Len() == 0 { if textBuilder.Len() == 0 {
return return
}
contentBlocks = append(contentBlocks, map[string]interface{}{
"type": "text",
"text": textBuilder.String(),
})
textBuilder.Reset()
} }
contentBlocks = append(contentBlocks, map[string]interface{}{
"type": "text",
"text": textBuilder.String(),
})
textBuilder.Reset()
}
flushThinking := func() { flushThinking := func() {
if thinkingBuilder.Len() == 0 { if thinkingBuilder.Len() == 0 {
return return
}
contentBlocks = append(contentBlocks, map[string]interface{}{
"type": "thinking",
"thinking": thinkingBuilder.String(),
})
thinkingBuilder.Reset()
} }
contentBlocks = append(contentBlocks, map[string]interface{}{
"type": "thinking",
"thinking": thinkingBuilder.String(),
})
thinkingBuilder.Reset()
}
for _, item := range contentArray.Array() { for _, item := range contentResult.Array() {
typeStr := item.Get("type").String() typeStr := item.Get("type").String()
switch typeStr { switch typeStr {
case "text": case "text":
flushThinking() flushThinking()
textBuilder.WriteString(item.Get("text").String()) textBuilder.WriteString(item.Get("text").String())
case "tool_calls": case "tool_calls":
flushThinking() flushThinking()
flushText() flushText()
toolCalls := item.Get("tool_calls") toolCalls := item.Get("tool_calls")
if toolCalls.IsArray() { if toolCalls.IsArray() {
toolCalls.ForEach(func(_, tc gjson.Result) bool { toolCalls.ForEach(func(_, tc gjson.Result) bool {
hasToolCall = true hasToolCall = true
toolUse := map[string]interface{}{ toolUse := map[string]interface{}{
"type": "tool_use", "type": "tool_use",
"id": tc.Get("id").String(), "id": tc.Get("id").String(),
"name": tc.Get("function.name").String(), "name": tc.Get("function.name").String(),
} }
argsStr := util.FixJSON(tc.Get("function.arguments").String()) argsStr := util.FixJSON(tc.Get("function.arguments").String())
if argsStr != "" { if argsStr != "" {
var parsed interface{} var parsed interface{}
if err := json.Unmarshal([]byte(argsStr), &parsed); err == nil { if err := json.Unmarshal([]byte(argsStr), &parsed); err == nil {
toolUse["input"] = parsed toolUse["input"] = parsed
} else {
toolUse["input"] = map[string]interface{}{}
}
} else { } else {
toolUse["input"] = map[string]interface{}{} toolUse["input"] = map[string]interface{}{}
} }
} else {
toolUse["input"] = map[string]interface{}{}
}
contentBlocks = append(contentBlocks, toolUse) contentBlocks = append(contentBlocks, toolUse)
return true return true
}) })
}
case "reasoning":
flushText()
if thinking := item.Get("text"); thinking.Exists() {
thinkingBuilder.WriteString(thinking.String())
}
default:
flushThinking()
flushText()
} }
case "reasoning": }
flushText()
if thinking := item.Get("text"); thinking.Exists() { flushThinking()
thinkingBuilder.WriteString(thinking.String()) flushText()
} } else if contentResult.Type == gjson.String {
default: textContent := contentResult.String()
flushThinking() if textContent != "" {
flushText() contentBlocks = append(contentBlocks, map[string]interface{}{
"type": "text",
"text": textContent,
})
} }
} }
flushThinking()
flushText()
} }
if toolCalls := message.Get("tool_calls"); toolCalls.Exists() && toolCalls.IsArray() { if toolCalls := message.Get("tool_calls"); toolCalls.Exists() && toolCalls.IsArray() {

View File

@@ -91,6 +91,7 @@ type RequestDetail struct {
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
Source string `json:"source"` Source string `json:"source"`
Tokens TokenStats `json:"tokens"` Tokens TokenStats `json:"tokens"`
Failed bool `json:"failed"`
} }
// TokenStats captures the token usage breakdown for a request. // TokenStats captures the token usage breakdown for a request.
@@ -165,7 +166,11 @@ func (s *RequestStatistics) Record(ctx context.Context, record coreusage.Record)
if statsKey == "" { if statsKey == "" {
statsKey = resolveAPIIdentifier(ctx, record) statsKey = resolveAPIIdentifier(ctx, record)
} }
success := resolveSuccess(ctx) failed := record.Failed
if !failed {
failed = !resolveSuccess(ctx)
}
success := !failed
modelName := record.Model modelName := record.Model
if modelName == "" { if modelName == "" {
modelName = "unknown" modelName = "unknown"
@@ -193,6 +198,7 @@ func (s *RequestStatistics) Record(ctx context.Context, record coreusage.Record)
Timestamp: timestamp, Timestamp: timestamp,
Source: record.Source, Source: record.Source,
Tokens: detail, Tokens: detail,
Failed: failed,
}) })
s.requestsByDay[dayKey]++ s.requestsByDay[dayKey]++

View File

@@ -4,6 +4,8 @@
package util package util
import ( import (
"strings"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config" "github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"github.com/router-for-me/CLIProxyAPI/v6/internal/registry" "github.com/router-for-me/CLIProxyAPI/v6/internal/registry"
) )
@@ -141,3 +143,48 @@ func HideAPIKey(apiKey string) string {
} }
return apiKey return apiKey
} }
// maskAuthorizationHeader masks the Authorization header value while preserving the auth type prefix.
// Common formats: "Bearer <token>", "Basic <credentials>", "ApiKey <key>", etc.
// It preserves the prefix (e.g., "Bearer ") and only masks the token/credential part.
//
// Parameters:
// - value: The Authorization header value
//
// Returns:
// - string: The masked Authorization value with prefix preserved
func MaskAuthorizationHeader(value string) string {
parts := strings.SplitN(strings.TrimSpace(value), " ", 2)
if len(parts) < 2 {
return HideAPIKey(value)
}
return parts[0] + " " + HideAPIKey(parts[1])
}
// MaskSensitiveHeaderValue masks sensitive header values while preserving expected formats.
//
// Behavior by header key (case-insensitive):
// - "Authorization": Preserve the auth type prefix (e.g., "Bearer ") and mask only the credential part.
// - Headers containing "api-key": Mask the entire value using HideAPIKey.
// - Others: Return the original value unchanged.
//
// Parameters:
// - key: The HTTP header name to inspect (case-insensitive matching).
// - value: The header value to mask when sensitive.
//
// Returns:
// - string: The masked value according to the header type; unchanged if not sensitive.
func MaskSensitiveHeaderValue(key, value string) string {
lowerKey := strings.ToLower(strings.TrimSpace(key))
switch {
case lowerKey == "authorization":
return MaskAuthorizationHeader(value)
case strings.Contains(lowerKey, "api-key"),
strings.Contains(lowerKey, "apikey"),
strings.Contains(lowerKey, "token"),
strings.Contains(lowerKey, "secret"):
return HideAPIKey(value)
default:
return value
}
}

View File

@@ -84,3 +84,17 @@ func CountAuthFiles(authDir string) int {
} }
return count return count
} }
// WritablePath returns the cleaned WRITABLE_PATH environment variable when it is set.
// It accepts both uppercase and lowercase variants for compatibility with existing conventions.
func WritablePath() string {
for _, key := range []string{"WRITABLE_PATH", "writable_path"} {
if value, ok := os.LookupEnv(key); ok {
trimmed := strings.TrimSpace(value)
if trimmed != "" {
return filepath.Clean(trimmed)
}
}
}
return ""
}

View File

@@ -26,7 +26,7 @@ func (a *IFlowAuthenticator) Provider() string { return "iflow" }
// RefreshLead indicates how soon before expiry a refresh should be attempted. // RefreshLead indicates how soon before expiry a refresh should be attempted.
func (a *IFlowAuthenticator) RefreshLead() *time.Duration { func (a *IFlowAuthenticator) RefreshLead() *time.Duration {
d := 3 * time.Hour d := 24 * time.Hour
return &d return &d
} }

View File

@@ -40,6 +40,8 @@ const (
refreshCheckInterval = 5 * time.Second refreshCheckInterval = 5 * time.Second
refreshPendingBackoff = time.Minute refreshPendingBackoff = time.Minute
refreshFailureBackoff = 5 * time.Minute refreshFailureBackoff = 5 * time.Minute
quotaBackoffBase = time.Second
quotaBackoffMax = 30 * time.Minute
) )
// Result captures execution outcome used to adjust auth state. // Result captures execution outcome used to adjust auth state.
@@ -532,9 +534,15 @@ func (m *Manager) MarkResult(ctx context.Context, result Result) {
suspendReason = "payment_required" suspendReason = "payment_required"
shouldSuspendModel = true shouldSuspendModel = true
case 429: case 429:
next := now.Add(30 * time.Minute) cooldown, nextLevel := nextQuotaCooldown(state.Quota.BackoffLevel)
next := now.Add(cooldown)
state.NextRetryAfter = next state.NextRetryAfter = next
state.Quota = QuotaState{Exceeded: true, Reason: "quota", NextRecoverAt: next} state.Quota = QuotaState{
Exceeded: true,
Reason: "quota",
NextRecoverAt: next,
BackoffLevel: nextLevel,
}
suspendReason = "quota" suspendReason = "quota"
shouldSuspendModel = true shouldSuspendModel = true
setModelQuota = true setModelQuota = true
@@ -608,6 +616,7 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
earliestRetry := time.Time{} earliestRetry := time.Time{}
quotaExceeded := false quotaExceeded := false
quotaRecover := time.Time{} quotaRecover := time.Time{}
maxBackoffLevel := 0
for _, state := range auth.ModelStates { for _, state := range auth.ModelStates {
if state == nil { if state == nil {
continue continue
@@ -636,6 +645,9 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
if quotaRecover.IsZero() || (!state.Quota.NextRecoverAt.IsZero() && state.Quota.NextRecoverAt.Before(quotaRecover)) { if quotaRecover.IsZero() || (!state.Quota.NextRecoverAt.IsZero() && state.Quota.NextRecoverAt.Before(quotaRecover)) {
quotaRecover = state.Quota.NextRecoverAt quotaRecover = state.Quota.NextRecoverAt
} }
if state.Quota.BackoffLevel > maxBackoffLevel {
maxBackoffLevel = state.Quota.BackoffLevel
}
} }
} }
auth.Unavailable = allUnavailable auth.Unavailable = allUnavailable
@@ -648,10 +660,12 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
auth.Quota.Exceeded = true auth.Quota.Exceeded = true
auth.Quota.Reason = "quota" auth.Quota.Reason = "quota"
auth.Quota.NextRecoverAt = quotaRecover auth.Quota.NextRecoverAt = quotaRecover
auth.Quota.BackoffLevel = maxBackoffLevel
} else { } else {
auth.Quota.Exceeded = false auth.Quota.Exceeded = false
auth.Quota.Reason = "" auth.Quota.Reason = ""
auth.Quota.NextRecoverAt = time.Time{} auth.Quota.NextRecoverAt = time.Time{}
auth.Quota.BackoffLevel = 0
} }
} }
@@ -685,6 +699,7 @@ func clearAuthStateOnSuccess(auth *Auth, now time.Time) {
auth.Quota.Exceeded = false auth.Quota.Exceeded = false
auth.Quota.Reason = "" auth.Quota.Reason = ""
auth.Quota.NextRecoverAt = time.Time{} auth.Quota.NextRecoverAt = time.Time{}
auth.Quota.BackoffLevel = 0
auth.LastError = nil auth.LastError = nil
auth.NextRetryAfter = time.Time{} auth.NextRetryAfter = time.Time{}
auth.UpdatedAt = now auth.UpdatedAt = now
@@ -734,7 +749,9 @@ func applyAuthFailureState(auth *Auth, resultErr *Error, now time.Time) {
auth.StatusMessage = "quota exhausted" auth.StatusMessage = "quota exhausted"
auth.Quota.Exceeded = true auth.Quota.Exceeded = true
auth.Quota.Reason = "quota" auth.Quota.Reason = "quota"
auth.Quota.NextRecoverAt = now.Add(30 * time.Minute) cooldown, nextLevel := nextQuotaCooldown(auth.Quota.BackoffLevel)
auth.Quota.NextRecoverAt = now.Add(cooldown)
auth.Quota.BackoffLevel = nextLevel
auth.NextRetryAfter = auth.Quota.NextRecoverAt auth.NextRetryAfter = auth.Quota.NextRecoverAt
case 408, 500, 502, 503, 504: case 408, 500, 502, 503, 504:
auth.StatusMessage = "transient upstream error" auth.StatusMessage = "transient upstream error"
@@ -746,6 +763,21 @@ func applyAuthFailureState(auth *Auth, resultErr *Error, now time.Time) {
} }
} }
// nextQuotaCooldown returns the next cooldown duration and updated backoff level for repeated quota errors.
func nextQuotaCooldown(prevLevel int) (time.Duration, int) {
if prevLevel < 0 {
prevLevel = 0
}
cooldown := quotaBackoffBase * time.Duration(1<<prevLevel)
if cooldown < quotaBackoffBase {
cooldown = quotaBackoffBase
}
if cooldown >= quotaBackoffMax {
return quotaBackoffMax, prevLevel
}
return cooldown, prevLevel + 1
}
// List returns all auth entries currently known by the manager. // List returns all auth entries currently known by the manager.
func (m *Manager) List() []*Auth { func (m *Manager) List() []*Auth {
m.mu.RLock() m.mu.RLock()

View File

@@ -65,6 +65,8 @@ type QuotaState struct {
Reason string `json:"reason,omitempty"` Reason string `json:"reason,omitempty"`
// NextRecoverAt is when the credential may become available again. // NextRecoverAt is when the credential may become available again.
NextRecoverAt time.Time `json:"next_recover_at"` NextRecoverAt time.Time `json:"next_recover_at"`
// BackoffLevel stores the progressive cooldown exponent used for rate limits.
BackoffLevel int `json:"backoff_level,omitempty"`
} }
// ModelState captures the execution state for a specific model under an auth entry. // ModelState captures the execution state for a specific model under an auth entry.

View File

@@ -16,6 +16,7 @@ type Record struct {
AuthID string AuthID string
Source string Source string
RequestedAt time.Time RequestedAt time.Time
Failed bool
Detail Detail Detail Detail
} }