Leonydis137 commited on
Commit
cb5b9a2
·
verified ·
1 Parent(s): e0c069c

Create memory_manager.py

Browse files
Files changed (1) hide show
  1. memory_manager.py +104 -0
memory_manager.py ADDED
@@ -0,0 +1,104 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import pickle
3
+ import threading
4
+ import logging
5
+ from queue import Queue, Empty
6
+ from datetime import datetime
7
+ from functools import lru_cache
8
+ import numpy as np
9
+ import faiss
10
+ from sentence_transformers import SentenceTransformer
11
+ from time import perf_counter
12
+
13
# ----- Configuration (overridable via environment variables) -----
MEMORY_FILE = os.environ.get("MEMORY_FILE", "memory.pkl")
INDEX_FILE = os.environ.get("INDEX_FILE", "memory.index")
EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "all-MiniLM-L6-v2")

# Logging setup
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

# Load the embedding model once at import time.
embedding_model = SentenceTransformer(EMBEDDING_MODEL)

# Initialize memory store and FAISS index. On any load failure (missing file,
# corrupt pickle, incompatible index) fall back to a fresh, empty store.
# SECURITY NOTE: pickle.load on an attacker-controlled MEMORY_FILE can execute
# arbitrary code -- only point MEMORY_FILE at trusted data.
try:
    # Context manager fixes the file-handle leak of pickle.load(open(...)).
    with open(MEMORY_FILE, "rb") as fh:
        memory_data = pickle.load(fh)
    memory_index = faiss.read_index(INDEX_FILE)
    logging.info("Loaded existing memory and index.")
except Exception:
    memory_data = []
    dimension = embedding_model.get_sentence_embedding_dimension()
    memory_index = faiss.IndexFlatL2(dimension)
    logging.info("Initialized new memory and index.")

# Queue used to signal the background flush worker that state has changed.
_write_queue = Queue()
37
+
38
def _flush_worker():
    """Background thread: coalesce queued change signals and persist state.

    Blocks up to 5 seconds for the first signal, then drains everything else
    queued so a burst of writes results in a single pair of disk writes.
    Runs forever; intended to be started once as a daemon thread.
    """
    while True:
        batch = []
        try:
            batch.append(_write_queue.get(timeout=5))
        except Empty:
            pass
        # Drain remaining signals. EAFP: get_nowait() raises Empty when the
        # queue is exhausted, which is race-free unlike an empty() pre-check.
        while True:
            try:
                batch.append(_write_queue.get_nowait())
            except Empty:
                break
        if not batch:
            continue
        try:
            # Write to a temp file and atomically replace, so a crash
            # mid-write cannot leave a truncated/corrupt memory file behind.
            tmp_path = MEMORY_FILE + ".tmp"
            with open(tmp_path, "wb") as fh:
                pickle.dump(memory_data, fh)
            os.replace(tmp_path, MEMORY_FILE)
            faiss.write_index(memory_index, INDEX_FILE)
            # Lazy %-args: formatted only if INFO is enabled.
            logging.info("Flushed %d entries to disk.", len(batch))
        except Exception as e:
            # Best-effort persistence: log and keep the worker alive.
            logging.error("Flush error: %s", e)
57
+
58
# Start the single background flush thread at import time. daemon=True means
# it will not block interpreter shutdown (a pending flush may be lost on exit).
t = threading.Thread(target=_flush_worker, daemon=True)
t.start()
61
+
62
@lru_cache(maxsize=512)
def get_embedding(text: str) -> np.ndarray:
    """Return the sentence embedding for *text*, logging how long it took.

    Results are memoized in a bounded LRU (512 entries) so repeated texts skip
    the model call. NOTE(review): the cached ndarray is mutable and shared
    between callers -- treat the returned vector as read-only.
    """
    start = perf_counter()
    vec = embedding_model.encode(text)
    elapsed = perf_counter() - start
    # Lazy %-style args instead of an eager f-string: the message is only
    # formatted when INFO logging is actually enabled.
    logging.info("get_embedding: %.3fs for '%s...'", elapsed, text[:20])
    return vec
70
+
71
+
72
def embed_and_store(text: str, agent: str = "system", topic: str = ""):
    """Embed *text*, add it to the FAISS index and memory list, queue a flush.

    Args:
        text: Content to remember.
        agent: Originator label stored with the entry.
        topic: Optional topic tag stored with the entry.

    Failures are logged and swallowed (best-effort memory write).
    """
    try:
        vec = get_embedding(text)
        # Build the metadata entry before touching shared state, so the index
        # and the data list cannot drift out of sync on a late failure.
        entry = {
            "text": text,
            "agent": agent,
            "topic": topic,
            "timestamp": datetime.now().isoformat(),
        }
        # FAISS needs a 2-D float32 array; np.array also copies the cached
        # vector so the lru_cache entry is never aliased by the index.
        memory_index.add(np.array([vec], dtype="float32"))
        memory_data.append(entry)
        # Any truthy token works; the flush worker only counts queued signals.
        _write_queue.put(True)
        logging.info("Queued memory: %s / '%s...'", agent, text[:20])
    except Exception as e:
        logging.error("embed_and_store error: %s", e)
87
+
88
+
89
def retrieve_relevant(query: str, k: int = 5) -> list:
    """Return up to *k* stored memory entries most similar to *query*.

    Each result is a copy of the stored entry with an added 'similarity' key
    (computed as 1 - L2 distance; NOTE(review): this can go negative for far
    vectors -- it is a ranking score, not a normalized similarity).

    Returns [] on any error or when nothing matches.
    """
    try:
        q_vec = get_embedding(query)
        D, I = memory_index.search(np.array([q_vec], dtype="float32"), k)
        results = []
        for dist, idx in zip(D[0], I[0]):
            # BUG FIX: FAISS pads missing neighbors with index -1 when the
            # store holds fewer than k vectors. The old `idx < len(...)` check
            # let -1 through and wrongly returned the LAST entry; guard both
            # ends of the range.
            if 0 <= idx < len(memory_data):
                entry_copy = memory_data[idx].copy()
                entry_copy["similarity"] = 1 - dist
                results.append(entry_copy)
        return results
    except Exception as e:
        logging.error("retrieve_relevant error: %s", e)
        return []