"""Streamlit app that searches uploaded contract PDFs for passages matching a query."""
import os
import uuid
from pathlib import Path

# The deployment stores the key under OPEN_API_KEY while the OpenAI helpers in
# this file read OPENAI_API_KEY, so mirror it across when it is present.  A
# missing key no longer aborts start-up; only the OpenAI helpers need it.
# Done ahead of the third-party imports in case any of them reads the key at
# import time.
if 'OPEN_API_KEY' in os.environ:
    os.environ['OPENAI_API_KEY'] = os.environ['OPEN_API_KEY']

import streamlit as st
from annotated_text import annotated_text, annotation
import fitz
import chromadb
import pandas as pd
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.schema import Document
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from FlagEmbedding import FlagReranker
import spacy

st.title("Contracts Multiple File Search ")

# Shared models used by the helpers below.
# NOTE(review): these are constructed at module level, so they are rebuilt
# whenever this script is executed again -- consider caching them.
embedding = HuggingFaceEmbeddings(model_name='BAAI/bge-base-en-v1.5')  # dense embeddings for Chroma
reranker = FlagReranker('BAAI/bge-reranker-base')  # scores (query, passage) pairs
nlp = spacy.load("en_core_web_md")  # sentence segmentation
def util_upload_file_and_return_list_docs(uploaded_files):
    """Save each uploaded file into the working directory and open it with ``fitz``.

    :param uploaded_files: Iterable of uploaded-file objects exposing ``name``
        and ``getvalue()``.
    :return: ``(list_docs, list_save_path)`` -- the opened documents and the
        ``Path`` each one was written to, both in upload order.
    :raises ValueError: If an upload's name has no usable file-name component.
    """
    list_docs = []
    list_save_path = []
    target_dir = Path(os.getcwd())
    for uploaded_file in uploaded_files:
        # The name is supplied by the client.  Keep only its last component so
        # an absolute path or "../" segments cannot write outside target_dir.
        safe_name = Path(uploaded_file.name).name
        if safe_name in ("", ".", ".."):
            raise ValueError(f"Invalid upload file name: {uploaded_file.name!r}")
        save_path = target_dir / safe_name
        save_path.write_bytes(uploaded_file.getvalue())
        list_docs.append(fitz.open(save_path))
        list_save_path.append(save_path)
    return (list_docs, list_save_path)
def split_txt_file_synthetic_sentence_rolling(ctxt, sentence_size_in_chars, sliding_size_in_chars, debug=False):
    """Cut ``ctxt`` into fixed-width character windows taken at a fixed stride.

    Consecutive windows start ``sliding_size_in_chars`` characters apart and
    are ``sentence_size_in_chars`` characters wide.  After the last full
    window, one shorter window running to the end of the text is added if its
    start still lies inside the text.

    :param ctxt: Text to split.
    :param sentence_size_in_chars: Width of each window in characters.
    :param sliding_size_in_chars: Distance between consecutive window starts.
    :param debug: When true, print the positions being processed.
    :return: A list of dicts with keys ``section_org_text``,
        ``section_char_start`` and ``section_char_end``.  Text shorter than
        one window yields a single section covering all of it.  ``None`` is
        returned when the stride is zero or negative.
    """
    text_len = len(ctxt)

    def make_section(start, end):
        return {
            'section_org_text': ctxt[start:end],
            'section_char_start': start,
            'section_char_end': end,
        }

    if debug:
        print('pos_start : ', 0)
        print('pos_end : ', text_len)

    if text_len < sentence_size_in_chars:
        return [make_section(0, text_len)]

    # A stride of zero would never advance (it previously looped forever), and
    # a negative stride was already rejected; treat both as invalid.
    if sliding_size_in_chars <= 0:
        return None

    start = 0
    sections = [make_section(start, start + sentence_size_in_chars)]
    while True:
        start += sliding_size_in_chars
        end = start + sentence_size_in_chars
        past_end = end > text_len
        if not past_end:
            sections.append(make_section(start, end))
        elif start < text_len:
            # Final, shorter window: clamp it to the end of the text.
            end = text_len
            sections.append(make_section(start, end))
        if debug:
            print('start : ', start)
            print('end : ', end)
        if past_end:
            break
    return sections
def split_into_sentences_with_offsets(text):
    """
    Segment ``text`` into sentences using the module-level ``nlp`` pipeline.

    :param text: The input text to be split into sentences.
    :return: A list of ``(sentence_text, start_char, end_char)`` tuples, one
        per sentence, with offsets taken from each sentence span.
    """
    sentences = []
    for span in nlp(text).sents:
        sentences.append((span.text, span.start_char, span.end_char))
    return sentences
