Skip to content

Commit d2fac8f

Browse files
Fetch writer schema to decode Avro messages (#119)
Fixes #108 ### Motivation Currently the Python client uses the reader schema, which is the schema of the consumer, to decode Avro messages. However, when the writer schema is different from the reader schema, decoding will fail. ### Modifications Add an `attach_client` method to `Schema` and call it when creating consumers and readers. This method stores a reference to a `_pulsar.Client` instance, which leverages the C++ APIs added in apache/pulsar-client-cpp#257 to fetch schema info. The `AvroSchema` class fetches and caches the writer schema if it is not already cached, then uses both the writer schema and the reader schema to decode messages. Add `test_schema_evolve` to verify that consumers and readers can decode any message whose writer schema is different from the reader schema.
1 parent ce25b36 commit d2fac8f

File tree

6 files changed

+120
-2
lines changed

6 files changed

+120
-2
lines changed

pulsar/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ def value(self):
127127
"""
128128
Returns object with the de-serialized version of the message content
129129
"""
130-
return self._schema.decode(self._message.data())
130+
return self._schema.decode_message(self._message)
131131

132132
def properties(self):
133133
"""
@@ -841,6 +841,7 @@ def my_listener(consumer, message):
841841

842842
c._client = self
843843
c._schema = schema
844+
c._schema.attach_client(self._client)
844845
self._consumers.append(c)
845846
return c
846847

@@ -942,6 +943,7 @@ def my_listener(reader, message):
942943
c._reader = self._client.create_reader(topic, start_message_id, conf)
943944
c._client = self
944945
c._schema = schema
946+
c._schema.attach_client(self._client)
945947
self._consumers.append(c)
946948
return c
947949

pulsar/schema/schema.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,15 @@ def encode(self, obj):
3838
def decode(self, data):
    """Base-class decoder stub: ignores ``data`` and returns ``None``.

    Presumably meant to be overridden by concrete schema classes
    (confirm against the subclasses in this package).
    """
    return None
4040

41+
def decode_message(self, msg: _pulsar.Message):
    """Decode the payload of ``msg``.

    This default implementation only uses the bytes returned by
    ``msg.data()`` and passes them to ``self.decode``.
    """
    payload = msg.data()
    return self.decode(payload)
43+
4144
def schema_info(self):
    """Return the object stored on this schema as ``_schema_info``."""
    info = self._schema_info
    return info
4346

47+
def attach_client(self, client: _pulsar.Client):
    """Keep a reference to ``client`` on this schema as ``self._client``.

    Any previously stored reference is overwritten.
    """
    self._client = client
49+
4450
def _validate_object_type(self, obj):
4551
if not isinstance(obj, self._record_cls):
4652
raise TypeError('Invalid record obj of type ' + str(type(obj))

pulsar/schema/schema_avro.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import _pulsar
2121
import io
22+
import json
23+
import logging
2224
import enum
2325

2426
from . import Record
@@ -40,6 +42,8 @@ def __init__(self, record_cls, schema_definition=None):
4042
self._schema = record_cls.schema()
4143
else:
4244
self._schema = schema_definition
45+
self._writer_schemas = dict()
46+
self._logger = logging.getLogger()
4347
super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO')
4448

4549
def _get_serialized_value(self, x):
@@ -76,8 +80,47 @@ def encode_dict(self, d):
7680
return obj
7781

7882
def decode(self, data):
    """Decode raw Avro bytes, passing this schema's own definition as the writer schema."""
    own_schema = self._schema
    return self._decode_bytes(data, own_schema)
84+
85+
def decode_message(self, msg: _pulsar.Message):
    """Decode ``msg`` using the schema it was written with, when available.

    If no client is attached, the payload is decoded via ``self.decode``.
    Otherwise the writer schema is looked up by the message's topic name
    and integer schema version, and the payload is decoded with it.

    Any exception raised while fetching the writer schema or decoding with
    it is logged, and the payload is decoded with ``self._schema`` instead.
    An exception from that fallback decode propagates to the caller.
    """
    # ``_client`` is only assigned in ``attach_client``; no initialisation is
    # visible here, so read it defensively instead of risking AttributeError.
    client = getattr(self, '_client', None)
    if client is None:
        return self.decode(msg.data())
    topic = msg.topic_name()
    version = msg.int_schema_version()
    try:
        writer_schema = self._get_writer_schema(topic, version)
        return self._decode_bytes(msg.data(), writer_schema)
    except Exception as e:
        # The original literal had no ``f`` prefix, so the placeholders were
        # logged verbatim. Pass the values as lazy logging arguments instead.
        self._logger.error('Failed to get schema info of %s version %s: %s',
                           topic, version, e)
        return self._decode_bytes(msg.data(), self._schema)
96+
97+
def _get_writer_schema(self, topic: str, version: int) -> 'dict':
    """Return the writer schema for ``topic`` at ``version``, caching downloads.

    Schemas are cached in ``self._writer_schemas``, a dict keyed by topic
    whose values are dicts keyed by version. On a cache miss the schema info
    is requested from the attached client, parsed as JSON and stored.

    Returns ``self._schema`` when nothing is cached and no client is attached.

    Raises:
        RuntimeError: if the fetched schema info is not of type AVRO.
    """
    per_topic = self._writer_schemas.get(topic)
    if per_topic is None:
        per_topic = dict()
        self._writer_schemas[topic] = per_topic

    cached = per_topic.get(version)
    if cached is not None:
        return cached
    if self._client is None:
        return self._schema

    self._logger.info('Downloading schema of %s version %d...', topic, version)
    info = self._client.get_schema_info(topic, version)
    self._logger.info('Downloaded schema of %s version %d', topic, version)
    if info.schema_type() != _pulsar.SchemaType.AVRO:
        raise RuntimeError(f'The schema type of topic "{topic}" and version {version}'
                           f' is {info.schema_type()}')

    parsed = json.loads(info.schema())
    per_topic[version] = parsed
    return parsed
115+
116+
def _decode_bytes(self, data: bytes, writer_schema: dict):
79117
buffer = io.BytesIO(data)
80-
d = fastavro.schemaless_reader(buffer, self._schema)
118+
# If the record names are different between the writer schema and the reader schema,
119+
# schemaless_reader will fail with fastavro._read_common.SchemaResolutionError.
120+
# So we make the record name fields consistent here.
121+
reader_schema: dict = self._schema
122+
writer_schema['name'] = reader_schema['name']
123+
d = fastavro.schemaless_reader(buffer, writer_schema, reader_schema)
81124
if self._record_cls is not None:
82125
return self._record_cls(**d)
83126
else:

src/client.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ std::vector<std::string> Client_getTopicPartitions(Client& client, const std::st
5858
[&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); });
5959
}
6060

