Source code for wasp_general.network.messenger.coders

# -*- coding: utf-8 -*-
# wasp_general/network/messenger/coders.py
#
# Copyright (C) 2016 the wasp-general authors and contributors
# <see AUTHORS file>
#
# This file is part of wasp-general.
#
# Wasp-general is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Wasp-general is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with wasp-general.  If not, see <http://www.gnu.org/licenses/>.

# noinspection PyUnresolvedReferences
from wasp_general.version import __author__, __version__, __credits__, __license__, __copyright__, __email__
# noinspection PyUnresolvedReferences
from wasp_general.version import __status__

from base64 import b64encode, b64decode
from enum import Enum

from wasp_general.verify import verify_type
from wasp_general.crypto.hex import WHex, WUnHex
from wasp_general.crypto.aes import WAES
from wasp_general.crypto.rsa import WRSA

from wasp_general.network.messenger.proto import WMessengerOnionSessionProto
from wasp_general.network.messenger.envelope import WMessengerTextEnvelope, WMessengerBytesEnvelope
from wasp_general.network.messenger.layers import WMessengerOnionCoderLayerProto


[docs]class WMessengerFixedModificationLayer(WMessengerOnionCoderLayerProto): """ :class:`.WMessengerOnionCoderLayerProto` class implementation. This class applies fixed modification to specified messages. In :meth:`.WMessengerFixedModificationLayer.encode` method this class appends "header" to the message start or appends "tail" to the message end. One of them must be specified. For :class:`.WMessengerTextEnvelope` envelope, "header" or "tail" must be "str" type. For :class:`.WMessengerBytesEnvelope` envelope, "header" or "tail" must be "bytes" type. """ __layer_name__ = "com.binblob.wasp-general.fixed-modification-layer" """ Layer name """
[docs] class Target(Enum): """ Modification mode. Specifies whether modification code must be appended to start or to the end of a message """ head = 1 """ Modification code must be inserted to a message start """ tail = 2 """ Modification code must be added to a message end """
def __init__(self): """ Construct new layer """ WMessengerOnionCoderLayerProto.__init__(self, WMessengerFixedModificationLayer.__layer_name__) @verify_type(envelope=(WMessengerTextEnvelope, WMessengerBytesEnvelope)) def __args_check(self, envelope, target, modification_code): """ Method checks arguments, that are specified to the :meth:`.WMessengerFixedModificationLayer.encode` and :meth:`.WMessengerFixedModificationLayer.decode` methods :param envelope: same as envelope in :meth:`.WMessengerFixedModificationLayer.encode` and \ :meth:`.WMessengerFixedModificationLayer.decode` methods :param target: same as target in :meth:`.WMessengerFixedModificationLayer.encode` and \ :meth:`.WMessengerFixedModificationLayer.decode` methods :param modification_code: same as modification_code in \ :meth:`.WMessengerFixedModificationLayer.encode` and :meth:`.WMessengerFixedModificationLayer.decode` \ methods :return: None """ if target is None: raise RuntimeError('"target" argument must be specified for this layer') if modification_code is None: raise RuntimeError('"modification_code" argument must be specified for this layer') if isinstance(target, WMessengerFixedModificationLayer.Target) is False: raise TypeError('Invalid "target" argument') if isinstance(envelope, WMessengerTextEnvelope) is True: if isinstance(modification_code, str) is False: raise TypeError('Invalid "modification_code" argument for specified envelope') elif isinstance(modification_code, bytes) is False: raise TypeError('Invalid "modification_code" argument for specified envelope')
[docs] @verify_type('paranoid', envelope=(WMessengerTextEnvelope, WMessengerBytesEnvelope)) @verify_type('paranoid', session=WMessengerOnionSessionProto) def encode(self, envelope, session, target=None, modification_code=None, **kwargs): """ Methods appends 'modification_code' to the specified envelope. :param envelope: original envelope :param session: original session :param target: flag, that specifies whether code must be appended to the start or to the end :param modification_code: code to append :param kwargs: additional arguments :return: WMessengerTextEnvelope or WMessengerBytesEnvelope (depends on the original envelope) """ self.__args_check(envelope, target, modification_code) if isinstance(envelope, WMessengerTextEnvelope): target_envelope_cls = WMessengerTextEnvelope else: # isinstance(envelope, WMessengerBytesEnvelope) target_envelope_cls = WMessengerBytesEnvelope if target == WMessengerFixedModificationLayer.Target.head: return target_envelope_cls(modification_code + envelope.message(), meta=envelope) else: # target == WMessengerFixedModificationLayer.Target.tail return target_envelope_cls(envelope.message() + modification_code, meta=envelope)
[docs] @verify_type('paranoid', envelope=(WMessengerTextEnvelope, WMessengerBytesEnvelope)) @verify_type('paranoid', session=WMessengerOnionSessionProto) def decode(self, envelope, session, target=None, modification_code=None, **kwargs): """ Methods checks envelope for 'modification_code' existence and removes it. :param envelope: original envelope :param session: original session :param target: flag, that specifies whether code must be searched and removed at the start or at the end :param modification_code: code to search/remove :param kwargs: additional arguments :return: WMessengerTextEnvelope or WMessengerBytesEnvelope (depends on the original envelope) """ self.__args_check(envelope, target, modification_code) message = envelope.message() if len(message) < len(modification_code): raise ValueError('Invalid message length') if isinstance(envelope, WMessengerTextEnvelope): target_envelope_cls = WMessengerTextEnvelope else: # isinstance(envelope, WMessengerBytesEnvelope) target_envelope_cls = WMessengerBytesEnvelope if target == WMessengerFixedModificationLayer.Target.head: if message[:len(modification_code)] != modification_code: raise ValueError('Invalid header in message') return target_envelope_cls(message[len(modification_code):], meta=envelope) else: # target == WMessengerFixedModificationLayer.Target.tail if message[-len(modification_code):] != modification_code: raise ValueError('Invalid tail in message') return target_envelope_cls(message[:-len(modification_code)], meta=envelope)
[docs]class WMessengerEncodingLayer(WMessengerOnionCoderLayerProto): """ This layer can encode str-object to the related encoding (to the bytes-object). Or decode bytes-object from the specified encoding (from bytes-object to str-object) """ __layer_name__ = "com.binblob.wasp-general.encoding-layer" """ Layer name """ def __init__(self): """ Construct new layer """ WMessengerOnionCoderLayerProto.__init__(self, WMessengerEncodingLayer.__layer_name__)
[docs] @verify_type('paranoid', envelope=WMessengerTextEnvelope, session=WMessengerOnionSessionProto) @verify_type(encoding=(str, None)) def encode(self, envelope, session, encoding=None, **kwargs): """ :meth:`.WMessengerOnionCoderLayerProto.encode` method implementation. :param envelope: original envelope :param session: original session :param encoding: encoding to use (default is 'utf-8') :param kwargs: additional arguments :return: WMessengerBytesEnvelope """ message = envelope.message() message = message.encode() if encoding is None else message.encode(encoding) return WMessengerBytesEnvelope(message, meta=envelope)
[docs] @verify_type('paranoid', envelope=WMessengerBytesEnvelope, session=WMessengerOnionSessionProto) @verify_type(encoding=(str, None)) def decode(self, envelope, session, encoding=None, **kwargs): """ :meth:`.WMessengerOnionCoderLayerProto.decode` method implementation. :param envelope: original envelope :param session: original session :param encoding: encoding to use (default is 'utf-8') :param kwargs: additional arguments :return: WMessengerTextEnvelope """ message = envelope.message() message = message.decode() if encoding is None else message.decode(encoding) return WMessengerTextEnvelope(message, meta=envelope)
[docs]class WMessengerHexLayer(WMessengerOnionCoderLayerProto): """ :class:`.WMessengerOnionCoderLayerProto` class implementation. This class translate message to corresponding hex-string, or decodes it from hex-string to original binary representation. """ __layer_name__ = "com.binblob.wasp-general.hex-layer" """ Layer name """ def __init__(self): """ Construct new layer """ WMessengerOnionCoderLayerProto.__init__(self, WMessengerHexLayer.__layer_name__)
[docs] @verify_type('paranoid', session=WMessengerOnionSessionProto) @verify_type(envelope=WMessengerBytesEnvelope) def encode(self, envelope, session, **kwargs): """ :meth:`.WMessengerOnionCoderLayerProto.encode` method implementation. :param envelope: original envelope :param session: original session :param kwargs: additional arguments :return: WMessengerTextEnvelope """ return WMessengerTextEnvelope(str(WHex(envelope.message())), meta=envelope)
[docs] @verify_type('paranoid', session=WMessengerOnionSessionProto) @verify_type(envelope=WMessengerTextEnvelope) def decode(self, envelope, session, **kwargs): """ :meth:`.WMessengerOnionCoderLayerProto.decode` method implementation. :param envelope: original envelope :param session: original session :param kwargs: additional arguments :return: WMessengerBytesEnvelope """ return WMessengerBytesEnvelope(bytes(WUnHex(envelope.message())), meta=envelope)
[docs]class WMessengerBase64Layer(WMessengerOnionCoderLayerProto): """ :class:`.WMessengerOnionCoderLayerProto` class implementation. This class translate binary message to the corresponding base64 encoded bytes, or decodes it from base64 encoded bytes to the original binary representation. """ __layer_name__ = "com.binblob.wasp-general.base64-layer" """ Layer name """ def __init__(self): """ Construct new layer """ WMessengerOnionCoderLayerProto.__init__(self, WMessengerBase64Layer.__layer_name__)
[docs] @verify_type('paranoid', session=WMessengerOnionSessionProto) @verify_type(envelope=WMessengerBytesEnvelope) def encode(self, envelope, session, **kwargs): """ :meth:`.WMessengerOnionCoderLayerProto.encode` method implementation. :param envelope: original envelope :param session: original session :param kwargs: additional arguments :return: WMessengerBytesEnvelope """ return WMessengerBytesEnvelope(b64encode(envelope.message()), meta=envelope)
[docs] @verify_type('paranoid', session=WMessengerOnionSessionProto) @verify_type(envelope=WMessengerBytesEnvelope) def decode(self, envelope, session, **kwargs): """ :meth:`.WMessengerOnionCoderLayerProto.decode` method implementation. :param envelope: original envelope :param session: original session :param kwargs: additional arguments :return: WMessengerBytesEnvelope """ return WMessengerBytesEnvelope(b64decode(envelope.message()), meta=envelope)
[docs]class WMessengerAESLayer(WMessengerOnionCoderLayerProto): """ :class:`.WMessengerOnionCoderLayerProto` class implementation. This class encrypts/decrypts message with the specified AES cipher """ __layer_name__ = "com.binblob.wasp-general.aes-layer" """ Layer name """ def __init__(self): """ Construct new layer """ WMessengerOnionCoderLayerProto.__init__(self, WMessengerAESLayer.__layer_name__)
[docs] @verify_type('paranoid', session=WMessengerOnionSessionProto) @verify_type(envelope=WMessengerBytesEnvelope) @verify_type(aes_cipher=WAES) def encode(self, envelope, session, aes_cipher=None, **kwargs): """ :meth:`.WMessengerOnionCoderLayerProto.encode` method implementation. :param envelope: original envelope :param session: original session :param aes_cipher: cipher to use :param kwargs: additional arguments :return: WMessengerBytesEnvelope """ return WMessengerBytesEnvelope(aes_cipher.encrypt(envelope.message()), meta=envelope)
[docs] @verify_type('paranoid', session=WMessengerOnionSessionProto) @verify_type(envelope=WMessengerBytesEnvelope) @verify_type(aes_cipher=WAES) def decode(self, envelope, session, aes_cipher=None, **kwargs): """ :meth:`.WMessengerOnionCoderLayerProto.decode` method implementation. :param envelope: original envelope :param session: original session :param aes_cipher: cipher to use :param kwargs: additional arguments :return: WMessengerBytesEnvelope """ return WMessengerBytesEnvelope(aes_cipher.decrypt(envelope.message(), decode=False), meta=envelope)
[docs]class WMessengerRSALayer(WMessengerOnionCoderLayerProto): """ :class:`.WMessengerOnionCoderLayerProto` class implementation. This class encrypts/decrypts message with specified RSA cipher """ __layer_name__ = "com.binblob.wasp-general.rsa-layer" """ Layer name """ def __init__(self): """ Construct new layer """ WMessengerOnionCoderLayerProto.__init__(self, WMessengerRSALayer.__layer_name__)
[docs] @verify_type('paranoid', session=WMessengerOnionSessionProto, public_key=WRSA.wrapped_class, sha_digest_size=int) @verify_type(envelope=WMessengerBytesEnvelope) def encode(self, envelope, session, public_key=None, sha_digest_size=32, **kwargs): """ :meth:`.WMessengerOnionCoderLayerProto.encode` method implementation. :param envelope: original envelope :param session: original session :param public_key: public key to encrypt :param sha_digest_size: SHA digest size to use :param kwargs: additional arguments :return: WMessengerBytesEnvelope """ message = WRSA.encrypt(envelope.message(), public_key, sha_digest_size=sha_digest_size) return WMessengerBytesEnvelope(message, meta=envelope)
[docs] @verify_type('paranoid', session=WMessengerOnionSessionProto, private_key=WRSA.wrapped_class) @verify_type('paranoid', sha_digest_size=int) @verify_type(envelope=WMessengerBytesEnvelope) def decode(self, envelope, session, private_key=None, sha_digest_size=32, **kwargs): """ :meth:`.WMessengerOnionCoderLayerProto.decode` method implementation. :param envelope: original envelope :param session: original session :param private_key: private key to decrypt :param sha_digest_size: SHA digest size to use :param kwargs: additional arguments :return: WMessengerBytesEnvelope """ message = WRSA.decrypt(envelope.message(), private_key, sha_digest_size=sha_digest_size) return WMessengerBytesEnvelope(message, meta=envelope)