Commit | Line | Data |
---|---|---|
d31714a0 SA |
1 | # The software in this package is distributed under the GNU General |
2 | # Public License version 2 (with a special exception described below). | |
3 | # | |
4 | # A copy of GNU General Public License (GPL) is included in this distribution, | |
5 | # in the file COPYING.GPL. | |
6 | # | |
7 | # As a special exception, if other files instantiate templates or use macros | |
8 | # or inline functions from this file, or you compile this file and link it | |
9 | # with other works to produce a work based on this file, this file | |
10 | # does not by itself cause the resulting work to be covered | |
11 | # by the GNU General Public License. | |
12 | # | |
13 | # However the source code for this file must still be made available | |
14 | # in accordance with section (3) of the GNU General Public License. | |
15 | # | |
16 | # This exception does not invalidate any other reasons why a work based | |
17 | # on this file might be covered by the GNU General Public License. | |
18 | # | |
19 | # Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com> | |
20 | ||
21 | """ | |
22 | test_model.py: unit tests for cnfvar/model.py. | |
23 | ||
24 | Tests classes and functions in cnfvar/model.py | |
25 | ||
26 | For help see :py:mod:`unittest` | |
27 | """ | |
28 | ||
29 | import unittest | |
30 | import os | |
31 | import shutil | |
32 | from textwrap import dedent | |
33 | from tempfile import NamedTemporaryFile | |
34 | from unittest.mock import patch | |
35 | ||
36 | from src.cnfvar import binary, CnfList, CnfStore, BinaryCnfStore | |
37 | from src.arnied_api import Arnied, GetCnfRet | |
38 | ||
39 | ||
# Path of the "cnfs.cnf" file located in the same directory as this test
# module. The module path is made absolute first so that the result does not
# depend on the current working directory when ``__file__`` is relative.
TEST_CNF_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cnfs.cnf")
41 | ||
42 | ||
class TestStore(unittest.TestCase):
    """Test querying, committing and deleting cnfvars using the stores."""

    def test_querying_and_updating(self):
        """Test querying and updating cnf variables."""
        # To avoid creating long dummy objects, we can convert our test file
        # to the arnied API structure using our API itself
        fake_output = CnfList.from_cnf_file(TEST_CNF_FILE).to_api_structure()
        fake_ret = GetCnfRet(vars=fake_output)

        store = CnfStore()
        with patch.object(Arnied, "get_cnf", return_value=fake_ret),\
                patch.object(Arnied, "set_commit_cnf") as set_cnf_mock,\
                patch.object(store, "_wait_for_generate"):
            self._query_and_update(store)

        # Fail with a readable message if nothing was committed; otherwise
        # call_args is None and the unpacking below raises a bare TypeError.
        set_cnf_mock.assert_called()
        _, kwargs = set_cnf_mock.call_args
        cnf_api_input = kwargs["vars"]
        cnf_input = CnfList.from_api_structure(cnf_api_input)

        # NOTE(review): the comparison below is whitespace-sensitive — confirm
        # the column spacing of this literal against what str(CnfList) emits.
        expected_input = dedent("""\
            1 NIC,0: ""
            2 (1) NIC_COMMENT,0: "b0"
            3 (1) NIC_DRIVER,0: "virtio_net"
            4 (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1"
            5 (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0"
            6 (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99"
            7 (1) NIC_LAN_IP,0: "192.168.1.1"
            8 (1) NIC_LAN_NAT_INTO,0: "0"
            9 (1) NIC_LAN_NETMASK,0: "255.255.255.0"
            10 (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1"
            11 (1) NIC_MAC,0: "02:00:00:00:01:01"
            12 (1) NIC_TYPE,0: "NATLAN"
            13 (1) NIC_LAN_PROXY_PROFILE_REF,1: "2"
            14 NIC,1: ""
            15 (14) NIC_COMMENT,0: "b18"
            16 (14) NIC_DRIVER,0: "virtio_net"
            17 (14) NIC_MAC,0: "02:00:00:00:01:02"
            18 (14) NIC_TYPE,0: "DSLROUTER"
            """)

        self.assertEqual(str(cnf_input).splitlines(), expected_input.splitlines())

    def test_correct_parsing_of_nested_cnf_from_string(self):
        """
        Test that grandchildren cnfvars survive parsing from a string.

        They must not be disregarded when no other child cnfvar follows them.
        """
        # named cnf_text rather than "input" to avoid shadowing the builtin
        cnf_text = dedent("""\
            1 FIREWALL_NETGROUP,99: "QA host IP"
            2 (1) FIREWALL_NETGROUP_NETWORK,0: ""
            3 (2) FIREWALL_NETGROUP_NETWORK_IP,0: "192.168.1.254"
            4 (2) FIREWALL_NETGROUP_NETWORK_NETMASK,0: "255.255.255.255"
            """)

        cnf = CnfList.from_cnf_string(cnf_text)

        # a parse/serialize round trip must give back the very same lines
        self.assertEqual(str(cnf).splitlines(), cnf_text.splitlines())

    def test_deleting_cnfvars(self):
        """Test that we can delete top-level cnfvars."""
        # To avoid creating long dummy objects, we can convert our test file
        # to the arnied API structure using our API itself
        fake_output = CnfList.from_cnf_file(TEST_CNF_FILE).to_api_structure()
        fake_ret = GetCnfRet(vars=fake_output)

        store = CnfStore()
        # get_cnf stays patched while deleting as well, so that this test can
        # never end up talking to a real arnied.
        with patch.object(Arnied, "get_cnf", return_value=fake_ret),\
                patch.object(Arnied, "set_commit_cnf") as set_cnf_mock,\
                patch.object(store, "_wait_for_generate"):
            cnfvars = store.query().where(lambda c: c.name == "THEME")
            store.delete(cnfvars)

        # Fail with a readable message if nothing was committed; otherwise
        # call_args is None and the unpacking below raises a bare TypeError.
        set_cnf_mock.assert_called()
        _, kwargs = set_cnf_mock.call_args
        cnf_api_input = kwargs["vars"]
        cnf_input = CnfList.from_api_structure(cnf_api_input)

        expected_input = dedent("""\
            1 THEME,0: "Intra2net System"
            2 THEME,1: "Xerberus"
            """)

        # every variable handed to the API must carry the deleted marker
        self.assertEqual([c.deleted for c in cnf_api_input],
                         [True] * len(cnfvars))
        self.assertEqual(str(cnf_input).splitlines(), expected_input.splitlines())

    def _query_and_update(self, store):
        """
        Query the items, update a few variables and commit the changes.

        :param store: the cnf store to use

        The changes done were unified in this method so that we test the same
        thing with different stores.
        """
        # NOTE(review): names are given in lower case here although the
        # expected output is upper case — presumably name matching is
        # case-insensitive; verify against the cnfvar model.
        nics = store.query().where(lambda c: c.name == "nic")

        self.assertEqual(len(nics), 4)

        # test querying by instance and child
        nic_0 = nics.single_with_instance(0)
        nic_1 = nics.where_child(
            lambda c: c.name == "nic_comment" and c.value == "b1").single()

        # test treating cnfvars as flags
        self.assertFalse(nic_0.child_flag_enabled("NIC_LAN_DNS_RELAYING_ALLOWED"))
        nic_0.enable_child_flag("nic_lan_dns_relaying_allowed")
        self.assertTrue(nic_0.child_flag_enabled("NIC_LAN_DNS_RELAYING_ALLOWED"))

        # test adding comments
        nic_0.children.single_with_name("nic_comment").comment = "my cool nic"
        nic_1.children.first_with_name("NIC_TYPE").comment = "a dsl router"

        # test adding a reference from another cnfvar
        proxy_profile = store.query().with_name("PROXY_PROFILE").first_with_instance(2)
        nic_0.add_child("nic_lan_proxy_profile_ref", proxy_profile.instance, -1)

        # testing changing the value
        nic_1.children.first(lambda c: c.name == "nic_comment").value = "b18"
        store.commit(CnfList([nic_0, nic_1], renumber=True))
