"""Streamlit app that looks up LinkedIn profile links for applicants.

The user uploads an Excel sheet containing an 'Applicant Name' column (plus
optional 'Job Title', 'State', 'City' and 'Skills' columns). For every row a
Google query is sent through the BrightData SERP API, the first LinkedIn
profile URL whose slug resembles the applicant name is kept, and the enriched
table is offered as a zipped CSV download.
"""

import base64
import io
import json
import os
import re
import shutil
import tempfile
import zipfile
from datetime import datetime
from difflib import SequenceMatcher
from html import escape
from urllib.parse import quote_plus

import pandas as pd
import requests
import streamlit as st
import streamlit.components.v1 as components

# Endpoint of the BrightData request API used for SERP scraping.
BRIGHTDATA_URL = "https://api.brightdata.com/request"
# Seconds to wait for BrightData before giving up on a single query, so one
# stalled request cannot hang the whole batch.
REQUEST_TIMEOUT_SECONDS = 60
# Minimum SequenceMatcher ratio for a profile slug to count as a match.
SIMILARITY_THRESHOLD = 0.5

# Compiled once: both patterns are applied for every applicant row.
_LINKEDIN_URL_RE = re.compile(
    r'https://(?:[a-z]{2,3}\.)?linkedin\.com/in/[a-zA-Z0-9\-_/]+'
)
_PROFILE_SLUG_RE = re.compile(r'linkedin\.com/in/([a-zA-Z0-9-]+)')

# st.session_state slot holding the most recent successful result.
_RESULT_STATE_KEY = "linkedin_scraper_result"


# -----------zip_and_auto_download helper function--------------
def zip_and_auto_download(file_bytes, inner_filename, zip_prefix="Download",
                          auto_download=True):
    """Zip the given bytes and offer the archive for download.

    Renders a visible ``st.download_button`` as a fallback and, when
    ``auto_download`` is true, an invisible HTML component that starts the
    download immediately.

    Args:
        file_bytes: Content to store inside the archive.
        inner_filename: Name of the single member written to the archive.
        zip_prefix: Prefix of the timestamped archive file name.
        auto_download: When False only the fallback button is rendered.
    """
    zip_filename = f"{zip_prefix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"

    # Build the archive in memory: no temporary file to leak if a later
    # Streamlit call raises.
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w') as zipf:
        zipf.writestr(inner_filename, file_bytes)
    zip_bytes = buffer.getvalue()

    # Visible fallback
    st.download_button(
        label=f"📥 Download {zip_filename}",
        data=zip_bytes,
        file_name=zip_filename,
        mime="application/zip"
    )

    if not auto_download:
        return

    # Auto download using components.html: a hidden data-URI anchor that is
    # clicked as soon as the component loads.
    b64 = base64.b64encode(zip_bytes).decode()
    safe_name = escape(zip_filename, quote=True)
    auto_download_html = f"""
<html>
<body>
<a id="auto-download" href="data:application/zip;base64,{b64}" download="{safe_name}"></a>
<script>
document.getElementById("auto-download").click();
</script>
</body>
</html>
"""
    components.html(auto_download_html, height=0, width=0)


# -----------------------------------------------
# 🔧 UTILITY FUNCTIONS
# -----------------------------------------------
def construct_query(row):
    """Build the Google search query for one applicant row.

    The query is the applicant name, followed by every non-empty optional
    field present in the row, followed by the word "linkedin".

    Args:
        row: Mapping-like row (e.g. a pandas Series) with an
            'Applicant Name' entry.

    Returns:
        The query string.
    """
    parts = [str(row['Applicant Name']).strip()]
    optional_fields = ['Job Title', 'State', 'City', 'Skills']
    for field in optional_fields:
        if field in row and pd.notna(row[field]):
            text = str(row[field]).strip()
            if text:
                parts.append(text)
    parts.append("linkedin")
    query = " ".join(parts)
    print(f"[DEBUG] Search Query: {query}")
    return query


def get_name_from_url(link):
    """Extract the profile slug from a LinkedIn URL, hyphens turned to spaces.

    Returns:
        The slug text, or None when the link has no ``linkedin.com/in/`` slug.
    """
    match = _PROFILE_SLUG_RE.search(link)
    if match is None:
        return None
    profile_name = match.group(1).replace('-', ' ')
    print(f"[DEBUG] Extracted profile name from URL: {profile_name}")
    return profile_name


def calculate_similarity(name1, name2):
    """Return the case-insensitive SequenceMatcher ratio of two names.

    Both arguments are coerced with ``str()`` so a non-string cell value
    (e.g. a number read from Excel) does not raise.
    """
    left = str(name1).lower().strip()
    right = str(name2).lower().strip()
    similarity = SequenceMatcher(None, left, right).ratio()
    print(f"[DEBUG] Similarity between '{name1}' and '{name2}' = {similarity}")
    return similarity


