Spaces:
Runtime error
Runtime error
import os | |
import PyPDF2 | |
from PyPDF2 import PdfReader | |
## Embedding model! | |
from langchain_huggingface import HuggingFaceEmbeddings | |
# Shared embedding model ("mixedbread-ai/mxbai-embed-large-v1" via HuggingFaceEmbeddings).
# It is handed to Chroma as `embedding_function` further down in this file.
# NOTE(review): created at import time — presumably loads/downloads the model on
# startup; confirm this is acceptable for the deployment target.
embed_model = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
import pandas as pd | |
# Build `context_data`: the text corpus taken from every spreadsheet that sits
# directly in `folder_path`.
folder_path = "./"
context_data = []

# Directory listing, narrowed down to CSV / Excel workbooks.
files = os.listdir(folder_path)
data_files = [name for name in files if name.endswith(('.csv', '.xlsx', '.xls'))]

for f, file in enumerate(data_files, 1):
    print(f"\nProcessing file {f}: {file}")
    file_path = os.path.join(folder_path, file)
    # CSV files use the CSV reader; everything else that matched is an Excel workbook.
    reader = pd.read_csv if file.endswith('.csv') else pd.read_excel
    try:
        df = reader(file_path)
        # Third column (positional index 2): keep the non-empty cells as strings.
        third_column = df.iloc[:, 2].dropna().astype(str)
        context_data.extend(third_column.tolist())
    except Exception as e:
        # Best effort: a broken or too-narrow file is reported and skipped.
        print(f"Error processing file {file}: {e}")
# def extract_text_from_pdf(pdf_path): | |
# """Extracts text from a PDF file.""" | |
# try: | |
# with open(pdf_path, "rb") as file: | |
# reader = PyPDF2.PdfReader(file) | |
# text = "".join(page.extract_text() or "" for page in reader.pages) # Handle None cases | |
# return text | |
# except Exception as e: | |
# print(f"Error extracting text from {pdf_path}: {e}") | |
# return "" | |
# folder_path = "./" | |
# # Initialize the list to hold the extracted text chunks | |
# text_chunks = [] | |
# # Get all PDF filenames in the folder | |
# filenames = [f for f in os.listdir(folder_path) if f.lower().endswith(".pdf")] | |
# # Process each PDF file | |
# for index, file in enumerate(filenames, 1): | |
# print(f"\nProcessing file {index}: {file}") | |
# pdf_path = os.path.join(folder_path, file) | |
# try: | |
# # Extract text from the PDF | |
# extracted_text = extract_text_from_pdf(pdf_path) | |
# if extracted_text.strip(): # Ensure extracted text is not just whitespace | |
# # Split extracted text into chunks of 2000 characters
# chunks = [extracted_text[i:i+2000] for i in range(0, len(extracted_text), 2000)] | |
# # Append extracted chunks to the list | |
# text_chunks.extend(chunks) | |
# else: | |
# print(f"No text found in the PDF: {file}") | |
# except Exception as e: | |
# print(f"Error reading the PDF {file}: {e}") | |
from urllib.parse import urljoin, urlparse | |
import requests | |
from io import BytesIO | |
from bs4 import BeautifulSoup | |
from langchain_core.prompts import ChatPromptTemplate | |
import gradio as gr | |
def scrape_websites(base_urls):
    """Scrape each base URL plus the same-site pages and PDFs it links to.

    Only one level of links is followed: the links found on each base page.

    Args:
        base_urls: Iterable of URL strings. Blank / whitespace-only entries
            are skipped.

    Returns:
        dict mapping every successfully scraped URL to its extracted text
        (cleaned HTML text, or PDF text for ``.pdf`` links). An empty dict is
        returned if an unexpected error interrupts the crawl.
    """
    # NOTE(review): block nesting was reconstructed from a whitespace-stripped
    # source — confirm it matches the intended control flow.
    try:
        visited_links = set()  # URLs already attempted, to avoid repeat fetches
        content_by_url = {}    # URL -> extracted text

        for base_url in base_urls:
            if not base_url.strip():
                continue  # Skip empty or invalid URLs

            print(f"Scraping base URL: {base_url}")
            html_content = fetch_page_content(base_url)
            if not html_content:
                continue

            content_by_url[base_url] = clean_body_content(html_content)
            visited_links.add(base_url)

            # Extract and process all internal links of the base page.
            soup = BeautifulSoup(html_content, "html.parser")
            for link in extract_internal_links(base_url, soup):
                if link in visited_links:
                    continue
                # Mark before fetching so a failing link is not retried for
                # every base URL that references it.
                visited_links.add(link)

                # PDFs are binary: route them to the PDF extractor instead of
                # decoding them as HTML first. The path is checked (not the
                # whole URL) so "file.pdf?download=1" is recognised too.
                if urlparse(link).path.lower().endswith('.pdf'):
                    print(f"Extracting PDF content from: {link}")
                    pdf_content = extract_pdf_text(link)
                    if pdf_content:
                        content_by_url[link] = pdf_content
                    continue

                print(f"Scraping link: {link}")
                page_content = fetch_page_content(link)
                if page_content:
                    content_by_url[link] = clean_body_content(page_content)

        return content_by_url
    except Exception as e:
        print(f"Error during scraping: {e}")
        return {}
