|
import json |
|
import os |
|
import string |
|
import tempfile |
|
from urllib.parse import urlparse |
|
import requests |
|
from fastapi import UploadFile, File, Form, HTTPException |
|
from fastapi.requests import Request |
|
from fastapi.routing import APIRouter |
|
from supabase import create_client |
|
from core import logging as logger |
|
from core.api.user_management_api import user_management |
|
from core.api.user_management_api import user_management as user_management_pipeline |
|
from core.models.apis_models import * |
|
from core.pipeline.chataipipeline import ChatAIPipeline |
|
from core.services.supabase.user_management.token_limit import token_limit_check |
|
from core.utils.error_handling import create_error_response, create_success_response, raise_http_exception |
|
from core.utils.utils import get_ip_info, encode_to_base64, clean_text, decode_base64 |
|
from core.services.supabase.limit.limit_check import LimitChecker |
|
from PyPDF2 import PdfReader |
|
from dotenv import load_dotenv |
|
load_dotenv() |
|
import io |
|
|
|
# Router exposing every ChatAI endpoint under a single OpenAPI tag.
chatai_api_router = APIRouter(tags=["ChatAI"])

# Shared Supabase client; credentials come from the environment (.env loaded above).
supabase_client = create_client(os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_KEY"))

# Alias preserved for any callers that reference the trailing-underscore name.
supabase_client_ = supabase_client

# Single pipeline instance shared by all request handlers.
ChatAI_pipeline = ChatAIPipeline()

# Request-size thresholds: max URLs per scrape, max PDF pages, max OCR pages.
# NOTE(review): assumes LimitChecker(...) unpacks into exactly three values — confirm.
url_limit,pdf_limit,ocr_limit=LimitChecker(supabase_client)
|
|
|
@chatai_api_router.post("/add_text")
async def add_text(request: AddTextRequest):
    """Store a raw text snippet as a chatbot data source.

    The vectorstore id is expected to look like "<prefix>$<username>$<chatbot>".
    The text is checked against the user's token quota, uploaded to Supabase
    storage as JSON, and registered in ChatAI_ChatbotDataSources.
    """
    logger.info(f">>>AddText API Triggered By {request.vectorstore}<<<")
    try:
        vectorstore, text = request.vectorstore, request.text
        parts = vectorstore.split("$")
        username, chat_bot_name = parts[1], parts[2]

        # "Token" count is a character count of the whitespace-normalized text.
        num_token = len(" ".join(text.split()))
        lim = token_limit_check(supabase_client=supabase_client, username=username,
                                chatbot_name=chat_bot_name, len_text=num_token)
        text = clean_text(text)
        if lim:
            payload = {
                "output": {"text": text},
                "source": "Text",
            }
            # Recompute after clean_text() so the logged count matches what is stored.
            num_token = len(" ".join(text.split()))
            logger.info(f"Number of token {num_token}")
            encoded = json.dumps(payload, indent=1).encode("utf-8", errors="replace")
            file_name = user_management_pipeline.create_data_source_name(source_name="text", username=username)
            supabase_client.storage.from_("ChatAI").upload(file=encoded, path=f"{file_name}_data.json")

            supabase_client.table("ChatAI_ChatbotDataSources").insert(
                {"username": username, "chatbotName": chat_bot_name, "dataSourceName": file_name,
                 "numTokens": num_token, "sourceEndpoint": "/add_text",
                 "sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"],
                                                  f"{file_name}_data.json")}).execute()

            result = create_success_response(200, {"message": "Successfully added the text."})
            logger.info(f">>>Text added successfully for {request.vectorstore}.<<<")
            return result
        else:
            return create_error_response(400,
                                         "Exceeding limits, please try with a smaller chunks of information or subscribe to our premium plan.")

    except Exception as e:
        logger.error(f">>>Error in add_text: {e} for {request.vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
|
|
|
|
|
@chatai_api_router.post("/answer_query")
async def answer_query(request: AnswerQueryRequest, req: Request):
    """Answer a user query against a chatbot's vectorstore and log the exchange."""
    logger.info(f">>>answer_query API Triggered By {request.vectorstore}<<<")
    try:
        parts = request.vectorstore.split("$")
        username, chatbot_name = parts[1], parts[2]
        ip_address = req.client.host
        city = get_ip_info(ip_address)
        output, followup_questions, source = ChatAI_pipeline.answer_query_(
            query=request.query,
            vectorstore=request.vectorstore,
            llm_model=request.llm_model)
        # Persist the Q/A exchange for later history retrieval and analytics.
        supabase_client.table("ChatAI_ChatHistory").insert(
            {"username": username, "chatbotName": chatbot_name, "llmModel": request.llm_model,
             "question": request.query, "response": output, "IpAddress": ip_address,
             "ResponseTokenCount": len(output),
             "vectorstore": request.vectorstore, "City": city}).execute()

        result = create_success_response(200, data={"output": output,
                                                    "follow_up_questions": followup_questions,
                                                    "source": source})
        logger.info(f">>>Query answered successfully for {request.vectorstore}.<<<")
        return result

    except Exception as e:
        logger.error(f">>>Error in answer_query: {e} for {request.vectorstore}.<<<")
        raise e
|
|
|
|
|
@chatai_api_router.post("/get_links")
async def get_links(request: GetLinksRequest):
    """Crawl the given URL and return the links discovered on it."""
    logger.info(f">>>get_links API Triggered By {request.url}<<<")
    try:
        urls = ChatAI_pipeline.get_links_(url=request.url, timeout=30)
        result = create_success_response(200, {"urls": urls, "source": urlparse(request.url).netloc})
        logger.info(f">>>Links fetched successfully for {request.url}.<<<")
        return result

    except Exception as e:
        logger.error(f">>>Error in get_links: {e} for {request.url}.<<<")
        raise_http_exception(500, "Internal Server Error")
|
|
|
|
|
@chatai_api_router.post("/image_pdf_text_extraction")
async def image_pdf_text_extraction(vectorstore: str = Form(...)
                                    , pdf: UploadFile = File(...)):
    """OCR-extract text from an image-based PDF and register it as a data source.

    The PDF is rejected up front when its page count reaches the configured OCR
    page limit, and after extraction when the user's token quota is exceeded.
    """
    logger.info(f">>>image_pdf_text_extraction API Triggered By {pdf.filename}<<<")
    try:
        username, chatbot_name = vectorstore.split("$")[1], vectorstore.split("$")[2]
        pdf_bytes = await pdf.read()
        source = pdf.filename
        pdf_reader = PdfReader(io.BytesIO(pdf_bytes))
        doc_len = len(pdf_reader.pages)
        if doc_len < ocr_limit:
            response = ChatAI_pipeline.image_pdf_text_extraction_(image_pdf=pdf_bytes)

            # Character count used only for the quota check; falls back to 0 when
            # the extraction result is not joinable (e.g. contains None values).
            num_tokens = 0
            try:
                num_tokens = len(" ".join([response[x] for x in response]))
            except (KeyError, TypeError, AttributeError):
                pass
            lim = token_limit_check(supabase_client=supabase_client, username=username,
                                    chatbot_name=chatbot_name, len_text=num_tokens)
            logger.info(f"this is the {lim}")
            if lim:
                dct = {
                    "output": response,
                    "source": source
                }
                dct = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
                file_name = user_management_pipeline.create_data_source_name(source_name=source, username=username)

                # Stored count skips None page entries, so it can differ from the
                # count used for the quota check above.
                try:
                    valid_responses = [response[x] for x in response if response[x] is not None]
                    num_tokens = len(" ".join(valid_responses))
                except Exception:
                    num_tokens = 0

                supabase_client.storage.from_("ChatAI").upload(file=dct, path=f"{file_name}_data.json")
                supabase_client.table("ChatAI_ChatbotDataSources").insert(
                    {"username": username,
                     "chatbotName": chatbot_name,
                     "dataSourceName": file_name,
                     "numTokens": num_tokens,
                     "sourceEndpoint": "/image_pdf_text_extraction",
                     "sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"],
                                                      f"{file_name}_data.json")}).execute()

                response = create_success_response(200,
                                                   {"source": pdf.filename, "message": "Successfully extracted the text."})
                logger.info(f">>>Text extracted successfully for {pdf.filename}.<<<")
                return response
            else:
                return create_error_response(402,
                                             "Exceeding limits, please try with a smaller chunks of PDF or subscribe to our premium plan.")
        else:
            # NOTE(review): message hard-codes "20 pages"; actual threshold is ocr_limit.
            return create_error_response(402,
                                         "Exceeding limits, please try with a PDF having less than 20 pages for pdf .")
    except Exception as e:
        # Log before re-raising so failures are traceable like the other endpoints.
        logger.error(f">>>Error in image_pdf_text_extraction: {e} for {pdf.filename}.<<<")
        raise e
