# File size: 7,341 Bytes
# da7dbd0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import time
import logging
import requests
import json
from bs4 import BeautifulSoup
from urllib.parse import quote, urlparse

# Route INFO-and-above records from the root logger into ``error.log``.
# Each record is stamped as "MM-DD-YYYY / hh:mm:ss AM/PM | [LEVEL]: message".
logging.basicConfig(
    datefmt='%m-%d-%Y / %I:%M:%S %p',
    format='%(asctime)s | [%(levelname)s]: %(message)s',
    level=logging.INFO,
    filename='error.log',
)

class SearchResults:
    """Hold a list of search hits and render them as human-readable text.

    Each hit is read with ``.get('title')`` and ``.get('link')``, so it must
    be a mapping; a missing key is rendered with a placeholder instead of
    raising.
    """

    def __init__(self, results):
        """Store *results*, an iterable of hit mappings, without copying it."""
        self.results = results

    def __repr__(self):
        """Return a debugging representation that shows the wrapped hits."""
        return f"{type(self).__name__}({self.results!r})"

    def __str__(self):
        """Return every hit as a ``---`` delimited Title/Link block.

        An empty result list yields an empty string.
        """
        # One join instead of repeated ``+=`` keeps this linear in the number
        # of hits on every Python implementation.
        return "".join(
            "---\n"
            f"Title: {result.get('title', 'Title not found')}\n"
            f"Link: {result.get('link', 'Link not found')}\n"
            "---\n"
            for result in self.results
        )

