|
|
|
|
|
import base64
import json
import logging
import os
import secrets
import uuid
from datetime import datetime
from functools import wraps

import google.generativeai as genai
from dotenv import load_dotenv
from flask import Flask, render_template, request, jsonify, session, redirect
from werkzeug.utils import secure_filename
|
|
|
|
|
# Root logging is set to DEBUG; this module logs through a logger named after itself.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Run python-dotenv's loader before the environment is read below.
load_dotenv()

# The Gemini SDK is configured only when a key is present in the environment;
# the rest of the module checks `api_key` to decide whether it can serve requests.
api_key = os.environ.get("GEMINI_API_KEY")
if api_key:
    genai.configure(api_key=api_key)
    logger.info("GEMINI_API_KEY found. API configured successfully.")
else:
    logger.warning("GEMINI_API_KEY not found in environment variables")
|
|
|
|
|
|
|
# Flask application object and its configuration.
app = Flask(__name__)

# Session cookies are signed with this key. A hard-coded fallback would let
# anyone who has read the source forge a session cookie (and therefore choose
# the session_id that the routes use to build upload paths), so an unset or
# empty SESSION_SECRET falls back to a random per-process key instead.
_session_secret = os.environ.get("SESSION_SECRET")
if not _session_secret:
    _session_secret = secrets.token_hex(32)
    logger.warning(
        "SESSION_SECRET is not set; using a random per-process secret key. "
        "Sessions will not survive a restart or be shared between workers."
    )
app.secret_key = _session_secret