165 | ||
166 | ||
class TestBinaryStore(TestStore):
    """Test querying, committing and deleting cnfvars using the binary store."""

    def setUp(self):
        """
        Set up mocks and replacements for the real cnf binaries.

        Every temporary file and every patch is registered with
        ``addCleanup`` right after it was created, so that they are also
        released when this method fails halfway (``tearDown`` would not run
        in that case).
        """
        # plain files through which the fake binaries exchange data with us
        self._get_cnf_output = self._make_temp_file()
        self._set_cnf_input = self._make_temp_file()

        # fake get_cnf: print whatever the test stored in _get_cnf_output
        self._fake_get_cnf = self._make_temp_file(dedent(f"""\
            #!/bin/bash
            cat "{self._get_cnf_output}"
            """))

        # fake set_cnf: record the arguments on the first line of
        # _set_cnf_input, followed by everything received on stdin
        self._fake_set_cnf = self._make_temp_file(dedent(f"""\
            #!/bin/bash
            input=$(</dev/stdin)
            echo $@ > "{self._set_cnf_input}"
            echo "$input" >> "{self._set_cnf_input}"
            """))

        get_cnf_mock = patch.object(binary, "BIN_GET_CNF", self._fake_get_cnf)
        self._get_cnf_mock = (get_cnf_mock, get_cnf_mock.start())
        self.addCleanup(get_cnf_mock.stop)

        set_cnf_mock = patch.object(binary, "BIN_SET_CNF", self._fake_set_cnf)
        self._set_cnf_mock = (set_cnf_mock, set_cnf_mock.start())
        self.addCleanup(set_cnf_mock.stop)

        self._arnied_mock = patch.object(BinaryCnfStore, "_call_arnied")
        self._arnied_mock.start()
        self.addCleanup(self._arnied_mock.stop)

    def _make_temp_file(self, script=None):
        """
        Create a temporary file that is removed again when the test ends.

        :param script: if given, text written to the file, which is then
                       also made executable
        :type script: str or None
        :returns: path of the created file
        :rtype: str
        """
        with NamedTemporaryFile(delete=False, mode="w") as tmp:
            # register the removal first so the file cannot leak if the
            # write or the chmod below fails
            self.addCleanup(os.unlink, tmp.name)
            if script is not None:
                tmp.write(script)
                os.chmod(tmp.name, 0o775)  # make file executable
        return tmp.name

    def _read_set_cnf_call(self):
        """
        Read back what the fake set_cnf recorded.

        :returns: the first line of the record file (the command line
                  arguments) and the remainder (the data sent to stdin)
        :rtype: (str, str)
        """
        with open(self._set_cnf_input, "r") as f:
            return f.readline(), f.read()

    def test_querying_and_updating(self):
        """Test querying and updating cnf variables."""
        # tell our fake get_cnf to return the whole file
        shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output)

        store = BinaryCnfStore()

        self._query_and_update(store)

        # NOTE(review): the comparison below is whitespace-sensitive — confirm
        # the column spacing of this literal against what set_cnf receives.
        expected_input = dedent("""\
            1 NIC,0: ""
            2 (1) NIC_COMMENT,0: "b0" # my cool nic
            3 (1) NIC_DRIVER,0: "virtio_net"
            4 (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1"
            5 (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0"
            6 (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99"
            7 (1) NIC_LAN_IP,0: "192.168.1.1"
            8 (1) NIC_LAN_NAT_INTO,0: "0"
            9 (1) NIC_LAN_NETMASK,0: "255.255.255.0"
            10 (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1"
            11 (1) NIC_MAC,0: "02:00:00:00:01:01"
            12 (1) NIC_TYPE,0: "NATLAN"
            13 (1) NIC_LAN_PROXY_PROFILE_REF,1: "2"
            14 NIC,1: ""
            15 (14) NIC_COMMENT,0: "b18"
            16 (14) NIC_DRIVER,0: "virtio_net"
            17 (14) NIC_MAC,0: "02:00:00:00:01:02"
            18 (14) NIC_TYPE,0: "DSLROUTER" # a dsl router
            """)

        # the recorded command line arguments are not checked in this test
        _, set_cnf_input = self._read_set_cnf_call()

        self.assertEqual(set_cnf_input.splitlines(), expected_input.splitlines())

    def test_deleting_cnfvars(self):
        """Test that we can delete top-level cnfvars."""
        # tell our fake get_cnf to return the whole file
        shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output)

        store = BinaryCnfStore()
        cnfvars = store.query().where(lambda c: c.name == "THEME")

        store.delete(cnfvars)

        set_cnf_args, set_cnf_input = self._read_set_cnf_call()

        expected_input = dedent("""\
            1 THEME,0: "Intra2net System"
            2 (1) THEME_ARROW_FILENAME,0: "arrow_intranator.gif"
            3 (1) THEME_CSS_FILENAME,0: "intranator.css"
            4 (1) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-intranator.cnf"
            5 (1) THEME_FAVICON_FILENAME,0: "favicon_intranator.ico"
            6 (1) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_intranator.html"
            7 (1) THEME_LOGIN_FILENAME,0: "login_intranator.gif"
            8 (1) THEME_LOGO_FILENAME,0: "intranator.gif"
            9 (1) THEME_STATISTICS_FILENAME,0: "templates-intranator/arnielizer-config.xml"
            10 THEME,1: "Xerberus"
            11 (10) THEME_ARROW_FILENAME,0: "arrow_xerberus.gif"
            12 (10) THEME_CSS_FILENAME,0: "xerberus.css"
            13 (10) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-xerberus.cnf"
            14 (10) THEME_FAVICON_FILENAME,0: "favicon_xerberus.ico"
            15 (10) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_xerberus.html"
            16 (10) THEME_LOGIN_FILENAME,0: "login_xerberus.gif"
            17 (10) THEME_LOGO_FILENAME,0: "xerberus.gif"
            18 (10) THEME_STATISTICS_FILENAME,0: "templates-xerberus/arnielizer-config.xml"
            """)

        # the store is expected to pass "-x" to set_cnf when deleting
        self.assertIn("-x", set_cnf_args)
        self.assertEqual(set_cnf_input.splitlines(), expected_input.splitlines())
291 | ||
292 | ||
# Run all tests of this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()