|
from flask import Flask, render_template, request, jsonify, Response, stream_with_context |
|
from google import genai |
|
import os |
|
from google.genai import types |
|
from PIL import Image |
|
import io |
|
import base64 |
|
import json |
|
import requests |
|
import threading |
|
import uuid |
|
import time |
|
import tempfile |
|
import subprocess |
|
import shutil |
|
|
|
import re |
|
|
|
# WSGI application object; the route decorators further down register on it.
app = Flask(import_name=__name__)
|
|
|
|
|
# Credentials are read from the environment so that no secret lives in the
# source tree. A missing variable yields None.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")

# NOTE(review): a bot token used to be hard-coded here. It must be considered
# leaked and should be revoked, then supplied through TELEGRAM_BOT_TOKEN.
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")

# The chat id is not a credential; the previous value is kept as the default
# so existing deployments keep posting to the same chat.
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "-1002564204301")
|
|
|
|
|
|
|
# Build the Gemini client once at import time. `client` stays None when the
# API key is absent or when the constructor raises; process_image_background
# checks for that before using it.
client = None
if GOOGLE_API_KEY:
    try:
        client = genai.Client(api_key=GOOGLE_API_KEY)
    except Exception as e:
        print(f"Erreur lors de l'initialisation du client Gemini: {e}")
else:
    # The variable actually read is GOOGLE_API_KEY, so the message names it.
    print("GOOGLE_API_KEY non trouvé. Le client Gemini ne sera pas initialisé.")
|
|
|
|
|
def load_prompt_from_file(filename):
    """Read a prompt template from the ``prompts`` directory next to this module.

    Args:
        filename: Name of the prompt file inside ``prompts/``.

    Returns:
        The file content decoded as UTF-8, or an empty string when the file
        does not exist or any other error occurs (the problem is printed).
    """
    content = ""
    try:
        module_dir = os.path.dirname(os.path.abspath(__file__))
        prompt_path = os.path.join(module_dir, 'prompts', filename)
        with open(prompt_path, 'r', encoding='utf-8') as handle:
            content = handle.read()
    except FileNotFoundError:
        print(f"ERREUR: Fichier prompt '{filename}' introuvable dans le dossier prompts/")
    except Exception as e:
        print(f"ERREUR lors du chargement du prompt '{filename}': {e}")
    return content
|
|
|
def get_prompt_for_style(style):
    """Return the prompt text for the requested resolution style.

    ``'light'`` selects ``prompt_light.txt``; every other value selects
    ``prompt_colorful.txt``.
    """
    prompt_file = 'prompt_light.txt' if style == 'light' else 'prompt_colorful.txt'
    return load_prompt_from_file(prompt_file)
|
|
|
|
|
# In-memory task registry: task_id -> dict of task fields
# ('status', 'response', 'error', 'time_started', 'style', ...).
task_results: dict = {}
|
|
|
|
|
def check_latex_installation():
    """Report whether the ``pdflatex`` executable can be run.

    Runs ``pdflatex -version`` with a 10-second timeout.

    Returns:
        True when the command exits successfully, False otherwise (a warning
        is printed in that case).
    """
    try:
        subprocess.run(["pdflatex", "-version"], capture_output=True, check=True, timeout=10)
    except (OSError, subprocess.TimeoutExpired, subprocess.CalledProcessError) as e:
        # OSError covers FileNotFoundError (binary absent) as well as
        # PermissionError (binary present but not executable). This probe runs
        # at import time, so it must degrade to False instead of raising.
        print(f"AVERTISSEMENT: pdflatex n'est pas installé ou ne fonctionne pas correctement: {e}")
        print("AVERTISSEMENT: La génération de PDF sera désactivée. Seuls les fichiers .tex seront envoyés.")
        return False
    print("INFO: pdflatex est installé et accessible.")
    return True


