Spaces:
Running
Running
import pandas as pd | |
import numpy as np | |
# Load the raw per-record receiver data (path is relative to the working
# directory) and reduce it to one row per (state, town) holding the mean
# signal strength and the mean of the dropped_call column.
df = pd.read_csv('receiverByStateTown.csv')
agg = (
    df.groupby(['state', 'town'])
    .agg(
        signal_strength=('signal_strength', 'mean'),
        dropped_call=('dropped_call', 'mean'),
    )
    .reset_index()
)
# Define the receiver classification logic (Green / Yellow / Red).
def classify_receiver(row):
    """Return the status label for one aggregated receiver row.

    Args:
        row: Mapping-like object (e.g. a pandas Series) providing the keys
            'signal_strength' and 'dropped_call'.

    Returns:
        'Green' when signal_strength >= 2.0 and dropped_call <= 0.25,
        'Yellow' when signal_strength >= 1.3 and dropped_call <= 0.19,
        otherwise 'Red'.
    """
    signal, drops = row['signal_strength'], row['dropped_call']

    if signal >= 2.0 and drops <= 0.25:
        return 'Green'
    # NOTE(review): the Yellow drop ceiling (0.19) is stricter than Green's
    # (0.25) -- confirm this is intended.
    if signal >= 1.3 and drops <= 0.19:
        return 'Yellow'
    # Everything else, including NaN inputs (all comparisons are False).
    return 'Red'
# Label every (state, town) row by running the classifier row-wise, then
# store the labels as a new column on the aggregate.
_receiver_statuses = agg.apply(classify_receiver, axis=1)
agg['receiver_status'] = _receiver_statuses
def classify_receiver_mean(row):
    """Return the status label for one town-summary row.

    Same thresholds as the per-receiver classifier, but reading the
    summary column names.

    Args:
        row: Mapping-like object (e.g. a pandas Series) providing the keys
            'avg_signal_strength' and 'call_drop_rate'.

    Returns:
        'Green', 'Yellow' or 'Red'.
    """
    strength = row['avg_signal_strength']
    drop_fraction = row['call_drop_rate']

    # (label, minimum signal, maximum drop rate), checked in order; the first
    # tier whose two conditions both hold wins.
    tiers = (
        ('Green', 2.0, 0.25),
        ('Yellow', 1.3, 0.19),
    )
    for label, min_strength, max_drop in tiers:
        if strength >= min_strength and drop_fraction <= max_drop:
            return label
    return 'Red'
