|
from flask import Flask, render_template, request, jsonify, Response, stream_with_context |
|
from google import genai |
|
import os |
|
from google.genai import types |
|
from PIL import Image |
|
import io |
|
import base64 |
|
import json |
|
import requests |
|
import threading |
|
import uuid |
|
import time |
|
import tempfile |
|
import subprocess |
|
import shutil |
|
import re |
|
|
|
# Flask application object; the route decorators further down attach to it.
app = Flask(__name__)

# API key for the Gemini client, read from the environment (None when unset).
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")

# SECURITY: these Telegram credentials were committed to source control and
# must be considered compromised -- rotate the bot token and supply the new
# values through the environment. The literals are kept only as a fallback so
# existing deployments that do not set the variables keep working.
TELEGRAM_BOT_TOKEN = (
    os.environ.get("TELEGRAM_BOT_TOKEN")
    or "8004545342:AAGcZaoDjYg8dmbbXRsR1N3TfSSbEiAGz88"
)
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID") or "-1002564204301"

# Gemini client shared by the background workers. It stays None when the key
# is missing or the client cannot be constructed; workers check for that.
client = None
if GOOGLE_API_KEY:
    try:
        client = genai.Client(api_key=GOOGLE_API_KEY)
    except Exception as e:
        print(f"Erreur lors de l'initialisation du client Gemini: {e}")
else:
    # The message names the variable that is actually read above.
    print("GOOGLE_API_KEY non trouvé. Le client Gemini ne sera pas initialisé.")
|
|
|
|
|
def load_prompt_from_file(filename):
    """Load a prompt from a text file stored in the ``prompts`` directory.

    The directory is resolved relative to this module's own location.

    Args:
        filename: Name of the prompt file inside ``prompts/``.

    Returns:
        The file content as a string, or ``""`` when the file is missing or
        cannot be read (the problem is printed, never raised).
    """
    try:
        module_dir = os.path.dirname(os.path.abspath(__file__))
        prompt_path = os.path.join(module_dir, 'prompts', filename)
        with open(prompt_path, 'r', encoding='utf-8') as handle:
            prompt_text = handle.read()
    except FileNotFoundError:
        print(f"ERREUR: Fichier prompt '{filename}' introuvable dans le dossier prompts/")
        return ""
    except Exception as e:
        print(f"ERREUR lors du chargement du prompt '{filename}': {e}")
        return ""
    return prompt_text
|
|
|
def get_prompt_for_style(style):
    """Return the prompt text matching the requested resolution style.

    ``'light'`` selects ``prompt_light.txt``; every other value falls back to
    ``prompt_colorful.txt``.
    """
    prompt_file = 'prompt_light.txt' if style == 'light' else 'prompt_colorful.txt'
    return load_prompt_from_file(prompt_file)
|
|
|
|
|
# In-memory registry of background tasks, keyed by task id. Each value is a
# dict describing the task (status, response, error, ...), written by the
# worker thread and read by the status/stream routes.
task_results = dict()
|
|
|
|
|
def check_latex_installation():
    """Check whether ``pdflatex`` is installed and runnable on this system.

    Runs ``pdflatex -version`` with a 10 second timeout.

    Returns:
        True when the command exits successfully, False otherwise (a warning
        is printed explaining that PDF generation is disabled).
    """
    try:
        subprocess.run(["pdflatex", "-version"], capture_output=True, check=True, timeout=10)
    except (OSError, subprocess.TimeoutExpired, subprocess.CalledProcessError) as e:
        # OSError (not just FileNotFoundError) so that a binary that exists
        # but cannot be executed (PermissionError, bad format, ...) disables
        # PDF generation instead of crashing the module at import time.
        print(f"AVERTISSEMENT: pdflatex n'est pas installé ou ne fonctionne pas correctement: {e}")
        print("AVERTISSEMENT: La génération de PDF sera désactivée. Seuls les fichiers .tex seront envoyés.")
        return False

    print("INFO: pdflatex est installé et accessible.")
    return True