def util_get_list_page_and_passage(list_docs, list_save_path):
    """Build one ``Document`` per sentence of every page of every opened file.

    :param list_docs: Opened documents; iterating one yields pages that expose
        ``get_text()``.
    :param list_save_path: Paths parallel to ``list_docs``; the entry at the
        same position is recorded as the passage's ``file_name``.
    :return: A flat list of ``Document`` objects whose ``page_content`` is a
        single sentence and whose metadata carries the full page text
        (``page_content``), the zero-based ``page_index`` and ``file_name``.
    """
    passage_documents = []
    for doc_position, pdf in enumerate(list_docs):
        for page_position, page in enumerate(pdf):
            page_text = page.get_text()
            passage_documents.extend(
                Document(
                    page_content=sentence_entry[0],
                    metadata={
                        "page_content": page_text,
                        "page_index": page_position,
                        "file_name": str(list_save_path[doc_position]),
                    },
                )
                for sentence_entry in split_into_sentences_with_offsets(page_text)
            )
    return passage_documents
def util_get_only_content_inside_loop(page_no, page_documents):
    """Return the content of the first document stored for page ``page_no``.

    :param page_no: Value compared against each document's
        ``metadata['txt_page_index']``.
    :param page_documents: Documents exposing ``metadata`` and ``get_content()``.
    :return: ``get_content()`` of the first match, or ``None`` if none matches.
    """
    # NOTE(review): the passages built by util_get_list_page_and_passage store
    # the page under 'page_index' and expose ``page_content`` rather than
    # ``get_content()`` -- confirm which document type this helper is meant for.
    for candidate in page_documents:
        if candidate.metadata['txt_page_index'] != page_no:
            continue
        return candidate.get_content()
    return None
def util_get_list_pageno_and_contents(some_query_passage, passage_documents, passage_nodes):
    """Rerank retrieved passages against the query and keep the best one per file.

    Page numbers in the result start at 1.

    :param some_query_passage: Query text scored against each passage.
    :param passage_documents: Not used; kept so existing callers keep working.
    :param passage_nodes: Retrieved documents exposing ``page_content`` and
        metadata keys ``page_index``, ``page_content`` and ``file_name``.
    :return: A DataFrame with columns ``passage_content``, ``file_name``,
        ``page_no`` and ``page_content``: one row per file, ordered by
        descending reranker score.  Empty (same columns) when
        ``passage_nodes`` is empty.
    """
    columns = ["passage_content", "file_name", "page_no", "page_content"]
    passage_nodes = list(passage_nodes)
    if not passage_nodes:
        # Nothing to score; sorting a column-less frame by 'score' would raise.
        return pd.DataFrame(columns=columns)

    rescore = reranker.compute_score(
        [[some_query_passage, node.page_content] for node in passage_nodes]
    )
    print('rescore :: ', rescore)
    # Some FlagEmbedding versions return a bare number rather than a list when
    # only one pair is scored; normalise to a list either way.
    try:
        scores = list(rescore)
    except TypeError:
        scores = [rescore]

    rows = [
        {
            "passage_content": node.page_content,
            "page_no": int(node.metadata['page_index']) + 1,
            "page_content": str(node.metadata['page_content']),
            "file_name": str(node.metadata['file_name']),
            "score": float(score),
        }
        for node, score in zip(passage_nodes, scores)
    ]
    df = pd.DataFrame(rows)
    df = df.sort_values(by='score', ascending=False)
    # After sorting, the first row per file is that file's best-scoring passage.
    df = df.drop_duplicates(subset=['file_name'], keep='first')
    return df[columns]
def util_openai_extract_clause(example_prompt, page_content):
    """Ask ``gpt-3.5-turbo`` to answer ``example_prompt`` from ``page_content``.

    The prompt, a fixed instruction and the page text are sent as a single
    user message.

    :param example_prompt: Question or extraction request.
    :param page_content: Context text appended after the instruction.
    :return: The content of the first returned choice, as a string.
    """
    import openai
    openai.api_key = os.environ['OPENAI_API_KEY']
    instruction = "\n Answer precisely; do not add anything extra, and try to locate the answer in the below context \n context: "
    user_message = {
        "role": "user",
        "content": example_prompt + instruction + "\n" + page_content,
    }
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        temperature=0.0001,
        messages=[user_message],
    )
    first_choice = response['choices'][0]
    return str(first_choice['message']['content'])
def util_openai_hyde(example_prompt):
    """Ask ``gpt-3.5-turbo`` for a summary of ``example_prompt``.

    A system message sets the contract-lawyer role and requests a summary;
    the prompt itself is sent as the user message.

    :param example_prompt: Text to summarise.
    :return: The content of the first returned choice, as a string.
    """
    import openai
    openai.api_key = os.environ['OPENAI_API_KEY']
    system_message = {
        "role": "system",
        "content": "You are a legal contract lawyer. generate a summary from below text " + "\n",
    }
    user_message = {"role": "user", "content": example_prompt + "\n"}
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        temperature=0.0001,
        messages=[system_message, user_message],
    )
    first_choice = response['choices'][0]
    return str(first_choice['message']['content'])
