File size: 3,857 Bytes
d907837
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from abc import ABC, ABCMeta, abstractmethod
from contextlib import asynccontextmanager
import logging
from typing import Literal, Optional
from httpx import AsyncClient
from pydantic import BaseModel, Field
from playwright.async_api import Browser, BrowserContext, Page
from asyncio import Semaphore

# ========================== Schemas ==========================


class SerpQuery(BaseModel):
    """Model for SERP query"""

    # Search string to submit; required (no default).
    query: str = Field(default=..., description="The query to search for")

    # Requested result count. The description lists the intended values, but
    # this model only declares a plain int and does not enforce them itself.
    n_results: int = Field(
        default=10,
        description="Number of results to return for each query. Valid values are 10, 25, 50 and 100",
    )

    # Result ordering, limited to the two literals below.
    sort_by: Literal["relevance", "date"] = Field(
        default="relevance",
        description="How to sort search results.",
    )


class SerpResultItem(BaseModel):
    """Model for a single SERP result item"""

    # Required fields: every result must carry a title and a link.
    title: str = Field(default=..., description="Title of the search result")
    href: str = Field(default=..., description="URL of the search result")

    # Optional fields default to None when a backend does not supply them.
    body: Optional[str] = Field(
        default=None,
        description="Snippet of the search result",
    )
    content_slug: Optional[str] = Field(
        default=None,
        description="Content slug of the search result. A slug that encodes the content type and URL that can be used to fetch the full content later",
    )

    class Config:
        # Accept extra keys beyond the fields declared above instead of
        # rejecting them.
        extra = "allow"


# =============================== Base classes ===============================


class SERPBackendBase(ABC):
    """Abstract interface implemented by every SERP scraping backend.

    Concrete backends must provide the ``name`` and ``category`` properties
    and the asynchronous ``query`` method.
    """

    def __init__(self):
        """Initialise the backend; the base class holds no state."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the backend's name, used to identify it in slugs."""

    @property
    @abstractmethod
    def category(self) -> Literal["general", "patent", "scholar"]:
        """Return the content category served by the backend.

        Used for search_auto.
        """

    @abstractmethod
    async def query(self, query: SerpQuery, client: AsyncClient) -> list[SerpResultItem]:
        """Run the given SERP query and return its result items.

        Args:
            query: The search request to execute.
            client: The HTTP client handed to the backend for the request.

        Returns:
            The result items produced for ``query``.
        """


class PlaywrightSerpBackendBase(SERPBackendBase):
    """Base class for SERP scraping backends that are driven through Playwright.

    Subclasses implement ``query_serp_page``; the HTTP-client based ``query``
    entry point inherited from the base class is deliberately disabled here.
    """

    def __init__(self):
        """Initialise the backend, chaining to the base-class initialiser."""
        super().__init__()

    async def query(self, query: SerpQuery, client: AsyncClient) -> list[SerpResultItem]:
        """Reject HTTP-client queries for Playwright backends.

        Raises:
            NotImplementedError: Always; callers must use ``query_serp_page``.
        """
        # The message names the method that actually exists on this class.
        raise NotImplementedError("query_serp_page method must be used instead")

    @abstractmethod
    async def query_serp_page(self, browser: Browser, query: SerpQuery) -> list[SerpResultItem]:
        """Run the given SERP query using Playwright and return its results.

        Args:
            browser: The Playwright browser made available to the backend.
            query: The search request to execute.

        Returns:
            The result items produced for ``query``.
        """


async def query_serp_backend(backend: SERPBackendBase, query: SerpQuery, client: AsyncClient, browser: Browser) -> list[SerpResultItem]:
    """Dispatch a SERP query to the entry point matching the backend's type.

    Args:
        backend: The backend to query.
        query: The search request to execute.
        client: HTTP client passed to backends that are not Playwright-based.
        browser: Playwright browser passed to Playwright-based backends.

    Returns:
        The result items returned by the backend.
    """
    logging.info(f"Querying {backend.name} with {query}")

    uses_playwright = isinstance(backend, PlaywrightSerpBackendBase)
    if not uses_playwright:
        # Plain backends go through the HTTP client.
        return await backend.query(query, client)

    # Playwright backends receive the browser instead of the HTTP client.
    return await backend.query_serp_page(browser, query)


def get_backends_doc(backends: list[SERPBackendBase]) -> str:
    """Build a documentation string listing the given backends.

    Args:
        backends: The backends to list, in display order.

    Returns:
        A heading followed by one entry per backend showing its ``name`` and
        ``category``; just the heading when ``backends`` is empty.
    """
    heading = "### Available SERP Backends \n\n\n "
    entries = [
        f"   \n\n `{backend.name}` - category: `{backend.category}`"
        for backend in backends
    ]
    return heading + "".join(entries)


@asynccontextmanager
async def playwright_open_page(browser: Browser, sema: Semaphore):
    """Open a page in a fresh browser context, bounded by a semaphore.

    Args:
        browser: The Playwright browser used to create the context.
        sema: Concurrency semaphore; one slot is held for as long as the
            page is open.

    Yields:
        The newly created page.

    On exit the page is closed first, then its context, and finally the
    semaphore slot is released. Each step runs even if an earlier one (or
    the body of the ``async with``) raised.
    """
    # ``async with`` guarantees the slot is released even when creating the
    # context or the page fails, which a bare acquire() outside try did not.
    async with sema:
        context: BrowserContext = await browser.new_context()
        try:
            page: Page = await context.new_page()
            try:
                yield page
            finally:
                await page.close()
        finally:
            # Runs even if new_page() or page.close() raised, so the context
            # is never left open.
            await context.close()