|
|
|
|
|
@chatai_api_router.post("/text_pdf_extraction")
async def text_pdf_extraction(vectorstore: str = Form(...)
                              , pdf: UploadFile = File(...)):
    """Extract selectable text from a PDF and register it as a data source.

    PDFs at or above the configured page limit are rejected. The extracted text
    is size-checked against the user's token quota, uploaded to Supabase storage
    as JSON, and recorded in ChatAI_ChatbotDataSources.
    """
    logger.info(f">>>text_pdf_extraction API Triggered By {pdf.filename}<<<")
    try:
        username, chatbot_name = vectorstore.split("$")[1], vectorstore.split("$")[2]
        content = await pdf.read()
        pdf_reader = PdfReader(io.BytesIO(content))
        doc_len = len(pdf_reader.pages)

        if doc_len < pdf_limit:
            source = pdf.filename
            with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
                temp_file.write(content)
                temp_file_path = temp_file.name

            # Remove the temp file even if extraction or the quota check raises
            # (the original leaked it on any error before os.remove).
            try:
                response = ChatAI_pipeline.text_pdf_extraction_(pdf=temp_file_path)
                numTokens = len(" ".join([response[x] for x in response]))
                lim = token_limit_check(supabase_client=supabase_client, username=username,
                                        chatbot_name=chatbot_name, len_text=numTokens)
            finally:
                os.remove(temp_file_path)

            if lim:
                dct = {
                    "output": response,
                    "source": source
                }
                logger.info(f"Num of tokens {numTokens} text_pdf_extraction")
                dct = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
                file_name = user_management_pipeline.create_data_source_name(source_name=source, username=username)
                supabase_client.storage.from_("ChatAI").upload(file=dct, path=f"{file_name}_data.json")
                (
                    supabase_client.table("ChatAI_ChatbotDataSources")
                    .insert({"username": username,
                             "chatbotName": chatbot_name,
                             "dataSourceName": file_name,
                             "numTokens": numTokens,
                             "sourceEndpoint": "/text_pdf_extraction",
                             "sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"],
                                                              f"{file_name}_data.json")})
                    .execute()
                )
                response = create_success_response(200, {"source": source, "message": "Successfully extracted the text."})
                logger.info(f">>>Text extracted successfully for {source}.<<<")
                return response
            else:
                return create_error_response(402,
                                             "Exceeding limits, please try with a smaller chunks of PDF or subscribe to our premium plan.")
        else:
            # NOTE(review): message hard-codes "200 pages"; actual threshold is pdf_limit.
            return create_error_response(402,
                                         "Exceeding limits, please try with a pdf having pages less than 200.")

    except Exception as e:
        logger.error(f">>>Error in text_pdf_extraction: {e} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
|
|
|
|
|
|
|
|
|
@chatai_api_router.post("/website_url_text_extraction")
async def add_website(request: AddWebsiteRequest):
    """Scrape a list of website URLs and register their text as one data source."""
    vectorstore, website_urls, source = request.vectorstore, request.website_urls, request.source

    logger.info(f">>>website_url_text_extraction API Triggered By {request.website_urls}<<<")
    try:
        parts = vectorstore.split("$")
        username, chatbot_name = parts[1], parts[2]
        total_requested_urls = len(website_urls)

        if total_requested_urls < url_limit:
            text = ChatAI_pipeline.website_url_text_extraction_list_(urls=website_urls)
            num_token = len(" ".join([text[x] for x in text]))

            logger.info(f">>>website_url_text_extraction len{num_token}<<<")
            lim = token_limit_check(supabase_client=supabase_client, username=username,
                                    chatbot_name=chatbot_name, len_text=num_token)
            if not lim:
                return create_error_response(402,
                                             "Exceeding limits, please try with a smaller chunks of information or subscribe to our premium plan.")
            else:
                payload = {
                    "output": text,
                    "source": source
                }
                encoded = json.dumps(payload, indent=1).encode("utf-8", errors="replace")
                file_name = user_management_pipeline.create_data_source_name(
                    source_name=urlparse(source).netloc, username=username)
                supabase_client.storage.from_("ChatAI").upload(file=encoded, path=f"{file_name}_data.json")
                supabase_client.table("ChatAI_ChatbotDataSources").insert(
                    {"username": username,
                     "chatbotName": chatbot_name,
                     "dataSourceName": file_name,
                     "numTokens": num_token,
                     "sourceEndpoint": "/fetch_text/urls",
                     "sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"],
                                                      f"{file_name}_data.json")}).execute()
                result = create_success_response(200, {"message": "Successfully fetched the website text."})
                logger.info(f">>>Website text extracted successfully for {request.website_urls}.<<<")
                return result
        else:
            return create_error_response(402,
                                         "Please select urls less than 50")
    except Exception as e:
        logger.error(f">>>Error in website_url_text_extraction: {e} for {request.website_urls}.<<<")
        raise HTTPException(status_code=500, detail="Internal Server Error")
