File size: 5,540 Bytes
62257cb
 
 
 
 
 
50e208c
62257cb
 
 
 
 
9149fd8
 
62257cb
 
 
 
 
 
 
 
 
30bf3a3
 
62257cb
 
f21d103
 
62257cb
 
 
9149fd8
 
62257cb
 
 
 
9149fd8
 
62257cb
 
 
 
 
 
 
 
 
 
 
 
 
9149fd8
62257cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
50e208c
62257cb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import streamlit as st
import extra_streamlit_components as stx
import requests
from PIL import Image
from transformers import AutoProcessor, AutoModelForVision2Seq
from io import BytesIO
#import replicate
from llama_index.llms.palm import PaLM
from llama_index import ServiceContext, VectorStoreIndex, Document
from llama_index.memory import ChatMemoryBuffer
import os
import datetime
from PIL import Image
import io

# Set up the title of the application
st.title("Image Captioning and Chat")

# Initialize the cookie manager
cookie_manager = stx.CookieManager()

@st.cache_resource
def get_vision_model():
    model = AutoModelForVision2Seq.from_pretrained("microsoft/kosmos-2-patch14-224")
    processor = AutoProcessor.from_pretrained("microsoft/kosmos-2-patch14-224")
    return model, processor

model, processor = get_vision_model()

# Function to get image caption via Kosmos2.
@st.cache_data
def get_image_caption(image_data):
    # Convert BytesIO to PIL Image
    image = Image.open(io.BytesIO(image_data))

    model, processor = get_vision_model()

    prompt = "<grounding>An image of"
    # Pass the PIL image to the processor
    inputs = processor(text=prompt, images=image, return_tensors="pt")

    generated_ids = model.generate(
        pixel_values=inputs["pixel_values"],
        input_ids=inputs["input_ids"][:, :-1],
        attention_mask=inputs["attention_mask"][:, :-1],
        img_features=None,
        img_attn_mask=inputs["img_attn_mask"][:, :-1],
        use_cache=True,
        max_new_tokens=64,
    )
    generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]

    text_description, entities = processor.post_process_generation(generated_text)

    return text_description

# Function to create the chat engine.
@st.cache_resource
def create_chat_engine(img_desc, api_key):
    llm = PaLM(api_key=api_key)
    service_context = ServiceContext.from_defaults(llm=llm)
    doc = Document(text=img_desc)
    index = VectorStoreIndex.from_documents([doc], service_context=service_context)
    chatmemory = ChatMemoryBuffer.from_defaults(token_limit=1500)
    
    chat_engine = index.as_chat_engine(
        chat_mode="context",
        system_prompt=(
            f"You are a chatbot, able to have normal interactions, as well as talk. "
            "You always answer in great detail and are polite. Your responses always descriptive. "
            "Your job is to talk about an image the user has uploaded. Image description: {img_desc}."
        ),
        verbose=True,
        memory=chatmemory
    )
    return chat_engine

# Clear chat function
def clear_chat():
    if "messages" in st.session_state:
        del st.session_state.messages
    if "image_file" in st.session_state:
        del st.session_state.image_file

# Callback function to clear the chat when a new image is uploaded
def on_image_upload():
    clear_chat()        

# Add a clear chat button
if st.button("Clear Chat"):
    clear_chat()        

# Image upload section.
image_file = st.file_uploader("Upload an image", type=["jpg", "jpeg", "png"], key="uploaded_image", on_change=on_image_upload)
if image_file:
    # Display the uploaded image at a standard width.
    st.image(image_file, caption='Uploaded Image.', width=200)
    # Process the uploaded image to get a caption.
    image_data = BytesIO(image_file.getvalue())
    img_desc = get_image_caption(image_data)
    st.write(f"Image description: {img_desc}")

    # Initialize the chat engine with the image description.
    chat_engine = create_chat_engine(img_desc, os.environ["GOOGLE_API_KEY"])

# Initialize session state for messages if it doesn't exist
if "messages" not in st.session_state:
    st.session_state.messages = []

# Display previous messages
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

# Handle new user input
user_input = st.chat_input("Ask me about the image:", key="chat_input")
if user_input:
    # Retrieve the message count from cookies
    message_count = cookie_manager.get(cookie='message_count')
    if message_count is None:
        message_count = 0
    else:
        message_count = int(message_count)

    # Check if the message limit has been reached
    if message_count >= 20:
        st.error("Notice: The maximum message limit for this demo version has been reached.")
    else:
        # Append user message to the session state
        st.session_state.messages.append({"role": "user", "content": user_input})

        # Display user message immediately
        with st.chat_message("user"):
            st.markdown(user_input)

        # Call the chat engine to get the response if an image has been uploaded
        if image_file:
            # Get the response from your chat engine
            response = chat_engine.chat(user_input)

            # Append assistant message to the session state
            st.session_state.messages.append({"role": "assistant", "content": response})

            # Display the assistant message
            with st.chat_message("assistant"):
                st.markdown(response)
        
        # Increment the message count and update the cookie
        message_count += 1
        cookie_manager.set('message_count', str(message_count), expires_at=datetime.datetime.now() + datetime.timedelta(days=30))



# Set Replicate and Google API keys
#os.environ['REPLICATE_API_TOKEN'] = st.secrets['REPLICATE_API_TOKEN']
os.environ["GOOGLE_API_KEY"] = st.secrets['GOOGLE_API_KEY']