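"""Tanakh Holographic Explorer (v13).

Folds the Hebrew text of the Tanakh into a fixed 4096-bit XOR state, combines
it with a query phrase and decodes the result into gematria values, which are
resolved against the per-book phrase indices produced by build_indices.py.
"""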
import json
import logging
import argparse
import numpy as np
import sys
import os
import re
from collections import Counter
import pickle
from gematria import letter_to_value, HEBREW_GEMATRIA_VALUES, linearize_umlauts, decompose_to_latin
# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger("gensim").setLevel(logging.WARNING)
HOLOGRAPHIC_STATE_SIZE_BITS = 4096
BITS_PER_CHAR = 16
BOOK_RANGE = range(1, 40)
MODELS_DIR = "models_by_book"
INDICES_DIR = "indices_by_book"
CACHE_FILE = "tanakh_data.cache"
SORTED_GEMATRIA = sorted(HEBREW_GEMATRIA_VALUES.items(), key=lambda item: item[1], reverse=True)
def setup_logging(debug_mode):
level = logging.DEBUG if debug_mode else logging.INFO
logging.getLogger().setLevel(level)
# --- Core engine as a class ---
class TanakhExplorer:
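    """Loads the per-book indices, builds the holographic Tanakh state and runs fractal queries."""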
def __init__(self, use_cache=True):
self.all_indices = {}
self.tanakh_state = None
cache_valid = use_cache and os.path.exists(CACHE_FILE)
if cache_valid:
try:
logging.info(f"Lade Daten aus Cache-Datei: {CACHE_FILE}")
with open(CACHE_FILE, 'rb') as f:
cached_data = pickle.load(f)
self.all_indices = cached_data.get('indices', {})
self.tanakh_state = cached_data.get('state')
logging.info("Daten erfolgreich aus Cache geladen.")
except Exception as e:
logging.warning(f"Cache-Datei ist korrupt oder konnte nicht geladen werden: {e}. Lade Daten neu.")
cache_valid = False
if not cache_valid or not self.all_indices or not self.tanakh_state:
self._load_all_indices()
self._create_tanakh_holographic_state()
if use_cache:
self._save_to_cache()
def _load_all_indices(self):
logging.info("Lade Index-Dateien für alle Bücher...")
for i in BOOK_RANGE:
index_path = os.path.join(INDICES_DIR, f"book_{i:02}_index.json")
if os.path.exists(index_path):
                with open(index_path, 'r', encoding='utf-8') as f:
                    self.all_indices[i] = json.load(f)
        if not self.all_indices:
            sys.exit("No index files found. Please run 'build_indices.py' first.")
        logging.info(f"{len(self.all_indices)} book indices loaded.")
def _create_tanakh_holographic_state(self):
logging.info("Erstelle holographischen Tanach-State...")
final_state = '0' * HOLOGRAPHIC_STATE_SIZE_BITS
full_binary_text = ""
for i in BOOK_RANGE:
try:
with open(f"texts/torah/{i:02}.json", 'r', encoding='utf-8') as file:
data = json.load(file)
full_text = ' '.join([' '.join(block) for block in data.get("text", [])])
clean_text = re.sub(r"[^\u05D0-\u05EA]+", "", re.sub(r"\[.*?\]", "", full_text, flags=re.DOTALL))
if clean_text:
full_binary_text += self._text_to_gematria_binary(clean_text, for_state=True)
            except Exception:
                # Skip books whose source file is missing or malformed.
                continue
self.tanakh_state = self._fold_into_state(full_binary_text)
logging.info("Holographischer Tanach-State wurde erstellt.")
def _save_to_cache(self):
logging.info(f"Speichere Daten in Cache-Datei: {CACHE_FILE}")
data_to_cache = {'indices': self.all_indices, 'state': self.tanakh_state}
        with open(CACHE_FILE, 'wb') as f:
            pickle.dump(data_to_cache, f)
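    # Encodes each character of the cleaned text as its gematria value in a
    # fixed-width BITS_PER_CHAR binary field. With for_state=True only Hebrew
    # letters are kept; otherwise Latin letters survive the cleanup as well.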
@staticmethod
def _text_to_gematria_binary(text, for_state=False):
text_for_calc = linearize_umlauts(text.lower())
if for_state:
clean_text = re.sub(r"[^\u05D0-\u05EA]+", "", text_for_calc)
else:
clean_text = re.sub(r"[^a-z\u05D0-\u05EA]+", "", text_for_calc)
logging.debug(f"text_to_gematria_binary (for_state={for_state}): Original='{text[:30]}...', Bereinigt='{clean_text[:30]}...'")
binary_string = "".join(format(letter_to_value(c), f'0{BITS_PER_CHAR}b') for c in clean_text)
logging.debug(f" -> erzeugter Binärstring (erste 64 Bits): {binary_string[:64]}")
return binary_string
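    # XOR-folds an arbitrarily long bit string into a fixed-width state: the
    # input is split into HOLOGRAPHIC_STATE_SIZE_BITS blocks (the last one is
    # zero-padded) and the blocks are XORed together, e.g. for a 4-bit state
    # '1100' folded with '1010' gives '0110'.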
@staticmethod
def _fold_into_state(binary_string, initial_state=None):
state = np.array(list(initial_state), dtype=np.int8) if initial_state else np.zeros(HOLOGRAPHIC_STATE_SIZE_BITS, dtype=np.int8)
for i in range(0, len(binary_string), HOLOGRAPHIC_STATE_SIZE_BITS):
block = binary_string[i:i+HOLOGRAPHIC_STATE_SIZE_BITS].ljust(HOLOGRAPHIC_STATE_SIZE_BITS, '0')
state = np.bitwise_xor(state, np.array(list(block), dtype=np.int8))
return "".join(state.astype(str))
def get_best_phrase_from_all_books(self, gematria_val, method):
best_overall_phrase_obj = None
best_overall_score = -1.0
for book_num, book_index in self.all_indices.items():
candidates = book_index.get(str(gematria_val), {}).get('phrases', [])
if not candidates: continue
pg_score = book_index.get(str(gematria_val), {}).get('pagerank', 0)
best_in_book = max(candidates, key=lambda p: pg_score / p.get('count', 1) if p.get('count', 0) > 0 else 0)
current_score = pg_score / best_in_book.get('count', 1) if best_in_book.get('count', 0) > 0 else 0
if current_score > best_overall_score:
best_overall_score = current_score
best_in_book['source_book'] = book_num
best_overall_phrase_obj = best_in_book
if best_overall_phrase_obj:
return best_overall_phrase_obj, "exact"
for offset in [1, -1]:
for book_num, book_index in self.all_indices.items():
candidates = book_index.get(str(gematria_val + offset), {}).get('phrases', [])
if candidates:
best_in_book = min(candidates, key=lambda p: p.get('position', float('inf')))
best_in_book['source_book'] = book_num
return best_in_book, f"neighbor(d={offset})"
decomposed = decompose_to_latin(gematria_val)
if decomposed:
return {"text": f"[{decomposed}]", "position": -2, "source_book": "N/A"}, "decomposed"
return None, None
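    # Fractal expansion: each round combines the initial query with every phrase
    # found so far, folds the combination against the Tanakh state, decodes the
    # XOR result into BITS_PER_CHAR-wide gematria values and resolves them to
    # phrases, which then seed the next round until `depth` is reached.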
def run_fractal_mode(self, query, depth, method):
print(f"\n" + "="*15 + f" FRAKTALE LOGOS-AUSSCHÖPFUNG (Tiefe: {depth}, Methode: {method}) " + "="*15)
initial_logos = query
        # Use 0 as the source book for the initial query so the sort key stays
        # an integer and sorting by source_book cannot raise a TypeError.
        all_found_phrases_map = {initial_logos: {"text": initial_logos, "position": -1, "depth": 0, "count": 1, "source_book": 0}}
phrases_to_process_this_level = {initial_logos}
for d in range(depth):
logging.info(f"--- Starte Tiefe {d + 1}/{depth} mit {len(phrases_to_process_this_level)} Phrasen ---")
phrases_for_next_level = set()
for p_current in phrases_to_process_this_level:
combined_query = f"{initial_logos} {p_current}"
query_binary = self._text_to_gematria_binary(combined_query)
konzept_state = self._fold_into_state(query_binary)
final_konzept = "".join(str(int(a)^int(b)) for a,b in zip(self.tanakh_state, konzept_state))
for i in range(0, len(final_konzept), BITS_PER_CHAR):
gematria_val = int(final_konzept[i:i+BITS_PER_CHAR], 2)
if gematria_val == 0: continue
phrase_obj, _ = self.get_best_phrase_from_all_books(gematria_val, method)
if phrase_obj:
phrase_text = phrase_obj['text']
if phrase_text not in all_found_phrases_map:
phrase_obj['depth'] = d + 1
phrase_obj['count'] = 1
all_found_phrases_map[phrase_text] = phrase_obj
phrases_for_next_level.add(phrase_text)
else:
all_found_phrases_map[phrase_text]['count'] += 1
if not phrases_for_next_level:
logging.info(f"Keine neuen Phrasen in Tiefe {d + 1} gefunden.")
break
phrases_to_process_this_level = phrases_for_next_level
        # Sort by book and then by position to preserve the narrative order.
sorted_by_position = sorted(all_found_phrases_map.values(), key=lambda x: (x.get('source_book', 99), x.get('position', -1)))
print("\n--- Finale Synthese (geordnet nach Buch und Auftreten im Text) ---")
current_book = -1
for p in sorted_by_position:
book = p.get('source_book')
if book != current_book:
                # Print a header for each new book.
if isinstance(book, int) and book > 0:
print(f"\n--- Buch {book:02d} ---")
elif book == 0:
print(f"--- Query ---")
current_book = book
print(f"{p['text']}", end=" | ")
print("\n")
        # Sort by frequency for the top concepts.
sorted_by_count = sorted(all_found_phrases_map.values(), key=lambda x: x['count'], reverse=True)
print("\n--- Top 25 Resonanz-Konzepte (geordnet nach Häufigkeit im Fraktal) ---")
for p in sorted_by_count[:25]:
source = f"B{p.get('source_book', '??'):02d}" if isinstance(p.get('source_book'), int) and p.get('source_book') > 0 else p.get('source_book', 'N/A')
print(f"[{p['count']:2d}x] {p['text']} (Original in {source}, Pos: {p.get('position', 'N/A')})")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Tanakh Holographic Explorer (v13, Final).")
parser.add_argument("query", type=str, help="Die anfängliche Abfragephrase (Logos).")
parser.add_argument("--method", type=str, choices=['frequency', 'network', 'default'], default='network', help="Gewichtungsmethode.")
parser.add_argument("--depth", type=int, default=1, help="Maximale Tiefe der fraktalen Suche.")
parser.add_argument("--no-cache", action="store_true", help="Erzwingt das Neuladen der Daten.")
parser.add_argument("--debug", action="store_true", help="Aktiviert detaillierte Debug-Ausgaben.")
args = parser.parse_args()
setup_logging(args.debug)
engine = TanakhExplorer(use_cache=not args.no_cache)
engine.run_fractal_mode(args.query, args.depth, args.method)
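    # Example invocation (script filename and query are illustrative):
    #   python holographic_explorer.py "logos" --depth 2 --method network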