Supporing aes128 and aes256 encryption type
authorDaniel Garcia Moreno <danigm@wadobo.com>
Thu, 18 Jul 2013 18:12:47 +0000 (20:12 +0200)
committerDaniel Garcia Moreno <danigm@wadobo.com>
Thu, 18 Jul 2013 18:12:47 +0000 (20:12 +0200)
deltatar/aescrypto.py
deltatar/tarfile.py
testing/test_encryption.py

index ee5d546..973400e 100644 (file)
@@ -35,10 +35,10 @@ class AESCrypt:
     This class provides a simple method to encrypt and decrypt text using
     AES.
     '''
-    def __init__(self, password, salt=''):
+    def __init__(self, password, salt='', key_length=16):
         self.bs = AES.block_size
         self.mode = AES.MODE_CBC
-        self.key_length = 16
+        self.key_length = key_length
         self.buf = ''
         if salt:
             self.salt = salt
index d732c7c..849942e 100644 (file)
@@ -400,7 +400,8 @@ class _Stream:
     """
 
     def __init__(self, name, mode, comptype, fileobj, bufsize,
-                 concat_stream=False, enctype='', password=""):
+                 concat_stream=False, enctype='', password="",
+                 key_length=16):
         """Construct a _Stream object.
         """
         self._extfileobj = True
@@ -427,6 +428,7 @@ class _Stream:
         self.internal_pos = 0L
         self.concat_stream = concat_stream
         self.enctype  = enctype
+        self.key_length = key_length
         self.password = password
 
         if comptype == "gz":
@@ -466,7 +468,8 @@ class _Stream:
 
         # if aes, we encrypt after compression
         if self.enctype == 'aes':
-            self.encryption = aescrypto.AESCrypt(self.password)
+            self.encryption = aescrypto.AESCrypt(self.password,
+                                                 key_length=self.key_length)
             self.encryption.init()
             self.__write_to_file(self.encryption.salt_str)
 
@@ -474,8 +477,8 @@ class _Stream:
         self.__write("\037\213\010\010%s\002\377" % timestamp)
         if type(self.name) is unicode:
             self.name = self.name.encode("iso-8859-1", "replace")
-        if self.name.endswith(".aes"):
-            self.name = self.name[:-4]
+        if self.name.endswith(".aes128") or self.name.endswith(".aes256"):
+            self.name = self.name[:-7]
         if self.name.endswith(".gz"):
             self.name = self.name[:-3]
         self.__write(self.name + NUL)
@@ -507,7 +510,8 @@ class _Stream:
         # if aes, we encrypt after compression
         if self.enctype == 'aes':
             self.__write_to_file(self.encryption.close_enc())
-            self.encryption = aescrypto.AESCrypt(self.password)
+            self.encryption = aescrypto.AESCrypt(self.password,
+                                                 key_length=self.key_length)
             self.encryption.init()
             self.__write_to_file(self.encryption.salt_str)
 
@@ -598,7 +602,8 @@ class _Stream:
 
         # if aes, we decrypt before the compression
         if self.enctype == 'aes':
-            self.encryption = aescrypto.AESCrypt(self.password)
+            self.encryption = aescrypto.AESCrypt(self.password,
+                                                 key_length=self.key_length)
             self.encryption.get_salt(self.fileobj)
             self.encryption.init()
 
@@ -1841,8 +1846,10 @@ class TarFile(object):
            'r#gz'       open a stream of gzip compressed tar blocks for reading
            'w#gz'       open a stream of gzip compressed tar blocks for writing
 
-           'r#gz.aes'   open an aes encrypted stream of gzip compressed tar blocks for reading
-           'w#gz.aes'   open an aes encrypted stream of gzip compressed tar blocks for writing
+           'r#gz.aes128'   open an aes128 encrypted stream of gzip compressed tar blocks for reading
+           'w#gz.aes128'   open an aes128 encrypted stream of gzip compressed tar blocks for writing
+           'r#gz.aes256'   open an aes256 encrypted stream of gzip compressed tar blocks for reading
+           'w#gz.aes256'   open an aes256 encrypted stream of gzip compressed tar blocks for writing
         """
 
         if not name and not fileobj:
@@ -1895,16 +1902,20 @@ class TarFile(object):
             password = ''
             # if not enctype there's no encryption
             enctype = ''
+            key_length = 16
 
             if filemode not in "rw":
                 raise ValueError("mode must be 'r' or 'w'")
 
-            if comptype not in ["gz", "gz.aes"]:
+            if comptype not in ["gz", "gz.aes128", "gz.aes256"]:
                 raise ValueError("comptype must be 'gz'")
 
-            # encryption gz.aes
+            # encryption gz.aes128 or gz.aes256
             if "." in comptype:
                 comptype, enctype = comptype.split(".", 1)
+                kl = enctype[3:]
+                enctype = enctype[:3]
+                key_length = 16 if kl == '128' else 32
                 password = kwargs.get('password', '')
                 if not password:
                     raise ValueError("you should give a password for encryption")
@@ -1914,7 +1925,7 @@ class TarFile(object):
             t = cls(name, filemode,
                     _Stream(name, filemode, comptype, fileobj, bufsize,
                             concat_stream=True, enctype=enctype,
-                            password=password),
+                            password=password, key_length=key_length),
                     **kwargs)
             t._extfileobj = False
             return t
index 2af0732..03b597a 100644 (file)
@@ -40,8 +40,8 @@ class EncryptionTest(BaseTest):
         hash = self.create_file("big", 50000)
 
         # create the tar file with volumes
-        tarobj = TarFile.open("sample.tar.gz.aes",
-                              mode="w#gz.aes",
+        tarobj = TarFile.open("sample.tar.gz.aes128",
+                              mode="w#gz.aes128",
                               format=GNU_FORMAT,
                               concat_compression=True,
                               password='key')
@@ -50,7 +50,7 @@ class EncryptionTest(BaseTest):
         os.unlink("big")
 
         # extract with normal tar and check output
-        filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes")
+        filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes128")
 
         assert os.path.exists("sample.tar.gz.aes.0") # beginning of the tar file
         assert os.path.exists("sample.tar.gz.aes.1") # first file
@@ -75,8 +75,8 @@ class EncryptionTest(BaseTest):
         hash["small2"] = self.create_file("small2", 354)
 
         # create the tar file with volumes
-        tarobj = TarFile.open("sample.tar.gz.aes",
-                              mode="w#gz.aes",
+        tarobj = TarFile.open("sample.tar.gz.aes128",
+                              mode="w#gz.aes128",
                               format=GNU_FORMAT,
                               concat_compression=True,
                               password='key')
@@ -89,7 +89,7 @@ class EncryptionTest(BaseTest):
             os.unlink(k)
 
         # extract with normal tar and check output
-        filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes")
+        filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes128")
 
         assert os.path.exists("sample.tar.gz.aes.0") # beginning of the tar file
         assert os.path.exists("sample.tar.gz.aes.1") # first file
@@ -118,8 +118,8 @@ class EncryptionTest(BaseTest):
         hash = self.create_file("big", 50000)
 
         # create the tar file with volumes
-        tarobj = TarFile.open("sample.tar.gz.aes",
-                              mode="w#gz.aes",
+        tarobj = TarFile.open("sample.tar.gz.aes128",
+                              mode="w#gz.aes128",
                               format=GNU_FORMAT,
                               concat_compression=True,
                               password='key')
@@ -127,8 +127,8 @@ class EncryptionTest(BaseTest):
         tarobj.close()
         os.unlink("big")
 
-        tarobj = TarFile.open("sample.tar.gz.aes",
-                              mode="r#gz.aes",
+        tarobj = TarFile.open("sample.tar.gz.aes128",
+                              mode="r#gz.aes128",
                               format=GNU_FORMAT,
                               password='key')
         tarobj.extractall()
@@ -150,8 +150,8 @@ class EncryptionTest(BaseTest):
         hash["small2"] = self.create_file("small2", 354)
 
         # create the tar file with volumes
-        tarobj = TarFile.open("sample.tar.gz.aes",
-                              mode="w#gz.aes",
+        tarobj = TarFile.open("sample.tar.gz.aes128",
+                              mode="w#gz.aes128",
                               format=GNU_FORMAT,
                               concat_compression=True,
                               password='key')
@@ -163,8 +163,8 @@ class EncryptionTest(BaseTest):
         for k in hash:
             os.unlink(k)
 
-        tarobj = TarFile.open("sample.tar.gz.aes",
-                              mode="r#gz.aes",
+        tarobj = TarFile.open("sample.tar.gz.aes128",
+                              mode="r#gz.aes128",
                               format=GNU_FORMAT,
                               password='key')
         tarobj.extractall()
@@ -187,8 +187,8 @@ class EncryptionTest(BaseTest):
         hash["small2"] = self.create_file("small2", 354)
 
         # create the tar file with volumes
-        tarobj = TarFile.open("sample.tar.gz.aes",
-                              mode="w#gz.aes",
+        tarobj = TarFile.open("sample.tar.gz.aes128",
+                              mode="w#gz.aes128",
                               password='key',
                               concat_compression=True,
                               max_volume_size=20000,
@@ -198,13 +198,13 @@ class EncryptionTest(BaseTest):
             tarobj.add(k)
         tarobj.close()
 
-        assert os.path.exists("sample.tar.gz.aes")
+        assert os.path.exists("sample.tar.gz.aes128")
         for k in hash:
             os.unlink(k)
 
         # extract
-        tarobj = TarFile.open("sample.tar.gz.aes",
-                              mode="r#gz.aes",
+        tarobj = TarFile.open("sample.tar.gz.aes128",
+                              mode="r#gz.aes128",
                               password="key",
                               new_volume_handler=new_volume_handler)
         tarobj.extractall()
@@ -214,3 +214,78 @@ class EncryptionTest(BaseTest):
         for key, value in hash.iteritems():
             assert os.path.exists(key)
             assert value == self.md5sum(key)
+
+    def test_multivol_file_decrypt_aes256(self):
+        '''
+        Test multivol tarball with encryption aes256
+        '''
+
+        # create sample data
+        hash = dict()
+        hash["big"] = self.create_file("big", 50000)
+        hash["big2"] = self.create_file("big2", 10200)
+        hash["small"] = self.create_file("small", 100)
+        hash["small2"] = self.create_file("small2", 354)
+
+        # create the tar file with volumes
+        tarobj = TarFile.open("sample.tar.gz.aes256",
+                              mode="w#gz.aes256",
+                              password='key',
+                              concat_compression=True,
+                              max_volume_size=20000,
+                              new_volume_handler=new_volume_handler)
+
+        for k in hash:
+            tarobj.add(k)
+        tarobj.close()
+
+        assert os.path.exists("sample.tar.gz.aes256")
+        for k in hash:
+            os.unlink(k)
+
+        # extract
+        tarobj = TarFile.open("sample.tar.gz.aes256",
+                              mode="r#gz.aes256",
+                              password="key",
+                              new_volume_handler=new_volume_handler)
+        tarobj.extractall()
+        tarobj.close()
+
+        # check output
+        for key, value in hash.iteritems():
+            assert os.path.exists(key)
+            assert value == self.md5sum(key)
+
+    def test_openssl_decrypt_256(self):
+        """
+        Create a tar file with only one file inside, using concat
+        compression and encryption mode. Then decrypt with openssl,
+        decompress it with zcat and untar it with gnu tar.
+
+        Using aes 256.
+        """
+
+        # create the content of the file to compress and hash it
+        hash = self.create_file("big", 50000)
+
+        # create the tar file with volumes
+        tarobj = TarFile.open("sample.tar.gz.aes256",
+                              mode="w#gz.aes256",
+                              format=GNU_FORMAT,
+                              concat_compression=True,
+                              password='key')
+        tarobj.add("big")
+        tarobj.close()
+        os.unlink("big")
+
+        # extract with normal tar and check output
+        filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes256")
+
+        assert os.path.exists("sample.tar.gz.aes.0") # beginning of the tar file
+        assert os.path.exists("sample.tar.gz.aes.1") # first file
+
+        os.system("openssl aes-256-cbc -nopad -k 'key' -d -in sample.tar.gz.aes.1 -out sample.tar.gz")
+        os.system("zcat sample.tar.gz 2>/dev/null > sample.tar")
+        os.system("tar xf sample.tar")
+        assert os.path.exists("big")
+        assert hash == self.md5sum("big")