File size: 3,634 Bytes
870ab6b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import random  
import requests  
import time  
import threading  
import socket


def fetch_proxies():
    """Fetch a list of proxy servers from proxyscrape.com.  
  
    Returns:  
        list: A list of proxy servers in the format "IP:Port".  
    """  
    url = "https://www.proxy-list.download/api/v1/get?type=https"
    response = requests.get(url)
    if response.status_code == 200:
        return response.text.split("\r\n")[:-1]
    print(f"Error fetching proxies: {response.status_code}")
    return []  


def test_proxy(proxy, prompt, timeout):
    """Test the given proxy server with a specified prompt and timeout.

    Args:
        proxy (str): The proxy server in the format "IP:Port".
        prompt (str): The test prompt to be used for testing.
        timeout (int): The maximum time in seconds allowed for the test.
    """
    try:
        # Split IP and Port
        ip, port = proxy.split(':')
        
        # Create a socket object
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        
        # Start the timer
        start_time = time.time()
        
        # Connect to the proxy server
        sock.connect((ip, int(port)))
        
        # Stop the timer and calculate the elapsed time
        end_time = time.time()
        elapsed_time = end_time - start_time
        
        # Print the elapsed time
        #print(f"Elapsed time: {elapsed_time} seconds")
        
        # Close the socket
        sock.close()
        
        # Check if the elapsed time is below the timeout
        if elapsed_time < timeout:
            print(f"proxy: {proxy} ✅ | Elapsed time: {elapsed_time} seconds")
            add_working_proxy(proxy)
    except Exception as e:
        pass


def add_working_proxy(proxy):  
    """Add a working proxy server to the global working_proxies list.  
  
    Args:  
        proxy (str): The proxy server in the format "IP:Port". 
    """  
    global working_proxies  
    working_proxies.append(proxy)  


def remove_proxy(proxy):  
    """Remove a proxy server from the global working_proxies list.  
  
    Args:  
        proxy (str): The proxy server in the format "IP:Port".  
    """  
    global working_proxies  
    if proxy in working_proxies:  
        working_proxies.remove(proxy)  


def get_working_proxies(prompt, timeout=5):  
    """Fetch and test proxy servers, adding working proxies to the global working_proxies list.  
  
    Args:  
        prompt (str): The test prompt to be used for testing.  
        timeout (int, optional): The maximum time in seconds allowed for testing. Defaults to 5.  
    """  
    proxy_list = fetch_proxies()  
    threads = []  

    for proxy in proxy_list:  
        thread = threading.Thread(target=test_proxy, args=(  
            proxy, prompt, timeout))  
        threads.append(thread)  
        thread.start()  

    for t in threads:  
        t.join(timeout)  


def update_working_proxies():  
    """Continuously update the global working_proxies list with working proxy servers."""  
    global working_proxies  
    test_prompt = "What is the capital of France?"  

    while True:  
        working_proxies = []  # Clear the list before updating  
        get_working_proxies(test_prompt)  
        print('proxies updated')  
        time.sleep(1800)  # Update proxies list every 30 minutes  


def get_random_proxy():  
    """Get a random working proxy server from the global working_proxies list.  
  
    Returns:  
        str: A random working proxy server in the format "IP:Port".  
    """  
    global working_proxies  
    return random.choice(working_proxies)