ElegantSolutions's picture
Update app.py
24cf5cc verified
import streamlit as st
import pandas as pd
import requests
import re
import tempfile
import shutil
import os
from difflib import SequenceMatcher
import json
from urllib.parse import quote_plus
import base64
import zipfile
from datetime import datetime
import streamlit.components.v1 as components
# -----------zip_and_auto_download helper function--------------
def zip_and_auto_download(file_bytes, inner_filename, zip_prefix="Download"):
"""
Wraps given bytes in a ZIP and provides:
- Fallback button
- Guaranteed auto-download using components.html
"""
zip_filename = f"{zip_prefix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
zip_file_path = tempfile.NamedTemporaryFile(delete=False, suffix=".zip").name
with zipfile.ZipFile(zip_file_path, 'w') as zipf:
zipf.writestr(inner_filename, file_bytes)
with open(zip_file_path, "rb") as f:
zip_bytes = f.read()
# Visible fallback
st.download_button(
label=f"πŸ“₯ Download {zip_filename}",
data=zip_bytes,
file_name=zip_filename,
mime="application/zip"
)
# Auto download using components.html
b64 = base64.b64encode(zip_bytes).decode()
html = f"""
<html>
<body>
<a id="auto-download-link" href="data:application/zip;base64,{b64}" download="{zip_filename}"></a>
<script>
document.getElementById('auto-download-link').click();
</script>
</body>
</html>
"""
components.html(html, height=0, width=0)
os.remove(zip_file_path)
# -----------------------------------------------
# πŸ”§ UTILITY FUNCTIONS
# -----------------------------------------------
def construct_query(row):
"""Constructs the Google search query using applicant data."""
query = str(row['Applicant Name'])
optional_fields = ['Job Title', 'State', 'City', 'Skills']
for field in optional_fields:
if field in row and pd.notna(row[field]):
value = row[field]
query += f" {str(value).strip()}" if str(value).strip() else ""
query += " linkedin"
print(f"[DEBUG] Search Query: {query}")
return query
def get_name_from_url(link):
"""Extracts the name part from a LinkedIn profile URL."""
match = re.search(r'linkedin\.com/in/([a-zA-Z0-9-]+)', link)
if match:
profile_name = match.group(1).replace('-', ' ')
print(f"[DEBUG] Extracted profile name from URL: {profile_name}")
return profile_name
return None
def calculate_similarity(name1, name2):
"""Calculates similarity between two names."""
similarity = SequenceMatcher(None, name1.lower().strip(), name2.lower().strip()).ratio()
print(f"[DEBUG] Similarity between '{name1}' and '{name2}' = {similarity}")
return similarity
# -----------------------------------------------
# πŸ” LINKEDIN SCRAPER FUNCTION
# -----------------------------------------------
def fetch_linkedin_links(query, api_key, applicant_name):
"""Fetches LinkedIn profile links using BrightData SERP scraping API."""
try:
print(f"[DEBUG] Sending request to BrightData for query: {query}")
url = "https://api.brightdata.com/request"
google_url = f"https://www.google.com/search?q={quote_plus(query)}"
payload = {
"zone": "serp_api2",
"url": google_url,
"method": "GET",
"country": "us",
"format": "raw",
"data_format": "html"
}
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
html = response.text
linkedin_regex = r'https://(?:[a-z]{2,3}\.)?linkedin\.com/in/[a-zA-Z0-9\-_/]+'
matches = re.findall(linkedin_regex, html)
print(f"[DEBUG] Found {len(matches)} LinkedIn link(s) in search result")
for link in matches:
profile_name = get_name_from_url(link)
if profile_name:
similarity = calculate_similarity(applicant_name, profile_name)
if similarity >= 0.5:
print(f"[DEBUG] Match found: {link}")
return link
print(f"[DEBUG] No matching LinkedIn profile found for: {applicant_name}")
return None
except Exception as e:
print(f"[ERROR] Error fetching LinkedIn link for query '{query}': {e}")
return None
# -----------------------------------------------
# πŸ“‚ PROCESS FILE FUNCTION
# -----------------------------------------------
def process_file(file, api_key):
"""Processes the uploaded Excel file to fetch LinkedIn profile links."""
try:
df = pd.read_excel(file)
print(f"[DEBUG] Input file read successfully. Rows: {len(df)}")
if 'Applicant Name' not in df.columns:
raise ValueError("Missing required column: 'Applicant Name'")
df = df[df['Applicant Name'].notna()]
df = df[df['Applicant Name'].str.strip() != '']
print(f"[DEBUG] Valid applicant rows after filtering: {len(df)}")
df['Search Query'] = df.apply(construct_query, axis=1)
df['LinkedIn Link'] = df.apply(
lambda row: fetch_linkedin_links(row['Search Query'], api_key, row['Applicant Name']),
axis=1
)
temp_dir = tempfile.mkdtemp()
output_file = os.path.join(temp_dir, "updated_with_linkedin_links.csv")
df.to_csv(output_file, index=False)
print(f"[DEBUG] Output written to: {output_file}")
return output_file
except Exception as e:
print(f"[ERROR] Error processing file: {e}")
st.error(f"Error processing file: {e}")
return None
# -----------------------------------------------
# 🌐 STREAMLIT INTERFACE
# -----------------------------------------------
st.set_page_config(page_title="LinkedIn Profile Scraper", layout="centered")
st.title("πŸ”— LinkedIn Profile Link Scraper")
st.markdown("Upload an Excel file with applicant details to fetch best-matching LinkedIn profile links.")
api_key = st.text_input("Enter your BrightData SERP API Key", type="password")
uploaded_file = st.file_uploader("Upload Excel File (.xlsx)", type=["xlsx"])
if uploaded_file and api_key:
st.info("⏳ Processing file... This may take a moment.")
output_file = process_file(uploaded_file, api_key)
if output_file:
with open(output_file, "rb") as f:
csv_bytes = f.read()
zip_and_auto_download(
file_bytes=csv_bytes,
inner_filename="updated_with_linkedin_links.csv",
zip_prefix="LinkedIn_Links"
)
shutil.rmtree(os.path.dirname(output_file))
elif not api_key:
st.warning("⚠️ Please enter your BrightData SERP API key to proceed.")