Clean up, remove compat with py < 3.6
[pyi2ncommon] / test / test_zip_stream.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18
19 """ test_zip_stream.py: unit tests for zip_stream
20
21 Test classes and functions in :py:mod:`pyi2ncommon.zip_stream`.
22
23 .. codeauthor:: Intra2net
24 """
25
26 import unittest
27 from tempfile import mkdtemp
28 import os
29 from os.path import join, isfile
30 import shutil
31 import io
32 import zipfile       # just to make sure: for comparing read zips using zipfile
33
34 # relative import of tested module ensures we do not test installed version
35 try:
36     from src.zip_stream import ZipStream
37 except ImportError as ie:
38     # prevent misleading error from unittest loader
39     # AttributeError: 'module' object has no attribute 'test_zip_stream'
40     raise RuntimeError('Failed to import tested module: {}'.format(ie))
41
42
43 # test data
44 TEXT_DATA = 'Test text file\nfor testing pyi2ncommon.zip_stream\n'
45 BIN_DATA = b'Test binary file\nfor testing pyi2ncommon.zip_stream\n' \
46            b'Bytes: \x00\x01\x02\nEnd\n'
47
48 # test file names (all in temp test dir)
49 TEXT_FILE = 'text.txt'
50 BIN_FILE = 'binary.bin'
51 SUBDIR = 'subdir'
52 ZIP_FILE = 'test.zip'
53
54
55 class BytesIONoSeekNorRead(io.BytesIO):
56     """Subclass of :py:class:`io.BytesIO` with seek() and read() removed"""
57     def seekable(self):
58         return False
59
60     def readable(self):
61         return False
62
63     def seek(self, *args):
64         raise AttributeError('this function was removed')
65
66     def readline(self, *args):
67         raise AttributeError('this function was removed')
68
69     def readlines(self, *args):
70         raise AttributeError('this function was removed')
71
72     def read(self, *args):
73         raise AttributeError('this function was removed')
74
75     def readinto(self, *args):
76         raise AttributeError('this function was removed')
77
78     def readall(self, *args):
79         raise AttributeError('this function was removed')
80
81     def read1(self, *args):
82         raise AttributeError('this function was removed')
83
84
85 class ZipStreamTester(unittest.TestCase):
86     """ only test case in this module, see module doc for more help """
87
88     #: directory used for all files; managed in setUpClass / tearDownClass
89     temp_dir = None
90
91     @classmethod
92     def temp_path(cls, *path_components):
93         """Quick alias to create file name in temp dir"""
94         return join(cls.temp_dir, *path_components)
95
96     @classmethod
97     def setUpClass(cls):
98         """
99         called once before tests in this class
100
101         creates temp dir with a few files for zipping in it
102         """
103         cls.temp_dir = mkdtemp(prefix='pyi2ncommon-test-zip-stream-')
104         with open(cls.temp_path(TEXT_FILE), 'wt') as writer:
105             writer.write(TEXT_DATA)
106         with open(cls.temp_path(BIN_FILE), 'wb') as writer:
107             writer.write(BIN_DATA)
108         os.mkdir(cls.temp_path(SUBDIR))
109
110     @classmethod
111     def tearDownClass(cls):
112         """called once when all tests in this class are done. rm temp_dir"""
113         shutil.rmtree(cls.temp_dir)
114
115     def tearDown(self):
116         """ called after each test function """
117         if isfile(self.temp_path(ZIP_FILE)):
118             os.unlink(self.temp_path(ZIP_FILE))
119
120     def _test_zip(self, test_subdir=False):
121         """Helper for test_* functions: check given zip for contents"""
122         expect_contents = {TEXT_FILE, BIN_FILE}
123         if test_subdir:
124             expect_contents.add(SUBDIR + '/')
125
126         with zipfile.ZipFile(self.temp_path(ZIP_FILE), 'r') as unzipper:
127             self.assertEqual(unzipper.testzip(), None)
128             self.assertSetEqual(set(unzipper.namelist()), expect_contents)
129             with unzipper.open(TEXT_FILE) as reader:
130                 self.assertEqual(reader.read().decode('utf8'), TEXT_DATA)
131             with unzipper.open(BIN_FILE) as reader:
132                 self.assertEqual(reader.read(), BIN_DATA)
133
134     def test_base_class(self):
135         """Check that we did not destroy ZipStream's base class's behaviour"""
136         # create zip archive using functions in ZipFile
137         with ZipStream(self.temp_path(ZIP_FILE), 'w') as zipper:
138             zipper.write(self.temp_path(TEXT_FILE), TEXT_FILE)
139             zipper.writestr(BIN_FILE, BIN_DATA)
140             zipper.write(self.temp_path(SUBDIR), SUBDIR)
141         self._test_zip(test_subdir=True)
142
143     def test_stream_input(self):
144         """Test function write_stream we added; write to file"""
145         # create zip archive using write_stream
146         with ZipStream(self.temp_path(ZIP_FILE), 'w') as zipper:
147             info = zipper.create_zipinfo(self.temp_path(BIN_FILE), BIN_FILE)
148             with open(self.temp_path(BIN_FILE), 'rb') as reader:
149                 zipper.write_stream(reader, info)
150             info = zipper.create_zipinfo(self.temp_path(TEXT_FILE), TEXT_FILE)
151             with open(self.temp_path(TEXT_FILE), 'rb') as reader:  # read byte!
152                 zipper.write_stream(reader, info)
153         self._test_zip()
154
155     def test_stream_output(self):
156         """Test writing to an output stream like sys.stdout"""
157         # create zip archive that writes to buffer
158         output = BytesIONoSeekNorRead()
159         with ZipStream(output, 'w') as zipper:
160             info = zipper.create_zipinfo(self.temp_path(BIN_FILE), BIN_FILE)
161             with open(self.temp_path(BIN_FILE), 'rb') as reader:
162                 zipper.write_stream(reader, info)
163             info = zipper.create_zipinfo(self.temp_path(TEXT_FILE), TEXT_FILE)
164             with open(self.temp_path(TEXT_FILE), 'rb') as reader:  # read byte!
165                 zipper.write_stream(reader, info)
166
167         # now write to file and test
168         with open(self.temp_path(ZIP_FILE), 'wb') as writer:
169             writer.write(output.getvalue())
170         self._test_zip()
171
172
173 if __name__ == '__main__':
174     unittest.main()