File size: 2,340 Bytes
2bd3674
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from logging import Logger
import time


class StreamingConnectionInfo:
    """Record of the connection state kept for a single address.

    Attributes:
        address: Identifier of the remote endpoint, stored as given.
        active_connections: Number of connections counted for the address.
        latest_message_received_timestamp: Timestamp value supplied by the
            caller for the most recent message.
    """

    def __init__(self, address, active_connections, latest_message_received_timestamp):
        self.address = address
        self.active_connections = active_connections
        self.latest_message_received_timestamp = latest_message_received_timestamp

    def __str__(self):
        """Return the three fields rendered as the text of a dict."""
        # Keyword order fixes the key order of the rendered dict.
        snapshot = dict(
            address=self.address,
            active_connections=self.active_connections,
            latest_message_received_timestamp=self.latest_message_received_timestamp,
        )
        return str(snapshot)

    def __repr__(self):
        """Return the same text as ``str(self)``."""
        return f"{self!s}"


class ConnectionTracker:
    """Reference-counts streaming connections per address.

    ``connections`` maps an address to its ``StreamingConnectionInfo`` record.
    A record is created on the first connection from an address and deleted
    once its connection count drops to zero or below.
    """

    def __init__(self, logger: Logger):
        self.logger = logger
        self.connections = {}

    def __str__(self):
        """Return the text form of the address-to-record mapping."""
        return f"{self.connections!s}"

    def add_connection(self, address):
        """Count one more connection for ``address`` and refresh its timestamp.

        An address seen for the first time gets a new record with a count of 1.
        """
        if address in self.connections:
            entry = self.connections[address]
            entry.active_connections += 1
            entry.latest_message_received_timestamp = time.time()
            return

        self.connections[address] = StreamingConnectionInfo(address, 1, time.time())

    def log_recent_message(self, address):
        """Stamp ``address`` with the current time; warn if it is not tracked."""
        if address not in self.connections:
            self.logger.warning(
                f"Address {address} not found in connection tracker when attempting to log recent message"
            )
            return

        self.connections[address].latest_message_received_timestamp = time.time()

    def remove_connection(self, address):
        """Count one connection fewer for ``address``.

        Logs a warning when the address is not tracked or when the count falls
        below zero. The record is dropped as soon as the count is zero or less.
        """
        if address not in self.connections:
            self.logger.warning(
                f"Address {address} not found in connection tracker when attempting to remove it"
            )
            return

        entry = self.connections[address]
        entry.active_connections -= 1

        # A negative count is reported first; the record is deleted either way.
        if entry.active_connections < 0:
            self.logger.warning(
                f"Address {address} has negative active connections ({self.connections[address].active_connections})"
            )
        if entry.active_connections <= 0:
            del self.connections[address]

    def get_active_connection_count(self):
        """Return the sum of connection counts over all tracked addresses."""
        return sum(entry.active_connections for entry in self.connections.values())