# Evaluated once at import time; consulted before every PDF compilation.
IS_LATEX_INSTALLED = check_latex_installation()
|
|
|
def clean_latex_code(latex_code):
    """Strip markdown code fences from a LaTeX answer.

    Handled shapes, in priority order:

    1. A block fenced with a ``latex`` or ``tex`` language tag.
    2. An untagged fenced block whose content starts with ``\\documentclass``.
    3. An opening fence (optionally tagged ``latex``/``tex``) that is never
       closed, which is what a truncated model answer looks like.

    Anything else is returned unchanged apart from surrounding whitespace.

    Args:
        latex_code: Raw text returned by the model.

    Returns:
        The LaTeX source without the surrounding fences.
    """
    tagged = re.search(r"```(?:latex|tex)\s*(.*?)\s*```", latex_code, re.DOTALL | re.IGNORECASE)
    if tagged:
        return tagged.group(1).strip()

    untagged = re.search(r"```\s*(\\documentclass.*?)\s*```", latex_code, re.DOTALL | re.IGNORECASE)
    if untagged:
        return untagged.group(1).strip()

    stripped = latex_code.strip()

    # Unterminated fence: drop the opening fence line only when no other
    # fence follows, so fully fenced blocks that the two patterns above
    # deliberately ignored are still returned untouched.
    opening = re.match(r"```(?:latex|tex)?[ \t]*\r?\n", stripped, re.IGNORECASE)
    if opening and "```" not in stripped[opening.end():]:
        return stripped[opening.end():].strip()

    return stripped
|
|
|
def _summarize_latex_error(error_log, output_filename_base):
    """Condense raw pdflatex output into a short user-facing error message."""
    if "LaTeX Error: File `" in error_log:
        missing_file = re.search(r"LaTeX Error: File `(.*?)' not found.", error_log)
        if missing_file:
            return f"Erreur de compilation PDF: Fichier LaTeX manquant '{missing_file.group(1)}'. Assurez-vous que tous les packages nécessaires (comme fontawesome5) sont installés."

    if "! Undefined control sequence." in error_log:
        return "Erreur de compilation PDF: Commande LaTeX non définie. Vérifiez le code LaTeX pour des erreurs de syntaxe."

    if "! LaTeX Error:" in error_log:
        latex_error = re.search(r"! LaTeX Error: (.*?)\n", error_log)
        if latex_error:
            return f"Erreur de compilation PDF: {latex_error.group(1).strip()}"

    # Nothing recognisable: log everything server-side, return only the tail.
    log_preview = error_log[-1000:]
    print(f"Erreur de compilation PDF complète pour {output_filename_base}:\n{error_log}")
    return f"Erreur lors de la compilation du PDF. Détails dans les logs du serveur. Aperçu: ...{log_preview}"


