ebaf6c867e07e9580ab24aa055a876f84c6968ed
[python-delta-tar] / testing / __init__.py
1 # Copyright (C) 2013 Intra2net AG
2 #
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published
5 # by the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Lesser General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see
15 # <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17
18 import os, unittest, hashlib, string
19 import random
20
21 from deltatar import crypto
22
23 import sys
24
25 BLOCK_SIZE = 8096
26
27 def new_volume_handler(tarobj, base_name, volume_number, encryption=None):
28     '''
29     Handles the new volumes
30     '''
31     volume_path = "%s.%d" % (base_name, volume_number)
32     tarobj.open_volume(volume_path, encryption=encryption)
33
34 def make_new_encryption_volume_handler(encryption):
35     '''
36     Handles the new volumes using the right crypto context.
37     '''
38     return lambda tarobj, base_name, volume_number: \
39         new_volume_handler (tarobj, base_name, volume_number,
40                             encryption=encryption)
41
42 def closing_new_volume_handler(tarobj, base_name, volume_number):
43     '''
44     Handles the new volumes
45     '''
46     volume_path = "%s.%d" % (base_name, volume_number)
47     tarobj.fileobj.close()
48     tarobj.open_volume(volume_path)
49
50 class BaseTest(unittest.TestCase):
51     """
52     Test concatenated compression in tarfiles
53     """
54
55     def tearDown(self):
56         '''
57         Remove temporal files created by unit tests
58         '''
59         os.system("rm -rf big big2 small small2 sample.* pdtcrypt-object-*.bin")
60
61     def create_file_low_entropy(self, path, length):
62         '''
63         Creates a file with some gibberish inside, returning the md5sum of that
64         file. File path and length are specified as function arguments.
65         '''
66         data = string.ascii_lowercase + string.digits + "\n"
67
68         # determine how often need to repeat data and how much part of data is
69         # left in the end to fill file up to length
70         n_blocks, remainder = divmod(length, len(data))
71         with open(path, 'w') as write_handle:
72             for _ in range(n_blocks):
73                 write_handle.write(data)
74             write_handle.write(data[:remainder])
75         return self.md5sum(path)
76
77
78     def create_file_high_entropy(self, path, length):
79         """
80         Create a file with quality random content to counteract compression.
81         """
82         fd = os.open (path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
83         try:
84             p = 0
85             while p < length:
86                 todo = min (length - p, BLOCK_SIZE)
87                 os.write (fd, os.urandom (todo))
88                 p += todo
89         finally:
90             os.close (fd)
91         assert p == length
92         return self.md5sum (path)
93
94
95     def create_file(self, path, length, random=False):
96         '''
97         Creates a file with some gibberish inside, returning the md5sum of that
98         file. File path and length are specified as function arguments.
99         '''
100         if random is True:
101             return self.create_file_high_entropy (path, length)
102
103         return self.create_file_low_entropy (path, length)
104
105
106     def create_symlink(self, linkname, path):
107         os.symlink(linkname, path)
108         return self.md5sum(path, linkname=linkname)
109
110     def md5sum(self, filename, linkname=None):
111         '''
112         Returns the md5sum of a file specified by its filename/path or, if
113         ``linkname`` is specified, the hash of both paths (for symlinks).
114         '''
115         md5 = hashlib.md5()
116         if linkname is None:
117             with open(filename,'rb') as f:
118                 for chunk in iter(lambda: f.read(128*md5.block_size), b''):
119                     md5.update(chunk)
120         else: # symlink; hash paths
121             md5.update(filename.encode("UTF-8"))
122             md5.update(linkname.encode("UTF-8"))
123         return md5.hexdigest()