Compare commits

...

3 Commits

Author SHA1 Message Date
Luis Pater
09b9d3b3fa Unlock mutex before returning error in handlers.go to prevent deadlocks
Some checks failed
docker-image / docker (push) Has been cancelled
goreleaser / goreleaser (push) Has been cancelled
2025-08-29 09:46:12 +08:00
Luis Pater
e9e0016a63 Fix some bugs. 2025-08-29 04:05:08 +08:00
Luis Pater
3704dae342 Add nil-check for GetRequestMutex across handlers to prevent potential panics
Some checks failed
docker-image / docker (push) Has been cancelled
goreleaser / goreleaser (push) Has been cancelled
- Updated all handlers to safely unlock the request mutex only if it's non-nil.
- Enhanced mutex locking and unlocking logic to avoid runtime errors.
- Improved robustness of resource cleanup across clients.

Add `GetRequestMutex` method for synchronization across clients

- Introduced a new `GetRequestMutex` method in OpenAICompatibilityClient, CodexClient, GeminiCLIClient, GeminiClient, and QwenClient for request synchronization.
- Ensures only one request is processed at a time to manage quotas effectively.
2025-08-29 00:23:37 +08:00
11 changed files with 99 additions and 17 deletions

View File

@@ -17,6 +17,7 @@ import (
. "github.com/luispater/CLIProxyAPI/internal/constant"
"github.com/luispater/CLIProxyAPI/internal/interfaces"
"github.com/luispater/CLIProxyAPI/internal/registry"
"github.com/luispater/CLIProxyAPI/internal/util"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)
@@ -133,7 +134,9 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [
// Ensure the client's mutex is unlocked on function exit.
// This prevents deadlocks and ensures proper resource cleanup
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
retryCount := 0
@@ -191,7 +194,7 @@ outLoop:
continue outLoop // Restart the client selection process
}
case 403, 408, 500, 502, 503, 504:
log.Debugf("http status code %d, switch client", errInfo.StatusCode)
log.Debugf("http status code %d, switch client, %s", errInfo.StatusCode, util.HideAPIKey(cliClient.GetEmail()))
retryCount++
continue outLoop
default:

View File

@@ -163,7 +163,9 @@ func (h *GeminiCLIAPIHandler) handleInternalStreamGenerateContent(c *gin.Context
defer func() {
// Ensure the client's mutex is unlocked on function exit.
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -244,7 +246,9 @@ func (h *GeminiCLIAPIHandler) handleInternalGenerateContent(c *gin.Context, rawJ
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()

View File

@@ -214,7 +214,9 @@ func (h *GeminiAPIHandler) handleStreamGenerateContent(c *gin.Context, modelName
defer func() {
// Ensure the client's mutex is unlocked on function exit.
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -303,7 +305,9 @@ func (h *GeminiAPIHandler) handleCountTokens(c *gin.Context, modelName string, r
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -354,7 +358,9 @@ func (h *GeminiAPIHandler) handleGenerateContent(c *gin.Context, modelName strin
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()

View File

@@ -102,18 +102,19 @@ func (h *BaseAPIHandler) GetClient(modelName string, isGenerateContent ...bool)
}
}
// Lock the mutex to update the last used client index
h.Mutex.Lock()
if _, hasKey := h.LastUsedClientIndex[modelName]; !hasKey {
h.LastUsedClientIndex[modelName] = 0
}
if len(clients) == 0 {
h.Mutex.Unlock()
return nil, &interfaces.ErrorMessage{StatusCode: 500, Error: fmt.Errorf("no clients available")}
}
var cliClient interfaces.Client
// Lock the mutex to update the last used client index
h.Mutex.Lock()
startIndex := h.LastUsedClientIndex[modelName]
if (len(isGenerateContent) > 0 && isGenerateContent[0]) || len(isGenerateContent) == 0 {
currentIndex := (startIndex + 1) % len(clients)
@@ -157,14 +158,20 @@ func (h *BaseAPIHandler) GetClient(modelName string, isGenerateContent ...bool)
locked := false
for i := 0; i < len(reorderedClients); i++ {
cliClient = reorderedClients[i]
if cliClient.GetRequestMutex().TryLock() {
if mutex := cliClient.GetRequestMutex(); mutex != nil {
if mutex.TryLock() {
locked = true
break
}
} else {
locked = true
break
}
}
if !locked {
cliClient = clients[0]
cliClient.GetRequestMutex().Lock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Lock()
}
}
return cliClient, nil

View File

@@ -380,7 +380,9 @@ func (h *OpenAIAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSON []
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -454,7 +456,9 @@ func (h *OpenAIAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON []byt
defer func() {
// Ensure the client's mutex is unlocked on function exit.
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -543,7 +547,9 @@ func (h *OpenAIAPIHandler) handleCompletionsNonStreamingResponse(c *gin.Context,
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -623,7 +629,9 @@ func (h *OpenAIAPIHandler) handleCompletionsStreamingResponse(c *gin.Context, ra
defer func() {
// Ensure the client's mutex is unlocked on function exit.
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()

View File

@@ -535,7 +535,7 @@ func (c *ClaudeClient) GetEmail() string {
if ts, ok := c.tokenStorage.(*claude.ClaudeTokenStorage); ok {
return ts.Email
} else {
return ""
return c.cfg.ClaudeKey[c.apiKeyIndex].APIKey
}
}
@@ -557,3 +557,12 @@ func (c *ClaudeClient) IsModelQuotaExceeded(model string) bool {
}
return false
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *ClaudeClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -430,3 +430,12 @@ func (c *CodexClient) IsModelQuotaExceeded(model string) bool {
}
return false
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *CodexClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -851,3 +851,12 @@ func (c *GeminiCLIClient) GetUserAgent() string {
// return fmt.Sprintf("GeminiCLI/%s (%s; %s)", pluginVersion, runtime.GOOS, runtime.GOARCH)
return "google-api-nodejs-client/9.15.1"
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *GeminiCLIClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -425,3 +425,12 @@ func (c *GeminiClient) GetUserAgent() string {
// return fmt.Sprintf("GeminiCLI/%s (%s; %s)", pluginVersion, runtime.GOOS, runtime.GOARCH)
return "google-api-nodejs-client/9.15.1"
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *GeminiClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -390,3 +390,12 @@ func (c *OpenAICompatibilityClient) RefreshTokens(ctx context.Context) error {
// API keys don't need refreshing
return nil
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *OpenAICompatibilityClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -432,3 +432,12 @@ func (c *QwenClient) IsModelQuotaExceeded(model string) bool {
}
return false
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *QwenClient) GetRequestMutex() *sync.Mutex {
return nil
}