|
import pandas as pd |
|
import openpyxl |
|
import gradio as gr |
|
import io |
|
import base64 |
|
from datetime import datetime |
|
|
|
# Placeholder written into every output cell that has no assignment.
_OFF_LABEL = 'OFF'

# Lower-cased cell tokens that mean "nothing assigned". The placeholder is
# included so a file produced by this converter can be fed back in for the
# reverse direction without 'OFF' turning into a row of its own.
_BLANK_TOKENS = frozenset({'', 'nan', 'none', _OFF_LABEL.lower()})


def _detect_day_labels(raw):
    """Return ``(day_labels, first_data_row)`` for a sheet read with ``header=None``.

    ``day_labels`` holds one string per column after the first. When the
    second row is completely filled it is taken as the day row and data
    starts at row 2. Otherwise the first row is used, with blank cells
    (merged headers) inheriting the label to their left, and data starts at
    row 1.
    """
    top_row = raw.iloc[0, 1:].astype(object)

    if len(raw) > 1:
        second_row = raw.iloc[1, 1:].astype(object)
        # NOTE(review): a single-header sheet whose first data row has no
        # blanks also satisfies this test and is then read as a header row.
        # The heuristic is kept as-is; confirm against the real file layout.
        looks_like_header = bool(second_row.notna().all()) and not any(
            str(value).startswith('Unnamed') for value in second_row
        )
        if looks_like_header:
            return [str(value) for value in second_row], 2

    labels = []
    current = None
    for value in top_row:
        if not (pd.isna(value) or str(value).startswith('Unnamed')):
            current = str(value)
        labels.append(str(current))
    return labels, 1


def _collect_assignments(rows, day_labels):
    """Invert the grid into ``{cell_name: {day: [row_label, ...]}}``.

    ``rows`` is the data part of the raw sheet: column 0 is the row label and
    column ``i + 1`` belongs to ``day_labels[i]``. Cells are read by position,
    so repeated day labels and repeated or non-string row labels are safe.
    """
    assignments = {}
    for row in rows.itertuples(index=False, name=None):
        if not row or pd.isna(row[0]):
            continue  # spacer row or row without a label
        row_label = str(row[0]).strip()
        if not row_label:
            continue

        for day, cell in zip(day_labels, row[1:]):
            if pd.isna(cell):
                continue
            for token in str(cell).split(','):
                name = token.strip()
                if name.lower() in _BLANK_TOKENS:
                    continue
                names_for_day = assignments.setdefault(name, {}).setdefault(day, [])
                # Columns sharing a day may repeat the same pairing.
                if row_label not in names_for_day:
                    names_for_day.append(row_label)
    return assignments


def _build_result(assignments, day_labels, first_col_name):
    """Lay the inverted assignments out as one row per name, one column per day."""
    unique_days = list(dict.fromkeys(day_labels))  # ordered, de-duplicated
    records = []
    for name, per_day in sorted(assignments.items()):
        cells = []
        for day in unique_days:
            labels = per_day.get(day)
            cells.append(', '.join(labels) if labels else _OFF_LABEL)
        records.append([name] + cells)
    return pd.DataFrame(records, columns=[first_col_name] + unique_days)


