Commit | Line | Data |
---|---|---|
d31714a0 SA |
1 | # The software in this package is distributed under the GNU General |
2 | # Public License version 2 (with a special exception described below). | |
3 | # | |
4 | # A copy of GNU General Public License (GPL) is included in this distribution, | |
5 | # in the file COPYING.GPL. | |
6 | # | |
7 | # As a special exception, if other files instantiate templates or use macros | |
8 | # or inline functions from this file, or you compile this file and link it | |
9 | # with other works to produce a work based on this file, this file | |
10 | # does not by itself cause the resulting work to be covered | |
11 | # by the GNU General Public License. | |
12 | # | |
13 | # However the source code for this file must still be made available | |
14 | # in accordance with section (3) of the GNU General Public License. | |
15 | # | |
16 | # This exception does not invalidate any other reasons why a work based | |
17 | # on this file might be covered by the GNU General Public License. | |
18 | # | |
19 | # Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com> | |
20 | ||
21 | """ | |
22 | test_model.py: unit tests for cnfvar/model.py. | |
23 | ||
24 | Tests classes and functions in cnfvar/model.py | |
25 | ||
26 | For help see :py:mod:`unittest` | |
27 | """ | |
28 | ||
29 | import unittest | |
30 | import os | |
31 | import shutil | |
32 | from textwrap import dedent | |
33 | from tempfile import NamedTemporaryFile | |
34 | from unittest.mock import patch | |
35 | ||
36 | from src.cnfvar import binary, CnfList, CnfStore, BinaryCnfStore | |
37 | from src.arnied_api import Arnied, GetCnfRet | |
38 | ||
39 | ||
40 | TEST_CNF_FILE = os.path.join(os.path.dirname(__file__), "cnfs.cnf") | |
41 | ||
42 | ||
43 | class TestStore(unittest.TestCase): | |
44 | """Test querying, commiting and deleting cnfvars using the stores.""" | |
45 | ||
46 | def test_querying_and_updating(self): | |
47 | """Test querying and updating cnf variables.""" | |
48 | # To avoid creating long dummy objects, we can convert our test file | |
49 | # to the arnied API structure using our API itself | |
50 | fake_output = CnfList.from_cnf_file(TEST_CNF_FILE) \ | |
51 | .to_api_structure() | |
52 | fake_ret = GetCnfRet(vars=fake_output) | |
53 | ||
54 | store = CnfStore() | |
55 | with patch.object(Arnied, "get_cnf", return_value=fake_ret),\ | |
56 | patch.object(Arnied, "set_commit_cnf") as set_cnf_mock,\ | |
57 | patch.object(store, "_wait_for_generate"): | |
58 | self._query_and_update(store) | |
59 | ||
d5d2c1d7 CH |
60 | args, kwargs = set_cnf_mock.call_args |
61 | cnf_api_input = kwargs["vars"] | |
d31714a0 SA |
62 | cnf_input = CnfList.from_api_structure(cnf_api_input) |
63 | ||
64 | expected_input = dedent("""\ | |
65 | 1 NIC,0: "" | |
66 | 2 (1) NIC_COMMENT,0: "b0" | |
67 | 3 (1) NIC_DRIVER,0: "virtio_net" | |
68 | 4 (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1" | |
69 | 5 (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0" | |
70 | 6 (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99" | |
71 | 7 (1) NIC_LAN_IP,0: "192.168.1.1" | |
72 | 8 (1) NIC_LAN_NAT_INTO,0: "0" | |
73 | 9 (1) NIC_LAN_NETMASK,0: "255.255.255.0" | |
74 | 10 (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1" | |
75 | 11 (1) NIC_MAC,0: "02:00:00:00:01:01" | |
76 | 12 (1) NIC_TYPE,0: "NATLAN" | |
77 | 13 (1) NIC_LAN_PROXY_PROFILE_REF,1: "2" | |
78 | 14 NIC,1: "" | |
79 | 15 (14) NIC_COMMENT,0: "b18" | |
80 | 16 (14) NIC_DRIVER,0: "virtio_net" | |
81 | 17 (14) NIC_MAC,0: "02:00:00:00:01:02" | |
82 | 18 (14) NIC_TYPE,0: "DSLROUTER" | |
83 | """) | |
84 | ||
85 | self.assertEqual(str(cnf_input).splitlines(), expected_input.splitlines()) | |
86 | ||
87 | def test_deleting_cnfvars(self): | |
88 | """Test that we can delete top-level cnfvars.""" | |
89 | # To avoid creating long dummy objects, we can convert our test file | |
90 | # to the arnied API structure using our API itself | |
91 | fake_output = CnfList.from_cnf_file(TEST_CNF_FILE) \ | |
92 | .to_api_structure() | |
93 | fake_ret = GetCnfRet(vars=fake_output) | |
94 | ||
95 | store = CnfStore() | |
96 | with patch.object(Arnied, "get_cnf", return_value=fake_ret): | |
97 | cnfvars = store.query() \ | |
98 | .where(lambda c: c.name == "THEME") | |
99 | ||
100 | with patch.object(Arnied, "set_commit_cnf") as set_cnf_mock,\ | |
101 | patch.object(store, "_wait_for_generate"): | |
102 | store.delete(cnfvars) | |
103 | ||
d5d2c1d7 CH |
104 | args, kwargs = set_cnf_mock.call_args |
105 | cnf_api_input = kwargs["vars"] | |
d31714a0 SA |
106 | cnf_input = CnfList.from_api_structure(cnf_api_input) |
107 | ||
108 | expected_input = dedent("""\ | |
109 | 1 THEME,0: "Intra2net System" | |
110 | 2 THEME,1: "Xerberus" | |
111 | """) | |
112 | ||
113 | self.assertEqual([c.deleted for c in cnf_api_input], | |
114 | [True] * len(cnfvars)) | |
115 | self.assertEqual(str(cnf_input).splitlines(), expected_input.splitlines()) | |
116 | ||
117 | def _query_and_update(self, store): | |
118 | """ | |
119 | Query the items, update a few variables and commit the changes. | |
120 | ||
121 | :param store: the cnf store to use | |
122 | ||
123 | The changes done were unified in this method so that we test the same | |
124 | thing with different stores. | |
125 | """ | |
126 | nics = store.query() \ | |
127 | .where(lambda c: c.name == "nic") | |
128 | ||
129 | self.assertEqual(len(nics), 4) | |
130 | ||
131 | # test querying by instance and child | |
132 | nic_0 = nics.single_with_instance(0) | |
133 | nic_1 = nics.where_child(lambda c: c.name == "nic_comment" and c.value == "b1").single() | |
134 | ||
135 | # test treating cnfvars as flags | |
136 | self.assertFalse(nic_0.child_flag_enabled("NIC_LAN_DNS_RELAYING_ALLOWED")) | |
137 | nic_0.enable_child_flag("nic_lan_dns_relaying_allowed") | |
138 | self.assertTrue(nic_0.child_flag_enabled("NIC_LAN_DNS_RELAYING_ALLOWED")) | |
139 | ||
140 | # test adding comments | |
141 | nic_0.children.single_with_name("nic_comment").comment = "my cool nic" | |
142 | nic_1.children.first_with_name("NIC_TYPE").comment = "a dsl router" | |
143 | ||
144 | # test adding a reference from another cnfvar | |
145 | proxy_profile = store.query().with_name("PROXY_PROFILE").first_with_instance(2) | |
146 | nic_0.add_child(("nic_lan_proxy_profile_ref"), proxy_profile.instance) | |
147 | ||
148 | # testing changing the value | |
149 | nic_1.children.first(lambda c: c.name == "nic_comment") \ | |
150 | .value = "b18" | |
151 | store.commit(CnfList([nic_0, nic_1], renumber=True)) | |
152 | ||
153 | ||
154 | class TestBinaryStore(TestStore): | |
155 | """Test querying, commiting and deleting cnfvars using the binary store.""" | |
156 | ||
157 | def setUp(self): | |
158 | """Set up mocks and replacements for the real cnf binaries.""" | |
159 | with NamedTemporaryFile(delete=False) as tmp: | |
160 | self._get_cnf_output = tmp.name | |
161 | with NamedTemporaryFile(delete=False) as tmp: | |
162 | self._set_cnf_input = tmp.name | |
163 | ||
164 | with NamedTemporaryFile(delete=False, mode="w") as tmp: | |
165 | self._fake_get_cnf = tmp.name | |
166 | os.chmod(tmp.name, 0o775) # make file executable | |
167 | tmp.write(dedent(f"""\ | |
168 | #!/bin/bash | |
169 | cat "{self._get_cnf_output}" | |
170 | """)) | |
171 | tmp.flush() | |
172 | ||
173 | with NamedTemporaryFile(delete=False, mode="w") as tmp: | |
174 | self._fake_set_cnf = tmp.name | |
175 | os.chmod(tmp.name, 0o775) # make file executable | |
176 | tmp.write(dedent(f"""\ | |
177 | #!/bin/bash | |
178 | input=$(</dev/stdin) | |
179 | echo $@ > "{self._set_cnf_input}" | |
180 | echo "$input" >> "{self._set_cnf_input}" | |
181 | """)) | |
182 | tmp.flush() | |
183 | ||
184 | get_cnf_mock = patch.object(binary, "BIN_GET_CNF", self._fake_get_cnf) | |
185 | self._get_cnf_mock = (get_cnf_mock, get_cnf_mock.start()) | |
186 | ||
187 | set_cnf_mock = patch.object(binary, "BIN_SET_CNF", self._fake_set_cnf) | |
188 | self._set_cnf_mock = (set_cnf_mock, set_cnf_mock.start()) | |
189 | ||
190 | self._arnied_mock = patch.object(BinaryCnfStore, "_call_arnied") | |
191 | self._arnied_mock.start() | |
192 | ||
193 | def tearDown(self) -> None: | |
194 | """Drop mocks and clean up files.""" | |
195 | self._arnied_mock.stop() | |
196 | self._get_cnf_mock[0].stop() | |
197 | self._set_cnf_mock[0].stop() | |
198 | ||
199 | os.unlink(self._get_cnf_output) | |
200 | os.unlink(self._set_cnf_input) | |
201 | os.unlink(self._fake_get_cnf) | |
202 | os.unlink(self._fake_set_cnf) | |
203 | ||
204 | def test_querying_and_updating(self): | |
205 | """Test querying and updating cnf variables.""" | |
206 | # tell our fake get_cnf to return the whole file | |
207 | shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output) | |
208 | ||
209 | store = BinaryCnfStore() | |
210 | ||
211 | self._query_and_update(store) | |
212 | ||
213 | expected_input = dedent("""\ | |
214 | 1 NIC,0: "" | |
215 | 2 (1) NIC_COMMENT,0: "b0" # my cool nic | |
216 | 3 (1) NIC_DRIVER,0: "virtio_net" | |
217 | 4 (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1" | |
218 | 5 (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0" | |
219 | 6 (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99" | |
220 | 7 (1) NIC_LAN_IP,0: "192.168.1.1" | |
221 | 8 (1) NIC_LAN_NAT_INTO,0: "0" | |
222 | 9 (1) NIC_LAN_NETMASK,0: "255.255.255.0" | |
223 | 10 (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1" | |
224 | 11 (1) NIC_MAC,0: "02:00:00:00:01:01" | |
225 | 12 (1) NIC_TYPE,0: "NATLAN" | |
226 | 13 (1) NIC_LAN_PROXY_PROFILE_REF,1: "2" | |
227 | 14 NIC,1: "" | |
228 | 15 (14) NIC_COMMENT,0: "b18" | |
229 | 16 (14) NIC_DRIVER,0: "virtio_net" | |
230 | 17 (14) NIC_MAC,0: "02:00:00:00:01:02" | |
231 | 18 (14) NIC_TYPE,0: "DSLROUTER" # a dsl router | |
232 | """) | |
233 | ||
234 | with open(self._set_cnf_input, "r") as f: | |
235 | set_cnf_args = f.readline() | |
236 | set_cnf_input = f.read() | |
237 | ||
238 | self.assertEqual(set_cnf_input.splitlines(), expected_input.splitlines()) | |
239 | ||
240 | def test_deleting_cnfvars(self): | |
241 | """Test that we can delete top-level cnfvars.""" | |
242 | # tell our fake get_cnf to return the whole file | |
243 | shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output) | |
244 | ||
245 | store = BinaryCnfStore() | |
246 | cnfvars = store.query() \ | |
247 | .where(lambda c: c.name == "THEME") | |
248 | ||
249 | store.delete(cnfvars) | |
250 | ||
251 | with open(self._set_cnf_input, "r") as f: | |
252 | set_cnf_args = f.readline() | |
253 | set_cnf_input = f.read() | |
254 | ||
255 | expected_input = dedent("""\ | |
256 | 1 THEME,0: "Intra2net System" | |
257 | 2 (1) THEME_ARROW_FILENAME,0: "arrow_intranator.gif" | |
258 | 3 (1) THEME_CSS_FILENAME,0: "intranator.css" | |
259 | 4 (1) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-intranator.cnf" | |
260 | 5 (1) THEME_FAVICON_FILENAME,0: "favicon_intranator.ico" | |
261 | 6 (1) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_intranator.html" | |
262 | 7 (1) THEME_LOGIN_FILENAME,0: "login_intranator.gif" | |
263 | 8 (1) THEME_LOGO_FILENAME,0: "intranator.gif" | |
264 | 9 (1) THEME_STATISTICS_FILENAME,0: "templates-intranator/arnielizer-config.xml" | |
265 | 10 THEME,1: "Xerberus" | |
266 | 11 (10) THEME_ARROW_FILENAME,0: "arrow_xerberus.gif" | |
267 | 12 (10) THEME_CSS_FILENAME,0: "xerberus.css" | |
268 | 13 (10) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-xerberus.cnf" | |
269 | 14 (10) THEME_FAVICON_FILENAME,0: "favicon_xerberus.ico" | |
270 | 15 (10) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_xerberus.html" | |
271 | 16 (10) THEME_LOGIN_FILENAME,0: "login_xerberus.gif" | |
272 | 17 (10) THEME_LOGO_FILENAME,0: "xerberus.gif" | |
273 | 18 (10) THEME_STATISTICS_FILENAME,0: "templates-xerberus/arnielizer-config.xml" | |
274 | """) | |
275 | ||
276 | self.assertIn("-x", set_cnf_args) | |
277 | self.assertEqual(set_cnf_input.splitlines(), expected_input.splitlines()) | |
278 | ||
279 | ||
280 | if __name__ == '__main__': | |
281 | unittest.main() |