from dataclasses import dataclass
from types import SimpleNamespace
from enum import Enum, auto
+import time
import typing
import json
import sys
def barrier_executed(cls, barrier_nr: int) -> None:
with cls.new_connection() as conn:
return conn.BarrierExecuted(barrier_nr)
+
+ @classmethod
+ def wait_for_program(cls, progname, timeout=60):
+ log.debug(f"Waiting for `{progname}` to be running")
+
+ def scheduled_or_running(progname):
+ ret = cls.is_scheduled_or_running(progname)
+ return ret.status in [ProgramStatus.Scheduled, ProgramStatus.Running]
+
+ for _ in range(10):
+ if scheduled_or_running(progname):
+ # if is running or scheduled, break to wait for completion
+ break
+ time.sleep(1)
+ else:
+ # after trying and retrying, program is not scheduled nor
+ # running, so it is safe to assume it has already executed
+ return
+
+ # program running or scheduled, wait
+ log.debug(f"Waiting for `{progname}` to finish")
+ for _ in range(0, timeout):
+ if not scheduled_or_running(progname):
+ # finished executing, bail out
+ return
+ time.sleep(1)
+ raise TimeoutError(f"Program `{progname}` did not end in time")
+
+ @classmethod
+ def wait_for_generate(cls, timeout=300):
+ cls.wait_for_program("GENERATE", timeout=timeout)
+ cls.wait_for_program("GENERATE_OFFLINE", timeout=timeout)
log.debug("Moved temporary file to %s", moved_tmp_file)
-def wait_for_run(program, timeout=300, retries=10, vm=None):
- """
- Wait for a program using the guest arnied_helper tool.
-
- :param str program: scheduled or running program to wait for
- :param int timeout: program run timeout
- :param int retries: number of tries to verify that the program is scheduled or running
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
- """
- log.info("Waiting for program %s to finish with timeout %i",
- program, timeout)
- for i in range(retries):
- cmd = f"{BIN_ARNIED_HELPER} --is-scheduled-or-running " \
- + program.upper()
- check_scheduled = run_cmd(cmd=cmd, ignore_errors=True, vm=vm)
- if check_scheduled.returncode == 0:
- break
- time.sleep(1)
- else:
- log.warning("The program %s was not scheduled and is not running", program)
- return
- cmd = f"{BIN_ARNIED_HELPER} --wait-for-program-end " \
- f"{program.upper()} --wait-for-program-timeout {timeout}"
- # add one second to make sure arnied_helper is finished when we expire
- result = run_cmd(cmd=cmd, vm=vm, timeout=timeout+1)
- log.debug(result.stdout)
-
-
def wait_for_arnied(timeout=60, vm=None):
"""
Wait for arnied socket to be ready.
# add one second to make sure arnied_helper is finished when we expire
result = run_cmd(cmd=cmd, vm=vm, timeout=timeout+1)
log.debug(result.stdout)
-
-
-# Configuration functionality
-
-
-def wait_for_generate(timeout=300, vm=None):
- """
- Wait for the 'generate' program to complete.
-
- Arguments are similar to the ones from :py:func:`wait_for_run`.
- """
- wait_for_run('generate', timeout=timeout, retries=1, vm=vm)
- wait_for_run('generate_offline', timeout=timeout, retries=1, vm=vm)
BIN_SET_CNF = "/usr/intranator/bin/set_cnf"
#: encoding used by the get_cnf and set_cnf binaries
ENCODING = "latin1"
+#: default arnied_helper binary
+BIN_ARNIED_HELPER = "/usr/intranator/bin/arnied_helper"
class CnfBinary:
errors = list(get_error_lines(lines[1:]))
return "\n".join(errors)
+
+ @classmethod
+ def wait_for_run(cls, program, timeout=300, retries=10):
+ """
+ Wait for a program using the guest arnied_helper tool.
+
+ :param str program: scheduled or running program to wait for
+ :param int timeout: program run timeout
+ :param int retries: number of tries to verify that the program is scheduled or running
+
+ ..todo:: These go beyond the regular cnfvar but as we are planning to deprecate
+ the binary interface we don't need to provide a generic binary Arnied API.
+ """
+ log.info("Waiting for program %s to finish with timeout %i",
+ program, timeout)
+ for i in range(retries):
+ cmd = f"{BIN_ARNIED_HELPER} --is-scheduled-or-running " \
+ + program.upper()
+ check_scheduled = cls.run_cmd(cmd=cmd, ignore_errors=True)
+ if check_scheduled.returncode == 0:
+ break
+ time.sleep(1)
+ else:
+ log.warning("The program %s was not scheduled and is not running", program)
+ return
+ cmd = f"{BIN_ARNIED_HELPER} --wait-for-program-end " \
+ f"{program.upper()} --wait-for-program-timeout {timeout}"
+ # add one second to make sure arnied_helper is finished when we expire
+ result = cls.run_cmd(cmd=cmd, timeout=timeout+1)
+ log.debug(result.stdout)
+
+ @classmethod
+ def wait_for_generate(cls, timeout=300):
+ """
+ Wait for the 'generate' program to complete.
+
+ Arguments are similar to the ones from :py:method:`wait_for_run`.
+ """
+ cls.wait_for_run('generate', timeout=timeout, retries=1)
+ cls.wait_for_run('generate_offline', timeout=timeout, retries=1)
log.debug("Error sending variables:\n%s", arnied_cnfs)
raise CommitException(original_cnfs, "\n".join(errors))
- self._wait_for_generate()
-
- def _wait_for_generate(self, timeout=300):
- """
- Wait for the 'generate' program to end.
-
- :param int timeout: program run timeout
- :raises: :py:class:`TimeoutError` if the program did not finish on time
- """
- def scheduled_or_running(progname):
- ret = self._driver.is_scheduled_or_running(progname)
- return ret.status in [arnied_api.ProgramStatus.Scheduled,
- arnied_api.ProgramStatus.Running]
-
- def wait_for_program(progname):
- log.debug("Waiting for `%s` to be running", progname)
- for _ in range(10):
- if scheduled_or_running(progname):
- # if is running or scheduled, break to wait for completion
- break
- time.sleep(1)
- else:
- # after trying and retrying, program is not scheduled nor
- # running, so it is safe to assume it has already executed
- return
-
- # program running or scheduled, wait
- log.debug("Waiting for `%s` to finish", progname)
- for _ in range(0, timeout):
- if not scheduled_or_running(progname):
- # finished executing, bail out
- return
- time.sleep(1)
- raise TimeoutError(f"Program `{progname}` did not end in time")
-
- wait_for_program("GENERATE")
- wait_for_program("GENERATE_OFFLINE")
+ self._driver.wait_for_generate()
def _cnf_or_list(self, cnf, operation) -> CnfList:
"""
self._driver.set_cnf(input_str=str(cnf), fix_problems=fix_problems)
except subprocess.CalledProcessError as ex:
raise CommitException(cnf, ex.stderr) from None
- self._call_arnied(arnied_wrapper.wait_for_generate)
+ self._driver.wait_for_generate()
def delete(self, cnf, fix_problems=False):
"""
self._driver.set_cnf(input_str=str(cnf), delete=True, fix_problems=fix_problems)
except subprocess.CalledProcessError as ex:
raise CommitException(cnf, ex.stderr) from None
- self._call_arnied(arnied_wrapper.wait_for_generate)
+ self._driver.wait_for_generate()
def _call_arnied(self, fn, *args, **kwargs):
"""
store = CnfStore()
with patch.object(Arnied, "get_cnf", return_value=fake_ret),\
patch.object(Arnied, "set_commit_cnf") as set_cnf_mock,\
- patch.object(store, "_wait_for_generate"):
+ patch.object(Arnied, "wait_for_generate"):
self._query_and_update(store)
args, kwargs = set_cnf_mock.call_args
.where(lambda c: c.name == "THEME")
with patch.object(Arnied, "set_commit_cnf") as set_cnf_mock,\
- patch.object(store, "_wait_for_generate"):
+ patch.object(Arnied, "wait_for_generate"):
store.delete(cnfvars)
args, kwargs = set_cnf_mock.call_args
set_cnf_mock = patch.object(binary, "BIN_SET_CNF", self._fake_set_cnf)
self._set_cnf_mock = (set_cnf_mock, set_cnf_mock.start())
+ helper_mock = patch.object(binary, "BIN_ARNIED_HELPER", "/bin/true")
+ self._helper_mock = (helper_mock, helper_mock.start())
+
self._arnied_mock = patch.object(BinaryCnfStore, "_call_arnied")
self._arnied_mock.start()
self._arnied_mock.stop()
self._get_cnf_mock[0].stop()
self._set_cnf_mock[0].stop()
+ self._helper_mock[0].stop()
os.unlink(self._get_cnf_output)
os.unlink(self._set_cnf_input)