"""Flask app for a pairwise-preference user study of Table-QA explanations.

A participant enters a username, a seed and two explanation methods. The app
picks a seeded sample of visualizations that exist for both methods, shows
them side by side, records which one the participant prefers, and stores the
session as JSON in a Hugging Face Space.
"""
import json
import logging
import os
import random
import string
from datetime import datetime
from typing import Any, Dict, List, Optional

from flask import Flask, render_template, request, redirect, url_for, send_from_directory, session
from huggingface_hub import HfApi, hf_hub_download, login

# Set up logging BEFORE anything that logs: the HF_TOKEN check below uses
# `logger`, which previously was not yet defined at that point.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    handlers=[
                        logging.FileHandler("app.log"),
                        logging.StreamHandler()
                    ])
logger = logging.getLogger(__name__)

# Use the Hugging Face token from environment variables
hf_token = os.environ.get("HF_TOKEN")
if hf_token:
    login(token=hf_token)
else:
    logger.error("HF_TOKEN not found in environment variables")

app = Flask(__name__)
# Prefer a secret supplied by the environment; the literal is only a fallback
# so existing deployments keep working.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'supersecretkey')

# Directories for visualizations
VISUALIZATION_DIRS_PLAN_OF_SQLS = {
    "TP": "htmls_POS/TP",
    "TN": "htmls_POS/TN",
    "FP": "htmls_POS/FP",
    "FN": "htmls_POS/FN"
}

VISUALIZATION_DIRS_CHAIN_OF_TABLE = {
    "TP": "htmls_COT/TP",
    "TN": "htmls_COT/TN",
    "FP": "htmls_COT/FP",
    "FN": "htmls_COT/FN"
}

VISUALIZATION_DIRS_NO_XAI = {
    "TP": "htmls_NO_XAI/TP",
    "TN": "htmls_NO_XAI/TN",
    "FP": "htmls_NO_XAI/FP",
    "FN": "htmls_NO_XAI/FN"
}

VISUALIZATION_DIRS_DATER = {
    "TP": "htmls_DATER/TP",
    "TN": "htmls_DATER/TN",
    "FP": "htmls_DATER/FP",
    "FN": "htmls_DATER/FN"
}

# Where session files are stored on the Hugging Face Hub.
_DATA_REPO_ID = "luulinh90s/Tabular-LLM-Study-Data"
_DATA_REPO_TYPE = "space"
_SESSION_DIR_IN_REPO = "session_data_pref"

# Non-alphanumeric characters that are kept when building a session file name.
_SAFE_FILENAME_CHARS = frozenset('_-.')

# Method name (as submitted by the form) -> suffix of its `htmls_*` directory.
_METHOD_DIR_CODES = {
    'No-XAI': 'NO_XAI',
    'Dater': 'DATER',
    'Chain-of-Table': 'COT',
    'Plan-of-SQLs': 'POS',
}


def _sanitize_for_filename(text: str) -> str:
    """Drop every character that is not alphanumeric, '_', '-' or '.'."""
    return "".join(c for c in text if c.isalnum() or c in _SAFE_FILENAME_CHARS)


def _session_timestamp(repo_path: str) -> str:
    """Return the start-time component of a session file path.

    Session files are named ``<user>_seed<seed>_<start_time>_session.json``;
    the sanitized ISO start time contains no underscore, so it is always the
    second-to-last ``_``-separated field. Returns '' if the name does not
    have that shape.
    """
    parts = repo_path.rsplit('_', 2)
    return parts[-2] if len(parts) == 3 else ''


def save_session_data(username, data):
    """Upload ``data`` as JSON to the session directory of the data Space.

    The file name is derived from the username, ``data['seed']`` and
    ``data['start_time']``, so repeated saves of one session overwrite the
    same file. Errors are logged and swallowed (best effort).

    Args:
        username: Participant name; unsafe characters are stripped for the
            file name.
        data: JSON-serializable session dictionary.
    """
    try:
        # Extract seed and start_time from the data
        seed = data.get('seed', 'unknown')
        start_time = data.get('start_time', datetime.now().isoformat())

        # Create a filename with username, seed, and start_time, removing any
        # characters that might not be safe for filenames
        file_name = _sanitize_for_filename(f'{username}_seed{seed}_{start_time}_session.json')

        # Convert data to JSON string
        json_data = json.dumps(data, indent=4)

        # Upload the bytes directly (overwriting if the file exists). This
        # avoids a predictable temp file in /tmp that would be left behind
        # whenever the upload failed.
        api = HfApi()
        api.upload_file(
            path_or_fileobj=json_data.encode('utf-8'),
            path_in_repo=f"{_SESSION_DIR_IN_REPO}/{file_name}",
            repo_id=_DATA_REPO_ID,
            repo_type=_DATA_REPO_TYPE,
        )

        logger.info(f"Session data saved for user {username} with seed {seed} and start time {start_time} in Hugging Face Data Space")
    except Exception as e:
        logger.exception(f"Error saving session data for user {username}: {e}")


