File size: 4,892 Bytes
93b0124
cbe5279
75f0c72
cfec7bd
 
d5b8fa3
cfec7bd
cbe5279
75f0c72
 
 
 
cbe5279
75f0c72
 
 
 
 
 
 
 
 
cbe5279
75f0c72
 
 
 
 
cbe5279
75f0c72
 
cbe5279
75f0c72
 
cbe5279
 
 
 
 
 
 
 
 
 
75f0c72
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cfec7bd
 
 
 
 
75f0c72
cfec7bd
 
f073b54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75f0c72
f073b54
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import os
import urllib.parse
import fitz  # PyMuPDF for PDF reading
import faiss
import numpy as np
import gradio as gr
from sentence_transformers import SentenceTransformer
from huggingface_hub import hf_hub_download, InferenceClient

# 🔹 Hugging Face Space Repository Details
# Repository that hosts the policy PDFs fetched by download_pdfs().
HF_REPO_ID = "tstone87/ccr-colorado"

# 🔹 Load Embedding Model (Optimized for QA Retrieval)
# Loaded once at import time; the same instance embeds both the document
# chunks (initialize_faiss) and incoming queries (search_policy).
model = SentenceTransformer("multi-qa-mpnet-base-dot-v1")

# 🔹 Define PDF Directory and Chunk Size
PDF_DIR = "./pdfs"  # Local folder for downloaded PDFs
# Chunk length is measured in characters (the extracted text is sliced as a
# plain string).
# NOTE(review): the embedding model may truncate long inputs - confirm that a
# 2500-character chunk fits within its maximum sequence length.
CHUNK_SIZE = 2500  # Larger chunks for better context

# 🔹 Ensure Directory Exists
# Import-time side effect: creates PDF_DIR if missing (no error if present).
os.makedirs(PDF_DIR, exist_ok=True)

# 🔹 Function to Download PDFs from Hugging Face Space (Handles Spaces)
def download_pdfs():
    """Fetch the policy PDFs from the Hugging Face repo into ``PDF_DIR``.

    Files that already exist locally are skipped. Downloading is best-effort:
    a failure for one file is reported and the remaining files are still
    attempted. A summary line states whether every file is available.

    Returns:
        None. Progress and errors are reported on stdout.
    """
    pdf_files = [
        "SNAP 10 CCR 2506-1 .pdf",
        "Med 10 CCR 2505-10 8.100.pdf",
    ]

    failed = []
    for pdf_file in pdf_files:
        pdf_path = os.path.join(PDF_DIR, pdf_file)

        if os.path.exists(pdf_path):  # Already present, nothing to do
            continue

        print(f"📥 Downloading {pdf_file}...")
        try:
            # Pass the raw repo filename: hf_hub_download URL-encodes it
            # internally, so pre-quoting would double-encode the spaces
            # (request fails) and would also store the file under the encoded
            # name, defeating the os.path.exists() check above.
            # repo_type must be explicit because the default is "model",
            # while this file documents HF_REPO_ID as a Space.
            hf_hub_download(
                repo_id=HF_REPO_ID,
                filename=pdf_file,
                repo_type="space",
                local_dir=PDF_DIR,
                force_download=True,
            )
        except Exception as e:  # best-effort: report and keep going
            failed.append(pdf_file)
            print(f"❌ Error downloading {pdf_file}: {e}")
        else:
            print(f"✅ Successfully downloaded {pdf_file}")

    if failed:
        print(
            f"⚠️ {len(failed)} of {len(pdf_files)} PDFs could not be "
            f"downloaded: {', '.join(failed)}"
        )
    else:
        print("✅ All PDFs downloaded.")

# 🔹 Function to Extract Text from PDFs
def extract_text_from_pdfs():
    """Return the plain text of every ``.pdf`` file in ``PDF_DIR``.

    Files are processed in sorted name order so the resulting text (and the
    chunk boundaries derived from it) is reproducible across runs. The text
    of each page is followed by a newline.

    Returns:
        str: Concatenated page text; empty if the directory holds no PDFs.
    """
    page_texts = []
    for pdf_file in sorted(os.listdir(PDF_DIR)):
        if not pdf_file.endswith(".pdf"):
            continue

        pdf_path = os.path.join(PDF_DIR, pdf_file)
        # Context manager closes the document (and its file handle) even if
        # text extraction raises part-way through.
        with fitz.open(pdf_path) as doc:
            page_texts.extend(page.get_text("text") + "\n" for page in doc)

    # Single join instead of repeated string concatenation.
    return "".join(page_texts)

