self.cmp = self.zlib.decompressobj(-self.zlib.MAX_WBITS)
self.dbuf = ""
+ # if aes, we decrypt before the compression
+ if self.enctype == 'aes':
+ self.encryption = aescrypto.AESCrypt(self.password)
+ self.encryption.get_salt(self.fileobj)
+ self.encryption.init()
+
# taken from gzip.GzipFile with some alterations
read2 = self.__read(2)
if read2 != "\037\213":
c = len(self.buf)
t = [self.buf]
while c < size:
- buf = self.fileobj.read(self.bufsize)
+ buf = self.__dec_read(self.bufsize)
if not buf:
break
t.append(buf)
t = "".join(t)
self.buf = t[size:]
return t[:size]
+
+ def __dec_read(self, size):
+ buf = self.fileobj.read(size)
+ if self.enctype == 'aes':
+ buf = self.__split_enc_file(buf)
+ return buf
+
+ def __split_enc_file(self, buf):
+ try:
+ idx = buf.index('Salted__')
+ except ValueError:
+ buf = self.encryption.decrypt(buf)
+ else:
+ b1 = buf[:idx]
+ b2 = buf[idx:]
+ buf = self.encryption.decrypt(b1, True)
+ self.encryption.get_salt_str(b2)
+ self.encryption.init()
+ b2 = b2[len(self.encryption.salt_str):]
+ buf += self.__split_enc_file(b2)
+ return buf
# class _Stream
class _StreamProxy(object):
for key, value in hash.iteritems():
assert os.path.exists(key)
assert value == self.md5sum(key)
+
+ def test_decrypt(self):
+ """
+ Create a tar file with only one file inside, using concat
+ compression and encryption mode. Then decrypt it.
+ """
+
+ # create the content of the file to compress and hash it
+ hash = self.create_file("big", 50000)
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar.gz.aes",
+ mode="w#gz.aes",
+ format=GNU_FORMAT,
+ concat_compression=True,
+ password='key')
+ tarobj.add("big")
+ tarobj.close()
+ os.unlink("big")
+
+ tarobj = TarFile.open("sample.tar.gz.aes",
+ mode="r#gz.aes",
+ format=GNU_FORMAT,
+ password='key')
+ tarobj.extractall()
+ tarobj.close()
+ assert os.path.exists("big")
+ assert hash == self.md5sum("big")
+
+
+ def test_multiple_file_decrypt(self):
+ """
+ Create a tar file with only one file inside, using concat
+ compression and encryption mode. Then decrypt it.
+ """
+
+ # create sample data
+ hash = dict()
+ hash["big"] = self.create_file("big", 50000)
+ hash["small"] = self.create_file("small", 100)
+ hash["small2"] = self.create_file("small2", 354)
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar.gz.aes",
+ mode="w#gz.aes",
+ format=GNU_FORMAT,
+ concat_compression=True,
+ password='key')
+
+ for k in hash:
+ tarobj.add(k)
+ tarobj.close()
+
+ for k in hash:
+ os.unlink(k)
+
+ tarobj = TarFile.open("sample.tar.gz.aes",
+ mode="r#gz.aes",
+ format=GNU_FORMAT,
+ password='key')
+ tarobj.extractall()
+ tarobj.close()
+
+ for key, value in hash.iteritems():
+ assert os.path.exists(key)
+ assert value == self.md5sum(key)