# -----------------------------------------------
# 🔍 LINKEDIN SCRAPER FUNCTION
# -----------------------------------------------
def fetch_linkedin_links(query, api_key, applicant_name):
    """Fetch a LinkedIn profile link via the BrightData SERP scraping API.

    Sends the query to Google through BrightData and scans the returned HTML
    for LinkedIn profile URLs. The first URL (in page order) whose slug is at
    least ``SIMILARITY_THRESHOLD`` similar to the applicant name is returned.

    Args:
        query: Search query text.
        api_key: BrightData bearer token.
        applicant_name: Name the profile slug is compared against.

    Returns:
        The matching profile URL, or None when nothing matches or the request
        fails. Request failures are logged rather than raised so that one bad
        row does not abort the batch.
    """
    print(f"[DEBUG] Sending request to BrightData for query: {query}")
    google_url = f"https://www.google.com/search?q={quote_plus(query)}"
    payload = {
        "zone": "serp_api2",
        "url": google_url,
        "method": "GET",
        "country": "us",
        "format": "raw",
        "data_format": "html"
    }
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    try:
        response = requests.post(
            BRIGHTDATA_URL,
            headers=headers,
            json=payload,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"[ERROR] Error fetching LinkedIn link for query '{query}': {e}")
        return None

    matches = _LINKEDIN_URL_RE.findall(response.text)
    print(f"[DEBUG] Found {len(matches)} LinkedIn link(s) in search result")

    for link in matches:
        profile_name = get_name_from_url(link)
        if not profile_name:
            continue
        if calculate_similarity(applicant_name, profile_name) >= SIMILARITY_THRESHOLD:
            print(f"[DEBUG] Match found: {link}")
            return link

    print(f"[DEBUG] No matching LinkedIn profile found for: {applicant_name}")
    return None


# -----------------------------------------------
# 📂 PROCESS FILE FUNCTION
# -----------------------------------------------
def process_file(file, api_key):
    """Process the uploaded Excel file and write the enriched CSV.

    Rows with a missing or blank 'Applicant Name' are dropped. The remaining
    rows gain a 'Search Query' and a 'LinkedIn Link' column.

    Args:
        file: Path or file-like object accepted by ``pandas.read_excel``.
        api_key: BrightData bearer token.

    Returns:
        Path of the CSV written inside a fresh temporary directory (the
        caller is responsible for deleting that directory), or None on
        failure. Failures are reported through ``st.error``.
    """
    temp_dir = None
    try:
        df = pd.read_excel(file)
        print(f"[DEBUG] Input file read successfully. Rows: {len(df)}")

        if 'Applicant Name' not in df.columns:
            raise ValueError("Missing required column: 'Applicant Name'")

        names = df['Applicant Name']
        # astype(str) keeps the blank check working when the column holds
        # non-string values; notna() is evaluated on the raw column so NaN
        # is still excluded.
        has_name = names.notna() & (names.astype(str).str.strip() != '')
        # Copy so the column assignments below act on an independent frame.
        df = df[has_name].copy()
        print(f"[DEBUG] Valid applicant rows after filtering: {len(df)}")

        # Lists are used instead of DataFrame.apply(axis=1) so that an empty
        # frame simply yields empty columns.
        df['Search Query'] = [construct_query(row) for _, row in df.iterrows()]
        df['LinkedIn Link'] = [
            fetch_linkedin_links(search_query, api_key, applicant)
            for search_query, applicant in zip(df['Search Query'], df['Applicant Name'])
        ]

        temp_dir = tempfile.mkdtemp()
        output_file = os.path.join(temp_dir, "updated_with_linkedin_links.csv")
        df.to_csv(output_file, index=False)
        print(f"[DEBUG] Output written to: {output_file}")
        return output_file

    except Exception as e:
        # Top-level boundary for the UI: clean up, report, and signal failure.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        print(f"[ERROR] Error processing file: {e}")
        st.error(f"Error processing file: {e}")
        return None


# -----------------------------------------------
# 🌐 STREAMLIT INTERFACE
# -----------------------------------------------
st.set_page_config(page_title="LinkedIn Profile Scraper", layout="centered")
st.title("🔗 LinkedIn Profile Link Scraper")
st.markdown("Upload an Excel file with applicant details to fetch best-matching LinkedIn profile links.")

api_key = st.text_input("Enter your BrightData SERP API Key", type="password")
uploaded_file = st.file_uploader("Upload Excel File (.xlsx)", type=["xlsx"])

if uploaded_file and api_key:
    # Streamlit re-executes this script on every interaction, including a
    # click on the download button. The finished CSV is cached per
    # (file, key) so a rerun does not repeat every API call.
    run_key = (uploaded_file.name, uploaded_file.size, api_key)
    cached = st.session_state.get(_RESULT_STATE_KEY)
    is_fresh_run = cached is None or cached["run_key"] != run_key

    if is_fresh_run:
        cached = None
        st.info("⏳ Processing file... This may take a moment.")
        output_file = process_file(uploaded_file, api_key)
        if output_file:
            try:
                with open(output_file, "rb") as f:
                    csv_bytes = f.read()
            finally:
                # Remove the temporary directory even if reading fails.
                shutil.rmtree(os.path.dirname(output_file), ignore_errors=True)
            cached = {"run_key": run_key, "csv_bytes": csv_bytes}
            st.session_state[_RESULT_STATE_KEY] = cached

    if cached is not None:
        zip_and_auto_download(
            file_bytes=cached["csv_bytes"],
            inner_filename="updated_with_linkedin_links.csv",
            zip_prefix="LinkedIn_Links",
            # Only push the file automatically right after processing, not
            # on every subsequent rerun.
            auto_download=is_fresh_run,
        )
elif not api_key:
    st.warning("⚠️ Please enter your BrightData SERP API key to proceed.")