0da57a8258b597b5a24667900bfc771243116f8a
[pyi2ncommon] / test / cnfvar / test_store.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com>
20
21 """
test_store.py: unit tests for the cnfvar stores.

Tests the CnfStore and BinaryCnfStore classes of the cnfvar package.
25
26 For help see :py:mod:`unittest`
27 """
28
29 import unittest
30 import os
31 import shutil
32 from textwrap import dedent
33 from tempfile import NamedTemporaryFile
34 from unittest.mock import patch
35
36 from src.cnfvar import binary, CnfList, CnfStore, BinaryCnfStore
37 from src.arnied_api import Arnied, GetCnfRet
38
39
# Path of the sample cnf dump ("cnfs.cnf") located in the same directory as
# this test module; the tests below use it as the fake store content.
TEST_CNF_FILE = os.path.join(
    os.path.dirname(__file__),
    "cnfs.cnf",
)
41
42
class TestStore(unittest.TestCase):
    """Test querying, committing and deleting cnfvars using the stores."""

    def test_querying_and_updating(self):
        """Test querying and updating cnf variables."""
        # To avoid creating long dummy objects, we can convert our test file
        # to the arnied API structure using our API itself
        fake_output = CnfList.from_cnf_file(TEST_CNF_FILE) \
                             .to_api_structure()
        fake_ret = GetCnfRet(vars=fake_output)

        store = CnfStore()
        with patch.object(Arnied, "get_cnf", return_value=fake_ret),\
             patch.object(Arnied, "set_commit_cnf") as set_cnf_mock,\
             patch.object(store, "_wait_for_generate"):
            self._query_and_update(store)

        # without this check a store that never commits would only surface as
        # an opaque TypeError when unpacking a `call_args` of None
        set_cnf_mock.assert_called()
        _, kwargs = set_cnf_mock.call_args
        cnf_api_input = kwargs["vars"]
        cnf_input = CnfList.from_api_structure(cnf_api_input)

        # NOTE: this expectation carries no cnfvar comments although
        # `_query_and_update` sets some
        expected_input = dedent("""\
            1 NIC,0: ""
            2   (1) NIC_COMMENT,0: "b0"
            3   (1) NIC_DRIVER,0: "virtio_net"
            4   (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1"
            5   (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0"
            6   (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99"
            7   (1) NIC_LAN_IP,0: "192.168.1.1"
            8   (1) NIC_LAN_NAT_INTO,0: "0"
            9   (1) NIC_LAN_NETMASK,0: "255.255.255.0"
            10   (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1"
            11   (1) NIC_MAC,0: "02:00:00:00:01:01"
            12   (1) NIC_TYPE,0: "NATLAN"
            13   (1) NIC_LAN_PROXY_PROFILE_REF,1: "2"
            14 NIC,1: ""
            15   (14) NIC_COMMENT,0: "b18"
            16   (14) NIC_DRIVER,0: "virtio_net"
            17   (14) NIC_MAC,0: "02:00:00:00:01:02"
            18   (14) NIC_TYPE,0: "DSLROUTER"
            """)

        self.assertEqual(str(cnf_input).splitlines(), expected_input.splitlines())

    def test_correct_parsing_of_nested_cnf_from_string(self):
        """Test that grandchildren cnfvars aren't disregarded when no other child cnfvar follows them."""
        # the last two lines are grandchildren (parent line 2) and nothing
        # follows them, which is exactly the case under test
        cnf_text = dedent("""\
            1 FIREWALL_NETGROUP,99: "QA host IP"
            2   (1) FIREWALL_NETGROUP_NETWORK,0: ""
            3     (2) FIREWALL_NETGROUP_NETWORK_IP,0: "192.168.1.254"
            4     (2) FIREWALL_NETGROUP_NETWORK_NETMASK,0: "255.255.255.255"
            """)

        cnf = CnfList.from_cnf_string(cnf_text)

        # parsing and dumping again must give back the very same lines
        self.assertEqual(str(cnf).splitlines(), cnf_text.splitlines())

    def test_deleting_cnfvars(self):
        """Test that we can delete top-level cnfvars."""
        # To avoid creating long dummy objects, we can convert our test file
        # to the arnied API structure using our API itself
        fake_output = CnfList.from_cnf_file(TEST_CNF_FILE) \
                             .to_api_structure()
        fake_ret = GetCnfRet(vars=fake_output)

        store = CnfStore()
        with patch.object(Arnied, "get_cnf", return_value=fake_ret):
            cnfvars = store.query() \
                           .where(lambda c: c.name == "THEME")

        with patch.object(Arnied, "set_commit_cnf") as set_cnf_mock,\
             patch.object(store, "_wait_for_generate"):
            store.delete(cnfvars)

        # fail with a clear message if the store never reached the API
        set_cnf_mock.assert_called()
        _, kwargs = set_cnf_mock.call_args
        cnf_api_input = kwargs["vars"]
        cnf_input = CnfList.from_api_structure(cnf_api_input)

        # only the top-level THEME variables are expected here, no children
        expected_input = dedent("""\
            1 THEME,0: "Intra2net System"
            2 THEME,1: "Xerberus"
            """)

        # every variable handed to the API must be flagged as deleted
        self.assertEqual([c.deleted for c in cnf_api_input],
                         [True] * len(cnfvars))
        self.assertEqual(str(cnf_input).splitlines(), expected_input.splitlines())

    def _query_and_update(self, store):
        """
        Query the items, update a few variables and commit the changes.

        :param store: the cnf store to use

        The changes done were unified in this method so that we test the same
        thing with different stores.
        """
        nics = store.query() \
                    .where(lambda c: c.name == "nic")

        self.assertEqual(len(nics), 4)

        # test querying by instance and child
        nic_0 = nics.single_with_instance(0)
        nic_1 = nics.where_child(lambda c: c.name == "nic_comment" and c.value == "b1").single()

        # test treating cnfvars as flags
        self.assertFalse(nic_0.child_flag_enabled("NIC_LAN_DNS_RELAYING_ALLOWED"))
        nic_0.enable_child_flag("nic_lan_dns_relaying_allowed")
        self.assertTrue(nic_0.child_flag_enabled("NIC_LAN_DNS_RELAYING_ALLOWED"))

        # test adding comments
        nic_0.children.single_with_name("nic_comment").comment = "my cool nic"
        nic_1.children.first_with_name("NIC_TYPE").comment = "a dsl router"

        # test adding a reference from another cnfvar
        proxy_profile = store.query().with_name("PROXY_PROFILE").first_with_instance(2)
        nic_0.add_child("nic_lan_proxy_profile_ref", proxy_profile.instance, -1)

        # test changing the value
        nic_1.children.first(lambda c: c.name == "nic_comment") \
                      .value = "b18"
        # only the two modified NICs are committed, renumbered from line 1
        store.commit(CnfList([nic_0, nic_1], renumber=True))