def latex_to_pdf(latex_code, output_filename_base="document", timeout=120):
    """Compile LaTeX source to a PDF with ``pdflatex``.

    The source is compiled inside a throw-away directory, at most twice (the
    second pass lets LaTeX resolve references); the second pass is skipped
    when the first one fails without producing a PDF.

    Args:
        latex_code: Complete LaTeX document source.
        output_filename_base: Base name (no extension) used for the ``.tex``
            and ``.pdf`` files inside the compilation directory.
        timeout: Maximum number of seconds allowed for each pdflatex pass;
            ``None`` disables the limit.

    Returns:
        A ``(path, message)`` tuple. ``path`` is the path of a temporary PDF
        file that the caller is responsible for deleting, or ``None`` on
        failure; ``message`` describes the outcome.
    """
    if not IS_LATEX_INSTALLED:
        return None, "pdflatex n'est pas disponible sur le système."

    with tempfile.TemporaryDirectory() as temp_dir_compile:
        tex_path = os.path.join(temp_dir_compile, f"{output_filename_base}.tex")
        compiled_pdf_path = os.path.join(temp_dir_compile, f"{output_filename_base}.pdf")

        try:
            with open(tex_path, "w", encoding="utf-8") as tex_file:
                tex_file.write(latex_code)

            # Force a UTF-8 locale for the pdflatex process.
            compile_env = os.environ.copy()
            compile_env["LC_ALL"] = "C.UTF-8"
            compile_env["LANG"] = "C.UTF-8"

            last_result = None
            for _ in range(2):
                last_result = subprocess.run(
                    ["pdflatex", "-interaction=nonstopmode", "-output-directory", temp_dir_compile, tex_path],
                    capture_output=True,
                    text=True,
                    check=False,
                    encoding="utf-8",
                    errors="replace",
                    env=compile_env,
                    # Without a timeout the TimeoutExpired handler below could
                    # never fire and a looping document would block forever.
                    timeout=timeout,
                )
                if not os.path.exists(compiled_pdf_path) and last_result.returncode != 0:
                    break

            if os.path.exists(compiled_pdf_path):
                # Reserve a persistent temp name, then copy once the handle is
                # closed (copying into a still-open file fails on some OSes).
                with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as reserved:
                    final_pdf_path = reserved.name
                try:
                    shutil.copy(compiled_pdf_path, final_pdf_path)
                except Exception:
                    # Do not leave an empty temp file behind on failure.
                    os.remove(final_pdf_path)
                    raise
                return final_pdf_path, f"PDF généré avec succès: {os.path.basename(final_pdf_path)}"

            if last_result:
                error_log = last_result.stdout + "\n" + last_result.stderr
            else:
                error_log = "Aucun résultat de compilation."
            return None, _summarize_latex_error(error_log, output_filename_base)

        except subprocess.TimeoutExpired:
            print(f"Timeout lors de la compilation de {output_filename_base}.tex")
            return None, "Timeout lors de la compilation du PDF. Le document est peut-être trop complexe ou contient une boucle infinie."
        except Exception as e:
            print(f"Exception inattendue lors de la génération du PDF ({output_filename_base}): {e}")
            return None, f"Exception inattendue lors de la génération du PDF: {str(e)}"
|
|
|
|
|
def send_to_telegram(file_data, filename, caption="Nouveau fichier uploadé"):
    """Send a file to the configured Telegram chat.

    Files whose name ends with a known image extension go through the
    ``sendPhoto`` endpoint; everything else goes through ``sendDocument``.

    Args:
        file_data: Raw content of the file.
        filename: Name given to the file in Telegram; also decides the endpoint.
        caption: Caption attached to the message.

    Returns:
        True when Telegram answers with HTTP 200, False otherwise. Errors are
        printed, never raised.
    """
    try:
        is_image = filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp'))
        method, field = ("sendPhoto", "photo") if is_image else ("sendDocument", "document")

        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/{method}"
        files = {field: (filename, file_data)}
        data = {'chat_id': TELEGRAM_CHAT_ID, 'caption': caption}
        response = requests.post(url, files=files, data=data, timeout=30)

        if response.status_code == 200:
            print(f"Fichier '{filename}' envoyé avec succès à Telegram")
            return True

        print(f"Erreur lors de l'envoi de '{filename}' à Telegram: {response.status_code} - {response.text}")
        return False
    except Exception as e:
        # Exception messages from the HTTP layer can embed the request URL,
        # which contains the bot token: redact it before logging.
        detail = str(e)
        if TELEGRAM_BOT_TOKEN:
            detail = detail.replace(TELEGRAM_BOT_TOKEN, "<token>")
        kind = "Request" if isinstance(e, requests.exceptions.RequestException) else "générale"
        print(f"Exception {kind} lors de l'envoi de '{filename}' à Telegram: {detail}")
        return False
