import streamlit as st
import pandas as pd
import requests
import re
import tempfile
import shutil
import os
from difflib import SequenceMatcher
from urllib.parse import quote_plus
import base64
import zipfile
from datetime import datetime
import streamlit.components.v1 as components

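# Overall flow: upload .xlsx -> build a Google query per applicant ->
# BrightData SERP fetch -> name-similarity match -> deliver results as a ZIP.

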
def zip_and_auto_download(file_bytes, inner_filename, zip_prefix="Download"):
    """
    Wraps the given bytes in a ZIP archive and provides:
    - A fallback download button
    - A best-effort auto-download via components.html (some browsers block
      programmatic downloads, hence the fallback button)
    """
    zip_filename = f"{zip_prefix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
    # Create a named temporary file; delete=False keeps the path valid after
    # the handle is closed.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp:
        zip_file_path = tmp.name

    with zipfile.ZipFile(zip_file_path, 'w') as zipf:
        zipf.writestr(inner_filename, file_bytes)

    with open(zip_file_path, "rb") as f:
        zip_bytes = f.read()

    # Visible fallback in case the auto-download is blocked.
    st.download_button(
        label=f"📥 Download {zip_filename}",
        data=zip_bytes,
        file_name=zip_filename,
        mime="application/zip"
    )

    # Auto-download: embed the ZIP as a base64 data URI in a hidden anchor
    # and click it via JavaScript.
    b64 = base64.b64encode(zip_bytes).decode()
    html = f"""
    <html>
        <body>
            <a id="auto-download-link" href="data:application/zip;base64,{b64}" download="{zip_filename}"></a>
            <script>
                document.getElementById('auto-download-link').click();
            </script>
        </body>
    </html>
    """
    components.html(html, height=0, width=0)

    os.remove(zip_file_path)
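
# Illustrative usage (the payload and names below are made up):
#   zip_and_auto_download(b"col1,col2\n1,2\n", "sample.csv", zip_prefix="Demo")
# offers Demo_<timestamp>.zip containing sample.csv.

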
def construct_query(row):
    """Constructs the Google search query from the applicant's data."""
    query = str(row['Applicant Name'])
    optional_fields = ['Job Title', 'State', 'City', 'Skills']

    for field in optional_fields:
        if field in row and pd.notna(row[field]):
            value = str(row[field]).strip()
            if value:
                query += f" {value}"

    query += " linkedin"
    print(f"[DEBUG] Search Query: {query}")
    return query
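
# Example: a row like {'Applicant Name': 'Jane Doe', 'Job Title': 'Data Analyst',
# 'City': 'Austin'} (hypothetical values) yields the query
# "Jane Doe Data Analyst Austin linkedin".

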
def get_name_from_url(link):
    """Extracts the name part from a LinkedIn profile URL."""
    match = re.search(r'linkedin\.com/in/([a-zA-Z0-9-]+)', link)
    if match:
        # Profile slugs separate name parts with hyphens.
        profile_name = match.group(1).replace('-', ' ')
        print(f"[DEBUG] Extracted profile name from URL: {profile_name}")
        return profile_name
    return None
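
# Example: "https://www.linkedin.com/in/jane-doe-123" -> "jane doe 123".

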
def calculate_similarity(name1, name2):
    """Calculates the similarity ratio between two names (0.0 to 1.0)."""
    similarity = SequenceMatcher(None, name1.lower().strip(), name2.lower().strip()).ratio()
    print(f"[DEBUG] Similarity between '{name1}' and '{name2}' = {similarity}")
    return similarity
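
# SequenceMatcher.ratio() is 2*M/T, where M is the number of matching
# characters and T the combined length. For "jane doe" vs "jane doe 123":
# 2*8 / (8+12) = 0.8, which clears the 0.5 threshold used below.

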
def fetch_linkedin_links(query, api_key, applicant_name):
    """Fetches the best-matching LinkedIn profile link via the BrightData SERP API."""
    try:
        print(f"[DEBUG] Sending request to BrightData for query: {query}")
        url = "https://api.brightdata.com/request"
        google_url = f"https://www.google.com/search?q={quote_plus(query)}"

        payload = {
            "zone": "serp_api2",
            "url": google_url,
            "method": "GET",
            "country": "us",
            "format": "raw",
            "data_format": "html"
        }

        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

        response = requests.post(url, headers=headers, json=payload, timeout=60)
        response.raise_for_status()
        html = response.text

        # Match profile URLs, including regional subdomains such as
        # "uk.linkedin.com" (the 2-3 letter group also covers "www.").
        linkedin_regex = r'https://(?:[a-z]{2,3}\.)?linkedin\.com/in/[a-zA-Z0-9\-_/]+'
        matches = re.findall(linkedin_regex, html)
        print(f"[DEBUG] Found {len(matches)} LinkedIn link(s) in search result")

        # Return the first link whose URL slug is sufficiently similar to the
        # applicant's name.
        for link in matches:
            profile_name = get_name_from_url(link)
            if profile_name:
                similarity = calculate_similarity(applicant_name, profile_name)
                if similarity >= 0.5:
                    print(f"[DEBUG] Match found: {link}")
                    return link
        print(f"[DEBUG] No matching LinkedIn profile found for: {applicant_name}")
        return None

    except Exception as e:
        print(f"[ERROR] Error fetching LinkedIn link for query '{query}': {e}")
        return None
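
# Optional sketch (not wired into the app): retry transient SERP failures.
# Note that fetch_linkedin_links returns None both on error and on "no match",
# so the low attempt count here is a deliberate, illustrative trade-off.
def fetch_linkedin_links_with_retries(query, api_key, applicant_name, attempts=2):
    import time
    for attempt in range(attempts):
        link = fetch_linkedin_links(query, api_key, applicant_name)
        if link:
            return link
        if attempt < attempts - 1:
            time.sleep(2 ** attempt)  # exponential backoff between attempts
    return None

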
def process_file(file, api_key):
    """Processes the uploaded Excel file and writes a CSV with LinkedIn profile links."""
    try:
        df = pd.read_excel(file)
        print(f"[DEBUG] Input file read successfully. Rows: {len(df)}")

        if 'Applicant Name' not in df.columns:
            raise ValueError("Missing required column: 'Applicant Name'")

        # Drop rows with a missing or blank applicant name.
        df = df[df['Applicant Name'].notna()]
        df = df[df['Applicant Name'].astype(str).str.strip() != '']
        print(f"[DEBUG] Valid applicant rows after filtering: {len(df)}")

        df['Search Query'] = df.apply(construct_query, axis=1)
        df['LinkedIn Link'] = df.apply(
            lambda row: fetch_linkedin_links(row['Search Query'], api_key, row['Applicant Name']),
            axis=1
        )

        temp_dir = tempfile.mkdtemp()
        output_file = os.path.join(temp_dir, "updated_with_linkedin_links.csv")
        df.to_csv(output_file, index=False)
        print(f"[DEBUG] Output written to: {output_file}")
        return output_file

    except Exception as e:
        print(f"[ERROR] Error processing file: {e}")
        st.error(f"Error processing file: {e}")
        return None
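
# Expected input columns: 'Applicant Name' is required; 'Job Title', 'State',
# 'City', and 'Skills' are optional and, when present, narrow the search.

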
st.set_page_config(page_title="LinkedIn Profile Scraper", layout="centered")
st.title("🔍 LinkedIn Profile Link Scraper")
st.markdown("Upload an Excel file with applicant details to fetch best-matching LinkedIn profile links.")

api_key = st.text_input("Enter your BrightData SERP API Key", type="password")
uploaded_file = st.file_uploader("Upload Excel File (.xlsx)", type=["xlsx"])

if uploaded_file and api_key:
    st.info("⏳ Processing file... This may take a moment.")
    output_file = process_file(uploaded_file, api_key)
    if output_file:
        with open(output_file, "rb") as f:
            csv_bytes = f.read()

        zip_and_auto_download(
            file_bytes=csv_bytes,
            inner_filename="updated_with_linkedin_links.csv",
            zip_prefix="LinkedIn_Links"
        )

        # Clean up the temporary directory created by process_file.
        shutil.rmtree(os.path.dirname(output_file))
elif uploaded_file and not api_key:
    st.warning("⚠️ Please enter your BrightData SERP API key to proceed.")
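
# To run locally (the filename is an assumption):
#   streamlit run linkedin_scraper.py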