Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import json | |
| import re | |
| import asyncio | |
| import aiosqlite | |
| import logging | |
| from collections import defaultdict | |
| from util import process_json_files | |
| from gematria import calculate_gematria | |
| from deep_translator import GoogleTranslator, exceptions | |
| from urllib.parse import quote_plus | |
# Configure root logging once at import time: INFO level, timestamped records.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-wide shared state, filled in by the initialisation routines below.
conn = None  # aiosqlite connection, assigned by initialize_database()
translator = None  # translator object, assigned by initialize_translator()
book_names = {}  # book id -> book title, filled while books are processed
ongoing_search_task = None  # handle of the most recently started search
def flatten_text(text):
    """Join an arbitrarily nested list of strings into one string.

    Items are joined with single spaces, recursing into nested lists.
    Anything that is not a list is returned unchanged.
    """
    if not isinstance(text, list):
        return text

    pieces = []
    for item in text:
        if isinstance(item, list):
            pieces.append(flatten_text(item))
        else:
            pieces.append(item)
    return " ".join(pieces)
async def initialize_database():
    """Open the SQLite database and create the schema if it is missing.

    Connects to ``gematria.db``, stores the connection in the module-level
    ``conn`` and creates the ``results`` and ``processed_books`` tables when
    they do not exist yet.

    The connection is deliberately left open: the other coroutines in this
    module reuse the global ``conn``. It must not be wrapped in
    ``async with``, because an aiosqlite connection used as a context
    manager is closed when the block exits.
    """
    global conn
    conn = await aiosqlite.connect('gematria.db')

    # NOTE(review): ``words TEXT UNIQUE`` allows each phrase only once in the
    # whole table even though the primary key also spans book/chapter/verse,
    # so later occurrences of the same phrase are rejected. Confirm that this
    # global de-duplication is intended before changing the schema.
    await conn.execute('''
        CREATE TABLE IF NOT EXISTS results (
            gematria_sum INTEGER,
            words TEXT UNIQUE,
            translation TEXT,
            book INTEGER,
            chapter INTEGER,
            verse INTEGER,
            PRIMARY KEY (words, book, chapter, verse)
        )
    ''')
    # Remembers, per book, the longest phrase length already indexed.
    await conn.execute('''
        CREATE TABLE IF NOT EXISTS processed_books (
            book INTEGER PRIMARY KEY,
            max_phrase_length INTEGER
        )
    ''')
    await conn.commit()
    logging.info("Database initialized.")
def initialize_translator():
    """Create the shared translator and store it in the module global.

    The translator is configured with source language code ``'iw'`` and
    target language code ``'en'``.
    """
    global translator
    translator = GoogleTranslator(
        target='en',
        source='iw',
    )
    logging.info("Translator initialized.")
async def insert_phrase_to_db(conn, gematria_sum, phrase_candidate, book, chapter, verse):
    """Insert one phrase and its Gematria value into the ``results`` table.

    Args:
        conn: Open aiosqlite connection. It is used as-is and left open;
            wrapping it in ``async with`` here would close the shared
            connection after the first insert.
        gematria_sum: Gematria value of the phrase.
        phrase_candidate: The phrase text to store in the ``words`` column.
        book: Book identifier.
        chapter: Chapter number.
        verse: Verse number.

    A row rejected by a uniqueness constraint is skipped and only reported
    at debug level; no exception is propagated for that case.
    """
    try:
        await conn.execute('''
            INSERT INTO results (gematria_sum, words, book, chapter, verse)
            VALUES (?, ?, ?, ?, ?)
        ''', (gematria_sum, phrase_candidate, book, chapter, verse))
    except aiosqlite.IntegrityError:
        logging.debug(f"Phrase already exists: {phrase_candidate} (Gematria: {gematria_sum}) at {book}:{chapter}:{verse}")
    else:
        await conn.commit()
        logging.debug(f"Inserted phrase: {phrase_candidate} (Gematria: {gematria_sum}) at {book}:{chapter}:{verse}")