def fetch_page_content(url):
    """Download `url` and return the response body as text.

    Uses a 10 second timeout. Any `requests` failure (connection problem,
    timeout, non-2xx status) is printed and reported as ``None``.
    """
    try:
        page = requests.get(url, timeout=10)
        page.raise_for_status()  # treat HTTP error statuses as failures
        return page.text
    except requests.exceptions.RequestException as err:
        print(f"Error fetching {url}: {err}")
    return None
def extract_internal_links(base_url, soup):
    """Collect the absolute same-site URLs linked from a parsed page.

    Args:
        base_url: URL of the page `soup` was parsed from; relative hrefs are
            resolved against it.
        soup: Parsed document exposing ``find_all`` (a BeautifulSoup object).

    Returns:
        set of absolute URLs whose host matches `base_url`. Fragments
        (``#section``) are stripped so that several anchors pointing into the
        same page yield a single URL instead of one fetch per anchor.
    """
    from urllib.parse import urldefrag  # local import: only needed here

    links = set()
    for anchor in soup.find_all("a", href=True):
        absolute_url = urljoin(base_url, anchor["href"])
        # Drop the fragment: "page#a" and "page#b" are the same document.
        full_url, _fragment = urldefrag(absolute_url)
        if is_internal_link(base_url, full_url):
            links.add(full_url)
    return links
def is_internal_link(base_url, link_url):
    """Return True when `link_url` points at the same host as `base_url`.

    The network location (host plus optional port) of both URLs is compared
    case-insensitively, because host names are not case-sensitive. URLs with
    no network location (e.g. ``mailto:``) are never internal to an http(s)
    base URL.
    """
    base_netloc = urlparse(base_url).netloc.lower()
    link_netloc = urlparse(link_url).netloc.lower()
    return base_netloc == link_netloc
def extract_pdf_text(pdf_url):
    """Download a PDF and return the concatenated text of all its pages.

    Args:
        pdf_url: URL of the PDF document.

    Returns:
        The extracted text, or ``None`` when the download fails, the PDF
        cannot be read, or no text could be extracted.
    """
    try:
        # Same timeout as fetch_page_content, so a stalled server cannot hang
        # the whole crawl.
        response = requests.get(pdf_url, timeout=10)
        response.raise_for_status()
        # Open the PDF from the in-memory response content.
        with BytesIO(response.content) as file:
            reader = PdfReader(file)
            # `or ""` guards against pages for which no text is returned, so
            # one empty page does not discard the whole document.
            pdf_text = "".join(page.extract_text() or "" for page in reader.pages)
        return pdf_text if pdf_text else None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching PDF {pdf_url}: {e}")
        return None
    except Exception as e:
        print(f"Error reading PDF {pdf_url}: {e}")
        return None
def clean_body_content(html_content):
    """Reduce an HTML document to its visible text, one non-blank line per row.

    ``<script>`` and ``<style>`` elements are removed first; the remaining
    text is split into lines, each line is stripped, and blank lines are
    dropped.
    """
    document = BeautifulSoup(html_content, "html.parser")

    # Scripts and stylesheets carry no readable content.
    for tag in document.find_all(["script", "style"]):
        tag.extract()

    raw_lines = document.get_text(separator="\n").splitlines()
    stripped_lines = (raw.strip() for raw in raw_lines)
    return "\n".join(text for text in stripped_lines if text)
