Spaces:
Running
Running
# app.py | |
import eventlet | |
eventlet.monkey_patch() | |
from flask import Flask, render_template, request, redirect, url_for, flash, session, send_from_directory | |
from flask_socketio import SocketIO | |
import traceback | |
import os | |
from werkzeug.utils import secure_filename | |
import json | |
import logging | |
import agent # your agent.py module | |
from agent import refresh_memory | |
from agent import run_stream | |
from typing import List, Dict | |
import markdown2 | |
import re | |
import time | |
# =============================================================================
# =================== Initialization: variables & folders =====================
# =============================================================================
BASE_DIR = os.path.abspath(os.path.dirname(__file__))

# Uploaded files live here; upload() sorts them into per-type subfolders.
UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads")
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Per-user chat transcripts (user_chat_no_<user_id>.json) live here.
CHAT_FOLDER = os.path.join(BASE_DIR, "chats")
os.makedirs(CHAT_FOLDER, exist_ok=True)
try:
    # NOTE(review): 0o777 makes the folder world-writable. Presumably this is
    # a workaround for container file permissions -- confirm it is needed.
    os.chmod(CHAT_FOLDER, 0o777)
except OSError as exc:
    # Best effort only: keep starting up, but do not hide the failure.
    logging.getLogger(__name__).warning(
        "Could not chmod %s: %s", CHAT_FOLDER, exc
    )

app = Flask(__name__, template_folder="templates")
# Where processed uploads are stored.
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
# Where chat history and related files are stored.
app.config["CHAT_FOLDER"] = CHAT_FOLDER

# The secret key signs the session cookie that carries user_id. A well-known
# hard-coded fallback would let anyone forge sessions, so when FLASK_SECRET is
# not set fall back to a random per-process key and say so.
_secret_key = os.getenv("FLASK_SECRET")
if not _secret_key:
    logging.getLogger(__name__).warning(
        "FLASK_SECRET is not set; using a random secret key. Sessions will "
        "not survive a restart or be shared between processes."
    )
    _secret_key = os.urandom(32).hex()
app.secret_key = _secret_key

# Use eventlet for async Socket.IO.
# NOTE(review): cors_allowed_origins="*" accepts connections from any origin.
socketio = SocketIO(app, cors_allowed_origins="*", async_mode="eventlet")

# Allowed file extensions (lower-case, including the leading dot).
ALLOWED_EXTENSIONS = {
    ".db", ".sqlite",                  # SQLite databases
    ".pdf", ".txt", ".doc", ".docx",   # Documents
    ".png", ".jpg", ".jpeg", ".gif",   # Images
}

# NOTE(review): `config` is not referenced in this file's visible code;
# presumably other modules (e.g. agent) rely on it -- confirm before removing.
import config  # noqa: E402

# Most recent upload of each kind: set by upload(), read by run_agent_thread().
# These are process-wide globals, so they are shared by every user.
DB_PATH = None   # last uploaded .db / .sqlite file
DOC_PATH = None  # last uploaded document
IMG_PATH = None  # last uploaded image
OTH_PATH = None  # last uploaded "other" file
def allowed_file(filename: str) -> bool:
    """Tell whether *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The comparison is case-insensitive; a name without an extension yields
    an empty suffix and is therefore rejected.
    """
    suffix = os.path.splitext(filename)[1].lower()
    return suffix in ALLOWED_EXTENSIONS
def ensure_user_session():
    """Give the current browser a session if it does not have one yet.

    A new session gets a random 32-hex-character ``user_id`` and empty
    ``uploads`` and ``chat_history`` lists, after which ``refresh_memory()``
    is called (presumably resetting the agent's memory -- confirm in agent.py).
    Sessions that already carry a ``user_id`` are left untouched.
    """
    if "user_id" in session:
        return
    fresh_id = os.urandom(16).hex()
    session["user_id"] = fresh_id
    session["uploads"] = []
    session["chat_history"] = []
    refresh_memory()
