import numpy as np from PIL import Image, UnidentifiedImageError import gradio as gr from deepface import DeepFace from datasets import load_dataset, Image as HfImage import os import pickle from pathlib import Path import gc # 🔐 Token automático (si es necesario) HF_TOKEN = os.getenv("HF_TOKEN") # 📁 Directorio para embeddings EMBEDDINGS_DIR = Path("embeddings") EMBEDDINGS_DIR.mkdir(exist_ok=True) EMBEDDINGS_FILE = EMBEDDINGS_DIR / "embeddings.pkl" # ✅ Cargar dataset directamente desde Hugging Face Hub dataset = load_dataset( "csv", data_files="metadata.csv", split="train", column_names=["image"] # ✅ Forzar encabezado correcto ) print("✅ Primer item:", dataset[0]) dataset = dataset["train"].cast_column("image", HfImage()) # 🔄 Preprocesar imagen para Facenet def preprocess_image(img: Image.Image) -> np.ndarray: img_rgb = img.convert("RGB") img_resized = img_rgb.resize((160, 160), Image.Resampling.LANCZOS) return np.array(img_resized) # 📦 Construir base de datos de embeddings def build_database(): if EMBEDDINGS_FILE.exists(): print("📂 Cargando embeddings desde el archivo...") with open(EMBEDDINGS_FILE, 'rb') as f: return pickle.load(f) print("🔄 Calculando embeddings (esto puede tomar unos minutos)...") database = [] batch_size = 10 for i in range(0, len(dataset), batch_size): batch = dataset[i:i + batch_size] print(f"📦 Procesando lote {i // batch_size + 1}/{(len(dataset) + batch_size - 1) // batch_size}") for j, item in enumerate(batch): try: if not isinstance(item, dict) or "image" not in item: print(f"⚠️ Saltando item {i+j} - estructura inválida: {item}") continue img = item["image"] if not isinstance(img, Image.Image): print(f"⚠️ Saltando item {i+j} - no es imagen: {type(img)}") continue img_processed = preprocess_image(img) embedding = DeepFace.represent( img_path=img_processed, model_name="Facenet", enforce_detection=False )[0]["embedding"] database.append((f"image_{i+j}", img, embedding)) print(f"✅ Procesada imagen {i+j+1}/{len(dataset)}") del img_processed gc.collect() except Exception as e: print(f"❌ Error al procesar imagen {i+j}: {str(e)}") continue # Guardar después de cada lote if database: print("💾 Guardando progreso...") with open(EMBEDDINGS_FILE, 'wb') as f: pickle.dump(database, f) gc.collect() return database # 🔍 Buscar rostros similares def find_similar_faces(uploaded_image: Image.Image): try: img_processed = preprocess_image(uploaded_image) query_embedding = DeepFace.represent( img_path=img_processed, model_name="Facenet", enforce_detection=False )[0]["embedding"] del img_processed gc.collect() except Exception as e: print(f"Error al procesar imagen de consulta: {str(e)}") return [], "⚠ No se detectó un rostro válido en la imagen." similarities = [] for name, db_img, embedding in database: dist = np.linalg.norm(np.array(query_embedding) - np.array(embedding)) sim_score = 1 / (1 + dist) similarities.append((sim_score, name, db_img)) similarities.sort(reverse=True) top_matches = similarities[:5] gallery_items = [] text_summary = "" for sim, name, img in top_matches: caption = f"{name} - Similitud: {sim:.2f}" gallery_items.append((img, caption)) text_summary += caption + "\n" return gallery_items, text_summary # ⚙️ Iniciar la aplicación print("🚀 Iniciando aplicación...") database = build_database() print(f"✅ Base de datos cargada con {len(database)} imágenes") # 🎛️ Interfaz Gradio demo = gr.Interface( fn=find_similar_faces, inputs=gr.Image(label="📤 Sube una imagen", type="pil"), outputs=[ gr.Gallery(label="📸 Rostros más similares"), gr.Textbox(label="🧠 Similitud", lines=6) ], title="🔍 Buscador de Rostros con DeepFace", description="Sube una imagen y se comparará contra los rostros del dataset alojado en Hugging Face (`Segizu/facial-recognition`)." ) demo.launch()