Source code for wasp_general.task.scheduler.proto

# -*- coding: utf-8 -*-
# wasp_general/task/scheduler/proto.py
#
# Copyright (C) 2017 the wasp-general authors and contributors
# <see AUTHORS file>
#
# This file is part of wasp-general.
#
# wasp-general is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# wasp-general is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with wasp-general.  If not, see <http://www.gnu.org/licenses/>.

# noinspection PyUnresolvedReferences
from wasp_general.version import __author__, __version__, __credits__, __license__, __copyright__, __email__
# noinspection PyUnresolvedReferences
from wasp_general.version import __status__

import uuid
from abc import ABCMeta, abstractmethod
from enum import Enum
from datetime import datetime, timezone

from wasp_general.verify import verify_type, verify_value

from wasp_general.task.thread import WThreadTask


# noinspection PyAbstractClass
class WScheduleTask(WThreadTask):
    """ Task that can be run by a scheduler

    Every schedule task must be able:
      - to be stopped at any time
      - to return its status (running or stopped)
      - to notify when task end (thread event)

    note: derived classes must implement :meth:`.WThreadTask.thread_started` and
    :meth:`.WThreadTask.thread_stopped` methods in order to be instantiable

    Each task instance has a "unique" identifier, which is generated during construction
    """

    # Thread name prefix. String representation of a task identifier is appended to it
    __thread_name_prefix__ = 'ScheduledTask-'

    @verify_type('paranoid', thread_join_timeout=(int, float, None))
    def __init__(self, thread_join_timeout=None):
        """ Create new task

        :param thread_join_timeout: same as thread_join_timeout in :meth:`.WThreadTask.__init__` method
        """
        # the identifier must exist before the base constructor is called, since the thread name
        # is derived from it
        task_uid = self.generate_uid()
        self.__uid = task_uid

        thread_name = self.__thread_name_prefix__ + str(task_uid)
        WThreadTask.__init__(
            self,
            thread_name=thread_name,
            join_on_stop=True,
            ready_to_stop=True,
            thread_join_timeout=thread_join_timeout
        )

    def uid(self):
        """ Return the identifier that was generated for this task at construction time

        :return: value produced by :meth:`.WScheduleTask.generate_uid`
        """
        return self.__uid

    @classmethod
    def generate_uid(cls):
        """ Generate "random" "unique" identifier

        :return: UUID
        """
        new_uid = uuid.uuid4()
        return new_uid
