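# Spaces:
# Runtime error
# Runtime error
# NOTE(review): the three lines above look like a page-status banner captured
# along with the source rather than program text; kept as comments so the
# file can be parsed.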
import os | |
import PyPDF2 | |
from google.colab import userdata | |
from PyPDF2 import PdfReader | |
## Embedding model! | |
from langchain_huggingface import HuggingFaceEmbeddings | |
embed_model = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1") | |
import pandas as pd | |
# Directory scanned for spreadsheet files.
folder_path = "./"
# Collects every non-empty cell (as text) taken from the spreadsheets below.
context_data = []

# Snapshot of the directory listing.
files = os.listdir(folder_path)

# Keep only CSV and Excel files (suffix match is case-sensitive).
data_files = [name for name in files if name.endswith(('.csv', '.xlsx', '.xls'))]

# Load each spreadsheet in turn; a failure on one file does not stop the rest.
for position, file in enumerate(data_files, start=1):
    print(f"\nProcessing file {position}: {file}")
    file_path = os.path.join(folder_path, file)
    try:
        # CSVs go through read_csv, everything else through read_excel.
        loader = pd.read_csv if file.endswith('.csv') else pd.read_excel
        df = loader(file_path)
        # Positional index 2 is the third column; drop missing cells and
        # convert the remainder to strings.
        third_column = df.iloc[:, 2]
        context_data.extend(third_column.dropna().astype(str).tolist())
    except Exception as e:
        print(f"Error processing file {file}: {str(e)}")
def extract_text_from_pdf(pdf_path):
    """Return the concatenated text of every page of the PDF at *pdf_path*.

    Any failure (file cannot be opened, PDF cannot be parsed, ...) is printed
    and an empty string is returned instead of raising.
    """
    try:
        with open(pdf_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            page_texts = []
            for page in reader.pages:
                # Guard against a page whose extract_text() gives None.
                page_texts.append(page.extract_text() or "")
            return "".join(page_texts)
    except Exception as e:
        print(f"Error extracting text from {pdf_path}: {e}")
        return ""
# Directory scanned for PDF documents.
folder_path = "./"

# Maximum number of characters per PDF text chunk. The previous comment said
# 1000 while the code sliced 2000; the code's value is kept and named once.
PDF_CHUNK_SIZE = 2000

# Holds the fixed-size text chunks extracted from every PDF.
text_chunks = []

# All PDF filenames in the folder (extension matched case-insensitively).
filenames = [f for f in os.listdir(folder_path) if f.lower().endswith(".pdf")]

# Extract and chunk each PDF; a failure on one file does not stop the rest.
for index, file in enumerate(filenames, 1):
    print(f"\nProcessing file {index}: {file}")
    pdf_path = os.path.join(folder_path, file)
    try:
        extracted_text = extract_text_from_pdf(pdf_path)
        if not extracted_text.strip():
            # Nothing but whitespace came back: report and move on.
            print(f"No text found in the PDF: {file}")
            continue
        # Slice the text into consecutive pieces of PDF_CHUNK_SIZE characters
        # (the last piece may be shorter).
        chunks = [
            extracted_text[start:start + PDF_CHUNK_SIZE]
            for start in range(0, len(extracted_text), PDF_CHUNK_SIZE)
        ]
        text_chunks.extend(chunks)
    except Exception as e:
        print(f"Error reading the PDF {file}: {e}")
from urllib.parse import urljoin, urlparse | |
import requests | |
from io import BytesIO | |
from bs4 import BeautifulSoup | |
from langchain_core.prompts import ChatPromptTemplate | |
import gradio as gr | |
def scrape_websites(base_urls):
    """Scrape each base URL plus the same-host pages it links to.

    For every non-blank entry of *base_urls* the page is fetched, its visible
    text stored, and every internal link found on it is visited once
    (one level deep). Links ending in ``.pdf`` are handed straight to
    ``extract_pdf_text`` instead of being downloaded and parsed as HTML
    first, so binary PDF bytes never end up stored as "page text".

    Args:
        base_urls: iterable of URL strings; blank entries are skipped.

    Returns:
        dict mapping URL -> extracted text. An empty dict is returned if an
        unexpected error escapes the per-page helpers.
    """
    try:
        visited_links = set()  # Links already attempted, successful or not
        content_by_url = {}  # Extracted text keyed by URL

        for base_url in base_urls:
            if not base_url.strip():
                continue  # Skip empty or whitespace-only URLs

            print(f"Scraping base URL: {base_url}")
            html_content = fetch_page_content(base_url)
            if not html_content:
                continue

            content_by_url[base_url] = clean_body_content(html_content)
            visited_links.add(base_url)

            # Follow every internal link found on the base page.
            soup = BeautifulSoup(html_content, "html.parser")
            for link in extract_internal_links(base_url, soup):
                if link in visited_links:
                    continue
                # Record the attempt up front so a dead link is not retried
                # (and does not wait for another timeout) under a later base URL.
                visited_links.add(link)

                if link.lower().endswith('.pdf'):
                    print(f"Extracting PDF content from: {link}")
                    pdf_content = extract_pdf_text(link)
                    if pdf_content:
                        content_by_url[link] = pdf_content
                    continue

                print(f"Scraping link: {link}")
                page_content = fetch_page_content(link)
                if page_content:
                    content_by_url[link] = clean_body_content(page_content)

        return content_by_url
    except Exception as e:
        print(f"Error during scraping: {e}")
        return {}
def fetch_page_content(url):
    """GET *url* (10 second timeout) and return the response body as text.

    Returns None, after printing the error, when the request fails or the
    server answers with an error status.
    """
    try:
        reply = requests.get(url, timeout=10)
        reply.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching {url}: {e}")
        return None
    return reply.text
def extract_internal_links(base_url, soup):
    """Collect the absolute URLs of all same-host links in *soup*.

    Every ``<a>`` element carrying an ``href`` is resolved against *base_url*
    and kept when ``is_internal_link`` accepts it. Returns a set, so repeated
    links collapse to one entry.
    """
    resolved_urls = (
        urljoin(base_url, tag["href"]) for tag in soup.find_all("a", href=True)
    )
    return {url for url in resolved_urls if is_internal_link(base_url, url)}
def is_internal_link(base_url, link_url):
    """Return True when both URLs share the same network location (netloc)."""
    base_host, link_host = (urlparse(url).netloc for url in (base_url, link_url))
    return link_host == base_host
def extract_pdf_text(pdf_url):
    """Download the PDF at *pdf_url* and return the text of all its pages.

    Args:
        pdf_url: URL of a PDF document.

    Returns:
        The concatenated page text, or None when the download fails, the PDF
        cannot be read, or no text could be extracted. Errors are printed,
        never raised.
    """
    try:
        # Same timeout as fetch_page_content, so an unresponsive server
        # cannot stall the whole crawl.
        response = requests.get(pdf_url, timeout=10)
        response.raise_for_status()

        # Parse the PDF straight from the downloaded bytes.
        with BytesIO(response.content) as file:
            reader = PdfReader(file)
            # `or ""` keeps one page without extractable text from
            # discarding the text of every other page.
            pdf_text = "".join(page.extract_text() or "" for page in reader.pages)
        return pdf_text if pdf_text else None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching PDF {pdf_url}: {e}")
        return None
    except Exception as e:
        print(f"Error reading PDF {pdf_url}: {e}")
        return None
def clean_body_content(html_content):
    """Reduce an HTML document to its text, one non-blank line per row.

    ``<script>`` and ``<style>`` elements are removed first; the remaining
    text is split into lines, each line is stripped, and empty lines are
    dropped.
    """
    document = BeautifulSoup(html_content, "html.parser")

    # Drop code and styling so only content text remains.
    for tag in document(["script", "style"]):
        tag.extract()

    raw_text = document.get_text(separator="\n")
    stripped_lines = (line.strip() for line in raw_text.splitlines())
    return "\n".join(line for line in stripped_lines if line)
# Seed pages for the crawl.
website = [
    "https://www.rib.gov.rw/index.php?id=371",
    "https://haguruka.org.rw/our-work/",
]

# Crawl only when run as a script. On import, fall back to an empty mapping
# so the statements below never hit an undefined `all_content`.
if __name__ == "__main__":
    all_content = scrape_websites(website)
else:
    all_content = {}

# (url, content) pairs in the order the crawler produced them.
temp_list = list(all_content.items())

# One "url: ..., content: ..." string per scraped page. Every element of
# temp_list is a (url, content) tuple, so no other element shape is handled.
processed_texts = [f"url: {url}, content: {content}" for url, content in temp_list]
def chunk_string(s, chunk_size=2000):
    """Split *s* into consecutive pieces of at most *chunk_size* characters.

    The final piece may be shorter; an empty string yields an empty list.
    """
    pieces = []
    for start in range(0, len(s), chunk_size):
        pieces.append(s[start:start + chunk_size])
    return pieces
# Web-page texts cut into pieces of chunk_string's default size.
chunked_texts = []
for text in processed_texts:
    chunked_texts.extend(chunk_string(text))

# Combined corpus: spreadsheet cells first, then PDF chunks, then web chunks.
data = list(context_data)

# A chunk is skipped when an identical string is already in `data` at the
# moment its source list is merged. Membership is tested against a set
# snapshot instead of scanning the growing list for every item.
already_present = set(data)
data.extend([item for item in text_chunks if item not in already_present])

already_present = set(data)
data.extend([item for item in chunked_texts if item not in already_present])
from langchain_community.vectorstores import Chroma | |
# Chroma collection that stores the corpus, embedded with `embed_model`.
vectorstore = Chroma(
    collection_name="GBV_dataset",
    embedding_function=embed_model,
)

# Add the corpus to the vector store. The earlier `vectorstore.get().keys()`
# call was dropped: it fetched the collection and discarded the result.
# Skipped when nothing was loaded, since an empty batch has nothing to embed
# (NOTE(review): some Chroma versions reject an empty add - confirm).
if data:
    vectorstore.add_texts(data)
# OpenRouter API key, read from the V1 environment variable (None when unset).
api = os.getenv('V1')
from openai import OpenAI | |
from langchain_core.prompts import PromptTemplate | |
from langchain_core.output_parsers import StrOutputParser | |
from langchain_core.runnables import RunnablePassthrough | |
import gradio as gr | |
from typing import Iterator | |
import time | |
# Prompt template for the assistant. Placeholders filled at request time:
# {context} (retrieved passages), {question} (user message), {first_name}.
# NOTE(review): several emoji in this text look mis-encoded (e.g. "π",
# "β€οΈ"); they are reproduced as found - confirm against the original file.
template = """
You are a friendly and empathetic chatbot designed to assist users in a conversational and human-like manner. Your goal is to provide accurate, helpful, and emotionally supportive responses based on the provided context: {context}. Follow these guidelines:
1. **Emotional Awareness**
- Acknowledge the user's emotions and respond with empathy.
- Use phrases like "I understand how you feel," "That sounds challenging," or "I'm here to support you."
- If the user expresses negative emotions, offer comfort and reassurance.
2. **Contextual Interaction**
- Begin with a warm and empathetic welcome message.
- Extract precise details from the provided context: {context}.
- Respond directly to the user's question: {question}.
- Remember the user's name is {first_name}. some time you can address it occasionally
3. **Communication Guidelines**
- Maintain a warm, conversational tone.
- Use occasional emojis for engagement (e.g., π, π,π, β€οΈ).
- Provide clear, concise, and emotionally supportive information.
4. **Response Strategies**
- Greet users naturally and ask about their wellbeing (e.g., "Welcome, {first_name}! π How are you feeling today?", "Hello {first_name}! π€ What's on your mind?").
- Always start with a check-in about the user's wellbeing or current situation.
- Deliver only relevant information.
- Avoid generating content beyond the context.
- Handle missing information transparently.
5. **No Extra Content**
- If no information matches the user's request:
* Respond politely: "I don't have that information at the moment, {first_name}. π"
* Offer alternative assistance options.
- Strictly avoid generating unsupported content.
- Prevent information padding or speculation.
6. **Extracting Relevant Links**
- If the user asks for a link related to their request `{question}`, extract the most relevant URL from `{context}` and provide it directly.
- Example response:
- "Here is the link you requested, [URL]"
7. **Real-Time Awareness**
- Acknowledge the current context when appropriate.
- Stay focused on the user's immediate needs.
- If this is the first message, always ask how the user is feeling and what they would like help with today.
**Context:** {context}
**User's Question:** {question}
**Your Response:**
"""

# Retriever over the vector store, used to look up context for each question.
retriever = vectorstore.as_retriever()
# Module-level prompt object built from the template above.
rag_prompt = PromptTemplate.from_template(template)
class OpenRouterLLM:
    """Thin streaming wrapper around OpenRouter's OpenAI-compatible chat API."""

    def __init__(self, key: str):
        """Create the API client.

        Args:
            key: OpenRouter API key.

        Raises:
            Exception: whatever the client constructor raises; the error is
                printed and then re-raised unchanged.
        """
        try:
            self.client = OpenAI(
                base_url="https://openrouter.ai/api/v1",
                api_key=key,
            )
            # Attribution headers sent along with every completion request.
            self.headers = {
                "HTTP-Referer": "http://localhost:3000",
                "X-Title": "Local Development"
            }
        except Exception as e:
            print(f"Initialization error: {e}")
            raise

    def stream(self, prompt: str) -> Iterator[str]:
        """Yield the model's answer to *prompt* piece by piece.

        Args:
            prompt: full prompt text, sent as a single user message.

        Yields:
            Non-empty content fragments in arrival order. If anything goes
            wrong, a final ``"Streaming error: ..."`` string is yielded
            instead of raising.
        """
        try:
            completion = self.client.chat.completions.create(
                model="deepseek/deepseek-r1-distill-llama-70b:free",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
                # Previously the headers were built but never transmitted.
                extra_headers=self.headers,
            )
            for chunk in completion:
                # A chunk may arrive with no choices; indexing [0] on it would
                # abort the whole stream, so skip it.
                if not chunk.choices:
                    continue
                content = getattr(chunk.choices[0].delta, "content", None)
                if content:
                    yield content
        except Exception as e:
            yield f"Streaming error: {str(e)}"
class UserSession:
    """Holds the currently registered user and their HTML welcome message."""

    def __init__(self):
        # Dict of user details passed to set_user, or None before registration.
        self.current_user = None
        # HTML welcome banner built for the current user, or None.
        self.welcome_message = None

    def set_user(self, user_info):
        """Store *user_info* and rebuild the welcome message.

        Args:
            user_info: dict of user details; its "first_name" entry
                (default "Guest") is used in the welcome message.
        """
        self.current_user = user_info
        self.set_welcome_message(user_info.get("first_name", "Guest"))

    def get_user(self):
        """Return the stored user dict, or None if no user was set."""
        return self.current_user

    def set_welcome_message(self, first_name):
        """Build the HTML welcome banner addressed to *first_name*.

        The name is user-supplied and ends up inside HTML markup, so it is
        HTML-escaped before being interpolated.
        """
        import html  # local import: keeps this block self-contained

        safe_name = html.escape(str(first_name))
        # NOTE(review): the character after "Welcome ...!" looks like a
        # mis-encoded emoji; it is kept as found - confirm the intended one.
        # The apostrophe and dash in the body text were mis-encoded as "β"
        # and are restored here.
        self.welcome_message = (
            f"<div style='font-size: 18px; font-weight: bold; color: #2E86C1;'>"
            f"Welcome {safe_name}! π</div>"
            f"<div style='font-size: 14px; color: #34495E;'>"
            f"We appreciate you reaching out to us. You are in a safe and trusted space designed to support you. "
            f"Here, you can find guidance on gender-based violence (GBV) and legal assistance.<br><br>"
            f"You don’t have to go through this alone—we are here to listen, support, and help you find the right solutions. "
            f"You deserve to be heard and helped, and we are committed to standing by your side."
            f"</div>"
        )

    def get_welcome_message(self):
        """Return the current welcome banner, or None if none was built."""
        return self.welcome_message


# Initialize session
user_session = UserSession()
# Store user details and handle session
def collect_user_info(first_name, last_name, phone):
    """Validate the registration form and open the chat for the user.

    Args:
        first_name: text of the first-name field.
        last_name: text of the last-name field.
        phone: text of the phone field; digits with optional "+", "-" and
            spaces are accepted.

    Returns:
        A 4-tuple for the registration click handler's outputs:
        (status or welcome message, chatbot-container update,
        registration-container update, initial chat history). On a
        validation failure the chat stays hidden and the history is empty.
    """
    # Surrounding whitespace is not meaningful input: strip it so that
    # blank-looking fields count as missing and clean values are stored.
    first_name = (first_name or "").strip()
    last_name = (last_name or "").strip()
    phone = (phone or "").strip()

    if not (first_name and last_name and phone):
        return "All fields are required to proceed.", gr.update(visible=False), gr.update(visible=True), []

    # Validate phone number (basic validation): only digits may remain once
    # the allowed separators are removed.
    digits_only = phone.replace("+", "").replace("-", "").replace(" ", "")
    if not digits_only.isdigit():
        return "Please enter a valid phone number.", gr.update(visible=False), gr.update(visible=True), []

    # Store user info for chat session
    user_info = {
        "first_name": first_name,
        "last_name": last_name,
        "phone": phone,
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
    }
    user_session.set_user(user_info)

    # Seed the chat with the welcome banner followed by the check-in message.
    welcome_message = user_session.get_welcome_message()
    chat_history = add_initial_message([(None, welcome_message)])

    # Show the chat, hide the registration form.
    return welcome_message, gr.update(visible=True), gr.update(visible=False), chat_history
# Add initial message to start the conversation
def add_initial_message(chatbot):
    """Return *chatbot* history with the assistant's check-in message appended.

    Args:
        chatbot: list of (user_message, bot_message) pairs.

    Returns:
        A new list; the input list is not modified. The appended pair has
        None as the user side.
    """
    # A space now separates the two sentences (they used to run together as
    # "doing.If").
    # NOTE(review): the trailing "π€" looks like a mis-encoded emoji; kept as
    # found - confirm the intended character.
    initial_message = (
        "<div style='font-size: 14px; font-weight: normal; color: #16A085;'>"
        "I just want to check in and see how you are doing. "
        "If you are going through something, please know you are not alone, I am here for you, no matter what.π€"
        "</div>"
    )
    return chatbot + [(None, initial_message)]
# Create RAG chain with user context
def create_rag_chain(retriever, template, api_key):
    """Build a callable that answers a question with retrieved context.

    Args:
        retriever: object whose ``invoke(question)`` returns documents
            exposing ``page_content``.
        template: prompt template text with ``context``, ``question`` and
            ``first_name`` placeholders.
        api_key: key handed to ``OpenRouterLLM``.

    Returns:
        A function taking ``{"question": ...}`` and returning the LLM's
        streaming iterator of text fragments.
    """
    llm = OpenRouterLLM(api_key)
    prompt_template = PromptTemplate.from_template(template)

    def stream_func(input_dict):
        question = input_dict["question"]

        # Look up supporting passages and merge them into one text block.
        documents = retriever.invoke(question)
        context_str = "\n".join(doc.page_content for doc in documents)

        # Address the registered user by name; fall back to "User".
        session_user = user_session.get_user()
        if not session_user:
            session_user = {}
        first_name = session_user.get("first_name", "User")

        filled_prompt = prompt_template.format(
            context=context_str,
            question=question,
            first_name=first_name,
        )
        return llm.stream(filled_prompt)

    return stream_func
def rag_memory_stream(message, history):
    """Stream the answer to *message*, yielding the text accumulated so far.

    Args:
        message: the user's question.
        history: prior conversation; accepted for the chat callback
            signature but not used here.

    Yields:
        The growing response string after each fragment produced by the
        module-level ``rag_chain`` callable (set by ``chatbot_interface``).
    """
    partial_text = ""
    # The user lookup that used to sit here was never used: the chain itself
    # reads the session when it builds the prompt.
    for new_text in rag_chain({"question": message}):
        partial_text += new_text
        yield partial_text
# Gradio Interface Setup with improved UX | |
# Builds the whole Gradio app (registration form, chat area, feedback buttons,
# footer) and returns the Blocks object; the caller is responsible for launching it.
# NOTE(review): indentation was lost in this copy, so which components sit
# inside which `with` block cannot be read off the lines below - confirm
# against the original file before re-indenting.
def chatbot_interface(): | |
# Get API key (in a real application, handle this more securely) | |
# `api` is the module-level value read from the V1 environment variable.
api_key = api # This should be properly defined or imported elsewhere | |
# Create the RAG chain with user context | |
# Stored in a module global because rag_memory_stream reads `rag_chain` by name.
global rag_chain | |
rag_chain = create_rag_chain(retriever, template, api_key) | |
# Create theme | |
theme = gr.themes.Soft( | |
primary_hue="indigo", | |
secondary_hue="blue", | |
) | |
# Top-level layout; the css string styles the welcome banner, feedback buttons and footer.
with gr.Blocks(theme=theme, css=""" | |
.welcome-container { | |
text-align: center; | |
margin-bottom: 20px; | |
padding: 20px; | |
border-radius: 10px; | |
background-color: #f0f4ff; | |
} | |
.feedback-btn { margin-top: 10px; } | |
footer { margin-top: 30px; text-align: center; } | |
""") as demo: | |
# Welcome banner | |
gr.Markdown("# π€ Ijwi ry'Ubufasha - Your AI Assistant", elem_classes=["welcome-container"]) | |
# User registration section | |
# Visible at start; collect_user_info's third output hides it after a successful registration.
registration_container = gr.Column(visible=True) | |
with registration_container: | |
gr.Markdown("### Please provide your details to start chatting") | |
with gr.Row(): | |
first_name = gr.Textbox( | |
label="First Name", | |
placeholder="Enter your first name", | |
scale=1 | |
) | |
last_name = gr.Textbox( | |
label="Last Name", | |
placeholder="Enter your last name", | |
scale=1 | |
) | |
phone = gr.Textbox( | |
label="Phone Number", | |
placeholder="Enter your phone number (e.g., +250...)", | |
) | |
with gr.Row(): | |
submit_btn = gr.Button("Start Chatting", variant="primary", scale=2) | |
# Receives collect_user_info's first output (validation error or welcome message).
response_message = gr.Markdown(elem_id="welcome-message") | |
# Chatbot section (initially hidden) | |
chatbot_container = gr.Column(visible=False) | |
with chatbot_container: | |
# Chat widget whose replies are streamed by rag_memory_stream.
chat_interface = gr.ChatInterface( | |
fn=rag_memory_stream, | |
title="π€ Help Chatbot", | |
fill_height=True, | |
theme=theme | |
) | |
# Feedback buttons | |
with gr.Row(): | |
feedback_label = gr.Markdown("### Was this conversation helpful?") | |
with gr.Row(): | |
thumbs_up = gr.Button("π Yes, it was helpful", elem_classes=["feedback-btn"]) | |
thumbs_down = gr.Button("π No, it needs improvement", elem_classes=["feedback-btn"]) | |
# Footer with version info | |
gr.Markdown("Ijwi ry'Ubufasha v1.0.0 Β© 2025", elem_id="footer") | |
# Handle user registration | |
# The four outputs line up with collect_user_info's 4-tuple: message,
# chat-container update, registration-container update, chat history.
submit_btn.click( | |
collect_user_info, | |
inputs=[first_name, last_name, phone], | |
outputs=[response_message, chatbot_container, registration_container, chat_interface.chatbot] | |
) | |
# Handle feedback (placeholder functionality) | |
# `feedback_type` is accepted but not used: both buttons produce the same thank-you text.
def record_feedback(feedback_type): | |
# Here you could log feedback to a file or database | |
feedback_message = f"Thank you for your feedback! We'll use it to improve our service." | |
return feedback_message | |
# Either button replaces the feedback prompt text with the thank-you message.
thumbs_up.click(lambda: record_feedback("positive"), outputs=feedback_label) | |
thumbs_down.click(lambda: record_feedback("negative"), outputs=feedback_label) | |
return demo | |
if __name__ == "__main__":
    # Build the interface first, then start it; share and inbrowser are
    # passed through to Gradio's launch().
    demo = chatbot_interface()
    demo.launch(share=True, inbrowser=True)