--- /dev/null
+#!/usr/bin/env python
+
+# Copyright (C) 2013 Daniel Garcia <danigm@wadobo.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+'''
+AES encryption and decryption lib.
+This is a simple utility lib over pycrypto to encrypt and decrypt using AES
+compatible with openssl command.
+'''
+
+
+from hashlib import md5
+from Crypto.Cipher import AES
+from Crypto import Random
+
+
+class AESCrypt:
+ '''
+ This class provides a simple method to encrypt and decrypt text using
+ AES.
+ '''
+ def __init__(self, password, salt=''):
+ self.bs = AES.block_size
+ self.mode = AES.MODE_CBC
+ self.key_length = 32
+ self.buf = ''
+ if salt:
+ self.salt = salt
+ else:
+ self.salt = Random.new().read(self.bs - len('Salted__'))
+ self.password = password
+
+ self.get_pad = self.get_pkcs5_pad
+ self.split_pad = self.split_pkcs5_pad
+
+ def init(self):
+ '''
+ Initialize the Crypto.AES object with the password provided and the
+ salt calculated.
+
+ For decrypt you should call to get_salt or get_salt_str before the
+ decryption to get the correct salt
+ '''
+ self.derive_key_and_iv()
+ self.cipher = AES.new(self.key, self.mode, self.iv)
+ self.salt_str = 'Salted__' + self.salt
+
+ def close_enc(self):
+ '''
+ Adds the needed padding to the chunk to be able to encrypt and
+ encrypts the remaining buf
+
+ returns the encrypted text
+ '''
+ chunk = self.buf
+ self.buf = ''
+ need_padding = len(chunk) % self.bs != 0
+ padding_length = self.bs - len(chunk) % self.bs
+ chunk += self.get_pad(padding_length)
+ return self.cipher.encrypt(chunk)
+
+ def encrypt(self, chunk):
+ '''
+ Encrypts the text chunk given. If it's not multiple of Block Size
+ the chunk is buffered and '' is returned, in other case the chunk
+ encrypted is returned.
+ '''
+
+ self.buf += chunk
+
+ chunk = self.buf
+ if len(chunk) % self.bs == 0:
+ self.buf = ''
+ return self.cipher.encrypt(chunk)
+
+ return ''
+
+ def decrypt(self, buf, end=False):
+ '''
+ Decrypts the buf. If end is True this will split the encryption
+ padding.
+
+ Returns the decrypted text
+ '''
+
+ bs = self.bs
+
+ # Adding pad, only needed when there's no pad, when using OFB
+ if len(buf) % bs != 0:
+ buf += get_pad(bs - len(buf) % bs)
+
+ chunk = self.cipher.decrypt(buf)
+ if end:
+ chunk = self.split_pad(chunk)
+ return chunk
+
+ def get_salt(self, instream):
+ '''
+ Calculates the salt for an input encrypted file
+ '''
+ self.salt = instream.read(self.bs)[len('Salted__'):]
+
+ def get_salt_str(self, instr):
+ '''
+ Calculates the salt for an input encrypted string
+ '''
+ self.salt = instr[len('Salted__'):self.bs]
+
+ def derive_key_and_iv(self):
+ '''
+ Generates the key and iv using the password and salt as seed
+ '''
+ d = d_i = ''
+ l = self.key_length + self.bs
+ while len(d) < l:
+ d_i = md5(d_i + self.password + self.salt).digest()
+ d += d_i
+ self.key = d[:self.key_length]
+ self.iv = d[self.key_length:self.key_length + self.bs]
+
+ def get_random_pad(self, padding_length):
+ '''
+ Returns an ISO_10126 pad, which is random
+ '''
+ return Random.new().read(padding_length - 1) + chr(padding_length)
+
+ def split_random_pad(self, chunk):
+ '''
+ Returns the chunk without the ISO_10126 pad
+ '''
+ return chunk[:-ord(chunk[-1])]
+
+ def get_pkcs5_pad(self, padding_length):
+ '''
+ Returns the PKCS pad
+ '''
+ return padding_length * chr(padding_length)
+
+ def split_pkcs5_pad(self, chunk):
+ '''
+ Returns the chunk without the PKCS pad
+ '''
+ return chunk.rstrip(chunk[-1])
+
+
+def encrypt(in_file, out_file, password):
+ aes = AESCrypt(password)
+ aes.init()
+ out_file.write(aes.salt_str)
+
+ finished = False
+ while not finished:
+ chunk = in_file.read(1024 * aes.bs)
+ if not chunk or len(chunk) < 1024 * aes.bs:
+ finished = True
+
+ chunk = aes.encrypt(chunk)
+ out_file.write(chunk)
+ # adding padding
+ out_file.write(aes.close_enc())
+
+
+def decrypt(in_file, out_file, password):
+ aes = AESCrypt(password)
+ salt = aes.get_salt(in_file)
+ aes.init()
+
+ next_chunk = ''
+ finished = False
+ while not finished:
+ buf = in_file.read(1024 * aes.bs)
+ if not buf:
+ finished = True
+ chunk = next_chunk
+ next_chunk = buf
+ out_file.write(aes.decrypt(chunk, finished))
+
+
+if __name__ == '__main__':
+ from StringIO import StringIO
+ infile = StringIO('clear text')
+ cipher = StringIO()
+ out = StringIO()
+ encrypt(infile, cipher, 'key')
+ cipher.seek(0)
+ decrypt(cipher, out, 'key')
+ out.seek(0)
+ print out.read()
import re
import operator
+import aescrypto
+
try:
import grp, pwd
except ImportError:
_Stream is intended to be used only internally.
"""
- def __init__(self, name, mode, comptype, fileobj, bufsize, concat_stream=False):
+ def __init__(self, name, mode, comptype, fileobj, bufsize,
+ concat_stream=False, enctype='', password=""):
"""Construct a _Stream object.
"""
self._extfileobj = True
self.flags = 0L
self.internal_pos = 0L
self.concat_stream = concat_stream
+ self.enctype = enctype
+ self.password = password
if comptype == "gz":
try:
-self.zlib.MAX_WBITS,
self.zlib.DEF_MEM_LEVEL,
0)
+
+ # if aes, we encrypt after compression
+ if self.enctype == 'aes':
+ self.encryption = aescrypto.AESCrypt(self.password)
+ self.encryption.init()
+ self.__write_to_file(self.encryption.salt_str)
+
timestamp = struct.pack("<L", long(time.time()))
self.__write("\037\213\010\010%s\002\377" % timestamp)
if type(self.name) is unicode:
self.name = self.name.encode("iso-8859-1", "replace")
+ if self.name.endswith(".aes"):
+ self.name = self.name[:-4]
if self.name.endswith(".gz"):
self.name = self.name[:-3]
self.__write(self.name + NUL)
-self.zlib.MAX_WBITS,
self.zlib.DEF_MEM_LEVEL,
0)
+
+ # if aes, we encrypt after compression
+ if self.enctype == 'aes':
+ self.__write_to_file(self.encryption.close_enc())
+ self.encryption = aescrypto.AESCrypt(self.password)
+ self.encryption.init()
+ self.__write_to_file(self.encryption.salt_str)
+
timestamp = struct.pack("<L", long(time.time()))
self.__write("\037\213\010\000%s\002\377" % timestamp)
"""
self.buf += s
while len(self.buf) > self.bufsize:
- self.fileobj.write(self.buf[:self.bufsize])
+ self.__enc_write(self.buf[:self.bufsize])
self.buf = self.buf[self.bufsize:]
+ def __write_to_file(self, s):
+ '''
+ Writes directly to the fileobj
+ '''
+ self.fileobj.write(s)
+
+ def __enc_write(self, s):
+ '''
+ If there's encryption, the string s is encrypted before write it to
+ the file
+ '''
+ tow = s
+ if self.enctype == 'aes':
+ tow = self.encryption.encrypt(s)
+ self.__write_to_file(tow)
+
def close(self, close_fileobj=True):
"""Close the _Stream object. No operation should be
done on it afterwards.
self.buf += self.cmp.flush()
if self.mode == "w" and self.buf:
- self.fileobj.write(self.buf)
+ chunk = self.buf
self.buf = ""
if self.comptype == "gz":
# The native zlib crc is an unsigned 32-bit integer, but
# while the same crc on a 64-bit box may "look positive".
# To avoid irksome warnings from the `struct` module, force
# it to look positive on all boxes.
- self.fileobj.write(struct.pack("<L", self.crc & 0xffffffffL))
- self.fileobj.write(struct.pack("<L", self.concat_pos & 0xffffFFFFL))
+ chunk += struct.pack("<L", self.crc & 0xffffffffL)
+ chunk += struct.pack("<L", self.concat_pos & 0xffffFFFFL)
+
+ self.__enc_write(chunk)
if close_fileobj and not self._extfileobj:
+ if self.enctype == 'aes' and self.mode == "w":
+ self.__write_to_file(self.encryption.close_enc())
self.fileobj.close()
# read the zlib crc and length and check them
concat_compression = False # Used to separate in different zip members each
# file, used for robustness.
+ password = '' # Used for aes encryption
+
def __init__(self, name=None, mode="r", fileobj=None, format=None,
tarinfo=None, dereference=None, ignore_zeros=None, encoding=None,
errors=None, pax_headers=None, debug=None, errorlevel=None,
max_volume_size=None, new_volume_handler=None,
- concat_compression=False):
+ concat_compression=False, password=''):
"""Open an (uncompressed) tar archive `name'. `mode' is either 'r' to
read from an existing archive, 'a' to append data to an existing
file or 'w' to create a new file overwriting an existing one. `mode'
raise ValueError("mode must be 'r', 'a' or 'w'")
self.mode = mode
self.concat_compression = concat_compression
+ self.password = password
self._mode = {"r": "rb", "a": "r+b", "w": "wb"}[mode]
if not fileobj:
'r#gz' open a stream of gzip compressed tar blocks for reading
'w#gz' open a stream of gzip compressed tar blocks for writing
+
+ 'r#gz.aes' open an aes encrypted stream of gzip compressed tar blocks for reading
+ 'w#gz.aes' open an aes encrypted stream of gzip compressed tar blocks for writing
"""
if not name and not fileobj:
elif "#" in mode:
filemode, comptype = mode.split("#", 1)
filemode = filemode or "r"
+ password = ''
+ # if not enctype there's no encryption
+ enctype = ''
if filemode not in "rw":
raise ValueError("mode must be 'r' or 'w'")
- if comptype not in ["gz"]:
+ if comptype not in ["gz", "gz.aes"]:
raise ValueError("comptype must be 'gz'")
+ # encryption gz.aes
+ if "." in comptype:
+ comptype, enctype = comptype.split(".", 1)
+ password = kwargs.get('password', '')
+ if not password:
+ raise ValueError("you should give a password for encryption")
+
kwargs['concat_compression'] = True
t = cls(name, filemode,
- _Stream(name, filemode, comptype, fileobj, bufsize, concat_stream=True),
+ _Stream(name, filemode, comptype, fileobj, bufsize,
+ concat_stream=True, enctype=enctype,
+ password=password),
**kwargs)
t._extfileobj = False
return t
from testing.test_multivol import MultivolGnuFormatTest, MultivolPaxFormatTest
from testing.test_concat_compress import ConcatCompressTest
from testing.test_rescue_tar import RescueTarTest
+from testing.test_encryption import EncryptionTest
if __name__ == "__main__":
- unittest.main()
\ No newline at end of file
+ unittest.main()
--- /dev/null
+import os
+
+from deltatar.tarfile import TarFile, GNU_FORMAT
+
+import filesplit
+from . import BaseTest
+
+
+class EncryptionTest(BaseTest):
+ """
+ Test encryption after compression in tarfiles
+ """
+
+ def test_openssl_decrypt(self):
+ """
+ Create a tar file with only one file inside, using concat
+ compression and encryption mode. Then decrypt with openssl,
+ decompress it with zcat and untar it with gnu tar.
+ """
+
+ # create the content of the file to compress and hash it
+ hash = self.create_file("big", 50000)
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar.gz.aes",
+ mode="w#gz.aes",
+ format=GNU_FORMAT,
+ concat_compression=True,
+ password='key')
+ tarobj.add("big")
+ tarobj.close()
+ os.unlink("big")
+
+ # extract with normal tar and check output
+ filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes")
+
+ assert os.path.exists("sample.tar.gz.aes.0") # beginning of the tar file
+ assert os.path.exists("sample.tar.gz.aes.1") # first file
+
+ os.system("openssl aes-256-cbc -k 'key' -d -in sample.tar.gz.aes.1 -out sample.tar.gz")
+ os.system("zcat sample.tar.gz > sample.tar")
+ os.system("tar xf sample.tar")
+ assert os.path.exists("big")
+ assert hash == self.md5sum("big")
+
+ def test_openssl_multiple_files_decrypt(self):
+ """
+ Create a tar file with multiple files inside, using concat
+ compression and encryption mode. Then decrypt with openssl,
+ decompress it with zcat and untar it with gnu tar.
+ """
+
+ # create sample data
+ hash = dict()
+ hash["big"] = self.create_file("big", 50000)
+ hash["small"] = self.create_file("small", 100)
+ hash["small2"] = self.create_file("small2", 354)
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar.gz.aes",
+ mode="w#gz.aes",
+ format=GNU_FORMAT,
+ concat_compression=True,
+ password='key')
+
+ for k in hash:
+ tarobj.add(k)
+ tarobj.close()
+
+ for k in hash:
+ os.unlink(k)
+
+ # extract with normal tar and check output
+ filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes")
+
+ assert os.path.exists("sample.tar.gz.aes.0") # beginning of the tar file
+ assert os.path.exists("sample.tar.gz.aes.1") # first file
+ assert os.path.exists("sample.tar.gz.aes.2") # second file
+ assert os.path.exists("sample.tar.gz.aes.3") # third file
+ assert not os.path.exists("sample.tar.gz.aes.4") # nothing else
+
+ # extract and check output
+ for i in xrange(1, 4):
+ fname = "sample.tar.gz.aes.%d" % i
+ os.system("openssl aes-256-cbc -k 'key' -d -in %s -out sample.tar.gz" % fname)
+ os.system("zcat sample.tar.gz > sample.tar")
+ os.system("tar xf sample.tar")
+
+ for key, value in hash.iteritems():
+ assert os.path.exists(key)
+ assert value == self.md5sum(key)