|
import asyncio |
|
import gradio as gr |
|
from mcp import ClientSession |
|
from mcp.client.sse import sse_client |
|
from langchain_mcp_adapters.tools import load_mcp_tools |
|
from langgraph.prebuilt import create_react_agent |
|
import traceback |
|
from typing import List, Tuple, Optional |
|
from pprint import pprint |
|
|
|
class MCPChatbot: |
|
def __init__(self): |
|
self.session: Optional[ClientSession] = None |
|
self.agent = None |
|
self.tools = None |
|
self.sse_url = "http://127.0.0.1:7860/gradio_api/mcp/sse" |
|
self.is_initialized = False |
|
|
|
async def initialize(self): |
|
"""MCP μ°κ²° λ° μμ΄μ νΈ μ΄κΈ°ν""" |
|
try: |
|
|
|
self.sse_client_context = sse_client(self.sse_url) |
|
read, write = await self.sse_client_context.__aenter__() |
|
|
|
|
|
self.session_context = ClientSession(read, write) |
|
self.session = await self.session_context.__aenter__() |
|
|
|
|
|
await self.session.initialize() |
|
|
|
|
|
self.tools = await load_mcp_tools(self.session) |
|
tool_names = [tool.name for tool in self.tools] |
|
print(f"λ‘λλ λꡬλ€: {tool_names}") |
|
|
|
|
|
self.agent = create_react_agent("openai:gpt-4o", self.tools) |
|
self.is_initialized = True |
|
|
|
return f"β
μ΄κΈ°ν μλ£! μ¬μ© κ°λ₯ν λꡬ: {', '.join(tool_names)}" |
|
|
|
except Exception as e: |
|
error_msg = f"β μ΄κΈ°ν μ€ν¨: {str(e)}\n{traceback.format_exc()}" |
|
print(error_msg) |
|
return error_msg |
|
|
|
async def cleanup(self): |
|
"""리μμ€ μ 리""" |
|
try: |
|
if hasattr(self, 'session_context') and self.session_context: |
|
await self.session_context.__aexit__(None, None, None) |
|
if hasattr(self, 'sse_client_context') and self.sse_client_context: |
|
await self.sse_client_context.__aexit__(None, None, None) |
|
self.is_initialized = False |
|
except Exception as e: |
|
print(f"μ 리 μ€ μ€λ₯: {e}") |
|
|
|
async def chat(self, message: str) -> str: |
|
"""λ©μμ§ μ²λ¦¬ λ° μλ΅ μμ±""" |
|
if not self.is_initialized: |
|
return "β λ¨Όμ 'μ΄κΈ°ν' λ²νΌμ ν΄λ¦ν΄μ£ΌμΈμ!" |
|
|
|
try: |
|
|
|
response = await self.agent.ainvoke({ |
|
"messages": [{"role": "user", "content": message}] |
|
}) |
|
pprint(response) |
|
|
|
|
|
if "messages" in response and response["messages"]: |
|
last_message = response["messages"][-1] |
|
if hasattr(last_message, 'content'): |
|
return last_message.content |
|
elif isinstance(last_message, dict) and 'content' in last_message: |
|
return last_message['content'] |
|
else: |
|
return str(last_message) |
|
else: |
|
return "μλ΅μ λ°μ§ λͺ»νμ΅λλ€." |
|
|
|
except Exception as e: |
|
error_msg = f"β μ€λ₯ λ°μ: {str(e)}\n{traceback.format_exc()}" |
|
print(error_msg) |
|
return error_msg |
|
|
|
|
|
chatbot = MCPChatbot() |
|
|
|
|
|
def initialize_chatbot(): |
|
"""μ±λ΄ μ΄κΈ°ν""" |
|
try: |
|
loop = asyncio.get_event_loop() |
|
except RuntimeError: |
|
loop = asyncio.new_event_loop() |
|
asyncio.set_event_loop(loop) |
|
|
|
return loop.run_until_complete(chatbot.initialize()) |
|
|
|
def process_message(message: str, history: List[Tuple[str, str]]) -> Tuple[List[Tuple[str, str]], str]: |
|
"""λ©μμ§ μ²λ¦¬""" |
|
try: |
|
loop = asyncio.get_event_loop() |
|
except RuntimeError: |
|
loop = asyncio.new_event_loop() |
|
asyncio.set_event_loop(loop) |
|
|
|
response = loop.run_until_complete(chatbot.chat(message)) |
|
|
|
|
|
history.append((message, response)) |
|
|
|
return history, "" |
|
|
|
def cleanup_chatbot(): |
|
"""μ±λ΄ μ 리""" |
|
try: |
|
loop = asyncio.get_event_loop() |
|
except RuntimeError: |
|
loop = asyncio.new_event_loop() |
|
asyncio.set_event_loop(loop) |
|
|
|
loop.run_until_complete(chatbot.cleanup()) |
|
return "μ 리 μλ£" |
|
|
|
|
|
def create_interface(): |
|
with gr.Blocks(title="MCP μ±λ΄", theme=gr.themes.Soft()) as demo: |
|
gr.Markdown("# π€ MCP λꡬ μ°λ μ±λ΄") |
|
gr.Markdown("MCP(Model Context Protocol) λꡬλ€μ μ¬μ©ν μ μλ AI μ±λ΄μ
λλ€.") |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=3): |
|
|
|
chatbot_interface = gr.Chatbot( |
|
label="λν", |
|
height=500, |
|
show_copy_button=True |
|
) |
|
|
|
with gr.Row(): |
|
msg_input = gr.Textbox( |
|
placeholder="λ©μμ§λ₯Ό μ
λ ₯νμΈμ... (μ: (3 + 5) x 12λ λμΌ?)", |
|
label="λ©μμ§", |
|
scale=4 |
|
) |
|
send_btn = gr.Button("μ μ‘", variant="primary", scale=1) |
|
|
|
|
|
gr.Examples( |
|
examples=[ |
|
"(3 + 5) x 12λ λμΌ?", |
|
"1234μ μμΈμλΆν΄λ₯Ό ν΄μ€", |
|
"μ€λ λ μ¨λ μ΄λ?", |
|
"μλ
νμΈμ!" |
|
], |
|
inputs=msg_input |
|
) |
|
|
|
with gr.Column(scale=1): |
|
|
|
gr.Markdown("### π οΈ μ μ΄ ν¨λ") |
|
|
|
init_btn = gr.Button("π μ΄κΈ°ν", variant="secondary") |
|
init_status = gr.Textbox( |
|
label="μ΄κΈ°ν μν", |
|
value="μ΄κΈ°νκ° νμν©λλ€", |
|
interactive=False |
|
) |
|
|
|
cleanup_btn = gr.Button("π§Ή μ 리", variant="secondary") |
|
cleanup_status = gr.Textbox( |
|
label="μ 리 μν", |
|
interactive=False |
|
) |
|
|
|
|
|
gr.Markdown(""" |
|
### π μ¬μ©λ² |
|
1. **μ΄κΈ°ν** λ²νΌμ ν΄λ¦νμ¬ MCP μλ²μ μ°κ²° |
|
2. μ΄κΈ°νκ° μλ£λλ©΄ λ©μμ§λ₯Ό μ
λ ₯νμ¬ λν μμ |
|
3. μν κ³μ°, λ μ¨ μ 보 λ± λ€μν μ§λ¬Έ κ°λ₯ |
|
4. μ¬μ© μλ£ ν **μ 리** λ²νΌμΌλ‘ μ°κ²° μ’
λ£ |
|
|
|
### π§ μ¬μ© κ°λ₯ν κΈ°λ₯ |
|
- μν κ³μ° λ° μμΈμλΆν΄ |
|
- λ μ¨ μ 보 μ‘°ν |
|
- μ΄λ―Έμ§ μ²λ¦¬ (λ°©ν₯ νμΈ, μΈνΌμ νν°) |
|
- κΈ°ν MCP λκ΅¬λ€ |
|
""") |
|
|
|
|
|
init_btn.click( |
|
initialize_chatbot, |
|
outputs=init_status |
|
) |
|
|
|
cleanup_btn.click( |
|
cleanup_chatbot, |
|
outputs=cleanup_status |
|
) |
|
|
|
|
|
def handle_submit(message, history): |
|
if not message.strip(): |
|
return history, "" |
|
return process_message(message, history) |
|
|
|
send_btn.click( |
|
handle_submit, |
|
inputs=[msg_input, chatbot_interface], |
|
outputs=[chatbot_interface, msg_input] |
|
) |
|
|
|
msg_input.submit( |
|
handle_submit, |
|
inputs=[msg_input, chatbot_interface], |
|
outputs=[chatbot_interface, msg_input] |
|
) |
|
|
|
return demo |
|
|
|
|
|
if __name__ == "__main__": |
|
demo = create_interface() |
|
demo.launch( |
|
share=False, |
|
) |