Restore support for zero instance filtering in binary cnf store queries
[pyi2ncommon] / src / cnfvar / binary.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com>
20
21 """
22 binary: wrappers around binaries that work with the CNF store.
23
24 Featuring:
25     - CnfBinary: stateless class that can be used to invoke the different binaries
26       in a Python-friendly way.
27
28 .. note:: It is written as a class on purpose for it to be easily extended to
29           support invoking non-local binaries, but methods are all class methods since
30           the class is stateless.
31
32 .. seealso:: Overview Diagram linked to from doc main page
33
34 .. codeauthor:: Intra2net
35 """
36
37 import subprocess
38 import logging
39 import shlex
40 import html
41 import re
42
43 log = logging.getLogger("pyi2ncommon.cnfvar.binary")
44
45 #: default get_cnf binary
46 BIN_GET_CNF = "/usr/intranator/bin/get_cnf"
47 #: default set_cnf binary
48 BIN_SET_CNF = "/usr/intranator/bin/set_cnf"
49 #: encoding used by the get_cnf and set_cnf binaries
50 ENCODING = "latin1"
51
52
53 class CnfBinary:
54     """Provide wrappers around the multiple binaries to handle the CNF store."""
55
56     @classmethod
57     def run_cmd(cls, cmd, cmd_input=None, ignore_errors=False, timeout=60, encoding=ENCODING):
58         """
59         Run commands on the local machine with input via stdin.
60
61         :param cmd: command to run
62         :type cmd: str or list
63         :param str cmd_input: input to give to the command
64         :param bool ignore_errors: whether to ignore the exit code or raise if not zero
65         :param int timeout: amount of seconds to wait for the command to complete
66         :param str encoding: encoding to use (pass latin1 when not working with JSON)
67         :returns: command result
68         :rtype: :py:class:`subprocess.CompletedProcess`
69         """
70         if isinstance(cmd, str):
71             cmd = shlex.split(cmd)
72
73         log.debug("Running binary cmd `%s` with input:\n%s",
74                   ' '.join(cmd), cmd_input)
75         retval = subprocess.run(cmd, input=cmd_input, check=not ignore_errors,
76                                 capture_output=True, encoding=encoding, timeout=timeout)
77         log.debug("Command exited with \n"
78                   "\treturncode=%d\n"
79                   "\tstderr=%s\n"
80                   "\tstdout=%s", retval.returncode, retval.stderr, retval.stdout)
81         return retval
82
83     @classmethod
84     def get_cnf(cls, name=None, instance=None, no_children=False,
85                 as_json=False):
86         """
87         Wrapper around the `get_cnf` binary.
88
89         :param str name: optional name of the CNFs to get
90         :param instance: CNF instance
91         :type instance: str or int
92         :param bool no_children: whether to return child CNFs
93         :param bool as_json: whether to return a JSON-formatted string
94         :returns: output of the tool
95         :rtype: str
96
97         .. note:: being a wrapper, this function does not do anything extra
98                   like checking if arnied is running or waiting for generate.
99         """
100         if name is None and instance is not None:
101             raise ValueError("cannot pass `instance` without a `name`")
102
103         if isinstance(instance, str):
104             instance = int(instance)  # validate
105         elif instance is not None and not isinstance(instance, int):
106             raise TypeError(f"`instance` is of wrong type {type(instance)}")
107
108         # make sure 0 instance cnfvars like e.g. NICs can also be filtered by instance
109         cmd = f"{BIN_GET_CNF} {name or ''} {instance if instance is not None else ''}"
110
111         encoding = ENCODING
112         if no_children:
113             cmd += " --no-childs"
114         if as_json:
115             cmd += " --json"
116             encoding = "utf8"
117
118         # TODO: should error handling be improved so error messages
119         # from the binary are converted into specific exceptions types?
120         output = cls.run_cmd(cmd, encoding=encoding).stdout.strip()
121         # remove escape chars (such as '//')
122         output = output.replace('\\"', '"')
123         return output
124
125     @classmethod
126     def set_cnf(cls, input_str=None, input_file=None, delete=False,
127                 delete_file=False, as_json=False, fix_problems=False, **kwargs):
128         """
129         Wrapper around the `set_cnf` binary.
130
131         :param str input_str: string to send as input to the binary
132         :param str input_file: path to a file to pass to the binary
133         :param bool delete: whether to delete the corresponding CNFs
134         :param bool delete_file: whether to delete the file passed as input
135         :param bool as_json: whether to interpret input as JSON
136         :param bool fix_problems: whether to automatically fix errors in the vars
137         :param kwargs: extra arguments to pass to the binary - underscores are
138                        replaced by dash, e.g. set_cnf(..., enable_queue=True)
139                        becomes `/usr/intranator/bin/set_cnf --enable-queue`
140         :raises: :py:class:`SetCnfException` in case the binary errors out
141
142         .. note:: being a wrapper, this function does not do anything extra
143                   like checking if arnied is running or waiting for generate.
144         """
145         if input_str is None and input_file is None:
146             raise ValueError("Need to pass either a string or a file")
147
148         if delete_file is True and input_file is None:
149             raise ValueError("Cannot delete an unspecified file")
150
151         cmd = f"{BIN_SET_CNF}"
152         encoding = ENCODING
153         if as_json:
154             cmd += " --json"
155             encoding = "utf8"
156         if delete:
157             cmd += " -x"
158         if delete_file:
159             cmd += " --delete-file"
160         if fix_problems:
161             cmd += " --fix-problems"
162
163         for k, v in kwargs.items():
164             if v is True:
165                 k = "-".join(k.split("_"))
166                 cmd += f" --{k}"
167
168         if input_file:
169             cmd += f" {input_file}"
170
171         try:
172             cls.run_cmd(cmd, cmd_input=input_str, encoding=encoding)
173         except subprocess.CalledProcessError as ex:
174             # clean up the error message
175             ex.stderr = cls._clean_set_cnf_error(ex.stderr)
176             raise
177
178     @classmethod
179     def _clean_set_cnf_error(cls, err):
180         """
181         Turn the error from the set_cnf binary into a more readable message.
182
183         :param str err: error message from the binary
184         :returns: the clean error message
185         :rtype: str
186
187         Keep only offending lines and strip out HTML tags.
188         """
189         def get_error_lines(output_lines):
190             for cnfline in output_lines:
191                 buffer = ""
192                 quoted = False
193                 parts = []
194                 for c in cnfline:
195                     last_chr = buffer[-1] if buffer else None
196                     # this is a literal (unescaped) quote
197                     if c == "\"" and last_chr != "\\":
198                         quoted = not quoted
199
200                     if c == " " and not quoted:
201                         parts.append(buffer)
202                         buffer = ""
203                     else:
204                         buffer += c
205                 parts.append(buffer)
206
207                 if parts[-2] == "0":
208                     # no errors, ignore
209                     continue
210
211                 lineno, name, instance, parent, value, exit_code, error = parts
212
213                 # the binary outputs HTML strings
214                 error = html.unescape(error)
215                 # replace line breaks for readability
216                 error = re.sub(r"(<br\s*>)+", " ", error)
217                 # clean up HTML tags
218                 error = re.sub(r"<[^<]+?>", "", error)
219
220                 if parent == "-1":
221                     message = f"`` {lineno} {name},{instance}: {value} ``"
222                 else:
223                     message = f"`` {lineno}   ({parent}) {name},{instance}: {value} ``"
224                 yield f"Error in {message}: {error} (code={exit_code})"
225
226         lines = err.splitlines()
227         if len(lines) == 0:
228             return err
229
230         if "fatal errors in changes" not in lines[0]:
231             return err
232
233         errors = list(get_error_lines(lines[1:]))
234         return "\n".join(errors)