upgrade wakatime-cli to v6.0.8

This commit is contained in:
Alan Hamlett 2016-09-02 10:50:54 +02:00
parent 21601f9688
commit b4a13a48b9
1305 changed files with 489 additions and 4963 deletions

View file

@ -1,7 +1,7 @@
__title__ = 'wakatime' __title__ = 'wakatime'
__description__ = 'Common interface to the WakaTime api.' __description__ = 'Common interface to the WakaTime api.'
__url__ = 'https://github.com/wakatime/wakatime' __url__ = 'https://github.com/wakatime/wakatime'
__version_info__ = ('6', '0', '7') __version_info__ = ('6', '0', '8')
__version__ = '.'.join(__version_info__) __version__ = '.'.join(__version_info__)
__author__ = 'Alan Hamlett' __author__ = 'Alan Hamlett'
__author_email__ = 'alan@wakatime.com' __author_email__ = 'alan@wakatime.com'

View file

@ -31,7 +31,7 @@ if is_py2: # pragma: nocover
try: try:
return unicode(text) return unicode(text)
except: except:
return text return text.decode('utf-8', 'replace')
open = codecs.open open = codecs.open
basestring = basestring basestring = basestring
@ -52,7 +52,7 @@ elif is_py3: # pragma: nocover
try: try:
return str(text) return str(text)
except: except:
return text return text.decode('utf-8', 'replace')
open = open open = open
basestring = (str, bytes) basestring = (str, bytes)

View file

@ -9,10 +9,32 @@
:license: BSD, see LICENSE for more details. :license: BSD, see LICENSE for more details.
""" """
""" Success
Exit code used when a heartbeat was sent successfully.
"""
SUCCESS = 0 SUCCESS = 0
""" Api Error
Exit code used when the WakaTime API returned an error.
"""
API_ERROR = 102 API_ERROR = 102
""" Config File Parse Error
Exit code used when the ~/.wakatime.cfg config file could not be parsed.
"""
CONFIG_FILE_PARSE_ERROR = 103 CONFIG_FILE_PARSE_ERROR = 103
""" Auth Error
Exit code used when our api key is invalid.
"""
AUTH_ERROR = 104 AUTH_ERROR = 104
""" Unknown Error
Exit code used when there was an unhandled exception.
"""
UNKNOWN_ERROR = 105 UNKNOWN_ERROR = 105
""" Malformed Heartbeat Error
Exit code used when the JSON input from `--extra-heartbeats` is malformed.
"""
MALFORMED_HEARTBEAT_ERROR = 106 MALFORMED_HEARTBEAT_ERROR = 106

View file

@ -12,7 +12,6 @@
import logging import logging
import re import re
import sys import sys
import traceback
from ..compat import u, open, import_module from ..compat import u, open, import_module
from ..exceptions import NotYetImplemented from ..exceptions import NotYetImplemented
@ -68,7 +67,7 @@ class TokenParser(object):
pass pass
try: try:
with open(self.source_file, 'r', encoding=sys.getfilesystemencoding()) as fh: with open(self.source_file, 'r', encoding=sys.getfilesystemencoding()) as fh:
return self.lexer.get_tokens_unprocessed(fh.read(512000)) return self.lexer.get_tokens_unprocessed(fh.read(512000)) # pragma: nocover
except: except:
pass pass
return [] return []
@ -120,7 +119,7 @@ class DependencyParser(object):
except AttributeError: except AttributeError:
log.debug('Module {0} is missing class {1}'.format(module.__name__, class_name)) log.debug('Module {0} is missing class {1}'.format(module.__name__, class_name))
except ImportError: except ImportError:
log.debug(traceback.format_exc()) log.traceback(logging.DEBUG)
def parse(self): def parse(self):
if self.parser: if self.parser:

View file

@ -20,6 +20,7 @@ class PythonParser(TokenParser):
r'^os$', r'^os$',
r'^sys$', r'^sys$',
r'^sys\.', r'^sys\.',
r'^__future__$',
] ]
def parse(self): def parse(self):
@ -49,9 +50,7 @@ class PythonParser(TokenParser):
self._process_import(token, content) self._process_import(token, content)
def _process_operator(self, token, content): def _process_operator(self, token, content):
if self.state is not None: pass
if content == '.':
self.nonpackage = True
def _process_punctuation(self, token, content): def _process_punctuation(self, token, content):
if content == '(': if content == '(':
@ -74,8 +73,6 @@ class PythonParser(TokenParser):
if self.state == 'from': if self.state == 'from':
self.append(content, truncate=True, truncate_to=1) self.append(content, truncate=True, truncate_to=1)
self.state = 'from-2' self.state = 'from-2'
elif self.state == 'from-2' and content != 'import':
self.append(content, truncate=True, truncate_to=1)
elif self.state == 'import': elif self.state == 'import':
self.append(content, truncate=True, truncate_to=1) self.append(content, truncate=True, truncate_to=1)
self.state = 'import-2' self.state = 'import-2'

View file

