import asyncio
import json
import logging
import os
import queue
import tkinter as tk
from threading import Thread
from tkinter import messagebox
from typing import Dict, Any
class AIApp(tk.Tk):
    """Tkinter window for submitting queries to an AI core and showing replies.

    The window owns the Tk main thread. Every coroutine obtained from
    ``ai_core`` is executed on a short-lived daemon thread with its own
    asyncio event loop, so the GUI never blocks and it does not matter
    whether the caller already has an event loop running. Worker threads
    never call into Tk; they enqueue callbacks on ``_ui_queue``, which the
    main thread drains on a timer.

    Args:
        ai_core: Object exposing ``generate_response(query)`` and
            ``self_healing.check_health()``; both are awaited on a worker
            thread.
    """

    HEALTH_POLL_MS = 5000  # pause between one finished health check and the next
    UI_POLL_MS = 100  # how often the main thread drains worker results

    # Class-level defaults so destroy() is safe even on a half-built instance.
    _running = False
    _health_after_id = None
    _drain_after_id = None

    def __init__(self, ai_core):
        super().__init__()
        self.ai_core = ai_core
        self.title("AI System Interface")
        self.geometry("800x600")
        self._running = True
        self._ui_queue = queue.Queue()
        self.create_widgets()
        # The default close handler destroys the window at Tcl level without
        # calling the Python destroy() override, so route it explicitly.
        self.protocol("WM_DELETE_WINDOW", self.destroy)
        self._drain_ui_queue()
        self._start_health_monitoring()

    def create_widgets(self):
        """Build the query entry, submit button, response area and status bar."""
        self.query_label = tk.Label(self, text="Enter your query:")
        self.query_label.pack(pady=10)
        self.query_entry = tk.Entry(self, width=100)
        self.query_entry.pack(pady=10)
        self.submit_button = tk.Button(self, text="Submit", command=self.submit_query)
        self.submit_button.pack(pady=10)
        self.response_area = tk.Text(self, height=20, width=100)
        self.response_area.pack(pady=10)
        self.status_bar = tk.Label(self, text="Ready", bd=1, relief=tk.SUNKEN, anchor=tk.W)
        self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)

    def submit_query(self):
        """Send the text in the entry box to the AI core on a worker thread.

        A blank query is rejected with a status-bar hint instead of being
        forwarded to the core.
        """
        query = self.query_entry.get()
        if not query.strip():
            self.status_bar.config(text="Please enter a query")
            return
        self.status_bar.config(text="Processing...")
        # Daemon thread: an in-flight query must not keep the process alive
        # after the window has been closed.
        Thread(
            target=self._run_async_task,
            args=(self.ai_core.generate_response(query),),
            daemon=True,
        ).start()

    def _run_async_task(self, coroutine):
        """Run ``coroutine`` to completion and hand the outcome to the UI.

        Intended to run on a worker thread. On success ``_display_result`` is
        queued with the result; on any exception ``_show_error`` is queued
        with the exception text.
        """
        try:
            result = self._run_coroutine(coroutine)
        except Exception as exc:  # boundary of the worker thread: report, don't die silently
            logging.getLogger(__name__).exception("Query failed")
            self._post_to_ui(self._show_error, str(exc))
        else:
            self._post_to_ui(self._display_result, result)

    @staticmethod
    def _run_coroutine(coroutine):
        """Execute ``coroutine`` on a private event loop and return its result."""
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            return loop.run_until_complete(coroutine)
        finally:
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                # Do not leave a closed loop registered as this thread's current loop.
                asyncio.set_event_loop(None)
                loop.close()

    def _post_to_ui(self, callback, *args):
        """Queue ``callback(*args)`` for execution on the Tk main thread."""
        self._ui_queue.put((callback, args))

    def _drain_ui_queue(self):
        """Run all queued UI callbacks, then reschedule itself (main thread only)."""
        self._drain_after_id = None
        if not self._running:
            return
        while True:
            try:
                callback, args = self._ui_queue.get_nowait()
            except queue.Empty:
                break
            try:
                callback(*args)
            except Exception:
                # One failing callback must not stop delivery of later results.
                logging.getLogger(__name__).exception("UI callback failed")
        self._drain_after_id = self.after(self.UI_POLL_MS, self._drain_ui_queue)

    def _display_result(self, result: Dict[str, Any]):
        """Append ``result`` as indented JSON to the response area."""
        # default=str keeps the display working when the result holds values
        # json cannot encode natively; this text is for viewing only.
        rendered = json.dumps(result, indent=2, default=str)
        self.response_area.insert(tk.END, rendered + "\n\n")
        self.status_bar.config(text="Query processed successfully")

    def _show_error(self, message: str):
        """Show ``message`` in an error dialog and mirror it in the status bar."""
        messagebox.showerror("Error", message)
        self.status_bar.config(text=f"Error: {message}")

    def _start_health_monitoring(self):
        """Begin the periodic health check that refreshes the status bar."""
        self._poll_health()

    def _poll_health(self):
        """Launch one health check on a worker thread unless the window is closing."""
        self._health_after_id = None
        if not self._running:
            return
        Thread(target=self._check_health, daemon=True).start()

    def _check_health(self):
        """Fetch health metrics on a worker thread and queue the status text."""
        try:
            health = self._run_coroutine(self.ai_core.self_healing.check_health())
            status = self._format_health(health)
        except Exception:
            # A failed check is reported and retried on the next cycle rather
            # than ending monitoring for the rest of the session.
            logging.getLogger(__name__).exception("Health check failed")
            status = "System Health - unavailable"
        self._post_to_ui(self._finish_health_check, status)

    def _finish_health_check(self, status: str):
        """Schedule the next health check and show ``status`` (main thread only)."""
        # Reschedule first so a widget error below cannot stop the cycle.
        self._health_after_id = self.after(self.HEALTH_POLL_MS, self._poll_health)
        self.status_bar.config(text=status)

    @staticmethod
    def _format_health(health: Dict[str, Any]) -> str:
        """Return the status-bar text for a health mapping.

        Reads the ``memory_usage``, ``cpu_load`` and ``response_time`` keys.
        """
        return (
            f"System Health - Memory: {health['memory_usage']}% | "
            f"CPU: {health['cpu_load']}% | Response Time: {health['response_time']:.2f}s"
        )

    def destroy(self):
        """Stop the timers and background polling, then destroy the window."""
        self._running = False
        for after_id in (self._health_after_id, self._drain_after_id):
            if after_id is not None:
                self.after_cancel(after_id)
        self._health_after_id = None
        self._drain_after_id = None
        super().destroy()
async def main():
    """Create the AI core, run the Tkinter interface, then shut the core down.

    ``app.mainloop()`` blocks until the window is closed, so this coroutine's
    event loop makes no progress while the GUI is open. ``ai.shutdown()`` is
    awaited in a ``finally`` clause so the core is released even when building
    or running the window raises.
    """
    print("🧠 Hybrid AI System Initializing (Local Models)")
    # NOTE(review): AICore is not defined or imported in the visible part of
    # this file; it is assumed to be provided elsewhere - confirm.
    ai = AICore()
    try:
        app = AIApp(ai)
        app.mainloop()
    finally:
        await ai.shutdown()
if __name__ == "__main__":
    # Script entry point: drive the async main() to completion on a new event loop.
    asyncio.run(main())