Skip to content

Commit a434be2

Browse files
authored
Switch using __source_ts_ns for data de-duplicatinon, for upsert mode (#546)
* Switch to __source_ts_ns for de-duplicating data for upsert * Switch to __source_ts_ns for de-duplicating data for upsert * Switch to __source_ts_ns for de-duplicating data for upsert * Switch to __source_ts_ns for de-duplicating data for upsert * Switch to __source_ts_ns for de-duplicating data for upsert * Switch to __source_ts_ns for de-duplicating data for upsert * Switch to __source_ts_ns for de-duplicating data for upsert
1 parent 606cbc5 commit a434be2

File tree

10 files changed

+24
-25
lines changed

10 files changed

+24
-25
lines changed

debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ debezium.source.topic.prefix=dbz_
8686
# do event flattening. unwrap message!
8787
debezium.transforms=unwrap
8888
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
89-
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
89+
debezium.transforms.unwrap.add.fields=op,table,source.ts_ns,db
9090
debezium.transforms.unwrap.delete.handling.mode=rewrite
9191
debezium.transforms.unwrap.drop.tombstones=true
9292

debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public interface DebeziumConfig {
6565
String unwrapType();
6666

6767
@WithName("debezium.transforms.unwrap.add.fields")
68-
@WithDefault("op,table,source.ts_ms,db,ts_ms")
68+
@WithDefault("op,table,source.ts_ms,source.ts_ns,db,ts_ms,ts_ns")
6969
String unwrapAddFields();
7070

7171
@WithName("debezium.transforms.unwrap.delete.handling.mode")

debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergConfig.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,9 @@ public interface IcebergConfig {
2525
String cdcOpField();
2626

2727
@WithName("debezium.sink.iceberg.upsert-dedup-column")
28-
@WithDefault("__source_ts_ms")
29-
String cdcSourceTsMsField();
28+
@WithDefault("__source_ts_ns")
29+
String cdcSourceTsField();
3030

31-
//
3231
@WithName("debezium.sink.iceberg.upsert")
3332
@WithDefault("true")
3433
boolean upsert();

debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,15 @@ public JsonNode value() {
8282

8383
public Long cdcSourceTsMsValue() {
8484

85-
final JsonNode element = value().get(config.iceberg().cdcSourceTsMsField());
85+
final JsonNode element = value().get(config.iceberg().cdcSourceTsField());
8686
if (element == null) {
87-
throw new DebeziumException("Field '" + config.iceberg().cdcSourceTsMsField() + "' not found in JSON object: " + value());
87+
throw new DebeziumException("Field '" + config.iceberg().cdcSourceTsField() + "' not found in JSON object: " + value());
8888
}
8989

9090
try {
9191
return element.asLong();
9292
} catch (NumberFormatException e) {
93-
throw new DebeziumException("Error converting field '" + config.iceberg().cdcSourceTsMsField() + "' value '" + element + "' to Long: " + e.getMessage(), e);
93+
throw new DebeziumException("Error converting field '" + config.iceberg().cdcSourceTsField() + "' value '" + element + "' to Long: " + e.getMessage(), e);
9494
}
9595
}
9696

debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMongodbTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public Map<String, String> getConfigOverrides() {
5757
config.put("quarkus.profile", "mongodb");
5858
config.put("%mongodb.debezium.source.connector.class", "io.debezium.connector.mongodb.MongoDbConnector");
5959
config.put("%mongodb.debezium.transforms.unwrap.type", "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState");
60-
config.put("%mongodb.debezium.transforms.unwrap.add.fields", "op,source.ts_ms,db");
60+
config.put("%mongodb.debezium.transforms.unwrap.add.fields", "op,source.ts_ns,db");
6161
config.put("%mongodb.debezium.sink.iceberg.allow-field-addition", "false");
6262
config.put("%mongodb.debezium.source.topic.prefix", "testc");
6363
config.put("%mongodb.debezium.source.database.include.list", "inventory"); // ok

debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestChangeEventFactory.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public TestChangeEvent<Object, Object> of(String destination, Integer id, String
3333
.addKeyField("id", id)
3434
.addField("first_name", name)
3535
.addField("__op", operation)
36-
.addField("__source_ts_ms", epoch)
36+
.addField("__source_ts_ns", epoch)
3737
.addField("__deleted", operation.equals("d"))
3838
.build();
3939

@@ -55,7 +55,7 @@ public TestChangeEvent<Object, Object> ofCompositeKey(String destination, Intege
5555
.addKeyField("id", id)
5656
.addKeyField("first_name", name)
5757
.addField("__op", operation)
58-
.addField("__source_ts_ms", epoch)
58+
.addField("__source_ts_ns", epoch)
5959
.addField("__deleted", operation.equals("d"))
6060
.build();
6161

@@ -90,7 +90,7 @@ public TestChangeEvent<Object, Object> ofNoKey(String destination, Integer id, S
9090
.addField("id", id)
9191
.addField("first_name", name)
9292
.addField("__op", operation)
93-
.addField("__source_ts_ms", epoch)
93+
.addField("__source_ts_ns", epoch)
9494
.addField("__deleted", operation.equals("d"))
9595
.build();
9696

debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public TestConfigSource() {
6565
// debezium unwrap message
6666
config.put("debezium.transforms", "unwrap");
6767
config.put("debezium.transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState");
68-
config.put("debezium.transforms.unwrap.add.fields", "op,table,source.ts_ms,db,ts_ms");
68+
config.put("debezium.transforms.unwrap.add.fields", "op,table,source.ts_ns,db,ts_ms");
6969
config.put("debezium.transforms.unwrap.delete.handling.mode", "rewrite");
7070
config.put("debezium.transforms.unwrap.drop.tombstones", "true");
7171

debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -102,31 +102,31 @@ public void testDeduplicateBatch() throws Exception {
102102
.destination("destination")
103103
.addKeyField("id", 1)
104104
.addKeyField("first_name", "row1")
105-
.addField("__source_ts_ms", 1L)
105+
.addField("__source_ts_ns", 1L)
106106
.build();
107107
RecordConverter e2 = eventBuilder
108108
.destination("destination")
109109
.addKeyField("id", 1)
110110
.addKeyField("first_name", "row1")
111-
.addField("__source_ts_ms", 3L)
111+
.addField("__source_ts_ns", 3L)
112112
.build();
113113

114114
List<RecordConverter> records = List.of(e1, e2);
115115
List<RecordConverter> dedups = icebergTableOperator.deduplicateBatch(records);
116116
Assertions.assertEquals(1, dedups.size());
117-
Assertions.assertEquals(3L, dedups.get(0).value().get("__source_ts_ms").asLong(0L));
117+
Assertions.assertEquals(3L, dedups.get(0).value().get("__source_ts_ns").asLong(0L));
118118

119119
RecordConverter e21 = eventBuilder
120120
.destination("destination")
121121
.addKeyField("id", 1)
122122
.addField("__op", "r")
123-
.addField("__source_ts_ms", 1L)
123+
.addField("__source_ts_ns", 1L)
124124
.build();
125125
RecordConverter e22 = eventBuilder
126126
.destination("destination")
127127
.addKeyField("id", 1)
128128
.addField("__op", "u")
129-
.addField("__source_ts_ms", 1L)
129+
.addField("__source_ts_ns", 1L)
130130
.build();
131131

132132
List<RecordConverter> records2 = List.of(e21, e22);
@@ -139,13 +139,13 @@ public void testDeduplicateBatch() throws Exception {
139139
.destination("destination")
140140
.addField("id", 3)
141141
.addField("__op", "r")
142-
.addField("__source_ts_ms", 1L)
142+
.addField("__source_ts_ns", 1L)
143143
.build();
144144
RecordConverter e32 = eventBuilder
145145
.destination("destination")
146146
.addField("id", 3)
147147
.addField("__op", "u")
148-
.addField("__source_ts_ms", 1L)
148+
.addField("__source_ts_ns", 1L)
149149
.build();
150150

151151
List<RecordConverter> records3 = List.of(e31, e32);

docs/DOCS.md

+4-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ When event and key schema information is enabled (`debezium.format.value.schemas
2222
| `debezium.sink.iceberg.allow-field-addition` | `true` | Allow field addition to target tables. Enables automatic schema evolution, expansion. |
2323
| `debezium.sink.iceberg.upsert` | `true` | Upsert mode overwrites updated rows. Any existing rows that are updated will be overwritten with the new values. Explained further below. |
2424
| `debezium.sink.iceberg.upsert-keep-deletes` | `true` | When running in upsert mode, deleted rows are marked as deleted but retained in the target table (soft delete) |
25-
| `debezium.sink.iceberg.upsert-dedup-column` | `__source_ts_ms` | With upsert mode this field is used to deduplicate data. The row with the highest `__source_ts_ms` timestamp (the latest change event) is retained. _dont change!_ |
25+
| `debezium.sink.iceberg.upsert-dedup-column` | `__source_ts_ns` | With upsert mode this field is used to deduplicate data. The row with the highest `__source_ts_ns` timestamp (the latest change event) is retained. _dont change!_ |
2626
| `debezium.sink.iceberg.upsert-op-field` | `__op` | Field name for operation type in upsert mode. _dont change!_ |
2727
| `debezium.sink.iceberg.create-identifier-fields` | `true` | When set to `false`, the consumer will create tables without identifier fields. This is useful for scenarios where users want to consume nested events in append-only mode. |
2828
| `debezium.sink.iceberg.destination-regexp` | `` | A regular expression used to modify the destination Iceberg table name. This setting allows for combining multiple tables, such as `table_ptt1` and `table_ptt2`, into a single table, `table_combined`. |
@@ -43,7 +43,7 @@ When event and key schema information is enabled (`debezium.format.value.schemas
4343
| `debezium.format.key.schemas.enable` | `true` | Enable schema inclusion in the event key format. |
4444
| `debezium.transforms` | `unwrap` | Enables Debezium transforms for event processing. |
4545
| `debezium.transforms.unwrap.type` | `io.debezium.transforms.ExtractNewRecordState` | Type of the Debezium unwrap transform. |
46-
| `debezium.transforms.unwrap.add.fields` | `op,table,source.ts_ms,db,ts_ms` | List of fields to add in the Debezium unwrap transform. |
46+
| `debezium.transforms.unwrap.add.fields` | `op,table,source.ts_ns,db,ts_ms` | List of fields to add in the Debezium unwrap transform. |
4747
| `debezium.transforms.unwrap.delete.handling.mode` | `rewrite` | Handling mode for delete events in the Debezium unwrap transform. |
4848
| `debezium.transforms.unwrap.drop.tombstones` | `true` | Handling mode for tombstone events (delete markers) in the Debezium unwrap transform. |
4949
| `quarkus.log.level` | `INFO` | Global Quarkus logging level |
@@ -57,7 +57,7 @@ mode.
5757

5858
#### Upsert Mode Data Deduplication
5959

60-
Upsert mode enables data deduplication, prioritizing records based on their `__source_ts_ms` timestamp and operation
60+
Upsert mode enables data deduplication, prioritizing records based on their `__source_ts_ns` timestamp and operation
6161
type (`__op`). The `debezium.sink.iceberg.upsert-dedup-column` property can be used to specify a different column for
6262
deduplication (currently limited to Long type).
6363

@@ -163,7 +163,7 @@ the [Debezium documentation](https://debezium.io/documentation/reference/stable/
163163
```properties
164164
debezium.transforms=unwrap
165165
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
166-
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
166+
debezium.transforms.unwrap.add.fields=op,table,source.ts_ns,db
167167
debezium.transforms.unwrap.add.headers=db
168168
debezium.transforms.unwrap.delete.handling.mode=rewrite
169169
```

examples/conf/application.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ debezium.format.key=json
5050
# do event flattening. unwrap message!
5151
debezium.transforms=unwrap
5252
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
53-
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
53+
debezium.transforms.unwrap.add.fields=op,table,source.ts_ns,db
5454
debezium.transforms.unwrap.delete.handling.mode=rewrite
5555
debezium.transforms.unwrap.drop.tombstones=true
5656

0 commit comments

Comments
 (0)