|
import os |
|
import json |
|
import logging |
|
from datetime import datetime |
|
from collections import defaultdict |
|
from typing import Dict, List, Any, Optional |
|
import numpy as np |
|
from sklearn.ensemble import IsolationForest |
|
import openai |
|
from dotenv import load_dotenv |
|
|
|
load_dotenv() |
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|
handlers=[ |
|
logging.FileHandler("ai_system.log"), |
|
logging.StreamHandler() |
|
] |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
openai.api_key = os.getenv("OPENAI_API_KEY") |
|
|
|
class MemoryStore: |
|
def __init__(self, persistence_file: str = "memory_store.json"): |
|
self.session_memories = defaultdict(list) |
|
self.persistent_memories = [] |
|
self.persistence_file = persistence_file |
|
self.recall_weights = defaultdict(float) |
|
self.sentiment_history = [] |
|
self.load_memory() |
|
|
|
def add_memory(self, key: str, content: Any, domain: str, sentiment: float = 0.0): |
|
"""Store memories with contextual linking""" |
|
memory = { |
|
"content": content, |
|
"timestamp": datetime.now().isoformat(), |
|
"domain": domain, |
|
"access_count": 0, |
|
"sentiment": sentiment, |
|
"associations": [] |
|
} |
|
|
|
|
|
if self.persistent_memories: |
|
last_memory = self.persistent_memories[-1] |
|
memory["associations"] = self._find_associations(last_memory["content"], content) |
|
|
|
self.session_memories[key].append(memory) |
|
self.persistent_memories.append(memory) |
|
self._update_recall_weight(key, boost=1.2) |
|
self.prune_memories() |
|
|
|
def recall(self, key: str, context: str = None) -> List[Any]: |
|
"""Context-aware recall with adaptive weights""" |
|
memories = [m for m in self.persistent_memories if key in m["content"]] |
|
|
|
if context: |
|
memories = self._contextual_filter(memories, context) |
|
|
|
|
|
weights = [ |
|
self.recall_weights[key] * |
|
(1 / (1 + self._days_since(m["timestamp"]))) * |
|
(1 + m["access_count"] * 0.1) |
|
for m in memories |
|
] |
|
|
|
return sorted(memories, key=lambda x: x["access_count"], reverse=True)[:10] |
|
|
|
def _find_associations(self, existing: str, new: str) -> List[str]: |
|
"""Semantic linking between concepts""" |
|
|
|
return list(set(existing.split()) & set(new.split())) |
|
|
|
def _contextual_filter(self, memories: List[dict], context: str) -> List[dict]: |
|
"""Filter memories based on contextual relevance""" |
|
|
|
return [m for m in memories if context.lower() in m["content"].lower()] |
|
|
|
def _days_since(self, timestamp: str) -> float: |
|
return (datetime.now() - datetime.fromisoformat(timestamp)).days |
|
|
|
def _update_recall_weight(self, key: str, boost: float = 1.0): |
|
self.recall_weights[key] = min(self.recall_weights[key] * boost, 5.0) |
|
|
|
def prune_memories(self): |
|
"""Modular pruning system with anomaly detection""" |
|
|
|
if len(self.persistent_memories) > 1000: |
|
X = np.array([len(m["content"]) for m in self.persistent_memories]).reshape(-1,1) |
|
clf = IsolationForest(contamination=0.1) |
|
preds = clf.fit_predict(X) |
|
self.persistent_memories = [m for m,p in zip(self.persistent_memories, preds) if p == 1] |
|
|
|
def save_memory(self): |
|
with open(self.persistence_file, "w") as f: |
|
json.dump({ |
|
"persistent": self.persistent_memories, |
|
"weights": self.recall_weights |
|
}, f) |
|
|
|
def load_memory(self): |
|
try: |
|
with open(self.persistence_file, "r") as f: |
|
data = json.load(f) |
|
self.persistent_memories = data.get("persistent", []) |
|
self.recall_weights = defaultdict(float, data.get("weights", {})) |
|
except FileNotFoundError: |
|
pass |
|
|
|
class SentientGPT: |
|
def __init__(self): |
|
self.memory = MemoryStore() |
|
self.session_context = defaultdict(dict) |
|
self.sentiment_window = [] |
|
self.engagement_history = [] |
|
|
|
def _track_engagement(self, response: str): |
|
"""Track user engagement patterns""" |
|
engagement = { |
|
"timestamp": datetime.now(), |
|
"response_length": len(response), |
|
"complexity": self._calculate_complexity(response) |
|
} |
|
self.engagement_history.append(engagement) |
|
|
|
if len(self.engagement_history) > 100: |
|
self.engagement_history.pop(0) |
|
|
|
def _calculate_complexity(self, text: str) -> float: |
|
"""Calculate text complexity score""" |
|
words = text.split() |
|
unique_words = len(set(words)) |
|
return (unique_words / len(words)) if words else 0 |
|
|
|
def process_query(self, user_id: str, query: str) -> str: |
|
"""Main processing pipeline""" |
|
|
|
sentiment = self._analyze_sentiment(query) |
|
self.sentiment_window.append(sentiment) |
|
|
|
|
|
context = self._update_context(user_id, query, sentiment) |
|
|
|
|
|
response = self._generate_response(query, context, sentiment) |
|
|
|
|
|
self.memory.add_memory( |
|
key=user_id, |
|
content=query, |
|
domain=self._detect_domain(query), |
|
sentiment=sentiment |
|
) |
|
|
|
|
|
self._track_engagement(response) |
|
|
|
return response |
|
|
|
def _analyze_sentiment(self, text: str) -> float: |
|
"""Dynamic sentiment analysis with moving window""" |
|
|
|
positive_words = {"good", "great", "happy", "awesome"} |
|
negative_words = {"bad", "terrible", "hate", "awful"} |
|
words = text.lower().split() |
|
score = (sum(1 for w in words if w in positive_words) - |
|
sum(1 for w in words if w in negative_words)) / len(words) |
|
|
|
|
|
if self.sentiment_window: |
|
score = 0.7 * score + 0.3 * np.mean(self.sentiment_window[-5:]) |
|
|
|
return max(min(score, 1.0), -1.0) |
|
|
|
def _detect_domain(self, query: str) -> str: |
|
"""Cross-domain detection""" |
|
domains = { |
|
"technical": {"how", "build", "code", "create"}, |
|
"emotional": {"feel", "think", "believe", "opinion"}, |
|
"factual": {"what", "when", "where", "why"} |
|
} |
|
|
|
words = set(query.lower().split()) |
|
scores = { |
|
domain: len(words & keywords) |
|
for domain, keywords in domains.items() |
|
} |
|
|
|
return max(scores, key=scores.get) |
|
|
|
def _update_context(self, user_id: str, query: str, sentiment: float) -> dict: |
|
"""Maintain dynamic conversation context""" |
|
context = self.session_context[user_id] |
|
|
|
|
|
context.setdefault("history", []).append(query) |
|
if len(context["history"]) > 5: |
|
context["history"].pop(0) |
|
|
|
|
|
context["sentiment"] = 0.8 * context.get("sentiment", 0) + 0.2 * sentiment |
|
|
|
return context |
|
|
|
def _generate_response(self, query: str, context: dict, sentiment: float) -> str: |
|
"""Generate response with contextual awareness""" |
|
|
|
memories = self.memory.recall( |
|
key=self._detect_domain(query), |
|
context=query |
|
) |
|
|
|
|
|
prompt = f"Context: {context}\nMemories: {memories[:3]}\nQuery: {query}" |
|
|
|
try: |
|
response = openai.ChatCompletion.create( |
|
model="gpt-3.5-turbo", |
|
messages=[ |
|
{"role": "system", "content": prompt}, |
|
{"role": "user", "content": query} |
|
] |
|
).choices[0].message['content'] |
|
|
|
|
|
if sentiment < -0.5: |
|
response = f"I understand this might be frustrating. {response}" |
|
elif sentiment > 0.5: |
|
response = f"Great to hear! {response}" |
|
|
|
except Exception as e: |
|
logger.error(f"API Error: {e}") |
|
response = "I'm having trouble processing that request right now." |
|
|
|
return response |
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
bot = SentientGPT() |
|
|
|
while True: |
|
query = input("User: ") |
|
if query.lower() in ["exit", "quit"]: |
|
break |
|
|
|
response = bot.process_query("user123", query) |
|
print(f"AI: {response}") |
|
bot.memory.save_memory() |