Dharma20's picture
Create pipeline.py
dd64543 verified
from itertools import chain
from typing import Any, List
from haystack.components.converters import PyPDFToDocument, MarkdownToDocument, TextFileToDocument, OutputAdapter
from haystack.components.routers import FileTypeRouter
from haystack.components.joiners import DocumentJoiner
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.writers import DocumentWriter
from haystack.components.builders import ChatPromptBuilder, PromptBuilder
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.core.component.types import Variadic
from haystack_experimental.chat_message_stores.in_memory import InMemoryChatMessageStore
from haystack_experimental.components.retrievers import ChatMessageRetriever
from haystack_experimental.components.writers import ChatMessageWriter
from haystack_integrations.components.generators.cohere import CohereChatGenerator, CohereGenerator
from haystack_experimental.components.retrievers import ChatMessageRetriever
from haystack_experimental.components.writers import ChatMessageWriter
from haystack.dataclasses import ChatMessage
from haystack import Pipeline
from haystack import component
import os
from dotenv import load_dotenv
# Load .env file
load_dotenv()
# Access the API key
os.environ["COHERE_API_KEY"] = os.getenv('COHERE_API_KEY')
document_store = InMemoryDocumentStore()
file_type_router = FileTypeRouter(mime_types=['text/plain','application/pdf','text/markdown'])
pdf_converter = PyPDFToDocument()
text_file_converter = TextFileToDocument()
markdown_converter = MarkdownToDocument()
document_joiner = DocumentJoiner()
document_cleaner = DocumentCleaner()
document_splitter = DocumentSplitter(split_by='word', split_overlap=50)
document_embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L12-v2")
document_writer = DocumentWriter(document_store)
preprocessing_pipeline = Pipeline()
# Adding Componenets
preprocessing_pipeline.add_component('file_type_router', file_type_router)
preprocessing_pipeline.add_component('text_file_converter', text_file_converter)
preprocessing_pipeline.add_component('markdown_converter', markdown_converter)
preprocessing_pipeline.add_component('pdf_converter', pdf_converter)
preprocessing_pipeline.add_component('document_joiner', document_joiner)
preprocessing_pipeline.add_component('document_cleaner', document_cleaner)
preprocessing_pipeline.add_component('document_splitter', document_splitter)
preprocessing_pipeline.add_component('document_embedder', document_embedder)
preprocessing_pipeline.add_component('document_writer', document_writer)
# Connections
preprocessing_pipeline.connect('file_type_router.text/plain', 'text_file_converter.sources')
preprocessing_pipeline.connect('file_type_router.application/pdf', 'pdf_converter.sources')
preprocessing_pipeline.connect('file_type_router.text/markdown', 'markdown_converter.sources')
preprocessing_pipeline.connect('text_file_converter', 'document_joiner')
preprocessing_pipeline.connect('markdown_converter', 'document_joiner')
preprocessing_pipeline.connect('pdf_converter', 'document_joiner')
preprocessing_pipeline.connect('document_joiner', 'document_cleaner')
preprocessing_pipeline.connect('document_cleaner', 'document_splitter')
preprocessing_pipeline.connect('document_splitter', 'document_embedder')
preprocessing_pipeline.connect('document_embedder', 'document_writer')
@component
class ListJoiner:
def __init__(self, _type: Any):
component.set_output_types(self, values=_type)
def run(self, values:Variadic[Any]):
result = list(chain(*values))
return {'values':result}
memory_store = InMemoryChatMessageStore()
query_rephrase_template="""
Rewrite the question for search while keeping its meaning and key terms intact.
If the conversation history is empty, DO NOT change the query.
Use conversation history only if necessary, and avoid extending the query with your own knowledge.
If no changes are needed, output the current question as is.
Conversation history:
{% for memory in memories %}
{{ memory.content }}
{% endfor %}
User Query: {{query}}
Rewritten Query:
"""
conversational_rag = Pipeline()
#Query rephrasing components
conversational_rag.add_component("query_rephrase_prompt_builder",PromptBuilder(query_rephrase_template))
conversational_rag.add_component('query_rephrase_llm',CohereGenerator())
conversational_rag.add_component('list_to_str_adapter', OutputAdapter(template="{{ replies[0] }}", output_type=str))
#RAG components
conversational_rag.add_component('retriever', InMemoryBM25Retriever(document_store=document_store, top_k=3))
conversational_rag.add_component('prompt_builder', ChatPromptBuilder(variables=["query", "documents", "memories"],required_variables=['query', 'documents', 'memories']))
conversational_rag.add_component('llm', CohereChatGenerator())
#Memory components
conversational_rag.add_component('memory_retriever',ChatMessageRetriever(memory_store))
conversational_rag.add_component('memory_writer', ChatMessageWriter(memory_store))
conversational_rag.add_component('memory_joiner', ListJoiner(List[ChatMessage]))
#Query Rephrasing Connections
conversational_rag.connect('memory_retriever', 'query_rephrase_prompt_builder.memories')
conversational_rag.connect('query_rephrase_prompt_builder.prompt', 'query_rephrase_llm' )
conversational_rag.connect('query_rephrase_llm.replies', 'list_to_str_adapter')
conversational_rag.connect('list_to_str_adapter', 'retriever.query')
#RAG connections
conversational_rag.connect('retriever.documents', 'prompt_builder.documents')
conversational_rag.connect('prompt_builder.prompt', 'llm.messages')
conversational_rag.connect('llm.replies', 'memory_joiner')
#Memory Connections
conversational_rag.connect('memory_joiner','memory_writer')
conversational_rag.connect('memory_retriever','prompt_builder.memories')
system_message = ChatMessage.from_system("""You are an intelligent and cheerful AI assistant specialized in assisting humans with queries based on provided supporting documents and conversation history.
Always prioritize accurate and concise answers derived from the documents, and offer contextually relevant follow-up questions to maintain an engaging and helpful conversation.
If the answer is not present in the documents, politely inform the user while suggesting alternative ways to help""")
user_message_template ="""Based on the conversation history and the provided supporting documents, provide a brief and accurate answer to the question.
Make the conversation feel more natural and engaging
- Format your response for clarity and readability, using bullet points, paragraphs, or lists where necessary.
- Note: Supporting documents are not part of the conversation history.
- If the question cannot be answered using the supporting documents, respond with: "The answer is not available in the provided documents."
Conversation History:
{% for memory in memories %}
{{ memory.content }}
{% endfor %}
Supporting Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{ query }}
Answer:
"""
user_message = ChatMessage.from_user(user_message_template)