Source code for wasp_general.network.primitives

# -*- coding: utf-8 -*-
# wasp_general/network/primitives.py
#
# Copyright (C) 2016 the wasp-general authors and contributors
# <see AUTHORS file>
#
# This file is part of wasp-general.
#
# Wasp-general is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Wasp-general is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with wasp-general.  If not, see <http://www.gnu.org/licenses/>.

# noinspection PyUnresolvedReferences
from wasp_general.version import __author__, __version__, __credits__, __license__, __copyright__, __email__
# noinspection PyUnresolvedReferences
from wasp_general.version import __status__

import re

from wasp_general.verify import verify_type, verify_value

from wasp_general.types.binarray import WBinArray
from wasp_general.types.bytearray import WFixedSizeByteArray


class WMACAddress:
    """ Represent Ethernet/WiFi MAC address.

    see also https://en.wikipedia.org/wiki/MAC_address
    """

    octet_count = 6
    """ Address bytes count
    """

    re_dash_format = re.compile(
        "^[0-9a-fA-F]{2}-[0-9a-fA-F]{2}-[0-9a-fA-F]{2}-[0-9a-fA-F]{2}-[0-9a-fA-F]{2}-[0-9a-fA-F]{2}$"
    )
    """ Regular expression for "dash"-written MAC address like '00-11-22-33-44-55'
    """

    re_colon_format = re.compile(
        "^[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}$"
    )
    """ Regular expression for "colon"-written MAC address like '00:11:22:33:44:55'
    """

    # The dots are escaped on purpose: an unescaped '.' matches any character, which
    # would let strings such as '0011-2233-4455' pass as "Cisco" style.
    re_cisco_format = re.compile(r"^[0-9a-fA-F]{4}\.[0-9a-fA-F]{4}\.[0-9a-fA-F]{4}$")
    """ Regular expression for MAC address written in "Cisco" style like '0011.2233.4455'
    """

    re_spaceless_format = re.compile("^[0-9a-fA-F]{12}$")
    """ Regular expression for MAC address written without separator like '001122334455'
    """

    @verify_type('paranoid', address=(str, WBinArray, int, None))
    def __init__(self, address=None):
        """ Construct new address

        :param address: value with which this address is initialized (zeroes are used by default). A string
            is parsed with :meth:`.WMACAddress.from_string`, any other value is passed to
            WFixedSizeByteArray as is.
        """
        self.__address = WFixedSizeByteArray(WMACAddress.octet_count)

        if address is not None:
            if isinstance(address, str):
                # parse the text form first, then copy the parsed bits into own storage
                parsed = WMACAddress.from_string(address)
                self.__address = WFixedSizeByteArray(WMACAddress.octet_count, parsed.bin_address())
            else:
                self.__address = WFixedSizeByteArray(WMACAddress.octet_count, address)

    def bin_address(self):
        """ Get this address as sequence of bits

        :return: WBinArray
        """
        return self.__address.bin_value()

    @staticmethod
    @verify_type(address=str)
    def from_string(address):
        """ Return new object by the given MAC-address

        Supported notations are '00-11-22-33-44-55', '00:11:22:33:44:55', '0011.2233.4455' and
        '001122334455'. The whole string must match one of them.

        :param address: address to convert
        :return: WMACAddress
        :raises ValueError: if the string matches none of the supported notations
        """
        # (pattern, separator to drop) pairs, checked in order
        known_formats = (
            (WMACAddress.re_dash_format, "-"),
            (WMACAddress.re_colon_format, ":"),
            (WMACAddress.re_cisco_format, "."),
            (WMACAddress.re_spaceless_format, None),
        )

        str_address = None
        for pattern, separator in known_formats:
            # fullmatch (not match): '$' alone would also accept a trailing newline
            if pattern.fullmatch(address):
                str_address = address if separator is None else address.replace(separator, "")
                break

        if str_address is None:
            raise ValueError("Invalid MAC address format: " + address)

        result = WMACAddress()
        for octet_index in range(WMACAddress.octet_count):
            # every octet is written as two hexadecimal digits
            offset = octet_index * 2
            result.__address[octet_index] = int(str_address[offset:offset + 2], 16)

        return result

    def __str__(self):
        """ Convert to string (colon-separated lowercase hexadecimal octets)

        :return: str
        """
        return ':'.join("{:02x}".format(int(x)) for x in self.__address)

    def __bytes__(self):
        """ Convert to bytes

        :return: bytes
        """
        return bytes(self.__address)
