import numpy as np
from PIL import Image
import gradio as gr
from deepface import DeepFace
from datasets import load_dataset
import os
import pickle
from io import BytesIO
from huggingface_hub import upload_file, hf_hub_download, list_repo_files
from pathlib import Path
import gc
import requests
import time
import shutil
import tarfile

# 🔁 Clean temporary storage if it exists
def clean_temp_dirs():
    print("🧹 Cleaning temporary folders...")
    for folder in ["embeddings", "batches"]:
        path = Path(folder)
        if path.exists() and path.is_dir():
            shutil.rmtree(path)
            print(f"✅ Folder removed: {folder}")
        path.mkdir(exist_ok=True)

clean_temp_dirs()

# 📁 Parameters
DATASET_ID = "Segizu/facial-recognition"
EMBEDDINGS_SUBFOLDER = "embeddings"
LOCAL_EMB_DIR = Path("embeddings")
LOCAL_EMB_DIR.mkdir(exist_ok=True)
HF_TOKEN = os.getenv("HF_TOKEN")
headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}

# 💾 Storage-control settings
MAX_TEMP_STORAGE_GB = 40
UPLOAD_EVERY = 50
embeddings_to_upload = []

def get_folder_size(path):
    total = 0
    for dirpath, _, filenames in os.walk(path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            total += os.path.getsize(fp)
    return total / (1024 ** 3)  # in GB

# Note: per-file upload path; kept for reference, but the archive-based flow in
# build_database() below is what actually runs.
def flush_embeddings():
    global embeddings_to_upload
    print("🚀 Uploading a batch of embeddings to Hugging Face...")
    for emb_file in embeddings_to_upload:
        try:
            filename = emb_file.name
            upload_file(
                path_or_fileobj=str(emb_file),
                path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{filename}",
                repo_id=DATASET_ID,
                repo_type="dataset",
                token=HF_TOKEN
            )
            os.remove(emb_file)
            print(f"✅ Uploaded and removed: {filename}")
            time.sleep(1.2)  # avoid 429s
        except Exception as e:
            print(f"❌ Error uploading {emb_file.name}: {e}")
            continue
    embeddings_to_upload = []

# ✅ Load the CSV from the dataset
dataset = load_dataset(
    "csv",
    data_files="metadata.csv",
    split="train",
    column_names=["image"],
    header=0
)

print("✅ Post-load validation")
print(dataset[0])
print("Columns:", dataset.column_names)

# 🔄 Preprocessing
def preprocess_image(img: Image.Image) -> np.ndarray:
    img_rgb = img.convert("RGB")
    img_resized = img_rgb.resize((160, 160), Image.Resampling.LANCZOS)
    return np.array(img_resized)
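# 💡 Hedged sketch (not part of the original flow): the existence probe inside
# build_database() below downloads the whole .tar.gz just to check whether it is
# already on the Hub. Assuming a recent huggingface_hub, HfApi.file_exists() can
# answer the same question without downloading anything.
from huggingface_hub import HfApi

def remote_file_exists(path_in_repo: str) -> bool:
    """Check whether a file already exists in the dataset repo (no download)."""
    return HfApi().file_exists(
        repo_id=DATASET_ID,
        filename=path_in_repo,
        repo_type="dataset",
        token=HF_TOKEN
    )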
def build_database():
    print(f"📊 Current temporary storage usage _START_: {get_folder_size('.'):.2f} GB")
    print("🔄 Generating embeddings...")
    batch_size = 10
    archive_batch_size = 50
    batch_files = []
    batch_index = 0
    ARCHIVE_DIR = Path("batches")
    ARCHIVE_DIR.mkdir(exist_ok=True)

    for i in range(0, len(dataset), batch_size):
        batch = dataset[i:i + batch_size]
        print(f"📦 Batch {i // batch_size + 1}/{(len(dataset) + batch_size - 1) // batch_size}")

        for j in range(len(batch["image"])):
            image_url = batch["image"][j]

            # Skip malformed rows (including a stray header row read as data)
            if not isinstance(image_url, str) or not image_url.startswith("http") or image_url.strip().lower() == "image":
                print(f"⚠️ Skipping {i + j} - invalid URL: {image_url}")
                continue

            name = f"image_{i + j}"
            filename = LOCAL_EMB_DIR / f"{name}.pkl"

            # Skip if the current batch archive already exists on the Hugging Face Hub.
            # Note: hf_hub_download fetches the whole archive just to probe existence;
            # remote_file_exists() above is a lighter-weight alternative.
            try:
                hf_hub_download(
                    repo_id=DATASET_ID,
                    repo_type="dataset",
                    filename=f"{EMBEDDINGS_SUBFOLDER}/batch_{batch_index:03}.tar.gz",
                    token=HF_TOKEN
                )
                print(f"⏩ Already on the Hub: {name}.pkl")
                continue
            except Exception:
                pass

            try:
                response = requests.get(image_url, headers=headers, timeout=10)
                response.raise_for_status()
                img = Image.open(BytesIO(response.content)).convert("RGB")
                img_processed = preprocess_image(img)
                embedding = DeepFace.represent(
                    img_path=img_processed,
                    model_name="Facenet",
                    enforce_detection=False
                )[0]["embedding"]

                with open(filename, "wb") as f:
                    pickle.dump({"name": name, "img": img, "embedding": embedding}, f)
                batch_files.append(filename)
                del img_processed
                gc.collect()

                # Pack and upload once we reach the archive size or storage gets critical
                if len(batch_files) >= archive_batch_size or get_folder_size(".") > MAX_TEMP_STORAGE_GB:
                    archive_path = ARCHIVE_DIR / f"batch_{batch_index:03}.tar.gz"
                    with tarfile.open(archive_path, "w:gz") as tar:
                        for file in batch_files:
                            tar.add(file, arcname=file.name)
                    print(f"📦 Packed: {archive_path}")

                    # Upload to the Hub
                    upload_file(
                        path_or_fileobj=str(archive_path),
                        path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}",
                        repo_id=DATASET_ID,
                        repo_type="dataset",
                        token=HF_TOKEN
                    )
                    print(f"✅ Uploaded: {archive_path.name}")

                    # Remove the local .pkl files and the local .tar.gz
                    for f in batch_files:
                        f.unlink()
                    archive_path.unlink()
                    print("🧹 Cleanup finished after upload")

                    batch_files = []
                    batch_index += 1
                    time.sleep(2)  # pause to avoid 429s

            except Exception as e:
                print(f"❌ Error at {name}: {e}")
                continue

    # Final batch, if anything is left over
    if batch_files:
        archive_path = ARCHIVE_DIR / f"batch_{batch_index:03}.tar.gz"
        with tarfile.open(archive_path, "w:gz") as tar:
            for file in batch_files:
                tar.add(file, arcname=file.name)
        print(f"📦 Final pack: {archive_path}")

        upload_file(
            path_or_fileobj=str(archive_path),
            path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}",
            repo_id=DATASET_ID,
            repo_type="dataset",
            token=HF_TOKEN
        )

        for f in batch_files:
            f.unlink()
        archive_path.unlink()
        print("✅ Final upload and cleanup done")

    print(f"📊 Current temporary storage usage _END_: {get_folder_size('.'):.2f} GB")

# 🔍 Search for similar faces in the remote archives
def find_similar_faces(uploaded_image: Image.Image):
    try:
        img_processed = preprocess_image(uploaded_image)
        query_embedding = DeepFace.represent(
            img_path=img_processed,
            model_name="Facenet",
            enforce_detection=False
        )[0]["embedding"]
        del img_processed
        gc.collect()
    except Exception as e:
        return [], f"⚠ Error processing image: {str(e)}"

    similarities = []

    # build_database() uploads .tar.gz archives, so look for those; the original
    # filter for loose .pkl files would never match anything.
    try:
        embedding_files = [
            f for f in list_repo_files(DATASET_ID, repo_type="dataset", token=HF_TOKEN)
            if f.startswith(f"{EMBEDDINGS_SUBFOLDER}/") and f.endswith(".tar.gz")
        ]
    except Exception as e:
        return [], f"⚠ Error listing files: {str(e)}"

    for file_path in embedding_files:
        try:
            file_bytes = requests.get(
                f"https://huggingface.co/datasets/{DATASET_ID}/resolve/main/{file_path}",
                headers=headers,
                timeout=30
            ).content
            # Read the pickled records straight out of the archive, in memory
            with tarfile.open(fileobj=BytesIO(file_bytes), mode="r:gz") as tar:
                for member in tar.getmembers():
                    if not member.name.endswith(".pkl"):
                        continue
                    record = pickle.load(tar.extractfile(member))
                    name = record["name"]
                    img = record["img"]
                    emb = record["embedding"]
                    dist = np.linalg.norm(np.array(query_embedding) - np.array(emb))
                    sim_score = 1 / (1 + dist)
                    similarities.append((sim_score, name, np.array(img)))
        except Exception as e:
            print(f"⚠ Error with {file_path}: {e}")
            continue

    # Sort by score only; comparing the image arrays on tied scores would raise
    similarities.sort(key=lambda x: x[0], reverse=True)
    top = similarities[:5]
    gallery = [(img, f"{name} - Similarity: {sim:.2f}") for sim, name, img in top]
    summary = "\n".join(f"{name} - Similarity: {sim:.2f}" for sim, name, _ in top)

    return gallery, summary

# 🚀 Startup
print("🚀 Starting app...")
build_database()

# 🎛️ Gradio interface
demo = gr.Interface(
    fn=find_similar_faces,
    inputs=gr.Image(label="📤 Upload an image", type="pil"),
    outputs=[
        gr.Gallery(label="📸 Similar faces"),
        gr.Textbox(label="🧠 Details", lines=6)
    ],
    title="🔍 Facial recognition with DeepFace",
    description="Upload an image and find matches in the private Hugging Face dataset using Facenet embeddings."
)

demo.launch()
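# 💡 Hedged sketch (an assumption, not the author's method): Facenet embeddings are
# commonly compared with cosine similarity rather than the 1 / (1 + L2) score used
# in find_similar_faces(). A drop-in replacement for the scoring lines would be:
#
#     def cosine_similarity(a, b) -> float:
#         """Cosine similarity in [-1, 1]; higher means more alike."""
#         a, b = np.asarray(a), np.asarray(b)
#         return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
#
#     sim_score = cosine_similarity(query_embedding, emb)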