author='Intra2net AG',
author_email='info@intra2net.com',
url='http://www.intra2net.com',
- packages=['pyi2ncommon', 'pyi2ncommon.cnfline', 'pyi2ncommon.cnfvar'],
+ packages=['pyi2ncommon', 'pyi2ncommon.cnfvar'],
package_dir={'pyi2ncommon': 'src'},
license_files=('COPYING.GPL', 'Linking-Exception.txt'),
license='GPLv2 + linking exception',
SUMMARY
------------------------------------------------------
-Guest utility to wrap arnied related functionality through python calls.
+Interaction with central arnied daemon.
-.. note:: Partially DEPRECATED! Use :py:mod:`pyi2ncommon.arnied_api` or
- :py:mod:`pyi2ncommon.cnfvar` whenever possible. In particluar, do not
- use or extend functionality regarding configuration (`get_cnf`,
- `set_cnf`).
+All functions (except :py:func:`schedule` result in calling a binary
+(either :py:data:`BIN_ARNIED_HELPER` or *tell-connd*).
-Copyright: Intra2net AG
-
-There are three types of setting some cnfvar configuration:
+For changes of configuration (*set_cnf*, *get_cnf*), refer to :py:mod:`pyi2ncommon.cnfvar`.
-1) static (:py:class:`set_cnf`) - oldest method using a static preprocessed
- config file without modifying its content in any way
-2) semi-dynamic (:py:class:`set_cnf_semidynamic`) - old method also using
- static file but rather as a template, replacing regex-matched values to
- adapt it to different configurations
-3) dynamic (:py:class:`set_cnf_dynamic`) - new method using dictionaries
- and custom cnfvar classes and writing them into config files of a desired
- format (json, cnf, or raw)
+Copyright: Intra2net AG
INTERFACE
import logging
log = logging.getLogger('pyi2ncommon.arnied_wrapper')
-from .cnfline import build_cnfvar
-from . import cnfvar_old
-from . import sysmisc
-
-
-#: default set_cnf binary
-BIN_SET_CNF = "/usr/intranator/bin/set_cnf"
#: default arnied_helper binary
BIN_ARNIED_HELPER = "/usr/intranator/bin/arnied_helper"
-#: default location for template configuration files
-SRC_CONFIG_DIR = "."
-#: default location for dumped configuration files
-DUMP_CONFIG_DIR = "."
-
-
-class ConfigError(Exception):
- pass
def run_cmd(cmd="", ignore_errors=False, vm=None, timeout=60):
# Basic functionality
-def accept_licence(vm=None):
- """
- Accept the Intra2net license.
-
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
-
- This is mostly useful for simplified webpage access.
- """
- cmd = 'echo "LICENSE_ACCEPTED,0: \\"1\\"" | set_cnf'
- result = run_cmd(cmd=cmd, ignore_errors=True, vm=vm)
- log.debug(result)
- wait_for_generate(vm=vm)
-
-
def go_online(provider_id, wait_online=True, timeout=60, vm=None):
"""
Go online with the given provider id.
"""
log.info("Switching to online mode with provider %d", provider_id)
- get_cnf_res = run_cmd(cmd='get_cnf PROVIDER %d' % provider_id, vm=vm)
- if b'PROVIDER,' not in get_cnf_res.stdout:
- log.warning('There is no PROVIDER %d on the vm. Skipping go_online.',
- provider_id)
- return
-
cmd = 'tell-connd --online P%i' % provider_id
result = run_cmd(cmd=cmd, vm=vm)
log.debug(result)
.format(status, timeout))
-def disable_virscan(vm=None):
- """
- Disable virscan that could block GENERATE and thus all configurations.
-
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
- """
- log.info("Disabling virus database update")
- unset_cnf("VIRSCAN_UPDATE_CRON", vm=vm)
-
- cmd = "echo 'VIRSCAN_UPDATE_DNS_PUSH,0:\"0\"' |set_cnf"
- result = run_cmd(cmd=cmd, vm=vm)
- log.debug(result)
-
- # TODO: this intervention should be solved in later arnied_helper tool
- cmd = "rm -f /var/intranator/schedule/UPDATE_VIRSCAN_NODIAL*"
- result = run_cmd(cmd=cmd, vm=vm)
- log.debug(result)
- log.info("Virus database update disabled")
-
-
def email_transfer(vm=None):
"""
Transfer all the emails using the guest tool arnied_helper.
tmp_file = tempfile.NamedTemporaryFile(mode="w+",
prefix=program.upper() + "_",
- dir=DUMP_CONFIG_DIR,
delete=False)
log.debug("Created temporary file %s", tmp_file.name)
tmp_file.write(contents)
# Configuration functionality
-def get_cnf(cnf_key, cnf_index=1, regex=".*", compact=False, timeout=30, vm=None):
- """
- Query arnied for a `cnf_key` and extract some information via regex.
-
- :param str cnf_key: queried cnf key
- :param int cnf_index: index of the cnf key
- :param str regex: regex to apply on the queried cnf key data
- :param bool compact: whether to retrieve compact version of the matched cnf keys
- :param int timeout: arnied run verification timeout
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
- :returns: extracted information via the regex
- :rtype: Match object
-
- If `cnf_index` is set to -1, retrieve and perform regex matching on all instances.
- """
- wait_for_arnied(timeout=timeout, vm=vm)
- platform_str = ""
- if vm is not None:
- platform_str = " from %s" % vm.name
- log.info("Extracting arnied value %s for %s%s using pattern %s",
- cnf_index, cnf_key, platform_str, regex)
- cmd = "get_cnf%s %s%s" % (" -c " if compact else "", cnf_key,
- " %s" % cnf_index if cnf_index != -1 else "")
- # get_cnf creates latin1-encoded output, transfer from VM removes non-ascii
- output = run_cmd(cmd=cmd, vm=vm).stdout.decode('latin1')
- return re.search(regex, output, flags=re.DOTALL)
-
-
-def get_cnf_id(cnf_key, value, timeout=30, vm=None):
- """
- Get the id of a configuration of type `cnf_key` and name `value`.
-
- :param str cnf_key: queried cnf key
- :param str value: cnf value of the cnf key
- :param int timeout: arnied run verification timeout
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
- :returns: the cnf id or -1 if no such cnf variable
- :rtype: int
- """
- wait_for_arnied(timeout=timeout, vm=vm)
- regex = "%s,(\d+): \"%s\"" % (cnf_key, value)
- cnf_id = get_cnf(cnf_key, cnf_index=-1, regex=regex, compact=True, vm=vm)
- if cnf_id is None:
- cnf_id = -1
- else:
- cnf_id = int(cnf_id.group(1))
- log.info("Retrieved id \"%s\" for %s is %i", value, cnf_key, cnf_id)
- return cnf_id
-
-
-def get_cnfvar(varname=None, instance=None, data=None, timeout=30, vm=None):
- """
- Invoke get_cnf and return a nested CNF structure.
-
- :param str varname: "varname" field of the CNF_VAR to look up
- :param instance: "instance" of that variable to return
- :type instance: int
- :param str data: "data" field by which the resulting CNF_VAR list should be filtered
- :param int timeout: arnied run verification timeout
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
- :returns: the resulting "cnfvar" structure or None if the lookup fails or the result could not be parsed
- :rtype: cnfvar option
- """
- wait_for_arnied(timeout=timeout, vm=vm)
- # firstly, build argv for get_cnf
- cmd = ["get_cnf", "-j"]
- if varname is not None:
- cmd.append("%s" % varname)
- if instance:
- cmd.append("%d" % instance)
- cmd_line = " ".join(cmd)
-
- # now invoke get_cnf
- result = run_cmd(cmd=cmd_line, vm=vm)
- (status, raw) = result.returncode, result.stdout
- if status != 0:
- log.info("error %d executing \"%s\"", status, cmd_line)
- log.debug(raw)
- return None
-
- # reading was successful, attempt to parse what we got
- try:
- # The output from "get_cnf -j" is already utf-8. This contrast with
- # the output of "get_cnf" (no json) which is latin1.
- if isinstance(raw, bytes):
- raw = raw.decode("utf-8")
- cnf = cnfvar_old.read_cnf_json(raw)
- except TypeError as exn:
- log.info("error \"%s\" parsing result of \"%s\"", exn, cmd_line)
- return None
- except cnfvar_old.InvalidCNF as exn:
- log.info("error \"%s\" validating result of \"%s\"", exn, cmd_line)
- return None
-
- if data is not None:
- return cnfvar_old.get_vars(cnf, data=data)
-
- return cnf
-
-
-def get_cnfvar_id(varname, data, timeout=30, vm=None):
- """
- Similar to :py:func:`get_cnf_id` but uses :py:func:`get_cnfvar`.
-
- :param str varname: "varname" field of the CNF_VAR to look up
- :param str data: "data" field by which the resulting CNF_VAR list should be filtered
- :param int timeout: arnied run verification timeout
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
- :returns: the cnf id or -1 if no such cnf variable
- :rtype: int
- """
- wait_for_arnied(timeout=timeout, vm=vm)
- log.info("Extracting from arnied CNF_VAR %s with data %s",
- varname, data)
- cnf = get_cnfvar(varname=varname, data=data, vm=vm)
- variables = cnf["cnf"]
- if len(variables) == 0:
- log.info("CNF_VAR extraction unsuccessful, defaulting to -1")
- # preserve behavior
- return -1
- first_instance = int(variables[0]["instance"])
- log.info("CNF_VAR instance lookup yielded %d results, returning first value (%d)",
- len(variables), first_instance)
- return first_instance
-
def wait_for_generate(timeout=300, vm=None):
"""
"""
wait_for_run('generate', timeout=timeout, retries=1, vm=vm)
wait_for_run('generate_offline', timeout=timeout, retries=1, vm=vm)
-
-
-def unset_cnf(varname="", instance="", timeout=30, vm=None):
- """
- Remove configuration from arnied.
-
- :param str varname: "varname" field of the CNF_VAR to unset
- :param int instance: "instance" of that variable to unset
- :param int timeout: arnied run verification timeout
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
- """
- wait_for_arnied(timeout=timeout, vm=vm)
-
- cmd = "get_cnf %s %s | set_cnf -x" % (varname, instance)
- run_cmd(cmd=cmd, vm=vm)
-
- wait_for_generate(vm=vm)
-
-
-def set_cnf(config_files, kind="cnf", timeout=30, vm=None):
- """
- Perform static arnied configuration through a set of config files.
-
- :param config_files: config files to use for the configuration
- :type config_files: [str]
- :param str kind: "json" or "cnf"
- :param int timeout: arnied run verification timeout
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
- :raises: :py:class:`ConfigError` if cannot apply file
-
- The config files must be provided and are always expected to be found on
- the host. If these are absolute paths, they will be kept as is or
- otherwise will be searched for in `SRC_CONFIG_DIR`. If a vm is provided,
- the config files will be copied there as temporary files before applying.
- """
- log.info("Setting arnied configuration")
- wait_for_arnied(timeout=timeout, vm=vm)
-
- config_paths = prep_config_paths(config_files)
- for config_path in config_paths:
- with open(config_path, "rt", errors='replace') as config:
- log.debug("Contents of applied %s:\n%s", config_path, config.read())
- if vm is not None:
- new_config_path = generate_config_path()
- vm.copy_files_to(config_path, new_config_path)
- config_path = new_config_path
- argv = ["set_cnf", kind == "json" and "-j" or "", config_path]
-
- result = run_cmd(" ".join(argv), ignore_errors=True, vm=vm)
- logging.debug(result)
- if result.returncode != 0:
- raise ConfigError("Failed to apply config %s%s, set_cnf returned %d"
- % (config_path,
- " on %s" % vm.name if vm is not None else "",
- result.returncode))
-
- try:
- wait_for_generate(vm=vm)
- except Exception as ex:
- # handle cases of remote configuration that leads to connection meltdown
- if vm is not None and isinstance(ex, sys.modules["aexpect"].ShellProcessTerminatedError):
- log.info("Resetting connection to %s", vm.name)
- vm.session = vm.wait_for_login(timeout=10)
- log.debug("Connection reset via remote error: %s", ex)
- else:
- raise ex
-
-
-def set_cnf_semidynamic(config_files, params_dict, regex_dict=None,
- kind="cnf", timeout=30, vm=None):
- """
- Perform semi-dynamic arnied configuration from an updated version of the
- config files.
-
- :param config_files: config files to use for the configuration
- :type config_files: [str]
- :param params_dict: parameters to override the defaults in the config files
- :type params_dict: {str, str}
- :param regex_dict: regular expressions to use for matching the overriden parameters
- :type regex_dict: {str, str} or None
- :param str kind: "json" or "cnf"
- :param int timeout: arnied run verification timeout
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
-
- The config files must be provided and are always expected to be found on
- the host. If these are absolute paths, they will be kept as is or
- otherwise will be searched for in `SRC_CONFIG_DIR`. If a vm is provided,
- the config files will be copied there as temporary files before applying.
- """
- log.info("Performing semi-dynamic arnied configuration")
-
- config_paths = prep_cnf(config_files, params_dict, regex_dict)
- set_cnf(config_paths, kind=kind, timeout=timeout, vm=vm)
-
- log.info("Semi-dynamic arnied configuration successful!")
-
-
-def set_cnf_dynamic(cnf, config_file=None, kind="cnf", timeout=30, vm=None):
- """
- Perform dynamic arnied configuration from fully generated config files.
-
- :param cnf: one key with the same value as *kind* and a list of cnfvars as value
- :type cnf: {str, str}
- :param config_file: optional user supplied filename
- :type config_file: str or None
- :param str kind: "json", "cnf", or "raw"
- :param int timeout: arnied run verification timeout
- :param vm: vm to run on if running on a guest instead of the host
- :type vm: :py:class:`virttest.qemu_vm.VM` or None
- :raises: :py:class:`ValueError` if `kind` is not an acceptable value
- :raises: :py:class:`ConfigError` if cannot apply file
-
- The config file might not be provided in which case a temporary file will
- be generated and saved on the host's `DUMP_CONFIG_DIR` of not provided as
- an absolute path. If a vm is provided, the config file will be copied there
- as a temporary file before applying.
- """
- if config_file is None:
- config_path = generate_config_path(dumped=True)
- elif os.path.isabs(config_file):
- config_path = config_file
- else:
- config_path = os.path.join(os.path.abspath(DUMP_CONFIG_DIR), config_file)
- generated = config_file is None
- config_file = os.path.basename(config_path)
- log.info("Using %s cnf file %s%s",
- "generated" if generated else "user-supplied",
- config_file, " on %s" % vm.name if vm is not None else "")
-
- # Important to write bytes here to ensure text is encoded with latin-1
- fd = open(config_path, "wb")
- try:
- SET_CNF_METHODS = {
- "raw": cnfvar_old.write_cnf_raw,
- "json": cnfvar_old.write_cnf_json,
- "cnf": cnfvar_old.write_cnf
- }
- SET_CNF_METHODS[kind](cnf, out=fd)
- except KeyError:
- raise ValueError("Invalid set_cnf method \"%s\"; expected \"json\" or \"cnf\""
- % kind)
- finally:
- fd.close()
- log.info("Generated config file %s", config_path)
-
- kind = "cnf" if kind != "json" else kind
- set_cnf([config_path], kind=kind, timeout=timeout, vm=vm)
-
-
-def set_cnf_pipe(cnf, timeout=30, block=False):
- """
- Set local configuration by talking to arnied via ``set_cnf``.
-
- :param cnf: one key with the same value as *kind* and a list of cnfvars as value
- :type cnf: {str, str}
- :param int timeout: arnied run verification timeout
- :param bool block: whether to wait for generate to complete the
- configuration change
- :returns: whether ``set_cnf`` succeeded or not
- :rtype: bool
-
- This is obviously not generic but supposed to be run on the guest.
- """
- log.info("Setting arnied configuration through local pipe")
- wait_for_arnied(timeout=timeout)
-
- st, out, exit = sysmisc.run_cmd_with_pipe([BIN_SET_CNF, "-j"], inp=str(cnf))
-
- if st is False:
- log.error("Error applying configuration; status=%r" % exit)
- log.error("and stderr:\n%s" % out)
- return False
- log.debug("Configuration successfully passed to set_cnf, "
- "read %d B from pipe" % len(out))
-
- if block is True:
- log.debug("Waiting for config job to complete")
- wait_for_generate()
-
- log.debug("Exiting sucessfully")
- return True
-
-
-def prep_config_paths(config_files, config_dir=None):
- """
- Prepare absolute paths for all configs at an expected location.
-
- :param config_files: config files to use for the configuration
- :type config_files: [str]
- :param config_dir: config directory to prepend to the filepaths
- :type config_dir: str or None
- :returns: list of the full config paths
- :rtype: [str]
- """
- if config_dir is None:
- config_dir = SRC_CONFIG_DIR
- config_paths = []
- for config_file in config_files:
- if os.path.isabs(config_file):
- # Absolute path: The user requested a specific file
- # f.e. needed for dynamic arnied config update
- config_path = config_file
- else:
- config_path = os.path.join(os.path.abspath(config_dir),
- config_file)
- logging.debug("Using %s for original path %s", config_path, config_file)
- config_paths.append(config_path)
- return config_paths
-
-
-def prep_cnf_value(config_file, value,
- regex=None, template_key=None, ignore_fail=False):
- """
- Replace value in a provided arnied config file.
-
- :param str config_file: file to use for the replacement
- :param str value: value to replace the first matched group with
- :param regex: regular expression to use when replacing a cnf value
- :type regex: str or None
- :param template_key: key of a quick template to use for the regex
- :type template_key: str or None
- :param bool ignore_fail: whether to ignore regex mismatching
- :raises: :py:class:`ValueError` if (also default) `regex` doesn't have a match
-
- In order to ensure better matching capabilities you are supposed to
- provide a regex pattern with at least one subgroup to match your value.
- What this means is that the value you like to replace is not directly
- searched into the config text but matched within a larger regex in
- in order to avoid any mismatch.
-
- Example:
- provider.cnf, 'PROVIDER_LOCALIP,0: "(\d+)"', 127.0.0.1
- """
- if template_key is None:
- pattern = regex.encode()
- else:
- samples = {"provider": 'PROVIDER_LOCALIP,\d+: "(\d+\.\d+\.\d+\.\d+)"',
- "global_destination_addr": 'SPAMFILTER_GLOBAL_DESTINATION_ADDR,0: "bounce_target@(.*)"'}
- pattern = samples[template_key].encode()
-
- with open(config_file, "rb") as file_handle:
- text = file_handle.read()
- match_line = re.search(pattern, text)
-
- if match_line is None and not ignore_fail:
- raise ValueError("Pattern %s not found in %s" % (pattern, config_file))
- elif match_line is not None:
- old_line = match_line.group(0)
- text = text[:match_line.start(1)] + value.encode() + text[match_line.end(1):]
- line = re.search(pattern, text).group(0)
- log.debug("Updating %s to %s in %s", old_line, line, config_file)
- with open(config_file, "wb") as file_handle:
- file_handle.write(text)
-
-
-def prep_cnf(config_files, params_dict, regex_dict=None):
- """
- Update all config files with the default overriding parameters,
- i.e. override the values hard-coded in those config files.
-
- :param config_files: config files to use for the configuration
- :type config_files: [str]
- :param params_dict: parameters to override the defaults in the config files
- :type params_dict: {str, str}
- :param regex_dict: regular expressions to use for matching the overriden parameters
- :type regex_dict: {str, str} or None
- :returns: list of prepared (modified) config paths
- :rtype: [str]
- """
- log.info("Preparing %s template config files", len(config_files))
-
- src_config_paths = prep_config_paths(config_files)
- new_config_paths = []
- for config_path in src_config_paths:
- new_config_path = generate_config_path(dumped=True)
- shutil.copy(config_path, new_config_path)
- new_config_paths.append(new_config_path)
-
- for config_path in new_config_paths:
- for param_key in params_dict.keys():
- if regex_dict is None:
- regex_val = "\s+%s,\d+: \"(.*)\"" % param_key.upper()
- elif param_key in regex_dict.keys():
- regex_val = regex_dict[param_key] % param_key.upper()
- elif re.match("\w*_\d+$", param_key):
- final_parameter, parent_id = \
- re.match("(\w*)_(\d+)$", param_key).group(1, 2)
- regex_val = "\(%s\) %s,\d+: \"(.*)\"" \
- % (parent_id, final_parameter.upper())
- log.debug("Requested regex for %s is '%s'",
- param_key, regex_val)
- else:
- regex_val = "\s+%s,\d+: \"(.*)\"" % param_key.upper()
- prep_cnf_value(config_path, params_dict[param_key],
- regex=regex_val, ignore_fail=True)
- log.info("Prepared template config file %s", config_path)
-
- return new_config_paths
-
-
-def generate_config_path(dumped=False):
- """
- Generate path for a temporary config name.
-
- :param bool dumped: whether the file should be in the dump
- directory or in temporary directory
- :returns: generated config file path
- :rtype: str
- """
- dir = os.path.abspath(DUMP_CONFIG_DIR) if dumped else None
- fd, filename = tempfile.mkstemp(suffix=".cnf", dir=dir)
- os.close(fd)
- os.unlink(filename)
- return filename
-
-
-# enum
-Delete = 0
-Update = 1
-Add = 2
-Child = 3
-
-
-def batch_update_cnf(cnf, vars):
- """
- Perform a batch update of multiple cnf variables.
-
- :param cnf: CNF variable to update
- :type cnf: BuildCnfVar object
- :param vars: tuples of enumerated action and subtuple with data
- :type vars: [(int, (str, int, str))]
- :returns: updated CNF variable
- :rtype: BuildCnfVar object
-
- The actions are indexed in the same order: delete, update, add, child.
- """
- last = 0
- for (action, data) in vars:
- if action == Update:
- var, ref, val = data
- last = cnf.update_cnf(var, ref, val)
- elif action == Add:
- var, ref, val = data
- last = cnf.add_cnf(var, ref, val)
- elif action == Delete:
- last = cnf.del_cnf(data)
- elif action == Child: # only one depth supported
- var, ref, val = data
- # do not update last
- cnf.add_cnf(var, ref, val, different_parent_line_no=last)
- return cnf
-
-
-def build_cnf(kind, instance=0, vals=[], data="", filename=None):
- """
- Build a CNF variable and save it in a config file.
-
- :param str kind: name of the CNF variable
- :param int instance: instance number of the CNF variable
- :param vals: tuples of enumerated action and subtuple with data
- :type vals: [(int, (str, int, str))]
- :param str data: data for the CNF variable
- :param filename: optional custom name of the config file
- :type filename: str or None
- :returns: name of the saved config file
- :rtype: str
- """
- builder = build_cnfvar.BuildCnfVar(kind, instance=instance, data=data)
- batch_update_cnf(builder, vals)
- filename = generate_config_path(dumped=True) if filename is None else filename
- [filename] = prep_config_paths([filename], DUMP_CONFIG_DIR)
- builder.save(filename)
- return filename
-
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""Basic functions for on-the-fly arnied cnf-var generator.
-
-.. note:: DEPRECATED! Please do not extend this or add new uses of this module,
- use :py:mod:`pyi2ncommon.arnied_api` or :py:mod:`pyi2ncommon.cnfvar`
- instead!
-"""
-
-from .cnfline import CnfLine
-
-
-class BuildCnfVar(object):
- """Basic one the fly arnied cnfvar generator"""
- def __init__(self, name, instance=0, data='', line_no=1):
- self._parent = CnfLine(name, instance, data, line_no)
- self._children = {} # key: lineno, value: CnfLine
-
- def __getattr__(self, name):
- # Redirect all unknown attributes to "parent" cnfline
- return getattr(self._parent, name)
-
- def find_child_line_no(self, name):
- """Look for child line number of child NAME"""
- for lineno, cnfline in self._children.items():
- if cnfline.name == name:
- return lineno
-
- # Not found
- return 0
-
- def find_free_line_no(self):
- """Find highest currently in use line number"""
- highest_line_no = self.line_no
-
- for line_no, unused in self._children.items():
- if line_no > highest_line_no:
- highest_line_no = line_no
-
- return highest_line_no+1
-
- def find_free_child_instance(self, name):
- """Find next free child instance of type NAME"""
- highest_instance = -1
-
- for unused, cnfline in self._children.items():
- if cnfline.name == name and cnfline.instance > highest_instance:
- highest_instance = cnfline.instance
-
- return highest_instance+1
-
- def update_cnf(self,
- name,
- instance,
- data,
- different_parent_line_no=0,
- force_append=False):
- """Update existing cnfline or create new one"""
- if not force_append:
- child_line_no = self.find_child_line_no(name)
- else:
- child_line_no = 0
-
- if child_line_no == 0:
- child_line_no = self.find_free_line_no()
-
- if instance == -1:
- instance = self.find_free_child_instance(name)
-
- parent_line_no = self._parent.line_no
- if different_parent_line_no:
- parent_line_no = different_parent_line_no
-
- new_line = CnfLine(name,
- instance,
- data,
- child_line_no,
- parent_line_no)
-
- self._children[child_line_no] = new_line
- return child_line_no
-
- def mark_as_own_parent(self, child_line_no):
- """Remove parent <-> child relationship for
- a given cnf line. We use this heavily
- for the *configure_xxx.py* files"""
- self._children[child_line_no].parent_line_no = 0
-
- def add_cnf(self,
- name,
- instance,
- data,
- different_parent_line_no=0):
- return self.update_cnf(name,
- instance,
- data,
- different_parent_line_no,
- force_append=True)
-
- def del_cnf(self, name):
- """Delete cnfline with name"""
- for lineno, cnfline in list(self._children.items()):
- if cnfline.name == name:
- del self._children[lineno]
-
- def add_defaults(self, defaults=None):
- """Add default values from a simple dictionary"""
- if defaults is None:
- return
-
- for name, value in defaults.items():
- self.update_cnf(name, 0, value)
-
- def __str__(self):
- rtn = str(self._parent) + '\n'
- for unused, value in self._children.items():
- rtn = rtn + str(value) + '\n'
-
- return rtn
-
- def save(self, filename):
- """Save string representation to disk."""
- with open(filename, 'w') as out:
- out.write(str(self))
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-from .build_cnfvar import BuildCnfVar
-"""Class to create group objects on the fly"""
-
-
-class BuildGroup(BuildCnfVar):
-
- def __init__(self, data='', instance=0, line_no=1):
- BuildCnfVar.__init__(self, 'GROUP', instance, data, line_no)
-
- # the bare defaults the UI adds upon
- # creation of new groups
- defaults = {
- 'GROUP_COMMENT': '',
- }
-
- self.add_defaults(defaults)
-
- def comment(self, comment):
- self.update_cnf('GROUP_COMMENT', 0, comment)
- return self
-
- def enable_activesync(self):
- self.update_cnf('GROUP_ACTIVESYNC_ENABLE', 0, '1')
- return self
-
- def enable_xauth(self):
- self.update_cnf('GROUP_XAUTH_ENABLE', 0, '1')
- return self
-
- def enable_go_online(self):
- self.update_cnf('GROUP_ACCESS_GO_ONLINE_ALLOWED', 0, '1')
- return self
-
- def enable_remote_administration(self):
- self.update_cnf('GROUP_ACCESS_REMOTE_ADMINISTRATION_ALLOWED', 0, '1')
- return self
-
- def email_quota(self, quota):
- self.update_cnf('GROUP_EMAIL_QUOTA', 0, quota)
- return self
-
- def email_relay_rights_block_relay(self):
- self.update_cnf('GROUP_EMAIL_RELAY_RIGHTS', 0, 'BLOCK_RELAY')
- return self
-
- def email_relay_rights_from_intranet(self):
- self.update_cnf('GROUP_EMAIL_RELAY_RIGHTS', 0, 'RELAY_FROM_INTRANET')
- return self
-
- def email_relay_rights_from_everywhere(self):
- self.update_cnf('GROUP_EMAIL_RELAY_RIGHTS', 0, 'RELAY_FROM_EVERYWHERE')
- return self
-
- def emailfilter_ban_filterlist_ref(self, filterlist_ref):
- self.update_cnf('GROUP_EMAILFILTER_BAN_FILTERLIST_REF', 0,
- filterlist_ref)
- return self
-
- def proxy_profile_ref(self, profile_ref):
- self.update_cnf('GROUP_PROXY_PROFILE_REF', 0, profile_ref)
- return self
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""
-Create intraclients.
-"""
-
-from . import build_cnfvar
-
-default_intraclient_name = "sample client"
-default_intraclient_instance = 1
-default_cnfvars = {
- "INTRACLIENT_COMMENT": default_intraclient_name + " comment",
- "INTRACLIENT_DNS_RELAYING_ALLOWED": "1",
- "INTRACLIENT_EMAIL_RELAYING_ALLOWED": "1",
- "INTRACLIENT_FIREWALL_RULESET_REF": "5",
- "INTRACLIENT_IP": None,
- "INTRACLIENT_MAC": None,
- "INTRACLIENT_PROXY_PROFILE_REF": "-1",
-}
-
-
-class BuildIntraclient(build_cnfvar.BuildCnfVar):
-
- def __init__(self,
- data=default_intraclient_name,
- instance=default_intraclient_instance,
- line_no=1,
- ip="192.168.0.42",
- mac=None,
- alias=None):
- build_cnfvar.BuildCnfVar.__init__(self,
- "INTRACLIENT",
- instance,
- data, line_no)
- self.add_defaults(default_cnfvars)
-
- self.update_cnf("INTRACLIENT_IP", 0, ip)
- self.update_cnf("INTRACLIENT_MAC", 0, mac or "")
-
- if alias is not None:
- self.update_cnf("INTRACLIENT_ALIAS", 0, alias)
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-from .build_cnfvar import BuildCnfVar
-"""Class to create own keys cnfvar on the fly """
-
-
-class BuildKey(BuildCnfVar):
- def __init__(self, data='sample key', instance=0, line_no=1):
- BuildCnfVar.__init__(self, 'KEY_OWN', instance, data, line_no)
-
- # set some dummy data for cnf_check
- defaults = {
- 'KEY_OWN_FINGERPRINT_MD5': '76:3B:CF:8E:CB:BF:A5:7D:CC:87:39:FA:CE:99:2E:96',
- 'KEY_OWN_FINGERPRINT_SHA1': 'ED:5A:C6:D9:5B:BE:47:1F:B9:4F:CF:A3:80:3B:42:08:F4:00:16:96',
- 'KEY_OWN_ID_X509': 'CN=some.net.lan',
- 'KEY_OWN_ISSUER': 'CN=ab, C=fd, L=ab, ST=ab, O=ab, OU=ab/emailAddress=ab@ab.com',
- 'KEY_OWN_KEYSIZE': '2048',
- 'KEY_OWN_HASH_ALGO': 'SHA2_256',
- 'KEY_OWN_PRIVATE_KEY': '-----BEGIN PRIVATE KEY-----\\nMIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKk' +
- 'ZTzqHXg41RZMiY+ywRZ037pBq8J3BkH\\n-----END PRIVATE KEY-----\\n',
- 'KEY_OWN_PUBLIC_KEY': '-----BEGIN CERTIFICATE-----\\nMIIFCTCCAvGgAwIBAgIEVBGDuTANBgkqhkiG' +
- '9w0BAQsFADCBgTEPMA0GA1UEAwwG\\n-----END CERTIFICATE-----\\n',
- 'KEY_OWN_REQUEST': '-----BEGIN CERTIFICATE REQUEST-----\\nMIIDCzCCAfMCAQAwIjEgMB4GA1UEAww' +
- 'XaW50cmFkZXYtYWllc\\n-----END CERTIFICATE REQUEST-----\\n',
- 'KEY_OWN_SUBJECT': 'CN=some.net.lan',
- 'KEY_OWN_VALIDFROM': '20140911T111257',
- 'KEY_OWN_VALIDTILL': '20160731T134608',
- 'KEY_OWN_TYPE': 'SELF',
-
- # the ones bellow should be set when using 'generate' to create the key
- 'KEY_OWN_CREATE_CN': 'somehost',
- 'KEY_OWN_CREATE_EMAIL': 'default@intra2net.com'
- }
-
- self.add_defaults(defaults)
-
- def country(self, country):
- self.update_cnf('KEY_OWN_CREATE_C', 0, country)
- return self
-
- def state(self, state):
- self.update_cnf('KEY_OWN_CREATE_ST', 0, state)
- return self
-
- def city(self, city):
- self.update_cnf('KEY_OWN_CREATE_L', 0, city)
- return self
-
- def company(self, company):
- self.update_cnf('KEY_OWN_CREATE_O', 0, company)
- return self
-
- def department(self, department):
- self.update_cnf('KEY_OWN_CREATE_OU', 0, department)
- return self
-
- def computer_name(self, computer_name):
- self.update_cnf('KEY_OWN_CREATE_CN', 0, computer_name)
- return self
-
- def email(self, email):
- self.update_cnf('KEY_OWN_CREATE_EMAIL', 0, email)
- return self
-
- def days(self, days):
- self.update_cnf('KEY_OWN_CREATE_DAYS', 0, days)
- return self
-
- def keysize(self, keysize):
- self.update_cnf('KEY_OWN_KEYSIZE', 0, keysize)
- return self
-
- def hash_algo(self, hash_algo):
- self.update_cnf('KEY_OWN_HASH_ALGO', 0, hash_algo)
- return self
-
- def certchain(self, certchain):
- self.update_cnf('KEY_OWN_CERTCHAIN', 0, certchain)
- return self
-
- def cerchain_count(self, cerchain_count):
- self.update_cnf('KEY_OWN_CERTCHAIN_CERTCOUNT', 0, cerchain_count)
- return self
-
- def create_subjalt(self, create_subjalt):
- self.update_cnf('KEY_OWN_CREATE_SUBJALT', 0, create_subjalt)
- return self
-
- def create_subjalt_type(self, create_subjalt_type):
- self.update_cnf('KEY_OWN_CREATE_SUBJALT_TYPE', 0, create_subjalt_type)
- return self
-
- def fingerprint_md5(self, fingerprint_md5):
- self.update_cnf('KEY_OWN_FINGERPRINT_MD5', 0, fingerprint_md5)
- return self
-
- def fingerprint_sha1(self, fingerprint_sha1):
- self.update_cnf('KEY_OWN_FINGERPRINT_SHA1', 0, fingerprint_sha1)
- return self
-
- def id_x509(self, id_x509):
- self.update_cnf('KEY_OWN_ID_X509', 0, id_x509)
- return self
-
- def issuer(self, issuer):
- self.update_cnf('KEY_OWN_ISSUER', 0, issuer)
- return self
-
- def private_key(self, private_key):
- self.update_cnf('KEY_OWN_PRIVATE_KEY', 0, private_key)
- return self
-
- def public_key(self, public_key):
- self.update_cnf('KEY_OWN_PUBLIC_KEY', 0, public_key)
- return self
-
- def request(self, request):
- self.update_cnf('KEY_OWN_REQUEST', 0, request)
- return self
-
- def subject(self, subject):
- self.update_cnf('KEY_OWN_SUBJECT', 0, subject)
- return self
-
- def subject_alt(self, subject_alt):
- self.update_cnf('KEY_OWN_SUBJECT_ALT', 0, subject_alt)
- return self
-
- def key_type(self, key_type):
- self.update_cnf('KEY_OWN_TYPE', 0, key_type)
- return self
-
- def valid_from(self, valid_from):
- self.update_cnf('KEY_OWN_VALIDFROM', 0, valid_from)
- return self
-
- def valid_till(self, valid_till):
- self.update_cnf('KEY_OWN_VALIDTILL', 0, valid_till)
- return self
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-from .build_cnfvar import BuildCnfVar
-"""Class to create user cnfvar objects on the fly"""
-
-
-class BuildNIC(BuildCnfVar):
-
- def __init__(self, data='', instance=0, line_no=1):
- BuildCnfVar.__init__(self, 'NIC', instance, '', line_no)
-
- # the bare defaults the UI adds upon
- # creation of new groups
- defaults = {
- 'NIC_COMMENT': data,
- 'NIC_DRIVER': 'virtio_net',
- 'NIC_LAN_DNS_RELAYING_ALLOWED': "0",
- 'NIC_LAN_EMAIL_RELAYING_ALLOWED': "0",
- 'NIC_LAN_FIREWALL_RULESET_REF': "1",
- 'NIC_LAN_IP': "192.168.1.1",
- 'NIC_LAN_NAT_INTO': "0",
- 'NIC_LAN_NETMASK': "255.255.255.0",
- 'NIC_LAN_PROXY_PROFILE_REF': "-1",
- 'NIC_MAC': '02:00:00:00:20:00',
- 'NIC_TYPE': 'DSLROUTER',
- }
-
- self.add_defaults(defaults)
-
- def comment(self, comment):
- self.update_cnf('NIC_COMMENT', 0, comment)
- return self
-
- def nic_type(self, nic_type):
- self.update_cnf('NIC_TYPE', 0, nic_type)
- return self
-
- def lan_ip(self, lan_ip):
- self.update_cnf('NIC_LAN_IP', 0, lan_ip)
- return self
-
- def add_group_member_ref(self, group_ref):
- self.add_cnf('USER_GROUP_MEMBER_REF', -1, group_ref)
- return self
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""
-Create provider profiles.
-
-.. note:: DEPRECATED! Please do not extend this or add new uses of this module,
- use :py:mod:`pyi2ncommon.arnied_api` or :py:mod:`pyi2ncommon.cnfvar`
- instead!
-"""
-
-from . import build_cnfvar
-
-# Defaults are extracted from data/shared_arnied/provider.cnf.
-default_provider_name = "sample_provider"
-default_provider_instance = 1
-default_cnfvars = {
- "PROVIDER_PROXY_SERVER": "",
- "PROVIDER_PROXY_PORT": "",
- "PROVIDER_PROXY_PASSWORD": "",
- "PROVIDER_PROXY_LOGIN": "",
- "PROVIDER_NIC_REF": "1",
- "PROVIDER_NETMASK": "255.255.0.0",
- "PROVIDER_MTU_SIZE": "1500",
- "PROVIDER_MODE": "ROUTER",
- "PROVIDER_MAILTRANSFER_MODE": "IMMEDIATE",
- "PROVIDER_LOCALIP": "",
- "PROVIDER_IP": "",
- "PROVIDER_FIREWALL_RULESET_REF": "7",
- "PROVIDER_FALLBACK_TIMEOUT": "60",
- "PROVIDER_FALLBACK_PROVIDER_REF": "-1",
- "PROVIDER_EMAIL_RELAY_REF": "-1",
- "PROVIDER_DYNDNS_WEBCHECKIP": "0",
- "PROVIDER_DYNDNS_ENABLE": "1",
- "PROVIDER_DNS_MODE": "IP",
- "PROVIDER_DNS": "",
- "PROVIDER_BWIDTH_MANAGEMENT_UPSTREAM_SPEED": "",
- "PROVIDER_BWIDTH_MANAGEMENT_ENABLE": "0",
- "PROVIDER_BWIDTH_MANAGEMENT_DOWNSTREAM_SPEED": "",
- "PROVIDER_PINGCHECK_SERVERLIST_REF": "-2",
-}
-
-
-class BuildProvider(build_cnfvar.BuildCnfVar):
-
- def __init__(self,
- data=default_provider_name,
- instance=default_provider_instance,
- line_no=1,
- mode="ROUTER",
- dns=None,
- ip=None,
- localip=None):
- build_cnfvar.BuildCnfVar.__init__(self,
- "PROVIDER",
- instance,
- data,
- line_no)
- self.add_defaults(default_cnfvars)
- self.update_cnf("PROVIDER_MODE", 0, mode)
-
- if dns is not None:
- self.update_cnf("PROVIDER_DNS", 0, dns)
- if ip is not None:
- self.update_cnf("PROVIDER_IP", 0, ip)
- if localip is not None:
- self.update_cnf("PROVIDER_LOCALIP", 0, localip)
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""
-.. note:: DEPRECATED! Please do not extend this or add new uses of this module,
- use :py:mod:`pyi2ncommon.arnied_api` or :py:mod:`pyi2ncommon.cnfvar`
- instead!
-"""
-
-from .build_cnfvar import BuildCnfVar
-"""Class to create proxy accesslists on the fly"""
-
-
-class BuildProxyAccesslist(BuildCnfVar):
- def __init__(self, data='', instance=0, line_no=1):
- BuildCnfVar.__init__(self, 'PROXY_ACCESSLIST', instance, data, line_no)
-
- defaults = {'PROXY_ACCESSLIST_ENTRY_COUNT': '123',
- 'PROXY_ACCESSLIST_MODE': '1',
- 'PROXY_ACCESSLIST_SIZETYPE': 1,
- 'PROXY_ACCESSLIST_TYPE': 0}
-
- self.add_defaults(defaults)
-
- def mode_whitelist(self):
- self.update_cnf('PROXY_ACCESSLIST_MODE', 0, '0')
- return self
-
- def mode_blacklist(self):
- self.update_cnf('PROXY_ACCESSLIST_MODE', 0, '1')
- return self
-
- def add_url(self, url):
- self.add_cnf('PROXY_ACCESSLIST_URL', -1, url)
- return self
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""
-.. note:: DEPRECATED! Please do not extend this or add new uses of this module,
- use :py:mod:`pyi2ncommon.arnied_api` or :py:mod:`pyi2ncommon.cnfvar`
- instead!
-"""
-
-from .build_cnfvar import BuildCnfVar
-"""Class to create proxy profiles on the fly"""
-
-
-class BuildProxyProfile(BuildCnfVar):
- def __init__(self, data='', instance=0, line_no=1):
- BuildCnfVar.__init__(self, 'PROXY_PROFILE', instance, data, line_no)
-
- def add_accesslist_ref(self, accesslist_instance):
- self.add_cnf('PROXY_PROFILE_ACCESSLIST_REF', -1, accesslist_instance)
- return self
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""
-.. note:: DEPRECATED! Please do not extend this or add new uses of this module,
- use :py:mod:`pyi2ncommon.arnied_api` or :py:mod:`pyi2ncommon.cnfvar`
- instead!
-"""
-
-from .build_cnfvar import BuildCnfVar
-"""Class to create user cnfvar objects on the fly"""
-
-
-class BuildUser(BuildCnfVar):
-
- def __init__(self, data='', instance=0, line_no=1):
- BuildCnfVar.__init__(self, 'USER', instance, data, line_no)
-
- # the bare defaults the UI adds upon
- # creation of new groups
- defaults = {
- 'USER_DISABLED': '0',
- 'USER_FULLNAME': '',
- 'USER_GROUPWARE_FOLDER_CALENDAR': 'INBOX/Calendar',
- 'USER_GROUPWARE_FOLDER_CONTACTS': 'INBOX/Contacts',
- 'USER_GROUPWARE_FOLDER_DRAFTS': 'INBOX/Drafts',
- 'USER_GROUPWARE_FOLDER_NOTES': 'INBOX/Notes',
- 'USER_GROUPWARE_FOLDER_OUTBOX': 'INBOX/Sent Items',
- 'USER_GROUPWARE_FOLDER_TASKS': 'INBOX/Tasks',
- 'USER_GROUPWARE_FOLDER_TRASH': 'INBOX/Deleted Items',
- # always a member of the 'Alle' group
- 'USER_GROUP_MEMBER_REF': '2',
- 'USER_LOCALE': '',
- 'USER_PASSWORD': 'test1234',
- 'USER_TRASH_DELETEDAYS': '30',
- 'USER_WEBMAIL_MESSAGES_PER_PAGE': '25',
- 'USER_WEBMAIL_SIGNATURE': '',
- }
-
- self.add_defaults(defaults)
-
- def disabled(self, disabled='1'):
- self.update_cnf('USER_DISABLED', 0, disabled)
- return self
-
- def fullname(self, fullname):
- self.update_cnf('USER_FULLNAME', 0, fullname)
- return self
-
- def password(self, password):
- self.update_cnf('USER_PASSWORD', 0, password)
- return self
-
- def normal_email(self):
- self.update_cnf('USER_EMAIL_FORWARD_ENABLE', 0, 'NONE')
- return self
-
- def forward_email(self, email):
- self.update_cnf('USER_EMAIL_FORWARD_ENABLE', 0, 'FORWARD')
-
- addr = self.update_cnf('USER_EMAIL_FORWARD_ADDRESS', 0, '')
- self.update_cnf('USER_EMAIL_FORWARD_ADDRESS_ADDR', 0, email, addr)
- return self
-
- def copy_email(self, email):
- self.update_cnf('USER_EMAIL_FORWARD_ENABLE', 0, 'COPY')
-
- addr = self.update_cnf('USER_EMAIL_FORWARD_ADDRESS', 0, '')
- self.update_cnf('USER_EMAIL_FORWARD_ADDRESS_ADDR', 0, email, addr)
- return self
-
- def add_group_member_ref(self, group_ref):
- self.add_cnf('USER_GROUP_MEMBER_REF', -1, group_ref)
- return self
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""
-Create vpn connections.
-
-.. note:: DEPRECATED! Please do not extend this or add new uses of this module,
- use :py:mod:`pyi2ncommon.arnied_api` or :py:mod:`pyi2ncommon.cnfvar`
- instead!
-"""
-
-from . import build_cnfvar
-
-# Defaults are extracted from data/shared_arnied/vpnconn.cnf.
-default_vpnconn_name = "sample_vpn"
-default_vpnconn_instance = 1
-default_cnfvars = {
- "VPNCONN_ACTIVATION": "ALWAYS",
- "VPNCONN_DISABLED": "0",
- "VPNCONN_DNS_RELAYING_ALLOWED": "1",
- "VPNCONN_EMAIL_RELAYING_ALLOWED": "1",
- "VPNCONN_ENCRYPTION_PROFILE_REF": "0",
- "VPNCONN_FIREWALL_RULESET_REF": "5",
- "VPNCONN_IKE_VERSION": "1",
- "VPNCONN_KEY_FOREIGN_REF": "1",
- "VPNCONN_KEY_OWN_REF": "1",
- "VPNCONN_KEY_TYPE": "PUBLIC",
- "VPNCONN_LAN_NAT_IP": "",
- "VPNCONN_LAN_NAT_MODE": "UNMODIFIED",
- "VPNCONN_LAN_NAT_NETWORK": "",
- "VPNCONN_LAN_NIC_REF": "2",
- "VPNCONN_LAN_NET": "172.17.0.0",
- "VPNCONN_LAN_NETMASK": "255.255.0.0",
- "VPNCONN_LAN_TYPE": "NIC",
- "VPNCONN_LIFETIME_IKE": "480",
- "VPNCONN_LIFETIME_IPSECSA": "60",
- "VPNCONN_OFFLINE_DETECTION_SEC": "60",
- "VPNCONN_PEER_DNS": "",
- "VPNCONN_PEER_IP": None,
- "VPNCONN_PEER_TYPE": "IP",
- "VPNCONN_PROXY_PROFILE_REF": "-2",
- "VPNCONN_PSK": "",
- "VPNCONN_PSK_FOREIGN_ID": "",
- "VPNCONN_PSK_FOREIGN_ID_TYPE": "IP",
- "VPNCONN_PSK_OWN_ID": "",
- "VPNCONN_PSK_OWN_ID_TYPE": "IP",
- "VPNCONN_REMOTE_INET_NAT": "1",
- "VPNCONN_REMOTE_MODECONFIG_IP": "192.168.99.1",
- "VPNCONN_REMOTE_NAT_ENABLE": "0",
- "VPNCONN_REMOTE_NAT_NETWORK": "",
- "VPNCONN_REMOTE_NET": "172.18.0.0",
- "VPNCONN_REMOTE_NETMASK": "255.255.0.0",
- "VPNCONN_REMOTE_TYPE": "CUSTOM",
- "VPNCONN_RETRIES": "3",
- "VPNCONN_SECURED": "ESP",
- "VPNCONN_XAUTH_SERVER_ENABLE": "0"
-}
-
-
-class BuildVPNConn(build_cnfvar.BuildCnfVar):
-
- def __init__(self,
- data=default_vpnconn_name,
- instance=default_vpnconn_instance,
- line_no=1,
- peer_ip="172.16.1.172"):
- build_cnfvar.BuildCnfVar.__init__(self,
- "VPNCONN",
- instance,
- data,
- line_no)
- self.add_defaults(default_cnfvars)
-
- self.update_cnf("VPNCONN_PEER_IP", 0, peer_ip)
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""
-.. note:: DEPRECATED! Please do not extend this or add new uses of this module,
- use :py:mod:`pyi2ncommon.arnied_api` or :py:mod:`pyi2ncommon.cnfvar`
- instead!
-"""
-
-class CnfLine(object):
- """Represents an arnied cnfline"""
-
- def __init__(self,
- name='',
- instance=0,
- data='',
- line_no=1,
- parent_line_no=0):
- self.name = name
- self.instance = instance
- self.data = data
- self.line_no = line_no
- self.parent_line_no = parent_line_no
-
- if len(self.name) == 0:
- raise ValueError("You can't leave the cnfvar name empty")
-
- if line_no == 0:
- raise ValueError('Zero is not a valid line number')
-
- def __str__(self):
- """Build cnfline string representation"""
-
- # Sanity checks
- if len(self.name) == 0:
- raise ValueError("Can't display empty cnfvar name")
- if self.line_no == 0:
- raise ValueError('Zero is not a valid line number')
-
- if self.parent_line_no:
- rtn = '{0} ({1})'.format(self.line_no, self.parent_line_no)
- else:
- rtn = '{0}'.format(self.line_no)
-
- rtn += ' {0},{1}: "{2}"'.format(self.name, self.instance, self.data)
-
- return rtn
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""
-.. note:: DEPRECATED! Please do not extend this or add new uses of this module,
- use :py:mod:`pyi2ncommon.arnied_api` or :py:mod:`pyi2ncommon.cnfvar`
- instead!
-"""
-
-from .build_cnfvar import BuildCnfVar
-"""Class to configure the proxy antivirus cnfvars on the fly"""
-
-
-class ConfigureProxyAntivirus(BuildCnfVar):
- def __init__(self, enabled='1', line_no=1):
- BuildCnfVar.__init__(self, 'VIRSCAN_PROXY_ENABLE',
- 0, enabled, line_no)
-
- def automatic_unblock(self, block_minutes):
- line_no = self.update_cnf('VIRSCAN_PROXY_AUTOMATIC_UNBLOCK', 0, block_minutes)
- self.mark_as_own_parent(line_no)
- return self
-
- def debug_log(self, enabled='1'):
- line_no = self.update_cnf('VIRSCAN_PROXY_DEBUG_LOG', 0, enabled)
- self.mark_as_own_parent(line_no)
- return self
-
- def add_pass_site(self, url):
- sites_parent = self.update_cnf('VIRSCAN_PROXY_PASS_SITES', 0, '')
- self.mark_as_own_parent(sites_parent)
-
- self.add_cnf('VIRSCAN_PROXY_PASS_SITES_ADDR', -1, url, sites_parent)
- return self
-
- def clear_pass_sites(self):
- self.del_cnf('VIRSCAN_PROXY_PASS_SITES_ADDR')
- self.del_cnf('VIRSCAN_PROXY_PASS_SITES')
- return self
-
- def warn_admin(self, enabled='1'):
- line_no = self.update_cnf('VIRSCAN_PROXY_WARN_ADMIN', 0, enabled)
- self.mark_as_own_parent(line_no)
- return self
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""
-.. note:: DEPRECATED! Please do not extend this or add new uses of this module,
- use :py:mod:`pyi2ncommon.arnied_api` or :py:mod:`pyi2ncommon.cnfvar`
- instead!
-"""
-
-from .build_cnfvar import BuildCnfVar
-"""Class to configure the webfilter cnfvars on the fly"""
-
-
-class ConfigureWebfilter(BuildCnfVar):
- def __init__(self, enabled='1', line_no=1):
- BuildCnfVar.__init__(self, 'PROXY_WEBFILTER_ENABLE',
- 0, enabled, line_no)
-
- def block_drugs(self, enabled='1'):
- line_no = self.update_cnf('PROXY_WEBFILTER_BLOCK_DRUGS', 0, enabled)
- self.mark_as_own_parent(line_no)
- return self
-
- def block_gambling(self, enabled='1'):
- line_no = self.update_cnf('PROXY_WEBFILTER_BLOCK_GAMBLING', 0, enabled)
- self.mark_as_own_parent(line_no)
- return self
-
- def block_pornography(self, enabled='1'):
- line_no = self.update_cnf('PROXY_WEBFILTER_BLOCK_PORNOGRAPHY',
- 0, enabled)
- self.mark_as_own_parent(line_no)
- return self
-
- def block_violence(self, enabled='1'):
- line_no = self.update_cnf('PROXY_WEBFILTER_BLOCK_VIOLENCE', 0, enabled)
- self.mark_as_own_parent(line_no)
- return self
-
- def block_warez(self, enabled='1'):
- line_no = self.update_cnf('PROXY_WEBFILTER_BLOCK_WAREZ', 0, enabled)
- self.mark_as_own_parent(line_no)
- return self
-
- def add_pass_site(self, url):
- sites_parent = self.update_cnf('PROXY_WEBFILTER_PASS_SITES', 0, '')
- self.mark_as_own_parent(sites_parent)
-
- self.add_cnf('PROXY_WEBFILTER_PASS_SITES_URL', -1, url, sites_parent)
- return self
-
- def clear_pass_sites(self):
- self.del_cnf('PROXY_WEBFILTER_PASS_SITES_URL')
- self.del_cnf('PROXY_WEBFILTER_PASS_SITES')
- return self
from .model import Cnf, CnfList
from .binary import CnfBinary
from .store import CnfStore, BinaryCnfStore, CommitException
+from . import templates
__all__ = ["Cnf", "CnfList", "CnfBinary", "CnfStore",
"BinaryCnfStore", "CommitException"]
import json
-from .. import cnfvar_old, arnied_api
+from . import string
+from .. import arnied_api
#: value used to detect unspecified arguments
DEFAULT = object()
class CnfListSerializationMixin(BaseCnfList):
"""Add serialization support to BaseCnfList."""
- def to_cnf_structure(self, renumber=True):
+ def to_cnf_string(self, renumber=True):
"""
- Convert this list to an object meaningful to :py:mod:`cnfvar`.
+ Generate a string representation of this list in the cnfvar format.
- :param bool renumber: whether to fix up the number/ids of the CNFs
- :returns: a dictionary with the converted values
- :rtype: {str, {str, str or int}}
+ :param bool renumber: whether to fix the lineno of the cnfvars
+ :returns: the CNF string
+ :rtype: str
"""
if renumber:
self.renumber()
- return {"cnf": [x.to_cnf_structure() for x in self]}
+ return str(self)
def to_cnf_file(self, path, renumber=True, encoding=ENCODING):
"""
fp.write(self.to_json_string(renumber=renumber))
@classmethod
- def from_cnf_structure(cls, obj):
+ def _from_cnf_structure(cls, obj):
"""
- Create a list from a cnfvar object from the :py:mod:`cnfvar` module.
+ Create a list from a JSON structure obtainable from `get_cnf --json`.
:param obj: an object as defined in the :py:mod:`cnfvar`
:type obj: {str, {str, str or int}}
:returns: a list of cnfvars
:rtype: :py:class:`CnfList`
"""
- return cls(map(Cnf.from_cnf_structure, obj["cnf"]))
+ return cls(map(Cnf._from_cnf_structure, obj["cnf"]))
@classmethod
def from_cnf_string(cls, data):
:returns: a list of cnfvars
:rtype: :py:class:`CnfList`
"""
- cnf_obj = cnfvar_old.read_cnf(data)
- return CnfList.from_cnf_structure(cnf_obj)
-
- @classmethod
- def from_json_string(cls, data):
- """
- Create a list from a json string.
-
- :param str data: string to generate the list from
- :returns: a list of cnfvars
- :rtype: :py:class:`CnfList`
- """
- cnf_obj = json.loads(data)
- return CnfList.from_cnf_structure(cnf_obj)
+ cnf_obj = string.read_cnf(data)
+ return CnfList._from_cnf_structure(cnf_obj)
@classmethod
def from_cnf_file(cls, path, encoding=ENCODING):
return CnfList.from_cnf_string(fp.read())
@classmethod
+ def from_json_string(cls, data):
+ """
+ Create a list from a json string.
+
+ :param str data: string to generate the list from
+ :returns: a list of cnfvars
+ :rtype: :py:class:`CnfList`
+ """
+ cnf_obj = json.loads(data)
+ return CnfList._from_cnf_structure(cnf_obj)
+
+ @classmethod
def from_json_file(cls, path):
"""
Create a list from a json file.
class CnfSerializationMixin(BaseCnf):
"""Add serialization support to BaseCnf."""
- def to_cnf_structure(self):
- """
- Convert this instance to dictionary from the :py:mod:`cnfvar` module.
-
- :returns: the dictionary created
- :rtype: {str, str or int}
-
- .. todo:: this method is still needed because dumping cnf variables
- to strings (json or not) is still delegated to the old cnfvar module.
- """
- d = {
- "number": self.lineno,
- "varname": self.name,
- "data": self.value,
- "instance": self.instance
- }
- if self.parent and self.parent.lineno:
- d["parent"] = self.parent.lineno
- if self.comment is not None:
- d["comment"] = self.comment
- if len(self.children) > 0:
- d["children"] = [c.to_cnf_structure() for c in self.children]
- return d
-
- def to_json_string(self, renumber=True):
+ def to_cnf_string(self, renumber=True):
"""
- Convert this instance to a JSON string.
+ Generate a string representation of this list in the cnfvar format.
- :param bool renumber: whether to fix the lineno of the cnfvars
- :returns: the JSON string
+ :param bool renumber: whether to fix the lineno of this cnfvar and its children
+ :returns: the CNF string
:rtype: str
"""
- return CnfList([self]).to_json_string(renumber=renumber)
+ return CnfList([self]).to_cnf_string(renumber=renumber)
def to_cnf_file(self, path, renumber=True, encoding=ENCODING):
"""
"""
CnfList([self]).to_cnf_file(path, renumber=renumber, encoding=encoding)
+ def to_json_string(self, renumber=True):
+ """
+ Convert this instance to a JSON string.
+
+ :param bool renumber: whether to fix the lineno of the cnfvars
+ :returns: the JSON string
+ :rtype: str
+ """
+ return CnfList([self]).to_json_string(renumber=renumber)
+
def to_json_file(self, path, renumber=True):
"""
Dump a JSON representation of this instance to a file.
CnfList([self]).to_json_file(path, renumber=renumber)
@classmethod
- def from_cnf_structure(cls, obj):
+ def _from_cnf_structure(cls, obj):
"""
- Create an instance from a dictionary from the :py:mod:`cnfvar` module.
+ Create an instance from a JSON structure obtainable from `get_cnf --json`.
:param obj: dictionary to convert to this instance
:type obj: {str, str or int}
instance=obj["instance"], lineno=obj["number"],
comment=obj.get("comment", None))
for ch_obj in obj.get("children", []):
- child_cnf = Cnf.from_cnf_structure(ch_obj)
+ child_cnf = Cnf._from_cnf_structure(ch_obj)
cnf.add_child(child_cnf)
return cnf
return CnfListSerializationMixin.from_cnf_string(data).single()
@classmethod
- def from_json_string(cls, data):
+ def from_cnf_file(cls, path, encoding=ENCODING):
"""
- Create an instance of this class from a JSON string.
+ Create an instance of this class from a cnfvar file.
- :param str data: JSON string to convert
+ :param str path: path to the file to read
+ :param str encoding: encoding to use to read the file
:returns: the cnf instance created
:rtype: :py:class:`Cnf`
"""
- return CnfListSerializationMixin.from_json_string(data).single()
+ return CnfListSerializationMixin.from_cnf_file(path, encoding=encoding).single()
@classmethod
- def from_cnf_file(cls, path, encoding=ENCODING):
+ def from_json_string(cls, data):
"""
- Create an instance of this class from a cnfvar file.
+ Create an instance of this class from a JSON string.
- :param str path: path to the file to read
- :param str encoding: encoding to use to read the file
+ :param str data: JSON string to convert
:returns: the cnf instance created
:rtype: :py:class:`Cnf`
"""
- return CnfListSerializationMixin.from_cnf_file(path, encoding=encoding).single()
+ cnf_obj = json.loads(data)
+ return CnfList._from_cnf_structure(cnf_obj)
@classmethod
def from_json_file(cls, path):
--- /dev/null
+#!/usr/bin/env python
+#
+# The software in this package is distributed under the GNU General
+# Public License version 2 (with a special exception described below).
+#
+# A copy of GNU General Public License (GPL) is included in this distribution,
+# in the file COPYING.GPL.
+#
+# As a special exception, if other files instantiate templates or use macros
+# or inline functions from this file, or you compile this file and link it
+# with other works to produce a work based on this file, this file
+# does not by itself cause the resulting work to be covered
+# by the GNU General Public License.
+#
+# However the source code for this file must still be made available
+# in accordance with section (3) of the GNU General Public License.
+#
+# This exception does not invalidate any other reasons why a work based
+# on this file might be covered by the GNU General Public License.
+#
+# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
+
+"""
+string: functionality for parsing cnfvars from strings
+
+.. codeauthor:: Intra2net
+
+
+contents
+-------------------------------------------------------------------------------
+
+This module provides read and parse functionality for the Intra2net *CNF*
+format from strings and by extension cnf files.
+
+The input string takes one round-trip through the parsers and will error out on
+problematic lines. Thus, this module can also be used to syntax-check CNF data.
+
+Note that line numbers may be arbitrarily reassigned in the process. Of course,
+parent references and the relative ordering of lines will be preserved in this
+case.
+
+.. todo::
+ Decide on some facility for automatic fixup of line number values. The
+ internal representation is recursive so line numbers are not needed to
+ establish a variable hierarchy. They might as well be omitted from the json
+ input and only added when writing cnf. For the time being a lack of the
+ "number" field is interpreted as an error. Though it should at least
+ optionally be possible to omit the numbers entirely and have a function add
+ them after the fact. This might as well be added to :py:func:`is_cnf`
+ though it would be counterintuitive to have a predicate mutate its
+ argument. So maybe it could return a second argument to indicate a valid
+ structure that needs fixup or something like that.
+
+.. note::
+ The variable values of get_cnf seems to be encoded in latin1, and set_cnf
+ seems to assume latin1-encoded values (not var names). Function
+ :py:func:`read_cnf` converts this to unicode and other functions convert
+ unicode back to latin1.
+
+
+notes on Python 3 conversion
+-------------------------------------------------------------------------------
+
+Since the original *CNF* format assumes latin-1 encoded data pretty much
+exclusively, we preserve the original encoding while parsing the file.
+When assembling the data structures returned to the user, values are then
+converted to strings so they can be used naturally at the Python end.
+
+implementation
+-------------------------------------------------------------------------------
+"""
+
+import functools
+import sys
+import json
+import re
+import io
+
+
+###############################################################################
+# CONSTANTS
+###############################################################################
+
+
+CNF_FIELD_MANDATORY = set ([ "varname", "data", "instance" ])
+CNF_FIELD_OPTIONAL = set ([ "parent", "children", "comment", "number" ])
+CNF_FIELD_KNOWN = CNF_FIELD_MANDATORY | CNF_FIELD_OPTIONAL
+
+grab_parent_pattern = re.compile(b"""
+ ^ # match from start
+ \s* # optional spaces
+ \d+ # line number
+ \s+ # spaces
+ \((\d+)\) # parent
+ """,
+ re.VERBOSE)
+
+base_line_pattern = re.compile(b"""
+ ^ # match from start
+ \s* # optional spaces
+ (\d+) # line number
+ \s+ # spaces
+ ([A-Z][A-Z0-9_]*) # varname
+ \s* # optional spaces
+ , # delimiter
+ \s* # optional spaces
+ (-1|\d+) # instance
+ \s* # optional spaces
+ : # delimiter
+ \s* # optional spaces
+ \"( # quoted string (data)
+ (?: \\\" # (of escaped dquote
+ |[^\"])* # or anything not a
+ )\" # literal quote)
+ \s* # optional spaces
+ ( # bgroup
+ \# # comment leader
+ \s* # optional spaces
+ .* # string (comment)
+ )? # egroup, optional
+ $ # eol
+ """,
+ re.VERBOSE)
+
+child_line_pattern = re.compile(b"""
+ ^ # match from start
+ \s* # optional spaces
+ (\d+) # line number
+ \s+ # spaces
+ \((\d+)\) # parent
+ \s+ # spaces
+ ([A-Z][A-Z0-9_]*) # varname
+ \s* # optional spaces
+ , # delimiter
+ \s* # optional spaces
+ (-1|\d+) # instance
+ \s* # optional spaces
+ : # delimiter
+ \s* # optional spaces
+ \"([^\"]*)\" # quoted string (data)
+ \s* # optional spaces
+ ( # bgroup
+ \# # comment leader
+ \s* # optional spaces
+ .* # string (comment)
+ )? # egroup, optional
+ $ # eol
+ """,
+ re.VERBOSE)
+
+
+###############################################################################
+# HELPERS
+###############################################################################
+
+
+#
+# Sadly, the Intranator is still stuck with one leg in the 90s.
+#
+def to_latin1(s):
+ """Take given unicode str and convert it to a latin1-encoded `bytes`."""
+ return s.encode("latin-1")
+
+
+def from_latin1(s):
+ """Take given latin1-encoded `bytes` value and convert it to `str`."""
+ return s.decode("latin-1")
+
+
+#
+# Conversion functions
+#
+
+def marshal_in_number(number):
+ return int(number)
+
+
+def marshal_in_parent(parent):
+ return int(parent)
+
+
+def marshal_in_instance(instance):
+ return int(instance)
+
+
+def marshal_in_varname(varname):
+ return from_latin1(varname).lower()
+
+
+def marshal_in_data(data):
+ return from_latin1(data) if data is not None else ""
+
+
+def marshal_in_comment(comment):
+ return comment and from_latin1(comment[1:].strip()) or None
+
+
+#
+# Type checking
+#
+
+def is_string(s):
+ return isinstance(s, str)
+
+
+###############################################################################
+# EXCEPTIONS
+###############################################################################
+
+
+class InvalidCNF(Exception):
+
+ def __init__(self, msg):
+ self.msg = msg
+
+ def __str__(self):
+ return "Malformed CNF_VAR: \"%s\"" % self.msg
+
+
+class MalformedCNF(Exception):
+
+ def __init__(self, msg):
+ self.msg = msg
+
+ def __str__(self):
+ return "Malformed CNF file: \"%s\"" % self.msg
+
+
+###############################################################################
+# VALIDATION
+###############################################################################
+
+
+def is_valid(acc,
+ nested,
+ comment,
+ data,
+ instance,
+ number,
+ parent,
+ varname):
+ if varname is None:
+ raise InvalidCNF("CNF_VAR lacks a name.")
+ elif not is_string(varname):
+ raise InvalidCNF("Varname field of CNF_VAR \"%s\" is not a string."
+ % varname)
+ elif varname == "":
+ raise InvalidCNF("Varname field of CNF_VAR is the empty string.")
+
+ if comment is not None:
+ if not is_string(comment):
+ raise InvalidCNF("Comment field of CNF_VAR \"%s\" is not a string."
+ % varname)
+
+ if data is None:
+ raise InvalidCNF("Data field of CNF_VAR \"%s\" is empty."
+ % varname)
+ elif not is_string(data):
+ raise InvalidCNF("Data field of CNF_VAR \"%s\" is not a string."
+ % varname)
+
+ if instance is None:
+ raise InvalidCNF("Instance field of CNF_VAR \"%s\" is empty."
+ % varname)
+ elif not isinstance(instance, int):
+ raise InvalidCNF("Instance field of CNF_VAR \"%s\" is not an integer."
+ % varname)
+
+ if number is None:
+ raise InvalidCNF("Number field of CNF_VAR \"%s\" is empty."
+ % varname)
+ elif not isinstance(number, int):
+ raise InvalidCNF("Number field of CNF_VAR \"%s\" is not an integer."
+ % varname)
+ elif number < 1:
+ raise InvalidCNF("Number field of CNF_VAR \"%s\" must be positive, not %d."
+ % (varname, number))
+ else:
+ other = acc.get(number, None)
+ if other is not None: # already in use
+ raise InvalidCNF("Number field of CNF_VAR \"%s\" already used by variable %s."
+ % (varname, other))
+ acc[number] = varname
+
+ if nested is True:
+ if parent is None:
+ raise InvalidCNF("Parent field of nested CNF_VAR \"%s\" is empty."
+ % varname)
+ elif not isinstance(parent, int):
+ raise InvalidCNF("Parent field of CNF_VAR \"%s\" is not an integer."
+ % varname)
+ else:
+ if parent is not None:
+ raise InvalidCNF("Flat CNF_VAR \"%s\" has nonsensical parent field \"%s\"."
+ % (varname, parent))
+ return acc
+
+
+def is_cnf(root):
+ """
+ is_cnf -- Predicate testing "CNF_VAR-ness". Folds the :py:func:`is_valid`
+ predicate over the argument which can be either a well-formed CNF
+ dictionary or a list of CNF_VARs.
+
+ :type root: cnfvar or cnf list
+ :rtype: bool
+
+ Not that if it returns at all, ``is_cnf()`` returns ``True``. Any non
+ well-formed member of the argument will cause the predicate to bail out
+ with an exception during traversal.
+ """
+ cnf = cnf_root(root)
+ if cnf is None:
+ raise InvalidCNF(root)
+ return walk_cnf(cnf, False, is_valid, {}) is not None
+
+
+def is_cnf_var(obj):
+ """
+ Check whether a dictionary is a valid CNF.
+
+ :param dict obj: dictionary to check
+ :returns: True if the dictionary has all the mandatory fields and no
+ unknown fields, False otherwise
+ :rtype: bool
+ """
+ assert isinstance (obj, dict)
+
+ for f in CNF_FIELD_MANDATORY:
+ if obj.get(f, None) is None:
+ return False
+
+ for f in obj:
+ if f not in CNF_FIELD_KNOWN:
+ return False
+
+ return True
+
+
+###############################################################################
+# DESERIALIZATION
+###############################################################################
+#
+# CNF reader
+#
+# Parsing usually starts from the `read_cnf`, which accepts a string containing
+# the variables to parse in the same structure as returned by `get_cnf`.
+#
+# In the `prepare` function the string is split into lines, and a 3-element
+# tuple is built. The first (named `current`) and second (named `next`)
+# elements of this tuple are respectively the first and second non-empty lines
+# of the input, while the third is a list of the remaining lines. This tuple is
+# named `state` in the implementation below, and it is passed around during
+# parsing. The `get` and `peek` functions are used to easily retrieve the
+# `current` and `next` items from the "state".
+#
+# When we "advance" the state, we actually drop the "current" element,
+# replacing it with the "next", while a new "next" is popped from the list of
+# remaining lines. Parsing is done this way because we need to look ahead at
+# the next line -- if it is a child it needs to be appended to the `children`
+# property of the current line.
+#
+# Regular expressions are used to extract important information from the CNF
+# lines. Finally, once parsing is completed, a dictionary is returned. The dict
+# has the same structure as the serialized JSON output returned by
+# `get_cnf -j`.
+#
+
+
+def read_cnf(data):
+ """
+ Read cnf data from data bytes.
+
+ :param data: raw data
+ :type data: str or bytes
+ :return: the parsed cnf data
+ :rtype: {str, {str, str or int}}
+ """
+ if isinstance(data, str):
+ data = to_latin1(data)
+ state = prepare(data)
+ if state is None:
+ raise InvalidCNF("Empty input string.")
+
+ cnf = parse_cnf_root(state)
+ if is_cnf(cnf) is False:
+ raise TypeError("Invalid CNF_VAR.")
+ return {"cnf": cnf}
+
+
+def prepare(raw):
+ """
+ Build 3-element iterable from a CNF string dump.
+
+ :param raw: string content as returned by `get_cnf`
+ :type raw: bytes
+ :returns: 3-element tuple, where the first two elements are the first two
+ lines of the output and the third is a list containing the rest
+ of the lines in reverse.
+ :rtype: (str * str option * str list) option
+ """
+ lines = raw.splitlines()
+ lines.reverse()
+ try:
+ first = lines.pop()
+ except IndexError:
+ return None
+
+ try:
+ second = lines.pop()
+ except IndexError:
+ second = None
+
+ first = first.strip()
+ if first == b"":
+ return advance((first, second, lines))
+
+ return (first, second, lines)
+
+
+def advance(cns):
+ """
+ Pop the next line from the stream, advancing the tuple.
+
+ :param cns: a 3-element tuple containing two CNF lines and a list of the
+ remaining lines
+ :type cnd: (str, str, [str])
+ :returns: a new tuple with a new item popped from the list of lines
+ :rtype cnd: (str, str, [str])
+ """
+ current, next, stream = cns
+ if next is None: # reached end of stream
+ return None
+ current = next
+
+ try:
+ next = stream.pop()
+ next = next.strip()
+ except IndexError:
+ next = None
+
+ if current == "": # skip blank lines
+ return advance((current, next, stream))
+ return (current, next, stream)
+
+
+def get(cns):
+ """
+ Get the current line from the state without advancing it.
+
+ :param cns: a 3-element tuple containing two CNF lines and a list of the
+ remaining lines
+ :type cnd: (str, str, [str])
+ :returns: the CNF line stored as `current`
+ :rtype: str
+ """
+ current, _, _ = cns
+ return current
+
+
+def parse_cnf_root(state):
+ """
+ Iterate over and parse a list of CNF lines.
+
+ :param state: a 3-element tuple containing two lines and a list of the
+ remaining lines
+ :type state: (str, str, [str])
+ :returns: a list of parsed CNF variables
+ :rtype: [dict]
+
+ The function will parse the first element from the `state` tuple, then read
+ the next line to see if it is a child variable. If it is, it will be
+ appended to the last parsed CNF, otherwise top-level parsing is done
+ normally.
+ """
+ lines = []
+ current = get(state)
+ while state:
+ cnf_line = read_base_line(current)
+ if cnf_line is not None:
+ lines.append(cnf_line)
+ state = advance(state)
+ if state is None: # -> nothing left to do
+ break
+ current = get(state)
+ parent = get_parent(current) # peek at next line
+ if parent is not None: # -> recurse into children
+ (state, children, _parent) = parse_cnf_children(state, parent)
+ cnf_line["children"] = children
+ if state is None:
+ break
+ current = get(state)
+ else:
+ state = advance(state)
+ if state is None:
+ break
+ current = get(state)
+ return lines
+
+
+def parse_cnf_children(state, parent):
+ """
+ Read and parse child CNFs of a given parent until there is none left.
+
+ :param state: a 3-element tuple containing two lines and a list of the
+ remaining lines
+ :type state: (str, str, [str])
+ :param parent: id of the parent whose children we are looking for
+ :type parent: int
+ :returns: a 3-element tuple with the current state, a list of children of
+ the given parent and the parent ID
+ :rtype: (tuple, [str], int)
+
+ The function will recursively parse child lines from the `state` tuple
+ until one of these conditions is satisfied:
+
+ 1. the input is exhausted
+ 2. the next CNF line
+ 2.1. is a toplevel line
+ 2.2. is a child line whose parent has a lower parent number
+
+ Conceptually, 2.1 is a very similar to 2.2 but due to the special status of
+ toplevel lines in CNF we need to handle them separately.
+
+ Note that since nesting of CNF vars is achieved via parent line numbers,
+ lines with different parents could appear out of order. libcnffile will
+ happily parse those and still assign children to the specified parent:
+
+ ::
+ # set_cnf <<THATSALL
+ 1 USER,1337: "l33t_h4x0r"
+ 2 (1) USER_GROUP_MEMBER_REF,0: "2"
+ 4 USER,1701: "picard"
+ 5 (4) USER_GROUP_MEMBER_REF,0: "2"
+ 6 (4) USER_PASSWORD,0: "engage"
+ 3 (1) USER_PASSWORD,0: "hacktheplanet"
+ THATSALL
+ # get_cnf user 1337
+ 1 USER,1337: "l33t_h4x0r"
+ 2 (1) USER_GROUP_MEMBER_REF,0: "2"
+ 3 (1) USER_PASSWORD,0: "hacktheplanet"
+ # get_cnf user 1701
+ 1 USER,1701: "picard"
+ 2 (1) USER_GROUP_MEMBER_REF,0: "2"
+ 3 (1) USER_PASSWORD,0: "engage"
+
+ It is a limitation of ``cnfvar.py`` that it cannot parse CNF data
+ structured like the above example: child lists are only populated from
+ subsequent CNF vars using the parent number solely to track nesting levels.
+ The parser does not keep track of line numbers while traversing the input
+ so it doesn’t support retroactively assigning a child to anything else but
+ the immediate parent.
+ """
+ lines = []
+ current = get(state)
+ while True:
+ cnf_line = read_child_line(current)
+ if cnf_line is not None:
+ lines.append(cnf_line)
+ state = advance(state)
+ if state is None:
+ break
+ current = get(state)
+ new_parent = get_parent(current)
+ if new_parent is None:
+ # drop stack
+ return (state, lines, None)
+ if new_parent > parent:
+ # parent is further down in hierarchy -> new level
+ (state, children, new_parent) = \
+ parse_cnf_children (state, new_parent)
+ if state is None:
+ break
+ cnf_line["children"] = children
+ current = get(state)
+ new_parent = get_parent(current)
+ if new_parent is None:
+ # drop stack
+ return (state, lines, None)
+ if new_parent < parent:
+ # parent is further up in hierarchy -> pop level
+ return (state, lines, new_parent)
+ # new_parent == parent -> continue parsing on same level
+ return (state, lines, parent)
+
+
+def get_parent(line):
+ """
+ Extract the ID of the parent for a given CNF line.
+
+ :param str line: CNF line
+ :returns: parent ID or None if no parent is found
+ :rtype: int or None
+ """
+ match = re.match(grab_parent_pattern, line)
+ if match is None: # -> no parent
+ return None
+ return int(match.groups()[0])
+
+
+def read_base_line(line):
+ """
+ Turn one top-level CNF line into a dictionary.
+
+ :param str line: CNF line
+ :rtype: {str: Any}
+
+ This performs the necessary decoding on values to obtain proper Python
+ strings from 8-bit encoded CNF data.
+
+ The function only operates on individual lines. Argument strings that
+ contain data for multiple lines – this includes child lines of the current
+ CNF var! – will trigger a parsing exception.
+ """
+ if len(line.strip()) == 0:
+ return None # ignore empty lines
+ if line[0] == b"#":
+ return None # ignore comments
+
+ match = re.match(base_line_pattern, line)
+ if match is None:
+ raise MalformedCNF("Syntax error in line \"\"\"%s\"\"\"" % line)
+ number, varname, instance, data, comment = match.groups()
+ return {
+ "number" : marshal_in_number (number),
+ "varname" : marshal_in_varname (varname),
+ "instance" : marshal_in_instance (instance),
+ "data" : marshal_in_data (data),
+ "comment" : marshal_in_comment (comment),
+ }
+
+
+def read_child_line(line):
+ """
+ Turn one child CNF line into a dictionary.
+
+ :param str line: CNF line
+ :rtype: {str: Any}
+
+ This function only operates on individual lines. If the argument string is
+ syntactically valid but contains input representing multiple CNF vars, a
+ parse error will be thrown.
+ """
+ if len(line.strip()) == 0:
+ return None # ignore empty lines
+ if line[0] == "#":
+ return None # ignore comments
+
+ match = re.match(child_line_pattern, line)
+ if match is None:
+ raise MalformedCNF("Syntax error in child line \"\"\"%s\"\"\""
+ % from_latin1 (line))
+ number, parent, varname, instance, data, comment = match.groups()
+ return {
+ "number" : marshal_in_number (number),
+ "parent" : marshal_in_parent (parent),
+ "varname" : marshal_in_varname (varname),
+ "instance" : marshal_in_instance (instance),
+ "data" : marshal_in_data (data),
+ "comment" : marshal_in_comment (comment),
+ }
+
+
+###############################################################################
+# SERIALIZATION
+###############################################################################
+
+
+def cnf_root(root):
+ """
+ Extract a list of CNFs from a given structure.
+
+ :param root: list of CNFs or a CNF dictionary
+ :type root: [dict] or dict
+ :raises: :py:class:`TypeError` if no CNFs can be extracted
+ :returns: list with one or more CNF objects
+ :rtype: [dict]
+
+ Output varies depending on a few conditions:
+ - If `root` is a list, return it right away
+ - If `root` is a dict corresponding to a valid CNF value, return it wrapped
+ in a list
+ - If `root` is a dict with a `cnf` key containg a list (as the JSON
+ returned by `get_cnf -j`), return the value
+ - Otherwise, raise an error
+ """
+ if isinstance(root, list):
+ return root
+ if not isinstance(root, dict):
+ raise TypeError(
+ "Expected dictionary of CNF_VARs, got %s." % type(root))
+ if is_cnf_var(root):
+ return [root]
+ cnf = root.get("cnf", None)
+ if not isinstance(cnf, list):
+ raise TypeError("Expected list of CNF_VARs, got %s." % type(cnf))
+ return cnf
+
+
+###############################################################################
+# TRAVERSAL
+###############################################################################
+
+
+def walk_cnf(cnf, nested, fun, acc):
+ """
+ Depth-first traversal of a CNF tree.
+
+ :type cnf: cnf list
+ :type nested: bool
+ :type fun: 'a -> bool -> (cnf stuff) -> 'a
+ :type acc: 'a
+ :rtype: 'a
+
+ Executes ``fun`` recursively for each node in the tree. The function
+ receives the accumulator ``acc`` which can be of an arbitrary type as first
+ argument. The second argument is a flag indicating whether the current
+ CNF var is a child (if ``True``) or a parent var. CNF member fields are
+ passed via named optional arguments.
+ """
+ for var in cnf:
+ acc = fun(acc,
+ nested,
+ comment=var.get("comment", None),
+ data=var.get("data", None),
+ instance=var.get("instance", None),
+ number=var.get("number", None),
+ parent=var.get("parent", None),
+ varname=var.get("varname", None))
+ children = var.get("children", None)
+ if children is not None:
+ acc = walk_cnf(children, True, fun, acc)
+ return acc
--- /dev/null
+# The software in this package is distributed under the GNU General
+# Public License version 2 (with a special exception described below).
+#
+# A copy of GNU General Public License (GPL) is included in this distribution,
+# in the file COPYING.GPL.
+#
+# As a special exception, if other files instantiate templates or use macros
+# or inline functions from this file, or you compile this file and link it
+# with other works to produce a work based on this file, this file
+# does not by itself cause the resulting work to be covered
+# by the GNU General Public License.
+#
+# However the source code for this file must still be made available
+# in accordance with section (3) of the GNU General Public License.
+#
+# This exception does not invalidate any other reasons why a work based
+# on this file might be covered by the GNU General Public License.
+#
+# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
+
+"""
+
+summary
+------------------------------------------------------
+Module for one-step dynamic cnfvar generation from default value templates.
+
+.. codeauthor:: Intra2net
+
+
+contents
+-------------------------------------------------------
+These templates contain the bare defaults the UI adds upon
+creation of each major and frequently used cnfvar.
+
+
+interface
+------------------------------------------------------
+
+"""
+
+import time
+import logging
+
+# custom imports
+from .model import Cnf, CnfList
+
+
+log = logging.getLogger('pyi2ncommon.cnfvar.templates')
+
+
+###############################################################################
+# MAJOR CNF DEFAULTS
+###############################################################################
+
+
+#: UI defaults for a user instance
+user_defaults = {
+ "USER_DISABLED": "0",
+ "USER_FULLNAME": "",
+ "USER_GROUPWARE_FOLDER_CALENDAR": "INBOX/Kalender",
+ "USER_GROUPWARE_FOLDER_CONTACTS": "INBOX/Kontakte",
+ "USER_GROUPWARE_FOLDER_DRAFTS": "INBOX/Entwürfe",
+ "USER_GROUPWARE_FOLDER_NOTES": "INBOX/Notizen",
+ "USER_GROUPWARE_FOLDER_OUTBOX": "INBOX/Gesendete Elemente",
+ "USER_GROUPWARE_FOLDER_TASKS": "INBOX/Aufgaben",
+ "USER_GROUPWARE_FOLDER_TRASH": "INBOX/Gelöschte Elemente",
+ # always a member of the 'Alle' group
+ "USER_GROUP_MEMBER_REF": "2",
+ "USER_LOCALE": "",
+ "USER_PASSWORD": "",
+ "USER_TRASH_DELETEDAYS": "30",
+ "USER_WEBMAIL_MESSAGES_PER_PAGE": "25",
+ "USER_WEBMAIL_SIGNATURE": "",
+}
+#: UI defaults for a group instance
+group_defaults = {
+ "GROUP_COMMENT": "",
+ "GROUP_ACCESS_GO_ONLINE_ALLOWED": "1",
+ "GROUP_EMAILFILTER_BAN_FILTERLIST_REF": "-1",
+ "GROUP_EMAIL_RELAY_RIGHTS": "RELAY_FROM_INTRANET",
+ "GROUP_PROXY_PROFILE_REF": "1",
+}
+#: UI defaults for an intraclient instance
+intraclient_defaults = {
+ "INTRACLIENT_COMMENT": "",
+ "INTRACLIENT_DNS_RELAYING_ALLOWED": "1",
+ "INTRACLIENT_EMAIL_RELAYING_ALLOWED": "1",
+ "INTRACLIENT_FIREWALL_RULESET_REF": "5",
+ "INTRACLIENT_IP": "",
+ "INTRACLIENT_MAC": "",
+ "INTRACLIENT_PROXY_PROFILE_REF": "-1",
+}
+#: UI defaults for a NIC instance
+nic_defaults = {
+ "NIC_COMMENT": "",
+ "NIC_DRIVER": "",
+ "NIC_MAC": "",
+ "NIC_TYPE": "UNUSED",
+}
+#: UI defaults for a provider instance
+provider_defaults = {
+ "PROVIDER_PROXY_SERVER": "",
+ "PROVIDER_PROXY_PORT": "",
+ "PROVIDER_PROXY_PASSWORD": "",
+ "PROVIDER_PROXY_LOGIN": "",
+ "PROVIDER_NIC_REF": "1",
+ "PROVIDER_NETMASK": "255.255.0.0",
+ "PROVIDER_MTU_SIZE": "1500",
+ "PROVIDER_MODE": "ROUTER",
+ "PROVIDER_MAILTRANSFER_MODE": "IMMEDIATE",
+ "PROVIDER_LOCALIP": "",
+ "PROVIDER_IP": "",
+ "PROVIDER_FIREWALL_RULESET_REF": "7",
+ "PROVIDER_FALLBACK_TIMEOUT": "60",
+ "PROVIDER_FALLBACK_PROVIDER_REF": "-1",
+ "PROVIDER_EMAIL_RELAY_REF": "-1",
+ "PROVIDER_DYNDNS_WEBCHECKIP": "0",
+ "PROVIDER_DYNDNS_ENABLE": "1",
+ "PROVIDER_DNS_MODE": "ROOT",
+ "PROVIDER_DNS": "",
+ "PROVIDER_BWIDTH_MANAGEMENT_UPSTREAM_SPEED": "",
+ "PROVIDER_BWIDTH_MANAGEMENT_ENABLE": "0",
+ "PROVIDER_BWIDTH_MANAGEMENT_DOWNSTREAM_SPEED": "",
+ "PROVIDER_PINGCHECK_SERVERLIST_REF": "-2",
+}
+#: UI defaults for a port forwarding instance
+port_forwarding_defaults = {
+ "PORT_FORWARDING_DST_IP_REF": "1",
+ "PORT_FORWARDING_DST_PORT": "",
+ "PORT_FORWARDING_DST_PORT_END": "",
+ "PORT_FORWARDING_PROTOCOL_TYPE": "TCP",
+ "PORT_FORWARDING_SRC_PORT": "",
+ "PORT_FORWARDING_SRC_PORT_END": "",
+}
+#: UI defaults for a firewall ruleset instance
+firewall_ruleset_defaults = {
+ "FIREWALL_RULESET_PROFILE_TYPE": "FULL",
+}
+#: UI defaults for a proxy accesslist instance
+proxy_accesslist_defaults = {
+ "PROXY_ACCESSLIST_ENTRY_COUNT": "123",
+ "PROXY_ACCESSLIST_MODE": "1",
+ "PROXY_ACCESSLIST_SIZETYPE": "1",
+ "PROXY_ACCESSLIST_TYPE": "0",
+}
+#: UI defaults for a key instance
+key_own_defaults = {
+ "KEY_OWN_FINGERPRINT_MD5": "",
+ "KEY_OWN_FINGERPRINT_SHA1": "",
+ "KEY_OWN_ID_X509": "CN=net.lan",
+ "KEY_OWN_ISSUER": "CN=, C=, L=, ST=, O=, OU=",
+ "KEY_OWN_KEYSIZE": "2048",
+ "KEY_OWN_HASH_ALGO": "SHA2_256",
+ # TODO: the key own creation is currently too hacky for better sanitized defaults
+ "KEY_OWN_PRIVATE_KEY": "<CREATE_HACK>",
+ # TODO: the key own creation is currently too hacky for better sanitized defaults
+ "KEY_OWN_PUBLIC_KEY": "<CREATE_HACK>",
+ # TODO: the key own creation is currently too hacky for better sanitized defaults
+ "KEY_OWN_REQUEST": "<CREATE_HACK>",
+ "KEY_OWN_SUBJECT": "CN=net.lan",
+ # TODO: the key own creation is currently too hacky for better sanitized defaults
+ "KEY_OWN_VALIDFROM": "00001122T445566",
+ # TODO: the key own creation is currently too hacky for better sanitized defaults
+ "KEY_OWN_VALIDTILL": "99991122T445566",
+ "KEY_OWN_TYPE": "SELF",
+ # the ones bellow should be set when using 'generate' to create the key
+ "KEY_OWN_CREATE_CN": "",
+ "KEY_OWN_CREATE_EMAIL": ""
+}
+#: UI defaults for a VPN connection instance
+vpnconn_defaults = {
+ "VPNCONN_ACTIVATION": "ALWAYS",
+ "VPNCONN_DISABLED": "0",
+ "VPNCONN_DNS_RELAYING_ALLOWED": "1",
+ "VPNCONN_EMAIL_RELAYING_ALLOWED": "1",
+ "VPNCONN_ENCRYPTION_PROFILE_REF": "0",
+ "VPNCONN_FIREWALL_RULESET_REF": "5",
+ "VPNCONN_IKE_VERSION": "1",
+ "VPNCONN_KEY_FOREIGN_REF": "1",
+ "VPNCONN_KEY_OWN_REF": "1",
+ "VPNCONN_KEY_TYPE": "PUBLIC",
+ "VPNCONN_LAN_NAT_IP": "",
+ "VPNCONN_LAN_NAT_MODE": "UNMODIFIED",
+ "VPNCONN_LAN_NAT_NETWORK": "",
+ "VPNCONN_LAN_NIC_REF": "2",
+ "VPNCONN_LAN_NET": "",
+ "VPNCONN_LAN_NETMASK": "255.255.0.0",
+ "VPNCONN_LAN_TYPE": "NIC",
+ "VPNCONN_LIFETIME_IKE": "480",
+ "VPNCONN_LIFETIME_IPSECSA": "60",
+ "VPNCONN_OFFLINE_DETECTION_SEC": "60",
+ "VPNCONN_PEER_DNS": "",
+ "VPNCONN_PEER_IP": "",
+ "VPNCONN_PEER_TYPE": "IP",
+ "VPNCONN_PROXY_PROFILE_REF": "-2",
+ "VPNCONN_PSK": "",
+ "VPNCONN_PSK_FOREIGN_ID": "",
+ "VPNCONN_PSK_FOREIGN_ID_TYPE": "IP",
+ "VPNCONN_PSK_OWN_ID": "",
+ "VPNCONN_PSK_OWN_ID_TYPE": "IP",
+ "VPNCONN_REMOTE_INET_NAT": "1",
+ "VPNCONN_REMOTE_MODECONFIG_IP": "",
+ "VPNCONN_REMOTE_NAT_ENABLE": "0",
+ "VPNCONN_REMOTE_NAT_NETWORK": "",
+ "VPNCONN_REMOTE_NET": "",
+ "VPNCONN_REMOTE_NETMASK": "255.255.0.0",
+ "VPNCONN_REMOTE_TYPE": "CUSTOM",
+ "VPNCONN_RETRIES": "3",
+ "VPNCONN_SECURED": "ESP",
+ "VPNCONN_XAUTH_SERVER_ENABLE": "0"
+}
+
+
+###############################################################################
+# MINOR CONFIGURATION
+###############################################################################
+
+
+def template(name, value, instance=-1, defaults=None, **kwargs):
+ """
+ Generate a template cnf variable from provided defaults.
+
+ :param str name: cnf variable name
+ :param str value: cnf variable data value
+ :param int instance: cnf variable instance number
+ :param defaults: default child variables to populate the cnf variable with
+ :type defaults: {str, str or {}} or None
+ :returns: generated cnf variable
+ :rtype: :py:class:`Cnf`
+
+ All additional keyword arguments will be used to overwrite the defaults.
+ """
+ log.info(f"Generating a template {name} cnfvar")
+ cnf = Cnf(name, value=value, instance=instance)
+ defaults = {} if defaults is None else defaults
+ cnf.add_children(*[(key, value) for key, value in defaults.items()])
+ for key in kwargs.keys():
+ cnf.children.single_with_name(f"{name}_{key}").value = kwargs[key]
+ return cnf
+
+
+def user(name, instance=-1, **kwargs):
+ """
+ Generate a user cnf variable.
+
+ :param str name: username for the user
+ :param int instance: instance number for the user
+ :returns: generated cnf variable
+ :rtype: :py:class:`Cnf`
+ """
+ log.info(f"Generating a user {name} cnfvar")
+ user_cnf = template("user", name, instance=instance, defaults=user_defaults, **kwargs)
+ user_cnf.children.single_with_name("user_fullname").value = name.capitalize()
+ return user_cnf
+
+
+def group(name, instance=-1, **kwargs):
+ """
+ Generate a group cnf variable.
+
+ :param str name: name for the group
+ :param int instance: instance number for the group
+ :returns: generated cnf variable
+ :rtype: :py:class:`Cnf`
+ """
+ log.info(f"Generating a group {name} cnfvar")
+ group_cnf = template("group", name, instance=instance, defaults=group_defaults, **kwargs)
+ return group_cnf
+
+
+def nic(name, instance=-1, **kwargs):
+ """
+ Generate a nic cnf variable.
+
+ :param str name: tag or comment for the nic describing its use
+ :param int instance: instance number for the nic
+ :returns: generated cnf variable
+ :rtype: :py:class:`Cnf`
+ """
+ log.info(f"Generating a nic cnfvar")
+ nic_cnf = template("nic", "", instance=instance, defaults=nic_defaults, **kwargs)
+ nic_cnf.children.single_with_name("nic_comment").value = name
+ if nic_cnf.children.single_with_name("nic_type").value in ["NATLAN", "PUBLICLAN", "PROXYARP"]:
+ nic_cnf.add_child("nic_lan_ip", "192.168.1.1")
+ nic_cnf.add_child("nic_lan_netmask", "255.255.255.0")
+ nic_cnf.add_child("nic_lan_dns_relaying_allowed", "0")
+ nic_cnf.add_child("nic_lan_email_relaying_allowed", "0")
+ nic_cnf.add_child("nic_lan_nat_into", "0")
+ nic_cnf.add_child("nic_lan_proxy_profile_ref", "-1")
+ nic_cnf.add_child("nic_lan_firewall_ruleset_ref", "1")
+ return nic_cnf
+
+
+def intraclient(name, instance=-1, **kwargs):
+ """
+ Generate an intraclient cnf variable.
+
+ :param str name: name for the intraclient
+ :param int instance: instance number for the intraclient
+ :returns: generated cnf variable
+ :rtype: :py:class:`Cnf`
+ """
+ log.info(f"Generating an intraclient {name} cnfvar")
+ intraclient_cnf = template("intraclient", name, instance=instance,
+ defaults=intraclient_defaults, **kwargs)
+ return intraclient_cnf
+
+
+def provider(name, instance=-1, **kwargs):
+ """
+ Generate a provider cnf variable.
+
+ :param str name: name for the provider
+ :param int instance: instance number for the provider
+ :returns: generated cnf variable
+ :rtype: :py:class:`Cnf`
+ """
+ log.info(f"Generating a provider {name} cnfvar")
+ provider_cnf = template("provider", name, instance=instance,
+ defaults=provider_defaults, **kwargs)
+ # the validation of the LOCALIP will not be ignored despite choosing a different mode
+ if provider_cnf.children.single_with_name("provider_mode").value not in ["ROUTER", "GWINLAN"]:
+ provider_cnf.children.remove_where(lambda c: c.name == "provider_ip")
+ provider_cnf.children.remove_where(lambda c: c.name == "provider_netmask")
+ if provider_cnf.children.single_with_name("provider_mode").value != "ROUTER":
+ provider_cnf.children.remove_where(lambda c: c.name == "provider_localip")
+ if provider_cnf.children.single_with_name("provider_dns_mode").value != "IP":
+ provider_cnf.children.remove_where(lambda c: c.name == "provider_dns")
+ return provider_cnf
+
+
+def port_forwarding(name, instance=-1, **kwargs):
+ """
+ Generate a port forwarding cnf variable.
+
+ :param str name: name for the port forwarding mapping
+ :param int instance: instance number for the port forwarding mapping
+ :returns: generated cnf variable
+ :rtype: :py:class:`Cnf`
+ """
+ log.info(f"Generating a port forwarding {name} cnfvar")
+ port_forwarding_cnf = template("port_forwarding", name, instance=instance,
+ defaults=port_forwarding_defaults, **kwargs)
+ if port_forwarding_cnf.children.single_with_name("port_forwarding_protocol_type").value == "OTHER":
+ port_forwarding_cnf.children.remove_where(lambda c: c.name == "port_forwarding_src_port")
+ port_forwarding_cnf.children.remove_where(lambda c: c.name == "port_forwarding_dst_port")
+ port_forwarding_cnf.children.remove_where(lambda c: c.name == "port_forwarding_src_port_end")
+ port_forwarding_cnf.children.remove_where(lambda c: c.name == "port_forwarding_dst_port_end")
+ port_forwarding_cnf.add_child("port_forwarding_protocol_num", "47")
+ return port_forwarding_cnf
+
+
+def firewall_ruleset(name, instance=-1, **kwargs):
+ """
+ Generate a firewall ruleset cnf variable.
+
+ :param str name: name for the firewall ruleset
+ :param int instance: instance number for the firewall ruleset
+ :returns: generated cnf variable
+ :rtype: :py:class:`Cnf`
+ """
+ log.info(f"Generating a firewall ruleset {name} cnfvar")
+ firewall_ruleset_cnf = template("firewall_ruleset", name, instance=instance,
+ defaults=firewall_ruleset_defaults, **kwargs)
+ return firewall_ruleset_cnf
+
+
+def proxy_accesslist(name, instance=-1, **kwargs):
+ """
+ Generate a proxy accesslist cnf variable.
+
+ :param str name: name for the proxy accesslist
+ :param int instance: instance number for the proxy accesslist
+ :returns: generated cnf variable
+ :rtype: :py:class:`Cnf`
+ """
+ log.info(f"Generating a proxy accesslist {name} cnfvar")
+ proxy_accesslist_cnf = template("proxy_accesslist", name, instance=instance,
+ defaults=proxy_accesslist_defaults, **kwargs)
+ return proxy_accesslist_cnf
+
+
+def key_own(name, instance=-1, **kwargs):
+ """
+ Generate an own key cnf variable.
+
+ :param str name: name for the own key
+ :param int instance: instance number for the own key
+ :returns: generated cnf variable
+ :rtype: :py:class:`Cnf`
+ """
+ log.info(f"Generating an own key {name} cnfvar")
+ key_own_cnf = template("key_own", name, instance=instance,
+ defaults=key_own_defaults, **kwargs)
+ return key_own_cnf
+
+
+def vpnconn(name, instance=-1, **kwargs):
+ """
+ Generate a vpn connection cnf variable.
+
+ :param str name: name for the vpn connection
+ :param int instance: instance number for the vpn connection
+ :returns: generated cnf variable
+ :rtype: :py:class:`Cnf`
+ """
+ log.info(f"Generating a vpn connection {name} cnfvar")
+ vpnconn_cnf = template("vpnconn", name, instance=instance,
+ defaults=vpnconn_defaults, **kwargs)
+ if vpnconn_cnf.children.single_with_name("vpnconn_lan_type").value not in ["NIC", "CUSTOM"]:
+ vpnconn_cnf.children.remove_where(lambda c: c.name == "vpnconn_lan_net")
+ if vpnconn_cnf.children.single_with_name("vpnconn_remote_type").value != "CUSTOM":
+ vpnconn_cnf.children.remove_where(lambda c: c.name == "vpnconn_remote_net")
+ if vpnconn_cnf.children.single_with_name("vpnconn_remote_type").value != "MODECONFIG":
+ vpnconn_cnf.children.remove_where(lambda c: c.name == "vpnconn_remote_modeconfig_ip")
+ return vpnconn_cnf
+++ /dev/null
-#!/usr/bin/env python
-#
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""
-
-summary
--------------------------------------------------------------------------------
-Represent CNF_VARs as recursive structures.
-
-.. note:: DEPRECATED! Please do not extend this or add new uses of this module,
- use :py:mod:`pyi2ncommon.arnied_api` or :py:mod:`pyi2ncommon.cnfvar`
- instead!
-
-Copyright: 2014-2017 Intra2net AG
-License: GPLv2+
-
-
-contents
--------------------------------------------------------------------------------
-
-This module provides read and write functionality for the Intra2net *CNF*
-format. Two different syntaxes are available: classical *CNF* and a JSON
-representation. Both versions are commonly understood by Intra2net software.
-
-On the command line, raw CNF is accepted if the option ``-`` is given: ::
-
- $ get_cnf routing 2 |python3 cnfvar.py - <<ENOUGH
- 1 ROUTING,2: "192.168.55.0"
- 2 (1) ROUTING_COMMENT,0: ""
- 3 (1) ROUTING_DNS_RELAYING_ALLOWED,0: "1"
- 4 (1) ROUTING_EMAIL_RELAYING_ALLOWED,0: "1"
- 5 (1) ROUTING_FIREWALL_RULESET_REF,0: "9"
- 6 (1) ROUTING_GATEWAY,0: "10.0.254.1"
- 7 (1) ROUTING_NAT_INTO,0: "0"
- 8 (1) ROUTING_NETMASK,0: "255.255.255.0"
- 9 (1) ROUTING_PROXY_PROFILE_REF,0: "2"
- ENOUGH
-
- 1 ROUTING,2: "192.168.55.0"
- 2 (1) ROUTING_COMMENT,0: ""
- 3 (1) ROUTING_DNS_RELAYING_ALLOWED,0: "1"
- 4 (1) ROUTING_EMAIL_RELAYING_ALLOWED,0: "1"
- 5 (1) ROUTING_FIREWALL_RULESET_REF,0: "9"
- 6 (1) ROUTING_GATEWAY,0: "10.0.254.1"
- 7 (1) ROUTING_NAT_INTO,0: "0"
- 8 (1) ROUTING_NETMASK,0: "255.255.255.0"
- 9 (1) ROUTING_PROXY_PROFILE_REF,0: "2"
-
-The input takes one round-trip through the parsers and will error out on
-problematic lines. Thus, ``cnfvar.py`` can be used to syntax-check CNF data.
-
-Note that line numbers may be arbitrarily reassigned in the process. Of course,
-parent references and the relative ordering of lines will be preserved in this
-case.
-
-.. todo::
- Decide on some facility for automatic fixup of line number values. The
- internal representation is recursive so line numbers are not needed to
- establish a variable hierarchy. They might as well be omitted from the json
- input and only added when writing cnf. For the time being a lack of the
- "number" field is interpreted as an error. Though it should at least
- optionally be possible to omit the numbers entirely and have a function add
- them after the fact. This might as well be added to :py:func:`is_cnf`
- though it would be counterintuitive to have a predicate mutate its
- argument. So maybe it could return a second argument to indicate a valid
- structure that needs fixup or something like that.
-
-.. note::
- The variable values of get_cnf seems to be encoded in latin1, and set_cnf
- seems to assume latin1-encoded values (not var names). Function
- :py:func:`read_cnf` converts this to unicode and functions
- :py:func:`dump_cnf_string`, :py:func:`print_cnf`, and :py:func:`write_cnf`
- convert unicode back to latin1.
-
-
-notes on Python 3 conversion
--------------------------------------------------------------------------------
-
-Since the original *CNF* format assumes latin-1 encoded data pretty much
-exclusively, we preserve the original encoding while parsing the file.
-When assembling the data structures returned to the user, values are then
-converted to strings so they can be used naturally at the Python end.
-
-implementation
--------------------------------------------------------------------------------
-"""
-
-import functools
-import sys
-import json
-import re
-import io
-
-
-###############################################################################
-# CONSTANTS
-###############################################################################
-
-
-CNF_FIELD_MANDATORY = set ([ "varname", "data", "instance" ])
-CNF_FIELD_OPTIONAL = set ([ "parent", "children", "comment", "number" ])
-CNF_FIELD_KNOWN = CNF_FIELD_MANDATORY | CNF_FIELD_OPTIONAL
-
-grab_parent_pattern = re.compile(b"""
- ^ # match from start
- \s* # optional spaces
- \d+ # line number
- \s+ # spaces
- \((\d+)\) # parent
- """,
- re.VERBOSE)
-
-base_line_pattern = re.compile(b"""
- ^ # match from start
- \s* # optional spaces
- (\d+) # line number
- \s+ # spaces
- ([A-Z][A-Z0-9_]*) # varname
- \s* # optional spaces
- , # delimiter
- \s* # optional spaces
- (-1|\d+) # instance
- \s* # optional spaces
- : # delimiter
- \s* # optional spaces
- \"( # quoted string (data)
- (?: \\\" # (of escaped dquote
- |[^\"])* # or anything not a
- )\" # literal quote)
- \s* # optional spaces
- ( # bgroup
- \# # comment leader
- \s* # optional spaces
- .* # string (comment)
- )? # egroup, optional
- $ # eol
- """,
- re.VERBOSE)
-
-child_line_pattern = re.compile(b"""
- ^ # match from start
- \s* # optional spaces
- (\d+) # line number
- \s+ # spaces
- \((\d+)\) # parent
- \s+ # spaces
- ([A-Z][A-Z0-9_]*) # varname
- \s* # optional spaces
- , # delimiter
- \s* # optional spaces
- (-1|\d+) # instance
- \s* # optional spaces
- : # delimiter
- \s* # optional spaces
- \"([^\"]*)\" # quoted string (data)
- \s* # optional spaces
- ( # bgroup
- \# # comment leader
- \s* # optional spaces
- .* # string (comment)
- )? # egroup, optional
- $ # eol
- """,
- re.VERBOSE)
-
-
-###############################################################################
-# HELPERS
-###############################################################################
-
-
-#
-# Sadly, the Intranator is still stuck with one leg in the 90s.
-#
-def to_latin1(s):
- """Take given unicode str and convert it to a latin1-encoded `bytes`."""
- return s.encode("latin-1")
-
-
-def from_latin1(s):
- """Take given latin1-encoded `bytes` value and convert it to `str`."""
- return s.decode("latin-1")
-
-
-#
-# Conversion functions
-#
-
-def marshal_in_number(number):
- return int(number)
-
-
-def marshal_in_parent(parent):
- return int(parent)
-
-
-def marshal_in_instance(instance):
- return int(instance)
-
-
-def marshal_in_varname(varname):
- return from_latin1(varname).lower()
-
-
-def marshal_in_data(data):
- return from_latin1(data) if data is not None else ""
-
-
-def marshal_in_comment(comment):
- return comment and from_latin1(comment[1:].strip()) or None
-
-
-#
-# Type checking
-#
-
-def is_string(s):
- return isinstance(s, str)
-
-
-###############################################################################
-# EXCEPTIONS
-###############################################################################
-
-
-class InvalidCNF(Exception):
-
- def __init__(self, msg):
- self.msg = msg
-
- def __str__(self):
- return "Malformed CNF_VAR: \"%s\"" % self.msg
-
-
-class MalformedCNF(Exception):
-
- def __init__(self, msg):
- self.msg = msg
-
- def __str__(self):
- return "Malformed CNF file: \"%s\"" % self.msg
-
-
-###############################################################################
-# VALIDATION
-###############################################################################
-
-
-def is_valid(acc,
- nested,
- comment,
- data,
- instance,
- number,
- parent,
- varname):
- if varname is None:
- raise InvalidCNF("CNF_VAR lacks a name.")
- elif not is_string(varname):
- raise InvalidCNF("Varname field of CNF_VAR \"%s\" is not a string."
- % varname)
- elif varname == "":
- raise InvalidCNF("Varname field of CNF_VAR is the empty string.")
-
- if comment is not None:
- if not is_string(comment):
- raise InvalidCNF("Comment field of CNF_VAR \"%s\" is not a string."
- % varname)
-
- if data is None:
- raise InvalidCNF("Data field of CNF_VAR \"%s\" is empty."
- % varname)
- elif not is_string(data):
- raise InvalidCNF("Data field of CNF_VAR \"%s\" is not a string."
- % varname)
-
- if instance is None:
- raise InvalidCNF("Instance field of CNF_VAR \"%s\" is empty."
- % varname)
- elif not isinstance(instance, int):
- raise InvalidCNF("Instance field of CNF_VAR \"%s\" is not an integer."
- % varname)
-
- if number is None:
- raise InvalidCNF("Number field of CNF_VAR \"%s\" is empty."
- % varname)
- elif not isinstance(number, int):
- raise InvalidCNF("Number field of CNF_VAR \"%s\" is not an integer."
- % varname)
- elif number < 1:
- raise InvalidCNF("Number field of CNF_VAR \"%s\" must be positive, not %d."
- % (varname, number))
- else:
- other = acc.get(number, None)
- if other is not None: # already in use
- raise InvalidCNF("Number field of CNF_VAR \"%s\" already used by variable %s."
- % (varname, other))
- acc[number] = varname
-
- if nested is True:
- if parent is None:
- raise InvalidCNF("Parent field of nested CNF_VAR \"%s\" is empty."
- % varname)
- elif not isinstance(parent, int):
- raise InvalidCNF("Parent field of CNF_VAR \"%s\" is not an integer."
- % varname)
- else:
- if parent is not None:
- raise InvalidCNF("Flat CNF_VAR \"%s\" has nonsensical parent field \"%s\"."
- % (varname, parent))
- return acc
-
-
-def is_cnf(root):
- """
- is_cnf -- Predicate testing "CNF_VAR-ness". Folds the :py:func:`is_valid`
- predicate over the argument which can be either a well-formed CNF
- dictionary or a list of CNF_VARs.
-
- :type root: cnfvar or cnf list
- :rtype: bool
-
- Not that if it returns at all, ``is_cnf()`` returns ``True``. Any non
- well-formed member of the argument will cause the predicate to bail out
- with an exception during traversal.
- """
- cnf = cnf_root(root)
- if cnf is None:
- raise InvalidCNF(root)
- return walk_cnf(cnf, False, is_valid, {}) is not None
-
-
-def is_cnf_var(obj):
- """
- Check whether a dictionary is a valid CNF.
-
- :param dict obj: dictionary to check
- :returns: True if the dictionary has all the mandatory fields and no
- unknown fields, False otherwise
- :rtype: bool
- """
- assert isinstance (obj, dict)
-
- for f in CNF_FIELD_MANDATORY:
- if obj.get(f, None) is None:
- return False
-
- for f in obj:
- if f not in CNF_FIELD_KNOWN:
- return False
-
- return True
-
-
-###############################################################################
-# DESERIALIZATION
-###############################################################################
-
-
-#
-# JSON reader for get_cnf -j (the easy part)
-#
-
-def make_varname_lowercase(cnfvar):
- """
- Custom hook for json decoder: convert variable name to lowercase.
-
- Since variable names are case insensitive, :py:func:`read_cnf` converts
- them all to lower case. Downstream users of :py:func:`read_cnf_json` (e.g.
- :py:class:`simple_cnf.SimpleCnf`) rely on lowercase variable names.
-
- :param dict cnfvar: JSON "object" converted into a dict
- :returns: same as input but if field `varname` is present, its value is
- converted to lower case
- :rtype: dict with str keys
- """
- try:
- cnfvar['varname'] = cnfvar['varname'].lower()
- except KeyError: # there is no "varname" field
- pass
- except AttributeError: # cnfvar['varname'] is not a string
- pass
- return cnfvar
-
-
-def read_cnf_json(cnfdata):
- """
- Read json data from cnf data bytes.
-
- :param bytes cnfdata: config data
- :return: the parsed json data
- :rtype: str
-
- .. note:: The JSON module does not decode data for all versions
- of Python 3 so we handle the decoding ourselves.
- """
- if isinstance (cnfdata, bytes) is True:
- cnfdata = from_latin1 (cnfdata)
- cnf_json = json.loads(cnfdata, object_hook=make_varname_lowercase)
- if is_cnf(cnf_json) is False:
- raise TypeError("Invalid CNF_VAR.")
- return cnf_json
-
-
-#
-# CNF reader (the moderately more complicated part)
-#
-# Parsing usually starts from the `read_cnf`, which accepts a string containing
-# the variables to parse in the same structure as returned by `get_cnf`.
-#
-# In the `prepare` function the string is split into lines, and a 3-element
-# tuple is built. The first (named `current`) and second (named `next`)
-# elements of this tuple are respectively the first and second non-empty lines
-# of the input, while the third is a list of the remaining lines. This tuple is
-# named `state` in the implementation below, and it is passed around during
-# parsing. The `get` and `peek` functions are used to easily retrieve the
-# `current` and `next` items from the "state".
-#
-# When we "advance" the state, we actually drop the "current" element,
-# replacing it with the "next", while a new "next" is popped from the list of
-# remaining lines. Parsing is done this way because we need to look ahead at
-# the next line -- if it is a child it needs to be appended to the `children`
-# property of the current line.
-#
-# Regular expressions are used to extract important information from the CNF
-# lines. Finally, once parsing is completed, a dictionary is returned. The dict
-# has the same structure as the serialized JSON output returned by
-# `get_cnf -j`.
-#
-
-
-def read_cnf(data):
- """
- Read cnf data from data bytes.
-
- :param data: raw data
- :type data: str or bytes
- :return: the parsed cnf data
- :rtype: {str, {str, str or int}}
- """
- if isinstance(data, str):
- data = to_latin1(data)
- state = prepare(data)
- if state is None:
- raise InvalidCNF("Empty input string.")
-
- cnf = parse_cnf_root(state)
- if is_cnf(cnf) is False:
- raise TypeError("Invalid CNF_VAR.")
- return {"cnf": cnf}
-
-
-def prepare(raw):
- """
- Build 3-element iterable from a CNF string dump.
-
- :param raw: string content as returned by `get_cnf`
- :type raw: bytes
- :returns: 3-element tuple, where the first two elements are the first two
- lines of the output and the third is a list containing the rest
- of the lines in reverse.
- :rtype: (str * str option * str list) option
- """
- lines = raw.splitlines()
- lines.reverse()
- try:
- first = lines.pop()
- except IndexError:
- return None
-
- try:
- second = lines.pop()
- except IndexError:
- second = None
-
- first = first.strip()
- if first == b"":
- return advance((first, second, lines))
-
- return (first, second, lines)
-
-
-def advance(cns):
- """
- Pop the next line from the stream, advancing the tuple.
-
- :param cns: a 3-element tuple containing two CNF lines and a list of the
- remaining lines
- :type cnd: (str, str, [str])
- :returns: a new tuple with a new item popped from the list of lines
- :rtype cnd: (str, str, [str])
- """
- current, next, stream = cns
- if next is None: # reached end of stream
- return None
- current = next
-
- try:
- next = stream.pop()
- next = next.strip()
- except IndexError:
- next = None
-
- if current == "": # skip blank lines
- return advance((current, next, stream))
- return (current, next, stream)
-
-
-def get(cns):
- """
- Get the current line from the state without advancing it.
-
- :param cns: a 3-element tuple containing two CNF lines and a list of the
- remaining lines
- :type cnd: (str, str, [str])
- :returns: the CNF line stored as `current`
- :rtype: str
- """
- current, _, _ = cns
- return current
-
-
-def peek(cns):
- """
- Get the next line from the state without advancing it.
-
- :param cns: a 3-element tuple containing two CNF lines and a list of the
- remaining lines
- :type cnd: (str, str, [str])
- :returns: the CNF line stored as `next`
- :rtype: str
- """
- _, next, _ = cns
- return next
-
-
-def parse_cnf_root(state):
- """
- Iterate over and parse a list of CNF lines.
-
- :param state: a 3-element tuple containing two lines and a list of the
- remaining lines
- :type state: (str, str, [str])
- :returns: a list of parsed CNF variables
- :rtype: [dict]
-
- The function will parse the first element from the `state` tuple, then read
- the next line to see if it is a child variable. If it is, it will be
- appended to the last parsed CNF, otherwise top-level parsing is done
- normally.
- """
- lines = []
- current = get(state)
- while state:
- cnf_line = read_base_line(current)
- if cnf_line is not None:
- lines.append(cnf_line)
- state = advance(state)
- if state is None: # -> nothing left to do
- break
- current = get(state)
- parent = get_parent(current) # peek at next line
- if parent is not None: # -> recurse into children
- (state, children, _parent) = parse_cnf_children(state, parent)
- cnf_line["children"] = children
- if state is None:
- break
- current = get(state)
- else:
- state = advance(state)
- if state is None:
- break
- current = get(state)
- return lines
-
-
-def parse_cnf_children(state, parent):
- """
- Read and parse child CNFs of a given parent until there is none left.
-
- :param state: a 3-element tuple containing two lines and a list of the
- remaining lines
- :type state: (str, str, [str])
- :param parent: id of the parent whose children we are looking for
- :type parent: int
- :returns: a 3-element tuple with the current state, a list of children of
- the given parent and the parent ID
- :rtype: (tuple, [str], int)
-
- The function will recursively parse child lines from the `state` tuple
- until one of these conditions is satisfied:
-
- 1. the input is exhausted
- 2. the next CNF line
- 2.1. is a toplevel line
- 2.2. is a child line whose parent has a lower parent number
-
- Conceptually, 2.1 is a very similar to 2.2 but due to the special status of
- toplevel lines in CNF we need to handle them separately.
-
- Note that since nesting of CNF vars is achieved via parent line numbers,
- lines with different parents could appear out of order. libcnffile will
- happily parse those and still assign children to the specified parent:
-
- ::
- # set_cnf <<THATSALL
- 1 USER,1337: "l33t_h4x0r"
- 2 (1) USER_GROUP_MEMBER_REF,0: "2"
- 4 USER,1701: "picard"
- 5 (4) USER_GROUP_MEMBER_REF,0: "2"
- 6 (4) USER_PASSWORD,0: "engage"
- 3 (1) USER_PASSWORD,0: "hacktheplanet"
- THATSALL
- # get_cnf user 1337
- 1 USER,1337: "l33t_h4x0r"
- 2 (1) USER_GROUP_MEMBER_REF,0: "2"
- 3 (1) USER_PASSWORD,0: "hacktheplanet"
- # get_cnf user 1701
- 1 USER,1701: "picard"
- 2 (1) USER_GROUP_MEMBER_REF,0: "2"
- 3 (1) USER_PASSWORD,0: "engage"
-
- It is a limitation of ``cnfvar.py`` that it cannot parse CNF data
- structured like the above example: child lists are only populated from
- subsequent CNF vars using the parent number solely to track nesting levels.
- The parser does not keep track of line numbers while traversing the input
- so it doesn’t support retroactively assigning a child to anything else but
- the immediate parent.
- """
- lines = []
- current = get(state)
- while True:
- cnf_line = read_child_line(current)
- if cnf_line is not None:
- lines.append(cnf_line)
- state = advance(state)
- if state is None:
- break
- current = get(state)
- new_parent = get_parent(current)
- if new_parent is None:
- # drop stack
- return (state, lines, None)
- if new_parent > parent:
- # parent is further down in hierarchy -> new level
- (state, children, new_parent) = \
- parse_cnf_children (state, new_parent)
- if state is None:
- break
- cnf_line["children"] = children
- current = get(state)
- new_parent = get_parent(current)
- if new_parent is None:
- # drop stack
- return (state, lines, None)
- if new_parent < parent:
- # parent is further up in hierarchy -> pop level
- return (state, lines, new_parent)
- # new_parent == parent -> continue parsing on same level
- return (state, lines, parent)
-
-
-def get_parent(line):
- """
- Extract the ID of the parent for a given CNF line.
-
- :param str line: CNF line
- :returns: parent ID or None if no parent is found
- :rtype: int or None
- """
- match = re.match(grab_parent_pattern, line)
- if match is None: # -> no parent
- return None
- return int(match.groups()[0])
-
-
-def read_base_line(line):
- """
- Turn one top-level CNF line into a dictionary.
-
- :param str line: CNF line
- :rtype: {str: Any}
-
- This performs the necessary decoding on values to obtain proper Python
- strings from 8-bit encoded CNF data.
-
- The function only operates on individual lines. Argument strings that
- contain data for multiple lines – this includes child lines of the current
- CNF var! – will trigger a parsing exception.
- """
- if len(line.strip()) == 0:
- return None # ignore empty lines
- if line[0] == b"#":
- return None # ignore comments
-
- match = re.match(base_line_pattern, line)
- if match is None:
- raise MalformedCNF("Syntax error in line \"\"\"%s\"\"\"" % line)
- number, varname, instance, data, comment = match.groups()
- return {
- "number" : marshal_in_number (number),
- "varname" : marshal_in_varname (varname),
- "instance" : marshal_in_instance (instance),
- "data" : marshal_in_data (data),
- "comment" : marshal_in_comment (comment),
- }
-
-
-def read_child_line(line):
- """
- Turn one child CNF line into a dictionary.
-
- :param str line: CNF line
- :rtype: {str: Any}
-
- This function only operates on individual lines. If the argument string is
- syntactically valid but contains input representing multiple CNF vars, a
- parse error will be thrown.
- """
- if len(line.strip()) == 0:
- return None # ignore empty lines
- if line[0] == "#":
- return None # ignore comments
-
- match = re.match(child_line_pattern, line)
- if match is None:
- raise MalformedCNF("Syntax error in child line \"\"\"%s\"\"\""
- % from_latin1 (line))
- number, parent, varname, instance, data, comment = match.groups()
- return {
- "number" : marshal_in_number (number),
- "parent" : marshal_in_parent (parent),
- "varname" : marshal_in_varname (varname),
- "instance" : marshal_in_instance (instance),
- "data" : marshal_in_data (data),
- "comment" : marshal_in_comment (comment),
- }
-
-
-###############################################################################
-# SERIALIZATION
-###############################################################################
-
-
-cnf_line_nest_indent = " "
-cnf_line_base_fmt = "%d %s,%d: \"%s\""
-cnf_line_child_fmt = "%d %s(%d) %s,%d: \"%s\""
-
-
-def format_cnf_vars(da, var):
- """
- Return a list of formatted cnf_line byte strings.
-
- :param da: a tuple where the first element is the depth (0 = top-level,
- >1 = child CNF) and the second is the string being built.
- :type da: (int, str)
- :param var: the CNF element to convert to string in the current iteration
- :type var: dict
- :returns: a tuple like `da`, where the second element should contain all
- converted CNFs
- :rtype: (int, str)
-
- This function is meant to be passed to the :py:func:`functools.reduce`
- function.
-
- The variable names are uppercased unconditionally because while ``get_cnf``
- is case-indifferent for variable names, ``set_cnf`` isn’t.
- """
- depth, acc = da
- line = None
- if depth > 0:
- line = cnf_line_child_fmt \
- % (var["number"],
- cnf_line_nest_indent * depth,
- var["parent"],
- var["varname"].upper(),
- var["instance"],
- var["data"])
- else:
- line = cnf_line_base_fmt \
- % (var["number"],
- var["varname"].upper(),
- var["instance"],
- var["data"])
-
- comment = var.get("comment", None)
- if comment and len(comment) != 0:
- line = line + (" # %s" % comment)
-
- acc.append(to_latin1(line))
-
- children = var.get("children", None)
- if children is not None:
- (_, acc) = functools.reduce(format_cnf_vars, children, (depth + 1, acc))
-
- return (depth, acc)
-
-
-def cnf_root(root):
- """
- Extract a list of CNFs from a given structure.
-
- :param root: list of CNFs or a CNF dictionary
- :type root: [dict] or dict
- :raises: :py:class:`TypeError` if no CNFs can be extracted
- :returns: list with one or more CNF objects
- :rtype: [dict]
-
- Output varies depending on a few conditions:
- - If `root` is a list, return it right away
- - If `root` is a dict corresponding to a valid CNF value, return it wrapped
- in a list
- - If `root` is a dict with a `cnf` key containg a list (as the JSON
- returned by `get_cnf -j`), return the value
- - Otherwise, raise an error
- """
- if isinstance(root, list):
- return root
- if not isinstance(root, dict):
- raise TypeError(
- "Expected dictionary of CNF_VARs, got %s." % type(root))
- if is_cnf_var(root):
- return [root]
- cnf = root.get("cnf", None)
- if not isinstance(cnf, list):
- raise TypeError("Expected list of CNF_VARs, got %s." % type(cnf))
- return cnf
-
-
-def normalize_cnf(cnf):
- """
- Ensure the output conforms to set_cnf()’s expectations.
-
- :param cnf: list of CNF objects to normalize
- :type cnf: [dict]
- :returns: normalized list
- :rtype: [list]
- """
- if isinstance(cnf, list) is False:
- raise MalformedCNF("expected list of CNF_VARs, got [%s]" % type(cnf))
- def norm(var):
- vvar = \
- { "number" : var ["number"]
- , "varname" : var ["varname"].upper()
- , "instance" : var ["instance"]
- , "data" : var ["data"]
- }
-
- children = var.get("children", None)
- if children is not None:
- vvar ["children"] = normalize_cnf(children)
-
- parent = var.get("parent", None)
- if parent is not None:
- vvar ["parent"] = var["parent"]
-
- comment = var.get("comment", None)
- if comment is not None:
- vvar ["comment"] = var["comment"]
-
- return vvar
-
- return [norm(var) for var in cnf]
-
-
-###############################################################################
-# TRAVERSAL
-###############################################################################
-
-
-def walk_cnf(cnf, nested, fun, acc):
- """
- Depth-first traversal of a CNF tree.
-
- :type cnf: cnf list
- :type nested: bool
- :type fun: 'a -> bool -> (cnf stuff) -> 'a
- :type acc: 'a
- :rtype: 'a
-
- Executes ``fun`` recursively for each node in the tree. The function
- receives the accumulator ``acc`` which can be of an arbitrary type as first
- argument. The second argument is a flag indicating whether the current
- CNF var is a child (if ``True``) or a parent var. CNF member fields are
- passed via named optional arguments.
- """
- for var in cnf:
- acc = fun(acc,
- nested,
- comment=var.get("comment", None),
- data=var.get("data", None),
- instance=var.get("instance", None),
- number=var.get("number", None),
- parent=var.get("parent", None),
- varname=var.get("varname", None))
- children = var.get("children", None)
- if children is not None:
- acc = walk_cnf(children, True, fun, acc)
- return acc
-
-
-def renumber_vars(root, parent=None, toplevel=False):
- """
- Number cnfvars linearly.
-
- If *parent* is specified, numbering will start at this offset. Also, the
- VAR *root* will be assigned this number as a parent lineno unless
- *toplevel* is set (the root var in a CNF tree obviously can’t have a
- parent).
-
- The *toplevel* parameter is useful when renumbering an existing variable
- starting at a given offset without at the same time having that offset
- assigned as a parent.
- """
- root = cnf_root (root)
- i = parent or 0
- for var in root:
- i += 1
- var["number"] = i
- if toplevel is False and parent is not None:
- var["parent"] = parent
- children = var.get("children", None)
- if children is not None:
- i = renumber_vars(children, parent=i, toplevel=False)
- return i
-
-
-def count_vars(root):
- """
- Traverse the cnf structure recursively, counting VAR objects (CNF lines).
- """
- cnf = cnf_root(root)
- if cnf is None:
- raise InvalidCNF(root)
- return walk_cnf(cnf, True, lambda n, _nested, **_kwa: n + 1, 0)
-
-#
-# querying
-#
-
-
-def get_vars(cnf, data=None, instance=None):
- """
- get_vars -- Query a CNF_VAR structure. Skims the *toplevel* of the CNF_VAR
- structure for entries with a matching `data` or `instance` field.
-
- :type cnf: CNF_VAR
- :type data: str
- :type instance: int
- :rtype: CNF_VAR
- :returns: The structure containing only references to the
- matching variables. Containing an empty list of
- variables in case there is no match.
-
- Values are compared literally. If both ``instance`` and ``data`` are
- specified, vars will be compared against both.
- """
- cnf = cnf["cnf"]
- if cnf:
- criterion = lambda _var: False
- if data:
- if instance:
- criterion = lambda var: var[
- "data"] == data and var["instance"] == instance
- else:
- criterion = lambda var: var["data"] == data
- elif instance:
- criterion = lambda var: var["instance"] == instance
-
- return {"cnf": [var for var in cnf if criterion(var) is True]}
-
- return {"cnf": []}
-
-
-###############################################################################
-# PRINTING/DUMPING
-###############################################################################
-
-
-#
-# Print/dump raw CNF values
-#
-
-def output_cnf(root, out, renumber=False):
- """
- Dump a textual representation of given CNF VAR structure to given stream.
-
- Runs :py:func:`format_cnf_vars` on the input (`root`) and then writes that
- to the given file-like object (out).
-
- :param root: a CNF_VAR structure
- :type root: dict or list or anything that :py:func:`cnf_root` accepts
- :param out: file-like object or something with a `write(str)` function
- :param bool renumber: Whether to renumber cnfvars first
-
- Files are converted to the 8-bit format expected by CNF so they can be fed
- directly into libcnffile.
- """
- cnf = cnf_root(root)
- if renumber is True:
- _count = renumber_vars(root)
- if is_cnf(cnf) is True:
- (_, lines) = functools.reduce(format_cnf_vars, cnf, (0, []))
- if isinstance(out, (io.RawIOBase, io.BufferedIOBase)):
- out.write (b"\n".join (lines))
- out.write (b"\n")
- else: # either subclass of io.TextIOBase or unknown
- out.write ("\n".join (map (from_latin1, lines)))
- out.write ("\n")
-
-
-def dump_cnf_bytes (root, renumber=False):
- """
- Serialize CNF var structure, returning the result as a byte sequence.
- """
- cnf = cnf_root(root)
- out = io.BytesIO()
- output_cnf(root, out, renumber=renumber)
- res = out.getvalue()
- out.close()
- return res
-
-
-def dump_cnf_string(root, renumber=False):
- """
- Serialize CNF var structure, returning a latin1-encode byte string.
-
- .. todo::this is identical to :py:func:`dump_cnf_bytes`!
- """
- cnf = cnf_root(root)
- out = io.BytesIO()
- output_cnf(root, out, renumber=renumber)
- res = out.getvalue()
- out.close()
- return res
-
-
-def print_cnf(root, out=None, renumber=False):
- """
- Print given CNF_VAR structure to stdout (or other file-like object).
-
- Note that per default the config is printed to sys.stdout using the shell's
- preferred encoding. If the shell cannot handle unicode this might raise
- `UnicodeError`.
-
- All params forwarded to :py:func:`output_cnf`. See args there.
- """
- if root is not None:
- output_cnf(root, out or sys.stdout, renumber=renumber)
-
-
-def write_cnf(*argv, **kw_argv):
- """Alias for :py:func:`print_cnf`."""
- print_cnf(*argv, **kw_argv)
-
-
-def print_cnf_raw(root, out=None):
- """`if root is not None: out.write(root)`."""
- if root is not None:
- out.write(root)
-
-
-def write_cnf_raw(*argv, **kw_argv):
- """Alias for :py:func:`print_cnf_raw`."""
- print_cnf_raw(*argv, **kw_argv)
-
-
-#
-# Print/dump CNF values in JSON format
-#
-
-
-def output_json(root, out, renumber=False):
- """
- Dump CNF_VAR structure to file-like object in json format.
-
- :param root: CNF_VAR structure
- :type root: dict or list or anything that :py:func:`cnf_root` accepts
- :param out: file-like object, used as argument to :py:func:`json.dumps` so
- probably has to accept `str` (as opposed to `bytes`).
- :param bool renumber: whether to renumber variables before dupming.
- """
- # if not isinstance(out, file):
- #raise TypeError("%s (%s) is not a stream." % (out, type(out)))
- if renumber is True:
- _count = renumber_vars(root)
- if is_cnf(root) is True:
- root ["cnf"] = normalize_cnf(cnf_root (root))
- data = json.dumps(root)
- out.write(data)
- out.write("\n")
- # TODO: else raise value error?
-
-
-def dump_json_string(root, renumber=False):
- """
- Serialize CNF var structure as JSON, returning the result as a string.
- """
- out = io.StringIO()
- output_json(root, out, renumber=renumber)
- res = out.getvalue()
- out.close()
- return res
-
-
-def print_cnf_json(root, out=None, renumber=False):
- """
- Print CNF_VAR structure in json format to stdout.
-
- Calls :py:func:`output_json` with `sys.stdout` if `out` is not given or
- `None`.
- """
- if root is not None:
- output_json(root, out or sys.stdout, renumber=renumber)
-
-
-def write_cnf_json(*argv, **kw_argv):
- """Alias for :py:func:`print_cnf_json`."""
- print_cnf_json(*argv, **kw_argv)
-
-
-
-###############################################################################
-# ENTRY POINT FOR DEVELOPMENT
-###############################################################################
-
-
-def usage():
- print("usage: cnfvar.py -" , file=sys.stderr)
- print("" , file=sys.stderr)
- print(" Read CNF from stdin.", file=sys.stderr)
- print("" , file=sys.stderr)
-
-
-def main(argv):
- if len(argv) > 1:
- first = argv[1]
- if first == "-":
- cnf = read_cnf(sys.stdin.buffer.read())
- print_cnf(cnf)
- return 0
- elif first == "test":
- cnf = read_cnf(sys.stdin.buffer.read())
- cnff = get_vars(cnf, instance=2, data="FAX")
- print_cnf(cnff)
- return 0
- usage()
- return -1
-
-
-if __name__ == "__main__":
- sys.exit(main(sys.argv))
import logging
log = logging.getLogger('pyi2ncommon.dial')
-from . import arnied_wrapper
-from . import simple_cnf
+from . import cnfvar
from . import sysmisc
HAVE_IPADDRESS = True
:type block: bool
:returns: Whether the ``tell-connd`` command succeeded.
:rtype: int (dial result as above)
+
+ ..todo:: This function uses old and deprecated methods of cnfvar usage, it
+ should therefore be converted and needs unit tests added to verify the
+ conversion and perhaps even to test the overall dial functionality.
"""
log.debug("requested connd_dial_online%s" % " (blocking)" if block else "")
- cnf = simple_cnf.SimpleCnf()
- cnf.add("DIALOUT_MODE", DIALOUT_MODE_CNF[DIALOUT_MODE_PERMANENT])
- cnf.add("DIALOUT_DEFAULTPROVIDER_REF", "1")
+ cnf = cnfvar.CnfList([("dialout_mode", DIALOUT_MODE_CNF[DIALOUT_MODE_PERMANENT]),
+ ("dialout_defaultprovider_ref", "1")])
def aux():
- return arnied_wrapper.set_cnf_pipe(cnf, block=block), "", None
+ store = cnfvar.BinaryCnfStore()
+ store.commit(cnf)
+ return True, "", None
if block is False:
- succ = aux()
+ succ, out, _ = aux()
return sysmisc.RUN_RESULT_OK if succ is True else sysmisc.RUN_RESULT_FAIL
res, err = sysmisc.cmd_block_till(aux, DIALTOOLS_TIMEOUT, _connd_online,
# outsourced source, import required for compatiblity
from .imap_mailbox import ImapMailbox # pylint: disable=unused-import
from .mail_validator import * # pylint: disable=unused-import
+from .sysmisc import replace_file_regex
log = logging.getLogger('pyi2ncommon.mail_utils')
of 'envelopeto' or 'received'
:raises: :py:class:`ValueError` if the choice of criterion is invalid
- In some cases this function is reusing arnied wrapper's cnf value
- preparation but for email headers.
+ ..todo:: In some cases this function is reusing arnied wrapper's cnf
+ value preparation but for email headers.
"""
if criterion == "envelopeto":
logging.debug("Updating test emails' EnvelopeTo header")
- arnied_wrapper.prep_cnf_value(email_file, value, regex=regex)
+ replace_file_regex(email_file, value, regex=regex)
elif criterion == "received":
logging.debug("Updating test emails' Received header")
with open(email_file, "r") as file_handle:
% criterion)
-def create_users(usernames, config_file, params):
- """
- Create cyrus users from an absolute path to a user configuration file.
-
- :param usernames: usernames of the created users
- :type usernames: [str]
- :param str config_file: template config file to use for each user
- configuration
- :param params: template config file to use for each user configuration
- :type params: {str, str}
- :raises: :py:class:`RuntimeError` if the user exists already or cannot be
- created
- """
- log.info("Creating new cyrus users %s", ", ".join(usernames))
- cyrus_user_path = params.get("cyrus_user_path",
- "/datastore/imap-mails/user/")
-
- # check for existence round
- for username in usernames:
- if os.path.exists(os.path.join(cyrus_user_path,
- username.replace(".", "^"))):
- raise RuntimeError("The user %s was already created" % username)
-
- for username in usernames:
- params["user"] = '%i: "%s"' % (-1, username)
- params["user_fullname"] = username
- params_regex = {"user": r'%s,(-?\d+: ".*")'}
- arnied_wrapper.set_cnf_semidynamic([config_file],
- params, params_regex)
-
- for username in usernames:
- if not os.path.exists(os.path.join(cyrus_user_path,
- username.replace(".", "^"))):
- raise RuntimeError("The user %s could not be created" % username)
- else:
- log.info("Added new user %s", username)
- log.info("%s users successfully created!", len(usernames))
-
-
def parse_mail_file(file_name, headers_only=True, attachment_filenames=False,
raise_on_defect=False, new_message_type=False):
"""
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""
-
-SUMMARY
-------------------------------------------------------
-Utility for one-step dynamic cnfvar configuration.
-
-.. note:: DEPRECATED! Please do not extend this or add new uses of this module,
- use :py:mod:`pyi2ncommon.arnied_api` or :py:mod:`pyi2ncommon.cnfvar`
- instead!
-
-Copyright: Intra2net AG
-
-
-CONTENTS
-------------------------------------------------------
-Contains general as well as specialized versions of some of the main
-configurations performed by our tests.
-
-INTERFACE
-------------------------------------------------------
-
-"""
-
-import time
-import logging
-
-# custom imports
-from . import arnied_wrapper as aw
-from .arnied_wrapper import Delete, Update, Add, Child, batch_update_cnf, build_cnf
-from .cnfline import build_cnfvar, build_group, build_intraclient
-from .cnfline import build_nic, build_provider, build_user
-
-log = logging.getLogger('pyi2ncommon.mk_config')
-
-###############################################################################
-# MINOR CONFIGURATION
-###############################################################################
-
-
-def simple(varname, data, filename):
- """
- Generate and save a single-variable configuration file.
-
- :param str varname: cnf variable name
- :param str data: cnf variable data
- :param str filename: config name
- :returns: generated config filename
- :rtype: str
- """
- log.info("Create single-variable configuration file")
- tmp = build_cnfvar.BuildCnfVar(name=varname, data=data)
- [filename] = aw.prep_config_paths([filename], aw.DUMP_CONFIG_DIR)
- logging.info("Saving simple configuration to %s", filename)
- tmp.save(filename)
- return filename
-
-
-def user(username="admin", instance=1, suffix="host"):
- """
- Generate and save a user configuration file.
-
- :param str username: username for the user variable
- :param int instance: instance number (for multiple users, -1 for next available)
- :param str suffix: optional suffix to use for config identification
- :returns: generated config filename
- :rtype: str
- """
- log.info("Create arnied user configuration")
- user_obj = batch_update_cnf(
- build_user.BuildUser(data=username, instance=instance, line_no=1),
- [(Update, ("USER_FULLNAME", 0, username)),
- (Update, ("USER_GROUP_MEMBER_REF", 0, "1")),
- (Add, ("USER_GROUP_MEMBER_REF", 1, "2")),
- (Delete, "USER_WEBMAIL_MESSAGES_PER_PAGE"),
- (Delete, "USER_LOCALE"),
- (Delete, "USER_TRASH_DELETEDAYS"),
- (Delete, "USER_WEBMAIL_SIGNATURE")])
- user_cnf = "user-%d-%s.cnf" % (time.time(), suffix)
- [user_cnf] = aw.prep_config_paths([user_cnf], aw.DUMP_CONFIG_DIR)
- logging.info("Saving user configuration to %s", user_cnf)
- user_obj.save(user_cnf)
- return user_cnf
-
-
-def group_admins(proxy_profile="1", activesync_enable=False, xauth_enable=False, suffix="host"):
- """
- Generate and save an Administrators group configuration file.
-
- :param str proxy_profile: proxy profile instance reference
- :param bool activesync_enable: whether to enable ActiveSync for the group
- :param bool xauth_enable: whether to enable XAUTH for the group
- :param str suffix: optional suffix to use for config identification
- :returns: generated config filename
- :rtype: str
- """
- log.info("Create arnied admin group configuration")
- group = batch_update_cnf(build_group.BuildGroup(data="Administratoren",
- instance=1),
- [(Update, ("GROUP_ACCESS_REMOTE_ADMINISTRATION_ALLOWED", 0, "1")),
- (Update, ("GROUP_EMAILFILTER_BAN_FILTERLIST_REF", 0, "-1")),
- (Update, ("GROUP_PROXY_PROFILE_REF", 0, proxy_profile)),
- (Update, ("GROUP_ACCESS_GO_ONLINE_ALLOWED", 0, "1")),
- (Update, ("GROUP_EMAIL_RELAY_RIGHTS", 0, "RELAY_FROM_INTRANET")),
- (Update, ("GROUP_ACTIVESYNC_ENABLE", 0, "1" if activesync_enable else "0")),
- (Update, ("GROUP_XAUTH_ENABLE", 0, "1" if xauth_enable else "0")),
- (Delete, ("GROUP_COMMENT",))])
- group_cnf = "group-%d-%s.cnf" % (time.time(), suffix)
- [group_cnf] = aw.prep_config_paths([group_cnf], aw.DUMP_CONFIG_DIR)
- logging.info("Saving group configuration to %s", group_cnf)
- group.save(group_cnf)
- return group_cnf
-
-
-def group_all(proxy_profile="1", suffix="host"):
- """
- Generate and save an "All" group configuration file.
-
- :param str proxy_profile: proxy profile instance reference
- :param str suffix: optional suffix to use for config identification
- :returns: generated config filename
- :rtype: str
- """
- log.info("Create arnied all group configuration")
- group = batch_update_cnf(build_group.BuildGroup(data="Alle",
- instance=2),
- [(Update, ("GROUP_ACCESS_GO_ONLINE_ALLOWED", 0, "1")),
- (Update, ("GROUP_ACCESS_INFORMATION_VERSION_ALLOWED", 0, "1")),
- (Update, ("GROUP_ACCESS_MAINPAGE_ALLOWED", 0, "1")),
- (Update, ("GROUP_ACCESS_USERMANAGER_OWN_PROFILE_FORWARDING_ALLOWED", 0, "1")),
- (Update, ("GROUP_ACCESS_USERMANAGER_OWN_PROFILE_GROUPWARE_ALLOWED", 0, "1")),
- (Update, ("GROUP_ACCESS_USERMANAGER_OWN_PROFILE_SETTINGS_ALLOWED", 0, "1")),
- (Update, ("GROUP_ACCESS_USERMANAGER_OWN_PROFILE_SORTING_ALLOWED", 0, "1")),
- (Update, ("GROUP_ACCESS_USERMANAGER_OWN_PROFILE_SPAMFILTER_ALLOWED", 0, "1")),
- (Update, ("GROUP_ACCESS_USERMANAGER_OWN_PROFILE_VACATION_ALLOWED", 0, "1")),
- (Update, ("GROUP_ACCESS_GROUPWARE_ALLOWED", 0, "1")),
- (Update, ("GROUP_EMAILFILTER_BAN_FILTERLIST_REF", 0, "-1")),
- (Update, ("GROUP_EMAIL_RELAY_RIGHTS", 0, "RELAY_FROM_EVERYWHERE")),
- (Update, ("GROUP_PROXY_PROFILE_REF", 0, proxy_profile)),
- (Delete, ("GROUP_COMMENT",))])
-
- group_cnf = "group-%d-%s.cnf" % (time.time(), suffix)
- [group_cnf] = aw.prep_config_paths([group_cnf], aw.DUMP_CONFIG_DIR)
- logging.info("Saving group configuration to %s", group_cnf)
- group.save(group_cnf)
- return group_cnf
-
-
-def nic(instance=0, nictype="NATLAN",
- ip="1.2.3.4", netmask="255.255.0.0", mac="00:00:00:00:00:00",
- suffix="host"):
- """
- Generate and save a nic configuration file.
-
- :param int instance: instance number (for multiple nics, -1 for next available)
- :param str nictype: type of the nic
- :param str ip: IP address of the nic
- :param str netmask: network mask of the nic
- :param str mac: MAC address of the nic
- :param str suffix: optional suffix to use for config identification
- :returns: generated config filename
- :rtype: str
- """
- log.info("Create arnied nic configuration")
- nic_obj = batch_update_cnf(
- build_nic.BuildNIC(data="", instance=instance, line_no=1),
- [(Update, ("NIC_TYPE", 0, nictype)),
- (Update, ("NIC_LAN_IP", 0, ip)),
- (Update, ("NIC_LAN_NETMASK", 0, netmask)),
- (Update, ("NIC_MAC", 0, mac))])
- nic_cnf = "nic-%d-%s.cnf" % (time.time(), suffix)
- [nic_cnf] = aw.prep_config_paths([nic_cnf], aw.DUMP_CONFIG_DIR)
- logging.info("Saving nic configuration to %s", nic_cnf)
- nic_obj.save(nic_cnf)
- return nic_cnf
-
-
-def intraclient(name="intraclient", instance=1,
- ip="1.2.3.4", mac="00:00:00:00:00:00",
- fwrules=5, suffix="host"):
- """
- Generate and save an intraclient configuration file.
-
- :param str name: name of the intraclient
- :param int instance: instance number (for multiple clients, -1 for next available)
- :param str ip: IP address of the intraclient
- :param str mac: MAC address of the intraclient
- :param int fwrules: instance of the firewall rules to use
- :param str suffix: optional suffix to use for config identification
- :returns: generated config filename
- :rtype: str
- """
- log.info("Create arnied intraclient configuration")
- intraclient_obj = batch_update_cnf(
- build_intraclient.BuildIntraclient(data=name, instance=instance),
- [(Update, ("INTRACLIENT_IP", 0, ip)),
- (Update, ("INTRACLIENT_MAC", 0, mac)),
- (Update, ("INTRACLIENT_FIREWALL_RULESET_REF", 0, fwrules))])
-
- intraclient_cnf = "intraclient-%d-%s.cnf" % (time.time(), suffix)
- [intraclient_cnf] = aw.prep_config_paths([intraclient_cnf], aw.DUMP_CONFIG_DIR)
- logging.info("Saving intraclient configuration to %s", intraclient_cnf)
- intraclient_obj.save(intraclient_cnf)
- return intraclient_cnf
-
-
-def provider(name="provider", instance=1, mode="ROUTER", ip="1.2.3.4", localip=None,
- netmask="255.255.0.0", dnsmode="IP", dns="1.2.3.4", fwrules=5,
- dialretry=None, timeout="", mtumode="AUTO",
- vlanid=None, mtusize=None, login=None, password=None,
- modemip=None, providerid=None, localdhcp=None,
- suffix="host"):
- """
- Generate and save a provider configuration file.
-
- :param str name: name of the provider
- :param int instance: instance number (for multiple clients, -1 for next available)
- :param str mode: provider mode
- :param str ip: IP address of the provider
- :param localip: IP address of the configured machine (valid for some configurations)
- :type localip: str or None
- :param str netmask: netmask of the provider
- :param str dnsmode: dnsmode of the provider
- :param str dns: IP address of the DNS server
- :param int fwrules: instance of the firewall rules to use
- :param any args: lots of detailed configuration
- :param str suffix: optional suffix to use for config identification
- :returns: generated config filename
- :rtype: str
- """
- log.info("Create arnied provider configuration")
-
- def add_or_del(var, field):
- if var is not None:
- return Add, (field, 0, str(var))
- return Delete, field
- provider_obj = batch_update_cnf(
- build_provider.BuildProvider(data=name, instance=instance),
- [(Update, ("PROVIDER_MODE", 0, mode)),
- ip and (Update, ("PROVIDER_IP", 0, ip))
- or (Delete, "PROVIDER_IP"),
- localip
- and (Update, ("PROVIDER_LOCALIP", 0, localip))
- or (Delete, "PROVIDER_LOCALIP"),
- netmask and (Update, ("PROVIDER_NETMASK", 0,
- netmask))
- or (Delete, "PROVIDER_NETMASK"),
- (Update, ("PROVIDER_TIMEOUT", 0, timeout)),
- (Update, ("PROVIDER_DNS_MODE", 0, dnsmode)),
- (Update, ("PROVIDER_DNS", 0,
- dns if dnsmode == "IP" else "")),
- (Update, ("PROVIDER_MTU_MODE", 0, mtumode)),
- (Update, ("PROVIDER_MTU_SIZE", 0,
- mtusize if mtumode != "AUTO" else "")),
- (Update, ("PROVIDER_FIREWALL_RULESET_REF", 0, str(fwrules))),
- add_or_del(vlanid, "PROVIDER_VLAN_ID"),
- add_or_del(dialretry, "PROVIDER_DIAL_RETRY"),
- add_or_del(login, "PROVIDER_LOGIN"),
- add_or_del(password, "PROVIDER_PASSWORD"),
- add_or_del(modemip, "PROVIDER_MODEM_IP"),
- add_or_del(providerid, "PROVIDER_PROVIDERID"),
- add_or_del(localdhcp, "PROVIDER_LOCAL_DHCP")])
- provider_cnf = "provider-%d-%s.cnf" % (time.time(), suffix)
- [provider_cnf] = aw.prep_config_paths([provider_cnf], aw.DUMP_CONFIG_DIR)
- logging.info("Saving provider configuration to %s", provider_cnf)
- provider_obj.save(provider_cnf)
- return provider_cnf
-
-
-def provider_proxy(mode="ROUTER", ip="1.2.3.4", localip=None, proxy_port=3128, fwrules=7, suffix="host"):
- """
- Generate and save a provider configuration file for proxy.
-
- :param str mode: provider mode
- :param str ip: IP address of the provider (and DNS server)
- :param localip: IP address of the configured machine (valid for some configurations)
- :type localip: str or None
- :param int proxy_port: port for the provider proxy
- :param int fwrules: instance of the firewall rules to use
- :param str suffix: optional suffix to use for config identification
- :returns: generated config filename
- :rtype: str
- """
- log.info("Create arnied provider configuration.")
- provider_obj = batch_update_cnf(
- build_provider.BuildProvider(),
- [(Update, ("PROVIDER_MODE", 0, mode)),
- (Update, ("PROVIDER_DNS", 0, ip)),
- (Update, ("PROVIDER_DYNDNS_ENABLE", 0, "0")),
- (Update, ("PROVIDER_IP", 0, ip)),
- (Update, ("PROVIDER_PROXY_SERVER", 0, ip)),
- (Update, ("PROVIDER_PROXY_PORT", 0, str(proxy_port))),
- localip
- and (Update, ("PROVIDER_LOCALIP", 0, localip))
- or (Delete, "PROVIDER_LOCALIP"),
- (Update, ("PROVIDER_DNS_MODE", 0, "IP")),
- (Update, ("PROVIDER_FIREWALL_RULESET_REF", 0, str(fwrules)))])
- provider_cnf = "provider-%d-%s.cnf" % (time.time(), suffix)
- [provider_cnf] = aw.prep_config_paths([provider_cnf], aw.DUMP_CONFIG_DIR)
- logging.info("Saving provider configuration to %s", provider_cnf)
- provider_obj.save(provider_cnf)
- return provider_cnf
-
-
-def port_forwarding(src_port="1234", src_port_end="",
- dst_port="1234", dst_port_end="",
- dst_ip_ref="1", protocol_type="TCP",
- suffix="host"):
- """
- Generate and save a port forwarding configuration file.
-
- :param str src_port: forwarded source port
- :param str src_port_end: forwarded source port end for a port range
- :param str dst_port: forwarded destination port
- :param str dst_port_end: forwarded destination port end for a port range
- :param str dst_ip_ref: destination nic instance for a port range
- :param str protocol_type: port forwarding protocol type
- :param str suffix: optional suffix to use for config identification
- :returns: generated config filename
- :rtype: str
- """
- log.info("Create port forwarding configuration")
- value_id = "test"
- portforward_client_cnf = "portforward-%d-%s.cnf" % (time.time(), suffix)
- return build_cnf("PORT_FORWARDING",
- data=value_id,
- filename=portforward_client_cnf,
- vals=[(Child, ("PORT_FORWARDING_DST_IP_REF", 0, dst_ip_ref)),
- (Child, ("PORT_FORWARDING_DST_PORT", 0, dst_port)),
- (Child, ("PORT_FORWARDING_DST_PORT_END", 0, dst_port_end)),
- (Child, ("PORT_FORWARDING_PROTOCOL_TYPE", 0, protocol_type)),
- (Child, ("PORT_FORWARDING_SRC_PORT", 0, src_port)),
- (Child, ("PORT_FORWARDING_SRC_PORT_END", 0, src_port_end))])
-
-
-def firewall_ruleset_simple(suffix="host"):
- """
- Generate and save a simple firewall ruleset configuration file.
-
- :param str suffix: optional suffix to use for config identification
- :returns: generated config filename
- :rtype: str
- """
- log.info("Create firewall ruleset")
- fw_cnf = "fw-%d-%s.cnf" % (time.time(), suffix)
- return build_cnf("FIREWALL_RULESET",
- instance=101,
- data="Port Forwarding libfirewall test",
- filename=fw_cnf,
- vals=[(Update, ("FIREWALL_RULESET_PROFILE_TYPE", 0, "SIMPLE_PROVIDER")),
- (Update, ("FIREWALL_RULESET_PROVIDER_HTTPS_OPEN", 0, "0")),
- (Update, ("FIREWALL_RULESET_PROVIDER_POP3SIMAPS_OPEN", 0, "0")),
- (Update, ("FIREWALL_RULESET_PROVIDER_PORT_FORWARDING_ENABLE", 0, "1")),
- (Update, ("FIREWALL_RULESET_PROVIDER_SMTP_OPEN", 0, "0")),
- (Update, ("FIREWALL_RULESET_PROVIDER_HTTP_OPEN", 0, "0")),
- (Update, ("FIREWALL_RULESET_PROVIDER_VPN_OPEN", 0, "0"))])
-
-
-def firewall_ruleset_port(suffix="host"):
- """
- Generate and save a firewall ruleset configuration file for port forwarding.
-
- :param str suffix: optional suffix to use for config identification
- :returns: generated config filename
- :rtype: str
- """
- log.info("Create firewall ruleset")
- fw_portforward_cnf = "fw-portforward-%d-%s.cnf" % (time.time(), suffix)
- return build_cnf("FIREWALL_RULESET",
- instance=100,
- data="Port forwarding only",
- filename=fw_portforward_cnf,
- vals=[(Update, ("FIREWALL_RULESET_AUTOMATIC_ANSWER_RULE", 0, "1")),
- (Update, ("FIREWALL_RULESET_PROFILE_TYPE", 0, "FULL")),
- (Add, ("FIREWALL_RULESET_RULE", 1, "")),
- (Child, ("FIREWALL_RULESET_RULE_ACTION", 0, "ACCEPT")),
- (Child, ("FIREWALL_RULESET_RULE_CHECK_CONNECTION_STATUS", 0, "PORTFORWARDING")),
- (Child, ("FIREWALL_RULESET_RULE_CHECK_TCP_FLAGS", 0, "DISABLED")),
- (Child, ("FIREWALL_RULESET_RULE_LIMIT_FOR_ACTION_ENABLE", 0, "0")),
- (Child, ("FIREWALL_RULESET_RULE_LIMIT_FOR_LOG_ENABLE", 0, "0")),
- (Child, ("FIREWALL_RULESET_RULE_LIMIT_PACKETS_AVERAGE_COUNT", 0, "")),
- (Child, ("FIREWALL_RULESET_RULE_LIMIT_PACKETS_AVERAGE_PERIOD", 0, "SEC")),
- (Child, ("FIREWALL_RULESET_RULE_LIMIT_PACKETS_PEAK_COUNT", 0, "")),
- (Child, ("FIREWALL_RULESET_RULE_LOG_ENABLE", 0, "0")),
- (Child, ("FIREWALL_RULESET_RULE_LOG_MESSAGE", 0, "")),
- (Child, ("FIREWALL_RULESET_RULE_TIME_INCLUDE_TIME_REF", 0, "-1")),
- (Update, ("FIREWALL_RULESET_USAGE", 0, "PROVIDER"))])
-
-
-def firewall_ruleset_dmz(suffix="host"):
- """
- Generate and save a firewall ruleset configuration file for DMZ.
-
- :param str suffix: optional suffix to use for config identification
- :returns: generated config filename
- :rtype: str
- """
- log.info("Create firewall ruleset")
- fw_dmz_cnf = "fw-dmz-%d-%s.cnf" % (time.time(), suffix)
- return build_cnf("FIREWALL_RULESET",
- instance=100,
- data="DMZ firewall rules",
- filename=fw_dmz_cnf,
- vals=[(Update, ("FIREWALL_RULESET_AUTOMATIC_ANSWER_RULE", 0, "1")),
- (Update, ("FIREWALL_RULESET_PROFILE_TYPE", 0, "FULL")),
- (Add, ("FIREWALL_RULESET_RULE", 1, "")),
- (Child, ("FIREWALL_RULESET_RULE_ACTION", 0, "ACCEPT")),
- (Child, ("FIREWALL_RULESET_RULE_LIMIT_FOR_ACTION_ENABLE", 0, "0")),
- (Child, ("FIREWALL_RULESET_RULE_LIMIT_FOR_LOG_ENABLE", 0, "0")),
- (Child, ("FIREWALL_RULESET_RULE_LIMIT_PACKETS_AVERAGE_COUNT", 0, "")),
- (Child, ("FIREWALL_RULESET_RULE_LIMIT_PACKETS_PEAK_COUNT", 0, "")),
- (Child, ("FIREWALL_RULESET_RULE_LOG_ENABLE", 0, "0")),
- (Child, ("FIREWALL_RULESET_RULE_LOG_MESSAGE", 0, "")),
- (Child, ("FIREWALL_RULESET_RULE_SERVICE_INCLUDE_SERVICEGROUP_REF", 0, "6")),
- (Child, ("FIREWALL_RULESET_RULE_DST_INCLUDE_CLIENT_REF", 0, "2")),
- (Update, ("FIREWALL_RULESET_USAGE", 0, "LANVPN"))])
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""
-
-SUMMARY
-------------------------------------------------------
-Read / write / merge guest cnf var sets, even on host.
-
-.. note:: DEPRECATED! Please do not extend this or add new uses of this module,
- use :py:mod:`pyi2ncommon.arnied_api` or :py:mod:`pyi2ncommon.cnfvar`
- instead!
-
-Copyright: Intra2net AG
-
-
-CONTENTS
-------------------------------------------------------
-
-This module can be viewed as a convenience-wrapper around module
-:py:mod:`cnfvar`. It uses many of its function but provides some higher-level
-interfaces, most of all class :py:class:`SimpleCnf`. It is completely
-independent of the :py:mod:`cnfline` package and its included subclasses
-(modules in `shared.cnfline`, starting with ``build_`` and ``configure_``).
-
-Class :py:class:`SimpleCnf` represents a hierarchical set of conf vars and
-provides functions to deal with that hierarchy. Under the hood, all functions
-here (and probably also in :py:mod:`cnfvar`) work with conf vars represented as
-dictionaries and lists thereof. Conf var dicts have keys `number`, `varname`,
-`instance`, `data`, `comment` and possibly `parent` and/or `children`.
-`varname` is a regular lower-case string, `data` is a (utf8) string, `comment`
-is usually None, `number`, `parent` and `instance` are int. If a conf var has
-children, then this is a list of conf var dicts. `parent` is only present if a
-conf-var is a child. Several conf vars, if not wrapped in a
-:py:class:`SimpleCnf`, appear either as simple list of conf var dicts or as a
-dict with a single key `cnf` whose value is a list of conf var dicts. (Function
-:py:func:`get_cnf` returns the former given the latter).
-
-.. todo:: Exceptions Invalid[Json]Cnf are used inconsistently (e.g. check type
- of function arguments `-->` should be ValueError) and difference
- between them is unclear. Also name differs only in case from
- :py:class:`cnfvar.InvalidCNF`
-
-
-INTERFACE
-------------------------------------------------------
-"""
-
-import os
-import json
-import tempfile
-import time
-import logging
-log = logging.getLogger('pyi2ncommon.simple_cnf')
-
-from . import arnied_wrapper
-from . import cnfvar_old
-from . import sysmisc
-
-###############################################################################
-# constants
-###############################################################################
-
-#: timeout for copying temporary config files to VM objects (seconds)
-COPY_FILES_TIMEOUT = 15
-
-#: additional location of configuration files
-ADD_CNFFILE_PATH = "/tmp/configs"
-
-
-###############################################################################
-# EXCEPTIONS
-###############################################################################
-
-
-class InvalidCnf(Exception):
- """Exception that indicates a general problem with conf var processing."""
-
- def __init__(self, m):
- """Create an invalid config exception."""
- msg = "Invalid CNF_VAR: %s" % m
- super(InvalidCnf, self).__init__(msg)
- self.msg = msg
- self.pfx = "[CNF]"
-
- def __str__(self):
- """Get a string version of the exception message."""
- return "%s %s" % (self.pfx, self.msg)
-
-
-class InvalidJsonCnf(InvalidCnf):
- """Exception that indicates a general problem with conf var processing."""
-
- def __init__(self, m):
- """Create an invalid JSON config exception."""
- super(InvalidJsonCnf, self).__init__(m)
- self.pfx = "[CNF:JSON]"
-
-
-###############################################################################
-# auxiliary functions
-###############################################################################
-
-
-def get_cnf(cnf):
- """
- "Convert" a config dict to a list of conf var dicts.
-
- This just removes the top-level 'cnf' key and returns its value.
-
- :param cnf: config dictionary
- :type cnf: {str, [dict]}
- :returns: list of cnf var dicts
- :rtype: [{str, int or str or None}]
- :raises: :py:class:`InvalidJsonCnf` if there is no `cnf` field found
- """
- cnf_vars = cnf.get("cnf")
- if cnf_vars is None:
- raise InvalidJsonCnf("toplevel \"cnf\" field required")
- return cnf_vars
-
-
-def gen_tmpname():
- """
- Get a (quite) safe temporary file name for config file.
-
- :returns: temporary file name
- :rtype: str
- """
- now = time.time()
- file_handle, file_name = tempfile.mkstemp(prefix="simple_%d_" % int(now),
- suffix=".cnf")
- os.close(file_handle)
- os.unlink(file_name)
- return file_name
-
-
-def set_values(cnf_vars, replacements):
- """
- Recursively replace values in configuration
-
- Works in-place, meaning that no new configuration is created and returned
- but instead argument `cnf_vars` is modified (and nothing returned).
-
- :param cnf_vars: config where replacements are to be made
- :type cnf_vars: [{str, int or str or None}] or {str, [dict]}
- :param replacements: what to replace and what to replace it with
- :type replacements: {str, str} or [(str, str)]
- :raises: :py:class:`InvalidJsonCnf` if cnf_vars is neither dict or list
- """
- # determine set replace_me of keys to replace and function get that returns
- # value for key or empty string if key not in replacements
- replace_me = None
- get = None
- if isinstance(replacements, dict):
- replace_me = set(k.lower() for k in replacements.keys())
- get = lambda var: str(replacements.get(var, "")) # pylint: disable=function-redefined
- elif isinstance(replacements, list):
- replace_me = set(r[0].lower() for r in replacements)
-
- def get(var): # pylint: disable=function-redefined
- """Get replacement value for given variable name."""
- try:
- return str(next(r[1] for r in replacements if r[0] == var))
- except StopIteration:
- return ""
- else:
- raise TypeError("replacements must be dictionary or key-value list")
-
- # check type of arg "cnf_vars"
- if isinstance(cnf_vars, dict):
- cnf_vars = cnf_vars["cnf"] # operate on the var list
- if not isinstance(cnf_vars, list):
- raise InvalidJsonCnf("ill-formed CNF_VAR: expected list, got %s (%s)"
- % (type(cnf_vars), cnf_vars))
-
- def aux(varlist):
- """Internal recursive function to replace values."""
- for var in varlist:
- varname = var["varname"].lower()
- if varname in replace_me:
- var["data"] = str(get(varname))
- children = var.get("children", None)
- if children is not None:
- aux(children)
-
- # apply function on complete cnf_vars
- aux(cnf_vars)
-
-
-def lookup_cnf_file(fname):
- """
- Searches for config file with given name in default locations.
-
- :param str fname: file name of config file (without path)
- :returns: first existing config file found in default locations
- :rtype: str
- :raises: :py:class:`IOError` if no such config file was found
- """
- locations = [arnied_wrapper.SRC_CONFIG_DIR, ADD_CNFFILE_PATH]
- for path in locations:
- fullpath = os.path.join(path, fname)
- if os.path.isfile(fullpath):
- return fullpath
- raise IOError("config file %s does not exist in any of the readable "
- "locations %s" % (fname, locations))
-
-
-###############################################################################
-# primary class
-###############################################################################
-
-
-class SimpleCnf(object):
- """
- Representation of hierarchical configuration of variables.
-
- Based on C++ `cnf_vars` as visualized by *get_cnf*.
-
- Internal data representation: list of conf var dicts; see module doc for
- details
- """
-
- def __init__(self, cnf=None):
- """
- Creates a simple configuration.
-
- Does not check whether given cnf list contains only valid data.
- Does not recurse into dicts.
-
- :param cnf: initial set of conf var data (default: None = empty conf)
- :type cnf: list or anything that :py:func:`get_cnf` can read
- """
- if cnf is None:
- self.__cnfvars = []
- elif isinstance(cnf, list):
- self.__cnfvars = cnf
- elif isinstance(cnf, dict):
- self.__cnfvars = get_cnf(cnf)
- else:
- raise InvalidCnf ("cannot handle %s type inputs" % type (cnf))
-
- def _find_new_number(self, cnf_vars):
- """Recursive helper function to find new unique (line) number."""
- if not cnf_vars:
- return 1
- new_numbers = [1, ] # in case cnf_vars is empty
- for cnf_var in cnf_vars:
- new_numbers.append(cnf_var['number'] + 1)
- try:
- new_numbers.append(self._find_new_number(cnf_var['children']))
- except KeyError:
- pass
- return max(new_numbers) # this is max(all numbers) + 1
-
- def _find_new_instance(self, varname):
- """
- Find an instance number for variable with non-unique varname.
-
- Will only check on top level, is not recursive.
-
- :param str varname: name of conf var; will be converted to lower-case
- :returns: instance number for which there is no other conf var of same
- name (0 if there is not other conf var with that name)
- :rtype: int
- """
- result = 0
- varname = varname.lower()
- for entry in self.__cnfvars:
- if entry['varname'] == varname:
- result = max(result, entry['number']+1)
- return result
-
- def add(self, varname, data='', number=None, instance=None, children=None):
- """
- Add a cnf var to config on top level.
-
- :param str varname: name of conf var; only required arg; case ignored
- :param str data: conf var's value
- :param int number: line number of that conf var; if given as None
- (default) the function looks through config to find
- a new number that is not taken; must be positive!
- Value will be ignored if children are given.
- :param int instance: Instance of the new conf var or None (default).
- If None, then function looks through config to
- find a new unique instance number
- :param children: child confs for given conf var. Children's parent
- and line attributes will be set in this function
- :type children: :py:class:`SimpleCnf`
- """
- if instance is None:
- instance = self._find_new_instance(varname)
- if children:
- number = self._find_new_number(self.__cnfvars) # need top number
- new_children = []
- for child in children:
- new_dict = child.get_single_dict()
- new_dict['parent'] = number
- new_children.append(new_dict)
- cnfvar_old.renumber_vars({'cnf':new_children}, number)
- children = new_children
- elif number is None:
- number = self._find_new_number(self.__cnfvars)
-
- new_var = dict(varname=varname.lower(), data=data,
- number=number, comment=None, instance=instance)
- if children:
- new_var['children'] = children # only add if non-empty
- self.__cnfvars.append(new_var)
-
- def add_single(self, varname, data=u'', number=None):
- """
- Add a single cnf var to config on top level.
-
- Compatibility API.
- """
- return self.add (varname, data=data, number=number)
-
- def append_file_generic(self, reader, cnf, replacements=None):
- """
- Append conf var data from file.
-
- If `replacements` are given, calls :py:meth:`set_values` with these
- before adding values to config.
-
- :param cnf: file name or dictionary of conf vars
- :type cnf: str or {str, int or str or None}
- :param replacements: see help in :py:meth:`set_values`
- """
- log.info("append CNF_VARs from file")
- new_vars = None
- if callable(reader) is False:
- raise TypeError("append_file_generic: reader must be callable, "
- "not %s" % type(reader))
- if isinstance(cnf, dict):
- new_vars = get_cnf(cnf)
- elif isinstance(cnf, str):
- fullpath = lookup_cnf_file(cnf)
- with open(fullpath, "rb") as chan:
- cnfdata = chan.read()
- tmp = reader(cnfdata)
- new_vars = get_cnf(tmp)
- if new_vars is None:
- raise InvalidCnf("Cannot append object \"%s\" of type \"%s\"."
- % (cnf, type(cnf)))
-
- if replacements is not None:
- set_values(new_vars, replacements)
-
- self.__cnfvars.extend(new_vars)
-
- def append_file(self, cnf, replacements=None):
- """Append conf var data from file."""
- return self.append_file_generic(cnfvar_old.read_cnf, cnf,
- replacements=replacements)
-
- def append_file_json(self, cnf, replacements=None):
- """Append conf var data from json file."""
- return self.append_file_generic(cnfvar_old.read_cnf_json, cnf,
- replacements=replacements)
-
- def append_guest_vars(self, vm=None, varname=None, replacements=None):
- """
- Append content from machine's "real" config to this object.
-
- Runs `get_cnf -j [varname]` on local host or VM (depending on arg
- `vm`), converts output and appends it to this objects' conf var set.
- If replacements are given, runs :py:meth:`set_values`, first.
-
- :param vm: a guest vm or None to run on local host
- :type vm: VM object or None
- :param str varname: optional root of conf vars to append. If given as
- None (default), append complete conf
- :param replacements: see help in :py:meth:`set_values`
- """
- cnf = arnied_wrapper.get_cnfvar(varname=varname, vm=vm)
- new_vars = get_cnf(cnf)
-
- log.info("apply substitutions to extracted CNF_VARs")
- if replacements is not None:
- set_values(new_vars, replacements)
-
- current = self.__cnfvars
- current.extend(new_vars)
-
- def save(self, filename=None):
- """
- Saves this object's configuration data to a file.
-
- The output file's content can be interpreted by `set_cnf -j`.
-
- :param str filename: name of file to write config to; if None (default)
- the config will be written to a temporary file
- :returns: filename that was written to
- :rtype: str
- """
- log.info("save configuration")
- current = self.__cnfvars
- if not current:
- raise InvalidCnf("No variables to write.")
-
- if filename is None:
- # create temporary filename
- filename = arnied_wrapper.generate_config_path(dumped=True)
-
- with open(filename, 'w') as out:
- cnfvar_old.output_json({"cnf": current}, out, renumber=True)
-
- return filename
-
- def apply(self, vm=None, renumber=True):
- """
- Apply object's config on VM or local host.
-
- Runs a `set_cnf` with complete internal config data, possibly waits for
- generate to finish afterwards.
-
- :param vm: a guest vm or None to apply on local host
- :type vm: VM object or None
- :param bool renumber: re-number conf vars before application
- """
- current = self.__cnfvars
- if renumber:
- log.info("enforce consistent CNF_LINE numbering")
- cnfvar_old.renumber_vars(current)
- log.info("inject configuration %s" % "into guest" if vm else "in place")
- arnied_wrapper.set_cnf_dynamic({"cnf": current},
- config_file=gen_tmpname(), vm=vm)
-
- def __str__(self):
- """
- Get a config in json format, ready for `set_cnf -j`.
-
- :returns: config in json format
- :rtype: str
- """
- return cnfvar_old.dump_json_string({"cnf": self.__cnfvars}, renumber=True)
-
- def pretty_print(self, print_func=None):
- """
- Get a string representation of this simple_cnf that is human-readable
-
- Result is valid json with nice line breaks and indentation but not
- renumbered (so may not be fit for parsing)
- """
- for line in json.dumps({"cnf": self.__cnfvars}, check_circular=False,
- indent=4, sort_keys=True).splitlines():
- if print_func is None:
- print(line)
- else:
- print_func(line)
-
- def __iter__(self):
- """
- Return an iterator over the contents of this simple cnf.
-
- The iteration might not be ordered by line number nor entry nor
- anything else. No guarantees made!
-
- The entries returned by the iterator are :py:class:`SimpleCnf`.
-
- Example::
-
- for cnf_list in iter(my_cnf['PROXY_ACCESSLIST']):
- print('checking proxy list {0} with {1} children'
- .format(cnf_list.get_value(), len(cnf_list)))
- """
- # self.__cnfvars is a list of dicts, each with the same fields
- for dict_entry in self.__cnfvars:
- yield SimpleCnf([dict_entry, ])
-
- def __getitem__(self, item):
- """
- Called by `cnf['key']` or `cnf[line_number]`; returns subset of cnf.
-
- Processing time is O(n) where n is the number of top-level entries in
- simple cnf.
-
- Examples (on VM)::
-
- all = SimpleCnf()
- all.append_guest_vars()
- len(all) # --> probably huge
- len(all['user']) # should give the number of users
-
- # should result in the same as all['user']:
- users = SimpleCnf()
- users.append_guest_vars(varname='user')
-
- :param item: line number or value to specify a cnf subset;
- if string value, will be converted to lower case
- :type item: int or str
- :returns: another simple cnf that contains a subset of this simple cnf
- :rtype: :py:class:`SimpleCnf`
-
- .. seealso:: method :py:func:`get` (more general than this)
- """
- # determine whether arg 'item' is a key name or a line number
- if isinstance(item, int): # is line number
- dict_key = 'number'
- else: # assume key name
- dict_key = 'varname'
- item = item.lower()
-
- # search all entries for matches
- results = [dict_entry for dict_entry in self.__cnfvars
- if dict_entry[dict_key] == item]
-
- # convert result to a simple cnf
- return SimpleCnf(results)
-
- def __len__(self):
- """
- Get the number of top-level entries in cnf.
-
- :returns: number of top-level entries in cnf
- :rtype: int
- """
- return len(self.__cnfvars)
-
- def get(self, name=None, value=None, instance=None, line=None):
- """
- Get a subset of this config that matches ALL of given criteria.
-
- For example, if :py:func:`get_cnf` contains the line
- '1121 USER,1: "admin"', all of these examples will result in the same
- simple cnf::
-
- cnf.get(name='user', value='admin')
- cnf.get(name='user', instance=1)
- cnf.get(name='user').get(value='admin')
- cnf.get(line=1121)
-
- :param str name: conf var name (key) or None to not use this criterion;
- will be converted to lower case
- :param str value: value of conf var or None to not use this criterion
- :param int instance: instance number of value in a list (e.g. USERS)
- or None to not use this criterion
- :param int line: line number of None to not use this criterion
- :returns: a simple cnf that contains only entries that match ALL of the
- given criteria. If nothing matches the given criteria, an
- empty simple cnf will be returned
- :rtype: :py:class:`SimpleCnf`
-
- .. seealso:: method :py:func:`__getitem__` (less general than this)
- """
- if name is None:
- name_test = lambda test_val: True
- else:
- name = name.lower()
- name_test = lambda test_val: name == test_val['varname']
-
- if value is None:
- value_test = lambda test_val: True
- else:
- value = str(value)
- value_test = lambda test_val: test_val['data'] == value
-
- if instance is None:
- instance_test = lambda test_val: True
- elif not isinstance(instance, int):
- raise ValueError('expect int value for instance!')
- else:
- instance_test = lambda test_val: instance == test_val['instance']
-
- if line is None:
- line_test = lambda test_val: True
- elif not isinstance(line, int):
- raise ValueError('expect int value for line number!')
- else:
- line_test = lambda test_val: test_val['number'] == line
-
- return SimpleCnf(list(entry for entry in self.__cnfvars
- if name_test(entry) and value_test(entry)
- and instance_test(entry) and line_test(entry)))
-
- def get_children(self):
- """
- Get children of simple cnf of just 1 entry.
-
- :returns: simple cnf children or an empty simple cnf if entry has
- no children
- :rtype: :py:class:`SimpleCnf`
- :raises: :py:class:`ValueError` if this simple cnf has more
- than 1 entry
- """
- if len(self) != 1:
- raise ValueError('get_children only possible if len == 1 (is {0})!'
- .format(len(self)))
- try:
- result = self.__cnfvars[0]['children']
- except KeyError:
- return SimpleCnf()
-
- for entry in result:
- try:
- del entry['parent']
- except KeyError:
- pass
- return SimpleCnf(result)
-
- def get_value(self):
- """
- Get a value of a simple cnf of just 1 entry.
-
- :returns: str cnf value/data
- :rtype: str
- :raises: :py:class:`ValueError` if this simple cnf has more
- than 1 entry
- """
- if len(self) != 1:
- raise ValueError('get_value only possible if len == 1 (is {0})!'
- .format(len(self)))
- return self.__cnfvars[0]['data']
-
- def get_single_dict(self):
- """
- Get a dictionary of a simple cnf of just 1 entry.
-
- :returns: dictionary of a simple cnf
- :rtype: {str, int or str or None}
- """
- if len(self) != 1:
- raise ValueError('get_single_dict only possible if len == 1 (is {0})!'
- .format(len(self)))
- return self.__cnfvars[0]
-
- def __eq__(self, other_cnf):
- """
- Determine wether `self` == `other_cnf`.
-
- :param other_cnf: cnf to compare with
- :type other_cnf: :py:class:`SimpleCnf`
- :returns: whether all cnf var entries are equal
- :rtype: bool
- """
- key_func = lambda cnf_var_entry: cnf_var_entry['number']
-
- if isinstance (other_cnf, SimpleCnf) is False:
- return False
-
- return sorted(self.__cnfvars, key=key_func) \
- == sorted(other_cnf.__cnfvars, key=key_func) # pylint: disable=protected-access
return RUN_RESULT_OK, None
+def replace_file_regex(edited_file, value, regex=None, ignore_fail=False):
+ """
+ Replace with value in a provided file using an optional regex or entirely.
+
+ :param str edited_file: file to use for the replacement
+ :param str value: value to replace the first matched group with
+ :param regex: more restrictive regular expression to use when replacing with value
+ :type regex: str or None
+ :param bool ignore_fail: whether to ignore regex mismatching
+ :raises: :py:class:`ValueError` if (also default) `regex` doesn't have a match
+
+ In order to ensure better matching capabilities you are supposed to
+ provide a regex pattern with at least one subgroup to match your value.
+ What this means is that the value you like to replace is not directly
+ searched into the config text but matched within a larger regex in
+ in order to avoid any mismatch.
+
+ Example:
+ provider.cnf, 'PROVIDER_LOCALIP,0: "(\d+)"', 127.0.0.1
+ """
+ pattern = regex.encode() if regex else "(.+)"
+
+ with open(edited_file, "rb") as file_handle:
+ text = file_handle.read()
+ match_line = re.search(pattern, text)
+
+ if match_line is None and not ignore_fail:
+ raise ValueError(f"Pattern {pattern} not found in {edited_file}")
+ elif match_line is not None:
+ old_line = match_line.group(0)
+ text = text[:match_line.start(1)] + value.encode() + text[match_line.end(1):]
+ line = re.search(pattern, text).group(0)
+ llog.debug(f"Updating {old_line} to {line} in {edited_file}")
+ with open(edited_file, "wb") as file_handle:
+ file_handle.write(text)
+
+
###############################################################################
# LOGGING
###############################################################################
import urllib.parse as parse
import socket
import logging
-from .arnied_wrapper import accept_licence
log = logging.getLogger('pyi2ncommon.web_interface')
:returns: whether the regex was found
:rtype: bool
"""
- accept_licence()
data = web_page_request(method="GET", url="/arnie?form=" + form,
check_certs=check_certs)
if escape:
--- /dev/null
+#!/usr/bin/env python
+# This Python file uses the following encoding: utf-8
+
+# The software in this package is distributed under the GNU General
+# Public License version 2 (with a special exception described below).
+#
+# A copy of GNU General Public License (GPL) is included in this distribution,
+# in the file COPYING.GPL.
+#
+# As a special exception, if other files instantiate templates or use macros
+# or inline functions from this file, or you compile this file and link it
+# with other works to produce a work based on this file, this file
+# does not by itself cause the resulting work to be covered
+# by the GNU General Public License.
+#
+# However the source code for this file must still be made available
+# in accordance with section (3) of the GNU General Public License.
+#
+# This exception does not invalidate any other reasons why a work based
+# on this file might be covered by the GNU General Public License.
+#
+# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
+
+import os
+import unittest
+
+from src.cnfvar import string as cnfvar_old
+
+#
+# test data
+#
+
+# model cnf tree
+demo_cnfvar = {"cnf": [
+ {
+ "varname": "MY_FAVORITE_CNF_VAR",
+ "instance": 1337,
+ "number": 42,
+ "data": "string conf content",
+ "comment": ""
+ },
+ {
+ "varname": "SOME_NESTED_CNF_VAR",
+ "instance": 0,
+ "number": 23,
+ "data": "999",
+ "comment": "",
+ "children": [
+ {
+ "varname": "SOME_CHILD_VAR",
+ "instance": 1,
+ "number": 24,
+ "data": "2014",
+ "parent": 23,
+ "comment": ""
+ }
+ ]
+ },
+]}
+
+# duplicate line number
+demo_invalid_cnfvar = {"cnf": [
+ {
+ "varname": "SOME_PARTICULARLY_TASTY_CNF_VAR",
+ "instance": 1337,
+ "number": 23,
+ "data": "classic wingers",
+ "comment": ""
+ },
+ {
+ "varname": "EXTRAORDINARILY_FANCY_CNF_VAR",
+ "instance": 1,
+ "number": 42,
+ "data": "ab mentions",
+ "comment": ""
+ },
+ {
+ "varname": "ANOTHER_POPULAR_CNF_VAR",
+ "instance": 0,
+ "number": 42,
+ "data": "notches",
+ "comment": ""
+ }
+]}
+
+demo_nonascii = r"""
+1 USER,1: "admin"
+2 (1) USER_DISABLED,0: "0"
+3 (1) USER_FULLNAME,0: "Administrator"
+4 (1) USER_GROUPWARE_FOLDER_CALENDAR,0: "INBOX/Kalender"
+5 (1) USER_GROUPWARE_FOLDER_CONTACTS,0: "INBOX/Kontakte"
+6 (1) USER_GROUPWARE_FOLDER_DRAFTS,0: "INBOX/Entwürfe"
+7 (1) USER_GROUPWARE_FOLDER_NOTES,0: "INBOX/Notizen"
+8 (1) USER_GROUPWARE_FOLDER_OUTBOX,0: "INBOX/Gesendete Elemente"
+9 (1) USER_GROUPWARE_FOLDER_TASKS,0: "INBOX/Aufgaben"
+10 (1) USER_GROUPWARE_FOLDER_TRASH,0: "INBOX/Gelöschte Elemente"
+11 (1) USER_GROUP_MEMBER_REF,0: "1"
+12 (1) USER_GROUP_MEMBER_REF,1: "2"
+13 (1) USER_PASSWORD,0: "test1234"
+"""
+
+demo_latin1crap = demo_nonascii.encode('latin1')
+
+demo_cnf_group = """
+1 GROUP,1: "Administratoren"
+2 (1) GROUP_ACCESS_GO_ONLINE_ALLOWED,0: "1"
+3 (1) GROUP_EMAILFILTER_BAN_FILTERLIST_REF,0: "-1"
+4 (1) GROUP_EMAIL_RELAY_RIGHTS,0: "RELAY_FROM_INTRANET"
+5 (1) GROUP_PROXY_PROFILE_REF,0: "1"
+"""
+
+demo_cnf_group_bytes = demo_cnf_group.encode("latin-1")
+
+demo_cnf_filter = b"""
+1 EMAILFILTER_BAN_FILTERLIST,1: "Vordefiniert: Alles verboten"
+2 (1) EMAILFILTER_BAN_FILTERLIST_ENCRYPTED,0: "BLOCK"
+3 (1) EMAILFILTER_BAN_FILTERLIST_EXTENSIONS,0: ""
+4 (1) EMAILFILTER_BAN_FILTERLIST_MIMETYPES,0: ""
+5 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,0: "text/plain"
+6 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,1: "text/html"
+7 (1) EMAILFILTER_BAN_FILTERLIST_MODE,0: "ALLOW"
+8 (1) EMAILFILTER_BAN_FILTERLIST_PREDEFINED_ID,0: "1"
+"""
+
+demo_cnf_comments = b"""
+1 EMAILFILTER_BAN_FILTERLIST,1: "Vordefiniert: Alles verboten"
+2 (1) EMAILFILTER_BAN_FILTERLIST_ENCRYPTED,0: "BLOCK"
+3 (1) EMAILFILTER_BAN_FILTERLIST_EXTENSIONS,0: ""
+4 (1) EMAILFILTER_BAN_FILTERLIST_MIMETYPES,0: "" # foo
+5 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,0: "text/plain"#bar
+6 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,1: "text/html"
+7 (1) EMAILFILTER_BAN_FILTERLIST_MODE,0: "ALLOW" # baz
+8 (1) EMAILFILTER_BAN_FILTERLIST_PREDEFINED_ID,0: "1"
+"""
+
+demo_cnf_escaped_quotes = """
+1 HERE_BE_QUOTES,0: "\""
+2 HERE_BE_QUOTES,1: "foo\"bar\"\"\"baz"
+3 HERE_BE_QUOTES,2: "unquo\\\"table"
+4 HERE_BE_QUOTES,3: "unquo\\\\\"\"table"
+"""
+
+#
+# test class
+#
+
+class CnfVarUnittest(unittest.TestCase):
+
+ def test_print_cnf(self):
+ with open(os.devnull, "w") as devnull:
+ print(demo_cnfvar, file=devnull)
+
+ def test_parse_cnf_simple_str(self):
+ cnf = cnfvar_old.read_cnf(demo_cnf_group)
+ with open(os.devnull, "w") as devnull:
+ print(cnf, file=devnull)
+
+ def test_parse_cnf_simple_bytes(self):
+ cnf = cnfvar_old.read_cnf(demo_cnf_group_bytes)
+ with open(os.devnull, "w") as devnull:
+ print(cnf, file=devnull)
+
+ def test_parse_cnf_nested(self):
+ cnf = cnfvar_old.read_cnf(demo_cnf_filter)
+ with open(os.devnull, "w") as devnull:
+ print(cnf, file=devnull)
+
+ def test_parse_cnf_comments(self):
+ cnf = cnfvar_old.read_cnf(demo_cnf_comments)
+ with open(os.devnull, "w") as devnull:
+ print(cnf, file=devnull)
+
+ def test_print_cnf_garbage(self):
+ try:
+ with open(os.devnull, "w") as devnull:
+ print(demo_invalid_cnfvar, file=devnull)
+ except cnfvar_old.InvalidCNF:
+ print ("Caught the duplicate line, bravo!")
+
+ def test_parse_cnf_quotes(self):
+ cnf = cnfvar_old.read_cnf(demo_cnf_escaped_quotes)
+ with open(os.devnull, "w") as devnull:
+ print(demo_invalid_cnfvar, file=devnull)
+
+ def test_read_nonascii(self):
+ cnf = cnfvar_old.read_cnf(demo_nonascii)
+ with open(os.devnull, "w") as devnull:
+ print(cnf, file=devnull)
+
+ def test_read_latin1(self):
+ cnf = cnfvar_old.read_cnf(demo_latin1crap)
+ with open(os.devnull, "w") as devnull:
+ print(cnf, file=devnull)
+
+
+class CnfVarUnittestVarnameCase(unittest.TestCase):
+ """Tests for verifying that uppercasing/lowercasing of varname works."""
+ # TODO: rethink whether this lower-casing is worth all the effort it causes
+
+ def test_read_cnf_lowercase(self):
+ """Test that after reading, varnames are lowercase."""
+ cnf = cnfvar_old.read_cnf(demo_cnf_group_bytes)
+ for parentvar in cnf['cnf']:
+ self.assertEqual(parentvar['varname'],
+ parentvar['varname'].lower())
+ if 'children' in parentvar:
+ for childvar in parentvar['children']:
+ self.assertEqual(parentvar['varname'],
+ parentvar['varname'].lower())
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+# The software in this package is distributed under the GNU General
+# Public License version 2 (with a special exception described below).
+#
+# A copy of GNU General Public License (GPL) is included in this distribution,
+# in the file COPYING.GPL.
+#
+# As a special exception, if other files instantiate templates or use macros
+# or inline functions from this file, or you compile this file and link it
+# with other works to produce a work based on this file, this file
+# does not by itself cause the resulting work to be covered
+# by the GNU General Public License.
+#
+# However the source code for this file must still be made available
+# in accordance with section (3) of the GNU General Public License.
+#
+# This exception does not invalidate any other reasons why a work based
+# on this file might be covered by the GNU General Public License.
+#
+# Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com>
+
+"""
+test_templates.py: unit tests for cnfvar/templates.py.
+
+Tests functions in cnfvar/templates.py
+
+For help see :py:mod:`unittest`
+"""
+
+import unittest
+from textwrap import dedent
+from types import SimpleNamespace
+from unittest.mock import patch, ANY
+
+from src.cnfvar import templates
+from src.cnfvar.model import Cnf
+
+
+class CnfTemplatesTest(unittest.TestCase):
+ """Test functions in cnfvar.templates."""
+
+ def test_template_calls(self):
+ """Tests calling each template to detect runtime breakage."""
+ template_cnf = templates.template("name", "value", instance=1, defaults={})
+ self.assertTrue(isinstance(template_cnf, Cnf))
+ attr_dict = templates.__dict__
+ for attr in attr_dict.keys():
+ if attr in ["Cnf", "CnfList"]:
+ continue
+ if attr == "template":
+ continue
+ if callable(attr_dict[attr]):
+ template_cnf = attr_dict[attr]("name", instance=1)
+ self.assertTrue(isinstance(template_cnf, Cnf))
+ elif attr.endswith("_defaults"):
+ self.assertTrue(isinstance(attr_dict[attr], dict))
+
+
+if __name__ == '__main__':
+ unittest.main()
# whether to return 1 as a fail indicator
fail_switch = False
# mapping between expected commands and their mocked output + return code
- cmds = [
- {"cmd": "pgrep -l -x arnied", "stdout": b"", "returncode": 0},
-
- {"cmd": 'echo "LICENSE_ACCEPTED,0: \\"1\\"" | set_cnf', "stdout": b"", "returncode": 0},
- {"cmd": '/usr/intranator/bin/arnied_helper --wait-for-program-end GENERATE', "stdout": b"", "returncode": 0},
-
- {"cmd": 'get_cnf PROVIDER 1', "stdout": b"1 PROVIDER,1: \"sample-provider\"", "returncode": 0},
- {"cmd": 'tell-connd --online P1', "stdout": b"", "returncode": 0},
- {"cmd": '/usr/intranator/bin/get_var ONLINE', "stdout": b"DEFAULT: 2", "returncode": 0},
-
- {"cmd": "pgrep -l -x arnied", "stdout": b"", "returncode": 0},
- {"cmd": 'get_cnf VIRSCAN_UPDATE_CRON | set_cnf -x', "stdout": b"", "returncode": 0},
- {"cmd": '/usr/intranator/bin/arnied_helper --is-scheduled-or-running GENERATE', "stdout": b"", "returncode": 1},
- {"cmd": '/usr/intranator/bin/arnied_helper --wait-for-program-end GENERATE --wait-for-program-timeout 300', "stdout": b"", "returncode": 1},
- {"cmd": '/usr/intranator/bin/arnied_helper --is-scheduled-or-running GENERATE_OFFLINE', "stdout": b"", "returncode": 1},
- {"cmd": '/usr/intranator/bin/arnied_helper --wait-for-program-end GENERATE_OFFLINE --wait-for-program-timeout 300', "stdout": b"", "returncode": 1},
- {"cmd": 'echo \'VIRSCAN_UPDATE_DNS_PUSH,0:"0"\' |set_cnf', "stdout": b"", "returncode": 0},
- {"cmd": 'rm -f /var/intranator/schedule/UPDATE_VIRSCAN_NODIAL*', "stdout": b"", "returncode": 0},
-
- {"cmd": '/usr/intranator/bin/arnied_helper --transfer-mail', "stdout": b"", "returncode": 0},
-
- {"cmd": '/usr/intranator/bin/arnied_helper --wait-for-arnied-socket --wait-for-arnied-socket-timeout 10', "stdout": b"", "returncode": 0},
- {"cmd": '/usr/intranator/bin/arnied_helper --wait-for-arnied-socket --wait-for-arnied-socket-timeout 30', "stdout": b"", "returncode": 0},
- ]
asserted_cmds = []
def __init__(self, cmd="", ignore_errors=False, vm=None, timeout=60):
def setUp(self):
DummyCmdOutputMapping.fail_switch = False
DummyCmdOutputMapping.asserted_cmds = []
- self.cmd_db = DummyCmdOutputMapping.cmds
def test_verify_running(self):
"""Test checking for running programs."""
- DummyCmdOutputMapping.asserted_cmds = self.cmd_db[0:1]
+ DummyCmdOutputMapping.asserted_cmds = [{"cmd": "pgrep -l -x arnied", "stdout": b"", "returncode": 0}]
arnied_wrapper.verify_running(timeout=1)
DummyCmdOutputMapping.fail_switch = True
with self.assertRaises(RuntimeError):
def test_wait_for_arnied(self):
"""Test waiting for arnied to be ready."""
- DummyCmdOutputMapping.asserted_cmds = self.cmd_db
+ DummyCmdOutputMapping.asserted_cmds = [{"cmd": "/usr/intranator/bin/arnied_helper --wait-for-arnied-socket --wait-for-arnied-socket-timeout 10", "stdout": b"", "returncode": 0}]
arnied_wrapper.wait_for_arnied(timeout=10)
- def test_accept_license(self):
- """Test accepting license."""
- DummyCmdOutputMapping.asserted_cmds = self.cmd_db[1:3] + self.cmd_db[8:11]
- arnied_wrapper.accept_licence()
- # make sure an error is ignored since license might
- # already be accepted
- DummyCmdOutputMapping.fail_switch = True
- arnied_wrapper.accept_licence()
-
def test_go_online(self):
- DummyCmdOutputMapping.asserted_cmds = self.cmd_db[3:6]
+ DummyCmdOutputMapping.asserted_cmds = [{"cmd": 'tell-connd --online P1', "stdout": b"", "returncode": 0},
+ {"cmd": '/usr/intranator/bin/get_var ONLINE', "stdout": b"DEFAULT: 2", "returncode": 0}]
arnied_wrapper.go_online(1)
- def test_disable_virscan(self):
- """Test disabling the virus scanner."""
- DummyCmdOutputMapping.asserted_cmds = self.cmd_db[6:]
- arnied_wrapper.disable_virscan()
-
def test_email_transfer(self):
"""Test e-mail transferring."""
- DummyCmdOutputMapping.asserted_cmds = self.cmd_db[14:15]
+ DummyCmdOutputMapping.asserted_cmds = [{"cmd": '/usr/intranator/bin/arnied_helper --transfer-mail', "stdout": b"", "returncode": 0}]
arnied_wrapper.email_transfer()
+++ /dev/null
-#!/usr/bin/env python
-
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""Unit test for build_cnfvar.py"""
-import unittest
-import os
-from src.cnfline.build_cnfvar import BuildCnfVar
-
-TEST_CONFIG_FILENAME = 'some_config.cnf'
-
-
-class BuildCnfVarTest(unittest.TestCase):
- def tearDown(self):
- if os.path.isfile(TEST_CONFIG_FILENAME):
- os.unlink(TEST_CONFIG_FILENAME)
-
- def test_simple(self):
- cnfvar = BuildCnfVar('FOOBAR', 123, 'some_data', 10)
-
- self.assertEqual('FOOBAR', cnfvar.name)
- self.assertEqual(123, cnfvar.instance)
- self.assertEqual('some_data', cnfvar.data)
- self.assertEqual(10, cnfvar.line_no)
-
- def test_find_child_line_no(self):
- cnfvar = BuildCnfVar('FOOBAR', 123, 'some_data', 10)
- new_child_line_no = cnfvar.add_cnf('FOOBAR_CHILD', 0, 'xxx')
-
- self.assertEqual(new_child_line_no,
- cnfvar.find_child_line_no('FOOBAR_CHILD'))
-
- # Should not be found
- self.assertEqual(0, cnfvar.find_child_line_no('FOOBAR_SPACESHARK'))
-
- def test_find_free_line_no(self):
- cnfvar = BuildCnfVar('FOOBAR', 123, 'some_data', 10)
- self.assertEqual(11, cnfvar.find_free_line_no())
-
- cnfvar.add_cnf('FOOBAR_CHILD', 0, 'xxx')
- self.assertEqual(12, cnfvar.find_free_line_no())
-
- def test_find_free_child_instance(self):
- cnfvar = BuildCnfVar('FOOBAR', 123, 'some_data', 10)
- cnfvar.add_cnf('FOOBAR_CHILD', 0, 'xxx')
-
- self.assertEqual(0, cnfvar.find_free_child_instance('FOOBAR_OTHER'))
- self.assertEqual(1, cnfvar.find_free_child_instance('FOOBAR_CHILD'))
-
- def test_update_cnf(self):
- cnfvar = BuildCnfVar('FOOBAR', 123, 'some_data', 10)
- cnfvar.add_cnf('FOOBAR_CHILD', 0, 'xxx')
-
- # Update existing cnfvar
- cnfvar.update_cnf('FOOBAR_CHILD', 0, 'abc')
-
- self.assertEqual('10 FOOBAR,123: "some_data"\n'
- '11 (10) FOOBAR_CHILD,0: "abc"\n', str(cnfvar))
-
- def test_string_output(self):
- cnfvar = BuildCnfVar('FOOBAR', 123, 'some_data', 10)
- cnfvar.add_cnf('FOOBAR_CHILD', 0, 'xxx')
- cnfvar.update_cnf('FOOBAR_CHILD', 0, 'abc')
- cnfvar.add_cnf('FOOBAR_CHILD', 1, 'more data')
-
- self.assertEqual('10 FOOBAR,123: "some_data"\n'
- '11 (10) FOOBAR_CHILD,0: "abc"\n'
- '12 (10) FOOBAR_CHILD,1: "more data"\n', str(cnfvar))
-
- def test_del_cnf(self):
- cnfvar = BuildCnfVar('FOOBAR', 123, 'some_data', 10)
- cnfvar.add_cnf('FOOBAR_CHILD', 0, 'xxx')
- cnfvar.add_cnf('FOOBAR_CHILD', 1, 'more data')
-
- cnfvar.del_cnf('FOOBAR_CHILD')
-
- self.assertEqual('10 FOOBAR,123: "some_data"\n', str(cnfvar))
-
- def test_add_different_parent_no(self):
- cnfvar = BuildCnfVar('FOOBAR', 123, 'some_data', 10)
- sub_parent = cnfvar.add_cnf('FOOBAR_CHILD', 0, 'xxx')
- cnfvar.add_cnf('FOOBAR_OTHER', 0, 'foo')
- cnfvar.add_cnf('FOOBAR_CHILD_TYPE', 1, 'spaceshark', sub_parent)
- cnfvar.add_cnf('FOOBAR_OTHER2', 0, 'foo2')
-
- self.assertEqual('10 FOOBAR,123: "some_data"\n'
- '11 (10) FOOBAR_CHILD,0: "xxx"\n'
- '12 (10) FOOBAR_OTHER,0: "foo"\n'
- '13 (11) FOOBAR_CHILD_TYPE,1: "spaceshark"\n'
- '14 (10) FOOBAR_OTHER2,0: "foo2"\n', str(cnfvar))
-
- def test_add_defaults(self):
- cnfvar = BuildCnfVar('FOOBAR', 0, 'some_data')
-
- defaults = {'FOOBAR_SOMETHING': 'abc',
- 'FOOBAR_MODE': 'optimize'}
- cnfvar.add_defaults(defaults)
-
- self.assertTrue('1 FOOBAR,0: "some_data"\n'
- '2 (1) FOOBAR_SOMETHING,0: "abc"\n'
- '3 (1) FOOBAR_MODE,0: "optimize"\n' == str(cnfvar)
- or '1 FOOBAR,0: "some_data"\n'
- '2 (1) FOOBAR_MODE,0: "optimize"\n'
- '3 (1) FOOBAR_SOMETHING,0: "abc"\n' == str(cnfvar))
-
- def test_mark_as_own_parent(self):
- cnfvar = BuildCnfVar('FOOBAR_SOMETHING', 123, 'some_data', 10)
-
- line_no = cnfvar.add_cnf('FOOBAR_OTHER', 0, 'xxx')
- cnfvar.mark_as_own_parent(line_no)
-
- self.assertEqual('10 FOOBAR_SOMETHING,123: "some_data"\n'
- '11 FOOBAR_OTHER,0: "xxx"\n', str(cnfvar))
-
- def test_save(self):
- cnfvar = BuildCnfVar('FOOBAR', 0, 'some_data')
-
- defaults = {'FOOBAR_SOMETHING': 'abc',
- 'FOOBAR_MODE': 'optimize'}
- cnfvar.add_defaults(defaults)
-
- cnfvar.save(TEST_CONFIG_FILENAME)
- with open(TEST_CONFIG_FILENAME, 'r') as input:
- read_back = input.read()
-
- self.assertTrue('1 FOOBAR,0: "some_data"\n'
- '2 (1) FOOBAR_SOMETHING,0: "abc"\n'
- '3 (1) FOOBAR_MODE,0: "optimize"\n' == read_back
- or '1 FOOBAR,0: "some_data"\n'
- '2 (1) FOOBAR_MODE,0: "optimize"\n'
- '3 (1) FOOBAR_SOMETHING,0: "abc"\n' == read_back)
-
- os.unlink(TEST_CONFIG_FILENAME)
-
-
-if __name__ == '__main__':
- unittest.main()
+++ /dev/null
-#!/usr/bin/env python
-
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-import unittest
-from src.cnfline.cnfline import CnfLine
-from src.cnfline.build_nic import BuildNIC
-
-
-class CnfLineTest(unittest.TestCase):
- def test_simple(self):
- line = CnfLine('MY_NAME', 123, 'my_data', 888, 456)
-
- self.assertEqual('MY_NAME', line.name)
- self.assertEqual(123, line.instance)
- self.assertEqual('my_data', line.data)
- self.assertEqual(888, line.line_no)
- self.assertEqual(456, line.parent_line_no)
-
- def test_deny_empty_name(self):
- with self.assertRaises(ValueError):
- CnfLine('')
-
- def test_deny_lineno_zero(self):
- with self.assertRaises(ValueError):
- CnfLine('foobar', 0, 'some_data', 0)
-
- def test_str_output_parent(self):
- line = CnfLine('MY_NAME', 123, 'my_data', 10, 0)
- self.assertEqual('10 MY_NAME,123: "my_data"', str(line))
-
- def test_str_output_child(self):
- line = CnfLine('MY_NAME', 123, 'my_data', 10, 456)
- self.assertEqual('10 (456) MY_NAME,123: "my_data"', str(line))
-
-
-class BuildNICTest(unittest.TestCase):
- def test_nic_comment(self):
- nic = BuildNIC('my comment', 10, 100)
-
- cnf_text = str(nic)
- self.assertTrue('NIC_COMMENT,0: "my comment"' in cnf_text)
- self.assertEqual('', nic.data)
-
- def test_change_comment(self):
- nic = BuildNIC('initial comment', 10, 100)
- nic.comment('new comment')
-
- cnf_text = str(nic)
- self.assertTrue('NIC_COMMENT,0: "new comment"' in cnf_text)
- self.assertTrue('initial comment"' not in cnf_text)
- self.assertEqual('', nic.data)
-
-
-if __name__ == '__main__':
- unittest.main()
+++ /dev/null
-#!/usr/bin/env python
-# This Python file uses the following encoding: utf-8
-
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-import os
-import unittest
-
-from src import cnfvar_old
-
-#
-# test data
-#
-
-# model cnf tree
-demo_cnfvar = {"cnf": [
- {
- "varname": "MY_FAVORITE_CNF_VAR",
- "instance": 1337,
- "number": 42,
- "data": "string conf content",
- "comment": ""
- },
- {
- "varname": "SOME_NESTED_CNF_VAR",
- "instance": 0,
- "number": 23,
- "data": "999",
- "comment": "",
- "children": [
- {
- "varname": "SOME_CHILD_VAR",
- "instance": 1,
- "number": 24,
- "data": "2014",
- "parent": 23,
- "comment": ""
- }
- ]
- },
-]}
-
-# duplicate line number
-demo_invalid_cnfvar = {"cnf": [
- {
- "varname": "SOME_PARTICULARLY_TASTY_CNF_VAR",
- "instance": 1337,
- "number": 23,
- "data": "classic wingers",
- "comment": ""
- },
- {
- "varname": "EXTRAORDINARILY_FANCY_CNF_VAR",
- "instance": 1,
- "number": 42,
- "data": "ab mentions",
- "comment": ""
- },
- {
- "varname": "ANOTHER_POPULAR_CNF_VAR",
- "instance": 0,
- "number": 42,
- "data": "notches",
- "comment": ""
- }
-]}
-
-demo_jsoncnf = """
-{
- "cnf" : [
- {
- "children" : [
- {
- "comment" : "",
- "data" : "1",
- "instance" : 0,
- "number" : 2,
- "parent" : 1,
- "varname" : "FIREWALL_SERVICEGROUP_PREDEFINED_ID"
- },
- {
- "children" : [
- {
- "comment" : "",
- "data" : "",
- "instance" : 0,
- "number" : 4,
- "parent" : 3,
- "varname" : "FIREWALL_SERVICEGROUP_TYPE_COMMENT"
- },
- {
- "comment" : "",
- "data" : "80",
- "instance" : 0,
- "number" : 5,
- "parent" : 3,
- "varname" : "FIREWALL_SERVICEGROUP_TYPE_TCPUDP_DST_PORT"
- }
- ],
- "comment" : "",
- "data" : "TCP",
- "instance" : 0,
- "number" : 3,
- "parent" : 1,
- "varname" : "FIREWALL_SERVICEGROUP_TYPE"
- }
- ],
- "comment" : "",
- "data" : "http",
- "instance" : 1,
- "number" : 1,
- "varname" : "FIREWALL_SERVICEGROUP"
- },
- {
- "children" : [
- {
- "comment" : "",
- "data" : "2",
- "instance" : 0,
- "number" : 7,
- "parent" : 6,
- "varname" : "FIREWALL_SERVICEGROUP_PREDEFINED_ID"
- },
- {
- "children" : [
- {
- "comment" : "",
- "data" : "",
- "instance" : 0,
- "number" : 9,
- "parent" : 8,
- "varname" : "FIREWALL_SERVICEGROUP_TYPE_COMMENT"
- },
- {
- "comment" : "",
- "data" : "443",
- "instance" : 0,
- "number" : 10,
- "parent" : 8,
- "varname" : "FIREWALL_SERVICEGROUP_TYPE_TCPUDP_DST_PORT"
- }
- ],
- "comment" : "",
- "data" : "TCP",
- "instance" : 0,
- "number" : 8,
- "parent" : 6,
- "varname" : "FIREWALL_SERVICEGROUP_TYPE"
- }
- ],
- "comment" : "",
- "data" : "https",
- "instance" : 2,
- "number" : 6,
- "varname" : "FIREWALL_SERVICEGROUP"
- }
- ]
-}
-"""
-
-demo_jsoncnf_bytes = demo_jsoncnf.encode ("latin-1")
-
-demo_nonascii = r"""
-{ "cnf" : [
- {
- "children" : [
- {
- "comment" : "",
- "data" : "0",
- "instance" : 0,
- "number" : 2,
- "parent" : 1,
- "varname" : "USER_DISABLED"
- },
- {
- "comment" : "",
- "data" : "Administrator",
- "instance" : 0,
- "number" : 3,
- "parent" : 1,
- "varname" : "USER_FULLNAME"
- },
- {
- "comment" : "",
- "data" : "INBOX/Kalender",
- "instance" : 0,
- "number" : 4,
- "parent" : 1,
- "varname" : "USER_GROUPWARE_FOLDER_CALENDAR"
- },
- {
- "comment" : "",
- "data" : "INBOX/Kontakte",
- "instance" : 0,
- "number" : 5,
- "parent" : 1,
- "varname" : "USER_GROUPWARE_FOLDER_CONTACTS"
- },
- {
- "comment" : "",
- "data" : "INBOX/Entwürfe",
- "instance" : 0,
- "number" : 6,
- "parent" : 1,
- "varname" : "USER_GROUPWARE_FOLDER_DRAFTS"
- },
- {
- "comment" : "",
- "data" : "INBOX/Notizen",
- "instance" : 0,
- "number" : 7,
- "parent" : 1,
- "varname" : "USER_GROUPWARE_FOLDER_NOTES"
- },
- {
- "comment" : "",
- "data" : "INBOX/Gesendete Elemente",
- "instance" : 0,
- "number" : 8,
- "parent" : 1,
- "varname" : "USER_GROUPWARE_FOLDER_OUTBOX"
- },
- {
- "comment" : "",
- "data" : "INBOX/Aufgaben",
- "instance" : 0,
- "number" : 9,
- "parent" : 1,
- "varname" : "USER_GROUPWARE_FOLDER_TASKS"
- },
-
- {
- "comment" : "",
- "data" : "INBOX/Gelöschte Elemente",
- "instance" : 0,
- "number" : 10,
- "parent" : 1,
- "varname" : "USER_GROUPWARE_FOLDER_TRASH"
- },
- {
- "comment" : "",
- "data" : "1",
- "instance" : 0,
- "number" : 11,
- "parent" : 1,
- "varname" : "USER_GROUP_MEMBER_REF"
- },
- {
- "comment" : "",
- "data" : "2",
- "instance" : 1,
- "number" : 12,
- "parent" : 1,
- "varname" : "USER_GROUP_MEMBER_REF"
- },
- {
- "comment" : "",
- "data" : "",
- "instance" : 0,
- "number" : 13,
- "parent" : 1,
- "varname" : "USER_LOCALE"
- },
- {
- "comment" : "",
- "data" : "idkfa",
- "instance" : 0,
- "number" : 14,
- "parent" : 1,
- "varname" : "USER_PASSWORD"
- },
- {
- "comment" : "",
- "data" : "30",
- "instance" : 0,
- "number" : 15,
- "parent" : 1,
- "varname" : "USER_TRASH_DELETEDAYS"
- }
- ],
- "comment" : "",
- "data" : "admin",
- "instance" : 1,
- "number" : 1,
- "varname" : "USER"
- }
-]}
-"""
-
-demo_latin1crap = demo_nonascii.encode('latin1')
-
-demo_cnf_group = """
-1 GROUP,1: "Administratoren"
-2 (1) GROUP_ACCESS_GO_ONLINE_ALLOWED,0: "1"
-3 (1) GROUP_EMAILFILTER_BAN_FILTERLIST_REF,0: "-1"
-4 (1) GROUP_EMAIL_RELAY_RIGHTS,0: "RELAY_FROM_INTRANET"
-5 (1) GROUP_PROXY_PROFILE_REF,0: "1"
-"""
-
-demo_cnf_group_bytes = demo_cnf_group.encode ("latin-1")
-
-demo_cnf_filter = b"""
-1 EMAILFILTER_BAN_FILTERLIST,1: "Vordefiniert: Alles verboten"
-2 (1) EMAILFILTER_BAN_FILTERLIST_ENCRYPTED,0: "BLOCK"
-3 (1) EMAILFILTER_BAN_FILTERLIST_EXTENSIONS,0: ""
-4 (1) EMAILFILTER_BAN_FILTERLIST_MIMETYPES,0: ""
-5 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,0: "text/plain"
-6 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,1: "text/html"
-7 (1) EMAILFILTER_BAN_FILTERLIST_MODE,0: "ALLOW"
-8 (1) EMAILFILTER_BAN_FILTERLIST_PREDEFINED_ID,0: "1"
-"""
-
-demo_cnf_comments = b"""
-1 EMAILFILTER_BAN_FILTERLIST,1: "Vordefiniert: Alles verboten"
-2 (1) EMAILFILTER_BAN_FILTERLIST_ENCRYPTED,0: "BLOCK"
-3 (1) EMAILFILTER_BAN_FILTERLIST_EXTENSIONS,0: ""
-4 (1) EMAILFILTER_BAN_FILTERLIST_MIMETYPES,0: "" # foo
-5 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,0: "text/plain"#bar
-6 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,1: "text/html"
-7 (1) EMAILFILTER_BAN_FILTERLIST_MODE,0: "ALLOW" # baz
-8 (1) EMAILFILTER_BAN_FILTERLIST_PREDEFINED_ID,0: "1"
-"""
-
-demo_cnf_escaped_quotes = """
-1 HERE_BE_QUOTES,0: "\""
-2 HERE_BE_QUOTES,1: "foo\"bar\"\"\"baz"
-3 HERE_BE_QUOTES,2: "unquo\\\"table"
-4 HERE_BE_QUOTES,3: "unquo\\\\\"\"table"
-"""
-
-demo_json_escaped_quotes = """
-{ "cnf": [ { "number" : 1,
- "varname" : "HERE_BE_QUOTES",
- "instance" : 0,
- "data" : "\\"" },
- { "number" : 2,
- "varname" : "HERE_BE_QUOTES",
- "instance" : 1,
- "data" : "foo\\"bar\\"\\"\\"baz" },
- { "number" : 3,
- "varname" : "HERE_BE_QUOTES",
- "instance" : 2,
- "data" : "unquo\\\\\\"table" },
- { "number" : 4,
- "varname" : "HERE_BE_QUOTES",
- "instance" : 3,
- "data" : "unquo\\\\\\\\\\"\\"table" } ] }
-"""
-
-#
-# test class
-#
-
-class CnfVarUnittest(unittest.TestCase):
-
- def test_print_cnf(self):
- with open(os.devnull, "w") as devnull:
- cnfvar_old.print_cnf(demo_cnfvar, out=devnull)
-
- def test_parse_cnf_simple_str(self):
- cnf = cnfvar_old.read_cnf(demo_cnf_group)
- with open(os.devnull, "w") as devnull:
- cnfvar_old.print_cnf_json(cnf, out=devnull)
-
- def test_parse_cnf_simple_bytes(self):
- cnf = cnfvar_old.read_cnf(demo_cnf_group_bytes)
- with open(os.devnull, "w") as devnull:
- cnfvar_old.print_cnf_json(cnf, out=devnull)
-
- def test_parse_cnf_nested(self):
- cnf = cnfvar_old.read_cnf(demo_cnf_filter)
- with open(os.devnull, "w") as devnull:
- cnfvar_old.print_cnf_json(cnf, out=devnull)
-
- def test_parse_cnf_comments(self):
- cnf = cnfvar_old.read_cnf(demo_cnf_comments)
- with open(os.devnull, "w") as devnull:
- cnfvar_old.print_cnf_json(cnf, out=devnull)
-
- def test_print_cnf_garbage(self):
- try:
- with open(os.devnull, "w") as devnull:
- cnfvar_old.print_cnf(demo_invalid_cnfvar, out=devnull)
- except cnfvar_old.InvalidCNF:
- print ("Caught the duplicate line, bravo!")
-
- def test_read_json_str(self):
- cnf = cnfvar_old.read_cnf_json(demo_jsoncnf)
- with open(os.devnull, "w") as devnull:
- cnfvar_old.print_cnf(cnf, out=devnull)
-
- def test_read_json_bytes(self):
- cnf = cnfvar_old.read_cnf_json(demo_jsoncnf_bytes)
- with open(os.devnull, "w") as devnull:
- cnfvar_old.print_cnf(cnf, out=devnull)
-
- def test_read_json_nonascii(self):
- cnf = cnfvar_old.read_cnf_json(demo_nonascii)
- with open(os.devnull, "wb") as devnull:
- cnfvar_old.print_cnf(cnf, out=devnull)
-
- def test_read_json_latin1(self):
- cnf = cnfvar_old.read_cnf_json(demo_latin1crap)
- with open(os.devnull, "wb") as devnull:
- cnfvar_old.print_cnf(cnf, out=devnull)
-
- def test_parse_cnf_quotes(self):
- cnf = cnfvar_old.read_cnf(demo_cnf_escaped_quotes)
- with open(os.devnull, "w") as devnull:
- cnfvar_old.print_cnf_json(cnf, out=devnull)
-
- def test_parse_json_quotes(self):
- cnf = cnfvar_old.read_cnf_json(demo_json_escaped_quotes)
- with open(os.devnull, "w") as devnull:
- cnfvar_old.print_cnf_json(cnf, out=devnull)
-
-
-class CnfVarUnittestVarnameCase(unittest.TestCase):
- """Tests for verifying that uppercasing/lowercasing of varname works."""
- # TODO: rethink whether this lower-casing is worth all the effort it causes
-
- def test_dump_cnf_uppercase(self):
- """Test dump of cnfvars results in uppercase var names."""
- cnf = {"cnf": [
- {"instance": 0, "varname": "dialout_mode",
- "data": "ONLINE", "number": 1, "comment": None},
- {"instance": 0, "varname": "dialout_defaultprovider_ref",
- "data": "1", "number": 2, "comment": None},
- {"instance": 0, "varname": "hypothetical_parent",
- "data": "parent value", "number": 3, "comment": None,
- "children": [
- {"instance": 0, "varname": "hypothetical_child",
- "data": "0", "number": 4, "parent": 3, "comment": None},
- {"instance": 1, "varname": "hypothetical_child",
- "data": "1", "number": 5, "parent": 3, "comment": None}]}
- ]}
- serialization = cnfvar_old.dump_json_string(cnf)
- self.assertIn('DIALOUT_MODE', serialization)
- self.assertIn('DIALOUT_DEFAULTPROVIDER_REF', serialization)
- self.assertIn('HYPOTHETICAL_CHILD', serialization)
- self.assertNotIn('dialout_mode', serialization)
- self.assertNotIn('dialout_defaultprovider_ref', serialization)
- self.assertNotIn('hypothetical_child', serialization)
-
- def test_read_cnf_lowercase(self):
- """Test that after reading, varnames are lowercase."""
- cnf = cnfvar_old.read_cnf_json(demo_jsoncnf.encode('latin1'))
- for parentvar in cnf['cnf']:
- self.assertEqual(parentvar['varname'],
- parentvar['varname'].lower())
- if 'children' in parentvar:
- for childvar in parentvar['children']:
- self.assertEqual(parentvar['varname'],
- parentvar['varname'].lower())
-
-
-if __name__ == '__main__':
- unittest.main()
+++ /dev/null
-#!/usr/bin/env python
-# This Python file uses the following encoding: utf-8
-
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-import unittest
-from traceback import print_exc
-from tempfile import mkstemp
-import os
-import sys
-
-from src.simple_cnf import SimpleCnf
-from src.simple_cnf import InvalidCnf
-
-TEST_SET = """
-1 USER,1: "admin"
-2 (1) USER_DISABLED,0: "0"
-3 (1) USER_FULLNAME,0: "Administrator"
-4 (1) USER_GROUPWARE_FOLDER_CALENDAR,0: "INBOX/Kalender"
-5 (1) USER_GROUPWARE_FOLDER_CONTACTS,0: "INBOX/Kontakte"
-6 (1) USER_GROUPWARE_FOLDER_DRAFTS,0: "INBOX/Entwürfe"
-7 (1) USER_GROUPWARE_FOLDER_NOTES,0: "INBOX/Notizen"
-8 (1) USER_GROUPWARE_FOLDER_OUTBOX,0: "INBOX/Gesendete Elemente"
-9 (1) USER_GROUPWARE_FOLDER_TASKS,0: "INBOX/Aufgaben"
-10 (1) USER_GROUPWARE_FOLDER_TRASH,0: "INBOX/Gelöschte Elemente"
-11 (1) USER_GROUP_MEMBER_REF,0: "1"
-12 (1) USER_GROUP_MEMBER_REF,1: "2"
-13 (1) USER_LOCALE,0: ""
-14 (1) USER_PASSWORD,0: "test1234"
-15 (1) USER_TRASH_DELETEDAYS,0: "30"
-16 USER,2: "test"
-17 (16) USER_DISABLED,0: "0"
-18 (16) USER_EMAIL_VACATION,0: "ON"
-19 (16) USER_EMAIL_VACATION_AUTOMATIC_END,0: ""
-20 (16) USER_EMAIL_VACATION_AUTOMATIC_START,0: ""
-21 (16) USER_EMAIL_VACATION_AUTOMATIC_STATE,0: "UNKNOWN"
-22 (16) USER_EMAIL_VACATION_REPLYDAYS,0: "1"
-23 (16) USER_EMAIL_VACATION_TEXT,0: "Bin im Urlaub"
-24 (16) USER_FULLNAME,0: "testnutzer"
-25 (16) USER_GROUPWARE_FOLDER_CALENDAR,0: "INBOX/Kalender"
-26 (16) USER_GROUPWARE_FOLDER_CONTACTS,0: "INBOX/Kontakte"
-27 (16) USER_GROUPWARE_FOLDER_DRAFTS,0: "INBOX/Entwürfe"
-28 (16) USER_GROUPWARE_FOLDER_NOTES,0: "INBOX/Notizen"
-29 (16) USER_GROUPWARE_FOLDER_OUTBOX,0: "INBOX/Gesendete Elemente"
-30 (16) USER_GROUPWARE_FOLDER_TASKS,0: "INBOX/Aufgaben"
-31 (16) USER_GROUPWARE_FOLDER_TRASH,0: "INBOX/Gelöschte Elemente"
-32 (16) USER_GROUP_MEMBER_REF,0: "2"
-33 (16) USER_GROUP_MEMBER_REF,1: "3"
-34 (16) USER_LOCALE,0: ""
-35 (16) USER_PASSWORD,0: "test1234"
-36 (16) USER_TRASH_DELETEDAYS,0: "30"
-37 (16) USER_WEBMAIL_MESSAGES_PER_PAGE,0: "25"
-38 (16) USER_WEBMAIL_SIGNATURE,0: ""
-39 USER,3: "mueller"
-40 (39) USER_DISABLED,0: "0"
-41 (39) USER_FULLNAME,0: "Kärößü"
-42 (39) USER_GROUPWARE_FOLDER_CALENDAR,0: "INBOX/Kalender"
-43 (39) USER_GROUPWARE_FOLDER_CONTACTS,0: "INBOX/Kontakte"
-44 (39) USER_GROUPWARE_FOLDER_DRAFTS,0: "INBOX/Entwürfe"
-45 (39) USER_GROUPWARE_FOLDER_NOTES,0: "INBOX/Notizen"
-46 (39) USER_GROUPWARE_FOLDER_OUTBOX,0: "INBOX/Gesendete Elemente"
-47 (39) USER_GROUPWARE_FOLDER_TASKS,0: "INBOX/Aufgaben"
-48 (39) USER_GROUPWARE_FOLDER_TRASH,0: "INBOX/Gelöschte Elemente"
-49 (39) USER_GROUP_MEMBER_REF,0: "2"
-50 (39) USER_GROUP_MEMBER_REF,1: "3"
-51 (39) USER_LOCALE,0: ""
-52 (39) USER_PASSWORD,0: "grmpfl"
-53 (39) USER_TRASH_DELETEDAYS,0: "30"
-54 (39) USER_WEBMAIL_MESSAGES_PER_PAGE,0: "25"
-55 (39) USER_WEBMAIL_SIGNATURE,0: ""
-56 BACKUP_COMPRESS_ENABLE,0: "1"
-57 BACKUP_CRON,0: "0123456"
-58 (57) BACKUP_CRON_BEGIN,0: "7200"
-59 BACKUP_ENCRYPT_ENABLE,0: "0"
-60 BACKUP_ENCRYPT_PASSWORD,0: ""
-61 TESTVAR,0: "test"
-"""
-
-#: encoding for text files, set in :py:func:`setUpModule`
-ENCODING = None
-
-
-def setUpModule():
- """Called once before all tests; determines encoding
-
- This test might run in very limited build environments without locale, so
- auto-selection of temp-file text encoding may return 'ASCII'.
- Therefore, do the encoding "manually".
- """
- global ENCODING
- ENCODING = sys.getfilesystemencoding()
- if ENCODING.lower() in ('ascii', 'ansi_x3.4-1968'):
- ENCODING = 'utf8'
- print('\nWARNING: Using fallback encoding {} for temp file creation'
- .format(ENCODING))
-
-
-class SimpleCnfTest(unittest.TestCase):
- """ The one and only test case in this module `-->` see module doc """
-
- # setup and cleanup
-
- import_file = None
- cnf = None
-
- @classmethod
- def _import_cnf(cls):
- """ import conf var data from temp file """
- cls.cnf = SimpleCnf()
- cls.cnf.append_file(cls.import_file)
-
- @classmethod
- def setUpClass(cls):
- """ before running tests: write conf var string to temp file """
- try:
- sys_file_descriptor, cls.import_file = mkstemp()
- os.close(sys_file_descriptor)
- with open(cls.import_file, 'wb') as file_handle:
- file_handle.write(TEST_SET.encode(ENCODING))
- except Exception:
- print('\nWARNING: exception creating temp file:')
- print_exc()
-
- # clean up
- cls.tearDownClass()
-
- # re-raise
- raise
-
- cls._import_cnf()
-
- @classmethod
- def tearDownClass(cls):
- """ after all tests have run, delete temp file """
- if cls.import_file is not None:
- try:
- os.unlink(cls.import_file)
- except Exception:
- print('\nWARNING: exception deleting temp file:')
- print_exc()
-
- # tests
-
- def test_ctor_ok (self):
- """
- Test initializer :py:meth:`SimpleCnf.__init__` must succeed on sane
- inputs.
- """
- self.assertNotEqual (SimpleCnf ({"cnf": []}), None)
- self.assertNotEqual (SimpleCnf ([]), None)
- self.assertNotEqual (SimpleCnf (), None)
-
- def test_ctor_fail (self):
- """
- Test initializer :py:meth:`SimpleCnf.__init__` must reject inputs that
- ``cnfvar.py`` does not handle.
- """
- with self.assertRaises (InvalidCnf):
- _void = SimpleCnf (b"junk")
- with self.assertRaises (InvalidCnf):
- _void = SimpleCnf (42)
- with self.assertRaises (InvalidCnf):
- _void = SimpleCnf (tuple ([1701, 1337]))
- with self.assertRaises (InvalidCnf):
- _void = SimpleCnf (set (["one", "two"]))
-
- def test_eq(self):
- """ test method :py:meth:`SimpleCnf.__eq__` """
- self.assertEqual(self.cnf[61], self.cnf[61])
- self.assertEqual(self.cnf['testvar'], self.cnf[61])
- self.assertEqual(self.cnf, self.cnf)
- self.assertNotEqual(self.cnf[56], self.cnf[57])
- self.assertNotEqual(SimpleCnf ({"cnf": []}), None)
- self.assertNotEqual(SimpleCnf ({"cnf": []}), 42)
- self.assertNotEqual(SimpleCnf ({"cnf": []}), "got wood?")
- self.assertEqual(SimpleCnf (), SimpleCnf ({"cnf": []}))
-
- def test_len(self):
- """ test method :py:meth:`SimpleCnf.__len__` """
- self.assertEqual(len(self.cnf), 8)
-
- def test_getitem(self):
- """ test method :py:meth:`SimpleCnf.__item__` """
- self.assertEqual(len(self.cnf['user']), 3)
- self.assertEqual(self.cnf['USER'], self.cnf['user'])
- self.assertEqual(len(self.cnf['backup_encrypt_password']), 1)
- self.assertEqual(len(self.cnf[12232]), 0)
- self.assertEqual(len(self.cnf[55]), 0)
- self.assertEqual(len(self.cnf[61]), 1)
- self.assertEqual(len(self.cnf['user_webmail_signature']), 0)
-
- def test_get(self):
- """ test method :py:meth:`SimpleCnf.get` """
- self.assertEqual(len(self.cnf.get()), 8)
- self.assertEqual(len(self.cnf.get(name='user')), 3)
-
- def test_get_value(self):
- """ test method :py:meth:`SimpleCnf.get_value` """
-
- with self.assertRaises(ValueError):
- self.cnf.get_value()
-
- self.assertEqual(self.cnf[56].get_value(), '1')
- self.assertEqual(self.cnf[61].get_value(), 'test')
-
- def test_get_children(self):
- """ test method :py:meth:`SimpleCnf.get_children` """
- with self.assertRaises(ValueError):
- self.cnf.get_children()
- self.assertEqual(len(self.cnf.get(name='user', value='mueller')
- .get_children()), 16)
- self.assertEqual(self.cnf.get(name='user'),
- self.cnf.get(name='USER'))
- self.assertEqual(self.cnf.get(name='user', value='mueller'),
- self.cnf.get(name='USER', value='mueller'))
- self.assertEqual(len(self.cnf[57].get_children()), 1)
- self.assertEqual(self.cnf[57].get_children().get_value(), '7200')
-
- def test_add_alone(self):
- """ test method :py:meth:`SimpleCnf.add` on empty conf """
- # do not use self.cnf since that would void other test methods
- cnf = SimpleCnf()
- self.assertEqual(len(cnf), 0)
- cnf.add('new_var', 'new_value')
- self.assertEqual(len(cnf), 1)
- cnf_var = cnf.get(name='new_var').get_single_dict()
- self.assertEqual(cnf_var['data'], 'new_value')
- self.assertIsInstance(cnf_var['data'], str)
- self.assertEqual(cnf_var['number'], 1)
-
- def test_add_on_top(self):
- """ test method :py:meth:`SimpleCnf.add` on regular conf """
- cnf = SimpleCnf()
- self.assertEqual(len(cnf), 0)
- cnf.append_file(self.import_file)
- self.assertEqual(len(cnf), 8)
-
- cnf.add('new_var', 'new_value')
- self.assertEqual(len(cnf), 9)
- cnf_var = cnf.get(name='new_var').get_single_dict()
- self.assertEqual(cnf_var['data'], 'new_value')
- self.assertIsInstance(cnf_var['data'], str)
- self.assertEqual(cnf_var['number'], 62)
-
- def test_add_with_children(self):
- """ test method :py:meth:`SimpleCnf.add` by adding var with children"""
- # load config
- cnf = SimpleCnf()
- cnf.append_file(self.import_file)
- self.assertEqual(len(cnf['user']), 3)
-
- # get a certain user with all its sub config
- user_cnf = cnf.get(name='user', value='admin')
- self.assertEqual(len(user_cnf), 1)
-
- # copy as new user with different name but same children
- cnf.add('user', 'admin2', children=user_cnf.get_children())
- self.assertEqual(len(cnf['user']), 4)
- self.assertEqual(len(cnf.get(name='user', value='admin2')), 1)
- self.assertEqual(len(cnf.get(name='user', value='admin2').get_children()), 14)
-
-
-if __name__ == '__main__':
- unittest.main()