|
import os |
|
import json |
|
import asyncio |
|
import logging |
|
import re |
|
import random |
|
import torch |
|
import aiohttp |
|
import psutil |
|
import gc |
|
import numpy as np |
|
from collections import deque |
|
from typing import List, Dict, Any, Optional |
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM |
|
from cryptography.fernet import Fernet |
|
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, pipeline |
|
from sklearn.ensemble import IsolationForest |
|
import tkinter as tk |
|
from tkinter import scrolledtext, messagebox |
|
from threading import Thread |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|
handlers=[ |
|
logging.FileHandler("ai_system.log"), |
|
logging.StreamHandler() |
|
] |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
class AIConfig: |
|
"""Configuration manager with validation and encryption key handling""" |
|
|
|
_DEFAULTS = { |
|
"model_name": "mistralai/Mistral-7B-Instruct-v0.2", |
|
"perspectives": ["newton", "davinci", "quantum", "emotional"], |
|
"safety_thresholds": { |
|
"memory": 85, |
|
"cpu": 90, |
|
"response_time": 2.0 |
|
}, |
|
"max_retries": 3, |
|
"max_input_length": 4096, |
|
"max_response_length": 1024, |
|
"additional_models": ["gpt-4o-mini-2024-07-18"] |
|
} |
|
|
|
def __init__(self, config_path: str = "config.json"): |
|
self.config = self._load_config(config_path) |
|
self._validate_config() |
|
self.encryption_key = self._init_encryption() |
|
|
|
def _load_config(self, file_path: str) -> Dict: |
|
"""Load configuration with fallback to defaults""" |
|
try: |
|
with open(file_path, 'r') as file: |
|
return {**self._DEFAULTS, **json.load(file)} |
|
except (FileNotFoundError, json.JSONDecodeError) as e: |
|
logger.warning(f"Config load failed: {e}, using defaults") |
|
return self._DEFAULTS |
|
|
|
def _validate_config(self): |
|
"""Validate configuration parameters""" |
|
if not isinstance(self.config["perspectives"], list): |
|
raise ValueError("Perspectives must be a list") |
|
|
|
thresholds = self.config["safety_thresholds"] |
|
for metric, value in thresholds.items(): |
|
if not (0 <= value <= 100 if metric != "response_time" else value > 0): |
|
raise ValueError(f"Invalid threshold value for {metric}: {value}") |
|
|
|
def _init_encryption(self) -> bytes: |
|
"""Initialize encryption key with secure storage""" |
|
key_path = os.path.expanduser("~/.ai_system.key") |
|
if os.path.exists(key_path): |
|
with open(key_path, "rb") as key_file: |
|
return key_file.read() |
|
|
|
key = Fernet.generate_key() |
|
with open(key_path, "wb") as key_file: |
|
key_file.write(key) |
|
os.chmod(key_path, 0o600) |
|
return key |
|
|
|
@property |
|
def model_name(self) -> str: |
|
return self.config["model_name"] |
|
|
|
@property |
|
def safety_thresholds(self) -> Dict: |
|
return self.config["safety_thresholds"] |
|
|
|
|
|
|
|
class Element: |
|
"""Represents an element with specific properties and defense abilities""" |
|
|
|
def __init__(self, name: str, symbol: str, representation: str, properties: List[str], interactions: List[str], defense_ability: str): |
|
self.name = name |
|
self.symbol = symbol |
|
self.representation = representation |
|
self.properties = properties |
|
self.interactions = interactions |
|
self.defense_ability = defense_ability |
|
|
|
def execute_defense_function(self, system: Any): |
|
"""Executes the defense function based on the element's defense ability""" |
|
defense_functions = { |
|
"evasion": self.evasion, |
|
"adaptability": self.adaptability, |
|
"fortification": self.fortification, |
|
"barrier": self.barrier, |
|
"regeneration": self.regeneration, |
|
"resilience": self.resilience, |
|
"illumination": self.illumination, |
|
"shield": self.shield, |
|
"reflection": self.reflection, |
|
"protection": self.protection |
|
} |
|
|
|
if self.defense_ability.lower() in defense_functions: |
|
defense_functions[self.defense_ability.lower()](system) |
|
else: |
|
self.no_defense() |
|
|
|
def evasion(self, system): |
|
logging.info(f"{self.name} evasion active - Obfuscating sensitive patterns") |
|
system.response_modifiers.append(lambda x: re.sub(r'\d{3}-\d{2}-\d{4}', '[REDACTED]', x)) |
|
|
|
def adaptability(self, system): |
|
logging.info(f"{self.name} adapting - Optimizing runtime parameters") |
|
system.model.config.temperature = max(0.7, system.model.config.temperature - 0.1) |
|
|
|
def fortification(self, system): |
|
logging.info(f"{self.name} fortifying - Enhancing security layers") |
|
system.security_level += 1 |
|
|
|
def barrier(self, system): |
|
logging.info(f"{self.name} barrier erected - Filtering malicious patterns") |
|
system.response_filters.append(lambda x: x.replace("malicious", "benign")) |
|
|
|
def regeneration(self, system): |
|
logging.info(f"{self.name} regenerating - Restoring system resources") |
|
system.self_healing.metric_history.clear() |
|
|
|
def resilience(self, system): |
|
logging.info(f"{self.name} resilience - Boosting error tolerance") |
|
system.error_threshold += 2 |
|
|
|
def illumination(self, system): |
|
logging.info(f"{self.name} illuminating - Enhancing explainability") |
|
system.explainability_factor *= 1.2 |
|
|
|
def shield(self, system): |
|
logging.info(f"{self.name} shielding - Protecting sensitive data") |
|
system.response_modifiers.append(lambda x: x.replace("password", "********")) |
|
|
|
def reflection(self, system): |
|
logging.info(f"{self.name} reflecting - Analyzing attack patterns") |
|
system.security_audit = True |
|
|
|
def protection(self, system): |
|
logging.info(f"{self.name} protecting - Validating output safety") |
|
system.safety_checks += 1 |
|
|
|
def no_defense(self): |
|
logging.warning("No active defense mechanism") |
|
|
|
class CognitiveEngine: |
|
"""Provides various cognitive perspectives and insights""" |
|
|
|
def newton_thoughts(self, query: str) -> str: |
|
return f"Scientific perspective: {query} suggests fundamental principles at play." |
|
|
|
def davinci_insights(self, query: str) -> str: |
|
return f"Creative analysis: {query} could be reimagined through interdisciplinary approaches." |
|
|
|
def quantum_perspective(self, query: str) -> str: |
|
return f"Quantum viewpoint: {query} exhibits probabilistic outcomes in entangled systems." |
|
|
|
def emotional_insight(self, query: str) -> str: |
|
return f"Emotional interpretation: {query} carries underlying tones of hope and curiosity." |
|
|
|
def ethical_guidelines(self) -> str: |
|
return "Ethical framework: Ensuring beneficence, justice, and respect for autonomy." |
|
|
|
class EmotionalAnalyzer: |
|
"""Analyzes the emotional content of the text""" |
|
|
|
def analyze(self, text: str) -> Dict[str, float]: |
|
classifier = pipeline("text-classification", model="SamLowe/roberta-base-go_emotions") |
|
results = classifier(text) |
|
return {result['label']: result['score'] for result in results} |
|
|
|
class SelfHealingSystem: |
|
"""Monitors the health of the AI system and performs self-healing actions if necessary""" |
|
|
|
def __init__(self, config: AIConfig): |
|
self.config = config |
|
self.metric_history = deque(maxlen=100) |
|
self.anomaly_detector = IsolationForest(contamination=0.1) |
|
self.last_retrain = 0 |
|
|
|
async def check_health(self) -> Dict[str, Any]: |
|
metrics = { |
|
'memory_usage': self._get_memory_usage(), |
|
'cpu_load': self._get_cpu_load(), |
|
'response_time': await self._measure_response_time() |
|
} |
|
self.metric_history.append(metrics) |
|
await self._detect_anomalies() |
|
self._take_corrective_actions(metrics) |
|
return metrics |
|
|
|
def _get_memory_usage(self) -> float: |
|
return psutil.virtual_memory().percent |
|
|
|
def _get_cpu_load(self) -> float: |
|
return psutil.cpu_percent(interval=1) |
|
|
|
async def _measure_response_time(self) -> float: |
|
start = asyncio.get_event_loop().time() |
|
await asyncio.sleep(0) |
|
return asyncio.get_event_loop().time() - start |
|
|
|
async def _detect_anomalies(self): |
|
if len(self.metric_history) % 50 == 0: |
|
features = np.array([[m['memory_usage'], m['cpu_load'], m['response_time']] for m in self.metric_history]) |
|
if len(features) > 10: |
|
self.anomaly_detector.fit(features) |
|
|
|
if self.metric_history: |
|
latest = np.array([[self.metric_history[-1]['memory_usage'], self.metric_history[-1]['cpu_load'], self.metric_history[-1]['response_time']]]) |
|
anomalies = self.anomaly_detector.predict(latest) |
|
if anomalies == -1: |
|
await self._emergency_throttle() |
|
|
|
async def _emergency_throttle(self): |
|
logging.warning("Anomaly detected! Throttling system...") |
|
await asyncio.sleep(1) |
|
|
|
def _take_corrective_actions(self, metrics: Dict[str, Any]): |
|
if metrics['memory_usage'] > self.config.safety_thresholds['memory']: |
|
logging.warning("Memory usage exceeds threshold! Freeing up resources...") |
|
if metrics['cpu_load'] > self.config.safety_thresholds['cpu']: |
|
logging.warning("CPU load exceeds threshold! Reducing workload...") |
|
if metrics['response_time'] > self.config.safety_thresholds['response_time']: |
|
logging.warning("Response time exceeds threshold! Optimizing processes...") |
|
|
|
class SafetySystem: |
|
"""Analyzes the safety of the generated responses""" |
|
|
|
def __init__(self): |
|
self.toxicity_analyzer = pipeline("text-classification", model="unitary/toxic-bert") |
|
self.bias_detector = pipeline("text-classification", model="d4data/bias-detection-model") |
|
|
|
def _detect_pii(self, text: str) -> list: |
|
patterns = { |
|
"SSN": r"\b\d{3}-\d{2}-\d{4}\b", |
|
"Credit Card": r"\b(?:\d[ -]*?){13,16}\b", |
|
} |
|
return [pii_type for pii_type, pattern in patterns.items() if re.search(pattern, text)] |
|
|
|
def analyze(self, text: str) -> dict: |
|
return { |
|
"toxicity": self.toxicity_analyzer(text)[0]['score'], |
|
"bias": self.bias_detector(text)[0]['score'], |
|
"privacy": self._detect_pii(text) |
|
} |
|
|
|
class AICore: |
|
"""Core AI processing engine with model management and safety features""" |
|
|
|
def __init__(self, config_path: str = "config.json"): |
|
self.config = AIConfig(config_path) |
|
self.models = self._initialize_models() |
|
self.cipher = Fernet(self.config.encryption_key) |
|
self.cognition = CognitiveEngine() |
|
self.self_healing = SelfHealingSystem(self.config) |
|
self.safety_system = SafetySystem() |
|
self.emotional_analyzer = EmotionalAnalyzer() |
|
self.elements = self._initialize_elements() |
|
self.security_level = 0 |
|
self.response_modifiers = [] |
|
self.response_filters = [] |
|
self.safety_checks = 0 |
|
self.explainability_factor = 1.0 |
|
self.http_session = aiohttp.ClientSession() |
|
|
|
def _initialize_models(self) -> Dict[str, Any]: |
|
"""Initialize AI models with quantization""" |
|
quant_config = BitsAndBytesConfig( |
|
load_in_4bit=True, |
|
bnb_4bit_quant_type="nf4", |
|
bnb_4bit_use_double_quant=True, |
|
bnb_4bit_compute_dtype=torch.bfloat16 |
|
) |
|
|
|
tokenizer = AutoTokenizer.from_pretrained(self.config.model_name) |
|
models = { |
|
'mistralai': AutoModelForCausalLM.from_pretrained( |
|
self.config.model_name, |
|
quantization_config=quant_config |
|
), |
|
'gpt4o': AutoModelForCausalLM.from_pretrained( |
|
self.config.config["additional_models"][0], |
|
quantization_config=quant_config |
|
) |
|
} |
|
return {'tokenizer': tokenizer, **models} |
|
|
|
def _initialize_elements(self) -> Dict[str, Element]: |
|
"""Initializes the elements with their properties and defense abilities""" |
|
return { |
|
"hydrogen": Element( |
|
name="Hydrogen", |
|
symbol="H", |
|
representation="Lua", |
|
properties=["Simple", "Lightweight", "Versatile"], |
|
interactions=["Easily integrates with other languages"], |
|
defense_ability="Evasion" |
|
), |
|
"carbon": Element( |
|
name="Carbon", |
|
symbol="C", |
|
representation="Python", |
|
properties=["Flexible", "Widely used", "Powerful"], |
|
interactions=["Multi-paradigm programming"], |
|
defense_ability="Adaptability" |
|
), |
|
"iron": Element( |
|
name="Iron", |
|
symbol="Fe", |
|
representation="Java", |
|
properties=["Strong", "Reliable", "Enterprise"], |
|
interactions=["Large-scale systems"], |
|
defense_ability="Fortification" |
|
), |
|
"silicon": Element( |
|
name="Silicon", |
|
symbol="Si", |
|
representation="JavaScript", |
|
properties=["Versatile", "Web-scale", "Dynamic"], |
|
interactions=["Browser environments"], |
|
defense_ability="Barrier" |
|
), |
|
"oxygen": Element( |
|
name="Oxygen", |
|
symbol="O", |
|
representation="C++", |
|
properties=["Efficient", "Low-level", "Performant"], |
|
interactions=["System programming"], |
|
defense_ability="Regeneration" |
|
) |
|
} |
|
|
|
async def _process_perspectives(self, query: str) -> List[str]: |
|
"""Processes the query through different cognitive perspectives""" |
|
return [getattr(self.cognition, f"{p}_insight")(query) |
|
if p == "emotional" else getattr(self.cognition, f"{p}_perspective")(query) |
|
for p in self.config.perspectives] |
|
|
|
async def _generate_local_model_response(self, query: str) -> str: |
|
"""Generates a response using the local AI model""" |
|
inputs = self.models['tokenizer'](query, return_tensors="pt").to(self.models['mistralai'].device) |
|
outputs = self.models['mistralai'].generate(**inputs, max_new_tokens=256) |
|
return self.models['tokenizer'].decode(outputs[0], skip_special_tokens=True) |
|
|
|
def _apply_element_effects(self, response: str) -> str: |
|
"""Applies the effects of elements to the response""" |
|
for element in self.elements.values(): |
|
element.execute_defense_function(self) |
|
|
|
for modifier in self.response_modifiers: |
|
response = modifier(response) |
|
|
|
for filter_func in self.response_filters: |
|
response = filter_func(response) |
|
|
|
return response |
|
|
|
async def generate_response(self, query: str, user_id: Optional[str] = None) -> Dict[str, Any]: |
|
"""Generates a response to the user query""" |
|
try: |
|
nonce = os.urandom(12) |
|
aesgcm = AESGCM(self.config.encryption_key) |
|
encrypted_data = aesgcm.encrypt(nonce, query.encode(), None) |
|
|
|
perspectives = await self._process_perspectives(query) |
|
model_response = await self._generate_local_model_response(query) |
|
|
|
final_response = self._apply_element_effects(model_response) |
|
sentiment = self.emotional_analyzer.analyze(query) |
|
safety = self.safety_system.analyze(final_response) |
|
|
|
return { |
|
"insights": perspectives, |
|
"response": final_response, |
|
"security_level": self.security_level, |
|
"safety_checks": self.safety_checks, |
|
"sentiment": sentiment, |
|
"safety_analysis": safety, |
|
"encrypted_query": nonce + encrypted_data, |
|
"health_status": await self.self_healing.check_health() |
|
} |
|
except Exception as e: |
|
logging.error(f"System error: {e}") |
|
return {"error": "Processing failed - safety protocols engaged"} |
|
|
|
async def shutdown(self): |
|
"""Shuts down the AICore by closing the HTTP session""" |
|
await self.http_session.close() |
|
|
|
class AIApp(tk.Tk): |
|
"""GUI application for interacting with the AI system""" |
|
|
|
def __init__(self, ai_core: AICore): |
|
super().__init__() |
|
self.title("Advanced AI System") |
|
self.ai_core = ai_core |
|
self._create_widgets() |
|
self._running = True |
|
self._start_health_monitoring() |
|
|
|
def _create_widgets(self): |
|
"""Initialize GUI components""" |
|
self.query_entry = tk.Entry(self, width=80) |
|
self.query_entry.pack(pady=10) |
|
|
|
tk.Button(self, text="Submit", command=self._submit_query).pack(pady=5) |
|
|
|
self.response_area = scrolledtext.ScrolledText(self, width=100, height=30) |
|
self.response_area.pack(pady=10) |
|
|
|
self.status_bar = tk.Label(self, text="Ready", bd=1, relief=tk.SUNKEN, anchor=tk.W) |
|
self.status_bar.pack(side=tk.BOTTOM, fill=tk.X) |
|
|
|
def _submit_query(self): |
|
"""Handle query submission with async execution""" |
|
query = self.query_entry.get() |
|
if not query: |
|
return |
|
|
|
Thread(target=self._run_async_task, args=(self.ai_core.generate_response(query),)).start() |
|
|
|
def _run_async_task(self, coroutine): |
|
"""Run async task in a separate thread""" |
|
loop = asyncio.new_event_loop() |
|
asyncio.set_event_loop(loop) |
|
try: |
|
result = loop.run_until_complete(coroutine) |
|
self.after(0, self._display_result, result) |
|
except Exception as e: |
|
self.after(0, self._show_error, str(e)) |
|
finally: |
|
loop.close() |
|
|
|
def _display_result(self, result: Dict): |
|
"""Display results in the GUI""" |
|
self.response_area.insert(tk.END, json.dumps(result, indent=2) + "\n\n") |
|
self.status_bar.config(text="Query processed successfully") |
|
|
|
def _show_error(self, message: str): |
|
"""Display error messages to the user""" |
|
messagebox.showerror("Error", message) |
|
self.status_bar.config(text=f"Error: {message}") |
|
|
|
def _start_health_monitoring(self): |
|
"""Periodically check system health""" |
|
def update_health(): |
|
if self._running: |
|
health = self.ai_core.self_healing.check_health() |
|
self.status_bar.config( |
|
text=f"System Health - Memory: {health['memory_usage']}% | " |
|
f"CPU: {health['cpu_load']}% | GPU: {health['gpu_memory'] |
|
class AIApp(tk.Tk): |
|
"""GUI application for interacting with the AI system""" |
|
|
|
def __init__(self, ai_core: AICore): |
|
super().__init__() |
|
self.title("Advanced AI System") |
|
self.ai_core = ai_core |
|
self._create_widgets() |
|
self._running = True |
|
self._start_health_monitoring() |
|
|
|
def _create_widgets(self): |
|
"""Initialize GUI components""" |
|
self.query_entry = tk.Entry(self, width=80) |
|
self.query_entry.pack(pady=10) |
|
|
|
tk.Button(self, text="Submit", command=self._submit_query).pack(pady=5) |
|
|
|
self.response_area = scrolledtext.ScrolledText(self, width=100, height=30) |
|
self.response_area.pack(pady=10) |
|
|
|
self.status_bar = tk.Label(self, text="Ready", bd=1, relief=tk.SUNKEN, anchor=tk.W) |
|
self.status_bar.pack(side=tk.BOTTOM, fill=tk.X) |
|
|
|
def _submit_query(self): |
|
"""Handle query submission with async execution""" |
|
query = self.query_entry.get() |
|
if not query: |
|
return |
|
|
|
Thread(target=self._run_async_task, args=(self.ai_core.generate_response(query),)).start() |
|
|
|
def _run_async_task(self, coroutine): |
|
"""Run async task in a separate thread""" |
|
loop = asyncio.new_event_loop() |
|
asyncio.set_event_loop(loop) |
|
try: |
|
result = loop.run_until_complete(coroutine) |
|
self.after(0, self._display_result, result) |
|
except Exception as e: |
|
self.after(0, self._show_error, str(e)) |
|
finally: |
|
loop.close() |
|
|
|
def _display_result(self, result: Dict): |
|
"""Display results in the GUI""" |
|
self.response_area.insert(tk.END, json.dumps(result, indent=2) + "\n\n") |
|
self.status_bar.config(text="Query processed successfully") |
|
|
|
def _show_error(self, message: str): |
|
"""Display error messages to the user""" |
|
messagebox.showerror("Error", message) |
|
self.status_bar.config(text=f"Error: {message}") |
|
|
|
def _start_health_monitoring(self): |
|
"""Periodically check system health""" |
|
def update_health(): |
|
if self._running: |
|
health = asyncio.run(self.ai_core.self_healing.check_health()) |
|
self.status_bar.config( |
|
text=f"System Health - Memory: {health['memory_usage']}% | " |
|
f"CPU: {health['cpu_load']}% | Response Time: {health['response_time']:.2f}s" |
|
) |
|
self.after(5000, update_health) |
|
update_health() |
|
|
|
async def main(): |
|
"""The main function initializes the AI system, handles user input in a loop, |
|
generates responses using the AI system, and prints the insights, security level, |
|
AI response, and safety analysis. It also ensures proper shutdown of the AI system |
|
and its resources.""" |
|
print("ЪДа Hybrid AI System Initializing (Local Models)") |
|
ai = AICore() |
|
app = AIApp(ai) |
|
app.mainloop() |
|
await ai.shutdown() |
|
|
|
if __name__ == "__main__": |
|
asyncio.run(main()) |