From: Christian Herdtweck Date: Mon, 8 Oct 2018 07:26:43 +0000 (+0200) Subject: Remove old unused file helpers X-Git-Tag: v1.3~19^2~9 X-Git-Url: http://developer.intra2net.com/git/?a=commitdiff_plain;h=4cc5e7d78b8c93a4ce9de6c28cd4c12fde809380;p=pyi2ncommon Remove old unused file helpers Half-baked re-implementation of psutils, no good --- diff --git a/src/file_helpers.py b/src/file_helpers.py deleted file mode 100644 index 65a8352..0000000 --- a/src/file_helpers.py +++ /dev/null @@ -1,512 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. -# -# Copyright (c) 2016-2018 Intra2net AG - -""" Helper functions and classes to deal with files and dirs and stuff - -Featuring:: - -* the cd context manager pwd(); with cd(other_dir): pwd(); pwd(); - will print current working dir, then other_dir, then first dir again -* class MountPoint and functions to: -* * get all mount points -* * find mount point that contains a given path -* class FilesystemFillState and 2 methods to determine it: -* * a wrapper around "df" -* * a wrapper around statvfs (default since faster without forking) - -What I found out on the way about filesystems: -* usually size != used + available -* there are several sources that list "all" filesystems, each omiting different - things: -* * /etc/fstab --> maintained by admin and tools -* * /etc/mtab --> usually a link to /proc/mounts -* * /proc/mounts --> usually the longest -* * df --> usually the shortest of all -* with all those virtual filesystems in memory and bind-mounts, things get - complicated! -* forking for df takes time! - -.. note:: if decide to deprecate df_wrapper, can change FilesystemFillState to - have full mount point info -> remove NOT_REAL_FILESYSTEMS_SPEC - -.. todo:: check what can be replaced by module psutil -""" - -from __future__ import print_function - -from contextlib import contextmanager -import os -import os.path -from warnings import warn -from math import floor, ceil -import re - -from .call_helpers import call_and_capture -from .iter_helpers import pairwise - - -@contextmanager -def cd(path): - """A context manager which changes the working directory to the given - path, and then changes it back to its previous value on exit. - - Taken from comment for python recipe by Greg Warner at - http://code.activestate.com/recipes/576620-changedirectory-context-manager/ - (MIT license) - """ - prev_cwd = os.getcwd() - os.chdir(path) - try: - yield - finally: - os.chdir(prev_cwd) - - -DF_CMD = ['/usr/bin/df', '--no-sync', '--portability'] -DF_SIZE_UNIT = 1024 -FS_FILL_METHOD_DF = 'df' -FS_FILL_METHOD_STATVFS = 'statvfs' - -#: types of mounts found in /proc/mounts to be ignored in get_all_mounts -IGNORE_MOUNT_TYPES = 'cgroup', 'pstore' - -#: proper filesystems that usually correspond to data on the disc -#: (value for field MountPoint.vfstype) -REAL_FILESYSTEMS_TYPE = 'ext2', 'ext3', 'ext4', 'zfs', 'btrs', 'reiserfs', \ - 'nfs4' - -#: filesystem name (MountPoint.spec / FilesystemFillState.name) that usually -#: does not correspond to something on disc (except, maybe, swap) -#: (only still here because df does not give fs type result, so class -#: FilesystemFillState does not have full mount info) -NOT_REAL_FILESYSTEMS_SPEC = 'none', 'shmfs', 'procfs', 'tmpfs', 'ramfs', \ - 'proc', 'rootfs', 'sysfs', 'devpts', 'sunrpc', \ - 'nfsd' - -class FilesystemFillState: - """ representation of 1 line of the 'df' command - - has fields filesystem, size, used, available, capacity, mount_point - - Note that only apprixomately capacity == used/size - and that only approximately used + available == size - and that all sizes are in bytes - """ - - def __init__(self): - self.name = None - self.size = None - self.used = None - self.available = None - self.capacity = None - self.mount_point = None - - def __str__(self): - return '[Filesystem {0} mounted at {1}: {2}/{3} used]' \ - .format(self.name, self.mount_point, - size_str(self.used), size_str(self.size)) - - -def get_filesystem_fill_states(method=FS_FILL_METHOD_STATVFS): - """ get fill state on all filesystems - - :param method: FS_FILL_METHOD_DF (use df_wrapper, forks) or - FS_FILL_METHOD_STATVFS (uses get_all_mounts(False), default) - all other args and kwargs are forwarded - """ - - if method == FS_FILL_METHOD_DF: - return df_wrapper() - else: - return get_all_statvfs_fills(include_duplicates=False) - - -def df_wrapper(): - """ parses the output of cmd 'df' - - :returns: list of :py:class:`FilesystemFillState` - - Tries to give results compatible with those from get_filesystem_fill_states - """ - - # call - code, out, err = call_and_capture(DF_CMD) - - # warn if unexpected outcome - if code != 0: - warn('df returned non-zero exit code {0}!'.format(code)) - if err: - for line in err: - warn('df produced output to stderr: "{0}"'.format(line)) - - # find columns in output that are just spaces - min_len = min(len(line) for line in out) - separator_cols = [idx for idx in range(min_len) if \ - all(line[idx] == ' ' for line in out)] - checked_cols = [separator_cols[0], ] - for prev_col, col in pairwise(separator_cols): - if col != prev_col+1: - checked_cols.append(col) - separator_cols = checked_cols - - # check columns and their header - if len(separator_cols) != 5: - raise ValueError('unexpected number of separator columns: {0}' - .format(separator_cols)) # must eliminate neighbours? - - title_line = out[0] - title = title_line[ : separator_cols[0]].strip() - if title != 'Filesystem': - warn('Unexpected column title: "{0}" != "Filesystem"!' - .format(title)) - title = title_line[separator_cols[0] : separator_cols[1]].strip() - if title != '1024-blocks': - warn('Unexpected column title: "{0}" != "1024-blocks"!' - .format(title)) - title = title_line[separator_cols[1] : separator_cols[2]].strip() - if title != 'Used': - warn('Unexpected column title: "{0}" != "Used"!' - .format(title)) - title = title_line[separator_cols[2] : separator_cols[3]].strip() - if title != 'Available': - warn('Unexpected column title: "{0}" != "Available"!' - .format(title)) - title = title_line[separator_cols[3] : separator_cols[4]].strip() - if title != 'Capacity': - warn('Unexpected column title: "{0}" != "Capacity"!' - .format(title)) - title = title_line[separator_cols[4] : ].strip() - if title != 'Mounted on': - warn('Unexpected column title: "{0}" != "Mounted on"!' - .format(title)) - - # create result - result = [] - for line in out[1:]: - stats = FilesystemFillState() - stats.name = line[ : separator_cols[0]].strip() - stats.size = int(line[separator_cols[0] : separator_cols[1]].strip()) \ - * DF_SIZE_UNIT - stats.used = int(line[separator_cols[1] : separator_cols[2]].strip()) \ - * DF_SIZE_UNIT - stats.available = int(line[separator_cols[2] : separator_cols[3]]\ - .strip()) * DF_SIZE_UNIT - stats.capacity = int(line[separator_cols[3] : separator_cols[4]]\ - .strip()[:-1]) - stats.mount_point = line[separator_cols[4] : ].strip() - - # more checks: does match capacity - capacity = 100. * stats.used / stats.size - if abs(capacity - stats.capacity) > 5.: - warn('capacities for {0} deviate more than 5%: ' - '{1} != {2:.2f}(={3}/{4})'.format( - stats.name, stats.capacity, capacity, stats.used, stats.size)) - - size = stats.used + stats.available - if float(abs(stats.size - size)) / float(max(stats.size, size)) > 0.1: - warn('size for {0} differs by more than 10% from used+available!' - .format(stats.name)) - - result.append(stats) - - return result - - -def get_all_statvfs_fills(mounts=None, include_duplicates=True): - """ run get_fill_from_statvfs on given MountPoints or get_all_mounts """ - - if mounts is None: - mounts = get_all_mounts(include_duplicates) - return [get_fill_from_statvfs(mount) for mount in mounts] - - -def get_fill_from_statvfs(path): - """ wrapper around os.statvfs - - :param path: a MountPoint, a path string or (since python3.3) a (os-level?) - file descriptor - :returns: FilesystemFillState - - Tries to give results compatible with those from df_wrapper - - runs os.statvfs, but actually only uses f_blocks, f_frsize, f_bsize, - f_bavail of result to set values for fields size and available fields of - :py:class:`FilesystemFillState`. Fields used and capacity are calculated - from that. - - If given a path or file descriptor, field name is set to given arg path, - mount_point is not set (but can easily determined from path using - :py:func:`find_mount_point`). - - If given a MountPoint, name is set to path.spec and mount_point to - path.file - """ - - result = FilesystemFillState() - if isinstance(path, MountPoint): - stat_struct = os.statvfs(path.file) - result.name = path.spec - result.mount_point = path.file - else: - stat_struct = os.statvfs(path) - result.name = path - # result.mount_point is not set - - # fields according to "man statvfs": - # unsigned long f_bsize; /* Filesystem block size */ - # unsigned long f_frsize; /* Fragment size */ - # fsblkcnt_t f_blocks; /* Size of fs in f_frsize units */ - # fsblkcnt_t f_bfree; /* Number of free blocks */ - # fsblkcnt_t f_bavail; /* Number of free blocks for - # unprivileged users */ - # fsfilcnt_t f_files; /* Number of inodes */ - # fsfilcnt_t f_ffree; /* Number of free inodes */ - # fsfilcnt_t f_favail; /* Number of free inodes for - # unprivileged users */ - # unsigned long f_fsid; /* Filesystem ID */ - # unsigned long f_flag; /* Mount flags */ - # unsigned long f_namemax; /* Maximum filename length */ - # (f_fsid is not included in result on linux) - - result.size = stat_struct.f_blocks * stat_struct.f_frsize - result.available = stat_struct.f_bavail * stat_struct.f_bsize - result.used = result.size - result.available - if result.size == 0: - result.capacity = 0 - else: - result.capacity = float(result.used) / float(result.size) * 100. - - return result - - -class MountPoint: - """ a mount point as in /proc/mounts a.k.a /etc/fstab - - (field names taken from man fstab) - """ - def __init__(self): - self.spec = None - self.file = None - self.vfstype = None - self.options = None - self.freq = None - self.passno = None - - def __str__(self): - if self.spec is None and self.file is None and self.type is None: - return '[MountPoint uninitialized]' - else: - return '[MountPoint {0.spec} at {0.file} (type {0.vfstype})]' \ - .format(self) - -MOUNT_REGEXP = r'^\s*(?P.+)' \ - '\s+(?P.+)' \ - '\s+(?P.+)' \ - '\s+(?P.+)' \ - '\s+(?P\d+)' \ - '\s+(?P\d+)\s*$' - -def get_all_mounts(include_duplicates=True): - """ parse /proc/mounts - - does not return those with type in :py:data:IGNORE_MOUNT_TYPES - - :param bool include_duplicates: if False, try to list every "real" - filesystem only once, e.g. ignore bind - mounts, return rootfs only if no other fs - is mounted in same file; default: True - :results: list of :class:`MountPoint` - """ - result = [] - rootfs = [] - files = [] - specs = [] - with open('/proc/mounts', 'rt') as file_handle: - for line in file_handle: - parts = line.split() - matches = re.match(MOUNT_REGEXP, line) - if not matches: - raise ValueError('failed to interpret mount line "{0}"!' - .format(line)) - new_mount = MountPoint() - for field_name, value in matches.groupdict().items(): - setattr(new_mount, field_name, value) - if new_mount.vfstype in IGNORE_MOUNT_TYPES: - continue - - if not include_duplicates: - if new_mount.spec == 'rootfs': - rootfs.append(new_mount) - continue # deal with rootfs in the end - if new_mount.file in files: - warn('multiple non-rootfs mounts in same file {0}!' - .format(new_mount.file)) - if new_mount.spec in specs \ - and new_mount.vfstype in REAL_FILESYSTEMS_TYPE: - continue # e.g. bind mounts; ignore this mount - - # if we reach this, this is no duplicate; remember it - files.append(new_mount.file) - specs.append(new_mount.spec) - result.append(new_mount) - - # if not include_duplicates: - # add rootfs mounts only if no other mount is in same file - for root_mount in rootfs: - have_mount = False - for mount in result: - if mount.file == root_mount.file: - have_mount = True # some other mount in same place - break - if not have_mount: - result.append(root_mount) - - return result - - -def get_mount_info(path): - """ get MountPoint with file system info for given path """ - - mount_point = find_mount_point(path) - - candidates = [] - - with open('/proc/mounts', 'rt') as file_handle: - for line in file_handle: - parts = line.split() - matches = re.match(MOUNT_REGEXP, line) - if not matches: - raise ValueError('failed to interpret mount line "{0}"!' - .format(line)) - if matches.group('file') != mount_point: - continue - new_mount = MountPoint() - for field_name, value in matches.groupdict().items(): - setattr(new_mount, field_name, value) - candidates.append(new_mount) - - if not candidates: - raise NotImplementedError('impossible: mount point not found in ' - 'fstab!') - elif len(candidates) == 1: - return candidates[0] - - # decide which candidates to use: return first that is a "real" - # filesystem (e.g. prefer ext4 before rootfs for '/') - for candidate in candidates: - print(candidate.vfstype) - if candidate.vfstype in REAL_FILESYSTEMS_TYPE and \ - candidate.spec not in NOT_REAL_FILESYSTEMS_SPEC: - return candidate - - # otherwise just return first - return candidates[0] - - - -def find_mount_point(path): - """ repeat os.ismount of parent's parent's parent... until returns true - - taken from, answer by larsmans from Dec 15 2010 on - http://stackoverflow.com/questions/4453602/how-to-find-the-mountpoint-a-file-resides-on - """ - path = os.path.abspath(path) - while not os.path.ismount(path): - path = os.path.dirname(path) - return path - - -def size_str(byte_number, is_diff=False): - """ round byte_number to something easily human-readable like '1.5G' - - :param bool is_diff: set to True to include a '+' or '-' in output; - default: False - """ - - # constants - units = 'B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y' - factor = 1024 - thresh_add_comma = 10. # below this, return 1.2G, above this return 12G - - # prepare - if byte_number < 0: - sign_str = '-' - elif is_diff: - sign_str = '+' - else: - sign_str = '' - curr_fac = 1 - curr_num = abs(float(byte_number)) - - # loop - for unit in units: - if curr_num > factor: - curr_num /= factor - continue - elif curr_num < thresh_add_comma and unit != 'B': # e.g. 1.2G - return '{2}{0:.1f}{1}'.format(curr_num, unit, sign_str) - else: # e.g. 12G or 1B - return '{2}{0:d}{1}'.format(int(round(curr_num)), unit, sign_str) - - # have an impossible amount of data. (>1024**4 GB) - # undo last "/factor" and show thousand-separator - return '{2}{0:,d}{1}'.format(int(round(curr_num*factor)), units[-1], - sign_str) - - -#: regular expression defining a char range in glob/fnmatch: -#: matches for example: bla_[abcd]_bla, bla_[a-d]_bla, bla_[a-dA-D]_bla, -#: bla_[a-dxyz]_bla, bla_[]_bla -#GLOB_RANGE = re.compile('[^\[\]]*\[?:((?:\S-\S)|\S)*\][^\[\]]*') - - -def is_glob(filespec): - """ determine if given file specification is a single file name or a glob - - python's glob and fnmatch can only interpret ?, *, [list], and [ra-nge], - the special chars *?[-] can only be escaped using [] - --> file_name is not a glob - --> file?name is a glob - --> file* is a glob - --> file[-._]name is a glob - --> file[?]name is not a glob (matches literal "file?name") - --> file[*]name is not a glob (matches literal "file*name") - --> file[-]name is not a glob (matches literal "file-name") - --> file-name is not a glob - - Also, obviously incorrect globs are treated as non-globs - --> file[name is not a glob - --> file]-[name is treated as a glob - (it is not a valid glob but detecting errors like this requires - sophisticated regular expression matching) - - Python's glob also works with globs in directory-part of path - --> dir-part of path is analyzed just like filename-part - --> thirdparty/*/xglob.py is a (valid) glob - """ - - # remove escaped special chars - cleaned = filespec.replace('[*]', '').replace('[?]', '') \ - .replace('[[]', '').replace('[]]', '').replace('[-]', '') - - # check if special chars remain - return '*' in cleaned or '?' in cleaned or \ - ('[' in cleaned and ']' in cleaned) - #and GLOB_RANGE.match(cleaned) is not None) diff --git a/test/test_file_helpers.py b/test/test_file_helpers.py deleted file mode 100644 index e25ea4e..0000000 --- a/test/test_file_helpers.py +++ /dev/null @@ -1,170 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. -# -# Copyright (c) 2016-2018 Intra2net AG - -""" file_helper_unittest.py: unit tests for file_helpers - -Tests classes and functions in file_helpers - -Should be able to run from python2 and python3! - -For help see :py:mod:`unittest` -""" - -from __future__ import absolute_import - -import unittest - -from src import file_helpers - -from src.call_helpers import call_and_capture -import os - - -class FileHelperTester(unittest.TestCase): - - def test_cd(self): - """ tests the cd context manager """ - - test_dir = '/' - - start_cwd = os.getcwd() - - with file_helpers.cd(test_dir): - self.assertEqual(os.getcwd(), test_dir) - self.assertEqual(os.getcwd(), start_cwd) - - - def test_obj_str(self): - """ test FilesystemFillState.__str__ """ - - # create dummy state - GIGABYTE = 2**30 - state = file_helpers.FilesystemFillState() - state.name = 'dummy' - state.size = 10 * GIGABYTE - state.used = 9 * GIGABYTE - state.available = 1 * GIGABYTE - state.capacity = 90 - state.mount_point = '/not/mounted' - - expect = '[Filesystem dummy mounted at /not/mounted: 9.0G/10G used]' - - self.assertEqual(str(state), expect) - - - def test_disc_stats_df(self): - """ tests get_filesystem_fill_states using df """ - - stats = file_helpers.get_filesystem_fill_states( - method=file_helpers.FS_FILL_METHOD_DF) - - # check number - code, out, err = call_and_capture(file_helpers.DF_CMD) - self.assertEqual(code, 0) - self.assertEqual(len(err), 0) - self.assertEqual(len(out)-1, len(stats)) - - for stat in stats: - # do numbers make sense? - self.assertGreaterEqual(stat.size, 0) - self.assertGreaterEqual(stat.used, 0) - self.assertLessEqual(stat.used, stat.size) - self.assertGreaterEqual(stat.available, 0) - self.assertLessEqual(stat.available, stat.size) - self.assertGreaterEqual(stat.capacity, 0) - self.assertLessEqual(stat.capacity, 100) - - # are strings non-empty - self.assertGreater(len(stat.name), 0) - self.assertGreater(len(stat.mount_point), 0) - - # does match capacity? - if stat.size > 0: - capacity = 100. * stat.used / stat.size - self.assertLess(abs(capacity - stat.capacity), 5., - 'capacity deviates from used/size by >5%!') - - # is size approx equal to used + available? - size = stat.used + stat.available - self.assertLess(float(abs(stat.size - size)), - 0.1 * float(max(stat.size, size)), - 'size deviates from used+free by more than 10%!') - - - def test_compare_methods(self): - """ compares methods to get_filesystem_fill_states - - turns out STATVFS method finds more filesystem thatn df lists; accept - that. - """ - - statesDF = file_helpers.get_filesystem_fill_states( - method=file_helpers.FS_FILL_METHOD_DF) - statesNF = file_helpers.get_filesystem_fill_states( - method=file_helpers.FS_FILL_METHOD_STATVFS) - - self.assertLessEqual(len(statesDF), len(statesNF)) - - for stateDF in statesDF: - stateNF = None - for state in statesNF: - if state.mount_point == stateDF.mount_point: - stateNF = state - break - self.assertNotEqual(stateNF, None) - - # now compare the two - self.assertEqual(stateDF.size, stateNF.size) - self.assertLess(abs(stateDF.used - stateNF.used), - 0.1 * float(stateDF.size)) - self.assertLess(abs(stateDF.available - stateNF.available), - 0.1 * float(stateDF.size)) - self.assertLess(abs(stateDF.capacity - stateNF.capacity), 5) - self.assertEqual(stateDF.name, stateNF.name) - - - def test_disc_state_statvfs(self): - for state in file_helpers.get_all_statvfs_fills(): - self.assertEqual(state.used + state.available, state.size) - self.assertGreaterEqual(state.size, 0) - self.assertGreaterEqual(state.used, 0) - self.assertGreaterEqual(state.available, 0) - self.assertLessEqual(state.used, state.size) - self.assertLessEqual(state.available, state.size) - - # does match capacity? (would be very strange since capacity is - # calculated from used and size) - if state.size == 0: - self.assertEqual(state.used, 0) - self.assertEqual(state.available, 0) - else: - capacity = 100. * state.used / state.size - self.assertLess(abs(capacity - state.capacity), 5., - 'capacity deviates from used/size by >5%!') - - # is size approx equal to used + available? - size = state.used + state.available - self.assertLessEqual(float(abs(state.size - size)), - 0.1 * float(max(state.size, size)), - 'size deviates from used+free by more than 10%!') - - -if __name__ == '__main__': - unittest.main()