Increase version to 1.7.4
[pyi2ncommon] / test / cnfvar / test_store.py
... / ...
CommitLineData
1# The software in this package is distributed under the GNU General
2# Public License version 2 (with a special exception described below).
3#
4# A copy of GNU General Public License (GPL) is included in this distribution,
5# in the file COPYING.GPL.
6#
7# As a special exception, if other files instantiate templates or use macros
8# or inline functions from this file, or you compile this file and link it
9# with other works to produce a work based on this file, this file
10# does not by itself cause the resulting work to be covered
11# by the GNU General Public License.
12#
13# However the source code for this file must still be made available
14# in accordance with section (3) of the GNU General Public License.
15#
16# This exception does not invalidate any other reasons why a work based
17# on this file might be covered by the GNU General Public License.
18#
19# Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com>
20
21"""
test_store.py: unit tests for the cnfvar stores.

Tests the CnfStore and BinaryCnfStore classes imported from cnfvar.
25
26For help see :py:mod:`unittest`
27"""
28
29import unittest
30import os
31import shutil
32from textwrap import dedent
33from tempfile import NamedTemporaryFile
34from unittest.mock import patch
35
36from src.cnfvar import binary, CnfList, CnfStore, BinaryCnfStore
37from src.arnied_api import Arnied, GetCnfRet
38
39
# Path of the cnf fixture file, which sits in the same directory as this module
TEST_CNF_FILE = os.path.join(os.path.split(__file__)[0], "cnfs.cnf")
41
42
class TestStore(unittest.TestCase):
    """Test querying, committing and deleting cnfvars using the stores."""

    @staticmethod
    def _fake_get_cnf_ret():
        """
        Build the return value used for the mocked ``Arnied.get_cnf``.

        To avoid creating long dummy objects, the test cnf file is converted
        to the arnied API structure using our API itself.

        :returns: a ``GetCnfRet`` wrapping the variables of the test file
        """
        fake_output = CnfList.from_cnf_file(TEST_CNF_FILE).to_api_structure()
        return GetCnfRet(vars=fake_output)

    @staticmethod
    def _committed_vars(set_cnf_mock):
        """
        Extract the variables passed to the mocked ``Arnied.set_commit_cnf``.

        :param set_cnf_mock: the mock that replaced ``Arnied.set_commit_cnf``
        :returns: value of the ``vars`` keyword argument of the last call
        """
        # Without a call, call_args is None and unpacking it would only raise
        # an unhelpful TypeError; fail with a clear assertion message instead.
        set_cnf_mock.assert_called()
        _, kwargs = set_cnf_mock.call_args
        return kwargs["vars"]

    def test_querying_and_updating(self):
        """Test querying and updating cnf variables."""
        fake_ret = self._fake_get_cnf_ret()

        store = CnfStore()
        with patch.object(Arnied, "get_cnf", return_value=fake_ret), \
                patch.object(Arnied, "set_commit_cnf") as set_cnf_mock, \
                patch.object(store, "_wait_for_generate"):
            self._query_and_update(store)

        cnf_api_input = self._committed_vars(set_cnf_mock)
        cnf_input = CnfList.from_api_structure(cnf_api_input)

        expected_input = dedent("""\
            1 NIC,0: ""
            2 (1) NIC_COMMENT,0: "b0"
            3 (1) NIC_DRIVER,0: "virtio_net"
            4 (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1"
            5 (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0"
            6 (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99"
            7 (1) NIC_LAN_IP,0: "192.168.1.1"
            8 (1) NIC_LAN_NAT_INTO,0: "0"
            9 (1) NIC_LAN_NETMASK,0: "255.255.255.0"
            10 (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1"
            11 (1) NIC_MAC,0: "02:00:00:00:01:01"
            12 (1) NIC_TYPE,0: "NATLAN"
            13 (1) NIC_LAN_PROXY_PROFILE_REF,1: "2"
            14 NIC,1: ""
            15 (14) NIC_COMMENT,0: "b18"
            16 (14) NIC_DRIVER,0: "virtio_net"
            17 (14) NIC_MAC,0: "02:00:00:00:01:02"
            18 (14) NIC_TYPE,0: "DSLROUTER"
            """)

        self.assertEqual(str(cnf_input).splitlines(), expected_input.splitlines())

    def test_correct_parsing_of_nested_cnf_from_string(self):
        """Test that grandchildren cnfvars aren't disregarded when no other child cnfvar follows them."""
        # named cnf_text rather than "input" so the builtin is not shadowed
        cnf_text = dedent("""\
            1 FIREWALL_NETGROUP,99: "QA host IP"
            2 (1) FIREWALL_NETGROUP_NETWORK,0: ""
            3 (2) FIREWALL_NETGROUP_NETWORK_IP,0: "192.168.1.254"
            4 (2) FIREWALL_NETGROUP_NETWORK_NETMASK,0: "255.255.255.255"
            """)

        cnf = CnfList.from_cnf_string(cnf_text)

        # parsing and printing again must give back the same lines
        self.assertEqual(str(cnf).splitlines(), cnf_text.splitlines())

    def test_deleting_cnfvars(self):
        """Test that we can delete top-level cnfvars."""
        fake_ret = self._fake_get_cnf_ret()

        store = CnfStore()
        with patch.object(Arnied, "get_cnf", return_value=fake_ret):
            cnfvars = store.query().where(lambda c: c.name == "THEME")

        with patch.object(Arnied, "set_commit_cnf") as set_cnf_mock, \
                patch.object(store, "_wait_for_generate"):
            store.delete(cnfvars)

        cnf_api_input = self._committed_vars(set_cnf_mock)
        cnf_input = CnfList.from_api_structure(cnf_api_input)

        expected_input = dedent("""\
            1 THEME,0: "Intra2net System"
            2 THEME,1: "Xerberus"
            """)

        # every variable handed to the API must carry the deleted flag
        self.assertEqual([c.deleted for c in cnf_api_input],
                         [True] * len(cnfvars))
        self.assertEqual(str(cnf_input).splitlines(), expected_input.splitlines())

    def _query_and_update(self, store):
        """
        Query the items, update a few variables and commit the changes.

        :param store: the cnf store to use

        The changes done were unified in this method so that we test the same
        thing with different stores.
        """
        nics = store.query().where(lambda c: c.name == "nic")

        self.assertEqual(len(nics), 4)

        # test querying by instance and child
        nic_0 = nics.single_with_instance(0)
        nic_1 = nics.where_child(lambda c: c.name == "nic_comment" and c.value == "b1").single()

        # test treating cnfvars as flags
        self.assertFalse(nic_0.child_flag_enabled("NIC_LAN_DNS_RELAYING_ALLOWED"))
        nic_0.enable_child_flag("nic_lan_dns_relaying_allowed")
        self.assertTrue(nic_0.child_flag_enabled("NIC_LAN_DNS_RELAYING_ALLOWED"))

        # test adding comments
        nic_0.children.single_with_name("nic_comment").comment = "my cool nic"
        nic_1.children.first_with_name("NIC_TYPE").comment = "a dsl router"

        # test adding a reference from another cnfvar
        proxy_profile = store.query().with_name("PROXY_PROFILE").first_with_instance(2)
        nic_0.add_child("nic_lan_proxy_profile_ref", proxy_profile.instance, -1)

        # test changing the value
        nic_1.children.first(lambda c: c.name == "nic_comment").value = "b18"
        store.commit(CnfList([nic_0, nic_1], renumber=True))
