#!/usr/bin/env python # Copyright 2002 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS-IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Set of classes and functions for dealing with dates and timestamps. The BaseTimestamp and Timestamp are timezone-aware wrappers around Python datetime.datetime class. """ import calendar import copy import datetime import re import sys import time import types import warnings import dateutil.parser import pytz _MICROSECONDS_PER_SECOND = 1000000 _MICROSECONDS_PER_SECOND_F = float(_MICROSECONDS_PER_SECOND) def SecondsToMicroseconds(seconds): """Convert seconds to microseconds. Args: seconds: number Returns: microseconds """ return seconds * _MICROSECONDS_PER_SECOND def MicrosecondsToSeconds(microseconds): """Convert microseconds to seconds. Args: microseconds: A number representing some duration of time measured in microseconds. Returns: A number representing the same duration of time measured in seconds. """ return microseconds / _MICROSECONDS_PER_SECOND_F def _GetCurrentTimeMicros(): """Get the current time in microseconds, in UTC. Returns: The number of microseconds since the epoch. """ return int(SecondsToMicroseconds(time.time())) def GetSecondsSinceEpoch(time_tuple): """Convert time_tuple (in UTC) to seconds (also in UTC). Args: time_tuple: tuple with at least 6 items. Returns: seconds. """ return calendar.timegm(time_tuple[:6] + (0, 0, 0)) def GetTimeMicros(time_tuple): """Get a time in microseconds. Arguments: time_tuple: A (year, month, day, hour, minute, second) tuple (the python time tuple format) in the UTC time zone. Returns: The number of microseconds since the epoch represented by the input tuple. """ return int(SecondsToMicroseconds(GetSecondsSinceEpoch(time_tuple))) def DatetimeToUTCMicros(date): """Converts a datetime object to microseconds since the epoch in UTC. Args: date: A datetime to convert. Returns: The number of microseconds since the epoch, in UTC, represented by the input datetime. """ # Using this guide: http://wiki.python.org/moin/WorkingWithTime # And this conversion guide: http://docs.python.org/library/time.html # Turn the date parameter into a tuple (struct_time) that can then be # manipulated into a long value of seconds. During the conversion from # struct_time to long, the source date in UTC, and so it follows that the # correct transformation is calendar.timegm() micros = calendar.timegm(date.utctimetuple()) * _MICROSECONDS_PER_SECOND return micros + date.microsecond def DatetimeToUTCMillis(date): """Converts a datetime object to milliseconds since the epoch in UTC. Args: date: A datetime to convert. Returns: The number of milliseconds since the epoch, in UTC, represented by the input datetime. """ return DatetimeToUTCMicros(date) / 1000 def UTCMicrosToDatetime(micros, tz=None): """Converts a microsecond epoch time to a datetime object. Args: micros: A UTC time, expressed in microseconds since the epoch. tz: The desired tzinfo for the datetime object. If None, the datetime will be naive. Returns: The datetime represented by the input value. """ # The conversion from micros to seconds for input into the # utcfromtimestamp function needs to be done as a float to make sure # we dont lose the sub-second resolution of the input time. dt = datetime.datetime.utcfromtimestamp( micros / _MICROSECONDS_PER_SECOND_F) if tz is not None: dt = tz.fromutc(dt) return dt def UTCMillisToDatetime(millis, tz=None): """Converts a millisecond epoch time to a datetime object. Args: millis: A UTC time, expressed in milliseconds since the epoch. tz: The desired tzinfo for the datetime object. If None, the datetime will be naive. Returns: The datetime represented by the input value. """ return UTCMicrosToDatetime(millis * 1000, tz) UTC = pytz.UTC US_PACIFIC = pytz.timezone('US/Pacific') class TimestampError(ValueError): """Generic timestamp-related error.""" pass class TimezoneNotSpecifiedError(TimestampError): """This error is raised when timezone is not specified.""" pass class TimeParseError(TimestampError): """This error is raised when we can't parse the input.""" pass # TODO(user): this class needs to handle daylight better class LocalTimezoneClass(datetime.tzinfo): """This class defines local timezone.""" ZERO = datetime.timedelta(0) HOUR = datetime.timedelta(hours=1) STDOFFSET = datetime.timedelta(seconds=-time.timezone) if time.daylight: DSTOFFSET = datetime.timedelta(seconds=-time.altzone) else: DSTOFFSET = STDOFFSET DSTDIFF = DSTOFFSET - STDOFFSET def utcoffset(self, dt): """datetime -> minutes east of UTC (negative for west of UTC).""" if self._isdst(dt): return self.DSTOFFSET else: return self.STDOFFSET def dst(self, dt): """datetime -> DST offset in minutes east of UTC.""" if self._isdst(dt): return self.DSTDIFF else: return self.ZERO def tzname(self, dt): """datetime -> string name of time zone.""" return time.tzname[self._isdst(dt)] def _isdst(self, dt): """Return true if given datetime is within local DST.""" tt = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, dt.weekday(), 0, -1) stamp = time.mktime(tt) tt = time.localtime(stamp) return tt.tm_isdst > 0 def __repr__(self): """Return string ''.""" return '' def localize(self, dt, unused_is_dst=False): """Convert naive time to local time.""" if dt.tzinfo is not None: raise ValueError('Not naive datetime (tzinfo is already set)') return dt.replace(tzinfo=self) def normalize(self, dt, unused_is_dst=False): """Correct the timezone information on the given datetime.""" if dt.tzinfo is None: raise ValueError('Naive time - no tzinfo set') return dt.replace(tzinfo=self) LocalTimezone = LocalTimezoneClass() class BaseTimestamp(datetime.datetime): """Our kind of wrapper over datetime.datetime. The objects produced by methods now, today, fromtimestamp, utcnow, utcfromtimestamp are timezone-aware (with correct timezone). We also overload __add__ and __sub__ method, to fix the result of arithmetic operations. """ LocalTimezone = LocalTimezone @classmethod def AddLocalTimezone(cls, obj): """If obj is naive, add local timezone to it.""" if not obj.tzinfo: return obj.replace(tzinfo=cls.LocalTimezone) return obj @classmethod def Localize(cls, obj): """If obj is naive, localize it to cls.LocalTimezone.""" if not obj.tzinfo: return cls.LocalTimezone.localize(obj) return obj def __add__(self, *args, **kwargs): """x.__add__(y) <==> x+y.""" r = super(BaseTimestamp, self).__add__(*args, **kwargs) return type(self)(r.year, r.month, r.day, r.hour, r.minute, r.second, r.microsecond, r.tzinfo) def __sub__(self, *args, **kwargs): """x.__add__(y) <==> x-y.""" r = super(BaseTimestamp, self).__sub__(*args, **kwargs) if isinstance(r, datetime.datetime): return type(self)(r.year, r.month, r.day, r.hour, r.minute, r.second, r.microsecond, r.tzinfo) return r @classmethod def now(cls, *args, **kwargs): """Get a timestamp corresponding to right now. Args: args: Positional arguments to pass to datetime.datetime.now(). kwargs: Keyword arguments to pass to datetime.datetime.now(). If tz is not specified, local timezone is assumed. Returns: A new BaseTimestamp with tz's local day and time. """ return cls.AddLocalTimezone( super(BaseTimestamp, cls).now(*args, **kwargs)) @classmethod def today(cls): """Current BaseTimestamp. Same as self.__class__.fromtimestamp(time.time()). Returns: New self.__class__. """ return cls.AddLocalTimezone(super(BaseTimestamp, cls).today()) @classmethod def fromtimestamp(cls, *args, **kwargs): """Get a new localized timestamp from a POSIX timestamp. Args: args: Positional arguments to pass to datetime.datetime.fromtimestamp(). kwargs: Keyword arguments to pass to datetime.datetime.fromtimestamp(). If tz is not specified, local timezone is assumed. Returns: A new BaseTimestamp with tz's local day and time. """ return cls.Localize( super(BaseTimestamp, cls).fromtimestamp(*args, **kwargs)) @classmethod def utcnow(cls): """Return a new BaseTimestamp representing UTC day and time.""" return super(BaseTimestamp, cls).utcnow().replace(tzinfo=pytz.utc) @classmethod def utcfromtimestamp(cls, *args, **kwargs): """timestamp -> UTC datetime from a POSIX timestamp (like time.time()).""" return super(BaseTimestamp, cls).utcfromtimestamp( *args, **kwargs).replace(tzinfo=pytz.utc) @classmethod def strptime(cls, date_string, format, tz=None): """Parse date_string according to format and construct BaseTimestamp. Args: date_string: string passed to time.strptime. format: format string passed to time.strptime. tz: if not specified, local timezone assumed. Returns: New BaseTimestamp. """ date_time = super(BaseTimestamp, cls).strptime(date_string, format) return (tz.localize if tz else cls.Localize)(date_time) def astimezone(self, *args, **kwargs): """tz -> convert to time in new timezone tz.""" r = super(BaseTimestamp, self).astimezone(*args, **kwargs) return type(self)(r.year, r.month, r.day, r.hour, r.minute, r.second, r.microsecond, r.tzinfo) @classmethod def FromMicroTimestamp(cls, ts): """Create new Timestamp object from microsecond UTC timestamp value. Args: ts: integer microsecond UTC timestamp Returns: New cls() """ return cls.utcfromtimestamp(ts/_MICROSECONDS_PER_SECOND_F) def AsSecondsSinceEpoch(self): """Return number of seconds since epoch (timestamp in seconds).""" return GetSecondsSinceEpoch(self.utctimetuple()) def AsMicroTimestamp(self): """Return microsecond timestamp constructed from this object.""" return (SecondsToMicroseconds(self.AsSecondsSinceEpoch()) + self.microsecond) @classmethod def combine(cls, datepart, timepart, tz=None): """Combine date and time into timestamp, timezone-aware. Args: datepart: datetime.date timepart: datetime.time tz: timezone or None Returns: timestamp object """ result = super(BaseTimestamp, cls).combine(datepart, timepart) if tz: result = tz.localize(result) return result # Conversions from interval suffixes to number of seconds. # (m => 60s, d => 86400s, etc) _INTERVAL_CONV_DICT = {'s': 1} _INTERVAL_CONV_DICT['m'] = 60 * _INTERVAL_CONV_DICT['s'] _INTERVAL_CONV_DICT['h'] = 60 * _INTERVAL_CONV_DICT['m'] _INTERVAL_CONV_DICT['d'] = 24 * _INTERVAL_CONV_DICT['h'] _INTERVAL_CONV_DICT['D'] = _INTERVAL_CONV_DICT['d'] _INTERVAL_CONV_DICT['w'] = 7 * _INTERVAL_CONV_DICT['d'] _INTERVAL_CONV_DICT['W'] = _INTERVAL_CONV_DICT['w'] _INTERVAL_CONV_DICT['M'] = 30 * _INTERVAL_CONV_DICT['d'] _INTERVAL_CONV_DICT['Y'] = 365 * _INTERVAL_CONV_DICT['d'] _INTERVAL_REGEXP = re.compile('^([0-9]+)([%s])?' % ''.join(_INTERVAL_CONV_DICT)) def ConvertIntervalToSeconds(interval): """Convert a formatted string representing an interval into seconds. Args: interval: String to interpret as an interval. A basic interval looks like "". Complex intervals consisting of a chain of basic intervals are also allowed. Returns: An integer representing the number of seconds represented by the interval string, or None if the interval string could not be decoded. """ total = 0 while interval: match = _INTERVAL_REGEXP.match(interval) if not match: return None try: num = int(match.group(1)) except ValueError: return None suffix = match.group(2) if suffix: multiplier = _INTERVAL_CONV_DICT.get(suffix) if not multiplier: return None num *= multiplier total += num interval = interval[match.end(0):] return total class Timestamp(BaseTimestamp): """This subclass contains methods to parse W3C and interval date spec. The interval date specification is in the form "1D", where "D" can be "s"econds "m"inutes "h"ours "D"ays "W"eeks "M"onths "Y"ears. """ INTERVAL_CONV_DICT = _INTERVAL_CONV_DICT INTERVAL_REGEXP = _INTERVAL_REGEXP @classmethod def _StringToTime(cls, timestring, tz=None): """Use dateutil.parser to convert string into timestamp. dateutil.parser understands ISO8601 which is really handy. Args: timestring: string with datetime tz: optional timezone, if timezone is omitted from timestring. Returns: New Timestamp or None if unable to parse the timestring. """ try: r = dateutil.parser.parse(timestring) # dateutil will raise ValueError if it's an unknown format -- or # TypeError in some cases, due to bugs. except (TypeError, ValueError): return None if not r.tzinfo: r = (tz or cls.LocalTimezone).localize(r) result = cls(r.year, r.month, r.day, r.hour, r.minute, r.second, r.microsecond, r.tzinfo) return result @classmethod def _IntStringToInterval(cls, timestring): """Parse interval date specification and create a timedelta object. Args: timestring: string interval. Returns: A datetime.timedelta representing the specified interval or None if unable to parse the timestring. """ seconds = ConvertIntervalToSeconds(timestring) return datetime.timedelta(seconds=seconds) if seconds else None @classmethod def FromString(cls, value, tz=None): """Create a Timestamp from a string. Args: value: String interval or datetime. e.g. "2013-01-05 13:00:00" or "1d" tz: optional timezone, if timezone is omitted from timestring. Returns: A new Timestamp. Raises: TimeParseError if unable to parse value. """ result = cls._StringToTime(value, tz=tz) if result: return result result = cls._IntStringToInterval(value) if result: return cls.utcnow() - result raise TimeParseError(value) # What's written below is a clear python bug. I mean, okay, I can apply # negative timezone to it and end result will be inconversible. MAXIMUM_PYTHON_TIMESTAMP = Timestamp( 9999, 12, 31, 23, 59, 59, 999999, UTC) # This is also a bug. It is called 32bit time_t. I hate it. # This is fixed in 2.5, btw. MAXIMUM_MICROSECOND_TIMESTAMP = 0x80000000 * _MICROSECONDS_PER_SECOND - 1 MAXIMUM_MICROSECOND_TIMESTAMP_AS_TS = Timestamp(2038, 1, 19, 3, 14, 7, 999999)