Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import torch | |
| import bitsandbytes | |
| import accelerate | |
| import scipy | |
| import copy | |
| import time | |
| from typing import Tuple, Dict, List, Union | |
| from streamlit.delta_generator import DeltaGenerator | |
| from PIL import Image | |
| import torch.nn as nn | |
| import pandas as pd | |
| from my_model.detector.object_detection import detect_and_draw_objects | |
| from my_model.captioner.image_captioning import get_caption | |
| from my_model.utilities.gen_utilities import free_gpu_resources | |
| from my_model.state_manager import StateManager | |
| from my_model.config import inference_config as config | |
| class InferenceRunner(StateManager): | |
| """ | |
| Manages the user interface and interactions for running inference using the Streamlit-based Knowledge-Based Visual | |
| Question Answering (KBVQA) application. | |
| This class handles image uploads, displays sample images, and facilitates the question-answering process using the | |
| KBVQA model. | |
| Inherits from the StateManager class. | |
| """ | |
def __init__(self) -> None:
    """
    Creates the runner; all state set-up is delegated to the ``StateManager`` base initializer.
    """
    super().__init__()
def answer_question(self, caption: str, detected_objects_str: str, question: str) -> Tuple[str, int]:
    """
    Asks the KBVQA model stored in the Streamlit session state to answer a question about an image.

    ``free_gpu_resources`` is called immediately before and immediately after generation.

    Args:
        caption (str): Caption generated for the image.
        detected_objects_str (str): String representation of the objects detected in the image.
        question (str): The user's question about the image.

    Returns:
        Tuple[str, int]: The generated answer and the model's ``current_prompt_length`` as read right after
        generation.
    """
    free_gpu_resources()

    kbvqa = st.session_state.kbvqa
    generated_answer = kbvqa.generate_answer(question, caption, detected_objects_str)
    length_of_prompt = kbvqa.current_prompt_length

    free_gpu_resources()
    return generated_answer, length_of_prompt
def display_sample_images(self) -> None:
    """
    Displays the configured sample images as thumbnails, each with its own selection button.

    One column is created inside ``self.col1`` per entry of ``config.SAMPLE_IMAGES``. Each column shows an
    80x80 thumbnail produced by ``self.resize_image`` and a 'Select Sample Image N' button. Clicking a button
    hands the sample's path and the opened image to ``self.process_new_image``.

    Returns:
        None
    """
    sample_paths = config.SAMPLE_IMAGES
    if not sample_paths:
        # A column layout needs at least one column; with no samples configured there is nothing to offer.
        return

    self.col1.write("Choose from sample images:")
    columns = self.col1.columns(len(sample_paths))
    for idx, (column, sample_image_path) in enumerate(zip(columns, sample_paths)):
        with column:
            st.image(self.resize_image(sample_image_path, 80, 80))
            if st.button(f'Select Sample Image {idx + 1}', key=f'sample_{idx + 1}'):
                # Open the full image only for the sample that was actually clicked. Opening every sample
                # on every rerun kept one file handle per thumbnail alive for images that were never used.
                self.process_new_image(sample_image_path, Image.open(sample_image_path))
def handle_image_upload(self) -> None:
    """
    Renders a file uploader in ``self.col1`` (png / jpg / jpeg) and registers whatever the user uploads.

    When a file is present, its name and the image opened from it are passed to ``self.process_new_image``.

    Returns:
        None
    """
    uploaded_file = self.col1.file_uploader("Or upload an Image", type=["png", "jpg", "jpeg"])
    if uploaded_file is None:
        return

    file_name = uploaded_file.name
    self.process_new_image(file_name, Image.open(uploaded_file))
def display_image_and_analysis(self, image_key: str, image_data: Dict, nested_col21: DeltaGenerator,
                               nested_col22: DeltaGenerator) -> None:
    """
    Shows one image in the left column and delegates the 'Analyze Image' controls to the right column.

    The image stored under ``image_data['image']`` is resized through ``self.resize_image`` (with 600 as the
    size argument) and captioned with the last 11 characters of ``image_key``.

    Args:
        image_key (str): Unique key identifying the image.
        image_data (Dict): Data associated with the image; must contain the key 'image'.
        nested_col21 (DeltaGenerator): Column in which the image is displayed.
        nested_col22 (DeltaGenerator): Column handed to ``handle_analysis_button``.

    Returns:
        None
    """
    resized_image = self.resize_image(image_data['image'], 600)
    image_label = f'Uploaded Image: {image_key[-11:]}'
    nested_col21.image(resized_image, caption=image_label)

    self.handle_analysis_button(image_key, image_data, nested_col22)