# Text cleaning helper for markdown2 output.
def clean_html_chunk(text):
    """Tidy one fragment of markdown2 HTML before it is sent to clients.

    Steps:
      * strip surrounding whitespace;
      * unwrap a single enclosing ``<p>...</p>`` pair -- only when the
        fragment is exactly one paragraph, so multi-paragraph HTML keeps
        balanced tags;
      * strip surrounding backticks and a leading ``json`` language tag left
        over from a Markdown code fence, then trim whitespace again.

    The ``json`` tag is removed as a whole word. The previous
    ``str.strip('json')`` removed any of the characters j/s/o/n from both
    ends, turning e.g. "no" into "" and "lesson" into "le".

    Args:
        text: HTML string produced by ``markdown2.markdown``.

    Returns:
        The cleaned string (possibly empty).
    """
    text = text.strip()
    match = re.match(r"^<p>(.*)</p>$", text, re.DOTALL)
    if match:
        inner = match.group(1)
        # With several paragraphs the "outer" tags belong to different <p>
        # elements; removing them would leave "a</p>\n<p>b".
        if "<p>" not in inner and "</p>" not in inner:
            text = inner.strip()
    text = text.strip("`").strip()
    # \b keeps words that merely start with "json" (e.g. "jsonify").
    text = re.sub(r"^json\b", "", text)
    return text.strip()
# =============================================================================
# ============================= AGENT Definition ==============================
# =============================================================================
# markdown2 extras used for streamed chunks and for the fallback answer.
_MARKDOWN_EXTRAS = [
    "fenced-code-blocks",
    "tables",
    "strike",
    "task_list",
    "break-on-newline",
]


def _load_saved_history(path):
    """Return the list of chat turns stored in the JSON file *path*, or None.

    None means "no usable file": it does not exist, cannot be read, is not
    valid JSON, or does not contain a list.
    """
    try:
        with open(path, "r", encoding="utf-8") as fh:
            saved = json.load(fh)
    except FileNotFoundError:
        return None
    except (OSError, ValueError):  # ValueError covers JSON/Unicode decode errors
        logging.getLogger(__name__).exception(
            "Could not read chat history from %s", path
        )
        return None
    return saved if isinstance(saved, list) else None


def run_agent_thread(
    prompt: str,
    user_id: str,
    chat_history: List[Dict],
    room: "str | None" = None,
):
    """Run the agent for *prompt* and stream its answer over Socket.IO.

    Every piece yielded by ``run_stream`` is rendered from Markdown to HTML,
    passed through ``clean_html_chunk`` and emitted as a ``final_stream``
    event. If streaming produces no text, the answer is generated once with
    ``agent.agent.executor.run`` instead. The final HTML is emitted as
    ``stream_complete`` and ``final`` events, appended to *chat_history*, and
    written to ``CHAT_FOLDER/user_chat_no_<user_id>.json``.

    The agent receives whichever of the module-level DB_PATH, DOC_PATH,
    IMG_PATH and OTH_PATH are set. They are process-wide, so they reflect
    the most recent upload by any user.

    Args:
        prompt: The user's message.
        user_id: Session user id; selects the chat-history file.
        chat_history: The caller's list of ``{"user", "assistant"}`` turns;
            the new turn is appended to it in place.
        room: Optional Socket.IO room/sid to emit to. ``None`` (the default)
            keeps the original behaviour of broadcasting to every client.

    Errors are reported to clients as an ``error`` event and logged with
    their traceback.
    """
    log = logging.getLogger(__name__)
    emit_kwargs = {} if room is None else {"to": room}

    def emit(event, message):
        socketio.emit(event, {"message": message}, **emit_kwargs)

    # Only the paths that have actually been set are passed to the agent.
    data_paths = [p for p in (DB_PATH, DOC_PATH, IMG_PATH, OTH_PATH) if p]
    log.debug("Data paths: %s", data_paths)

    chunks = []
    try:
        for piece in run_stream(prompt, data_paths):
            # NOTE(review): each piece is rendered on its own, so Markdown
            # spanning several pieces (fences, tables, lists) may not render
            # as a single construct.
            html_chunk = clean_html_chunk(
                markdown2.markdown(piece, extras=_MARKDOWN_EXTRAS)
            )
            chunks.append(html_chunk)
            emit("final_stream", html_chunk)
            log.debug("Streaming chunk: %s", html_chunk)
    except Exception as e:
        emit("error", f"Streaming error: {e}")
        log.exception("Streaming error")
        return

    # Fallback / finalize
    try:
        final_html = "".join(chunks)
        if not final_html:
            # Nothing was streamed: generate the whole answer in one call.
            answer = agent.agent.executor.run(prompt)
            final_html = clean_html_chunk(
                markdown2.markdown(answer, extras=_MARKDOWN_EXTRAS)
            )
        log.debug("Final text: %s", final_html)
        emit("stream_complete", final_html)
        emit("final", final_html)

        turn = {"user": prompt, "assistant": final_html}
        chat_history.append(turn)

        output_path = os.path.join(
            app.config["CHAT_FOLDER"], f"user_chat_no_{user_id}.json"
        )
        # chat_history is the list taken from the request's session. Turns
        # appended here (after the response has gone out) are presumably never
        # written back to the session cookie. If earlier turns were already
        # saved to disk, build on the file so they are not overwritten.
        saved = _load_saved_history(output_path)
        history = chat_history if saved is None else saved + [turn]
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(history, f, ensure_ascii=False, indent=4)
    except Exception as e:
        emit("error", f"Final generation error: {e}")
        log.exception("Final generation error")
