From: Samir Aguiar Date: Mon, 14 Feb 2022 18:51:27 +0000 (-0300) Subject: Add a Python version of arnied varlink interface X-Git-Tag: v1.7.0~2^2~3 X-Git-Url: http://developer.intra2net.com/git/?p=pyi2ncommon;a=commitdiff_plain;h=4f4604814b8988ca2bc75d6baf29b75d6b7319c5 Add a Python version of arnied varlink interface Also add as many typings as possible. Can be easily used from within an IBS VM that has the varlink module installed: ``` from pyi2ncommon.arnied_api import Arnied, GetCnfQuery cnfs = Arnied.get_cnf(GetCnfQuery("USER")) user = cnfs.vars[0] assert user.name == "USER" assert user.instance == 1 assert user.data == "admin" ``` --- diff --git a/src/arnied_api.py b/src/arnied_api.py new file mode 100644 index 0000000..5d8b868 --- /dev/null +++ b/src/arnied_api.py @@ -0,0 +1,278 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. +# +# Copyright (c) 2016-2022 Intra2net AG + +""" +arnied_api: wrappers around the arnied varlink API. + +Featuring +- Arnied: stateless class with methods as exposed in the varlink API. + +.. codeauthor:: Intra2net +""" + +from contextlib import contextmanager +from dataclasses import dataclass +from types import SimpleNamespace +from enum import Enum, auto +import typing +import json +import sys + + +#: socket where the varlink interface is exposed +ARNIED_SOCKET = "/var/intranator/arnied-varlink.sock" + + +# Interface types + + +@dataclass +class CnfVar(object): + name: str + instance: int + data: str + deleted: bool + children: typing.List['CnfVar'] + +@dataclass +class ChangeCnfVar(object): + chg_nr: int + name: str + instance: int + data: str + result_type: int + result_msg: str + +@dataclass +class GetCnfQuery(object): + name: str + instance: typing.Optional[int] = None + +class ProgramStatus(Enum): + Running = auto() + Scheduled = auto() + NotScheduled = auto() + + +# Method return types (not present in the interface) + +class IsQueueActiveRet(SimpleNamespace): + active: bool + +class IsScheduledOrRunningRet(SimpleNamespace): + status: ProgramStatus + timestamp: typing.Optional[int] + +class GetCnfRet(SimpleNamespace): + vars: typing.List[CnfVar] + +class SetCnfRet(SimpleNamespace): + change_numbers: typing.List[int] + +class SetCommitCnf(SimpleNamespace): + results: typing.List[ChangeCnfVar] + +class CommitCnfRet(SimpleNamespace): + results: typing.List[ChangeCnfVar] + + +# Exceptions + + +class InternalBridgeError(Exception): + def __init__(self): + super().__init__("An error occurred (InternalBridgeError)") + +class EmptyInputError(Exception): + def __init__(self): + super().__init__("An error occurred (EmptyInputError)") + +class BadCharError(Exception): + def __init__(self, results: str) -> None: + super().__init__(f"[BadCharError] Error in the arnied API (results={results})") + self.results = results + +class ChildError(Exception): + def __init__(self, results: str) -> None: + super().__init__(f"[ChildError] Error in the arnied API (results={results})") + self.results = results + +class CnfCommitError(Exception): + def __init__(self, results: typing.List[ChangeCnfVar]) -> None: + self.results = [] + msgs = [] + for r in results: + # the type does not seem to be converted correctly + if isinstance(r, dict): + r = ChangeCnfVar(**r) + self.results.append(r) + msgs.append(f"{r.name},{r.instance}: \"{r.data}\": {r.result_msg}") + super().__init__("Error commiting cnfvars:\n" + "\n".join(msgs)) + +class NotFoundError(Exception): + def __init__(self, results: str) -> None: + super().__init__(f"[NotFoundError] Error in the arnied API (results={results})") + self.results = results + +class SchedulerProgError(Exception): + def __init__(self): + super().__init__("An error occurred (SchedulerProgError)") + +class ShowerrVarDataErr(Exception): + def __init__(self, msg_id: int) -> None: + super().__init__(f"[ShowerrVarDataErr] Error in the arnied API (msg_id={msg_id})") + self.msg_id = msg_id + +class ShowerrVarMissing(Exception): + def __init__(self, params_count: int) -> None: + super().__init__(f"[ShowerrVarMissing] Error in the arnied API (params_count={params_count})") + self.params_count = params_count + +class ProviderNotFound(Exception): + def __init__(self, provider_id: int) -> None: + super().__init__(f"[ProviderNotFound] Not found a provider with ID `{provider_id})`") + self.provider_id = provider_id + + +# Arnied varlink proxy + + +class Arnied(object): + VARLINK_ADDRESS = f"unix:{ARNIED_SOCKET}" + + @classmethod + def _send_msg_wrapper(cls, send_msg): + """ + Wrap the _send_message method of the client to always send parameters. + + HACK: this is as hackish as it comes, but at the moment we do not + have another workaround. The underlying varlink package will only + send a dictionary with a "parameters" key when we pass parameters, + however methods with nullable arguments require this key, in which + case the API errors out. One example is the `GetCnf` method: if + we want to get *all* cnfvars we should not provide it with any args, + but since the varlink module will not pass a dictionary with a + "parameters" key, it will fail. This has been reported upstream: + https://github.com/varlink/python/issues/10#issuecomment-1067232162 + """ + def _send_msg(msg): + data = json.loads(msg.decode("utf8")) + if not data.get("method", "").startswith("io.arnied"): + return send_msg(msg) + if data.get("parameters", {}): + return send_msg(msg) + data["parameters"] = {} + return send_msg(json.dumps(data).encode("utf8")) + return _send_msg + + @classmethod + @contextmanager + def new_connection(cls): + # lazy import to reduce the scope of this dependency + import varlink + client = varlink.Client.new_with_address(cls.VARLINK_ADDRESS) + conn = client.open("io.arnied", namespaced=True) + try: + sm = conn._send_message + setattr(conn, "_send_message", Arnied._send_msg_wrapper(sm)) + yield conn + except varlink.VarlinkError as ex: + # not an exception from this module + if not ex.error().startswith("io.arnied"): + raise + + # raise the corresponding exception + error_class_name = ex.error().split(".")[-1] + self_mod = sys.modules[__name__] + exception_class = self_mod.__dict__[error_class_name] + raise exception_class(**ex.parameters()) + finally: + conn.close() + client.cleanup() + + @classmethod + def is_queue_active(cls) -> IsQueueActiveRet: + with cls.new_connection() as conn: + return conn.IsQueueActive() + + @classmethod + def is_scheduled_or_running(cls, prog: str) -> IsScheduledOrRunningRet: + with cls.new_connection() as conn: + return conn.IsScheduledOrRunning(prog) + + @classmethod + def noop(cls) -> None: + with cls.new_connection() as conn: + return conn.NoOp() + + @classmethod + def no_timeout(cls) -> None: + with cls.new_connection() as conn: + return conn.NoTimeout() + + @classmethod + def get_cnf(cls, query: typing.Optional[GetCnfQuery]) -> GetCnfRet: + with cls.new_connection() as conn: + return conn.GetCnf(query) + + @classmethod + def set_cnf(cls, vars: typing.List[CnfVar]) -> SetCnfRet: + with cls.new_connection() as conn: + return conn.SetCnf(vars) + + @classmethod + def set_commit_cnf(cls, vars: typing.List[CnfVar], username: typing.Optional[str], nogenerate: bool, fix_commit: bool) -> SetCommitCnf: + with cls.new_connection() as conn: + return conn.SetCommitCnf(vars, username, nogenerate, fix_commit) + + @classmethod + def commit_cnf(cls, change_numbers: typing.List[int], username: typing.Optional[str], nogenerate: bool, fix_commit: bool) -> CommitCnfRet: + with cls.new_connection() as conn: + return conn.CommitCnf(change_numbers, username, nogenerate, fix_commit) + + @classmethod + def showerr_add(cls, msg_id: int, params: typing.List[str]) -> None: + with cls.new_connection() as conn: + return conn.ShowerrAdd(msg_id, params) + + @classmethod + def now_online(cls, provider: int) -> None: + with cls.new_connection() as conn: + return conn.NowOnline(provider) + + @classmethod + def now_offline(cls) -> None: + with cls.new_connection() as conn: + return conn.NowOffline() + + @classmethod + def now_dialing(cls, provider: int) -> None: + with cls.new_connection() as conn: + return conn.NowDialing(provider) + + @classmethod + def dial_taking_ages(cls) -> None: + with cls.new_connection() as conn: + return conn.DialTakingAges() + + @classmethod + def barrier_executed(cls, barrier_nr: int) -> None: + with cls.new_connection() as conn: + return conn.BarrierExecuted(barrier_nr) diff --git a/test/test_arnied_api.py b/test/test_arnied_api.py new file mode 100644 index 0000000..4731472 --- /dev/null +++ b/test/test_arnied_api.py @@ -0,0 +1,66 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. +# +# Copyright (c) 2016-2022 Intra2net AG + +""" test_arnied_api.py: unit tests for arnied_api.py + +Tests classes and functions in arnied_api.py + +For help see :py:mod:`unittest` +""" + +import sys +import unittest +from unittest.mock import patch, Mock, ANY + +from src.arnied_api import Arnied, CnfVar, GetCnfQuery, GetCnfRet + + +class ArniedTest(unittest.TestCase): + """Test class Arnied.""" + + def setUp(self): + """Set up the mocks.""" + self._conn_mock = Mock() + + client_mock = Mock() + client_mock.open = Mock(return_value=self._conn_mock) + + mod_mock = Mock() + mod_mock.Client.new_with_address = Mock(return_value=client_mock) + + sys.modules["varlink"] = mod_mock + + def tearDown(self): + """Clean up mocks.""" + del sys.modules["varlink"] + + def test_simple_get_cnf_call(self): + """Test that the get cnf method does some sanity checks.""" + query = GetCnfQuery("USER", 5) + cnfvar = CnfVar("USER", 5, "joe", deleted=False, children=[]) + + self._conn_mock.GetCnf = Mock(return_value=GetCnfRet(vars=[cnfvar])) + retval = Arnied.get_cnf(query) + + self.assertEqual(retval.vars[0], cnfvar) + + +if __name__ == '__main__': + unittest.main()