"""Underpinning infrastructure for the codecs module."""
|
|
|
|
from codecs.isweblabel import map_weblabel
|
|
def _idstr(obj):
|
|
let reprd = object.__repr__(obj)
|
|
return reprd.split(" at 0x")[1].split(">")[0]
|
|
|
|
let _encoder_registry = {}
|
|
let _decoder_registry = {}
|
|
def register_kuroko_codec(labels, incremental_encoder_class, incremental_decoder_class):
    """
    Register a given `IncrementalEncoder` subclass and a given `IncrementalDecoder` subclass
    with a given list of labels. Usually, this is expected to include the encoding name, along
    with labels for aliases and/or subsets of the encoding. Either coder class may be `None`,
    if the encoder/decoder labels are being registered asymmetrically.
    """
    for label in labels:
        let norm = label.replace("_", "-").lower()
        if incremental_encoder_class:
            if not issubclass(incremental_encoder_class, IncrementalEncoder):
                raise ValueError(f"expected IncrementalEncoder subclass, got {incremental_encoder_class!r}")
            if norm in _encoder_registry and _encoder_registry[norm] != incremental_encoder_class:
                raise ValueError(f"label {label!r} already registered")
            let webname = incremental_encoder_class.html5name
            if webname != None and map_weblabel(webname.lower()) == None:
                raise ValueError(f"purported HTML5 name {webname!r} is not an HTML5 label")
            _encoder_registry[norm] = incremental_encoder_class
        if incremental_decoder_class:
            if not issubclass(incremental_decoder_class, IncrementalDecoder):
                raise ValueError(f"expected IncrementalDecoder subclass, got {incremental_decoder_class!r}")
            if norm in _decoder_registry and _decoder_registry[norm] != incremental_decoder_class:
                raise ValueError(f"label {label!r} already registered")
            let webname = incremental_decoder_class.html5name
            if webname != None and map_weblabel(webname.lower()) == None:
                raise ValueError(f"purported HTML5 name {webname!r} is not an HTML5 label")
            _decoder_registry[norm] = incremental_decoder_class

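# Illustrative call shape (hypothetical names; the real registrations appear further down in
# this module and in the individual codec modules):
#
#     register_kuroko_codec(["x-my-codec", "x-my-codec-alias"],
#                           MyIncrementalEncoder, MyIncrementalDecoder)
#     # Asymmetric, decode-only registration (no encoder for the label):
#     register_kuroko_codec(["x-my-legacy-label"], None, MyLegacyIncrementalDecoder)
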
class KurokoCodecInfo:
    """
    Descriptor for the registered encoder and decoder for a given label. Has five members:

    - `name`: the label covered by this descriptor.
    - `encode`: encode a complete Unicode sequence.
    - `decode`: decode a complete byte sequence.
    - `incrementalencoder`: IncrementalEncoder subclass.
    - `incrementaldecoder`: IncrementalDecoder subclass.
    """
    def __init__(label, encoder, decoder):
        self.name = label
        self.incrementalencoder = encoder
        self.incrementaldecoder = decoder
    def encode(string, errors="strict"):
        """
        Encode a complete Unicode sequence to a complete byte string.
        The semantics of the name passed to `errors=` are as documented for `lookup_error()`.
        """
        if self.incrementalencoder:
            return self.incrementalencoder(errors).encode(string, True)
        raise ValueError(f"unrecognised encoding or decode-only encoding: {self.name!r}")
    def decode(data, errors="strict"):
        """
        Decode a complete byte sequence to a complete Unicode string.
        The semantics of the name passed to `errors=` are as documented for `lookup_error()`.
        """
        if self.incrementaldecoder:
            return self.incrementaldecoder(errors).decode(data, True)
        raise ValueError(f"unrecognised encoding or encode-only encoding: {self.name!r}")
    def __repr__():
        let ret = "<" + type(self).__name__ + " " + repr(self.name)
        let enc = self.incrementalencoder
        let dec = self.incrementaldecoder
        if enc:
            if enc.name != self.name or (enc.html5name and enc.html5name != self.name):
                ret += "; encoded as " + repr(enc.name)
            else:
                ret += "; with encoder"
            if not enc.html5name:
                ret += " (non-HTML5)"
            else if enc.html5name != enc.name:
                ret += " (HTML5 " + repr(enc.html5name) + ")"
        else:
            ret += "; no encoder"
        if dec:
            if dec.name != self.name or (dec.html5name and dec.html5name != self.name):
                ret += "; decoded as " + repr(dec.name)
            else:
                ret += "; with decoder"
            if not dec.html5name:
                ret += " (non-HTML5)"
            else if dec.html5name != dec.name:
                ret += " (HTML5 " + repr(dec.html5name) + ")"
        else:
            ret += "; no decoder"
        return ret + "; at 0x" + _idstr(self) + ">"

def lookup(label, web=False):
    """
    Obtain a `KurokoCodecInfo` for a given label. If `web=False` (the default), this will always
    succeed, but the resulting `KurokoCodecInfo` might be unable to encode and/or unable to decode
    if the label is not recognised in that direction. If `web=True`, this will raise KeyError if
    the label is not a WHATWG-permitted label, and will map certain labels to undefined per the
    WHATWG spec.

    Can be simply accessed as `codecs.lookup`.
    """
    let proclabel = label.lower()
    if web:
        proclabel = map_weblabel(label)
        if not proclabel:
            raise KeyError(f"not a web label: {label!r}")
    let enc = None
    let dec = None
    try:
        enc = _encoder_registry[proclabel.replace("_", "-")]
    except KeyError:
        pass
    try:
        dec = _decoder_registry[proclabel.replace("_", "-")]
    except KeyError:
        pass
    return KurokoCodecInfo(proclabel, enc, dec)

def encode(string, label, web=False, errors="strict"):
    """
    Encode a complete Unicode sequence to a complete byte string in the given encoding. The
    semantics of the `web=` argument are the same as for `lookup()`. The semantics of the name
    passed to `errors=` are as documented for `lookup_error()`.

    Can be simply accessed as `codecs.encode`.
    """
    return lookup(label, web = web).encode(string, errors=errors)

def decode(data, label, web=False, errors="strict"):
    """
    Decode a complete byte sequence in the given encoding to a complete Unicode string. The
    semantics of the `web=` argument are the same as for `lookup()`. The semantics of the name
    passed to `errors=` are as documented for `lookup_error()`.

    Can be simply accessed as `codecs.decode`.
    """
    return lookup(label, web = web).decode(data, errors=errors)

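# Illustrative usage of the three entry points above, relying only on the "ecma-43-dv" codec
# registered later in this module (which labels are available depends on which codec modules
# have been imported):
#
#     lookup("ecma-43-dv")                             # -> KurokoCodecInfo with both coders
#     encode("Kuroko", "ecma-43-dv")                   # -> b"Kuroko"
#     decode(b"Kuroko", "ecma-43-dv")                  # -> "Kuroko"
#     decode(b"\xFF", "ecma-43-dv", errors="replace")  # -> "\uFFFD"
#     lookup("no-such-label")                          # succeeds, but can neither encode nor decode
#     lookup("no-such-label", web=True)                # raises KeyError: not a web label
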
# Constructor is e.g. UnicodeEncodeError(encoding, object, start, end, reason)
# Wouldn't it be wonderful if Python bloody documented that anywhere (e.g. manual or docstring)?
# -- Har.
class UnicodeError(ValueError):
    """
    Exception raised when an error is encountered or detected in the process of encoding or
    decoding. May instead be passed to a handler when not in strict mode. Contains machine-readable
    information about the error encountered, allowing error handlers to respond to it
    programmatically.
    """
    def __init__(encoding, object, start, end, reason):
        self.encoding = encoding
        self.object = object
        self.start = start
        self.end = end
        self.reason = reason
    def __repr__():
        let c = type(self)
        return f"{c.__name__}({self.encoding!r}, {self.object!r}, {self.start!r}, {self.end!r}, {self.reason!r})"
    def __str__():
        let c = type(self)
        let slice
        if isinstance(self.object, bytes):
            slice = bytes(list(self.object)[self.start:self.end])
        else:
            slice = self.object[self.start:self.end]
        return f"codec for {self.encoding!r} cannot process sequence {slice!r}: {self.reason}"

class UnicodeEncodeError(UnicodeError):
    """
    UnicodeError subclass raised when an error is encountered in the process of encoding.
    """

class UnicodeDecodeError(UnicodeError):
    """
    UnicodeError subclass raised when an error is encountered in the process of decoding.
    """

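# Illustrative behaviour of the exception classes above (repr formatting shown approximately):
#
#     let exc = UnicodeDecodeError("ecma-43-dv", b"ab\xFFcd", 2, 3, "invalid byte")
#     str(exc)   # roughly: codec for 'ecma-43-dv' cannot process sequence b'\xff': invalid byte
#     exc.end    # 3: a handler would normally resume decoding at this offset
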
let _error_registry = {}

def register_error(name, handler):
    """
    Register a new error handler. The handler should be a function taking a `UnicodeError` and
    either raising an exception or returning a tuple of (substitute, resume_index). The substitute
    should be bytes (usually expected to be in ASCII) for a `UnicodeEncodeError`, str otherwise.
    """
    _error_registry[name] = handler

def lookup_error(name):
    """
    Look up an error handler function registered with a certain name. By default, the following
    are registered. It is important to note that nothing obligates a codec to actually *use* the
    error handler if that is not deemed possible or appropriate, so specifying a non-strict
    error handler does not guarantee that no exception will be raised, especially when working
    with a codec which is not a "normal" text encoding (e.g. `undefined` or `inverse-base64`).

    - `strict`: raise an exception.
    - `ignore`: skip invalid substrings. Not always recommended: can facilitate masked injection.
    - `replace`: insert a replacement character (decoding) or question mark (encoding).
    - `warnreplace`: like `replace` but prints a message to stderr; good for debugging.
    - `backslashreplace`: replace with Python/Kuroko style Unicode escapes. Note that this only
      matches JavaScript escape syntax for Basic Multilingual Plane characters. Encoding only.
    - `xmlcharrefreplace`: replace with HTML/XML numerical entities. Note that this will, per
      WHATWG, never generate entities for Shift Out, Shift In and Escape (i.e. when encoding to a
      stateful encoding which uses them, e.g. ISO-2022-JP), instead generating an entity for the
      replacement character. Encoding only.
    """
    return _error_registry[name]

def strict_errors(exc):
    """
    Handler for `strict` errors: raise the exception.
    """
    raise exc
register_error("strict", strict_errors)

def ignore_errors(exc):
    """
    Handler for `ignore` errors: skip invalid sequences.
    """
    if isinstance(exc, UnicodeEncodeError):
        return (b"", exc.end)
    return ("", exc.end)
register_error("ignore", ignore_errors)

def replace_errors(exc):
    """
    Handler for `replace` errors: insert replacement character (if decoding) or
    question mark (if encoding).
    """
    if isinstance(exc, UnicodeEncodeError):
        return (b"?", exc.end)
    else if isinstance(exc, UnicodeDecodeError):
        return ("\uFFFD", exc.end)
    else:
        raise TypeError("'replace' handler expected UnicodeEncodeError or UnicodeDecodeError")
register_error("replace", replace_errors)

def warnreplace_errors(exc):
    """
    Handler for `warnreplace` errors: insert replacement character (if decoding) or question mark
    (if encoding) and print a warning to `stderr`.
    """
    import fileio
    fileio.stderr.write(type(exc).__name__ + ": " + str(exc) + "\n")
    if isinstance(exc, UnicodeEncodeError):
        return (b"?", exc.end)
    else if isinstance(exc, UnicodeDecodeError):
        return ("\uFFFD", exc.end)
    else:
        raise TypeError("'warnreplace' handler expected UnicodeEncodeError or UnicodeDecodeError")
register_error("warnreplace", warnreplace_errors)

def backslashreplace_errors(exc):
    """
    Handler for `backslashreplace` errors: replace an unencodable character with a Python/Kuroko
    style escape sequence. For Basic Multilingual Plane characters, this also matches JavaScript;
    beyond that, they differ.
    """
    if isinstance(exc, UnicodeEncodeError):
        # Work around str.format not supporting format specifiers
        let myhex = hex(ord(exc.object[exc.start])).split("x", 1)[1]
        let outhex
        if len(myhex) <= 2:
            outhex = "\\x" + ("0" * (2 - len(myhex))) + myhex
        else if len(myhex) <= 4:
            outhex = "\\u" + ("0" * (4 - len(myhex))) + myhex
        else:
            outhex = "\\U" + ("0" * (8 - len(myhex))) + myhex
        return (outhex.encode(), exc.end)
    else:
        raise TypeError("'backslashreplace' handler is only for encoding")
register_error("backslashreplace", backslashreplace_errors)

def xmlcharrefreplace_errors(exc):
    """
    Handler for `xmlcharrefreplace` errors: replace an unencodable character with the XML numeric
    entity for the character, unless it is Shift Out, Shift In or Escape, in which case insert the
    XML numeric entity for the replacement character (as stipulated by WHATWG for ISO-2022-JP).
    """
    if isinstance(exc, UnicodeEncodeError):
        let codepoint = ord(exc.object[exc.start])
        # Per WHATWG (specified in its ISO-2022-JP encoder, the only one that
        # generates encoding errors for these three control codes):
        if codepoint in (0x0E, 0x0F, 0x1B): return (b"&#65533;", exc.end)
        return (b"&#" + str(codepoint).encode() + b";", exc.end)
    else:
        raise TypeError("'xmlcharrefreplace' handler is only for encoding")
register_error("xmlcharrefreplace", xmlcharrefreplace_errors)

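# Illustrative sketch of a custom handler registered via `register_error` (the name
# "hashreplace" is hypothetical, not one of the built-in handlers listed above):
#
#     def hashreplace_errors(exc):
#         if isinstance(exc, UnicodeEncodeError):
#             return (b"#", exc.end)   # substitute bytes, then resume at exc.end
#         else if isinstance(exc, UnicodeDecodeError):
#             return ("#", exc.end)    # substitute str when decoding
#         else:
#             raise TypeError("'hashreplace' handler expected UnicodeEncodeError or UnicodeDecodeError")
#     register_error("hashreplace", hashreplace_errors)
#
# After this, passing errors="hashreplace" to `encode`, `decode` or a coder constructor will
# substitute "#" for unsupported characters or invalid bytes, where the codec honours it.
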
class ByteCatenator:
    """
    Helper class for maintaining a stream to which `bytes` objects will be repeatedly catenated
    in place.
    """
    def __init__():
        self.list = []
    def add(data):
        self.list.append(data)
    def getvalue():
        return b"".join(self.list)

class StringCatenator:
    """
    Helper class for maintaining a stream to which `str` objects will be repeatedly catenated
    in place.
    """
    def __init__():
        self.list = []
    def add(string):
        self.list.append(string)
    def getvalue():
        return "".join(self.list)

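# Usage sketch for the helper classes above: accumulate fragments, then join once at the end,
# rather than repeatedly concatenating immutable strings or bytes. ByteCatenator works the same
# way with `bytes` fragments.
#
#     let buf = StringCatenator()
#     buf.add("Kuro")
#     buf.add("ko")
#     buf.getvalue()   # -> "Kuroko"
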
class IncrementalEncoder:
    """
    Incremental encoder, allowing more encoded data to be generated as more Unicode data is
    obtained. Note that the return values from `encode` are not guaranteed to encompass all data
    which has been passed in, until it is called with `final=True`.

    This is the base class and should not be instantiated directly.
    """
    name = None
    html5name = None
    def __init__(errors):
        self.errors = errors
        self.reset()
    def __repr__():
        let c = type(self)
        let w = "(non-HTML5)"
        if self.html5name:
            w = f"(HTML5 {self.html5name!r})"
        let addr = _idstr(self)
        return f"<{c.__name__} instance: encoder for {self.name!r} {w} at 0x{addr}>"
    def encode(string, final = False):
        """
        Passes the given string into the encoder, and returns a sequence of bytes. When
        final=False, the return value might not represent the entire input (some of which may
        become represented at the start of the value returned by the next call). When final=True,
        all of the input will be represented, and any final state change sequence required by the
        encoding will be outputted.
        """
        raise NotImplementedError("must be implemented by subclass")
    def reset():
        """
        Reset encoder to initial state, without outputting, discarding any pending data.
        """
        pass
    def getstate():
        """
        Returns an arbitrary object encapsulating encoder state.
        """
        pass
    def setstate(state):
        """
        Sets encoder state to one previously returned by getstate().
        """
        pass

class IncrementalDecoder:
    """
    Incremental decoder, allowing more Unicode data to be generated as more encoded data is
    obtained. Note that the return values from `decode` are not guaranteed to encompass all data
    which has been passed in, until it is called with `final=True`.

    This is the base class and should not be instantiated directly.
    """
    name = None
    html5name = None
    def __init__(errors):
        self.errors = errors
        self.reset()
    def __repr__():
        let c = type(self)
        let w = "(non-HTML5)"
        if self.html5name:
            w = f"(HTML5 {self.html5name!r})"
        let addr = _idstr(self)
        return f"<{c.__name__} instance: decoder for {self.name!r} {w} at 0x{addr}>"
    def decode(data_in, final = False):
        """
        Passes the given bytes into the decoder, and returns a Unicode string. When
        final=False, the return value might not represent the entire input (some of which may
        become represented at the start of the value returned by the next call). When final=True,
        all of the input will be represented, and an error will be generated if it is truncated.
        """
        raise NotImplementedError("must be implemented by subclass")
    def _handle_truncation(out, unused, final, data, offset, leader):
        """
        Helper function used by subclasses to handle any pending data when returning from `decode`.
        """
        if len(leader) == 0:
            return out.getvalue()
        else if final:
            let error = UnicodeDecodeError(self.name, data, offset - len(leader), offset, "truncated sequence")
            let errorret = lookup_error(self.errors)(error)
            out.add(errorret[0])
            return out.getvalue()
        else:
            self.pending = bytes(leader)
            return out.getvalue()
    def reset():
        """
        Reset decoder to initial state, without outputting, discarding any pending data.
        """
        self.pending = b""
    def getstate():
        """
        Returns an arbitrary object encapsulating decoder state.
        """
        return self.pending
    def setstate(state):
        """
        Sets decoder state to one previously returned by getstate().
        """
        self.pending = state

class AsciiIncrementalEncoder(IncrementalEncoder):
    """
    Encoder for ISO/IEC 4873-DV, and base class for simple _sensu lato_ extended ASCII encoders.
    Encoders for more complex cases, such as ISO-2022-JP, do not inherit from this class.

    ISO/IEC 4873-DV is, as of the current (third) edition of ISO/IEC 4873, the same as what
    people usually mean when they say "ASCII" (i.e. an eighth bit exists but is never used, and
    backspace composition is not a thing which exists for encoding characters).
    """
    # The obvious labels for ASCII are all Windows-1252 per WHATWG. Also, what people call
    # "ASCII" in 8-bit-byte contexts (without backspace combining) is properly ISO-4873-DV.
    name = "ecma-43-dv"
    html5name = None
    # For non-ASCII characters (this should work as a base class)
    encoding_map = {}
    ascii_exceptions = ()
    #
    _lead_codes = None
    pending_lead = None
    def __init__(errors):
        IncrementalEncoder.__init__(self, errors)
        self._lead_codes = {}
        for i in self.encoding_map.keys():
            if isinstance(i, tuple):
                self._lead_codes.setdefault(i[0], []).append(i)
    def encode(string_in, final = False):
        """Implements `IncrementalEncoder.encode`"""
        let string = self.pending_lead + string_in
        self.pending_lead = ""
        let out = ByteCatenator()
        let offset = 0
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(string):
                return out.getvalue()
            let i = string[offset]
            if ord(i) in self._lead_codes:
                let seqs = self._lead_codes[ord(i)]
                let max_length = max([len(j) for j in seqs])
                let string_bit = [ord(i) for i in string[offset:(offset + max_length)]]
                let testable_length = len(string_bit)
                for seq in seqs:
                    # TODO: where one mapped multi-codepoint sequence starts with another mapped
                    # multi-codepoint sequence is still pathological.
                    if tuple(string_bit[:len(seq)]) == seq:
                        out.add(bytes(self.encoding_map[seq]))
                        offset += len(seq)
                        if offset >= len(string):
                            return out.getvalue()
                        i = string[offset]
                        break
                    else if (not final) and (tuple(string_bit) ==
                            tuple(list(seq)[:testable_length])):
                        self.pending_lead = "".join([chr(i) for i in string_bit])
                        return out.getvalue()
            if ord(i) < 0x80 and ord(i) not in self.ascii_exceptions:
                out.add(bytes([ord(i)]))
                offset += 1
            else if ord(i) in self.encoding_map:
                let target = self.encoding_map[ord(i)]
                if isinstance(target, tuple):
                    for individ in target:
                        out.add(bytes([individ]))
                else:
                    out.add(bytes([target]))
                offset += 1
            else:
                let error = UnicodeEncodeError(self.name, string, offset, offset + 1,
                        "character not supported by target encoding")
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                if offset < 0:
                    offset += len(string)
    def reset():
        """Implements `IncrementalEncoder.reset`"""
        self.pending_lead = ""
    def getstate():
        """Implements `IncrementalEncoder.getstate`"""
        return self.pending_lead
    def setstate(state):
        """Implements `IncrementalEncoder.setstate`"""
        self.pending_lead = state

class AsciiIncrementalDecoder(IncrementalDecoder):
    """
    Decoder for ISO/IEC 4873-DV, and base class for simple _sensu lato_ extended ASCII decoders.
    Decoders for more complex cases, such as ISO-2022-JP, do not inherit from this class.

    ISO/IEC 4873-DV is, as of the current (third) edition of ISO/IEC 4873, the same as what
    people usually mean when they say "ASCII" (i.e. an eighth bit exists but is never used, and
    backspace composition is not a thing which exists for encoding characters).
    """
    name = "ecma-43-dv"
    html5name = None
    # For non-ASCII characters (this should work as a base class)
    decoding_map = {}
    dbrange = ()
    tbrange = ()
    trailrange = ()
    ascii_exceptions = ()
    def decode(data_in, final = False):
        """Implements `IncrementalDecoder.decode`"""
        let data = self.pending + data_in
        self.pending = b""
        let out = StringCatenator()
        let offset = 0
        let leader = []
        let bytemode = 1
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(data):
                return self._handle_truncation(out, bytemode, final, data, offset, leader)
            let i = data[offset]
            if bytemode == 1 and i < 0x80 and i not in self.ascii_exceptions:
                out.add(chr(i))
                offset += 1
            else if bytemode == 1 and i in self.dbrange:
                bytemode = 2
                leader.append(i)
                offset += 1
            else if bytemode == 1 and i in self.tbrange:
                bytemode = 3
                leader.append(i)
                offset += 1
            else if bytemode == 3 and len(leader) == 1 and i in self.trailrange:
                leader.append(i)
                offset += 1
            else if bytemode == 1 and i in self.decoding_map:
                out.add(chr(self.decoding_map[i]))
                offset += 1
            else if bytemode == 2 and (leader[0], i) in self.decoding_map:
                let decoded = self.decoding_map[(leader[0], i)]
                if isinstance(decoded, tuple):
                    for codepoint in decoded:
                        out.add(chr(codepoint))
                else:
                    out.add(chr(decoded))
                offset += 1
                bytemode = 1
                leader = []
            else if bytemode == 3 and (leader[0], leader[1], i) in self.decoding_map:
                out.add(chr(self.decoding_map[(leader[0], leader[1], i)]))
                offset += 1
                bytemode = 1
                leader = []
            else:
                let errorstart = offset - len(leader)
                let errorend = errorstart + bytemode
                # Note: per WHATWG behaviour, if an invalid multi-byte code contains an ASCII byte,
                # parsing shall resume at that byte. Also doing so for bytes outside of the
                # trail byte range is technically a deviation from WHATWG, but seems sensible.
                if bytemode > 1:
                    if len(leader) > 1 and leader[1] < 0x80:
                        errorend -= 2
                    else if i not in self.trailrange or i < 0x80:
                        errorend -= 1
                let reason = "invalid sequence"
                if bytemode == 1:
                    reason = "invalid byte"
                let error = UnicodeDecodeError(self.name, data, errorstart, errorend, reason)
                bytemode = 1
                leader = []
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                if offset < 0:
                    offset += len(data)

register_kuroko_codec(["ecma-43-dv", "iso-4873-dv", "646", "cp367", "ibm367", "iso646-us",
|
|
"iso-646.irv-1991", "iso-ir-6", "us", "csascii"],
|
|
AsciiIncrementalEncoder, AsciiIncrementalDecoder)
|
|
|
|
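# Illustrative sketch of how a simple single-byte ("extended ASCII") codec can be built on the
# two base classes above. The label "x-example-sbcs" and its single mapping are hypothetical;
# real codecs in this package are expected to supply full generated mapping tables.
#
#     class ExampleSbcsIncrementalEncoder(AsciiIncrementalEncoder):
#         name = "x-example-sbcs"
#         encoding_map = {0x00A5: 0xA5}    # U+00A5 YEN SIGN -> byte 0xA5
#     class ExampleSbcsIncrementalDecoder(AsciiIncrementalDecoder):
#         name = "x-example-sbcs"
#         decoding_map = {0xA5: 0x00A5}    # byte 0xA5 -> U+00A5 YEN SIGN
#     register_kuroko_codec(["x-example-sbcs"],
#                           ExampleSbcsIncrementalEncoder, ExampleSbcsIncrementalDecoder)
#
#     encode("¥100", "x-example-sbcs")     # -> b"\xA5100"
#     decode(b"\xA5100", "x-example-sbcs") # -> "¥100"
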
class BaseEbcdicIncrementalEncoder(IncrementalEncoder):
    """
    Base class for EBCDIC encoders.

    On its own, it is only capable of encoding `U+3000` (as ``x'0E', x'40', x'40', x'0F'``); hence,
    it should not, generally speaking, be used directly.
    """
    name = None
    html5name = None
    sbcs_encode = {}
    dbcshost_encode = {}
    sbcsge_encode = {}
    shift_to_dbcs = 0x0E
    shift_to_sbcs = 0x0F
    def encode(string, final = False):
        """Implements `IncrementalEncoder.encode`"""
        let out = ByteCatenator()
        let offset = 0
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(string):
                if final and self.in_dbcshost:
                    out.add(bytes([self.shift_to_sbcs]))
                    self.in_dbcshost = False
                return out.getvalue()
            let i = string[offset]
            if ord(i) in self.sbcs_encode and self.sbcs_encode[ord(i)] not in (
                    self.shift_to_dbcs, self.shift_to_sbcs, 0x08):
                if self.in_dbcshost:
                    out.add(bytes([self.shift_to_sbcs]))
                    self.in_dbcshost = False
                out.add(bytes([self.sbcs_encode[ord(i)]]))
                offset += 1
            else if ord(i) in self.sbcsge_encode:
                out.add(b"\x08")
                out.add(bytes([self.sbcsge_encode[ord(i)]]))
                offset += 1
            else if ord(i) in self.dbcshost_encode:
                if not self.in_dbcshost:
                    out.add(bytes([self.shift_to_dbcs]))
                    self.in_dbcshost = True
                let target = self.dbcshost_encode[ord(i)]
                for individ in target:
                    out.add(bytes([individ]))
                offset += 1
            else if ord(i) == 0x3000:
                if not self.in_dbcshost:
                    out.add(bytes([self.shift_to_dbcs]))
                    self.in_dbcshost = True
                out.add(b"\x40\x40")
                offset += 1
            else:
                let error = UnicodeEncodeError(self.name, string, offset, offset + 1,
                        "character not supported by target encoding")
                let errorret = lookup_error(self.errors)(error)
                out.add(encode(errorret[0].decode(), self.name, errors="strict"))
                offset = errorret[1]
                if offset < 0:
                    offset += len(string)
    def reset():
        """Implements `IncrementalEncoder.reset`"""
        self.in_dbcshost = False
    def getstate():
        """Implements `IncrementalEncoder.getstate`"""
        return self.in_dbcshost
    def setstate(state):
        """Implements `IncrementalEncoder.setstate`"""
        self.in_dbcshost = state

class BaseEbcdicIncrementalDecoder(IncrementalDecoder):
    """
    Base class for EBCDIC decoders.

    On its own, it is only capable of decoding `U+3000` (from ``x'0E', x'40', x'40', x'0F'``); hence,
    it should not, generally speaking, be used directly.
    """
    name = None
    html5name = None
    sbcs_decode = {}
    dbcshost_decode = {}
    sbcsge_decode = {}
    shift_to_dbcs = 0x0E
    shift_to_sbcs = 0x0F
    def decode(data_in, final = False):
        """Implements `IncrementalDecoder.decode`"""
        let data = self.pending + data_in
        self.pending = b""
        let out = StringCatenator()
        let offset = 0
        let leader = []
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(data):
                return self._handle_truncation(out, None, final, data, offset, leader)
            let i = data[offset]
            if i == self.shift_to_sbcs and not leader:
                self.in_dbcshost = False
                offset += 1
            else if i == self.shift_to_dbcs and not leader:
                self.in_dbcshost = True
                offset += 1
            else if not self.in_dbcshost and not leader and i in self.sbcs_decode:
                out.add(chr(self.sbcs_decode[i]))
                offset += 1
            else if not leader and i == 0x08:
                leader.append(i)
                offset += 1
            else if leader and leader[0] == 0x08 and i in self.sbcsge_decode:
                out.add(chr(self.sbcsge_decode[i]))
                leader = []
                offset += 1
            else if self.in_dbcshost and not leader and (i < 0x40 or i == 0xFF):
                out.add(chr(self.sbcs_decode[i]))
                offset += 1
            else if self.in_dbcshost and not leader and i == 0x40:
                leader.append(i)
                offset += 1
            else if self.in_dbcshost and leader and leader[0] == 0x40:
                if i == 0x40:
                    out.add("\u3000")
                    leader = []
                    offset += 1
                else:
                    # Note: this is a leniency (unpaired 0x40 in DBCS-Host is not valid)
                    out.add(" ")
                    leader = []
                    continue # i.e. without incrementing offset
            else if self.in_dbcshost and not leader and 0x41 <= i and i <= 0xFE:
                offset += 1
                leader.append(i)
            else if self.in_dbcshost and leader and 0x41 <= i and i <= 0xFE and (
                    leader[0], i) in self.dbcshost_decode:
                out.add(chr(self.dbcshost_decode[(leader[0], i)]))
                leader = []
                offset += 1
            else:
                let errorstart
                let errorend
                if leader:
                    errorstart = offset - len(leader)
                    if 0x41 <= leader[0] and leader[0] <= 0xFE and 0x41 <= i and i <= 0xFE:
                        errorend = offset + 1
                    else:
                        errorend = errorstart + 1
                else:
                    errorstart = offset
                    errorend = offset + 1
                let reason = "invalid sequence"
                if not leader:
                    reason = "invalid byte"
                else if self.in_dbcshost and leader and not (0x41 <= i and i <= 0xFE):
                    reason = "truncated sequence (lead byte not followed by trail byte)"
                let error = UnicodeDecodeError(self.name, data, errorstart, errorend, reason)
                leader = []
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                if offset < 0:
                    offset += len(data)
    def reset():
        """Implements `IncrementalDecoder.reset`"""
        self.pending = b""
        self.in_dbcshost = False
    def getstate():
        """Implements `IncrementalDecoder.getstate`"""
        return (self.pending, self.in_dbcshost)
    def setstate(state):
        """Implements `IncrementalDecoder.setstate`"""
        self.pending = state[0]
        self.in_dbcshost = state[1]

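# Illustrative sketch of an EBCDIC codec built on the two base classes above. The label
# "x-example-ebcdic" and the two tiny mappings are hypothetical; real subclasses are expected
# to supply complete SBCS and DBCS-Host tables.
#
#     class ExampleEbcdicIncrementalEncoder(BaseEbcdicIncrementalEncoder):
#         name = "x-example-ebcdic"
#         sbcs_encode = {0x0041: 0xC1, 0x0020: 0x40}   # "A" -> x'C1', space -> x'40'
#     class ExampleEbcdicIncrementalDecoder(BaseEbcdicIncrementalDecoder):
#         name = "x-example-ebcdic"
#         sbcs_decode = {0xC1: 0x0041, 0x40: 0x0020}
#     register_kuroko_codec(["x-example-ebcdic"],
#                           ExampleEbcdicIncrementalEncoder, ExampleEbcdicIncrementalDecoder)
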
class UndefinedIncrementalEncoder(IncrementalEncoder):
    """
    Encoder which errors out on all input. For use on input for which encoding should not be
    attempted. Error handler is ignored.
    """
    name = "undefined"
    html5name = "replacement"
    # WHATWG doesn't specify an encoder for "replacement" so follow Python "undefined" here.
    # i.e. ignore the errors specifier and always use strict, and fail on even empty strings.
    def __init__(errors):
        pass
    def encode(string, final = False):
        let error = UnicodeEncodeError(self.name, string, 0, len(string), "undefined encoding")
        strict_errors(error)

class UndefinedIncrementalDecoder(IncrementalDecoder):
    """
    Decoder which errors out on all input. For use on input for which decoding should not be
    attempted. Error handler is honoured, and called once per non-empty `decode` method call.
    """
    name = "undefined"
    html5name = "replacement"
    def decode(data, final = False):
        if len(data) == 0:
            return "" # per WHATWG, contra Python
        let error = UnicodeDecodeError(self.name, data, 0, len(data), "undefined encoding")
        let errorret = lookup_error(self.errors)(error) # per WHATWG, contra Python
        return errorret[0]

register_kuroko_codec(
    ["undefined", "replacement"],
    UndefinedIncrementalEncoder,
    UndefinedIncrementalDecoder)

def lazy_property(method):
    """
    Like property(…), but memoises the value returned. The return value is assumed to be
    constant at the class level, i.e. the same for all instances.
    """
    let memo = None
    def retriever(this):
        if memo == None:
            memo = method(this)
        return memo
    return property(retriever)

class encodesto7bit:
    """
    Encoding map for a 7-bit set, wrapping an encoding map for an 8-bit EUC or EUC-superset encoding.
    """
    def __init__(base):
        self.base = base
    def __contains__(key):
        if key not in self.base: return False
        let value = self.base[key]
        if not isinstance(value, tuple): return False
        if len(value) != 2: return False
        let i, j = value
        if not (isinstance(i, int) and isinstance(j, int)): return False
        if i < 0xA1 or i > 0xFE or j < 0xA1 or j > 0xFE: return False
        return True
    def __getitem__(key):
        if key not in self:
            raise KeyError(f"element {key!r} not in 7-bit wrapper dict")
        let i, j = self.base[key]
        return (i &~ 0x80, j &~ 0x80)
    def keys():
        let ret = []
        for i in self.base.keys():
            let val = self.base[i]
            if isinstance(val, tuple) and len(val) == 2:
                let j, k = val
                if 0xA1 <= j and j <= 0xFE and 0xA1 <= k and k <= 0xFE:
                    ret.append(i)
        return ret
    def __iter__():
        return self.keys().__iter__()

class decodesto7bit:
    """
    Decoding map for a 7-bit set, wrapping a decoding map for an 8-bit EUC or EUC-superset encoding.
    """
    def __init__(base):
        self.base = base
    def __contains__(key):
        if not isinstance(key, tuple): return False
        if len(key) != 2: return False
        let i, j = key
        if not (isinstance(i, int) and isinstance(j, int)): return False
        if i < 0x21 or i > 0x7E or j < 0x21 or j > 0x7E: return False
        return (i | 0x80, j | 0x80) in self.base
    def __getitem__(key):
        if key not in self:
            raise KeyError(f"element {key!r} not in 7-bit wrapper dict")
        let i, j = key
        return self.base[(i | 0x80, j | 0x80)]
    def keys():
        let ret = []
        for i in self.base.keys():
            if isinstance(i, tuple) and len(i) == 2:
                let j, k = i
                if 0xA1 <= j and j <= 0xFE and 0xA1 <= k and k <= 0xFE:
                    ret.append((j &~ 0x80, k &~ 0x80))
        return ret
    def __iter__():
        return self.keys().__iter__()
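
# Behaviour sketch for the two wrappers above, with a hypothetical one-entry EUC-style map
# (0xA4 0xA2 is the EUC-JP coding of U+3042):
#
#     let euc_decode = {(0xA4, 0xA2): 0x3042}
#     let jis_decode = decodesto7bit(euc_decode)
#     (0x24, 0x22) in jis_decode        # True: high bit stripped from lead and trail bytes
#     jis_decode[(0x24, 0x22)]          # 0x3042
#
#     let euc_encode = {0x3042: (0xA4, 0xA2)}
#     encodesto7bit(euc_encode)[0x3042] # (0x24, 0x22)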