Spaces:
Sleeping
Sleeping
from flask import Flask, request | |
from twilio.twiml.messaging_response import MessagingResponse | |
from twilio.rest import Client | |
import os | |
import requests | |
from PIL import Image | |
import shutil | |
from langchain.vectorstores.chroma import Chroma | |
from langchain.prompts import ChatPromptTemplate | |
from langchain_community.llms.ollama import Ollama | |
from get_embedding_function import get_embedding_function | |
from langchain.document_loaders import PyPDFDirectoryLoader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.schema import Document | |
import tempfile | |
# Flask application object for this service.
app = Flask(__name__)

# Directory where media downloaded from Twilio is written.
UPLOAD_FOLDER = '/code/uploads'

# Chroma persists into a brand-new temporary directory created at import
# time; it is NOT the upload folder, and it is different on every start.
CHROMA_PATH = tempfile.mkdtemp()

# exist_ok=True replaces the exists()/makedirs() pair, which could fail if
# another process created the directory between the check and the call.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
class ConversationBufferMemory:
    """Bounded first-in-first-out buffer of recent interactions.

    Entries are kept in insertion order in ``self.memory``. When an append
    makes the buffer longer than ``max_size``, the oldest entry is removed.
    """

    def __init__(self, max_size=6):
        """Create an empty buffer that keeps at most ``max_size`` entries."""
        self.max_size = max_size
        self.memory = []

    def add_to_memory(self, interaction):
        """Append ``interaction`` and evict the oldest entry on overflow."""
        history = self.memory
        history.append(interaction)
        if len(history) > self.max_size:
            # Remove from the front so the most recent entries survive.
            del history[0]

    def get_memory(self):
        """Return the internal list itself (not a copy), oldest entry first."""
        return self.memory
# Module-level history buffer; it keeps only the two latest interactions.
conversation_memory = ConversationBufferMemory(max_size=2)

# Twilio credentials are read from the environment (None when unset).
account_sid = os.getenv('TWILIO_ACCOUNT_SID')
auth_token = os.getenv('TWILIO_AUTH_TOKEN')
client = Client(account_sid, auth_token)

# Sender address in Twilio's "whatsapp:<number>" form.
from_whatsapp_number = 'whatsapp:+14155238886'

# Template filled with the retrieved context and the user's question
# before it is sent to the chat model.
PROMPT_TEMPLATE = """
Answer the question based only on the following context:
{context}
---
Answer the question based on the above context: {question}
"""

# API key for the AI71 chat-completion client (None when unset).
AI71_API_KEY = os.getenv('AI71_API_KEY')
def generate_response(query, chat_history):
    """Answer ``query`` with the Falcon chat model, given ``chat_history``.

    The completion is requested in streaming mode and every non-empty
    content delta is concatenated in arrival order. Before returning, each
    "###" marker and each occurrence of a newline followed by "User:" is
    removed from the text.

    NOTE(review): ``AI71`` is not among the imports visible in this file;
    confirm it is imported, otherwise this call raises NameError.
    """
    system_message = {
        "role": "system",
        "content": "You are the best agricultural assistant. Remember to give a response in not more than 2 sentences. Greet the user if the user greets you.",
    }
    user_message = {
        "role": "user",
        "content": f'''Answer the query based on history {chat_history}: {query}''',
    }
    stream = AI71(AI71_API_KEY).chat.completions.create(
        model="tiiuae/falcon-180b-chat",
        messages=[system_message, user_message],
        stream=True,
    )

    pieces = []
    for chunk in stream:
        piece = chunk.choices[0].delta.content
        if piece:
            pieces.append(piece)

    answer = "".join(pieces)
    return answer.replace("###", '').replace('\nUser:', '')