# =============================================================================
# ================================ Main Page ==================================
# =============================================================================
def index():
    """Render the chat page with the current user's saved history.

    A session is created via ensure_user_session() when none exists. If a
    saved transcript ``CHAT_FOLDER/user_chat_no_<user_id>.json`` exists, it is
    loaded into ``session["chat_history"]`` before ``index.html`` is rendered.
    An unreadable or corrupt transcript is logged and the page still renders.

    NOTE(review): the whole transcript is stored in the session; with Flask's
    default cookie-based sessions this can exceed browser cookie size limits.
    Confirm the session backend.
    NOTE(review): no route decorator is visible on this view in this copy of
    the file. Other views call url_for("index"), so confirm it is registered.
    """
    ensure_user_session()
    chat_file = os.path.join(
        app.config["CHAT_FOLDER"], f"user_chat_no_{session['user_id']}.json"
    )
    try:
        with open(chat_file, "r", encoding="utf-8") as f:
            session["chat_history"] = json.load(f)
    except FileNotFoundError:
        pass  # No saved transcript yet (e.g. a brand-new session).
    except (OSError, ValueError):
        # A corrupt or unreadable transcript must not take the page down.
        logging.getLogger(__name__).exception(
            "Could not load chat history from %s", chat_file
        )
    return render_template("index.html", chat_history=session.get("chat_history", []))
# =============================================================================
# ============================== Upload section ===============================
# =============================================================================
def upload():
    """Show the upload form (GET) or store an uploaded file (POST).

    On POST the file is saved as ``UPLOAD_FOLDER/<kind>/<secure filename>``.
    ``<kind>`` is ``databases``, ``documents``, ``images`` or ``others``,
    chosen by extension. The matching module-level path (DB_PATH, DOC_PATH,
    IMG_PATH or OTH_PATH) is then pointed at it, and run_agent_thread()
    passes that path to the agent.
      * Databases are also published as ``agent.GLOBAL_DB_PATH``.
      * Documents are passed to ``agent.rag_index_document``.
    The filename is added to ``session["uploads"]`` and the user is
    redirected to the index page.

    Validation failures (no file, unsupported type) flash an error and
    re-render ``upload.html``.

    NOTE(review): the *_PATH globals and upload folders are shared by all
    users, so one user's upload replaces another's same-named file and
    changes what every user's agent run sees. Confirm this is intended.
    NOTE(review): no route decorator is visible on this view in this copy of
    the file. Confirm how it is registered.
    """
    global DB_PATH, DOC_PATH, IMG_PATH, OTH_PATH  # re-point the agent's inputs
    ensure_user_session()

    if request.method != "POST":
        return render_template("upload.html")

    f = request.files.get("file")
    if not f or f.filename == "":
        flash("No file selected", "error")
        return render_template("upload.html")

    filename = secure_filename(f.filename)
    if not allowed_file(filename):
        flash("File type not supported", "error")
        return render_template("upload.html")

    ext = os.path.splitext(filename.lower())[1]
    if ext in (".db", ".sqlite"):
        subfolder = "databases"
    elif ext in (".pdf", ".txt", ".doc", ".docx"):
        subfolder = "documents"
    elif ext in (".png", ".jpg", ".jpeg", ".gif"):
        subfolder = "images"
    else:
        subfolder = "others"

    save_path = os.path.join(app.config["UPLOAD_FOLDER"], subfolder, filename)
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    f.save(save_path)
    logging.getLogger(__name__).info("Saved upload %s to %s", filename, save_path)

    # Reassign instead of .append(): Flask does not notice in-place changes to
    # a nested list, so an appended entry would never reach the session.
    session["uploads"] = session.get("uploads", []) + [filename]

    # Paths are published only now, after the file is actually on disk.
    if subfolder == "databases":
        DB_PATH = save_path
        agent.GLOBAL_DB_PATH = DB_PATH
        flash(f"Database uploaded and set: {filename}", "success")
    elif subfolder == "documents":
        DOC_PATH = save_path
        agent.rag_index_document(save_path)
        flash(f"Document indexed for RAG: {filename}", "success")
    elif subfolder == "images":
        IMG_PATH = save_path
        flash(f"File uploaded: {filename}", "success")
    else:
        OTH_PATH = save_path
        flash(f"File uploaded: {filename}", "success")
    return redirect(url_for("index"))