61+
// Fetches the schema info of `topic` at `version` by starting
// Client::getSchemaInfoAsync and handing the request to waitForAsyncValue,
// which (judging by its name) waits for the callback and yields the SchemaInfo.
SchemaInfo Client_getSchemaInfo(Client& client, const std::string& topic, int64_t version) {
    using SchemaInfoCallback = std::function<void(Result, const SchemaInfo&)>;
    auto request = [&](SchemaInfoCallback callback) {
        client.getSchemaInfoAsync(topic, version, callback);
    };
    return waitForAsyncValue<SchemaInfo>(request);
}
66+
6167
// Starts Client::closeAsync and passes the request to waitForAsyncResult,
// which (judging by its name) waits for the close callback to fire.
void Client_close(Client& client) {
    auto request = [&](ResultCallback callback) { client.closeAsync(callback); };
    waitForAsyncResult(request);
}
@@ -71,6 +77,7 @@ void export_client(py::module_& m) {
7177
.def("subscribe_pattern", &Client_subscribe_pattern)
7278
.def("create_reader", &Client_createReader)
7379
.def("get_topic_partitions", &Client_getTopicPartitions)
80+
.def("get_schema_info", &Client_getSchemaInfo)
7481
.def("close", &Client_close)
7582
.def("shutdown", &Client::shutdown);
7683
}

