# Project/core/api/chatai.py
# (Header scraped from the Hugging Face file viewer: author puzan789,
#  commit ad87194 "updated", 24.2 kB.)
import json
import os
import string
import tempfile
from urllib.parse import urlparse
import requests
from fastapi import UploadFile, File, Form, HTTPException
from fastapi.requests import Request
from fastapi.routing import APIRouter
from supabase import create_client
from core import logging as logger
from core.api.user_management_api import user_management
from core.api.user_management_api import user_management as user_management_pipeline
from core.models.apis_models import *
from core.pipeline.chataipipeline import ChatAIPipeline
from core.services.supabase.user_management.token_limit import token_limit_check
from core.utils.error_handling import create_error_response, create_success_response, raise_http_exception
from core.utils.utils import get_ip_info, encode_to_base64, clean_text, decode_base64
from core.services.supabase.limit.limit_check import LimitChecker
from PyPDF2 import PdfReader
from dotenv import load_dotenv
load_dotenv()
import io
# Router that collects every ChatAI endpoint defined in this module.
chatai_api_router = APIRouter(tags=["ChatAI"])
# Shared Supabase client, configured from the SUPABASE_URL / SUPABASE_KEY
# environment variables (load_dotenv() above allows them to come from a .env file).
supabase_client = create_client(os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_KEY"))
# Alias of the same client object. It is not referenced in this module;
# presumably it is imported elsewhere (TODO confirm before removing).
supabase_client_ = supabase_client
# Single pipeline instance shared by all endpoints below.
ChatAI_pipeline = ChatAIPipeline()
# Per-request limits, evaluated once at import time. Each is an exclusive upper bound:
#   url_limit -> number of URLs accepted by /website_url_text_extraction
#   pdf_limit -> page count accepted by /text_pdf_extraction
#   ocr_limit -> page count accepted by /image_pdf_text_extraction
# NOTE(review): LimitChecker(...) is unpacked into exactly three values; confirm
# its return type, and whether limit changes in the DB require a restart.
url_limit,pdf_limit,ocr_limit=LimitChecker(supabase_client)
@chatai_api_router.post("/add_text")
async def add_text(request: AddTextRequest):
    """Store free-form text as a new data source of a chatbot.

    ``request.vectorstore`` is split on ``"$"``: field 1 is the username and
    field 2 is the chatbot name. The text is passed through ``clean_text``.
    Its length is measured in characters after collapsing runs of whitespace,
    and that length is checked with ``token_limit_check``. When allowed, the
    text is uploaded to the ``ChatAI`` storage bucket as
    ``<file_name>_data.json`` and a row is added to ``ChatAI_ChatbotDataSources``.

    Returns:
        A success response (200), or an error response (400) when the limit
        check refuses the text.

    On any unexpected error, the exception is logged and
    ``raise_http_exception(500, "Internal Server Error")`` is called.
    """
    logger.info(f">>>AddText API Triggered By {request.vectorstore}<<<")
    try:
        parts = request.vectorstore.split("$")
        username, chat_bot_name = parts[1], parts[2]
        text = clean_text(request.text)
        # Measure the text that is actually stored. The same number is used for
        # the limit check and the recorded numTokens, so the two cannot disagree.
        num_token = len(" ".join(text.split()))
        lim = token_limit_check(supabase_client=supabase_client, username=username, chatbot_name=chat_bot_name,
                                len_text=num_token)
        if not lim:
            return create_error_response(
                400,
                "Exceeding limits, please try with a smaller chunks of information or subscribe to our premium plan.")
        logger.info(f"Number of token {num_token}")
        payload = json.dumps({"output": {"text": text}, "source": "Text"}, indent=1).encode("utf-8", errors="replace")
        file_name = user_management_pipeline.create_data_source_name(source_name="text", username=username)
        supabase_client.storage.from_("ChatAI").upload(file=payload, path=f"{file_name}_data.json")
        supabase_client.table("ChatAI_ChatbotDataSources").insert(
            {"username": username, "chatbotName": chat_bot_name, "dataSourceName": file_name,
             "numTokens": num_token, "sourceEndpoint": "/add_text",
             "sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"],
                                              f"{file_name}_data.json")}).execute()
        response = create_success_response(200, {"message": "Successfully added the text."})
        logger.info(f">>>Text added successfully for {request.vectorstore}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in add_text: {e} for {request.vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
@chatai_api_router.post("/answer_query")
async def answer_query(request: AnswerQueryRequest, req: Request):
    """Answer ``request.query`` against a chatbot's vector store and log the exchange.

    The answer, follow-up questions and source come from
    ``ChatAI_pipeline.answer_query_``. The exchange is recorded in
    ``ChatAI_ChatHistory`` together with the caller's IP address and the city
    returned by ``get_ip_info``. ``ResponseTokenCount`` is the character
    length of the answer.

    Any exception is logged and re-raised unchanged.
    """
    logger.info(f">>>answer_query API Triggered By {request.vectorstore}<<<")
    try:
        parts = request.vectorstore.split("$")
        username, chatbot_name = parts[1], parts[2]
        # Starlette's Request.client is None when the server reports no peer
        # address; don't let that become an AttributeError.
        ip_address = req.client.host if req.client is not None else None
        city = get_ip_info(ip_address) if ip_address is not None else None
        output, followup_questions, source = ChatAI_pipeline.answer_query_(query=request.query,
                                                                            vectorstore=request.vectorstore,
                                                                            llm_model=request.llm_model)
        supabase_client.table("ChatAI_ChatHistory").insert(
            {"username": username, "chatbotName": chatbot_name, "llmModel": request.llm_model,
             "question": request.query, "response": output, "IpAddress": ip_address,
             "ResponseTokenCount": len(output),
             "vectorstore": request.vectorstore, "City": city}).execute()
        response = create_success_response(200, data={"output": output, "follow_up_questions": followup_questions,
                                                      "source": source})
        logger.info(f">>>Query answered successfully for {request.vectorstore}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in answer_query: {e} for {request.vectorstore}.<<<")
        raise
@chatai_api_router.post("/get_links")
async def get_links(request: GetLinksRequest):
    """Return the links that ``ChatAI_pipeline.get_links_`` finds for ``request.url``.

    The payload holds those links under ``"urls"`` and the network location
    (host part) of the requested URL under ``"source"``. Failures are logged
    and reported through ``raise_http_exception(500, ...)``.
    """
    target = request.url
    logger.info(f">>>get_links API Triggered By {target}<<<")
    try:
        links = ChatAI_pipeline.get_links_(url=target, timeout=30)
        payload = {"urls": links, "source": urlparse(target).netloc}
        reply = create_success_response(200, payload)
        logger.info(f">>>Links fetched successfully for {target}.<<<")
        return reply
    except Exception as err:
        logger.error(f">>>Error in get_links: {err} for {target}.<<<")
        raise_http_exception(500, "Internal Server Error")
@chatai_api_router.post("/image_pdf_text_extraction")
async def image_pdf_text_extraction(vectorstore: str = Form(...), pdf: UploadFile = File(...)):
    """Extract text from an uploaded image-based PDF and store it as a data source.

    ``vectorstore`` is split on ``"$"``: field 1 is the username and field 2
    is the chatbot name. A PDF whose page count is not below ``ocr_limit`` is
    rejected with 402. Otherwise the bytes are passed to
    ``ChatAI_pipeline.image_pdf_text_extraction_``. The combined length of the
    returned chunks is checked with ``token_limit_check`` (402 when refused).
    On success the result is uploaded to the ``ChatAI`` bucket and registered
    in ``ChatAI_ChatbotDataSources``.

    Exceptions are logged and re-raised unchanged.
    """
    logger.info(f">>>image_pdf_text_extraction API Triggered By {pdf.filename}<<<")
    try:
        parts = vectorstore.split("$")
        username, chatbot_name = parts[1], parts[2]
        pdf_bytes = await pdf.read()
        source = pdf.filename
        doc_len = len(PdfReader(io.BytesIO(pdf_bytes)).pages)
        if doc_len >= ocr_limit:
            return create_error_response(402,
                                         "Exceeding limits, please try with a PDF having less than 20 pages for pdf .")
        extracted = ChatAI_pipeline.image_pdf_text_extraction_(image_pdf=pdf_bytes)
        # Chunks may be None, so they are skipped. Otherwise one None chunk would
        # make the join raise, reset the count to 0, and let the document through
        # the limit check. The same count is stored as numTokens.
        try:
            num_tokens = len(" ".join(chunk for chunk in extracted.values() if chunk is not None))
        except (AttributeError, TypeError):
            # Unexpected result shape: keep the best-effort fallback of 0.
            num_tokens = 0
        lim = token_limit_check(supabase_client=supabase_client, username=username, chatbot_name=chatbot_name,
                                len_text=num_tokens)
        logger.info(f"this is the {lim}")
        if not lim:
            return create_error_response(402,
                                         "Exceeding limits, please try with a smaller chunks of PDF or subscribe to our premium plan.")
        payload = json.dumps({"output": extracted, "source": source}, indent=1).encode("utf-8", errors="replace")
        file_name = user_management_pipeline.create_data_source_name(source_name=source, username=username)
        supabase_client.storage.from_("ChatAI").upload(file=payload, path=f"{file_name}_data.json")
        supabase_client.table("ChatAI_ChatbotDataSources").insert(
            {"username": username,
             "chatbotName": chatbot_name,
             "dataSourceName": file_name,
             "numTokens": num_tokens,
             "sourceEndpoint": "/image_pdf_text_extraction",
             "sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"],
                                              f"{file_name}_data.json")}).execute()
        response = create_success_response(200,
                                           {"source": pdf.filename, "message": "Successfully extracted the text."})
        logger.info(f">>>Text extracted successfully for {pdf.filename}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in image_pdf_text_extraction: {e} for {vectorstore}.<<<")
        raise
