#!/usr/bin/env python3
"""
+Intra2net 2017
===============================================================================
crypto -- Encryption Layer for the Intra2net Backup
- Authentication weaknesses in GCM
http://csrc.nist.gov/groups/ST/toolkit/BCM/documents/comments/CWC-GCM/Ferguson2.pdf
+Trouble with python-cryptography packages: authentication tags can only be
+passed in advance: https://github.com/pyca/cryptography/pull/3421
+
"""
import binascii
I2N_HDR_SIZE_NACL = 16
I2N_HDR_SIZE_IV = 12
I2N_HDR_SIZE_CTSIZE = 8
-I2N_HDR_SIZE_TAG = 16
+I2N_TLR_SIZE_TAG = 16 # GCM auth tag, appended to data
I2N_HDR_SIZE = I2N_HDR_SIZE_MAGIC + I2N_HDR_SIZE_VERSION \
+ I2N_HDR_SIZE_PARAMVERSION + I2N_HDR_SIZE_NACL \
- + I2N_HDR_SIZE_IV + I2N_HDR_SIZE_CTSIZE \
- + I2N_HDR_SIZE_TAG # = 64
+ + I2N_HDR_SIZE_IV + I2N_HDR_SIZE_CTSIZE # = 48
# precalculate offsets since Python can’t do constant folding over names
HDR_OFF_VERSION = I2N_HDR_SIZE_MAGIC
HDR_OFF_NACL = HDR_OFF_PARAMVERSION + I2N_HDR_SIZE_PARAMVERSION
HDR_OFF_IV = HDR_OFF_NACL + I2N_HDR_SIZE_NACL
HDR_OFF_CTSIZE = HDR_OFF_IV + I2N_HDR_SIZE_IV
-HDR_OFF_TAG = HDR_OFF_CTSIZE + I2N_HDR_SIZE_CTSIZE
FMT_UINT16_LE = "<H"
FMT_UINT64_LE = "<Q"
-
-PASSPHRASE = b"test1234"
-AES_GCM_AAD = b"authenticated plain text"
+FMT_I2N_HDR = ("<" # host byte order
+ "8s" # magic
+ "H" # version
+ "H" # paramversion
+ "16s" # sodium chloride
+ "12s" # iv
+ "Q") # size
# aes+gcm
AES_GCM_IV_LEN = 12
# , paramversion : u16
# , nacl : [u8; 16]
# , iv : [u8; 12]
-# , ctsize : usize
-# , tag : [u8; 16] }
+# , ctsize : usize }
+#
+# tag : [u8; 16]
+#
# fn hdr_read (f : handle) -> hdrinfo;
# fn hdr_write (f : handle, h : hdrinfo) -> IOResult<usize>;
# fn hdr_fmt (h : hdrinfo) -> String;
#
-def hdr_read (fin):
- if isinstance (fin, int):
- try:
- fin = os.fdopen (fin)
- except OSError as exn: # probably EBADF
- return False, "error converting fd %d to io for reading: %r" \
- % (fin, exn)
+def hdr_read (data):
+
try:
- hdr = fin.read(I2N_HDR_SIZE)
- except OSError as exn:
- return False, "error reading %d B from %r: %r" \
- % (I2N_HDR_SIZE, fin, exn)
+ mag, version, paramversion, nacl, iv, ctsize = \
+ struct.unpack (FMT_I2N_HDR, data)
+ except Exception as exn:
+ return False, "error reading header from [%r]: %s" % (data, str (exn))
- mag = hdr[:I2N_HDR_SIZE_MAGIC]
if mag != I2N_HDR_MAGIC:
- return False, "error reading header from %r: expected %s, got %r" \
- % (fin, I2N_HDR_MAGIC, mag)
-
- version = hdr[HDR_OFF_VERSION : HDR_OFF_PARAMVERSION]
- paramversion = hdr[HDR_OFF_PARAMVERSION : HDR_OFF_NACL]
- nacl = hdr[HDR_OFF_NACL : HDR_OFF_IV]
- iv = hdr[HDR_OFF_IV : HDR_OFF_CTSIZE]
- ctsize = hdr[HDR_OFF_CTSIZE : HDR_OFF_TAG]
- tag = hdr[HDR_OFF_TAG : I2N_HDR_SIZE]
-
- version = struct.unpack (FMT_UINT16_LE, version)
- paramversion = struct.unpack (FMT_UINT16_LE, paramversion)
- ctsize = struct.unpack (FMT_UINT64_LE, ctsize)
+ return False, "bad magic in header: expected [%s], got [%s]" \
+ % (I2N_HDR_MAGIC, mag)
return True, \
{ "version" : version
, "nacl" : nacl
, "iv" : iv
, "ctsize" : ctsize
- , "tag" : tag
}
-def hdr_write (fout, hdr):
- if isinstance (fout, int):
- try:
- fout = os.fdopen (fout)
- except OSError as exn: # probably EBADF
- return False, "error converting fd %d to io for writing: %r" \
- % (fout, exn)
- version = struct.pack (FMT_UINT16_LE, hdr[ "version"])
- paramversion = struct.pack (FMT_UINT16_LE, hdr["paramversion"])
- ctsize = struct.pack (FMT_UINT64_LE, hdr[ "ctsize"])
+def hdr_make (hdr):
+ buf = bytearray (I2N_HDR_SIZE)
+ bufv = memoryview (buf)
- sum = 0
+ print(">>>", hdr)
try:
- def aux (f, v, s):
- ret = fout.write (v)
- if ret != s:
- return False, \
- "error writing header %s to %r: expected %d B, wrote %d B" \
- % (f, fout, s, ret)
- nonlocal sum
- sum += ret
- return True, sum
-
- s, e = aux ("magic", I2N_HDR_MAGIC, I2N_HDR_SIZE_MAGIC)
- if s is False: return False, e
+ struct.pack_into (FMT_I2N_HDR, bufv, 0,
+ I2N_HDR_MAGIC,
+ hdr["version"],
+ hdr["paramversion"],
+ hdr["nacl"],
+ hdr["iv"],
+ hdr["ctsize"])
+ except Exception as exn:
+ return False, "error writing header: %s" % str (exn)
- s, e = aux ("version", version, I2N_HDR_SIZE_VERSION)
- if s is False: return False, e
+ return True, bytes (buf)
- s, e = aux ("paramversion", paramversion, I2N_HDR_SIZE_PARAMVERSION)
- if s is False: return False, e
- s, e = aux ("nacl", hdr["nacl"], I2N_HDR_SIZE_NACL)
- if s is False: return False, e
+HDR_FMT = "I2n_header { version: %d, paramversion: %d, nacl: %s[%d]," \
+ " iv: %s[%d], ctsize: %d }"
- s, e = aux ("iv", hdr["iv"], I2N_HDR_SIZE_IV)
- if s is False: return False, e
-
- s, e = aux ("ctsize", ctsize, I2N_HDR_SIZE_CTSIZE)
- if s is False: return False, e
-
- s, e = aux ("tag", hdr["tag"], I2N_HDR_SIZE_TAG)
- if s is False: return False, e
-
- except OSError as exn:
- return False, "error writing header to %r after %d B written" \
- % (fout, sum)
+def hdr_fmt (h):
+ return HDR_FMT % (h["version"], h["paramversion"],
+ binascii.hexlify (h["nacl"]), len(h["nacl"]),
+ binascii.hexlify (h["iv"]), len(h["iv"]),
+ h["ctsize"])
- if sum != I2N_HDR_SIZE:
- return False, "error writing header to %r; wrote %d B total, " \
- "expected %d" % (fout, sum, I2N_HDR_SIZE)
- return True, sum
+def hex_spaced_of_bytes (b):
+ return " ".join ([ "%.2x%.2x" % (c1, c2)
+ for c1, c2 in zip (b[0::2], b[1::2]) ]) \
+ + (len (b) | 1 == len (b) and " %.2x" % b[-1] or "") # odd lengths
+hdr_dump = hex_spaced_of_bytes
-HDR_FMT = "I2n_header { version: %d, paramversion: %d, nacl: %s[%d]\"," \
- " iv: %s[%d], ctsize: %d, tag: %s[%d] }"
-def hdr_fmt (h):
- return HDR_FMT % (h["version"], h["paramversion"],
- binascii.hexlify (h["nacl"]), len(h["nacl"]),
- binascii.hexlify (h["iv"]), len(h["iv"]),
- h["ctsize"],
- binascii.hexlify (h["tag"]), len(h["tag"]))
+HDR_FMT_PRETTY = """
+version = %-4d : %s
+paramversion = %-4d : %s
+nacl : %s
+iv : %s
+ctsize = %-20d : %s
+"""
+def hdr_fmt_pretty (h):
+ return HDR_FMT_PRETTY \
+ % (h["version"],
+ hex_spaced_of_bytes (struct.pack (FMT_UINT16_LE, h["version"])),
+ h["paramversion"],
+ hex_spaced_of_bytes (struct.pack (FMT_UINT16_LE, h["paramversion"])),
+ hex_spaced_of_bytes (h["nacl"]),
+ hex_spaced_of_bytes (h["iv"]),
+ h["ctsize"],
+ hex_spaced_of_bytes (struct.pack (FMT_UINT64_LE, h["ctsize"])))
-def hdr_dump (h):
- return " ".join ("%.2x%.2x" % (c1, c2) for c1, c2 in zip (h[0::2], h[1::2]))
###############################################################################
## {de,en}cryption
###############################################################################
+
def aesgcm_enc (key, data, aad, iv=None):
iv = iv or os.urandom(AES_GCM_IV_LEN)
enc = Cipher \
def aesgcm_dec (key, data, aad, iv, tag):
dec = Cipher \
( algorithms.AES (key)
- , modes.GCM (iv, tag)
+ , modes.GCM (iv)
, backend = default_backend ()) \
. decryptor ()
dec.authenticate_additional_data (aad)
- return dec.update (data) + dec.finalize ()
-
-class AES_GCM (object):
+ return dec.update (data) + dec.finalize_with_tag (tag)
+
+
+ENCRYPT = 0
+DECRYPT = 1
+
+def aesgcm_context_create (kind, key, aad, iv):
+ if kind == ENCRYPT:
+ iv = iv or os.urandom(AES_GCM_IV_LEN)
+ ctx = Cipher \
+ ( algorithms.AES (key)
+ , modes.GCM (iv)
+ , backend = default_backend ()) \
+ .encryptor ()
+ elif kind == DECRYPT:
+ ctx = Cipher \
+ ( algorithms.AES (key)
+ , modes.GCM (iv)
+ , backend = default_backend ()) \
+ . decryptor ()
+ ctx.authenticate_additional_data (aad)
+ return ctx
+
+
+class AES_GCM_context (object):
"""
Thin wrapper context over AES encryption.
"""
key = None
- data = None
aad = None
+ iv = None
- def __init__ (key, data, aad):
- self.key = key
- self.data = data
- self.aad = aad
+ def __init__ (self, kind, key, aad, iv=None):
+ self.key = key
+ self.aad = aad
+ if not iv:
+ iv = os.urandom (AES_GCM_IV_LEN)
+ self.iv = iv
+ self.ctx = aesgcm_context_create (kind, key, aad, iv)
+
+
+ def encrypt_chunk (self, cnk):
+ if self.ctx is None:
+ return False, "no valid encryption context"
+ return True, self.ctx.update (cnk)
+
+
+ def done (self, tag=None):
+ if self.ctx is None:
+ return False, "no valid encryption context"
+ if tag is None:
+ ret = self.ctx.finalize ()
+ return True, ret, self.ctx.tag
+ ret = self.ctx.finalize_with_tag (tag)
+ return True, ret, None
###############################################################################
## freestanding invocation
###############################################################################
-def faux_hdr ():
- return \
- { "version" : 42
- , "paramversion" : 2187
- , "nacl" : binascii.unhexlify(b"0011223344556677"
- b"8899aabbccddeeff")
- , "iv" : binascii.unhexlify(b"0011223344556677"
- b"8899aabb")
- , "ctsize" : 1337
- , "tag" : binascii.unhexlify(b"ffeeddccbbaa9988"
- b"7766554433221100")
- }
-
-
def main (argv):
- h = faux_hdr ()
- print ("before: %s" % hdr_fmt (h))
- c = io.BytesIO ()
- s, v = hdr_write (c, h)
- if s is False:
- print ("error writing header [%s] to %r (%s)"
- % (hdr_dump (h), s, v), file=sys.stderr)
- return -1
- b = c.getvalue()
- print ("packed value: %s[%d]" % (hdr_dump (b), len (b)))
- s, v = hdr_read (io.BytesIO(b))
- if s is False:
- print ("error reading header from %r (%s)"
- % (s, v), file=sys.stderr)
- return -1
- print ("after: %s" % hdr_fmt (h))
- #NaCl, key = scrypt_derive (PASSPHRASE)
- #iv, ciphertext, tag = aesgcm_enc (key, b"plain text", AES_GCM_AAD)
- #plaintext = aesgcm_dec (key, ciphertext, AES_GCM_AAD, iv, tag)
- return 0
+ print("For testing run unit tests, kthxbye!")
+ return -1
if __name__ == "__main__":
sys.exit (main (sys.argv))