kuroko/modules/codecs/infrastructure.krk
HarJIT 5c2de206b9
Codecs package (#4)
Codecs package

Co-authored-by: HarJIT <harjit@harjit.moe>
2021-03-24 04:53:02 -07:00

644 lines
25 KiB
Python

from codecs.isweblabel import map_weblabel
def idstr(obj):
    # Returns the text found between " at 0x" and the following ">" in the
    # default `object.__repr__` of `obj`, i.e. the hexadecimal identity of the
    # object without its "0x" prefix. Used by the __repr__ methods in this module.
    let default_repr = object.__repr__(obj)
    let after_marker = default_repr.split(" at 0x")[1]
    let address = after_marker.split(">")[0]
    return address
# Maps a normalised label (lowercase, with "_" replaced by "-") to the
# IncrementalEncoder subclass registered for it by register_kuroko_codec().
let _encoder_registry = {}
# Maps a normalised label (lowercase, with "_" replaced by "-") to the
# IncrementalDecoder subclass registered for it by register_kuroko_codec().
let _decoder_registry = {}
def register_kuroko_codec(labels, incremental_encoder_class, incremental_decoder_class):
    # Registers an encoder class and/or a decoder class under every label in
    # `labels`. Either class may be falsy (e.g. None) to register only the other.
    #
    # Labels are normalised by replacing "_" with "-" and lowercasing.
    #
    # Raises ValueError if a class is not a subclass of the matching
    # Incremental* base, if a label is already bound to a different class, or
    # if the class's `html5name` is set but `map_weblabel` does not recognise it.
    #
    # Validation happens label by label, so labels processed before a failing
    # one remain registered.
    for label in labels:
        let dashed = label.replace("_", "-")
        let key = dashed.lower()
        if incremental_encoder_class:
            if not issubclass(incremental_encoder_class, IncrementalEncoder):
                raise ValueError(f"expected IncrementalEncoder subclass, got {incremental_encoder_class!r}")
            if key in _encoder_registry:
                if _encoder_registry[key] != incremental_encoder_class:
                    raise ValueError(f"label {label!r} already registered")
            let webname = incremental_encoder_class.html5name
            if webname != None:
                if map_weblabel(webname.lower()) == None:
                    raise ValueError(f"purported HTML5 name {webname!r} is not an HTML5 label")
            _encoder_registry[key] = incremental_encoder_class
        if incremental_decoder_class:
            if not issubclass(incremental_decoder_class, IncrementalDecoder):
                raise ValueError(f"expected IncrementalDecoder subclass, got {incremental_decoder_class!r}")
            if key in _decoder_registry:
                if _decoder_registry[key] != incremental_decoder_class:
                    raise ValueError(f"label {label!r} already registered")
            let webname = incremental_decoder_class.html5name
            if webname != None:
                if map_weblabel(webname.lower()) == None:
                    raise ValueError(f"purported HTML5 name {webname!r} is not an HTML5 label")
            _decoder_registry[key] = incremental_decoder_class
class KurokoCodecInfo:
    # Pairs a label with the incremental encoder and decoder classes found for
    # it (either may be None) and offers one-shot encode/decode helpers.
    def __init__(label, encoder, decoder):
        self.name = label
        self.incrementalencoder = encoder
        self.incrementaldecoder = decoder
    def encode(string, errors="strict"):
        # Encodes `string` in one go with a fresh encoder instance.
        # Raises ValueError when no encoder class is available.
        if not self.incrementalencoder:
            raise ValueError(f"unrecognised encoding or decode-only encoding: {self.name!r}")
        let encoder = self.incrementalencoder(errors)
        return encoder.encode(string, True)
    def decode(data, errors="strict"):
        # Decodes `data` in one go with a fresh decoder instance.
        # Raises ValueError when no decoder class is available.
        if not self.incrementaldecoder:
            raise ValueError(f"unrecognised encoding or encode-only encoding: {self.name!r}")
        let decoder = self.incrementaldecoder(errors)
        return decoder.decode(data, True)
    def __repr__():
        # Builds the description piece by piece: the label, then how it is
        # encoded and decoded (noting when the codec's own name or HTML5 name
        # differs from the label), then the object identity.
        let pieces = ["<", type(self).__name__, " ", repr(self.name)]
        let enc = self.incrementalencoder
        let dec = self.incrementaldecoder
        if not enc:
            pieces.append("; no encoder")
        else:
            if enc.name != self.name or (enc.html5name and enc.html5name != self.name):
                pieces.append("; encoded as " + repr(enc.name))
            else:
                pieces.append("; with encoder")
            if not enc.html5name:
                pieces.append(" (non-HTML5)")
            else if enc.html5name != enc.name:
                pieces.append(" (HTML5 " + repr(enc.html5name) + ")")
        if not dec:
            pieces.append("; no decoder")
        else:
            if dec.name != self.name or (dec.html5name and dec.html5name != self.name):
                pieces.append("; decoded as " + repr(dec.name))
            else:
                pieces.append("; with decoder")
            if not dec.html5name:
                pieces.append(" (non-HTML5)")
            else if dec.html5name != dec.name:
                pieces.append(" (HTML5 " + repr(dec.html5name) + ")")
        pieces.append("; at 0x" + idstr(self) + ">")
        return "".join(pieces)
def lookup(label, web=False):
    # Returns a KurokoCodecInfo for `label`.
    #
    # With `web` true, the label is first resolved through `map_weblabel`, and
    # KeyError is raised if that yields nothing. Otherwise the lowercased label
    # is used directly.
    #
    # A label with no registered encoder and/or decoder does not raise here:
    # the corresponding slot of the returned info object is simply None.
    let proclabel = label.lower()
    if web:
        proclabel = map_weblabel(label)
        if not proclabel:
            raise KeyError(f"not a web label: {label!r}")
    # Registry keys use "-" where a label may have used "_".
    let key = proclabel.replace("_", "-")
    let enc = None
    let dec = None
    if key in _encoder_registry:
        enc = _encoder_registry[key]
    if key in _decoder_registry:
        dec = _decoder_registry[key]
    return KurokoCodecInfo(proclabel, enc, dec)
def encode(string, label, web=False, errors="strict"):
    # One-shot convenience: look up the codec for `label` and encode `string`
    # with it, using the named error handler.
    let info = lookup(label, web = web)
    return info.encode(string, errors=errors)
def decode(data, label, web=False, errors="strict"):
    # One-shot convenience: look up the codec for `label` and decode `data`
    # with it, using the named error handler.
    let info = lookup(label, web = web)
    return info.decode(data, errors=errors)
# Constructor is e.g. UnicodeEncodeError(encoding, object, start, end, reason)
# Wouldn't it be wonderful if Python bloody documented that anywhere (e.g. manual or docstring)?
# -- Har.
class UnicodeError(ValueError):
    # Base error for codec failures. Records which encoding was involved, the
    # object being processed, the [start, end) range that failed and a reason.
    def __init__(encoding, object, start, end, reason):
        self.reason = reason
        self.end = end
        self.start = start
        self.object = object
        self.encoding = encoding
    def __repr__():
        # Mirrors the constructor call that would recreate this error.
        let c = type(self)
        return f"{c.__name__}({self.encoding!r}, {self.object!r}, {self.start!r}, {self.end!r}, {self.reason!r})"
    def __str__():
        # Human-readable message quoting the offending slice of the input.
        let slice
        if not isinstance(self.object, bytes):
            slice = self.object[self.start:self.end]
        else:
            # bytes input is sliced by way of a list and converted back.
            slice = bytes(list(self.object)[self.start:self.end])
        return f"codec for {self.encoding!r} cannot process sequence {slice!r}: {self.reason}"
# Error type constructed by the encoders in this module for input they cannot
# encode; the error handlers test for it to decide whether to return bytes.
class UnicodeEncodeError(UnicodeError):
# Error type constructed by the decoders in this module for input they cannot
# decode; the error handlers test for it to decide whether to return a string.
class UnicodeDecodeError(UnicodeError):
# Maps an error-handler name (e.g. "strict") to the handler callable.
let _error_registry = {}
def register_error(name, handler):
    # Registers `handler` under `name`, replacing any handler already stored
    # under that name. Handlers are called with a single exception argument.
    _error_registry[name] = handler
def lookup_error(name):
    # Returns the handler registered under `name`. An unknown name propagates
    # whatever error the registry dict raises for a missing key.
    let handler = _error_registry[name]
    return handler
def strict_errors(exc):
    # "strict" handler: never recovers, simply raises the error it was given.
    raise exc
register_error("strict", strict_errors)
def ignore_errors(exc):
    # "ignore" handler: contributes nothing to the output and resumes after the
    # failed range. The empty replacement is bytes for encode errors and a
    # string for anything else.
    let replacement = ""
    if isinstance(exc, UnicodeEncodeError):
        replacement = b""
    return (replacement, exc.end)
register_error("ignore", ignore_errors)
def replace_errors(exc):
    # "replace" handler: substitutes b"?" when encoding and U+FFFD when
    # decoding, resuming after the failed range.
    # Raises TypeError for any other kind of exception.
    if isinstance(exc, UnicodeEncodeError):
        return (b"?", exc.end)
    if isinstance(exc, UnicodeDecodeError):
        return ("\uFFFD", exc.end)
    raise TypeError("'replace' handler expected UnicodeEncodeError or UnicodeDecodeError")
register_error("replace", replace_errors)
def warnreplace_errors(exc):
    # "warnreplace" handler: like "replace", but first writes a one-line
    # description of the error to stderr. The message is written before the
    # exception type is checked, so it is emitted even when TypeError follows.
    import fileio
    let message = type(exc).__name__ + ": " + str(exc) + "\n"
    fileio.stderr.write(message)
    if isinstance(exc, UnicodeEncodeError):
        return (b"?", exc.end)
    if isinstance(exc, UnicodeDecodeError):
        return ("\uFFFD", exc.end)
    raise TypeError("'warnreplace' handler expected UnicodeEncodeError or UnicodeDecodeError")
register_error("warnreplace", warnreplace_errors)
def backslashreplace_errors(exc):
    # "backslashreplace" handler (encoding only): replaces every character in
    # the failed range exc.start:exc.end with a backslash escape and resumes at
    # exc.end. Raises TypeError for anything but a UnicodeEncodeError.
    #
    # The whole range is escaped, not just its first character: the handler
    # tells the encoder to resume at exc.end, so escaping only the character at
    # exc.start would silently drop the rest of a multi-character range.
    if not isinstance(exc, UnicodeEncodeError):
        raise TypeError("'backslashreplace' handler is only for encoding")
    let escapes = []
    for char in exc.object[exc.start:exc.end]:
        # Work around str.format not supporting format specifiers
        let myhex = hex(ord(char)).split("x", 1)[1]
        # Shortest escape form that fits: \xNN, \uNNNN or \UNNNNNNNN.
        let prefix = "\\U"
        let width = 8
        if len(myhex) <= 2:
            prefix = "\\x"
            width = 2
        else if len(myhex) <= 4:
            prefix = "\\u"
            width = 4
        escapes.append(prefix + ("0" * (width - len(myhex))) + myhex)
    return ("".join(escapes).encode(), exc.end)
register_error("backslashreplace", backslashreplace_errors)
def xmlcharrefreplace_errors(exc):
    # "xmlcharrefreplace" handler (encoding only): replaces every character in
    # the failed range exc.start:exc.end with a decimal numeric character
    # reference and resumes at exc.end. Raises TypeError for anything but a
    # UnicodeEncodeError.
    #
    # The whole range is converted, not just its first character: the handler
    # tells the encoder to resume at exc.end, so converting only the character
    # at exc.start would silently drop the rest of a multi-character range.
    if not isinstance(exc, UnicodeEncodeError):
        raise TypeError("'xmlcharrefreplace' handler is only for encoding")
    let references = []
    for char in exc.object[exc.start:exc.end]:
        let codepoint = ord(char)
        # Per WHATWG (specified in its ISO-2022-JP encoder, the only one that
        # generates encoding errors for these three control codes):
        if codepoint in (0x0E, 0x0F, 0x1B):
            references.append("&#65533;")
        else:
            references.append("&#" + str(codepoint) + ";")
    return ("".join(references).encode(), exc.end)
register_error("xmlcharrefreplace", xmlcharrefreplace_errors)
class ByteCatenator:
    # Accumulates bytes fragments in a list and joins them on demand.
    def __init__():
        self.list = []
    def add(data):
        # Appends one bytes fragment to the pending output.
        self.list.append(data)
    def getvalue():
        # Returns all fragments added so far, concatenated in order.
        let separator = b""
        return separator.join(self.list)
class StringCatenator:
    # Accumulates string fragments in a list and joins them on demand.
    def __init__():
        self.list = []
    def add(string):
        # Appends one string fragment to the pending output.
        self.list.append(string)
    def getvalue():
        # Returns all fragments added so far, concatenated in order.
        let separator = ""
        return separator.join(self.list)
class IncrementalEncoder:
    # Base class for incremental encoders. Subclasses set `name` (and
    # `html5name` where applicable) and implement encode(); stateful encoders
    # also override reset(), getstate() and setstate().
    name = None
    html5name = None
    def __init__(errors):
        # `errors` is the name of the error handler to use; the encoder starts
        # in its reset state.
        self.errors = errors
        self.reset()
    def __repr__():
        let c = type(self)
        let w
        if self.html5name:
            w = f"(HTML5 {self.html5name!r})"
        else:
            w = "(non-HTML5)"
        let addr = idstr(self)
        return f"<{c.__name__} instance: encoder for {self.name!r} {w} at 0x{addr}>"
    def encode(string, final = False):
        raise NotImplementedError("must be implemented by subclass")
    def reset():
        # The base encoder keeps no state, so there is nothing to reset.
        pass
    def getstate():
        # The base encoder keeps no state; returns None.
        pass
    def setstate(state):
        # The base encoder keeps no state; `state` is ignored.
        pass
class IncrementalDecoder:
    # Base class for incremental decoders. Subclasses set `name` (and
    # `html5name` where applicable) and implement decode(). Bytes of an
    # unfinished sequence are held in `self.pending` between calls.
    name = None
    html5name = None
    def __init__(errors):
        # `errors` is the name of the error handler to use; the decoder starts
        # in its reset state.
        self.errors = errors
        self.reset()
    def __repr__():
        let c = type(self)
        let w
        if self.html5name:
            w = f"(HTML5 {self.html5name!r})"
        else:
            w = "(non-HTML5)"
        let addr = idstr(self)
        return f"<{c.__name__} instance: decoder for {self.name!r} {w} at 0x{addr}>"
    def decode(data_in, final = False):
        raise NotImplementedError("must be implemented by subclass")
    def _handle_truncation(out, unused, final, data, offset, leader):
        # Called by subclasses when the input runs out. `leader` holds the bytes
        # of a sequence that has been started but not completed.
        #  - No leader bytes: nothing special to do.
        #  - Leader bytes and `final`: report a truncated sequence to the error
        #    handler and emit its replacement (its resume offset is not used).
        #  - Leader bytes and not `final`: stash them for the next call.
        # Always returns the accumulated output.
        if len(leader) != 0:
            if final:
                let error = UnicodeDecodeError(self.name, data, offset - len(leader), offset, "truncated sequence")
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
            else:
                self.pending = bytes(leader)
        return out.getvalue()
    def reset():
        self.pending = b""
    def getstate():
        return self.pending
    def setstate(state):
        self.pending = state
class AsciiIncrementalEncoder(IncrementalEncoder):
    # Encoder that passes code points below 0x80 through as single bytes and
    # encodes everything else via `encoding_map`. Subclasses extend it by
    # overriding `encoding_map` and `ascii_exceptions`.
    #
    # The obvious labels for ASCII are all Windows-1252 per WHATWG. Also, what people call
    # "ASCII" in 8-bit-byte contexts (without backspace combining) is properly ISO-4873-DV.
    name = "ecma-43-dv"
    html5name = None
    # For non-ASCII characters (this should work as a base class)
    # Keys are either a single code point (int) or a tuple of code points for a
    # multi-code-point sequence; values are a byte value or a sequence of them.
    encoding_map = {}
    # Code points below 0x80 that must NOT be passed through unchanged.
    ascii_exceptions = ()
    # Per-instance state, filled in by __init__ / reset():
    # _lead_codes maps the first code point of each tuple key in encoding_map
    # to the list of tuple keys starting with it; pending_lead holds input held
    # back because it might be the start of a longer mapped sequence.
    _lead_codes = None
    pending_lead = None
    def __init__(errors):
        IncrementalEncoder.__init__(self, errors)
        # Index the multi-code-point keys by their first code point.
        self._lead_codes = {}
        for i in self.encoding_map.keys():
            if isinstance(i, tuple):
                self._lead_codes.setdefault(i[0], []).append(i)
    def encode(string_in, final = False):
        # Encodes `string_in`, prefixed by anything held back from the previous
        # call, and returns the resulting bytes. When `final` is false, input
        # that is a proper prefix of a mapped multi-code-point sequence is held
        # back in `pending_lead` instead of being encoded.
        let string = self.pending_lead + string_in
        self.pending_lead = ""
        let out = ByteCatenator()
        let offset = 0
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(string):
                return out.getvalue()
            let i = string[offset]
            if ord(i) in self._lead_codes:
                # This code point can start a mapped multi-code-point sequence.
                let seqs = self._lead_codes[ord(i)]
                let max_length = max([len(j) for j in seqs])
                # Code points from here on, up to the longest candidate's length.
                let string_bit = [ord(i) for i in string[offset:(offset + max_length)]]
                let testable_length = len(string_bit)
                for seq in seqs:
                    # TODO: where one mapped multi-codepoint sequence starts with another mapped
                    #   multi-codepoint sequence is still pathological.
                    if tupleOf(*string_bit[:len(seq)]) == seq:
                        # Full match: emit its bytes and carry on with the
                        # character after the sequence.
                        out.add(bytes(self.encoding_map[seq]))
                        offset += len(seq)
                        if offset >= len(string):
                            return out.getvalue()
                        i = string[offset]
                        break
                    else if (not final) and (tupleOf(*string_bit) ==
                            tupleOf(*list(seq)[:testable_length])):
                        # The remaining input is a prefix of this sequence and
                        # more input may follow: hold it back for the next call.
                        self.pending_lead = "".join([chr(i) for i in string_bit])
                        return out.getvalue()
            if ord(i) < 0x80 and ord(i) not in self.ascii_exceptions:
                out.add(bytes([ord(i)]))
                offset += 1
            else if ord(i) in self.encoding_map:
                let target = self.encoding_map[ord(i)]
                if isinstance(target, tuple):
                    for individ in target:
                        out.add(bytes([individ]))
                else:
                    out.add(bytes([target]))
                offset += 1
            else:
                let error = UnicodeEncodeError(self.name, string, offset, offset + 1,
                                 "character not supported by target encoding")
                # The handler returns (replacement bytes, offset to resume at).
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                # A negative resume offset counts back from the end of the input.
                if offset < 0:
                    offset += len(string)
    def reset():
        self.pending_lead = ""
    def getstate():
        return self.pending_lead
    def setstate(state):
        self.pending_lead = state
class AsciiIncrementalDecoder(IncrementalDecoder):
    # Decoder that passes bytes below 0x80 through as code points and decodes
    # everything else via `decoding_map`. Subclasses extend it to multi-byte
    # encodings by overriding the class attributes below.
    name = "ecma-43-dv"
    html5name = None
    # For non-ASCII characters (this should work as a base class)
    # Keys are a single byte value, or a tuple of two or three byte values for
    # multi-byte codes; values are a code point (two-byte codes may also map to
    # a tuple of code points).
    decoding_map = {}
    # Lead bytes that start a two-byte code.
    dbrange = ()
    # Lead bytes that start a three-byte code.
    tbrange = ()
    # Bytes acceptable as the second byte of a three-byte code; also consulted
    # when deciding where to resume after an invalid multi-byte code.
    trailrange = ()
    # Bytes below 0x80 that must NOT be passed through unchanged.
    ascii_exceptions = ()
    def decode(data_in, final = False):
        # Decodes `data_in`, prefixed by any bytes held over from the previous
        # call, and returns the resulting string. When the input ends inside a
        # multi-byte code, the partial code is either held over (not `final`)
        # or reported as a truncated sequence (`final`).
        let data = self.pending + data_in
        self.pending = b""
        let out = StringCatenator()
        let offset = 0
        # Bytes of the multi-byte code currently being read, excluding the
        # current byte.
        let leader = []
        # Total length of the code currently being read (1, 2 or 3).
        let bytemode = 1
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(data):
                return self._handle_truncation(out, bytemode, final, data, offset, leader)
            let i = data[offset]
            if bytemode == 1 and i < 0x80 and i not in self.ascii_exceptions:
                out.add(chr(i))
                offset += 1
            else if bytemode == 1 and i in self.dbrange:
                bytemode = 2
                leader.append(i)
                offset += 1
            else if bytemode == 1 and i in self.tbrange:
                bytemode = 3
                leader.append(i)
                offset += 1
            else if bytemode == 3 and len(leader) == 1 and i in self.trailrange:
                leader.append(i)
                offset += 1
            else if bytemode == 1 and i in self.decoding_map:
                out.add(chr(self.decoding_map[i]))
                offset += 1
            else if bytemode == 2 and (leader[0], i) in self.decoding_map:
                let decoded = self.decoding_map[(leader[0], i)]
                if isinstance(decoded, tuple):
                    for codepoint in decoded:
                        out.add(chr(codepoint))
                else:
                    out.add(chr(decoded))
                offset += 1
                bytemode = 1
                leader = []
            else if bytemode == 3 and (leader[0], leader[1], i) in self.decoding_map:
                out.add(chr(self.decoding_map[(leader[0], leader[1], i)]))
                offset += 1
                bytemode = 1
                leader = []
            else:
                let errorstart = offset - len(leader)
                let errorend = errorstart + bytemode
                # Note: per WHATWG behaviour, if an invalid multi-byte code contains an ASCII byte,
                #   parsing shall resume at that byte. Also doing so for bytes outside of the
                #   trail byte range is technically a deviation from WHATWG, but seems sensible.
                if bytemode > 1:
                    if len(leader) > 1 and leader[1] < 0x80:
                        errorend -= 2
                    else if i not in self.trailrange or i < 0x80:
                        errorend -= 1
                let reason = "invalid sequence"
                if bytemode == 1:
                    reason = "invalid byte"
                let error = UnicodeDecodeError(self.name, data, errorstart, errorend, reason)
                bytemode = 1
                leader = []
                # The handler returns (replacement string, offset to resume at).
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                # A negative resume offset counts back from the end of the
                # input, which in this method is `data` (there is no `string`
                # variable here).
                if offset < 0:
                    offset += len(data)
# Register the ASCII codec under its non-web labels.
register_kuroko_codec(["ecma-43-dv", "iso-4873-dv", "646", "cp367", "ibm367", "iso646-us",
                       "iso-646.irv-1991", "iso-ir-6", "us", "csascii"],
                      AsciiIncrementalEncoder, AsciiIncrementalDecoder)
class BaseEbcdicIncrementalEncoder(IncrementalEncoder):
    # Base encoder for stateful encodings that switch between a single-byte
    # set (SBCS) and a double-byte set (DBCS-Host) using shift bytes.
    # Subclasses supply `name` and the mapping tables below.
    name = None
    html5name = None
    # Code point -> single byte in the single-byte set.
    sbcs_encode = {}
    # Code point -> sequence of byte values in the double-byte set.
    dbcshost_encode = {}
    # Code point -> single byte that is emitted after a 0x08 prefix byte.
    sbcsge_encode = {}
    # Bytes emitted to switch into / out of the double-byte set.
    shift_to_dbcs = 0x0E
    shift_to_sbcs = 0x0F
    def encode(string, final = False):
        # Encodes `string` and returns the bytes. `self.in_dbcshost` tracks
        # which set the output is currently shifted into and persists across
        # calls; when `final` is true the output is shifted back to the
        # single-byte set at the end if necessary.
        let out = ByteCatenator()
        let offset = 0
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(string):
                if final and self.in_dbcshost:
                    out.add(bytes([self.shift_to_sbcs]))
                    self.in_dbcshost = False
                return out.getvalue()
            let i = string[offset]
            # Single-byte mappings that would emit a shift byte or the 0x08
            # prefix byte are not used, since those bytes have special meaning
            # in the output stream.
            if ord(i) in self.sbcs_encode and self.sbcs_encode[ord(i)] not in (
                        self.shift_to_dbcs, self.shift_to_sbcs, 0x08):
                if self.in_dbcshost:
                    out.add(bytes([self.shift_to_sbcs]))
                    self.in_dbcshost = False
                out.add(bytes([self.sbcs_encode[ord(i)]]))
                offset += 1
            else if ord(i) in self.sbcsge_encode:
                # NOTE(review): no shift back to the single-byte set is emitted
                # here even if in_dbcshost is true -- confirm this is intended.
                out.add(b"\x08")
                out.add(bytes([self.sbcsge_encode[ord(i)]]))
                offset += 1
            else if ord(i) in self.dbcshost_encode:
                if not self.in_dbcshost:
                    out.add(bytes([self.shift_to_dbcs]))
                    self.in_dbcshost = True
                let target = self.dbcshost_encode[ord(i)]
                for individ in target:
                    out.add(bytes([individ]))
                offset += 1
            else if ord(i) == 0x3000:
                # U+3000 is always encoded as the double-byte code 0x40 0x40.
                if not self.in_dbcshost:
                    out.add(bytes([self.shift_to_dbcs]))
                    self.in_dbcshost = True
                out.add(b"\x40\x40")
                offset += 1
            else:
                let error = UnicodeEncodeError(self.name, string, offset, offset + 1,
                                 "character not supported by target encoding")
                let errorret = lookup_error(self.errors)(error)
                # The handler's replacement bytes are decoded to a string and
                # re-encoded into this encoding under "strict". The bare name
                # `encode` is the module-level function, not this method.
                # NOTE(review): that re-encoding goes through a separate encoder
                # instance, so it does not see this encoder's in_dbcshost
                # state -- verify the output is correct when an error occurs
                # while shifted into the double-byte set.
                out.add(encode(errorret[0].decode(), self.name, errors="strict"))
                offset = errorret[1]
                # A negative resume offset counts back from the end of the input.
                if offset < 0:
                    offset += len(string)
    def reset():
        self.in_dbcshost = False
    def getstate():
        return self.in_dbcshost
    def setstate(state):
        self.in_dbcshost = state
class BaseEbcdicIncrementalDecoder(IncrementalDecoder):
    # Base decoder for stateful encodings that switch between a single-byte
    # set (SBCS) and a double-byte set (DBCS-Host) using shift bytes.
    # Subclasses supply `name` and the mapping tables below.
    name = None
    html5name = None
    # Single byte -> code point in the single-byte set.
    sbcs_decode = {}
    # (lead byte, trail byte) -> code point in the double-byte set.
    dbcshost_decode = {}
    # Byte following a 0x08 prefix byte -> code point.
    sbcsge_decode = {}
    # Bytes that switch into / out of the double-byte set.
    shift_to_dbcs = 0x0E
    shift_to_sbcs = 0x0F
    def decode(data_in, final = False):
        # Decodes `data_in`, prefixed by any bytes held over from the previous
        # call, and returns the resulting string. `self.in_dbcshost` tracks the
        # current shift state and persists across calls.
        let data = self.pending + data_in
        self.pending = b""
        let out = StringCatenator()
        let offset = 0
        # First byte of a two-byte unit (0x08 prefix or double-byte lead byte)
        # that is still waiting for its second byte.
        let leader = []
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(data):
                return self._handle_truncation(out, None, final, data, offset, leader)
            let i = data[offset]
            if i == self.shift_to_sbcs and not leader:
                self.in_dbcshost = False
                offset += 1
            else if i == self.shift_to_dbcs and not leader:
                self.in_dbcshost = True
                offset += 1
            else if not self.in_dbcshost and not leader and i in self.sbcs_decode:
                out.add(chr(self.sbcs_decode[i]))
                offset += 1
            else if not leader and i == 0x08:
                leader.append(i)
                offset += 1
            else if leader and leader[0] == 0x08 and i in self.sbcsge_decode:
                out.add(chr(self.sbcsge_decode[i]))
                leader = []
                offset += 1
            else if self.in_dbcshost and not leader and (i < 0x40 or i == 0xFF):
                # These bytes are decoded with the single-byte table even while
                # shifted into the double-byte set.
                out.add(chr(self.sbcs_decode[i]))
                offset += 1
            else if self.in_dbcshost and not leader and i == 0x40:
                leader.append(i)
                offset += 1
            else if self.in_dbcshost and leader and leader[0] == 0x40:
                if i == 0x40:
                    out.add("\u3000")
                    leader = []
                    offset += 1
                else:
                    # Note: this is a leniency (unpaired 0x40 in DBCS-Host is not valid)
                    out.add(" ")
                    leader = []
                    continue # i.e. without incrementing offset
            else if self.in_dbcshost and not leader and 0x41 <= i and i <= 0xFE:
                offset += 1
                leader.append(i)
            else if self.in_dbcshost and leader and 0x41 <= i and i <= 0xFE and (
                        leader[0], i) in self.dbcshost_decode:
                out.add(chr(self.dbcshost_decode[(leader[0], i)]))
                leader = []
                offset += 1
            else:
                let errorstart
                let errorend
                if leader:
                    errorstart = offset - len(leader)
                    # A well-formed but unmapped lead/trail pair is reported as
                    # a whole; otherwise only the leader is reported, so that
                    # decoding resumes at the current byte.
                    if 0x41 <= leader[0] and leader[0] <= 0xFE and 0x41 <= i and i <= 0xFE:
                        errorend = offset + 1
                    else:
                        errorend = errorstart + 1
                else:
                    errorstart = offset
                    errorend = offset + 1
                let reason = "invalid sequence"
                if not leader:
                    reason = "invalid byte"
                else if self.in_dbcshost and leader and not (0x41 <= i and i <= 0xFE):
                    reason = "truncated sequence (lead byte not followed by trail byte)"
                let error = UnicodeDecodeError(self.name, data, errorstart, errorend, reason)
                leader = []
                # The handler returns (replacement string, offset to resume at).
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                # A negative resume offset counts back from the end of the
                # input, which in this method is `data` (there is no `string`
                # variable here).
                if offset < 0:
                    offset += len(data)
    def reset():
        self.pending = b""
        self.in_dbcshost = False
    def getstate():
        return (self.pending, self.in_dbcshost)
    def setstate(state):
        self.pending = state[0]
        self.in_dbcshost = state[1]
class UndefinedIncrementalEncoder(IncrementalEncoder):
    # Encoder that refuses all input, including the empty string.
    name = "undefined"
    html5name = "replacement"
    # WHATWG doesn't specify an encoder for "replacement" so follow Python "undefined" here.
    # i.e. ignore the errors specifier and always use strict, and fail on even empty strings.
    def __init__(errors):
        # Deliberately does nothing: the errors specifier is ignored.
        pass
    def encode(string, final = False):
        # Always fails, reporting the whole input as the failed range.
        let whole_input_error = UnicodeEncodeError(self.name, string, 0, len(string), "undefined encoding")
        strict_errors(whole_input_error)
class UndefinedIncrementalDecoder(IncrementalDecoder):
    # Decoder that treats any non-empty input as a single decoding error
    # covering the whole input.
    name = "undefined"
    html5name = "replacement"
    def decode(data, final = False):
        if len(data) == 0:
            return "" # per WHATWG, contra Python
        let error = UnicodeDecodeError(self.name, data, 0, len(data), "undefined encoding")
        # per WHATWG, contra Python: the configured handler decides the result,
        # and only its replacement text is used.
        let handler = lookup_error(self.errors)
        let outcome = handler(error)
        return outcome[0]
# Register the always-failing codec under both its Python-style name and its
# HTML5 name.
register_kuroko_codec(
    ["undefined", "replacement"],
    UndefinedIncrementalEncoder,
    UndefinedIncrementalDecoder)
def lazy_property(method):
    # Decorator turning `method` into a property whose value is computed on
    # first access and then reused.
    #
    # The cached value lives in this closure, so there is one cache per
    # decorated method, shared by every instance that reads the property.
    # A result equal to None is not treated as cached: the method is called
    # again on the next access.
    let memo = None
    def retriever(this):
        if not (memo == None):
            return memo
        memo = method(this)
        return memo
    return property(retriever)
class encodesto7bit:
    # Read-only, dict-like view over an encoding map `base`. It exposes only
    # the entries whose value is a pair of byte values each in 0xA1..0xFE, and
    # returns those pairs with the high bit of each byte cleared.
    def __init__(base):
        self.base = base
    def __contains__(key):
        if key not in self.base:
            return False
        let value = self.base[key]
        if not (isinstance(value, tuple) and len(value) == 2):
            return False
        let i, j = value
        if not isinstance(i, int) or not isinstance(j, int):
            return False
        return 0xA1 <= i and i <= 0xFE and 0xA1 <= j and j <= 0xFE
    def __getitem__(key):
        if key not in self:
            raise KeyError(f"element {key!r} not in 7-bit wrapper dict")
        let i, j = self.base[key]
        # Both bytes are known to be in 0xA1..0xFE here, so masking with 0x7F
        # just clears the high bit.
        return (i & 0x7F, j & 0x7F)
    def keys():
        # Lists the keys of `base` whose value is a pair within range. Unlike
        # __contains__, this does not check that the pair members are ints.
        let ret = []
        for candidate in self.base.keys():
            let val = self.base[candidate]
            if not isinstance(val, tuple) or len(val) != 2:
                continue
            let j, k = val
            if 0xA1 <= j and j <= 0xFE and 0xA1 <= k and k <= 0xFE:
                ret.append(candidate)
        return ret
    def __iter__():
        let listing = self.keys()
        return listing.__iter__()
class decodesto7bit:
    # Read-only, dict-like view over a decoding map `base` keyed by byte pairs.
    # It accepts pairs of byte values each in 0x21..0x7E and looks them up in
    # `base` with the high bit of each byte set.
    def __init__(base):
        self.base = base
    def __contains__(key):
        if not (isinstance(key, tuple) and len(key) == 2):
            return False
        let i, j = key
        if not isinstance(i, int) or not isinstance(j, int):
            return False
        if not (0x21 <= i and i <= 0x7E and 0x21 <= j and j <= 0x7E):
            return False
        let shifted = (i | 0x80, j | 0x80)
        return shifted in self.base
    def __getitem__(key):
        if key not in self:
            raise KeyError(f"element {key!r} not in 7-bit wrapper dict")
        let i, j = key
        let shifted = (i | 0x80, j | 0x80)
        return self.base[shifted]
    def keys():
        # Lists, with the high bits cleared, every pair key of `base` whose
        # bytes are both in 0xA1..0xFE.
        let ret = []
        for candidate in self.base.keys():
            if not isinstance(candidate, tuple) or len(candidate) != 2:
                continue
            let j, k = candidate
            if 0xA1 <= j and j <= 0xFE and 0xA1 <= k and k <= 0xFE:
                # Both bytes are in 0xA1..0xFE, so masking with 0x7F just
                # clears the high bit.
                ret.append((j & 0x7F, k & 0x7F))
        return ret
    def __iter__():
        let listing = self.keys()
        return listing.__iter__()