|
from io import BytesIO |
|
import os |
|
from typing import List, Tuple |
|
from openpyxl import Workbook |
|
from openpyxl.styles import Font |
|
from openpyxl.styles import Alignment |
|
from openpyxl.styles import numbers |
|
from openpyxl.styles.borders import Border, Side |
|
import streamlit as st |
|
import pandas as pd |
|
import numpy as np |
|
from data.cli_dropbox import dropbox_download_file, dropbox_upload_bytefile |
|
|
|
from .dataframes import complete_affaires, complete_arrets, complete_intervenants, complete_supplements, complete_vehicules, merge_clients, merge_affaires, merge_intervenants_affaires,cast_specifics_to_str |
|
|
|
def check_paths(paths: List[str]) -> bool: |
|
files = [] |
|
for path in paths: |
|
if not os.path.exists(path) or not os.path.isfile(path): |
|
files.append(path) |
|
return files |
|
|
|
def load_excels(datapath: str, excel_sources: List[dict]) -> pd.DataFrame: |
|
data = {} |
|
for key, value in excel_sources.items(): |
|
data[key] = pd.read_excel(os.path.join(datapath, excel_sources[key]['path']), |
|
sheet_name=excel_sources[key]['sheet_name'], |
|
usecols=excel_sources[key]['columns'], |
|
nrows=excel_sources[key]['rows'], |
|
skiprows=excel_sources[key]['skiprows'], |
|
) |
|
return data |
|
|
|
@st.cache_data |
|
def load_transform_data(datapath: str, excel_sources: List[dict]) -> pd.DataFrame: |
|
not_files = check_paths([os.path.join(datapath, excel['path']) for excel in excel_sources.values()]) |
|
if len(not_files): |
|
st.error(f'Erreur: une partie de la base de données n\'est pas accessible. {not_files}') |
|
return |
|
|
|
data = load_excels(datapath, excel_sources) |
|
merge_clients(data) |
|
merge_affaires(data) |
|
merge_intervenants_affaires(data) |
|
|
|
complete_vehicules(data) |
|
complete_supplements(data) |
|
complete_intervenants(data) |
|
complete_affaires(data) |
|
complete_arrets(data) |
|
cast_specifics_to_str(data) |
|
return data |
|
|
|
|
|
def filter_multiple_conditions_data(df, filters): |
|
filtered_df = df[df[filters.keys()].isin(filters.values()).all(axis=1)] |
|
return filtered_df |
|
|
|
def draw_border(sheet, start_cell: Tuple[int, int], end_cell: Tuple[int, int]): |
|
|
|
border_style = Border(left=Side(style='thin'), |
|
right=Side(style='thin'), |
|
top=Side(style='thin'), |
|
bottom=Side(style='thin')) |
|
|
|
|
|
start_cell = sheet.cell(row=start_cell[0], column=start_cell[1]) |
|
end_cell = sheet.cell(row=end_cell[0] , column=end_cell[1]) |
|
cell_range = '{}:{}'.format(start_cell.coordinate, end_cell.coordinate) |
|
|
|
|
|
for row in sheet[cell_range]: |
|
for cell in row: |
|
cell.border = border_style |
|
|
|
def get_fit_totals(dataframe): |
|
column_sums = {} |
|
for column in dataframe.columns: |
|
if dataframe[column].dtype in [int, float] and not np.isnan(dataframe[column]).all(): |
|
column_sums[column] = dataframe[column].sum() |
|
column_sums['rows'] = dataframe.shape[0] |
|
column_sums['worked_hours'] = column_sums['H.\njour'] + column_sums['H.\nnuit (1)'] |
|
return column_sums |
|
|
|
def load_fit(datapath: str, intervenant: str, year: str, month: str, week: str): |
|
filename = f'{intervenant}_{year}_{month}_{week}_FIT.xlsx' |
|
if dropbox_download_file(f'/SEC_IND_GTP2023_OUTPUT/FIT/{intervenant}/{year}/{month}/{week}/{filename}', os.path.join(datapath, filename)): |
|
|
|
if os.path.exists(os.path.join(datapath, filename)) and os.path.isfile(os.path.join(datapath, filename)): |
|
data = pd.read_excel(os.path.join(datapath, filename), sheet_name='Sheet', skiprows=6, nrows=10) |
|
data.dropna(axis=0, how='all', inplace=True) |
|
totals = get_fit_totals(data) |
|
|
|
if 'fit' not in st.session_state.keys(): |
|
st.session_state['fit'] = {} |
|
|
|
if intervenant not in st.session_state['fit'].keys(): |
|
st.session_state['fit'][intervenant] = {} |
|
if year not in st.session_state['fit'][intervenant].keys(): |
|
st.session_state['fit'][intervenant][year] = {} |
|
if month not in st.session_state['fit'][intervenant][year].keys(): |
|
st.session_state['fit'][intervenant][year][month] = {} |
|
if week not in st.session_state['fit'][intervenant][year][month].keys(): |
|
st.session_state['fit'][intervenant][year][month][week] = {} |
|
st.session_state['fit'][intervenant][year][month][week] = { |
|
'data': data, |
|
'totals': totals |
|
} |
|
|
|
return data |
|
print('error loading fit') |
|
return None |
|
|
|
async def update_society_fit(dropbox_datapath: str, form: dict): |
|
society = form['prestataire'] |
|
year = form['year'] |
|
month = form['month'] |
|
week = form['week'] |
|
dropbox_path = f'{dropbox_datapath}/SOCIETE/{society}/{year}/{month}/{week}' |
|
|
|
filename = f'{society}_{year}_{month}_{week}_FIT.csv' |
|
fit_df = pd.DataFrame([form]) |
|
|
|
fit = dropbox_download_file(os.path.join(dropbox_path, filename), '', False) |
|
if fit: |
|
fit = pd.read_csv(BytesIO(fit), index_col=0) |
|
fit_df = pd.concat([fit, fit_df], ignore_index=True) |
|
|
|
csv_data = BytesIO() |
|
fit_df.to_csv(csv_data, index = False) |
|
dropbox_upload_bytefile(dropbox_data_path=dropbox_path, dropbox_file_name=filename, bytes=csv_data) |
|
return fit_df |
|
|
|
|
|
async def update_society_payroll(dropbox_datapath: str, form: dict): |
|
prestataire = form['Prestataire'] |
|
fournisseur = form['Fournisseur'] |
|
year = form['year'] |
|
month = form['month'] |
|
dropbox_path = f'{dropbox_datapath}/PRESTATIONS/{prestataire}/{year}/{month}' |
|
|
|
filename = f'{prestataire}_{fournisseur}_{year}_{month}_PRESTATIONS_CROISEES.xlsx' |
|
payroll_df = pd.DataFrame([form]) |
|
|
|
payroll = dropbox_download_file(os.path.join(dropbox_path, filename), '', False) |
|
if payroll: |
|
payroll = pd.read_excel(BytesIO(payroll)) |
|
payroll_df = pd.concat([payroll, payroll_df], ignore_index=True) |
|
|
|
excel_data = BytesIO() |
|
payroll_df.to_excel(excel_data, index = False) |
|
dropbox_upload_bytefile(dropbox_data_path=dropbox_path, dropbox_file_name=filename, bytes=excel_data) |
|
return payroll_df |
|
|
|
async def update_historical_week(dropbox_datapath: str, form: dict): |
|
intervenant = form['intervenant'] |
|
year = form['year'] |
|
month = form['month'] |
|
week = form['week'] |
|
dropbox_path = f'{dropbox_datapath}/FIT/{intervenant}/{year}/{month}/{week}' |
|
|
|
historic_df = pd.DataFrame([form]) |
|
|
|
historic = dropbox_download_file(dropbox_path + '/historique.xlsx', '', False) |
|
if historic: |
|
historic = pd.read_excel(historic) |
|
historic_df = pd.concat([historic, historic_df], ignore_index=True) |
|
|
|
excel_data = BytesIO() |
|
historic_df.to_excel(excel_data, index = False) |
|
dropbox_upload_bytefile(dropbox_data_path=dropbox_path, dropbox_file_name='historique.xlsx', bytes=excel_data) |
|
return historic_df |
|
|
|
async def update_monthly_payroll(dropbox_datapath: str, payroll_dict: dict, year: str, month: str, week: str): |
|
|
|
dropbox_path = f'{dropbox_datapath}/PAYES' |
|
|
|
nom = payroll_dict['Nom'] |
|
prenom = payroll_dict['Prénom'] |
|
payroll_df = pd.DataFrame([payroll_dict]) |
|
|
|
payroll = dropbox_download_file(dropbox_path + f'/tableau_prepaye_{year}_{month}.xlsx', '', False) |
|
if payroll: |
|
payroll = pd.read_excel(payroll) |
|
intervenant_rows = payroll[(payroll['Nom'] == nom) & (payroll['Prénom'] == prenom)] |
|
|
|
if len(intervenant_rows): |
|
current_week_row = payroll[(payroll['Nom'] == nom) & (payroll_df['Prénom'] == prenom) & (payroll['Semaine'] == f'{year}-s{week}')] |
|
|
|
if len(current_week_row): |
|
payroll.iloc[current_week_row.index] = payroll_df.loc[0] |
|
payroll_df = payroll |
|
else: |
|
payroll_df = pd.concat([payroll, payroll_df], ignore_index=True) |
|
else: |
|
payroll_df = pd.concat([payroll, payroll_df], ignore_index=True) |
|
rows_for_total = payroll_df[(payroll_df['Nom'] == nom) & (payroll_df['Prénom'] == prenom) & (payroll_df['Semaine'].str.contains(f'{year}-s'))] |
|
total = pd.DataFrame([rows_for_total.drop(columns=['Nom', 'Prénom', 'Semaine']).sum()]) |
|
total['Nom'] = nom |
|
total['Prénom'] = prenom |
|
total['Semaine'] = 'TOTAL' |
|
all_but_total = payroll_df[~((payroll_df['Nom'] == nom) & (payroll_df['Prénom'] == prenom) & (payroll_df['Semaine'].str.contains('TOTAL')))] |
|
|
|
payroll_df = pd.concat([total, all_but_total], ignore_index=True, axis = 0) |
|
|
|
payroll_df = payroll_df.sort_values(by=['Nom', 'Prénom', 'Semaine']) |
|
column_order = payroll_df.columns[-3:].tolist() + payroll_df.columns[:-3].tolist() |
|
payroll_df = payroll_df[column_order] |
|
excel_data = BytesIO() |
|
payroll_df.to_excel(excel_data, index = False) |
|
dropbox_upload_bytefile(dropbox_data_path=dropbox_path, dropbox_file_name=f'tableau_prepaye_{year}_{month}.xlsx', bytes=excel_data) |
|
return payroll_df |
|
|
|
def write_excel_fit(datapath: str, filename: str, data, starting_row = 7): |
|
workbook = Workbook() |
|
sheet = workbook.active |
|
|
|
|
|
sheet.column_dimensions['A'].width = 60 |
|
sheet.column_dimensions['B'].width = 40 |
|
sheet.column_dimensions['C'].width = 80 |
|
sheet.column_dimensions['D'].width = 40 |
|
sheet.column_dimensions['E'].width = 20 |
|
sheet.column_dimensions['K'].width = 60 |
|
sheet.column_dimensions['L'].width = 40 |
|
sheet.column_dimensions['M'].width = 40 |
|
sheet.column_dimensions['O'].width = 20 |
|
sheet.column_dimensions['P'].width = 40 |
|
sheet.column_dimensions['Q'].width = 40 |
|
sheet.column_dimensions['R'].width = 40 |
|
sheet.column_dimensions['S'].width = 20 |
|
sheet.row_dimensions[29].height = 30 |
|
sheet.row_dimensions[31].height = 40 |
|
|
|
sheet['A1'] = 'SECMI' |
|
|
|
sheet['D1'] = 'FICHE D\'INTERVENTION ET DE TEMPS (FIT)' |
|
|
|
sheet['A3'] = f'Intervenant: {data["intervenant"]}' |
|
draw_border(sheet, (3, 1), (3, 1)) |
|
|
|
sheet.merge_cells('M1:N1') |
|
sheet['M1'] = f'Année: {data["year"]}' |
|
draw_border(sheet, (1, 13), (1, 13)) |
|
|
|
sheet.merge_cells('M2:N2') |
|
sheet['M2'] = f'Semaine: {data["week"]}' |
|
draw_border(sheet, (2, 13), (2, 13)) |
|
|
|
|
|
sheet['A18'] = '(1) travail effectué entre 21h00 et 06h00' |
|
|
|
sheet.merge_cells('A19:S22') |
|
sheet['A19'] = 'Commentaires SECMI:' |
|
draw_border(sheet, (19, 1), (22, 19)) |
|
|
|
sheet.merge_cells('A23:S26') |
|
sheet['A23'] = 'Commentaires Client:' |
|
draw_border(sheet, (23, 1), (26, 19)) |
|
|
|
sheet.merge_cells('A30:D30') |
|
sheet['A30'] = 'Signature client:' |
|
draw_border(sheet, (30, 1), (30, 4)) |
|
sheet.merge_cells('A31:D32') |
|
draw_border(sheet, (31, 1), (32, 4)) |
|
|
|
|
|
sheet['E30'] = 'Note\n(de 0 à 10)' |
|
draw_border(sheet, (30, 5), (30, 5)) |
|
sheet.merge_cells('E31:E32') |
|
draw_border(sheet, (31, 5), (32, 5)) |
|
|
|
sheet['L30'] = 'Signature chargé d\'affaire:' |
|
draw_border(sheet, (30, 12), (30, 12)) |
|
sheet.merge_cells('L31:L32') |
|
draw_border(sheet, (31, 12), (32, 12)) |
|
|
|
sheet['M30'] = 'Signature intervenant:' |
|
draw_border(sheet, (30, 13), (30, 13)) |
|
sheet.merge_cells('M31:M32') |
|
draw_border(sheet, (31, 13), (32, 13)) |
|
|
|
|
|
sheet.merge_cells('A33:T33') |
|
sheet.merge_cells('A34:T34') |
|
draw_border(sheet, (33, 1), (34, 19)) |
|
sheet['A33'] = 'Service Administratif' |
|
sheet['A34'] = 'Tel: +33 6 02 14 55 16 - Email : [email protected]' |
|
|
|
|
|
draw_border(sheet, (starting_row, 1), (starting_row + 10, 19)) |
|
|
|
draw_border(sheet, (starting_row - 2, 4), (starting_row -2, 19)) |
|
|
|
|
|
sheet.cell(row=starting_row - 2, column=4, value='TOTAUX (en heure)') |
|
|
|
header = list(data['data'].keys()) |
|
for col_i, key in enumerate(header): |
|
sheet.cell(row=starting_row, column=col_i + 1, value=key) |
|
if key in data['totals'].keys(): |
|
sheet.cell(row=starting_row - 2, column=col_i + 1, value=data['totals'][key]) |
|
|
|
for cell in sheet[starting_row - 2]: |
|
cell.font = Font(bold=True) |
|
for cell in sheet[starting_row]: |
|
cell.font = Font(bold=True) |
|
|
|
starting_row += 1 |
|
for col_i, key in enumerate(data['data'].keys()): |
|
values = data['data'][key] |
|
for j, value in enumerate(values): |
|
|
|
sheet.cell(row=starting_row + j, column=col_i + 1, value=value) |
|
|
|
alignment = Alignment(horizontal='center', vertical='center') |
|
|
|
decimal_format = numbers.FORMAT_NUMBER_00 |
|
|
|
for row in sheet.iter_rows(): |
|
for cell in row: |
|
cell.alignment = alignment |
|
cell.number_format = decimal_format |
|
|
|
sheet['A1'].font = Font(bold=True, underline='single', size=11) |
|
sheet['D1'].font = Font(bold=True, underline='single', size=11) |
|
sheet['A19'].font = Font(bold=True, underline='single') |
|
sheet['A19'].alignment = Alignment(horizontal='left', vertical='top') |
|
sheet['A23'].font = Font(bold=True, underline='single') |
|
sheet['A23'].alignment = Alignment(horizontal='left', vertical='top') |
|
sheet['A33'].font = Font(bold=True) |
|
sheet['A33'].alignment = Alignment(horizontal='center') |
|
sheet['A34'].font = Font(bold=True) |
|
sheet['A34'].alignment = Alignment(horizontal='center') |
|
|
|
workbook.save(os.path.join(datapath, filename)) |
|
|