fschwartzer's picture
Update app.py
e69523f verified
raw
history blame
5.13 kB
import streamlit as st
import pandas as pd
from transformers import BartForConditionalGeneration, TapexTokenizer, T5ForConditionalGeneration, T5Tokenizer
from prophet import Prophet
# Read the custom stylesheet that ships next to the app. The encoding is given
# explicitly so the result does not depend on the host's locale default.
with open("style.css", "r", encoding="utf-8") as css:
    css_style = css.read()

# Page header: web-font import, custom CSS and the HTML banner combined into a
# single markdown payload.
# NOTE: CSS only honours @import rules that appear before every other style
# rule, so the font import is emitted ahead of the stylesheet contents. Placed
# after them, browsers ignore it and the "Kanit" font is never loaded.
# The markup lines are intentionally not indented: they are runtime text.
html_content = f"""
<style>
@import url('https://fonts.googleapis.com/css2?family=Kanit:wght@700&display=swap');
{css_style}
</style>
<div style='display: flex; flex-direction: column; align-items: flex-start;'>
<div style='display: flex; align-items: center;'>
<div style='width: 20px; height: 4px; background-color: green; margin-right: 1px;'></div>
<div style='width: 20px; height: 4px; background-color: red; margin-right: 1px;'></div>
<div style='width: 20px; height: 4px; background-color: yellow; margin-right: 20px;'></div>
<span style='font-size: 45px; font-weight: normal; font-family: "Kanit", sans-serif;'>NOSTRADAMUS</span>
</div>
<div style='text-align: left; width: 100%;'>
<span style='font-size: 20px; font-weight: normal; color: #333; font-family: "Kanit", sans-serif'>
Meta Prophet + Microsoft TAPEX</span>
</div>
</div>
"""

# Render the combined markup in Streamlit; raw HTML has to be allowed explicitly.
st.markdown(html_content, unsafe_allow_html=True)
# Seed the per-session state on the first run only: an empty anomalies table
# and an empty conversation history. Existing values are left untouched.
for _state_key, _make_default in (("all_anomalies", pd.DataFrame), ("history", list)):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()
# Load the translation and TAPEX models.
@st.cache_resource
def _load_models():
    """Load every checkpoint once and share the objects across reruns.

    Streamlit re-executes the whole script on each user interaction; without
    caching, all four checkpoints would be re-instantiated on every rerun.

    Returns:
        Tuple ``(pt_en_translator, en_pt_translator, tapex_model,
        tapex_tokenizer, tokenizer)``.
    """
    return (
        T5ForConditionalGeneration.from_pretrained("unicamp-dl/translation-pt-en-t5"),
        T5ForConditionalGeneration.from_pretrained("unicamp-dl/translation-en-pt-t5"),
        BartForConditionalGeneration.from_pretrained("microsoft/tapex-large-finetuned-wtq"),
        TapexTokenizer.from_pretrained("microsoft/tapex-large-finetuned-wtq"),
        T5Tokenizer.from_pretrained("unicamp-dl/translation-pt-en-t5"),
    )


# Module-level names are kept so the rest of the script can use them unchanged.
pt_en_translator, en_pt_translator, tapex_model, tapex_tokenizer, tokenizer = _load_models()
def translate(text, model, tokenizer, source_lang="pt", target_lang="en", max_new_tokens=128):
    """Translate ``text`` with a T5 translation checkpoint.

    Args:
        text: The sentence to translate.
        model: Seq2seq model exposing ``generate``.
        tokenizer: Tokenizer exposing ``encode`` and ``decode``.
        source_lang: Language code of ``text`` ("pt" or "en").
        target_lang: Language code of the desired output ("pt" or "en").
        max_new_tokens: Upper bound on generated tokens. Passed explicitly
            because the library's default generation length is short enough to
            cut longer translations off.

    Returns:
        The decoded translation with special tokens removed.
    """
    # NOTE(review): the unicamp-dl translation checkpoints are documented as
    # being prompted with a "translate <Source> to <Target>: " task prefix;
    # confirm against the model cards.
    language_names = {"pt": "Portuguese", "en": "English"}
    source_name = language_names.get(source_lang)
    target_name = language_names.get(target_lang)
    if source_name and target_name:
        prompt = f"translate {source_name} to {target_name}: {text}"
    else:
        # Unknown language code: fall back to sending the raw text.
        prompt = text

    input_ids = tokenizer.encode(prompt, return_tensors="pt", add_special_tokens=True)
    outputs = model.generate(input_ids, max_new_tokens=max_new_tokens)
    return tokenizer.decode(outputs[0], skip_special_tokens=True)