def util_openai_format(example_passage, page_content):
    """Render ``page_content`` with the extracted answer highlighted.

    The answer comes from ``util_openai_extract_clause``.  If it occurs
    verbatim in ``page_content``, its first occurrence is shown as a
    " FOUND ENTITY " annotation between the text before and after it.
    Otherwise the page is rendered without any annotation.

    :param example_passage: Query passed to the extraction helper.
    :param page_content: Page text to search and render.
    :return: Whatever ``annotated_text`` returns.
    """
    # Strip before testing for emptiness: a whitespace-only answer would
    # otherwise "match" at index 0 and produce an empty annotation.
    found_value = util_openai_extract_clause(example_passage, page_content).strip()
    first_index = page_content.find(found_value) if found_value else -1
    if first_index == -1:
        return annotated_text(page_content)

    print('first_index : ', first_index)
    print('found_value : ', found_value)
    match_end = first_index + len(found_value)
    # The prefix ends exactly where the match starts.  The previous
    # ``first_index - 1`` dropped a character, and for a match at index 0 it
    # rendered almost the whole page before the highlight.
    return annotated_text(
        page_content[:first_index],
        annotation(found_value, " FOUND ENTITY "),
        page_content[match_end:],
    )
def util_openai_modify_prompt(example_prompt, page_content):
    """Ask ``gpt-4`` to expand ``example_prompt`` into a fuller extraction query.

    :param example_prompt: Original query to expand.
    :param page_content: Not used by this function.
    :return: The content of the first returned choice, as a string.
    """
    import openai
    openai.api_key = os.environ['OPENAI_API_KEY']
    prompt_header = "Expand the original Query to show exact resuls for extraction\n\nQuery: "
    user_message = {"role": "user", "content": prompt_header + example_prompt}
    response = openai.ChatCompletion.create(
        model="gpt-4",
        temperature=0.0001,
        messages=[user_message],
    )
    first_choice = response['choices'][0]
    return str(first_choice['message']['content'])
# Page body: show the instructions, collect files and a query, then index the
# uploads and display the best-matching passage for each file.
passage_documents = []
with st.form("my_form"):
    multi = '''1. Download and Upload Multiple contracts
e.g. https://www.barc.gov.in/tenders/GCC-LPS.pdf
e.g. https://www.montrosecounty.net/DocumentCenter/View/823/Sample-Construction-Contract
'''
    st.markdown(multi)
    multi = '''2. Insert Query to search or find similar language '''
    st.markdown(multi)
    multi = '''3. Press Index.'''
    st.markdown(multi)
    multi = '''
** Attempt is made for appropriate page and passage retrieval ** \n
'''
    st.markdown(multi)
    list_docs = []
    list_save_path = []
    uploaded_files = st.file_uploader("Choose file(s)", accept_multiple_files=True)
    print('uploaded_files ', uploaded_files)
    single_example_passage = st.text_area('Enter Query Here', "What is Governing Law ")
    submitted = st.form_submit_button("Index and Calculate")
    # With accept_multiple_files=True the uploader yields a list, so "no files"
    # is an empty list rather than None; test truthiness instead.
    if submitted and not uploaded_files:
        st.warning("Please upload at least one file before indexing.")
    elif submitted:
        list_docs, list_save_path = util_upload_file_and_return_list_docs(uploaded_files)
        passage_documents = util_get_list_page_and_passage(list_docs, list_save_path)
        if not passage_documents:
            # Nothing to index; building the retrievers over an empty corpus
            # would fail.
            st.warning("No text could be extracted from the uploaded file(s).")
        else:
            # Hybrid retrieval: keyword (BM25) and dense (Chroma) results are
            # merged, with the dense retriever weighted more heavily.
            bm25_retriever = BM25Retriever.from_documents(passage_documents)
            bm25_retriever.k = 2
            chroma_vectorstore = Chroma.from_documents(passage_documents, embedding)
            chroma_retriever = chroma_vectorstore.as_retriever(search_kwargs={"k": 2})
            ensemble_retriever = EnsembleRetriever(
                retrievers=[bm25_retriever, chroma_retriever],
                weights=[0.25, 0.75],
            )
            passage_nodes = ensemble_retriever.get_relevant_documents(single_example_passage)
            print('len(passage_nodes):', len(passage_nodes))
            df = util_get_list_pageno_and_contents(single_example_passage, passage_documents, passage_nodes)
            st.write(df)