Convert the old cnfvar module into a cnfvar string parsing module
[pyi2ncommon] / src / cnfvar / string.py
diff --git a/src/cnfvar/string.py b/src/cnfvar/string.py
new file mode 100644 (file)
index 0000000..0aec596
--- /dev/null
@@ -0,0 +1,733 @@
+#!/usr/bin/env python
+#
+# The software in this package is distributed under the GNU General
+# Public License version 2 (with a special exception described below).
+#
+# A copy of GNU General Public License (GPL) is included in this distribution,
+# in the file COPYING.GPL.
+#
+# As a special exception, if other files instantiate templates or use macros
+# or inline functions from this file, or you compile this file and link it
+# with other works to produce a work based on this file, this file
+# does not by itself cause the resulting work to be covered
+# by the GNU General Public License.
+#
+# However the source code for this file must still be made available
+# in accordance with section (3) of the GNU General Public License.
+#
+# This exception does not invalidate any other reasons why a work based
+# on this file might be covered by the GNU General Public License.
+#
+# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
+
+"""
+string: functionality for parsing cnfvars from strings
+
+.. codeauthor:: Intra2net
+
+
+contents
+-------------------------------------------------------------------------------
+
+This module provides read and parse functionality for the Intra2net *CNF*
+format from strings and by extension cnf files.
+
+The input string takes one round-trip through the parsers and will error out on
+problematic lines. Thus, this module can also be used to syntax-check CNF data.
+
+Note that line numbers may be arbitrarily reassigned in the process. Of course,
+parent references and the relative ordering of lines will be preserved in this
+case.
+
+.. todo::
+    Decide on some facility for automatic fixup of line number values. The
+    internal representation is recursive so line numbers are not needed to
+    establish a variable hierarchy. They might as well be omitted from the json
+    input and only added when writing cnf. For the time being a lack of the
+    "number" field is interpreted as an error. Though it should at least
+    optionally be possible to omit the numbers entirely and have a function add
+    them after the fact. This might as well be added to :py:func:`is_cnf`
+    though it would be counterintuitive to have a predicate mutate its
+    argument. So maybe it could return a second argument to indicate a valid
+    structure that needs fixup or something like that.
+
+.. note::
+    The variable values of get_cnf seems to be encoded in latin1, and set_cnf
+    seems to assume latin1-encoded values (not var names). Function
+    :py:func:`read_cnf` converts this to unicode and other functions convert
+    unicode back to latin1.
+
+
+notes on Python 3 conversion
+-------------------------------------------------------------------------------
+
+Since the original *CNF* format assumes latin-1 encoded data pretty much
+exclusively, we preserve the original encoding while parsing the file.
+When assembling the data structures returned to the user, values are then
+converted to strings so they can be used naturally at the Python end.
+
+implementation
+-------------------------------------------------------------------------------
+"""
+
+import functools
+import sys
+import json
+import re
+import io
+
+
+###############################################################################
+# CONSTANTS
+###############################################################################
+
+
+CNF_FIELD_MANDATORY = set ([ "varname", "data", "instance" ])
+CNF_FIELD_OPTIONAL  = set ([ "parent", "children", "comment", "number" ])
+CNF_FIELD_KNOWN     = CNF_FIELD_MANDATORY | CNF_FIELD_OPTIONAL
+
+grab_parent_pattern = re.compile(b"""
+                                    ^            # match from start
+                                    \s*          # optional spaces
+                                    \d+          # line number
+                                    \s+          # spaces
+                                    \((\d+)\)    # parent
+                                 """,
+                                 re.VERBOSE)
+
+base_line_pattern = re.compile(b"""
+                                    ^                    # match from start
+                                    \s*                  # optional spaces
+                                    (\d+)                # line number
+                                    \s+                  # spaces
+                                    ([A-Z][A-Z0-9_]*)    # varname
+                                    \s*                  # optional spaces
+                                    ,                    # delimiter
+                                    \s*                  # optional spaces
+                                    (-1|\d+)             # instance
+                                    \s*                  # optional spaces
+                                    :                    # delimiter
+                                    \s*                  # optional spaces
+                                    \"(                  # quoted string (data)
+                                       (?:  \\\"         #  (of escaped dquote
+                                           |[^\"])*      #   or anything not a
+                                      )\"                #   literal quote)
+                                    \s*                  # optional spaces
+                                    (                    # bgroup
+                                     \#                  # comment leader
+                                     \s*                 # optional spaces
+                                     .*                  # string (comment)
+                                    )?                   # egroup, optional
+                                    $                    # eol
+                               """,
+                               re.VERBOSE)
+
+child_line_pattern = re.compile(b"""
+                                     ^                    # match from start
+                                     \s*                  # optional spaces
+                                     (\d+)                # line number
+                                     \s+                  # spaces
+                                     \((\d+)\)            # parent
+                                     \s+                  # spaces
+                                     ([A-Z][A-Z0-9_]*)    # varname
+                                     \s*                  # optional spaces
+                                     ,                    # delimiter
+                                     \s*                  # optional spaces
+                                     (-1|\d+)             # instance
+                                     \s*                  # optional spaces
+                                     :                    # delimiter
+                                     \s*                  # optional spaces
+                                     \"([^\"]*)\"         # quoted string (data)
+                                     \s*                  # optional spaces
+                                     (                    # bgroup
+                                      \#                  # comment leader
+                                      \s*                 # optional spaces
+                                      .*                  # string (comment)
+                                     )?                   # egroup, optional
+                                     $                    # eol
+                                """,
+                                re.VERBOSE)
+
+
+###############################################################################
+# HELPERS
+###############################################################################
+
+
+#
+# Sadly, the Intranator is still stuck with one leg in the 90s.
+#
+def to_latin1(s):
+    """Take given unicode str and convert it to a latin1-encoded `bytes`."""
+    return s.encode("latin-1")
+
+
+def from_latin1(s):
+    """Take given latin1-encoded `bytes` value and convert it to `str`."""
+    return s.decode("latin-1")
+
+
+#
+# Conversion functions
+#
+
+def marshal_in_number(number):
+    return int(number)
+
+
+def marshal_in_parent(parent):
+    return int(parent)
+
+
+def marshal_in_instance(instance):
+    return int(instance)
+
+
+def marshal_in_varname(varname):
+    return from_latin1(varname).lower()
+
+
+def marshal_in_data(data):
+    return from_latin1(data) if data is not None else ""
+
+
+def marshal_in_comment(comment):
+    return comment and from_latin1(comment[1:].strip()) or None
+
+
+#
+# Type checking
+#
+
+def is_string(s):
+    return isinstance(s, str)
+
+
+###############################################################################
+# EXCEPTIONS
+###############################################################################
+
+
+class InvalidCNF(Exception):
+
+    def __init__(self, msg):
+        self.msg = msg
+
+    def __str__(self):
+        return "Malformed CNF_VAR: \"%s\"" % self.msg
+
+
+class MalformedCNF(Exception):
+
+    def __init__(self, msg):
+        self.msg = msg
+
+    def __str__(self):
+        return "Malformed CNF file: \"%s\"" % self.msg
+
+
+###############################################################################
+# VALIDATION
+###############################################################################
+
+
+def is_valid(acc,
+             nested,
+             comment,
+             data,
+             instance,
+             number,
+             parent,
+             varname):
+    if varname is None:
+        raise InvalidCNF("CNF_VAR lacks a name.")
+    elif not is_string(varname):
+        raise InvalidCNF("Varname field of CNF_VAR \"%s\" is not a string."
+                         % varname)
+    elif varname == "":
+        raise InvalidCNF("Varname field of CNF_VAR is the empty string.")
+
+    if comment is not None:
+        if not is_string(comment):
+            raise InvalidCNF("Comment field of CNF_VAR \"%s\" is not a string."
+                             % varname)
+
+    if data is None:
+        raise InvalidCNF("Data field of CNF_VAR \"%s\" is empty."
+                         % varname)
+    elif not is_string(data):
+        raise InvalidCNF("Data field of CNF_VAR \"%s\" is not a string."
+                         % varname)
+
+    if instance is None:
+        raise InvalidCNF("Instance field of CNF_VAR \"%s\" is empty."
+                         % varname)
+    elif not isinstance(instance, int):
+        raise InvalidCNF("Instance field of CNF_VAR \"%s\" is not an integer."
+                         % varname)
+
+    if number is None:
+        raise InvalidCNF("Number field of CNF_VAR \"%s\" is empty."
+                         % varname)
+    elif not isinstance(number, int):
+        raise InvalidCNF("Number field of CNF_VAR \"%s\" is not an integer."
+                         % varname)
+    elif number < 1:
+        raise InvalidCNF("Number field of CNF_VAR \"%s\" must be positive, not %d."
+                         % (varname, number))
+    else:
+        other = acc.get(number, None)
+        if other is not None:  # already in use
+            raise InvalidCNF("Number field of CNF_VAR \"%s\" already used by variable %s."
+                             % (varname, other))
+        acc[number] = varname
+
+    if nested is True:
+        if parent is None:
+            raise InvalidCNF("Parent field of nested CNF_VAR \"%s\" is empty."
+                             % varname)
+        elif not isinstance(parent, int):
+            raise InvalidCNF("Parent field of CNF_VAR \"%s\" is not an integer."
+                             % varname)
+    else:
+        if parent is not None:
+            raise InvalidCNF("Flat CNF_VAR \"%s\" has nonsensical parent field \"%s\"."
+                             % (varname, parent))
+    return acc
+
+
+def is_cnf(root):
+    """
+    is_cnf -- Predicate testing "CNF_VAR-ness". Folds the :py:func:`is_valid`
+    predicate over the argument which can be either a well-formed CNF
+    dictionary or a list of CNF_VARs.
+
+    :type root: cnfvar or cnf list
+    :rtype: bool
+
+    Not that if it returns at all, ``is_cnf()`` returns ``True``. Any non
+    well-formed member of the argument will cause the predicate to bail out
+    with an exception during traversal.
+    """
+    cnf = cnf_root(root)
+    if cnf is None:
+        raise InvalidCNF(root)
+    return walk_cnf(cnf, False, is_valid, {}) is not None
+
+
+def is_cnf_var(obj):
+    """
+    Check whether a dictionary is a valid CNF.
+
+    :param dict obj: dictionary to check
+    :returns: True if the dictionary has all the mandatory fields and no
+              unknown fields, False otherwise
+    :rtype: bool
+    """
+    assert isinstance (obj, dict)
+
+    for f in CNF_FIELD_MANDATORY:
+        if obj.get(f, None) is None:
+            return False
+
+    for f in obj:
+        if f not in CNF_FIELD_KNOWN:
+            return False
+
+    return True
+
+
+###############################################################################
+# DESERIALIZATION
+###############################################################################
+#
+# CNF reader
+#
+# Parsing usually starts from the `read_cnf`, which accepts a string containing
+# the variables to parse in the same structure as returned by `get_cnf`.
+#
+# In the `prepare` function the string is split into lines, and a 3-element
+# tuple is built. The first (named `current`) and second (named `next`)
+# elements of this tuple are respectively the first and second non-empty lines
+# of the input, while the third is a list of the remaining lines. This tuple is
+# named `state` in the implementation below, and it is passed around during
+# parsing. The `get` and `peek` functions are used to easily retrieve the
+# `current` and `next` items from the "state".
+#
+# When we "advance" the state, we actually drop the "current" element,
+# replacing it with the "next", while a new "next" is popped from the list of
+# remaining lines. Parsing is done this way because we need to look ahead at
+# the next line -- if it is a child it needs to be appended to the `children`
+# property of the current line.
+#
+# Regular expressions are used to extract important information from the CNF
+# lines. Finally, once parsing is completed, a dictionary is returned. The dict
+# has the same structure as the serialized JSON output returned by
+# `get_cnf -j`.
+#
+
+
+def read_cnf(data):
+    """
+    Read cnf data from data bytes.
+
+    :param data: raw data
+    :type data: str or bytes
+    :return: the parsed cnf data
+    :rtype: {str, {str, str or int}}
+    """
+    if isinstance(data, str):
+        data = to_latin1(data)
+    state = prepare(data)
+    if state is None:
+        raise InvalidCNF("Empty input string.")
+
+    cnf = parse_cnf_root(state)
+    if is_cnf(cnf) is False:
+        raise TypeError("Invalid CNF_VAR.")
+    return {"cnf": cnf}
+
+
+def prepare(raw):
+    """
+    Build 3-element iterable from a CNF string dump.
+
+    :param raw: string content as returned by `get_cnf`
+    :type raw: bytes
+    :returns: 3-element tuple, where the first two elements are the first two
+              lines of the output and the third is a list containing the rest
+              of the lines in reverse.
+    :rtype: (str * str option * str list) option
+    """
+    lines = raw.splitlines()
+    lines.reverse()
+    try:
+        first = lines.pop()
+    except IndexError:
+        return None
+
+    try:
+        second = lines.pop()
+    except IndexError:
+        second = None
+
+    first = first.strip()
+    if first == b"":
+        return advance((first, second, lines))
+
+    return (first, second, lines)
+
+
+def advance(cns):
+    """
+    Pop the next line from the stream, advancing the tuple.
+
+    :param cns: a 3-element tuple containing two CNF lines and a list of the
+                remaining lines
+    :type cnd: (str, str, [str])
+    :returns: a new tuple with a new item popped from the list of lines
+    :rtype cnd: (str, str, [str])
+    """
+    current, next, stream = cns
+    if next is None:  # reached end of stream
+        return None
+    current = next
+
+    try:
+        next = stream.pop()
+        next = next.strip()
+    except IndexError:
+        next = None
+
+    if current == "":  # skip blank lines
+        return advance((current, next, stream))
+    return (current, next, stream)
+
+
+def get(cns):
+    """
+    Get the current line from the state without advancing it.
+
+    :param cns: a 3-element tuple containing two CNF lines and a list of the
+                remaining lines
+    :type cnd: (str, str, [str])
+    :returns: the CNF line stored as `current`
+    :rtype: str
+    """
+    current, _, _ = cns
+    return current
+
+
+def parse_cnf_root(state):
+    """
+    Iterate over and parse a list of CNF lines.
+
+    :param state: a 3-element tuple containing two lines and a list of the
+                  remaining lines
+    :type state: (str, str, [str])
+    :returns: a list of parsed CNF variables
+    :rtype: [dict]
+
+    The function will parse the first element from the `state` tuple, then read
+    the next line to see if it is a child variable. If it is, it will be
+    appended to the last parsed CNF, otherwise top-level parsing is done
+    normally.
+    """
+    lines = []
+    current = get(state)
+    while state:
+        cnf_line = read_base_line(current)
+        if cnf_line is not None:
+            lines.append(cnf_line)
+            state = advance(state)
+            if state is None:  # -> nothing left to do
+                break
+            current = get(state)
+            parent = get_parent(current)  # peek at next line
+            if parent is not None:  # -> recurse into children
+                (state, children, _parent) = parse_cnf_children(state, parent)
+                cnf_line["children"] = children
+                if state is None:
+                    break
+                current = get(state)
+        else:
+            state = advance(state)
+            if state is None:
+                break
+            current = get(state)
+    return lines
+
+
+def parse_cnf_children(state, parent):
+    """
+    Read and parse child CNFs of a given parent until there is none left.
+
+    :param state: a 3-element tuple containing two lines and a list of the
+                  remaining lines
+    :type state: (str, str, [str])
+    :param parent: id of the parent whose children we are looking for
+    :type parent: int
+    :returns: a 3-element tuple with the current state, a list of children of
+              the given parent and the parent ID
+    :rtype: (tuple, [str], int)
+
+    The function will recursively parse child lines from the `state` tuple
+    until one of these conditions is satisfied:
+
+    1. the input is exhausted
+    2. the next CNF line
+        2.1. is a toplevel line
+        2.2. is a child line whose parent has a lower parent number
+
+    Conceptually, 2.1 is a very similar to 2.2 but due to the special status of
+    toplevel lines in CNF we need to handle them separately.
+
+    Note that since nesting of CNF vars is achieved via parent line numbers,
+    lines with different parents could appear out of order. libcnffile will
+    happily parse those and still assign children to the specified parent:
+
+    ::
+        # set_cnf <<THATSALL
+        1 USER,1337: "l33t_h4x0r"
+        2    (1) USER_GROUP_MEMBER_REF,0: "2"
+        4 USER,1701: "picard"
+        5    (4) USER_GROUP_MEMBER_REF,0: "2"
+        6    (4) USER_PASSWORD,0: "engage"
+        3    (1) USER_PASSWORD,0: "hacktheplanet"
+        THATSALL
+        # get_cnf user 1337
+        1 USER,1337: "l33t_h4x0r"
+        2    (1) USER_GROUP_MEMBER_REF,0: "2"
+        3    (1) USER_PASSWORD,0: "hacktheplanet"
+        # get_cnf user 1701
+        1 USER,1701: "picard"
+        2    (1) USER_GROUP_MEMBER_REF,0: "2"
+        3    (1) USER_PASSWORD,0: "engage"
+
+    It is a limitation of ``cnfvar.py`` that it cannot parse CNF data
+    structured like the above example: child lists are only populated from
+    subsequent CNF vars using the parent number solely to track nesting levels.
+    The parser does not keep track of line numbers while traversing the input
+    so it doesn’t support retroactively assigning a child to anything else but
+    the immediate parent.
+    """
+    lines = []
+    current = get(state)
+    while True:
+        cnf_line = read_child_line(current)
+        if cnf_line is not None:
+            lines.append(cnf_line)
+            state = advance(state)
+            if state is None:
+                break
+            current = get(state)
+            new_parent = get_parent(current)
+            if new_parent is None:
+                # drop stack
+                return (state, lines, None)
+            if new_parent > parent:
+                # parent is further down in hierarchy -> new level
+                (state, children, new_parent) = \
+                    parse_cnf_children (state, new_parent)
+                if state is None:
+                    break
+                cnf_line["children"] = children
+                current = get(state)
+                new_parent = get_parent(current)
+                if new_parent is None:
+                    # drop stack
+                    return (state, lines, None)
+            if new_parent < parent:
+                # parent is further up in hierarchy -> pop level
+                return (state, lines, new_parent)
+            # new_parent == parent -> continue parsing on same level
+    return (state, lines, parent)
+
+
+def get_parent(line):
+    """
+    Extract the ID of the parent for a given CNF line.
+
+    :param str line: CNF line
+    :returns: parent ID or None if no parent is found
+    :rtype: int or None
+    """
+    match = re.match(grab_parent_pattern, line)
+    if match is None:  # -> no parent
+        return None
+    return int(match.groups()[0])
+
+
+def read_base_line(line):
+    """
+    Turn one top-level CNF line into a dictionary.
+
+    :param str line: CNF line
+    :rtype: {str: Any}
+
+    This performs the necessary decoding on values to obtain proper Python
+    strings from 8-bit encoded CNF data.
+
+    The function only operates on individual lines. Argument strings that
+    contain data for multiple lines – this includes child lines of the current
+    CNF var! – will trigger a parsing exception.
+    """
+    if len(line.strip()) == 0:
+        return None  # ignore empty lines
+    if line[0] == b"#":
+        return None  # ignore comments
+
+    match = re.match(base_line_pattern, line)
+    if match is None:
+        raise MalformedCNF("Syntax error in line \"\"\"%s\"\"\"" % line)
+    number, varname, instance, data, comment = match.groups()
+    return {
+        "number"   : marshal_in_number   (number),
+        "varname"  : marshal_in_varname  (varname),
+        "instance" : marshal_in_instance (instance),
+        "data"     : marshal_in_data     (data),
+        "comment"  : marshal_in_comment  (comment),
+    }
+
+
+def read_child_line(line):
+    """
+    Turn one child CNF line into a dictionary.
+
+    :param str line: CNF line
+    :rtype: {str: Any}
+
+    This function only operates on individual lines. If the argument string is
+    syntactically valid but contains input representing multiple CNF vars, a
+    parse error will be thrown.
+    """
+    if len(line.strip()) == 0:
+        return None  # ignore empty lines
+    if line[0] == "#":
+        return None  # ignore comments
+
+    match = re.match(child_line_pattern, line)
+    if match is None:
+        raise MalformedCNF("Syntax error in child line \"\"\"%s\"\"\""
+                           % from_latin1 (line))
+    number, parent, varname, instance, data, comment = match.groups()
+    return {
+        "number"   : marshal_in_number   (number),
+        "parent"   : marshal_in_parent   (parent),
+        "varname"  : marshal_in_varname  (varname),
+        "instance" : marshal_in_instance (instance),
+        "data"     : marshal_in_data     (data),
+        "comment"  : marshal_in_comment  (comment),
+    }
+
+
+###############################################################################
+# SERIALIZATION
+###############################################################################
+
+
+def cnf_root(root):
+    """
+    Extract a list of CNFs from a given structure.
+
+    :param root: list of CNFs or a CNF dictionary
+    :type root: [dict] or dict
+    :raises: :py:class:`TypeError` if no CNFs can be extracted
+    :returns: list with one or more CNF objects
+    :rtype: [dict]
+
+    Output varies depending on a few conditions:
+    - If `root` is a list, return it right away
+    - If `root` is a dict corresponding to a valid CNF value, return it wrapped
+      in a list
+    - If `root` is a dict with a `cnf` key containg a list (as the JSON
+      returned by `get_cnf -j`), return the value
+    - Otherwise, raise an error
+    """
+    if isinstance(root, list):
+        return root
+    if not isinstance(root, dict):
+        raise TypeError(
+            "Expected dictionary of CNF_VARs, got %s." % type(root))
+    if is_cnf_var(root):
+        return [root]
+    cnf = root.get("cnf", None)
+    if not isinstance(cnf, list):
+        raise TypeError("Expected list of CNF_VARs, got %s." % type(cnf))
+    return cnf
+
+
+###############################################################################
+# TRAVERSAL
+###############################################################################
+
+
+def walk_cnf(cnf, nested, fun, acc):
+    """
+    Depth-first traversal of a CNF tree.
+
+    :type cnf: cnf list
+    :type nested: bool
+    :type fun: 'a -> bool -> (cnf stuff) -> 'a
+    :type acc: 'a
+    :rtype: 'a
+
+    Executes ``fun`` recursively for each node in the tree. The function
+    receives the accumulator ``acc`` which can be of an arbitrary type as first
+    argument. The second argument is a flag indicating whether the current
+    CNF var is a child (if ``True``) or a parent var. CNF member fields are
+    passed via named optional arguments.
+    """
+    for var in cnf:
+        acc = fun(acc,
+                  nested,
+                  comment=var.get("comment", None),
+                  data=var.get("data", None),
+                  instance=var.get("instance", None),
+                  number=var.get("number", None),
+                  parent=var.get("parent", None),
+                  varname=var.get("varname", None))
+        children = var.get("children", None)
+        if children is not None:
+            acc = walk_cnf(children, True, fun, acc)
+    return acc