Spaces:
Sleeping
Sleeping
| import time | |
| import logging | |
| import requests | |
| import json | |
| from bs4 import BeautifulSoup | |
| from urllib.parse import quote, urlparse | |
| logging.basicConfig( | |
| filename='error.log', | |
| level=logging.INFO, | |
| format='%(asctime)s | [%(levelname)s]: %(message)s', | |
| datefmt='%m-%d-%Y / %I:%M:%S %p' | |
| ) | |
| class SearchResults: | |
| def __init__(self, results): | |
| self.results = results | |
| def __str__(self): | |
| output = "" | |
| for result in self.results: | |
| output += "---\n" | |
| output += f"Title: {result.get('title', 'Title not found')}\n" | |
| output += f"Link: {result.get('link', 'Link not found')}\n" | |
| output += "---\n" | |
| return output | |
| class YandexReverseImageSearcher: | |
| def __init__(self): | |
| self.base_url = "https://yandex.ru/images/search" | |
| self.headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"} | |
| self.retry_count = 3 | |
| self.retry_delay = 1 | |
| def response(self, query: str, image_url: str, max_results: int = 10, delay: int = 1) -> SearchResults: | |
| self._validate_input(query, image_url) | |
| encoded_query = quote(query) | |
| encoded_image_url = quote(image_url) | |
| url = f"{self.base_url}?q={encoded_query}&image_url={encoded_image_url}&sbisrc=cr_1_5_2" | |
| all_results = [] | |
| start_index = 0 | |
| while len(all_results) < max_results: | |
| if start_index != 0: | |
| time.sleep(delay) | |
| paginated_url = f"{url}&start={start_index}" | |
| response = self._make_request(paginated_url) | |
| if response is None: | |
| break | |
| search_results, valid_content = self._parse_search_results(response.text) | |
| if not valid_content: | |
| logging.warning("Unexpected HTML structure encountered.") | |
| break | |
| for result in search_results: | |
| if len(all_results) >= max_results: | |
| break | |
| data = self._extract_result_data(result) | |
| if data and data not in all_results: | |
| all_results.append(data) | |
| start_index += (len(all_results)-start_index) | |
| if len(all_results) == 0: | |
| logging.warning(f"No results were found for the given query: [{query}], and/or image URL: [{image_url}].") | |
| return "No results found. Please try again with a different query and/or image URL." | |
| else: | |
| return SearchResults(all_results[:max_results]) | |
| def _validate_input(self, query: str, image_url: str): | |
| if not query: | |
| raise ValueError("Query not found. Please enter a query and try again.") | |
| if not image_url: | |
| raise ValueError("Image URL not found. Please enter an image URL and try again.") | |
| if not self._validate_image_url(image_url): | |
| raise ValueError("Invalid image URL. Please enter a valid image URL and try again.") | |
| def _validate_image_url(self, url: str) -> bool: | |
| parsed_url = urlparse(url) | |
| path = parsed_url.path.lower() | |
| valid_extensions = (".jpg", ".jpeg", ".png", ".webp") | |
| return any(path.endswith(ext) for ext in valid_extensions) | |
| def _make_request(self, url: str): | |
| attempts = 0 | |
| while attempts < self.retry_count: | |
| try: | |
| response = requests.get(url, headers=self.headers) | |
| if response.headers.get('Content-Type', '').startswith('text/html'): | |
| response.raise_for_status() | |
| return response | |
| else: | |
| logging.warning("Non-HTML content received.") | |
| return None | |
| except requests.exceptions.HTTPError as http_err: | |
| logging.error(f"HTTP error occurred: {http_err}") | |
| attempts += 1 | |
| time.sleep(self.retry_delay) | |
| except Exception as err: | |
| logging.error(f"An error occurred: {err}") | |
| return None | |
| return None | |
| def _parse_search_results(self, html_content: str): | |
| try: | |
| soup = BeautifulSoup(html_content, "html.parser") | |
| return soup.find_all('div', class_='g'), True | |
| except Exception as e: | |
| logging.error(f"Error parsing HTML content: {e}") | |
| return None, False | |
| def _extract_result_data(self, result): | |
| link = result.find('a', href=True)['href'] if result.find('a', href=True) else None | |
| title = result.find('h3').get_text(strip=True) if result.find('h3') else None | |
| return {"link": link, "title": title} if link and title else {} | |
| def get_image_links(page): | |
| """ | |
| Extracts image URLs from the given HTML page. | |
| Args: | |
| page: The HTML content as a string. | |
| Returns: | |
| A list of image URLs. | |
| """ | |
| soup = BeautifulSoup(page, 'html.parser') | |
| # Find the specific section containing image links | |
| gallery_data = soup.find('div', {'class': 'cbir-section cbir-section_name_sites'}) | |
| if gallery_data is None: | |
| return [] | |
| # Find the container of image links | |
| image_links_container = gallery_data.find('div', {'class': 'Root'}) | |
| if image_links_container is None: | |
| return [] | |
| data_state = json.loads(image_links_container['data-state']) | |
| # Extract URLs from each div | |
| image_urls = [] | |
| for site in data_state['sites']: | |
| original_image_url = site['originalImage']['url'] | |
| image_urls.append(original_image_url) | |
| return image_urls | |
| def yandex_reverse_image_search(file_path): | |
| img_search_url = generate_images_search_links(file_path) | |
| if img_search_url is None: | |
| return [] | |
| # Simulate a user agent to avoid being blocked | |
| headers = { | |
| 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', | |
| 'Content-Type': 'application/json', | |
| } | |
| try: | |
| response = requests.get(img_search_url, headers=headers) | |
| response.raise_for_status() # Raise an exception for bad status codes | |
| # Parse the HTML content | |
| soup = BeautifulSoup(response.content, 'html.parser') | |
| image_urls = get_image_links(soup.prettify()) | |
| return image_urls | |
| except requests.exceptions.RequestException as e: | |
| print(f"Error fetching image: {e}") | |
| return [] | |
| def generate_images_search_links(file_path): | |
| search_url = 'https://yandex.ru/images/search' | |
| params = {'rpt': 'imageview', 'format': 'json', 'request': '{"blocks":[{"block":"b-page_type_search-by-image__link"}]}'} | |
| try: | |
| files = {'upfile': ('blob', open(file_path, 'rb'), 'image/jpeg/webp')} | |
| response = requests.post(search_url, params=params, files=files) | |
| query_string = json.loads(response.content)['blocks'][0]['params']['url'] | |
| img_search_url = search_url + '?' + query_string | |
| return img_search_url | |
| except: | |
| return None | |
| if __name__ == "__main__": | |
| file_path = "T:\\Projects\\prj-nict-ai-content-detection\\data\\test_data\\towels.jpg.webp" | |
| image_urls = yandex_reverse_image_search(file_path) | |
| for image_url in image_urls: | |
| print(f"Image URL: {image_url}") | |