# from flask import Flask, render_template, request, redirect, url_for, send_from_directory, session | |
# import json | |
# import random | |
# import os | |
# import string | |
# from flask_session import Session | |
# import logging | |
# | |
# # Set up logging | |
# logging.basicConfig(level=logging.INFO, | |
# format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
# handlers=[ | |
# logging.FileHandler("app.log"), | |
# logging.StreamHandler() | |
# ]) | |
# logger = logging.getLogger(__name__) | |
# | |
# app = Flask(__name__) | |
# app.config['SECRET_KEY'] = 'supersecretkey' # Change this to a random secret key | |
# app.config['SESSION_TYPE'] = 'filesystem' | |
# Session(app) | |
# | |
# # Directories for visualizations | |
# VISUALIZATION_DIRS_PLAN_OF_SQLS = { | |
# "TP": "visualizations/TP", | |
# "TN": "visualizations/TN", | |
# "FP": "visualizations/FP", | |
# "FN": "visualizations/FN" | |
# } | |
# | |
# VISUALIZATION_DIRS_CHAIN_OF_TABLE = { | |
# "TP": "htmls_COT/TP", | |
# "TN": "htmls_COT/TN", | |
# "FP": "htmls_COT/FP", | |
# "FN": "htmls_COT/FN" | |
# } | |
# | |
# | |
# # Load all sample files from the directories based on the selected method | |
# def load_samples(method): | |
# logger.info(f"Loading samples for method: {method}") | |
# if method == "Chain-of-Table": | |
# visualization_dirs = VISUALIZATION_DIRS_CHAIN_OF_TABLE | |
# else: | |
# visualization_dirs = VISUALIZATION_DIRS_PLAN_OF_SQLS | |
# | |
# samples = {"TP": [], "TN": [], "FP": [], "FN": []} | |
# for category, dir_path in visualization_dirs.items(): | |
# try: | |
# for filename in os.listdir(dir_path): | |
# if filename.endswith(".html"): | |
# samples[category].append(filename) | |
# logger.info(f"Loaded {len(samples[category])} samples for category {category}") | |
# except Exception as e: | |
# logger.exception(f"Error loading samples from {dir_path}: {e}") | |
# return samples | |
# | |
# | |
# # Randomly select balanced samples | |
# def select_balanced_samples(samples): | |
# try: | |
# tp_fp_samples = random.sample(samples["TP"] + samples["FP"], 5) | |
# tn_fn_samples = random.sample(samples["TN"] + samples["FN"], 5) | |
# logger.info(f"Selected balanced samples: {len(tp_fp_samples + tn_fn_samples)}") | |
# return tp_fp_samples + tn_fn_samples | |
# except Exception as e: | |
# logger.exception("Error selecting balanced samples") | |
# return [] | |
# | |
# | |
# def generate_random_string(length=8): | |
# return ''.join(random.choices(string.ascii_letters + string.digits, k=length)) | |
# | |
# | |
# @app.route('/', methods=['GET', 'POST']) | |
# def index(): | |
# logger.info("Rendering index page.") | |
# if request.method == 'POST': | |
# username = request.form.get('username') | |
# seed = request.form.get('seed') | |
# method = request.form.get('method') | |
# | |
# if not username or not seed or not method: | |
# logger.error("Missing username, seed, or method.") | |
# return "Missing username, seed, or method", 400 | |
# | |
# try: | |
# seed = int(seed) | |
# random.seed(seed) | |
# all_samples = load_samples(method) | |
# selected_samples = select_balanced_samples(all_samples) | |
# random_string = generate_random_string() | |
# filename = f'{username}_{seed}_{method}_{random_string}.json' | |
# | |
# logger.info(f"Generated filename: {filename}") | |
# | |
# session['selected_samples'] = selected_samples | |
# session['responses'] = [] # Initialize responses list | |
# session['method'] = method # Store the selected method | |
# | |
# return redirect(url_for('experiment', username=username, sample_index=0, seed=seed, filename=filename)) | |
# except Exception as e: | |
# logger.exception(f"Error in index route: {e}") | |
# return "An error occurred", 500 | |
# return render_template('index.html') | |
# | |
# | |
# @app.route('/experiment/<username>/<sample_index>/<seed>/<filename>', methods=['GET']) | |
# def experiment(username, sample_index, seed, filename): | |
# try: | |
# sample_index = int(sample_index) | |
# selected_samples = session.get('selected_samples', []) | |
# method = session.get('method') # Retrieve the selected method | |
# | |
# if sample_index >= len(selected_samples): | |
# return redirect(url_for('completed', filename=filename)) | |
# | |
# visualization_file = selected_samples[sample_index] | |
# visualization_path = None | |
# | |
# # Determine the correct visualization directory based on the method | |
# if method == "Chain-of-Table": | |
# visualization_dirs = VISUALIZATION_DIRS_CHAIN_OF_TABLE | |
# else: | |
# visualization_dirs = VISUALIZATION_DIRS_PLAN_OF_SQLS | |
# | |
# # Find the correct visualization path | |
# for category, dir_path in visualization_dirs.items(): | |
# if visualization_file in os.listdir(dir_path): | |
# visualization_path = f"{category}/{visualization_file}" | |
# break | |
# | |
# if not visualization_path: | |
# logger.error("Visualization file not found.") | |
# return "Visualization file not found", 404 | |
# | |
# statement = "Please make a decision to Accept/Reject the AI prediction based on the explanation." | |
# return render_template('experiment.html', | |
# sample_id=sample_index, | |
# statement=statement, | |
# visualization=visualization_path, | |
# username=username, | |
# seed=seed, | |
# sample_index=sample_index, | |
# filename=filename) | |
# except Exception as e: | |
# logger.exception(f"An error occurred in the experiment route: {e}") | |
# return "An error occurred", 500 | |
# | |
# | |
# @app.route('/visualizations/<path:path>') | |
# def send_visualization(path): | |
# try: | |
# method = session.get('method') | |
# if method == "Chain-of-Table": | |
# visualization_dir = 'htmls_COT' | |
# else: # Default to Plan-of-SQLs | |
# visualization_dir = 'visualizations' | |
# | |
# return send_from_directory(visualization_dir, path) | |
# except Exception as e: | |
# logger.exception(f"Error sending visualization: {e}") | |
# return "An error occurred", 500 | |
# | |
# | |
# @app.route('/feedback', methods=['POST']) | |
# def feedback(): | |
# try: | |
# sample_id = request.form['sample_id'] | |
# feedback = request.form['feedback'] | |
# username = request.form['username'] | |
# seed = request.form['seed'] | |
# sample_index = int(request.form['sample_index']) | |
# filename = request.form['filename'] | |
# | |
# selected_samples = session.get('selected_samples', []) | |
# responses = session.get('responses', []) | |
# | |
# responses.append({ | |
# 'sample_id': sample_id, | |
# 'feedback': feedback | |
# }) | |
# session['responses'] = responses | |
# | |
# result_dir = 'human_study' | |
# os.makedirs(result_dir, exist_ok=True) | |
# | |
# filepath = os.path.join(result_dir, filename) | |
# if os.path.exists(filepath): | |
# with open(filepath, 'r') as f: | |
# data = json.load(f) | |
# else: | |
# data = {} | |
# | |
# data[sample_index] = { | |
# 'Username': username, | |
# 'Seed': seed, | |
# 'Sample ID': sample_id, | |
# 'Task': f"Please make a decision to Accept/Reject the AI prediction based on the explanation.", | |
# 'User Feedback': feedback | |
# } | |
# | |
# with open(filepath, 'w') as f: | |
# json.dump(data, f, indent=4) | |
# | |
# logger.info(f"Feedback saved for sample {sample_id}") | |
# | |
# next_sample_index = sample_index + 1 | |
# if next_sample_index >= len(selected_samples): | |
# return redirect(url_for('completed', filename=filename)) | |
# | |
# return redirect( | |
# url_for('experiment', username=username, sample_index=next_sample_index, seed=seed, filename=filename)) | |
# except Exception as e: | |
# logger.exception(f"Error in feedback route: {e}") | |
# return "An error occurred", 500 | |
# | |
# | |
# @app.route('/completed/<filename>') | |
# def completed(filename): | |
# try: | |
# responses = session.get('responses', []) | |
# method = session.get('method') | |
# if method == "Chain-of-Table": | |
# json_file = 'Tabular_LLMs_human_study_vis_6_COT.json' | |
# else: # Default to Plan-of-SQLs | |
# json_file = 'Tabular_LLMs_human_study_vis_6.json' | |
# | |
# with open(json_file, 'r') as f: | |
# ground_truth = json.load(f) | |
# | |
# correct_responses = 0 | |
# accept_count = 0 | |
# reject_count = 0 | |
# | |
# for response in responses: | |
# sample_id = response['sample_id'] | |
# feedback = response['feedback'] | |
# index = sample_id.split('-')[1].split('.')[0] # Extract index from filename | |
# | |
# if feedback.upper() == "TRUE": | |
# accept_count += 1 | |
# elif feedback.upper() == "FALSE": | |
# reject_count += 1 | |
# | |
# if method == "Chain-of-Table": | |
# ground_truth_key = f"COT_test-{index}.html" | |
# else: | |
# ground_truth_key = f"POS_test-{index}.html" | |
# | |
# if ground_truth_key in ground_truth and ground_truth[ground_truth_key][ | |
# 'answer'].upper() == feedback.upper(): | |
# correct_responses += 1 | |
# else: | |
# logger.warning(f"Missing or mismatched key: {ground_truth_key}") | |
# | |
# accuracy = (correct_responses / len(responses)) * 100 if responses else 0 | |
# accuracy = round(accuracy, 2) | |
# | |
# accept_percentage = (accept_count / len(responses)) * 100 if len(responses) else 0 | |
# reject_percentage = (reject_count / len(responses)) * 100 if len(responses) else 0 | |
# | |
# accept_percentage = round(accept_percentage, 2) | |
# reject_percentage = round(reject_percentage, 2) | |
# | |
# return render_template('completed.html', | |
# accuracy=accuracy, | |
# accept_percentage=accept_percentage, | |
# reject_percentage=reject_percentage) | |
# except Exception as e: | |
# logger.exception(f"Error in completed route: {e}") | |
# return "An error occurred", 500 | |
# | |
# | |
# if __name__ == '__main__': | |
# try: | |
# app.run(debug=False, port=7860) | |
# except Exception as e: | |
# logger.exception(f"Failed to start app: {e}") | |
# | |
from flask import Flask | |
# Flask application instance; the module's `__name__` is passed as its import name.
app = Flask(__name__)
@app.route('/')
def index():
    """Serve the root URL of the application.

    The view is registered on ``/`` so that Flask actually dispatches
    requests to it; a bare function that is never registered is unreachable
    and requests to ``/`` would get a 404.

    Returns:
        str: The greeting string used as the response body.
    """
    return "Hello, world!"
if __name__ == '__main__':
    # Start the server only when this file is executed as a script, not when
    # it is imported as a module. Debug mode stays off; port is fixed at 7860.
    app.run(debug=False, port=7860)