# Source: Hugging Face file view (uploader: luulinh90s, commit 92bf86f "update", 13.2 kB).
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, session
import json
import random
import os
import string
import logging
from datetime import datetime
import os
from huggingface_hub import login
# Set up logging first: the Hugging Face token check below reports a missing
# token through `logger`, so `logger` has to exist before that check runs.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    handlers=[
                        logging.FileHandler("app.log"),
                        logging.StreamHandler()
                    ])
logger = logging.getLogger(__name__)

# Use the Hugging Face token from environment variables
hf_token = os.environ.get("HF_TOKEN")
if hf_token:
    login(token=hf_token)
else:
    logger.error("HF_TOKEN not found in environment variables")

app = Flask(__name__)
# Take the secret key from the SECRET_KEY environment variable when it is set;
# the literal below is only the fallback and should not be relied on in a
# public deployment.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'supersecretkey')
# Visualization directories, one mapping per explanation method.
# Each mapping goes from outcome label (TP / TN / FP / FN) to the folder
# "<method root>/<label>" that holds the HTML files for that label.
VISUALIZATION_DIRS_PLAN_OF_SQLS = {
    label: f"htmls_POS/{label}" for label in ("TP", "TN", "FP", "FN")
}
VISUALIZATION_DIRS_CHAIN_OF_TABLE = {
    label: f"htmls_COT/{label}" for label in ("TP", "TN", "FP", "FN")
}
VISUALIZATION_DIRS_NO_XAI = {
    label: f"htmls_NO_XAI/{label}" for label in ("TP", "TN", "FP", "FN")
}
VISUALIZATION_DIRS_DATER = {
    label: f"htmls_DATER/{label}" for label in ("TP", "TN", "FP", "FN")
}
import json
import os
from datetime import datetime
from huggingface_hub import HfApi
from huggingface_hub import HfApi
def save_session_data(username, data):
    """Serialize ``data`` as JSON and upload it to the Hugging Face data Space.

    The file is stored in the repo as
    ``session_data_pref/<username>_<YYYYmmdd_HHMMSS>_session.json``.

    This is best-effort: any failure is logged and swallowed, and the
    function always returns ``None``.

    Args:
        username: Identifier used as the prefix of the uploaded file name.
        data: JSON-serializable session state.
    """
    import tempfile  # local import so this block does not depend on the file header

    temp_file_path = None
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        file_name = f'{username}_{timestamp}_session.json'

        # Stage the JSON in a uniquely named temporary file. The local name is
        # deliberately not derived from `username`, so odd characters in the
        # username cannot break the local write and two saves cannot collide.
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='_session.json', delete=False, encoding='utf-8'
        ) as f:
            temp_file_path = f.name
            json.dump(data, f, indent=4)

        # Upload the file to a separate Hugging Face Space dedicated to data storage
        api = HfApi()
        api.upload_file(
            path_or_fileobj=temp_file_path,
            path_in_repo=f"session_data_pref/{file_name}",
            repo_id="luulinh90s/Tabular-LLM-Study-Data",
            repo_type="space",
        )
        logger.info(f"Session data saved for user {username} in Hugging Face Data Space")
    except Exception as e:
        logger.exception(f"Error saving session data for user {username}: {e}")
    finally:
        # Remove the temporary file whether or not the upload succeeded.
        if temp_file_path is not None:
            try:
                os.remove(temp_file_path)
            except OSError:
                logger.warning(f"Could not remove temporary file {temp_file_path}")
