# The software in this package is distributed under the GNU General # Public License version 2 (with a special exception described below). # # A copy of GNU General Public License (GPL) is included in this distribution, # in the file COPYING.GPL. # # As a special exception, if other files instantiate templates or use macros # or inline functions from this file, or you compile this file and link it # with other works to produce a work based on this file, this file # does not by itself cause the resulting work to be covered # by the GNU General Public License. # # However the source code for this file must still be made available # in accordance with section (3) of the GNU General Public License. # # This exception does not invalidate any other reasons why a work based # on this file might be covered by the GNU General Public License. # # Copyright (c) 2016-2022 Intra2net AG """ Wrappers around the arnied varlink API. Featuring: - Arnied: stateless class with methods as exposed in the varlink API. For documentation of Exceptions, methods and their arguments, refer to arnied source code (arnied/client/arnieclient.hxx). For compatibility, argument names are the same here as they are there (defeating python naming rules). See package `cnfvar` for a higher-level interface to this functionality. .. codeauthor:: Intra2net """ from contextlib import contextmanager from dataclasses import dataclass from types import SimpleNamespace from enum import Enum, auto import typing import json import sys #: socket where the varlink interface is exposed ARNIED_SOCKET = "/var/intranator/arnied-varlink.sock" # Interface types @dataclass class CnfVar: name: str instance: int data: str deleted: bool children: typing.List['CnfVar'] @dataclass class ChangeCnfVar: chg_nr: int name: str instance: int data: str result_type: int result_msg: str @dataclass class GetCnfQuery: name: str instance: typing.Optional[int] = None class ProgramStatus(Enum): Running = auto() Scheduled = auto() NotScheduled = auto() # Method return types (not present in the interface) class IsQueueActiveRet(SimpleNamespace): active: bool class IsScheduledOrRunningRet(SimpleNamespace): status: ProgramStatus timestamp: typing.Optional[int] class GetCnfRet(SimpleNamespace): vars: typing.List[CnfVar] class SetCnfRet(SimpleNamespace): change_numbers: typing.List[int] class SetCommitCnf(SimpleNamespace): results: typing.List[ChangeCnfVar] class CommitCnfRet(SimpleNamespace): results: typing.List[ChangeCnfVar] # Exceptions class InternalBridgeError(Exception): def __init__(self): super().__init__("An error occurred (InternalBridgeError)") class EmptyInputError(Exception): def __init__(self): super().__init__("An error occurred (EmptyInputError)") class BadCharError(Exception): def __init__(self, results: str) -> None: super().__init__(f"[BadCharError] Error in the arnied API (results={results})") self.results = results class ChildError(Exception): def __init__(self, results: str) -> None: super().__init__(f"[ChildError] Error in the arnied API (results={results})") self.results = results class CnfCommitError(Exception): def __init__(self, results: typing.List[ChangeCnfVar]) -> None: self.results = [] msgs = [] for r in results: # the type does not seem to be converted correctly if isinstance(r, dict): r = ChangeCnfVar(**r) self.results.append(r) msgs.append(f"{r.name},{r.instance}: \"{r.data}\": {r.result_msg}") super().__init__("Error committing cnfvars:\n" + "\n".join(msgs)) class NotFoundError(Exception): def __init__(self, results: str) -> None: super().__init__(f"[NotFoundError] Error in the arnied API (results={results})") self.results = results class SchedulerProgError(Exception): def __init__(self): super().__init__("An error occurred (SchedulerProgError)") class ShowerrVarDataErr(Exception): def __init__(self, msg_id: int) -> None: super().__init__(f"[ShowerrVarDataErr] Error in the arnied API (msg_id={msg_id})") self.msg_id = msg_id class ShowerrVarMissing(Exception): def __init__(self, params_count: int) -> None: super().__init__( f"[ShowerrVarMissing] Error in the arnied API (params_count={params_count})") self.params_count = params_count class ProviderNotFound(Exception): def __init__(self, provider_id: int) -> None: super().__init__( f"[ProviderNotFound] Could not find provider (provider_id={provider_id})") self.provider_id = provider_id # Arnied varlink proxy class Arnied: """ Expose methods of the arnied varlink interface in python. As described in module doc, documentation of exceptions, methods, and their arguments can be found in arnied source code. """ VARLINK_ADDRESS = f"unix:{ARNIED_SOCKET}" @classmethod def _send_msg_wrapper(cls, send_msg): """ Wrap the _send_message method of the client to always send parameters. HACK: this is as hackish as it comes, but at the moment we do not have another workaround. The underlying varlink package will only send a dictionary with a "parameters" key when we pass parameters, however methods with nullable arguments require this key, in which case the API errors out. One example is the `GetCnf` method: if we want to get *all* cnfvars we should not provide it with any args, but since the varlink module will not pass a dictionary with a "parameters" key, it will fail. This has been reported upstream: https://github.com/varlink/python/issues/10#issuecomment-1067232162 """ def _send_msg(msg): data = json.loads(msg.decode("utf8")) if not data.get("method", "").startswith("io.arnied"): return send_msg(msg) if data.get("parameters", {}): return send_msg(msg) data["parameters"] = {} return send_msg(json.dumps(data).encode("utf8")) return _send_msg @classmethod @contextmanager def new_connection(cls): # lazy import to reduce the scope of this dependency import varlink client = varlink.Client.new_with_address(cls.VARLINK_ADDRESS) conn = client.open("io.arnied", namespaced=True) try: sm = conn._send_message setattr(conn, "_send_message", Arnied._send_msg_wrapper(sm)) yield conn except varlink.VarlinkError as ex: # not an exception from this module if not ex.error().startswith("io.arnied"): raise # raise the corresponding exception error_class_name = ex.error().split(".")[-1] self_mod = sys.modules[__name__] exception_class = self_mod.__dict__[error_class_name] raise exception_class(**ex.parameters()) finally: conn.close() client.cleanup() @classmethod def is_queue_active(cls) -> IsQueueActiveRet: with cls.new_connection() as conn: return conn.IsQueueActive() @classmethod def is_scheduled_or_running(cls, prog: str) -> IsScheduledOrRunningRet: with cls.new_connection() as conn: return conn.IsScheduledOrRunning(prog) @classmethod def noop(cls) -> None: with cls.new_connection() as conn: return conn.NoOp() @classmethod def no_timeout(cls) -> None: with cls.new_connection() as conn: return conn.NoTimeout() @classmethod def get_cnf(cls, query: typing.Optional[GetCnfQuery]) -> GetCnfRet: with cls.new_connection() as conn: return conn.GetCnf(query) @classmethod def set_cnf(cls, vars: typing.List[CnfVar]) -> SetCnfRet: with cls.new_connection() as conn: return conn.SetCnf(vars) @classmethod def set_commit_cnf(cls, vars: typing.List[CnfVar], username: typing.Optional[str], nogenerate: bool, fix_commit: bool) -> SetCommitCnf: with cls.new_connection() as conn: return conn.SetCommitCnf(vars, username, nogenerate, fix_commit) @classmethod def commit_cnf(cls, change_numbers: typing.List[int], username: typing.Optional[str], nogenerate: bool, fix_commit: bool) -> CommitCnfRet: with cls.new_connection() as conn: return conn.CommitCnf(change_numbers, username, nogenerate, fix_commit) @classmethod def showerr_add(cls, msg_id: int, params: typing.List[str]) -> None: with cls.new_connection() as conn: return conn.ShowerrAdd(msg_id, params) @classmethod def now_online(cls, provider: int) -> None: with cls.new_connection() as conn: return conn.NowOnline(provider) @classmethod def now_offline(cls) -> None: with cls.new_connection() as conn: return conn.NowOffline() @classmethod def now_dialing(cls, provider: int) -> None: with cls.new_connection() as conn: return conn.NowDialing(provider) @classmethod def dial_taking_ages(cls) -> None: with cls.new_connection() as conn: return conn.DialTakingAges() @classmethod def barrier_executed(cls, barrier_nr: int) -> None: with cls.new_connection() as conn: return conn.BarrierExecuted(barrier_nr)