14db828233
(I'd commented about the heuristic of characters at the start of the plane being rare, but failed to actually implement said heuristic, only having implemented the detection of the high eight bits (which can be expanded to eleven) having to be false.)
1587 lines
62 KiB
Python
1587 lines
62 KiB
Python
"""
|
||
This module includes some additional variable-width or wide encodings not specified by WHATWG.
|
||
|
||
As such, none of the codecs in this module should be used in HTML.
|
||
"""
|
||
|
||
from codecs.dbextra_data_8bit import data_8bit
|
||
from codecs.dbextra_data_7bit import data_7bit
|
||
|
||
from codecs.infrastructure import register_kuroko_codec, ByteCatenator, StringCatenator, UnicodeEncodeError, UnicodeDecodeError, lookup_error, lookup, BaseEbcdicIncrementalEncoder, BaseEbcdicIncrementalDecoder, AsciiIncrementalEncoder, AsciiIncrementalDecoder, IncrementalEncoder, IncrementalDecoder, lazy_property
|
||
from codecs.dbdata import more_dbdata, XEucJpIncrementalDecoder, Big5EtenIncrementalEncoder, Big5HkscsIncrementalDecoder
|
||
from codecs.bespokecodecs import Iso2022JpIncrementalEncoder, Iso2022JpIncrementalDecoder, Utf8IncrementalDecoder, Utf16BeIncrementalEncoder, Utf16BeIncrementalDecoder, Utf8SigIncrementalEncoder
|
||
from collections import xraydict
|
||
|
||
|
||
class Big5NonEtenKanaIncrementalEncoder(AsciiIncrementalEncoder):
    """
    Incremental encoder for Big5 using the non-ETEN arrangement of kana, Cyrillic and list markers.

    The second ETEN extension area (the one Microsoft's version keeps) remains part of the mapping.

    This kana/Cyrillic/list-marker arrangement is the one found in the UTC's BIG5.TXT, but it is
    the rarer of the two (most Big5 extension schemes follow ETEN) and it is flawed: its katakana
    has no vowel extender and its Cyrillic is missing several capitals, neither of which is true
    of the ETEN arrangement. It is nonetheless a rough match for Python's `big5`, and a closer
    match for Python's built-in `cp950` (as opposed to when Python aliases that label to `mbcs`).
    """
    html5name = None
    name = "big5-nonetenkana"
    @lazy_property
    def encoding_map():
        # CP950 (without EUDC) table layered with the non-ETEN kana table from the 8-bit data.
        let base = data_8bit.cp950_no_eudc_encoding_map
        let overlay = data_8bit.encode_big5_nonetenkana
        return xraydict(base, overlay)
|
||
|
||
class Big5NonEtenKanaIncrementalDecoder(AsciiIncrementalDecoder):
    """
    Incremental decoder for Big5 using the non-ETEN arrangement of kana, Cyrillic and list markers.

    The second ETEN extension area (the one Microsoft's version keeps) remains part of the mapping.

    This kana/Cyrillic/list-marker arrangement is the one found in the UTC's BIG5.TXT, but it is
    the rarer of the two (most Big5 extension schemes follow ETEN) and it is flawed: its katakana
    has no vowel extender and its Cyrillic is missing several capitals, neither of which is true
    of the ETEN arrangement. It is nonetheless a rough match for Python's `big5`, and a closer
    match for Python's built-in `cp950` (as opposed to when Python aliases that label to `mbcs`).
    """
    html5name = None
    name = "big5-nonetenkana"
    # Lead and trail byte ranges are shared with the Big5-HKSCS decoder.
    trailrange = Big5HkscsIncrementalDecoder.trailrange
    dbrange = Big5HkscsIncrementalDecoder.dbrange
    @lazy_property
    def decoding_map():
        # CP950 (without EUDC) table layered with the non-ETEN kana table from the 8-bit data.
        let base = data_8bit.cp950_no_eudc_decoding_map
        let overlay = data_8bit.decode_big5_nonetenkana
        return xraydict(base, overlay)
|
||
|
||
# Make the non-ETEN-kana Big5 encoder/decoder pair available under both of its labels.
register_kuroko_codec(
    ["big5-nonetenkana", "big5-tw"],
    Big5NonEtenKanaIncrementalEncoder,
    Big5NonEtenKanaIncrementalDecoder)
|
||
|
||
class XMacChineseTradIncrementalEncoder(AsciiIncrementalEncoder):
    """
    Incremental encoder for Apple's Big5 variant (Apple additions, narrower lead byte range).

    Some Unicode correspondences are moved towards Apple's choices rather than Microsoft's, but
    Microsoft's are kept wherever Apple's would only round-trip with PUA transcoding hints.
    """
    html5name = None
    name = "x-mac-chinesetrad"
    @lazy_property
    def encoding_map():
        let apple_overrides = {
            # Double-byte codes whose Unicode correspondence follows Apple.
            0xB7: (0xA1, 0x45),
            0x22EF: (0xA1, 0x4B),
            0x203E: (0xA1, 0xC2),
            0x223C: (0xA1, 0xE3),
            0x2609: (0xA1, 0xF3),
            0xA5: (0xA2, 0x44),
            0xA2: (0xA2, 0x46),
            0xA3: (0xA2, 0x47),
            # Single-byte additions outside the double-byte lead range.
            0xF880: 0x81,
            0xF881: 0x82,
            0xA0: 0xA0,
            0xA9: 0xFD,
            0x2122: 0xFE,
            0x2026: 0xFF,
        }
        return xraydict(data_8bit.cp950_no_eudc_encoding_map, apple_overrides)
|
||
|
||
class XMacChineseTradIncrementalDecoder(AsciiIncrementalDecoder):
    """
    Incremental decoder for Apple's Big5 variant (Apple additions, narrower lead byte range).

    Some Unicode correspondences are moved towards Apple's choices rather than Microsoft's, but
    Microsoft's are kept wherever Apple's would only round-trip with PUA transcoding hints.
    """
    html5name = None
    name = "x-mac-chinesetrad"
    # Lead bytes stop at 0xFC because 0xFD-0xFF are single-byte codes in this variant.
    dbrange = tupleOf(*range(0xA1, 0xFD))
    trailrange = Big5HkscsIncrementalDecoder.trailrange
    @lazy_property
    def decoding_map():
        let apple_overrides = {
            # Double-byte codes whose Unicode correspondence follows Apple.
            (0xA1, 0x45): 0xB7,
            (0xA1, 0x4B): 0x22EF,
            (0xA1, 0xC2): 0x203E,
            (0xA1, 0xE3): 0x223C,
            (0xA1, 0xF3): 0x2609,
            (0xA2, 0x44): 0xA5,
            (0xA2, 0x46): 0xA2,
            (0xA2, 0x47): 0xA3,
            # Single-byte additions; 0x80 decodes to U+005C but has no counterpart in the encoder.
            0x80: 0x5C,
            0x81: 0xF880,
            0x82: 0xF881,
            0xA0: 0xA0,
            0xFD: 0xA9,
            0xFE: 0x2122,
            0xFF: 0x2026,
        }
        return xraydict(data_8bit.cp950_no_eudc_decoding_map, apple_overrides)
|
||
|
||
# Make the Apple traditional Chinese encoder/decoder pair available under both of its labels.
register_kuroko_codec(
    ["x-mac-chinesetrad", "x-mac-trad-chinese"],
    XMacChineseTradIncrementalEncoder,
    XMacChineseTradIncrementalDecoder)
|
||
|
||
|
||
class XMacChineseSimpIncrementalEncoder(AsciiIncrementalEncoder):
    """
    Incremental encoder for Apple's version of EUC-CN (so the lead byte range is slightly smaller).

    Vertical forms map to the characters added to Unicode more recently, not to Apple
    transcoding hints (nor to GB18030 private use codes).
    """
    html5name = None
    name = "x-mac-chinesesimp"
    @lazy_property
    def encoding_map():
        let apple_overrides = {
            # Double-byte codes whose Unicode correspondence differs from the base table.
            0x301C: (0xA1, 0xAB),
            0x22EF: (0xA1, 0xAD),
            0xA2: (0xA1, 0xE9),
            0xA3: (0xA1, 0xEA),
            0x203E: (0xA3, 0xFE),
            # Single-byte additions outside the double-byte lead range.
            0xF880: 0x81,
            0xF881: 0x82,
            0xA0: 0xA0,
            0xA9: 0xFD,
            0x2122: 0xFE,
            0x2026: 0xFF,
        }
        return xraydict(data_8bit.encode_gb8, apple_overrides)
|
||
|
||
class XMacChineseSimpIncrementalDecoder(AsciiIncrementalDecoder):
    """
    Incremental decoder for Apple's version of EUC-CN (so the lead byte range is slightly smaller).

    Vertical forms map to the characters added to Unicode more recently, not to Apple
    transcoding hints (nor to GB18030 private use codes).
    """
    html5name = None
    name = "x-mac-chinesesimp"
    # Lead bytes stop at 0xFC because 0xFD-0xFF are single-byte codes in this variant.
    dbrange = tupleOf(*range(0xA1, 0xFD))
    trailrange = tupleOf(*range(0xA1, 0xFF))
    @lazy_property
    def decoding_map():
        let apple_overrides = {
            # Double-byte codes whose Unicode correspondence differs from the base table.
            (0xA1, 0xAB): 0x301C,
            (0xA1, 0xAD): 0x22EF,
            (0xA1, 0xE9): 0xA2,
            (0xA1, 0xEA): 0xA3,
            (0xA3, 0xFE): 0x203E,
            # Single-byte additions; 0x80 decodes to U+00FC but has no counterpart in the encoder.
            0x80: 0xFC,
            0x81: 0xF880,
            0x82: 0xF881,
            0xA0: 0xA0,
            0xFD: 0xA9,
            0xFE: 0x2122,
            0xFF: 0x2026,
        }
        return xraydict(data_8bit.decode_gb8, apple_overrides)
|
||
|
||
# Make the Apple simplified Chinese pair available; the EUC-CN labels resolve to it as well.
register_kuroko_codec(
    ["x-mac-chinesesimp", "x-mac-simp-chinese", "euc-cn", "euccn", "eucgb2312-cn"],
    XMacChineseSimpIncrementalEncoder,
    XMacChineseSimpIncrementalDecoder)
|
||
|
||
|
||
class Cesu8IncrementalEncoder(IncrementalEncoder):
    """
    Incremental encoder for CESU-8: a deprecated UTF-8 lookalike that some systems (TCL, for
    one) still use, and that is still mislabelled "utf8" in places for legacy reasons.
    """
    html5name = None
    name = "cesu-8"
    include_bom = False
    # -1: expecting BOM
    # 0: Normal
    state = None
    def encode(string, final = False):
        """
        Implements `IncrementalEncoder.encode`.

        Runs of characters below U+10000 are handed to the ordinary string encoder unchanged;
        each character from U+10000 to U+10FFFF becomes six bytes, i.e. the three-byte forms of
        its high and low surrogates. `final` is accepted but not consulted.
        """
        let out = ByteCatenator()
        if self.include_bom and self.state == -1:
            out.add("\uFEFF".encode())
        self.state = 0
        let run_start = 0
        let cursor = 0
        while cursor < len(string):
            let codepoint = ord(string[cursor])
            if codepoint < 0x10000 or codepoint > 0x10FFFF:
                # Stays part of the current plain run.
                cursor += 1
                continue
            # Flush the plain run that precedes this supplementary character.
            out.add(string[run_start:cursor].encode())
            # Twenty-bit offset, split 4 + 6 bits (high surrogate) and 4 + 6 bits (low surrogate).
            let offset20 = codepoint - 0x10000
            let high_pair = [0xED, 0xA0 | (offset20 >> 16), 0x80 | ((offset20 >> 10) & 0x3F)]
            let low_pair = [0xED, 0xB0 | ((offset20 >> 6) & 0xF), 0x80 | (offset20 & 0x3F)]
            out.add(bytes(high_pair + low_pair))
            cursor += 1
            run_start = cursor
        # Trailing plain run (possibly empty).
        out.add(string[run_start:cursor].encode())
        return out.getvalue()
    def reset():
        """Implements `IncrementalEncoder.reset`: return to the "expecting BOM" state."""
        self.state = -1
    def getstate():
        """Implements `IncrementalEncoder.getstate`: the state is the single `state` value."""
        return self.state
    def setstate(state):
        """Implements `IncrementalEncoder.setstate`: restore a value obtained from `getstate`."""
        self.state = state
|
||
|
||
class Cesu8IncrementalDecoder(Utf8IncrementalDecoder):
    """
    IncrementalDecoder implementation for CESU-8, a deprecated UTF-8-like encoding still used by
    some systems, such as TCL, and still mis-called "utf8" in some places for legacy reasons.

    Decoding is delegated to the UTF-8 decoder; only its error hook is replaced, so that
    surrogate pairs written as two three-byte sequences (and the two-byte NUL of mUTF-8) are
    recovered instead of being reported as errors.
    """
    name = "cesu-8"
    html5name = None
    def _error_handler(error):
        """
        Recover CESU-8 specific sequences at `error.start`, else defer to the configured handler.

        Returns a `(replacement string, resume offset)` pair. A six-byte surrogate pair is only
        accepted when both halves are fully well-formed: lead byte 0xED, a second byte in the
        high (0xA0-0xAF) or low (0xB0-0xBF) surrogate range, and a continuation byte
        (0x80-0xBF) in third position. Anything else goes to `lookup_error(self.errors)`.
        """
        # Note: not error.end (which is set after noticing the CESU seq, not at the end of it).
        let after_cesu = error.start + 6
        let maybe_cesu = list(error.object)[error.start:after_cesu]
        let is_pair = False
        if len(maybe_cesu) == 6:
            let high_ok = maybe_cesu[0] == 0xED and 0xA0 <= maybe_cesu[1] and maybe_cesu[1] <= 0xAF
            let low_ok = maybe_cesu[3] == 0xED and 0xB0 <= maybe_cesu[4] and maybe_cesu[4] <= 0xBF
            # The last byte of each half must be a continuation byte; without this check,
            # malformed input would be masked into a valid-looking supplementary character.
            let high_trail_ok = 0x80 <= maybe_cesu[2] and maybe_cesu[2] <= 0xBF
            let low_trail_ok = 0x80 <= maybe_cesu[5] and maybe_cesu[5] <= 0xBF
            is_pair = high_ok and low_ok and high_trail_ok and low_trail_ok
        if is_pair:
            # Reassemble the twenty-bit offset: 4 + 6 bits from the high half, 4 + 6 from the low.
            let codepoint = 0
            codepoint |= maybe_cesu[1] & 0xF
            codepoint <<= 6
            codepoint |= maybe_cesu[2] & 0x3F
            codepoint <<= 4
            codepoint |= maybe_cesu[4] & 0xF
            codepoint <<= 6
            codepoint |= maybe_cesu[5] & 0x3F
            codepoint += 0x10000
            return (chr(codepoint), after_cesu)
        else if len(maybe_cesu) >= 2 and maybe_cesu[0] == 0xC0 and maybe_cesu[1] == 0x80:
            # mUTF-8 is a fairly common CESU-8 variant, using the two-byte code for embedded NUL
            return ("\x00", error.start + 2)
        else:
            return lookup_error(self.errors)(error)
|
||
|
||
# Make the CESU-8 encoder/decoder pair available under all four of its labels.
register_kuroko_codec(
    ["utf8-ucs2", "utf8mb3", "cesu-8", "cesu8"],
    Cesu8IncrementalEncoder,
    Cesu8IncrementalDecoder)
|
||
|
||
|
||
# Byte values the UTF-7 encoder writes as themselves: ASCII letters, digits and some punctuation.
let _verbatim_utf7 = [ord(i) for i in
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789/-(),.:? \r\n"]
# Byte values that may appear inside a Base64 run.
let _base64_alphabet = [ord(i) for i in
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"]
# Verbatim byte values after which the encoder does not emit "-" when leaving a Base64 run.
let _utf7_not_need_hyphen = [ord(i) for i in "(),.:? \r\n"]
|
||
|
||
class Utf7IncrementalEncoder(IncrementalEncoder):
    """
    IncrementalEncoder implementation for UTF-7, a largely obsolete (and forbidden in HTML5)
    scheme for mixing ASCII with Base64'd UTF-16BE in e-mail.

    Input is first converted to UTF-16BE by an inner encoder; the resulting bytes are then
    either written verbatim (one code unit at a time) or Base64-encoded (up to three code units
    at a time), switching between the two modes as needed.
    """
    name = "utf-7"
    html5name = None
    # Inner UTF-16BE encoder, created in __init__.
    utf16encoder = None
    # "ascii" or "base64": how the next code unit will be written.
    mode = "ascii"
    # UTF-16BE bytes (as a list of ints) held back until enough input arrives for a full chunk.
    pending = []
    def __init__(errors):
        # The inner encoder must exist before the base initialiser runs.
        # NOTE(review): presumably because the base initialiser calls reset() — confirm.
        self.utf16encoder = Utf16BeIncrementalEncoder(errors)
        IncrementalEncoder.__init__(self, errors)
    def encode(data, final=False):
        """
        Implements `IncrementalEncoder.encode`.

        Returns the UTF-7 bytes for `data`. When `final` is false, an incomplete trailing chunk
        is kept in `self.pending` and prepended on the next call.
        """
        let incoming = self.pending + list(self.utf16encoder.encode(data, final=final))
        self.pending = []
        let offset = 0
        let out = ByteCatenator()
        # Three UTF-16 code units (six bytes) make exactly four Base64 digits.
        let chunksize = 6 if self.mode == "base64" else 2
        while offset < len(incoming):
            let chunk = incoming[offset:offset + chunksize]
            if len(chunk) < chunksize and not final:
                self.pending = chunk
                return out.getvalue()
            if self.mode == "ascii":
                if chunk[0] or (chunk[1] not in _verbatim_utf7):
                    # Not representable verbatim: open a Base64 run and retry this offset.
                    out.add(b"+")
                    self.mode = "base64"
                    chunksize = 6
                    continue
                out.add(bytes([chunk[1]]))
            else:
                if (not chunk[0]) and (chunk[1] in _verbatim_utf7):
                    # Next code unit can be verbatim: close the run and retry this offset.
                    if chunk[1] not in _utf7_not_need_hyphen:
                        out.add(b"-")
                    self.mode = "ascii"
                    chunksize = 2
                    continue
                else if len(chunk) >= 4 and (not chunk[2]) and (chunk[3] in _verbatim_utf7):
                    # Stop the Base64 chunk just before a verbatim-capable second code unit.
                    chunk = chunk[:2]
                else if len(chunk) == 6 and (not chunk[4]) and (chunk[5] in _verbatim_utf7):
                    # Likewise for a verbatim-capable third code unit.
                    chunk = chunk[:4]
                out.add(lookup("inverse-base64").decode(bytes(chunk)).rstrip("=").encode())
            offset += len(chunk)
        if final and self.mode == "base64":
            self.mode = "ascii"
        return out.getvalue()
    def reset():
        """Implements `IncrementalEncoder.reset`"""
        self.utf16encoder.reset()
        self.mode = "ascii"
        # Drop bytes buffered from an abandoned stream so they cannot leak into the next one.
        self.pending = []
    def getstate():
        """Implements `IncrementalEncoder.getstate`"""
        return (self.utf16encoder.getstate(), self.mode, self.pending)
    def setstate(state):
        """Implements `IncrementalEncoder.setstate`"""
        self.utf16encoder.setstate(state[0])
        self.mode = state[1]
        self.pending = state[2]
|
||
|
||
class Utf7IncrementalDecoder(IncrementalDecoder):
    """
    IncrementalDecoder implementation for UTF-7, a largely obsolete (and forbidden in HTML5)
    scheme for mixing ASCII with Base64'd UTF-16BE in e-mail.

    Bytes outside Base64 runs are emitted as the characters with the same values; Base64 runs
    (opened by "+") are read up to eight digits at a time, converted back to bytes and fed to an
    inner UTF-16BE decoder.
    """
    name = "utf-7"
    html5name = None
    # Inner UTF-16BE decoder, created in __init__.
    utf16decoder = None
    # "ascii", "maybebase64" (just after "+") or "base64".
    mode = "ascii"
    # Undecoded input bytes held back until enough data arrives; bytes, like `data_in`.
    pending = b""
    def __init__(errors):
        # The inner decoder must exist before the base initialiser runs.
        # NOTE(review): presumably because the base initialiser calls reset() — confirm.
        self.utf16decoder = Utf16BeIncrementalDecoder(errors)
        IncrementalDecoder.__init__(self, errors)
    def decode(data_in, final=False):
        """
        Implements `IncrementalDecoder.decode`.

        Returns the text decoded from `data_in`. When `final` is false, an incomplete trailing
        chunk is kept in `self.pending` and prepended on the next call.
        """
        let data = self.pending + data_in
        self.pending = b""
        let incoming = list(data)
        let offset = 0
        let out = StringCatenator()
        # Eight Base64 digits make exactly six bytes, i.e. three UTF-16 code units.
        let chunksize = 8 if self.mode in ("base64", "maybebase64") else 1
        while offset < len(incoming):
            let chunk = incoming[offset:offset + chunksize]
            if len(chunk) < chunksize and not final:
                self.pending = bytes(chunk)
                return out.getvalue()
            if self.mode == "ascii":
                if chunk[0] == b"+"[0]:
                    self.mode = "maybebase64"
                    chunksize = 8
                else:
                    out.add(chr(chunk[0]))
                # The "+" itself is consumed here as well.
                offset += 1
            else:
                if self.mode == "maybebase64":
                    if chunk[0] == b"-"[0]:
                        # "+-" is the escape for a literal plus sign.
                        out.add("+")
                        offset += 1
                        self.mode = "ascii"
                        chunksize = 1
                        continue
                    else:
                        self.mode = "base64"
                let cutpoint = len(chunk)
                let stride = len(chunk)
                for n, i in enumerate(chunk):
                    if i not in _base64_alphabet:
                        cutpoint = n
                        # A terminating "-" is swallowed; any other terminator is re-read as ASCII.
                        stride = n if i != b"-"[0] else (n + 1)
                        # In preparation for the next iteration, which will be in ASCII mode:
                        self.mode = "ascii"
                        chunksize = 1
                        break
                chunk = chunk[:cutpoint]
                if len(chunk) > 0:
                    let padbytes = (4 - (len(chunk) % 4)) % 4
                    if padbytes > 2:
                        # A single leftover digit cannot encode even one whole byte.
                        let error = UnicodeDecodeError(self.name, data, offset, offset + cutpoint,
                                    "truncated Base64 sequence")
                        let errorret = lookup_error(self.errors)(error)
                        out.add(errorret[0])
                        offset = errorret[1]
                        continue
                    let base64 = bytes(chunk).decode() + ("=" * padbytes)
                    let utf16 = lookup("inverse-base64").encode(base64)
                    # NOTE(review): `final` is forwarded for every chunk, not only the last one;
                    # confirm the inner decoder copes with a surrogate pair split across chunks.
                    out.add(self.utf16decoder.decode(utf16, final=final))
                offset += stride
        if final and self.mode != "ascii":
            self.mode = "ascii"
        return out.getvalue()
    def reset():
        """Implements `IncrementalDecoder.reset`"""
        self.utf16decoder.reset()
        self.mode = "ascii"
        self.pending = b""
    def getstate():
        """Implements `IncrementalDecoder.getstate`"""
        # The inner object is `utf16decoder`; this class has no `utf16encoder` attribute.
        return (self.utf16decoder.getstate(), self.mode, self.pending)
    def setstate(state):
        """Implements `IncrementalDecoder.setstate`"""
        self.utf16decoder.setstate(state[0])
        self.mode = state[1]
        self.pending = state[2]
|
||
|
||
# Make the UTF-7 encoder/decoder pair available under all four of its labels.
register_kuroko_codec(
    ["utf-7", "utf7", "u7", "unicode-1-1-utf-7"],
    Utf7IncrementalEncoder,
    Utf7IncrementalDecoder)
|
||
|
||
|
||
class EucJpFullIncrementalEncoder(AsciiIncrementalEncoder):
    """
    Incremental encoder for EUC-JP that also covers JIS X 0212.
    """
    html5name = None
    name = "euc-jp-full"
    @lazy_property
    def encoding_map():
        # Taken as-is from the 8-bit data module.
        let table = data_8bit.encode_euc90
        return table
|
||
|
||
# The full EUC-JP encoder is paired with the existing x-euc-jp decoder class.
register_kuroko_codec(
    ["euc-jp-full"],
    EucJpFullIncrementalEncoder,
    XEucJpIncrementalDecoder)
|
||
|
||
|
||
class EucJis2004IncrementalEncoder(AsciiIncrementalEncoder):
    """
    Incremental encoder for the JIS X 0213 flavour of EUC-JP.
    """
    html5name = None
    name = "euc-jis-2004"
    @lazy_property
    def encoding_map():
        # Taken as-is from the 8-bit data module.
        let table = data_8bit.encode_euc04
        return table
|
||
|
||
class EucJis2004IncrementalDecoder(AsciiIncrementalDecoder):
    """
    Incremental decoder for the JIS X 0213 flavour of EUC-JP.
    """
    html5name = None
    name = "euc-jis-2004"
    # Lead bytes of two-byte codes: 0x8E plus 0xA1-0xFE.
    dbrange = tupleOf(0x8E, *range(0xA1, 0xFF))
    # Lead byte of three-byte codes.
    tbrange = (0x8F,)
    trailrange = tupleOf(*range(0xA1, 0xFF))
    @lazy_property
    def decoding_map():
        # Taken as-is from the 8-bit data module.
        let table = data_8bit.decode_euc04
        return table
|
||
|
||
# Make the EUC-JIS-2004 encoder/decoder pair available under all six of its labels.
register_kuroko_codec(
    ["euc-jis-2004", "jisx0213", "eucjis2004", "euc_jis2004", "euc_jisx0213", "eucjisx0213"],
    EucJis2004IncrementalEncoder,
    EucJis2004IncrementalDecoder)
|
||
|
||
|
||
class ShiftJis2004IncrementalEncoder(AsciiIncrementalEncoder):
    """
    Incremental encoder for the JIS X 0213 flavour of Shift_JIS.
    """
    html5name = None
    name = "shift-jis-2004"
    # Byte values 0x5C and 0x7E are flagged as exceptions to plain ASCII treatment.
    ascii_exceptions = (0x5C, 0x7E)
    @lazy_property
    def encoding_map():
        # Taken as-is from the 8-bit data module.
        let table = data_8bit.encode_sjis04
        return table
|
||
|
||
class ShiftJis2004IncrementalDecoder(AsciiIncrementalDecoder):
    """
    Incremental decoder for the JIS X 0213 flavour of Shift_JIS.
    """
    html5name = None
    name = "shift-jis-2004"
    # Byte values 0x5C and 0x7E are flagged as exceptions to plain ASCII treatment.
    ascii_exceptions = (0x5C, 0x7E)
    # Lead bytes: 0x81-0x9F and 0xE0-0xFC.
    dbrange = tupleOf(*range(0x81, 0xA0), *range(0xE0, 0xFD))
    # Trail bytes: 0x40-0x7E and 0x80-0xFC.
    trailrange = tupleOf(*range(0x40, 0x7F), *range(0x80, 0xFD))
    @lazy_property
    def decoding_map():
        # Taken as-is from the 8-bit data module.
        let table = data_8bit.decode_sjis04
        return table
|
||
|
||
# Make the Shift_JIS-2004 encoder/decoder pair available under all eight of its labels.
register_kuroko_codec(
    ["shift_jis-2004", "shiftjis2004", "sjis_2004", "s_jis_2004",
     "shift_jisx0213", "shiftjisx0213", "sjisx0213", "s_jisx0213"],
    ShiftJis2004IncrementalEncoder,
    ShiftJis2004IncrementalDecoder)
|
||
|
||
|
||
class AsciiJohabIncrementalEncoder(AsciiIncrementalEncoder):
    """
    Incremental encoder for PC Johab (code page 1361).
    """
    html5name = None
    name = "johab-ascii"
    @lazy_property
    def encoding_map():
        # Taken as-is from the 8-bit data module.
        let table = data_8bit.encode_johab_ascii
        return table
|
||
|
||
class AsciiJohabIncrementalDecoder(AsciiIncrementalDecoder):
    """
    Incremental decoder for PC Johab (code page 1361).
    """
    html5name = None
    name = "johab-ascii"
    dbrange = tupleOf(*range(0x84, 0xFA))
    # Hangul and non-hangul codes use different trail ranges; this is the union of the two.
    trailrange = tupleOf(*range(0x31, 0x7F), *range(0x81, 0xFF))
    @lazy_property
    def decoding_map():
        # Taken as-is from the 8-bit data module.
        let table = data_8bit.decode_johab_ascii
        return table
|
||
|
||
# Make the PC Johab encoder/decoder pair available under all five of its labels.
register_kuroko_codec(
    ["cp1361", "ms1361", "johab", "x-johab", "johab-ascii"],
    AsciiJohabIncrementalEncoder,
    AsciiJohabIncrementalDecoder)
|
||
|
||
|
||
class EbcdicJohabIncrementalEncoder(BaseEbcdicIncrementalEncoder):
    """
    Incremental encoder for code page 1364, a stateful EBCDIC variant of Johab.
    """
    html5name = None
    name = "johab-ebcdic"
    @lazy_property
    def dbcshost_encode():
        # Double-byte table, taken as-is from the 8-bit data module.
        let table = data_8bit.encode_johab_ebcdic
        return table
    @lazy_property
    def sbcs_encode():
        # Single-byte table, taken as-is from the 8-bit data module.
        let table = data_8bit.encode_nbyte_ebcdic
        return table
|
||
|
||
class EbcdicJohabIncrementalDecoder(BaseEbcdicIncrementalDecoder):
    """
    Incremental decoder for code page 1364, a stateful EBCDIC variant of Johab.
    """
    html5name = None
    name = "johab-ebcdic"
    @lazy_property
    def dbcshost_decode():
        # Double-byte table, taken as-is from the 8-bit data module.
        let table = data_8bit.decode_johab_ebcdic
        return table
    @lazy_property
    def sbcs_decode():
        # Single-byte table, taken as-is from the 8-bit data module.
        let table = data_8bit.decode_nbyte_ebcdic
        return table
|
||
|
||
# Make the EBCDIC Johab pair available under the code page 933 and 1364 labels alike.
register_kuroko_codec(
    ["cp933", "ibm-933", "933", "x-IBM933", "cp1364", "ibm-1364", "x-IBM1364", "johab-ebcdic"],
    EbcdicJohabIncrementalEncoder,
    EbcdicJohabIncrementalDecoder)
|
||
|
||
|
||
class JisEncodingIncrementalEncoder(Iso2022JpIncrementalEncoder):
    """
    Incremental encoder for 7-bit stateful Japanese with every supported feature switched on.

    Compared with the ISO-2022-JP encoder, this one will:

    - Use 1978 JIS for forms that 1978 JIS has but 1983 JIS simplified (and so lacks).
    - Fall back, for characters in neither table, to JIS X 0212, 2000 JIS and 2004 JIS in turn.
    - Fall back, for characters in no JIS set at all, to GB 2312 and Wansung.
    - Keep katakana at its original width.
    """
    html5name = None
    name = "jis_encoding"
    attitude = "eager"
    super_shift = True
    # Keys are positions in `encodes_sbcs` / `encodes_dbcs`; values are single byte values
    # (presumably the final byte of the designating escape sequence — confirm in the base class).
    escs_onebyte = {0: 0x42, 1: 0x4A, 2: 0x49}
    escs_twobyte = {3: 0x42, 4: 0x40, 5: 0x44, 6: 0x4F, 7: 0x50, 8: 0x51, 9: 0x41, 10: 0x43}
    @lazy_property
    def encodes_sbcs():
        let katakana = data_7bit.encode_jis7katakana
        return [None, None, katakana]
    @lazy_property
    def encodes_dbcs():
        # Positions 0-2 are placeholders so that indices line up with `escs_twobyte`.
        let tables = [None, None, None]
        tables.append(more_dbdata.encode_jis7)
        tables.append(data_7bit.encode_jis78)
        tables.append(data_7bit.encode_jis90p2)
        tables.append(data_7bit.encode_jis00)
        tables.append(data_7bit.encode_jis00p2)
        tables.append(data_7bit.encode_jis04)
        tables.append(data_7bit.encode_gb7)
        tables.append(data_7bit.encode_ksc7)
        return tables
    @lazy_property
    def encode_supershift_greek():
        return data_7bit.encode_greksupp
    @lazy_property
    def encode_supershift_latin():
        return data_7bit.encode_lat1supp
|
||
|
||
class JisEncodingIncrementalDecoder(Iso2022JpIncrementalDecoder):
    """
    Incremental decoder for 7-bit stateful Japanese.

    Compared with the ISO-2022-JP decoder, this one will:

    - Decode 1978 JIS through its own table, covering 1978 JIS, NEC extensions and IBM backports.
    - Accept and decode the extensions of ISO-2022-JP-2 (and -1), ISO-2022-JP-3 and
      ISO-2022-JP-2004.
    - Not raise an error for JIS-Kanji→ASCII→JIS-Kanji designations that follow one another
      immediately.
    - Accept katakana via Shift Out / Shift In.

    Every ISO-2022-JP variant other than plain ISO-2022-JP uses this class as its decoder.
    """
    html5name = None
    name = "jis_encoding"
    # Feature switches read by the base decoder.
    concat_lenient = True
    super_shift = True
    shift_out = True
    new_twobytes = True
    two_byte_modes = [3, 4, 5, 6, 7, 8, 9, 10]
    # 0x48 is not ASCII or JIS-Roman, but SEN 85 02 00 Annex C. It is however misused for either
    # ASCII or JIS-Roman in some encoders, so it is a "good idea for software to recognise,
    # but not to generate" (—Lunde) it for JIS-Roman when decoding JIS_encoding.
    escs_onebyte = {0x42: 0, 0x48: 1, 0x49: 2, 0x4A: 1}
    escs_twobyte = {0x40: 4, 0x41: 9, 0x42: 3, 0x43: 10, 0x44: 5, 0x4F: 6, 0x50: 7, 0x51: 8}
    @lazy_property
    def decodes_sbcs():
        let katakana = more_dbdata.decode_jis7katakana
        return [None, None, katakana]
    @lazy_property
    def decodes_dbcs():
        # Positions 0-2 are placeholders so that indices line up with `escs_twobyte`.
        let tables = [None, None, None]
        tables.append(more_dbdata.decode_jis7)
        tables.append(data_7bit.decode_jis78)
        tables.append(data_7bit.decode_jis90p2)
        tables.append(data_7bit.decode_jis00)
        tables.append(data_7bit.decode_jis00p2)
        tables.append(data_7bit.decode_jis04)
        tables.append(data_7bit.decode_gb7)
        tables.append(data_7bit.decode_ksc7)
        return tables
    @lazy_property
    def decode_shiftout():
        return more_dbdata.decode_jis7katakana
    @lazy_property
    def decode_supershift_greek():
        return data_7bit.decode_greksupp
    @lazy_property
    def decode_supershift_latin():
        return data_7bit.decode_lat1supp
|
||
|
||
# Make the JIS_encoding encoder/decoder pair available under all four of its labels.
register_kuroko_codec(
    ["jis_encoding", "csjisencoding", "jis", "jis7"],
    JisEncodingIncrementalEncoder,
    JisEncodingIncrementalDecoder)
|
||
|
||
|
||
class Iso2022Jp1IncrementalEncoder(Iso2022JpIncrementalEncoder):
    """
    Incremental encoder for 7-bit stateful Japanese that includes JIS X 0212.

    Unlike the ISO-2022-JP encoder it can emit JIS X 0212, and it does so at every opportunity
    (i.e. JIS X 0212 wins over any web extension to JIS X 0208).
    """
    html5name = None
    name = "iso-2022-jp-1"
    attitude = "eager"
    # Keys are positions in `encodes_sbcs` / `encodes_dbcs`; values are single byte values
    # (presumably the final byte of the designating escape sequence — confirm in the base class).
    escs_onebyte = {0: 0x42, 1: 0x4A}
    escs_twobyte = {3: 0x42, 2: 0x44}
    @lazy_property
    def encodes_sbcs():
        return [None, None]
    @lazy_property
    def encodes_dbcs():
        # JIS X 0212 is listed first so that it is favoured over any extensions in the web
        # JIS X 0208 table.
        let jisx0212 = data_7bit.encode_jis90p2
        let jisx0208 = more_dbdata.encode_jis7
        return [None, None, jisx0212, jisx0208]
|
||
|
||
# ISO-2022-JP-1 has its own encoder but shares the JIS_encoding decoder.
register_kuroko_codec(
    ["iso-2022-jp-1", "iso2022-jp-1", "iso2022jp-1"],
    Iso2022Jp1IncrementalEncoder,
    JisEncodingIncrementalDecoder)
|
||
|
||
|
||
class Iso2022JpExtIncrementalEncoder(Iso2022JpIncrementalEncoder):
    """
    Incremental encoder for 7-bit stateful Japanese.

    The difference from the ISO-2022-JP-1 encoder is that katakana width is preserved.
    """
    html5name = None
    name = "iso-2022-jp-ext"
    attitude = "eager"
    # Keys are positions in `encodes_sbcs` / `encodes_dbcs`; values are single byte values
    # (presumably the final byte of the designating escape sequence — confirm in the base class).
    escs_onebyte = {0: 0x42, 1: 0x4A, 2: 0x49}
    escs_twobyte = {4: 0x42, 3: 0x44}
    @lazy_property
    def encodes_sbcs():
        let katakana = data_7bit.encode_jis7katakana
        return [None, None, katakana]
    @lazy_property
    def encodes_dbcs():
        let jisx0212 = data_7bit.encode_jis90p2
        let jisx0208 = more_dbdata.encode_jis7
        return [None, None, None, jisx0212, jisx0208]
|
||
|
||
# ISO-2022-JP-EXT has its own encoder but shares the JIS_encoding decoder.
register_kuroko_codec(
    ["iso-2022-jp-ext", "iso2022-jp-ext", "iso2022jp-ext"],
    Iso2022JpExtIncrementalEncoder,
    JisEncodingIncrementalDecoder)
|
||
|
||
|
||
class Iso2022Jp2IncrementalEncoder(Iso2022JpIncrementalEncoder):
    """
    Incremental encoder for 7-bit stateful Japanese with the multilingual extensions.
    """
    html5name = None
    name = "iso-2022-jp-2"
    attitude = "eager"
    super_shift = True
    # Keys are positions in `encodes_sbcs` / `encodes_dbcs`; values are single byte values
    # (presumably the final byte of the designating escape sequence — confirm in the base class).
    escs_onebyte = {0: 0x42, 1: 0x4A}
    escs_twobyte = {3: 0x42, 2: 0x44, 4: 0x41, 5: 0x43}
    @lazy_property
    def encodes_sbcs():
        return [None, None]
    @lazy_property
    def encodes_dbcs():
        # JIS X 0212 is listed first so that it is favoured over any extensions in the web
        # JIS X 0208 table.
        let tables = [None, None]
        tables.append(data_7bit.encode_jis90p2)
        tables.append(more_dbdata.encode_jis7)
        tables.append(data_7bit.encode_gb7)
        tables.append(data_7bit.encode_ksc7)
        return tables
    @lazy_property
    def encode_supershift_greek():
        return data_7bit.encode_greksupp
    @lazy_property
    def encode_supershift_latin():
        return data_7bit.encode_lat1supp
|
||
|
||
# ISO-2022-JP-2 has its own encoder but shares the JIS_encoding decoder.
register_kuroko_codec(
    ["iso-2022-jp-2", "iso2022-jp-2", "iso2022jp-2", "csISO2022JP2"],
    Iso2022Jp2IncrementalEncoder,
    JisEncodingIncrementalDecoder)
|
||
|
||
|
||
class Iso2022Jp3IncrementalEncoder(Iso2022JpIncrementalEncoder):
    """
    Incremental encoder for 7-bit stateful Japanese with JIS X 0213-2000.
    """
    html5name = None
    name = "iso-2022-jp-3"
    attitude = "eager"
    # Keys are positions in `encodes_sbcs` / `encodes_dbcs`; values are single byte values
    # (presumably the final byte of the designating escape sequence — confirm in the base class).
    escs_onebyte = {0: 0x42, 1: 0x4A, 2: 0x49}
    escs_twobyte = {3: 0x42, 4: 0x4F, 5: 0x50}
    @lazy_property
    def encodes_sbcs():
        let katakana = data_7bit.encode_jis7katakana
        return [None, None, katakana]
    @lazy_property
    def encodes_dbcs():
        # Positions 0-2 are placeholders so that indices line up with `escs_twobyte`.
        let tables = [None, None, None]
        tables.append(data_7bit.encode_jis7_reduced)
        tables.append(data_7bit.encode_jis00)
        tables.append(data_7bit.encode_jis00p2)
        return tables
|
||
|
||
# ISO-2022-JP-3 has its own encoder but shares the JIS_encoding decoder.
register_kuroko_codec(
    ["iso-2022-jp-3", "iso2022-jp-3", "iso2022jp-3"],
    Iso2022Jp3IncrementalEncoder,
    JisEncodingIncrementalDecoder)
|
||
|
||
|
||
class Iso2022Jp2004IncrementalEncoder(Iso2022JpIncrementalEncoder):
    """
    Incremental encoder for 7-bit stateful Japanese with JIS X 0213-2004.
    """
    html5name = None
    name = "iso-2022-jp-2004"
    attitude = "eager"
    # Keys are positions in `encodes_sbcs` / `encodes_dbcs`; values are single byte values
    # (presumably the final byte of the designating escape sequence — confirm in the base class).
    escs_onebyte = {0: 0x42, 1: 0x4A, 2: 0x49}
    escs_twobyte = {3: 0x42, 4: 0x50, 5: 0x51}
    @lazy_property
    def encodes_sbcs():
        let katakana = data_7bit.encode_jis7katakana
        return [None, None, katakana]
    @lazy_property
    def encodes_dbcs():
        # Positions 0-2 are placeholders so that indices line up with `escs_twobyte`.
        let tables = [None, None, None]
        tables.append(data_7bit.encode_jis7_reduced)
        tables.append(data_7bit.encode_jis00p2)
        tables.append(data_7bit.encode_jis04)
        return tables
|
||
|
||
# ISO-2022-JP-2004 has its own encoder but shares the JIS_encoding decoder.
register_kuroko_codec(
    ["iso-2022-jp-2004", "iso2022-jp-2004", "iso2022jp-2004"],
    Iso2022Jp2004IncrementalEncoder,
    JisEncodingIncrementalDecoder)
|
||
|
||
|
||
class Utf32IncrementalEncoder(IncrementalEncoder):
    """
    Incremental encoder for UTF-32 that starts its output with a byte order mark.
    """
    html5name = None
    name = "utf-32"
    include_bom = True
    endian = "little"
    # -1: BOM not yet emitted if applicable
    # 0: BOM emitted
    state = None
    @lazy_property
    def encoding_map():
        return {}
    def push_word(word, out):
        """
        Append `word` to `out` as four bytes in the byte order named by `self.endian`.

        Raises ValueError when `self.endian` is neither "little" nor "big".
        """
        if self.endian != "little" and self.endian != "big":
            raise ValueError("unexpected endian value: " + repr(self.endian))
        let octets = [word & 0xFF, (word >> 8) & 0xFF, (word >> 16) & 0xFF, (word >> 24) & 0xFF]
        if self.endian == "big":
            out.add(bytes([octets[3], octets[2], octets[1], octets[0]]))
        else:
            out.add(bytes(octets))
    def encode(string, final = False):
        """
        Implements `IncrementalEncoder.encode`.

        Each character becomes one 32-bit word. Surrogate codepoints (U+D800 to U+DFFF) are
        passed to the configured error handler, and every item of its replacement is written as
        a word of its own. `final` is accepted but not consulted.
        """
        let out = ByteCatenator()
        if self.include_bom and self.state == -1:
            self.push_word(0xFEFF, out)
        self.state = 0
        let position = 0
        # The error handler may move the position anywhere, hence a while loop, not a for.
        while position < len(string):
            let scalar = ord(string[position])
            if scalar < 0xD800 or scalar >= 0xE000:
                self.push_word(scalar, out)
                position += 1
                continue
            # Only surrogate "codepoints" reach this point.
            let error = UnicodeEncodeError(self.name, string, position, position + 1,
                        "surrogate codepoint")
            let handled = lookup_error(self.errors)(error)
            for unit in handled[0]:
                self.push_word(unit, out)
            position = handled[1]
            if position < 0:
                # A negative resume position counts back from the end of the string.
                position += len(string)
        return out.getvalue()
    def reset():
        """Implements `IncrementalEncoder.reset`: return to the "BOM not yet emitted" state."""
        self.state = -1
    def getstate():
        """Implements `IncrementalEncoder.getstate`: the state is the single `state` value."""
        return self.state
    def setstate(state):
        """Implements `IncrementalEncoder.setstate`: restore a value obtained from `getstate`."""
        self.state = state
|
||
|
||
class Utf32IncrementalDecoder(IncrementalDecoder):
    """
    IncrementalDecoder implementation for UTF-32, detected byte order, removing any byte order mark.

    When the first word is not a byte order mark, the byte order is guessed from that word (see
    the comments in `decode`). A subclass may set `force_endian` to skip detection altogether, in
    which case a leading U+FEFF is decoded like any other character.
    """
    name = "utf-32"
    html5name = None
    force_endian = None # subclass may set to "little" or "big"
    # -1: expecting BOM
    #  0: LE
    #  1: BE
    state = None
    pending = b""
    def _read_word(data, offset, big_endian):
        """Combine the four bytes of `data` starting at `offset` into one integer."""
        if big_endian:
            return data[offset + 3] | (data[offset + 2] << 8) | (data[offset + 1] << 16) | (data[offset] << 24)
        return data[offset] | (data[offset + 1] << 8) | (data[offset + 2] << 16) | (data[offset + 3] << 24)
    def decode(data_in, final = False):
        """
        Implements `IncrementalDecoder.decode`

        Decodes every complete 32-bit word in `data_in` (prefixed with `self.pending`) and returns
        the decoded string. Surrogate values and values of 0x110000 or more are reported to the
        error handler named by `self.errors`. Any trailing partial word is passed on to
        `_handle_truncation` together with `final`.
        """
        let data = self.pending + data_in
        self.pending = b""
        let out = StringCatenator()
        let offset = 0
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if (offset + 3) >= len(data):
                let leader_bytes = []
                if offset < len(data): # i.e. one to three isolated bytes at the end
                    leader_bytes.extend(list(data)[offset:])
                return self._handle_truncation(out, None, final, data, offset, leader_bytes)
            # Anything other than state 1 (including "not yet known") is read as little endian.
            let i = self._read_word(data, offset, self.state == 1)
            if self.state == -1:
                if self.force_endian == "little":
                    # keep BOM if endian specified, per Python. (i is already the LE reading.)
                    self.state = 0
                else if self.force_endian == "big":
                    self.state = 1
                    i = self._read_word(data, offset, True)
                else if i == 0xFEFF:
                    # Little-endian byte order mark: consume it without output.
                    self.state = 0
                    i = None
                else if i == 0xFFFE0000:
                    # Big-endian byte order mark, as seen through the little-endian reading.
                    self.state = 1
                    i = None
                else if i & 0xFFE00000:
                    # UTF-32's highest eleven bits will never be used, so if they have a value it's
                    # obviously the other endian.
                    self.state = 1
                    i = self._read_word(data, offset, True)
                else if not (i & 0xFFFF):
                    # More likely to be the other endian than the first character in a plane (null,
                    # a Linear B character, two rare Chinese characters and two PUA characters).
                    self.state = 1
                    i = self._read_word(data, offset, True)
                else:
                    # Default to LE, to be consistent with our (WHATWG-influenced) UTF-16 handling.
                    # Note that except in the relatively unlikely event of the stream starting with
                    # the first character in a plane, the previous clause would have detected
                    # UTF-32BE already though.
                    self.state = 0
            if i == None:
                offset += 4
            else if (i < 0x110000) and not (0xD800 <= i and i < 0xE000):
                out.add(chr(i))
                offset += 4
            else:
                let reason
                if i >= 0x110000: # 0x110000 itself is already past the last code point
                    reason = "UTF-32 code beyond Unicode"
                else:
                    reason = "surrogate word in UTF-32"
                let error = UnicodeDecodeError(self.name, data, offset, offset + 4, reason)
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                if offset < 0:
                    offset += len(data)
    def reset():
        """Implements `IncrementalDecoder.reset`: drop pending bytes and expect a BOM again."""
        self.pending = b""
        self.state = -1
    def getstate():
        """Implements `IncrementalDecoder.getstate`: returns `(pending, state)`."""
        return (self.pending, self.state)
    def setstate(state):
        """Implements `IncrementalDecoder.setstate`: takes the tuple returned by `getstate`."""
        self.pending = state[0]
        self.state = state[1]
|
||
|
||
class Utf32BeIncrementalEncoder(Utf32IncrementalEncoder):
    """
    IncrementalEncoder implementation for big-endian UTF-32 with no byte order mark.
    """
    name = "utf-32be"
    html5name = None
    # Settings read by the inherited encoder:
    include_bom = False
    endian = "big"
|
||
|
||
class Utf32BeIncrementalDecoder(Utf32IncrementalDecoder):
    """
    IncrementalDecoder implementation for big-endian UTF-32 with no byte order mark.
    """
    name = "utf-32be"
    html5name = None
    # Skips byte order detection in the inherited decoder.
    force_endian = "big"
|
||
|
||
class Utf32LeIncrementalEncoder(Utf32IncrementalEncoder):
    """
    IncrementalEncoder implementation for little-endian UTF-32 with no byte order mark.
    """
    name = "utf-32le"
    html5name = None
    # Settings read by the inherited encoder:
    include_bom = False
    endian = "little"
|
||
|
||
class Utf32LeIncrementalDecoder(Utf32IncrementalDecoder):
    """
    IncrementalDecoder implementation for little-endian UTF-32 with no byte order mark.
    """
    name = "utf-32le"
    html5name = None
    # Skips byte order detection in the inherited decoder.
    force_endian = "little"
|
||
|
||
# Byte order detected when decoding:
register_kuroko_codec(["utf-32", "utf32", "iso-10646-ucs-4", "ucs-4", "u32"],
                      Utf32IncrementalEncoder,
                      Utf32IncrementalDecoder)
# Byte order fixed by the label:
register_kuroko_codec(["utf-32le", "utf-32-le"],
                      Utf32LeIncrementalEncoder,
                      Utf32LeIncrementalDecoder)
register_kuroko_codec(["utf-32be", "utf-32-be"],
                      Utf32BeIncrementalEncoder,
                      Utf32BeIncrementalDecoder)
|
||
|
||
|
||
class HzIncrementalEncoder(IncrementalEncoder):
    """
    IncrementalEncoder implementation for HZ-GB-2312 (Usenet simplified Chinese).

    This is an old scheme for embedding GB 2312 data into a pure ASCII stream.

    `self.state` is 0 while writing ASCII and 1 while writing GB 2312 (between `~{` and `~}`);
    `self.linelength` counts the bytes written to the current output line.
    """
    name = "hz-gb-2312"
    html5name = None
    def ensure_state_number(state, out):
        """
        Write to `out` whatever is needed for the stream to be in `state` (0 or 1).

        Before doing so, a soft line break (`~` followed by a newline) is written if the current
        line is already long, leaving GB 2312 mode first when necessary. Raises `ValueError` for
        any other value of `state`.
        """
        # Limit lines to 76 bytes for consistency with QuoPri.
        if self.linelength >= 73 and self.state == 1:
            out.add(b"~}~\n")
            self.state = 0
            self.linelength = 0
        else if self.linelength >= 75 and self.state == 0:
            out.add(b"~\n")
            self.linelength = 0
        if self.state == state:
            return
        if state == 0:
            out.add(b"~}")
        else if state == 1:
            out.add(b"~{")
        else:
            raise ValueError("set to invalid state: " + repr(state))
        self.linelength += 2
        self.state = state
    def encode(string, final = False):
        """
        Implements `IncrementalEncoder.encode`

        ASCII characters are written as themselves (with `~` doubled); characters present in the
        GB 2312 table are written as two bytes in GB mode; anything else goes to the error handler
        named by `self.errors`. When `final` is true the stream is returned to ASCII mode.
        """
        let out = ByteCatenator()
        let offset = 0
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(string):
                if final:
                    self.ensure_state_number(0, out)
                return out.getvalue()
            let i = string[offset]
            if ord(i) < 0x80:
                self.ensure_state_number(0, out)
                if i == "~":
                    out.add(b"~") # i.e. a second one
                    self.linelength += 1
                out.add(bytes([ord(i)]))
                if i == "\n":
                    # A real newline starts a fresh output line, so the count starts over;
                    # otherwise soft breaks would be inserted into lines that are still short.
                    self.linelength = 0
                else:
                    self.linelength += 1
                offset += 1
            else if ord(i) in data_7bit.encode_gb7:
                self.ensure_state_number(1, out)
                out.add(bytes(data_7bit.encode_gb7[ord(i)]))
                self.linelength += 2
                offset += 1
            else:
                let error = UnicodeEncodeError(self.name, string, offset, offset + 1,
                            "character not supported by target encoding")
                let errorret = lookup_error(self.errors)(error)
                # Whatever the handler substitutes is written in ASCII mode.
                self.ensure_state_number(0, out)
                out.add(errorret[0])
                self.linelength += len(errorret[0])
                offset = errorret[1]
                if offset < 0:
                    offset += len(string)
    def reset():
        """Implements `IncrementalEncoder.reset`: ASCII mode, at the start of a line."""
        self.state = 0
        self.linelength = 0
    def getstate():
        """Implements `IncrementalEncoder.getstate`: returns `(state, linelength)`."""
        return (self.state, self.linelength)
    def setstate(state):
        """Implements `IncrementalEncoder.setstate`: takes the tuple returned by `getstate`."""
        self.state = state[0]
        self.linelength = state[1]
|
||
|
||
class HzIncrementalDecoder(IncrementalDecoder):
    """
    IncrementalDecoder implementation for HZ-GB-2312 (Usenet simplified Chinese).

    This is an old scheme for embedding GB 2312 data into a pure ASCII stream.

    `self.state_set` is 0 in ASCII mode and 1 in GB 2312 mode (between `~{` and `~}`).
    """
    name = "hz-gb-2312"
    html5name = None
    def decode(data_in, final = False):
        """
        Implements `IncrementalDecoder.decode`

        Returns the decoded string for `data_in` (prefixed with `self.pending`). Byte sequences
        which are not valid in the current mode are reported to the error handler named by
        `self.errors`; an unfinished sequence at the end of the data is passed on to
        `_handle_truncation` together with `final`.
        """
        let data = self.pending + data_in
        self.pending = b""
        let out = StringCatenator()
        let offset = 0
        let leader = []
        let in_esc = False
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(data):
                return self._handle_truncation(out, None, final, data, offset, leader)
            let i = data[offset]
            if i == 0x7E and len(leader) == 0:
                # A tilde opens an escape in either mode.
                in_esc = True
                leader.append(i)
                offset += 1
            else if in_esc and (self.state_set == 0) and len(leader) == 1 and i in (0x0A, 0x0D, 0x7B, 0x7E):
                in_esc = False
                leader = []
                if i == 0x0D:
                    # Be lenient about decoding soft line breaks with CR or CRLF rather than LF
                    # (they might be changed to CRLF over e.g. RFC822; see also the corresponding
                    # considerations in QuoPri)
                    if len(data) > (offset + 1) and data[offset + 1] == 0x0A:
                        offset += 1
                else if i == 0x7B:
                    self.state_set = 1
                else if i == 0x7E:
                    # Doubled tilde: a literal one.
                    out.add(chr(i))
                # (0x0A is a soft line break and produces nothing.)
                offset += 1
            else if in_esc and (self.state_set == 1) and len(leader) == 1 and i == 0x7D:
                in_esc = False
                leader = []
                self.state_set = 0
                offset += 1
            else if not in_esc and (self.state_set == 0) and (i < 0x7E or i == 0x7F):
                out.add(chr(i))
                offset += 1
            else if not in_esc and (self.state_set == 1) and (len(leader) == 0) and (0x21 <= i and i <= 0x7E):
                leader.append(i)
                offset += 1
            else if not in_esc and (self.state_set == 1) and len(leader) == 1 and (leader[0], i) in data_7bit.decode_gb7:
                # The length check matters: without a lead byte (e.g. a space, newline or 8-bit
                # byte arriving in GB mode) there is no leader[0] to look at, and the byte must
                # fall through to the error handler below instead.
                let decoded = data_7bit.decode_gb7[(leader[0], i)]
                if isinstance(decoded, tuple):
                    for individ in decoded:
                        out.add(chr(individ))
                else:
                    out.add(chr(decoded))
                offset += 1
                leader = []
            else:
                let errorstart = offset - len(leader)
                let errorend = offset + 1
                let error = UnicodeDecodeError(self.name, data, errorstart, errorend,
                            "invalid sequence")
                in_esc = False
                leader = []
                let errorret = lookup_error(self.errors)(error)
                out.add(errorret[0])
                offset = errorret[1]
                if offset < 0:
                    offset += len(data)
    def reset():
        """Implements `IncrementalDecoder.reset`: drop pending bytes and return to ASCII mode."""
        self.pending = b""
        self.state_set = 0
    def getstate():
        """Implements `IncrementalDecoder.getstate`: returns `(pending, state_set)`."""
        return (self.pending, self.state_set)
    def setstate(state):
        """Implements `IncrementalDecoder.setstate`: takes the tuple returned by `getstate`."""
        self.pending = state[0]
        self.state_set = state[1]
|
||
|
||
register_kuroko_codec(["hz-gb-2312", "hz", "hzgb", "hz_gb"],
                      HzIncrementalEncoder,
                      HzIncrementalDecoder)
|
||
|
||
|
||
class JapaneseAutodetectIncrementalDecoder(IncrementalDecoder):
    """
    IncrementalDecoder implementation for the automatic "Japanese" character encoding option.

    The input is fed to four strict decoders in parallel: the web versions of ISO-2022-JP,
    Shift_JIS and EUC-JP, plus UTF-8. Output is withheld until only one candidate is left; if
    several remain at the final call, an educated guess is made between them. If all four are
    ruled out, `ValueError` is raised.
    """
    name = "japanese"
    html5name = None
    # State flags:
    # 0x01: eliminated ISO-2022-JP
    # 0x02: eliminated Shift JIS
    # 0x04: eliminated EUC-JP
    # 0x08: eliminated UTF-8
    state = None
    def __init__(errors):
        self.errors = errors
        self.jis = lookup("iso-2022-jp").incrementaldecoder("strict")
        self.sjis = lookup("windows-31j").incrementaldecoder("strict")
        self.ujis = lookup("euc-jp").incrementaldecoder("strict")
        self.utf = lookup("utf-8-sig").incrementaldecoder("strict")
        self.reset()
    def _try_candidate(decoder, sink, flag, data, final):
        """
        Feed `data` to one of the 8-bit candidates unless `flag` is already set in `self.state`.

        Its output is appended to `sink`. The candidate is eliminated (by setting `flag`) if it
        fails to decode, or if its output contains the escape sequence ESC $ B while ISO-2022-JP
        has not been eliminated.
        """
        if self.state & flag:
            return
        try:
            let further = decoder.decode(data, final)
            sink.add(further)
            if "\[$B" in further and not (self.state & 0x01):
                self.state |= flag
        except UnicodeDecodeError:
            self.state |= flag
    def decode(data, final = False):
        """Implements `IncrementalDecoder.decode`"""
        if not (self.state & 0x01):
            try:
                self.pendingjis.add(self.jis.decode(data, final))
            except UnicodeDecodeError:
                self.state |= 0x01
                # NOTE(review): the nesting of this check (inside the except, as here, or after
                # the try) could not be told from the flattened source - confirm.
                if self.jis.state_set != 0:
                    self.state |= 0x0E
        self._try_candidate(self.sjis, self.pendingsjis, 0x02, data, final)
        self._try_candidate(self.ujis, self.pendingujis, 0x04, data, final)
        self._try_candidate(self.utf, self.pendingutf, 0x08, data, final)
        # Work out which candidate (if any) to emit from.
        let use_encoding = None
        if self.state == (0x01 | 0x02 | 0x04 | 0x08):
            raise ValueError("does not appear to be ISO-2022-JP, Shift JIS, EUC-JP or UTF-8")
        else if self.state == (0x02 | 0x04 | 0x08):
            use_encoding = "jis"
        else if self.state == (0x01 | 0x04 | 0x08):
            use_encoding = "sjis"
        else if self.state == (0x01 | 0x02 | 0x08):
            use_encoding = "ujis"
        else if self.state == (0x01 | 0x02 | 0x04):
            use_encoding = "utf"
        else if final:
            # Give priority to easier-to-eliminate encodings which haven't been eliminated.
            if not (self.state & 0x01):
                # Anything not 7-bit clean would be enough to eliminate ISO-2022-JP.
                use_encoding = "jis"
            else if not (self.state & 0x08):
                # With its non-overlapping single/lead/trail ranges, UTF-8 is easy to eliminate.
                use_encoding = "utf"
            else if not (self.state & 0x02):
                # Shift JIS uses many 0x80–9F bytes; our EUC-JP codec rejects most of them (though
                # some may accept them all as control codes, most don't), making it relatively
                # easy to eliminate EUC-JP when given Shift JIS data.
                use_encoding = "ujis"
            else:
                raise RuntimeError("this case should not be reachable")
        if use_encoding == None:
            # Still undecided: keep buffering.
            return ""
        # Hand over everything the chosen candidate has produced so far and start it afresh.
        let ret
        if use_encoding == "jis":
            ret = self.pendingjis.getvalue()
            self.pendingjis = StringCatenator()
        else if use_encoding == "sjis":
            ret = self.pendingsjis.getvalue()
            self.pendingsjis = StringCatenator()
        else if use_encoding == "ujis":
            ret = self.pendingujis.getvalue()
            self.pendingujis = StringCatenator()
        else:
            ret = self.pendingutf.getvalue()
            self.pendingutf = StringCatenator()
        return ret
    def reset():
        """Implements `IncrementalDecoder.reset`"""
        self.state = 0
        self.pending = b""
        self.jis.reset()
        self.sjis.reset()
        self.ujis.reset()
        self.utf.reset()
        self.pendingjis = StringCatenator()
        self.pendingsjis = StringCatenator()
        self.pendingujis = StringCatenator()
        self.pendingutf = StringCatenator()
    def getstate():
        """
        Implements `IncrementalDecoder.getstate`

        Returns, for each candidate in turn, its own state and its buffered output, followed by
        the elimination flags.
        """
        return (self.jis.getstate(), self.pendingjis.getvalue(),
                self.sjis.getstate(), self.pendingsjis.getvalue(),
                self.ujis.getstate(), self.pendingujis.getvalue(),
                self.utf.getstate(), self.pendingutf.getvalue(),
                self.state)
    def setstate(state):
        """Implements `IncrementalDecoder.setstate`: takes the tuple returned by `getstate`."""
        self.jis.setstate(state[0])
        self.sjis.setstate(state[2])
        self.ujis.setstate(state[4])
        self.utf.setstate(state[6])
        self.pendingjis = StringCatenator()
        self.pendingjis.add(state[1])
        self.pendingsjis = StringCatenator()
        self.pendingsjis.add(state[3])
        self.pendingujis = StringCatenator()
        self.pendingujis.add(state[5])
        self.pendingutf = StringCatenator()
        self.pendingutf.add(state[7])
        self.state = state[8]
|
||
|
||
# Data tagged so poorly that its encoding has to be autodetected probably deserves a BOM on
# any UTF-8 sent back, hence the UTF-8-with-signature encoder here:
register_kuroko_codec(["japanese"],
                      Utf8SigIncrementalEncoder,
                      JapaneseAutodetectIncrementalDecoder)
|
||
|
||
|
||
class Iso2022NonJpIncrementalEncoder(IncrementalEncoder):
    """
    IncrementalEncoder subclass, base class for ISO-2022-KR and ISO-2022-CN. Not used directly.

    Subclasses provide `encodes`, a list of encoding tables indexed by designation number, and
    the `escs_shift`, `escs_super` and `escs_super3` dictionaries, which map a designation
    number to the final byte of the escape sequence designating it.
    """
    name = None
    html5name = None
    encodes = []
    escs_shift = {}
    escs_super = {}
    escs_super3 = {}
    def ensure_shift_state(state, out):
        """Write a shift-in (0x0F) or shift-out (0x0E) byte if `self.shift` differs from `state`."""
        if self.shift == state:
            return
        if state == False:
            out.add(b"\x0F")
        else:
            out.add(b"\x0E")
        self.shift = state
    def ensure_shift_designation(state, out):
        """Write the ESC $ ) designation for set `state` unless it is already in effect."""
        if self.shift_desig == state: return
        out.add(b"\[$)")
        out.add(bytes([self.escs_shift[state]]))
        self.shift_desig = state
    def ensure_super_designation(state, out):
        """Write the ESC $ * designation for set `state` unless it is already in effect."""
        if self.super_desig == state: return
        out.add(b"\[$*")
        out.add(bytes([self.escs_super[state]]))
        self.super_desig = state
    def ensure_super3_designation(state, out):
        """Write the ESC $ + designation for set `state` unless it is already in effect."""
        if self.super3_desig == state: return
        out.add(b"\[$+")
        out.add(bytes([self.escs_super3[state]]))
        self.super3_desig = state
    def run_prelude(out):
        """Hook called by `encode` while nothing has been designated yet; does nothing here."""
    def encode(string, final = False):
        """
        Implements `IncrementalEncoder.encode`

        ASCII characters (other than SO, SI and ESC) are written as themselves after shifting in.
        Other characters are looked for in the shift-out sets, then the ESC N sets, then the
        ESC O sets, designating a set first where necessary. Characters found nowhere go to the
        error handler named by `self.errors`. When `final` is true the stream ends shifted in.
        """
        let out = ByteCatenator()
        let offset = 0
        if self.shift_desig == None and self.super_desig == None and self.super3_desig == None:
            self.run_prelude(out)
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(string):
                if final:
                    self.ensure_shift_state(False, out)
                return out.getvalue()
            let i = string[offset]
            let is_ascii = ord(i) < 0x80 and i not in ("\x0E\x0F\[")
            let is_shift = self.shift_desig != None and ord(i) in self.encodes[self.shift_desig]
            let is_super = self.super_desig != None and ord(i) in self.encodes[self.super_desig]
            let is_super3 = self.super3_desig != None and ord(i) in self.encodes[self.super3_desig]
            if is_ascii:
                self.ensure_shift_state(False, out)
                out.add(bytes([ord(i)]))
                offset += 1
                continue
            # Not in the set currently designated for shift-out: see whether another one has it.
            if not is_shift:
                for desig in self.escs_shift.keys():
                    if ord(i) in self.encodes[desig]:
                        self.ensure_shift_designation(desig, out)
                        is_shift = True
                        break
            if is_shift:
                self.ensure_shift_state(True, out)
                out.add(bytes(self.encodes[self.shift_desig][ord(i)]))
                offset += 1
                continue
            if not is_super:
                for desig in self.escs_super.keys():
                    if ord(i) in self.encodes[desig]:
                        self.ensure_super_designation(desig, out)
                        is_super = True
                        break
            if is_super:
                # ESC N prefixes each individual character.
                out.add(b"\x1B\x4E")
                out.add(bytes(self.encodes[self.super_desig][ord(i)]))
                offset += 1
                continue
            if not is_super3:
                for desig in self.escs_super3.keys():
                    if ord(i) in self.encodes[desig]:
                        self.ensure_super3_designation(desig, out)
                        is_super3 = True
                        break
            if is_super3:
                # ESC O prefixes each individual character.
                out.add(b"\x1B\x4F")
                out.add(bytes(self.encodes[self.super3_desig][ord(i)]))
                offset += 1
                continue
            let error = UnicodeEncodeError(self.name, string, offset, offset + 1,
                        "character not supported by target encoding")
            let errorret = lookup_error(self.errors)(error)
            # Whatever the handler substitutes is written shifted in.
            self.ensure_shift_state(False, out)
            out.add(errorret[0])
            offset = errorret[1]
            if offset < 0:
                offset += len(string)
    def reset():
        """Implements `IncrementalEncoder.reset`: shifted in, with nothing designated."""
        self.shift = False
        self.shift_desig = None
        self.super_desig = None
        self.super3_desig = None
    def getstate():
        """
        Implements `IncrementalEncoder.getstate`

        Returns `(shift, shift_desig, super_desig, super3_desig)`.
        """
        return (self.shift, self.shift_desig, self.super_desig, self.super3_desig)
    def setstate(state):
        """Implements `IncrementalEncoder.setstate`: takes the tuple returned by `getstate`."""
        self.shift = state[0]
        self.shift_desig = state[1]
        self.super_desig = state[2]
        # Fourth element, matching getstate (this previously re-read state[2]).
        self.super3_desig = state[3]
|
||
|
||
class Iso2022NonJpIncrementalDecoder(IncrementalDecoder):
    """
    IncrementalDecoder subclass, base class for ISO-2022-KR and ISO-2022-CN. Not used directly.

    Subclasses provide `decodes`, a list of decoding tables indexed by designation number, and
    the `escs_shift`, `escs_super` and `escs_super3` dictionaries, which map the final byte of
    a designation escape sequence to a designation number.
    """
    name = None
    html5name = None
    decodes = []
    escs_shift = {}
    escs_super = {}
    escs_super3 = {}
    def _add_decoded(ucs, out):
        """Append a table entry to `out`: one code point, or a tuple of several."""
        if isinstance(ucs, tuple):
            for individ in ucs:
                out.add(chr(individ))
        else:
            out.add(chr(ucs))
    def decode(data_in, final = False):
        """Implements `IncrementalDecoder.decode`"""
        let data = self.pending + data_in
        self.pending = b""
        let out = StringCatenator()
        let offset = 0
        let leader = []
        let in_esc = False
        while 1: # offset can be arbitrarily changed by the error handler, so not a for
            if offset >= len(data):
                return self._handle_truncation(out, None, final, data, offset, leader)
            let i = data[offset]
            let graphic = 0x21 <= i and i <= 0x7E
            # Escape and the shift controls are only acted upon between characters.
            if len(leader) == 0:
                if i == 0x1B:
                    in_esc = True
                    leader.append(i)
                    offset += 1
                    continue
                if i == 0x0E and not self.shift:
                    self.shift = True
                    offset += 1
                    continue
                if i == 0x0F and self.shift:
                    self.shift = False
                    offset += 1
                    continue
            if in_esc:
                if len(leader) == 1:
                    if i in (0x24, 0x4E, 0x4F):
                        leader.append(i)
                        offset += 1
                        continue
                else if len(leader) == 2:
                    let is_desig = leader[1] == 0x24 and i in (0x29, 0x2A, 0x2B)
                    let is_single = leader[1] in (0x4E, 0x4F) and graphic
                    if is_desig or is_single:
                        leader.append(i)
                        offset += 1
                        continue
                else if len(leader) == 3:
                    if leader[1] in (0x4E, 0x4F) and graphic:
                        # Second byte of a character prefixed by ESC N or ESC O.
                        let desig = self.super_desig if leader[1] == 0x4E else self.super3_desig
                        if desig != None and (leader[2], i) in self.decodes[desig]:
                            self._add_decoded(self.decodes[desig][(leader[2], i)], out)
                            leader = []
                            in_esc = False
                            offset += 1
                            continue
                    if leader[1] == 0x24:
                        let designated = True
                        if leader[2] == 0x29 and i in self.escs_shift: # to G1 (shift)
                            self.shift_desig = self.escs_shift[i]
                        else if leader[2] == 0x2A and i in self.escs_super: # to G2 (super shift 2)
                            self.super_desig = self.escs_super[i]
                        else if leader[2] == 0x2B and i in self.escs_super3: # to G3 (super shift 3)
                            self.super3_desig = self.escs_super3[i]
                        else:
                            designated = False
                        if designated:
                            in_esc = False
                            leader = []
                            offset += 1
                            continue
            else if self.shift:
                if graphic:
                    if len(leader) == 0:
                        leader.append(i)
                        offset += 1
                        continue
                    if len(leader) == 1 and self.shift_desig != None and (leader[0], i) in self.decodes[self.shift_desig]:
                        self._add_decoded(self.decodes[self.shift_desig][(leader[0], i)], out)
                        leader = []
                        in_esc = False
                        offset += 1
                        continue
            else if i < 0x80:
                out.add(chr(i))
                offset += 1
                continue
            # Nothing above accepted the byte.
            let errorstart = offset - len(leader)
            let errorend = errorstart + 1
            if self.shift and i != 0x1B:
                errorend = errorstart + 2
            let error = UnicodeDecodeError(self.name, data, errorstart, errorend,
                        "invalid sequence")
            in_esc = False
            leader = []
            let errorret = lookup_error(self.errors)(error)
            out.add(errorret[0])
            offset = errorret[1]
            if offset < 0:
                offset += len(data)
    def reset():
        """Implements `IncrementalDecoder.reset`: no pending bytes, shifted in, nothing designated."""
        self.pending = b""
        self.shift = False
        self.shift_desig = None
        self.super_desig = None
        self.super3_desig = None
    def getstate():
        """
        Implements `IncrementalDecoder.getstate`

        Returns `(pending, shift, shift_desig, super_desig, super3_desig)`.
        """
        return (self.pending, self.shift, self.shift_desig, self.super_desig, self.super3_desig)
    def setstate(state):
        """Implements `IncrementalDecoder.setstate`: takes the tuple returned by `getstate`."""
        self.pending = state[0]
        self.shift = state[1]
        self.shift_desig = state[2]
        self.super_desig = state[3]
        self.super3_desig = state[4]
|
||
|
||
class Iso2022KrIncrementalEncoder(Iso2022NonJpIncrementalEncoder):
    """
    IncrementalEncoder implementation for ISO-2022-KR (7-bit stateful Korean, South).
    """
    name = "iso-2022-kr"
    html5name = None
    # The one shift-out set (number 0) is designated by ESC $ ) C.
    escs_shift = {0: 0x43}
    @lazy_property
    def encodes():
        return [data_7bit.encode_ksc7]
    def run_prelude(out):
        # RFC 1557 requires the designation at the "beginning of a line", so it is written at
        # the very start of the stream. Python differs here: it writes the designation just
        # before the first SO, even when that falls in the middle of a line.
        self.ensure_shift_designation(0, out)
|
||
|
||
class Iso2022KrIncrementalDecoder(Iso2022NonJpIncrementalDecoder):
    """
    IncrementalDecoder implementation for ISO-2022-KR (7-bit stateful Korean, South).
    """
    name = "iso-2022-kr"
    html5name = None
    # ESC $ ) C designates the one shift-out set (number 0).
    escs_shift = {0x43: 0}
    @lazy_property
    def decodes():
        return [data_7bit.decode_ksc7]
|
||
|
||
register_kuroko_codec(["iso-2022-kr", "iso2022-kr", "iso2022kr", "csiso2022kr"],
                      Iso2022KrIncrementalEncoder,
                      Iso2022KrIncrementalDecoder)
|
||
|
||
class Iso2022CnIncrementalEncoder(Iso2022NonJpIncrementalEncoder):
    """
    IncrementalEncoder implementation for ISO-2022-CN (7-bit stateful Chinese).

    ISO-2022-CN-Ext is left out: it is very rare and would need a much larger set of tables.
    """
    name = "iso-2022-cn"
    html5name = None
    # Keys are indices into `encodes`; values are the final bytes of the designation escapes.
    escs_shift = {0: 0x41, 1: 0x47}
    escs_super = {2: 0x48}
    @lazy_property
    def encodes():
        return [data_7bit.encode_gb7, data_7bit.encode_csic1, data_7bit.encode_csic2]
|
||
|
||
class Iso2022CnIncrementalDecoder(Iso2022NonJpIncrementalDecoder):
    """
    IncrementalDecoder implementation for ISO-2022-CN (7-bit stateful Chinese).

    ISO-2022-CN-Ext is left out: it is very rare and would need a much larger set of tables.
    """
    name = "iso-2022-cn"
    html5name = None
    # Keys are the final bytes of the designation escapes; values are indices into `decodes`.
    escs_shift = {0x41: 0, 0x47: 1}
    escs_super = {0x48: 2}
    @lazy_property
    def decodes():
        return [data_7bit.decode_gb7, data_7bit.decode_csic1, data_7bit.decode_csic2]
|
||
|
||
register_kuroko_codec(["iso-2022-cn", "iso2022-cn", "iso2022cn", "csiso2022cn"],
                      Iso2022CnIncrementalEncoder,
                      Iso2022CnIncrementalDecoder)
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|