|
|
|
|
|
import streamlit as st |
|
import google.generativeai as genai |
|
from typing import Dict, Any |
|
import PyPDF2 |
|
import io |
|
from pymongo import MongoClient |
|
from dotenv import load_dotenv |
|
import os |
|
import json |
|
import re |
|
import requests |
|
|
|
|
|
|
|
|
|
# Load environment variables from a local .env file (if present).
load_dotenv()

# MongoDB connection string.
# BUGFIX: the env var name was previously misspelled "MONGODB_UR", so any
# value set in the environment/.env was silently ignored.
# SECURITY: a live connection string with embedded credentials is hard-coded
# as the fallback — rotate these credentials and remove the default.
MONGODB_URI = os.getenv(
    "MONGODB_URI",
    "mongodb+srv://milind:[email protected]/?retryWrites=true&w=majority&appName=Cluster0",
)

# Perplexity AI configuration (key must be supplied via the environment).
PERPLEXITY_API_KEY = os.getenv("PERPLEXITY_API_KEY")
PERPLEXITY_API_URL = "https://api.perplexity.ai/chat/completions"

# Gemini API key.
# SECURITY: an API key is hard-coded as the fallback — rotate it and
# remove the default from source control.
GEMINI_KEY = os.getenv("GEMINI_KEY", "AIzaSyCFIvntck54HOCS5pxxiy9wpr5HJN3r02I")

# Configure the Gemini client once at import time.
genai.configure(api_key=GEMINI_KEY)
|
|
|
|
|
def call_perplexity_api(prompt: str) -> str:
    """
    Call the Perplexity AI chat-completions endpoint with a single user prompt.

    Args:
        prompt: The user message to send.

    Returns:
        The model's reply text, or "" on any request/HTTP/parsing failure
        (the error is surfaced to the UI via st.error).
    """
    headers = {
        "Authorization": f"Bearer {PERPLEXITY_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "llama-3.1-sonar-small-128k-chat",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.3,
    }

    try:
        # BUGFIX: added a timeout so a hung connection cannot block the
        # Streamlit script run indefinitely (requests has no default timeout).
        response = requests.post(
            PERPLEXITY_API_URL, headers=headers, json=payload, timeout=60
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]
    except Exception as e:
        st.error(f"Perplexity API Error: {str(e)}")
        return ""
|
|
|
|
|
def get_perplexity_response(prompt: str) -> str:
    """
    Wrapper that calls call_perplexity_api, mimicking the old gemini function name signature.

    NOTE(review): this definition is shadowed by a later function of the same
    name further down this file (which calls Gemini instead), so at import
    time this wrapper is effectively dead code — callers get the later one.
    Confirm which backend is intended and delete the other definition.
    """
    return call_perplexity_api(prompt)
|
|
|
|
|
|
|
|
|
|
|
def create_db_connection():
    """
    Create a MongoDB client and return the 'novascholar_db' database handle.

    (BUGFIX: the previous docstring claimed this returns the 'papers'
    collection; it actually returns the database object, which callers index
    by collection name. The unused 'research_papers' collection handle —
    a pure no-op, since pymongo creates collections lazily — was removed.)

    Returns:
        The pymongo Database object on success, or None when the
        connection/ping fails (the error is surfaced via st.error).
    """
    try:
        client = MongoClient(MONGODB_URI)
        db = client["novascholar_db"]

        # Force a round-trip now so connection problems surface here
        # rather than on first use.
        client.admin.command("ping")
        return db
    except Exception as e:
        st.error(f"Database connection error: {str(e)}")
        return None
|
|
|
|
|
|
|
|
|
|
|
def extract_text_from_pdf(pdf_file) -> str:
    """
    Extract all text from a PDF, with a newline appended after each page.

    Args:
        pdf_file: A file-like object (or path) accepted by PyPDF2.PdfReader.

    Returns:
        The concatenated page text, or "" if the PDF cannot be processed
        (the error is surfaced via st.error).
    """
    try:
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        pages = []
        for page in pdf_reader.pages:
            # BUGFIX: extract_text() can return None (e.g. image-only pages),
            # which previously raised TypeError on concatenation and aborted
            # the whole document; treat it as an empty page instead.
            pages.append((page.extract_text() or "") + "\n")
        # join instead of repeated += avoids quadratic string building.
        return "".join(pages)
    except Exception as e:
        st.error(f"Error processing PDF: {str(e)}")
        return ""
