# Streamlit chat application backed by the Together API (DeepSeek-R1 model).
import streamlit as st
from together import Together
import os
from typing import Iterator
from PIL import Image
import base64
from PyPDF2 import PdfReader
import json  # added for debugging output
# Fail fast at startup if the Together API key is not configured.
API_KEY = os.getenv("TOGETHER_API_KEY")
if API_KEY is None or API_KEY == "":
    raise ValueError("API key is missing! Make sure TOGETHER_API_KEY is set in the Secrets.")
def get_client():
    """Return a Together API client authenticated with the module-level key."""
    client = Together(api_key=API_KEY)
    return client
def process_file(file) -> str:
    """Extract textual content from an uploaded file.

    PDFs are read page by page with PyPDF2; images are returned as a
    base64-encoded string; every other type is decoded as UTF-8 text.

    Args:
        file: A Streamlit UploadedFile-like object exposing ``.type`` and
            ``.getvalue()``, or None.

    Returns:
        The extracted content, or "" when ``file`` is None or an error
        occurs (the error is reported to the UI via ``st.error``).
    """
    if file is None:
        return ""
    try:
        if file.type == "application/pdf":
            pdf_reader = PdfReader(file)
            # extract_text() may return None for pages without a text layer;
            # the original `text += page.extract_text() + "\n"` would raise
            # TypeError there — guard with `or ""`. join() also avoids the
            # quadratic string concatenation.
            return "".join(
                (page.extract_text() or "") + "\n" for page in pdf_reader.pages
            )
        elif file.type.startswith("image/"):
            return base64.b64encode(file.getvalue()).decode("utf-8")
        else:
            return file.getvalue().decode('utf-8')
    except Exception as e:
        st.error(f"νμΌ μ²λ¦¬ μ€ μ€λ₯ λ°μ: {str(e)}")
        return ""
def format_message(role: str, content: str) -> dict:
    """Build a single chat message dict in the API's expected format."""
    return {"role": role, "content": content}
def get_formatted_history(messages: list) -> list:
    """Convert stored chat history into API-formatted message dicts.

    Entries that are not dicts carrying both "role" and "content" are
    skipped. Non-standard roles are normalized: "human" becomes "user",
    any other unknown role becomes "assistant".
    """
    known_roles = ("system", "user", "assistant")
    formatted = []
    for entry in messages:
        if not isinstance(entry, dict) or "role" not in entry or "content" not in entry:
            continue
        role = entry["role"]
        if role not in known_roles:
            role = "user" if role == "human" else "assistant"
        formatted.append({"role": role, "content": entry["content"]})
    return formatted
def generate_response(
    message: str,
    history: list,
    system_message: str,
    max_tokens: int,
    temperature: float,
    top_p: float,
    files=None
) -> Iterator[str]:
    """Stream an assistant reply for the current conversation.

    Args:
        message: The latest user prompt. NOTE: it is already present in
            ``history`` (the caller appends it first), so it is not added
            to the request again here.
        history: Full conversation history from session state.
        system_message: Optional system prompt; skipped when blank.
        max_tokens: Maximum tokens to generate.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling parameter.
        files: Optional iterable of uploaded files whose extracted content
            is folded into the trailing user message.

    Yields:
        Chunks of the streamed response text, or a user-facing error
        message when the API call fails.
    """
    client = get_client()
    try:
        # Build the message array for the API request.
        messages = []
        if system_message.strip():
            messages.append(format_message("system", system_message))
        # History already includes the current user message.
        formatted_history = get_formatted_history(history)
        # Attach extracted file contents to the last user message (or as a
        # new user message if the history doesn't end with one).
        if files:
            file_contents = []
            for file in files:
                content = process_file(file)
                if content:
                    file_contents.append(f"νμΌ λ΄μ©:\n{content}")
            if file_contents:
                if formatted_history and formatted_history[-1]["role"] == "user":
                    formatted_history[-1]["content"] += "\n\n" + "\n\n".join(file_contents)
                else:
                    formatted_history.append(format_message("user", "\n\n".join(file_contents)))
        messages.extend(formatted_history)
        # Removed leftover debug st.write() that dumped the entire API request
        # (full conversation + file contents) into the user-facing UI.
        try:
            stream = client.chat.completions.create(
                model="deepseek-ai/DeepSeek-R1",
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                stream=True
            )
            for chunk in stream:
                # Some stream events carry no content (e.g. role headers).
                if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content:
                    yield chunk.choices[0].delta.content
        except Exception as e:
            if "rate limit" in str(e).lower():
                yield "API νΈμΆ νλμ λλ¬νμ΅λλ€. μ μ ν λ€μ μλν΄μ£ΌμΈμ."
            else:
                st.error(f"API μ€λ₯ μμΈ: {str(e)}")
                yield "μ£μ‘ν©λλ€. μ μ ν λ€μ μλν΄μ£ΌμΈμ."
    except Exception as e:
        st.error(f"μ 체 μ€λ₯ μμΈ: {str(e)}")
        yield "μ€λ₯κ° λ°μνμ΅λλ€. μ μ ν λ€μ μλν΄μ£ΌμΈμ."
