fix(amp): address PR review - stream thinking suppression, SSE detection, test init
- Call suppressAmpThinking in rewriteStreamEvent for streaming path - Handle nil return from suppressAmpThinking to skip suppressed events - Narrow looksLikeSSEChunk to line-prefix detection (HasPrefix vs Contains) - Initialize suppressedContentBlock map in test
This commit is contained in:
@@ -36,14 +36,14 @@ func NewResponseRewriter(w gin.ResponseWriter, originalModel string) *ResponseRe
|
|||||||
const maxBufferedResponseBytes = 2 * 1024 * 1024 // 2MB safety cap
|
const maxBufferedResponseBytes = 2 * 1024 * 1024 // 2MB safety cap
|
||||||
|
|
||||||
func looksLikeSSEChunk(data []byte) bool {
|
func looksLikeSSEChunk(data []byte) bool {
|
||||||
return bytes.Contains(data, []byte("data:")) ||
|
for _, line := range bytes.Split(data, []byte("\n")) {
|
||||||
bytes.Contains(data, []byte("event:")) ||
|
trimmed := bytes.TrimSpace(line)
|
||||||
bytes.Contains(data, []byte("message_start")) ||
|
if bytes.HasPrefix(trimmed, []byte("data:")) ||
|
||||||
bytes.Contains(data, []byte("message_delta")) ||
|
bytes.HasPrefix(trimmed, []byte("event:")) {
|
||||||
bytes.Contains(data, []byte("content_block_start")) ||
|
return true
|
||||||
bytes.Contains(data, []byte("content_block_delta")) ||
|
}
|
||||||
bytes.Contains(data, []byte("content_block_stop")) ||
|
}
|
||||||
bytes.Contains(data, []byte("\n\n"))
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rw *ResponseRewriter) enableStreaming(reason string) error {
|
func (rw *ResponseRewriter) enableStreaming(reason string) error {
|
||||||
@@ -250,11 +250,15 @@ func (rw *ResponseRewriter) rewriteStreamChunk(chunk []byte) []byte {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if dataIdx >= 0 {
|
if dataIdx >= 0 {
|
||||||
// Found event+data pair - process through model rewriter only
|
// Found event+data pair - process through rewriter
|
||||||
// (no thinking suppression for streaming)
|
|
||||||
jsonData := bytes.TrimPrefix(bytes.TrimSpace(lines[dataIdx]), []byte("data: "))
|
jsonData := bytes.TrimPrefix(bytes.TrimSpace(lines[dataIdx]), []byte("data: "))
|
||||||
if len(jsonData) > 0 && jsonData[0] == '{' {
|
if len(jsonData) > 0 && jsonData[0] == '{' {
|
||||||
rewritten := rw.rewriteStreamEvent(jsonData)
|
rewritten := rw.rewriteStreamEvent(jsonData)
|
||||||
|
if rewritten == nil {
|
||||||
|
// Event suppressed (e.g. thinking block), skip event+data pair
|
||||||
|
i = dataIdx + 1
|
||||||
|
continue
|
||||||
|
}
|
||||||
// Emit event line
|
// Emit event line
|
||||||
out = append(out, line)
|
out = append(out, line)
|
||||||
// Emit blank lines between event and data
|
// Emit blank lines between event and data
|
||||||
@@ -280,7 +284,9 @@ func (rw *ResponseRewriter) rewriteStreamChunk(chunk []byte) []byte {
|
|||||||
jsonData := bytes.TrimPrefix(trimmed, []byte("data: "))
|
jsonData := bytes.TrimPrefix(trimmed, []byte("data: "))
|
||||||
if len(jsonData) > 0 && jsonData[0] == '{' {
|
if len(jsonData) > 0 && jsonData[0] == '{' {
|
||||||
rewritten := rw.rewriteStreamEvent(jsonData)
|
rewritten := rw.rewriteStreamEvent(jsonData)
|
||||||
out = append(out, append([]byte("data: "), rewritten...))
|
if rewritten != nil {
|
||||||
|
out = append(out, append([]byte("data: "), rewritten...))
|
||||||
|
}
|
||||||
i++
|
i++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -296,9 +302,13 @@ func (rw *ResponseRewriter) rewriteStreamChunk(chunk []byte) []byte {
|
|||||||
|
|
||||||
// rewriteStreamEvent processes a single JSON event in the SSE stream.
|
// rewriteStreamEvent processes a single JSON event in the SSE stream.
|
||||||
// It rewrites model names and ensures signature fields exist.
|
// It rewrites model names and ensures signature fields exist.
|
||||||
// Unlike rewriteModelInResponse, it does NOT suppress thinking blocks
|
|
||||||
// in streaming mode - they are passed through with signature injection.
|
|
||||||
func (rw *ResponseRewriter) rewriteStreamEvent(data []byte) []byte {
|
func (rw *ResponseRewriter) rewriteStreamEvent(data []byte) []byte {
|
||||||
|
// Suppress thinking blocks before any other processing.
|
||||||
|
data = rw.suppressAmpThinking(data)
|
||||||
|
if len(data) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Inject empty signature where needed
|
// Inject empty signature where needed
|
||||||
data = ensureAmpSignature(data)
|
data = ensureAmpSignature(data)
|
||||||
|
|
||||||
|
|||||||
@@ -101,7 +101,7 @@ func TestRewriteStreamChunk_MessageModel(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestRewriteStreamChunk_SuppressesThinkingContentBlockFrames(t *testing.T) {
|
func TestRewriteStreamChunk_SuppressesThinkingContentBlockFrames(t *testing.T) {
|
||||||
rw := &ResponseRewriter{}
|
rw := &ResponseRewriter{suppressedContentBlock: make(map[int]struct{})}
|
||||||
|
|
||||||
chunk := []byte("event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"thinking\",\"thinking\":\"\"}}\n\nevent: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"thinking_delta\",\"thinking\":\"abc\"}}\n\nevent: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":0}\n\nevent: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":1,\"content_block\":{\"type\":\"tool_use\",\"name\":\"bash\",\"input\":{}}}\n\n")
|
chunk := []byte("event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"thinking\",\"thinking\":\"\"}}\n\nevent: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"thinking_delta\",\"thinking\":\"abc\"}}\n\nevent: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":0}\n\nevent: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":1,\"content_block\":{\"type\":\"tool_use\",\"name\":\"bash\",\"input\":{}}}\n\n")
|
||||||
result := rw.rewriteStreamChunk(chunk)
|
result := rw.rewriteStreamChunk(chunk)
|
||||||
|
|||||||
Reference in New Issue
Block a user