total_ct = 0 # total ciphertext bytes
total_read = 0 # total bytes read
+ def tell (s):
+ """ESPIPE is normal on stdio stream."""
+ try:
+ return s.tell ()
+ except OSError as exn:
+ if exn.errno == 29:
+ return -1
+
def out (pt):
npt = len (pt)
nonlocal total_pt
# the start of a new header or the end of the input
if ctleft == 0: # current object requires finalization
if PDTCRYPT_VERBOSE is True:
- noise ("PDT: %d finalize" % ins.tell ())
+ noise ("PDT: %d finalize" % tell (ins))
ret, pt = decr.done ()
if ret is False:
raise DecryptionError ("error finalizing object (%s)"
noise ("PDT:\t· object validated")
if PDTCRYPT_VERBOSE is True:
- noise ("PDT: %d hdr" % ins.tell ())
+ noise ("PDT: %d hdr" % tell (ins))
try:
hdr = hdr_read_stream (ins)
total_read += I2N_HDR_SIZE
return total_read, total_obj, total_ct, total_pt
except InvalidHeader as exn:
raise PDTDecryptionError ("invalid header at position %d in %r "
- "(%s)" % (exn, ins.tell (), ins))
+ "(%s)" % (exn, tell (ins), ins))
if PDTCRYPT_VERBOSE is True:
pretty = hdr_fmt_pretty (hdr)
noise (reduce (lambda a, e: (a + "\n" if a else "") + "PDT:\t· " + e,
if PDTCRYPT_VERBOSE is True:
noise ("PDT: %d decrypt obj no. %d, %d B"
- % (ins.tell (), total_obj, ctleft))
+ % (tell (ins), total_obj, ctleft))
# always allocate a new buffer since python-cryptography doesn’t allow
# passing a bytearray :/
nexpect = min (ctleft, PDTCRYPT_BLOCKSIZE)
if PDTCRYPT_VERBOSE is True:
noise ("PDT:\t· [%d] %d%% done, read block (%d B of %d B remaining)"
- % (ins.tell (),
+ % (tell (ins),
100 - ctleft * 100 / (ctcurrent > 0 and ctcurrent or 1),
nexpect, ctleft))
ct = ins.read (nexpect)
nct = len (ct)
if nct < nexpect:
- off = ins.tell ()
+ off = tell (ins)
raise EndOfFile ("hit EOF after %d of %d B in block [%d:%d); "
"%d B ciphertext remaining for object no %d"
% (nct, nexpect, off, off + nexpect, ctleft,