def load_session_data(username):
    """Download and parse the most recent session file of ``username``.

    Args:
        username: Participant name as entered in the form.

    Returns:
        The session dictionary, or ``None`` if no file exists for the user
        or an error occurred (errors are logged).
    """
    try:
        # List all files in the repo
        api = HfApi()
        files = api.list_repo_files(repo_id=_DATA_REPO_ID, repo_type=_DATA_REPO_TYPE)

        # Filter files for the user. The name must be sanitized exactly like
        # in save_session_data, otherwise names with stripped characters are
        # never found. Matching up to "_seed" keeps "bob" from picking up the
        # files of "bob_smith".
        prefix = f'{_SESSION_DIR_IN_REPO}/{_sanitize_for_filename(username)}_seed'
        user_files = [f for f in files if f.startswith(prefix)]

        if not user_files:
            logger.warning(f"No session data found for user {username}")
            return None

        # Get the most recent file: order by the start-time field, not by the
        # whole name (which would order by seed first).
        latest_file = max(user_files, key=lambda f: (_session_timestamp(f), f))

        # Download the file from the data storage Space
        file_path = hf_hub_download(repo_id=_DATA_REPO_ID, repo_type=_DATA_REPO_TYPE, filename=latest_file)

        with open(file_path, 'r') as f:
            data = json.load(f)

        logger.info(f"Session data loaded for user {username} from Hugging Face Data Space")
        return data
    except Exception as e:
        logger.exception(f"Error loading session data for user {username}: {e}")
        return None


def load_samples(methods):
    """Collect the samples that exist for both selected methods.

    Args:
        methods: Sequence of two method names known to ``get_method_dir``.

    Returns:
        A list of ``{'category': ..., 'file': ...}`` dictionaries, one per
        file name present in the same category directory of both methods,
        sorted by (category, file).
    """
    logger.info(f"Loading samples for methods: {methods}")
    samples = set()  # Use a set to avoid duplicates
    categories = ["TP", "TN", "FP", "FN"]
    method_dirs = [get_method_dir(method) for method in methods]

    for category in categories:
        dir_a = f'htmls_{method_dirs[0].upper()}/{category}'
        dir_b = f'htmls_{method_dirs[1].upper()}/{category}'

        matching_files = set(os.listdir(dir_a)) & set(os.listdir(dir_b))
        samples.update((category, file) for file in matching_files)

    # Convert set of tuples back to list of dictionaries (sorted so the
    # result does not depend on set iteration order)
    samples = [{'category': category, 'file': file} for category, file in sorted(samples)]

    logger.info(f"Loaded {len(samples)} unique samples across all categories")
    return samples


def select_balanced_samples(samples, num_samples: int = 10):
    """Randomly pick up to ``num_samples`` unique samples.

    Uses the module-level ``random`` generator, so the caller controls
    reproducibility via ``random.seed``.

    Args:
        samples: List of ``{'category': ..., 'file': ...}`` dictionaries.
        num_samples: Number of samples to draw (default 10).

    Returns:
        The selected samples as dictionaries; all unique samples if fewer
        than ``num_samples`` are available; ``[]`` on error.
    """
    try:
        # Sort the de-duplicated samples: iterating a set of strings depends
        # on hash randomization, which would make a fixed seed select
        # different samples in different processes.
        unique_samples = sorted({(s['category'], s['file']) for s in samples})

        if len(unique_samples) < num_samples:
            logger.warning(f"Not enough unique samples. Only {len(unique_samples)} available.")
            selected_samples = unique_samples
        else:
            selected_samples = random.sample(unique_samples, num_samples)

        logger.info(f"Unique sampled samples:\n{selected_samples}")

        # Convert back to dictionary format
        selected_samples = [{'category': category, 'file': file} for category, file in selected_samples]

        logger.info(f"Selected {len(selected_samples)} unique samples")
        return selected_samples
    except Exception as e:
        logger.exception("Error selecting balanced samples")
        return []