def convert_img(url, account_sid, auth_token):
    """Download a Twilio media file and re-encode it as an RGB JPEG.

    The raw download is saved in ``UPLOAD_FOLDER`` as
    ``downloaded_media_<last URL path segment>``; the converted image is
    always written to ``image.jpg`` in the same folder, overwriting any
    previous conversion.

    Args:
        url: Media URL to fetch.
        account_sid: Twilio account SID, used as the HTTP basic-auth user.
        auth_token: Twilio auth token, used as the HTTP basic-auth password.

    Returns:
        Path of the converted JPEG, or ``None`` when the download or the
        conversion fails (the error is printed, not raised).
    """
    # Imported here because neither name is among the module-level imports;
    # without them every call ended in a swallowed NameError.
    from urllib.parse import urlparse
    from requests.auth import HTTPBasicAuth

    try:
        # A timeout keeps a stalled download from blocking the request forever.
        response = requests.get(
            url,
            auth=HTTPBasicAuth(account_sid, auth_token),
            timeout=30,
        )
        response.raise_for_status()

        media_id = urlparse(url).path.split('/')[-1]
        media_filepath = os.path.join(UPLOAD_FOLDER, f"downloaded_media_{media_id}")
        with open(media_filepath, 'wb') as file:
            file.write(response.content)
        print(f"Media downloaded successfully and saved as {media_filepath}")

        converted_filepath = os.path.join(UPLOAD_FOLDER, "image.jpg")
        with open(media_filepath, 'rb') as img_file:
            image = Image.open(img_file)
            # JPEG has no alpha channel, so normalise to RGB before saving.
            image.convert('RGB').save(converted_filepath, 'JPEG')
        return converted_filepath
    except requests.exceptions.HTTPError as err:
        print(f"HTTP error occurred: {err}")
    except Exception as err:
        # Best-effort by design: callers receive None instead of an exception.
        print(f"An error occurred: {err}")
    return None
def get_weather(city):
    """Scrape the current temperature for ``city`` from a Google search page.

    Args:
        city: Free-text place name; surrounding whitespace is ignored.

    Returns:
        The text of the first ``div`` whose class is
        ``BNeawe iBp4i AP7Wnd`` in the returned HTML.

    Raises:
        ValueError: If the page contains no such element.
        requests.exceptions.RequestException: If the HTTP request fails or
            times out.

    NOTE(review): ``BeautifulSoup`` is not among the imports visible in this
    file; confirm it is imported, otherwise this function raises NameError.
    """
    from urllib.parse import quote_plus

    # quote_plus turns spaces into '+' and also escapes characters such as
    # '&' or '#' that would otherwise cut off or alter the query string.
    query = quote_plus(f"weather in {city.strip()}")
    r = requests.get(f'https://www.google.com/search?q={query}', timeout=30)
    soup = BeautifulSoup(r.text, 'html.parser')
    node = soup.find('div', attrs={'class': 'BNeawe iBp4i AP7Wnd'})
    if node is None:
        # find() returns None when nothing matches; fail with a clear message
        # instead of an AttributeError on ``None.text``.
        raise ValueError(f"No temperature found in search results for {city!r}")
    return node.text
def download_and_save_as_txt(url, account_sid, auth_token):
    """Download a Twilio media file and store it as ``pdf_file.pdf``.

    Despite the function name, the response bytes are written unchanged; no
    text conversion happens here. The destination is always
    ``UPLOAD_FOLDER/pdf_file.pdf``, so each call overwrites the previous
    download.

    NOTE(review): because the file name is fixed, chunk ids derived from the
    source path and page number repeat across uploads, and ``add_to_chroma``
    skips ids it already holds - confirm that is intended.

    Args:
        url: Media URL to fetch.
        account_sid: Twilio account SID, used as the HTTP basic-auth user.
        auth_token: Twilio auth token, used as the HTTP basic-auth password.

    Returns:
        Path of the saved file, or ``None`` when the download or the write
        fails (the error is printed, not raised).
    """
    # Imported here because the name is not among the module-level imports;
    # without it every call ended in a swallowed NameError.
    from requests.auth import HTTPBasicAuth

    try:
        # A timeout keeps a stalled download from blocking the request forever.
        response = requests.get(
            url,
            auth=HTTPBasicAuth(account_sid, auth_token),
            timeout=30,
        )
        response.raise_for_status()

        txt_filepath = os.path.join(UPLOAD_FOLDER, "pdf_file.pdf")
        with open(txt_filepath, 'wb') as file:
            file.write(response.content)
        print(f"Media downloaded successfully and saved as {txt_filepath}")
        return txt_filepath
    except requests.exceptions.HTTPError as err:
        print(f"HTTP error occurred: {err}")
    except Exception as err:
        # Best-effort by design: callers receive None instead of an exception.
        print(f"An error occurred: {err}")
    return None
