Revert the default instance number back to zero
[pyi2ncommon] / test / cnfvar / test_store.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com>
20
21 """
22 test_model.py: unit tests for cnfvar/model.py.
23
24 Tests classes and functions in cnfvar/model.py
25
26 For help see :py:mod:`unittest`
27 """
28
29 import unittest
30 import os
31 import shutil
32 from textwrap import dedent
33 from tempfile import NamedTemporaryFile
34 from unittest.mock import patch
35
36 from src.cnfvar import binary, CnfList, CnfStore, BinaryCnfStore
37 from src.arnied_api import Arnied, GetCnfRet
38
39
# The sample cnf file is looked up in the directory containing this module;
# it is the input shared by all tests below.
_HERE = os.path.dirname(__file__)
TEST_CNF_FILE = os.path.join(_HERE, "cnfs.cnf")
41
42
class TestStore(unittest.TestCase):
    """Test querying, committing and deleting cnfvars using the stores."""

    @staticmethod
    def _fake_get_cnf_ret():
        """
        Build the value returned by the mocked `Arnied.get_cnf`.

        To avoid creating long dummy objects, we convert our test file
        to the arnied API structure using our API itself.

        :returns: a `GetCnfRet` wrapping the variables of the test cnf file
        """
        fake_output = CnfList.from_cnf_file(TEST_CNF_FILE).to_api_structure()
        return GetCnfRet(vars=fake_output)

    def _committed_vars(self, set_cnf_mock):
        """
        Extract the variables passed to the mocked `Arnied.set_commit_cnf`.

        :param set_cnf_mock: the mock that replaced `Arnied.set_commit_cnf`
        :returns: the value of the `vars` keyword argument of the last call
        :raises AssertionError: if the mock was never called
        """
        # without this check a missing call would only surface as an opaque
        # TypeError when unpacking `call_args` (which is None in that case)
        set_cnf_mock.assert_called()
        _, kwargs = set_cnf_mock.call_args
        return kwargs["vars"]

    def test_querying_and_updating(self):
        """Test querying and updating cnf variables."""
        fake_ret = self._fake_get_cnf_ret()

        store = CnfStore()
        with patch.object(Arnied, "get_cnf", return_value=fake_ret),\
             patch.object(Arnied, "set_commit_cnf") as set_cnf_mock,\
             patch.object(store, "_wait_for_generate"):
            self._query_and_update(store)

        cnf_api_input = self._committed_vars(set_cnf_mock)
        cnf_input = CnfList.from_api_structure(cnf_api_input)

        # NOTE: the comments set in `_query_and_update` are not expected to
        # show up in the variables converted back from the API structure
        expected_input = dedent("""\
            1 NIC,0: ""
            2   (1) NIC_COMMENT,0: "b0"
            3   (1) NIC_DRIVER,0: "virtio_net"
            4   (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1"
            5   (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0"
            6   (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99"
            7   (1) NIC_LAN_IP,0: "192.168.1.1"
            8   (1) NIC_LAN_NAT_INTO,0: "0"
            9   (1) NIC_LAN_NETMASK,0: "255.255.255.0"
            10   (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1"
            11   (1) NIC_MAC,0: "02:00:00:00:01:01"
            12   (1) NIC_TYPE,0: "NATLAN"
            13   (1) NIC_LAN_PROXY_PROFILE_REF,1: "2"
            14 NIC,1: ""
            15   (14) NIC_COMMENT,0: "b18"
            16   (14) NIC_DRIVER,0: "virtio_net"
            17   (14) NIC_MAC,0: "02:00:00:00:01:02"
            18   (14) NIC_TYPE,0: "DSLROUTER"
            """)

        self.assertEqual(str(cnf_input).splitlines(), expected_input.splitlines())

    def test_deleting_cnfvars(self):
        """Test that we can delete top-level cnfvars."""
        fake_ret = self._fake_get_cnf_ret()

        store = CnfStore()
        with patch.object(Arnied, "get_cnf", return_value=fake_ret):
            cnfvars = store.query().where(lambda c: c.name == "THEME")

        with patch.object(Arnied, "set_commit_cnf") as set_cnf_mock,\
             patch.object(store, "_wait_for_generate"):
            store.delete(cnfvars)

        cnf_api_input = self._committed_vars(set_cnf_mock)
        cnf_input = CnfList.from_api_structure(cnf_api_input)

        expected_input = dedent("""\
            1 THEME,0: "Intra2net System"
            2 THEME,1: "Xerberus"
            """)

        # every variable handed to the API must be flagged as deleted
        self.assertEqual([c.deleted for c in cnf_api_input],
                         [True] * len(cnfvars))
        self.assertEqual(str(cnf_input).splitlines(), expected_input.splitlines())

    def _query_and_update(self, store):
        """
        Query the items, update a few variables and commit the changes.

        :param store: the cnf store to use

        The changes done were unified in this method so that we test the same
        thing with different stores.
        """
        nics = store.query().where(lambda c: c.name == "nic")

        self.assertEqual(len(nics), 4)

        # test querying by instance and child
        nic_0 = nics.single_with_instance(0)
        nic_1 = nics.where_child(lambda c: c.name == "nic_comment" and c.value == "b1").single()

        # test treating cnfvars as flags
        self.assertFalse(nic_0.child_flag_enabled("NIC_LAN_DNS_RELAYING_ALLOWED"))
        nic_0.enable_child_flag("nic_lan_dns_relaying_allowed")
        self.assertTrue(nic_0.child_flag_enabled("NIC_LAN_DNS_RELAYING_ALLOWED"))

        # test adding comments
        nic_0.children.single_with_name("nic_comment").comment = "my cool nic"
        nic_1.children.first_with_name("NIC_TYPE").comment = "a dsl router"

        # test adding a reference from another cnfvar
        proxy_profile = store.query().with_name("PROXY_PROFILE").first_with_instance(2)
        nic_0.add_child("nic_lan_proxy_profile_ref", proxy_profile.instance, -1)

        # testing changing the value
        nic_1.children.first(lambda c: c.name == "nic_comment").value = "b18"

        # only the two modified nics are committed, renumbered as a new list
        store.commit(CnfList([nic_0, nic_1], renumber=True))