@chatai_api_router.post("/text_pdf_extraction")
async def text_pdf_extraction(vectorstore: str = Form(...), pdf: UploadFile = File(...)):
    """Extract text from an uploaded PDF and store it as a chatbot data source.

    ``vectorstore`` is split on ``"$"``: field 1 is the username and field 2
    is the chatbot name. A PDF whose page count is not below ``pdf_limit`` is
    rejected with 402. Otherwise it is written to a temporary file, because
    ``ChatAI_pipeline.text_pdf_extraction_`` takes a path. The combined length
    of the returned chunks is checked with ``token_limit_check`` (402 when
    refused). On success the result is uploaded to the ``ChatAI`` bucket and
    registered in ``ChatAI_ChatbotDataSources``.

    Unexpected errors are logged and reported through
    ``raise_http_exception(500, ...)``.
    """
    logger.info(f">>>text_pdf_extraction API Triggered By {pdf.filename}<<<")
    try:
        parts = vectorstore.split("$")
        username, chatbot_name = parts[1], parts[2]
        content = await pdf.read()
        doc_len = len(PdfReader(io.BytesIO(content)).pages)
        if doc_len >= pdf_limit:
            return create_error_response(402,
                                         "Exceeding limits, please try with a pdf having pages less than 200.")
        source = pdf.filename
        # delete=False so the file survives closing and the pipeline can open it
        # by path. The finally clause removes it even if writing or extraction fails.
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf")
        try:
            with temp_file:
                temp_file.write(content)
            extracted = ChatAI_pipeline.text_pdf_extraction_(pdf=temp_file.name)
        finally:
            os.remove(temp_file.name)
        num_tokens = len(" ".join(extracted.values()))
        lim = token_limit_check(supabase_client=supabase_client, username=username, chatbot_name=chatbot_name,
                                len_text=num_tokens)
        if not lim:
            return create_error_response(402,
                                         "Exceeding limits, please try with a smaller chunks of PDF or subscribe to our premium plan.")
        logger.info(f"Num of tokens {num_tokens} text_pdf_extraction")
        payload = json.dumps({"output": extracted, "source": source}, indent=1).encode("utf-8", errors="replace")
        file_name = user_management_pipeline.create_data_source_name(source_name=source, username=username)
        supabase_client.storage.from_("ChatAI").upload(file=payload, path=f"{file_name}_data.json")
        (
            supabase_client.table("ChatAI_ChatbotDataSources")
            .insert({"username": username,
                     "chatbotName": chatbot_name,
                     "dataSourceName": file_name,
                     "numTokens": num_tokens,
                     "sourceEndpoint": "/text_pdf_extraction",
                     "sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"],
                                                      f"{file_name}_data.json")})
            .execute()
        )
        response = create_success_response(200, {"source": source, "message": "Successfully extracted the text."})
        logger.info(f">>>Text extracted successfully for {source}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in text_pdf_extraction: {e} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
@chatai_api_router.post("/website_url_text_extraction")
async def add_website(request: AddWebsiteRequest):
    """Scrape ``request.website_urls`` and store the text as one data source.

    Requests carrying ``url_limit`` or more URLs are rejected with 402. The
    mapping returned by ``ChatAI_pipeline.website_url_text_extraction_list_``
    is length-checked with ``token_limit_check`` (402 when refused), uploaded
    to the ``ChatAI`` bucket, and registered in ``ChatAI_ChatbotDataSources``.
    Its ``sourceEndpoint`` is ``"/fetch_text/urls"``, which is the value
    ``train_chatbot`` dispatches on, not this route's path.

    Unexpected errors are logged and reported as an ``HTTPException`` (500).
    """
    vectorstore = request.vectorstore
    website_urls = request.website_urls
    source = request.source
    logger.info(f">>>website_url_text_extraction API Triggered By {request.website_urls}<<<")
    try:
        fields = vectorstore.split("$")
        username, chatbot_name = fields[1], fields[2]
        within_url_limit = len(website_urls) < url_limit
        if not within_url_limit:
            return create_error_response(402,
                                         "Please select urls less than 50")
        scraped = ChatAI_pipeline.website_url_text_extraction_list_(urls=website_urls)
        num_token = len(" ".join(scraped[key] for key in scraped))
        logger.info(f">>>website_url_text_extraction len{num_token}<<<")
        allowed = token_limit_check(supabase_client=supabase_client, username=username, chatbot_name=chatbot_name,
                                    len_text=num_token)
        if not allowed:
            return create_error_response(
                402,
                "Exceeding limits, please try with a smaller chunks of information or subscribe to our premium plan.")
        encoded = json.dumps({"output": scraped, "source": source}, indent=1).encode("utf-8", errors="replace")
        file_name = user_management_pipeline.create_data_source_name(source_name=urlparse(source).netloc,
                                                                     username=username)
        object_path = f"{file_name}_data.json"
        supabase_client.storage.from_("ChatAI").upload(file=encoded, path=object_path)
        row = {
            "username": username,
            "chatbotName": chatbot_name,
            "dataSourceName": file_name,
            "numTokens": num_token,
            "sourceEndpoint": "/fetch_text/urls",
            "sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"], object_path),
        }
        supabase_client.table("ChatAI_ChatbotDataSources").insert(row).execute()
        reply = create_success_response(200, {"message": "Successfully fetched the website text."})
        logger.info(f">>>Website text extracted successfully for {request.website_urls}.<<<")
        return reply
    except Exception as exc:
        logger.error(f">>>Error in website_url_text_extraction: {exc} for {request.website_urls}.<<<")
        raise HTTPException(status_code=500, detail="Internal Server Error")
