import os
import random
import re  # required by token_chunking's sentence-boundary search (was missing)
import time
import traceback

import tiktoken
import google.generativeai as genai
import vertexai
from vertexai.language_models import TextEmbeddingModel

# Configuration (will be initialized from run_pipeline.py)
# For module, these should ideally be arguments or imported from a config
# GENAI_API_KEY = os.getenv("GENAI_API_KEY")
# PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
# LOCATION = os.getenv("VERTEX_AI_LOCATION")
MULTIMODAL_MODEL_GENAI = "models/gemini-1.5-flash-latest"
TEXT_EMBEDDING_MODEL_VERTEXAI = "text-multilingual-embedding-002"
EMBEDDING_DIMENSION = 768  # text-multilingual-embedding-002 has 768 dimensions
MAX_TOKENS_NORMAL = 500
ENCODING_NAME = "cl100k_base"

# Global client for Vertex AI Text Embedding Model
text_embedding_model_client = None

# Whether genai.configure() has been called with a key. The
# google.generativeai package has no public `api_key` attribute, so the
# previous `genai.api_key` check raised AttributeError; we track state here.
_genai_configured = False


def initialize_clients(project_id, location, genai_api_key):
    """Initialize the Google Generative AI and Vertex AI clients.

    Args:
        project_id: Google Cloud project id (or falsy to skip Vertex AI init).
        location: Vertex AI region (or falsy to skip Vertex AI init).
        genai_api_key: Gemini API key (or falsy to skip GenAI configuration).

    Side effects:
        Sets the module globals `text_embedding_model_client` and
        `_genai_configured`; prints status messages. Never raises: Vertex AI
        initialization failures are caught and leave the client as None.
    """
    global text_embedding_model_client, _genai_configured

    if genai_api_key:
        genai.configure(api_key=genai_api_key)
        _genai_configured = True
        print("✓ Google Generative AI configured.")
    else:
        print("⚠️ AVERTISSEMENT: La clé API Gemini n'est pas définie. La génération de descriptions multimodales échouera.")

    if project_id and location:
        try:
            vertexai.init(project=project_id, location=location)
            print(f"✓ Vertex AI SDK initialisé pour le projet {project_id} dans la région {location}.")
            text_embedding_model_client = TextEmbeddingModel.from_pretrained(TEXT_EMBEDDING_MODEL_VERTEXAI)
            print(f"✓ Modèle d'embedding textuel Vertex AI '{TEXT_EMBEDDING_MODEL_VERTEXAI}' chargé avec succès.")
        except Exception as e:
            print(f"❌ ERREUR: Échec de l'initialisation du Vertex AI SDK ou du chargement du modèle d'embedding textuel : {str(e)}")
            print("⚠️ La génération d'embeddings textuels échouera.")
            text_embedding_model_client = None
    else:
        print("⚠️ Vertex AI SDK non initialisé car l'ID du projet Google Cloud ou la localisation sont manquants.")
        print("⚠️ La génération d'embeddings textuels échouera.")
        text_embedding_model_client = None


def token_chunking(text, max_tokens, encoding):
    """Chunk text by token count, preferring paragraph/sentence boundaries.

    Args:
        text: The text to split (empty/None yields []).
        max_tokens: Target maximum tokens per chunk.
        encoding: A tiktoken-style encoder with encode()/decode().

    Returns:
        List of non-empty, stripped chunk strings.
    """
    if not text:
        return []

    tokens = encoding.encode(text)
    chunks = []
    start_token_idx = 0

    while start_token_idx < len(tokens):
        end_token_idx = min(start_token_idx + max_tokens, len(tokens))

        # If there is more text beyond this chunk, try to end the chunk on a
        # natural boundary instead of cutting mid-sentence.
        if end_token_idx < len(tokens):
            look_ahead_limit = min(start_token_idx + max_tokens * 2, len(tokens))
            text_segment_to_check = encoding.decode(tokens[start_token_idx:look_ahead_limit])
            # Character budget corresponding to roughly max_tokens of the
            # decoded look-ahead segment.
            char_budget = len(text_segment_to_check) - (look_ahead_limit - (start_token_idx + max_tokens))

            # Prefer the last paragraph break inside the budget.
            paragraph_break = text_segment_to_check.rfind('\n\n', 0, char_budget)
            if paragraph_break != -1:
                tokens_up_to_break = encoding.encode(text_segment_to_check[:paragraph_break])
                end_token_idx = start_token_idx + len(tokens_up_to_break)
            else:
                # Otherwise take the last sentence end (punctuation followed
                # by whitespace) inside the budget. Fixes the original
                # reversed-string search, which matched punctuation PRECEDED
                # by whitespace and mapped the index back with the wrong
                # segment length.
                sentence_ends = list(re.finditer(r'[.!?]\s+', text_segment_to_check[:char_budget]))
                if sentence_ends:
                    char_index_in_segment = sentence_ends[-1].start()
                    tokens_up_to_end = encoding.encode(text_segment_to_check[:char_index_in_segment + 1])
                    end_token_idx = start_token_idx + len(tokens_up_to_end)

        current_chunk_tokens = tokens[start_token_idx:end_token_idx]
        chunk_text = encoding.decode(current_chunk_tokens).strip()
        if chunk_text:
            chunks.append(chunk_text)

        # Guarantee forward progress even if a boundary collapsed the chunk.
        if start_token_idx == end_token_idx:
            start_token_idx += 1
        else:
            start_token_idx = end_token_idx

    return chunks