@ -114,16 +114,11 @@ class HtmlDjangoParser(TokenParser):
if self.opening_tag: if self.opening_tag:
self.tags.insert(0, content.replace('<', '', 1).strip().lower()) self.tags.insert(0, content.replace('<', '', 1).strip().lower())
self.getting_attrs = True self.getting_attrs = True
elif content.startswith('>'):
self.opening_tag = False
self.getting_attrs = False
self.current_attr = None self.current_attr = None
def _process_attribute(self, token, content): def _process_attribute(self, token, content):
if self.getting_attrs: if self.getting_attrs:
self.current_attr = content.lower().strip('=') self.current_attr = content.lower().strip('=')
else:
self.current_attr = None
self.current_attr_value = None self.current_attr_value = None
def _process_string(self, token, content): def _process_string(self, token, content):
@ -146,8 +141,6 @@ class HtmlDjangoParser(TokenParser):
elif content.startswith('"') or content.startswith("'"): elif content.startswith('"') or content.startswith("'"):
if self.current_attr_value is None: if self.current_attr_value is None:
self.current_attr_value = content self.current_attr_value = content
else:
self.current_attr_value += content
class VelocityHtmlParser(HtmlDjangoParser): class VelocityHtmlParser(HtmlDjangoParser):

View file

@ -13,7 +13,7 @@ import logging
import os import os
import traceback import traceback
from .compat import basestring, u from .compat import u
from .packages.requests.packages import urllib3 from .packages.requests.packages import urllib3
try: try:
from collections import OrderedDict # pragma: nocover from collections import OrderedDict # pragma: nocover
@ -25,20 +25,6 @@ except (ImportError, SyntaxError): # pragma: nocover
import json import json
class CustomEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, basestring):
obj = u(obj)
return json.dumps(obj)
try: # pragma: nocover
encoded = super(CustomEncoder, self).default(obj)
except UnicodeDecodeError: # pragma: nocover
obj = u(obj)
encoded = super(CustomEncoder, self).default(obj)
return encoded
class JsonFormatter(logging.Formatter): class JsonFormatter(logging.Formatter):
def setup(self, timestamp, is_write, entity, version, plugin, verbose, def setup(self, timestamp, is_write, entity, version, plugin, verbose,
@ -55,35 +41,25 @@ class JsonFormatter(logging.Formatter):
data = OrderedDict([ data = OrderedDict([
('now', self.formatTime(record, self.datefmt)), ('now', self.formatTime(record, self.datefmt)),
]) ])
data['version'] = self.version data['version'] = u(self.version)
data['plugin'] = self.plugin if self.plugin:
data['plugin'] = u(self.plugin)
data['time'] = self.timestamp data['time'] = self.timestamp
if self.verbose: if self.verbose:
data['caller'] = record.pathname data['caller'] = u(record.pathname)
data['lineno'] = record.lineno data['lineno'] = record.lineno
if self.is_write:
data['is_write'] = self.is_write data['is_write'] = self.is_write
data['file'] = self.entity data['file'] = u(self.entity)
if not self.is_write:
del data['is_write']
data['level'] = record.levelname data['level'] = record.levelname
data['message'] = record.getMessage() if self.warnings else record.msg data['message'] = u(record.getMessage() if self.warnings else record.msg)
if not self.plugin: return json.dumps(data)
del data['plugin']
return CustomEncoder().encode(data)
def traceback(self, lvl=None):
def traceback_formatter(*args, **kwargs): logger = logging.getLogger('WakaTime')
level = kwargs.get('level', args[0] if len(args) else None) if not lvl:
if level: lvl = logger.getEffectiveLevel()
level = level.lower() logger.log(lvl, traceback.format_exc())
if level == 'warn' or level == 'warning':
logging.getLogger('WakaTime').warning(traceback.format_exc())
elif level == 'info':
logging.getLogger('WakaTime').info(traceback.format_exc())
elif level == 'debug':
logging.getLogger('WakaTime').debug(traceback.format_exc())
else:
logging.getLogger('WakaTime').error(traceback.format_exc())
def set_log_level(logger, args): def set_log_level(logger, args):
@ -116,7 +92,7 @@ def setup_logging(args, version):
logger.addHandler(handler) logger.addHandler(handler)
# add custom traceback logging method # add custom traceback logging method
logger.traceback = traceback_formatter logger.traceback = formatter.traceback
warnings_formatter = JsonFormatter(datefmt='%Y/%m/%d %H:%M:%S %z') warnings_formatter = JsonFormatter(datefmt='%Y/%m/%d %H:%M:%S %z')
warnings_formatter.setup( warnings_formatter.setup(

View file

@ -545,6 +545,6 @@ def execute(argv=None):
return retval return retval
except: except:
log.traceback() log.traceback(logging.ERROR)
print(traceback.format_exc()) print(traceback.format_exc())
return UNKNOWN_ERROR return UNKNOWN_ERROR

View file

@ -80,7 +80,7 @@ class Queue(object):
try: try:
conn, c = self.connect() conn, c = self.connect()
except sqlite3.Error: except sqlite3.Error:
log.traceback('debug') log.traceback(logging.DEBUG)
return None return None
loop = True loop = True
while loop and tries > -1: while loop and tries > -1:
@ -118,11 +118,11 @@ class Queue(object):
} }
loop = False loop = False
except sqlite3.Error: # pragma: nocover except sqlite3.Error: # pragma: nocover
log.traceback('debug') log.traceback(logging.DEBUG)
sleep(wait) sleep(wait)
tries -= 1 tries -= 1
try: try:
conn.close() conn.close()
except sqlite3.Error: # pragma: nocover except sqlite3.Error: # pragma: nocover
log.traceback('debug') log.traceback(logging.DEBUG)
return heartbeat return heartbeat

View file

@ -1,13 +1,3 @@
import os
import sys
from ..compat import is_py2
if is_py2:
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), 'py2'))
else:
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), 'py3'))
import tzlocal import tzlocal
from pygments.lexers import get_lexer_by_name, guess_lexer_for_filename from pygments.lexers import get_lexer_by_name, guess_lexer_for_filename
from pygments.modeline import get_filetype_from_buffer from pygments.modeline import get_filetype_from_buffer

