# The software in this package is distributed under the GNU General
# Public License version 2 (with a special exception described below).
#
# A copy of GNU General Public License (GPL) is included in this distribution,
# in the file COPYING.GPL.
#
# As a special exception, if other files instantiate templates or use macros
# or inline functions from this file, or you compile this file and link it
# with other works to produce a work based on this file, this file
# does not by itself cause the resulting work to be covered
# by the GNU General Public License.
#
# However the source code for this file must still be made available
# in accordance with section (3) of the GNU General Public License.
#
# This exception does not invalidate any other reasons why a work based
# on this file might be covered by the GNU General Public License.
#
# Copyright (c) 2016-2022 Intra2net AG

"""
test_model.py: unit tests for cnfvar/model.py.

Tests classes and functions in cnfvar/model.py

For help see :py:mod:`unittest`
"""
# NOTE(review): the docstring above names cnfvar/model.py, but every test in
# this module exercises CnfStore / BinaryCnfStore -- confirm the intended name.

import unittest
import os
import shutil
from textwrap import dedent
from tempfile import NamedTemporaryFile
from unittest.mock import patch

from src.cnfvar import binary, CnfList, CnfStore, BinaryCnfStore
from src.arnied_api import Arnied, GetCnfRet

# Fixture with the cnfvars that the fake arnied API / fake get_cnf return.
TEST_CNF_FILE = os.path.join(os.path.dirname(__file__), "cnfs.cnf")

# NOTE(review): the expected cnf dumps in this module were restored from a
# copy whose whitespace had been collapsed.  The spacing inside each cnf line
# (between line number, parent reference and variable name) is therefore an
# assumption -- confirm it against what str(CnfList) and set_cnf really emit.