class WIPV4Address:
    """ Represent IPv4 address.

    see also https://en.wikipedia.org/wiki/IPv4#Address_representations
    """

    octet_count = 4
    """ Address bytes count
    """

    @verify_type('paranoid', address=(str, bytes, WBinArray, int, None))
    def __init__(self, address=None):
        """ Create new address

        :param address: value with which this address is initialized (zeroes are used by default). A string
            is parsed with :meth:`.WIPV4Address.from_string`, any other value is passed to
            WFixedSizeByteArray as is.
        """
        self.__address = WFixedSizeByteArray(WIPV4Address.octet_count)

        if address is not None:
            if isinstance(address, str):
                # parse the dotted form first, then copy the parsed bits into own storage
                parsed = WIPV4Address.from_string(address)
                self.__address = WFixedSizeByteArray(WIPV4Address.octet_count, parsed.bin_address())
            else:
                self.__address = WFixedSizeByteArray(WIPV4Address.octet_count, address)

    def bin_address(self):
        """ Convert address to WBinArray

        :return: WBinArray
        """
        return self.__address.bin_value()

    def __bytes__(self):
        """ Convert address to bytes

        :return: bytes
        """
        return bytes(self.__address)

    @staticmethod
    @verify_type(address=str)
    def from_string(address):
        """ Parse string for IPv4 address

        :param address: address to parse (four dot-separated decimal octets, like '192.168.0.1')
        :return: WIPV4Address
        :raises ValueError: if there are not exactly four octets, an octet is not an integer or an octet
            is outside of the 0..255 range
        """
        octets = address.split('.')
        if len(octets) != WIPV4Address.octet_count:
            # report the string as it was given, not the list it was split into
            raise ValueError('Invalid ip address: %s' % address)

        result = WIPV4Address()
        for i, octet in enumerate(octets):
            value = int(octet)  # raises ValueError for non-numeric text
            # checked here so that an out-of-range octet is always reported as ValueError,
            # independently of how the underlying array treats oversized values
            if value < 0 or value > 255:
                raise ValueError('Invalid ip address: %s' % address)
            result.__address[i] = WBinArray(value, WFixedSizeByteArray.byte_size)
        return result

    @staticmethod
    @verify_type(dns_format=bool)
    def to_string(address, dns_format=False):
        """ Convert address to string

        :param address: WIPV4Address to convert
        :param dns_format: whether to use arpa-format (reversed octets followed by '.in-addr.arpa') or not
        :return: str
        :raises TypeError: if address is not a WIPV4Address
        """
        if isinstance(address, WIPV4Address) is False:
            raise TypeError('Invalid address type')

        octets = [str(int(x)) for x in address.__address]
        if dns_format is False:
            return '.'.join(octets)

        octets.reverse()
        return '.'.join(octets) + '.in-addr.arpa'

    def __str__(self):
        """ Convert to dotted-decimal string

        :return: str
        """
        return WIPV4Address.to_string(self)
