Spaces:
Sleeping
Sleeping
import streamlit as st | |
from utils import set_algorithm_name, get_pdf_iframe, to_csv_file | |
from menu import display_pages_menu | |
from country_by_country.utils.constants import JURIDICTIONS | |
from Levenshtein import distance | |
import sys | |
import logging | |
import pandas as pd | |
import numpy as np | |
import re | |
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") | |
def check_last_cell_sum(column): | |
last_cell = column.iloc[-2] # Get the last cell value | |
result = [""] * (len(column.tolist()) - 2) | |
try: | |
sum_except_last = column.iloc[ | |
:-2 | |
].sum() # Calculate the sum of all values except the last one | |
result.append( | |
"background-color: red" | |
if float(last_cell) != sum_except_last | |
else "background-color: green" | |
) | |
result.append("") | |
return result | |
except Exception: | |
result.append("background-color: red") | |
result.append("") | |
return result | |
def column_sum(column): | |
try: | |
return column.iloc[:-1].sum() | |
except Exception: | |
return None | |
def style_negative(v, props=""): | |
try: | |
return props if float(v) < 0 else None | |
except Exception: | |
return None | |
def convert_dataframe(dataframe: pd.DataFrame) -> pd.DataFrame: | |
for column_name in dataframe.columns: | |
try: | |
dataframe[column_name] = dataframe[column_name].astype(float) | |
except Exception as e: | |
pass | |
return dataframe | |
special_characters = "#&()[]@©€$'R¹³²" | |
def style_symbol(v, props=""): | |
try: | |
return props if any(c in special_characters for c in v) else None | |
except Exception: | |
return None | |
def style_specific_cells(dataframe: pd.DataFrame, index_list: list): | |
color = "background-color: lightgreen" | |
df1 = pd.DataFrame("", index=dataframe.index, columns=dataframe.columns) | |
for index in index_list: | |
df1.iloc[index, 0] = color | |
return df1 | |
def most_similar_string(input_string: str) -> str: | |
def update_min(string, min_distance, most_similar, input_string=input_string): | |
dist = distance(input_string, string) | |
if dist < min_distance: | |
return dist, string | |
else: | |
return min_distance, most_similar | |
if input_string == None: | |
return "None" | |
min_distance = float("inf") | |
most_similar = None | |
for string in JURIDICTIONS.keys(): | |
# Compute the distance with the juridiction name | |
min_distance, most_similar = update_min(string, min_distance, most_similar) | |
# Compute the distance with the Alpha-2 code | |
min_distance, most_similar = update_min( | |
JURIDICTIONS[string]["Alpha-2 code"], min_distance, most_similar | |
) | |
# Compute the distance with the Alpha-3 code | |
min_distance, most_similar = update_min( | |
JURIDICTIONS[string]["Alpha-3 code"], min_distance, most_similar | |
) | |
return most_similar | |
def validate(data: pd.DataFrame) -> None: | |
st.session_state.tables[st.session_state["algorithm_name"]] = data | |
def update_df_csv_to_save() -> None: | |
for idx, change in st.session_state.changes["edited_rows"].items(): | |
for label, value in change.items(): | |
st.session_state.tables[st.session_state["algorithm_name"]].loc[ | |
idx, label | |
] = value | |
st.session_state["df_csv_to_save"] = to_csv_file( | |
st.session_state.tables[st.session_state["algorithm_name"]], | |
) | |
st.set_page_config(layout="wide", page_title="Tables customization") # page_icon="📈" | |
st.title("Country by Country Tax Reporting analysis : Tables") | |
st.subheader( | |
"This page will allow you to clean the extracted tables", | |
) | |
display_pages_menu() | |
if ( | |
st.session_state.get("validate_selected_pages", False) | |
and "pdf_after_page_validation" in st.session_state | |
): | |
col3, col4 = st.columns(2) | |
with col3: | |
st.markdown( | |
get_pdf_iframe(st.session_state["pdf_after_page_validation"]), | |
unsafe_allow_html=True, | |
) | |
with col4: | |
index = ( | |
list(st.session_state.tables.keys()).index( | |
st.session_state["algorithm_name"], | |
) | |
if "algorithm_name" in st.session_state | |
else 0 | |
) | |
st.session_state["algorithm_name"] = st.selectbox( | |
"Choose the extracted table you want to see", | |
list(st.session_state.tables.keys()), | |
index=index, | |
on_change=set_algorithm_name, | |
args=("selectbox2",), | |
key="selectbox2", | |
) | |
if "algorithm_name" in st.session_state: | |
st.session_state["df_csv_to_save"] = to_csv_file( | |
st.session_state.tables[st.session_state["algorithm_name"]] | |
) | |
st.download_button( | |
label="📥 Download Current Table", | |
data=( | |
st.session_state["df_csv_to_save"] | |
if "df_csv_to_save" in st.session_state | |
else None | |
), | |
disabled="df_csv_to_save" not in st.session_state, | |
file_name=( | |
f"{st.session_state['original_pdf_name']}.csv" | |
if "original_pdf_name" in st.session_state | |
else "table.csv" | |
), | |
) | |
st.session_state.tables[st.session_state["algorithm_name"]] = st.data_editor( | |
st.session_state.tables[st.session_state["algorithm_name"]], | |
num_rows="dynamic", | |
on_change=update_df_csv_to_save, | |
key="changes", | |
width=800, | |
height=900, | |
) | |
st.subheader( | |
"Filters : ", | |
) | |
col7, col8, col9 = st.columns([1, 1, 1]) | |
with col7: | |
total = st.checkbox( | |
"Calculate the Total of each columns, excluding the last row", value=True | |
) | |
country = st.checkbox("Activate the country filter", value=True) | |
decimal_cleanup = st.checkbox("Apply decimal cleanup") | |
with col8: | |
negativ = st.checkbox( | |
"Show the negative numbers, for each columns detected as a numerical type" | |
) | |
with st.container(border=True): | |
cleanup_rules = st.checkbox( | |
"Apply clean up rules : (number) mean a negative number, o-> 0, homogenization NA, ect ect " | |
) | |
if cleanup_rules: | |
cleanup_excluded = st.multiselect( | |
"exclude from filtering", | |
st.session_state.tables[st.session_state["algorithm_name"]].columns, | |
key="cleanup", | |
) | |
with col9: | |
with st.container(border=True): | |
symbol = st.checkbox( | |
"Show the cells that contain a special symbol : " + special_characters, | |
value=True, | |
) | |
remove_symbols = st.checkbox( | |
"Remove the special symbols on numeric columns" | |
) | |
if remove_symbols: | |
rm_symbol_excluded = st.multiselect( | |
"exclude from filtering", | |
st.session_state.tables[st.session_state["algorithm_name"]].columns, | |
key="rm_symbol", | |
) | |
dataframe = st.session_state.tables[st.session_state["algorithm_name"]].copy() | |
dataframe = convert_dataframe(dataframe) | |
if country: | |
dataframe.iloc[:-2, 0] = dataframe.iloc[:-2, 0].apply( | |
lambda x: most_similar_string(x) | |
) | |
if remove_symbols: | |
pattern = "[" + re.escape(special_characters) + "]" | |
for column, dtype in dataframe.dtypes.items(): | |
if column not in rm_symbol_excluded: | |
dataframe[column] = dataframe[column].apply( | |
lambda x: re.sub(pattern, "", str(x)) | |
) | |
dataframe = convert_dataframe(dataframe) | |
if cleanup_rules: | |
for column, dtype in dataframe.dtypes.items(): | |
if column not in cleanup_excluded: | |
# this is a code translated by chatgpt from Kane's R code | |
dataframe[column] = dataframe[column].replace( | |
{"^-$|^$|^ $|^N/I$|^- -$|^N/A$|^n\\.a\\.$": None}, regex=True | |
) | |
dataframe[column] = dataframe[column].replace( | |
{"^o$|^O$|^\\(o\\)$|^\\(O\\)$|^\\(0\\)$": "0"}, regex=True | |
) | |
if dtype == object: | |
dataframe[column] = dataframe[column].str.replace( | |
"(\\(.*\\))[:alnum:]+", "\\1", regex=True | |
) | |
dataframe[column] = dataframe[column].str.replace( | |
"\\([:alnum:]+$|\\)[:alnum:]+$", "", regex=True | |
) | |
dataframe[column] = dataframe[column].str.replace( | |
"\\([:alpha:]+\\)", "", regex=True | |
) | |
dataframe[column] = dataframe[column].str.replace( | |
"(.+)\\(.+\\)$", "\\1", regex=True | |
) | |
dataframe[column] = dataframe[column].str.replace( | |
"^\\(-(.*)\\)", "-\\1", regex=True | |
) | |
dataframe[column] = dataframe[column].str.replace( | |
"^\\((.*)\\)", "-\\1", regex=True | |
) | |
dataframe[column] = dataframe[column].str.replace( | |
"\\(.*\\)| |\\*|^-$|\\[.*\\]|^-€$", "", regex=True | |
) | |
dataframe = convert_dataframe(dataframe) | |
if decimal_cleanup: | |
decimal_separator = ( | |
st.session_state["metadata"]["separator"] | |
if st.session_state["metadata"]["separator"] | |
else "," | |
) | |
for column, dtype in dataframe.dtypes.items(): | |
if dtype == object: | |
if decimal_separator == ",": | |
dataframe[column] = dataframe[column].str.replace( | |
"\\.", "", regex=False | |
) | |
dataframe[column] = dataframe[column].str.replace( | |
",", ".", regex=False | |
) | |
else: | |
dataframe[column] = dataframe[column].str.replace( | |
",(.{1,2})$", ".\\1", regex=True | |
) | |
dataframe[column] = dataframe[column].str.replace( | |
"\\.([0-9]{3})", ",\\1", regex=True | |
) | |
dataframe[column] = dataframe[column].str.replace( | |
",", "", regex=False | |
) | |
if total: | |
dataframe = convert_dataframe(dataframe) | |
new_row = dataframe.apply(column_sum, axis=0) | |
new_row.iloc[0] = "Total Calculated" | |
dataframe.loc[-1] = new_row.transpose() | |
dataframe_styler = dataframe.style | |
if total: | |
dataframe_styler = dataframe_styler.apply( | |
check_last_cell_sum, | |
subset=pd.IndexSlice[:, dataframe.columns[1:]], | |
axis=0, | |
) | |
if negativ: | |
dataframe_styler = dataframe_styler.map( | |
style_negative, | |
props="color:red;", | |
) | |
if symbol: | |
dataframe_styler = dataframe_styler.map( | |
style_symbol, | |
props="color:red;", | |
) | |
if country: | |
index_list = [] | |
for index, (val1, val2) in enumerate( | |
zip( | |
dataframe.iloc[:-1, 0], | |
st.session_state.tables[st.session_state["algorithm_name"]].iloc[ | |
:-1, 0 | |
], | |
) | |
): | |
if val1 != val2: | |
index_list.append(index) | |
dataframe_styler = dataframe_styler.apply( | |
lambda x: style_specific_cells(x, index_list), axis=None | |
) | |
st.dataframe(dataframe_styler, use_container_width=True, height=1000) | |
st.button( | |
"Save the table above", | |
on_click=validate, | |
args=(dataframe_styler.data,), | |
) | |