165
166
class TestBinaryStore(TestStore):
    """Test querying, committing and deleting cnfvars using the binary store."""

    def setUp(self):
        """
        Set up mocks and replacements for the real cnf binaries.

        Every temporary file and every started patch is registered with
        :py:meth:`unittest.TestCase.addCleanup` right after it was created,
        so nothing leaks even when a later step of the setup fails (a
        ``tearDown`` would not run in that case).
        """
        # file whose content the fake get_cnf prints
        self._get_cnf_output = self._make_temp_file()
        # file where the fake set_cnf records its arguments and its stdin
        self._set_cnf_input = self._make_temp_file()

        self._fake_get_cnf = self._make_temp_file(dedent(f"""\
            #!/bin/bash
            cat "{self._get_cnf_output}"
            """), executable=True)

        # first line of the record are the arguments, the rest is the input
        self._fake_set_cnf = self._make_temp_file(dedent(f"""\
            #!/bin/bash
            input=$(</dev/stdin)
            echo $@ > "{self._set_cnf_input}"
            echo "$input" >> "{self._set_cnf_input}"
            """), executable=True)

        self._start_patcher(patch.object(binary, "BIN_GET_CNF", self._fake_get_cnf))
        self._start_patcher(patch.object(binary, "BIN_SET_CNF", self._fake_set_cnf))
        self._start_patcher(patch.object(BinaryCnfStore, "_call_arnied"))

    def _make_temp_file(self, content="", executable=False):
        """
        Create a persistent temporary file that is removed on test cleanup.

        :param str content: text to write into the file
        :param bool executable: whether to change the file mode to 0o775
        :returns: path of the created file
        :rtype: str
        """
        with NamedTemporaryFile(delete=False, mode="w") as tmp:
            # register the removal first so that the file does not stay
            # behind if one of the following steps raises
            self.addCleanup(os.unlink, tmp.name)
            if executable:
                os.chmod(tmp.name, 0o775)  # make file executable
            tmp.write(content)
        return tmp.name

    def _start_patcher(self, patcher):
        """
        Start a patcher and make sure it is stopped on test cleanup.

        :param patcher: patcher object as returned by ``patch.object``
        :returns: whatever starting the patcher returns
        """
        started = patcher.start()
        self.addCleanup(patcher.stop)
        return started

    def test_querying_and_updating(self):
        """Test querying and updating cnf variables."""
        # tell our fake get_cnf to return the whole file
        shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output)

        store = BinaryCnfStore()

        self._query_and_update(store)

        # this expectation includes the comments set by `_query_and_update`
        expected_input = dedent("""\
            1 NIC,0: ""
            2   (1) NIC_COMMENT,0: "b0" # my cool nic
            3   (1) NIC_DRIVER,0: "virtio_net"
            4   (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1"
            5   (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0"
            6   (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99"
            7   (1) NIC_LAN_IP,0: "192.168.1.1"
            8   (1) NIC_LAN_NAT_INTO,0: "0"
            9   (1) NIC_LAN_NETMASK,0: "255.255.255.0"
            10   (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1"
            11   (1) NIC_MAC,0: "02:00:00:00:01:01"
            12   (1) NIC_TYPE,0: "NATLAN"
            13   (1) NIC_LAN_PROXY_PROFILE_REF,1: "2"
            14 NIC,1: ""
            15   (14) NIC_COMMENT,0: "b18"
            16   (14) NIC_DRIVER,0: "virtio_net"
            17   (14) NIC_MAC,0: "02:00:00:00:01:02"
            18   (14) NIC_TYPE,0: "DSLROUTER" # a dsl router
            """)

        with open(self._set_cnf_input, "r") as f:
            f.readline()  # skip the recorded command line arguments
            set_cnf_input = f.read()

        self.assertEqual(set_cnf_input.splitlines(), expected_input.splitlines())

    def test_deleting_cnfvars(self):
        """Test that we can delete top-level cnfvars."""
        # tell our fake get_cnf to return the whole file
        shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output)

        store = BinaryCnfStore()
        cnfvars = store.query() \
                       .where(lambda c: c.name == "THEME")

        store.delete(cnfvars)

        with open(self._set_cnf_input, "r") as f:
            set_cnf_args = f.readline()
            set_cnf_input = f.read()

        # here the THEME variables are expected together with their children
        expected_input = dedent("""\
            1 THEME,0: "Intra2net System"
            2   (1) THEME_ARROW_FILENAME,0: "arrow_intranator.gif"
            3   (1) THEME_CSS_FILENAME,0: "intranator.css"
            4   (1) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-intranator.cnf"
            5   (1) THEME_FAVICON_FILENAME,0: "favicon_intranator.ico"
            6   (1) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_intranator.html"
            7   (1) THEME_LOGIN_FILENAME,0: "login_intranator.gif"
            8   (1) THEME_LOGO_FILENAME,0: "intranator.gif"
            9   (1) THEME_STATISTICS_FILENAME,0: "templates-intranator/arnielizer-config.xml"
            10 THEME,1: "Xerberus"
            11   (10) THEME_ARROW_FILENAME,0: "arrow_xerberus.gif"
            12   (10) THEME_CSS_FILENAME,0: "xerberus.css"
            13   (10) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-xerberus.cnf"
            14   (10) THEME_FAVICON_FILENAME,0: "favicon_xerberus.ico"
            15   (10) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_xerberus.html"
            16   (10) THEME_LOGIN_FILENAME,0: "login_xerberus.gif"
            17   (10) THEME_LOGO_FILENAME,0: "xerberus.gif"
            18   (10) THEME_STATISTICS_FILENAME,0: "templates-xerberus/arnielizer-config.xml"
            """)

        # the fake set_cnf must have been invoked with the "-x" argument
        self.assertIn("-x", set_cnf_args)
        self.assertEqual(set_cnf_input.splitlines(), expected_input.splitlines())
291
292
# Run all tests of this module when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()