165
166
class TestBinaryStore(TestStore):
    """
    Test querying, committing and deleting cnfvars using the binary store.

    The cnf binaries are replaced by small shell scripts: the fake get_cnf
    prints a prepared file, the fake set_cnf records its command-line
    arguments and its standard input in another file that the tests read back.
    """

    def setUp(self):
        """
        Set up mocks and replacements for the real cnf binaries.

        Every temporary file and every patch is registered with
        ``addCleanup`` as soon as it exists, so that nothing is leaked if a
        later step of the set-up raises (``tearDown`` is not run then).
        """
        # file printed by the fake get_cnf / file written by the fake set_cnf
        self._get_cnf_output = self._make_temp_file()
        self._set_cnf_input = self._make_temp_file()

        self._fake_get_cnf = self._make_temp_file(script=dedent(f"""\
            #!/bin/bash
            cat "{self._get_cnf_output}"
            """))

        # first recorded line: the arguments; remaining lines: standard input
        self._fake_set_cnf = self._make_temp_file(script=dedent(f"""\
            #!/bin/bash
            input=$(</dev/stdin)
            echo $@ > "{self._set_cnf_input}"
            echo "$input" >> "{self._set_cnf_input}"
            """))

        patchers = (
            patch.object(binary, "BIN_GET_CNF", self._fake_get_cnf),
            patch.object(binary, "BIN_SET_CNF", self._fake_set_cnf),
            patch.object(BinaryCnfStore, "_call_arnied"),
        )
        for patcher in patchers:
            patcher.start()
            # cleanups run in reverse order: patches are dropped before the
            # temporary files they point to are removed
            self.addCleanup(patcher.stop)

    def _make_temp_file(self, script=None):
        """
        Create a temporary file that is removed when the test finishes.

        :param script: text to write to the file, which is then also made
                       executable; if None the file is left empty
        :returns: path of the created file
        """
        mode = "w+b" if script is None else "w"
        with NamedTemporaryFile(delete=False, mode=mode) as tmp:
            self.addCleanup(os.unlink, tmp.name)
            if script is not None:
                os.chmod(tmp.name, 0o775)  # make file executable
                tmp.write(script)
        return tmp.name

    def _read_set_cnf_call(self):
        """
        Read what the fake set_cnf recorded about its invocation.

        :returns: tuple of the line holding the command-line arguments and the
                  text that was passed on standard input
        """
        with open(self._set_cnf_input, "r") as recorded:
            return recorded.readline(), recorded.read()

    def test_querying_and_updating(self):
        """Test querying and updating cnf variables."""
        # tell our fake get_cnf to return the whole file
        shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output)

        store = BinaryCnfStore()

        self._query_and_update(store)

        expected_input = dedent("""\
            1 NIC,0: ""
            2 (1) NIC_COMMENT,0: "b0" # my cool nic
            3 (1) NIC_DRIVER,0: "virtio_net"
            4 (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1"
            5 (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0"
            6 (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99"
            7 (1) NIC_LAN_IP,0: "192.168.1.1"
            8 (1) NIC_LAN_NAT_INTO,0: "0"
            9 (1) NIC_LAN_NETMASK,0: "255.255.255.0"
            10 (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1"
            11 (1) NIC_MAC,0: "02:00:00:00:01:01"
            12 (1) NIC_TYPE,0: "NATLAN"
            13 (1) NIC_LAN_PROXY_PROFILE_REF,1: "2"
            14 NIC,1: ""
            15 (14) NIC_COMMENT,0: "b18"
            16 (14) NIC_DRIVER,0: "virtio_net"
            17 (14) NIC_MAC,0: "02:00:00:00:01:02"
            18 (14) NIC_TYPE,0: "DSLROUTER" # a dsl router
            """)

        # only the data sent on standard input is checked here
        _, set_cnf_input = self._read_set_cnf_call()

        self.assertEqual(set_cnf_input.splitlines(), expected_input.splitlines())

    def test_deleting_cnfvars(self):
        """Test that we can delete top-level cnfvars."""
        # tell our fake get_cnf to return the whole file
        shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output)

        store = BinaryCnfStore()
        cnfvars = store.query().where(lambda c: c.name == "THEME")

        store.delete(cnfvars)

        set_cnf_args, set_cnf_input = self._read_set_cnf_call()

        expected_input = dedent("""\
            1 THEME,0: "Intra2net System"
            2 (1) THEME_ARROW_FILENAME,0: "arrow_intranator.gif"
            3 (1) THEME_CSS_FILENAME,0: "intranator.css"
            4 (1) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-intranator.cnf"
            5 (1) THEME_FAVICON_FILENAME,0: "favicon_intranator.ico"
            6 (1) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_intranator.html"
            7 (1) THEME_LOGIN_FILENAME,0: "login_intranator.gif"
            8 (1) THEME_LOGO_FILENAME,0: "intranator.gif"
            9 (1) THEME_STATISTICS_FILENAME,0: "templates-intranator/arnielizer-config.xml"
            10 THEME,1: "Xerberus"
            11 (10) THEME_ARROW_FILENAME,0: "arrow_xerberus.gif"
            12 (10) THEME_CSS_FILENAME,0: "xerberus.css"
            13 (10) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-xerberus.cnf"
            14 (10) THEME_FAVICON_FILENAME,0: "favicon_xerberus.ico"
            15 (10) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_xerberus.html"
            16 (10) THEME_LOGIN_FILENAME,0: "login_xerberus.gif"
            17 (10) THEME_LOGO_FILENAME,0: "xerberus.gif"
            18 (10) THEME_STATISTICS_FILENAME,0: "templates-xerberus/arnielizer-config.xml"
            """)

        # the delete must be requested through the "-x" argument of set_cnf
        self.assertIn("-x", set_cnf_args)
        self.assertEqual(set_cnf_input.splitlines(), expected_input.splitlines())
291
292
# Run the tests of this module when it is executed directly as a script
if __name__ == "__main__":
    unittest.main()