# Spaces status: Runtime error
import datetime
import json
import logging
import time
import unittest
from urllib.parse import quote

import gradio as gr
import pytz
import requests
from deep_translator import GoogleTranslator
from deep_translator.exceptions import NotValidLength, RequestError

from utils import process_json_files, flatten_text_with_line_breaks, calculate_tanach_statistics, build_word_index
# Set up logging: DEBUG and above, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)

# Load Tanach text once at import time, then derive the word index from it.
TANACH_DATA = process_json_files(1, 39)
WORD_INDEX = build_word_index(TANACH_DATA)

# --- Utility Functions ---
def get_current_word_data(client_time_str):
    """Look up the word-index entry closest to a given time of day.

    Args:
        client_time_str: Time of day formatted as ``HH:MM:SS``.

    Returns:
        A ``(word_data, word_position)`` tuple. ``word_position`` is the key of
        ``WORD_INDEX`` numerically closest to the number of seconds elapsed
        since midnight, and ``word_data`` is the value stored under that key.
        ``(None, None)`` is returned (and the problem logged) when the time
        string cannot be parsed or no closest key can be determined.
    """
    try:
        client_time = datetime.datetime.strptime(client_time_str, "%H:%M:%S")
        total_seconds = (
            client_time.hour * 3600
            + client_time.minute * 60
            + client_time.second
        )
        # min() raises ValueError on an empty index; handled below like a bad time.
        word_position = min(WORD_INDEX, key=lambda k: abs(k - total_seconds))
    except (TypeError, ValueError) as e:
        logging.error(f"Error processing client time: {e}")
        return None, None
    return WORD_INDEX[word_position], word_position
def get_formatted_verse(book_id, chapter_id, verse_id, highlight_word=True):
    """Render a chapter as ``<br>``-separated HTML, optionally highlighting one entry.

    Args:
        book_id: Key of the book in ``TANACH_DATA``.
        chapter_id: Index of the chapter inside the book's ``"text"`` entry.
        verse_id: 1-based position of the entry to highlight.
        highlight_word: When true, wrap the selected entry in a
            ``<span class='highlight'>`` element.

    Returns:
        The flattened chapter entries joined with ``<br>``.
    """
    lines = flatten_text_with_line_breaks(TANACH_DATA[book_id]["text"][chapter_id])

    # Highlight before joining so the markup wraps exactly one entry.
    target = verse_id - 1
    in_range = 0 <= target < len(lines)
    if highlight_word and in_range:
        lines[target] = "<span class='highlight'>{}</span>".format(lines[target])

    return '<br>'.join(lines)
def _split_into_chunks(text, max_length):
    """Split *text* into pieces of at most *max_length* characters, breaking at newlines when possible."""
    chunks = []
    current_lines = []
    current_length = 0
    for line in text.split("\n"):
        # A single line longer than the limit has to be cut inside the line.
        pieces = [line[i:i + max_length] for i in range(0, len(line), max_length)] or [""]
        for piece in pieces:
            # The extra character is the newline that joins the piece to the chunk.
            needed = len(piece) + (1 if current_lines else 0)
            if current_lines and current_length + needed > max_length:
                chunks.append("\n".join(current_lines))
                current_lines, current_length = [], 0
                needed = len(piece)
            current_lines.append(piece)
            current_length += needed
    if current_lines:
        chunks.append("\n".join(current_lines))
    return chunks


def translate_verse(hebrew_verse, highlight_word=True):
    """Translate Hebrew text to English, splitting it into chunks if necessary.

    The text is divided at line boundaries so that every request stays below
    ``max_length`` characters; only a single line that is itself longer than
    the limit is cut mid-line. Each chunk is translated separately and the
    results are joined with newlines, so no translated text is discarded.

    Args:
        hebrew_verse: Newline-separated Hebrew text. Empty or ``None`` input
            yields an empty string without contacting the translator.
        highlight_word: Unused; kept so existing callers keep working.

    Returns:
        The English translation, or a ``"Translation unavailable: ..."``
        message when the translation service rejects the request.
    """
    if not hebrew_verse:
        return ""

    max_length = 2000  # Slightly below the limit to be safe
    try:
        translator = GoogleTranslator(source='iw', target='en')
        translated_chunks = [
            # Guard against a None result so the join below cannot fail.
            translator.translate(chunk) or ""
            for chunk in _split_into_chunks(hebrew_verse, max_length)
        ]
    except RequestError as e:
        logging.warning(f"Translation failed: Request Error - {e}")
        return "Translation unavailable: Request Error"
    except NotValidLength as e:
        logging.warning(f"Translation failed: Invalid Length - {e}")
        return "Translation unavailable: Invalid Length"
    return "\n".join(translated_chunks)
