# Spaces: Sleeping (page-status text captured along with the file; not part of the program)
from flask import Flask, render_template, request, jsonify, redirect, url_for | |
from huggingface_hub import InferenceClient | |
import os | |
import json | |
import pandas as pd | |
import PyPDF2 | |
import docx | |
from werkzeug.utils import secure_filename | |
# Flask application and the on-disk locations used by the request handlers.
app = Flask(__name__)
app.config["UPLOAD_FOLDER"] = "uploads"
app.config["HISTORY_FILE"] = "history.json"

# Initialize Hugging Face API client.
# Prefer a token supplied through the HF_TOKEN environment variable so the
# secret does not have to live in the source file; the original placeholder
# is kept as the fallback so setups that edit this line keep working.
API_KEY = os.environ.get("HF_TOKEN") or "APIHUGGING"  # Replace with your key
client = InferenceClient(api_key=API_KEY)

# Allowed file extensions (lower-case, without the leading dot).
ALLOWED_EXTENSIONS = {"txt", "csv", "json", "pdf", "docx"}
# Utility: Check allowed file types | |
def allowed_file(filename):
    """Return True when *filename* has an extension in ALLOWED_EXTENSIONS.

    The extension is the text after the last "." and is compared
    case-insensitively. Names that contain no "." are rejected.
    """
    _stem, dot, extension = filename.rpartition(".")
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
# Utility: Load conversation history | |
def load_history():
    """Load the saved conversation from the history file.

    Returns:
        The parsed JSON content of ``app.config["HISTORY_FILE"]``, or an
        empty list when the file is missing, empty, or not valid JSON.
    """
    history_path = app.config["HISTORY_FILE"]
    try:
        with open(history_path, "r") as file:
            return json.load(file)
    except FileNotFoundError:
        # Nothing has been saved yet.
        return []
    except json.JSONDecodeError:
        # save_history opens the file in "w" mode, which truncates it before
        # writing, so an interrupted save can leave it empty or half-written.
        # Start a fresh conversation instead of failing every request until
        # the file is repaired by hand.
        app.logger.warning("Ignoring unreadable history file %s", history_path)
        return []
# Utility: Save conversation history | |
def save_history(history):
    """Write *history* to the configured history file as 4-space-indented JSON.

    The file is opened in "w" mode, so any previous content is replaced.
    """
    history_path = app.config["HISTORY_FILE"]
    with open(history_path, "w") as out:
        json.dump(history, out, indent=4)
# Utility: Extract text from files | |
def extract_text(file_path, file_type):
    """Return the textual content of the file at *file_path*.

    Args:
        file_path: Path of the file on disk.
        file_type: Extension without the dot; "txt", "csv", "json", "pdf"
            and "docx" are recognised.

    Returns:
        The extracted text, or "" for any other *file_type*.
    """
    if file_type == "txt":
        # Decode explicitly as UTF-8 instead of the platform default, and
        # substitute undecodable bytes so one stray byte does not fail the
        # whole request.
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            return f.read()
    if file_type == "csv":
        return pd.read_csv(file_path).to_string()
    if file_type == "json":
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        # Re-serialise with 4-space indentation.
        return json.dumps(data, indent=4)
    if file_type == "pdf":
        with open(file_path, "rb") as f:
            reader = PyPDF2.PdfReader(f)
            # Guard against a page yielding no text (None) so the join
            # cannot fail on it.
            return "".join(page.extract_text() or "" for page in reader.pages)
    if file_type == "docx":
        doc = docx.Document(file_path)
        return "\n".join(p.text for p in doc.paragraphs)
    return ""