# Attach each (state, town) aggregate -- in particular its receiver_status --
# back onto every raw record of that town, then keep only the columns the
# per-state summary needs.
new_data = agg.merge(df, on=['state', 'town'], how='inner')
final_df = new_data[
    [
        'state',
        'town',
        'device_id',
        'receiver_latitude',
        'receiver_longitude',
        'receiver_status',
    ]
]
def analyze_telecom_data():
    """Summarise the module-level ``df`` at state and town level.

    Reads the global DataFrame ``df`` and uses its columns ``state``,
    ``town``, ``device_id``, ``signal_strength``, ``dropped_call``,
    ``call_duration``, ``receiver_latitude`` and ``receiver_longitude``.

    Returns:
        dict with the keys
        ``max_connected_state`` / ``max_connected_state_count``,
        ``best_signal_state`` / ``best_signal_avg``,
        ``worst_call_drop_state`` / ``worst_call_drop_rate``,
        ``top_town_by_device``, ``top_town_by_signal``,
        ``worst_town_by_call_drop`` (one town-summary record each),
        ``state_device_count``, ``state_signal_strength``,
        ``state_call_drop_rate`` (state -> value mappings, sorted descending)
        and ``town_summary`` (list of per-town records).
        Reported float values are rounded to 2 decimals.

    Raises:
        ValueError: if ``df`` has no rows.
    """
    if df.empty:
        # Previously this surfaced as an opaque ValueError from idxmax().
        raise ValueError("analyze_telecom_data: input data has no rows")

    results = {}
    by_state = df.groupby('state')

    # 1. State with the most distinct connected devices.
    device_count_by_state = by_state['device_id'].nunique().sort_values(ascending=False)
    results['max_connected_state'] = device_count_by_state.idxmax()
    results['max_connected_state_count'] = device_count_by_state.max()

    # 2. State with the best (highest) average signal strength.
    signal_strength_by_state = by_state['signal_strength'].mean().sort_values(ascending=False)
    results['best_signal_state'] = signal_strength_by_state.idxmax()
    results['best_signal_avg'] = round(signal_strength_by_state.max(), 2)

    # 3. State with the worst (highest) mean of dropped_call.
    # presumably dropped_call is a 0/1 flag, making the mean a drop rate -- confirm.
    call_drop_by_state = by_state['dropped_call'].mean().sort_values(ascending=False)
    results['worst_call_drop_state'] = call_drop_by_state.idxmax()
    results['worst_call_drop_rate'] = round(call_drop_by_state.max(), 2)

    # 4. Town-level summary (one row per state/town), busiest towns first.
    by_town = df.groupby(['state', 'town'])
    town_summary = (
        by_town.agg(
            device_count=('device_id', 'nunique'),
            avg_signal_strength=('signal_strength', 'mean'),
            call_drop_rate=('dropped_call', 'mean'),
            avg_call_duration=('call_duration', 'mean'),
        )
        .reset_index()
        .sort_values(by='device_count', ascending=False)
    )
    # Classify on the exact (unrounded) averages.
    town_summary['receiver_status'] = town_summary.apply(classify_receiver_mean, axis=1)

    # One coordinate pair per town: the first one seen in the data.
    location_df = by_town.agg(
        receiver_latitude=('receiver_latitude', 'first'),
        receiver_longitude=('receiver_longitude', 'first'),
    ).reset_index()
    town_summary = town_summary.merge(location_df, on=['state', 'town'], how='left')

    # Rounded copy used for everything that is reported. round(2) applies to
    # every numeric column, so the coordinates are rounded as well.
    rounded_summary = town_summary.round(2)

    def _top_town(column):
        """Return the rounded record of the town with the largest *column*."""
        # Rank on the UNROUNDED values: ranking after rounding turned towns
        # that differ only beyond the second decimal into ties and could
        # report the wrong one. The stable sort makes exact ties resolve to
        # the town listed first (i.e. the one with more devices).
        winner = (
            town_summary[column]
            .sort_values(ascending=False, kind='mergesort')
            .index[0]
        )
        return rounded_summary.loc[[winner]].to_dict('records')[0]

    # 5. Top towns by different metrics.
    results['top_town_by_device'] = _top_town('device_count')
    results['top_town_by_signal'] = _top_town('avg_signal_strength')
    results['worst_town_by_call_drop'] = _top_town('call_drop_rate')

    # 6. Full breakdowns.
    results['state_device_count'] = device_count_by_state.to_dict()
    results['state_signal_strength'] = signal_strength_by_state.round(2).to_dict()
    results['state_call_drop_rate'] = call_drop_by_state.round(2).to_dict()
    results['town_summary'] = rounded_summary.to_dict('records')
    return results
def get_summary_by_state(state_name):
    """Build a per-town receiver summary for one state from ``final_df``.

    For every town in the state, the most frequently occurring
    (receiver_latitude, receiver_longitude) pair is chosen; only the records
    at that pair are then summarised.

    Args:
        state_name: Value compared for equality against the 'state' column.

    Returns:
        DataFrame with columns town, receiver_latitude, receiver_longitude,
        device_count (distinct device_id values) and receiver_status (most
        common status among the summarised records).
    """
    location_keys = ['town', 'receiver_latitude', 'receiver_longitude']
    state_rows = final_df[final_df['state'] == state_name]

    # How many records sit at each (town, lat, lon) combination.
    occurrences = state_rows.groupby(location_keys).size().reset_index(name='count')

    # Highest count first within each town, then keep that first row per town.
    ranked = occurrences.sort_values(['town', 'count'], ascending=[True, False])
    dominant_coords = (
        ranked.drop_duplicates(subset='town', keep='first')
        .drop(columns='count')
    )

    # Inner merge keeps only the records located at a town's dominant pair.
    at_dominant = state_rows.merge(dominant_coords, on=location_keys)
    summary = (
        at_dominant.groupby(location_keys)
        .agg(
            device_count=('device_id', 'nunique'),
            receiver_status=('receiver_status', lambda statuses: statuses.mode().iloc[0]),
        )
        .reset_index()
    )
    return summary