src/message.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ void export_message(py::module_& m) {
9898
})
9999
.def("topic_name", &Message::getTopicName, return_value_policy::copy)
100100
.def("redelivery_count", &Message::getRedeliveryCount)
101+
.def("int_schema_version", &Message::getLongSchemaVersion)
101102
.def("schema_version", &Message::getSchemaVersion, return_value_policy::copy);
102103

103104
MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload,

tests/schema_test.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
# under the License.
1919
#
2020

21+
import math
22+
import logging
23+
import requests
24+
from typing import List
2125
from unittest import TestCase, main
2226

2327
import fastavro
@@ -27,6 +31,9 @@
2731
import json
2832
from fastavro.schema import load_schema
2933

34+
logging.basicConfig(level=logging.INFO,
35+
format='%(asctime)s %(levelname)-5s %(message)s')
36+
3037

3138
class SchemaTest(TestCase):
3239

@@ -1287,5 +1294,57 @@ class SomeSchema(Record):
12871294
with self.assertRaises(TypeError) as e:
12881295
SomeSchema(some_field=["not", "integer"])
12891296
self.assertEqual(str(e.exception), "Array field some_field items should all be of type int")
1297+
1298+
def test_schema_evolve(self):
    """Messages written with a different Avro schema must still be decodable.

    Two producers write alternately to one topic, one with ``User1`` and one
    with ``User2``. A consumer and a reader, both using the ``User1`` schema,
    must decode every message with the expected ``name`` and ``age``.
    """
    class User1(Record):
        name = String()
        age = Integer()

    class User2(Record):
        _sorted_fields = True
        name = String()
        age = Integer(required=True)

    # Set the namespace's schema compatibility strategy to FORWARD through
    # the admin REST endpoint; the test expects a 204 response.
    response = requests.put('http://localhost:8080/admin/v2/namespaces/'
                            'public/default/schemaCompatibilityStrategy',
                            data='"FORWARD"'.encode(),
                            headers={'Content-Type': 'application/json'})
    self.assertEqual(response.status_code, 204)

    topic = 'schema-test-schema-evolve-2'
    client = pulsar.Client(self.serviceUrl)
    # Register the close as a cleanup so the client is released even when
    # an assertion below fails (previously close() was skipped on failure).
    self.addCleanup(client.close)

    producer1 = client.create_producer(topic, schema=AvroSchema(User1))
    consumer = client.subscribe(topic, 'sub', schema=AvroSchema(User1))
    reader = client.create_reader(topic,
                                  schema=AvroSchema(User1),
                                  start_message_id=pulsar.MessageId.earliest)
    producer2 = client.create_producer(topic, schema=AvroSchema(User2))

    num_messages = 10 * 2
    # Interleave the producers: even positions come from producer1,
    # odd positions from producer2.
    for i in range(num_messages // 2):
        producer1.send(User1(age=i+100, name=f'User1 {i}'))
        producer2.send(User2(age=i+200, name=f'User2 {i}'))

    def verify_messages(msgs: List[pulsar.Message]):
        for i, msg in enumerate(msgs):
            value = msg.value()
            index = i // 2
            if i % 2 == 0:
                self.assertEqual(value.age, index + 100)
                self.assertEqual(value.name, f'User1 {index}')
            else:
                self.assertEqual(value.age, index + 200)
                self.assertEqual(value.name, f'User2 {index}')

    msgs1 = []
    msgs2 = []
    for i in range(num_messages):
        msgs1.append(consumer.receive())
        msgs2.append(reader.read_next(1000))
    verify_messages(msgs1)
    verify_messages(msgs2)
1348+
12901349
if __name__ == '__main__':
12911350
main()

0 commit comments

Comments
 (0)