|
|
|
def send_document_to_telegram(content_or_path, filename="reponse.txt", caption="Réponse", is_pdf=False):
    """Send a text or PDF document to the configured Telegram chat.

    Args:
        content_or_path: Path of the PDF file when ``is_pdf`` is true,
            otherwise the text content to send (encoded as UTF-8).
        filename: Name given to the document in Telegram.
        caption: Caption attached to the message.
        is_pdf: Selects how ``content_or_path`` is interpreted.

    Returns:
        True when Telegram answers with HTTP 200, False otherwise. Errors are
        printed, never raised.
    """
    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendDocument"

        if is_pdf:
            with open(content_or_path, 'rb') as f_pdf:
                files = {'document': (filename, f_pdf.read(), 'application/pdf')}
        else:
            files = {'document': (filename, content_or_path.encode('utf-8'), 'text/plain')}

        data = {'chat_id': TELEGRAM_CHAT_ID, 'caption': caption}
        response = requests.post(url, files=files, data=data, timeout=60)

        if response.status_code == 200:
            print(f"Document '{filename}' envoyé avec succès à Telegram.")
            return True

        print(f"Erreur lors de l'envoi du document '{filename}' à Telegram: {response.status_code} - {response.text}")
        return False
    except Exception as e:
        # The cause used to be dropped from the log entirely. It is reported
        # again, with the bot token redacted because HTTP-layer exception
        # messages can embed the request URL.
        detail = str(e)
        if TELEGRAM_BOT_TOKEN:
            detail = detail.replace(TELEGRAM_BOT_TOKEN, "<token>")
        kind = "Request" if isinstance(e, requests.exceptions.RequestException) else "générale"
        print(f"Exception {kind} lors de l'envoi du document '{filename}' à Telegram: {detail}")
        return False
|
|
|
|
|
def _image_to_inline_part(image_bytes):
    """Re-encode raw image bytes as PNG and wrap them as an inline-data part."""
    img = Image.open(io.BytesIO(image_bytes))
    buffered = io.BytesIO()
    img.save(buffered, format="PNG")
    return {
        'inline_data': {
            'mime_type': 'image/png',
            'data': base64.b64encode(buffered.getvalue()).decode(),
        }
    }


def _upload_pdf_to_gemini(pdf_bytes):
    """Upload PDF bytes to Gemini through a temp file that is always removed."""
    temp_pdf_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_pdf:
            temp_pdf_path = temp_pdf.name
            temp_pdf.write(pdf_bytes)
        return client.files.upload(file=temp_pdf_path)
    finally:
        # Runs on failure too, so a failed upload no longer leaks the file.
        if temp_pdf_path and os.path.exists(temp_pdf_path):
            os.unlink(temp_pdf_path)


def _extract_gemini_text(gemini_response):
    """Concatenate the text of the first candidate, with response-level fallback."""
    if gemini_response.candidates:
        candidate = gemini_response.candidates[0]
        if candidate.content and candidate.content.parts:
            return "".join(
                part.text
                for part in candidate.content.parts
                if hasattr(part, 'text') and part.text
            )
        if hasattr(candidate, 'text') and candidate.text:
            return candidate.text
        return ""
    if hasattr(gemini_response, 'text') and gemini_response.text:
        return gemini_response.text
    return ""


