File size: 5,825 Bytes
22e1b62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import logging
import time
from typing import Dict, Optional, Tuple
from urllib.parse import quote, urlparse

import requests
from bs4 import BeautifulSoup

# Module-wide logging: append to a log file instead of stderr.
# NOTE(review): the filename says "error" but the level is INFO, so
# informational and warning messages land in error.log too — confirm
# this is intended.
logging.basicConfig(
    filename='error.log',
    level=logging.INFO,
    format='%(asctime)s | [%(levelname)s]: %(message)s',
    datefmt='%m-%d-%Y / %I:%M:%S %p'
)

class SearchResults:
    """Holds reverse-image-search hits and renders them for display.

    Each entry in ``results`` is expected to be a dict with ``'title'``
    and ``'link'`` keys; missing keys render as placeholder text.
    """

    def __init__(self, results):
        # List of {"title": ..., "link": ...} dicts, in result order.
        self.results = results

    def __repr__(self):
        return f"{type(self).__name__}({self.results!r})"

    def __str__(self):
        # Build the parts in a list and join once — avoids the quadratic
        # behavior of repeated string concatenation in a loop.
        parts = []
        for result in self.results:
            parts.append("---\n")
            parts.append(f"Title: {result.get('title', 'Title not found')}\n")
            parts.append(f"Link: {result.get('link', 'Link not found')}\n")
            parts.append("---\n")
        return "".join(parts)

class GoogleReverseImageSearch:
    def __init__(self):
        self.base_url = "https://www.google.com/searchbyimage"
        self.headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"}
        self.retry_count = 3
        self.retry_delay = 1

    def response(self, query: str, image_url: str, max_results: int = 10, delay: int = 1) -> SearchResults:
        self._validate_input(query, image_url)
        
        encoded_query = quote(query)
        encoded_image_url = quote(image_url)

        url = f"{self.base_url}?q={encoded_query}&image_url={encoded_image_url}&sbisrc=cr_1_5_2"

        all_results = []
        start_index = 0

        while len(all_results) < max_results:
            if start_index != 0:
                time.sleep(delay)
                
            paginated_url = f"{url}&start={start_index}"

            response = self._make_request(paginated_url)
            if response is None:
                break

            search_results, valid_content = self._parse_search_results(response.text)
            if not valid_content:
                logging.warning("Unexpected HTML structure encountered.")
                break

            for result in search_results:
                if len(all_results) >= max_results:
                    break
                data = self._extract_result_data(result)
                if data and data not in all_results:
                    all_results.append(data)

            start_index += (len(all_results)-start_index)

        if len(all_results) == 0:
            logging.warning(f"No results were found for the given query: [{query}], and/or image URL: [{image_url}].")
            return "No results found. Please try again with a different query and/or image URL."
        else:
            return SearchResults(all_results[:max_results])
    
    def _validate_input(self, query: str, image_url: str):
        if not query:
            raise ValueError("Query not found. Please enter a query and try again.")
        if not image_url:
            raise ValueError("Image URL not found. Please enter an image URL and try again.")
        if not self._validate_image_url(image_url):
            raise ValueError("Invalid image URL. Please enter a valid image URL and try again.")
    
    def _validate_image_url(self, url: str) -> bool:
        parsed_url = urlparse(url)
        path = parsed_url.path.lower()
        valid_extensions = (".jpg", ".jpeg", ".png", ".webp")
        return any(path.endswith(ext) for ext in valid_extensions)
    
    def _make_request(self, url: str):
        attempts = 0
        while attempts < self.retry_count:
            try:
                response = requests.get(url, headers=self.headers)
                if response.headers.get('Content-Type', '').startswith('text/html'):
                    response.raise_for_status()
                    return response
                else:
                    logging.warning("Non-HTML content received.")
                    return None
            except requests.exceptions.HTTPError as http_err:
                logging.error(f"HTTP error occurred: {http_err}")
                attempts += 1
                time.sleep(self.retry_delay)
            except Exception as err:
                logging.error(f"An error occurred: {err}")
                return None
        return None

    def _parse_search_results(self, html_content: str) -> (Optional[list], bool):
        try:
            soup = BeautifulSoup(html_content, "html.parser")
            return soup.find_all('div', class_='g'), True
        except Exception as e:
            logging.error(f"Error parsing HTML content: {e}")
            return None, False

    def _extract_result_data(self, result) -> Dict:
        link = result.find('a', href=True)['href'] if result.find('a', href=True) else None
        title = result.find('h3').get_text(strip=True) if result.find('h3') else None
        return {"link": link, "title": title} if link and title else {}
    

if __name__ == "__main__":
    # Example usage of the Google scraper (search by image URL):
    #   engine = GoogleReverseImageSearch()
    #   print(engine.response(query="Example Query",
    #                         image_url="https://ichef.bbci.co.uk/images/ic/1024xn/p0khzhhl.jpg.webp",
    #                         max_results=5))

    import json

    # Local image to upload to Yandex reverse image search.
    # NOTE(review): hard-coded machine-specific path (the original also had a
    # dead relative-path assignment that was immediately overwritten) —
    # parameterize before shipping.
    image_path = "C:\\TTProjects\\prj-nict-ai-content-detection\\data\\test_data\\towel.jpg"

    search_url = 'https://yandex.ru/images/search'
    params = {
        'rpt': 'imageview',
        'format': 'json',
        'request': '{"blocks":[{"block":"b-page_type_search-by-image__link"}]}',
    }

    # BUGFIX: the original passed open(...) directly and never closed the
    # handle; the context manager guarantees cleanup. Timeouts added so the
    # script cannot hang on a stalled connection.
    with open(image_path, 'rb') as image_file:
        files = {'upfile': ('blob', image_file, 'image/jpeg')}
        response = requests.post(search_url, params=params, files=files, timeout=30)

    # Yandex returns a JSON blob whose first block carries the query string
    # for the human-readable results page.
    query_string = json.loads(response.content)['blocks'][0]['params']['url']
    img_search_url = search_url + '?' + query_string
    print(img_search_url)

    response = requests.get(img_search_url, timeout=30)
    print(response.text)