# This Python file uses the following encoding: utf-8 # The software in this package is distributed under the GNU General # Public License version 2 (with a special exception described below). # # A copy of GNU General Public License (GPL) is included in this distribution, # in the file COPYING.GPL. # # As a special exception, if other files instantiate templates or use macros # or inline functions from this file, or you compile this file and link it # with other works to produce a work based on this file, this file # does not by itself cause the resulting work to be covered # by the GNU General Public License. # # However the source code for this file must still be made available # in accordance with section (3) of the GNU General Public License. # # This exception does not invalidate any other reasons why a work based # on this file might be covered by the GNU General Public License. # # Copyright (c) 2016-2018 Intra2net AG """ Miscellaneous system utility: Collection of various common system stuff / idioms. Copyright: 2015 Intra2net AG The library exports the symbols below and some custom logging functions. * run_cmd_with_pipe Wrapper for the default use case of the cumbersome "subprocess" library. Accepts a list of arguments that describe the command invocation. Returns ``True`` and the contents of ``stdout`` if the pipe returned sucessfully, ``False`` plus ``stderr`` and the exit status otherwise. For example:: import sysmisc (success, output, _ret) = sysmisc.run_cmd_with_pipe([ "/usr/bin/date", "+%F" ]) if success is True: print("Today is %s" % output) else: print("Failed to read date from pipe.") * get_mountpoints_by_type Extract mount points for the given file system type from */proc/mounts*. Returns ``None`` if the file system is not mounted, a list of mount points otherwise. Raises a test error if */proc/mounts* cannot be accessed. * read_linewise Similar to run_cmd_with_pipe but allows processing of output line-by-line as it becomes available. This may be necessary when the underlying binary creates lots of output that cannot be buffered until the process finishes. Example:: import re import sysmisc def parse(line): if re.match('\\d', line): print('found digits in line!') sysmisc.read_linewise('dump_db', parse) * hash_file Return a hash of a file. * cheat_reboot Replace the reboot binary with a fake one. * cmd_block_till Run a command and wait until a condition evaluates to True. * cd A context manager that temporarily changes the current working directory The logging functions either use the format capability or play the simple role of providing shorter names. """ import re import subprocess import hashlib import os import stat import time import types import uuid from contextlib import contextmanager import enum import logging llog = logging.getLogger('pyi2ncommon.sysmisc') __all__ = ("inf", "run_cmd_with_pipe", "get_mountpoints_by_type", "read_linewise", "hash_file", "cheat_reboot", "RUN_RESULT_OK", "RUN_RESULT_TIMEDOUT", "RUN_RESULT_FAIL", "RUN_RESULT_NAME", "cmd_block_till") ############################################################################### # HELPERS ############################################################################### @contextmanager def cd(path): """ A context manager which changes the working directory. Changes current working directory to the given path, and then changes it back to its previous value on exit. Taken from comment for python recipe by Greg Warner at http://code.activestate.com/recipes/576620-changedirectory-context-manager/ (MIT license) :arg str path: path to temporarily switch to """ orig_wd = os.getcwd() os.chdir(path) try: yield finally: os.chdir(orig_wd) def run_cmd_with_pipe(argv, inp=None): """ Read from a process pipe. :param argv: arguments to use for creating a process :type argv: [str] :param inp: Text to be piped into the program’s standard input. :type inp: str :returns: a processes' stdout along with a status info :rtype: bool * str * (int option) Executes a binary and reads its output from a pipe. Returns a triple encoding the programm success, its output either from stdout or stderr, as well the exit status if non-zero. If your process creates a lot of output, consider using :py:func:`read_linewise` instead. """ llog.debug("About to execute \"" + " ".join(argv) + "\"") stdin = None if isinstance(inp, str): stdin = subprocess.PIPE p = subprocess.Popen(argv, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if p.stdin is not None: p.stdin.write(inp.encode()) (stdout, stderr) = p.communicate() exit_status = p.wait() if exit_status != 0: return False, stderr.decode(), exit_status return True, stdout.decode(), None procmounts = "/proc/mounts" def get_mountpoints_by_type(fstype): """ Use */proc/mounts* to find filesystem mount points. Determine where some filesystem is mounted by reading the list of mountpoints from */proc/mounts*. :param str fstype: filesystem type :returns: any mountpoints found :rtype: str list option or None :raises: :py:class:`IOError` if failed to read the process mounts """ llog.debug("Determine mountpoints of %s." % fstype) mps = None try: with open(procmounts, "r") as m: lines = list(m) pat = re.compile(r"^\S+\s+(\S+)\s+" + fstype + r"\s+.*$") mps = [mp.group(1) for mp in map(lambda line: re.match(pat, line), lines) if mp] except IOError: raise IOError(f"Failed to read {procmounts}") if not mps: return None return mps def read_linewise(cmd, func, **kwargs): """ Run `cmd` using subprocess, applying `func` to each line of stdout/stderr. :param str cmd: command to read linewise :param func: function to apply on each stdout line :type func: function :param kwargs: extra arguments for the subprocess initiation :returns: the process' returncode from :py:meth:`subprocess.Popen.wait` :rtype: int Creates a subprocess.Popen object with given `cmd`, `bufsize`=1, `stdout`=PIPE, `stderr`=STDOUT, `universal_newlines`=True and the given `kwargs`. As opposed to :py:func:`run_cmd_with_pipe`, output is not gathered and returned but processed as it becomes available and then discared. This allows to process output even if there is much of it. """ proc = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, **kwargs) for line in proc.stdout: func(line) # (raises a ValueError in communicate() line: # "Mixing iteration and read methods would lose data") # if proc.poll() is not None: # break # rest_output,_ = proc.communicate() # for line in rest_output: # func(line) return proc.wait() def hash_file(fname, new=hashlib.sha512, bs=4096): """ Return a file hash. :param str fname: name of the file to hash :param new: constructor algorithm :type new: builtin_function_or_method :param int bs: read and write up to 'bs' bytes at a time :returns: hexadecimal digest of the strings read from the file :rtype: str """ hsh = new() with open(fname, "rb") as fp: buf = fp.read(bs) while len(buf) > 0: hsh.update(buf) buf = fp.read(bs) return hsh.hexdigest() class ServiceState(enum.Enum): """State of a system service, see `get_service_state`.""" RUNNING = 0 DEAD_WITH_PIDFILE = 1 DEAD_WITH_LOCKFILE = 2 STOPPED = 3 SERVICE_UNKNOWN = 4 OTHER = -1 def get_service_state(service_name: str) -> ServiceState: """ Get state of a system service. Calls "service {service_name} status", since that works on older and newer systemd-based installs, and tries to interpret return code as defined by the "Linux Standard Base PDA Specification 3.0RC1", Chapter 8.2 ("Init Script Actions", https://refspecs.linuxbase.org/LSB_3.0.0/LSB-PDA/LSB-PDA/iniscrptact.html ). """ code = subprocess.run(["service", service_name, "status"], capture_output=True).returncode if code == 0: return ServiceState.RUNNING elif code == 1: return ServiceState.DEAD_WITH_PIDFILE elif code == 2: return ServiceState.DEAD_WITH_LOCKFILE elif code == 3: return ServiceState.STOPPED elif code == 4: return ServiceState.SERVICE_UNKNOWN else: return ServiceState.OTHER cheat_tpl = """\ #!/bin/sh set -u path="%s" # <- location of original executable, == location of script backup="%s" # <- location of backup chksum="%s" # <- sha512(orig_reboot) log="%s" # <- stdout msg () { echo "[$(date '+%%F %%T')] $*" &>> "${log}" } msg "Fake reboot invoked. Restoring executable from ${backup}." if [ "${chksum}" = "$(sha512sum ${path} | cut -d ' ' -f 1)" ]; then msg "Real reboot executable already in place at ${path}. Aborting." exit 1 fi if [ ! -x "${backup}" ]; then msg "No backup executable at ${backup}!." exit 1 fi if [ "${chksum}" = "$(sha512sum ${backup} | cut -d ' ' -f 1)" ] then msg "Installing backup executable from ${backup} as ${path}." if ! mv -f "${backup}" "${path}"; then msg "Failed to replace ${path}." fi else msg "Checksum mismatch of ${backup}: Expected ${chksum}." exit 1 fi msg "Fake reboot successful -- next invocation will reboot indeed." """ backup_infix = "backup" backup_fmt = "%s.%s_%s" logfile = "/var/log/cheat_reboot.log" def cheat_reboot(): """ Skip one reboot. :raises: :py:class:`exceptions.OSError` if backup target already exists This replaces the ``reboot-intranator`` executable by script which replaces itself by the backed up executable upon the next invocation. """ # path = utils.system_output("which reboot") path = "/usr/intranator/bin/reboot-intranator" suffix = uuid.uuid1() backup = backup_fmt % (path, backup_infix, suffix) if os.path.exists(backup): raise OSError("Target %s already exists." % backup) hexa = hash_file(path) llog.debug("Found reboot at %s, hash %s; dst %s." % (path, hexa, backup)) subprocess.check_call(["mv", "-f", path, backup]) # backup existing binary script = cheat_tpl % (path, backup, hexa, logfile) with open(path, "w") as fp: fp.write(script) # write script content to original location mode = os.stat(path).st_mode os.chmod(path, mode | stat.S_IXUSR | stat.S_IXGRP) # ug+x RUN_RESULT_OK = 0 # → success, cond returned True within timeout RUN_RESULT_TIMEDOUT = 1 # → success, but timeout elapsed RUN_RESULT_FAIL = 2 # → fail RUN_RESULT_NAME = ( {RUN_RESULT_OK: "RUN_RESULT_OK", RUN_RESULT_TIMEDOUT: "RUN_RESULT_TIMEDOUT", RUN_RESULT_FAIL: "RUN_RESULT_FAIL"} ) def cmd_block_till(cmd, timeout, cond, interval=1, *userdata, **kwuserdata): """ Run ``cmd`` and wait until :py:func`cond` evaluates to True. :param cmd: Command line or callback to execute. Function arguments must have the same signature as :py:func:`run_cmd_with_pipe`. :type cmd: [str] | types.FunctionType :param int timeout: Blocking timeout :param cond: Function to call; code will wait for this to return something other than `False` :param interval: Time (in seconds) to sleep between each attempt at `cond` :returns: A Pair of result and error message if appropriate or `None`. :rtype: (run_result, str | None) """ llog.debug("cmd_block_till: %r, %d s, %r", cmd, timeout, cond) if isinstance(cmd, types.FunctionType): succ, out, _ = cmd() elif isinstance(cmd, list): succ, out, _ = run_cmd_with_pipe(cmd) # caution: never pass further arguments! else: raise TypeError("cmd_block_till: invalid type (cmd=%r); expected " "function or argv", cmd) if timeout < 0: return RUN_RESULT_FAIL, "cmd_block_till: invalid timeout; nonnegative " \ "integer expected" if succ is False: return RUN_RESULT_FAIL, "cmd_block_till: command %r failed (%s)" \ % (cmd, str(out)) t_0 = time.time() # brr; cf. PEP 418 as to why while cond(*userdata, **kwuserdata) is False: t_now = time.time() dt = t_now - t_0 if dt > timeout: return RUN_RESULT_TIMEDOUT, "cmd_block_till: command %r exceeded " \ "%d s timeout" % (cmd, timeout) llog.debug("cmd_block_till: condition not satisfied after %d s, " "retrying for another %d s" % (dt, timeout - dt)) time.sleep(interval) return RUN_RESULT_OK, None def replace_file_regex(edited_file, value, regex=None, ignore_fail=False): """ Replace with value in a provided file using an optional regex or entirely. :param str edited_file: file to use for the replacement :param str value: value to replace the first matched group with :param regex: more restrictive regular expression to use when replacing with value :type regex: str or None :param bool ignore_fail: whether to ignore regex mismatching :raises: :py:class:`ValueError` if (also default) `regex` doesn't have a match In order to ensure better matching capabilities you are supposed to provide a regex pattern with at least one subgroup to match your value. What this means is that the value you like to replace is not directly searched into the config text but matched within a larger regex in in order to avoid any mismatch. Example: provider.cnf, 'PROVIDER_LOCALIP,0: "(\\d+)"', 127.0.0.1 """ pattern = regex.encode() if regex else "(.+)" with open(edited_file, "rb") as file_handle: text = file_handle.read() match_line = re.search(pattern, text) if match_line is None and not ignore_fail: raise ValueError(f"Pattern {pattern} not found in {edited_file}") elif match_line is not None: old_line = match_line.group(0) text = text[:match_line.start(1)] + value.encode() + text[match_line.end(1):] line = re.search(pattern, text).group(0) llog.debug(f"Updating {old_line} to {line} in {edited_file}") with open(edited_file, "wb") as file_handle: file_handle.write(text) ############################################################################### # LOGGING ############################################################################### CURRENT_TEST_STAGE = None CURRENT_TEST_NAME = None LOG_TAG = "%s/%s" % (os.path.basename(__file__), os.uname()[1]) LOG_INDENT = " " def enter_test_stage(s): """Group events into stages for status updates.""" global CURRENT_TEST_STAGE CURRENT_TEST_STAGE = s llog.info("Transitioning to test stage %s", s) def progress(fmt, *args): """Status updates that stand out among the log noise.""" if isinstance(CURRENT_TEST_STAGE, str): label = "/%s" % CURRENT_TEST_STAGE else: label = "" name = CURRENT_TEST_NAME if isinstance(CURRENT_TEST_NAME, str) else "" fmt, label = str(fmt), str(label) # yes llog.info("[%s%s] %s" % (name, label, fmt), *args) # TODO: this method is more dynamic # llog.info("[%s%s] %s%s" % (LOG_TAG, "", LOG_INDENT*indent, fmt), *args) # these methods serve as shorter names def inf(fmt, *args): """Short name for INFO logging.""" llog.info(fmt, *args) def dbg(fmt, *args): """Short name for DEBUG logging.""" llog.debug(fmt, *args) def err(fmt, *args): """Short name for ERROR logging.""" llog.error(fmt, *args) def wrn(fmt, *args): """Short name for WARN logging.""" llog.error(fmt, *args) # these methods use the format capability def log(level, text, *args, **kwargs): """Log at any level using format capability.""" llog.log(level, text.format(*args), **kwargs) def info(text, *args, **kwargs): """Log at INFO level using format capability.""" log(logging.INFO, text, *args, **kwargs) def debug(text, *args, **kwargs): """Log at DEBUG level using format capability.""" log(logging.DEBUG, text, *args, **kwargs) def error(text, *args, **kwargs): """Log at ERROR level using format capability.""" log(logging.ERROR, text, *args, **kwargs) def warn(text, *args, **kwargs): """Log at WARN level using format capability.""" log(logging.WARN, text, *args, **kwargs)