# Page residue from the hosting site's file viewer (not part of the program):
#   Raiff1982's picture
#   Create GUI.py
#   f64e159 verified
#   raw
#   history blame
#   3.17 kB
import asyncio
import json
import logging
import os
import tkinter as tk
from tkinter import messagebox
from threading import Thread
from typing import Dict, Any
class AIApp(tk.Tk):
    """Tkinter window that sends queries to an AI core and shows its replies.

    The window holds a query entry, a submit button, a text area for
    responses and a status bar.  ``ai_core`` must provide
    ``generate_response(query)`` and ``self_healing.check_health()``; both
    are run on worker threads with their own asyncio event loop so the Tk
    main loop is never blocked and never has to nest event loops.
    """

    # Delay between two consecutive health checks, in milliseconds.
    HEALTH_POLL_MS = 5000

    def __init__(self, ai_core):
        """Build the window around *ai_core* and start health polling."""
        super().__init__()
        self.ai_core = ai_core
        self.title("AI System Interface")
        self.geometry("800x600")
        self._running = True
        self.create_widgets()
        # Clear ``_running`` when the user closes the window so background
        # work stops posting updates to widgets that no longer exist.
        self.protocol("WM_DELETE_WINDOW", self._on_close)
        self._start_health_monitoring()

    def create_widgets(self):
        """Create and pack the query, response and status widgets."""
        self.query_label = tk.Label(self, text="Enter your query:")
        self.query_label.pack(pady=10)

        self.query_entry = tk.Entry(self, width=100)
        self.query_entry.pack(pady=10)

        self.submit_button = tk.Button(self, text="Submit", command=self.submit_query)
        self.submit_button.pack(pady=10)

        self.response_area = tk.Text(self, height=20, width=100)
        self.response_area.pack(pady=10)

        self.status_bar = tk.Label(self, text="Ready", bd=1, relief=tk.SUNKEN, anchor=tk.W)
        self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)

    def submit_query(self):
        """Send the entry's text to the AI core on a background thread."""
        query = self.query_entry.get()
        if not query.strip():
            # Nothing to ask; avoid spending a worker thread on an empty query.
            self.status_bar.config(text="Please enter a query")
            return
        self.status_bar.config(text="Processing...")
        # Daemon thread: an unfinished query must not keep the process alive
        # after the window has been closed.
        Thread(
            target=self._run_async_task,
            args=(self.ai_core.generate_response(query),),
            daemon=True,
        ).start()

    def _run_async_task(self, coroutine):
        """Run *coroutine* to completion on a private event loop.

        Intended to be the target of a worker thread.  The result (or the
        error message) is handed back to the Tk main loop for display.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            result = loop.run_until_complete(coroutine)
            self._post_to_ui(self._display_result, result)
        except Exception as e:
            self._post_to_ui(self._show_error, str(e))
        finally:
            loop.close()

    def _post_to_ui(self, callback, *args):
        """Schedule ``callback(*args)`` on the Tk main loop.

        The call is dropped when the window has been closed; worker threads
        may finish after that and must not crash on a destroyed widget.
        """
        if not self._running:
            return
        try:
            self.after(0, callback, *args)
        except (RuntimeError, tk.TclError):
            # Raised when the interpreter is gone or the main loop is not
            # running; there is no UI left to update.
            logging.getLogger(__name__).debug("UI update dropped", exc_info=True)

    def _on_close(self):
        """Stop background updates and destroy the window."""
        self._running = False
        self.destroy()

    @staticmethod
    def _format_result(result: Any) -> str:
        """Return *result* as indented JSON, or ``str(result)`` if it is not serialisable."""
        try:
            return json.dumps(result, indent=2)
        except (TypeError, ValueError):
            # TypeError: unsupported type; ValueError: circular reference.
            return str(result)

    @staticmethod
    def _format_health(health: Dict[str, Any]) -> str:
        """Build the status-bar text from a health report.

        Raises:
            KeyError: if ``memory_usage``, ``cpu_load`` or ``response_time``
                is missing from *health*.
        """
        return (
            f"System Health - Memory: {health['memory_usage']}% | "
            f"CPU: {health['cpu_load']}% | Response Time: {health['response_time']:.2f}s"
        )

    def _display_result(self, result: Dict):
        """Display results in the GUI"""
        self.response_area.insert(tk.END, self._format_result(result) + "\n\n")
        # Keep the newest response visible.
        self.response_area.see(tk.END)
        self.status_bar.config(text="Query processed successfully")

    def _show_error(self, message: str):
        """Display error messages to the user"""
        messagebox.showerror("Error", message)
        self.status_bar.config(text=f"Error: {message}")

    def _show_health(self, status: str):
        """Write a formatted health report to the status bar."""
        if self._running:
            self.status_bar.config(text=status)

    def _check_health(self):
        """Fetch one health report; runs on a worker thread.

        ``asyncio.run`` is safe here because this thread has no running
        event loop, unlike the Tk thread when the app is created from inside
        a coroutine.
        """
        try:
            health = asyncio.run(self.ai_core.self_healing.check_health())
            status = self._format_health(health)
        except Exception:
            # A failed check must not stop the polling; record it and let the
            # next poll try again.
            logging.getLogger(__name__).exception("Health check failed")
            return
        self._post_to_ui(self._show_health, status)

    def _start_health_monitoring(self):
        """Periodically check system health"""
        self._health_thread = None

        def poll():
            if not self._running:
                return
            previous = self._health_thread
            # Skip this round if the previous check is still in flight so
            # slow checks do not pile up.
            if previous is None or not previous.is_alive():
                self._health_thread = Thread(target=self._check_health, daemon=True)
                self._health_thread.start()
            self.after(self.HEALTH_POLL_MS, poll)

        # Deferred with after() so the first check starts once the Tk main
        # loop is running and can receive the worker's result.
        self.after(0, poll)
async def main():
    """Create the AI core, run the Tkinter interface, then shut the core down.

    Prints a start-up banner, builds an ``AICore`` and an ``AIApp`` window
    around it, and blocks in the Tk main loop until the window is closed.
    ``ai.shutdown()`` is awaited afterwards, including when building or
    running the GUI raises, so the core's resources are always released.
    """
    print("🧠 Hybrid AI System Initializing (Local Models)")
    # NOTE(review): AICore is not imported in the visible part of this file;
    # confirm which module provides it and add the import.
    ai = AICore()
    try:
        app = AIApp(ai)
        # mainloop() is a blocking call: this coroutine (and the event loop
        # running it) makes no progress until the window is closed.
        app.mainloop()
    finally:
        await ai.shutdown()
# Script entry point: run the async main() to completion when executed directly.
if __name__ == "__main__":
    startup = main()
    asyncio.run(startup)