class WScheduleRecord:
    """ Description of how a :class:`.WScheduleTask` should be run. It should be treated as a scheduler
    record, that may not have execution time.

    A record has a policy, that describes what a scheduler should do if it can not run the task at the
    specified moment. This policy is a recommendation only - a scheduler can omit it if (for example)
    a scheduler queue is full. In any case, if the task is dropped (skipped) or postponed (moved to a queue
    of waiting tasks) the correlated callback is called: "on_drop" for skipped tasks
    (via :meth:`.WScheduleRecord.task_dropped` method) and "on_wait" for postponed tasks
    (via :meth:`.WScheduleRecord.task_postponed` method)

    note: It is important that tasks with the same id (task_group_id) have the same postpone policy.
    If they do not have the same policy, then exception may be raised. No pre-checks are made to resolve this,
    because of unpredictable logic of different tasks from different sources
    """

    # TODO: add policy that resolves concurrency of running tasks (like skipping tasks, that is already running)

    class PostponePolicy(Enum):
        """ What should happen to a task if a scheduler is not able to run it (like if the scheduler
        limit of running tasks is reached).
        """
        wait = 1  # will stack every postponed task to execute them later (default)
        drop = 2  # drop this task if it can't be executed at the moment
        postpone_first = 3  # stack the first task and drop all the following tasks with the same task ID
        postpone_last = 4  # stack the last task and drop all the previous tasks with the same task ID

    @verify_type(task=WScheduleTask, task_group_id=(str, None))
    @verify_value(on_drop=lambda x: x is None or callable(x), on_wait=lambda x: x is None or callable(x))
    def __init__(self, task, policy=None, task_group_id=None, on_drop=None, on_wait=None):
        """ Create new schedule record

        :param task: task to run
        :param policy: postpone policy (:attr:`.WScheduleRecord.PostponePolicy.wait` is used if omitted)
        :param task_group_id: identifier that groups different scheduler records and single postpone policy
        :param on_drop: callback, that must be called if this task is skipped
        :param on_wait: callback, that must be called if this task is postponed

        :raise TypeError: if the policy is given, but it is not a :class:`.WScheduleRecord.PostponePolicy` member
        """
        if policy is None:
            # nothing was requested - fall back to the default policy
            policy = WScheduleRecord.PostponePolicy.wait
        elif not isinstance(policy, WScheduleRecord.PostponePolicy):
            raise TypeError('Invalid policy object type')

        self.__task = task
        self.__policy = policy
        self.__task_group_id = task_group_id
        self.__on_drop = on_drop
        self.__on_wait = on_wait

    def task(self):
        """ Return the task this record was created with

        :return: WScheduleTask
        """
        return self.__task

    def task_uid(self):
        """ Shortcut for self.task().uid()
        """
        scheduled_task = self.task()
        return scheduled_task.uid()

    def policy(self):
        """ Return postpone policy of this record

        :return: WScheduleRecord.PostponePolicy
        """
        return self.__policy

    def task_group_id(self):
        """ Return the group identifier of this record

        :return: str or None

        see :meth:`.WScheduleRecord.__init__`
        """
        return self.__task_group_id

    def task_postponed(self):
        """ Call the "on_wait" callback (if there is one). This method is executed by a scheduler when it
        postpones this task

        :return: result of the callback or None if no callback was specified
        """
        if self.__on_wait is None:
            return None
        return self.__on_wait()

    def task_dropped(self):
        """ Call the "on_drop" callback (if there is one). This method is executed by a scheduler when it
        skips this task

        :return: result of the callback or None if no callback was specified
        """
        if self.__on_drop is None:
            return None
        return self.__on_drop()
class WTaskSourceProto(metaclass=ABCMeta):
    """ Prototype for a scheduler record generator.

    :class:`.WSchedulerServiceProto` doesn't keep a schedule as a set of records. Instead, a service uses
    objects of this class as holders of scheduler records and checks if it is time to execute them.
    """

    @abstractmethod
    def has_records(self):
        """ Return records that should be run at the moment.

        :return: tuple of WScheduleRecord (tuple with one element at least) or None
        """
        raise NotImplementedError('This method is abstract')

    @abstractmethod
    def next_start(self):
        """ Return the moment when the next task should be executed.

        :return: datetime in UTC timezone
        """
        raise NotImplementedError('This method is abstract')

    @abstractmethod
    def tasks_planned(self):
        """ Return how many records (tasks) are planned to run

        :return: int
        """
        raise NotImplementedError('This method is abstract')

    @abstractmethod
    def scheduler_service(self):
        """ Return the scheduler service this source is associated with

        :return: WSchedulerServiceProto or None
        """
        raise NotImplementedError('This method is abstract')
class WRunningRecordRegistryProto(metaclass=ABCMeta):
    """ Prototype of a registry of running tasks. A registry executes a scheduler record
    (:class:`.WScheduleRecord`), creates and stores the related records (:class:`.WScheduleRecord`),
    and watches that these tasks are running
    """

    @abstractmethod
    @verify_type(schedule_record=WScheduleRecord)
    def exec(self, schedule_record):
        """ Execute the given scheduler record (no time checks are made at this method, task executes as is)

        :param schedule_record: record to execute

        :return: None
        """
        raise NotImplementedError('This method is abstract')

    @abstractmethod
    def running_records(self):
        """ Return records of tasks that are running

        :return: tuple of WScheduleRecord
        """
        raise NotImplementedError('This method is abstract')
class WSchedulerServiceProto(metaclass=ABCMeta):
    """ Prototype of a scheduler. A core of wasp_general.task.scheduler module
    """

    @abstractmethod
    @verify_type(task_source=(WTaskSourceProto, None))
    def update(self, task_source=None):
        """ Update task sources information about next start. Information is updated for the specified
        source or for all of them

        An implementation of this method must be thread-safe as different threads (different task sources,
        different registries) may modify scheduler internal state.

        :param task_source: if it is specified - then update information for this source only

        :return:
        """
        raise NotImplementedError('This method is abstract')