# Per-session sub-directories for images and saved chats are created under this folder.
app.config['UPLOAD_FOLDER'] = 'static/uploads'
# 10 * 1024 * 1024 bytes = 10 MiB.
app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024
|
|
|
|
|
def session_required(f):
    """Decorate a view so that a ``session_id`` exists before the view runs.

    If the session has no ``session_id`` key, a fresh UUID4 string is stored
    under that key and the creation is logged. The wrapped function is then
    called with its original arguments and its return value is passed through.
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        already_identified = 'session_id' in session
        if not already_identified:
            new_id = str(uuid.uuid4())
            session['session_id'] = new_id
            logger.info(f"Created new session: {session['session_id']}")
        result = f(*args, **kwargs)
        return result

    return wrapper
|
|
|
|
|
# Make sure the upload root exists (no error if it is already there).
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

# Both model handles stay None when no API key is available; the routes check
# them before use.
model = None
vision_model = None
if not api_key:
    logger.error("Cannot initialize Gemini models: API Key is missing.")
else:
    # Model used for text-only conversations, with explicit sampling settings.
    model = genai.GenerativeModel(
        model_name='gemini-1.0-pro',
        generation_config=dict(
            temperature=0.7,
            top_p=0.9,
            top_k=40,
            max_output_tokens=2048,
        ),
    )

    # Model used when the request carries an image; default generation settings.
    vision_model = genai.GenerativeModel('gemini-1.5-flash')
|
|
|
|
|
@app.route('/')
@session_required
def index():
    """Serve the chat page, or a plain-text 500 error when no API key is set."""
    if api_key:
        return render_template('index.html')
    return "Erreur: Clé API Gemini manquante. Veuillez configurer GEMINI_API_KEY.", 500
|
|
|
# MIME types accepted for uploaded images, mapped to the extension used on
# disk. Uploads are written below the static folder, so the declared type is
# whitelisted rather than trusted: an arbitrary type (e.g. text/html) must
# never decide the extension of a stored file.
_ALLOWED_IMAGE_TYPES = {
    'image/png': 'png',
    'image/jpeg': 'jpeg',
    'image/webp': 'webp',
    'image/gif': 'gif',
    'image/heic': 'heic',
    'image/heif': 'heif',
}


def _decode_image_data_url(image_data):
    """Split a ``data:<mime>;base64,<payload>`` URL into ``(mime_type, bytes)``.

    Raises:
        ValueError: if the value is not a string, is not shaped like a data
            URL, declares a MIME type outside ``_ALLOWED_IMAGE_TYPES``, or
            carries an empty or invalid base64 payload.
    """
    if not isinstance(image_data, str):
        raise ValueError("image data must be a string")
    header, separator, payload = image_data.partition(',')
    if not separator or not header.startswith('data:'):
        raise ValueError("image data is not a data URL")
    mime_type = header[len('data:'):].split(';', 1)[0].strip().lower()
    if mime_type not in _ALLOWED_IMAGE_TYPES:
        raise ValueError(f"unsupported image type: {mime_type!r}")
    # binascii.Error (bad base64) is a ValueError subclass, so callers only
    # need to catch ValueError.
    image_bytes = base64.b64decode(payload)
    if not image_bytes:
        raise ValueError("image payload is empty")
    return mime_type, image_bytes


def _format_history(chat_history, limit=15):
    """Convert the client's message list into ``{"role", "parts"}`` entries.

    Only the last ``limit`` entries are considered. Entries that are not
    dicts, lack a ``sender`` key, or have no non-empty string ``text`` are
    skipped instead of failing the whole request.
    """
    if not isinstance(chat_history, list):
        return []
    formatted = []
    for msg in chat_history[-limit:]:
        if not isinstance(msg, dict) or 'sender' not in msg:
            continue
        text = msg.get('text')
        if not text or not isinstance(text, str):
            continue
        role = "user" if msg['sender'] == 'user' else "model"
        formatted.append({"role": role, "parts": [text]})
    return formatted


def _answer_with_image(session_id, user_message, image_data):
    """Store the uploaded image and return the vision model's reply as a response."""
    # Decoding has its own try block so that a ValueError raised later (while
    # generating or reading the response) is not misreported as a bad image.
    try:
        mime_type, image_bytes = _decode_image_data_url(image_data)
    except ValueError as decode_error:
        logger.error(f"Error decoding image data: {str(decode_error)}")
        return jsonify({'error': 'Format de données d\'image invalide.'}), 400

    try:
        session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
        os.makedirs(session_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # The random suffix keeps two uploads made within the same second
        # from overwriting each other.
        unique_suffix = uuid.uuid4().hex[:8]
        extension = _ALLOWED_IMAGE_TYPES[mime_type]
        filename = secure_filename(f"image_{timestamp}_{unique_suffix}.{extension}")
        filepath = os.path.join(session_dir, filename)
        with open(filepath, "wb") as f:
            f.write(image_bytes)
        logger.info(f"Saved uploaded image to {filepath}")

        # Text (when present) goes first, followed by the image part.
        parts = [user_message] if user_message else []
        parts.append({"mime_type": mime_type, "data": image_bytes})

        logger.debug(f"Sending parts to vision model: {[type(p) if not isinstance(p, dict) else 'dict(image)' for p in parts]}")
        response = vision_model.generate_content(parts)
        response_text = response.text
        logger.info(f"Generated vision response successfully. Response length: {len(response_text)}")
        return jsonify({'response': response_text})
    except Exception as img_error:
        logger.exception(f"Error processing image: {str(img_error)}")
        return jsonify({
            'error': 'Désolé, une erreur est survenue lors du traitement de l\'image. Veuillez réessayer.'
        }), 500


def _answer_with_text(session_id, user_message, chat_history):
    """Send a text message (with recent history) to the text model and return the reply."""
    formatted_history = _format_history(chat_history)
    try:
        chat_session = model.start_chat(history=formatted_history)
        response = chat_session.send_message(user_message)
        response_text = response.text
        logger.info(f"Generated text response successfully. Response length: {len(response_text)}")
        return jsonify({'response': response_text})
    except genai.types.generation_types.BlockedPromptException as be:
        logger.warning(f"Content blocked for session {session_id}: {str(be)}")
        return jsonify({
            'error': 'Votre message ou la conversation contient du contenu potentiellement inapproprié et ne peut pas être traité.'
        }), 400
    except Exception as e:
        logger.exception(f"Error during text generation for session {session_id}: {str(e)}")
        return jsonify({
            'error': 'Désolé, une erreur est survenue lors de la génération de la réponse texte.'
        }), 500


@app.route('/api/chat', methods=['POST'])
@session_required
def chat():
    """Process a chat message and return the Gemini reply as JSON.

    The request body must be a JSON object with these optional keys:
    ``message`` (string), ``history`` (list of message dicts) and ``image``
    (a base64 data URL). At least one of ``message`` / ``image`` is required.
    When an image is attached the vision model is used and ``history`` is not
    sent; otherwise the text model is used with the recent history.

    Returns:
        ``{'response': <text>}`` on success, or ``{'error': <message>}`` with
        status 400 (invalid request / blocked content) or 500 (server-side
        failure or missing configuration).
    """
    if not api_key:
        logger.error("Chat request failed: API Key is missing.")
        return jsonify({'error': 'Configuration serveur incomplète (clé API manquante).'}), 500
    if not model or not vision_model:
        logger.error("Chat request failed: Models not initialized.")
        return jsonify({'error': 'Configuration serveur incomplète (modèles non initialisés).'}), 500

    try:
        # silent=True yields None for a missing or malformed JSON body, so the
        # client gets a 400 here rather than falling into the generic 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Requête invalide: un objet JSON est attendu.'}), 400

        user_message = data.get('message') or ''
        if not isinstance(user_message, str):
            return jsonify({'error': 'Requête invalide: le message doit être une chaîne de caractères.'}), 400
        chat_history = data.get('history', [])
        image_data = data.get('image', None)

        if not user_message and not image_data:
            return jsonify({'error': 'Veuillez entrer un message ou joindre une image.'}), 400

        session_id = session.get('session_id')
        logger.info(f"Received chat request from session {session_id}. Message length: {len(user_message)}. Image attached: {'Yes' if image_data else 'No'}")

        if image_data:
            return _answer_with_image(session_id, user_message, image_data)
        return _answer_with_text(session_id, user_message, chat_history)

    except Exception as e:
        # Last-resort boundary: log the traceback and return a generic error.
        logger.exception(f"Unhandled error in chat endpoint: {str(e)}")
        return jsonify({
            'error': 'Désolé, j\'ai rencontré une erreur inattendue. Veuillez réessayer.'
        }), 500
|
|
|
|
|
@app.route('/api/save-chat', methods=['POST'])
@session_required
def save_chat():
    """Save the chat history sent by the client as a JSON file for this session.

    The request body must be a JSON object whose ``history`` key holds a
    non-empty list of message dicts, each containing at least a ``sender``
    key (the same shape the load endpoint accepts). The file is written to
    ``<UPLOAD_FOLDER>/<session_id>/chat_<YYYYmmdd_HHMMSS>.json``.

    Returns:
        ``{'success': True, 'filename': ..., 'timestamp': ...}`` on success,
        or ``{'error': <message>}`` with status 400 (bad request) or 500.
    """
    # Read outside the try block so the except handler can always reference it.
    session_id = session.get('session_id')
    if not session_id:
        return jsonify({'error': 'Session introuvable.'}), 400

    try:
        # silent=True yields None for a missing or malformed JSON body, which
        # is reported as a client error instead of a generic 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Requête invalide: un objet JSON est attendu.'}), 400

        chat_history = data.get('history', [])
        if not chat_history:
            return jsonify({'error': 'Aucune conversation à sauvegarder.'}), 400

        # Reject anything that could not be loaded back later.
        is_valid = isinstance(chat_history, list) and all(
            isinstance(item, dict) and 'sender' in item for item in chat_history
        )
        if not is_valid:
            return jsonify({'error': 'Format de conversation invalide.'}), 400

        # The directory is created only once the payload is known to be valid.
        session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
        os.makedirs(session_dir, exist_ok=True)

        # The listing endpoint parses this exact name pattern, so it is kept as is.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"chat_{timestamp}.json"
        filepath = os.path.join(session_dir, filename)

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(chat_history, f, ensure_ascii=False, indent=2)

        logger.info(f"Chat history saved for session {session_id} to {filename}")
        return jsonify({'success': True, 'filename': filename, 'timestamp': timestamp})

    except Exception as e:
        logger.exception(f"Error saving chat for session {session_id}: {str(e)}")
        return jsonify({
            'error': 'Désolé, une erreur est survenue lors de la sauvegarde de la conversation.'
        }), 500
|
|
|
@app.route('/api/load-chats', methods=['GET'])
@session_required
def load_chats():
    """List the saved chat files of the current session, newest first.

    Only files named ``chat_<YYYYmmdd_HHMMSS>.json`` are reported; files with
    the right prefix and suffix but an unparsable timestamp are logged and
    skipped. Returns ``{'chats': [...]}``, or ``{'error': ...}`` with status
    400 (no session id) or 500 (unexpected failure).
    """
    prefix, suffix = 'chat_', '.json'
    try:
        session_id = session.get('session_id')
        if not session_id:
            return jsonify({'error': 'Session introuvable.'}), 400

        session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id)

        # Nothing has been saved for this session yet.
        if not os.path.exists(session_dir):
            logger.info(f"No chat directory found for session {session_id}")
            return jsonify({'chats': []})

        found = []
        for filename in os.listdir(session_dir):
            if not (filename.startswith(prefix) and filename.endswith(suffix)):
                continue
            stamp = filename[len(prefix):-len(suffix)]
            try:
                # Parsed only to validate the format; the string itself is returned.
                datetime.strptime(stamp, "%Y%m%d_%H%M%S")
            except ValueError:
                logger.warning(f"Skipping file with unexpected format: {filename} in {session_dir}")
            else:
                found.append({'filename': filename, 'timestamp': stamp})

        # The timestamp format sorts chronologically as plain text.
        chat_files = sorted(found, key=lambda entry: entry['timestamp'], reverse=True)

        logger.info(f"Loaded {len(chat_files)} chats for session {session_id}")
        return jsonify({'chats': chat_files})

    except Exception as e:
        logger.exception(f"Error loading chat list for session {session_id}: {str(e)}")
        return jsonify({
            'error': 'Désolé, une erreur est survenue lors du chargement des conversations.'
        }), 500
