luulinh90s's picture
update
782e7af
raw
history blame
14.6 kB
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, session
import json
import random
import os
import string
import logging
from datetime import datetime
import os
from huggingface_hub import login
# Use the Hugging Face token from environment variables
hf_token = os.environ.get("HF_TOKEN")
if hf_token:
login(token=hf_token)
else:
logger.error("HF_TOKEN not found in environment variables")
# Set up logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("app.log"),
logging.StreamHandler()
])
logger = logging.getLogger(__name__)
app = Flask(__name__)
app.config['SECRET_KEY'] = 'supersecretkey' # Change this to a random secret key
# Directories for visualizations
VISUALIZATION_DIRS_PLAN_OF_SQLS = {
"TP": "htmls_POS/TP",
"TN": "htmls_POS/TN",
"FP": "htmls_POS/FP",
"FN": "htmls_POS/FN"
}
VISUALIZATION_DIRS_CHAIN_OF_TABLE = {
"TP": "htmls_COT/TP",
"TN": "htmls_COT/TN",
"FP": "htmls_COT/FP",
"FN": "htmls_COT/FN"
}
VISUALIZATION_DIRS_NO_XAI = {
"TP": "htmls_NO_XAI/TP",
"TN": "htmls_NO_XAI/TN",
"FP": "htmls_NO_XAI/FP",
"FN": "htmls_NO_XAI/FN"
}
VISUALIZATION_DIRS_DATER = {
"TP": "htmls_DATER/TP",
"TN": "htmls_DATER/TN",
"FP": "htmls_DATER/FP",
"FN": "htmls_DATER/FN"
}
import json
import os
from datetime import datetime
from huggingface_hub import HfApi
def save_session_data(username, data):
try:
# Extract seed and start_time from the data
seed = data.get('seed', 'unknown')
start_time = data.get('start_time', datetime.now().isoformat())
# Create a filename with username, seed, and start_time
file_name = f'{username}_seed{seed}_{start_time}_session.json'
# Remove any characters that might not be safe for filenames
file_name = "".join(c for c in file_name if c.isalnum() or c in ['_', '-', '.'])
# Convert data to JSON string
json_data = json.dumps(data, indent=4)
# Create a temporary file
temp_file_path = f"/tmp/{file_name}"
with open(temp_file_path, 'w') as f:
f.write(json_data)
# Upload the file to the Hugging Face Space, overwriting if it exists
api = HfApi()
api.upload_file(
path_or_fileobj=temp_file_path,
path_in_repo=f"session_data_pref/{file_name}",
repo_id="luulinh90s/Tabular-LLM-Study-Data",
repo_type="space",
)
# Remove the temporary file
os.remove(temp_file_path)
logger.info(f"Session data saved for user {username} with seed {seed} and start time {start_time} in Hugging Face Data Space")
except Exception as e:
logger.exception(f"Error saving session data for user {username}: {e}")
from huggingface_hub import hf_hub_download, HfApi
def load_session_data(username):
try:
# List all files in the repo
api = HfApi()
files = api.list_repo_files(repo_id="luulinh90s/Tabular-LLM-Study-Data", repo_type="space")
# Filter files for the user
user_files = [f for f in files if f.startswith(f'session_data_pref/{username}_')]
if not user_files:
logger.warning(f"No session data found for user {username}")
return None
# Get the most recent file
latest_file = sorted(user_files, reverse=True)[0]
# Download the file from the data storage Space
file_path = hf_hub_download(repo_id="luulinh90s/Tabular-LLM-Study-Data", repo_type="space",
filename=latest_file)
with open(file_path, 'r') as f:
data = json.load(f)
logger.info(f"Session data loaded for user {username} from Hugging Face Data Space")
return data
except Exception as e:
logger.exception(f"Error loading session data for user {username}: {e}")
return None
def load_samples(methods):
logger.info(f"Loading samples for methods: {methods}")
samples = set() # Use a set to avoid duplicates
categories = ["TP", "TN", "FP", "FN"]
method_dirs = [get_method_dir(method) for method in methods]
for category in categories:
dir_a = f'htmls_{method_dirs[0].upper()}/{category}'
dir_b = f'htmls_{method_dirs[1].upper()}/{category}'
files_a = set(os.listdir(dir_a))
files_b = set(os.listdir(dir_b))
matching_files = files_a & files_b
for file in matching_files:
samples.add((category, file))
# Convert set of tuples back to list of dictionaries
samples = [{'category': category, 'file': file} for category, file in samples]
logger.info(f"Loaded {len(samples)} unique samples across all categories")
return samples
def select_balanced_samples(samples):
try:
# Ensure we have at least 10 unique samples
unique_samples = list({(s['category'], s['file']) for s in samples})
if len(unique_samples) < 10:
logger.warning(f"Not enough unique samples. Only {len(unique_samples)} available.")
selected_samples = unique_samples
else:
selected_samples = random.sample(unique_samples, 10)
logger.info(f"Unique sampled samples:\n{selected_samples}")
# Convert back to dictionary format
selected_samples = [{'category': category, 'file': file} for category, file in selected_samples]
logger.info(f"Selected {len(selected_samples)} unique samples")
return selected_samples
except Exception as e:
logger.exception("Error selecting balanced samples")
return []
def generate_random_string(length=8):
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
@app.route('/', methods=['GET', 'POST'])
def index():
logger.info("Rendering index page.")
if request.method == 'POST':
username = request.form.get('username')
seed = request.form.get('seed')
methods = request.form.get('method').split(',')
if not username or not seed or len(methods) != 2:
logger.error("Missing username, seed, or incorrect number of methods.")
return "Please fill in all fields and select exactly two methods.", 400
try:
seed_int = int(seed) # Convert to int for random.seed()
random.seed(seed_int)
all_samples = load_samples(methods)
selected_samples = select_balanced_samples(all_samples)
logger.info(f"Number of selected samples: {len(selected_samples)}")
if len(selected_samples) == 0:
logger.error("No samples were selected.")
return "No samples were selected", 500
start_time = datetime.now().isoformat()
session_data = {
'username': username,
'seed': seed, # Store as string
'methods': methods,
'selected_samples': selected_samples,
'current_index': 0,
'responses': [],
'start_time': start_time
}
save_session_data(username, session_data)
logger.info(f"Session data initialized for user: {username}")
return redirect(url_for('experiment', username=username))
except Exception as e:
logger.exception(f"Error in index route: {e}")
return "An error occurred", 500
return render_template('index.html')
@app.route('/experiment/<username>', methods=['GET', 'POST'])
def experiment(username):
try:
session_data = load_session_data(username)
if not session_data:
logger.error(f"No session data found for user: {username}")
return redirect(url_for('index'))
logger.info(f"Session data:\n{session_data}")
selected_samples = session_data['selected_samples']
methods = session_data['methods']
current_index = session_data['current_index']
logger.info(f"current_index:\n{current_index}")
if current_index >= len(selected_samples):
return redirect(url_for('completed', username=username))
sample = selected_samples[current_index]
logger.info(f"sample:\n{sample}")
method_a, method_b = methods
# Find matching files for both methods
method_a_dir = get_method_dir(method_a)
method_b_dir = get_method_dir(method_b)
# for category in ['TP', 'TN', 'FP', 'FN']:
category = sample['category']
dir_a = f'htmls_{method_a_dir.upper()}/{category}'
dir_b = f'htmls_{method_b_dir.upper()}/{category}'
file_a = os.path.join(dir_a, sample['file'])
file_b = os.path.join(dir_b, sample['file'])
logger.info(f"file_a:\n{file_a}")
logger.info(f"file_b:\n{file_a}")
# files_a = os.listdir(dir_a)
# files_b = os.listdir(dir_b)
#
# matching_files = set(files_a) & set(files_b)
# if matching_files:
# file_a = os.path.join(dir_a, next(iter(matching_files)))
# file_b = os.path.join(dir_b, next(iter(matching_files)))
# break
if not file_a or not file_b:
logger.error(f"Missing files for comparison at index {current_index}")
session_data['current_index'] += 1
save_session_data(username, session_data)
return redirect(url_for('experiment', username=username))
visualization_a = url_for('send_visualization', filename=file_a)
visualization_b = url_for('send_visualization', filename=file_b)
statement = """
Please note that in select row function, starting index is 0 for Chain-of-Table 1 for Dater and Index * represents the selection of the whole Table.
You are now given two explanations that describe the reasoning process of the Table QA model.
Please analyze the explanations and determine which one provides a clearer and more accurate reasoning process.
"""
return render_template('experiment.html',
sample_id=current_index,
statement=statement,
visualization_a=visualization_a,
visualization_b=visualization_b,
method_a=method_a,
method_b=method_b,
username=username)
except Exception as e:
logger.exception(f"An error occurred in the experiment route: {e}")
return "An error occurred", 500
def get_method_dir(method):
if method == 'No-XAI':
return 'NO_XAI'
elif method == 'Dater':
return 'DATER'
elif method == 'Chain-of-Table':
return 'COT'
elif method == 'Plan-of-SQLs':
return 'POS'
def get_visualization_dir(method):
if method == "No-XAI":
return 'htmls_NO_XAI'
elif method == "Dater":
return 'htmls_DATER'
elif method == "Chain-of-Table":
return 'htmls_COT'
else: # Plan-of-SQLs
return 'htmls_POS'
@app.route('/feedback', methods=['POST'])
def feedback():
try:
username = request.form['username']
feedback = request.form['feedback']
session_data = load_session_data(username)
if not session_data:
logger.error(f"No session data found for user: {username}")
return redirect(url_for('index'))
# Store the feedback
session_data['responses'].append({
'sample_id': session_data['current_index'],
'preferred_method': feedback,
'timestamp': datetime.now().isoformat()
})
# Move to the next sample
session_data['current_index'] += 1
# Save updated session data
save_session_data(username, session_data)
logger.info(f"Feedback saved for user {username}, sample {session_data['current_index'] - 1}")
if session_data['current_index'] >= len(session_data['selected_samples']):
return redirect(url_for('completed', username=username))
return redirect(url_for('experiment', username=username))
except Exception as e:
logger.exception(f"Error in feedback route: {e}")
return "An error occurred", 500
@app.route('/completed/<username>')
def completed(username):
try:
session_data = load_session_data(username)
if not session_data:
logger.error(f"No session data found for user: {username}")
return redirect(url_for('index'))
session_data['end_time'] = datetime.now().isoformat()
methods = session_data['methods']
responses = session_data['responses']
preferences = {method: 0 for method in methods}
total_responses = len(responses)
for response in responses:
preferred_method = response['preferred_method']
preferences[preferred_method] += 1
for method in preferences:
preferences[method] = round((preferences[method] / total_responses) * 100, 2)
session_data['preferences'] = preferences
# Save the final updated session data
save_session_data(username, session_data)
return render_template('completed.html', preferences=preferences)
except Exception as e:
logger.exception(f"An error occurred in the completed route: {e}")
return "An error occurred", 500
@app.route('/visualizations/<path:filename>')
def send_visualization(filename):
logger.info(f"Attempting to serve file: {filename}")
# Ensure the path is safe and doesn't allow access to files outside the intended directory
base_dir = os.getcwd()
file_path = os.path.normpath(os.path.join(base_dir, filename))
if not file_path.startswith(base_dir):
return "Access denied", 403
if not os.path.exists(file_path):
return "File not found", 404
directory = os.path.dirname(file_path)
file_name = os.path.basename(file_path)
logger.info(f"Serving file from directory: {directory}, filename: {file_name}")
return send_from_directory(directory, file_name)
if __name__ == "__main__":
os.makedirs('session_data', exist_ok=True) # Ensure the directory for session files exists
app.run(host="0.0.0.0", port=7860, debug=True)