def handle_analysis_button(self, image_key: str, image_data: Dict, nested_col22: DeltaGenerator) -> None:
    """
    Provides an 'Analyze Image' button and runs the image analysis when it is clicked.

    The button is only offered when the image has not been analysed yet, or when ``self.settings_changed`` or
    ``self.confidance_change`` is truthy. On click, ``self.analyze_image`` is run on ``image_data['image']``
    and the resulting caption and detected-objects string are stored through ``self.update_image_data``.

    Args:
        image_key (str): Unique key identifying the image.
        image_data (Dict): Data associated with the image; must contain 'analysis_done' and 'image'.
        nested_col22 (DeltaGenerator): Column in which the prompt text and the button are displayed.

    Returns:
        None
    """
    # NOTE(review): the nesting here was reconstructed from a source that had lost its indentation; the
    # 'loading_in_progress' reset is assumed to belong to the button-click branch - confirm.
    if image_data['analysis_done'] and not (self.settings_changed or self.confidance_change):
        # The stored analysis is still valid for the current settings: nothing to offer.
        return

    nested_col22.text("Please click 'Analyze Image'..")
    # The key embeds the detector and confidence level, so changing either one yields a fresh button.
    analyze_button_key = f'analyze_{image_key}_{st.session_state.detection_model}_' \
                         f'{st.session_state.confidence_level}'
    with nested_col22:
        if st.button('Analyze Image', key=analyze_button_key, on_click=self.disable_widgets,
                     disabled=self.is_widget_disabled):
            try:
                with st.spinner('Analyzing the image...'):
                    # The third element returned by analyze_image (the annotated image) is not used here.
                    caption, detected_objects_str, _ = self.analyze_image(image_data['image'])
                    self.update_image_data(image_key, caption, detected_objects_str, True)
            finally:
                # Clear the flag even when the analysis raises, so a failure cannot leave it set.
                # Presumably this is the flag behind `disable_widgets` / `is_widget_disabled` - verify
                # against StateManager.
                st.session_state['loading_in_progress'] = False
def handle_question_answering(self, image_key: str, image_data: Dict, nested_col22: DeltaGenerator) -> None:
    """
    Shows the question-answering controls for an analysed image and warns when a re-analysis is needed.

    Args:
        image_key (str): Unique key identifying the image.
        image_data (Dict): Data associated with the image; must contain 'analysis_done'.
        nested_col22 (DeltaGenerator): Column in which the interface and the warning are displayed.

    Returns:
        None
    """
    # NOTE(review): the two checks are reconstructed as independent siblings (the source had lost its
    # indentation) - confirm the warning is not meant to be nested under the 'analysis_done' check.
    analysis_ready = image_data['analysis_done']
    if analysis_ready:
        self.display_question_answering_interface(image_key, image_data, nested_col22)

    needs_reanalysis = self.settings_changed or self.confidance_change
    if needs_reanalysis:
        nested_col22.warning("Confidence level changed, please click 'Analyze Image' each time you change it.")
def display_question_answering_interface(self, image_key: str, image_data: Dict,
                                         nested_col22: DeltaGenerator) -> None:
    """
    Renders the question picker, forwards the chosen question for processing and lists the Q/A history.

    The select box offers "Custom question..." followed by the entries configured for this image in
    ``config.SAMPLE_QUESTIONS``. A free-text input replaces the selection when the custom option is chosen.

    Args:
        image_key (str): Unique key identifying the image.
        image_data (Dict): Data associated with the image; 'qa_history' is read if present.
        nested_col22 (DeltaGenerator): The column where the interface will be displayed.

    Returns:
        None
    """
    custom_option = "Custom question..."
    options = [custom_option] + config.SAMPLE_QUESTIONS.get(image_key, [])

    question = nested_col22.selectbox("Select a sample question or type your own:",
                                      options,
                                      key=f'sample_question_{image_key}')
    if question == custom_option:
        # The free-text box is only shown for the custom option, and its content becomes the question.
        question = nested_col22.text_input("Or ask your own question:", key=f'custom_question_{image_key}')

    self.process_question(image_key, question, image_data, nested_col22)

    # Each history entry is a (question, answer, prompt length) triple.
    for num, (q, a, p) in enumerate(image_data.get('qa_history', [])):
        nested_col22.text(f"Q{num + 1}: {q}\nA{num + 1}: {a}\nPrompt Length: {p}\n")
def process_question(self, image_key: str, question: str, image_data: Dict, nested_col22: DeltaGenerator) -> None:
    """
    Processes the user's question, generates an answer, and updates the question-answer history.

    A 'Get Answer' button is offered when the question is non-blank and either has not been asked for this
    image yet, or ``self.settings_changed`` / ``self.confidance_change`` is truthy. Clicking it runs
    ``self.answer_question`` on the image's caption and detected objects and records the result through
    ``self.add_to_qa_history``.

    Surrounding whitespace is stripped from the question first, so a whitespace-only input is ignored
    instead of being sent to the model.

    Args:
        image_key (str): Unique key identifying the image.
        question (str): The question asked by the user.
        image_data (Dict): Data associated with the image; 'caption' and 'detected_objects_str' are read
            when an answer is generated, 'qa_history' is read if present.
        nested_col22 (DeltaGenerator): The column where the 'Get Answer' button is displayed.

    Returns:
        None
    """
    question = (question or "").strip()
    if not question:
        return

    qa_history = image_data.get('qa_history', [])
    already_asked = any(past_question == question for past_question, _, _ in qa_history)
    if already_asked and not (self.settings_changed or self.confidance_change):
        # Same question under unchanged settings: the existing history entry still stands.
        return

    if nested_col22.button('Get Answer', key=f'answer_{image_key}', disabled=self.is_widget_disabled):
        answer, prompt_length = self.answer_question(image_data['caption'], image_data['detected_objects_str'],
                                                     question)
        self.add_to_qa_history(image_key, question, answer, prompt_length)
