import os
import json
import logging
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Any, Optional

import numpy as np
from sklearn.ensemble import IsolationForest
import openai
from dotenv import load_dotenv

load_dotenv()

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("ai_system.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

openai.api_key = os.getenv("OPENAI_API_KEY")
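# The imports above imply roughly the following dependencies. The openai pin is
# an assumption based on the legacy ChatCompletion call used further down, which
# was removed in openai 1.0:
#
#   pip install numpy scikit-learn python-dotenv "openai<1.0"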
class MemoryStore:
    def __init__(self, persistence_file: str = "memory_store.json"):
        self.session_memories = defaultdict(list)
        self.persistent_memories = []
        self.persistence_file = persistence_file
        self.recall_weights = defaultdict(float)
        self.sentiment_history = []
        self.load_memory()

    def add_memory(self, key: str, content: Any, domain: str, sentiment: float = 0.0):
        """Store memories with contextual linking"""
        memory = {
            "content": content,
            "timestamp": datetime.now().isoformat(),
            "domain": domain,
            "access_count": 0,
            "sentiment": sentiment,
            "associations": []
        }

        # Cross-domain linking: associate the new memory with the previous one
        if self.persistent_memories:
            last_memory = self.persistent_memories[-1]
            memory["associations"] = self._find_associations(last_memory["content"], content)

        self.session_memories[key].append(memory)
        self.persistent_memories.append(memory)
        self._update_recall_weight(key, boost=1.2)
        self.prune_memories()
    def recall(self, key: str, context: str = None) -> List[Any]:
        """Context-aware recall with adaptive weights"""
        memories = [m for m in self.persistent_memories if key in str(m["content"])]

        if context:
            memories = self._contextual_filter(memories, context)

        # Apply temporal decay and frequency weights
        weights = [
            self.recall_weights[key] *
            (1 / (1 + self._days_since(m["timestamp"]))) *
            (1 + m["access_count"] * 0.1)
            for m in memories
        ]

        # Rank memories by their combined recency/frequency weight
        ranked = sorted(zip(memories, weights), key=lambda pair: pair[1], reverse=True)
        return [m for m, _ in ranked][:10]
    def _find_associations(self, existing: str, new: str) -> List[str]:
        """Semantic linking between concepts"""
        # Placeholder for an actual semantic similarity model: link on shared tokens
        return list(set(str(existing).split()) & set(str(new).split()))

    def _contextual_filter(self, memories: List[dict], context: str) -> List[dict]:
        """Filter memories based on contextual relevance"""
        # Placeholder for an actual contextual similarity model: substring match
        return [m for m in memories if context.lower() in str(m["content"]).lower()]

    def _days_since(self, timestamp: str) -> float:
        return (datetime.now() - datetime.fromisoformat(timestamp)).days

    def _update_recall_weight(self, key: str, boost: float = 1.0):
        # Start unseen keys at 1.0 so the multiplicative boost can take effect,
        # and cap the weight at 5.0
        self.recall_weights[key] = min(max(self.recall_weights[key], 1.0) * boost, 5.0)
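    # A minimal sketch of what a real semantic linker could look like, using
    # TF-IDF cosine similarity from scikit-learn (already a dependency of this
    # module). The method name and the 0.1 threshold are illustrative
    # assumptions, not part of the original design.
    def _semantic_associations_sketch(self, existing: str, new: str) -> List[str]:
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.metrics.pairwise import cosine_similarity

        vectors = TfidfVectorizer().fit_transform([existing, new])
        similarity = cosine_similarity(vectors[0:1], vectors[1:2])[0][0]
        # Only link the two memories when they look lexically related
        if similarity > 0.1:
            return list(set(existing.split()) & set(new.split()))
        return []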
    def prune_memories(self):
        """Modular pruning system with anomaly detection"""
        # Remove less relevant memories using an isolation forest over content length
        if len(self.persistent_memories) > 1000:
            X = np.array([len(str(m["content"])) for m in self.persistent_memories]).reshape(-1, 1)
            clf = IsolationForest(contamination=0.1)
            preds = clf.fit_predict(X)
            self.persistent_memories = [m for m, p in zip(self.persistent_memories, preds) if p == 1]

    def save_memory(self):
        with open(self.persistence_file, "w") as f:
            json.dump({
                "persistent": self.persistent_memories,
                "weights": dict(self.recall_weights)
            }, f)

    def load_memory(self):
        try:
            with open(self.persistence_file, "r") as f:
                data = json.load(f)
                self.persistent_memories = data.get("persistent", [])
                self.recall_weights = defaultdict(float, data.get("weights", {}))
        except (FileNotFoundError, json.JSONDecodeError):
            pass
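# A small, optional demo of the MemoryStore API in isolation. It is never called
# by the rest of the module and only illustrates the intended add_memory /
# recall / save_memory flow; the file name and example strings are made up.
def _memory_store_demo() -> None:
    store = MemoryStore(persistence_file="demo_memory_store.json")
    store.add_memory(key="user123", content="how do I build a parser",
                     domain="technical", sentiment=0.2)
    store.add_memory(key="user123", content="build a recursive descent parser",
                     domain="technical", sentiment=0.1)
    # Recall memories whose content mentions "parser", filtered by context
    print(store.recall(key="parser", context="parser"))
    store.save_memory()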
class SentientGPT:
    def __init__(self):
        self.memory = MemoryStore()
        self.session_context = defaultdict(dict)
        self.sentiment_window = []
        self.engagement_history = []

    def _track_engagement(self, response: str):
        """Track user engagement patterns"""
        engagement = {
            "timestamp": datetime.now(),
            "response_length": len(response),
            "complexity": self._calculate_complexity(response)
        }
        self.engagement_history.append(engagement)

        # Keep only the most recent 100 interactions
        if len(self.engagement_history) > 100:
            self.engagement_history.pop(0)

    def _calculate_complexity(self, text: str) -> float:
        """Calculate text complexity as the ratio of unique words to total words"""
        words = text.split()
        unique_words = len(set(words))
        return (unique_words / len(words)) if words else 0
    def process_query(self, user_id: str, query: str) -> str:
        """Main processing pipeline"""
        # Analyze sentiment
        sentiment = self._analyze_sentiment(query)
        self.sentiment_window.append(sentiment)
        # Bound the sentiment window so long sessions do not grow it forever
        if len(self.sentiment_window) > 100:
            self.sentiment_window.pop(0)

        # Update context
        context = self._update_context(user_id, query, sentiment)

        # Generate response
        response = self._generate_response(query, context, sentiment)

        # Memory operations
        self.memory.add_memory(
            key=user_id,
            content=query,
            domain=self._detect_domain(query),
            sentiment=sentiment
        )

        # Track engagement
        self._track_engagement(response)

        return response
    def _analyze_sentiment(self, text: str) -> float:
        """Dynamic sentiment analysis with moving window"""
        # Placeholder for an actual sentiment analysis model
        positive_words = {"good", "great", "happy", "awesome"}
        negative_words = {"bad", "terrible", "hate", "awful"}

        words = text.lower().split()
        if not words:
            return 0.0
        score = (sum(1 for w in words if w in positive_words) -
                 sum(1 for w in words if w in negative_words)) / len(words)

        # Apply moving window smoothing over the last five scores
        if self.sentiment_window:
            score = 0.7 * score + 0.3 * np.mean(self.sentiment_window[-5:])

        return max(min(score, 1.0), -1.0)
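    # A minimal sketch of a drop-in replacement backed by NLTK's VADER analyzer.
    # The method name is an assumption, and it requires `pip install nltk` plus
    # nltk.download("vader_lexicon") before first use.
    def _analyze_sentiment_vader_sketch(self, text: str) -> float:
        from nltk.sentiment import SentimentIntensityAnalyzer

        analyzer = SentimentIntensityAnalyzer()
        # VADER's compound score is already normalised to the range [-1.0, 1.0]
        return analyzer.polarity_scores(text)["compound"]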
    def _detect_domain(self, query: str) -> str:
        """Cross-domain detection"""
        domains = {
            "technical": {"how", "build", "code", "create"},
            "emotional": {"feel", "think", "believe", "opinion"},
            "factual": {"what", "when", "where", "why"}
        }

        words = set(query.lower().split())
        scores = {
            domain: len(words & keywords)
            for domain, keywords in domains.items()
        }
        return max(scores, key=scores.get)
    def _update_context(self, user_id: str, query: str, sentiment: float) -> dict:
        """Maintain dynamic conversation context"""
        context = self.session_context[user_id]

        # Maintain the last 5 interactions
        context.setdefault("history", []).append(query)
        if len(context["history"]) > 5:
            context["history"].pop(0)

        # Track sentiment trends with an exponential moving average
        context["sentiment"] = 0.8 * context.get("sentiment", 0) + 0.2 * sentiment

        return context
    def _generate_response(self, query: str, context: dict, sentiment: float) -> str:
        """Generate response with contextual awareness"""
        # Retrieve relevant memories
        memories = self.memory.recall(
            key=self._detect_domain(query),
            context=query
        )

        # Build prompt with context
        prompt = f"Context: {context}\nMemories: {memories[:3]}\nQuery: {query}"

        try:
            # Legacy ChatCompletion endpoint (removed in openai>=1.0)
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": prompt},
                    {"role": "user", "content": query}
                ]
            ).choices[0].message['content']

            # Adjust response tone based on sentiment
            if sentiment < -0.5:
                response = f"I understand this might be frustrating. {response}"
            elif sentiment > 0.5:
                response = f"Great to hear! {response}"
        except Exception as e:
            logger.error(f"API Error: {e}")
            response = "I'm having trouble processing that request right now."

        return response
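    # A minimal sketch of the same call against the openai>=1.0 SDK, in case the
    # legacy ChatCompletion interface above is unavailable. The method name is an
    # assumption; the client reads OPENAI_API_KEY from the environment.
    def _generate_response_v1_sketch(self, prompt: str, query: str) -> str:
        from openai import OpenAI

        client = OpenAI()
        completion = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": query},
            ],
        )
        return completion.choices[0].message.content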
# ====================
# Usage Example
# ====================
if __name__ == "__main__":
    bot = SentientGPT()

    while True:
        query = input("User: ")
        if query.lower() in ["exit", "quit"]:
            break
        response = bot.process_query("user123", query)
        print(f"AI: {response}")

    bot.memory.save_memory()