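"""Streamlit app: crawl a site from a base URL and report broken links.

Every page on the base domain is fetched, each link/image/script/stylesheet
reference found on it is checked, and the results (visited / OK / error /
exception) are listed in the UI. Launch with:  streamlit run <this_file>.py
"""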
import streamlit as st
import requests
import threading
import queue
import time
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor

# Streamlit UI
st.title("Web Crawler - Link Checker")

BASE_URL = st.text_input("Enter the Base URL:", "https://www.example.com/")
MAX_CONCURRENT_THREAD = st.slider("Select Max Concurrent Threads", 1, 20, 10)
LOGGING = st.checkbox("Enable Logging", True)

start_crawling = st.button("Start Crawling")

# Global crawl state
relativeLinks = queue.Queue()    # Internal pages waiting to be crawled
visitedLinks = set()             # URLs already queued, so nothing is queued twice
visited_lock = threading.Lock()  # Guards the check-then-add on visitedLinks across workers

# Report data collected during the crawl
visited_links = []    # Pages actually fetched and parsed (distinct from visitedLinks above)
ok_links = []
error_links = []
exception_links = []

session = requests.Session()  # Reuse one session for connection pooling
session.headers.update({"User-Agent": "Mozilla/5.0"})  # Common User-Agent to avoid trivial blocks

def link_checker(source, address):
    """Fetch a link and record whether it responds with an error status."""
    try:
        resp = session.get(address, timeout=5)
        if resp.status_code >= 400:  # Any 4xx/5xx response counts as a broken link
            error_links.append(f"[{source}] {resp.status_code} {resp.reason} --> {address}")
        else:
            ok_links.append(f"[{source}] OK --> {address}")
    except requests.RequestException as e:
        exception_links.append(f"[{source}] Exception --> {e} {address}")

def normalize_url(a):
    """Resolve a raw href/src against BASE_URL; skip fragments and non-HTTP schemes."""
    if a.startswith(("#", "javascript:", "mailto:", "tel:")):
        return None
    return urljoin(BASE_URL, a).rstrip('/')

def link_extractor(address):
    """Fetch a page, check every link on it, and queue unseen internal pages."""
    visited_links.append(address)
    try:
        res = session.get(address, timeout=5)
        soup = BeautifulSoup(res.text, 'html.parser')

        extracted_links = []
        for tag, attr in [('img', 'src'), ('link', 'href'), ('script', 'src'), ('a', 'href')]:
            extracted_links.extend(
                normalize_url(link[attr]) for link in soup.find_all(tag) if link.has_attr(attr)
            )

        extracted_links = set(filter(None, extracted_links))  # Remove duplicates and None values

        # Check this page's links in a per-page pool; total thread count can reach
        # MAX_CONCURRENT_THREAD page workers x MAX_CONCURRENT_THREAD checkers.
        with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_THREAD) as executor:
            executor.map(lambda p: link_checker(source=address, address=p), extracted_links)

        # Queue internal links that have not been seen yet; the lock keeps two workers
        # from queuing the same page at the same time.
        for p in extracted_links:
            if p.startswith(BASE_URL):
                with visited_lock:
                    if p in visitedLinks:
                        continue
                    visitedLinks.add(p)
                relativeLinks.put(p)
    except requests.RequestException as e:
        exception_links.append(f"[RELATIVE LINK] Exception --> {e} {address}")

def threader():
    """Worker loop: crawl queued pages until no new work arrives for a while."""
    while True:
        try:
            # Block with a timeout instead of the empty()/get() pattern, which can hang on
            # an empty queue or let workers quit while other workers are still adding links.
            value = relativeLinks.get(timeout=10)
        except queue.Empty:
            return
        try:
            if LOGGING:
                print(f"Checking: {value} | Remaining: {relativeLinks.qsize()}")
            link_extractor(value)
        finally:
            relativeLinks.task_done()

if start_crawling:
    start_time = time.time()
    visitedLinks.add(BASE_URL.strip())
    relativeLinks.put(BASE_URL.strip())

    # Start a fixed pool of workers; each runs threader() against the shared queue
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_THREAD) as executor:
        for _ in range(MAX_CONCURRENT_THREAD):
            executor.submit(threader)

    end_time = time.time()
    elapsed_time = end_time - start_time

    # Display Results
    st.success("Crawling Completed!")
    st.write(f"Total Time Taken: {elapsed_time:.2f} seconds")
    st.write(f"Total Links Visited: {len(visited_links)}")
    st.write(f"Total OK Links: {len(ok_links)}")
    st.write(f"Total Error Links: {len(error_links)}")
    st.write(f"Total Exception Links: {len(exception_links)}")

    # Display Logs
    with st.expander("Visited Links"):
        st.write(visited_links)
    with st.expander("OK Links"):
        st.write(ok_links)
    with st.expander("Error Links"):
        st.write(error_links)
    with st.expander("Exception Links"):
        st.write(exception_links)