"""Gradio chatbot for searching vehicle-entry records and summing income.

Records are read once from a CSV file and indexed in a single dictionary
under three kinds of keys: plate number, company name and entry date
(``dd.mm.yyyy``).  Queries are substring matches against those keys; money
amounts are extracted from the free-text "final report" column.
"""

import csv
import os
import re
import threading
import time
from datetime import datetime, timedelta
from queue import Queue

import gradio as gr
from gtts import gTTS

# Global TTS queue
tts_queue = Queue()

# Global data storage: maps an index key (plate number, company name or
# entry date) to the list of CSV rows stored under that key.  ``None`` means
# "not loaded yet".
data = None

# CSV column headers.  They must match the header row of the data file
# byte-for-byte, including the original spelling.
PLATE_COLUMN = "رقم المركبة"
COMPANY_COLUMN = "اسم الشركه"
DATE_COLUMN = "تاريخ الدخول"
REPORT_COLUMN = "تقرير نهائي"

DATA_FILE = "docs/your_data.csv"
DATE_FORMAT = "%d.%m.%Y"

# Compiled once; these are used for every record of every query.
_DATE_PREFIX_RE = re.compile(r'\d{1,2}\.\d{1,2}\.\d{4}')
_WORK_AMOUNT_RE = re.compile(r'شغل\s*(\d+)\s*شيكل')
_ANY_AMOUNT_RE = re.compile(r'(\d+)\s*شيكل')


def load_training_data():
    """Load the CSV file into the global ``data`` index (first call only).

    Each row is stored under its plate number, its company name and, when
    the value starts with something shaped like ``d.m.yyyy``, its entry
    date.  A missing data file is not fatal: a warning is printed and the
    index stays empty.
    """
    global data
    if data is not None:
        return

    data = {}
    try:
        # newline="" is what the csv module expects so that newlines embedded
        # in quoted fields are parsed correctly.
        with open(DATA_FILE, "r", encoding="utf-8-sig", newline="") as csvfile:
            for row in csv.DictReader(csvfile):
                # DictReader fills columns missing from a short row with
                # None, hence the "or ''" before strip().
                plate_number = (row.get(PLATE_COLUMN) or "").strip()
                company_name = (row.get(COMPANY_COLUMN) or "").strip()
                date = (row.get(DATE_COLUMN) or "").strip()

                keys = []
                # Store by plate number
                if plate_number:
                    keys.append(plate_number)
                # Store by company name
                if company_name:
                    keys.append(company_name)
                # Store by date only if it looks like a valid date
                if date and _DATE_PREFIX_RE.match(date):
                    keys.append(date)

                stored_under = set()
                for key in keys:
                    # If two columns hold the same text, store the row once
                    # under that key rather than twice.
                    if key in stored_under:
                        continue
                    stored_under.add(key)
                    data.setdefault(key, []).append(row)
    except FileNotFoundError:
        print("Warning: Data file not found. Starting with empty dataset.")
        data = {}  # Ensure data is initialized even if file is not found


def parse_date(date_str):
    """Parse a ``dd.mm.yyyy`` string into a ``datetime``.

    Surrounding whitespace is ignored.  Returns ``None`` when the value is
    not a string or does not match the expected format.
    """
    if not isinstance(date_str, str):
        return None
    try:
        return datetime.strptime(date_str.strip(), DATE_FORMAT)
    except ValueError:
        # Return None for invalid date formats
        return None


def get_week_range(date_str):
    """Return ``(start, end)`` of the week containing ``date_str``.

    Weeks run from Saturday to Friday: ``(weekday() + 2) % 7`` is the number
    of days since the most recent Saturday.  Returns ``(None, None)`` when
    the date cannot be parsed.
    """
    current_date = parse_date(date_str)
    if current_date is None:
        return None, None

    days_since_saturday = (current_date.weekday() + 2) % 7
    start_of_week = current_date - timedelta(days=days_since_saturday)
    end_of_week = start_of_week + timedelta(days=6)
    return start_of_week, end_of_week


def text_to_speech(text):
    """Render ``text`` as Arabic speech and return the MP3 path.

    The file is written to the ``static`` directory.  Returns ``None`` for
    empty text or when synthesis/saving fails (the error is printed).
    """
    if not text:
        return None

    try:
        # Create static directory if it doesn't exist
        os.makedirs("static", exist_ok=True)

        speech = gTTS(text=text, lang='ar', slow=False)
        filename = f"text_to_speech_{int(time.time())}.mp3"
        filepath = os.path.join("static", filename)
        speech.save(filepath)
        return filepath
    except Exception as e:
        # Voice output is best-effort: never let it break the text answer.
        print(f"Text-to-speech error: {str(e)}")
        return None