def _iter_phrase_candidates(words, max_phrase_length):
    """Yield every run of 1..max_phrase_length consecutive words, space-joined."""
    for length in range(1, max_phrase_length + 1):
        for start in range(len(words) - length + 1):
            yield " ".join(words[start:start + length])


async def populate_database_async(conn, tanach_texts, max_phrase_length=1):
    """Index the phrases of the given books in the database.

    Args:
        conn: Open aiosqlite connection. It is used as-is and left open for
            the rest of the application.
        tanach_texts: Iterable of ``(book_id, text)`` pairs. ``text`` is
            expected to be a mapping with a ``'text'`` entry (list of
            chapters, each a list of verses) and an optional ``'title'``.
        max_phrase_length: Longest phrase, in words, to index per verse.

    For every verse, all characters other than U+05D0..U+05EA and spaces are
    removed, and each run of up to ``max_phrase_length`` consecutive words is
    stored together with its Gematria value. Chapter and verse numbers are
    stored 1-based. A book whose recorded ``max_phrase_length`` in
    ``processed_books`` is already at least the requested one is skipped.
    """
    # Compiled once; it is applied to every verse of every book.
    non_hebrew = re.compile(r"[^\u05D0-\u05EA ]+")

    logging.info("Populating database...")
    for book_id, text in tanach_texts:
        # Record the title before the skip check below. Otherwise a book that
        # was indexed in an earlier run would never get an entry in
        # book_names and would be displayed as 'Unknown'.
        title = 'Unknown'
        if isinstance(text, dict):
            title = text.get('title', 'Unknown')
            book_names[book_id] = title

        async with conn.execute(
            '''SELECT max_phrase_length FROM processed_books WHERE book = ?''',
            (book_id,),
        ) as cursor:
            result = await cursor.fetchone()
        if result and result[0] >= max_phrase_length:
            logging.info(f"Skipping book {book_id}: Already processed with max_phrase_length {result[0]}")
            continue

        logging.info(f"Processing book {book_id} with max_phrase_length {max_phrase_length}")
        if 'text' not in text or not isinstance(text['text'], list):
            logging.warning(f"Skipping book {book_id} due to missing or invalid 'text' field.")
            continue

        for chapter_id, chapter in enumerate(text['text']):
            if not isinstance(chapter, list):
                logging.warning(f"Skipping chapter {chapter_id} in book {title} due to invalid format.")
                continue
            for verse_id, verse in enumerate(chapter):
                # split() already collapses repeated spaces, so no separate
                # whitespace normalisation pass is needed.
                words = non_hebrew.sub("", flatten_text(verse)).split()
                for phrase_candidate in _iter_phrase_candidates(words, max_phrase_length):
                    gematria_sum = calculate_gematria(phrase_candidate.replace(" ", ""))
                    await insert_phrase_to_db(
                        conn, gematria_sum, phrase_candidate,
                        book_id, chapter_id + 1, verse_id + 1,
                    )

        # Remember how far this book has been indexed: insert a new marker
        # row, or raise the stored length if the book already has one.
        try:
            await conn.execute(
                '''INSERT INTO processed_books (book, max_phrase_length) VALUES (?, ?)''',
                (book_id, max_phrase_length),
            )
        except aiosqlite.IntegrityError:
            await conn.execute(
                '''UPDATE processed_books SET max_phrase_length = ? WHERE book = ?''',
                (max_phrase_length, book_id),
            )
        # Commit per book so the marker of a finished book is durable.
        await conn.commit()
    logging.info("Database population complete.")
