import streamlit as st
from groq import Groq
from PyPDF2 import PdfReader
from docx import Document
from tiktoken import get_encoding
import concurrent.futures
import matplotlib.pyplot as plt
import io
import base64
import os

# Read the Groq API key from the environment instead of hard-coding a secret in source.
client = Groq(api_key=os.environ.get("GROQ_API_KEY"))

def extract_text_from_pdf(file):
    reader = PdfReader(file)
    text = ""
    for page in reader.pages:
        # extract_text() can return None for pages without extractable text.
        text += page.extract_text() or ""
    return text

def extract_text_from_docx(file):
    doc = Document(file)
    text = "\n".join(paragraph.text for paragraph in doc.paragraphs)
    return text

def preprocess_text(text):
    return " ".join(text.replace("\n", " ").replace("\r", " ").split())

def get_default_encoding():
    return get_encoding("cl100k_base")

def split_into_chunks(text, token_limit=5500):
    encoding = get_default_encoding()
    words = text.split()
    chunks = []
    current_chunk = []
    current_tokens = 0

    for word in words:
        word_tokens = len(encoding.encode(word + " "))
        if current_tokens + word_tokens > token_limit:
            chunks.append(" ".join(current_chunk))
            current_chunk = [word]
            current_tokens = word_tokens
        else:
            current_chunk.append(word)
            current_tokens += word_tokens

    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks

def summarize_text(text):
    try:
        response = client.chat.completions.create(
            messages=[{
                "role": "user",
                "content": f"Summarize the following legal document in a concise manner: {text}"
            }],
            model="llama-3.1-8b-instant",
            stream=False
        )
        if response and response.choices:
            return response.choices[0].message.content
        else:
            return "Error: Received an empty or invalid response from Groq API."
    except Exception as e:
        return f"Error generating summary: {e}"

def summarize_large_text(text, chunk_limit=5000):
    chunks = split_into_chunks(text, token_limit=chunk_limit)
    summaries = []
    for chunk in chunks:
        summaries.append(summarize_text(chunk))
    return " ".join(summaries)
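
# `concurrent.futures` is imported above but never used. If chunk summarization becomes a
# bottleneck, the sequential loop in summarize_large_text() could run the Groq calls in a
# thread pool, since each call is network-bound. The helper below is only a sketch of that
# idea: the function name and max_workers value are illustrative, not part of the original app.
def summarize_large_text_parallel(text, chunk_limit=5000, max_workers=4):
    chunks = split_into_chunks(text, token_limit=chunk_limit)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map preserves input order, so the joined summary stays in document order.
        summaries = list(executor.map(summarize_text, chunks))
    return " ".join(summaries)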

def detect_key_clauses(text):
    key_clauses = [
        {"clause": "confidentiality", "summary": "Confidentiality clauses ensure that sensitive information remains protected."},
        {"clause": "liability", "summary": "Liability clauses outline the responsibility for damages or losses incurred."},
        {"clause": "termination", "summary": "Termination clauses specify the conditions under which a contract may be ended."},
        {"clause": "force majeure", "summary": "Force majeure clauses excuse parties from performance obligations due to unforeseen events."},
        {"clause": "governing law", "summary": "Governing law clauses specify which jurisdiction's laws will govern the contract."},
        {"clause": "dispute resolution", "summary": "Dispute resolution clauses specify how conflicts between parties will be resolved."},
        {"clause": "amendment", "summary": "Amendment clauses outline the process for changing the terms of the contract."},
        {"clause": "warranty", "summary": "Warranty clauses provide assurances regarding the quality or condition of goods or services."},
    ]

    detected_clauses = []
    for clause in key_clauses:
        if clause["clause"].lower() in text.lower():
            clause_start = text.lower().find(clause["clause"].lower())
            # Clamp the lower bound so an early match does not produce a negative slice index.
            context = text[max(0, clause_start - 50): clause_start + 200]
            explanation = f"The document mentions a '{clause['clause']}' clause. Context: {context.strip()}..."
            detected_clauses.append({
                "clause": clause["clause"].capitalize(),
                "summary": clause["summary"],
                "explanation": explanation
            })

    return detected_clauses

def detect_hidden_obligations_or_dependencies(text, summary):
    hidden_obligations = [
        {"phrase": "dependent upon", "summary": "This suggests that some action is conditional upon another."},
        {"phrase": "if", "summary": "This indicates that certain conditions must be met to fulfill the obligation."},
        {"phrase": "may be required", "summary": "Implies that the party could be obligated to perform an action under specific conditions."},
        {"phrase": "should", "summary": "Implies a recommendation or requirement, though not explicitly mandatory."},
        {"phrase": "obligated to", "summary": "Indicates a clear, binding duty to perform an action."},
    ]

    hidden_dependencies = []

    for item in hidden_obligations:
        if item["phrase"].lower() in text.lower() or item["phrase"].lower() in summary.lower():
            # Pull context from whichever source actually contains the phrase;
            # find() would return -1 if the phrase only appears in the summary.
            source = text if item["phrase"].lower() in text.lower() else summary
            phrase_start = source.lower().find(item["phrase"].lower())
            context = source[max(0, phrase_start - 50): phrase_start + 200]
            hidden_dependencies.append({
                "phrase": item["phrase"],
                "summary": item["summary"],
                "context": context.strip()
            })

    return hidden_dependencies

def detect_risks(text, summary):
    risk_phrases = [
        {"phrase": "penalty", "summary": "Penalty clauses may impose financial or legal consequences on the parties involved."},
        {"phrase": "liability", "summary": "Liability clauses may indicate potential financial responsibility or legal risks."},
        {"phrase": "default", "summary": "Default clauses can expose parties to consequences for failure to perform obligations."},
        {"phrase": "breach", "summary": "Breach of contract can lead to serious legal consequences including financial penalties."},
        {"phrase": "suspension", "summary": "Suspension clauses may indicate risks of halting services or operations in case of non-compliance."},
    ]

    detected_risks = []

    for item in risk_phrases:
        if item["phrase"].lower() in text.lower() or item["phrase"].lower() in summary.lower():
            # As above: search whichever source contains the phrase and clamp the slice bounds.
            source = text if item["phrase"].lower() in text.lower() else summary
            phrase_start = source.lower().find(item["phrase"].lower())
            context = source[max(0, phrase_start - 50): phrase_start + 200]
            detected_risks.append({
                "phrase": item["phrase"],
                "summary": item["summary"],
                "context": context.strip()
            })

    return detected_risks

def plot_risk_pie_chart(detected_clauses, hidden_obligations, detected_risks):
    num_clauses = len(detected_clauses)
    num_obligations = len(hidden_obligations)
    num_risks = len(detected_risks)

    labels = ['Detected Key Clauses', 'Hidden Obligations or Dependencies', 'Detected Risks']
    sizes = [num_clauses, num_obligations, num_risks]
    colors = ['#ff9999', '#66b3ff', '#99ff99']

    # A pie chart cannot be drawn when every count is zero.
    if sum(sizes) == 0:
        return None

    fig, ax = plt.subplots()
    ax.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90, colors=colors, wedgeprops={'edgecolor': 'black'})
    ax.axis('equal')

    buf = io.BytesIO()
    fig.savefig(buf, format="png")
    plt.close(fig)  # Release the figure so repeated Streamlit reruns do not leak memory.
    buf.seek(0)

    img_str = base64.b64encode(buf.read()).decode('utf-8')
    buf.close()

    return img_str

def generate_analysis_document(document_text, summary, detected_clauses, hidden_obligations, detected_risks):
    doc = Document()
    doc.add_heading('Legal Document Analysis', level=1)

    doc.add_heading('Extracted Document Text', level=2)
    doc.add_paragraph(document_text)

    doc.add_heading('Summary', level=2)
    doc.add_paragraph(summary)

    doc.add_heading('Key Clauses', level=2)
    if detected_clauses:
        for clause in detected_clauses:
            doc.add_paragraph(f"Clause: {clause['clause']}")
            doc.add_paragraph(f"Summary: {clause['summary']}")
            doc.add_paragraph(f"Explanation: {clause['explanation']}")
    else:
        doc.add_paragraph("No key clauses detected.")

    doc.add_heading('Hidden Obligations or Dependencies', level=2)
    if hidden_obligations:
        for obligation in hidden_obligations:
            doc.add_paragraph(f"Phrase: {obligation['phrase']}")
            doc.add_paragraph(f"Summary: {obligation['summary']}")
            doc.add_paragraph(f"Context: {obligation['context']}")
    else:
        doc.add_paragraph("No hidden obligations detected.")

    doc.add_heading('Risks', level=2)
    if detected_risks:
        for risk in detected_risks:
            doc.add_paragraph(f"Risk Phrase: {risk['phrase']}")
            doc.add_paragraph(f"Summary: {risk['summary']}")
            doc.add_paragraph(f"Context: {risk['context']}")
    else:
        doc.add_paragraph("No risks detected.")

    return doc

def display_legal_analysis_page():
    st.title("Legal Document Analysis with Groq API")

    uploaded_file = st.file_uploader("Upload your legal document (PDF or DOCX)", type=["pdf", "docx"])
    if uploaded_file:
        if uploaded_file.name.endswith(".pdf"):
            document_text = preprocess_text(extract_text_from_pdf(uploaded_file))
        elif uploaded_file.name.endswith(".docx"):
            document_text = preprocess_text(extract_text_from_docx(uploaded_file))
        else:
            st.error("Unsupported file type!")
            return

        tabs = st.tabs(["Document Text", "Summary", "Key Clauses", "Hidden Obligations or Dependencies", "Risk Analysis"])

        with tabs[0]:
            st.subheader("Extracted Legal Document Text")
            st.text_area("Document Text", document_text, height=300)

        with tabs[1]:
            st.subheader("Quick Summary")
            summary = summarize_large_text(document_text)
            if "Error" in summary:
                st.warning("Summary generation failed.")
                summary = "Summary not available."
            st.write(summary)

        with tabs[2]:
            st.subheader("Detected Key Clauses")

            detected_clauses = detect_key_clauses(document_text)
            if not detected_clauses:
                st.write("No key clauses detected.")
            else:
                clause_counts = {}
                for clause in detected_clauses:
                    clause_counts[clause['clause']] = clause_counts.get(clause['clause'], 0) + 1

                if clause_counts:
                    labels = list(clause_counts.keys())
                    values = list(clause_counts.values())

                    fig, ax = plt.subplots()
                    ax.bar(labels, values, color='skyblue')
                    plt.xticks(rotation=45, ha='right')
                    ax.set_title("Detected Key Clauses Visualization")
                    ax.set_xlabel("Clause")
                    ax.set_ylabel("Count")
                    st.pyplot(fig)

                for clause in detected_clauses:
                    if st.button(f"Show Explanation for {clause['clause']} Clause"):
                        st.write(f"**Clause: {clause['clause']}**")
                        st.write(f"Summary: {clause['summary']}\nExplanation: {clause['explanation']}")

        with tabs[3]:
            st.subheader("Detected Hidden Obligations or Dependencies")
            hidden_obligations = detect_hidden_obligations_or_dependencies(document_text, summary)
            if not hidden_obligations:
                st.write("No hidden obligations or dependencies detected.")
            else:
                for item in hidden_obligations:
                    st.write(f"**Phrase: {item['phrase']}**")
                    st.write(f"Summary: {item['summary']}\nContext: {item['context']}")

        with tabs[4]:
            st.subheader("Risk Analysis & Visualization")

            detected_clauses = detect_key_clauses(document_text)
            hidden_obligations = detect_hidden_obligations_or_dependencies(document_text, summary)
            detected_risks = detect_risks(document_text, summary)

            img_str = plot_risk_pie_chart(detected_clauses, hidden_obligations, detected_risks)
            if img_str:
                st.image(f"data:image/png;base64,{img_str}", use_column_width=True)
            else:
                st.write("Nothing to visualize: no clauses, obligations, or risks were detected.")

            st.write("### Detected Risks:")
            if detected_risks:
                for risk in detected_risks:
                    st.write(f"**{risk['phrase']}**: {risk['summary']}")

            st.write("### Detected Key Clauses:")
            for clause in detected_clauses:
                st.write(f"**{clause['clause']}**: {clause['explanation']}")

            st.write("### Hidden Obligations or Dependencies:")
            for obligation in hidden_obligations:
                st.write(f"**{obligation['phrase']}**: {obligation['summary']}")

            analysis_doc = generate_analysis_document(document_text, summary, detected_clauses, hidden_obligations, detected_risks)

            with st.expander("Download Analysis"):
                # Save the report into an in-memory buffer instead of writing a file to the server's disk.
                report_buffer = io.BytesIO()
                analysis_doc.save(report_buffer)
                report_buffer.seek(0)
                st.download_button(
                    "Download Analysis",
                    data=report_buffer,
                    file_name="analysis_report.docx",
                    mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document"
                )

if __name__ == "__main__":
    display_legal_analysis_page()
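
# To try the app locally (assuming this file is saved as, e.g., legal_analysis_app.py;
# the actual filename may differ), export the API key and launch Streamlit:
#   export GROQ_API_KEY="..."
#   streamlit run legal_analysis_app.py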