import requests
from bs4 import BeautifulSoup
import time
import random
import logging
import sqlite3
from fake_useragent import UserAgent
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser
from concurrent.futures import ThreadPoolExecutor
import gradio as gr
from threading import Lock
# User agent initialization
ua = UserAgent()
google_bot_ua = "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
# Domain access times for rate-limiting
domain_access_times = {}
# Thread-safe visited URLs set
visited = set()
visited_lock = Lock()
# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
# Save to file
def save_to_file(filename, data):
    with open(filename, "a") as file:
        file.write(data + "\n")
# Save to database
def save_to_database(url, keywords_matched):
    conn = sqlite3.connect("crawler.db")
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS results (url TEXT, keywords TEXT)")
    cursor.execute("INSERT INTO results (url, keywords) VALUES (?, ?)", (url, ",".join(keywords_matched)))
    conn.commit()
    conn.close()
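# Illustrative convenience helper (not part of the original app): reads back the rows
# that save_to_database() wrote, so crawler.db can be inspected without the UI.
def dump_results(db_path="crawler.db"):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS results (url TEXT, keywords TEXT)")
    cursor.execute("SELECT url, keywords FROM results")
    rows = cursor.fetchall()
    conn.close()
    return rows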
# Get delay for rate-limiting
def get_delay(domain):
    now = time.time()
    if domain in domain_access_times:
        elapsed = now - domain_access_times[domain]
        delay = max(0, 5 - elapsed)
    else:
        delay = 0
    domain_access_times[domain] = now
    return delay
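# Example: if the same domain was last hit 2 seconds ago, get_delay returns 3,
# so requests to any single domain end up spaced roughly 5 seconds apart.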
# Can crawl based on robots.txt
def can_crawl(url, user_agent):
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    rp = RobotFileParser()
    rp.set_url(robots_url)
    try:
        rp.read()
    except Exception:
        return True  # Assume crawlable if robots.txt cannot be fetched
    return rp.can_fetch(user_agent, url)
# Crawl function
def crawl(url, keywords, depth, found_urls, keywords_found):
    if depth <= 0:
        return found_urls, keywords_found
    with visited_lock:
        if url in visited:
            return found_urls, keywords_found
        visited.add(url)
    domain = urlparse(url).netloc
    time.sleep(get_delay(domain))
    if not can_crawl(url, google_bot_ua):
        logging.warning(f"Blocked by robots.txt: {url}")
        return found_urls, keywords_found
    for attempt in range(3):  # Retry up to 3 times
        try:
            user_agent = google_bot_ua if random.random() < 0.2 else ua.random
            headers = {"User-Agent": user_agent, "Referer": "https://www.google.com"}
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            break
        except requests.exceptions.RequestException as e:
            if attempt == 2:
                logging.error(f"Failed after retries: {url} - {e}")
                return found_urls, keywords_found
            time.sleep(2 ** attempt)  # Exponential backoff
    save_to_file("found.txt", url)
    logging.info(f"Crawled: {url}")
    found_urls += f"{url}\n"
    soup = BeautifulSoup(response.text, "html.parser")
    # Check for keywords
    text = soup.get_text().lower()
    keywords_matched = [kw for kw in keywords if kw.lower() in text]
    if keywords_matched:
        save_to_file("keywords_found.txt", url)
        save_to_database(url, keywords_matched)
        logging.info(f"Keywords found in {url}: {keywords_matched}")
        keywords_found += f"{url} - Keywords: {', '.join(keywords_matched)}\n"
    # Find and crawl links
    for link in soup.find_all("a", href=True):
        next_url = urljoin(url, link["href"])
        if next_url.startswith("http"):
            found_urls, keywords_found = crawl(next_url, keywords, depth - 1, found_urls, keywords_found)
    return found_urls, keywords_found
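# Note: ThreadPoolExecutor is imported but unused; one possible extension (a sketch,
# not part of the current flow) would fan out top-level links to a small pool, e.g.:
#     with ThreadPoolExecutor(max_workers=4) as pool:
#         futures = [pool.submit(crawl, u, keywords, depth - 1, "", "") for u in seed_links]
# where seed_links is a hypothetical list of URLs gathered from the start page.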
# Gradio interface function
def gradio_crawl(start_url, keywords, depth):
    # Strip whitespace and drop empty entries so keyword matching works as expected
    keywords_list = [kw.strip() for kw in keywords.split(",") if kw.strip()]
    found_urls = ""
    keywords_found = ""
    found_urls, keywords_found = crawl(start_url, keywords_list, depth, found_urls, keywords_found)
    return found_urls, keywords_found
# Gradio UI setup
iface = gr.Interface(
    fn=gradio_crawl,
    inputs=[
        gr.Textbox(label="Starting URL", placeholder="Enter the starting URL"),
        gr.Textbox(label="Keywords (comma-separated)", placeholder="Enter keywords to search for"),
        gr.Slider(label="Crawl Depth", minimum=1, maximum=5, step=1, value=3)
    ],
    outputs=[
        gr.Textbox(label="Crawled URLs", lines=10, placeholder="Found URLs will be shown here...", interactive=False),
        gr.Textbox(label="Keywords Found", lines=10, placeholder="URLs with matching keywords will be shown here...", interactive=False)
    ],
    live=True,
    title="WebCrawlPLUS",
    description="A webcrawler from SynAckNetwork.com"
)
# Launch the Gradio app
iface.launch()
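# Quick sanity check without the UI (illustrative values only):
#     urls, hits = gradio_crawl("https://example.com", "python, security", 2)
#     print(urls)
#     print(hits)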