neuralworm's picture
change to multi-line input (comparison)
8ea2147
raw
history blame
18.9 kB
import gradio as gr
import json
import re
import sqlite3
import logging
from collections import defaultdict
from typing import Tuple, Dict, List
# Assuming you have these files in your project
from util import process_json_files
from gematria import calculate_gematria
from deep_translator import GoogleTranslator, exceptions
from urllib.parse import quote_plus
from tqdm import tqdm
# Constants
DATABASE_FILE = 'gematria.db'
MAX_PHRASE_LENGTH_LIMIT = 20
BATCH_SIZE = 10000
# Set up logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Global variables
conn: sqlite3.Connection = None
translator: GoogleTranslator = None
book_names: Dict[int, str] = {}
gematria_cache: Dict[Tuple[int, int], List[Tuple[str, str, int, int, int, str]]] = {}
translation_cache: Dict[str, str] = {}
total_word_count: int = 0 # Global counter for word position
def initialize_database() -> None:
"""Initializes the SQLite database."""
global conn
conn = sqlite3.connect(DATABASE_FILE)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS results (
gematria_sum INTEGER,
words TEXT,
translation TEXT,
book TEXT,
chapter INTEGER,
verse INTEGER,
phrase_length INTEGER,
word_position TEXT,
PRIMARY KEY (gematria_sum, words, book, chapter, verse, word_position)
)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_results_gematria
ON results (gematria_sum)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS processed_books (
book TEXT PRIMARY KEY,
max_phrase_length INTEGER
)
''')
conn.commit()
def initialize_translator() -> None:
"""Initializes the Google Translator."""
global translator
translator = GoogleTranslator(source='iw', target='en')
logging.info("Translator initialized.")
def process_book(book_id: int, max_phrase_length: int, cursor):
"""Processes a single book and returns phrases to insert."""
global book_names, total_word_count
book_data = process_json_files(book_id, book_id)
phrases_to_insert = []
if book_id in book_data:
book_data = book_data[book_id]
if 'title' not in book_data or not isinstance(book_data['title'], str):
logging.warning(f"Skipping book {book_id} due to missing 'title' field.")
return phrases_to_insert
title = book_data['title']
book_names[book_id] = title
# Check if this book has already been processed for this phrase length
cursor.execute('''SELECT max_phrase_length FROM processed_books WHERE book = ?''', (title,))
result = cursor.fetchone()
if result and result[0] >= max_phrase_length:
logging.info(f"Skipping book {title}: Already processed with max_phrase_length {result[0]}")
return phrases_to_insert
if 'text' not in book_data or not isinstance(book_data['text'], list):
logging.warning(f"Skipping book {book_id} due to missing 'text' field.")
return phrases_to_insert
chapters = book_data['text']
for chapter_id, chapter in enumerate(chapters):
for verse_id, verse in enumerate(chapter):
verse_text = flatten_text(verse)
verse_text = re.sub(r'\[.*?\]', '', verse_text)
verse_text = re.sub(r"[^\u05D0-\u05EA ]+", "", verse_text)
verse_text = re.sub(r" +", " ", verse_text)
words = verse_text.split()
for length in range(1, max_phrase_length + 1):
for start in range(len(words) - length + 1):
phrase_candidate = " ".join(words[start:start + length])
gematria_sum = calculate_gematria(phrase_candidate.replace(" ", ""))
word_position_range = f"{total_word_count + start + 1}-{total_word_count + start + length}"
phrases_to_insert.append(
(gematria_sum, phrase_candidate, None, title, chapter_id + 1, verse_id + 1, length,
word_position_range))
total_word_count += len(words)
return phrases_to_insert
def populate_database(start_book: int, end_book: int, max_phrase_length: int = 1) -> None:
"""Populates the database with phrases from the Tanach."""
global conn, book_names, total_word_count
logging.info(f"Populating database with books from {start_book} to {end_book}...")
with sqlite3.connect(DATABASE_FILE) as conn:
cursor = conn.cursor()
for book_id in tqdm(range(start_book, end_book + 1), desc="Processing Books"):
phrases_to_insert = process_book(book_id, max_phrase_length, cursor)
if phrases_to_insert:
cursor.executemany('''
INSERT OR IGNORE INTO results (gematria_sum, words, translation, book, chapter, verse, phrase_length, word_position)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', phrases_to_insert)
# Update processed_books after processing each book
cursor.execute('''
INSERT OR REPLACE INTO processed_books (book, max_phrase_length)
VALUES (?, ?)
''', (book_names[book_id], max_phrase_length))
conn.commit()
total_word_count = 0 # Reset for the next set of phrase lengths
def get_translation(phrase: str) -> str:
"""Retrieves or generates the English translation of a Hebrew phrase
and caches it in the database.
"""
global conn, translator, translation_cache
# Check if the translation exists in the database
with sqlite3.connect(DATABASE_FILE) as conn:
cursor = conn.cursor()
cursor.execute("SELECT translation FROM results WHERE words = ? LIMIT 1", (phrase,))
result = cursor.fetchone()
if result and result[0]: # If a translation exists, use it
return result[0]
# If no translation in the database, translate and store it
translation = translate_and_store(phrase)
translation_cache[phrase] = translation
# Update the database with the new translation
with sqlite3.connect(DATABASE_FILE) as conn:
cursor = conn.cursor()
cursor.execute("UPDATE results SET translation = ? WHERE words = ?", (translation, phrase))
conn.commit()
return translation
def translate_and_store(phrase: str) -> str:
"""Translates a Hebrew phrase to English using Google Translate."""
global translator
max_retries = 3
retries = 0
while retries < max_retries:
try:
translation = translator.translate(phrase)
return translation
except (exceptions.TranslationNotFound, exceptions.NotValidPayload,
exceptions.ServerException, exceptions.RequestError) as e:
retries += 1
logging.warning(f"Error translating phrase '{phrase}': {e}. Retrying... ({retries}/{max_retries})")
logging.error(f"Failed to translate phrase '{phrase}' after {max_retries} retries.")
return "[Translation Error]"
def search_gematria_in_db(gematria_sum: int, max_words: int) -> List[Tuple[str, str, int, int, int, str]]:
"""Searches the database for phrases with a given Gematria value."""
global conn
with sqlite3.connect(DATABASE_FILE) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT words, book, chapter, verse, phrase_length, word_position
FROM results
WHERE gematria_sum = ? AND phrase_length <= ?
''', (gematria_sum, max_words))
results = cursor.fetchall()
return results
def gematria_search_interface(phrases: str, max_words: int, show_translation: bool) -> str:
"""The main function for the Gradio interface, handling multiple phrases."""
global conn, book_names, gematria_cache
results = []
all_results = [] # Store results for each phrase
middle_words_results = [] # Store middle word results for all books
phrases = phrases.strip().splitlines()
if not phrases:
return "Please enter at least one phrase."
for phrase in phrases:
if not phrase.strip():
continue # Skip empty lines
numbers = re.findall(r'\d+', phrase)
text_without_numbers = re.sub(r'\d+', '', phrase)
phrase_gematria = calculate_gematria(text_without_numbers.replace(" ", ""))
phrase_gematria += sum(int(number) for number in numbers)
if (phrase_gematria, max_words) in gematria_cache:
matching_phrases = gematria_cache[(phrase_gematria, max_words)]
else:
matching_phrases = search_gematria_in_db(phrase_gematria, max_words)
gematria_cache[(phrase_gematria, max_words)] = matching_phrases
if not matching_phrases:
results.append(f"No matching phrases found for: {phrase}")
continue
sorted_phrases = sorted(matching_phrases,
key=lambda x: (int(list(book_names.keys())[list(book_names.values()).index(x[1])]), x[2],
x[3]))
results_by_book = defaultdict(list)
for words, book, chapter, verse, phrase_length, word_position in sorted_phrases:
results_by_book[book].append((words, chapter, verse, phrase_length, word_position))
results.append(f"<h2>Results for: {phrase} (Gematria: {phrase_gematria})</h2>")
results.append("<div class='results-container'>")
for book, phrases in results_by_book.items():
for words, chapter, verse, phrase_length, word_position in phrases:
translation = get_translation(words) if show_translation else ""
link = f"https://www.biblegateway.com/passage/?search={quote_plus(book)}+{chapter}%3A{verse}&version=CJB"
results.append(f"""
<div class='result-item'>
<p><b>Book:</b> {book}</p>
<p><b>Chapter:</b> {chapter}, <b>Verse:</b> {verse}</p>
<p class='hebrew-phrase'><b>Hebrew Phrase:</b> {words}</p>
<p><b>Translation:</b> {translation}</p>
<p><b>Phrase Length:</b> {phrase_length} words</p>
<p><b>Phrase Gematria:</b> {phrase_gematria}</p>
<p><b>Word Position in the Tanach:</b> {word_position}</p>
<a href='{link}' target='_blank' class='bible-link'>[See on Bible Gateway]</a>
</div>
""")
results.append("</div>")
all_results.append(results_by_book) # Store results by book without the phrase
# Calculate middle words for all input lines
if len(all_results) >= 2:
results.append("<h2>Middle Words:</h2>")
results.append("<div class='results-container'>")
common_books = set.intersection(*[set(results.keys()) for results in all_results])
logging.debug(f"Common books: {common_books}")
for book in common_books:
logging.debug(f"Processing book: {book}")
# Find nearest positions for all phrases in the current book
nearest_positions = find_nearest_positions([results[book] for results in all_results])
logging.debug(f"Nearest positions in {book}: {nearest_positions}")
if nearest_positions:
middle_word_position = sum(nearest_positions) / len(nearest_positions)
logging.debug(f"Calculated middle word position in {book}: {middle_word_position}")
start_position = int(middle_word_position)
end_position = start_position + 1 if middle_word_position % 1 != 0 else start_position
logging.debug(f"Middle word position range in {book}: {start_position}-{end_position}")
middle_words_data = get_words_from_db(book, start_position, end_position)
logging.debug(f"Middle words data fetched from database: {middle_words_data}")
if middle_words_data:
# Store middle word data along with book name for sorting
middle_words_results.extend([(book, data) for data in middle_words_data])
else:
# Handle edge case: fetch words independently for start and end positions
logging.debug(f"No middle words found for range {start_position}-{end_position}. "
f"Fetching words independently.")
middle_words_data_start = get_words_from_db(book, start_position, start_position)
middle_words_data_end = get_words_from_db(book, end_position, end_position)
if middle_words_data_start or middle_words_data_end:
middle_words_results.extend([(book, data) for data in middle_words_data_start + middle_words_data_end])
# Sort middle words results by book order before displaying
middle_words_results.sort(key=lambda x: int(list(book_names.keys())[list(book_names.values()).index(x[0])]))
for book, (words, chapter, verse, phrase_length, word_position) in middle_words_results:
translation = get_translation(words) if show_translation else ""
link = f"https://www.biblegateway.com/passage/?search={quote_plus(book)}+{chapter}%3A{verse}&version=CJB"
results.append(f"""
<div class='result-item'>
<p><b>Book:</b> {book}</p>
<p><b>Chapter:</b> {chapter}, <b>Verse:</b> {verse}</p>
<p class='hebrew-phrase'><b>Hebrew Phrase:</b> {words}</p>
<p><b>Translation:</b> {translation}</p>
<p><b>Phrase Length:</b> {phrase_length} words</p>
<p><b>Word Position in the Tanach:</b> {word_position}</p>
<a href='{link}' target='_blank' class='bible-link'>[See on Bible Gateway]</a>
</div>
""")
results.append("</div>")
# Style modified to position search on top and results below
style = """
<style>
.results-container {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
gap: 20px;
width: 100%; /* Make results container take full width */
}
.result-item {
border: 1px solid #ccc;
padding: 15px;
border-radius: 5px;
box-shadow: 2px 2px 5px rgba(0, 0, 0, 0.1);
}
.hebrew-phrase {
font-family: 'SBL Hebrew', 'Ezra SIL', serif;
direction: rtl;
}
.bible-link {
display: block;
margin-top: 10px;
color: #007bff;
text-decoration: none;
}
</style>
"""
return style + "\n".join(results)
def find_nearest_positions(results_lists: List[List]) -> List[int]:
"""Finds the nearest word positions among multiple lists of results."""
nearest_positions = []
for i in range(len(results_lists)):
positions_i = [(int(pos.split('-')[0]), int(pos.split('-')[1])) for _, _, _, _, pos in results_lists[i]] # Get start and end positions
min_distance = float('inf')
nearest_pos_start = None
nearest_pos_end = None
for j in range(len(results_lists)):
if i == j:
continue
positions_j = [(int(pos.split('-')[0]), int(pos.split('-')[1])) for _, _, _, _, pos in results_lists[j]] # Get start and end positions
for pos_i_start, pos_i_end in positions_i:
for pos_j_start, pos_j_end in positions_j:
distance = abs((pos_i_start + pos_i_end) // 2 - (pos_j_start + pos_j_end) // 2)
if distance < min_distance:
min_distance = distance
nearest_pos_start = pos_i_start # Take the start position from the first list
nearest_pos_end = pos_i_end # Take the end position from the first list
if nearest_pos_start is not None and nearest_pos_end is not None:
nearest_positions.extend([nearest_pos_start, nearest_pos_end])
return nearest_positions
def get_words_from_db(book: str, start_position: int, end_position: int) -> List[Tuple]:
"""Fetches words from the database based on the book and exact word position range."""
global conn
logging.debug(f"Fetching words from database for {book} at positions {start_position}-{end_position}")
with sqlite3.connect(DATABASE_FILE) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT words, chapter, verse, phrase_length, word_position
FROM results
WHERE book = ? AND word_position = ?
""", (book, f"{start_position}-{end_position}")) # Directly compare word_position
results = cursor.fetchall()
logging.debug(f"Words fetched from database: {results}")
return results
def flatten_text(text: List) -> str:
"""Flattens nested lists into a single list."""
if isinstance(text, list):
return " ".join(flatten_text(item) if isinstance(item, list) else item for item in text)
return text
def run_app() -> None:
"""Initializes and launches the Gradio app."""
global conn
initialize_database()
initialize_translator()
logging.info("Starting database population...")
for max_phrase_length in range(1, MAX_PHRASE_LENGTH_LIMIT + 1):
populate_database(1, 39, max_phrase_length=max_phrase_length)
logging.info("Database population complete.")
with gr.Blocks() as iface: # Use gr.Blocks() for layout control
with gr.Row(): # Place inputs in a row
textbox = gr.Textbox(label="Enter word(s) or numbers (one phrase per line)", lines=5)
slider = gr.Slider(label="Max Word Count in Result Phrases", minimum=1,
maximum=MAX_PHRASE_LENGTH_LIMIT, step=1,
value=1)
checkbox = gr.Checkbox(label="Show Translation", value=True)
with gr.Row(): # Place buttons in a row
clear_button = gr.Button("Clear")
submit_button = gr.Button("Submit", variant="primary")
html_output = gr.HTML(label="Results") # Output for the results
submit_button.click(fn=gematria_search_interface,
inputs=[textbox, slider, checkbox],
outputs=html_output)
clear_button.click(fn=lambda: "", inputs=None, outputs=html_output) # Clear the output
iface.launch()
if __name__ == "__main__":
run_app()