def _sum_report_amounts(report, prefer_work_amounts=True):
    """Sum the shekel amounts mentioned in a free-text report.

    When ``prefer_work_amounts`` is true and the report mentions "شغل", only
    amounts written directly after that word are counted; otherwise every
    "<number> شيكل" occurrence is counted.  Missing reports count as 0.
    """
    if not report or not isinstance(report, str):
        return 0
    if prefer_work_amounts and "شغل" in report:
        amounts = _WORK_AMOUNT_RE.findall(report)
    else:
        amounts = _ANY_AMOUNT_RE.findall(report)
    return sum(int(amount) for amount in amounts)


def _sum_for_date_range(start_date, end_date, cash_only=False):
    """Sum report amounts of all records dated within ``[start, end]``.

    Only index keys that parse as dates are considered.  With ``cash_only``
    only records whose plate-number field contains "كاش" are counted, and
    every shekel amount in their report is summed.
    """
    total = 0
    for key, records in data.items():
        record_date = parse_date(key)
        # Skip keys that aren't valid dates or are out of range
        if record_date is None or not (start_date <= record_date <= end_date):
            continue

        for record in records:
            report = record.get(REPORT_COLUMN)
            if cash_only:
                if "كاش" not in (record.get(PLATE_COLUMN) or ""):
                    continue
                total += _sum_report_amounts(report, prefer_work_amounts=False)
            else:
                total += _sum_report_amounts(report)
    return total


def calculate_weekly_total(date_str):
    """Return the income total for the Saturday-Friday week of ``date_str``.

    Returns an ``int`` on success, or an error message string when
    ``date_str`` is not a valid ``dd.mm.yyyy`` date.
    """
    load_training_data()

    start_of_week, end_of_week = get_week_range(date_str)
    if start_of_week is None or end_of_week is None:
        return "Invalid date format. Please enter a date in the format dd.mm.yyyy."

    return _sum_for_date_range(start_of_week, end_of_week)


def calculate_weekly_cash_total(date_str):
    """Return the cash total for the Saturday-Friday week of ``date_str``.

    Only records whose plate-number field contains "كاش" are counted.
    Returns an ``int`` on success, or an error message string when
    ``date_str`` is not a valid ``dd.mm.yyyy`` date.
    """
    load_training_data()

    start_of_week, end_of_week = get_week_range(date_str)
    if start_of_week is None or end_of_week is None:
        return "Invalid date format. Please enter a date in the format dd.mm.yyyy."

    return _sum_for_date_range(start_of_week, end_of_week, cash_only=True)


def search_partial_matches(input_text):
    """Return ``{key: records}`` for every index key containing ``input_text``.

    The query is stripped first; an empty or non-string query yields an
    empty dictionary.
    """
    load_training_data()

    if not input_text or not isinstance(input_text, str):
        return {}

    needle = input_text.strip()
    if not needle:
        return {}

    return {
        key: records
        for key, records in data.items()
        if isinstance(key, str) and needle in key
    }


def calculate_total_for_period(start_date_str, end_date_str):
    """Return the income total for records dated within the given period.

    Both bounds are inclusive ``dd.mm.yyyy`` strings.  Returns an ``int`` on
    success, or an error message string when either date is invalid.
    """
    load_training_data()

    start_date = parse_date(start_date_str)
    end_date = parse_date(end_date_str)
    if start_date is None or end_date is None:
        return "Invalid date format. Please enter dates in the format dd.mm.yyyy."

    return _sum_for_date_range(start_date, end_date)


def chatbot(input_text, start_date_str="", end_date_str="", enable_voice=False):
    """Gradio entry point.

    When both period dates are given, answer with the total for that period;
    otherwise delegate to ``original_chatbot``.  Always returns a 4-tuple
    ``(summary, reports, weekly_summary, audio_path_or_None)``.
    """
    start_date_str = (start_date_str or "").strip()
    end_date_str = (end_date_str or "").strip()

    # Handle date range query if provided
    if start_date_str and end_date_str:
        try:
            total_for_period = calculate_total_for_period(start_date_str, end_date_str)
        except Exception as e:
            return (f"Error: {str(e)}", "", "", None)

        # A string result is the validation error; show it as-is instead of
        # wrapping it in "Total amount ... شيكل".
        if isinstance(total_for_period, str):
            return (total_for_period, "", "", None)

        return (f"Total amount from {start_date_str} to {end_date_str}: {total_for_period} شيكل",
                "", "", None)

    return original_chatbot(input_text, enable_voice)


