|
| 1 | + |
| 2 | +# Copyright Materialize, Inc. and contributors. All rights reserved. |
| 3 | +# |
| 4 | +# Use of this software is governed by the Business Source License |
| 5 | +# included in the LICENSE file at the root of this repository. |
| 6 | +# |
| 7 | +# As of the Change Date specified in that file, in accordance with |
| 8 | +# the Business Source License, use of this software will be governed |
| 9 | +# by the Apache License, Version 2.0. |
| 10 | + |
| 11 | +# Additional test for recorded views, on top of those in test/sqllogictest/recorded_views.slt |
| 12 | + |
| 13 | + |
| 14 | + |
| 15 | +# Kafka source as a source for a recorded view |
| 16 | + |
| 17 | +$ set recorded-views={ |
| 18 | + "type" : "record", |
| 19 | + "name" : "test", |
| 20 | + "fields" : [ |
| 21 | + {"name":"f1", "type":"string"} |
| 22 | + ] |
| 23 | + } |
| 24 | + |
| 25 | +$ kafka-create-topic topic=recorded-views |
| 26 | + |
| 27 | +$ kafka-ingest format=avro topic=recorded-views schema=${recorded-views} publish=true |
| 28 | +{"f1": "123"} |
| 29 | + |
| 30 | +> CREATE MATERIALIZED SOURCE s1 |
| 31 | + FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC |
| 32 | + 'testdrive-recorded-views-${testdrive.seed}' |
| 33 | + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY '${testdrive.schema-registry-url}' |
| 34 | + ENVELOPE NONE |
| 35 | + |
| 36 | +$ kafka-ingest format=avro topic=recorded-views schema=${recorded-views} publish=true |
| 37 | +{"f1": "234"} |
| 38 | + |
| 39 | +> SELECT COUNT(*) FROM s1; |
| 40 | +2 |
| 41 | + |
| 42 | +> CREATE RECORDED VIEW v1 AS SELECT COUNT(f1::integer) AS c1 FROM s1; |
| 43 | + |
| 44 | +$ kafka-ingest format=avro topic=recorded-views schema=${recorded-views} publish=true |
| 45 | +{"f1": "345"} |
| 46 | + |
| 47 | +> CREATE SINK sink1 FROM v1 |
| 48 | + INTO KAFKA BROKER '${testdrive.kafka-addr}' |
| 49 | + TOPIC 'recorded-view-sink' |
| 50 | + FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY '${testdrive.schema-registry-url}' |
| 51 | + |
| 52 | +$ kafka-ingest format=avro topic=recorded-views schema=${recorded-views} publish=true |
| 53 | +{"f1": "456"} |
| 54 | + |
| 55 | +$ set-regex match=\d{13} replacement=<TIMESTAMP> |
| 56 | + |
| 57 | +> SELECT * FROM v1; |
| 58 | +4 |
| 59 | + |
| 60 | +$ kafka-verify format=avro sink=materialize.public.sink1 sort-messages=true |
| 61 | +{"before": null, "after": {"row": {"c1": 2}}} |
| 62 | +{"before": {"row": {"c1": 2}}, "after": {"row": {"c1": 4}}} |
| 63 | + |
| 64 | +> BEGIN |
| 65 | + |
| 66 | +> DECLARE c CURSOR FOR TAIL v1; |
| 67 | + |
| 68 | +> FETCH ALL c; |
| 69 | +<TIMESTAMP> 1 4 |
| 70 | + |
| 71 | +> COMMIT |
| 72 | + |
| 73 | +# Inject failure in the source |
| 74 | + |
| 75 | +$ kafka-ingest format=avro topic=recorded-views schema=${recorded-views} publish=true |
| 76 | +{"f1": "ABC"} |
| 77 | + |
| 78 | +! SELECT * FROM v1; |
| 79 | +contains: invalid input syntax for type integer |
0 commit comments