Compare commits

...

13 Commits

Author SHA1 Message Date
Luis Pater
3569e5779a feat: enhance quota management with backoff levels and cooldown logic
Some checks failed
docker-image / docker (push) Has been cancelled
goreleaser / goreleaser (push) Has been cancelled
2025-10-21 18:44:28 +08:00
Luis Pater
20985d1a10 Refactor executor error handling and usage reporting
- Updated the Execute methods in various executors (GeminiCLIExecutor, GeminiExecutor, IFlowExecutor, OpenAICompatExecutor, QwenExecutor) to return a response and error as named return values for improved clarity.
- Enhanced error handling by deferring failure tracking in usage reporters, ensuring that failures are reported correctly.
- Improved response body handling by ensuring proper closure and error logging for HTTP responses across all executors.
- Added failure tracking and reporting in the usage reporter to capture unsuccessful requests.
- Updated the usage logging structure to include a 'Failed' field for better tracking of request outcomes.
- Adjusted the logic in the RequestStatistics and Record methods to accommodate the new failure tracking mechanism.
2025-10-21 11:22:24 +08:00
Luis Pater
67f553806b feat: implement management asset configuration and auto-updater 2025-10-21 09:01:58 +08:00
Luis Pater
29044312a4 docs: add Subtitle Translator tool to README files 2025-10-21 02:48:08 +08:00
Luis Pater
5b3fc092ee Merge pull request #151 from VjayC/add-subtitle-translator
docs: add Subtitle Translator to projects list
2025-10-21 02:44:50 +08:00
Vijay Chimmi
792e8d09d7 docs: add Subtitle Translator to projects list 2025-10-20 11:29:18 -07:00
Luis Pater
eadccb229f Fixed: #148
Some checks failed
docker-image / docker (push) Has been cancelled
goreleaser / goreleaser (push) Has been cancelled
feat(executor): add initial cache_helpers.go file
2025-10-20 10:17:29 +08:00
Luis Pater
fed6f3ecd7 Merge pull request #147 from router-for-me/config
Some checks failed
docker-image / docker (push) Has been cancelled
goreleaser / goreleaser (push) Has been cancelled
feat(mgmt): support YAML config retrieval and updates via /config.yaml
2025-10-19 22:26:38 +08:00
hkfires
f8dcd707a6 feat(mgmt): support YAML config retrieval and updates via /config.yaml 2025-10-19 21:56:29 +08:00
Luis Pater
0e91e95287 Merge pull request #145 from router-for-me/path
Some checks failed
docker-image / docker (push) Has been cancelled
goreleaser / goreleaser (push) Has been cancelled
feat: prefer util.WritablePath() for logs and local storage
2025-10-19 20:50:44 +08:00
Luis Pater
c5dcbc1c1a Merge pull request #146 from router-for-me/iflow
feat(iflow): add masked token logs; increase refresh lead to 24h
2025-10-19 20:49:40 +08:00
hkfires
4504ba5329 feat(iflow): add masked token logs; increase refresh lead to 24h 2025-10-19 10:56:29 +08:00
hkfires
d16599fa1d feat: prefer util.WritablePath() for logs and local storage 2025-10-19 10:19:55 +08:00
24 changed files with 765 additions and 228 deletions

View File

@@ -797,6 +797,10 @@ Those projects are based on CLIProxyAPI:
Native macOS menu bar app to use your Claude Code & ChatGPT subscriptions with AI coding tools - no API keys needed
### [Subtitle Translator](https://github.com/VjayC/SRT-Subtitle-Translator-Validator)
Browser-based tool to translate SRT subtitles using your Gemini subscription via CLIProxyAPI with automatic validation/error correction - no API keys needed
> [!NOTE]
> If you developed a project based on CLIProxyAPI, please open a PR to add it to this list.

View File

@@ -807,6 +807,10 @@ docker run --rm -p 8317:8317 -v /path/to/your/config.yaml:/CLIProxyAPI/config.ya
一个原生 macOS 菜单栏应用,让您可以使用 Claude Code & ChatGPT 订阅服务和 AI 编程工具,无需 API 密钥。
### [Subtitle Translator](https://github.com/VjayC/SRT-Subtitle-Translator-Validator)
一款基于浏览器的 SRT 字幕翻译工具,可通过 CLI 代理 API 使用您的 Gemini 订阅。内置自动验证与错误修正功能,无需 API 密钥。
> [!NOTE]
> 如果你开发了基于 CLIProxyAPI 的项目,请提交一个 PR拉取请求将其添加到此列表中。

View File

