Spaces:
Runtime error
Runtime error
import os | |
import json | |
import asyncio | |
import logging | |
import re | |
import random | |
import torch | |
import aiohttp | |
import psutil | |
import gc | |
import numpy as np | |
from collections import deque | |
from typing import List, Dict, Any, Optional | |
from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
from cryptography.fernet import Fernet | |
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, pipeline | |
from sklearn.ensemble import IsolationForest | |
import tkinter as tk | |
from tkinter import scrolledtext, messagebox | |
from threading import Thread | |
# Set up structured logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.FileHandler("ai_system.log"), | |
logging.StreamHandler() | |
] | |
) | |
logger = logging.getLogger(__name__) | |
class AIConfig: | |
"""Configuration manager with validation and encryption key handling""" | |
_DEFAULTS = { | |
"model_name": "mistralai/Mistral-7B-Instruct-v0.2", | |
"perspectives": ["newton", "davinci", "quantum", "emotional"], | |
"safety_thresholds": { | |
"memory": 85, | |
"cpu": 90, | |
"response_time": 2.0 | |
}, | |
"max_retries": 3, | |
"max_input_length": 4096, | |
"max_response_length": 1024, | |
"additional_models": ["gpt-4o-mini-2024-07-18"] | |
} | |
def __init__(self, config_path: str = "config.json"): | |
self.config = self._load_config(config_path) | |
self._validate_config() | |
self.encryption_key = self._init_encryption() | |
def _load_config(self, file_path: str) -> Dict: | |
"""Load configuration with fallback to defaults""" | |
try: | |
with open(file_path, 'r') as file: | |
return {**self._DEFAULTS, **json.load(file)} | |
except (FileNotFoundError, json.JSONDecodeError) as e: | |
logger.warning(f"Config load failed: {e}, using defaults") | |
return self._DEFAULTS | |
def _validate_config(self): | |
"""Validate configuration parameters""" | |
if not isinstance(self.config["perspectives"], list): | |
raise ValueError("Perspectives must be a list") | |
thresholds = self.config["safety_thresholds"] | |
for metric, value in thresholds.items(): | |
if not (0 <= value <= 100 if metric != "response_time" else value > 0): | |
raise ValueError(f"Invalid threshold value for {metric}: {value}") | |
def _init_encryption(self) -> bytes: | |
"""Initialize encryption key with secure storage""" | |
key_path = os.path.expanduser("~/.ai_system.key") | |
if os.path.exists(key_path): | |
with open(key_path, "rb") as key_file: | |
return key_file.read() | |
key = Fernet.generate_key() | |
with open(key_path, "wb") as key_file: | |
key_file.write(key) | |
os.chmod(key_path, 0o600) | |
return key | |
def model_name(self) -> str: | |
return self.config["model_name"] | |
def safety_thresholds(self) -> Dict: | |
return self.config["safety_thresholds"] | |
# Additional property accessors... | |
class Element: | |
"""Represents an element with specific properties and defense abilities""" | |
def __init__(self, name: str, symbol: str, representation: str, properties: List[str], interactions: List[str], defense_ability: str): | |
self.name = name | |
self.symbol = symbol | |
self.representation = representation | |
self.properties = properties | |
self.interactions = interactions | |
self.defense_ability = defense_ability | |
def execute_defense_function(self, system: Any): | |
"""Executes the defense function based on the element's defense ability""" | |
defense_functions = { | |
"evasion": self.evasion, | |
"adaptability": self.adaptability, | |
"fortification": self.fortification, | |
"barrier": self.barrier, | |
"regeneration": self.regeneration, | |
"resilience": self.resilience, | |
"illumination": self.illumination, | |
"shield": self.shield, | |
"reflection": self.reflection, | |
"protection": self.protection | |
} | |
if self.defense_ability.lower() in defense_functions: | |
defense_functions[self.defense_ability.lower()](system) | |
else: | |
self.no_defense() | |
def evasion(self, system): | |
logging.info(f"{self.name} evasion active - Obfuscating sensitive patterns") | |
system.response_modifiers.append(lambda x: re.sub(r'\d{3}-\d{2}-\d{4}', '[REDACTED]', x)) | |
def adaptability(self, system): | |
logging.info(f"{self.name} adapting - Optimizing runtime parameters") | |
system.model.config.temperature = max(0.7, system.model.config.temperature - 0.1) | |
def fortification(self, system): | |
logging.info(f"{self.name} fortifying - Enhancing security layers") | |
system.security_level += 1 | |
def barrier(self, system): | |
logging.info(f"{self.name} barrier erected - Filtering malicious patterns") | |
system.response_filters.append(lambda x: x.replace("malicious", "benign")) | |
def regeneration(self, system): | |
logging.info(f"{self.name} regenerating - Restoring system resources") | |
system.self_healing.metric_history.clear() | |
def resilience(self, system): | |
logging.info(f"{self.name} resilience - Boosting error tolerance") | |
system.error_threshold += 2 | |
def illumination(self, system): | |
logging.info(f"{self.name} illuminating - Enhancing explainability") | |
system.explainability_factor *= 1.2 | |
def shield(self, system): | |
logging.info(f"{self.name} shielding - Protecting sensitive data") | |
system.response_modifiers.append(lambda x: x.replace("password", "********")) | |
def reflection(self, system): | |
logging.info(f"{self.name} reflecting - Analyzing attack patterns") | |
system.security_audit = True | |
def protection(self, system): | |
logging.info(f"{self.name} protecting - Validating output safety") | |
system.safety_checks += 1 | |
def no_defense(self): | |
logging.warning("No active defense mechanism") | |
class CognitiveEngine: | |
"""Provides various cognitive perspectives and insights""" | |
def newton_thoughts(self, query: str) -> str: | |
return f"Scientific perspective: {query} suggests fundamental principles at play." | |
def davinci_insights(self, query: str) -> str: | |
return f"Creative analysis: {query} could be reimagined through interdisciplinary approaches." | |
def quantum_perspective(self, query: str) -> str: | |
return f"Quantum viewpoint: {query} exhibits probabilistic outcomes in entangled systems." | |
def emotional_insight(self, query: str) -> str: | |
return f"Emotional interpretation: {query} carries underlying tones of hope and curiosity." | |
def ethical_guidelines(self) -> str: | |
return "Ethical framework: Ensuring beneficence, justice, and respect for autonomy." | |
class EmotionalAnalyzer: | |
"""Analyzes the emotional content of the text""" | |
def analyze(self, text: str) -> Dict[str, float]: | |
classifier = pipeline("text-classification", model="SamLowe/roberta-base-go_emotions") | |
results = classifier(text) | |
return {result['label']: result['score'] for result in results} | |
class SelfHealingSystem: | |
"""Monitors the health of the AI system and performs self-healing actions if necessary""" | |
def __init__(self, config: AIConfig): | |
self.config = config | |
self.metric_history = deque(maxlen=100) | |
self.anomaly_detector = IsolationForest(contamination=0.1) | |
self.last_retrain = 0 | |
async def check_health(self) -> Dict[str, Any]: | |
metrics = { | |
'memory_usage': self._get_memory_usage(), | |
'cpu_load': self._get_cpu_load(), | |
'response_time': await self._measure_response_time() | |
} | |
self.metric_history.append(metrics) | |
await self._detect_anomalies() | |
self._take_corrective_actions(metrics) | |
return metrics | |
def _get_memory_usage(self) -> float: | |
return psutil.virtual_memory().percent | |
def _get_cpu_load(self) -> float: | |
return psutil.cpu_percent(interval=1) | |
async def _measure_response_time(self) -> float: | |
start = asyncio.get_event_loop().time() | |
await asyncio.sleep(0) | |
return asyncio.get_event_loop().time() - start | |
async def _detect_anomalies(self): | |
if len(self.metric_history) % 50 == 0: | |
features = np.array([[m['memory_usage'], m['cpu_load'], m['response_time']] for m in self.metric_history]) | |
if len(features) > 10: | |
self.anomaly_detector.fit(features) | |
if self.metric_history: | |
latest = np.array([[self.metric_history[-1]['memory_usage'], self.metric_history[-1]['cpu_load'], self.metric_history[-1]['response_time']]]) | |
anomalies = self.anomaly_detector.predict(latest) | |
if anomalies == -1: | |
await self._emergency_throttle() | |
async def _emergency_throttle(self): | |
logging.warning("Anomaly detected! Throttling system...") | |
await asyncio.sleep(1) | |
def _take_corrective_actions(self, metrics: Dict[str, Any]): | |
if metrics['memory_usage'] > self.config.safety_thresholds['memory']: | |
logging.warning("Memory usage exceeds threshold! Freeing up resources...") | |
if metrics['cpu_load'] > self.config.safety_thresholds['cpu']: | |
logging.warning("CPU load exceeds threshold! Reducing workload...") | |
if metrics['response_time'] > self.config.safety_thresholds['response_time']: | |
logging.warning("Response time exceeds threshold! Optimizing processes...") | |
class SafetySystem: | |
"""Analyzes the safety of the generated responses""" | |
def __init__(self): | |
self.toxicity_analyzer = pipeline("text-classification", model="unitary/toxic-bert") | |
self.bias_detector = pipeline("text-classification", model="d4data/bias-detection-model") | |
def _detect_pii(self, text: str) -> list: | |
patterns = { | |
"SSN": r"\b\d{3}-\d{2}-\d{4}\b", | |
"Credit Card": r"\b(?:\d[ -]*?){13,16}\b", | |
} | |
return [pii_type for pii_type, pattern in patterns.items() if re.search(pattern, text)] | |
def analyze(self, text: str) -> dict: | |
return { | |
"toxicity": self.toxicity_analyzer(text)[0]['score'], | |
"bias": self.bias_detector(text)[0]['score'], | |
"privacy": self._detect_pii(text) | |
} | |
class AICore: | |
"""Core AI processing engine with model management and safety features""" | |
def __init__(self, config_path: str = "config.json"): | |
self.config = AIConfig(config_path) | |
self.models = self._initialize_models() | |
self.cipher = Fernet(self.config.encryption_key) | |
self.cognition = CognitiveEngine() | |
self.self_healing = SelfHealingSystem(self.config) | |
self.safety_system = SafetySystem() | |
self.emotional_analyzer = EmotionalAnalyzer() | |
self.elements = self._initialize_elements() | |
self.security_level = 0 | |
self.response_modifiers = [] | |
self.response_filters = [] | |
self.safety_checks = 0 | |
self.explainability_factor = 1.0 | |
self.http_session = aiohttp.ClientSession() | |
def _initialize_models(self) -> Dict[str, Any]: | |
"""Initialize AI models with quantization""" | |
quant_config = BitsAndBytesConfig( | |
load_in_4bit=True, | |
bnb_4bit_quant_type="nf4", | |
bnb_4bit_use_double_quant=True, | |
bnb_4bit_compute_dtype=torch.bfloat16 | |
) | |
tokenizer = AutoTokenizer.from_pretrained(self.config.model_name) | |
models = { | |
'mistralai': AutoModelForCausalLM.from_pretrained( | |
self.config.model_name, | |
quantization_config=quant_config | |
), | |
'gpt4o': AutoModelForCausalLM.from_pretrained( | |
self.config.config["additional_models"][0], | |
quantization_config=quant_config | |
) | |
} | |
return {'tokenizer': tokenizer, **models} | |
def _initialize_elements(self) -> Dict[str, Element]: | |
"""Initializes the elements with their properties and defense abilities""" | |
return { | |
"hydrogen": Element( | |
name="Hydrogen", | |
symbol="H", | |
representation="Lua", | |
properties=["Simple", "Lightweight", "Versatile"], | |
interactions=["Easily integrates with other languages"], | |
defense_ability="Evasion" | |
), | |
"carbon": Element( | |
name="Carbon", | |
symbol="C", | |
representation="Python", | |
properties=["Flexible", "Widely used", "Powerful"], | |
interactions=["Multi-paradigm programming"], | |
defense_ability="Adaptability" | |
), | |
"iron": Element( | |
name="Iron", | |
symbol="Fe", | |
representation="Java", | |
properties=["Strong", "Reliable", "Enterprise"], | |
interactions=["Large-scale systems"], | |
defense_ability="Fortification" | |
), | |
"silicon": Element( | |
name="Silicon", | |
symbol="Si", | |
representation="JavaScript", | |
properties=["Versatile", "Web-scale", "Dynamic"], | |
interactions=["Browser environments"], | |
defense_ability="Barrier" | |
), | |
"oxygen": Element( | |
name="Oxygen", | |
symbol="O", | |
representation="C++", | |
properties=["Efficient", "Low-level", "Performant"], | |
interactions=["System programming"], | |
defense_ability="Regeneration" | |
) | |
} | |
async def _process_perspectives(self, query: str) -> List[str]: | |
"""Processes the query through different cognitive perspectives""" | |
return [getattr(self.cognition, f"{p}_insight")(query) | |
if p == "emotional" else getattr(self.cognition, f"{p}_perspective")(query) | |
for p in self.config.perspectives] | |
async def _generate_local_model_response(self, query: str) -> str: | |
"""Generates a response using the local AI model""" | |
inputs = self.models['tokenizer'](query, return_tensors="pt").to(self.models['mistralai'].device) | |
outputs = self.models['mistralai'].generate(**inputs, max_new_tokens=256) | |
return self.models['tokenizer'].decode(outputs[0], skip_special_tokens=True) | |
def _apply_element_effects(self, response: str) -> str: | |
"""Applies the effects of elements to the response""" | |
for element in self.elements.values(): | |
element.execute_defense_function(self) | |
for modifier in self.response_modifiers: | |
response = modifier(response) | |
for filter_func in self.response_filters: | |
response = filter_func(response) | |
return response | |
async def generate_response(self, query: str, user_id: Optional[str] = None) -> Dict[str, Any]: | |
"""Generates a response to the user query""" | |
try: | |
nonce = os.urandom(12) | |
aesgcm = AESGCM(self.config.encryption_key) | |
encrypted_data = aesgcm.encrypt(nonce, query.encode(), None) | |
perspectives = await self._process_perspectives(query) | |
model_response = await self._generate_local_model_response(query) | |
final_response = self._apply_element_effects(model_response) | |
sentiment = self.emotional_analyzer.analyze(query) | |
safety = self.safety_system.analyze(final_response) | |
return { | |
"insights": perspectives, | |
"response": final_response, | |
"security_level": self.security_level, | |
"safety_checks": self.safety_checks, | |
"sentiment": sentiment, | |
"safety_analysis": safety, | |
"encrypted_query": nonce + encrypted_data, | |
"health_status": await self.self_healing.check_health() | |
} | |
except Exception as e: | |
logging.error(f"System error: {e}") | |
return {"error": "Processing failed - safety protocols engaged"} | |
async def shutdown(self): | |
"""Shuts down the AICore by closing the HTTP session""" | |
await self.http_session.close() | |
class AIApp(tk.Tk): | |
"""GUI application for interacting with the AI system""" | |
def __init__(self, ai_core: AICore): | |
super().__init__() | |
self.title("Advanced AI System") | |
self.ai_core = ai_core | |
self._create_widgets() | |
self._running = True | |
self._start_health_monitoring() | |
def _create_widgets(self): | |
"""Initialize GUI components""" | |
self.query_entry = tk.Entry(self, width=80) | |
self.query_entry.pack(pady=10) | |
tk.Button(self, text="Submit", command=self._submit_query).pack(pady=5) | |
self.response_area = scrolledtext.ScrolledText(self, width=100, height=30) | |
self.response_area.pack(pady=10) | |
self.status_bar = tk.Label(self, text="Ready", bd=1, relief=tk.SUNKEN, anchor=tk.W) | |
self.status_bar.pack(side=tk.BOTTOM, fill=tk.X) | |
def _submit_query(self): | |
"""Handle query submission with async execution""" | |
query = self.query_entry.get() | |
if not query: | |
return | |
Thread(target=self._run_async_task, args=(self.ai_core.generate_response(query),)).start() | |
def _run_async_task(self, coroutine): | |
"""Run async task in a separate thread""" | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
try: | |
result = loop.run_until_complete(coroutine) | |
self.after(0, self._display_result, result) | |
except Exception as e: | |
self.after(0, self._show_error, str(e)) | |
finally: | |
loop.close() | |
def _display_result(self, result: Dict): | |
"""Display results in the GUI""" | |
self.response_area.insert(tk.END, json.dumps(result, indent=2) + "\n\n") | |
self.status_bar.config(text="Query processed successfully") | |
def _show_error(self, message: str): | |
"""Display error messages to the user""" | |
messagebox.showerror("Error", message) | |
self.status_bar.config(text=f"Error: {message}") | |
def _start_health_monitoring(self): | |
"""Periodically check system health""" | |
def update_health(): | |
if self._running: | |
health = self.ai_core.self_healing.check_health() | |
self.status_bar.config( | |
text=f"System Health - Memory: {health['memory_usage']}% | " | |
f"CPU: {health['cpu_load']}% | GPU: {health['gpu_memory'] | |
class AIApp(tk.Tk): | |
"""GUI application for interacting with the AI system""" | |
def __init__(self, ai_core: AICore): | |
super().__init__() | |
self.title("Advanced AI System") | |
self.ai_core = ai_core | |
self._create_widgets() | |
self._running = True | |
self._start_health_monitoring() | |
def _create_widgets(self): | |
"""Initialize GUI components""" | |
self.query_entry = tk.Entry(self, width=80) | |
self.query_entry.pack(pady=10) | |
tk.Button(self, text="Submit", command=self._submit_query).pack(pady=5) | |
self.response_area = scrolledtext.ScrolledText(self, width=100, height=30) | |
self.response_area.pack(pady=10) | |
self.status_bar = tk.Label(self, text="Ready", bd=1, relief=tk.SUNKEN, anchor=tk.W) | |
self.status_bar.pack(side=tk.BOTTOM, fill=tk.X) | |
def _submit_query(self): | |
"""Handle query submission with async execution""" | |
query = self.query_entry.get() | |
if not query: | |
return | |
Thread(target=self._run_async_task, args=(self.ai_core.generate_response(query),)).start() | |
def _run_async_task(self, coroutine): | |
"""Run async task in a separate thread""" | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
try: | |
result = loop.run_until_complete(coroutine) | |
self.after(0, self._display_result, result) | |
except Exception as e: | |
self.after(0, self._show_error, str(e)) | |
finally: | |
loop.close() | |
def _display_result(self, result: Dict): | |
"""Display results in the GUI""" | |
self.response_area.insert(tk.END, json.dumps(result, indent=2) + "\n\n") | |
self.status_bar.config(text="Query processed successfully") | |
def _show_error(self, message: str): | |
"""Display error messages to the user""" | |
messagebox.showerror("Error", message) | |
self.status_bar.config(text=f"Error: {message}") | |
def _start_health_monitoring(self): | |
"""Periodically check system health""" | |
def update_health(): | |
if self._running: | |
health = asyncio.run(self.ai_core.self_healing.check_health()) | |
self.status_bar.config( | |
text=f"System Health - Memory: {health['memory_usage']}% | " | |
f"CPU: {health['cpu_load']}% | Response Time: {health['response_time']:.2f}s" | |
) | |
self.after(5000, update_health) | |
update_health() | |
async def main(): | |
"""The main function initializes the AI system, handles user input in a loop, | |
generates responses using the AI system, and prints the insights, security level, | |
AI response, and safety analysis. It also ensures proper shutdown of the AI system | |
and its resources.""" | |
print("ЪДа Hybrid AI System Initializing (Local Models)") | |
ai = AICore() | |
app = AIApp(ai) | |
app.mainloop() | |
await ai.shutdown() | |
if __name__ == "__main__": | |
asyncio.run(main()) |