Add a new cnfvar2 API
[pyi2ncommon] / src / cnfvar / binary.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com>
20
21 """
22 binary: wrappers around binaries that work with the CNF store.
23
24 Featuring
25 - CnfBinary: stateless class that can be used to invoke the different binaries
26 in a Python-friendly way.
27
28 .. note:: It is written as a class on purpose for it to be easily extended to
29 support invoking non-local binaries, but methods are all class methods since
30 the class is stateless.
31
32 .. codeauthor:: Intra2net
33 """
34
35 import subprocess
36 import logging
37 import shlex
38 import html
39 import re
40
41 log = logging.getLogger("pyi2ncommon.cnfvar.store")
42
43 #: default get_cnf binary
44 BIN_GET_CNF = "/usr/intranator/bin/get_cnf"
45 #: default set_cnf binary
46 BIN_SET_CNF = "/usr/intranator/bin/set_cnf"
47 #: encoding used by the get_cnf and set_cnf binaries
48 ENCODING = "latin1"
49
50
51 class CnfBinary(object):
52     """Provide wrappers around the multiple binaries to handle the CNF store."""
53
54     @classmethod
55     def run_cmd(cls, cmd, cmd_input=None, ignore_errors=False, timeout=60, encoding=ENCODING):
56         """
57         Run commands on the local machine with input via stdin.
58
59         :param cmd: command to run
60         :type cmd: str or list
61         :param str cmd_input: input to give to the command
62         :param bool ignore_errors: whether to ignore the exit code or raise if not zero
63         :param int timeout: amount of seconds to wait for the command to complete
64         :param str encoding: encoding to use (pass latin1 when not working with JSON)
65         :returns: command result
66         :rtype: :py:class:`subprocess.CompletedProcess`
67         """
68         if isinstance(cmd, str):
69             cmd = shlex.split(cmd)
70
71         log.debug("Running binary cmd `%s` with input:\n%s",
72                   ' '.join(cmd), cmd_input)
73         retval = subprocess.run(cmd, input=cmd_input, check=not ignore_errors,
74                                 capture_output=True, encoding=encoding, timeout=timeout)
75         log.debug("Command exited with \n"
76                   "\treturncode=%d\n"
77                   "\tstderr=%s\n"
78                   "\tstdout=%s", retval.returncode, retval.stderr, retval.stdout)
79         return retval
80
81     @classmethod
82     def get_cnf(cls, name=None, instance=None, no_children=False,
83                 as_json=False):
84         """
85         Wrapper around the `get_cnf` binary.
86
87         :param str name: optional name of the CNFs to get
88         :param instance: CNF instance
89         :type instance: str or int
90         :param bool no_children: whether to return child CNFs
91         :param bool as_json: whether to return a JSON-formatted string
92         :returns: output of the tool
93         :rtype: str
94
95         .. note:: being a wrapper, this function does not do anything extra
96         like checking if arnied is running or waiting for generate.
97         """
98         if name is None and instance is not None:
99             raise ValueError("cannot pass `instance` without a `name`")
100
101         if isinstance(instance, str):
102             instance = int(instance)  # validate
103         elif instance is not None and not isinstance(instance, int):
104             raise TypeError(f"`instance` is of wrong type {type(instance)}")
105
106         cmd = f"{BIN_GET_CNF} {name or ''} {instance or ''}"
107
108         encoding = ENCODING
109         if no_children:
110             cmd += " --no-childs"
111         if as_json:
112             cmd += " --json"
113             encoding = "utf8"
114
115         # TODO: should error handling be improved so error messages
116         # from the binary are converted into specific exceptions types?
117         output = cls.run_cmd(cmd, encoding=encoding).stdout.strip()
118         # remove escape chars (such as '//')
119         output = output.replace('\\"', '"')
120         return output
121
122     @classmethod
123     def set_cnf(cls, input_str=None, input_file=None, delete=False,
124                 delete_file=False, as_json=False, fix_problems=False, **kwargs):
125         """
126         Wrapper around the `set_cnf` binary.
127
128         :param str input_str: string to send as input to the binary
129         :param str input_file: path to a file to pass to the binary
130         :param bool delete: whether to delete the corresponding CNFs
131         :param bool delete_file: whether to delete the file passed as input
132         :param bool as_json: whether to interpret input as JSON
133         :param bool fix_problems: whether to automatically fix errors in the vars
134         :param kwargs: extra arguments to pass to the binary - underscores are
135                        replaced by dash, e.g. set_cnf(..., enable_queue=True)
136                        becomes `/usr/intranator/bin/set_cnf --enable-queue`
137         :raises: :py:class:`SetCnfException` in case the binary errors out
138
139         .. note:: being a wrapper, this function does not do anything extra
140         like checking if arnied is running or waiting for generate.
141         """
142         if input_str is None and input_file is None:
143             raise ValueError("Need to pass either a string or a file")
144
145         if delete_file is True and input_file is None:
146             raise ValueError("Cannot delete an unspecified file")
147
148         cmd = f"{BIN_SET_CNF}"
149         encoding = ENCODING
150         if as_json:
151             cmd += " --json"
152             encoding = "utf8"
153         if delete:
154             cmd += " -x"
155         if delete_file:
156             cmd += " --delete-file"
157         if fix_problems:
158             cmd += " --fix-problems"
159
160         for k, v in kwargs.items():
161             if v is True:
162                 k = "-".join(k.split("_"))
163                 cmd += f" --{k}"
164
165         if input_file:
166             cmd += f" {input_file}"
167
168         try:
169             cls.run_cmd(cmd, cmd_input=input_str, encoding=encoding)
170         except subprocess.CalledProcessError as ex:
171             # clean up the error message
172             ex.stderr = cls._clean_set_cnf_error(ex.stderr)
173             raise
174
175     @classmethod
176     def _clean_set_cnf_error(cls, err):
177         """
178         Turn the error from the set_cnf binary into a more readable message.
179
180         :param str err: error message from the binary
181         :returns: the clean error message
182         :rtype: str
183
184         Keep only offending lines and strip out HTML tags.
185         """
186         def get_error_lines(output_lines):
187             for cnfline in output_lines:
188                 buffer = ""
189                 quoted = False
190                 parts = []
191                 for c in cnfline:
192                     last_chr = buffer[-1] if buffer else None
193                     # this is a literal (unescaped) quote
194                     if c == "\"" and last_chr != "\\":
195                         quoted = not quoted
196
197                     if c == " " and not quoted:
198                         parts.append(buffer)
199                         buffer = ""
200                     else:
201                         buffer += c
202                 parts.append(buffer)
203
204                 if parts[-2] == "0":
205                     # no errors, ignore
206                     continue
207
208                 lineno, name, instance, parent, value, exit_code, error = parts
209
210                 # the binary outputs HTML strings
211                 error = html.unescape(error)
212                 # replace line breaks for readability
213                 error = re.sub(r"(<br\s*>)+", " ", error)
214                 # clean up HTML tags
215                 error = re.sub(r"<[^<]+?>", "", error)
216
217                 if parent == "-1":
218                     message = f"`` {lineno} {name},{instance}: {value} ``"
219                 else:
220                     message = f"`` {lineno}   ({parent}) {name},{instance}: {value} ``"
221                 yield f"Error in {message}: {error} (code={exit_code})"
222
223         lines = err.splitlines()
224         if len(lines) == 0:
225             return err
226
227         if "fatal errors in changes" not in lines[0]:
228             return err
229
230         errors = list(get_error_lines(lines[1:]))
231         return "\n".join(errors)