|
import asyncio |
|
import json |
|
import logging |
|
import os |
|
import tkinter as tk |
|
from tkinter import messagebox |
|
from threading import Thread |
|
from typing import Dict, Any |
|
|
|
class AIApp(tk.Tk):
    """Tkinter main window for submitting queries to an AI core.

    The window holds a query entry, a submit button, a text area that
    accumulates responses, and a status bar that also shows system health.

    ``ai_core`` must provide:

    * ``generate_response(query)`` returning an awaitable whose result is
      rendered as JSON in the response area;
    * ``self_healing.check_health()`` returning an awaitable that yields a
      mapping with ``memory_usage``, ``cpu_load`` and ``response_time`` keys.

    Every awaitable is driven on a short-lived worker thread that owns its
    own event loop. Results are handed back to the Tk thread with ``after``
    so widgets are only touched from the thread that created them, and so the
    window keeps working even when it was created from inside a coroutine
    (where ``asyncio.run`` on the Tk thread would raise ``RuntimeError``).
    """

    # Delay, in milliseconds, between the end of one health check and the
    # start of the next one.
    HEALTH_CHECK_INTERVAL_MS = 5000

    _log = logging.getLogger(__name__)

    def __init__(self, ai_core):
        """Build the window and start periodic health monitoring.

        Args:
            ai_core: Backend object; see the class docstring for the
                attributes it must expose.
        """
        super().__init__()
        self.ai_core = ai_core
        self.title("AI System Interface")
        self.geometry("800x600")

        self._running = True
        # Number of submitted queries whose result has not been shown yet.
        self._pending_queries = 0
        # Identifier of the pending ``after`` callback for the next health
        # check, or None when nothing is scheduled.
        self._health_after_id = None

        self.create_widgets()
        self.protocol("WM_DELETE_WINDOW", self._on_close)
        self._start_health_monitoring()

    def create_widgets(self):
        """Create and lay out the query, response and status widgets."""
        self.query_label = tk.Label(self, text="Enter your query:")
        self.query_label.pack(pady=10)

        self.query_entry = tk.Entry(self, width=100)
        self.query_entry.pack(pady=10)

        self.submit_button = tk.Button(
            self, text="Submit", command=self.submit_query
        )
        self.submit_button.pack(pady=10)

        self.response_area = tk.Text(self, height=20, width=100)
        self.response_area.pack(pady=10)

        self.status_bar = tk.Label(
            self, text="Ready", bd=1, relief=tk.SUNKEN, anchor=tk.W
        )
        self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)

    def submit_query(self):
        """Send the text in the query entry to the AI core in the background.

        Blank input is rejected with a status-bar hint instead of being sent.
        """
        query = self.query_entry.get()
        if not query.strip():
            self.status_bar.config(text="Please enter a query")
            return

        self._pending_queries += 1
        self.status_bar.config(text="Processing...")
        try:
            coroutine = self.ai_core.generate_response(query)
        except Exception as exc:
            # The call itself failed before producing an awaitable; report it
            # through the normal error path so the status bar is not left on
            # "Processing...".
            self._show_error(str(exc) or type(exc).__name__)
            return

        # Daemon thread: an unfinished query must not keep the process alive
        # after the window has been closed.
        Thread(
            target=self._run_async_task, args=(coroutine,), daemon=True
        ).start()

    def _run_async_task(self, coroutine, on_success=None, on_error=None):
        """Run an awaitable to completion on a private event loop.

        Intended to be the target of a worker thread. The outcome is posted
        back to the Tk thread rather than handled here.

        Args:
            coroutine: Awaitable to run.
            on_success: Called on the Tk thread with the result. Defaults to
                ``_display_result``.
            on_error: Called on the Tk thread with an error message string.
                Defaults to ``_show_error``.
        """
        if on_success is None:
            on_success = self._display_result
        if on_error is None:
            on_error = self._show_error

        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            try:
                result = loop.run_until_complete(coroutine)
            except Exception as exc:  # thread boundary: report, never crash
                self._log.debug("Background task failed", exc_info=True)
                # Some exceptions stringify to ""; fall back to the type name
                # so the user never sees an empty error message.
                self._post_to_ui(on_error, str(exc) or type(exc).__name__)
            else:
                self._post_to_ui(on_success, result)
        finally:
            asyncio.set_event_loop(None)
            loop.close()

    def _post_to_ui(self, callback, *args):
        """Schedule ``callback(*args)`` on the Tk thread, if the window lives.

        Calling ``after`` from a worker thread fails once the main loop has
        stopped or the window has been destroyed; that is expected during
        shutdown, so the result is dropped instead of raising in the thread.
        """
        if not self._running:
            return
        try:
            self.after(0, callback, *args)
        except (RuntimeError, tk.TclError):
            self._log.debug("Window closed; dropping background result")

    def _query_finished(self):
        """Record that one submitted query has produced its outcome."""
        self._pending_queries = max(0, self._pending_queries - 1)

    def _display_result(self, result: Dict):
        """Append ``result`` to the response area as indented JSON."""
        self._query_finished()
        try:
            # Display only: stringify values json cannot encode rather than
            # failing to show the response at all.
            rendered = json.dumps(
                result, indent=2, ensure_ascii=False, default=str
            )
        except (TypeError, ValueError):
            # Unsupported key types or circular references.
            rendered = repr(result)
        self.response_area.insert(tk.END, rendered + "\n\n")
        self.response_area.see(tk.END)
        self.status_bar.config(text="Query processed successfully")

    def _show_error(self, message: str):
        """Show ``message`` in an error dialog and in the status bar."""
        self._query_finished()
        messagebox.showerror("Error", message)
        self.status_bar.config(text=f"Error: {message}")

    @staticmethod
    def _format_health(health: Dict[str, Any]) -> str:
        """Return the status-bar text for a health report.

        Raises:
            KeyError: If a required key is missing.
            TypeError, ValueError: If ``response_time`` cannot be formatted
                as a float.
        """
        return (
            f"System Health - Memory: {health['memory_usage']}% | "
            f"CPU: {health['cpu_load']}% | "
            f"Response Time: {health['response_time']:.2f}s"
        )

    def _start_health_monitoring(self):
        """Periodically check system health.

        The first check is queued with ``after`` so that it starts only once
        the Tk main loop is running; each later check is scheduled when the
        previous one has reported back, so checks never overlap.
        """
        self._health_after_id = self.after(0, self._begin_health_check)

    def _begin_health_check(self):
        """Start one health check on a worker thread."""
        self._health_after_id = None
        if not self._running:
            return
        try:
            coroutine = self.ai_core.self_healing.check_health()
        except Exception as exc:
            self._on_health_error(str(exc) or type(exc).__name__)
            return
        Thread(
            target=self._run_async_task,
            args=(coroutine, self._on_health_result, self._on_health_error),
            daemon=True,
        ).start()

    def _on_health_result(self, health):
        """Show a health report and schedule the next check."""
        try:
            text = self._format_health(health)
        except (KeyError, TypeError, ValueError) as exc:
            self._on_health_error(f"malformed health report ({exc!r})")
            return
        # Do not overwrite "Processing..." while a query is in flight.
        if self._pending_queries == 0:
            self.status_bar.config(text=text)
        self._schedule_next_health_check()

    def _on_health_error(self, message):
        """Report a failed health check and keep monitoring alive."""
        self._log.warning("Health check failed: %s", message)
        if self._pending_queries == 0:
            self.status_bar.config(
                text=f"System Health - unavailable: {message}"
            )
        self._schedule_next_health_check()

    def _schedule_next_health_check(self):
        """Queue the next health check unless the window is closing."""
        if self._running:
            self._health_after_id = self.after(
                self.HEALTH_CHECK_INTERVAL_MS, self._begin_health_check
            )

    def _on_close(self):
        """Stop background activity and destroy the window."""
        self._running = False
        if self._health_after_id is not None:
            self.after_cancel(self._health_after_id)
            self._health_after_id = None
        self.destroy()
|
|
|
async def main():
    """Create the AI core, run the GUI until it is closed, then shut down.

    The Tk main loop is a blocking call, so this coroutine does not yield to
    the event loop while the window is open. ``ai.shutdown()`` is awaited in
    a ``finally`` clause so the core is released even if building or running
    the window raises.
    """
    print("🧠 Hybrid AI System Initializing (Local Models)")
    # NOTE(review): AICore is not imported in the visible part of this file;
    # confirm it is defined or imported elsewhere.
    ai = AICore()
    try:
        app = AIApp(ai)
        app.mainloop()
    finally:
        await ai.shutdown()
|
|
|
if __name__ == "__main__":
    # Script entry point: drive the async ``main`` coroutine to completion.
    entry_point = main()
    asyncio.run(entry_point)