@chatai_api_router.get("/get_current_count")
async def get_count(vectorstore: str):
    """Return ``user_management_pipeline.get_current_count_`` for a chatbot's owner.

    The username is field 1 of ``vectorstore`` split on ``"$"``. Failures are
    logged and reported through ``raise_http_exception(500, ...)``.
    """
    logger.info(f">>>get_current_count API Triggered By {vectorstore}<<<")
    try:
        fields = vectorstore.split("$")
        # Field 2 (chatbot name) is unused here. Indexing it still requires at
        # least three "$"-separated fields, as the other endpoints do.
        username, _chatbot_name = fields[1], fields[2]
        count = user_management_pipeline.get_current_count_(username)
        reply = create_success_response(200, {"current_count": count})
        logger.info(f">>>Current count fetched successfully for {vectorstore}.<<<")
        return reply
    except Exception as exc:
        logger.error(f">>>Error in get_current_count: {exc} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
@chatai_api_router.post("/list_chatbots")
async def list_chatbots(request: ListChatbotsRequest):
    """Return the chatbots reported by ``user_management.list_tables`` for a user.

    Failures are logged and reported through ``raise_http_exception(500, ...)``.
    """
    owner = request.username
    logger.info(f">>>list_chatbots API Triggered By {owner}<<<")
    try:
        bots = user_management.list_tables(username=owner)
        reply = create_success_response(200, {"chatbots": bots})
        logger.info(f">>>Chatbots listed successfully for {owner}.<<<")
        return reply
    except Exception as exc:
        logger.error(f">>>Error in list_chatbots: {exc} for {owner}.<<<")
        raise_http_exception(500, "Internal Server Error")
@chatai_api_router.post("/get_chat_history")
async def chat_history(request: GetChatHistoryRequest):
    """Return the stored question/response history of a chatbot.

    ``request.vectorstore`` is split on ``"$"`` exactly as ``answer_query``
    does when it writes the history rows: field 1 is the username and field 2
    is the chatbot name. A malformed identifier yields a 404 error response.
    Other failures are logged and reported through
    ``raise_http_exception(500, ...)``.
    """
    logger.info(f">>>get_chat_history API Triggered By {request.vectorstore}<<<")
    try:
        parts = request.vectorstore.split("$")
        # Indexing, not tuple-unpacking, so a malformed identifier raises
        # IndexError, which is handled below.
        username, chatbot_name = parts[1], parts[2]
        history = (
            supabase_client.table("ChatAI_ChatHistory")
            .select("timestamp", "question", "response")
            .eq("username", username)
            .eq("chatbotName", chatbot_name)
            .execute()
            .data
        )
        response = create_success_response(200, {"history": history})
        logger.info(f">>>Chat history fetched successfully for {request.vectorstore}.<<<")
        return response
    except IndexError:
        logger.warning(f"Chat history not found for {request.vectorstore}")
        return create_error_response(404, "Chat history not found for the given chatbot.")
    except Exception as e:
        logger.error(f">>>Error in get_chat_history: {e} for {request.vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
@chatai_api_router.post("/delete_chatbot")
async def delete_chatbot(request: DeleteChatbotRequest):
    """Delete a chatbot together with its data sources and vector store.

    Steps, in order:
    1. Remove its ``ChatAI_ChatbotInfo`` row.
    2. Collect its ``ChatAI_ChatbotDataSources`` rows and delete them.
    3. Delete the corresponding JSON files from the ``ChatAI`` bucket.
    4. Call ``user_management.delete_table`` and ``delete_qdrant_cluster``.

    Failures are logged and reported through ``raise_http_exception(500, ...)``.
    """
    logger.info(f">>>delete_chatbot API Triggered By {request.vectorstore}<<<")
    try:
        parts = request.vectorstore.split("$")
        username, chatbot_name = parts[1], parts[2]
        supabase_client.table("ChatAI_ChatbotInfo").delete().eq("user_id", username).eq(
            "chatbotname", chatbot_name).execute()
        sources = supabase_client.table("ChatAI_ChatbotDataSources").select("*").eq("username", username).eq(
            "chatbotName", chatbot_name).execute().data
        # The storage object name is the last path segment of each public URL.
        object_names = [row["sourceContentURL"].split("/")[-1] for row in sources]
        supabase_client.table("ChatAI_ChatbotDataSources").delete().eq("username", username).eq(
            "chatbotName", chatbot_name).execute()
        if object_names:
            # Storage remove() takes a list of paths: one batched request for all files.
            supabase_client.storage.from_("ChatAI").remove(object_names)
        user_management.delete_table(table_name=chatbot_name)
        user_management.delete_qdrant_cluster(vectorstorename=request.vectorstore)
        response = create_success_response(200, {"message": "Chatbot deleted successfully"})
        logger.info(f">>>Chatbot deleted successfully for {request.vectorstore}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in delete_chatbot: {e} for {request.vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
@chatai_api_router.get("/list_chatbot_sources")
async def list_chatbot_sources(vectorstore: str):
    """Return every ``ChatAI_ChatbotDataSources`` row of the chatbot in ``vectorstore``.

    Field 1 of ``vectorstore`` split on ``"$"`` is the username and field 2 is
    the chatbot name. Failures are logged and reported through
    ``raise_http_exception(500, ...)``.
    """
    try:
        logger.info(f">>>list_chatbot_sources API Triggered By {vectorstore}<<<")
        pieces = vectorstore.split("$")
        username, chatbot_name = pieces[1], pieces[2]
        query = (
            supabase_client.table("ChatAI_ChatbotDataSources")
            .select("*")
            .eq("username", username)
            .eq("chatbotName", chatbot_name)
        )
        rows = query.execute().data
        reply = create_success_response(200, {"output": rows})
        logger.info(f">>>Chatbot listed successfully for {vectorstore}.<<<")
        return reply
    except Exception as exc:
        logger.error(f">>>Error in list_chatbot_sources: {exc} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
@chatai_api_router.get("/get_data_source")
async def get_data_source(vectorstore: str, source_url: str):
    """Download a stored data-source document and return it base64-encoded.

    ``source_url`` is expected to point at one of the ``<file_name>_data.json``
    documents this module uploads (written with ``json.dumps``). The parsed
    document is passed to ``encode_to_base64``. ``vectorstore`` is used only
    for logging. Failures, including network timeouts and invalid JSON, are
    logged and reported through ``raise_http_exception(500, ...)``.
    """
    try:
        logger.info(f">>>get_data_source API Triggered By {vectorstore}<<<")
        # NOTE(security): source_url is caller-supplied and fetched server-side
        # (SSRF risk). Consider restricting it to SUPABASE_PUBLIC_BASE_URL.
        r = requests.get(source_url, timeout=30)
        # Parse as JSON; never eval() this content, since it comes from a
        # caller-chosen URL. json.loads also accepts JSON null/true/false.
        document = json.loads(r.content.decode("utf-8", errors="replace"))
        res = encode_to_base64(document)
        response = create_success_response(200, {"output": res})
        return response
    except Exception as e:
        logger.error(f">>>Error in get_data_source: {e} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
@chatai_api_router.post("/delete_chatbot_source")
async def delete_chatbot_source(request: DeleteChatbotSourceRequest):
    """Delete one data source: its ``ChatAI_ChatbotDataSources`` row and its JSON file.

    NOTE(review): the row is selected by ``dataSourceName`` only. It is not
    scoped to the username/chatbot encoded in ``request.vectorstore``;
    ``vectorstore`` is used only for logging. Confirm whether an ownership
    check is needed.

    Failures are logged and reported through ``raise_http_exception(500, ...)``.
    """
    vectorstore, data_source_name = request.vectorstore, request.data_source_name
    try:
        supabase_client.table("ChatAI_ChatbotDataSources").delete().eq("dataSourceName",
                                                                       data_source_name).execute()
        # Storage remove() takes a list of object paths, not a bare string.
        supabase_client.storage.from_("ChatAI").remove([f"{data_source_name}_data.json"])
        response = create_success_response(200, {"output": f"Successfully deleted the {data_source_name} data source."})
        logger.info(f">>>Data source deleted successfully for {vectorstore}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in delete_chatbot_source: {e} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
def _load_source_document(url):
    """Download a stored data-source document from ``url`` and parse it as JSON.

    The documents are produced with ``json.dumps``. They are parsed with
    ``json.loads`` rather than ``eval`` because the content is fetched over
    the network.
    """
    r = requests.get(url, timeout=30)
    return json.loads(r.content.decode("utf-8", errors="replace"))


def _join_chunks(content):
    """Join a mapping's non-None text chunks with "." and replace newlines with spaces."""
    return ".".join(chunk for chunk in content.values() if chunk is not None).replace("\n", " ")


@chatai_api_router.post("/train_chatbot")
async def train_chatbot(request: TrainChatbotRequest):
    """Feed a chatbot's stored data sources into its vector store.

    Each URL in ``request.urls`` is looked up in ``ChatAI_ChatbotDataSources``
    to find the endpoint that produced it. The document is then downloaded
    and turned into a ``(text, source)`` pair:

    * PDF, website and YouTube sources store a mapping of chunks under
      ``"output"``. The chunks are joined with ``"."``, and None chunks
      (stored by image-PDF extraction) are skipped.
    * ``/add_text`` and ``/add_qa_pair`` sources store ``{"text": ...}``.
    * Sources from any other endpoint are ignored.

    The pairs are passed to ``ChatAI_pipeline.add_document_``. Exceptions are
    logged and re-raised unchanged.
    """
    vectorstore, url_sources = request.vectorstore, request.urls
    logger.info(f">>>train_chatbot API Triggered By {vectorstore}<<<")
    pdf_endpoints = ("/text_pdf_extraction", "/image_pdf_text_extraction")
    chunked_endpoints = pdf_endpoints + ("/fetch_text/urls", "/youtube_transcript")
    plain_text_endpoints = ("/add_text", "/add_qa_pair")
    try:
        # Every URL must be registered in ChatAI_ChatbotDataSources; an unknown
        # URL returns no rows and raises IndexError here.
        file_types = [
            supabase_client.table("ChatAI_ChatbotDataSources").select("sourceEndpoint")
            .eq("sourceContentURL", url).execute().data[0]["sourceEndpoint"]
            for url in url_sources
        ]
        documents = []
        for source, file_type in zip(url_sources, file_types):
            if file_type in chunked_endpoints:
                if file_type in pdf_endpoints:
                    logger.info(f"Source is {source}")
                file = _load_source_document(source)
                documents.append((_join_chunks(file["output"]), file["source"]))
            elif file_type in plain_text_endpoints:
                file = _load_source_document(source)
                documents.append((file["output"]["text"].replace("\n", " "), file["source"]))
        ChatAI_pipeline.add_document_(documents, vectorstore)
        response = create_success_response(200, {"message": "Chatbot trained successfully."})
        logger.info(f">>>Chatbot trained successfully for {vectorstore}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in train_chatbot: {e} for {vectorstore}.<<<")
        raise
@chatai_api_router.post("/new_chatbot")
async def new_chatbot(request: NewChatbotRequest):
    """Create a chatbot for a user.

    Delegates to ``user_management.new_chatbot_`` and returns its result
    unchanged. Failures are logged and reported through
    ``raise_http_exception(500, ...)``.
    """
    logger.info(">>> new_chatbot API Triggered <<<")
    try:
        created = user_management.new_chatbot_(chatbot_name=request.chatbot_name,
                                               username=request.username)
        logger.info(f">>> Chatbot created successfully for {request.username}.<<<")
        return created
    except Exception as exc:
        logger.error(f">>>Error in new_chatbot: {exc} for {request.username}.<<<")
        raise_http_exception(500, "Internal Server Error")