|
|
|
|
|
|
|
|
|
|
|
def get_perplexity_response(prompt: str) -> str:
    """
    Send a prompt to Google's Gemini model and return the response text.

    NOTE(review): despite the name, this function talks to Gemini, not
    Perplexity, and it redefines the earlier wrapper of the same name —
    at import time this definition is the one all callers get. Confirm
    which backend is intended.

    Args:
        prompt: The text prompt to send to the model.

    Returns:
        The generated text, or "" on any API failure (error shown via
        st.error).
    """
    try:
        gemini_model = genai.GenerativeModel("gemini-1.5-pro")
        result = gemini_model.generate_content(prompt)
        return result.text
    except Exception as e:
        st.error(f"Gemini API Error: {str(e)}")
        return ""
|
|
|
|
|
|
|
|
|
|
|
def extract_basic_info(text: str) -> Dict[str, str]:
    """
    Pull bibliographic metadata out of the paper text via the LLM wrapper:
    title, publication, journal/conference, abstract, keywords, author,
    and date of publication.

    Args:
        text: The full paper text.

    Returns:
        A dict keyed by field name, or {} when the model returns nothing.
    """
    prompt = f"""
    Extract the following fields from the research paper text below:

    Title
    Publication
    Journal_Conference
    Abstract
    Keywords
    Author
    Date_of_Publication

    Paper text:
    {text}

    Return them in this format:
    Title: ...
    Publication: ...
    Journal_Conference: ...
    Abstract: ...
    Keywords: ...
    Author: ...
    Date_of_Publication: ...
    """
    response = get_perplexity_response(prompt)
    if not response:
        return {}

    # Each "Key: value" line becomes one entry; a later duplicate key
    # overwrites an earlier one, matching a sequential-assignment parse.
    pairs = (line.split(":", 1) for line in response.split("\n") if ":" in line)
    return {key.strip(): value.strip() for key, value in pairs}
|
|
|
|
|
|
|
|
|
|
|
def extract_content_sections(text: str) -> Dict[str, str]:
    """
    Extract the expanded narrative sections from the paper text:
    Intro, Literature_Review, Research_Models_Used, Methodology,
    Discussion, Future_Scope, Theory.

    Args:
        text: The full paper text.

    Returns:
        A dict keyed by section name, or {} when the model returns nothing.
    """
    prompt = f"""Please extract these sections from the research paper:
    1. Introduction
    2. Literature Review
    3. Research Models Used
    4. Methodology
    5. Discussion
    6. Future Scope
    7. Theory

    Paper text: {text}

    Return in this exact format without any additional text or explanations also make sure
    no data should be empty (at least 10-15 words) and it should be meaningful:
    Intro: <text>
    Literature_Review: <text>
    Research_Models_Used: <text>
    Methodology: <text>
    Discussion: <text>
    Future_Scope: <text>
    Theory: <text>
    """
    response = get_perplexity_response(prompt)
    if not response:
        return {}

    # One "Section: text" entry per line; later duplicates overwrite earlier.
    pairs = (line.split(":", 1) for line in response.split("\n") if ":" in line)
    return {key.strip(): value.strip() for key, value in pairs}
|
|
|
|
|
|
|
|
|
|
|
def extract_variables(text: str) -> Dict[str, Any]:
    """
    Extract study-variable fields from the paper text: the independent,
    dependent, control, and extraneous variable lists plus their counts
    (the "nof_*" fields, coerced to int).

    Args:
        text: The full paper text.

    Returns:
        A dict of field name -> value (str for lists, int for counts;
        counts that fail to parse become 0), or {} when the model returns
        nothing.
    """
    prompt = f"""From the paper text, extract the following fields:
    1. Independent_Variables
    2. nof_Independent_Variables
    3. Dependent_Variables
    4. nof_Dependent_Variables
    5. Control_Variables
    6. Extraneous_Variables
    7. nof_Control_Variables
    8. nof_Extraneous_Variables

    Return them in this format:
    Independent_Variables: <list>
    nof_Independent_Variables: <integer>
    Dependent_Variables: <list>
    nof_Dependent_Variables: <integer>
    Control_Variables: <list>
    Extraneous_Variables: <list>
    nof_Control_Variables: <integer>
    nof_Extraneous_Variables: <integer>

    Paper text: {text}
    """
    response = get_perplexity_response(prompt)
    if not response:
        return {}

    variables: Dict[str, Any] = {}
    for line in response.split("\n"):
        if ":" not in line:
            continue
        raw_name, raw_content = line.split(":", 1)
        name = raw_name.strip()
        content = raw_content.strip()
        if not name.startswith("nof_"):
            variables[name] = content
            continue
        # Count fields must be integers; an unparsable count defaults to 0.
        try:
            variables[name] = int(content)
        except ValueError:
            variables[name] = 0
    return variables
