# kebos-ai / url_process.py
from datetime import datetime
import re
import socket
import requests
import dns.resolver
import ssl
from urllib.parse import urlparse, parse_qs
import whois
from tld import get_tld
import pandas as pd
import time
from googlesearch import search
from catboost import CatBoostClassifier
from lime.lime_tabular import LimeTabularExplainer
import logging
import asyncio
import aiohttp
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Feature extraction functions
def extract_url_features(url):
features = {}
try:
# Basic URL features
features['qty_dot_url'] = url.count('.')
features['qty_slash_url'] = url.count('/')
features['qty_at_url'] = url.count('@')
features['qty_space_url'] = url.count(' ')
features['qty_plus_url'] = url.count('+')
features['qty_dollar_url'] = url.count('$')
features['length_url'] = len(url)
features['qty_equal_url'] = url.count('=')
features['qty_asterisk_url'] = url.count('*')
features['qty_percent_url'] = url.count('%')
features['qty_exclamation_url'] = url.count('!')
features['qty_questionmark_url'] = url.count('?')
features['qty_tilde_url'] = url.count('~')
features['qty_hyphen_url'] = url.count('-')
features['qty_hashtag_url'] = url.count('#')
features['qty_underline_url'] = url.count('_')
tld_pattern = r'\.(com|org|net|gov|edu|io|xyz|info|biz|co|uk|us|ca|au|in|cn|jp|ru|de|fr|it|nl|es|ch|se|no|dk|fi|pl|tr|br|za|mx|kr|sg|my|th|hk|vn|ar|cl|pe|nz|il|pk)'
features['qty_tld_url'] = len(re.findall(tld_pattern, url, re.IGNORECASE))
features['email_in_url'] = 1 if '@' in url else 0
features['url_google_index'] = is_url_indexed(url)
features['url_shortened'] = 1 if 'bit.ly' in url or 'tinyurl.com' in url else 0
features['qty_comma_url'] = url.count(',')
features['qty_and_url'] = url.count('&')
# Extract domain from URL
domain = url.split('/')[2] if '://' in url else url.split('/')[0]
# Domain features
features['qty_underline_domain'] = domain.count('_')
features['qty_equal_domain'] = domain.count('=')
features['qty_exclamation_domain'] = domain.count('!')
features['qty_comma_domain'] = domain.count(',')
features['qty_hashtag_domain'] = domain.count('#')
features['qty_vowels_domain'] = sum(1 for c in domain if c in 'aeiouAEIOU')
features['server_client_domain'] = 1 if 'server' in domain or 'client' in domain else 0
features['qty_dot_domain'] = domain.count('.')
features['domain_in_ip'] = 1 if is_ip_address(domain) else 0
features['domain_length'] = len(domain)
features['qty_hyphen_domain'] = domain.count('-')
        features['time_domain_expiration'] = None  # placeholder; filled in below alongside the activation time to avoid a second WHOIS lookup
features['qty_percent_domain'] = domain.count('%')
features['qty_at_domain'] = domain.count('@')
features['domain_spf'] = get_spf_record(domain)
features['domain_google_index'] = is_domain_indexed(domain)
        # Directory features (based on the first path segment only)
        directory = url.split('/')[3] if len(url.split('/')) > 3 else ""
features['qty_underline_directory'] = directory.count('_')
features['qty_equal_directory'] = directory.count('=')
features['qty_exclamation_directory'] = directory.count('!')
features['qty_comma_directory'] = directory.count(',')
features['qty_hashtag_directory'] = directory.count('#')
features['directory_length'] = len(directory)
features['qty_space_directory'] = directory.count(' ')
features['qty_tilde_directory'] = directory.count('~')
features['qty_dollar_directory'] = directory.count('$')
features['qty_plus_directory'] = directory.count('+')
features['qty_and_directory'] = directory.count('&')
features['qty_slash_directory'] = directory.count('/')
features['qty_dot_directory'] = directory.count('.')
features['qty_asterisk_directory'] = directory.count('*')
features['qty_at_directory'] = directory.count('@')
features['qty_questionmark_directory'] = directory.count('?')
features['qty_hyphen_directory'] = directory.count('-')
features['qty_percent_directory'] = directory.count('%')
# File features
file = url.split('/')[4] if len(url.split('/')) > 4 else ""
features['qty_underline_file'] = file.count('_')
        features['qty_and_file'] = file.count('&')
        features['qty_dollar_file'] = file.count('$')
        features['qty_questionmark_file'] = file.count('?')
features['qty_equal_file'] = file.count('=')
features['qty_slash_file'] = file.count('/')
features['qty_exclamation_file'] = file.count('!')
features['qty_comma_file'] = file.count(',')
features['qty_hashtag_file'] = file.count('#')
features['file_length'] = len(file)
features['qty_tilde_file'] = file.count('~')
features['qty_at_file'] = file.count('@')
features['qty_dot_file'] = file.count('.')
features['qty_space_file'] = file.count(' ')
features['qty_plus_file'] = file.count('+')
features['qty_asterisk_file'] = file.count('*')
features['qty_hyphen_file'] = file.count('-')
features['qty_percent_file'] = file.count('%')
# Parameters features
params = url.split('?')[1] if '?' in url else ""
features['qty_underline_params'] = params.count('_')
features['qty_equal_params'] = params.count('=')
features['qty_exclamation_params'] = params.count('!')
features['qty_comma_params'] = params.count(',')
features['qty_hashtag_params'] = params.count('#')
features['params_length'] = len(params)
features['qty_tilde_params'] = params.count('~')
features['qty_asterisk_params'] = params.count('*')
features['qty_space_params'] = params.count(' ')
features['qty_dollar_params'] = params.count('$')
features['qty_questionmark_params'] = params.count('?')
        features['tld_present_params'] = 1 if get_tld(url, fail_silently=True) else 0
features['qty_plus_params'] = params.count('+')
features['qty_at_params'] = params.count('@')
        features['qty_params'] = len(params.split('&')) if params else 0  # number of query parameters
features['qty_and_params'] = params.count('&')
features['qty_hyphen_params'] = params.count('-')
features['qty_dot_params'] = params.count('.')
features['qty_percent_params'] = params.count('%')
features['qty_slash_params'] = params.count('/')
# Other features
features['asn_ip'] = get_asn(get_ip_from_url(url))
features['qty_ip_resolved'] = get_resolved_ips(domain)
features['ttl_hostname'] = get_ttl(domain)
# Extract domain time features and ensure timestamps
features['time_domain_activation'], features['time_domain_expiration'] = get_domain_time_features(domain)
# Convert activation time to a timestamp if it's a datetime object
if isinstance(features['time_domain_activation'], datetime):
features['time_domain_activation'] = features['time_domain_activation'].timestamp()
# Convert expiration time to a timestamp if it's a datetime object
if isinstance(features['time_domain_expiration'], datetime):
features['time_domain_expiration'] = features['time_domain_expiration'].timestamp()
try:
features['qty_redirects'] = len(requests.get(url, timeout=5).history)
except requests.exceptions.RequestException as e:
            logger.warning(f"Error processing redirects for URL '{url}': {e}")
features['qty_redirects'] = -1
features['qty_mx_servers'] = get_mx_record_count(domain)
features['qty_nameservers'] = get_nameserver_count(domain)
features['tls_ssl_certificate'] = get_tls_ssl_certificate(domain)
features['time_response'] = get_response_time(url)
except Exception as e:
        logger.error(f"Error extracting features for {url}: {e}")
for key in features.keys():
features[key] = -1
return features
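# Example (a minimal sketch; the URL is a placeholder, and only the pure
# string-count features are deterministic — network-dependent ones such as
# asn_ip, time_response, or qty_redirects vary between runs):
#
#     feats = extract_url_features("https://example.com/path/file?x=1")
#     feats['qty_dot_url']    # -> 1
#     feats['length_url']     # -> 33
#     feats['qty_params']     # -> 1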
# Function to count specific characters in a URL
def count_char_in_url(url, char):
try:
return url.count(char)
except Exception:
return -1 # Return -1 if unable to count characters
# Function to extract domain features from a URL
def extract_domain_features(url):
try:
domain = urlparse(url).netloc
domain_parts = domain.split('.')
tld_length = len(domain_parts[-1]) # Top-level domain length
return domain, tld_length
except Exception:
return -1, -1 # Return -1 if extraction fails
def is_domain_indexed(domain):
query = f"site:{domain}"
try:
        first_result = next(iter(search(query)), None)  # stop after the first result instead of paging through all of them
        return 1 if first_result else 0
    except Exception as e:
        logger.warning(f"Error checking Google index for {domain}: {e}")
return -1
def get_response_time(url):
try:
start_time = time.time()
response = requests.get(url, timeout=10) # 10-second timeout
end_time = time.time()
return end_time - start_time # Response time in seconds
except requests.exceptions.RequestException as e:
        logger.warning(f"Error measuring response time for {url}: {e}")
        return -1  # Return -1 if the request fails
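# Note: requests also tracks timing itself — response.elapsed.total_seconds()
# measures from sending the request until the response headers arrive (it
# excludes body download time). A hedged drop-in for the wall-clock timing
# above, using the stdlib's monotonic high-resolution clock:
#
#     start = time.perf_counter()
#     requests.get(url, timeout=10)
#     elapsed = time.perf_counter() - start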
def get_mx_record_count(domain):
try:
# Use dns.resolver.resolve instead of the deprecated query
answers = dns.resolver.resolve(domain, 'MX')
return len(answers)
except dns.resolver.NoAnswer:
# No MX records found for the domain
        logger.info(f"No MX records found for {domain}.")
return 0
except (dns.resolver.NXDOMAIN, dns.resolver.Timeout, dns.resolver.NoNameservers) as e:
# Handle other DNS errors
        logger.warning(f"Error fetching MX records for {domain}: {e}")
return -1
def get_nameserver_count(domain):
try:
# Query the NS records for the domain
ns_records = dns.resolver.resolve(domain, 'NS')
return len(ns_records) # Return the count of NS records
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.resolver.Timeout):
# Handle cases where no NS records are found or DNS query fails
        logger.info(f"No NS records found for {domain}.")
return 0 # Return 0 if no NS records exist
# Function to extract directory and file related features
def extract_path_features(url):
try:
parsed_url = urlparse(url)
path = parsed_url.path
return path
except Exception:
return -1 # Return -1 if extraction fails
# Function to extract query parameter related features
def extract_query_features(url):
try:
parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query)
return query_params
except Exception:
return -1 # Return -1 if extraction fails
# Function to check if the domain is an IP address format
def is_ip_address(domain):
try:
socket.inet_aton(domain)
return 1 # It's an IP address
except socket.error:
return 0 # It's not an IP address
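# Note: socket.inet_aton also accepts shorthand dotted forms such as "127.1",
# so is_ip_address("127.1") returns 1. A stricter variant (a sketch, not wired
# into extract_url_features) could use the stdlib ipaddress module:
#
#     import ipaddress
#
#     def is_ip_address_strict(host):
#         try:
#             ipaddress.ip_address(host)  # accepts only full IPv4/IPv6 literals
#             return 1
#         except ValueError:
#             return 0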
# Function to get the time-related features
def get_domain_time_features(domain):
try:
domain_info = whois.whois(domain)
activation_time = domain_info.creation_date
expiration_time = domain_info.expiration_date
        # WHOIS may return a list of dates; keep the first entry in that case
        if isinstance(activation_time, list):
            activation_time = activation_time[0]
        if isinstance(expiration_time, list):
            expiration_time = expiration_time[0]
        # Ensure both values are valid datetime objects or None
        if not isinstance(activation_time, (datetime, type(None))):
            activation_time = None
        if not isinstance(expiration_time, (datetime, type(None))):
            expiration_time = None
return activation_time, expiration_time
except Exception as e:
        logger.warning(f"Error fetching domain times for {domain}: {e}")
return -1, -1
'''def get_domain_time_features(domain):
rdap_url = f"https://rdap.org/domain/{domain}" # RDAP public API endpoint
try:
response = requests.get(rdap_url, timeout=5)
if response.status_code == 200:
domain_data = response.json()
# Extract activation and expiration dates
activation_time = domain_data.get("events", [{}])
creation_date = None
expiration_date = None
for event in activation_time:
if event.get("eventAction") == "registration":
creation_date = event.get("eventDate")
elif event.get("eventAction") == "expiration":
expiration_date = event.get("eventDate")
# Convert string dates to datetime objects
creation_date = datetime.fromisoformat(creation_date) if creation_date else 0
expiration_date = datetime.fromisoformat(expiration_date) if expiration_date else 0
return creation_date, expiration_date
elif response.status_code == 404:
# Domain does not exist
return 0, 0
else:
# Failed to fetch data for other reasons
return -1, -1
except Exception as e:
print(f"Error fetching domain times for {domain}: {e}")
return -1, -1'''
# Function to get SPF record
def get_spf_record(domain):
try:
txt_records = dns.resolver.resolve(domain, 'TXT')
for record in txt_records:
if "v=spf1" in str(record):
return 1
return 0
except Exception:
return -1 # Return -1 if SPF check fails
# Function to get TLS/SSL certificate
def get_tls_ssl_certificate(domain):
try:
context = ssl.create_default_context()
        with socket.create_connection((domain, 443), timeout=5) as sock:
            with context.wrap_socket(sock, server_hostname=domain) as ssock:
                cert = ssock.getpeercert()  # non-empty dict once the handshake and verification succeed
        return 1  # TLS/SSL certificate exists
except Exception:
return -1 # Return -1 if SSL check fails
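# The dict returned by getpeercert() also carries validity data; a hedged
# extension (a sketch, not used by extract_url_features) could flag soon-to-
# expire certificates:
#
#     not_after = ssl.cert_time_to_seconds(cert['notAfter'])  # expiry as epoch seconds
#     expiring_soon = not_after - time.time() < 30 * 24 * 3600  # within 30 days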
# Function to get IP address from URL
def get_ip_from_url(url):
try:
domain = url.split('/')[2] if '://' in url else url.split('/')[0]
ip = socket.gethostbyname(domain)
return ip
except Exception:
return -1 # Return -1 if IP extraction fails
# Function to get ASN for an IP
def get_asn(ip):
if not ip or ip == -1:
return -1 # Return -1 if IP is invalid
try:
        response = requests.get(f"https://ipinfo.io/{ip}/json", timeout=5)
data = response.json()
org = data.get("org", "Unknown ASN")
match = re.search(r'AS(\d+)', org)
return int(match.group(1)) if match else -1
except Exception:
return -1
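# Note: ipinfo.io rate-limits anonymous lookups. With an API token (TOKEN is a
# hypothetical placeholder, not part of this repo) the same call would be:
#
#     requests.get(f"https://ipinfo.io/{ip}/json?token=TOKEN", timeout=5)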
# Function to get resolved IPs for a domain
def get_resolved_ips(domain):
try:
return len(socket.gethostbyname_ex(domain)[2])
except Exception:
return -1
# Function to get TTL value for a domain
def get_ttl(domain):
try:
answers = dns.resolver.resolve(domain, 'A')
return answers.rrset.ttl
except Exception:
return -1
def is_url_indexed(url):
query = f"site:{url}"
try:
        first_result = next(iter(search(query)), None)  # stop after the first result instead of paging through all of them
        return 1 if first_result else 0
    except Exception as e:
        logger.warning(f"Error checking if URL is indexed: {e}")
return -1
def process_urls(urls):
url_features = []
for url in urls:
if not (url.startswith("http://") or url.startswith("https://")):
url = "https://" + url
features = extract_url_features(url)
url_features.append(features)
return pd.DataFrame(url_features)
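# Example usage (a minimal sketch; the URLs are placeholders):
#
#     df = process_urls(["example.com", "https://google.com"])
#     df.shape                     # one row per URL, one column per feature
#     df.fillna(-1, inplace=True)  # mirrors the handling in the commented-out predict_urls below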
'''def predict_urls(urls, model_path):
features_df = process_urls(urls)
features_df.fillna(-1, inplace=True)
model = CatBoostClassifier()
model.load_model(model_path)
predictions = model.predict(features_df)
return predictions'''
'''def explain_prediction(features_df, model_path):
model = CatBoostClassifier()
model.load_model(model_path)
from lime.lime_tabular import LimeTabularExplainer
explainer = LimeTabularExplainer(
training_data=features_df.values,
feature_names=features_df.columns.tolist(),
class_names=["Legitimate", "Malicious"],
mode="classification"
)
explanation = explainer.explain_instance(
data_row=features_df.iloc[0].values,
predict_fn=model.predict_proba,
num_features=5
)
explanation.show_in_notebook(show_table=True)
return explanation'''
# Async enhancements for faster processing (optional)
async def fetch_url(session, url):
try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
return len(response.history)
except Exception:
return -1
async def process_urls_async(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
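# Example usage (assumes a synchronous entry point; inside an already-running
# event loop, e.g. Jupyter, use `await process_urls_async(urls)` instead of
# asyncio.run):
#
#     redirect_counts = asyncio.run(process_urls_async(["https://example.com"]))
#     print(redirect_counts)  # one redirect count per URL, -1 on error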