Spaces:
Sleeping
Sleeping
| import json | |
| import logging | |
| import time | |
| from urllib.parse import ( | |
| quote, | |
| urlparse, | |
| ) | |
| import requests | |
| from bs4 import BeautifulSoup | |
| logging.basicConfig( | |
| filename="error.log", | |
| level=logging.INFO, | |
| format="%(asctime)s | [%(levelname)s]: %(message)s", | |
| datefmt="%m-%d-%Y / %I:%M:%S %p", | |
| ) | |
| class SearchResults: | |
| def __init__(self, results): | |
| self.results = results | |
| def __str__(self): | |
| output = "" | |
| for result in self.results: | |
| output += "---\n" | |
| output += f"Title: {result.get('title', 'Title not found')}\n" | |
| output += f"Link: {result.get('link', 'Link not found')}\n" | |
| output += "---\n" | |
| return output | |
| class YandexReverseImageSearcher: | |
| def __init__(self): | |
| self.base_url = "https://yandex.ru/images/search" | |
| self.headers = { | |
| "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", # noqa: E501 | |
| } | |
| self.retry_count = 3 | |
| self.retry_delay = 1 | |
| def response( | |
| self, | |
| query: str, | |
| image_url: str, | |
| max_results: int = 10, | |
| delay: int = 1, | |
| ) -> SearchResults: | |
| self._validate_input(query, image_url) | |
| encoded_query = quote(query) | |
| encoded_image_url = quote(image_url) | |
| url = f"{self.base_url}?q={encoded_query}&image_url={encoded_image_url}&sbisrc=cr_1_5_2" # noqa: E501 | |
| all_results = [] | |
| start_index = 0 | |
| while len(all_results) < max_results: | |
| if start_index != 0: | |
| time.sleep(delay) | |
| paginated_url = f"{url}&start={start_index}" | |
| response = self._make_request(paginated_url) | |
| if response is None: | |
| break | |
| search_results, valid_content = self._parse_search_results( | |
| response.text, | |
| ) | |
| if not valid_content: | |
| logging.warning("Unexpected HTML structure encountered.") | |
| break | |
| for result in search_results: | |
| if len(all_results) >= max_results: | |
| break | |
| data = self._extract_result_data(result) | |
| if data and data not in all_results: | |
| all_results.append(data) | |
| start_index += len(all_results) - start_index | |
| if len(all_results) == 0: | |
| logging.warning( | |
| f"No results were found for the given query: [{query}], and/or image URL: [{image_url}].", # noqa: E501 | |
| ) | |
| return "No results found. Please try again with a different query and/or image URL." # noqa: E501 | |
| else: | |
| return SearchResults(all_results[:max_results]) | |
| def _validate_input(self, query: str, image_url: str): | |
| if not query: | |
| raise ValueError( | |
| "Query not found. Enter a query and try again.", | |
| ) | |
| if not image_url: | |
| raise ValueError( | |
| "Image URL not found. Enter an image URL and try again.", | |
| ) | |
| if not self._validate_image_url(image_url): | |
| raise ValueError( | |
| "Invalid image URL. Enter a valid image URL and try again.", | |
| ) | |
| def _validate_image_url(self, url: str) -> bool: | |
| parsed_url = urlparse(url) | |
| path = parsed_url.path.lower() | |
| valid_extensions = (".jpg", ".jpeg", ".png", ".webp") | |
| return any(path.endswith(ext) for ext in valid_extensions) | |
| def _make_request(self, url: str): | |
| attempts = 0 | |
| while attempts < self.retry_count: | |
| try: | |
| response = requests.get(url, headers=self.headers) | |
| if response.headers.get("Content-Type", "").startswith( | |
| "text/html", | |
| ): | |
| response.raise_for_status() | |
| return response | |
| else: | |
| logging.warning("Non-HTML content received.") | |
| return None | |
| except requests.exceptions.HTTPError as http_err: | |
| logging.error(f"HTTP error occurred: {http_err}") | |
| attempts += 1 | |
| time.sleep(self.retry_delay) | |
| except Exception as err: | |
| logging.error(f"An error occurred: {err}") | |
| return None | |
| return None | |
| def _parse_search_results(self, html_content: str): | |
| try: | |
| soup = BeautifulSoup(html_content, "html.parser") | |
| return soup.find_all("div", class_="g"), True | |
| except Exception as e: | |
| logging.error(f"Error parsing HTML content: {e}") | |
| return None, False | |
| def _extract_result_data(self, result): | |
| link = ( | |
| result.find("a", href=True)["href"] | |
| if result.find("a", href=True) | |
| else None | |
| ) | |
| title = ( | |
| result.find("h3").get_text(strip=True) | |
| if result.find("h3") | |
| else None | |
| ) | |
| return {"link": link, "title": title} if link and title else {} | |
| def get_image_links(page): | |
| """ | |
| Extracts image URLs from the given HTML page. | |
| Args: | |
| page: The HTML content as a string. | |
| Returns: | |
| A list of image URLs. | |
| """ | |
| soup = BeautifulSoup(page, "html.parser") | |
| # Find the specific section containing image links | |
| gallery_data = soup.find( | |
| "div", | |
| {"class": "cbir-section cbir-section_name_sites"}, | |
| ) | |
| if gallery_data is None: | |
| return [] | |
| # Find the container of image links | |
| image_links_container = gallery_data.find("div", {"class": "Root"}) | |
| if image_links_container is None: | |
| return [] | |
| data_state = json.loads(image_links_container["data-state"]) | |
| # Extract URLs from each div | |
| image_urls = [] | |
| for site in data_state["sites"]: | |
| original_image_url = site["originalImage"]["url"] | |
| image_urls.append(original_image_url) | |
| return image_urls | |
| def yandex_reverse_image_search(file_path): | |
| img_search_url = generate_images_search_links(file_path) | |
| if img_search_url is None: | |
| return [] | |
| # Simulate a user agent to avoid being blocked | |
| headers = { | |
| "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36", # noqa: E501 | |
| "Content-Type": "application/json", | |
| } | |
| try: | |
| response = requests.get(img_search_url, headers=headers) | |
| response.raise_for_status() # Raise an exception for bad status codes | |
| # Parse the HTML content | |
| soup = BeautifulSoup(response.content, "html.parser") | |
| image_urls = get_image_links(soup.prettify()) | |
| return image_urls | |
| except requests.exceptions.RequestException as e: | |
| print(f"Error fetching image: {e}") | |
| return [] | |
| def generate_images_search_links(file_path): | |
| search_url = "https://yandex.ru/images/search" | |
| params = { | |
| "rpt": "imageview", | |
| "format": "json", | |
| "request": '{"blocks":[{"block":"b-page_type_search-by-image__link"}]}', # noqa: E501 | |
| } | |
| try: | |
| files = {"upfile": ("blob", open(file_path, "rb"), "image/jpeg/webp")} | |
| response = requests.post(search_url, params=params, files=files) | |
| query_string = json.loads(response.content)["blocks"][0]["params"][ | |
| "url" | |
| ] | |
| img_search_url = search_url + "?" + query_string | |
| return img_search_url | |
| except requests.exceptions as e: | |
| print(f"Error generating search URL: {e}") | |
| return None | |
| if __name__ == "__main__": | |
| file_path = "T:\\Projects\\prj-nict-ai-content-detection\\data\\test_data\\towels.jpg.webp" # noqa: E501 | |
| image_urls = yandex_reverse_image_search(file_path) | |
| for image_url in image_urls: | |
| print(f"Image URL: {image_url}") | |