# The software in this package is distributed under the GNU General
# Public License version 2 (with a special exception described below).
#
# A copy of GNU General Public License (GPL) is included in this distribution,
# in the file COPYING.GPL.
#
# As a special exception, if other files instantiate templates or use macros
# or inline functions from this file, or you compile this file and link it
# with other works to produce a work based on this file, this file
# does not by itself cause the resulting work to be covered
# by the GNU General Public License.
#
# However the source code for this file must still be made available
# in accordance with section (3) of the GNU General Public License.
#
# This exception does not invalidate any other reasons why a work based
# on this file might be covered by the GNU General Public License.
#
# Copyright (c) 2016-2018 Intra2net AG

"""
Interaction with central arnied daemon.

All functions (except :py:func:`schedule`) result in calling a binary
(either :py:data:`BIN_ARNIED_HELPER` or *tell-connd*).

For changes of configuration (*set_cnf*, *get_cnf*), refer to :py:mod:`pyi2ncommon.cnfvar`.

Copyright: Intra2net AG
"""

import os
import time
import subprocess
import shutil
import tempfile
from shlex import quote
from typing import Any
import logging
log = logging.getLogger('pyi2ncommon.arnied_wrapper')

#: default arnied_helper binary
BIN_ARNIED_HELPER = "/usr/intranator/bin/arnied_helper"


def run_cmd(cmd="", ignore_errors=False, vm=None, timeout=60):
    """
    Universal command run wrapper.

    :param str cmd: command to run
    :param bool ignore_errors: whether not to raise error on command failure
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :param int timeout: amount of seconds to wait for the program to run;
                        only handed to the guest session, a command run on
                        the host is not limited by it
    :returns: command result output where output (stdout/stderr) is bytes
              (encoding dependent on environment and command given)
    :rtype: :py:class:`subprocess.CompletedProcess`
    :raises: :py:class:`subprocess.CalledProcessError` if command failed
             and cannot be ignored
    """
    if vm is not None:
        status, stdout = vm.session.cmd_status_output(cmd, timeout=timeout)
        stdout = stdout.encode()
        stderr = b""
        if status != 0:
            # the session only provides one combined output stream, so on
            # failure report it as stderr
            stderr = stdout
            stdout = b""
            if not ignore_errors:
                raise subprocess.CalledProcessError(status, cmd, stderr=stderr)
        return subprocess.CompletedProcess(cmd, status, stdout=stdout, stderr=stderr)
    else:
        return subprocess.run(cmd, check=not ignore_errors, shell=True, capture_output=True)


def verify_running(process='arnied', timeout=60, vm=None):
    """
    Verify if a given process is running via 'pgrep'.

    The check is repeated once per second until it succeeds or `timeout`
    attempts have been made.

    :param str process: process to verify if running
    :param int timeout: run verification timeout
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :raises: :py:class:`RuntimeError` if process is not running
    """
    platform_str = ""
    if vm is not None:
        vm.verify_alive()
        platform_str = " on %s" % vm.name
    for i in range(timeout):
        log.debug("Checking whether %s is running%s (%i/%i)",
                  process, platform_str, i, timeout)
        result = run_cmd(cmd="pgrep -l -x %s" % process,
                         ignore_errors=True, vm=vm)
        if result.returncode == 0:
            log.debug(result)
            return
        time.sleep(1)
    raise RuntimeError("Process %s does not seem to be running" % process)


# Basic functionality


def go_online(provider_id, wait_online=True, timeout=60, vm=None):
    """
    Go online with the given provider id.

    :param provider_id: provider to go online with
    :type provider_id: int
    :param wait_online: whether to wait until online
    :type wait_online: bool
    :param int timeout: Seconds to wait in :py:func:`wait_for_online`
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None

    .. seealso:: :py:func:`go_offline`, :py:func:`wait_for_online`
    """
    log.info("Switching to online mode with provider %d", provider_id)

    cmd = 'tell-connd --online P%i' % provider_id
    result = run_cmd(cmd=cmd, vm=vm)
    log.debug(result)

    if wait_online:
        wait_for_online(provider_id, timeout=timeout, vm=vm)


