File size: 5,276 Bytes
4e46bd2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import os
import requests
import fitz  # PyMuPDF for PDF reading
import faiss
import numpy as np
import gradio as gr
from sentence_transformers import SentenceTransformer
from huggingface_hub import InferenceClient

# πŸ”Ή Define Directories and Chunk Size
APP_DIR = "./"  # Root app folder
PDF_DIR = "./pdfs"  # Where PDFs will be stored
CHUNK_SIZE = 2500  # Larger chunks for better context

# πŸ”Ή Ensure PDF Directory Exists
os.makedirs(PDF_DIR, exist_ok=True)

# πŸ”Ή Function to Auto-Detect PDFs in App Folder & Generate Download Links
def get_pdf_links():
    base_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/"
    pdf_links = {}

    for file in os.listdir(APP_DIR):
        if file.endswith(".pdf"):
            encoded_file = requests.utils.quote(file)  # Encode spaces correctly
            pdf_links[file] = f"{base_url}{encoded_file}?download=true"

    return pdf_links

# πŸ”Ή Get List of PDFs & Their Download Links
PDF_FILES = get_pdf_links()

# πŸ”Ή Function to Download PDFs
def download_pdfs():
    for filename, url in PDF_FILES.items():
        pdf_path = os.path.join(PDF_DIR, filename)
        if not os.path.exists(pdf_path):
            print(f"πŸ“₯ Downloading {filename}...")
            try:
                response = requests.get(url, stream=True)
                response.raise_for_status()  # Ensure the request was successful
                
                with open(pdf_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                
                print(f"βœ… Successfully downloaded {filename}")
            except Exception as e:
                print(f"❌ Error downloading {filename}: {e}")

    print("βœ… All PDFs downloaded.")

# πŸ”Ή Function to Extract Text from PDFs
def extract_text_from_pdfs():
    all_text = ""
    for pdf_file in os.listdir(PDF_DIR):
        if pdf_file.endswith(".pdf"):
            pdf_path = os.path.join(PDF_DIR, pdf_file)
            doc = fitz.open(pdf_path)
            for page in doc:
                all_text += page.get_text("text") + "\n"
    
    return all_text

# πŸ”Ή Initialize FAISS and Embed Text
def initialize_faiss():
    download_pdfs()
    text_data = extract_text_from_pdfs()

    if not text_data:
        raise ValueError("❌ No text extracted from PDFs!")

    # Split text into chunks
    chunks = [text_data[i:i+CHUNK_SIZE] for i in range(0, len(text_data), CHUNK_SIZE)]

    # Generate embeddings
    model = SentenceTransformer("multi-qa-mpnet-base-dot-v1")
    embeddings = np.array([model.encode(chunk) for chunk in chunks])

    # Create FAISS index
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)

    print("βœ… FAISS index initialized.")
    
    return index, chunks

# πŸ”Ή Initialize FAISS on Startup
index, chunks = initialize_faiss()

# πŸ”Ή Function to Search FAISS
def search_policy(query, top_k=3):
    query_embedding = SentenceTransformer("multi-qa-mpnet-base-dot-v1").encode(query).reshape(1, -1)
    distances, indices = index.search(query_embedding, top_k)
    
    return "\n\n".join([chunks[i] for i in indices[0] if i < len(chunks)])

# πŸ”Ή Hugging Face LLM Client
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")

# πŸ”Ή Function to Handle Chat Responses
def respond(message, history, system_message, max_tokens, temperature, top_p):
    messages = [{"role": "system", "content": system_message}]

    for val in history:
        if val[0]:
            messages.append({"role": "user", "content": val[0]})
        if val[1]:
            messages.append({"role": "assistant", "content": val[1]})

    # πŸ”Ή Retrieve relevant policy info from FAISS
    policy_context = search_policy(message)

    if policy_context:
        messages.append({"role": "assistant", "content": f"πŸ“„ **Relevant Policy Context:**\n\n{policy_context}"})

        user_query_with_context = f"""
        The following is the most relevant policy information retrieved from the official Colorado public assistance policies:

        {policy_context}

        Based on this information, answer the following question:
        {message}
        """
        messages.append({"role": "user", "content": user_query_with_context})
    else:
        messages.append({"role": "user", "content": message})

    response = ""
    for message in client.chat_completion(
        messages,
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=top_p,
    ):
        token = message.choices[0].delta.content
        response += token
        yield response

# πŸ”Ή Gradio Chat Interface
demo = gr.ChatInterface(
    respond,
    additional_inputs=[
        gr.Textbox(
            value="You are a knowledgeable chatbot designed to assist Colorado case workers with Medicaid, SNAP, TANF, CHP+, and other programs.",
            label="System message"
        ),
        gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
        gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
    ],
)

if __name__ == "__main__":
    demo.launch()