From: Christian Herdtweck Date: Mon, 4 Apr 2022 15:45:14 +0000 (+0200) Subject: Clean up, remove compat with py < 3.6 X-Git-Tag: v1.7.1~10 X-Git-Url: http://developer.intra2net.com/git/?p=pyi2ncommon;a=commitdiff_plain;h=7628bc484b23ebce4a92bdaeabf0da9f456661a5 Clean up, remove compat with py < 3.6 Remove __future__ statements and other remainders of python2-compatibility. Remove checks for python version <3.6, assume we are running on python 3.6 or above. Also run a major pylint on all the code and fix lots of things, including many typos, missing paramters, strange whitespace, "r" in front of regex string literals, ... However, ignore deprecated modules. --- diff --git a/src/arnied_wrapper.py b/src/arnied_wrapper.py index 5cb96f4..cf51f16 100644 --- a/src/arnied_wrapper.py +++ b/src/arnied_wrapper.py @@ -158,6 +158,7 @@ def go_online(provider_id, wait_online=True, timeout=60, vm=None): :type provider_id: int :param wait_online: whether to wait until online :type wait_online: bool + :param int timeout: Seconds to wait in :py:func:`wait_for_online` :param vm: vm to run on if running on a guest instead of the host :type vm: :py:class:`virttest.qemu_vm.VM` or None diff --git a/src/buffers.py b/src/buffers.py index d0588d2..aa79de3 100644 --- a/src/buffers.py +++ b/src/buffers.py @@ -22,12 +22,12 @@ buffers.py: buffers of various shapes, sizes and functionalities Featuring:: - -* CircularBuffer -* LogarithmicBuffer: saves only last N items, and after that less and less so -* very few old items are kept + - CircularBuffer + - LogarithmicBuffer: saves only last N items, and after that less and less so + very few old items are kept """ + class CircularBuffer: """ circular buffer for data; saves last N sets @@ -46,7 +46,7 @@ class CircularBuffer: if size < 1: raise ValueError('size must be positive!') self.buffer_size = size - self._buffer = [empty_element for idx in range(size)] + self._buffer = [empty_element for _ in range(size)] self._buff_idx = 0 self.n_items = 0 @@ -119,7 +119,7 @@ class LogarithmicBuffer: """ internal helper for saving (or not) items in data_old """ # determine whether we throw it away or actually save it - if age % 2**index != 0: + if age % 2 ** index != 0: return # we save it. But before check if we need to extend data_old or @@ -127,7 +127,7 @@ class LogarithmicBuffer: if len(self._data_old) <= index: self._data_old.append(item) else: - self._save_old(self._data_old[index], age, index+1) + self._save_old(self._data_old[index], age, index + 1) self._data_old[index] = item def get_all(self): diff --git a/src/call_helpers.py b/src/call_helpers.py index 4e945e1..9aa855c 100644 --- a/src/call_helpers.py +++ b/src/call_helpers.py @@ -18,11 +18,12 @@ # # Copyright (c) 2016-2018 Intra2net AG -""" Helpers for calling commands, capture their output, return result code - -Subprocess library just does not provide all the simplicity we would like +""" +Helpers for calling commands, capture their output, return result code. -Stay python2 compatible --> no timeouts +Subprocess library did not provide all the simplicity we would have liked. +However, this has since changed, so consider using the easy-to-use builtin +functions :py:func:`subprocess.run` or :py:func:`subprocess.call` instead. """ from subprocess import Popen, PIPE @@ -55,8 +56,8 @@ def call_and_capture(command, stdin_data=None, split_lines=True, :returns: (return_code, stdout, stderr); stdout and stderr are lists of text lines (without terminating newlines); if universal_newlines is True (default), the lines are of type str, otherwise they are non- - unicode text (type str in py2, bytes in py3). If split_lines is False - (not default), then stdout and stderr are single multi-line strings + unicode text (type bytes). If split_lines is False (not default), + then stdout and stderr are single multi-line strings :raise: OSError (e.g., if command does not exist), ValueError if args are invalid; no :py:class:`subprocess.CalledProcessError` nor diff --git a/src/cnfline/build_cnfvar.py b/src/cnfline/build_cnfvar.py index d57ad99..064bf41 100644 --- a/src/cnfline/build_cnfvar.py +++ b/src/cnfline/build_cnfvar.py @@ -21,8 +21,6 @@ """Basic functions for on-the-fly arnied cnf-var generator""" from .cnfline import CnfLine -import os -import tempfile class BuildCnfVar(object): diff --git a/src/cnfvar/store.py b/src/cnfvar/store.py index 0bfeb26..b0dcbcf 100644 --- a/src/cnfvar/store.py +++ b/src/cnfvar/store.py @@ -139,7 +139,7 @@ class CnfStore: :type cnfs: :py:class`CnfList` When the instance of a cnfvar is -1, the backend initializes it - automatically. However it starts on 1, whereas many variables are not + automatically. However, it starts on 1, whereas many variables are not allowed to start on one (those that are meant to be unique, f.e.). This method can be used in child classes to use an alternative scheme, however for performance reasons the base API class uses the default and diff --git a/src/connd_state.py b/src/connd_state.py index 6b2b3a4..df2bc67 100755 --- a/src/connd_state.py +++ b/src/connd_state.py @@ -34,14 +34,13 @@ INTERFACE """ -from __future__ import print_function import subprocess from re import match as regexp from os import EX_OK # constants DEFAULT_TELL_CONND_BINARY = '/usr/intranator/bin/tell-connd' -# TIMEOUT = 1 can only be used with python3 +TIMEOUT = 10 ONLINE_MODE_ALWAYS_ONLINE = 'always online' ONLINE_MODE_ALWAYS_OFFLINE = 'always offline' @@ -82,7 +81,7 @@ class ConndState(object): def __str__(self): return \ - '[ConndState: {0} (default {1}), {2} conn\'s, {3} ips, {4} vpns ]'\ + '[ConndState: {0} (default {1}), {2} conn\'s, {3} ips, {4} vpns ]' \ .format(self.online_mode, self.default_provider, len(self.connections), len(self.online_ips), len(self.connected_vpns)) @@ -188,20 +187,18 @@ class ConndState(object): cmd_parts.extend(*args) output = subprocess.check_output(cmd_parts, stderr=subprocess.STDOUT, - universal_newlines=True, shell=False) # py3:, timeout=TIMEOUT) + universal_newlines=True, shell=False, + timeout=TIMEOUT) return EX_OK, output.splitlines() except subprocess.CalledProcessError as cpe: # non-zero return status output = [ 'tell-connd exited with status {0}'.format(cpe.returncode), ] output.extend(cpe.output.splitlines()) return cpe.returncode, output - # not python-2-compatible: - # except subprocess.TimeoutExpired as texp: - # output = [ - # 'tell-connd timed out after {0}s. Returning -1'.format( - # texp.timeout), ] - # output.extend(te.output.splitlines()) - # return -1, output + except subprocess.TimeoutExpired as texp: + output = [f'tell-connd timed out after {texp.timeout}s. Returning -1', ] + output.extend(texp.output.splitlines()) + return -1, output except Exception as exp: output = [str(exp), ] return -1, output @@ -245,19 +242,19 @@ class ConndState(object): line = next(output).strip() assert line == 'subsys', 'expected subsys but got {0}'.format(line) line = next(output).strip() - state.subsys_online = regexp('online\s*:\s*(.*)$', line)\ + state.subsys_online = regexp('online\s*:\s*(.*)$', line) \ .groups()[0].split() for subsys in state.subsys_online: assert subsys in ALL_SUBSYS, \ 'unexpected subsys: {0}'.format(subsys) line = next(output).strip() - state.subsys_offline = regexp('offline\s*:\s*(.*)$', line)\ + state.subsys_offline = regexp('offline\s*:\s*(.*)$', line) \ .groups()[0].split() for subsys in state.subsys_offline: assert subsys in ALL_SUBSYS, \ 'unexpected subsys: {0}'.format(subsys) line = next(output).strip() - state.subsys_disabled = regexp('disabled\s*:\s*(.*)$', line)\ + state.subsys_disabled = regexp('disabled\s*:\s*(.*)$', line) \ .groups()[0].split() for subsys in state.subsys_disabled: assert subsys in ALL_SUBSYS, \ @@ -271,6 +268,8 @@ class ConndState(object): assert line == 'connection map:', \ 'expected connection map but got {0}'.format(line) expect_new = True + conn_name = None + conn_info = None for line in output: line = line.strip() if len(line) == 0: @@ -282,10 +281,14 @@ class ConndState(object): '\[\s*(.+)\s*\]\s*:\s*\(\s*(.*)\s*\)', line).groups() expect_new = False else: - conn_actions = regexp('actions\s*:\s*\[\s*(.+)\s*\]', line)\ + conn_actions = regexp('actions\s*:\s*\[\s*(.+)\s*\]', line) \ .groups() + assert conn_name is not None and conn_info is not None, \ + 'error parsing connection maps' state.connections.append((conn_name, conn_info, conn_actions)) expect_new = True + conn_name = None + conn_info = None assert expect_new line = next(output).strip() assert len(line) == 0, 'expected empty line, but got {0}'.format(line) @@ -300,7 +303,7 @@ class ConndState(object): # online IPs line = next(output).strip() - state.online_ips = regexp('list of online ips\s*:\s*(.*)', line)\ + state.online_ips = regexp('list of online ips\s*:\s*(.*)', line) \ .groups()[0].split() if len(state.online_ips) == 1 \ and state.online_ips[0].strip() == 'NONE': diff --git a/src/dial.py b/src/dial.py index 30e930e..a87117e 100644 --- a/src/dial.py +++ b/src/dial.py @@ -140,6 +140,8 @@ def arnied_dial_doc(prid="P1", block=False): :param prid: Provider id, default *P1*. It is up to the caller to ensure this is a valid provider id. :type prid: str + :param block: block execution until system is online + :type block: bool :returns: Whether the ``doc`` command succeeded. :rtype: int (dial result as above) """ @@ -162,6 +164,8 @@ def arnied_dial_permanent(prid="P1", block=False): :param prid: Provider id, default *P1*. It is up to the caller to ensure this is a valid provider id. :type prid: str + :param block: block execution until system is online + :type block: bool :returns: Whether the ``tell-connd`` command succeeded. :rtype: int (dial result as above) """ @@ -206,7 +210,7 @@ def dialout(mode=DIALOUT_MODE_DEFAULT, prid="P1", block=True): elif isinstance(mode, str) is True: try: dmode = DIALOUT_MODE_BY_NAME[mode] - except: + except Exception: log.error("invalid online mode name “%s” requested" % mode) pass diff --git a/src/imap_mailbox.py b/src/imap_mailbox.py index 90be0ab..1f1d887 100644 --- a/src/imap_mailbox.py +++ b/src/imap_mailbox.py @@ -76,16 +76,16 @@ def quote_imap_folder(folder): implementation is incomplete, but should suffice for current use. :param str folder: Name of a folder in an IMAP mailbox, possibly quoted - :returns: same folder name, possibly with added quotes + :returns: same folder name as bytes, possibly with added quotes :rtype: bytes """ if not folder: - return folder + return b'' if folder[0] == '"' and folder[-1] == '"': - return folder # is already quoted + return folder.encode('ascii', 'strict') # is already quoted if any(special in folder for special in ' (){}"\\[]'): # see RFC3501 $5.1 and $9 - return '"' + folder + '"' # quote the folder + return f'"{folder}"'.encode('ascii', 'strict') # quote the folder return folder.encode('ascii', 'strict') @@ -112,7 +112,7 @@ class ImapMailbox(imaplib.IMAP4): - ... Before adding too much capability here, consider using libraries that do all that already like ImapClient (which unfortunately does not deal with - unsolicited reponses)... + unsolicited responses)... Cannot deal with unicode folder names. """ @@ -125,7 +125,7 @@ class ImapMailbox(imaplib.IMAP4): User and password can be given either here or to method :py:meth:`login`. - :param str user: User name for imap mailbox, optional + :param str user: Username for imap mailbox, optional :param str password: Password for logging into mailbox :param str host: Host name, defaults to localhost :param debug_log: Either bool to enable/disable debug logging or a file @@ -140,7 +140,7 @@ class ImapMailbox(imaplib.IMAP4): self._debug_log_handle = None if debug_log: self.debug = 4 # enable debugging in super class - # overwrite prviate debug log function of base class with own. + # overwrite private debug log function of base class with own. # this is slightly hacky since depends on private implementation if isinstance(debug_log, str): self._debug_log_handle = open(debug_log, 'at') @@ -193,7 +193,7 @@ class ImapMailbox(imaplib.IMAP4): """ Log in to mailbox. - :param str user: User name to use for login. Overrides the one given in + :param str user: Username to use for login. Overrides the one given in constructor. Must be given if not given to constructor :param str password: Password for login. Same restrictions as user :returns: first return item from imap login. diff --git a/src/log_helpers.py b/src/log_helpers.py index b76a011..87ab5f2 100644 --- a/src/log_helpers.py +++ b/src/log_helpers.py @@ -43,7 +43,7 @@ Further ideas: :: import logging from logging import DEBUG, INFO, WARNING, ERROR, CRITICAL, NOTSET -from logging.handlers import RotatingFileHandler +from logging.handlers import RotatingFileHandler, SysLogHandler from math import log10, floor import sys @@ -104,7 +104,7 @@ class ShortLevelFormatter(logging.Formatter): formatter.add_level(notice, 'note') logger.log(notice, 'more important than info but no warning nor error') - .. seealso:: testing funcion :py:func:`test_short_level_format` + .. seealso:: testing function :py:func:`test_short_level_format` """ def __init__(self, fmt=DEFAULT_SHORT_LEVEL_FORMAT, @@ -175,7 +175,7 @@ class I2nLogger: * Simplifies setting of logger's logging level using :py:meth:`Logger.set_level`. The level is kept in sync between logger and handlers and can be queried using :py:meth:`get_level` - (does not work accross logger hierarchies!) + (does not work across logger hierarchies!) * can specify name and level and [date]fmt all in constructor * can limit the number of lines this logger will produce to prevent filling hard drive with log file @@ -397,11 +397,12 @@ class I2nLogger: :py:class:`logging.handlers.RotatingFileHandler` to avoid huge log files filling up the disc in case of error. + :param str filename: path to file that logger should write to :param str mode: Mode with which to open log file; forwarded to handler :param int max_bytes: Max size in bytes of a log file before it is renamed to `[filename].1` and logging continues with an empty `[filename]`. - :param int max_rots: Max number of rotated files to keep + :param int max_rotations: Max number of rotated files to keep :param dict kwargs: All other args (e.g. `encoding`) are forwarded to handler constructor. `maxBytes` and `backupCount` are overwritten with `max_bytes` and `max_rots`. @@ -418,7 +419,7 @@ def n_digits(number): Returns the number of digits a number has in decimal format :returns: 1 for 1...9, 2 for 10...99, 3 for 100...999, ... - 0 for 0 (and everything else beween -1 and 1) + 0 for 0 (and everything else between -1 and 1) 1 for -1...-9, 2 for -10...-99, ... """ if abs(number) < 1: diff --git a/src/log_read.py b/src/log_read.py index ce3e71b..1d8bef4 100644 --- a/src/log_read.py +++ b/src/log_read.py @@ -286,23 +286,19 @@ class IterativeReader(object): """ From raw new data create some yield-able results. - Intended for overwriting in sub-classes. + Intended for overwriting in subclasses. This function is called from __iter__ for each new data that becomes available. It has to return some iterable whose entries are yielded from iteration over objects of this class. - The result must be an iterable of objects, which are yielded as-is, so - can have any form. - This base implementation just returns its input in a list, so new data - is yielded from __iter__ as-is. - - Subclass implementations can also yield tuples. + is yielded from __iter__ as-is. Subclass implementations can also yield + tuples. :param str description: Description of source of lines, one of :py:data:`self.descriptions` - :param str new_data: Text data read from source + :param str data: Text data read from source :param idx: Index of data source :returns: [(description, data, idx], same as input :rtype [(str, str, int)] @@ -395,6 +391,6 @@ class LogParser(LineReader): super(LogParser, self).prepare_result(*args): result = re.match(self.pattern, raw_line) if result: - return (description, result, idx) + return description, result, idx else: - return (description, raw_line, idx) + return description, raw_line, idx diff --git a/src/mail_utils.py b/src/mail_utils.py index ed128df..e3339a1 100644 --- a/src/mail_utils.py +++ b/src/mail_utils.py @@ -37,16 +37,11 @@ INTERFACE """ -import os from base64 import b64decode -import re -import logging from email.utils import parsedate_to_datetime from email.parser import BytesParser from email import policy -from . import arnied_wrapper - # outsourced source, import required for compatiblity from .imap_mailbox import ImapMailbox # pylint: disable=unused-import from .mail_validator import * # pylint: disable=unused-import @@ -341,7 +336,7 @@ def cyrus_escape(user_or_folder, keep_path=False, regex=False): * inbox --> (the empty string) Would like to use a general modified utf-7-encoder/decoder but python has - non built-in (see https://bugs.python.org/issue5305) and an extra lib like + none builtin (see https://bugs.python.org/issue5305) and an extra lib like https://bitbucket.org/mjs0/imapclient/ would be overkill. After all, we control the input to this function via params and this is enough umlaut- testing I think... @@ -393,7 +388,7 @@ def get_filename(message, failobj=None, do_unwrap=True): Only for ascii filenames: also unwraps file names if they are line-wrapped. But note that this may remove too much whitespace from the filename if - line-wrapping happend in the same position as the filename's whitespace. + line-wrapping happened in the same position as the filename's whitespace. To get unwrapped version, set param `do_unwrap` to `False`. See also: https://en.wikipedia.org/wiki/MIME#Encoded-Word diff --git a/src/mail_validator.py b/src/mail_validator.py index 8c93af7..7888665 100644 --- a/src/mail_validator.py +++ b/src/mail_validator.py @@ -93,7 +93,7 @@ class EmailMismatch(EmailException): # pylint: disable=missing-docstring pass -class MailValidator(): +class MailValidator: """Class for validation of emails.""" def target_path(self, new_value=None): @@ -242,7 +242,7 @@ class MailValidator(): :param bool in_target: whether the verified email is on the target side If `in_target` is set to True we are getting the target id from the - target list of a source email. Otherwise we assume a target email from + target list of a source email. Otherwise, we assume a target email from a source list. """ if in_target: @@ -435,6 +435,7 @@ class MailValidator(): X-Autotest-Signature for simpler recognition of mail (if None do not add header) :type autotest_signature: str or None + :param str subject: Subject of created mails """ text = 'This is an autogenerated email.\n' @@ -567,9 +568,8 @@ class MailValidator(): if match_id is not None: message_id = match_id.group(1).rstrip('\r\n') if message_id == "": - raise MissingEmailID("No id was found in target message %s so it " - "cannot be properly matched" - % (message_path)) + raise MissingEmailID(f"No id was found in target message {message_path}, " + f"so it cannot be properly matched") return message_id def _default_compare_emails(self, source_email_path, target_email_path, diff --git a/src/mk_config.py b/src/mk_config.py index 26def37..4bd44bc 100644 --- a/src/mk_config.py +++ b/src/mk_config.py @@ -39,7 +39,6 @@ INTERFACE import time import logging -log = logging.getLogger('pyi2ncommon.mk_config') # custom imports from . import arnied_wrapper as aw @@ -47,11 +46,13 @@ from .arnied_wrapper import Delete, Update, Add, Child, batch_update_cnf, build_ from .cnfline import build_cnfvar, build_group, build_intraclient from .cnfline import build_nic, build_provider, build_user +log = logging.getLogger('pyi2ncommon.mk_config') ############################################################################### # MINOR CONFIGURATION ############################################################################### + def simple(varname, data, filename): """ Generate and save a single-variable configuration file. @@ -81,7 +82,7 @@ def user(username="admin", instance=1, suffix="host"): :rtype: str """ log.info("Create arnied user configuration") - user = batch_update_cnf( + user_obj = batch_update_cnf( build_user.BuildUser(data=username, instance=instance, line_no=1), [(Update, ("USER_FULLNAME", 0, username)), (Update, ("USER_GROUP_MEMBER_REF", 0, "1")), @@ -93,7 +94,7 @@ def user(username="admin", instance=1, suffix="host"): user_cnf = "user-%d-%s.cnf" % (time.time(), suffix) [user_cnf] = aw.prep_config_paths([user_cnf], aw.DUMP_CONFIG_DIR) logging.info("Saving user configuration to %s", user_cnf) - user.save(user_cnf) + user_obj.save(user_cnf) return user_cnf @@ -118,7 +119,7 @@ def group_admins(proxy_profile="1", activesync_enable=False, xauth_enable=False, (Update, ("GROUP_EMAIL_RELAY_RIGHTS", 0, "RELAY_FROM_INTRANET")), (Update, ("GROUP_ACTIVESYNC_ENABLE", 0, "1" if activesync_enable else "0")), (Update, ("GROUP_XAUTH_ENABLE", 0, "1" if xauth_enable else "0")), - (Delete, ("GROUP_COMMENT"))]) + (Delete, ("GROUP_COMMENT",))]) group_cnf = "group-%d-%s.cnf" % (time.time(), suffix) [group_cnf] = aw.prep_config_paths([group_cnf], aw.DUMP_CONFIG_DIR) logging.info("Saving group configuration to %s", group_cnf) @@ -128,7 +129,7 @@ def group_admins(proxy_profile="1", activesync_enable=False, xauth_enable=False, def group_all(proxy_profile="1", suffix="host"): """ - Generate and save an All group configuration file. + Generate and save an "All" group configuration file. :param str proxy_profile: proxy profile instance reference :param str suffix: optional suffix to use for config identification @@ -151,7 +152,7 @@ def group_all(proxy_profile="1", suffix="host"): (Update, ("GROUP_EMAILFILTER_BAN_FILTERLIST_REF", 0, "-1")), (Update, ("GROUP_EMAIL_RELAY_RIGHTS", 0, "RELAY_FROM_EVERYWHERE")), (Update, ("GROUP_PROXY_PROFILE_REF", 0, proxy_profile)), - (Delete, ("GROUP_COMMENT"))]) + (Delete, ("GROUP_COMMENT",))]) group_cnf = "group-%d-%s.cnf" % (time.time(), suffix) [group_cnf] = aw.prep_config_paths([group_cnf], aw.DUMP_CONFIG_DIR) @@ -176,7 +177,7 @@ def nic(instance=0, nictype="NATLAN", :rtype: str """ log.info("Create arnied nic configuration") - nic = batch_update_cnf( + nic_obj = batch_update_cnf( build_nic.BuildNIC(data="", instance=instance, line_no=1), [(Update, ("NIC_TYPE", 0, nictype)), (Update, ("NIC_LAN_IP", 0, ip)), @@ -185,7 +186,7 @@ def nic(instance=0, nictype="NATLAN", nic_cnf = "nic-%d-%s.cnf" % (time.time(), suffix) [nic_cnf] = aw.prep_config_paths([nic_cnf], aw.DUMP_CONFIG_DIR) logging.info("Saving nic configuration to %s", nic_cnf) - nic.save(nic_cnf) + nic_obj.save(nic_cnf) return nic_cnf @@ -205,16 +206,16 @@ def intraclient(name="intraclient", instance=1, :rtype: str """ log.info("Create arnied intraclient configuration") - intraclient = batch_update_cnf(build_intraclient.BuildIntraclient(data=name, - instance=instance), - [(Update, ("INTRACLIENT_IP", 0, ip)), - (Update, ("INTRACLIENT_MAC", 0, mac)), - (Update, ("INTRACLIENT_FIREWALL_RULESET_REF", 0, fwrules))]) + intraclient_obj = batch_update_cnf( + build_intraclient.BuildIntraclient(data=name, instance=instance), + [(Update, ("INTRACLIENT_IP", 0, ip)), + (Update, ("INTRACLIENT_MAC", 0, mac)), + (Update, ("INTRACLIENT_FIREWALL_RULESET_REF", 0, fwrules))]) intraclient_cnf = "intraclient-%d-%s.cnf" % (time.time(), suffix) [intraclient_cnf] = aw.prep_config_paths([intraclient_cnf], aw.DUMP_CONFIG_DIR) logging.info("Saving intraclient configuration to %s", intraclient_cnf) - intraclient.save(intraclient_cnf) + intraclient_obj.save(intraclient_cnf) return intraclient_cnf @@ -243,39 +244,41 @@ def provider(name="provider", instance=1, mode="ROUTER", ip="1.2.3.4", localip=N :rtype: str """ log.info("Create arnied provider configuration") + def add_or_del(var, field): if var is not None: - return (Add, (field, 0, str(var))) - return (Delete, field) - provider = batch_update_cnf(build_provider.BuildProvider(data=name, instance=instance), - [(Update, ("PROVIDER_MODE", 0, mode)), - ip and (Update, ("PROVIDER_IP", 0, ip)) - or (Delete, "PROVIDER_IP"), - localip - and (Update, ("PROVIDER_LOCALIP", 0, localip)) - or (Delete, "PROVIDER_LOCALIP"), - netmask and (Update, ("PROVIDER_NETMASK", 0, - netmask)) - or (Delete, "PROVIDER_NETMASK"), - (Update, ("PROVIDER_TIMEOUT", 0, timeout)), - (Update, ("PROVIDER_DNS_MODE", 0, dnsmode)), - (Update, ("PROVIDER_DNS", 0, - dns if dnsmode == "IP" else "")), - (Update, ("PROVIDER_MTU_MODE", 0, mtumode)), - (Update, ("PROVIDER_MTU_SIZE", 0, - mtusize if mtumode != "AUTO" else "")), - (Update, ("PROVIDER_FIREWALL_RULESET_REF", 0, str(fwrules))), - add_or_del(vlanid, "PROVIDER_VLAN_ID"), - add_or_del(dialretry, "PROVIDER_DIAL_RETRY"), - add_or_del(login, "PROVIDER_LOGIN"), - add_or_del(password, "PROVIDER_PASSWORD"), - add_or_del(modemip, "PROVIDER_MODEM_IP"), - add_or_del(providerid, "PROVIDER_PROVIDERID"), - add_or_del(localdhcp, "PROVIDER_LOCAL_DHCP")]) + return Add, (field, 0, str(var)) + return Delete, field + provider_obj = batch_update_cnf( + build_provider.BuildProvider(data=name, instance=instance), + [(Update, ("PROVIDER_MODE", 0, mode)), + ip and (Update, ("PROVIDER_IP", 0, ip)) + or (Delete, "PROVIDER_IP"), + localip + and (Update, ("PROVIDER_LOCALIP", 0, localip)) + or (Delete, "PROVIDER_LOCALIP"), + netmask and (Update, ("PROVIDER_NETMASK", 0, + netmask)) + or (Delete, "PROVIDER_NETMASK"), + (Update, ("PROVIDER_TIMEOUT", 0, timeout)), + (Update, ("PROVIDER_DNS_MODE", 0, dnsmode)), + (Update, ("PROVIDER_DNS", 0, + dns if dnsmode == "IP" else "")), + (Update, ("PROVIDER_MTU_MODE", 0, mtumode)), + (Update, ("PROVIDER_MTU_SIZE", 0, + mtusize if mtumode != "AUTO" else "")), + (Update, ("PROVIDER_FIREWALL_RULESET_REF", 0, str(fwrules))), + add_or_del(vlanid, "PROVIDER_VLAN_ID"), + add_or_del(dialretry, "PROVIDER_DIAL_RETRY"), + add_or_del(login, "PROVIDER_LOGIN"), + add_or_del(password, "PROVIDER_PASSWORD"), + add_or_del(modemip, "PROVIDER_MODEM_IP"), + add_or_del(providerid, "PROVIDER_PROVIDERID"), + add_or_del(localdhcp, "PROVIDER_LOCAL_DHCP")]) provider_cnf = "provider-%d-%s.cnf" % (time.time(), suffix) [provider_cnf] = aw.prep_config_paths([provider_cnf], aw.DUMP_CONFIG_DIR) logging.info("Saving provider configuration to %s", provider_cnf) - provider.save(provider_cnf) + provider_obj.save(provider_cnf) return provider_cnf @@ -294,22 +297,23 @@ def provider_proxy(mode="ROUTER", ip="1.2.3.4", localip=None, proxy_port=3128, f :rtype: str """ log.info("Create arnied provider configuration.") - provider = batch_update_cnf(build_provider.BuildProvider(), - [(Update, ("PROVIDER_MODE", 0, mode)), - (Update, ("PROVIDER_DNS", 0, ip)), - (Update, ("PROVIDER_DYNDNS_ENABLE", 0, "0")), - (Update, ("PROVIDER_IP", 0, ip)), - (Update, ("PROVIDER_PROXY_SERVER", 0, ip)), - (Update, ("PROVIDER_PROXY_PORT", 0, str(proxy_port))), - localip - and (Update, ("PROVIDER_LOCALIP", 0, localip)) - or (Delete, "PROVIDER_LOCALIP"), - (Update, ("PROVIDER_DNS_MODE", 0, "IP")), - (Update, ("PROVIDER_FIREWALL_RULESET_REF", 0, str(fwrules)))]) + provider_obj = batch_update_cnf( + build_provider.BuildProvider(), + [(Update, ("PROVIDER_MODE", 0, mode)), + (Update, ("PROVIDER_DNS", 0, ip)), + (Update, ("PROVIDER_DYNDNS_ENABLE", 0, "0")), + (Update, ("PROVIDER_IP", 0, ip)), + (Update, ("PROVIDER_PROXY_SERVER", 0, ip)), + (Update, ("PROVIDER_PROXY_PORT", 0, str(proxy_port))), + localip + and (Update, ("PROVIDER_LOCALIP", 0, localip)) + or (Delete, "PROVIDER_LOCALIP"), + (Update, ("PROVIDER_DNS_MODE", 0, "IP")), + (Update, ("PROVIDER_FIREWALL_RULESET_REF", 0, str(fwrules)))]) provider_cnf = "provider-%d-%s.cnf" % (time.time(), suffix) [provider_cnf] = aw.prep_config_paths([provider_cnf], aw.DUMP_CONFIG_DIR) logging.info("Saving provider configuration to %s", provider_cnf) - provider.save(provider_cnf) + provider_obj.save(provider_cnf) return provider_cnf diff --git a/src/sysmisc.py b/src/sysmisc.py index b4e7d0a..3806b31 100644 --- a/src/sysmisc.py +++ b/src/sysmisc.py @@ -35,7 +35,7 @@ The library exports the symbols below and some custom logging functions. run_cmd_with_pipe Wrapper for the default use case of the cumbersome "subprocess" library. - Acceps a list of arguments that describe the command invocation. Returns + Accepts a list of arguments that describe the command invocation. Returns ``True`` and the contents of ``stdout`` if the pipe returned sucessfully, ``False`` plus ``stderr`` and the exit status otherwise. For example:: @@ -60,12 +60,12 @@ read_linewise import re import sysmisc def parse(line): - if re.match('\d', line): + if re.match(r'\d', line): print('found digits in line!') sysmisc.read_linewise('dump_db', parse) hash_file - Return a hash of a flie. + Return a hash of a file. cheat_reboot Replace the reboot binary with a fake one. @@ -85,8 +85,6 @@ INTERFACE """ -from __future__ import print_function - import re import subprocess import hashlib @@ -100,7 +98,10 @@ import logging llog = logging.getLogger('pyi2ncommon.sysmisc') -__all__ = ("inf", "run_cmd_with_pipe", "get_mountpoints_by_type", "read_linewise", "hash_file", "cheat_reboot", "RUN_RESULT_OK", "RUN_RESULT_TIMEDOUT", "RUN_RESULT_FAIL", "RUN_RESULT_NAME", "cmd_block_till") +__all__ = ("inf", "run_cmd_with_pipe", "get_mountpoints_by_type", + "read_linewise", "hash_file", "cheat_reboot", + "RUN_RESULT_OK", "RUN_RESULT_TIMEDOUT", "RUN_RESULT_FAIL", + "RUN_RESULT_NAME", "cmd_block_till") ############################################################################### @@ -119,7 +120,7 @@ def cd(path): http://code.activestate.com/recipes/576620-changedirectory-context-manager/ (MIT license) - :arg str path: paht to temporarily switch to + :arg str path: path to temporarily switch to """ orig_wd = os.getcwd() os.chdir(path) @@ -182,12 +183,12 @@ def get_mountpoints_by_type(fstype): try: with open(procmounts, "r") as m: lines = list(m) - pat = re.compile("^[^\s]+\s+([^\s]+)\s+" + fstype + "\s+.*$") + pat = re.compile(r"^\S+\s+(\S+)\s+" + fstype + r"\s+.*$") mps = [mp.group(1) for mp in map(lambda l: re.match(pat, l), lines) if mp] - except IOError as e: - raise IOError("Failed to read %s." % procmounts) + except IOError: + raise IOError(f"Failed to read {procmounts}") if not mps: return None return mps @@ -222,9 +223,9 @@ def read_linewise(cmd, func, **kwargs): # if proc.poll() is not None: # break - #rest_output,_ = proc.communicate() - #for line in rest_output: - # func(line) + # rest_output,_ = proc.communicate() + # for line in rest_output: + # func(line) return proc.wait() @@ -303,7 +304,7 @@ def cheat_reboot(): This replaces the ``reboot-intranator`` executable by script which replaces itself by the backed up executable upon the next invocation. """ - #path = utils.system_output("which reboot") + # path = utils.system_output("which reboot") path = "/usr/intranator/bin/reboot-intranator" suffix = uuid.uuid1() backup = backup_fmt % (path, backup_infix, suffix) @@ -336,12 +337,13 @@ def cmd_block_till(cmd, timeout, cond, interval=1, *userdata, **kwuserdata): :param cmd: Command line or callback to execute. Function arguments must have the same signature as :py:func:`run_cmd_with_pipe`. - :type cmd: [str] | types.FunctionType + :type cmd: [str] | types.FunctionType :param int timeout: Blocking timeout - - :returns: Pair of result and error message if appropriate or - :py:value:`None`. - :rtype: (run_result, str | None) + :param cond: Function to call; code will wait for this to return something + other than `False` + :param interval: Time (in seconds) to sleep between each attempt at `cond` + :returns: Pair of result and error message if appropriate or :py:value:`None`. + :rtype: (run_result, str | None) """ llog.debug("cmd_block_till: %r, %d s, %r", cmd, timeout, cond) if isinstance(cmd, types.FunctionType): @@ -395,7 +397,7 @@ def progress(fmt, *args): else: label = "" name = CURRENT_TEST_NAME if isinstance(CURRENT_TEST_NAME, str) else "" - fmt, label = str(fmt), str(label) # yes + fmt, label = str(fmt), str(label) # yes llog.info("[%s%s] %s" % (name, label, fmt), *args) # TODO: this method is more dynamic # llog.info("[%s%s] %s%s" % (LOG_TAG, "", LOG_INDENT*indent, fmt), *args) diff --git a/src/test_data_sync.py b/src/test_data_sync.py index eb7e04a..70652cb 100644 --- a/src/test_data_sync.py +++ b/src/test_data_sync.py @@ -52,7 +52,7 @@ def append_email_id_header(data_dir): if i % 100 == 0: log.info("%i done\n", i) - if not re.match("^[0-9]+\.$", files[i]): + if not re.match(r"^\d+\.$", files[i]): continue if os.path.getsize(file_path) == 0: log.warning("Skipping empty file %s", file_path) diff --git a/src/text_helpers.py b/src/text_helpers.py index a258f43..662c7b3 100644 --- a/src/text_helpers.py +++ b/src/text_helpers.py @@ -30,10 +30,9 @@ Copyright: 2015 Intra2net AG CONTENTS ------------------------------------------------------ This module has two parts. Part 1 includes: - -- head_and_tail: shows the first few and last few elements of an iterable that - could potentially be pretty long -- size_str: textual representation of data size + - head_and_tail: shows the first few and last few elements of an iterable that + could potentially be pretty long + - size_str: textual representation of data size Part2 contains functions for coloring text, a poor-man's version of other modules like :py:mod:`colorclass` (which is now also available on Intra2net @@ -56,10 +55,7 @@ INTERFACE ------------------------------------------------------ """ -try: - from builtins import print as _builtin_print -except ImportError: # different name in py2 - from __builtin__ import print as _builtin_print +from builtins import print as _builtin_print from functools import partial from itertools import islice @@ -138,7 +134,7 @@ def size_str(byte_number, is_diff=False): Rounds and shortens size to something easily human-readable like '1.5 GB'. - :param int byte_number: Number of bytes to express as text + :param float byte_number: Number of bytes to express as text :param bool is_diff: Set to True to include a '+' or '-' in output; default: False :returns: textual representation of data @@ -170,8 +166,7 @@ def size_str(byte_number, is_diff=False): # have an impossible amount of data. (>1024**4 GB) # undo last "/factor" and show thousand-separator - return '{2}{0:,d} {1}B'.format(int(round(curr_num*factor)), units[-1], - sign_str) + return f'{sign_str}{int(round(curr_num*factor)):,d} {units[-1]}B' ############################################################################### @@ -194,7 +189,7 @@ STYLE_BLINK = 5 STYLE_REVERSE = 7 -_COLOR_TO_CODE = dict(zip([COLOR_BLACK, COLOR_RED, COLOR_GREEN, COLOR_YELLOW, \ +_COLOR_TO_CODE = dict(zip([COLOR_BLACK, COLOR_RED, COLOR_GREEN, COLOR_YELLOW, COLOR_BLUE, COLOR_MAGENTA, COLOR_CYAN, COLOR_WHITE], range(8))) @@ -209,13 +204,13 @@ except Exception: def colored(text, foreground=None, background=None, style=None): - """ return text with given foreground/background ANSI color escape seqence + """ return text with given foreground/background ANSI color escape sequence :param str text: text to color :param str style: one of the style constants above :param str foreground: one of the color constants to use for text color or None to leave as-is - :param str foreground: one of the color constants to use for background + :param str background: one of the color constants to use for background or None to leave as-is :param style: single STYLE constant or iterable of those or None to leave as-is diff --git a/src/type_helpers.py b/src/type_helpers.py index eb74968..5a26725 100644 --- a/src/type_helpers.py +++ b/src/type_helpers.py @@ -19,49 +19,35 @@ # Copyright (c) 2016-2018 Intra2net AG """ -Helpers for type checking and conversion, like isstr(x), is_file_obj(x) +Helpers for type checking and conversion, like `isstr(x)`, `is_file_obj(x)` + +Some of these things used to be more complicated when python2 was still +around, so some functions are not that useful anymore. """ -import sys from io import IOBase def isstr(var): - """ determines if the given var is a (regular/unicode/raw) string or not - - in python2, u'a' is not a subclass of str, so to get a True as result, you - have to test for basestring as superclass. - - In python3 that is no longer the case - - @returns True if the input is a regular string, a unicode or a raw string, - False for everything else (including byte literals like b'abc') - - For more complex py2/py3 compatibility issues, consider using six - (https://pythonhosted.org/six) - """ - + """Alias for `isinstance(var, str)`.""" return isinstance(var, str) def is_str_or_byte(var): - """ returns true for str, unicode and byte objects """ + """Returns `True` for `str` and `bytes` objects.""" return isinstance(var, (str, bytes)) def is_unicode(var): - """ returns true for unicode strings - - py2: return True for type unicode but not type str - py3: return True for type str but not type bytes - """ + """Alias for `isinstance(var, str)`.""" return isinstance(var, str) def is_file_obj(var): - """ determines whether given input is the result of 'open(file_name)' + """ + Determine whether given input is the result of 'open(file_name)'. - just checks whether given var is subclass of io.IOBase, which is also True - for 'file-like objects' like StringIO + Just checks whether given var is subclass of io.IOBase, which is also True + for 'file-like objects' like StringIO. """ return isinstance(var, IOBase) diff --git a/src/v4_addr_range.py b/src/v4_addr_range.py index 53493f8..e9769d5 100644 --- a/src/v4_addr_range.py +++ b/src/v4_addr_range.py @@ -82,8 +82,7 @@ class V4_addr_range: def fix_range(self): if self.val_range <= 0: - raise TypeError("IP address ranges need to be natural numbers > 0." - % (self.val_lo, self.val_range)) + raise TypeError("IP address ranges need to be natural numbers > 0.") hi = self.val_lo + self.val_range if hi <= self.val_lo or hi > uint32_max: raise ValueError("Invalid IP address range: %d+%d." diff --git a/src/web_interface.py b/src/web_interface.py index 011ac91..e4e401d 100644 --- a/src/web_interface.py +++ b/src/web_interface.py @@ -38,9 +38,10 @@ import http.client as client import urllib.parse as parse import socket import logging +from .arnied_wrapper import accept_licence + log = logging.getLogger('pyi2ncommon.web_interface') -from .arnied_wrapper import accept_licence #: FQDN of local machine LOCALHOST = socket.gethostname() @@ -48,9 +49,10 @@ LOCALHOST = socket.gethostname() def find_in_form(regex, form="status", escape=False, check_certs=True): """ - Find a regex in I2N web page's status frame. + Find a regex in given I2N web page form. :param str regex: regular expression to find + :param str form: form name to open :param bool escape: whether to escape the regex :param check_certs: forwarded to :py:func:`web_page_request`, see doc there :returns: whether the regex was found diff --git a/src/zip_stream.py b/src/zip_stream.py index d4adf4a..ea4da6b 100644 --- a/src/zip_stream.py +++ b/src/zip_stream.py @@ -36,48 +36,12 @@ Use as follows:: .. codeauthor:: Intra2net AG """ -import sys -import os - -if sys.version_info.major < 3: - raise ImportError('Did not backport zipfile from python 3.5 to py2') -if sys.version_info.minor >= 6: - # imports for _write_stream_36 - import shutil -else: - # imports for create_zipinfo, _write_stream_35 and _get_compressor - from stat import S_ISDIR - import time - import zlib - crc32 = zlib.crc32 - import bz2 - import struct -if sys.version_info.minor >= 5: - from zipfile import * -else: - # backport of zipfile from python 3.5 to support stream output - from zipfile35 import * +import shutil +from zipfile import * from .type_helpers import isstr -# copied from zipfile.py -ZIP64_LIMIT = (1 << 31) - 1 - - -def _get_compressor(compress_type): - """Copied fomr zipfile.py in py3.5 (cannot legally import)""" - if compress_type == ZIP_DEFLATED: - return zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, - zlib.DEFLATED, -15) - elif compress_type == ZIP_BZIP2: - return bz2.BZ2Compressor() - elif compress_type == ZIP_LZMA: - return LZMACompressor() - else: - return None - - class BytesTellWrapper: """ Wrapper around a write-only stream that supports tell but not seek @@ -132,36 +96,8 @@ class ZipStream(ZipFile): Create ZipInfo for given file Optionally set arcname as name of file inside archive. - - Adapted from zipfile.py in (ZipInfo.from_file in py3.6, ZipFile.write - in py3.5) """ - if sys.version_info.major >= 3 and sys.version_info.minor >= 6: - return ZipInfo.from_file(filename, arcname) - - st = os.stat(filename) - isdir = S_ISDIR(st.st_mode) - mtime = time.localtime(st.st_mtime) - date_time = mtime[0:6] - # Create ZipInfo instance to store file information - if arcname is None: - arcname = filename - arcname = os.path.normpath(os.path.splitdrive(arcname)[1]) - while arcname[0] in (os.sep, os.altsep): - arcname = arcname[1:] - if isdir: - arcname += '/' - zinfo = ZipInfo(arcname, date_time) - zinfo.external_attr = (st.st_mode & 0xFFFF) << 16 # Unix attributes - if isdir: - zinfo.compress_type = ZIP_STORED - zinfo.file_size = 0 - zinfo.external_attr |= 0x10 # MS-DOS directory flag - else: - zinfo.compress_type = self.compression - zinfo.file_size = st.st_size - - return zinfo + return ZipInfo.from_file(filename, arcname) def write_stream(self, src, zinfo): """ @@ -176,69 +112,6 @@ class ZipStream(ZipFile): This is a shortened version of python's :py:func:`zipfile.ZipFile.write`. """ - if sys.version_info.major >= 3 and sys.version_info.minor >= 6: - return self._write_stream_36(src, zinfo) - else: - return self._write_stream_35(src, zinfo) - - def _write_stream_35(self, src, zinfo): - """Implementation of _write_stream based on ZipFile.write (py 3.5)""" - if not self.fp: - raise RuntimeError( - "Attempt to write to ZIP archive that was already closed") - - zinfo.flag_bits = 0x00 - - with self._lock: - zinfo.header_offset = self.fp.tell() # Start of header bytes - if zinfo.compress_type == ZIP_LZMA: - # Compressed data includes an end-of-stream (EOS) marker - zinfo.flag_bits |= 0x02 - - self._writecheck(zinfo) - self._didModify = True - - cmpr = _get_compressor(zinfo.compress_type) - zinfo.flag_bits |= 0x08 - - # Must overwrite CRC and sizes with correct data later - zinfo.CRC = CRC = 0 - zinfo.compress_size = compress_size = 0 - # Compressed size can be larger than uncompressed size - zip64 = self._allowZip64 and \ - zinfo.file_size * 1.05 > ZIP64_LIMIT - self.fp.write(zinfo.FileHeader(zip64)) - file_size = 0 - while 1: - buf = src.read(1024 * 8) - if not buf: - break - file_size = file_size + len(buf) - CRC = crc32(buf, CRC) & 0xffffffff - if cmpr: - buf = cmpr.compress(buf) - compress_size = compress_size + len(buf) - self.fp.write(buf) - if cmpr: - buf = cmpr.flush() - compress_size = compress_size + len(buf) - self.fp.write(buf) - zinfo.compress_size = compress_size - else: - zinfo.compress_size = file_size - zinfo.CRC = CRC - zinfo.file_size = file_size - - # Write CRC and file sizes after the file data - fmt = ' """ -import sys import unittest import logging from tempfile import mkstemp diff --git a/test/test_log_read.py b/test/test_log_read.py index f5ebe61..28576a4 100644 --- a/test/test_log_read.py +++ b/test/test_log_read.py @@ -34,13 +34,7 @@ from warnings import warn from src.log_read import IterativeReader, LineReader, LogReadWarning # get best clock -from sys import version_info -if version_info.major == 2: - raise NotImplementedError('pyi2ncommon is no longer compatible with py2') -elif version_info.minor < 4: - perf_counter = time.clock -else: - perf_counter = time.perf_counter +perf_counter = time.perf_counter DEBUG = False diff --git a/test/test_type_helpers.py b/test/test_type_helpers.py index c9cb289..c32277f 100644 --- a/test/test_type_helpers.py +++ b/test/test_type_helpers.py @@ -20,7 +20,7 @@ """ type_helper_unittest.py: unit tests for type_helpers -Tests classes and functions in type_helpers +Test classes and functions in type_helpers Should be run from python2 and python3! @@ -28,23 +28,13 @@ For help see :py:mod:`unittest` """ import unittest - -from src.type_helpers import is_unicode, isstr from sys import version_info +from src.type_helpers import is_unicode, isstr -is_py2 = version_info.major == 2 class TypeHelperTester(unittest.TestCase): - def test_is_py2_or_py3(self): - """ test that python version is 2 or 3 - - when implementing type_helpers, there was no py4 and no idea what it - might be like. Also will probably fail for python v1 - """ - self.assertIn(version_info.major, (2, 3)) - def test_is_unicode(self): """ tests function is_unicode """ @@ -53,25 +43,18 @@ class TypeHelperTester(unittest.TestCase): self.assertFalse(is_unicode(unittest.TestCase)) self.assertFalse(is_unicode(type(unittest.TestCase))) - if is_py2: - self.assertTrue(is_unicode(u'bla')) - self.assertTrue(eval("is_unicode(ur'bla')")) # not allowed in py3! - self.assertFalse(is_unicode('bla')) - self.assertFalse(is_unicode(r'bla')) - else: - self.assertTrue(is_unicode('bla')) - self.assertTrue(is_unicode(r'bla')) - self.assertFalse(is_unicode(b'bla')) - self.assertFalse(is_unicode(br'bla')) + self.assertTrue(is_unicode('bla')) + self.assertTrue(is_unicode(r'bla')) + self.assertFalse(is_unicode(b'bla')) + self.assertFalse(is_unicode(br'bla')) def test_isstr(self): """ test function isstr """ tests = [ ('abc', True), (u'abc', True), (r'abc', True), - (1, False), (['a', 'b', 'c'], False), (('a', 'b', 'c'), False)] - if not is_py2: - tests.append((b'abc', False)) # b'' does not exist in py2 + (1, False), (['a', 'b', 'c'], False), (('a', 'b', 'c'), False), + (b'abc', False)] for test_var, expected_result in tests: self.assertEqual(isstr(test_var), expected_result, diff --git a/test/test_v4_addr_range.py b/test/test_v4_addr_range.py index d133c07..c0d2330 100644 --- a/test/test_v4_addr_range.py +++ b/test/test_v4_addr_range.py @@ -19,8 +19,6 @@ # # Copyright (c) 2016-2018 Intra2net AG -from __future__ import absolute_import - import unittest from src import v4_addr_range diff --git a/test/test_zip_stream.py b/test/test_zip_stream.py index f915d42..c497ed6 100644 --- a/test/test_zip_stream.py +++ b/test/test_zip_stream.py @@ -18,19 +18,11 @@ """ test_zip_stream.py: unit tests for zip_stream -Tests classes and functions in :py:mod:`pyi2ncommon.zip_stream`. - -Only runs in python3 since zip_stream is not py2-compatible (see comment in -module doc there) - -For help see :py:mod:`unittest` +Test classes and functions in :py:mod:`pyi2ncommon.zip_stream`. .. codeauthor:: Intra2net """ -from __future__ import print_function -from __future__ import absolute_import - import unittest from tempfile import mkdtemp import os @@ -68,7 +60,7 @@ class BytesIONoSeekNorRead(io.BytesIO): def readable(self): return False - def seek(sel, *args): + def seek(self, *args): raise AttributeError('this function was removed') def readline(self, *args): @@ -127,7 +119,7 @@ class ZipStreamTester(unittest.TestCase): def _test_zip(self, test_subdir=False): """Helper for test_* functions: check given zip for contents""" - expect_contents = set((TEXT_FILE, BIN_FILE)) + expect_contents = {TEXT_FILE, BIN_FILE} if test_subdir: expect_contents.add(SUBDIR + '/')