import streamlit as st
import pandas as pd
import requests
import re
import tempfile
import shutil
import os
from difflib import SequenceMatcher
from urllib.parse import quote_plus
import base64
import zipfile
from datetime import datetime
import streamlit.components.v1 as components

# ----------- zip_and_auto_download helper function --------------
def zip_and_auto_download(file_bytes, inner_filename, zip_prefix="Download"):
    """
    Wraps the given bytes in a ZIP and provides:
    - a visible fallback download button
    - a best-effort auto-download triggered through components.html
    """
    zip_filename = f"{zip_prefix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
    zip_file_path = tempfile.NamedTemporaryFile(delete=False, suffix=".zip").name
    with zipfile.ZipFile(zip_file_path, 'w') as zipf:
        zipf.writestr(inner_filename, file_bytes)
    with open(zip_file_path, "rb") as f:
        zip_bytes = f.read()

    # Visible fallback button in case the scripted download is blocked
    st.download_button(
        label=f"📥 Download {zip_filename}",
        data=zip_bytes,
        file_name=zip_filename,
        mime="application/zip"
    )

    # Auto-download: inject a hidden data-URI link and click it via JS
    b64 = base64.b64encode(zip_bytes).decode()
    html = f"""
    <html>
    <body>
    <a id="auto-download-link" href="data:application/zip;base64,{b64}" download="{zip_filename}"></a>
    <script>
    document.getElementById('auto-download-link').click();
    </script>
    </body>
    </html>
    """
    components.html(html, height=0, width=0)
    os.remove(zip_file_path)
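
# Design note: the auto-download embeds the whole ZIP as a base64 data: URI
# inside the injected page, so for multi-megabyte results it can be slow and
# some browsers may cap data: URI length or ignore the scripted click. The
# visible st.download_button above is the reliable fallback path.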

# -----------------------------------------------
# 🔧 UTILITY FUNCTIONS
# -----------------------------------------------
def construct_query(row):
    """Constructs the Google search query from the applicant's data."""
    query = str(row['Applicant Name'])
    optional_fields = ['Job Title', 'State', 'City', 'Skills']
    for field in optional_fields:
        if field in row and pd.notna(row[field]):
            value = str(row[field]).strip()
            if value:
                query += f" {value}"
    query += " linkedin"
    print(f"[DEBUG] Search Query: {query}")
    return query
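
# Illustrative example (hypothetical data): a row such as
#   {"Applicant Name": "Jane Doe", "Job Title": "Data Analyst",
#    "State": "TX", "City": "Austin"}
# yields the query "Jane Doe Data Analyst TX Austin linkedin".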

def get_name_from_url(link):
    """Extracts the name part from a LinkedIn profile URL."""
    match = re.search(r'linkedin\.com/in/([a-zA-Z0-9-]+)', link)
    if match:
        profile_name = match.group(1).replace('-', ' ')
        print(f"[DEBUG] Extracted profile name from URL: {profile_name}")
        return profile_name
    return None
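
# e.g. "linkedin.com/in/jane-doe-1a2b3c" -> "jane doe 1a2b3c". Random
# vanity-URL suffixes stay in the extracted name and drag the similarity
# score down, which is one reason the matching threshold below is a
# permissive 0.5.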

def calculate_similarity(name1, name2):
    """Calculates the similarity between two names as a ratio in [0.0, 1.0]."""
    similarity = SequenceMatcher(None, name1.lower().strip(), name2.lower().strip()).ratio()
    print(f"[DEBUG] Similarity between '{name1}' and '{name2}' = {similarity}")
    return similarity
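
# Illustrative: SequenceMatcher(None, "jane doe", "jane doe 1a2b3c").ratio()
# is 2*8/(8+15) ≈ 0.70, so a profile slug with a random suffix still clears
# the 0.5 cutoff used in fetch_linkedin_links below.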

# -----------------------------------------------
# 🔍 LINKEDIN SCRAPER FUNCTION
# -----------------------------------------------
def fetch_linkedin_links(query, api_key, applicant_name):
    """Fetches LinkedIn profile links using the BrightData SERP scraping API."""
    try:
        print(f"[DEBUG] Sending request to BrightData for query: {query}")
        url = "https://api.brightdata.com/request"
        google_url = f"https://www.google.com/search?q={quote_plus(query)}"
        payload = {
            "zone": "serp_api2",
            "url": google_url,
            "method": "GET",
            "country": "us",
            "format": "raw",
            "data_format": "html"
        }
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        response = requests.post(url, headers=headers, json=payload, timeout=60)
        response.raise_for_status()
        html = response.text

        linkedin_regex = r'https://(?:[a-z]{2,3}\.)?linkedin\.com/in/[a-zA-Z0-9\-_/]+'
        matches = re.findall(linkedin_regex, html)
        print(f"[DEBUG] Found {len(matches)} LinkedIn link(s) in search result")

        # Return the first link whose profile slug is similar enough to the
        # applicant's name.
        for link in matches:
            profile_name = get_name_from_url(link)
            if profile_name:
                similarity = calculate_similarity(applicant_name, profile_name)
                if similarity >= 0.5:
                    print(f"[DEBUG] Match found: {link}")
                    return link

        print(f"[DEBUG] No matching LinkedIn profile found for: {applicant_name}")
        return None
    except Exception as e:
        print(f"[ERROR] Error fetching LinkedIn link for query '{query}': {e}")
        return None
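
# Note: re.findall can return the same profile URL several times (Google
# repeats links in the result markup). A cheap, order-preserving dedupe
# before the loop would avoid redundant similarity checks, e.g.:
#   matches = list(dict.fromkeys(matches))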

# -----------------------------------------------
# 📄 PROCESS FILE FUNCTION
# -----------------------------------------------
def process_file(file, api_key):
    """Processes the uploaded Excel file to fetch LinkedIn profile links."""
    try:
        df = pd.read_excel(file)
        print(f"[DEBUG] Input file read successfully. Rows: {len(df)}")

        if 'Applicant Name' not in df.columns:
            raise ValueError("Missing required column: 'Applicant Name'")

        # Drop rows with a missing or blank applicant name (cast to str first
        # so numeric cells don't break .str.strip()).
        df = df[df['Applicant Name'].notna()]
        df = df[df['Applicant Name'].astype(str).str.strip() != '']
        print(f"[DEBUG] Valid applicant rows after filtering: {len(df)}")

        df['Search Query'] = df.apply(construct_query, axis=1)
        df['LinkedIn Link'] = df.apply(
            lambda row: fetch_linkedin_links(row['Search Query'], api_key, row['Applicant Name']),
            axis=1
        )

        temp_dir = tempfile.mkdtemp()
        output_file = os.path.join(temp_dir, "updated_with_linkedin_links.csv")
        df.to_csv(output_file, index=False)
        print(f"[DEBUG] Output written to: {output_file}")
        return output_file
    except Exception as e:
        print(f"[ERROR] Error processing file: {e}")
        st.error(f"Error processing file: {e}")
        return None
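
# Note: df.apply issues one BrightData request per row, sequentially, so a
# large sheet can take a while and may hit provider rate limits; inserting a
# short time.sleep between rows (or batching requests) is a reasonable
# extension if that becomes a problem.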

# -----------------------------------------------
# 🚀 STREAMLIT INTERFACE
# -----------------------------------------------
st.set_page_config(page_title="LinkedIn Profile Scraper", layout="centered")
st.title("🔍 LinkedIn Profile Link Scraper")
st.markdown("Upload an Excel file with applicant details to fetch the best-matching LinkedIn profile links.")

api_key = st.text_input("Enter your BrightData SERP API Key", type="password")
uploaded_file = st.file_uploader("Upload Excel File (.xlsx)", type=["xlsx"])

if uploaded_file and api_key:
    st.info("⏳ Processing file... This may take a moment.")
    output_file = process_file(uploaded_file, api_key)
    if output_file:
        with open(output_file, "rb") as f:
            csv_bytes = f.read()
        zip_and_auto_download(
            file_bytes=csv_bytes,
            inner_filename="updated_with_linkedin_links.csv",
            zip_prefix="LinkedIn_Links"
        )
        shutil.rmtree(os.path.dirname(output_file))
elif uploaded_file and not api_key:
    st.warning("⚠️ Please enter your BrightData SERP API key to proceed.")