wasp_general.task.scheduler package

Submodules

wasp_general.task.scheduler.proto module

class wasp_general.task.scheduler.proto.WRunningRecordRegistryProto[source]

Bases: object

This class describes a registry of running tasks. It executes a scheduler record (WScheduleRecord), creates and stores the related records (WScheduleRecord), and watches these tasks while they are running.

exec(schedule_record)[source]

Execute the given scheduler record (no time checks are made in this method; the task is executed as is)

Parameters:schedule_record – record to execute
Returns:None
running_records()[source]

Return tuple of running tasks

Returns:tuple of WScheduleRecord
class wasp_general.task.scheduler.proto.WScheduleRecord(task, policy=None, task_group_id=None, on_drop=None, on_wait=None)[source]

Bases: object

This class specifies how a WScheduleTask should run. It should be treated as a scheduler record that may not have an execution time.

WScheduleRecord has a policy that describes what a scheduler should do if it cannot run this task at the specified moment. This policy is a recommendation, and a scheduler may ignore it if (for example) the scheduler queue is full. In any case, if this task is dropped (skipped) or postponed (moved to a queue of waiting tasks), the corresponding callback is called: the “on_drop” callback for skipped tasks (invoked via the WScheduleRecord.task_dropped() method) and the “on_wait” callback for postponed tasks (invoked via the WScheduleRecord.task_postponed() method).

note: It is important that tasks with the same id (task_group_id) have the same postpone policy. If they do not, an exception may be raised. No pre-checks are made to detect this, because the logic of different tasks from different sources is unpredictable.
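The callback flow described above can be sketched as follows. This is a stand-alone illustration and not the library's code: `SketchPolicy` and `SketchRecord` are hypothetical stand-ins that only mirror the constructor signature and method names documented here, and the fallback to the `wait` policy is an assumption.

```python
from enum import Enum


class SketchPolicy(Enum):
    # Same member names and values as listed for WScheduleRecord.PostponePolicy
    wait = 1
    drop = 2
    postpone_first = 3
    postpone_last = 4


class SketchRecord:
    """Hypothetical stand-in for WScheduleRecord (illustration only)."""

    def __init__(self, task, policy=None, task_group_id=None, on_drop=None, on_wait=None):
        self._task = task
        # Assumption: fall back to "wait" when no policy is given
        self._policy = policy if policy is not None else SketchPolicy.wait
        self._task_group_id = task_group_id
        self._on_drop = on_drop
        self._on_wait = on_wait

    def task(self):
        return self._task

    def policy(self):
        return self._policy

    def task_group_id(self):
        return self._task_group_id

    def task_dropped(self):
        # Called by a scheduler when it skips the task
        if self._on_drop is not None:
            self._on_drop()

    def task_postponed(self):
        # Called by a scheduler when it moves the task to the waiting queue
        if self._on_wait is not None:
            self._on_wait()


events = []
record = SketchRecord(
    task="backup",  # any object here; a real record would hold a WScheduleTask
    policy=SketchPolicy.drop,
    task_group_id="nightly",
    on_drop=lambda: events.append("dropped"),
    on_wait=lambda: events.append("postponed"),
)
record.task_dropped()
record.task_postponed()
print(events)
```

Keeping the callbacks optional lets a scheduler call task_dropped() or task_postponed() unconditionally, whether or not the record owner cares about the notification.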

class PostponePolicy[source]

Bases: enum.Enum

Specifies what should happen to this task if a scheduler is unable to run it (for example, if the scheduler's limit of running tasks is reached).

drop = 2
postpone_first = 3
postpone_last = 4
wait = 1
policy()[source]

Return postpone policy

Returns:WScheduleRecord.PostponePolicy
task()[source]

Return task that should be run

Returns:WScheduleTask
task_dropped()[source]

Call the “on_drop” callback. This method is executed by a scheduler when it skips this task

Returns:None
task_group_id()[source]

Return task group id

Returns:str or None

see WScheduleRecord.__init__()

task_postponed()[source]

Call the “on_wait” callback. This method is executed by a scheduler when it postpones this task

Returns:None
task_uid()[source]

Shortcut for self.task().uid()

class wasp_general.task.scheduler.proto.WScheduleTask(thread_join_timeout=None)[source]

Bases: wasp_general.task.thread.WThreadTask

This class represents a task that may be run by a scheduler. Every scheduled task must be able:

  • to be stopped at any time
  • to return its status (running or stopped)
  • to notify when the task ends (thread event)

note: derived classes must implement WThreadTask.thread_started() and WThreadTask.thread_stopped() methods in order to be instantiable
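The three requirements above can be illustrated with plain `threading` primitives. This is a minimal sketch under stated assumptions: `SketchTask` is a hypothetical class that does not derive from WScheduleTask or WThreadTask, and its method names are chosen for the example only.

```python
import threading


class SketchTask:
    """Hypothetical task; not derived from WScheduleTask."""

    def __init__(self):
        self._stop_requested = threading.Event()
        self._finished = threading.Event()  # set when the task ends
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        try:
            # Work in small steps so that a stop request is noticed quickly
            while not self._stop_requested.wait(timeout=0.01):
                pass
        finally:
            self._finished.set()

    def start(self):
        self._thread.start()

    def stop(self):
        # Requirement 1: the task can be stopped at any time
        self._stop_requested.set()
        self._thread.join()

    def running(self):
        # Requirement 2: the task reports its status
        return self._thread.is_alive()

    def finished_event(self):
        # Requirement 3: other threads can wait on this event
        return self._finished


task = SketchTask()
task.start()
was_running = task.running()
task.stop()
```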

Each task instance has “unique” identifier

classmethod generate_uid()[source]

Return “random” “unique” identifier

Returns:UUID
uid()[source]
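One way to obtain a “random” “unique” identifier of type UUID is the standard `uuid.uuid4()` function. The sketch below assumes that approach; `SketchUidTask` is a hypothetical class, and the text above does not state which UUID version the library actually uses.

```python
import uuid


class SketchUidTask:
    """Hypothetical class; shows one way to give each instance an identifier."""

    def __init__(self):
        self._uid = self.generate_uid()

    @classmethod
    def generate_uid(cls):
        # uuid4() builds a random UUID; collisions are extremely unlikely, not impossible
        return uuid.uuid4()

    def uid(self):
        return self._uid


first, second = SketchUidTask(), SketchUidTask()
```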
class wasp_general.task.scheduler.proto.WSchedulerServiceProto[source]

Bases: object

Represents a scheduler. This is the core of the wasp_general.task.scheduler module

update(task_source=None)[source]

Update information about the next start of task sources, either for the specified source or for all of them

Parameters:task_source – if specified, update information for this source only

An implementation of this method must be thread-safe, because different threads (different task sources, different registries) may modify the scheduler's internal state.
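A common way to meet the thread-safety requirement is to guard the shared state with a lock. The following sketch assumes that approach; `SketchService` and `FixedSource` are hypothetical classes, and the library's own synchronization mechanism may differ.

```python
import threading
from datetime import datetime, timezone


class SketchService:
    """Hypothetical scheduler core that guards its state with a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._sources = []
        self._next_start = {}  # source -> datetime of its next start

    def add_task_source(self, task_source):
        with self._lock:
            self._sources.append(task_source)

    def update(self, task_source=None):
        with self._lock:
            targets = self._sources if task_source is None else [task_source]
            for source in targets:
                self._next_start[source] = source.next_start()

    def next_starts(self):
        with self._lock:
            return dict(self._next_start)


class FixedSource:
    """Hypothetical source that always reports the same start time."""

    def __init__(self, when):
        self._when = when

    def next_start(self):
        return self._when


service = SketchService()
sources = [FixedSource(datetime(2030, 1, day, tzinfo=timezone.utc)) for day in (1, 2, 3)]
for source in sources:
    service.add_task_source(source)

# Several threads call update() concurrently, each for its own source
threads = [threading.Thread(target=service.update, args=(s,)) for s in sources]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Because the lock is not reentrant, a source's next_start() must not call back into the service in this sketch.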

class wasp_general.task.scheduler.proto.WTaskSourceProto[source]

Bases: object

Prototype for a scheduler record generator. WSchedulerServiceProto does not keep its schedule as a set of records. Instead, a service uses this class as a holder of scheduler records and checks whether it is time to execute them.

has_records()[source]

Return records that should be run at the moment.

Returns:tuple of WScheduleRecord (tuple with one element at least) or None
next_start()[source]

Return datetime when the next task should be executed.

Returns:datetime in UTC timezone
scheduler_service()[source]

Return associated scheduler service

Returns:WSchedulerServiceProto or None
tasks_planned()[source]

Return number of records (tasks) that are planned to run

Returns:int
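The contract of the four methods above can be illustrated with a small heap-based source. This is a sketch only: `SketchTaskSource` is hypothetical, it does not derive from WTaskSourceProto, and the `now` argument of has_records() and the two-argument add_record() are additions made to keep the example deterministic.

```python
import heapq
from datetime import datetime, timedelta, timezone


class SketchTaskSource:
    """Hypothetical source that keeps (start time, record) pairs in a heap."""

    def __init__(self):
        self._heap = []
        self._counter = 0  # tie-breaker so that records are never compared

    def add_record(self, start, record):
        heapq.heappush(self._heap, (start, self._counter, record))
        self._counter += 1

    def has_records(self, now=None):
        # Pop every record whose start time has been reached
        now = now if now is not None else datetime.now(timezone.utc)
        ready = []
        while self._heap and self._heap[0][0] <= now:
            ready.append(heapq.heappop(self._heap)[2])
        return tuple(ready) if ready else None

    def next_start(self):
        # Earliest pending start time (UTC), or None if nothing is planned
        return self._heap[0][0] if self._heap else None

    def tasks_planned(self):
        return len(self._heap)


now = datetime(2030, 1, 1, 12, 0, tzinfo=timezone.utc)
source = SketchTaskSource()
source.add_record(now - timedelta(minutes=1), "overdue")
source.add_record(now + timedelta(hours=1), "later")

ready = source.has_records(now)
```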

wasp_general.task.scheduler.scheduler module

class wasp_general.task.scheduler.scheduler.WPostponedRecordRegistry(maximum_records=None)[source]

Bases: object

Registry for postponed records.

has_records()[source]

Check if there are postponed records. True if there is at least one postponed record, False otherwise

Returns:bool
maximum_records()[source]

Return maximum number of records to postpone

Returns:int
postpone(record)[source]

Postpone (if required) the given task. The actual action depends on the task's postpone policy

Parameters:record – record to postpone
Returns:None
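How a postpone policy might drive the registry can be sketched as below. The meaning of each policy is an assumption inferred from the member names (`wait` queues every record, `drop` skips the record, `postpone_first` keeps the earliest queued record of a group, `postpone_last` keeps the latest one); the text above does not confirm these semantics. `SketchPostponedRegistry` is hypothetical, and records are plain tuples instead of WScheduleRecord objects.

```python
from collections import deque
from enum import Enum


class Policy(Enum):
    wait = 1
    drop = 2
    postpone_first = 3
    postpone_last = 4


class SketchPostponedRegistry:
    """Hypothetical registry; records are (policy, group_id, name) tuples."""

    def __init__(self, maximum_records=None):
        self._maximum = maximum_records
        self._queue = deque()
        self.dropped = []

    def has_records(self):
        return len(self._queue) > 0

    def postpone(self, record):
        policy, group_id, _ = record
        if policy is Policy.drop:
            self.dropped.append(record)
            return
        same_group = [r for r in self._queue if r[1] == group_id]
        if policy is Policy.postpone_first and same_group:
            # Assumed meaning: an earlier record of this group stays, the new one is skipped
            self.dropped.append(record)
            return
        if policy is Policy.postpone_last:
            # Assumed meaning: the new record replaces earlier ones of this group
            for old in same_group:
                self._queue.remove(old)
                self.dropped.append(old)
        if self._maximum is not None and len(self._queue) >= self._maximum:
            self.dropped.append(record)  # queue is full
            return
        self._queue.append(record)

    def queued(self):
        return tuple(self._queue)


registry = SketchPostponedRegistry(maximum_records=2)
registry.postpone((Policy.postpone_last, "sync", "sync-1"))
registry.postpone((Policy.postpone_last, "sync", "sync-2"))
registry.postpone((Policy.drop, None, "report"))
```

In a fuller version, each entry added to `dropped` would trigger the record's task_dropped() callback and each queued entry its task_postponed() callback.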
class wasp_general.task.scheduler.scheduler.WRunningRecordRegistry(watchdog_cls=None, thread_name_suffix=None)[source]

Bases: wasp_general.thread.WCriticalResource, wasp_general.task.scheduler.proto.WRunningRecordRegistryProto, wasp_general.task.thread.WPollingThreadTask

Registry of started scheduled records. Its WRunningRecordRegistry.cleanup_event() event is set when any running scheduled task stops. This event starts the internal cleanup process (descriptors that were created for the record are removed).
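The cleanup-event pattern described above can be sketched with `threading.Event`. This is an illustration under assumptions, not the library's implementation: `SketchRunningRegistry` is hypothetical, it runs plain callables instead of schedule records, and it tracks finished names in a set rather than using watchdog objects.

```python
import threading


class SketchRunningRegistry:
    """Hypothetical registry that tracks running callables by name."""

    def __init__(self):
        self._lock = threading.Lock()
        self._running = {}  # name -> worker thread
        self._finished = set()  # names whose work has ended
        self._cleanup_event = threading.Event()

    def cleanup_event(self):
        return self._cleanup_event

    def exec(self, name, func):
        def worker():
            try:
                func()
            finally:
                with self._lock:
                    self._finished.add(name)
                # Signal that at least one task has stopped
                self._cleanup_event.set()

        thread = threading.Thread(target=worker, daemon=True)
        with self._lock:
            self._running[name] = thread
        thread.start()

    def running_records(self):
        with self._lock:
            return tuple(self._running)

    def cleanup(self):
        # Forget every entry that has reported its end
        with self._lock:
            self._cleanup_event.clear()
            for name in self._finished:
                self._running.pop(name, None)
            self._finished.clear()


release = threading.Event()
registry = SketchRunningRegistry()
registry.exec("quick", lambda: None)
registry.exec("slow", release.wait)

registry.cleanup_event().wait(timeout=5)  # wait until some task has stopped
registry.cleanup()
remaining = registry.running_records()
release.set()  # let the slow task finish
```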

cleanup()[source]

Do cleanup (stop and remove watchdogs that are no longer needed)

Returns:None
cleanup_event()[source]

Return “cleanup” event

Returns:Event
exec(record)[source]

Start the given schedule record (no time checks are made by this method; the task is started as is)

Parameters:record – schedule record to start
Returns:None
running_records()[source]

Return schedule records that are running at the moment

Returns:tuple of WScheduleRecord
stop_running_tasks()[source]

Terminate all the running tasks

Returns:None
task_finished(watchdog)[source]

Handle/process scheduled task stop

Parameters:watchdog – watchdog of task that was stopped
Returns:None
thread_stopped()[source]

Handle registry stop

Returns:None
watchdog_class()[source]

Return watchdog class that is used by this registry

Returns:WSchedulerWatchdog class or subclass
class wasp_general.task.scheduler.scheduler.WSchedulerService(maximum_running_records=None, running_record_registry=None, maximum_postponed_records=None, postponed_record_registry=None, thread_name_suffix=None)[source]

Bases: wasp_general.thread.WCriticalResource, wasp_general.task.scheduler.proto.WSchedulerServiceProto, wasp_general.task.thread.WPollingThreadTask

Main scheduler service. This class unites different registries to present entire scheduler

add_task_source(task_source)[source]

Add a task source

Parameters:task_source – task source to add
Returns:None
maximum_postponed_records()[source]

Return the maximum number of tasks that can be postponed

Returns:int or None (for no limit)
maximum_running_records()[source]

Return the maximum number of tasks that can run simultaneously

Returns:int
records_status()[source]

Return number of running and postponed tasks

Returns:tuple of two ints (first - running tasks, second - postponed tasks)
running_records()[source]

Return scheduled tasks that are running at the moment

Returns:tuple of WScheduleRecord
task_sources()[source]

Return task sources that were added to this scheduler

Returns:tuple of WTaskSourceProto
thread_started()[source]

Start required registries and start this scheduler

Returns:None
thread_stopped()[source]

Stop registries and this scheduler

Returns:None
update(task_source=None)[source]

Recheck next start of tasks from all the sources (or from the given one only)

Parameters:task_source – if defined - source to check
Returns:None
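The interplay between the running limit and the postponed queue can be sketched as follows. This is a simplified, single-threaded illustration: `SketchScheduler`, `submit()` and `record_finished()` are hypothetical names, and the rule that a finished record frees a slot for the oldest postponed one is an assumption, not documented behaviour.

```python
class SketchScheduler:
    """Hypothetical scheduler that either runs or postpones a record."""

    def __init__(self, maximum_running_records=1):
        self._maximum = maximum_running_records
        self._running = []
        self._postponed = []

    def submit(self, record):
        # Run the record if a slot is free, otherwise postpone it
        if len(self._running) < self._maximum:
            self._running.append(record)
        else:
            self._postponed.append(record)

    def record_finished(self, record):
        self._running.remove(record)
        # Assumption: the oldest postponed record takes the freed slot
        if self._postponed:
            self._running.append(self._postponed.pop(0))

    def records_status(self):
        # (running tasks, postponed tasks), as in records_status() above
        return len(self._running), len(self._postponed)


scheduler = SketchScheduler(maximum_running_records=2)
for name in ("a", "b", "c"):
    scheduler.submit(name)
status_when_full = scheduler.records_status()
scheduler.record_finished("a")
status_after_finish = scheduler.records_status()
```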
class wasp_general.task.scheduler.scheduler.WSchedulerWatchdog(record, registry)[source]

Bases: wasp_general.thread.WCriticalResource, wasp_general.task.thread.WPollingThreadTask

Class that watches the execution of a scheduled task. Each scheduled task has its own watchdog. The watchdog unregisters a stopped task from the registry of running tasks.

classmethod create(record, registry)[source]

Core method for watchdog creation. Derived classes may redefine this method in order to change watchdog creation process

Parameters:
  • record – schedule record that is ready to be executed
  • registry – registry that created this watchdog and that must be notified when the scheduled task stops
Returns:

record()[source]

Return scheduler record

Returns:WScheduleRecord
registry()[source]

Return parent registry

Returns:WRunningRecordRegistry
start()[source]

Start scheduled task and start watching

Returns:None
thread_started()[source]

Start watchdog thread function

Returns:None
thread_stopped()[source]

Stop scheduled task because of watchdog stop

Returns:None
class wasp_general.task.scheduler.scheduler.WTaskSourceRegistry[source]

Bases: object

Registry of task sources. It works as a dynamic queue: every task source notifies this registry when its next task should be started, and the registry fetches the tasks that are about to start. The registry is able to return schedule records from different sources at one time.
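The dynamic-queue behaviour described above can be sketched as a mapping from each source to its next start time. Both classes below are hypothetical, and the explicit `now` argument of check() and has_records() is an addition that keeps the example deterministic.

```python
from datetime import datetime, timezone


class ListSource:
    """Hypothetical source holding (start, record) pairs."""

    def __init__(self, *planned):
        self._planned = sorted(planned)

    def next_start(self):
        return self._planned[0][0] if self._planned else None

    def has_records(self, now):
        ready = tuple(r for s, r in self._planned if s <= now)
        self._planned = [(s, r) for s, r in self._planned if s > now]
        return ready or None


class SketchSourceRegistry:
    """Hypothetical registry that remembers the next start of every source."""

    def __init__(self):
        self._next_start = {}

    def add_source(self, task_source):
        self._next_start[task_source] = None
        self.update(task_source)

    def update(self, task_source=None):
        targets = list(self._next_start) if task_source is None else [task_source]
        for source in targets:
            self._next_start[source] = source.next_start()

    def check(self, now):
        # Collect ready records from every source whose start time has come
        records = []
        for source, start in list(self._next_start.items()):
            if start is not None and start <= now:
                records.extend(source.has_records(now) or ())
                self.update(source)
        return tuple(records) if records else None


def at(hour):
    return datetime(2030, 1, 1, hour, tzinfo=timezone.utc)


registry = SketchSourceRegistry()
registry.add_source(ListSource((at(9), "mail"), (at(15), "report")))
registry.add_source(ListSource((at(10), "backup")))
ready = registry.check(at(12))
```

A single check() call here returns records from both sources, which mirrors the statement that the registry can return schedule records from different sources at one time.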

add_source(task_source)[source]

Add a new task source

Parameters:task_source
Returns:None
check()[source]

Check if there are records that are ready to start and return them if there are any

Returns:tuple of WScheduleRecord or None (if there are no tasks to start)
task_sources()[source]

Return task sources that were added to this registry

Returns:tuple of WTaskSourceProto
update(task_source=None)[source]

Recheck next start of records from all the sources (or from the given one only)

Parameters:task_source – if defined - source to check
Returns:None

wasp_general.task.scheduler.task_source module

class wasp_general.task.scheduler.task_source.WBasicTaskSource(scheduler_service)[source]

Bases: wasp_general.task.scheduler.proto.WTaskSourceProto

scheduler_service()[source]
class wasp_general.task.scheduler.task_source.WCronLocalTZSchedule(start_datetime=None, minute=None, hour=None, day_of_month=None, day_of_week=None, month=None)[source]

Bases: wasp_general.task.scheduler.task_source.WCronSchedule

class wasp_general.task.scheduler.task_source.WCronSchedule(start_datetime=None, minute=None, hour=None, day_of_month=None, day_of_week=None, month=None)[source]

Bases: object

complete(omit_skipped=True)[source]
day_iterator(year, month)[source]
day_of_month()[source]
day_of_week()[source]
final_datetime()[source]
classmethod from_string(scheduled)[source]
classmethod from_string_tokens(*tokens)[source]
hour()[source]
minute()[source]
month()[source]
month_iterator()[source]
next_start()[source]
no_frequency()[source]
classmethod now()[source]
start_datetime()[source]
time_iterator(year, month, day)[source]
update()[source]
class wasp_general.task.scheduler.task_source.WCronScheduleRecord(cron_schedule, task, policy=None, task_group_id=None, on_drop=None, on_wait=None, omit_skipped=True)[source]

Bases: wasp_general.task.scheduler.proto.WScheduleRecord

complete()[source]
cron_schedule()[source]
next_start()[source]
class wasp_general.task.scheduler.task_source.WCronTaskSource(scheduler_service)[source]

Bases: wasp_general.task.scheduler.task_source.WBasicTaskSource, wasp_general.thread.WCriticalResource

add_record(record)[source]
has_records()[source]
next_start()[source]
tasks_planned()[source]
class wasp_general.task.scheduler.task_source.WCronUTCSchedule(start_datetime=None, minute=None, hour=None, day_of_month=None, day_of_week=None, month=None)[source]

Bases: wasp_general.task.scheduler.task_source.WCronSchedule

classmethod now()[source]
class wasp_general.task.scheduler.task_source.WInstantTaskSource(scheduler_service)[source]

Bases: wasp_general.task.scheduler.task_source.WBasicTaskSource, wasp_general.thread.WCriticalResource

add_record(record)[source]
has_records()[source]
next_start()[source]
tasks_planned()[source]

Module contents