@@ -20,6 +20,7 @@ import (
"github.com/router-for-me/CLIProxyAPI/v6/internal/cmd"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"github.com/router-for-me/CLIProxyAPI/v6/internal/logging"
"github.com/router-for-me/CLIProxyAPI/v6/internal/managementasset"
"github.com/router-for-me/CLIProxyAPI/v6/internal/misc"
"github.com/router-for-me/CLIProxyAPI/v6/internal/store"
_ "github.com/router-for-me/CLIProxyAPI/v6/internal/translator"
@@ -147,6 +148,7 @@ func main() {
}
return "", false
}
writableBase := util.WritablePath()
if value, ok := lookupEnv("PGSTORE_DSN", "pgstore_dsn"); ok {
usePostgresStore = true
pgStoreDSN = value
@@ -158,6 +160,13 @@ func main() {
if value, ok := lookupEnv("PGSTORE_LOCAL_PATH", "pgstore_local_path"); ok {
pgStoreLocalPath = value
}
if pgStoreLocalPath == "" {
if writableBase != "" {
pgStoreLocalPath = writableBase
} else {
pgStoreLocalPath = wd
}
}
useGitStore = false
}
if value, ok := lookupEnv("GITSTORE_GIT_URL", "gitstore_git_url"); ok {
@@ -229,11 +238,14 @@ func main() {
log.Infof("postgres-backed token store enabled, workspace path: %s", pgStoreInst.WorkDir())
}
} else if useObjectStore {
objectStoreRoot := objectStoreLocalPath
if objectStoreRoot == "" {
objectStoreRoot = wd
if objectStoreLocalPath == "" {
if writableBase != "" {
objectStoreLocalPath = writableBase
} else {
objectStoreLocalPath = wd
}
}
objectStoreRoot = filepath.Join(objectStoreRoot, "objectstore")
objectStoreRoot := filepath.Join(objectStoreLocalPath, "objectstore")
resolvedEndpoint := strings.TrimSpace(objectStoreEndpoint)
useSSL := true
if strings.Contains(resolvedEndpoint, "://") {
@@ -289,7 +301,11 @@ func main() {
}
} else if useGitStore {
if gitStoreLocalPath == "" {
gitStoreLocalPath = wd
if writableBase != "" {
gitStoreLocalPath = writableBase
} else {
gitStoreLocalPath = wd
}
}
gitStoreRoot = filepath.Join(gitStoreLocalPath, "gitstore")
authDir := filepath.Join(gitStoreRoot, "auths")
@@ -376,6 +392,7 @@ func main() {
} else {
cfg.AuthDir = resolvedAuthDir
}
managementasset.SetCurrentConfig(cfg)
// Create login options to be used in authentication flows.
options := &cmd.LoginOptions{
@@ -419,6 +436,7 @@ func main() {
return
}
// Start the main proxy service
managementasset.StartAutoUpdater(context.Background(), configFilePath)
cmd.StartService(cfg, configFilePath, password)
}
}

View File

@@ -1,13 +1,126 @@
package management
import (
"io"
"net/http"
"os"
"path/filepath"
"github.com/gin-gonic/gin"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"gopkg.in/yaml.v3"
)
func (h *Handler) GetConfig(c *gin.Context) {
c.JSON(200, h.cfg)
}
func (h *Handler) GetConfigYAML(c *gin.Context) {
data, err := os.ReadFile(h.configFilePath)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "read_failed", "message": err.Error()})
return
}
var node yaml.Node
if err := yaml.Unmarshal(data, &node); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "parse_failed", "message": err.Error()})
return
}
c.Header("Content-Type", "application/yaml; charset=utf-8")
c.Header("Vary", "format, Accept")
enc := yaml.NewEncoder(c.Writer)
enc.SetIndent(2)
_ = enc.Encode(&node)
_ = enc.Close()
}
func WriteConfig(path string, data []byte) error {
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
if _, err := f.Write(data); err != nil {
f.Close()
return err
}
if err := f.Sync(); err != nil {
f.Close()
return err
}
return f.Close()
}
func (h *Handler) PutConfigYAML(c *gin.Context) {
body, err := io.ReadAll(c.Request.Body)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid_yaml", "message": "cannot read request body"})
return
}
var cfg config.Config
if err := yaml.Unmarshal(body, &cfg); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid_yaml", "message": err.Error()})
return
}
// Validate config using LoadConfigOptional with optional=false to enforce parsing
tmpDir := filepath.Dir(h.configFilePath)
tmpFile, err := os.CreateTemp(tmpDir, "config-validate-*.yaml")
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "write_failed", "message": err.Error()})
return
}
tempFile := tmpFile.Name()
if _, err := tmpFile.Write(body); err != nil {
tmpFile.Close()
os.Remove(tempFile)
c.JSON(http.StatusInternalServerError, gin.H{"error": "write_failed", "message": err.Error()})
return
}
if err := tmpFile.Close(); err != nil {
os.Remove(tempFile)
c.JSON(http.StatusInternalServerError, gin.H{"error": "write_failed", "message": err.Error()})
return
}
defer os.Remove(tempFile)
_, err = config.LoadConfigOptional(tempFile, false)
if err != nil {
c.JSON(http.StatusUnprocessableEntity, gin.H{"error": "invalid_config", "message": err.Error()})
return
}
h.mu.Lock()
defer h.mu.Unlock()
if WriteConfig(h.configFilePath, body) != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "write_failed", "message": "failed to write config"})
return
}
// Reload into handler to keep memory in sync
newCfg, err := config.LoadConfig(h.configFilePath)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "reload_failed", "message": err.Error()})
return
}
h.cfg = newCfg
c.JSON(http.StatusOK, gin.H{"ok": true, "changed": []string{"config"}})
}
// GetConfigFile returns the raw config.yaml file bytes without re-encoding.
// It preserves comments and original formatting/styles.
func (h *Handler) GetConfigFile(c *gin.Context) {
data, err := os.ReadFile(h.configFilePath)
if err != nil {
if os.IsNotExist(err) {
c.JSON(http.StatusNotFound, gin.H{"error": "not_found", "message": "config file not found"})
return
}
c.JSON(http.StatusInternalServerError, gin.H{"error": "read_failed", "message": err.Error()})
return
}
c.Header("Content-Type", "application/yaml; charset=utf-8")
c.Header("Cache-Control", "no-store")
c.Header("X-Content-Type-Options", "nosniff")
// Write raw bytes as-is
_, _ = c.Writer.Write(data)
}
// Debug
func (h *Handler) GetDebug(c *gin.Context) { c.JSON(200, gin.H{"debug": h.cfg.Debug}) }
func (h *Handler) PutDebug(c *gin.Context) { h.updateBoolField(c, func(v bool) { h.cfg.Debug = v }) }

View File