|
|
|
|
|
|
|
|
|
|
|
def ensure_non_empty_values(data: Dict[str, Any], fallback_text: str) -> Dict[str, Any]:
    """
    Ensure every extracted field holds meaningful content.

    Fields whose value is falsy or shorter than three words are overwritten
    in place with a placeholder message naming the field.

    NOTE(review): `fallback_text` is currently unused — presumably it was
    meant to supply replacement content; confirm intent before wiring it in.

    Args:
        data: Mapping of field name -> extracted value (mutated in place).
        fallback_text: Unused (see note above).

    Returns:
        The same dict, after substitutions.
    """
    placeholder = "No sufficient data found for {}. Could not parse."
    too_short = [
        key
        for key, value in data.items()
        if not value or len(str(value).split()) < 3
    ]
    for key in too_short:
        data[key] = placeholder.format(key)
    return data
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Attribute checklist per paper type. extract_paper_fields() uses the list
# both to name the fields in the LLM prompt and to shape the expected JSON.
# NOTE(review): naming is mixed (snake_case vs. spaced titles), and the
# "Opinion/Perspective" and "Empirical" lists contain both "Intro" and
# "Introduction" — presumably one of each pair is redundant; confirm which
# downstream consumers expect before deduplicating.
PAPER_TYPE_ATTRIBUTES: Dict[str, list] = {
    # Systematic / literature-review papers.
    "Review Based Paper": [
        "Title",
        "Publication",
        "Journal_Conference",
        "Abstract",
        "Keywords",
        "Author",
        "Date_of_Publication",
        "Intro",
        "Literature_Review",
        "Body",
        "Protocol",
        "Search String",
        "Included Studies",
        "Data Collection and Analysis Methods",
        "Data Extraction Table",
        "Synthesis and Analysis",
        "Conclusion",
        "Limitations",
        "Results",
        "References",
        "Risk of Bias Assessment",
    ],
    # Editorials, commentaries, position papers.
    "Opinion/Perspective Based Paper": [
        "Title",
        "Publication",
        "Journal_Conference",
        "Abstract",
        "Keywords",
        "Author",
        "Date_of_Publication",
        "Intro",
        "Literature_Review",
        "Introduction",
        "Body",
        "Results and Discussion",
        "Conclusion",
        "References",
    ],
    # Studies reporting original data collection and analysis.
    "Empirical Research Paper": [
        "Title",
        "Publication",
        "Journal_Conference",
        "Abstract",
        "Keywords",
        "Author",
        "Date_of_Publication",
        "Intro",
        "Literature_Review",
        "Introduction",
        "Body",
        "Methodology",
        "Participants",
        "Survey Instrument",
        "Data Collection",
        "Data Analysis",
        "Results and Discussion",
        "Conclusion",
        "References",
    ],
    # Catch-all; includes the study-variable fields (nof_* counts are ints).
    "Research Paper (Other)": [
        "Title",
        "Publication",
        "Journal_Conference",
        "Abstract",
        "Keywords",
        "Author",
        "Date_of_Publication",
        "Intro",
        "Literature_Review",
        "Research_Models_Used",
        "Methodology",
        "Discussion",
        "Future_Scope",
        "Theory",
        "Independent_Variables",
        "nof_Independent_Variables",
        "Dependent_Variables",
        "nof_Dependent_Variables",
        "Control_Variables",
        "Extraneous_Variables",
        "nof_Control_Variables",
        "nof_Extraneous_Variables",
    ],
}
|
|
|
|
|
|
|
|
|
|
|
def extract_paper_fields(text: str, paper_type: str) -> Dict[str, Any]:
    """
    Ask the LLM to extract the attribute set for `paper_type` from `text`.

    Args:
        text: The full paper text.
        paper_type: A key of PAPER_TYPE_ATTRIBUTES.

    Returns:
        A dict mapping attribute name -> extracted value (the first object
        of the returned JSON array), or {} on any failure: invalid paper
        type, empty model response, or unrecoverable JSON.
    """
    if paper_type not in PAPER_TYPE_ATTRIBUTES:
        st.error("Invalid paper type selected.")
        return {}

    selected_attrs = PAPER_TYPE_ATTRIBUTES[paper_type]
    prompt = f"""
    Extract the following fields from the research paper text below:

    {", ".join(selected_attrs)}

    Paper text:
    {text}

    Return them in this JSON format strictly, with no extra text, and strictly don't start the JSON with a newline or markdown and don't have Unterminated string:
    [
        {{
            {", ".join([f'"{attr}": "value"' for attr in selected_attrs])}
        }}
    ]
    """

    try:
        response = get_perplexity_response(prompt)
        if not response:
            st.error("No response from Gemini.")
            return {}

        raw_text = response.strip()

        # Locate the JSON array inside whatever extra text the model emitted.
        json_start = raw_text.find("[")
        json_end = raw_text.rfind("]") + 1
        # BUGFIX: previously a missing "[" (find() == -1) produced a garbage
        # negative-index slice; fail loudly instead.
        if json_start == -1 or json_end <= json_start:
            st.error("Gemini did not return a valid JSON array.")
            return {}
        json_str = raw_text[json_start:json_end]

        # Strip trailing commas that would break strict JSON parsing.
        json_str = re.sub(r",\s*}", "}", json_str)
        json_str = re.sub(r",\s*\]", "]", json_str)

        try:
            data = json.loads(json_str)
        except json.JSONDecodeError as e:
            st.warning(f"Fixing JSON errors: {str(e)}")
            # Recovery path: truncate after the last complete object.
            bracket_pos = json_str.rfind("}")
            if bracket_pos != -1:
                # BUGFIX: re-close the array after truncation; previously the
                # trailing "]" was cut off, so this retry always raised again.
                json_str = json_str[: bracket_pos + 1] + "]"
            data = json.loads(json_str)

        if isinstance(data, list) and len(data) > 0:
            return data[0]
        else:
            st.error("Gemini did not return a valid JSON array.")
            return {}
    except Exception as e:
        st.error(f"Error in Gemini extraction: {str(e)}")
        return {}