File diff suppressed because it is too large Load diff

View file

@ -1,48 +0,0 @@
'''
Custom exceptions raised by pytz.
'''
__all__ = [
'UnknownTimeZoneError', 'InvalidTimeError', 'AmbiguousTimeError',
'NonExistentTimeError',
]
class UnknownTimeZoneError(KeyError):
'''Exception raised when pytz is passed an unknown timezone.
>>> isinstance(UnknownTimeZoneError(), LookupError)
True
This class is actually a subclass of KeyError to provide backwards
compatibility with code relying on the undocumented behavior of earlier
pytz releases.
>>> isinstance(UnknownTimeZoneError(), KeyError)
True
'''
pass
class InvalidTimeError(Exception):
'''Base class for invalid time exceptions.'''
class AmbiguousTimeError(InvalidTimeError):
'''Exception raised when attempting to create an ambiguous wallclock time.
At the end of a DST transition period, a particular wallclock time will
occur twice (once before the clocks are set back, once after). Both
possibilities may be correct, unless further information is supplied.
See DstTzInfo.normalize() for more info
'''
class NonExistentTimeError(InvalidTimeError):
'''Exception raised when attempting to create a wallclock time that
cannot exist.
At the start of a DST transition period, the wallclock time jumps forward.
The instants jumped over never occur.
'''

View file

@ -1,168 +0,0 @@
from threading import RLock
try:
from UserDict import DictMixin
except ImportError:
from collections import Mapping as DictMixin
# With lazy loading, we might end up with multiple threads triggering
# it at the same time. We need a lock.
_fill_lock = RLock()
class LazyDict(DictMixin):
"""Dictionary populated on first use."""
data = None
def __getitem__(self, key):
if self.data is None:
_fill_lock.acquire()
try:
if self.data is None:
self._fill()
finally:
_fill_lock.release()
return self.data[key.upper()]
def __contains__(self, key):
if self.data is None:
_fill_lock.acquire()
try:
if self.data is None:
self._fill()
finally:
_fill_lock.release()
return key in self.data
def __iter__(self):
if self.data is None:
_fill_lock.acquire()
try:
if self.data is None:
self._fill()
finally:
_fill_lock.release()
return iter(self.data)
def __len__(self):
if self.data is None:
_fill_lock.acquire()
try:
if self.data is None:
self._fill()
finally:
_fill_lock.release()
return len(self.data)
def keys(self):
if self.data is None:
_fill_lock.acquire()
try:
if self.data is None:
self._fill()
finally:
_fill_lock.release()
return self.data.keys()
class LazyList(list):
"""List populated on first use."""
_props = [
'__str__', '__repr__', '__unicode__',
'__hash__', '__sizeof__', '__cmp__',
'__lt__', '__le__', '__eq__', '__ne__', '__gt__', '__ge__',
'append', 'count', 'index', 'extend', 'insert', 'pop', 'remove',
'reverse', 'sort', '__add__', '__radd__', '__iadd__', '__mul__',
'__rmul__', '__imul__', '__contains__', '__len__', '__nonzero__',
'__getitem__', '__setitem__', '__delitem__', '__iter__',
'__reversed__', '__getslice__', '__setslice__', '__delslice__']
def __new__(cls, fill_iter=None):
if fill_iter is None:
return list()
# We need a new class as we will be dynamically messing with its
# methods.
class LazyList(list):
pass
fill_iter = [fill_iter]
def lazy(name):
def _lazy(self, *args, **kw):
_fill_lock.acquire()
try:
if len(fill_iter) > 0:
list.extend(self, fill_iter.pop())
for method_name in cls._props:
delattr(LazyList, method_name)
finally:
_fill_lock.release()
return getattr(list, name)(self, *args, **kw)
return _lazy
for name in cls._props:
setattr(LazyList, name, lazy(name))
new_list = LazyList()
return new_list
# Not all versions of Python declare the same magic methods.
# Filter out properties that don't exist in this version of Python
# from the list.
LazyList._props = [prop for prop in LazyList._props if hasattr(list, prop)]
class LazySet(set):
"""Set populated on first use."""
_props = (
'__str__', '__repr__', '__unicode__',
'__hash__', '__sizeof__', '__cmp__',
'__lt__', '__le__', '__eq__', '__ne__', '__gt__', '__ge__',
'__contains__', '__len__', '__nonzero__',
'__getitem__', '__setitem__', '__delitem__', '__iter__',
'__sub__', '__and__', '__xor__', '__or__',
'__rsub__', '__rand__', '__rxor__', '__ror__',
'__isub__', '__iand__', '__ixor__', '__ior__',
'add', 'clear', 'copy', 'difference', 'difference_update',
'discard', 'intersection', 'intersection_update', 'isdisjoint',
'issubset', 'issuperset', 'pop', 'remove',
'symmetric_difference', 'symmetric_difference_update',
'union', 'update')
def __new__(cls, fill_iter=None):
if fill_iter is None:
return set()
class LazySet(set):
pass
fill_iter = [fill_iter]
def lazy(name):
def _lazy(self, *args, **kw):
_fill_lock.acquire()
try:
if len(fill_iter) > 0:
for i in fill_iter.pop():
set.add(self, i)
for method_name in cls._props:
delattr(LazySet, method_name)
finally:
_fill_lock.release()
return getattr(set, name)(self, *args, **kw)
return _lazy
for name in cls._props:
setattr(LazySet, name, lazy(name))
new_set = LazySet()
return new_set
# Not all versions of Python declare the same magic methods.
# Filter out properties that don't exist in this version of Python
# from the list.
LazySet._props = [prop for prop in LazySet._props if hasattr(set, prop)]

