File size: 13,707 Bytes
cb7ff7d
6508ff2
8d06b39
224e4de
ebaa573
54af9e3
ebaa573
6508ff2
224e4de
aa2cc5f
54af9e3
ebaa573
d680b24
88f9105
ebaa573
54af9e3
aa2cc5f
54af9e3
c3ddf65
887083d
cb7ff7d
88f9105
cb7ff7d
ebaa573
6508ff2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88f9105
54af9e3
 
aa2cc5f
54af9e3
 
d680b24
 
ebaa573
d680b24
ebaa573
54af9e3
6508ff2
 
 
 
 
 
 
 
 
54af9e3
ebaa573
54af9e3
ebaa573
 
 
 
 
 
 
 
54af9e3
 
6508ff2
54af9e3
 
ebaa573
 
 
 
 
 
 
 
54af9e3
 
 
cb7ff7d
 
d680b24
54af9e3
 
 
cb7ff7d
54af9e3
 
88f9105
54af9e3
 
6508ff2
54af9e3
 
 
6508ff2
54af9e3
6508ff2
54af9e3
 
 
 
 
 
6508ff2
54af9e3
 
 
 
6508ff2
54af9e3
 
 
 
 
aa2cc5f
54af9e3
cb7ff7d
6508ff2
54af9e3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cb7ff7d
 
 
 
 
 
 
 
 
 
d680b24
6508ff2
cb7ff7d
54af9e3
cb7ff7d
 
 
 
 
 
 
 
6508ff2
aa2cc5f
54af9e3
cb7ff7d
6508ff2
 
cb7ff7d
 
6508ff2
 
cb7ff7d
54af9e3
 
cb7ff7d
 
ebaa573
 
cb7ff7d
54af9e3
 
 
 
 
 
 
 
 
 
 
 
aa2cc5f
 
54af9e3
bb7941a
ebaa573
cb7ff7d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ebaa573
cb7ff7d
 
 
 
 
 
6508ff2
cb7ff7d
 
 
ebaa573
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
54af9e3
6508ff2
 
54af9e3
ebaa573
 
54af9e3
6508ff2
54af9e3
6508ff2
cb7ff7d
6508ff2
54af9e3
6508ff2
54af9e3
6508ff2
cb7ff7d
6508ff2
54af9e3
224e4de
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
import re
from enum import Enum

import streamlit as st
import streamlit_tags as st_tags
from streamlit import session_state as ss
from streamlit.delta_generator import DeltaGenerator
from streamlit.runtime.scriptrunner_utils.exceptions import StopException

from form.form import build_form_data_from_answers, write_pdf_form, work_categories
from llm_manager.llm_parser import LlmParser
from local_storage.entities import PersonalDetails, LocationDetails, ContractorDetails, SavedDetails
from local_storage.ls_manager import LocalStorageManager
from prompts.prompts_manager import PromptsManager
from enums import Questions as Q, DetailsType
from repository.repository import build_repo_from_environment, get_repository
from repository import ModelRoles, Model
from utils.parsing_utils import check_for_missing_answers

# Introductory sentence passed to UIManager.build_ui_for_initial_state, which puts it
# in front of the question list in the text area's placeholder and help text.
user_msg = "Please describe what you need to do. To get the best results try to answer all the following questions:"

# Matches "@tag" references in the user's text and captures the tag name.
# At least one non-whitespace character is required after "@", so a lone "@"
# no longer produces an empty tag.
find_tags_regex = re.compile(r"@(\S+)")


class Steps(Enum):
    """Stages of the request flow; the numeric value is what gets stored in the session.

    Equality and hashing are both defined on the member value: two objects are
    equal when the other one is an instance of this same class and carries the
    same value.
    """

    INITIAL_STATE = 1
    PARSING_ANSWERS = 2
    ASK_AGAIN = 3
    FIND_CATEGORIES = 4
    VALIDATE_DATA = 5
    PARSING_ERROR = 6
    FORM_CREATED = 7

    def __eq__(self, other):
        """Return True only for an instance of this class holding an equal value."""
        same_kind = isinstance(other, self.__class__)
        return same_kind and self.value == other.value

    def __hash__(self):
        """Hash on the value so hashing stays consistent with ``__eq__``."""
        return hash(self.value)