def initialize_chroma():
    """Open the Chroma store and run one throwaway query as a smoke test.

    Any exception is caught and printed, so a failure here never propagates
    to the caller.
    """
    try:
        store = Chroma(
            persist_directory=CHROMA_PATH,
            embedding_function=get_embedding_function(),
        )
        # The result is discarded; the query only shows the store responds.
        store.similarity_search_with_score("test query", k=1)
    except Exception as exc:
        print(f"Error initializing Chroma: {exc}")
    else:
        print("Chroma initialized successfully.")


# Run the smoke test once, when the module is loaded.
initialize_chroma()
def query_rag(query_text: str):
    """Answer ``query_text`` from the five closest chunks stored in Chroma.

    When the similarity search returns nothing, a fixed apology string is
    returned. Otherwise the matching chunks are joined into a context, the
    prompt template is filled in, and the Falcon chat model is called in
    streaming mode. Each "###" marker and each newline followed by "User:"
    is removed from the concatenated reply.

    NOTE(review): ``AI71`` is not among the imports visible in this file;
    confirm it is imported, otherwise the model call raises NameError.
    """
    embedding_function = get_embedding_function()
    store = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)
    matches = store.similarity_search_with_score(query_text, k=5)
    if not matches:
        return "Sorry, I couldn't find any relevant information."

    context_text = "\n\n---\n\n".join(doc.page_content for doc, _score in matches)
    template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
    prompt = template.format(context=context_text, question=query_text)

    stream = AI71(AI71_API_KEY).chat.completions.create(
        model="tiiuae/falcon-180b-chat",
        messages=[
            {"role": "system", "content": "You are the best agricultural assistant. Remember to give a response in not more than 2 sentences."},
            {"role": "user", "content": f'''Answer the following query based on the given context: {prompt}'''},
        ],
        stream=True,
    )

    pieces = []
    for chunk in stream:
        piece = chunk.choices[0].delta.content
        if piece:
            pieces.append(piece)

    return "".join(pieces).replace("###", '').replace('\nUser:', '')
def save_pdf_and_update_database(pdf_filepath):
    """Split PDFs into chunks and hand them to ``add_to_chroma``.

    The loader is pointed at the directory that contains ``pdf_filepath``,
    not at the file itself. Documents are split into 800-character chunks
    with an 80-character overlap. Any exception is caught and printed, so
    the caller cannot tell from the return value whether processing worked.
    """
    try:
        pdf_dir = os.path.dirname(pdf_filepath)
        documents = PyPDFDirectoryLoader(pdf_dir).load()

        splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=80,
            length_function=len,
            is_separator_regex=False,
        )
        add_to_chroma(splitter.split_documents(documents))
        print("PDF processed and data updated in Chroma.")
    except Exception as exc:
        print(f"Error in processing PDF: {exc}")
def add_to_chroma(chunks: list[Document]):
    """Store the chunks whose ids are not yet present in Chroma.

    Each chunk first receives an id from ``calculate_chunk_ids``. Chunks
    whose id the store already holds are skipped; the rest are added and the
    store is persisted. Any exception is caught and printed.
    """
    try:
        store = Chroma(
            persist_directory=CHROMA_PATH,
            embedding_function=get_embedding_function(),
        )
        tagged_chunks = calculate_chunk_ids(chunks)

        # include=[] requests no payload fields; only the "ids" entry is read.
        known_ids = set(store.get(include=[])["ids"])
        fresh_chunks = [
            chunk for chunk in tagged_chunks
            if chunk.metadata["id"] not in known_ids
        ]

        if fresh_chunks:
            fresh_ids = [chunk.metadata["id"] for chunk in fresh_chunks]
            store.add_documents(fresh_chunks, ids=fresh_ids)
            store.persist()
            print("Chunks added to Chroma.")
    except Exception as exc:
        print(f"Error adding chunks to Chroma: {exc}")
