# Hugging Face Spaces page header captured with the source (status: Running);
# not part of the program.
import html
import json
import os
import time
from urllib.parse import parse_qs, urlencode, urlparse

import requests
import streamlit as st
st.set_page_config(
    page_title="ViBidLQA - Trợ lý AI hỗ trợ hỏi đáp luật Việt Nam",
    page_icon="./app/static/ai.jpg",
    layout="centered",
    initial_sidebar_state="collapsed",
)

# ==== OAuth ENVIRONMENT ====
# Facebook app settings come from environment variables. The first three have
# no default and are therefore None when the variable is not set.
FB_APP_ID = os.getenv("FB_APP_ID")
FB_APP_SECRET = os.getenv("FB_APP_SECRET")
FB_REDIRECT_URI = os.getenv("FB_REDIRECT_URI")
FB_CLIENT_URL = os.getenv("FB_CLIENT_URL", "https://www.facebook.com")
FB_API_URL = os.getenv("FB_API_URL", "https://graph.facebook.com")

# ==== MODULE URLs ====
# Base URLs of the backend services, read from Streamlit secrets.
routing_response_module = st.secrets["ViBidLQA_Routing_Module"]
retrieval_module = st.secrets["ViBidLQA_Retrieval_Module"]
reranker_module = st.secrets["ViBidLQA_Rerank_Module"]
abs_QA_module = st.secrets["ViBidLQA_AQA_Module"]

# Endpoints derived from the base URLs above.
url_api_question_classify_model = f"{routing_response_module}/query_classify"
url_api_unrelated_question_response_model = f"{routing_response_module}/response_unrelated_question"
url_api_introduce_system_model = f"{routing_response_module}/about_me"
url_api_retrieval_model = f"{retrieval_module}/search"
url_api_reranker_model = f"{reranker_module}/rerank"
url_api_generation_model = f"{abs_QA_module}/answer"

# Inject the custom stylesheet into the page. The encoding is stated
# explicitly so reading the file does not depend on the platform default.
with open("./static/styles.css", encoding="utf-8") as f:
    st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)
# =============================
# TAB 1: VIBIDLQA CHATBOT
# =============================
# `tab1` and `tab2` are used as containers in this file but were never
# created, which raises NameError on the first run. The labels are taken from
# the two section headers of this file.
tab1, tab2 = st.tabs(["ViBidLQA Chatbot", "Facebook OAuth"])

with tab1:
    # Seed the conversation with a greeting the first time a session runs.
    if "messages" not in st.session_state:
        st.session_state.messages = [
            {
                "role": "assistant",
                "content": "Xin chào. Tôi là trợ lý AI văn bản luật Đấu thầu Việt Nam được phát triển bởi Nguyễn Trường Phúc và các cộng sự. Rất vui khi được hỗ trợ bạn trong các vấn đề pháp lý tại Việt Nam!",
            }
        ]

    # Logo and title shown above the chat.
    st.markdown(
        "<div class=logo_area>\n"
        '<img src="./app/static/ai.jpg"/>\n'
        "</div>",
        unsafe_allow_html=True,
    )
    st.markdown("<h2 style='text-align: center;'>ViBidLQA</h2>", unsafe_allow_html=True)
def classify_question(question, timeout=60):
    """Ask the routing service to classify a user question.

    Args:
        question: The question text to classify.
        timeout: Seconds to wait for the service before giving up.

    Returns:
        The ``requests.Response`` object when the service replies with
        HTTP 200. Otherwise an error string starting with "Lỗi"; callers
        must check ``isinstance(result, str)`` before calling ``.json()``.
    """
    try:
        response = requests.post(
            url_api_question_classify_model,
            json={"question": question},
            timeout=timeout,
        )
    except requests.exceptions.RequestException as exc:
        # Report network failures in the same string form as HTTP errors.
        return f"Lỗi: {exc}"

    if response.status_code == 200:
        return response
    return f"Lỗi: {response.status_code} - {response.text}"
def introduce_system(question, timeout=60):
    """Request a streamed self-introduction answer from the routing service.

    Args:
        question: The user's question about the chatbot.
        timeout: Seconds to wait for the connection and for each chunk of
            the streamed body.

    Returns:
        The streaming ``requests.Response`` on HTTP 200, otherwise an error
        string starting with "Lỗi".
    """
    try:
        response = requests.post(
            url_api_introduce_system_model,
            json={"question": question},
            stream=True,
            timeout=timeout,
        )
    except requests.exceptions.RequestException as exc:
        # Report network failures in the same string form as HTTP errors.
        return f"Lỗi: {exc}"

    if response.status_code == 200:
        return response
    return f"Lỗi: {response.status_code} - {response.text}"
