"""Gradio app that finds the dataset faces most similar to an uploaded image.

Embeddings are computed with DeepFace (Facenet model) for every image listed
in ``metadata.csv`` and cached on disk in ``embeddings/embeddings.pkl``.
"""

import gc
import os
import pickle
import traceback
from io import BytesIO
from pathlib import Path

import gradio as gr
import numpy as np
import requests
from datasets import Image as HfImage
from datasets import load_dataset
from deepface import DeepFace
from PIL import Image, UnidentifiedImageError

# Authentication token; the app refuses to start without it.
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    raise ValueError("⚠️ Por favor, configura la variable de entorno HF_TOKEN para acceder al dataset privado")

# Directory and file used to cache the computed embeddings.
EMBEDDINGS_DIR = Path("embeddings")
EMBEDDINGS_DIR.mkdir(exist_ok=True)
EMBEDDINGS_FILE = EMBEDDINGS_DIR / "embeddings.pkl"

# Side length (pixels) of the square image handed to the model.
FACENET_INPUT_SIZE = (160, 160)
# Number of best matches shown in the gallery.
TOP_MATCHES = 5

# Load the dataset from metadata.csv and decode the "image" column as images.
dataset = load_dataset("csv", data_files="metadata.csv")
dataset = dataset["train"].cast_column("image", HfImage())


def preprocess_image(img: Image.Image) -> np.ndarray:
    """Convert *img* to RGB, resize it to 160x160 and return it as an array.

    NOTE(review): DeepFace documents numpy inputs as BGR while this returns
    RGB channel order. Both the database and the query go through the same
    path, so distances stay consistent -- confirm before changing, because
    any cached embeddings would have to be rebuilt.
    """
    img_rgb = img.convert("RGB")
    img_resized = img_rgb.resize(FACENET_INPUT_SIZE, Image.Resampling.LANCZOS)
    return np.array(img_resized)


def _compute_embedding(img: Image.Image) -> list:
    """Return the Facenet embedding DeepFace reports first for *img*."""
    return DeepFace.represent(
        img_path=preprocess_image(img),
        model_name="Facenet",
        enforce_detection=False,
    )[0]["embedding"]


def _save_database(database: list) -> None:
    """Write *database* to EMBEDDINGS_FILE without exposing a partial file.

    The pickle is written to a sibling temporary file and then moved over the
    real one, so an interruption mid-write cannot leave a truncated cache
    that would fail to load on the next start.
    """
    tmp_file = EMBEDDINGS_FILE.with_name(EMBEDDINGS_FILE.name + ".tmp")
    with open(tmp_file, "wb") as f:
        pickle.dump(database, f)
    tmp_file.replace(EMBEDDINGS_FILE)


def build_database(batch_size: int = 10) -> list:
    """Load the embeddings database from disk, or compute and cache it.

    Args:
        batch_size: Number of images processed between two saves of the
            progress file.

    Returns:
        A list of ``(name, image, embedding)`` tuples, where ``name`` is
        ``"image_<row index>"``, ``image`` is the RGB PIL image and
        ``embedding`` is the value returned by DeepFace. Images that fail to
        process are reported on stdout and skipped.
    """
    if EMBEDDINGS_FILE.exists():
        print("📂 Cargando embeddings desde el archivo...")
        # NOTE: pickle must only be used on trusted files; this cache is
        # produced locally by _save_database.
        # NOTE(review): progress is saved after every batch, so a cache left
        # behind by an interrupted run is loaded as if it were complete.
        with open(EMBEDDINGS_FILE, "rb") as f:
            return pickle.load(f)

    print("🔄 Calculando embeddings (esto puede tomar unos minutos)...")
    database = []
    total = len(dataset)
    total_batches = (total + batch_size - 1) // batch_size

    # Debug output describing the dataset layout.
    print("Dataset structure:", dataset.features)
    if total:  # dataset[0] would raise IndexError on an empty dataset
        print("First item structure:", dataset[0])

    for start in range(0, total, batch_size):
        print(f"📦 Procesando lote {start // batch_size + 1}/{total_batches}")

        # Rows are fetched one by one: slicing a Dataset returns a dict of
        # columns, so iterating over a slice yields column names, not rows.
        for idx in range(start, min(start + batch_size, total)):
            try:
                img = dataset[idx]["image"].convert("RGB")
                embedding = _compute_embedding(img)
            except Exception as e:
                # Best effort: a single bad image must not abort the build.
                print(f"❌ No se pudo procesar imagen {idx}: {str(e)}")
                print(f"Error details: {type(e).__name__}")
                print(traceback.format_exc())
                continue

            database.append((f"image_{idx}", img, embedding))
            print(f"✅ Procesada imagen {idx + 1}/{total}")

        # Save after every batch so a crash does not lose all the work.
        if database:
            print("💾 Guardando progreso...")
            _save_database(database)

        gc.collect()

    return database


def find_similar_faces(uploaded_image: Image.Image):
    """Return the database faces most similar to *uploaded_image*.

    Similarity is ``1 / (1 + d)`` where ``d`` is the Euclidean distance
    between the query embedding and a stored embedding.

    Returns:
        A ``(gallery_items, text_summary)`` pair: a list of
        ``(image, caption)`` tuples for the best matches (at most
        ``TOP_MATCHES``) and the captions joined one per line. If the query
        image cannot be processed, an empty list and an error message.
    """
    try:
        query_embedding = np.array(_compute_embedding(uploaded_image))
    except Exception as e:
        print(f"Error al procesar imagen de consulta: {str(e)}")
        return [], "⚠ No se detectó un rostro válido en la imagen."

    similarities = []
    for name, db_img, embedding in database:
        dist = np.linalg.norm(query_embedding - np.array(embedding))
        similarities.append((1 / (1 + dist), name, db_img))

    # Order by (score, name) only, so a tie never falls through to comparing
    # PIL images, which do not support ordering.
    similarities.sort(key=lambda match: match[:2], reverse=True)

    gallery_items = []
    captions = []
    for sim, name, img in similarities[:TOP_MATCHES]:
        caption = f"{name} - Similitud: {sim:.2f}"
        gallery_items.append((img, caption))
        captions.append(caption)

    text_summary = "".join(f"{caption}\n" for caption in captions)
    return gallery_items, text_summary


# Initialise the embeddings database.
print("🚀 Iniciando aplicación...")
database = build_database()
print(f"✅ Base de datos cargada con {len(database)} imágenes")

# Gradio interface.
demo = gr.Interface(
    fn=find_similar_faces,
    inputs=gr.Image(label="📤 Sube una imagen", type="pil"),
    outputs=[
        gr.Gallery(label="📸 Rostros más similares"),
        gr.Textbox(label="🧠 Similitud", lines=6),
    ],
    title="🔍 Buscador de Rostros con DeepFace",
    description="Sube una imagen y se comparará contra los rostros del dataset alojado en Hugging Face (`Segizu/facial-recognition`).",
)

demo.launch()