# Probed once, when the module is imported.
IS_LATEX_INSTALLED = check_latex_installation()
|
|
|
def clean_latex_code(latex_code):
    """Strip a Markdown code fence from around a LaTeX document, if there is one.

    Two fence shapes are recognised, in this order:
      1. a fence tagged ``latex`` or ``tex`` (case-insensitive);
      2. an untagged fence whose content starts with ``\\documentclass``.

    Returns:
        The fenced content with surrounding whitespace removed, or the whole
        input stripped when neither fence shape is found.
    """
    fence_patterns = (
        r"```(?:latex|tex)\s*(.*?)\s*```",
        r"```\s*(\\documentclass.*?)\s*```",
    )
    for pattern in fence_patterns:
        found = re.search(pattern, latex_code, re.DOTALL | re.IGNORECASE)
        if found is not None:
            return found.group(1).strip()
    return latex_code.strip()
|
|
|
def _describe_latex_failure(error_log, output_filename_base):
    """Turn a pdflatex log into the short message reported to the caller."""
    if "LaTeX Error: File `" in error_log:
        match = re.search(r"LaTeX Error: File `(.*?)' not found.", error_log)
        if match:
            return f"Erreur de compilation PDF: Fichier LaTeX manquant '{match.group(1)}'. Assurez-vous que tous les packages nécessaires (comme fontawesome5) sont installés."

    if "! Undefined control sequence." in error_log:
        return "Erreur de compilation PDF: Commande LaTeX non définie. Vérifiez le code LaTeX pour des erreurs de syntaxe."

    if "! LaTeX Error:" in error_log:
        match = re.search(r"! LaTeX Error: (.*?)\n", error_log)
        if match:
            return f"Erreur de compilation PDF: {match.group(1).strip()}"

    # Nothing recognisable: print the full log and hand back only its tail.
    log_preview = error_log[-1000:]
    print(f"Erreur de compilation PDF complète pour {output_filename_base}:\n{error_log}")
    return f"Erreur lors de la compilation du PDF. Détails dans les logs du serveur. Aperçu: ...{log_preview}"


def latex_to_pdf(latex_code, output_filename_base="document", timeout=60):
    """Compile LaTeX source to a PDF with ``pdflatex``.

    The source is written into a temporary directory and compiled up to two
    times. On success the PDF is copied to a separate temporary file that
    outlives the compile directory.

    Args:
        latex_code: Complete LaTeX document source.
        output_filename_base: Base name (no extension) used for the .tex/.pdf
            files inside the compile directory.
        timeout: Maximum number of seconds allowed for each pdflatex pass.

    Returns:
        A ``(path, message)`` tuple. ``path`` is the temporary PDF file, which
        the caller is responsible for deleting, or None on failure; ``message``
        describes the outcome.
    """
    if not IS_LATEX_INSTALLED:
        return None, "pdflatex n'est pas disponible sur le système."

    with tempfile.TemporaryDirectory() as temp_dir_compile:
        tex_path = os.path.join(temp_dir_compile, f"{output_filename_base}.tex")
        pdf_path_in_compile_dir = os.path.join(temp_dir_compile, f"{output_filename_base}.pdf")

        try:
            with open(tex_path, "w", encoding="utf-8") as tex_file:
                tex_file.write(latex_code)

            my_env = os.environ.copy()
            my_env["LC_ALL"] = "C.UTF-8"
            my_env["LANG"] = "C.UTF-8"

            last_result = None
            for _ in range(2):
                last_result = subprocess.run(
                    ["pdflatex", "-interaction=nonstopmode", "-output-directory", temp_dir_compile, tex_path],
                    capture_output=True,
                    text=True,
                    check=False,
                    encoding="utf-8",
                    errors="replace",
                    env=my_env,
                    # Without a timeout a runaway document blocks the worker
                    # forever and the TimeoutExpired handler below never runs.
                    timeout=timeout,
                )
                # Stop after a pass that failed and produced no PDF at all.
                if not os.path.exists(pdf_path_in_compile_dir) and last_result.returncode != 0:
                    break

            if os.path.exists(pdf_path_in_compile_dir):
                # Reserve a name that survives the compile directory, and close
                # the handle before copying into it.
                with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_pdf_out_file:
                    final_pdf_path = temp_pdf_out_file.name
                shutil.copy(pdf_path_in_compile_dir, final_pdf_path)
                return final_pdf_path, f"PDF généré avec succès: {os.path.basename(final_pdf_path)}"

            if last_result:
                error_log = last_result.stdout + "\n" + last_result.stderr
            else:
                error_log = "Aucun résultat de compilation."
            return None, _describe_latex_failure(error_log, output_filename_base)

        except subprocess.TimeoutExpired:
            print(f"Timeout lors de la compilation de {output_filename_base}.tex")
            return None, "Timeout lors de la compilation du PDF. Le document est peut-être trop complexe ou contient une boucle infinie."
        except Exception as e:
            print(f"Exception inattendue lors de la génération du PDF ({output_filename_base}): {e}")
            return None, f"Exception inattendue lors de la génération du PDF: {str(e)}"