@@ -13,6 +13,7 @@ import (
"time"
"github.com/gin-gonic/gin"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
)
const (
@@ -145,6 +146,9 @@ func (h *Handler) logDirectory() string {
if h.logDir != "" {
return h.logDir
}
if base := util.WritablePath(); base != "" {
return filepath.Join(base, "logs")
}
if h.configFilePath != "" {
dir := filepath.Dir(h.configFilePath)
if dir != "" && dir != "." {

View File

@@ -13,5 +13,8 @@ func (h *Handler) GetUsageStatistics(c *gin.Context) {
if h != nil && h.usageStats != nil {
snapshot = h.usageStats.Snapshot()
}
c.JSON(http.StatusOK, gin.H{"usage": snapshot})
c.JSON(http.StatusOK, gin.H{
"usage": snapshot,
"failed_requests": snapshot.FailureCount,
})
}

View File

@@ -52,7 +52,11 @@ type serverOptionConfig struct {
type ServerOption func(*serverOptionConfig)
func defaultRequestLoggerFactory(cfg *config.Config, configPath string) logging.RequestLogger {
return logging.NewFileRequestLogger(cfg.RequestLog, "logs", filepath.Dir(configPath))
configDir := filepath.Dir(configPath)
if base := util.WritablePath(); base != "" {
return logging.NewFileRequestLogger(cfg.RequestLog, filepath.Join(base, "logs"), configDir)
}
return logging.NewFileRequestLogger(cfg.RequestLog, "logs", configDir)
}
// WithMiddleware appends additional Gin middleware during server construction.
@@ -228,12 +232,17 @@ func NewServer(cfg *config.Config, authManager *auth.Manager, accessManager *sdk
// Save initial YAML snapshot
s.oldConfigYaml, _ = yaml.Marshal(cfg)
s.applyAccessConfig(nil, cfg)
managementasset.SetCurrentConfig(cfg)
// Initialize management handler
s.mgmt = managementHandlers.NewHandler(cfg, configFilePath, authManager)
if optionState.localPassword != "" {
s.mgmt.SetLocalPassword(optionState.localPassword)
}
s.mgmt.SetLogDirectory(filepath.Join(s.currentPath, "logs"))
logDir := filepath.Join(s.currentPath, "logs")
if base := util.WritablePath(); base != "" {
logDir = filepath.Join(base, "logs")
}
s.mgmt.SetLogDirectory(logDir)
s.localPassword = optionState.localPassword
// Setup routes
@@ -376,6 +385,8 @@ func (s *Server) registerManagementRoutes() {
{
mgmt.GET("/usage", s.mgmt.GetUsageStatistics)
mgmt.GET("/config", s.mgmt.GetConfig)
mgmt.PUT("/config.yaml", s.mgmt.PutConfigYAML)
mgmt.GET("/config.yaml", s.mgmt.GetConfigFile)
mgmt.GET("/debug", s.mgmt.GetDebug)
mgmt.PUT("/debug", s.mgmt.PutDebug)
@@ -749,6 +760,7 @@ func (s *Server) UpdateClients(cfg *config.Config) {
s.applyAccessConfig(oldCfg, cfg)
s.cfg = cfg
managementasset.SetCurrentConfig(cfg)
// Save YAML snapshot for next comparison
s.oldConfigYaml, _ = yaml.Marshal(cfg)
s.handlers.UpdateClients(&cfg.SDKConfig)

View File

@@ -10,6 +10,7 @@ import (
"sync"
"github.com/gin-gonic/gin"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
log "github.com/sirupsen/logrus"
"gopkg.in/natefinch/lumberjack.v2"
)
@@ -72,7 +73,10 @@ func ConfigureLogOutput(loggingToFile bool) error {
defer writerMu.Unlock()
if loggingToFile {
const logDir = "logs"
logDir := "logs"
if base := util.WritablePath(); base != "" {
logDir = filepath.Join(base, "logs")
}
if err := os.MkdirAll(logDir, 0o755); err != nil {
return fmt.Errorf("logging: failed to create log directory: %w", err)
}

View File

@@ -13,8 +13,10 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
sdkconfig "github.com/router-for-me/CLIProxyAPI/v6/sdk/config"
log "github.com/sirupsen/logrus"
@@ -33,8 +35,83 @@ const ManagementFileName = managementAssetName
var (
lastUpdateCheckMu sync.Mutex
lastUpdateCheckTime time.Time
currentConfigPtr atomic.Pointer[config.Config]
disableControlPanel atomic.Bool
schedulerOnce sync.Once
schedulerConfigPath atomic.Value
)
// SetCurrentConfig stores the latest configuration snapshot for management asset decisions.
func SetCurrentConfig(cfg *config.Config) {
if cfg == nil {
currentConfigPtr.Store(nil)
return
}
prevDisabled := disableControlPanel.Load()
currentConfigPtr.Store(cfg)
disableControlPanel.Store(cfg.RemoteManagement.DisableControlPanel)
if prevDisabled && !cfg.RemoteManagement.DisableControlPanel {
lastUpdateCheckMu.Lock()
lastUpdateCheckTime = time.Time{}
lastUpdateCheckMu.Unlock()
}
}
// StartAutoUpdater launches a background goroutine that periodically ensures the management asset is up to date.
// It respects the disable-control-panel flag on every iteration and supports hot-reloaded configurations.
func StartAutoUpdater(ctx context.Context, configFilePath string) {
configFilePath = strings.TrimSpace(configFilePath)
if configFilePath == "" {
log.Debug("management asset auto-updater skipped: empty config path")
return
}
schedulerConfigPath.Store(configFilePath)
schedulerOnce.Do(func() {
go runAutoUpdater(ctx)
})
}
func runAutoUpdater(ctx context.Context) {
if ctx == nil {
ctx = context.Background()
}
ticker := time.NewTicker(updateCheckInterval)
defer ticker.Stop()
runOnce := func() {
cfg := currentConfigPtr.Load()
if cfg == nil {
log.Debug("management asset auto-updater skipped: config not yet available")
return
}
if disableControlPanel.Load() {
log.Debug("management asset auto-updater skipped: control panel disabled")
return
}
configPath, _ := schedulerConfigPath.Load().(string)
staticDir := StaticDir(configPath)
EnsureLatestManagementHTML(ctx, staticDir, cfg.ProxyURL)
}
runOnce()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
runOnce()
}
}
}
func newHTTPClient(proxyURL string) *http.Client {
client := &http.Client{Timeout: 15 * time.Second}
@@ -64,6 +141,10 @@ func StaticDir(configFilePath string) string {
return cleaned
}
if writable := util.WritablePath(); writable != "" {
return filepath.Join(writable, "static")
}
configFilePath = strings.TrimSpace(configFilePath)
if configFilePath == "" {
return ""
@@ -105,6 +186,11 @@ func EnsureLatestManagementHTML(ctx context.Context, staticDir string, proxyURL
ctx = context.Background()
}
if disableControlPanel.Load() {
log.Debug("management asset sync skipped: control panel disabled by configuration")
return
}
staticDir = strings.TrimSpace(staticDir)
if staticDir == "" {
log.Debug("management asset sync skipped: empty static directory")

View File

@@ -0,0 +1,10 @@
package executor
import "time"
type codexCache struct {
ID string
Expire time.Time
}
var codexCacheMap = map[string]codexCache{}

View File

@@ -36,13 +36,14 @@ func (e *ClaudeExecutor) Identifier() string { return "claude" }
func (e *ClaudeExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
apiKey, baseURL := claudeCreds(auth)
if baseURL == "" {
baseURL = "https://api.anthropic.com"
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat
to := sdktranslator.FromString("claude")
// Use streaming translation to preserve function calling, except for claude.
@@ -56,7 +57,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL)
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return cliproxyexecutor.Response{}, err
return resp, err
}
applyClaudeHeaders(httpReq, apiKey, false)
var authID, authLabel, authType, authValue string
@@ -78,30 +79,31 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err
return resp, err
}
defer func() {
if errClose := resp.Body.Close(); errClose != nil {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
}
}()
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(resp.Body)
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
}
reader := io.Reader(resp.Body)
reader := io.Reader(httpResp.Body)
var decoder *zstd.Decoder
if hasZSTDEcoding(resp.Header.Get("Content-Encoding")) {
decoder, err = zstd.NewReader(resp.Body)
if hasZSTDEcoding(httpResp.Header.Get("Content-Encoding")) {
decoder, err = zstd.NewReader(httpResp.Body)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, fmt.Errorf("failed to initialize zstd decoder: %w", err)
return resp, fmt.Errorf("failed to initialize zstd decoder: %w", err)
}
reader = decoder
defer decoder.Close()
@@ -109,7 +111,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
data, err := io.ReadAll(reader)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err
return resp, err
}
appendAPIResponseChunk(ctx, e.cfg, data)
if stream {
@@ -124,16 +126,18 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
}
var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
}
func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
apiKey, baseURL := claudeCreds(auth)
if baseURL == "" {
baseURL = "https://api.anthropic.com"
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat
to := sdktranslator.FromString("claude")
body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
@@ -164,27 +168,35 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return nil, err
}
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
defer func() { _ = resp.Body.Close() }()
b, _ := io.ReadAll(resp.Body)
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
}
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return nil, err
}
out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func() {
defer close(out)
defer func() { _ = resp.Body.Close() }()
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("response body close error: %v", errClose)
}
}()
// If from == to (Claude → Claude), directly forward the SSE stream without translation
if from == to {
scanner := bufio.NewScanner(resp.Body)
scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520)
for scanner.Scan() {
@@ -199,15 +211,16 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
cloned[len(line)] = '\n'
out <- cliproxyexecutor.StreamChunk{Payload: cloned}
}
if err = scanner.Err(); err != nil {
recordAPIResponseError(ctx, e.cfg, err)
out <- cliproxyexecutor.StreamChunk{Err: err}
if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, errScan)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
}
return
}
// For other formats, use translation
scanner := bufio.NewScanner(resp.Body)
scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520)
var param any
@@ -222,12 +235,13 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
}
}
if err = scanner.Err(); err != nil {
recordAPIResponseError(ctx, e.cfg, err)
out <- cliproxyexecutor.StreamChunk{Err: err}
if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, errScan)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
}
}()
return out, nil
return stream, nil
}
func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {

View File

@@ -39,13 +39,14 @@ func (e *CodexExecutor) Identifier() string { return "codex" }
func (e *CodexExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
apiKey, baseURL := codexCreds(auth)
if baseURL == "" {
baseURL = "https://chatgpt.com/backend-api/codex"
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat
to := sdktranslator.FromString("codex")
@@ -78,10 +79,30 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
body, _ = sjson.SetBytes(body, "stream", true)
body, _ = sjson.DeleteBytes(body, "previous_response_id")
additionalHeaders := make(map[string]string)
if from == "claude" {
userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id")
if userIDResult.Exists() {
var cache codexCache
var hasKey bool
key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String())
if cache, hasKey = codexCacheMap[key]; !hasKey || cache.Expire.Before(time.Now()) {
cache = codexCache{
ID: uuid.New().String(),
Expire: time.Now().Add(1 * time.Hour),
}
codexCacheMap[key] = cache
}
additionalHeaders["Conversation_id"] = cache.ID
additionalHeaders["Session_id"] = cache.ID
body, _ = sjson.SetBytes(body, "prompt_cache_key", cache.ID)
}
}
url := strings.TrimSuffix(baseURL, "/") + "/responses"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return cliproxyexecutor.Response{}, err
return resp, err
}
applyCodexHeaders(httpReq, auth, apiKey)
var authID, authLabel, authType, authValue string
@@ -90,6 +111,9 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
authLabel = auth.Label
authType, authValue = auth.AccountInfo()
}
for k, v := range additionalHeaders {
httpReq.Header.Set(k, v)
}
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
URL: url,
Method: http.MethodPost,
@@ -101,25 +125,29 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
AuthType: authType,
AuthValue: authValue,
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err
return resp, err
}
defer func() { _ = resp.Body.Close() }()
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(resp.Body)
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("codex executor: close response body error: %v", errClose)
}
}()
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
}
data, err := io.ReadAll(resp.Body)
data, err := io.ReadAll(httpResp.Body)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err
return resp, err
}
appendAPIResponseChunk(ctx, e.cfg, data)
@@ -140,18 +168,21 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, line, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
}
return cliproxyexecutor.Response{}, statusErr{code: 408, msg: "stream error: stream disconnected before completion: stream closed before response.completed"}
err = statusErr{code: 408, msg: "stream error: stream disconnected before completion: stream closed before response.completed"}
return resp, err
}
func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
apiKey, baseURL := codexCreds(auth)
if baseURL == "" {
baseURL = "https://chatgpt.com/backend-api/codex"
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat
to := sdktranslator.FromString("codex")
@@ -183,6 +214,26 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
body, _ = sjson.DeleteBytes(body, "previous_response_id")
additionalHeaders := make(map[string]string)
if from == "claude" {
userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id")
if userIDResult.Exists() {
var cache codexCache
var hasKey bool
key := fmt.Sprintf("%s-%s", req.Model, userIDResult.String())
if cache, hasKey = codexCacheMap[key]; !hasKey || cache.Expire.Before(time.Now()) {
cache = codexCache{
ID: uuid.New().String(),
Expire: time.Now().Add(1 * time.Hour),
}
codexCacheMap[key] = cache
}
additionalHeaders["Conversation_id"] = cache.ID
additionalHeaders["Session_id"] = cache.ID
body, _ = sjson.SetBytes(body, "prompt_cache_key", cache.ID)
}
}
url := strings.TrimSuffix(baseURL, "/") + "/responses"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
@@ -195,6 +246,9 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
authLabel = auth.Label
authType, authValue = auth.AccountInfo()
}
for k, v := range additionalHeaders {
httpReq.Header.Set(k, v)
}
recordAPIRequest(ctx, e.cfg, upstreamRequestLog{
URL: url,
Method: http.MethodPost,
@@ -208,28 +262,36 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return nil, err
}
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
defer func() { _ = resp.Body.Close() }()
b, readErr := io.ReadAll(resp.Body)
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
data, readErr := io.ReadAll(httpResp.Body)
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("codex executor: close response body error: %v", errClose)
}
if readErr != nil {
recordAPIResponseError(ctx, e.cfg, readErr)
return nil, readErr
}
appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
appendAPIResponseChunk(ctx, e.cfg, data)
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(data))
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
return nil, err
}
out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func() {
defer close(out)
defer func() { _ = resp.Body.Close() }()
scanner := bufio.NewScanner(resp.Body)
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("codex executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520)
var param any
@@ -251,12 +313,13 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
}
}
if err = scanner.Err(); err != nil {
recordAPIResponseError(ctx, e.cfg, err)
out <- cliproxyexecutor.StreamChunk{Err: err}
if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, errScan)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
}
}()
return out, nil
return stream, nil
}
func (e *CodexExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {

View File

@@ -51,12 +51,13 @@ func (e *GeminiCLIExecutor) Identifier() string { return "gemini-cli" }
func (e *GeminiCLIExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
tokenSource, baseTokenData, err := prepareGeminiCLITokenSource(ctx, e.cfg, auth)
if err != nil {
return cliproxyexecutor.Response{}, err
return resp, err
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat
to := sdktranslator.FromString("gemini-cli")
@@ -104,7 +105,8 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
tok, errTok := tokenSource.Token()
if errTok != nil {
return cliproxyexecutor.Response{}, errTok
err = errTok
return resp, err
}
updateGeminiCLITokenMetadata(auth, baseTokenData, tok)
@@ -115,7 +117,8 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
reqHTTP, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
if errReq != nil {
return cliproxyexecutor.Response{}, errReq
err = errReq
return resp, err
}
reqHTTP.Header.Set("Content-Type", "application/json")
reqHTTP.Header.Set("Authorization", "Bearer "+tok.AccessToken)
@@ -133,44 +136,60 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
AuthValue: authValue,
})
resp, errDo := httpClient.Do(reqHTTP)
httpResp, errDo := httpClient.Do(reqHTTP)
if errDo != nil {
recordAPIResponseError(ctx, e.cfg, errDo)
return cliproxyexecutor.Response{}, errDo
err = errDo
return resp, err
}
data, errRead := io.ReadAll(resp.Body)
_ = resp.Body.Close()
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
data, errRead := io.ReadAll(httpResp.Body)
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("gemini cli executor: close response body error: %v", errClose)
}
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if errRead != nil {
recordAPIResponseError(ctx, e.cfg, errRead)
return cliproxyexecutor.Response{}, errRead
err = errRead
return resp, err
}
appendAPIResponseChunk(ctx, e.cfg, data)
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
if httpResp.StatusCode >= 200 && httpResp.StatusCode < 300 {
reporter.publish(ctx, parseGeminiCLIUsage(data))
var param any
out := sdktranslator.TranslateNonStream(respCtx, to, from, attemptModel, bytes.Clone(opts.OriginalRequest), payload, data, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
}
lastStatus = resp.StatusCode
lastStatus = httpResp.StatusCode
lastBody = append([]byte(nil), data...)
if resp.StatusCode != 429 {
break
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(data))
if httpResp.StatusCode == 429 {
continue
}
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
return resp, err
}
if len(lastBody) > 0 {
appendAPIResponseChunk(ctx, e.cfg, lastBody)
}
return cliproxyexecutor.Response{}, statusErr{code: lastStatus, msg: string(lastBody)}
if lastStatus == 0 {
lastStatus = 429
}
err = statusErr{code: lastStatus, msg: string(lastBody)}
return resp, err
}
func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
tokenSource, baseTokenData, err := prepareGeminiCLITokenSource(ctx, e.cfg, auth)
if err != nil {
return nil, err
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat
to := sdktranslator.FromString("gemini-cli")
@@ -207,7 +226,8 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
tok, errTok := tokenSource.Token()
if errTok != nil {
return nil, errTok
err = errTok
return nil, err
}
updateGeminiCLITokenMetadata(auth, baseTokenData, tok)
@@ -220,7 +240,8 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
reqHTTP, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
if errReq != nil {
return nil, errReq
err = errReq
return nil, err
}
reqHTTP.Header.Set("Content-Type", "application/json")
reqHTTP.Header.Set("Authorization", "Bearer "+tok.AccessToken)
@@ -238,33 +259,43 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
AuthValue: authValue,
})
resp, errDo := httpClient.Do(reqHTTP)
httpResp, errDo := httpClient.Do(reqHTTP)
if errDo != nil {
recordAPIResponseError(ctx, e.cfg, errDo)
return nil, errDo
err = errDo
return nil, err
}
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
data, errRead := io.ReadAll(resp.Body)
_ = resp.Body.Close()
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
data, errRead := io.ReadAll(httpResp.Body)
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("gemini cli executor: close response body error: %v", errClose)
}
if errRead != nil {
recordAPIResponseError(ctx, e.cfg, errRead)
return nil, errRead
err = errRead
return nil, err
}
appendAPIResponseChunk(ctx, e.cfg, data)
lastStatus = resp.StatusCode
lastStatus = httpResp.StatusCode
lastBody = append([]byte(nil), data...)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(data))
if resp.StatusCode == 429 {
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(data))
if httpResp.StatusCode == 429 {
continue
}
return nil, statusErr{code: resp.StatusCode, msg: string(data)}
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
return nil, err
}
out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func(resp *http.Response, reqBody []byte, attempt string) {
defer close(out)
defer func() { _ = resp.Body.Close() }()
defer func() {
if errClose := resp.Body.Close(); errClose != nil {
log.Errorf("gemini cli executor: close response body error: %v", errClose)
}
}()
if opts.Alt == "" {
scanner := bufio.NewScanner(resp.Body)
buf := make([]byte, 20_971_520)
@@ -290,6 +321,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
}
if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, errScan)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
}
return
@@ -298,6 +330,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
data, errRead := io.ReadAll(resp.Body)
if errRead != nil {
recordAPIResponseError(ctx, e.cfg, errRead)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errRead}
return
}
@@ -313,15 +346,19 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
for i := range segments {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(segments[i])}
}
}(resp, append([]byte(nil), payload...), attemptModel)
}(httpResp, append([]byte(nil), payload...), attemptModel)
return out, nil
return stream, nil
}
if len(lastBody) > 0 {
appendAPIResponseChunk(ctx, e.cfg, lastBody)
}
if lastStatus == 0 {
lastStatus = 429
}
return nil, statusErr{code: lastStatus, msg: string(lastBody)}
err = statusErr{code: lastStatus, msg: string(lastBody)}
return nil, err
}
func (e *GeminiCLIExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {

View File

@@ -68,10 +68,11 @@ func (e *GeminiExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) e
// Returns:
// - cliproxyexecutor.Response: The response from the API
// - error: An error if the request fails
func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
apiKey, bearer := geminiCreds(auth)
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
// Official Gemini API via API key or OAuth bearer
from := opts.SourceFormat
@@ -98,7 +99,7 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return cliproxyexecutor.Response{}, err
return resp, err
}
httpReq.Header.Set("Content-Type", "application/json")
if apiKey != "" {
@@ -125,35 +126,42 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err
return resp, err
}
defer func() { _ = resp.Body.Close() }()
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(resp.Body)
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("gemini executor: close response body error: %v", errClose)
}
}()
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
}
data, err := io.ReadAll(resp.Body)
data, err := io.ReadAll(httpResp.Body)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err
return resp, err
}
appendAPIResponseChunk(ctx, e.cfg, data)
reporter.publish(ctx, parseGeminiUsage(data))
var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
}
func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
apiKey, bearer := geminiCreds(auth)
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat
to := sdktranslator.FromString("gemini")
@@ -202,24 +210,32 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return nil, err
}
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
defer func() { _ = resp.Body.Close() }()
b, _ := io.ReadAll(resp.Body)
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("gemini executor: close response body error: %v", errClose)
}
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return nil, err
}
out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func() {
defer close(out)
defer func() { _ = resp.Body.Close() }()
scanner := bufio.NewScanner(resp.Body)
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("gemini executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520)
var param any
@@ -238,12 +254,13 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
for i := range lines {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(lines[i])}
}
if err = scanner.Err(); err != nil {
recordAPIResponseError(ctx, e.cfg, err)
out <- cliproxyexecutor.StreamChunk{Err: err}
if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, errScan)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
}
}()
return out, nil
return stream, nil
}
func (e *GeminiExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {

View File

@@ -12,6 +12,7 @@ import (
iflowauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/iflow"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
@@ -40,16 +41,18 @@ func (e *IFlowExecutor) Identifier() string { return "iflow" }
func (e *IFlowExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
// Execute performs a non-streaming chat completion request.
func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
apiKey, baseURL := iflowCreds(auth)
if strings.TrimSpace(apiKey) == "" {
return cliproxyexecutor.Response{}, fmt.Errorf("iflow executor: missing api key")
err = fmt.Errorf("iflow executor: missing api key")
return resp, err
}
if baseURL == "" {
baseURL = iflowauth.DefaultAPIBaseURL
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat
to := sdktranslator.FromString("openai")
@@ -59,7 +62,7 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
if err != nil {
return cliproxyexecutor.Response{}, err
return resp, err
}
applyIFlowHeaders(httpReq, apiKey, false)
var authID, authLabel, authType, authValue string
@@ -81,45 +84,53 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err
return resp, err
}
defer func() { _ = resp.Body.Close() }()
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("iflow executor: close response body error: %v", errClose)
}
}()
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(resp.Body)
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("iflow request error: status %d body %s", resp.StatusCode, string(b))
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
log.Debugf("iflow request error: status %d body %s", httpResp.StatusCode, string(b))
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
}
data, err := io.ReadAll(resp.Body)
data, err := io.ReadAll(httpResp.Body)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err
return resp, err
}
appendAPIResponseChunk(ctx, e.cfg, data)
reporter.publish(ctx, parseOpenAIUsage(data))
var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
}
// ExecuteStream performs a streaming chat completion request.
func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
apiKey, baseURL := iflowCreds(auth)
if strings.TrimSpace(apiKey) == "" {
return nil, fmt.Errorf("iflow executor: missing api key")
err = fmt.Errorf("iflow executor: missing api key")
return nil, err
}
if baseURL == "" {
baseURL = iflowauth.DefaultAPIBaseURL
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat
to := sdktranslator.FromString("openai")
@@ -157,27 +168,35 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return nil, err
}
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
defer func() { _ = resp.Body.Close() }()
b, _ := io.ReadAll(resp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("iflow streaming error: status %d body %s", resp.StatusCode, string(b))
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
data, _ := io.ReadAll(httpResp.Body)
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("iflow executor: close response body error: %v", errClose)
}
appendAPIResponseChunk(ctx, e.cfg, data)
log.Debugf("iflow streaming error: status %d body %s", httpResp.StatusCode, string(data))
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
return nil, err
}
out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func() {
defer close(out)
defer func() { _ = resp.Body.Close() }()
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("iflow executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(resp.Body)
scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520)
var param any
@@ -192,13 +211,14 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
}
}
if err := scanner.Err(); err != nil {
recordAPIResponseError(ctx, e.cfg, err)
out <- cliproxyexecutor.StreamChunk{Err: err}
if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, errScan)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
}
}()
return out, nil
return stream, nil
}
// CountTokens is not implemented for iFlow.
@@ -214,18 +234,28 @@ func (e *IFlowExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*
}
refreshToken := ""
oldAccessToken := ""
if auth.Metadata != nil {
if v, ok := auth.Metadata["refresh_token"].(string); ok {
refreshToken = strings.TrimSpace(v)
}
if v, ok := auth.Metadata["access_token"].(string); ok {
oldAccessToken = strings.TrimSpace(v)
}
}
if refreshToken == "" {
return auth, nil
}
// Log the old access token (masked) before refresh
if oldAccessToken != "" {
log.Debugf("iflow executor: refreshing access token, old: %s", util.HideAPIKey(oldAccessToken))
}
svc := iflowauth.NewIFlowAuth(e.cfg)
tokenData, err := svc.RefreshTokens(ctx, refreshToken)
if err != nil {
log.Errorf("iflow executor: token refresh failed: %v", err)
return nil, err
}
@@ -243,6 +273,9 @@ func (e *IFlowExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*
auth.Metadata["type"] = "iflow"
auth.Metadata["last_refresh"] = time.Now().Format(time.RFC3339)
// Log the new access token (masked) after successful refresh
log.Debugf("iflow executor: token refresh successful, new: %s", util.HideAPIKey(tokenData.AccessToken))
if auth.Attributes == nil {
auth.Attributes = make(map[string]string)
}

View File

@@ -38,12 +38,15 @@ func (e *OpenAICompatExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.A
return nil
}
func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
baseURL, apiKey := e.resolveCredentials(auth)
if baseURL == "" {
return cliproxyexecutor.Response{}, statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"}
err = statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"}
return
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
// Translate inbound request to OpenAI format
from := opts.SourceFormat
@@ -56,7 +59,7 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
url := strings.TrimSuffix(baseURL, "/") + "/chat/completions"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated))
if err != nil {
return cliproxyexecutor.Response{}, err
return resp, err
}
httpReq.Header.Set("Content-Type", "application/json")
if apiKey != "" {
@@ -82,38 +85,47 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err
return resp, err
}
defer func() { _ = resp.Body.Close() }()
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(resp.Body)
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("openai compat executor: close response body error: %v", errClose)
}
}()
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
}
body, err := io.ReadAll(resp.Body)
body, err := io.ReadAll(httpResp.Body)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err
return resp, err
}
appendAPIResponseChunk(ctx, e.cfg, body)
reporter.publish(ctx, parseOpenAIUsage(body))
// Translate response back to source format when needed
var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, body, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
}
func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
baseURL, apiKey := e.resolveCredentials(auth)
if baseURL == "" {
return nil, statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"}
err = statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"}
return nil, err
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
from := opts.SourceFormat
to := sdktranslator.FromString("openai")
translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
@@ -152,24 +164,32 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return nil, err
}
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
defer func() { _ = resp.Body.Close() }()
b, _ := io.ReadAll(resp.Body)
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("openai compat executor: close response body error: %v", errClose)
}
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return nil, err
}
out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func() {
defer close(out)
defer func() { _ = resp.Body.Close() }()
scanner := bufio.NewScanner(resp.Body)
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("openai compat executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520)
var param any
@@ -189,12 +209,13 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
}
}
if err = scanner.Err(); err != nil {
recordAPIResponseError(ctx, e.cfg, err)
out <- cliproxyexecutor.StreamChunk{Err: err}
if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, errScan)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
}
}()
return out, nil
return stream, nil
}
func (e *OpenAICompatExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {

View File

@@ -38,13 +38,14 @@ func (e *QwenExecutor) Identifier() string { return "qwen" }
func (e *QwenExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
token, baseURL := qwenCreds(auth)
if baseURL == "" {
baseURL = "https://portal.qwen.ai/v1"
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat
to := sdktranslator.FromString("openai")
@@ -53,7 +54,7 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
url := strings.TrimSuffix(baseURL, "/") + "/chat/completions"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return cliproxyexecutor.Response{}, err
return resp, err
}
applyQwenHeaders(httpReq, token, false)
var authID, authLabel, authType, authValue string
@@ -75,38 +76,45 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err
return resp, err
}
defer func() { _ = resp.Body.Close() }()
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(resp.Body)
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("qwen executor: close response body error: %v", errClose)
}
}()
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return resp, err
}
data, err := io.ReadAll(resp.Body)
data, err := io.ReadAll(httpResp.Body)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return cliproxyexecutor.Response{}, err
return resp, err
}
appendAPIResponseChunk(ctx, e.cfg, data)
reporter.publish(ctx, parseOpenAIUsage(data))
var param any
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, &param)
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
resp = cliproxyexecutor.Response{Payload: []byte(out)}
return resp, nil
}
func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
token, baseURL := qwenCreds(auth)
if baseURL == "" {
baseURL = "https://portal.qwen.ai/v1"
}
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
defer reporter.trackFailure(ctx, &err)
from := opts.SourceFormat
to := sdktranslator.FromString("openai")
@@ -145,24 +153,32 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
})
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
resp, err := httpClient.Do(httpReq)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return nil, err
}
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
defer func() { _ = resp.Body.Close() }()
b, _ := io.ReadAll(resp.Body)
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
appendAPIResponseChunk(ctx, e.cfg, b)
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("qwen executor: close response body error: %v", errClose)
}
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
return nil, err
}
out := make(chan cliproxyexecutor.StreamChunk)
stream = out
go func() {
defer close(out)
defer func() { _ = resp.Body.Close() }()
scanner := bufio.NewScanner(resp.Body)
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("qwen executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(httpResp.Body)
buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520)
var param any
@@ -177,12 +193,17 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
}
}
if err = scanner.Err(); err != nil {
recordAPIResponseError(ctx, e.cfg, err)
out <- cliproxyexecutor.StreamChunk{Err: err}
doneChunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone([]byte("[DONE]")), &param)
for i := range doneChunks {
out <- cliproxyexecutor.StreamChunk{Payload: []byte(doneChunks[i])}
}
if errScan := scanner.Err(); errScan != nil {
recordAPIResponseError(ctx, e.cfg, errScan)
reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
}
}()
return out, nil
return stream, nil
}
func (e *QwenExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {

View File

@@ -41,6 +41,23 @@ func newUsageReporter(ctx context.Context, provider, model string, auth *cliprox
}
func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) {
r.publishWithOutcome(ctx, detail, false)
}
func (r *usageReporter) publishFailure(ctx context.Context) {
r.publishWithOutcome(ctx, usage.Detail{}, true)
}
func (r *usageReporter) trackFailure(ctx context.Context, errPtr *error) {
if r == nil || errPtr == nil {
return
}
if *errPtr != nil {
r.publishFailure(ctx)
}
}
func (r *usageReporter) publishWithOutcome(ctx context.Context, detail usage.Detail, failed bool) {
if r == nil {
return
}
@@ -50,7 +67,7 @@ func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) {
detail.TotalTokens = total
}
}
if detail.InputTokens == 0 && detail.OutputTokens == 0 && detail.ReasoningTokens == 0 && detail.CachedTokens == 0 && detail.TotalTokens == 0 {
if detail.InputTokens == 0 && detail.OutputTokens == 0 && detail.ReasoningTokens == 0 && detail.CachedTokens == 0 && detail.TotalTokens == 0 && !failed {
return
}
r.once.Do(func() {
@@ -61,6 +78,7 @@ func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) {
APIKey: r.apiKey,
AuthID: r.authID,
RequestedAt: r.requestedAt,
Failed: failed,
Detail: detail,
})
})

