Spaces:
Runtime error
Runtime error
# -*- coding: utf-8 -*- | |
# 財政部財政資訊中心 江信宗 | |
import os | |
from dotenv import load_dotenv | |
load_dotenv() | |
from langchain_community.utils import user_agent | |
from langchain_groq import ChatGroq | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_huggingface import HuggingFaceEmbeddings | |
from langchain_community.vectorstores import Chroma | |
from langchain.chains import RetrievalQA | |
from langchain_community.document_loaders import WebBaseLoader, TextLoader | |
from langchain.prompts import PromptTemplate | |
from langchain.schema import Document | |
import gradio as gr | |
from openai import OpenAI | |
import re | |
import time | |
import requests | |
def load_documents(sources): | |
documents = [] | |
for source in sources: | |
try: | |
if isinstance(source, str): | |
if source.startswith('http'): | |
loader = WebBaseLoader(source) | |
else: | |
loader = TextLoader(source) | |
documents.extend(loader.load()) | |
elif isinstance(source, dict): | |
documents.append(Document(page_content=source['content'], metadata=source.get('metadata', {}))) | |
except Exception as e: | |
print(f"Error loading source {source}: {str(e)}") | |
return documents | |
sources = [ | |
# "TaxQADataSet_Slim1.txt", | |
# "TaxQADataSet_Slim2.txt", | |
# "TaxQADataSet_Slim3.txt", | |
# "TaxQADataSet_Slim4.txt", | |
# "TaxQADataSet_Slim5.txt", | |
# "TaxQADataSet_Slim6.txt", | |
# "TaxQADataSet_ntpc1.txt", | |
# "TaxQADataSet_ntpc2.txt", | |
# "TaxQADataSet_kctax.txt", | |
# "TaxQADataSet_chutax.txt", | |
# "LandTaxAct1100623.txt", | |
# "TheEnforcementRulesoftheLandTaxAct1100923.txt", | |
# "HouseTaxAct1130103.txt", | |
# "VehicleLicenseTaxAct1101230.txt", | |
# "TaxCollectionAct1101217.txt", | |
# "AmusementTaxAct960523.txt", | |
# "StampTaxAct910515.txt", | |
"DeedTaxAct990505.txt" | |
] | |
documents = load_documents(sources) | |
print(f"\n成功載入 {len(documents)} 個網址或檔案") | |
text_splitter = RecursiveCharacterTextSplitter( | |
chunk_size=512, | |
chunk_overlap=50, | |
length_function=len, | |
is_separator_regex=False, | |
separators=["\n\n\n","\n\n", "\n", "。"] | |
) | |
split_docs = text_splitter.split_documents(documents) | |
print(f"分割後的文件數量:{len(split_docs)}") | |
embeddings = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large") | |
print(f"\n成功初始化 Microsoft 嵌入型") | |
print(f"\n開始建立向量資料庫") | |
vectorstore = Chroma.from_documents(split_docs, embeddings, persist_directory="./Knowledge-base") | |
print(f"成功建立 Chroma 向量資料庫,共有 {len(split_docs)} 個文檔") | |
retriever = vectorstore.as_retriever( | |
search_type="mmr", | |
search_kwargs={ | |
"k": min(4, len(split_docs)), | |
"fetch_k": min(20, len(split_docs)), | |
"lambda_mult": 0.8 | |
} | |
) | |
print(f"檢索演算法:Maximum Marginal Relevance Retrieval") | |
print(f"檢索文檔數量:k={min(4, len(split_docs))}, fetch_k={min(20, len(split_docs))}") | |
template = """Let's work this out in a step by step way to be sure we have the right answer. Must reply to me in Taiwanese Traditional Chinese. | |
在回答之前,請仔細分析檢索到的上下文,確保你的回答準確完整反映了上下文中的訊息,而不是依賴先前的知識,在回應的答案中絕對不要提到是根據上下文回答。 | |
如果檢索到的多個上下文之間存在聯繫,請整合這些訊息以提供更全面的回答,但要避免過度推斷。 | |
如果檢索到的上下文不包含足夠回答問題的訊息,請誠實的說明,不要試圖編造答案。 | |
上下文: {context} | |
問題: {question} | |
答案:""" | |
PROMPT = PromptTemplate( | |
template=template, input_variables=["context", "question"] | |
) | |
def create_chain(llm): | |
return RetrievalQA.from_chain_type( | |
llm=llm, | |
chain_type="stuff", | |
retriever=retriever, | |
return_source_documents=True, | |
chain_type_kwargs={"prompt": PROMPT} | |
) | |
print(f"成功建立 RAG Chain") | |
def initialize_llm(api_key): | |
return ChatGroq( | |
groq_api_key=api_key, | |
model_name='llama-3.1-70b-versatile' | |
) | |
def generate_insight_questions(query, api_key): | |
llm = initialize_llm(api_key) | |
prompt = f"""Let's work this out in a step by step way to be sure we have the right answer. Must reply to me in "Traditional Chinese". | |
根據以下回答,生成3個相關的洞察問題: | |
原始問題: {query} | |
請提供3個簡短但有深度的問題,這些問題應該符合: | |
1. 與原始問題緊密相關 | |
2. 準確重新描述原始問題 | |
3. 引導更深入的解決原始問題 | |
請直接列出這3個問題,每個問題一行,不要添加編號或其他文字。 | |
""" | |
try: | |
response = llm.invoke(prompt) | |
if hasattr(response, 'content'): | |
questions = response.content.split('\n') | |
else: | |
questions = str(response).split('\n') | |
while len(questions) < 3: | |
questions.append("提供更多地方稅資訊") | |
return questions[:3] | |
except Exception as e: | |
print(f"Error generating insight questions: {str(e)}") | |
return ["提供更多地方稅資訊", "提供其他地方稅問題", "還想了解什麼地方稅目"] | |
def answer_question(query, api_key): | |
try: | |
gr.Info("檢索地方稅知識庫中......") | |
llm = initialize_llm(api_key) | |
chain = create_chain(llm) | |
result = chain.invoke({"query": query}) | |
answer = result["result"] | |
insight_questions = generate_insight_questions(query, api_key) | |
while len(insight_questions) < 3: | |
insight_questions.append("提供更多地方稅資訊") | |
return answer, insight_questions[:3] | |
except Exception as e: | |
return f"抱歉,處理您的問題時發生錯誤:{str(e)}", [] | |
def split_questions(query): | |
questions = re.split(r'[?!。 ]', query) | |
return [q.strip() for q in questions if q.strip()] | |
def answer_multiple_questions(query, api_key): | |
questions = split_questions(query) | |
all_answers = [] | |
all_insight_questions = [] | |
for question in questions: | |
answer, insight_questions = answer_question(question, api_key) | |
if len(questions) > 1: | |
all_answers.append(f"【問題】{question}\n答案:{answer}") | |
else: | |
all_answers.append(answer) | |
all_insight_questions.extend(insight_questions) | |
if len(questions) > 1: | |
combined_answer = "\n\n\n".join(all_answers) | |
else: | |
combined_answer = "\n".join(all_answers) | |
selected_insight_questions = all_insight_questions[:3] | |
return combined_answer, selected_insight_questions | |
def get_tax_law(tax_type): | |
tax_law_dict = { | |
"房屋稅": "房屋稅條例", | |
"地價稅": "土地稅法", | |
"契稅": "契稅條例", | |
"娛樂稅": "娛樂稅法", | |
"印花稅": "印花稅法", | |
"牌照稅": "使用牌照稅法", | |
"稅捐稽徵法": "稅捐稽徵法" | |
} | |
return tax_law_dict.get(tax_type, "無稅法") | |
def fetch_law_summary(tax_law, keywords): | |
url = "https://ttc.mof.gov.tw/Api/GetData" | |
headers = { | |
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", | |
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36" | |
} | |
gr.Info("檢索法令彙編函釋中......") | |
all_results = [] | |
for keyword in keywords: | |
payload = { | |
"FunctionID": "FB10001", | |
"ObjParams[TaxAct]": tax_law, | |
"ObjParams[TaxVer]": "請選擇", | |
"ObjParams[Chapter]": "請選擇", | |
"ObjParams[Article]": "請選擇", | |
"ObjParams[Content]": keyword, | |
"ObjParams[Operator01]": "0", | |
"ObjParams[Content01]": "", | |
"ObjParams[Operator02]": "0", | |
"ObjParams[Content02]": "" | |
} | |
try: | |
response = requests.post(url, data=payload, headers=headers) | |
response.raise_for_status() | |
data = response.json() | |
if data["Code"] == "1" and "Table" in data["Data"]: | |
all_results.extend(data["Data"]["Table"]) | |
except Exception as e: | |
print(f"Error fetching law summary for keyword '{keyword}': {str(e)}") | |
if all_results: | |
summary = "<h3>法令彙編函釋檢索結果:</h3>" | |
for index, result in enumerate(all_results[:20]): # Limit to first 20 results across all keywords | |
summary += f""" | |
<details> | |
<summary style="cursor: pointer; color: #0066cc;">{result['Title']}</summary> | |
<p>{result['Content']}</p> | |
</details> | |
""" | |
return summary | |
else: | |
return "<p>未找到相關法令彙編函釋。</p>" | |
def llm_openai_api(query, answer): | |
client = OpenAI( | |
api_key=os.environ.get("YOUR_API_TOKEN"), | |
base_url="https://api.sambanova.ai/v1", | |
) | |
user_prompt = f""" | |
「題目:{query} | |
答案:{answer}」 | |
請詳細分析答案內容後,依據與題目相關性最高的稅目名稱及最多3個重點關鍵字回應我,問題與答案中的稅目名稱列入TaxName,關鍵字列入KeyWord,只須根據格式回應,不要寫其他的。 | |
# 回應字典格式範例: | |
{{"TaxName": "地價稅", "KeyWord": "宿舍用地,醫護人員"}} | |
""" | |
try: | |
response = client.chat.completions.create( | |
model='Meta-Llama-3.1-405B-Instruct', | |
messages=[ | |
{"role": "system", "content": "Must reply to user in Traditional Chinese."}, | |
{"role": "user", "content": user_prompt} | |
], | |
temperature=0.7, | |
top_p=1 | |
) | |
return response.choices[0].message.content.strip() | |
except Exception as e: | |
print(f"請輸入正確的 API Key!Error: {str(e)}") | |
return '{"TaxName": "", "KeyWord": ""}' | |
def handle_interaction(query, api_key, state): | |
gr.Info("開始處理問題,請稍待片刻......") | |
start_time = time.time() | |
if state is None: | |
state = {"history": []} | |
if not api_key: | |
api_key = os.getenv("YOUR_API_KEY") | |
query = convert_punctuation(query) | |
answer, insight_questions = answer_multiple_questions(query, api_key) | |
api_response = llm_openai_api(query, answer) | |
tax_name = "" | |
keywords = [] | |
print(api_response) | |
try: | |
response_dict = eval(api_response) | |
tax_name = response_dict.get("TaxName", "") | |
keywords = response_dict.get("KeyWord", "").split(",") | |
except: | |
print("Error parsing api_response") | |
tax_law = get_tax_law(tax_name) | |
law_summary_content = fetch_law_summary(tax_law, keywords) | |
state["history"].append((query, answer)) | |
while len(insight_questions) < 3: | |
insight_questions.append("提供更多地方稅資訊") | |
end_time = time.time() | |
gr.Info(f"Model 已答覆,執行時間: {(end_time - start_time):.2f} 秒。") | |
return answer, insight_questions[0], insight_questions[1], insight_questions[2], state, query, law_summary_content | |
def convert_punctuation(text): | |
return text.replace('?', '?').replace(',', ',').replace('!', '!').replace(' ', ' ') | |
def clear_outputs(): | |
return "", "", gr.update(value="", visible=False) | |
custom_css = """ | |
.query-input { | |
background-color: #B7E0FF !important; | |
padding: 15px !important; | |
border-radius: 10px !important; | |
margin: 0 !important; | |
} | |
.query-input textarea { | |
font-size: 18px !important; | |
background-color: #ffffff; | |
border: 1px solid #f0f8ff; | |
border-radius: 8px; | |
} | |
.answer-box { | |
background-color: #FFF5CD !important; | |
padding: 10px !important; | |
border-radius: 10px !important; | |
margin: 0 !important; | |
} | |
.answer-box textarea { | |
font-size: 18px !important; | |
background-color: #ffffff; | |
border: 1px solid #f0f8ff; | |
border-radius: 8px; | |
} | |
.center-text { | |
text-align: center !important; | |
color: #ff4081; | |
text-shadow: 2px 2px 4px rgba(0,0,0,0.1); | |
margin-bottom: 0 !important; | |
} | |
#submit-btn { | |
border-radius: 10px !important; | |
border: none !important; | |
background-color: #ff4081 !important; | |
color: white !important; | |
font-weight: bold !important; | |
transition: all 0.3s ease !important; | |
margin: 0 !important; | |
} | |
#submit-btn:hover { | |
background-color: #f50057 !important; | |
transform: scale(1.05); | |
} | |
.insight-btn { | |
border-radius: 10px !important; | |
border: none !important; | |
background-color: #00bcd4 !important; | |
} | |
.insight-btn:hover { | |
background-color: #00acc1 !important; | |
} | |
.gr-form { | |
background-color: #e8f5e9 !important; | |
padding: 15px !important; | |
border-radius: 10px !important; | |
} | |
.api-key-input { | |
background-color: #FFCFB3 !important; | |
padding: 15px !important; | |
border-radius: 10px !important; | |
margin: 0 !important; | |
} | |
.text-background { | |
font-size: 18px !important; | |
padding: 5px !important; | |
border-radius: 10px !important; | |
border: 2px solid #B7E0FF !important; | |
margin: 0 !important; | |
} | |
.clear-button { | |
color: white !important; | |
background-color: #000000 !important; | |
padding: 5px !important; | |
border-radius: 10px !important; | |
margin: 0 !important; | |
} | |
.clear-button:hover { | |
background-color: #000000 !important; | |
transform: scale(1.05); | |
} | |
""" | |
with gr.Blocks(theme=gr.themes.Soft(), css=custom_css) as iface: | |
gr.Markdown(""" | |
# 地方稅知識庫系統 - 財政部財政資訊中心 | |
> ### **※ RAG-based 系統部署:江信宗,LLM:Llama-3.1-70B,以地方稅極少知識資料示範,僅供參考,準確資訊請依據地方稅稽徵機關回覆為準。** | |
""", elem_classes="center-text") | |
with gr.Row(): | |
query_input = gr.Textbox(label="輸入您的問題,系統將基於學習到的知識資料提供相關答案。", placeholder="請輸入您的問題(支援同時輸入多個問題,例如:問題1?問題2?)", autofocus=True, scale=3, max_lines=5, elem_classes="query-input") | |
api_key_input = gr.Textbox(label="請輸入您的 API Key", type="password", placeholder="API authentication key", scale=1, elem_classes="api-key-input") | |
answer_output = gr.Textbox(label="知識庫答案", interactive=False, max_lines=40, elem_classes="answer-box") | |
with gr.Row(): | |
insight_q1 = gr.Button("洞察問題 1", visible=False, elem_classes=["insight-btn"]) | |
insight_q2 = gr.Button("洞察問題 2", visible=False, elem_classes=["insight-btn"]) | |
insight_q3 = gr.Button("洞察問題 3", visible=False, elem_classes=["insight-btn"]) | |
state = gr.State() | |
current_question = gr.Textbox(lines=2, label="當前問題", visible=False) | |
law_summary = gr.HTML(label="法令彙編函釋檢索", elem_classes="text-background", visible=False) # Set initial visibility to False | |
with gr.Row(): | |
submit_btn = gr.Button("傳送", variant="primary", scale=3, elem_id="submit-btn") | |
clear_button = gr.Button("清除", variant="secondary", scale=1, elem_classes="clear-button") | |
gr.HTML( | |
""" | |
<span style="font-size: 18px; color: black;">※ 財政部各稅法令函釋檢索系統:</span><a href="https://ttc.mof.gov.tw/" title="財政部各稅法令函釋檢索系統" style="font-size: 18px; color: red;">https://ttc.mof.gov.tw/</a> | |
""" | |
) | |
def update_ui(answer, q1, q2, q3, state, current_q, law_summary): | |
return [ | |
answer, | |
gr.update(value=q1, visible=bool(q1)), | |
gr.update(value=q2, visible=bool(q2)), | |
gr.update(value=q3, visible=bool(q3)), | |
state, | |
current_q, | |
gr.update(value=law_summary, visible=bool(law_summary.strip())) | |
] | |
submit_btn.click( | |
fn=handle_interaction, | |
inputs=[query_input, api_key_input, state], | |
outputs=[answer_output, insight_q1, insight_q2, insight_q3, state, current_question, law_summary] | |
).then( | |
fn=update_ui, | |
inputs=[answer_output, insight_q1, insight_q2, insight_q3, state, current_question, law_summary], | |
outputs=[answer_output, insight_q1, insight_q2, insight_q3, state, current_question, law_summary] | |
) | |
for btn in [insight_q1, insight_q2, insight_q3]: | |
btn.click( | |
lambda x: x, | |
inputs=[btn], | |
outputs=[query_input] | |
) | |
def clear_outputs(): | |
return "", "", gr.update(value="", visible=False) | |
clear_button.click( | |
fn=clear_outputs, | |
inputs=[], | |
outputs=[query_input, answer_output, law_summary] | |
) | |
if __name__ == "__main__": | |
if "SPACE_ID" in os.environ: | |
iface.launch() | |
else: | |
iface.launch(share=True, show_api=False) | |