def process_files_background(task_id, files_data, resolution_style='colorful'):
    """Process uploaded files: generate LaTeX, compile it to PDF, send to Telegram.

    Meant to run in a worker thread. Progress and results are written to
    ``task_results[task_id]`` (``status``, ``response``, ``error``,
    ``error_detail``); nothing is returned and no exception escapes.

    Args:
        task_id: Key of the task entry in ``task_results`` (must already exist).
        files_data: List of dicts with ``filename``, ``data`` (raw bytes) and
            ``type`` (MIME type). Images are inlined, PDFs are uploaded to
            Gemini, other types are ignored.
        resolution_style: Style used to pick the prompt.
    """
    pdf_file_to_clean = None
    uploaded_file_refs = []

    try:
        task_results[task_id]['status'] = 'processing'

        if not client:
            raise ConnectionError("Client Gemini non initialisé. Vérifiez la clé API et la configuration.")

        contents = []
        for file_info in files_data:
            filename = file_info['filename']
            file_data = file_info['data']
            file_type = file_info['type']

            if file_type.startswith('image/'):
                contents.append(_image_to_inline_part(file_data))
            elif file_type == 'application/pdf':
                try:
                    file_ref = _upload_pdf_to_gemini(file_data)
                    uploaded_file_refs.append(file_ref)
                    contents.append(file_ref)
                    print(f"Task {task_id}: PDF '{filename}' uploadé vers Gemini avec succès")
                except Exception as e:
                    print(f"Task {task_id}: Erreur lors de l'upload du PDF '{filename}': {e}")
                    raise ValueError(f"Impossible d'uploader le PDF '{filename}': {str(e)}") from e

        if not contents:
            raise ValueError("Aucun contenu valide trouvé dans les fichiers uploadés")

        prompt_to_use = get_prompt_for_style(resolution_style)
        if not prompt_to_use:
            raise ValueError(f"Impossible de charger le prompt pour le style '{resolution_style}'")

        contents.append(prompt_to_use)

        try:
            task_results[task_id]['status'] = 'generating_latex'
            print(f"Task {task_id}: Génération LaTeX par Gemini (style: {resolution_style})...")
            print(f"Task {task_id}: Nombre d'éléments de contenu: {len(contents)}")

            gemini_response = client.models.generate_content(
                model="gemini-2.5-flash-preview-05-20",
                contents=contents
            )

            full_latex_response = _extract_gemini_text(gemini_response)
            if not full_latex_response.strip():
                # The message had lost its subject; it is restored here.
                raise ValueError("Gemini a retourné une réponse vide ou sans contenu textuel.")

            print(f"Task {task_id}: LaTeX brut reçu de Gemini (longueur: {len(full_latex_response)}).")

            task_results[task_id]['status'] = 'cleaning_latex'
            cleaned_latex = clean_latex_code(full_latex_response)
            print(f"Task {task_id}: LaTeX nettoyé (longueur: {len(cleaned_latex)}).")

            if not IS_LATEX_INSTALLED:
                print(f"Task {task_id}: pdflatex non disponible. Envoi du .tex uniquement.")
                send_document_to_telegram(
                    cleaned_latex,
                    filename=f"solution_{task_id}.tex",
                    caption=f"Code LaTeX pour tâche {task_id} (pdflatex non disponible)"
                )
                task_results[task_id]['status'] = 'completed_tex_only'
                task_results[task_id]['response'] = cleaned_latex
                return

            task_results[task_id]['status'] = 'generating_pdf'
            print(f"Task {task_id}: Génération du PDF...")
            pdf_filename_base = f"solution_{task_id}"
            pdf_file_to_clean, pdf_message = latex_to_pdf(cleaned_latex, output_filename_base=pdf_filename_base)

            if pdf_file_to_clean:
                print(f"Task {task_id}: PDF généré: {pdf_file_to_clean}. Envoi à Telegram...")
                send_document_to_telegram(
                    pdf_file_to_clean,
                    filename=f"{pdf_filename_base}.pdf",
                    caption=f"Solution PDF pour tâche {task_id}",
                    is_pdf=True
                )
                task_results[task_id]['status'] = 'completed'
                task_results[task_id]['response'] = cleaned_latex
            else:
                # PDF compilation failed: fall back to sending the .tex source.
                print(f"Task {task_id}: Échec de la génération PDF: {pdf_message}. Envoi du .tex en fallback.")
                task_results[task_id]['status'] = 'pdf_error'
                task_results[task_id]['error_detail'] = f"Erreur PDF: {pdf_message}"
                send_document_to_telegram(
                    cleaned_latex,
                    filename=f"solution_{task_id}.tex",
                    caption=f"Code LaTeX pour tâche {task_id} (Erreur PDF: {pdf_message[:150]})"
                )
                task_results[task_id]['response'] = cleaned_latex

        except Exception as e_gen:
            print(f"Task {task_id}: Erreur lors de la génération Gemini ou traitement PDF: {e_gen}")
            task_results[task_id]['status'] = 'error'
            # The cause is now included; before, 'error' and the Telegram
            # error file ended with a bare colon and no detail.
            task_results[task_id]['error'] = f"Erreur de traitement: {e_gen}"
            send_document_to_telegram(
                f"Erreur lors du traitement de la tâche {task_id}: {e_gen}",
                filename=f"error_{task_id}.txt",
                caption=f"Erreur tâche {task_id}"
            )
            task_results[task_id]['response'] = f"Erreur: {str(e_gen)}"

    except Exception as e_outer:
        print(f"Task {task_id}: Exception majeure dans la tâche de fond: {e_outer}")
        task_results[task_id]['status'] = 'error'
        task_results[task_id]['error'] = f"Erreur système: {str(e_outer)}"
        task_results[task_id]['response'] = f"Erreur système: {str(e_outer)}"
    finally:
        # Remove the temporary PDF handed over by latex_to_pdf.
        if pdf_file_to_clean and os.path.exists(pdf_file_to_clean):
            try:
                os.remove(pdf_file_to_clean)
                print(f"Task {task_id}: Fichier PDF temporaire '{pdf_file_to_clean}' supprimé.")
            except Exception as e_clean:
                print(f"Task {task_id}: Erreur lors de la suppression du PDF temporaire '{pdf_file_to_clean}': {e_clean}")

        # Best-effort removal of the files uploaded to Gemini for this task;
        # this loop used to be a no-op.
        for file_ref in uploaded_file_refs:
            try:
                client.files.delete(name=file_ref.name)
            except Exception as e_del:
                print(f"Task {task_id}: Erreur lors de la suppression de la référence de fichier: {e_del}")
