|
import asyncio
import json
import logging
import re
import sys
import warnings
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from html import unescape
from itertools import cycle
from random import randint
from typing import AsyncGenerator, Deque, Dict, Generator, Optional, Set, Tuple
from urllib.parse import unquote

import nest_asyncio
from curl_cffi import requests
from docstring_inheritance import GoogleDocstringInheritanceMeta
from lxml import html
|
class DuckDuckGoSearchException(Exception): |
|
"""Base exception class for webscout.""" |
|
|
|
|
|
|
|
@dataclass |
|
class MapsResult: |
|
"""Represents a result from the maps search.""" |
|
|
|
title: Optional[str] = None |
|
address: Optional[str] = None |
|
country_code: Optional[str] = None |
|
latitude: Optional[str] = None |
|
longitude: Optional[str] = None |
|
url: Optional[str] = None |
|
desc: Optional[str] = None |
|
phone: Optional[str] = None |
|
image: Optional[str] = None |
|
source: Optional[str] = None |
|
hours: Optional[Dict[str, str]] = None |
|
category: Optional[str] = None |
|
facebook: Optional[str] = None |
|
instagram: Optional[str] = None |
|
twitter: Optional[str] = None |
|
|
|
|
|
|
|
REGEX_500_IN_URL = re.compile(r"(?:\d{3}-\d{2}\.js)") |
|
REGEX_STRIP_TAGS = re.compile("<.*?>") |
|
REGEX_VQD = re.compile(rb"""vqd=['"]?([^&"']+)""") |
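
# Illustrative (not executed): REGEX_VQD pulls the token out of markup such as
# b'vqd="4-123456789"' -> "4-123456789", while REGEX_500_IN_URL flags ratelimit
# redirect URLs whose path contains something like "500-00.js".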
|
|
|
|
|
def _extract_vqd(html_bytes: bytes, keywords: str) -> Optional[str]: |
|
"""Extract vqd from html using a regular expression.""" |
|
try: |
|
match = REGEX_VQD.search(html_bytes) |
|
if match: |
|
return match.group(1).decode() |
|
except Exception: |
|
pass |
|
raise DuckDuckGoSearchException(f"_extract_vqd() {keywords=} Could not extract vqd.") |
|
|
|
|
|
def _text_extract_json(html_bytes: bytes, keywords: str) -> list:
|
"""text(backend="api") -> extract json from html.""" |
|
try: |
|
start = html_bytes.index(b"DDG.pageLayout.load('d',") + 24 |
|
end = html_bytes.index(b");DDG.duckbar.load(", start) |
|
data = html_bytes[start:end] |
|
return json.loads(data) |
|
except Exception as ex: |
|
raise DuckDuckGoSearchException(f"_text_extract_json() {keywords=} {type(ex).__name__}: {ex}") from ex |
|
|
|
|
|
def _is_500_in_url(url: str) -> bool: |
|
"""Something like '506-00.js' inside the url.""" |
|
return bool(REGEX_500_IN_URL.search(url)) |
|
|
|
|
|
def _normalize(raw_html: str) -> str: |
|
"""Strip HTML tags from the raw_html string.""" |
|
return unescape(REGEX_STRIP_TAGS.sub("", raw_html)) if raw_html else "" |
|
|
|
|
|
def _normalize_url(url: str) -> str: |
|
"""Unquote URL and replace spaces with '+'.""" |
|
return unquote(url.replace(" ", "+")) if url else "" |
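
# Illustrative behaviour of the two helpers above (not executed):
#   _normalize("<b>Python</b> &amp; search")      -> "Python & search"
#   _normalize_url("https://example.com/a%20b c") -> "https://example.com/a b+c"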
|
|
|
logger = logging.getLogger("duckduckgo_search.AsyncWEBS") |
|
|
|
if sys.platform.lower().startswith("win"): |
|
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) |
|
|
|
|
|
class AsyncWEBS(metaclass=GoogleDocstringInheritanceMeta): |
|
"""webscout_search async class to get search results from duckduckgo.com.""" |
|
|
|
def __init__(self, headers=None, proxies=None, timeout=10) -> None: |
|
"""Initialize the AsyncWEBS object. |
|
|
|
Args: |
|
headers (dict, optional): Dictionary of headers for the HTTP client. Defaults to None. |
|
proxies (Union[dict, str], optional): Proxies for the HTTP client (can be dict or str). Defaults to None. |
|
timeout (int, optional): Timeout value for the HTTP client. Defaults to 10. |
|
""" |
|
        # Use caller-supplied headers when given; otherwise fall back to a random numeric User-Agent.
        useragent = f'{randint(0, 1000000)}'
        headers = headers if headers else {'User-Agent': useragent}
|
self.proxies = proxies if proxies and isinstance(proxies, dict) else {"http": proxies, "https": proxies} |
|
self._asession = requests.AsyncSession( |
|
headers=headers, proxies=self.proxies, timeout=timeout, impersonate="chrome" |
|
) |
|
self._asession.headers["Referer"] = "https://duckduckgo.com/" |
|
|
|
async def __aenter__(self) -> "AsyncWEBS": |
|
"""A context manager method that is called when entering the 'with' statement.""" |
|
return self |
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: |
|
"""Closes the session.""" |
|
return self._asession.close() |
|
|
|
    async def _aget_url(self, method: str, url: str, **kwargs) -> Optional[bytes]:
|
try: |
|
            # Rotate a fresh random numeric User-Agent on every request.
            useragent = f'{randint(0, 1000000)}'
            headers = {'User-Agent': useragent}
            resp = await self._asession.request(method, url, stream=True, headers=headers, **kwargs)
|
resp.raise_for_status() |
|
resp_content = await resp.acontent() |
|
logger.debug(f"_aget_url() {url} {resp.status_code} {resp.http_version} {resp.elapsed} {len(resp_content)}") |
|
if _is_500_in_url(str(resp.url)) or resp.status_code == 202: |
|
raise DuckDuckGoSearchException("Ratelimit") |
|
if resp.status_code == 200: |
|
return resp_content |
|
except Exception as ex: |
|
raise DuckDuckGoSearchException(f"_aget_url() {url} {type(ex).__name__}: {ex}") from ex |
|
|
|
async def _aget_vqd(self, keywords: str) -> Optional[str]: |
|
"""Get vqd value for a search query.""" |
|
resp_content = await self._aget_url("POST", "https://duckduckgo.com", data={"q": keywords}) |
|
if resp_content: |
|
return _extract_vqd(resp_content, keywords) |
|
|
|
async def text( |
|
self, |
|
keywords: str, |
|
region: str = "wt-wt", |
|
safesearch: str = "moderate", |
|
timelimit: Optional[str] = None, |
|
backend: str = "api", |
|
max_results: Optional[int] = None, |
|
) -> AsyncGenerator[Dict[str, Optional[str]], None]: |
|
"""DuckDuckGo text search generator. Query params: https://duckduckgo.com/params. |
|
|
|
Args: |
|
keywords: keywords for query. |
|
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt". |
|
safesearch: on, moderate, off. Defaults to "moderate". |
|
timelimit: d, w, m, y. Defaults to None. |
|
backend: api, html, lite. Defaults to api. |
|
api - collect data from https://duckduckgo.com, |
|
html - collect data from https://html.duckduckgo.com, |
|
lite - collect data from https://lite.duckduckgo.com. |
|
max_results: max number of results. If None, returns results only from the first response. Defaults to None. |
|
|
|
Yields: |
|
dict with search results. |
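
        Example:
            A minimal, illustrative sketch (assumes a running event loop)::

                async with AsyncWEBS() as webs:
                    async for r in webs.text("python", max_results=5):
                        print(r["title"], r["href"])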
|
|
|
""" |
|
        if backend == "api":
            results = self._text_api(keywords, region, safesearch, timelimit, max_results)
        elif backend == "html":
            results = self._text_html(keywords, region, safesearch, timelimit, max_results)
        elif backend == "lite":
            results = self._text_lite(keywords, region, timelimit, max_results)
        else:
            raise DuckDuckGoSearchException(f"text() got an unexpected backend: {backend}")
|
|
|
async for result in results: |
|
yield result |
|
|
|
async def _text_api( |
|
self, |
|
keywords: str, |
|
region: str = "wt-wt", |
|
safesearch: str = "moderate", |
|
timelimit: Optional[str] = None, |
|
max_results: Optional[int] = None, |
|
) -> AsyncGenerator[Dict[str, Optional[str]], None]: |
|
"""webscout text search generator. Query params: https://duckduckgo.com/params. |
|
|
|
Args: |
|
keywords: keywords for query. |
|
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt". |
|
safesearch: on, moderate, off. Defaults to "moderate". |
|
timelimit: d, w, m, y. Defaults to None. |
|
max_results: max number of results. If None, returns results only from the first response. Defaults to None. |
|
|
|
Yields: |
|
dict with search results. |
|
|
|
""" |
|
assert keywords, "keywords is mandatory" |
|
|
|
vqd = await self._aget_vqd(keywords) |
|
|
|
payload = { |
|
"q": keywords, |
|
"kl": region, |
|
"l": region, |
|
"bing_market": region, |
|
"s": "0", |
|
"df": timelimit, |
|
"vqd": vqd, |
|
|
|
"sp": "0", |
|
} |
|
safesearch = safesearch.lower() |
|
if safesearch == "moderate": |
|
payload["ex"] = "-1" |
|
elif safesearch == "off": |
|
payload["ex"] = "-2" |
|
elif safesearch == "on": |
|
payload["p"] = "1" |
|
|
|
cache = set() |
|
for _ in range(11): |
|
resp_content = await self._aget_url("GET", "https://links.duckduckgo.com/d.js", params=payload) |
|
if resp_content is None: |
|
return |
|
|
|
page_data = _text_extract_json(resp_content, keywords) |
|
if page_data is None: |
|
return |
|
|
|
result_exists, next_page_url = False, None |
|
for row in page_data: |
|
href = row.get("u", None) |
|
if href and href not in cache and href != f"http://www.google.com/search?q={keywords}": |
|
cache.add(href) |
|
body = _normalize(row["a"]) |
|
if body: |
|
result_exists = True |
|
yield { |
|
"title": _normalize(row["t"]), |
|
"href": _normalize_url(href), |
|
"body": body, |
|
} |
|
if max_results and len(cache) >= max_results: |
|
return |
|
else: |
|
next_page_url = row.get("n", None) |
|
if max_results is None or result_exists is False or next_page_url is None: |
|
return |
|
payload["s"] = next_page_url.split("s=")[1].split("&")[0] |
|
|
|
async def _text_html( |
|
self, |
|
keywords: str, |
|
region: str = "wt-wt", |
|
safesearch: str = "moderate", |
|
timelimit: Optional[str] = None, |
|
max_results: Optional[int] = None, |
|
) -> AsyncGenerator[Dict[str, Optional[str]], None]: |
|
"""webscout text search generator. Query params: https://duckduckgo.com/params. |
|
|
|
Args: |
|
keywords: keywords for query. |
|
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt". |
|
safesearch: on, moderate, off. Defaults to "moderate". |
|
timelimit: d, w, m, y. Defaults to None. |
|
max_results: max number of results. If None, returns results only from the first response. Defaults to None. |
|
|
|
Yields: |
|
dict with search results. |
|
|
|
""" |
|
assert keywords, "keywords is mandatory" |
|
|
|
self._asession.headers["Referer"] = "https://html.duckduckgo.com/" |
|
safesearch_base = {"on": 1, "moderate": -1, "off": -2} |
|
payload = { |
|
"q": keywords, |
|
"s": "0", |
|
"kl": region, |
|
"p": safesearch_base[safesearch.lower()], |
|
"df": timelimit, |
|
} |
|
cache: Set[str] = set() |
|
for _ in range(11): |
|
resp_content = await self._aget_url("POST", "https://html.duckduckgo.com/html", data=payload) |
|
if resp_content is None: |
|
return |
|
|
|
tree = html.fromstring(resp_content) |
|
if tree.xpath('//div[@class="no-results"]/text()'): |
|
return |
|
|
|
result_exists = False |
|
for e in tree.xpath('//div[contains(@class, "results_links")]'): |
|
href = e.xpath('.//a[contains(@class, "result__a")]/@href') |
|
href = href[0] if href else None |
|
if ( |
|
href |
|
and href not in cache |
|
and href != f"http://www.google.com/search?q={keywords}" |
|
and not href.startswith("https://duckduckgo.com/y.js?ad_domain") |
|
): |
|
cache.add(href) |
|
title = e.xpath('.//a[contains(@class, "result__a")]/text()') |
|
body = e.xpath('.//a[contains(@class, "result__snippet")]//text()') |
|
result_exists = True |
|
yield { |
|
"title": _normalize(title[0]) if title else None, |
|
"href": _normalize_url(href), |
|
"body": _normalize("".join(body)) if body else None, |
|
} |
|
if max_results and len(cache) >= max_results: |
|
return |
|
if max_results is None or result_exists is False: |
|
return |
|
next_page = tree.xpath('.//div[@class="nav-link"]') |
|
next_page = next_page[-1] if next_page else None |
|
if next_page is None: |
|
return |
|
|
|
names = next_page.xpath('.//input[@type="hidden"]/@name') |
|
values = next_page.xpath('.//input[@type="hidden"]/@value') |
|
payload = {n: v for n, v in zip(names, values)} |
|
|
|
async def _text_lite( |
|
self, |
|
keywords: str, |
|
region: str = "wt-wt", |
|
timelimit: Optional[str] = None, |
|
max_results: Optional[int] = None, |
|
) -> AsyncGenerator[Dict[str, Optional[str]], None]: |
|
"""webscout text search generator. Query params: https://duckduckgo.com/params. |
|
|
|
Args: |
|
keywords: keywords for query. |
|
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt". |
|
timelimit: d, w, m, y. Defaults to None. |
|
max_results: max number of results. If None, returns results only from the first response. Defaults to None. |
|
|
|
Yields: |
|
dict with search results. |
|
|
|
""" |
|
assert keywords, "keywords is mandatory" |
|
|
|
self._asession.headers["Referer"] = "https://lite.duckduckgo.com/" |
|
payload = { |
|
"q": keywords, |
|
"s": "0", |
|
"o": "json", |
|
"api": "d.js", |
|
"kl": region, |
|
"df": timelimit, |
|
} |
|
cache: Set[str] = set() |
|
for _ in range(11): |
|
resp_content = await self._aget_url("POST", "https://lite.duckduckgo.com/lite/", data=payload) |
|
if resp_content is None: |
|
return |
|
|
|
if b"No more results." in resp_content: |
|
return |
|
|
|
tree = html.fromstring(resp_content) |
|
|
|
result_exists = False |
|
data = zip(cycle(range(1, 5)), tree.xpath("//table[last()]//tr")) |
|
for i, e in data: |
|
if i == 1: |
|
href = e.xpath(".//a//@href") |
|
href = href[0] if href else None |
|
if ( |
|
href is None |
|
or href in cache |
|
or href == f"http://www.google.com/search?q={keywords}" |
|
or href.startswith("https://duckduckgo.com/y.js?ad_domain") |
|
): |
|
                        # Skip the remaining three rows of this result block.
                        for _ in range(3):
                            next(data, None)
|
else: |
|
cache.add(href) |
|
title = e.xpath(".//a//text()")[0] |
|
elif i == 2: |
|
body = e.xpath(".//td[@class='result-snippet']//text()") |
|
body = "".join(body).strip() |
|
elif i == 3: |
|
result_exists = True |
|
yield { |
|
"title": _normalize(title), |
|
"href": _normalize_url(href), |
|
"body": _normalize(body), |
|
} |
|
if max_results and len(cache) >= max_results: |
|
return |
|
if max_results is None or result_exists is False: |
|
return |
|
next_page_s = tree.xpath("//form[./input[contains(@value, 'ext')]]/input[@name='s']/@value") |
|
if not next_page_s: |
|
return |
|
payload["s"] = next_page_s[0] |
|
payload["vqd"] = _extract_vqd(resp_content, keywords) |
|
|
|
async def images( |
|
self, |
|
keywords: str, |
|
region: str = "wt-wt", |
|
safesearch: str = "moderate", |
|
timelimit: Optional[str] = None, |
|
size: Optional[str] = None, |
|
color: Optional[str] = None, |
|
type_image: Optional[str] = None, |
|
layout: Optional[str] = None, |
|
license_image: Optional[str] = None, |
|
max_results: Optional[int] = None, |
|
) -> AsyncGenerator[Dict[str, Optional[str]], None]: |
|
"""webscout images search. Query params: https://duckduckgo.com/params. |
|
|
|
Args: |
|
keywords: keywords for query. |
|
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt". |
|
safesearch: on, moderate, off. Defaults to "moderate". |
|
timelimit: Day, Week, Month, Year. Defaults to None. |
|
size: Small, Medium, Large, Wallpaper. Defaults to None. |
|
color: color, Monochrome, Red, Orange, Yellow, Green, Blue, |
|
Purple, Pink, Brown, Black, Gray, Teal, White. Defaults to None. |
|
type_image: photo, clipart, gif, transparent, line. |
|
Defaults to None. |
|
layout: Square, Tall, Wide. Defaults to None. |
|
license_image: any (All Creative Commons), Public (PublicDomain), |
|
Share (Free to Share and Use), ShareCommercially (Free to Share and Use Commercially), |
|
Modify (Free to Modify, Share, and Use), ModifyCommercially (Free to Modify, Share, and |
|
Use Commercially). Defaults to None. |
|
max_results: max number of results. If None, returns results only from the first response. Defaults to None. |
|
|
|
Yields: |
|
dict with image search results. |
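
        Example:
            Illustrative sketch::

                async with AsyncWEBS() as webs:
                    async for img in webs.images("nature", max_results=3):
                        print(img["image"], img["width"], img["height"])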
|
|
|
""" |
|
assert keywords, "keywords is mandatory" |
|
|
|
vqd = await self._aget_vqd(keywords) |
|
|
|
safesearch_base = {"on": 1, "moderate": 1, "off": -1} |
|
timelimit = f"time:{timelimit}" if timelimit else "" |
|
size = f"size:{size}" if size else "" |
|
color = f"color:{color}" if color else "" |
|
type_image = f"type:{type_image}" if type_image else "" |
|
layout = f"layout:{layout}" if layout else "" |
|
license_image = f"license:{license_image}" if license_image else "" |
|
payload = { |
|
"l": region, |
|
"o": "json", |
|
"q": keywords, |
|
"vqd": vqd, |
|
"f": f"{timelimit},{size},{color},{type_image},{layout},{license_image}", |
|
"p": safesearch_base[safesearch.lower()], |
|
} |
|
|
|
cache = set() |
|
for _ in range(10): |
|
resp_content = await self._aget_url("GET", "https://duckduckgo.com/i.js", params=payload) |
|
if resp_content is None: |
|
return |
|
try: |
|
resp_json = json.loads(resp_content) |
|
except Exception: |
|
return |
|
page_data = resp_json.get("results", None) |
|
if page_data is None: |
|
return |
|
|
|
result_exists = False |
|
for row in page_data: |
|
image_url = row.get("image", None) |
|
if image_url and image_url not in cache: |
|
cache.add(image_url) |
|
result_exists = True |
|
yield { |
|
"title": row["title"], |
|
"image": _normalize_url(image_url), |
|
"thumbnail": _normalize_url(row["thumbnail"]), |
|
"url": _normalize_url(row["url"]), |
|
"height": row["height"], |
|
"width": row["width"], |
|
"source": row["source"], |
|
} |
|
if max_results and len(cache) >= max_results: |
|
return |
|
if max_results is None or result_exists is False: |
|
return |
|
            next_page = resp_json.get("next", None)
            if next_page is None:
                return
            payload["s"] = next_page.split("s=")[-1].split("&")[0]
|
|
|
async def videos( |
|
self, |
|
keywords: str, |
|
region: str = "wt-wt", |
|
safesearch: str = "moderate", |
|
timelimit: Optional[str] = None, |
|
resolution: Optional[str] = None, |
|
duration: Optional[str] = None, |
|
license_videos: Optional[str] = None, |
|
max_results: Optional[int] = None, |
|
) -> AsyncGenerator[Dict[str, Optional[str]], None]: |
|
"""webscout videos search. Query params: https://duckduckgo.com/params. |
|
|
|
Args: |
|
keywords: keywords for query. |
|
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt". |
|
safesearch: on, moderate, off. Defaults to "moderate". |
|
timelimit: d, w, m. Defaults to None. |
|
            resolution: high, standard. Defaults to None.
|
duration: short, medium, long. Defaults to None. |
|
license_videos: creativeCommon, youtube. Defaults to None. |
|
max_results: max number of results. If None, returns results only from the first response. Defaults to None. |
|
|
|
Yields: |
|
dict with videos search results |
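
        Example:
            Illustrative sketch (rows are yielded as returned by the endpoint)::

                async with AsyncWEBS() as webs:
                    async for video in webs.videos("tutorial", max_results=3):
                        print(video["content"])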
|
|
|
""" |
|
assert keywords, "keywords is mandatory" |
|
|
|
vqd = await self._aget_vqd(keywords) |
|
|
|
safesearch_base = {"on": 1, "moderate": -1, "off": -2} |
|
timelimit = f"publishedAfter:{timelimit}" if timelimit else "" |
|
resolution = f"videoDefinition:{resolution}" if resolution else "" |
|
duration = f"videoDuration:{duration}" if duration else "" |
|
license_videos = f"videoLicense:{license_videos}" if license_videos else "" |
|
payload = { |
|
"l": region, |
|
"o": "json", |
|
"s": 0, |
|
"q": keywords, |
|
"vqd": vqd, |
|
"f": f"{timelimit},{resolution},{duration},{license_videos}", |
|
"p": safesearch_base[safesearch.lower()], |
|
} |
|
|
|
cache = set() |
|
for _ in range(10): |
|
resp_content = await self._aget_url("GET", "https://duckduckgo.com/v.js", params=payload) |
|
if resp_content is None: |
|
return |
|
try: |
|
resp_json = json.loads(resp_content) |
|
except Exception: |
|
return |
|
page_data = resp_json.get("results", None) |
|
if page_data is None: |
|
return |
|
|
|
result_exists = False |
|
for row in page_data: |
|
if row["content"] not in cache: |
|
cache.add(row["content"]) |
|
result_exists = True |
|
yield row |
|
if max_results and len(cache) >= max_results: |
|
return |
|
if max_results is None or result_exists is False: |
|
return |
|
            next_page = resp_json.get("next", None)
            if next_page is None:
                return
            payload["s"] = next_page.split("s=")[-1].split("&")[0]
|
|
|
async def news( |
|
self, |
|
keywords: str, |
|
region: str = "wt-wt", |
|
safesearch: str = "moderate", |
|
timelimit: Optional[str] = None, |
|
max_results: Optional[int] = None, |
|
) -> AsyncGenerator[Dict[str, Optional[str]], None]: |
|
"""webscout news search. Query params: https://duckduckgo.com/params. |
|
|
|
Args: |
|
keywords: keywords for query. |
|
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt". |
|
safesearch: on, moderate, off. Defaults to "moderate". |
|
timelimit: d, w, m. Defaults to None. |
|
max_results: max number of results. If None, returns results only from the first response. Defaults to None. |
|
|
|
Yields: |
|
dict with news search results. |
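
        Example:
            Illustrative sketch::

                async with AsyncWEBS() as webs:
                    async for item in webs.news("technology", timelimit="d", max_results=3):
                        print(item["date"], item["title"])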
|
|
|
""" |
|
assert keywords, "keywords is mandatory" |
|
|
|
vqd = await self._aget_vqd(keywords) |
|
|
|
safesearch_base = {"on": 1, "moderate": -1, "off": -2} |
|
payload = { |
|
"l": region, |
|
"o": "json", |
|
"noamp": "1", |
|
"q": keywords, |
|
"vqd": vqd, |
|
"p": safesearch_base[safesearch.lower()], |
|
"df": timelimit, |
|
"s": 0, |
|
} |
|
|
|
cache = set() |
|
for _ in range(10): |
|
resp_content = await self._aget_url("GET", "https://duckduckgo.com/news.js", params=payload) |
|
if resp_content is None: |
|
return |
|
try: |
|
resp_json = json.loads(resp_content) |
|
except Exception: |
|
return |
|
page_data = resp_json.get("results", None) |
|
if page_data is None: |
|
return |
|
|
|
result_exists = False |
|
for row in page_data: |
|
if row["url"] not in cache: |
|
cache.add(row["url"]) |
|
image_url = row.get("image", None) |
|
result_exists = True |
|
yield { |
|
"date": datetime.fromtimestamp(row["date"], timezone.utc).isoformat(), |
|
"title": row["title"], |
|
"body": _normalize(row["excerpt"]), |
|
"url": _normalize_url(row["url"]), |
|
"image": _normalize_url(image_url) if image_url else None, |
|
"source": row["source"], |
|
} |
|
if max_results and len(cache) >= max_results: |
|
return |
|
if max_results is None or result_exists is False: |
|
return |
|
            next_page = resp_json.get("next", None)
            if next_page is None:
                return
            payload["s"] = next_page.split("s=")[-1].split("&")[0]
|
|
|
async def answers(self, keywords: str) -> AsyncGenerator[Dict[str, Optional[str]], None]: |
|
"""webscout instant answers. Query params: https://duckduckgo.com/params. |
|
|
|
Args: |
|
keywords: keywords for query. |
|
|
|
Yields: |
|
dict with instant answers results. |
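
        Example:
            Illustrative sketch::

                async with AsyncWEBS() as webs:
                    async for answer in webs.answers("gravity"):
                        print(answer["text"], answer["url"])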
|
|
|
""" |
|
assert keywords, "keywords is mandatory" |
|
|
|
payload = { |
|
"q": f"what is {keywords}", |
|
"format": "json", |
|
} |
|
|
|
resp_content = await self._aget_url("GET", "https://api.duckduckgo.com/", params=payload) |
|
        if resp_content is None:
            return
|
try: |
|
page_data = json.loads(resp_content) |
|
except Exception: |
|
page_data = None |
|
|
|
if page_data: |
|
answer = page_data.get("AbstractText", None) |
|
url = page_data.get("AbstractURL", None) |
|
if answer: |
|
yield { |
|
"icon": None, |
|
"text": answer, |
|
"topic": None, |
|
"url": url, |
|
} |
|
|
|
|
|
payload = { |
|
"q": f"{keywords}", |
|
"format": "json", |
|
} |
|
resp_content = await self._aget_url("GET", "https://api.duckduckgo.com/", params=payload) |
|
        if resp_content is None:
            return
|
try: |
|
page_data = json.loads(resp_content).get("RelatedTopics", None) |
|
except Exception: |
|
page_data = None |
|
|
|
if page_data: |
|
for row in page_data: |
|
topic = row.get("Name", None) |
|
if not topic: |
|
icon = row["Icon"].get("URL", None) |
|
yield { |
|
"icon": f"https://duckduckgo.com{icon}" if icon else None, |
|
"text": row["Text"], |
|
"topic": None, |
|
"url": row["FirstURL"], |
|
} |
|
else: |
|
for subrow in row["Topics"]: |
|
icon = subrow["Icon"].get("URL", None) |
|
yield { |
|
"icon": f"https://duckduckgo.com{icon}" if icon else None, |
|
"text": subrow["Text"], |
|
"topic": topic, |
|
"url": subrow["FirstURL"], |
|
} |
|
|
|
async def suggestions(self, keywords: str, region: str = "wt-wt") -> AsyncGenerator[Dict[str, Optional[str]], None]: |
|
"""webscout suggestions. Query params: https://duckduckgo.com/params. |
|
|
|
Args: |
|
keywords: keywords for query. |
|
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt". |
|
|
|
Yields: |
|
dict with suggestions results. |
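
        Example:
            Illustrative sketch (each row is the raw autocomplete entry)::

                async with AsyncWEBS() as webs:
                    async for s in webs.suggestions("pyth"):
                        print(s)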
|
""" |
|
assert keywords, "keywords is mandatory" |
|
|
|
payload = { |
|
"q": keywords, |
|
"kl": region, |
|
} |
|
resp_content = await self._aget_url("GET", "https://duckduckgo.com/ac", params=payload) |
|
        if resp_content is None:
            return
|
try: |
|
page_data = json.loads(resp_content) |
|
for r in page_data: |
|
yield r |
|
except Exception: |
|
pass |
|
|
|
async def maps( |
|
self, |
|
keywords: str, |
|
place: Optional[str] = None, |
|
street: Optional[str] = None, |
|
city: Optional[str] = None, |
|
county: Optional[str] = None, |
|
state: Optional[str] = None, |
|
country: Optional[str] = None, |
|
postalcode: Optional[str] = None, |
|
latitude: Optional[str] = None, |
|
longitude: Optional[str] = None, |
|
radius: int = 0, |
|
max_results: Optional[int] = None, |
|
) -> AsyncGenerator[Dict[str, Optional[str]], None]: |
|
"""webscout maps search. Query params: https://duckduckgo.com/params. |
|
|
|
Args: |
|
keywords: keywords for query |
|
place: if set, the other parameters are not used. Defaults to None. |
|
street: house number/street. Defaults to None. |
|
city: city of search. Defaults to None. |
|
county: county of search. Defaults to None. |
|
state: state of search. Defaults to None. |
|
country: country of search. Defaults to None. |
|
postalcode: postalcode of search. Defaults to None. |
|
latitude: geographic coordinate (north-south position). Defaults to None. |
|
longitude: geographic coordinate (east-west position); if latitude and |
|
longitude are set, the other parameters are not used. Defaults to None. |
|
radius: expand the search square by the distance in kilometers. Defaults to 0. |
|
max_results: max number of results. If None, returns results only from the first response. Defaults to None. |
|
|
|
Yields: |
|
dict with maps search results |
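
        Example:
            Illustrative sketch::

                async with AsyncWEBS() as webs:
                    async for place in webs.maps("coffee", place="Berlin", max_results=5):
                        print(place["title"], place["address"])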
|
""" |
|
assert keywords, "keywords is mandatory" |
|
|
|
vqd = await self._aget_vqd(keywords) |
|
|
|
|
|
if latitude and longitude: |
|
lat_t = Decimal(latitude.replace(",", ".")) |
|
lat_b = Decimal(latitude.replace(",", ".")) |
|
lon_l = Decimal(longitude.replace(",", ".")) |
|
lon_r = Decimal(longitude.replace(",", ".")) |
|
if radius == 0: |
|
radius = 1 |
|
|
|
else: |
|
if place: |
|
params: Dict[str, Optional[str]] = { |
|
"q": place, |
|
"polygon_geojson": "0", |
|
"format": "jsonv2", |
|
} |
|
else: |
|
params = { |
|
"street": street, |
|
"city": city, |
|
"county": county, |
|
"state": state, |
|
"country": country, |
|
"postalcode": postalcode, |
|
"polygon_geojson": "0", |
|
"format": "jsonv2", |
|
} |
|
try: |
|
resp_content = await self._aget_url( |
|
"GET", |
|
"https://nominatim.openstreetmap.org/search.php", |
|
params=params, |
|
) |
|
                if resp_content is None:
                    return
|
|
|
coordinates = json.loads(resp_content)[0]["boundingbox"] |
|
lat_t, lon_l = Decimal(coordinates[1]), Decimal(coordinates[2]) |
|
lat_b, lon_r = Decimal(coordinates[0]), Decimal(coordinates[3]) |
|
except Exception as ex: |
|
logger.debug(f"ddg_maps() keywords={keywords} {type(ex).__name__} {ex}") |
|
return |
|
|
|
|
|
        # Expand the bounding box by ~1 km per unit of radius (0.008983° of latitude ≈ 1 km).
        lat_t += Decimal(radius) * Decimal(0.008983)
        lat_b -= Decimal(radius) * Decimal(0.008983)
        lon_l -= Decimal(radius) * Decimal(0.008983)
        lon_r += Decimal(radius) * Decimal(0.008983)
|
logger.debug(f"bbox coordinates\n{lat_t} {lon_l}\n{lat_b} {lon_r}") |
|
|
|
|
|
work_bboxes: Deque[Tuple[Decimal, Decimal, Decimal, Decimal]] = deque() |
|
work_bboxes.append((lat_t, lon_l, lat_b, lon_r)) |
|
|
|
|
|
cache = set() |
|
while work_bboxes: |
|
lat_t, lon_l, lat_b, lon_r = work_bboxes.pop() |
|
params = { |
|
"q": keywords, |
|
"vqd": vqd, |
|
"tg": "maps_places", |
|
"rt": "D", |
|
"mkexp": "b", |
|
"wiki_info": "1", |
|
"is_requery": "1", |
|
"bbox_tl": f"{lat_t},{lon_l}", |
|
"bbox_br": f"{lat_b},{lon_r}", |
|
"strict_bbox": "1", |
|
} |
|
resp_content = await self._aget_url("GET", "https://duckduckgo.com/local.js", params=params) |
|
if resp_content is None: |
|
return |
|
try: |
|
page_data = json.loads(resp_content).get("results", []) |
|
except Exception: |
|
return |
|
if page_data is None: |
|
return |
|
|
|
for res in page_data: |
|
result = MapsResult() |
|
result.title = res["name"] |
|
result.address = res["address"] |
|
if f"{result.title} {result.address}" in cache: |
|
continue |
|
else: |
|
cache.add(f"{result.title} {result.address}") |
|
result.country_code = res["country_code"] |
|
result.url = _normalize_url(res["website"]) |
|
result.phone = res["phone"] |
|
result.latitude = res["coordinates"]["latitude"] |
|
result.longitude = res["coordinates"]["longitude"] |
|
result.source = _normalize_url(res["url"]) |
|
if res["embed"]: |
|
result.image = res["embed"].get("image", "") |
|
result.desc = res["embed"].get("description", "") |
|
result.hours = res["hours"] |
|
result.category = res["ddg_category"] |
|
result.facebook = f"www.facebook.com/profile.php?id={x}" if (x := res["facebook_id"]) else None |
|
result.instagram = f"https://www.instagram.com/{x}" if (x := res["instagram_id"]) else None |
|
result.twitter = f"https://twitter.com/{x}" if (x := res["twitter_id"]) else None |
|
yield result.__dict__ |
|
if max_results and len(cache) >= max_results: |
|
return |
|
if max_results is None: |
|
return |
|
|
|
if len(page_data) >= 15: |
|
lat_middle = (lat_t + lat_b) / 2 |
|
lon_middle = (lon_l + lon_r) / 2 |
|
bbox1 = (lat_t, lon_l, lat_middle, lon_middle) |
|
bbox2 = (lat_t, lon_middle, lat_middle, lon_r) |
|
bbox3 = (lat_middle, lon_l, lat_b, lon_middle) |
|
bbox4 = (lat_middle, lon_middle, lat_b, lon_r) |
|
work_bboxes.extendleft([bbox1, bbox2, bbox3, bbox4]) |
|
|
|
async def translate( |
|
self, keywords: str, from_: Optional[str] = None, to: str = "en" |
|
) -> Optional[Dict[str, Optional[str]]]: |
|
"""webscout translate. |
|
|
|
Args: |
|
keywords: string or a list of strings to translate |
|
from_: translate from (defaults automatically). Defaults to None. |
|
to: what language to translate. Defaults to "en". |
|
|
|
Returns: |
|
dict with translated keywords. |
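
        Example:
            Illustrative sketch::

                async with AsyncWEBS() as webs:
                    translation = await webs.translate("hello", to="de")
                    print(translation)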
|
""" |
|
assert keywords, "keywords is mandatory" |
|
|
|
vqd = await self._aget_vqd("translate") |
|
|
|
payload = { |
|
"vqd": vqd, |
|
"query": "translate", |
|
"to": to, |
|
} |
|
if from_: |
|
payload["from"] = from_ |
|
|
|
resp_content = await self._aget_url( |
|
"POST", |
|
"https://duckduckgo.com/translation.js", |
|
params=payload, |
|
data=keywords.encode(), |
|
) |
|
if resp_content is None: |
|
return None |
|
try: |
|
page_data = json.loads(resp_content) |
|
page_data["original"] = keywords |
|
except Exception: |
|
page_data = None |
|
return page_data |
|
|
|
logger = logging.getLogger("duckduckgo_search.WEBS") |
|
nest_asyncio.apply() |
|
|
|
|
|
class WEBS(AsyncWEBS): |
|
def __init__(self, headers=None, proxies=None, timeout=10): |
|
if asyncio.get_event_loop().is_running(): |
|
warnings.warn("WEBS running in an async loop. This may cause errors. Use AsyncWEBS instead.", stacklevel=2) |
|
super().__init__(headers, proxies, timeout) |
|
self._loop = asyncio.get_event_loop() |
|
|
|
def __enter__(self) -> "WEBS": |
|
return self |
|
|
|
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # nest_asyncio allows run_until_complete here even when a loop is already running.
        self._loop.run_until_complete(self.__aexit__(exc_type, exc_val, exc_tb))
|
|
|
def _iter_over_async(self, async_gen): |
|
"""Iterate over an async generator.""" |
|
while True: |
|
try: |
|
yield self._loop.run_until_complete(async_gen.__anext__()) |
|
except StopAsyncIteration: |
|
break |
|
|
|
def text(self, *args, **kwargs) -> Generator[Dict[str, Optional[str]], None, None]: |
|
async_gen = super().text(*args, **kwargs) |
|
return self._iter_over_async(async_gen) |
|
|
|
def images(self, *args, **kwargs) -> Generator[Dict[str, Optional[str]], None, None]: |
|
async_gen = super().images(*args, **kwargs) |
|
return self._iter_over_async(async_gen) |
|
|
|
def videos(self, *args, **kwargs) -> Generator[Dict[str, Optional[str]], None, None]: |
|
async_gen = super().videos(*args, **kwargs) |
|
return self._iter_over_async(async_gen) |
|
|
|
def news(self, *args, **kwargs) -> Generator[Dict[str, Optional[str]], None, None]: |
|
async_gen = super().news(*args, **kwargs) |
|
return self._iter_over_async(async_gen) |
|
|
|
def answers(self, *args, **kwargs) -> Generator[Dict[str, Optional[str]], None, None]: |
|
async_gen = super().answers(*args, **kwargs) |
|
return self._iter_over_async(async_gen) |
|
|
|
def suggestions(self, *args, **kwargs) -> Generator[Dict[str, Optional[str]], None, None]: |
|
async_gen = super().suggestions(*args, **kwargs) |
|
return self._iter_over_async(async_gen) |
|
|
|
def maps(self, *args, **kwargs) -> Generator[Dict[str, Optional[str]], None, None]: |
|
async_gen = super().maps(*args, **kwargs) |
|
return self._iter_over_async(async_gen) |
|
|
|
def translate(self, *args, **kwargs) -> Optional[Dict[str, Optional[str]]]: |
|
async_coro = super().translate(*args, **kwargs) |
|
return self._loop.run_until_complete(async_coro) |
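

# Illustrative synchronous usage of the wrapper above (a sketch, not executed here):
#
#     with WEBS() as webs:
#         for r in webs.text("python", max_results=5):
#             print(r["title"])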
|
|
|
import g4f
from flask import Flask, jsonify, request

from webscout import WEBS

app = Flask(__name__)
|
|
|
@app.route('/search', methods=['POST']) |
|
def webscout2_search(): |
|
    data = request.get_json(silent=True) or {}
    if 'query' not in data:
        return jsonify({'error': 'Query parameter missing'}), 400
|
|
|
query = data['query'] |
|
|
|
    with WEBS() as webs:
        responses = []
        for i, r in enumerate(webs.text(query, region='wt-wt', safesearch='off', timelimit='y')):
            if i == 10:
                break
            responses.append(r)
|
return jsonify(responses) |
|
|
|
@app.route('/health', methods=['GET']) |
|
def health_check():
    return jsonify({'status': 'ok'})
|
|
|
@app.route('/video', methods=['GET']) |
|
def webscout_videos(): |
|
params = request.args |
|
if 'keywords' not in params: |
|
return jsonify({'error': 'Keywords parameter missing'}), 400 |
|
|
|
keywords = params['keywords'] |
|
|
|
    with WEBS() as webs:
        responses = []
        for r in webs.videos(
|
keywords, |
|
region="wt-wt", |
|
safesearch="off", |
|
timelimit="w", |
|
resolution="high", |
|
duration="medium", |
|
max_results=10, |
|
): |
|
responses.append(r) |
|
return jsonify(responses) |
|
|
|
@app.route('/img', methods=['GET']) |
|
def webscout2_images(): |
|
params = request.args |
|
if 'keywords' not in params: |
|
return jsonify({'error': 'Keywords parameter missing'}), 400 |
|
|
|
keywords = params['keywords'] |
|
|
|
    with WEBS() as webs:
        responses = []
        for r in webs.images(
|
keywords, |
|
region="wt-wt", |
|
safesearch="off", |
|
            size=None,
            type_image=None,
|
layout=None, |
|
license_image=None, |
|
max_results=10, |
|
): |
|
responses.append(r) |
|
return jsonify(responses) |
|
|
|
@app.route('/news', methods=['GET']) |
|
def webscout_news(): |
|
params = request.args |
|
if 'keywords' not in params: |
|
return jsonify({'error': 'Keywords parameter missing'}), 400 |
|
|
|
keywords = params['keywords'] |
|
|
|
    with WEBS() as webs:
        responses = []
        for r in webs.news(
|
keywords, |
|
region="wt-wt", |
|
safesearch="off", |
|
timelimit="m", |
|
max_results=10 |
|
): |
|
responses.append(r) |
|
return jsonify(responses) |
|
|
|
@app.route('/int', methods=['GET']) |
|
def webscout3_search(): |
|
query = request.args.get('query') |
|
if not query: |
|
return jsonify({'error': 'Query parameter missing'}), 400 |
|
|
|
|
|
    with WEBS() as webs:
        responses = []
        for i, r in enumerate(webs.text(query, region='wt-wt', safesearch='off', timelimit='y')):
            if i == 2:
                break
            responses.append(r)

    return jsonify(responses)
|
|
|
|
@app.route('/translate', methods=['GET']) |
|
def webscout_translate(): |
|
params = request.args |
|
if 'keywords' not in params or 'to' not in params: |
|
return jsonify({'error': 'Keywords or target language parameter missing'}), 400 |
|
|
|
keywords = params['keywords'] |
|
target_language = params['to'] |
|
|
|
    with WEBS() as webs:
        translation = webs.translate(keywords, to=target_language)
        return jsonify(translation)
|
|
|
@app.route('/chat', methods=['POST']) |
|
def chat_gpt(): |
|
    user_input = (request.get_json(silent=True) or {}).get('message')
|
|
|
messages = [ |
|
{ |
|
"role": "system", |
|
"content": "Hello! I'm your virtual assistant. How can I help you?" |
|
} |
|
] |
|
|
|
    if user_input is None:
        return jsonify({'error': 'Message parameter missing'}), 400
|
|
|
messages.append({"role": "user", "content": user_input}) |
|
|
|
response = g4f.ChatCompletion.create( |
|
model="gpt-4-32k-0613", |
|
provider=g4f.Provider.GPTalk, |
|
messages=messages, |
|
stream=True, |
|
) |
|
|
|
ms = "" |
|
for message in response: |
|
ms += message |
|
|
|
messages.append({"role": "assistant", "content": ms}) |
|
return jsonify({"response": ms}) |
|
if __name__ == '__main__': |
|
app.run() |
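
# Illustrative client calls, assuming the default Flask dev server at http://127.0.0.1:5000:
#
#   curl -X POST http://127.0.0.1:5000/search -H "Content-Type: application/json" -d '{"query": "python"}'
#   curl "http://127.0.0.1:5000/news?keywords=python"
#   curl "http://127.0.0.1:5000/translate?keywords=hello&to=de"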
|
|
|
|