|
import base64
import html
import json
import os
import re
from datetime import datetime
from pathlib import Path

import requests
import streamlit as st
import streamlit_authenticator as stauth
import yaml
from PIL import Image
from streamlit_feedback import streamlit_feedback
from yaml.loader import SafeLoader

from drive_search import search_file_in_drive
|
|
|
class ChatbotApp:
    """Streamlit front end for the Sicoob chatbot.

    The app configures the page, applies a light/dark stylesheet, gates the
    chat behind ``streamlit_authenticator`` and streams answers from the
    backend at ``backend_url``. Feedback on the last answer is appended to
    JSON files under ``<cwd>/feedback``.
    """

    # Matches ``||source name||`` markers embedded in the assistant's answer.
    _SOURCE_MARKER = re.compile(r'\|\|(.*?)\|\|')

    def __init__(self):
        """Configure the Streamlit page and initialise instance settings."""
        st.set_page_config(page_title="Sicoob Chatbot 🤖", page_icon="logos/sicoob-ico.ico", layout="wide")

        self.backend_url = "http://localhost:5001/chat"
        self.title = "Sicoob Chatbot"
        self.feedback_dir = os.path.join(os.getcwd(), "./feedback")
        self.description = "Este assistente virtual pode te ajudar com informações sobre carômetros da Sicoob."

        self.style_dir = Path("./logos/styles")
        self.load_styles()
        st.session_state.first = False

        if "theme" not in st.session_state:
            st.session_state.theme = "light"

        self.configyml = os.path.join(os.getcwd(), "./files/config.yaml")

        # Source name -> looked-up link, so a marker is resolved only once
        # even though the answer is re-rendered on every streamed token.
        self._link_cache = {}

    def load_styles(self):
        """Read ``base.css``, ``light.css`` and ``dark.css`` from ``style_dir``.

        If any file is missing, an error is shown and all three styles fall
        back to empty strings.
        """
        try:
            self.base_style = (self.style_dir / "base.css").read_text(encoding="utf-8")
            self.light_style = (self.style_dir / "light.css").read_text(encoding="utf-8")
            self.dark_style = (self.style_dir / "dark.css").read_text(encoding="utf-8")
        except FileNotFoundError as e:
            st.error(f"Error loading styles: {e}")
            self.base_style = ""
            self.light_style = ""
            self.dark_style = ""

    @st.dialog("📄 Atualizações Sicoob Chatbot", width="small")
    def changelog_user(self):
        """Show the user-facing changelog markdown file in a dialog."""
        with open("./logos/ChangelogUser.md", encoding="utf-8") as f:
            st.markdown(f"{f.read()}", unsafe_allow_html=True)

    def get_current_style(self):
        """Return a ``<style>`` tag with the base CSS plus the active theme CSS."""
        theme_style = self.dark_style if st.session_state.theme == "dark" else self.light_style
        return f"<style>{self.base_style}{theme_style}</style>"

    @staticmethod
    def save_feedback(feedback_type, user_input, assistant_response):
        """Append one feedback record to ``<cwd>/feedback/feedback_<type>.json``.

        Args:
            feedback_type: Label used in the file name (e.g. ``"positive"``).
            user_input: The question the user asked.
            assistant_response: The answer being rated.

        Any failure is reported through a toast instead of being raised.
        """
        feedback_dir = os.path.join(os.getcwd(), "feedback")
        filename = os.path.join(feedback_dir, f"feedback_{feedback_type}.json")

        try:
            os.makedirs(feedback_dir, exist_ok=True)

            data = []
            if os.path.exists(filename):
                with open(filename, 'r', encoding='utf-8') as f:
                    data = json.load(f)

            data.append({
                "user_input": user_input,
                "assistant_response": assistant_response,
                "timestamp": datetime.now().isoformat(),
            })

            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)

            st.toast(f"Feedback {feedback_type} salvo com sucesso!", icon="✅")
        except Exception as e:
            # Best effort at the UI boundary: never break the page over feedback.
            st.toast(f"Erro ao salvar feedback: {str(e)}", icon="❌")

    def stream_chat(self, user_input):
        """Send ``user_input`` to the backend and yield the answer as text chunks.

        On a request failure a single error message is yielded instead of
        raising, so the caller can display it like a normal answer.
        """
        try:
            with requests.post(
                self.backend_url,
                json={"message": user_input},
                stream=True,
            ) as response:
                response.raise_for_status()

                # Decode incrementally: a multi-byte UTF-8 character may be
                # split across two 512-byte chunks, which would break a
                # per-chunk ``bytes.decode``.
                response.encoding = "utf-8"
                for chunk in response.iter_content(chunk_size=512, decode_unicode=True):
                    if chunk:
                        yield chunk
        except requests.RequestException as e:
            yield f"Erro ao conectar ao servidor: {e}"

    def clear_chat_history(self):
        """Empty the chat history and forget the exchange awaiting feedback."""
        st.session_state.chat_history = []
        st.session_state.pop("last_exchange", None)
        st.toast("Histórico limpo com sucesso!", icon="✅")

    def render_sidebar(self):
        """Draw the sidebar: theme toggle, logo, clear-history and changelog buttons."""
        with st.sidebar:
            if st.button("Light/Dark Mode"):
                st.session_state.theme = "dark" if st.session_state.theme == "light" else "light"
                st.rerun()

            if st.session_state.theme == "light":
                st.logo("./logos/sicoob-logo-horizontal-light.png", icon_image="./logos/sicoob-logo-vertical-sm.png")
            else:
                st.logo("./logos/sicoob-logo-horizontal-dark.png", icon_image="./logos/sicoob-logo-vertical-sm.png")

            if st.button("Limpar Histórico", icon=":material/delete:"):
                self.clear_chat_history()

            if st.button("Atualizações", icon=":material/info:"):
                self.changelog_user()

            st.divider()

    def _add_link_to_text(self, text):
        """Replace each ``||name||`` marker in ``text`` with an HTML source link."""

        def to_link(match):
            name = match.group(1)
            if name not in self._link_cache:
                self._link_cache[name] = search_file_in_drive(name)
            # Both values end up inside raw HTML, so escape them.
            href = html.escape(str(self._link_cache[name]), quote=True)
            label = html.escape(name)
            return f' <br>Fonte: <a href="{href}" target="_blank">{label}</a>'

        return self._SOURCE_MARKER.sub(to_link, text)

    def _render_chat(self, authenticator):
        """Draw the authenticated view: sidebar, history, chat input and feedback."""
        self.render_sidebar()

        st.title(self.title)
        st.write(self.description)

        with st.sidebar:
            authenticator.logout('Sair', 'main')

        if "chat_history" not in st.session_state:
            st.session_state.chat_history = []

        # History entries are stored as "<role>: <text>" strings.
        for message in st.session_state.chat_history:
            role, text = message.split(":", 1)
            with st.chat_message(role.strip().lower()):
                st.markdown(text.strip(), unsafe_allow_html=True)

        user_input = st.chat_input("Digite sua pergunta")
        if user_input:
            with st.chat_message("user"):
                st.write(user_input)
            st.session_state.chat_history.append(f"user: {user_input}")

            with st.chat_message("assistant"):
                message_placeholder = st.empty()
                assistant_message = ""
                # Pre-initialised so an empty stream does not leave it unbound.
                assistant_message_with_link = ""

                for token in self.stream_chat(user_input):
                    assistant_message += token
                    assistant_message_with_link = self._add_link_to_text(assistant_message)
                    message_placeholder.markdown(assistant_message_with_link + "▌", unsafe_allow_html=True)

                message_placeholder.markdown(assistant_message_with_link, unsafe_allow_html=True)

            st.session_state.chat_history.append(f"assistant: {assistant_message_with_link}")
            # Remember the exchange so feedback can be given on a later rerun.
            st.session_state.last_exchange = (user_input, assistant_message)

        # The buttons must live outside ``if user_input``: clicking one reruns
        # the script with no chat input, so they would otherwise never be
        # evaluated on the run that carries the click.
        last_exchange = st.session_state.get("last_exchange")
        if last_exchange:
            question, answer = last_exchange
            with st.container(border=True):
                if st.button("Gostei", icon=":material/thumb_up:"):
                    self.save_feedback("positive", question, answer)
                    st.session_state.pop("last_exchange", None)

                if st.button("Não Gostei", icon=":material/thumb_down:"):
                    self.save_feedback("negative", question, answer)
                    st.session_state.pop("last_exchange", None)

    def render(self):
        """Render the chatbot interface, showing the chat only when logged in."""
        st.markdown(self.get_current_style(), unsafe_allow_html=True)

        with open(self.configyml, encoding="utf-8") as file:
            config = yaml.load(file, Loader=SafeLoader)

        authenticator = stauth.Authenticate(
            config['credentials'],
            config['cookie']['name'],
            config['cookie']['key'],
            config['cookie']['expiry_days'],
        )

        # NOTE(review): the config is loaded from ``self.configyml``
        # (./files/config.yaml) but written back to ./config.yaml in the
        # working directory. Confirm whether this write should target the
        # file that was loaded.
        with open('./config.yaml', 'w', encoding='utf-8') as file:
            yaml.dump(config, file, default_flow_style=False)

        authenticator.login()

        auth_status = st.session_state.get("authentication_status")
        if auth_status:
            self._render_chat(authenticator)
        elif auth_status is False:
            st.error('Username/password is incorrect')
        elif auth_status is None:
            st.warning('Por favor entre com seu "username" e senha.')
|
|
|
if __name__ == "__main__":
    # Script entry point: build the app and draw the page.
    ChatbotApp().render()
|
|