package monica

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"
	"sync"
	"time"

	"monica-proxy/internal/types"
	"monica-proxy/internal/utils"

	"github.com/bytedance/sonic"
	"github.com/sashabaranov/go-openai"
)

const (
	sseObject     = "chat.completion.chunk"
	sseFinish     = "[DONE]"
	flushInterval = 100 * time.Millisecond // interval between periodic flushes
	bufferSize    = 4096                   // size of the read and write buffers, in bytes
)

// SSEData is the JSON payload carried by one Monica SSE "data: " line.
type SSEData struct {
	Text        string      `json:"text"`
	Finished    bool        `json:"finished"`
	AgentStatus AgentStatus `json:"agent_status,omitempty"`
}

// AgentStatus is the "agent_status" object of a Monica SSE event. The stream
// converter inspects Type and Metadata.ReasoningDetail; the remaining fields
// are decoded but not used in this file.
type AgentStatus struct {
	UID      string `json:"uid"`
	Type     string `json:"type"`
	Text     string `json:"text"`
	Metadata struct {
		Title           string `json:"title"`
		ReasoningDetail string `json:"reasoning_detail"`
	} `json:"metadata"`
}

// sseDataPool recycles SSEData values across events. Values taken from the
// pool may still hold fields from a previous event and must be reset before
// being decoded into.
var sseDataPool = sync.Pool{
	New: func() interface{} {
		return &SSEData{}
	},
}

