DevForML's picture
Update app.py
d8559f0 verified
# app.py
import eventlet
eventlet.monkey_patch()
from flask import Flask, render_template, request, redirect, url_for, flash, session, send_from_directory
from flask_socketio import SocketIO
import traceback
import os
from werkzeug.utils import secure_filename
import json
import logging
import agent # your agent.py module
from agent import refresh_memory
from agent import run_stream
from typing import List, Dict
import markdown2
import re
import time
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── Inialized VAR & FS ───────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads")
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
CHAT_FOLDER = os.path.join(BASE_DIR, "chats")
os.makedirs(CHAT_FOLDER, exist_ok=True)
try:
os.chmod(CHAT_FOLDER, 0o777)
except Exception:
pass
app = Flask(__name__, template_folder="templates")
# For storing the processed files
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
# For storing the Chat history and other files
app.config["CHAT_FOLDER"] = CHAT_FOLDER
app.secret_key = os.getenv("FLASK_SECRET", "supersecret")
# Use eventlet for async SocketIO
socketio = SocketIO(app, cors_allowed_origins="*", async_mode="eventlet")
# Allowed file extensions
ALLOWED_EXTENSIONS = {
".db", ".sqlite", # SQLite databases
".pdf", ".txt", ".doc", ".docx", # Documents
".png", ".jpg", ".jpeg", ".gif" # Images
}
import config
DB_PATH = None # will be set when a .db is uploaded
DOC_PATH = None # will be set when a document is uploaded
IMG_PATH = None # will be set when an image is uploaded
OTH_PATH = None # will be set when an other file is uploaded
# import config
# IMG_PATH = "path/to/user_uploaded_file.jpg"
# DOC_PATH = "path/to/user_uploaded_file.pdf"
# DB_PATH = "path/to/user_uploaded_file.db"
# OTH_PATH = "path/to/user_uploaded_file.txt"
def allowed_file(filename: str) -> bool:
ext = os.path.splitext(filename.lower())[1]
return ext in ALLOWED_EXTENSIONS
def ensure_user_session():
if "user_id" not in session:
session["user_id"] = os.urandom(16).hex()
session["uploads"] = []
session["chat_history"] = []
refresh_memory()
#text cleaning function
def clean_html_chunk(text):
"""
Removes outer <p>...</p> tags and trims extra backticks or 'json' words.
Similar to your 'format:' cleaning logic.
"""
text = text.strip()
# Pattern to match single <p>...</p> wrapping
pattern = r'^<p>(.*?)</p>$'
match = re.match(pattern, text, re.DOTALL)
if match:
text = match.group(1).strip()
# Extra clean-up (optional, like your example)
text = text.strip('`').strip('json').strip()
return text
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── AGENT Defination ────────────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
def run_agent_thread(
prompt: str,
user_id: str,
chat_history: List[Dict], # ← new parameter
):
"""
Launches the agent in a background thread, streaming results back over SocketIO.
`data` is the uploaded file path (image, document, or DB) that will be injected
into config before the agent runs.
"""
global DB_PATH, DOC_PATH, IMG_PATH, OTH_PATH # force re-initializing agent
# Build the list of all paths, skipping None or empty
data_paths = [DB_PATH, DOC_PATH, IMG_PATH, OTH_PATH]
data_paths = [p for p in data_paths if p]
print(f"Data paths----------------->: {data_paths}")
text_accum = ""
try:
# Stream using run_stream (which will also pick up the same globals)
for piece in run_stream(prompt, data_paths):
html_chunk = markdown2.markdown(piece, extras=["fenced-code-blocks", "tables", "strike", "task_list", "break-on-newline"])
print(f"HTML chunk: {html_chunk}") # Debugging output
html_chunk = clean_html_chunk(html_chunk)
text_accum += html_chunk # accumulate HTML
socketio.emit("final_stream", {"message": html_chunk})
print(f"Streaming chunk: {html_chunk}") # Debugging output
except Exception as e:
socketio.emit("error", {"message": f"Streaming error: {e}"})
traceback.print_exc()
return
# Fallback / finalize
try:
if not text_accum:
text_accum = markdown2.markdown(agent.agent.executor.run(prompt), extras=["fenced-code-blocks", "tables", "strike", "task_list", "break-on-newline"])
print(f"Final HTML chunk: {text_accum}") # Debugging output
text_accum = clean_html_chunk(text_accum)
print(f"Final text: {text_accum}") # Debugging output
socketio.emit("stream_complete", {"message": text_accum})
socketio.emit("final", {"message": text_accum})
chat_history.append({"user": prompt, "assistant": text_accum})
output_path = os.path.join(
app.config["CHAT_FOLDER"],
f"user_chat_no_{user_id}.json"
)
with open(output_path, "w", encoding="utf-8") as f:
json.dump(chat_history, f, ensure_ascii=False, indent=4)
except Exception as e:
socketio.emit("error", {"message": f"Final generation error: {e}"})
traceback.print_exc()
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── Main Page ────────────────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@app.route("/")
def index():
if "user_id" not in session:
session["user_id"] = os.urandom(16).hex()
session["uploads"] = []
session["chat_history"] = []
refresh_memory()
else:
# Load previous chat history from JSON file if exists
user_id = session["user_id"]
chat_file = os.path.join(app.config.get("CHAT_FOLDER", "chat_history"), f"user_chat_no_{user_id}.json")
if os.path.exists(chat_file):
with open(chat_file, "r", encoding="utf-8") as f:
session["chat_history"] = json.load(f)
return render_template("index.html", chat_history=session.get("chat_history", []))
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── Upload section ──────────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@app.route("/upload", methods=["GET", "POST"])
def upload():
ensure_user_session()
global DB_PATH, DOC_PATH, IMG_PATH, OTH_PATH # force re-initializing agent
if request.method == "POST":
f = request.files.get("file")
if not f or f.filename == "":
flash("No file selected", "error")
return render_template("upload.html")
filename = secure_filename(f.filename)
if not allowed_file(filename):
flash("File type not supported", "error")
return render_template("upload.html")
ext = os.path.splitext(filename.lower())[1]
if ext in (".db", ".sqlite"):
save_path = os.path.join(app.config["UPLOAD_FOLDER"], "databases", filename)
os.makedirs(os.path.dirname(save_path), exist_ok=True)
global DB_PATH
DB_PATH = save_path
# DB_PATH = f"http://127.0.0.1:5000/uploads/databases/{filename}"
print(f"Database path: {save_path}")
f.save(save_path)
elif ext in (".pdf", ".txt", ".doc", ".docx"):
save_path = os.path.join(app.config["UPLOAD_FOLDER"], "documents", filename)
os.makedirs(os.path.dirname(save_path), exist_ok=True)
global DOC_PATH
DOC_PATH = save_path
# DOC_PATH = f"http://127.0.0.1:5000/uploads/documents/{filename}"
print(f"Document path: {save_path}")
f.save(save_path)
elif ext in (".png", ".jpg", ".jpeg", ".gif"):
save_path = os.path.join(app.config["UPLOAD_FOLDER"], "images", filename)
os.makedirs(os.path.dirname(save_path), exist_ok=True)
global IMG_PATH
IMG_PATH = save_path
# IMG_PATH = f"http://127.0.0.1:5000/uploads/images/{filename}"
print(f"Image path: {save_path}")
f.save(save_path)
else:
save_path = os.path.join(app.config["UPLOAD_FOLDER"], "others", filename)
os.makedirs(os.path.dirname(save_path), exist_ok=True)
global OTH_PATH
OTH_PATH = save_path
# OTH_PATH = f"http://127.0.0.1:5000/uploads/others/{filename}"
print(f"Other file path: {save_path}")
f.save(save_path)
#f.save(save_path)
# Add the uploaded file to the session
session["uploads"].append(filename)
# β€” Database files β€”
if ext in (".db", ".sqlite"):
DB_PATH = save_path
agent.GLOBAL_DB_PATH = DB_PATH
flash(f"Database uploaded and set: {filename}", "success")
# β€” Documents for RAG indexing β€”
elif ext in (".pdf", ".txt", ".doc", ".docx"):
agent.rag_index_document(save_path)
flash(f"Document indexed for RAG: {filename}", "success")
# β€” Images or other files β€”
else:
flash(f"File uploaded: {filename}", "success")
return redirect(url_for("index"))
# GET
return render_template("upload.html")
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── Static Upload ───────────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@app.route('/uploads/databases/<filename>')
def serve_database(filename):
"""
Serve an database file from the uploads/databases folder.
"""
return send_from_directory(os.path.join(app.root_path, 'uploads', 'databases'), filename)
@app.route('/uploads/images/<filename>')
def serve_image(filename):
"""
Serve an document file from the uploads/images folder.
"""
return send_from_directory(os.path.join(app.root_path, 'uploads', 'images'), filename)
@app.route('/uploads/documents/<filename>')
def serve_document(filename):
"""
Serve an image file from the uploads/documents folder.
"""
return send_from_directory(os.path.join(app.root_path, 'uploads', 'documents'), filename)
@app.route('/uploads/others/<filename>')
def serve_other(filename):
"""
Serve an other file from the uploads/others folder.
"""
return send_from_directory(os.path.join(app.root_path, 'uploads', 'others'), filename)
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── AGENT calling ───────────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@app.route("/generate", methods=["POST"])
def generate():
prompt = request.json.get("prompt", "").strip()
ensure_user_session()
if not prompt:
return "No prompt provided", 400
socketio.start_background_task(
run_agent_thread,
prompt,
session["user_id"],
session["chat_history"]
# ← pass it here
)
return "OK", 200
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── SESSION handling ───────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@app.route("/session-info")
def session_info():
# Endpoint to view session details (for debugging purposes)
return {
"user_id": session.get("user_id"),
"uploads": session.get("uploads", [])
}
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
# ─────────────────────────────────────────────── SESSION clearing ───────────────────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
@app.route("/clear_chat", methods=["POST"])
def clear_chat():
ensure_user_session()
user_id = session.get("user_id")
# Remove saved JSON chat file
if user_id:
chat_file = os.path.join(app.config["CHAT_FOLDER"], f"user_chat_no_{user_id}.json")
if os.path.exists(chat_file):
os.remove(chat_file)
# Reset session
session.clear()
# Generate new session id and chat history
session["user_id"] = os.urandom(16).hex()
session["uploads"] = []
session["chat_history"] = []
# Refresh agent memory
refresh_memory()
return {"message": "Chat history and session cleared!"}, 200
if __name__ == "__main__":
socketio.run(app, debug=True, port=5000)