"""Streamlit helpers: environment setup, LLM calls, and field extraction
for research papers (stored in MongoDB)."""
import io
import json
import os
import re
from typing import Any, Dict

import google.generativeai as genai
import PyPDF2
import requests
import streamlit as st
from dotenv import load_dotenv
from pymongo import MongoClient

# --------------------------------------------------------------------------------
# 1. Environment Setup
# --------------------------------------------------------------------------------
load_dotenv()

# MongoDB connection string.
# FIX: the env-var name was misspelled "MONGODB_UR", so the URI could never
# be overridden from the environment.
# SECURITY: the fallback embeds real credentials in source — rotate them and
# supply the value via the environment / a secrets manager instead.
MONGODB_URI = os.getenv(
    "MONGODB_URI",
    "mongodb+srv://milind:05july60@cluster0.d6mld.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0",
)

PERPLEXITY_API_KEY = os.getenv("PERPLEXITY_API_KEY")
PERPLEXITY_API_URL = "https://api.perplexity.ai/chat/completions"

# SECURITY: hard-coded API-key fallback — rotate this key and rely solely on
# the GEMINI_KEY environment variable.
GEMINI_KEY = os.getenv("GEMINI_KEY", "AIzaSyCFIvntck54HOCS5pxxiy9wpr5HJN3r02I")

# Configure Gemini once at import time.
genai.configure(api_key=GEMINI_KEY)


def call_perplexity_api(prompt: str) -> str:
    """Send *prompt* to Perplexity AI and return the response text.

    On any failure the error is surfaced in the Streamlit UI and an empty
    string is returned.
    """
    headers = {
        "Authorization": f"Bearer {PERPLEXITY_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "llama-3.1-sonar-small-128k-chat",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.3,
    }
    try:
        # FIX: added a timeout so a hung request cannot block the app forever.
        response = requests.post(
            PERPLEXITY_API_URL, headers=headers, json=payload, timeout=60
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]
    except Exception as e:
        st.error(f"Perplexity API Error: {str(e)}")
        return ""


# --------------------------------------------------------------------------------
# 2. Database Connection
# --------------------------------------------------------------------------------
def create_db_connection():
    """Connect to MongoDB and return the ``novascholar_db`` database handle.

    Returns ``None`` (and shows a Streamlit error) if the connection fails.
    FIX: the old docstring claimed a 'papers' collection was returned — the
    function actually returns the *database*; an unused ``collection`` local
    was also removed.
    """
    try:
        client = MongoClient(MONGODB_URI)
        db = client["novascholar_db"]  # Database name
        # Ping to confirm the connection is alive before handing it out.
        client.admin.command("ping")
        return db
    except Exception as e:
        st.error(f"Database connection error: {str(e)}")
        return None


# --------------------------------------------------------------------------------
# 3. PDF Text Extraction
# --------------------------------------------------------------------------------
def extract_text_from_pdf(pdf_file) -> str:
    """Extract and concatenate the text of every page in *pdf_file*.

    Returns "" (and shows a Streamlit error) if the PDF cannot be read.
    """
    try:
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        # FIX: PyPDF2's extract_text() may return None for image-only pages;
        # guard with ``or ""`` so concatenation cannot raise TypeError.
        return "".join(
            (page.extract_text() or "") + "\n" for page in pdf_reader.pages
        )
    except Exception as e:
        st.error(f"Error processing PDF: {str(e)}")
        return ""


# --------------------------------------------------------------------------------
# 4. LLM Response Helper
# --------------------------------------------------------------------------------
def get_perplexity_response(prompt: str) -> str:
    """Send *prompt* to Google's Gemini model and return the response text.

    FIX: this function was defined twice; the first definition (a thin
    wrapper around ``call_perplexity_api``) was silently shadowed by this
    Gemini-backed one, so only Gemini was ever used. The dead duplicate has
    been removed — call :func:`call_perplexity_api` directly for Perplexity.
    """
    try:
        model = genai.GenerativeModel("gemini-1.5-pro")
        response = model.generate_content(prompt)
        return response.text
    except Exception as e:
        st.error(f"Gemini API Error: {str(e)}")
        return ""


