Revert the default instance number back to zero
[pyi2ncommon] / test / cnfvar / test_store.py
CommitLineData
d31714a0
SA
1# The software in this package is distributed under the GNU General
2# Public License version 2 (with a special exception described below).
3#
4# A copy of GNU General Public License (GPL) is included in this distribution,
5# in the file COPYING.GPL.
6#
7# As a special exception, if other files instantiate templates or use macros
8# or inline functions from this file, or you compile this file and link it
9# with other works to produce a work based on this file, this file
10# does not by itself cause the resulting work to be covered
11# by the GNU General Public License.
12#
13# However the source code for this file must still be made available
14# in accordance with section (3) of the GNU General Public License.
15#
16# This exception does not invalidate any other reasons why a work based
17# on this file might be covered by the GNU General Public License.
18#
19# Copyright (c) 2016-2022 Intra2net AG <info@intra2net.com>
20
21"""
22test_store.py: unit tests for the cnfvar stores.
23
24Tests the CnfStore and BinaryCnfStore classes imported from the cnfvar package
25
26For help see :py:mod:`unittest`
27"""
28
29import unittest
30import os
31import shutil
32from textwrap import dedent
33from tempfile import NamedTemporaryFile
34from unittest.mock import patch
35
36from src.cnfvar import binary, CnfList, CnfStore, BinaryCnfStore
37from src.arnied_api import Arnied, GetCnfRet
38
39
# Sample cnf dump located in the same directory as this test module.
TEST_CNF_FILE = os.path.join(
    os.path.dirname(__file__),
    "cnfs.cnf",
)
41
42
43class TestStore(unittest.TestCase):
44 """Test querying, committing and deleting cnfvars using the stores."""
45
46 def test_querying_and_updating(self):
47 """Test querying and updating cnf variables via CnfStore with arnied mocked."""
48 # To avoid creating long dummy objects, we can convert our test file
49 # to the arnied API structure using our API itself
50 fake_output = CnfList.from_cnf_file(TEST_CNF_FILE) \
51 .to_api_structure()
52 fake_ret = GetCnfRet(vars=fake_output)
53
54 store = CnfStore()
55 with patch.object(Arnied, "get_cnf", return_value=fake_ret),\
56 patch.object(Arnied, "set_commit_cnf") as set_cnf_mock,\
57 patch.object(store, "_wait_for_generate"):
58 self._query_and_update(store)
59
d5d2c1d7
CH
60 args, kwargs = set_cnf_mock.call_args
61 cnf_api_input = kwargs["vars"]
d31714a0
SA
62 cnf_input = CnfList.from_api_structure(cnf_api_input)
63
64 expected_input = dedent("""\
65 1 NIC,0: ""
66 2 (1) NIC_COMMENT,0: "b0"
67 3 (1) NIC_DRIVER,0: "virtio_net"
68 4 (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1"
69 5 (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0"
70 6 (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99"
71 7 (1) NIC_LAN_IP,0: "192.168.1.1"
72 8 (1) NIC_LAN_NAT_INTO,0: "0"
73 9 (1) NIC_LAN_NETMASK,0: "255.255.255.0"
74 10 (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1"
75 11 (1) NIC_MAC,0: "02:00:00:00:01:01"
76 12 (1) NIC_TYPE,0: "NATLAN"
77 13 (1) NIC_LAN_PROXY_PROFILE_REF,1: "2"
78 14 NIC,1: ""
79 15 (14) NIC_COMMENT,0: "b18"
80 16 (14) NIC_DRIVER,0: "virtio_net"
81 17 (14) NIC_MAC,0: "02:00:00:00:01:02"
82 18 (14) NIC_TYPE,0: "DSLROUTER"
83 """)
84
85 self.assertEqual(str(cnf_input).splitlines(), expected_input.splitlines())
86
87 def test_deleting_cnfvars(self):
88 """Test that deleted top-level cnfvars are committed with their deleted flag set."""
89 # To avoid creating long dummy objects, we can convert our test file
90 # to the arnied API structure using our API itself
91 fake_output = CnfList.from_cnf_file(TEST_CNF_FILE) \
92 .to_api_structure()
93 fake_ret = GetCnfRet(vars=fake_output)
94
95 store = CnfStore()
96 with patch.object(Arnied, "get_cnf", return_value=fake_ret):
97 cnfvars = store.query() \
98 .where(lambda c: c.name == "THEME")
99
100 with patch.object(Arnied, "set_commit_cnf") as set_cnf_mock,\
101 patch.object(store, "_wait_for_generate"):
102 store.delete(cnfvars)
103
d5d2c1d7
CH
104 args, kwargs = set_cnf_mock.call_args
105 cnf_api_input = kwargs["vars"]
d31714a0
SA
106 cnf_input = CnfList.from_api_structure(cnf_api_input)
107
108 expected_input = dedent("""\
109 1 THEME,0: "Intra2net System"
110 2 THEME,1: "Xerberus"
111 """)
112
113 self.assertEqual([c.deleted for c in cnf_api_input],
114 [True] * len(cnfvars))
115 self.assertEqual(str(cnf_input).splitlines(), expected_input.splitlines())
116
def _query_and_update(self, store):
    """
    Run the shared query/modify/commit scenario against a store.

    :param store: the cnf store to use

    The store test cases call this helper so that every store is
    exercised with exactly the same sequence of changes.
    """
    def is_nic(var):
        return var.name == "nic"

    def is_nic_comment(var):
        return var.name == "nic_comment"

    def is_b1_comment(var):
        return var.name == "nic_comment" and var.value == "b1"

    all_nics = store.query().where(is_nic)
    self.assertEqual(len(all_nics), 4)

    # select one NIC by its instance number and another one by a child
    first_nic = all_nics.single_with_instance(0)
    second_nic = all_nics.where_child(is_b1_comment).single()

    # child cnfvars can be read and toggled as flags
    dns_flag = "NIC_LAN_DNS_RELAYING_ALLOWED"
    self.assertFalse(first_nic.child_flag_enabled(dns_flag))
    first_nic.enable_child_flag("nic_lan_dns_relaying_allowed")
    self.assertTrue(first_nic.child_flag_enabled(dns_flag))

    # attach comments to children of both NICs
    first_comment = first_nic.children.single_with_name("nic_comment")
    first_comment.comment = "my cool nic"
    second_type = second_nic.children.first_with_name("NIC_TYPE")
    second_type.comment = "a dsl router"

    # add a child that refers to another cnfvar through its instance number
    profiles = store.query().with_name("PROXY_PROFILE")
    proxy_profile = profiles.first_with_instance(2)
    first_nic.add_child("nic_lan_proxy_profile_ref", proxy_profile.instance, -1)

    # overwrite the value of an existing child
    second_comment = second_nic.children.first(is_nic_comment)
    second_comment.value = "b18"

    changed = CnfList([first_nic, second_nic], renumber=True)
    store.commit(changed)
