Add a new cnfvar2 API
[pyi2ncommon] / test / cnfvar / test_store.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com>
20
21 """
22 test_store.py: unit tests for the cnfvar stores.
23
24 Tests the CnfStore and BinaryCnfStore classes imported from the cnfvar package.
25
26 For help see :py:mod:`unittest`
27 """
28
29 import unittest
30 import os
31 import shutil
32 from textwrap import dedent
33 from tempfile import NamedTemporaryFile
34 from unittest.mock import patch
35
36 from src.cnfvar import binary, CnfList, CnfStore, BinaryCnfStore
37 from src.arnied_api import Arnied, GetCnfRet
38
39
# Sample cnf dump located in the same directory as this test module; the
# store tests below use it as the data returned by the (faked) backends.
TEST_CNF_FILE = os.path.join(
    os.path.dirname(__file__),
    "cnfs.cnf",
)
41
42
class TestStore(unittest.TestCase):
    """Check querying, committing and deleting cnfvars through the stores."""

    @staticmethod
    def _fake_get_cnf_ret():
        """
        Build the canned reply handed out by the patched ``Arnied.get_cnf``.

        :returns: a :py:class:`GetCnfRet` wrapping the test cnf file

        To avoid creating long dummy objects, the test file is converted
        to the arnied API structure using our API itself.
        """
        api_vars = CnfList.from_cnf_file(TEST_CNF_FILE).to_api_structure()
        return GetCnfRet(vars=api_vars)

    def test_querying_and_updating(self):
        """Check that queried and modified cnfvars are passed to the commit call."""
        canned_reply = self._fake_get_cnf_ret()
        store = CnfStore()

        with patch.object(Arnied, "get_cnf", return_value=canned_reply):
            with patch.object(Arnied, "set_commit_cnf") as commit_mock:
                with patch.object(store, "_wait_for_generate"):
                    self._query_and_update(store)

        # inspect what was handed over to the patched commit call
        committed_api_vars = commit_mock.call_args.kwargs["vars"]
        committed = CnfList.from_api_structure(committed_api_vars)

        # NOTE: the expected dump carries no comments, unlike the one
        # expected from the binary store
        expected = dedent("""\
            1 NIC,0: ""
            2   (1) NIC_COMMENT,0: "b0"
            3   (1) NIC_DRIVER,0: "virtio_net"
            4   (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1"
            5   (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0"
            6   (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99"
            7   (1) NIC_LAN_IP,0: "192.168.1.1"
            8   (1) NIC_LAN_NAT_INTO,0: "0"
            9   (1) NIC_LAN_NETMASK,0: "255.255.255.0"
            10   (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1"
            11   (1) NIC_MAC,0: "02:00:00:00:01:01"
            12   (1) NIC_TYPE,0: "NATLAN"
            13   (1) NIC_LAN_PROXY_PROFILE_REF,1: "2"
            14 NIC,1: ""
            15   (14) NIC_COMMENT,0: "b18"
            16   (14) NIC_DRIVER,0: "virtio_net"
            17   (14) NIC_MAC,0: "02:00:00:00:01:02"
            18   (14) NIC_TYPE,0: "DSLROUTER"
            """)

        self.assertEqual(str(committed).splitlines(), expected.splitlines())

    def test_deleting_cnfvars(self):
        """Check that top-level cnfvars can be deleted."""
        canned_reply = self._fake_get_cnf_ret()
        store = CnfStore()

        with patch.object(Arnied, "get_cnf", return_value=canned_reply):
            themes = store.query().where(lambda c: c.name == "THEME")

        with patch.object(Arnied, "set_commit_cnf") as commit_mock:
            with patch.object(store, "_wait_for_generate"):
                store.delete(themes)

        sent_api_vars = commit_mock.call_args.kwargs["vars"]
        sent = CnfList.from_api_structure(sent_api_vars)

        expected = dedent("""\
            1 THEME,0: "Intra2net System"
            2 THEME,1: "Xerberus"
            """)

        # every variable passed on must carry the deleted flag
        deleted_flags = [api_var.deleted for api_var in sent_api_vars]
        self.assertEqual(deleted_flags, [True] * len(themes))
        self.assertEqual(str(sent).splitlines(), expected.splitlines())

    def _query_and_update(self, store):
        """
        Query the NICs, modify a couple of them and commit the result.

        :param store: the cnf store to use

        The modifications are kept in this single method so that every
        store type is exercised with exactly the same steps.
        """
        nics = store.query().where(lambda c: c.name == "nic")
        self.assertEqual(len(nics), 4)

        # test querying by instance and child
        def is_b1_comment(child):
            return child.name == "nic_comment" and child.value == "b1"

        nic_0 = nics.single_with_instance(0)
        nic_1 = nics.where_child(is_b1_comment).single()

        # test treating cnfvars as flags
        flag_name = "NIC_LAN_DNS_RELAYING_ALLOWED"
        self.assertFalse(nic_0.child_flag_enabled(flag_name))
        nic_0.enable_child_flag("nic_lan_dns_relaying_allowed")
        self.assertTrue(nic_0.child_flag_enabled(flag_name))

        # test adding comments
        nic_0_comment = nic_0.children.single_with_name("nic_comment")
        nic_0_comment.comment = "my cool nic"
        nic_1_type = nic_1.children.first_with_name("NIC_TYPE")
        nic_1_type.comment = "a dsl router"

        # test adding a reference from another cnfvar
        profiles = store.query().with_name("PROXY_PROFILE")
        proxy_profile = profiles.first_with_instance(2)
        nic_0.add_child("nic_lan_proxy_profile_ref", proxy_profile.instance)

        # test changing the value
        nic_1_comment = nic_1.children.first(lambda c: c.name == "nic_comment")
        nic_1_comment.value = "b18"

        store.commit(CnfList([nic_0, nic_1], renumber=True))
