File size: 4,831 Bytes
7c88c71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import logging
import os
from datetime import datetime, timedelta
import gradio as gr
import json
from log_reader import RemoteLogReader

# logging.basicConfig(level=logging.INFO)
# log = logging.getLogger(__name__)

def get_file_data(content: str) -> tuple[str, bool]:
    """Read file content and return IP and vote condition status"""
    try:
        lines = [line.strip() for line in content.split('\n') if line.strip()]
        if not lines:
            return None, False
        
        # Get IP from first line
        try:
            first_line_data = json.loads(lines[0])
            ip = first_line_data.get('ip')
        except json.JSONDecodeError:
            ip = None
        
        # Check vote conditions from last line
        try:
            last_line_data = json.loads(lines[-1])
            feedback = last_line_data.get('feedback')
            vote_conditions_met = (last_line_data.get('type') == 'vote' and 
                                isinstance(feedback, dict) and 
                                len(feedback) == 6)
        except json.JSONDecodeError:
            vote_conditions_met = False
            
        return ip, vote_conditions_met
    except Exception as e:
        logging.error(f"Error processing file content: {e}")
        return None, False

def search_battle_anony(date_str: str, search_query: str) -> list[list]:
    """Search battle_anony conversations for a specific date and query."""
    results = []
    
    try:
        # Initialize RemoteLogReader
        reader = RemoteLogReader()
        
        # Get conversation logs for battle_anony mode
        conv_logs = reader.get_conv_logs(date_str)
        battle_anony_logs = conv_logs.get('battle_anony', {})
        
        # Process each conversation
        for conv_id, logs in battle_anony_logs.items():
            found_query = False
            ip = None
            
            # Convert messages to file content format for validation check
            content = '\n'.join(json.dumps(msg) for msg in logs)
            ip, is_valid = get_file_data(content)
            
            if not ip:
                continue
                
            # Search through messages for the query
            for log in logs:
                if "state" in log and "messages" in log["state"]:
                    messages = log["state"]["messages"]
                    
                    # Search through each message
                    for _, message in messages:
                        if search_query in message:
                            found_query = True
                            break
                
                if found_query:
                    break
            
            if found_query:
                # Convert to list format instead of dict
                results.append([
                    ip,
                    conv_id,
                    'Yes' if is_valid else 'No',
                    'Yes'
                ])
    
    except Exception as e:
        logging.error(f"Error searching logs for date {date_str}: {e}")
    
    return results

def create_ui():
    def process_search(date, query):
        if not query.strip():
            return []
            
        # Convert timestamp to datetime object
        date_obj = datetime.fromtimestamp(date)
        # Convert date to required format (YYYY_MM_DD)
        date_str = date_obj.strftime("%Y_%m_%d")
        results = search_battle_anony(date_str, query)
        
        if not results:
            # Return empty list with correct structure for DataFrame
            return []
        return results
    
    with gr.Blocks(title="Battle Search") as app:
        gr.Markdown("# Battle Search")
        gr.Markdown("Search for specific content in battle_anony conversations by date and view IP addresses with validation status.")
        
        with gr.Row():
            date_input = gr.DateTime(
                label="Select Date",
                include_time=False,
            )
            
            query_input = gr.Textbox(
                label="Search Query",
                placeholder="Enter text to search for in conversations...",
                lines=1
            )
        
        with gr.Row():
            search_button = gr.Button("Search")
        
        with gr.Row():
            table_output = gr.DataFrame(
                headers=['IP', 'Conversation ID', 'Is Valid', 'Contains Query'],
                label="Results Table",
                interactive=False,
                value=[]  # Initialize with empty list
            )
        
        search_button.click(
            fn=process_search,
            inputs=[date_input, query_input],
            outputs=[table_output]
        )
    
    return app

if __name__ == "__main__":
    app = create_ui()
    app.launch()