import threading
import uvicorn
import asyncio
from pathlib import Path
from fastapi import FastAPI, File, UploadFile, HTTPException, Form, status
from pydantic import BaseModel, HttpUrl
from chat_database import (
    create_chat_entry,
    get_all_chat_details,
    rename_chat_title,
    save_context_detail,
    clear_context_detail,
    delete_chat,
    save_system_prompt,
)
from fastapi.responses import JSONResponse
from pdfminer.high_level import extract_text
from io import BytesIO
import httpx
import pandas as pd
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from embeddings import get_and_store_embeddings
from qdrent import delete_embeddings
import re
from readability import Document as ReadabilityDocument
from providers.ppt_and_docx_helper import extract_text_from_docx, extract_text_from_pptx

ALLOWED_EXT = {
    ".pdf", ".csv", ".txt", ".ppt", ".pptx",
    ".doc", ".docx", ".xls", ".xlsx",
}

app = FastAPI()


class Document(BaseModel):
    session_id: str
    data: str
    filename: str


class RenameChatRequest(BaseModel):
    sessionId: str
    title: str


class SavePromptRequest(BaseModel):
    sessionId: str
    prompt: str


class LinkInput(BaseModel):
    link: HttpUrl
    sessionId: str
    title: str
    summary: str
    categories: str


class TextInput(BaseModel):
    text: str
    sessionId: str
    title: str
    name: str
    summary: str
    categories: str


class ClearContextInput(BaseModel):
    sessionId: str


@app.get('/get-chats')
async def get_chat_names():
    return get_all_chat_details()


@app.post('/create-chat/{sessionId}')
async def create_chat(sessionId: str):
    return create_chat_entry(sessionId)


@app.post('/save-prompt')
async def save_prompt(req: SavePromptRequest):
    return save_system_prompt(req.sessionId, req.prompt)


@app.post('/rename-chat')
async def rename_chat(req: RenameChatRequest):
    return rename_chat_title(req.sessionId, req.title)


def _process_documents(contents: bytes, session_id: str, name: str,
                       title: str, summary: str, categories: str) -> str:
    """Extract text from an uploaded file and store its embeddings.

    Runs inside a worker thread (see the executor call in `upload_pdf`), so
    the blocking extraction does not stall the event loop. Because the worker
    thread has no running loop, `asyncio.run` is used to drive the async
    persistence helpers.
    """
    ext = Path(name).suffix.lower()

    # 1) extract text (blocking)
    if ext == ".pdf":
        text = extract_text(BytesIO(contents))
    elif ext in {".doc", ".docx"}:
        text = extract_text_from_docx(contents)
    elif ext in {".ppt", ".pptx"}:
        text = extract_text_from_pptx(contents)
    elif ext in {".xls", ".xlsx"}:
        xls = pd.read_excel(BytesIO(contents), sheet_name=None)
        parts = []
        for sheet, df in xls.items():
            parts.append(f"--- Sheet: {sheet} ---")
            parts.append(df.to_csv(index=False))
        text = "\n".join(parts)
    elif ext in {".csv", ".txt"}:
        text = contents.decode("utf-8", errors="ignore")
    else:
        raise ValueError(f"Unsupported extension {ext!r}")

    # 2) persist context metadata and embeddings
    asyncio.run(save_context_detail(
        session_id, name, title, summary, categories))
    asyncio.run(get_and_store_embeddings(
        text, session_id, name, title, summary, categories))
    return text
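
# A minimal sketch of calling the extractor directly, outside the HTTP layer
# (file name and metadata values are hypothetical; `save_context_detail` and
# `get_and_store_embeddings` must be configured for the two asyncio.run calls
# to succeed):
#
#     text = _process_documents(
#         contents=Path("notes.txt").read_bytes(),
#         session_id="demo-session",
#         name="notes.txt",
#         title="Notes",
#         summary="Scratch notes",
#         categories="misc",
#     )
#     print(text[:200])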


@app.post("/upload-pdf")
async def upload_pdf(
    pdf_file: UploadFile = File(...),
    name: str = Form(...),
    sessionId: str = Form(...),
    title: str = Form(...),
    summary: str = Form(...),
    categories: str = Form(...),
):
    try:
        ext = Path(name).suffix.lower()
        if ext not in ALLOWED_EXT:
            raise HTTPException(
                status_code=400,
                detail=(
                    f"Invalid file type {ext!r}. "
                    "Allowed: PDF, CSV, TXT, PPT(X), DOC(X), XLS(X)."
                ),
            )

        contents = await pdf_file.read()

        # Offload the blocking extraction to the default ThreadPoolExecutor
        # so the event loop stays responsive.
        loop = asyncio.get_running_loop()
        text_content = await loop.run_in_executor(
            None,
            _process_documents,
            contents, sessionId, name, title, summary, categories,
        )

        return JSONResponse(
            status_code=200,
            content={"status": "received", "text": text_content},
        )
    except HTTPException:
        # Let validation errors keep their status code instead of becoming 500s.
        raise
    except Exception as e:
        print("Error in embedding document:", e)
        return JSONResponse(
            status_code=500,
            content={"status": "failed", "detail": str(e)},
        )


async def fetch_url_content(link: str) -> httpx.Response:
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            response = await client.get(link)
            response.raise_for_status()  # raise for 4xx/5xx responses
            return response
    except httpx.RequestError as exc:
        raise HTTPException(
            status_code=400,
            detail=f"Error fetching the URL: {exc}") from exc
    except httpx.HTTPStatusError as exc:
        raise HTTPException(
            status_code=exc.response.status_code,
            detail=(f"Error response {exc.response.status_code} "
                    f"while requesting {exc.request.url}"),
        ) from exc


def get_content_type(response: httpx.Response) -> str:
    """Return the response's media type, stripped of parameters like charset."""
    content_type = response.headers.get('Content-Type', '').lower()
    if ';' in content_type:
        content_type = content_type.split(';')[0].strip()
    return content_type


def extract_text_from_pdf(pdf_content: bytes) -> str:
    try:
        return extract_text(BytesIO(pdf_content))
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Error extracting text from PDF: {e}") from e


def extract_text_from_html(html_content: str) -> str:
    soup = BeautifulSoup(html_content, 'html.parser')
    for script_or_style in soup(['script', 'style']):
        script_or_style.decompose()
    text = soup.get_text(separator='\n')
    lines = [line.strip() for line in text.splitlines()]
    return '\n'.join(line for line in lines if line)


def is_supported_domain(url: str) -> bool:
    parsed_url = urlparse(url)
    unsupported_domains = ['drive.google.com', 'docs.google.com']
    return parsed_url.netloc not in unsupported_domains
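
# Behavior sketch for the two helpers above (illustrative values only):
#
#     >>> resp = httpx.Response(200, headers={"Content-Type": "text/html; charset=utf-8"})
#     >>> get_content_type(resp)
#     'text/html'
#     >>> is_supported_domain("https://docs.google.com/document/d/abc")
#     False
#     >>> is_supported_domain("https://example.com/paper.pdf")
#     True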


@app.post("/process-link")
async def process_link(input_data: LinkInput):
    try:
        link = str(input_data.link)
        session_id = input_data.sessionId
        title = input_data.title
        summary = input_data.summary
        categories = input_data.categories

        # Reject file-hosting services that need authentication or viewers.
        blocked_domains = ("drive.google.com", "docs.google.com", "dropbox.com")
        if any(blocked in link for blocked in blocked_domains) \
                or not is_supported_domain(link):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=("Links from Google Drive or similar services are not "
                        "supported. Please provide a direct link to a PDF or "
                        "a public web page."),
            )

        response = await fetch_url_content(link)
        content_type = get_content_type(response)

        text_content = None
        extracted_from = None

        if content_type.startswith('application/pdf'):
            text_content = extract_text_from_pdf(response.content)
            extracted_from = 'pdf'
        elif content_type.startswith(
                'application/vnd.openxmlformats-officedocument.wordprocessingml.document'):
            text_content = extract_text_from_docx(response.content)
            extracted_from = 'docx'
        elif content_type.startswith(
                'application/vnd.openxmlformats-officedocument.presentationml.presentation'):
            text_content = extract_text_from_pptx(response.content)
            extracted_from = 'pptx'
        elif content_type.startswith('text/html'):
            html = response.text

            async def try_fetch_readme_raw(urls):
                """Return the first non-empty README fetched from `urls`, else None."""
                for raw_url in urls:
                    try:
                        raw_resp = await fetch_url_content(raw_url)
                        if raw_resp.status_code == 200 and raw_resp.text.strip():
                            return raw_resp.text
                    except Exception:
                        continue
                return None

            # GitHub repository root: prefer the raw README over the HTML page.
            github_repo_match = re.match(
                r"https://github\.com/([^/]+)/([^/]+)/?$", link)
            if github_repo_match:
                user, repo = github_repo_match.group(1), github_repo_match.group(2)
                raw_urls = [
                    f"https://raw.githubusercontent.com/{user}/{repo}/main/README.md",
                    f"https://raw.githubusercontent.com/{user}/{repo}/master/README.md",
                ]
                text_content = await try_fetch_readme_raw(raw_urls)
                if text_content:
                    extracted_from = 'github_readme'

            # GitLab repository root, same idea.
            if text_content is None:
                gitlab_repo_match = re.match(
                    r"https://gitlab\.com/([^/]+)/([^/]+)/?$", link)
                if gitlab_repo_match:
                    user, repo = gitlab_repo_match.group(1), gitlab_repo_match.group(2)
                    raw_urls = [
                        f"https://gitlab.com/{user}/{repo}/-/raw/main/README.md",
                        f"https://gitlab.com/{user}/{repo}/-/raw/master/README.md",
                    ]
                    text_content = await try_fetch_readme_raw(raw_urls)
                    if text_content:
                        extracted_from = 'gitlab_readme'

            # Hugging Face pages expose a raw README as well.
            if text_content is None and "huggingface.co/" in link:
                raw_readme_url = link.rstrip("/") + "/raw/main/README.md"
                try:
                    raw_resp = await fetch_url_content(raw_readme_url)
                    if raw_resp.status_code == 200 and raw_resp.text.strip():
                        text_content = raw_resp.text
                        extracted_from = 'huggingface_readme'
                except Exception:
                    pass

            # Fall back to readability extraction of the page itself.
            if text_content is None:
                try:
                    doc = ReadabilityDocument(html)
                    summary_html = doc.summary()
                except Exception as e:
                    raise HTTPException(
                        status_code=status.HTTP_400_BAD_REQUEST,
                        detail=f"Error extracting article content: {e}") from e
                soup = BeautifulSoup(summary_html, "html.parser")
                text_content = "\n".join(soup.stripped_strings)
                extracted_from = 'html'
        else:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Unsupported content type: {content_type}",
            )

        await save_context_detail(session_id, link, title, summary, categories)
        await get_and_store_embeddings(
            text_content, session_id, link, title, summary, categories)

        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content={
                "status": "success",
                "content_type": extracted_from,
                "text": text_content,
            },
        )
    except HTTPException as http_exc:
        raise http_exc
    except Exception as e:
        print("Error in uploading link:", e)
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"success": False, "detail": str(e)},
        )


@app.post("/process-text")
async def process_text(input_data: TextInput):
    try:
        text = str(input_data.text)
        session_id = input_data.sessionId
        name = input_data.name
        title = input_data.title
        summary = input_data.summary
        categories = input_data.categories

        await save_context_detail(session_id, name, title, summary, categories)
        await get_and_store_embeddings(
            text, session_id, name, title, summary, categories)

        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content={"status": "success", "text": text},
        )
    except HTTPException as http_exc:
        raise http_exc
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"success": False, "detail": str(e)},
        )


@app.post("/clear-context")
async def clear_context(body: ClearContextInput):
    sessionId = body.sessionId
    deleted = delete_embeddings(sessionId)
    if deleted:
        clear_context_detail(sessionId)
        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content={
                "status": "success",
                "message": "All embeddings have been deleted.",
            },
        )
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={"status": "failed", "message": "Failed to delete embeddings."},
    )


@app.post("/delete-chat")
async def clear_chat(body: ClearContextInput):
    sessionId = body.sessionId
    deleted = delete_embeddings(sessionId)
    if deleted:
        delete_chat(sessionId)
        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content={
                "status": "success",
                "message": "The chat and all its embeddings have been deleted.",
            },
        )
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={"status": "failed", "message": "Failed to delete embeddings."},
    )


def run_fastapi():
    uvicorn.run(app, host="0.0.0.0", port=8082, log_level="info")


# Serve the API from a background daemon thread so the importing process
# keeps control of the main thread.
threading.Thread(target=run_fastapi, daemon=True).start()