Increase version to 1.7.4
[pyi2ncommon] / src / arnied_wrapper.py
... / ...
CommitLineData
1# The software in this package is distributed under the GNU General
2# Public License version 2 (with a special exception described below).
3#
4# A copy of GNU General Public License (GPL) is included in this distribution,
5# in the file COPYING.GPL.
6#
7# As a special exception, if other files instantiate templates or use macros
8# or inline functions from this file, or you compile this file and link it
9# with other works to produce a work based on this file, this file
10# does not by itself cause the resulting work to be covered
11# by the GNU General Public License.
12#
13# However the source code for this file must still be made available
14# in accordance with section (3) of the GNU General Public License.
15#
16# This exception does not invalidate any other reasons why a work based
17# on this file might be covered by the GNU General Public License.
18#
19# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
20
21"""
22Interaction with central arnied daemon.
23
24All functions (except :py:func:`schedule` result in calling a binary
25(either :py:data:`BIN_ARNIED_HELPER` or *tell-connd*).
26
27For changes of configuration (*set_cnf*, *get_cnf*), refer to :py:mod:`pyi2ncommon.cnfvar`.
28
29Copyright: Intra2net AG
30"""
31
32import os
33import time
34import subprocess
35from subprocess import CompletedProcess
36import shutil
37import tempfile
38from shlex import quote
39from typing import Any
40import logging
41log = logging.getLogger('pyi2ncommon.arnied_wrapper')
42
43
44#: default arnied_helper binary
45BIN_ARNIED_HELPER = "/usr/intranator/bin/arnied_helper"
46
47
48def run_cmd(cmd: str = "", ignore_errors: bool = False, vm=None, timeout: int = 60) -> CompletedProcess:
49 """
50 Universal command run wrapper.
51
52 :param cmd: command to run
53 :param ignore_errors: whether not to raise error on command failure
54 :param vm: vm to run on if running on a guest instead of the host
55 :type vm: :py:class:`virttest.qemu_vm.VM` or None
56 :param timeout: amount of seconds to wait for the program to run
57 :returns: command result output where output (stdout/stderr) is bytes
58 (encoding dependent on environment and command given)
59 :raises: :py:class:`OSError` if command failed and cannot be ignored
60 """
61 if vm is not None:
62 status, stdout = vm.session.cmd_status_output(cmd, timeout=timeout)
63 stdout = stdout.encode()
64 stderr = b""
65 if status != 0:
66 stderr = stdout
67 stdout = b""
68 if not ignore_errors:
69 raise subprocess.CalledProcessError(status, cmd, stderr=stderr)
70 return subprocess.CompletedProcess(cmd, status,
71 stdout=stdout, stderr=stderr)
72 else:
73 return subprocess.run(cmd, check=not ignore_errors, shell=True,
74 capture_output=True)
75
76
77def verify_running(process: str = "arnied", timeout: int = 60, vm=None):
78 """
79 Verify if a given process is running via 'pgrep'.
80
81 :param process: process to verify if running
82 :param timeout: run verification timeout
83 :param vm: vm to run on if running on a guest instead of the host
84 :type vm: :py:class:`virttest.qemu_vm.VM` or None
85 :raises: :py:class:`RuntimeError` if process is not running
86 """
87 platform_str = ""
88 if vm is not None:
89 vm.verify_alive()
90 platform_str = " on %s" % vm.name
91 for i in range(timeout):
92 log.debug("Checking whether %s is running%s (%i/%i)",
93 process, platform_str, i, timeout)
94 result = run_cmd(cmd="pgrep -l -x %s" % process,
95 ignore_errors=True, vm=vm)
96 if result.returncode == 0:
97 log.debug(result)
98 return
99 time.sleep(1)
100 raise RuntimeError("Process %s does not seem to be running" % process)
101
102
103# Basic functionality
104
105
106def go_online(provider_id: int, wait_online: bool = True, timeout: int = 60, vm=None):
107 """
108 Go online with the given provider id.
109
110 :param provider_id: provider to go online with
111 :param wait_online: whether to wait until online
112 :param timeout: Seconds to wait in :py:func:`wait_for_online`
113 :param vm: vm to run on if running on a guest instead of the host
114 :type vm: :py:class:`virttest.qemu_vm.VM` or None
115 :raises: :py:class:`RuntimeError` if waiting requested but failed within timeout
116
117 .. seealso:: :py:func:`go_offline`, :py:func:`wait_for_online`
118 """
119 log.info("Switching to online mode with provider %d", provider_id)
120
121 cmd = 'tell-connd --online P%i' % provider_id
122 result = run_cmd(cmd=cmd, vm=vm)
123 log.debug(result)
124
125 if wait_online:
126 wait_for_online(provider_id, timeout=timeout, vm=vm)
127
128
129def go_offline(wait_offline: bool = True, vm=None):
130 """
131 Go offline.
132
133 :param wait_offline: whether to wait until offline
134 :param vm: vm to run on if running on a guest instead of the host
135 :type vm: :py:class:`virttest.qemu_vm.VM` or None
136 :raises: :py:class:`RuntimeError` if waiting requested but failed within timeout
137
138 .. seealso:: :py:func:`go_online`, :py:func:`wait_for_offline`
139 """
140 cmd = 'tell-connd --offline'
141 result = run_cmd(cmd=cmd, vm=vm)
142 log.debug(result)
143
144 if wait_offline:
145 if wait_offline is True:
146 wait_for_offline(vm=vm)
147 else:
148 wait_for_offline(wait_offline, vm=vm)
149
150
151def wait_for_offline(timeout: int = 60, vm=None):
152 """
153 Wait for arnied to signal we are offline.
154
155 :param timeout: maximum timeout for waiting
156 :param vm: vm to run on if running on a guest instead of the host
157 :type vm: :py:class:`virttest.qemu_vm.VM` or None
158 :raises: :py:class:`RuntimeError` if did not go online within timeout
159 """
160 _wait_for_online_status('offline', None, timeout, vm)
161
162
163def wait_for_online(provider_id: int, timeout: int = 60, vm=None):
164 """
165 Wait for arnied to signal we are online.
166
167 :param provider_id: provider to go online with
168 :param timeout: maximum timeout for waiting
169 :param vm: vm to run on if running on a guest instead of the host
170 :type vm: :py:class:`virttest.qemu_vm.VM` or None
171 :raises: :py:class:`RuntimeError` if did not go online within timeout
172 """
173 _wait_for_online_status('online', provider_id, timeout, vm)
174
175
176def _wait_for_online_status(status: str, provider_id: int, timeout: int, vm):
177 # Don't use tell-connd --status here since the actual
178 # ONLINE signal to arnied is transmitted
179 # asynchronously via arnieclient_muxer.
180
181 def status_func_online():
182 go_online(provider_id, False, vm)
183
184 def status_func_offline():
185 go_offline(False, vm)
186
187 if status == 'online':
188 expected_output = 'DEFAULT: 2'
189 set_status_func = status_func_online
190 elif status == 'offline':
191 expected_output = 'DEFAULT: 0'
192 set_status_func = status_func_offline
193 else:
194 raise ValueError('expect status "online" or "offline", not "{0}"!'
195 .format(status))
196
197 log.info("Waiting for arnied to be {0} within {1} seconds"
198 .format(status, timeout))
199
200 for i in range(timeout):
201 # arnied might invalidate the connd "connection barrier"
202 # after generate was running and switch to OFFLINE (race condition).
203 # -> tell arnied every ten seconds to go online again
204 if i % 10 == 0 and i != 0:
205 set_status_func()
206
207 cmd = '/usr/intranator/bin/get_var ONLINE'
208 result = run_cmd(cmd=cmd, ignore_errors=True, vm=vm)
209 log.debug(result)
210
211 if expected_output in result.stdout.decode():
212 log.info("arnied is {0}. Continuing.".format(status))
213 return
214
215 time.sleep(1)
216
217 raise RuntimeError("We didn't manage to go {0} within {1} seconds\n"
218 .format(status, timeout))
219
220
221def email_transfer(vm=None):
222 """
223 Transfer all the emails using the guest tool arnied_helper.
224
225 :param vm: vm to run on if running on a guest instead of the host
226 :type vm: :py:class:`virttest.qemu_vm.VM` or None
227 """
228 cmd = f"{BIN_ARNIED_HELPER} --transfer-mail"
229 result = run_cmd(cmd=cmd, vm=vm)
230 log.debug(result)
231
232
233def wait_for_email_transfer(timeout: int = 300, vm=None):
234 """
235 Wait until the mail queue is empty and all emails are sent.
236
237 If the mail queue is still not empty after timeout is reached, a warning is logged and a
238 :py:class:`TimeoutError` is raised.
239
240 :param timeout: email transfer timeout
241 :param vm: vm to run on if running on a guest instead of the host
242 :type vm: :py:class:`virttest.qemu_vm.VM` or None
243 :raises TimeoutError: if mail transfer was not complete when timeout was reached
244 """
245 for i in range(timeout):
246 if i % 10 == 0:
247 # Retrigger mail queue in case something is deferred
248 # by an amavisd-new reconfiguration
249 run_cmd(cmd='postqueue -f', vm=vm)
250 log.debug('Waiting for SMTP queue to get empty (%i/%i s)',
251 i, timeout)
252 if not run_cmd(cmd='postqueue -j', vm=vm).stdout:
253 log.debug('SMTP queue is empty')
254 return
255 time.sleep(1)
256 log.warning('Timeout reached but SMTP queue still not empty after {} s'
257 .format(timeout))
258 raise TimeoutError()
259
260
261def wait_for_quarantine_processing(vm_session: Any = None, max_wait: int = 30) -> bool:
262 """
263 Wait until quarantined is finished processing.
264
265 This checks quarantined's input and temp dirs and returns as soon as they are all empty or when
266 max waiting time is reached.
267
268 To be used after :py:func:`wait_for_email_transfer`.
269
270 :param vm_session: optional :py:class:`aexpect.client.ShellSession`; default: run on localhost
271 :param max_wait: maximum time in seconds to wait here
272 :returns: `True` if all quarantines have empty input/tmp dirs upon return, `False` if we
273 reached `max_time` while waiting
274 """
275 def has_files(dirname: str) -> bool:
276 # Quick abstraction to check dir on local host or in remote session
277 if vm_session is None:
278 return bool(os.listdir(dirname))
279 cmd = f"ls -UNq {quote(dirname)}"
280 status, output = vm_session.cmd_status_output(cmd)
281 if status == 0:
282 return bool(output.strip()) # False <==> empty output <==> no files
283 elif status == 2: # dir does not exist
284 return False # non-existent dir is empty
285 else:
286 raise RuntimeError(f"{cmd} returned {status} and output: {output}")
287
288 n_sleep = 0
289 for quarantine in ("spam", "attachment", "virus", "dmarc"):
290 for subdir in ("q-in", "q-tmp"):
291 try:
292 full_dir = f"/datastore/quarantine/{quarantine}/{subdir}/"
293 while has_files(full_dir):
294 n_sleep += 1
295 if n_sleep > max_wait:
296 return False # abort
297 time.sleep(1)
298 except FileNotFoundError: # no such directory on local host
299 continue
300 return True
301
302
303def schedule(program: str, exec_time: int = 0, optional_args: str = "", vm=None):
304 """
305 Schedule a program to be executed at a given unix time stamp.
306
307 :param program: program whose execution is scheduled
308 :param exec_time: scheduled time of program's execution
309 :param optional_args: optional command line arguments
310 :param vm: vm to run on if running on a guest instead of the host
311 :type vm: :py:class:`virttest.qemu_vm.VM` or None
312 """
313 log.info("Scheduling %s to be executed at %i", program, exec_time)
314 schedule_dir = "/var/intranator/schedule"
315 # clean previous schedules of the same program
316 files = vm.session.cmd("ls " + schedule_dir).split() if vm else os.listdir(schedule_dir)
317 for file_name in files:
318 if file_name.startswith(program.upper()):
319 log.debug("Removing previous scheduled %s", file_name)
320 if vm:
321 vm.session.cmd("rm -f " + os.path.join(schedule_dir, file_name))
322 else:
323 os.unlink(os.path.join(schedule_dir, file_name))
324
325 contents = "%i\n%s\n" % (exec_time, optional_args)
326
327 tmp_file = tempfile.NamedTemporaryFile(mode="w+",
328 prefix=program.upper() + "_",
329 delete=False)
330 log.debug("Created temporary file %s", tmp_file.name)
331 tmp_file.write(contents)
332 tmp_file.close()
333 moved_tmp_file = os.path.join(schedule_dir, os.path.basename(tmp_file.name))
334
335 if vm:
336 vm.copy_files_to(tmp_file.name, moved_tmp_file)
337 os.remove(tmp_file.name)
338 else:
339 shutil.move(tmp_file.name, moved_tmp_file)
340
341 log.debug("Moved temporary file to %s", moved_tmp_file)
342
343
344def wait_for_run(program: str, timeout: int = 300, retries: int = 10, vm=None):
345 """
346 Wait for a program using the guest arnied_helper tool.
347
348 :param program: scheduled or running program to wait for
349 :param timeout: program run timeout
350 :param retries: number of tries to verify that the program is scheduled or running
351 :param vm: vm to run on if running on a guest instead of the host
352 :type vm: :py:class:`virttest.qemu_vm.VM` or None
353 """
354 log.info("Waiting for program %s to finish with timeout %i",
355 program, timeout)
356 for i in range(retries):
357 cmd = f"{BIN_ARNIED_HELPER} --is-scheduled-or-running " \
358 + program.upper()
359 check_scheduled = run_cmd(cmd=cmd, ignore_errors=True, vm=vm)
360 if check_scheduled.returncode == 0:
361 break # is scheduled or already running
362 time.sleep(1)
363 else: # always returned 1, so neither scheduled nor running
364 log.warning("The program %s was not scheduled and is not running", program)
365 return # no need to wait for it to finish since it's not running
366
367 # Wait for a scheduled or running program to end:
368 cmd = f"{BIN_ARNIED_HELPER} --wait-for-program-end " \
369 f"{program.upper()} --wait-for-program-timeout {timeout}"
370 # add one second to make sure arnied_helper is finished when we expire
371 result = run_cmd(cmd=cmd, vm=vm, timeout=timeout+1)
372 log.debug(result.stdout)
373
374
375def wait_for_arnied(timeout: int = 60, vm=None):
376 """
377 Wait for arnied socket to be ready.
378
379 :param timeout: maximum number of seconds to wait
380 :param vm: vm to run on if running on a guest instead of the host
381 :type vm: :py:class:`virttest.qemu_vm.VM` or None
382 """
383 cmd = f"{BIN_ARNIED_HELPER} --wait-for-arnied-socket " \
384 f"--wait-for-arnied-socket-timeout {timeout}"
385 # add one second to make sure arnied_helper is finished when we expire
386 result = run_cmd(cmd=cmd, vm=vm, timeout=timeout+1)
387 log.debug(result.stdout)
388
389
390# Configuration functionality
391
392
393def wait_for_generate(timeout: int = 300, vm=None) -> bool:
394 """
395 Wait for the 'generate' program to complete.
396
397 At the end of this function call, there will be no `generate` or `generate_offline` be
398 scheduled or running, except if any of those took longer than `timeout`. Will return `False`
399 in those cases, `True` otherwise
400
401 :param timeout: max time to wait for this function to finish
402 :param vm: vm to run on if running on a guest instead of the host
403 :type vm: :py:class:`virttest.qemu_vm.VM` or None
404 :returns: True if no runs of generate are underway or scheduled, False if `timeout` was not
405 enough
406 """
407 # To avoid races (which we did encounter), do not wait_for_run("generate") and then for
408 # "generate_offline", but do both "simultaneously" here.
409 # Since generate may well cause a generate-offline to be scheduled right afterwards, check
410 # in this order
411 cmd1 = f"{BIN_ARNIED_HELPER} --is-scheduled-or-running GENERATE"
412 cmd2 = f"{BIN_ARNIED_HELPER} --is-scheduled-or-running GENERATE_OFFLINE"
413 end_time = time.monotonic() + timeout - 0.5
414
415 while time.monotonic() < end_time:
416 # from docu of arnied_helper:
417 # --is-scheduled-or-running PROGNAME Exit code 0 if scheduled or running, 1 otherwise
418 if run_cmd(cmd=cmd1, ignore_errors=True, vm=vm).returncode == 1 \
419 and run_cmd(cmd=cmd2, ignore_errors=True, vm=vm).returncode == 1:
420 # both commands succeeded and indicate that neither program is running nor scheduled
421 return True
422 log.debug("Waiting for generate to start/finish...")
423 time.sleep(1)
424 log.warning("Timeout waiting for generate to start/finish")
425 return False