|
'''Copyright 2024 Ashok Kumar |
|
|
|
Licensed under the Apache License, Version 2.0 (the "License"); |
|
you may not use this file except in compliance with the License. |
|
You may obtain a copy of the License at |
|
|
|
http://www.apache.org/licenses/LICENSE-2.0 |
|
|
|
Unless required by applicable law or agreed to in writing, software |
|
distributed under the License is distributed on an "AS IS" BASIS, |
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
See the License for the specific language governing permissions and |
|
limitations under the License.''' |
|
|
|
import os |
|
import requests |
|
import json |
|
import pandas as pd |
|
import geopandas as gpd |
|
import tzlocal |
|
import pytz |
|
from PIL import Image |
|
import contextily as ctx |
|
from datetime import datetime |
|
from geopy.exc import GeocoderTimedOut |
|
from geopy.geocoders import Nominatim |
|
import folium |
|
from folium import plugins |
|
import streamlit as st |
|
import streamlit_folium as st_folium |
|
from data import flight_data |
|
from huggingface_hub import InferenceClient |
|
import branca.colormap as cm |
|
from sentence_transformers import SentenceTransformer |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
from difflib import get_close_matches |
|
import warnings |
|
warnings.filterwarnings('ignore') |
|
import time |
|
|
|
|
|
@st.cache_data(ttl=3600)
def load_airport_data():
    """Download the airport reference table and return it as a DataFrame.

    The remote ``airports.dat`` file is read without a header row, so the
    column labels are supplied here. The result is cached by Streamlit
    (``ttl=3600``).
    """
    airport_columns = [
        "Airport ID", "Name", "City", "Country", "IATA/FAA", "ICAO",
        "Latitude", "Longitude", "Altitude", "Timezone", "DST",
        "Tz database time zone", "Type", "Source",
    ]
    source_url = "https://raw.githubusercontent.com/ashok2216-A/ashok_airport-data/main/data/airports.dat"
    airports = pd.read_csv(source_url, names=airport_columns, header=None)
    return airports
|
|
|
|
|
@st.cache_data(ttl=3600)
def get_location(country):
    """Geocode *country* with Nominatim and return the geocoder's result.

    The value returned by ``geocode`` is passed through unchanged (callers
    in this file treat a falsy result as "not found"). Cached by Streamlit
    with ``ttl=3600``.
    """
    return Nominatim(user_agent="flight_tracker").geocode(country)
|
|
|
|
|
@st.cache_data(ttl=60)
def fetch_flight_data(lat_min, lat_max, lon_min, lon_max):
    """Fetch aircraft state vectors from OpenSky for a bounding box.

    Args:
        lat_min, lat_max: latitude bounds sent as ``lamin`` / ``lamax``.
        lon_min, lon_max: longitude bounds sent as ``lomin`` / ``lomax``.

    Returns:
        The decoded JSON payload with ``'states'`` guaranteed to be a list,
        or ``{'states': [], 'time': 0}`` when the request fails, the body
        cannot be parsed, or the payload has no ``'states'`` key. Failures
        are reported through ``st.error`` / ``st.warning`` instead of being
        raised.
    """
    def _no_data():
        # Fresh dict per call so callers can never share/mutate one instance.
        return {'states': [], 'time': 0}

    url = "https://opensky-network.org/api/states/all"
    params = {
        'lamin': lat_min,
        'lamax': lat_max,
        'lomin': lon_min,
        'lomax': lon_max,
    }

    try:
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        if not data or 'states' not in data:
            st.warning("No flight data available for the selected area.")
            return _no_data()

        # The key can be present but null; normalise it so that callers
        # always receive a list.
        if data['states'] is None:
            data['states'] = []
        return data

    except json.JSONDecodeError as e:
        # Must come before RequestException: recent versions of requests
        # raise a JSONDecodeError that derives from both classes, and the
        # first matching handler wins.
        st.error(f"Error parsing flight data: {str(e)}")
        return _no_data()
    except requests.exceptions.RequestException as e:
        st.error(f"Error fetching flight data: {str(e)}")
        return _no_data()
    except Exception as e:
        # Last-resort boundary: the UI should show a message, not crash.
        st.error(f"Unexpected error: {str(e)}")
        return _no_data()
|
|
|
|
|
# Hugging Face Inference API endpoint that query_llm() posts prompts to.
HF_API_URL = "https://api-inference.huggingface.co/models/google/flan-t5-large"

