Spaces:
Running
Running
import streamlit as st | |
from huggingface_hub import InferenceClient | |
import os | |
from typing import Iterator | |
from PIL import Image | |
import pytesseract | |
from PyPDF2 import PdfReader | |
import base64 | |
from together import Together | |
API_KEY = os.getenv("TOGETHER_API_KEY") | |
if not API_KEY: | |
raise ValueError("API key is missing! Make sure TOGETHER_API_KEY is set in the Secrets.") | |
# Initialize the client with Together AI provider | |
def get_client(): | |
#return InferenceClient( | |
# provider="together", | |
# api_key=API_KEY | |
#) | |
return Together(api_key=API_KEY) # Use Together.ai's official client | |
def process_file(file) -> str: | |
"""Process uploaded file and return its content""" | |
if file is None: | |
return "" | |
try: | |
# Handle PDF files | |
if file.type == "application/pdf": | |
text = "" | |
pdf_reader = PdfReader(file) | |
for page in pdf_reader.pages: | |
page_text = page.extract_text() | |
if page_text: | |
text += page_text + "\n" | |
return text | |
# Handle image files | |
elif file.type.startswith("image/"): | |
return base64.b64encode(file.getvalue()).decode("utf-8") | |
# Handle text files | |
else: | |
return file.getvalue().decode('utf-8') | |
except Exception as e: | |
return f"Error processing file: {str(e)}" | |
def generate_response( | |
message: str, | |
history: list[tuple[str, str]], | |
system_message: str, | |
max_tokens: int, | |
temperature: float, | |
top_p: float, | |
files=None | |
) -> Iterator[str]: | |
client = get_client() | |
has_images = False | |
content_blocks = [] | |
image_content = None # To store image data | |
image_mime_type = None # To store MIME type | |
if files: | |
for file in files: | |
content = process_file(file) | |
if file.type.startswith("image/"): | |
has_images = True | |
image_content = content # Already base64 encoded | |
image_mime_type = file.type # Store MIME type | |
else: | |
content_blocks.append({ | |
"type": "text", | |
"text": f"File content:\n{content}" | |
}) | |
# Build messages | |
messages = [{"role": "system", "content": system_message}] | |
# Add history | |
for user_msg, assistant_msg in history: | |
messages.append({"role": "user", "content": user_msg}) | |
messages.append({"role": "assistant", "content": assistant_msg}) | |
try: | |
if has_images: | |
# Vision model request | |
vision_messages = [{ | |
"role": "user", | |
"content": [ | |
{"type": "text", "text": message}, | |
{ | |
"type": "image_url", | |
"image_url": { | |
"url": f"data:{image_mime_type};base64,{image_content}", | |
}, | |
}, | |
] | |
}] | |
stream = client.chat.completions.create( | |
model="meta-llama/Llama-3.2-11B-Vision-Instruct-Turbo", | |
messages=vision_messages, | |
stream=True, | |
) | |
else: | |
# Text-only model request | |
current_message = { | |
"role": "user", | |
"content": [{"type": "text", "text": message}] + content_blocks | |
} | |
messages.append(current_message) | |
stream = client.chat.completions.create( | |
model="deepseek-ai/DeepSeek-R1", | |
messages=messages, | |
max_tokens=max_tokens, | |
temperature=temperature, | |
top_p=top_p, | |
stream=True | |
) | |
# Stream response | |
for chunk in stream: | |
if chunk.choices and chunk.choices[0].delta.content: | |
yield chunk.choices[0].delta.content | |
except Exception as e: | |
yield f"Error: {str(e)}" | |
def main(): | |
st.set_page_config(page_title="DeepSeek Chat", page_icon="💭", layout="wide") | |
# Initialize session state for chat history | |
if "messages" not in st.session_state: | |
st.session_state.messages = [] | |
st.title("DeepSeek Chat with File Upload") | |
st.markdown("Chat with DeepSeek AI model. You can optionally upload files for the model to analyze.") | |
# Sidebar for parameters | |
with st.sidebar: | |
st.header("Settings") | |
system_message = st.text_area( | |
"System Message", | |
value="You are a friendly Chatbot.", | |
height=100 | |
) | |
max_tokens = st.slider( | |
"Max Tokens", | |
min_value=1, | |
max_value=8192, | |
value=8192, | |
step=1 | |
) | |
temperature = st.slider( | |
"Temperature", | |
min_value=0.1, | |
max_value=4.0, | |
value=0.0, | |
step=0.1 | |
) | |
top_p = st.slider( | |
"Top-p (nucleus sampling)", | |
min_value=0.1, | |
max_value=1.0, | |
value=0.95, | |
step=0.05 | |
) | |
uploaded_file = st.file_uploader( | |
"Upload File (optional)", | |
type=['txt', 'py', 'md', 'swift', 'java', 'js', 'ts', 'rb', 'go', | |
'php', 'c', 'cpp', 'h', 'hpp', 'cs', 'html', 'css', 'kt', 'svelte', | |
'pdf', 'png', 'jpg', 'jpeg'], # Added file types | |
accept_multiple_files=True | |
) | |
# Display chat messages | |
for message in st.session_state.messages: | |
with st.chat_message(message["role"]): | |
st.write(message["content"]) | |
# Chat input | |
if prompt := st.chat_input("What would you like to know?"): | |
# Display user message | |
st.session_state.messages.append({"role": "user", "content": prompt}) | |
with st.chat_message("user"): | |
st.write(prompt) | |
# Generate and display assistant response | |
with st.chat_message("assistant"): | |
response_placeholder = st.empty() | |
full_response = "" | |
# Get message history for context | |
history = [(msg["content"], next_msg["content"]) | |
for msg, next_msg in zip(st.session_state.messages[::2], st.session_state.messages[1::2])] | |
# Stream the response | |
for response_chunk in generate_response( | |
prompt, | |
history, | |
system_message, | |
max_tokens, | |
temperature, | |
top_p, | |
uploaded_file | |
): | |
full_response += response_chunk | |
print(full_response) | |
response_placeholder.markdown(full_response + "▌") | |
response_placeholder.markdown(full_response) | |
# Add assistant response to chat history | |
st.session_state.messages.append({"role": "assistant", "content": full_response}) | |
if __name__ == "__main__": | |
main() |