Commit 21c72e0

Author: julien
Updated sink documentation for RedisJSON and RedisTimeSeries
1 parent e872e35 commit 21c72e0

File tree

1 file changed: +130 −91 lines


src/docs/asciidoc/_sink.adoc

@@ -9,8 +9,8 @@ The {name} includes the following features:

* <<sink-at-least-once-delivery,At least once delivery>>
* <<sink-tasks,Multiple tasks>>
* <<data-structures,Redis Data Structures>>
* <<data-formats,Supported Data Formats>>

[[sink-at-least-once-delivery]]
=== At least once delivery
@@ -21,100 +21,18 @@ The {name} guarantees that records from the Kafka topic are delivered at least o

The {name} supports running one or more tasks. You can specify the number of tasks with the `tasks.max` configuration property.
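
For example, a minimal sketch that caps the connector at three tasks (only `tasks.max` is shown; all other connector properties are left unchanged):

[source,properties]
----
# run at most three sink tasks for this connector (illustrative value)
tasks.max=3
----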

[[data-structures]]
=== Redis Data Structures

Record keys and values have different roles depending on the target data structure.

[[collection-key]]
==== Collections
For collections (stream, list, set, sorted set, timeseries) a single key is used which is independent of the record key.

Use the `redis.key` configuration property (default: `${topic}`) to specify a format string for the destination collection; the string may contain `${topic}` as a placeholder for the originating topic name.

For example, `kafka_${topic}` for the topic `orders` will map to the Redis key `kafka_orders`.
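
A minimal sketch of that mapping (the `kafka_` prefix is arbitrary and purely illustrative):

[source,properties]
----
# records from the topic 'orders' are written to the Redis key 'kafka_orders'
redis.key=kafka_${topic}
----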

==== Stream

@@ -201,4 +119,125 @@ key.converter=<string or bytes> <2>
<1> <<collection-key,Sorted set key>>
<2> <<key-string,String>> or <<key-bytes,bytes>>: Kafka record keys to add to the set

The Kafka record value should be `float64` and is used for the score. If the score is null, the member is removed from the sorted set instead of being added to it.

==== JSON
Use the following properties to write Kafka records as RedisJSON documents:

[source,properties]
----
redis.type=JSON
key.converter=<string or bytes> <1>
value.converter=<string or bytes> <2>
----

<1> <<key-string,String>> or <<key-bytes,bytes>>
<2> <<value-string,String>> or <<value-bytes,bytes>>. If the value is null, the key is https://redis.io/commands/del[deleted].
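
For instance, if the record keys and values already arrive as JSON strings, a minimal sketch combining the properties above with the StringConverter could look like this; conceptually, each record value is then stored as a JSON document under the record key:

[source,properties]
----
redis.type=JSON
# keys and values are plain strings, so the StringConverter is sufficient
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
----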

==== TimeSeries

Use the following properties to write Kafka records as RedisTimeSeries samples:

[source,properties]
----
redis.type=TIMESERIES
redis.key=<key name> <1>
----

<1> <<collection-key,Timeseries key>>

The Kafka record key must be an integer (e.g. `int64`) as it is used for the sample time in milliseconds.

The Kafka record value must be a number (e.g. `float64`) as it is used as the sample value.
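
A minimal sketch, assuming the standard Kafka Connect primitive converters fit your topic's serialization and using `sensor:temperature` as a purely hypothetical key name:

[source,properties]
----
redis.type=TIMESERIES
# hypothetical destination key for the time series
redis.key=sensor:temperature
# the record key carries the sample timestamp in milliseconds (int64)
key.converter=org.apache.kafka.connect.converters.LongConverter
# the record value carries the sample value (float64)
value.converter=org.apache.kafka.connect.converters.DoubleConverter
----

Conceptually, a record with key `1609459200000` and value `42.5` then becomes the sample `(1609459200000, 42.5)` on that series, roughly equivalent to `TS.ADD sensor:temperature 1609459200000 42.5`.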

[[data-formats]]
=== Data Formats

The {name} supports different data formats for record keys and values depending on the target Redis data structure.

==== Kafka Record Keys
The {name} expects Kafka record keys in a specific format depending on the configured target <<data-structures,Redis data structure>>:

[options="header",cols="h,1,1"]
|====
|Target|Record Key|Assigned To
|Stream|Any|None
|Hash|String|Key
|String|<<key-string,String>> or <<key-bytes,bytes>>|Key
|List|<<key-string,String>> or <<key-bytes,bytes>>|Member
|Set|<<key-string,String>> or <<key-bytes,bytes>>|Member
|Sorted Set|<<key-string,String>> or <<key-bytes,bytes>>|Member
|JSON|<<key-string,String>> or <<key-bytes,bytes>>|Key
|TimeSeries|Integer|Sample time in milliseconds
|====

[[key-string]]
===== StringConverter
If record keys are already serialized as strings, use the StringConverter:

[source,properties]
----
key.converter=org.apache.kafka.connect.storage.StringConverter
----

[[key-bytes]]
===== ByteArrayConverter
Use the ByteArrayConverter to pass the Kafka record keys through in their binary serialized form:

[source,properties]
----
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
----

==== Kafka Record Values
Multiple data formats are supported for Kafka record values depending on the configured target <<data-structures,Redis data structure>>. Each data structure expects a specific format. If your data in Kafka is not in the format expected for a given data structure, consider using https://docs.confluent.io/platform/current/connect/transforms/overview.html[Single Message Transformations] to convert it to a byte array, string, Struct, or map before it is written to Redis; see the sketch after the table below.

[options="header",cols="h,1,1"]
|====
|Target|Record Value|Assigned To
|Stream|<<avro,Avro>> or <<json,JSON>>|Message body
|Hash|<<avro,Avro>> or <<json,JSON>>|Fields
|String|<<value-string,String>> or <<value-bytes,bytes>>|Value
|List|Any|Removal if null
|Set|Any|Removal if null
|Sorted Set|Number|Score or removal if null
|JSON|<<value-string,String>> or <<value-bytes,bytes>>|Value
|TimeSeries|Number|Sample value
|====
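
As a sketch of the Single Message Transformations mentioned above the table: if the record value is a Struct and only one numeric field should feed, say, a sorted set score, the standard `ExtractField` and `Cast` transformations can be chained (the field name `price` is hypothetical):

[source,properties]
----
transforms=extractScore,castScore
# pull the hypothetical 'price' field out of the record value
transforms.extractScore.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extractScore.field=price
# cast the extracted value to float64 so it can be used as a score
transforms.castScore.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.castScore.spec=float64
----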

[[value-string]]
===== StringConverter
If record values are already serialized as strings, use the StringConverter to store values in Redis as strings:

[source,properties]
----
value.converter=org.apache.kafka.connect.storage.StringConverter
----

[[value-bytes]]
===== ByteArrayConverter
Use the ByteArrayConverter to store the binary serialized form (for example JSON, Avro, or String) of the Kafka record values in Redis as byte arrays:

[source,properties]
----
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
----

[[avro]]
===== Avro
[source,properties]
----
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
----

[[json]]
===== JSON
[source,properties]
----
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=<true|false> <1>
----

<1> Set to `true` if the JSON record structure has an attached schema
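
For reference, with `value.converter.schemas.enable=true` the JsonConverter expects each record value to carry its schema inline in the standard envelope form, for example (the field names are illustrative):

[source,json]
----
{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "name", "type": "string", "optional": false }
    ],
    "optional": false
  },
  "payload": {
    "name": "Jane Doe"
  }
}
----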
