# -*- coding: utf-8 -*-
# wasp_general/task/thread_tracker.py
#
# Copyright (C) 2017 the wasp-general authors and contributors
# <see AUTHORS file>
#
# This file is part of wasp-general.
#
# Wasp-general is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Wasp-general is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with wasp-general. If not, see <http://www.gnu.org/licenses/>.
# noinspection PyUnresolvedReferences
from wasp_general.version import __author__, __version__, __credits__, __license__, __copyright__, __email__
# noinspection PyUnresolvedReferences
from wasp_general.version import __status__
from abc import ABCMeta, abstractmethod
import traceback
from enum import Enum
from wasp_general.verify import verify_type, verify_value
from wasp_general.task.thread import WThreadTask
from wasp_general.task.scheduler.proto import WScheduleTask, WScheduleRecord
from wasp_general.thread import WCriticalResource
from wasp_general.datetime import utc_datetime
[docs]class WTrackerEvents(Enum):
""" Possible tracking events
"""
start = 1 # start
stop = 2 # normal stop
termination = 3 # termination stop
exception = 4 # unhandled exception stop
wait = 5 # task has been postponed (scheduler event)
drop = 6 # task has been dropped (scheduler event)
[docs]class WThreadTrackerInfoStorageProto(metaclass=ABCMeta):
""" Prototype for a storage that keeps thread task events like start event, normal stop, termination, raised
unhandled exceptions) or scheduler events
"""
[docs] @abstractmethod
@verify_type(task=WThreadTask, event_details=(str, None))
def register_start(self, task, event_details=None):
""" Store start event
:param task: task that is starting
:param event_details: (optional) event details - any kind of data related to the given task and start \
event
:return: None
"""
raise NotImplementedError('This method is abstract')
[docs] @abstractmethod
@verify_type(task=WThreadTask, event_details=(str, None))
def register_stop(self, task, event_details=None):
""" Store stop event
:param task: task that stopped
:param event_details: (optional) event details - any kind of data related to the given task and stop \
event
:return: None
"""
raise NotImplementedError('This method is abstract')
[docs] @abstractmethod
@verify_type(task=WThreadTask, event_details=(str, None))
def register_termination(self, task, event_details=None):
""" Store termination event
:param task: task that was terminated
:param event_details: (optional) event details - any kind of data related to the given task and \
termination event
:return: None
"""
raise NotImplementedError('This method is abstract')
[docs] @abstractmethod
@verify_type(task=WThreadTask, raised_exception=Exception, exception_details=str, event_details=(str, None))
def register_exception(self, task, raised_exception, exception_details, event_details=None):
""" Store exception event
:param task: task that was terminated by unhandled exception
:param raised_exception: unhandled exception
:param exception_details: any kind of data related to the raised exception
:param event_details: (optional) event details - any kind of data related to the given task and
exception event
:return: None
"""
raise NotImplementedError('This method is abstract')
[docs] @abstractmethod
@verify_type(task=WThreadTask, event_details=(str, None))
def register_wait(self, task, event_details=None):
""" Store event of task postponing (this event is used in a scheduler classes)
:param task: task that was postponed
:param event_details: (optional) event details - any kind of data related to the given task and
postponing event
:return: None
"""
raise NotImplementedError('This method is abstract')
[docs] @abstractmethod
@verify_type(task=WThreadTask, event_details=(str, None))
def register_drop(self, task, event_details=None):
""" Store event of task drop (this event is used in a scheduler classes)
:param task: task that was dropped
:param event_details: (optional) event details - any kind of data related to the given task and
event of task drop
:return: None
"""
raise NotImplementedError('This method is abstract')
# noinspection PyAbstractClass
[docs]class WThreadTracker(WThreadTask):
""" Threaded task that may register its events (start event, normal stop fact, task termination fact,
unhandled exceptions)
:note: Since there is an extra work that should be done, this class may be inappropriate for low-latency
situation. Also, registering events should be done quickly because of this task joining timeout.
"""
@verify_type('paranoid', thread_name=(str, None), join_on_stop=bool, thread_join_timeout=(int, float, None))
@verify_type(tracker_storage=(WThreadTrackerInfoStorageProto, None), track_start=bool, track_stop=bool)
@verify_type(track_termination=bool, track_exception=bool)
def __init__(
self, tracker_storage=None, thread_name=None, join_on_stop=True, thread_join_timeout=None,
track_start=True, track_stop=True, track_termination=True, track_exception=True
):
""" Create new tracker
:param tracker_storage: storage that is used for registering eventd
:param thread_name: same as 'thread_name' in :meth:`.WThreadTask.__init__`
:param join_on_stop: same as 'join_on_stop' in :meth:`.WThreadTask.__init__`
:param thread_join_timeout: same as 'thread_join_timeout' in :meth:`.WThreadTask.__init__`
:param track_start: whether to register start event of this task or not
:param track_stop: whether to register stop event of this task or not
:param track_termination: whether to register termination event of this task or not
:param track_exception: whether to register unhandled exception event or not
"""
WThreadTask.__init__(
self,
thread_name=thread_name,
join_on_stop=join_on_stop,
ready_to_stop=True,
thread_join_timeout=thread_join_timeout
)
self.__tracker = tracker_storage
self.__track_start = track_start
self.__track_stop = track_stop
self.__track_termination = track_termination
self.__track_exception = track_exception
[docs] def tracker_storage(self):
""" Return linked storage
:return: WThreadTrackerInfoStorageProto or None
"""
return self.__tracker
[docs] def track_start(self):
""" Return True if this task is tracking "start-event" otherwise - False
:return: bool
"""
return self.__track_start
[docs] def track_stop(self):
""" Return True if this task is tracking "stop-event" otherwise - False
:return: bool
"""
return self.__track_stop
[docs] def track_termination(self):
""" Return True if this task is tracking "termination-event" otherwise - False
:return: bool
"""
return self.__track_termination
[docs] def track_exception(self):
""" Return True if this task is tracking unhandled exception event otherwise - False
:return: bool
"""
return self.__track_exception
# noinspection PyMethodMayBeStatic
[docs] @verify_type(event=WTrackerEvents)
def event_details(self, event):
""" Return task details that should be registered with a tracker storage
:param event: source event that requested details
:return: str or None
"""
return None
[docs] def start(self):
""" :meth:`.WThreadTask.start` implementation. Register (if required) start event by a tracker storage
:return: None
"""
tracker = self.tracker_storage()
if tracker is not None and self.track_start() is True:
details = self.event_details(WTrackerEvents.start)
tracker.register_start(self, event_details=details)
WThreadTask.start(self)
[docs] def thread_stopped(self):
""" :meth:`.WThreadTask.thread_stopped` implementation. Register (if required) stop and termination
event by a tracker storage
:return: None
"""
tracker = self.tracker_storage()
if tracker is not None:
try:
if self.ready_event().is_set() is True:
if self.track_stop() is True:
details = self.event_details(WTrackerEvents.stop)
tracker.register_stop(self, event_details=details)
elif self.exception_event().is_set() is False:
if self.track_termination() is True:
details = self.event_details(WTrackerEvents.termination)
tracker.register_termination(self, event_details=details)
except Exception as e:
self.thread_tracker_exception(e)
[docs] @verify_type(raised_exception=Exception)
def thread_exception(self, raised_exception):
""" :meth:`.WThreadTask.thread_exception` implementation. Register (if required) unhandled exception
event by a tracker storage
:param raised_exception: unhandled exception
:return: None
"""
tracker = self.tracker_storage()
if tracker is not None:
try:
if self.track_exception() is True:
details = self.event_details(WTrackerEvents.exception)
tracker.register_exception(
self,
raised_exception,
traceback.format_exc(),
event_details=details
)
except Exception as e:
self.thread_tracker_exception(e)
# noinspection PyMethodMayBeStatic
[docs] @verify_type(raised_exception=Exception)
def thread_tracker_exception(self, raised_exception):
""" Method is called whenever an exception is raised during registering a event
:param raised_exception: raised exception
:return: None
"""
print('Thread tracker execution was stopped by the exception. Exception: %s' % str(raised_exception))
print('Traceback:')
print(traceback.format_exc())
[docs]class WSimpleTrackerStorage(WCriticalResource, WThreadTrackerInfoStorageProto):
""" Simple :class:`.WThreadTrackerInfoStorageProto` implementation which stores events in a operation memory
"""
__critical_section_timeout__ = 1
""" Timeout for capturing a lock for critical sections
"""
[docs] class Record:
""" General record of single event
"""
@verify_type(record_type=WTrackerEvents, thread_task=WThreadTask)
@verify_type(event_details=(str, None))
def __init__(self, record_type, thread_task, event_details=None):
""" Create new record
:param record_type: tracking event
:param thread_task: original task
:param event_details: task details
"""
self.record_type = record_type
self.thread_task = thread_task
self.event_details = event_details
self.registered_at = utc_datetime()
[docs] class ExceptionRecord(Record):
""" Record for unhandled exception
"""
@verify_type('paranoid', task=WThreadTask, event_details=(str, None))
@verify_type(exception=Exception, exception_details=str)
def __init__(self, task, exception, exception_details, event_details=None):
WSimpleTrackerStorage.Record.__init__(
self, WTrackerEvents.exception, task, event_details=event_details
)
self.exception = exception
self.exception_details = exception_details
@verify_type(records_limit=(int, None), record_start=bool, record_stop=bool, record_termination=bool)
@verify_type(record_exception=bool, record_wait=bool, record_drop=bool)
def __init__(
self, records_limit=None, record_start=True, record_stop=True, record_termination=True,
record_exception=True, record_wait=True, record_drop=True
):
""" Create new storage
:param records_limit: number of records to keep (if record limit is reached - new record will
overwrite the oldest one)
:param record_start: whether to keep start events or not
:param record_stop: whether to keep normal stop events or not
:param record_termination: whether to keep termination stop events or not
:param record_exception: whether to keep unhandled exceptions events or not
:param record_wait: whether to keep postponing events or not
:param record_drop: whether to keep drop events or not
"""
WCriticalResource.__init__(self)
WThreadTrackerInfoStorageProto.__init__(self)
self.__limit = records_limit
self.__registry = []
self.__record_start = record_start
self.__record_stop = record_stop
self.__record_termination = record_termination
self.__record_exception = record_exception
self.__record_wait = record_wait
self.__record_drop = record_drop
[docs] def record_limit(self):
""" Return maximum number of records to keep
:return: int or None (for no limit)
"""
return self.__limit
[docs] def record_start(self):
""" Return True if this storage is saving start events, otherwise - False
:return: bool
"""
return self.__record_start
[docs] def record_stop(self):
""" Return True if this storage is saving normal stop events, otherwise - False
:return: bool
"""
return self.__record_stop
[docs] def record_termination(self):
""" Return True if this storage is saving termination stop events, otherwise - False
:return: bool
"""
return self.__record_termination
[docs] def record_exception(self):
""" Return True if this storage is saving unhandled exceptions events, otherwise - False
:return: bool
"""
return self.__record_exception
[docs] def record_wait(self):
""" Return True if this storage is saving postponing events, otherwise - False
:return: bool
"""
return self.__record_wait
[docs] def record_drop(self):
""" Return True if this storage is saving dropping events, otherwise - False
:return: bool
"""
return self.__record_drop
[docs] def register_start(self, task, event_details=None):
""" :meth:`.WSimpleTrackerStorage.register_start` method implementation
"""
if self.record_start() is True:
record_type = WTrackerEvents.start
record = WSimpleTrackerStorage.Record(record_type, task, event_details=event_details)
self.__store_record(record)
[docs] @verify_type(task=WThreadTask, details=(str, None))
def register_stop(self, task, event_details=None):
""" :meth:`.WSimpleTrackerStorage.register_stop` method implementation
"""
if self.record_stop() is True:
record_type = WTrackerEvents.stop
record = WSimpleTrackerStorage.Record(record_type, task, event_details=event_details)
self.__store_record(record)
[docs] @verify_type(task=WThreadTask, details=(str, None))
def register_termination(self, task, event_details=None):
""" :meth:`.WSimpleTrackerStorage.register_termination` method implementation
"""
if self.record_termination() is True:
record_type = WTrackerEvents.termination
record = WSimpleTrackerStorage.Record(record_type, task, event_details=event_details)
self.__store_record(record)
[docs] @verify_type(task=WThreadTask, raised_exception=Exception, exception_details=str, details=(str, None))
def register_exception(self, task, raised_exception, exception_details, event_details=None):
""" :meth:`.WSimpleTrackerStorage.register_exception` method implementation
"""
if self.record_exception() is True:
record = WSimpleTrackerStorage.ExceptionRecord(
task, raised_exception, exception_details, event_details=event_details
)
self.__store_record(record)
[docs] @verify_type(task=WThreadTask, details=(str, None))
def register_wait(self, task, event_details=None):
""" :meth:`.WSimpleTrackerStorage.register_wait` method implementation
"""
if self.record_wait() is True:
record_type = WTrackerEvents.wait
record = WSimpleTrackerStorage.Record(record_type, task, event_details=event_details)
self.__store_record(record)
[docs] @verify_type(task=WThreadTask, details=(str, None))
def register_drop(self, task, event_details=None):
""" :meth:`.WSimpleTrackerStorage.register_drop` method implementation
"""
if self.record_drop() is True:
record_type = WTrackerEvents.drop
record = WSimpleTrackerStorage.Record(record_type, task, event_details=event_details)
self.__store_record(record)
@WCriticalResource.critical_section(timeout=__critical_section_timeout__)
def __store_record(self, record):
""" Save record in a internal storage
:param record: record to save
:return: None
"""
if isinstance(record, WSimpleTrackerStorage.Record) is False:
raise TypeError('Invalid record type was')
limit = self.record_limit()
if limit is not None and len(self.__registry) >= limit:
self.__registry.pop(0)
self.__registry.append(record)
@WCriticalResource.critical_section(timeout=__critical_section_timeout__)
def __registry_copy(self):
""" Return copy of tracked events
:return: list of WSimpleTrackerStorage.Record
"""
return self.__registry.copy()
def __iter__(self):
""" Iterate over registered events (WSimpleTrackerStorage.Record). The newest record will be yield
the first
:return: generator
"""
registry = self.__registry_copy()
while len(registry) > 0:
yield registry.pop(-1)
[docs] @verify_type(requested_events=WTrackerEvents)
def last_record(self, task_uid, *requested_events):
""" Search over registered :class:`.WScheduleTask` instances and return the last record that matches
search criteria.
:param task_uid: uid of :class:`.WScheduleTask` instance
:param requested_events: target events types
:return: WSimpleTrackerStorage.Record or None
"""
for record in self:
if isinstance(record.thread_task, WScheduleTask) is False:
continue
if record.thread_task.uid() == task_uid:
if len(requested_events) == 0 or record.record_type in requested_events:
return record
[docs]class WScheduleRecordTracker(WScheduleRecord):
""" Schedule record that may register scheduler events related to a scheduled task like 'postponing' event
and 'drop' event.
:note: Since there is an extra work that should be done, this class may be inappropriate for low-latency
situation. Also, registering events should be done quickly because of this task joining timeout.
"""
@verify_type('paranoid', task=WScheduleTask, task_group_id=(str, None))
@verify_value('paranoid', on_drop=lambda x: x is None or callable(x))
@verify_value('paranoid', on_wait=lambda x: x is None or callable(x))
@verify_type(task=WThreadTracker)
@verify_type(track_wait=bool, track_drop=bool)
def __init__(
self, task, policy=None, task_group_id=None, on_drop=None, on_wait=None,
track_wait=True, track_drop=True
):
""" Create new schedule record, that may track schedule events
:param task: same as task in :meth:`.WScheduleRecord.__init__` except it must be \
:class:`.WThreadTracker` instance
:param policy: same as policy in :meth:`.WScheduleRecord.__init__`
:param task_group_id: same as task_group_id in :meth:`.WScheduleRecord.__init__`
:param on_drop: same as on_drop in :meth:`.WScheduleRecord.__init__`
:param on_wait: same as on_wait in :meth:`.WScheduleRecord.__init__`
:param track_wait: whether to register postponing event of this record or not
:param track_drop: whether to register drop event of this record or not
"""
WScheduleRecord.__init__(
self, task, policy=policy, task_group_id=task_group_id, on_drop=on_drop, on_wait=on_wait
)
self.__track_wait = track_wait
self.__track_drop = track_drop
[docs] def track_wait(self):
""" Return True if this task is tracking "postponing" event otherwise - False
:return: bool
"""
return self.__track_wait
[docs] def track_drop(self):
""" Return True if this task is tracking "drop" event otherwise - False
:return: bool
"""
return self.__track_drop
[docs] def task_postponed(self):
""" Track (if required) postponing event and do the same job as :meth:`.WScheduleRecord.task_postponed`
method do
:return: None
"""
tracker = self.task().tracker_storage()
if tracker is not None and self.track_wait() is True:
details = self.task().event_details(WTrackerEvents.wait)
tracker.register_wait(self.task(), event_details=details)
WScheduleRecord.task_postponed(self)
[docs] def task_dropped(self):
""" Track (if required) drop event and do the same job as :meth:`.WScheduleRecord.task_dropped`
method do
:return: None
"""
tracker = self.task().tracker_storage()
if tracker is not None and self.track_drop() is True:
details = self.task().event_details(WTrackerEvents.drop)
tracker.register_drop(self.task(), event_details=details)
WScheduleRecord.task_dropped(self)