# Spaces: Runtime error
import os | |
import sys | |
import asyncio | |
import logging | |
import openai # Correct OpenAI import | |
from transformers import AutoTokenizer | |
from typing import Dict, Any, List | |
import aiohttp | |
from cryptography.fernet import Fernet | |
import gradio as gr | |
import json | |
import torch | |
import psutil | |
import random | |
class EnvironmentManager:
    """Handles loading and validation of environment variables."""

    @staticmethod
    def load_env_variables() -> Dict[str, str]:
        """Read the required environment variables and return them.

        Declared as a static method so it can be called both on the class
        and on an instance; it uses no instance or class state.

        Returns:
            A dict mapping each required variable name to its value.

        Raises:
            ValueError: If any required variable is unset or empty. The
                message lists every missing name, comma-separated.
        """
        required_vars = ("OPENAI_API_KEY", "ENCRYPTION_KEY")
        env_vars = {var: os.getenv(var) for var in required_vars}
        # `not value` treats both an unset variable (None) and an empty
        # string as missing.
        missing_vars = [var for var, value in env_vars.items() if not value]
        if missing_vars:
            raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
        return env_vars
class EncryptionManager:
    """Encrypts and decrypts text through a single Fernet cipher."""

    def __init__(self, key: str):
        """Create the cipher from ``key`` after encoding it to bytes."""
        key_bytes = key.encode()
        self.cipher = Fernet(key_bytes)

    def encrypt(self, data: str) -> str:
        """Encrypt ``data`` and return the cipher output decoded to ``str``."""
        token = self.cipher.encrypt(data.encode())
        return token.decode()

    def decrypt(self, encrypted_data: str) -> str:
        """Decrypt ``encrypted_data`` and return the result decoded to ``str``."""
        plaintext = self.cipher.decrypt(encrypted_data.encode())
        return plaintext.decode()
class AICore: | |
"""Main AI Core system integrating OpenAI chat functionality.""" | |
def __init__(self, env_vars: Dict[str, str]):
    """Store the environment mapping and set up encryption and the API key.

    Args:
        env_vars: Mapping that must contain the keys "ENCRYPTION_KEY"
            and "OPENAI_API_KEY".
    """
    self.env_vars = env_vars
    encryption_key = env_vars["ENCRYPTION_KEY"]
    self.encryption_manager = EncryptionManager(encryption_key)
    api_key = env_vars["OPENAI_API_KEY"]
    self.openai_api_key = api_key
async def generate_response(self, query: str) -> Dict[str, Any]: | |
try: | |
encrypted_query = self.encryption_manager.encrypt(query) | |
chat_completion = await openai.ChatCompletion.acreate( | |
model="gpt-4-turbo", # Ensure you use a valid model name | |
messages=[ | |
{"role": "system", "content": "You are a helpful AI assistant."}, | |
{"role": "user", "content": query} | |
], | |
api_key=self.openai_api_key | |
) | |
model_response = chat_completion['choices'][0]['message']['content'] | |
return |