from openai import OpenAI
import os
from dotenv import load_dotenv
from prompts import (
    give_system_prompt_for_rag,
    system_prompt,
    improved_system_prompt_for_rag,
    # the two classifier prompts below are assumed to live in prompts alongside the others
    intent_and_rag_system_prompt,
    system_prompt_for_checking_rag,
)
from chat_database import get_chat_history
from embeddings import get_query_embeddings
from qdrent import search_embeddings
from pydantic import BaseModel
import asyncio

load_dotenv()

# Pick the LLM provider from the environment and build an OpenAI-compatible client.
provider = os.getenv("LLM_PROVIDER")
if provider == "google":
    client = OpenAI(
        api_key=os.getenv("GEMINI_API_KEY"),
        base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
    )
elif provider == "together":
    client = OpenAI(
        api_key=os.getenv("TOGETHER_API_KEY"),
        base_url="",
    )
else:
    client = OpenAI(api_key="")

llm_model = os.getenv("LLM_MODEL")


class IntentAndRagResponseFormat(BaseModel):
    intent: bool
    rag: bool


class RagResponseFormat(BaseModel):
    rag: bool


def getResponse(text, session_id):
    """Stream a plain chat completion built from the session's stored chat history."""
    try:
        # The latest user turn is assumed to already be stored in the session history,
        # so `text` is not added again here.
        chat_history, user_prompt = get_chat_history(session_id)
        chat_history.insert(
            0, {"role": "system", "content": system_prompt(user_prompt)})
        response = client.chat.completions.create(
            model=llm_model,
            messages=chat_history,
            stream=True,
        )
        return response
    except Exception as e:
        print("Error in getResponse : ", e)


async def getResponseAsync(text, session_id):
    # Run the blocking OpenAI call in a worker thread so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(None, getResponse, text, session_id)
    return response


def getResponseWithRAG(text, session_id):
    """Stream a chat completion grounded in paragraphs retrieved for this session."""
    try:
        chat_history = get_chat_history(session_id)
        prompt = []
        # Embed the query and retrieve matching chunks from the vector store.
        query_embeddings = get_query_embeddings(text)
        content = search_embeddings(session_id, query_embeddings)
        paragraphs = ""
        for index, item in enumerate(content, start=1):
            paragraph = item.payload["text"]
            # Skip very short chunks that carry little context.
            if len(paragraph) > 100:
                paragraphs += f"Paragraph {index} : {paragraph}\n\n"
        rag_system_prompt = improved_system_prompt_for_rag(paragraphs)
        prompt.append({"role": "system", "content": rag_system_prompt})
        prompt.append(
            {"role": "user", "content": f"Previous chat : {str(chat_history)}"})
        prompt.append({"role": "user", "content": f"Query : {text}"})
        response = client.chat.completions.create(
            model=llm_model,
            messages=prompt,
            stream=True,
        )
        return response
    except Exception as e:
        print("Error in getResponseWithRAG : ", e)


async def getResponseWithRagAsync(text, session_id):
    # Run the blocking RAG call in a worker thread so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(None, getResponseWithRAG, text, session_id)
    return response


def check_for_user_intent_and_rag(text, session_id):
    """Return structured flags (intent, rag) for the query via a parsed completion."""
    try:
        recent_history = get_chat_history(session_id)
        response = client.beta.chat.completions.parse(
            model=llm_model,
            messages=[
                {"role": "system", "content": intent_and_rag_system_prompt},
                {"role": "user", "content": f"User Query : {text}"},
                {"role": "user", "content": f"Previous chat : {str(recent_history)}"},
            ],
            response_format=IntentAndRagResponseFormat,
        )
        return response.choices[0].message.parsed
    except Exception as e:
        print("Error in check_for_user_intent_and_rag : ", e)


def check_for_rag(text, session_id):
    """Return a structured flag (rag) telling whether the query needs retrieval."""
    try:
        chat_history = get_chat_history(session_id)
        # Only the last ten turns are passed to keep the classifier prompt short.
        recent_history = chat_history[-10:]
        response = client.beta.chat.completions.parse(
            model=llm_model,
            messages=[
                {"role": "system", "content": system_prompt_for_checking_rag},
                {"role": "user", "content": f"User Query : {text}"},
                {"role": "user", "content": f"Previous chat : {str(recent_history)}"},
            ],
            response_format=RagResponseFormat,
        )
        return response.choices[0].message.parsed
    except Exception as e:
        print("Error in check_for_rag : ", e)
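
# --- Minimal usage sketch (illustrative only, not part of the service code) ---
# Assumes a session whose chat history and embeddings already exist; the session id
# "demo-session" and the query below are placeholder values. The sketch routes the
# query through check_for_rag and then consumes the streamed completion chunk by
# chunk via choices[0].delta.content.
async def _demo():
    session_id = "demo-session"
    query = "What does the uploaded document say about pricing?"

    # Structured-output classifier decides whether retrieval is needed.
    decision = check_for_rag(query, session_id)
    needs_rag = bool(decision and decision.rag)

    if needs_rag:
        stream = await getResponseWithRagAsync(query, session_id)
    else:
        stream = await getResponseAsync(query, session_id)

    if stream is None:
        # The response helpers swallow exceptions and return None on failure.
        return

    # Print the streamed tokens as they arrive.
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            print(delta, end="", flush=True)
    print()


if __name__ == "__main__":
    asyncio.run(_demo())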