a6c08dd9cc329114004d3ed5fa80125c1700ac6a
[pyi2ncommon] / src / arnied_wrapper.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
20
21 """
22 Interaction with central arnied daemon.
23
24 All functions (except :py:func:`schedule` result in calling a binary
25 (either :py:data:`BIN_ARNIED_HELPER` or *tell-connd*).
26
27 For changes of configuration (*set_cnf*, *get_cnf*), refer to :py:mod:`pyi2ncommon.cnfvar`.
28
29 Copyright: Intra2net AG
30 """
31
32 import os
33 import time
34 import subprocess
35 from subprocess import CompletedProcess
36 import shutil
37 import tempfile
38 from shlex import quote
39 from typing import Any
40 import logging
41 log = logging.getLogger('pyi2ncommon.arnied_wrapper')
42
43
44 #: default arnied_helper binary
45 BIN_ARNIED_HELPER = "/usr/intranator/bin/arnied_helper"
46
47
48 def run_cmd(cmd: str = "", ignore_errors: bool = False, vm=None, timeout: int = 60) -> CompletedProcess:
49     """
50     Universal command run wrapper.
51
52     :param cmd: command to run
53     :param ignore_errors: whether not to raise error on command failure
54     :param vm: vm to run on if running on a guest instead of the host
55     :type vm: :py:class:`virttest.qemu_vm.VM` or None
56     :param timeout: amount of seconds to wait for the program to run
57     :returns: command result output where output (stdout/stderr) is bytes
58               (encoding dependent on environment and command given)
59     :raises: :py:class:`OSError` if command failed and cannot be ignored
60     """
61     if vm is not None:
62         status, stdout = vm.session.cmd_status_output(cmd, timeout=timeout)
63         stdout = stdout.encode()
64         stderr = b""
65         if status != 0:
66             stderr = stdout
67             stdout = b""
68             if not ignore_errors:
69                 raise subprocess.CalledProcessError(status, cmd, stderr=stderr)
70         return subprocess.CompletedProcess(cmd, status,
71                                            stdout=stdout, stderr=stderr)
72     else:
73         return subprocess.run(cmd, check=not ignore_errors, shell=True,
74                               capture_output=True)
75
76
77 def verify_running(process: str = "arnied", timeout: int = 60, vm=None):
78     """
79     Verify if a given process is running via 'pgrep'.
80
81     :param process: process to verify if running
82     :param timeout: run verification timeout
83     :param vm: vm to run on if running on a guest instead of the host
84     :type vm: :py:class:`virttest.qemu_vm.VM` or None
85     :raises: :py:class:`RuntimeError` if process is not running
86     """
87     platform_str = ""
88     if vm is not None:
89         vm.verify_alive()
90         platform_str = " on %s" % vm.name
91     for i in range(timeout):
92         log.debug("Checking whether %s is running%s (%i/%i)",
93                   process, platform_str, i, timeout)
94         result = run_cmd(cmd="pgrep -l -x %s" % process,
95                          ignore_errors=True, vm=vm)
96         if result.returncode == 0:
97             log.debug(result)
98             return
99         time.sleep(1)
100     raise RuntimeError("Process %s does not seem to be running" % process)
101
102
103 # Basic functionality
104
105
106 def go_online(provider_id: int, wait_online: bool = True, timeout: int = 60, vm=None):
107     """
108     Go online with the given provider id.
109
110     :param provider_id: provider to go online with
111     :param wait_online: whether to wait until online
112     :param timeout: Seconds to wait in :py:func:`wait_for_online`
113     :param vm: vm to run on if running on a guest instead of the host
114     :type vm: :py:class:`virttest.qemu_vm.VM` or None
115     :raises: :py:class:`RuntimeError` if waiting requested but failed within timeout
116
117     .. seealso:: :py:func:`go_offline`, :py:func:`wait_for_online`
118     """
119     log.info("Switching to online mode with provider %d", provider_id)
120
121     cmd = 'tell-connd --online P%i' % provider_id
122     result = run_cmd(cmd=cmd, vm=vm)
123     log.debug(result)
124
125     if wait_online:
126         wait_for_online(provider_id, timeout=timeout, vm=vm)
127
128
129 def go_offline(wait_offline: bool = True, vm=None):
130     """
131     Go offline.
132
133     :param wait_offline: whether to wait until offline
134     :param vm: vm to run on if running on a guest instead of the host
135     :type vm: :py:class:`virttest.qemu_vm.VM` or None
136     :raises: :py:class:`RuntimeError` if waiting requested but failed within timeout
137
138     .. seealso:: :py:func:`go_online`, :py:func:`wait_for_offline`
139     """
140     cmd = 'tell-connd --offline'
141     result = run_cmd(cmd=cmd, vm=vm)
142     log.debug(result)
143
144     if wait_offline:
145         if wait_offline is True:
146             wait_for_offline(vm=vm)
147         else:
148             wait_for_offline(wait_offline, vm=vm)
149
150
151 def wait_for_offline(timeout: int = 60, vm=None):
152     """
153     Wait for arnied to signal we are offline.
154
155     :param timeout: maximum timeout for waiting
156     :param vm: vm to run on if running on a guest instead of the host
157     :type vm: :py:class:`virttest.qemu_vm.VM` or None
158     :raises: :py:class:`RuntimeError` if did not go online within timeout
159     """
160     _wait_for_online_status('offline', None, timeout, vm)
161
162
163 def wait_for_online(provider_id: int, timeout: int = 60, vm=None):
164     """
165     Wait for arnied to signal we are online.
166
167     :param provider_id: provider to go online with
168     :param timeout: maximum timeout for waiting
169     :param vm: vm to run on if running on a guest instead of the host
170     :type vm: :py:class:`virttest.qemu_vm.VM` or None
171     :raises: :py:class:`RuntimeError` if did not go online within timeout
172     """
173     _wait_for_online_status('online', provider_id, timeout, vm)
174
175
176 def _wait_for_online_status(status: str, provider_id: int, timeout: int, vm):
177     # Don't use tell-connd --status here since the actual
178     # ONLINE signal to arnied is transmitted
179     # asynchronously via arnieclient_muxer.
180
181     def status_func_online():
182         go_online(provider_id, False, vm)
183
184     def status_func_offline():
185         go_offline(False, vm)
186
187     if status == 'online':
188         expected_output = 'DEFAULT: 2'
189         set_status_func = status_func_online
190     elif status == 'offline':
191         expected_output = 'DEFAULT: 0'
192         set_status_func = status_func_offline
193     else:
194         raise ValueError('expect status "online" or "offline", not "{0}"!'
195                          .format(status))
196
197     log.info("Waiting for arnied to be {0} within {1} seconds"
198              .format(status, timeout))
199
200     for i in range(timeout):
201         # arnied might invalidate the connd "connection barrier"
202         # after generate was running and switch to OFFLINE (race condition).
203         # -> tell arnied every ten seconds to go online again
204         if i % 10 == 0 and i != 0:
205             set_status_func()
206
207         cmd = '/usr/intranator/bin/get_var ONLINE'
208         result = run_cmd(cmd=cmd, ignore_errors=True, vm=vm)
209         log.debug(result)
210
211         if expected_output in result.stdout.decode():
212             log.info("arnied is {0}. Continuing.".format(status))
213             return
214
215         time.sleep(1)
216
217     raise RuntimeError("We didn't manage to go {0} within {1} seconds\n"
218                        .format(status, timeout))
219
220
221 def email_transfer(vm=None):
222     """
223     Transfer all the emails using the guest tool arnied_helper.
224
225     :param vm: vm to run on if running on a guest instead of the host
226     :type vm: :py:class:`virttest.qemu_vm.VM` or None
227     """
228     cmd = f"{BIN_ARNIED_HELPER} --transfer-mail"
229     result = run_cmd(cmd=cmd, vm=vm)
230     log.debug(result)
231
232
233 def wait_for_email_transfer(timeout: int = 300, vm=None):
234     """
235     Wait until the mail queue is empty and all emails are sent.
236
237     If the mail queue is still not empty after timeout is reached, a warning is logged and a
238     :py:class:`TimeoutError` is raised.
239
240     :param timeout: email transfer timeout
241     :param vm: vm to run on if running on a guest instead of the host
242     :type vm: :py:class:`virttest.qemu_vm.VM` or None
243     :raises TimeoutError: if mail transfer was not complete when timeout was reached
244     """
245     for i in range(timeout):
246         if i % 10 == 0:
247             # Retrigger mail queue in case something is deferred
248             # by an amavisd-new reconfiguration
249             run_cmd(cmd='postqueue -f', vm=vm)
250             log.debug('Waiting for SMTP queue to get empty (%i/%i s)',
251                       i, timeout)
252         if not run_cmd(cmd='postqueue -j', vm=vm).stdout:
253             log.debug('SMTP queue is empty')
254             return
255         time.sleep(1)
256     log.warning('Timeout reached but SMTP queue still not empty after {} s'
257                 .format(timeout))
258     raise TimeoutError()
259
260
261 def wait_for_quarantine_processing(vm_session: Any = None, max_wait: int = 30) -> bool:
262     """
263     Wait until quarantined is finished processing.
264
265     This checks quarantined's input and temp dirs and returns as soon as they are all empty or when
266     max waiting time is reached.
267
268     To be used after :py:func:`wait_for_email_transfer`.
269
270     :param vm_session: optional :py:class:`aexpect.client.ShellSession`; default: run on localhost
271     :param max_wait: maximum time in seconds to wait here
272     :returns: `True` if all quarantines have empty input/tmp dirs upon return, `False` if we
273               reached `max_time` while waiting
274     """
275     def has_files(dirname: str) -> bool:
276         # Quick abstraction to check dir on local host or in remote session
277         if vm_session is None:
278             return bool(os.listdir(dirname))
279         cmd = f"ls -UNq {quote(dirname)}"
280         status, output = vm_session.cmd_status_output(cmd)
281         if status == 0:
282             return bool(output.strip())    # False <==> empty output <==> no files
283         elif status == 2:   # dir does not exist
284             return False    # non-existent dir is empty
285         else:
286             raise RuntimeError(f"{cmd} returned {status} and output: {output}")
287
288     n_sleep = 0
289     for quarantine in ("spam", "attachment", "virus", "dmarc"):
290         for subdir in ("q-in", "q-tmp"):
291             try:
292                 full_dir = f"/datastore/quarantine/{quarantine}/{subdir}/"
293                 while has_files(full_dir):
294                     n_sleep += 1
295                     if n_sleep > max_wait:
296                         return False     # abort
297                     time.sleep(1)
298             except FileNotFoundError:    # no such directory on local host
299                 continue
300     return True
301
302
303 def schedule(program: str, exec_time: int = 0, optional_args: str = "", vm=None):
304     """
305     Schedule a program to be executed at a given unix time stamp.
306
307     :param program: program whose execution is scheduled
308     :param exec_time: scheduled time of program's execution
309     :param optional_args: optional command line arguments
310     :param vm: vm to run on if running on a guest instead of the host
311     :type vm: :py:class:`virttest.qemu_vm.VM` or None
312     """
313     log.info("Scheduling %s to be executed at %i", program, exec_time)
314     schedule_dir = "/var/intranator/schedule"
315     # clean previous schedules of the same program
316     files = vm.session.cmd("ls " + schedule_dir).split() if vm else os.listdir(schedule_dir)
317     for file_name in files:
318         if file_name.startswith(program.upper()):
319             log.debug("Removing previous scheduled %s", file_name)
320             if vm:
321                 vm.session.cmd("rm -f " + os.path.join(schedule_dir, file_name))
322             else:
323                 os.unlink(os.path.join(schedule_dir, file_name))
324
325     contents = "%i\n%s\n" % (exec_time, optional_args)
326
327     tmp_file = tempfile.NamedTemporaryFile(mode="w+",
328                                            prefix=program.upper() + "_",
329                                            delete=False)
330     log.debug("Created temporary file %s", tmp_file.name)
331     tmp_file.write(contents)
332     tmp_file.close()
333     moved_tmp_file = os.path.join(schedule_dir, os.path.basename(tmp_file.name))
334
335     if vm:
336         vm.copy_files_to(tmp_file.name, moved_tmp_file)
337         os.remove(tmp_file.name)
338     else:
339         shutil.move(tmp_file.name, moved_tmp_file)
340
341     log.debug("Moved temporary file to %s", moved_tmp_file)
342
343
344 def wait_for_run(program: str, timeout: int = 300, retries: int = 10, vm=None):
345     """
346     Wait for a program using the guest arnied_helper tool.
347
348     :param program: scheduled or running program to wait for
349     :param timeout: program run timeout
350     :param retries: number of tries to verify that the program is scheduled or running
351     :param vm: vm to run on if running on a guest instead of the host
352     :type vm: :py:class:`virttest.qemu_vm.VM` or None
353     """
354     log.info("Waiting for program %s to finish with timeout %i",
355              program, timeout)
356     for i in range(retries):
357         cmd = f"{BIN_ARNIED_HELPER} --is-scheduled-or-running " \
358             + program.upper()
359         check_scheduled = run_cmd(cmd=cmd, ignore_errors=True, vm=vm)
360         if check_scheduled.returncode == 0:
361             break   # is scheduled or already running
362         time.sleep(1)
363     else:    # always returned 1, so neither scheduled nor running
364         log.warning("The program %s was not scheduled and is not running", program)
365         return    # no need to wait for it to finish since it's not running
366
367     # Wait for a scheduled or running program to end:
368     cmd = f"{BIN_ARNIED_HELPER} --wait-for-program-end " \
369           f"{program.upper()} --wait-for-program-timeout {timeout}"
370     # add one second to make sure arnied_helper is finished when we expire
371     result = run_cmd(cmd=cmd, vm=vm, timeout=timeout+1)
372     log.debug(result.stdout)
373
374
375 def wait_for_arnied(timeout: int = 60, vm=None):
376     """
377     Wait for arnied socket to be ready.
378
379     :param timeout: maximum number of seconds to wait
380     :param vm: vm to run on if running on a guest instead of the host
381     :type vm: :py:class:`virttest.qemu_vm.VM` or None
382     """
383     cmd = f"{BIN_ARNIED_HELPER} --wait-for-arnied-socket " \
384           f"--wait-for-arnied-socket-timeout {timeout}"
385     # add one second to make sure arnied_helper is finished when we expire
386     result = run_cmd(cmd=cmd, vm=vm, timeout=timeout+1)
387     log.debug(result.stdout)
388
389
390 # Configuration functionality
391
392
393 def wait_for_generate(timeout: int = 300, vm=None) -> bool:
394     """
395     Wait for the 'generate' program to complete.
396
397     At the end of this function call, there will be no `generate` or `generate_offline` be
398     scheduled or running, except if any of those took longer than `timeout`. Will return `False`
399     in those cases, `True` otherwise
400
401     :param timeout: max time to wait for this function to finish
402     :param vm: vm to run on if running on a guest instead of the host
403     :type vm: :py:class:`virttest.qemu_vm.VM` or None
404     :returns: True if no runs of generate are underway or scheduled, False if `timeout` was not
405     enough
406     """
407     # To avoid races (which we did encounter), do not wait_for_run("generate") and then for
408     # "generate_offline", but do both "simultaneously" here.
409     # Since generate may well cause a generate-offline to be scheduled right afterwards, check
410     # in this order
411     cmd1 = f"{BIN_ARNIED_HELPER} --is-scheduled-or-running GENERATE"
412     cmd2 = f"{BIN_ARNIED_HELPER} --is-scheduled-or-running GENERATE_OFFLINE"
413     end_time = time.monotonic() + timeout - 0.5
414
415     while time.monotonic() < end_time:
416         # from docu of arnied_helper:
417         # --is-scheduled-or-running PROGNAME      Exit code 0 if scheduled or running, 1 otherwise
418         if run_cmd(cmd=cmd1, ignore_errors=True, vm=vm).returncode == 1 \
419                 and run_cmd(cmd=cmd2, ignore_errors=True, vm=vm).returncode == 1:
420             # both commands succeeded and indicate that neither program is running nor scheduled
421             return True
422         log.debug("Waiting for generate to start/finish...")
423         time.sleep(1)
424     log.warning("Timeout waiting for generate to start/finish")
425     return False