Spaces:
Sleeping
Sleeping
import json | |
import os | |
import streamlit as st | |
from streamlit import session_state as ss | |
from streamlit_local_storage import LocalStorage | |
from form.form import build_form_data_from_answers, write_pdf_form | |
from llm_manager.llm_parser import LlmParser | |
from local_storage.entities import PersonalDetails, LocationDetails, ContractorDetails | |
from prompts.prompts_manager import PromptsManager, Questions as Q | |
from repository.repository import get_repository | |
from repository import ModelRoles, Model | |
from utils.parsing_utils import check_for_missing_answers | |
ls: LocalStorage = LocalStorage() | |
def in_hf() -> bool: | |
return os.getenv("env") == "hf" | |
class UIManager: | |
def __init__(self): | |
self.pm: PromptsManager = PromptsManager() | |
self.repository = (build_repo_from_environment(self.pm.system_prompt) or | |
get_repository("testing", | |
Model("fakeModel", ModelRoles("a", "b", "c")))) | |
def get_current_step(): | |
return ss.get("step") | |
def _build_base_ui(): | |
st.markdown("## Dubai Asset Management red tape cutter") | |
def build_ui_for_initial_state(self, user_message): | |
help_ = user_message | |
self._build_base_ui() | |
with st.form("Please describe your request"): | |
user_input = st.text_area("Your input", height=700, label_visibility="hidden", placeholder=help_, | |
help=help_) | |
signature = st.file_uploader("Your signature", key="file_upload") | |
ss["signature"] = signature | |
submit_button = st.form_submit_button() | |
if submit_button: | |
ss["user_input"] = user_input | |
ss["step"] = "parsing_answers" | |
st.rerun() | |
def build_ui_for_parsing_answers(self): | |
self._build_base_ui() | |
with st.status("initialising LLM"): | |
self.repository.init() | |
with st.status("waiting for LLM"): | |
answer = self.repository.send_prompt(self.pm.verify_user_input_prompt(ss["user_input"])) | |
st.write(f"answers from LLM: {answer['content']}") | |
with st.status("Checking for missing answers"): | |
answers = LlmParser.parse_verification_prompt_answers(answer['content']) | |
ss["answers"] = answers | |
if len(answers) != len(Q): | |
ss["step"] = "parsing_error" | |
st.rerun() | |
ss["missing_answers"] = check_for_missing_answers(ss["answers"]) | |
if not ss.get("missing_answers"): | |
ss["step"] = "check_category" | |
else: | |
ss["step"] = "ask_again" | |
st.rerun() | |
def build_ui_for_ask_again(self): | |
self._build_base_ui() | |
with st.form("form1"): | |
for ma in ss["missing_answers"]: | |
st.text_input(self.pm.questions[ma].lower(), key=ma) | |
submitted = st.form_submit_button("Submit answers") | |
if submitted: | |
for ma in ss["missing_answers"]: | |
ss["answers"][ma] = ss[ma] | |
ss["step"] = "check_category" | |
st.rerun() | |
def build_ui_for_check_category(self): | |
self._build_base_ui() | |
with st.status("finding the work categories applicable to your work"): | |
answer = self.repository.send_prompt(self.pm.get_work_category(ss["answers"][1])) | |
categories = LlmParser.parse_get_categories_answer(answer['content']) | |
with st.status("categories found, creating PDF form"): | |
form_data, filename = build_form_data_from_answers(ss["answers"], categories, | |
ss.get("signature")) | |
pdf_form = write_pdf_form(form_data) | |
pdf_form_filename = filename | |
ss["pdf_form"] = pdf_form | |
ss["pdf_form_filename"] = pdf_form_filename | |
ss["step"] = "form_created" | |
st.rerun() | |
def build_ui_for_form_created(self): | |
self._build_base_ui() | |
st.download_button("download form", ss["pdf_form"], | |
file_name=ss["pdf_form_filename"], mime="application/pdf") | |
start_over_button = st.button("Start over") | |
if start_over_button: | |
del ss["step"] | |
del ss["pdf_form"] | |
del ss["pdf_form_filename"] | |
if "signature" in ss: | |
del ss["signature"] | |
st.rerun() | |
def build_ui_for_parsing_error(self): | |
def build_form_fragment(form_, col, title, *questions): | |
form_.text(title) | |
for user_data in questions: | |
with col: | |
form_.text_input(self.pm.questions_to_field_labels()[user_data], value=ss.get("answers", {}) | |
.get(user_data), key=f"fq_{user_data.value}") | |
with col: | |
form_.text_input("Save as", key=title.replace(" ", "_")) | |
self._build_base_ui() | |
f = st.form("Please check the following information and correct fix any inaccuracies") | |
col1, col2 = f.columns(2) | |
build_form_fragment(f, col1, "your details", Q.FULL_NAME, Q.CONTACT_NUMBER, Q.YOUR_EMAIL) | |
build_form_fragment(f, col2, "work details", Q.WORK_TO_DO, Q.START_DATE, Q.END_DATE) | |
build_form_fragment(f, col1, "location details", Q.COMMUNITY, Q.BUILDING, Q.UNIT_APT_NUMBER, | |
Q.OWNER_OR_TENANT) | |
build_form_fragment(f, col2, "contractor details", Q.COMPANY_NAME, Q.COMPANY_NUMBER, Q.COMPANY_EMAIL) | |
submit_data = f.form_submit_button() | |
if submit_data: | |
for i in range(len(Q)): | |
ss["answers"][Q(i)] = ss[f"fq_{i}"] | |
for details_key, func in [("your_details", self._get_personal_details), | |
("location_details", self._get_location_details), | |
("contractor_details", self._get_contractor_details)]: | |
details = func(details_key) | |
if details: | |
key = ss[details_key] # get the name under which this data should be saved | |
ls.setItem(key, json.dumps(details)) | |
def _get_personal_details(personal_details_key) -> PersonalDetails | None: | |
if ss.get(personal_details_key): | |
return PersonalDetails(ss[f"fq_{Q.FULL_NAME}"], ss[f"fq_{Q.FULL_NAME}"], ss[f"fq_{Q.CONTACT_NUMBER}"]) | |
return None | |
def _get_location_details(location_details_key) -> LocationDetails | None: | |
if ss.get(location_details_key): | |
return LocationDetails(ss[f"fq_{Q.OWNER_OR_TENANT}"], ss[f"fq_{Q.COMMUNITY}"], ss[f"fq_{Q.BUILDING}"], | |
ss[f"fq_{Q.UNIT_APT_NUMBER}"]) | |
return None | |
def _get_contractor_details(contractor_details_key) -> ContractorDetails | None: | |
if ss.get(contractor_details_key): | |
return ContractorDetails(ss[f"fq_{Q.COMPANY_NAME}"], ss[f"fq_{Q.COMPANY_NUMBER}"], | |
ss[f"fq_{Q.COMPANY_EMAIL}"]) | |
return None | |