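"""Flask skill-assessment service.

POST /assess scores a user's free-text answers against reference answers with
sentence embeddings, flags likely AI-generated text with a RoBERTa detector,
and recommends courses and job postings from bundled CSV datasets.
GET / is a simple health check.
"""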
import os
import pandas as pd
import torch
from sentence_transformers import SentenceTransformer, util
import faiss
import numpy as np
import pickle
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import scipy.special
from sklearn.feature_extraction.text import TfidfVectorizer
from multiprocessing import Pool, cpu_count
from flask import Flask, request, jsonify
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Disable tokenizers parallelism to avoid fork-related deadlocks
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Paths for saving artifacts
MODEL_DIR = "./saved_models" # Primary location in /app/saved_models
FALLBACK_MODEL_DIR = "/tmp/saved_models" # Fallback if ./saved_models fails
# Try to use the primary directory, fall back to /tmp if needed
try:
    os.makedirs(MODEL_DIR, exist_ok=True)
    logger.info(f"Using model directory: {MODEL_DIR}")
    chosen_model_dir = MODEL_DIR
except Exception as e:
    logger.warning(f"Failed to create {MODEL_DIR}: {e}. Using fallback directory.")
    os.makedirs(FALLBACK_MODEL_DIR, exist_ok=True)
    chosen_model_dir = FALLBACK_MODEL_DIR
# Update paths based on the chosen directory
UNIVERSAL_MODEL_PATH = os.path.join(chosen_model_dir, "universal_model")
DETECTOR_MODEL_PATH = os.path.join(chosen_model_dir, "detector_model")
TFIDF_PATH = os.path.join(chosen_model_dir, "tfidf_vectorizer.pkl")
SKILL_TFIDF_PATH = os.path.join(chosen_model_dir, "skill_tfidf.pkl")
QUESTION_ANSWER_PATH = os.path.join(chosen_model_dir, "question_to_answer.pkl")
FAISS_INDEX_PATH = os.path.join(chosen_model_dir, "faiss_index.index")
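# These artifacts are rebuilt by initialize_resources() whenever the cached
# skill set no longer matches the skills in an incoming request. Note that the
# FAISS index is persisted here even though the current /assess flow scores
# answers directly via cosine similarity rather than querying the index.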
# Improved dataset loading with fallback
def load_dataset(file_path, required_columns=None, fallback_data=None):
    try:
        df = pd.read_csv(file_path)
        for col in required_columns or []:
            if col not in df.columns:
                logger.warning(f"Column '{col}' missing in {file_path}. Using default values.")
                df[col] = ""
        return df
    except Exception as e:
        logger.error(f"Error loading {file_path}: {e}")
        if fallback_data is not None:
            logger.info(f"Using fallback data for {file_path}")
            return pd.DataFrame(fallback_data)
        return None
# Load datasets with fallbacks
questions_df = load_dataset("Generated_Skill-Based_Questions.csv", ["Skill", "Question", "Answer"], {
    'Skill': ['Linux', 'Git', 'Node.js', 'Python', 'Kubernetes'],
    'Question': ['Advanced Linux question', 'Advanced Git question', 'Basic Node.js question',
                 'Intermediate Python question', 'Basic Kubernetes question'],
    'Answer': ['Linux answer', 'Git answer', 'Node.js answer', 'Python answer', 'Kubernetes answer']
})
courses_df = load_dataset("coursera_course_dataset_v2_no_null.csv", ["skills", "course_title", "Organization", "level"], {
    'skills': ['Docker', 'Jenkins', 'Azure', 'Cybersecurity'],
    'course_title': ['Docker Mastery', 'Jenkins CI/CD', 'Azure Fundamentals', 'Cybersecurity Basics'],
    'Organization': ['Udemy', 'Coursera', 'Microsoft', 'edX'],
    'level': ['Intermediate', 'Intermediate', 'Intermediate', 'Advanced'],
    'popularity': [0.9, 0.85, 0.95, 0.8],
    'completion_rate': [0.7, 0.65, 0.8, 0.6]
})
jobs_df = load_dataset("Updated_Job_Posting_Dataset.csv", ["job_title", "company_name", "location", "required_skills", "job_description"], {
    'job_title': ['DevOps Engineer', 'Cloud Architect'],
    'company_name': ['Tech Corp', 'Cloud Inc'],
    'location': ['Remote', 'Silicon Valley'],
    'required_skills': ['Linux, Cloud', 'AWS, Kubernetes'],
    'job_description': ['DevOps role description', 'Cloud architecture position']
})
# Validate questions_df
if questions_df is None or questions_df.empty:
    logger.error("questions_df is empty or could not be loaded. Exiting.")
    raise SystemExit(1)
if not all(col in questions_df.columns for col in ["Skill", "Question", "Answer"]):
    logger.error("questions_df is missing required columns. Exiting.")
    raise SystemExit(1)
logger.info(f"questions_df loaded with {len(questions_df)} rows. Skills available: {questions_df['Skill'].unique().tolist()}")
# Load or Initialize Models
if os.path.exists(UNIVERSAL_MODEL_PATH):
    universal_model = SentenceTransformer(UNIVERSAL_MODEL_PATH)
else:
    universal_model = SentenceTransformer("all-MiniLM-L6-v2")
if os.path.exists(DETECTOR_MODEL_PATH):
    detector_tokenizer = AutoTokenizer.from_pretrained(DETECTOR_MODEL_PATH)
    detector_model = AutoModelForSequenceClassification.from_pretrained(DETECTOR_MODEL_PATH)
else:
    detector_tokenizer = AutoTokenizer.from_pretrained("roberta-base-openai-detector")
    detector_model = AutoModelForSequenceClassification.from_pretrained("roberta-base-openai-detector")
    # Cache the detector locally so the existence check above succeeds on later runs
    detector_tokenizer.save_pretrained(DETECTOR_MODEL_PATH)
    detector_model.save_pretrained(DETECTOR_MODEL_PATH)
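# Note: the detector emits two logits per input; evaluate_response() below
# treats index 1 as the "AI-generated" class. If you swap in a different
# detector checkpoint, verify this against its id2label mapping.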
# Precompute Resources with Validation
def resources_valid(saved_skills, current_skills):
    return set(saved_skills) == set(current_skills)
def initialize_resources(user_skills):
    global tfidf_vectorizer, skill_tfidf, question_to_answer, faiss_index, answer_embeddings
    user_skills_lower = [s.lower() for s in user_skills]
    needs_recompute = False
    if all(os.path.exists(p) for p in [TFIDF_PATH, SKILL_TFIDF_PATH, QUESTION_ANSWER_PATH, FAISS_INDEX_PATH]):
        try:
            with open(TFIDF_PATH, 'rb') as f:
                tfidf_vectorizer = pickle.load(f)
            with open(SKILL_TFIDF_PATH, 'rb') as f:
                skill_tfidf = pickle.load(f)
            with open(QUESTION_ANSWER_PATH, 'rb') as f:
                question_to_answer = pickle.load(f)
            faiss_index = faiss.read_index(FAISS_INDEX_PATH)
            if not resources_valid(skill_tfidf.keys(), user_skills_lower):
                logger.info("Skill mismatch detected, recomputing resources")
                needs_recompute = True
        except Exception as e:
            logger.error(f"Error loading saved resources: {e}")
            needs_recompute = True
    else:
        needs_recompute = True
    if needs_recompute:
        logger.info("Building new resources")
        tfidf_vectorizer = TfidfVectorizer(stop_words='english')
        all_texts = user_skills + questions_df['Answer'].fillna("").tolist() + questions_df['Question'].tolist()
        tfidf_vectorizer.fit(all_texts)
        skill_tfidf = {skill.lower(): tfidf_vectorizer.transform([skill]).toarray()[0] for skill in user_skills}
        question_to_answer = dict(zip(questions_df['Question'], questions_df['Answer']))
        answer_embeddings = universal_model.encode(list(question_to_answer.values()), convert_to_tensor=True).cpu().numpy()
        faiss_index = faiss.IndexFlatL2(answer_embeddings.shape[1])
        faiss_index.add(answer_embeddings)
        # Save resources
        with open(TFIDF_PATH, 'wb') as f:
            pickle.dump(tfidf_vectorizer, f)
        with open(SKILL_TFIDF_PATH, 'wb') as f:
            pickle.dump(skill_tfidf, f)
        with open(QUESTION_ANSWER_PATH, 'wb') as f:
            pickle.dump(question_to_answer, f)
        faiss.write_index(faiss_index, FAISS_INDEX_PATH)
        universal_model.save(UNIVERSAL_MODEL_PATH)
        logger.info(f"Resources saved to {chosen_model_dir}")
# Enhanced evaluation with error handling
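# Scoring model: cosine similarity between the user's answer and the reference
# answer (scaled to 0-100), then damped by a TF-IDF relevance factor clamped to
# [0.5, 1.0], so an off-topic answer loses at most half of its similarity score.
# The RoBERTa detector runs first to flag answers that look AI-generated.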
def evaluate_response(args):
    skill, user_answer, question = args
    try:
        if not user_answer:
            return skill, 0.0, False
        inputs = detector_tokenizer(user_answer, return_tensors="pt", truncation=True, max_length=512)
        with torch.no_grad():
            logits = detector_model(**inputs).logits
        probs = scipy.special.softmax(logits.numpy(), axis=1)[0]
        is_ai = bool(probs[1] > 0.5)
        expected_answer = question_to_answer.get(question, "")
        user_embedding = universal_model.encode(user_answer, convert_to_tensor=True)
        expected_embedding = universal_model.encode(expected_answer, convert_to_tensor=True)
        score = util.pytorch_cos_sim(user_embedding, expected_embedding).item() * 100
        user_tfidf = tfidf_vectorizer.transform([user_answer]).toarray()[0]
        skill_vec = skill_tfidf.get(skill.lower(), np.zeros_like(user_tfidf))
        relevance = np.dot(user_tfidf, skill_vec) / (np.linalg.norm(user_tfidf) * np.linalg.norm(skill_vec) + 1e-10)
        score *= max(0.5, min(1.0, relevance))
        return skill, round(max(0, score), 2), is_ai
    except Exception as e:
        logger.error(f"Evaluation error for {skill}: {e}")
        return skill, 0.0, False
# Improved course recommendation
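# Ranking: 0.6 * skill-to-course embedding similarity + 0.2 * popularity
# + 0.2 * completion rate, then filtered to courses whose level matches the
# target level (forced to 'Advanced' when upgrade=True, i.e. no weak skills).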
def recommend_courses(skills_to_improve, user_level, upgrade=False):
    try:
        if not skills_to_improve or courses_df.empty:
            return []
        # Add missing columns if needed
        if 'popularity' not in courses_df:
            courses_df['popularity'] = 0.8
        if 'completion_rate' not in courses_df:
            courses_df['completion_rate'] = 0.7
        skill_embeddings = universal_model.encode(skills_to_improve, convert_to_tensor=True)
        course_embeddings = universal_model.encode(courses_df['skills'].fillna("").tolist(), convert_to_tensor=True)
        similarities = util.pytorch_cos_sim(skill_embeddings, course_embeddings).numpy()
        total_scores = 0.6 * similarities + 0.2 * courses_df['popularity'].values + 0.2 * courses_df['completion_rate'].values
        recommendations = []
        target_level = 'Advanced' if upgrade else user_level
        for i, skill in enumerate(skills_to_improve):
            idx = np.argsort(-total_scores[i])[:5]
            candidates = courses_df.iloc[idx]
            candidates = candidates[candidates['level'].str.contains(target_level, case=False, na=False)]
            recommendations.extend(candidates[['course_title', 'Organization']].values.tolist()[:3])
        return list(dict.fromkeys(map(tuple, recommendations)))
    except Exception as e:
        logger.error(f"Course recommendation error: {e}")
        return []
# Enhanced job recommendation
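# Ranking: 0.6 * embedding similarity between the user's combined skills and
# each job's required skills (or description), + 0.4 * proximity of the job's
# level to the user's level on a Beginner/Intermediate/Advanced scale.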
def recommend_jobs(user_skills, user_level):
    try:
        if jobs_df.empty:
            return []
        job_field = 'required_skills' if 'required_skills' in jobs_df.columns else 'job_description'
        job_embeddings = universal_model.encode(jobs_df[job_field].fillna("").tolist(), convert_to_tensor=True)
        user_embedding = universal_model.encode(" ".join(user_skills), convert_to_tensor=True)
        similarities = util.pytorch_cos_sim(user_embedding, job_embeddings).numpy()[0]
        level_map = {'Beginner': 0, 'Intermediate': 1, 'Advanced': 2}
        job_levels = jobs_df['level'] if 'level' in jobs_df.columns else pd.Series(['Intermediate'] * len(jobs_df))
        level_scores = job_levels.apply(
            lambda x: 1 - abs(level_map.get(x, 1) - level_map.get(user_level, 1)) / 2
        )
        total_scores = 0.6 * similarities + 0.4 * level_scores
        top_idx = np.argsort(-total_scores)[:5]
        return [(jobs_df.iloc[i]['job_title'], jobs_df.iloc[i]['company_name'],
                 jobs_df.iloc[i].get('location', 'Remote')) for i in top_idx]
    except Exception as e:
        logger.error(f"Job recommendation error: {e}")
        return []
# Flask application setup
app = Flask(__name__)
@app.route('/')
def health_check():
    return jsonify({"status": "active", "model_dir": chosen_model_dir})
@app.route('/assess', methods=['POST'])
def assess_skills():
    try:
        data = request.get_json()
        if not data or 'skills' not in data or 'answers' not in data:
            return jsonify({"error": "Missing required fields"}), 400
        user_skills = [s.strip() for s in data['skills'] if isinstance(s, str)]
        answers = [a.strip() for a in data['answers'] if isinstance(a, str)]
        user_level = data.get('user_level', 'Intermediate').strip()
        if len(answers) != len(user_skills):
            return jsonify({"error": "Answers count must match skills count"}), 400
        initialize_resources(user_skills)
        # Get relevant questions
        filtered_questions = questions_df[questions_df['Skill'].str.lower().isin([skill.lower() for skill in user_skills])]
        if filtered_questions.empty:
            return jsonify({"error": "No matching questions found for the user's skills."}), 500
        user_questions = []
        for skill in user_skills:
            skill_questions = filtered_questions[filtered_questions['Skill'].str.lower() == skill.lower()]
            if not skill_questions.empty:
                user_questions.append(skill_questions.sample(1).iloc[0])
            else:
                user_questions.append({
                    'Skill': skill,
                    'Question': f"What are the best practices for using {skill} in a production environment?",
                    'Answer': f"Best practices for {skill} include proper documentation, monitoring, and security measures."
                })
        user_questions = pd.DataFrame(user_questions).reset_index(drop=True)
        if len(user_questions) != len(user_skills):
            return jsonify({"error": f"Internal error: number of selected questions ({len(user_questions)}) does not match number of skills ({len(user_skills)})."}), 500
        user_responses = []
        for idx, row in user_questions.iterrows():
            answer = answers[idx]
            if not answer or answer.lower() == 'skip':
                user_responses.append((row['Skill'], None, row['Question']))
            else:
                user_responses.append((row['Skill'], answer, row['Question']))
        eval_args = [(skill, answer, question) for skill, answer, question in user_responses if answer]
        with Pool(processes=min(cpu_count(), 4)) as pool:
            results = pool.map(evaluate_response, eval_args)
        user_scores = {}
        ai_flags = {}
        scores_list = []
        skipped_questions = [f"{skill} ({question})" for skill, answer, question in user_responses if answer is None]
        for skill, score, is_ai in results:
            if skill in user_scores:
                user_scores[skill] = max(user_scores[skill], score)
                ai_flags[skill] = ai_flags[skill] or is_ai
            else:
                user_scores[skill] = score
                ai_flags[skill] = is_ai
            scores_list.append(score)
        mean_score = float(np.mean(scores_list)) if scores_list else 50.0
        dynamic_threshold = max(40.0, mean_score)
        weak_skills = [skill for skill, score in user_scores.items() if score < dynamic_threshold]
        # Generate recommendations
        courses = recommend_courses(weak_skills or user_skills, user_level, upgrade=not weak_skills)
        jobs = recommend_jobs(user_skills, user_level)
        return jsonify({
            "assessment_results": {
                "skills": [
                    {
                        "skill": skill,
                        "progress": f"{'■' * int(score // 10)}{'-' * (10 - int(score // 10))}",
                        "score": f"{score:.2f} %",
                        "origin": "AI-Generated" if is_ai else "Human-Written"
                    } for skill, score, is_ai in results
                ],
                "mean_score": mean_score,
                "dynamic_threshold": dynamic_threshold,
                "weak_skills": weak_skills,
                "skipped_questions": skipped_questions
            },
            "recommended_courses": courses[:3],
            "recommended_jobs": jobs[:5]
        })
    except Exception as e:
        logger.error(f"Assessment error: {e}")
        return jsonify({"error": "Internal server error"}), 500
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860, threaded=True)