File size: 3,166 Bytes
f64e159 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
import asyncio
import json
import logging
import os
import tkinter as tk
from tkinter import messagebox
from threading import Thread
from typing import Dict, Any
class AIApp(tk.Tk):
def __init__(self, ai_core):
super().__init__()
self.ai_core = ai_core
self.title("AI System Interface")
self.geometry("800x600")
self._running = True
self.create_widgets()
self._start_health_monitoring()
def create_widgets(self):
self.query_label = tk.Label(self, text="Enter your query:")
self.query_label.pack(pady=10)
self.query_entry = tk.Entry(self, width=100)
self.query_entry.pack(pady=10)
self.submit_button = tk.Button(self, text="Submit", command=self.submit_query)
self.submit_button.pack(pady=10)
self.response_area = tk.Text(self, height=20, width=100)
self.response_area.pack(pady=10)
self.status_bar = tk.Label(self, text="Ready", bd=1, relief=tk.SUNKEN, anchor=tk.W)
self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)
def submit_query(self):
query = self.query_entry.get()
self.status_bar.config(text="Processing...")
Thread(target=self._run_async_task, args=(self.ai_core.generate_response(query),)).start()
def _run_async_task(self, coroutine):
"""Run async task in a separate thread"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(coroutine)
self.after(0, self._display_result, result)
except Exception as e:
self.after(0, self._show_error, str(e))
finally:
loop.close()
def _display_result(self, result: Dict):
"""Display results in the GUI"""
self.response_area.insert(tk.END, json.dumps(result, indent=2) + "\n\n")
self.status_bar.config(text="Query processed successfully")
def _show_error(self, message: str):
"""Display error messages to the user"""
messagebox.showerror("Error", message)
self.status_bar.config(text=f"Error: {message}")
def _start_health_monitoring(self):
"""Periodically check system health"""
def update_health():
if self._running:
health = asyncio.run(self.ai_core.self_healing.check_health())
self.status_bar.config(
text=f"System Health - Memory: {health['memory_usage']}% | "
f"CPU: {health['cpu_load']}% | Response Time: {health['response_time']:.2f}s"
)
self.after(5000, update_health)
update_health()
async def main():
"""The main function initializes the AI system, handles user input in a loop,
generates responses using the AI system, and prints the insights, security level,
AI response, and safety analysis. It also ensures proper shutdown of the AI system
and its resources."""
print("🧠 Hybrid AI System Initializing (Local Models)")
ai = AICore()
app = AIApp(ai)
app.mainloop()
await ai.shutdown()
if __name__ == "__main__":
asyncio.run(main()) |