kuroko/modules/codecs/bespokecodecs.krk
HarJIT f5f314a42d
One fix and one improvement to GB18030: (#15)
— The codec had been failing to decode 0x81308130 to U+0080, even though
it successfully encoded it. Since U+0080 is not used for anything in most
contexts (it's allocated as a control code in the ECMA-35 sense, but
ECMA-48 does not use it) this is unlikely to have hurt anything, but I
have fixed it anyway (it arose from 0 and None being conflated in a
conditional).

— The encoding and decoding of GB18030 four-byte codes now uses binary
search rather than linear search. This significantly improves performance
on four-byte codes, though performance on two-byte codes is unaffected.
2021-08-12 19:17:59 +09:00

866 lines
40 KiB
Python

"""Contains various WHATWG-defined codecs which require dedicated implementations.
Also includes `utf-8-sig` which, while not a WHATWG-specified codec _per se_, is detected,
interpreted and handled by WHATWG BOM tag logic, in preference above any label, before the codec
gets to see it. WHATWG BOM tag logic is not implemented here (it is not always sensible in a
non-browser context); hence, they remain separate codecs."""
from codecs.infrastructure import register_kuroko_codec, ByteCatenator, StringCatenator, UnicodeEncodeError, UnicodeDecodeError, lookup_error, lookup, IncrementalDecoder, IncrementalEncoder, lazy_property
from codecs.dbdata import more_dbdata
class Gb18030IncrementalEncoder(IncrementalEncoder):
    """Incremental encoder for GB18030 (Mainland Chinese Unicode format).

    Subclasses may clear `four_byte_codes` to obtain a plain GBK encoder."""
    name = "gb18030"
    html5name = "gb18030"
    # True: characters missing from the two-byte table are written as four-byte codes.
    # False: such characters are encode errors, and U+20AC is written as the byte 0x80.
    four_byte_codes = True
    def encode(string, final = False):
        """Implements `IncrementalEncoder.encode`: returns the bytes encoding `string`.

        No state is carried between calls, so `final` is not consulted."""
        let out = ByteCatenator()
        let offset = 0
        # Not a for loop: the error handler is free to move offset anywhere.
        while offset < len(string):
            let code = ord(string[offset])
            if code < 0x80:
                # ASCII passes through unchanged.
                out.add(bytes([code]))
                offset += 1
            else if code == 0x20AC and not self.four_byte_codes:
                out.add(b"\x80")
                offset += 1
            else if code in more_dbdata.encode_gbk:
                let mapped = more_dbdata.encode_gbk[code]
                if isinstance(mapped, tuple):
                    for individ in mapped:
                        out.add(bytes([individ]))
                else:
                    out.add(bytes([mapped]))
                offset += 1
            else if self.four_byte_codes and code != 0xE5E5:
                let ranges = more_dbdata.gb_surrogate_ranges
                let pointer
                if code == 0xE7C7:
                    pointer = 7457
                else:
                    # Binary search for the range entry covering this code point. Each entry
                    # is indexed as [0] = first pointer, [1] = first code point.
                    let low = 0
                    let high = len(ranges) - 1
                    while low != high:
                        if low == high - 1:
                            # Two candidates left: pick whichever one starts at or below code.
                            if ranges[high][1] > code:
                                high = low
                            else:
                                low = high
                            break
                        let middle = (low + high) // 2
                        if ranges[middle][1] > code:
                            high = middle
                        else:
                            low = middle
                    pointer = (code - ranges[low][1]) + ranges[high][0]
                # Split the pointer into four mixed-radix digits (bases 126, 10, 126, 10).
                let per_first = 10 * 126 * 10
                let per_second = 10 * 126
                let first = 0x81 + (pointer // per_first)
                let remainder = pointer % per_first
                let second = 0x30 + (remainder // per_second)
                remainder = remainder % per_second
                let third = 0x81 + (remainder // 10)
                let fourth = 0x30 + (remainder % 10)
                out.add(bytes([first, second, third, fourth]))
                offset += 1
            else:
                # Reached either because four-byte codes are disabled, or for U+E5E5.
                let reason = "U+E5E5 corresponds to 0xA3A0, which is mapped to U+3000 as legacy"
                if not self.four_byte_codes:
                    reason = "character not supported in plain GBK mode"
                let error = UnicodeEncodeError(self.name, string, offset, offset + 1, reason)
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                if offset < 0:
                    # A negative resume position counts from the end of the input.
                    offset += len(string)
        return out.getvalue()
class GbkIncrementalEncoder(Gb18030IncrementalEncoder):
    """IncrementalEncoder implementation for GBK (Chinese),
    extension of GB2312 (Simplified Chinese)"""
    name = "gbk"
    html5name = "gbk"
    # Switches the inherited encode() into plain GBK mode: characters outside the two-byte
    # table become encode errors, and U+20AC is written as the single byte 0x80.
    four_byte_codes = False
def _get_gbsurrogate_pointer(leader, i):
    """Compute the ranges pointer of a GB18030 four-byte code.

    `leader` holds the first three bytes of the code and `i` is the fourth byte.
    Returns the pointer as an int (which may legitimately be 0), or None if the
    fourth byte is not in 0x30..0x39 or the pointer falls outside the allocated
    spans (above 39419 but below 189000, or above 1237575)."""
    # Without this check, a fourth byte such as 0x41 or 0x00 would be folded into a
    # neighbouring (or even negative) pointer and decode to an unrelated character.
    if i < 0x30 or i > 0x39: return None
    let ret = (leader[0] - 0x81) * (10 * 126 * 10)
    ret += (leader[1] - 0x30) * (10 * 126)
    ret += (leader[2] - 0x81) * 10
    ret += i - 0x30
    if (39419 < ret and ret < 189000) or (ret > 1237575): return None
    return ret
class Gb18030IncrementalDecoder(IncrementalDecoder):
    """IncrementalDecoder implementation for GB18030 (Mainland Chinese Unicode),
    extension of GB2312 (Simplified Chinese)

    The same decoder is registered for the GBK labels."""
    name = "gb18030"
    html5name = "gb18030"
    def decode(data_in, final = False):
        """Implements `IncrementalDecoder.decode`

        Prepends any bytes left pending by the previous call to `data_in`, decodes as much
        as possible, and hands an unfinished trailing sequence to `_handle_truncation`
        together with `final`. Invalid sequences are passed to the configured error
        handler, which supplies replacement text and the offset at which to resume."""
        let data = self.pending + data_in
        self.pending = b""
        let out = StringCatenator()
        let offset = 0
        # Bytes of the multi-byte code currently being assembled.
        let leader = []
        # Expected length of the code being assembled: 1, 2 or 4.
        let bytemode = 1
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(data):
                return self._handle_truncation(out, bytemode, final, data, offset, leader)
            let i = data[offset]
            # Work the four-byte pointer out once. It stays None unless this byte completes
            # a four-byte code, and the fourth byte has to be 0x30..0x39: otherwise it would
            # be silently folded into a neighbouring pointer.
            let pointer = None
            if bytemode == 4 and len(leader) == 3 and (0x30 <= i and i <= 0x39):
                pointer = _get_gbsurrogate_pointer(leader, i)
            if bytemode == 1 and i < 0x80:
                out.add(chr(i))
                offset += 1
            else if bytemode == 1 and (0x81 <= i and i <= 0xFE):
                bytemode = 2
                leader.append(i)
                offset += 1
            else if bytemode == 1 and i == 0x80:
                # The GBK encoder writes U+20AC as the lone byte 0x80, so decode it back.
                out.add("\u20AC")
                offset += 1
            else if bytemode == 2 and (leader[0], i) in more_dbdata.decode_gbk:
                out.add(chr(more_dbdata.decode_gbk[(leader[0], i)]))
                offset += 1
                bytemode = 1
                leader = []
            else if bytemode == 2 and (0x30 <= i and i <= 0x39):
                # A digit byte in second position starts a four-byte code.
                bytemode = 4
                leader.append(i)
                offset += 1
            else if bytemode == 4 and len(leader) == 2 and (0x81 <= i and i <= 0xFE):
                leader.append(i)
                offset += 1
            else if pointer != None:
                # Compared against None explicitly, since pointer 0 is valid (it is U+0080).
                let codepoint
                if pointer == 7457:
                    codepoint = 0xE7C7
                else:
                    # Binary search for the range entry covering this pointer. Each entry is
                    # indexed as [0] = first pointer, [1] = first code point.
                    let leftindex = 0
                    let rightindex = len(more_dbdata.gb_surrogate_ranges) - 1
                    while leftindex != rightindex:
                        if leftindex == rightindex - 1:
                            if more_dbdata.gb_surrogate_ranges[rightindex][0] > pointer:
                                rightindex = leftindex
                            else:
                                leftindex = rightindex
                            break
                        let centreindex = (leftindex + rightindex) // 2
                        if more_dbdata.gb_surrogate_ranges[centreindex][0] > pointer:
                            rightindex = centreindex
                        else:
                            leftindex = centreindex
                    codepoint = ((pointer - more_dbdata.gb_surrogate_ranges[leftindex][0])
                              + more_dbdata.gb_surrogate_ranges[rightindex][1])
                out.add(chr(codepoint))
                offset += 1
                bytemode = 1
                leader = []
            else:
                let errorstart = offset - len(leader)
                let errorend = errorstart + bytemode
                # Note: per WHATWG behaviour, if an invalid multi-byte code contains an ASCII byte,
                # parsing shall resume at that byte.
                if bytemode > 1:
                    if len(leader) > 1:
                        # Broken four-byte code: only its first byte is reported.
                        errorend -= 3
                    else if i < 0x80:
                        errorend -= 1
                let error = UnicodeDecodeError(self.name, data, errorstart, errorend,
                                "invalid sequence")
                bytemode = 1
                leader = []
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                if offset < 0:
                    # A negative resume position counts from the end of the data.
                    offset += len(data)
# Register GB18030 under its own labels, then the GBK/GB2312 family of labels. The latter
# pairs the GBK encoder with the GB18030 decoder.
register_kuroko_codec(["gb18030", "gb18030_2000"], Gb18030IncrementalEncoder, Gb18030IncrementalDecoder)
register_kuroko_codec(
["chinese", "csgb2312", "csiso58gb231280", "gb2312", "gb_2312", "gb_2312-80", "936", "cp936", "ms936",
"gbk", "iso-ir-58", "x-gbk", "gb2312_1980", "gb2312_80"],
GbkIncrementalEncoder, Gb18030IncrementalDecoder)
class Iso2022JpIncrementalEncoder(IncrementalEncoder):
    """Incremental encoder for ISO-2022-JP (7-bit stateful Japanese JIS).

    `self.state` is the number of the set currently designated: 0 is ASCII, 1 is
    JIS-Roman, and higher numbers index `encodes_sbcs` / `encodes_dbcs`."""
    name = "iso-2022-jp"
    html5name = "iso-2022-jp"
    # Encode tables for single-byte sets, indexed by state number (none in this class).
    encodes_sbcs = []
    @lazy_property
    def encodes_dbcs():
        # Encode tables for double-byte sets, indexed by state number.
        return [None, None, more_dbdata.encode_jis7]
    encode_supershift_latin = None
    encode_supershift_greek = None
    super_shift = False
    # State number → final byte of the escape sequence designating that set.
    escs_onebyte = {0: 0x42, 1: 0x4A}
    escs_twobyte = {2: 0x42}
    # eager: switch to a lower numbered state when possible (à la Python)
    # lazy: switch state only when necessary (à la WHATWG)
    attitude = "lazy"
    def ensure_state_number(state, out):
        """Write to `out` the escape sequence selecting `state`, unless already selected.

        Raises ValueError when `state` is in neither escape table."""
        if self.state != state:
            if state in self.escs_onebyte:
                out.add(b"\[(")
                out.add(bytes([self.escs_onebyte[state]]))
            else if state in self.escs_twobyte:
                let final_byte = self.escs_twobyte[state]
                out.add(b"\[$")
                # Final bytes other than 0x40..0x42 take an extra "(" intermediate byte.
                if final_byte not in (0x40, 0x41, 0x42):
                    out.add(b"(")
                out.add(bytes([final_byte]))
            else:
                raise ValueError("set to invalid state: " + repr(state))
            self.state = state
    def encode(string, final = False):
        """Implements `IncrementalEncoder.encode`.

        When `final` is true the output ends with a return to state 0 (ASCII)."""
        let out = ByteCatenator()
        let offset = 0
        # Not a for loop: the error handler is free to move offset anywhere.
        while offset < len(string):
            let i = string[offset]
            let code = ord(i)
            # Do not pass Shift Out, Shift In or Escape through from data lest this generate
            # state changes (SO and SI are used in some ISO-2022-JP variants though not this one)
            let is_ascii = code < 0x80 and i not in ("\x0E\x0F\[")
            let is_jiscii = (is_ascii and i not in "\\~") or (i in "¥\u203E")
            # is_sets[n] says whether this character is encodable while in state n.
            let is_sets = [is_ascii, is_jiscii]
            let try_state = 2
            while try_state in self.escs_onebyte or try_state in self.escs_twobyte:
                if try_state in self.escs_onebyte:
                    is_sets.append(code in self.encodes_sbcs[try_state])
                else:
                    is_sets.append(code in self.encodes_dbcs[try_state])
                try_state += 1
            if self.attitude == "eager" or not is_sets[self.state]:
                # Move to the lowest numbered state able to encode the character, if any.
                for n, usable in enumerate(is_sets):
                    if usable:
                        self.ensure_state_number(n, out)
                        break
            if not is_sets[self.state]: # i.e. no regular set can encode it
                if self.super_shift and code in self.encode_supershift_latin:
                    if self.state_greekmode or not self.state_desigsupershift:
                        out.add(b"\[.A")
                    self.state_greekmode = False
                    self.state_desigsupershift = True
                    out.add(b"\[N")
                    out.add(bytes([self.encode_supershift_latin[code]]))
                    offset += 1
                else if self.super_shift and code in self.encode_supershift_greek:
                    if not self.state_greekmode:
                        out.add(b"\[.F")
                    self.state_greekmode = True
                    self.state_desigsupershift = True
                    out.add(b"\[N")
                    out.add(bytes([self.encode_supershift_greek[code]]))
                    offset += 1
                else:
                    let error = UnicodeEncodeError(self.name, string, offset, offset + 1,
                                    "character not supported by target encoding")
                    let errorret = lookup_error(self.errors)(error)
                    # The handler's replacement is emitted in state 0.
                    self.ensure_state_number(0, out)
                    out.add(errorret[0])
                    offset = errorret[1]
                    if offset < 0:
                        offset += len(string)
            else if self.state in (0, 1):
                # The appropriate one of ASCII and JIS-Roman is already selected, so the two
                # are handled alike: yen and overline take the backslash and tilde positions.
                let plain = i
                if i == "¥":
                    plain = "\\"
                else if i == "\u203E":
                    plain = "~"
                out.add(bytes([ord(plain)]))
                offset += 1
            else if self.state in self.escs_onebyte:
                out.add(bytes([self.encodes_sbcs[self.state][code]]))
                offset += 1
            else if self.state in self.escs_twobyte:
                out.add(bytes(self.encodes_dbcs[self.state][code]))
                offset += 1
            else:
                raise RuntimeError("inconsistently configured encoder")
        if final:
            self.ensure_state_number(0, out)
        return out.getvalue()
    def reset():
        """Implements `IncrementalEncoder.reset`: back to state 0, supershift flags cleared."""
        self.state = 0
        self.state_greekmode = False
        self.state_desigsupershift = False
    def getstate():
        """Implements `IncrementalEncoder.getstate`: a tuple of
        (state, state_desigsupershift, state_greekmode)."""
        return (self.state, self.state_desigsupershift, self.state_greekmode)
    def setstate(state):
        """Implements `IncrementalEncoder.setstate`: takes a tuple as built by `getstate`."""
        self.state = state[0]
        self.state_desigsupershift = state[1]
        self.state_greekmode = state[2]
class Iso2022JpIncrementalDecoder(IncrementalDecoder):
    """IncrementalDecoder implementation for ISO-2022-JP (7-bit stateful Japanese JIS)

    `state_set` is the number of the set currently designated: 0 is ASCII, 1 is JIS-Roman,
    and higher numbers index `decodes_sbcs` / `decodes_dbcs`. The shift-out and supershift
    tables are None here and the matching features are switched off; subclasses may
    supply them."""
    name = "iso-2022-jp"
    html5name = "iso-2022-jp"
    @lazy_property
    def decodes_sbcs():
        # Decode tables for single-byte sets, indexed by set number.
        return [None, None, more_dbdata.decode_jis7katakana]
    @lazy_property
    def decodes_dbcs():
        # Decode tables for double-byte sets, indexed by set number.
        return [None, None, None, more_dbdata.decode_jis7]
    decode_shiftout = None
    decode_supershift_latin = None
    decode_supershift_greek = None
    # Final byte of a designating escape sequence → set number.
    escs_onebyte = {0x42: 0, 0x49: 2, 0x4A: 1}
    escs_twobyte = {0x40: 3, 0x42: 3}
    # Set numbers which are decoded two bytes at a time.
    two_byte_modes = [3]
    # Whether to accept the ESC $ ( F form of two-byte designation.
    new_twobytes = False
    # Whether Shift Out (0x0E) is honoured.
    shift_out = False
    # Whether the ESC . / ESC N supershift sequences are honoured.
    super_shift = False
    # Whether a two-byte designation straight after another designation is tolerated.
    concat_lenient = False
    def decode(data_in, final = False):
        """Implements `IncrementalDecoder.decode`

        Prepends any bytes left pending by the previous call, decodes as much as possible,
        and hands an unfinished trailing sequence to `_handle_truncation` together with
        `final`. Invalid or suspicious sequences go to the configured error handler."""
        let data = self.pending + data_in
        self.pending = b""
        let out = StringCatenator()
        let offset = 0
        # Bytes of the escape sequence or double-byte code currently being assembled.
        let leader = []
        let bytemode = 1
        if self.state_set in self.two_byte_modes and not self.state_shiftoutmode:
            bytemode = 2
        let in_esc = False
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            self.scrutinising_inter646 = self.scrutinising_inter646 and self.state_justswitched
            if offset >= len(data):
                return self._handle_truncation(out, bytemode, final, data, offset, leader)
            let i = data[offset]
            if i == 0x1B and len(leader) == 0:
                in_esc = True
                leader.append(i)
                offset += 1
            else if i == 0x0E and len(leader) == 0 and self.shift_out and not self.state_shiftoutmode:
                # state_justswitched is for the HTML5 version which doesn't use Shift Out
                self.state_justswitched = False
                self.state_shiftoutmode = True
                offset += 1
            else if i == 0x0F and len(leader) == 0 and self.state_shiftoutmode:
                # state_justswitched is for the HTML5 version which doesn't use Shift In
                self.state_justswitched = False
                self.state_shiftoutmode = False
                offset += 1
            else if in_esc and len(leader) == 1 and i in (0x24, 0x28):
                leader.append(i)
                offset += 1
            else if in_esc and len(leader) == 1 and i in (0x2E, 0x4E) and self.super_shift:
                leader.append(i)
                offset += 1
            else if in_esc and len(leader) == 2 and leader[1] == 0x2E and i == 0x41:
                self.state_greekmode = False
                in_esc = False
                leader = []
                offset += 1
            else if in_esc and len(leader) == 2 and leader[1] == 0x4E and (
                    not self.state_greekmode and i in self.decode_supershift_latin):
                out.add(chr(self.decode_supershift_latin[i]))
                in_esc = False
                leader = []
                offset += 1
            else if in_esc and len(leader) == 2 and leader[1] == 0x2E and i == 0x46:
                self.state_greekmode = True
                in_esc = False
                leader = []
                offset += 1
            else if in_esc and len(leader) == 2 and leader[1] == 0x4E and (
                    self.state_greekmode and i in self.decode_supershift_greek):
                # Membership is tested against the Greek table, the one indexed below.
                # Testing the Latin table here would raise on a Latin-only byte and
                # reject a Greek-only one.
                out.add(chr(self.decode_supershift_greek[i]))
                in_esc = False
                leader = []
                offset += 1
            else if in_esc and len(leader) == 2 and leader[1] == 0x28 and i in self.escs_onebyte:
                if self.state_justswitched:
                    out.add(lookup_error(self.errors)(UnicodeDecodeError(self.name, data,
                        offset - 5, offset + 1, "no content between two G0 re-designations"))[0])
                else if self.state_set == self.escs_onebyte[i]:
                    # Don't penalise a switch to ASCII at the very start.
                    if (self.state_set != 0) or self.state_last646seen:
                        out.add(lookup_error(self.errors)(UnicodeDecodeError(self.name, data,
                            offset - 2, offset + 1, "G0 re-designation to the same one-byte set"))[0])
                else if self.state_set in (0, 1) and self.escs_onebyte[i] in (0, 1) and self.state_last646seen:
                    # Switching between ASCII and JIS-Roman: the next character will be
                    # checked to see whether the switch was needed at all.
                    self.scrutinising_inter646 = True
                self.state_set = self.escs_onebyte[i]
                self.state_justswitched = True
                in_esc = False
                bytemode = 1
                leader = []
                offset += 1
            else if in_esc and len(leader) == 2 and leader[1] == 0x24 and i == 0x28 and self.new_twobytes:
                leader.append(i)
                offset += 1
            else if in_esc and len(leader) in (2, 3) and leader[1] == 0x24 and i in self.escs_twobyte:
                if self.state_justswitched and not self.concat_lenient:
                    # This can break concatenations of two ISO-2022-JP streams and is questionable
                    # whether it actually secures anything, but WHATWG does not exempt double-
                    # byte targets from errors on being switched to straight after another
                    # switch.
                    # The concat_lenient attribute can be set by subclasses to stop this, but this
                    # class should be at least as scrutinous as the WHATWG spec.
                    out.add(lookup_error(self.errors)(UnicodeDecodeError(self.name, data,
                        offset - 5, offset + 1, "no content between two G0 re-designations"))[0])
                # Don't error upon re-designation of the same two-byte set: remember that WHATWG
                # treats both \[$@ and \[$B as designating the same set, but not all encoders do
                # necessarily (_vide_ the "jis_encoding" subclassing this class). WHATWG does not
                # require erroring on nil-effect re-designation at all.
                self.state_set = self.escs_twobyte[i]
                self.state_justswitched = True
                in_esc = False
                bytemode = 2
                leader = []
                offset += 1
            else if bytemode == 2 and len(leader) == 0 and not in_esc and not self.state_shiftoutmode:
                # First byte of a double-byte code.
                self.state_justswitched = False
                leader.append(i)
                offset += 1
            else if bytemode == 1 and self.state_set == 0 and i < 0x80 and not in_esc and not self.state_shiftoutmode:
                self.state_justswitched = False
                if self.scrutinising_inter646:
                    # Bytes below 0x20, and 0x5C, 0x7E and 0x7F, are treated as justifying a
                    # switch; a switch made between two other bytes is flagged as unjustified.
                    let unjustif = lambda i: i >= 0x20 and i not in (0x5C, 0x7E, 0x7F)
                    if unjustif(self.state_last646seen) and unjustif(i):
                        out.add(lookup_error(self.errors)(UnicodeDecodeError(self.name, data,
                            offset - 3, offset, "unjustified JIS-Roman → ASCII switch"))[0])
                self.scrutinising_inter646 = False
                self.state_last646seen = i
                out.add(chr(i))
                offset += 1
            else if bytemode == 1 and self.state_set == 1 and i < 0x80 and not in_esc and not self.state_shiftoutmode:
                self.state_justswitched = False
                if self.scrutinising_inter646:
                    let unjustif = lambda i: i >= 0x20 and i not in (0x5C, 0x7E, 0x7F)
                    if unjustif(self.state_last646seen) and unjustif(i):
                        out.add(lookup_error(self.errors)(UnicodeDecodeError(self.name, data,
                            offset - 3, offset, "unjustified ASCII → JIS-Roman switch"))[0])
                self.scrutinising_inter646 = False
                self.state_last646seen = i
                # JIS-Roman has yen and overline where ASCII has backslash and tilde.
                let char = chr(i)
                if char == "\\":
                    char = "¥"
                else if char == "~":
                    char = "\u203E"
                out.add(char)
                offset += 1
            else if self.state_shiftoutmode and not in_esc and i in self.decode_shiftout:
                out.add(chr(self.decode_shiftout[i]))
                offset += 1
            else if bytemode == 1 and self.state_set not in (0, 1) and not in_esc and (
                    i in self.decodes_sbcs[self.state_set] and not self.state_shiftoutmode):
                self.state_justswitched = False
                out.add(chr(self.decodes_sbcs[self.state_set][i]))
                offset += 1
            else if bytemode == 2 and len(leader) == 1 and not in_esc and (
                    not self.state_shiftoutmode
                    and (leader[0], i) in self.decodes_dbcs[self.state_set]):
                # leader is only indexed once known to hold a lead byte. It is empty when a
                # byte with no shift-out mapping arrives in shift-out mode, and indexing it
                # first would raise instead of reaching the error branch below.
                let decoded = self.decodes_dbcs[self.state_set][(leader[0], i)]
                if isinstance(decoded, int) and 0x21 <= decoded and decoded <= 0x7E:
                    # Never decode double-byte characters straight to ASCII, since this is very
                    # likely to be used for masking (the characters being maybe either seen as
                    # fullwidth or passed through as unrecognised extensions by filters).
                    out.add(chr(decoded + 0xFEE0))
                else if isinstance(decoded, tuple):
                    for individ in decoded:
                        out.add(chr(individ))
                else:
                    out.add(chr(decoded))
                offset += 1
                leader = []
            else:
                self.state_justswitched = False
                let errorstart = offset - len(leader)
                let errorend
                if bytemode == 2 and i != 0x1B:
                    errorend = errorstart + 2
                else:
                    errorend = errorstart + 1
                let error = UnicodeDecodeError(self.name, data, errorstart, errorend,
                                "invalid sequence")
                in_esc = False
                leader = []
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                if offset < 0:
                    # A negative resume position counts from the end of the data.
                    offset += len(data)
    def reset():
        """Implements `IncrementalDecoder.reset`"""
        self.pending = b""
        self.state_set = 0
        self.state_greekmode = False
        self.state_shiftoutmode = False
        # The following are used to monitor if unnecessary shift sequences are being used to mask
        # ASCII characters. Do not be lured into a false sense of security though. Our approach
        # expands the WHATWG-specified approach to be a bit more thorough, but it still ignores
        # Shift In / Shift Out (which aren't used in the WHATWG version but are accepted by our
        # "jis_encoding" subclass) and allows both lazy and eager switches between the 646-sets.
        # As a general rule, do not presume to be able to sanitise anything while it is encoded as
        # ISO-2022-JP. You have been warned.
        # Some further reading: https://www.unicode.org/L2/L2020/20202-empty-iso-2022-jp.pdf
        self.state_justswitched = False
        self.state_last646seen = None
        self.scrutinising_inter646 = False
    def getstate():
        """Implements `IncrementalDecoder.getstate`

        Returns a 7-tuple, in the order which `setstate` reads back."""
        return (self.pending, self.state_set, self.state_greekmode, self.state_shiftoutmode,
                self.state_justswitched, self.state_last646seen, self.scrutinising_inter646)
    def setstate(state):
        """Implements `IncrementalDecoder.setstate`

        Takes a 7-tuple as built by `getstate`."""
        self.pending = state[0]
        self.state_set = state[1]
        self.state_greekmode = state[2]
        self.state_shiftoutmode = state[3]
        self.state_justswitched = state[4]
        self.state_last646seen = state[5]
        self.scrutinising_inter646 = state[6]
# Register the ISO-2022-JP encoder and decoder under all of their labels.
register_kuroko_codec(["iso-2022-jp", "iso2022-jp", "iso2022jp", "csiso2022jp", "cp50220", "cscp50220"],
Iso2022JpIncrementalEncoder, Iso2022JpIncrementalDecoder)
class Utf16IncrementalEncoder(IncrementalEncoder):
    """Incremental encoder for UTF-16 which begins its output with a Byte Order Mark.

    Subclasses choose the byte order with `endian` and drop the mark with `include_bom`."""
    name = "utf-16"
    html5name = "utf-16"
    encoding_map = {}
    endian = "little"
    include_bom = True
    # -1: BOM not yet emitted if applicable
    # 0: BOM emitted
    state = None
    def push_word(word, out):
        """Append one 16-bit `word` to `out` in the byte order named by `self.endian`.

        Raises ValueError when `self.endian` is neither "little" nor "big"."""
        let low = word & 0xFF
        let high = (word >> 8) & 0xFF
        if self.endian == "little":
            out.add(bytes([low, high]))
        else if self.endian == "big":
            out.add(bytes([high, low]))
        else:
            raise ValueError("unexpected endian value: " + repr(self.endian))
    def encode(string, final = False):
        """Implements `IncrementalEncoder.encode`: returns the UTF-16 bytes for `string`.

        `final` is not consulted."""
        let out = ByteCatenator()
        let offset = 0
        if self.include_bom and self.state == -1:
            self.push_word(0xFEFF, out)
        self.state = 0
        # Not a for loop: the error handler is free to move offset anywhere.
        while offset < len(string):
            let code = ord(string[offset])
            if code >= 0x10000:
                # Beyond the BMP: split into a lead and a trail surrogate word.
                let astrality = code - 0x10000
                self.push_word(((astrality >> 10) & 0x3FF) + 0xD800, out)
                self.push_word((astrality & 0x3FF) + 0xDC00, out)
                offset += 1
            else if code < 0xD800 or code >= 0xE000:
                self.push_word(code, out)
                offset += 1
            else: # i.e. trying to encode a surrogate "codepoint"
                let error = UnicodeEncodeError(self.name, string, offset, offset + 1,
                                "isolated surrogate word")
                let errorret = lookup_error(self.errors)(error)
                # Every element of the handler's replacement is written as a word of its own.
                for unit in errorret[0]:
                    self.push_word(unit, out)
                offset = errorret[1]
                if offset < 0:
                    offset += len(string)
        return out.getvalue()
    def reset():
        """Implements `IncrementalEncoder.reset`: the BOM becomes due again."""
        self.state = -1
    def getstate():
        """Implements `IncrementalEncoder.getstate`: returns the BOM state number."""
        return self.state
    def setstate(state):
        """Implements `IncrementalEncoder.setstate`: takes a value as given by `getstate`."""
        self.state = state
class Utf16IncrementalDecoder(IncrementalDecoder):
    """Incremental decoder for UTF-16, which takes its byte order from a leading Byte
    Order Mark unless a subclass fixes it through `force_endian`."""
    name = "utf-16"
    html5name = "utf-16"
    force_endian = None # subclass may set to "little" or "big"
    # -1: expecting BOM
    # 0: LE
    # 1: BE
    state = None
    pending = b""
    def decode(data_in, final = False):
        """Implements `IncrementalDecoder.decode`.

        Prepends the bytes left pending by the previous call, decodes whole 16-bit words,
        and hands an unfinished tail (an unpaired lead surrogate and/or an odd byte) to
        `_handle_truncation` together with `final`."""
        let data = self.pending + data_in
        self.pending = b""
        let out = StringCatenator()
        let offset = 0
        # Holds a lead surrogate word while its trail surrogate is awaited.
        let leader = []
        let wordmode = 1
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if (offset + 1) >= len(data):
                # Less than a whole word remains: turn the held words back into bytes, in
                # the order they arrived in, and pass them on as the unfinished tail.
                let leader_bytes = []
                for held in leader:
                    let held_low = held & 0xFF
                    let held_high = (held >> 8) & 0xFF
                    if self.state == 1:
                        leader_bytes.append(held_high)
                        leader_bytes.append(held_low)
                    else:
                        leader_bytes.append(held_low)
                        leader_bytes.append(held_high)
                if offset == (len(data) - 1): # i.e. one isolated byte at the end
                    leader_bytes.append(data[offset])
                return self._handle_truncation(out, None, final, data, offset, leader_bytes)
            let as_little = data[offset] | (data[offset + 1] << 8)
            let as_big = data[offset + 1] | (data[offset] << 8)
            # Until big-endian has been established, words are read as little-endian.
            let word = as_little
            if self.state == 1:
                word = as_big
            if self.state == -1:
                if self.force_endian == "little":
                    self.state = 0 # keep BOM if endian specified, per Python.
                else if self.force_endian == "big":
                    self.state = 1
                    word = as_big
                else if word == 0xFEFF:
                    # Bytes FF FE: a little-endian BOM, which is consumed silently.
                    self.state = 0
                    word = None
                else if word == 0xFFFE:
                    # Bytes FE FF: a big-endian BOM, which is consumed silently.
                    self.state = 1
                    word = None
                else:
                    self.state = 0 # Default to LE, per WHATWG, contra Unicode
            if word == None:
                offset += 2
            else if wordmode == 1 and (word < 0xD800 or word >= 0xE000):
                out.add(chr(word))
                offset += 2
            else if wordmode == 1 and (0xD800 <= word and word < 0xDC00):
                leader.append(word)
                wordmode = 2
                offset += 2
            else if wordmode == 2 and (0xDC00 <= word and word < 0xE000):
                let upper = (leader[0] & 0x3FF) << 10
                let lower = word & 0x3FF
                out.add(chr((upper | lower) + 0x10000))
                wordmode = 1
                leader = []
                offset += 2
            else:
                # A trail surrogate with no lead, or a lead not followed by a trail. The
                # error covers one word: the held lead if there is one, else this word.
                let errorstart = offset - (len(leader) * 2)
                let error = UnicodeDecodeError(self.name, data, errorstart, errorstart + 2,
                                "isolated surrogate word")
                wordmode = 1
                leader = []
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                if offset < 0:
                    offset += len(data)
    def reset():
        """Implements `IncrementalDecoder.reset`: drops pending bytes and expects a BOM again."""
        self.pending = b""
        self.state = -1
    def getstate():
        """Implements `IncrementalDecoder.getstate`: returns a tuple of (pending, state)."""
        return (self.pending, self.state)
    def setstate(state):
        """Implements `IncrementalDecoder.setstate`: takes a tuple as built by `getstate`."""
        self.pending = state[0]
        self.state = state[1]
class Utf16BeIncrementalEncoder(Utf16IncrementalEncoder):
    """IncrementalEncoder implementation for UTF-16 Big Endian without Byte Order Mark"""
    name = "utf-16be"
    html5name = "utf-16be"
    # Makes the inherited push_word() write the high byte of each word first.
    endian = "big"
    # Stops the inherited encode() from writing U+FEFF at the start of the output.
    include_bom = False
class Utf16BeIncrementalDecoder(Utf16IncrementalDecoder):
    """IncrementalDecoder implementation for UTF-16 Big Endian without Byte Order Mark"""
    name = "utf-16be"
    html5name = "utf-16be"
    # Makes the inherited decode() read big-endian from the first word onward; that first
    # word is decoded as a character even if it is a BOM.
    force_endian = "big"
class Utf16LeIncrementalEncoder(Utf16IncrementalEncoder):
    """IncrementalEncoder implementation for UTF-16 Little Endian without Byte Order Mark"""
    name = "utf-16le"
    html5name = "utf-16le"
    # Makes the inherited push_word() write the low byte of each word first.
    endian = "little"
    # Stops the inherited encode() from writing U+FEFF at the start of the output.
    include_bom = False
class Utf16LeIncrementalDecoder(Utf16IncrementalDecoder):
    """IncrementalDecoder implementation for UTF-16 Little Endian without Byte Order Mark"""
    name = "utf-16le"
    html5name = "utf-16le"
    # Makes the inherited decode() read little-endian from the first word onward; that
    # first word is decoded as a character even if it is a BOM.
    force_endian = "little"
# Note: this behaves explicitly differently to WHATWG since WHATWG has the BOM, if present,
# override *any label at all*, and doesn't specify an encoder for UTF-16. So it aliases UTF-16 to
# UTF-16LE, while we take UTF-16 as meaning with BOM and UTF-16LE/BE as without, per Python.
# Three registrations follow: BOM-detecting UTF-16, then the fixed-order LE and BE variants.
register_kuroko_codec(["utf-16", "utf16", "iso-10646-ucs-2", "ucs-2", "unicode", "csunicode", "u16"],
Utf16IncrementalEncoder, Utf16IncrementalDecoder)
register_kuroko_codec(["utf-16le", "utf-16-le", "unicodefeff", "unicodelittleunmarked"],
Utf16LeIncrementalEncoder, Utf16LeIncrementalDecoder)
register_kuroko_codec(["utf-16be", "utf-16-be", "unicodefffe", "unicodebigunmarked"],
Utf16BeIncrementalEncoder, Utf16BeIncrementalDecoder)
class Utf8IncrementalEncoder(IncrementalEncoder):
    """Incremental encoder for UTF-8.

    Subclasses may set `include_bom` to have a Byte Order Mark written first."""
    name = "utf-8"
    html5name = "utf-8"
    # -1: expecting BOM
    # 0: Normal
    state = None
    include_bom = False
    def encode(string, final = False):
        """Implements `IncrementalEncoder.encode`: returns the UTF-8 bytes for `string`.

        `final` is not consulted."""
        # Strings are UTF-8 natively, so the built-in encode() does all of the work.
        let out = ByteCatenator()
        let bom_due = self.include_bom and self.state == -1
        self.state = 0
        if bom_due:
            out.add("\uFEFF".encode())
        out.add(string.encode())
        return out.getvalue()
    def reset():
        """Implements `IncrementalEncoder.reset`: the BOM becomes due again."""
        self.state = -1
    def getstate():
        """Implements `IncrementalEncoder.getstate`: returns the state number."""
        return self.state
    def setstate(state):
        """Implements `IncrementalEncoder.setstate`: takes a value as given by `getstate`."""
        self.state = state
class Utf8IncrementalDecoder(IncrementalDecoder):
    """IncrementalDecoder implementation for UTF-8

    Subclasses may set `remove_bom` to have a leading Byte Order Mark discarded."""
    name = "utf-8"
    html5name = "utf-8"
    # -1: expecting BOM
    # 0: Normal
    state = None
    remove_bom = False
    pending = b""
    def _error_handler(error):
        """Pass `error` to the configured error handler and return its result."""
        return lookup_error(self.errors)(error)
    def decode(data_in, final = False):
        """Implements `IncrementalDecoder.decode`

        Valid stretches of input are decoded natively; each invalid sequence goes to the
        error handler, which supplies replacement text and the offset to resume at. An
        unfinished sequence at the end of the input is kept in `self.pending`."""
        # NOTE(review): `final` is not consulted, so an unfinished trailing sequence is
        # stored as pending even on the last call. Confirm whether the caller reports it.
        # We use UTF-8 natively, so this only validates it and applies the error handler
        # (and removes a BOM if remove_bom is set)
        let data = self.pending + data_in
        self.pending = b""
        let out = StringCatenator()
        let running_offset = 0
        if self.remove_bom and self.state == -1 and len(data) >= 3:
            if data[0] == 0xEF and data[1] == 0xBB and data[2] == 0xBF:
                running_offset = 3
        # Keep expecting a BOM while the input so far is only a proper prefix of one. That
        # includes no input at all: an empty chunk must not use up the BOM check.
        if data not in (b"", b"\xEF", b"\xEF\xBB"):
            self.state = 0
        # first_offset..second_offset is the run of validated bytes not yet output;
        # second_offset is also where the sequence currently being examined starts.
        let first_offset = running_offset
        let second_offset = running_offset
        # Number of trail bytes still owed by the current sequence.
        let countdown = 0
        # Range allowed for the next trail byte: narrowed after certain lead bytes to reject
        # overlong forms, surrogates and values beyond Unicode.
        let trail_byte_bottom = 0x80
        let trail_byte_top = 0xBF
        let dlist = list(data)
        # Reason to report if the byte after a narrowing lead byte is out of range.
        let lingering_reason = None
        # Number of bytes the next error would cover.
        let bolster = 1
        while running_offset < len(data):
            let byte = data[running_offset]
            let is_error = False
            let reason = lingering_reason or "byte does not begin valid sequence"
            lingering_reason = None
            if countdown == 0:
                # Expecting a lead byte. ASCII bytes need no further action.
                if byte >= 0x80:
                    if 0xC2 <= byte and byte <= 0xDF:
                        countdown = 1
                    else if 0xE0 <= byte and byte <= 0xEF:
                        if byte == 0xE0:
                            trail_byte_bottom = 0xA0
                            lingering_reason = "start of overlong sequence"
                        else if byte == 0xED:
                            trail_byte_top = 0x9F
                            lingering_reason = "start of sequence for surrogate code point"
                        countdown = 2
                    else if 0xF0 <= byte and byte <= 0xF4:
                        if byte == 0xF0:
                            trail_byte_bottom = 0x90
                            lingering_reason = "start of overlong sequence"
                        else if byte == 0xF4:
                            trail_byte_top = 0x8F
                            lingering_reason = "start of sequence beyond Unicode"
                        countdown = 3
                    else:
                        is_error = True
                        if 0x80 <= byte and byte <= 0xBF:
                            reason = "isolated trail byte"
                        else if byte in (0xC0, 0xC1):
                            reason = "start of overlong sequence"
                        else if 0xF5 <= byte and byte <= 0xFD:
                            reason = "start of sequence beyond Unicode"
                        else: reason = "invalid lead byte"
            else:
                # Expecting a trail byte.
                if not (trail_byte_bottom <= byte and byte <= trail_byte_top):
                    is_error = True
                    if not (0x80 <= byte and byte <= 0xBF):
                        reason = "not followed by trail byte"
                        # Not a trail byte at all: leave it out of the error span.
                        bolster -= 1
                trail_byte_bottom = 0x80
                trail_byte_top = 0xBF
                countdown -= 1
            running_offset += 1
            if is_error:
                # Output what was valid before the bad sequence, then consult the handler.
                out.add(bytes(dlist[first_offset:second_offset]).decode())
                let error = UnicodeDecodeError(self.name, data, second_offset,
                                second_offset + bolster, reason)
                let errorret = self._error_handler(error)
                out.add(errorret[0])
                running_offset = errorret[1]
                if running_offset < 0:
                    # A negative resume position counts from the end of the data.
                    running_offset += len(data)
                countdown = 0
                bolster = 1
                first_offset = running_offset
                second_offset = running_offset
            else if countdown == 0:
                # A whole sequence has been validated.
                second_offset = running_offset
                bolster = 1
            else:
                bolster += 1
        out.add(bytes(dlist[first_offset:second_offset]).decode())
        if second_offset < len(data):
            # The data ends part-way through a sequence: hold it back for the next call.
            self.pending = bytes(dlist[second_offset:])
        return out.getvalue()
    def reset():
        """Implements `IncrementalDecoder.reset`"""
        self.pending = b""
        self.state = -1
    def getstate():
        """Implements `IncrementalDecoder.getstate`

        Returns a tuple of (pending, state)."""
        return (self.pending, self.state)
    def setstate(state):
        """Implements `IncrementalDecoder.setstate`

        Takes a tuple as built by `getstate`."""
        self.pending = state[0]
        self.state = state[1]
class Utf8SigIncrementalEncoder(Utf8IncrementalEncoder):
    """IncrementalEncoder implementation for UTF-8 with Byte Order Mark"""
    name = "utf-8-sig"
    # No HTML5 name is given for this codec.
    html5name = None
    # Makes the inherited encode() write U+FEFF ahead of the first output after a reset.
    include_bom = True
class Utf8SigIncrementalDecoder(Utf8IncrementalDecoder):
    """IncrementalDecoder implementation for UTF-8 with Byte Order Mark"""
    name = "utf-8-sig"
    # No HTML5 name is given for this codec.
    html5name = None
    # Makes the inherited decode() skip the bytes EF BB BF when they begin the input.
    remove_bom = True
# Register plain UTF-8 under its labels, then the BOM-writing and BOM-stripping variant.
register_kuroko_codec(["unicode-1-1-utf-8", "unicode11utf8", "unicode20utf8", "utf-8", "utf8",
"x-unicode20utf8", "u8", "utf", "cp65001", "utf8-ucs4", "utf8mb4",
"al32utf8"],
Utf8IncrementalEncoder, Utf8IncrementalDecoder)
register_kuroko_codec(["utf-8-sig", "utf-8-bom"], Utf8SigIncrementalEncoder, Utf8SigIncrementalDecoder)