will print current working dir, then other_dir, then first dir again
* a wrapper around "df" to determine size and usage of file systems
-.. todo:: change get_filesystem_fill_states to not fork
+.. todo:: test and change default method for get_filesystem_fill_states
.. codeauthor:: Intra2net
"""
from contextlib import contextmanager
import os
+import os.path
from warnings import warn
from math import floor, ceil
+import re
from call_helpers import call_and_capture
from iter_helpers import pairwise
DF_CMD = ['/usr/bin/df', '--no-sync', '--portability']
DF_SIZE_UNIT = 1024
+FS_FILL_METHOD_DF = 'df'
+FS_FILL_METHOD_STATVFS = 'statvfs'
+
+#: types of mounts found in /proc/mounts to be ignored in get_all_mounts
+IGNORE_MOUNT_TYPES = 'cgroup', 'pstore'
+
class FilesystemFillState:
""" representation of 1 line of the 'df' command
return '[Filesystem {0} mounted at {1}: {2}% used]' \
.format(self.name, self.mount_point, self.capacity)
-def get_filesystem_fill_states():
+
+def get_filesystem_fill_states(method=FS_FILL_METHOD_DF, *args, **kwargs):
""" get fill state on all filesystems
- parses the output of cmd 'df', returns list of
- :py:class:`FilesystemFillState`
+ :param method: FS_FILL_METHOD_DF (use df_wrapper, forks) or
+ FS_FILL_METHOD_STATVFS (uses get_all_statvfs)
+ all other args and kwargs are forwarded
+ """
+
+ if method == FS_FILL_METHOD_DF:
+ return df_wrapper(*args, **kwargs)
+ else:
+ return get_all_statvfs_fills(*args, **kwargs)
+
- ..todo:: replace call to df (which forks) by call to statvfs combined with
- some way to find all mounted partitions (parse /proc/mounts
- whenever it changes)
+def df_wrapper():
+ """ parses the output of cmd 'df'
+
+ :returns: list of :py:class:`FilesystemFillState`
+
+ Tries to give results compatible with those from get_filesystem_fill_states
"""
# call
result.append(stats)
return result
+
+
+def get_all_statvfs_fills(*mounts):
+ """ run get_fill_from_statvfs on given MountPoints or get_all_mounts """
+
+ if not mounts:
+ mounts = get_all_mounts()
+
+ return [get_fill_from_statvfs(mount) for mount in mounts]
+
+
+def get_fill_from_statvfs(path):
+ """ wrapper around os.statvfs
+
+ :param path: a MountPoint, a path string or (since python3.3) a (os-level?)
+ file descriptor
+ :returns: FilesystemFillState
+
+ Tries to give results compatible with those from df_wrapper
+
+ runs os.statvfs, but actually only uses f_blocks, f_frsize, f_bsize,
+ f_bavail of result to set values for fields size and available fields of
+ :py:class:`FilesystemFillState`. Fields used and capacity are calculated
+ from that.
+
+ If given a path or file descriptor, field name is set to given arg path,
+ mount_point is not set (but can easily determined from path using
+ :py:func:`find_mount_point`).
+
+ If given a MountPoint, name is set to path.spec and mount_point to
+ path.file
+ """
+
+ result = FilesystemFillState()
+ if isinstance(path, MountPoint):
+ stat_struct = os.statvfs(path.file)
+ result.name = path.spec
+ result.mount_point = path.file
+ else:
+ stat_struct = os.statvfs(path)
+ result.name = path
+ # result.mount_point is not set
+
+ # fields according to "man statvfs":
+ # unsigned long f_bsize; /* Filesystem block size */
+ # unsigned long f_frsize; /* Fragment size */
+ # fsblkcnt_t f_blocks; /* Size of fs in f_frsize units */
+ # fsblkcnt_t f_bfree; /* Number of free blocks */
+ # fsblkcnt_t f_bavail; /* Number of free blocks for
+ # unprivileged users */
+ # fsfilcnt_t f_files; /* Number of inodes */
+ # fsfilcnt_t f_ffree; /* Number of free inodes */
+ # fsfilcnt_t f_favail; /* Number of free inodes for
+ # unprivileged users */
+ # unsigned long f_fsid; /* Filesystem ID */
+ # unsigned long f_flag; /* Mount flags */
+ # unsigned long f_namemax; /* Maximum filename length */
+ # (f_fsid is not included in result on linux)
+
+ result.size = stat_struct.f_blocks * stat_struct.f_frsize
+ result.available = stat_struct.f_bavail * stat_struct.f_bsize
+ result.used = result.size - result.available
+ if result.size == 0:
+ result.capacity = 0
+ else:
+ result.capacity = float(result.used) / float(result.size) * 100.
+
+ return result
+
+
+class MountPoint:
+ """ a mount point as in /proc/mounts a.k.a /etc/fstab
+
+ (field names taken from man fstab)
+ """
+ def __init__(self):
+ self.spec = None
+ self.file = None
+ self.vfstype = None
+ self.options = None
+ self.freq = None
+ self.passno = None
+
+ def __str__(self):
+ if self.spec is None and self.file is None and self.type is None:
+ return '[MountPoint uninitialized]'
+ else:
+ return '[MountPoint {0.spec} at {0.file} (type {0.vfstype})]' \
+ .format(self)
+
+MOUNT_REGEXP = r'^\s*(?P<spec>.+)' \
+ '\s+(?P<file>.+)' \
+ '\s+(?P<vfstype>.+)' \
+ '\s+(?P<options>.+)' \
+ '\s+(?P<freq>\d+)' \
+ '\s+(?P<passno>\d+)\s*$'
+
+def get_all_mounts():
+ """ parse /proc/mounts
+
+ does not return those with type in :py:data:IGNORE_MOUNT_TYPES
+
+ :results: list of :class:`MountPoint`
+ """
+ result = []
+ with open('/proc/mounts', 'rt') as file_handle:
+ for line in file_handle:
+ parts = line.split()
+ matches = re.match(MOUNT_REGEXP, line)
+ if not matches:
+ raise ValueError('failed to interpret mount line "{0}"!'
+ .format(line))
+ new_mount = MountPoint()
+ for field_name, value in matches.groupdict().items():
+ setattr(new_mount, field_name, value)
+ if new_mount.vfstype in IGNORE_MOUNT_TYPES:
+ continue
+ result.append(new_mount)
+
+ return result
+
+
+def find_mount_point(path):
+ """ repeat os.ismount of parent's parent's parent... until returns true
+
+ taken from, answer by larsmans from Dec 15 2010 on
+ http://stackoverflow.com/questions/4453602/how-to-find-the-mountpoint-a-file-resides-on
+ """
+ path = os.path.abspath(path)
+ while not os.path.ismount(path):
+ path = os.path.dirname(path)
+ return path
self.assertEqual(str(state), expect)
- def test_disc_stats(self):
- """ tests get_filesystem_fill_states """
+ def test_disc_stats_df(self):
+ """ tests get_filesystem_fill_states using df """
stats = file_helpers.get_filesystem_fill_states()
self.assertGreater(len(stat.mount_point), 0)
# does match capacity?
- capacity = 100. * stat.used / stat.size
- self.assertLess(abs(capacity - stat.capacity), 5.,
- 'capacity deviates from used/size by >5%!')
+ if stat.size > 0:
+ capacity = 100. * stat.used / stat.size
+ self.assertLess(abs(capacity - stat.capacity), 5.,
+ 'capacity deviates from used/size by >5%!')
# is size approx equal to used + available?
size = stat.used + stat.available
self.assertLess(float(abs(stat.size - size))
- / float(max(stat.size, size)),
- 0.10,
+ , 0.1 * float(max(stat.size, size)),
'size deviates from used+free by more than 10%!')
+ def test_compare_methods(self):
+ """ compares methods to get_filesystem_fill_states
+
+ turns out df does not list everything there is...
+ """
+
+ states1 = file_helpers.get_filesystem_fill_states(
+ method=file_helpers.FS_FILL_METHOD_DF)
+ states2 = file_helpers.get_filesystem_fill_states(
+ method=file_helpers.FS_FILL_METHOD_STATVFS)
+
+ self.assertEqual(len(states1), len(states2))
+
+ for state1 in states1:
+ state2 = None
+ for state in states2:
+ if state.name == state1.name:
+ state2 = state
+ break
+ self.assertUnequal(state2, None)
+
+ # now compare the two
+ self.assertEqual(state1.size, state2.size)
+ self.assertLess(abs(state1.used - state2.used),
+ 0.1 * float(state1.size))
+ self.assertTrue(abs(state1.available - state2.available),
+ 0.1 * float(state1.size))
+ self.assertLess(abs(state1.capacity - state2.capacity), 5)
+ self.assertEqual(state1.mount_point, state2.mount_point)
+
+ def test_disc_state_statvfs(self):
+ for state in file_helpers.get_filesystem_fill_states(
+ method=file_helpers.FS_FILL_METHOD_STATVFS):
+ self.assertEqual(state.used + state.available, state.size)
+ self.assertGreaterEqual(state.size, 0)
+ self.assertGreaterEqual(state.used, 0)
+ self.assertGreaterEqual(state.available, 0)
+ self.assertLessEqual(state.used, state.size)
+ self.assertLessEqual(state.available, state.size)
+
if __name__ == '__main__':
unittest.main()