def response_unrelated_question(question, timeout=60):
    """Request a streamed reply for a question outside the bidding-law domain.

    Args:
        question: The user's question.
        timeout: Seconds to wait for the connection and for each chunk of
            the streamed body.

    Returns:
        The streaming ``requests.Response`` on HTTP 200, otherwise an error
        string starting with "Lỗi".
    """
    try:
        response = requests.post(
            url_api_unrelated_question_response_model,
            json={"question": question},
            stream=True,
            timeout=timeout,
        )
    except requests.exceptions.RequestException as exc:
        # Report network failures in the same string form as HTTP errors.
        return f"Lỗi: {exc}"

    if response.status_code == 200:
        return response
    return f"Lỗi: {response.status_code} - {response.text}"
def retrieve_context(question, top_k=10, timeout=60):
    """Fetch candidate passages for a question from the retrieval service.

    Args:
        question: The query text.
        top_k: Number of passages requested from the service.
        timeout: Seconds to wait for the service before giving up.

    Returns:
        The value stored under ``"results"`` in the JSON reply on HTTP 200,
        otherwise an error string starting with "Lỗi tại Retrieval Module".
    """
    payload = {"query": question, "top_k": top_k}
    try:
        response = requests.post(url_api_retrieval_model, json=payload, timeout=timeout)
    except requests.exceptions.RequestException as exc:
        # Report network failures in the same string form as HTTP errors.
        return f"Lỗi tại Retrieval Module: {exc}"

    if response.status_code == 200:
        return response.json()["results"]
    return f"Lỗi tại Retrieval Module: {response.status_code} - {response.text}"
def rerank_context(url_rerank_module, question, relevant_docs, top_k=5, timeout=60):
    """Re-order retrieved passages by relevance using the rerank service.

    Args:
        url_rerank_module: Full URL of the rerank endpoint.
        question: The query text.
        relevant_docs: Passages to be re-ranked.
        top_k: Number of passages requested back from the service.
        timeout: Seconds to wait for the service before giving up.

    Returns:
        The value stored under ``"reranked_docs"`` in the JSON reply on
        HTTP 200, otherwise an error string starting with
        "Lỗi tại Rerank module".
    """
    payload = {
        "question": question,
        "relevant_docs": relevant_docs,
        "top_k": top_k,
    }
    try:
        response = requests.post(url_rerank_module, json=payload, timeout=timeout)
    except requests.exceptions.RequestException as exc:
        # Report network failures in the same string form as HTTP errors.
        return f"Lỗi tại Rerank module: {exc}"

    if response.status_code == 200:
        return response.json()["reranked_docs"]
    return f"Lỗi tại Rerank module: {response.status_code} - {response.text}"
def get_abstractive_answer(question, timeout=60):
    """Run retrieve -> rerank -> generate and return the streamed answer.

    Args:
        question: The user's question.
        timeout: Seconds to wait for the generation service (connection and
            each chunk of the streamed body).

    Returns:
        The streaming ``requests.Response`` of the generation service on
        HTTP 200. Otherwise an error string starting with "Lỗi", including
        when an earlier stage failed or produced no context.
    """
    retrieved = retrieve_context(question=question)
    # Both helper stages signal failure by returning a string. Pass it on
    # instead of indexing into it as if it were a list of passages.
    if isinstance(retrieved, str):
        return retrieved
    passages = [item["text"] for item in retrieved]

    reranked = rerank_context(
        url_rerank_module=url_api_reranker_model,
        question=question,
        relevant_docs=passages,
        top_k=5,
    )
    if isinstance(reranked, str):
        return reranked
    if not reranked:
        # Nothing to ground the answer on; avoid an IndexError below.
        return "Lỗi: không tìm thấy ngữ cảnh phù hợp cho câu hỏi."

    # Only the top-ranked passage is sent to the generator.
    payload = {"context": reranked[0], "question": question}
    try:
        response = requests.post(
            url_api_generation_model,
            json=payload,
            stream=True,
            timeout=timeout,
        )
    except requests.exceptions.RequestException as exc:
        return f"Lỗi: {exc}"

    if response.status_code == 200:
        return response
    return f"Lỗi: {response.status_code} - {response.text}"
def generate_text_effect(answer, delay=0.03):
    """Yield growing prefixes of *answer*, one more word each step.

    The text is split on whitespace, so runs of spaces and newlines collapse
    to single spaces in the yielded strings.

    Args:
        answer: Full text to reveal.
        delay: Seconds to sleep before each yielded prefix.

    Yields:
        The first word, then the first two words joined by a space, and so
        on up to the whole text. Nothing is yielded for empty or
        whitespace-only input.
    """
    shown = []
    for word in answer.split():
        time.sleep(delay)
        shown.append(word)
        yield " ".join(shown)