# Hugging Face Chat Response | |
def get_bot_response(messages):
    """Send the conversation to the chat model and return its complete reply.

    Args:
        messages: The conversation so far; callers in this file pass a list
            of ``{"role": ..., "content": ...}`` dicts.

    Returns:
        The text of all streamed chunks concatenated into one string.
    """
    stream = client.chat.completions.create(
        model="Qwen/Qwen2.5-Coder-32B-Instruct",
        messages=messages,
        max_tokens=500,
        stream=True,
    )
    pieces = []
    for chunk in stream:
        if not chunk.choices:
            continue
        new_content = chunk.choices[0].delta.content
        # A streamed delta may carry no text (content is None); appending it
        # directly would raise TypeError, so skip empty deltas.
        if new_content:
            pieces.append(new_content)
    return "".join(pieces)
def home():
    """Render ``home.html`` with the saved conversation history."""
    # NOTE(review): no @app.route decorator is visible on this function in
    # this file - confirm how the view is registered.
    return render_template("home.html", history=load_history())
def upload_file():
    """Handle a file upload: extract its text, ask the model, return the reply.

    Returns:
        A redirect to the request URL when the request has no "file" part;
        JSON ``{"response": ...}`` on success; JSON
        ``{"error": "Invalid file type"}`` with status 400 when the file is
        missing a name or its extension is not allowed.
    """
    # NOTE(review): no @app.route decorator is visible on this function in
    # this file - confirm how the view is registered.
    if "file" not in request.files:
        return redirect(request.url)

    file = request.files["file"]
    if not (file and allowed_file(file.filename)):
        return jsonify({"error": "Invalid file type"}), 400

    # Take the type from the name that was just validated. secure_filename
    # can drop everything before the dot (".pdf" -> "pdf", or a stem made only
    # of non-ASCII characters), so re-splitting the sanitised name on "."
    # could raise IndexError.
    file_type = file.filename.rsplit(".", 1)[1].lower()
    filename = secure_filename(file.filename)
    if not allowed_file(filename):
        # Sanitising lost the extension; fall back to a neutral name.
        filename = f"upload.{file_type}"

    os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)
    file_path = os.path.join(app.config["UPLOAD_FOLDER"], filename)
    file.save(file_path)

    # Extract text from file
    extracted_text = extract_text(file_path, file_type)

    # Update conversation history
    history = load_history()
    history.append({"role": "user", "content": f"File content:\n{extracted_text}"})

    # Get response from Hugging Face API
    bot_response = get_bot_response(history)
    history.append({"role": "assistant", "content": bot_response})
    save_history(history)
    return jsonify({"response": bot_response})
def generate_response():
    """Answer a chat message sent as JSON ``{"message": ...}``.

    Returns:
        JSON ``{"response": ...}`` with the model's reply, or JSON
        ``{"error": "Message is required"}`` with status 400 when the body
        is not a JSON object or has no non-empty "message".
    """
    # NOTE(review): no @app.route decorator is visible on this function in
    # this file - confirm how the view is registered.
    # silent=True yields None for a missing or malformed JSON body; the
    # isinstance check also covers valid JSON that is not an object (null,
    # list, string), on which .get would raise AttributeError.
    data = request.get_json(silent=True)
    user_message = data.get("message") if isinstance(data, dict) else None
    if not user_message:
        return jsonify({"error": "Message is required"}), 400

    # Update conversation history
    history = load_history()
    history.append({"role": "user", "content": user_message})

    # Get response from Hugging Face API
    bot_response = get_bot_response(history)
    history.append({"role": "assistant", "content": bot_response})
    save_history(history)
    return jsonify({"response": bot_response})
if __name__ == "__main__":
    # Make sure the upload directory exists before the server starts.
    os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)
    # Debug mode was hard-coded on. It stays on by default so existing
    # behaviour is unchanged, but can now be switched off without editing the
    # file by setting FLASK_DEBUG to 0/false/no/off. Debug mode should not be
    # left enabled on a publicly reachable server.
    debug_setting = os.environ.get("FLASK_DEBUG", "1").strip().lower()
    app.run(debug=debug_setting not in {"0", "false", "no", "off"})