import logging import os from datetime import datetime, timedelta import gradio as gr import json from log_reader import RemoteLogReader # logging.basicConfig(level=logging.INFO) # log = logging.getLogger(__name__) def get_file_data(content: str) -> tuple[str, str, bool]: """Read file content and return IP, username, and vote condition status""" try: lines = [line.strip() for line in content.split('\n') if line.strip()] if not lines: return None, "", False # Get IP from first line try: first_line_data = json.loads(lines[0]) ip = first_line_data.get('ip') except json.JSONDecodeError: ip = None # Find the vote line (if any) username = "" vote_conditions_met = False vote_line_index = -1 # Search for the vote line for i, line in enumerate(lines): try: line_data = json.loads(line) if line_data.get('type') == 'vote': vote_line_index = i break except json.JSONDecodeError: continue # If we found a vote line, check conditions and get username if vote_line_index >= 0: try: vote_line_data = json.loads(lines[vote_line_index]) # Only try to get username if the key exists if 'username' in vote_line_data: username = vote_line_data.get('username') feedback = vote_line_data.get('feedback') # Check vote conditions: type is vote, feedback has 6 items, and at least 4 lines (2 rounds of chat) # Only count lines up to and including the vote line relevant_lines = lines[:vote_line_index + 1] vote_conditions_met = ( isinstance(feedback, dict) and len([1 for v in feedback.values() if v]) == 6 and len(relevant_lines) >= 4 ) except (json.JSONDecodeError, TypeError): pass return ip, username, vote_conditions_met except Exception as e: logging.error(f"Error processing file content: {e}") return None, "", False def search_battle_anony(date_str: str, search_query: str) -> list[list]: """Search battle_anony conversations for a specific date and query.""" results = [] try: # Initialize RemoteLogReader reader = RemoteLogReader() # Get conversation logs for battle_anony mode conv_logs = reader.get_conv_logs(date_str) battle_anony_logs = conv_logs.get('battle_anony', {}) # Process each conversation for conv_id, logs in battle_anony_logs.items(): found_query = False ip = None username = "" # Convert messages to file content format for validation check content = '\n'.join(json.dumps(msg) for msg in logs) ip, username, is_valid = get_file_data(content) if not ip: continue # Search through messages for the query for log in logs: if "state" in log and "messages" in log["state"]: messages = log["state"]["messages"] # Search through each message for _, message in messages: if search_query in message: found_query = True break if found_query: break if found_query: # Convert to list format instead of dict results.append([ ip, username, # Add username column conv_id, 'Yes' if is_valid else 'No', 'Yes' ]) except Exception as e: logging.error(f"Error searching logs for date {date_str}: {e}") return results def create_ui(): def process_search(month, day, query): if not query.strip(): return [] # Create date string in YYYY_MM_DD format date_str = f"2025_{month}_{day}" results = search_battle_anony(date_str, query) if not results: return [] return results with gr.Blocks(title="Battle Search") as app: gr.Markdown("# Battle Search") gr.Markdown("Search for specific content in battle_anony conversations by date and view IP addresses with validation status.") with gr.Row(): with gr.Column(): current_month = datetime.now().strftime("%m") current_day = datetime.now().strftime("%d") month_input = gr.Dropdown( choices=[f"{i:02d}" for i in range(1, 13)], label="Month", value=current_month ) day_input = gr.Dropdown( choices=[f"{i:02d}" for i in range(1, 32)], label="Day", value=current_day ) query_input = gr.Textbox( label="Search Query", placeholder="Enter text to search for in conversations...", lines=1 ) with gr.Row(): search_button = gr.Button("Search") with gr.Row(): table_output = gr.DataFrame( headers=['IP', 'Username', 'Conversation ID', 'Is Valid', 'Contains Query'], label="Results Table", interactive=False, value=[] # Initialize with empty list ) search_button.click( fn=process_search, inputs=[month_input, day_input, query_input], outputs=[table_output] ) return app if __name__ == "__main__": app = create_ui() app.launch()