"""
Created By: ishwor subedi
Date: 2024-08-02
"""
import io
import json
import os
import string
import tempfile
import requests
import pandas as pd
from src import logging as logger
from supabase import create_client
from urllib.parse import urlparse
from src.api.jwt_bearer import access_check_bearer
from src.models.apis_models import (AddTextRequest, AnswerQueryRequest, GetLinksRequest,
                                    YoutubeTranscriptRequest, AddWebsiteRequest, ListChatbotsRequest,
                                    GetChatHistoryRequest, DeleteChatbotRequest, AddQAPairRequest,
                                    LoadEditedJson, DeleteChatbotSourceRequest, TrainChatbotRequest,
                                    NewChatbotRequest)
from fastapi.requests import Request
from fastapi.routing import APIRouter
from fastapi import UploadFile, File, Form, Depends
from src.pipeline.conversai_pipeline import ConversAIPipeline
from src.api.user_management_api import user_management
from src.services.supabase.analytics.analytic_tables import track_usage
from src.services.supabase.user_management.token_limit import token_limit_check
from src.utils.error_handling import create_error_response, create_success_response, raise_http_exception
from src.api.user_management_api import user_management as user_management_pipeline
from src.utils.utils import get_ip_info, encode_to_base64, clean_text, decode_base64
conversai_api_router = APIRouter(tags=["ConversAI"])
supabase_client = create_client(os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_KEY"))
conversai_pipeline = ConversAIPipeline()
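# NOTE: `vectorstore` identifiers appear to follow the pattern
# "<prefix>$<username>$<chatbotName>"; the handlers below recover the username
# and chatbot name by splitting on "$".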
@conversai_api_router.post("/add_text", dependencies=[Depends(access_check_bearer)])
async def add_text(request: AddTextRequest):
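    """Add raw text as a chatbot data source: check the user's token limit,
    upload the cleaned text as JSON to Supabase storage, and register it in
    ConversAI_ChatbotDataSources."""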
logger.info(f">>>AddText API Triggered By {request.vectorstore}<<<")
try:
vectorstore, text = request.vectorstore, request.text
track_usage(vectorstore=vectorstore, endpoint="/add_text", supabase_client=supabase_client)
        username, chat_bot_name = request.vectorstore.split("$")[1:3]
lim = token_limit_check(supabase_client=supabase_client, username=username, chatbot_name=chat_bot_name,
text=text)
text = clean_text(text)
if lim:
dct = {
"output": {"text": text},
"source": "Text",
}
num_token = len(text.translate(str.maketrans('', '', string.punctuation)).split(" "))
dct = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
file_name = user_management_pipeline.create_data_source_name(source_name="text", username=username)
supabase_client.storage.from_("ConversAI").upload(file=dct, path=f"{file_name}_data.json")
supa = supabase_client.table("ConversAI_ChatbotDataSources").insert(
{"username": username, "chatbotName": chat_bot_name, "dataSourceName": file_name,
"numTokens": num_token, "sourceEndpoint": "/add_text",
"sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"],
f"{file_name}_data.json")}).execute()
response = create_success_response(200, {"message": "Successfully added the text."})
logger.info(f">>>Text added successfully for {request.vectorstore}.<<<")
return response
else:
            response = create_error_response(400,
                                             "Exceeded the limit; please try with smaller chunks of information or subscribe to our premium plan.")
return response
except Exception as e:
logger.error(f">>>Error in add_text: {e} for {request.vectorstore}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/answer_query")
async def answer_query(request: AnswerQueryRequest, req: Request):
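    """Answer a query against the chatbot's vectorstore and log the exchange
    (question, response, caller IP and city) to ConversAI_ChatHistory."""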
logger.info(f">>>answer_query API Triggered By {request.vectorstore}<<<")
try:
track_usage(supabase_client=supabase_client, vectorstore=request.vectorstore, endpoint="/answer_query")
        username, chatbot_name = request.vectorstore.split("$")[1:3]
ip_address = req.client.host
city = get_ip_info(ip_address)
output, followup_questions, source = conversai_pipeline.answer_query_(query=request.query,
vectorstore=request.vectorstore,
llm_model=request.llm_model)
supa = supabase_client.table("ConversAI_ChatHistory").insert(
{"username": username, "chatbotName": chatbot_name, "llmModel": request.llm_model,
"question": request.query, "response": output, "IpAddress": ip_address, "ResponseTokenCount": len(output),
"vectorstore": request.vectorstore, "City": city}).execute()
response = create_success_response(200, data={"output": output, "follow_up_questions": followup_questions,
"source": source})
logger.info(f">>>Query answered successfully for {request.vectorstore}.<<<")
return response
except Exception as e:
logger.error(f">>>Error in answer_query: {e} for {request.vectorstore}.<<<")
        raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/data_analyzer", dependencies=[Depends(access_check_bearer)])
async def data_analyzer(query: str = Form(...), file: UploadFile = File(...)):
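    """Run a natural-language query over an uploaded Excel or CSV file using
    the pipeline's data-analysis step."""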
logger.info(f">>>data_analyzer API Triggered By {query}<<<")
try:
        extension = file.filename.split(".")[-1].lower()
        contents = await file.read()
        if extension in ["xls", "xlsx", "xlsm", "xlsb"]:
            df = pd.read_excel(io.BytesIO(contents))
        elif extension == "csv":
            df = pd.read_csv(io.BytesIO(contents))
        else:
            return create_error_response(400, "Invalid file type: only Excel and CSV files are supported.")
response = conversai_pipeline.data_analyzer(query=query, dataframe=df)
response = create_success_response(200, {"output": response})
logger.info(f">>>Data analyzed successfully for {query}.<<<")
return response
except Exception as e:
logger.error(f">>>Error in data_analyzer: {e} for {query}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/get_links", dependencies=[Depends(access_check_bearer)])
async def get_links(request: GetLinksRequest):
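    """Crawl the given URL (30 s timeout) and return the links found there."""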
logger.info(f">>>get_links API Triggered By {request.url}<<<")
try:
response = conversai_pipeline.get_links_(url=request.url, timeout=30)
response = create_success_response(200, {"urls": response, "source": urlparse(request.url).netloc})
logger.info(f">>>Links fetched successfully for {request.url}.<<<")
return response
except Exception as e:
logger.error(f">>>Error in get_links: {e} for {request.url}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/image_pdf_text_extraction", dependencies=[Depends(access_check_bearer)])
async def image_pdf_text_extraction(vectorstore: str = Form(...), pdf: UploadFile = File(...)):
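    """Extract text from an image-based PDF (presumably OCR, given the endpoint
    name), enforce the token limit, and persist the result as a JSON data
    source."""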
logger.info(f">>>image_pdf_text_extraction API Triggered By {pdf.filename}<<<")
try:
track_usage(vectorstore=vectorstore, endpoint="/image_pdf_text_extraction", supabase_client=supabase_client)
        username, chatbot_name = vectorstore.split("$")[1:3]
source = pdf.filename
pdf_bytes = await pdf.read()
response = conversai_pipeline.image_pdf_text_extraction_(image_pdf=pdf_bytes)
lim = token_limit_check(supabase_client=supabase_client, username=username, chatbot_name=chatbot_name,
text=response)
if lim:
dct = {
"output": response,
"source": source
}
dct = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
file_name = user_management_pipeline.create_data_source_name(source_name=source, username=username)
            num_tokens = len(" ".join(response.values()).translate(
                str.maketrans('', '', string.punctuation)).split(" "))
response = supabase_client.storage.from_("ConversAI").upload(file=dct, path=f"{file_name}_data.json")
supa = supabase_client.table("ConversAI_ChatbotDataSources").insert(
{"username": username,
"chatbotName": chatbot_name,
"dataSourceName": file_name,
"numTokens": num_tokens,
"sourceEndpoint": "/image_pdf_text_extraction",
"sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"],
f"{file_name}_data.json")}).execute()
response = create_success_response(200,
{"source": pdf.filename, "message": "Successfully extracted the text."})
logger.info(f">>>Text extracted successfully for {pdf.filename}.<<<")
return response
else:
            response = create_error_response(402,
                                             "Exceeded the limit; please try with a smaller PDF or subscribe to our premium plan.")
return response
except Exception as e:
logger.error(f">>>Error in image_pdf_text_extraction: {e} for {pdf.filename}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/text_pdf_extraction", dependencies=[Depends(access_check_bearer)])
async def text_pdf_extraction(vectorstore: str = Form(...), pdf: UploadFile = File(...)):
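    """Extract embedded text from a PDF by writing it to a temporary file and
    running the pipeline extractor, then persist the result as a JSON data
    source (subject to the token limit)."""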
logger.info(f">>>text_pdf_extraction API Triggered By {pdf.filename}<<<")
try:
track_usage(vectorstore=vectorstore, endpoint="/text_pdf_extraction", supabase_client=supabase_client)
        username, chatbot_name = vectorstore.split("$")[1:3]
source = pdf.filename
        pdf_bytes = await pdf.read()
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
            temp_file.write(pdf_bytes)
            temp_file_path = temp_file.name
response = conversai_pipeline.text_pdf_extraction_(pdf=temp_file_path)
lim = token_limit_check(supabase_client=supabase_client, username=username, chatbot_name=chatbot_name,
text=response)
os.remove(temp_file_path)
if lim:
dct = {
"output": response,
"source": source
}
            num_tokens = len(" ".join(response.values()).translate(
                str.maketrans('', '', string.punctuation)).split(" "))
dct = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
file_name = user_management_pipeline.create_data_source_name(source_name=source, username=username)
response = supabase_client.storage.from_("ConversAI").upload(file=dct, path=f"{file_name}_data.json")
response = (
supabase_client.table("ConversAI_ChatbotDataSources")
.insert({"username": username,
"chatbotName": chatbot_name,
"dataSourceName": file_name,
"numTokens": numTokens,
"sourceEndpoint": "/text_pdf_extraction",
"sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"],
f"{file_name}_data.json")})
.execute()
)
response = create_success_response(200, {"source": source, "message": "Successfully extracted the text."})
logger.info(f">>>Text extracted successfully for {source}.<<<")
return response
else:
            response = create_error_response(402,
                                             "Exceeded the limit; please try with a smaller PDF or subscribe to our premium plan.")
return response
except Exception as e:
logger.error(f">>>Error in text_pdf_extraction: {e} for {vectorstore}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/youtube_transcript", dependencies=[Depends(access_check_bearer)])
async def youtube_transcript(request: YoutubeTranscriptRequest):
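    """Fetch transcripts for the given YouTube URLs, enforce the token limit,
    and persist them as a JSON data source."""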
vectorstore, urls = request.vectorstore, request.urls
logger.info(f">>>youtube_transcript API Triggered By {urls}<<<")
try:
track_usage(supabase_client=supabase_client, vectorstore=vectorstore, endpoint="/youtube_transcript")
        username, chatbot_name = vectorstore.split("$")[1:3]
response = conversai_pipeline.youtube_transcript_(url=urls)
lim = token_limit_check(supabase_client=supabase_client, username=username, chatbot_name=chatbot_name,
text=response)
if lim:
dct = {
"output": response,
"source": "www.youtube.com"
}
            num_tokens = len(" ".join(response.values()).translate(
                str.maketrans('', '', string.punctuation)).split(" "))
dct = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
file_name = user_management_pipeline.create_data_source_name(source_name="youtube", username=username)
response = supabase_client.storage.from_("ConversAI").upload(file=dct, path=f"{file_name}_data.json")
response = (
supabase_client.table("ConversAI_ChatbotDataSources")
.insert({"username": username,
"chatbotName": chatbot_name,
"dataSourceName": file_name,
"numTokens": num_tokens,
"sourceEndpoint": "/youtube_transcript",
"sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"],
f"{file_name}_data.json")})
.execute()
)
response = create_success_response(200, {"message": "Successfully fetched the youtube transcript."})
logger.info(f">>>Youtube transcript fetched successfully for {urls}.<<<")
return response
else:
            response = create_error_response(402,
                                             "Exceeded the limit; please try with smaller chunks of information or subscribe to our premium plan.")
return response
except Exception as e:
logger.error(f">>>Error in youtube_transcript: {e} for {urls}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/website_url_text_extraction", dependencies=[Depends(access_check_bearer)])
async def add_website(request: AddWebsiteRequest):
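    """Extract text from a list of website URLs, enforce the token limit, and
    persist the result as a JSON data source."""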
vectorstore, website_urls, source = request.vectorstore, request.website_urls, request.source
logger.info(f">>>website_url_text_extraction API Triggered By {request.website_urls}<<<")
try:
track_usage(supabase_client=supabase_client, vectorstore=vectorstore, endpoint="/fetch_text/urls")
        username, chatbot_name = vectorstore.split("$")[1:3]
text = conversai_pipeline.website_url_text_extraction_list_(urls=website_urls)
lim = token_limit_check(supabase_client=supabase_client, username=username, chatbot_name=chatbot_name,
text=text)
if not lim:
            response = create_error_response(402,
                                             "Exceeded the limit; please try with smaller chunks of information or subscribe to our premium plan.")
return response
else:
dct = {
"output": text,
"source": source
}
            num_tokens = len(" ".join(text.values()).translate(
                str.maketrans('', '', string.punctuation)).split(" "))
dct = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
file_name = user_management_pipeline.create_data_source_name(source_name=urlparse(source).netloc,
username=username)
supabase_client.storage.from_("ConversAI").upload(file=dct, path=f"{file_name}_data.json")
(
supabase_client.table("ConversAI_ChatbotDataSources")
.insert({"username": username,
"chatbotName": chatbot_name,
"dataSourceName": file_name,
"numTokens": num_tokens,
"sourceEndpoint": "/fetch_text/urls",
"sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"],
f"{file_name}_data.json")})
.execute()
)
response = create_success_response(200, {"message": "Successfully fetched the website text."})
logger.info(f">>>Website text extracted successfully for {request.website_urls}.<<<")
return response
except Exception as e:
logger.error(f">>>Error in website_url_text_extraction: {e} for {request.website_urls}.<<<")
        raise_http_exception(500, "Internal Server Error")
@conversai_api_router.get("/get_current_count", dependencies=[Depends(access_check_bearer)])
async def get_count(vectorstore: str):
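    """Return the user's current usage count from the user-management
    pipeline."""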
logger.info(f">>>get_current_count API Triggered By {vectorstore}<<<")
try:
        username, chatbot_name = vectorstore.split("$")[1:3]
current_count = user_management_pipeline.get_current_count_(username)
response = create_success_response(200, {"current_count": current_count})
logger.info(f">>>Current count fetched successfully for {vectorstore}.<<<")
return response
except Exception as e:
logger.error(f">>>Error in get_current_count: {e} for {vectorstore}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/list_chatbots", dependencies=[Depends(access_check_bearer)])
async def list_chatbots(request: ListChatbotsRequest):
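    """List the chatbots registered for the given username."""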
logger.info(f">>>list_chatbots API Triggered By {request.username}<<<")
try:
chatbots = user_management.list_tables(username=request.username)
response = create_success_response(200, {"chatbots": chatbots})
logger.info(f">>>Chatbots listed successfully for {request.username}.<<<")
return response
except Exception as e:
logger.error(f">>>Error in list_chatbots: {e} for {request.username}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/get_chat_history", dependencies=[Depends(access_check_bearer)])
async def chat_history(request: GetChatHistoryRequest):
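    """Return timestamp/question/response rows from ConversAI_ChatHistory for
    the given chatbot."""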
logger.info(f">>>get_chat_history API Triggered By {request.vectorstore}<<<")
try:
        _, username, chatbot_name = request.vectorstore.split("$", 2)
        history = supabase_client.table("ConversAI_ChatHistory").select(
            "timestamp", "question", "response"
        ).eq("username", username).eq("chatbotName", chatbot_name).execute().data
response = create_success_response(200, {"history": history})
logger.info(f">>>Chat history fetched successfully for {request.vectorstore}.<<<")
return response
    except (ValueError, IndexError):
logger.warning(f"Chat history not found for {request.vectorstore}")
return create_error_response(404, "Chat history not found for the given chatbot.")
except Exception as e:
logger.error(f">>>Error in get_chat_history: {e} for {request.vectorstore}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/delete_chatbot", dependencies=[Depends(access_check_bearer)])
async def delete_chatbot(request: DeleteChatbotRequest):
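    """Delete a chatbot: remove its info and data-source rows, mark its
    activity log and chat history inactive, delete its stored source files,
    and drop its table and Qdrant collection."""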
logger.info(f">>>delete_chatbot API Triggered By {request.vectorstore}<<<")
try:
        username, chatbot_name = request.vectorstore.split("$")[1:3]
        supabase_client.table('ConversAI_ChatbotInfo').delete().eq('user_id', username).eq(
            'chatbotname', chatbot_name).execute()
all_sources = supabase_client.table("ConversAI_ChatbotDataSources").select("*").eq("username", username).eq(
"chatbotName", chatbot_name).execute().data
all_sources = [x["sourceContentURL"].split("/")[-1] for x in all_sources]
supabase_client.table("ConversAI_ChatbotDataSources").delete().eq("username", username).eq("chatbotName",
chatbot_name).execute()
supabase_client.table("ConversAI_ActivityLog").update({"isActive": False}).eq("username", username).eq(
"chatbotName",
chatbot_name).execute()
supabase_client.table("ConversAI_ChatHistory").update({"isActive": False}).eq("username", username).eq(
"chatbotName",
chatbot_name).execute()
        for source in all_sources:
            supabase_client.storage.from_("ConversAI").remove([source])
user_management.delete_table(table_name=chatbot_name)
user_management.delete_qdrant_cluster(vectorstorename=request.vectorstore)
response = create_success_response(200, {"message": "Chatbot deleted successfully"})
logger.info(f">>>Chatbot deleted successfully for {request.vectorstore}.<<<")
return response
except Exception as e:
logger.error(f">>>Error in delete_chatbot: {e} for {request.vectorstore}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/add_qa_pair", dependencies=[Depends(access_check_bearer)])
async def add_qa_pair(request: AddQAPairRequest):
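    """Add a question/answer pair as a text data source, formatted as a
    QUESTION/ANSWER block and subject to the token limit."""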
logger.info(f">>>add_qa_pair API Triggered By {request.vectorstore}<<<")
try:
vectorstore, question, answer = request.vectorstore, request.question, request.answer
track_usage(vectorstore=vectorstore, endpoint="/add_qa_pair", supabase_client=supabase_client)
        username, chat_bot_name = request.vectorstore.split("$")[1:3]
normal_text = f"\nQUESTION: {question}\nANSWER: {answer}\n"
lim = token_limit_check(supabase_client=supabase_client, username=username, chatbot_name=chat_bot_name,
text=normal_text)
if lim:
dct = {
"output": {"text": normal_text},
"source": "QA Pair",
}
num_token = len(normal_text.translate(str.maketrans('', '', string.punctuation)).split(" "))
dct = json.dumps(dct, indent=1).encode("utf-8", errors="replace")
file_name = user_management_pipeline.create_data_source_name(source_name="qa_pair", username=username)
supabase_client.storage.from_("ConversAI").upload(file=dct, path=f"{file_name}_data.json")
(
supabase_client.table("ConversAI_ChatbotDataSources")
.insert({"username": username,
"chatbotName": chat_bot_name,
"dataSourceName": file_name,
"numTokens": num_token,
"sourceEndpoint": "/add_qa_pair",
"sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"],
f"{file_name}_data.json")})
.execute()
)
response = create_success_response(200, {"message": "Successfully added the qa pair."})
logger.info(f">>>QA Pair added successfully for {request.vectorstore}.<<<")
return response
else:
            response = create_error_response(400,
                                             "Exceeded the limit; please try with smaller chunks of information or subscribe to our premium plan.")
return response
except Exception as e:
logger.error(f">>>Error in add_qa_pair: {e} for {request.vectorstore}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/load_edited_json", dependencies=[Depends(access_check_bearer)])
async def load_edited_json(request: LoadEditedJson):
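    """Re-upload a user-edited (base64-encoded) JSON data source and register
    it in ConversAI_ChatbotDataSources."""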
    vectorstore, data_source_name, source_endpoint = (request.vectorstore, request.data_source_name,
                                                      request.source_endpoint)
    username, chatbot_name = request.vectorstore.split("$")[1:3]
logger.info(f">>>loadEditedJson API Triggered By {request.vectorstore}<<<")
try:
track_usage(supabase_client=supabase_client, vectorstore=request.vectorstore,
endpoint="/load_edited_json")
json_data = decode_base64(request.json_data)
json_data = json.dumps(json_data, indent=1).encode("utf-8", errors="replace")
file_name = user_management_pipeline.create_data_source_name(source_name=data_source_name,
username=username)
response = supabase_client.storage.from_("ConversAI").upload(file=json_data, path=f"{file_name}_data.json")
response = (
supabase_client.table("ConversAI_ChatbotDataSources")
.insert({"username": username,
"chatbotName": chatbot_name,
"dataSourceName": file_name,
"sourceEndpoint": source_endpoint,
"sourceContentURL": os.path.join(os.environ["SUPABASE_PUBLIC_BASE_URL"],
f"{file_name}_data.json")})
.execute()
)
response = create_success_response(200, {"output": "Successfully loaded the edited json."})
logger.info(f">>>Edited json loaded successfully for {vectorstore}.<<<")
return response
except Exception as e:
logger.error(f">>>Error in loadEditedJson: {e} for {vectorstore}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.get("/list_chatbot_sources", dependencies=[Depends(access_check_bearer)])
async def list_chatbot_sources(vectorstore: str):
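    """List all data-source rows registered for the given chatbot."""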
try:
logger.info(f">>>list_chatbot_sources API Triggered By {vectorstore}<<<")
track_usage(supabase_client=supabase_client, vectorstore=vectorstore, endpoint="/list_chatbot_sources")
        username, chatbot_name = vectorstore.split("$")[1:3]
        result = supabase_client.table("ConversAI_ChatbotDataSources").select("*").eq("username", username).eq(
            "chatbotName", chatbot_name).execute().data
response = create_success_response(200, {"output": result})
logger.info(f">>>Chatbot listed successfully for {vectorstore}.<<<")
return response
except Exception as e:
logger.error(f">>>Error in list_chatbot_sources: {e} for {vectorstore}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.get("/get_data_source", dependencies=[Depends(access_check_bearer)])
async def get_data_source(vectorstore: str, source_url: str):
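    """Download a stored data-source JSON from its public URL and return it
    base64-encoded."""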
try:
logger.info(f">>>get_data_source API Triggered By {vectorstore}<<<")
track_usage(supabase_client=supabase_client, vectorstore=vectorstore, endpoint="/get_data_source")
        r = requests.get(source_url)
        res = encode_to_base64(json.loads(r.content.decode("utf-8", errors="replace")))
response = create_success_response(200, {"output": res})
return response
except Exception as e:
logger.error(f">>>Error in get_data_source: {e} for {vectorstore}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/delete_chatbot_source", dependencies=[Depends(access_check_bearer)])
async def delete_chatbot_source(request: DeleteChatbotSourceRequest):
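    """Delete a single data source: its registry row and its stored JSON
    file."""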
vectorstore, data_source_name = request.vectorstore, request.data_source_name
try:
track_usage(supabase_client=supabase_client, vectorstore=vectorstore, endpoint="/delete_chatbot_source")
response = supabase_client.table("ConversAI_ChatbotDataSources").delete().eq("dataSourceName",
data_source_name).execute()
response = supabase_client.storage.from_('ConversAI').remove(f"{data_source_name}_data.json")
response = create_success_response(200, {"output": f"Successfully deleted the {data_source_name} data source."})
logger.info(f">>>Data source deleted successfully for {vectorstore}.<<<")
return response
except Exception as e:
logger.error(f">>>Error in delete_chatbot_source: {e} for {vectorstore}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/train_chatbot", dependencies=[Depends(access_check_bearer)])
async def train_chatbot(request: TrainChatbotRequest):
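    """(Re)train a chatbot: download each registered source JSON, flatten it
    to plain text according to the endpoint that produced it, and add the
    (text, source) pairs to the vectorstore."""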
vectorstore, url_sources = request.vectorstore, request.urls
logger.info(f">>>train_chatbot API Triggered By {vectorstore}<<<")
try:
track_usage(supabase_client=supabase_client, vectorstore=vectorstore, endpoint="/train_chatbot")
        texts = []
        sources = []
        file_types = [
            supabase_client.table("ConversAI_ChatbotDataSources").select("sourceEndpoint").eq(
                "sourceContentURL", x).execute().data[0]["sourceEndpoint"] for x in url_sources]
        for source, file_type in zip(url_sources, file_types):
            if file_type in ("/text_pdf_extraction", "/image_pdf_text_extraction",
                             "/website_url_text_extraction", "/youtube_transcript"):
                r = requests.get(source)
                file = json.loads(r.content.decode("utf-8", errors="replace"))
                content = file["output"]
                texts.append(".".join(content.values()).replace("\n", " "))
                sources.append(file["source"])
            elif file_type in ("/add_text", "/add_qa_pair"):
                r = requests.get(source)
                file = json.loads(r.content.decode("utf-8", errors="replace"))
                texts.append(file["output"]["text"].replace("\n", " "))
                sources.append(file["source"])
        texts = list(zip(texts, sources))
conversai_pipeline.add_document_(texts, vectorstore)
response = create_success_response(200, {"message": "Chatbot trained successfully."})
logger.info(f">>>Chatbot trained successfully for {vectorstore}.<<<")
return response
except Exception as e:
logger.error(f">>>Error in train_chatbot: {e} for {vectorstore}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.get("/activity_log", dependencies=[Depends(access_check_bearer)])
async def activity_log(username: str):
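    """Return the raw ConversAI_ActivityLog rows for a username. Note that,
    unlike most endpoints here, this returns the rows directly rather than a
    wrapped success response."""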
logger.info(f">>>activityLog API Triggered By {username}<<<")
try:
response = supabase_client.table("ConversAI_ActivityLog").select("*").eq("username", username).execute().data
logger.info(f">>>Activity log fetched successfully for {username}.<<<")
return response
except Exception as e:
logger.error(f">>>Error in activityLog: {e} for {username}.<<<")
raise_http_exception(500, "Internal Server Error")
@conversai_api_router.post("/new_chatbot", dependencies=[Depends(access_check_bearer)])
async def new_chatbot(request: NewChatbotRequest):
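    """Create a new chatbot for the given user via the user-management
    pipeline."""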
logger.info(f">>> new_chatbot API Triggered <<<")
try:
response = user_management.new_chatbot_(chatbot_name=request.chatbot_name, username=request.username)
logger.info(f">>> Chatbot created successfully for {request.username}.<<<")
return response
except Exception as e:
logger.error(f">>>Error in new_chatbot: {e} for {request.username}.<<<")
raise_http_exception(500, "Internal Server Error")