def original_chatbot(input_text, enable_voice):
    """Answer a free-text query (date, company name or plate number).

    Lists every matching record whose report mentions "شيكل", sums their
    amounts, and adds the weekly income and weekly cash totals (which are
    only meaningful when the query is a date).  Returns a 4-tuple
    ``(summary, reports, weekly_summary, audio_path_or_None)``.
    """
    if not input_text or not isinstance(input_text, str):
        return ("Please enter a valid query.", "", "", None)

    load_training_data()
    matching_records = search_partial_matches(input_text)

    total_money = 0
    responses = []
    res_list = []  # Sentences that are read aloud when voice is enabled.
    seen_rows = set()

    for key, records in matching_records.items():
        # Keep only records whose report mentions shekels; a short CSV row
        # may have None in the report column.
        shekel_records = [
            info for info in records
            if "شيكل" in (info.get(REPORT_COLUMN) or "")
        ]
        if not shekel_records:
            continue

        # Handle company name display: announce it when the matched key is
        # the record's own plate number.
        first_record = shekel_records[0]
        if (first_record.get(PLATE_COLUMN) or "").strip() == key:
            company_name = first_record.get(COMPANY_COLUMN) or ""
            if company_name:
                announcement = f"اسم الشركة هو: {company_name}"
                if announcement not in res_list:
                    res_list.append(announcement)

        for info in shekel_records:
            # The same row is indexed under several keys (plate, company,
            # date); list and count it only once per query.
            if id(info) in seen_rows:
                continue
            seen_rows.add(id(info))

            responses.append("\n".join(f"{k}: {v}" for k, v in info.items() if v))
            total_money += _sum_report_amounts(info.get(REPORT_COLUMN))

    if matching_records:
        num_records_found = f"Number of records found: {len(responses)}"
        total_money_str = f"Total Money: {total_money} شيكل"
        combined_output = f"{num_records_found} - {total_money_str}"
        response = "\n\n---\n\n".join(responses)
        res_list.append(f"مجموع الدخل اليومي في هذا اليوم هو: {total_money}")
    else:
        combined_output = "No matching entries found in the data."
        response = ""

    # Calculate weekly totals.  Only numeric results are spoken; error
    # messages are still shown in the returned weekly summary text.
    try:
        weekly_total = calculate_weekly_total(input_text)
    except Exception as e:
        weekly_total = f"Error calculating weekly total: {str(e)}"
    else:
        if not isinstance(weekly_total, str):
            res_list.append(f"مجموع الدخل الأسبوعي هو: {weekly_total}")

    try:
        weekly_cash_total = calculate_weekly_cash_total(input_text)
    except Exception as e:
        weekly_cash_total = f"Error calculating weekly cash total: {str(e)}"
    else:
        if not isinstance(weekly_cash_total, str):
            res_list.append(f"مجموع الكاش المقبوض في هذا الاسبوع هو: {weekly_cash_total}")

    # Generate audio if enabled
    audio_file = None
    if enable_voice and res_list:
        audio_file = text_to_speech("\n".join(res_list))

    return (combined_output,
            response,
            f"Weekly Total: {weekly_total} - Weekly Cash Total: {weekly_cash_total}",
            audio_file)


# Define the Gradio interface
iface = gr.Interface(
    fn=chatbot,
    inputs=[
        gr.Textbox(lines=2, placeholder="Enter Date or Company Name or Plate Number..."),
        gr.Textbox(lines=1, placeholder="بحث من تاريخ (dd.mm.yyyy)", label="بحث من تاريخ"),
        gr.Textbox(lines=1, placeholder="الى تاريخ (dd.mm.yyyy)", label="الى تاريخ"),
        gr.Checkbox(label="تفعيل الصوت"),  # Checkbox for enabling voice output
    ],
    outputs=[
        gr.Textbox(label="مجموع الدخل اليومي"),
        gr.Textbox(label="عرض التقارير"),
        gr.Textbox(label="مجموع الدخل الاسبوعي"),
        gr.Audio(label="Play Voice"),  # Audio output for playing the voice
    ],
    live=False,
    title="شركه ابناء عرفات",
    description="بحث حسب اسم الشركه - التاريخ - نمره الشاحنه",
)

if __name__ == "__main__":
    iface.launch(share=True)