|
import os |
|
from dotenv import load_dotenv |
|
import gradio as gr |
|
from huggingface_hub import InferenceClient |
|
import pandas as pd |
|
from typing import List, Tuple |
|
import json |
|
from datetime import datetime |
|
from datasets import load_dataset |
|
|
|
try: |
|
medical_datasets = { |
|
'all_processed': load_dataset("lavita/medical-qa-datasets", "all-processed"), |
|
'icliniq': load_dataset("lavita/medical-qa-datasets", "chatdoctor-icliniq"), |
|
'healthcaremagic': load_dataset("lavita/medical-qa-datasets", "chatdoctor_healthcaremagic") |
|
} |
|
print("μλ£ λ°μ΄ν°μ
λ‘λ μλ£") |
|
except Exception as e: |
|
print(f"μλ£ λ°μ΄ν°μ
λ‘λ μ€ν¨: {e}") |
|
medical_datasets = None |
|
|
|
|
|
HF_TOKEN = os.getenv("HF_TOKEN") |
|
|
|
|
|
LLM_MODELS = { |
|
"Cohere c4ai-crp-08-2024": "CohereForAI/c4ai-command-r-plus-08-2024", |
|
"Meta Llama3.3-70B": "meta-llama/Llama-3.3-70B-Instruct" |
|
} |
|
|
|
class ChatHistory: |
|
def __init__(self): |
|
self.history = [] |
|
self.history_file = "/tmp/chat_history.json" |
|
self.load_history() |
|
|
|
def add_conversation(self, user_msg: str, assistant_msg: str): |
|
conversation = { |
|
"timestamp": datetime.now().isoformat(), |
|
"messages": [ |
|
{"role": "user", "content": user_msg}, |
|
{"role": "assistant", "content": assistant_msg} |
|
] |
|
} |
|
self.history.append(conversation) |
|
self.save_history() |
|
|
|
def format_for_display(self): |
|
|
|
formatted = [] |
|
for conv in self.history: |
|
formatted.append([ |
|
conv["messages"][0]["content"], |
|
conv["messages"][1]["content"] |
|
]) |
|
return formatted |
|
|
|
def get_messages_for_api(self): |
|
|
|
messages = [] |
|
for conv in self.history: |
|
messages.extend([ |
|
{"role": "user", "content": conv["messages"][0]["content"]}, |
|
{"role": "assistant", "content": conv["messages"][1]["content"]} |
|
]) |
|
return messages |
|
|
|
def clear_history(self): |
|
self.history = [] |
|
self.save_history() |
|
|
|
def save_history(self): |
|
try: |
|
with open(self.history_file, 'w', encoding='utf-8') as f: |
|
json.dump(self.history, f, ensure_ascii=False, indent=2) |
|
except Exception as e: |
|
print(f"νμ€ν 리 μ μ₯ μ€ν¨: {e}") |
|
|
|
def load_history(self): |
|
try: |
|
if os.path.exists(self.history_file): |
|
with open(self.history_file, 'r', encoding='utf-8') as f: |
|
self.history = json.load(f) |
|
except Exception as e: |
|
print(f"νμ€ν 리 λ‘λ μ€ν¨: {e}") |
|
self.history = [] |
|
|
|
|
|
|
|
chat_history = ChatHistory() |
|
|
|
def get_client(model_name="Cohere c4ai-crp-08-2024"): |
|
try: |
|
return InferenceClient(LLM_MODELS[model_name], token=HF_TOKEN) |
|
except Exception: |
|
return InferenceClient(LLM_MODELS["Meta Llama3.3-70B"], token=HF_TOKEN) |
|
|
|
def analyze_file_content(content, file_type): |
|
"""Analyze file content and return structural summary""" |
|
if file_type in ['parquet', 'csv']: |
|
try: |
|
lines = content.split('\n') |
|
header = lines[0] |
|
columns = header.count('|') - 1 |
|
rows = len(lines) - 3 |
|
return f"π λ°μ΄ν°μ
ꡬ쑰: {columns}κ° μ»¬λΌ, {rows}κ° λ°μ΄ν°" |
|
except: |
|
return "β λ°μ΄ν°μ
ꡬ쑰 λΆμ μ€ν¨" |
|
|
|
lines = content.split('\n') |
|
total_lines = len(lines) |
|
non_empty_lines = len([line for line in lines if line.strip()]) |
|
|
|
if any(keyword in content.lower() for keyword in ['def ', 'class ', 'import ', 'function']): |
|
functions = len([line for line in lines if 'def ' in line]) |
|
classes = len([line for line in lines if 'class ' in line]) |
|
imports = len([line for line in lines if 'import ' in line or 'from ' in line]) |
|
return f"π» μ½λ ꡬ쑰: {total_lines}μ€ (ν¨μ: {functions}, ν΄λμ€: {classes}, μν¬νΈ: {imports})" |
|
|
|
paragraphs = content.count('\n\n') + 1 |
|
words = len(content.split()) |
|
return f"π λ¬Έμ ꡬ쑰: {total_lines}μ€, {paragraphs}λ¨λ½, μ½ {words}λ¨μ΄" |
|
|
|
def read_uploaded_file(file): |
|
if file is None: |
|
return "", "" |
|
try: |
|
file_ext = os.path.splitext(file.name)[1].lower() |
|
|
|
if file_ext == '.parquet': |
|
df = pd.read_parquet(file.name, engine='pyarrow') |
|
content = df.head(10).to_markdown(index=False) |
|
return content, "parquet" |
|
elif file_ext == '.csv': |
|
encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1'] |
|
for encoding in encodings: |
|
try: |
|
df = pd.read_csv(file.name, encoding=encoding) |
|
content = f"π λ°μ΄ν° 미리보기:\n{df.head(10).to_markdown(index=False)}\n\n" |
|
content += f"\nπ λ°μ΄ν° μ 보:\n" |
|
content += f"- μ 체 ν μ: {len(df)}\n" |
|
content += f"- μ 체 μ΄ μ: {len(df.columns)}\n" |
|
content += f"- μ»¬λΌ λͺ©λ‘: {', '.join(df.columns)}\n" |
|
content += f"\nπ μ»¬λΌ λ°μ΄ν° νμ
:\n" |
|
for col, dtype in df.dtypes.items(): |
|
content += f"- {col}: {dtype}\n" |
|
null_counts = df.isnull().sum() |
|
if null_counts.any(): |
|
content += f"\nβ οΈ κ²°μΈ‘μΉ:\n" |
|
for col, null_count in null_counts[null_counts > 0].items(): |
|
content += f"- {col}: {null_count}κ° λλ½\n" |
|
return content, "csv" |
|
except UnicodeDecodeError: |
|
continue |
|
raise UnicodeDecodeError(f"β μ§μλλ μΈμ½λ©μΌλ‘ νμΌμ μ½μ μ μμ΅λλ€ ({', '.join(encodings)})") |
|
else: |
|
encodings = ['utf-8', 'cp949', 'euc-kr', 'latin1'] |
|
for encoding in encodings: |
|
try: |
|
with open(file.name, 'r', encoding=encoding) as f: |
|
content = f.read() |
|
return content, "text" |
|
except UnicodeDecodeError: |
|
continue |
|
raise UnicodeDecodeError(f"β μ§μλλ μΈμ½λ©μΌλ‘ νμΌμ μ½μ μ μμ΅λλ€ ({', '.join(encodings)})") |
|
except Exception as e: |
|
return f"β νμΌ μ½κΈ° μ€λ₯: {str(e)}", "error" |
|
|
|
def get_medical_context(query): |
|
"""μλ£ λ°μ΄ν°μ
μμ κ΄λ ¨ μ 보 κ²μ""" |
|
if medical_datasets is None: |
|
return "" |
|
|
|
try: |
|
relevant_info = [] |
|
|
|
|
|
for dataset_name, dataset in medical_datasets.items(): |
|
for item in dataset['train']: |
|
|
|
if 'question' in item and query.lower() in item['question'].lower(): |
|
relevant_info.append(f"Q: {item['question']}\nA: {item['answer']}") |
|
elif 'answer' in item and query.lower() in item['answer'].lower(): |
|
relevant_info.append(f"Q: {item['question']}\nA: {item['answer']}") |
|
|
|
if len(relevant_info) >= 3: |
|
break |
|
|
|
if relevant_info: |
|
return "\n\nμλ£ μ°Έκ³ μ 보:\n" + "\n---\n".join(relevant_info[:3]) |
|
return "" |
|
except Exception as e: |
|
print(f"μλ£ λ°μ΄ν° κ²μ μ€λ₯: {e}") |
|
return "" |
|
|
|
|
|
SYSTEM_PREFIX = """μ λ μν μ λ¬Έ AI μ΄μμ€ν΄νΈ 'GiniGEN Medical'μ
λλ€. |
|
μ λ¬Έ μλ£ λ°μ΄ν°λ² μ΄μ€λ₯Ό κΈ°λ°μΌλ‘ λ€μκ³Ό κ°μ μ λ¬Έμ±μ κ°μ§κ³ μν΅νκ² μ΅λλ€: |
|
|
|
1. π₯ μΌλ°μ μΈ μν μ 보 μ 곡 |
|
2. π¬ μ¦μ λ° μ§λ³ κ΄λ ¨ μ€λͺ
|
|
3. π§¬ κ±΄κ° κ΄λ¦¬ μ‘°μΈ |
|
4. π μν μ°κ΅¬ λ°μ΄ν° ν΄μ |
|
5. βοΈ μλ°© μν μ 보 |
|
|
|
λ€μ μμΉμΌλ‘ μν΅νκ² μ΅λλ€: |
|
1. π€ μ λ’°ν μ μλ μν μ 보 μ 곡 |
|
2. π‘ μ΄ν΄νκΈ° μ¬μ΄ μν μ€λͺ
|
|
3. π― κ°μΈλ³ λ§μΆ€ κ±΄κ° μ 보 |
|
4. β οΈ μλ£ λ©΄μ±
μ‘°ν μ€μ |
|
5. β¨ κ³Όνμ κ·Όκ±° κΈ°λ° μ‘°μΈ |
|
|
|
μ€μ κ³ μ§μ¬ν: |
|
- μ΄λ μΌλ°μ μΈ μ 보 μ 곡 λͺ©μ μ΄λ©°, μ λ¬Έ μλ£ μλ΄μ λ체ν μ μμ΅λλ€. |
|
- κΈ΄κΈν μλ£ μν©μ΄λ μ¬κ°ν μ¦μμ κ²½μ° μ¦μ μλ£μ§μ μ°Ύμμ£ΌμΈμ. |
|
- λͺ¨λ μΉλ£ κ²°μ μ λ°λμ λ΄λΉ μλ£μ§κ³Ό μλ΄ ν κ²°μ νμκΈ° λ°λλλ€.""" |
|
|
|
def chat(message, history, uploaded_file, system_message="", max_tokens=4000, temperature=0.7, top_p=0.9): |
|
if not message: |
|
return "", history |
|
|
|
try: |
|
|
|
pharmkg_context = get_medical_context(message) |
|
|
|
system_message = SYSTEM_PREFIX + system_message + pharmkg_context |
|
|
|
|
|
if uploaded_file: |
|
content, file_type = read_uploaded_file(uploaded_file) |
|
if file_type == "error": |
|
error_message = content |
|
chat_history.add_conversation(message, error_message) |
|
return "", history + [[message, error_message]] |
|
|
|
file_summary = analyze_file_content(content, file_type) |
|
|
|
if file_type in ['parquet', 'csv']: |
|
system_message += f"\n\nνμΌ λ΄μ©:\n```markdown\n{content}\n```" |
|
else: |
|
system_message += f"\n\nνμΌ λ΄μ©:\n```\n{content}\n```" |
|
|
|
if message == "νμΌ λΆμμ μμν©λλ€...": |
|
message = f"""[νμΌ κ΅¬μ‘° λΆμ] {file_summary} |
|
λ€μ κ΄μ μμ λμμ λλ¦¬κ² μ΅λλ€: |
|
1. π μ λ°μ μΈ λ΄μ© νμ
|
|
2. π‘ μ£Όμ νΉμ§ μ€λͺ
|
|
3. π― μ€μ©μ μΈ νμ© λ°©μ |
|
4. β¨ κ°μ μ μ |
|
5. π¬ μΆκ° μ§λ¬Έμ΄λ νμν μ€λͺ
""" |
|
|
|
|
|
messages = [{"role": "system", "content": system_message}] |
|
|
|
|
|
if history: |
|
for user_msg, assistant_msg in history: |
|
messages.append({"role": "user", "content": user_msg}) |
|
messages.append({"role": "assistant", "content": assistant_msg}) |
|
|
|
messages.append({"role": "user", "content": message}) |
|
|
|
|
|
client = get_client() |
|
partial_message = "" |
|
|
|
for msg in client.chat_completion( |
|
messages, |
|
max_tokens=max_tokens, |
|
stream=True, |
|
temperature=temperature, |
|
top_p=top_p, |
|
): |
|
token = msg.choices[0].delta.get('content', None) |
|
if token: |
|
partial_message += token |
|
current_history = history + [[message, partial_message]] |
|
yield "", current_history |
|
|
|
|
|
chat_history.add_conversation(message, partial_message) |
|
|
|
except Exception as e: |
|
error_msg = f"β μ€λ₯κ° λ°μνμ΅λλ€: {str(e)}" |
|
chat_history.add_conversation(message, error_msg) |
|
yield "", history + [[message, error_msg]] |
|
|
|
with gr.Blocks(theme="Yntec/HaleyCH_Theme_Orange", title="GiniGEN π€") as demo: |
|
|
|
initial_history = chat_history.format_for_display() |
|
with gr.Row(): |
|
with gr.Column(scale=2): |
|
chatbot = gr.Chatbot( |
|
value=initial_history, |
|
height=600, |
|
label="λνμ°½ π¬", |
|
show_label=True |
|
) |
|
|
|
|
|
msg = gr.Textbox( |
|
label="λ©μμ§ μ
λ ₯", |
|
show_label=False, |
|
placeholder="무μμ΄λ λ¬Όμ΄λ³΄μΈμ... π", |
|
container=False |
|
) |
|
with gr.Row(): |
|
clear = gr.ClearButton([msg, chatbot], value="λνλ΄μ© μ§μ°κΈ°") |
|
send = gr.Button("보λ΄κΈ° π€") |
|
|
|
with gr.Column(scale=1): |
|
gr.Markdown("### GiniGEN Medi π€ [νμΌ μ
λ‘λ] π\nμ§μ νμ: ν
μ€νΈ, μ½λ, CSV, Parquet νμΌ") |
|
file_upload = gr.File( |
|
label="νμΌ μ ν", |
|
file_types=["text", ".csv", ".parquet"], |
|
type="filepath" |
|
) |
|
|
|
with gr.Accordion("κ³ κΈ μ€μ βοΈ", open=False): |
|
system_message = gr.Textbox(label="μμ€ν
λ©μμ§ π", value="") |
|
max_tokens = gr.Slider(minimum=1, maximum=8000, value=4000, label="μ΅λ ν ν° μ π") |
|
temperature = gr.Slider(minimum=0, maximum=1, value=0.7, label="μ°½μμ± μμ€ π‘οΈ") |
|
top_p = gr.Slider(minimum=0, maximum=1, value=0.9, label="μλ΅ λ€μμ± π") |
|
|
|
|
|
gr.Examples( |
|
examples=[ |
|
["μΌλ°μ μΈ κ±΄κ° κ΄λ¦¬ μ‘°μΈμ ν΄μ£ΌμΈμ. π₯"], |
|
["κ³ νμ μ¦μμ λν΄ μ€λͺ
ν΄μ£ΌμΈμ. π¬"], |
|
["건κ°ν μνμ΅κ΄μ λν΄ μλ €μ£ΌμΈμ. πͺ"], |
|
["μ½λ‘λ19 μλ°©μμΉμ μλ €μ£ΌμΈμ. π¦ "], |
|
["μ€νΈλ μ€ κ΄λ¦¬ λ°©λ²μ μΆμ²ν΄μ£ΌμΈμ. π§ββοΈ"], |
|
], |
|
inputs=msg, |
|
) |
|
|
|
def clear_chat(): |
|
chat_history.clear_history() |
|
return None, None |
|
|
|
|
|
msg.submit( |
|
chat, |
|
inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p], |
|
outputs=[msg, chatbot] |
|
) |
|
|
|
send.click( |
|
chat, |
|
inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p], |
|
outputs=[msg, chatbot] |
|
) |
|
|
|
clear.click( |
|
clear_chat, |
|
outputs=[msg, chatbot] |
|
) |
|
|
|
|
|
file_upload.change( |
|
lambda: "νμΌ λΆμμ μμν©λλ€...", |
|
outputs=msg |
|
).then( |
|
chat, |
|
inputs=[msg, chatbot, file_upload, system_message, max_tokens, temperature, top_p], |
|
outputs=[msg, chatbot] |
|
) |
|
|
|
if __name__ == "__main__": |
|
demo.launch() |