from openai import OpenAI
import os
from dotenv import load_dotenv
# intent_and_rag_system_prompt and system_prompt_for_checking_rag are referenced below
# and are assumed to be defined in prompts alongside the other prompt helpers.
from prompts import (
    give_system_prompt_for_rag,
    system_prompt,
    improved_system_prompt_for_rag,
    intent_and_rag_system_prompt,
    system_prompt_for_checking_rag,
)
from chat_database import get_chat_history
from embeddings import get_query_embeddings
from qdrent import search_embeddings
from pydantic import BaseModel
import asyncio

load_dotenv()

provider = os.getenv("LLM_PROVIDER")

# Build an OpenAI-compatible client for the configured provider.
if provider == "google":
    client = OpenAI(
        api_key=os.getenv("GEMINI_API_KEY"),
        base_url="https://generativelanguage.googleapis.com/v1beta/openai/"
    )
elif provider == "together":
    client = OpenAI(
        api_key=os.getenv("TOGETHER_API_KEY"),
        # Together AI's OpenAI-compatible endpoint.
        base_url="https://api.together.xyz/v1"
    )
else:
    # Fallback: plain OpenAI; reading OPENAI_API_KEY here is an assumption.
    client = OpenAI(
        api_key=os.getenv("OPENAI_API_KEY", "")
    )

llm_model = os.getenv("LLM_MODEL")

class IntentAndRagResponseFormat(BaseModel):
    """Structured output: user intent flag plus whether retrieval (RAG) is needed."""
    intent: bool
    rag: bool


class RagResponseFormat(BaseModel):
    """Structured output: whether retrieval (RAG) is needed for the query."""
    rag: bool

def getResponse(text, session_id):
    try:
        # get_chat_history returns (chat_history, user_prompt) for the session.
        chat_history, user_prompt = get_chat_history(session_id)
        chat_history.insert(
            0, {"role": "system", "content": system_prompt(user_prompt)})
        response = client.chat.completions.create(
            model=llm_model,
            messages=chat_history,
            stream=True
        )
        return response
    except Exception as e:
        print("Error in getResponse : ", e)


async def getResponseAsync(text, session_id):
    # Run the blocking completion call in a thread so the event loop is not blocked.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(None, getResponse, text, session_id)
    return response

def getResponseWithRAG(text, session_id):
    try:
        # get_chat_history returns (chat_history, user_prompt); only the history is needed here.
        chat_history = get_chat_history(session_id)[0]
        prompt = []
        query_embeddings = get_query_embeddings(text)
        content = search_embeddings(session_id, query_embeddings)

        paragraphs = ""
        for index, item in enumerate(content, start=1):
            # Use a separate name so the user query in `text` is not overwritten.
            chunk_text = item.payload["text"]
            # Skip very short chunks that carry little context.
            if len(chunk_text) > 100:
                paragraphs += f"Paragraph {index} : {chunk_text}\n\n"

        rag_system_prompt = improved_system_prompt_for_rag(paragraphs)
        prompt.append({"role": "system", "content": rag_system_prompt})
        prompt.append(
            {"role": "user", "content": f"Previous chat : {str(chat_history)}"})
        prompt.append({"role": "user", "content": f"Query : {text}"})
        response = client.chat.completions.create(
            model=llm_model,
            messages=prompt,
            stream=True
        )
        return response
    except Exception as e:
        print("Error in getResponseWithRAG : ", e)


async def getResponseWithRagAsync(text, session_id):
    # Run the blocking RAG call in a thread so the event loop is not blocked.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(None, getResponseWithRAG, text, session_id)
    return response

def check_for_user_intent_and_rag(text, session_id):
    try:
        # get_chat_history returns (chat_history, user_prompt); only the history is needed here.
        recent_history = get_chat_history(session_id)[0]
        response = client.beta.chat.completions.parse(
            model=llm_model,
            messages=[
                {"role": "system", "content": intent_and_rag_system_prompt},
                {"role": "user", "content": f"User Query : {text}"},
                {"role": "user", "content": f"Previous chat : {str(recent_history)}"},
            ],
            response_format=IntentAndRagResponseFormat,
        )
        return response.choices[0].message.parsed
    except Exception as e:
        print("Error in check_for_user_intent_and_rag : ", e)


def check_for_rag(text, session_id):
    try:
        # Keep only the last ten messages of the history for this check.
        chat_history = get_chat_history(session_id)[0]
        recent_history = chat_history[-10:]
        response = client.beta.chat.completions.parse(
            model=llm_model,
            messages=[
                {"role": "system", "content": system_prompt_for_checking_rag},
                {"role": "user", "content": f"User Query : {text}"},
                {"role": "user", "content": f"Previous chat : {str(recent_history)}"},
            ],
            response_format=RagResponseFormat,
        )
        return response.choices[0].message.parsed
    except Exception as e:
        print("Error in check_for_rag : ", e)
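

if __name__ == "__main__":
    # Minimal usage sketch (assumptions: the env vars above are set and the session id
    # below already exists in the chat database; both are illustrative only).
    async def _demo():
        session_id = "example-session"  # hypothetical id, for illustration
        query = "Summarise the uploaded document."
        decision = check_for_rag(query, session_id)
        if decision and decision.rag:
            stream = await getResponseWithRagAsync(query, session_id)
        else:
            stream = await getResponseAsync(query, session_id)
        # The completions are created with stream=True, so iterate the chunks.
        if stream:
            for chunk in stream:
                delta = chunk.choices[0].delta.content
                if delta:
                    print(delta, end="", flush=True)

    asyncio.run(_demo())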