import random
from collections.abc import Iterator
from datetime import datetime
from pathlib import Path
from threading import Thread

import gradio as gr
import spaces
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer

# Vision model imports
from transformers import LlavaNextProcessor, LlavaNextForConditionalGeneration

from themes.research_monochrome import theme

# NOTE: the "%-d" (no-padding) directive is a glibc extension; it works on Linux/macOS
# but is not supported by strftime on Windows.
today_date = datetime.today().strftime("%B %-d, %Y")  # noqa: DTZ002

SYS_PROMPT = f"""Knowledge Cutoff Date: April 2024.
Today's Date: {today_date}.
You are Granite, developed by IBM. You are a helpful AI assistant"""

TITLE = "IBM Granite 3.1 8b Instruct & Vision Preview"

DESCRIPTION = """
<p>This demo pairs Granite 3.1 8B Instruct, an open-source LLM with a 128k context window, with
Granite Vision 3.1 2B Preview for vision-language tasks. Start with one of the sample prompts or
enter your own, and upload an image to route your message to the vision model. Keep in mind that
AI can occasionally make mistakes.
<span class="gr_docs_link">
<a href="https://www.ibm.com/granite/docs/">View Granite Instruct Documentation <i class="fa fa-external-link"></i></a>
</span>
<span class="gr_docs_link">
<a href="https://www.ibm.com/granite/vision/docs/">View Granite Vision Documentation <i class="fa fa-external-link"></i></a>
</span>
</p>
"""

# Text model generation defaults
MAX_INPUT_TOKEN_LENGTH = 128_000
MAX_NEW_TOKENS = 1024
TEMPERATURE = 0.7
TOP_P = 0.85
TOP_K = 50
REPETITION_PENALTY = 1.05

# Vision model generation defaults
VISION_TEMPERATURE = 0.2
VISION_TOP_P = 0.95
VISION_TOP_K = 50
VISION_MAX_TOKENS = 128

if not torch.cuda.is_available():
    print("This demo may not work on CPU.")

# Text model loading
text_model = AutoModelForCausalLM.from_pretrained(
    "ibm-granite/granite-3.1-8b-instruct", torch_dtype=torch.float16, device_map="auto"
)
text_tokenizer = AutoTokenizer.from_pretrained("ibm-granite/granite-3.1-8b-instruct")
text_tokenizer.use_default_system_prompt = False

# Vision model loading
vision_model_path = "ibm-granite/granite-vision-3.1-2b-preview"
vision_processor = LlavaNextProcessor.from_pretrained(vision_model_path, use_fast=True)
vision_model = LlavaNextForConditionalGeneration.from_pretrained(
    vision_model_path, torch_dtype="auto", device_map="auto"
)
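

# Assumption: this Space runs on ZeroGPU (hence `import spaces`, otherwise unused),
# so GPU-bound entry points are wrapped with @spaces.GPU to request a GPU per call.
@spaces.GPU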
def generate(
    message: str,
    chat_history: list[dict],
    temperature: float = TEMPERATURE,
    repetition_penalty: float = REPETITION_PENALTY,
    top_p: float = TOP_P,
    top_k: int = TOP_K,
    max_new_tokens: int = MAX_NEW_TOKENS,
) -> Iterator[str]:
    """Generate function for the text chat demo, streaming partial completions."""
    # Build messages
    conversation = [{"role": "system", "content": SYS_PROMPT}]
    conversation += chat_history
    conversation.append({"role": "user", "content": message})

    # Convert messages to prompt format
    input_ids = text_tokenizer.apply_chat_template(
        conversation,
        return_tensors="pt",
        add_generation_prompt=True,
        truncation=True,
        max_length=MAX_INPUT_TOKEN_LENGTH - max_new_tokens,
    )
    input_ids = input_ids.to(text_model.device)

    streamer = TextIteratorStreamer(text_tokenizer, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(
        input_ids=input_ids,
        streamer=streamer,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        num_beams=1,
        repetition_penalty=repetition_penalty,
    )
    # Run generation on a background thread so the streamer can be consumed here.
    t = Thread(target=text_model.generate, kwargs=generate_kwargs)
    t.start()

    outputs = []
    for text in streamer:
        outputs.append(text)
        yield "".join(outputs)


def get_text_from_content(content):
    """Flatten a multimodal content list into a plain-text summary for display."""
    texts = []
    for item in content:
        if item["type"] == "text":
            texts.append(item["text"])
        elif item["type"] == "image":
            texts.append("[Image]")
    return " ".join(texts)
def chat_inference(
    image,
    text,
    conversation,
    temperature=VISION_TEMPERATURE,
    top_p=VISION_TOP_P,
    top_k=VISION_TOP_K,
    max_tokens=VISION_MAX_TOKENS,
):
    """Run one vision-model turn; return (display history, updated conversation)."""
    if conversation is None:
        conversation = []

    user_content = []
    if image is not None:
        user_content.append({"type": "image", "image": image})
    if text and text.strip():
        user_content.append({"type": "text", "text": text.strip()})
    if not user_content:
        return conversation_display(conversation), conversation

    conversation.append({"role": "user", "content": user_content})

    inputs = vision_processor.apply_chat_template(
        conversation,
        add_generation_prompt=True,
        tokenize=True,
        return_dict=True,
        return_tensors="pt",
    ).to(vision_model.device)

    # Re-seed so repeated identical prompts can sample different outputs.
    torch.manual_seed(random.randint(0, 10000))

    generation_kwargs = {
        "max_new_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": top_k,
        "do_sample": True,
    }
    output = vision_model.generate(**inputs, **generation_kwargs)
    # The decoded sequence may include the echoed prompt; conversation_display()
    # attempts to strip it by splitting on the "<|assistant|>" marker.
    assistant_response = vision_processor.decode(output[0], skip_special_tokens=True)

    conversation.append({"role": "assistant", "content": [{"type": "text", "text": assistant_response.strip()}]})
    return conversation_display(conversation), conversation


def conversation_display(conversation):
    """Convert the multimodal conversation into the chatbot's messages format."""
    chat_history = []
    for msg in conversation:
        if msg["role"] == "user":
            chat_history.append({"role": "user", "content": get_text_from_content(msg["content"])})
        elif msg["role"] == "assistant":
            # Keep only the text after the last "<|assistant|>" marker to drop any echoed prompt.
            assistant_text = msg["content"][0]["text"].split("<|assistant|>")[-1].strip()
            chat_history.append({"role": "assistant", "content": assistant_text})
    return chat_history


def clear_chat():
    # Clears, in order: chatbot display, vision state, text input, image input, text state.
    return [], [], "", None, []


css_file_path = Path(__file__).parent / "app.css"
head_file_path = Path(__file__).parent / "app_head.html"

# Advanced settings (displayed in an Accordion) - Text Model
text_temperature_slider = gr.Slider(
    minimum=0, maximum=1.0, value=TEMPERATURE, step=0.1, label="Text Temperature",
    elem_classes=["gr_accordion_element"],
)
text_top_p_slider = gr.Slider(
    minimum=0, maximum=1.0, value=TOP_P, step=0.05, label="Text Top P", elem_classes=["gr_accordion_element"]
)
text_top_k_slider = gr.Slider(
    minimum=0, maximum=100, value=TOP_K, step=1, label="Text Top K", elem_classes=["gr_accordion_element"]
)
text_repetition_penalty_slider = gr.Slider(
    minimum=0,
    maximum=2.0,
    value=REPETITION_PENALTY,
    step=0.05,
    label="Text Repetition Penalty",
    elem_classes=["gr_accordion_element"],
)
text_max_new_tokens_slider = gr.Slider(
    minimum=1,
    maximum=2000,
    value=MAX_NEW_TOKENS,
    step=1,
    label="Text Max New Tokens",
    elem_classes=["gr_accordion_element"],
)
text_chat_interface_accordion = gr.Accordion(label="Text Model Advanced Settings", open=False)

# Advanced settings (displayed in an Accordion) - Vision Model
vision_temperature_slider = gr.Slider(
    minimum=0.0, maximum=2.0, value=VISION_TEMPERATURE, step=0.01, label="Vision Temperature",
    elem_classes=["gr_accordion_element"],
)
vision_top_p_slider = gr.Slider(
    minimum=0.0, maximum=1.0, value=VISION_TOP_P, step=0.01, label="Vision Top P",
    elem_classes=["gr_accordion_element"],
)
vision_top_k_slider = gr.Slider(
    minimum=0, maximum=100, value=VISION_TOP_K, step=1, label="Vision Top K", elem_classes=["gr_accordion_element"]
)
vision_max_tokens_slider = gr.Slider(
    minimum=10, maximum=300, value=VISION_MAX_TOKENS, step=1, label="Vision Max Tokens",
    elem_classes=["gr_accordion_element"],
)
vision_chat_interface_accordion = gr.Accordion(label="Vision Model Advanced Settings", open=False)


with gr.Blocks(fill_height=True, css_paths=css_file_path, head_paths=head_file_path, theme=theme, title=TITLE) as demo:
    gr.HTML(f"<h1>{TITLE}</h1>", elem_classes=["gr_title"])
    gr.HTML(DESCRIPTION)
    chatbot = gr.Chatbot(label="Chat History", elem_id="chatbot", height=500, type="messages")
    text_input = gr.Textbox(lines=2, placeholder="Enter your message here", label="Message")
    image_input = gr.Image(type="pil", label="Upload Image (optional)")

    # Components created outside the Blocks context must be explicitly render()-ed here.
    with text_chat_interface_accordion.render():
        text_temperature_slider.render()
        text_top_p_slider.render()
        text_top_k_slider.render()
        text_repetition_penalty_slider.render()
        text_max_new_tokens_slider.render()
    with vision_chat_interface_accordion.render():
        vision_temperature_slider.render()
        vision_top_p_slider.render()
        vision_top_k_slider.render()
        vision_max_tokens_slider.render()

    clear_button = gr.Button("Clear Chat")
    send_button = gr.Button("Send Message")

    text_state = gr.State([])  # State for text chatbot history
    vision_state = gr.State([])  # State for vision chatbot history
    chatbot_type_state = gr.State("text")  # State to track which chatbot is in use

    def send_message(image_input, text_input, chatbot_type_state, text_state, vision_state,
                     text_temperature, text_repetition_penalty, text_top_p, text_top_k, text_max_new_tokens,
                     vision_temperature, vision_top_p, vision_top_k, vision_max_tokens):
        """Route the message to the vision model when an image is attached, else to the text model."""
        if image_input is not None:
            chatbot_type_state = "vision"
            chat_output, updated_vision_state = chat_inference(
                image=image_input,
                text=text_input,
                conversation=vision_state,
                temperature=vision_temperature,
                top_p=vision_top_p,
                top_k=vision_top_k,
                max_tokens=vision_max_tokens,
            )
            return chat_output, text_state, updated_vision_state, chatbot_type_state
        else:
            chatbot_type_state = "text"
            history = text_state
            output_text = ""
            # Drain the streaming iterator; the chatbot is updated once with the final text.
            for text_chunk in generate(
                message=text_input,
                chat_history=history,
                temperature=text_temperature,
                repetition_penalty=text_repetition_penalty,
                top_p=text_top_p,
                top_k=text_top_k,
                max_new_tokens=text_max_new_tokens,
            ):
                output_text = text_chunk
            updated_text_state = history + [
                {"role": "user", "content": text_input},
                {"role": "assistant", "content": output_text},
            ]
            # The chatbot uses type="messages", so the state doubles as its display format.
            return updated_text_state, updated_text_state, vision_state, chatbot_type_state

    send_button.click(
        send_message,
        inputs=[image_input, text_input, chatbot_type_state, text_state, vision_state,
                text_temperature_slider, text_repetition_penalty_slider, text_top_p_slider, text_top_k_slider,
                text_max_new_tokens_slider,
                vision_temperature_slider, vision_top_p_slider, vision_top_k_slider, vision_max_tokens_slider],
        outputs=[chatbot, text_state, vision_state, chatbot_type_state],
    )

    clear_button.click(
        clear_chat,
        inputs=None,
        outputs=[chatbot, vision_state, text_input, image_input, text_state],
    )

    gr.Examples(
        examples=[
            ["Explain the concept of quantum computing to someone with no background in physics or computer science.", None],
            ["What is OpenShift?", None],
            ["What's the importance of low latency inference?", None],
            ["Help me boost productivity habits.", None],
            [
                """Explain the following code in a concise manner:

```java
import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        int[] arr = {1, 5, 3, 4, 2};
        int diff = 3;
        List<Pair> pairs = findPairs(arr, diff);
        for (Pair pair : pairs) {
            System.out.println(pair.x + " " + pair.y);
        }
    }

    public static List<Pair> findPairs(int[] arr, int diff) {
        List<Pair> pairs = new ArrayList<>();
        for (int i = 0; i < arr.length; i++) {
            for (int j = i + 1; j < arr.length; j++) {
                if (Math.abs(arr[i] - arr[j]) < diff) {
                    pairs.add(new Pair(arr[i], arr[j]));
                }
            }
        }
        return pairs;
    }
}

class Pair {
    int x;
    int y;

    public Pair(int x, int y) {
        this.x = x;
        this.y = y;
    }
}
```""",
                None,
            ],
            [
                """Generate a Java code block from the following explanation:

The code in the Main class finds all pairs in an array whose absolute difference is less than a given value.

The findPairs method takes two arguments: an array of integers and a difference value. It iterates over the array and compares each element to every other element in the array. If the absolute difference between the two elements is less than the difference value, a new Pair object is created and added to a list.

The Pair class is a simple data structure that stores two integers.

The main method creates an array of integers, initializes the difference value, and calls the findPairs method to find all pairs in the array. Finally, the code iterates over the list of pairs and prints each pair to the console.""",  # noqa: E501
                None,
            ],
            ["What is in this image?", "https://raw.githubusercontent.com/gradio-app/gradio/main/test/test_files/bus.png"],  # Vision example
        ],
        inputs=[text_input, image_input],
        example_labels=[
            "Explain quantum computing",
            "What is OpenShift?",
            "Importance of low latency inference",
            "Boosting productivity habits",
            "Explain and document your code",
            "Generate Java Code",
            "Vision Example: What is in this image?",
        ],
        cache_examples=False,
    )


if __name__ == "__main__":
    demo.queue().launch()