// StreamMonicaSSEToClient converts a Monica SSE stream read from r into
// "chat.completion.chunk" SSE events written to w.
//
// Every input line starting with "data: " is decoded as SSEData and re-emitted
// as one "data: <json>\n\n" event. Lines without that prefix, blank payloads
// and payloads that fail to decode are skipped (decode failures are logged).
// When an event has finished=true, a final "data: [DONE]\n\n" event is written
// and the function returns. Reaching EOF on r also ends the stream normally.
//
// Output is buffered. If w implements http.Flusher, a background goroutine
// flushes the buffer every flushInterval; the buffer is always flushed once
// more before the function returns, after that goroutine has stopped.
//
// It returns a wrapped error when reading r, encoding a chunk, or writing or
// flushing to w fails.
func StreamMonicaSSEToClient(model string, w io.Writer, r io.Reader) (err error) {
	const dataPrefix = "data: "

	// NOTE(review): both markers are empty in this file, so the "thinking"
	// event emits an empty delta and the closing marker is a no-op. They look
	// like placeholders for reasoning delimiters (e.g. "<think>" / "</think>")
	// — confirm the intended values before changing them.
	const (
		thinkOpenMarker  = ``
		thinkCloseMarker = ""
	)

	reader := bufio.NewReaderSize(r, bufferSize)
	writer := bufio.NewWriterSize(w, bufferSize)
	flusher, canFlush := w.(http.Flusher)

	// mu serializes all access to writer: bufio.Writer is not safe for
	// concurrent use and the periodic flusher runs on its own goroutine.
	var mu sync.Mutex
	write := func(s string) error {
		mu.Lock()
		defer mu.Unlock()
		_, werr := writer.WriteString(s)
		return werr
	}
	flush := func() error {
		mu.Lock()
		defer mu.Unlock()
		if ferr := writer.Flush(); ferr != nil {
			return ferr
		}
		if canFlush {
			flusher.Flush()
		}
		return nil
	}

	// Periodic flushing only makes sense when the destination can push data
	// to the client, so the goroutine is started only in that case.
	done := make(chan struct{})
	var wg sync.WaitGroup
	if canFlush {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ticker := time.NewTicker(flushInterval)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					// bufio.Writer errors are sticky, so a failure here is
					// reported by the next write or by the final flush.
					_ = flush()
				case <-done:
					return
				}
			}
		}()
	}
	defer func() {
		// Stop the flusher and wait for it so that w is never touched after
		// this function returns, then push out whatever is still buffered.
		close(done)
		wg.Wait()
		if ferr := flush(); ferr != nil && err == nil {
			err = fmt.Errorf("flush error: %w", ferr)
		}
	}()

	chatID := "chatcmpl-" + utils.RandStringUsingMathRand(29)
	created := time.Now().Unix()
	fingerprint := utils.RandStringUsingMathRand(10)

	// newChunk builds a single-choice stream response with the given delta
	// content, system fingerprint and finish reason.
	newChunk := func(content, systemFingerprint string, finish openai.FinishReason) types.ChatCompletionStreamResponse {
		return types.ChatCompletionStreamResponse{
			ID:                chatID,
			Object:            sseObject,
			SystemFingerprint: systemFingerprint,
			Created:           created,
			Model:             model,
			Choices: []types.ChatCompletionStreamChoice{
				{
					Index: 0,
					Delta: openai.ChatCompletionStreamChoiceDelta{
						Role:    openai.ChatMessageRoleAssistant,
						Content: content,
					},
					FinishReason: finish,
				},
			},
		}
	}

	// thinking is set by a "thinking" event and cleared by the next plain
	// text event, which gets the closing marker prepended.
	thinking := false

	// handle converts one input line into output events. It reports whether
	// the line carried the finished event.
	handle := func(line string) (bool, error) {
		// Monica SSE payload lines are prefixed with "data: ".
		if !strings.HasPrefix(line, dataPrefix) {
			return false, nil
		}
		// Trim the line terminator so blank payloads are actually detected.
		payload := strings.TrimSpace(strings.TrimPrefix(line, dataPrefix))
		if payload == "" {
			return false, nil
		}

		evt := sseDataPool.Get().(*SSEData)
		defer sseDataPool.Put(evt)
		// Clear leftovers from a previous event: keys missing from this
		// payload would otherwise keep their old values.
		*evt = SSEData{}

		if uerr := sonic.UnmarshalString(payload, evt); uerr != nil {
			// Log and keep streaming; one malformed event should not abort
			// the whole response.
			log.Printf("Error unmarshaling SSE data: %v", uerr)
			return false, nil
		}

		var chunk types.ChatCompletionStreamResponse
		switch {
		case evt.Finished:
			// The finishing chunk carries no content and no fingerprint.
			chunk = newChunk("", "", openai.FinishReasonStop)
		case evt.AgentStatus.Type == "thinking":
			thinking = true
			chunk = newChunk(thinkOpenMarker, fingerprint, openai.FinishReasonNull)
		case evt.AgentStatus.Type == "thinking_detail_stream":
			chunk = newChunk(evt.AgentStatus.Metadata.ReasoningDetail, fingerprint, openai.FinishReasonNull)
		default:
			content := evt.Text
			if thinking {
				content = thinkCloseMarker + content
				thinking = false
			}
			chunk = newChunk(content, fingerprint, openai.FinishReasonNull)
		}

		encoded, merr := sonic.MarshalString(chunk)
		if merr != nil {
			return false, fmt.Errorf("marshal error: %w", merr)
		}
		if werr := write(dataPrefix + encoded + "\n\n"); werr != nil {
			return false, fmt.Errorf("write error: %w", werr)
		}
		if !evt.Finished {
			return false, nil
		}

		// finished=true ends the stream: send the terminator event. The
		// deferred cleanup flushes it to the client.
		if werr := write(dataPrefix + sseFinish + "\n\n"); werr != nil {
			return false, fmt.Errorf("write error: %w", werr)
		}
		return true, nil
	}

	for {
		line, readErr := reader.ReadString('\n')
		if readErr != nil && !errors.Is(readErr, io.EOF) {
			return fmt.Errorf("read error: %w", readErr)
		}

		// ReadString may return a final, unterminated line together with
		// io.EOF, so the line is processed before the EOF check.
		finished, herr := handle(line)
		if herr != nil {
			return herr
		}
		if finished || readErr != nil {
			return nil
		}
	}
}