kuroko/modules/codecs/dbextra.krk
HarJIT a580a835b8
Codecs revisited (#28)
* xraydict functionality and usage improvements

Add a filter_function to xraydict, allowing fewer big data structures. Make
uses of xraydict prefer exclusion sets to exclusion lists, to avoid
repeated linear search of a list.

* Make `big5_coded_forms_from_hkscs` a set, remove set trailing commas.

* Remove `big5_coded_forms_from_hkscs` in favour of a filter function.

* Similarly, use sets for 7-bit exclusion lists except when really short.

* Revise mappings for seven 78JIS codepoints.

Mappings for 25-23 and 90-22 were previously the same as those used for
97JIS; they have been swapped to correspond with how the IBM extension
versus the standard code are mapped in the "old sequence" (78JIS-based)
as opposed to the "new sequence".

Mappings for 32-70, 34-45, 35-29, 39-77 and 54-02 in 78JIS have been
changed to reflect disunifications made in 2000-JIS and 2004-JIS, assigning
the 1978-edition unsimplified variants of those characters separate coded
forms (where previously, only swaps and disunifications in 83JIS and
disunifications in 90JIS (including JIS X 0212) had been considered).

This only affects the `jis_encoding` codec (including the decoding
direction for `iso-2022-jp-2`, `iso-2022-jp-3` and `iso-2022-jp-2004`),
and the decoding is only affected when `ESC $ @` (not `ESC $ B`) is used.
The `iso-2022-jp` codec is unaffected, and remains similar to (but more
consistently pedantic than) the WHATWG specification, thus using the same
table for both 78JIS and 97JIS.

* Make `johab-ebcdic` decoder use many-to-one, not corporate PUA.

Many-to-one decodes are not uncommon in CJK encodings (e.g. Windows-31J),
and mapping to the IBM Corporate PUA (code page 1449) would probably make
it render as completely the wrong character if at all in practice.

* Switch `cp950_no_eudc_encoding_map` away from a hardcoded exclusion list.

* Codec support for `x-mac-korean`.

* Add a test bit for the UTF-8 wrapper.

* Document the unique error-condition definition of the ISO-2022-JP codec.

* Update docs now there is an actual implementation for `x-mac-korean`.

* Further explanations of the hazards of `jis_encoding`.

* Sanitised → Sanitised or escaped.

* Further clarify the status with not verifying Shift In.

* Corrected description of End State 2.

* Changes to MacKorean to avoid mapping non-ASCII using ASCII punctuation.

* Extraneous word "still".

* Fix omitting MacKorean single-byte codes.
2022-07-23 08:32:54 +09:00

1624 lines
64 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
This module includes some additional variable-width or wide encodings not specified by WHATWG.
As such, none of the codecs in this module should be used in HTML.
"""
from codecs.dbextra_data_8bit import data_8bit
from codecs.dbextra_data_7bit import data_7bit
from codecs.infrastructure import register_kuroko_codec, ByteCatenator, StringCatenator, UnicodeEncodeError, UnicodeDecodeError, lookup_error, lookup, BaseEbcdicIncrementalEncoder, BaseEbcdicIncrementalDecoder, AsciiIncrementalEncoder, AsciiIncrementalDecoder, IncrementalEncoder, IncrementalDecoder, lazy_property
from codecs.dbdata import more_dbdata, XEucJpIncrementalDecoder, Big5EtenIncrementalEncoder, Big5HkscsIncrementalDecoder
from codecs.bespokecodecs import Iso2022JpIncrementalEncoder, Iso2022JpIncrementalDecoder, Utf8IncrementalDecoder, Utf16BeIncrementalEncoder, Utf16BeIncrementalDecoder, Utf8SigIncrementalEncoder
from collections import xraydict
class Big5NonEtenKanaIncrementalEncoder(AsciiIncrementalEncoder):
"""
IncrementalEncoder implementation for Big5 with non-ETEN layout of kana, Cyrillic, list markers.
The other ETEN extension section (the one retained by Microsoft's version) is still included.
Although this is the kana/Cyrillic/list marker layout included in the UTC's BIG5.TXT, it is the
less common of the two (most extension schemes for Big5 use the ETEN layout), and has several
problems (katakana lacks the vowel extender, and Cyrillic lacks several capitals) which the
ETEN layout does not have. However, this codec corresponds roughly to Python's `big5`, and more
closely to its (built-in, as opposed to if/when Python aliases it to `mbcs`) `cp950`.
"""
name = "big5-nonetenkana"
html5name = None
@lazy_property
def encoding_map():
return xraydict(data_8bit.cp950_no_eudc_encoding_map, data_8bit.encode_big5_nonetenkana)
class Big5NonEtenKanaIncrementalDecoder(AsciiIncrementalDecoder):
"""
IncrementalDecoder implementation for Big5 with non-ETEN layout of kana, Cyrillic, list markers.
The other ETEN extension section (the one retained by Microsoft's version) is still included.
Although this is the kana/Cyrillic/list marker layout included in the UTC's BIG5.TXT, it is the
less common of the two (most extension schemes for Big5 use the ETEN layout), and has several
problems (katakana lacks the vowel extender, and Cyrillic lacks several capitals) which the
ETEN layout does not have. However, this codec corresponds roughly to Python's `big5`, and more
closely to its (built-in, as opposed to if/when Python aliases it to `mbcs`) `cp950`.
"""
name = "big5-nonetenkana"
html5name = None
@lazy_property
def decoding_map():
return xraydict(data_8bit.cp950_no_eudc_decoding_map, data_8bit.decode_big5_nonetenkana)
dbrange = Big5HkscsIncrementalDecoder.dbrange
trailrange = Big5HkscsIncrementalDecoder.trailrange
register_kuroko_codec(["big5-nonetenkana", "big5-tw"],
Big5NonEtenKanaIncrementalEncoder, Big5NonEtenKanaIncrementalDecoder)
class XMacChineseTradIncrementalEncoder(AsciiIncrementalEncoder):
"""
IncrementalEncoder implementation for Big5 with Apple's additions and reduced lead byte range.
The Unicode mappings are partly changed to be closer to Apple's (as opposed to Microsoft's)
correspondences; however, Microsoft's are retained where following Apple's would have required
PUA transcoding hints to round-trip.
"""
name = "x-mac-chinesetrad"
html5name = None
@lazy_property
def encoding_map():
return xraydict(data_8bit.cp950_no_eudc_encoding_map, {
0xB7: (0xA1, 0x45),
0x22EF: (0xA1, 0x4B),
0x203E: (0xA1, 0xC2),
0x223C: (0xA1, 0xE3),
0x2609: (0xA1, 0xF3),
0xA5: (0xA2, 0x44),
0xA2: (0xA2, 0x46),
0xA3: (0xA2, 0x47),
0xF880: 0x81,
0xF881: 0x82,
0xA0: 0xA0,
0xA9: 0xFD,
0x2122: 0xFE,
0x2026: 0xFF,
})
class XMacChineseTradIncrementalDecoder(AsciiIncrementalDecoder):
"""
IncrementalDecoder implementation for Big5 with Apple's additions and reduced lead byte range.
The Unicode mappings are partly changed to be closer to Apple's (as opposed to Microsoft's)
correspondences; however, Microsoft's are retained where following Apple's would have required
PUA transcoding hints to round-trip.
"""
name = "x-mac-chinesetrad"
html5name = None
@lazy_property
def decoding_map():
return xraydict(data_8bit.cp950_no_eudc_decoding_map, {
(0xA1, 0x45): 0xB7,
(0xA1, 0x4B): 0x22EF,
(0xA1, 0xC2): 0x203E,
(0xA1, 0xE3): 0x223C,
(0xA1, 0xF3): 0x2609,
(0xA2, 0x44): 0xA5,
(0xA2, 0x46): 0xA2,
(0xA2, 0x47): 0xA3,
0x80: 0x5C,
0x81: 0xF880,
0x82: 0xF881,
0xA0: 0xA0,
0xFD: 0xA9,
0xFE: 0x2122,
0xFF: 0x2026,
})
dbrange = tuple(range(0xA1, 0xFC + 1))
trailrange = Big5HkscsIncrementalDecoder.trailrange
register_kuroko_codec(["x-mac-chinesetrad", "x-mac-trad-chinese"],
XMacChineseTradIncrementalEncoder, XMacChineseTradIncrementalDecoder)
class XMacChineseSimpIncrementalEncoder(AsciiIncrementalEncoder):
"""
IncrementalEncoder implementation for EUC-CN, Apple version (hence slightly reduced lead byte range).
Mappings to more-recently added characters are used for the vertical forms, rather than
Apple transcoding hints (or GB18030 private use codes).
"""
name = "x-mac-chinesesimp"
html5name = None
@lazy_property
def encoding_map():
return xraydict(data_8bit.encode_gb8, {
0x301C: (0xA1, 0xAB),
0x22EF: (0xA1, 0xAD),
0xA2: (0xA1, 0xE9),
0xA3: (0xA1, 0xEA),
0x203E: (0xA3, 0xFE),
0xF880: 0x81,
0xF881: 0x82,
0xA0: 0xA0,
0xA9: 0xFD,
0x2122: 0xFE,
0x2026: 0xFF,
})
class XMacChineseSimpIncrementalDecoder(AsciiIncrementalDecoder):
"""
IncrementalDecoder implementation for EUC-CN, Apple version (hence slightly reduced lead byte range).
Mappings to more-recently added characters are used for the vertical forms, rather than
Apple transcoding hints (or GB18030 private use codes).
"""
name = "x-mac-chinesesimp"
html5name = None
@lazy_property
def decoding_map():
return xraydict(data_8bit.decode_gb8, {
(0xA1, 0xAB): 0x301C,
(0xA1, 0xAD): 0x22EF,
(0xA1, 0xE9): 0xA2,
(0xA1, 0xEA): 0xA3,
(0xA3, 0xFE): 0x203E,
0x80: 0xFC,
0x81: 0xF880,
0x82: 0xF881,
0xA0: 0xA0,
0xFD: 0xA9,
0xFE: 0x2122,
0xFF: 0x2026,
})
dbrange = tuple(range(0xA1, 0xFC + 1))
trailrange = tuple(range(0xA1, 0xFE + 1))
register_kuroko_codec(["x-mac-chinesesimp", "x-mac-simp-chinese", "euc-cn", "euccn", "eucgb2312-cn"],
XMacChineseSimpIncrementalEncoder, XMacChineseSimpIncrementalDecoder)
class XMacKoreanIncrementalEncoder(AsciiIncrementalEncoder):
"""
IncrementalEncoder implementation for the HangulTalk (MacKorean) encoding.
HangulTalk is notorious for frequently not corresponding one-to-one to Unicode. In places,
multiple Unicode representations will be accepted for a given HangulTalk representation.
"""
name = "x-mac-korean"
html5name = None
@lazy_property
def encoding_map():
return data_8bit.encode_hangultalk
class XMacKoreanIncrementalDecoder(AsciiIncrementalDecoder):
"""
IncrementalDecoder implementation for the HangulTalk (MacKorean) encoding.
HangulTalk is notorious for frequently not corresponding one-to-one to Unicode; the mappings
used here are somewhat updated and improved compared to all versions of Apple's mappings and
especially the Adobe CID mappings. However, bear in mind that content will not necessarily be
decoded to the same Unicode sequences as by other implementations. In particular, decoding to
the Apple's Corporate Private Use Area has been avoided for the most part, even where this
results in poorly matched and/or convergent decoded forms, since preserving legibility has been
afforded greater priority than round tripping.
"""
name = "x-mac-korean"
html5name = None
@lazy_property
def decoding_map():
return data_8bit.decode_hangultalk
dbrange = tuple(range(0xA1, 0xFE + 1))
trailrange = tuple(range(0x41, 0x7D + 1)) + tuple(range(0xA1, 0xFE + 1))
register_kuroko_codec(["x-mac-korean"],
XMacKoreanIncrementalEncoder, XMacKoreanIncrementalDecoder)
class Cesu8IncrementalEncoder(IncrementalEncoder):
"""
IncrementalEncoder implementation for CESU-8, a deprecated UTF-8-like encoding still used by
some systems, such as TCL, and still mis-called "utf8" in some places for legacy reasons.
"""
name = "cesu-8"
html5name = None
# -1: expecting BOM
# 0: Normal
state = None
include_bom = False
def encode(string, final = False):
"""Implements `IncrementalEncoder.encode`"""
let out = ByteCatenator()
if self.include_bom and self.state == -1:
out.add("\uFEFF".encode())
self.state = 0
let first_offset = 0
let second_offset = 0
while second_offset < len(string):
let codepoint = ord(string[second_offset])
if 0x10000 <= codepoint and codepoint <= 0x10FFFF:
out.add(string[first_offset:second_offset].encode())
let bits_remaining = codepoint - 0x10000
let sixth = 0x80 | (bits_remaining & 0x3F)
bits_remaining >>= 6
let fifth = 0xB0 | (bits_remaining & 0xF)
bits_remaining >>= 4
let third = 0x80 | (bits_remaining & 0x3F)
bits_remaining >>= 6
let second = 0xA0 | bits_remaining
out.add(bytes([0xED, second, third, 0xED, fifth, sixth]))
second_offset += 1
first_offset = second_offset
else:
second_offset += 1
out.add(string[first_offset:second_offset].encode())
return out.getvalue()
def reset():
"""Implements `IncrementalEncoder.reset`"""
self.state = -1
def getstate():
"""Implements `IncrementalEncoder.getstate`"""
return self.state
def setstate(state):
"""Implements `IncrementalEncoder.setstate`"""
self.state = state
class Cesu8IncrementalDecoder(Utf8IncrementalDecoder):
"""
IncrementalDecoder implementation for CESU-8, a deprecated UTF-8-like encoding still used by
some systems, such as TCL, and still mis-called "utf8" in some places for legacy reasons.
"""
name = "cesu-8"
html5name = None
def _error_handler(error):
# Note: not error.end (which is set after noticing the CESU seq, not at the end of it).
let after_cesu = error.start + 6
let maybe_cesu = list(error.object)[error.start:after_cesu]
if len(maybe_cesu) == 6 and (
maybe_cesu[0] == 0xED and 0xA0 <= maybe_cesu[1] and maybe_cesu[1] <= 0xAF
) and (
maybe_cesu[3] == 0xED and 0xB0 <= maybe_cesu[4] and maybe_cesu[4] <= 0xBF):
let codepoint = 0
codepoint |= maybe_cesu[1] & 0xF
codepoint <<= 6
codepoint |= maybe_cesu[2] & 0x3F
codepoint <<= 4
codepoint |= maybe_cesu[4] & 0xF
codepoint <<= 6
codepoint |= maybe_cesu[5] & 0x3F
codepoint += 0x10000
return (chr(codepoint), after_cesu)
elif len(maybe_cesu) >= 2 and maybe_cesu[0] == 0xC0 and maybe_cesu[1] == 0x80:
# mUTF-8 is a fairly common CESU-8 variant, using the two-byte code for embedded NUL
return ("\x00", error.start + 2)
else:
return lookup_error(self.errors)(error)
register_kuroko_codec(["utf8-ucs2", "utf8mb3", "cesu-8", "cesu8"],
Cesu8IncrementalEncoder, Cesu8IncrementalDecoder)
let _verbatim_utf7 = (
list(range(ord("A"), ord("Z") + 1)) +
list(range(ord("a"), ord("z") + 1)) +
list(range(ord("0"), ord("9") + 1)) + [ord(i) for i in "/-(),.:? \r\n"]
)
let _base64_alphabet = (
list(range(ord("A"), ord("Z") + 1)) +
list(range(ord("a"), ord("z") + 1)) +
list(range(ord("0"), ord("9") + 1)) + [ord("+"), ord("/")]
)
let _utf7_not_need_hyphen = [ord(i) for i in "(),.:? \r\n"]
class Utf7IncrementalEncoder(IncrementalEncoder):
"""
IncrementalEncoder implementation for UTF-7, a largely obsolete (and forbidden in HTML5)
scheme for mixing ASCII with Base64'd UTF-16BE in e-mail.
"""
name = "utf-7"
html5name = None
utf16encoder = None
mode = "ascii"
pending = []
def __init__(errors):
self.utf16encoder = Utf16BeIncrementalEncoder(errors)
IncrementalEncoder.__init__(self, errors)
def encode(data, final=False):
"""Implements `IncrementalEncoder.encode`"""
let incoming = self.pending + list(self.utf16encoder.encode(data, final=final))
self.pending = []
let offset = 0
let out = ByteCatenator()
let chunksize = 6 if self.mode == "base64" else 2
while offset < len(incoming):
let chunk = incoming[offset:offset + chunksize]
if len(chunk) < chunksize and not final:
self.pending = chunk
return out.getvalue()
if self.mode == "ascii":
if chunk[0] or (chunk[1] not in _verbatim_utf7):
out.add(b"+")
self.mode = "base64"
chunksize = 6
continue
out.add(bytes([chunk[1]]))
else:
if (not chunk[0]) and (chunk[1] in _verbatim_utf7):
if chunk[1] not in _utf7_not_need_hyphen:
out.add(b"-")
self.mode = "ascii"
chunksize = 2
continue
else if len(chunk) >= 4 and (not chunk[2]) and (chunk[3] in _verbatim_utf7):
chunk = chunk[:2]
else if len(chunk) == 6 and (not chunk[4]) and (chunk[5] in _verbatim_utf7):
chunk = chunk[:4]
out.add(lookup("inverse-base64").decode(bytes(chunk)).rstrip("=").encode())
offset += len(chunk)
if final and self.mode == "base64":
self.mode = "ascii"
return out.getvalue()
def reset():
"""Implements `IncrementalEncoder.reset`"""
self.utf16encoder.reset()
self.mode = "ascii"
def getstate():
"""Implements `IncrementalEncoder.getstate`"""
return (self.utf16encoder.getstate(), self.mode, self.pending)
def setstate(state):
"""Implements `IncrementalEncoder.setstate`"""
self.utf16encoder.setstate(state[0])
self.mode = state[1]
self.pending = state[2]
class Utf7IncrementalDecoder(IncrementalDecoder):
"""
IncrementalDecoder implementation for UTF-7, a largely obsolete (and forbidden in HTML5)
scheme for mixing ASCII with Base64'd UTF-16BE in e-mail.
"""
name = "utf-7"
html5name = None
utf16decoder = None
mode = "ascii"
pending = []
def __init__(errors):
self.utf16decoder = Utf16BeIncrementalDecoder(errors)
IncrementalDecoder.__init__(self, errors)
def decode(data_in, final=False):
"""Implements `IncrementalDecoder.decode`"""
let data = self.pending + data_in
self.pending = b""
let incoming = list(data)
let offset = 0
let out = StringCatenator()
let chunksize = 8 if self.mode in ("base64", "maybebase64") else 1
while offset < len(incoming):
let chunk = incoming[offset:offset + chunksize]
if len(chunk) < chunksize and not final:
self.pending = bytes(chunk)
return out.getvalue()
if self.mode == "ascii":
if chunk[0] == b"+"[0]:
self.mode = "maybebase64"
chunksize = 8
else:
out.add(chr(chunk[0]))
offset += 1
else:
if self.mode == "maybebase64":
if chunk[0] == b"-"[0]:
out.add("+")
offset += 1
self.mode = "ascii"
chunksize = 1
continue
else:
self.mode = "base64"
let cutpoint = len(chunk)
let stride = len(chunk)
for n, i in enumerate(chunk):
if i not in _base64_alphabet:
cutpoint = n
stride = n if i != b"-"[0] else (n + 1)
# In preparation for the next iteration, which will be in ASCII mode:
self.mode = "ascii"
chunksize = 1
break
chunk = chunk[:cutpoint]
if len(chunk) > 0:
let padbytes = (4 - (len(chunk) % 4)) % 4
if padbytes > 2:
let error = UnicodeDecodeError(self.name, data, offset, offset + cutpoint,
"truncated Base64 sequence")
let errorret = lookup_error(self.errors)(error)
out.add(errorret[0])
offset = errorret[1]
continue
let base64 = bytes(chunk).decode() + ("=" * padbytes)
let utf16 = lookup("inverse-base64").encode(base64)
out.add(self.utf16decoder.decode(utf16, final=final))
offset += stride
if final and self.mode != "ascii":
self.mode = "ascii"
return out.getvalue()
def reset():
"""Implements `IncrementalDecoder.reset`"""
self.utf16decoder.reset()
self.mode = "ascii"
self.pending = b""
def getstate():
"""Implements `IncrementalDecoder.getstate`"""
return (self.utf16encoder.getstate(), self.mode, self.pending)
def setstate(state):
"""Implements `IncrementalDecoder.setstate`"""
self.utf16encoder.setstate(state[0])
self.mode = state[1]
self.pending = state[2]
register_kuroko_codec(["utf-7", "utf7", "u7", "unicode-1-1-utf-7"],
Utf7IncrementalEncoder, Utf7IncrementalDecoder)
class EucJpFullIncrementalEncoder(AsciiIncrementalEncoder):
"""
IncrementalEncoder implementation for EUC-JP, including JIS X 0212.
"""
name = "euc-jp-full"
html5name = None
@lazy_property
def encoding_map():
return data_8bit.encode_euc90
register_kuroko_codec(["euc-jp-full"],
EucJpFullIncrementalEncoder, XEucJpIncrementalDecoder)
class EucJis2004IncrementalEncoder(AsciiIncrementalEncoder):
"""
IncrementalEncoder implementation for the JIS X 0213 version of EUC-JP.
"""
name = "euc-jis-2004"
html5name = None
@lazy_property
def encoding_map():
return data_8bit.encode_euc04
class EucJis2004IncrementalDecoder(AsciiIncrementalDecoder):
"""
IncrementalDecoder implementation for the JIS X 0213 version of EUC-JP.
"""
name = "euc-jis-2004"
html5name = None
@lazy_property
def decoding_map():
return data_8bit.decode_euc04
dbrange = (0x8E,) + tuple(range(0xA1, 0xFE + 1))
tbrange = (0x8F,)
trailrange = tuple(range(0xA1, 0xFE + 1))
register_kuroko_codec(["euc-jis-2004", "jisx0213", "eucjis2004", "euc_jis2004",
"euc_jisx0213", "eucjisx0213"],
EucJis2004IncrementalEncoder, EucJis2004IncrementalDecoder)
class ShiftJis2004IncrementalEncoder(AsciiIncrementalEncoder):
"""
IncrementalEncoder implementation for the JIS X 0213 version of Shift_JIS.
"""
name = "shift-jis-2004"
html5name = None
@lazy_property
def encoding_map():
return data_8bit.encode_sjis04
ascii_exceptions = (0x5C, 0x7E)
class ShiftJis2004IncrementalDecoder(AsciiIncrementalDecoder):
"""
IncrementalDecoder implementation for the JIS X 0213 version of Shift_JIS.
"""
name = "shift-jis-2004"
html5name = None
@lazy_property
def decoding_map():
return data_8bit.decode_sjis04
ascii_exceptions = (0x5C, 0x7E)
dbrange = (129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145,
146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 224, 225, 226,
227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243,
244, 245, 246, 247, 248, 249, 250, 251, 252)
trailrange = tuple(range(64, 126 + 1)) + tuple(range(128, 252 + 1))
register_kuroko_codec(["shift_jis-2004", "shiftjis2004", "sjis_2004", "s_jis_2004",
"shift_jisx0213", "shiftjisx0213", "sjisx0213", "s_jisx0213"],
ShiftJis2004IncrementalEncoder, ShiftJis2004IncrementalDecoder)
class AsciiJohabIncrementalEncoder(AsciiIncrementalEncoder):
"""
IncrementalEncoder implementation for the PC Johab encoding (code page 1361).
"""
name = "johab-ascii"
html5name = None
@lazy_property
def encoding_map():
return data_8bit.encode_johab_ascii
class AsciiJohabIncrementalDecoder(AsciiIncrementalDecoder):
"""
IncrementalDecoder implementation for the PC Johab encoding (code page 1361).
"""
name = "johab-ascii"
html5name = None
@lazy_property
def decoding_map():
return data_8bit.decode_johab_ascii
dbrange = tuple(range(0x84, 0xF9 + 1))
# Trail ranges for hangul and nonhangul are different, but this is their union.
trailrange = tuple(range(0x31, 0x7E + 1)) + tuple(range(0x81, 0xFE + 1))
register_kuroko_codec(["cp1361", "ms1361", "johab", "x-johab", "johab-ascii"],
AsciiJohabIncrementalEncoder, AsciiJohabIncrementalDecoder)
class EbcdicJohabIncrementalEncoder(BaseEbcdicIncrementalEncoder):
"""
IncrementalEncoder implementation for code page 1364, a stateful EBCDIC variant of Johab.
"""
name = "johab-ebcdic"
html5name = None
@lazy_property
def sbcs_encode():
return data_8bit.encode_nbyte_ebcdic
@lazy_property
def dbcshost_encode():
return data_8bit.encode_johab_ebcdic
class EbcdicJohabIncrementalDecoder(BaseEbcdicIncrementalDecoder):
"""
IncrementalDecoder implementation for code page 1364, a stateful EBCDIC variant of Johab.
"""
name = "johab-ebcdic"
html5name = None
@lazy_property
def sbcs_decode():
return data_8bit.decode_nbyte_ebcdic
@lazy_property
def dbcshost_decode():
return data_8bit.decode_johab_ebcdic
register_kuroko_codec(["cp933", "ibm-933", "933", "x-IBM933", "cp1364", "ibm-1364", "x-IBM1364",
"johab-ebcdic"],
EbcdicJohabIncrementalEncoder, EbcdicJohabIncrementalDecoder)
class JisEncodingIncrementalEncoder(Iso2022JpIncrementalEncoder):
"""
IncrementalEncoder implementation for 7-bit stateful Japanese with all features.
This differs from the ISO-2022-JP encoder in that it will:
- Encode forms present in 1978 JIS but simplified by (and absent in) 1983 JIS to 1978 JIS.
- For characters not present in either table, try JIS X 0212, 2000 JIS and 2004 JIS in that order.
- For characters not present in any JIS set, try GB 2312 and Wansung.
- Preserve width of katakana.
"""
name = "jis_encoding"
html5name = None
@lazy_property
def encodes_sbcs():
return [None, None, data_7bit.encode_jis7katakana]
@lazy_property
def encodes_dbcs():
return [None, None, None,
more_dbdata.encode_jis7,
data_7bit.encode_jis78,
data_7bit.encode_jis90p2,
data_7bit.encode_jis00,
data_7bit.encode_jis00p2,
data_7bit.encode_jis04,
data_7bit.encode_gb7,
data_7bit.encode_ksc7]
@lazy_property
def encode_supershift_latin():
return data_7bit.encode_lat1supp
@lazy_property
def encode_supershift_greek():
return data_7bit.encode_greksupp
super_shift = True
escs_onebyte = {0: 0x42, 1: 0x4A, 2: 0x49}
escs_twobyte = {3: 0x42, 4: 0x40, 5: 0x44, 6: 0x4F, 7: 0x50, 8: 0x51, 9: 0x41, 10: 0x43}
attitude = "eager"
class JisEncodingIncrementalDecoder(Iso2022JpIncrementalDecoder):
"""
IncrementalDecoder implementation for 7-bit stateful Japanese.
This is differs from the ISO-2022-JP decoder in that it will:
- Decode 1978 JIS with a separate table, including 1978 JIS, NEC extensions and IBM backports.
- Accept and decode extensions from ISO-2022-JP-2 (and -1), ISO-2022-JP-3 and ISO-2022-JP-2004.
- Not generate an error for immediately concatenated JIS-Kanji→ASCII→JIS-Kanji designations.
- Accept katakana via Shift Out / Shift In.
This is used as the decoder for all other ISO-2022-JP variants besides plain ISO-2022-JP.
"""
name = "jis_encoding"
html5name = None
@lazy_property
def decodes_sbcs():
return [None, None, more_dbdata.decode_jis7katakana]
@lazy_property
def decodes_dbcs():
return [None, None, None,
more_dbdata.decode_jis7,
data_7bit.decode_jis78,
data_7bit.decode_jis90p2,
data_7bit.decode_jis00,
data_7bit.decode_jis00p2,
data_7bit.decode_jis04,
data_7bit.decode_gb7,
data_7bit.decode_ksc7]
@lazy_property
def decode_shiftout():
return more_dbdata.decode_jis7katakana
@lazy_property
def decode_supershift_latin():
return data_7bit.decode_lat1supp
@lazy_property
def decode_supershift_greek():
return data_7bit.decode_greksupp
# 0x48 is not ASCII or JIS-Roman, but SEN 85 02 00 Annex C. It is however misused for either
# ASCII or JIS-Roman in some encoders, so it is a "good idea for software to recognise,
# but not to generate" (—Lunde) it for JIS-Roman when decoding JIS_encoding.
escs_onebyte = {0x42: 0, 0x48: 1, 0x49: 2, 0x4A: 1}
escs_twobyte = {0x40: 4, 0x41: 9, 0x42: 3, 0x43: 10, 0x44: 5, 0x4F: 6, 0x50: 7, 0x51: 8}
two_byte_modes = [3, 4, 5, 6, 7, 8, 9, 10]
new_twobytes = True
shift_out = True
super_shift = True
concat_lenient = True
register_kuroko_codec(["jis_encoding", "csjisencoding", "jis", "jis7"],
JisEncodingIncrementalEncoder, JisEncodingIncrementalDecoder)
class Iso2022Jp1IncrementalEncoder(Iso2022JpIncrementalEncoder):
"""
IncrementalEncoder implementation for 7-bit stateful Japanese with JIS X 0212.
This differs from the ISO-2022-JP encoder in that it will encode to JIS X 0212, and does so
whenever possible (i.e. it will favour it over any web extensions to JIS X 0208).
"""
name = "iso-2022-jp-1"
html5name = None
@lazy_property
def encodes_sbcs():
return [None, None]
@lazy_property
def encodes_dbcs():
# Favour JIS X 0212 over any extensions in the web JIS X 0208 table.
return [None, None, data_7bit.encode_jis90p2, more_dbdata.encode_jis7]
escs_onebyte = {0: 0x42, 1: 0x4A}
escs_twobyte = {3: 0x42, 2: 0x44}
attitude = "eager"
register_kuroko_codec(["iso-2022-jp-1", "iso2022-jp-1", "iso2022jp-1"],
Iso2022Jp1IncrementalEncoder, JisEncodingIncrementalDecoder)
class Iso2022JpExtIncrementalEncoder(Iso2022JpIncrementalEncoder):
"""
IncrementalEncoder implementation for 7-bit stateful Japanese.
This differs from the ISO-2022-JP-1 encoder in that it preserves katakana width.
"""
name = "iso-2022-jp-ext"
html5name = None
@lazy_property
def encodes_sbcs():
return [None, None, data_7bit.encode_jis7katakana]
@lazy_property
def encodes_dbcs():
return [None, None, None, data_7bit.encode_jis90p2, more_dbdata.encode_jis7]
escs_onebyte = {0: 0x42, 1: 0x4A, 2: 0x49}
escs_twobyte = {4: 0x42, 3: 0x44}
attitude = "eager"
register_kuroko_codec(["iso-2022-jp-ext", "iso2022-jp-ext", "iso2022jp-ext"],
Iso2022JpExtIncrementalEncoder, JisEncodingIncrementalDecoder)
class Iso2022Jp2IncrementalEncoder(Iso2022JpIncrementalEncoder):
"""
IncrementalEncoder implementation for 7-bit stateful Japanese with multilingual extensions.
"""
name = "iso-2022-jp-2"
html5name = None
@lazy_property
def encodes_sbcs():
return [None, None]
@lazy_property
def encodes_dbcs():
# Favour JIS X 0212 over any extensions in the web JIS X 0208 table.
return [None, None,
data_7bit.encode_jis90p2,
more_dbdata.encode_jis7,
data_7bit.encode_gb7,
data_7bit.encode_ksc7]
@lazy_property
def encode_supershift_latin():
return data_7bit.encode_lat1supp
@lazy_property
def encode_supershift_greek():
return data_7bit.encode_greksupp
super_shift = True
escs_onebyte = {0: 0x42, 1: 0x4A}
escs_twobyte = {3: 0x42, 2: 0x44, 4: 0x41, 5: 0x43}
attitude = "eager"
register_kuroko_codec(["iso-2022-jp-2", "iso2022-jp-2", "iso2022jp-2", "csISO2022JP2"],
Iso2022Jp2IncrementalEncoder, JisEncodingIncrementalDecoder)
class Iso2022Jp3IncrementalEncoder(Iso2022JpIncrementalEncoder):
"""
IncrementalEncoder implementation for 7-bit stateful Japanese with JIS X 0213-2000.
"""
name = "iso-2022-jp-3"
html5name = None
@lazy_property
def encodes_sbcs():
return [None, None, data_7bit.encode_jis7katakana]
@lazy_property
def encodes_dbcs():
return [None, None, None,
data_7bit.encode_jis7_reduced,
data_7bit.encode_jis00,
data_7bit.encode_jis00p2]
escs_onebyte = {0: 0x42, 1: 0x4A, 2: 0x49}
escs_twobyte = {3: 0x42, 4: 0x4F, 5: 0x50}
attitude = "eager"
register_kuroko_codec(["iso-2022-jp-3", "iso2022-jp-3", "iso2022jp-3"],
Iso2022Jp3IncrementalEncoder, JisEncodingIncrementalDecoder)
class Iso2022Jp2004IncrementalEncoder(Iso2022JpIncrementalEncoder):
"""
IncrementalEncoder implementation for 7-bit stateful Japanese with JIS X 0213-2004.
"""
name = "iso-2022-jp-2004"
html5name = None
@lazy_property
def encodes_sbcs():
return [None, None, data_7bit.encode_jis7katakana]
@lazy_property
def encodes_dbcs():
return [None, None, None,
data_7bit.encode_jis7_reduced,
data_7bit.encode_jis00p2,
data_7bit.encode_jis04]
escs_onebyte = {0: 0x42, 1: 0x4A, 2: 0x49}
escs_twobyte = {3: 0x42, 4: 0x50, 5: 0x51}
attitude = "eager"
register_kuroko_codec(["iso-2022-jp-2004", "iso2022-jp-2004", "iso2022jp-2004"],
Iso2022Jp2004IncrementalEncoder, JisEncodingIncrementalDecoder)
class Utf32IncrementalEncoder(IncrementalEncoder):
"""
IncrementalEncoder implementation for UTF-32 with byte order mark.
"""
name = "utf-32"
html5name = None
@lazy_property
def encoding_map():
return {}
endian = "little"
include_bom = True
# -1: BOM not yet emitted if applicable
# 0: BOM emitted
state = None
def push_word(word, out):
if self.endian == "little":
out.add(bytes([word & 0xFF, (word >> 8) & 0xFF, (word >> 16) & 0xFF, (word >> 24) & 0xFF]))
else if self.endian == "big":
out.add(bytes([(word >> 24) & 0xFF, (word >> 16) & 0xFF, (word >> 8) & 0xFF, word & 0xFF]))
else:
raise ValueError("unexpected endian value: " + repr(self.endian))
def encode(string, final = False):
"""Implements `IncrementalEncoder.encode`"""
let out = ByteCatenator()
let offset = 0
if self.include_bom and self.state == -1:
self.push_word(0xFEFF, out)
self.state = 0
while 1: # offset can be arbitrarily changed by the error handler, so not a for
if offset >= len(string):
return out.getvalue()
let i = string[offset]
if not (0xD800 <= ord(i) and ord(i) < 0xE000):
self.push_word(ord(i), out)
offset += 1
else: # i.e. trying to encode a surrogate "codepoint"
let error = UnicodeEncodeError(self.name, string, offset, offset + 1,
"surrogate codepoint")
let errorret = lookup_error(self.errors)(error)
for i in errorret[0]:
self.push_word(i, out)
offset = errorret[1]
if offset < 0:
offset += len(string)
def reset():
"""Implements `IncrementalEncoder.reset`"""
self.state = -1
def getstate():
"""Implements `IncrementalEncoder.getstate`"""
return self.state
def setstate(state):
"""Implements `IncrementalEncoder.setstate`"""
self.state = state
class Utf32IncrementalDecoder(IncrementalDecoder):
"""
IncrementalDecoder implementation for UTF-32, detected byte order, removing any byte order mark.
"""
name = "utf-32"
html5name = None
force_endian = None # subclass may set to "little" or "big"
# -1: expecting BOM
# 0: LE
# 1: BE
state = None
pending = b""
def decode(data_in, final = False):
"""Implements `IncrementalDecoder.decode`"""
let data = self.pending + data_in
self.pending = b""
let out = StringCatenator()
let offset = 0
let leader = []
while 1: # offset can be arbitrarily changed by the error handler, so not a for
if (offset + 3) >= len(data):
let leader_bytes = []
for i in leader:
if self.state == 1:
leader_bytes.append((i >> 8) & 0xFF)
leader_bytes.append(i & 0xFF)
else:
leader_bytes.append(i & 0xFF)
leader_bytes.append((i >> 8) & 0xFF)
if offset < len(data): # i.e. one to three isolated bytes at the end
leader_bytes.extend(list(data)[offset:])
return self._handle_truncation(out, None, final, data, offset, leader_bytes)
let i
if self.state != 1:
i = data[offset] | (data[offset + 1] << 8) | (data[offset + 2] << 16) | (data[offset + 3] << 24)
else:
i = data[offset + 3] | (data[offset + 2] << 8) | (data[offset + 1] << 16) | (data[offset] << 24)
if self.state == -1:
if self.force_endian == "little":
self.state = 0 # keep BOM if endian specified, per Python.
i = data[offset] | (data[offset + 1] << 8) | (data[offset + 2] << 16) | (data[offset + 3] << 24)
else if self.force_endian == "big":
self.state = 1
i = data[offset + 3] | (data[offset + 2] << 8) | (data[offset + 1] << 16) | (data[offset] << 24)
else if i == 0xFEFF:
self.state = 0
i = None
else if i == 0xFFFE0000:
self.state = 1
i = None
else if i & 0xFFE00000:
# UTF-32's highest eleven bits will never be used, so if they have a value it's
# obviously the other endian.
self.state = 1
i = data[offset + 3] | (data[offset + 2] << 8) | (data[offset + 1] << 16) | (data[offset] << 24)
else if not i & 0xFFFF:
# More likely to the the other endian than the first character in a plane (null,
# a Linear B character, two rare Chinese characters and two PUA characters).
self.state = 1
i = data[offset + 3] | (data[offset + 2] << 8) | (data[offset + 1] << 16) | (data[offset] << 24)
else:
# Default to LE, to be consistent with our (WHATWG-influenced) UTF-16 handling.
# Note that except in the relatively unlikely event of the stream starting with
# the first character in a plane, the previous clause would have detected
# UTF-32BE already though.
self.state = 0
if i == None:
offset += 4
else if not (0xD800 <= i and i < 0xE000) and (i < 0x110000):
out.add(chr(i))
offset += 4
else:
let errorstart = offset - (len(leader) * 2)
let errorend = errorstart + 4
let reason
if i > 0x110000:
reason = "UTF-32 code beyond Unicode"
else:
reason = "surrogate word in UTF-32"
let error = UnicodeDecodeError(self.name, data, errorstart, errorend, reason)
leader = []
let errorret = lookup_error(self.errors)(error)
out.add(errorret[0])
offset = errorret[1]
if offset < 0:
offset += len(data)
def reset():
"""Implements `IncrementalDecoder.reset`"""
self.pending = b""
self.state = -1
def getstate():
"""Implements `IncrementalDecoder.getstate`"""
return (self.pending, self.state)
def setstate(state):
"""Implements `IncrementalDecoder.setstate`"""
self.pending = state[0]
self.state = state[1]
class Utf32BeIncrementalEncoder(Utf32IncrementalEncoder):
"""
IncrementalEncoder implementation for UTF-32, big endian, without a byte order mark.
"""
name = "utf-32be"
html5name = None
endian = "big"
include_bom = False
class Utf32BeIncrementalDecoder(Utf32IncrementalDecoder):
"""
IncrementalDecoder implementation for UTF-32, big endian, without a byte order mark.
"""
name = "utf-32be"
html5name = None
force_endian = "big"
class Utf32LeIncrementalEncoder(Utf32IncrementalEncoder):
"""
IncrementalEncoder implementation for UTF-32, little endian, without a byte order mark.
"""
name = "utf-32le"
html5name = None
endian = "little"
include_bom = False
class Utf32LeIncrementalDecoder(Utf32IncrementalDecoder):
"""
IncrementalDecoder implementation for UTF-32, little endian, without a byte order mark.
"""
name = "utf-32le"
html5name = None
force_endian = "little"
register_kuroko_codec(["utf-32", "utf32", "iso-10646-ucs-4", "ucs-4", "u32"],
Utf32IncrementalEncoder, Utf32IncrementalDecoder)
register_kuroko_codec(["utf-32le", "utf-32-le"],
Utf32LeIncrementalEncoder, Utf32LeIncrementalDecoder)
register_kuroko_codec(["utf-32be", "utf-32-be"],
Utf32BeIncrementalEncoder, Utf32BeIncrementalDecoder)
class HzIncrementalEncoder(IncrementalEncoder):
"""
IncrementalEncoder implementation for HZ-GB-2312 (Usenet simplified Chinese).
This is an old scheme for embedding GB 2312 data into a pure ASCII stream.
"""
name = "hz-gb-2312"
html5name = None
def ensure_state_number(state, out):
# Limit lines to 76 bytes for consistency with QuoPri.
if self.linelength >= 73 and self.state == 1:
out.add(b"~}~\n")
self.state = 0
self.linelength = 0
else if self.linelength >= 75 and self.state == 0:
out.add(b"~\n")
self.linelength = 0
#
if self.state == state:
else if state == 0:
out.add(b"~}")
self.linelength += 2
else if state == 1:
out.add(b"~{")
self.linelength += 2
else:
raise ValueError("set to invalid state: " + repr(state))
self.state = state
def encode(string, final = False):
"""Implements `IncrementalEncoder.encode`"""
let out = ByteCatenator()
let offset = 0
while 1: # offset can be arbitrarily changed by the error handler, so not a for
if offset >= len(string):
if final:
self.ensure_state_number(0, out)
return out.getvalue()
let i = string[offset]
if ord(i) < 0x80:
self.ensure_state_number(0, out)
if i == "~":
out.add(b"~") # i.e. a second one
self.linelength += 1
out.add(bytes([ord(i)]))
self.linelength += 1
offset += 1
else if ord(i) in data_7bit.encode_gb7:
self.ensure_state_number(1, out)
out.add(bytes(data_7bit.encode_gb7[ord(i)]))
self.linelength += 2
offset += 1
else:
let error = UnicodeEncodeError(self.name, string, offset, offset + 1,
"character not supported by target encoding")
let errorret = lookup_error(self.errors)(error)
self.ensure_state_number(0, out)
out.add(errorret[0])
self.linelength += len(errorret[0])
offset = errorret[1]
if offset < 0:
offset += len(string)
def reset():
"""Implements `IncrementalEncoder.reset`"""
self.state = 0
self.linelength = 0
def getstate():
"""Implements `IncrementalEncoder.getstate`"""
return (self.state, self.linelength)
def setstate(state):
"""Implements `IncrementalEncoder.setstate`"""
self.state = state[0]
self.linelength = state[1]
class HzIncrementalDecoder(IncrementalDecoder):
"""
IncrementalDecoder implementation for HZ-GB-2312 (Usenet simplified Chinese).
This is an old scheme for embedding GB 2312 data into a pure ASCII stream.
"""
name = "hz-gb-2312"
html5name = None
def decode(data_in, final = False):
"""Implements `IncrementalDecoder.decode`"""
let data = self.pending + data_in
self.pending = b""
let out = StringCatenator()
let offset = 0
let leader = []
let in_esc = False
while 1: # offset can be arbitrarily changed by the error handler, so not a for
if offset >= len(data):
return self._handle_truncation(out, None, final, data, offset, leader)
let i = data[offset]
if i == 0x7E and len(leader) == 0:
in_esc = True
leader.append(i)
offset += 1
else if in_esc and (self.state_set == 0) and len(leader) == 1 and i in (
0x0A, 0x0D, 0x7B, 0x7E):
in_esc = False
leader = []
if i == 0x0D:
# Be lenient about decoding soft line breaks with CR or CRLF rather than LF
# (they might be changed to CRLF over e.g. RFC822; see also the corresponding
# considerations in QuoPri)
if len(data) > (offset + 1) and data[offset + 1] == 0x0A: offset += 1
else if i == 0x0A: # Do nothing
else if i == 0x7B: self.state_set = 1
else: out.add(chr(i))
offset += 1
else if in_esc and (self.state_set == 1) and len(leader) == 1 and i == 0x7D:
in_esc = False
leader = []
self.state_set = 0
offset += 1
else if not in_esc and (self.state_set == 0) and (i < 0x7E or i == 0x7F):
out.add(chr(i))
offset += 1
else if not in_esc and (self.state_set == 1) and (len(leader) == 0) and (0x21 <= i and i <= 0x7E):
leader.append(i)
offset += 1
else if not in_esc and (self.state_set == 1) and (leader[0], i) in data_7bit.decode_gb7:
let decoded = data_7bit.decode_gb7[(leader[0], i)]
if isinstance(decoded, tuple):
for individ in decoded:
out.add(chr(individ))
else:
out.add(chr(decoded))
offset += 1
leader = []
else:
let errorstart = offset - len(leader)
let errorend = offset + 1
let error = UnicodeDecodeError(self.name, data, errorstart, errorend,
"invalid sequence")
in_esc = False
leader = []
let errorret = lookup_error(self.errors)(error)
out.add(errorret[0])
offset = errorret[1]
if offset < 0:
offset += len(data)
def reset():
"""Implements `IncrementalDecoder.reset`"""
self.pending = b""
self.state_set = 0
def getstate():
"""Implements `IncrementalDecoder.getstate`"""
return (self.pending, self.state_set)
def setstate(state):
"""Implements `IncrementalDecoder.setstate`"""
self.pending = state[0]
self.state_set = state[1]
register_kuroko_codec(["hz-gb-2312", "hz", "hzgb", "hz_gb"],
HzIncrementalEncoder, HzIncrementalDecoder)
class JapaneseAutodetectIncrementalDecoder(IncrementalDecoder):
"""
IncrementalDecoder implementation for the automatic "Japanese" character encoding option.
This will attempt to interpret the stream as the web versions of ISO-2022-JP, Shift_JIS and
EUC-JP, as well as UTF-8, at once, and start returning the data once it has narrowed it down
to one. If it fails to narrow it down conclusively, it will wait until the final call before
making an educated guess. If it doesn't seem to be any of them, it will raise `ValueError`.
"""
name = "japanese"
html5name = None
# State flags:
# 0x01: eliminated ISO-2022-JP
# 0x02: eliminated Shift JIS
# 0x04: eliminated EUC-JP
# 0x08: eliminated UTF-8
state = None
def __init__(errors):
self.errors = errors
self.jis = lookup("iso-2022-jp").incrementaldecoder("strict")
self.sjis = lookup("windows-31j").incrementaldecoder("strict")
self.ujis = lookup("euc-jp").incrementaldecoder("strict")
self.utf = lookup("utf-8-sig").incrementaldecoder("strict")
self.reset()
def decode(data, final = False):
"""Implements `IncrementalDecoder.decode`"""
if not (self.state & 0x01):
try:
self.pendingjis.add(self.jis.decode(data, final))
except UnicodeDecodeError:
self.state |= 0x01
if self.jis.state_set != 0:
self.state |= 0x0E
#
if not (self.state & 0x02):
try:
let further_sjis = self.sjis.decode(data, final)
self.pendingsjis.add(further_sjis)
if "\[$B" in further_sjis and not (self.state & 0x01):
self.state |= 0x02
except UnicodeDecodeError:
self.state |= 0x02
#
if not (self.state & 0x04):
try:
let further_ujis = self.ujis.decode(data, final)
self.pendingujis.add(further_ujis)
if "\[$B" in further_ujis and not (self.state & 0x01):
self.state |= 0x04
except UnicodeDecodeError:
self.state |= 0x04
#
if not (self.state & 0x08):
try:
let further_utf = self.utf.decode(data, final)
self.pendingutf.add(further_utf)
if "\[$B" in further_utf and not (self.state & 0x01):
self.state |= 0x08
except UnicodeDecodeError:
self.state |= 0x08
#
let use_encoding = None
if self.state == (0x01 | 0x02 | 0x04 | 0x08):
raise ValueError("does not appear to be ISO-2022-JP, Shift JIS, EUC-JP or UTF-8")
else if self.state == (0x02 | 0x04 | 0x08):
use_encoding = "jis"
else if self.state == (0x01 | 0x04 | 0x08):
use_encoding = "sjis"
else if self.state == (0x01 | 0x02 | 0x08):
use_encoding = "ujis"
else if self.state == (0x01 | 0x02 | 0x04):
use_encoding = "utf"
else if final:
# Give priority to easier-to-eliminate encodings which haven't been eliminated.
if not (self.state & 0x01):
# Anything not 7-bit clean would be enough to eliminate ISO-2022-JP.
use_encoding = "jis"
else if not (self.state & 0x08):
# With its non-overlapping single/lead/trail ranges, UTF-8 is easy to eliminate.
use_encoding = "utf"
else if not (self.state & 0x02):
# Shift JIS uses many 0x809F bytes; our EUC-JP codec rejects most of them (though
# some may accept them all as control codes, most don't), making it relatively
# easy to eliminate EUC-JP when given Shift JIS data.
use_encoding = "ujis"
else:
raise RuntimeError("this case should not be reachable")
#
if use_encoding == "jis":
let ret = self.pendingjis.getvalue()
self.pendingjis = StringCatenator()
return ret
else if use_encoding == "sjis":
let ret = self.pendingsjis.getvalue()
self.pendingsjis = StringCatenator()
return ret
else if use_encoding == "ujis":
let ret = self.pendingujis.getvalue()
self.pendingujis = StringCatenator()
return ret
else if use_encoding == "utf":
let ret = self.pendingutf.getvalue()
self.pendingutf = StringCatenator()
return ret
return ""
def reset():
"""Implements `IncrementalDecoder.reset`"""
self.state = 0
self.pending = b""
self.jis.reset()
self.pendingjis = StringCatenator()
self.sjis.reset()
self.pendingsjis = StringCatenator()
self.ujis.reset()
self.pendingujis = StringCatenator()
self.utf.reset()
self.pendingutf = StringCatenator()
def getstate():
"""Implements `IncrementalDecoder.getstate`"""
return (self.jis.getstate(), self.pendingjis.getvalue(),
self.sjis.getstate(), self.pendingsjis.getvalue(),
self.ujis.getstate(), self.pendingujis.getvalue(),
self.utf.getstate(), self.pendingutf.getvalue(),
self.state)
def setstate(state):
"""Implements `IncrementalDecoder.setstate`"""
self.jis.setstate(state[0])
self.pendingjis = StringCatenator()
self.pendingjis.add(state[1])
self.sjis.setstate(state[2])
self.pendingsjis = StringCatenator()
self.pendingsjis.add(state[3])
self.ujis.setstate(state[4])
self.pendingujis = StringCatenator()
self.pendingujis.add(state[5])
self.utf.setstate(state[6])
self.pendingutf = StringCatenator()
self.pendingutf.add(state[7])
self.state = state[8]
# If data is so insufficiently tagged that you have to autodetect its encoding, you probably want
# a BOM on any UTF-8 you send back:
register_kuroko_codec(["japanese"], Utf8SigIncrementalEncoder,
JapaneseAutodetectIncrementalDecoder)
class Iso2022NonJpIncrementalEncoder(IncrementalEncoder):
"""
IncrementalEncoder subclass, base class for ISO-2022-KR and ISO-2022-CN. Not used directly.
"""
name = None
html5name = None
encodes = []
escs_shift = {}
escs_super = {}
escs_super3 = {}
def ensure_shift_state(state, out):
if self.shift == state:
else if state == False: out.add(b"\x0F")
else: out.add(b"\x0E")
self.shift = state
def ensure_shift_designation(state, out):
if self.shift_desig == state: return
out.add(b"\[$)")
out.add(bytes([self.escs_shift[state]]))
self.shift_desig = state
def ensure_super_designation(state, out):
if self.super_desig == state: return
out.add(b"\[$*")
out.add(bytes([self.escs_super[state]]))
self.super_desig = state
def ensure_super3_designation(state, out):
if self.super3_desig == state: return
out.add(b"\[$+")
out.add(bytes([self.escs_super3[state]]))
self.super3_desig = state
def run_prelude(out):
def encode(string, final = False):
"""Implements `IncrementalEncoder.encode`"""
let out = ByteCatenator()
let offset = 0
if self.shift_desig == None and self.super_desig == None and self.super3_desig == None:
self.run_prelude(out)
while 1: # offset can be arbitrarily changed by the error handler, so not a for
if offset >= len(string):
if final:
self.ensure_shift_state(False, out)
return out.getvalue()
let i = string[offset]
let is_ascii = ord(i) < 0x80 and i not in ("\x0E\x0F\[")
let is_shift = self.shift_desig != None and ord(i) in self.encodes[self.shift_desig]
let is_super = self.super_desig != None and ord(i) in self.encodes[self.super_desig]
let is_super3 = self.super3_desig != None and ord(i) in self.encodes[self.super3_desig]
if is_ascii:
self.ensure_shift_state(False, out)
out.add(bytes([ord(i)]))
offset += 1
continue
if not is_shift:
for desig in self.escs_shift.keys():
if ord(i) in self.encodes[desig]:
self.ensure_shift_designation(desig, out)
is_shift = True
break
if is_shift:
self.ensure_shift_state(True, out)
out.add(bytes(self.encodes[self.shift_desig][ord(i)]))
offset += 1
continue
if not is_super:
for desig in self.escs_super.keys():
if ord(i) in self.encodes[desig]:
self.ensure_super_designation(desig, out)
is_super = True
break
if is_super:
out.add(b"\x1B\x4E")
out.add(bytes(self.encodes[self.super_desig][ord(i)]))
offset += 1
continue
if not is_super3:
for desig in self.escs_super3.keys():
if ord(i) in self.encodes[desig]:
self.ensure_super3_designation(desig, out)
is_super3 = True
break
if is_super3:
out.add(b"\x1B\x4F")
out.add(bytes(self.encodes[self.super3_desig][ord(i)]))
offset += 1
continue
let error = UnicodeEncodeError(self.name, string, offset, offset + 1,
"character not supported by target encoding")
let errorret = lookup_error(self.errors)(error)
self.ensure_shift_state(False, out)
out.add(errorret[0])
offset = errorret[1]
if offset < 0:
offset += len(string)
def reset():
"""Implements `IncrementalEncoder.reset`"""
self.shift = False
self.shift_desig = None
self.super_desig = None
self.super3_desig = None
def getstate():
"""Implements `IncrementalEncoder.getstate`"""
return (self.shift, self.shift_desig, self.super_desig, self.super3_desig)
def setstate(state):
"""Implements `IncrementalEncoder.setstate`"""
self.shift = state[0]
self.shift_desig = state[1]
self.super_desig = state[2]
self.super3_desig = state[2]
class Iso2022NonJpIncrementalDecoder(IncrementalDecoder):
"""
IncrementalDecoder subclass, base class for ISO-2022-KR and ISO-2022-CN. Not used directly.
"""
name = None
html5name = None
decodes = []
escs_shift = {}
escs_super = {}
escs_super3 = {}
def decode(data_in, final = False):
"""Implements `IncrementalDecoder.decode`"""
let data = self.pending + data_in
self.pending = b""
let out = StringCatenator()
let offset = 0
let leader = []
let in_esc = False
while 1: # offset can be arbitrarily changed by the error handler, so not a for
if offset >= len(data):
return self._handle_truncation(out, None, final, data, offset, leader)
let i = data[offset]
if i == 0x1B and len(leader) == 0:
in_esc = True
leader.append(i)
offset += 1
continue
if i == 0x0E and len(leader) == 0 and not self.shift:
self.shift = True
offset += 1
continue
if i == 0x0F and len(leader) == 0 and self.shift:
self.shift = False
offset += 1
continue
if in_esc:
if len(leader) == 1 and i in (0x24, 0x4E, 0x4F):
leader.append(i)
offset += 1
continue
if len(leader) == 2:
if leader[1] == 0x24 and i in (0x29, 0x2A, 0x2B):
leader.append(i)
offset += 1
continue
if leader[1] in (0x4E, 0x4F) and (0x21 <= i and i <= 0x7E):
leader.append(i)
offset += 1
continue
if len(leader) == 3:
if leader[1] in (0x4E, 0x4F) and (0x21 <= i and i <= 0x7E):
let desig = self.super_desig if leader[1] == 0x4E else self.super3_desig
if desig != None and (leader[2], i) in self.decodes[desig]:
let ucs = self.decodes[desig][(leader[2], i)]
if isinstance(ucs, tuple):
for individ in ucs:
out.add(chr(individ))
else:
out.add(chr(ucs))
leader = []
in_esc = False
offset += 1
continue
if leader[1] == 0x24:
if leader[2] == 0x29: # to G1 (shift)
if i in self.escs_shift:
self.shift_desig = self.escs_shift[i]
in_esc = False
leader = []
offset += 1
continue
if leader[2] == 0x2A: # to G2 (super shift 2)
if i in self.escs_super:
self.super_desig = self.escs_super[i]
in_esc = False
leader = []
offset += 1
continue
if leader[2] == 0x2B: # to G3 (super shift 3)
if i in self.escs_super3:
self.super3_desig = self.escs_super3[i]
in_esc = False
leader = []
offset += 1
continue
else:
if self.shift:
if len(leader) == 0 and (0x21 <= i and i <= 0x7E):
leader.append(i)
offset += 1
continue
if len(leader) == 1 and (0x21 <= i and i <= 0x7E):
if self.shift_desig != None and (leader[0], i) in self.decodes[self.shift_desig]:
let ucs = self.decodes[self.shift_desig][(leader[0], i)]
if isinstance(ucs, tuple):
for individ in ucs:
out.add(chr(individ))
else:
out.add(chr(ucs))
leader = []
in_esc = False
offset += 1
continue
else if i < 0x80:
out.add(chr(i))
offset += 1
continue
let errorstart = offset - len(leader)
let errorend
if self.shift and i != 0x1B:
errorend = errorstart + 2
else:
errorend = errorstart + 1
let error = UnicodeDecodeError(self.name, data, errorstart, errorend,
"invalid sequence")
in_esc = False
leader = []
let errorret = lookup_error(self.errors)(error)
out.add(errorret[0])
offset = errorret[1]
if offset < 0:
offset += len(data)
def reset():
"""Implements `IncrementalDecoder.reset`"""
self.pending = b""
self.shift = False
self.shift_desig = None
self.super_desig = None
self.super3_desig = None
def getstate():
"""Implements `IncrementalDecoder.getstate`"""
return (self.pending, self.shift, self.shift_desig, self.super_desig, self.super3_desig)
def setstate(state):
"""Implements `IncrementalDecoder.setstate`"""
self.pending = state[0]
self.shift = state[1]
self.shift_desig = state[2]
self.super_desig = state[3]
self.super3_desig = state[4]
class Iso2022KrIncrementalEncoder(Iso2022NonJpIncrementalEncoder):
"""
IncrementalEncoder implementation for ISO-2022-KR (7-bit stateful Korean, South).
"""
name = "iso-2022-kr"
html5name = None
@lazy_property
def encodes():
return [data_7bit.encode_ksc7]
escs_shift = {0: 0x43}
def run_prelude(out):
# Per RFC 1557, the designation has to be at the "beginning of a line", hence we're doing
# it at the start of the stream. This is contra Python, which just emits it before the
# first SO, even if that's in the middle of a line.
self.ensure_shift_designation(0, out)
class Iso2022KrIncrementalDecoder(Iso2022NonJpIncrementalDecoder):
"""
IncrementalDecoder implementation for ISO-2022-KR (7-bit stateful Korean, South).
"""
name = "iso-2022-kr"
html5name = None
@lazy_property
def decodes():
return [data_7bit.decode_ksc7]
escs_shift = {0x43: 0}
register_kuroko_codec(["iso-2022-kr", "iso2022-kr", "iso2022kr", "csiso2022kr"],
Iso2022KrIncrementalEncoder, Iso2022KrIncrementalDecoder)
class Iso2022CnIncrementalEncoder(Iso2022NonJpIncrementalEncoder):
"""
IncrementalEncoder implementation for ISO-2022-CN (7-bit stateful Chinese).
ISO-2022-CN-Ext is not included (it requires a much larger set of tables and is very rare).
"""
name = "iso-2022-cn"
html5name = None
@lazy_property
def encodes():
return [data_7bit.encode_gb7, data_7bit.encode_csic1, data_7bit.encode_csic2]
escs_shift = {0: 0x41, 1: 0x47}
escs_super = {2: 0x48}
class Iso2022CnIncrementalDecoder(Iso2022NonJpIncrementalDecoder):
"""
IncrementalDecoder implementation for ISO-2022-CN (7-bit stateful Chinese).
ISO-2022-CN-Ext is not included (it requires a much larger set of tables and is very rare).
"""
name = "iso-2022-cn"
html5name = None
@lazy_property
def decodes():
return [data_7bit.decode_gb7, data_7bit.decode_csic1, data_7bit.decode_csic2]
escs_shift = {0x41: 0, 0x47: 1}
escs_super = {0x48: 2}
register_kuroko_codec(["iso-2022-cn", "iso2022-cn", "iso2022cn", "csiso2022cn"],
Iso2022CnIncrementalEncoder, Iso2022CnIncrementalDecoder)