Spaces:
Runtime error
Runtime error
| # Copyright (C) 2002-2007 Python Software Foundation | |
| # Contact: [email protected] | |
| """Email address parsing code. | |
| Lifted directly from rfc822.py. This should eventually be rewritten. | |
| """ | |
| __all__ = [ | |
| 'mktime_tz', | |
| 'parsedate', | |
| 'parsedate_tz', | |
| 'quote', | |
| ] | |
| import time, calendar | |
| SPACE = ' ' | |
| EMPTYSTRING = '' | |
| COMMASPACE = ', ' | |
| # Parse a date field | |
| _monthnames = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', | |
| 'aug', 'sep', 'oct', 'nov', 'dec', | |
| 'january', 'february', 'march', 'april', 'may', 'june', 'july', | |
| 'august', 'september', 'october', 'november', 'december'] | |
| _daynames = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun'] | |
| # The timezone table does not include the military time zones defined | |
| # in RFC822, other than Z. According to RFC1123, the description in | |
| # RFC822 gets the signs wrong, so we can't rely on any such time | |
| # zones. RFC1123 recommends that numeric timezone indicators be used | |
| # instead of timezone names. | |
| _timezones = {'UT':0, 'UTC':0, 'GMT':0, 'Z':0, | |
| 'AST': -400, 'ADT': -300, # Atlantic (used in Canada) | |
| 'EST': -500, 'EDT': -400, # Eastern | |
| 'CST': -600, 'CDT': -500, # Central | |
| 'MST': -700, 'MDT': -600, # Mountain | |
| 'PST': -800, 'PDT': -700 # Pacific | |
| } | |
| def parsedate_tz(data): | |
| """Convert a date string to a time tuple. | |
| Accounts for military timezones. | |
| """ | |
| res = _parsedate_tz(data) | |
| if not res: | |
| return | |
| if res[9] is None: | |
| res[9] = 0 | |
| return tuple(res) | |
| def _parsedate_tz(data): | |
| """Convert date to extended time tuple. | |
| The last (additional) element is the time zone offset in seconds, except if | |
| the timezone was specified as -0000. In that case the last element is | |
| None. This indicates a UTC timestamp that explicitly declaims knowledge of | |
| the source timezone, as opposed to a +0000 timestamp that indicates the | |
| source timezone really was UTC. | |
| """ | |
| if not data: | |
| return None | |
| data = data.split() | |
| if not data: # This happens for whitespace-only input. | |
| return None | |
| # The FWS after the comma after the day-of-week is optional, so search and | |
| # adjust for this. | |
| if data[0].endswith(',') or data[0].lower() in _daynames: | |
| # There's a dayname here. Skip it | |
| del data[0] | |
| else: | |
| i = data[0].rfind(',') | |
| if i >= 0: | |
| data[0] = data[0][i+1:] | |
| if len(data) == 3: # RFC 850 date, deprecated | |
| stuff = data[0].split('-') | |
| if len(stuff) == 3: | |
| data = stuff + data[1:] | |
| if len(data) == 4: | |
| s = data[3] | |
| i = s.find('+') | |
| if i == -1: | |
| i = s.find('-') | |
| if i > 0: | |
| data[3:] = [s[:i], s[i:]] | |
| else: | |
| data.append('') # Dummy tz | |
| if len(data) < 5: | |
| return None | |
| data = data[:5] | |
| [dd, mm, yy, tm, tz] = data | |
| if not (dd and mm and yy): | |
| return None | |
| mm = mm.lower() | |
| if mm not in _monthnames: | |
| dd, mm = mm, dd.lower() | |
| if mm not in _monthnames: | |
| return None | |
| mm = _monthnames.index(mm) + 1 | |
| if mm > 12: | |
| mm -= 12 | |
| if dd[-1] == ',': | |
| dd = dd[:-1] | |
| i = yy.find(':') | |
| if i > 0: | |
| yy, tm = tm, yy | |
| if yy[-1] == ',': | |
| yy = yy[:-1] | |
| if not yy: | |
| return None | |
| if not yy[0].isdigit(): | |
| yy, tz = tz, yy | |
| if tm[-1] == ',': | |
| tm = tm[:-1] | |
| tm = tm.split(':') | |
| if len(tm) == 2: | |
| [thh, tmm] = tm | |
| tss = '0' | |
| elif len(tm) == 3: | |
| [thh, tmm, tss] = tm | |
| elif len(tm) == 1 and '.' in tm[0]: | |
| # Some non-compliant MUAs use '.' to separate time elements. | |
| tm = tm[0].split('.') | |
| if len(tm) == 2: | |
| [thh, tmm] = tm | |
| tss = 0 | |
| elif len(tm) == 3: | |
| [thh, tmm, tss] = tm | |
| else: | |
| return None | |
| else: | |
| return None | |
| try: | |
| yy = int(yy) | |
| dd = int(dd) | |
| thh = int(thh) | |
| tmm = int(tmm) | |
| tss = int(tss) | |
| except ValueError: | |
| return None | |
| # Check for a yy specified in two-digit format, then convert it to the | |
| # appropriate four-digit format, according to the POSIX standard. RFC 822 | |
| # calls for a two-digit yy, but RFC 2822 (which obsoletes RFC 822) | |
| # mandates a 4-digit yy. For more information, see the documentation for | |
| # the time module. | |
| if yy < 100: | |
| # The year is between 1969 and 1999 (inclusive). | |
| if yy > 68: | |
| yy += 1900 | |
| # The year is between 2000 and 2068 (inclusive). | |
| else: | |
| yy += 2000 | |
| tzoffset = None | |
| tz = tz.upper() | |
| if tz in _timezones: | |
| tzoffset = _timezones[tz] | |
| else: | |
| try: | |
| tzoffset = int(tz) | |
| except ValueError: | |
| pass | |
| if tzoffset==0 and tz.startswith('-'): | |
| tzoffset = None | |
| # Convert a timezone offset into seconds ; -0500 -> -18000 | |
| if tzoffset: | |
| if tzoffset < 0: | |
| tzsign = -1 | |
| tzoffset = -tzoffset | |
| else: | |
| tzsign = 1 | |
| tzoffset = tzsign * ( (tzoffset//100)*3600 + (tzoffset % 100)*60) | |
| # Daylight Saving Time flag is set to -1, since DST is unknown. | |
| return [yy, mm, dd, thh, tmm, tss, 0, 1, -1, tzoffset] | |
| def parsedate(data): | |
| """Convert a time string to a time tuple.""" | |
| t = parsedate_tz(data) | |
| if isinstance(t, tuple): | |
| return t[:9] | |
| else: | |
| return t | |
| def mktime_tz(data): | |
| """Turn a 10-tuple as returned by parsedate_tz() into a POSIX timestamp.""" | |
| if data[9] is None: | |
| # No zone info, so localtime is better assumption than GMT | |
| return time.mktime(data[:8] + (-1,)) | |
| else: | |
| t = calendar.timegm(data) | |
| return t - data[9] | |
| def quote(str): | |
| """Prepare string to be used in a quoted string. | |
| Turns backslash and double quote characters into quoted pairs. These | |
| are the only characters that need to be quoted inside a quoted string. | |
| Does not add the surrounding double quotes. | |
| """ | |
| return str.replace('\\', '\\\\').replace('"', '\\"') | |
| class AddrlistClass: | |
| """Address parser class by Ben Escoto. | |
| To understand what this class does, it helps to have a copy of RFC 2822 in | |
| front of you. | |
| Note: this class interface is deprecated and may be removed in the future. | |
| Use email.utils.AddressList instead. | |
| """ | |
| def __init__(self, field): | |
| """Initialize a new instance. | |
| `field' is an unparsed address header field, containing | |
| one or more addresses. | |
| """ | |
| self.specials = '()<>@,:;.\"[]' | |
| self.pos = 0 | |
| self.LWS = ' \t' | |
| self.CR = '\r\n' | |
| self.FWS = self.LWS + self.CR | |
| self.atomends = self.specials + self.LWS + self.CR | |
| # Note that RFC 2822 now specifies `.' as obs-phrase, meaning that it | |
| # is obsolete syntax. RFC 2822 requires that we recognize obsolete | |
| # syntax, so allow dots in phrases. | |
| self.phraseends = self.atomends.replace('.', '') | |
| self.field = field | |
| self.commentlist = [] | |
| def gotonext(self): | |
| """Skip white space and extract comments.""" | |
| wslist = [] | |
| while self.pos < len(self.field): | |
| if self.field[self.pos] in self.LWS + '\n\r': | |
| if self.field[self.pos] not in '\n\r': | |
| wslist.append(self.field[self.pos]) | |
| self.pos += 1 | |
| elif self.field[self.pos] == '(': | |
| self.commentlist.append(self.getcomment()) | |
| else: | |
| break | |
| return EMPTYSTRING.join(wslist) | |
| def getaddrlist(self): | |
| """Parse all addresses. | |
| Returns a list containing all of the addresses. | |
| """ | |
| result = [] | |
| while self.pos < len(self.field): | |
| ad = self.getaddress() | |
| if ad: | |
| result += ad | |
| else: | |
| result.append(('', '')) | |
| return result | |
| def getaddress(self): | |
| """Parse the next address.""" | |
| self.commentlist = [] | |
| self.gotonext() | |
| oldpos = self.pos | |
| oldcl = self.commentlist | |
| plist = self.getphraselist() | |
| self.gotonext() | |
| returnlist = [] | |
| if self.pos >= len(self.field): | |
| # Bad email address technically, no domain. | |
| if plist: | |
| returnlist = [(SPACE.join(self.commentlist), plist[0])] | |
| elif self.field[self.pos] in '.@': | |
| # email address is just an addrspec | |
| # this isn't very efficient since we start over | |
| self.pos = oldpos | |
| self.commentlist = oldcl | |
| addrspec = self.getaddrspec() | |
| returnlist = [(SPACE.join(self.commentlist), addrspec)] | |
| elif self.field[self.pos] == ':': | |
| # address is a group | |
| returnlist = [] | |
| fieldlen = len(self.field) | |
| self.pos += 1 | |
| while self.pos < len(self.field): | |
| self.gotonext() | |
| if self.pos < fieldlen and self.field[self.pos] == ';': | |
| self.pos += 1 | |
| break | |
| returnlist = returnlist + self.getaddress() | |
| elif self.field[self.pos] == '<': | |
| # Address is a phrase then a route addr | |
| routeaddr = self.getrouteaddr() | |
| if self.commentlist: | |
| returnlist = [(SPACE.join(plist) + ' (' + | |
| ' '.join(self.commentlist) + ')', routeaddr)] | |
| else: | |
| returnlist = [(SPACE.join(plist), routeaddr)] | |
| else: | |
| if plist: | |
| returnlist = [(SPACE.join(self.commentlist), plist[0])] | |
| elif self.field[self.pos] in self.specials: | |
| self.pos += 1 | |
| self.gotonext() | |
| if self.pos < len(self.field) and self.field[self.pos] == ',': | |
| self.pos += 1 | |
| return returnlist | |
| def getrouteaddr(self): | |
| """Parse a route address (Return-path value). | |
| This method just skips all the route stuff and returns the addrspec. | |
| """ | |
| if self.field[self.pos] != '<': | |
| return | |
| expectroute = False | |
| self.pos += 1 | |
| self.gotonext() | |
| adlist = '' | |
| while self.pos < len(self.field): | |
| if expectroute: | |
| self.getdomain() | |
| expectroute = False | |
| elif self.field[self.pos] == '>': | |
| self.pos += 1 | |
| break | |
| elif self.field[self.pos] == '@': | |
| self.pos += 1 | |
| expectroute = True | |
| elif self.field[self.pos] == ':': | |
| self.pos += 1 | |
| else: | |
| adlist = self.getaddrspec() | |
| self.pos += 1 | |
| break | |
| self.gotonext() | |
| return adlist | |
| def getaddrspec(self): | |
| """Parse an RFC 2822 addr-spec.""" | |
| aslist = [] | |
| self.gotonext() | |
| while self.pos < len(self.field): | |
| preserve_ws = True | |
| if self.field[self.pos] == '.': | |
| if aslist and not aslist[-1].strip(): | |
| aslist.pop() | |
| aslist.append('.') | |
| self.pos += 1 | |
| preserve_ws = False | |
| elif self.field[self.pos] == '"': | |
| aslist.append('"%s"' % quote(self.getquote())) | |
| elif self.field[self.pos] in self.atomends: | |
| if aslist and not aslist[-1].strip(): | |
| aslist.pop() | |
| break | |
| else: | |
| aslist.append(self.getatom()) | |
| ws = self.gotonext() | |
| if preserve_ws and ws: | |
| aslist.append(ws) | |
| if self.pos >= len(self.field) or self.field[self.pos] != '@': | |
| return EMPTYSTRING.join(aslist) | |
| aslist.append('@') | |
| self.pos += 1 | |
| self.gotonext() | |
| domain = self.getdomain() | |
| if not domain: | |
| # Invalid domain, return an empty address instead of returning a | |
| # local part to denote failed parsing. | |
| return EMPTYSTRING | |
| return EMPTYSTRING.join(aslist) + domain | |
| def getdomain(self): | |
| """Get the complete domain name from an address.""" | |
| sdlist = [] | |
| while self.pos < len(self.field): | |
| if self.field[self.pos] in self.LWS: | |
| self.pos += 1 | |
| elif self.field[self.pos] == '(': | |
| self.commentlist.append(self.getcomment()) | |
| elif self.field[self.pos] == '[': | |
| sdlist.append(self.getdomainliteral()) | |
| elif self.field[self.pos] == '.': | |
| self.pos += 1 | |
| sdlist.append('.') | |
| elif self.field[self.pos] == '@': | |
| # bpo-34155: Don't parse domains with two `@` like | |
| # `[email protected]@important.com`. | |
| return EMPTYSTRING | |
| elif self.field[self.pos] in self.atomends: | |
| break | |
| else: | |
| sdlist.append(self.getatom()) | |
| return EMPTYSTRING.join(sdlist) | |
| def getdelimited(self, beginchar, endchars, allowcomments=True): | |
| """Parse a header fragment delimited by special characters. | |
| `beginchar' is the start character for the fragment. | |
| If self is not looking at an instance of `beginchar' then | |
| getdelimited returns the empty string. | |
| `endchars' is a sequence of allowable end-delimiting characters. | |
| Parsing stops when one of these is encountered. | |
| If `allowcomments' is non-zero, embedded RFC 2822 comments are allowed | |
| within the parsed fragment. | |
| """ | |
| if self.field[self.pos] != beginchar: | |
| return '' | |
| slist = [''] | |
| quote = False | |
| self.pos += 1 | |
| while self.pos < len(self.field): | |
| if quote: | |
| slist.append(self.field[self.pos]) | |
| quote = False | |
| elif self.field[self.pos] in endchars: | |
| self.pos += 1 | |
| break | |
| elif allowcomments and self.field[self.pos] == '(': | |
| slist.append(self.getcomment()) | |
| continue # have already advanced pos from getcomment | |
| elif self.field[self.pos] == '\\': | |
| quote = True | |
| else: | |
| slist.append(self.field[self.pos]) | |
| self.pos += 1 | |
| return EMPTYSTRING.join(slist) | |
| def getquote(self): | |
| """Get a quote-delimited fragment from self's field.""" | |
| return self.getdelimited('"', '"\r', False) | |
| def getcomment(self): | |
| """Get a parenthesis-delimited fragment from self's field.""" | |
| return self.getdelimited('(', ')\r', True) | |
| def getdomainliteral(self): | |
| """Parse an RFC 2822 domain-literal.""" | |
| return '[%s]' % self.getdelimited('[', ']\r', False) | |
| def getatom(self, atomends=None): | |
| """Parse an RFC 2822 atom. | |
| Optional atomends specifies a different set of end token delimiters | |
| (the default is to use self.atomends). This is used e.g. in | |
| getphraselist() since phrase endings must not include the `.' (which | |
| is legal in phrases).""" | |
| atomlist = [''] | |
| if atomends is None: | |
| atomends = self.atomends | |
| while self.pos < len(self.field): | |
| if self.field[self.pos] in atomends: | |
| break | |
| else: | |
| atomlist.append(self.field[self.pos]) | |
| self.pos += 1 | |
| return EMPTYSTRING.join(atomlist) | |
| def getphraselist(self): | |
| """Parse a sequence of RFC 2822 phrases. | |
| A phrase is a sequence of words, which are in turn either RFC 2822 | |
| atoms or quoted-strings. Phrases are canonicalized by squeezing all | |
| runs of continuous whitespace into one space. | |
| """ | |
| plist = [] | |
| while self.pos < len(self.field): | |
| if self.field[self.pos] in self.FWS: | |
| self.pos += 1 | |
| elif self.field[self.pos] == '"': | |
| plist.append(self.getquote()) | |
| elif self.field[self.pos] == '(': | |
| self.commentlist.append(self.getcomment()) | |
| elif self.field[self.pos] in self.phraseends: | |
| break | |
| else: | |
| plist.append(self.getatom(self.phraseends)) | |
| return plist | |
| class AddressList(AddrlistClass): | |
| """An AddressList encapsulates a list of parsed RFC 2822 addresses.""" | |
| def __init__(self, field): | |
| AddrlistClass.__init__(self, field) | |
| if field: | |
| self.addresslist = self.getaddrlist() | |
| else: | |
| self.addresslist = [] | |
| def __len__(self): | |
| return len(self.addresslist) | |
| def __add__(self, other): | |
| # Set union | |
| newaddr = AddressList(None) | |
| newaddr.addresslist = self.addresslist[:] | |
| for x in other.addresslist: | |
| if not x in self.addresslist: | |
| newaddr.addresslist.append(x) | |
| return newaddr | |
| def __iadd__(self, other): | |
| # Set union, in-place | |
| for x in other.addresslist: | |
| if not x in self.addresslist: | |
| self.addresslist.append(x) | |
| return self | |
| def __sub__(self, other): | |
| # Set difference | |
| newaddr = AddressList(None) | |
| for x in self.addresslist: | |
| if not x in other.addresslist: | |
| newaddr.addresslist.append(x) | |
| return newaddr | |
| def __isub__(self, other): | |
| # Set difference, in-place | |
| for x in other.addresslist: | |
| if x in self.addresslist: | |
| self.addresslist.remove(x) | |
| return self | |
| def __getitem__(self, index): | |
| # Make indexing, slices, and 'in' work | |
| return self.addresslist[index] | |