Source code for wasp_general.command.context

# -*- coding: utf-8 -*-
# wasp_general/command/context.py
#
# Copyright (C) 2016 the wasp-general authors and contributors
# <see AUTHORS file>
#
# This file is part of wasp-general.
#
# Wasp-general is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Wasp-general is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with wasp-general.  If not, see <http://www.gnu.org/licenses/>.

# TODO: tests require

# noinspection PyUnresolvedReferences
from wasp_general.version import __author__, __version__, __credits__, __license__, __copyright__, __email__
# noinspection PyUnresolvedReferences
from wasp_general.version import __status__

from abc import ABCMeta, abstractmethod

from wasp_general.verify import verify_type, verify_value
from wasp_general.command.command import WCommandProto
from wasp_general.composer import WComposerProto


[docs]class WContextProto(metaclass=ABCMeta): """ Represent context configuration """
[docs] @abstractmethod def context_name(self): """ Return this context name :return: str """ raise NotImplementedError('This method is abstract')
[docs] @abstractmethod def context_value(self): """ Return this context value (can be None) :return: str or None """ raise NotImplementedError('This method is abstract')
[docs] @abstractmethod def linked_context(self): """ Return link to 'parent'/'higher' context :return: WContextProto or None """ raise NotImplementedError('This method is abstract')
def __len__(self): """ Return linked context count :return: int """ return len([x for x in self]) def __iter__(self): """Iterate over :return: """ context = self while context is not None: yield context context = context.linked_context() def __eq__(self, other): """ Compare two context. Two context are equal if they have the same context_name and each their own linked context are equal also. :param other: context to compare :return: bool """ if isinstance(other, WContextProto) is False: return False context_a = self context_b = other while context_a is not None and context_b is not None: if context_a.context_name() != context_b.context_name(): return False context_a = context_a.linked_context() context_b = context_b.linked_context() if context_b is not None: return False elif context_a is not None: return False return True
[docs]class WContext(WContextProto): """ :class:`.WContextProto` implementation """ @verify_type(context_name=str, context_value=(str, None), linked_context=(WContextProto, None)) def __init__(self, context_name, context_value=None, linked_context=None): """ Create new context request :param context_name: context name :param context_value: context value :param linked_context: linked context """ self.__context_name = context_name self.__context_value = context_value self.__linked_context = linked_context
[docs] def context_name(self): """ :meth:`.WContextProto.context_name` implementation """ return self.__context_name
[docs] def context_value(self): """ :meth:`.WContextProto.context_value` implementation """ return self.__context_value
[docs] def linked_context(self): """ :meth:`.WContextProto.linked_context` implementation """ return self.__linked_context
[docs] @classmethod @verify_type(context=(WContextProto, None)) def export_context(cls, context): """ Export the specified context to be capable context transferring :param context: context to export :return: tuple """ if context is None: return result = [(x.context_name(), x.context_value()) for x in context] result.reverse() return tuple(result)
[docs] @classmethod @verify_type(context=(tuple, list, None)) def import_context(cls, context): """ Import context to corresponding WContextProto object (:meth:`WContext.export_context` reverse operation) :param context: context to import :return: WContext """ if context is None or len(context) == 0: return result = WContext(context[0][0], context[0][1]) for iter_context in context[1:]: result = WContext(iter_context[0], context_value=iter_context[1], linked_context=result) return result
[docs] @classmethod @verify_type(context_specs=str) def specification(cls, *context_specs): """ Return linked context as adapter specification (is used by :class:`.WCommandContextAdapter`) :param context_specs: context names :return: WContext """ import_data = [] for name in context_specs: import_data.append((name, None)) return cls.import_context(import_data)
[docs]class WContextComposer(WComposerProto):
[docs] @verify_type('paranoid', obj_spec=(tuple, list, None)) def compose(self, obj_spec): return WContext.import_context(obj_spec)
[docs] @verify_type('paranoid', obj=(WContextProto, None)) def decompose(self, obj): return WContext.export_context(obj)
[docs]class WCommandContextAdapter(metaclass=ABCMeta): """ Adapter is used for command tokens modification """ @verify_type(context_specifications=(WContextProto, None)) def __init__(self, context_specifications): """ Create adapter :param context_specifications: context for what this adapter works """ self.__spec = context_specifications
[docs] def specification(self): """ Return adapter specification :return: WContextProto or None """ return self.__spec
[docs] @verify_type(command_context=(WContextProto, None)) def match(self, command_context=None, **command_env): """ Check if context request is compatible with adapters specification. True - if compatible, False - otherwise :param command_context: context to check :param command_env: command environment :return: bool """ spec = self.specification() if command_context is None and spec is None: return True elif command_context is not None and spec is not None: return command_context == spec return False
[docs] @abstractmethod @verify_type(command_tokens=str, command_context=(WContextProto, None)) def adapt(self, *command_tokens, command_context=None, **command_env): """ Adapt the given command tokens with this adapter :param command_tokens: command tokens to adapt :param command_context: context :param command_env: command environment :return: list of str """ raise NotImplementedError('This method is abstract')
[docs]class WCommandContext(WCommandProto): """ Command that can be adapted by a context """ @verify_type(command=WCommandProto, context_adapter=WCommandContextAdapter) def __init__(self, base_command, context_adapter): """ Create new command :param base_command: basic command that does real magic :param context_adapter: adapter for command tokens modification """ WCommandProto.__init__(self) self.__command = base_command self.__adapter = context_adapter
[docs] def original_command(self): """ Return source command :return: WCommandProto """ return self.__command
[docs] def adapter(self): """ Return command adapter :return: WCommandAdapter """ return self.__adapter
[docs] @verify_type('paranoid', command_tokens=str, command_context=(WContextProto, None)) def match(self, *command_tokens, command_context=None, **command_env): """ Match command :param command_tokens: command tokens to check :param command_context: command context :param command_env: command environment :return: bool """ if self.adapter().match(command_context, **command_env) is False: return False command_tokens = self.adapter().adapt(*command_tokens, command_context=command_context, **command_env) return self.original_command().match(*command_tokens, command_context=command_context, **command_env)
[docs] @verify_type('paranoid', command_tokens=str, command_context=(WContextProto, None)) def exec(self, *command_tokens, command_context=None, **command_env): """ Execute command :param command_tokens: command tokens to execute :param command_context: command context :param command_env: command environment :return: WCommandResultProto """ if self.adapter().match(command_context, **command_env) is False: cmd = WCommandProto.join_tokens(*command_tokens) spec = self.adapter().specification() if spec is not None: spec = [x.context_name() for x in spec] spec.reverse() spec = ','.join(spec) raise RuntimeError('Command mismatch: %s (context: %s)' % (cmd, spec)) command_tokens = self.adapter().adapt(*command_tokens, command_context=command_context, **command_env) return self.original_command().exec(*command_tokens, command_context=command_context, **command_env)