8f3bbbe72587c49e891d492b25dd68a13d7db908
[python-delta-tar] / testing / __init__.py
1 # Copyright (C) 2013 Intra2net AG
2 #
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published
5 # by the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Lesser General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see
15 # <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17
18 import os, unittest, hashlib, string
19 import random
20
21 from deltatar import crypto
22
23 import sys
24
25 BLOCK_SIZE = 8096
26
27 def new_volume_handler(tarobj, base_name, volume_number, encryption=None):
28     '''
29     Handles the new volumes
30     '''
31     volume_path = "%s.%d" % (base_name, volume_number)
32     tarobj.close()
33     tarobj.open_volume(volume_path, encryption=encryption)
34
35 def make_new_encryption_volume_handler(encryption):
36     '''
37     Handles the new volumes using the right crypto context.
38     '''
39     return lambda tarobj, base_name, volume_number: \
40         new_volume_handler (tarobj, base_name, volume_number,
41                             encryption=encryption)
42
43 def closing_new_volume_handler(tarobj, base_name, volume_number):
44     '''
45     Handles the new volumes
46     '''
47     volume_path = "%s.%d" % (base_name, volume_number)
48     tarobj.fileobj.close()
49     tarobj.open_volume(volume_path)
50
51 class BaseTest(unittest.TestCase):
52     """
53     Test concatenated compression in tarfiles
54     """
55
56     def tearDown(self):
57         '''
58         Remove temporal files created by unit tests
59         '''
60         os.system("rm -rf big big2 small small2 sample.* pdtcrypt-object-*.bin")
61
62     def create_file_low_entropy(self, path, length):
63         '''
64         Creates a file with some gibberish inside, returning the md5sum of that
65         file. File path and length are specified as function arguments.
66         '''
67         data = string.ascii_lowercase + string.digits + "\n"
68
69         # determine how often need to repeat data and how much part of data is
70         # left in the end to fill file up to length
71         n_blocks, remainder = divmod(length, len(data))
72         with open(path, 'w') as write_handle:
73             for _ in range(n_blocks):
74                 write_handle.write(data)
75             write_handle.write(data[:remainder])
76         return self.md5sum(path)
77
78
79     def create_file_high_entropy(self, path, length):
80         """
81         Create a file with quality random content to counteract compression.
82         """
83         fd = os.open (path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
84         try:
85             p = 0
86             while p < length:
87                 todo = min (length - p, BLOCK_SIZE)
88                 os.write (fd, os.urandom (todo))
89                 p += todo
90         finally:
91             os.close (fd)
92         assert p == length
93         return self.md5sum (path)
94
95
96     def create_file(self, path, length, random=False):
97         '''
98         Creates a file with some gibberish inside, returning the md5sum of that
99         file. File path and length are specified as function arguments.
100         '''
101         if random is True:
102             return self.create_file_high_entropy (path, length)
103
104         return self.create_file_low_entropy (path, length)
105
106
107     def create_symlink(self, linkname, path):
108         os.symlink(linkname, path)
109         return self.md5sum(path, linkname=linkname)
110
111     def md5sum(self, filename, linkname=None):
112         '''
113         Returns the md5sum of a file specified by its filename/path or, if
114         ``linkname`` is specified, the hash of both paths (for symlinks).
115         '''
116         md5 = hashlib.md5()
117         if linkname is None:
118             with open(filename,'rb') as f:
119                 for chunk in iter(lambda: f.read(128*md5.block_size), b''):
120                     md5.update(chunk)
121         else: # symlink; hash paths
122             md5.update(filename.encode("UTF-8"))
123             md5.update(linkname.encode("UTF-8"))
124         return md5.hexdigest()