File size: 10,605 Bytes
6eefbd7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# Copyright 2004-2005 Elemental Security, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

# Modifications:
# Copyright 2006 Google, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Parser driver.

This provides a high-level interface to parse a file into a syntax tree.

"""

# Author credit. The address had been replaced by an e-mail-obfuscation
# placeholder ("[email protected]") when the file was extracted.
__author__ = "Guido van Rossum <guido@python.org>"

# Names exported by a star-import of this module.
__all__ = ["Driver", "load_grammar"]

# Python imports
import io
import os
import logging
import pkgutil
import sys
from typing import (
    Any,
    cast,
    IO,
    Iterable,
    List,
    Optional,
    Iterator,
    Tuple,
    Union,
)
from contextlib import contextmanager
from dataclasses import dataclass, field

# Pgen imports
from . import grammar, parse, token, tokenize, pgen
from logging import Logger
from blib2to3.pytree import NL
from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.tokenize import GoodTokenInfo

# Type alias for path-like arguments: a plain string or an os.PathLike of str.
Path = Union[str, "os.PathLike[str]"]


@dataclass
class ReleaseRange:
    """A span of token positions buffered during one look-ahead session.

    The range is open-ended (``end is None``) while tokens are still being
    collected; calling :meth:`lock` fixes its end position.
    """

    # Position of the first token covered by this range.
    start: int
    # Exclusive end position; remains None until lock() has been called.
    end: Optional[int] = None
    # Tokens collected for this range, in order, beginning at ``start``.
    tokens: List[Any] = field(default_factory=list)

    def lock(self) -> None:
        """Fix ``end`` to ``start`` plus the number of collected tokens."""
        self.end = self.start + len(self.tokens)


class TokenProxy:
    """Iterator over a token generator that also supports look-ahead.

    Inside a ``with proxy.release():`` block, tokens can be read ahead with
    :meth:`eat` without advancing the proxy; they are stored in a
    ``ReleaseRange``. Ordinary iteration later serves those stored tokens
    again before pulling fresh ones from the generator.
    """

    def __init__(self, generator: Any) -> None:
        """Wrap *generator*, the underlying producer of tokens."""
        self._tokens = generator
        # Number of tokens handed out so far through __next__.
        self._counter = 0
        self._release_ranges: List[ReleaseRange] = []

    @contextmanager
    def release(self) -> Iterator["TokenProxy"]:
        """Open a look-ahead session starting at the current position.

        Yields the proxy itself. On exit (normal or not) the session's
        range is locked to however many tokens were eaten.
        """
        window = ReleaseRange(self._counter)
        self._release_ranges.append(window)
        try:
            yield self
        finally:
            # Freeze the range at the last position that has been eaten.
            window.lock()

    def eat(self, point: int) -> Any:
        """Return the token *point* positions into the newest release range.

        Tokens are pulled from the generator and stored until the requested
        offset is available; offsets already stored are served from the
        buffer. ``StopIteration`` from the generator propagates.
        """
        buffered = self._release_ranges[-1].tokens
        while len(buffered) <= point:
            buffered.append(next(self._tokens))
        return buffered[point]

    def __iter__(self) -> "TokenProxy":
        """Return the proxy itself; it is its own iterator."""
        return self

    def __next__(self) -> Any:
        """Return the next token and advance the position by one.

        If the current position was already read during a look-ahead
        session, the stored token is replayed; otherwise the generator is
        asked for a new one.
        """
        position = self._counter
        for window in self._release_ranges:
            assert window.end is not None

            if window.start <= position < window.end:
                current = window.tokens[position - window.start]
                break
        else:
            current = next(self._tokens)
        self._counter = position + 1
        return current

    def can_advance(self, to: int) -> bool:
        """Tell whether offset *to* can be reached in the current session.

        This simply attempts :meth:`eat`; because eaten tokens are stored,
        a later ``eat`` of the same offset does not consume again.
        """
        try:
            self.eat(to)
        except StopIteration:
            return False
        return True


class Driver:
    """High-level front end that turns source text into a syntax tree.

    A Driver holds a grammar and a logger. Its ``parse_*`` methods tokenize
    the input (where needed) and push the tokens into ``parse.Parser``,
    attaching the whitespace and comments found between tokens as the
    "prefix" of the following token.
    """

    def __init__(self, grammar: Grammar, logger: Optional[Logger] = None) -> None:
        """Remember *grammar* and *logger* for later parsing.

        When *logger* is None, ``logging.getLogger(__name__)`` is used.
        """
        self.grammar = grammar
        if logger is None:
            logger = logging.getLogger(__name__)
        self.logger = logger

    def parse_tokens(self, tokens: Iterable[GoodTokenInfo], debug: bool = False) -> NL:
        """Parse a series of tokens and return the syntax tree.

        Each token is unpacked as ``(type, value, start, end, line_text)``
        where *start* and *end* are ``(lineno, column)`` pairs. Text lying
        between consecutive tokens, as well as COMMENT and NL tokens, is
        collected into a prefix string that is passed to the parser together
        with the next token.

        With *debug* true, every token given to the parser is logged at
        DEBUG level.

        Raises ``parse.ParseError`` ("incomplete input") if the tokens run
        out before ``Parser.addtoken`` reports that parsing is complete.
        """
        # XXX Move the prefix computation into a wrapper around tokenize.
        proxy = TokenProxy(tokens)

        p = parse.Parser(self.grammar)
        p.setup(proxy=proxy)

        # (lineno, column) tracks the position just past the previous token.
        lineno = 1
        column = 0
        # Stack holding len(value) of every INDENT token not yet dedented.
        indent_columns: List[int] = []
        type = value = start = end = line_text = None
        # Pending text to attach to the next token handed to the parser.
        prefix = ""

        for quintuple in proxy:
            type, value, start, end, line_text = quintuple
            if start != (lineno, column):
                # There is a gap between the previous token and this one;
                # rebuild the skipped text and add it to the prefix.
                assert (lineno, column) <= start, ((lineno, column), start)
                s_lineno, s_column = start
                if lineno < s_lineno:
                    # Skipped lines are represented by bare newlines.
                    prefix += "\n" * (s_lineno - lineno)
                    lineno = s_lineno
                    column = 0
                if column < s_column:
                    # Skipped columns are copied from the token's source line.
                    prefix += line_text[column:s_column]
                    column = s_column
            if type in (tokenize.COMMENT, tokenize.NL):
                # Comments and non-logical newlines never reach the parser;
                # they only extend the prefix of the next real token.
                prefix += value
                lineno, column = end
                if value.endswith("\n"):
                    lineno += 1
                    column = 0
                continue
            if type == token.OP:
                # Replace the generic OP type by the operator-specific one.
                type = grammar.opmap[value]
            if debug:
                assert type is not None
                self.logger.debug(
                    "%s %r (prefix=%r)", token.tok_name[type], value, prefix
                )
            if type == token.INDENT:
                # The INDENT token is passed with empty value and prefix;
                # the pending prefix plus the indentation text is held back
                # in _prefix and becomes the prefix of the following token.
                indent_columns.append(len(value))
                _prefix = prefix + value
                prefix = ""
                value = ""
            elif type == token.DEDENT:
                # Split the pending prefix: the first part stays with the
                # DEDENT token, the rest (_prefix) goes to the next token.
                _indent_col = indent_columns.pop()
                prefix, _prefix = self._partially_consume_prefix(prefix, _indent_col)
            if p.addtoken(cast(int, type), value, (prefix, start)):
                # A true result means the parser has finished.
                if debug:
                    self.logger.debug("Stop.")
                break
            prefix = ""
            if type in {token.INDENT, token.DEDENT}:
                # Carry over the text held back for the next token.
                prefix = _prefix
            lineno, column = end
            if value.endswith("\n"):
                lineno += 1
                column = 0
        else:
            # We never broke out -- EOF is too soon (how can this happen???)
            assert start is not None
            raise parse.ParseError("incomplete input", type, value, (prefix, start))
        assert p.rootnode is not None
        return p.rootnode

    def parse_stream_raw(self, stream: IO[str], debug: bool = False) -> NL:
        """Parse a stream and return the syntax tree.

        The stream is tokenized line by line through ``stream.readline``
        using this driver's grammar.
        """
        tokens = tokenize.generate_tokens(stream.readline, grammar=self.grammar)
        return self.parse_tokens(tokens, debug)

    def parse_stream(self, stream: IO[str], debug: bool = False) -> NL:
        """Parse a stream and return the syntax tree.

        Delegates directly to :meth:`parse_stream_raw`.
        """
        return self.parse_stream_raw(stream, debug)

    def parse_file(
        self, filename: Path, encoding: Optional[str] = None, debug: bool = False
    ) -> NL:
        """Parse a file and return the syntax tree.

        The file is opened in text mode with *encoding* passed to ``open``
        and is closed again when parsing finishes or fails.
        """
        with open(filename, encoding=encoding) as stream:
            return self.parse_stream(stream, debug)

    def parse_string(self, text: str, debug: bool = False) -> NL:
        """Parse a string and return the syntax tree.

        *text* is wrapped in an ``io.StringIO`` so it can be tokenized line
        by line with this driver's grammar.
        """
        tokens = tokenize.generate_tokens(
            io.StringIO(text).readline, grammar=self.grammar
        )
        return self.parse_tokens(tokens, debug)

    def _partially_consume_prefix(self, prefix: str, column: int) -> Tuple[str, str]:
        """Split *prefix* into a consumed part and a remainder.

        The prefix is scanned line by line. The scan stops at the first
        line that has visible content and whose leading whitespace is
        narrower than *column*; everything before that line is returned as
        the first element and the rest of *prefix*, starting with that
        line, as the second.

        If no such line is found, the first element is every complete line
        collected and the second is the unterminated text left at the end.
        """
        # Complete lines accepted into the consumed part so far.
        lines: List[str] = []
        current_line = ""
        # Count of leading spaces/tabs on the current line (a tab counts as 1).
        current_column = 0
        # True once the line's indentation has ended; the rest of the line is
        # then skipped over until its terminating newline.
        wait_for_nl = False
        for char in prefix:
            current_line += char
            if wait_for_nl:
                if char == "\n":
                    if current_line.strip() and current_column < column:
                        # This line is indented less than required: stop in
                        # front of it and hand back the rest unchanged.
                        res = "".join(lines)
                        return res, prefix[len(res) :]

                    lines.append(current_line)
                    current_line = ""
                    current_column = 0
                    wait_for_nl = False
            elif char in " \t":
                current_column += 1
            elif char == "\n":
                # unexpected empty line
                # Only the column restarts; current_line keeps the text, so
                # it is merged with the line that follows.
                current_column = 0
            else:
                # indent is finished
                wait_for_nl = True
        return "".join(lines), current_line


def _generate_pickle_name(gt: Path, cache_dir: Optional[Path] = None) -> str:
    """Build the pickle file name that corresponds to grammar file *gt*.

    A ``.txt`` extension is dropped (any other extension is kept), then the
    dot-joined components of ``sys.version_info`` and ``.pickle`` are
    appended directly, with no separator before the version. If *cache_dir*
    is truthy, only the base name is kept and it is placed inside
    *cache_dir*; otherwise the name stays next to *gt*.
    """
    stem, extension = os.path.splitext(gt)
    if extension == ".txt":
        extension = ""
    version_tag = ".".join(str(part) for part in sys.version_info)
    pickle_path = stem + extension + version_tag + ".pickle"
    if not cache_dir:
        return pickle_path
    return os.path.join(cache_dir, os.path.basename(pickle_path))


def load_grammar(
    gt: str = "Grammar.txt",
    gp: Optional[str] = None,
    save: bool = True,
    force: bool = False,
    logger: Optional[Logger] = None,
) -> Grammar:
    """Return the grammar for *gt*, using the pickle *gp* when it is current.

    *gp* defaults to the name derived from *gt* by ``_generate_pickle_name``.
    The pickle is loaded when *force* is false and ``_newer(gp, gt)`` holds.
    Otherwise the grammar is generated from *gt* and, if *save* is true,
    written to *gp*; an ``OSError`` during that write is ignored.
    """
    if logger is None:
        logger = logging.getLogger(__name__)
    if gp is None:
        gp = _generate_pickle_name(gt)

    if not force and _newer(gp, gt):
        # The pickle is up to date: load it instead of regenerating.
        cached = grammar.Grammar()
        cached.load(gp)
        return cached

    generated: grammar.Grammar = pgen.generate_grammar(gt)
    if save:
        try:
            generated.dump(gp)
        except OSError:
            # Ignore error, caching is not vital.
            pass
    return generated


def _newer(a: str, b: str) -> bool:
    """Inquire whether file a was written since file b.

    Returns False when *a* cannot be stat'ed (e.g. it does not exist),
    True when *a* can but *b* cannot, and otherwise whether *a*'s
    modification time is greater than or equal to *b*'s.

    The modification times are read directly rather than checking
    ``os.path.exists`` first, so a file that disappears between the check
    and the read cannot raise out of this function.
    """
    # os.path.exists treats both OSError and ValueError as "does not
    # exist"; the same pair is caught here to keep identical results.
    try:
        a_mtime = os.path.getmtime(a)
    except (OSError, ValueError):
        return False
    try:
        b_mtime = os.path.getmtime(b)
    except (OSError, ValueError):
        return True
    return a_mtime >= b_mtime


def load_packaged_grammar(
    package: str, grammar_source: str, cache_dir: Optional[Path] = None
) -> grammar.Grammar:
    """Load a grammar from package data, or from disk if the source exists.

    When *grammar_source* is an existing file, ``load_grammar`` is called on
    it so the usual regenerate-when-stale behaviour applies; the pickle is
    placed under *cache_dir* if one is given.

    Otherwise the pickle name is computed from the base name of
    *grammar_source* (Python version appended, ``.pickle`` extension) and
    its bytes are fetched with ``pkgutil.get_data(package, pickled_name)``.
    """
    if not os.path.isfile(grammar_source):
        pickled_name = _generate_pickle_name(
            os.path.basename(grammar_source), cache_dir
        )
        payload = pkgutil.get_data(package, pickled_name)
        assert payload is not None
        packaged = grammar.Grammar()
        packaged.loads(payload)
        return packaged

    # The grammar text is present on disk: let load_grammar handle caching.
    gp = None
    if cache_dir:
        gp = _generate_pickle_name(grammar_source, cache_dir)
    return load_grammar(grammar_source, gp=gp)


def main(*args: str) -> bool:
    """Script entry point: regenerate the pickle for each grammar file.

    Every argument is a path to a grammar text file; with no arguments the
    paths are taken from ``sys.argv[1:]``. Each grammar is rebuilt
    unconditionally and saved. Always returns True.
    """
    grammar_paths = args or tuple(sys.argv[1:])
    logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(message)s")
    for grammar_path in grammar_paths:
        load_grammar(grammar_path, save=True, force=True)
    return True


if __name__ == "__main__":
    # Exit status is 0 when main() returns True and 1 otherwise.
    sys.exit(int(not main()))