class UIManager:
    def __init__(self):
        """Create the prompts manager, choose the LLM repository and reset the UI state."""
        self.pm: PromptsManager = PromptsManager(work_categories=work_categories)
        chosen_repository = build_repo_from_environment(self.pm.system_prompt)
        if not chosen_repository:
            # Nothing usable came back from the environment: use the "testing" repository.
            placeholder_model = Model("fakeModel", ModelRoles("a", "b", "c"))
            chosen_repository = get_repository("testing", placeholder_model)
        self.repository = chosen_repository
        # Guard flag read and written by _update_user_prompt.
        self.update_in_progress = False
        # Local storage manager, created on first use by _build_base_ui.
        self.lsm = None

    @staticmethod
    def get_current_step() -> int:
        """Return the step value stored in the session state.

        Falls back to the value of ``Steps.INITIAL_STATE`` when nothing (or a falsy
        value) is stored, or when reading the session raises ``StopException``.
        """
        try:
            stored_step = ss.get("step")
        except StopException:
            return Steps.INITIAL_STATE.value
        if stored_step:
            return stored_step
        return Steps.INITIAL_STATE.value

    @staticmethod
    def set_current_step(step: Steps):
        """Store the value of ``step`` in the session state under the ``"step"`` key."""
        step_value = step.value
        ss["step"] = step_value

    def _build_base_ui(self):
        """Render the page title and the sidebar with one details section per details type."""
        st.markdown("## Dubai Asset Management red tape cutter")
        if not self.lsm:
            # Create the local storage manager on first use.
            self.lsm = LocalStorageManager()
        sidebar_sections = (
            ("### Personal details", DetailsType.PERSONAL_DETAILS),
            ("### Locations details", DetailsType.LOCATION_DETAILS),
            ("### Contractors details", DetailsType.CONTRACTOR_DETAILS),
        )
        with st.sidebar:
            for heading, details_type in sidebar_sections:
                st.markdown(heading)
                self.build_details_checkboxes(details_type)

    def build_ui_for_initial_state(self, user_message):
        """Show the initial request form: free-text area, signature upload and submit button.

        :param user_message: text placed directly in front of the newline-joined
            questions to form the text area's placeholder and help text.
        """
        questions_text = "\n".join(self.pm.questions)
        help_ = user_message + questions_text
        self._build_base_ui()
        with st.form("Please describe your request"):
            st.text_area("Your input", height=700, label_visibility="hidden", placeholder=help_,
                         help=help_, key="user_input")
            # Keep the uploaded signature in the session for the PDF creation step.
            ss["signature"] = st.file_uploader("Your signature", key="file_upload")
            if st.form_submit_button():
                self.set_current_step(Steps.PARSING_ANSWERS)
                st.rerun()

    def build_ui_for_parsing_answers(self):
        """Send the user's text to the LLM, parse its answers and choose the next step.

        The next step is ``PARSING_ERROR`` when the number of parsed answers differs
        from the number of questions, ``ASK_AGAIN`` when some answers are missing,
        and ``FIND_CATEGORIES`` otherwise.
        """
        self._build_base_ui()
        with st.status("parsing user input for tags"):
            user_text = ss["user_input"]
            details = []
            for tag in find_tags_regex.findall(user_text):
                details.append(self.lsm.get_detail(tag))
        with st.status("initialising LLM"):
            self.repository.init()
        with st.status("waiting for LLM"):
            verification_prompt = self.pm.verify_user_input_prompt(ss["user_input"], details)
            answer = self.repository.send_prompt(verification_prompt)
            st.write(f"answers from LLM: {answer['content']}")
        with st.status("Checking for missing answers"):
            parsed_answers = LlmParser.parse_verification_prompt_answers(answer['content'], details)
            ss["answers"] = parsed_answers
            if len(parsed_answers) != len(Q):
                # The LLM reply did not yield one answer per question.
                self.set_current_step(Steps.PARSING_ERROR)
                st.rerun()
        ss["missing_answers"] = check_for_missing_answers(ss["answers"])
        next_step = Steps.ASK_AGAIN if ss.get("missing_answers") else Steps.FIND_CATEGORIES
        self.set_current_step(next_step)
        st.rerun()

    def build_ui_for_ask_again(self):
        """Show one text input per missing answer and, on submit, move on to category detection."""
        self._build_base_ui()
        with st.form("form1"):
            for question in ss["missing_answers"]:
                label = self.pm.questions[question.value].lower()
                st.text_input(label, key=question)
            if st.form_submit_button("Submit answers"):
                # Each input stored its value under the question used as widget key.
                for question in ss["missing_answers"]:
                    ss["answers"][question] = ss[question]
                self.set_current_step(Steps.FIND_CATEGORIES)
                st.rerun()

    def build_ui_for_check_category(self):
        """Ask the LLM for the work categories of the described work and store them in the session."""
        self._build_base_ui()
        with st.status("finding the work categories applicable to your work"):
            work_description = ss["answers"][Q.WORK_TO_DO]
            category_prompt = self.pm.get_work_category(work_description)
            llm_reply = self.repository.send_prompt(category_prompt)
            ss["categories"] = LlmParser.parse_get_categories_answer(llm_reply['content'])
            self.set_current_step(Steps.VALIDATE_DATA)
            st.rerun()

    def build_ui_for_form_created(self):
        """Offer the generated PDF for download and a button that resets the flow."""
        self._build_base_ui()
        st.download_button("download form", ss["pdf_form"],
                           file_name=ss["pdf_form_filename"], mime="application/pdf")
        if not st.button("Start over"):
            return
        # Drop everything produced by the finished flow before rerunning.
        for state_key in ("step", "pdf_form", "pdf_form_filename"):
            del ss[state_key]
        if "signature" in ss:
            del ss["signature"]
        st.rerun()

    def _integrate_llm_answers_with_user_corrections(self):
        """Copy the validation-form values into the answers and save the details the user named.

        Every question's answer is overwritten with the matching ``fq_<QUESTION>``
        session value. For each details group for which a details object is built,
        it is saved under the name the user typed in that group's "Save as" field.
        Finally the flow moves on to ``FIND_CATEGORIES`` and the script is rerun.
        """
        # Iterate the enum members directly instead of rebuilding them from
        # range(len(Q)), which only works while the values are exactly 0..n-1.
        for question in Q:
            ss["answers"][question] = ss[f"fq_{question.name}"]

        details_builders = [("your_details", self._get_personal_details),
                            ("location_details", self._get_location_details),
                            ("contractor_details", self._get_contractor_details)]
        for details_key, build_details in details_builders:
            details = build_details(details_key)
            if details:
                save_as = ss[details_key]  # the name under which this data should be saved
                self.lsm.save_details(details, save_as)
        self.set_current_step(Steps.FIND_CATEGORIES)
        st.rerun()

    def _create_pdf_form(self):
        """Build the PDF from the stored answers, categories and signature, then advance.

        The PDF and its filename are kept in the session state under ``"pdf_form"``
        and ``"pdf_form_filename"``.
        """
        with st.status("categories found, creating PDF form"):
            form_data, pdf_form_filename = build_form_data_from_answers(
                ss["answers"], ss["categories"], ss.get("signature"))
            ss["pdf_form"] = write_pdf_form(form_data)
            ss["pdf_form_filename"] = pdf_form_filename
            self.set_current_step(Steps.FORM_CREATED)
            st.rerun()

    def build_ui_for_validate_data_after_correction(self):
        """Show the validation form without categories; submitting merges the user's corrections."""
        self._build_validation_form(
            show_categories=False,
            onsubmit=self._integrate_llm_answers_with_user_corrections,
            submit_button_label="Find work categories",
        )

    def build_ui_to_confirm_form_data(self):
        """Show the validation form with the category checkboxes; submitting creates the PDF."""
        self._build_validation_form(
            show_categories=True,
            onsubmit=self._create_pdf_form,
            submit_button_label="Create work permit request",
        )

    @staticmethod
    def _get_personal_details(personal_details_key) -> PersonalDetails | None:
        """Build ``PersonalDetails`` from the validation-form fields.

        :param personal_details_key: session key checked before building anything.
        :return: the details, or ``None`` when the session holds no truthy value
            under ``personal_details_key``.
        """
        if not ss.get(personal_details_key):
            return None
        full_name = ss[f"fq_{Q.FULL_NAME.name}"]
        email = ss[f"fq_{Q.YOUR_EMAIL.name}"]
        contact_number = ss[f"fq_{Q.CONTACT_NUMBER.name}"]
        return PersonalDetails(full_name, email, contact_number)

    @staticmethod
    def _get_location_details(location_details_key) -> LocationDetails | None:
        """Build ``LocationDetails`` from the validation-form fields.

        :param location_details_key: session key checked before building anything.
        :return: the details, or ``None`` when the session holds no truthy value
            under ``location_details_key``.
        """
        if not ss.get(location_details_key):
            return None
        owner_or_tenant = ss[f"fq_{Q.OWNER_OR_TENANT.name}"]
        community = ss[f"fq_{Q.COMMUNITY.name}"]
        building = ss[f"fq_{Q.BUILDING.name}"]
        unit_number = ss[f"fq_{Q.UNIT_APT_NUMBER.name}"]
        return LocationDetails(owner_or_tenant, community, building, unit_number)

    @staticmethod
    def _get_contractor_details(contractor_details_key) -> ContractorDetails | None:
        """Build ``ContractorDetails`` from the validation-form fields.

        :param contractor_details_key: session key checked before building anything.
        :return: the details, or ``None`` when the session holds no truthy value
            under ``contractor_details_key``.
        """
        if not ss.get(contractor_details_key):
            return None
        company_name = ss[f"fq_{Q.COMPANY_NAME.name}"]
        company_number = ss[f"fq_{Q.COMPANY_NUMBER.name}"]
        company_email = ss[f"fq_{Q.COMPANY_EMAIL.name}"]
        return ContractorDetails(company_name, company_number, company_email)

    def _build_validation_form(self, show_categories: bool, onsubmit, submit_button_label):
        """Render the two-column form used to review and correct the collected answers.

        :param show_categories: when true, one checkbox per work category is added
            below the columns, pre-ticked for the categories stored in the session.
        :param onsubmit: zero-argument callable invoked when the form is submitted.
        :param submit_button_label: label of the form's submit button.
        """
        self._build_base_ui()
        # Look the labels and answers up once instead of once per rendered field.
        field_labels = self.pm.questions_to_field_labels()
        current_answers = ss.get("answers", {})

        def build_form_fragment(col, title, add_save, *questions):
            # Create the widgets on the column itself. Calling the form's own
            # methods inside ``with col:`` targets the form, not the column,
            # which left both columns empty and stacked every field vertically.
            col.text(title)
            for user_data in questions:
                col.text_input(field_labels[user_data], value=current_answers.get(user_data),
                               key=f"fq_{user_data.name}")
            if add_save:
                # The typed name is stored under e.g. "your_details".
                col.text_input("Save as", key=title.replace(" ", "_"))

        f = st.form("Please check the following information and correct fix any inaccuracies")
        col1, col2 = f.columns(2)
        build_form_fragment(col1, "your details", True, Q.FULL_NAME, Q.CONTACT_NUMBER, Q.YOUR_EMAIL)
        build_form_fragment(col2, "work details", False, Q.WORK_TO_DO, Q.START_DATE, Q.END_DATE)
        build_form_fragment(col1, "location details", True, Q.COMMUNITY, Q.BUILDING, Q.UNIT_APT_NUMBER,
                            Q.OWNER_OR_TENANT)
        build_form_fragment(col2, "contractor details", True, Q.COMPANY_NAME, Q.COMPANY_NUMBER, Q.COMPANY_EMAIL)
        if show_categories:
            for k, wc in work_categories.items():
                f.checkbox(label=wc, key=k, value=k in ss["categories"])

        if f.form_submit_button(label=submit_button_label):
            onsubmit()

    def build_details_checkboxes(self, dt: DetailsType):
        """Render a bordered box listing the saved details of type ``dt``.

        Each saved detail gets a checkbox (wired to ``_update_user_prompt``) in the
        first column and a toggle in the second; a button below opens the dialog
        for adding a new detail of this type.

        :param dt: the details type whose saved entries are listed.
        """
        saved_details = self.lsm.get_details(dt)
        with st.container(border=True):
            name_col, default_col = st.columns(2)
            with name_col:
                st.markdown(f"#### {dt.title()}")
            with default_col:
                st.markdown("#### Default")
            for detail in saved_details:
                checkbox_key = f"{dt.name}_{detail.key}"
                with name_col:
                    st.checkbox(label=detail.short_description(), key=checkbox_key,
                                on_change=self._update_user_prompt, args=[dt, detail.key])
                with default_col:
                    st.toggle(f"favourite_{detail.key}", label_visibility="hidden",
                              value=ss.get(f"DEFAULT_{dt.name}"))
            if st.button(f"Add {dt.title()}"):
                self.add_new_detail_dialog(dt)

    @st.dialog("Add new")
    def add_new_detail_dialog(self, type_: DetailsType):
        """Show a dialog with a form for creating and saving a new detail of the given type.

        :param type_: details type to create; it is matched by member name, and
            anything other than contractor or personal details yields location details.
        """
        item_classes = {
            DetailsType.CONTRACTOR_DETAILS.name: ContractorDetails,
            DetailsType.PERSONAL_DETAILS.name: PersonalDetails,
        }
        new_item = item_classes.get(type_.name, LocationDetails)()
        with st.form("new item", border=False):
            fields_labels = new_item.widget_labels()
            for field_name, field_label in fields_labels.items():
                st.text_input(label=field_label, key=field_name)
            if st.form_submit_button("Save"):
                # Copy each typed value from the session onto the new item.
                for field_name in fields_labels:
                    setattr(new_item, field_name, ss[field_name])
                self.lsm.save_details(new_item)
                st.rerun()
    def _update_user_prompt(self, type_: DetailsType, key: str):
        """Keep the "@tag" references in the request text in sync with a details checkbox.

        Used as the ``on_change`` callback of the details checkboxes. When the
        checkbox for ``key`` is now ticked, the other checkboxes of the same type are
        unticked, their tags are removed from the text and ``@key`` is appended.
        When it is now unticked, ``@key`` is removed from the text.

        :param type_: details type the changed checkbox belongs to.
        :param key: key of the saved detail the changed checkbox stands for.
        """
        if self.update_in_progress:
            return

        def without_tag(text: str, tag_key) -> str:
            # Remove whole "@tag" tokens only, so dropping "@home" leaves "@home2" intact.
            return re.sub(rf"@{re.escape(str(tag_key))}(?!\S)", "", text)

        self.update_in_progress = True
        try:
            # The text area may not hold a value yet; treat that as empty text.
            user_input = ss.get("user_input") or ""
            if ss.get(f"{type_.name}_{key}") is True:
                # The checkbox has just been selected.
                for other in self.lsm.get_details(type_):
                    if other.key == key:
                        continue
                    # Checkbox keys and tags are built from the detail's key,
                    # not from the detail object itself.
                    ss[f"{type_.name}_{other.key}"] = False
                    user_input = without_tag(user_input, other.key)
                # Add the tag of the newly selected checkbox to the text.
                ss["user_input"] = f"{user_input} @{key}".strip()
            else:
                # The checkbox has just been deselected: drop its tag if present.
                cleaned = without_tag(user_input, key)
                if cleaned != user_input:
                    ss["user_input"] = cleaned.strip()
        finally:
            # Always release the guard, even when updating the session state raised.
            self.update_in_progress = False


# Module-level UI manager instance used by use_streamlit() below.
um = UIManager()


def use_streamlit():
    """Render the screen that belongs to the step currently stored in the session.

    Nothing is rendered when the stored step matches none of the known steps.
    """
    current_step = um.get_current_step()
    screens = {
        Steps.INITIAL_STATE.value: lambda: um.build_ui_for_initial_state(user_msg),
        Steps.PARSING_ANSWERS.value: um.build_ui_for_parsing_answers,
        Steps.PARSING_ERROR.value: um.build_ui_for_validate_data_after_correction,
        Steps.ASK_AGAIN.value: um.build_ui_for_ask_again,
        Steps.FIND_CATEGORIES.value: um.build_ui_for_check_category,
        Steps.VALIDATE_DATA.value: um.build_ui_to_confirm_form_data,
        Steps.FORM_CREATED.value: um.build_ui_for_form_created,
    }
    render_screen = screens.get(current_step)
    if render_screen is not None:
        render_screen()


# Entry point: render the UI for the current step whenever this module is executed.
use_streamlit()