152
153
154class TestBinaryStore(TestStore):
155 """Test querying, committing and deleting cnfvars using the binary store."""
156
def setUp(self):
    """
    Create stand-ins for the cnf binaries and point the store at them.

    Two empty temporary files hold the data exchanged with the fake
    binaries: the fake get_cnf script prints the content of the first one,
    the fake set_cnf script writes its arguments followed by its standard
    input into the second one. ``BIN_GET_CNF`` and ``BIN_SET_CNF`` are
    patched to the fake scripts and ``_call_arnied`` is patched out.
    """
    def reserve_empty_file():
        # only the path is needed, the file itself stays on disk
        with NamedTemporaryFile(delete=False) as handle:
            return handle.name

    def install_script(script_text):
        with NamedTemporaryFile(delete=False, mode="w") as handle:
            os.chmod(handle.name, 0o775)  # make file executable
            handle.write(script_text)
            handle.flush()
            return handle.name

    self._get_cnf_output = reserve_empty_file()
    self._set_cnf_input = reserve_empty_file()

    self._fake_get_cnf = install_script(dedent(f"""\
        #!/bin/bash
        cat "{self._get_cnf_output}"
        """))
    self._fake_set_cnf = install_script(dedent(f"""\
        #!/bin/bash
        input=$(</dev/stdin)
        echo $@ > "{self._set_cnf_input}"
        echo "$input" >> "{self._set_cnf_input}"
        """))

    # keep each patcher next to the value returned by start(); tearDown
    # stops the patcher stored at index 0
    get_patcher = patch.object(binary, "BIN_GET_CNF", self._fake_get_cnf)
    self._get_cnf_mock = (get_patcher, get_patcher.start())

    set_patcher = patch.object(binary, "BIN_SET_CNF", self._fake_set_cnf)
    self._set_cnf_mock = (set_patcher, set_patcher.start())

    arnied_patcher = patch.object(BinaryCnfStore, "_call_arnied")
    self._arnied_mock = arnied_patcher
    arnied_patcher.start()