View file

@ -1,127 +0,0 @@
'''
Reference tzinfo implementations from the Python docs.
Used for testing against as they are only correct for the years
1987 to 2006. Do not use these for real code.
'''
from datetime import tzinfo, timedelta, datetime
from pytz import utc, UTC, HOUR, ZERO
# A class building tzinfo objects for fixed-offset time zones.
# Note that FixedOffset(0, "UTC") is a different way to build a
# UTC tzinfo object.
class FixedOffset(tzinfo):
"""Fixed offset in minutes east from UTC."""
def __init__(self, offset, name):
self.__offset = timedelta(minutes = offset)
self.__name = name
def utcoffset(self, dt):
return self.__offset
def tzname(self, dt):
return self.__name
def dst(self, dt):
return ZERO
# A class capturing the platform's idea of local time.
import time as _time
STDOFFSET = timedelta(seconds = -_time.timezone)
if _time.daylight:
DSTOFFSET = timedelta(seconds = -_time.altzone)
else:
DSTOFFSET = STDOFFSET
DSTDIFF = DSTOFFSET - STDOFFSET
class LocalTimezone(tzinfo):
def utcoffset(self, dt):
if self._isdst(dt):
return DSTOFFSET
else:
return STDOFFSET
def dst(self, dt):
if self._isdst(dt):
return DSTDIFF
else:
return ZERO
def tzname(self, dt):
return _time.tzname[self._isdst(dt)]
def _isdst(self, dt):
tt = (dt.year, dt.month, dt.day,
dt.hour, dt.minute, dt.second,
dt.weekday(), 0, -1)
stamp = _time.mktime(tt)
tt = _time.localtime(stamp)
return tt.tm_isdst > 0
Local = LocalTimezone()
# A complete implementation of current DST rules for major US time zones.
def first_sunday_on_or_after(dt):
days_to_go = 6 - dt.weekday()
if days_to_go:
dt += timedelta(days_to_go)
return dt
# In the US, DST starts at 2am (standard time) on the first Sunday in April.
DSTSTART = datetime(1, 4, 1, 2)
# and ends at 2am (DST time; 1am standard time) on the last Sunday of Oct.
# which is the first Sunday on or after Oct 25.
DSTEND = datetime(1, 10, 25, 1)
class USTimeZone(tzinfo):
def __init__(self, hours, reprname, stdname, dstname):
self.stdoffset = timedelta(hours=hours)
self.reprname = reprname
self.stdname = stdname
self.dstname = dstname
def __repr__(self):
return self.reprname
def tzname(self, dt):
if self.dst(dt):
return self.dstname
else:
return self.stdname
def utcoffset(self, dt):
return self.stdoffset + self.dst(dt)
def dst(self, dt):
if dt is None or dt.tzinfo is None:
# An exception may be sensible here, in one or both cases.
# It depends on how you want to treat them. The default
# fromutc() implementation (called by the default astimezone()
# implementation) passes a datetime with dt.tzinfo is self.
return ZERO
assert dt.tzinfo is self
# Find first Sunday in April & the last in October.
start = first_sunday_on_or_after(DSTSTART.replace(year=dt.year))
end = first_sunday_on_or_after(DSTEND.replace(year=dt.year))
# Can't compare naive to aware objects, so strip the timezone from
# dt first.
if start <= dt.replace(tzinfo=None) < end:
return HOUR
else:
return ZERO
Eastern = USTimeZone(-5, "Eastern", "EST", "EDT")
Central = USTimeZone(-6, "Central", "CST", "CDT")
Mountain = USTimeZone(-7, "Mountain", "MST", "MDT")
Pacific = USTimeZone(-8, "Pacific", "PST", "PDT")

View file