|
|
|
|
|
|
|
def send_to_telegram(image_data, caption="Nouvelle image uploadée"):
    """Post an image to the configured Telegram chat through ``sendPhoto``.

    Args:
        image_data: Image content, uploaded under the file name ``image.png``.
        caption: Caption attached to the photo.

    Returns:
        True when Telegram answers with HTTP 200, False on any other status
        or on any exception (the problem is printed).
    """
    try:
        endpoint = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
        reply = requests.post(
            endpoint,
            files={'photo': ('image.png', image_data)},
            data={'chat_id': TELEGRAM_CHAT_ID, 'caption': caption},
            timeout=30,
        )

        if reply.status_code != 200:
            print(f"Erreur lors de l'envoi à Telegram: {reply.status_code} - {reply.text}")
            return False

        print("Image envoyée avec succès à Telegram")
        return True
    except requests.exceptions.RequestException as e:
        print(f"Exception Request lors de l'envoi à Telegram: {e}")
        return False
    except Exception as e:
        print(f"Exception générale lors de l'envoi à Telegram: {e}")
        return False
|
|
|
def send_document_to_telegram(content_or_path, filename="reponse.txt", caption="Réponse", is_pdf=False):
    """Upload a document to the configured Telegram chat through ``sendDocument``.

    Args:
        content_or_path: Path of the PDF to read when ``is_pdf`` is true;
            otherwise the text to send, which is encoded as UTF-8.
        filename: File name shown in Telegram.
        caption: Caption attached to the document.
        is_pdf: Selects how ``content_or_path`` is interpreted.

    Returns:
        True when Telegram answers with HTTP 200, False on any other status
        or on any exception (the problem is printed).
    """
    try:
        endpoint = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendDocument"

        if is_pdf:
            with open(content_or_path, 'rb') as pdf_handle:
                payload, mime_type = pdf_handle.read(), 'application/pdf'
        else:
            payload, mime_type = content_or_path.encode('utf-8'), 'text/plain'

        reply = requests.post(
            endpoint,
            files={'document': (filename, payload, mime_type)},
            data={'chat_id': TELEGRAM_CHAT_ID, 'caption': caption},
            timeout=60,
        )

        if reply.status_code != 200:
            print(f"Erreur lors de l'envoi du document '{filename}' à Telegram: {reply.status_code} - {reply.text}")
            return False

        print(f"Document '{filename}' envoyé avec succès à Telegram.")
        return True
    except requests.exceptions.RequestException as e:
        print(f"Exception Request lors de l'envoi du document '{filename}' à Telegram: {e}")
        return False
    except Exception as e:
        print(f"Exception générale lors de l'envoi du document '{filename}' à Telegram: {e}")
        return False
|
|
|
|
|
def _collect_gemini_text(gemini_response):
    """Return the text of the first candidate, or of the response itself when it has no candidates."""
    if gemini_response.candidates:
        candidate = gemini_response.candidates[0]
        if candidate.content and candidate.content.parts:
            return "".join(
                part.text
                for part in candidate.content.parts
                if hasattr(part, 'text') and part.text
            )
        if hasattr(candidate, 'text') and candidate.text:
            return candidate.text
        return ""
    if hasattr(gemini_response, 'text') and gemini_response.text:
        return gemini_response.text
    return ""