192
def tearDown(self) -> None:
    """Stop the patchers started in setUp and delete the temporary files."""
    active_patchers = (
        self._arnied_mock,
        self._get_cnf_mock[0],
        self._set_cnf_mock[0],
    )
    for patcher in active_patchers:
        patcher.stop()

    leftover_files = (
        self._get_cnf_output,
        self._set_cnf_input,
        self._fake_get_cnf,
        self._fake_set_cnf,
    )
    for path in leftover_files:
        os.unlink(path)
203
204 def test_querying_and_updating(self):
205 """Test querying and updating cnf variables via the fake cnf binaries."""
206 # tell our fake get_cnf to return the whole file
207 shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output)
208
209 store = BinaryCnfStore()
210
211 self._query_and_update(store)
212
213 expected_input = dedent("""\
214 1 NIC,0: ""
215 2 (1) NIC_COMMENT,0: "b0" # my cool nic
216 3 (1) NIC_DRIVER,0: "virtio_net"
217 4 (1) NIC_LAN_DNS_RELAYING_ALLOWED,0: "1"
218 5 (1) NIC_LAN_EMAIL_RELAYING_ALLOWED,0: "0"
219 6 (1) NIC_LAN_FIREWALL_RULESET_REF,0: "99"
220 7 (1) NIC_LAN_IP,0: "192.168.1.1"
221 8 (1) NIC_LAN_NAT_INTO,0: "0"
222 9 (1) NIC_LAN_NETMASK,0: "255.255.255.0"
223 10 (1) NIC_LAN_PROXY_PROFILE_REF,0: "-1"
224 11 (1) NIC_MAC,0: "02:00:00:00:01:01"
225 12 (1) NIC_TYPE,0: "NATLAN"
226 13 (1) NIC_LAN_PROXY_PROFILE_REF,1: "2"
227 14 NIC,1: ""
228 15 (14) NIC_COMMENT,0: "b18"
229 16 (14) NIC_DRIVER,0: "virtio_net"
230 17 (14) NIC_MAC,0: "02:00:00:00:01:02"
231 18 (14) NIC_TYPE,0: "DSLROUTER" # a dsl router
232 """)
233
234 with open(self._set_cnf_input, "r") as f:
235 set_cnf_args = f.readline()
236 set_cnf_input = f.read()
237
238 self.assertEqual(set_cnf_input.splitlines(), expected_input.splitlines())
239
240 def test_deleting_cnfvars(self):
241 """Test that deleting top-level cnfvars passes -x to the fake set_cnf."""
242 # tell our fake get_cnf to return the whole file
243 shutil.copyfile(TEST_CNF_FILE, self._get_cnf_output)
244
245 store = BinaryCnfStore()
246 cnfvars = store.query() \
247 .where(lambda c: c.name == "THEME")
248
249 store.delete(cnfvars)
250
251 with open(self._set_cnf_input, "r") as f:
252 set_cnf_args = f.readline()
253 set_cnf_input = f.read()
254
255 expected_input = dedent("""\
256 1 THEME,0: "Intra2net System"
257 2 (1) THEME_ARROW_FILENAME,0: "arrow_intranator.gif"
258 3 (1) THEME_CSS_FILENAME,0: "intranator.css"
259 4 (1) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-intranator.cnf"
260 5 (1) THEME_FAVICON_FILENAME,0: "favicon_intranator.ico"
261 6 (1) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_intranator.html"
262 7 (1) THEME_LOGIN_FILENAME,0: "login_intranator.gif"
263 8 (1) THEME_LOGO_FILENAME,0: "intranator.gif"
264 9 (1) THEME_STATISTICS_FILENAME,0: "templates-intranator/arnielizer-config.xml"
265 10 THEME,1: "Xerberus"
266 11 (10) THEME_ARROW_FILENAME,0: "arrow_xerberus.gif"
267 12 (10) THEME_CSS_FILENAME,0: "xerberus.css"
268 13 (10) THEME_DEFAULTVALUES_FILENAME,0: "defaultvalues-xerberus.cnf"
269 14 (10) THEME_FAVICON_FILENAME,0: "favicon_xerberus.ico"
270 15 (10) THEME_LICENSE_AGREEMENT_FILENAME,0: "license_agreement_xerberus.html"
271 16 (10) THEME_LOGIN_FILENAME,0: "login_xerberus.gif"
272 17 (10) THEME_LOGO_FILENAME,0: "xerberus.gif"
273 18 (10) THEME_STATISTICS_FILENAME,0: "templates-xerberus/arnielizer-config.xml"
274 """)
275
276 self.assertIn("-x", set_cnf_args)
277 self.assertEqual(set_cnf_input.splitlines(), expected_input.splitlines())
278
279
if __name__ == "__main__":
    # run the test cases of this module when it is executed as a script
    unittest.main()