from flask import Flask, render_template, request, jsonify, Response, stream_with_context, abort from google import genai from google.genai import types import os from PIL import Image import io import base64 import json import logging from werkzeug.utils import secure_filename import mimetypes # Configuration du logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) app = Flask(__name__) # Configuration MAX_CONTENT_LENGTH = 16 * 1024 * 1024 # 16 MB max de taille d'image app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'} # Récupération de la clé API depuis les variables d'environnement GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY") if not GOOGLE_API_KEY: logger.error("La clé API Gemini n'est pas configurée. Définissez la variable d'environnement GEMINI_API_KEY.") # Initialisation du client Gemini try: client = genai.Client(api_key=GOOGLE_API_KEY) logger.info("Client Gemini initialisé avec succès") except Exception as e: logger.error(f"Erreur lors de l'initialisation du client Gemini: {e}") client = None def allowed_file(filename): """Vérifie si le fichier a une extension autorisée""" return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def process_image(file_data): """Traite l'image téléchargée et retourne la chaîne base64""" try: img = Image.open(io.BytesIO(file_data)) # Redimensionnement si l'image est trop grande (facultatif) max_size = 1600 # pixels if max(img.size) > max_size: ratio = max_size / max(img.size) new_size = (int(img.size[0] * ratio), int(img.size[1] * ratio)) img = img.resize(new_size, Image.LANCZOS) logger.info(f"Image redimensionnée à {new_size}") buffered = io.BytesIO() img.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() return img_str except Exception as e: logger.error(f"Erreur lors du traitement de l'image: {e}") raise ValueError(f"Impossible de traiter l'image: {str(e)}") def generate_ai_response(model_id, img_str, prompt, thinking_budget=None): """Fonction générique pour la génération de contenu avec les modèles Gemini""" if client is None: raise ValueError("Le client Gemini n'est pas initialisé") contents = [ {'inline_data': {'mime_type': 'image/png', 'data': img_str}}, prompt ] config_args = {} if thinking_budget: config_args["thinking_config"] = types.ThinkingConfig(thinking_budget=thinking_budget) config = types.GenerateContentConfig(**config_args) try: return client.models.generate_content_stream( model=model_id, contents=contents, config=config ) except Exception as e: logger.error(f"Erreur lors de la génération de contenu avec {model_id}: {e}") raise ValueError(f"Erreur lors de la génération: {str(e)}") def stream_response(response_generator): """Fonction qui gère le streaming des réponses""" mode = 'starting' try: for chunk in response_generator: # Vérification de validité du chunk if not hasattr(chunk, 'candidates') or not chunk.candidates: continue for part in chunk.candidates[0].content.parts: # Gestion du mode de pensée if hasattr(part, 'thought') and part.thought: if mode != "thinking": yield f'data: {json.dumps({"mode": "thinking"})}\n\n' mode = "thinking" # Mode de réponse else: if mode != "answering": yield f'data: {json.dumps({"mode": "answering"})}\n\n' mode = "answering" # Envoi du contenu s'il existe if hasattr(part, 'text') and part.text: yield f'data: {json.dumps({"content": part.text})}\n\n' except Exception as e: logger.error(f"Erreur pendant le streaming: {e}") yield f'data: {json.dumps({"error": str(e)})}\n\n' @app.route('/') def index(): """Page d'accueil principale""" try: return render_template('index.html') except Exception as e: logger.error(f"Erreur lors du rendu de index.html: {e}") return "Une erreur est survenue. Veuillez réessayer plus tard.", 500 @app.route('/free') def maintenance(): """Page de maintenance""" try: return render_template('maj.html') except Exception as e: logger.error(f"Erreur lors du rendu de maj.html: {e}") return "Page en maintenance. Veuillez revenir plus tard.", 503 @app.route('/health') def health_check(): """Endpoint de vérification de santé pour monitoring""" status = { "status": "ok", "gemini_client": client is not None } return jsonify(status) @app.route('/solve', methods=['POST']) def solve(): """Endpoint utilisant le modèle Pro avec capacités de réflexion étendues""" if not client: return jsonify({"error": "Service non disponible - client Gemini non initialisé"}), 503 try: # Vérification de l'image if 'image' not in request.files: return jsonify({"error": "Aucune image n'a été envoyée"}), 400 file = request.files['image'] if file.filename == '': return jsonify({"error": "Aucun fichier sélectionné"}), 400 if not allowed_file(file.filename): return jsonify({"error": "Format de fichier non autorisé"}), 400 # Détection du type de contenu file_data = file.read() content_type = mimetypes.guess_type(file.filename)[0] if not content_type or not content_type.startswith('image/'): return jsonify({"error": "Le fichier envoyé n'est pas une image valide"}), 400 # Traitement de l'image try: img_str = process_image(file_data) except ValueError as e: return jsonify({"error": str(e)}), 400 # Génération de la réponse try: response_generator = generate_ai_response( model_id="gemini-2.5-pro-exp-03-25", img_str=img_str, prompt="Résous ça en français with rendering latex", thinking_budget=8000 ) return Response( stream_with_context(stream_response(response_generator)), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no' } ) except ValueError as e: return jsonify({"error": str(e)}), 500 except Exception as e: logger.error(f"Erreur dans /solve: {e}") return jsonify({"error": "Une erreur inconnue est survenue"}), 500 @app.route('/solved', methods=['POST']) def solved(): """Endpoint utilisant le modèle Flash pour des réponses plus rapides""" if not client: return jsonify({"error": "Service non disponible - client Gemini non initialisé"}), 503 try: # Vérification de l'image if 'image' not in request.files: return jsonify({"error": "Aucune image n'a été envoyée"}), 400 file = request.files['image'] if file.filename == '': return jsonify({"error": "Aucun fichier sélectionné"}), 400 if not allowed_file(file.filename): return jsonify({"error": "Format de fichier non autorisé"}), 400 # Détection du type de contenu file_data = file.read() content_type = mimetypes.guess_type(file.filename)[0] if not content_type or not content_type.startswith('image/'): return jsonify({"error": "Le fichier envoyé n'est pas une image valide"}), 400 # Traitement de l'image try: img_str = process_image(file_data) except ValueError as e: return jsonify({"error": str(e)}), 400 # Génération de la réponse try: response_generator = generate_ai_response( model_id="gemini-2.5-flash-preview-04-17", img_str=img_str, prompt="Résous ça en français with rendering latex" ) return Response( stream_with_context(stream_response(response_generator)), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no' } ) except ValueError as e: return jsonify({"error": str(e)}), 500 except Exception as e: logger.error(f"Erreur dans /solved: {e}") return jsonify({"error": "Une erreur inconnue est survenue"}), 500 @app.errorhandler(413) def request_entity_too_large(error): """Gestion de l'erreur de fichier trop volumineux""" return jsonify({"error": f"Le fichier est trop volumineux. Taille maximale: {MAX_CONTENT_LENGTH/1024/1024} MB"}), 413 @app.errorhandler(404) def page_not_found(error): """Gestion de l'erreur 404""" return jsonify({"error": "Page non trouvée"}), 404 @app.errorhandler(500) def internal_server_error(error): """Gestion de l'erreur 500""" logger.error(f"Erreur 500: {error}") return jsonify({"error": "Erreur interne du serveur"}), 500 if __name__ == '__main__': # Vérification des dépendances et configuration avant le démarrage if not client: logger.warning("L'application démarre sans client Gemini initialisé. Certaines fonctionnalités seront indisponibles.") # Configuration pour le développement debug_mode = os.environ.get("FLASK_DEBUG", "False").lower() == "true" port = int(os.environ.get("PORT", 5000)) app.run(debug=debug_mode, host='0.0.0.0', port=port)