def image_qa_app(self) -> None:
    """
    Main application interface for image-based question answering.

    Renders the sample-image picker and the uploader first. Then, inside ``self.col2``, it walks over every
    image returned by ``self.get_images_data()`` and gives each one its own container, split 65/35 into an
    image column and an analysis / question-answering column.

    Returns:
        None
    """
    self.display_sample_images()
    self.handle_image_upload()

    with self.col2:
        for key, data in self.get_images_data().items():
            with st.container():
                image_col, qa_col = st.columns([0.65, 0.35])
                self.display_image_and_analysis(key, data, image_col, qa_col)
                self.handle_question_answering(key, data, qa_col)
def run_inference(self) -> None:
    """
    Sets up widgets and manages the inference process, including model loading and reloading, based on user
    interactions.

    Steps performed on each run:
      1. Build the widgets via ``self.set_up_widgets``.
      2. Label the main button "Reload Model" when the loaded model's detector differs from the selected
         one, or the selected method differs from the previously recorded one; otherwise "Load Model".
      3. For the fine-tuned methods, translate button clicks into exactly one action: load, report that the
         model is already loaded, reload only the detection model, or force a full reload.
      4. Once a model is loaded, run the question-answering application.

    Returns:
        None
    """
    # NOTE(review): block nesting was reconstructed from syntax and intent because the source had lost its
    # indentation. In particular, the final `is_model_loaded` block is assumed to sit at function level and
    # to contain all four trailing statements - confirm against the original file.
    self.set_up_widgets()  # Inherited from the StateManager class

    load_fine_tuned_model = False
    fine_tuned_model_already_loaded = False
    reload_detection_model = False
    force_reload_full_model = False

    # Evaluated once and reused for the button branch below. `kbvqa` is only touched when a model is loaded.
    detector_changed = (self.is_model_loaded and
                        st.session_state.kbvqa.detection_model != st.session_state['detection_model'])
    settings_differ = detector_changed or (
        st.session_state['previous_state']['method'] is not None and
        st.session_state['method'] != st.session_state['previous_state']['method'])
    st.session_state.button_label = "Reload Model" if settings_differ else "Load Model"
    button_label = st.session_state.button_label

    if button_label == "Reload Model":
        self.col1.warning("Model settings have changed, please reload the model.. ")

    with self.col1:
        if st.session_state.method in ("7b-Fine-Tuned Model", "13b-Fine-Tuned Model"):
            with st.container():
                nested_col11, nested_col12 = st.columns([0.5, 0.5])
                if nested_col11.button(button_label, on_click=self.disable_widgets,
                                       disabled=self.is_widget_disabled):
                    if button_label == "Load Model":
                        if self.is_model_loaded:
                            free_gpu_resources()
                            fine_tuned_model_already_loaded = True
                        else:
                            load_fine_tuned_model = True
                    elif st.session_state['method'] != st.session_state['previous_state']['method']:
                        # The label is "Reload Model" here; a different method (model size) needs a full
                        # reload rather than a detector swap.
                        force_reload_full_model = True
                    elif detector_changed:
                        reload_detection_model = True
                if nested_col12.button("Force Reload", on_click=self.disable_widgets,
                                       disabled=self.is_widget_disabled):
                    force_reload_full_model = True

            if load_fine_tuned_model:
                # perf_counter is monotonic, so the measured duration is immune to wall-clock adjustments.
                t1 = time.perf_counter()
                free_gpu_resources()
                self.load_model()
                st.session_state['time_taken_to_load_model'] = int(time.perf_counter() - t1)
                st.session_state['loading_in_progress'] = False
            elif fine_tuned_model_already_loaded:
                free_gpu_resources()
                self.col1.text("Model already loaded and no settings were changed:)")
                st.session_state['loading_in_progress'] = False
            elif reload_detection_model:
                free_gpu_resources()
                self.reload_detection_model()
                st.session_state['loading_in_progress'] = False
            elif force_reload_full_model:
                free_gpu_resources()
                t1 = time.perf_counter()
                self.force_reload_model()
                st.session_state['time_taken_to_load_model'] = int(time.perf_counter() - t1)
                st.session_state['loading_in_progress'] = False
                st.session_state['model_loaded'] = True
        elif st.session_state.method == "Vision-Language Embeddings Alignment":
            self.col1.warning(
                f'Model using {st.session_state.method} is designed but requires large scale data and multiple '
                f'high-end GPUs, implementation will be explored in the future.')

    if self.is_model_loaded:
        free_gpu_resources()
        st.session_state['loading_in_progress'] = False
        self.update_prev_state()
        self.image_qa_app()  # this is the main Q/A Application