from huggingface_hub import hf_hub_download, HfApi
def load_session_data(username):
    """Return the most recent saved session for ``username``, or ``None``.

    Lists the files of the data Space, keeps those named
    ``session_data_pref/<username>_<YYYYmmdd_HHMMSS>_session.json`` (the name
    format written by ``save_session_data``), downloads the newest one and
    parses it as JSON.

    Args:
        username: Identifier whose session file should be loaded.

    Returns:
        The decoded session data, or ``None`` when no file exists for the user
        or any step fails (failures are logged, not raised).
    """
    try:
        # List all files in the repo
        api = HfApi()
        files = api.list_repo_files(repo_id="luulinh90s/Tabular-LLM-Study-Data", repo_type="space")

        prefix = f'session_data_pref/{username}_'
        suffix = '_session.json'

        def belongs_to_user(path):
            # A plain prefix test is not enough: the prefix for "bob" also
            # matches "bob_smith_<timestamp>_session.json". Require that what
            # sits between prefix and suffix is exactly a timestamp.
            if not (path.startswith(prefix) and path.endswith(suffix)):
                return False
            stamp = path[len(prefix):-len(suffix)]
            return (
                len(stamp) == 15
                and stamp[8] == '_'
                and (stamp[:8] + stamp[9:]).isdigit()
            )

        user_files = [f for f in files if belongs_to_user(f)]
        if not user_files:
            logger.warning(f"No session data found for user {username}")
            return None

        # Names differ only in the fixed-width timestamp, so the
        # lexicographically greatest name is the most recent file.
        latest_file = max(user_files)

        # Download the file from the data storage Space
        file_path = hf_hub_download(repo_id="luulinh90s/Tabular-LLM-Study-Data", repo_type="space",
                                    filename=latest_file)
        with open(file_path, 'r') as f:
            data = json.load(f)
        logger.info(f"Session data loaded for user {username} from Hugging Face Data Space")
        return data
    except Exception as e:
        logger.exception(f"Error loading session data for user {username}: {e}")
        return None
def load_samples(methods):
    """Collect the visualization files that exist for both compared methods.

    For each outcome category (TP, TN, FP, FN) the directories
    ``htmls_<METHOD>/<category>`` of the first two recognized methods are
    listed, and every file name present in both becomes one sample.

    Args:
        methods: Method display names (e.g. ``'Dater'``, ``'Plan-of-SQLs'``).
            Names that ``get_method_dir`` does not recognize are ignored.

    Returns:
        A list of ``{'category': ..., 'file': ...}`` dicts, ordered by
        category and then by file name.

    Raises:
        ValueError: If fewer than two of the given methods are recognized.
        OSError: If one of the category directories cannot be listed.
    """
    logger.info(f"Loading samples for methods: {methods}")

    method_dirs = [d for d in (get_method_dir(m) for m in methods) if d is not None]
    if len(method_dirs) < 2:
        raise ValueError(f"Need two known methods to compare, got: {methods}")

    samples = []
    for category in ("TP", "TN", "FP", "FN"):
        dir_a = f'htmls_{method_dirs[0]}/{category}'
        dir_b = f'htmls_{method_dirs[1]}/{category}'
        shared_files = set(os.listdir(dir_a)) & set(os.listdir(dir_b))
        # Sort before emitting: the iteration order of a set of strings can
        # differ between processes, which would make the caller's seeded
        # random selection non-reproducible.
        samples.extend(
            {'category': category, 'file': file_name}
            for file_name in sorted(shared_files)
        )
    return samples
def select_balanced_samples(samples):
    """Pick up to 10 samples at random, without replacement.

    Note that, despite the name, no balancing across categories is done here:
    the draw is a plain ``random.sample`` over the whole list.

    Returns:
        The chosen samples, or an empty list if the selection raised.
    """
    try:
        how_many = min(10, len(samples))
        chosen = random.sample(samples, how_many)
        logger.info(f"Selected balanced samples: {len(chosen)}")
    except Exception:
        logger.exception("Error selecting balanced samples")
        return []
    else:
        return chosen
