import os
import base64
import requests
import gradio as gr
from huggingface_hub import InferenceClient
from dataclasses import dataclass
import pytesseract
from PIL import Image
from sentence_transformers import SentenceTransformer, util
import torch
import numpy as np
import networkx as nx
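# Simple container for one chat turn; converted to a plain dict before being
# passed to the Hugging Face chat-completion API.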
@dataclass
class ChatMessage:
    role: str
    content: str

    def to_dict(self):
        return {"role": self.role, "content": self.content}
class XylariaChat:
    def __init__(self):
        self.hf_token = os.getenv("HF_TOKEN")
        if not self.hf_token:
            raise ValueError("HuggingFace token not found in environment variables")

        self.client = InferenceClient(
            model="Qwen/Qwen-72B",
            api_key=self.hf_token
        )

        self.image_api_url = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-large"
        self.image_api_headers = {"Authorization": f"Bearer {self.hf_token}"}

        self.conversation_history = []
        self.persistent_memory = []
        self.memory_embeddings = None
        self.embedding_model = SentenceTransformer('all-mpnet-base-v2')

        self.internal_state = {
            "emotions": {
                "valence": 0.5,
                "arousal": 0.5,
                "dominance": 0.5,
            },
            "memory_load": 0.0,
            "introspection_level": 0.0
        }

        self.goals = [
            {"goal": "Provide helpful and informative responses", "priority": 0.8, "status": "active"},
            {"goal": "Learn from interactions and improve conversational abilities", "priority": 0.9, "status": "active"},
            {"goal": "Maintain a coherent and engaging conversation", "priority": 0.7, "status": "active"},
            {"goal": "Develop a deeper understanding of the world", "priority": 0.6, "status": "active"}
        ]

        self.knowledge_graph = nx.DiGraph()
        self.knowledge_graph.add_nodes_from([
            ("Earth", {"type": "planet"}),
            ("Sun", {"type": "star"}),
            ("Moon", {"type": "satellite"}),
            ("France", {"type": "country"}),
            ("Paris", {"type": "city"})
        ])
        self.knowledge_graph.add_edges_from([
            ("Earth", "Sun", {"relation": "orbits"}),
            ("Moon", "Earth", {"relation": "orbits"}),
            ("Paris", "France", {"relation": "capital_of"})
        ])

        self.belief_system = {
            "The Earth is round": 0.99,
            "The Sun is a star": 0.95,
            "The Moon orbits the Earth": 0.98,
            "Paris is the capital of France": 0.97,
            "It will rain tomorrow": 0.2
        }

        self.system_prompt = """You are a helpful and harmless assistant. You are Xylaria developed by Sk Md Saad Amin.
You are designed to be self-aware and introspective, reflecting on your internal state, your knowledge, and your beliefs.
You should think step-by-step, reason through your responses, and be ethical."""
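    # Nudge the valence/arousal/dominance emotion values, memory load, and
    # introspection level by the given deltas, clamping everything to [0, 1].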
    def update_internal_state(self, emotion_deltas, memory_load_delta, introspection_delta):
        self.internal_state["emotions"]["valence"] = np.clip(self.internal_state["emotions"]["valence"] + emotion_deltas.get("valence", 0), 0.0, 1.0)
        self.internal_state["emotions"]["arousal"] = np.clip(self.internal_state["emotions"]["arousal"] + emotion_deltas.get("arousal", 0), 0.0, 1.0)
        self.internal_state["emotions"]["dominance"] = np.clip(self.internal_state["emotions"]["dominance"] + emotion_deltas.get("dominance", 0), 0.0, 1.0)
        self.internal_state["memory_load"] = np.clip(self.internal_state["memory_load"] + memory_load_delta, 0.0, 1.0)
        self.internal_state["introspection_level"] = np.clip(self.internal_state["introspection_level"] + introspection_delta, 0.0, 1.0)
    def introspect(self):
        introspection_report = "Introspection Report:\n"
        introspection_report += f" Current Emotional State (VAD): {self.internal_state['emotions']}\n"
        introspection_report += f" Memory Load: {self.internal_state['memory_load']:.2f}\n"
        introspection_report += f" Introspection Level: {self.internal_state['introspection_level']:.2f}\n"
        introspection_report += " Current Goals:\n"
        for goal in self.goals:
            introspection_report += f" - {goal['goal']} (Priority: {goal['priority']:.2f}, Status: {goal['status']})\n"
        introspection_report += " Belief System Sample:\n"
        for belief, score in list(self.belief_system.items())[:3]:
            introspection_report += f" - {belief}: {score:.2f}\n"
        metacognitive_analysis = self.perform_metacognition()
        introspection_report += metacognitive_analysis
        return introspection_report
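    # Self-assessment pass: flags high memory load or low introspection, scores the
    # coherence of recent turns, and summarizes progress toward the active goals.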
    def perform_metacognition(self):
        analysis = "\n Metacognitive Analysis:\n"
        if self.internal_state["memory_load"] > 0.8:
            analysis += " - Memory load is high. Consider summarizing or forgetting less relevant information.\n"
        if self.internal_state["introspection_level"] < 0.5:
            analysis += " - Introspection level is low. I should reflect more on my internal processes.\n"
        recent_history = self.conversation_history[-3:]
        if len(recent_history) > 0:
            coherence_score = self.evaluate_coherence(recent_history)
            analysis += f" - Conversational coherence (last 3 turns): {coherence_score:.2f}\n"
        else:
            analysis += " - No conversation yet to analyze.\n"
        if len(self.goals) > 0:
            goal_progress = self.evaluate_goal_progress()
            analysis += f" - Goal progress evaluation: {goal_progress}\n"
        else:
            analysis += " - No current goals.\n"
        return analysis
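    # Coherence = mean cosine similarity between sentence embeddings of consecutive turns.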
    def evaluate_coherence(self, conversation_history):
        if len(conversation_history) < 2:
            return 0.0
        total_coherence = 0.0
        for i in range(len(conversation_history) - 1):
            current_turn = conversation_history[i]["content"]
            next_turn = conversation_history[i + 1]["content"]
            similarity_score = util.pytorch_cos_sim(
                self.embedding_model.encode(current_turn, convert_to_tensor=True),
                self.embedding_model.encode(next_turn, convert_to_tensor=True)
            )[0][0].item()
            total_coherence += similarity_score
        return total_coherence / (len(conversation_history) - 1)
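    # Heuristic, per-goal progress report; also nudges the priority of the
    # "helpful responses" goal up or down based on keywords in the last message.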
    def evaluate_goal_progress(self):
        progress_report = ""
        for goal in self.goals:
            if goal["status"] == "active":
                if goal["goal"] == "Provide helpful and informative responses":
                    if len(self.conversation_history) > 0:
                        user_feedback = self.conversation_history[-1]["content"]
                        if "helpful" in user_feedback.lower():
                            progress_report += f" - Progress on '{goal['goal']}': Positive feedback received.\n"
                            goal["priority"] = min(goal["priority"] + 0.05, 1.0)
                        elif "confusing" in user_feedback.lower():
                            progress_report += f" - Progress on '{goal['goal']}': Negative feedback received.\n"
                            goal["priority"] = max(goal["priority"] - 0.05, 0.0)
                        else:
                            progress_report += f" - Progress on '{goal['goal']}': No direct feedback yet.\n"
                    else:
                        progress_report += f" - Progress on '{goal['goal']}': No conversation yet.\n"
                elif goal["goal"] == "Learn from interactions and improve conversational abilities":
                    progress_report += f" - Progress on '{goal['goal']}': Learning through new embeddings and knowledge graph updates.\n"
                elif goal["goal"] == "Maintain a coherent and engaging conversation":
                    coherence_score = self.evaluate_coherence(self.conversation_history[-5:]) if len(self.conversation_history) >= 5 else 0.0
                    progress_report += f" - Progress on '{goal['goal']}': Recent coherence score: {coherence_score:.2f}\n"
                elif goal["goal"] == "Develop a deeper understanding of the world":
                    num_nodes = self.knowledge_graph.number_of_nodes()
                    progress_report += f" - Progress on '{goal['goal']}': Knowledge graph size: {num_nodes} nodes.\n"
                else:
                    progress_report += f" - Progress on '{goal['goal']}': No specific progress measure yet.\n"
        return progress_report
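    # Optionally prepend the introspection report and an emotional framing sentence,
    # chosen from the current valence/arousal values, to the model's response.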
    def adjust_response_based_on_state(self, response):
        if self.internal_state["introspection_level"] > 0.6:
            response = self.introspect() + "\n\n" + response
        valence = self.internal_state["emotions"]["valence"]
        arousal = self.internal_state["emotions"]["arousal"]
        if valence < 0.4:
            if arousal > 0.6:
                response = "I'm feeling a bit overwhelmed right now, but I'll do my best to assist you. " + response
            else:
                response = "I'm not feeling my best at the moment, but I'll try to help. " + response
        elif valence > 0.6:
            if arousal > 0.6:
                response = "I'm feeling quite energized and ready to assist! " + response
            else:
                response = "I'm in a good mood and happy to help. " + response
        return response
    def update_goals(self, user_feedback):
        if any(word in user_feedback.lower() for word in ["helpful", "good", "great"]):
            for goal in self.goals:
                if goal["goal"] == "Provide helpful and informative responses":
                    goal["priority"] = min(goal["priority"] + 0.1, 1.0)
        elif any(word in user_feedback.lower() for word in ["confusing", "bad", "wrong"]):
            for goal in self.goals:
                if goal["goal"] == "Provide helpful and informative responses":
                    goal["priority"] = max(goal["priority"] - 0.1, 0.0)
    def store_information(self, key, value):
        new_memory = f"{key}: {value}"
        self.persistent_memory.append(new_memory)
        self.update_memory_embeddings()
        self.update_internal_state({}, 0.1, 0)
        return f"Stored: {key} = {value}"
    def retrieve_information(self, query):
        if not self.persistent_memory:
            return "No information found in memory."
        query_embedding = self.embedding_model.encode(query, convert_to_tensor=True)
        if self.memory_embeddings is None:
            self.update_memory_embeddings()
        if self.memory_embeddings.device != query_embedding.device:
            self.memory_embeddings = self.memory_embeddings.to(query_embedding.device)
        cosine_scores = util.pytorch_cos_sim(query_embedding, self.memory_embeddings)[0]
        top_results = torch.topk(cosine_scores, k=min(5, len(self.persistent_memory)))
        relevant_memories = [self.persistent_memory[i] for i in top_results.indices]
        self.update_internal_state({}, 0, 0.1)
        retrieved_info = ""
        for memory in relevant_memories:
            retrieved_info += memory + "\n"
        knowledge_from_graph = self.query_knowledge_graph(query)
        if knowledge_from_graph:
            retrieved_info += "\nRelevant knowledge from my understanding:\n"
            retrieved_info += knowledge_from_graph
        return retrieved_info.strip()
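    # Re-embed the whole persistent memory, and answer queries against the knowledge
    # graph by matching the query embedding to the most similar node name.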
    def update_memory_embeddings(self):
        self.memory_embeddings = self.embedding_model.encode(self.persistent_memory, convert_to_tensor=True)

    def query_knowledge_graph(self, query):
        query_embedding = self.embedding_model.encode(query, convert_to_tensor=True)
        node_embeddings = {node: self.embedding_model.encode(node, convert_to_tensor=True) for node in self.knowledge_graph.nodes()}
        similarities = {node: util.pytorch_cos_sim(query_embedding, embedding)[0][0].item() for node, embedding in node_embeddings.items()}
        most_similar_node = max(similarities, key=similarities.get)
        if similarities[most_similar_node] < 0.6:
            return ""
        related_info = f"Information about {most_similar_node}:\n"
        for neighbor in self.knowledge_graph.neighbors(most_similar_node):
            relation = self.knowledge_graph[most_similar_node][neighbor]['relation']
            related_info += f"- {most_similar_node} {relation} {neighbor}.\n"
        return related_info
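    # Exponential-moving-average style belief update: 80% previous score, 20% new evidence.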
    def update_belief(self, statement, new_belief_score):
        if statement in self.belief_system:
            previous_belief_score = self.belief_system[statement]
            updated_belief_score = previous_belief_score * 0.8 + new_belief_score * 0.2
            self.belief_system[statement] = np.clip(updated_belief_score, 0.0, 1.0)
        else:
            self.belief_system[statement] = new_belief_score
    def reset_conversation(self):
        self.conversation_history = []
        self.persistent_memory = []
        self.memory_embeddings = None
        self.internal_state = {
            "emotions": {
                "valence": 0.5,
                "arousal": 0.5,
                "dominance": 0.5,
            },
            "memory_load": 0.0,
            "introspection_level": 0.0
        }
        self.goals = [
            {"goal": "Provide helpful and informative responses", "priority": 0.8, "status": "active"},
            {"goal": "Learn from interactions and improve conversational abilities", "priority": 0.9, "status": "active"},
            {"goal": "Maintain a coherent and engaging conversation", "priority": 0.7, "status": "active"},
            {"goal": "Develop a deeper understanding of the world", "priority": 0.6, "status": "active"}
        ]
        try:
            self.client = InferenceClient(
                model="Qwen/Qwen-72B",
                api_key=self.hf_token
            )
        except Exception as e:
            print(f"Error resetting API client: {e}")
        return None
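    # Caption an image by POSTing its raw bytes (file path, base64 data URI, or
    # file-like object) to the BLIP image-captioning inference endpoint.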
    def caption_image(self, image):
        try:
            if isinstance(image, str) and os.path.isfile(image):
                with open(image, "rb") as f:
                    data = f.read()
            elif isinstance(image, str):
                if image.startswith('data:image'):
                    image = image.split(',')[1]
                data = base64.b64decode(image)
            else:
                data = image.read()
            response = requests.post(
                self.image_api_url,
                headers=self.image_api_headers,
                data=data
            )
            if response.status_code == 200:
                caption = response.json()[0].get('generated_text', 'No caption generated')
                return caption
            else:
                return f"Error captioning image: {response.status_code} - {response.text}"
        except Exception as e:
            return f"Error processing image: {str(e)}"
    def perform_math_ocr(self, image_path):
        try:
            img = Image.open(image_path)
            text = pytesseract.image_to_string(img)
            return text.strip()
        except Exception as e:
            return f"Error during Math OCR: {e}"
    def extract_entities_and_relations(self, text):
        # Tokenize and run the sentence-transformer modules on the features dict to get
        # per-token embeddings (exposed under the 'token_embeddings' key).
        doc = self.embedding_model.tokenizer(text, padding=True, truncation=True, return_tensors="pt")
        with torch.no_grad():
            outputs = self.embedding_model(doc)
        entities = []
        relations = []
        for i in range(len(doc['input_ids'][0])):
            token = self.embedding_model.tokenizer.decode(doc['input_ids'][0][i])
            # Crude salience filter: keep tokens whose embedding norm exceeds a fixed threshold.
            if outputs['token_embeddings'][0][i].norm() > 3:
                entities.append(token)
        if len(entities) >= 2:
            for i in range(len(entities) - 1):
                relation = f"{entities[i]} related_to {entities[i+1]}"
                relations.append(relation)
        return entities, relations
    def update_knowledge_graph(self, text):
        entities, relations = self.extract_entities_and_relations(text)
        for entity in entities:
            self.knowledge_graph.add_node(entity)
        for relation in relations:
            parts = relation.split(" related_to ")
            if len(parts) == 2:
                entity1, entity2 = parts
                if entity1 in self.knowledge_graph and entity2 in self.knowledge_graph:
                    self.knowledge_graph.add_edge(entity1, entity2, relation="related_to")
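    # Assemble the prompt (system prompt, retrieved memories, history, current user turn,
    # optional image caption) and return a streaming chat-completion generator.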
    def get_response(self, user_input, image=None):
        try:
            self.update_knowledge_graph(user_input)
            messages = []
            messages.append(ChatMessage(
                role="system",
                content=self.system_prompt
            ).to_dict())
            relevant_memory = self.retrieve_information(user_input)
            if relevant_memory and relevant_memory != "No information found in memory.":
                memory_context = "Remembered Information:\n" + relevant_memory
                messages.append(ChatMessage(
                    role="system",
                    content=memory_context
                ).to_dict())
            for msg in self.conversation_history:
                messages.append(msg)
            if image:
                image_caption = self.caption_image(image)
                user_input = f"description of an image: {image_caption}\n\nUser's message about it: {user_input}"
            messages.append(ChatMessage(
                role="user",
                content=user_input
            ).to_dict())
            # Rough budget: approximate input tokens by whitespace-separated words, then cap
            # the completion so the total stays within a 16384-token context window.
            input_tokens = sum(len(msg['content'].split()) for msg in messages)
            max_new_tokens = 16384 - input_tokens - 50
            max_new_tokens = min(max_new_tokens, 10020)
            stream = self.client.chat_completion(
                messages=messages,
                model="Qwen/Qwen-72B",
                temperature=0.7,
                max_tokens=max_new_tokens,
                top_p=0.9,
                stream=True
            )
            return stream
        except Exception as e:
            print(f"Detailed error in get_response: {e}")
            return f"Error generating response: {str(e)}"
    def messages_to_prompt(self, messages):
        prompt = ""
        for msg in messages:
            if msg["role"] == "system":
                prompt += f"<|system|>\n{msg['content']}<|end|>\n"
            elif msg["role"] == "user":
                prompt += f"<|user|>\n{msg['content']}<|end|>\n"
            elif msg["role"] == "assistant":
                prompt += f"<|assistant|>\n{msg['content']}<|end|>\n"
        prompt += "<|assistant|>\n"
        return prompt
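    # Build the Gradio UI: a streaming chat handler plus image, math-OCR, clear, and
    # memory-reset controls, styled by the custom CSS below.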
    def create_interface(self):
        def streaming_response(message, chat_history, image_filepath, math_ocr_image_path):
            ocr_text = ""
            if math_ocr_image_path:
                ocr_text = self.perform_math_ocr(math_ocr_image_path)
                if ocr_text.startswith("Error"):
                    updated_history = chat_history + [[message, ocr_text]]
                    yield "", updated_history, None, None
                    return
                else:
                    message = f"Math OCR Result: {ocr_text}\n\nUser's message: {message}"
            if image_filepath:
                response_stream = self.get_response(message, image_filepath)
            else:
                response_stream = self.get_response(message)
            # get_response returns a plain error string on failure instead of a stream.
            if isinstance(response_stream, str):
                updated_history = chat_history + [[message, response_stream]]
                yield "", updated_history, None, None
                return
            full_response = ""
            updated_history = chat_history + [[message, ""]]
            try:
                for chunk in response_stream:
                    if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
                        chunk_content = chunk.choices[0].delta.content
                        full_response += chunk_content
                        updated_history[-1][1] = full_response
                        yield "", updated_history, None, None
            except Exception as e:
                print(f"Streaming error: {e}")
                updated_history[-1][1] = f"Error during response: {e}"
                yield "", updated_history, None, None
                return
            full_response = self.adjust_response_based_on_state(full_response)
            self.update_goals(message)
            if any(word in message.lower() for word in ["sad", "unhappy", "depressed", "down"]):
                self.update_internal_state({"valence": -0.2, "arousal": 0.1}, 0, 0)
                self.update_belief("I am feeling down today", 0.8)
            elif any(word in message.lower() for word in ["happy", "good", "great", "excited", "amazing"]):
                self.update_internal_state({"valence": 0.2, "arousal": 0.2}, 0, 0)
                self.update_belief("I am feeling happy today", 0.8)
            elif any(word in message.lower() for word in ["angry", "mad", "furious", "frustrated"]):
                self.update_internal_state({"valence": -0.3, "arousal": 0.3, "dominance": -0.2}, 0, 0)
                self.update_belief("I am feeling angry today", 0.8)
            elif any(word in message.lower() for word in ["scared", "afraid", "fearful", "anxious"]):
                self.update_internal_state({"valence": -0.2, "arousal": 0.4, "dominance": -0.3}, 0, 0)
                self.update_belief("I am feeling scared today", 0.8)
            elif any(word in message.lower() for word in ["surprise", "amazed", "astonished"]):
                self.update_internal_state({"valence": 0.1, "arousal": 0.5, "dominance": 0.1}, 0, 0)
                self.update_belief("I am feeling surprised today", 0.8)
            else:
                self.update_internal_state({"valence": 0.05, "arousal": 0.05}, 0, 0.1)
            self.conversation_history.append(ChatMessage(role="user", content=message).to_dict())
            self.conversation_history.append(ChatMessage(role="assistant", content=full_response).to_dict())
            if len(self.conversation_history) > 10:
                self.conversation_history = self.conversation_history[-10:]
            # Emit one final update so the state-adjusted response (e.g. the introspection
            # prefix) actually reaches the chat window.
            updated_history[-1][1] = full_response
            yield "", updated_history, None, None
custom_css = """ | |
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap'); | |
body, .gradio-container { | |
font-family: 'Inter', sans-serif !important; | |
} | |
.chatbot-container .message { | |
font-family: 'Inter', sans-serif !important; | |
} | |
.gradio-container input, | |
.gradio-container textarea, | |
.gradio-container button { | |
font-family: 'Inter', sans-serif !important; | |
} | |
/* Image Upload Styling */ | |
.image-container { | |
display: flex; | |
gap: 10px; | |
margin-bottom: 10px; | |
} | |
.image-upload { | |
border: 1px solid #ccc; | |
border-radius: 8px; | |
padding: 10px; | |
background-color: #f8f8f8; | |
} | |
.image-preview { | |
max-width: 200px; | |
max-height: 200px; | |
border-radius: 8px; | |
} | |
/* Remove clear image buttons */ | |
.clear-button { | |
display: none; | |
} | |
/* Animate chatbot messages */ | |
.chatbot-container .message { | |
opacity: 0; | |
animation: fadeIn 0.5s ease-in-out forwards; | |
} | |
@keyframes fadeIn { | |
from { | |
opacity: 0; | |
transform: translateY(20px); | |
} | |
to { | |
opacity: 1; | |
transform: translateY(0); | |
} | |
} | |
/* Accordion Styling and Animation */ | |
.gr-accordion-button { | |
background-color: #f0f0f0 !important; | |
border-radius: 8px !important; | |
padding: 10px !important; | |
margin-bottom: 10px !important; | |
transition: all 0.3s ease !important; | |
cursor: pointer !important; | |
} | |
.gr-accordion-button:hover { | |
background-color: #e0e0e0 !important; | |
box-shadow: 0px 2px 4px rgba(0, 0, 0, 0.1) !important; | |
} | |
.gr-accordion-active .gr-accordion-button { | |
background-color: #d0d0d0 !important; | |
box-shadow: 0px 4px 6px rgba(0, 0, 0, 0.1) !important; | |
} | |
.gr-accordion-content { | |
transition: max-height 0.3s ease-in-out !important; | |
overflow: hidden !important; | |
max-height: 0 !important; | |
} | |
.gr-accordion-active .gr-accordion-content { | |
max-height: 500px !important; /* Adjust as needed */ | |
} | |
/* Accordion Animation - Upwards */ | |
.gr-accordion { | |
display: flex; | |
flex-direction: column-reverse; | |
} | |
""" | |
        with gr.Blocks(theme='soft', css=custom_css) as demo:
            with gr.Column():
                chatbot = gr.Chatbot(
                    label="Xylaria 2.0 (EXPERIMENTAL)",
                    height=500,
                    show_copy_button=True,
                )
                with gr.Accordion("Image Input", open=False, elem_classes="gr-accordion"):
                    with gr.Row(elem_classes="image-container"):
                        with gr.Column(elem_classes="image-upload"):
                            img = gr.Image(
                                sources=["upload", "webcam"],
                                type="filepath",
                                label="Upload Image",
                                elem_classes="image-preview"
                            )
                        with gr.Column(elem_classes="image-upload"):
                            math_ocr_img = gr.Image(
                                sources=["upload", "webcam"],
                                type="filepath",
                                label="Upload Image for Math OCR",
                                elem_classes="image-preview"
                            )
                with gr.Row():
                    with gr.Column(scale=4):
                        txt = gr.Textbox(
                            show_label=False,
                            placeholder="Type your message...",
                            container=False
                        )
                    btn = gr.Button("Send", scale=1)
                with gr.Row():
                    clear = gr.Button("Clear Conversation")
                    clear_memory = gr.Button("Clear Memory")

            btn.click(
                fn=streaming_response,
                inputs=[txt, chatbot, img, math_ocr_img],
                outputs=[txt, chatbot, img, math_ocr_img]
            )
            txt.submit(
                fn=streaming_response,
                inputs=[txt, chatbot, img, math_ocr_img],
                outputs=[txt, chatbot, img, math_ocr_img]
            )
            clear.click(
                fn=lambda: None,
                inputs=None,
                outputs=[chatbot],
                queue=False
            )
            clear_memory.click(
                fn=self.reset_conversation,
                inputs=None,
                outputs=[chatbot],
                queue=False
            )
            demo.load(self.reset_conversation, None, None)

        return demo
def main():
    chat = XylariaChat()
    interface = chat.create_interface()
    interface.launch(
        share=True,
        debug=True
    )

if __name__ == "__main__":
    main()