View File

@@ -91,6 +91,7 @@ type RequestDetail struct {
Timestamp time.Time `json:"timestamp"`
Source string `json:"source"`
Tokens TokenStats `json:"tokens"`
Failed bool `json:"failed"`
}
// TokenStats captures the token usage breakdown for a request.
@@ -165,7 +166,11 @@ func (s *RequestStatistics) Record(ctx context.Context, record coreusage.Record)
if statsKey == "" {
statsKey = resolveAPIIdentifier(ctx, record)
}
success := resolveSuccess(ctx)
failed := record.Failed
if !failed {
failed = !resolveSuccess(ctx)
}
success := !failed
modelName := record.Model
if modelName == "" {
modelName = "unknown"
@@ -193,6 +198,7 @@ func (s *RequestStatistics) Record(ctx context.Context, record coreusage.Record)
Timestamp: timestamp,
Source: record.Source,
Tokens: detail,
Failed: failed,
})
s.requestsByDay[dayKey]++

View File

@@ -84,3 +84,17 @@ func CountAuthFiles(authDir string) int {
}
return count
}
// WritablePath returns the cleaned WRITABLE_PATH environment variable when it is set.
// It accepts both uppercase and lowercase variants for compatibility with existing conventions.
func WritablePath() string {
for _, key := range []string{"WRITABLE_PATH", "writable_path"} {
if value, ok := os.LookupEnv(key); ok {
trimmed := strings.TrimSpace(value)
if trimmed != "" {
return filepath.Clean(trimmed)
}
}
}
return ""
}