# API token taken from the environment; None when HF_TOKEN is not set.
HF_TOKEN = os.getenv("HF_TOKEN")

# Only attach an Authorization header when a token is actually configured;
# otherwise the request would carry the literal string "Bearer None".
headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
|
|
|
def query_llm(prompt):
    """Send *prompt* to the hosted language model and return its text reply.

    Posts to ``HF_API_URL`` with the module-level ``headers`` and reads
    ``[0]['generated_text']`` from the JSON response.

    Returns:
        The generated text, or ``None`` when the request fails, times out,
        or the response does not have the expected shape. Problems are
        shown via ``st.warning`` / ``st.error`` rather than raised.
    """
    payload = {
        "inputs": prompt,
        "parameters": {
            "max_length": 250,
            "temperature": 0.1,
            "top_p": 0.95,
            "do_sample": False,
        },
    }

    try:
        # Without a timeout a stalled endpoint would block the script run
        # indefinitely; a timeout surfaces as an error handled below.
        response = requests.post(HF_API_URL, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()[0]['generated_text']
    except requests.exceptions.HTTPError as e:
        status_code = e.response.status_code if e.response is not None else None
        if status_code == 403:
            st.warning("Language model access is currently restricted. Using direct flight data display instead.")
        else:
            st.error(f"Error querying language model: {str(e)}")
        return None
    except Exception as e:
        # Covers network errors, timeouts and unexpected response shapes.
        st.error(f"Error querying language model: {str(e)}")
        return None
|
|
|
def create_flight_embeddings(geo_df):
    """Describe every flight in one sentence and embed those sentences.

    Each row of *geo_df* is turned into a short text built from its
    callsign, origin country, barometric altitude, velocity and true track;
    the texts are then encoded with the 'all-MiniLM-L6-v2' model.

    Returns:
        ``(embeddings, texts)`` on success, ``(None, None)`` if anything
        fails (a Streamlit warning is shown in that case).
    """
    try:
        encoder = SentenceTransformer('all-MiniLM-L6-v2')

        descriptions = [
            (
                f"Flight {flight['callsign']} from {flight['origin_country']} "
                f"at altitude {flight['baro_altitude']}m, speed {flight['velocity']}m/s, "
                f"heading {flight['true_track']}°"
            )
            for _, flight in geo_df.iterrows()
        ]

        vectors = encoder.encode(descriptions)
        return vectors, descriptions
    except Exception as err:
        st.warning(f"Could not create embeddings: {str(err)}")
        return None, None
|
|
|
def find_similar_flights(identifier, geo_df, embeddings, flight_texts, threshold=0.7):
    """Return the rows of *geo_df* whose embedding resembles *identifier*.

    *identifier* is encoded with the 'all-MiniLM-L6-v2' model and compared
    to *embeddings* by cosine similarity; rows scoring strictly above
    *threshold* are selected by position. *flight_texts* is accepted but
    not used by this function.

    Returns:
        The selected rows, or ``None`` when nothing exceeds the threshold
        or an error occurs (a Streamlit warning is shown on error).
    """
    try:
        encoder = SentenceTransformer('all-MiniLM-L6-v2')
        query_vector = encoder.encode([identifier])
        scores = cosine_similarity(query_vector, embeddings)[0]

        hits = []
        for position, score in enumerate(scores):
            if score > threshold:
                hits.append(position)

        if not hits:
            return None
        return geo_df.iloc[hits]
    except Exception as err:
        st.warning(f"Error in semantic search: {str(err)}")
        return None
|
|
|
# Display suffix appended to the value of selected columns.
_FLIGHT_VALUE_SUFFIXES = {
    'baro_altitude': ' meters',
    'geo_altitude': ' meters',
    'velocity': ' m/s',
    'true_track': ' degrees',
    'vertical_rate': ' m/s',
    'latitude': '° N',
    'longitude': '° E',
}


def _extract_flight_identifier(question):
    """Pull the flight identifier out of an already lower-cased question.

    The identifier is the text after the last standalone "for", "of" or
    "about" (tried in that order). Keywords are matched as whole words so
    that, for example, "information" is not mistaken for "for". Trailing
    punctuation and a leading "flight " are dropped.

    Returns the identifier text, or None when there is no keyword or
    nothing follows it.
    """
    import re  # function-scope import: the module itself does not import re

    for keyword in ('for', 'of', 'about'):
        pieces = re.split(rf'\b{keyword}\b', question)
        if len(pieces) > 1:
            candidate = pieces[-1].strip().rstrip('?.!,').strip()
            if candidate.startswith('flight '):
                candidate = candidate[len('flight '):].strip()
            return candidate or None
    return None


def _has_value(value):
    """Return True unless *value* is a scalar NA (list-likes count as present)."""
    try:
        return bool(pd.notna(value))
    except (TypeError, ValueError):
        # pd.notna() on a list-like returns an array with no single truth value.
        return True


def query_flight_data(geo_df, question):
    """Answer a free-text question about one flight in *geo_df*.

    The flight is located by, in order: exact callsign match, exact ICAO24
    match, match on alphanumeric characters only, closest callsign
    (``get_close_matches`` with cutoff 0.8), and finally semantic search
    over flight descriptions. The first matching row's non-missing fields
    are sent to the language model together with the question; if no model
    answer is available the fields are listed directly.

    Args:
        geo_df: table of flights with at least ``callsign`` and ``icao24``
            columns; ``None`` or an empty table is treated as "no flights".
        question: the user's question; the identifier is expected after
            "for", "of" or "about".

    Returns:
        A string: the answer, a field listing, or an explanatory message
        when no identifier was given or no flight could be found.
    """
    question = question.lower().strip()

    identifier = _extract_flight_identifier(question)
    if not identifier:
        return "Please specify a flight identifier (callsign or ICAO code) in your question."
    identifier = identifier.upper()

    if geo_df is None or geo_df.empty:
        return f"Could not find flight {identifier}. No flights currently available in the selected area."

    # Normalise once: upper-case, no NaN, and no padding around the codes.
    callsigns = geo_df['callsign'].fillna('').astype(str).str.upper().str.strip()
    icao_codes = geo_df['icao24'].fillna('').astype(str).str.upper().str.strip()

    # 1) exact callsign, then exact ICAO24
    matches = geo_df[callsigns == identifier]
    if matches.empty:
        matches = geo_df[icao_codes == identifier]

    # 2) compare alphanumeric characters only
    if matches.empty:
        clean_identifier = ''.join(filter(str.isalnum, identifier))
        # Skip when nothing is left, otherwise blank callsigns would "match".
        if clean_identifier:
            clean_callsigns = callsigns.apply(lambda cs: ''.join(filter(str.isalnum, cs)))
            matches = geo_df[clean_callsigns == clean_identifier]

    # 3) closest callsign by string similarity
    if matches.empty:
        candidates = [cs for cs in callsigns.unique() if cs]
        close_matches = get_close_matches(identifier, candidates, n=1, cutoff=0.8)
        if close_matches:
            matches = geo_df[callsigns == close_matches[0]]

    # 4) semantic search over generated flight descriptions
    if matches.empty:
        try:
            embeddings, flight_texts = create_flight_embeddings(geo_df)
            if embeddings is not None:
                similar_flights = find_similar_flights(identifier, geo_df, embeddings, flight_texts)
                if similar_flights is not None and not similar_flights.empty:
                    matches = similar_flights
                    st.info(f"Found similar flight(s) to {identifier}")
        except Exception as e:
            st.warning(f"Semantic search failed: {str(e)}")

    if matches.empty:
        available_flights = geo_df['callsign'].dropna().astype(str).unique()
        if len(available_flights) > 0:
            return f"Could not find flight {identifier}. Available flights: {', '.join(available_flights[:10])}..."
        return f"Could not find flight {identifier}. No flights currently available in the selected area."

    # Collect the non-missing fields of the first matching row.
    flight_info = {}
    for col in matches.columns:
        value = matches[col].iloc[0]
        if _has_value(value):
            flight_info[col] = f"{value}{_FLIGHT_VALUE_SUFFIXES.get(col, '')}"

    if not flight_info:
        return f"No information available for flight {identifier}."

    prompt = f"""Answer this question about flight {identifier}: {question}

Available flight data:
{json.dumps(flight_info, indent=2)}

Provide a clear and concise answer focusing on the specific information requested."""

    # The model is optional: any failure falls back to the plain listing.
    try:
        llm_response = query_llm(prompt)
    except Exception:
        llm_response = None
    if llm_response:
        return llm_response

    listing = "".join(
        f"- {key.replace('_', ' ').title()}: {value}\n"
        for key, value in flight_info.items()
    )
    return f"Flight Information for {identifier}:\n" + listing
|
|
|
@st.cache_data(ttl=60)
def get_traffic_gdf(lat_min, lat_max, lon_min, lon_max, local_time_zone, _loc, flight_info):
    """Fetch flights for a bounding box and return them as a GeoDataFrame.

    Besides building the frame, this writes the page title and a summary
    (location, local time, bounds, flight count, plotted field) to the
    Streamlit page.

    Args:
        lat_min, lat_max, lon_min, lon_max: bounding box passed to
            ``fetch_flight_data``.
        local_time_zone: time-zone name given to ``pytz.timezone``.
        _loc: location shown in the summary (the leading underscore
            follows Streamlit's convention for arguments left out of the
            cache key).
        flight_info: name of the field being plotted, shown in the summary.

    Returns:
        A GeoDataFrame (EPSG:4326) with one point per state vector and a
        ``time`` column holding the formatted local time, or ``None`` when
        there is no data or processing fails.
    """
    payload = fetch_flight_data(lat_min, lat_max, lon_min, lon_max)

    has_states = bool(payload) and bool(payload.get('states'))
    if not has_states:
        st.warning("No flight data available for the selected area.")
        return None

    try:
        state_fields = [
            "icao24", "callsign", "origin_country", "time_position", "last_contact",
            "longitude", "latitude", "baro_altitude", "on_ground", "velocity",
            "true_track", "vertical_rate", "sensors", "geo_altitude", "squawk",
            "spi", "position_source",
        ]

        # Convert the payload's epoch timestamp to the requested time zone.
        epoch_seconds = int(payload["time"])
        zone = pytz.timezone(local_time_zone)
        local_time = datetime.fromtimestamp(epoch_seconds, zone).strftime('%Y-%m-%d %H:%M:%S')

        states = pd.DataFrame(payload["states"], columns=state_fields)
        states['time'] = local_time

        points = gpd.points_from_xy(states.longitude, states.latitude)
        traffic = gpd.GeoDataFrame(states, geometry=points, crs="EPSG:4326")

        # Page header and summary of what is being shown.
        st.title("Live Flight Tracker")
        st.subheader('Flight Details', divider='rainbow')
        summary_lines = (
            'Location: {0}'.format(_loc),
            'Current Local Time: {0}-{1}:'.format(local_time, local_time_zone),
            "Minimum_latitude is {0} and Maximum_latitude is {1}".format(lat_min, lat_max),
            "Minimum_longitude is {0} and Maximum_longitude is {1}".format(lon_min, lon_max),
            'Number of Visible Flights: {}'.format(len(payload['states'])),
            'Plotting the flight: {}'.format(flight_info),
        )
        for line in summary_lines:
            st.write(line)
        st.subheader('Map Visualization', divider='rainbow')
        st.write('****Click ":orange[Update Map]" Button to Refresh the Map****')
        return traffic

    except Exception as err:
        st.error(f"Error processing flight data: {str(err)}")
        return None
|
|
|
# Field names of one flight state row, in the order shown in the tooltip.
_FLIGHT_STATE_COLUMNS = [
    "icao24", "callsign", "origin_country", "time_position", "last_contact",
    "longitude", "latitude", "baro_altitude", "on_ground", "velocity",
    "true_track", "vertical_rate", "sensors", "geo_altitude", "squawk",
    "spi", "position_source",
]

# (label, column) pairs listed in the click popup.
_POPUP_FIELDS = [
    ("ICAO24", "icao24"),
    ("Origin Country", "origin_country"),
    ("Time Position", "time_position"),
    ("Last Contact", "last_contact"),
    ("Baro Altitude", "baro_altitude"),
    ("Geo Altitude", "geo_altitude"),
    ("Velocity", "velocity"),
    ("True Track", "true_track"),
    ("Vertical Rate", "vertical_rate"),
    ("Squawk", "squawk"),
    ("On Ground", "on_ground"),
    ("SPI", "spi"),
    ("Position Source", "position_source"),
]

# Unit suffix shown after a present value of the given column.
_FLIGHT_FIELD_UNITS = {
    'baro_altitude': ' m',
    'geo_altitude': ' m',
    'velocity': ' m/s',
    'true_track': '°',
    'vertical_rate': ' m/s',
}

# Colour ramps selectable through the ``color`` argument; any other value
# uses the yellow-orange-red ramp.
_COLOR_PALETTES = {
    "rainbow": ['red', 'yellow', 'green', 'blue', 'purple'],
    "ice": ['white', 'lightblue', 'blue'],
}
_DEFAULT_PALETTE = ['yellow', 'orange', 'red']

_PLANE_ICON_TEMPLATE = """
<div style="transform: rotate({rotation_angle}deg);">
    <svg width="24" height="24" viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
        <path d="M21 16v-2l-8-5V3.5c0-.83-.67-1.5-1.5-1.5S10 2.67 10 3.5V9l-8 5v2l8-2.5V19l-2 1.5V22l3.5-1 3.5 1v-1.5L13 19v-5.5l8 2.5z" fill="{color_hex}"/>
    </svg>
</div>
"""

_FLIGHT_CARD_TEMPLATE = """
<div style="font-size: {body_px}px; font-family: Arial, sans-serif; max-width: 300px;">
    <div style="font-weight: bold; font-size: {title_px}px; margin-bottom: {title_margin_px}px; color: #2c3e50;">
        Flight: {callsign}
    </div>
    <div style="display: grid; grid-template-columns: 1fr 1fr; gap: 5px;">
        {rows}
    </div>
</div>
"""


def _is_missing(value):
    """Return True when *value* is a scalar NA; list-likes count as present."""
    try:
        return bool(pd.isna(value))
    except (TypeError, ValueError):
        # pd.isna() on a list-like returns an array with no single truth value.
        return False


def _html_cell(value, unit="", missing='N/A'):
    """Return *value* as HTML-escaped text, with *unit* only when present."""
    import html  # function-scope import: the module itself does not import html

    if _is_missing(value):
        return missing
    return html.escape(f"{value}{unit}")


def _flight_card_html(row, fields, body_px, title_px, title_margin_px):
    """Render the tooltip/popup card listing *fields* of one flight row."""
    rows = '\n'.join(
        f'<div style="font-weight: bold;">{label}:</div>'
        f'<div>{_html_cell(row[column], _FLIGHT_FIELD_UNITS.get(column, ""))}</div>'
        for label, column in fields
    )
    return _FLIGHT_CARD_TEMPLATE.format(
        body_px=body_px,
        title_px=title_px,
        title_margin_px=title_margin_px,
        callsign=_html_cell(row['callsign'], missing='Unknown'),
        rows=rows,
    )


def flight_tracking(flight_view_level, country, local_time_zone, flight_info, airport, color):
    """Render the live flight map for the area around *country*.

    Args:
        flight_view_level: multiplier for the size of the bounding box
            around the geocoded location.
        country: place name passed to ``get_location``.
        local_time_zone: time-zone name forwarded to ``get_traffic_gdf``.
        flight_info: column of the flight table used to colour the icons.
        airport: ``1`` to also draw airport markers.
        color: palette name ("rainbow", "ice"; anything else uses
            yellow-orange-red).

    Returns:
        None. The map is drawn with ``st_folium.folium_static``; when the
        location cannot be found or no flight data is available the
        function returns early after the message shown by it or by
        ``get_traffic_gdf``.
    """
    loc = get_location(country)
    if not loc:
        st.error("Could not find location. Please try a different country name.")
        return

    center_lat, center_lon = loc[1][0], loc[1][1]

    # Asymmetric margins around the centre, clamped so the box never
    # leaves the valid latitude/longitude ranges at high view levels.
    lat_min = max(center_lat - 12 * flight_view_level, -90.0)
    lat_max = min(center_lat + 10 * flight_view_level, 90.0)
    lon_min = max(center_lon - 18 * flight_view_level, -180.0)
    lon_max = min(center_lon + 10 * flight_view_level, 180.0)

    geo_df = get_traffic_gdf(lat_min, lat_max, lon_min, lon_max, local_time_zone, loc, flight_info)
    if geo_df is None:
        return

    m = folium.Map(
        location=[center_lat, center_lon],
        zoom_start=6,
        tiles='CartoDB dark_matter',
        show_grid=False
    )

    # Build the colour scale from float values: a boolean column such as
    # 'on_ground' or a column holding only missing values would otherwise
    # give the colormap bounds it cannot do arithmetic on.
    metric = geo_df[flight_info].astype(float)
    vmin, vmax = metric.min(), metric.max()
    if pd.isna(vmin) or pd.isna(vmax):
        vmin, vmax = 0.0, 1.0
    colormap = cm.LinearColormap(
        colors=_COLOR_PALETTES.get(color, _DEFAULT_PALETTE),
        vmin=float(vmin),
        vmax=float(vmax)
    )

    tooltip_fields = [(column, column) for column in _FLIGHT_STATE_COLUMNS]

    for _, row in geo_df.iterrows():
        # A marker needs a position; skip flights without one.
        if _is_missing(row['latitude']) or _is_missing(row['longitude']):
            continue

        raw_value = row[flight_info]
        value = float(vmin) if _is_missing(raw_value) else float(raw_value)
        color_hex = colormap(value)

        rotation_angle = 0 if _is_missing(row['true_track']) else row['true_track']
        icon_html = _PLANE_ICON_TEMPLATE.format(rotation_angle=rotation_angle, color_hex=color_hex)

        tooltip_html = _flight_card_html(row, tooltip_fields, body_px=12, title_px=14, title_margin_px=5)
        popup_html = _flight_card_html(row, _POPUP_FIELDS, body_px=14, title_px=16, title_margin_px=10)

        folium.Marker(
            location=[row['latitude'], row['longitude']],
            icon=folium.DivIcon(html=icon_html, icon_size=(24, 24), icon_anchor=(12, 12)),
            popup=folium.Popup(popup_html, max_width=300),
            tooltip=tooltip_html
        ).add_to(m)

    if airport == 1:
        # Airports are only loaded when they are going to be drawn.
        airport_df = load_airport_data()
        airport_locations = airport_df[["Name", "City", "Country", "IATA/FAA", "Latitude", "Longitude"]]
        airports_in_view = airport_locations[
            (airport_locations['Country'] == str(loc)) &
            (airport_locations['Latitude'] >= lat_min) &
            (airport_locations['Latitude'] <= lat_max) &
            (airport_locations['Longitude'] >= lon_min) &
            (airport_locations['Longitude'] <= lon_max)
        ]
        for _, airport_row in airports_in_view.iterrows():
            folium.Marker(
                location=[airport_row['Latitude'], airport_row['Longitude']],
                icon=folium.Icon(icon='plane', prefix='fa', color='blue'),
                popup=f"<b>{airport_row['Name']}</b><br>IATA: {airport_row['IATA/FAA']}<br>City: {airport_row['City']}",
                tooltip=f"Airport: {airport_row['Name']}"
            ).add_to(m)

    colormap.add_to(m)
    folium.LayerControl().add_to(m)

    st_folium.folium_static(m, width=1200, height=600)
    return None
|
|
|
# ---- Page setup and sidebar controls ---------------------------------------
st.set_page_config(
    layout="wide"
)

image = Image.open('logo.png')
st.sidebar.image(image, width=150)
st.sidebar.subheader("Configure Map", divider='rainbow')

with st.sidebar:
    # Clicking the button makes Streamlit rerun the script, which redraws the map.
    st.button('Update Map', key=1)

    on = st.toggle('View Airports')
    air_port = 1 if on else 0
    if on:
        st.write(':rainbow[Nice Work Buddy!]')
        st.write('Now Airports are Visible')

    view = st.slider('Increase Flight Visibility', 1, 6, 2)
    st.write("You Selected:", view)

    cou = st.text_input('Type Country Name', 'north america')
    st.write('The current Country name is', cou)

    # Named time_zone (not `time`) so the imported `time` module is not rebound.
    time_zone = st.text_input('Type Time Zone Name (Ex: America/Toronto, Europe/Berlin)', 'Asia/Kolkata')
    st.write('The current Time Zone is', time_zone)

    info = st.selectbox(
        'Select Flight Information',
        ('baro_altitude',
         'on_ground', 'velocity',
         'geo_altitude'))
    st.write('Plotting the data of Flight:', info)

    clr = st.radio('Pick A Color for Scatter Plot', ["rainbow", "ice", "hot"])
    color_captions = {
        "rainbow": "****:rainbow[Rainbow]****",
        "ice": "****:blue[Ice]****",
        "hot": "****:red[Hot]****",
    }
    if clr in color_captions:
        st.write('The current color is', color_captions[clr])

# ---- Map --------------------------------------------------------------------
flight_tracking(flight_view_level=view, country=cou, flight_info=info,
                local_time_zone=time_zone, airport=air_port, color=clr)

# ---- Question answering -------------------------------------------------------
st.subheader('Ask your Questions!', divider='rainbow')
# NOTE(review): the caption names TAPAS, but HF_API_URL in this file points at
# google/flan-t5-large — confirm which model should be advertised.
st.write("Google's TAPAS base LLM model 🤖")
geo_df = flight_data(flight_view_level=view, country=cou, flight_info=info,
                     local_time_zone=time_zone, airport=1)
question = st.text_input('Type your questions here', "What is the squawk code for SWR9XD?")
result = query_flight_data(geo_df, question)
st.markdown(result)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|