import asyncio import json import logging import os import tkinter as tk from tkinter import messagebox from threading import Thread from typing import Dict, Any class AIApp(tk.Tk): def __init__(self, ai_core): super().__init__() self.ai_core = ai_core self.title("AI System Interface") self.geometry("800x600") self._running = True self.create_widgets() self._start_health_monitoring() def create_widgets(self): self.query_label = tk.Label(self, text="Enter your query:") self.query_label.pack(pady=10) self.query_entry = tk.Entry(self, width=100) self.query_entry.pack(pady=10) self.submit_button = tk.Button(self, text="Submit", command=self.submit_query) self.submit_button.pack(pady=10) self.response_area = tk.Text(self, height=20, width=100) self.response_area.pack(pady=10) self.status_bar = tk.Label(self, text="Ready", bd=1, relief=tk.SUNKEN, anchor=tk.W) self.status_bar.pack(side=tk.BOTTOM, fill=tk.X) def submit_query(self): query = self.query_entry.get() self.status_bar.config(text="Processing...") Thread(target=self._run_async_task, args=(self.ai_core.generate_response(query),)).start() def _run_async_task(self, coroutine): """Run async task in a separate thread""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: result = loop.run_until_complete(coroutine) self.after(0, self._display_result, result) except Exception as e: self.after(0, self._show_error, str(e)) finally: loop.close() def _display_result(self, result: Dict): """Display results in the GUI""" self.response_area.insert(tk.END, json.dumps(result, indent=2) + "\n\n") self.status_bar.config(text="Query processed successfully") def _show_error(self, message: str): """Display error messages to the user""" messagebox.showerror("Error", message) self.status_bar.config(text=f"Error: {message}") def _start_health_monitoring(self): """Periodically check system health""" def update_health(): if self._running: health = asyncio.run(self.ai_core.self_healing.check_health()) self.status_bar.config( text=f"System Health - Memory: {health['memory_usage']}% | " f"CPU: {health['cpu_load']}% | Response Time: {health['response_time']:.2f}s" ) self.after(5000, update_health) update_health() async def main(): """The main function initializes the AI system, handles user input in a loop, generates responses using the AI system, and prints the insights, security level, AI response, and safety analysis. It also ensures proper shutdown of the AI system and its resources.""" print("🧠 Hybrid AI System Initializing (Local Models)") ai = AICore() app = AIApp(ai) app.mainloop() await ai.shutdown() if __name__ == "__main__": asyncio.run(main())