def _parse_key_value_lines(response: str) -> Dict[str, str]:
    """Parse ``Key: value`` lines from an LLM reply into a stripped dict.

    Shared by the three extractors below, which previously each repeated
    this split-on-first-colon loop.
    """
    parsed: Dict[str, str] = {}
    for line in response.split("\n"):
        if ":" in line:
            key, value = line.split(":", 1)
            parsed[key.strip()] = value.strip()
    return parsed


# --------------------------------------------------------------------------------
# 5. Basic Info Extraction
# --------------------------------------------------------------------------------
def extract_basic_info(text: str) -> Dict[str, str]:
    """Extract title, publication, journal/conference, abstract, keywords,
    author, and date from the paper text. Returns {} on LLM failure.
    """
    prompt = f"""
    Extract the following fields from the research paper text below:
    Title
    Publication
    Journal_Conference
    Abstract
    Keywords
    Author
    Date_of_Publication

    Paper text: {text}

    Return them in this format:
    Title: ...
    Publication: ...
    Journal_Conference: ...
    Abstract: ...
    Keywords: ...
    Author: ...
    Date_of_Publication: ...
    """
    response = get_perplexity_response(prompt)
    if not response:
        return {}
    return _parse_key_value_lines(response)


# --------------------------------------------------------------------------------
# 6. Content Sections Extraction
# --------------------------------------------------------------------------------
def extract_content_sections(text: str) -> Dict[str, str]:
    """Extract expanded sections: Intro, Literature_Review,
    Research_Models_Used, Methodology, Discussion, Future_Scope, Theory.
    Returns {} on LLM failure.
    """
    prompt = f"""Please extract these sections from the research paper:
    1. Introduction
    2. Literature Review
    3. Research Models Used
    4. Methodology
    5. Discussion
    6. Future Scope
    7. Theory

    Paper text: {text}

    Return in this exact format without any additional text or explanations also make sure no data should be empty (at least 10-15 words) and it should be meaningful:
    Intro:
    Literature_Review:
    Research_Models_Used:
    Methodology:
    Discussion:
    Future_Scope:
    Theory:
    """
    response = get_perplexity_response(prompt)
    if not response:
        return {}
    return _parse_key_value_lines(response)


# --------------------------------------------------------------------------------
# 7. Variables Extraction
# --------------------------------------------------------------------------------
def extract_variables(text: str) -> Dict[str, Any]:
    """Extract variable data (Independent/Dependent/Control/Extraneous
    variables and their ``nof_``-prefixed counts).

    ``nof_*`` values are coerced to int, falling back to 0 when the LLM
    returns something non-numeric. Returns {} on LLM failure.
    """
    prompt = f"""From the paper text, extract the following fields:
    1. Independent_Variables
    2. nof_Independent_Variables
    3. Dependent_Variables
    4. nof_Dependent_Variables
    5. Control_Variables
    6. Extraneous_Variables
    7. nof_Control_Variables
    8. nof_Extraneous_Variables

    Return them in this format:
    Independent_Variables:
    nof_Independent_Variables:
    Dependent_Variables:
    nof_Dependent_Variables:
    Control_Variables:
    Extraneous_Variables:
    nof_Control_Variables:
    nof_Extraneous_Variables:

    Paper text: {text}
    """
    response = get_perplexity_response(prompt)
    if not response:
        return {}

    variables: Dict[str, Any] = {}
    for key, value in _parse_key_value_lines(response).items():
        if key.startswith("nof_"):
            # Counts should be integers; fall back to 0 if not parseable.
            try:
                variables[key] = int(value)
            except ValueError:
                variables[key] = 0
        else:
            variables[key] = value
    return variables


