# Spaces: Running  (page-header residue from the hosting site; not part of the program)
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlparse

import requests
import streamlit as st
from bs4 import BeautifulSoup
# --- Streamlit UI ------------------------------------------------------------
st.title("Web Crawler - Link Checker")
# Crawl root; only links that start with this string are queued for crawling.
BASE_URL = st.text_input("Enter the Base URL:", "https://www.example.com/")
# Pool size used for both the crawl workers and the per-page link checks
# (slider range 1-20, initial value 10).
MAX_CONCURRENT_THREAD = st.slider("Select Max Concurrent Threads", 1, 20, 10)
LOGGING = st.checkbox("Enable Logging", True)
start_crawling = st.button("Start Crawling")

# --- Shared crawl state ------------------------------------------------------
# NOTE: two similarly named containers exist on purpose:
#   visitedLinks  (set)  - URLs that have already been queued; used for de-duplication.
#   visited_links (list) - addresses actually handed to link_extractor; shown in the report.
relativeLinks = queue.Queue()  # URLs waiting to be crawled
visitedLinks = set()

# --- Report data (one formatted string per checked link) ---------------------
visited_links = []
ok_links = []
error_links = []
exception_links = []

# One shared session so connections are pooled across requests.
session = requests.Session()
session.headers.update({"User-Agent": "Mozilla/5.0"})  # Set a common User-Agent
def link_checker(source, address):
    """Request ``address`` and record the outcome in the report lists.

    Args:
        source: URL of the page on which ``address`` was found; only used as
            the prefix of the recorded message.
        address: Absolute URL to request.

    Returns:
        None. Exactly one formatted line is appended to ``ok_links``,
        ``error_links`` or ``exception_links``.
    """
    try:
        resp = session.get(address, timeout=5)
    except requests.RequestException as e:
        exception_links.append(f"[{source}] Exception --> {e} {address}")
        return

    # Any 4xx/5xx status is a failed link. A hand-picked list of codes would
    # let responses such as 401, 410, 429, 500 or 504 be reported as "OK".
    if resp.status_code >= 400:
        error_links.append(f"[{source}] {resp.status_code} {resp.reason} --> {address}")
    else:
        ok_links.append(f"[{source}] OK --> {address}")
def normalize_url(a, base=None):
    """Turn a raw ``href``/``src`` value into an absolute, fragment-free URL.

    Args:
        a: Raw attribute value taken from the HTML.
        base: URL to resolve relative values against. Defaults to the
            module-level ``BASE_URL`` when omitted.

    Returns:
        The absolute URL without its ``#fragment`` and without trailing
        slashes, or ``None`` when the value is empty or is not a fetchable
        link (pure fragment, ``javascript:``, ``mailto:``, ``tel:``, ``data:``).
    """
    href = a.strip()
    # Scheme names are case-insensitive, so compare on a lowered copy.
    if not href or href.lower().startswith(("#", "javascript:", "mailto:", "tel:", "data:")):
        return None
    if base is None:
        base = BASE_URL
    # Drop the fragment: "page#a" and "page#b" are the same resource and must
    # not be checked or crawled twice.
    return urljoin(base, href).partition("#")[0].rstrip("/")
# Guards the check-then-add on ``visitedLinks``: link_extractor runs on
# several worker threads at once, and without the lock two threads can both
# see a URL as unseen and queue it twice.
_visited_lock = threading.Lock()


def link_extractor(address):
    """Fetch a page, check every link on it, and queue in-site links.

    Links are collected from ``<img src>``, ``<link href>``, ``<script src>``
    and ``<a href>``. Each distinct link is passed to ``link_checker``; links
    that start with ``BASE_URL`` and have not been queued before are put on
    ``relativeLinks`` for crawling.

    Args:
        address: URL of the page to fetch and scan.

    Returns:
        None. Results are recorded in the module-level report lists; a
        ``requests.RequestException`` is logged to ``exception_links``.
    """
    visited_links.append(address)
    try:
        res = session.get(address, timeout=5)
        soup = BeautifulSoup(res.text, 'html.parser')

        extracted_links = set()
        for tag, attr in [('img', 'src'), ('link', 'href'), ('script', 'src'), ('a', 'href')]:
            for element in soup.find_all(tag):
                if not element.has_attr(attr):
                    continue
                raw = element[attr].strip()
                # First call is only a filter (fragments, javascript:, mailto:, ...).
                if not raw or normalize_url(raw) is None:
                    continue
                # Resolve against the page the link was found on (its final
                # URL after redirects), not the crawl root; otherwise
                # relative links on sub-pages point to the wrong place.
                resolved = normalize_url(urljoin(res.url, raw))
                if resolved:
                    extracted_links.add(resolved)

        # Leaving the ``with`` block waits for all checks to finish.
        with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_THREAD) as executor:
            executor.map(lambda p: link_checker(source=address, address=p), extracted_links)

        for p in extracted_links:
            if not p.startswith(BASE_URL):
                continue
            with _visited_lock:
                if p in visitedLinks:
                    continue
                visitedLinks.add(p)
            relativeLinks.put(p)
    except requests.RequestException as e:
        exception_links.append(f"[RELATIVE LINK] Exception --> {e} {address}")
def threader():
    """Crawl worker: take URLs from ``relativeLinks`` until the crawl is done.

    The worker stops only when the queue is empty AND no other worker still
    holds an unfinished item, because a page that is still being processed
    may add more URLs to the queue.

    Returns:
        None.
    """
    while True:
        try:
            # A timed get avoids the empty()/get() race, where another worker
            # takes the last item in between and get() then blocks forever.
            value = relativeLinks.get(timeout=1)
        except queue.Empty:
            # ``unfinished_tasks`` is Queue's count of put() calls not yet
            # matched by task_done(); zero means no page is in flight, so no
            # new URLs can appear.
            if relativeLinks.unfinished_tasks == 0:
                break
            continue
        try:
            if value:
                if LOGGING:
                    print(f"Checking: {value} | Remaining: {relativeLinks.qsize()}")
                link_extractor(value)
        finally:
            # Always balance the get(), even if extraction raised, so the
            # unfinished-task count used for shutdown stays correct.
            relativeLinks.task_done()
# Runs one crawl when the "Start Crawling" button was pressed, then renders
# the summary counters and the per-category link lists.
if start_crawling:
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments during the crawl.
    start_time = time.perf_counter()

    seed_url = BASE_URL.strip()
    visitedLinks.add(seed_url)
    relativeLinks.put(seed_url)

    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_THREAD) as executor:
        workers = [executor.submit(threader) for _ in range(MAX_CONCURRENT_THREAD)]
    # Leaving the ``with`` block waits for every worker to finish.
    elapsed_time = time.perf_counter() - start_time

    # Report worker crashes instead of dropping them: an exception raised
    # inside a pool task is otherwise only stored on its future.
    for worker in workers:
        failure = worker.exception()
        if failure is not None:
            exception_links.append(f"[WORKER] Exception --> {failure!r}")

    # Display Results
    st.success("Crawling Completed!")
    st.write(f"Total Time Taken: {elapsed_time:.2f} seconds")
    st.write(f"Total Links Visited: {len(visited_links)}")
    st.write(f"Total OK Links: {len(ok_links)}")
    st.write(f"Total Error Links: {len(error_links)}")
    st.write(f"Total Exception Links: {len(exception_links)}")

    # Display Logs
    with st.expander("Visited Links"):
        st.write(visited_links)
    with st.expander("OK Links"):
        st.write(ok_links)
    with st.expander("Error Links"):
        st.write(error_links)
    with st.expander("Exception Links"):
        st.write(exception_links)