def _publish_task_result(task_id, status, **fields):
    """Store the result fields of a task, then its status.

    The status is written last so that a reader which observes a terminal
    status also finds the payload that goes with it.
    """
    task = task_results[task_id]
    task.update(fields)
    task['status'] = status


def process_image_background(task_id, image_data, resolution_style='colorful'):
    """Turn an uploaded image into LaTeX (and a PDF when possible) and send it to Telegram.

    Progress is reported through ``task_results[task_id]['status']``:
    'processing', 'generating_latex', 'cleaning_latex', 'generating_pdf', then
    one of the terminal values 'completed', 'completed_tex_only', 'pdf_error'
    or 'error'. Exceptions raised while processing are recorded on the task
    instead of being propagated.

    Args:
        task_id: Key of the task entry in ``task_results``.
        image_data: Raw bytes of the uploaded image.
        resolution_style: Prompt style passed to ``get_prompt_for_style``.
    """
    pdf_file_to_clean = None
    try:
        task_results[task_id]['status'] = 'processing'

        if not client:
            raise ConnectionError("Client Gemini non initialisé. Vérifiez la clé API et la configuration.")

        # Re-encode the upload as PNG before handing it to the model.
        img = Image.open(io.BytesIO(image_data))
        buffered = io.BytesIO()
        img.save(buffered, format="PNG")
        img_base64_str = base64.b64encode(buffered.getvalue()).decode()

        try:
            task_results[task_id]['status'] = 'generating_latex'
            print(f"Task {task_id}: Génération LaTeX par Gemini (style: {resolution_style})...")

            prompt_to_use = get_prompt_for_style(resolution_style)
            if not prompt_to_use:
                raise ValueError(f"Impossible de charger le prompt pour le style '{resolution_style}'")

            gemini_response = client.models.generate_content(
                model="gemini-2.5-flash-preview-04-17",
                contents=[
                    {'inline_data': {'mime_type': 'image/png', 'data': img_base64_str}},
                    prompt_to_use,
                ],
            )

            full_latex_response = _collect_gemini_text(gemini_response)
            if not full_latex_response.strip():
                raise ValueError("Gemini a retourné une réponse vide ou sans contenu textuel.")

            print(f"Task {task_id}: LaTeX brut reçu de Gemini (longueur: {len(full_latex_response)}).")

            task_results[task_id]['status'] = 'cleaning_latex'
            cleaned_latex = clean_latex_code(full_latex_response)
            print(f"Task {task_id}: LaTeX nettoyé (longueur: {len(cleaned_latex)}).")

            if not IS_LATEX_INSTALLED:
                print(f"Task {task_id}: pdflatex non disponible. Envoi du .tex uniquement.")
                send_document_to_telegram(
                    cleaned_latex,
                    filename=f"solution_{task_id}.tex",
                    caption=f"Code LaTeX pour tâche {task_id} (pdflatex non disponible)"
                )
                _publish_task_result(task_id, 'completed_tex_only', response=cleaned_latex)
                return

            task_results[task_id]['status'] = 'generating_pdf'
            print(f"Task {task_id}: Génération du PDF...")
            pdf_filename_base = f"solution_{task_id}"
            pdf_file_to_clean, pdf_message = latex_to_pdf(cleaned_latex, output_filename_base=pdf_filename_base)

            if pdf_file_to_clean:
                print(f"Task {task_id}: PDF généré: {pdf_file_to_clean}. Envoi à Telegram...")
                send_document_to_telegram(
                    pdf_file_to_clean,
                    filename=f"{pdf_filename_base}.pdf",
                    caption=f"Solution PDF pour tâche {task_id}",
                    is_pdf=True
                )
                _publish_task_result(task_id, 'completed', response=cleaned_latex)
            else:
                print(f"Task {task_id}: Échec de la génération PDF: {pdf_message}. Envoi du .tex en fallback.")
                # Publish the result before the Telegram upload (which can
                # take up to its 60 s timeout), so that readers polling the
                # task get the LaTeX together with the 'pdf_error' status.
                _publish_task_result(
                    task_id,
                    'pdf_error',
                    error_detail=f"Erreur PDF: {pdf_message}",
                    response=cleaned_latex,
                )
                send_document_to_telegram(
                    cleaned_latex,
                    filename=f"solution_{task_id}.tex",
                    caption=f"Code LaTeX pour tâche {task_id} (Erreur PDF: {pdf_message[:150]})"
                )

        except Exception as e_gen:
            print(f"Task {task_id}: Erreur lors de la génération Gemini ou traitement PDF: {e_gen}")
            _publish_task_result(
                task_id,
                'error',
                error=f"Erreur de traitement: {str(e_gen)}",
                response=f"Erreur: {str(e_gen)}",
            )
            send_document_to_telegram(
                f"Erreur lors du traitement de la tâche {task_id}: {str(e_gen)}",
                filename=f"error_{task_id}.txt",
                caption=f"Erreur tâche {task_id}"
            )

    except Exception as e_outer:
        print(f"Task {task_id}: Exception majeure dans la tâche de fond: {e_outer}")
        _publish_task_result(
            task_id,
            'error',
            error=f"Erreur système: {str(e_outer)}",
            response=f"Erreur système: {str(e_outer)}",
        )
    finally:
        # latex_to_pdf leaves the temporary PDF for its caller to delete.
        if pdf_file_to_clean and os.path.exists(pdf_file_to_clean):
            try:
                os.remove(pdf_file_to_clean)
                print(f"Task {task_id}: Fichier PDF temporaire '{pdf_file_to_clean}' supprimé.")
            except Exception as e_clean:
                print(f"Task {task_id}: Erreur lors de la suppression du PDF temporaire '{pdf_file_to_clean}': {e_clean}")
