return cands
+HDR_CAND_GOOD = 0 # header marks begin of valid object
+HDR_CAND_FISHY = 1 # inconclusive (tag mismatch, obj overlap etc.)
+HDR_CAND_JUNK = 2 # not a header / object unreadable
+
+
+def inspect_hdr (fd, off):
+ """
+ Attempt to parse a header in *fd* at position *off*.
+
+ Returns a verdict about the quality of that header plus the parsed header
+ when readable.
+ """
+
+ _ = os.lseek (fd, off, os.SEEK_SET)
+
+ if os.lseek (fd, 0, os.SEEK_CUR) != off:
+ if PDTCRYPT_VERBOSE is True:
+ noise ("PDT: %d → dismissed (lseek() past EOF)" % off)
+ return HDR_CAND_JUNK, None
+
+ raw = os.read (fd, PDTCRYPT_HDR_SIZE)
+ if len (raw) != PDTCRYPT_HDR_SIZE:
+ if PDTCRYPT_VERBOSE is True:
+ noise ("PDT: %d → dismissed (EOF inside header)" % off)
+ return HDR_CAND_JUNK, None
+
+ try:
+ hdr = hdr_read (raw)
+ except InvalidHeader as exn:
+ if PDTCRYPT_VERBOSE is True:
+ noise ("PDT: %d → dismissed (invalid: [%s])" % (off, str (exn)))
+ return HDR_CAND_JUNK, None
+
+ obj0 = off + PDTCRYPT_HDR_SIZE
+ objX = obj0 + hdr ["ctsize"]
+
+ eof = os.lseek (fd, 0, os.SEEK_END)
+ if eof < objX:
+ if PDTCRYPT_VERBOSE is True:
+ noise ("PDT: %d → EOF inside object (%d≤%d≤%d); adjusting size to "
+ "%d" % (off, obj0, eof, objX, (eof - obj0)))
+ # try reading up to the end
+ hdr ["ctsize"] = eof - obj0
+ return HDR_CAND_FISHY, hdr
+
+ return HDR_CAND_GOOD, hdr
+
+
+def try_decrypt (fd, off, hdr, fname=None):
+ """
+ Attempt to decrypt the object in the (seekable) descriptor *fd* starting at
+ *off* using the metadata in *hdr*. An output file can be specified with
+ *fname*; if it is *None*, the decrypted payload will be discarded.
+ """
+
+ raise NotImplementedError
+
+
###############################################################################
## passthrough / null encryption
###############################################################################
if PDTCRYPT_VERBOSE is True:
noise ("PDT: scan complete: found %d candidates:" % len (cands))
noise_output_candidates (cands)
- finally:
+ except:
os.close (fd)
+ raise
+ junk, todo = [], []
+ try:
+ for cand in cands:
+ vdt, hdr = inspect_hdr (fd, cand)
+ if vdt == HDR_CAND_JUNK:
+ junk.append (cand)
+ else:
+ off0 = cand + PDTCRYPT_HDR_SIZE
+ if PDTCRYPT_VERBOSE is True:
+ noise ("PDT: read payload @%d: %s" % (off0, hdr_fmt (hdr)))
+ vdtt = try_decrypt (fd, off0, hdr)
+
+ if vdt == HDR_CAND_GOOD and vdtt == HDR_CAND_GOOD:
+ noise ("PDT: %d → ✓ valid object %d–%d"
+ % (cand, off0, off0 + hdr ["ctsize"]))
+ elif vdt == HDR_CAND_FISHY and vdtt == HDR_CAND_GOOD:
+ noise ("PDT: %d → × object %d–%d, corrupt header"
+ % (cand, off0, off0 + hdr ["ctsize"]))
+ elif vdt == HDR_CAND_GOOD and vdtt == HDR_CAND_FISHY:
+ noise ("PDT: %d → × object %d–%d, problematic payload"
+ % (cand, off0, off0 + hdr ["ctsize"]))
+ elif vdt == HDR_CAND_FISHY and vdtt == HDR_CAND_FISHY:
+ noise ("PDT: %d → × object %d–%d, corrupt header, problematic "
+ "ciphertext" % (cand, off0, off0 + hdr ["ctsize"]))
+ else:
+ raise Unreachable
+ finally:
+ os.close (fd)
def usage (err=False):
out = print