|
|
|
|
|
@chatai_api_router.get("/get_current_count")
async def get_count(vectorstore: str):
    """Return the current usage count for the user encoded in the vectorstore id."""
    logger.info(f">>>get_current_count API Triggered By {vectorstore}<<<")
    try:
        parts = vectorstore.split("$")
        username, chatbot_name = parts[1], parts[2]
        current_count = user_management_pipeline.get_current_count_(username)

        result = create_success_response(200, {"current_count": current_count})
        logger.info(f">>>Current count fetched successfully for {vectorstore}.<<<")
        return result

    except Exception as e:
        logger.error(f">>>Error in get_current_count: {e} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
|
|
|
|
|
@chatai_api_router.post("/list_chatbots")
async def list_chatbots(request: ListChatbotsRequest):
    """List the chatbots that belong to the given user."""
    logger.info(f">>>list_chatbots API Triggered By {request.username}<<<")
    try:
        tables = user_management.list_tables(username=request.username)
        result = create_success_response(200, {"chatbots": tables})
        logger.info(f">>>Chatbots listed successfully for {request.username}.<<<")
        return result

    except Exception as e:
        logger.error(f">>>Error in list_chatbots: {e} for {request.username}.<<<")
        raise_http_exception(500, "Internal Server Error")
|
|
|
|
|
@chatai_api_router.post("/get_chat_history")
async def chat_history(request: GetChatHistoryRequest):
    """Return the stored question/response history for one chatbot."""
    logger.info(f">>>get_chat_history API Triggered By {request.vectorstore}<<<")
    try:
        # maxsplit=2 keeps any further "$" characters inside the chatbot name.
        _, username, chatbotName = request.vectorstore.split("$", 2)

        rows = (
            supabase_client.table("ChatAI_ChatHistory")
            .select("timestamp", "question", "response")
            .eq("username", username)
            .eq("chatbotName", chatbotName)
            .execute()
            .data
        )

        result = create_success_response(200, {"history": rows})
        logger.info(f">>>Chat history fetched successfully for {request.vectorstore}.<<<")
        return result

    except IndexError:
        logger.warning(f"Chat history not found for {request.vectorstore}")
        return create_error_response(404, "Chat history not found for the given chatbot.")

    except Exception as e:
        logger.error(f">>>Error in get_chat_history: {e} for {request.vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
|
|
|
|
|
@chatai_api_router.post("/delete_chatbot")
async def delete_chatbot(request: DeleteChatbotRequest):
    """Delete a chatbot: its info row, data-source rows, stored files, table and vector cluster."""
    logger.info(f">>>delete_chatbot API Triggered By {request.vectorstore}<<<")
    try:
        username, chatbot_name = request.vectorstore.split("$")[1], request.vectorstore.split("$")[2]
        supabase_client.table('ChatAI_ChatbotInfo').delete().eq('user_id', username).eq('chatbotname',
                                                                                       chatbot_name).execute()
        all_sources = supabase_client.table("ChatAI_ChatbotDataSources").select("*").eq("username", username).eq(
            "chatbotName", chatbot_name).execute().data
        # Capture the storage object names before the rows are deleted.
        all_sources = [x["sourceContentURL"].split("/")[-1] for x in all_sources]
        supabase_client.table("ChatAI_ChatbotDataSources").delete().eq("username", username).eq("chatbotName",
                                                                                                chatbot_name).execute()
        for source in all_sources:
            # Removed a dangling `supabase_client.table("ChatAI_Chatbot")` expression
            # that built a query object but never executed it (dead code).
            # NOTE(review): supabase storage remove() conventionally takes a list of
            # paths; confirm a bare string is accepted by the client version in use.
            supabase_client.storage.from_("ChatAI").remove(source)
        user_management.delete_table(table_name=chatbot_name)
        user_management.delete_qdrant_cluster(vectorstorename=request.vectorstore)
        response = create_success_response(200, {"message": "Chatbot deleted successfully"})
        logger.info(f">>>Chatbot deleted successfully for {request.vectorstore}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in delete_chatbot: {e} for {request.vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
|
|
|
|
|
|
|
|
|
|
|
|
|
@chatai_api_router.get("/list_chatbot_sources")
async def list_chatbot_sources(vectorstore: str):
    """List every registered data source for the chatbot encoded in the vectorstore id."""
    try:
        logger.info(f">>>list_chatbot_sources API Triggered By {vectorstore}<<<")

        parts = vectorstore.split("$")
        username, chatbot_name = parts[1], parts[2]
        rows = (
            supabase_client.table("ChatAI_ChatbotDataSources")
            .select("*")
            .eq("username", username)
            .eq("chatbotName", chatbot_name)
            .execute()
            .data
        )

        result = create_success_response(200, {"output": rows})
        logger.info(f">>>Chatbot listed successfully for {vectorstore}.<<<")
        return result

    except Exception as e:
        logger.error(f">>>Error in list_chatbot_sources: {e} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
|
|
|
|
|
@chatai_api_router.get("/get_data_source")
async def get_data_source(vectorstore: str, source_url: str):
    """Fetch a stored data-source JSON file and return its content base64-encoded."""
    try:
        logger.info(f">>>get_data_source API Triggered By {vectorstore}<<<")

        r = requests.get(source_url)
        # The stored files are produced with json.dumps, so parse with json.loads;
        # the original eval() executed arbitrary fetched content (code-execution risk).
        res = encode_to_base64(json.loads(r.content.decode("utf-8", errors="replace")))

        response = create_success_response(200, {"output": res})

        return response

    except Exception as e:
        logger.error(f">>>Error in get_data_source: {e} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
|
|
|
|
|
@chatai_api_router.post("/delete_chatbot_source")
async def delete_chatbot_source(request: DeleteChatbotSourceRequest):
    """Remove a single data source: its registry row and its stored JSON file."""
    vectorstore, data_source_name = request.vectorstore, request.data_source_name
    try:
        supabase_client.table("ChatAI_ChatbotDataSources").delete().eq(
            "dataSourceName", data_source_name).execute()
        supabase_client.storage.from_('ChatAI').remove(f"{data_source_name}_data.json")

        result = create_success_response(200, {"output": f"Successfully deleted the {data_source_name} data source."})

        logger.info(f">>>Data source deleted successfully for {vectorstore}.<<<")
        return result

    except Exception as e:
        logger.error(f">>>Error in delete_chatbot_source: {e} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
|
|
|
|
|
@chatai_api_router.post("/train_chatbot")
async def train_chatbot(request: TrainChatbotRequest):
    """Aggregate the selected data sources and feed them to the vectorstore.

    Each source URL is looked up in ChatAI_ChatbotDataSources to find the
    endpoint that produced it, downloaded, flattened to plain text, and passed
    to the pipeline together with its original source label.
    """
    vectorstore, url_sources = request.vectorstore, request.urls
    logger.info(f">>>train_chatbot API Triggered By {vectorstore}<<<")
    try:
        texts = []
        sources = []
        fileTypes = [
            supabase_client.table("ChatAI_ChatbotDataSources").select("sourceEndpoint").eq("sourceContentURL",
                                                                                           x).execute().data[0][
                "sourceEndpoint"] for x in url_sources]
        for source, fileType in zip(url_sources, fileTypes):
            if fileType in ("/text_pdf_extraction", "/image_pdf_text_extraction"):
                logger.info(f"Source is {source}")
                file = _fetch_source_json(source)
                content = file["output"]
                # Join per-page text into one dot-separated, newline-free string.
                texts.append(".".join(content[key] for key in content.keys()).replace("\n", " "))
                sources.append(file["source"])
            elif fileType in ("/add_text", "/add_qa_pair"):
                file = _fetch_source_json(source)
                texts.append(file["output"]["text"].replace("\n", " "))
                sources.append(file["source"])
            elif fileType in ("/fetch_text/urls", "/youtube_transcript"):
                file = _fetch_source_json(source)
                content = file["output"]
                texts.append(".".join(content[key] for key in content.keys()).replace("\n", " "))
                sources.append(file["source"])
            else:
                # Unknown endpoints are silently skipped, as before.
                pass
        texts = [(text, source) for text, source in zip(texts, sources)]
        ChatAI_pipeline.add_document_(texts, vectorstore)
        response = create_success_response(200, {"message": "Chatbot trained successfully."})
        logger.info(f">>>Chatbot trained successfully for {vectorstore}.<<<")

        return response

    except Exception as e:
        logger.error(f">>>Error in train_chatbot: {e} for {vectorstore}.<<<")
        raise e


def _fetch_source_json(source_url):
    """Download a stored data-source file and parse it as JSON.

    The files are written with json.dumps, so json.loads replaces the original
    eval() calls (eval on downloaded content is a code-execution risk).
    """
    r = requests.get(source_url)
    return json.loads(r.content.decode("utf-8", errors="replace"))
|
|
|
|
|
|
|
|
|
|
|
@chatai_api_router.post("/new_chatbot")
async def new_chatbot(request: NewChatbotRequest):
    """Create a new chatbot for the given user and return the pipeline's response."""
    logger.info(f">>> new_chatbot API Triggered <<<")
    try:
        result = user_management.new_chatbot_(chatbot_name=request.chatbot_name, username=request.username)
        logger.info(f">>> Chatbot created successfully for {request.username}.<<<")
        return result

    except Exception as e:
        logger.error(f">>>Error in new_chatbot: {e} for {request.username}.<<<")
        raise_http_exception(500, "Internal Server Error")
|
|