# =============================================================================
# ============================== Static Upload ================================
# =============================================================================
def _serve_upload(subfolder, filename):
    """Send *filename* from ``UPLOAD_FOLDER/<subfolder>``.

    Reads ``app.config["UPLOAD_FOLDER"]``, the same setting upload() saves
    into, so files are always served from where they were stored.

    NOTE(review): no ownership or session check is made here; any client
    that knows a filename can download it, including uploaded databases.
    """
    directory = os.path.join(app.config["UPLOAD_FOLDER"], subfolder)
    return send_from_directory(directory, filename)


def serve_database(filename):
    """Serve a database file from the uploads/databases folder."""
    return _serve_upload("databases", filename)


def serve_image(filename):
    """Serve an image file from the uploads/images folder."""
    return _serve_upload("images", filename)


def serve_document(filename):
    """Serve a document file from the uploads/documents folder."""
    return _serve_upload("documents", filename)


def serve_other(filename):
    """Serve any other uploaded file from the uploads/others folder."""
    return _serve_upload("others", filename)
# =============================================================================
# =============================== AGENT calling ===============================
# =============================================================================
def generate():
    """Start an agent run for the posted prompt and return immediately.

    Expects a JSON object body such as ``{"prompt": "..."}``. The answer is
    delivered asynchronously over Socket.IO by run_agent_thread(), which is
    started with ``socketio.start_background_task``.

    Returns:
        ``("No prompt provided", 400)`` if the body is missing, is not a JSON
        object, or has no non-blank string ``prompt``; otherwise
        ``("OK", 200)``.

    NOTE(review): no route decorator is visible on this view in this copy of
    the file. Confirm how it is registered.
    """
    # get_json(silent=True) yields None for a missing, malformed or non-JSON
    # body instead of raising, so every bad request ends in the 400 below.
    payload = request.get_json(silent=True)
    raw_prompt = payload.get("prompt", "") if isinstance(payload, dict) else ""
    prompt = raw_prompt.strip() if isinstance(raw_prompt, str) else ""
    ensure_user_session()
    if not prompt:
        return "No prompt provided", 400
    socketio.start_background_task(
        run_agent_thread,
        prompt,
        session["user_id"],
        session.get("chat_history", []),
    )
    return "OK", 200
# =============================================================================
# ============================= SESSION handling ==============================
# =============================================================================
def session_info():
    """Debug aid: report the current session's user id and upload list.

    ``user_id`` is None and ``uploads`` is an empty list when the session
    does not contain them. The returned dict is serialised to JSON by Flask.
    """
    current_user = session.get("user_id")
    uploaded = session.get("uploads", [])
    return {"user_id": current_user, "uploads": uploaded}
# =============================================================================
# ============================= SESSION clearing ==============================
# =============================================================================
def clear_chat():
    """Delete the user's saved transcript and start a brand-new session.

    Steps:
      1. Remove ``CHAT_FOLDER/user_chat_no_<user_id>.json`` if it exists.
      2. Wipe the session and give it a new ``user_id`` with empty
         ``uploads`` and ``chat_history``.
      3. Call ``refresh_memory()``.

    The module-level uploaded-file paths (DB_PATH, DOC_PATH, ...) are not
    reset.

    Returns:
        A ``{"message": ...}`` dict (serialised to JSON by Flask) and 200.
    """
    ensure_user_session()
    user_id = session.get("user_id")
    if user_id:
        chat_file = os.path.join(
            app.config["CHAT_FOLDER"], f"user_chat_no_{user_id}.json"
        )
        # Remove directly instead of exists()-then-remove(): a concurrent
        # clear could delete the file in between and raise an error.
        try:
            os.remove(chat_file)
        except FileNotFoundError:
            pass  # Nothing saved yet, or already removed.

    session.clear()
    session["user_id"] = os.urandom(16).hex()
    session["uploads"] = []
    session["chat_history"] = []
    refresh_memory()
    return {"message": "Chat history and session cleared!"}, 200
if __name__ == "__main__":
    # Flask's debug mode must never be exposed on a reachable server, so it
    # is opt-in via FLASK_DEBUG=1/true/yes instead of always on.
    debug = os.getenv("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes")
    socketio.run(app, debug=debug, port=5000)