class WNetworkIPV4:
    """ This class represent IPv4 network address. Depends on a flag network_address (that is passed to
    constructor) object can represent separate host address with network mask or be an address of a IP network.

    see also https://en.wikipedia.org/wiki/IPv4
    """

    delimiter = '/'
    """ Separator that separate network address from network mask.
    """

    # NOTE(review): 'set' passes the type check below, but a set is not subscriptable, so the
    # x[0]/x[1] validators (and address[0]/address[1] in the body) can not succeed for it.
    @verify_type(address=(str, tuple, list, set), network_address=bool)
    @verify_value(address=lambda x: isinstance(x, (tuple, list, set)) is False or len(x) == 2)
    @verify_value(address=lambda x: isinstance(x, (tuple, list, set)) is False or isinstance(x[0], WIPV4Address))
    @verify_value(address=lambda x: isinstance(x, (tuple, list, set)) is False or isinstance(x[1], int))
    def __init__(self, address, network_address=True):
        """ Construct new network address

        :param address: address as a string (like '192.168.0.0/24') or as a two-element sequence, where
            the first element is WIPV4Address and the second one is network mask (as bits count)
        :param network_address: whether this object is an address of a network (True) or a host address
            with network mask (False)
        :raises ValueError: if the mask is out of range or if network_address is True and the address has
            non-zero bits after the mask
        """
        if isinstance(address, str) is True:
            ip_address, self.__mask = address.strip().split(WNetworkIPV4.delimiter)
            self.__address = WIPV4Address(ip_address)
        else:
            self.__address = address[0]
            self.__mask = address[1]

        self.__mask = int(self.__mask)
        if self.__mask < 0 or self.__mask > len(self.__address.bin_address()):
            raise ValueError(
                'Invalid network mask: %s (network specified: %s)' % (str(self.__mask), address)
            )

        self.__network_address__ = network_address

        if network_address:
            # a network address must not have any bit set in its host part
            bin_address = self.__address.bin_address()
            if int(bin_address[self.__mask:]) != 0:
                raise ValueError(
                    'Invalid network mask: %s (network specified: %s)' % (str(self.__mask), address)
                )

    def address(self):
        """ Return IP address

        :return: WIPV4Address
        """
        return self.__address

    def mask(self):
        """ Return network mask (as bits count)

        :return: int
        """
        return self.__mask

    def _edge_address(self, host_bit, skip_edge):
        """ Build the lowest or the highest address of this network

        :param host_bit: value (0 or 1) that every bit after the mask is set to
        :param skip_edge: whether to step one address inside of the network by inverting the very last bit
        :return: WIPV4Address
        """
        bin_address = self.__address.bin_address()
        bin_address_length = len(bin_address)

        # with less than two host bits there is no address to step to
        if self.__mask > (bin_address_length - 2):
            skip_edge = False

        for bit_index in range(self.__mask, bin_address_length):
            bin_address[bit_index] = host_bit

        if skip_edge:
            bin_address[bin_address_length - 1] = 1 - host_bit

        return WIPV4Address(bin_address)

    @verify_type(skip_network_address=bool)
    def first_address(self, skip_network_address=True):
        """ Return the first IP address of this network

        :param skip_network_address: this flag specifies whether this function returns address of the
            network or returns address that follows address of the network (address, that a host could
            have). It is ignored when the mask leaves less than two host bits.
        :return: WIPV4Address
        """
        return self._edge_address(0, skip_network_address)

    @verify_type(skip_broadcast_address=bool)
    def last_address(self, skip_broadcast_address=True):
        """ Return the last IP address of this network

        :param skip_broadcast_address: this flag specifies whether to skip the very last address (that is
            usually used as broadcast address) or not. It is ignored when the mask leaves less than two
            host bits.
        :return: WIPV4Address
        """
        return self._edge_address(1, skip_broadcast_address)

    def iterator(self, skip_network_address=True, skip_broadcast_address=True):
        """ Return iterator, that can iterate over network addresses

        :param skip_network_address: same as skip_network_address in :meth:`.WNetworkIPV4.first_address`
            method
        :param skip_broadcast_address: same as skip_broadcast_address in
            :meth:`.WNetworkIPV4.last_address` method
        :return: WNetworkIPV4Iterator
        """
        return WNetworkIPV4Iterator(self, skip_network_address, skip_broadcast_address)

    @verify_type(address=WIPV4Address)
    def __contains__(self, address):
        """ Check if this network contains specified IP address. The network address and the very last
        (broadcast) address are part of the network too.

        :param address: address to check
        :return: bool
        """
        int_value = int(address.bin_address())
        # the whole range is compared: skipping the edge addresses would exclude addresses that
        # belong to the block (e.g. 239.255.255.255 for 224.0.0.0/4)
        first_address = int(self.first_address(skip_network_address=False).bin_address())
        last_address = int(self.last_address(skip_broadcast_address=False).bin_address())
        return first_address <= int_value <= last_address

    @staticmethod
    @verify_type(address=WIPV4Address)
    def is_multicast(address):
        """ Check if address is a multicast address (is inside of 224.0.0.0/4).

        :param address: IP address to check
        :return: bool

        see also https://tools.ietf.org/html/rfc5771
        """
        return address in WNetworkIPV4('224.0.0.0/4')