def generate_random_string(length=8):
    """Return ``length`` random ASCII letters/digits (not cryptographically secure)."""
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))


@app.route('/', methods=['GET', 'POST'])
def index():
    """Show the start form (GET) or create a new study session (POST).

    The POST form must provide ``username``, an integer ``seed`` and
    ``method`` as two comma-separated method names. Invalid input yields a
    400 response; on success the session is saved and the participant is
    redirected to the experiment page.
    """
    logger.info("Rendering index page.")
    if request.method != 'POST':
        return render_template('index.html')

    username = request.form.get('username')
    seed = request.form.get('seed')
    # Default to '' so a missing field is reported as 400 instead of raising
    # AttributeError on None.
    methods = request.form.get('method', '').split(',')

    if not username or not seed or len(methods) != 2:
        logger.error("Missing username, seed, or incorrect number of methods.")
        return "Please fill in all fields and select exactly two methods.", 400

    unknown_methods = [m for m in methods if get_method_dir(m) is None]
    if unknown_methods:
        logger.error(f"Unknown methods requested: {unknown_methods}")
        return "Please fill in all fields and select exactly two methods.", 400

    try:
        seed_int = int(seed)  # Convert to int for random.seed()
    except ValueError:
        logger.error(f"Seed is not an integer: {seed!r}")
        return "Seed must be an integer.", 400

    try:
        random.seed(seed_int)
        all_samples = load_samples(methods)
        selected_samples = select_balanced_samples(all_samples)
        logger.info(f"Number of selected samples: {len(selected_samples)}")

        if len(selected_samples) == 0:
            logger.error("No samples were selected.")
            return "No samples were selected", 500

        start_time = datetime.now().isoformat()
        session_data = {
            'username': username,
            'seed': seed,  # Store as string
            'methods': methods,
            'selected_samples': selected_samples,
            'current_index': 0,
            'responses': [],
            'start_time': start_time
        }
        save_session_data(username, session_data)
        logger.info(f"Session data initialized for user: {username}")

        return redirect(url_for('experiment', username=username))
    except Exception as e:
        logger.exception(f"Error in index route: {e}")
        return "An error occurred", 500


@app.route('/experiment/<username>', methods=['GET', 'POST'])
def experiment(username):
    """Render the current comparison for ``username``.

    Redirects to the index page if no session exists and to the completion
    page once every selected sample has been answered. A sample whose file is
    missing for either method is skipped.
    """
    try:
        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))

        logger.info(f"Session data:\n{session_data}")

        selected_samples = session_data['selected_samples']
        methods = session_data['methods']
        current_index = session_data['current_index']

        logger.info(f"current_index:\n{current_index}")

        if current_index >= len(selected_samples):
            return redirect(url_for('completed', username=username))

        sample = selected_samples[current_index]
        logger.info(f"sample:\n{sample}")

        method_a, method_b = methods

        # Locate the same sample file in the directory of each method
        category = sample['category']
        dir_a = f'htmls_{get_method_dir(method_a).upper()}/{category}'
        dir_b = f'htmls_{get_method_dir(method_b).upper()}/{category}'

        file_a = os.path.join(dir_a, sample['file'])
        file_b = os.path.join(dir_b, sample['file'])

        logger.info(f"file_a:\n{file_a}")
        logger.info(f"file_b:\n{file_b}")

        # The paths themselves are never empty, so check that the files are
        # actually present on disk before showing the comparison.
        if not os.path.isfile(file_a) or not os.path.isfile(file_b):
            logger.error(f"Missing files for comparison at index {current_index}")
            session_data['current_index'] += 1
            save_session_data(username, session_data)
            return redirect(url_for('experiment', username=username))

        visualization_a = url_for('send_visualization', filename=file_a)
        visualization_b = url_for('send_visualization', filename=file_b)

        statement = """
Please note that in select row function, starting index is 0 for Chain-of-Table 1 for Dater and Index * represents the selection of the whole Table.
You are now given two explanations that describe the reasoning process of the Table QA model.
Please analyze the explanations and determine which one provides a clearer and more accurate reasoning process.
"""

        return render_template('experiment.html',
                               sample_id=current_index,
                               statement=statement,
                               visualization_a=visualization_a,
                               visualization_b=visualization_b,
                               method_a=method_a,
                               method_b=method_b,
                               username=username)
    except Exception as e:
        logger.exception(f"An error occurred in the experiment route: {e}")
        return "An error occurred", 500