View File

@@ -26,7 +26,7 @@ func (a *IFlowAuthenticator) Provider() string { return "iflow" }
// RefreshLead indicates how soon before expiry a refresh should be attempted.
func (a *IFlowAuthenticator) RefreshLead() *time.Duration {
d := 3 * time.Hour
d := 24 * time.Hour
return &d
}

View File

@@ -40,6 +40,8 @@ const (
refreshCheckInterval = 5 * time.Second
refreshPendingBackoff = time.Minute
refreshFailureBackoff = 5 * time.Minute
quotaBackoffBase = time.Second
quotaBackoffMax = 30 * time.Minute
)
// Result captures execution outcome used to adjust auth state.
@@ -532,9 +534,15 @@ func (m *Manager) MarkResult(ctx context.Context, result Result) {
suspendReason = "payment_required"
shouldSuspendModel = true
case 429:
next := now.Add(30 * time.Minute)
cooldown, nextLevel := nextQuotaCooldown(state.Quota.BackoffLevel)
next := now.Add(cooldown)
state.NextRetryAfter = next
state.Quota = QuotaState{Exceeded: true, Reason: "quota", NextRecoverAt: next}
state.Quota = QuotaState{
Exceeded: true,
Reason: "quota",
NextRecoverAt: next,
BackoffLevel: nextLevel,
}
suspendReason = "quota"
shouldSuspendModel = true
setModelQuota = true
@@ -608,6 +616,7 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
earliestRetry := time.Time{}
quotaExceeded := false
quotaRecover := time.Time{}
maxBackoffLevel := 0
for _, state := range auth.ModelStates {
if state == nil {
continue
@@ -636,6 +645,9 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
if quotaRecover.IsZero() || (!state.Quota.NextRecoverAt.IsZero() && state.Quota.NextRecoverAt.Before(quotaRecover)) {
quotaRecover = state.Quota.NextRecoverAt
}
if state.Quota.BackoffLevel > maxBackoffLevel {
maxBackoffLevel = state.Quota.BackoffLevel
}
}
}
auth.Unavailable = allUnavailable
@@ -648,10 +660,12 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
auth.Quota.Exceeded = true
auth.Quota.Reason = "quota"
auth.Quota.NextRecoverAt = quotaRecover
auth.Quota.BackoffLevel = maxBackoffLevel
} else {
auth.Quota.Exceeded = false
auth.Quota.Reason = ""
auth.Quota.NextRecoverAt = time.Time{}
auth.Quota.BackoffLevel = 0
}
}
@@ -685,6 +699,7 @@ func clearAuthStateOnSuccess(auth *Auth, now time.Time) {
auth.Quota.Exceeded = false
auth.Quota.Reason = ""
auth.Quota.NextRecoverAt = time.Time{}
auth.Quota.BackoffLevel = 0
auth.LastError = nil
auth.NextRetryAfter = time.Time{}
auth.UpdatedAt = now
@@ -734,7 +749,9 @@ func applyAuthFailureState(auth *Auth, resultErr *Error, now time.Time) {
auth.StatusMessage = "quota exhausted"
auth.Quota.Exceeded = true
auth.Quota.Reason = "quota"
auth.Quota.NextRecoverAt = now.Add(30 * time.Minute)
cooldown, nextLevel := nextQuotaCooldown(auth.Quota.BackoffLevel)
auth.Quota.NextRecoverAt = now.Add(cooldown)
auth.Quota.BackoffLevel = nextLevel
auth.NextRetryAfter = auth.Quota.NextRecoverAt
case 408, 500, 502, 503, 504:
auth.StatusMessage = "transient upstream error"
@@ -746,6 +763,21 @@ func applyAuthFailureState(auth *Auth, resultErr *Error, now time.Time) {
}
}
// nextQuotaCooldown returns the next cooldown duration and updated backoff level for repeated quota errors.
func nextQuotaCooldown(prevLevel int) (time.Duration, int) {
if prevLevel < 0 {
prevLevel = 0
}
cooldown := quotaBackoffBase * time.Duration(1<<prevLevel)
if cooldown < quotaBackoffBase {
cooldown = quotaBackoffBase
}
if cooldown >= quotaBackoffMax {
return quotaBackoffMax, prevLevel
}
return cooldown, prevLevel + 1
}
// List returns all auth entries currently known by the manager.
func (m *Manager) List() []*Auth {
m.mu.RLock()

View File

@@ -65,6 +65,8 @@ type QuotaState struct {
Reason string `json:"reason,omitempty"`
// NextRecoverAt is when the credential may become available again.
NextRecoverAt time.Time `json:"next_recover_at"`
// BackoffLevel stores the progressive cooldown exponent used for rate limits.
BackoffLevel int `json:"backoff_level,omitempty"`
}
// ModelState captures the execution state for a specific model under an auth entry.

View File

@@ -16,6 +16,7 @@ type Record struct {
AuthID string
Source string
RequestedAt time.Time
Failed bool
Detail Detail
}