# Replay the stored conversation inside the chatbot tab.
with tab1:
    for message in st.session_state.messages:
        if message["role"] == "assistant":
            avatar_class = "assistant-avatar"
            message_class = "assistant-message"
            avatar = "./app/static/ai.jpg"
            content = message["content"]
        else:
            avatar_class = ""
            message_class = "user-message"
            avatar = ""
            # User text is untrusted and is rendered with
            # unsafe_allow_html=True, so escape it to show typed markup
            # literally instead of injecting it into the page.
            content = html.escape(message["content"])

        # The markup is built without leading indentation so multi-line
        # content cannot turn it into a Markdown code block.
        st.markdown(
            f'<div class="{message_class}">\n'
            f'<img src="{avatar}" class="{avatar_class}" />\n'
            f'<div class="stMarkdown">{content}</div>\n'
            "</div>",
            unsafe_allow_html=True,
        )
def _assistant_html(text):
    """Return the assistant chat-bubble markup wrapping *text*."""
    return (
        '<div class="assistant-message">\n'
        '<img src="./app/static/ai.jpg" class="assistant-avatar" />\n'
        f'<div class="stMarkdown">{text}</div>\n'
        "</div>"
    )


def _iter_stream_tokens(response):
    """Yield the ``token`` values from the ``data: `` lines of a streamed reply."""
    for raw_line in response.iter_lines():
        if not raw_line:
            continue
        line = raw_line.decode("utf-8")
        if not line.startswith("data: "):
            continue
        payload = line[6:]
        if payload == "[DONE]":
            break
        try:
            data = json.loads(payload)
        except json.JSONDecodeError:
            # Skip a malformed chunk and keep streaming the rest.
            continue
        yield data.get("token", "")


def _render_answer(placeholder, answer):
    """Show *answer* in *placeholder* and return the text that was shown.

    *answer* is either an error string, shown as-is, or a streaming response
    whose tokens are appended one by one with a trailing cursor mark.
    """
    if isinstance(answer, str):
        placeholder.markdown(_assistant_html(answer), unsafe_allow_html=True)
        return answer

    text = ""
    for token in _iter_stream_tokens(answer):
        text += token
        placeholder.markdown(_assistant_html(f"{text}●"), unsafe_allow_html=True)
    return text


# Handle a newly submitted question inside the chatbot tab.
with tab1:
    if prompt := st.chat_input(placeholder='Tôi có thể giúp được gì cho bạn?'):
        # The prompt is untrusted and rendered with unsafe_allow_html=True,
        # so it is escaped for display. The raw text is what gets stored.
        st.markdown(
            '<div class="user-message">\n'
            f'<div class="stMarkdown">{html.escape(prompt)}</div>\n'
            "</div>",
            unsafe_allow_html=True,
        )
        st.session_state.messages.append({'role': 'user', 'content': prompt})

        message_placeholder = st.empty()

        classification = classify_question(question=prompt)
        if isinstance(classification, str):
            # classify_question returns an error string on failure; show it
            # instead of calling .json() on a str.
            answer = classification
        else:
            classify_result = classification.json()
            print(f"The type of user query: {classify_result}")
            if classify_result == "BIDDING_RELATED":
                answer = get_abstractive_answer(question=prompt)
            elif classify_result == "ABOUT_CHATBOT":
                answer = introduce_system(question=prompt)
            else:
                answer = response_unrelated_question(question=prompt)

        full_response = _render_answer(message_placeholder, answer)

        # Final render without the streaming cursor mark.
        message_placeholder.markdown(
            _assistant_html(f"\n{full_response}\n"),
            unsafe_allow_html=True,
        )
        st.session_state.messages.append({'role': 'assistant', 'content': full_response})