# if __name__ == "__main__": | |
# website = [ | |
# #"https://www.rib.gov.rw/index.php?id=371", | |
# "https://haguruka.org.rw/our-work/" | |
# ] | |
# all_content = scrape_websites(website) | |
# # Temporary list to store (url, content) tuples | |
# temp_list = [] | |
# # Process and store each URL with its content | |
# for url, content in all_content.items(): | |
# temp_list.append((url, content)) | |
# processed_texts = [] | |
# # Process each element in the temporary list | |
# for element in temp_list: | |
# if isinstance(element, tuple): | |
# url, content = element # Unpack the tuple | |
# processed_texts.append(f"url: {url}, content: {content}") | |
# elif isinstance(element, str): | |
# processed_texts.append(element) | |
# else: | |
# processed_texts.append(str(element)) | |
# def chunk_string(s, chunk_size=2000): | |
# return [s[i:i+chunk_size] for i in range(0, len(s), chunk_size)] | |
# # List to store the chunks | |
# chunked_texts = [] | |
# for text in processed_texts: | |
# chunked_texts.extend(chunk_string(text)) | |
# Corpus to index. Only the spreadsheet text is active; the PDF and web-scrape
# sources below are currently disabled.
data = []
data.extend(context_data)
# data.extend([item for item in text_chunks if item not in data])
# data.extend([item for item in chunked_texts if item not in data])

#from langchain_community.vectorstores import Chroma
from langchain_chroma import Chroma

vectorstore = Chroma(
    collection_name="GBV_set",
    embedding_function=embed_model,
)

# Add data to the vector store. Skipped when nothing was loaded.
# NOTE(review): an empty batch is expected to be rejected by the Chroma
# backend at startup — confirm against the pinned version.
if data:
    vectorstore.add_texts(data)
else:
    print("Warning: no context data found; the vector store is empty.")

