Commit 723b205

Fix errors in JSON array and JSON object parsing
Remove unused code flagged by ruff
1 parent b58a334 commit 723b205

2 files changed: +141 −198 lines

pymysqlreplication/event.py

Lines changed: 2 additions & 54 deletions
@@ -8,6 +8,7 @@
 
 from pymysqlreplication.constants.STATUS_VAR_KEY import *
 from pymysqlreplication.exceptions import StatusVariableMismatch
+from pymysqlreplication.util.bytes import parse_decimal_from_bytes
 from typing import Union, Optional
 
 
@@ -783,9 +784,7 @@ def _read_decimal(self, buffer: bytes) -> decimal.Decimal:
         self.precision = self.temp_value_buffer[0]
         self.decimals = self.temp_value_buffer[1]
         raw_decimal = self.temp_value_buffer[2:]
-        return self._parse_decimal_from_bytes(
-            raw_decimal, self.precision, self.decimals
-        )
+        return parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals)
 
     def _read_default(self) -> bytes:
         """
@@ -794,57 +793,6 @@ def _read_default(self) -> bytes:
         """
         return self.packet.read(self.value_len)
 
-    @staticmethod
-    def _parse_decimal_from_bytes(
-        raw_decimal: bytes, precision: int, decimals: int
-    ) -> decimal.Decimal:
-        """
-        Parse decimal from bytes.
-        """
-        digits_per_integer = 9
-        compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4]
-        integral = precision - decimals
-
-        uncomp_integral, comp_integral = divmod(integral, digits_per_integer)
-        uncomp_fractional, comp_fractional = divmod(decimals, digits_per_integer)
-
-        res = "-" if not raw_decimal[0] & 0x80 else ""
-        mask = -1 if res == "-" else 0
-        raw_decimal = bytearray([raw_decimal[0] ^ 0x80]) + raw_decimal[1:]
-
-        def decode_decimal_decompress_value(comp_indx, data, mask):
-            size = compressed_bytes[comp_indx]
-            if size > 0:
-                databuff = bytearray(data[:size])
-                for i in range(size):
-                    databuff[i] = (databuff[i] ^ mask) & 0xFF
-                return size, int.from_bytes(databuff, byteorder="big")
-            return 0, 0
-
-        pointer, value = decode_decimal_decompress_value(
-            comp_integral, raw_decimal, mask
-        )
-        res += str(value)
-
-        for _ in range(uncomp_integral):
-            value = struct.unpack(">i", raw_decimal[pointer : pointer + 4])[0] ^ mask
-            res += "%09d" % value
-            pointer += 4
-
-        res += "."
-
-        for _ in range(uncomp_fractional):
-            value = struct.unpack(">i", raw_decimal[pointer : pointer + 4])[0] ^ mask
-            res += "%09d" % value
-            pointer += 4
-
-        size, value = decode_decimal_decompress_value(
-            comp_fractional, raw_decimal[pointer:], mask
-        )
-        if size > 0:
-            res += "%0*d" % (comp_fractional, value)
-        return decimal.Decimal(res)
-
     def _dump(self) -> None:
         super(UserVarEvent, self)._dump()
         print("User variable name: %s" % self.name)

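The decimal parser itself is unchanged by this commit; it moves from a UserVarEvent static method into pymysqlreplication.util.bytes (the matching name and call site above suggest the implementation was copied verbatim). A minimal sketch of the relocated helper; the packed byte string is a hand-built illustration, not taken from the commit:

    import decimal
    from pymysqlreplication.util.bytes import parse_decimal_from_bytes

    # DECIMAL(5, 2) value +123.45 in MySQL's packed format: sign bit set in
    # the first byte, two bytes for the three integral digits (0x007B = 123),
    # one byte for the two fractional digits (0x2D = 45).
    raw = b"\x80\x7b\x2d"
    assert parse_decimal_from_bytes(raw, 5, 2) == decimal.Decimal("123.45")
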
pymysqlreplication/packet.py

Lines changed: 139 additions & 144 deletions
@@ -1,8 +1,8 @@
 # -*- coding: utf-8 -*-
 
-import struct
-
 from pymysqlreplication import constants, event, row_event
+from pymysqlreplication.constants import FIELD_TYPE
+from pymysqlreplication.util.bytes import *
 
 # Constants from PyMYSQL source code
 NULL_COLUMN = 251
@@ -15,7 +15,6 @@
 UNSIGNED_INT24_LENGTH = 3
 UNSIGNED_INT64_LENGTH = 8
 
-
 JSONB_TYPE_SMALL_OBJECT = 0x0
 JSONB_TYPE_LARGE_OBJECT = 0x1
 JSONB_TYPE_SMALL_ARRAY = 0x2
@@ -35,18 +34,144 @@
 JSONB_LITERAL_TRUE = 0x1
 JSONB_LITERAL_FALSE = 0x2
 
+JSONB_SMALL_OFFSET_SIZE = 2
+JSONB_LARGE_OFFSET_SIZE = 4
+JSONB_KEY_ENTRY_SIZE_SMALL = 2 + JSONB_SMALL_OFFSET_SIZE
+JSONB_KEY_ENTRY_SIZE_LARGE = 2 + JSONB_LARGE_OFFSET_SIZE
+JSONB_VALUE_ENTRY_SIZE_SMALL = 1 + JSONB_SMALL_OFFSET_SIZE
+JSONB_VALUE_ENTRY_SIZE_LARGE = 1 + JSONB_LARGE_OFFSET_SIZE
+
+
+def is_json_inline_value(type: bytes, is_small: bool) -> bool:
+    if type in [JSONB_TYPE_UINT16, JSONB_TYPE_INT16, JSONB_TYPE_LITERAL]:
+        return True
+    elif type in [JSONB_TYPE_INT32, JSONB_TYPE_UINT32]:
+        return not is_small
+    return False
+
+
+def parse_json(type: bytes, data: bytes):
+    if type == JSONB_TYPE_SMALL_OBJECT:
+        v = parse_json_object_or_array(data, True, True)
+    elif type == JSONB_TYPE_LARGE_OBJECT:
+        v = parse_json_object_or_array(data, False, True)
+    elif type == JSONB_TYPE_SMALL_ARRAY:
+        v = parse_json_object_or_array(data, True, False)
+    elif type == JSONB_TYPE_LARGE_ARRAY:
+        v = parse_json_object_or_array(data, False, False)
+    elif type == JSONB_TYPE_LITERAL:
+        v = parse_literal(data)
+    elif type == JSONB_TYPE_INT16:
+        v = parse_int16(data)
+    elif type == JSONB_TYPE_UINT16:
+        v = parse_uint16(data)
+    elif type == JSONB_TYPE_INT32:
+        v = parse_int32(data)
+    elif type == JSONB_TYPE_UINT32:
+        v = parse_uint32(data)
+    elif type == JSONB_TYPE_INT64:
+        v = parse_int64(data)
+    elif type == JSONB_TYPE_UINT64:
+        v = parse_uint64(data)
+    elif type == JSONB_TYPE_DOUBLE:
+        v = parse_double(data)
+    elif type == JSONB_TYPE_STRING:
+        length, n = decode_variable_length(data)
+        v = parse_string(n, length, data)
+    elif type == JSONB_TYPE_OPAQUE:
+        v = parse_opaque(data)
+    else:
+        raise ValueError("Json type %d is not handled" % type)
+    return v
+
+
+def parse_json_object_or_array(bytes, is_small, is_object):
+    offset_size = JSONB_SMALL_OFFSET_SIZE if is_small else JSONB_LARGE_OFFSET_SIZE
+    count = decode_count(bytes, is_small)
+    size = decode_count(bytes[offset_size:], is_small)
+    if is_small:
+        key_entry_size = JSONB_KEY_ENTRY_SIZE_SMALL
+        value_entry_size = JSONB_VALUE_ENTRY_SIZE_SMALL
+    else:
+        key_entry_size = JSONB_KEY_ENTRY_SIZE_LARGE
+        value_entry_size = JSONB_VALUE_ENTRY_SIZE_LARGE
+    if is_data_short(bytes, size):
+        raise ValueError(
+            "Before MySQL 5.7.22, json type generated column may have invalid value"
+        )
+
+    header_size = 2 * offset_size + count * value_entry_size
 
-def read_offset_or_inline(packet, large):
-    t = packet.read_uint8()
+    if is_object:
+        header_size += count * key_entry_size
 
-    if t in (JSONB_TYPE_LITERAL, JSONB_TYPE_INT16, JSONB_TYPE_UINT16):
-        return (t, None, packet.read_binary_json_type_inlined(t, large))
-    if large and t in (JSONB_TYPE_INT32, JSONB_TYPE_UINT32):
-        return (t, None, packet.read_binary_json_type_inlined(t, large))
+    if header_size > size:
+        raise ValueError("header size > size")
 
-    if large:
-        return (t, packet.read_uint32(), None)
-    return (t, packet.read_uint16(), None)
+    keys = []
+    if is_object:
+        keys = []
+        for i in range(count):
+            entry_offset = 2 * offset_size + key_entry_size * i
+            key_offset = decode_count(bytes[entry_offset:], is_small)
+            key_length = decode_uint(bytes[entry_offset + offset_size :])
+            keys.append(bytes[key_offset : key_offset + key_length])
+
+    values = {}
+    for i in range(count):
+        entry_offset = 2 * offset_size + value_entry_size * i
+        if is_object:
+            entry_offset += key_entry_size * count
+        json_type = bytes[entry_offset]
+        if is_json_inline_value(json_type, is_small):
+            values[i] = parse_json(
+                json_type, bytes[entry_offset + 1 : entry_offset + value_entry_size]
+            )
+            continue
+        value_offset = decode_count(bytes[entry_offset + 1 :], is_small)
+        if is_data_short(bytes, value_offset):
+            return None
+        values[i] = parse_json(json_type, bytes[value_offset:])
+    if not is_object:
+        return values
+    out = {}
+    for i in range(count):
+        out[keys[i]] = values[i]
+    return out
+
+
+def parse_literal(data: bytes):
+    json_type = data[0]
+    if json_type == JSONB_LITERAL_NULL:
+        return None
+    elif json_type == JSONB_LITERAL_TRUE:
+        return True
+    elif json_type == JSONB_LITERAL_FALSE:
+        return False
+
+    raise ValueError("NOT LITERAL TYPE")
+
+
+def parse_opaque(data: bytes):
+    if is_data_short(data, 1):
+        return None
+    type_ = data[0]
+    data = data[1:]
+
+    length, n = decode_variable_length(data)
+    data = data[n : n + length]
+
+    if type_ in [FIELD_TYPE.NEWDECIMAL, FIELD_TYPE.DECIMAL]:
+        return decode_decimal(data)
+    elif type_ in [FIELD_TYPE.TIME, FIELD_TYPE.TIME2]:
+        # TO-DO: parse decode_TIME
+        decode_decimal(data)
+        return None
+    elif type_ in [FIELD_TYPE.DATE, FIELD_TYPE.DATETIME, FIELD_TYPE.DATETIME2]:
+        # TO-DO: parse decode_DATETIME
+        return decode_datetime(data)
+    else:
+        return data.decode(errors="ignore")
 
 
 class BinLogPacketWrapper(object):
@@ -377,138 +502,8 @@ def read_binary_json(self, size):
         if length == 0:
             # handle NULL value
             return None
-        until_read = self.read_bytes + length
-        t = self.read_uint8()
-        value = self.read_binary_json_type(t, length)
-        if self.read_bytes < until_read:
-            self.read(until_read - self.read_bytes)
-        return value
-
-    def read_binary_json_type(self, t, length):
-        large = t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY)
-        if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT):
-            return self.read_binary_json_object(length - 1, large)
-        elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY):
-            return self.read_binary_json_array(length - 1, large)
-        elif t in (JSONB_TYPE_STRING,):
-            return self.read_variable_length_string()
-        elif t in (JSONB_TYPE_LITERAL,):
-            value = self.read_uint8()
-            if value == JSONB_LITERAL_NULL:
-                return None
-            elif value == JSONB_LITERAL_TRUE:
-                return True
-            elif value == JSONB_LITERAL_FALSE:
-                return False
-        elif t == JSONB_TYPE_INT16:
-            return self.read_int16()
-        elif t == JSONB_TYPE_UINT16:
-            return self.read_uint16()
-        elif t in (JSONB_TYPE_DOUBLE,):
-            return struct.unpack("<d", self.read(8))[0]
-        elif t == JSONB_TYPE_INT32:
-            return self.read_int32()
-        elif t == JSONB_TYPE_UINT32:
-            return self.read_uint32()
-        elif t == JSONB_TYPE_INT64:
-            return self.read_int64()
-        elif t == JSONB_TYPE_UINT64:
-            return self.read_uint64()
-
-        raise ValueError("Json type %d is not handled" % t)
-
-    def read_binary_json_type_inlined(self, t, large):
-        if t == JSONB_TYPE_LITERAL:
-            value = self.read_uint32() if large else self.read_uint16()
-            if value == JSONB_LITERAL_NULL:
-                return None
-            elif value == JSONB_LITERAL_TRUE:
-                return True
-            elif value == JSONB_LITERAL_FALSE:
-                return False
-        elif t == JSONB_TYPE_INT16:
-            return self.read_int32() if large else self.read_int16()
-        elif t == JSONB_TYPE_UINT16:
-            return self.read_uint32() if large else self.read_uint16()
-        elif t == JSONB_TYPE_INT32:
-            return self.read_int32()
-        elif t == JSONB_TYPE_UINT32:
-            return self.read_uint32()
-
-        raise ValueError("Json type %d is not handled" % t)
-
-    def read_binary_json_object(self, length, large):
-        offset = self.read_bytes
-        if large:
-            elements = self.read_uint32()
-            size = self.read_uint32()
-        else:
-            elements = self.read_uint16()
-            size = self.read_uint16()
-
-        if size > length:
-            raise ValueError("Json length is larger than packet length")
-
-        if large:
-            key_offset_lengths = [
-                (
-                    self.read_uint32(),  # offset (we don't actually need that)
-                    self.read_uint16(),  # size of the key
-                )
-                for _ in range(elements)
-            ]
-        else:
-            key_offset_lengths = [
-                (
-                    self.read_uint16(),  # offset (we don't actually need that)
-                    self.read_uint16(),  # size of key
-                )
-                for _ in range(elements)
-            ]
-
-        value_type_inlined_lengths = [
-            read_offset_or_inline(self, large) for _ in range(elements)
-        ]
-
-        keys = []
-        for i in range(elements):
-            skip_bytes = key_offset_lengths[i][0] + offset - self.read_bytes
-            if skip_bytes != 0:
-                self.read(skip_bytes)
-            keys.append(self.read(key_offset_lengths[i][1]))
-
-        out = {}
-        for i in range(elements):
-            if value_type_inlined_lengths[i][1] is None:
-                data = value_type_inlined_lengths[i][2]
-            else:
-                t = value_type_inlined_lengths[i][0]
-                data = self.read_binary_json_type(t, length)
-            out[keys[i]] = data
-
-        return out
-
-    def read_binary_json_array(self, length, large):
-        if large:
-            elements = self.read_uint32()
-            size = self.read_uint32()
-        else:
-            elements = self.read_uint16()
-            size = self.read_uint16()
-
-        if size > length:
-            raise ValueError("Json length is larger than packet length")
-
-        values_type_offset_inline = [
-            read_offset_or_inline(self, large) for _ in range(elements)
-        ]
-
-        def _read(x):
-            if x[1] is None:
-                return x[2]
-            return self.read_binary_json_type(x[0], length)
-
-        return [_read(x) for x in values_type_offset_inline]
+        data = self.read(length)
+        return parse_json(data[0], data[1:])
 
     def read_string(self):
         """Read a 'Length Coded String' from the data buffer.

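After this change, read_binary_json consumes the whole JSONB payload from the packet and delegates to the module-level parse_json, which dispatches on the leading type byte. A minimal sketch of the new entry point on scalar payloads; the byte strings are hand-built, the type-byte values 0x4 (JSONB_TYPE_LITERAL) and 0x5 (JSONB_TYPE_INT16) follow MySQL's JSONB encoding, and parse_int16 from util.bytes is assumed to decode little-endian as that encoding specifies:

    from pymysqlreplication.packet import parse_json

    # A standalone JSONB value is one type byte followed by type-specific data.
    payload = b"\x04\x01"  # JSONB_TYPE_LITERAL, JSONB_LITERAL_TRUE
    assert parse_json(payload[0], payload[1:]) is True

    payload = b"\x05\x07\x00"  # JSONB_TYPE_INT16, value 7 (little-endian)
    assert parse_json(payload[0], payload[1:]) == 7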