def go_offline(wait_offline=True, vm=None):
    """
    Go offline.

    :param wait_offline: whether to wait until offline; a truthy value other
                         than `True` is handed to :py:func:`wait_for_offline`
                         as its timeout
    :type wait_offline: bool
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None

    .. seealso:: :py:func:`go_online`, :py:func:`wait_for_offline`
    """
    cmd = 'tell-connd --offline'
    result = run_cmd(cmd=cmd, vm=vm)
    log.debug(result)

    if wait_offline:
        if wait_offline is True:
            wait_for_offline(vm=vm)
        else:
            wait_for_offline(wait_offline, vm=vm)


def wait_for_offline(timeout=60, vm=None):
    """
    Wait for arnied to signal we are offline.

    :param int timeout: maximum timeout for waiting
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :raises: :py:class:`RuntimeError` if still not offline after `timeout`
    """
    _wait_for_online_status('offline', None, timeout, vm)


def wait_for_online(provider_id, timeout=60, vm=None):
    """
    Wait for arnied to signal we are online.

    :param provider_id: provider to go online with
    :type provider_id: int
    :param int timeout: maximum timeout for waiting
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :raises: :py:class:`RuntimeError` if still not online after `timeout`
    """
    _wait_for_online_status('online', provider_id, timeout, vm)


def _wait_for_online_status(status, provider_id, timeout, vm):
    """
    Poll the ONLINE variable once per second until it shows the wanted state.

    :param str status: state to wait for, either "online" or "offline"
    :param provider_id: provider to re-request when waiting for "online"
    :type provider_id: int or None
    :param int timeout: maximum number of polls (one per second)
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :raises: :py:class:`ValueError` if `status` is neither "online" nor
             "offline"
    :raises: :py:class:`RuntimeError` if the state was not reached in time
    """
    # Don't use tell-connd --status here since the actual
    # ONLINE signal to arnied is transmitted
    # asynchronously via arnieclient_muxer.

    if status == 'online':
        expected_output = 'DEFAULT: 2'

        def set_status_func():
            # keyword arguments on purpose: the third positional parameter
            # of go_online is the timeout, not the vm
            go_online(provider_id, wait_online=False, vm=vm)
    elif status == 'offline':
        expected_output = 'DEFAULT: 0'

        def set_status_func():
            go_offline(wait_offline=False, vm=vm)
    else:
        raise ValueError('expect status "online" or "offline", not "{0}"!'
                         .format(status))

    log.info("Waiting for arnied to be %s within %s seconds", status, timeout)

    cmd = '/usr/intranator/bin/get_var ONLINE'
    for i in range(timeout):
        # arnied might invalidate the connd "connection barrier"
        # after generate was running and switch to OFFLINE (race condition).
        # -> tell arnied every ten seconds to go online again
        if i % 10 == 0 and i != 0:
            set_status_func()

        result = run_cmd(cmd=cmd, ignore_errors=True, vm=vm)
        log.debug(result)

        if expected_output in result.stdout.decode():
            log.info("arnied is %s. Continuing.", status)
            return

        time.sleep(1)

    raise RuntimeError("We didn't manage to go {0} within {1} seconds\n"
                       .format(status, timeout))


def email_transfer(vm=None):
    """
    Transfer all the emails using the guest tool arnied_helper.

    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    """
    cmd = f"{BIN_ARNIED_HELPER} --transfer-mail"
    result = run_cmd(cmd=cmd, vm=vm)
    log.debug(result)


