""" API entegrasyonları için yardımcı fonksiyonlar. Bu modül, OpenAI, Google Gemini ve OpenRouter API'leri ile etkileşim için gerekli fonksiyonları içerir. """ import os import json import time import threading from typing import Dict, Any, List, Optional import openai from google import generativeai as genai import requests from dotenv import load_dotenv # Prompt şablonlarından model listelerini içe aktar from prompt_templates import OPENAI_MODELS, GEMINI_MODELS, OPENROUTER_MODELS # .env dosyasını yükle (varsa) load_dotenv() # Model önbelleği için global değişkenler MODEL_CACHE = { "openai": { "models": [], "last_updated": 0, "update_interval": 3600 # 1 saat (saniye cinsinden) }, "gemini": { "models": [], "last_updated": 0, "update_interval": 3600 # 1 saat (saniye cinsinden) }, "openrouter": { "models": [], "last_updated": 0, "update_interval": 3600 # 1 saat (saniye cinsinden) } } # Model önbelleği dosya yolu CACHE_FILE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "model_cache.json") def load_model_cache(): """ Disk üzerindeki model önbelleğini yükler. """ global MODEL_CACHE try: if os.path.exists(CACHE_FILE_PATH): with open(CACHE_FILE_PATH, 'r') as f: cache_data = json.load(f) MODEL_CACHE = cache_data print(f"Model önbelleği yüklendi: {len(MODEL_CACHE['openai']['models'])} OpenAI, {len(MODEL_CACHE['gemini']['models'])} Gemini, {len(MODEL_CACHE['openrouter']['models'])} OpenRouter modeli") except Exception as e: print(f"Model önbelleği yüklenirken hata oluştu: {str(e)}") def save_model_cache(): """ Model önbelleğini diske kaydeder. """ try: with open(CACHE_FILE_PATH, 'w') as f: json.dump(MODEL_CACHE, f) print("Model önbelleği diske kaydedildi") except Exception as e: print(f"Model önbelleği kaydedilirken hata oluştu: {str(e)}") # Uygulama başlangıcında önbelleği yükle load_model_cache() class APIManager: """ API yönetimi için sınıf. Bu sınıf, API anahtarlarını yönetir ve API'lerin durumunu kontrol eder. """ def __init__(self): """ API yöneticisini başlat. """ self.api_keys = { "openai": os.getenv("OPENAI_API_KEY", ""), "gemini": os.getenv("GEMINI_API_KEY", ""), "openrouter": os.getenv("OPENROUTER_API_KEY", "") } def set_api_key(self, provider: str, api_key: str) -> None: """ Belirli bir sağlayıcı için API anahtarını ayarlar. Args: provider (str): API sağlayıcısı ('openai', 'gemini', 'openrouter') api_key (str): API anahtarı """ if provider in self.api_keys: self.api_keys[provider] = api_key # API anahtarını ilgili kütüphane için de ayarla if provider == "openai": openai.api_key = api_key elif provider == "gemini": genai.configure(api_key=api_key) # API anahtarı değiştiğinde model önbelleğini sıfırla global MODEL_CACHE MODEL_CACHE[provider]["last_updated"] = 0 def get_api_key(self, provider: str) -> str: """ Belirli bir sağlayıcı için API anahtarını döndürür. Args: provider (str): API sağlayıcısı ('openai', 'gemini', 'openrouter') Returns: str: API anahtarı """ return self.api_keys.get(provider, "") def check_api_key_validity(self, provider: str) -> bool: """ Belirli bir sağlayıcı için API anahtarının geçerliliğini kontrol eder. Args: provider (str): API sağlayıcısı ('openai', 'gemini', 'openrouter') Returns: bool: API anahtarı geçerli ise True, değilse False """ api_key = self.get_api_key(provider) if not api_key: return False try: if provider == "openai": openai.api_key = api_key # Basit bir model listesi isteği ile API anahtarının geçerliliğini kontrol et openai.models.list() return True elif provider == "gemini": genai.configure(api_key=api_key) # Kullanılabilir modelleri listeleyerek API anahtarının geçerliliğini kontrol et genai.list_models() return True elif provider == "openrouter": headers = { "Authorization": f"Bearer {api_key}" } response = requests.get( "https://openrouter.ai/api/v1/models", headers=headers ) return response.status_code == 200 return False except Exception: return False class OpenAIHandler: """ OpenAI API ile etkileşim için sınıf. """ def __init__(self, api_key: str = ""): """ OpenAI işleyicisini başlat. Args: api_key (str, optional): OpenAI API anahtarı """ self.api_key = api_key if api_key: openai.api_key = api_key def set_api_key(self, api_key: str) -> None: """ OpenAI API anahtarını ayarlar. Args: api_key (str): OpenAI API anahtarı """ self.api_key = api_key openai.api_key = api_key # API anahtarı değiştiğinde model önbelleğini sıfırla global MODEL_CACHE MODEL_CACHE["openai"]["last_updated"] = 0 def fetch_models_from_api(self) -> List[str]: """ OpenAI API'sinden modelleri çeker. Returns: List[str]: API'den çekilen modeller listesi """ if not self.api_key: return [] try: models = openai.models.list() # Sadece GPT modellerini filtrele gpt_models = [model.id for model in models.data if "gpt" in model.id.lower()] return gpt_models except Exception as e: print(f"OpenAI modellerini çekerken hata oluştu: {str(e)}") return [] def get_available_models(self, force_refresh: bool = False) -> List[str]: """ Kullanılabilir OpenAI modellerini döndürür. Args: force_refresh (bool): Önbelleği zorla yenileme Returns: List[str]: Kullanılabilir modeller listesi """ global MODEL_CACHE current_time = time.time() cache = MODEL_CACHE["openai"] # Önbellek yenileme koşulları: # 1. Zorla yenileme istenmiş # 2. Önbellek boş # 3. Önbellek güncellenme zamanı aşılmış if (force_refresh or not cache["models"] or current_time - cache["last_updated"] > cache["update_interval"]): # API'den modelleri çek api_models = self.fetch_models_from_api() if api_models: # API'den modeller başarıyla çekildiyse önbelleği güncelle all_models = list(set(api_models + OPENAI_MODELS)) # Varsayılan modelleri önceliklendir sorted_models = sorted( all_models, key=lambda x: ( 0 if x in OPENAI_MODELS else 1, OPENAI_MODELS.index(x) if x in OPENAI_MODELS else float('inf') ) ) # Önbelleği güncelle MODEL_CACHE["openai"]["models"] = sorted_models MODEL_CACHE["openai"]["last_updated"] = current_time # Önbelleği diske kaydet save_model_cache() return sorted_models else: # API'den model çekilemediyse önbellekteki modelleri kullan if cache["models"]: return cache["models"] else: return OPENAI_MODELS else: # Önbellek güncel, önbellekteki modelleri kullan return cache["models"] if cache["models"] else OPENAI_MODELS def generate_response(self, prompt: str, model: str = "gpt-3.5-turbo", temperature: float = 0.7, max_tokens: int = 2000) -> Dict[str, Any]: """ OpenAI API kullanarak yanıt oluşturur. Args: prompt (str): Gönderilecek prompt model (str): Kullanılacak model temperature (float): Sıcaklık değeri (0.0-1.0) max_tokens (int): Maksimum token sayısı Returns: Dict[str, Any]: Yanıt bilgilerini içeren sözlük """ if not self.api_key: return {"success": False, "error": "OpenAI API anahtarı ayarlanmamış.", "content": ""} try: response = openai.chat.completions.create( model=model, messages=[{"role": "user", "content": prompt}], temperature=temperature, max_tokens=max_tokens ) return { "success": True, "content": response.choices[0].message.content, "model": model, "usage": { "prompt_tokens": response.usage.prompt_tokens, "completion_tokens": response.usage.completion_tokens, "total_tokens": response.usage.total_tokens } } except Exception as e: return {"success": False, "error": str(e), "content": ""} class GeminiHandler: """ Google Gemini API ile etkileşim için sınıf. """ def __init__(self, api_key: str = ""): """ Gemini işleyicisini başlat. Args: api_key (str, optional): Gemini API anahtarı """ self.api_key = api_key if api_key: genai.configure(api_key=api_key) def set_api_key(self, api_key: str) -> None: """ Gemini API anahtarını ayarlar. Args: api_key (str): Gemini API anahtarı """ self.api_key = api_key genai.configure(api_key=api_key) # API anahtarı değiştiğinde model önbelleğini sıfırla global MODEL_CACHE MODEL_CACHE["gemini"]["last_updated"] = 0 def fetch_models_from_api(self) -> List[str]: """ Gemini API'sinden modelleri çeker. Returns: List[str]: API'den çekilen modeller listesi """ if not self.api_key: return [] try: models = genai.list_models() api_models = [] # Tüm model türlerini topla (gemini, imagen, veo, vb.) for model in models: model_name = model.name.split("/")[-1] if any(keyword in model_name.lower() for keyword in ["gemini", "imagen", "veo"]): api_models.append(model_name) return api_models except Exception as e: print(f"Gemini modellerini çekerken hata oluştu: {str(e)}") return [] def get_available_models(self, force_refresh: bool = False) -> List[str]: """ Kullanılabilir Gemini modellerini döndürür. Args: force_refresh (bool): Önbelleği zorla yenileme Returns: List[str]: Kullanılabilir modeller listesi """ global MODEL_CACHE current_time = time.time() cache = MODEL_CACHE["gemini"] # Önbellek yenileme koşulları: # 1. Zorla yenileme istenmiş # 2. Önbellek boş # 3. Önbellek güncellenme zamanı aşılmış if (force_refresh or not cache["models"] or current_time - cache["last_updated"] > cache["update_interval"]): # API'den modelleri çek api_models = self.fetch_models_from_api() if api_models: # API'den modeller başarıyla çekildiyse önbelleği güncelle all_models = list(set(api_models + GEMINI_MODELS)) # Varsayılan modelleri önceliklendir sorted_models = sorted( all_models, key=lambda x: ( 0 if x in GEMINI_MODELS else 1, GEMINI_MODELS.index(x) if x in GEMINI_MODELS else float('inf') ) ) # Önbelleği güncelle MODEL_CACHE["gemini"]["models"] = sorted_models MODEL_CACHE["gemini"]["last_updated"] = current_time # Önbelleği diske kaydet save_model_cache() return sorted_models else: # API'den model çekilemediyse önbellekteki modelleri kullan if cache["models"]: return cache["models"] else: return GEMINI_MODELS else: # Önbellek güncel, önbellekteki modelleri kullan return cache["models"] if cache["models"] else GEMINI_MODELS def generate_response(self, prompt: str, model: str = "gemini-1.5-pro", temperature: float = 0.7) -> Dict[str, Any]: """ Gemini API kullanarak yanıt oluşturur. Args: prompt (str): Gönderilecek prompt model (str): Kullanılacak model temperature (float): Sıcaklık değeri (0.0-1.0) Returns: Dict[str, Any]: Yanıt bilgilerini içeren sözlük """ if not self.api_key: return {"success": False, "error": "Gemini API anahtarı ayarlanmamış.", "content": ""} try: model_obj = genai.GenerativeModel(model) response = model_obj.generate_content( prompt, generation_config=genai.types.GenerationConfig( temperature=temperature ) ) return { "success": True, "content": response.text, "model": model } except Exception as e: return {"success": False, "error": str(e), "content": ""} class OpenRouterHandler: """ OpenRouter API ile etkileşim için sınıf. """ def __init__(self, api_key: str = ""): """ OpenRouter işleyicisini başlat. Args: api_key (str, optional): OpenRouter API anahtarı """ self.api_key = api_key self.base_url = "https://openrouter.ai/api/v1" def set_api_key(self, api_key: str) -> None: """ OpenRouter API anahtarını ayarlar. Args: api_key (str): OpenRouter API anahtarı """ self.api_key = api_key # API anahtarı değiştiğinde model önbelleğini sıfırla global MODEL_CACHE MODEL_CACHE["openrouter"]["last_updated"] = 0 def fetch_models_from_api(self) -> List[str]: """ OpenRouter API'sinden modelleri çeker. Returns: List[str]: API'den çekilen modeller listesi """ if not self.api_key: return [] try: headers = { "Authorization": f"Bearer {self.api_key}" } response = requests.get( f"{self.base_url}/models", headers=headers ) if response.status_code == 200: models_data = response.json() api_models = [model["id"] for model in models_data["data"]] return api_models else: print(f"OpenRouter modellerini çekerken HTTP hatası: {response.status_code}") return [] except Exception as e: print(f"OpenRouter modellerini çekerken hata oluştu: {str(e)}") return [] def get_available_models(self, force_refresh: bool = False) -> List[str]: """ Kullanılabilir OpenRouter modellerini döndürür. Args: force_refresh (bool): Önbelleği zorla yenileme Returns: List[str]: Kullanılabilir modeller listesi """ global MODEL_CACHE current_time = time.time() cache = MODEL_CACHE["openrouter"] # Önbellek yenileme koşulları: # 1. Zorla yenileme istenmiş # 2. Önbellek boş # 3. Önbellek güncellenme zamanı aşılmış if (force_refresh or not cache["models"] or current_time - cache["last_updated"] > cache["update_interval"]): # API'den modelleri çek api_models = self.fetch_models_from_api() if api_models: # API'den modeller başarıyla çekildiyse önbelleği güncelle all_models = list(set(api_models + OPENROUTER_MODELS)) # Varsayılan modelleri önceliklendir sorted_models = sorted( all_models, key=lambda x: ( 0 if x in OPENROUTER_MODELS else 1, OPENROUTER_MODELS.index(x) if x in OPENROUTER_MODELS else float('inf') ) ) # Önbelleği güncelle MODEL_CACHE["openrouter"]["models"] = sorted_models MODEL_CACHE["openrouter"]["last_updated"] = current_time # Önbelleği diske kaydet save_model_cache() return sorted_models else: # API'den model çekilemediyse önbellekteki modelleri kullan if cache["models"]: return cache["models"] else: return OPENROUTER_MODELS else: # Önbellek güncel, önbellekteki modelleri kullan return cache["models"] if cache["models"] else OPENROUTER_MODELS def generate_response(self, prompt: str, model: str = "openai/gpt-4-turbo", temperature: float = 0.7, max_tokens: int = 2000) -> Dict[str, Any]: """ OpenRouter API kullanarak yanıt oluşturur. Args: prompt (str): Gönderilecek prompt model (str): Kullanılacak model temperature (float): Sıcaklık değeri (0.0-1.0) max_tokens (int): Maksimum token sayısı Returns: Dict[str, Any]: Yanıt bilgilerini içeren sözlük """ if not self.api_key: return {"success": False, "error": "OpenRouter API anahtarı ayarlanmamış.", "content": ""} try: headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}" } data = { "model": model, "messages": [{"role": "user", "content": prompt}], "temperature": temperature, "max_tokens": max_tokens } response = requests.post( f"{self.base_url}/chat/completions", headers=headers, json=data ) if response.status_code == 200: response_data = response.json() return { "success": True, "content": response_data["choices"][0]["message"]["content"], "model": model, "usage": response_data.get("usage", {}) } else: return { "success": False, "error": f"HTTP {response.status_code}: {response.text}", "content": "" } except Exception as e: return {"success": False, "error": str(e), "content": ""} # Arka planda model listelerini güncelleyen fonksiyon def background_model_updater(): """ Arka planda çalışarak model listelerini periyodik olarak günceller. """ while True: try: # Her sağlayıcı için modelleri güncelle if openai_handler.api_key: openai_handler.get_available_models(force_refresh=True) if gemini_handler.api_key: gemini_handler.get_available_models(force_refresh=True) if openrouter_handler.api_key: openrouter_handler.get_available_models(force_refresh=True) print("Model listeleri arka planda güncellendi") # 1 saat bekle time.sleep(3600) except Exception as e: print(f"Arka plan model güncelleyicisinde hata: {str(e)}") time.sleep(60) # Hata durumunda 1 dakika bekle ve tekrar dene # API işleyicilerini oluştur api_manager = APIManager() openai_handler = OpenAIHandler() gemini_handler = GeminiHandler() openrouter_handler = OpenRouterHandler() # API anahtarlarını ayarla (varsa) openai_api_key = os.getenv("OPENAI_API_KEY", "") gemini_api_key = os.getenv("GEMINI_API_KEY", "") openrouter_api_key = os.getenv("OPENROUTER_API_KEY", "") if openai_api_key: openai_handler.set_api_key(openai_api_key) if gemini_api_key: gemini_handler.set_api_key(gemini_api_key) if openrouter_api_key: openrouter_handler.set_api_key(openrouter_api_key) # Arka plan model güncelleyicisini başlat updater_thread = threading.Thread(target=background_model_updater, daemon=True) updater_thread.start() # Test fonksiyonu def test_api_connections(): """ API bağlantılarını test eder. """ print("API Bağlantı Testi:") # OpenAI API testi if openai_api_key: print("\nOpenAI API Testi:") try: models = openai_handler.get_available_models(force_refresh=True) print(f"Kullanılabilir modeller: {models[:5]}...") print("OpenAI API bağlantısı başarılı.") except Exception as e: print(f"OpenAI API hatası: {str(e)}") else: print("\nOpenAI API anahtarı ayarlanmamış.") print(f"Varsayılan modeller: {OPENAI_MODELS}") # Gemini API testi if gemini_api_key: print("\nGemini API Testi:") try: models = gemini_handler.get_available_models(force_refresh=True) print(f"Kullanılabilir modeller: {models}") print("Gemini API bağlantısı başarılı.") except Exception as e: print(f"Gemini API hatası: {str(e)}") else: print("\nGemini API anahtarı ayarlanmamış.") print(f"Varsayılan modeller: {GEMINI_MODELS}") # OpenRouter API testi if openrouter_api_key: print("\nOpenRouter API Testi:") try: models = openrouter_handler.get_available_models(force_refresh=True) print(f"Kullanılabilir modeller: {len(models)} model bulundu") print(f"İlk 10 model: {models[:10]}...") print("OpenRouter API bağlantısı başarılı.") except Exception as e: print(f"OpenRouter API hatası: {str(e)}") else: print("\nOpenRouter API anahtarı ayarlanmamış.") print(f"Varsayılan modeller: {len(OPENROUTER_MODELS)} model bulundu") print(f"İlk 10 model: {OPENROUTER_MODELS[:10]}...") if __name__ == "__main__": test_api_connections()