def main():
    """Render the Streamlit chat UI and drive one conversation turn per rerun."""
    st.set_page_config(page_title="DeepSeek μ±ν ", page_icon="π", layout="wide")

    # Conversation history persists across Streamlit reruns via session state.
    if "messages" not in st.session_state:
        st.session_state.messages = []

    st.title("DeepSeek μ±ν ")
    st.markdown("DeepSeek AI λͺ¨λΈκ³Ό λννμΈμ. νμν κ²½μ° νμΌμ μ λ‘λν μ μμ΅λλ€.")

    # Sidebar: sampling settings and optional file uploads.
    with st.sidebar:
        st.header("μ€μ ")
        system_message = st.text_area(
            "μμ€ν λ©μμ§",
            value="λΉμ μ κΉμ΄ μκ² μκ°νλ AIμ λλ€. λ¬Έμ λ₯Ό κΉμ΄ κ³ λ €νκ³ μ²΄κ³μ μΈ μΆλ‘ κ³Όμ μ ν΅ν΄ μ¬λ°λ₯Έ ν΄κ²°μ± μ λμΆνμΈμ. λ°λμ νκΈλ‘ λ΅λ³νμΈμ.",
            height=100
        )
        max_tokens = st.slider("μ΅λ ν ν° μ", 1, 4096, 2048)
        temperature = st.slider("μ¨λ", 0.0, 2.0, 0.7, 0.1)
        top_p = st.slider("Top-p", 0.0, 1.0, 0.7, 0.1)
        uploaded_file = st.file_uploader(
            "νμΌ μ λ‘λ (μ νμ¬ν)",
            type=['txt', 'py', 'md', 'pdf', 'png', 'jpg', 'jpeg'],
            accept_multiple_files=True
        )

    # Replay the stored conversation.
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])

    user_input = st.chat_input("무μμ μκ³ μΆμΌμ κ°μ?")
    if user_input:
        # Record the user turn once, then echo it.
        st.session_state.messages.append(format_message("user", user_input))
        with st.chat_message("user"):
            st.markdown(user_input)

        # Stream the assistant reply, updating a placeholder as chunks arrive.
        with st.chat_message("assistant"):
            placeholder = st.empty()
            reply = ""
            chunk_iter = generate_response(
                user_input,
                st.session_state.messages,
                system_message,
                max_tokens,
                temperature,
                top_p,
                uploaded_file
            )
            for piece in chunk_iter:
                reply += piece
                placeholder.markdown(reply + "β")
            placeholder.markdown(reply)
            # Persist the completed assistant turn.
            st.session_state.messages.append(format_message("assistant", reply))
# Script entry point.
if __name__ == "__main__":
    main()