SUMMARY
------------------------------------------------------
-Guest utility to wrap arnied related functionality through python calls.
+Interaction with central arnied daemon.
-.. note:: Partially DEPRECATED! Use :py:mod:`pyi2ncommon.arnied_api` or
- :py:mod:`pyi2ncommon.cnfvar` whenever possible. In particluar, do not
- use or extend functionality regarding configuration (`get_cnf`,
- `set_cnf`).
+All functions (except :py:func:`schedule` result in calling a binary
+(either :py:data:`BIN_ARNIED_HELPER` or *tell-connd*).
-Copyright: Intra2net AG
-
-There are three types of setting some cnfvar configuration:
+For changes of configuration (*set_cnf*, *get_cnf*), refer to :py:mod:`pyi2ncommon.cnfvar`.
-1) static (:py:class:`set_cnf`) - oldest method using a static preprocessed
- config file without modifying its content in any way
-2) semi-dynamic (:py:class:`set_cnf_semidynamic`) - old method also using
- static file but rather as a template, replacing regex-matched values to
- adapt it to different configurations
-3) dynamic (:py:class:`set_cnf_dynamic`) - new method using dictionaries
- and custom cnfvar classes and writing them into config files of a desired
- format (json, cnf, or raw)
+Copyright: Intra2net AG
INTERFACE
import logging
log = logging.getLogger('pyi2ncommon.arnied_wrapper')
-from . import sysmisc
-
-
-#: default set_cnf binary
-BIN_SET_CNF = "/usr/intranator/bin/set_cnf"
#: default arnied_helper binary
BIN_ARNIED_HELPER = "/usr/intranator/bin/arnied_helper"
-#: default location for template configuration files
-SRC_CONFIG_DIR = "."
-#: default location for dumped configuration files
-DUMP_CONFIG_DIR = "."
-
-
-class ConfigError(Exception):
- pass
def run_cmd(cmd="", ignore_errors=False, vm=None, timeout=60):
# Basic functionality
-def accept_licence(vm=None):
- """
- Accept the Intra2net license.
-
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
-
- This is mostly useful for simplified webpage access.
- """
- cmd = 'echo "LICENSE_ACCEPTED,0: \\"1\\"" | set_cnf'
- result = run_cmd(cmd=cmd, ignore_errors=True, vm=vm)
- log.debug(result)
- wait_for_generate(vm=vm)
-
-
def go_online(provider_id, wait_online=True, timeout=60, vm=None):
"""
Go online with the given provider id.
"""
log.info("Switching to online mode with provider %d", provider_id)
- get_cnf_res = run_cmd(cmd='get_cnf PROVIDER %d' % provider_id, vm=vm)
- if b'PROVIDER,' not in get_cnf_res.stdout:
- log.warning('There is no PROVIDER %d on the vm. Skipping go_online.',
- provider_id)
- return
-
cmd = 'tell-connd --online P%i' % provider_id
result = run_cmd(cmd=cmd, vm=vm)
log.debug(result)
.format(status, timeout))
-def disable_virscan(vm=None):
- """
- Disable virscan that could block GENERATE and thus all configurations.
-
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
- """
- log.info("Disabling virus database update")
- unset_cnf("VIRSCAN_UPDATE_CRON", vm=vm)
-
- cmd = "echo 'VIRSCAN_UPDATE_DNS_PUSH,0:\"0\"' |set_cnf"
- result = run_cmd(cmd=cmd, vm=vm)
- log.debug(result)
-
- # TODO: this intervention should be solved in later arnied_helper tool
- cmd = "rm -f /var/intranator/schedule/UPDATE_VIRSCAN_NODIAL*"
- result = run_cmd(cmd=cmd, vm=vm)
- log.debug(result)
- log.info("Virus database update disabled")
-
-
def email_transfer(vm=None):
"""
Transfer all the emails using the guest tool arnied_helper.
tmp_file = tempfile.NamedTemporaryFile(mode="w+",
prefix=program.upper() + "_",
- dir=DUMP_CONFIG_DIR,
delete=False)
log.debug("Created temporary file %s", tmp_file.name)
tmp_file.write(contents)
# Configuration functionality
-def get_cnf(cnf_key, cnf_index=1, regex=".*", compact=False, timeout=30, vm=None):
- """
- Query arnied for a `cnf_key` and extract some information via regex.
-
- :param str cnf_key: queried cnf key
- :param int cnf_index: index of the cnf key
- :param str regex: regex to apply on the queried cnf key data
- :param bool compact: whether to retrieve compact version of the matched cnf keys
- :param int timeout: arnied run verification timeout
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
- :returns: extracted information via the regex
- :rtype: Match object
-
- If `cnf_index` is set to -1, retrieve and perform regex matching on all instances.
- """
- wait_for_arnied(timeout=timeout, vm=vm)
- platform_str = ""
- if vm is not None:
- platform_str = " from %s" % vm.name
- log.info("Extracting arnied value %s for %s%s using pattern %s",
- cnf_index, cnf_key, platform_str, regex)
- cmd = "get_cnf%s %s%s" % (" -c " if compact else "", cnf_key,
- " %s" % cnf_index if cnf_index != -1 else "")
- # get_cnf creates latin1-encoded output, transfer from VM removes non-ascii
- output = run_cmd(cmd=cmd, vm=vm).stdout.decode('latin1')
- return re.search(regex, output, flags=re.DOTALL)
-
-
-def get_cnf_id(cnf_key, value, timeout=30, vm=None):
- """
- Get the id of a configuration of type `cnf_key` and name `value`.
-
- :param str cnf_key: queried cnf key
- :param str value: cnf value of the cnf key
- :param int timeout: arnied run verification timeout
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
- :returns: the cnf id or -1 if no such cnf variable
- :rtype: int
- """
- wait_for_arnied(timeout=timeout, vm=vm)
- regex = "%s,(\d+): \"%s\"" % (cnf_key, value)
- cnf_id = get_cnf(cnf_key, cnf_index=-1, regex=regex, compact=True, vm=vm)
- if cnf_id is None:
- cnf_id = -1
- else:
- cnf_id = int(cnf_id.group(1))
- log.info("Retrieved id \"%s\" for %s is %i", value, cnf_key, cnf_id)
- return cnf_id
-
def wait_for_generate(timeout=300, vm=None):
"""
"""
wait_for_run('generate', timeout=timeout, retries=1, vm=vm)
wait_for_run('generate_offline', timeout=timeout, retries=1, vm=vm)
-
-
-def unset_cnf(varname="", instance="", timeout=30, vm=None):
- """
- Remove configuration from arnied.
-
- :param str varname: "varname" field of the CNF_VAR to unset
- :param int instance: "instance" of that variable to unset
- :param int timeout: arnied run verification timeout
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
- """
- wait_for_arnied(timeout=timeout, vm=vm)
-
- cmd = "get_cnf %s %s | set_cnf -x" % (varname, instance)
- run_cmd(cmd=cmd, vm=vm)
-
- wait_for_generate(vm=vm)
-
-
-def set_cnf(config_files, kind="cnf", timeout=30, vm=None):
- """
- Perform static arnied configuration through a set of config files.
-
- :param config_files: config files to use for the configuration
- :type config_files: [str]
- :param str kind: "json" or "cnf"
- :param int timeout: arnied run verification timeout
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
- :raises: :py:class:`ConfigError` if cannot apply file
-
- The config files must be provided and are always expected to be found on
- the host. If these are absolute paths, they will be kept as is or
- otherwise will be searched for in `SRC_CONFIG_DIR`. If a vm is provided,
- the config files will be copied there as temporary files before applying.
-
- ..todo:: The static method must be deprecated after we drop and convert
- lots of use cases for it to dynamic only.
- """
- log.info("Setting arnied configuration")
- wait_for_arnied(timeout=timeout, vm=vm)
-
- config_paths = prep_config_paths(config_files)
- for config_path in config_paths:
- with open(config_path, "rt", errors='replace') as config:
- log.debug("Contents of applied %s:\n%s", config_path, config.read())
- if vm is not None:
- new_config_path = generate_config_path()
- vm.copy_files_to(config_path, new_config_path)
- config_path = new_config_path
- argv = ["set_cnf", kind == "json" and "-j" or "", config_path]
-
- result = run_cmd(" ".join(argv), ignore_errors=True, vm=vm)
- logging.debug(result)
- if result.returncode != 0:
- raise ConfigError("Failed to apply config %s%s, set_cnf returned %d"
- % (config_path,
- " on %s" % vm.name if vm is not None else "",
- result.returncode))
-
- try:
- wait_for_generate(vm=vm)
- except Exception as ex:
- # handle cases of remote configuration that leads to connection meltdown
- if vm is not None and isinstance(ex, sys.modules["aexpect"].ShellProcessTerminatedError):
- log.info("Resetting connection to %s", vm.name)
- vm.session = vm.wait_for_login(timeout=10)
- log.debug("Connection reset via remote error: %s", ex)
- else:
- raise ex
-
-
-def set_cnf_semidynamic(config_files, params_dict, regex_dict=None,
- kind="cnf", timeout=30, vm=None):
- """
- Perform semi-dynamic arnied configuration from an updated version of the
- config files.
-
- :param config_files: config files to use for the configuration
- :type config_files: [str]
- :param params_dict: parameters to override the defaults in the config files
- :type params_dict: {str, str}
- :param regex_dict: regular expressions to use for matching the overriden parameters
- :type regex_dict: {str, str} or None
- :param str kind: "json" or "cnf"
- :param int timeout: arnied run verification timeout
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
-
- The config files must be provided and are always expected to be found on
- the host. If these are absolute paths, they will be kept as is or
- otherwise will be searched for in `SRC_CONFIG_DIR`. If a vm is provided,
- the config files will be copied there as temporary files before applying.
-
- ..todo:: The semi-dynamic method must be deprecated after we drop and convert
- lots of use cases for it to dynamic only.
- """
- log.info("Performing semi-dynamic arnied configuration")
-
- config_paths = prep_cnf(config_files, params_dict, regex_dict)
- set_cnf(config_paths, kind=kind, timeout=timeout, vm=vm)
-
- log.info("Semi-dynamic arnied configuration successful!")
-
-
-def set_cnf_pipe(cnf, timeout=30, block=False):
- """
- Set local configuration by talking to arnied via ``set_cnf``.
-
- :param cnf: one key with the same value as *kind* and a list of cnfvars as value
- :type cnf: {str, str}
- :param int timeout: arnied run verification timeout
- :param bool block: whether to wait for generate to complete the
- configuration change
- :returns: whether ``set_cnf`` succeeded or not
- :rtype: bool
-
- This is obviously not generic but supposed to be run on the guest.
- """
- log.info("Setting arnied configuration through local pipe")
- wait_for_arnied(timeout=timeout)
-
- st, out, exit = sysmisc.run_cmd_with_pipe([BIN_SET_CNF, "-j"], inp=str(cnf))
-
- if st is False:
- log.error("Error applying configuration; status=%r" % exit)
- log.error("and stderr:\n%s" % out)
- return False
- log.debug("Configuration successfully passed to set_cnf, "
- "read %d B from pipe" % len(out))
-
- if block is True:
- log.debug("Waiting for config job to complete")
- wait_for_generate()
-
- log.debug("Exiting sucessfully")
- return True
-
-
-def prep_config_paths(config_files, config_dir=None):
- """
- Prepare absolute paths for all configs at an expected location.
-
- :param config_files: config files to use for the configuration
- :type config_files: [str]
- :param config_dir: config directory to prepend to the filepaths
- :type config_dir: str or None
- :returns: list of the full config paths
- :rtype: [str]
- """
- if config_dir is None:
- config_dir = SRC_CONFIG_DIR
- config_paths = []
- for config_file in config_files:
- if os.path.isabs(config_file):
- # Absolute path: The user requested a specific file
- # f.e. needed for dynamic arnied config update
- config_path = config_file
- else:
- config_path = os.path.join(os.path.abspath(config_dir),
- config_file)
- logging.debug("Using %s for original path %s", config_path, config_file)
- config_paths.append(config_path)
- return config_paths
-
-
-def prep_cnf_value(config_file, value,
- regex=None, template_key=None, ignore_fail=False):
- """
- Replace value in a provided arnied config file.
-
- :param str config_file: file to use for the replacement
- :param str value: value to replace the first matched group with
- :param regex: regular expression to use when replacing a cnf value
- :type regex: str or None
- :param template_key: key of a quick template to use for the regex
- :type template_key: str or None
- :param bool ignore_fail: whether to ignore regex mismatching
- :raises: :py:class:`ValueError` if (also default) `regex` doesn't have a match
-
- In order to ensure better matching capabilities you are supposed to
- provide a regex pattern with at least one subgroup to match your value.
- What this means is that the value you like to replace is not directly
- searched into the config text but matched within a larger regex in
- in order to avoid any mismatch.
-
- Example:
- provider.cnf, 'PROVIDER_LOCALIP,0: "(\d+)"', 127.0.0.1
- """
- if template_key is None:
- pattern = regex.encode()
- else:
- samples = {"provider": 'PROVIDER_LOCALIP,\d+: "(\d+\.\d+\.\d+\.\d+)"',
- "global_destination_addr": 'SPAMFILTER_GLOBAL_DESTINATION_ADDR,0: "bounce_target@(.*)"'}
- pattern = samples[template_key].encode()
-
- with open(config_file, "rb") as file_handle:
- text = file_handle.read()
- match_line = re.search(pattern, text)
-
- if match_line is None and not ignore_fail:
- raise ValueError("Pattern %s not found in %s" % (pattern, config_file))
- elif match_line is not None:
- old_line = match_line.group(0)
- text = text[:match_line.start(1)] + value.encode() + text[match_line.end(1):]
- line = re.search(pattern, text).group(0)
- log.debug("Updating %s to %s in %s", old_line, line, config_file)
- with open(config_file, "wb") as file_handle:
- file_handle.write(text)
-
-
-def prep_cnf(config_files, params_dict, regex_dict=None):
- """
- Update all config files with the default overriding parameters,
- i.e. override the values hard-coded in those config files.
-
- :param config_files: config files to use for the configuration
- :type config_files: [str]
- :param params_dict: parameters to override the defaults in the config files
- :type params_dict: {str, str}
- :param regex_dict: regular expressions to use for matching the overriden parameters
- :type regex_dict: {str, str} or None
- :returns: list of prepared (modified) config paths
- :rtype: [str]
- """
- log.info("Preparing %s template config files", len(config_files))
-
- src_config_paths = prep_config_paths(config_files)
- new_config_paths = []
- for config_path in src_config_paths:
- new_config_path = generate_config_path(dumped=True)
- shutil.copy(config_path, new_config_path)
- new_config_paths.append(new_config_path)
-
- for config_path in new_config_paths:
- for param_key in params_dict.keys():
- if regex_dict is None:
- regex_val = "\s+%s,\d+: \"(.*)\"" % param_key.upper()
- elif param_key in regex_dict.keys():
- regex_val = regex_dict[param_key] % param_key.upper()
- elif re.match("\w*_\d+$", param_key):
- final_parameter, parent_id = \
- re.match("(\w*)_(\d+)$", param_key).group(1, 2)
- regex_val = "\(%s\) %s,\d+: \"(.*)\"" \
- % (parent_id, final_parameter.upper())
- log.debug("Requested regex for %s is '%s'",
- param_key, regex_val)
- else:
- regex_val = "\s+%s,\d+: \"(.*)\"" % param_key.upper()
- prep_cnf_value(config_path, params_dict[param_key],
- regex=regex_val, ignore_fail=True)
- log.info("Prepared template config file %s", config_path)
-
- return new_config_paths
-
-
-def generate_config_path(dumped=False):
- """
- Generate path for a temporary config name.
-
- :param bool dumped: whether the file should be in the dump
- directory or in temporary directory
- :returns: generated config file path
- :rtype: str
- """
- dir = os.path.abspath(DUMP_CONFIG_DIR) if dumped else None
- fd, filename = tempfile.mkstemp(suffix=".cnf", dir=dir)
- os.close(fd)
- os.unlink(filename)
- return filename
# whether to return 1 as a fail indicator
fail_switch = False
# mapping between expected commands and their mocked output + return code
- cmds = [
- {"cmd": "pgrep -l -x arnied", "stdout": b"", "returncode": 0},
-
- {"cmd": 'echo "LICENSE_ACCEPTED,0: \\"1\\"" | set_cnf', "stdout": b"", "returncode": 0},
- {"cmd": '/usr/intranator/bin/arnied_helper --wait-for-program-end GENERATE', "stdout": b"", "returncode": 0},
-
- {"cmd": 'get_cnf PROVIDER 1', "stdout": b"1 PROVIDER,1: \"sample-provider\"", "returncode": 0},
- {"cmd": 'tell-connd --online P1', "stdout": b"", "returncode": 0},
- {"cmd": '/usr/intranator/bin/get_var ONLINE', "stdout": b"DEFAULT: 2", "returncode": 0},
-
- {"cmd": "pgrep -l -x arnied", "stdout": b"", "returncode": 0},
- {"cmd": 'get_cnf VIRSCAN_UPDATE_CRON | set_cnf -x', "stdout": b"", "returncode": 0},
- {"cmd": '/usr/intranator/bin/arnied_helper --is-scheduled-or-running GENERATE', "stdout": b"", "returncode": 1},
- {"cmd": '/usr/intranator/bin/arnied_helper --wait-for-program-end GENERATE --wait-for-program-timeout 300', "stdout": b"", "returncode": 1},
- {"cmd": '/usr/intranator/bin/arnied_helper --is-scheduled-or-running GENERATE_OFFLINE', "stdout": b"", "returncode": 1},
- {"cmd": '/usr/intranator/bin/arnied_helper --wait-for-program-end GENERATE_OFFLINE --wait-for-program-timeout 300', "stdout": b"", "returncode": 1},
- {"cmd": 'echo \'VIRSCAN_UPDATE_DNS_PUSH,0:"0"\' |set_cnf', "stdout": b"", "returncode": 0},
- {"cmd": 'rm -f /var/intranator/schedule/UPDATE_VIRSCAN_NODIAL*', "stdout": b"", "returncode": 0},
-
- {"cmd": '/usr/intranator/bin/arnied_helper --transfer-mail', "stdout": b"", "returncode": 0},
-
- {"cmd": '/usr/intranator/bin/arnied_helper --wait-for-arnied-socket --wait-for-arnied-socket-timeout 10', "stdout": b"", "returncode": 0},
- {"cmd": '/usr/intranator/bin/arnied_helper --wait-for-arnied-socket --wait-for-arnied-socket-timeout 30', "stdout": b"", "returncode": 0},
- ]
asserted_cmds = []
def __init__(self, cmd="", ignore_errors=False, vm=None, timeout=60):
def setUp(self):
DummyCmdOutputMapping.fail_switch = False
DummyCmdOutputMapping.asserted_cmds = []
- self.cmd_db = DummyCmdOutputMapping.cmds
def test_verify_running(self):
"""Test checking for running programs."""
- DummyCmdOutputMapping.asserted_cmds = self.cmd_db[0:1]
+ DummyCmdOutputMapping.asserted_cmds = [{"cmd": "pgrep -l -x arnied", "stdout": b"", "returncode": 0}]
arnied_wrapper.verify_running(timeout=1)
DummyCmdOutputMapping.fail_switch = True
with self.assertRaises(RuntimeError):
def test_wait_for_arnied(self):
"""Test waiting for arnied to be ready."""
- DummyCmdOutputMapping.asserted_cmds = self.cmd_db
+ DummyCmdOutputMapping.asserted_cmds = [{"cmd": "/usr/intranator/bin/arnied_helper --wait-for-arnied-socket --wait-for-arnied-socket-timeout 10", "stdout": b"", "returncode": 0}]
arnied_wrapper.wait_for_arnied(timeout=10)
- def test_accept_license(self):
- """Test accepting license."""
- DummyCmdOutputMapping.asserted_cmds = self.cmd_db[1:3] + self.cmd_db[8:11]
- arnied_wrapper.accept_licence()
- # make sure an error is ignored since license might
- # already be accepted
- DummyCmdOutputMapping.fail_switch = True
- arnied_wrapper.accept_licence()
-
def test_go_online(self):
- DummyCmdOutputMapping.asserted_cmds = self.cmd_db[3:6]
+ DummyCmdOutputMapping.asserted_cmds = [{"cmd": 'tell-connd --online P1', "stdout": b"", "returncode": 0},
+ {"cmd": '/usr/intranator/bin/get_var ONLINE', "stdout": b"DEFAULT: 2", "returncode": 0}]
arnied_wrapper.go_online(1)
- def test_disable_virscan(self):
- """Test disabling the virus scanner."""
- DummyCmdOutputMapping.asserted_cmds = self.cmd_db[6:]
- arnied_wrapper.disable_virscan()
-
def test_email_transfer(self):
"""Test e-mail transferring."""
- DummyCmdOutputMapping.asserted_cmds = self.cmd_db[14:15]
+ DummyCmdOutputMapping.asserted_cmds = [{"cmd": '/usr/intranator/bin/arnied_helper --transfer-mail', "stdout": b"", "returncode": 0}]
arnied_wrapper.email_transfer()