Source code for wasp_general.command.enhanced

# -*- coding: utf-8 -*-
# wasp_general/command/enhanced.py
#
# Copyright (C) 2017 the wasp-general authors and contributors
# <see AUTHORS file>
#
# This file is part of wasp-general.
#
# Wasp-general is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Wasp-general is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with wasp-general.  If not, see <http://www.gnu.org/licenses/>.

# TODO: document the code
# TODO: write tests for the code

# noinspection PyUnresolvedReferences
from wasp_general.version import __author__, __version__, __credits__, __license__, __copyright__, __email__
# noinspection PyUnresolvedReferences
from wasp_general.version import __status__

import re
from abc import abstractmethod
from enum import Enum

from wasp_general.verify import verify_type, verify_value, verify_subclass
from wasp_general.command.command import WCommandProto


[docs]class WCommandArgumentParsingError(Exception): pass
[docs]class WCommandArgumentDescriptor:
[docs] class ArgumentCastingHelperProto:
[docs] @abstractmethod @verify_type(argument_name=str) @verify_value(argument_name=lambda x: len(x) > 0) def cast(self, argument_name, argument_value): raise NotImplementedError('This method is abstract')
[docs] class FlagArgumentCastingHelper(ArgumentCastingHelperProto):
[docs] @verify_type(argument_name=str, argument_value=bool) @verify_value(argument_name=lambda x: len(x) > 0) def cast(self, argument_name, argument_value): return argument_value
[docs] class ArgumentCastingHelper(ArgumentCastingHelperProto): @verify_type(error_message=(str, None)) @verify_value(casting_fn=lambda x: x is None or callable(x)) @verify_value(validate_fn=lambda x: x is None or callable(x)) def __init__(self, casting_fn=None, validate_fn=None, error_message=None): WCommandArgumentDescriptor.ArgumentCastingHelperProto.__init__(self) self.__casting_fn = casting_fn self.__validate_fn = validate_fn self.__error_message = error_message
[docs] def casting_function(self): return self.__casting_fn
[docs] def validate_function(self): return self.__validate_fn
[docs] def error_message(self): return self.__error_message
[docs] @verify_type(argument_name=str, argument_value=str) @verify_value(argument_name=lambda x: len(x) > 0) def cast(self, argument_name, argument_value): casting_fn = self.casting_function() if casting_fn is not None: casted_argument_value = casting_fn(argument_value) else: casted_argument_value = argument_value validate_fn = self.validate_function() if validate_fn is not None: if validate_fn(casted_argument_value) is not True: error_message = self.error_message() if error_message is None: error_message = 'Attribute "%s" has invalid value: "%s"' % ( argument_name, argument_value ) raise WCommandArgumentParsingError(error_message) return casted_argument_value
[docs] class StringArgumentCastingHelper(ArgumentCastingHelper): @verify_type('paranoid', error_message=(str, None)) @verify_value('paranoid', validate_fn=lambda x: x is None or callable(x)) def __init__(self, validate_fn=None, error_message=None): WCommandArgumentDescriptor.ArgumentCastingHelper.__init__( self, validate_fn=validate_fn, error_message=error_message )
[docs] class IntegerArgumentCastingHelper(ArgumentCastingHelper): @verify_type('paranoid', error_message=(str, None)) @verify_value('paranoid', validate_fn=lambda x: x is None or callable(x)) @verify_type(base=int) @verify_value(base=lambda x: x > 0) def __init__(self, base=10, validate_fn=None, error_message=None): WCommandArgumentDescriptor.ArgumentCastingHelper.__init__( self, casting_fn=lambda x: int(x, base=base), validate_fn=validate_fn, error_message=error_message )
[docs] class FloatArgumentCastingHelper(ArgumentCastingHelper): @verify_type('paranoid', error_message=(str, None)) @verify_value('paranoid', validate_fn=lambda x: x is None or callable(x)) def __init__(self, validate_fn=None, error_message=None): WCommandArgumentDescriptor.ArgumentCastingHelper.__init__( self, casting_fn=lambda x: float(x), validate_fn=validate_fn, error_message=error_message )
[docs] class DataSizeArgumentHelper(ArgumentCastingHelper): __write_rate_re__ = re.compile('^(\d+[.\d]*)([KMGT]?)$') def __init__(self): WCommandArgumentDescriptor.ArgumentCastingHelper.__init__( self, casting_fn=self.cast_string )
[docs] @staticmethod @verify_type(value=str) def cast_string(value): re_rate = WCommandArgumentDescriptor.DataSizeArgumentHelper.__write_rate_re__.search(value) if re_rate is None: raise ValueError('Invalid write rate') result = float(re_rate.group(1)) if re_rate.group(2) == 'K': result *= (1 << 10) elif re_rate.group(2) == 'M': result *= (1 << 20) elif re_rate.group(2) == 'G': result *= (1 << 30) elif re_rate.group(2) == 'T': result *= (1 << 40) return result
[docs] class EnumArgumentHelper(ArgumentCastingHelper): @verify_subclass(enum_cls=Enum) def __init__(self, enum_cls): WCommandArgumentDescriptor.ArgumentCastingHelper.__init__( self, casting_fn=self.cast_string ) for item in enum_cls: if isinstance(item.value, str) is False: raise TypeError('Enum fields must bt str type') self.__enum_cls = enum_cls
[docs] @verify_type(value=str) def cast_string(self, value): return self.__enum_cls(value)
[docs] class RegExpArgumentHelper(ArgumentCastingHelper): @verify_type(regexp=str) def __init__(self, regexp): WCommandArgumentDescriptor.ArgumentCastingHelper.__init__( self, casting_fn=self.cast_string ) self.__regexp = re.compile(regexp)
[docs] def re(self): return self.__regexp
[docs] def validate_function(self): return lambda x: x is not None
[docs] @verify_type(value=str) def cast_string(self, value): result = self.re().match(value) if result is not None: return result.groups()
@verify_type(argument_name=str, required=bool, flag_mode=bool, multiple_values=bool, help_info=(str, None)) @verify_type(meta_var=(str, None), default_value=(str, None)) @verify_value(argument_name=lambda x: len(x) > 0) def __init__( self, argument_name, required=False, flag_mode=False, multiple_values=False, help_info=None, meta_var=None, default_value=None, casting_helper=None ): """ note: 'required' is useless for flag-mode attribute """ if (flag_mode is True and multiple_values is True) or \ (flag_mode is True and default_value is not None) or \ (multiple_values is True and default_value is not None): raise ValueError( 'Argument has conflict options. "flag_mode" and "multiple_values" can not be ' 'used at the same time' ) if casting_helper is not None: flag_helper = WCommandArgumentDescriptor.FlagArgumentCastingHelper general_helper = WCommandArgumentDescriptor.ArgumentCastingHelper if flag_mode is True and isinstance(casting_helper, flag_helper) is False: raise TypeError( 'casting_helper must be an instance of ' 'WCommandArgumentDescriptor.FlagArgumentCastingHelper for flag-mode attribute' ) elif flag_mode is False and isinstance(casting_helper, general_helper) is False: raise TypeError( 'casting_helper must be an instance of ' 'WCommandArgumentDescriptor.ArgumentCastingHelper for every attribute except ' 'flag-mode attribute' ) self.__argument_name = argument_name self.__flag_mode = flag_mode self.__multiple_values = multiple_values self.__default_value = default_value self.__required = required self.__help_info = help_info self.__meta_var = meta_var if casting_helper is not None: self.__casting_helper = casting_helper elif flag_mode is True: self.__casting_helper = WCommandArgumentDescriptor.FlagArgumentCastingHelper() else: self.__casting_helper = WCommandArgumentDescriptor.StringArgumentCastingHelper()
[docs] def argument_name(self): return self.__argument_name
[docs] def flag_mode(self): return self.__flag_mode
[docs] def multiple_values(self): return self.__multiple_values
[docs] def required(self): return self.__required
[docs] def default_value(self): return self.__default_value
[docs] def casting_helper(self): return self.__casting_helper
[docs] @verify_type(argument_name=str) @verify_value(argument_name=lambda x: len(x) > 0) def cast(self, argument_name, argument_value): return self.casting_helper().cast(argument_name, argument_value)
[docs] def help_info(self): return self.__help_info
[docs] def meta_var(self): return self.__meta_var
[docs]class WCommandArgumentRelationship:
[docs] class Relationship(Enum): conflict = 1 requirement = 2 one_of = 3
@verify_type(argument_names=str) def __init__(self, relationship, *argument_names): if isinstance(relationship, WCommandArgumentRelationship.Relationship) is False: raise TypeError('Invalid relationship type') self.__relationship = relationship if len(argument_names) < 2: raise ValueError('Relationship can be made with 2 arguments and more') self.__arguments = argument_names
[docs] def relationship(self): return self.__relationship
[docs] def arguments(self): return self.__arguments
[docs]class WCommandArgumentParser: @verify_type(argument_descriptors=WCommandArgumentDescriptor, relationships=(list, tuple, set, None)) def __init__(self, *argument_descriptors, relationships=None): self.__descriptors = argument_descriptors self.__relationships = relationships if self.__relationships is not None: for relation in self.__relationships: if isinstance(relation, WCommandArgumentRelationship) is False: raise TypeError('Invalid relationship type') for argument_name in relation.arguments(): argument_found = False for descriptor in self.__descriptors: if argument_name == descriptor.argument_name(): argument_found = True break if argument_found is False: raise ValueError('Relationship with unknown argument was specified')
[docs] def descriptors(self): return self.__descriptors
[docs] def relationships(self): return self.__relationships
def __select_arguments(self, relation, parsed_result): result = [] arguments = relation.arguments() for argument_name in arguments: if argument_name in parsed_result.keys(): result.append(argument_name) return result def __check_conflict_relation(self, relation, parsed_result): conflict_arguments = self.__select_arguments(relation, parsed_result) if len(conflict_arguments) >= 1: raise WCommandArgumentParsingError( "Conflict arguments was found: %s" % (', '.join(conflict_arguments)) ) def __check_requirements_relation(self, relation, parsed_result): found_arguments = set(self.__select_arguments(relation, parsed_result)) required_arguments = set(relation.arguments()) not_found_arguments = required_arguments.difference(found_arguments) if len(not_found_arguments) > 0 and len(not_found_arguments) != len(required_arguments): raise WCommandArgumentParsingError( "Required arguments wasn't found: %s" % (', '.join(not_found_arguments)) ) def __check_one_of_relation(self, relation, parsed_result): found_arguments = set(self.__select_arguments(relation, parsed_result)) if len(found_arguments) > 1: raise WCommandArgumentParsingError( "Conflict arguments was found: %s" % (', '.join(found_arguments)) ) elif len(found_arguments) == 0: arguments = relation.arguments() raise WCommandArgumentParsingError( "Required arguments was not found. It should be one of: %s" % (', '.join(arguments)) )
[docs] @verify_type(command_tokens=str) def parse(self, *command_tokens): descriptors = list(self.descriptors()) command_tokens = list(command_tokens) result = {} while len(command_tokens) > 0: reduced_command_tokens, descriptors, next_result = \ self.reduce_tokens(command_tokens.copy(), descriptors.copy(), previous_result=result) if len(reduced_command_tokens) >= len(command_tokens): raise WCommandArgumentParsingError("Command tokens wasn't reduce") command_tokens = reduced_command_tokens result = next_result relationships = self.relationships() if relationships is not None: for relation in relationships: relation_type = relation.relationship() if relation_type == WCommandArgumentRelationship.Relationship.conflict: self.__check_conflict_relation(relation, result) elif relation_type == WCommandArgumentRelationship.Relationship.requirement: self.__check_requirements_relation(relation, result) elif relation_type == WCommandArgumentRelationship.Relationship.one_of: self.__check_one_of_relation(relation, result) else: raise RuntimeError('Unknown relationship was specified') for descriptor in descriptors: argument_name = descriptor.argument_name() if descriptor.flag_mode() is True: result[argument_name] = descriptor.cast(argument_name, False) if descriptor.default_value() is not None: result[argument_name] = descriptor.cast(argument_name, descriptor.default_value()) for descriptor in self.descriptors(): if descriptor.required() is True: argument_name = descriptor.argument_name() if argument_name not in result.keys(): raise WCommandArgumentParsingError( "Required argument wasn't found: %s" % argument_name ) return result
[docs] @classmethod @verify_type(command_tokens=list, argument_descriptors=list, previous_result=(dict, None)) def reduce_tokens(cls, command_tokens, argument_descriptors, previous_result=None): argument_name = command_tokens.pop(0) descriptor = None for i in range(len(argument_descriptors)): descriptor_to_check = argument_descriptors[i] if descriptor_to_check.argument_name() == argument_name: descriptor = descriptor_to_check if descriptor_to_check.multiple_values() is False: argument_descriptors.pop(i) break if descriptor is None: if argument_name in previous_result.keys(): raise WCommandArgumentParsingError( 'Multiple argument ("%s") values found' % argument_name ) else: raise WCommandArgumentParsingError('Unknown argument: "%s"' % argument_name) result = previous_result.copy() if previous_result is not None else {} if descriptor.flag_mode() is True: result[argument_name] = descriptor.cast(argument_name, True) else: if len(command_tokens) == 0: raise WCommandArgumentParsingError("Argument requires value. Value wasn't found") argument_value = descriptor.cast(argument_name, command_tokens.pop(0)) if descriptor.multiple_values() is True: if argument_name not in result.keys(): result[argument_name] = [argument_value] else: result[argument_name].append(argument_value) else: if argument_name not in result.keys(): result[argument_name] = argument_value else: raise WCommandArgumentParsingError( 'Multiple values spotted for the single argument' ) return command_tokens, argument_descriptors, result
[docs] def arguments_help(self): result = [] for argument in self.descriptors(): argument_name = argument.argument_name() if argument.flag_mode() is not True: value_name = argument.meta_var() if value_name is None: value_name = 'value' argument_name = '%s [%s]' % (argument_name, value_name) description = argument.help_info() if description is None: description = 'argument description unavailable' meta = [] if argument.required() is True: meta.append('required') if argument.multiple_values() is True: meta.append('may have multiple values') default_value = argument.default_value() if default_value is not None: meta.append('default value: %s' % default_value) if len(meta) > 0: description += (' (%s)' % ', '.join(meta)) result.append((argument_name, description)) return tuple(result)
[docs]class WEnhancedCommand(WCommandProto): @verify_type('paranoid', argument_descriptors=WCommandArgumentDescriptor, relationships=(list, tuple, set, None)) @verify_type(command=str) @verify_value(command=lambda x: len(x) > 0) def __init__(self, command, *argument_descriptors, relationships=None): WCommandProto.__init__(self) self.__command = command self.__arguments_descriptors = argument_descriptors self.__relationships = relationships self.__parser = WCommandArgumentParser(*self.argument_descriptors(), relationships=self.relationships())
[docs] def command_token(self): return self.__command
[docs] def argument_descriptors(self): return self.__arguments_descriptors
[docs] def relationships(self): return self.__relationships
[docs] def parser(self): return self.__parser
[docs] @verify_type(command_tokens=str) def match(self, *command_tokens, **command_env): if len(command_tokens) > 0: if command_tokens[0] == self.command_token(): return True return False
[docs] @verify_type(command_tokens=str) def exec(self, *command_tokens, **command_env): if len(command_tokens) > 0: if command_tokens[0] == self.command_token(): return self._exec(self.parser().parse(*command_tokens[1:]), **command_env) raise RuntimeError('Invalid tokens')
@abstractmethod @verify_type(command_arguments=dict) def _exec(self, command_arguments, **command_env): raise NotImplementedError('This method is abstract')
[docs] def arguments_help(self): return self.parser().arguments_help()
[docs] def command_help(self): arguments_help = self.arguments_help() if len(arguments_help) == 0: return 'Command does not have arguments\n' info = 'Command arguments:\n' for argument_name, argument_description in arguments_help: info += '\t%s - %s\n' % (argument_name, argument_description) return info