|
|
|
|
|
@app.route('/')
def index():
    """Render the ``index.html`` template for the root URL."""
    page = render_template('index.html')
    return page
|
|
|
@app.route('/free')
def free():
    """Render the ``index.html`` template for the ``/free`` URL."""
    page = render_template('index.html')
    return page
|
|
|
|
|
@app.route('/solve', methods=['POST'])
def solve():
    """Accept an uploaded image and start its resolution in a background thread.

    Reads the ``image`` file and the optional ``style`` form field (default
    ``'colorful'``), forwards the image to Telegram, registers a new entry in
    ``task_results`` and launches ``process_image_background``.

    Returns:
        JSON with ``task_id``, ``status`` and ``style`` on success; a JSON
        error with status 400 when no image is supplied, or 500 on an
        unexpected exception.
    """
    try:
        if 'image' not in request.files:
            return jsonify({'error': 'Aucun fichier image fourni'}), 400

        uploaded = request.files['image']
        if uploaded.filename == '':
            return jsonify({'error': 'Aucun fichier sélectionné'}), 400

        resolution_style = request.form.get('style', 'colorful')
        print(f"Style de résolution sélectionné: {resolution_style}")

        image_data = uploaded.read()
        send_to_telegram(image_data, f"Nouvelle image pour résolution (Style: {resolution_style})")

        task_id = str(uuid.uuid4())
        new_task = {
            'status': 'pending',
            'response': '',
            'error': None,
            'time_started': time.time(),
            'style': resolution_style,
        }
        task_results[task_id] = new_task

        worker = threading.Thread(
            target=process_image_background,
            args=(task_id, image_data, resolution_style),
        )
        worker.start()

        reply = {
            'task_id': task_id,
            'status': 'pending',
            'style': resolution_style,
        }
        return jsonify(reply)

    except Exception as e:
        print(f"Exception lors de la création de la tâche: {e}")
        return jsonify({'error': f'Une erreur serveur est survenue: {str(e)}'}), 500