# API key for the LLM provider, read from the `V1` environment variable
# (None when the variable is not set).
api = os.environ.get('V1')
from openai import OpenAI | |
from langchain_core.prompts import PromptTemplate | |
from langchain_core.output_parsers import StrOutputParser | |
from langchain_core.runnables import RunnablePassthrough | |
import gradio as gr | |
from typing import Iterator | |
import time | |
# Refined Template with Emotional Awareness.
# Placeholders filled at prompt time: {context}, {question}, {first_name}.
# NOTE: chatbot_interface() rebinds the global `template` before the RAG chain
# is built, so this text only applies to code that reads `template` earlier.
template = ("""
**Role**: Compassionate Legal Assistance and GBV Support Specialist with Emotional Awareness.
You are a friendly and empathetic chatbot designed to assist users in a conversational and human-like manner. Your goal is to provide accurate, helpful, and emotionally supportive responses based on the provided context: {context}. Follow these guidelines:
1. **Emotional Awareness**
- Acknowledge the user's emotions and respond with empathy.
- Use phrases like "I understand how you feel," "That sounds challenging," or "I'm here to support you."
- If the user expresses negative emotions, offer comfort and reassurance.
2. **Contextual Interaction**
- Begin with a warm and empathetic welcome message.
- Extract precise details from the provided context: {context}.
- Respond directly to the user's question: {question}.
- Only provide detailed information if user requests it.
- Remember the user's name is {first_name}.
3. **Communication Guidelines**
- Maintain a warm, conversational tone (avoid over-familiarity).
- Use occasional emojis for engagement (e.g., 😊, 🤗, ❤️).
- Provide clear, concise, and emotionally supportive information.
4. **Response Strategies**
- Greet users naturally and ask about their wellbeing (e.g., "Welcome, {first_name}! 😊 How are you feeling today?", "Hello {first_name}! 🤗 What's on your mind?").
- Always start with a check-in about the user's wellbeing or current situation.
- Provide a concise summary with only relevant information.
- Avoid generating content beyond the context.
- Handle missing information transparently.
5. **No Extra Content**
- If no information matches the user's request:
* Respond politely: "I don't have that information at the moment, {first_name}. 😊"
* Offer alternative assistance options.
- Strictly avoid generating unsupported content.
- Prevent information padding or speculation.
6. **Extracting Relevant Links**
- If the user asks for a link related to their request `{question}`, extract the most relevant URL from `{context}` and provide it directly.
- Example response:
- "Here is the link you requested, [URL]"
7. **Real-Time Awareness**
- Acknowledge the current context when appropriate.
- Stay focused on the user's immediate needs.
**Context:** {context}
**User's Question:** {question}
**Your Response:**
""")
# Module-level prompt object built from the module-level `template`.
# NOTE(review): create_rag_chain() builds its own PromptTemplate from the
# template it is given, so this object appears unused in the visible code —
# confirm before relying on it.
rag_prompt = PromptTemplate.from_template(template)
# Retriever over the Chroma collection (default retriever settings); it is
# passed to create_rag_chain() by chatbot_interface().
retriever = vectorstore.as_retriever()
class OpenRouterLLM:
    """Minimal streaming chat client for OpenRouter's OpenAI-compatible API.

    Attributes:
        client: `OpenAI` client pointed at the OpenRouter base URL.
        headers: Attribution headers sent with every completion request.
        model: Identifier of the model used for completions.
    """

    # Previously tried: "deepseek/deepseek-r1-distill-llama-70b:free"
    DEFAULT_MODEL = "meta-llama/llama-3.3-70b-instruct:free"

    def __init__(self, key: str, model: str = DEFAULT_MODEL):
        """Create the client.

        Args:
            key: OpenRouter API key.
            model: Model identifier; defaults to `DEFAULT_MODEL`.

        Raises:
            Exception: Whatever the `OpenAI` constructor raises (for example
                when no API key is available); it is printed and re-raised.
        """
        self.model = model
        self.headers = {
            "HTTP-Referer": "http://localhost:3000",
            "X-Title": "Local Development"
        }
        try:
            self.client = OpenAI(
                base_url="https://openrouter.ai/api/v1",
                api_key=key
            )
        except Exception as e:
            print(f"Initialization error: {e}")
            raise

    def stream(self, prompt: str) -> Iterator[str]:
        """Yield the model's answer to `prompt` piece by piece.

        The prompt is sent as a single user message. Errors never propagate:
        on failure one final ``"Streaming error: ..."`` string is yielded.
        """
        try:
            completion = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                stream=True,
                # Send the attribution headers that were previously built but
                # never attached to the request.
                extra_headers=self.headers,
            )
            for chunk in completion:
                # Some stream chunks carry no choices; indexing them would
                # raise IndexError and abort the stream.
                if not chunk.choices:
                    continue
                content = getattr(chunk.choices[0].delta, "content", None)
                if content:
                    yield content
        except Exception as e:
            yield f"Streaming error: {str(e)}"
class UserSession:
    """In-memory record of the current user, welcome banner and chat history.

    Attributes:
        current_user: The dict passed to `set_user`, or None before that.
        welcome_message: HTML welcome banner, or None before `set_user`.
        conversation_history: List of ``{"role": ..., "content": ...}`` dicts.
    """

    def __init__(self):
        self.current_user = None
        self.welcome_message = None
        self.conversation_history = []  # Conversation history storage

    def set_user(self, user_info):
        """Store `user_info` and restart the history with the welcome banner.

        Args:
            user_info: dict; its "Nickname" entry (default "Guest") is used in
                the welcome banner.
        """
        self.current_user = user_info
        self.set_welcome_message(user_info.get("Nickname", "Guest"))
        # A new user starts with a fresh history holding only the banner.
        self.conversation_history = [
            {"role": "assistant", "content": self.get_welcome_message()},
        ]

    def get_user(self):
        """Return the stored user dict (None if no user was set)."""
        return self.current_user

    def set_welcome_message(self, Nickname):
        """Build the HTML welcome banner for `Nickname`."""
        import html  # local import: only this method needs it

        # The nickname is typed by the user and ends up inside HTML, so it is
        # escaped to keep markup in the name from being rendered.
        safe_name = html.escape(str(Nickname))
        self.welcome_message = (
            f"<div style='font-size: 24px; font-weight: bold; color: #2E86C1;'>"
            f"Welcome {safe_name}! 👋</div>"
            f"<div style='font-size: 20px; color: #FFFFFF;'>"
            f"We appreciate you reaching out to us. You are in a safe and trusted space designed to support you. "
            f"Here, you can find guidance on gender-based violence (GBV) and legal assistance.<br><br>"
            f"</div>"
        )

    def get_welcome_message(self):
        """Return the current welcome banner (None before `set_user`)."""
        return self.welcome_message

    def add_to_history(self, role, message):
        """Add a message to the conversation history."""
        self.conversation_history.append({"role": role, "content": message})

    def get_conversation_history(self):
        """Get the full conversation history."""
        return self.conversation_history

    def get_formatted_history(self):
        """Get conversation history formatted as a string for the LLM.

        Each entry becomes ``"User: ..."`` or ``"Assistant: ..."`` followed by
        a blank line; any role other than "user" is labelled "Assistant".
        """
        return "".join(
            f"{'User' if entry['role'] == 'user' else 'Assistant'}: {entry['content']}\n\n"
            for entry in self.conversation_history
        )