def response(user_question, table_data):
    """Answer a Portuguese question about ``table_data`` in Portuguese.

    The question is translated to English, answered by the TAPEX model over
    the table, and the English answer is translated back to Portuguese. Both
    translation calls use the same module-level ``tokenizer``.

    Args:
        user_question: Question text, passed to the pt->en translator.
        table_data: Table handed to ``tapex_tokenizer`` as ``table``.

    Returns:
        The answer text produced by the en->pt translation step.
    """
    # Step 1: Portuguese question -> English.
    english_question = translate(
        user_question, pt_en_translator, tokenizer, source_lang="pt", target_lang="en"
    )

    # Step 2: encode table + question and let TAPEX generate the answer.
    tapex_inputs = tapex_tokenizer(
        table=table_data,
        query=[english_question],
        padding=True,
        return_tensors="pt",
        truncation=True,
    )
    generated = tapex_model.generate(**tapex_inputs)
    decoded_answers = tapex_tokenizer.batch_decode(generated, skip_special_tokens=True)
    english_answer = decoded_answers[0]

    # Step 3: English answer -> Portuguese.
    return translate(
        english_answer, en_pt_translator, tokenizer, source_lang="en", target_lang="pt"
    )
def load_data(uploaded_file):
    """Read an uploaded CSV or XLSX file into a DataFrame.

    Args:
        uploaded_file: File-like object with a ``name`` attribute; the
            extension of ``name`` selects the parser (case-insensitively).

    Returns:
        The parsed ``pandas.DataFrame``.

    Raises:
        ValueError: If ``name`` ends with neither ``.csv`` nor ``.xlsx``.
            Without this check the function would fail with an
            UnboundLocalError on the return statement instead.
    """
    # Compare on a lower-cased name so "DATA.CSV" is accepted like "data.csv".
    file_name = uploaded_file.name.lower()
    if file_name.endswith('.csv'):
        return pd.read_csv(uploaded_file, quotechar='"', encoding='utf-8')
    if file_name.endswith('.xlsx'):
        return pd.read_excel(uploaded_file)
    raise ValueError(
        f"Unsupported file type: {uploaded_file.name!r} (expected .csv or .xlsx)"
    )
def preprocess_data(df):
    """Return ``df`` ready for modelling.

    Currently a pass-through placeholder: the very same object is returned.
    """
    # TODO: implement the pre-processing steps here.
    cleaned = df
    return cleaned
def apply_prophet(df_clean):
    """Run the (not yet implemented) Prophet step over ``df_clean``.

    Args:
        df_clean: Pre-processed DataFrame.

    Returns:
        A DataFrame of anomalies. For an empty input an error is shown in the
        Streamlit UI and an empty DataFrame is returned. For any other input
        the result is currently still an empty DataFrame, because the per-row
        processing below is a placeholder.
    """
    if df_clean.empty:
        st.error("DataFrame está vazio após o pré-processamento.")
        return pd.DataFrame()

    # Accumulator for the anomalies found across all rows.
    anomalies = pd.DataFrame()

    for _row_label, _row in df_clean.iterrows():
        # TODO: fit Prophet on this row and collect its anomalies.
        continue

    # TODO: rename columns and apply filters before returning.
    return anomalies
def _clear_history():
    """Reset the conversation; used as the clear button's ``on_click`` callback.

    Streamlit runs callbacks before re-executing the script, so the history is
    already empty when it is rendered on that rerun.
    """
    st.session_state['history'] = []
    # Empty the question box as well; otherwise the question still sitting in
    # it would be answered again and re-appended on the rerun.
    st.session_state['user_question'] = ""


# File upload interface
uploaded_file = st.file_uploader("Carregue um arquivo CSV ou XLSX", type=['csv', 'xlsx'])
if uploaded_file:
    df = load_data(uploaded_file)
    df_clean = preprocess_data(df)
    if df_clean.empty:
        st.warning("Não há dados válidos para processar.")
    else:
        with st.spinner('Aplicando modelo de série temporal...'):
            all_anomalies = apply_prophet(df_clean)
            st.session_state['all_anomalies'] = all_anomalies

# User question interface. The widget is keyed so the clear callback can reset it.
user_question = st.text_input("Escreva sua questão aqui:", key="user_question")
if user_question:
    if 'all_anomalies' in st.session_state and not st.session_state['all_anomalies'].empty:
        bot_response = response(user_question, st.session_state['all_anomalies'])
        st.session_state['history'].append(('👤', user_question))
        st.session_state['history'].append(('🤖', bot_response))
    else:
        st.warning("Ainda não há dados de anomalias para responder a pergunta.")

# Show the conversation history
for sender, message in st.session_state['history']:
    if sender == '👤':
        st.markdown(f"**👤 {message}**")
    elif sender == '🤖':
        # NOTE(review): model output is rendered with raw HTML enabled; confirm
        # this is intended, since the user line above does not enable it.
        st.markdown(f"**🤖 {message}**", unsafe_allow_html=True)

# Button to clear the history. Clearing happens in the callback rather than
# after the click is detected here, because by this point the history has
# already been drawn for the current run.
st.button("Limpar histórico", on_click=_clear_history)