"""Module for fetching and parsing articles from PubMed and PMC using Entrez efetch."""

from __future__ import annotations

import html
import unicodedata
from abc import ABC, abstractmethod
from io import StringIO
from pathlib import Path
from typing import IO, Any, Dict, Generator, Union
from xml.etree.ElementTree import Element  # nosec
from zipfile import ZipFile

import requests
from defusedxml import ElementTree

_ENTREZ_EFETCH_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"

def _db_parser(article_id: str) -> str | None:
    """Determine the Entrez database ("pmc" or "pubmed") from the article ID format."""
    db = None
    if article_id.startswith("PMC") and article_id[3:].isdigit():
        db = "pmc"
    elif article_id.isdigit():
        db = "pubmed"
    return db
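
# Illustrative behaviour of _db_parser (the IDs below are hypothetical placeholders):
#   _db_parser("PMC8675309")  -> "pmc"
#   _db_parser("31452104")    -> "pubmed"
#   _db_parser("10.1000/xyz") -> None   (unrecognised format)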

def _dl_article_xml(article_id: str, db: str | None) -> str | None:
    """Download the raw article XML via Entrez efetch; return None if the request fails."""
    xml_string = None
    params = {"db": db, "id": article_id, "retmode": "xml"}
    response = requests.get(_ENTREZ_EFETCH_URL, params=params)
    if response.status_code == 200:
        xml_string = response.text
    return xml_string

def _parse_article(xml_string: str, db: str) -> ArticleParser | None:
    """Parse the article XML with the parser matching the database."""
    parsed_article = None
    if db == "pmc":
        parsed_article = JATSXMLParser.from_string(xml_string)
    elif db == "pubmed":
        parsed_article = PubMedXMLParser(xml_string)
    # Parsing counts as successful only if some text content was extracted.
    if parsed_article is not None and not (
        parsed_article.abstract or parsed_article.paragraphs
    ):
        parsed_article = None
    return parsed_article

def _reformat_article(parsed_article: ArticleParser) -> Dict[str, Any]:
    """Group the parsed text into a dict keyed by section title."""
    reformatted_article = {"Title": [parsed_article.title]}
    for sec_title, sentence in parsed_article.abstract:
        sec_title = "Abstract" if sec_title is None else "Abstract - " + sec_title
        reformatted_article[sec_title] = reformatted_article.get(sec_title, []) + [sentence]
    for sec_title, sentence in parsed_article.paragraphs:
        reformatted_article[sec_title] = reformatted_article.get(sec_title, []) + [sentence]
    return reformatted_article
    

def dl_and_parse(article_id: str) -> Dict[str, Union[None, Any]]:
    """Fetch an article from PubMed or PMC via Entrez efetch and parse it.

    Returns a dict with the keys:
    db (resolved Entrez database, "pmc" or "pubmed"),
    article_xml (raw XML of the downloaded article) and
    article_sections (dict mapping section titles to lists of text content).
    A value is None whenever the corresponding step could not be completed.
    """
    parse_output = {
        "db": None,
        "article_xml": None,
        "article_sections": None,
    }
    # Resolve the Entrez database from the ID format.
    parse_output["db"] = _db_parser(article_id)
    if parse_output["db"] is None:
        return parse_output
    parse_output["article_xml"] = _dl_article_xml(article_id, parse_output["db"])
    if parse_output["article_xml"] is None:
        return parse_output
    article_parser = _parse_article(parse_output["article_xml"], parse_output["db"])
    if article_parser is None:
        return parse_output
    parse_output["article_sections"] = _reformat_article(article_parser)
    return parse_output

class ArticleParser(ABC):
    """An abstract base class for article parsers."""

    @property
    @abstractmethod
    def title(self) -> str:
        """Get the article title.

        Returns
        -------
        str
            The article title.
        """

    @property
    @abstractmethod
    def abstract(self) -> list[tuple[str, str]]:
        """Get the paragraphs of the article abstract with their section labels.

        Returns
        -------
        list of (str, str)
            For each abstract paragraph a tuple with the section label (which may
            be None) and the paragraph content.
        """

    @property
    @abstractmethod
    def paragraphs(self) -> list[tuple[str, str]]:
        """Get all paragraphs and titles of sections they are part of.

        Returns
        -------
        list of (str, str)
            For each paragraph a tuple with two strings is returned. The first
            is the section title, the second the paragraph content.
        """


class JATSXMLParser(ArticleParser):
    """Parser for JATS XML full-text articles as returned by PMC."""

    def __init__(self, xml_stream: IO[Any]) -> None:
        super().__init__()
        self.content = ElementTree.parse(xml_stream)
        # efetch wraps single articles in a <pmc-articleset> root element.
        if self.content.getroot().tag == "pmc-articleset":
            self.content = self.content.find("article")

    @classmethod
    def from_string(cls, xml_string: str) -> JATSXMLParser:
        with StringIO(xml_string) as stream:
            obj = cls(stream)
        return obj

    @classmethod
    def from_zip(cls, path: str | Path) -> JATSXMLParser:
        with ZipFile(path) as myzip:
            xml_files = [
                x
                for x in myzip.namelist()
                if x.startswith("content/") and x.endswith(".xml")
            ]

            if len(xml_files) != 1:
                raise ValueError(
                    "There needs to be exactly one .xml file inside of content/"
                )

            xml_file = xml_files[0]

            # Parsing logic
            with myzip.open(xml_file, "r") as fh:
                obj = cls(fh)
        return obj
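
    # Usage sketch for from_zip (the path below is a hypothetical placeholder):
    #   parser = JATSXMLParser.from_zip("downloads/PMC8675309.zip")
    #   print(parser.title)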

    @property
    def title(self) -> str:
        title = self.content.find("./front/article-meta/title-group/article-title")
        return self._element_to_str(title)

    @property
    def abstract(self) -> list[tuple[str, str]]:
        abstract = self.content.find("./front/article-meta/abstract")
        abstract_list: list[tuple[str, str]] = []
        # Element truthiness depends on child count, so compare against None explicitly.
        if abstract is not None:
            for sec_title, text in self.parse_section(abstract):
                abstract_list.append((sec_title, text))
        return abstract_list

    @property
    def paragraphs(self) -> list[tuple[str, str]]:
        paragraph_list: list[tuple[str, str]] = []

        # Paragraphs of the text body
        body = self.content.find("./body")
        if body is not None:
            paragraph_list.extend(self.parse_section(body))

        # Figure captions
        figs = self.content.findall("./body//fig")
        for fig in figs:
            fig_captions = fig.findall("caption")
            if not fig_captions:
                continue
            caption = " ".join(self._element_to_str(c) for c in fig_captions)
            if caption:
                paragraph_list.append(("Figure Caption", caption))

        # Table captions
        tables = self.content.findall("./body//table-wrap")
        for table in tables:
            caption_elements = table.findall("./caption/p") or table.findall(
                "./caption/title"
            )
            if not caption_elements:
                continue
            caption = " ".join(self._element_to_str(c) for c in caption_elements)
            if caption:
                paragraph_list.append(("Table Caption", caption))
        return paragraph_list
    
    def parse_section(
        self, section: Element, sec_title_path: str = ""
    ) -> Generator[tuple[str, str], None, None]:
        """Yield (section title path, text) pairs for a section and its subsections."""
        sec_title = self._element_to_str(section.find("title"))
        if sec_title == "Author contributions":
            return
        sec_title_path = (
            sec_title_path + " - " + sec_title if sec_title_path else sec_title
        )
        for element in section:
            if element.tag == "sec":
                # Recurse into nested sections, chaining the parent title.
                yield from self.parse_section(element, sec_title_path)
            elif element.tag in {"title", "caption", "fig", "table-wrap", "label"}:
                continue
            else:
                text = self._element_to_str(element)
                if text:
                    yield sec_title_path, text
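
    # Illustrative title chaining: a <p> inside <sec><title>Methods</title>
    # <sec><title>Cohort</title>... is yielded with the title path "Methods - Cohort"
    # (the section names here are hypothetical, not from a specific article).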

    def _inner_text(self, element: Element) -> str:
        text_parts = [html.unescape(element.text or "")]
        for sub_element in element:
            # recursively parse the sub-element
            text_parts.append(self._element_to_str(sub_element))
            # don't forget the text after the sub-element
            text_parts.append(html.unescape(sub_element.tail or ""))
        return unicodedata.normalize("NFKC", "".join(text_parts)).strip()

    def _element_to_str(self, element: Element | None) -> str:
        if element is None:
            return ""

        if element.tag in {
            "bold",
            "italic",
            "monospace",
            "p",
            "sc",
            "styled-content",
            "underline",
            "xref",
        }:
            # Mostly styling tags for which getting the inner text is enough.
            # Currently this is the same as the default handling. Writing it out
            # explicitly here to decouple from the default handling, which may
            # change in the future.
            return self._inner_text(element)
        elif element.tag == "sub":
            return f"_{self._inner_text(element)}"
        elif element.tag == "sup":
            return f"^{self._inner_text(element)}"
        elif element.tag in {
            "disp-formula",
            "email",
            "ext-link",
            "inline-formula",
            "uri",
        }:
            return ""
        else:
            # Default handling for all other element tags
            return self._inner_text(element)


class PubMedXMLParser(ArticleParser):
    """Parser for PubMed abstract."""

    def __init__(self, data: str | bytes) -> None:
        super().__init__()
        self.content = ElementTree.fromstring(data)

    @property
    def title(self) -> str:
        title = self.content.find("./PubmedArticle/MedlineCitation/Article/ArticleTitle")
        if title is None:
            return ""
        return "".join(title.itertext())

    @property
    def abstract(self) -> list[tuple[str, str]]:
        abstract = self.content.find("./PubmedArticle/MedlineCitation/Article/Abstract")

        if abstract is None:
            # No abstract to parse: return an empty list.
            return []

        # iter() always returns an iterator, so no None check is needed.
        abstract_list: list[tuple[str, str]] = []
        for paragraph in abstract.iter("AbstractText"):
            sec_title = paragraph.get("Label")
            abstract_list.append((sec_title, "".join(paragraph.itertext())))
        return abstract_list

    @property
    def paragraphs(self) -> list[tuple[str, str]]:
        # No paragraph to parse in PubMed article sets: return an empty iterable.
        return []
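

# A minimal usage sketch (assumes network access to the NCBI efetch endpoint;
# the article ID below is a hypothetical placeholder, not a real reference).
if __name__ == "__main__":
    result = dl_and_parse("PMC8675309")
    print("Resolved database:", result["db"])
    if result["article_sections"] is not None:
        for section_title, sentences in result["article_sections"].items():
            print(f"{section_title}: {len(sentences)} paragraph(s)")
    else:
        print("Article could not be downloaded or parsed.")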