# Initialize session.
# NOTE(review): this is a single module-level instance, so every visitor of
# the app reads and writes the same user and history — confirm whether
# per-visitor state is required.
user_session = UserSession()
# Store user details and handle session
def collect_user_info(Nickname):
    """Register the nickname and switch the UI from registration to chat.

    Args:
        Nickname: Text entered in the nickname box.

    Returns:
        A 4-tuple matching the click handler's outputs: the message to show,
        an update for the chatbot container's visibility, an update for the
        registration container's visibility, and the initial chat history.
        When the nickname is missing or blank, the chat stays hidden and the
        history is empty.
    """
    # Treat a whitespace-only nickname like a missing one.
    Nickname = (Nickname or "").strip()
    if not Nickname:
        return "Nickname is required to proceed.", gr.update(visible=False), gr.update(visible=True), []

    # Store user info for chat session
    user_info = {
        "Nickname": Nickname,
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
    }
    # Set user in session (this also resets the conversation history)
    user_session.set_user(user_info)

    # The welcome banner doubles as the first chat entry.
    welcome_message = user_session.get_welcome_message()
    chat_history = add_initial_message([(None, welcome_message)])

    # Show the chat, hide the registration form.
    return welcome_message, gr.update(visible=True), gr.update(visible=False), chat_history
# Hook for seeding the chat with an opening message
def add_initial_message(chatbot):
    """Return the chat history to start the conversation with.

    No extra opening message is configured at the moment, so the list that
    was passed in is handed back unchanged (same object).
    """
    return chatbot
# Create RAG chain with user context and conversation history
def create_rag_chain(retriever, template, api_key):
    """Build a callable that answers a question with retrieval-augmented streaming.

    Args:
        retriever: Object whose ``invoke(question)`` returns documents that
            expose ``page_content``.
        template: Prompt template text; it is formatted with ``context``,
            ``question``, ``first_name`` and ``conversation_history``.
        api_key: Key handed to `OpenRouterLLM`.

    Returns:
        A function taking ``{"question": ...}`` and returning the iterator
        produced by the LLM's ``stream`` method.
    """
    llm = OpenRouterLLM(api_key)
    rag_prompt = PromptTemplate.from_template(template)

    def stream_func(input_dict):
        question = input_dict["question"]

        # Retrieve supporting documents and flatten them into one text block.
        documents = retriever.invoke(question)
        context_str = "\n".join(doc.page_content for doc in documents)

        # The nickname comes from the shared session; "User" when none is set.
        user_info = user_session.get_user() or {}

        prompt = rag_prompt.format(
            context=context_str,
            question=question,
            first_name=user_info.get("Nickname", "User"),
            conversation_history=user_session.get_formatted_history(),
        )
        return llm.stream(prompt)

    return stream_func
def rag_memory_stream(message, history):
    """Stream the assistant's reply to `message`, recording both sides in the session.

    Args:
        message: The user's new message.
        history: Chat history supplied by the chat UI; not used here because
            the session object keeps its own history.

    Yields:
        The reply accumulated so far, growing with every streamed piece.
    """
    user_session.add_to_history("user", message)

    answer_so_far = ""
    for piece in rag_chain({"question": message}):
        answer_so_far += piece
        yield answer_so_far

    # Only reached once the stream is exhausted: store the complete reply.
    user_session.add_to_history("assistant", answer_so_far)