def wait_for_email_transfer(timeout=300, vm=None):
    """
    Wait until the mail queue is empty and all emails are sent.

    If the mail queue is still not empty after timeout is reached, a warning
    is logged and a :py:class:`TimeoutError` is raised.

    :param int timeout: email transfer timeout
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :raises TimeoutError: if mail transfer was not complete when timeout was
                          reached
    """
    for i in range(timeout):
        if i % 10 == 0:
            # Retrigger mail queue in case something is deferred
            # by an amavisd-new reconfiguration
            run_cmd(cmd='postqueue -f', vm=vm)
            log.debug('Waiting for SMTP queue to get empty (%i/%i s)',
                      i, timeout)
        # no output at all from the queue listing means the queue is empty
        if not run_cmd(cmd='postqueue -j', vm=vm).stdout:
            log.debug('SMTP queue is empty')
            return
        time.sleep(1)
    log.warning('Timeout reached but SMTP queue still not empty after %s s',
                timeout)
    raise TimeoutError(f"SMTP queue still not empty after {timeout} s")


def wait_for_quarantine_processing(vm_session: Any = None, max_wait: int = 30) -> bool:
    """
    Wait until quarantined is finished processing.

    This checks quarantined's input and temp dirs and returns as soon as they
    are all empty or when max waiting time is reached.

    To be used after :py:func:`wait_for_email_transfer`.

    :param vm_session: optional :py:class:`aexpect.client.ShellSession`;
                       default: run on localhost
    :param max_wait: maximum time in seconds to wait here (shared by all
                     checked directories, not per directory)
    :returns: `True` if all quarantines have empty input/tmp dirs upon return,
              `False` if we reached `max_wait` while waiting
    :raises: :py:class:`RuntimeError` if listing a directory in the session
             fails with an unexpected exit status
    """
    def has_files(dirname: str) -> bool:
        # Quick abstraction to check dir on local host or in remote session
        if vm_session is None:
            return bool(os.listdir(dirname))
        cmd = f"ls -UNq {quote(dirname)}"
        status, output = vm_session.cmd_status_output(cmd)
        if status == 0:
            return bool(output.strip())   # False <==> empty output <==> no files
        elif status == 2:   # dir does not exist
            return False    # non-existent dir is empty
        else:
            raise RuntimeError(f"{cmd} returned {status} and output: {output}")

    n_sleep = 0
    for quarantine in ("spam", "attachment", "virus"):
        for subdir in ("q-in", "q-tmp"):
            try:
                full_dir = f"/datastore/quarantine/{quarantine}/{subdir}/"
                while has_files(full_dir):
                    n_sleep += 1
                    if n_sleep > max_wait:
                        return False      # abort
                    time.sleep(1)
            except FileNotFoundError:     # no such directory on local host
                continue
    return True


def schedule(program, exec_time=0, optional_args="", vm=None):
    """
    Schedule a program to be executed at a given unix time stamp.

    Existing schedule files whose name starts with the upper-cased program
    name are removed first, then a new schedule file is put in place.

    :param str program: program whose execution is scheduled
    :param int exec_time: scheduled time of program's execution
    :param str optional_args: optional command line arguments
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    """
    log.info("Scheduling %s to be executed at %i", program, exec_time)
    schedule_dir = "/var/intranator/schedule"
    on_guest = vm is not None
    file_prefix = program.upper()

    # clean previous schedules of the same program
    # NOTE(review): this is a plain prefix match, so it presumably also
    # removes schedules of programs whose name merely starts with this
    # one -- confirm that this is intended
    if on_guest:
        files = vm.session.cmd("ls " + schedule_dir).split()
    else:
        files = os.listdir(schedule_dir)
    for file_name in files:
        if file_name.startswith(file_prefix):
            log.debug("Removing previous scheduled %s", file_name)
            old_file = os.path.join(schedule_dir, file_name)
            if on_guest:
                vm.session.cmd("rm -f " + quote(old_file))
            else:
                os.unlink(old_file)

    contents = "%i\n%s\n" % (exec_time, optional_args)

    # delete=False: the file has to outlive the handle since it is moved
    # (or copied to the guest) after it has been closed
    with tempfile.NamedTemporaryFile(mode="w+", prefix=file_prefix + "_",
                                     delete=False) as tmp_file:
        log.debug("Created temporary file %s", tmp_file.name)
        tmp_file.write(contents)
    moved_tmp_file = os.path.join(schedule_dir, os.path.basename(tmp_file.name))

    if on_guest:
        try:
            vm.copy_files_to(tmp_file.name, moved_tmp_file)
        finally:
            # never leave the local copy behind, even if the transfer failed
            os.remove(tmp_file.name)
    else:
        shutil.move(tmp_file.name, moved_tmp_file)

    log.debug("Moved temporary file to %s", moved_tmp_file)


