# -*- coding: utf-8 -*-
# wasp_general/os/linux/mounts.py
#
# Copyright (C) 2017 the wasp-general authors and contributors
# <see AUTHORS file>
#
# This file is part of wasp-general.
#
# Wasp-general is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Wasp-general is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with wasp-general. If not, see <http://www.gnu.org/licenses/>.
# TODO: document the code
# TODO: write tests for the code
# noinspection PyUnresolvedReferences
from wasp_general.version import __author__, __version__, __credits__, __license__, __copyright__, __email__
# noinspection PyUnresolvedReferences
from wasp_general.version import __status__

import os
import re
import subprocess

from wasp_general.verify import verify_type, verify_value
class WMountPoint:
    """ This class supplies information about system mount points. It allows to find the mount point that
    a custom file path resides on, and to mount/unmount devices by calling the system commands.
    """

    __mounts_file__ = '/proc/mounts'
    """ System file with current mounts information
    """

    # Whitespace and backslashes inside /proc/mounts fields are written as three-digit octal escapes
    # (a space is written as \040). This pattern finds such escapes so they can be decoded.
    _octal_escape_re = re.compile(r'\\([0-7]{3})')

    @staticmethod
    def _unescape(field):
        """ Replace every three-digit octal escape in a mount record field with the character it encodes

        :param field: raw field value as it is written in the mounts file
        :return: str
        """
        return WMountPoint._octal_escape_re.sub(lambda match: chr(int(match.group(1), 8)), field)

    @staticmethod
    def _is_inside(file_path, mount_path):
        """ Check whether a file path is a mount path itself or is located below it. Comparison is made on
        a path component boundary, so '/home2/file' is not treated as a part of '/home'.

        :param file_path: path to check
        :param mount_path: mount point path to check against
        :return: bool
        """
        if file_path == mount_path:
            return True
        # the root mount point ('/') already ends with a separator
        prefix = mount_path if mount_path.endswith(os.sep) else mount_path + os.sep
        return file_path.startswith(prefix)

    @verify_type(mount_record=str)
    def __init__(self, mount_record):
        """ Create new mount point information by parsing mount point record (a single line from
        /proc/mounts file)

        A record must consist of exactly six whitespace separated fields: device, mount path, filesystem,
        comma separated options, dump flag and fsck order. Octal escapes in the device and the mount path
        fields are decoded.

        :param mount_record: line to parse
        :raise RuntimeError: if the record does not have six fields or the dump flag is not '0' or '1'
        :raise ValueError: if the fsck order field is not an integer
        """
        parsed_record = mount_record.split()
        if len(parsed_record) != 6:
            raise RuntimeError('Unable to parse mount record: %s' % mount_record)

        device = self._unescape(parsed_record[0])
        # only absolute device paths are resolved (symlinks are followed); other sources, such as
        # pseudo filesystems names, are kept as they are
        self.__device = os.path.realpath(device) if os.path.isabs(device) else device
        self.__device_name = os.path.basename(self.__device)
        self.__path = self._unescape(parsed_record[1])
        self.__fs = parsed_record[2]
        self.__options = tuple(parsed_record[3].split(','))

        dump_flag = parsed_record[4]
        if dump_flag not in ('0', '1'):
            raise RuntimeError('Invalid dump-flag spotted')
        self.__dump_flag = (dump_flag == '1')
        self.__fsck_order = int(parsed_record[5])

    def device(self):
        """ Return a mount point device path

        :return: str
        """
        return self.__device

    def device_name(self):
        """ Return a mount point device name (a base name of a device path)

        :return: str
        """
        return self.__device_name

    def path(self):
        """ Return a mount point path (path where this mount point is mounted to)

        :return: str
        """
        return self.__path

    def fs(self):
        """ Return a mount point filesystem name

        :return: str
        """
        return self.__fs

    def options(self):
        """ Return mount point options (options with which this point was mounted)

        :return: tuple of str
        """
        return self.__options

    def dump_flag(self):
        """ Return dump flag for this mount point

        :return: bool
        """
        return self.__dump_flag

    def fsck_order(self):
        """ Return fsck (filesystem check) order for this mount point

        :return: int
        """
        return self.__fsck_order

    @classmethod
    def mounts(cls):
        """ Return tuple of current mount points (one entry per line of the mounts file, in file order)

        :return: tuple of WMountPoint
        """
        with open(cls.__mounts_file__) as f:
            return tuple(WMountPoint(mount_record) for mount_record in f)

    @classmethod
    @verify_type(file_path=str)
    @verify_value(file_path=lambda x: len(x) > 0)
    def mount_point(cls, file_path):
        """ Return the mount point that the given path resides on

        The mount point with the longest matching path is selected. If several mount points have paths
        of the same length, the one that is listed last in the mounts file is returned.

        :param file_path: target path to search (it is compared as is, without any normalization)
        :return: WMountPoint or None (if file path is outside current mount points)
        """
        mount = None
        for mp in cls.mounts():
            mp_path = mp.path()
            if not cls._is_inside(file_path, mp_path):
                continue
            # '<=' makes a later record win over an earlier one with the same path
            if mount is None or len(mount.path()) <= len(mp_path):
                mount = mp
        return mount

    @classmethod
    @verify_type(device=str, mount_directory=str, fs=(str, None), options=(list, tuple, set, None))
    @verify_type(cmd_timeout=(int, float, None), sudo=bool)
    @verify_value(device=lambda x: len(x) > 0, mount_directory=lambda x: len(x) > 0)
    @verify_value(fs=lambda x: x is None or len(x) > 0, cmd_timeout=lambda x: x is None or x > 0)
    def mount(cls, device, mount_directory, fs=None, options=None, cmd_timeout=None, sudo=False):
        """ Mount a device to mount directory

        :param device: device to mount
        :param mount_directory: target directory where the given device will be mounted to
        :param fs: optional, filesystem on the specified device. If specified - overrides OS filesystem
        detection with this value.
        :param options: specifies mount options (OS/filesystem dependent). Options are passed to the
        mount command as a single comma separated value
        :param cmd_timeout: if specified - timeout with which this mount command should be evaluated (if
        command isn't complete within the given timeout - an exception will be raised)
        :param sudo: whether to use sudo to run mount command
        :raise subprocess.CalledProcessError: if the command exits with a non-zero code
        :raise subprocess.TimeoutExpired: if the command does not complete within cmd_timeout
        :return: None
        """
        cmd = ['sudo'] if sudo else []
        cmd.extend(['mount', device, os.path.abspath(mount_directory)])
        if fs is not None:
            cmd.extend(['-t', fs])
        if options is not None and len(options) > 0:
            # mount expects one '-o' argument with a comma separated list; passing every option as
            # a separate word makes all options after the first look like extra operands
            cmd.extend(['-o', ','.join(options)])

        subprocess.check_output(cmd, timeout=cmd_timeout)

    @classmethod
    @verify_type(device_or_directory=str, cmd_timeout=(int, float, None), sudo=bool)
    @verify_value(device_or_directory=lambda x: len(x) > 0, cmd_timeout=lambda x: x is None or x > 0)
    def umount(cls, device_or_directory, cmd_timeout=None, sudo=False):
        """ Unmount device (or mount directory)

        :param device_or_directory: device name or mount directory to unmount
        :param cmd_timeout: if specified - timeout with which this unmount command should be evaluated (if
        command isn't complete within the given timeout - an exception will be raised)
        :param sudo: whether to use sudo to run umount command
        :raise subprocess.CalledProcessError: if the command exits with a non-zero code
        :raise subprocess.TimeoutExpired: if the command does not complete within cmd_timeout
        :return: None
        """
        cmd = ['sudo'] if sudo else []
        cmd.extend(['umount', device_or_directory])
        subprocess.check_output(cmd, timeout=cmd_timeout)