class TestStore(unittest.TestCase):
    """Test querying, committing and deleting cnfvars using the stores."""

    def test_querying_and_updating(self):
        """Test querying and updating cnf variables."""
        # To avoid creating long dummy objects, we can convert our test file
        # to the arnied API structure using our API itself
        fake_output = CnfList.from_cnf_file(TEST_CNF_FILE) \
                             .to_api_structure()
        fake_ret = GetCnfRet(vars=fake_output)

        store = CnfStore()
        with patch.object(Arnied, "get_cnf", return_value=fake_ret),\
                patch.object(Arnied, "set_commit_cnf") as set_cnf_mock,\
                patch.object(store, "_wait_for_generate"):
            self._query_and_update(store)

        # only the keyword arguments of the commit call are inspected
        _args, kwargs = set_cnf_mock.call_args
        cnf_api_input = kwargs["vars"]
        cnf_input = CnfList.from_api_structure(cnf_api_input)

        # unlike the binary store variant of this test, no "# ..." comments
        # are expected in this dump
        expected_input = dedent("""\
            1 NIC,0: ""
            2 (1) NIC_COMMENT,0: "b0"
            3 (1) NIC_DRIVER,0: "virtio_net"
            4 (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1"
            5 (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0"
            6 (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99"
            7 (1) NIC_LAN_IP,0: "192.168.1.1"
            8 (1) NIC_LAN_NAT_INTO,0: "0"
            9 (1) NIC_LAN_NETMASK,0: "255.255.255.0"
            10 (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1"
            11 (1) NIC_MAC,0: "02:00:00:00:01:01"
            12 (1) NIC_TYPE,0: "NATLAN"
            13 (1) NIC_LAN_PROXY_PROFILE_REF,1: "2"
            14 NIC,1: ""
            15 (14) NIC_COMMENT,0: "b18"
            16 (14) NIC_DRIVER,0: "virtio_net"
            17 (14) NIC_MAC,0: "02:00:00:00:01:02"
            18 (14) NIC_TYPE,0: "DSLROUTER"
            """)
        self.assertEqual(str(cnf_input).splitlines(),
                         expected_input.splitlines())

    def test_correct_parsing_of_nested_cnf_from_string(self):
        """Test that grandchildren cnfvars aren't disregarded when no other child cnfvar follows them."""
        # named cnf_text rather than "input" to avoid shadowing the builtin
        cnf_text = dedent("""\
            1 FIREWALL_NETGROUP,99: "QA host IP"
            2 (1) FIREWALL_NETGROUP_NETWORK,0: ""
            3 (2) FIREWALL_NETGROUP_NETWORK_IP,0: "192.168.1.254"
            4 (2) FIREWALL_NETGROUP_NETWORK_NETMASK,0: "255.255.255.255"
            """)
        cnf = CnfList.from_cnf_string(cnf_text)
        # parsing followed by dumping must give back the same lines
        self.assertEqual(str(cnf).splitlines(), cnf_text.splitlines())

    def test_deleting_cnfvars(self):
        """Test that we can delete top-level cnfvars."""
        # To avoid creating long dummy objects, we can convert our test file
        # to the arnied API structure using our API itself
        fake_output = CnfList.from_cnf_file(TEST_CNF_FILE) \
                             .to_api_structure()
        fake_ret = GetCnfRet(vars=fake_output)

        store = CnfStore()
        with patch.object(Arnied, "get_cnf", return_value=fake_ret):
            cnfvars = store.query() \
                           .where(lambda c: c.name == "THEME")

            # NOTE(review): the original nesting of this block could not be
            # recovered; get_cnf is kept patched here so that delete() can
            # never reach a real arnied -- confirm this matches the intent.
            with patch.object(Arnied, "set_commit_cnf") as set_cnf_mock,\
                    patch.object(store, "_wait_for_generate"):
                store.delete(cnfvars)

        _args, kwargs = set_cnf_mock.call_args
        cnf_api_input = kwargs["vars"]
        cnf_input = CnfList.from_api_structure(cnf_api_input)

        expected_input = dedent("""\
            1 THEME,0: "Intra2net System"
            2 THEME,1: "Xerberus"
            """)
        # every cnfvar handed to the API must carry the deleted flag
        self.assertEqual([c.deleted for c in cnf_api_input],
                         [True] * len(cnfvars))
        self.assertEqual(str(cnf_input).splitlines(),
                         expected_input.splitlines())

    def _query_and_update(self, store):
        """
        Query the items, update a few variables and commit the changes.

        :param store: the cnf store to use

        The changes done were unified in this method so that we test
        the same thing with different stores.
        """
        nics = store.query() \
                    .where(lambda c: c.name == "nic")

        self.assertEqual(len(nics), 4)

        # test querying by instance and child
        nic_0 = nics.single_with_instance(0)
        nic_1 = nics.where_child(lambda c: c.name == "nic_comment"
                                 and c.value == "b1").single()

        # test treating cnfvars as flags
        self.assertFalse(nic_0.child_flag_enabled("NIC_LAN_DNS_RELAYING_ALLOWED"))
        nic_0.enable_child_flag("nic_lan_dns_relaying_allowed")
        self.assertTrue(nic_0.child_flag_enabled("NIC_LAN_DNS_RELAYING_ALLOWED"))

        # test adding comments
        nic_0.children.single_with_name("nic_comment").comment = "my cool nic"
        nic_1.children.first_with_name("NIC_TYPE").comment = "a dsl router"

        # test adding a reference from another cnfvar
        proxy_profile = store.query().with_name("PROXY_PROFILE").first_with_instance(2)
        nic_0.add_child(("nic_lan_proxy_profile_ref"), proxy_profile.instance, -1)

        # testing changing the value
        nic_1.children.first(lambda c: c.name == "nic_comment") \
                      .value = "b18"

        store.commit(CnfList([nic_0, nic_1], renumber=True))