class YandexReverseImageSearcher:
    """Scrape Yandex image search for title/link pairs matching a query and image URL.

    Pages are fetched one after another (using the ``start`` query parameter)
    until enough unique results are collected, a request fails, or a page
    contributes nothing new.
    """

    # Seconds to wait on a single HTTP request. Without a timeout
    # ``requests.get`` may block indefinitely on an unresponsive server.
    request_timeout = 10

    def __init__(self):
        """Set the search endpoint, browser-like headers and retry policy."""
        self.base_url = "https://yandex.ru/images/search"
        self.headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"}
        self.retry_count = 3
        self.retry_delay = 1

    def response(self, query: str, image_url: str, max_results: int = 10, delay: int = 1) -> "SearchResults | str":
        """Collect up to *max_results* unique search hits.

        Args:
            query: Text query sent as the ``q`` parameter.
            image_url: URL of the image to search for; its path must end in
                one of the extensions accepted by ``_validate_image_url``.
            max_results: Upper bound on the number of hits returned.
            delay: Seconds to sleep before every page request after the first.

        Returns:
            A ``SearchResults`` wrapping the collected hits, or a plain
            message string when no hit was found.

        Raises:
            ValueError: If *query* or *image_url* is empty, or *image_url*
                does not look like an image URL.
        """
        self._validate_input(query, image_url)

        url = (
            f"{self.base_url}?q={quote(query)}"
            f"&image_url={quote(image_url)}&sbisrc=cr_1_5_2"
        )

        all_results = []
        start_index = 0

        while len(all_results) < max_results:
            if start_index != 0:
                time.sleep(delay)

            page = self._make_request(f"{url}&start={start_index}")
            if page is None:
                break

            search_results, valid_content = self._parse_search_results(page.text)
            if not valid_content:
                logging.warning("Unexpected HTML structure encountered.")
                break

            collected_before = len(all_results)
            for result in search_results:
                if len(all_results) >= max_results:
                    break
                data = self._extract_result_data(result)
                if data and data not in all_results:
                    all_results.append(data)

            if len(all_results) == collected_before:
                # The page added nothing new (empty or only duplicates).
                # Requesting the same offset again would return the same page,
                # so stop instead of looping forever.
                break

            # The next page starts after everything collected so far.
            start_index = len(all_results)

        if not all_results:
            logging.warning(f"No results were found for the given query: [{query}], and/or image URL: [{image_url}].")
            return "No results found. Please try again with a different query and/or image URL."
        return SearchResults(all_results[:max_results])

    def _validate_input(self, query: str, image_url: str):
        """Raise ``ValueError`` unless both inputs are non-empty and the URL looks like an image."""
        if not query:
            raise ValueError("Query not found. Please enter a query and try again.")
        if not image_url:
            raise ValueError("Image URL not found. Please enter an image URL and try again.")
        if not self._validate_image_url(image_url):
            raise ValueError("Invalid image URL. Please enter a valid image URL and try again.")

    def _validate_image_url(self, url: str) -> bool:
        """Return True when the URL path ends in .jpg, .jpeg, .png or .webp (case-insensitive).

        Only the path component is inspected, so a query string or fragment
        after the file name does not affect the result.
        """
        valid_extensions = (".jpg", ".jpeg", ".png", ".webp")
        return urlparse(url).path.lower().endswith(valid_extensions)

    def _make_request(self, url: str):
        """GET *url* and return the response when it is a successful HTML page.

        HTTP error statuses are retried up to ``self.retry_count`` times with
        ``self.retry_delay`` seconds between attempts. A non-HTML response or
        any other failure (connection error, timeout, ...) is logged and
        gives up immediately.

        Returns:
            The ``requests`` response object, or ``None`` on failure.
        """
        for attempt in range(1, self.retry_count + 1):
            try:
                response = requests.get(url, headers=self.headers, timeout=self.request_timeout)
                if not response.headers.get('Content-Type', '').startswith('text/html'):
                    logging.warning("Non-HTML content received.")
                    return None
                response.raise_for_status()
                return response
            except requests.exceptions.HTTPError as http_err:
                logging.error(f"HTTP error occurred: {http_err}")
                # Sleeping after the final attempt would only delay the caller.
                if attempt < self.retry_count:
                    time.sleep(self.retry_delay)
            except Exception as err:
                logging.error(f"An error occurred: {err}")
                return None
        return None

    def _parse_search_results(self, html_content: str):
        """Parse *html_content* and return ``(result_divs, ok)``.

        ``ok`` is False (and the first element None) only when parsing itself
        raised; a page without matching elements returns ``([], True)``.
        """
        try:
            soup = BeautifulSoup(html_content, "html.parser")
            # NOTE(review): the selector is <div class="g">; confirm this
            # matches the result containers in current Yandex markup.
            return soup.find_all('div', class_='g'), True
        except Exception as e:
            logging.error(f"Error parsing HTML content: {e}")
            return None, False

    def _extract_result_data(self, result):
        """Return ``{"link", "title"}`` for one result element, or ``{}`` if either is missing.

        The link is the ``href`` of the first anchor that has one; the title
        is the stripped text of the first ``<h3>``.
        """
        anchor = result.find('a', href=True)
        heading = result.find('h3')
        link = anchor['href'] if anchor else None
        title = heading.get_text(strip=True) if heading else None
        return {"link": link, "title": title} if link and title else {}


def get_image_links(page):
    """
    Extracts original-image URLs from a Yandex "sites" search-by-image page.

    The URLs are read from the JSON stored in the ``data-state`` attribute of
    the ``div.Root`` element inside the
    ``cbir-section cbir-section_name_sites`` section.

    Args:
        page: The HTML content (anything BeautifulSoup can parse).

    Returns:
        A list of image URLs. The list is empty when the expected section,
        container, attribute or JSON structure is missing; individual site
        entries without an ``originalImage.url`` are skipped.
    """
    soup = BeautifulSoup(page, 'html.parser')

    # Find the specific section containing image links
    gallery_data = soup.find('div', {'class': 'cbir-section cbir-section_name_sites'})
    if gallery_data is None:
        return []

    # Find the container of image links
    image_links_container = gallery_data.find('div', {'class': 'Root'})
    if image_links_container is None:
        return []

    # ``.get`` instead of indexing: a container without the attribute should
    # behave like a missing container, not raise KeyError.
    raw_state = image_links_container.get('data-state')
    if not raw_state:
        logging.warning("Image links container has no data-state attribute.")
        return []

    try:
        data_state = json.loads(raw_state)
    except ValueError as err:  # json.JSONDecodeError is a ValueError
        logging.warning("Could not decode data-state JSON: %s", err)
        return []

    sites = data_state.get('sites') if isinstance(data_state, dict) else None
    if not isinstance(sites, list):
        return []

    # Extract the original image URL from each site entry, skipping entries
    # that do not have the expected shape.
    image_urls = []
    for site in sites:
        try:
            image_urls.append(site['originalImage']['url'])
        except (KeyError, TypeError):
            continue

    return image_urls