# 🔹 Initialize FAISS and Embed Text
def initialize_faiss():
    """Download the PDFs, embed their text and build the FAISS index.

    Returns:
        tuple: ``(index, chunks)`` where ``chunks`` is the list of text
        slices of at most ``CHUNK_SIZE`` characters and ``index`` holds one
        embedding per chunk, in the same order.

    Raises:
        ValueError: If the PDFs yielded no text at all.
    """
    download_pdfs()
    text_data = extract_text_from_pdfs()

    # Every extracted page contributes at least a newline, so test for
    # non-whitespace content; otherwise image-only PDFs slip through.
    if not text_data.strip():
        raise ValueError("❌ No text extracted from PDFs!")

    # Split text into fixed-size character chunks
    chunks = [
        text_data[start:start + CHUNK_SIZE]
        for start in range(0, len(text_data), CHUNK_SIZE)
    ]

    # Encode all chunks in one batched call; FAISS requires a contiguous
    # float32 matrix of shape (n_chunks, dim).
    embeddings = np.ascontiguousarray(model.encode(chunks), dtype=np.float32)

    # The embedding model ("...-dot-v1") is trained for dot-product scoring,
    # so rank by inner product rather than L2 distance.
    index = faiss.IndexFlatIP(embeddings.shape[1])
    index.add(embeddings)

    print("✅ FAISS index initialized.")

    return index, chunks

# 🔹 Initialize FAISS on Startup
# Runs at import time: downloads the PDFs, extracts and embeds their text, and
# binds the module-level `index` / `chunks` that search_policy() reads.
index, chunks = initialize_faiss()

# 🔹 Function to Search FAISS
def search_policy(query, top_k=3):
    """Return the text chunks most relevant to ``query``.

    Args:
        query: Natural-language question to embed and look up.
        top_k: Maximum number of chunks to return.

    Returns:
        str: The matching chunks, best first, separated by blank lines.
        Empty if the index produced no valid hit.
    """
    # FAISS expects a float32 matrix with one row per query.
    query_embedding = np.asarray(model.encode(query), dtype=np.float32).reshape(1, -1)
    _, indices = index.search(query_embedding, top_k)

    # FAISS pads the result with -1 when fewer than top_k vectors exist.
    # Without the lower bound, -1 would silently select the last chunk.
    hits = [chunks[i] for i in indices[0] if 0 <= i < len(chunks)]
    return "\n\n".join(hits)

# 🔹 Hugging Face LLM Client
# Shared client for the hosted chat model; respond() streams completions
# from it.
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")

def _history_to_messages(history):
    """Convert Gradio chat history into chat-completion message dicts.

    Accepts both history layouts: ``(user, assistant)`` pairs and
    ``{"role": ..., "content": ...}`` dicts. Empty or non-text entries are
    skipped.
    """
    converted = []
    for turn in history or []:
        if isinstance(turn, dict):
            role = turn.get("role")
            content = turn.get("content")
            if role in ("user", "assistant") and isinstance(content, str) and content:
                converted.append({"role": role, "content": content})
            continue

        user_text, assistant_text = turn[0], turn[1]
        if user_text:
            converted.append({"role": "user", "content": user_text})
        if assistant_text:
            converted.append({"role": "assistant", "content": assistant_text})
    return converted


# 🔹 Function to Handle Chat Responses
def respond(message, history, system_message, max_tokens, temperature, top_p):
    """Stream an answer to ``message`` grounded in retrieved policy text.

    Args:
        message: The user's latest question.
        history: Earlier turns of the conversation as supplied by Gradio.
        system_message: System prompt sent as the first message.
        max_tokens: Upper bound on generated tokens.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling probability mass.

    Yields:
        str: The response accumulated so far, once per received text token.
    """
    messages = [{"role": "system", "content": system_message}]
    messages.extend(_history_to_messages(history))

    # 🔹 Retrieve relevant policy info from FAISS
    policy_context = search_policy(message)

    if policy_context:
        # The context is sent once, inside the user turn. Sending it a second
        # time as a fabricated assistant turn doubled the prompt size and
        # produced consecutive assistant messages.
        # Built from explicit pieces so no source indentation leaks into the
        # prompt.
        user_content = (
            "The following is the most relevant policy information retrieved "
            "from the official Colorado public assistance policies:\n\n"
            f"{policy_context}\n\n"
            "Based on this information, answer the following question:\n"
            f"{message}"
        )
    else:
        user_content = message
    messages.append({"role": "user", "content": user_content})

    response = ""
    # `chunk` (not `message`) so the loop does not shadow the parameter.
    for chunk in client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    ):
        if not chunk.choices:
            continue
        token = chunk.choices[0].delta.content
        # Stream chunks may carry no text (e.g. the final one); appending
        # None would raise TypeError.
        if token:
            response += token
            yield response

# 🔹 Gradio Chat Interface
# Extra controls rendered with the chat box. Their values are handed to
# `respond` after (message, history), in the order listed here.
_extra_controls = [
    gr.Textbox(
        label="System message",
        value="You are a knowledgeable chatbot designed to assist Colorado case workers with Medicaid, SNAP, TANF, CHP+, and other programs.",
    ),
    gr.Slider(label="Max new tokens", minimum=1, maximum=2048, step=1, value=512),
    gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.7),
    gr.Slider(label="Top-p (nucleus sampling)", minimum=0.1, maximum=1.0, step=0.05, value=0.95),
]

demo = gr.ChatInterface(fn=respond, additional_inputs=_extra_controls)

# Start the web UI only when executed as a script, not on import.
if __name__ == "__main__":
    demo.launch()