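# NOTE: the following header was captured from the file-hosting web page and
# is not part of the program.
# Uploader: luulinh90s | Commit message: "update" | Commit: fafbcc3
# Page links: raw, history blame | File size: 10.6 kB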
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, session, safe_join
import json
import random
import os
import string
import logging
# Logging configuration: INFO and above, emitted through two handlers -- a
# file handler writing to "app.log" and a default stream handler -- using a
# "timestamp - logger name - level - message" layout.
_log_handlers = [logging.FileHandler("app.log"), logging.StreamHandler()]
logging.basicConfig(
    handlers=_log_handlers,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
# Flask application object; the route functions in this file register on it.
app = Flask(__name__)
# Secret key used by Flask for signing. Read it from the SECRET_KEY
# environment variable so the real value never lives in the source; when the
# variable is unset, fall back to a fresh random key for this process rather
# than a publicly known constant.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or os.urandom(32).hex()
# Category label -> directory holding that category's visualization files.
# One mapping per explanation method; load_samples() picks the Chain-of-Table
# mapping for method "Chain-of-Table" and the Plan-of-SQLs mapping otherwise.
VISUALIZATION_DIRS_PLAN_OF_SQLS = dict(
    TP="visualizations/TP",
    TN="visualizations/TN",
    FP="visualizations/FP",
    FN="visualizations/FN",
)
VISUALIZATION_DIRS_CHAIN_OF_TABLE = dict(
    TP="htmls_COT/TP",
    TN="htmls_COT/TN",
    FP="htmls_COT/FP",
    FN="htmls_COT/FN",
)
def save_session_data(username, data):
    """Persist a user's session state as JSON under ``session_data/``.

    Args:
        username: Identifier used to build the file name
            ``session_data/<username>_session.json``. The routes pass a value
            taken from the request, so it must not contain path separators.
        data: JSON-serializable session state.

    Raises:
        ValueError: If ``username`` is empty or contains a path separator or
            NUL character (which could place the file outside
            ``session_data/``).
    """
    # Validate before touching the filesystem: a name such as "../../app"
    # would otherwise let a request write outside session_data/.
    if not username or any(ch in username for ch in ('/', '\\', '\0')):
        raise ValueError(f"Invalid username for session storage: {username!r}")
    os.makedirs('session_data', exist_ok=True)
    with open(f'session_data/{username}_session.json', 'w') as f:
        json.dump(data, f)
def load_session_data(username):
    """Load a user's saved session state from ``session_data/``.

    Args:
        username: Identifier whose file
            ``session_data/<username>_session.json`` is read.

    Returns:
        The decoded JSON content, or ``None`` when no session file exists or
        when ``username`` is empty or contains a path separator or NUL
        character.
    """
    # The routes pass a username taken from the URL or form. Refuse values
    # that could point outside session_data/ and treat them as "no session".
    if not username or any(ch in username for ch in ('/', '\\', '\0')):
        return None
    try:
        with open(f'session_data/{username}_session.json', 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return None
def load_samples(method):
    """Collect the HTML sample file names available for an explanation method.

    Args:
        method: ``"Chain-of-Table"`` selects the Chain-of-Table directories;
            any other value selects the Plan-of-SQLs directories.

    Returns:
        A dict with the keys ``"TP"``, ``"TN"``, ``"FP"`` and ``"FN"``, each
        mapped to a sorted list of the ``.html`` file names found in that
        category's directory. A category whose directory cannot be listed is
        logged and left as an empty list.
    """
    logger.info(f"Loading samples for method: {method}")
    if method == "Chain-of-Table":
        visualization_dirs = VISUALIZATION_DIRS_CHAIN_OF_TABLE
    else:
        visualization_dirs = VISUALIZATION_DIRS_PLAN_OF_SQLS
    samples = {"TP": [], "TN": [], "FP": [], "FN": []}
    for category, dir_path in visualization_dirs.items():
        try:
            # os.listdir returns entries in arbitrary order. Sorting gives the
            # index route's seeded random sampling a stable population, so the
            # same seed selects the same files on every machine.
            samples[category] = sorted(
                name for name in os.listdir(dir_path) if name.endswith(".html")
            )
            logger.info(f"Loaded {len(samples[category])} samples for category {category}")
        except OSError as e:
            logger.exception(f"Error loading samples from {dir_path}: {e}")
    return samples
def select_balanced_samples(samples, per_group=5):
    """Randomly pick the same number of samples from each of two pools.

    ``per_group`` file names are drawn without replacement from the combined
    TP and FP lists, and another ``per_group`` from the combined TN and FN
    lists, using the module-level ``random`` generator.

    Args:
        samples: Mapping with ``"TP"``, ``"FP"``, ``"TN"`` and ``"FN"`` keys,
            each holding a list of file names.
        per_group: How many names to draw from each pool. Defaults to 5, the
            value that was previously hard-coded.

    Returns:
        The TP/FP picks followed by the TN/FN picks, or an empty list when
        selection fails (for example when a pool holds fewer than
        ``per_group`` names).
    """
    try:
        tp_fp_samples = random.sample(samples["TP"] + samples["FP"], per_group)
        tn_fn_samples = random.sample(samples["TN"] + samples["FN"], per_group)
    except (KeyError, TypeError, ValueError):
        # random.sample raises ValueError when the pool is too small; a
        # malformed mapping shows up as KeyError/TypeError. Callers treat an
        # empty result as "nothing selected".
        logger.exception("Error selecting balanced samples")
        return []
    selected = tp_fp_samples + tn_fn_samples
    logger.info(f"Selected balanced samples: {len(selected)}")
    return selected
def generate_random_string(length=8):
    """Return a random string of ``length`` ASCII letters and digits.

    The characters are drawn from the operating system's entropy source via
    ``random.SystemRandom`` instead of the module-level generator. The index
    route seeds the module-level generator with a user-supplied value shortly
    before calling this function, which made the suffix repeat for every
    session that used the same seed. Using a separate generator also leaves
    the seeded generator's state untouched.
    """
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.SystemRandom().choices(alphabet, k=length))
@app.route('/', methods=['GET', 'POST'])
def index():
    """Show the start form (GET) or create a new study session (POST).

    A POST must carry the form fields ``username``, ``seed`` and ``method``.
    The seed initializes the module-level ``random`` generator, samples are
    loaded and selected for the method, the session state is saved to disk,
    and the client is redirected to the experiment page.

    Returns:
        The rendered ``index.html`` for a GET. For a POST: a redirect to the
        ``experiment`` route on success, a 400 response when a field is
        missing or the seed is not an integer, or a 500 response when no
        samples could be selected or an unexpected error occurs.
    """
    logger.info("Rendering index page.")
    if request.method != 'POST':
        return render_template('index.html')

    username = request.form.get('username')
    seed = request.form.get('seed')
    method = request.form.get('method')
    if not username or not seed or not method:
        logger.error("Missing username, seed, or method.")
        return "Missing username, seed, or method", 400
    try:
        seed = int(seed)
    except ValueError:
        # A non-numeric seed is a client mistake, not a server failure.
        logger.error(f"Invalid seed: {seed!r}")
        return "Seed must be an integer", 400
    try:
        random.seed(seed)
        all_samples = load_samples(method)
        selected_samples = select_balanced_samples(all_samples)
        logger.info(f"Number of selected samples: {len(selected_samples)}")
        if not selected_samples:
            logger.error("No samples were selected.")
            return "No samples were selected", 500
        filename = f'{username}_{seed}_{method}_{generate_random_string()}.json'
        logger.info(f"Generated filename: {filename}")
        # Save session data
        session_data = {
            'responses': [],
            'username': username,
            'selected_samples': selected_samples,
            'method': method,
            'filename': filename,
            'current_index': 0,
        }
        save_session_data(username, session_data)
        logger.info(f"Session data saved for user: {username}")
        return redirect(url_for('experiment', username=username))
    except Exception as e:
        # Top-level boundary for the request: log the traceback and answer
        # with a generic error instead of leaking details.
        logger.exception(f"Error in index route: {e}")
        return "An error occurred", 500
@app.route('/experiment/<username>', methods=['GET', 'POST'])
def experiment(username):
    """Render the page for the current sample of a user's session.

    Args:
        username: Session owner, taken from the URL.

    Returns:
        A redirect to ``index`` when no session is stored for the user, a
        redirect to ``completed`` when every selected sample has been
        answered, a 404 response when the current sample's file is not found
        in any category directory, the rendered ``experiment.html`` otherwise,
        or a 500 response on an unexpected error.
    """
    try:
        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))
        selected_samples = session_data['selected_samples']
        method = session_data['method']
        current_index = session_data['current_index']
        if current_index >= len(selected_samples):
            return redirect(url_for('completed', username=username))
        visualization_file = selected_samples[current_index]
        if method == "Chain-of-Table":
            visualization_dirs = VISUALIZATION_DIRS_CHAIN_OF_TABLE
        else:
            visualization_dirs = VISUALIZATION_DIRS_PLAN_OF_SQLS
        # Locate the category directory that holds the file. Checking the one
        # path directly (instead of listing each directory) means a missing
        # category directory is simply skipped rather than failing the whole
        # request, matching how load_samples tolerates unreadable directories.
        visualization_path = None
        for dir_path in visualization_dirs.values():
            if os.path.isfile(os.path.join(dir_path, visualization_file)):
                # Joined with "/" because the value becomes part of a URL.
                visualization_path = f"{dir_path}/{visualization_file}"
                break
        if visualization_path is None:
            logger.error(f"Visualization file {visualization_file} not found.")
            return "Visualization file not found", 404
        logger.info(f"Rendering experiment page with visualization: {visualization_path}")
        statement = "Please make a decision to Accept/Reject the AI prediction based on the explanation."
        return render_template('experiment.html',
                               sample_id=current_index,
                               statement=statement,
                               visualization=url_for('send_visualization', filename=visualization_path),
                               username=username)
    except Exception as e:
        logger.exception(f"An error occurred in the experiment route: {e}")
        return "An error occurred", 500
@app.route('/feedback', methods=['POST'])
def feedback():
    """Record the submitted decision for the current sample and advance.

    Reads the ``username`` and ``feedback`` form fields, appends the decision
    to the stored session under the current sample index, increments the
    index, and saves the session.

    Returns:
        A 400 response when a form field is missing, a redirect to ``index``
        when no session is stored for the user, a redirect to ``completed``
        once every selected sample has a response, a redirect to
        ``experiment`` otherwise, or a 500 response on an unexpected error.
    """
    try:
        username = request.form.get('username')
        decision = request.form.get('feedback')
        if not username or decision is None:
            # Missing fields are a client error; report them as such rather
            # than letting the lookup failure surface as a 500.
            logger.error("Missing username or feedback.")
            return "Missing username or feedback", 400
        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))
        total_samples = len(session_data['selected_samples'])
        # A repeated submission after the last sample must not add a response
        # whose sample_id has no matching entry in selected_samples.
        if session_data['current_index'] >= total_samples:
            return redirect(url_for('completed', username=username))
        # Store the feedback
        session_data['responses'].append({
            'sample_id': session_data['current_index'],
            'feedback': decision,
        })
        # Move to the next sample
        session_data['current_index'] += 1
        # Save updated session data
        save_session_data(username, session_data)
        if session_data['current_index'] >= total_samples:
            return redirect(url_for('completed', username=username))
        return redirect(url_for('experiment', username=username))
    except Exception as e:
        logger.exception(f"Error in feedback route: {e}")
        return "An error occurred", 500
@app.route('/completed/<username>')
def completed(username):
    """Score a user's responses against the ground truth and show a summary.

    The ground-truth JSON file is chosen by the session's method. For each
    stored response, the sample's file name yields an index (the text between
    the first ``-`` and the following ``.``) that is used to build the
    ground-truth key. A response is correct when its feedback equals the
    entry's ``answer``, compared case-insensitively.

    Args:
        username: Session owner, taken from the URL.

    Returns:
        A redirect to ``index`` when no session is stored for the user, the
        rendered ``completed.html`` with accuracy and accept/reject
        percentages (rounded to two decimals), or a 500 response on an
        unexpected error.
    """
    try:
        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))
        responses = session_data['responses']
        method = session_data['method']
        selected_samples = session_data['selected_samples']
        if method == "Chain-of-Table":
            json_file = 'Tabular_LLMs_human_study_vis_6_COT.json'
            key_prefix = "COT_test"
        else:  # Default to Plan-of-SQLs
            json_file = 'Tabular_LLMs_human_study_vis_6.json'
            key_prefix = "POS_test"
        with open(json_file, 'r') as f:
            ground_truth = json.load(f)
        correct_responses = 0
        accept_count = 0
        reject_count = 0
        scored = 0  # responses that could be matched to a selected sample
        for response in responses:
            sample_id = response['sample_id']
            answer = response['feedback'].upper()
            try:
                visualization_file = selected_samples[sample_id]
                # Extract index from filename
                index = visualization_file.split('-')[1].split('.')[0]
            except (IndexError, TypeError):
                # A stale or malformed entry should not take down the whole
                # summary page; skip it and keep it out of the totals.
                logger.warning(f"Skipping response with unusable sample_id: {sample_id}")
                continue
            scored += 1
            if answer == "TRUE":
                accept_count += 1
            elif answer == "FALSE":
                reject_count += 1
            ground_truth_key = f"{key_prefix}-{index}.html"
            if ground_truth_key not in ground_truth:
                # Warn only when the key is really absent; an ordinary wrong
                # answer is not a data problem.
                logger.warning(f"Missing or mismatched key: {ground_truth_key}")
            elif ground_truth[ground_truth_key]['answer'].upper() == answer:
                correct_responses += 1
        accuracy = round((correct_responses / scored) * 100 if scored else 0, 2)
        accept_percentage = round((accept_count / scored) * 100 if scored else 0, 2)
        reject_percentage = round((reject_count / scored) * 100 if scored else 0, 2)
        return render_template('completed.html',
                               accuracy=accuracy,
                               accept_percentage=accept_percentage,
                               reject_percentage=reject_percentage)
    except Exception as e:
        logger.exception(f"An error occurred in the completed route: {e}")
        return "An error occurred", 500
@app.route('/visualizations/<path:filename>')
def send_visualization(filename):
    """Serve a file located inside one of the visualization directories.

    Args:
        filename: Path taken from the URL, relative to the working directory
            (for example ``visualizations/TP/<name>.html``, as produced by the
            experiment route).

    Returns:
        The file response from ``send_from_directory``, or a 404 response when
        the path is not inside a configured visualization directory or cannot
        be joined safely.
    """
    logger.info(f"Attempting to serve file: {filename}")
    # Only the configured visualization directories may be served. Joining
    # against the working directory alone would also expose app.log, the
    # saved session files and the ground-truth JSON files that live there.
    requested = os.path.normpath(filename)
    allowed_dirs = [
        *VISUALIZATION_DIRS_PLAN_OF_SQLS.values(),
        *VISUALIZATION_DIRS_CHAIN_OF_TABLE.values(),
    ]
    if not any(requested.startswith(os.path.normpath(allowed) + os.sep)
               for allowed in allowed_dirs):
        logger.error(f"Refusing to serve file outside visualization directories: {filename}")
        return "Visualization file not found", 404
    # Ensure the path is safe and doesn't escape the working directory
    safe_path = safe_join(os.getcwd(), filename)
    if safe_path is None:
        # Some safe_join implementations signal an unsafe path by returning
        # None instead of raising.
        logger.error(f"Unsafe visualization path: {filename}")
        return "Visualization file not found", 404
    directory = os.path.dirname(safe_path)
    file_name = os.path.basename(safe_path)
    logger.info(f"Serving file from directory: {directory}, filename: {file_name}")
    return send_from_directory(directory, file_name)
if __name__ == "__main__":
    os.makedirs('session_data', exist_ok=True)  # Ensure the directory for session files exists
    # The server listens on every interface (0.0.0.0), so Flask's debug mode,
    # which enables the interactive debugger, must not be on by default.
    # Opt in explicitly with FLASK_DEBUG=1 (or "true"/"yes") for local work.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)