# Gradio Interface Setup with improved UX
def chatbot_interface():
    """Build the Gradio app: a nickname registration step followed by the chat.

    Side effects: rebinds the module-level `template` to the conversation-aware
    prompt below and binds the module-level `rag_chain` used by
    `rag_memory_stream`.

    Returns:
        The `gr.Blocks` application (not yet launched).
    """
    global template, rag_chain

    # Get API key (in a real application, handle this more securely)
    api_key = api  # module-level value read from the environment

    # Prompt that also carries the conversation history.
    template = """
You are a compassionate and supportive AI assistant specializing in helping individuals affected by Gender-Based Violence (GBV).
Previous conversation:
{conversation_history}
Context information:
{context}
User {first_name} asks: {question}
Respond with empathy, providing support and resources based on the conversation. Keep answers short unless the user asks for more details, while maintaining a warm, supportive tone.
"""

    # RAG chain bound to the retriever, the prompt above and the API key.
    rag_chain = create_rag_chain(retriever, template, api_key)

    # Dark full-viewport theme for the whole page.
    custom_css = """
:root {
--background: #000000;
--text: #FFFFFF;
}
body {
background: var(--background) !important;
color: var(--text) !important;
font-family: 'Inter', system-ui, sans-serif;
margin: 0 !important;
padding: 0 !important;
width: 100vw !important;
height: 100vh !important;
display: flex;
flex-direction: column;
}
.gradio-container {
max-width: 100% !important;
width: 100vw !important;
height: 100vh !important;
margin: 0 !important;
padding: 20px !important;
display: flex;
flex-direction: column;
}
.welcome-box, .chat-container, .gr-textbox, .bot {
background: var(--background) !important;
color: var(--text) !important;
border-radius: 12px !important;
padding: 2rem !important;
border: 1px solid rgba(255, 255, 255, 0.1) !important;
box-shadow: 0 4px 6px rgba(255, 255, 255, 0.05) !important;
}
.gr-button-primary {
background: var(--background) !important;
color: var(--text) !important;
padding: 12px 24px !important;
border-radius: 8px !important;
transition: all 0.3s ease !important;
border: 1px solid rgba(255, 255, 255, 0.1) !important;
}
.gr-button-primary:hover {
transform: translateY(-1px);
box-shadow: 0 4px 12px rgba(255, 255, 255, 0.2) !important;
}
footer {
text-align: center !important;
color: var(--text) !important;
opacity: 0.7 !important;
padding: 1rem !important;
font-size: 0.9em !important;
}
.gr-markdown h3 {
color: var(--text) !important;
margin-bottom: 1rem !important;
}
"""

    # NOTE(review): widget nesting was reconstructed from a whitespace-stripped
    # source — confirm that the rows and `response_message` belong inside the
    # registration column.
    with gr.Blocks(css=custom_css) as demo:
        # Step 1: registration (visible first)
        registration_container = gr.Column(visible=True)
        with registration_container:
            gr.Markdown("### Your privacy is our concern, please provide your nickname. ")
            with gr.Row():
                first_name = gr.Textbox(
                    label="Nickname",
                    placeholder="Enter your Nickname",
                    scale=1
                )
            with gr.Row():
                submit_btn = gr.Button("Start Chatting", variant="primary", scale=2)
            response_message = gr.Markdown(elem_id="welcome-message")

        # Step 2: chat (hidden until a nickname has been submitted)
        chatbot_container = gr.Column(visible=False)
        with chatbot_container:
            chat_interface = gr.ChatInterface(
                fn=rag_memory_stream,
                title="Chat with GBVR",
                fill_height=True
            )

        # Footer with version info
        gr.Markdown("Ijwi ry'Ubufasha v1.0.0 © 2025", elem_id="footer")

        # Registration handler: fills the message, toggles both containers
        # and seeds the chatbot with the welcome entry.
        submit_btn.click(
            collect_user_info,
            inputs=[first_name],
            outputs=[response_message, chatbot_container, registration_container, chat_interface.chatbot]
        )

    return demo


# Launch the interface
if __name__ == "__main__":
    chatbot_interface().launch(share=True)