Spaces:
Runtime error
Runtime error
| import datetime | |
| import json | |
| import pandas as pd | |
| import streamlit as st | |
| from langchain_core.messages import HumanMessage, FunctionMessage | |
| from streamlit.delta_generator import DeltaGenerator | |
| from backend.chat_bot.json_decoder import CustomJSONDecoder | |
| from backend.constants.streamlit_keys import CHAT_CURRENT_USER_SESSIONS, EL_SESSION_SELECTOR, \ | |
| EL_UPLOAD_FILES_STATUS, USER_PRIVATE_FILES, EL_BUILD_KB_WITH_FILES, \ | |
| EL_PERSONAL_KB_NAME, EL_PERSONAL_KB_DESCRIPTION, \ | |
| USER_PERSONAL_KNOWLEDGE_BASES, AVAILABLE_RETRIEVAL_TOOLS, EL_PERSONAL_KB_NEEDS_REMOVE, \ | |
| CHAT_KNOWLEDGE_TABLE, EL_UPLOAD_FILES, EL_SELECTED_KBS | |
| from backend.constants.variables import DIVIDER_HTML, USER_NAME, RETRIEVER_TOOLS | |
| from backend.construct.build_chat_bot import build_chat_knowledge_table, initialize_session_manager | |
| from backend.chat_bot.chat import refresh_sessions, on_session_change_submit, refresh_agent, \ | |
| create_private_knowledge_base_as_tool, \ | |
| remove_private_knowledge_bases, add_file, clear_files, clear_history, back_to_main, on_chat_submit | |
| def render_session_manager(): | |
| with st.expander("🤖 Session Management"): | |
| if CHAT_CURRENT_USER_SESSIONS not in st.session_state: | |
| refresh_sessions() | |
| st.markdown("Here you can update `session_id` and `system_prompt`") | |
| st.markdown("- Click empty row to add a new item") | |
| st.markdown("- If needs to delete an item, just click it and press `DEL` key") | |
| st.markdown("- Don't forget to submit your change.") | |
| st.data_editor( | |
| data=st.session_state[CHAT_CURRENT_USER_SESSIONS], | |
| num_rows="dynamic", | |
| key="session_editor", | |
| use_container_width=True, | |
| ) | |
| st.button("⏫ Submit", on_click=on_session_change_submit, type="primary") | |
| def render_session_selection(): | |
| with st.expander("✅ Session Selection", expanded=True): | |
| st.selectbox( | |
| "Choose a `session` to chat", | |
| options=st.session_state[CHAT_CURRENT_USER_SESSIONS], | |
| index=None, | |
| key=EL_SESSION_SELECTOR, | |
| format_func=lambda x: x["session_id"], | |
| on_change=refresh_agent, | |
| ) | |
| def render_files_manager(): | |
| with st.expander("📃 **Upload your personal files**", expanded=False): | |
| st.markdown("- Files will be parsed by [Unstructured API](https://unstructured.io/api-key).") | |
| st.markdown("- All files will be converted into vectors and stored in [MyScaleDB](https://myscale.com/).") | |
| st.file_uploader(label="⏫ **Upload files**", key=EL_UPLOAD_FILES, accept_multiple_files=True) | |
| # st.markdown("### Uploaded Files") | |
| st.dataframe( | |
| data=st.session_state[CHAT_KNOWLEDGE_TABLE].list_files(st.session_state[USER_NAME]), | |
| use_container_width=True, | |
| ) | |
| st.session_state[EL_UPLOAD_FILES_STATUS] = st.empty() | |
| col_1, col_2 = st.columns(2) | |
| with col_1: | |
| st.button(label="Upload files", on_click=add_file) | |
| with col_2: | |
| st.button(label="Clear all files and tools", on_click=clear_files) | |
| def _render_create_personal_knowledge_bases(div: DeltaGenerator): | |
| with div: | |
| st.markdown("- If you haven't upload your personal files, please upload them first.") | |
| st.markdown("- Select some **files** to build your `personal knowledge base`.") | |
| st.markdown("- Once the your `personal knowledge base` is built, " | |
| "it will answer your questions using information from your personal **files**.") | |
| st.multiselect( | |
| label="⚡️Select some files to build a **personal knowledge base**", | |
| options=st.session_state[USER_PRIVATE_FILES], | |
| placeholder="You should upload some files first", | |
| key=EL_BUILD_KB_WITH_FILES, | |
| format_func=lambda x: x["file_name"], | |
| ) | |
| st.text_input( | |
| label="⚡️Personal knowledge base name", | |
| value="get_relevant_documents", | |
| key=EL_PERSONAL_KB_NAME | |
| ) | |
| st.text_input( | |
| label="⚡️Personal knowledge base description", | |
| value="Searches from some personal files.", | |
| key=EL_PERSONAL_KB_DESCRIPTION, | |
| ) | |
| st.button( | |
| label="Build 🔧", | |
| on_click=create_private_knowledge_base_as_tool | |
| ) | |
| def _render_remove_personal_knowledge_bases(div: DeltaGenerator): | |
| with div: | |
| st.markdown("> Here is all your personal knowledge bases.") | |
| if USER_PERSONAL_KNOWLEDGE_BASES in st.session_state and len(st.session_state[USER_PERSONAL_KNOWLEDGE_BASES]) > 0: | |
| st.dataframe(st.session_state[USER_PERSONAL_KNOWLEDGE_BASES]) | |
| else: | |
| st.warning("You don't have any personal knowledge bases, please create a new one.") | |
| st.multiselect( | |
| label="Choose a personal knowledge base to delete", | |
| placeholder="Choose a personal knowledge base to delete", | |
| options=st.session_state[USER_PERSONAL_KNOWLEDGE_BASES], | |
| format_func=lambda x: x["tool_name"], | |
| key=EL_PERSONAL_KB_NEEDS_REMOVE, | |
| ) | |
| st.button("Delete", on_click=remove_private_knowledge_bases, type="primary") | |
| def render_personal_tools_build(): | |
| with st.expander("🔨 **Build your personal knowledge base**", expanded=True): | |
| create_new_kb, kb_manager = st.tabs(["Create personal knowledge base", "Personal knowledge base management"]) | |
| _render_create_personal_knowledge_bases(create_new_kb) | |
| _render_remove_personal_knowledge_bases(kb_manager) | |
| def render_knowledge_base_selector(): | |
| with st.expander("🙋 **Select some knowledge bases to query**", expanded=True): | |
| st.markdown("- Knowledge bases come in two types: `public` and `private`.") | |
| st.markdown("- All users can access our `public` knowledge bases.") | |
| st.markdown("- Only you can access your `personal` knowledge bases.") | |
| options = st.session_state[RETRIEVER_TOOLS].keys() | |
| if AVAILABLE_RETRIEVAL_TOOLS in st.session_state: | |
| options = st.session_state[AVAILABLE_RETRIEVAL_TOOLS] | |
| st.multiselect( | |
| label="Select some knowledge base tool", | |
| placeholder="Please select some knowledge bases to query", | |
| options=options, | |
| default=["Wikipedia + Self Querying"], | |
| key=EL_SELECTED_KBS, | |
| on_change=refresh_agent, | |
| ) | |
| def chat_page(): | |
| # initialize resources | |
| build_chat_knowledge_table() | |
| initialize_session_manager() | |
| # render sidebar | |
| with st.sidebar: | |
| left, middle, right = st.columns([1, 1, 2]) | |
| with left: | |
| st.button(label="↩️ Log Out", help="log out and back to main page", on_click=back_to_main) | |
| with right: | |
| st.markdown(f"👤 `{st.session_state[USER_NAME]}`") | |
| st.markdown(DIVIDER_HTML, unsafe_allow_html=True) | |
| render_session_manager() | |
| render_session_selection() | |
| render_files_manager() | |
| render_personal_tools_build() | |
| render_knowledge_base_selector() | |
| # render chat history | |
| if "agent" not in st.session_state: | |
| refresh_agent() | |
| for msg in st.session_state.agent.memory.chat_memory.messages: | |
| speaker = "user" if isinstance(msg, HumanMessage) else "assistant" | |
| if isinstance(msg, FunctionMessage): | |
| with st.chat_message(name="from knowledge base", avatar="📚"): | |
| st.write( | |
| f"*{datetime.datetime.fromtimestamp(msg.additional_kwargs['timestamp']).isoformat()}*" | |
| ) | |
| st.write("Retrieved from knowledge base:") | |
| try: | |
| st.dataframe( | |
| pd.DataFrame.from_records( | |
| json.loads(msg.content, cls=CustomJSONDecoder) | |
| ), | |
| use_container_width=True, | |
| ) | |
| except Exception as e: | |
| st.warning(e) | |
| st.write(msg.content) | |
| else: | |
| if len(msg.content) > 0: | |
| with st.chat_message(speaker): | |
| # print(type(msg), msg.dict()) | |
| st.write( | |
| f"*{datetime.datetime.fromtimestamp(msg.additional_kwargs['timestamp']).isoformat()}*" | |
| ) | |
| st.write(f"{msg.content}") | |
| st.session_state["next_round"] = st.empty() | |
| from streamlit import _bottom | |
| with _bottom: | |
| col1, col2 = st.columns([1, 16]) | |
| with col1: | |
| st.button("🗑️", help="Clean chat history", on_click=clear_history, type="secondary") | |
| with col2: | |
| st.chat_input("Input Message", on_submit=on_chat_submit, key="chat_input") | |