# FreeBibTec2 / app.py
import os
import re
import logging
import zipfile
import asyncio

import aiohttp
import bibtexparser
import gradio as gr
from bs4 import BeautifulSoup
from urllib.parse import quote, urlencode
from concurrent.futures import ThreadPoolExecutor, CancelledError
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)
class PaperDownloader:
def __init__(self, output_dir='papers'):
self.output_dir = output_dir
os.makedirs(output_dir, exist_ok=True)
        # Candidate download sources, tried in order
self.download_sources = [
'https://sci-hub.ee/',
'https://sci-hub.st/',
'https://sci-hub.ru/',
'https://sci-hub.ren/',
'https://sci-hub.mksa.top/',
'https://sci-hub.se/',
'https://libgen.rs/scimag/'
]
# Request headers
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.9',
}
        self.executor = ThreadPoolExecutor(max_workers=4)
        self.download_task = None  # Handle to the currently running download task, if any
        self.cancel_event = None   # Set per submission; read by cancel_download()
        self.results_dict = {}
def clean_doi(self, doi):
"""Clean and encode DOI for URL"""
if not isinstance(doi, str):
return None
return quote(doi.strip()) if doi else None
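    # Illustrative examples (quote() keeps '/' unescaped by default, which is what
    # DOI resolvers expect in the path):
    #   clean_doi("10.1038/nature12373 ")  -> "10.1038/nature12373"
    #   clean_doi("10.1000/abc def")       -> "10.1000/abc%20def"
    #   clean_doi(None)                    -> None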
async def fetch_with_headers(self, session, url, timeout=10):
"""Utility method to fetch an URL with headers and timeout"""
try:
async with session.get(url, headers=self.headers, timeout=timeout, allow_redirects=True) as response:
response.raise_for_status()
return await response.text(), response.headers
except Exception as e:
logger.debug(f"Error fetching {url}: {e}")
return None, None
    async def fetch_pdf_content(self, session, url, max_redirects=5, max_retries=2, retry_delay=1):
        """Fetch a URL and return its body if it is a PDF, following up to max_redirects redirects with retries."""
        current_url = url
        redirect_count = 0
        while redirect_count <= max_redirects:
            redirected = False
            for retry_count in range(max_retries + 1):
                try:
                    logger.debug(f"Fetching PDF from {current_url} - attempt {retry_count + 1}")
                    async with session.get(current_url, headers=self.headers, timeout=10, allow_redirects=False) as response:
                        if response.status in (301, 302, 307, 308):
                            current_url = response.headers['Location']
                            redirect_count += 1
                            redirected = True
                            logger.debug(f"Following redirect from {url} to {current_url}")
                            break  # Leave the retry loop and follow the redirect
                        response.raise_for_status()
                        if 'application/pdf' in response.headers.get('Content-Type', ''):
                            logger.debug(f"Successfully fetched PDF from {current_url}")
                            return await response.read()
                        logger.debug(f"Content type not PDF for {current_url}: {response.headers.get('Content-Type', '')}")
                        return None
                except CancelledError:
                    logger.info(f"Fetch PDF cancelled from: {url}")
                    return None
                except Exception as e:
                    logger.debug(f"Error getting PDF (attempt {retry_count + 1}/{max_retries + 1}) from {current_url}: {e}")
                    await asyncio.sleep(retry_delay)
            if not redirected:
                # Retries exhausted without a redirect: give up on this URL
                return None
        logger.debug(f"Too many redirects for {url}, not following this link further")
        return None
async def download_paper_direct_doi_async(self, session, doi):
"""Attempt to download the pdf from the landing page of the doi"""
if not doi:
return None
try:
doi_url = f"https://doi.org/{self.clean_doi(doi)}"
# First, let's try to download the URL directly in case it is already the pdf.
pdf_content = await self.fetch_pdf_content(session, doi_url)
if pdf_content:
logger.debug(f"Direct DOI resolved to PDF from {doi_url}")
return pdf_content
# If direct DOI link was not a pdf, fetch landing page and extract links
text, headers = await self.fetch_with_headers(session, doi_url, timeout=15)
if not text:
return None
pdf_patterns = [
r'(https?://[^\s<>"]+?\.pdf)',
r'(https?://[^\s<>"]+?download/[^\s<>"]+)',
r'(https?://[^\s<>"]+?\/pdf\/[^\s<>"]+)',
]
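            # For illustration: the first pattern matches direct links such as
            # "https://journal.example.org/article/123.pdf" (hypothetical URL), while the
            # other two catch download endpoints and "/pdf/" paths not ending in ".pdf".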
pdf_urls = []
for pattern in pdf_patterns:
pdf_urls.extend(re.findall(pattern, text))
            # Try each candidate PDF URL and return the first actual PDF
for pdf_url in pdf_urls:
pdf_content = await self.fetch_pdf_content(session, pdf_url)
if pdf_content:
logger.debug(f"Found PDF from: {pdf_url}")
return pdf_content
except Exception as e:
logger.debug(f"Error trying to get the PDF from {doi}: {e}")
return None
async def download_paper_scihub_async(self, session, doi):
"""Improved method to download paper from Sci-Hub using async requests"""
if not doi:
logger.warning("DOI not provided")
return None
for base_url in self.download_sources:
try:
scihub_url = f"{base_url}{self.clean_doi(doi)}"
text, headers = await self.fetch_with_headers(session, scihub_url, timeout=15)
if not text:
continue
# Search for multiple PDF URL patterns
pdf_patterns = [
r'(https?://[^\s<>"]+?\.pdf)',
r'(https?://[^\s<>"]+?download/[^\s<>"]+)',
r'(https?://[^\s<>"]+?\/pdf\/[^\s<>"]+)',
]
pdf_urls = []
for pattern in pdf_patterns:
pdf_urls.extend(re.findall(pattern, text))
# Try downloading from found URLs
for pdf_url in pdf_urls:
                    try:
                        async with session.get(pdf_url, headers=self.headers, timeout=10) as pdf_response:
                            # Verify that the response is actually a PDF
                            if 'application/pdf' in pdf_response.headers.get('Content-Type', ''):
                                logger.debug(f"Found PDF from: {pdf_url}")
                                return await pdf_response.read()
                    except Exception as e:
                        logger.debug(f"Error downloading PDF from {pdf_url}: {e}")
except Exception as e:
logger.debug(f"Error trying to download {doi} from {base_url}: {e}")
return None
async def download_paper_libgen_async(self, session, doi):
"""Download from Libgen, handles the query and the redirection"""
if not doi:
return None
base_url = 'https://libgen.rs/scimag/'
try:
search_url = f"{base_url}?q={self.clean_doi(doi)}"
text, headers = await self.fetch_with_headers(session, search_url, timeout=10)
if not text or "No results" in text:
logger.debug(f"No results for DOI: {doi} on libgen")
return None
soup = BeautifulSoup(text, 'html.parser')
links = soup.select('table.c > tbody > tr:nth-child(2) > td:nth-child(1) > a')
if links:
link = links[0]
pdf_url = link['href']
                async with session.get(pdf_url, headers=self.headers, allow_redirects=True, timeout=10) as pdf_response:
                    if 'application/pdf' in pdf_response.headers.get('Content-Type', ''):
                        logger.debug(f"Found PDF from: {pdf_url}")
                        return await pdf_response.read()
except Exception as e:
logger.debug(f"Error trying to download {doi} from libgen: {e}")
return None
async def download_paper_google_scholar_async(self, session, doi):
"""Search google scholar to find an article with the given doi, try to get the pdf"""
if not doi:
return None
try:
query = f'doi:"{doi}"'
params = {'q': query}
url = f'https://scholar.google.com/scholar?{urlencode(params)}'
text, headers = await self.fetch_with_headers(session, url, timeout=10)
if not text:
return None
soup = BeautifulSoup(text, 'html.parser')
# Find any links with [PDF]
links = soup.find_all('a', string=re.compile(r'\[PDF\]', re.IGNORECASE))
if links:
pdf_url = links[0]['href']
                async with session.get(pdf_url, headers=self.headers, timeout=10) as pdf_response:
                    if 'application/pdf' in pdf_response.headers.get('Content-Type', ''):
                        logger.debug(f"Found PDF from: {pdf_url}")
                        return await pdf_response.read()
except Exception as e:
logger.debug(f"Google Scholar error for {doi}: {e}")
return None
async def download_paper_crossref_async(self, session, doi):
"""Alternative search method using Crossref"""
if not doi:
return None
try:
# Search for open access link
url = f"https://api.crossref.org/works/{doi}"
            async with session.get(url, headers=self.headers, timeout=10) as response:
                if response.status != 200:
                    return None
                data = await response.json()
            work = data.get('message', {})
            # Look for open access links in the metadata
            links = work.get('link', [])
            for link in links:
                if link.get('content-type') == 'application/pdf':
                    pdf_url = link.get('URL')
                    if pdf_url:
                        async with session.get(pdf_url, headers=self.headers, timeout=10) as pdf_response:
                            if 'application/pdf' in pdf_response.headers.get('Content-Type', ''):
                                logger.debug(f"Found PDF from: {pdf_url}")
                                return await pdf_response.read()
except Exception as e:
logger.debug(f"Crossref error for {doi}: {e}")
return None
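    # For reference, the Crossref works endpoint returns JSON shaped roughly like
    # (illustrative sketch, trimmed to the fields used above):
    #   {"message": {"link": [{"URL": "https://...", "content-type": "application/pdf"}]}}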
async def download_with_retry_async(self, doi, max_retries=3, initial_delay=2):
"""Downloads a paper using multiple strategies with exponential backoff and async requests"""
pdf_content = None
retries = 0
delay = initial_delay
async with aiohttp.ClientSession() as session:
while retries < max_retries and not pdf_content:
try:
pdf_content = (
await self.download_paper_direct_doi_async(session, doi) or
await self.download_paper_scihub_async(session, doi) or
await self.download_paper_libgen_async(session, doi) or
await self.download_paper_google_scholar_async(session, doi) or
await self.download_paper_crossref_async(session, doi)
)
if pdf_content:
return pdf_content
except Exception as e:
logger.error(f"Error in download attempt {retries + 1} for DOI {doi}: {e}")
if not pdf_content:
retries += 1
logger.warning(f"Retry attempt {retries} for DOI: {doi} after {delay} seconds")
await asyncio.sleep(delay)
delay *= 2 # Exponential backoff
return None
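    # Minimal usage sketch outside the Gradio UI (hypothetical driver script):
    #   downloader = PaperDownloader()
    #   pdf = asyncio.run(downloader.download_with_retry_async("10.1038/nature12373"))
    #   if pdf:
    #       with open("paper.pdf", "wb") as f:
    #           f.write(pdf)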
    async def _download_single_doi(self, doi, cancel_event=None):
        """Download a single DOI with progress feedback"""
        if not doi:
            return None, "Error: DOI not provided", "Error: DOI not provided"
        if cancel_event is not None and cancel_event.is_set():
            return None, f"Download cancelled {doi}", "Download Cancelled"
        logger.info(f"Starting download process for DOI: {doi}")
        try:
            pdf_content = await self.download_with_retry_async(doi)
            if pdf_content:
                logger.info(f"Downloaded PDF for DOI: {doi}")
                filename = f"{str(doi).replace('/', '_').replace('.', '_')}.pdf"
                filepath = os.path.join(self.output_dir, filename)
                with open(filepath, 'wb') as f:
                    f.write(pdf_content)
                logger.info(f"Saved PDF to file: {filepath}")
                return filepath, f"Successfully downloaded: <a href='https://doi.org/{doi}'>{doi}</a>", ""
            else:
                logger.warning(f"Could not download: {doi}")
                return None, f"Could not download {doi}", f'<a href="https://doi.org/{doi}">{doi}</a>'
        except CancelledError:
            logger.info(f"Download cancelled for DOI: {doi}")
            return None, f"Download cancelled {doi}", "Download Cancelled"
        except Exception as e:
            logger.error(f"Error processing {doi}: {e}")
            return None, f"Error processing {doi}: {e}", f"Error processing {doi}: {e}"
async def download_multiple_dois(self, dois_text, cancel_event):
"""Download multiple DOIs"""
if not dois_text:
return None, "Error: No DOIs provided", "Error: No DOIs provided", ""
        # Sanitize and filter DOIs: drop empty lines, surrounding whitespace, and duplicates
        dois = list(set([doi.strip() for doi in dois_text.split('\n') if doi.strip()]))
        # Validate the DOI list
        if not dois:
            return None, "Error: No valid DOIs provided", "Error: No valid DOIs provided", ""
        # Result-tracking lists
        downloaded_files = []  # Paths of downloaded files
        failed_dois = []       # DOIs that could not be downloaded
        downloaded_links = []  # Links for successfully downloaded DOIs
        for i, doi in enumerate(dois):
            if cancel_event.is_set():
                logger.info("Downloads cancelled on multiple DOI download")
                return None, "Downloads cancelled", "Downloads cancelled", ""
            result = await self._download_single_doi(doi, cancel_event)
            if result is None:
                continue
            if isinstance(result, Exception):
                # Unexpected exception
                error_msg = f"Unexpected error: {str(result)}"
                logger.error(f"Error downloading {doi}: {error_msg}")
                failed_dois.append(f'<a href="https://doi.org/{doi}">{doi}</a> - {error_msg}')
            elif result[0] is None:
                # Failed download (reported by _download_single_doi)
                error_msg = result[1]
                logger.warning(f"Failed to download {doi}: {error_msg}")
                failed_dois.append(f'<a href="https://doi.org/{doi}">{doi}</a> - {error_msg}')
            else:
                # Successful download
                filepath = result[0]
                # Give the file a unique name within the batch
                filename = f"{str(doi).replace('/', '_').replace('.', '_')}_{i}.pdf"
                filepath_unique = os.path.join(self.output_dir, filename)
                try:
                    os.rename(filepath, filepath_unique)
                    downloaded_files.append(filepath_unique)
                    downloaded_links.append(f'<a href="https://doi.org/{doi}">{doi}</a>')
                except Exception as rename_error:
                    logger.error(f"Error renaming file for {doi}: {rename_error}")
                    failed_dois.append(f'<a href="https://doi.org/{doi}">{doi}</a> - Error saving file')
        # Build a ZIP archive if anything was downloaded
        zip_filename = None
        if downloaded_files:
            zip_filename = 'papers.zip'
            loop = asyncio.get_running_loop()
            # Run ZIP creation in the executor so the event loop is not blocked,
            # and await it so the archive is complete before we return its name
            await loop.run_in_executor(
                self.executor,
                lambda: self.create_zip(zip_filename, downloaded_files)
            )
            logger.info(f"ZIP file created: {zip_filename}")
        return zip_filename, "\n".join(downloaded_links), "\n".join(failed_dois), ""
async def process_bibtex(self, bib_file, cancel_event):
"""Process BibTeX file and download papers with multiple strategies and reports UI updates using a callback"""
# Read BibTeX file content from the uploaded object
try:
with open(bib_file.name, 'r', encoding='utf-8') as f:
bib_content = f.read()
except Exception as e:
logger.error(f"Error reading uploaded file {bib_file.name}: {e}")
return None, f"Error reading uploaded file {bib_file.name}: {e}", f"Error reading uploaded file {bib_file.name}: {e}", ""
# Parse BibTeX data
try:
bib_database = bibtexparser.loads(bib_content)
except Exception as e:
logger.error(f"Error parsing BibTeX data: {e}")
return None,f"Error parsing BibTeX data: {e}", f"Error parsing BibTeX data: {e}",""
# Extract DOIs
dois = [entry.get('doi') for entry in bib_database.entries if entry.get('doi')]
logger.info(f"Found {len(dois)} DOIs to download")
# Result lists
downloaded_files = []
failed_dois = []
downloaded_links = []
        for i, doi in enumerate(dois):
            if cancel_event.is_set():
                logger.info("Download cancelled in BibTeX mode")
                return None, "Download Cancelled", "Download Cancelled", ""
            result = await self._download_single_doi(doi, cancel_event)
            if result is None:
                continue
            if isinstance(result, Exception):
                # Unexpected exception
                error_msg = f"Unexpected error: {str(result)}"
                logger.error(f"Error downloading {doi}: {error_msg}")
                failed_dois.append(f'<a href="https://doi.org/{doi}">{doi}</a> - {error_msg}')
            elif result[0] is None:
                # Failed download (reported by _download_single_doi)
                error_msg = result[1]
                logger.warning(f"Failed to download {doi}: {error_msg}")
                failed_dois.append(f'<a href="https://doi.org/{doi}">{doi}</a> - {error_msg}')
            else:
                # Successful download
                filepath = result[0]
                # Unique filename for the ZIP archive
                filename = f"{str(doi).replace('/', '_').replace('.', '_')}_{i}.pdf"
                filepath_unique = os.path.join(self.output_dir, filename)
                os.rename(filepath, filepath_unique)
                downloaded_files.append(filepath_unique)
                downloaded_links.append(f'<a href="https://doi.org/{doi}">{doi}</a>')
        zip_filename = None
        if downloaded_files:
            zip_filename = 'papers.zip'
            loop = asyncio.get_running_loop()
            # Await ZIP creation so the file exists before the path is returned
            await loop.run_in_executor(self.executor, lambda: self.create_zip(zip_filename, downloaded_files))
            logger.info(f"ZIP file created: {zip_filename}")
        return zip_filename, "\n".join(downloaded_links), "\n".join(failed_dois), ""
def create_zip(self, zip_filename, files):
"""Crea un archivo zip con los pdfs descargados"""
with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zf:
for file in files:
zf.write(file, os.path.basename(file))
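    # Example: create_zip('papers.zip', ['papers/10_1038_nature12373_0.pdf']) stores each
    # file under its basename, so the resulting archive is flat.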
    def cancel_download(self):
        """Signal cancellation to any in-flight download."""
        if self.cancel_event is not None:
            self.cancel_event.set()
        # Also cancel the stored download task if one exists and is cancellable
        if self.download_task:
            self.download_task.cancel()
def create_gradio_interface():
"""Create Gradio interface for Paper Downloader"""
downloader = PaperDownloader()
    def update_progress(message="", logs=""):
        # Helper for pushing progress text to the UI (currently unused);
        # gr.Textbox.update was removed in Gradio 4, so use gr.update instead
        return gr.update(value=f"{message}"), gr.update(value=f"<pre>{logs}</pre>")
async def download_papers(bib_file, doi_input, dois_input):
cancel_event = asyncio.Event() # Create cancellation event for every submission.
downloader.cancel_event = cancel_event # store the event so that it is available to stop the process
if bib_file:
# Check file type
if not bib_file.name.lower().endswith('.bib'):
return None, "Error: Please upload a .bib file", "Error: Please upload a .bib file", "", None
            zip_file, downloaded_dois, failed_dois, logs_text = await downloader.process_bibtex(bib_file, cancel_event)
            return zip_file, downloaded_dois, failed_dois, logs_text, None
        elif doi_input:
            filepath, message, error = await downloader._download_single_doi(doi_input, cancel_event)
            return None, message, error, "", filepath
        elif dois_input:
            zip_file, downloaded_dois, failed_dois, logs_text = await downloader.download_multiple_dois(dois_input, cancel_event)
            return zip_file, downloaded_dois, failed_dois, logs_text, None
        else:
            return None, "Please provide a .bib file, a single DOI, or a list of DOIs", "Please provide a .bib file, a single DOI, or a list of DOIs", "", None
with gr.Blocks(theme="Hev832/Applio", css="""
.gradio-container {
background-color: black;
}
.gr-interface {
max-width: 800px;
margin: 0 auto;
}
.gr-box {
background-color: black;
border-radius: 10px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
}
.output-text a {
color: #007bff; /* Blue color for hyperlinks */
}
""") as interface:
        gr.Markdown("# 🔬 Academic Paper Batch Downloader")
        gr.Markdown(
            "Upload a BibTeX file or enter DOIs to download PDFs. We'll attempt to fetch "
            "PDFs from multiple sources such as Sci-Hub, Libgen, Google Scholar, and "
            "Crossref. You can use any of the three inputs."
        )
        with gr.Row():
with gr.Column():
bib_file = gr.File(file_types=['.bib'], label="Upload BibTeX File")
doi_input = gr.Textbox(label="Enter Single DOI", placeholder="10.xxxx/xxxx")
dois_input = gr.Textbox(label="Enter Multiple DOIs (one per line)", placeholder="10.xxxx/xxxx\n10.yyyy/yyyy\n...")
with gr.Row():
                    clear_button = gr.ClearButton(components=[bib_file, doi_input, dois_input], value="Clear")
                    submit_button = gr.Button(value="Submit")
                examples = gr.Examples([
["example.bib", None, None], # Bibtex File
[None, "10.1038/nature12373", None], # Single DOI
[None, None, "10.1109/5.771073\n10.3390/horticulturae8080677"], # Multiple DOIs
],
inputs=[bib_file, doi_input, dois_input]
)
with gr.Column():
output_file = gr.File(label="Download Papers (ZIP) or Single PDF")
                downloaded_dois_textbox = gr.HTML(label="Found DOIs")
                failed_dois_textbox = gr.HTML(label="Missed DOIs")
                logs = gr.Textbox(label="Logs", lines=10)
                single_file = gr.File(label="Downloaded Single PDF")
with gr.Row():
stop_button = gr.Button(value="Stop Downloads")
            stop_button.click(downloader.cancel_download, outputs=None)
submit_button.click(
download_papers,
inputs=[bib_file, doi_input, dois_input],
            outputs=[output_file, downloaded_dois_textbox, failed_dois_textbox, logs, single_file],
)
    # Blocks renders no description attribute, so the header above uses gr.Markdown;
    # title here only sets the browser tab title.
    interface.title = "🔬 Academic Paper Batch Downloader"
return interface
def main():
interface = create_gradio_interface()
interface.launch(share=True)
if __name__ == "__main__":
main()