def calculate_chunk_ids(chunks):
    """Assign each chunk an id of the form ``<source>:<page>:<index>``.

    ``index`` counts consecutive chunks that share the same source and page,
    starting at 0, and restarts whenever the source/page pair differs from
    the previous chunk's. The id is written to ``chunk.metadata["id"]`` in
    place, and the same ``chunks`` object is returned.
    """
    previous_page_key = None
    index_on_page = 0

    for chunk in chunks:
        meta = chunk.metadata
        page_key = f"{meta.get('source')}:{meta.get('page')}"

        # Same page as the previous chunk: continue the count; else restart.
        index_on_page = index_on_page + 1 if page_key == previous_page_key else 0
        previous_page_key = page_key

        meta["id"] = f"{page_key}:{index_on_page}"

    return chunks
def whatsapp_webhook():
    """Handle one incoming Twilio WhatsApp message and build the TwiML reply.

    Dispatch order:
      1. A message with media: images go through ``convert_img`` and
         ``handle_image``; PDFs are downloaded and indexed; any other
         content type is rejected.
      2. A text message containing "weather": the rest of the text is used
         as the city name for ``get_weather``.
      3. Anything else is answered by ``query_rag``.

    The lower-cased message and the reply are then recorded in
    ``conversation_memory``.

    Returns:
        The TwiML ``MessagingResponse`` serialised to a string.

    NOTE(review): no route decorator and no definition of ``handle_image``
    are visible in this file - confirm both exist where the app is wired up.
    """
    incoming_msg = request.values.get('Body', '').lower()
    num_media = int(request.values.get('NumMedia', 0))

    if num_media > 0:
        media_url = request.values.get('MediaUrl0')
        # A missing content type is treated as unsupported instead of
        # crashing on ``None.startswith``.
        content_type = request.values.get('MediaContentType0') or ''

        if content_type.startswith('image/'):
            # Handle image processing (disease/pest detection)
            filepath = convert_img(media_url, account_sid, auth_token)
            if filepath is None:
                # convert_img reports failure by returning None.
                response_text = "Sorry, the image could not be processed. Please try sending it again."
            else:
                response_text = handle_image(filepath)
        elif content_type == 'application/pdf':
            # Handle PDF processing
            filepath = download_and_save_as_txt(media_url, account_sid, auth_token)
            if filepath is None:
                # Do not claim success when nothing was downloaded.
                response_text = "Sorry, the PDF could not be downloaded. Please try sending it again."
            else:
                save_pdf_and_update_database(filepath)
                response_text = "PDF received and processed."
        else:
            response_text = "Unsupported media type. Please send a PDF or image file."
    elif "weather" in incoming_msg:
        city = incoming_msg.replace("weather", "").strip()
        temperature = get_weather(city)
        response_text = f"The current temperature in {city} is {temperature}"
    else:
        # Answer from the indexed documents; only the message text is passed,
        # the conversation history is not part of the query.
        response_text = query_rag(incoming_msg)

    # Add interaction to memory
    interaction = {'role': 'user', 'content': incoming_msg, 'response': response_text}
    conversation_memory.add_to_memory(interaction)

    # Send the response
    resp = MessagingResponse()
    msg = resp.message()
    msg.body(response_text)
    return str(resp)
if __name__ == "__main__":
    # Message each configured recipient before the server starts.
    # NOTE(review): send_initial_message is not defined in the visible part
    # of this file - confirm it exists.
    for recipient in ('919080522395', '916382792828'):
        send_initial_message(recipient)

    # Listen on every interface, port 7860.
    app.run(host='0.0.0.0', port=7860)