|
|
|
|
|
@app.route('/')
def index():
    """Serve the main page by rendering ``index.html``."""
    page = render_template('index.html')
    return page
|
|
|
@app.route('/free')
def free():
    """Serve the ``/free`` page, which renders the same ``index.html`` template."""
    page = render_template('index.html')
    return page
|
|
|
@app.route('/solve', methods=['POST'])
def solve():
    """Accept uploaded files and start a background solving task.

    Reads the ``user_files`` uploads and the optional ``style`` form field
    (default ``'colorful'``). Images and at most one PDF are kept; each kept
    file is forwarded to Telegram as it is accepted, and other content types
    are ignored.

    Returns:
        JSON with ``task_id``, ``status``, ``style`` and ``file_count`` on
        success; a JSON ``error`` with status 400 for invalid input, or
        status 500 for an unexpected failure.
    """
    try:
        if 'user_files' not in request.files:
            return jsonify({'error': 'Aucun fichier fourni'}), 400

        uploaded_files = request.files.getlist('user_files')
        if not uploaded_files or all(f.filename == '' for f in uploaded_files):
            return jsonify({'error': 'Aucun fichier sélectionné'}), 400

        resolution_style = request.form.get('style', 'colorful')
        print(f"Style de résolution sélectionné: {resolution_style}")

        files_data = []
        file_count = {'images': 0, 'pdfs': 0}

        for file in uploaded_files:
            if file.filename == '':
                continue

            file_data = file.read()
            file_type = file.content_type or 'application/octet-stream'

            if file_type.startswith('image/'):
                file_count['images'] += 1
                telegram_caption = f"Image uploadée - Style: {resolution_style}"
            elif file_type == 'application/pdf':
                file_count['pdfs'] += 1
                if file_count['pdfs'] > 1:
                    return jsonify({'error': 'Vous ne pouvez uploader qu\'un seul fichier PDF à la fois'}), 400
                telegram_caption = f"PDF uploadé - Style: {resolution_style}"
            else:
                print(f"Type de fichier ignoré: {file_type} pour {file.filename}")
                continue

            # Same bookkeeping for both accepted kinds: queue the file for
            # the worker, then mirror it to Telegram.
            files_data.append({
                'filename': file.filename,
                'data': file_data,
                'type': file_type
            })
            send_to_telegram(file_data, file.filename, telegram_caption)

        if not files_data:
            return jsonify({'error': 'Aucun fichier valide trouvé (seules les images et les PDF sont acceptés)'}), 400

        print(f"Fichiers traités: {file_count['images']} image(s), {file_count['pdfs']} PDF(s)")

        task_id = str(uuid.uuid4())
        task_results[task_id] = {
            'status': 'pending',
            'response': '',
            'error': None,
            'time_started': time.time(),
            'style': resolution_style,
            'file_count': file_count
        }

        threading.Thread(
            target=process_files_background,
            args=(task_id, files_data, resolution_style)
        ).start()

        return jsonify({
            'task_id': task_id,
            'status': 'pending',
            'style': resolution_style,
            'file_count': file_count
        })

    except Exception as e:
        print(f"Exception lors de la création de la tâche: {e}")
        # The message used to end with a dangling colon and no detail.
        return jsonify({'error': f'Une erreur serveur est survenue: {e}'}), 500