def wait_for_run(program, timeout=300, retries=10, vm=None):
    """
    Wait for a program using the guest arnied_helper tool.

    Returns right away (after logging a warning) if the program was found
    neither scheduled nor running in any of the `retries` checks.

    :param str program: scheduled or running program to wait for
    :param int timeout: program run timeout
    :param int retries: number of tries to verify that the program is
                        scheduled or running (one per second)
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    """
    log.info("Waiting for program %s to finish with timeout %i",
             program, timeout)
    check_cmd = f"{BIN_ARNIED_HELPER} --is-scheduled-or-running " \
        + program.upper()
    for _ in range(retries):
        check_scheduled = run_cmd(cmd=check_cmd, ignore_errors=True, vm=vm)
        if check_scheduled.returncode == 0:
            break   # is scheduled or already running
        time.sleep(1)
    else:   # always returned 1, so neither scheduled nor running
        log.warning("The program %s was not scheduled and is not running", program)
        return  # no need to wait for it to finish since it's not running

    # Wait for a scheduled or running program to end:
    cmd = f"{BIN_ARNIED_HELPER} --wait-for-program-end " \
          f"{program.upper()} --wait-for-program-timeout {timeout}"
    # add one second to make sure arnied_helper is finished when we expire
    result = run_cmd(cmd=cmd, vm=vm, timeout=timeout+1)
    log.debug(result.stdout)


def wait_for_arnied(timeout=60, vm=None):
    """
    Wait for arnied socket to be ready.

    :param int timeout: maximum number of seconds to wait
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    """
    cmd = f"{BIN_ARNIED_HELPER} --wait-for-arnied-socket " \
          f"--wait-for-arnied-socket-timeout {timeout}"
    # add one second to make sure arnied_helper is finished when we expire
    result = run_cmd(cmd=cmd, vm=vm, timeout=timeout+1)
    log.debug(result.stdout)


# Configuration functionality


def wait_for_generate(timeout: int = 300, vm=None) -> bool:
    """
    Wait for the 'generate' program to complete.

    At the end of this function call, no `generate` or `generate_offline` will
    be scheduled or running, except if any of those took longer than `timeout`.
    Will return `False` in those cases, `True` otherwise.

    :param timeout: max time to wait for this function to finish
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :returns: True if no runs of generate are underway or scheduled, False if
              `timeout` was not enough
    """
    # To avoid races (which we did encounter), do not wait_for_run("generate") and then for
    # "generate_offline", but do both "simultaneously" here.
    # Since generate may well cause a generate-offline to be scheduled right afterwards, check
    # in this order
    cmd1 = f"{BIN_ARNIED_HELPER} --is-scheduled-or-running GENERATE"
    cmd2 = f"{BIN_ARNIED_HELPER} --is-scheduled-or-running GENERATE_OFFLINE"

    end_time = time.monotonic() + timeout - 0.5
    while run_cmd(cmd=cmd1, ignore_errors=True, vm=vm).returncode == 0 \
            or run_cmd(cmd=cmd2, ignore_errors=True, vm=vm).returncode == 0:
        # one of them is scheduled or running, so check timeout and wait
        if time.monotonic() > end_time:
            log.warning("Timeout waiting for generate to start/finish")
            return False
        log.debug("Waiting for generate to start/finish...")
        time.sleep(1)
    return True