# -*- coding: utf-8 -*-
# wasp_general/task/scheduler/task_source.py
#
# Copyright (C) 2017 the wasp-general authors and contributors
# <see AUTHORS file>
#
# This file is part of wasp-general.
#
# wasp-general is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# wasp-general is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with wasp-general. If not, see <http://www.gnu.org/licenses/>.
# TODO: document the code
# TODO: write tests for the code
# noinspection PyUnresolvedReferences
from wasp_general.version import __author__, __version__, __credits__, __license__, __copyright__, __email__
# noinspection PyUnresolvedReferences
from wasp_general.version import __status__
from datetime import datetime, timedelta, MAXYEAR, timezone
from calendar import monthrange, weekday
from wasp_general.verify import verify_type, verify_value
from wasp_general.datetime import utc_datetime
from wasp_general.thread import WCriticalResource
from wasp_general.task.scheduler.proto import WTaskSourceProto, WScheduleRecord, WScheduleTask, WSchedulerServiceProto
# noinspection PyAbstractClass
[docs]class WBasicTaskSource(WTaskSourceProto):
@verify_type(scheduler=WSchedulerServiceProto)
def __init__(self, scheduler_service):
self.__service = scheduler_service
[docs] def scheduler_service(self):
return self.__service
[docs]class WCronSchedule:
__calendar__ = {1: 31, 2: 29, 3: 31, 4: 30, 5: 31, 6: 30, 7: 31, 8: 31, 9: 30, 10: 31, 11: 30, 12: 31}
@verify_type(start_datetime=(datetime, None), minute=(int, None), hour=(int, None), day_of_month=(int, None))
@verify_type(day_of_week=(int, None), month=(int, None))
@verify_value(minute=lambda x: x is None or (0 <= x <= 59))
@verify_value(hour=lambda x: x is None or (0 <= x <= 23))
@verify_value(day_of_month=lambda x: x is None or (1 <= x <= 31))
@verify_value(day_of_week=lambda x: x is None or (1 <= x <= 7))
@verify_value(month=lambda x: x is None or (1 <= x <= 12))
def __init__(self, start_datetime=None, minute=None, hour=None, day_of_month=None, day_of_week=None, month=None):
if month is not None and day_of_month is not None and day_of_month > WCronSchedule.__calendar__[month]:
raise ValueError('Invalid day and month combination')
self.__start_datetime = start_datetime if start_datetime is not None else self.now()
self.__next_start = None
self.__minute = minute
self.__hour = hour
self.__day_of_month = day_of_month
self.__day_of_week = day_of_week
self.__month = month
self.update()
[docs] def start_datetime(self):
return self.__start_datetime
@verify_type(start_datetime=datetime)
def _set_start_datetime(self, start_datetime):
self.__start_datetime = start_datetime
self.__next_start = None
@verify_type('paranoid', year=int, month=int, day=int, hour=int, minute=int)
@verify_value('paranoid', year=lambda x: x > 0)
@verify_value('paranoid', month=lambda x: x is None or (1 <= x <= 12))
@verify_value('paranoid', day=lambda x: x is None or (1 <= x <= 31))
@verify_value('paranoid', hour=lambda x: x is None or (0 <= x <= 23))
@verify_value('paranoid', minute=lambda x: x is None or (0 <= x <= 59))
def _datetime(self, year, month, day, hour, minute):
return datetime(year, month, day, hour, minute)
[docs] def next_start(self):
return self.__next_start
[docs] def minute(self):
return self.__minute
[docs] def hour(self):
return self.__hour
[docs] def day_of_month(self):
return self.__day_of_month
[docs] def day_of_week(self):
return self.__day_of_week
[docs] def month(self):
return self.__month
[docs] def no_frequency(self):
result = self.minute() is None and self.hour() is None and self.day_of_month() is None
return result is True and self.day_of_week() is None and self.month() is None
[docs] def final_datetime(self):
return datetime(MAXYEAR, 12, 31, 23, 59)
[docs] @classmethod
def now(cls):
return datetime.now()
[docs] def update(self):
if self.no_frequency() is True:
return
for (year, month) in self.month_iterator():
for day in self.day_iterator(year, month):
for (hour, minute) in self.time_iterator(year, month, day):
self.__next_start = self._datetime(year, month, day, hour, minute)
return
[docs] @verify_type(omit_skipped=bool)
def complete(self, omit_skipped=True):
if self.no_frequency() is True:
return
next_start = self.next_start()
seconds_last_minute = next_start.second
seconds_last_hour = (next_start.minute * 60) + seconds_last_minute
seconds_last_day = (next_start.hour * 24 * 60 * 60) + seconds_last_hour
time_shift = 0
if self.month() is not None:
year = next_start.year
month = next_start.month
if month < 12:
month += 1
else:
year += 1
month = 1
time_shift = (self._datetime(year, month, 1, 0, 0) - next_start).total_seconds()
elif self.day_of_week() is not None:
time_shift = (7 * 24 * 60 * 60) - seconds_last_day
elif self.day_of_month() is not None:
time_shift = (24 * 60 * 60) - seconds_last_day
elif self.hour() is not None:
time_shift = (60 * 60) - seconds_last_hour
elif self.minute() is not None:
time_shift = 60 - seconds_last_minute
time_shift += 1
new_start = next_start + timedelta(seconds=time_shift)
if omit_skipped is True:
now = self.now()
if now > new_start:
new_start = now
self._set_start_datetime(new_start)
self.update()
[docs] def month_iterator(self):
if self.no_frequency():
return
start_datetime = self.start_datetime()
start_year = start_datetime.year
start_month = start_datetime.month
final_datetime = self.final_datetime()
max_year = final_datetime.year
max_month = final_datetime.month
month = self.month()
year = start_year
if month is not None:
if month < start_month:
year += 1
if year > max_year or (year == max_year and month > max_month):
return
while year < max_year or (year == max_year and month < max_month):
yield ((year, month))
year += 1
else: # month is None
month = start_month
while year < max_year or (year == max_year and month < max_month):
yield ((year, month))
if month < 12:
month += 1
else:
year += 1
month = 1
[docs] def day_iterator(self, year, month):
if self.no_frequency():
return
start_datetime = self.start_datetime()
days_in_month = monthrange(year, month)[1]
day_of_month = self.day_of_month()
day_of_week = self.day_of_week()
final_datetime = self.final_datetime()
if year == final_datetime.year and month == final_datetime.month:
days_in_month = final_datetime.day
start_day = start_datetime.day
if year != start_datetime.year or month != start_datetime.month:
start_day = 1
if day_of_month is None and day_of_week is None:
day = start_day
while day <= days_in_month:
yield day
day += 1
elif day_of_month is not None and day_of_week is None:
if start_day <= days_in_month:
yield day_of_month
elif day_of_month is not None and day_of_week is not None:
if start_day <= days_in_month and weekday(year, month, day_of_month) == day_of_week:
yield day_of_month
else: # day_of_month is None and day_of_week is not None
current_weekday = weekday(year, month, start_day)
delta = day_of_week - current_weekday
if delta < 0:
delta += 7
day = start_day + delta
while day <= days_in_month:
yield day
day += 7
[docs] def time_iterator(self, year, month, day):
start_datetime = self.start_datetime()
max_hour = 23
max_minute = 59
hour = self.hour()
minute = self.minute()
final_datetime = self.final_datetime()
if year == final_datetime.year and month == final_datetime.month and day == self.final_datetime():
max_hour = final_datetime.hour
max_minute = final_datetime.minute
start_hour = start_datetime.hour
start_minute = start_datetime.minute
if year != start_datetime.year or month != start_datetime.month or day != start_datetime.day:
start_hour = 0
start_minute = 0
if hour is None and minute is None:
hour = start_hour
minute = start_minute
while start_hour <= hour < max_hour or (hour == max_hour and start_minute <= minute <= max_minute):
yield ((hour, minute))
if minute < 59:
minute += 1
else:
minute = 0
hour += 1
elif hour is not None and minute is None:
minute = start_minute
if start_hour <= hour <= max_hour:
while minute <= max_minute:
yield ((hour, minute))
minute += 1
elif hour is not None and minute is not None:
if start_hour <= hour < max_hour or (hour == max_hour and start_minute <= minute <= max_minute):
yield ((hour, minute))
else: # hour is None and minute is not None
hour = start_hour
if start_minute > minute:
hour += 1
while start_hour <= hour < max_hour or (hour == max_hour and start_minute <= minute <= max_minute):
yield ((hour, minute))
hour += 1
[docs] @classmethod
@verify_type(scheduled=str)
def from_string(cls, scheduled):
return cls.from_string_tokens(*filter(lambda x: len(x) > 0, scheduled.strip().split(' ')))
[docs] @classmethod
@verify_type(tokens=str)
def from_string_tokens(cls, *tokens):
if len(tokens) != 5:
raise ValueError('Malformed cron-schedule')
tokens = [int(x) if x != '*' else None for x in tokens]
if len(tokens) != 5:
raise ValueError('Malformed cron-schedule')
return cls(
cls.now(), minute=tokens[0], hour=tokens[1], day_of_month=tokens[2], day_of_week=tokens[3],
month=tokens[4]
)
[docs]class WCronLocalTZSchedule(WCronSchedule):
pass
[docs]class WCronUTCSchedule(WCronSchedule):
@verify_type('paranoid', start_datetime=(datetime, None), minute=(int, None), hour=(int, None))
@verify_type('paranoid', day_of_month=(int, None), day_of_week=(int, None), month=(int, None))
@verify_value('paranoid', minute=lambda x: x is None or (0 <= x <= 59))
@verify_value('paranoid', hour=lambda x: x is None or (0 <= x <= 23))
@verify_value('paranoid', day_of_month=lambda x: x is None or (1 <= x <= 31))
@verify_value('paranoid', day_of_week=lambda x: x is None or (1 <= x <= 7))
@verify_value('paranoid', month=lambda x: x is None or (1 <= x <= 12))
@verify_value(start_datetime=lambda x: x.tzinfo is not None and x.tzinfo == timezone.utc)
def __init__(self, start_datetime=None, minute=None, hour=None, day_of_month=None, day_of_week=None, month=None):
WCronSchedule.__init__(
self, start_datetime, minute=minute, hour=hour, day_of_month=day_of_month,
day_of_week=day_of_week, month=month
)
@verify_type(start_datetime=datetime)
@verify_value(start_datetime=lambda x: x.tzinfo is not None and x.tzinfo == timezone.utc)
def _set_start_datetime(self, start_datetime):
WCronSchedule._set_start_datetime(self, start_datetime)
@verify_type('paranoid', year=int, month=int, day=int, hour=int, minute=int)
@verify_value('paranoid', year=lambda x: x > 0)
@verify_value('paranoid', month=lambda x: x is None or (1 <= x <= 12))
@verify_value('paranoid', day=lambda x: x is None or (1 <= x <= 31))
@verify_value('paranoid', hour=lambda x: x is None or (0 <= x <= 23))
@verify_value('paranoid', minute=lambda x: x is None or (0 <= x <= 59))
def _datetime(self, year, month, day, hour, minute):
return utc_datetime(datetime(year, month, day, hour, minute), local_value=False)
[docs] @classmethod
def now(cls):
return utc_datetime()
[docs]class WCronScheduleRecord(WScheduleRecord):
@verify_type('paranoid', task=WScheduleTask, task_group_id=(str, None))
@verify_value('paranoid', on_drop=lambda x: x is None or callable(x))
@verify_value('paranoid', on_wait=lambda x: x is None or callable(x))
@verify_type(schedule=WCronSchedule, omit_skipped=bool)
@verify_value(schedule=lambda x: isinstance(x, WCronLocalTZSchedule) or isinstance(x, WCronUTCSchedule))
def __init__(
self, cron_schedule, task, policy=None, task_group_id=None, on_drop=None, on_wait=None,
omit_skipped=True
):
WScheduleRecord.__init__(
self, task, policy=policy, task_group_id=task_group_id, on_drop=on_drop, on_wait=on_wait
)
self.__schedule = cron_schedule
self.__omit_skipped = omit_skipped
[docs] def cron_schedule(self):
return self.__schedule
[docs] def next_start(self):
cron = self.cron_schedule()
next_start = cron.next_start()
if isinstance(cron, WCronLocalTZSchedule):
return utc_datetime(dt=next_start)
elif isinstance(cron, WCronUTCSchedule):
return next_start
raise RuntimeError('Corrupted object!')
[docs] def complete(self):
self.cron_schedule().complete(omit_skipped=self.__omit_skipped)
[docs]class WCronTaskSource(WBasicTaskSource, WCriticalResource):
@verify_type('paranoid', scheduler_service=WSchedulerServiceProto)
def __init__(self, scheduler_service):
WBasicTaskSource.__init__(self, scheduler_service=scheduler_service)
WCriticalResource.__init__(self)
self.__tasks = []
self.__next_task = None
[docs] @verify_type(record=WCronScheduleRecord)
def add_record(self, record):
self.__add_record(record)
self.scheduler_service().update(task_source=self)
@WCriticalResource.critical_section()
@verify_type('paranoid', record=WCronScheduleRecord)
def __add_record(self, record):
self.__tasks.append(record)
self.__update(record)
@verify_type('paranoid', task=(WCronScheduleRecord, None))
def __update(self, task=None):
if task is not None:
next_start = task.next_start()
if self.__next_task is None or next_start < self.__next_task.next_start():
self.__next_task = task
elif len(self.__tasks) > 0:
next_task = self.__tasks[0]
for task in self.__tasks[1:]:
if next_task.next_start() is None:
next_task = task
elif task.next_start() is not None and task.next_start() < next_task.next_start():
next_task = task
self.__next_task = next_task
else:
self.__next_task = None
[docs] @WCriticalResource.critical_section()
def tasks_planned(self):
return len(self.__tasks)
[docs] @WCriticalResource.critical_section()
def has_records(self):
if self.__next_task is not None:
next_start = self.__next_task.next_start()
if next_start is not None and next_start <= utc_datetime():
result = [self.__next_task]
self.__next_task.complete()
self.__update()
return tuple(result)
[docs] @WCriticalResource.critical_section()
def next_start(self):
if self.__next_task is not None:
return self.__next_task.next_start()
[docs]class WInstantTaskSource(WBasicTaskSource, WCriticalResource):
__lock_acquiring_timeout__ = 5
""" Timeout with which critical section lock must be acquired
"""
@verify_type('paranoid', scheduler_service=WSchedulerServiceProto)
def __init__(self, scheduler_service):
WBasicTaskSource.__init__(self, scheduler_service=scheduler_service)
WCriticalResource.__init__(self)
self.__records = []
[docs] @verify_type('paranoid', record=WScheduleRecord)
def add_record(self, record):
self.__add_record(record)
self.scheduler_service().update(self)
@verify_type(record=WScheduleRecord)
@WCriticalResource.critical_section(timeout=__lock_acquiring_timeout__)
def __add_record(self, record):
self.__records.append(record)
[docs] @WCriticalResource.critical_section(timeout=__lock_acquiring_timeout__)
def has_records(self):
if len(self.__records) > 0:
result = self.__records.copy()
self.__records = []
return tuple(result)
[docs] @WCriticalResource.critical_section(timeout=__lock_acquiring_timeout__)
def next_start(self):
if len(self.__records) > 0:
return utc_datetime()
[docs] @WCriticalResource.critical_section(timeout=__lock_acquiring_timeout__)
def tasks_planned(self):
return len(self.__records)