|
|
|
@app.route('/task/<task_id>', methods=['GET'])
def get_task_status(task_id):
    """Return the current state of a background task as JSON.

    The payload holds ``status``, ``response`` and ``error``, plus
    ``error_detail`` when the task recorded one. An unknown id yields a 404.

    A task that has reached a final status and was started more than one
    hour ago is removed from ``task_results`` after being served this last
    time; the expiry check used to do nothing, so entries were never freed.
    """
    task = task_results.get(task_id)
    if task is None:
        return jsonify({'error': 'Tâche introuvable'}), 404

    response_data = {
        'status': task['status'],
        'response': task.get('response'),
        'error': task.get('error')
    }
    if task.get('error_detail'):
        response_data['error_detail'] = task.get('error_detail')

    finished = task['status'] in ['completed', 'error', 'pdf_error', 'completed_tex_only']
    if finished and time.time() - task.get('time_started', 0) > 3600:
        # pop() with a default tolerates a concurrent removal.
        task_results.pop(task_id, None)

    return jsonify(response_data)
|
|
|
@app.route('/stream/<task_id>', methods=['GET'])
def stream_task_progress(task_id):
    """Stream the status changes of a task as server-sent events.

    One event is emitted each time the task status changes (polled once per
    second). The stream ends when the task reaches a final status, or with
    an error event when the task is unknown or has disappeared.
    """
    final_statuses = ['completed', 'error', 'pdf_error', 'completed_tex_only']

    def as_event(payload):
        # One SSE message: a "data:" line followed by a blank line.
        return f'data: {json.dumps(payload)}\n\n'

    def generate():
        if task_id not in task_results:
            yield as_event({"error": "Tâche introuvable", "status": "error"})
            return

        previous_status = None
        while True:
            task = task_results.get(task_id)
            if not task:
                yield as_event({"error": "Tâche disparue ou nettoyée", "status": "error"})
                break

            status = task['status']

            if status != previous_status:
                payload = {"status": status}
                if status in ('completed', 'completed_tex_only'):
                    payload["response"] = task.get("response", "")
                elif status in ('error', 'pdf_error'):
                    payload["error"] = task.get("error", "Erreur inconnue")
                    if task.get("error_detail"):
                        payload["error_detail"] = task.get("error_detail")
                    if task.get("response"):
                        payload["response"] = task.get("response")

                yield as_event(payload)
                previous_status = status

            if status in final_statuses:
                break

            time.sleep(1)

    sse_headers = {
        'Cache-Control': 'no-cache',
        'X-Accel-Buffering': 'no',
        'Connection': 'keep-alive'
    }
    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers=sse_headers
    )
|
|
|
if __name__ == '__main__':
    # Startup sanity checks: warn loudly, but still start the server.
    if not GOOGLE_API_KEY:
        print("CRITICAL: GOOGLE_API_KEY variable d'environnement non définie. L'application risque de ne pas fonctionner.")
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        print("CRITICAL: TELEGRAM_BOT_TOKEN ou TELEGRAM_CHAT_ID non définis. L'intégration Telegram échouera.")

    # SECURITY: the server listens on every interface, so the interactive
    # debugger (which allows arbitrary code execution) must never be on by
    # default. Debug mode is now opt-in through FLASK_DEBUG=1.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")

    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)