# DAMHelper / ui_manager.py
# Last commit aa2cc5f by enricorampazzo:
#   "now saving personal, location and contractor details in the browser local storage"
# File size: 7.15 kB
import json
import os
import streamlit as st
from streamlit import session_state as ss
from streamlit_local_storage import LocalStorage
from form.form import build_form_data_from_answers, write_pdf_form
from llm_manager.llm_parser import LlmParser
from local_storage.entities import PersonalDetails, LocationDetails, ContractorDetails
from prompts.prompts_manager import PromptsManager, Questions as Q
from repository.repository import get_repository
from repository import ModelRoles, Model
from utils.parsing_utils import check_for_missing_answers
# Shared handle to the LocalStorage component from streamlit_local_storage;
# UIManager calls ls.setItem(...) on it to persist the details a user asks to save.
ls: LocalStorage = LocalStorage()
def in_hf() -> bool:
    """Report whether the ``env`` environment variable is set to ``"hf"``."""
    env_name = os.environ.get("env")
    return env_name == "hf"
class UIManager:
    """Streamlit front end that renders one screen per workflow step.

    The name of the current step is kept in ``st.session_state["step"]``; the
    ``build_ui_for_*`` methods move between steps by updating that key and
    calling ``st.rerun()``.
    """

    def __init__(self):
        """Create the prompts manager and the LLM repository.

        When ``build_repo_from_environment`` returns a falsy value, a "testing"
        repository backed by a fake model is used instead.
        """
        self.pm: PromptsManager = PromptsManager()
        # NOTE(review): build_repo_from_environment is neither imported nor defined
        # in the visible part of this file -- confirm it is in scope at runtime.
        self.repository = (build_repo_from_environment(self.pm.system_prompt) or
                           get_repository("testing",
                                          Model("fakeModel", ModelRoles("a", "b", "c"))))

    @staticmethod
    def get_current_step():
        """Return the step name stored in the session state, or ``None`` if unset."""
        return ss.get("step")

    @staticmethod
    def _build_base_ui():
        """Render the page heading shared by every step."""
        st.markdown("## Dubai Asset Management red tape cutter")

    @staticmethod
    def _field_value(question):
        """Return what the user typed in the correction-form field for *question*.

        The widget keys are built from the enum member's ``value`` (see
        ``build_ui_for_parsing_error``), so the same expression is used here;
        formatting the member itself only matches for int-like enums.
        """
        return ss[f"fq_{question.value}"]

    def build_ui_for_initial_state(self, user_message):
        """Show the free-text request form and the signature uploader.

        :param user_message: text used both as placeholder and as help tooltip
            of the input area.
        """
        self._build_base_ui()
        with st.form("Please describe your request"):
            user_input = st.text_area("Your input", height=700, label_visibility="hidden",
                                      placeholder=user_message, help=user_message)
            signature = st.file_uploader("Your signature", key="file_upload")
            ss["signature"] = signature
            submit_button = st.form_submit_button()
        if submit_button:
            ss["user_input"] = user_input
            ss["step"] = "parsing_answers"
            st.rerun()

    def build_ui_for_parsing_answers(self):
        """Send the user input to the LLM, parse the reply and pick the next step.

        Next step is ``parsing_error`` when the number of parsed answers differs
        from the number of questions, ``ask_again`` when some answers are
        missing, and ``check_category`` otherwise.
        """
        self._build_base_ui()
        with st.status("initialising LLM"):
            self.repository.init()
        with st.status("waiting for LLM"):
            answer = self.repository.send_prompt(self.pm.verify_user_input_prompt(ss["user_input"]))
            st.write(f"answers from LLM: {answer['content']}")
        with st.status("Checking for missing answers"):
            answers = LlmParser.parse_verification_prompt_answers(answer['content'])
            ss["answers"] = answers
            if len(answers) != len(Q):
                # The reply could not be mapped one-to-one onto the questions:
                # let the user review and correct every field by hand.
                ss["step"] = "parsing_error"
                st.rerun()
            ss["missing_answers"] = check_for_missing_answers(ss["answers"])
        ss["step"] = "ask_again" if ss.get("missing_answers") else "check_category"
        st.rerun()

    def build_ui_for_ask_again(self):
        """Ask only for the answers that were found to be missing."""
        self._build_base_ui()
        with st.form("form1"):
            for ma in ss["missing_answers"]:
                st.text_input(self.pm.questions[ma].lower(), key=ma)
            submitted = st.form_submit_button("Submit answers")
        if submitted:
            # Each input was keyed by its question id, so the typed value can be
            # read back from the session state under that same key.
            for ma in ss["missing_answers"]:
                ss["answers"][ma] = ss[ma]
            ss["step"] = "check_category"
            st.rerun()

    def build_ui_for_check_category(self):
        """Ask the LLM for the work categories, then build the PDF form."""
        self._build_base_ui()
        with st.status("finding the work categories applicable to your work"):
            # presumably answer 1 is the description of the work -- TODO confirm
            answer = self.repository.send_prompt(self.pm.get_work_category(ss["answers"][1]))
            categories = LlmParser.parse_get_categories_answer(answer['content'])
        with st.status("categories found, creating PDF form"):
            form_data, filename = build_form_data_from_answers(ss["answers"], categories,
                                                               ss.get("signature"))
            ss["pdf_form"] = write_pdf_form(form_data)
            ss["pdf_form_filename"] = filename
        ss["step"] = "form_created"
        st.rerun()

    def build_ui_for_form_created(self):
        """Offer the generated PDF for download and a button to start over."""
        self._build_base_ui()
        st.download_button("download form", ss["pdf_form"],
                           file_name=ss["pdf_form_filename"], mime="application/pdf")
        if st.button("Start over"):
            # Drop everything produced by this run; missing keys are ignored so
            # the reset cannot fail half-way through.
            for stale_key in ("step", "pdf_form", "pdf_form_filename", "signature"):
                if stale_key in ss:
                    del ss[stale_key]
            st.rerun()

    def build_ui_for_parsing_error(self):
        """Show every answer in an editable two-column form for manual correction.

        On submit the edited values replace the stored answers and each details
        group that was given a "Save as" name is written to the local storage
        under that name.
        """
        self._build_base_ui()
        # Looked up once instead of once per rendered field.
        field_labels = self.pm.questions_to_field_labels()
        current_answers = ss.get("answers", {})

        def build_form_fragment(col, title, *questions):
            # Widgets are created on the column object itself: a ``with col:``
            # block only redirects ``st.*`` calls, so calling methods on the form
            # object inside it would place the widgets outside the columns.
            col.text(title)
            for question in questions:
                col.text_input(field_labels[question], value=current_answers.get(question),
                               key=f"fq_{question.value}")
            col.text_input("Save as", key=title.replace(" ", "_"))

        f = st.form("Please check the following information and correct fix any inaccuracies")
        col1, col2 = f.columns(2)
        build_form_fragment(col1, "your details", Q.FULL_NAME, Q.CONTACT_NUMBER, Q.YOUR_EMAIL)
        build_form_fragment(col2, "work details", Q.WORK_TO_DO, Q.START_DATE, Q.END_DATE)
        build_form_fragment(col1, "location details", Q.COMMUNITY, Q.BUILDING, Q.UNIT_APT_NUMBER,
                            Q.OWNER_OR_TENANT)
        build_form_fragment(col2, "contractor details", Q.COMPANY_NAME, Q.COMPANY_NUMBER, Q.COMPANY_EMAIL)
        submit_data = f.form_submit_button()
        if submit_data:
            for question in Q:
                ss["answers"][question] = self._field_value(question)
            detail_builders = [("your_details", self._get_personal_details),
                               ("location_details", self._get_location_details),
                               ("contractor_details", self._get_contractor_details)]
            for details_key, func in detail_builders:
                details = func(details_key)
                if details:
                    key = ss[details_key]  # the name under which this data should be saved
                    # NOTE(review): json.dumps raises TypeError unless the details
                    # classes are JSON-serialisable -- confirm against
                    # local_storage.entities.
                    # NOTE(review): saving several groups in one run calls setItem
                    # more than once; check whether each call needs its own
                    # component key.
                    ls.setItem(key, json.dumps(details))
            # NOTE(review): ss["step"] is not changed here, so the user stays on
            # this screen after submitting -- confirm this is intended.

    @staticmethod
    def _get_personal_details(personal_details_key) -> PersonalDetails | None:
        """Build the personal details to save, or ``None`` when no name was given.

        :param personal_details_key: session-state key of the "Save as" field.
        """
        if not ss.get(personal_details_key):
            return None
        # The second argument used to repeat the full name while the e-mail field
        # was never read; it is presumably the e-mail -- TODO confirm against
        # the PersonalDetails constructor.
        return PersonalDetails(UIManager._field_value(Q.FULL_NAME),
                               UIManager._field_value(Q.YOUR_EMAIL),
                               UIManager._field_value(Q.CONTACT_NUMBER))

    @staticmethod
    def _get_location_details(location_details_key) -> LocationDetails | None:
        """Build the location details to save, or ``None`` when no name was given.

        :param location_details_key: session-state key of the "Save as" field.
        """
        if not ss.get(location_details_key):
            return None
        return LocationDetails(UIManager._field_value(Q.OWNER_OR_TENANT),
                               UIManager._field_value(Q.COMMUNITY),
                               UIManager._field_value(Q.BUILDING),
                               UIManager._field_value(Q.UNIT_APT_NUMBER))

    @staticmethod
    def _get_contractor_details(contractor_details_key) -> ContractorDetails | None:
        """Build the contractor details to save, or ``None`` when no name was given.

        :param contractor_details_key: session-state key of the "Save as" field.
        """
        if not ss.get(contractor_details_key):
            return None
        return ContractorDetails(UIManager._field_value(Q.COMPANY_NAME),
                                 UIManager._field_value(Q.COMPANY_NUMBER),
                                 UIManager._field_value(Q.COMPANY_EMAIL))