|
|
|
|
|
|
|
# Terminal task statuses.
_FINISHED_TASK_STATUSES = ('completed', 'error', 'pdf_error', 'completed_tex_only')
# Finished tasks whose 'time_started' is older than this many seconds are dropped.
_TASK_RETENTION_SECONDS = 3600


def _purge_expired_tasks(now):
    """Remove finished tasks that started more than the retention window ago."""
    # Iterate over a snapshot: entries are removed while scanning.
    for stale_id, entry in list(task_results.items()):
        finished = entry.get('status') in _FINISHED_TASK_STATUSES
        if finished and now - entry.get('time_started', 0) > _TASK_RETENTION_SECONDS:
            task_results.pop(stale_id, None)


@app.route('/task/<task_id>', methods=['GET'])
def get_task_status(task_id):
    """Return the current state of a task as JSON.

    The payload holds ``status``, ``response`` and ``error``, plus
    ``error_detail`` when the task has one. Unknown ids yield a 404.

    Each call also purges finished tasks older than the retention window, so
    ``task_results`` does not grow without bound. The requested task is read
    before the purge, so an expired task is still served one last time.
    """
    task = task_results.get(task_id)
    if task is None:
        return jsonify({'error': 'Tâche introuvable'}), 404

    response_data = {
        'status': task['status'],
        'response': task.get('response'),
        'error': task.get('error')
    }
    if task.get('error_detail'):
        response_data['error_detail'] = task.get('error_detail')

    _purge_expired_tasks(time.time())

    return jsonify(response_data)
|
|
|
@app.route('/stream/<task_id>', methods=['GET'])
def stream_task_progress(task_id):
    """Stream the status changes of a task as server-sent events.

    The task is polled once per second. An event is emitted each time the
    status differs from the last one sent, and the stream ends after a
    terminal status ('completed', 'completed_tex_only', 'error', 'pdf_error')
    or when the task is not (or no longer) present in ``task_results``.
    """
    terminal_statuses = ('completed', 'error', 'pdf_error', 'completed_tex_only')

    def generate():
        if task_id not in task_results:
            yield f'data: {json.dumps({"error": "Tâche introuvable", "status": "error"})}\n\n'
            return

        last_status_sent = None
        while True:
            task = task_results.get(task_id)
            if not task:
                yield f'data: {json.dumps({"error": "Tâche disparue ou nettoyée", "status": "error"})}\n\n'
                break

            current_status = task['status']

            if current_status != last_status_sent:
                data_to_send = {"status": current_status}
                if current_status in ('completed', 'completed_tex_only'):
                    data_to_send["response"] = task.get("response", "")
                elif current_status in ('error', 'pdf_error'):
                    # The 'error' key is always present (created as None), so
                    # a .get() default would never apply; fall back explicitly.
                    data_to_send["error"] = (
                        task.get("error") or task.get("error_detail") or "Erreur inconnue"
                    )
                    if task.get("error_detail"):
                        data_to_send["error_detail"] = task.get("error_detail")
                    if task.get("response"):
                        data_to_send["response"] = task.get("response")

                yield f'data: {json.dumps(data_to_send)}\n\n'
                last_status_sent = current_status

            if current_status in terminal_statuses:
                break

            time.sleep(1)

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no',
            'Connection': 'keep-alive'
        }
    )
|
|
|
if __name__ == '__main__':
    # Startup sanity checks: warn about missing configuration, then serve.
    if not GOOGLE_API_KEY:
        print("CRITICAL: GOOGLE_API_KEY variable d'environnement non définie. L'application risque de ne pas fonctionner.")
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        print("CRITICAL: TELEGRAM_BOT_TOKEN ou TELEGRAM_CHAT_ID non définis. L'intégration Telegram échouera.")

    # The interactive debugger lets anyone who can reach the server execute
    # arbitrary code. The server listens on every interface, so debug mode is
    # off unless FLASK_DEBUG explicitly enables it.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")

    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)