Remove __future__ statements and other remainders of python2-compatibility.
Remove checks for python version <3.6, assume we are running on python 3.6
or above.
Also run a major pylint on all the code and fix lots of things, including
many typos, missing paramters, strange whitespace, "r" in front of regex
string literals, ...
However, ignore deprecated modules.
:type provider_id: int
:param wait_online: whether to wait until online
:type wait_online: bool
+ :param int timeout: Seconds to wait in :py:func:`wait_for_online`
:param vm: vm to run on if running on a guest instead of the host
:type vm: :py:class:`virttest.qemu_vm.VM` or None
buffers.py: buffers of various shapes, sizes and functionalities
Featuring::
-
-* CircularBuffer
-* LogarithmicBuffer: saves only last N items, and after that less and less so
-* very few old items are kept
+ - CircularBuffer
+ - LogarithmicBuffer: saves only last N items, and after that less and less so
+ very few old items are kept
"""
+
class CircularBuffer:
""" circular buffer for data; saves last N sets
if size < 1:
raise ValueError('size must be positive!')
self.buffer_size = size
- self._buffer = [empty_element for idx in range(size)]
+ self._buffer = [empty_element for _ in range(size)]
self._buff_idx = 0
self.n_items = 0
""" internal helper for saving (or not) items in data_old """
# determine whether we throw it away or actually save it
- if age % 2**index != 0:
+ if age % 2 ** index != 0:
return
# we save it. But before check if we need to extend data_old or
if len(self._data_old) <= index:
self._data_old.append(item)
else:
- self._save_old(self._data_old[index], age, index+1)
+ self._save_old(self._data_old[index], age, index + 1)
self._data_old[index] = item
def get_all(self):
#
# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-""" Helpers for calling commands, capture their output, return result code
-
-Subprocess library just does not provide all the simplicity we would like
+"""
+Helpers for calling commands, capture their output, return result code.
-Stay python2 compatible --> no timeouts
+Subprocess library did not provide all the simplicity we would have liked.
+However, this has since changed, so consider using the easy-to-use builtin
+functions :py:func:`subprocess.run` or :py:func:`subprocess.call` instead.
"""
from subprocess import Popen, PIPE
:returns: (return_code, stdout, stderr); stdout and stderr are lists of
text lines (without terminating newlines); if universal_newlines is
True (default), the lines are of type str, otherwise they are non-
- unicode text (type str in py2, bytes in py3). If split_lines is False
- (not default), then stdout and stderr are single multi-line strings
+ unicode text (type bytes). If split_lines is False (not default),
+ then stdout and stderr are single multi-line strings
:raise: OSError (e.g., if command does not exist), ValueError if args are
invalid; no :py:class:`subprocess.CalledProcessError` nor
"""Basic functions for on-the-fly arnied cnf-var generator"""
from .cnfline import CnfLine
-import os
-import tempfile
class BuildCnfVar(object):
:type cnfs: :py:class`CnfList`
When the instance of a cnfvar is -1, the backend initializes it
- automatically. However it starts on 1, whereas many variables are not
+ automatically. However, it starts on 1, whereas many variables are not
allowed to start on one (those that are meant to be unique, f.e.).
This method can be used in child classes to use an alternative scheme,
however for performance reasons the base API class uses the default and
"""
-from __future__ import print_function
import subprocess
from re import match as regexp
from os import EX_OK
# constants
DEFAULT_TELL_CONND_BINARY = '/usr/intranator/bin/tell-connd'
-# TIMEOUT = 1 can only be used with python3
+TIMEOUT = 10
ONLINE_MODE_ALWAYS_ONLINE = 'always online'
ONLINE_MODE_ALWAYS_OFFLINE = 'always offline'
def __str__(self):
return \
- '[ConndState: {0} (default {1}), {2} conn\'s, {3} ips, {4} vpns ]'\
+ '[ConndState: {0} (default {1}), {2} conn\'s, {3} ips, {4} vpns ]' \
.format(self.online_mode, self.default_provider,
len(self.connections), len(self.online_ips),
len(self.connected_vpns))
cmd_parts.extend(*args)
output = subprocess.check_output(cmd_parts,
stderr=subprocess.STDOUT,
- universal_newlines=True, shell=False) # py3:, timeout=TIMEOUT)
+ universal_newlines=True, shell=False,
+ timeout=TIMEOUT)
return EX_OK, output.splitlines()
except subprocess.CalledProcessError as cpe: # non-zero return status
output = [
'tell-connd exited with status {0}'.format(cpe.returncode), ]
output.extend(cpe.output.splitlines())
return cpe.returncode, output
- # not python-2-compatible:
- # except subprocess.TimeoutExpired as texp:
- # output = [
- # 'tell-connd timed out after {0}s. Returning -1'.format(
- # texp.timeout), ]
- # output.extend(te.output.splitlines())
- # return -1, output
+ except subprocess.TimeoutExpired as texp:
+ output = [f'tell-connd timed out after {texp.timeout}s. Returning -1', ]
+ output.extend(texp.output.splitlines())
+ return -1, output
except Exception as exp:
output = [str(exp), ]
return -1, output
line = next(output).strip()
assert line == 'subsys', 'expected subsys but got {0}'.format(line)
line = next(output).strip()
- state.subsys_online = regexp('online\s*:\s*(.*)$', line)\
+ state.subsys_online = regexp('online\s*:\s*(.*)$', line) \
.groups()[0].split()
for subsys in state.subsys_online:
assert subsys in ALL_SUBSYS, \
'unexpected subsys: {0}'.format(subsys)
line = next(output).strip()
- state.subsys_offline = regexp('offline\s*:\s*(.*)$', line)\
+ state.subsys_offline = regexp('offline\s*:\s*(.*)$', line) \
.groups()[0].split()
for subsys in state.subsys_offline:
assert subsys in ALL_SUBSYS, \
'unexpected subsys: {0}'.format(subsys)
line = next(output).strip()
- state.subsys_disabled = regexp('disabled\s*:\s*(.*)$', line)\
+ state.subsys_disabled = regexp('disabled\s*:\s*(.*)$', line) \
.groups()[0].split()
for subsys in state.subsys_disabled:
assert subsys in ALL_SUBSYS, \
assert line == 'connection map:', \
'expected connection map but got {0}'.format(line)
expect_new = True
+ conn_name = None
+ conn_info = None
for line in output:
line = line.strip()
if len(line) == 0:
'\[\s*(.+)\s*\]\s*:\s*\(\s*(.*)\s*\)', line).groups()
expect_new = False
else:
- conn_actions = regexp('actions\s*:\s*\[\s*(.+)\s*\]', line)\
+ conn_actions = regexp('actions\s*:\s*\[\s*(.+)\s*\]', line) \
.groups()
+ assert conn_name is not None and conn_info is not None, \
+ 'error parsing connection maps'
state.connections.append((conn_name, conn_info, conn_actions))
expect_new = True
+ conn_name = None
+ conn_info = None
assert expect_new
line = next(output).strip()
assert len(line) == 0, 'expected empty line, but got {0}'.format(line)
# online IPs
line = next(output).strip()
- state.online_ips = regexp('list of online ips\s*:\s*(.*)', line)\
+ state.online_ips = regexp('list of online ips\s*:\s*(.*)', line) \
.groups()[0].split()
if len(state.online_ips) == 1 \
and state.online_ips[0].strip() == 'NONE':
:param prid: Provider id, default *P1*. It is up to the caller to ensure
this is a valid provider id.
:type prid: str
+ :param block: block execution until system is online
+ :type block: bool
:returns: Whether the ``doc`` command succeeded.
:rtype: int (dial result as above)
"""
:param prid: Provider id, default *P1*. It is up to the caller to ensure
this is a valid provider id.
:type prid: str
+ :param block: block execution until system is online
+ :type block: bool
:returns: Whether the ``tell-connd`` command succeeded.
:rtype: int (dial result as above)
"""
elif isinstance(mode, str) is True:
try:
dmode = DIALOUT_MODE_BY_NAME[mode]
- except:
+ except Exception:
log.error("invalid online mode name “%s” requested" % mode)
pass
implementation is incomplete, but should suffice for current use.
:param str folder: Name of a folder in an IMAP mailbox, possibly quoted
- :returns: same folder name, possibly with added quotes
+ :returns: same folder name as bytes, possibly with added quotes
:rtype: bytes
"""
if not folder:
- return folder
+ return b''
if folder[0] == '"' and folder[-1] == '"':
- return folder # is already quoted
+ return folder.encode('ascii', 'strict') # is already quoted
if any(special in folder for special in ' (){}"\\[]'):
# see RFC3501 $5.1 and $9
- return '"' + folder + '"' # quote the folder
+ return f'"{folder}"'.encode('ascii', 'strict') # quote the folder
return folder.encode('ascii', 'strict')
- ...
Before adding too much capability here, consider using libraries that do
all that already like ImapClient (which unfortunately does not deal with
- unsolicited reponses)...
+ unsolicited responses)...
Cannot deal with unicode folder names.
"""
User and password can be given either here or to method
:py:meth:`login`.
- :param str user: User name for imap mailbox, optional
+ :param str user: Username for imap mailbox, optional
:param str password: Password for logging into mailbox
:param str host: Host name, defaults to localhost
:param debug_log: Either bool to enable/disable debug logging or a file
self._debug_log_handle = None
if debug_log:
self.debug = 4 # enable debugging in super class
- # overwrite prviate debug log function of base class with own.
+ # overwrite private debug log function of base class with own.
# this is slightly hacky since depends on private implementation
if isinstance(debug_log, str):
self._debug_log_handle = open(debug_log, 'at')
"""
Log in to mailbox.
- :param str user: User name to use for login. Overrides the one given in
+ :param str user: Username to use for login. Overrides the one given in
constructor. Must be given if not given to constructor
:param str password: Password for login. Same restrictions as user
:returns: first return item from imap login.
import logging
from logging import DEBUG, INFO, WARNING, ERROR, CRITICAL, NOTSET
-from logging.handlers import RotatingFileHandler
+from logging.handlers import RotatingFileHandler, SysLogHandler
from math import log10, floor
import sys
formatter.add_level(notice, 'note')
logger.log(notice, 'more important than info but no warning nor error')
- .. seealso:: testing funcion :py:func:`test_short_level_format`
+ .. seealso:: testing function :py:func:`test_short_level_format`
"""
def __init__(self, fmt=DEFAULT_SHORT_LEVEL_FORMAT,
* Simplifies setting of logger's logging level using
:py:meth:`Logger.set_level`. The level is kept in sync between logger and
handlers and can be queried using :py:meth:`get_level`
- (does not work accross logger hierarchies!)
+ (does not work across logger hierarchies!)
* can specify name and level and [date]fmt all in constructor
* can limit the number of lines this logger will produce to prevent filling
hard drive with log file
:py:class:`logging.handlers.RotatingFileHandler` to avoid huge log
files filling up the disc in case of error.
+ :param str filename: path to file that logger should write to
:param str mode: Mode with which to open log file; forwarded to handler
:param int max_bytes: Max size in bytes of a log file before it is
renamed to `[filename].1` and logging continues
with an empty `[filename]`.
- :param int max_rots: Max number of rotated files to keep
+ :param int max_rotations: Max number of rotated files to keep
:param dict kwargs: All other args (e.g. `encoding`) are forwarded to
handler constructor. `maxBytes` and `backupCount`
are overwritten with `max_bytes` and `max_rots`.
Returns the number of digits a number has in decimal format
:returns: 1 for 1...9, 2 for 10...99, 3 for 100...999, ...
- 0 for 0 (and everything else beween -1 and 1)
+ 0 for 0 (and everything else between -1 and 1)
1 for -1...-9, 2 for -10...-99, ...
"""
if abs(number) < 1:
"""
From raw new data create some yield-able results.
- Intended for overwriting in sub-classes.
+ Intended for overwriting in subclasses.
This function is called from __iter__ for each new data that becomes
available. It has to return some iterable whose entries are yielded
from iteration over objects of this class.
- The result must be an iterable of objects, which are yielded as-is, so
- can have any form.
-
This base implementation just returns its input in a list, so new data
- is yielded from __iter__ as-is.
-
- Subclass implementations can also yield tuples.
+ is yielded from __iter__ as-is. Subclass implementations can also yield
+ tuples.
:param str description: Description of source of lines, one of
:py:data:`self.descriptions`
- :param str new_data: Text data read from source
+ :param str data: Text data read from source
:param idx: Index of data source
:returns: [(description, data, idx], same as input
:rtype [(str, str, int)]
super(LogParser, self).prepare_result(*args):
result = re.match(self.pattern, raw_line)
if result:
- return (description, result, idx)
+ return description, result, idx
else:
- return (description, raw_line, idx)
+ return description, raw_line, idx
"""
-import os
from base64 import b64decode
-import re
-import logging
from email.utils import parsedate_to_datetime
from email.parser import BytesParser
from email import policy
-from . import arnied_wrapper
-
# outsourced source, import required for compatiblity
from .imap_mailbox import ImapMailbox # pylint: disable=unused-import
from .mail_validator import * # pylint: disable=unused-import
* inbox --> (the empty string)
Would like to use a general modified utf-7-encoder/decoder but python has
- non built-in (see https://bugs.python.org/issue5305) and an extra lib like
+ none builtin (see https://bugs.python.org/issue5305) and an extra lib like
https://bitbucket.org/mjs0/imapclient/ would be overkill. After all, we
control the input to this function via params and this is enough umlaut-
testing I think...
Only for ascii filenames: also unwraps file names if they are line-wrapped.
But note that this may remove too much whitespace from the filename if
- line-wrapping happend in the same position as the filename's whitespace.
+ line-wrapping happened in the same position as the filename's whitespace.
To get unwrapped version, set param `do_unwrap` to `False`.
See also: https://en.wikipedia.org/wiki/MIME#Encoded-Word
pass
-class MailValidator():
+class MailValidator:
"""Class for validation of emails."""
def target_path(self, new_value=None):
:param bool in_target: whether the verified email is on the target side
If `in_target` is set to True we are getting the target id from the
- target list of a source email. Otherwise we assume a target email from
+ target list of a source email. Otherwise, we assume a target email from
a source list.
"""
if in_target:
X-Autotest-Signature for simpler recognition
of mail (if None do not add header)
:type autotest_signature: str or None
+ :param str subject: Subject of created mails
"""
text = 'This is an autogenerated email.\n'
if match_id is not None:
message_id = match_id.group(1).rstrip('\r\n')
if message_id == "":
- raise MissingEmailID("No id was found in target message %s so it "
- "cannot be properly matched"
- % (message_path))
+ raise MissingEmailID(f"No id was found in target message {message_path}, "
+ f"so it cannot be properly matched")
return message_id
def _default_compare_emails(self, source_email_path, target_email_path,
import time
import logging
-log = logging.getLogger('pyi2ncommon.mk_config')
# custom imports
from . import arnied_wrapper as aw
from .cnfline import build_cnfvar, build_group, build_intraclient
from .cnfline import build_nic, build_provider, build_user
+log = logging.getLogger('pyi2ncommon.mk_config')
###############################################################################
# MINOR CONFIGURATION
###############################################################################
+
def simple(varname, data, filename):
"""
Generate and save a single-variable configuration file.
:rtype: str
"""
log.info("Create arnied user configuration")
- user = batch_update_cnf(
+ user_obj = batch_update_cnf(
build_user.BuildUser(data=username, instance=instance, line_no=1),
[(Update, ("USER_FULLNAME", 0, username)),
(Update, ("USER_GROUP_MEMBER_REF", 0, "1")),
user_cnf = "user-%d-%s.cnf" % (time.time(), suffix)
[user_cnf] = aw.prep_config_paths([user_cnf], aw.DUMP_CONFIG_DIR)
logging.info("Saving user configuration to %s", user_cnf)
- user.save(user_cnf)
+ user_obj.save(user_cnf)
return user_cnf
(Update, ("GROUP_EMAIL_RELAY_RIGHTS", 0, "RELAY_FROM_INTRANET")),
(Update, ("GROUP_ACTIVESYNC_ENABLE", 0, "1" if activesync_enable else "0")),
(Update, ("GROUP_XAUTH_ENABLE", 0, "1" if xauth_enable else "0")),
- (Delete, ("GROUP_COMMENT"))])
+ (Delete, ("GROUP_COMMENT",))])
group_cnf = "group-%d-%s.cnf" % (time.time(), suffix)
[group_cnf] = aw.prep_config_paths([group_cnf], aw.DUMP_CONFIG_DIR)
logging.info("Saving group configuration to %s", group_cnf)
def group_all(proxy_profile="1", suffix="host"):
"""
- Generate and save an All group configuration file.
+ Generate and save an "All" group configuration file.
:param str proxy_profile: proxy profile instance reference
:param str suffix: optional suffix to use for config identification
(Update, ("GROUP_EMAILFILTER_BAN_FILTERLIST_REF", 0, "-1")),
(Update, ("GROUP_EMAIL_RELAY_RIGHTS", 0, "RELAY_FROM_EVERYWHERE")),
(Update, ("GROUP_PROXY_PROFILE_REF", 0, proxy_profile)),
- (Delete, ("GROUP_COMMENT"))])
+ (Delete, ("GROUP_COMMENT",))])
group_cnf = "group-%d-%s.cnf" % (time.time(), suffix)
[group_cnf] = aw.prep_config_paths([group_cnf], aw.DUMP_CONFIG_DIR)
:rtype: str
"""
log.info("Create arnied nic configuration")
- nic = batch_update_cnf(
+ nic_obj = batch_update_cnf(
build_nic.BuildNIC(data="", instance=instance, line_no=1),
[(Update, ("NIC_TYPE", 0, nictype)),
(Update, ("NIC_LAN_IP", 0, ip)),
nic_cnf = "nic-%d-%s.cnf" % (time.time(), suffix)
[nic_cnf] = aw.prep_config_paths([nic_cnf], aw.DUMP_CONFIG_DIR)
logging.info("Saving nic configuration to %s", nic_cnf)
- nic.save(nic_cnf)
+ nic_obj.save(nic_cnf)
return nic_cnf
:rtype: str
"""
log.info("Create arnied intraclient configuration")
- intraclient = batch_update_cnf(build_intraclient.BuildIntraclient(data=name,
- instance=instance),
- [(Update, ("INTRACLIENT_IP", 0, ip)),
- (Update, ("INTRACLIENT_MAC", 0, mac)),
- (Update, ("INTRACLIENT_FIREWALL_RULESET_REF", 0, fwrules))])
+ intraclient_obj = batch_update_cnf(
+ build_intraclient.BuildIntraclient(data=name, instance=instance),
+ [(Update, ("INTRACLIENT_IP", 0, ip)),
+ (Update, ("INTRACLIENT_MAC", 0, mac)),
+ (Update, ("INTRACLIENT_FIREWALL_RULESET_REF", 0, fwrules))])
intraclient_cnf = "intraclient-%d-%s.cnf" % (time.time(), suffix)
[intraclient_cnf] = aw.prep_config_paths([intraclient_cnf], aw.DUMP_CONFIG_DIR)
logging.info("Saving intraclient configuration to %s", intraclient_cnf)
- intraclient.save(intraclient_cnf)
+ intraclient_obj.save(intraclient_cnf)
return intraclient_cnf
:rtype: str
"""
log.info("Create arnied provider configuration")
+
def add_or_del(var, field):
if var is not None:
- return (Add, (field, 0, str(var)))
- return (Delete, field)
- provider = batch_update_cnf(build_provider.BuildProvider(data=name, instance=instance),
- [(Update, ("PROVIDER_MODE", 0, mode)),
- ip and (Update, ("PROVIDER_IP", 0, ip))
- or (Delete, "PROVIDER_IP"),
- localip
- and (Update, ("PROVIDER_LOCALIP", 0, localip))
- or (Delete, "PROVIDER_LOCALIP"),
- netmask and (Update, ("PROVIDER_NETMASK", 0,
- netmask))
- or (Delete, "PROVIDER_NETMASK"),
- (Update, ("PROVIDER_TIMEOUT", 0, timeout)),
- (Update, ("PROVIDER_DNS_MODE", 0, dnsmode)),
- (Update, ("PROVIDER_DNS", 0,
- dns if dnsmode == "IP" else "")),
- (Update, ("PROVIDER_MTU_MODE", 0, mtumode)),
- (Update, ("PROVIDER_MTU_SIZE", 0,
- mtusize if mtumode != "AUTO" else "")),
- (Update, ("PROVIDER_FIREWALL_RULESET_REF", 0, str(fwrules))),
- add_or_del(vlanid, "PROVIDER_VLAN_ID"),
- add_or_del(dialretry, "PROVIDER_DIAL_RETRY"),
- add_or_del(login, "PROVIDER_LOGIN"),
- add_or_del(password, "PROVIDER_PASSWORD"),
- add_or_del(modemip, "PROVIDER_MODEM_IP"),
- add_or_del(providerid, "PROVIDER_PROVIDERID"),
- add_or_del(localdhcp, "PROVIDER_LOCAL_DHCP")])
+ return Add, (field, 0, str(var))
+ return Delete, field
+ provider_obj = batch_update_cnf(
+ build_provider.BuildProvider(data=name, instance=instance),
+ [(Update, ("PROVIDER_MODE", 0, mode)),
+ ip and (Update, ("PROVIDER_IP", 0, ip))
+ or (Delete, "PROVIDER_IP"),
+ localip
+ and (Update, ("PROVIDER_LOCALIP", 0, localip))
+ or (Delete, "PROVIDER_LOCALIP"),
+ netmask and (Update, ("PROVIDER_NETMASK", 0,
+ netmask))
+ or (Delete, "PROVIDER_NETMASK"),
+ (Update, ("PROVIDER_TIMEOUT", 0, timeout)),
+ (Update, ("PROVIDER_DNS_MODE", 0, dnsmode)),
+ (Update, ("PROVIDER_DNS", 0,
+ dns if dnsmode == "IP" else "")),
+ (Update, ("PROVIDER_MTU_MODE", 0, mtumode)),
+ (Update, ("PROVIDER_MTU_SIZE", 0,
+ mtusize if mtumode != "AUTO" else "")),
+ (Update, ("PROVIDER_FIREWALL_RULESET_REF", 0, str(fwrules))),
+ add_or_del(vlanid, "PROVIDER_VLAN_ID"),
+ add_or_del(dialretry, "PROVIDER_DIAL_RETRY"),
+ add_or_del(login, "PROVIDER_LOGIN"),
+ add_or_del(password, "PROVIDER_PASSWORD"),
+ add_or_del(modemip, "PROVIDER_MODEM_IP"),
+ add_or_del(providerid, "PROVIDER_PROVIDERID"),
+ add_or_del(localdhcp, "PROVIDER_LOCAL_DHCP")])
provider_cnf = "provider-%d-%s.cnf" % (time.time(), suffix)
[provider_cnf] = aw.prep_config_paths([provider_cnf], aw.DUMP_CONFIG_DIR)
logging.info("Saving provider configuration to %s", provider_cnf)
- provider.save(provider_cnf)
+ provider_obj.save(provider_cnf)
return provider_cnf
:rtype: str
"""
log.info("Create arnied provider configuration.")
- provider = batch_update_cnf(build_provider.BuildProvider(),
- [(Update, ("PROVIDER_MODE", 0, mode)),
- (Update, ("PROVIDER_DNS", 0, ip)),
- (Update, ("PROVIDER_DYNDNS_ENABLE", 0, "0")),
- (Update, ("PROVIDER_IP", 0, ip)),
- (Update, ("PROVIDER_PROXY_SERVER", 0, ip)),
- (Update, ("PROVIDER_PROXY_PORT", 0, str(proxy_port))),
- localip
- and (Update, ("PROVIDER_LOCALIP", 0, localip))
- or (Delete, "PROVIDER_LOCALIP"),
- (Update, ("PROVIDER_DNS_MODE", 0, "IP")),
- (Update, ("PROVIDER_FIREWALL_RULESET_REF", 0, str(fwrules)))])
+ provider_obj = batch_update_cnf(
+ build_provider.BuildProvider(),
+ [(Update, ("PROVIDER_MODE", 0, mode)),
+ (Update, ("PROVIDER_DNS", 0, ip)),
+ (Update, ("PROVIDER_DYNDNS_ENABLE", 0, "0")),
+ (Update, ("PROVIDER_IP", 0, ip)),
+ (Update, ("PROVIDER_PROXY_SERVER", 0, ip)),
+ (Update, ("PROVIDER_PROXY_PORT", 0, str(proxy_port))),
+ localip
+ and (Update, ("PROVIDER_LOCALIP", 0, localip))
+ or (Delete, "PROVIDER_LOCALIP"),
+ (Update, ("PROVIDER_DNS_MODE", 0, "IP")),
+ (Update, ("PROVIDER_FIREWALL_RULESET_REF", 0, str(fwrules)))])
provider_cnf = "provider-%d-%s.cnf" % (time.time(), suffix)
[provider_cnf] = aw.prep_config_paths([provider_cnf], aw.DUMP_CONFIG_DIR)
logging.info("Saving provider configuration to %s", provider_cnf)
- provider.save(provider_cnf)
+ provider_obj.save(provider_cnf)
return provider_cnf
run_cmd_with_pipe
Wrapper for the default use case of the cumbersome "subprocess" library.
- Acceps a list of arguments that describe the command invocation. Returns
+ Accepts a list of arguments that describe the command invocation. Returns
``True`` and the contents of ``stdout`` if the pipe returned sucessfully,
``False`` plus ``stderr`` and the exit status otherwise. For example::
import re
import sysmisc
def parse(line):
- if re.match('\d', line):
+ if re.match(r'\d', line):
print('found digits in line!')
sysmisc.read_linewise('dump_db', parse)
hash_file
- Return a hash of a flie.
+ Return a hash of a file.
cheat_reboot
Replace the reboot binary with a fake one.
"""
-from __future__ import print_function
-
import re
import subprocess
import hashlib
llog = logging.getLogger('pyi2ncommon.sysmisc')
-__all__ = ("inf", "run_cmd_with_pipe", "get_mountpoints_by_type", "read_linewise", "hash_file", "cheat_reboot", "RUN_RESULT_OK", "RUN_RESULT_TIMEDOUT", "RUN_RESULT_FAIL", "RUN_RESULT_NAME", "cmd_block_till")
+__all__ = ("inf", "run_cmd_with_pipe", "get_mountpoints_by_type",
+ "read_linewise", "hash_file", "cheat_reboot",
+ "RUN_RESULT_OK", "RUN_RESULT_TIMEDOUT", "RUN_RESULT_FAIL",
+ "RUN_RESULT_NAME", "cmd_block_till")
###############################################################################
http://code.activestate.com/recipes/576620-changedirectory-context-manager/
(MIT license)
- :arg str path: paht to temporarily switch to
+ :arg str path: path to temporarily switch to
"""
orig_wd = os.getcwd()
os.chdir(path)
try:
with open(procmounts, "r") as m:
lines = list(m)
- pat = re.compile("^[^\s]+\s+([^\s]+)\s+" + fstype + "\s+.*$")
+ pat = re.compile(r"^\S+\s+(\S+)\s+" + fstype + r"\s+.*$")
mps = [mp.group(1)
for mp in map(lambda l: re.match(pat, l), lines)
if mp]
- except IOError as e:
- raise IOError("Failed to read %s." % procmounts)
+ except IOError:
+ raise IOError(f"Failed to read {procmounts}")
if not mps:
return None
return mps
# if proc.poll() is not None:
# break
- #rest_output,_ = proc.communicate()
- #for line in rest_output:
- # func(line)
+ # rest_output,_ = proc.communicate()
+ # for line in rest_output:
+ # func(line)
return proc.wait()
This replaces the ``reboot-intranator`` executable by script which
replaces itself by the backed up executable upon the next invocation.
"""
- #path = utils.system_output("which reboot")
+ # path = utils.system_output("which reboot")
path = "/usr/intranator/bin/reboot-intranator"
suffix = uuid.uuid1()
backup = backup_fmt % (path, backup_infix, suffix)
:param cmd: Command line or callback to execute. Function arguments must
have the same signature as :py:func:`run_cmd_with_pipe`.
- :type cmd: [str] | types.FunctionType
+ :type cmd: [str] | types.FunctionType
:param int timeout: Blocking timeout
-
- :returns: Pair of result and error message if appropriate or
- :py:value:`None`.
- :rtype: (run_result, str | None)
+ :param cond: Function to call; code will wait for this to return something
+ other than `False`
+ :param interval: Time (in seconds) to sleep between each attempt at `cond`
+ :returns: Pair of result and error message if appropriate or :py:value:`None`.
+ :rtype: (run_result, str | None)
"""
llog.debug("cmd_block_till: %r, %d s, %r", cmd, timeout, cond)
if isinstance(cmd, types.FunctionType):
else:
label = ""
name = CURRENT_TEST_NAME if isinstance(CURRENT_TEST_NAME, str) else ""
- fmt, label = str(fmt), str(label) # yes
+ fmt, label = str(fmt), str(label) # yes
llog.info("[%s%s] %s" % (name, label, fmt), *args)
# TODO: this method is more dynamic
# llog.info("[%s%s] %s%s" % (LOG_TAG, "", LOG_INDENT*indent, fmt), *args)
if i % 100 == 0:
log.info("%i done\n", i)
- if not re.match("^[0-9]+\.$", files[i]):
+ if not re.match(r"^\d+\.$", files[i]):
continue
if os.path.getsize(file_path) == 0:
log.warning("Skipping empty file %s", file_path)
CONTENTS
------------------------------------------------------
This module has two parts. Part 1 includes:
-
-- head_and_tail: shows the first few and last few elements of an iterable that
- could potentially be pretty long
-- size_str: textual representation of data size
+ - head_and_tail: shows the first few and last few elements of an iterable that
+ could potentially be pretty long
+ - size_str: textual representation of data size
Part2 contains functions for coloring text, a poor-man's version of other
modules like :py:mod:`colorclass` (which is now also available on Intra2net
------------------------------------------------------
"""
-try:
- from builtins import print as _builtin_print
-except ImportError: # different name in py2
- from __builtin__ import print as _builtin_print
+from builtins import print as _builtin_print
from functools import partial
from itertools import islice
Rounds and shortens size to something easily human-readable like '1.5 GB'.
- :param int byte_number: Number of bytes to express as text
+ :param float byte_number: Number of bytes to express as text
:param bool is_diff: Set to True to include a '+' or '-' in output;
default: False
:returns: textual representation of data
# have an impossible amount of data. (>1024**4 GB)
# undo last "/factor" and show thousand-separator
- return '{2}{0:,d} {1}B'.format(int(round(curr_num*factor)), units[-1],
- sign_str)
+ return f'{sign_str}{int(round(curr_num*factor)):,d} {units[-1]}B'
###############################################################################
STYLE_REVERSE = 7
-_COLOR_TO_CODE = dict(zip([COLOR_BLACK, COLOR_RED, COLOR_GREEN, COLOR_YELLOW, \
+_COLOR_TO_CODE = dict(zip([COLOR_BLACK, COLOR_RED, COLOR_GREEN, COLOR_YELLOW,
COLOR_BLUE, COLOR_MAGENTA, COLOR_CYAN, COLOR_WHITE],
range(8)))
def colored(text, foreground=None, background=None, style=None):
- """ return text with given foreground/background ANSI color escape seqence
+ """ return text with given foreground/background ANSI color escape sequence
:param str text: text to color
:param str style: one of the style constants above
:param str foreground: one of the color constants to use for text color
or None to leave as-is
- :param str foreground: one of the color constants to use for background
+ :param str background: one of the color constants to use for background
or None to leave as-is
:param style: single STYLE constant or iterable of those
or None to leave as-is
# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
"""
-Helpers for type checking and conversion, like isstr(x), is_file_obj(x)
+Helpers for type checking and conversion, like `isstr(x)`, `is_file_obj(x)`
+
+Some of these things used to be more complicated when python2 was still
+around, so some functions are not that useful anymore.
"""
-import sys
from io import IOBase
def isstr(var):
- """ determines if the given var is a (regular/unicode/raw) string or not
-
- in python2, u'a' is not a subclass of str, so to get a True as result, you
- have to test for basestring as superclass.
-
- In python3 that is no longer the case
-
- @returns True if the input is a regular string, a unicode or a raw string,
- False for everything else (including byte literals like b'abc')
-
- For more complex py2/py3 compatibility issues, consider using six
- (https://pythonhosted.org/six)
- """
-
+ """Alias for `isinstance(var, str)`."""
return isinstance(var, str)
def is_str_or_byte(var):
- """ returns true for str, unicode and byte objects """
+ """Returns `True` for `str` and `bytes` objects."""
return isinstance(var, (str, bytes))
def is_unicode(var):
- """ returns true for unicode strings
-
- py2: return True for type unicode but not type str
- py3: return True for type str but not type bytes
- """
+ """Alias for `isinstance(var, str)`."""
return isinstance(var, str)
def is_file_obj(var):
- """ determines whether given input is the result of 'open(file_name)'
+ """
+ Determine whether given input is the result of 'open(file_name)'.
- just checks whether given var is subclass of io.IOBase, which is also True
- for 'file-like objects' like StringIO
+ Just checks whether given var is subclass of io.IOBase, which is also True
+ for 'file-like objects' like StringIO.
"""
return isinstance(var, IOBase)
def fix_range(self):
if self.val_range <= 0:
- raise TypeError("IP address ranges need to be natural numbers > 0."
- % (self.val_lo, self.val_range))
+ raise TypeError("IP address ranges need to be natural numbers > 0.")
hi = self.val_lo + self.val_range
if hi <= self.val_lo or hi > uint32_max:
raise ValueError("Invalid IP address range: %d+%d."
import urllib.parse as parse
import socket
import logging
+from .arnied_wrapper import accept_licence
+
log = logging.getLogger('pyi2ncommon.web_interface')
-from .arnied_wrapper import accept_licence
#: FQDN of local machine
LOCALHOST = socket.gethostname()
def find_in_form(regex, form="status", escape=False, check_certs=True):
"""
- Find a regex in I2N web page's status frame.
+ Find a regex in given I2N web page form.
:param str regex: regular expression to find
+ :param str form: form name to open
:param bool escape: whether to escape the regex
:param check_certs: forwarded to :py:func:`web_page_request`, see doc there
:returns: whether the regex was found
.. codeauthor:: Intra2net AG <info@intra2net>
"""
-import sys
-import os
-
-if sys.version_info.major < 3:
- raise ImportError('Did not backport zipfile from python 3.5 to py2')
-if sys.version_info.minor >= 6:
- # imports for _write_stream_36
- import shutil
-else:
- # imports for create_zipinfo, _write_stream_35 and _get_compressor
- from stat import S_ISDIR
- import time
- import zlib
- crc32 = zlib.crc32
- import bz2
- import struct
-if sys.version_info.minor >= 5:
- from zipfile import *
-else:
- # backport of zipfile from python 3.5 to support stream output
- from zipfile35 import *
+import shutil
+from zipfile import *
from .type_helpers import isstr
-# copied from zipfile.py
-ZIP64_LIMIT = (1 << 31) - 1
-
-
-def _get_compressor(compress_type):
- """Copied fomr zipfile.py in py3.5 (cannot legally import)"""
- if compress_type == ZIP_DEFLATED:
- return zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
- zlib.DEFLATED, -15)
- elif compress_type == ZIP_BZIP2:
- return bz2.BZ2Compressor()
- elif compress_type == ZIP_LZMA:
- return LZMACompressor()
- else:
- return None
-
-
class BytesTellWrapper:
"""
Wrapper around a write-only stream that supports tell but not seek
Create ZipInfo for given file
Optionally set arcname as name of file inside archive.
-
- Adapted from zipfile.py in (ZipInfo.from_file in py3.6, ZipFile.write
- in py3.5)
"""
- if sys.version_info.major >= 3 and sys.version_info.minor >= 6:
- return ZipInfo.from_file(filename, arcname)
-
- st = os.stat(filename)
- isdir = S_ISDIR(st.st_mode)
- mtime = time.localtime(st.st_mtime)
- date_time = mtime[0:6]
- # Create ZipInfo instance to store file information
- if arcname is None:
- arcname = filename
- arcname = os.path.normpath(os.path.splitdrive(arcname)[1])
- while arcname[0] in (os.sep, os.altsep):
- arcname = arcname[1:]
- if isdir:
- arcname += '/'
- zinfo = ZipInfo(arcname, date_time)
- zinfo.external_attr = (st.st_mode & 0xFFFF) << 16 # Unix attributes
- if isdir:
- zinfo.compress_type = ZIP_STORED
- zinfo.file_size = 0
- zinfo.external_attr |= 0x10 # MS-DOS directory flag
- else:
- zinfo.compress_type = self.compression
- zinfo.file_size = st.st_size
-
- return zinfo
+ return ZipInfo.from_file(filename, arcname)
def write_stream(self, src, zinfo):
"""
This is a shortened version of python's
:py:func:`zipfile.ZipFile.write`.
"""
- if sys.version_info.major >= 3 and sys.version_info.minor >= 6:
- return self._write_stream_36(src, zinfo)
- else:
- return self._write_stream_35(src, zinfo)
-
- def _write_stream_35(self, src, zinfo):
- """Implementation of _write_stream based on ZipFile.write (py 3.5)"""
- if not self.fp:
- raise RuntimeError(
- "Attempt to write to ZIP archive that was already closed")
-
- zinfo.flag_bits = 0x00
-
- with self._lock:
- zinfo.header_offset = self.fp.tell() # Start of header bytes
- if zinfo.compress_type == ZIP_LZMA:
- # Compressed data includes an end-of-stream (EOS) marker
- zinfo.flag_bits |= 0x02
-
- self._writecheck(zinfo)
- self._didModify = True
-
- cmpr = _get_compressor(zinfo.compress_type)
- zinfo.flag_bits |= 0x08
-
- # Must overwrite CRC and sizes with correct data later
- zinfo.CRC = CRC = 0
- zinfo.compress_size = compress_size = 0
- # Compressed size can be larger than uncompressed size
- zip64 = self._allowZip64 and \
- zinfo.file_size * 1.05 > ZIP64_LIMIT
- self.fp.write(zinfo.FileHeader(zip64))
- file_size = 0
- while 1:
- buf = src.read(1024 * 8)
- if not buf:
- break
- file_size = file_size + len(buf)
- CRC = crc32(buf, CRC) & 0xffffffff
- if cmpr:
- buf = cmpr.compress(buf)
- compress_size = compress_size + len(buf)
- self.fp.write(buf)
- if cmpr:
- buf = cmpr.flush()
- compress_size = compress_size + len(buf)
- self.fp.write(buf)
- zinfo.compress_size = compress_size
- else:
- zinfo.compress_size = file_size
- zinfo.CRC = CRC
- zinfo.file_size = file_size
-
- # Write CRC and file sizes after the file data
- fmt = '<LQQ' if zip64 else '<LLL'
- self.fp.write(struct.pack(fmt, zinfo.CRC, zinfo.compress_size,
- zinfo.file_size))
- self.start_dir = self.fp.tell()
- self.filelist.append(zinfo)
- self.NameToInfo[zinfo.filename] = zinfo
-
- def _write_stream_36(self, src, zinfo):
- """Implementation of _write_stream based on ZipFile.write (py 3.6)"""
if not self.fp:
raise ValueError(
"Attempt to write to ZIP archive that was already closed")
""" test_argparse_helpers.py: unit tests for argparse_helpers
-Tests classes and functions in argparse_helpers
+Test classes and functions in argparse_helpers
For help see :py:mod:`unittest`
.. codeauthor:: Intra2net
"""
-from __future__ import print_function
-from __future__ import absolute_import
-
import unittest
import os
from os.path import isfile
from tempfile import mkstemp
-import sys
# relative import of tested module ensures we do not test installed version
try:
# test with existing file
parser = argparse_helpers.NonExitingParser()
parser.add_argument('--input', type=argparse_helpers.existing_file)
- parser.parse_args(['--input', temp_file, ])
-
- # remove file
- os.remove(temp_file)
-
- # test with non-existing file
- if sys.version_info.major < 3:
- self.assertRaises(argparse_helpers.ArgParserWantsExit,
- parser.parse_args, ['--input', temp_file, ])
- else:
- self.assertRaisesRegex(argparse_helpers.ArgParserWantsExit,
- 'is not an existing file',
- parser.parse_args,
- ['--input', temp_file, ])
- temp_file = None
finally:
if temp_handle:
os.close(temp_handle)
if temp_file and isfile(temp_file):
os.remove(temp_file)
+ # test with non-existing file
+ self.assertRaisesRegex(argparse_helpers.ArgParserWantsExit,
+ 'is not an existing file',
+ parser.parse_args,
+ ['--input', temp_file, ])
+
if __name__ == '__main__':
unittest.main()
"""Unit tests for :py:mod:`pyi2ncommon.call_helpers`."""
-from __future__ import absolute_import
-
import unittest
import os
.. codeauthor:: Intra2net
"""
-from __future__ import print_function
-from __future__ import absolute_import
-
import unittest
# relative import of tested module ensures we do not test installed version
.. codeauthor:: Intra2net AG <info@intra2net.com>
"""
-import sys
import unittest
import logging
from tempfile import mkstemp
from src.log_read import IterativeReader, LineReader, LogReadWarning
# get best clock
-from sys import version_info
-if version_info.major == 2:
- raise NotImplementedError('pyi2ncommon is no longer compatible with py2')
-elif version_info.minor < 4:
- perf_counter = time.clock
-else:
- perf_counter = time.perf_counter
+perf_counter = time.perf_counter
DEBUG = False
""" type_helper_unittest.py: unit tests for type_helpers
-Tests classes and functions in type_helpers
+Test classes and functions in type_helpers
Should be run from python2 and python3!
"""
import unittest
-
-from src.type_helpers import is_unicode, isstr
from sys import version_info
+from src.type_helpers import is_unicode, isstr
-is_py2 = version_info.major == 2
class TypeHelperTester(unittest.TestCase):
- def test_is_py2_or_py3(self):
- """ test that python version is 2 or 3
-
- when implementing type_helpers, there was no py4 and no idea what it
- might be like. Also will probably fail for python v1
- """
- self.assertIn(version_info.major, (2, 3))
-
def test_is_unicode(self):
""" tests function is_unicode """
self.assertFalse(is_unicode(unittest.TestCase))
self.assertFalse(is_unicode(type(unittest.TestCase)))
- if is_py2:
- self.assertTrue(is_unicode(u'bla'))
- self.assertTrue(eval("is_unicode(ur'bla')")) # not allowed in py3!
- self.assertFalse(is_unicode('bla'))
- self.assertFalse(is_unicode(r'bla'))
- else:
- self.assertTrue(is_unicode('bla'))
- self.assertTrue(is_unicode(r'bla'))
- self.assertFalse(is_unicode(b'bla'))
- self.assertFalse(is_unicode(br'bla'))
+ self.assertTrue(is_unicode('bla'))
+ self.assertTrue(is_unicode(r'bla'))
+ self.assertFalse(is_unicode(b'bla'))
+ self.assertFalse(is_unicode(br'bla'))
def test_isstr(self):
""" test function isstr """
tests = [
('abc', True), (u'abc', True), (r'abc', True),
- (1, False), (['a', 'b', 'c'], False), (('a', 'b', 'c'), False)]
- if not is_py2:
- tests.append((b'abc', False)) # b'' does not exist in py2
+ (1, False), (['a', 'b', 'c'], False), (('a', 'b', 'c'), False),
+ (b'abc', False)]
for test_var, expected_result in tests:
self.assertEqual(isstr(test_var), expected_result,
#
# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-from __future__ import absolute_import
-
import unittest
from src import v4_addr_range
""" test_zip_stream.py: unit tests for zip_stream
-Tests classes and functions in :py:mod:`pyi2ncommon.zip_stream`.
-
-Only runs in python3 since zip_stream is not py2-compatible (see comment in
-module doc there)
-
-For help see :py:mod:`unittest`
+Test classes and functions in :py:mod:`pyi2ncommon.zip_stream`.
.. codeauthor:: Intra2net
"""
-from __future__ import print_function
-from __future__ import absolute_import
-
import unittest
from tempfile import mkdtemp
import os
def readable(self):
return False
- def seek(sel, *args):
+ def seek(self, *args):
raise AttributeError('this function was removed')
def readline(self, *args):
def _test_zip(self, test_subdir=False):
"""Helper for test_* functions: check given zip for contents"""
- expect_contents = set((TEXT_FILE, BIN_FILE))
+ expect_contents = {TEXT_FILE, BIN_FILE}
if test_subdir:
expect_contents.add(SUBDIR + '/')