150
151
class TestBinaryStore(TestStore):
    """
    Test querying, committing and deleting cnfvars using the binary store.

    The ``get_cnf`` and ``set_cnf`` binaries are replaced by throw-away bash
    scripts: the fake ``get_cnf`` prints the content of a prepared file and
    the fake ``set_cnf`` records its arguments and its standard input in
    another file which the tests then inspect.
    """

    def setUp(self):
        """
        Set up mocks and replacements for the real cnf binaries.

        Every temporary file and every started patch is registered through
        :py:meth:`addCleanup` right after it is created, so everything is
        undone even if a later step of this method fails (in which case
        unittest would not call ``tearDown``).
        """
        # file printed by the fake get_cnf / file written by the fake set_cnf
        self._get_cnf_output = self._make_temp_file()
        self._set_cnf_input = self._make_temp_file()

        self._fake_get_cnf = self._make_temp_file(dedent(f"""\
            #!/bin/bash
            cat "{self._get_cnf_output}"
            """), executable=True)

        # first line of the record: the arguments, remainder: standard input
        self._fake_set_cnf = self._make_temp_file(dedent(f"""\
            #!/bin/bash
            input=$(</dev/stdin)
            echo $@ > "{self._set_cnf_input}"
            echo "$input" >> "{self._set_cnf_input}"
            """), executable=True)

        self._start_patch(patch.object(binary, "BIN_GET_CNF", self._fake_get_cnf))
        self._start_patch(patch.object(binary, "BIN_SET_CNF", self._fake_set_cnf))
        self._start_patch(patch.object(BinaryCnfStore, "_call_arnied"))

    def _make_temp_file(self, content="", executable=False):
        """
        Create a temporary file that is removed again when the test ends.

        :param str content: text to write into the file
        :param bool executable: whether to make the file executable
        :returns: path of the created file
        :rtype: str
        """
        with NamedTemporaryFile(delete=False, mode="w") as tmp:
            # register the removal first so that nothing below can leak it
            self.addCleanup(os.unlink, tmp.name)
            tmp.write(content)
        if executable:
            os.chmod(tmp.name, 0o775)
        return tmp.name

    def _start_patch(self, patcher):
        """
        Start a patcher and make sure it is stopped when the test ends.

        :param patcher: a not yet started patcher as returned by ``patch.object``
        :returns: whatever starting the patcher returns
        """
        started = patcher.start()
        self.addCleanup(patcher.stop)
        return started

    def _read_set_cnf_call(self):
        """
        Read back what the fake set_cnf script recorded.

        :returns: the line holding the command line arguments and the text
                  that was passed on standard input
        :rtype: (str, str)
        """
        with open(self._set_cnf_input, "r") as f:
            set_cnf_args = f.readline()
            set_cnf_input = f.read()
        return set_cnf_args, set_cnf_input

    def test_querying_and_updating(self):
        """Test querying and updating cnf variables."""
        # tell our fake get_cnf to return the whole file
        shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output)

        store = BinaryCnfStore()

        self._query_and_update(store)

        expected_input = dedent("""\
            1 NIC,0: ""
            2   (1) NIC_COMMENT,0: "b0" # my cool nic
            3   (1) NIC_DRIVER,0: "virtio_net"
            4   (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1"
            5   (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0"
            6   (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99"
            7   (1) NIC_LAN_IP,0: "192.168.1.1"
            8   (1) NIC_LAN_NAT_INTO,0: "0"
            9   (1) NIC_LAN_NETMASK,0: "255.255.255.0"
            10   (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1"
            11   (1) NIC_MAC,0: "02:00:00:00:01:01"
            12   (1) NIC_TYPE,0: "NATLAN"
            13   (1) NIC_LAN_PROXY_PROFILE_REF,1: "2"
            14 NIC,1: ""
            15   (14) NIC_COMMENT,0: "b18"
            16   (14) NIC_DRIVER,0: "virtio_net"
            17   (14) NIC_MAC,0: "02:00:00:00:01:02"
            18   (14) NIC_TYPE,0: "DSLROUTER" # a dsl router
            """)

        # only the data passed on standard input is checked here
        _, set_cnf_input = self._read_set_cnf_call()

        self.assertEqual(set_cnf_input.splitlines(), expected_input.splitlines())

    def test_deleting_cnfvars(self):
        """Test that we can delete top-level cnfvars."""
        # tell our fake get_cnf to return the whole file
        shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output)

        store = BinaryCnfStore()
        cnfvars = store.query() \
                       .where(lambda c: c.name == "THEME")

        store.delete(cnfvars)

        set_cnf_args, set_cnf_input = self._read_set_cnf_call()

        expected_input = dedent("""\
            1 THEME,0: "Intra2net System"
            2   (1) THEME_ARROW_FILENAME,0: "arrow_intranator.gif"
            3   (1) THEME_CSS_FILENAME,0: "intranator.css"
            4   (1) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-intranator.cnf"
            5   (1) THEME_FAVICON_FILENAME,0: "favicon_intranator.ico"
            6   (1) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_intranator.html"
            7   (1) THEME_LOGIN_FILENAME,0: "login_intranator.gif"
            8   (1) THEME_LOGO_FILENAME,0: "intranator.gif"
            9   (1) THEME_STATISTICS_FILENAME,0: "templates-intranator/arnielizer-config.xml"
            10 THEME,1: "Xerberus"
            11   (10) THEME_ARROW_FILENAME,0: "arrow_xerberus.gif"
            12   (10) THEME_CSS_FILENAME,0: "xerberus.css"
            13   (10) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-xerberus.cnf"
            14   (10) THEME_FAVICON_FILENAME,0: "favicon_xerberus.ico"
            15   (10) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_xerberus.html"
            16   (10) THEME_LOGIN_FILENAME,0: "login_xerberus.gif"
            17   (10) THEME_LOGO_FILENAME,0: "xerberus.gif"
            18   (10) THEME_STATISTICS_FILENAME,0: "templates-xerberus/arnielizer-config.xml"
            """)

        # the deletion must be requested with the -x switch of set_cnf
        self.assertIn("-x", set_cnf_args)
        self.assertEqual(set_cnf_input.splitlines(), expected_input.splitlines())
276
277
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()