|
import asyncio |
|
from contextlib import asynccontextmanager |
|
import logging |
|
from typing import Literal, Optional |
|
from fastapi import FastAPI, APIRouter |
|
from httpx import AsyncClient |
|
from pydantic import BaseModel, Field |
|
import uvicorn |
|
from playwright.async_api import async_playwright, Browser |
|
|
|
from scrap.base import ScrapperBackendBase |
|
from scrap.gpatents import GpatentsScrapBackend |
|
from serp.base import SERPBackendBase, SerpQuery, SerpResultItem, get_backends_doc, query_serp_backend |
|
from serp.arxiv import ArxivSerpBackend |
|
from serp.bing import BingSerpBackend |
|
from serp.duckduckgo import DuckDuckGoSerpBackend |
|
from serp.gpatents import GPatentsSerpBackend |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]: %(message)s', |
|
datefmt='%Y-%m-%d %H:%M:%S' |
|
) |
|
|
|
|
|
# Playwright driver and browser instance shared across requests; both are
# created and torn down in the FastAPI lifespan below.
playwright = None
pw_browser: Optional[Browser] = None

# Shared HTTP client reused by the SERP and scraping backends.
http_client = AsyncClient()
|
|
|
|
|
@asynccontextmanager |
|
async def api_lifespan(app: FastAPI): |
|
"""Lifespan context manager for FastAPI to manage Playwright browser instance.""" |
|
global playwright, pw_browser |
|
playwright = await async_playwright().start() |
|
pw_browser = await playwright.chromium.launch(headless=False) |
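    # NOTE: the browser is launched in headed mode (headless=False); switch to
    # headless=True if the scraped sites behave correctly without a visible window.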
|
|
|
yield |
|
|
|
    await pw_browser.close()
    await playwright.stop()
    await http_client.aclose()
|
|
|
app = FastAPI(lifespan=api_lifespan) |
|
serp_router = APIRouter(prefix="/serp", tags=["SERP"]) |
|
scrap_router = APIRouter(prefix="/scrap", tags=["scraping"])
|
|
|
|
|
# Registry of SERP backends, keyed by backend name.
SERP_BACKENDS = {b.name: b for b in [DuckDuckGoSerpBackend(),
                                     BingSerpBackend(),
                                     ArxivSerpBackend(),
                                     GPatentsSerpBackend()]}
|
|
|
|
|
# Registry of scraping backends, keyed by the content type they handle.
SCRAP_BACKENDS = {b.content_type: b for b in [
    GpatentsScrapBackend()
]}
|
|
|
app.description = get_backends_doc(SERP_BACKENDS.values()) |
|
|
|
|
|
|
|
|
|
class SerpRequest(BaseModel): |
|
"""Model for a single-query SERP search""" |
|
query: SerpQuery = Field(..., description="The query to perform") |
|
backend: str = Field(..., description="The backend to use for search.") |
|
|
|
|
|
class SerpResponse(BaseModel): |
|
"""Model for single-query SERP search results""" |
|
error: Optional[str] |
|
backend: Optional[str] |
|
results: Optional[list[SerpResultItem]] = Field( |
|
None, description="List of search results for the query") |
|
|
|
|
|
class SerpBulkRequest(BaseModel): |
|
"""A bulk request with many queries""" |
|
queries: list[SerpQuery] = Field(..., |
|
description="Lists of queries to perform") |
|
backend: str = Field(..., |
|
description="The backend to use for the bulk search.") |
|
|
|
|
|
class SerpBulkResultItem(BaseModel): |
|
"""Intermediate result item for bulk results""" |
|
query: str |
|
error: Optional[str] |
|
backend: Optional[str] |
|
results: Optional[list[SerpResultItem]] |
|
|
|
    @classmethod
    def from_err(cls, q: SerpQuery, err: Exception, backend: str):
        return cls(query=q.query, error=str(err), results=None, backend=backend)
|
|
|
    @classmethod
    def from_results(cls, q: SerpQuery, results: list[SerpResultItem], backend: str):
        return cls(query=q.query, error=None, results=results, backend=backend)
|
|
|
|
|
class SerpBulkResponse(BaseModel): |
|
"""Response to a bulk query""" |
|
queries: list[SerpBulkResultItem] |
|
|
|
|
|
class SerpAutoSearchRequest(BaseModel): |
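    """Model for a fallback (auto) search: the query is routed to backends matching the requested category"""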
|
query: SerpQuery |
|
category: Literal["general", "patent", "scholar"] = "general" |
|
|
|
|
|
class SerpAutoBulkSearchRequest(BaseModel): |
|
"""A auto-bulk request with many queries""" |
|
queries: list[SerpQuery] |
|
category: Literal["general", "patent", "scholar"] = "general" |
|
|
|
|
|
|
|
|
|
@serp_router.post("/search") |
|
async def search(req: SerpRequest) -> SerpResponse: |
|
"""Performs a single SERP search against the given backend with the given query.""" |
|
|
|
|
|
    backend: Optional[SERPBackendBase] = SERP_BACKENDS.get(req.backend)
|
|
|
if backend: |
|
try: |
|
results = await query_serp_backend(backend, req.query, http_client, pw_browser) |
|
return SerpResponse(error=None, results=results, backend=backend.name) |
|
except Exception as e: |
|
logging.warning(f"Error while querying {backend.name}", e) |
|
return SerpResponse(error=str(e), results=[], backend=backend.name) |
|
|
|
return SerpResponse(error="No backend with the given backend name was found.", backend=req.backend, results=None) |
|
|
|
|
|
@serp_router.post("/search/bulk") |
|
async def search_bulk(req: SerpBulkRequest) -> SerpBulkResponse: |
|
"""Performs a bulk SERP search against the given backend with the given queries.""" |
|
|
|
|
|
    backend: Optional[SERPBackendBase] = SERP_BACKENDS.get(req.backend)
|
|
|
if backend: |
|
logging.info( |
|
f"Bulk querying {backend.name} with queries: {req.queries}") |
|
|
|
results = await asyncio.gather(*[query_serp_backend(backend, q, http_client, pw_browser) for q in req.queries], return_exceptions=True) |
|
|
|
for r in results: |
|
if isinstance(r, Exception): |
|
                logging.warning(
                    f"Exception occurred while querying {backend.name}: {r}")
|
|
|
result_sections = [SerpBulkResultItem.from_err(q, r, backend.name) |
|
if isinstance(r, Exception) else SerpBulkResultItem.from_results(q, r, backend.name) for r, q in zip(results, req.queries)] |
|
|
|
return SerpBulkResponse(queries=result_sections) |
|
|
|
    return SerpBulkResponse(queries=[
        SerpBulkResultItem(query=q.query, error="No backend with the given backend name was found.",
                           backend=req.backend, results=None)
        for q in req.queries])
|
|
|
|
|
@serp_router.post("/fallback_search") |
|
async def search_fallback(auto: SerpAutoSearchRequest) -> SerpResponse: |
|
"""Searches the given query with the first usable backend that matches the requested content category and fallbacks to the next one if it fails""" |
|
|
|
logging.info(f"Auto-search with query {auto}") |
|
for backend in SERP_BACKENDS.values(): |
|
if backend.category.lower() != auto.category.lower(): |
|
continue |
|
|
|
try: |
|
results = await query_serp_backend(backend, auto.query, http_client, pw_browser) |
|
return SerpResponse(error=None, results=results, backend=backend.name) |
|
except Exception as e: |
|
logging.warning(f"Search with {backend.name} backend failed: {e}") |
|
logging.info("Trying out query with next available backend.") |
|
continue |
|
|
|
return SerpResponse(error="No adequate backend found or all backends are rate-limited", results=None, backend=None) |
|
|
|
|
|
@serp_router.post("/fallback_search/bulk") |
|
async def search_fallback_bulk(auto: SerpAutoBulkSearchRequest) -> SerpBulkResponse: |
|
"""Bulk searches the given queries with the first usable backend that matches the requested content category and fallbacks to the next one if it fails""" |
|
|
|
logging.info(f"Bulk Auto-search with query {auto}") |
|
|
|
async def _process_q(q: SerpQuery): |
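        """Tries the query against each backend matching the requested category, in registry order, and returns the first successful (results, backend) pair."""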
|
for backend in SERP_BACKENDS.values(): |
|
if backend.category.lower() != auto.category.lower(): |
|
continue |
|
|
|
try: |
|
results = await query_serp_backend(backend, q, http_client, pw_browser) |
|
return (results, backend) |
|
except Exception as e: |
|
logging.warning( |
|
f"Search with {backend.name} backend failed: {e}") |
|
continue |
|
|
|
        raise Exception(
            "No adequate backend found or all backends are rate-limited")
|
|
|
    results = await asyncio.gather(*[_process_q(q) for q in auto.queries], return_exceptions=True)
    result_sections = [SerpBulkResultItem.from_err(q, r, "") if isinstance(r, Exception)
                       else SerpBulkResultItem.from_results(q, r[0], r[1].name)
                       for r, q in zip(results, auto.queries)]
|
|
|
return SerpBulkResponse(queries=result_sections) |
|
|
|
|
|
|
|
|
|
class FetchContentResponse(BaseModel): |
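    """Response model for scraped-content fetches"""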
|
error: Optional[str] |
|
backend: Optional[str] |
|
content: Optional[dict] |
|
|
|
|
|
@scrap_router.get("/full_contents") |
|
async def full_content(slug: str) -> FetchContentResponse: |
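    """Fetches the full content for a "<content_type>:<content_id>" slug using the scraping backend registered for that content type."""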
|
    # Slugs have the form "<content_type>:<content_id>".
    content_type, _, content_id = slug.partition(":")
|
|
|
    backend: Optional[ScrapperBackendBase] = SCRAP_BACKENDS.get(content_type)
|
|
|
if backend: |
|
try: |
|
result = await backend.scrap(http_client, content_id) |
|
return FetchContentResponse(error=None, backend=backend.content_type, content=result.model_dump()) |
|
except Exception as e: |
|
return FetchContentResponse(error=str(e), backend=backend.content_type, content=None) |
|
|
|
return FetchContentResponse(error="No backend supporting found", backend="", content=None) |
|
|
|
|
|
@scrap_router.get("/fetch_content") |
|
async def fetch_content(back: str, id: str) -> FetchContentResponse: |
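    """Fetches and scraps the content with the given id using the scraping backend named `back`."""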
|
    backend: Optional[ScrapperBackendBase] = SCRAP_BACKENDS.get(back)
|
|
|
if backend: |
|
try: |
|
result = await backend.scrap(http_client, id) |
|
return FetchContentResponse(error=None, backend=back, content=result.model_dump()) |
|
except Exception as e: |
|
return FetchContentResponse(error=str(e), backend=back, content=None) |
|
|
|
return FetchContentResponse(error="No backend found", backend=back, content=None) |
|
|
|
|
|
app.include_router(serp_router) |
|
app.include_router(scrap_router) |
|
|
|
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
|
|