# --------------------------------------------------------------------------------
# 8. Utility to ensure no empty fields (example logic)
# --------------------------------------------------------------------------------
def ensure_non_empty_values(data: Dict[str, Any], fallback_text: str) -> Dict[str, Any]:
    """Ensure each extracted field has meaningful content (>= 3 words);
    otherwise replace it with a placeholder message.

    NOTE(review): *fallback_text* is accepted but unused — kept for
    backward compatibility with existing callers.
    """
    for k, v in data.items():
        if not v or len(str(v).split()) < 3:  # minimal-word-count heuristic
            data[k] = f"No sufficient data found for {k}. Could not parse."
    return data
# NOTE(review): a large commented-out legacy process_paper(text)
# implementation and a stray "# filepath: ..." editor artifact were removed
# from this section — they were dead weight with no runtime effect.

# --------------------------------------------------------------------------------
# 3. Paper Type Attributes
# --------------------------------------------------------------------------------
# Fields to extract per paper type. The keys double as the Streamlit
# selectbox options and (lower-cased, underscored) as MongoDB collection
# names — do not rename them casually.
PAPER_TYPE_ATTRIBUTES = {
    "Review Based Paper": [
        "Title",
        "Publication",
        "Journal_Conference",
        "Abstract",
        "Keywords",
        "Author",
        "Date_of_Publication",
        "Intro",
        "Literature_Review",
        "Body",
        "Protocol",
        "Search String",
        "Included Studies",
        "Data Collection and Analysis Methods",
        "Data Extraction Table",
        "Synthesis and Analysis",
        "Conclusion",
        "Limitations",
        "Results",
        "References",
        "Risk of Bias Assessment",
    ],
    "Opinion/Perspective Based Paper": [
        "Title",
        "Publication",
        "Journal_Conference",
        "Abstract",
        "Keywords",
        "Author",
        "Date_of_Publication",
        "Intro",
        "Literature_Review",
        "Introduction",
        "Body",
        "Results and Discussion",
        "Conclusion",
        "References",
    ],
    "Empirical Research Paper": [
        "Title",
        "Publication",
        "Journal_Conference",
        "Abstract",
        "Keywords",
        "Author",
        "Date_of_Publication",
        "Intro",
        "Literature_Review",
        "Introduction",
        "Body",
        "Methodology",
        "Participants",
        "Survey Instrument",
        "Data Collection",
        "Data Analysis",
        "Results and Discussion",
        "Conclusion",
        "References",
    ],
    "Research Paper (Other)": [
        "Title",
        "Publication",
        "Journal_Conference",
        "Abstract",
        "Keywords",
        "Author",
        "Date_of_Publication",
        "Intro",
        "Literature_Review",
        "Research_Models_Used",
        "Methodology",
        "Discussion",
        "Future_Scope",
        "Theory",
        "Independent_Variables",
        "nof_Independent_Variables",
        "Dependent_Variables",
        "nof_Dependent_Variables",
        "Control_Variables",
        "Extraneous_Variables",
        "nof_Control_Variables",
        "nof_Extraneous_Variables",
    ],
}