def get_client_time_from_ip(ip_address):
    """Look up the timezone name for an IP address via the ip-api.com JSON API.

    Despite its name, this function returns a timezone name, not a time.

    Args:
        ip_address: Address to look up. It is converted to text, stripped and
            percent-encoded before being placed in the request URL.

    Returns:
        The value of the ``timezone`` field of the API response, or ``None``
        when the request fails, the response is not valid JSON, or the field
        is missing or empty.
    """
    # Escape the user-supplied value so it cannot alter the URL path or query.
    query = quote(str(ip_address).strip(), safe=":")
    api_url = f"http://ip-api.com/json/{query}"
    try:
        # A timeout keeps a stalled API from blocking the UI handler forever.
        response = requests.get(api_url, timeout=10)
        response.raise_for_status()  # Raise an exception for bad status codes
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on older requests versions.
        logging.warning(f"Error fetching time from IP: {e}")
        return None
    return data.get("timezone") or None
# --- Gradio Interface ---
def update_tanach_display(client_time_str, timezone):
    """Build the verse heading, Hebrew chapter HTML and English translation for a time of day.

    ``client_time_str`` is used as given. ``timezone`` is only validated: the
    previous "adjustment" attached a UTC offset with ``replace(tzinfo=...)``,
    which never changes the clock reading, so it has been reduced to the
    validation it effectively was.

    Args:
        client_time_str: Time of day formatted as ``HH:MM:SS``.
        timezone: Timezone name accepted by ``pytz.timezone``.

    Returns:
        A ``(verse_info, hebrew_verse, english_verse)`` tuple of strings. On
        failure the first element holds an error message and the other two
        are empty strings.
    """
    try:
        datetime.datetime.strptime(client_time_str, "%H:%M:%S")
    except (TypeError, ValueError) as e:
        logging.error(f"Error parsing client time: {e}")
        return "Error: Invalid Time", "", ""

    try:
        pytz.timezone(timezone)
    except (pytz.UnknownTimeZoneError, AttributeError) as e:
        logging.error(f"Error adjusting client time based on timezone: {e}")
        return "Error: Invalid Timezone", "", ""

    word_data, word_position = get_current_word_data(client_time_str)
    if word_data is None:
        logging.error(f"Word position {word_position} not found in index.")
        return "Error: Word not found", "", ""

    book_id = word_data["book_id"]
    chapter_id = word_data["chapter_id"]
    verse_id = word_data["verse_id"]
    logging.debug(f"Book ID: {book_id}, Chapter ID: {chapter_id}, Verse ID: {verse_id}")

    # Format verse information (chapter_id is 0-based, hence the + 1 for display)
    verse_info = (
        f"\n**{TANACH_DATA[book_id]['title']}**\n"
        f"Chapter {chapter_id + 1}, Verse {verse_id}\n"
    )

    # Get and format Hebrew and English verses
    hebrew_verse = get_formatted_verse(book_id, chapter_id, verse_id)
    english_verse = translate_verse(hebrew_verse.replace('<br>', '\n'), highlight_word=False)
    # The result is rendered as HTML, where bare newlines collapse; restore the
    # <br> separators so the English lines match the Hebrew layout.
    english_verse = english_verse.replace('\n', '<br>')
    return verse_info, hebrew_verse, english_verse
def auto_advance(client_time_str, timezone):
    """Yield refreshed display values about once per second, indefinitely.

    The time of day is read in the given timezone rather than from the
    server's local clock, so the displayed position follows the client's
    wall-clock time.

    Args:
        client_time_str: Unused; kept so the registered Gradio inputs still match.
        timezone: Timezone name accepted by ``pytz.timezone``.

    Yields:
        The ``(verse_info, hebrew_verse, english_verse)`` tuple produced by
        ``update_tanach_display`` for the current time.
    """
    try:
        tz = pytz.timezone(timezone)
    except (pytz.UnknownTimeZoneError, AttributeError):
        # Fall back to server local time; update_tanach_display reports the
        # invalid timezone to the user.
        tz = None
    while True:
        current_time = datetime.datetime.now(tz).strftime("%H:%M:%S")
        yield update_tanach_display(current_time, timezone)
        time.sleep(1)  # Update every second
# --- Fetching User's IP ---
def fetch_user_ip():
    """Return the public IP address reported by api.ipify.org, or ``None`` on failure.

    NOTE(review): the request is sent by this Python process, so the address
    returned is the public IP of the machine running the app. That equals the
    visitor's address only when the app runs on the visitor's machine; confirm
    this is the intended behaviour for hosted deployments.
    """
    try:
        # A timeout keeps a stalled API from blocking the UI handler forever.
        response = requests.get('https://api.ipify.org?format=json', timeout=10)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on older requests versions.
        logging.warning(f"Error fetching user's IP: {e}")
        return None
    return data.get('ip')
