# Spaces: Running
import logging | |
import os | |
from datetime import datetime, timedelta | |
import gradio as gr | |
import json | |
from log_reader import RemoteLogReader | |
# logging.basicConfig(level=logging.INFO) | |
# log = logging.getLogger(__name__) | |
def _parse_json_object(line: str) -> dict:
    """Decode *line* as JSON and return it only if it is a JSON object.

    Returns an empty dict when the line is not valid JSON or when it decodes
    to anything other than an object (list, string, number, ...), so callers
    can use ``.get`` on the result unconditionally.
    """
    try:
        parsed = json.loads(line)
    except json.JSONDecodeError:
        return {}
    return parsed if isinstance(parsed, dict) else {}


def get_file_data(content: str) -> "tuple[str | None, bool]":
    """Extract the IP and the vote-condition status from log content.

    The content is treated as newline-separated JSON records; blank lines are
    ignored. The IP is the ``'ip'`` value of the first record. The vote
    conditions are met when the last record has ``'type' == 'vote'`` and its
    ``'feedback'`` value is a dict with exactly six entries.

    Args:
        content: Newline-separated JSON records.

    Returns:
        A ``(ip, vote_conditions_met)`` pair. ``ip`` is ``None`` when the
        first record is not a JSON object or has no ``'ip'`` key. The pair is
        ``(None, False)`` when there are no non-blank lines or when an
        unexpected error occurs (the error is logged, never raised).
    """
    try:
        lines = [text for text in map(str.strip, content.split('\n')) if text]
        if not lines:
            return None, False

        # The two records are parsed independently so that a malformed last
        # record cannot discard an IP already read from the first one (and
        # vice versa).
        ip = _parse_json_object(lines[0]).get('ip')

        last_record = _parse_json_object(lines[-1])
        feedback = last_record.get('feedback')
        vote_conditions_met = (
            last_record.get('type') == 'vote'
            and isinstance(feedback, dict)
            and len(feedback) == 6
        )
        return ip, vote_conditions_met
    except Exception as e:
        # Best-effort boundary: e.g. ``content`` not being a string.
        logging.error(f"Error processing file content: {e}")
        return None, False
def _conversation_contains(logs, search_query: str) -> bool:
    """Return True if any message text in *logs* contains *search_query*.

    Only log records shaped like ``{"state": {"messages": [[x, text], ...]}}``
    are inspected; records, states, message lists or entries with any other
    shape are skipped instead of raising.
    """
    for record in logs:
        if not isinstance(record, dict):
            continue
        state = record.get("state")
        if not isinstance(state, dict):
            continue
        messages = state.get("messages")
        if not isinstance(messages, (list, tuple)):
            continue
        for entry in messages:
            # Entries are expected to be two-item pairs whose second item is
            # the message text (presumably (role, text) -- verify against the
            # log producer).
            if not isinstance(entry, (list, tuple)) or len(entry) != 2:
                continue
            try:
                if search_query in entry[1]:
                    return True
            except TypeError:
                # Text that does not support membership tests (e.g. None)
                # simply cannot match.
                continue
    return False


def search_battle_anony(date_str: str, search_query: str) -> list[list]:
    """Search battle_anony conversations for a specific date and query.

    Conversation logs are obtained from ``RemoteLogReader().get_conv_logs``
    for ``date_str`` and only the ``'battle_anony'`` entry is examined. A
    conversation is reported when its messages contain ``search_query`` and
    ``get_file_data`` yields a truthy IP for it.

    Args:
        date_str: Date key passed unchanged to ``get_conv_logs``.
        search_query: Text to look for inside message texts.

    Returns:
        One row per matching conversation:
        ``[ip, conv_id, 'Yes' | 'No' (vote conditions met), 'Yes']``.
        Errors are logged rather than raised; rows collected before the
        error are still returned.
    """
    results = []
    try:
        reader = RemoteLogReader()
        conv_logs = reader.get_conv_logs(date_str)
        battle_anony_logs = conv_logs.get('battle_anony', {})

        for conv_id, logs in battle_anony_logs.items():
            # Search first so non-matching conversations skip the
            # serialization round-trip below.
            if not _conversation_contains(logs, search_query):
                continue

            # get_file_data works on newline-separated JSON text, so the
            # records are serialized back into that form for the check.
            content = '\n'.join(json.dumps(msg) for msg in logs)
            ip, is_valid = get_file_data(content)
            if not ip:
                continue

            results.append([
                ip,
                conv_id,
                'Yes' if is_valid else 'No',
                'Yes',
            ])
    except Exception as e:
        logging.error(f"Error searching logs for date {date_str}: {e}")
    return results
def create_ui():
    """Build the "Battle Search" Gradio app.

    The UI has a date picker, a query text box, a search button and a
    read-only results table. Clicking the button runs ``process_search`` on
    the date and query and shows the returned rows in the table.

    Returns:
        The constructed ``gr.Blocks`` app (not yet launched).
    """

    def process_search(date, query):
        """Run the search for the selected date and return the table rows.

        Returns an empty list when no date is selected, when the query is
        missing or blank, or when nothing matches.
        """
        # An empty date picker yields None, which datetime.fromtimestamp
        # would reject with a TypeError; treat it like an empty query.
        if date is None or not query or not query.strip():
            return []

        # NOTE(review): assumes the DateTime component delivers a POSIX
        # timestamp -- confirm against the installed Gradio version.
        # fromtimestamp() without tz interprets it in the server's local time.
        date_str = datetime.fromtimestamp(date).strftime("%Y_%m_%d")

        # search_battle_anony already returns an empty list for no matches.
        return search_battle_anony(date_str, query)

    with gr.Blocks(title="Battle Search") as app:
        gr.Markdown("# Battle Search")
        gr.Markdown("Search for specific content in battle_anony conversations by date and view IP addresses with validation status.")

        with gr.Row():
            date_input = gr.DateTime(
                label="Select Date",
                include_time=False,
            )
            query_input = gr.Textbox(
                label="Search Query",
                placeholder="Enter text to search for in conversations...",
                lines=1,
            )

        with gr.Row():
            search_button = gr.Button("Search")

        with gr.Row():
            table_output = gr.DataFrame(
                headers=['IP', 'Conversation ID', 'Is Valid', 'Contains Query'],
                label="Results Table",
                interactive=False,
                value=[],  # start with an empty table
            )

        search_button.click(
            fn=process_search,
            inputs=[date_input, query_input],
            outputs=[table_output],
        )

    return app
if __name__ == "__main__":
    # Script entry point: build the UI, then start serving it.
    app = create_ui()
    app.launch()