import os
import json

from dotenv import load_dotenv
from google.oauth2 import service_account
from langchain_google_genai import ChatGoogleGenerativeAI

load_dotenv()

GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")  # os.getenv already reads os.environ

# GOOGLE_APPLICATION_CREDENTIALS is expected to hold the service-account JSON as a
# string; json.loads already returns a dict, so no eval() is needed (or safe) here.
conf = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')
service_account_info = json.loads(conf)
credentials = service_account.Credentials.from_service_account_info(service_account_info)

DOCUMENT_DIR = 'document/'
COLLECTION_NAME = "health_documents"

llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash",
    google_api_key=GEMINI_API_KEY,
    temperature=0.7,
    credentials=credentials,
)
print("Models initialized successfully.")
from langchain_community.document_loaders import (
    TextLoader,
    DirectoryLoader,
    UnstructuredPDFLoader,
    UnstructuredWordDocumentLoader,
)
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.prompts import PromptTemplate
from langchain_community.vectorstores import Chroma

# Path for ChromaDB persistent storage
from constants import CHROMA_PATH

# Initialize Hugging Face embeddings
# You can change the model to any suitable embedding model
embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/gtr-t5-large",
    model_kwargs={'device': 'cpu'}
)
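# Optional sanity check (sketch): Chroma collections are tied to a fixed vector
# size, so it can help to confirm the model loads and note its dimensionality.
# _dim = len(embeddings.embed_query("hello"))  # gtr-t5-large embeds to 768 dims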
def load_documents(directory):
    """Load documents from multiple file types."""
    documents = []

    # Load text files
    text_loader = DirectoryLoader(
        directory,
        glob="**/*.txt",
        loader_cls=TextLoader
    )
    documents.extend(text_loader.load())

    # Load Word documents
    docx_loader = DirectoryLoader(
        directory,
        glob="**/*.docx",
        loader_cls=UnstructuredWordDocumentLoader,
        loader_kwargs={"mode": "elements"}
    )
    documents.extend(docx_loader.load())

    # Load PDF files
    pdf_loader = DirectoryLoader(
        directory,
        glob="**/*.pdf",
        loader_cls=UnstructuredPDFLoader
    )
    documents.extend(pdf_loader.load())

    print(f"Loaded {len(documents)} documents.")
    return documents
def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    """Split documents into smaller chunks."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    chunks = text_splitter.split_documents(documents)
    return chunks
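# Example (sketch): with the defaults above, consecutive chunks share roughly
# 200 characters, so an answer that straddles a chunk boundary still appears
# intact in at least one chunk at retrieval time.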
def create_and_store_embeddings(chunks):
    """Create a ChromaDB vector store from document chunks."""
    # Ensure the Chroma path exists
    os.makedirs(CHROMA_PATH, exist_ok=True)
    print(f"Unfiltered chunks: {len(chunks)}")

    # Chroma only accepts str/int/float/bool metadata values, so drop anything else
    filtered_chunks = []
    for chunk in chunks:
        chunk.metadata = {k: v for k, v in chunk.metadata.items()
                          if isinstance(v, (str, int, float, bool))}
        filtered_chunks.append(chunk)
    print(f"Filtered metadata for {len(filtered_chunks)} chunks.")

    # Build the vector store; with persist_directory set, Chroma writes to disk
    vector_store = Chroma.from_documents(
        documents=filtered_chunks,
        embedding=embeddings,
        persist_directory=CHROMA_PATH
    )
    print("Created ChromaDB vector store.")
    return vector_store
def load_vectordb(path: str = CHROMA_PATH):
    """Load an existing ChromaDB vector store, or return None if none exists."""
    if os.path.exists(path):
        vector_store = Chroma(persist_directory=path, embedding_function=embeddings)
        print("Loaded ChromaDB vector store.")
        return vector_store
    # Returning None (rather than raising) lets agent_with_db() fall back to
    # rebuilding the store from the source documents.
    print(f"ChromaDB path {path} does not exist.")
    return None
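# Usage sketch (assumes a store was already built at CHROMA_PATH):
# db = load_vectordb()
# if db is not None:
#     hits = db.similarity_search("precautions for the flu", k=3)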
from langchain.chains import LLMChain
from langchain.chains.base import Chain
def create_health_agent(vector_store):
    """Create a custom retrieval QA chain for health-related queries."""
    prompt_template = """You are a helpful health assistant who talks to the user like a human and resolves their queries.
Use Previous_Conversation to maintain consistency in the conversation.
This is the previous conversation between you and the user.
Previous_Conversation: \n{previous_conversation}
This is information about the person.
User_Data: \n{user_data}
Thoroughly analyze the Context and use it, alongside your own knowledge, to answer the questions.
Points to adhere to:
1. Only mention schemes if the user specifically asks; otherwise don't share scheme information.
2. If the user asks about schemes, first ask which state they belong to.
3. You can act as a mental-health counselor if needed.
4. Give precautions and natural remedies when the user asks or it is needed, but only for common diseases such as the common cold and flu.
5. Also use information from the Context to answer the questions.
6. Ask for the user's preferred language at the start of the conversation.
Context: {context}\n
Question: {question}
Answer:"""

    PROMPT = PromptTemplate(
        template=prompt_template,
        input_variables=["context", "question", "previous_conversation", "user_data"]
    )

    if llm is None:
        raise ValueError("No language model initialized. Please check the model initialization.")

    # Create a retriever over the vector store
    retriever = vector_store.as_retriever(search_kwargs={"k": 10})
    class CustomRetrievalQA(Chain):
        """Retrieval QA chain that also threads conversation history and user data."""
        retriever: object
        llm_chain: LLMChain

        # input_keys/output_keys must be properties for the Chain base class to
        # validate inputs correctly
        @property
        def input_keys(self):
            return ['query', 'previous_conversation', 'user_data']

        @property
        def output_keys(self):
            return ['result']

        def _call(self, inputs):
            query = inputs['query']
            previous_conversation = inputs.get('previous_conversation', '')
            user_data = inputs.get('user_data', '')

            # Retrieve relevant documents and join them into one context block
            docs = self.retriever.get_relevant_documents(query)
            context = "\n".join([doc.page_content for doc in docs])

            # Prepare inputs for the LLM chain
            llm_inputs = {
                'context': context,
                'question': query,
                'previous_conversation': previous_conversation,
                'user_data': user_data
            }

            # Generate the response
            result = self.llm_chain.invoke(llm_inputs)
            return {'result': result['text']}

    # Create the LLM chain
    llm_chain = LLMChain(llm=llm, prompt=PROMPT)

    # Create and return the custom chain
    return CustomRetrievalQA(retriever=retriever, llm_chain=llm_chain)
def agent_with_db():
    # 1. Load the existing vector store (None if it hasn't been built yet)
    vector_store = load_vectordb(CHROMA_PATH)

    # Rebuild the store when UPDATE_DB=true is set in the environment
    UPDATE_DB = os.getenv("UPDATE_DB", "false").lower() == "true"

    if vector_store is None or UPDATE_DB:
        print("Loading documents...")
        documents = load_documents(DOCUMENT_DIR)

        print("Splitting documents into chunks...")
        chunks = split_documents(documents)
        print(f"Split into {len(chunks)} chunks.")

        print("Creating and storing embeddings in ChromaDB...")
        try:
            vector_store = create_and_store_embeddings(chunks)
            print("Embeddings stored successfully in ChromaDB.")
        except Exception as e:
            print(f"An error occurred while creating or storing embeddings: {e}")
            return None

    print("Creating the health agent...")
    health_agent = create_health_agent(vector_store)
    return health_agent
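if __name__ == "__main__":
    # Minimal usage sketch: build the agent and ask one question. The input keys
    # match CustomRetrievalQA.input_keys above; the question text is illustrative.
    agent = agent_with_db()
    if agent is not None:
        response = agent.invoke({
            "query": "What are some natural remedies for the common cold?",
            "previous_conversation": "",
            "user_data": "",
        })
        print(response["result"])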