"""
def __init__(self, name, mode, comptype, fileobj, bufsize,
- concat_stream=False, enctype='', password=""):
+ concat_stream=False, enctype='', password="",
+ key_length=16):
"""Construct a _Stream object.
"""
self._extfileobj = True
self.internal_pos = 0L
self.concat_stream = concat_stream
self.enctype = enctype
+ self.key_length = key_length
self.password = password
if comptype == "gz":
# if aes, we encrypt after compression
if self.enctype == 'aes':
- self.encryption = aescrypto.AESCrypt(self.password)
+ self.encryption = aescrypto.AESCrypt(self.password,
+ key_length=self.key_length)
self.encryption.init()
self.__write_to_file(self.encryption.salt_str)
self.__write("\037\213\010\010%s\002\377" % timestamp)
if type(self.name) is unicode:
self.name = self.name.encode("iso-8859-1", "replace")
- if self.name.endswith(".aes"):
- self.name = self.name[:-4]
+ if self.name.endswith(".aes128") or self.name.endswith(".aes256"):
+ self.name = self.name[:-7]
if self.name.endswith(".gz"):
self.name = self.name[:-3]
self.__write(self.name + NUL)
# if aes, we encrypt after compression
if self.enctype == 'aes':
self.__write_to_file(self.encryption.close_enc())
- self.encryption = aescrypto.AESCrypt(self.password)
+ self.encryption = aescrypto.AESCrypt(self.password,
+ key_length=self.key_length)
self.encryption.init()
self.__write_to_file(self.encryption.salt_str)
# if aes, we decrypt before the compression
if self.enctype == 'aes':
- self.encryption = aescrypto.AESCrypt(self.password)
+ self.encryption = aescrypto.AESCrypt(self.password,
+ key_length=self.key_length)
self.encryption.get_salt(self.fileobj)
self.encryption.init()
'r#gz' open a stream of gzip compressed tar blocks for reading
'w#gz' open a stream of gzip compressed tar blocks for writing
- 'r#gz.aes' open an aes encrypted stream of gzip compressed tar blocks for reading
- 'w#gz.aes' open an aes encrypted stream of gzip compressed tar blocks for writing
+ 'r#gz.aes128' open an aes128 encrypted stream of gzip compressed tar blocks for reading
+ 'w#gz.aes128' open an aes128 encrypted stream of gzip compressed tar blocks for writing
+ 'r#gz.aes256' open an aes256 encrypted stream of gzip compressed tar blocks for reading
+ 'w#gz.aes256' open an aes256 encrypted stream of gzip compressed tar blocks for writing
"""
if not name and not fileobj:
password = ''
# if not enctype there's no encryption
enctype = ''
+ key_length = 16
if filemode not in "rw":
raise ValueError("mode must be 'r' or 'w'")
- if comptype not in ["gz", "gz.aes"]:
+ if comptype not in ["gz", "gz.aes128", "gz.aes256"]:
raise ValueError("comptype must be 'gz'")
- # encryption gz.aes
+ # encryption gz.aes128 or gz.aes256
if "." in comptype:
comptype, enctype = comptype.split(".", 1)
+ kl = enctype[3:]
+ enctype = enctype[:3]
+ key_length = 16 if kl == '128' else 32
password = kwargs.get('password', '')
if not password:
raise ValueError("you should give a password for encryption")
t = cls(name, filemode,
_Stream(name, filemode, comptype, fileobj, bufsize,
concat_stream=True, enctype=enctype,
- password=password),
+ password=password, key_length=key_length),
**kwargs)
t._extfileobj = False
return t
hash = self.create_file("big", 50000)
# create the tar file with volumes
- tarobj = TarFile.open("sample.tar.gz.aes",
- mode="w#gz.aes",
+ tarobj = TarFile.open("sample.tar.gz.aes128",
+ mode="w#gz.aes128",
format=GNU_FORMAT,
concat_compression=True,
password='key')
os.unlink("big")
# extract with normal tar and check output
- filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes")
+ filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes128")
assert os.path.exists("sample.tar.gz.aes.0") # beginning of the tar file
assert os.path.exists("sample.tar.gz.aes.1") # first file
hash["small2"] = self.create_file("small2", 354)
# create the tar file with volumes
- tarobj = TarFile.open("sample.tar.gz.aes",
- mode="w#gz.aes",
+ tarobj = TarFile.open("sample.tar.gz.aes128",
+ mode="w#gz.aes128",
format=GNU_FORMAT,
concat_compression=True,
password='key')
os.unlink(k)
# extract with normal tar and check output
- filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes")
+ filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes128")
assert os.path.exists("sample.tar.gz.aes.0") # beginning of the tar file
assert os.path.exists("sample.tar.gz.aes.1") # first file
hash = self.create_file("big", 50000)
# create the tar file with volumes
- tarobj = TarFile.open("sample.tar.gz.aes",
- mode="w#gz.aes",
+ tarobj = TarFile.open("sample.tar.gz.aes128",
+ mode="w#gz.aes128",
format=GNU_FORMAT,
concat_compression=True,
password='key')
tarobj.close()
os.unlink("big")
- tarobj = TarFile.open("sample.tar.gz.aes",
- mode="r#gz.aes",
+ tarobj = TarFile.open("sample.tar.gz.aes128",
+ mode="r#gz.aes128",
format=GNU_FORMAT,
password='key')
tarobj.extractall()
hash["small2"] = self.create_file("small2", 354)
# create the tar file with volumes
- tarobj = TarFile.open("sample.tar.gz.aes",
- mode="w#gz.aes",
+ tarobj = TarFile.open("sample.tar.gz.aes128",
+ mode="w#gz.aes128",
format=GNU_FORMAT,
concat_compression=True,
password='key')
for k in hash:
os.unlink(k)
- tarobj = TarFile.open("sample.tar.gz.aes",
- mode="r#gz.aes",
+ tarobj = TarFile.open("sample.tar.gz.aes128",
+ mode="r#gz.aes128",
format=GNU_FORMAT,
password='key')
tarobj.extractall()
hash["small2"] = self.create_file("small2", 354)
# create the tar file with volumes
- tarobj = TarFile.open("sample.tar.gz.aes",
- mode="w#gz.aes",
+ tarobj = TarFile.open("sample.tar.gz.aes128",
+ mode="w#gz.aes128",
password='key',
concat_compression=True,
max_volume_size=20000,
tarobj.add(k)
tarobj.close()
- assert os.path.exists("sample.tar.gz.aes")
+ assert os.path.exists("sample.tar.gz.aes128")
for k in hash:
os.unlink(k)
# extract
- tarobj = TarFile.open("sample.tar.gz.aes",
- mode="r#gz.aes",
+ tarobj = TarFile.open("sample.tar.gz.aes128",
+ mode="r#gz.aes128",
password="key",
new_volume_handler=new_volume_handler)
tarobj.extractall()
for key, value in hash.iteritems():
assert os.path.exists(key)
assert value == self.md5sum(key)
+
+ def test_multivol_file_decrypt_aes256(self):
+ '''
+ Test multivol tarball with encryption aes256
+ '''
+
+ # create sample data
+ hash = dict()
+ hash["big"] = self.create_file("big", 50000)
+ hash["big2"] = self.create_file("big2", 10200)
+ hash["small"] = self.create_file("small", 100)
+ hash["small2"] = self.create_file("small2", 354)
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar.gz.aes256",
+ mode="w#gz.aes256",
+ password='key',
+ concat_compression=True,
+ max_volume_size=20000,
+ new_volume_handler=new_volume_handler)
+
+ for k in hash:
+ tarobj.add(k)
+ tarobj.close()
+
+ assert os.path.exists("sample.tar.gz.aes256")
+ for k in hash:
+ os.unlink(k)
+
+ # extract
+ tarobj = TarFile.open("sample.tar.gz.aes256",
+ mode="r#gz.aes256",
+ password="key",
+ new_volume_handler=new_volume_handler)
+ tarobj.extractall()
+ tarobj.close()
+
+ # check output
+ for key, value in hash.iteritems():
+ assert os.path.exists(key)
+ assert value == self.md5sum(key)
+
+ def test_openssl_decrypt_256(self):
+ """
+ Create a tar file with only one file inside, using concat
+ compression and encryption mode. Then decrypt with openssl,
+ decompress it with zcat and untar it with gnu tar.
+
+ Using aes 256.
+ """
+
+ # create the content of the file to compress and hash it
+ hash = self.create_file("big", 50000)
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar.gz.aes256",
+ mode="w#gz.aes256",
+ format=GNU_FORMAT,
+ concat_compression=True,
+ password='key')
+ tarobj.add("big")
+ tarobj.close()
+ os.unlink("big")
+
+ # extract with normal tar and check output
+ filesplit.split_file('Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes256")
+
+ assert os.path.exists("sample.tar.gz.aes.0") # beginning of the tar file
+ assert os.path.exists("sample.tar.gz.aes.1") # first file
+
+ os.system("openssl aes-256-cbc -nopad -k 'key' -d -in sample.tar.gz.aes.1 -out sample.tar.gz")
+ os.system("zcat sample.tar.gz 2>/dev/null > sample.tar")
+ os.system("tar xf sample.tar")
+ assert os.path.exists("big")
+ assert hash == self.md5sum("big")