# --------------------------------------------------------------------------------
# 4. Extract Paper Fields
# --------------------------------------------------------------------------------
def extract_paper_fields(text: str, paper_type: str) -> Dict[str, Any]:
    """Extract the attributes configured for *paper_type* from *text*.

    The LLM is asked to answer with a single-element JSON array; the reply
    is trimmed to the array, lightly repaired, and parsed. Returns the first
    object of the array, or {} on any failure.
    """
    if paper_type not in PAPER_TYPE_ATTRIBUTES:
        st.error("Invalid paper type selected.")
        return {}

    selected_attrs = PAPER_TYPE_ATTRIBUTES[paper_type]
    prompt = f"""
    Extract the following fields from the research paper text below:
    {", ".join(selected_attrs)}

    Paper text: {text}

    Return them in this JSON format strictly, with no extra text, and strictly don't start the JSON with a newline or markdown and don't have Unterminated string:
    [
        {{
            {", ".join([f'"{attr}": "value"' for attr in selected_attrs])}
        }}
    ]
    """
    try:
        response = get_perplexity_response(prompt)
        if not response:
            st.error("No response from Gemini.")
            return {}

        # Strip any prose the model wrapped around the JSON array.
        raw_text = response.strip()
        json_start = raw_text.find("[")
        json_end = raw_text.rfind("]") + 1
        # FIX: guard against replies containing no JSON array at all, which
        # previously produced an empty/garbage slice.
        if json_start == -1 or json_end == 0:
            st.error("Gemini did not return a valid JSON array.")
            return {}
        json_str = raw_text[json_start:json_end]

        # Trailing commas before a closing brace/bracket are invalid JSON.
        json_str = re.sub(r",\s*}", "}", json_str)
        json_str = re.sub(r",\s*\]", "]", json_str)

        try:
            data = json.loads(json_str)
        except json.JSONDecodeError as e:
            st.warning(f"Fixing JSON errors: {str(e)}")
            # Last resort: drop everything after the final closing brace.
            bracket_pos = json_str.rfind("}")
            if bracket_pos != -1:
                # FIX: re-append the closing "]" that the truncation removes;
                # without it this retry could never parse successfully.
                json_str = json_str[: bracket_pos + 1] + "]"
            data = json.loads(json_str)

        if isinstance(data, list) and len(data) > 0:
            return data[0]
        st.error("Gemini did not return a valid JSON array.")
        return {}
    except Exception as e:
        st.error(f"Error in Gemini extraction: {str(e)}")
        return {}


# --------------------------------------------------------------------------------
# 5. Process Paper and Save
# --------------------------------------------------------------------------------
def process_paper(text: str, paper_type: str):
    """Extract fields for *paper_type* from *text* and persist them.

    The document is inserted into a MongoDB collection named after the
    paper type (spaces -> underscores, lower-cased). Returns the extracted
    dict, {} when extraction produced nothing, or None when no DB
    connection could be made.
    """
    db = create_db_connection()
    if db is None:
        return

    collection_name = paper_type.replace(" ", "_").lower()
    collection = db[collection_name]

    extracted_data = extract_paper_fields(text, paper_type)
    if extracted_data:
        # FIX: insert a shallow copy so pymongo's in-place addition of the
        # "_id" ObjectId does not leak into the returned dict — st.json
        # cannot serialize ObjectId.
        collection.insert_one(dict(extracted_data))
        return extracted_data
    return {}


# --------------------------------------------------------------------------------
# 6. Streamlit UI for Paper Extraction
# --------------------------------------------------------------------------------
def main():
    """Streamlit page: pick a paper type, upload a file, extract and save."""
    # st.set_page_config(page_title="Extract Research Paper", layout="wide")
    st.title("Extract Research Paper")

    paper_type = st.selectbox(
        "Select type of research paper:",
        [
            "Review Based Paper",
            "Opinion/Perspective Based Paper",
            "Empirical Research Paper",
            "Research Paper (Other)",
        ],
    )

    uploaded_file = st.file_uploader("Upload a PDF or text file", type=["pdf", "txt"])

    if st.button("Extract & Save") and uploaded_file:
        try:
            # Read file content: PDFs go through PyPDF2, anything else is
            # decoded as UTF-8 text (bad bytes replaced, never raising).
            if uploaded_file.type == "application/pdf":
                pdf_reader = PyPDF2.PdfReader(uploaded_file)
                text_content = ""
                for page in pdf_reader.pages:
                    # FIX: extract_text() may return None for image-only
                    # pages; ``or ""`` prevents a TypeError on concatenation.
                    text_content += page.extract_text() or ""
            else:
                text_content = uploaded_file.read().decode("utf-8", errors="replace")

            with st.spinner("Extracting fields..."):
                data = process_paper(text_content, paper_type)
            if data:
                st.success(
                    f"Paper extracted and saved to MongoDB in '{paper_type}' collection!"
                )
                st.write("Extracted fields:")
                st.json(data)
        except Exception as e:
            st.error(f"An error occurred: {str(e)}")


if __name__ == "__main__":
    main()