152
153
class TestBinaryStore(TestStore):
    """
    Test querying, committing and deleting cnfvars using the binary store.

    The real get_cnf/set_cnf binaries are replaced with small shell scripts
    that read from / record into temporary files.
    """

    def setUp(self):
        """
        Set up mocks and replacements for the real cnf binaries.

        Every temporary file and every patch is paired with a cleanup as soon
        as it is created, so nothing is leaked even if the setup fails
        half-way (in which case unittest would not call `tearDown`).
        """
        self._get_cnf_output = self._make_temp_file()
        self._set_cnf_input = self._make_temp_file()

        # fake get_cnf: prints whatever the test copied to the output file
        self._fake_get_cnf = self._make_temp_file(dedent(f"""\
            #!/bin/bash
            cat "{self._get_cnf_output}"
            """), executable=True)

        # fake set_cnf: records its arguments on the first line of the input
        # file, followed by everything it received on stdin
        self._fake_set_cnf = self._make_temp_file(dedent(f"""\
            #!/bin/bash
            input=$(</dev/stdin)
            echo $@ > "{self._set_cnf_input}"
            echo "$input" >> "{self._set_cnf_input}"
            """), executable=True)

        self._start_patch(patch.object(binary, "BIN_GET_CNF", self._fake_get_cnf))
        self._start_patch(patch.object(binary, "BIN_SET_CNF", self._fake_set_cnf))
        self._start_patch(patch.object(BinaryCnfStore, "_call_arnied"))

    def _make_temp_file(self, content=None, executable=False):
        """
        Create a temporary file that is removed again when the test finishes.

        :param content: text to write to the file, if any
        :param executable: whether to make the file executable
        :returns: path to the created file
        """
        with NamedTemporaryFile(delete=False, mode="w") as tmp:
            # register the removal first so that a failure further below
            # cannot leave the file behind
            self.addCleanup(os.unlink, tmp.name)
            if executable:
                os.chmod(tmp.name, 0o775)
            if content is not None:
                tmp.write(content)
        return tmp.name

    def _start_patch(self, patcher):
        """
        Start a patcher and make sure it is stopped when the test finishes.

        :param patcher: a not yet started patcher as created by `patch.object`
        :returns: whatever starting the patcher returned
        """
        started = patcher.start()
        self.addCleanup(patcher.stop)
        return started

    def _read_set_cnf_call(self):
        """
        Read back what the fake set_cnf script recorded.

        :returns: tuple of the line with the command-line arguments and the
                  remaining content (what the script received on stdin)
        """
        with open(self._set_cnf_input, "r") as f:
            set_cnf_args = f.readline()
            set_cnf_input = f.read()
        return set_cnf_args, set_cnf_input

    def test_querying_and_updating(self):
        """Test querying and updating cnf variables."""
        # tell our fake get_cnf to return the whole file
        shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output)

        store = BinaryCnfStore()

        self._query_and_update(store)

        expected_input = dedent("""\
            1 NIC,0: ""
            2   (1) NIC_COMMENT,0: "b0" # my cool nic
            3   (1) NIC_DRIVER,0: "virtio_net"
            4   (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1"
            5   (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0"
            6   (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99"
            7   (1) NIC_LAN_IP,0: "192.168.1.1"
            8   (1) NIC_LAN_NAT_INTO,0: "0"
            9   (1) NIC_LAN_NETMASK,0: "255.255.255.0"
            10   (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1"
            11   (1) NIC_MAC,0: "02:00:00:00:01:01"
            12   (1) NIC_TYPE,0: "NATLAN"
            13   (1) NIC_LAN_PROXY_PROFILE_REF,1: "2"
            14 NIC,1: ""
            15   (14) NIC_COMMENT,0: "b18"
            16   (14) NIC_DRIVER,0: "virtio_net"
            17   (14) NIC_MAC,0: "02:00:00:00:01:02"
            18   (14) NIC_TYPE,0: "DSLROUTER" # a dsl router
            """)

        # the command-line arguments are not relevant for this test
        _, set_cnf_input = self._read_set_cnf_call()

        self.assertEqual(set_cnf_input.splitlines(), expected_input.splitlines())

    def test_deleting_cnfvars(self):
        """Test that we can delete top-level cnfvars."""
        # tell our fake get_cnf to return the whole file
        shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output)

        store = BinaryCnfStore()
        cnfvars = store.query().where(lambda c: c.name == "THEME")

        store.delete(cnfvars)

        set_cnf_args, set_cnf_input = self._read_set_cnf_call()

        expected_input = dedent("""\
            1 THEME,0: "Intra2net System"
            2   (1) THEME_ARROW_FILENAME,0: "arrow_intranator.gif"
            3   (1) THEME_CSS_FILENAME,0: "intranator.css"
            4   (1) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-intranator.cnf"
            5   (1) THEME_FAVICON_FILENAME,0: "favicon_intranator.ico"
            6   (1) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_intranator.html"
            7   (1) THEME_LOGIN_FILENAME,0: "login_intranator.gif"
            8   (1) THEME_LOGO_FILENAME,0: "intranator.gif"
            9   (1) THEME_STATISTICS_FILENAME,0: "templates-intranator/arnielizer-config.xml"
            10 THEME,1: "Xerberus"
            11   (10) THEME_ARROW_FILENAME,0: "arrow_xerberus.gif"
            12   (10) THEME_CSS_FILENAME,0: "xerberus.css"
            13   (10) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-xerberus.cnf"
            14   (10) THEME_FAVICON_FILENAME,0: "favicon_xerberus.ico"
            15   (10) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_xerberus.html"
            16   (10) THEME_LOGIN_FILENAME,0: "login_xerberus.gif"
            17   (10) THEME_LOGO_FILENAME,0: "xerberus.gif"
            18   (10) THEME_STATISTICS_FILENAME,0: "templates-xerberus/arnielizer-config.xml"
            """)

        # NOTE(review): "-x" is presumably the delete switch of set_cnf --
        # confirm against the binary's documentation
        self.assertIn("-x", set_cnf_args)
        self.assertEqual(set_cnf_input.splitlines(), expected_input.splitlines())
278
279
# Run the tests of this module when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()