+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-""" Helper functions and classes to deal with files and dirs and stuff
-
-Featuring::
-
-* the cd context manager pwd(); with cd(other_dir): pwd(); pwd();
- will print current working dir, then other_dir, then first dir again
-* class MountPoint and functions to:
-* * get all mount points
-* * find mount point that contains a given path
-* class FilesystemFillState and 2 methods to determine it:
-* * a wrapper around "df"
-* * a wrapper around statvfs (default since faster without forking)
-
-What I found out on the way about filesystems:
-* usually size != used + available
-* there are several sources that list "all" filesystems, each omiting different
- things:
-* * /etc/fstab --> maintained by admin and tools
-* * /etc/mtab --> usually a link to /proc/mounts
-* * /proc/mounts --> usually the longest
-* * df --> usually the shortest of all
-* with all those virtual filesystems in memory and bind-mounts, things get
- complicated!
-* forking for df takes time!
-
-.. note:: if decide to deprecate df_wrapper, can change FilesystemFillState to
- have full mount point info -> remove NOT_REAL_FILESYSTEMS_SPEC
-
-.. todo:: check what can be replaced by module psutil
-"""
-
-from __future__ import print_function
-
-from contextlib import contextmanager
-import os
-import os.path
-from warnings import warn
-from math import floor, ceil
-import re
-
-from .call_helpers import call_and_capture
-from .iter_helpers import pairwise
-
-
-@contextmanager
-def cd(path):
- """A context manager which changes the working directory to the given
- path, and then changes it back to its previous value on exit.
-
- Taken from comment for python recipe by Greg Warner at
- http://code.activestate.com/recipes/576620-changedirectory-context-manager/
- (MIT license)
- """
- prev_cwd = os.getcwd()
- os.chdir(path)
- try:
- yield
- finally:
- os.chdir(prev_cwd)
-
-
-DF_CMD = ['/usr/bin/df', '--no-sync', '--portability']
-DF_SIZE_UNIT = 1024
-FS_FILL_METHOD_DF = 'df'
-FS_FILL_METHOD_STATVFS = 'statvfs'
-
-#: types of mounts found in /proc/mounts to be ignored in get_all_mounts
-IGNORE_MOUNT_TYPES = 'cgroup', 'pstore'
-
-#: proper filesystems that usually correspond to data on the disc
-#: (value for field MountPoint.vfstype)
-REAL_FILESYSTEMS_TYPE = 'ext2', 'ext3', 'ext4', 'zfs', 'btrs', 'reiserfs', \
- 'nfs4'
-
-#: filesystem name (MountPoint.spec / FilesystemFillState.name) that usually
-#: does not correspond to something on disc (except, maybe, swap)
-#: (only still here because df does not give fs type result, so class
-#: FilesystemFillState does not have full mount info)
-NOT_REAL_FILESYSTEMS_SPEC = 'none', 'shmfs', 'procfs', 'tmpfs', 'ramfs', \
- 'proc', 'rootfs', 'sysfs', 'devpts', 'sunrpc', \
- 'nfsd'
-
-class FilesystemFillState:
- """ representation of 1 line of the 'df' command
-
- has fields filesystem, size, used, available, capacity, mount_point
-
- Note that only apprixomately capacity == used/size
- and that only approximately used + available == size
- and that all sizes are in bytes
- """
-
- def __init__(self):
- self.name = None
- self.size = None
- self.used = None
- self.available = None
- self.capacity = None
- self.mount_point = None
-
- def __str__(self):
- return '[Filesystem {0} mounted at {1}: {2}/{3} used]' \
- .format(self.name, self.mount_point,
- size_str(self.used), size_str(self.size))
-
-
-def get_filesystem_fill_states(method=FS_FILL_METHOD_STATVFS):
- """ get fill state on all filesystems
-
- :param method: FS_FILL_METHOD_DF (use df_wrapper, forks) or
- FS_FILL_METHOD_STATVFS (uses get_all_mounts(False), default)
- all other args and kwargs are forwarded
- """
-
- if method == FS_FILL_METHOD_DF:
- return df_wrapper()
- else:
- return get_all_statvfs_fills(include_duplicates=False)
-
-
-def df_wrapper():
- """ parses the output of cmd 'df'
-
- :returns: list of :py:class:`FilesystemFillState`
-
- Tries to give results compatible with those from get_filesystem_fill_states
- """
-
- # call
- code, out, err = call_and_capture(DF_CMD)
-
- # warn if unexpected outcome
- if code != 0:
- warn('df returned non-zero exit code {0}!'.format(code))
- if err:
- for line in err:
- warn('df produced output to stderr: "{0}"'.format(line))
-
- # find columns in output that are just spaces
- min_len = min(len(line) for line in out)
- separator_cols = [idx for idx in range(min_len) if \
- all(line[idx] == ' ' for line in out)]
- checked_cols = [separator_cols[0], ]
- for prev_col, col in pairwise(separator_cols):
- if col != prev_col+1:
- checked_cols.append(col)
- separator_cols = checked_cols
-
- # check columns and their header
- if len(separator_cols) != 5:
- raise ValueError('unexpected number of separator columns: {0}'
- .format(separator_cols)) # must eliminate neighbours?
-
- title_line = out[0]
- title = title_line[ : separator_cols[0]].strip()
- if title != 'Filesystem':
- warn('Unexpected column title: "{0}" != "Filesystem"!'
- .format(title))
- title = title_line[separator_cols[0] : separator_cols[1]].strip()
- if title != '1024-blocks':
- warn('Unexpected column title: "{0}" != "1024-blocks"!'
- .format(title))
- title = title_line[separator_cols[1] : separator_cols[2]].strip()
- if title != 'Used':
- warn('Unexpected column title: "{0}" != "Used"!'
- .format(title))
- title = title_line[separator_cols[2] : separator_cols[3]].strip()
- if title != 'Available':
- warn('Unexpected column title: "{0}" != "Available"!'
- .format(title))
- title = title_line[separator_cols[3] : separator_cols[4]].strip()
- if title != 'Capacity':
- warn('Unexpected column title: "{0}" != "Capacity"!'
- .format(title))
- title = title_line[separator_cols[4] : ].strip()
- if title != 'Mounted on':
- warn('Unexpected column title: "{0}" != "Mounted on"!'
- .format(title))
-
- # create result
- result = []
- for line in out[1:]:
- stats = FilesystemFillState()
- stats.name = line[ : separator_cols[0]].strip()
- stats.size = int(line[separator_cols[0] : separator_cols[1]].strip()) \
- * DF_SIZE_UNIT
- stats.used = int(line[separator_cols[1] : separator_cols[2]].strip()) \
- * DF_SIZE_UNIT
- stats.available = int(line[separator_cols[2] : separator_cols[3]]\
- .strip()) * DF_SIZE_UNIT
- stats.capacity = int(line[separator_cols[3] : separator_cols[4]]\
- .strip()[:-1])
- stats.mount_point = line[separator_cols[4] : ].strip()
-
- # more checks: does match capacity
- capacity = 100. * stats.used / stats.size
- if abs(capacity - stats.capacity) > 5.:
- warn('capacities for {0} deviate more than 5%: '
- '{1} != {2:.2f}(={3}/{4})'.format(
- stats.name, stats.capacity, capacity, stats.used, stats.size))
-
- size = stats.used + stats.available
- if float(abs(stats.size - size)) / float(max(stats.size, size)) > 0.1:
- warn('size for {0} differs by more than 10% from used+available!'
- .format(stats.name))
-
- result.append(stats)
-
- return result
-
-
-def get_all_statvfs_fills(mounts=None, include_duplicates=True):
- """ run get_fill_from_statvfs on given MountPoints or get_all_mounts """
-
- if mounts is None:
- mounts = get_all_mounts(include_duplicates)
- return [get_fill_from_statvfs(mount) for mount in mounts]
-
-
-def get_fill_from_statvfs(path):
- """ wrapper around os.statvfs
-
- :param path: a MountPoint, a path string or (since python3.3) a (os-level?)
- file descriptor
- :returns: FilesystemFillState
-
- Tries to give results compatible with those from df_wrapper
-
- runs os.statvfs, but actually only uses f_blocks, f_frsize, f_bsize,
- f_bavail of result to set values for fields size and available fields of
- :py:class:`FilesystemFillState`. Fields used and capacity are calculated
- from that.
-
- If given a path or file descriptor, field name is set to given arg path,
- mount_point is not set (but can easily determined from path using
- :py:func:`find_mount_point`).
-
- If given a MountPoint, name is set to path.spec and mount_point to
- path.file
- """
-
- result = FilesystemFillState()
- if isinstance(path, MountPoint):
- stat_struct = os.statvfs(path.file)
- result.name = path.spec
- result.mount_point = path.file
- else:
- stat_struct = os.statvfs(path)
- result.name = path
- # result.mount_point is not set
-
- # fields according to "man statvfs":
- # unsigned long f_bsize; /* Filesystem block size */
- # unsigned long f_frsize; /* Fragment size */
- # fsblkcnt_t f_blocks; /* Size of fs in f_frsize units */
- # fsblkcnt_t f_bfree; /* Number of free blocks */
- # fsblkcnt_t f_bavail; /* Number of free blocks for
- # unprivileged users */
- # fsfilcnt_t f_files; /* Number of inodes */
- # fsfilcnt_t f_ffree; /* Number of free inodes */
- # fsfilcnt_t f_favail; /* Number of free inodes for
- # unprivileged users */
- # unsigned long f_fsid; /* Filesystem ID */
- # unsigned long f_flag; /* Mount flags */
- # unsigned long f_namemax; /* Maximum filename length */
- # (f_fsid is not included in result on linux)
-
- result.size = stat_struct.f_blocks * stat_struct.f_frsize
- result.available = stat_struct.f_bavail * stat_struct.f_bsize
- result.used = result.size - result.available
- if result.size == 0:
- result.capacity = 0
- else:
- result.capacity = float(result.used) / float(result.size) * 100.
-
- return result
-
-
-class MountPoint:
- """ a mount point as in /proc/mounts a.k.a /etc/fstab
-
- (field names taken from man fstab)
- """
- def __init__(self):
- self.spec = None
- self.file = None
- self.vfstype = None
- self.options = None
- self.freq = None
- self.passno = None
-
- def __str__(self):
- if self.spec is None and self.file is None and self.type is None:
- return '[MountPoint uninitialized]'
- else:
- return '[MountPoint {0.spec} at {0.file} (type {0.vfstype})]' \
- .format(self)
-
-MOUNT_REGEXP = r'^\s*(?P<spec>.+)' \
- '\s+(?P<file>.+)' \
- '\s+(?P<vfstype>.+)' \
- '\s+(?P<options>.+)' \
- '\s+(?P<freq>\d+)' \
- '\s+(?P<passno>\d+)\s*$'
-
-def get_all_mounts(include_duplicates=True):
- """ parse /proc/mounts
-
- does not return those with type in :py:data:IGNORE_MOUNT_TYPES
-
- :param bool include_duplicates: if False, try to list every "real"
- filesystem only once, e.g. ignore bind
- mounts, return rootfs only if no other fs
- is mounted in same file; default: True
- :results: list of :class:`MountPoint`
- """
- result = []
- rootfs = []
- files = []
- specs = []
- with open('/proc/mounts', 'rt') as file_handle:
- for line in file_handle:
- parts = line.split()
- matches = re.match(MOUNT_REGEXP, line)
- if not matches:
- raise ValueError('failed to interpret mount line "{0}"!'
- .format(line))
- new_mount = MountPoint()
- for field_name, value in matches.groupdict().items():
- setattr(new_mount, field_name, value)
- if new_mount.vfstype in IGNORE_MOUNT_TYPES:
- continue
-
- if not include_duplicates:
- if new_mount.spec == 'rootfs':
- rootfs.append(new_mount)
- continue # deal with rootfs in the end
- if new_mount.file in files:
- warn('multiple non-rootfs mounts in same file {0}!'
- .format(new_mount.file))
- if new_mount.spec in specs \
- and new_mount.vfstype in REAL_FILESYSTEMS_TYPE:
- continue # e.g. bind mounts; ignore this mount
-
- # if we reach this, this is no duplicate; remember it
- files.append(new_mount.file)
- specs.append(new_mount.spec)
- result.append(new_mount)
-
- # if not include_duplicates:
- # add rootfs mounts only if no other mount is in same file
- for root_mount in rootfs:
- have_mount = False
- for mount in result:
- if mount.file == root_mount.file:
- have_mount = True # some other mount in same place
- break
- if not have_mount:
- result.append(root_mount)
-
- return result
-
-
-def get_mount_info(path):
- """ get MountPoint with file system info for given path """
-
- mount_point = find_mount_point(path)
-
- candidates = []
-
- with open('/proc/mounts', 'rt') as file_handle:
- for line in file_handle:
- parts = line.split()
- matches = re.match(MOUNT_REGEXP, line)
- if not matches:
- raise ValueError('failed to interpret mount line "{0}"!'
- .format(line))
- if matches.group('file') != mount_point:
- continue
- new_mount = MountPoint()
- for field_name, value in matches.groupdict().items():
- setattr(new_mount, field_name, value)
- candidates.append(new_mount)
-
- if not candidates:
- raise NotImplementedError('impossible: mount point not found in '
- 'fstab!')
- elif len(candidates) == 1:
- return candidates[0]
-
- # decide which candidates to use: return first that is a "real"
- # filesystem (e.g. prefer ext4 before rootfs for '/')
- for candidate in candidates:
- print(candidate.vfstype)
- if candidate.vfstype in REAL_FILESYSTEMS_TYPE and \
- candidate.spec not in NOT_REAL_FILESYSTEMS_SPEC:
- return candidate
-
- # otherwise just return first
- return candidates[0]
-
-
-
-def find_mount_point(path):
- """ repeat os.ismount of parent's parent's parent... until returns true
-
- taken from, answer by larsmans from Dec 15 2010 on
- http://stackoverflow.com/questions/4453602/how-to-find-the-mountpoint-a-file-resides-on
- """
- path = os.path.abspath(path)
- while not os.path.ismount(path):
- path = os.path.dirname(path)
- return path
-
-
-def size_str(byte_number, is_diff=False):
- """ round byte_number to something easily human-readable like '1.5G'
-
- :param bool is_diff: set to True to include a '+' or '-' in output;
- default: False
- """
-
- # constants
- units = 'B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'
- factor = 1024
- thresh_add_comma = 10. # below this, return 1.2G, above this return 12G
-
- # prepare
- if byte_number < 0:
- sign_str = '-'
- elif is_diff:
- sign_str = '+'
- else:
- sign_str = ''
- curr_fac = 1
- curr_num = abs(float(byte_number))
-
- # loop
- for unit in units:
- if curr_num > factor:
- curr_num /= factor
- continue
- elif curr_num < thresh_add_comma and unit != 'B': # e.g. 1.2G
- return '{2}{0:.1f}{1}'.format(curr_num, unit, sign_str)
- else: # e.g. 12G or 1B
- return '{2}{0:d}{1}'.format(int(round(curr_num)), unit, sign_str)
-
- # have an impossible amount of data. (>1024**4 GB)
- # undo last "/factor" and show thousand-separator
- return '{2}{0:,d}{1}'.format(int(round(curr_num*factor)), units[-1],
- sign_str)
-
-
-#: regular expression defining a char range in glob/fnmatch:
-#: matches for example: bla_[abcd]_bla, bla_[a-d]_bla, bla_[a-dA-D]_bla,
-#: bla_[a-dxyz]_bla, bla_[]_bla
-#GLOB_RANGE = re.compile('[^\[\]]*\[?:((?:\S-\S)|\S)*\][^\[\]]*')
-
-
-def is_glob(filespec):
- """ determine if given file specification is a single file name or a glob
-
- python's glob and fnmatch can only interpret ?, *, [list], and [ra-nge],
- the special chars *?[-] can only be escaped using []
- --> file_name is not a glob
- --> file?name is a glob
- --> file* is a glob
- --> file[-._]name is a glob
- --> file[?]name is not a glob (matches literal "file?name")
- --> file[*]name is not a glob (matches literal "file*name")
- --> file[-]name is not a glob (matches literal "file-name")
- --> file-name is not a glob
-
- Also, obviously incorrect globs are treated as non-globs
- --> file[name is not a glob
- --> file]-[name is treated as a glob
- (it is not a valid glob but detecting errors like this requires
- sophisticated regular expression matching)
-
- Python's glob also works with globs in directory-part of path
- --> dir-part of path is analyzed just like filename-part
- --> thirdparty/*/xglob.py is a (valid) glob
- """
-
- # remove escaped special chars
- cleaned = filespec.replace('[*]', '').replace('[?]', '') \
- .replace('[[]', '').replace('[]]', '').replace('[-]', '')
-
- # check if special chars remain
- return '*' in cleaned or '?' in cleaned or \
- ('[' in cleaned and ']' in cleaned)
- #and GLOB_RANGE.match(cleaned) is not None)
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-""" file_helper_unittest.py: unit tests for file_helpers
-
-Tests classes and functions in file_helpers
-
-Should be able to run from python2 and python3!
-
-For help see :py:mod:`unittest`
-"""
-
-from __future__ import absolute_import
-
-import unittest
-
-from src import file_helpers
-
-from src.call_helpers import call_and_capture
-import os
-
-
-class FileHelperTester(unittest.TestCase):
-
- def test_cd(self):
- """ tests the cd context manager """
-
- test_dir = '/'
-
- start_cwd = os.getcwd()
-
- with file_helpers.cd(test_dir):
- self.assertEqual(os.getcwd(), test_dir)
- self.assertEqual(os.getcwd(), start_cwd)
-
-
- def test_obj_str(self):
- """ test FilesystemFillState.__str__ """
-
- # create dummy state
- GIGABYTE = 2**30
- state = file_helpers.FilesystemFillState()
- state.name = 'dummy'
- state.size = 10 * GIGABYTE
- state.used = 9 * GIGABYTE
- state.available = 1 * GIGABYTE
- state.capacity = 90
- state.mount_point = '/not/mounted'
-
- expect = '[Filesystem dummy mounted at /not/mounted: 9.0G/10G used]'
-
- self.assertEqual(str(state), expect)
-
-
- def test_disc_stats_df(self):
- """ tests get_filesystem_fill_states using df """
-
- stats = file_helpers.get_filesystem_fill_states(
- method=file_helpers.FS_FILL_METHOD_DF)
-
- # check number
- code, out, err = call_and_capture(file_helpers.DF_CMD)
- self.assertEqual(code, 0)
- self.assertEqual(len(err), 0)
- self.assertEqual(len(out)-1, len(stats))
-
- for stat in stats:
- # do numbers make sense?
- self.assertGreaterEqual(stat.size, 0)
- self.assertGreaterEqual(stat.used, 0)
- self.assertLessEqual(stat.used, stat.size)
- self.assertGreaterEqual(stat.available, 0)
- self.assertLessEqual(stat.available, stat.size)
- self.assertGreaterEqual(stat.capacity, 0)
- self.assertLessEqual(stat.capacity, 100)
-
- # are strings non-empty
- self.assertGreater(len(stat.name), 0)
- self.assertGreater(len(stat.mount_point), 0)
-
- # does match capacity?
- if stat.size > 0:
- capacity = 100. * stat.used / stat.size
- self.assertLess(abs(capacity - stat.capacity), 5.,
- 'capacity deviates from used/size by >5%!')
-
- # is size approx equal to used + available?
- size = stat.used + stat.available
- self.assertLess(float(abs(stat.size - size)),
- 0.1 * float(max(stat.size, size)),
- 'size deviates from used+free by more than 10%!')
-
-
- def test_compare_methods(self):
- """ compares methods to get_filesystem_fill_states
-
- turns out STATVFS method finds more filesystem thatn df lists; accept
- that.
- """
-
- statesDF = file_helpers.get_filesystem_fill_states(
- method=file_helpers.FS_FILL_METHOD_DF)
- statesNF = file_helpers.get_filesystem_fill_states(
- method=file_helpers.FS_FILL_METHOD_STATVFS)
-
- self.assertLessEqual(len(statesDF), len(statesNF))
-
- for stateDF in statesDF:
- stateNF = None
- for state in statesNF:
- if state.mount_point == stateDF.mount_point:
- stateNF = state
- break
- self.assertNotEqual(stateNF, None)
-
- # now compare the two
- self.assertEqual(stateDF.size, stateNF.size)
- self.assertLess(abs(stateDF.used - stateNF.used),
- 0.1 * float(stateDF.size))
- self.assertLess(abs(stateDF.available - stateNF.available),
- 0.1 * float(stateDF.size))
- self.assertLess(abs(stateDF.capacity - stateNF.capacity), 5)
- self.assertEqual(stateDF.name, stateNF.name)
-
-
- def test_disc_state_statvfs(self):
- for state in file_helpers.get_all_statvfs_fills():
- self.assertEqual(state.used + state.available, state.size)
- self.assertGreaterEqual(state.size, 0)
- self.assertGreaterEqual(state.used, 0)
- self.assertGreaterEqual(state.available, 0)
- self.assertLessEqual(state.used, state.size)
- self.assertLessEqual(state.available, state.size)
-
- # does match capacity? (would be very strange since capacity is
- # calculated from used and size)
- if state.size == 0:
- self.assertEqual(state.used, 0)
- self.assertEqual(state.available, 0)
- else:
- capacity = 100. * state.used / state.size
- self.assertLess(abs(capacity - state.capacity), 5.,
- 'capacity deviates from used/size by >5%!')
-
- # is size approx equal to used + available?
- size = state.used + state.available
- self.assertLessEqual(float(abs(state.size - size)),
- 0.1 * float(max(state.size, size)),
- 'size deviates from used+free by more than 10%!')
-
-
-if __name__ == '__main__':
- unittest.main()