Review: a few fixes
[pyi2ncommon] / src / cnfvar / binary.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com>
20
21 """
22 binary: wrappers around binaries that work with the CNF store.
23
24 Featuring:
25     - CnfBinary: stateless class that can be used to invoke the different binaries
26       in a Python-friendly way.
27
28 .. note:: It is written as a class on purpose for it to be easily extended to
29           support invoking non-local binaries, but methods are all class methods since
30           the class is stateless.
31
32 .. seealso:: Overview Diagram linked to from doc main page
33
34 .. codeauthor:: Intra2net
35 """
36
37 import subprocess
38 import logging
39 import shlex
40 import html
41 import re
42
43 log = logging.getLogger("pyi2ncommon.cnfvar.binary")
44
45 #: default get_cnf binary
46 BIN_GET_CNF = "/usr/intranator/bin/get_cnf"
47 #: default set_cnf binary
48 BIN_SET_CNF = "/usr/intranator/bin/set_cnf"
49 #: encoding used by the get_cnf and set_cnf binaries
50 ENCODING = "latin1"
51
52
53 class CnfBinary:
54     """Provide wrappers around the multiple binaries to handle the CNF store."""
55
56     @classmethod
57     def run_cmd(cls, cmd, cmd_input=None, ignore_errors=False, timeout=60, encoding=ENCODING):
58         """
59         Run commands on the local machine with input via stdin.
60
61         :param cmd: command to run
62         :type cmd: str or list
63         :param str cmd_input: input to give to the command
64         :param bool ignore_errors: whether to ignore the exit code or raise if not zero
65         :param int timeout: amount of seconds to wait for the command to complete
66         :param str encoding: encoding to use (pass latin1 when not working with JSON)
67         :returns: command result
68         :rtype: :py:class:`subprocess.CompletedProcess`
69         """
70         if isinstance(cmd, str):
71             cmd = shlex.split(cmd)
72
73         log.debug("Running binary cmd `%s` with input:\n%s",
74                   ' '.join(cmd), cmd_input)
75         retval = subprocess.run(cmd, input=cmd_input, check=not ignore_errors,
76                                 capture_output=True, encoding=encoding, timeout=timeout)
77         log.debug("Command exited with \n"
78                   "\treturncode=%d\n"
79                   "\tstderr=%s\n"
80                   "\tstdout=%s", retval.returncode, retval.stderr, retval.stdout)
81         return retval
82
83     @classmethod
84     def get_cnf(cls, name=None, instance=None, no_children=False,
85                 as_json=False):
86         """
87         Wrapper around the `get_cnf` binary.
88
89         :param str name: optional name of the CNFs to get
90         :param instance: CNF instance
91         :type instance: str or int
92         :param bool no_children: whether to return child CNFs
93         :param bool as_json: whether to return a JSON-formatted string
94         :returns: output of the tool
95         :rtype: str
96
97         .. note:: being a wrapper, this function does not do anything extra
98                   like checking if arnied is running or waiting for generate.
99         """
100         if name is None and instance is not None:
101             raise ValueError("cannot pass `instance` without a `name`")
102
103         if isinstance(instance, str):
104             instance = int(instance)  # validate
105         elif instance is not None and not isinstance(instance, int):
106             raise TypeError(f"`instance` is of wrong type {type(instance)}")
107
108         cmd = f"{BIN_GET_CNF} {name or ''} {instance or ''}"
109
110         encoding = ENCODING
111         if no_children:
112             cmd += " --no-childs"
113         if as_json:
114             cmd += " --json"
115             encoding = "utf8"
116
117         # TODO: should error handling be improved so error messages
118         # from the binary are converted into specific exceptions types?
119         output = cls.run_cmd(cmd, encoding=encoding).stdout.strip()
120         # remove escape chars (such as '//')
121         output = output.replace('\\"', '"')
122         return output
123
124     @classmethod
125     def set_cnf(cls, input_str=None, input_file=None, delete=False,
126                 delete_file=False, as_json=False, fix_problems=False, **kwargs):
127         """
128         Wrapper around the `set_cnf` binary.
129
130         :param str input_str: string to send as input to the binary
131         :param str input_file: path to a file to pass to the binary
132         :param bool delete: whether to delete the corresponding CNFs
133         :param bool delete_file: whether to delete the file passed as input
134         :param bool as_json: whether to interpret input as JSON
135         :param bool fix_problems: whether to automatically fix errors in the vars
136         :param kwargs: extra arguments to pass to the binary - underscores are
137                        replaced by dash, e.g. set_cnf(..., enable_queue=True)
138                        becomes `/usr/intranator/bin/set_cnf --enable-queue`
139         :raises: :py:class:`SetCnfException` in case the binary errors out
140
141         .. note:: being a wrapper, this function does not do anything extra
142                   like checking if arnied is running or waiting for generate.
143         """
144         if input_str is None and input_file is None:
145             raise ValueError("Need to pass either a string or a file")
146
147         if delete_file is True and input_file is None:
148             raise ValueError("Cannot delete an unspecified file")
149
150         cmd = f"{BIN_SET_CNF}"
151         encoding = ENCODING
152         if as_json:
153             cmd += " --json"
154             encoding = "utf8"
155         if delete:
156             cmd += " -x"
157         if delete_file:
158             cmd += " --delete-file"
159         if fix_problems:
160             cmd += " --fix-problems"
161
162         for k, v in kwargs.items():
163             if v is True:
164                 k = "-".join(k.split("_"))
165                 cmd += f" --{k}"
166
167         if input_file:
168             cmd += f" {input_file}"
169
170         try:
171             cls.run_cmd(cmd, cmd_input=input_str, encoding=encoding)
172         except subprocess.CalledProcessError as ex:
173             # clean up the error message
174             ex.stderr = cls._clean_set_cnf_error(ex.stderr)
175             raise
176
177     @classmethod
178     def _clean_set_cnf_error(cls, err):
179         """
180         Turn the error from the set_cnf binary into a more readable message.
181
182         :param str err: error message from the binary
183         :returns: the clean error message
184         :rtype: str
185
186         Keep only offending lines and strip out HTML tags.
187         """
188         def get_error_lines(output_lines):
189             for cnfline in output_lines:
190                 buffer = ""
191                 quoted = False
192                 parts = []
193                 for c in cnfline:
194                     last_chr = buffer[-1] if buffer else None
195                     # this is a literal (unescaped) quote
196                     if c == "\"" and last_chr != "\\":
197                         quoted = not quoted
198
199                     if c == " " and not quoted:
200                         parts.append(buffer)
201                         buffer = ""
202                     else:
203                         buffer += c
204                 parts.append(buffer)
205
206                 if parts[-2] == "0":
207                     # no errors, ignore
208                     continue
209
210                 lineno, name, instance, parent, value, exit_code, error = parts
211
212                 # the binary outputs HTML strings
213                 error = html.unescape(error)
214                 # replace line breaks for readability
215                 error = re.sub(r"(<br\s*>)+", " ", error)
216                 # clean up HTML tags
217                 error = re.sub(r"<[^<]+?>", "", error)
218
219                 if parent == "-1":
220                     message = f"`` {lineno} {name},{instance}: {value} ``"
221                 else:
222                     message = f"`` {lineno}   ({parent}) {name},{instance}: {value} ``"
223                 yield f"Error in {message}: {error} (code={exit_code})"
224
225         lines = err.splitlines()
226         if len(lines) == 0:
227             return err
228
229         if "fatal errors in changes" not in lines[0]:
230             return err
231
232         errors = list(get_error_lines(lines[1:]))
233         return "\n".join(errors)