# -*- coding: utf-8 -*-
# wasp_general/composer.py
#
# Copyright (C) 2017 the wasp-general authors and contributors
# <see AUTHORS file>
#
# This file is part of wasp-general.
#
# Wasp-general is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Wasp-general is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with wasp-general. If not, see <http://www.gnu.org/licenses/>.
# TODO: document the code
# TODO: write tests for the code
# noinspection PyUnresolvedReferences
from wasp_general.version import __author__, __version__, __credits__, __license__, __copyright__, __email__
# noinspection PyUnresolvedReferences
from wasp_general.version import __status__
from abc import ABCMeta, abstractmethod
from wasp_general.verify import verify_type, verify_value, verify_subclass
[docs]class WComposerProto(metaclass=ABCMeta):
[docs] @abstractmethod
def compose(self, obj_spec):
raise NotImplementedError('This method is abstract')
[docs] @abstractmethod
def decompose(self, obj):
raise NotImplementedError('This method is abstract')
[docs]class WPlainComposer(WComposerProto):
@verify_subclass(strict_cls=(int, float, str, None))
def __init__(self, strict_cls=None, permit_none=False):
WComposerProto.__init__(self)
self.__strict_cls = strict_cls
self.__permit_none = permit_none
[docs] def strict_cls(self):
return self.__strict_cls
[docs] def permit_none(self):
return self.__permit_none
[docs] @verify_type(obj_spec=(int, float, str, None))
def compose(self, obj_spec):
self.__check(obj_spec)
return obj_spec
[docs] @verify_type(obj_spec=(int, float, str, None))
def decompose(self, obj):
self.__check(obj)
return obj
def __check(self, obj):
if obj is None:
if self.permit_none() is False:
raise TypeError('None value spotted')
else:
strict_cls = self.strict_cls()
if strict_cls is not None and isinstance(obj, strict_cls) is False:
raise TypeError(
'Invalid type ("%s" should be "%s")' %
(obj.__class__.__name__, strict_cls.__name__)
)
# noinspection PyAbstractClass
[docs]class WProxyComposer(WComposerProto):
@verify_type(basic_composer=WComposerProto)
def __init__(self, basic_composer):
WComposerProto.__init__(self)
self.__basic_composer = basic_composer
[docs] def basic_composer(self):
return self.__basic_composer
[docs]class WIterComposer(WProxyComposer):
[docs] @verify_type(obj_spec=(list, set, tuple))
def compose(self, obj_spec):
return [self.basic_composer().compose(x) for x in obj_spec]
[docs] @verify_type(obj_spec=(list, set, tuple))
def decompose(self, obj):
return [self.basic_composer().decompose(x) for x in obj]
[docs]class WCompositeComposer(WComposerProto):
# noinspection PyAbstractClass
[docs] class CompositeKey(WProxyComposer):
@verify_type('paranoid', basic_composer=WComposerProto)
@verify_type(required=bool)
def __init__(self, key, basic_composer, required=False):
WProxyComposer.__init__(self, basic_composer)
self.__key = key
self.__required = required
[docs] def key(self):
return self.__key
[docs] def required(self):
return self.__required
[docs] def compose(self, key_value):
return self.basic_composer().compose(key_value)
[docs] def decompose(self, key_value):
return self.basic_composer().decompose(key_value)
[docs] @abstractmethod
def has_key(self, obj):
raise NotImplementedError('This method is abstract')
[docs] @abstractmethod
def get_key(self, obj):
raise NotImplementedError('This method is abstract')
[docs] @abstractmethod
def set_key(self, obj, value):
raise NotImplementedError('This method is abstract')
[docs] class KeyInstance:
def __init__(self, composite_key, value):
if isinstance(composite_key, WCompositeComposer.CompositeKey) is False:
raise TypeError('Invalid composite_key type')
self.composite_key = composite_key
self.value = value
[docs] class ConstructionKeys:
def __init__(self, *key_instances):
self.__key_instances = []
for instance in key_instances:
self.add(instance)
[docs] def add(self, key_instance):
if isinstance(key_instance, WCompositeComposer.KeyInstance) is False:
raise TypeError('Invalid construction pair type')
self.__key_instances.append(key_instance)
[docs] def key_instances(self):
return self.__key_instances.copy()
def __getitem__(self, key):
for instance in self.key_instances():
if instance.composite_key.key() == key:
return instance
raise KeyError('Invalid key: "%s"' % str(key))
[docs] def has_key(self, key):
for instance in self.key_instances():
if instance.composite_key.key() == key:
return True
return False
def __iter__(self):
for pair in self.key_instances():
yield pair
[docs] class InstanceConstructor(metaclass=ABCMeta):
[docs] def construct_obj(self, construction_keys):
if isinstance(construction_keys, WCompositeComposer.ConstructionKeys) is False:
raise TypeError('Invalid construction_keys type')
result = self.create_obj(construction_keys)
for instance in construction_keys:
instance.composite_key.set_key(result, instance.value)
return result
[docs] @abstractmethod
def create_obj(self, construction_keys):
raise NotImplementedError('This method is abstract')
@verify_type('paranoid', composite_keys=CompositeKey)
@verify_type(constructor=(InstanceConstructor, None))
def __init__(self, *composite_keys, constructor=None):
self.__constructor = \
constructor if constructor is not None else WCompositeComposer.InstanceConstructor()
self.__required_keys = []
self.__optional_keys = []
for key in composite_keys:
self.add_composite_key(key)
[docs] def constructor(self):
return self.__constructor
[docs] @verify_type(composite_key=CompositeKey)
def add_composite_key(self, composite_key):
keys_list = self.__required_keys if composite_key.required() else self.__optional_keys
keys_list.append(composite_key)
[docs] def required_keys(self):
return self.__required_keys.copy()
[docs] def optional_keys(self):
return self.__optional_keys.copy()
[docs] @verify_type(obj_spec=dict)
def compose(self, obj_spec):
obj_spec = obj_spec.copy()
construction_keys = WCompositeComposer.ConstructionKeys()
for composite_key in self.required_keys():
key = composite_key.key()
if key not in obj_spec.keys():
raise TypeError('Required key not found')
value = composite_key.compose(obj_spec[key])
obj_spec.pop(key)
construction_keys.add(WCompositeComposer.KeyInstance(composite_key, value))
for composite_key in self.optional_keys():
key = composite_key.key()
if key in obj_spec.keys():
value = composite_key.compose(obj_spec[key])
obj_spec.pop(key)
construction_keys.add(WCompositeComposer.KeyInstance(composite_key, value))
if len(obj_spec) > 0:
raise TypeError('Unable to process every fields')
return self.constructor().construct_obj(construction_keys)
[docs] def decompose(self, obj):
result = {}
for composite_key in self.required_keys():
if composite_key.has_key(obj) is False:
raise TypeError('Required key not found')
value = composite_key.decompose(composite_key.get_key(obj))
result[composite_key.key()] = value
for composite_key in self.optional_keys():
if composite_key.has_key(obj) is True:
value = composite_key.decompose(composite_key.get_key(obj))
result[composite_key.key()] = value
return result
[docs]class WDictComposer(WCompositeComposer):
[docs] class DictKey(WCompositeComposer.CompositeKey):
[docs] def has_key(self, obj):
return self.key() in obj.keys()
[docs] def get_key(self, obj):
return obj[self.key()]
[docs] def set_key(self, obj, value):
obj[self.key()] = value
return obj
[docs] class DictConstructor(WCompositeComposer.InstanceConstructor):
[docs] def create_obj(self, construction_keys):
return {}
@verify_type('paranoid', composite_keys=DictKey)
def __init__(self, *composite_keys):
WCompositeComposer.__init__(self, *composite_keys, constructor=WDictComposer.DictConstructor())
[docs] @verify_type(composite_key=DictKey)
def add_composite_key(self, composite_key):
WCompositeComposer.add_composite_key(self, composite_key)
[docs]class WClassComposer(WCompositeComposer):
[docs] class ClassKey(WCompositeComposer.CompositeKey):
@verify_type('paranoid', basic_composer=WComposerProto, required=bool)
@verify_type(key=str)
@verify_value(key=lambda x: len(x) > 0)
@verify_value(has_key_fn=lambda x: x is None or callable(x))
@verify_value(get_key_fn=lambda x: x is None or callable(x))
@verify_value(set_key_fn=lambda x: x is None or callable(x))
def __init__(
self, key, basic_composer, has_key_fn=None, get_key_fn=None, set_key_fn=None, required=False
):
WCompositeComposer.CompositeKey.__init__(self, key, basic_composer, required=required)
self.__get_key_fn = get_key_fn
self.__set_key_fn = set_key_fn
self.__has_key_fn = has_key_fn
[docs] def has_key(self, obj):
if self.__has_key_fn is not None:
return self.__has_key_fn(obj, self.key())
return hasattr(obj, self.key())
[docs] def get_key(self, obj):
if self.__get_key_fn is not None:
return self.__get_key_fn(obj, self.key())
return getattr(obj, self.key())
[docs] def set_key(self, obj, value):
if self.__set_key_fn is not None:
return self.__set_key_fn(obj, self.key(), value)
setattr(obj, self.key(), value)
return obj
[docs] class GetterKey(ClassKey):
def __init__(self, key, basic_composer, required=False):
WClassComposer.ClassKey.__init__(self, key, basic_composer=basic_composer, required=required)
[docs] def get_key(self, obj):
return WClassComposer.ClassKey.get_key(self, obj)()
[docs] def set_key(self, obj, value):
return obj
[docs] class ClassConstructor(WCompositeComposer.InstanceConstructor):
@verify_type(basic_cls=type)
@verify_value(create_obj_fn=(lambda x: x is None or callable(x)))
def __init__(self, basic_cls, create_obj_fn=None):
self.__basic_cls = basic_cls
self.__create_obj_fn = create_obj_fn
[docs] def basic_cls(self):
return self.__basic_cls
[docs] def create_obj(self, construction_keys):
if self.__create_obj_fn is not None:
return self.__create_obj_fn(construction_keys)
return self.basic_cls()()
@verify_type('paranoid', composite_keys=ClassKey)
@verify_type(constructor=ClassConstructor)
def __init__(self, *composite_keys, constructor=None):
WCompositeComposer.__init__(self, *composite_keys, constructor=constructor)
[docs] @verify_type(composite_key=ClassKey)
def add_composite_key(self, composite_key):
WCompositeComposer.add_composite_key(self, composite_key)
[docs] def basic_cls(self):
return self.constructor().basic_cls()
[docs]class WComposerFactory(WComposerProto):
[docs] class Entry:
@verify_type(composer=WClassComposer, name=(str, None))
@verify_value(name=lambda x: x is None or len(x) > 0)
def __init__(self, composer, name=None):
self.__composer = composer
self.__name = name if name is not None else composer.basic_cls().__name__
[docs] def composer(self):
return self.__composer
[docs] def name(self):
return self.__name
__default_name_field__ = '__cls__'
__default_value_field__ = '__instance__'
@verify_type('paranoid', entries=Entry)
@verify_type(name_field=(str, None), value_field=(str, None))
@verify_value(name_field=lambda x: x is None or len(x) > 0)
@verify_value(value_field=lambda x: x is None or len(x) > 0)
def __init__(self, *entries, name_field=None, value_field=None):
WComposerProto.__init__(self)
self.__entries = {}
for entry in entries:
self.add_entry(entry)
self.__name_field = name_field if name_field is not None else self.__default_name_field__
self.__value_field = value_field if value_field is not None else self.__default_value_field__
[docs] @verify_type(entry=Entry)
def add_entry(self, entry):
entry_name = entry.name()
if entry_name in self.__entries.keys():
raise RuntimeError('Multiple entries with the same name "%s" spotted' % entry_name)
self.__entries[entry_name] = entry
[docs] def entries(self):
return self.__entries.copy()
[docs] def name_field(self):
return self.__name_field
[docs] def value_field(self):
return self.__value_field
[docs] @verify_type(obj_spec=dict)
def compose(self, obj_spec):
name_field = self.name_field()
value_field = self.value_field()
if len(obj_spec) != 2 or name_field not in obj_spec.keys() or value_field not in obj_spec.keys():
raise TypeError('Data malformed')
entries = self.entries()
entry_name = obj_spec[name_field]
entry_value = obj_spec[value_field]
if entry_name not in entries.keys():
raise TypeError('Invalid class "%s" was specified' % entry_name)
entry = entries[entry_name]
return entry.composer().compose(entry_value)
[docs] @verify_type(obj=object)
def decompose(self, obj):
suitable_entries = []
for entry in self.entries().values():
if isinstance(obj, entry.composer().basic_cls()) is True:
suitable_entries.append(entry)
entries_count = len(suitable_entries)
if entries_count == 0:
raise TypeError(
'Unable to find suitable entry for "%s" (no one matched)' % obj.__class__.__name__
)
elif entries_count > 1:
reduced_subcls_entries = []
for i_entry in suitable_entries:
is_subcls = False
i_subcls = i_entry.composer().basic_cls()
for j_entry in suitable_entries:
j_subcls = j_entry.composer().basic_cls()
if issubclass(i_subcls, j_subcls) is True and i_subcls != j_subcls:
is_subcls = True
break
if is_subcls is False:
reduced_subcls_entries.append(i_entry)
subcls_entries_count = len(reduced_subcls_entries)
if subcls_entries_count == 0:
raise TypeError('Unable to find suitable entry for "%s"' % obj.__class__.__name__)
elif subcls_entries_count > 1:
raise TypeError(
'Unable to find suitable entry for "%s" (matched two or more entries)' %
obj.__class__.__name__
)
suitable_entries = reduced_subcls_entries
entry = suitable_entries[0]
decompose_result = {
self.name_field(): entry.name(),
self.value_field(): entry.composer().decompose(obj)
}
return decompose_result