Implemented aes decryption support
authorDaniel Garcia Moreno <danigm@wadobo.com>
Fri, 12 Jul 2013 13:58:38 +0000 (15:58 +0200)
committerDaniel Garcia Moreno <danigm@wadobo.com>
Fri, 12 Jul 2013 13:58:38 +0000 (15:58 +0200)
deltatar/aescrypto.py
deltatar/tarfile.py
testing/test_encryption.py

index 3d8066f..85d2dbb 100644 (file)
@@ -104,7 +104,7 @@ class AESCrypt:
 
         # Adding pad, only needed when there's no pad, when using OFB
         if len(buf) % bs != 0:
-            buf += get_pad(bs  - len(buf) % bs)
+            buf += self.get_pad(bs  - len(buf) % bs)
 
         chunk = self.cipher.decrypt(buf)
         if end:
index 3cf854f..e632421 100644 (file)
@@ -596,6 +596,12 @@ class _Stream:
         self.cmp = self.zlib.decompressobj(-self.zlib.MAX_WBITS)
         self.dbuf = ""
 
+        # if aes, we decrypt before the compression
+        if self.enctype == 'aes':
+            self.encryption = aescrypto.AESCrypt(self.password)
+            self.encryption.get_salt(self.fileobj)
+            self.encryption.init()
+
         # taken from gzip.GzipFile with some alterations
         read2 = self.__read(2)
         if read2 != "\037\213":
@@ -702,7 +708,7 @@ class _Stream:
         c = len(self.buf)
         t = [self.buf]
         while c < size:
-            buf = self.fileobj.read(self.bufsize)
+            buf = self.__dec_read(self.bufsize)
             if not buf:
                 break
             t.append(buf)
@@ -711,6 +717,27 @@ class _Stream:
         t = "".join(t)
         self.buf = t[size:]
         return t[:size]
+
+    def __dec_read(self, size):
+        buf = self.fileobj.read(size)
+        if self.enctype == 'aes':
+            buf = self.__split_enc_file(buf)
+        return buf
+
+    def __split_enc_file(self, buf):
+        try:
+            idx = buf.index('Salted__')
+        except ValueError:
+            buf = self.encryption.decrypt(buf)
+        else:
+            b1 = buf[:idx]
+            b2 = buf[idx:]
+            buf = self.encryption.decrypt(b1, True)
+            self.encryption.get_salt_str(b2)
+            self.encryption.init()
+            b2 = b2[len(self.encryption.salt_str):]
+            buf += self.__split_enc_file(b2)
+        return buf
 # class _Stream
 
 class _StreamProxy(object):
index 9c3c5bb..12a2e11 100644 (file)
@@ -89,3 +89,69 @@ class EncryptionTest(BaseTest):
         for key, value in hash.iteritems():
             assert os.path.exists(key)
             assert value == self.md5sum(key)
+
+    def test_decrypt(self):
+        """
+        Create a tar file with only one file inside, using concat
+        compression and encryption mode. Then decrypt it.
+        """
+
+        # create the content of the file to compress and hash it
+        hash = self.create_file("big", 50000)
+
+        # create the tar file with volumes
+        tarobj = TarFile.open("sample.tar.gz.aes",
+                              mode="w#gz.aes",
+                              format=GNU_FORMAT,
+                              concat_compression=True,
+                              password='key')
+        tarobj.add("big")
+        tarobj.close()
+        os.unlink("big")
+
+        tarobj = TarFile.open("sample.tar.gz.aes",
+                              mode="r#gz.aes",
+                              format=GNU_FORMAT,
+                              password='key')
+        tarobj.extractall()
+        tarobj.close()
+        assert os.path.exists("big")
+        assert hash == self.md5sum("big")
+
+
+    def test_multiple_file_decrypt(self):
+        """
+        Create a tar file with only one file inside, using concat
+        compression and encryption mode. Then decrypt it.
+        """
+
+        # create sample data
+        hash = dict()
+        hash["big"] = self.create_file("big", 50000)
+        hash["small"] = self.create_file("small", 100)
+        hash["small2"] = self.create_file("small2", 354)
+
+        # create the tar file with volumes
+        tarobj = TarFile.open("sample.tar.gz.aes",
+                              mode="w#gz.aes",
+                              format=GNU_FORMAT,
+                              concat_compression=True,
+                              password='key')
+
+        for k in hash:
+            tarobj.add(k)
+        tarobj.close()
+
+        for k in hash:
+            os.unlink(k)
+
+        tarobj = TarFile.open("sample.tar.gz.aes",
+                              mode="r#gz.aes",
+                              format=GNU_FORMAT,
+                              password='key')
+        tarobj.extractall()
+        tarobj.close()
+
+        for key, value in hash.iteritems():
+            assert os.path.exists(key)
+            assert value == self.md5sum(key)