class WNetworkIPV4Iterator:
    """ This iterator iterates over IP network addresses.
    """

    @verify_type(network=(WIPV4Address, WNetworkIPV4), skip_network_address=bool, skip_broadcast_address=bool)
    def __init__(self, network, skip_network_address=True, skip_broadcast_address=True):
        """ Create new iterator

        :param network: network to iterate. If it is WIPV4Address instance, then iterate over single address
        :param skip_network_address: same as skip_network_address in :meth:`.WNetworkIPV4.first_address`
            method
        :param skip_broadcast_address: same as skip_broadcast_address in
            :meth:`.WNetworkIPV4.last_address` method
        """
        self.__skip_network_address = skip_network_address
        self.__skip_broadcast_address = skip_broadcast_address

        if isinstance(network, WIPV4Address) is True:
            # a single address is wrapped into a network whose mask covers every bit
            full_mask = len(network.bin_address())
            network = WNetworkIPV4((network, full_mask))
        # otherwise it is already a WNetworkIPV4 instance
        self.__network = network

    def __iter__(self):
        """ Yield every address from the first one to the last one (inclusive), honouring the skip flags
        that were given to the constructor

        :return: generator of WIPV4Address
        """
        lower = self.__network.first_address(skip_network_address=self.__skip_network_address)
        upper = self.__network.last_address(skip_broadcast_address=self.__skip_broadcast_address)

        current = int(lower.bin_address())
        stop = int(upper.bin_address())
        while current <= stop:
            yield WIPV4Address(current)
            current += 1
class WIPPort:
    """ Represent TCP/UDP IP port

    see also:
    https://en.wikipedia.org/wiki/Transmission_Control_Protocol#TCP_ports
    https://en.wikipedia.org/wiki/User_Datagram_Protocol#Service_ports
    """

    minimum_port_number = 1
    """ Minimum port number
    """

    maximum_port_number = 65535
    """ Maximum port number
    """

    @verify_type(port=int)
    @verify_value(port=lambda x: x >= WIPPort.minimum_port_number and x <= WIPPort.maximum_port_number)
    def __init__(self, port):
        """ Construct new IP port

        :param port: port number, must lie between minimum_port_number and maximum_port_number
            (both inclusive)
        """
        self.__port = port

    def __int__(self):
        """ Return port number as int

        :return: int
        """
        return self.__port

    def __str__(self):
        """ Return port number as string

        :return: str
        """
        return '%s' % self.__port
class WFQDN:
    """ Represent single fully qualified domain name (FQDN).

    see also https://en.wikipedia.org/wiki/Fully_qualified_domain_name
    """

    # raw string: '\-' inside of a plain string literal is an invalid escape sequence
    re_label = re.compile(r'^[a-zA-Z0-9\-]{1,63}$')
    """ Regular expression for FQDN label (sequence between dots): from 1 to 63 ASCII letters, digits
    or hyphens
    """

    maximum_fqdn_length = 253
    """ Maximum length of a dotted-written address (without the trailing dot)
    """

    @verify_type(address=(str, list, tuple, set, None))
    def __init__(self, address=None):
        """ Construct new FQDN. If no address is specified, then this FQDN represent root node, which is '.'

        :param address: FQDN address in string or in list/tuple/set of labels
        :raises ValueError: if the address is not a valid FQDN (see :meth:`.WFQDN.from_string`)
        """
        self._labels = []

        if isinstance(address, str) is True:
            self._labels = WFQDN.from_string(address)._labels
        elif isinstance(address, (list, tuple, set)) is True:
            # labels are validated the same way a dotted string is
            self._labels = WFQDN.from_string('.'.join(address))._labels

    @staticmethod
    @verify_type(address=str)
    def from_string(address):
        """ Convert doted-written FQDN address to WFQDN object

        An empty string and a single dot both produce the root node. One trailing dot is allowed and
        ignored.

        :param address: address to convert
        :return: WFQDN
        :raises ValueError: if the address is too long or contains an invalid label
        """
        if address.endswith('.'):
            address = address[:-1]

        # '' and '.' both denote the root node, which has no labels
        if len(address) == 0:
            return WFQDN()

        if len(address) > WFQDN.maximum_fqdn_length:
            raise ValueError('Invalid address')

        result = WFQDN()
        for label in address.split('.'):
            # fullmatch (not match): '$' alone would also accept a label with a trailing newline
            if WFQDN.re_label.fullmatch(label) is None:
                raise ValueError('Invalid address')
            result._labels.append(label)

        return result

    @staticmethod
    @verify_type(leading_dot=bool)
    def to_string(address, leading_dot=False):
        """ Return doted-written address by the given WFQDN object

        :param address: address to convert
        :param leading_dot: whether a dot is appended to the end of the result or not
        :return: str
        :raises TypeError: if address is not a WFQDN
        """
        if isinstance(address, WFQDN) is False:
            raise TypeError('Invalid type for FQDN address')

        result = '.'.join(address._labels)
        return result if leading_dot is False else (result + '.')

    def __str__(self):
        """ Return doted-written address (without the final dot)

        :return: str
        """
        return WFQDN.to_string(self)

    @staticmethod
    @verify_type(idn_fqdn=str)
    def punycode(idn_fqdn):
        """ Create WFQDN from IDN (Internationalized domain name) by reverting it to punycode

        :param idn_fqdn: internationalized domain name to convert
        :return: WFQDN

        see also https://en.wikipedia.org/wiki/Internationalized_domain_name
        see also https://en.wikipedia.org/wiki/Punycode
        """
        return WFQDN(idn_fqdn.encode('idna').decode('ascii'))
