From: Daniel Garcia Moreno Date: Thu, 18 Jul 2013 18:12:47 +0000 (+0200) Subject: Supporing aes128 and aes256 encryption type X-Git-Tag: v2.2~157 X-Git-Url: http://developer.intra2net.com/git/?a=commitdiff_plain;h=169fcbb4199f9fd8cb5cf9f393d5dc0353a2f58c;p=python-delta-tar Supporing aes128 and aes256 encryption type --- diff --git a/deltatar/aescrypto.py b/deltatar/aescrypto.py index ee5d546..973400e 100644 --- a/deltatar/aescrypto.py +++ b/deltatar/aescrypto.py @@ -35,10 +35,10 @@ class AESCrypt: This class provides a simple method to encrypt and decrypt text using AES. ''' - def __init__(self, password, salt=''): + def __init__(self, password, salt='', key_length=16): self.bs = AES.block_size self.mode = AES.MODE_CBC - self.key_length = 16 + self.key_length = key_length self.buf = '' if salt: self.salt = salt diff --git a/deltatar/tarfile.py b/deltatar/tarfile.py index d732c7c..849942e 100644 --- a/deltatar/tarfile.py +++ b/deltatar/tarfile.py @@ -400,7 +400,8 @@ class _Stream: """ def __init__(self, name, mode, comptype, fileobj, bufsize, - concat_stream=False, enctype='', password=""): + concat_stream=False, enctype='', password="", + key_length=16): """Construct a _Stream object. """ self._extfileobj = True @@ -427,6 +428,7 @@ class _Stream: self.internal_pos = 0L self.concat_stream = concat_stream self.enctype = enctype + self.key_length = key_length self.password = password if comptype == "gz": @@ -466,7 +468,8 @@ class _Stream: # if aes, we encrypt after compression if self.enctype == 'aes': - self.encryption = aescrypto.AESCrypt(self.password) + self.encryption = aescrypto.AESCrypt(self.password, + key_length=self.key_length) self.encryption.init() self.__write_to_file(self.encryption.salt_str) @@ -474,8 +477,8 @@ class _Stream: self.__write("\037\213\010\010%s\002\377" % timestamp) if type(self.name) is unicode: self.name = self.name.encode("iso-8859-1", "replace") - if self.name.endswith(".aes"): - self.name = self.name[:-4] + if self.name.endswith(".aes128") or self.name.endswith(".aes256"): + self.name = self.name[:-7] if self.name.endswith(".gz"): self.name = self.name[:-3] self.__write(self.name + NUL) @@ -507,7 +510,8 @@ class _Stream: # if aes, we encrypt after compression if self.enctype == 'aes': self.__write_to_file(self.encryption.close_enc()) - self.encryption = aescrypto.AESCrypt(self.password) + self.encryption = aescrypto.AESCrypt(self.password, + key_length=self.key_length) self.encryption.init() self.__write_to_file(self.encryption.salt_str) @@ -598,7 +602,8 @@ class _Stream: # if aes, we decrypt before the compression if self.enctype == 'aes': - self.encryption = aescrypto.AESCrypt(self.password) + self.encryption = aescrypto.AESCrypt(self.password, + key_length=self.key_length) self.encryption.get_salt(self.fileobj) self.encryption.init() @@ -1841,8 +1846,10 @@ class TarFile(object): 'r#gz' open a stream of gzip compressed tar blocks for reading 'w#gz' open a stream of gzip compressed tar blocks for writing - 'r#gz.aes' open an aes encrypted stream of gzip compressed tar blocks for reading - 'w#gz.aes' open an aes encrypted stream of gzip compressed tar blocks for writing + 'r#gz.aes128' open an aes128 encrypted stream of gzip compressed tar blocks for reading + 'w#gz.aes128' open an aes128 encrypted stream of gzip compressed tar blocks for writing + 'r#gz.aes256' open an aes256 encrypted stream of gzip compressed tar blocks for reading + 'w#gz.aes256' open an aes256 encrypted stream of gzip compressed tar blocks for writing """ if not name and not fileobj: @@ -1895,16 +1902,20 @@ class TarFile(object): password = '' # if not enctype there's no encryption enctype = '' + key_length = 16 if filemode not in "rw": raise ValueError("mode must be 'r' or 'w'") - if comptype not in ["gz", "gz.aes"]: + if comptype not in ["gz", "gz.aes128", "gz.aes256"]: raise ValueError("comptype must be 'gz'") - # encryption gz.aes + # encryption gz.aes128 or gz.aes256 if "." in comptype: comptype, enctype = comptype.split(".", 1) + kl = enctype[3:] + enctype = enctype[:3] + key_length = 16 if kl == '128' else 32 password = kwargs.get('password', '') if not password: raise ValueError("you should give a password for encryption") @@ -1914,7 +1925,7 @@ class TarFile(object): t = cls(name, filemode, _Stream(name, filemode, comptype, fileobj, bufsize, concat_stream=True, enctype=enctype, - password=password), + password=password, key_length=key_length), **kwargs) t._extfileobj = False return t diff --git a/testing/test_encryption.py b/testing/test_encryption.py index 2af0732..03b597a 100644 --- a/testing/test_encryption.py +++ b/testing/test_encryption.py @@ -40,8 +40,8 @@ class EncryptionTest(BaseTest): hash = self.create_file("big", 50000) # create the tar file with volumes - tarobj = TarFile.open("sample.tar.gz.aes", - mode="w#gz.aes", + tarobj = TarFile.open("sample.tar.gz.aes128", + mode="w#gz.aes128", format=GNU_FORMAT, concat_compression=True, password='key') @@ -50,7 +50,7 @@ class EncryptionTest(BaseTest): os.unlink("big") # extract with normal tar and check output - filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes") + filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes128") assert os.path.exists("sample.tar.gz.aes.0") # beginning of the tar file assert os.path.exists("sample.tar.gz.aes.1") # first file @@ -75,8 +75,8 @@ class EncryptionTest(BaseTest): hash["small2"] = self.create_file("small2", 354) # create the tar file with volumes - tarobj = TarFile.open("sample.tar.gz.aes", - mode="w#gz.aes", + tarobj = TarFile.open("sample.tar.gz.aes128", + mode="w#gz.aes128", format=GNU_FORMAT, concat_compression=True, password='key') @@ -89,7 +89,7 @@ class EncryptionTest(BaseTest): os.unlink(k) # extract with normal tar and check output - filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes") + filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes128") assert os.path.exists("sample.tar.gz.aes.0") # beginning of the tar file assert os.path.exists("sample.tar.gz.aes.1") # first file @@ -118,8 +118,8 @@ class EncryptionTest(BaseTest): hash = self.create_file("big", 50000) # create the tar file with volumes - tarobj = TarFile.open("sample.tar.gz.aes", - mode="w#gz.aes", + tarobj = TarFile.open("sample.tar.gz.aes128", + mode="w#gz.aes128", format=GNU_FORMAT, concat_compression=True, password='key') @@ -127,8 +127,8 @@ class EncryptionTest(BaseTest): tarobj.close() os.unlink("big") - tarobj = TarFile.open("sample.tar.gz.aes", - mode="r#gz.aes", + tarobj = TarFile.open("sample.tar.gz.aes128", + mode="r#gz.aes128", format=GNU_FORMAT, password='key') tarobj.extractall() @@ -150,8 +150,8 @@ class EncryptionTest(BaseTest): hash["small2"] = self.create_file("small2", 354) # create the tar file with volumes - tarobj = TarFile.open("sample.tar.gz.aes", - mode="w#gz.aes", + tarobj = TarFile.open("sample.tar.gz.aes128", + mode="w#gz.aes128", format=GNU_FORMAT, concat_compression=True, password='key') @@ -163,8 +163,8 @@ class EncryptionTest(BaseTest): for k in hash: os.unlink(k) - tarobj = TarFile.open("sample.tar.gz.aes", - mode="r#gz.aes", + tarobj = TarFile.open("sample.tar.gz.aes128", + mode="r#gz.aes128", format=GNU_FORMAT, password='key') tarobj.extractall() @@ -187,8 +187,8 @@ class EncryptionTest(BaseTest): hash["small2"] = self.create_file("small2", 354) # create the tar file with volumes - tarobj = TarFile.open("sample.tar.gz.aes", - mode="w#gz.aes", + tarobj = TarFile.open("sample.tar.gz.aes128", + mode="w#gz.aes128", password='key', concat_compression=True, max_volume_size=20000, @@ -198,13 +198,13 @@ class EncryptionTest(BaseTest): tarobj.add(k) tarobj.close() - assert os.path.exists("sample.tar.gz.aes") + assert os.path.exists("sample.tar.gz.aes128") for k in hash: os.unlink(k) # extract - tarobj = TarFile.open("sample.tar.gz.aes", - mode="r#gz.aes", + tarobj = TarFile.open("sample.tar.gz.aes128", + mode="r#gz.aes128", password="key", new_volume_handler=new_volume_handler) tarobj.extractall() @@ -214,3 +214,78 @@ class EncryptionTest(BaseTest): for key, value in hash.iteritems(): assert os.path.exists(key) assert value == self.md5sum(key) + + def test_multivol_file_decrypt_aes256(self): + ''' + Test multivol tarball with encryption aes256 + ''' + + # create sample data + hash = dict() + hash["big"] = self.create_file("big", 50000) + hash["big2"] = self.create_file("big2", 10200) + hash["small"] = self.create_file("small", 100) + hash["small2"] = self.create_file("small2", 354) + + # create the tar file with volumes + tarobj = TarFile.open("sample.tar.gz.aes256", + mode="w#gz.aes256", + password='key', + concat_compression=True, + max_volume_size=20000, + new_volume_handler=new_volume_handler) + + for k in hash: + tarobj.add(k) + tarobj.close() + + assert os.path.exists("sample.tar.gz.aes256") + for k in hash: + os.unlink(k) + + # extract + tarobj = TarFile.open("sample.tar.gz.aes256", + mode="r#gz.aes256", + password="key", + new_volume_handler=new_volume_handler) + tarobj.extractall() + tarobj.close() + + # check output + for key, value in hash.iteritems(): + assert os.path.exists(key) + assert value == self.md5sum(key) + + def test_openssl_decrypt_256(self): + """ + Create a tar file with only one file inside, using concat + compression and encryption mode. Then decrypt with openssl, + decompress it with zcat and untar it with gnu tar. + + Using aes 256. + """ + + # create the content of the file to compress and hash it + hash = self.create_file("big", 50000) + + # create the tar file with volumes + tarobj = TarFile.open("sample.tar.gz.aes256", + mode="w#gz.aes256", + format=GNU_FORMAT, + concat_compression=True, + password='key') + tarobj.add("big") + tarobj.close() + os.unlink("big") + + # extract with normal tar and check output + filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes256") + + assert os.path.exists("sample.tar.gz.aes.0") # beginning of the tar file + assert os.path.exists("sample.tar.gz.aes.1") # first file + + os.system("openssl aes-256-cbc -nopad -k 'key' -d -in sample.tar.gz.aes.1 -out sample.tar.gz") + os.system("zcat sample.tar.gz 2>/dev/null > sample.tar") + os.system("tar xf sample.tar") + assert os.path.exists("big") + assert hash == self.md5sum("big")