"""Underpinning infrastructure for the codecs module."""
|
|
|
|
from codecs.isweblabel import map_weblabel
|
|
def _idstr(obj):
|
|
let reprd = object.__repr__(obj)
|
|
return reprd.split(" at 0x")[1].split(">")[0]
|
|
|
|
let _encoder_registry = {}
|
|
let _decoder_registry = {}
|
|

def register_kuroko_codec(labels, incremental_encoder_class, incremental_decoder_class):
    """
    Register a given `IncrementalEncoder` subclass and a given `IncrementalDecoder` subclass
    with a given list of labels. Usually, this is expected to include the encoding name, along
    with a list of labels for aliases and/or subsets of the encoding. Either coder class may be
    `None`, if the encoder and decoder labels are being registered asymmetrically.
    """
    for label in labels:
        let norm = label.replace("_", "-").lower()
        if incremental_encoder_class:
            if not issubclass(incremental_encoder_class, IncrementalEncoder):
                raise ValueError(f"expected IncrementalEncoder subclass, got {incremental_encoder_class!r}")
            if norm in _encoder_registry and _encoder_registry[norm] != incremental_encoder_class:
                raise ValueError(f"label {label!r} already registered")
            let webname = incremental_encoder_class.html5name
            if webname != None and map_weblabel(webname.lower()) == None:
                raise ValueError(f"purported HTML5 name {webname!r} is not an HTML5 label")
            _encoder_registry[norm] = incremental_encoder_class
        if incremental_decoder_class:
            if not issubclass(incremental_decoder_class, IncrementalDecoder):
                raise ValueError(f"expected IncrementalDecoder subclass, got {incremental_decoder_class!r}")
            if norm in _decoder_registry and _decoder_registry[norm] != incremental_decoder_class:
                raise ValueError(f"label {label!r} already registered")
            let webname = incremental_decoder_class.html5name
            if webname != None and map_weblabel(webname.lower()) == None:
                raise ValueError(f"purported HTML5 name {webname!r} is not an HTML5 label")
            _decoder_registry[norm] = incremental_decoder_class
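
# Illustrative note (not part of the original code): labels are normalised to lowercase with
# "_" mapped to "-", both here and in lookup(), so a codec registered under "ecma-43-dv" is
# also reachable via e.g. "ECMA_43_DV". A minimal registration looks like the call made later
# in this module:
#
#     register_kuroko_codec(["ecma-43-dv", "iso-4873-dv"],
#                           AsciiIncrementalEncoder, AsciiIncrementalDecoder)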

class KurokoCodecInfo:
    """
    Descriptor for the registered encoder and decoder for a given label. Has five members:

    - `name`: the label covered by this descriptor.
    - `encode`: encode a complete Unicode sequence.
    - `decode`: decode a complete byte sequence.
    - `incrementalencoder`: IncrementalEncoder subclass.
    - `incrementaldecoder`: IncrementalDecoder subclass.
    """
    def __init__(label, encoder, decoder):
        self.name = label
        self.incrementalencoder = encoder
        self.incrementaldecoder = decoder
    def encode(string, errors="strict"):
        """
        Encode a complete Unicode sequence to a complete byte string.
        The semantics of the name passed to `errors=` are as documented for `lookup_error()`.
        """
        if self.incrementalencoder:
            return self.incrementalencoder(errors).encode(string, True)
        raise ValueError(f"unrecognised encoding or decode-only encoding: {self.name!r}")
    def decode(data, errors="strict"):
        """
        Decode a complete byte sequence to a complete Unicode stream.
        The semantics of the name passed to `errors=` are as documented for `lookup_error()`.
        """
        if self.incrementaldecoder:
            return self.incrementaldecoder(errors).decode(data, True)
        raise ValueError(f"unrecognised encoding or encode-only encoding: {self.name!r}")
    def __repr__():
        let ret = "<" + type(self).__name__ + " " + repr(self.name)
        let enc = self.incrementalencoder
        let dec = self.incrementaldecoder
        if enc:
            if enc.name != self.name or (enc.html5name and enc.html5name != self.name):
                ret += "; encoded as " + repr(enc.name)
            else:
                ret += "; with encoder"
            if not enc.html5name:
                ret += " (non-HTML5)"
            else if enc.html5name != enc.name:
                ret += " (HTML5 " + repr(enc.html5name) + ")"
        else:
            ret += "; no encoder"
        if dec:
            if dec.name != self.name or (dec.html5name and dec.html5name != self.name):
                ret += "; decoded as " + repr(dec.name)
            else:
                ret += "; with decoder"
            if not dec.html5name:
                ret += " (non-HTML5)"
            else if dec.html5name != dec.name:
                ret += " (HTML5 " + repr(dec.html5name) + ")"
        else:
            ret += "; no decoder"
        return ret + "; at 0x" + _idstr(self) + ">"

def lookup(label, web=False):
    """
    Obtain a `KurokoCodecInfo` for a given label. If `web=False` (the default), this will always
    succeed, but the resulting `KurokoCodecInfo` might be unable to encode and/or unable to
    decode if the label is not recognised in that direction. If `web=True`, this will raise
    KeyError if the label is not a WHATWG-permitted label, and will map certain labels to
    undefined per the WHATWG spec.

    Can be simply accessed as `codecs.lookup`.
    """
    let proclabel = label.lower()
    if web:
        proclabel = map_weblabel(label)
        if not proclabel:
            raise KeyError(f"not a web label: {label!r}")
    let enc = None
    let dec = None
    try:
        enc = _encoder_registry[proclabel.replace("_", "-")]
    except KeyError:
    try:
        dec = _decoder_registry[proclabel.replace("_", "-")]
    except KeyError:
    return KurokoCodecInfo(proclabel, enc, dec)
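
# Illustrative example (assumes the codecs registered later in this module): with web=False
# any label yields a KurokoCodecInfo, even one that can neither encode nor decode; with
# web=True only WHATWG labels are accepted.
#
#     lookup("ecma-43-dv").decode(b"abc")      # -> "abc"
#     lookup("no-such-codec").decode(b"abc")   # raises ValueError (no decoder registered)
#     lookup("no-such-codec", web=True)        # raises KeyError (not a web label)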

def encode(string, label, web=False, errors="strict"):
    """
    Encode a complete Unicode sequence to a complete byte string in the given encoding. The
    semantics of the `web=` argument are the same as for `lookup()`; the semantics of the name
    passed to `errors=` are as documented for `lookup_error()`.

    Can be simply accessed as `codecs.encode`.
    """
    return lookup(label, web = web).encode(string, errors=errors)

def decode(data, label, web=False, errors="strict"):
    """
    Decode a complete byte sequence in the given encoding to a complete Unicode stream. The
    semantics of the `web=` argument are the same as for `lookup()`; the semantics of the name
    passed to `errors=` are as documented for `lookup_error()`.

    Can be simply accessed as `codecs.decode`.
    """
    return lookup(label, web = web).decode(data, errors=errors)
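
# Illustrative example (assumes the ASCII-family codec registered later in this module): the
# module-level encode() and decode() are thin wrappers over lookup().
#
#     let data = encode("Kuroko", "ecma-43-dv")    # -> b"Kuroko"
#     decode(data, "646")                          # -> "Kuroko"
#     encode("\u03C0", "ecma-43-dv", errors="replace")  # -> b"?"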

# Constructor is e.g. UnicodeEncodeError(encoding, object, start, end, reason)
# Wouldn't it be wonderful if Python bloody documented that anywhere (e.g. manual or docstring)?
# -- Har.
class UnicodeError(ValueError):
    """
    Exception raised when an error is encountered or detected in the process of encoding or
    decoding. May instead be passed to a handler when not in strict mode. Contains
    machine-readable information about the error encountered, allowing handlers and calling
    code to respond to it.
    """
    def __init__(encoding, object, start, end, reason):
        self.encoding = encoding
        self.object = object
        self.start = start
        self.end = end
        self.reason = reason
    def __repr__():
        let c = type(self)
        return f"{c.__name__}({self.encoding!r}, {self.object!r}, {self.start!r}, {self.end!r}, {self.reason!r})"
    def __str__():
        let c = type(self)
        let slice
        if isinstance(self.object, bytes):
            slice = bytes(list(self.object)[self.start:self.end])
        else:
            slice = self.object[self.start:self.end]
        return f"codec for {self.encoding!r} cannot process sequence {slice!r}: {self.reason}"

class UnicodeEncodeError(UnicodeError):
    """
    UnicodeError subclass raised when an error is encountered in the process of encoding.
    """

class UnicodeDecodeError(UnicodeError):
    """
    UnicodeError subclass raised when an error is encountered in the process of decoding.
    """

let _error_registry = {}

def register_error(name, handler):
    """
    Register a new error handler. The handler should be a function taking a `UnicodeError` and
    either raising an exception or returning a tuple of (substitute, resume_index). The
    substitute should be bytes (usually expected to be in ASCII) for a `UnicodeEncodeError`,
    str otherwise.
    """
    _error_registry[name] = handler
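
# Illustrative sketch (hypothetical handler, not part of the original code): a handler receives
# the UnicodeError and returns (substitute, resume_index). This one encodes unsupported
# characters as "{U+xxxx}" markers:
#
#     def braced_errors(exc):
#         if not isinstance(exc, UnicodeEncodeError):
#             raise exc
#         let cp = ord(exc.object[exc.start])
#         return (("{U+" + hex(cp).split("x", 1)[1] + "}").encode(), exc.end)
#     register_error("braced", braced_errors)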

def lookup_error(name):
    """
    Look up an error handler function registered with a certain name. By default, the handlers
    listed below are registered. Note that nothing obligates a codec to actually *use* the
    error handler if that is not deemed possible or appropriate, so specifying a non-strict
    error handler does not guarantee that no exception will be raised, especially when working
    with a codec which is not a "normal" text encoding (e.g. `undefined` or `inverse-base64`).

    - `strict`: raise an exception.
    - `ignore`: skip invalid substrings. Not always recommended: can facilitate masked injection.
    - `replace`: insert a replacement character (decoding) or question mark (encoding).
    - `warnreplace`: like `replace` but prints a message to stderr; good for debugging.
    - `backslashreplace`: replace with Python/Kuroko style Unicode escapes. Note that this only
      matches JavaScript escape syntax for Basic Multilingual Plane characters. Encoding only.
    - `xmlcharrefreplace`: replace with HTML/XML numerical entities. Note that this will, per
      WHATWG, never generate entities for Shift Out, Shift In and Escape (i.e. when encoding to
      a stateful encoding which uses them, e.g. ISO-2022-JP), instead generating an entity for
      the replacement character. Encoding only.
    """
    return _error_registry[name]

def strict_errors(exc):
    """
    Handler for `strict` errors: raise the exception.
    """
    raise exc
register_error("strict", strict_errors)

def ignore_errors(exc):
    """
    Handler for `ignore` errors: skip invalid sequences.
    """
    if isinstance(exc, UnicodeEncodeError):
        return (b"", exc.end)
    return ("", exc.end)
register_error("ignore", ignore_errors)

def replace_errors(exc):
    """
    Handler for `replace` errors: insert replacement character (if decoding) or
    question mark (if encoding).
    """
    if isinstance(exc, UnicodeEncodeError):
        return (b"?", exc.end)
    else if isinstance(exc, UnicodeDecodeError):
        return ("\uFFFD", exc.end)
    else:
        raise TypeError("'replace' handler expected UnicodeEncodeError or UnicodeDecodeError")
register_error("replace", replace_errors)

def warnreplace_errors(exc):
    """
    Handler for `warnreplace` errors: insert replacement character (if decoding) or question
    mark (if encoding) and print a warning to `stderr`.
    """
    import fileio
    fileio.stderr.write(type(exc).__name__ + ": " + str(exc) + "\n")
    if isinstance(exc, UnicodeEncodeError):
        return (b"?", exc.end)
    else if isinstance(exc, UnicodeDecodeError):
        return ("\uFFFD", exc.end)
    else:
        raise TypeError("'warnreplace' handler expected UnicodeEncodeError or UnicodeDecodeError")
register_error("warnreplace", warnreplace_errors)

def backslashreplace_errors(exc):
    """
    Handler for `backslashreplace` errors: replace unencodable character with Python/Kuroko
    style escape sequence. For Basic Multilingual Plane characters, this also matches
    JavaScript; beyond that, they differ.
    """
    if isinstance(exc, UnicodeEncodeError):
        # Work around str.format not supporting format specifiers
        let myhex = hex(ord(exc.object[exc.start])).split("x", 1)[1]
        let outhex
        if len(myhex) <= 2:
            outhex = "\\x" + ("0" * (2 - len(myhex))) + myhex
        else if len(myhex) <= 4:
            outhex = "\\u" + ("0" * (4 - len(myhex))) + myhex
        else:
            outhex = "\\U" + ("0" * (8 - len(myhex))) + myhex
        return (outhex.encode(), exc.end)
    else:
        raise TypeError("'backslashreplace' handler is only for encoding")
register_error("backslashreplace", backslashreplace_errors)

def xmlcharrefreplace_errors(exc):
    """
    Handler for `xmlcharrefreplace` errors: replace unencodable character with XML numeric
    entity for the character unless it is Shift Out, Shift In or Escape, in which case insert
    the XML numeric entity for the replacement character (as stipulated by WHATWG for
    ISO-2022-JP).
    """
    if isinstance(exc, UnicodeEncodeError):
        let codepoint = ord(exc.object[exc.start])
        # Per WHATWG (specified in its ISO-2022-JP encoder, the only one that
        # generates encoding errors for these three control codes):
        if codepoint in (0x0E, 0x0F, 0x1B): return (b"&#65533;", exc.end)
        return (b"&#" + str(codepoint).encode() + b";", exc.end)
    else:
        raise TypeError("'xmlcharrefreplace' handler is only for encoding")
register_error("xmlcharrefreplace", xmlcharrefreplace_errors)
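
# Illustrative example (assumes the "ecma-43-dv" codec registered later in this module): the
# same unencodable input under several of the handlers registered above.
#
#     encode("a\u00FFb", "ecma-43-dv", errors="ignore")             # -> b"ab"
#     encode("a\u00FFb", "ecma-43-dv", errors="replace")            # -> b"a?b"
#     encode("a\u00FFb", "ecma-43-dv", errors="xmlcharrefreplace")  # -> b"a&#255;b"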

class ByteCatenator:
    """
    Helper class for maintaining a stream to which `bytes` objects will be repeatedly catenated
    in place.
    """
    def __init__():
        self.list = []
    def add(data):
        self.list.append(data)
    def getvalue():
        return b"".join(self.list)

class StringCatenator:
    """
    Helper class for maintaining a stream to which `str` objects will be repeatedly catenated
    in place.
    """
    def __init__():
        self.list = []
    def add(string):
        self.list.append(string)
    def getvalue():
        return "".join(self.list)
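
# Illustrative example: the catenators above are simple accumulators used by the coders below
# to build up their output.
#
#     let out = ByteCatenator()
#     out.add(b"ab")
#     out.add(b"c")
#     out.getvalue()    # -> b"abc"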

class IncrementalEncoder:
    """
    Incremental encoder, allowing more encoded data to be generated as more Unicode data is
    obtained. Note that the return values from `encode` are not guaranteed to encompass all
    data which has been passed in, until it is called with `final=True`.

    This is the base class and should not be instantiated directly.
    """
    name = None
    html5name = None
    def __init__(errors):
        self.errors = errors
        self.reset()
    def __repr__():
        let c = type(self)
        let w = "(non-HTML5)"
        if self.html5name:
            w = f"(HTML5 {self.html5name!r})"
        let addr = _idstr(self)
        return f"<{c.__name__} instance: encoder for {self.name!r} {w} at 0x{addr}>"
    def encode(string, final = False):
        """
        Passes the given string in to the encoder, and returns a sequence of bytes. When
        final=False, the return value might not represent the entire input (some of which may
        become represented at the start of the value returned by the next call). When
        final=True, all of the input will be represented, and any final state change sequence
        required by the encoding will be output.
        """
        raise NotImplementedError("must be implemented by subclass")
    def reset():
        """
        Reset encoder to initial state, without outputting, discarding any pending data.
        """
        pass
    def getstate():
        """
        Returns an arbitrary object encapsulating encoder state.
        """
        pass
    def setstate(state):
        """
        Sets encoder state to one previously returned by getstate().
        """
        pass

class IncrementalDecoder:
    """
    Incremental decoder, allowing more Unicode data to be generated as more encoded data is
    obtained. Note that the return values from `decode` are not guaranteed to encompass all
    data which has been passed in, until it is called with `final=True`.

    This is the base class and should not be instantiated directly.
    """
    name = None
    html5name = None
    def __init__(errors):
        self.errors = errors
        self.reset()
    def __repr__():
        let c = type(self)
        let w = "(non-HTML5)"
        if self.html5name:
            w = f"(HTML5 {self.html5name!r})"
        let addr = _idstr(self)
        return f"<{c.__name__} instance: decoder for {self.name!r} {w} at 0x{addr}>"
    def decode(data_in, final = False):
        """
        Passes the given bytes in to the decoder, and returns a Unicode string. When
        final=False, the return value might not represent the entire input (some of which may
        become represented at the start of the value returned by the next call). When
        final=True, all of the input will be represented, and an error will be generated if it
        is truncated.
        """
        raise NotImplementedError("must be implemented by subclass")
    def _handle_truncation(out, unused, final, data, offset, leader):
        """
        Helper function used by subclasses to handle any pending data when returning from
        `decode`.
        """
        if len(leader) == 0:
            return out.getvalue()
        else if final:
            let error = UnicodeDecodeError(self.name, data, offset - len(leader), offset, "truncated sequence")
            let errorret = lookup_error(self.errors)(error)
            out.add(errorret[0])
            return out.getvalue()
        else:
            self.pending = bytes(leader)
            return out.getvalue()
    def reset():
        """
        Reset decoder to initial state, without outputting, discarding any pending data.
        """
        self.pending = b""
    def getstate():
        """
        Returns an arbitrary object encapsulating decoder state.
        """
        return self.pending
    def setstate(state):
        """
        Sets decoder state to one previously returned by getstate().
        """
        self.pending = state
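
# Illustrative example (assumes the "ecma-43-dv" codec registered below): data may be fed to
# an incremental decoder in arbitrary chunks; pass final=True with the last chunk so that any
# truncated trailing sequence is reported.
#
#     let dec = lookup("ecma-43-dv").incrementaldecoder("strict")
#     let text = dec.decode(b"Hel") + dec.decode(b"lo", True)    # -> "Hello"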

class AsciiIncrementalEncoder(IncrementalEncoder):
    """
    Encoder for ISO/IEC 4873-DV, and base class for simple _sensu lato_ extended ASCII encoders.
    Encoders for more complex cases, such as ISO-2022-JP, do not inherit from this class.

    ISO/IEC 4873-DV is, as of the current (third) edition of ISO/IEC 4873, the same as what
    people usually mean when they say "ASCII" (i.e. an eighth bit exists but is never used, and
    backspace composition is not a thing which exists for encoding characters).
    """
    # The obvious labels for ASCII are all Windows-1252 per WHATWG. Also, what people call
    # "ASCII" in 8-bit-byte contexts (without backspace combining) is properly ISO-4873-DV.
    name = "ecma-43-dv"
    html5name = None
    # For non-ASCII characters (this should work as a base class)
    encoding_map = {}
    ascii_exceptions = ()
    #
    _lead_codes = None
    pending_lead = None
    def __init__(errors):
        IncrementalEncoder.__init__(self, errors)
        self._lead_codes = {}
        for i in self.encoding_map.keys():
            if isinstance(i, tuple):
                self._lead_codes.setdefault(i[0], []).append(i)
    def encode(string_in, final = False):
        """Implements `IncrementalEncoder.encode`"""
        let string = self.pending_lead + string_in
        self.pending_lead = ""
        let out = ByteCatenator()
        let offset = 0
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(string):
                return out.getvalue()
            let i = string[offset]
            if ord(i) in self._lead_codes:
                let seqs = self._lead_codes[ord(i)]
                let max_length = max([len(j) for j in seqs])
                let string_bit = [ord(i) for i in string[offset:(offset + max_length)]]
                let testable_length = len(string_bit)
                for seq in seqs:
                    # TODO: where one mapped multi-codepoint sequence starts with another mapped
                    # multi-codepoint sequence is still pathological.
                    if tupleOf(*string_bit[:len(seq)]) == seq:
                        out.add(bytes(self.encoding_map[seq]))
                        offset += len(seq)
                        if offset >= len(string):
                            return out.getvalue()
                        i = string[offset]
                        break
                    else if (not final) and (tupleOf(*string_bit) ==
                            tupleOf(*list(seq)[:testable_length])):
                        self.pending_lead = "".join([chr(i) for i in string_bit])
                        return out.getvalue()
            if ord(i) < 0x80 and ord(i) not in self.ascii_exceptions:
                out.add(bytes([ord(i)]))
                offset += 1
            else if ord(i) in self.encoding_map:
                let target = self.encoding_map[ord(i)]
                if isinstance(target, tuple):
                    for individ in target:
                        out.add(bytes([individ]))
                else:
                    out.add(bytes([target]))
                offset += 1
            else:
                let error = UnicodeEncodeError(self.name, string, offset, offset + 1,
                                               "character not supported by target encoding")
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                if offset < 0:
                    offset += len(string)
    def reset():
        """Implements `IncrementalEncoder.reset`"""
        self.pending_lead = ""
    def getstate():
        """Implements `IncrementalEncoder.getstate`"""
        return self.pending_lead
    def setstate(state):
        """Implements `IncrementalEncoder.setstate`"""
        self.pending_lead = state

class AsciiIncrementalDecoder(IncrementalDecoder):
    """
    Decoder for ISO/IEC 4873-DV, and base class for simple _sensu lato_ extended ASCII decoders.
    Decoders for more complex cases, such as ISO-2022-JP, do not inherit from this class.

    ISO/IEC 4873-DV is, as of the current (third) edition of ISO/IEC 4873, the same as what
    people usually mean when they say "ASCII" (i.e. an eighth bit exists but is never used, and
    backspace composition is not a thing which exists for encoding characters).
    """
    name = "ecma-43-dv"
    html5name = None
    # For non-ASCII characters (this should work as a base class)
    decoding_map = {}
    dbrange = ()
    tbrange = ()
    trailrange = ()
    ascii_exceptions = ()
    def decode(data_in, final = False):
        """Implements `IncrementalDecoder.decode`"""
        let data = self.pending + data_in
        self.pending = b""
        let out = StringCatenator()
        let offset = 0
        let leader = []
        let bytemode = 1
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(data):
                return self._handle_truncation(out, bytemode, final, data, offset, leader)
            let i = data[offset]
            if bytemode == 1 and i < 0x80 and i not in self.ascii_exceptions:
                out.add(chr(i))
                offset += 1
            else if bytemode == 1 and i in self.dbrange:
                bytemode = 2
                leader.append(i)
                offset += 1
            else if bytemode == 1 and i in self.tbrange:
                bytemode = 3
                leader.append(i)
                offset += 1
            else if bytemode == 3 and len(leader) == 1 and i in self.trailrange:
                leader.append(i)
                offset += 1
            else if bytemode == 1 and i in self.decoding_map:
                out.add(chr(self.decoding_map[i]))
                offset += 1
            else if bytemode == 2 and (leader[0], i) in self.decoding_map:
                let decoded = self.decoding_map[(leader[0], i)]
                if isinstance(decoded, tuple):
                    for codepoint in decoded:
                        out.add(chr(codepoint))
                else:
                    out.add(chr(decoded))
                offset += 1
                bytemode = 1
                leader = []
            else if bytemode == 3 and (leader[0], leader[1], i) in self.decoding_map:
                out.add(chr(self.decoding_map[(leader[0], leader[1], i)]))
                offset += 1
                bytemode = 1
                leader = []
            else:
                let errorstart = offset - len(leader)
                let errorend = errorstart + bytemode
                # Note: per WHATWG behaviour, if an invalid multi-byte code contains an ASCII
                # byte, parsing shall resume at that byte. Also doing so for bytes outside of
                # the trail byte range is technically a deviation from WHATWG, but seems
                # sensible.
                if bytemode > 1:
                    if len(leader) > 1 and leader[1] < 0x80:
                        errorend -= 2
                    else if i not in self.trailrange or i < 0x80:
                        errorend -= 1
                let reason = "invalid sequence"
                if bytemode == 1:
                    reason = "invalid byte"
                let error = UnicodeDecodeError(self.name, data, errorstart, errorend, reason)
                bytemode = 1
                leader = []
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                if offset < 0:
                    offset += len(data)

register_kuroko_codec(["ecma-43-dv", "iso-4873-dv", "646", "cp367", "ibm367", "iso646-us",
                       "iso-646.irv-1991", "iso-ir-6", "us", "csascii"],
                      AsciiIncrementalEncoder, AsciiIncrementalDecoder)
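
# Illustrative sketch (hypothetical codec, not part of the original code): a simple single-byte
# codec can be defined by subclassing the ASCII coders above with an encoding_map/decoding_map
# for the non-ASCII range, then registering it.
#
#     class MyEncoder(AsciiIncrementalEncoder):
#         name = "my-8bit"
#         encoding_map = {0x2502: 0xB3}    # U+2502 BOX DRAWINGS LIGHT VERTICAL -> 0xB3
#     class MyDecoder(AsciiIncrementalDecoder):
#         name = "my-8bit"
#         decoding_map = {0xB3: 0x2502}
#     register_kuroko_codec(["my-8bit"], MyEncoder, MyDecoder)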

class BaseEbcdicIncrementalEncoder(IncrementalEncoder):
    """
    Base class for EBCDIC encoders.

    On its own, it is only capable of encoding `U+3000` (as ``x'0E', x'40', x'40', x'0F'``);
    hence, it should not, generally speaking, be used directly.
    """
    name = None
    html5name = None
    sbcs_encode = {}
    dbcshost_encode = {}
    sbcsge_encode = {}
    shift_to_dbcs = 0x0E
    shift_to_sbcs = 0x0F
    def encode(string, final = False):
        """Implements `IncrementalEncoder.encode`"""
        let out = ByteCatenator()
        let offset = 0
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(string):
                if final and self.in_dbcshost:
                    out.add(bytes([self.shift_to_sbcs]))
                    self.in_dbcshost = False
                return out.getvalue()
            let i = string[offset]
            if ord(i) in self.sbcs_encode and self.sbcs_encode[ord(i)] not in (
                    self.shift_to_dbcs, self.shift_to_sbcs, 0x08):
                if self.in_dbcshost:
                    out.add(bytes([self.shift_to_sbcs]))
                    self.in_dbcshost = False
                out.add(bytes([self.sbcs_encode[ord(i)]]))
                offset += 1
            else if ord(i) in self.sbcsge_encode:
                out.add(b"\x08")
                out.add(bytes([self.sbcsge_encode[ord(i)]]))
                offset += 1
            else if ord(i) in self.dbcshost_encode:
                if not self.in_dbcshost:
                    out.add(bytes([self.shift_to_dbcs]))
                    self.in_dbcshost = True
                let target = self.dbcshost_encode[ord(i)]
                for individ in target:
                    out.add(bytes([individ]))
                offset += 1
            else if ord(i) == 0x3000:
                if not self.in_dbcshost:
                    out.add(bytes([self.shift_to_dbcs]))
                    self.in_dbcshost = True
                out.add(b"\x40\x40")
                offset += 1
            else:
                let error = UnicodeEncodeError(self.name, string, offset, offset + 1,
                                               "character not supported by target encoding")
                let errorret = lookup_error(self.errors)(error)
                out.add(encode(errorret[0].decode(), self.name, errors="strict"))
                offset = errorret[1]
                if offset < 0:
                    offset += len(string)
    def reset():
        """Implements `IncrementalEncoder.reset`"""
        self.in_dbcshost = False
    def getstate():
        """Implements `IncrementalEncoder.getstate`"""
        return self.in_dbcshost
    def setstate(state):
        """Implements `IncrementalEncoder.setstate`"""
        self.in_dbcshost = state

class BaseEbcdicIncrementalDecoder(IncrementalDecoder):
    """
    Base class for EBCDIC decoders.

    On its own, it is only capable of decoding `U+3000` (from ``x'0E', x'40', x'40', x'0F'``);
    hence, it should not, generally speaking, be used directly.
    """
    name = None
    html5name = None
    sbcs_decode = {}
    dbcshost_decode = {}
    sbcsge_decode = {}
    shift_to_dbcs = 0x0E
    shift_to_sbcs = 0x0F
    def decode(data_in, final = False):
        """Implements `IncrementalDecoder.decode`"""
        let data = self.pending + data_in
        self.pending = b""
        let out = StringCatenator()
        let offset = 0
        let leader = []
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(data):
                return self._handle_truncation(out, None, final, data, offset, leader)
            let i = data[offset]
            if i == self.shift_to_sbcs and not leader:
                self.in_dbcshost = False
                offset += 1
            else if i == self.shift_to_dbcs and not leader:
                self.in_dbcshost = True
                offset += 1
            else if not self.in_dbcshost and not leader and i in self.sbcs_decode:
                out.add(chr(self.sbcs_decode[i]))
                offset += 1
            else if not leader and i == 0x08:
                leader.append(i)
                offset += 1
            else if leader and leader[0] == 0x08 and i in self.sbcsge_decode:
                out.add(chr(self.sbcsge_decode[i]))
                leader = []
                offset += 1
            else if self.in_dbcshost and not leader and (i < 0x40 or i == 0xFF):
                out.add(chr(self.sbcs_decode[i]))
                offset += 1
            else if self.in_dbcshost and not leader and i == 0x40:
                leader.append(i)
                offset += 1
            else if self.in_dbcshost and leader and leader[0] == 0x40:
                if i == 0x40:
                    out.add("\u3000")
                    leader = []
                    offset += 1
                else:
                    # Note: this is a leniency (unpaired 0x40 in DBCS-Host is not valid)
                    out.add(" ")
                    leader = []
                    continue # i.e. without incrementing offset
            else if self.in_dbcshost and not leader and 0x41 <= i and i <= 0xFE:
                offset += 1
                leader.append(i)
            else if self.in_dbcshost and leader and 0x41 <= i and i <= 0xFE and (
                    leader[0], i) in self.dbcshost_decode:
                out.add(chr(self.dbcshost_decode[(leader[0], i)]))
                leader = []
                offset += 1
            else:
                let errorstart
                let errorend
                if leader:
                    errorstart = offset - len(leader)
                    if 0x41 <= leader[0] and leader[0] <= 0xFE and 0x41 <= i and i <= 0xFE:
                        errorend = offset + 1
                    else:
                        errorend = errorstart + 1
                else:
                    errorstart = offset
                    errorend = offset + 1
                let reason = "invalid sequence"
                if not leader:
                    reason = "invalid byte"
                else if self.in_dbcshost and leader and not (0x41 <= i and i <= 0xFE):
                    reason = "truncated sequence (lead byte not followed by trail byte)"
                let error = UnicodeDecodeError(self.name, data, errorstart, errorend, reason)
                leader = []
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                if offset < 0:
                    offset += len(data)
    def reset():
        """Implements `IncrementalDecoder.reset`"""
        self.pending = b""
        self.in_dbcshost = False
    def getstate():
        """Implements `IncrementalDecoder.getstate`"""
        return (self.pending, self.in_dbcshost)
    def setstate(state):
        """Implements `IncrementalDecoder.setstate`"""
        self.pending = state[0]
        self.in_dbcshost = state[1]

class UndefinedIncrementalEncoder(IncrementalEncoder):
    """
    Encoder which errors out on all input. For use on input for which encoding should not be
    attempted. Error handler is ignored.
    """
    name = "undefined"
    html5name = "replacement"
    # WHATWG doesn't specify an encoder for "replacement", so follow Python "undefined" here,
    # i.e. ignore the errors specifier and always use strict, and fail even on empty strings.
    def __init__(errors):
    def encode(string, final = False):
        let error = UnicodeEncodeError(self.name, string, 0, len(string), "undefined encoding")
        strict_errors(error)

class UndefinedIncrementalDecoder(IncrementalDecoder):
    """
    Decoder which errors out on all input. For use on input for which decoding should not be
    attempted. Error handler is honoured, and called once per non-empty `decode` method call.
    """
    name = "undefined"
    html5name = "replacement"
    def decode(data, final = False):
        if len(data) == 0:
            return "" # per WHATWG, contra Python
        let error = UnicodeDecodeError(self.name, data, 0, len(data), "undefined encoding")
        let errorret = lookup_error(self.errors)(error) # per WHATWG, contra Python
        return errorret[0]

register_kuroko_codec(
    ["undefined", "replacement"],
    UndefinedIncrementalEncoder,
    UndefinedIncrementalDecoder)
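
# Illustrative example: the "undefined"/"replacement" codec exists so that wrongly labelled or
# hostile input is neutralised rather than interpreted.
#
#     decode(b"anything", "replacement", errors="replace")  # -> "\uFFFD"
#     encode("anything", "undefined")                        # raises UnicodeEncodeError,
#                                                            # regardless of errors= setting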


def lazy_property(method):
    """
    Like property(…), but memoises the value returned. The return value is assumed to be
    constant at the class level, i.e. the same for all instances.
    """
    let memo = None
    def retriever(this):
        if memo == None:
            memo = method(this)
        return memo
    return property(retriever)
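
# Illustrative sketch (hypothetical class, and assumes decorator syntax may be used here): the
# decorated method runs once, on first access of the property, and the result is then shared
# by all instances.
#
#     class Demo:
#         @lazy_property
#         def mapping():
#             return {0xB3: 0x2502}    # built once, then memoised class-wide
#     Demo().mapping                   # first access builds the table; later accesses reuse it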


class encodesto7bit:
    """
    Encoding map for a 7-bit set, wrapping an encoding map for an 8-bit EUC or EUC-superset
    encoding.
    """
    def __init__(base):
        self.base = base
    def __contains__(key):
        if key not in self.base: return False
        let value = self.base[key]
        if not isinstance(value, tuple): return False
        if len(value) != 2: return False
        let i, j = value
        if not (isinstance(i, int) and isinstance(j, int)): return False
        if i < 0xA1 or i > 0xFE or j < 0xA1 or j > 0xFE: return False
        return True
    def __getitem__(key):
        if key not in self:
            raise KeyError(f"element {key!r} not in 7-bit wrapper dict")
        let i, j = self.base[key]
        return (i &~ 0x80, j &~ 0x80)
    def keys():
        let ret = []
        for i in self.base.keys():
            let val = self.base[i]
            if isinstance(val, tuple) and len(val) == 2:
                let j, k = val
                if 0xA1 <= j and j <= 0xFE and 0xA1 <= k and k <= 0xFE:
                    ret.append(i)
        return ret
    def __iter__():
        return self.keys().__iter__()


class decodesto7bit:
    """
    Decoding map for a 7-bit set, wrapping a decoding map for an 8-bit EUC or EUC-superset
    encoding.
    """
    def __init__(base):
        self.base = base
    def __contains__(key):
        if not isinstance(key, tuple): return False
        if len(key) != 2: return False
        let i, j = key
        if not (isinstance(i, int) and isinstance(j, int)): return False
        if i < 0x21 or i > 0x7E or j < 0x21 or j > 0x7E: return False
        return (i | 0x80, j | 0x80) in self.base
    def __getitem__(key):
        if key not in self:
            raise KeyError(f"element {key!r} not in 7-bit wrapper dict")
        let i, j = key
        return self.base[(i | 0x80, j | 0x80)]
    def keys():
        let ret = []
        for i in self.base.keys():
            if isinstance(i, tuple) and len(i) == 2:
                let j, k = i
                if 0xA1 <= j and j <= 0xFE and 0xA1 <= k and k <= 0xFE:
                    ret.append((j &~ 0x80, k &~ 0x80))
        return ret
    def __iter__():
        return self.keys().__iter__()
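
# Illustrative sketch (hypothetical maps, not part of the original code): given the decoding
# map of an 8-bit EUC-style codec, the wrappers above expose the same two-byte codes with the
# high bit stripped, without copying the dictionary. encodesto7bit works symmetrically for
# encoding maps.
#
#     let euc_map = {(0xA4, 0xA2): 0x3042}    # EUC lead/trail byte pair -> U+3042
#     let jis7_map = decodesto7bit(euc_map)
#     (0x24, 0x22) in jis7_map                # -> True
#     jis7_map[(0x24, 0x22)]                  # -> 0x3042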