1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
22 Interaction with central arnied daemon.
All functions (except :py:func:`schedule`) result in calling a binary
25 (either :py:data:`BIN_ARNIED_HELPER` or *tell-connd*).
27 For changes of configuration (*set_cnf*, *get_cnf*), refer to :py:mod:`pyi2ncommon.cnfvar`.
29 Copyright: Intra2net AG
35 from subprocess import CompletedProcess
38 from shlex import quote
39 from typing import Any
# module-wide logger, registered under the fixed name 'pyi2ncommon.arnied_wrapper'
log = logging.getLogger('pyi2ncommon.arnied_wrapper')


#: default arnied_helper binary (absolute path; the helper command lines
#: in this module are built from it)
BIN_ARNIED_HELPER = "/usr/intranator/bin/arnied_helper"
def run_cmd(cmd: str = "", ignore_errors: bool = False, vm=None, timeout: int = 60) -> CompletedProcess:
    """
    Universal command run wrapper.

    Runs `cmd` through a shell on the local host or, if `vm` is given, through the
    session of that vm. Both variants hand back the same kind of result object.

    :param cmd: command to run
    :param ignore_errors: whether not to raise error on command failure
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :param timeout: amount of seconds to wait for the program to run
                    (handed to the vm session when running on a guest)
    :returns: command result output where output (stdout/stderr) is bytes
              (encoding dependent on environment and command given)
    :raises: :py:class:`subprocess.CalledProcessError` if command failed and cannot be ignored
    """
    if vm is None:
        # NOTE(review): `timeout` does not appear to be forwarded on the host -- confirm
        # whether that is intended before relying on it for local commands
        return subprocess.run(cmd, check=not ignore_errors, shell=True,
                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    status, output = vm.session.cmd_status_output(cmd, timeout=timeout)
    output = output.encode()
    if status == 0:
        return subprocess.CompletedProcess(cmd, status, stdout=output, stderr=b"")

    # The session yields a single output stream; on failure it is reported as stderr.
    if not ignore_errors:
        raise subprocess.CalledProcessError(status, cmd, stderr=output)
    return subprocess.CompletedProcess(cmd, status, stdout=b"", stderr=output)
def verify_running(process: str = "arnied", timeout: int = 60, vm=None):
    """
    Poll 'pgrep' until a process with exactly the given name shows up.

    One check is made per loop round, with a one second pause after each failed check.

    :param process: process to verify if running
    :param timeout: number of checks to perform before giving up
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :raises: :py:class:`RuntimeError` if process is not running
    """
    where = ""
    if vm is not None:
        # NOTE(review): liveness check reconstructed from an incomplete view -- confirm
        vm.verify_alive()
        where = " on %s" % vm.name

    pgrep_cmd = "pgrep -l -x %s" % process
    for attempt in range(timeout):
        log.debug("Checking whether %s is running%s (%i/%i)",
                  process, where, attempt, timeout)
        outcome = run_cmd(cmd=pgrep_cmd, ignore_errors=True, vm=vm)
        if outcome.returncode != 0:
            time.sleep(1)
            continue
        log.debug(outcome)
        return

    raise RuntimeError("Process %s does not seem to be running" % process)
103 # Basic functionality
def go_online(provider_id: int, wait_online: bool = True, timeout: int = 60, vm=None):
    """
    Tell connd to go online with the given provider, optionally waiting for it.

    :param provider_id: provider to go online with
    :param wait_online: whether to wait until online
    :param timeout: Seconds to wait in :py:func:`wait_for_online`
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :raises: :py:class:`RuntimeError` if waiting requested but failed within timeout

    .. seealso:: :py:func:`go_offline`, :py:func:`wait_for_online`
    """
    log.info("Switching to online mode with provider %d", provider_id)

    outcome = run_cmd(cmd='tell-connd --online P%i' % provider_id, vm=vm)
    log.debug(outcome)

    if not wait_online:
        return
    wait_for_online(provider_id, timeout=timeout, vm=vm)
def go_offline(wait_offline: bool = True, vm=None):
    """
    Tell connd to go offline, optionally waiting until that happened.

    :param wait_offline: whether to wait until offline; `True` waits with the default
                         timeout of :py:func:`wait_for_offline`, any other truthy value
                         is handed to it as the timeout
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :raises: :py:class:`RuntimeError` if waiting requested but failed within timeout

    .. seealso:: :py:func:`go_online`, :py:func:`wait_for_offline`
    """
    outcome = run_cmd(cmd='tell-connd --offline', vm=vm)
    log.debug(outcome)

    if not wait_offline:
        return
    if wait_offline is True:
        wait_for_offline(vm=vm)
        return
    # truthy but not the literal True: the value itself is the timeout
    wait_for_offline(wait_offline, vm=vm)
def wait_for_offline(timeout: int = 60, vm=None):
    """
    Block until arnied reports that we are offline.

    :param timeout: maximum timeout for waiting
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :raises: :py:class:`RuntimeError` if did not go offline within timeout
    """
    # going offline involves no provider, hence provider_id=None
    _wait_for_online_status(status='offline', provider_id=None, timeout=timeout, vm=vm)
def wait_for_online(provider_id: int, timeout: int = 60, vm=None):
    """
    Block until arnied reports that we are online.

    :param provider_id: provider to go online with
    :param timeout: maximum timeout for waiting
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :raises: :py:class:`RuntimeError` if did not go online within timeout
    """
    _wait_for_online_status(status='online', provider_id=provider_id, timeout=timeout, vm=vm)
def _wait_for_online_status(status: str, provider_id: int, timeout: int, vm):
    """
    Poll arnied's ONLINE variable until it shows the requested status.

    The variable is read once per loop round via `get_var ONLINE`, followed by a one
    second pause. Every tenth round the online/offline request is sent again.

    :param status: either "online" or "offline"
    :param provider_id: provider to (re-)request when waiting for "online";
                        not used when waiting for "offline"
    :param timeout: number of polling rounds before giving up
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :raises: :py:class:`ValueError` if `status` is neither "online" nor "offline"
    :raises: :py:class:`RuntimeError` if the status was not reached within timeout
    """
    # Don't use tell-connd --status here since the actual
    # ONLINE signal to arnied is transmitted
    # asynchronously via arnieclient_muxer.

    def status_func_online():
        # `vm` has to be given by keyword: the third positional parameter of go_online
        # is `timeout`, so a positional vm would be lost and the command run on the host.
        # wait_online=False keeps go_online from calling back into this function.
        go_online(provider_id, wait_online=False, vm=vm)

    def status_func_offline():
        go_offline(wait_offline=False, vm=vm)

    if status == 'online':
        expected_output = 'DEFAULT: 2'
        set_status_func = status_func_online
    elif status == 'offline':
        expected_output = 'DEFAULT: 0'
        set_status_func = status_func_offline
    else:
        raise ValueError('expect status "online" or "offline", not "{0}"!'
                         .format(status))

    log.info("Waiting for arnied to be %s within %s seconds", status, timeout)

    cmd = '/usr/intranator/bin/get_var ONLINE'
    for i in range(timeout):
        # arnied might invalidate the connd "connection barrier"
        # after generate was running and switch to OFFLINE (race condition).
        # -> tell arnied every ten seconds to go online again
        if i % 10 == 0 and i != 0:
            set_status_func()

        result = run_cmd(cmd=cmd, ignore_errors=True, vm=vm)
        log.debug(result)

        if expected_output in result.stdout.decode():
            log.info("arnied is %s. Continuing.", status)
            return

        time.sleep(1)

    raise RuntimeError("We didn't manage to go {0} within {1} seconds\n"
                       .format(status, timeout))
def email_transfer(vm=None):
    """
    Trigger a mail transfer by calling arnied_helper with `--transfer-mail`.

    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    """
    outcome = run_cmd(cmd=f"{BIN_ARNIED_HELPER} --transfer-mail", vm=vm)
    log.debug(outcome)
def wait_for_email_transfer(timeout: int = 300, vm=None):
    """
    Wait until `postqueue -j` prints nothing, i.e. the mail queue is empty.

    The queue is checked once per loop round with a one second pause in between. If it
    is still not empty after `timeout` rounds, a warning is logged and a
    :py:class:`TimeoutError` is raised.

    :param timeout: email transfer timeout
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :raises TimeoutError: if mail transfer was not complete when timeout was reached
    """
    for elapsed in range(timeout):
        # NOTE(review): the every-tenth-round condition is reconstructed from an
        # incomplete view -- confirm
        if elapsed % 10 == 0:
            # Retrigger mail queue in case something is deferred
            # by an amavisd-new reconfiguration
            run_cmd(cmd='postqueue -f', vm=vm)
            log.debug('Waiting for SMTP queue to get empty (%i/%i s)',
                      elapsed, timeout)

        queue_listing = run_cmd(cmd='postqueue -j', vm=vm).stdout
        if queue_listing:
            time.sleep(1)
            continue
        log.debug('SMTP queue is empty')
        return

    log.warning('Timeout reached but SMTP queue still not empty after {} s'
                .format(timeout))
    raise TimeoutError()
def wait_for_quarantine_processing(vm_session: Any = None, max_wait: int = 30) -> bool:
    """
    Wait until the input and temp dirs of all quarantines are empty.

    The `q-in` and `q-tmp` dirs of the spam, attachment, virus and dmarc quarantines are
    checked one after the other; while a dir still has files, this sleeps one second and
    looks again. The sleep budget `max_wait` is shared by all dirs.

    To be used after :py:func:`wait_for_email_transfer`.

    :param vm_session: optional :py:class:`aexpect.client.ShellSession`; default: run on localhost
    :param max_wait: maximum time in seconds to wait here
    :returns: `True` if all quarantines have empty input/tmp dirs upon return, `False` if we
              reached `max_wait` while waiting
    :raises: :py:class:`RuntimeError` if listing a dir in the session fails with an
             unexpected status
    """
    def has_files(dirname: str) -> bool:
        # Same question for both worlds: local listing or `ls` inside the session
        if vm_session is None:
            return bool(os.listdir(dirname))

        cmd = f"ls -UNq {quote(dirname)}"
        status, output = vm_session.cmd_status_output(cmd)
        if status == 0:
            return bool(output.strip())     # False <==> empty output <==> no files
        if status == 2:                     # dir does not exist
            return False                    # non-existent dir is empty
        raise RuntimeError(f"{cmd} returned {status} and output: {output}")

    watched_dirs = [f"/datastore/quarantine/{quarantine}/{subdir}/"
                    for quarantine in ("spam", "attachment", "virus", "dmarc")
                    for subdir in ("q-in", "q-tmp")]

    seconds_slept = 0
    for full_dir in watched_dirs:
        try:
            while has_files(full_dir):
                seconds_slept += 1
                if seconds_slept > max_wait:
                    return False
                time.sleep(1)
        except FileNotFoundError:   # no such directory on local host
            continue
    return True
def schedule(program: str, exec_time: int = 0, optional_args: str = "", vm=None):
    """
    Schedule a program to be executed at a given unix time stamp.

    Existing files in the schedule dir whose name starts with the upper-cased program
    name are removed first. Then a new file containing the execution time and the
    optional arguments is created locally and moved (host) or copied (vm) into the
    schedule dir.

    :param program: program whose execution is scheduled
    :param exec_time: scheduled time of program's execution
    :param optional_args: optional command line arguments
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    """
    log.info("Scheduling %s to be executed at %i", program, exec_time)
    schedule_dir = "/var/intranator/schedule"
    prefix = program.upper()

    # clean previous schedules of the same program
    if vm:
        files = vm.session.cmd("ls " + quote(schedule_dir)).split()
    else:
        files = os.listdir(schedule_dir)
    for file_name in files:
        if not file_name.startswith(prefix):
            continue
        log.debug("Removing previous scheduled %s", file_name)
        old_file = os.path.join(schedule_dir, file_name)
        if vm:
            # quote the path: it becomes part of a shell command line on the guest
            vm.session.cmd("rm -f " + quote(old_file))
        else:
            os.unlink(old_file)

    contents = "%i\n%s\n" % (exec_time, optional_args)

    # delete=False: the file has to outlive the `with` block since it is moved/copied
    # by name below; the context manager only guarantees it is flushed and closed
    with tempfile.NamedTemporaryFile(mode="w+",
                                     prefix=prefix + "_",
                                     delete=False) as tmp_file:
        log.debug("Created temporary file %s", tmp_file.name)
        tmp_file.write(contents)

    moved_tmp_file = os.path.join(schedule_dir, os.path.basename(tmp_file.name))

    if vm:
        try:
            vm.copy_files_to(tmp_file.name, moved_tmp_file)
        finally:
            # do not leave the local copy behind, even if the transfer failed
            os.remove(tmp_file.name)
    else:
        shutil.move(tmp_file.name, moved_tmp_file)

    log.debug("Moved temporary file to %s", moved_tmp_file)
def wait_for_run(program: str, timeout: int = 300, retries: int = 10, vm=None):
    """
    Wait for a program to finish using the arnied_helper tool.

    First checks up to `retries` times (one second apart) whether the program is
    scheduled or running at all. If it never is, a warning is logged and nothing is
    waited for. Otherwise arnied_helper is asked to wait for the program's end.

    :param program: scheduled or running program to wait for
    :param timeout: program run timeout
    :param retries: number of tries to verify that the program is scheduled or running
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    """
    log.info("Waiting for program %s to finish with timeout %i",
             program, timeout)

    program_id = program.upper()
    probe_cmd = f"{BIN_ARNIED_HELPER} --is-scheduled-or-running {program_id}"
    seen = False
    for _ in range(retries):
        probe = run_cmd(cmd=probe_cmd, ignore_errors=True, vm=vm)
        if probe.returncode == 0:
            seen = True     # is scheduled or already running
            break
        time.sleep(1)

    if not seen:    # always returned 1, so neither scheduled nor running
        log.warning("The program %s was not scheduled and is not running", program)
        return      # no need to wait for it to finish since it's not running

    # Wait for a scheduled or running program to end:
    wait_cmd = (f"{BIN_ARNIED_HELPER} --wait-for-program-end "
                f"{program_id} --wait-for-program-timeout {timeout}")
    # add one second to make sure arnied_helper is finished when we expire
    outcome = run_cmd(cmd=wait_cmd, vm=vm, timeout=timeout + 1)
    log.debug(outcome.stdout)
def wait_for_arnied(timeout: int = 60, vm=None):
    """
    Let arnied_helper wait until the arnied socket is ready.

    :param timeout: maximum number of seconds to wait
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    """
    helper_args = [BIN_ARNIED_HELPER,
                   "--wait-for-arnied-socket",
                   "--wait-for-arnied-socket-timeout",
                   str(timeout)]
    # add one second to make sure arnied_helper is finished when we expire
    outcome = run_cmd(cmd=" ".join(helper_args), vm=vm, timeout=timeout + 1)
    log.debug(outcome.stdout)
390 # Configuration functionality
def wait_for_generate(timeout: int = 300, vm=None) -> bool:
    """
    Wait until neither `generate` nor `generate_offline` is scheduled or running.

    Both programs are checked once per loop round, with a one second pause in between,
    until both report "not scheduled and not running" or the time budget is used up.

    :param timeout: max time to wait for this function to finish
    :param vm: vm to run on if running on a guest instead of the host
    :type vm: :py:class:`virttest.qemu_vm.VM` or None
    :returns: True if no runs of generate are underway or scheduled, False if `timeout`
              was reached before that
    """
    # To avoid races (which we did encounter), do not wait_for_run("generate") and then for
    # "generate_offline", but do both "simultaneously" here.
    # Since generate may well cause a generate-offline to be scheduled right afterwards,
    # GENERATE is checked first and GENERATE_OFFLINE only after it.
    probe = f"{BIN_ARNIED_HELPER} --is-scheduled-or-running "
    deadline = time.monotonic() + timeout - 0.5

    while time.monotonic() < deadline:
        # from docu of arnied_helper:
        # --is-scheduled-or-running PROGNAME     Exit code 0 if scheduled or running, 1 otherwise
        generate_idle = run_cmd(cmd=probe + "GENERATE",
                                ignore_errors=True, vm=vm).returncode == 1
        # the second command is only run if the first one already reported "idle"
        if generate_idle and run_cmd(cmd=probe + "GENERATE_OFFLINE",
                                     ignore_errors=True, vm=vm).returncode == 1:
            # both commands succeeded and indicate that neither program is running nor scheduled
            return True
        log.debug("Waiting for generate to start/finish...")
        time.sleep(1)

    log.warning("Timeout waiting for generate to start/finish")
    return False