# from flask import Flask, render_template, request, redirect, url_for, send_from_directory, session # import json # import random # import os # import string # from flask_session import Session # import logging # # # Set up logging # logging.basicConfig(level=logging.INFO, # format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # handlers=[ # logging.FileHandler("app.log"), # logging.StreamHandler() # ]) # logger = logging.getLogger(__name__) # # app = Flask(__name__) # app.config['SECRET_KEY'] = 'supersecretkey' # Change this to a random secret key # app.config['SESSION_TYPE'] = 'filesystem' # Session(app) # # # Directories for visualizations # VISUALIZATION_DIRS_PLAN_OF_SQLS = { # "TP": "visualizations/TP", # "TN": "visualizations/TN", # "FP": "visualizations/FP", # "FN": "visualizations/FN" # } # # VISUALIZATION_DIRS_CHAIN_OF_TABLE = { # "TP": "htmls_COT/TP", # "TN": "htmls_COT/TN", # "FP": "htmls_COT/FP", # "FN": "htmls_COT/FN" # } # # # # Load all sample files from the directories based on the selected method # def load_samples(method): # logger.info(f"Loading samples for method: {method}") # if method == "Chain-of-Table": # visualization_dirs = VISUALIZATION_DIRS_CHAIN_OF_TABLE # else: # visualization_dirs = VISUALIZATION_DIRS_PLAN_OF_SQLS # # samples = {"TP": [], "TN": [], "FP": [], "FN": []} # for category, dir_path in visualization_dirs.items(): # try: # for filename in os.listdir(dir_path): # if filename.endswith(".html"): # samples[category].append(filename) # logger.info(f"Loaded {len(samples[category])} samples for category {category}") # except Exception as e: # logger.exception(f"Error loading samples from {dir_path}: {e}") # return samples # # # # Randomly select balanced samples # def select_balanced_samples(samples): # try: # tp_fp_samples = random.sample(samples["TP"] + samples["FP"], 5) # tn_fn_samples = random.sample(samples["TN"] + samples["FN"], 5) # logger.info(f"Selected balanced samples: {len(tp_fp_samples + tn_fn_samples)}") # 
return tp_fp_samples + tn_fn_samples # except Exception as e: # logger.exception("Error selecting balanced samples") # return [] # # # def generate_random_string(length=8): # return ''.join(random.choices(string.ascii_letters + string.digits, k=length)) # # # @app.route('/', methods=['GET', 'POST']) # def index(): # logger.info("Rendering index page.") # if request.method == 'POST': # username = request.form.get('username') # seed = request.form.get('seed') # method = request.form.get('method') # # if not username or not seed or not method: # logger.error("Missing username, seed, or method.") # return "Missing username, seed, or method", 400 # # try: # seed = int(seed) # random.seed(seed) # all_samples = load_samples(method) # selected_samples = select_balanced_samples(all_samples) # random_string = generate_random_string() # filename = f'{username}_{seed}_{method}_{random_string}.json' # # logger.info(f"Generated filename: {filename}") # # session['selected_samples'] = selected_samples # session['responses'] = [] # Initialize responses list # session['method'] = method # Store the selected method # # return redirect(url_for('experiment', username=username, sample_index=0, seed=seed, filename=filename)) # except Exception as e: # logger.exception(f"Error in index route: {e}") # return "An error occurred", 500 # return render_template('index.html') # # # @app.route('/experiment////', methods=['GET']) # def experiment(username, sample_index, seed, filename): # try: # sample_index = int(sample_index) # selected_samples = session.get('selected_samples', []) # method = session.get('method') # Retrieve the selected method # # if sample_index >= len(selected_samples): # return redirect(url_for('completed', filename=filename)) # # visualization_file = selected_samples[sample_index] # visualization_path = None # # # Determine the correct visualization directory based on the method # if method == "Chain-of-Table": # visualization_dirs = VISUALIZATION_DIRS_CHAIN_OF_TABLE # 
else: # visualization_dirs = VISUALIZATION_DIRS_PLAN_OF_SQLS # # # Find the correct visualization path # for category, dir_path in visualization_dirs.items(): # if visualization_file in os.listdir(dir_path): # visualization_path = f"{category}/{visualization_file}" # break # # if not visualization_path: # logger.error("Visualization file not found.") # return "Visualization file not found", 404 # # statement = "Please make a decision to Accept/Reject the AI prediction based on the explanation." # return render_template('experiment.html', # sample_id=sample_index, # statement=statement, # visualization=visualization_path, # username=username, # seed=seed, # sample_index=sample_index, # filename=filename) # except Exception as e: # logger.exception(f"An error occurred in the experiment route: {e}") # return "An error occurred", 500 # # # @app.route('/visualizations/') # def send_visualization(path): # try: # method = session.get('method') # if method == "Chain-of-Table": # visualization_dir = 'htmls_COT' # else: # Default to Plan-of-SQLs # visualization_dir = 'visualizations' # # return send_from_directory(visualization_dir, path) # except Exception as e: # logger.exception(f"Error sending visualization: {e}") # return "An error occurred", 500 # # # @app.route('/feedback', methods=['POST']) # def feedback(): # try: # sample_id = request.form['sample_id'] # feedback = request.form['feedback'] # username = request.form['username'] # seed = request.form['seed'] # sample_index = int(request.form['sample_index']) # filename = request.form['filename'] # # selected_samples = session.get('selected_samples', []) # responses = session.get('responses', []) # # responses.append({ # 'sample_id': sample_id, # 'feedback': feedback # }) # session['responses'] = responses # # result_dir = 'human_study' # os.makedirs(result_dir, exist_ok=True) # # filepath = os.path.join(result_dir, filename) # if os.path.exists(filepath): # with open(filepath, 'r') as f: # data = json.load(f) # else: 
# data = {} # # data[sample_index] = { # 'Username': username, # 'Seed': seed, # 'Sample ID': sample_id, # 'Task': f"Please make a decision to Accept/Reject the AI prediction based on the explanation.", # 'User Feedback': feedback # } # # with open(filepath, 'w') as f: # json.dump(data, f, indent=4) # # logger.info(f"Feedback saved for sample {sample_id}") # # next_sample_index = sample_index + 1 # if next_sample_index >= len(selected_samples): # return redirect(url_for('completed', filename=filename)) # # return redirect( # url_for('experiment', username=username, sample_index=next_sample_index, seed=seed, filename=filename)) # except Exception as e: # logger.exception(f"Error in feedback route: {e}") # return "An error occurred", 500 # # # @app.route('/completed/') # def completed(filename): # try: # responses = session.get('responses', []) # method = session.get('method') # if method == "Chain-of-Table": # json_file = 'Tabular_LLMs_human_study_vis_6_COT.json' # else: # Default to Plan-of-SQLs # json_file = 'Tabular_LLMs_human_study_vis_6.json' # # with open(json_file, 'r') as f: # ground_truth = json.load(f) # # correct_responses = 0 # accept_count = 0 # reject_count = 0 # # for response in responses: # sample_id = response['sample_id'] # feedback = response['feedback'] # index = sample_id.split('-')[1].split('.')[0] # Extract index from filename # # if feedback.upper() == "TRUE": # accept_count += 1 # elif feedback.upper() == "FALSE": # reject_count += 1 # # if method == "Chain-of-Table": # ground_truth_key = f"COT_test-{index}.html" # else: # ground_truth_key = f"POS_test-{index}.html" # # if ground_truth_key in ground_truth and ground_truth[ground_truth_key][ # 'answer'].upper() == feedback.upper(): # correct_responses += 1 # else: # logger.warning(f"Missing or mismatched key: {ground_truth_key}") # # accuracy = (correct_responses / len(responses)) * 100 if responses else 0 # accuracy = round(accuracy, 2) # # accept_percentage = (accept_count / 
# len(responses)) * 100 if len(responses) else 0
#         reject_percentage = (reject_count / len(responses)) * 100 if len(responses) else 0
#
#         accept_percentage = round(accept_percentage, 2)
#         reject_percentage = round(reject_percentage, 2)
#
#         return render_template('completed.html',
#                                accuracy=accuracy,
#                                accept_percentage=accept_percentage,
#                                reject_percentage=reject_percentage)
#     except Exception as e:
#         logger.exception(f"Error in completed route: {e}")
#         return "An error occurred", 500
#
#
# if __name__ == '__main__':
#     try:
#         app.run(debug=False, port=7860)
#     except Exception as e:
#         logger.exception(f"Failed to start app: {e}")
#
from flask import Flask

# Minimal application object; the only live route in this file is "/".
app = Flask(__name__)


@app.route("/")
def index():
    """Handle requests to the root URL.

    Returns:
        The fixed plain-text greeting used as the response body.
    """
    greeting = "Hello, world!"
    return greeting


# if __name__ == '__main__':
#     app.run(debug=False, port=7860)
if __name__ == "__main__":
    # Start the built-in server only when this file is executed directly;
    # host "0.0.0.0" makes it listen on all interfaces, on port 7860.
    run_options = {"host": "0.0.0.0", "port": 7860}
    app.run(**run_options)