from fastapi import APIRouter, Depends, Request from llama_index.llms import ChatMessage, MessageRole from pydantic import BaseModel from starlette.responses import StreamingResponse import logging from private_gpt.open_ai.extensions.context_filter import ContextFilter from private_gpt.open_ai.openai_models import ( OpenAICompletion, OpenAIMessage, to_openai_response, to_openai_sse_stream, ) from private_gpt.server.chat.chat_service import ChatService #from private_gpt.server.utils.auth import authenticated from private_gpt.server.utils.authentication import get_current_user chat_router = APIRouter(prefix="/v1", dependencies=[Depends(get_current_user)]) class ChatBody(BaseModel): messages: list[OpenAIMessage] use_context: bool = False context_filter: ContextFilter | None = None include_sources: bool = True stream: bool = False model_config = { "json_schema_extra": { "examples": [ { "messages": [ { "role": "system", "content": "You are a rapper. Always answer with a rap.", }, { "role": "user", "content": "How do you fry an egg?", }, ], "stream": False, "use_context": True, "include_sources": True, "context_filter": { "docs_ids": ["c202d5e6-7b69-4869-81cc-dd574ee8ee11"] }, } ] } } # @chat_router.post( # "/chat/completions", # response_model=None, # responses={200: {"model": OpenAICompletion}}, # tags=["Contextual Completions"], # ) # def chat_completion( # request: Request, body: ChatBody # ) -> OpenAICompletion | StreamingResponse: # """Given a list of messages comprising a conversation, return a response. # Optionally include an initial `role: system` message to influence the way # the LLM answers. # If `use_context` is set to `true`, the model will use context coming # from the ingested documents to create the response. The documents being used can # be filtered using the `context_filter` and passing the document IDs to be used. # Ingested documents IDs can be found using `/ingest/list` endpoint. If you want # all ingested documents to be used, remove `context_filter` altogether. # When using `'include_sources': true`, the API will return the source Chunks used # to create the response, which come from the context provided. # When using `'stream': true`, the API will return data chunks following [OpenAI's # streaming model](https://platform.openai.com/docs/api-reference/chat/streaming): # ``` # {"id":"12345","object":"completion.chunk","created":1694268190, # "model":"private-gpt","choices":[{"index":0,"delta":{"content":"Hello"}, # "finish_reason":null}]} # ``` # """ # service = request.state.injector.get(ChatService) # all_messages = [ # ChatMessage(content=m.content, role=MessageRole(m.role)) for m in body.messages # ] # if body.stream: # completion_gen = service.stream_chat( # messages=all_messages, # use_context=body.use_context, # context_filter=body.context_filter, # ) # return StreamingResponse( # to_openai_sse_stream( # completion_gen.response, # completion_gen.sources if body.include_sources else None, # ), # media_type="text/event-stream", # ) # else: # completion = service.chat( # messages=all_messages, # use_context=body.use_context, # context_filter=body.context_filter, # ) # return to_openai_response( # completion.response, completion.sources if body.include_sources else None # ) logger = logging.getLogger(__name__) @chat_router.post( "/chat/completions", response_model=None, responses={200: {"model": OpenAICompletion}}, tags=["Contextual Completions"], ) def chat_completion( request: Request, body: ChatBody ) -> OpenAICompletion | StreamingResponse: """Given a list of messages comprising a conversation, return a response.""" try: logger.info("Received chat completion request with body: %s", body.json()) service = request.state.injector.get(ChatService) all_messages = [ ChatMessage(content=m.content, role=MessageRole(m.role)) for m in body.messages ] logger.info("Constructed all_messages: %s", all_messages) if body.stream: completion_gen = service.stream_chat( messages=all_messages, use_context=body.use_context, context_filter=body.context_filter, ) logger.info("Streaming response initialized") return StreamingResponse( to_openai_sse_stream( completion_gen.response, completion_gen.sources if body.include_sources else None, ), media_type="text/event-stream", ) else: completion = service.chat( messages=all_messages, use_context=body.use_context, context_filter=body.context_filter, ) logger.info("Completed chat request: %s", completion.response) return to_openai_response( completion.response, completion.sources if body.include_sources else None ) except Exception as e: logger.error("Error processing chat completion: %s", str(e), exc_info=True) return {"error": {"message": "Internal server error"}}