PDTCRYPT_HDR_MAGIC,
version, paramversion, nacl, iv, ctsize, tag)
except Exception as exn:
- return False, "error writing header: %s" % str (exn)
+ return False, "error assembling header: %s" % str (exn)
return True, bytes (buf)
"""
self.next_fixed ()
self.set_object_counter (counter)
-
self.strict_ivs = strict_ivs
+ if paramversion is not None:
+ self.paramversion = paramversion
+
if key is not None:
self.key, self.nacl = key, nacl
return
- if isinstance (password, bytes) is False: password = str.encode (password)
- self.password = password
- if paramversion is None and nacl is None:
- # postpone until first header is available
- return
- kdf = kdf_by_version (paramversion)
- if kdf is not None:
- self.key, self.nacl = kdf (password, nacl)
- self.paramversion = paramversion
+ if password is not None:
+ if isinstance (password, bytes) is False:
+ password = str.encode (password)
+ self.password = password
+ if paramversion is None and nacl is None:
+ # postpone key setup until first header is available
+ return
+ kdf = kdf_by_version (paramversion)
+ if kdf is not None:
+ self.key, self.nacl = kdf (password, nacl)
def process (self, buf):
self.fixed = fixedparts
self.fixed.sort ()
- super().__init__ (password, key, counter=counter, strict_ivs=strict_ivs)
+ super().__init__ (password=password, key=key, counter=counter,
+ strict_ivs=strict_ivs)
def valid_fixed_part (self, iv):
{ "process" : PDTCRYPT_SUB_PROCESS
, "scrypt" : PDTCRYPT_SUB_SCRYPT }
+PDTCRYPT_SECRET_PW = 0
+PDTCRYPT_SECRET_KEY = 1
+
PDTCRYPT_DECRYPT = 1 << 0 # decrypt archive with password
PDTCRYPT_SPLIT = 1 << 1 # split archive into individual objects
PDTCRYPT_HASH = 1 << 2 # output scrypt hash for file and given password
return d
-def depdtcrypt (mode, pw, ins, outs):
+def depdtcrypt (mode, secret, ins, outs):
"""
- Remove PDTCRYPT layer from obj encrypted with pw. Used on a Deltatar
- backup this will yield a (possibly Gzip compressed) tarball.
+ Remove PDTCRYPT layer from all objects encrypted with the secret. Used on a
+ Deltatar backup this will yield a (possibly Gzip compressed) tarball.
"""
ctleft = -1 # length of ciphertext to consume
ctcurrent = 0 # total ciphertext of current object
outfile = None # Python file object for output
if mode & PDTCRYPT_DECRYPT: # decryptor
- decr = Decrypt (password=pw, strict_ivs=PDTCRYPT_STRICTIVS)
+ ks = secret [0]
+ if ks == PDTCRYPT_SECRET_PW:
+ decr = Decrypt (password=secret [1], strict_ivs=PDTCRYPT_STRICTIVS)
+ elif ks == PDTCRYPT_SECRET_KEY:
+ key = binascii.unhexlify (secret [1])
+ decr = Decrypt (key=key, strict_ivs=PDTCRYPT_STRICTIVS)
+ else:
+ raise InternalError ("‘%d’ does not specify a valid kind of secret"
+ % ks)
else:
decr = PassthroughDecryptor ()
raise ValueError ("bogus stream “%s” / %s" % (kind, path))
-def depdtcrypt_file (pw, spath, dpath):
- """
- Remove PDTCRYPT layer from file at ``spath`` using password ``pw``, writing
- the decrypted result to dpath.
- """
- if PDTCRYPT_VERBOSE is True:
- noise ("PDT: decrypt %s → %s" % (spath, dpath), file=sys.stderr)
- with deptdcrypt_mk_stream (PDTCRYPT_SOURCE, spath) as ins:
- with deptdcrypt_mk_stream (PDTCRYPT_SINK, dpath) as outs:
- return depdtcrypt (pw, ins, outs)
-
-
-def mode_depdtcrypt (mode, pw, ins, outs):
+def mode_depdtcrypt (mode, secret, ins, outs):
try:
total_read, total_obj, total_ct, total_pt = \
- depdtcrypt (mode, pw, ins, outs)
+ depdtcrypt (mode, secret, ins, outs)
except DecryptionError as exn:
noise ("PDT: Decryption failed:")
noise ("PDT:")
noise ("PDT: “%s”" % exn)
noise ("PDT:")
- noise ("PDT: Did you specify the correct password?")
+ noise ("PDT: Did you specify the correct key / password?")
noise ("")
return 1
except PDTSplitError as exn:
noise ("PDT:")
noise ("PDT: “%s”" % exn)
noise ("PDT:")
- noise ("PDT: Hint: target directory should to be empty.")
+ noise ("PDT: Hint: target directory should be empty.")
noise ("")
return 1
if err is True:
out = noise
out ("usage: %s SUBCOMMAND { --help" % SELF)
- out (" | [ -v ] { PASSWORD } { { -i | --in } { - | SOURCE } }")
- out (" { { -o | --out } { - | DESTINATION } }")
- out (" { -D | --no-decrypt } { -S | --split }")
+ out (" | [ -v ] { -p PASSWORD | -k KEY }")
+ out (" { { -i | --in } { - | SOURCE } }")
+ out (" { { -o | --out } { - | DESTINATION } }")
+ out (" { -D | --no-decrypt } { -S | --split }")
out ("")
out ("\twhere")
out ("\t\tSUBCOMMAND main mode: { process | scrypt }")
out ("\t\t where:")
out ("\t\t process: extract objects from PDT archive")
out ("\t\t scrypt: calculate hash from password and first object")
- out ("\t\tPASSWORD password to derive the encryption key from")
+ out ("\t\t-p PASSWORD password to derive the encryption key from")
+ out ("\t\t-k KEY encryption key as 16 bytes in hexadecimal notation")
out ("\t\t-s enforce strict handling of initialization vectors")
out ("\t\t-i SOURCE file name to read from")
out ("\t\t-o DESTINATION file to write output to")
sys.exit ((err is True) and 42 or 0)
+def bail (msg):
+ noise (msg)
+ noise ("")
+ usage (err=True)
+ raise Unreachable
+
+
def parse_argv (argv):
global SELF
mode = PDTCRYPT_DECRYPT
- pw = None
+ secret = None
insspec = None
outsspec = None
rawsubcmd = next (argvi)
subcommand = PDTCRYPT_SUB [rawsubcmd]
except StopIteration:
- noise ("ERROR: subcommand required")
- noise ("")
- usage (err=True)
- raise Unreachable
+ bail ("ERROR: subcommand required")
except KeyError:
- noise ("ERROR: invalid subcommand “%s” specified" % rawsubcmd)
- noise ("")
- usage (err=True)
- raise Unreachable
+ bail ("ERROR: invalid subcommand “%s” specified" % rawsubcmd)
- def checked_pw (arg):
- nonlocal pw
- if pw is None:
- pw = arg
+ def checked_secret (t, arg):
+ nonlocal secret
+ if secret is None:
+ secret = (t, arg)
else:
- noise ("ERROR: unqualified argument “%s” but password already "
- "given" % arg)
- noise ("")
- usage (err=True)
- raise Unreachable
+ bail ("ERROR: encountered “%s” but secret already given" % arg)
for arg in argvi:
if arg in [ "-h", "--help" ]:
elif arg in [ "-o", "--out", "--dest", "--sink" ]:
outsspec = next (argvi)
if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypt to %s" % outsspec)
+ elif arg in [ "-p", "--password" ]:
+ arg = next (argvi)
+ checked_secret (PDTCRYPT_SECRET_PW, arg)
+ if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypting with password")
else:
if subcommand == PDTCRYPT_SUB_PROCESS:
if arg in [ "-s", "--strict-ivs" ]:
elif arg in [ "-D", "--no-decrypt" ]:
mode &= ~PDTCRYPT_DECRYPT
if PDTCRYPT_VERBOSE is True: noise ("PDT: not decrypting")
+ elif arg in [ "-k", "--key" ]:
+ arg = next (argvi)
+ checked_secret (PDTCRYPT_SECRET_KEY, arg)
+ if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypting with key")
else:
- checked_pw (arg)
+ bail ("ERROR: unexpected positional argument “%s”" % arg)
elif subcommand == PDTCRYPT_SUB_SCRYPT:
- checked_pw (arg)
- else:
- raise Unreachable
+ bail ("ERROR: unexpected positional argument “%s”" % arg)
- if pw is None:
+ if secret is None:
if PDTCRYPT_VERBOSE is True:
- noise ("ERROR: no password specified, trying $PDTCRYPT_PASSWORD")
+ noise ("ERROR: no password or key specified, trying $PDTCRYPT_PASSWORD")
epw = os.getenv ("PDTCRYPT_PASSWORD")
if epw is not None:
- pw = epw.strip ()
+ checked_secret (PDTCRYPT_SECRET_PW, epw.strip ())
+
+ if secret is None:
+ if PDTCRYPT_VERBOSE is True:
+ noise ("ERROR: no password or key specified, trying $PDTCRYPT_KEY")
+ ek = os.getenv ("PDTCRYPT_KEY")
+ if ek is not None:
+ checked_secret (PDTCRYPT_SECRET_KEY, ek.strip ())
- if pw is None:
+ if secret is None:
if subcommand == PDTCRYPT_SUB_SCRYPT:
- noise ("ERROR: scrypt hash mode requested but no password given")
- noise ("")
- usage (err=True)
- raise Unreachable
+ bail ("ERROR: scrypt hash mode requested but no password given")
elif mode & PDTCRYPT_DECRYPT:
- noise ("ERROR: encryption requested but no password given")
- noise ("")
- usage (err=True)
- raise Unreachable
+ bail ("ERROR: encryption requested but no password given")
+
+ if subcommand == PDTCRYPT_SUB_SCRYPT and secret [0] == PDTCRYPT_SECRET_KEY:
+ bail ("ERROR: scrypt mode requires a password")
# default to stdout
ins = deptdcrypt_mk_stream (PDTCRYPT_SOURCE, insspec or "-")
if subcommand == PDTCRYPT_SUB_SCRYPT:
- return True, partial (mode_scrypt, pw.encode (), ins)
+ return True, partial (mode_scrypt, secret [1].encode (), ins)
if mode & PDTCRYPT_SPLIT: # destination must be directory
if outsspec is None or outsspec == "-":
- noise ("ERROR: split mode is incompatible with stdout sink")
- noise ("")
- usage (err=True)
- raise Unreachable
+ bail ("ERROR: split mode is incompatible with stdout sink")
try:
try:
pass
outs = os.open (outsspec, os.O_DIRECTORY, 0o600)
except FileNotFoundError as exn:
- noise ("ERROR: cannot create target directory “%s”" % outsspec)
- noise ("")
- usage (err=True)
- raise Unreachable
+ bail ("ERROR: cannot create target directory “%s”" % outsspec)
except NotADirectoryError as exn:
- noise ("ERROR: target path “%s” is not a directory" % outsspec)
- noise ("")
- usage (err=True)
- raise Unreachable
+ bail ("ERROR: target path “%s” is not a directory" % outsspec)
else:
outs = deptdcrypt_mk_stream (PDTCRYPT_SINK, outsspec or "-")
- return True, partial (mode_depdtcrypt, mode, pw, ins, outs)
+ return True, partial (mode_depdtcrypt, mode, secret, ins, outs)
def main (argv):
# <http://www.gnu.org/licenses/lgpl-3.0.html>
+import binascii
+import hashlib
import os
from deltatar import crypto
Test encryption after compression in tarfiles
"""
- def test_cli_decrypt(self):
+ def test_cli_decrypt_pw (self):
"""
Create a tar file with only one file inside, using concat
compression and encryption mode. Then decrypt with crypto.py,
``crypto.py`` to decrypt on the command line; the rest of the
data pipeline (→ gzip → tar → files) remains the same.
"""
+ pw = "Where is my cow?"
# create the content of the file to compress and hash it
hash = self.create_file("big", 50000)
# create the encryption handler
- encryptor = crypto.Encrypt (password="key",
+ encryptor = crypto.Encrypt (password=pw,
+ version=DELTATAR_HEADER_VERSION,
+ paramversion=DELTATAR_PARAMETER_VERSION)
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar.gz.pdtcrypt",
+ mode="w#gz",
+ format=GNU_FORMAT,
+ encryption=encryptor)
+ tarobj.add("big")
+ tarobj.close()
+ os.unlink("big")
+
+ # decrypt outer archive layer with crypto.py
+ assert os.path.exists("sample.tar.gz.pdtcrypt")
+ ret = os.system("python3 ./deltatar/crypto.py process -p '%s' "
+ "<sample.tar.gz.pdtcrypt >sample.tar.gz" % pw)
+ assert ret == 0
+ assert os.path.exists("sample.tar.gz")
+
+ # extract with normal tar and check output
+ os.system("zcat sample.tar.gz 2>/dev/null > sample.tar")
+ os.system("tar xf sample.tar")
+ assert os.path.exists("big")
+ assert hash == self.md5sum("big")
+
+
+ def test_cli_decrypt_key_argv (self):
+ """
+ Encrypt, then decrypt using the crypto.py → gzip → tar pipe,
+ supplying the encryption key on the command line.
+ """
+ key = os.urandom (16)
+ nacl = hashlib.md5 ("Stráðu á mig salti".encode ()).digest ()
+
+ # create the content of the file to compress and hash it
+ hash = self.create_file("big", 50000)
+
+ # create the encryption handler
+ encryptor = crypto.Encrypt (key=key, nacl=nacl,
version=DELTATAR_HEADER_VERSION,
paramversion=DELTATAR_PARAMETER_VERSION)
tarobj.close()
os.unlink("big")
- #filesplit.split_file(b'Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes128")
+ # decrypt outer archive layer with crypto.py
+ assert os.path.exists("sample.tar.gz.pdtcrypt")
+ ret = os.system("python3 ./deltatar/crypto.py process -k '%s' "
+ "<sample.tar.gz.pdtcrypt >sample.tar.gz"
+ % binascii.hexlify (key).decode ())
+ assert ret == 0
+ assert os.path.exists("sample.tar.gz")
+
+ # extract with normal tar and check output
+ os.system("zcat sample.tar.gz 2>/dev/null > sample.tar")
+ os.system("tar xf sample.tar")
+ assert os.path.exists("big")
+ assert hash == self.md5sum("big")
+
+
+ def test_cli_decrypt_key_envp (self):
+ """
+ Encrypt, then decrypt using the crypto.py → gzip → tar pipe,
+ supplying the encryption key through the process environment.
+ """
+ key = os.urandom (16)
+ nacl = hashlib.md5 ("Stráðu á mig salti".encode ()).digest ()
+
+ # create the content of the file to compress and hash it
+ hash = self.create_file("big", 50000)
+
+ # create the encryption handler
+ encryptor = crypto.Encrypt (key=key, nacl=nacl,
+ version=DELTATAR_HEADER_VERSION,
+ paramversion=DELTATAR_PARAMETER_VERSION)
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar.gz.pdtcrypt",
+ mode="w#gz",
+ format=GNU_FORMAT,
+ encryption=encryptor)
+ tarobj.add("big")
+ tarobj.close()
+ os.unlink("big")
# decrypt outer archive layer with crypto.py
assert os.path.exists("sample.tar.gz.pdtcrypt")
- ret = os.system("python3 -s ./deltatar/crypto.py process key <sample.tar.gz.pdtcrypt >sample.tar.gz")
+ ret = os.system("PDTCRYPT_KEY='%s' python3 ./deltatar/crypto.py process "
+ "<sample.tar.gz.pdtcrypt >sample.tar.gz"
+ % binascii.hexlify (key).decode ())
assert ret == 0
assert os.path.exists("sample.tar.gz")
assert os.path.exists("big")
assert hash == self.md5sum("big")
- def test_cli_multiple_files_decrypt(self):
+
+ def test_cli_multiple_files_decrypt_pw_argv (self):
"""
Create a tar file with multiple files inside, using concat
compression and encryption mode. Then decrypt and split with
``crypto.py``, decompress it with zcat and untar it with gnu tar.
+ The password is specified as command line argument.
"""
+ pw = "Is that my cow?"
# create sample data
hash = dict()
hash["small2"] = self.create_file("small2", 354)
# create the encryption handler
- encryptor = crypto.Encrypt (password="key",
+ encryptor = crypto.Encrypt (password=pw,
version=DELTATAR_HEADER_VERSION,
paramversion=DELTATAR_PARAMETER_VERSION)
os.unlink(k)
assert os.path.exists("sample.tar.gz.pdtcrypt")
- ret = os.system("python3 -s ./deltatar/crypto.py process key --split "
- "-i sample.tar.gz.pdtcrypt -o .")
+ ret = os.system("python3 ./deltatar/crypto.py process -p '%s' -s --split "
+ "-i sample.tar.gz.pdtcrypt -o ." % pw)
assert ret == 0
for i in range (len (hash)):
fname = "pdtcrypt-object-%d.bin" % (i + 1)
os.system("zcat '%s' 2>/dev/null > sample.tar" % fname)
os.system("tar xf sample.tar")
- for key, value in hash.items():
- assert os.path.exists(key)
- assert value == self.md5sum(key)
+ for fname, digest in hash.items():
+ assert os.path.exists(fname)
+ assert digest == self.md5sum(fname)
+
+
+ def test_cli_multiple_files_decrypt_envp (self):
+ """
+ Create a tar file with multiple files inside, using concat
+ compression and encryption mode. Then decrypt and split with
+ ``crypto.py``, decompress it with zcat and untar it with gnu tar.
+ The password is given as environment variable.
+ """
+ pw = "Is that my cow?"
+
+ # create sample data
+ hash = dict()
+ hash["big"] = self.create_file("big", 50000)
+ hash["small"] = self.create_file("small", 100)
+ hash["small2"] = self.create_file("small2", 354)
+
+ # create the encryption handler
+ encryptor = crypto.Encrypt (password=pw,
+ version=DELTATAR_HEADER_VERSION,
+ paramversion=DELTATAR_PARAMETER_VERSION)
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar.gz.pdtcrypt",
+ mode="w#gz",
+ format=GNU_FORMAT,
+ encryption=encryptor)
+
+ for k in hash:
+ tarobj.add(k)
+ tarobj.close()
+
+ for k in hash:
+ os.unlink(k)
+
+ assert os.path.exists("sample.tar.gz.pdtcrypt")
+ ret = os.system("PDTCRYPT_PASSWORD='%s' python3 ./deltatar/crypto.py "
+ "process -s --split -i sample.tar.gz.pdtcrypt -o ."
+ % pw)
+ assert ret == 0
+ for i in range (len (hash)):
+ fname = "pdtcrypt-object-%d.bin" % (i + 1)
+ assert os.path.exists(fname)
+ os.system("zcat '%s' 2>/dev/null > sample.tar" % fname)
+ os.system("tar xf sample.tar")
+
+ for fname, digest in hash.items():
+ assert os.path.exists(fname)
+ assert digest == self.md5sum(fname)
def test_decrypt(self):