From 7abff5a707178022d60e87dcf7f8c19157b4021c Mon Sep 17 00:00:00 2001 From: Samir Aguiar Date: Thu, 17 Mar 2022 18:14:05 -0300 Subject: [PATCH] Rename cnfvar module to cnfvar_old The new API will replace it and we want to prevent users from being confused about which to use. --- src/arnied_wrapper.py | 14 +- src/cnfvar.py | 1165 ----------------------------------------------- src/cnfvar_old.py | 1165 +++++++++++++++++++++++++++++++++++++++++++++++ src/simple_cnf.py | 14 +- test/test_cnfvar.py | 476 ------------------- test/test_cnfvar_old.py | 476 +++++++++++++++++++ 6 files changed, 1655 insertions(+), 1655 deletions(-) delete mode 100644 src/cnfvar.py create mode 100644 src/cnfvar_old.py delete mode 100755 test/test_cnfvar.py create mode 100755 test/test_cnfvar_old.py diff --git a/src/arnied_wrapper.py b/src/arnied_wrapper.py index 4d13c4c..e8ee70e 100644 --- a/src/arnied_wrapper.py +++ b/src/arnied_wrapper.py @@ -55,7 +55,7 @@ import logging log = logging.getLogger('pyi2ncommon.arnied_wrapper') from .cnfline import build_cnfvar -from . import cnfvar +from . import cnfvar_old from . import sysmisc @@ -495,16 +495,16 @@ def get_cnfvar(varname=None, instance=None, data=None, timeout=30, vm=None): # the output of "get_cnf" (no json) which is latin1. if isinstance(raw, bytes): raw = raw.decode("utf-8") - cnf = cnfvar.read_cnf_json(raw) + cnf = cnfvar_old.read_cnf_json(raw) except TypeError as exn: log.info("error \"%s\" parsing result of \"%s\"", exn, cmd_line) return None - except cnfvar.InvalidCNF as exn: + except cnfvar_old.InvalidCNF as exn: log.info("error \"%s\" validating result of \"%s\"", exn, cmd_line) return None if data is not None: - return cnfvar.get_vars(cnf, data=data) + return cnfvar_old.get_vars(cnf, data=data) return cnf @@ -680,9 +680,9 @@ def set_cnf_dynamic(cnf, config_file=None, kind="cnf", timeout=30, vm=None): fd = open(config_path, "wb") try: SET_CNF_METHODS = { - "raw": cnfvar.write_cnf_raw, - "json": cnfvar.write_cnf_json, - "cnf": cnfvar.write_cnf + "raw": cnfvar_old.write_cnf_raw, + "json": cnfvar_old.write_cnf_json, + "cnf": cnfvar_old.write_cnf } SET_CNF_METHODS[kind](cnf, out=fd) except KeyError: diff --git a/src/cnfvar.py b/src/cnfvar.py deleted file mode 100644 index a8b0fef..0000000 --- a/src/cnfvar.py +++ /dev/null @@ -1,1165 +0,0 @@ -#!/usr/bin/env python -# -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. -# -# Copyright (c) 2016-2018 Intra2net AG - -""" - -summary -------------------------------------------------------------------------------- -Represent CNF_VARs as recursive structures. - -Copyright: 2014-2017 Intra2net AG -License: GPLv2+ - - -contents -------------------------------------------------------------------------------- - -This module provides read and write functionality for the Intra2net *CNF* -format. Two different syntaxes are available: classical *CNF* and a JSON -representation. Both versions are commonly understood by Intra2net software. - -On the command line, raw CNF is accepted if the option ``-`` is given: :: - - $ get_cnf routing 2 |python3 cnfvar.py - < nothing left to do - break - current = get(state) - parent = get_parent(current) # peek at next line - if parent is not None: # -> recurse into children - (state, children, _parent) = parse_cnf_children(state, parent) - cnf_line["children"] = children - if state is None: - break - current = get(state) - else: - state = advance(state) - if state is None: - break - current = get(state) - return lines - - -def parse_cnf_children(state, parent): - """ - Read and parse child CNFs of a given parent until there is none left. - - :param state: a 3-element tuple containing two lines and a list of the - remaining lines - :type state: (str, str, [str]) - :param parent: id of the parent whose children we are looking for - :type parent: int - :returns: a 3-element tuple with the current state, a list of children of - the given parent and the parent ID - :rtype: (tuple, [str], int) - - The function will recursively parse child lines from the `state` tuple - until one of these conditions is satisfied: - - 1. the input is exhausted - 2. the next CNF line - 2.1. is a toplevel line - 2.2. is a child line whose parent has a lower parent number - - Conceptually, 2.1 is a very similar to 2.2 but due to the special status of - toplevel lines in CNF we need to handle them separately. - - Note that since nesting of CNF vars is achieved via parent line numbers, - lines with different parents could appear out of order. libcnffile will - happily parse those and still assign children to the specified parent: - - :: - # set_cnf < parent: - # parent is further down in hierarchy -> new level - (state, children, new_parent) = \ - parse_cnf_children (state, new_parent) - if state is None: - break - cnf_line["children"] = children - current = get(state) - new_parent = get_parent(current) - if new_parent is None: - # drop stack - return (state, lines, None) - if new_parent < parent: - # parent is further up in hierarchy -> pop level - return (state, lines, new_parent) - # new_parent == parent -> continue parsing on same level - return (state, lines, parent) - - -def get_parent(line): - """ - Extract the ID of the parent for a given CNF line. - - :param str line: CNF line - :returns: parent ID or None if no parent is found - :rtype: int or None - """ - match = re.match(grab_parent_pattern, line) - if match is None: # -> no parent - return None - return int(match.groups()[0]) - - -def read_base_line(line): - """ - Turn one top-level CNF line into a dictionary. - - :param str line: CNF line - :rtype: {str: Any} - - This performs the necessary decoding on values to obtain proper Python - strings from 8-bit encoded CNF data. - - The function only operates on individual lines. Argument strings that - contain data for multiple lines – this includes child lines of the current - CNF var! – will trigger a parsing exception. - """ - if len(line.strip()) == 0: - return None # ignore empty lines - if line[0] == b"#": - return None # ignore comments - - match = re.match(base_line_pattern, line) - if match is None: - raise MalformedCNF("Syntax error in line \"\"\"%s\"\"\"" % line) - number, varname, instance, data, comment = match.groups() - return { - "number" : marshal_in_number (number), - "varname" : marshal_in_varname (varname), - "instance" : marshal_in_instance (instance), - "data" : marshal_in_data (data), - "comment" : marshal_in_comment (comment), - } - - -def read_child_line(line): - """ - Turn one child CNF line into a dictionary. - - :param str line: CNF line - :rtype: {str: Any} - - This function only operates on individual lines. If the argument string is - syntactically valid but contains input representing multiple CNF vars, a - parse error will be thrown. - """ - if len(line.strip()) == 0: - return None # ignore empty lines - if line[0] == "#": - return None # ignore comments - - match = re.match(child_line_pattern, line) - if match is None: - raise MalformedCNF("Syntax error in child line \"\"\"%s\"\"\"" - % from_latin1 (line)) - number, parent, varname, instance, data, comment = match.groups() - return { - "number" : marshal_in_number (number), - "parent" : marshal_in_parent (parent), - "varname" : marshal_in_varname (varname), - "instance" : marshal_in_instance (instance), - "data" : marshal_in_data (data), - "comment" : marshal_in_comment (comment), - } - - -############################################################################### -# SERIALIZATION -############################################################################### - - -cnf_line_nest_indent = " " -cnf_line_base_fmt = "%d %s,%d: \"%s\"" -cnf_line_child_fmt = "%d %s(%d) %s,%d: \"%s\"" - - -def format_cnf_vars(da, var): - """ - Return a list of formatted cnf_line byte strings. - - :param da: a tuple where the first element is the depth (0 = top-level, - >1 = child CNF) and the second is the string being built. - :type da: (int, str) - :param var: the CNF element to convert to string in the current iteration - :type var: dict - :returns: a tuple like `da`, where the second element should contain all - converted CNFs - :rtype: (int, str) - - This function is meant to be passed to the :py:func:`functools.reduce` - function. - - The variable names are uppercased unconditionally because while ``get_cnf`` - is case-indifferent for variable names, ``set_cnf`` isn’t. - """ - depth, acc = da - line = None - if depth > 0: - line = cnf_line_child_fmt \ - % (var["number"], - cnf_line_nest_indent * depth, - var["parent"], - var["varname"].upper(), - var["instance"], - var["data"]) - else: - line = cnf_line_base_fmt \ - % (var["number"], - var["varname"].upper(), - var["instance"], - var["data"]) - - comment = var.get("comment", None) - if comment and len(comment) != 0: - line = line + (" # %s" % comment) - - acc.append(to_latin1(line)) - - children = var.get("children", None) - if children is not None: - (_, acc) = functools.reduce(format_cnf_vars, children, (depth + 1, acc)) - - return (depth, acc) - - -def cnf_root(root): - """ - Extract a list of CNFs from a given structure. - - :param root: list of CNFs or a CNF dictionary - :type root: [dict] or dict - :raises: :py:class:`TypeError` if no CNFs can be extracted - :returns: list with one or more CNF objects - :rtype: [dict] - - Output varies depending on a few conditions: - - If `root` is a list, return it right away - - If `root` is a dict corresponding to a valid CNF value, return it wrapped - in a list - - If `root` is a dict with a `cnf` key containg a list (as the JSON - returned by `get_cnf -j`), return the value - - Otherwise, raise an error - """ - if isinstance(root, list): - return root - if not isinstance(root, dict): - raise TypeError( - "Expected dictionary of CNF_VARs, got %s." % type(root)) - if is_cnf_var(root): - return [root] - cnf = root.get("cnf", None) - if not isinstance(cnf, list): - raise TypeError("Expected list of CNF_VARs, got %s." % type(cnf)) - return cnf - - -def normalize_cnf(cnf): - """ - Ensure the output conforms to set_cnf()’s expectations. - - :param cnf: list of CNF objects to normalize - :type cnf: [dict] - :returns: normalized list - :rtype: [list] - """ - if isinstance(cnf, list) is False: - raise MalformedCNF("expected list of CNF_VARs, got [%s]" % type(cnf)) - def norm(var): - vvar = \ - { "number" : var ["number"] - , "varname" : var ["varname"].upper() - , "instance" : var ["instance"] - , "data" : var ["data"] - } - - children = var.get("children", None) - if children is not None: - vvar ["children"] = normalize_cnf(children) - - parent = var.get("parent", None) - if parent is not None: - vvar ["parent"] = var["parent"] - - comment = var.get("comment", None) - if comment is not None: - vvar ["comment"] = var["comment"] - - return vvar - - return [norm(var) for var in cnf] - - -############################################################################### -# TRAVERSAL -############################################################################### - - -def walk_cnf(cnf, nested, fun, acc): - """ - Depth-first traversal of a CNF tree. - - :type cnf: cnf list - :type nested: bool - :type fun: 'a -> bool -> (cnf stuff) -> 'a - :type acc: 'a - :rtype: 'a - - Executes ``fun`` recursively for each node in the tree. The function - receives the accumulator ``acc`` which can be of an arbitrary type as first - argument. The second argument is a flag indicating whether the current - CNF var is a child (if ``True``) or a parent var. CNF member fields are - passed via named optional arguments. - """ - for var in cnf: - acc = fun(acc, - nested, - comment=var.get("comment", None), - data=var.get("data", None), - instance=var.get("instance", None), - number=var.get("number", None), - parent=var.get("parent", None), - varname=var.get("varname", None)) - children = var.get("children", None) - if children is not None: - acc = walk_cnf(children, True, fun, acc) - return acc - - -def renumber_vars(root, parent=None, toplevel=False): - """ - Number cnfvars linearly. - - If *parent* is specified, numbering will start at this offset. Also, the - VAR *root* will be assigned this number as a parent lineno unless - *toplevel* is set (the root var in a CNF tree obviously can’t have a - parent). - - The *toplevel* parameter is useful when renumbering an existing variable - starting at a given offset without at the same time having that offset - assigned as a parent. - """ - root = cnf_root (root) - i = parent or 0 - for var in root: - i += 1 - var["number"] = i - if toplevel is False and parent is not None: - var["parent"] = parent - children = var.get("children", None) - if children is not None: - i = renumber_vars(children, parent=i, toplevel=False) - return i - - -def count_vars(root): - """ - Traverse the cnf structure recursively, counting VAR objects (CNF lines). - """ - cnf = cnf_root(root) - if cnf is None: - raise InvalidCNF(root) - return walk_cnf(cnf, True, lambda n, _nested, **_kwa: n + 1, 0) - -# -# querying -# - - -def get_vars(cnf, data=None, instance=None): - """ - get_vars -- Query a CNF_VAR structure. Skims the *toplevel* of the CNF_VAR - structure for entries with a matching `data` or `instance` field. - - :type cnf: CNF_VAR - :type data: str - :type instance: int - :rtype: CNF_VAR - :returns: The structure containing only references to the - matching variables. Containing an empty list of - variables in case there is no match. - - Values are compared literally. If both ``instance`` and ``data`` are - specified, vars will be compared against both. - """ - cnf = cnf["cnf"] - if cnf: - criterion = lambda _var: False - if data: - if instance: - criterion = lambda var: var[ - "data"] == data and var["instance"] == instance - else: - criterion = lambda var: var["data"] == data - elif instance: - criterion = lambda var: var["instance"] == instance - - return {"cnf": [var for var in cnf if criterion(var) is True]} - - return {"cnf": []} - - -############################################################################### -# PRINTING/DUMPING -############################################################################### - - -# -# Print/dump raw CNF values -# - -def output_cnf(root, out, renumber=False): - """ - Dump a textual representation of given CNF VAR structure to given stream. - - Runs :py:func:`format_cnf_vars` on the input (`root`) and then writes that - to the given file-like object (out). - - :param root: a CNF_VAR structure - :type root: dict or list or anything that :py:func:`cnf_root` accepts - :param out: file-like object or something with a `write(str)` function - :param bool renumber: Whether to renumber cnfvars first - - Files are converted to the 8-bit format expected by CNF so they can be fed - directly into libcnffile. - """ - cnf = cnf_root(root) - if renumber is True: - _count = renumber_vars(root) - if is_cnf(cnf) is True: - (_, lines) = functools.reduce(format_cnf_vars, cnf, (0, [])) - if isinstance(out, (io.RawIOBase, io.BufferedIOBase)): - out.write (b"\n".join (lines)) - out.write (b"\n") - else: # either subclass of io.TextIOBase or unknown - out.write ("\n".join (map (from_latin1, lines))) - out.write ("\n") - - -def dump_cnf_bytes (root, renumber=False): - """ - Serialize CNF var structure, returning the result as a byte sequence. - """ - cnf = cnf_root(root) - out = io.BytesIO() - output_cnf(root, out, renumber=renumber) - res = out.getvalue() - out.close() - return res - - -def dump_cnf_string(root, renumber=False): - """ - Serialize CNF var structure, returning a latin1-encode byte string. - - .. todo::this is identical to :py:func:`dump_cnf_bytes`! - """ - cnf = cnf_root(root) - out = io.BytesIO() - output_cnf(root, out, renumber=renumber) - res = out.getvalue() - out.close() - return res - - -def print_cnf(root, out=None, renumber=False): - """ - Print given CNF_VAR structure to stdout (or other file-like object). - - Note that per default the config is printed to sys.stdout using the shell's - preferred encoding. If the shell cannot handle unicode this might raise - `UnicodeError`. - - All params forwarded to :py:func:`output_cnf`. See args there. - """ - if root is not None: - output_cnf(root, out or sys.stdout, renumber=renumber) - - -def write_cnf(*argv, **kw_argv): - """Alias for :py:func:`print_cnf`.""" - print_cnf(*argv, **kw_argv) - - -def print_cnf_raw(root, out=None): - """`if root is not None: out.write(root)`.""" - if root is not None: - out.write(root) - - -def write_cnf_raw(*argv, **kw_argv): - """Alias for :py:func:`print_cnf_raw`.""" - print_cnf_raw(*argv, **kw_argv) - - -# -# Print/dump CNF values in JSON format -# - - -def output_json(root, out, renumber=False): - """ - Dump CNF_VAR structure to file-like object in json format. - - :param root: CNF_VAR structure - :type root: dict or list or anything that :py:func:`cnf_root` accepts - :param out: file-like object, used as argument to :py:func:`json.dumps` so - probably has to accept `str` (as opposed to `bytes`). - :param bool renumber: whether to renumber variables before dupming. - """ - # if not isinstance(out, file): - #raise TypeError("%s (%s) is not a stream." % (out, type(out))) - if renumber is True: - _count = renumber_vars(root) - if is_cnf(root) is True: - root ["cnf"] = normalize_cnf(cnf_root (root)) - data = json.dumps(root) - out.write(data) - out.write("\n") - # TODO: else raise value error? - - -def dump_json_string(root, renumber=False): - """ - Serialize CNF var structure as JSON, returning the result as a string. - """ - out = io.StringIO() - output_json(root, out, renumber=renumber) - res = out.getvalue() - out.close() - return res - - -def print_cnf_json(root, out=None, renumber=False): - """ - Print CNF_VAR structure in json format to stdout. - - Calls :py:func:`output_json` with `sys.stdout` if `out` is not given or - `None`. - """ - if root is not None: - output_json(root, out or sys.stdout, renumber=renumber) - - -def write_cnf_json(*argv, **kw_argv): - """Alias for :py:func:`print_cnf_json`.""" - print_cnf_json(*argv, **kw_argv) - - - -############################################################################### -# ENTRY POINT FOR DEVELOPMENT -############################################################################### - - -def usage(): - print("usage: cnfvar.py -" , file=sys.stderr) - print("" , file=sys.stderr) - print(" Read CNF from stdin.", file=sys.stderr) - print("" , file=sys.stderr) - - -def main(argv): - if len(argv) > 1: - first = argv[1] - if first == "-": - cnf = read_cnf(sys.stdin.buffer.read()) - print_cnf(cnf) - return 0 - elif first == "test": - cnf = read_cnf(sys.stdin.buffer.read()) - cnff = get_vars(cnf, instance=2, data="FAX") - print_cnf(cnff) - return 0 - usage() - return -1 - -if __name__ == "__main__": - sys.exit(main(sys.argv)) diff --git a/src/cnfvar_old.py b/src/cnfvar_old.py new file mode 100644 index 0000000..a8b0fef --- /dev/null +++ b/src/cnfvar_old.py @@ -0,0 +1,1165 @@ +#!/usr/bin/env python +# +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. +# +# Copyright (c) 2016-2018 Intra2net AG + +""" + +summary +------------------------------------------------------------------------------- +Represent CNF_VARs as recursive structures. + +Copyright: 2014-2017 Intra2net AG +License: GPLv2+ + + +contents +------------------------------------------------------------------------------- + +This module provides read and write functionality for the Intra2net *CNF* +format. Two different syntaxes are available: classical *CNF* and a JSON +representation. Both versions are commonly understood by Intra2net software. + +On the command line, raw CNF is accepted if the option ``-`` is given: :: + + $ get_cnf routing 2 |python3 cnfvar.py - < nothing left to do + break + current = get(state) + parent = get_parent(current) # peek at next line + if parent is not None: # -> recurse into children + (state, children, _parent) = parse_cnf_children(state, parent) + cnf_line["children"] = children + if state is None: + break + current = get(state) + else: + state = advance(state) + if state is None: + break + current = get(state) + return lines + + +def parse_cnf_children(state, parent): + """ + Read and parse child CNFs of a given parent until there is none left. + + :param state: a 3-element tuple containing two lines and a list of the + remaining lines + :type state: (str, str, [str]) + :param parent: id of the parent whose children we are looking for + :type parent: int + :returns: a 3-element tuple with the current state, a list of children of + the given parent and the parent ID + :rtype: (tuple, [str], int) + + The function will recursively parse child lines from the `state` tuple + until one of these conditions is satisfied: + + 1. the input is exhausted + 2. the next CNF line + 2.1. is a toplevel line + 2.2. is a child line whose parent has a lower parent number + + Conceptually, 2.1 is a very similar to 2.2 but due to the special status of + toplevel lines in CNF we need to handle them separately. + + Note that since nesting of CNF vars is achieved via parent line numbers, + lines with different parents could appear out of order. libcnffile will + happily parse those and still assign children to the specified parent: + + :: + # set_cnf < parent: + # parent is further down in hierarchy -> new level + (state, children, new_parent) = \ + parse_cnf_children (state, new_parent) + if state is None: + break + cnf_line["children"] = children + current = get(state) + new_parent = get_parent(current) + if new_parent is None: + # drop stack + return (state, lines, None) + if new_parent < parent: + # parent is further up in hierarchy -> pop level + return (state, lines, new_parent) + # new_parent == parent -> continue parsing on same level + return (state, lines, parent) + + +def get_parent(line): + """ + Extract the ID of the parent for a given CNF line. + + :param str line: CNF line + :returns: parent ID or None if no parent is found + :rtype: int or None + """ + match = re.match(grab_parent_pattern, line) + if match is None: # -> no parent + return None + return int(match.groups()[0]) + + +def read_base_line(line): + """ + Turn one top-level CNF line into a dictionary. + + :param str line: CNF line + :rtype: {str: Any} + + This performs the necessary decoding on values to obtain proper Python + strings from 8-bit encoded CNF data. + + The function only operates on individual lines. Argument strings that + contain data for multiple lines – this includes child lines of the current + CNF var! – will trigger a parsing exception. + """ + if len(line.strip()) == 0: + return None # ignore empty lines + if line[0] == b"#": + return None # ignore comments + + match = re.match(base_line_pattern, line) + if match is None: + raise MalformedCNF("Syntax error in line \"\"\"%s\"\"\"" % line) + number, varname, instance, data, comment = match.groups() + return { + "number" : marshal_in_number (number), + "varname" : marshal_in_varname (varname), + "instance" : marshal_in_instance (instance), + "data" : marshal_in_data (data), + "comment" : marshal_in_comment (comment), + } + + +def read_child_line(line): + """ + Turn one child CNF line into a dictionary. + + :param str line: CNF line + :rtype: {str: Any} + + This function only operates on individual lines. If the argument string is + syntactically valid but contains input representing multiple CNF vars, a + parse error will be thrown. + """ + if len(line.strip()) == 0: + return None # ignore empty lines + if line[0] == "#": + return None # ignore comments + + match = re.match(child_line_pattern, line) + if match is None: + raise MalformedCNF("Syntax error in child line \"\"\"%s\"\"\"" + % from_latin1 (line)) + number, parent, varname, instance, data, comment = match.groups() + return { + "number" : marshal_in_number (number), + "parent" : marshal_in_parent (parent), + "varname" : marshal_in_varname (varname), + "instance" : marshal_in_instance (instance), + "data" : marshal_in_data (data), + "comment" : marshal_in_comment (comment), + } + + +############################################################################### +# SERIALIZATION +############################################################################### + + +cnf_line_nest_indent = " " +cnf_line_base_fmt = "%d %s,%d: \"%s\"" +cnf_line_child_fmt = "%d %s(%d) %s,%d: \"%s\"" + + +def format_cnf_vars(da, var): + """ + Return a list of formatted cnf_line byte strings. + + :param da: a tuple where the first element is the depth (0 = top-level, + >1 = child CNF) and the second is the string being built. + :type da: (int, str) + :param var: the CNF element to convert to string in the current iteration + :type var: dict + :returns: a tuple like `da`, where the second element should contain all + converted CNFs + :rtype: (int, str) + + This function is meant to be passed to the :py:func:`functools.reduce` + function. + + The variable names are uppercased unconditionally because while ``get_cnf`` + is case-indifferent for variable names, ``set_cnf`` isn’t. + """ + depth, acc = da + line = None + if depth > 0: + line = cnf_line_child_fmt \ + % (var["number"], + cnf_line_nest_indent * depth, + var["parent"], + var["varname"].upper(), + var["instance"], + var["data"]) + else: + line = cnf_line_base_fmt \ + % (var["number"], + var["varname"].upper(), + var["instance"], + var["data"]) + + comment = var.get("comment", None) + if comment and len(comment) != 0: + line = line + (" # %s" % comment) + + acc.append(to_latin1(line)) + + children = var.get("children", None) + if children is not None: + (_, acc) = functools.reduce(format_cnf_vars, children, (depth + 1, acc)) + + return (depth, acc) + + +def cnf_root(root): + """ + Extract a list of CNFs from a given structure. + + :param root: list of CNFs or a CNF dictionary + :type root: [dict] or dict + :raises: :py:class:`TypeError` if no CNFs can be extracted + :returns: list with one or more CNF objects + :rtype: [dict] + + Output varies depending on a few conditions: + - If `root` is a list, return it right away + - If `root` is a dict corresponding to a valid CNF value, return it wrapped + in a list + - If `root` is a dict with a `cnf` key containg a list (as the JSON + returned by `get_cnf -j`), return the value + - Otherwise, raise an error + """ + if isinstance(root, list): + return root + if not isinstance(root, dict): + raise TypeError( + "Expected dictionary of CNF_VARs, got %s." % type(root)) + if is_cnf_var(root): + return [root] + cnf = root.get("cnf", None) + if not isinstance(cnf, list): + raise TypeError("Expected list of CNF_VARs, got %s." % type(cnf)) + return cnf + + +def normalize_cnf(cnf): + """ + Ensure the output conforms to set_cnf()’s expectations. + + :param cnf: list of CNF objects to normalize + :type cnf: [dict] + :returns: normalized list + :rtype: [list] + """ + if isinstance(cnf, list) is False: + raise MalformedCNF("expected list of CNF_VARs, got [%s]" % type(cnf)) + def norm(var): + vvar = \ + { "number" : var ["number"] + , "varname" : var ["varname"].upper() + , "instance" : var ["instance"] + , "data" : var ["data"] + } + + children = var.get("children", None) + if children is not None: + vvar ["children"] = normalize_cnf(children) + + parent = var.get("parent", None) + if parent is not None: + vvar ["parent"] = var["parent"] + + comment = var.get("comment", None) + if comment is not None: + vvar ["comment"] = var["comment"] + + return vvar + + return [norm(var) for var in cnf] + + +############################################################################### +# TRAVERSAL +############################################################################### + + +def walk_cnf(cnf, nested, fun, acc): + """ + Depth-first traversal of a CNF tree. + + :type cnf: cnf list + :type nested: bool + :type fun: 'a -> bool -> (cnf stuff) -> 'a + :type acc: 'a + :rtype: 'a + + Executes ``fun`` recursively for each node in the tree. The function + receives the accumulator ``acc`` which can be of an arbitrary type as first + argument. The second argument is a flag indicating whether the current + CNF var is a child (if ``True``) or a parent var. CNF member fields are + passed via named optional arguments. + """ + for var in cnf: + acc = fun(acc, + nested, + comment=var.get("comment", None), + data=var.get("data", None), + instance=var.get("instance", None), + number=var.get("number", None), + parent=var.get("parent", None), + varname=var.get("varname", None)) + children = var.get("children", None) + if children is not None: + acc = walk_cnf(children, True, fun, acc) + return acc + + +def renumber_vars(root, parent=None, toplevel=False): + """ + Number cnfvars linearly. + + If *parent* is specified, numbering will start at this offset. Also, the + VAR *root* will be assigned this number as a parent lineno unless + *toplevel* is set (the root var in a CNF tree obviously can’t have a + parent). + + The *toplevel* parameter is useful when renumbering an existing variable + starting at a given offset without at the same time having that offset + assigned as a parent. + """ + root = cnf_root (root) + i = parent or 0 + for var in root: + i += 1 + var["number"] = i + if toplevel is False and parent is not None: + var["parent"] = parent + children = var.get("children", None) + if children is not None: + i = renumber_vars(children, parent=i, toplevel=False) + return i + + +def count_vars(root): + """ + Traverse the cnf structure recursively, counting VAR objects (CNF lines). + """ + cnf = cnf_root(root) + if cnf is None: + raise InvalidCNF(root) + return walk_cnf(cnf, True, lambda n, _nested, **_kwa: n + 1, 0) + +# +# querying +# + + +def get_vars(cnf, data=None, instance=None): + """ + get_vars -- Query a CNF_VAR structure. Skims the *toplevel* of the CNF_VAR + structure for entries with a matching `data` or `instance` field. + + :type cnf: CNF_VAR + :type data: str + :type instance: int + :rtype: CNF_VAR + :returns: The structure containing only references to the + matching variables. Containing an empty list of + variables in case there is no match. + + Values are compared literally. If both ``instance`` and ``data`` are + specified, vars will be compared against both. + """ + cnf = cnf["cnf"] + if cnf: + criterion = lambda _var: False + if data: + if instance: + criterion = lambda var: var[ + "data"] == data and var["instance"] == instance + else: + criterion = lambda var: var["data"] == data + elif instance: + criterion = lambda var: var["instance"] == instance + + return {"cnf": [var for var in cnf if criterion(var) is True]} + + return {"cnf": []} + + +############################################################################### +# PRINTING/DUMPING +############################################################################### + + +# +# Print/dump raw CNF values +# + +def output_cnf(root, out, renumber=False): + """ + Dump a textual representation of given CNF VAR structure to given stream. + + Runs :py:func:`format_cnf_vars` on the input (`root`) and then writes that + to the given file-like object (out). + + :param root: a CNF_VAR structure + :type root: dict or list or anything that :py:func:`cnf_root` accepts + :param out: file-like object or something with a `write(str)` function + :param bool renumber: Whether to renumber cnfvars first + + Files are converted to the 8-bit format expected by CNF so they can be fed + directly into libcnffile. + """ + cnf = cnf_root(root) + if renumber is True: + _count = renumber_vars(root) + if is_cnf(cnf) is True: + (_, lines) = functools.reduce(format_cnf_vars, cnf, (0, [])) + if isinstance(out, (io.RawIOBase, io.BufferedIOBase)): + out.write (b"\n".join (lines)) + out.write (b"\n") + else: # either subclass of io.TextIOBase or unknown + out.write ("\n".join (map (from_latin1, lines))) + out.write ("\n") + + +def dump_cnf_bytes (root, renumber=False): + """ + Serialize CNF var structure, returning the result as a byte sequence. + """ + cnf = cnf_root(root) + out = io.BytesIO() + output_cnf(root, out, renumber=renumber) + res = out.getvalue() + out.close() + return res + + +def dump_cnf_string(root, renumber=False): + """ + Serialize CNF var structure, returning a latin1-encode byte string. + + .. todo::this is identical to :py:func:`dump_cnf_bytes`! + """ + cnf = cnf_root(root) + out = io.BytesIO() + output_cnf(root, out, renumber=renumber) + res = out.getvalue() + out.close() + return res + + +def print_cnf(root, out=None, renumber=False): + """ + Print given CNF_VAR structure to stdout (or other file-like object). + + Note that per default the config is printed to sys.stdout using the shell's + preferred encoding. If the shell cannot handle unicode this might raise + `UnicodeError`. + + All params forwarded to :py:func:`output_cnf`. See args there. + """ + if root is not None: + output_cnf(root, out or sys.stdout, renumber=renumber) + + +def write_cnf(*argv, **kw_argv): + """Alias for :py:func:`print_cnf`.""" + print_cnf(*argv, **kw_argv) + + +def print_cnf_raw(root, out=None): + """`if root is not None: out.write(root)`.""" + if root is not None: + out.write(root) + + +def write_cnf_raw(*argv, **kw_argv): + """Alias for :py:func:`print_cnf_raw`.""" + print_cnf_raw(*argv, **kw_argv) + + +# +# Print/dump CNF values in JSON format +# + + +def output_json(root, out, renumber=False): + """ + Dump CNF_VAR structure to file-like object in json format. + + :param root: CNF_VAR structure + :type root: dict or list or anything that :py:func:`cnf_root` accepts + :param out: file-like object, used as argument to :py:func:`json.dumps` so + probably has to accept `str` (as opposed to `bytes`). + :param bool renumber: whether to renumber variables before dupming. + """ + # if not isinstance(out, file): + #raise TypeError("%s (%s) is not a stream." % (out, type(out))) + if renumber is True: + _count = renumber_vars(root) + if is_cnf(root) is True: + root ["cnf"] = normalize_cnf(cnf_root (root)) + data = json.dumps(root) + out.write(data) + out.write("\n") + # TODO: else raise value error? + + +def dump_json_string(root, renumber=False): + """ + Serialize CNF var structure as JSON, returning the result as a string. + """ + out = io.StringIO() + output_json(root, out, renumber=renumber) + res = out.getvalue() + out.close() + return res + + +def print_cnf_json(root, out=None, renumber=False): + """ + Print CNF_VAR structure in json format to stdout. + + Calls :py:func:`output_json` with `sys.stdout` if `out` is not given or + `None`. + """ + if root is not None: + output_json(root, out or sys.stdout, renumber=renumber) + + +def write_cnf_json(*argv, **kw_argv): + """Alias for :py:func:`print_cnf_json`.""" + print_cnf_json(*argv, **kw_argv) + + + +############################################################################### +# ENTRY POINT FOR DEVELOPMENT +############################################################################### + + +def usage(): + print("usage: cnfvar.py -" , file=sys.stderr) + print("" , file=sys.stderr) + print(" Read CNF from stdin.", file=sys.stderr) + print("" , file=sys.stderr) + + +def main(argv): + if len(argv) > 1: + first = argv[1] + if first == "-": + cnf = read_cnf(sys.stdin.buffer.read()) + print_cnf(cnf) + return 0 + elif first == "test": + cnf = read_cnf(sys.stdin.buffer.read()) + cnff = get_vars(cnf, instance=2, data="FAX") + print_cnf(cnff) + return 0 + usage() + return -1 + +if __name__ == "__main__": + sys.exit(main(sys.argv)) diff --git a/src/simple_cnf.py b/src/simple_cnf.py index cdc4318..03a50f7 100644 --- a/src/simple_cnf.py +++ b/src/simple_cnf.py @@ -67,7 +67,7 @@ import logging log = logging.getLogger('pyi2ncommon.simple_cnf') from . import arnied_wrapper -from . import cnfvar +from . import cnfvar_old from . import sysmisc ############################################################################### @@ -310,7 +310,7 @@ class SimpleCnf(object): new_dict = child.get_single_dict() new_dict['parent'] = number new_children.append(new_dict) - cnfvar.renumber_vars({'cnf':new_children}, number) + cnfvar_old.renumber_vars({'cnf':new_children}, number) children = new_children elif number is None: number = self._find_new_number(self.__cnfvars) @@ -364,12 +364,12 @@ class SimpleCnf(object): def append_file(self, cnf, replacements=None): """Append conf var data from file.""" - return self.append_file_generic(cnfvar.read_cnf, cnf, + return self.append_file_generic(cnfvar_old.read_cnf, cnf, replacements=replacements) def append_file_json(self, cnf, replacements=None): """Append conf var data from json file.""" - return self.append_file_generic(cnfvar.read_cnf_json, cnf, + return self.append_file_generic(cnfvar_old.read_cnf_json, cnf, replacements=replacements) def append_guest_vars(self, vm=None, varname=None, replacements=None): @@ -417,7 +417,7 @@ class SimpleCnf(object): filename = arnied_wrapper.generate_config_path(dumped=True) with open(filename, 'w') as out: - cnfvar.output_json({"cnf": current}, out, renumber=True) + cnfvar_old.output_json({"cnf": current}, out, renumber=True) return filename @@ -435,7 +435,7 @@ class SimpleCnf(object): current = self.__cnfvars if renumber: log.info("enforce consistent CNF_LINE numbering") - cnfvar.renumber_vars(current) + cnfvar_old.renumber_vars(current) log.info("inject configuration %s" % "into guest" if vm else "in place") arnied_wrapper.set_cnf_dynamic({"cnf": current}, config_file=gen_tmpname(), vm=vm) @@ -447,7 +447,7 @@ class SimpleCnf(object): :returns: config in json format :rtype: str """ - return cnfvar.dump_json_string({"cnf": self.__cnfvars}, renumber=True) + return cnfvar_old.dump_json_string({"cnf": self.__cnfvars}, renumber=True) def pretty_print(self, print_func=None): """ diff --git a/test/test_cnfvar.py b/test/test_cnfvar.py deleted file mode 100755 index f733700..0000000 --- a/test/test_cnfvar.py +++ /dev/null @@ -1,476 +0,0 @@ -#!/usr/bin/env python -# This Python file uses the following encoding: utf-8 - -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. -# -# Copyright (c) 2016-2018 Intra2net AG - -import os -import unittest - -from src import cnfvar - -# -# test data -# - -# model cnf tree -demo_cnfvar = {"cnf": [ - { - "varname": "MY_FAVORITE_CNF_VAR", - "instance": 1337, - "number": 42, - "data": "string conf content", - "comment": "" - }, - { - "varname": "SOME_NESTED_CNF_VAR", - "instance": 0, - "number": 23, - "data": "999", - "comment": "", - "children": [ - { - "varname": "SOME_CHILD_VAR", - "instance": 1, - "number": 24, - "data": "2014", - "parent": 23, - "comment": "" - } - ] - }, -]} - -# duplicate line number -demo_invalid_cnfvar = {"cnf": [ - { - "varname": "SOME_PARTICULARLY_TASTY_CNF_VAR", - "instance": 1337, - "number": 23, - "data": "classic wingers", - "comment": "" - }, - { - "varname": "EXTRAORDINARILY_FANCY_CNF_VAR", - "instance": 1, - "number": 42, - "data": "ab mentions", - "comment": "" - }, - { - "varname": "ANOTHER_POPULAR_CNF_VAR", - "instance": 0, - "number": 42, - "data": "notches", - "comment": "" - } -]} - -demo_jsoncnf = """ -{ - "cnf" : [ - { - "children" : [ - { - "comment" : "", - "data" : "1", - "instance" : 0, - "number" : 2, - "parent" : 1, - "varname" : "FIREWALL_SERVICEGROUP_PREDEFINED_ID" - }, - { - "children" : [ - { - "comment" : "", - "data" : "", - "instance" : 0, - "number" : 4, - "parent" : 3, - "varname" : "FIREWALL_SERVICEGROUP_TYPE_COMMENT" - }, - { - "comment" : "", - "data" : "80", - "instance" : 0, - "number" : 5, - "parent" : 3, - "varname" : "FIREWALL_SERVICEGROUP_TYPE_TCPUDP_DST_PORT" - } - ], - "comment" : "", - "data" : "TCP", - "instance" : 0, - "number" : 3, - "parent" : 1, - "varname" : "FIREWALL_SERVICEGROUP_TYPE" - } - ], - "comment" : "", - "data" : "http", - "instance" : 1, - "number" : 1, - "varname" : "FIREWALL_SERVICEGROUP" - }, - { - "children" : [ - { - "comment" : "", - "data" : "2", - "instance" : 0, - "number" : 7, - "parent" : 6, - "varname" : "FIREWALL_SERVICEGROUP_PREDEFINED_ID" - }, - { - "children" : [ - { - "comment" : "", - "data" : "", - "instance" : 0, - "number" : 9, - "parent" : 8, - "varname" : "FIREWALL_SERVICEGROUP_TYPE_COMMENT" - }, - { - "comment" : "", - "data" : "443", - "instance" : 0, - "number" : 10, - "parent" : 8, - "varname" : "FIREWALL_SERVICEGROUP_TYPE_TCPUDP_DST_PORT" - } - ], - "comment" : "", - "data" : "TCP", - "instance" : 0, - "number" : 8, - "parent" : 6, - "varname" : "FIREWALL_SERVICEGROUP_TYPE" - } - ], - "comment" : "", - "data" : "https", - "instance" : 2, - "number" : 6, - "varname" : "FIREWALL_SERVICEGROUP" - } - ] -} -""" - -demo_jsoncnf_bytes = demo_jsoncnf.encode ("latin-1") - -demo_nonascii = r""" -{ "cnf" : [ - { - "children" : [ - { - "comment" : "", - "data" : "0", - "instance" : 0, - "number" : 2, - "parent" : 1, - "varname" : "USER_DISABLED" - }, - { - "comment" : "", - "data" : "Administrator", - "instance" : 0, - "number" : 3, - "parent" : 1, - "varname" : "USER_FULLNAME" - }, - { - "comment" : "", - "data" : "INBOX/Kalender", - "instance" : 0, - "number" : 4, - "parent" : 1, - "varname" : "USER_GROUPWARE_FOLDER_CALENDAR" - }, - { - "comment" : "", - "data" : "INBOX/Kontakte", - "instance" : 0, - "number" : 5, - "parent" : 1, - "varname" : "USER_GROUPWARE_FOLDER_CONTACTS" - }, - { - "comment" : "", - "data" : "INBOX/Entwürfe", - "instance" : 0, - "number" : 6, - "parent" : 1, - "varname" : "USER_GROUPWARE_FOLDER_DRAFTS" - }, - { - "comment" : "", - "data" : "INBOX/Notizen", - "instance" : 0, - "number" : 7, - "parent" : 1, - "varname" : "USER_GROUPWARE_FOLDER_NOTES" - }, - { - "comment" : "", - "data" : "INBOX/Gesendete Elemente", - "instance" : 0, - "number" : 8, - "parent" : 1, - "varname" : "USER_GROUPWARE_FOLDER_OUTBOX" - }, - { - "comment" : "", - "data" : "INBOX/Aufgaben", - "instance" : 0, - "number" : 9, - "parent" : 1, - "varname" : "USER_GROUPWARE_FOLDER_TASKS" - }, - - { - "comment" : "", - "data" : "INBOX/Gelöschte Elemente", - "instance" : 0, - "number" : 10, - "parent" : 1, - "varname" : "USER_GROUPWARE_FOLDER_TRASH" - }, - { - "comment" : "", - "data" : "1", - "instance" : 0, - "number" : 11, - "parent" : 1, - "varname" : "USER_GROUP_MEMBER_REF" - }, - { - "comment" : "", - "data" : "2", - "instance" : 1, - "number" : 12, - "parent" : 1, - "varname" : "USER_GROUP_MEMBER_REF" - }, - { - "comment" : "", - "data" : "", - "instance" : 0, - "number" : 13, - "parent" : 1, - "varname" : "USER_LOCALE" - }, - { - "comment" : "", - "data" : "idkfa", - "instance" : 0, - "number" : 14, - "parent" : 1, - "varname" : "USER_PASSWORD" - }, - { - "comment" : "", - "data" : "30", - "instance" : 0, - "number" : 15, - "parent" : 1, - "varname" : "USER_TRASH_DELETEDAYS" - } - ], - "comment" : "", - "data" : "admin", - "instance" : 1, - "number" : 1, - "varname" : "USER" - } -]} -""" - -demo_latin1crap = demo_nonascii.encode('latin1') - -demo_cnf_group = """ -1 GROUP,1: "Administratoren" -2 (1) GROUP_ACCESS_GO_ONLINE_ALLOWED,0: "1" -3 (1) GROUP_EMAILFILTER_BAN_FILTERLIST_REF,0: "-1" -4 (1) GROUP_EMAIL_RELAY_RIGHTS,0: "RELAY_FROM_INTRANET" -5 (1) GROUP_PROXY_PROFILE_REF,0: "1" -""" - -demo_cnf_group_bytes = demo_cnf_group.encode ("latin-1") - -demo_cnf_filter = b""" -1 EMAILFILTER_BAN_FILTERLIST,1: "Vordefiniert: Alles verboten" -2 (1) EMAILFILTER_BAN_FILTERLIST_ENCRYPTED,0: "BLOCK" -3 (1) EMAILFILTER_BAN_FILTERLIST_EXTENSIONS,0: "" -4 (1) EMAILFILTER_BAN_FILTERLIST_MIMETYPES,0: "" -5 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,0: "text/plain" -6 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,1: "text/html" -7 (1) EMAILFILTER_BAN_FILTERLIST_MODE,0: "ALLOW" -8 (1) EMAILFILTER_BAN_FILTERLIST_PREDEFINED_ID,0: "1" -""" - -demo_cnf_comments = b""" -1 EMAILFILTER_BAN_FILTERLIST,1: "Vordefiniert: Alles verboten" -2 (1) EMAILFILTER_BAN_FILTERLIST_ENCRYPTED,0: "BLOCK" -3 (1) EMAILFILTER_BAN_FILTERLIST_EXTENSIONS,0: "" -4 (1) EMAILFILTER_BAN_FILTERLIST_MIMETYPES,0: "" # foo -5 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,0: "text/plain"#bar -6 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,1: "text/html" -7 (1) EMAILFILTER_BAN_FILTERLIST_MODE,0: "ALLOW" # baz -8 (1) EMAILFILTER_BAN_FILTERLIST_PREDEFINED_ID,0: "1" -""" - -demo_cnf_escaped_quotes = """ -1 HERE_BE_QUOTES,0: "\"" -2 HERE_BE_QUOTES,1: "foo\"bar\"\"\"baz" -3 HERE_BE_QUOTES,2: "unquo\\\"table" -4 HERE_BE_QUOTES,3: "unquo\\\\\"\"table" -""" - -demo_json_escaped_quotes = """ -{ "cnf": [ { "number" : 1, - "varname" : "HERE_BE_QUOTES", - "instance" : 0, - "data" : "\\"" }, - { "number" : 2, - "varname" : "HERE_BE_QUOTES", - "instance" : 1, - "data" : "foo\\"bar\\"\\"\\"baz" }, - { "number" : 3, - "varname" : "HERE_BE_QUOTES", - "instance" : 2, - "data" : "unquo\\\\\\"table" }, - { "number" : 4, - "varname" : "HERE_BE_QUOTES", - "instance" : 3, - "data" : "unquo\\\\\\\\\\"\\"table" } ] } -""" - -# -# test class -# - -class CnfVarUnittest(unittest.TestCase): - - def test_print_cnf(self): - with open(os.devnull, "w") as devnull: - cnfvar.print_cnf(demo_cnfvar, out=devnull) - - def test_parse_cnf_simple_str(self): - cnf = cnfvar.read_cnf(demo_cnf_group) - with open(os.devnull, "w") as devnull: - cnfvar.print_cnf_json(cnf, out=devnull) - - def test_parse_cnf_simple_bytes(self): - cnf = cnfvar.read_cnf(demo_cnf_group_bytes) - with open(os.devnull, "w") as devnull: - cnfvar.print_cnf_json(cnf, out=devnull) - - def test_parse_cnf_nested(self): - cnf = cnfvar.read_cnf(demo_cnf_filter) - with open(os.devnull, "w") as devnull: - cnfvar.print_cnf_json(cnf, out=devnull) - - def test_parse_cnf_comments(self): - cnf = cnfvar.read_cnf(demo_cnf_comments) - with open(os.devnull, "w") as devnull: - cnfvar.print_cnf_json(cnf, out=devnull) - - def test_print_cnf_garbage(self): - try: - with open(os.devnull, "w") as devnull: - cnfvar.print_cnf(demo_invalid_cnfvar, out=devnull) - except cnfvar.InvalidCNF: - print ("Caught the duplicate line, bravo!") - - def test_read_json_str(self): - cnf = cnfvar.read_cnf_json(demo_jsoncnf) - with open(os.devnull, "w") as devnull: - cnfvar.print_cnf(cnf, out=devnull) - - def test_read_json_bytes(self): - cnf = cnfvar.read_cnf_json(demo_jsoncnf_bytes) - with open(os.devnull, "w") as devnull: - cnfvar.print_cnf(cnf, out=devnull) - - def test_read_json_nonascii(self): - cnf = cnfvar.read_cnf_json(demo_nonascii) - with open(os.devnull, "wb") as devnull: - cnfvar.print_cnf(cnf, out=devnull) - - def test_read_json_latin1(self): - cnf = cnfvar.read_cnf_json(demo_latin1crap) - with open(os.devnull, "wb") as devnull: - cnfvar.print_cnf(cnf, out=devnull) - - def test_parse_cnf_quotes(self): - cnf = cnfvar.read_cnf(demo_cnf_escaped_quotes) - with open(os.devnull, "w") as devnull: - cnfvar.print_cnf_json(cnf, out=devnull) - - def test_parse_json_quotes(self): - cnf = cnfvar.read_cnf_json(demo_json_escaped_quotes) - with open(os.devnull, "w") as devnull: - cnfvar.print_cnf_json(cnf, out=devnull) - - -class CnfVarUnittestVarnameCase(unittest.TestCase): - """Tests for verifying that uppercasing/lowercasing of varname works.""" - # TODO: rethink whether this lower-casing is worth all the effort it causes - - def test_dump_cnf_uppercase(self): - """Test dump of cnfvars results in uppercase var names.""" - cnf = {"cnf": [ - {"instance": 0, "varname": "dialout_mode", - "data": "ONLINE", "number": 1, "comment": None}, - {"instance": 0, "varname": "dialout_defaultprovider_ref", - "data": "1", "number": 2, "comment": None}, - {"instance": 0, "varname": "hypothetical_parent", - "data": "parent value", "number": 3, "comment": None, - "children": [ - {"instance": 0, "varname": "hypothetical_child", - "data": "0", "number": 4, "parent": 3, "comment": None}, - {"instance": 1, "varname": "hypothetical_child", - "data": "1", "number": 5, "parent": 3, "comment": None}]} - ]} - serialization = cnfvar.dump_json_string(cnf) - self.assertIn('DIALOUT_MODE', serialization) - self.assertIn('DIALOUT_DEFAULTPROVIDER_REF', serialization) - self.assertIn('HYPOTHETICAL_CHILD', serialization) - self.assertNotIn('dialout_mode', serialization) - self.assertNotIn('dialout_defaultprovider_ref', serialization) - self.assertNotIn('hypothetical_child', serialization) - - def test_read_cnf_lowercase(self): - """Test that after reading, varnames are lowercase.""" - cnf = cnfvar.read_cnf_json(demo_jsoncnf.encode('latin1')) - for parentvar in cnf['cnf']: - self.assertEqual(parentvar['varname'], - parentvar['varname'].lower()) - if 'children' in parentvar: - for childvar in parentvar['children']: - self.assertEqual(parentvar['varname'], - parentvar['varname'].lower()) - - -if __name__ == '__main__': - unittest.main() diff --git a/test/test_cnfvar_old.py b/test/test_cnfvar_old.py new file mode 100755 index 0000000..8efe0a3 --- /dev/null +++ b/test/test_cnfvar_old.py @@ -0,0 +1,476 @@ +#!/usr/bin/env python +# This Python file uses the following encoding: utf-8 + +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. +# +# Copyright (c) 2016-2018 Intra2net AG + +import os +import unittest + +from src import cnfvar_old + +# +# test data +# + +# model cnf tree +demo_cnfvar = {"cnf": [ + { + "varname": "MY_FAVORITE_CNF_VAR", + "instance": 1337, + "number": 42, + "data": "string conf content", + "comment": "" + }, + { + "varname": "SOME_NESTED_CNF_VAR", + "instance": 0, + "number": 23, + "data": "999", + "comment": "", + "children": [ + { + "varname": "SOME_CHILD_VAR", + "instance": 1, + "number": 24, + "data": "2014", + "parent": 23, + "comment": "" + } + ] + }, +]} + +# duplicate line number +demo_invalid_cnfvar = {"cnf": [ + { + "varname": "SOME_PARTICULARLY_TASTY_CNF_VAR", + "instance": 1337, + "number": 23, + "data": "classic wingers", + "comment": "" + }, + { + "varname": "EXTRAORDINARILY_FANCY_CNF_VAR", + "instance": 1, + "number": 42, + "data": "ab mentions", + "comment": "" + }, + { + "varname": "ANOTHER_POPULAR_CNF_VAR", + "instance": 0, + "number": 42, + "data": "notches", + "comment": "" + } +]} + +demo_jsoncnf = """ +{ + "cnf" : [ + { + "children" : [ + { + "comment" : "", + "data" : "1", + "instance" : 0, + "number" : 2, + "parent" : 1, + "varname" : "FIREWALL_SERVICEGROUP_PREDEFINED_ID" + }, + { + "children" : [ + { + "comment" : "", + "data" : "", + "instance" : 0, + "number" : 4, + "parent" : 3, + "varname" : "FIREWALL_SERVICEGROUP_TYPE_COMMENT" + }, + { + "comment" : "", + "data" : "80", + "instance" : 0, + "number" : 5, + "parent" : 3, + "varname" : "FIREWALL_SERVICEGROUP_TYPE_TCPUDP_DST_PORT" + } + ], + "comment" : "", + "data" : "TCP", + "instance" : 0, + "number" : 3, + "parent" : 1, + "varname" : "FIREWALL_SERVICEGROUP_TYPE" + } + ], + "comment" : "", + "data" : "http", + "instance" : 1, + "number" : 1, + "varname" : "FIREWALL_SERVICEGROUP" + }, + { + "children" : [ + { + "comment" : "", + "data" : "2", + "instance" : 0, + "number" : 7, + "parent" : 6, + "varname" : "FIREWALL_SERVICEGROUP_PREDEFINED_ID" + }, + { + "children" : [ + { + "comment" : "", + "data" : "", + "instance" : 0, + "number" : 9, + "parent" : 8, + "varname" : "FIREWALL_SERVICEGROUP_TYPE_COMMENT" + }, + { + "comment" : "", + "data" : "443", + "instance" : 0, + "number" : 10, + "parent" : 8, + "varname" : "FIREWALL_SERVICEGROUP_TYPE_TCPUDP_DST_PORT" + } + ], + "comment" : "", + "data" : "TCP", + "instance" : 0, + "number" : 8, + "parent" : 6, + "varname" : "FIREWALL_SERVICEGROUP_TYPE" + } + ], + "comment" : "", + "data" : "https", + "instance" : 2, + "number" : 6, + "varname" : "FIREWALL_SERVICEGROUP" + } + ] +} +""" + +demo_jsoncnf_bytes = demo_jsoncnf.encode ("latin-1") + +demo_nonascii = r""" +{ "cnf" : [ + { + "children" : [ + { + "comment" : "", + "data" : "0", + "instance" : 0, + "number" : 2, + "parent" : 1, + "varname" : "USER_DISABLED" + }, + { + "comment" : "", + "data" : "Administrator", + "instance" : 0, + "number" : 3, + "parent" : 1, + "varname" : "USER_FULLNAME" + }, + { + "comment" : "", + "data" : "INBOX/Kalender", + "instance" : 0, + "number" : 4, + "parent" : 1, + "varname" : "USER_GROUPWARE_FOLDER_CALENDAR" + }, + { + "comment" : "", + "data" : "INBOX/Kontakte", + "instance" : 0, + "number" : 5, + "parent" : 1, + "varname" : "USER_GROUPWARE_FOLDER_CONTACTS" + }, + { + "comment" : "", + "data" : "INBOX/Entwürfe", + "instance" : 0, + "number" : 6, + "parent" : 1, + "varname" : "USER_GROUPWARE_FOLDER_DRAFTS" + }, + { + "comment" : "", + "data" : "INBOX/Notizen", + "instance" : 0, + "number" : 7, + "parent" : 1, + "varname" : "USER_GROUPWARE_FOLDER_NOTES" + }, + { + "comment" : "", + "data" : "INBOX/Gesendete Elemente", + "instance" : 0, + "number" : 8, + "parent" : 1, + "varname" : "USER_GROUPWARE_FOLDER_OUTBOX" + }, + { + "comment" : "", + "data" : "INBOX/Aufgaben", + "instance" : 0, + "number" : 9, + "parent" : 1, + "varname" : "USER_GROUPWARE_FOLDER_TASKS" + }, + + { + "comment" : "", + "data" : "INBOX/Gelöschte Elemente", + "instance" : 0, + "number" : 10, + "parent" : 1, + "varname" : "USER_GROUPWARE_FOLDER_TRASH" + }, + { + "comment" : "", + "data" : "1", + "instance" : 0, + "number" : 11, + "parent" : 1, + "varname" : "USER_GROUP_MEMBER_REF" + }, + { + "comment" : "", + "data" : "2", + "instance" : 1, + "number" : 12, + "parent" : 1, + "varname" : "USER_GROUP_MEMBER_REF" + }, + { + "comment" : "", + "data" : "", + "instance" : 0, + "number" : 13, + "parent" : 1, + "varname" : "USER_LOCALE" + }, + { + "comment" : "", + "data" : "idkfa", + "instance" : 0, + "number" : 14, + "parent" : 1, + "varname" : "USER_PASSWORD" + }, + { + "comment" : "", + "data" : "30", + "instance" : 0, + "number" : 15, + "parent" : 1, + "varname" : "USER_TRASH_DELETEDAYS" + } + ], + "comment" : "", + "data" : "admin", + "instance" : 1, + "number" : 1, + "varname" : "USER" + } +]} +""" + +demo_latin1crap = demo_nonascii.encode('latin1') + +demo_cnf_group = """ +1 GROUP,1: "Administratoren" +2 (1) GROUP_ACCESS_GO_ONLINE_ALLOWED,0: "1" +3 (1) GROUP_EMAILFILTER_BAN_FILTERLIST_REF,0: "-1" +4 (1) GROUP_EMAIL_RELAY_RIGHTS,0: "RELAY_FROM_INTRANET" +5 (1) GROUP_PROXY_PROFILE_REF,0: "1" +""" + +demo_cnf_group_bytes = demo_cnf_group.encode ("latin-1") + +demo_cnf_filter = b""" +1 EMAILFILTER_BAN_FILTERLIST,1: "Vordefiniert: Alles verboten" +2 (1) EMAILFILTER_BAN_FILTERLIST_ENCRYPTED,0: "BLOCK" +3 (1) EMAILFILTER_BAN_FILTERLIST_EXTENSIONS,0: "" +4 (1) EMAILFILTER_BAN_FILTERLIST_MIMETYPES,0: "" +5 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,0: "text/plain" +6 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,1: "text/html" +7 (1) EMAILFILTER_BAN_FILTERLIST_MODE,0: "ALLOW" +8 (1) EMAILFILTER_BAN_FILTERLIST_PREDEFINED_ID,0: "1" +""" + +demo_cnf_comments = b""" +1 EMAILFILTER_BAN_FILTERLIST,1: "Vordefiniert: Alles verboten" +2 (1) EMAILFILTER_BAN_FILTERLIST_ENCRYPTED,0: "BLOCK" +3 (1) EMAILFILTER_BAN_FILTERLIST_EXTENSIONS,0: "" +4 (1) EMAILFILTER_BAN_FILTERLIST_MIMETYPES,0: "" # foo +5 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,0: "text/plain"#bar +6 (4) EMAILFILTER_BAN_FILTERLIST_MIMETYPES_NAME,1: "text/html" +7 (1) EMAILFILTER_BAN_FILTERLIST_MODE,0: "ALLOW" # baz +8 (1) EMAILFILTER_BAN_FILTERLIST_PREDEFINED_ID,0: "1" +""" + +demo_cnf_escaped_quotes = """ +1 HERE_BE_QUOTES,0: "\"" +2 HERE_BE_QUOTES,1: "foo\"bar\"\"\"baz" +3 HERE_BE_QUOTES,2: "unquo\\\"table" +4 HERE_BE_QUOTES,3: "unquo\\\\\"\"table" +""" + +demo_json_escaped_quotes = """ +{ "cnf": [ { "number" : 1, + "varname" : "HERE_BE_QUOTES", + "instance" : 0, + "data" : "\\"" }, + { "number" : 2, + "varname" : "HERE_BE_QUOTES", + "instance" : 1, + "data" : "foo\\"bar\\"\\"\\"baz" }, + { "number" : 3, + "varname" : "HERE_BE_QUOTES", + "instance" : 2, + "data" : "unquo\\\\\\"table" }, + { "number" : 4, + "varname" : "HERE_BE_QUOTES", + "instance" : 3, + "data" : "unquo\\\\\\\\\\"\\"table" } ] } +""" + +# +# test class +# + +class CnfVarUnittest(unittest.TestCase): + + def test_print_cnf(self): + with open(os.devnull, "w") as devnull: + cnfvar_old.print_cnf(demo_cnfvar, out=devnull) + + def test_parse_cnf_simple_str(self): + cnf = cnfvar_old.read_cnf(demo_cnf_group) + with open(os.devnull, "w") as devnull: + cnfvar_old.print_cnf_json(cnf, out=devnull) + + def test_parse_cnf_simple_bytes(self): + cnf = cnfvar_old.read_cnf(demo_cnf_group_bytes) + with open(os.devnull, "w") as devnull: + cnfvar_old.print_cnf_json(cnf, out=devnull) + + def test_parse_cnf_nested(self): + cnf = cnfvar_old.read_cnf(demo_cnf_filter) + with open(os.devnull, "w") as devnull: + cnfvar_old.print_cnf_json(cnf, out=devnull) + + def test_parse_cnf_comments(self): + cnf = cnfvar_old.read_cnf(demo_cnf_comments) + with open(os.devnull, "w") as devnull: + cnfvar_old.print_cnf_json(cnf, out=devnull) + + def test_print_cnf_garbage(self): + try: + with open(os.devnull, "w") as devnull: + cnfvar_old.print_cnf(demo_invalid_cnfvar, out=devnull) + except cnfvar_old.InvalidCNF: + print ("Caught the duplicate line, bravo!") + + def test_read_json_str(self): + cnf = cnfvar_old.read_cnf_json(demo_jsoncnf) + with open(os.devnull, "w") as devnull: + cnfvar_old.print_cnf(cnf, out=devnull) + + def test_read_json_bytes(self): + cnf = cnfvar_old.read_cnf_json(demo_jsoncnf_bytes) + with open(os.devnull, "w") as devnull: + cnfvar_old.print_cnf(cnf, out=devnull) + + def test_read_json_nonascii(self): + cnf = cnfvar_old.read_cnf_json(demo_nonascii) + with open(os.devnull, "wb") as devnull: + cnfvar_old.print_cnf(cnf, out=devnull) + + def test_read_json_latin1(self): + cnf = cnfvar_old.read_cnf_json(demo_latin1crap) + with open(os.devnull, "wb") as devnull: + cnfvar_old.print_cnf(cnf, out=devnull) + + def test_parse_cnf_quotes(self): + cnf = cnfvar_old.read_cnf(demo_cnf_escaped_quotes) + with open(os.devnull, "w") as devnull: + cnfvar_old.print_cnf_json(cnf, out=devnull) + + def test_parse_json_quotes(self): + cnf = cnfvar_old.read_cnf_json(demo_json_escaped_quotes) + with open(os.devnull, "w") as devnull: + cnfvar_old.print_cnf_json(cnf, out=devnull) + + +class CnfVarUnittestVarnameCase(unittest.TestCase): + """Tests for verifying that uppercasing/lowercasing of varname works.""" + # TODO: rethink whether this lower-casing is worth all the effort it causes + + def test_dump_cnf_uppercase(self): + """Test dump of cnfvars results in uppercase var names.""" + cnf = {"cnf": [ + {"instance": 0, "varname": "dialout_mode", + "data": "ONLINE", "number": 1, "comment": None}, + {"instance": 0, "varname": "dialout_defaultprovider_ref", + "data": "1", "number": 2, "comment": None}, + {"instance": 0, "varname": "hypothetical_parent", + "data": "parent value", "number": 3, "comment": None, + "children": [ + {"instance": 0, "varname": "hypothetical_child", + "data": "0", "number": 4, "parent": 3, "comment": None}, + {"instance": 1, "varname": "hypothetical_child", + "data": "1", "number": 5, "parent": 3, "comment": None}]} + ]} + serialization = cnfvar_old.dump_json_string(cnf) + self.assertIn('DIALOUT_MODE', serialization) + self.assertIn('DIALOUT_DEFAULTPROVIDER_REF', serialization) + self.assertIn('HYPOTHETICAL_CHILD', serialization) + self.assertNotIn('dialout_mode', serialization) + self.assertNotIn('dialout_defaultprovider_ref', serialization) + self.assertNotIn('hypothetical_child', serialization) + + def test_read_cnf_lowercase(self): + """Test that after reading, varnames are lowercase.""" + cnf = cnfvar_old.read_cnf_json(demo_jsoncnf.encode('latin1')) + for parentvar in cnf['cnf']: + self.assertEqual(parentvar['varname'], + parentvar['varname'].lower()) + if 'children' in parentvar: + for childvar in parentvar['children']: + self.assertEqual(parentvar['varname'], + parentvar['varname'].lower()) + + +if __name__ == '__main__': + unittest.main() -- 1.7.1