async def get_translation(phrase):
    """Return the English translation of a Hebrew phrase, caching it in the DB.

    Args:
        phrase: Phrase text as stored in the ``words`` column of ``results``.

    Returns:
        The cached translation when the database already holds a non-empty
        one; otherwise the result of ``translate_and_store(phrase)``.

    The shared module-level connection is used directly and left open.
    """
    async with conn.execute('''
        SELECT translation FROM results
        WHERE words = ?
    ''', (phrase,)) as cursor:
        row = await cursor.fetchone()
    if row and row[0]:
        return row[0]

    # The translator performs blocking network I/O; run it in the default
    # executor so the event loop stays responsive meanwhile.
    loop = asyncio.get_running_loop()
    translation = await loop.run_in_executor(None, translate_and_store, phrase)

    # Do not cache failures: storing the error placeholder would make the
    # failure permanent, because later lookups would hit the cache.
    if translation and translation != "[Translation Error]":
        await conn.execute('''
            UPDATE results
            SET translation = ?
            WHERE words = ?
        ''', (translation, phrase))
        await conn.commit()
    return translation
def translate_and_store(phrase):
    """Translate *phrase* with the shared translator, retrying on failure.

    Despite its name, this function does not store anything itself; callers
    are responsible for persisting the result.

    Args:
        phrase: Text to translate.

    Returns:
        The translated text, or the placeholder ``"[Translation Error]"``
        after three failed attempts.
    """
    max_retries = 3
    for attempt in range(1, max_retries + 1):
        try:
            translation = translator.translate(phrase)
        # OSError covers network-level failures: the exceptions raised by the
        # ``requests`` library derive from it. Naming ``requests`` directly
        # would raise NameError here, since that module is not imported.
        except (exceptions.TranslationNotFound, exceptions.NotValidPayload,
                exceptions.ServerException, exceptions.RequestError,
                OSError) as e:
            logging.warning(f"Error translating phrase '{phrase}': {e}. Retrying... ({attempt}/{max_retries})")
        else:
            logging.debug(f"Translated phrase: {translation}")
            return translation
    logging.error(f"Failed to translate phrase '{phrase}' after {max_retries} retries.")
    return "[Translation Error]"
async def search_gematria_in_db(conn, gematria_sum):
    """Return all stored phrases whose Gematria value equals *gematria_sum*.

    Args:
        conn: Open aiosqlite connection. It is used as-is and left open;
            wrapping it in ``async with`` would close the shared connection
            after the first search.
        gematria_sum: Gematria value to look up.

    Returns:
        A list of ``(words, book, chapter, verse)`` rows; empty when nothing
        matches.
    """
    async with conn.execute('''
        SELECT words, book, chapter, verse FROM results WHERE gematria_sum = ?
    ''', (gematria_sum,)) as cursor:
        results = await cursor.fetchall()
    logging.debug(f"Found {len(results)} matching phrases for Gematria: {gematria_sum}")
    return results
# Stylesheet prepended to every rendered result page.
_RESULTS_STYLE = """
<style>
.results-container {
    display: grid;
    grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
    gap: 20px;
}
.result-item {
    border: 1px solid #ccc;
    padding: 15px;
    border-radius: 5px;
    box-shadow: 2px 2px 5px rgba(0, 0, 0, 0.1);
}
.hebrew-phrase {
    font-family: 'SBL Hebrew', 'Ezra SIL', serif;
    direction: rtl;
}
.bible-link {
    display: block;
    margin-top: 10px;
    color: #007bff;
    text-decoration: none;
}
</style>
"""


async def _render_search_results(phrase):
    """Find phrases with the same Gematria value as *phrase*; return HTML."""
    from html import escape  # local import: only this renderer needs it

    matching_phrases = await search_gematria_in_db(
        conn, calculate_gematria(phrase.replace(" ", ""))
    )
    if not matching_phrases:
        return "No matching phrases found."

    # Order by book, chapter, verse and group the hits per book.
    results_by_book = defaultdict(list)
    for words, book, chapter, verse in sorted(
        matching_phrases, key=lambda row: (row[1], row[2], row[3])
    ):
        results_by_book[book].append((words, chapter, verse))

    parts = ["<div class='results-container'>"]
    for book, phrases in results_by_book.items():
        book_name = book_names.get(book, 'Unknown')
        parts.append(f"<h4>Book: {escape(book_name)}</h4>")
        for words, chapter, verse in phrases:
            translation = await get_translation(words)
            link = f"https://www.biblegateway.com/passage/?search={quote_plus(book_name)}+{chapter}%3A{verse}"
            # Database and translator text is escaped before it is placed in
            # markup, so it cannot inject HTML into the page.
            parts.append(f"""
                <div class='result-item'>
                    <p>Chapter: {chapter}, Verse: {verse}</p>
                    <p class='hebrew-phrase'>Hebrew Phrase: {escape(words)}</p>
                    <p>Translation: {escape(str(translation))}</p>
                    <a href='{link}' target='_blank' class='bible-link'>[See on Bible Gateway]</a>
                </div>
            """)
    parts.append("</div>")
    return _RESULTS_STYLE + "\n".join(parts)


