Spaces:
Sleeping
Sleeping
File size: 5,825 Bytes
22e1b62 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
import logging
import time
from typing import Dict, Optional, Tuple
from urllib.parse import quote, urlparse

import requests
from bs4 import BeautifulSoup
# Configure root logging once at import: INFO and above are appended to
# error.log as "MM-DD-YYYY / HH:MM:SS AM/PM | [LEVEL]: message" entries.
logging.basicConfig(
    level=logging.INFO,
    filename='error.log',
    datefmt='%m-%d-%Y / %I:%M:%S %p',
    format='%(asctime)s | [%(levelname)s]: %(message)s'
)
class SearchResults:
    """Container for reverse-image-search results with a printable summary.

    Each result is expected to be a dict with optional ``title`` and
    ``link`` keys; missing keys render as placeholder text.
    """

    def __init__(self, results):
        # results: list of {"title": ..., "link": ...} dicts.
        self.results = results

    def __str__(self):
        # Join a list of per-result chunks instead of repeated string
        # concatenation (the original `output +=` loop is quadratic).
        chunks = [
            f"---\n"
            f"Title: {result.get('title', 'Title not found')}\n"
            f"Link: {result.get('link', 'Link not found')}\n"
            f"---\n"
            for result in self.results
        ]
        return "".join(chunks)
class GoogleReverseImageSearch:
    """Reverse image search by scraping Google's search-by-image endpoint.

    NOTE(review): this scrapes Google result HTML ('div.g' blocks), which
    Google changes frequently and discourages for automated access — expect
    breakage if the markup shifts.
    """

    def __init__(self):
        self.base_url = "https://www.google.com/searchbyimage"
        # Desktop Chrome UA so Google serves the regular HTML results page.
        self.headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"}
        self.retry_count = 3      # HTTP attempts per page request
        self.retry_delay = 1      # seconds between retries
        self.request_timeout = 10  # seconds; prevents requests.get from hanging

    def response(self, query: str, image_url: str, max_results: int = 10, delay: int = 1) -> SearchResults:
        """Run the search and collect up to ``max_results`` unique results.

        ``delay`` seconds are slept between page fetches. Returns a
        ``SearchResults`` instance, or — preserving the original contract —
        a plain error string when nothing was found.

        Raises ValueError (via ``_validate_input``) on a missing query or a
        missing/invalid image URL.
        """
        self._validate_input(query, image_url)
        encoded_query = quote(query)
        encoded_image_url = quote(image_url)
        url = f"{self.base_url}?q={encoded_query}&image_url={encoded_image_url}&sbisrc=cr_1_5_2"
        all_results = []
        start_index = 0
        while len(all_results) < max_results:
            if start_index != 0:
                time.sleep(delay)  # be polite between page fetches
            paginated_url = f"{url}&start={start_index}"
            response = self._make_request(paginated_url)
            if response is None:
                break
            search_results, valid_content = self._parse_search_results(response.text)
            if not valid_content:
                logging.warning("Unexpected HTML structure encountered.")
                break
            for result in search_results:
                if len(all_results) >= max_results:
                    break
                data = self._extract_result_data(result)
                if data and data not in all_results:
                    all_results.append(data)
            # BUG FIX: the original advanced start_index by the number of NEW
            # results; a page yielding only duplicates (or none) left
            # start_index unchanged and looped forever on the same page.
            if len(all_results) == start_index:
                break
            start_index = len(all_results)
        if len(all_results) == 0:
            logging.warning(f"No results were found for the given query: [{query}], and/or image URL: [{image_url}].")
            return "No results found. Please try again with a different query and/or image URL."
        else:
            return SearchResults(all_results[:max_results])

    def _validate_input(self, query: str, image_url: str):
        """Raise ValueError for a missing query or missing/invalid image URL."""
        if not query:
            raise ValueError("Query not found. Please enter a query and try again.")
        if not image_url:
            raise ValueError("Image URL not found. Please enter an image URL and try again.")
        if not self._validate_image_url(image_url):
            raise ValueError("Invalid image URL. Please enter a valid image URL and try again.")

    def _validate_image_url(self, url: str) -> bool:
        """True if the URL path ends with a recognized image extension."""
        parsed_url = urlparse(url)
        path = parsed_url.path.lower()
        valid_extensions = (".jpg", ".jpeg", ".png", ".webp")
        return path.endswith(valid_extensions)

    def _make_request(self, url: str):
        """GET ``url`` with retries on HTTP errors.

        Returns the Response for HTML content, or None on non-HTML content,
        exhausted retries, or any non-HTTP error.
        """
        attempts = 0
        while attempts < self.retry_count:
            try:
                # timeout added: the original could hang indefinitely.
                response = requests.get(url, headers=self.headers, timeout=self.request_timeout)
                if response.headers.get('Content-Type', '').startswith('text/html'):
                    response.raise_for_status()
                    return response
                else:
                    logging.warning("Non-HTML content received.")
                    return None
            except requests.exceptions.HTTPError as http_err:
                logging.error(f"HTTP error occurred: {http_err}")
                attempts += 1
                time.sleep(self.retry_delay)
            except Exception as err:
                # Best-effort: connection/timeout problems are logged, not raised.
                logging.error(f"An error occurred: {err}")
                return None
        return None

    def _parse_search_results(self, html_content: str) -> Tuple[Optional[list], bool]:
        """Parse result blocks ('div.g') out of a results page.

        Returns (results, True) on success, (None, False) on a parse error.
        """
        try:
            soup = BeautifulSoup(html_content, "html.parser")
            return soup.find_all('div', class_='g'), True
        except Exception as e:
            logging.error(f"Error parsing HTML content: {e}")
            return None, False

    def _extract_result_data(self, result) -> Dict:
        """Extract {'link', 'title'} from one result block; {} if incomplete."""
        # Query each element once (the original called find() twice per field).
        anchor = result.find('a', href=True)
        heading = result.find('h3')
        link = anchor['href'] if anchor else None
        title = heading.get_text(strip=True) if heading else None
        return {"link": link, "title": title} if link and title else {}
if __name__ == "__main__":
# request = GoogleReverseImageSearch()
# response = request.response(
# query="Example Query",
# image_url="https://ichef.bbci.co.uk/images/ic/1024xn/p0khzhhl.jpg.webp",
# max_results=5
# )
# print(response)
# Path to local image
image_path = "data/test_data/towel.jpg"
image_path = "C:\\TTProjects\\prj-nict-ai-content-detection\\data\\test_data\\towel.jpg"
import json
file_path = image_path
search_url = 'https://yandex.ru/images/search'
files = {'upfile': ('blob', open(file_path, 'rb'), 'image/jpeg')}
params = {'rpt': 'imageview', 'format': 'json', 'request': '{"blocks":[{"block":"b-page_type_search-by-image__link"}]}'}
response = requests.post(search_url, params=params, files=files)
query_string = json.loads(response.content)['blocks'][0]['params']['url']
img_search_url = search_url + '?' + query_string
print(img_search_url)
response = requests.get(img_search_url)
print(response.text) |