# Ibraaheem's picture
# Update private_gpt/server/chat/chat_router.py
# e489a89 verified
import logging

from fastapi import APIRouter, Depends, Request
from llama_index.llms import ChatMessage, MessageRole
from pydantic import BaseModel
from starlette.responses import JSONResponse, StreamingResponse

from private_gpt.open_ai.extensions.context_filter import ContextFilter
from private_gpt.open_ai.openai_models import (
    OpenAICompletion,
    OpenAIMessage,
    to_openai_response,
    to_openai_sse_stream,
)
from private_gpt.server.chat.chat_service import ChatService
#from private_gpt.server.utils.auth import authenticated
from private_gpt.server.utils.authentication import get_current_user
# Router for the chat endpoints: every route is mounted under "/v1" and
# declares the get_current_user dependency.
chat_router = APIRouter(
    prefix="/v1",
    dependencies=[Depends(get_current_user)],
)
# Example request published through ChatBody's JSON schema ("examples").
_CHAT_BODY_EXAMPLE = {
    "messages": [
        {
            "role": "system",
            "content": "You are a rapper. Always answer with a rap.",
        },
        {
            "role": "user",
            "content": "How do you fry an egg?",
        },
    ],
    "stream": False,
    "use_context": True,
    "include_sources": True,
    "context_filter": {
        "docs_ids": ["c202d5e6-7b69-4869-81cc-dd574ee8ee11"]
    },
}


# Request body of the chat completions endpoint.
# NOTE: documented with comments instead of a class docstring on purpose --
# pydantic copies a model's docstring into the generated JSON schema
# description, which would alter the published API schema.
class ChatBody(BaseModel):
    # Messages that make up the conversation.
    messages: list[OpenAIMessage]
    # Whether the answer should draw on context from ingested documents.
    use_context: bool = False
    # Optional restriction of the context to specific documents.
    context_filter: ContextFilter | None = None
    # Whether the sources used for the answer are returned with it.
    include_sources: bool = True
    # Whether the answer is streamed instead of returned in one piece.
    stream: bool = False

    model_config = {"json_schema_extra": {"examples": [_CHAT_BODY_EXAMPLE]}}
# @chat_router.post(
# "/chat/completions",
# response_model=None,
# responses={200: {"model": OpenAICompletion}},
# tags=["Contextual Completions"],
# )
# def chat_completion(
# request: Request, body: ChatBody
# ) -> OpenAICompletion | StreamingResponse:
# """Given a list of messages comprising a conversation, return a response.
# Optionally include an initial `role: system` message to influence the way
# the LLM answers.
# If `use_context` is set to `true`, the model will use context coming
# from the ingested documents to create the response. The documents being used can
# be filtered using the `context_filter` and passing the document IDs to be used.
# Ingested documents IDs can be found using `/ingest/list` endpoint. If you want
# all ingested documents to be used, remove `context_filter` altogether.
# When using `'include_sources': true`, the API will return the source Chunks used
# to create the response, which come from the context provided.
# When using `'stream': true`, the API will return data chunks following [OpenAI's
# streaming model](https://platform.openai.com/docs/api-reference/chat/streaming):
# ```
# {"id":"12345","object":"completion.chunk","created":1694268190,
# "model":"private-gpt","choices":[{"index":0,"delta":{"content":"Hello"},
# "finish_reason":null}]}
# ```
# """
# service = request.state.injector.get(ChatService)
# all_messages = [
# ChatMessage(content=m.content, role=MessageRole(m.role)) for m in body.messages
# ]
# if body.stream:
# completion_gen = service.stream_chat(
# messages=all_messages,
# use_context=body.use_context,
# context_filter=body.context_filter,
# )
# return StreamingResponse(
# to_openai_sse_stream(
# completion_gen.response,
# completion_gen.sources if body.include_sources else None,
# ),
# media_type="text/event-stream",
# )
# else:
# completion = service.chat(
# messages=all_messages,
# use_context=body.use_context,
# context_filter=body.context_filter,
# )
# return to_openai_response(
# completion.response, completion.sources if body.include_sources else None
# )
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@chat_router.post(
    "/chat/completions",
    response_model=None,
    responses={200: {"model": OpenAICompletion}},
    tags=["Contextual Completions"],
)
def chat_completion(
    request: Request, body: ChatBody
) -> OpenAICompletion | StreamingResponse | JSONResponse:
    """Given a list of messages comprising a conversation, return a response.

    Optionally include an initial `role: system` message to influence the way
    the LLM answers.

    If `use_context` is set to `true`, the model will use context coming
    from the ingested documents to create the response. The documents being
    used can be filtered using the `context_filter` and passing the document
    IDs to be used. Ingested documents IDs can be found using `/ingest/list`
    endpoint. If you want all ingested documents to be used, remove
    `context_filter` altogether.

    When using `'include_sources': true`, the API will return the source Chunks
    used to create the response, which come from the context provided.

    When using `'stream': true`, the API will return data chunks following
    [OpenAI's streaming model](https://platform.openai.com/docs/api-reference/chat/streaming):
    ```
    {"id":"12345","object":"completion.chunk","created":1694268190,
    "model":"private-gpt","choices":[{"index":0,"delta":{"content":"Hello"},
    "finish_reason":null}]}
    ```

    If handling the request fails, the error is logged with its traceback and
    an HTTP 500 response with the body
    `{"error": {"message": "Internal server error"}}` is returned.
    """
    try:
        # NOTE(review): this logs the full prompt content at INFO level;
        # consider demoting to DEBUG if prompts may be sensitive.
        # model_dump_json() is the Pydantic v2 API; .json() is deprecated there.
        logger.info(
            "Received chat completion request with body: %s",
            body.model_dump_json(),
        )
        service = request.state.injector.get(ChatService)
        all_messages = [
            ChatMessage(content=m.content, role=MessageRole(m.role))
            for m in body.messages
        ]
        logger.info("Constructed all_messages: %s", all_messages)

        if body.stream:
            completion_gen = service.stream_chat(
                messages=all_messages,
                use_context=body.use_context,
                context_filter=body.context_filter,
            )
            logger.info("Streaming response initialized")
            # NOTE(review): the stream is presumably consumed after this
            # function returns, so failures while streaming would not reach
            # the except clause below -- confirm against to_openai_sse_stream.
            return StreamingResponse(
                to_openai_sse_stream(
                    completion_gen.response,
                    completion_gen.sources if body.include_sources else None,
                ),
                media_type="text/event-stream",
            )

        completion = service.chat(
            messages=all_messages,
            use_context=body.use_context,
            context_filter=body.context_filter,
        )
        logger.info("Completed chat request: %s", completion.response)
        return to_openai_response(
            completion.response,
            completion.sources if body.include_sources else None,
        )
    except Exception as e:
        # Top-level boundary: record the traceback, hide details from the
        # client, and signal the failure with a real 500 status. Returning a
        # bare dict here would be sent as HTTP 200.
        logger.exception("Error processing chat completion: %s", e)
        return JSONResponse(
            status_code=500,
            content={"error": {"message": "Internal server error"}},
        )