def generate_random_string(length=8):
    """Return ``length`` characters drawn at random from ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=length)
    return ''.join(picked)
@app.route('/', methods=['GET', 'POST'])
def index():
    """Landing page: show the start form (GET) or start a new session (POST).

    A POST must carry ``username``, an integer ``seed`` and ``method`` holding
    exactly two comma-separated method names. On success the samples are
    drawn with the given seed, the initial session state is saved, and the
    client is redirected to the experiment page.

    Returns:
        The rendered form, a redirect to ``experiment``, a 400 response for
        invalid form input, or a 500 response for any other failure.
    """
    logger.info("Rendering index page.")
    if request.method != 'POST':
        return render_template('index.html')

    username = request.form.get('username')
    seed = request.form.get('seed')
    # Default to '' so a missing 'method' field ends in the 400 below instead
    # of raising AttributeError on None; strip stray spaces around the names.
    methods = [name.strip() for name in request.form.get('method', '').split(',')]
    if not username or not seed or len(methods) != 2 or not all(methods):
        logger.error("Missing username, seed, or incorrect number of methods.")
        return "Please fill in all fields and select exactly two methods.", 400

    try:
        seed = int(seed)
    except ValueError:
        # Bad user input is a client error, not a server failure.
        logger.error("Seed is not an integer.")
        return "Seed must be an integer.", 400

    try:
        random.seed(seed)
        all_samples = load_samples(methods)
        selected_samples = select_balanced_samples(all_samples)
        logger.info(f"Number of selected samples: {len(selected_samples)}")
        if len(selected_samples) == 0:
            logger.error("No samples were selected.")
            return "No samples were selected", 500
        session_data = {
            'username': username,
            'seed': seed,
            'methods': methods,
            'selected_samples': selected_samples,
            'current_index': 0,
            'responses': [],
            'start_time': datetime.now().isoformat()
        }
        save_session_data(username, session_data)
        logger.info(f"Session data initialized for user: {username}")
        return redirect(url_for('experiment', username=username))
    except Exception as e:
        logger.exception(f"Error in index route: {e}")
        return "An error occurred", 500
@app.route('/experiment/<username>', methods=['GET', 'POST'])
def experiment(username):
    """Show the current comparison trial for ``username``.

    Loads the saved session, takes the sample at ``current_index`` and
    renders both methods' visualizations of that sample side by side.

    Returns:
        The rendered trial page; a redirect to ``index`` when no session
        exists; a redirect to ``completed`` when all samples are done; a
        redirect back to this route after skipping a sample whose files are
        missing; or a 500 response on any other failure.
    """
    try:
        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))

        selected_samples = session_data['selected_samples']
        methods = session_data['methods']
        current_index = session_data['current_index']
        if current_index >= len(selected_samples):
            return redirect(url_for('completed', username=username))

        sample = selected_samples[current_index]
        method_a, method_b = methods

        method_a_dir = get_method_dir(method_a)
        method_b_dir = get_method_dir(method_b)
        if method_a_dir is None or method_b_dir is None:
            # Handled by the except below, which answers with a 500.
            raise ValueError(f"Unknown method in session data: {methods}")

        # Show the sample chosen for this index. Each sample records its
        # category and file name, and the same file name is expected under
        # both methods' directories.
        category = sample['category']
        file_a = os.path.join(f'htmls_{method_a_dir}/{category}', sample['file'])
        file_b = os.path.join(f'htmls_{method_b_dir}/{category}', sample['file'])

        if not (os.path.isfile(file_a) and os.path.isfile(file_b)):
            # Skip samples whose files are no longer on disk.
            logger.error(f"Missing files for comparison at index {current_index}")
            session_data['current_index'] += 1
            save_session_data(username, session_data)
            return redirect(url_for('experiment', username=username))

        visualization_a = url_for('send_visualization', filename=file_a)
        visualization_b = url_for('send_visualization', filename=file_b)
        statement = (
            "\n"
            "You are given two explanations that describe the reasoning process of the Table QA model.\n"
            "Please analyze the explanations and determine which one provides a clearer and more accurate reasoning process.\n"
        )
        return render_template('experiment.html',
                               sample_id=current_index,
                               statement=statement,
                               visualization_a=visualization_a,
                               visualization_b=visualization_b,
                               method_a=method_a,
                               method_b=method_b,
                               username=username)
    except Exception as e:
        logger.exception(f"An error occurred in the experiment route: {e}")
        return "An error occurred", 500
def get_method_dir(method):
    """Map a method display name to its directory tag.

    Returns:
        ``'NO_XAI'``, ``'DATER'``, ``'COT'`` or ``'POS'`` for the four known
        method names, and ``None`` for anything else.
    """
    tag_by_method = {
        'No-XAI': 'NO_XAI',
        'Dater': 'DATER',
        'Chain-of-Table': 'COT',
        'Plan-of-SQLs': 'POS',
    }
    return tag_by_method.get(method)
def get_visualization_dir(method):
    """Map a method display name to its visualization root folder.

    Any name other than "No-XAI", "Dater" or "Chain-of-Table" is treated as
    Plan-of-SQLs and yields ``'htmls_POS'``.
    """
    root_by_method = {
        "No-XAI": 'htmls_NO_XAI',
        "Dater": 'htmls_DATER',
        "Chain-of-Table": 'htmls_COT',
    }
    # Fallback covers Plan-of-SQLs and every unrecognized name.
    return root_by_method.get(method, 'htmls_POS')
@app.route('/feedback', methods=['POST'])
def feedback():
    """Record the participant's preference for the current sample and advance.

    Expects the form fields ``username`` and ``feedback`` (the preferred
    method). The response is appended to the session, ``current_index`` is
    incremented and the session is saved.

    Returns:
        A redirect to the next trial or to ``completed``; a redirect to
        ``index`` when no session exists; a 400 response when a form field is
        missing; or a 500 response on any other failure.
    """
    try:
        username = request.form.get('username')
        # Named differently from the view function to avoid shadowing it.
        preferred_method = request.form.get('feedback')
        if username is None or preferred_method is None:
            logger.error("Missing username or feedback in form data.")
            return "Missing username or feedback.", 400

        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))

        # A re-submitted form after the last sample must not add a response
        # for a sample that does not exist.
        if session_data['current_index'] >= len(session_data['selected_samples']):
            return redirect(url_for('completed', username=username))

        # Store the feedback
        session_data['responses'].append({
            'sample_id': session_data['current_index'],
            'preferred_method': preferred_method,
            'timestamp': datetime.now().isoformat()
        })
        # Move to the next sample
        session_data['current_index'] += 1
        # Save updated session data
        save_session_data(username, session_data)
        logger.info(f"Feedback saved for user {username}, sample {session_data['current_index'] - 1}")

        if session_data['current_index'] >= len(session_data['selected_samples']):
            return redirect(url_for('completed', username=username))
        return redirect(url_for('experiment', username=username))
    except Exception as e:
        logger.exception(f"Error in feedback route: {e}")
        return "An error occurred", 500
@app.route('/completed/<username>')
def completed(username):
    """Finish the session and show how often each method was preferred.

    Stamps ``end_time``, computes for each session method the percentage of
    responses that preferred it (rounded to two decimals), stores the result
    under ``preferences``, saves the session and renders the summary page.

    Returns:
        The rendered summary; a redirect to ``index`` when no session exists;
        or a 500 response on any other failure.
    """
    try:
        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))

        session_data['end_time'] = datetime.now().isoformat()
        methods = session_data['methods']
        responses = session_data['responses']

        preferences = {method: 0 for method in methods}
        for response in responses:
            preferred_method = response['preferred_method']
            if preferred_method in preferences:
                preferences[preferred_method] += 1
            else:
                # A stored answer that is not one of the session's methods is
                # reported instead of failing the whole page with a KeyError.
                logger.warning(f"Ignoring unknown preferred method: {preferred_method}")

        total_responses = len(responses)
        for method in preferences:
            if total_responses:
                preferences[method] = round((preferences[method] / total_responses) * 100, 2)
            else:
                # No responses recorded: report 0% rather than divide by zero.
                preferences[method] = 0.0

        session_data['preferences'] = preferences
        save_session_data(username, session_data)
        return render_template('completed.html', preferences=preferences)
    except Exception as e:
        logger.exception(f"An error occurred in the completed route: {e}")
        return "An error occurred", 500
@app.route('/visualizations/<path:filename>')
def send_visualization(filename):
    """Serve a file addressed relative to the current working directory.

    Args:
        filename: Path from the URL, resolved against ``os.getcwd()``.

    Returns:
        The file response, ``403`` when the resolved path lies outside the
        working directory, or ``404`` when it does not exist.
    """
    logger.info(f"Attempting to serve file: {filename}")
    # Ensure the path is safe and doesn't allow access to files outside the intended directory
    base_dir = os.getcwd()
    file_path = os.path.normpath(os.path.join(base_dir, filename))
    # Compare whole path components: a plain string-prefix test would also
    # accept sibling directories such as "<base_dir>-other/...".
    try:
        inside_base = os.path.commonpath([base_dir, file_path]) == base_dir
    except ValueError:
        # commonpath raises when the two paths cannot be compared
        # (for example, different drives on Windows).
        inside_base = False
    if not inside_base:
        return "Access denied", 403
    if not os.path.exists(file_path):
        return "File not found", 404
    directory = os.path.dirname(file_path)
    file_name = os.path.basename(file_path)
    logger.info(f"Serving file from directory: {directory}, filename: {file_name}")
    return send_from_directory(directory, file_name)
if __name__ == "__main__":
    os.makedirs('session_data', exist_ok=True)  # Ensure the directory for session files exists
    # The server listens on all interfaces, so the interactive debugger must
    # not be on by default. Set FLASK_DEBUG=1 to enable it for local work.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes")
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)