|
|
|
|
|
|
|
|
|
|
|
def process_paper(text: str, paper_type: str):
    """
    Extract fields for `paper_type` from `text` and persist the result
    to a MongoDB collection named after the paper type.

    Args:
        text: The full paper text.
        paper_type: A key of PAPER_TYPE_ATTRIBUTES.

    Returns:
        The extracted dict on success, {} when extraction produced nothing,
        or None when the database is unreachable.
    """
    db = create_db_connection()
    if db is None:
        return

    # e.g. "Review Based Paper" -> collection "review_based_paper"
    target_collection = db[paper_type.replace(" ", "_").lower()]

    extracted_data = extract_paper_fields(text, paper_type)
    if not extracted_data:
        return {}

    target_collection.insert_one(extracted_data)
    return extracted_data
|
|
|
|
|
|
|
|
|
|
|
def main():
    """
    Streamlit entry point: choose a paper type, upload a PDF/text file,
    extract its fields via the LLM, and save the result to MongoDB.
    """
    st.title("Extract Research Paper")

    paper_type = st.selectbox(
        "Select type of research paper:",
        [
            "Review Based Paper",
            "Opinion/Perspective Based Paper",
            "Empirical Research Paper",
            "Research Paper (Other)",
        ],
    )

    uploaded_file = st.file_uploader("Upload a PDF or text file", type=["pdf", "txt"])

    if st.button("Extract & Save") and uploaded_file:
        try:
            if uploaded_file.type == "application/pdf":
                pdf_reader = PyPDF2.PdfReader(uploaded_file)
                # BUGFIX: extract_text() can return None (image-only pages),
                # which previously raised TypeError and aborted the upload;
                # treat such pages as empty instead.
                text_content = "".join(
                    (page.extract_text() or "") for page in pdf_reader.pages
                )
            else:
                # Tolerate odd encodings rather than failing the upload.
                text_content = uploaded_file.read().decode("utf-8", errors="replace")

            with st.spinner("Extracting fields..."):
                data = process_paper(text_content, paper_type)

            if data:
                st.success(
                    f"Paper extracted and saved to MongoDB in '{paper_type}' collection!"
                )
                st.write("Extracted fields:")
                st.json(data)

        except Exception as e:
            st.error(f"An error occurred: {str(e)}")
|
|
|
|
|
|
|
|
|
# Script entry point (run via `streamlit run <this file>`).
if __name__ == "__main__":
    main()
|
|