Source code for wasp_general.os.linux.lvm

# -*- coding: utf-8 -*-
# wasp_general/os/linux/lvm.py
#
# Copyright (C) 2017 the wasp-general authors and contributors
# <see AUTHORS file>
#
# This file is part of wasp-general.
#
# Wasp-general is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Wasp-general is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with wasp-general.  If not, see <http://www.gnu.org/licenses/>.

# TODO: document the code
# TODO: write tests for the code

# noinspection PyUnresolvedReferences
from wasp_general.version import __author__, __version__, __credits__, __license__, __copyright__, __email__
# noinspection PyUnresolvedReferences
from wasp_general.version import __status__

import subprocess
import os
import math

from wasp_general.verify import verify_type, verify_value
from wasp_general.os.linux.mounts import WMountPoint


[docs]class WLVMInfoCommand: """ This is a helper, with which it is easier to call for pvdisplay, vgdisplay or lvdisplay program. This class uses subprocess.check_output method for a program calling. And when non-zero code is returned by the program, an subprocess.CalledProcessError exception is raised. There is a timeout for a program to be complete. If a program wasn't completed for that period of time, subprocess.TimeoutExpired exception is raised """ __lvm_cmd_default_timeout__ = 3 """ Default timeout for command to process """ @verify_type(command=str, fields_count=int, cmd_timeout=(int, float, None), sudo=bool) @verify_value(cmd_timeout=lambda x: x is None or x > 0) def __init__(self, command, fields_count, cmd_timeout=None, sudo=False): """ Create new command :param command: program to execute :param fields_count: fields in a program output :param cmd_timeout: timeout for a program (if it is None - then default value is used) :param sudo: flag - whether to run this program with sudo or not """ self.__command = command self.__fields_count = fields_count self.__cmd_timeout = cmd_timeout if cmd_timeout is not None else self.__lvm_cmd_default_timeout__ self.__sudo = sudo
[docs] def command(self): """ Return target program :return: str """ return self.__command
[docs] def fields_count(self): """ Return number of fields in a program output :return: int """ return self.__fields_count
[docs] def cmd_timeout(self): """ Timeout for a program to complete :return: int, float """ return self.__cmd_timeout
[docs] def sudo(self): """ Return 'sudo' flag (whether to run this program with sudo or not) :return: bool """ return self.__sudo
[docs] @verify_type(name=(str, None)) def lvm_info(self, name=None): """ Call a program :param name: if specified - program will return information for that lvm-entity only. otherwise - all available entries are returned :return: tuple of str (fields) """ cmd = [] if self.sudo() is False else ['sudo'] cmd.extend([self.command(), '-c']) if name is not None: cmd.append(name) output = subprocess.check_output(cmd, timeout=self.cmd_timeout()) output = output.decode() result = [] fields_count = self.fields_count() for line in output.split('\n'): line = line.strip() fields = line.split(':') if len(fields) == fields_count: result.append(fields) if name is not None and len(result) != 1: raise RuntimeError('Unable to parse command result') return tuple(result)
[docs]class WLVMInfo: """ Basic class for actual LVM information. This class creates :class:`.WLVMInfoCommand` object which may be called on an object creation (it depends on constructor parameters) """ __lvm_info_cmd_timeout__ = 3 """ Timeout for a program to complete """ @verify_type('paranoid', command=str, fields_count=int, sudo=bool) @verify_type(lvm_entity=(str, tuple, list, set)) @verify_value(lvm_entity=lambda x: len(x) > 0 if isinstance(x, str) else True) def __init__(self, command, fields_count, lvm_entity, sudo=False): """ Create new info-object :param command: same as command in :meth:`.WLVMInfoCommand.__init__` :param fields_count: same as fields_count in :meth:`.WLVMInfoCommand.__init__` :param lvm_entity: if this is a list/tuple/set - then it is a collection of fields (collection length \ must be the same as 'fields_count'). If it is a string, then command is executed to get corresponding \ fields :param sudo: same as sudo in :meth:`.WLVMInfoCommand.__init__` """ self.__lvm_command = WLVMInfoCommand( command, fields_count, cmd_timeout=self.__class__.__lvm_info_cmd_timeout__, sudo=sudo ) if isinstance(lvm_entity, (tuple, list, set)) is True: if len(lvm_entity) != fields_count: raise ValueError( 'Invalid lvm entity fields count: %i (expected: %i)' % (len(lvm_entity), fields_count) ) self.__lvm_entity = tuple(lvm_entity) else: self.__lvm_entity = (self.lvm_command().lvm_info(lvm_entity)[0])
[docs] def lvm_command(self): """ Return LVM-command object :return: WLVMInfoCommand """ return self.__lvm_command
[docs] def lvm_entity(self): """ Return object fields :return: tuple of str (fields) """ return self.__lvm_entity
[docs]class WPhysicalVolume(WLVMInfo): """ Class represent a physical volume """ @verify_type('paranoid', physical_volume=(str, tuple, list, set), sudo=bool) @verify_value('paranoid', physical_volume=lambda x: len(x) > 0 if isinstance(x, str) else True) def __init__(self, physical_volume, sudo=False): """ Create new physical volume descriptor :param physical_volume: same as 'lvm_entity' in :meth:`.WLVMInfo.__init__` :param sudo: same as 'sudo' in :meth:`.WLVMInfo.__init__` """ WLVMInfo.__init__(self, 'pvdisplay', 12, physical_volume, sudo=sudo)
[docs] def all(self): """ Return every physical volume in the system :return: tuple of WPhysicalVolume """ return tuple([WPhysicalVolume(x) for x in self.lvm_command().lvm_info()])
[docs] def device_name(self): """ Return physical volume device name :return: str """ return self.lvm_entity()[0]
[docs] def volume_group(self): """ Return related volume group name (may be empty string if this volume is not allocated to any) :return: str """ return self.lvm_entity()[1]
[docs] def sectors_count(self): """ Return physical volume size in sectors :return: int """ return int(self.lvm_entity()[2])
[docs] def extent_size(self): """ Return physical extent size in kilobytes (may have 0 value if this volume is not allocated to any) :return: int """ return int(self.lvm_entity()[7])
[docs] def total_extents(self): """ Return total number of physical extents (may have 0 value if this volume is not allocated to any) :return: int """ return int(self.lvm_entity()[8])
[docs] def free_extents(self): """ Return free number of physical extents (may have 0 value if this volume is not allocated to any) :return: int """ return int(self.lvm_entity()[9])
[docs] def allocated_extents(self): """ Return allocated number of physical extents (may have 0 value if this volume is not allocated to \ any) :return: int """ return int(self.lvm_entity()[10])
[docs] def uuid(self): """ Return physical volume UUID :return: str """ return self.lvm_entity()[11]
[docs]class WVolumeGroup(WLVMInfo): """ Class represent a volume group """ @verify_type('paranoid', volume_group=(str, tuple, list, set), sudo=bool) @verify_value('paranoid', volume_group=lambda x: len(x) > 0 if isinstance(x, str) else True) def __init__(self, volume_group, sudo=False): """ Create new volume group descriptor :param volume_group: same as 'lvm_entity' in :meth:`.WLVMInfo.__init__` :param sudo: same as 'sudo' in :meth:`.WLVMInfo.__init__` """ WLVMInfo.__init__(self, 'vgdisplay', 17, volume_group, sudo=sudo)
[docs] def all(self): """ Return every volume group in the system :return: tuple of WVolumeGroup """ return tuple([WVolumeGroup(x) for x in self.lvm_command().lvm_info()])
[docs] def group_name(self): """ Return volume group name :return: str """ return self.lvm_entity()[0]
[docs] def group_access(self): """ Return volume group access :return: str """ return self.lvm_entity()[1]
[docs] def maximum_logical_volumes(self): """ Return maximum number of logical volumes (0 - for unlimited) :return: int """ return int(self.lvm_entity()[4])
[docs] def logical_volumes(self): """ Return current number of logical volumes :return: int """ return int(self.lvm_entity()[5])
[docs] def opened_logical_volumes(self): """ Return open count of all logical volumes in this volume group :return: int """ return int(self.lvm_entity()[6])
[docs] def maximum_physical_volumes(self): """ Return maximum number of physical volumes (0 - for unlimited) :return: int """ return int(self.lvm_entity()[8])
[docs] def physical_volumes(self): """ Return current number of physical volumes :return: int """ return int(self.lvm_entity()[9])
[docs] def actual_volumes(self): """ Return actual number of physical volumes :return: int """ return int(self.lvm_entity()[10])
[docs] def size(self): """ Return size of volume group in kilobytes :return: int """ return int(self.lvm_entity()[11])
[docs] def extent_size(self): """ Return physical extent size in kilobytes :return: int """ return int(self.lvm_entity()[12])
[docs] def total_extents(self): """ Return total number of physical extents for this volume group :return: int """ return int(self.lvm_entity()[13])
[docs] def allocated_extents(self): """ Return allocated number of physical extents for this volume group :return: int """ return int(self.lvm_entity()[14])
[docs] def free_extents(self): """ Return free number of physical extents for this volume group :return: int """ return int(self.lvm_entity()[15])
[docs] def uuid(self): """ Return UUID of volume group :return: str """ return self.lvm_entity()[16]
[docs]class WLogicalVolume(WLVMInfo): """ Class represent a logical volume """ __lvm_snapshot_create_cmd_timeout__ = 3 """ Timeout for snapshot creation command to complete """ __lvm_snapshot_remove_cmd_timeout__ = 3 """ Timeout for snapshot removing command to complete """ __lvm_snapshot_check_cmd_timeout__ = 3 """ Timeout for snapshot checking (getting parameters) command to complete """ __snapshot_maximum_allocation__ = 99.9 """ Maximum space usage for snapshot, till that value snapshot is treated as valid """ @verify_type('paranoid', logical_volume=(str, tuple, list, set), sudo=bool) @verify_value('paranoid', logical_volume=lambda x: len(x) > 0 if isinstance(x, str) else True) def __init__(self, logical_volume, sudo=False): """ Create new logical volume descriptor :param logical_volume: same as 'lvm_entity' in :meth:`.WLVMInfo.__init__` :param sudo: same as 'sudo' in :meth:`.WLVMInfo.__init__` """ WLVMInfo.__init__(self, 'lvdisplay', 13, logical_volume, sudo=sudo)
[docs] def all(self): """ Return every logical volume in the system :return: tuple of WLogicalVolume """ return tuple([WLogicalVolume(x) for x in self.lvm_command().lvm_info()])
[docs] def volume_path(self): """ Return logical volume path :return: str """ return self.lvm_entity()[0]
[docs] def volume_name(self): """ Return logical volume name :return: str """ return os.path.basename(self.volume_path())
[docs] def volume_group_name(self): """ Return volume group name :return: str """ return self.lvm_entity()[1]
[docs] def volume_group(self): """ Return volume group :return: WVolumeGroup """ return WVolumeGroup(self.volume_group_name(), sudo=self.lvm_command().sudo())
[docs] def sectors_count(self): """ Return logical volume size in sectors :return: int """ return int(self.lvm_entity()[6])
[docs] def extents_count(self): """ Return current logical extents associated to logical volume :return: int """ return int(self.lvm_entity()[7])
[docs] def device_number(self): """ Return tuple of major and minor device number of logical volume :return: tuple of int """ return int(self.lvm_entity()[11]), int(self.lvm_entity()[12])
[docs] def uuid(self): """ Return UUID of logical volume :return: str """ uuid_file = '/sys/block/%s/dm/uuid' % os.path.basename(os.path.realpath(self.volume_path())) lv_uuid = open(uuid_file).read().strip() if lv_uuid.startswith('LVM-') is True: return lv_uuid[4:] return lv_uuid
[docs] @verify_type(snapshot_size=(int, float), snapshot_suffix=str) @verify_value(snapshot_size=lambda x: x > 0, snapshot_suffix=lambda x: len(x) > 0) def create_snapshot(self, snapshot_size, snapshot_suffix): """ Create snapshot for this logical volume. :param snapshot_size: size of newly created snapshot volume. This size is a fraction of the source \ logical volume space (of this logical volume) :param snapshot_suffix: suffix for logical volume name (base part is the same as the original volume \ name) :return: WLogicalVolume """ size_extent = math.ceil(self.extents_count() * snapshot_size) size_kb = self.volume_group().extent_size() * size_extent snapshot_name = self.volume_name() + snapshot_suffix lvcreate_cmd = ['sudo'] if self.lvm_command().sudo() is True else [] lvcreate_cmd.extend([ 'lvcreate', '-L', '%iK' % size_kb, '-s', '-n', snapshot_name, '-p', 'r', self.volume_path() ]) subprocess.check_output(lvcreate_cmd, timeout=self.__class__.__lvm_snapshot_create_cmd_timeout__) return WLogicalVolume(self.volume_path() + snapshot_suffix, sudo=self.lvm_command().sudo())
[docs] def remove_volume(self): """ Remove this volume :return: None """ lvremove_cmd = ['sudo'] if self.lvm_command().sudo() is True else [] lvremove_cmd.extend(['lvremove', '-f', self.volume_path()]) subprocess.check_output(lvremove_cmd, timeout=self.__class__.__lvm_snapshot_remove_cmd_timeout__)
[docs] def snapshot_allocation(self): """ Return allocated size (fraction of total snapshot volume space). If this is not a snapshot volume, than RuntimeError exception is raised. :return: float """ check_cmd = ['lvs', self.volume_path(), '-o', 'snap_percent', '--noheadings'] output = subprocess.check_output(check_cmd, timeout=self.__class__.__lvm_snapshot_check_cmd_timeout__) output = output.decode().strip() if len(output) == 0: raise RuntimeError('Unable to check general logical volume') return float(output.replace(',', '.', 1))
[docs] def snapshot_corrupted(self): """ Check if this snapshot volume is corrupted or not :return: bool (True if corrupted, False - otherwise) """ return self.snapshot_allocation() > self.__class__.__snapshot_maximum_allocation__
[docs] @classmethod @verify_type('paranoid', file_path=str, sudo=bool) @verify_value('paranoid', file_path=lambda x: len(x) > 0) def logical_volume(cls, file_path, sudo=False): """ Return logical volume that stores the given path :param file_path: target path to search :param sudo: same as 'sudo' in :meth:`.WLogicalVolume.__init__` :return: WLogicalVolume or None (if file path is outside current mount points) """ mp = WMountPoint.mount_point(file_path) if mp is not None: name_file = '/sys/block/%s/dm/name' % mp.device_name() if os.path.exists(name_file): lv_path = '/dev/mapper/%s' % open(name_file).read().strip() return WLogicalVolume(lv_path, sudo=sudo)