# --- Gradio Interface ---
def _show_current_position(client_ip, timezone):
    """Render the display for the current time of day in *timezone*.

    ``client_ip`` is accepted only so the handler matches the registered
    inputs; it is not a time and must not be forwarded as one.
    """
    try:
        now = datetime.datetime.now(pytz.timezone(timezone))
    except (pytz.UnknownTimeZoneError, AttributeError):
        # update_tanach_display reports the invalid timezone to the user.
        now = datetime.datetime.now()
    return update_tanach_display(now.strftime("%H:%M:%S"), timezone)


with gr.Blocks(css="""
.container {
    display: flex;
    flex-direction: column;
    align-items: center;
    font-family: 'Times New Roman', serif;
}
/* Add this highlight class styling */
.highlight {
    background-color: #FFFF00; /* Yellow highlight */
    padding: 2px 5px;
    border-radius: 5px;
}
#verse-info {
    margin-bottom: 20px;
    text-align: center;
}
#verses {
    display: flex;
    flex-direction: row;
    justify-content: center;
    align-items: flex-start;
    gap: 50px;
}
#hebrew-verse {
    font-size: 18px;
    line-height: 1.5;
    margin-bottom: 20px;
    text-align: right;
    direction: rtl;
}
#english-verse {
    font-size: 18px;
    line-height: 1.5;
    margin-bottom: 20px;
}
""") as iface:
    with gr.Row():
        client_ip_input = gr.Textbox(label="Enter your IP address (optional)", value="")
        timezone_input = gr.Textbox(label="Timezone", value="", interactive=False)  # Added timezone input
    with gr.Row():
        verse_info_output = gr.Markdown(label="Verse Information", elem_id="verse-info")
    # Place Hebrew and English verses within a flex container
    with gr.Row(elem_id="verses"):
        hebrew_verse_output = gr.HTML(label="Hebrew Verse", elem_id="hebrew-verse")
        english_verse_output = gr.HTML(label="English Translation", elem_id="english-verse")

    # Fetch user's IP and get timezone
    gr.Button("Fetch IP and Timezone").click(
        fn=fetch_user_ip,
        inputs=[],
        outputs=[client_ip_input],
        queue=False,
    )
    client_ip_input.change(
        fn=get_client_time_from_ip,
        inputs=[client_ip_input],
        outputs=[timezone_input],
        queue=False,
    )

    # Update the display with verse information and translations.
    # The IP textbox holds an address, not an HH:MM:SS time, so the handler
    # derives the current time from the timezone instead of parsing the IP.
    client_ip_input.submit(
        fn=_show_current_position,
        inputs=[client_ip_input, timezone_input],
        outputs=[verse_info_output, hebrew_verse_output, english_verse_output],
        queue=False,
    )

    # Start automatic advancement. auto_advance is a generator, and Gradio
    # only streams generator output through the queue, so this event must not
    # opt out of it.
    gr.Button("Update Position").click(
        fn=auto_advance,
        inputs=[client_ip_input, timezone_input],
        outputs=[verse_info_output, hebrew_verse_output, english_verse_output],
        queue=True,
    )

# Enable the queue required by the generator-based handler above.
iface.queue()
class TestWordIndex(unittest.TestCase):
    """Checks the time-of-day lookup at the first and last second of the day."""

    def _assert_location(self, time_str, book_id, chapter_id, verse_id):
        """Assert that *time_str* resolves to the given book, chapter and verse."""
        word_data, _ = get_current_word_data(time_str)
        # The lookup signals failure with (None, None); fail with a clear
        # message instead of a TypeError when subscripting None.
        self.assertIsNotNone(word_data, f"no word data returned for {time_str}")
        self.assertEqual(word_data["book_id"], book_id)
        self.assertEqual(word_data["chapter_id"], chapter_id)
        self.assertEqual(word_data["verse_id"], verse_id)

    def test_word_index_boundaries(self):
        # Test for 0:00:00
        self._assert_location("00:00:00", 1, 0, 1)
        # Test for 23:59:59
        self._assert_location("23:59:59", 39, 35, 23)
if __name__ == '__main__':
    # Run the boundary tests before serving; the outcome is only printed and
    # does not stop the launch.
    loader = unittest.TestLoader()
    runner = unittest.TextTestRunner()
    runner.run(loader.loadTestsFromTestCase(TestWordIndex))

    iface.launch(share=True)