class WIPV4SocketInfo:
    """ Represent socket information - IP address (or domain name) and port number. Mainly used for python
    socket module. see :meth:`.WIPV4SocketInfo.pair`
    """

    @verify_type('paranoid', address=(WFQDN, WIPV4Address, str, WBinArray, int, None), port=(WIPPort, int, None))
    def __init__(self, address=None, port=None):
        """ Construct new pair

        :param address: associated IP address or domain name. Strings are parsed with
            :meth:`.WIPV4SocketInfo.parse_address`, WBinArray and int values are turned into WIPV4Address
        :param port: associated IP port
        """
        if address is None or isinstance(address, (WFQDN, WIPV4Address)) is True:
            # nothing to convert - keep the object (or the missing address) as is
            self.__address = address
        elif isinstance(address, (WBinArray, int)) is True:
            self.__address = WIPV4Address(address)
        else:
            self.__address = WIPV4SocketInfo.parse_address(address)

        if port is None or isinstance(port, WIPPort):
            self.__port = port
        else:
            self.__port = WIPPort(port)

    def address(self):
        """ Return associated IP address or None if not available

        :return: WIPV4Address or WFQDN or None
        """
        return self.__address

    def port(self):
        """ Return associated IP port or None if not available

        :return: WIPPort or None
        """
        return self.__port

    def pair(self):
        """ Return tuple (address, port), where address is a string (empty string if self.address() is None)
        and port is an integer (zero if self.port() is None). Mainly, this tuple is used with python socket
        module (like in bind method)

        :return: 2 value tuple of str and int.
        """
        address = ''
        if self.__address is not None:
            address = str(self.__address)

        port = 0
        if self.__port is not None:
            port = int(self.__port)

        return address, port

    @staticmethod
    @verify_type(address=str)
    def parse_address(address):
        """ Parse string and return :class:`.WIPV4Address` object if an IP address is specified,
        :class:`.WFQDN` if domain name is specified and None if the string is empty.

        :param address: string to parse
        :return: WIPV4Address or WFQDN or None
        :raises ValueError: if the string is neither an IP address nor a FQDN
        """
        if not address:
            return None

        # an IP address is tried first, a domain name is the fallback
        for address_type in (WIPV4Address, WFQDN):
            try:
                return address_type(address)
            except ValueError:
                continue

        raise ValueError('Unable to parse address string. It must be an IP address or FQDN')

    @classmethod
    @verify_type(info=str)
    @verify_value(info=lambda x: len(x) > 0)
    def parse_socket_info(cls, info):
        """ Parse string that is formed like '[address]<:port>' and return corresponding
        :class:`.WIPV4SocketInfo` object

        :param info: string to parse
        :return: WIPV4SocketInfo
        :raises ValueError: if there is more than one colon, or the port or the address can not be parsed
        """
        parts = info.split(':')
        if len(parts) > 2:
            raise ValueError('Incorrect socket info specified')

        address = parts[0].strip()

        port = None
        if len(parts) == 2:
            port = int(parts[1].strip())

        return WIPV4SocketInfo(address=address, port=port)