def yandex_reverse_image_search(file_path, timeout=10):
    """
    Runs a Yandex reverse image search for a local image file.

    The file is first uploaded via ``generate_images_search_links`` to obtain
    a results URL; that page is then downloaded and handed to
    ``get_image_links``.

    Args:
        file_path: Path of the image file to search for.
        timeout: Seconds to wait for the results page before giving up.

    Returns:
        A list of image URLs, or an empty list when the upload, the download
        or the parsing of the results page fails.
    """
    img_search_url = generate_images_search_links(file_path)
    if img_search_url is None:
        return []

    # Simulate a user agent to avoid being blocked
    headers = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36',
    'Content-Type': 'application/json',
    }

    try:
        response = requests.get(img_search_url, headers=headers, timeout=timeout)
        response.raise_for_status()  # Raise an exception for bad status codes
    except requests.exceptions.RequestException as e:
        print(f"Error fetching image: {e}")
        return []

    try:
        # Pass the raw body straight through: get_image_links parses it
        # itself, so parsing here and re-serialising with prettify() only
        # repeated the work.
        return get_image_links(response.content)
    except (KeyError, TypeError, ValueError) as e:
        # Unexpected page structure; keep the "empty list on failure" contract.
        print(f"Error parsing search results: {e}")
        return []


def generate_images_search_links(file_path, timeout=10):
    """
    Uploads a local image to Yandex and returns the search-by-image results URL.

    Args:
        file_path: Path of the image file to upload.
        timeout: Seconds to wait for the upload request before giving up.

    Returns:
        The results URL (search endpoint plus the query string returned by
        Yandex), or None when the file cannot be read, the request fails, or
        the response does not have the expected JSON structure.
    """
    search_url = 'https://yandex.ru/images/search'
    params = {'rpt': 'imageview', 'format': 'json', 'request': '{"blocks":[{"block":"b-page_type_search-by-image__link"}]}'}

    # Read the image inside a ``with`` block so the file handle is always
    # closed. TypeError/ValueError cover invalid path arguments such as None.
    try:
        with open(file_path, 'rb') as image_file:
            image_bytes = image_file.read()
    except (OSError, TypeError, ValueError) as err:
        logging.error("Could not read image file %r: %s", file_path, err)
        return None

    files = {'upfile': ('blob', image_bytes, 'image/jpeg/webp')}
    try:
        response = requests.post(search_url, params=params, files=files, timeout=timeout)
        response.raise_for_status()
        query_string = json.loads(response.content)['blocks'][0]['params']['url']
        return search_url + '?' + query_string
    except requests.exceptions.RequestException as err:
        logging.error("Image upload request failed: %s", err)
        return None
    except (ValueError, KeyError, IndexError, TypeError) as err:
        # Malformed JSON or a payload without blocks[0].params.url.
        logging.error("Unexpected upload response structure: %s", err)
        return None


if __name__ == "__main__":
    import sys

    # Sample image used when no path is given on the command line; pass a
    # path as the first argument to search for a different image.
    default_file_path = "T:\\Projects\\prj-nict-ai-content-detection\\data\\test_data\\towels.jpg.webp"
    file_path = sys.argv[1] if len(sys.argv) > 1 else default_file_path

    image_urls = yandex_reverse_image_search(file_path)
    for image_url in image_urls:
        print(f"Image URL: {image_url}")