from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse, StreamingResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup
import requests  # needed by get_data below; missing from the original imports
import asyncio
import os
app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
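
# BaseModel is imported above but never used in the file as shown. A request
# schema like the one below is one plausible use; this is a hypothetical
# sketch, not part of the original source.
class ScrapeRequest(BaseModel):
    url: str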
async def power_scrapper(url):
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()

        # Block everything except documents and scripts to speed up loading
        await page.route(
            "**/*",
            lambda route: route.continue_()
            if route.request.resource_type in ["document", "script"]
            else route.abort(),
        )

        # Open the target website
        await page.goto(url, wait_until='domcontentloaded')

        # Give dynamic content a moment to render; wait_for_timeout takes
        # milliseconds, so the original value of 10 paused for only 10 ms
        await page.wait_for_timeout(1000)

        # Extract all links
        links = await page.query_selector_all('a')
        result = []
        for link in links:
            href = await link.get_attribute('href')
            result.append({'href': href})

        # Extract all non-empty text content
        elements = await page.query_selector_all('body *')
        for element in elements:
            text_content = await element.text_content()
            if text_content and text_content.strip():
                result.append({'text': text_content.strip()})

        await browser.close()
        return result
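
# Standalone usage sketch (hypothetical; FastAPI handlers simply await
# power_scrapper(url) directly, as get_data does below):
#
#     results = asyncio.run(power_scrapper("https://example.com"))
#     print(results[:5])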
def get_links(soup):
    # Collect the href of every anchor tag (entries may be None for anchors
    # without an href attribute)
    links = []
    for link in soup.find_all('a'):
        links.append(link.get('href'))
    return links
def get_text_content(soup):
    # Gather the text of common text-bearing tags
    text_elements = []
    for tag in ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'span']:
        for element in soup.find_all(tag):
            text_elements.append(element.get_text())
    return text_elements
def get_title(soup):
    # Guard against pages without a <title> tag
    title_tag = soup.find('title')
    return title_tag.get_text() if title_tag else ""
async def get_data(url: str):
    headers = {'User-Agent': 'Mozilla/5.0'}
    response = requests.get(url, headers=headers)
    soup = BeautifulSoup(response.content, 'html.parser')

    title = get_title(soup)
    links = get_links(soup)
    text_content = get_text_content(soup)

    # Fall back to the Playwright scraper when the static HTML yields no links
    # (typically a JavaScript-rendered page)
    if not links:
        print("running alternative scrapper")
        links = await power_scrapper(url)

    return {"title": title, "content": links + text_content}
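
# No route is defined in the file as shown, although FastAPI and HTTPException
# are imported. A minimal endpoint sketch exposing get_data (the "/scrape"
# path and the error handling are assumptions, not from the original source):
@app.get("/scrape")
async def scrape(url: str):
    try:
        return await get_data(url)
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail=str(exc))

# Run locally with, e.g.:  uvicorn app:app --host 0.0.0.0 --port 7860
# (the module name "app" and port 7860, the Hugging Face Spaces default,
# are assumptions)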