import os
from io import BytesIO
from PIL import Image
import google.generativeai as genai
import google.ai.generativelanguage as glm
from langchain.vectorstores import Chroma  # moved to langchain_community.vectorstores in newer LangChain releases
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import streamlit as st
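# DocsGPT: a Streamlit RAG chat app. Uploaded PDFs/TXTs are chunked, embedded,
# and indexed in Chroma; Gemini answers questions by calling a vector_search
# tool against that index.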
st.title("DocsGPT")
genai.configure(api_key=os.environ['GOOGLE_API_KEY'])
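# Hide the Streamlit viewer badge widgets injected by the hosted page.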
st.markdown(
"""
<style>
.css-1jc7ptx, .e1ewe7hr3, .viewerBadge_container__1QSob,
.styles_viewerBadge__1yB5_, .viewerBadge_link__1S137,
.viewerBadge_text__1JaDK {
display: none;
}
</style>
""",
unsafe_allow_html=True
)
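# Expose a single vector_search function to Gemini so the model can request
# retrieval from the uploaded documents via function calling.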
rag = glm.Tool(
function_declarations=[
glm.FunctionDeclaration(
name='vector_search',
description="Returns the content of the document user attached. Make sure that your not passing query as a question use like **keywords** instead. Use this function to search for contents in the user attached or uploaded documents to you. Try not to completly paste the user question as query, instead use keywords.",
parameters=glm.Schema(
type=glm.Type.OBJECT,
properties={
'query': glm.Schema(type=glm.Type.STRING),
},
required=['query']
)
)
]
)
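# gemini-pro (with the vector_search tool) drives the chat; gemini-pro-vision
# describes images found inside uploaded PDFs.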
gemini = genai.GenerativeModel('gemini-pro', tools=[rag])
gemini_vision = genai.GenerativeModel('gemini-pro-vision')
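# Fallback "retriever" used when building the vector store fails: it exposes
# the same get_relevant_documents interface but ignores the query and simply
# returns the raw document text.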
class rawkn:
def __init__(self, text):
self.text = text
def get_relevant_documents(self, query):
return self.text
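# Read the uploaded files (PDF or plain text), describe any embedded images
# with gemini-pro-vision, chunk the combined text by document length, and
# index it in Chroma so the chat model can search it via vector_search.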
def loader_data(files, include_getting_real):
    total_content = ''
    num_pages = 0
    if include_getting_real:
        files.append("./getting_real_basecamp.pdf")
    for file in files:
        # The bundled Getting Real PDF is a plain path string; uploaded files
        # carry a MIME type. Determine the type per file so mixed uploads work.
        file_type = "application/pdf" if isinstance(file, str) else file.type
        if file_type == "application/pdf":
pdf_reader = PdfReader(file)
content = ''
for page in pdf_reader.pages:
num_pages += 1
content += page.extract_text()
                for img in page.images:
                    try:
                        image_stream = BytesIO(img.data)
                        pil_img = Image.open(image_stream)
                        img_desc = gemini_vision.generate_content(["Generate a detailed description of the image. If it is a flow chart, recreate the flowchart exactly as it appears. If it is a table, recreate the table exactly as in the image. Write out all the text in the image if it contains any. Explain the image in detail.\nAlso give a clear heading to the image content.", pil_img]).candidates[0].content.parts[0].text
                        print("***************************")
                        print(img_desc)
                        print("***************************")
                        content += "Image content:\n" + img_desc
                    except Exception as e:
                        print(f"cannot extract image: {e}")
        elif file_type == "text/plain":
            content = file.read().decode("utf-8")
total_content += content
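    # Scale chunk size with the number of pages so short documents are split
    # into finer-grained chunks for retrieval.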
if num_pages <= 2:
chunk_size = 500
elif num_pages <= 3:
chunk_size = 1000
elif num_pages <= 5:
chunk_size = 2000
elif num_pages <= 10:
chunk_size = 3000
else:
chunk_size = 4000
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=0)
texts = text_splitter.split_text(total_content)
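    # Embed the chunks with Google's embedding-001 model and index them in an
    # in-memory Chroma store; the retriever backs the vector_search tool.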
try:
        embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
vector_store = Chroma.from_texts(texts, embeddings).as_retriever()
st.session_state.knowledge = vector_store
        st.session_state.chat.history.append(glm.Content(
            parts=[glm.Part(
                text=f"Now I've uploaded some files.\nHere is the list of documents you have access to:\n{[i if isinstance(i, str) else i.name for i in files]}"
            )],
            role="user"
        ))
        st.session_state.chat.history.append(glm.Content(
            parts=[glm.Part(
                text="Sure! Ask me anything about the documents you have uploaded. I can help you with that."
            )],
            role="model"
        ))
    except Exception as e:
        # If embedding or indexing fails, fall back to handing the model the
        # raw document text through the stand-in retriever.
        print(f"vector store creation failed, using raw text: {e}")
        st.session_state.knowledge = rawkn(total_content)
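# Per-session state: the visible transcript, the retriever ("knowledge"), and
# the Gemini chat seeded with a priming exchange that names the assistant and
# tells it to use vector_search.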
if "history" not in st.session_state:
st.session_state.history = []
if "knowledge" not in st.session_state:
st.session_state.knowledge = None
if "chat" not in st.session_state:
st.session_state.chat = gemini.start_chat(history=[glm.Content(
parts=[glm.Part(
text="Your name is DocsGPT. You are very helpful and can assist with documents uploaded by the user. Use the vector_search tool/function to search for contents in the user attached or uploaded documents to you.\nYou have access to all documents uploaded by the user."
)],
role="user"
),
glm.Content(
parts=[glm.Part(
text="Sure, i can do that for you."
)],
role="model"
)])
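# Streamlit reruns the whole script on every interaction, so replay the
# stored transcript to keep the conversation visible.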
for history in st.session_state.history:
with st.chat_message(history["role"]):
st.markdown(history["text"])
with st.sidebar:
st.title("Knowledge")
st.markdown("""### Tips to use DocsGPT:
- Upload your documents [pdf, txt] to DocsGPT and make sure to click on the process button.
- wait for a second and then start chatting with DocsGPT.
- While asking questions to DocsGPT about your uploaded files, please refer your uploaded files as *Document*, *Docs*, *attached or uploaded docs*, so the model can easily understands what you are referring to.""")
files = st.file_uploader("Upload a file", accept_multiple_files=True, type=["pdf", "txt"])
include_getting_real = st.checkbox("Include getting-real?")
process = st.button("Process")
if process and files:
    with st.spinner('Loading your files. This may take a while...'):
        loader_data(files, include_getting_real)
elif process and include_getting_real:
    with st.spinner('Loading your files. This may take a while...'):
        loader_data([], include_getting_real)
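# Chat loop: send the user message, and if Gemini responds with a function
# call instead of text, run the retrieval and feed the results back as a
# FunctionResponse to get the final answer.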
if prompt := st.chat_input("Enter your message..."):
st.session_state.history.append({"role": "user", "text": prompt})
with st.chat_message("user"):
st.markdown(prompt)
with st.chat_message("assistant"):
message_placeholder = st.empty()
response = st.session_state.chat.send_message(prompt)
        # An empty text part means the model issued a function call instead of a
        # direct answer; extract the query argument and run the retrieval.
        if response.candidates[0].content.parts[0].text == '':
            args = response.candidates[0].content.parts[0].function_call.args['query']
if st.session_state.knowledge is not None:
print("searching for ", args)
related_docs = str(st.session_state.knowledge.get_relevant_documents(args))
print(related_docs)
else:
related_docs = 'No knowledge documents loaded'
            response = st.session_state.chat.send_message(
                glm.Content(
                    parts=[glm.Part(
                        function_response=glm.FunctionResponse(
                            name='vector_search',
                            response={'rag': related_docs},
                        )
                    )]
                )
            ).candidates[0].content.parts[0].text
else:
response = response.candidates[0].content.parts[0].text
print(st.session_state.chat.history)
message_placeholder.markdown(response)
st.session_state.history.append({"role": "assistant", "text": response})