class TestBinaryStore(TestStore):
    """Test querying, committing and deleting cnfvars using the binary store."""

    def setUp(self):
        """Set up mocks and replacements for the real cnf binaries."""
        # file whose content the fake get_cnf prints
        with NamedTemporaryFile(delete=False) as tmp:
            self._get_cnf_output = tmp.name

        # file where the fake set_cnf records what it was called with
        with NamedTemporaryFile(delete=False) as tmp:
            self._set_cnf_input = tmp.name

        with NamedTemporaryFile(delete=False, mode="w") as tmp:
            self._fake_get_cnf = tmp.name
            os.chmod(tmp.name, 0o775)  # make file executable
            tmp.write(dedent(f"""\
                #!/bin/bash
                cat "{self._get_cnf_output}"
                """))
            tmp.flush()

        # NOTE(review): the text between 'input=$(' and the first redirect
        # target was missing from the copy this was restored from.  The test
        # methods read the first line of the record file as the arguments and
        # the rest as the piped cnf text, so the script below records exactly
        # that -- confirm against the original script.
        with NamedTemporaryFile(delete=False, mode="w") as tmp:
            self._fake_set_cnf = tmp.name
            os.chmod(tmp.name, 0o775)  # make file executable
            tmp.write(dedent(f"""\
                #!/bin/bash
                input=$(</dev/stdin)
                echo "$@" > "{self._set_cnf_input}"
                echo "$input" >> "{self._set_cnf_input}"
                """))
            tmp.flush()

        # point the binary wrapper at the fake scripts; the patcher objects
        # are kept so that tearDown can stop them
        get_cnf_mock = patch.object(binary, "BIN_GET_CNF", self._fake_get_cnf)
        self._get_cnf_mock = (get_cnf_mock, get_cnf_mock.start())

        set_cnf_mock = patch.object(binary, "BIN_SET_CNF", self._fake_set_cnf)
        self._set_cnf_mock = (set_cnf_mock, set_cnf_mock.start())

        self._arnied_mock = patch.object(BinaryCnfStore, "_call_arnied")
        self._arnied_mock.start()

    def tearDown(self) -> None:
        """Drop mocks and clean up files."""
        self._arnied_mock.stop()
        self._get_cnf_mock[0].stop()
        self._set_cnf_mock[0].stop()

        os.unlink(self._get_cnf_output)
        os.unlink(self._set_cnf_input)
        os.unlink(self._fake_get_cnf)
        os.unlink(self._fake_set_cnf)

    def test_querying_and_updating(self):
        """Test querying and updating cnf variables."""
        # tell our fake get_cnf to return the whole file
        shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output)

        store = BinaryCnfStore()
        self._query_and_update(store)

        expected_input = dedent("""\
            1 NIC,0: ""
            2 (1) NIC_COMMENT,0: "b0" # my cool nic
            3 (1) NIC_DRIVER,0: "virtio_net"
            4 (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1"
            5 (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0"
            6 (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99"
            7 (1) NIC_LAN_IP,0: "192.168.1.1"
            8 (1) NIC_LAN_NAT_INTO,0: "0"
            9 (1) NIC_LAN_NETMASK,0: "255.255.255.0"
            10 (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1"
            11 (1) NIC_MAC,0: "02:00:00:00:01:01"
            12 (1) NIC_TYPE,0: "NATLAN"
            13 (1) NIC_LAN_PROXY_PROFILE_REF,1: "2"
            14 NIC,1: ""
            15 (14) NIC_COMMENT,0: "b18"
            16 (14) NIC_DRIVER,0: "virtio_net"
            17 (14) NIC_MAC,0: "02:00:00:00:01:02"
            18 (14) NIC_TYPE,0: "DSLROUTER" # a dsl router
            """)

        with open(self._set_cnf_input, "r") as f:
            f.readline()  # skip the first line, it is not checked here
            set_cnf_input = f.read()

        self.assertEqual(set_cnf_input.splitlines(),
                         expected_input.splitlines())

    def test_deleting_cnfvars(self):
        """Test that we can delete top-level cnfvars."""
        # tell our fake get_cnf to return the whole file
        shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output)

        store = BinaryCnfStore()
        cnfvars = store.query() \
                       .where(lambda c: c.name == "THEME")

        store.delete(cnfvars)

        # first line of the record is checked for the flag, the rest is
        # compared with the expected dump
        with open(self._set_cnf_input, "r") as f:
            set_cnf_args = f.readline()
            set_cnf_input = f.read()

        expected_input = dedent("""\
            1 THEME,0: "Intra2net System"
            2 (1) THEME_ARROW_FILENAME,0: "arrow_intranator.gif"
            3 (1) THEME_CSS_FILENAME,0: "intranator.css"
            4 (1) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-intranator.cnf"
            5 (1) THEME_FAVICON_FILENAME,0: "favicon_intranator.ico"
            6 (1) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_intranator.html"
            7 (1) THEME_LOGIN_FILENAME,0: "login_intranator.gif"
            8 (1) THEME_LOGO_FILENAME,0: "intranator.gif"
            9 (1) THEME_STATISTICS_FILENAME,0: "templates-intranator/arnielizer-config.xml"
            10 THEME,1: "Xerberus"
            11 (10) THEME_ARROW_FILENAME,0: "arrow_xerberus.gif"
            12 (10) THEME_CSS_FILENAME,0: "xerberus.css"
            13 (10) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-xerberus.cnf"
            14 (10) THEME_FAVICON_FILENAME,0: "favicon_xerberus.ico"
            15 (10) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_xerberus.html"
            16 (10) THEME_LOGIN_FILENAME,0: "login_xerberus.gif"
            17 (10) THEME_LOGO_FILENAME,0: "xerberus.gif"
            18 (10) THEME_STATISTICS_FILENAME,0: "templates-xerberus/arnielizer-config.xml"
            """)

        self.assertIn("-x", set_cnf_args)
        self.assertEqual(set_cnf_input.splitlines(),
                         expected_input.splitlines())


if __name__ == '__main__':
    unittest.main()