import os
import requests
import fitz  # PyMuPDF for PDF reading
import faiss
import numpy as np
import gradio as gr
from sentence_transformers import SentenceTransformer
from huggingface_hub import InferenceClient

# πŸ”Ή Define PDF Directory and Chunk Size
PDF_DIR = "./pdfs"
CHUNK_SIZE = 2500  # Larger chunks for better context
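# Note: chunks are measured in characters, not tokens. At roughly 4-5
# characters per English token, 2500 characters is on the order of
# 500-600 tokens per chunk (a rough estimate, not an exact figure).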

# πŸ”Ή Ensure Directory Exists
os.makedirs(PDF_DIR, exist_ok=True)

# πŸ”Ή Direct URLs for PDF Downloads (with `?download=true`)
PDF_FILES = {
    "SNAP 10 CCR 2506-1.pdf": "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/SNAP%2010%20CCR%202506-1%20.pdf?download=true",
    "Med 10 CCR 2505-10 8.100.pdf": "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/Med%2010%20CCR%202505-10%208.100.pdf?download=true",
}
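# The ?download=true query string asks the Hub to serve the raw file as a
# download rather than rendering it in the browser.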

# πŸ”Ή Function to Download PDFs Directly from Given URLs
def download_pdfs():
    for filename, url in PDF_FILES.items():
        pdf_path = os.path.join(PDF_DIR, filename)
        if not os.path.exists(pdf_path):
            print(f"πŸ“₯ Downloading {filename}...")
            try:
                response = requests.get(url, stream=True, timeout=60)  # timeout so a stalled download fails instead of hanging
                response.raise_for_status()  # Ensure the request was successful
                
                with open(pdf_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                
                print(f"βœ… Successfully downloaded {filename}")
            except Exception as e:
                print(f"❌ Error downloading {filename}: {e}")

    print("βœ… All PDFs downloaded.")

# πŸ”Ή Function to Extract Text from PDFs
def extract_text_from_pdfs():
    all_text = ""
    for pdf_file in os.listdir(PDF_DIR):
        if pdf_file.endswith(".pdf"):
            pdf_path = os.path.join(PDF_DIR, pdf_file)
            doc = fitz.open(pdf_path)
            for page in doc:
                all_text += page.get_text("text") + "\n"
    
    return all_text
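# Note: all PDFs are concatenated into a single string, so a chunk that
# straddles a file boundary can mix text from two different regulations.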

# πŸ”Ή Initialize FAISS and Embed Text
def initialize_faiss():
    download_pdfs()
    text_data = extract_text_from_pdfs()

    if not text_data:
        raise ValueError("❌ No text extracted from PDFs!")

    # Split text into chunks
    chunks = [text_data[i:i+CHUNK_SIZE] for i in range(0, len(text_data), CHUNK_SIZE)]

    # Generate embeddings
    model = SentenceTransformer("multi-qa-mpnet-base-dot-v1")
    embeddings = np.array([model.encode(chunk) for chunk in chunks])

    # Create FAISS index
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)

    print("βœ… FAISS index initialized.")
    
    return index, chunks
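# Rebuilding the index on every startup is acceptable for two PDFs; for a
# larger corpus it could be persisted with faiss.write_index()/read_index().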

# πŸ”Ή Initialize FAISS on Startup
index, chunks = initialize_faiss()

# πŸ”Ή Function to Search FAISS
def search_policy(query, top_k=3):
    query_embedding = SentenceTransformer("multi-qa-mpnet-base-dot-v1").encode(query).reshape(1, -1)
    distances, indices = index.search(query_embedding, top_k)
    
    return "\n\n".join([chunks[i] for i in indices[0] if i < len(chunks)])
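# Example usage (hypothetical query, for illustration only):
#   context = search_policy("What are the SNAP income limits for a household of three?")
#   print(context[:500])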

# πŸ”Ή Hugging Face LLM Client
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
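# If the endpoint requires authentication, a token can be passed explicitly,
# e.g. InferenceClient("HuggingFaceH4/zephyr-7b-beta", token=...).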

# πŸ”Ή Function to Handle Chat Responses
def respond(message, history, system_message, max_tokens, temperature, top_p):
    messages = [{"role": "system", "content": system_message}]

    for val in history:
        if val[0]:
            messages.append({"role": "user", "content": val[0]})
        if val[1]:
            messages.append({"role": "assistant", "content": val[1]})

    # πŸ”Ή Retrieve relevant policy info from FAISS
    policy_context = search_policy(message)

    if policy_context:
        messages.append({"role": "assistant", "content": f"πŸ“„ **Relevant Policy Context:**\n\n{policy_context}"})

        user_query_with_context = f"""
        The following is the most relevant policy information retrieved from the official Colorado public assistance policies:

        {policy_context}

        Based on this information, answer the following question:
        {message}
        """
        messages.append({"role": "user", "content": user_query_with_context})
    else:
        messages.append({"role": "user", "content": message})

    response = ""
    for chunk in client.chat_completion(  # renamed from 'message' to avoid shadowing the user's message
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    ):
        token = chunk.choices[0].delta.content
        if token:  # streamed deltas can carry None or empty content; skip those
            response += token
        yield response
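# The flow above is retrieval-augmented generation (RAG): retrieve the most
# relevant chunks with FAISS, fold them into the prompt, then stream the
# model's grounded answer back to the UI.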

# πŸ”Ή Gradio Chat Interface
demo = gr.ChatInterface(
    respond,
    additional_inputs=[
        gr.Textbox(
            value="You are a knowledgeable chatbot designed to assist Colorado case workers with Medicaid, SNAP, TANF, CHP+, and other programs.",
            label="System message"
        ),
        gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
        gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
    ],
)
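# Because respond() is a generator, ChatInterface streams partial responses
# to the UI as tokens arrive instead of waiting for the full completion.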

if __name__ == "__main__":
    demo.launch()