"""Module for fetching and parsing articles from PubMed and PMC using Entrez efetch."""
from __future__ import annotations
import html
import requests
import unicodedata
from abc import ABC, abstractmethod
from io import StringIO
from pathlib import Path
from typing import IO, Any, Dict, Generator, Union
from xml.etree.ElementTree import Element  # nosec
from zipfile import ZipFile
from defusedxml import ElementTree
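# NCBI Entrez E-utilities efetch endpoint used to download article records as XML.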
_ENTREZ_EFETCH_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
def _db_parser(article_id: str) -> str | None:
    """Infer the Entrez database ("pmc" or "pubmed") from the article ID format."""
db = None
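    # IDs like "PMC1234567" map to the PMC database; bare numeric IDs are treated as PubMed PMIDs.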
if article_id.startswith('PMC') and article_id[3:].isdigit():
db = "pmc"
elif article_id.isdigit():
db = "pubmed"
return db
def _dl_article_xml(article_id: str, db: str | None) -> str | None:
xml_string = None
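    # retmode=xml asks efetch to return the record as XML rather than plain text.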
params = {"db": db, "id": article_id, "retmode": "xml"}
    # A timeout keeps the call from hanging indefinitely if NCBI does not respond.
    response = requests.get(_ENTREZ_EFETCH_URL, params=params, timeout=30)
if response.status_code == 200:
xml_string = response.text
return xml_string
def _parse_article(xml_string: str, db: str) -> Union[None, ArticleParser]:
parsed_article = None
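    # PMC full-text records are JATS XML; PubMed records use the PubMed article XML schema.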
if db == "pmc":
parsed_article = JATSXMLParser.from_string(xml_string)
elif db == "pubmed":
parsed_article = PubMedXMLParser(xml_string)
    # Treat an article with neither an abstract nor body paragraphs as a failed parse.
if not parsed_article.abstract and not parsed_article.paragraphs:
parsed_article = None
return parsed_article
def _reformat_article(parsed_article: ArticleParser) -> Dict[str, Any]:
    """Group the parsed title, abstract and body text by section title."""
    reformatted_article = {"Title": [parsed_article.title]}
    for sec_title, sentence in parsed_article.abstract:
        sec_title = "Abstract" if sec_title is None else "Abstract - " + sec_title
        reformatted_article[sec_title] = reformatted_article.get(sec_title, []) + [sentence]
    for sec_title, sentence in parsed_article.paragraphs:
        reformatted_article[sec_title] = reformatted_article.get(sec_title, []) + [sentence]
    return reformatted_article
def dl_and_parse(article_id: str) -> Dict[str, Union[None, Any]]:
    """Fetch an article from PubMed or PMC by ID using Entrez efetch and parse it
    with the appropriate parser. Returns a dict with keys: "db" (the Entrez database
    used), "article_xml" (the raw XML of the downloaded article) and "article_sections"
    (the parsed sections as a dictionary mapping section titles to lists of text content).
    """
    parse_output = {
        "db": None,
        "article_xml": None,
        "article_sections": None,
    }
    # Determine which Entrez database ("pmc" or "pubmed") the ID refers to.
parse_output["db"] = _db_parser(article_id)
if parse_output["db"] is None:
return parse_output
    parse_output["article_xml"] = _dl_article_xml(article_id, parse_output["db"])
    if parse_output["article_xml"] is None:
        return parse_output
    article_parser = _parse_article(parse_output["article_xml"], parse_output["db"])
    if article_parser is None:
        return parse_output
parse_output["article_sections"] = _reformat_article(article_parser)
return parse_output
class ArticleParser(ABC):
"""An abstract base class for article parsers."""
@property
@abstractmethod
def title(self) -> str:
"""Get the article title.
Returns
-------
str
The article title.
"""
@property
@abstractmethod
    def abstract(self) -> list[tuple[str, str]]:
        """Get a sequence of paragraphs in the article abstract.
        Returns
        -------
        list of (str, str)
            For each abstract paragraph a tuple with the section label and the
            paragraph content.
        """
@property
@abstractmethod
def paragraphs(self) -> list[tuple[str, str]]:
"""Get all paragraphs and titles of sections they are part of.
Returns
-------
list of (str, str)
For each paragraph a tuple with two strings is returned. The first
is the section title, the second the paragraph content.
"""
class JATSXMLParser(ArticleParser):
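    """Parser for JATS XML articles, such as PMC full-text records."""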
def __init__(self, xml_stream: IO[Any]) -> None:
super().__init__()
self.content = ElementTree.parse(xml_stream)
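        # efetch wraps single articles in a <pmc-articleset> root; unwrap to the inner <article>.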
if self.content.getroot().tag == "pmc-articleset":
self.content = self.content.find("article")
@classmethod
def from_string(cls, xml_string: str) -> JATSXMLParser:
with StringIO(xml_string) as stream:
obj = cls(stream)
return obj
@classmethod
def from_zip(cls, path: str | Path) -> JATSXMLParser:
with ZipFile(path) as myzip:
xml_files = [
x
for x in myzip.namelist()
if x.startswith("content/") and x.endswith(".xml")
]
if len(xml_files) != 1:
raise ValueError(
"There needs to be exactly one .xml file inside of content/"
)
xml_file = xml_files[0]
            # Parse the single JATS XML file found under content/.
with myzip.open(xml_file, "r") as fh:
obj = cls(fh)
return obj
@property
def title(self) -> str:
titles = self.content.find("./front/article-meta/title-group/article-title")
return self._element_to_str(titles)
@property
def abstract(self) -> list[tuple[str, str]]:
abstract = self.content.find("./front/article-meta/abstract")
abstract_list: list[tuple[str, str]] = []
        if abstract is not None:
            for sec_title, text in self.parse_section(abstract):
                abstract_list.append((sec_title, text))
return abstract_list
@property
def paragraphs(self) -> list[tuple[str, str]]:
paragraph_list: list[tuple[str, str]] = []
# Paragraphs of text body
body = self.content.find("./body")
        if body is not None:
            paragraph_list.extend(self.parse_section(body))
# Figure captions
figs = self.content.findall("./body//fig")
for fig in figs:
            fig_captions = fig.findall("caption")
            if not fig_captions:
                continue
            caption = " ".join(self._element_to_str(c) for c in fig_captions)
if caption:
paragraph_list.append(("Figure Caption", caption))
# Table captions
tables = self.content.findall("./body//table-wrap")
for table in tables:
caption_elements = table.findall("./caption/p") or table.findall(
"./caption/title"
)
            if not caption_elements:
                continue
caption = " ".join(self._element_to_str(c) for c in caption_elements)
if caption:
paragraph_list.append(("Table Caption", caption))
return paragraph_list
def parse_section(self, section: Element, sec_title_path: str = "") -> Generator[tuple[str, str], None, None]:
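        """Recursively yield (section title path, paragraph text) pairs for a section element."""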
sec_title = self._element_to_str(section.find("title"))
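        # Skip author-contribution sections, which carry no article content of interest.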
if sec_title == "Author contributions":
return
sec_title_path = sec_title_path + " - " + sec_title if sec_title_path else sec_title
for element in section:
if element.tag == "sec":
yield from self.parse_section(element, sec_title_path)
elif element.tag in {"title", "caption", "fig", "table-wrap", "label"}:
continue
else:
text = self._element_to_str(element)
if text:
yield sec_title_path, text
def _inner_text(self, element: Element) -> str:
text_parts = [html.unescape(element.text or "")]
for sub_element in element:
# recursively parse the sub-element
text_parts.append(self._element_to_str(sub_element))
# don't forget the text after the sub-element
text_parts.append(html.unescape(sub_element.tail or ""))
return unicodedata.normalize("NFKC", "".join(text_parts)).strip()
def _element_to_str(self, element: Element | None) -> str:
if element is None:
return ""
if element.tag in {
"bold",
"italic",
"monospace",
"p",
"sc",
"styled-content",
"underline",
"xref",
}:
# Mostly styling tags for which getting the inner text is enough.
# Currently this is the same as the default handling. Writing it out
# explicitly here to decouple from the default handling, which may
# change in the future.
return self._inner_text(element)
elif element.tag == "sub":
return f"_{self._inner_text(element)}"
elif element.tag == "sup":
return f"^{self._inner_text(element)}"
elif element.tag in {
"disp-formula",
"email",
"ext-link",
"inline-formula",
"uri",
}:
return ""
else:
# Default handling for all other element tags
return self._inner_text(element)
class PubMedXMLParser(ArticleParser):
"""Parser for PubMed abstract."""
def __init__(self, data: str | bytes) -> None:
super().__init__()
self.content = ElementTree.fromstring(data)
@property
def title(self) -> str:
title = self.content.find("./PubmedArticle/MedlineCitation/Article/ArticleTitle")
if title is None:
return ""
return "".join(title.itertext())
@property
    def abstract(self) -> list[tuple[str, str]]:
        abstract = self.content.find("./PubmedArticle/MedlineCitation/Article/Abstract")
        if abstract is None:
            # No abstract present: return an empty list.
            return []
        abstract_list: list[tuple[str, str]] = []
        for paragraph in abstract.iter("AbstractText"):
            # The Label attribute carries the structured-abstract section name; it may be absent.
            sec_title = paragraph.get("Label")
            abstract_list.append((sec_title, "".join(paragraph.itertext())))
        return abstract_list
@property
def paragraphs(self) -> list[tuple[str, str]]:
# No paragraph to parse in PubMed article sets: return an empty iterable.
return []
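# Minimal usage sketch (the PMC ID below is purely illustrative):
if __name__ == "__main__":
    result = dl_and_parse("PMC1234567")
    if result["article_sections"] is None:
        print(f"Could not fetch or parse article (db={result['db']})")
    else:
        for section, paragraphs in result["article_sections"].items():
            print(f"{section}: {len(paragraphs)} paragraph(s)")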