Spaces:
Sleeping
Sleeping
File size: 8,075 Bytes
54af9e3 8d06b39 224e4de 54af9e3 224e4de aa2cc5f 54af9e3 aa2cc5f 54af9e3 c3ddf65 887083d 54af9e3 aa2cc5f 54af9e3 aa2cc5f 54af9e3 aa2cc5f 54af9e3 aa2cc5f 54af9e3 aa2cc5f 54af9e3 aa2cc5f 54af9e3 aa2cc5f 54af9e3 aa2cc5f 54af9e3 aa2cc5f 54af9e3 bb7941a 7b864ba 224e4de 54af9e3 224e4de |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
import json
import streamlit as st
from streamlit import session_state as ss
from streamlit_local_storage import LocalStorage
from form.form import build_form_data_from_answers, write_pdf_form, work_categories
from llm_manager.llm_parser import LlmParser
from local_storage.entities import PersonalDetails, LocationDetails, ContractorDetails
from prompts.prompts_manager import PromptsManager, Questions as Q
from repository.repository import build_repo_from_environment, get_repository
from repository import ModelRoles, Model
from utils.parsing_utils import check_for_missing_answers
# Text passed to the initial-state screen, where it is used as both the placeholder
# and the help tooltip of the free-text input.
user_msg = "Please describe what you need to do. To get the best results try to answer all the following questions:"
# Module-level LocalStorage handle; used by the review form to store named detail sets via setItem.
ls: LocalStorage = LocalStorage()
class UIManager:
    """Step-driven Streamlit UI for the request-form workflow.

    The current step lives in ``st.session_state["step"]``. Each
    ``build_ui_for_*`` method renders one step and, once that step is done,
    stores the name of the next step and calls ``st.rerun()``.
    """

    def __init__(self):
        """Create the prompts manager and the LLM repository.

        Falls back to the ``"testing"`` repository with a fake model when
        ``build_repo_from_environment`` returns a falsy value.
        """
        self.pm: PromptsManager = PromptsManager(work_categories=work_categories)
        self.repository = (build_repo_from_environment(self.pm.system_prompt) or
                           get_repository("testing",
                                          Model("fakeModel", ModelRoles("a", "b", "c"))))

    @staticmethod
    def get_current_step():
        """Return the step name stored in session state, or ``None`` if unset."""
        return ss.get("step")

    @staticmethod
    def _build_base_ui():
        """Render the page heading shared by every step."""
        st.markdown("## Dubai Asset Management red tape cutter")

    def build_ui_for_initial_state(self, user_message):
        """Render the free-text request form and the signature uploader.

        Args:
            user_message: text shown as placeholder and help of the text area.

        On submit, stores the text under ``ss["user_input"]`` and moves to the
        ``"parsing_answers"`` step.
        """
        help_ = user_message
        self._build_base_ui()
        with st.form("Please describe your request"):
            user_input = st.text_area("Your input", height=700, label_visibility="hidden", placeholder=help_,
                                      help=help_)
            signature = st.file_uploader("Your signature", key="file_upload")
            ss["signature"] = signature
            submit_button = st.form_submit_button()
        if submit_button:
            ss["user_input"] = user_input
            ss["step"] = "parsing_answers"
            st.rerun()

    def build_ui_for_parsing_answers(self):
        """Send the user input to the LLM, parse the answers and pick the next step.

        Next step is ``"parsing_error"`` when the number of parsed answers
        differs from the number of questions, ``"ask_again"`` when some answers
        are missing, and ``"check_category"`` otherwise.
        """
        self._build_base_ui()
        with st.status("initialising LLM"):
            self.repository.init()
        with st.status("waiting for LLM"):
            answer = self.repository.send_prompt(self.pm.verify_user_input_prompt(ss["user_input"]))
            st.write(f"answers from LLM: {answer['content']}")
        with st.status("Checking for missing answers"):
            answers = LlmParser.parse_verification_prompt_answers(answer['content'])
            ss["answers"] = answers
            if len(answers) != len(Q):
                # The LLM output could not be mapped one-to-one onto the questions.
                next_step = "parsing_error"
            else:
                ss["missing_answers"] = check_for_missing_answers(ss["answers"])
                next_step = "ask_again" if ss.get("missing_answers") else "check_category"
        ss["step"] = next_step
        st.rerun()

    def build_ui_for_ask_again(self):
        """Ask the user for every answer listed in ``ss["missing_answers"]``.

        On submit, copies the entered values into ``ss["answers"]`` and moves to
        the ``"check_category"`` step.
        """
        self._build_base_ui()
        with st.form("form1"):
            for ma in ss["missing_answers"]:
                # The widget key is the question id, so the value can be read back from ss[ma].
                st.text_input(self.pm.questions[ma].lower(), key=ma)
            submitted = st.form_submit_button("Submit answers")
        if submitted:
            for ma in ss["missing_answers"]:
                ss["answers"][ma] = ss[ma]
            ss["step"] = "check_category"
            st.rerun()

    def build_ui_for_check_category(self):
        """Ask the LLM for the work categories, build the PDF form and store it.

        Stores the PDF under ``ss["pdf_form"]`` and its file name under
        ``ss["pdf_form_filename"]``, then moves to the ``"form_created"`` step.
        """
        self._build_base_ui()
        with st.status("finding the work categories applicable to your work"):
            answer = self.repository.send_prompt(self.pm.get_work_category(ss["answers"][Q.WORK_TO_DO]))
            categories = LlmParser.parse_get_categories_answer(answer['content'])
        with st.status("categories found, creating PDF form"):
            form_data, filename = build_form_data_from_answers(ss["answers"], categories,
                                                               ss.get("signature"))
            pdf_form = write_pdf_form(form_data)
        ss["pdf_form"] = pdf_form
        ss["pdf_form_filename"] = filename
        ss["step"] = "form_created"
        st.rerun()

    def build_ui_for_form_created(self):
        """Offer the generated PDF for download and a button to start over.

        "Start over" removes the step, the PDF, its file name and (if present)
        the signature from session state, which brings back the initial screen.
        """
        self._build_base_ui()
        st.download_button("download form", ss["pdf_form"],
                           file_name=ss["pdf_form_filename"], mime="application/pdf")
        if st.button("Start over"):
            del ss["step"]
            del ss["pdf_form"]
            del ss["pdf_form_filename"]
            if "signature" in ss:
                del ss["signature"]
            st.rerun()

    def build_ui_for_parsing_error(self):
        """Show every parsed answer in an editable two-column form for manual correction.

        On submit, copies all ``fq_*`` field values back into ``ss["answers"]``.
        For each detail group whose "Save as" field is non-empty, the group is
        serialised to JSON and stored through ``ls.setItem`` under that name.
        Then moves to the ``"check_category"`` step.
        """
        # Looked up once instead of once per rendered field.
        labels = self.pm.questions_to_field_labels()
        current_answers = ss.get("answers", {})

        def build_form_fragment(col, title, add_save, *questions):
            """Render one titled group of question fields inside ``col``."""
            # Widgets are created on the column object itself: a method called on an
            # explicit container targets that container and ignores any enclosing
            # ``with`` block, so going through the form object would bypass the column.
            col.text(title)
            for question in questions:
                col.text_input(labels[question], value=current_answers.get(question),
                               key=f"fq_{question.name}")
            if add_save:
                # Key becomes e.g. "your_details"; it is read back on submit.
                col.text_input("Save as", key=title.replace(" ", "_"))

        self._build_base_ui()
        f = st.form("Please check the following information and correct fix any inaccuracies")
        col1, col2 = f.columns(2)
        build_form_fragment(col1, "your details", True, Q.FULL_NAME, Q.CONTACT_NUMBER, Q.YOUR_EMAIL)
        build_form_fragment(col2, "work details", False, Q.WORK_TO_DO, Q.START_DATE, Q.END_DATE)
        build_form_fragment(col1, "location details", True, Q.COMMUNITY, Q.BUILDING, Q.UNIT_APT_NUMBER,
                            Q.OWNER_OR_TENANT)
        build_form_fragment(col2, "contractor details", True, Q.COMPANY_NAME, Q.COMPANY_NUMBER, Q.COMPANY_EMAIL)
        submit_data = f.form_submit_button()
        if submit_data:
            # Iterate the enum directly rather than assuming its values are 0..len(Q)-1.
            for question in Q:
                ss["answers"][question] = ss[f"fq_{question.name}"]
            for details_key, func in [("your_details", self._get_personal_details),
                                      ("location_details", self._get_location_details),
                                      ("contractor_details", self._get_contractor_details)]:
                details = func(details_key)
                if details:
                    key = ss[details_key]  # get the name under which this data should be saved
                    ls.setItem(key, json.dumps(details.__dict__), key)
            ss["step"] = "check_category"
            st.rerun()

    @staticmethod
    def _get_personal_details(personal_details_key) -> PersonalDetails | None:
        """Build ``PersonalDetails`` from the form fields, or ``None`` if no save name was entered."""
        if ss.get(personal_details_key):
            return PersonalDetails(ss[f"fq_{Q.FULL_NAME.name}"], ss[f"fq_{Q.YOUR_EMAIL.name}"],
                                   ss[f"fq_{Q.CONTACT_NUMBER.name}"])
        return None

    @staticmethod
    def _get_location_details(location_details_key) -> LocationDetails | None:
        """Build ``LocationDetails`` from the form fields, or ``None`` if no save name was entered."""
        if ss.get(location_details_key):
            return LocationDetails(ss[f"fq_{Q.OWNER_OR_TENANT.name}"], ss[f"fq_{Q.COMMUNITY.name}"],
                                   ss[f"fq_{Q.BUILDING.name}"], ss[f"fq_{Q.UNIT_APT_NUMBER.name}"])
        return None

    @staticmethod
    def _get_contractor_details(contractor_details_key) -> ContractorDetails | None:
        """Build ``ContractorDetails`` from the form fields, or ``None`` if no save name was entered."""
        if ss.get(contractor_details_key):
            return ContractorDetails(ss[f"fq_{Q.COMPANY_NAME.name}"], ss[f"fq_{Q.COMPANY_NUMBER.name}"],
                                     ss[f"fq_{Q.COMPANY_EMAIL.name}"])
        return None
def use_streamlit():
    """Build a UIManager and render whichever step is currently active.

    The initial screen is shown when no step is set. The remaining steps are
    checked in a fixed order, and the current step is re-read before each
    check.
    """
    manager = UIManager()
    if not manager.get_current_step():
        manager.build_ui_for_initial_state(user_msg)

    # Ordered (step name, renderer) pairs; order matches the workflow checks.
    step_renderers = (
        ("parsing_answers", manager.build_ui_for_parsing_answers),
        ("parsing_error", manager.build_ui_for_parsing_error),
        ("ask_again", manager.build_ui_for_ask_again),
        ("check_category", manager.build_ui_for_check_category),
        ("form_created", manager.build_ui_for_form_created),
    )
    for step_name, render in step_renderers:
        if manager.get_current_step() == step_name:
            render()
# Module-level entry point: the UI is rendered each time this script is executed.
use_streamlit()
|