Add a Python version of arnied varlink interface
[pyi2ncommon] / src / arnied_api.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com>
20
21 """
22 arnied_api: wrappers around the arnied varlink API.
23
24 Featuring
25 - Arnied: stateless class with methods as exposed in the varlink API.
26
27 .. codeauthor:: Intra2net
28 """
29
30 from contextlib import contextmanager
31 from dataclasses import dataclass
32 from types import SimpleNamespace
33 from enum import Enum, auto
34 import typing
35 import json
36 import sys
37
38
39 #: socket where the varlink interface is exposed
40 ARNIED_SOCKET = "/var/intranator/arnied-varlink.sock"
41
42
43 # Interface types
44
45
46 @dataclass
47 class CnfVar(object):
48     name: str
49     instance: int
50     data: str
51     deleted: bool
52     children: typing.List['CnfVar']
53
54 @dataclass
55 class ChangeCnfVar(object):
56     chg_nr: int
57     name: str
58     instance: int
59     data: str
60     result_type: int
61     result_msg: str
62
63 @dataclass
64 class GetCnfQuery(object):
65     name: str
66     instance: typing.Optional[int] = None
67
68 class ProgramStatus(Enum):
69     Running = auto()
70     Scheduled = auto()
71     NotScheduled = auto()
72
73
74 # Method return types (not present in the interface)
75
76 class IsQueueActiveRet(SimpleNamespace):
77     active: bool
78
79 class IsScheduledOrRunningRet(SimpleNamespace):
80     status: ProgramStatus
81     timestamp: typing.Optional[int]
82
83 class GetCnfRet(SimpleNamespace):
84     vars: typing.List[CnfVar]
85
86 class SetCnfRet(SimpleNamespace):
87     change_numbers: typing.List[int]
88
89 class SetCommitCnf(SimpleNamespace):
90     results: typing.List[ChangeCnfVar]
91
92 class CommitCnfRet(SimpleNamespace):
93     results: typing.List[ChangeCnfVar]
94
95
96 # Exceptions
97
98
99 class InternalBridgeError(Exception):
100     def __init__(self):
101         super().__init__("An error occurred (InternalBridgeError)")
102
103 class EmptyInputError(Exception):
104     def __init__(self):
105         super().__init__("An error occurred (EmptyInputError)")
106
107 class BadCharError(Exception):
108     def __init__(self, results: str) -> None:
109         super().__init__(f"[BadCharError] Error in the arnied API (results={results})")
110         self.results = results
111
112 class ChildError(Exception):
113     def __init__(self, results: str) -> None:
114         super().__init__(f"[ChildError] Error in the arnied API (results={results})")
115         self.results = results
116
117 class CnfCommitError(Exception):
118     def __init__(self, results: typing.List[ChangeCnfVar]) -> None:
119         self.results = []
120         msgs = []
121         for r in results:
122             # the type does not seem to be converted correctly
123             if isinstance(r, dict):
124                 r = ChangeCnfVar(**r)
125             self.results.append(r)
126             msgs.append(f"{r.name},{r.instance}: \"{r.data}\": {r.result_msg}")
127         super().__init__("Error commiting cnfvars:\n" + "\n".join(msgs))
128
129 class NotFoundError(Exception):
130     def __init__(self, results: str) -> None:
131         super().__init__(f"[NotFoundError] Error in the arnied API (results={results})")
132         self.results = results
133
134 class SchedulerProgError(Exception):
135     def __init__(self):
136         super().__init__("An error occurred (SchedulerProgError)")
137
138 class ShowerrVarDataErr(Exception):
139     def __init__(self, msg_id: int) -> None:
140         super().__init__(f"[ShowerrVarDataErr] Error in the arnied API (msg_id={msg_id})")
141         self.msg_id = msg_id
142
143 class ShowerrVarMissing(Exception):
144     def __init__(self, params_count: int) -> None:
145         super().__init__(f"[ShowerrVarMissing] Error in the arnied API (params_count={params_count})")
146         self.params_count = params_count
147
148 class ProviderNotFound(Exception):
149     def __init__(self, provider_id: int) -> None:
150         super().__init__(f"[ProviderNotFound] Not found a provider with ID `{provider_id})`")
151         self.provider_id = provider_id
152
153
154 # Arnied varlink proxy
155
156
157 class Arnied(object):
158     VARLINK_ADDRESS = f"unix:{ARNIED_SOCKET}"
159
160     @classmethod
161     def _send_msg_wrapper(cls, send_msg):
162         """
163         Wrap the _send_message method of the client to always send parameters.
164
165         HACK: this is as hackish as it comes, but at the moment we do not
166         have another workaround. The underlying varlink package will only
167         send a dictionary with a "parameters" key when we pass parameters,
168         however methods with nullable arguments require this key, in which
169         case the API errors out. One example is the `GetCnf` method: if
170         we want to get *all* cnfvars we should not provide it with any args,
171         but since the varlink module will not pass a dictionary with a
172         "parameters" key, it will fail. This has been reported upstream:
173         https://github.com/varlink/python/issues/10#issuecomment-1067232162
174         """
175         def _send_msg(msg):
176             data = json.loads(msg.decode("utf8"))
177             if not data.get("method", "").startswith("io.arnied"):
178                 return send_msg(msg)
179             if data.get("parameters", {}):
180                 return send_msg(msg)
181             data["parameters"] = {}
182             return send_msg(json.dumps(data).encode("utf8"))
183         return _send_msg
184
185     @classmethod
186     @contextmanager
187     def new_connection(cls):
188         # lazy import to reduce the scope of this dependency
189         import varlink
190         client = varlink.Client.new_with_address(cls.VARLINK_ADDRESS)
191         conn = client.open("io.arnied", namespaced=True)
192         try:
193             sm = conn._send_message
194             setattr(conn, "_send_message", Arnied._send_msg_wrapper(sm))
195             yield conn
196         except varlink.VarlinkError as ex:
197             # not an exception from this module
198             if not ex.error().startswith("io.arnied"):
199                 raise
200
201             # raise the corresponding exception
202             error_class_name = ex.error().split(".")[-1]
203             self_mod = sys.modules[__name__]
204             exception_class = self_mod.__dict__[error_class_name]
205             raise exception_class(**ex.parameters())
206         finally:
207             conn.close()
208             client.cleanup()
209
210     @classmethod
211     def is_queue_active(cls) -> IsQueueActiveRet:
212         with cls.new_connection() as conn:
213             return conn.IsQueueActive()
214
215     @classmethod
216     def is_scheduled_or_running(cls, prog: str) -> IsScheduledOrRunningRet:
217         with cls.new_connection() as conn:
218             return conn.IsScheduledOrRunning(prog)
219
220     @classmethod
221     def noop(cls) -> None:
222         with cls.new_connection() as conn:
223             return conn.NoOp()
224
225     @classmethod
226     def no_timeout(cls) -> None:
227         with cls.new_connection() as conn:
228             return conn.NoTimeout()
229
230     @classmethod
231     def get_cnf(cls, query: typing.Optional[GetCnfQuery]) -> GetCnfRet:
232         with cls.new_connection() as conn:
233             return conn.GetCnf(query)
234
235     @classmethod
236     def set_cnf(cls, vars: typing.List[CnfVar]) -> SetCnfRet:
237         with cls.new_connection() as conn:
238             return conn.SetCnf(vars)
239
240     @classmethod
241     def set_commit_cnf(cls, vars: typing.List[CnfVar], username: typing.Optional[str], nogenerate: bool, fix_commit: bool) -> SetCommitCnf:
242         with cls.new_connection() as conn:
243             return conn.SetCommitCnf(vars, username, nogenerate, fix_commit)
244
245     @classmethod
246     def commit_cnf(cls, change_numbers: typing.List[int], username: typing.Optional[str], nogenerate: bool, fix_commit: bool) -> CommitCnfRet:
247         with cls.new_connection() as conn:
248             return conn.CommitCnf(change_numbers, username, nogenerate, fix_commit)
249
250     @classmethod
251     def showerr_add(cls, msg_id: int, params: typing.List[str]) -> None:
252         with cls.new_connection() as conn:
253             return conn.ShowerrAdd(msg_id, params)
254
255     @classmethod
256     def now_online(cls, provider: int) -> None:
257         with cls.new_connection() as conn:
258             return conn.NowOnline(provider)
259
260     @classmethod
261     def now_offline(cls) -> None:
262         with cls.new_connection() as conn:
263             return conn.NowOffline()
264
265     @classmethod
266     def now_dialing(cls, provider: int) -> None:
267         with cls.new_connection() as conn:
268             return conn.NowDialing(provider)
269
270     @classmethod
271     def dial_taking_ages(cls) -> None:
272         with cls.new_connection() as conn:
273             return conn.DialTakingAges()
274
275     @classmethod
276     def barrier_executed(cls, barrier_nr: int) -> None:
277         with cls.new_connection() as conn:
278             return conn.BarrierExecuted(barrier_nr)