@ -1,137 +0,0 @@
#!/usr/bin/env python
'''
$Id: tzfile.py,v 1.8 2004/06/03 00:15:24 zenzen Exp $
'''
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
from datetime import datetime, timedelta
from struct import unpack, calcsize
from pytz.tzinfo import StaticTzInfo, DstTzInfo, memorized_ttinfo
from pytz.tzinfo import memorized_datetime, memorized_timedelta
def _byte_string(s):
"""Cast a string or byte string to an ASCII byte string."""
return s.encode('US-ASCII')
_NULL = _byte_string('\0')
def _std_string(s):
"""Cast a string or byte string to an ASCII string."""
return str(s.decode('US-ASCII'))
def build_tzinfo(zone, fp):
head_fmt = '>4s c 15x 6l'
head_size = calcsize(head_fmt)
(magic, format, ttisgmtcnt, ttisstdcnt,leapcnt, timecnt,
typecnt, charcnt) = unpack(head_fmt, fp.read(head_size))
# Make sure it is a tzfile(5) file
assert magic == _byte_string('TZif'), 'Got magic %s' % repr(magic)
# Read out the transition times, localtime indices and ttinfo structures.
data_fmt = '>%(timecnt)dl %(timecnt)dB %(ttinfo)s %(charcnt)ds' % dict(
timecnt=timecnt, ttinfo='lBB'*typecnt, charcnt=charcnt)
data_size = calcsize(data_fmt)
data = unpack(data_fmt, fp.read(data_size))
# make sure we unpacked the right number of values
assert len(data) == 2 * timecnt + 3 * typecnt + 1
transitions = [memorized_datetime(trans)
for trans in data[:timecnt]]
lindexes = list(data[timecnt:2 * timecnt])
ttinfo_raw = data[2 * timecnt:-1]
tznames_raw = data[-1]
del data
# Process ttinfo into separate structs
ttinfo = []
tznames = {}
i = 0
while i < len(ttinfo_raw):
# have we looked up this timezone name yet?
tzname_offset = ttinfo_raw[i+2]
if tzname_offset not in tznames:
nul = tznames_raw.find(_NULL, tzname_offset)
if nul < 0:
nul = len(tznames_raw)
tznames[tzname_offset] = _std_string(
tznames_raw[tzname_offset:nul])
ttinfo.append((ttinfo_raw[i],
bool(ttinfo_raw[i+1]),
tznames[tzname_offset]))
i += 3
# Now build the timezone object
if len(transitions) == 0:
ttinfo[0][0], ttinfo[0][2]
cls = type(zone, (StaticTzInfo,), dict(
zone=zone,
_utcoffset=memorized_timedelta(ttinfo[0][0]),
_tzname=ttinfo[0][2]))
else:
# Early dates use the first standard time ttinfo
i = 0
while ttinfo[i][1]:
i += 1
if ttinfo[i] == ttinfo[lindexes[0]]:
transitions[0] = datetime.min
else:
transitions.insert(0, datetime.min)
lindexes.insert(0, i)
# calculate transition info
transition_info = []
for i in range(len(transitions)):
inf = ttinfo[lindexes[i]]
utcoffset = inf[0]
if not inf[1]:
dst = 0
else:
for j in range(i-1, -1, -1):
prev_inf = ttinfo[lindexes[j]]
if not prev_inf[1]:
break
dst = inf[0] - prev_inf[0] # dst offset
# Bad dst? Look further. DST > 24 hours happens when
# a timzone has moved across the international dateline.
if dst <= 0 or dst > 3600*3:
for j in range(i+1, len(transitions)):
stdinf = ttinfo[lindexes[j]]
if not stdinf[1]:
dst = inf[0] - stdinf[0]
if dst > 0:
break # Found a useful std time.
tzname = inf[2]
# Round utcoffset and dst to the nearest minute or the
# datetime library will complain. Conversions to these timezones
# might be up to plus or minus 30 seconds out, but it is
# the best we can do.
utcoffset = int((utcoffset + 30) // 60) * 60
dst = int((dst + 30) // 60) * 60
transition_info.append(memorized_ttinfo(utcoffset, dst, tzname))
cls = type(zone, (DstTzInfo,), dict(
zone=zone,
_utc_transition_times=transitions,
_transition_info=transition_info))
return cls()
if __name__ == '__main__':
import os.path
from pprint import pprint
base = os.path.join(os.path.dirname(__file__), 'zoneinfo')
tz = build_tzinfo('Australia/Melbourne',
open(os.path.join(base,'Australia','Melbourne'), 'rb'))
tz = build_tzinfo('US/Eastern',
open(os.path.join(base,'US','Eastern'), 'rb'))
pprint(tz._utc_transition_times)
#print tz.asPython(4)
#print tz.transitions_mapping

View file

@ -1,564 +0,0 @@
'''Base classes and helpers for building zone specific tzinfo classes'''
from datetime import datetime, timedelta, tzinfo
from bisect import bisect_right
try:
set
except NameError:
from sets import Set as set
import pytz
from pytz.exceptions import AmbiguousTimeError, NonExistentTimeError
__all__ = []
_timedelta_cache = {}
def memorized_timedelta(seconds):
'''Create only one instance of each distinct timedelta'''
try:
return _timedelta_cache[seconds]
except KeyError:
delta = timedelta(seconds=seconds)
_timedelta_cache[seconds] = delta
return delta
_epoch = datetime.utcfromtimestamp(0)
_datetime_cache = {0: _epoch}
def memorized_datetime(seconds):
'''Create only one instance of each distinct datetime'''
try:
return _datetime_cache[seconds]
except KeyError:
# NB. We can't just do datetime.utcfromtimestamp(seconds) as this
# fails with negative values under Windows (Bug #90096)
dt = _epoch + timedelta(seconds=seconds)
_datetime_cache[seconds] = dt
return dt
_ttinfo_cache = {}
def memorized_ttinfo(*args):
'''Create only one instance of each distinct tuple'''
try:
return _ttinfo_cache[args]
except KeyError:
ttinfo = (
memorized_timedelta(args[0]),
memorized_timedelta(args[1]),
args[2]
)
_ttinfo_cache[args] = ttinfo
return ttinfo
_notime = memorized_timedelta(0)
def _to_seconds(td):
'''Convert a timedelta to seconds'''
return td.seconds + td.days * 24 * 60 * 60
class BaseTzInfo(tzinfo):
# Overridden in subclass
_utcoffset = None
_tzname = None
zone = None
def __str__(self):
return self.zone
class StaticTzInfo(BaseTzInfo):
'''A timezone that has a constant offset from UTC
These timezones are rare, as most locations have changed their
offset at some point in their history
'''
def fromutc(self, dt):
'''See datetime.tzinfo.fromutc'''
if dt.tzinfo is not None and dt.tzinfo is not self:
raise ValueError('fromutc: dt.tzinfo is not self')
return (dt + self._utcoffset).replace(tzinfo=self)
def utcoffset(self, dt, is_dst=None):
'''See datetime.tzinfo.utcoffset
is_dst is ignored for StaticTzInfo, and exists only to
retain compatibility with DstTzInfo.
'''
return self._utcoffset
def dst(self, dt, is_dst=None):
'''See datetime.tzinfo.dst
is_dst is ignored for StaticTzInfo, and exists only to
retain compatibility with DstTzInfo.
'''
return _notime
def tzname(self, dt, is_dst=None):
'''See datetime.tzinfo.tzname
is_dst is ignored for StaticTzInfo, and exists only to
retain compatibility with DstTzInfo.
'''
return self._tzname
def localize(self, dt, is_dst=False):
'''Convert naive time to local time'''
if dt.tzinfo is not None:
raise ValueError('Not naive datetime (tzinfo is already set)')
return dt.replace(tzinfo=self)
def normalize(self, dt, is_dst=False):
'''Correct the timezone information on the given datetime.
This is normally a no-op, as StaticTzInfo timezones never have
ambiguous cases to correct:
>>> from pytz import timezone
>>> gmt = timezone('GMT')
>>> isinstance(gmt, StaticTzInfo)
True
>>> dt = datetime(2011, 5, 8, 1, 2, 3, tzinfo=gmt)
>>> gmt.normalize(dt) is dt
True
The supported method of converting between timezones is to use
datetime.astimezone(). Currently normalize() also works:
>>> la = timezone('America/Los_Angeles')
>>> dt = la.localize(datetime(2011, 5, 7, 1, 2, 3))
>>> fmt = '%Y-%m-%d %H:%M:%S %Z (%z)'
>>> gmt.normalize(dt).strftime(fmt)
'2011-05-07 08:02:03 GMT (+0000)'
'''
if dt.tzinfo is self:
return dt
if dt.tzinfo is None:
raise ValueError('Naive time - no tzinfo set')
return dt.astimezone(self)
def __repr__(self):
return '<StaticTzInfo %r>' % (self.zone,)
def __reduce__(self):
# Special pickle to zone remains a singleton and to cope with
# database changes.
return pytz._p, (self.zone,)
class DstTzInfo(BaseTzInfo):
'''A timezone that has a variable offset from UTC
The offset might change if daylight saving time comes into effect,
or at a point in history when the region decides to change their
timezone definition.
'''
# Overridden in subclass
_utc_transition_times = None # Sorted list of DST transition times in UTC
_transition_info = None # [(utcoffset, dstoffset, tzname)] corresponding
# to _utc_transition_times entries
zone = None
# Set in __init__
_tzinfos = None
_dst = None # DST offset
def __init__(self, _inf=None, _tzinfos=None):
if _inf:
self._tzinfos = _tzinfos
self._utcoffset, self._dst, self._tzname = _inf
else:
_tzinfos = {}
self._tzinfos = _tzinfos
self._utcoffset, self._dst, self._tzname = self._transition_info[0]
_tzinfos[self._transition_info[0]] = self
for inf in self._transition_info[1:]:
if inf not in _tzinfos:
_tzinfos[inf] = self.__class__(inf, _tzinfos)
def fromutc(self, dt):
'''See datetime.tzinfo.fromutc'''
if (dt.tzinfo is not None
and getattr(dt.tzinfo, '_tzinfos', None) is not self._tzinfos):
raise ValueError('fromutc: dt.tzinfo is not self')
dt = dt.replace(tzinfo=None)
idx = max(0, bisect_right(self._utc_transition_times, dt) - 1)
inf = self._transition_info[idx]
return (dt + inf[0]).replace(tzinfo=self._tzinfos[inf])
def normalize(self, dt):
'''Correct the timezone information on the given datetime
If date arithmetic crosses DST boundaries, the tzinfo
is not magically adjusted. This method normalizes the
tzinfo to the correct one.
To test, first we need to do some setup
>>> from pytz import timezone
>>> utc = timezone('UTC')
>>> eastern = timezone('US/Eastern')
>>> fmt = '%Y-%m-%d %H:%M:%S %Z (%z)'
We next create a datetime right on an end-of-DST transition point,
the instant when the wallclocks are wound back one hour.
>>> utc_dt = datetime(2002, 10, 27, 6, 0, 0, tzinfo=utc)
>>> loc_dt = utc_dt.astimezone(eastern)
>>> loc_dt.strftime(fmt)
'2002-10-27 01:00:00 EST (-0500)'
Now, if we subtract a few minutes from it, note that the timezone
information has not changed.
>>> before = loc_dt - timedelta(minutes=10)
>>> before.strftime(fmt)
'2002-10-27 00:50:00 EST (-0500)'
But we can fix that by calling the normalize method
>>> before = eastern.normalize(before)
>>> before.strftime(fmt)
'2002-10-27 01:50:00 EDT (-0400)'
The supported method of converting between timezones is to use
datetime.astimezone(). Currently, normalize() also works:
>>> th = timezone('Asia/Bangkok')
>>> am = timezone('Europe/Amsterdam')
>>> dt = th.localize(datetime(2011, 5, 7, 1, 2, 3))
>>> fmt = '%Y-%m-%d %H:%M:%S %Z (%z)'
>>> am.normalize(dt).strftime(fmt)
'2011-05-06 20:02:03 CEST (+0200)'
'''
if dt.tzinfo is None:
raise ValueError('Naive time - no tzinfo set')
# Convert dt in localtime to UTC
offset = dt.tzinfo._utcoffset
dt = dt.replace(tzinfo=None)
dt = dt - offset
# convert it back, and return it
return self.fromutc(dt)
def localize(self, dt, is_dst=False):
'''Convert naive time to local time.
This method should be used to construct localtimes, rather
than passing a tzinfo argument to a datetime constructor.
is_dst is used to determine the correct timezone in the ambigous
period at the end of daylight saving time.
>>> from pytz import timezone
>>> fmt = '%Y-%m-%d %H:%M:%S %Z (%z)'
>>> amdam = timezone('Europe/Amsterdam')
>>> dt = datetime(2004, 10, 31, 2, 0, 0)
>>> loc_dt1 = amdam.localize(dt, is_dst=True)
>>> loc_dt2 = amdam.localize(dt, is_dst=False)
>>> loc_dt1.strftime(fmt)
'2004-10-31 02:00:00 CEST (+0200)'
>>> loc_dt2.strftime(fmt)
'2004-10-31 02:00:00 CET (+0100)'
>>> str(loc_dt2 - loc_dt1)
'1:00:00'
Use is_dst=None to raise an AmbiguousTimeError for ambiguous
times at the end of daylight saving time
>>> try:
... loc_dt1 = amdam.localize(dt, is_dst=None)
... except AmbiguousTimeError:
... print('Ambiguous')
Ambiguous
is_dst defaults to False
>>> amdam.localize(dt) == amdam.localize(dt, False)
True
is_dst is also used to determine the correct timezone in the
wallclock times jumped over at the start of daylight saving time.
>>> pacific = timezone('US/Pacific')
>>> dt = datetime(2008, 3, 9, 2, 0, 0)
>>> ploc_dt1 = pacific.localize(dt, is_dst=True)
>>> ploc_dt2 = pacific.localize(dt, is_dst=False)
>>> ploc_dt1.strftime(fmt)
'2008-03-09 02:00:00 PDT (-0700)'
>>> ploc_dt2.strftime(fmt)
'2008-03-09 02:00:00 PST (-0800)'
>>> str(ploc_dt2 - ploc_dt1)
'1:00:00'
Use is_dst=None to raise a NonExistentTimeError for these skipped
times.
>>> try:
... loc_dt1 = pacific.localize(dt, is_dst=None)
... except NonExistentTimeError:
... print('Non-existent')
Non-existent
'''
if dt.tzinfo is not None:
raise ValueError('Not naive datetime (tzinfo is already set)')
# Find the two best possibilities.
possible_loc_dt = set()
for delta in [timedelta(days=-1), timedelta(days=1)]:
loc_dt = dt + delta
idx = max(0, bisect_right(
self._utc_transition_times, loc_dt) - 1)
inf = self._transition_info[idx]
tzinfo = self._tzinfos[inf]
loc_dt = tzinfo.normalize(dt.replace(tzinfo=tzinfo))
if loc_dt.replace(tzinfo=None) == dt:
possible_loc_dt.add(loc_dt)
if len(possible_loc_dt) == 1:
return possible_loc_dt.pop()
# If there are no possibly correct timezones, we are attempting
# to convert a time that never happened - the time period jumped
# during the start-of-DST transition period.
if len(possible_loc_dt) == 0:
# If we refuse to guess, raise an exception.
if is_dst is None:
raise NonExistentTimeError(dt)
# If we are forcing the pre-DST side of the DST transition, we
# obtain the correct timezone by winding the clock forward a few
# hours.
elif is_dst:
return self.localize(
dt + timedelta(hours=6), is_dst=True) - timedelta(hours=6)
# If we are forcing the post-DST side of the DST transition, we
# obtain the correct timezone by winding the clock back.
else:
return self.localize(
dt - timedelta(hours=6), is_dst=False) + timedelta(hours=6)
# If we get this far, we have multiple possible timezones - this
# is an ambiguous case occuring during the end-of-DST transition.
# If told to be strict, raise an exception since we have an
# ambiguous case
if is_dst is None:
raise AmbiguousTimeError(dt)
# Filter out the possiblilities that don't match the requested
# is_dst
filtered_possible_loc_dt = [
p for p in possible_loc_dt
if bool(p.tzinfo._dst) == is_dst
]
# Hopefully we only have one possibility left. Return it.
if len(filtered_possible_loc_dt) == 1:
return filtered_possible_loc_dt[0]
if len(filtered_possible_loc_dt) == 0:
filtered_possible_loc_dt = list(possible_loc_dt)
# If we get this far, we have in a wierd timezone transition
# where the clocks have been wound back but is_dst is the same
# in both (eg. Europe/Warsaw 1915 when they switched to CET).
# At this point, we just have to guess unless we allow more
# hints to be passed in (such as the UTC offset or abbreviation),
# but that is just getting silly.
#
# Choose the earliest (by UTC) applicable timezone if is_dst=True
# Choose the latest (by UTC) applicable timezone if is_dst=False
# i.e., behave like end-of-DST transition
dates = {} # utc -> local
for local_dt in filtered_possible_loc_dt:
utc_time = local_dt.replace(tzinfo=None) - local_dt.tzinfo._utcoffset
assert utc_time not in dates
dates[utc_time] = local_dt
return dates[[min, max][not is_dst](dates)]
def utcoffset(self, dt, is_dst=None):
'''See datetime.tzinfo.utcoffset
The is_dst parameter may be used to remove ambiguity during DST
transitions.
>>> from pytz import timezone
>>> tz = timezone('America/St_Johns')
>>> ambiguous = datetime(2009, 10, 31, 23, 30)
>>> tz.utcoffset(ambiguous, is_dst=False)
datetime.timedelta(-1, 73800)
>>> tz.utcoffset(ambiguous, is_dst=True)
datetime.timedelta(-1, 77400)
>>> try:
... tz.utcoffset(ambiguous)
... except AmbiguousTimeError:
... print('Ambiguous')
Ambiguous
'''
if dt is None:
return None
elif dt.tzinfo is not self:
dt = self.localize(dt, is_dst)
return dt.tzinfo._utcoffset
else:
return self._utcoffset
def dst(self, dt, is_dst=None):
'''See datetime.tzinfo.dst
The is_dst parameter may be used to remove ambiguity during DST
transitions.
>>> from pytz import timezone
>>> tz = timezone('America/St_Johns')
>>> normal = datetime(2009, 9, 1)
>>> tz.dst(normal)
datetime.timedelta(0, 3600)
>>> tz.dst(normal, is_dst=False)
datetime.timedelta(0, 3600)
>>> tz.dst(normal, is_dst=True)
datetime.timedelta(0, 3600)
>>> ambiguous = datetime(2009, 10, 31, 23, 30)
>>> tz.dst(ambiguous, is_dst=False)
datetime.timedelta(0)
>>> tz.dst(ambiguous, is_dst=True)
datetime.timedelta(0, 3600)
>>> try:
... tz.dst(ambiguous)
... except AmbiguousTimeError:
... print('Ambiguous')
Ambiguous
'''
if dt is None:
return None
elif dt.tzinfo is not self:
dt = self.localize(dt, is_dst)
return dt.tzinfo._dst
else:
return self._dst
def tzname(self, dt, is_dst=None):
'''See datetime.tzinfo.tzname
The is_dst parameter may be used to remove ambiguity during DST
transitions.
>>> from pytz import timezone
>>> tz = timezone('America/St_Johns')
>>> normal = datetime(2009, 9, 1)
>>> tz.tzname(normal)
'NDT'
>>> tz.tzname(normal, is_dst=False)
'NDT'
>>> tz.tzname(normal, is_dst=True)
'NDT'
>>> ambiguous = datetime(2009, 10, 31, 23, 30)
>>> tz.tzname(ambiguous, is_dst=False)
'NST'
>>> tz.tzname(ambiguous, is_dst=True)
'NDT'
>>> try:
... tz.tzname(ambiguous)
... except AmbiguousTimeError:
... print('Ambiguous')
Ambiguous
'''
if dt is None:
return self.zone
elif dt.tzinfo is not self:
dt = self.localize(dt, is_dst)
return dt.tzinfo._tzname
else:
return self._tzname
def __repr__(self):
if self._dst:
dst = 'DST'
else:
dst = 'STD'
if self._utcoffset > _notime:
return '<DstTzInfo %r %s+%s %s>' % (
self.zone, self._tzname, self._utcoffset, dst
)
else:
return '<DstTzInfo %r %s%s %s>' % (
self.zone, self._tzname, self._utcoffset, dst
)
def __reduce__(self):
# Special pickle to zone remains a singleton and to cope with
# database changes.
return pytz._p, (
self.zone,
_to_seconds(self._utcoffset),
_to_seconds(self._dst),
self._tzname
)
def unpickler(zone, utcoffset=None, dstoffset=None, tzname=None):
"""Factory function for unpickling pytz tzinfo instances.
This is shared for both StaticTzInfo and DstTzInfo instances, because
database changes could cause a zones implementation to switch between
these two base classes and we can't break pickles on a pytz version
upgrade.
"""
# Raises a KeyError if zone no longer exists, which should never happen
# and would be a bug.
tz = pytz.timezone(zone)
# A StaticTzInfo - just return it
if utcoffset is None:
return tz
# This pickle was created from a DstTzInfo. We need to
# determine which of the list of tzinfo instances for this zone
# to use in order to restore the state of any datetime instances using
# it correctly.
utcoffset = memorized_timedelta(utcoffset)
dstoffset = memorized_timedelta(dstoffset)
try:
return tz._tzinfos[(utcoffset, dstoffset, tzname)]
except KeyError:
# The particular state requested in this timezone no longer exists.
# This indicates a corrupt pickle, or the timezone database has been
# corrected violently enough to make this particular
# (utcoffset,dstoffset) no longer exist in the zone, or the
# abbreviation has been changed.
pass
# See if we can find an entry differing only by tzname. Abbreviations
# get changed from the initial guess by the database maintainers to
# match reality when this information is discovered.
for localized_tz in tz._tzinfos.values():
if (localized_tz._utcoffset == utcoffset
and localized_tz._dst == dstoffset):
return localized_tz
# This (utcoffset, dstoffset) information has been removed from the
# zone. Add it back. This might occur when the database maintainers have
# corrected incorrect information. datetime instances using this
# incorrect information will continue to do so, exactly as they were
# before being pickled. This is purely an overly paranoid safety net - I
# doubt this will ever been needed in real life.
inf = (utcoffset, dstoffset, tzname)
tz._tzinfos[inf] = tz.__class__(inf, tz._tzinfos)
return tz._tzinfos[inf]

Some files were not shown because too many files have changed in this diff Show more