import pandas as pd import requests from pydantic import BaseModel, Field from typing import List, Tuple, Optional from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate import os from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends, Header, Request from fastapi.responses import JSONResponse from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.middleware.cors import CORSMiddleware import json import tempfile import shutil import PyPDF2 from dotenv import load_dotenv import pdfplumber import re from db import * import time import asyncio from contextlib import asynccontextmanager import logging from sqlalchemy.pool import NullPool from cloud_config import * import uuid # Load environment variables load_dotenv() # Configure logging for Cloud Run logging.basicConfig( level=getattr(logging, LOG_LEVEL), format=LOG_FORMAT ) logger = logging.getLogger(__name__) # Global variable to store access token access_token = None # Startup/shutdown events @asynccontextmanager async def lifespan(app: FastAPI): # Startup logger.info("Starting up Job Recommendation API...") # You can initialize connection pools here if needed yield # Shutdown logger.info("Shutting down Job Recommendation API...") # Close any open connections here # Initialize FastAPI app with lifespan app = FastAPI( title="Job Recommendation API", description="API for processing resumes and recommending jobs", lifespan=lifespan ) # Add CORS middleware for cloud deployment app.add_middleware( CORSMiddleware, allow_origins=["*"], # Configure based on your needs allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Add request ID middleware for better tracing @app.middleware("http") async def add_request_id(request: Request, call_next): request_id = f"{time.time()}-{request.client.host}" request.state.request_id = request_id # Log the request logger.info(f"Request ID: {request_id} - {request.method} {request.url.path}") try: response = await call_next(request) response.headers["X-Request-ID"] = request_id return response except Exception as e: logger.error(f"Request ID: {request_id} - Error: {str(e)}") raise # Security configuration API_KEY = os.getenv("API_KEY") security = HTTPBearer() def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)): """ Verify the API key from the Authorization header """ if not API_KEY: logger.error("API key not configured") raise HTTPException( status_code=500, detail="API key not configured", ) if credentials.credentials != API_KEY: logger.warning("Invalid API key attempt") raise HTTPException( status_code=401, detail="Invalid API key", headers={"WWW-Authenticate": "Bearer"}, ) return credentials.credentials # Initialize OpenAI client with error handling try: llm = ChatOpenAI( model="gpt-4o-mini", temperature=0, api_key=os.getenv("OPENAI_API_KEY") ) logger.info("OpenAI client initialized successfully") except Exception as e: logger.error(f"Failed to initialize OpenAI client: {e}") raise # Initialize database engine with connection pooling suitable for Cloud Run def get_engine(): """ Get database engine with NullPool for Cloud Run """ try: conn_string = f"postgresql://{DB_PARAMS['user']}:{DB_PARAMS['password']}@{DB_PARAMS['host']}:{DB_PARAMS['port']}/{DB_PARAMS['dbname']}" # Use NullPool for Cloud Run to avoid connection issues engine = create_engine(conn_string, poolclass=NullPool, pool_pre_ping=True) logger.info("Database engine created successfully") return engine except Exception as e: logger.error(f"Failed to create database engine: {e}") raise # Initialize database engine engine = get_engine() def get_access_token(): """ Get access token for the external API with better error handling """ global access_token # If we already have a token, return it if access_token: return access_token try: login_url = str(os.getenv("login_url")) login_data = { "email": str(os.getenv("email")), "password": str(os.getenv("password")) } login_headers = { 'accept': 'application/json', 'Content-Type': 'application/json' } # Add timeout to prevent hanging login_response = requests.post(login_url, headers=login_headers, json=login_data, timeout=None) if login_response.status_code == 200: login_result = login_response.json() access_token = login_result.get('data', {}).get('tokens', {}).get('accessToken') if access_token: logger.info("Successfully obtained access token") return access_token else: logger.error("Login successful but no access token found in response") return None else: logger.error(f"Login failed with status {login_response.status_code}: {login_response.text}") return None except requests.exceptions.Timeout: logger.error("Login request timed out") return None except requests.exceptions.RequestException as e: logger.error(f"Network error during login: {e}") return None except Exception as e: logger.error(f"Unexpected error getting access token: {e}") return None def generate_smart_hiring_collateral(job_description_text: str) -> tuple[str, str]: """ Generate collateral using the smart-hiring/generate endpoint Returns a tuple of (collateral, job_id) """ try: url = str(os.getenv("smart_hiring_url")) # Generate a unique job ID using UUID job_id = str(uuid.uuid4()) # Prepare headers with authentication headers = { 'accept': 'application/json', 'Authorization': f'Bearer {get_access_token()}' } # Prepare payload payload = { 'job_id': job_id, 'job_description_text': job_description_text } # Make the API request response = requests.post(url, headers=headers, data=payload, timeout=None) if response.status_code == 200: logger.info("Smart hiring collateral generated successfully") # Parse the response to extract smart_hiring_criteria try: response_data = response.json() if response_data.get('success') and 'data' in response_data: smart_hiring_criteria = response_data['data'].get('smart_hiring_criteria', '') if smart_hiring_criteria: logger.info("Successfully extracted smart hiring criteria") return smart_hiring_criteria, job_id else: logger.warning("No smart_hiring_criteria found in response") return "", job_id else: logger.warning("Invalid response format from smart hiring API") return "", job_id except json.JSONDecodeError as e: logger.error(f"Failed to parse smart hiring response as JSON: {e}") return "", job_id elif response.status_code == 401: logger.warning("Authentication failed for smart hiring, getting fresh token...") global access_token access_token = None # Reset the token new_token = get_access_token() if new_token: headers['Authorization'] = f'Bearer {new_token}' response = requests.post(url, headers=headers, data=payload, timeout=None) if response.status_code == 200: logger.info("Smart hiring collateral generated successfully with fresh token") # Parse the response to extract smart_hiring_criteria try: response_data = response.json() if response_data.get('success') and 'data' in response_data: smart_hiring_criteria = response_data['data'].get('smart_hiring_criteria', '') if smart_hiring_criteria: logger.info("Successfully extracted smart hiring criteria with fresh token") return smart_hiring_criteria, job_id else: logger.warning("No smart_hiring_criteria found in response with fresh token") return "", job_id else: logger.warning("Invalid response format from smart hiring API with fresh token") return "", job_id except json.JSONDecodeError as e: logger.error(f"Failed to parse smart hiring response as JSON with fresh token: {e}") return "", job_id else: logger.error(f"Smart hiring API call failed with status {response.status_code}") return "", job_id else: logger.error("Could not obtain fresh token for smart hiring") return "", job_id else: logger.error(f"Smart hiring API call failed with status {response.status_code}: {response.text}") return "", job_id except requests.exceptions.Timeout: logger.error(f"Smart hiring API request timed out after {EXTERNAL_API_TIMEOUT} seconds") return "", "" except Exception as e: logger.error(f"Exception occurred in smart hiring generation: {str(e)}") return "", "" class structure(BaseModel): name: str = Field(description="Name of the candidate") location: str = Field(description="The location of the candidate. Extract city and state if possible.") skills: List[str] = Field(description="List of individual skills of the candidate") ideal_jobs: str = Field(description="List of ideal jobs for the candidate based on past experience.") email: str = Field(description="The email of the candidate") yoe: str = Field(description="Years of experience of the candidate.") experience: str = Field(description="A brief summary of the candidate's past experience.") industry: str = Field(description="The industry the candidate has experience in.(Tech,Legal,Finance/Accounting,Healthcare,Industrial,Logistics,Telecom,Admin,Other)") class JobAnalysis(BaseModel): job_title: str company_name: str analysis: dict def extract_text_from_pdf(pdf_file_path: str) -> str: """ Extract text from PDF file using multiple methods for better accuracy """ text = "" # Method 1: Try pdfplumber (better for complex layouts) try: with pdfplumber.open(pdf_file_path) as pdf: for page in pdf.pages: page_text = page.extract_text() if page_text: text += page_text + "\n" if text.strip(): logger.info(f"Successfully extracted text using pdfplumber: {len(text)} characters") return text.strip() except Exception as e: logger.warning(f"pdfplumber failed: {e}") # Method 2: Try PyPDF2 (fallback) try: with open(pdf_file_path, 'rb') as file: pdf_reader = PyPDF2.PdfReader(file) for page in pdf_reader.pages: page_text = page.extract_text() if page_text: text += page_text + "\n" if text.strip(): logger.info(f"Successfully extracted text using PyPDF2: {len(text)} characters") return text.strip() except Exception as e: logger.error(f"PyPDF2 failed: {e}") # If both methods fail, return empty string logger.error("Failed to extract text from PDF") return "" def extract_resume_info(resume_text: str) -> structure: """ Extract structured information from resume using LLM """ prompt = ChatPromptTemplate.from_template(""" You are an expert resume parser. Extract the following information from the resume text provided and return it in a structured JSON format. Resume Text: {resume_text} Please extract and structure the information according to the following schema: - name: Full name of the candidate - location: City and state if available, otherwise general location - skills: List of technical skills, tools, technologies, programming languages, etc. - ideal_jobs: Based on their experience, what types of jobs would be ideal for this candidate - email: Email address of the candidate (if found in resume) - yoe: Years of experience (extract from work history) - experience: Brief summary of their work experience and background - industry: Categorize into one of these industries: Tech, Legal, Finance/Accounting, Healthcare, Industrial, Logistics, Telecom, Admin, Other Return ONLY a valid JSON object with these fields. Do not include any other text or explanations. """) try: str_llm = llm.with_structured_output(structure) chain = prompt | str_llm response = chain.invoke({"resume_text": resume_text}) validated_data = { 'name': response.name, 'location': response.location, 'email': response.email, 'skills': response.skills, 'ideal_jobs': response.ideal_jobs, 'yoe': response.yoe, 'experience': response.experience, 'industry': response.industry } logger.info(f"Successfully extracted resume info for: {validated_data['name']}") return validated_data except Exception as e: logger.error(f"Failed to extract resume info: {e}") return { 'name': "Unknown", 'location': "Unknown", 'email': "", 'skills': [], 'ideal_jobs': "Software Engineer", 'yoe': "0", 'experience': "No experience listed", 'industry': "Tech" } def filter_jobs_by_industry(jobs_df: pd.DataFrame, target_industry: str) -> pd.DataFrame: """ Filter jobs by industry """ # Map the extracted industry to database industry values industry_mapping = { 'Tech': ['technology', 'VC Tech'], 'Legal': ['Legal'], 'Finance/Accounting': ['finance/Accounting'], 'Healthcare': ['healthcare'], 'Industrial': ['industrial'], 'Logistics': ['logistics'], 'Telecom': ['telecom'], 'Admin': ['admin'], 'Other': ['Other'] } target_industries = industry_mapping.get(target_industry, ['Tech']) # Filter jobs by industry (using database column name 'industry') filtered_jobs = jobs_df[jobs_df['industry'].isin(target_industries)] logger.info(f"Filtered {len(filtered_jobs)} jobs for industry: {target_industry}") return filtered_jobs def filter_jobs_by_location(jobs_df: pd.DataFrame, candidate_location: str) -> pd.DataFrame: """ Filter jobs by location matching the candidate's location """ if not candidate_location or candidate_location.lower() in ['unknown', 'n/a', '']: logger.info(f"No location info provided, returning all {len(jobs_df)} jobs") return jobs_df # Return all jobs if no location info # Clean and normalize candidate location candidate_location = candidate_location.lower().strip() logger.info(f"Filtering jobs for candidate location: {candidate_location}") # Extract state abbreviations and full names state_mapping = { 'alabama': 'al', 'alaska': 'ak', 'arizona': 'az', 'arkansas': 'ar', 'california': 'ca', 'colorado': 'co', 'connecticut': 'ct', 'delaware': 'de', 'district of columbia': 'dc', 'florida': 'fl', 'georgia': 'ga', 'hawaii': 'hi', 'idaho': 'id', 'illinois': 'il', 'indiana': 'in', 'iowa': 'ia', 'kansas': 'ks', 'kentucky': 'ky', 'louisiana': 'la', 'maine': 'me', 'maryland': 'md', 'massachusetts': 'ma', 'michigan': 'mi', 'minnesota': 'mn', 'mississippi': 'ms', 'missouri': 'mo', 'montana': 'mt', 'nebraska': 'ne', 'nevada': 'nv', 'new hampshire': 'nh', 'new jersey': 'nj', 'new mexico': 'nm', 'new york': 'ny', 'north carolina': 'nc', 'north dakota': 'nd', 'ohio': 'oh', 'oklahoma': 'ok', 'oregon': 'or', 'pennsylvania': 'pa', 'rhode island': 'ri', 'south carolina': 'sc', 'south dakota': 'sd', 'tennessee': 'tn', 'texas': 'tx', 'utah': 'ut', 'vermont': 'vt', 'virginia': 'va', 'washington': 'wa', 'west virginia': 'wv', 'wisconsin': 'wi', 'wyoming': 'wy' } # Create location patterns to match location_patterns = [] # Add the original location location_patterns.append(candidate_location) # Add state variations for state_name, state_abbr in state_mapping.items(): if state_name in candidate_location or state_abbr in candidate_location: location_patterns.extend([state_name, state_abbr]) # Add common city variations (extract city name) city_match = re.search(r'^([^,]+)', candidate_location) if city_match: city_name = city_match.group(1).strip() location_patterns.append(city_name) # Add remote/anywhere patterns if location is remote if 'remote' in candidate_location or 'anywhere' in candidate_location: location_patterns.extend(['remote', 'anywhere', 'work from home', 'wfh']) logger.info(f"Location patterns to match: {location_patterns}") # Filter jobs by location matching_jobs = [] for _, job_row in jobs_df.iterrows(): job_location = str(job_row.get('job_location', '')).lower() # Check if any location pattern matches location_matches = any(pattern in job_location for pattern in location_patterns) # Also check for remote jobs if candidate location includes remote if 'remote' in candidate_location and any(remote_term in job_location for remote_term in ['remote', 'anywhere', 'work from home', 'wfh']): location_matches = True # Check for exact city/state matches if candidate_location in job_location or job_location in candidate_location: location_matches = True if location_matches: matching_jobs.append(job_row) result_df = pd.DataFrame(matching_jobs) if matching_jobs else jobs_df logger.info(f"Found {len(matching_jobs)} jobs matching location out of {len(jobs_df)} total jobs") return result_df def extract_experience_requirement(requirements_text: str) -> dict: """ Extract experience requirements from job requirements text Returns a dictionary with min_years, max_years, and level """ if not requirements_text or pd.isna(requirements_text): return {'min_years': 0, 'max_years': 999, 'level': 'any'} requirements_text = str(requirements_text).lower() # Common experience patterns experience_patterns = [ # Specific year ranges r'(\d+)[\-\+]\s*(\d+)\s*years?\s*experience', r'(\d+)\s*to\s*(\d+)\s*years?\s*experience', r'(\d+)\s*-\s*(\d+)\s*years?\s*experience', # Minimum years r'(\d+)\+?\s*years?\s*experience', r'minimum\s*(\d+)\s*years?\s*experience', r'at\s*least\s*(\d+)\s*years?\s*experience', # Level-based patterns r'(entry\s*level|junior|associate)', r'(mid\s*level|intermediate|mid\s*senior)', r'(senior|lead|principal|staff)', r'(executive|director|vp|chief|c\s*level)', # Specific year mentions r'(\d+)\s*years?\s*in\s*the\s*field', r'(\d+)\s*years?\s*of\s*professional\s*experience', r'(\d+)\s*years?\s*of\s*relevant\s*experience' ] min_years = 0 max_years = 999 level = 'any' # Check for specific year ranges for pattern in experience_patterns[:3]: # First 3 patterns are for ranges matches = re.findall(pattern, requirements_text) if matches: try: min_years = int(matches[0][0]) max_years = int(matches[0][1]) break except (ValueError, IndexError): continue # Check for minimum years if no range found if min_years == 0: for pattern in experience_patterns[3:6]: # Minimum year patterns matches = re.findall(pattern, requirements_text) if matches: try: min_years = int(matches[0]) break except (ValueError, IndexError): continue # Check for level-based requirements for pattern in experience_patterns[6:10]: # Level patterns matches = re.findall(pattern, requirements_text) if matches: level_match = matches[0].lower() if 'entry' in level_match or 'junior' in level_match or 'associate' in level_match: level = 'entry' if min_years == 0: min_years = 0 max_years = 2 elif 'mid' in level_match or 'intermediate' in level_match: level = 'mid' if min_years == 0: min_years = 2 max_years = 5 elif 'senior' in level_match or 'lead' in level_match or 'principal' in level_match or 'staff' in level_match: level = 'senior' if min_years == 0: min_years = 5 max_years = 10 elif 'executive' in level_match or 'director' in level_match or 'vp' in level_match or 'chief' in level_match: level = 'executive' if min_years == 0: min_years = 10 max_years = 999 break # Check for specific year mentions if still no match if min_years == 0: for pattern in experience_patterns[10:]: # Specific year mention patterns matches = re.findall(pattern, requirements_text) if matches: try: min_years = int(matches[0]) max_years = min_years + 2 # Add buffer break except (ValueError, IndexError): continue return { 'min_years': min_years, 'max_years': max_years, 'level': level } def filter_jobs_by_experience(jobs_df: pd.DataFrame, candidate_yoe: str) -> pd.DataFrame: """ Filter jobs by experience level matching the candidate's years of experience """ if not candidate_yoe or candidate_yoe.lower() in ['unknown', 'n/a', '']: logger.info(f"No experience info provided, returning all {len(jobs_df)} jobs") return jobs_df # Extract numeric years from candidate experience try: # Handle various formats like "5 years", "5+ years", "5-7 years", etc. yoe_match = re.search(r'(\d+(?:\.\d+)?)', str(candidate_yoe)) if yoe_match: candidate_years = float(yoe_match.group(1)) else: logger.warning(f"Could not extract years from: {candidate_yoe}") return jobs_df except (ValueError, TypeError): logger.error(f"Invalid experience format: {candidate_yoe}") return jobs_df logger.info(f"Filtering jobs for candidate with {candidate_years} years of experience") # Filter jobs by experience requirements matching_jobs = [] for _, job_row in jobs_df.iterrows(): requirements_text = str(job_row.get('requirements', '')) experience_req = extract_experience_requirement(requirements_text) # Check if candidate's experience matches the job requirements if (candidate_years >= experience_req['min_years'] and candidate_years <= experience_req['max_years']): matching_jobs.append(job_row) result_df = pd.DataFrame(matching_jobs) if matching_jobs else jobs_df logger.info(f"Found {len(matching_jobs)} jobs matching experience out of {len(jobs_df)} total jobs") return result_df def filter_jobs_by_priority(jobs_df: pd.DataFrame) -> pd.DataFrame: """ Filter jobs to only include high priority jobs """ if jobs_df.empty: logger.info("No jobs to filter by priority") return jobs_df # Filter jobs by priority - only include high priority jobs priority_filtered_jobs = jobs_df[jobs_df['priority'].str.lower() == 'high'] logger.info(f"Found {len(priority_filtered_jobs)} high priority jobs out of {len(jobs_df)} total jobs") return priority_filtered_jobs def create_job_description(job_row: pd.Series) -> str: """ Create a comprehensive job description from job data """ description_parts = [] if pd.notna(job_row.get('company_blurb')): description_parts.append(f"Company: {job_row['company_blurb']}") if pd.notna(job_row.get('company_culture')): description_parts.append(f"Company Culture: {job_row['company_culture']}") if pd.notna(job_row.get('description')): description_parts.append(f"Description: {job_row['description']}") if pd.notna(job_row.get('requirements')): description_parts.append(f"Requirements: {job_row['requirements']}") if pd.notna(job_row.get('role_responsibilities')): description_parts.append(f"Role Responsibilities: {job_row['role_responsibilities']}") if pd.notna(job_row.get('job_location')): description_parts.append(f"Location: {job_row['job_location']}") return "\n\n".join(description_parts) def create_jd_smart_hiring(job_row: pd.Series) -> str: """ Create a smart hiring job description from job data """ description_parts = [] if pd.notna(job_row.get('description')): description_parts.append(f"Description: {job_row['description']}") if pd.notna(job_row.get('requirements')): description_parts.append(f"Requirements: {job_row['requirements']}") return "\n\n".join(description_parts) def clean_analysis_result(analysis_result: dict) -> dict: """ Clean up the analysis result to only include final_score and summary """ if not isinstance(analysis_result, dict): return analysis_result # Remove user_context if present if 'user_context' in analysis_result: del analysis_result['user_context'] # Clean up final_response if present if 'final_response' in analysis_result: try: # Handle both string and dict formats if isinstance(analysis_result['final_response'], str): final_response = json.loads(analysis_result['final_response']) else: final_response = analysis_result['final_response'] # Extract and format the evaluation data if 'evaluation' in final_response and len(final_response['evaluation']) > 0: evaluation = final_response['evaluation'][0] # Create a minimal structure with only final_score and summary cleaned_response = { 'final_score': evaluation.get('final_score', 0), 'summary': {} } # Extract summary information if 'summary' in evaluation and len(evaluation['summary']) > 0: summary = evaluation['summary'][0] cleaned_response['summary'] = { 'strengths': summary.get('strengths', []), 'weaknesses': summary.get('weaknesses', []), 'opportunities': summary.get('opportunities', []), 'recommendations': summary.get('recommendations', []) } analysis_result['final_response'] = cleaned_response except (json.JSONDecodeError, KeyError, IndexError) as e: logger.error(f"Error cleaning analysis result: {e}") # Keep original if cleaning fails pass return analysis_result def sort_jobs_by_score(job_analyses: list) -> list: """ Sort jobs by final_score in descending order (highest scores first) """ def extract_score(job_analysis): try: analysis = job_analysis.get('analysis', {}) if 'final_response' in analysis and isinstance(analysis['final_response'], dict): return analysis['final_response'].get('final_score', 0) return 0 except: return 0 return sorted(job_analyses, key=extract_score, reverse=True) async def analyze_job_fit_with_retry(job_description: str, resume_file_path: str, job_row: pd.Series = None, max_retries: int = 3) -> dict: """ Analyze job-candidate fit with retry logic for resilience """ for attempt in range(max_retries): try: result = analyze_job_fit(job_description, resume_file_path, job_row) if "error" not in result: return result # If authentication error and not last attempt, retry if "Authentication failed" in result.get("error", "") and attempt < max_retries - 1: logger.warning(f"Authentication failed, retrying... (attempt {attempt + 1}/{max_retries})") global access_token access_token = None # Reset token to force refresh await asyncio.sleep(2 ** attempt) # Exponential backoff continue # If timeout error and not last attempt, retry with longer timeout if "timed out" in result.get("error", "").lower() and attempt < max_retries - 1: logger.warning(f"Request timed out, retrying with longer timeout... (attempt {attempt + 1}/{max_retries})") await asyncio.sleep(2 ** attempt) # Exponential backoff continue return result except Exception as e: logger.error(f"Attempt {attempt + 1}/{max_retries} failed: {str(e)}") if attempt == max_retries - 1: return {"error": f"Failed after {max_retries} attempts: {str(e)}"} await asyncio.sleep(2 ** attempt) def analyze_job_fit(job_description: str, resume_file_path: str, job_row: pd.Series = None) -> dict: """ Analyze job-candidate fit using the external API """ url = str(os.getenv("analyze_url")) # Check if resume file exists if not os.path.exists(resume_file_path): logger.error(f"Resume file not found: {resume_file_path}") return {"error": f"Resume file not found: {resume_file_path}"} # Prepare headers with authentication headers = { 'accept': 'application/json', 'Authorization': f'Bearer {get_access_token()}' } # Prepare form data files = { 'resume': (os.path.basename(resume_file_path), open(resume_file_path, 'rb'), 'application/pdf') } data = { 'jd_text': job_description } # Generate collateral if job_row is provided if job_row is not None: try: job_description_text = create_jd_smart_hiring(job_row) if job_description_text: collateral, job_id = generate_smart_hiring_collateral(job_description_text) if collateral: data['collateral'] = collateral data['job_id'] = job_id logger.info(f"Added collateral and job_id ({job_id}) to job fit analysis request") elif job_id: # Even if collateral is empty, we can still use the job_id data['job_id'] = job_id logger.info(f"Added job_id ({job_id}) to job fit analysis request (no collateral)") except Exception as e: logger.warning(f"Failed to generate collateral: {e}") # Continue without collateral if generation fails try: # Make the API request with configured timeout response = requests.post(url, headers=headers, files=files, data=data, timeout=None) # If we get an authentication error, try to get a fresh token and retry once if response.status_code == 401: logger.warning("Authentication failed, getting fresh token...") global access_token access_token = None # Reset the token new_token = get_access_token() if new_token: headers['Authorization'] = f'Bearer {new_token}' # Close the previous file and reopen files['resume'][1].close() files['resume'] = (os.path.basename(resume_file_path), open(resume_file_path, 'rb'), 'application/pdf') response = requests.post(url, headers=headers, files=files, data=data, timeout=None) else: # If we can't get a fresh token, return error return {"error": "Authentication failed and could not obtain fresh token"} if response.status_code == 200: logger.info("Job fit analysis completed successfully") return response.json() elif response.status_code == 401: # If we still get 401 after fresh token, return error return {"error": "Authentication failed even with fresh token"} else: logger.error(f"API call failed with status {response.status_code}") return {"error": f"API call failed with status {response.status_code}", "details": response.text} except requests.exceptions.Timeout: logger.error(f"API request timed out after {EXTERNAL_API_TIMEOUT} seconds") return {"error": f"API request timed out after {EXTERNAL_API_TIMEOUT} seconds"} except Exception as e: logger.error(f"Exception occurred: {str(e)}") return {"error": f"Exception occurred: {str(e)}"} finally: # Ensure the file is closed if 'resume' in files: try: files['resume'][1].close() except: pass @app.post("/process_resume_and_recommend_jobs") async def process_resume_and_recommend_jobs( resume: UploadFile = File(...), resume_text: str = Form(""), api_key: str = Depends(verify_api_key) ): """ Process resume, extract information, filter jobs by industry, and analyze fit """ request_start_time = time.time() try: logger.info(f"Processing resume: {resume.filename}") # Save uploaded file temporarily with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: shutil.copyfileobj(resume.file, tmp_file) tmp_file_path = tmp_file.name try: # Extract text from PDF if no resume_text provided if not resume_text: resume_text = extract_text_from_pdf(tmp_file_path) if not resume_text: logger.error("Could not extract text from PDF file") return JSONResponse( status_code=400, content={"error": "Could not extract text from PDF file"} ) # Extract resume information using LLM resume_info = extract_resume_info(resume_text) # Load jobs data from PostgreSQL database try: jobs_df = pd.read_sql_table("jobs", con=engine) candidates_df = pd.read_sql_table("candidates", con=engine) submissions_df = pd.read_sql_table("candidate_submissions", con=engine) logger.info(f"Loaded {len(jobs_df)} jobs, {len(candidates_df)} candidates, {len(submissions_df)} submissions") except Exception as db_error: logger.error(f"Database error: {db_error}") return JSONResponse( status_code=500, content={"error": "Database connection error"} ) # Filter jobs by industry filtered_jobs = filter_jobs_by_industry(jobs_df, resume_info['industry']) if filtered_jobs.empty: logger.warning(f"No jobs found for industry: {resume_info['industry']}") return JSONResponse( status_code=404, content={"message": f"No jobs found for industry: {resume_info['industry']}"} ) # Filter jobs by location location_filtered_jobs = filter_jobs_by_location(filtered_jobs, resume_info['location']) # Filter jobs by experience level experience_filtered_jobs = filter_jobs_by_experience(location_filtered_jobs, resume_info['yoe']) # Filter jobs by priority priority_filtered_jobs = filter_jobs_by_priority(experience_filtered_jobs) # Use priority filtered jobs if available, otherwise fall back to experience filtered jobs, then location filtered jobs if not priority_filtered_jobs.empty: jobs_to_analyze = priority_filtered_jobs elif not experience_filtered_jobs.empty: jobs_to_analyze = experience_filtered_jobs else: jobs_to_analyze = location_filtered_jobs # Create filtered_submission_df with job_ids from jobs_to_analyze job_ids_to_analyze = jobs_to_analyze['id'].tolist() filtered_submission_df = submissions_df[submissions_df['jobId'].isin(job_ids_to_analyze)] # Check if candidate email exists in candidates_df candidate_id = None if resume_info.get('email'): candidate_match = candidates_df[candidates_df['email'] == resume_info['email']] if not candidate_match.empty: candidate_id = candidate_match.iloc[0]['id'] logger.info(f"Found existing candidate with ID: {candidate_id}") # Analyze job fit for each filtered job job_analyses = [] # Use configured number of jobs to analyze for _, job_row in jobs_to_analyze.head(MAX_JOBS_TO_ANALYZE).iterrows(): job_id = job_row.get('id') # Check if we have an existing submission for this candidate and job existing_submission = None if candidate_id and job_id: submission_match = filtered_submission_df[ (filtered_submission_df['candidate_id'] == candidate_id) & (filtered_submission_df['jobId'] == job_id) ] if not submission_match.empty: existing_submission = submission_match.iloc[0] logger.info(f"Found existing submission for job_id: {job_id}, candidate_id: {candidate_id}") if existing_submission is not None: # Use existing fit score from submission fit_score = existing_submission.get('fit_score', 0) existing_analysis = { 'final_response': { 'final_score': fit_score, 'summary': { 'strengths': [], 'weaknesses': [], 'opportunities': [], 'recommendations': [] } }, 'source': 'existing_submission' } analysis_result = existing_analysis else: # Call API for new analysis with retry logic job_description = create_job_description(job_row) analysis_result = await analyze_job_fit_with_retry(job_description, tmp_file_path, job_row) analysis_result['source'] = 'api_call' # Clean up the analysis result cleaned_analysis = clean_analysis_result(analysis_result) job_analysis = JobAnalysis( job_title=job_row.get('job_title', 'Unknown'), company_name=job_row.get('company_name', 'Unknown'), analysis=cleaned_analysis ) job_analyses.append(job_analysis.dict()) # Sort jobs by final_score in descending order (highest scores first) job_analyses = sort_jobs_by_score(job_analyses) # Count existing submissions vs API calls existing_submissions_count = sum(1 for analysis in job_analyses if analysis.get('analysis', {}).get('source') == 'existing_submission') api_calls_count = sum(1 for analysis in job_analyses if analysis.get('analysis', {}).get('source') == 'api_call') # Clean up temporary file os.unlink(tmp_file_path) # Calculate processing time processing_time = time.time() - request_start_time logger.info(f"Request completed in {processing_time:.2f} seconds") return { "resume_info": resume_info, "industry": resume_info['industry'], "location": resume_info['location'], "experience_years": resume_info['yoe'], "jobs_analyzed": len(job_analyses), "location_filtered": not location_filtered_jobs.empty, "experience_filtered": not experience_filtered_jobs.empty, "priority_filtered": not priority_filtered_jobs.empty, "existing_submissions_used": existing_submissions_count, "api_calls_made": api_calls_count, "candidate_found": candidate_id is not None, "processing_time_seconds": round(processing_time, 2), "job_analyses": job_analyses } except Exception as e: # Clean up temporary file in case of error if os.path.exists(tmp_file_path): os.unlink(tmp_file_path) raise e except Exception as e: logger.error(f"Processing failed: {str(e)}", exc_info=True) return JSONResponse( status_code=500, content={"error": f"Processing failed: {str(e)}"} ) @app.get("/health") async def health_check(api_key: str = Depends(verify_api_key)): """ Health check endpoint with database connectivity check """ health_status = { "status": "healthy", "message": "Job Recommendation API is running", "timestamp": time.time() } # Check database connectivity try: with engine.connect() as conn: result = conn.execute(text("SELECT 1")) health_status["database"] = "connected" except Exception as e: logger.error(f"Database health check failed: {e}") health_status["database"] = "disconnected" health_status["status"] = "degraded" return health_status @app.get("/") async def root(): """ Root endpoint """ return { "message": "Job Recommendation API", "version": "1.0.0", "docs": "/docs", "health": "/health" } if __name__ == "__main__": import uvicorn port = int(os.getenv("PORT", 8080)) logger.info(f"Starting server on port {port}") uvicorn.run(app, host="0.0.0.0", port=port)