def generate_multimodal_description(image_bytes, prompt_text,
                                    multimodal_model_genai_name=MULTIMODAL_MODEL_GENAI,
                                    max_retries=5, delay=10):
    """Generate a text description for an image via a multimodal GenAI model.

    Args:
        image_bytes: Raw PNG image bytes.
        prompt_text: Instruction prompt accompanying the image.
        multimodal_model_genai_name: GenAI model identifier.
        max_retries: Maximum number of attempts.
        delay: Base delay (seconds) used for pacing and exponential backoff.

    Returns:
        The description text, or None if the API key is not configured, the
        response is empty after all retries, or a non-retryable error occurs.
    """
    # Guard on our own flag: google.generativeai exposes no `genai.api_key`.
    if not _genai_configured:
        print(" Skipping multimodal description generation: GEMINI_API_KEY is not set.")
        return None

    for attempt in range(max_retries):
        try:
            # Jittered pacing before every call to stay under rate limits.
            time.sleep(delay + random.uniform(0, 5))
            content = [
                prompt_text,
                {'mime_type': 'image/png', 'data': image_bytes}
            ]
            model = genai.GenerativeModel(multimodal_model_genai_name)
            response = model.generate_content(content)
            description = response.text.strip()
            if description:
                return description
            else:
                print(f" Tentative {attempt+1}/{max_retries}: Réponse vide ou inattendue du modèle multimodal.")
                if attempt < max_retries - 1:
                    retry_delay = delay * (2 ** attempt) + random.uniform(1, 5)
                    print(f" Réessai dans {retry_delay:.2f}s...")
                    time.sleep(retry_delay)
                    continue
        except Exception as e:
            error_msg = str(e)
            print(f" Tentative {attempt+1}/{max_retries} échouée pour la description : {error_msg}")
            # Retry only on quota/rate-limit/transient server errors.
            if ("429" in error_msg or "quota" in error_msg.lower()
                    or "rate limit" in error_msg.lower()
                    or "unavailable" in error_msg.lower()
                    or "internal error" in error_msg.lower()):
                if attempt < max_retries - 1:
                    retry_delay = delay * (2 ** attempt) + random.uniform(1, 5)
                    print(f" Erreur d'API retryable détectée. Réessai dans {retry_delay:.2f}s...")
                    time.sleep(retry_delay)
                    continue
            else:
                print(f" Erreur d'API non retryable détectée : {error_msg}")
                traceback.print_exc()
                return None

    print(f" Toutes les {max_retries} tentatives ont échoué pour la description (fin de boucle).")
    return None


def generate_text_embedding(text_content, max_retries=5, delay=5):
    """Generate a text embedding with the Vertex AI multilingual model.

    Args:
        text_content: Text to embed; empty/whitespace-only input yields None.
        max_retries: Maximum number of attempts.
        delay: Base delay (seconds) used for pacing and exponential backoff.

    Returns:
        The embedding vector (list of EMBEDDING_DIMENSION floats), or None if
        the client is uninitialized, the input is empty, the response shape is
        unexpected, or all retries fail.
    """
    global text_embedding_model_client  # Ensure we are using the global client

    if not text_embedding_model_client:
        print(" Skipping text embedding generation: Vertex AI embedding client is not initialized.")
        return None
    if not text_content or not text_content.strip():
        return None  # Cannot embed empty text

    for attempt in range(max_retries):
        try:
            # Jittered pacing before every call to stay under rate limits.
            time.sleep(delay + random.uniform(0, 2))
            embeddings = text_embedding_model_client.get_embeddings([text_content])
            # Validate the response shape before trusting it.
            if (embeddings and len(embeddings) > 0
                    and hasattr(embeddings[0], 'values')
                    and isinstance(embeddings[0].values, list)
                    and len(embeddings[0].values) == EMBEDDING_DIMENSION):
                return embeddings[0].values
            else:
                print(f" Tentative {attempt+1}/{max_retries}: Format d'embedding Vertex AI inattendu. Réponse : {embeddings}")
                return None
        except Exception as e:
            error_msg = str(e)
            print(f" Tentative {attempt+1}/{max_retries} échouée pour l'embedding Vertex AI : {error_msg}")
            # Retry only on quota/rate-limit/transient server errors.
            if ("429" in error_msg or "quota" in error_msg.lower()
                    or "rate limit" in error_msg.lower()
                    or "unavailable" in error_msg.lower()
                    or "internal error" in error_msg.lower()):
                if attempt < max_retries - 1:
                    retry_delay = delay * (2 ** attempt) + random.uniform(1, 5)
                    print(f" Erreur d'API Vertex AI retryable détectée. Réessai dans {retry_delay:.2f}s...")
                    time.sleep(retry_delay)
                    continue
            else:
                print(f" Erreur d'API Vertex AI non retryable détectée : {error_msg}")
                traceback.print_exc()
                return None

    print(f" Toutes les {max_retries} tentatives ont échoué pour l'embedding Vertex AI (fin de boucle).")
    return None