1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
19 # Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com>
22 arnied_api: wrappers around the arnied varlink API.
25 - Arnied: stateless class with methods as exposed in the varlink API.
27 .. codeauthor:: Intra2net
30 from contextlib import contextmanager
31 from dataclasses import dataclass
32 from types import SimpleNamespace
33 from enum import Enum, auto
39 #: socket where the varlink interface is exposed
40 ARNIED_SOCKET = "/var/intranator/arnied-varlink.sock"
52 children: typing.List['CnfVar']
55 class ChangeCnfVar(object):
64 class GetCnfQuery(object):
66 instance: typing.Optional[int] = None
68 class ProgramStatus(Enum):
74 # Method return types (not present in the interface)
76 class IsQueueActiveRet(SimpleNamespace):
79 class IsScheduledOrRunningRet(SimpleNamespace):
81 timestamp: typing.Optional[int]
83 class GetCnfRet(SimpleNamespace):
84 vars: typing.List[CnfVar]
86 class SetCnfRet(SimpleNamespace):
87 change_numbers: typing.List[int]
89 class SetCommitCnf(SimpleNamespace):
90 results: typing.List[ChangeCnfVar]
92 class CommitCnfRet(SimpleNamespace):
93 results: typing.List[ChangeCnfVar]
99 class InternalBridgeError(Exception):
101 super().__init__("An error occurred (InternalBridgeError)")
103 class EmptyInputError(Exception):
105 super().__init__("An error occurred (EmptyInputError)")
107 class BadCharError(Exception):
108 def __init__(self, results: str) -> None:
109 super().__init__(f"[BadCharError] Error in the arnied API (results={results})")
110 self.results = results
112 class ChildError(Exception):
113 def __init__(self, results: str) -> None:
114 super().__init__(f"[ChildError] Error in the arnied API (results={results})")
115 self.results = results
117 class CnfCommitError(Exception):
118 def __init__(self, results: typing.List[ChangeCnfVar]) -> None:
122 # the type does not seem to be converted correctly
123 if isinstance(r, dict):
124 r = ChangeCnfVar(**r)
125 self.results.append(r)
126 msgs.append(f"{r.name},{r.instance}: \"{r.data}\": {r.result_msg}")
127 super().__init__("Error commiting cnfvars:\n" + "\n".join(msgs))
129 class NotFoundError(Exception):
130 def __init__(self, results: str) -> None:
131 super().__init__(f"[NotFoundError] Error in the arnied API (results={results})")
132 self.results = results
134 class SchedulerProgError(Exception):
136 super().__init__("An error occurred (SchedulerProgError)")
138 class ShowerrVarDataErr(Exception):
139 def __init__(self, msg_id: int) -> None:
140 super().__init__(f"[ShowerrVarDataErr] Error in the arnied API (msg_id={msg_id})")
143 class ShowerrVarMissing(Exception):
144 def __init__(self, params_count: int) -> None:
145 super().__init__(f"[ShowerrVarMissing] Error in the arnied API (params_count={params_count})")
146 self.params_count = params_count
148 class ProviderNotFound(Exception):
149 def __init__(self, provider_id: int) -> None:
150 super().__init__(f"[ProviderNotFound] Not found a provider with ID `{provider_id})`")
151 self.provider_id = provider_id
154 # Arnied varlink proxy
157 class Arnied(object):
158 VARLINK_ADDRESS = f"unix:{ARNIED_SOCKET}"
161 def _send_msg_wrapper(cls, send_msg):
163 Wrap the _send_message method of the client to always send parameters.
165 HACK: this is as hackish as it comes, but at the moment we do not
166 have another workaround. The underlying varlink package will only
167 send a dictionary with a "parameters" key when we pass parameters,
168 however methods with nullable arguments require this key, in which
169 case the API errors out. One example is the `GetCnf` method: if
170 we want to get *all* cnfvars we should not provide it with any args,
171 but since the varlink module will not pass a dictionary with a
172 "parameters" key, it will fail. This has been reported upstream:
173 https://github.com/varlink/python/issues/10#issuecomment-1067232162
176 data = json.loads(msg.decode("utf8"))
177 if not data.get("method", "").startswith("io.arnied"):
179 if data.get("parameters", {}):
181 data["parameters"] = {}
182 return send_msg(json.dumps(data).encode("utf8"))
187 def new_connection(cls):
188 # lazy import to reduce the scope of this dependency
190 client = varlink.Client.new_with_address(cls.VARLINK_ADDRESS)
191 conn = client.open("io.arnied", namespaced=True)
193 sm = conn._send_message
194 setattr(conn, "_send_message", Arnied._send_msg_wrapper(sm))
196 except varlink.VarlinkError as ex:
197 # not an exception from this module
198 if not ex.error().startswith("io.arnied"):
201 # raise the corresponding exception
202 error_class_name = ex.error().split(".")[-1]
203 self_mod = sys.modules[__name__]
204 exception_class = self_mod.__dict__[error_class_name]
205 raise exception_class(**ex.parameters())
211 def is_queue_active(cls) -> IsQueueActiveRet:
212 with cls.new_connection() as conn:
213 return conn.IsQueueActive()
216 def is_scheduled_or_running(cls, prog: str) -> IsScheduledOrRunningRet:
217 with cls.new_connection() as conn:
218 return conn.IsScheduledOrRunning(prog)
221 def noop(cls) -> None:
222 with cls.new_connection() as conn:
226 def no_timeout(cls) -> None:
227 with cls.new_connection() as conn:
228 return conn.NoTimeout()
231 def get_cnf(cls, query: typing.Optional[GetCnfQuery]) -> GetCnfRet:
232 with cls.new_connection() as conn:
233 return conn.GetCnf(query)
236 def set_cnf(cls, vars: typing.List[CnfVar]) -> SetCnfRet:
237 with cls.new_connection() as conn:
238 return conn.SetCnf(vars)
241 def set_commit_cnf(cls, vars: typing.List[CnfVar], username: typing.Optional[str], nogenerate: bool, fix_commit: bool) -> SetCommitCnf:
242 with cls.new_connection() as conn:
243 return conn.SetCommitCnf(vars, username, nogenerate, fix_commit)
246 def commit_cnf(cls, change_numbers: typing.List[int], username: typing.Optional[str], nogenerate: bool, fix_commit: bool) -> CommitCnfRet:
247 with cls.new_connection() as conn:
248 return conn.CommitCnf(change_numbers, username, nogenerate, fix_commit)
251 def showerr_add(cls, msg_id: int, params: typing.List[str]) -> None:
252 with cls.new_connection() as conn:
253 return conn.ShowerrAdd(msg_id, params)
256 def now_online(cls, provider: int) -> None:
257 with cls.new_connection() as conn:
258 return conn.NowOnline(provider)
261 def now_offline(cls) -> None:
262 with cls.new_connection() as conn:
263 return conn.NowOffline()
266 def now_dialing(cls, provider: int) -> None:
267 with cls.new_connection() as conn:
268 return conn.NowDialing(provider)
271 def dial_taking_ages(cls) -> None:
272 with cls.new_connection() as conn:
273 return conn.DialTakingAges()
276 def barrier_executed(cls, barrier_nr: int) -> None:
277 with cls.new_connection() as conn:
278 return conn.BarrierExecuted(barrier_nr)