"""Keyword web crawler with a Gradio front end.

Fetches pages (respecting robots.txt and a per-domain delay), records URLs
whose text contains any of the requested keywords (to flat files and a local
SQLite database), and follows links recursively up to a configurable depth.
"""

import logging
import random
import sqlite3
import time
from concurrent.futures import ThreadPoolExecutor  # kept: reserved for parallel crawling
from threading import Lock
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser

import gradio as gr
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent

# User agent initialization: a rotating pool plus the official Googlebot UA
# (sent ~20% of the time inside crawl()).
ua = UserAgent()
google_bot_ua = "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"

# Minimum number of seconds between two requests to the same domain.
MIN_DOMAIN_DELAY = 5

# Per-domain timestamp of the most recently *scheduled* request (see get_delay).
domain_access_times = {}

# Cache of RobotFileParser objects keyed by "scheme://netloc" so robots.txt
# is downloaded at most once per host instead of once per crawled URL.
robots_parsers = {}

# Thread-safe set of URLs that have already been crawled.
visited = set()
visited_lock = Lock()

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)


def save_to_file(filename, data):
    """Append a single line of text to *filename* (UTF-8)."""
    with open(filename, "a", encoding="utf-8") as file:
        file.write(data + "\n")


def save_to_database(url, keywords_matched):
    """Persist one (url, comma-joined keywords) row in crawler.db.

    The table is created on first use.  The connection is used as a context
    manager (commit on success, rollback on error) and is always closed.
    """
    conn = sqlite3.connect("crawler.db")
    try:
        with conn:
            conn.execute("CREATE TABLE IF NOT EXISTS results (url TEXT, keywords TEXT)")
            conn.execute(
                "INSERT INTO results (url, keywords) VALUES (?, ?)",
                (url, ",".join(keywords_matched)),
            )
    finally:
        conn.close()


def get_delay(domain):
    """Return how many seconds the caller must sleep before hitting *domain*.

    Enforces at least MIN_DOMAIN_DELAY seconds between successive requests to
    the same domain.  The recorded timestamp includes the returned delay, so
    pacing is measured from when the request will actually be issued (after
    the caller's sleep), not from when the delay was computed.
    """
    now = time.time()
    last = domain_access_times.get(domain)
    delay = 0 if last is None else max(0, MIN_DOMAIN_DELAY - (now - last))
    domain_access_times[domain] = now + delay
    return delay


def can_crawl(url, user_agent):
    """Return True when robots.txt for *url*'s host permits *user_agent*.

    Parsers are cached per host to avoid refetching robots.txt for every
    page.  If robots.txt cannot be fetched, the URL is assumed crawlable
    (best-effort policy, matching the original behavior).
    """
    parts = urlparse(url)
    base = f"{parts.scheme}://{parts.netloc}"
    rp = robots_parsers.get(base)
    if rp is None:
        rp = RobotFileParser()
        rp.set_url(base + "/robots.txt")
        try:
            rp.read()
        except Exception:
            return True  # Assume crawlable if robots.txt cannot be fetched
        robots_parsers[base] = rp
    return rp.can_fetch(user_agent, url)


def crawl(url, keywords, depth):
    """Crawl *url*, record keyword matches, and recurse into its links.

    Skips already-visited URLs, honors robots.txt and per-domain rate
    limiting, retries failed requests up to three times with exponential
    backoff, and descends at most *depth* levels.  Returns a short status
    string (empty when the URL was skipped or failed).
    """
    if depth <= 0:
        return ""

    # Claim the URL under the lock so concurrent callers never double-crawl.
    with visited_lock:
        if url in visited:
            return ""
        visited.add(url)

    domain = urlparse(url).netloc
    time.sleep(get_delay(domain))

    if not can_crawl(url, google_bot_ua):
        logging.warning(f"Blocked by robots.txt: {url}")
        return ""

    response = None
    for attempt in range(3):  # Retry up to 3 times
        try:
            # Send the Googlebot UA ~20% of the time, a random one otherwise.
            user_agent = google_bot_ua if random.random() < 0.2 else ua.random
            headers = {"User-Agent": user_agent, "Referer": "https://www.google.com"}
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            break
        except requests.exceptions.RequestException as e:
            if attempt == 2:
                logging.error(f"Failed after retries: {url} - {e}")
                return ""
            time.sleep(2 ** attempt)  # Exponential backoff

    save_to_file("found.txt", url)
    logging.info(f"Crawled: {url}")

    soup = BeautifulSoup(response.text, "html.parser")

    # Check for keywords (case-insensitive substring match on the page text).
    text = soup.get_text().lower()
    keywords_matched = [kw for kw in keywords if kw.lower() in text]
    if keywords_matched:
        save_to_file("keywords_found.txt", url)
        save_to_database(url, keywords_matched)
        logging.info(f"Keywords found in {url}: {keywords_matched}")

    # Follow every absolute http(s) link on the page, one level deeper.
    for link in soup.find_all("a", href=True):
        next_url = urljoin(url, link["href"])
        if next_url.startswith("http"):
            crawl(next_url, keywords, depth - 1)

    return f"Crawled: {url}, Keywords: {keywords_matched}"


def gradio_crawl(start_url, keywords, depth):
    """Gradio entry point: parse the comma-separated keyword list and crawl.

    Keywords are stripped of surrounding whitespace ("a, b" -> ["a", "b"])
    and empty entries from stray commas are dropped, so they match page text
    as the user intended.
    """
    keywords_list = [kw.strip() for kw in keywords.split(",") if kw.strip()]
    return crawl(start_url, keywords_list, int(depth))


# Gradio UI setup
iface = gr.Interface(
    fn=gradio_crawl,
    inputs=[
        gr.Textbox(label="Starting URL", placeholder="Enter the starting URL"),
        gr.Textbox(label="Keywords (comma-separated)", placeholder="Enter keywords to search for"),
        gr.Slider(label="Crawl Depth", minimum=1, maximum=5, step=1, value=3),
    ],
    outputs="text",
    live=True,
    title="Web Crawler",
    description="A simple web crawler that searches for keywords in websites.",
)

if __name__ == "__main__":
    # Launch the server only when run as a script, so the module can be
    # imported (e.g. for testing) without starting the Gradio app.
    iface.launch()