# =============================
# TAB 2: FACEBOOK OAUTH
# =============================
# Heading of the Facebook tab, written through the container's own method
# instead of entering it as a context manager.
tab2.title("Facebook OAuth Integration")
# Helper that registers the Facebook webhook for one page.
def register_facebook_webhook(page_id: str, page_access_token: str, timeout: float = 30):
    """Subscribe the app to a page's messaging events through the Graph API.

    Args:
        page_id: ID of the Facebook page to subscribe.
        page_access_token: Access token of that page.
        timeout: Seconds to wait for the Graph API before giving up.

    Returns:
        A ``(success, message)`` tuple: ``(True, ...)`` when the reply has a
        truthy ``"success"`` field, otherwise ``(False, ...)`` with the
        reason.
    """
    # Use the configurable API base like the other Graph calls in this file.
    # Its default is the previously hard-coded https://graph.facebook.com.
    url = f"{FB_API_URL}/v19.0/{page_id}/subscribed_apps"
    params = {
        "subscribed_fields": "messages,messaging_postbacks",
        "access_token": page_access_token,
    }
    try:
        response = requests.post(url, params=params, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on older versions of requests.
        return False, f"Lỗi khi gọi Facebook API: {e}"

    if data.get("success"):
        return True, "Đăng ký webhook thành công."
    return False, f"Facebook trả về lỗi: {data}"
# Steps 1-2 of the Facebook flow: log in, then exchange the code for a token.
with tab2:
    if "token" not in st.session_state:
        params = {
            "client_id": FB_APP_ID,
            "redirect_uri": FB_REDIRECT_URI,
            "scope": "pages_show_list,pages_manage_metadata,pages_messaging",
        }
        auth_url = f"{FB_CLIENT_URL}/dialog/oauth?{urlencode(params)}"
        st.markdown("### Step 1: Đăng nhập Facebook")
        st.markdown(f"[Bấm vào đây để đăng nhập Facebook]({auth_url})")
        st.markdown("### Step 2: Dán URL sau khi đăng nhập")
        redirect_input = st.text_input("Redirect URL (sau khi đăng nhập thành công)")

        # The user pastes the redirect URL; the code is read from its query.
        code = None
        if redirect_input:
            code = parse_qs(urlparse(redirect_input).query).get("code", [None])[0]

        if code:
            try:
                token_response = requests.get(
                    f"{FB_API_URL}/oauth/access_token",
                    params={
                        "client_id": FB_APP_ID,
                        "redirect_uri": FB_REDIRECT_URI,
                        "client_secret": FB_APP_SECRET,
                        "code": code,
                    },
                    timeout=30,
                )
                token_payload = token_response.json()
                token = (
                    token_payload.get("access_token")
                    if isinstance(token_payload, dict)
                    else None
                )
                if not token:
                    # Show the reply's error details instead of a bare
                    # KeyError on the missing "access_token" field.
                    detail = (
                        token_payload.get("error", token_payload)
                        if isinstance(token_payload, dict)
                        else token_payload
                    )
                    st.error(f"Lỗi: {detail}")
                else:
                    st.session_state.token = token
                    st.success("Lấy access token thành công!")

                    pages_response = requests.get(
                        f"{FB_API_URL}/me/accounts",
                        params={"access_token": token},
                        timeout=30,
                    )
                    pages = pages_response.json().get("data", [])
                    st.session_state.pages = pages
                    st.markdown("### Danh sách các Page bạn quản lý:")
                    for page in pages:
                        st.json(page)
            except (requests.exceptions.RequestException, ValueError) as e:
                # Network failures and non-JSON replies.
                st.error(f"Lỗi: {e}")
# Step 3 of the Facebook flow: register webhooks and inspect the pages.
with tab2:
    if "pages" in st.session_state and st.session_state.pages:
        st.markdown("### Step 3: Đăng ký Webhook cho các page")
        selected_pages = st.multiselect(
            "Chọn các page để đăng ký webhook:",
            options=[f"{p['name']} ({p['id']})" for p in st.session_state.pages],
        )

        if st.button("Đăng ký Webhook"):
            # Use the multiselect result directly. The previous code replaced
            # it with st.session_state.selected_pages, a key that is never
            # set, so clicking the button raised AttributeError.
            chosen_labels = set(selected_pages)
            for page in st.session_state.pages:
                label = f"{page['name']} ({page['id']})"
                if label not in chosen_labels:
                    continue
                # Call the webhook registration helper for this page.
                success, message = register_facebook_webhook(page['id'], page['access_token'])
                if success:
                    st.success(f"✅ Đã đăng ký Webhook cho page: {page['name']}")
                else:
                    st.warning(f"⚠️ Không thể đăng ký Webhook cho page: {page['name']} - {message}")

        # NOTE(review): placed under the pages guard because it reads
        # st.session_state.pages; confirm against the original layout.
        if st.button("Hiển thị Thông tin Trang"):
            for page in st.session_state.pages:
                # Show the details of each page.
                st.write(f"**Page Name**: {page['name']}")
                st.write(f"**Page ID**: {page['id']}")
                # NOTE(review): this prints the page access token on screen;
                # consider masking it.
                st.write(f"**Page Access Token**: {page['access_token']}")
                st.write("---")  # separator between pages