Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b23e2da74 | ||
|
|
5ab0854b5b | ||
|
|
15981aa412 | ||
|
|
ac4f52c532 | ||
|
|
84fa497169 |
@@ -32,6 +32,7 @@ import (
|
|||||||
"github.com/router-for-me/CLIProxyAPI/v6/sdk/api/handlers/openai"
|
"github.com/router-for-me/CLIProxyAPI/v6/sdk/api/handlers/openai"
|
||||||
"github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
|
"github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
const oauthCallbackSuccessHTML = `<html><head><meta charset="utf-8"><title>Authentication successful</title><script>setTimeout(function(){window.close();},5000);</script></head><body><h1>Authentication successful!</h1><p>You can close this window.</p><p>This window will close automatically in 5 seconds.</p></body></html>`
|
const oauthCallbackSuccessHTML = `<html><head><meta charset="utf-8"><title>Authentication successful</title><script>setTimeout(function(){window.close();},5000);</script></head><body><h1>Authentication successful!</h1><p>You can close this window.</p><p>This window will close automatically in 5 seconds.</p></body></html>`
|
||||||
@@ -116,6 +117,10 @@ type Server struct {
|
|||||||
// cfg holds the current server configuration.
|
// cfg holds the current server configuration.
|
||||||
cfg *config.Config
|
cfg *config.Config
|
||||||
|
|
||||||
|
// oldConfigYaml stores a YAML snapshot of the previous configuration for change detection.
|
||||||
|
// This prevents issues when the config object is modified in place by Management API.
|
||||||
|
oldConfigYaml []byte
|
||||||
|
|
||||||
// accessManager handles request authentication providers.
|
// accessManager handles request authentication providers.
|
||||||
accessManager *sdkaccess.Manager
|
accessManager *sdkaccess.Manager
|
||||||
|
|
||||||
@@ -220,6 +225,8 @@ func NewServer(cfg *config.Config, authManager *auth.Manager, accessManager *sdk
|
|||||||
currentPath: wd,
|
currentPath: wd,
|
||||||
envManagementSecret: envManagementSecret,
|
envManagementSecret: envManagementSecret,
|
||||||
}
|
}
|
||||||
|
// Save initial YAML snapshot
|
||||||
|
s.oldConfigYaml, _ = yaml.Marshal(cfg)
|
||||||
s.applyAccessConfig(nil, cfg)
|
s.applyAccessConfig(nil, cfg)
|
||||||
// Initialize management handler
|
// Initialize management handler
|
||||||
s.mgmt = managementHandlers.NewHandler(cfg, configFilePath, authManager)
|
s.mgmt = managementHandlers.NewHandler(cfg, configFilePath, authManager)
|
||||||
@@ -654,7 +661,11 @@ func (s *Server) applyAccessConfig(oldCfg, newCfg *config.Config) {
|
|||||||
// - clients: The new slice of AI service clients
|
// - clients: The new slice of AI service clients
|
||||||
// - cfg: The new application configuration
|
// - cfg: The new application configuration
|
||||||
func (s *Server) UpdateClients(cfg *config.Config) {
|
func (s *Server) UpdateClients(cfg *config.Config) {
|
||||||
oldCfg := s.cfg
|
// Reconstruct old config from YAML snapshot to avoid reference sharing issues
|
||||||
|
var oldCfg *config.Config
|
||||||
|
if len(s.oldConfigYaml) > 0 {
|
||||||
|
_ = yaml.Unmarshal(s.oldConfigYaml, &oldCfg)
|
||||||
|
}
|
||||||
|
|
||||||
// Update request logger enabled state if it has changed
|
// Update request logger enabled state if it has changed
|
||||||
previousRequestLog := false
|
previousRequestLog := false
|
||||||
@@ -735,6 +746,8 @@ func (s *Server) UpdateClients(cfg *config.Config) {
|
|||||||
|
|
||||||
s.applyAccessConfig(oldCfg, cfg)
|
s.applyAccessConfig(oldCfg, cfg)
|
||||||
s.cfg = cfg
|
s.cfg = cfg
|
||||||
|
// Save YAML snapshot for next comparison
|
||||||
|
s.oldConfigYaml, _ = yaml.Marshal(cfg)
|
||||||
s.handlers.UpdateClients(&cfg.SDKConfig)
|
s.handlers.UpdateClients(&cfg.SDKConfig)
|
||||||
|
|
||||||
if !cfg.RemoteManagement.DisableControlPanel {
|
if !cfg.RemoteManagement.DisableControlPanel {
|
||||||
|
|||||||
@@ -8,6 +8,15 @@ import "time"
|
|||||||
// GetClaudeModels returns the standard Claude model definitions
|
// GetClaudeModels returns the standard Claude model definitions
|
||||||
func GetClaudeModels() []*ModelInfo {
|
func GetClaudeModels() []*ModelInfo {
|
||||||
return []*ModelInfo{
|
return []*ModelInfo{
|
||||||
|
|
||||||
|
{
|
||||||
|
ID: "claude-haiku-4-5-20251001",
|
||||||
|
Object: "model",
|
||||||
|
Created: 1759276800, // 2025-10-01
|
||||||
|
OwnedBy: "anthropic",
|
||||||
|
Type: "claude",
|
||||||
|
DisplayName: "Claude 4.5 Haiku",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
ID: "claude-sonnet-4-5-20250929",
|
ID: "claude-sonnet-4-5-20250929",
|
||||||
Object: "model",
|
Object: "model",
|
||||||
|
|||||||
@@ -143,6 +143,31 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
|||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() { _ = resp.Body.Close() }()
|
||||||
|
|
||||||
|
// If from == to (Claude → Claude), directly forward the SSE stream without translation
|
||||||
|
if from == to {
|
||||||
|
scanner := bufio.NewScanner(resp.Body)
|
||||||
|
buf := make([]byte, 20_971_520)
|
||||||
|
scanner.Buffer(buf, 20_971_520)
|
||||||
|
for scanner.Scan() {
|
||||||
|
line := scanner.Bytes()
|
||||||
|
appendAPIResponseChunk(ctx, e.cfg, line)
|
||||||
|
if detail, ok := parseClaudeStreamUsage(line); ok {
|
||||||
|
reporter.publish(ctx, detail)
|
||||||
|
}
|
||||||
|
// Forward the line as-is to preserve SSE format
|
||||||
|
cloned := make([]byte, len(line)+1)
|
||||||
|
copy(cloned, line)
|
||||||
|
cloned[len(line)] = '\n'
|
||||||
|
out <- cliproxyexecutor.StreamChunk{Payload: cloned}
|
||||||
|
}
|
||||||
|
if err = scanner.Err(); err != nil {
|
||||||
|
out <- cliproxyexecutor.StreamChunk{Err: err}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// For other formats, use translation
|
||||||
scanner := bufio.NewScanner(resp.Body)
|
scanner := bufio.NewScanner(resp.Body)
|
||||||
buf := make([]byte, 20_971_520)
|
buf := make([]byte, 20_971_520)
|
||||||
scanner.Buffer(buf, 20_971_520)
|
scanner.Buffer(buf, 20_971_520)
|
||||||
|
|||||||
@@ -37,6 +37,8 @@ type ConvertOpenAIResponseToAnthropicParams struct {
|
|||||||
ContentBlocksStopped bool
|
ContentBlocksStopped bool
|
||||||
// Track if message_delta has been sent
|
// Track if message_delta has been sent
|
||||||
MessageDeltaSent bool
|
MessageDeltaSent bool
|
||||||
|
// Track if message_start has been sent
|
||||||
|
MessageStarted bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// ToolCallAccumulator holds the state for accumulating tool call data
|
// ToolCallAccumulator holds the state for accumulating tool call data
|
||||||
@@ -84,20 +86,12 @@ func ConvertOpenAIResponseToClaude(_ context.Context, _ string, originalRequestR
|
|||||||
return convertOpenAIDoneToAnthropic((*param).(*ConvertOpenAIResponseToAnthropicParams))
|
return convertOpenAIDoneToAnthropic((*param).(*ConvertOpenAIResponseToAnthropicParams))
|
||||||
}
|
}
|
||||||
|
|
||||||
root := gjson.ParseBytes(rawJSON)
|
streamResult := gjson.GetBytes(originalRequestRawJSON, "stream")
|
||||||
|
if !streamResult.Exists() || (streamResult.Exists() && streamResult.Type == gjson.False) {
|
||||||
// Check if this is a streaming chunk or non-streaming response
|
|
||||||
objectType := root.Get("object").String()
|
|
||||||
|
|
||||||
if objectType == "chat.completion.chunk" {
|
|
||||||
// Handle streaming response
|
|
||||||
return convertOpenAIStreamingChunkToAnthropic(rawJSON, (*param).(*ConvertOpenAIResponseToAnthropicParams))
|
|
||||||
} else if objectType == "chat.completion" {
|
|
||||||
// Handle non-streaming response
|
|
||||||
return convertOpenAINonStreamingToAnthropic(rawJSON)
|
return convertOpenAINonStreamingToAnthropic(rawJSON)
|
||||||
|
} else {
|
||||||
|
return convertOpenAIStreamingChunkToAnthropic(rawJSON, (*param).(*ConvertOpenAIResponseToAnthropicParams))
|
||||||
}
|
}
|
||||||
|
|
||||||
return []string{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// convertOpenAIStreamingChunkToAnthropic converts OpenAI streaming chunk to Anthropic streaming events
|
// convertOpenAIStreamingChunkToAnthropic converts OpenAI streaming chunk to Anthropic streaming events
|
||||||
@@ -118,7 +112,7 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
|
|||||||
|
|
||||||
// Check if this is the first chunk (has role)
|
// Check if this is the first chunk (has role)
|
||||||
if delta := root.Get("choices.0.delta"); delta.Exists() {
|
if delta := root.Get("choices.0.delta"); delta.Exists() {
|
||||||
if role := delta.Get("role"); role.Exists() && role.String() == "assistant" {
|
if role := delta.Get("role"); role.Exists() && role.String() == "assistant" && !param.MessageStarted {
|
||||||
// Send message_start event
|
// Send message_start event
|
||||||
messageStart := map[string]interface{}{
|
messageStart := map[string]interface{}{
|
||||||
"type": "message_start",
|
"type": "message_start",
|
||||||
@@ -138,6 +132,7 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
|
|||||||
}
|
}
|
||||||
messageStartJSON, _ := json.Marshal(messageStart)
|
messageStartJSON, _ := json.Marshal(messageStart)
|
||||||
results = append(results, "event: message_start\ndata: "+string(messageStartJSON)+"\n\n")
|
results = append(results, "event: message_start\ndata: "+string(messageStartJSON)+"\n\n")
|
||||||
|
param.MessageStarted = true
|
||||||
|
|
||||||
// Don't send content_block_start for text here - wait for actual content
|
// Don't send content_block_start for text here - wait for actual content
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,8 +7,9 @@
|
|||||||
package claude
|
package claude
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
@@ -197,33 +198,65 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage) {
|
func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage) {
|
||||||
|
// v6.1: Intelligent Buffered Streamer strategy
|
||||||
|
// Enhanced buffering with larger buffer size (16KB) and longer flush interval (120ms).
|
||||||
|
// Smart flush only when buffer is sufficiently filled (≥50%), dramatically reducing
|
||||||
|
// flush frequency from ~12.5Hz to ~5-8Hz while maintaining low latency.
|
||||||
|
writer := bufio.NewWriterSize(c.Writer, 16*1024) // 4KB → 16KB
|
||||||
|
ticker := time.NewTicker(120 * time.Millisecond) // 80ms → 120ms
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
var chunkIdx int
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-c.Request.Context().Done():
|
case <-c.Request.Context().Done():
|
||||||
|
// Context cancelled, flush any remaining data before exit
|
||||||
|
_ = writer.Flush()
|
||||||
cancel(c.Request.Context().Err())
|
cancel(c.Request.Context().Err())
|
||||||
return
|
return
|
||||||
|
|
||||||
|
case <-ticker.C:
|
||||||
|
// Smart flush: only flush when buffer has sufficient data (≥50% full)
|
||||||
|
// This reduces flush frequency while ensuring data flows naturally
|
||||||
|
buffered := writer.Buffered()
|
||||||
|
if buffered >= 8*1024 { // At least 8KB (50% of 16KB buffer)
|
||||||
|
if err := writer.Flush(); err != nil {
|
||||||
|
// Error flushing, cancel and return
|
||||||
|
cancel(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
flusher.Flush() // Also flush the underlying http.ResponseWriter
|
||||||
|
}
|
||||||
|
|
||||||
case chunk, ok := <-data:
|
case chunk, ok := <-data:
|
||||||
if !ok {
|
if !ok {
|
||||||
flusher.Flush()
|
// Stream ended, flush remaining data
|
||||||
|
_ = writer.Flush()
|
||||||
cancel(nil)
|
cancel(nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if bytes.HasPrefix(chunk, []byte("event:")) {
|
// Forward the complete SSE event block directly (already formatted by the translator).
|
||||||
_, _ = c.Writer.Write([]byte("\n"))
|
// The translator returns a complete SSE-compliant event block, including event:, data:, and separators.
|
||||||
|
// The handler just needs to forward it without reassembly.
|
||||||
|
if len(chunk) > 0 {
|
||||||
|
_, _ = writer.Write(chunk)
|
||||||
}
|
}
|
||||||
|
chunkIdx++
|
||||||
|
|
||||||
_, _ = c.Writer.Write(chunk)
|
|
||||||
_, _ = c.Writer.Write([]byte("\n"))
|
|
||||||
|
|
||||||
flusher.Flush()
|
|
||||||
case errMsg, ok := <-errs:
|
case errMsg, ok := <-errs:
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
h.WriteErrorResponse(c, errMsg)
|
// An error occurred: emit as a proper SSE error event
|
||||||
flusher.Flush()
|
errorBytes, _ := json.Marshal(h.toClaudeError(errMsg))
|
||||||
|
_, _ = writer.WriteString("event: error\n")
|
||||||
|
_, _ = writer.WriteString("data: ")
|
||||||
|
_, _ = writer.Write(errorBytes)
|
||||||
|
_, _ = writer.WriteString("\n\n")
|
||||||
|
_ = writer.Flush()
|
||||||
}
|
}
|
||||||
var execErr error
|
var execErr error
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
@@ -231,7 +264,26 @@ func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.
|
|||||||
}
|
}
|
||||||
cancel(execErr)
|
cancel(execErr)
|
||||||
return
|
return
|
||||||
case <-time.After(500 * time.Millisecond):
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type claudeErrorDetail struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type claudeErrorResponse struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
Error claudeErrorDetail `json:"error"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ClaudeCodeAPIHandler) toClaudeError(msg *interfaces.ErrorMessage) claudeErrorResponse {
|
||||||
|
return claudeErrorResponse{
|
||||||
|
Type: "error",
|
||||||
|
Error: claudeErrorDetail{
|
||||||
|
Type: "api_error",
|
||||||
|
Message: msg.Error.Error(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user