Add a Python version of arnied varlink interface
authorSamir Aguiar <samir.aguiar@intra2net.com>
Mon, 14 Feb 2022 18:51:27 +0000 (15:51 -0300)
committerChristian Herdtweck <christian.herdtweck@intra2net.com>
Mon, 4 Apr 2022 12:24:59 +0000 (14:24 +0200)
Also add as many typings as possible.

Can be easily used from within an IBS VM that has the varlink module
installed:

```
    from pyi2ncommon.arnied_api import Arnied, GetCnfQuery
    cnfs = Arnied.get_cnf(GetCnfQuery("USER"))
    user = cnfs.vars[0]

    assert user.name == "USER"
    assert user.instance == 1
    assert user.data == "admin"
```

src/arnied_api.py [new file with mode: 0644]
test/test_arnied_api.py [new file with mode: 0644]

diff --git a/src/arnied_api.py b/src/arnied_api.py
new file mode 100644 (file)
index 0000000..5d8b868
--- /dev/null
@@ -0,0 +1,278 @@
+# The software in this package is distributed under the GNU General
+# Public License version 2 (with a special exception described below).
+#
+# A copy of GNU General Public License (GPL) is included in this distribution,
+# in the file COPYING.GPL.
+#
+# As a special exception, if other files instantiate templates or use macros
+# or inline functions from this file, or you compile this file and link it
+# with other works to produce a work based on this file, this file
+# does not by itself cause the resulting work to be covered
+# by the GNU General Public License.
+#
+# However the source code for this file must still be made available
+# in accordance with section (3) of the GNU General Public License.
+#
+# This exception does not invalidate any other reasons why a work based
+# on this file might be covered by the GNU General Public License.
+#
+# Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com>
+
+"""
+arnied_api: wrappers around the arnied varlink API.
+
+Featuring
+- Arnied: stateless class with methods as exposed in the varlink API.
+
+.. codeauthor:: Intra2net
+"""
+
+from contextlib import contextmanager
+from dataclasses import dataclass
+from types import SimpleNamespace
+from enum import Enum, auto
+import typing
+import json
+import sys
+
+
+#: socket where the varlink interface is exposed
+ARNIED_SOCKET = "/var/intranator/arnied-varlink.sock"
+
+
+# Interface types
+
+
+@dataclass
+class CnfVar(object):
+    name: str
+    instance: int
+    data: str
+    deleted: bool
+    children: typing.List['CnfVar']
+
+@dataclass
+class ChangeCnfVar(object):
+    chg_nr: int
+    name: str
+    instance: int
+    data: str
+    result_type: int
+    result_msg: str
+
+@dataclass
+class GetCnfQuery(object):
+    name: str
+    instance: typing.Optional[int] = None
+
+class ProgramStatus(Enum):
+    Running = auto()
+    Scheduled = auto()
+    NotScheduled = auto()
+
+
+# Method return types (not present in the interface)
+
+class IsQueueActiveRet(SimpleNamespace):
+    active: bool
+
+class IsScheduledOrRunningRet(SimpleNamespace):
+    status: ProgramStatus
+    timestamp: typing.Optional[int]
+
+class GetCnfRet(SimpleNamespace):
+    vars: typing.List[CnfVar]
+
+class SetCnfRet(SimpleNamespace):
+    change_numbers: typing.List[int]
+
+class SetCommitCnf(SimpleNamespace):
+    results: typing.List[ChangeCnfVar]
+
+class CommitCnfRet(SimpleNamespace):
+    results: typing.List[ChangeCnfVar]
+
+
+# Exceptions
+
+
+class InternalBridgeError(Exception):
+    def __init__(self):
+        super().__init__("An error occurred (InternalBridgeError)")
+
+class EmptyInputError(Exception):
+    def __init__(self):
+        super().__init__("An error occurred (EmptyInputError)")
+
+class BadCharError(Exception):
+    def __init__(self, results: str) -> None:
+        super().__init__(f"[BadCharError] Error in the arnied API (results={results})")
+        self.results = results
+
+class ChildError(Exception):
+    def __init__(self, results: str) -> None:
+        super().__init__(f"[ChildError] Error in the arnied API (results={results})")
+        self.results = results
+
+class CnfCommitError(Exception):
+    def __init__(self, results: typing.List[ChangeCnfVar]) -> None:
+        self.results = []
+        msgs = []
+        for r in results:
+            # the type does not seem to be converted correctly
+            if isinstance(r, dict):
+                r = ChangeCnfVar(**r)
+            self.results.append(r)
+            msgs.append(f"{r.name},{r.instance}: \"{r.data}\": {r.result_msg}")
+        super().__init__("Error commiting cnfvars:\n" + "\n".join(msgs))
+
+class NotFoundError(Exception):
+    def __init__(self, results: str) -> None:
+        super().__init__(f"[NotFoundError] Error in the arnied API (results={results})")
+        self.results = results
+
+class SchedulerProgError(Exception):
+    def __init__(self):
+        super().__init__("An error occurred (SchedulerProgError)")
+
+class ShowerrVarDataErr(Exception):
+    def __init__(self, msg_id: int) -> None:
+        super().__init__(f"[ShowerrVarDataErr] Error in the arnied API (msg_id={msg_id})")
+        self.msg_id = msg_id
+
+class ShowerrVarMissing(Exception):
+    def __init__(self, params_count: int) -> None:
+        super().__init__(f"[ShowerrVarMissing] Error in the arnied API (params_count={params_count})")
+        self.params_count = params_count
+
+class ProviderNotFound(Exception):
+    def __init__(self, provider_id: int) -> None:
+        super().__init__(f"[ProviderNotFound] Not found a provider with ID `{provider_id})`")
+        self.provider_id = provider_id
+
+
+# Arnied varlink proxy
+
+
+class Arnied(object):
+    VARLINK_ADDRESS = f"unix:{ARNIED_SOCKET}"
+
+    @classmethod
+    def _send_msg_wrapper(cls, send_msg):
+        """
+        Wrap the _send_message method of the client to always send parameters.
+
+        HACK: this is as hackish as it comes, but at the moment we do not
+        have another workaround. The underlying varlink package will only
+        send a dictionary with a "parameters" key when we pass parameters,
+        however methods with nullable arguments require this key, in which
+        case the API errors out. One example is the `GetCnf` method: if
+        we want to get *all* cnfvars we should not provide it with any args,
+        but since the varlink module will not pass a dictionary with a
+        "parameters" key, it will fail. This has been reported upstream:
+        https://github.com/varlink/python/issues/10#issuecomment-1067232162
+        """
+        def _send_msg(msg):
+            data = json.loads(msg.decode("utf8"))
+            if not data.get("method", "").startswith("io.arnied"):
+                return send_msg(msg)
+            if data.get("parameters", {}):
+                return send_msg(msg)
+            data["parameters"] = {}
+            return send_msg(json.dumps(data).encode("utf8"))
+        return _send_msg
+
+    @classmethod
+    @contextmanager
+    def new_connection(cls):
+        # lazy import to reduce the scope of this dependency
+        import varlink
+        client = varlink.Client.new_with_address(cls.VARLINK_ADDRESS)
+        conn = client.open("io.arnied", namespaced=True)
+        try:
+            sm = conn._send_message
+            setattr(conn, "_send_message", Arnied._send_msg_wrapper(sm))
+            yield conn
+        except varlink.VarlinkError as ex:
+            # not an exception from this module
+            if not ex.error().startswith("io.arnied"):
+                raise
+
+            # raise the corresponding exception
+            error_class_name = ex.error().split(".")[-1]
+            self_mod = sys.modules[__name__]
+            exception_class = self_mod.__dict__[error_class_name]
+            raise exception_class(**ex.parameters())
+        finally:
+            conn.close()
+            client.cleanup()
+
+    @classmethod
+    def is_queue_active(cls) -> IsQueueActiveRet:
+        with cls.new_connection() as conn:
+            return conn.IsQueueActive()
+
+    @classmethod
+    def is_scheduled_or_running(cls, prog: str) -> IsScheduledOrRunningRet:
+        with cls.new_connection() as conn:
+            return conn.IsScheduledOrRunning(prog)
+
+    @classmethod
+    def noop(cls) -> None:
+        with cls.new_connection() as conn:
+            return conn.NoOp()
+
+    @classmethod
+    def no_timeout(cls) -> None:
+        with cls.new_connection() as conn:
+            return conn.NoTimeout()
+
+    @classmethod
+    def get_cnf(cls, query: typing.Optional[GetCnfQuery]) -> GetCnfRet:
+        with cls.new_connection() as conn:
+            return conn.GetCnf(query)
+
+    @classmethod
+    def set_cnf(cls, vars: typing.List[CnfVar]) -> SetCnfRet:
+        with cls.new_connection() as conn:
+            return conn.SetCnf(vars)
+
+    @classmethod
+    def set_commit_cnf(cls, vars: typing.List[CnfVar], username: typing.Optional[str], nogenerate: bool, fix_commit: bool) -> SetCommitCnf:
+        with cls.new_connection() as conn:
+            return conn.SetCommitCnf(vars, username, nogenerate, fix_commit)
+
+    @classmethod
+    def commit_cnf(cls, change_numbers: typing.List[int], username: typing.Optional[str], nogenerate: bool, fix_commit: bool) -> CommitCnfRet:
+        with cls.new_connection() as conn:
+            return conn.CommitCnf(change_numbers, username, nogenerate, fix_commit)
+
+    @classmethod
+    def showerr_add(cls, msg_id: int, params: typing.List[str]) -> None:
+        with cls.new_connection() as conn:
+            return conn.ShowerrAdd(msg_id, params)
+
+    @classmethod
+    def now_online(cls, provider: int) -> None:
+        with cls.new_connection() as conn:
+            return conn.NowOnline(provider)
+
+    @classmethod
+    def now_offline(cls) -> None:
+        with cls.new_connection() as conn:
+            return conn.NowOffline()
+
+    @classmethod
+    def now_dialing(cls, provider: int) -> None:
+        with cls.new_connection() as conn:
+            return conn.NowDialing(provider)
+
+    @classmethod
+    def dial_taking_ages(cls) -> None:
+        with cls.new_connection() as conn:
+            return conn.DialTakingAges()
+
+    @classmethod
+    def barrier_executed(cls, barrier_nr: int) -> None:
+        with cls.new_connection() as conn:
+            return conn.BarrierExecuted(barrier_nr)
diff --git a/test/test_arnied_api.py b/test/test_arnied_api.py
new file mode 100644 (file)
index 0000000..4731472
--- /dev/null
@@ -0,0 +1,66 @@
+# The software in this package is distributed under the GNU General
+# Public License version 2 (with a special exception described below).
+#
+# A copy of GNU General Public License (GPL) is included in this distribution,
+# in the file COPYING.GPL.
+#
+# As a special exception, if other files instantiate templates or use macros
+# or inline functions from this file, or you compile this file and link it
+# with other works to produce a work based on this file, this file
+# does not by itself cause the resulting work to be covered
+# by the GNU General Public License.
+#
+# However the source code for this file must still be made available
+# in accordance with section (3) of the GNU General Public License.
+#
+# This exception does not invalidate any other reasons why a work based
+# on this file might be covered by the GNU General Public License.
+#
+# Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com>
+
+""" test_arnied_api.py: unit tests for arnied_api.py
+
+Tests classes and functions in arnied_api.py
+
+For help see :py:mod:`unittest`
+"""
+
+import sys
+import unittest
+from unittest.mock import patch, Mock, ANY
+
+from src.arnied_api import Arnied, CnfVar, GetCnfQuery, GetCnfRet
+
+
+class ArniedTest(unittest.TestCase):
+    """Test class Arnied."""
+
+    def setUp(self):
+        """Set up the mocks."""
+        self._conn_mock = Mock()
+
+        client_mock = Mock()
+        client_mock.open = Mock(return_value=self._conn_mock)
+
+        mod_mock = Mock()
+        mod_mock.Client.new_with_address = Mock(return_value=client_mock)
+
+        sys.modules["varlink"] = mod_mock
+
+    def tearDown(self):
+        """Clean up mocks."""
+        del sys.modules["varlink"]
+
+    def test_simple_get_cnf_call(self):
+        """Test that the get cnf method does some sanity checks."""
+        query = GetCnfQuery("USER", 5)
+        cnfvar = CnfVar("USER", 5, "joe", deleted=False, children=[])
+
+        self._conn_mock.GetCnf = Mock(return_value=GetCnfRet(vars=[cnfvar]))
+        retval = Arnied.get_cnf(query)
+
+        self.assertEqual(retval.vars[0], cnfvar)
+
+
+if __name__ == '__main__':
+    unittest.main()