"""ChatAI API router.

Endpoints for ingesting chatbot data sources (raw text, PDFs, website URLs),
answering queries, and managing chatbots backed by Supabase storage/tables.

NOTE(review): this module was recovered from a whitespace-mangled source
(all newlines collapsed). Formatting has been restored; runtime strings are
preserved byte-for-byte from the original.
"""

import io
import json
import os
import string
import tempfile
from urllib.parse import urlparse

import requests
from dotenv import load_dotenv
from fastapi import UploadFile, File, Form, HTTPException
from fastapi.requests import Request
from fastapi.routing import APIRouter
from PyPDF2 import PdfReader
from supabase import create_client

from core import logging as logger
from core.api.user_management_api import user_management
from core.api.user_management_api import user_management as user_management_pipeline
from core.models.apis_models import *
from core.pipeline.chataipipeline import ChatAIPipeline
from core.services.supabase.limit.limit_check import LimitChecker
from core.services.supabase.user_management.token_limit import token_limit_check
from core.utils.error_handling import create_error_response, create_success_response, raise_http_exception
from core.utils.utils import get_ip_info, encode_to_base64, clean_text, decode_base64

load_dotenv()

chatai_api_router = APIRouter(tags=["ChatAI"])

supabase_client = create_client(os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_KEY"))
supabase_client_ = supabase_client  # kept for backward compatibility with any external references
ChatAI_pipeline = ChatAIPipeline()
# LimitChecker unpacks into the three per-plan limits used by the handlers below.
url_limit, pdf_limit, ocr_limit = LimitChecker(supabase_client)


@chatai_api_router.post("/add_text")
async def add_text(request: AddTextRequest):
    """Store a raw-text data source for a chatbot, subject to the token limit.

    The vectorstore identifier is "<prefix>$<username>$<chatbot_name>…";
    "tokens" are counted as characters of the whitespace-collapsed text.
    """
    logger.info(f">>>AddText API Triggered By {request.vectorstore}<<<")
    try:
        vectorstore, text = request.vectorstore, request.text
        username, chat_bot_name = vectorstore.split("$")[1], vectorstore.split("$")[2]
        cleaned_text = " ".join(text.split())  # collapse runs of whitespace
        num_token = len(cleaned_text)
        lim = token_limit_check(
            supabase_client=supabase_client,
            username=username,
            chatbot_name=chat_bot_name,
            len_text=num_token,
        )
        text = clean_text(text)
        if lim:
            dct = {
                "output": {"text": text},
                "source": "Text",
            }
            # Recount after clean_text(), since it may alter the text length.
            cleaned_text = " ".join(text.split())
            num_token = len(cleaned_text)
            logger.info(f"Number of token {num_token}")
            payload = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
            file_name = user_management_pipeline.create_data_source_name(source_name="text", username=username)
            supabase_client.storage.from_("ChatAI").upload(file=payload, path=f"{file_name}_data.json")
            supabase_client.table("ChatAI_ChatbotDataSources").insert(
                {
                    "username": username,
                    "chatbotName": chat_bot_name,
                    "dataSourceName": file_name,
                    "numTokens": num_token,
                    "sourceEndpoint": "/add_text",
                    "sourceContentURL": os.path.join(
                        os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{file_name}_data.json"
                    ),
                }
            ).execute()
            response = create_success_response(200, {"message": "Successfully added the text."})
            logger.info(f">>>Text added successfully for {request.vectorstore}.<<<")
            return response
        else:
            response = create_error_response(
                400,
                "Exceeding limits, please try with a smaller chunks of information or subscribe to our premium plan.",
            )
            return response
    except Exception as e:
        logger.error(f">>>Error in add_text: {e} for {request.vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")


@chatai_api_router.post("/answer_query")
async def answer_query(request: AnswerQueryRequest, req: Request):
    """Answer a user query against a chatbot's vectorstore and log the exchange."""
    logger.info(f">>>answer_query API Triggered By {request.vectorstore}<<<")
    try:
        username, chatbot_name = request.vectorstore.split("$")[1], request.vectorstore.split("$")[2]
        ip_address = req.client.host
        city = get_ip_info(ip_address)
        output, followup_questions, source = ChatAI_pipeline.answer_query_(
            query=request.query,
            vectorstore=request.vectorstore,
            llm_model=request.llm_model,
        )
        supabase_client.table("ChatAI_ChatHistory").insert(
            {
                "username": username,
                "chatbotName": chatbot_name,
                "llmModel": request.llm_model,
                "question": request.query,
                "response": output,
                "IpAddress": ip_address,
                "ResponseTokenCount": len(output),  # character count, consistent with add_text
                "vectorstore": request.vectorstore,
                "City": city,
            }
        ).execute()
        response = create_success_response(
            200,
            data={"output": output, "follow_up_questions": followup_questions, "source": source},
        )
        logger.info(f">>>Query answered successfully for {request.vectorstore}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in answer_query: {e} for {request.vectorstore}.<<<")
        raise e


@chatai_api_router.post("/get_links")
async def get_links(request: GetLinksRequest):
    """Crawl the given URL (30s timeout) and return discovered links plus the source host."""
    logger.info(f">>>get_links API Triggered By {request.url}<<<")
    try:
        urls = ChatAI_pipeline.get_links_(url=request.url, timeout=30)
        response = create_success_response(
            200, {"urls": urls, "source": urlparse(request.url).netloc}
        )
        logger.info(f">>>Links fetched successfully for {request.url}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in get_links: {e} for {request.url}.<<<")
        raise_http_exception(500, "Internal Server Error")


@chatai_api_router.post("/image_pdf_text_extraction")
async def image_pdf_text_extraction(vectorstore: str = Form(...), pdf: UploadFile = File(...)):
    """OCR-extract text from an image-based PDF and store it as a data source.

    NOTE(review): the original body of this handler was garbled in the
    recovered source (everything between the page-count check and the
    success log was lost). It has been reconstructed by analogy with
    text_pdf_extraction, using the OCR page limit and the
    /image_pdf_text_extraction endpoint name that train_chatbot expects.
    Confirm against version control before relying on the details.
    """
    logger.info(f">>>image_pdf_text_extraction API Triggered By {pdf.filename}<<<")
    try:
        username, chatbot_name = vectorstore.split("$")[1], vectorstore.split("$")[2]
        pdf_bytes = await pdf.read()
        source = pdf.filename
        pdf_reader = PdfReader(io.BytesIO(pdf_bytes))
        doc_len = len(pdf_reader.pages)
        if doc_len < ocr_limit:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
                temp_file.write(pdf_bytes)
                temp_file_path = temp_file.name
            # presumably the pipeline exposes an OCR counterpart to
            # text_pdf_extraction_ — TODO confirm the method name.
            result = ChatAI_pipeline.image_pdf_text_extraction_(pdf=temp_file_path)
            num_tokens = len(" ".join(result[x] for x in result))
            lim = token_limit_check(
                supabase_client=supabase_client,
                username=username,
                chatbot_name=chatbot_name,
                len_text=num_tokens,
            )
            os.remove(temp_file_path)
            if lim:
                dct = {"output": result, "source": source}
                payload = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
                file_name = user_management_pipeline.create_data_source_name(
                    source_name=source, username=username
                )
                supabase_client.storage.from_("ChatAI").upload(
                    file=payload, path=f"{file_name}_data.json"
                )
                supabase_client.table("ChatAI_ChatbotDataSources").insert(
                    {
                        "username": username,
                        "chatbotName": chatbot_name,
                        "dataSourceName": file_name,
                        "numTokens": num_tokens,
                        "sourceEndpoint": "/image_pdf_text_extraction",
                        "sourceContentURL": os.path.join(
                            os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{file_name}_data.json"
                        ),
                    }
                ).execute()
                response = create_success_response(
                    200, {"source": source, "message": "Successfully extracted the text."}
                )
                logger.info(f">>>Text extracted successfully for {pdf.filename}.<<<")
                return response
            else:
                response = create_error_response(
                    402,
                    "Exceeding limits, please try with a smaller chunks of PDF or subscribe to our premium plan.",
                )
                return response
        else:
            response = create_error_response(
                402,
                "Exceeding limits, please try with a PDF having less than 20 pages for pdf .",
            )
            return response
    except Exception as e:
        raise e


@chatai_api_router.post("/text_pdf_extraction")
async def text_pdf_extraction(vectorstore: str = Form(...), pdf: UploadFile = File(...)):
    """Extract text from a text-based PDF and store it as a data source."""
    logger.info(f">>>text_pdf_extraction API Triggered By {pdf.filename}<<<")
    try:
        username, chatbot_name = vectorstore.split("$")[1], vectorstore.split("$")[2]
        content = await pdf.read()
        pdf_reader = PdfReader(io.BytesIO(content))
        doc_len = len(pdf_reader.pages)
        if doc_len < pdf_limit:
            source = pdf.filename
            # The pipeline wants a file path, so spill the upload to a temp file.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
                temp_file.write(content)
                temp_file_path = temp_file.name
            extracted = ChatAI_pipeline.text_pdf_extraction_(pdf=temp_file_path)
            numTokens = len(" ".join(extracted[x] for x in extracted))
            lim = token_limit_check(
                supabase_client=supabase_client,
                username=username,
                chatbot_name=chatbot_name,
                len_text=numTokens,
            )
            os.remove(temp_file_path)
            if lim:
                dct = {"output": extracted, "source": source}
                logger.info(f"Num of tokens {numTokens} text_pdf_extraction")
                payload = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
                file_name = user_management_pipeline.create_data_source_name(
                    source_name=source, username=username
                )
                supabase_client.storage.from_("ChatAI").upload(
                    file=payload, path=f"{file_name}_data.json"
                )
                supabase_client.table("ChatAI_ChatbotDataSources").insert(
                    {
                        "username": username,
                        "chatbotName": chatbot_name,
                        "dataSourceName": file_name,
                        "numTokens": numTokens,
                        "sourceEndpoint": "/text_pdf_extraction",
                        "sourceContentURL": os.path.join(
                            os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{file_name}_data.json"
                        ),
                    }
                ).execute()
                response = create_success_response(
                    200, {"source": source, "message": "Successfully extracted the text."}
                )
                logger.info(f">>>Text extracted successfully for {source}.<<<")
                return response
            else:
                response = create_error_response(
                    402,
                    "Exceeding limits, please try with a smaller chunks of PDF or subscribe to our premium plan.",
                )
                return response
        else:
            response = create_error_response(
                402,
                "Exceeding limits, please try with a pdf having pages less than 200.",
            )
            return response
    except Exception as e:
        logger.error(f">>>Error in text_pdf_extraction: {e} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")


@chatai_api_router.post("/website_url_text_extraction")
async def add_website(request: AddWebsiteRequest):
    """Extract text from a list of website URLs and store it as a data source."""
    vectorstore, website_urls, source = request.vectorstore, request.website_urls, request.source
    logger.info(f">>>website_url_text_extraction API Triggered By {request.website_urls}<<<")
    try:
        username, chatbot_name = vectorstore.split("$")[1], vectorstore.split("$")[2]
        total_requested_urls = len(website_urls)
        if total_requested_urls < url_limit:
            text = ChatAI_pipeline.website_url_text_extraction_list_(urls=website_urls)
            num_token = len(" ".join(text[x] for x in text))
            logger.info(f">>>website_url_text_extraction len{num_token}<<<")
            lim = token_limit_check(
                supabase_client=supabase_client,
                username=username,
                chatbot_name=chatbot_name,
                len_text=num_token,
            )
            if not lim:
                response = create_error_response(
                    402,
                    "Exceeding limits, please try with a smaller chunks of information or subscribe to our premium plan.",
                )
                return response
            else:
                dct = {"output": text, "source": source}
                payload = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
                file_name = user_management_pipeline.create_data_source_name(
                    source_name=urlparse(source).netloc, username=username
                )
                supabase_client.storage.from_("ChatAI").upload(
                    file=payload, path=f"{file_name}_data.json"
                )
                supabase_client.table("ChatAI_ChatbotDataSources").insert(
                    {
                        "username": username,
                        "chatbotName": chatbot_name,
                        "dataSourceName": file_name,
                        "numTokens": num_token,
                        "sourceEndpoint": "/fetch_text/urls",
                        "sourceContentURL": os.path.join(
                            os.environ["SUPABASE_PUBLIC_BASE_URL"], f"{file_name}_data.json"
                        ),
                    }
                ).execute()
                response = create_success_response(
                    200, {"message": "Successfully fetched the website text."}
                )
                logger.info(f">>>Website text extracted successfully for {request.website_urls}.<<<")
                return response
        else:
            response = create_error_response(402, "Please select urls less than 50")
            return response
    except Exception as e:
        logger.error(f">>>Error in website_url_text_extraction: {e} for {request.website_urls}.<<<")
        raise HTTPException(status_code=500, detail="Internal Server Error")


@chatai_api_router.get("/get_current_count")
async def get_count(vectorstore: str):
    """Return the user's current usage count."""
    logger.info(f">>>get_current_count API Triggered By {vectorstore}<<<")
    try:
        username, chatbot_name = vectorstore.split("$")[1], vectorstore.split("$")[2]
        current_count = user_management_pipeline.get_current_count_(username)
        response = create_success_response(200, {"current_count": current_count})
        logger.info(f">>>Current count fetched successfully for {vectorstore}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in get_current_count: {e} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")


@chatai_api_router.post("/list_chatbots")
async def list_chatbots(request: ListChatbotsRequest):
    """List all chatbots belonging to a user."""
    logger.info(f">>>list_chatbots API Triggered By {request.username}<<<")
    try:
        chatbots = user_management.list_tables(username=request.username)
        response = create_success_response(200, {"chatbots": chatbots})
        logger.info(f">>>Chatbots listed successfully for {request.username}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in list_chatbots: {e} for {request.username}.<<<")
        raise_http_exception(500, "Internal Server Error")


@chatai_api_router.post("/get_chat_history")
async def chat_history(request: GetChatHistoryRequest):
    """Return the question/response history for a chatbot."""
    logger.info(f">>>get_chat_history API Triggered By {request.vectorstore}<<<")
    try:
        # NOTE(review): maxsplit=2 keeps any further "$"-separated parts glued
        # onto chatbotName, unlike the split("$")[2] used elsewhere — confirm
        # which behavior is intended.
        _, username, chatbotName = request.vectorstore.split("$", 2)
        history = (
            supabase_client.table("ChatAI_ChatHistory")
            .select("timestamp", "question", "response")
            .eq("username", username)
            .eq("chatbotName", chatbotName)
            .execute()
            .data
        )
        response = create_success_response(200, {"history": history})
        logger.info(f">>>Chat history fetched successfully for {request.vectorstore}.<<<")
        return response
    except IndexError:
        logger.warning(f"Chat history not found for {request.vectorstore}")
        return create_error_response(404, "Chat history not found for the given chatbot.")
    except Exception as e:
        logger.error(f">>>Error in get_chat_history: {e} for {request.vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")


@chatai_api_router.post("/delete_chatbot")
async def delete_chatbot(request: DeleteChatbotRequest):
    """Delete a chatbot: its info row, data-source rows, stored files,
    history table, and Qdrant cluster."""
    logger.info(f">>>delete_chatbot API Triggered By {request.vectorstore}<<<")
    try:
        username, chatbot_name = request.vectorstore.split("$")[1], request.vectorstore.split("$")[2]
        supabase_client.table("ChatAI_ChatbotInfo").delete().eq("user_id", username).eq(
            "chatbotname", chatbot_name
        ).execute()
        all_sources = (
            supabase_client.table("ChatAI_ChatbotDataSources")
            .select("*")
            .eq("username", username)
            .eq("chatbotName", chatbot_name)
            .execute()
            .data
        )
        # Stored object names are the last path segment of each source URL.
        all_sources = [x["sourceContentURL"].split("/")[-1] for x in all_sources]
        supabase_client.table("ChatAI_ChatbotDataSources").delete().eq("username", username).eq(
            "chatbotName", chatbot_name
        ).execute()
        for source in all_sources:
            # (removed a stray no-op `supabase_client.table("ChatAI_Chatbot")`
            # statement that built a query and never executed it)
            supabase_client.storage.from_("ChatAI").remove(source)
        user_management.delete_table(table_name=chatbot_name)
        user_management.delete_qdrant_cluster(vectorstorename=request.vectorstore)
        response = create_success_response(200, {"message": "Chatbot deleted successfully"})
        logger.info(f">>>Chatbot deleted successfully for {request.vectorstore}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in delete_chatbot: {e} for {request.vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")


@chatai_api_router.get("/list_chatbot_sources")
async def list_chatbot_sources(vectorstore: str):
    """List every data source registered for a chatbot."""
    try:
        logger.info(f">>>list_chatbot_sources API Triggered By {vectorstore}<<<")
        username, chatbot_name = vectorstore.split("$")[1], vectorstore.split("$")[2]
        result = (
            supabase_client.table("ChatAI_ChatbotDataSources")
            .select("*")
            .eq("username", username)
            .eq("chatbotName", chatbot_name)
            .execute()
            .data
        )
        response = create_success_response(200, {"output": result})
        logger.info(f">>>Chatbot listed successfully for {vectorstore}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in list_chatbot_sources: {e} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")


@chatai_api_router.get("/get_data_source")
async def get_data_source(vectorstore: str, source_url: str):
    """Fetch a stored data-source JSON file and return it base64-encoded."""
    try:
        logger.info(f">>>get_data_source API Triggered By {vectorstore}<<<")
        r = requests.get(source_url)
        # Stored payloads are produced by json.dumps; parse with json.loads
        # instead of eval(), which would execute arbitrary fetched content.
        res = encode_to_base64(json.loads(r.content.decode("utf-8", errors="replace")))
        response = create_success_response(200, {"output": res})
        return response
    except Exception as e:
        logger.error(f">>>Error in get_data_source: {e} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")


@chatai_api_router.post("/delete_chatbot_source")
async def delete_chatbot_source(request: DeleteChatbotSourceRequest):
    """Delete a single data source: its table row and its stored JSON file."""
    vectorstore, data_source_name = request.vectorstore, request.data_source_name
    try:
        supabase_client.table("ChatAI_ChatbotDataSources").delete().eq(
            "dataSourceName", data_source_name
        ).execute()
        supabase_client.storage.from_("ChatAI").remove(f"{data_source_name}_data.json")
        response = create_success_response(
            200, {"output": f"Successfully deleted the {data_source_name} data source."}
        )
        logger.info(f">>>Data source deleted successfully for {vectorstore}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in delete_chatbot_source: {e} for {vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")


@chatai_api_router.post("/train_chatbot")
async def train_chatbot(request: TrainChatbotRequest):
    """Load the selected data-source files and index their text into the vectorstore."""
    vectorstore, url_sources = request.vectorstore, request.urls
    logger.info(f">>>train_chatbot API Triggered By {vectorstore}<<<")
    try:
        texts = []
        sources = []
        fileTypes = [
            supabase_client.table("ChatAI_ChatbotDataSources")
            .select("sourceEndpoint")
            .eq("sourceContentURL", x)
            .execute()
            .data[0]["sourceEndpoint"]
            for x in url_sources
        ]
        for source, fileType in zip(url_sources, fileTypes):
            if fileType in ("/text_pdf_extraction", "/image_pdf_text_extraction"):
                logger.info(f"Source is {source}")
                r = requests.get(source)
                # Stored payloads are json.dumps output; json.loads replaces the
                # original eval(), which would execute arbitrary fetched content.
                file = json.loads(r.content.decode("utf-8", errors="replace"))
                content = file["output"]
                fileSource = file["source"]
                texts.append(
                    ".".join(content[key] for key in content.keys()).replace("\n", " ")
                )
                sources.append(fileSource)
            elif fileType in ("/add_text", "/add_qa_pair"):
                r = requests.get(source)
                file = json.loads(r.content.decode("utf-8", errors="replace"))
                content = file["output"]["text"]
                fileSource = file["source"]
                texts.append(content.replace("\n", " "))
                sources.append(fileSource)
            elif fileType in ("/fetch_text/urls", "/youtube_transcript"):
                r = requests.get(source)
                file = json.loads(r.content.decode("utf-8", errors="replace"))
                content = file["output"]
                fileSource = file["source"]
                texts.append(
                    ".".join(content[key] for key in content.keys()).replace("\n", " ")
                )
                sources.append(fileSource)
            else:
                pass  # unknown endpoint types are skipped
        texts = [(text, source) for text, source in zip(texts, sources)]
        ChatAI_pipeline.add_document_(texts, vectorstore)
        response = create_success_response(200, {"message": "Chatbot trained successfully."})
        logger.info(f">>>Chatbot trained successfully for {vectorstore}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in train_chatbot: {e} for {vectorstore}.<<<")
        raise e


@chatai_api_router.post("/new_chatbot")
async def new_chatbot(request: NewChatbotRequest):
    """Create a new chatbot for a user and return the pipeline's response."""
    logger.info(f">>> new_chatbot API Triggered <<<")
    try:
        response = user_management.new_chatbot_(
            chatbot_name=request.chatbot_name, username=request.username
        )
        logger.info(f">>> Chatbot created successfully for {request.username}.<<<")
        return response
    except Exception as e:
        logger.error(f">>>Error in new_chatbot: {e} for {request.username}.<<<")
        raise_http_exception(500, "Internal Server Error")