def convert_schedule(file_path, direction):
    """Swap the row and cell roles of a weekly schedule spreadsheet.

    The sheet is expected to have row labels in the first column, day labels
    in a header row, and comma-separated names in the cells. The output has
    one row per distinct cell name (sorted) and, for each day, the
    comma-joined row labels that listed that name, or ``'OFF'`` if none did.
    Columns that share a day label (merged headers) are combined into a
    single output column.

    Args:
        file_path: Anything ``pandas.read_excel`` accepts, or ``None``.
        direction: ``'A to B'`` names the first output column ``'Texter'``;
            any other value names it ``'Model'``.

    Returns:
        ``(frame, download)``. On success ``frame`` is the converted table
        and ``download`` is ``(csv_bytes, filename)`` with UTF-8 CSV content.
        On failure ``frame`` has a single ``'Error'`` column holding the
        message and ``download`` is ``None``. No exception is propagated.
    """
    if file_path is None:
        return pd.DataFrame({'Error': ['Please upload a file']}), None

    try:
        # Read once without a header so header rows and data rows can be told
        # apart by position and no data row is consumed as column labels.
        raw = pd.read_excel(file_path, header=None)
        if raw.empty:
            raise ValueError('the sheet contains no data')

        day_labels, first_data_row = _detect_day_labels(raw)
        assignments = _collect_assignments(raw.iloc[first_data_row:], day_labels)

        first_col_name = 'Texter' if direction == 'A to B' else 'Model'
        result = _build_result(assignments, day_labels, first_col_name)

        csv_buffer = io.StringIO()
        result.to_csv(csv_buffer, index=False)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"converted_schedule_{timestamp}.csv"

        return result, (csv_buffer.getvalue().encode('utf-8'), filename)

    except Exception as e:
        # UI boundary: any failure is reported in the preview table instead
        # of being raised to the caller.
        error_msg = f"Error processing file: {str(e)}"
        print(f"DEBUG: {error_msg}")
        return pd.DataFrame({'Error': [error_msg]}), None
|
|
|
|
|
def process_and_download(file_path, direction):
    """Run the conversion and write the CSV to disk for the download widget.

    Args:
        file_path: Uploaded spreadsheet, passed straight to
            ``convert_schedule``.
        direction: Conversion direction, passed straight to
            ``convert_schedule``.

    Returns:
        ``(preview_frame, csv_path)``. ``csv_path`` is ``None`` when
        ``convert_schedule`` returned no download data; ``preview_frame`` is
        then whatever frame it returned.
    """
    import os
    import tempfile

    display_result, download_data = convert_schedule(file_path, direction)
    if download_data is None:
        return display_result, None

    csv_bytes, filename = download_data

    # The filename is only unique per second, so each call gets its own
    # directory to keep simultaneous requests from overwriting each other.
    temp_path = os.path.join(tempfile.mkdtemp(prefix='schedule_'), filename)

    # Write the encoded bytes untouched: a text-mode write would translate
    # line endings again on Windows and corrupt the CSV row separators.
    with open(temp_path, 'wb') as f:
        f.write(csv_bytes)

    return display_result, temp_path
|
|
|
|
|
# Input widgets: the spreadsheet to convert and the conversion direction.
_schedule_upload = gr.File(
    label='Upload Weekly Schedule (.xlsx)',
    file_count='single',
    file_types=['.xlsx', '.xls'],
)
_direction_choice = gr.Radio(
    ['A to B', 'B to A'],
    label='Convert Direction',
    value='A to B',
    info='A to B: Models→Texters, B to A: Texters→Models',
)

# Output widgets: an on-page preview and the downloadable CSV.
_preview_table = gr.Dataframe(label='Converted Schedule (Preview)', wrap=True)
_download_file = gr.File(label='Download Converted Schedule (.csv)')

# Markdown shown under the title.
_usage_text = (
    '**How to use:**\n'
    '1. Upload your Excel file with a 7-day schedule\n'
    '2. Choose conversion direction\n'
    '3. Preview the result and download the converted file\n\n'
    '*Supports merged headers and handles Models ↔ Texters conversion*'
)

# The app: both inputs feed process_and_download, whose two return values
# fill the preview table and the download widget in that order.
iface = gr.Interface(
    fn=process_and_download,
    inputs=[_schedule_upload, _direction_choice],
    outputs=[_preview_table, _download_file],
    title='🔄 7-Day Schedule Converter',
    description=_usage_text,
    flagging_mode='never',
)
|
|
|
if __name__ == "__main__":
    # Start the web server only when run as a script. '0.0.0.0' listens on
    # every network interface; no public share link is requested.
    launch_options = {
        'server_name': '0.0.0.0',
        'server_port': 7860,
        'share': False,
    }
    iface.launch(**launch_options)