async def gematria_search_interface(phrase, request: gr.Request):
    """Gradio handler: search for phrases sharing the Gematria value of *phrase*.

    Args:
        phrase: Text entered by the user.
        request: Gradio request object. Kept for interface compatibility;
            it is not used.

    Returns:
        An HTML string with the grouped results, or a short plain-text
        message when the input is empty, nothing matches, or this search was
        superseded by a newer one.
    """
    global ongoing_search_task

    if not phrase or not phrase.strip():
        return "Please enter a phrase."

    # Cancel any search that is still running.
    # NOTE(review): the handle is module-global, so a search started by one
    # user cancels the running search of any other user. Confirm this is
    # the intended behaviour.
    if ongoing_search_task is not None and not ongoing_search_task.done():
        ongoing_search_task.cancel()

    # Run the search as an asyncio task so that a later call can cancel it.
    task = asyncio.create_task(_render_search_results(phrase))
    ongoing_search_task = task
    try:
        return await task
    except asyncio.CancelledError:
        if ongoing_search_task is task:
            # No newer search replaced this one, so the handler itself is
            # being cancelled; let the cancellation propagate.
            raise
        return "Search cancelled because a newer search was started."
async def run_app():
    """Initialize the database and translator, then serve the Gradio app.

    Database population runs as a background task on this event loop while
    the web interface is already being served. The coroutine then waits
    until it is cancelled (for example by Ctrl+C under ``asyncio.run``) and
    closes the database connection on the way out.
    """
    await initialize_database()
    initialize_translator()

    async def populate_database():
        """Load the book files and index them with their phrase lengths."""
        tanach_texts_1_1_1 = process_json_files(1, 1)
        tanach_texts_1_39_1 = process_json_files(1, 39)
        tanach_texts_27_27_4 = process_json_files(27, 27)
        await populate_database_async(conn, tanach_texts_1_1_1, max_phrase_length=1)
        await populate_database_async(conn, tanach_texts_1_39_1, max_phrase_length=1)
        await populate_database_async(conn, tanach_texts_27_27_4, max_phrase_length=4)

    # Keep a reference to the task so it is not garbage-collected and so its
    # outcome can be awaited below.
    population_task = asyncio.create_task(populate_database())

    iface = gr.Interface(
        fn=gematria_search_interface,
        inputs=gr.Textbox(label="Enter phrase"),
        outputs=gr.HTML(label="Results"),
        title="Gematria Search in Tanach",
        description="Search for phrases in the Tanach that have the same Gematria value.",
        live=False,
        allow_flagging="never",
        concurrency_limit=3
    )

    try:
        # A blocking launch() inside this coroutine would stall the event
        # loop and the population task would never get to run.
        # prevent_thread_lock=True makes launch() return once the server is
        # up, so the loop stays free for the background task.
        iface.launch(max_threads=10, prevent_thread_lock=True)

        try:
            await population_task
        except Exception:
            # Keep serving searches on whatever has been indexed so far.
            logging.exception("Database population failed.")

        # Keep the process (and with it the server) alive until cancelled.
        await asyncio.Event().wait()
    finally:
        if not population_task.done():
            population_task.cancel()
        await conn.close()
# Start the application only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(run_app())