# xor_tanakh_2 / build_master_index.py
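"""Builds per-book Word2Vec models and gematria phrase indices.

For each book in BOOK_RANGE the script trains a small Word2Vec model, indexes
every phrase of up to MAX_PHRASE_WORDS words by its gematria value, and scores
gematria values with PageRank over a sentence-level co-occurrence graph.
"""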
import json
import logging
import re
import os
from collections import defaultdict
import gensim
import igraph as ig
import numpy as np
import time
from gematria import calculate_gematria
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Configuration ---
BOOK_RANGE = range(1, 40)
MAX_PHRASE_WORDS = 3
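# Index phrases of 1..MAX_PHRASE_WORDS consecutive words (see process_book).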
# <<< NEW: base paths for the structured per-book output >>>
MODELS_DIR = "models_by_book"
INDICES_DIR = "indices_by_book"
def ensure_dirs():
"""Stellt sicher, dass die Ausgabeordner existieren."""
os.makedirs(MODELS_DIR, exist_ok=True)
os.makedirs(INDICES_DIR, exist_ok=True)
def load_book_corpus(book_number):
"""Lädt ein einzelnes Buch und gibt seinen Text als Liste von Wörtern und Sätzen zurück."""
try:
with open(f"texts/torah/{book_number:02}.json", 'r', encoding='utf-8') as file:
data = json.load(file)
full_text = ' '.join([' '.join(block) for block in data.get("text", [])])
text_no_brackets = re.sub(r"\[.*?\]", "", full_text, flags=re.DOTALL)
clean_text = re.sub(r"[^\u05D0-\u05EA\s]+", "", text_no_brackets)
words = clean_text.split()
sentences = [w.split() for w in clean_text.split('\n') if w]
return words, sentences
except FileNotFoundError:
return None, None
def get_phrase_vector(phrase_words, model):
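    """Averages the in-vocabulary word vectors of a phrase.

    Falls back to a zero vector when none of the phrase's words is known.
    """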
    vectors = [model.wv[word] for word in phrase_words if word in model.wv]
    return np.mean(vectors, axis=0).tolist() if vectors else np.zeros(model.vector_size).tolist()

def process_book(book_number):
"""Führt den gesamten Indexierungsprozess für ein einzelnes Buch durch."""
logging.info(f"\n" + "="*20 + f" Verarbeite Buch {book_number:02} " + "="*20)
corpus_words, corpus_sentences = load_book_corpus(book_number)
if not corpus_words:
logging.warning(f"Buch {book_number:02} konnte nicht geladen werden oder ist leer.")
return
    # 1. Train a book-specific Word2Vec model
    model_path = os.path.join(MODELS_DIR, f"book_{book_number:02}.w2v")
    logging.info(f"Training Word2Vec model for book {book_number:02}...")
    w2v_model = gensim.models.Word2Vec(sentences=corpus_sentences, vector_size=50, window=5, min_count=1, workers=4)
    w2v_model.save(model_path)
    # 2. Build the base index for this book
    logging.info(f"Building phrase index for book {book_number:02}...")
    book_index = defaultdict(lambda: {"phrases": []})
    phrase_counts = defaultdict(int)
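    # Slide a window over the word stream: every run of 1..MAX_PHRASE_WORDS
    # consecutive words is indexed under the gematria value of its letters.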
    for i in range(len(corpus_words)):
        for j in range(1, MAX_PHRASE_WORDS + 1):
            if i + j > len(corpus_words):
                break
            phrase_words = corpus_words[i:i+j]
            phrase_text = " ".join(phrase_words)
            phrase_counts[phrase_text] += 1
            gematria_val = calculate_gematria("".join(phrase_words))
            if gematria_val > 0:
                if phrase_text not in [p['text'] for p in book_index[gematria_val]['phrases']]:
                    phrase_vector = get_phrase_vector(phrase_words, w2v_model)
                    book_index[gematria_val]['phrases'].append({"text": phrase_text, "vector": phrase_vector})
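    # Attach the raw occurrence counts collected above to each unique phrase.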
    for gematria_val, data in book_index.items():
        for phrase_data in data['phrases']:
            phrase_data['count'] = phrase_counts[phrase_data['text']]
    # 3. Network analysis for this book
    logging.info(f"Building graph for book {book_number:02}...")
    edges = set()
    for sentence in corpus_sentences:
        unique_gematria = list(set([calculate_gematria(word) for word in sentence if calculate_gematria(word) > 0]))
        for i in range(len(unique_gematria)):
            for j in range(i + 1, len(unique_gematria)):
                edges.add(tuple(sorted((unique_gematria[i], unique_gematria[j]))))
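    # PageRank on the co-occurrence graph scores each gematria value's global
    # importance; values absent from the graph default to 0 via .get() below.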
    if edges:
        G_ig = ig.Graph.TupleList(list(edges), directed=False)
        pagerank_list = G_ig.pagerank()
        pagerank_scores = {int(name): rank for name, rank in zip(G_ig.vs['name'], pagerank_list)}
        for gematria_val in book_index.keys():
            book_index[gematria_val]['pagerank'] = pagerank_scores.get(int(gematria_val), 0)
    # 4. Save the index for this book
    index_path = os.path.join(INDICES_DIR, f"book_{book_number:02}_index.json")
    logging.info(f"Saving index for book {book_number:02} to {index_path}...")
    with open(index_path, 'w', encoding='utf-8') as f:
        json.dump(book_index, f)

def main():
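    """Indexes every book in BOOK_RANGE and reports the total runtime."""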
    ensure_dirs()
    total_start_time = time.time()
    for book_num in BOOK_RANGE:
        process_book(book_num)
    logging.info(f"\nAll {len(BOOK_RANGE)} books have been processed.")
    logging.info(f"Total runtime: {(time.time() - total_start_time) / 60:.2f} minutes.")

if __name__ == "__main__":
    main()
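
# A minimal downstream sketch (an assumption, not part of this script): reading
# one of the generated indices. JSON serialization turns the integer gematria
# keys into strings, and "260" is just an arbitrary example value.
#
#   with open("indices_by_book/book_01_index.json", encoding="utf-8") as f:
#       idx = json.load(f)
#   entry = idx.get("260")
#   if entry:
#       print(entry.get("pagerank", 0), [p["text"] for p in entry["phrases"][:5]])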