Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3569e5779a | ||
|
|
20985d1a10 | ||
|
|
67f553806b | ||
|
|
29044312a4 | ||
|
|
5b3fc092ee | ||
|
|
792e8d09d7 |
@@ -797,6 +797,10 @@ Those projects are based on CLIProxyAPI:
|
|||||||
|
|
||||||
Native macOS menu bar app to use your Claude Code & ChatGPT subscriptions with AI coding tools - no API keys needed
|
Native macOS menu bar app to use your Claude Code & ChatGPT subscriptions with AI coding tools - no API keys needed
|
||||||
|
|
||||||
|
### [Subtitle Translator](https://github.com/VjayC/SRT-Subtitle-Translator-Validator)
|
||||||
|
|
||||||
|
Browser-based tool to translate SRT subtitles using your Gemini subscription via CLIProxyAPI with automatic validation/error correction - no API keys needed
|
||||||
|
|
||||||
> [!NOTE]
|
> [!NOTE]
|
||||||
> If you developed a project based on CLIProxyAPI, please open a PR to add it to this list.
|
> If you developed a project based on CLIProxyAPI, please open a PR to add it to this list.
|
||||||
|
|
||||||
|
|||||||
@@ -807,6 +807,10 @@ docker run --rm -p 8317:8317 -v /path/to/your/config.yaml:/CLIProxyAPI/config.ya
|
|||||||
|
|
||||||
一个原生 macOS 菜单栏应用,让您可以使用 Claude Code & ChatGPT 订阅服务和 AI 编程工具,无需 API 密钥。
|
一个原生 macOS 菜单栏应用,让您可以使用 Claude Code & ChatGPT 订阅服务和 AI 编程工具,无需 API 密钥。
|
||||||
|
|
||||||
|
### [Subtitle Translator](https://github.com/VjayC/SRT-Subtitle-Translator-Validator)
|
||||||
|
|
||||||
|
一款基于浏览器的 SRT 字幕翻译工具,可通过 CLI 代理 API 使用您的 Gemini 订阅。内置自动验证与错误修正功能,无需 API 密钥。
|
||||||
|
|
||||||
> [!NOTE]
|
> [!NOTE]
|
||||||
> 如果你开发了基于 CLIProxyAPI 的项目,请提交一个 PR(拉取请求)将其添加到此列表中。
|
> 如果你开发了基于 CLIProxyAPI 的项目,请提交一个 PR(拉取请求)将其添加到此列表中。
|
||||||
|
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ import (
|
|||||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/cmd"
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/cmd"
|
||||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
|
||||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/logging"
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/logging"
|
||||||
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/managementasset"
|
||||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/misc"
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/misc"
|
||||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/store"
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/store"
|
||||||
_ "github.com/router-for-me/CLIProxyAPI/v6/internal/translator"
|
_ "github.com/router-for-me/CLIProxyAPI/v6/internal/translator"
|
||||||
@@ -391,6 +392,7 @@ func main() {
|
|||||||
} else {
|
} else {
|
||||||
cfg.AuthDir = resolvedAuthDir
|
cfg.AuthDir = resolvedAuthDir
|
||||||
}
|
}
|
||||||
|
managementasset.SetCurrentConfig(cfg)
|
||||||
|
|
||||||
// Create login options to be used in authentication flows.
|
// Create login options to be used in authentication flows.
|
||||||
options := &cmd.LoginOptions{
|
options := &cmd.LoginOptions{
|
||||||
@@ -434,6 +436,7 @@ func main() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Start the main proxy service
|
// Start the main proxy service
|
||||||
|
managementasset.StartAutoUpdater(context.Background(), configFilePath)
|
||||||
cmd.StartService(cfg, configFilePath, password)
|
cmd.StartService(cfg, configFilePath, password)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,5 +13,8 @@ func (h *Handler) GetUsageStatistics(c *gin.Context) {
|
|||||||
if h != nil && h.usageStats != nil {
|
if h != nil && h.usageStats != nil {
|
||||||
snapshot = h.usageStats.Snapshot()
|
snapshot = h.usageStats.Snapshot()
|
||||||
}
|
}
|
||||||
c.JSON(http.StatusOK, gin.H{"usage": snapshot})
|
c.JSON(http.StatusOK, gin.H{
|
||||||
|
"usage": snapshot,
|
||||||
|
"failed_requests": snapshot.FailureCount,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -232,6 +232,7 @@ func NewServer(cfg *config.Config, authManager *auth.Manager, accessManager *sdk
|
|||||||
// Save initial YAML snapshot
|
// Save initial YAML snapshot
|
||||||
s.oldConfigYaml, _ = yaml.Marshal(cfg)
|
s.oldConfigYaml, _ = yaml.Marshal(cfg)
|
||||||
s.applyAccessConfig(nil, cfg)
|
s.applyAccessConfig(nil, cfg)
|
||||||
|
managementasset.SetCurrentConfig(cfg)
|
||||||
// Initialize management handler
|
// Initialize management handler
|
||||||
s.mgmt = managementHandlers.NewHandler(cfg, configFilePath, authManager)
|
s.mgmt = managementHandlers.NewHandler(cfg, configFilePath, authManager)
|
||||||
if optionState.localPassword != "" {
|
if optionState.localPassword != "" {
|
||||||
@@ -759,6 +760,7 @@ func (s *Server) UpdateClients(cfg *config.Config) {
|
|||||||
|
|
||||||
s.applyAccessConfig(oldCfg, cfg)
|
s.applyAccessConfig(oldCfg, cfg)
|
||||||
s.cfg = cfg
|
s.cfg = cfg
|
||||||
|
managementasset.SetCurrentConfig(cfg)
|
||||||
// Save YAML snapshot for next comparison
|
// Save YAML snapshot for next comparison
|
||||||
s.oldConfigYaml, _ = yaml.Marshal(cfg)
|
s.oldConfigYaml, _ = yaml.Marshal(cfg)
|
||||||
s.handlers.UpdateClients(&cfg.SDKConfig)
|
s.handlers.UpdateClients(&cfg.SDKConfig)
|
||||||
|
|||||||
@@ -13,8 +13,10 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
|
||||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
|
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
|
||||||
sdkconfig "github.com/router-for-me/CLIProxyAPI/v6/sdk/config"
|
sdkconfig "github.com/router-for-me/CLIProxyAPI/v6/sdk/config"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
@@ -33,8 +35,83 @@ const ManagementFileName = managementAssetName
|
|||||||
var (
|
var (
|
||||||
lastUpdateCheckMu sync.Mutex
|
lastUpdateCheckMu sync.Mutex
|
||||||
lastUpdateCheckTime time.Time
|
lastUpdateCheckTime time.Time
|
||||||
|
|
||||||
|
currentConfigPtr atomic.Pointer[config.Config]
|
||||||
|
disableControlPanel atomic.Bool
|
||||||
|
schedulerOnce sync.Once
|
||||||
|
schedulerConfigPath atomic.Value
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// SetCurrentConfig stores the latest configuration snapshot for management asset decisions.
|
||||||
|
func SetCurrentConfig(cfg *config.Config) {
|
||||||
|
if cfg == nil {
|
||||||
|
currentConfigPtr.Store(nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
prevDisabled := disableControlPanel.Load()
|
||||||
|
currentConfigPtr.Store(cfg)
|
||||||
|
disableControlPanel.Store(cfg.RemoteManagement.DisableControlPanel)
|
||||||
|
|
||||||
|
if prevDisabled && !cfg.RemoteManagement.DisableControlPanel {
|
||||||
|
lastUpdateCheckMu.Lock()
|
||||||
|
lastUpdateCheckTime = time.Time{}
|
||||||
|
lastUpdateCheckMu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartAutoUpdater launches a background goroutine that periodically ensures the management asset is up to date.
|
||||||
|
// It respects the disable-control-panel flag on every iteration and supports hot-reloaded configurations.
|
||||||
|
func StartAutoUpdater(ctx context.Context, configFilePath string) {
|
||||||
|
configFilePath = strings.TrimSpace(configFilePath)
|
||||||
|
if configFilePath == "" {
|
||||||
|
log.Debug("management asset auto-updater skipped: empty config path")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
schedulerConfigPath.Store(configFilePath)
|
||||||
|
|
||||||
|
schedulerOnce.Do(func() {
|
||||||
|
go runAutoUpdater(ctx)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func runAutoUpdater(ctx context.Context) {
|
||||||
|
if ctx == nil {
|
||||||
|
ctx = context.Background()
|
||||||
|
}
|
||||||
|
|
||||||
|
ticker := time.NewTicker(updateCheckInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
runOnce := func() {
|
||||||
|
cfg := currentConfigPtr.Load()
|
||||||
|
if cfg == nil {
|
||||||
|
log.Debug("management asset auto-updater skipped: config not yet available")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if disableControlPanel.Load() {
|
||||||
|
log.Debug("management asset auto-updater skipped: control panel disabled")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
configPath, _ := schedulerConfigPath.Load().(string)
|
||||||
|
staticDir := StaticDir(configPath)
|
||||||
|
EnsureLatestManagementHTML(ctx, staticDir, cfg.ProxyURL)
|
||||||
|
}
|
||||||
|
|
||||||
|
runOnce()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
runOnce()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newHTTPClient(proxyURL string) *http.Client {
|
func newHTTPClient(proxyURL string) *http.Client {
|
||||||
client := &http.Client{Timeout: 15 * time.Second}
|
client := &http.Client{Timeout: 15 * time.Second}
|
||||||
|
|
||||||
@@ -109,6 +186,11 @@ func EnsureLatestManagementHTML(ctx context.Context, staticDir string, proxyURL
|
|||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if disableControlPanel.Load() {
|
||||||
|
log.Debug("management asset sync skipped: control panel disabled by configuration")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
staticDir = strings.TrimSpace(staticDir)
|
staticDir = strings.TrimSpace(staticDir)
|
||||||
if staticDir == "" {
|
if staticDir == "" {
|
||||||
log.Debug("management asset sync skipped: empty static directory")
|
log.Debug("management asset sync skipped: empty static directory")
|
||||||
|
|||||||
@@ -36,13 +36,14 @@ func (e *ClaudeExecutor) Identifier() string { return "claude" }
|
|||||||
|
|
||||||
func (e *ClaudeExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
|
func (e *ClaudeExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
|
||||||
|
|
||||||
func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
|
||||||
apiKey, baseURL := claudeCreds(auth)
|
apiKey, baseURL := claudeCreds(auth)
|
||||||
|
|
||||||
if baseURL == "" {
|
if baseURL == "" {
|
||||||
baseURL = "https://api.anthropic.com"
|
baseURL = "https://api.anthropic.com"
|
||||||
}
|
}
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||||
|
defer reporter.trackFailure(ctx, &err)
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
to := sdktranslator.FromString("claude")
|
to := sdktranslator.FromString("claude")
|
||||||
// Use streaming translation to preserve function calling, except for claude.
|
// Use streaming translation to preserve function calling, except for claude.
|
||||||
@@ -56,7 +57,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
|
|||||||
url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL)
|
url := fmt.Sprintf("%s/v1/messages?beta=true", baseURL)
|
||||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
applyClaudeHeaders(httpReq, apiKey, false)
|
applyClaudeHeaders(httpReq, apiKey, false)
|
||||||
var authID, authLabel, authType, authValue string
|
var authID, authLabel, authType, authValue string
|
||||||
@@ -78,30 +79,31 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
|
|||||||
})
|
})
|
||||||
|
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
resp, err := httpClient.Do(httpReq)
|
httpResp, err := httpClient.Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if errClose := resp.Body.Close(); errClose != nil {
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
log.Errorf("response body close error: %v", errClose)
|
log.Errorf("response body close error: %v", errClose)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||||
b, _ := io.ReadAll(resp.Body)
|
b, _ := io.ReadAll(httpResp.Body)
|
||||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||||
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
|
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
|
||||||
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
|
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
reader := io.Reader(resp.Body)
|
reader := io.Reader(httpResp.Body)
|
||||||
var decoder *zstd.Decoder
|
var decoder *zstd.Decoder
|
||||||
if hasZSTDEcoding(resp.Header.Get("Content-Encoding")) {
|
if hasZSTDEcoding(httpResp.Header.Get("Content-Encoding")) {
|
||||||
decoder, err = zstd.NewReader(resp.Body)
|
decoder, err = zstd.NewReader(httpResp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return cliproxyexecutor.Response{}, fmt.Errorf("failed to initialize zstd decoder: %w", err)
|
return resp, fmt.Errorf("failed to initialize zstd decoder: %w", err)
|
||||||
}
|
}
|
||||||
reader = decoder
|
reader = decoder
|
||||||
defer decoder.Close()
|
defer decoder.Close()
|
||||||
@@ -109,7 +111,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
|
|||||||
data, err := io.ReadAll(reader)
|
data, err := io.ReadAll(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||||
if stream {
|
if stream {
|
||||||
@@ -124,16 +126,18 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
|
|||||||
}
|
}
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m)
|
||||||
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
|
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
||||||
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
|
func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
||||||
apiKey, baseURL := claudeCreds(auth)
|
apiKey, baseURL := claudeCreds(auth)
|
||||||
|
|
||||||
if baseURL == "" {
|
if baseURL == "" {
|
||||||
baseURL = "https://api.anthropic.com"
|
baseURL = "https://api.anthropic.com"
|
||||||
}
|
}
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||||
|
defer reporter.trackFailure(ctx, &err)
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
to := sdktranslator.FromString("claude")
|
to := sdktranslator.FromString("claude")
|
||||||
body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
|
body := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
|
||||||
@@ -164,27 +168,35 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
|||||||
})
|
})
|
||||||
|
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
resp, err := httpClient.Do(httpReq)
|
httpResp, err := httpClient.Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||||
defer func() { _ = resp.Body.Close() }()
|
b, _ := io.ReadAll(httpResp.Body)
|
||||||
b, _ := io.ReadAll(resp.Body)
|
|
||||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||||
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
|
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
|
||||||
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("response body close error: %v", errClose)
|
||||||
|
}
|
||||||
|
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
|
stream = out
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() {
|
||||||
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("response body close error: %v", errClose)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// If from == to (Claude → Claude), directly forward the SSE stream without translation
|
// If from == to (Claude → Claude), directly forward the SSE stream without translation
|
||||||
if from == to {
|
if from == to {
|
||||||
scanner := bufio.NewScanner(resp.Body)
|
scanner := bufio.NewScanner(httpResp.Body)
|
||||||
buf := make([]byte, 20_971_520)
|
buf := make([]byte, 20_971_520)
|
||||||
scanner.Buffer(buf, 20_971_520)
|
scanner.Buffer(buf, 20_971_520)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
@@ -199,15 +211,16 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
|||||||
cloned[len(line)] = '\n'
|
cloned[len(line)] = '\n'
|
||||||
out <- cliproxyexecutor.StreamChunk{Payload: cloned}
|
out <- cliproxyexecutor.StreamChunk{Payload: cloned}
|
||||||
}
|
}
|
||||||
if err = scanner.Err(); err != nil {
|
if errScan := scanner.Err(); errScan != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, errScan)
|
||||||
out <- cliproxyexecutor.StreamChunk{Err: err}
|
reporter.publishFailure(ctx)
|
||||||
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// For other formats, use translation
|
// For other formats, use translation
|
||||||
scanner := bufio.NewScanner(resp.Body)
|
scanner := bufio.NewScanner(httpResp.Body)
|
||||||
buf := make([]byte, 20_971_520)
|
buf := make([]byte, 20_971_520)
|
||||||
scanner.Buffer(buf, 20_971_520)
|
scanner.Buffer(buf, 20_971_520)
|
||||||
var param any
|
var param any
|
||||||
@@ -222,12 +235,13 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
|||||||
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err = scanner.Err(); err != nil {
|
if errScan := scanner.Err(); errScan != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, errScan)
|
||||||
out <- cliproxyexecutor.StreamChunk{Err: err}
|
reporter.publishFailure(ctx)
|
||||||
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return out, nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
||||||
|
|||||||
@@ -39,13 +39,14 @@ func (e *CodexExecutor) Identifier() string { return "codex" }
|
|||||||
|
|
||||||
func (e *CodexExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
|
func (e *CodexExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
|
||||||
|
|
||||||
func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
|
||||||
apiKey, baseURL := codexCreds(auth)
|
apiKey, baseURL := codexCreds(auth)
|
||||||
|
|
||||||
if baseURL == "" {
|
if baseURL == "" {
|
||||||
baseURL = "https://chatgpt.com/backend-api/codex"
|
baseURL = "https://chatgpt.com/backend-api/codex"
|
||||||
}
|
}
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||||
|
defer reporter.trackFailure(ctx, &err)
|
||||||
|
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
to := sdktranslator.FromString("codex")
|
to := sdktranslator.FromString("codex")
|
||||||
@@ -101,7 +102,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
|
|||||||
url := strings.TrimSuffix(baseURL, "/") + "/responses"
|
url := strings.TrimSuffix(baseURL, "/") + "/responses"
|
||||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
applyCodexHeaders(httpReq, auth, apiKey)
|
applyCodexHeaders(httpReq, auth, apiKey)
|
||||||
var authID, authLabel, authType, authValue string
|
var authID, authLabel, authType, authValue string
|
||||||
@@ -125,23 +126,28 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
|
|||||||
AuthValue: authValue,
|
AuthValue: authValue,
|
||||||
})
|
})
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
resp, err := httpClient.Do(httpReq)
|
httpResp, err := httpClient.Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() {
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
log.Errorf("codex executor: close response body error: %v", errClose)
|
||||||
b, _ := io.ReadAll(resp.Body)
|
}
|
||||||
|
}()
|
||||||
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||||
|
b, _ := io.ReadAll(httpResp.Body)
|
||||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||||
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
|
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
|
||||||
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
|
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
data, err := io.ReadAll(resp.Body)
|
data, err := io.ReadAll(httpResp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||||
|
|
||||||
@@ -162,18 +168,21 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
|
|||||||
|
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, line, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, line, ¶m)
|
||||||
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
|
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
||||||
|
return resp, nil
|
||||||
}
|
}
|
||||||
return cliproxyexecutor.Response{}, statusErr{code: 408, msg: "stream error: stream disconnected before completion: stream closed before response.completed"}
|
err = statusErr{code: 408, msg: "stream error: stream disconnected before completion: stream closed before response.completed"}
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
|
func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
||||||
apiKey, baseURL := codexCreds(auth)
|
apiKey, baseURL := codexCreds(auth)
|
||||||
|
|
||||||
if baseURL == "" {
|
if baseURL == "" {
|
||||||
baseURL = "https://chatgpt.com/backend-api/codex"
|
baseURL = "https://chatgpt.com/backend-api/codex"
|
||||||
}
|
}
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||||
|
defer reporter.trackFailure(ctx, &err)
|
||||||
|
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
to := sdktranslator.FromString("codex")
|
to := sdktranslator.FromString("codex")
|
||||||
@@ -253,28 +262,36 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
|
|||||||
})
|
})
|
||||||
|
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
resp, err := httpClient.Do(httpReq)
|
httpResp, err := httpClient.Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||||
defer func() { _ = resp.Body.Close() }()
|
data, readErr := io.ReadAll(httpResp.Body)
|
||||||
b, readErr := io.ReadAll(resp.Body)
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("codex executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
if readErr != nil {
|
if readErr != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, readErr)
|
recordAPIResponseError(ctx, e.cfg, readErr)
|
||||||
return nil, readErr
|
return nil, readErr
|
||||||
}
|
}
|
||||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||||
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
|
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(data))
|
||||||
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
|
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
|
stream = out
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() {
|
||||||
scanner := bufio.NewScanner(resp.Body)
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("codex executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
scanner := bufio.NewScanner(httpResp.Body)
|
||||||
buf := make([]byte, 20_971_520)
|
buf := make([]byte, 20_971_520)
|
||||||
scanner.Buffer(buf, 20_971_520)
|
scanner.Buffer(buf, 20_971_520)
|
||||||
var param any
|
var param any
|
||||||
@@ -296,12 +313,13 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
|
|||||||
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err = scanner.Err(); err != nil {
|
if errScan := scanner.Err(); errScan != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, errScan)
|
||||||
out <- cliproxyexecutor.StreamChunk{Err: err}
|
reporter.publishFailure(ctx)
|
||||||
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return out, nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *CodexExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *CodexExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
||||||
|
|||||||
@@ -51,12 +51,13 @@ func (e *GeminiCLIExecutor) Identifier() string { return "gemini-cli" }
|
|||||||
|
|
||||||
func (e *GeminiCLIExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
|
func (e *GeminiCLIExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
|
||||||
|
|
||||||
func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
|
||||||
tokenSource, baseTokenData, err := prepareGeminiCLITokenSource(ctx, e.cfg, auth)
|
tokenSource, baseTokenData, err := prepareGeminiCLITokenSource(ctx, e.cfg, auth)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||||
|
defer reporter.trackFailure(ctx, &err)
|
||||||
|
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
to := sdktranslator.FromString("gemini-cli")
|
to := sdktranslator.FromString("gemini-cli")
|
||||||
@@ -104,7 +105,8 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
|
|||||||
|
|
||||||
tok, errTok := tokenSource.Token()
|
tok, errTok := tokenSource.Token()
|
||||||
if errTok != nil {
|
if errTok != nil {
|
||||||
return cliproxyexecutor.Response{}, errTok
|
err = errTok
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
updateGeminiCLITokenMetadata(auth, baseTokenData, tok)
|
updateGeminiCLITokenMetadata(auth, baseTokenData, tok)
|
||||||
|
|
||||||
@@ -115,7 +117,8 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
|
|||||||
|
|
||||||
reqHTTP, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
|
reqHTTP, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
|
||||||
if errReq != nil {
|
if errReq != nil {
|
||||||
return cliproxyexecutor.Response{}, errReq
|
err = errReq
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
reqHTTP.Header.Set("Content-Type", "application/json")
|
reqHTTP.Header.Set("Content-Type", "application/json")
|
||||||
reqHTTP.Header.Set("Authorization", "Bearer "+tok.AccessToken)
|
reqHTTP.Header.Set("Authorization", "Bearer "+tok.AccessToken)
|
||||||
@@ -133,44 +136,60 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
|
|||||||
AuthValue: authValue,
|
AuthValue: authValue,
|
||||||
})
|
})
|
||||||
|
|
||||||
resp, errDo := httpClient.Do(reqHTTP)
|
httpResp, errDo := httpClient.Do(reqHTTP)
|
||||||
if errDo != nil {
|
if errDo != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, errDo)
|
recordAPIResponseError(ctx, e.cfg, errDo)
|
||||||
return cliproxyexecutor.Response{}, errDo
|
err = errDo
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
data, errRead := io.ReadAll(resp.Body)
|
|
||||||
_ = resp.Body.Close()
|
data, errRead := io.ReadAll(httpResp.Body)
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("gemini cli executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
if errRead != nil {
|
if errRead != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, errRead)
|
recordAPIResponseError(ctx, e.cfg, errRead)
|
||||||
return cliproxyexecutor.Response{}, errRead
|
err = errRead
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||||
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
if httpResp.StatusCode >= 200 && httpResp.StatusCode < 300 {
|
||||||
reporter.publish(ctx, parseGeminiCLIUsage(data))
|
reporter.publish(ctx, parseGeminiCLIUsage(data))
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(respCtx, to, from, attemptModel, bytes.Clone(opts.OriginalRequest), payload, data, ¶m)
|
out := sdktranslator.TranslateNonStream(respCtx, to, from, attemptModel, bytes.Clone(opts.OriginalRequest), payload, data, ¶m)
|
||||||
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
|
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
||||||
|
return resp, nil
|
||||||
}
|
}
|
||||||
lastStatus = resp.StatusCode
|
|
||||||
|
lastStatus = httpResp.StatusCode
|
||||||
lastBody = append([]byte(nil), data...)
|
lastBody = append([]byte(nil), data...)
|
||||||
if resp.StatusCode != 429 {
|
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(data))
|
||||||
break
|
if httpResp.StatusCode == 429 {
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(lastBody) > 0 {
|
if len(lastBody) > 0 {
|
||||||
appendAPIResponseChunk(ctx, e.cfg, lastBody)
|
appendAPIResponseChunk(ctx, e.cfg, lastBody)
|
||||||
}
|
}
|
||||||
return cliproxyexecutor.Response{}, statusErr{code: lastStatus, msg: string(lastBody)}
|
if lastStatus == 0 {
|
||||||
|
lastStatus = 429
|
||||||
|
}
|
||||||
|
err = statusErr{code: lastStatus, msg: string(lastBody)}
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
|
func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
||||||
tokenSource, baseTokenData, err := prepareGeminiCLITokenSource(ctx, e.cfg, auth)
|
tokenSource, baseTokenData, err := prepareGeminiCLITokenSource(ctx, e.cfg, auth)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||||
|
defer reporter.trackFailure(ctx, &err)
|
||||||
|
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
to := sdktranslator.FromString("gemini-cli")
|
to := sdktranslator.FromString("gemini-cli")
|
||||||
@@ -207,7 +226,8 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
|
|||||||
|
|
||||||
tok, errTok := tokenSource.Token()
|
tok, errTok := tokenSource.Token()
|
||||||
if errTok != nil {
|
if errTok != nil {
|
||||||
return nil, errTok
|
err = errTok
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
updateGeminiCLITokenMetadata(auth, baseTokenData, tok)
|
updateGeminiCLITokenMetadata(auth, baseTokenData, tok)
|
||||||
|
|
||||||
@@ -220,7 +240,8 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
|
|||||||
|
|
||||||
reqHTTP, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
|
reqHTTP, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
|
||||||
if errReq != nil {
|
if errReq != nil {
|
||||||
return nil, errReq
|
err = errReq
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
reqHTTP.Header.Set("Content-Type", "application/json")
|
reqHTTP.Header.Set("Content-Type", "application/json")
|
||||||
reqHTTP.Header.Set("Authorization", "Bearer "+tok.AccessToken)
|
reqHTTP.Header.Set("Authorization", "Bearer "+tok.AccessToken)
|
||||||
@@ -238,33 +259,43 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
|
|||||||
AuthValue: authValue,
|
AuthValue: authValue,
|
||||||
})
|
})
|
||||||
|
|
||||||
resp, errDo := httpClient.Do(reqHTTP)
|
httpResp, errDo := httpClient.Do(reqHTTP)
|
||||||
if errDo != nil {
|
if errDo != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, errDo)
|
recordAPIResponseError(ctx, e.cfg, errDo)
|
||||||
return nil, errDo
|
err = errDo
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||||
data, errRead := io.ReadAll(resp.Body)
|
data, errRead := io.ReadAll(httpResp.Body)
|
||||||
_ = resp.Body.Close()
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("gemini cli executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
if errRead != nil {
|
if errRead != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, errRead)
|
recordAPIResponseError(ctx, e.cfg, errRead)
|
||||||
return nil, errRead
|
err = errRead
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||||
lastStatus = resp.StatusCode
|
lastStatus = httpResp.StatusCode
|
||||||
lastBody = append([]byte(nil), data...)
|
lastBody = append([]byte(nil), data...)
|
||||||
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(data))
|
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(data))
|
||||||
if resp.StatusCode == 429 {
|
if httpResp.StatusCode == 429 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil, statusErr{code: resp.StatusCode, msg: string(data)}
|
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
|
stream = out
|
||||||
go func(resp *http.Response, reqBody []byte, attempt string) {
|
go func(resp *http.Response, reqBody []byte, attempt string) {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() {
|
||||||
|
if errClose := resp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("gemini cli executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
|
}()
|
||||||
if opts.Alt == "" {
|
if opts.Alt == "" {
|
||||||
scanner := bufio.NewScanner(resp.Body)
|
scanner := bufio.NewScanner(resp.Body)
|
||||||
buf := make([]byte, 20_971_520)
|
buf := make([]byte, 20_971_520)
|
||||||
@@ -290,6 +321,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
|
|||||||
}
|
}
|
||||||
if errScan := scanner.Err(); errScan != nil {
|
if errScan := scanner.Err(); errScan != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, errScan)
|
recordAPIResponseError(ctx, e.cfg, errScan)
|
||||||
|
reporter.publishFailure(ctx)
|
||||||
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@@ -298,6 +330,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
|
|||||||
data, errRead := io.ReadAll(resp.Body)
|
data, errRead := io.ReadAll(resp.Body)
|
||||||
if errRead != nil {
|
if errRead != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, errRead)
|
recordAPIResponseError(ctx, e.cfg, errRead)
|
||||||
|
reporter.publishFailure(ctx)
|
||||||
out <- cliproxyexecutor.StreamChunk{Err: errRead}
|
out <- cliproxyexecutor.StreamChunk{Err: errRead}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -313,15 +346,19 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
|
|||||||
for i := range segments {
|
for i := range segments {
|
||||||
out <- cliproxyexecutor.StreamChunk{Payload: []byte(segments[i])}
|
out <- cliproxyexecutor.StreamChunk{Payload: []byte(segments[i])}
|
||||||
}
|
}
|
||||||
}(resp, append([]byte(nil), payload...), attemptModel)
|
}(httpResp, append([]byte(nil), payload...), attemptModel)
|
||||||
|
|
||||||
return out, nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(lastBody) > 0 {
|
||||||
|
appendAPIResponseChunk(ctx, e.cfg, lastBody)
|
||||||
|
}
|
||||||
if lastStatus == 0 {
|
if lastStatus == 0 {
|
||||||
lastStatus = 429
|
lastStatus = 429
|
||||||
}
|
}
|
||||||
return nil, statusErr{code: lastStatus, msg: string(lastBody)}
|
err = statusErr{code: lastStatus, msg: string(lastBody)}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *GeminiCLIExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *GeminiCLIExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
||||||
|
|||||||
@@ -68,10 +68,11 @@ func (e *GeminiExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) e
|
|||||||
// Returns:
|
// Returns:
|
||||||
// - cliproxyexecutor.Response: The response from the API
|
// - cliproxyexecutor.Response: The response from the API
|
||||||
// - error: An error if the request fails
|
// - error: An error if the request fails
|
||||||
func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
|
||||||
apiKey, bearer := geminiCreds(auth)
|
apiKey, bearer := geminiCreds(auth)
|
||||||
|
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||||
|
defer reporter.trackFailure(ctx, &err)
|
||||||
|
|
||||||
// Official Gemini API via API key or OAuth bearer
|
// Official Gemini API via API key or OAuth bearer
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
@@ -98,7 +99,7 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
|
|||||||
|
|
||||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
httpReq.Header.Set("Content-Type", "application/json")
|
httpReq.Header.Set("Content-Type", "application/json")
|
||||||
if apiKey != "" {
|
if apiKey != "" {
|
||||||
@@ -125,35 +126,42 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
|
|||||||
})
|
})
|
||||||
|
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
resp, err := httpClient.Do(httpReq)
|
httpResp, err := httpClient.Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() {
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
log.Errorf("gemini executor: close response body error: %v", errClose)
|
||||||
b, _ := io.ReadAll(resp.Body)
|
}
|
||||||
|
}()
|
||||||
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||||
|
b, _ := io.ReadAll(httpResp.Body)
|
||||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||||
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
|
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
|
||||||
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
|
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
data, err := io.ReadAll(resp.Body)
|
data, err := io.ReadAll(httpResp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||||
reporter.publish(ctx, parseGeminiUsage(data))
|
reporter.publish(ctx, parseGeminiUsage(data))
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m)
|
||||||
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
|
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
||||||
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
|
func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
||||||
apiKey, bearer := geminiCreds(auth)
|
apiKey, bearer := geminiCreds(auth)
|
||||||
|
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||||
|
defer reporter.trackFailure(ctx, &err)
|
||||||
|
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
to := sdktranslator.FromString("gemini")
|
to := sdktranslator.FromString("gemini")
|
||||||
@@ -202,24 +210,32 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
|||||||
})
|
})
|
||||||
|
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
resp, err := httpClient.Do(httpReq)
|
httpResp, err := httpClient.Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||||
defer func() { _ = resp.Body.Close() }()
|
b, _ := io.ReadAll(httpResp.Body)
|
||||||
b, _ := io.ReadAll(resp.Body)
|
|
||||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||||
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
|
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
|
||||||
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("gemini executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
|
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
|
stream = out
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() {
|
||||||
scanner := bufio.NewScanner(resp.Body)
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("gemini executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
scanner := bufio.NewScanner(httpResp.Body)
|
||||||
buf := make([]byte, 20_971_520)
|
buf := make([]byte, 20_971_520)
|
||||||
scanner.Buffer(buf, 20_971_520)
|
scanner.Buffer(buf, 20_971_520)
|
||||||
var param any
|
var param any
|
||||||
@@ -238,12 +254,13 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
|||||||
for i := range lines {
|
for i := range lines {
|
||||||
out <- cliproxyexecutor.StreamChunk{Payload: []byte(lines[i])}
|
out <- cliproxyexecutor.StreamChunk{Payload: []byte(lines[i])}
|
||||||
}
|
}
|
||||||
if err = scanner.Err(); err != nil {
|
if errScan := scanner.Err(); errScan != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, errScan)
|
||||||
out <- cliproxyexecutor.StreamChunk{Err: err}
|
reporter.publishFailure(ctx)
|
||||||
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return out, nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *GeminiExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *GeminiExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
||||||
|
|||||||
@@ -41,16 +41,18 @@ func (e *IFlowExecutor) Identifier() string { return "iflow" }
|
|||||||
func (e *IFlowExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
|
func (e *IFlowExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
|
||||||
|
|
||||||
// Execute performs a non-streaming chat completion request.
|
// Execute performs a non-streaming chat completion request.
|
||||||
func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
|
||||||
apiKey, baseURL := iflowCreds(auth)
|
apiKey, baseURL := iflowCreds(auth)
|
||||||
if strings.TrimSpace(apiKey) == "" {
|
if strings.TrimSpace(apiKey) == "" {
|
||||||
return cliproxyexecutor.Response{}, fmt.Errorf("iflow executor: missing api key")
|
err = fmt.Errorf("iflow executor: missing api key")
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
if baseURL == "" {
|
if baseURL == "" {
|
||||||
baseURL = iflowauth.DefaultAPIBaseURL
|
baseURL = iflowauth.DefaultAPIBaseURL
|
||||||
}
|
}
|
||||||
|
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||||
|
defer reporter.trackFailure(ctx, &err)
|
||||||
|
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
to := sdktranslator.FromString("openai")
|
to := sdktranslator.FromString("openai")
|
||||||
@@ -60,7 +62,7 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
|
|||||||
|
|
||||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
applyIFlowHeaders(httpReq, apiKey, false)
|
applyIFlowHeaders(httpReq, apiKey, false)
|
||||||
var authID, authLabel, authType, authValue string
|
var authID, authLabel, authType, authValue string
|
||||||
@@ -82,45 +84,53 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
|
|||||||
})
|
})
|
||||||
|
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
resp, err := httpClient.Do(httpReq)
|
httpResp, err := httpClient.Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() {
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("iflow executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
|
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||||
b, _ := io.ReadAll(resp.Body)
|
b, _ := io.ReadAll(httpResp.Body)
|
||||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||||
log.Debugf("iflow request error: status %d body %s", resp.StatusCode, string(b))
|
log.Debugf("iflow request error: status %d body %s", httpResp.StatusCode, string(b))
|
||||||
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
|
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := io.ReadAll(resp.Body)
|
data, err := io.ReadAll(httpResp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||||
reporter.publish(ctx, parseOpenAIUsage(data))
|
reporter.publish(ctx, parseOpenAIUsage(data))
|
||||||
|
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m)
|
||||||
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
|
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
||||||
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecuteStream performs a streaming chat completion request.
|
// ExecuteStream performs a streaming chat completion request.
|
||||||
func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
|
func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
||||||
apiKey, baseURL := iflowCreds(auth)
|
apiKey, baseURL := iflowCreds(auth)
|
||||||
if strings.TrimSpace(apiKey) == "" {
|
if strings.TrimSpace(apiKey) == "" {
|
||||||
return nil, fmt.Errorf("iflow executor: missing api key")
|
err = fmt.Errorf("iflow executor: missing api key")
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
if baseURL == "" {
|
if baseURL == "" {
|
||||||
baseURL = iflowauth.DefaultAPIBaseURL
|
baseURL = iflowauth.DefaultAPIBaseURL
|
||||||
}
|
}
|
||||||
|
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||||
|
defer reporter.trackFailure(ctx, &err)
|
||||||
|
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
to := sdktranslator.FromString("openai")
|
to := sdktranslator.FromString("openai")
|
||||||
@@ -158,27 +168,35 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
|
|||||||
})
|
})
|
||||||
|
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
resp, err := httpClient.Do(httpReq)
|
httpResp, err := httpClient.Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||||
defer func() { _ = resp.Body.Close() }()
|
data, _ := io.ReadAll(httpResp.Body)
|
||||||
b, _ := io.ReadAll(resp.Body)
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
log.Errorf("iflow executor: close response body error: %v", errClose)
|
||||||
log.Debugf("iflow streaming error: status %d body %s", resp.StatusCode, string(b))
|
}
|
||||||
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||||
|
log.Debugf("iflow streaming error: status %d body %s", httpResp.StatusCode, string(data))
|
||||||
|
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
|
stream = out
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() {
|
||||||
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("iflow executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
scanner := bufio.NewScanner(resp.Body)
|
scanner := bufio.NewScanner(httpResp.Body)
|
||||||
buf := make([]byte, 20_971_520)
|
buf := make([]byte, 20_971_520)
|
||||||
scanner.Buffer(buf, 20_971_520)
|
scanner.Buffer(buf, 20_971_520)
|
||||||
var param any
|
var param any
|
||||||
@@ -193,13 +211,14 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
|
|||||||
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := scanner.Err(); err != nil {
|
if errScan := scanner.Err(); errScan != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, errScan)
|
||||||
out <- cliproxyexecutor.StreamChunk{Err: err}
|
reporter.publishFailure(ctx)
|
||||||
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return out, nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CountTokens is not implemented for iFlow.
|
// CountTokens is not implemented for iFlow.
|
||||||
|
|||||||
@@ -38,12 +38,15 @@ func (e *OpenAICompatExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.A
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
|
||||||
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||||
|
defer reporter.trackFailure(ctx, &err)
|
||||||
|
|
||||||
baseURL, apiKey := e.resolveCredentials(auth)
|
baseURL, apiKey := e.resolveCredentials(auth)
|
||||||
if baseURL == "" {
|
if baseURL == "" {
|
||||||
return cliproxyexecutor.Response{}, statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"}
|
err = statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
|
||||||
|
|
||||||
// Translate inbound request to OpenAI format
|
// Translate inbound request to OpenAI format
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
@@ -56,7 +59,7 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
|
|||||||
url := strings.TrimSuffix(baseURL, "/") + "/chat/completions"
|
url := strings.TrimSuffix(baseURL, "/") + "/chat/completions"
|
||||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated))
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
httpReq.Header.Set("Content-Type", "application/json")
|
httpReq.Header.Set("Content-Type", "application/json")
|
||||||
if apiKey != "" {
|
if apiKey != "" {
|
||||||
@@ -82,38 +85,47 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
|
|||||||
})
|
})
|
||||||
|
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
resp, err := httpClient.Do(httpReq)
|
httpResp, err := httpClient.Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() {
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
log.Errorf("openai compat executor: close response body error: %v", errClose)
|
||||||
b, _ := io.ReadAll(resp.Body)
|
}
|
||||||
|
}()
|
||||||
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||||
|
b, _ := io.ReadAll(httpResp.Body)
|
||||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||||
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
|
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
|
||||||
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
|
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
body, err := io.ReadAll(resp.Body)
|
body, err := io.ReadAll(httpResp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
appendAPIResponseChunk(ctx, e.cfg, body)
|
appendAPIResponseChunk(ctx, e.cfg, body)
|
||||||
reporter.publish(ctx, parseOpenAIUsage(body))
|
reporter.publish(ctx, parseOpenAIUsage(body))
|
||||||
// Translate response back to source format when needed
|
// Translate response back to source format when needed
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, body, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, body, ¶m)
|
||||||
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
|
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
||||||
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
|
func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
||||||
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||||
|
defer reporter.trackFailure(ctx, &err)
|
||||||
|
|
||||||
baseURL, apiKey := e.resolveCredentials(auth)
|
baseURL, apiKey := e.resolveCredentials(auth)
|
||||||
if baseURL == "" {
|
if baseURL == "" {
|
||||||
return nil, statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"}
|
err = statusErr{code: http.StatusUnauthorized, msg: "missing provider baseURL"}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
to := sdktranslator.FromString("openai")
|
to := sdktranslator.FromString("openai")
|
||||||
translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
|
translated := sdktranslator.TranslateRequest(from, to, req.Model, bytes.Clone(req.Payload), true)
|
||||||
@@ -152,24 +164,32 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
|
|||||||
})
|
})
|
||||||
|
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
resp, err := httpClient.Do(httpReq)
|
httpResp, err := httpClient.Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||||
defer func() { _ = resp.Body.Close() }()
|
b, _ := io.ReadAll(httpResp.Body)
|
||||||
b, _ := io.ReadAll(resp.Body)
|
|
||||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||||
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
|
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
|
||||||
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("openai compat executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
|
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
|
stream = out
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() {
|
||||||
scanner := bufio.NewScanner(resp.Body)
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("openai compat executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
scanner := bufio.NewScanner(httpResp.Body)
|
||||||
buf := make([]byte, 20_971_520)
|
buf := make([]byte, 20_971_520)
|
||||||
scanner.Buffer(buf, 20_971_520)
|
scanner.Buffer(buf, 20_971_520)
|
||||||
var param any
|
var param any
|
||||||
@@ -189,12 +209,13 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
|
|||||||
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err = scanner.Err(); err != nil {
|
if errScan := scanner.Err(); errScan != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, errScan)
|
||||||
out <- cliproxyexecutor.StreamChunk{Err: err}
|
reporter.publishFailure(ctx)
|
||||||
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return out, nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *OpenAICompatExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *OpenAICompatExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
||||||
|
|||||||
@@ -38,13 +38,14 @@ func (e *QwenExecutor) Identifier() string { return "qwen" }
|
|||||||
|
|
||||||
func (e *QwenExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
|
func (e *QwenExecutor) PrepareRequest(_ *http.Request, _ *cliproxyauth.Auth) error { return nil }
|
||||||
|
|
||||||
func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
|
||||||
token, baseURL := qwenCreds(auth)
|
token, baseURL := qwenCreds(auth)
|
||||||
|
|
||||||
if baseURL == "" {
|
if baseURL == "" {
|
||||||
baseURL = "https://portal.qwen.ai/v1"
|
baseURL = "https://portal.qwen.ai/v1"
|
||||||
}
|
}
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||||
|
defer reporter.trackFailure(ctx, &err)
|
||||||
|
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
to := sdktranslator.FromString("openai")
|
to := sdktranslator.FromString("openai")
|
||||||
@@ -53,7 +54,7 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
|
|||||||
url := strings.TrimSuffix(baseURL, "/") + "/chat/completions"
|
url := strings.TrimSuffix(baseURL, "/") + "/chat/completions"
|
||||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
applyQwenHeaders(httpReq, token, false)
|
applyQwenHeaders(httpReq, token, false)
|
||||||
var authID, authLabel, authType, authValue string
|
var authID, authLabel, authType, authValue string
|
||||||
@@ -75,38 +76,45 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
|
|||||||
})
|
})
|
||||||
|
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
resp, err := httpClient.Do(httpReq)
|
httpResp, err := httpClient.Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() {
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
log.Errorf("qwen executor: close response body error: %v", errClose)
|
||||||
b, _ := io.ReadAll(resp.Body)
|
}
|
||||||
|
}()
|
||||||
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||||
|
b, _ := io.ReadAll(httpResp.Body)
|
||||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||||
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
|
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
|
||||||
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(b)}
|
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||||
|
return resp, err
|
||||||
}
|
}
|
||||||
data, err := io.ReadAll(resp.Body)
|
data, err := io.ReadAll(httpResp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return cliproxyexecutor.Response{}, err
|
return resp, err
|
||||||
}
|
}
|
||||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||||
reporter.publish(ctx, parseOpenAIUsage(data))
|
reporter.publish(ctx, parseOpenAIUsage(data))
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, data, ¶m)
|
||||||
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
|
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
||||||
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
|
func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
||||||
token, baseURL := qwenCreds(auth)
|
token, baseURL := qwenCreds(auth)
|
||||||
|
|
||||||
if baseURL == "" {
|
if baseURL == "" {
|
||||||
baseURL = "https://portal.qwen.ai/v1"
|
baseURL = "https://portal.qwen.ai/v1"
|
||||||
}
|
}
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), req.Model, auth)
|
||||||
|
defer reporter.trackFailure(ctx, &err)
|
||||||
|
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
to := sdktranslator.FromString("openai")
|
to := sdktranslator.FromString("openai")
|
||||||
@@ -145,24 +153,32 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
|
|||||||
})
|
})
|
||||||
|
|
||||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||||
resp, err := httpClient.Do(httpReq)
|
httpResp, err := httpClient.Do(httpReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
recordAPIResponseError(ctx, e.cfg, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
|
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||||
defer func() { _ = resp.Body.Close() }()
|
b, _ := io.ReadAll(httpResp.Body)
|
||||||
b, _ := io.ReadAll(resp.Body)
|
|
||||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||||
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, string(b))
|
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, string(b))
|
||||||
return nil, statusErr{code: resp.StatusCode, msg: string(b)}
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("qwen executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
|
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
|
stream = out
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() {
|
||||||
scanner := bufio.NewScanner(resp.Body)
|
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||||
|
log.Errorf("qwen executor: close response body error: %v", errClose)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
scanner := bufio.NewScanner(httpResp.Body)
|
||||||
buf := make([]byte, 20_971_520)
|
buf := make([]byte, 20_971_520)
|
||||||
scanner.Buffer(buf, 20_971_520)
|
scanner.Buffer(buf, 20_971_520)
|
||||||
var param any
|
var param any
|
||||||
@@ -177,12 +193,17 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
|
|||||||
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err = scanner.Err(); err != nil {
|
doneChunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), body, bytes.Clone([]byte("[DONE]")), ¶m)
|
||||||
recordAPIResponseError(ctx, e.cfg, err)
|
for i := range doneChunks {
|
||||||
out <- cliproxyexecutor.StreamChunk{Err: err}
|
out <- cliproxyexecutor.StreamChunk{Payload: []byte(doneChunks[i])}
|
||||||
|
}
|
||||||
|
if errScan := scanner.Err(); errScan != nil {
|
||||||
|
recordAPIResponseError(ctx, e.cfg, errScan)
|
||||||
|
reporter.publishFailure(ctx)
|
||||||
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return out, nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *QwenExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *QwenExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
||||||
|
|||||||
@@ -41,6 +41,23 @@ func newUsageReporter(ctx context.Context, provider, model string, auth *cliprox
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) {
|
func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) {
|
||||||
|
r.publishWithOutcome(ctx, detail, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *usageReporter) publishFailure(ctx context.Context) {
|
||||||
|
r.publishWithOutcome(ctx, usage.Detail{}, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *usageReporter) trackFailure(ctx context.Context, errPtr *error) {
|
||||||
|
if r == nil || errPtr == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if *errPtr != nil {
|
||||||
|
r.publishFailure(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *usageReporter) publishWithOutcome(ctx context.Context, detail usage.Detail, failed bool) {
|
||||||
if r == nil {
|
if r == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -50,7 +67,7 @@ func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) {
|
|||||||
detail.TotalTokens = total
|
detail.TotalTokens = total
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if detail.InputTokens == 0 && detail.OutputTokens == 0 && detail.ReasoningTokens == 0 && detail.CachedTokens == 0 && detail.TotalTokens == 0 {
|
if detail.InputTokens == 0 && detail.OutputTokens == 0 && detail.ReasoningTokens == 0 && detail.CachedTokens == 0 && detail.TotalTokens == 0 && !failed {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
r.once.Do(func() {
|
r.once.Do(func() {
|
||||||
@@ -61,6 +78,7 @@ func (r *usageReporter) publish(ctx context.Context, detail usage.Detail) {
|
|||||||
APIKey: r.apiKey,
|
APIKey: r.apiKey,
|
||||||
AuthID: r.authID,
|
AuthID: r.authID,
|
||||||
RequestedAt: r.requestedAt,
|
RequestedAt: r.requestedAt,
|
||||||
|
Failed: failed,
|
||||||
Detail: detail,
|
Detail: detail,
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -91,6 +91,7 @@ type RequestDetail struct {
|
|||||||
Timestamp time.Time `json:"timestamp"`
|
Timestamp time.Time `json:"timestamp"`
|
||||||
Source string `json:"source"`
|
Source string `json:"source"`
|
||||||
Tokens TokenStats `json:"tokens"`
|
Tokens TokenStats `json:"tokens"`
|
||||||
|
Failed bool `json:"failed"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// TokenStats captures the token usage breakdown for a request.
|
// TokenStats captures the token usage breakdown for a request.
|
||||||
@@ -165,7 +166,11 @@ func (s *RequestStatistics) Record(ctx context.Context, record coreusage.Record)
|
|||||||
if statsKey == "" {
|
if statsKey == "" {
|
||||||
statsKey = resolveAPIIdentifier(ctx, record)
|
statsKey = resolveAPIIdentifier(ctx, record)
|
||||||
}
|
}
|
||||||
success := resolveSuccess(ctx)
|
failed := record.Failed
|
||||||
|
if !failed {
|
||||||
|
failed = !resolveSuccess(ctx)
|
||||||
|
}
|
||||||
|
success := !failed
|
||||||
modelName := record.Model
|
modelName := record.Model
|
||||||
if modelName == "" {
|
if modelName == "" {
|
||||||
modelName = "unknown"
|
modelName = "unknown"
|
||||||
@@ -193,6 +198,7 @@ func (s *RequestStatistics) Record(ctx context.Context, record coreusage.Record)
|
|||||||
Timestamp: timestamp,
|
Timestamp: timestamp,
|
||||||
Source: record.Source,
|
Source: record.Source,
|
||||||
Tokens: detail,
|
Tokens: detail,
|
||||||
|
Failed: failed,
|
||||||
})
|
})
|
||||||
|
|
||||||
s.requestsByDay[dayKey]++
|
s.requestsByDay[dayKey]++
|
||||||
|
|||||||
@@ -40,6 +40,8 @@ const (
|
|||||||
refreshCheckInterval = 5 * time.Second
|
refreshCheckInterval = 5 * time.Second
|
||||||
refreshPendingBackoff = time.Minute
|
refreshPendingBackoff = time.Minute
|
||||||
refreshFailureBackoff = 5 * time.Minute
|
refreshFailureBackoff = 5 * time.Minute
|
||||||
|
quotaBackoffBase = time.Second
|
||||||
|
quotaBackoffMax = 30 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
// Result captures execution outcome used to adjust auth state.
|
// Result captures execution outcome used to adjust auth state.
|
||||||
@@ -532,9 +534,15 @@ func (m *Manager) MarkResult(ctx context.Context, result Result) {
|
|||||||
suspendReason = "payment_required"
|
suspendReason = "payment_required"
|
||||||
shouldSuspendModel = true
|
shouldSuspendModel = true
|
||||||
case 429:
|
case 429:
|
||||||
next := now.Add(30 * time.Minute)
|
cooldown, nextLevel := nextQuotaCooldown(state.Quota.BackoffLevel)
|
||||||
|
next := now.Add(cooldown)
|
||||||
state.NextRetryAfter = next
|
state.NextRetryAfter = next
|
||||||
state.Quota = QuotaState{Exceeded: true, Reason: "quota", NextRecoverAt: next}
|
state.Quota = QuotaState{
|
||||||
|
Exceeded: true,
|
||||||
|
Reason: "quota",
|
||||||
|
NextRecoverAt: next,
|
||||||
|
BackoffLevel: nextLevel,
|
||||||
|
}
|
||||||
suspendReason = "quota"
|
suspendReason = "quota"
|
||||||
shouldSuspendModel = true
|
shouldSuspendModel = true
|
||||||
setModelQuota = true
|
setModelQuota = true
|
||||||
@@ -608,6 +616,7 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
|
|||||||
earliestRetry := time.Time{}
|
earliestRetry := time.Time{}
|
||||||
quotaExceeded := false
|
quotaExceeded := false
|
||||||
quotaRecover := time.Time{}
|
quotaRecover := time.Time{}
|
||||||
|
maxBackoffLevel := 0
|
||||||
for _, state := range auth.ModelStates {
|
for _, state := range auth.ModelStates {
|
||||||
if state == nil {
|
if state == nil {
|
||||||
continue
|
continue
|
||||||
@@ -636,6 +645,9 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
|
|||||||
if quotaRecover.IsZero() || (!state.Quota.NextRecoverAt.IsZero() && state.Quota.NextRecoverAt.Before(quotaRecover)) {
|
if quotaRecover.IsZero() || (!state.Quota.NextRecoverAt.IsZero() && state.Quota.NextRecoverAt.Before(quotaRecover)) {
|
||||||
quotaRecover = state.Quota.NextRecoverAt
|
quotaRecover = state.Quota.NextRecoverAt
|
||||||
}
|
}
|
||||||
|
if state.Quota.BackoffLevel > maxBackoffLevel {
|
||||||
|
maxBackoffLevel = state.Quota.BackoffLevel
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
auth.Unavailable = allUnavailable
|
auth.Unavailable = allUnavailable
|
||||||
@@ -648,10 +660,12 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
|
|||||||
auth.Quota.Exceeded = true
|
auth.Quota.Exceeded = true
|
||||||
auth.Quota.Reason = "quota"
|
auth.Quota.Reason = "quota"
|
||||||
auth.Quota.NextRecoverAt = quotaRecover
|
auth.Quota.NextRecoverAt = quotaRecover
|
||||||
|
auth.Quota.BackoffLevel = maxBackoffLevel
|
||||||
} else {
|
} else {
|
||||||
auth.Quota.Exceeded = false
|
auth.Quota.Exceeded = false
|
||||||
auth.Quota.Reason = ""
|
auth.Quota.Reason = ""
|
||||||
auth.Quota.NextRecoverAt = time.Time{}
|
auth.Quota.NextRecoverAt = time.Time{}
|
||||||
|
auth.Quota.BackoffLevel = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -685,6 +699,7 @@ func clearAuthStateOnSuccess(auth *Auth, now time.Time) {
|
|||||||
auth.Quota.Exceeded = false
|
auth.Quota.Exceeded = false
|
||||||
auth.Quota.Reason = ""
|
auth.Quota.Reason = ""
|
||||||
auth.Quota.NextRecoverAt = time.Time{}
|
auth.Quota.NextRecoverAt = time.Time{}
|
||||||
|
auth.Quota.BackoffLevel = 0
|
||||||
auth.LastError = nil
|
auth.LastError = nil
|
||||||
auth.NextRetryAfter = time.Time{}
|
auth.NextRetryAfter = time.Time{}
|
||||||
auth.UpdatedAt = now
|
auth.UpdatedAt = now
|
||||||
@@ -734,7 +749,9 @@ func applyAuthFailureState(auth *Auth, resultErr *Error, now time.Time) {
|
|||||||
auth.StatusMessage = "quota exhausted"
|
auth.StatusMessage = "quota exhausted"
|
||||||
auth.Quota.Exceeded = true
|
auth.Quota.Exceeded = true
|
||||||
auth.Quota.Reason = "quota"
|
auth.Quota.Reason = "quota"
|
||||||
auth.Quota.NextRecoverAt = now.Add(30 * time.Minute)
|
cooldown, nextLevel := nextQuotaCooldown(auth.Quota.BackoffLevel)
|
||||||
|
auth.Quota.NextRecoverAt = now.Add(cooldown)
|
||||||
|
auth.Quota.BackoffLevel = nextLevel
|
||||||
auth.NextRetryAfter = auth.Quota.NextRecoverAt
|
auth.NextRetryAfter = auth.Quota.NextRecoverAt
|
||||||
case 408, 500, 502, 503, 504:
|
case 408, 500, 502, 503, 504:
|
||||||
auth.StatusMessage = "transient upstream error"
|
auth.StatusMessage = "transient upstream error"
|
||||||
@@ -746,6 +763,21 @@ func applyAuthFailureState(auth *Auth, resultErr *Error, now time.Time) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nextQuotaCooldown returns the next cooldown duration and updated backoff level for repeated quota errors.
|
||||||
|
func nextQuotaCooldown(prevLevel int) (time.Duration, int) {
|
||||||
|
if prevLevel < 0 {
|
||||||
|
prevLevel = 0
|
||||||
|
}
|
||||||
|
cooldown := quotaBackoffBase * time.Duration(1<<prevLevel)
|
||||||
|
if cooldown < quotaBackoffBase {
|
||||||
|
cooldown = quotaBackoffBase
|
||||||
|
}
|
||||||
|
if cooldown >= quotaBackoffMax {
|
||||||
|
return quotaBackoffMax, prevLevel
|
||||||
|
}
|
||||||
|
return cooldown, prevLevel + 1
|
||||||
|
}
|
||||||
|
|
||||||
// List returns all auth entries currently known by the manager.
|
// List returns all auth entries currently known by the manager.
|
||||||
func (m *Manager) List() []*Auth {
|
func (m *Manager) List() []*Auth {
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
|
|||||||
@@ -65,6 +65,8 @@ type QuotaState struct {
|
|||||||
Reason string `json:"reason,omitempty"`
|
Reason string `json:"reason,omitempty"`
|
||||||
// NextRecoverAt is when the credential may become available again.
|
// NextRecoverAt is when the credential may become available again.
|
||||||
NextRecoverAt time.Time `json:"next_recover_at"`
|
NextRecoverAt time.Time `json:"next_recover_at"`
|
||||||
|
// BackoffLevel stores the progressive cooldown exponent used for rate limits.
|
||||||
|
BackoffLevel int `json:"backoff_level,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// ModelState captures the execution state for a specific model under an auth entry.
|
// ModelState captures the execution state for a specific model under an auth entry.
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ type Record struct {
|
|||||||
AuthID string
|
AuthID string
|
||||||
Source string
|
Source string
|
||||||
RequestedAt time.Time
|
RequestedAt time.Time
|
||||||
|
Failed bool
|
||||||
Detail Detail
|
Detail Detail
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user