def get_method_dir(method):
    """Return the directory code for ``method`` ('COT', 'POS', ...), or ``None`` if unknown."""
    return _METHOD_DIR_CODES.get(method)


def get_visualization_dir(method):
    """Return the top-level ``htmls_*`` directory for ``method``.

    Any name other than "No-XAI", "Dater" or "Chain-of-Table" maps to the
    Plan-of-SQLs directory.
    """
    code = _METHOD_DIR_CODES.get(method, 'POS')
    return f'htmls_{code}'


@app.route('/feedback', methods=['POST'])
def feedback():
    """Record the participant's preference for the current sample and advance.

    Expects form fields ``username`` and ``feedback`` (the preferred method);
    responds with 400 if either is missing.
    """
    try:
        username = request.form.get('username')
        # Named `preferred` so the local does not shadow this view function.
        preferred = request.form.get('feedback')
        if not username or not preferred:
            logger.error("Feedback request is missing username or feedback.")
            return "Missing username or feedback.", 400

        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))

        # Store the feedback
        session_data['responses'].append({
            'sample_id': session_data['current_index'],
            'preferred_method': preferred,
            'timestamp': datetime.now().isoformat()
        })

        # Move to the next sample
        session_data['current_index'] += 1

        # Save updated session data
        save_session_data(username, session_data)
        logger.info(f"Feedback saved for user {username}, sample {session_data['current_index'] - 1}")

        if session_data['current_index'] >= len(session_data['selected_samples']):
            return redirect(url_for('completed', username=username))

        return redirect(url_for('experiment', username=username))
    except Exception as e:
        logger.exception(f"Error in feedback route: {e}")
        return "An error occurred", 500


@app.route('/completed/<username>')
def completed(username):
    """Finalize the session and show each method's share of preferences.

    Stores ``end_time`` and the per-method percentages (rounded to two
    decimals) in the session before rendering the summary page.
    """
    try:
        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))

        session_data['end_time'] = datetime.now().isoformat()
        methods = session_data['methods']
        responses = session_data['responses']

        preferences = {method: 0 for method in methods}
        for response in responses:
            preferences[response['preferred_method']] += 1

        total_responses = len(responses)
        for method, count in preferences.items():
            # With no responses every share is 0 instead of dividing by zero.
            share = (count / total_responses) * 100 if total_responses else 0.0
            preferences[method] = round(share, 2)

        session_data['preferences'] = preferences

        # Save the final updated session data
        save_session_data(username, session_data)

        return render_template('completed.html', preferences=preferences)
    except Exception as e:
        logger.exception(f"An error occurred in the completed route: {e}")
        return "An error occurred", 500


@app.route('/visualizations/<path:filename>')
def send_visualization(filename):
    """Serve a visualization file addressed relative to the working directory.

    Returns 403 if the resolved path lies outside the working directory and
    404 if it is not an existing file.
    """
    logger.info(f"Attempting to serve file: {filename}")
    # Ensure the path is safe and doesn't allow access to files outside the
    # intended directory. A plain startswith() check would also accept sibling
    # directories sharing the prefix (e.g. /app_evil for /app), so compare
    # whole path components instead.
    # NOTE(review): everything under the working directory (including
    # app.log) is servable; consider restricting this to the htmls_* folders.
    base_dir = os.path.abspath(os.getcwd())
    file_path = os.path.normpath(os.path.join(base_dir, filename))
    try:
        inside_base = os.path.commonpath([base_dir, file_path]) == base_dir
    except ValueError:
        # Raised e.g. for paths on different drives.
        inside_base = False
    if not inside_base:
        return "Access denied", 403

    if not os.path.isfile(file_path):
        return "File not found", 404

    directory = os.path.dirname(file_path)
    file_name = os.path.basename(file_path)
    logger.info(f"Serving file from directory: {directory}, filename: {file_name}")
    return send_from_directory(directory, file_name)


if __name__ == "__main__":
    os.makedirs('session_data', exist_ok=True)  # Ensure the directory for session files exists
    # The Werkzeug debugger allows arbitrary code execution, so it must not be
    # enabled unconditionally on a server bound to all interfaces. Opt in with
    # FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true")
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)