|
|
|
@app.route('/api/load-chat/<filename>', methods=['GET'])
@session_required
def load_chat(filename):
    """Load one saved chat history file belonging to the current session.

    Args:
        filename: Name of the file as given in the URL. It is sanitised with
            ``secure_filename`` and must start with ``chat_`` and end with
            ``.json``.

    Returns:
        ``{'history': [...]}`` on success, or ``{'error': <message>}`` with
        status 400 (no session / invalid name), 403 (path escapes the session
        directory), 404 (no such file) or 500 (corrupt or unreadable file).
    """
    # Bound before the try block so every except handler can reference them.
    session_id = session.get('session_id')
    safe_filename = secure_filename(filename)

    try:
        if not session_id:
            return jsonify({'error': 'Session introuvable.'}), 400

        if not safe_filename.startswith('chat_') or not safe_filename.endswith('.json'):
            logger.warning(f"Attempt to load invalid chat filename: {filename} (secured: {safe_filename}) for session {session_id}")
            return jsonify({'error': 'Nom de fichier de conversation invalide.'}), 400

        session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
        filepath = os.path.join(session_dir, safe_filename)

        # Containment is checked on whole path components and before touching
        # the file system: a plain string-prefix test would also accept a
        # sibling directory whose name merely starts with the session directory's.
        session_root = os.path.abspath(session_dir)
        if os.path.commonpath([session_root, os.path.abspath(filepath)]) != session_root:
            logger.error(f"Attempt to access file outside session directory: {filepath}")
            return jsonify({'error': 'Accès non autorisé.'}), 403

        # isfile (not exists) so that a directory with a matching name is a 404.
        if not os.path.isfile(filepath):
            logger.warning(f"Chat file not found: {filepath} for session {session_id}")
            return jsonify({'error': 'Conversation introuvable.'}), 404

        with open(filepath, 'r', encoding='utf-8') as f:
            chat_history = json.load(f)

        if not isinstance(chat_history, list):
            raise ValueError("Invalid chat history format in file.")
        for item in chat_history:
            # Every message must be a dict with a 'sender'; 'text' is optional.
            # The type is checked first so that a non-dict entry is rejected
            # here instead of raising TypeError or matching by substring.
            if not isinstance(item, dict) or 'sender' not in item:
                raise ValueError("Invalid message format in chat history.")

        logger.info(f"Loaded chat {safe_filename} for session {session_id}")
        return jsonify({'history': chat_history})

    except json.JSONDecodeError:
        # Must precede the ValueError handler: JSONDecodeError is a subclass of it.
        logger.error(f"Error decoding JSON from chat file: {safe_filename} for session {session_id}")
        return jsonify({'error': 'Le fichier de conversation est corrompu.'}), 500
    except ValueError as ve:
        logger.error(f"Invalid content in chat file {safe_filename}: {str(ve)}")
        return jsonify({'error': f'Format invalide dans le fichier de conversation: {str(ve)}'}), 500
    except Exception as e:
        logger.exception(f"Error loading chat file {safe_filename} for session {session_id}: {str(e)}")
        return jsonify({
            'error': 'Désolé, une erreur est survenue lors du chargement de la conversation.'
        }), 500
|
|
|
if __name__ == '__main__':
    # Port comes from PORT (default 8080); debug mode is on only when
    # FLASK_DEBUG equals "true", compared case-insensitively.
    listen_port = int(os.environ.get('PORT', 8080))
    debug_enabled = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true'
    app.run(host='0.0.0.0', port=listen_port, debug=debug_enabled)
|
|