Skip to content

Commit b6b9f14

Browse files
polyzoswuchong
and authored
[flink] Introduce FlussDeserializationSchema for DataStream Source (#661)
--------- Co-authored-by: Jark Wu <[email protected]>
1 parent b566f9e commit b6b9f14

File tree

9 files changed

+935
-4
lines changed

9 files changed

+935
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
 * Copyright (c) 2025 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.flink.source.deserializer;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.types.RowType;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.UserCodeClassLoader;

import java.io.Serializable;

/**
 * Deserialization schema that turns {@link LogRecord} instances read from Fluss into records of
 * type {@code T} consumed by a Flink DataStream source.
 *
 * <p>Implementations must be {@link Serializable}, since they are shipped to the parallel source
 * subtasks at runtime.
 *
 * @param <T> the record type produced by this schema
 * @since 0.7
 */
@PublicEvolving
public interface FlussDeserializationSchema<T> extends Serializable {

    /**
     * One-time setup hook, invoked before any call to {@link #deserialize(LogRecord)}.
     *
     * <p>The supplied {@link InitializationContext} gives access to runtime facilities such as the
     * metric group, the user-code class loader, and the row schema of incoming records.
     *
     * @param context contextual information available during initialization
     * @throws Exception if setup fails; the source task will then fail over
     */
    void open(InitializationContext context) throws Exception;

    /**
     * Converts a single {@link LogRecord} into a value of type {@code T}.
     *
     * @param record the Fluss record to convert
     * @return the deserialized value
     * @throws Exception if the record cannot be deserialized
     */
    T deserialize(LogRecord record) throws Exception;

    /**
     * Describes the produced data type as Flink {@link TypeInformation}, so the runtime can pick
     * appropriate serializers.
     *
     * @param rowSchema the schema of {@link LogRecord#getRow()}
     * @return the type information of values returned by {@link #deserialize(LogRecord)}
     */
    TypeInformation<T> getProducedType(RowType rowSchema);

    /**
     * Runtime context handed to {@link #open(InitializationContext)}. It can be used to:
     *
     * <ul>
     *   <li>register user metrics via {@link #getMetricGroup()}
     *   <li>load user-code classes via {@link #getUserCodeClassLoader()}
     *   <li>inspect the schema of {@link LogRecord#getRow()} via {@link #getRowSchema()}
     * </ul>
     */
    @PublicEvolving
    interface InitializationContext {

        /**
         * Returns the metric group of the parallel source subtask running this {@link
         * FlussDeserializationSchema}. New metrics can be registered on it and organized into a
         * nested hierarchy by group name; see {@link MetricGroup}.
         *
         * @see MetricGroup
         */
        MetricGroup getMetricGroup();

        /**
         * Returns the {@link UserCodeClassLoader} that resolves classes packaged in the user job
         * jar rather than on the system classpath.
         *
         * @see UserCodeClassLoader
         */
        UserCodeClassLoader getUserCodeClassLoader();

        /**
         * Returns the schema of {@link LogRecord#getRow()} for the records this schema will
         * receive.
         *
         * @return the row schema of incoming records
         */
        RowType getRowSchema();
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
 * Copyright (c) 2025 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.flink.source.deserializer;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.types.RowType;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * A deserialization schema that converts {@link LogRecord} objects to JSON strings.
 *
 * <p>This implementation serializes Fluss records into JSON strings, making it useful for
 * debugging, logging, or when the downstream processing requires string-based JSON data. The schema
 * preserves important metadata such as offset, timestamp, and change type along with the actual row
 * data.
 *
 * <p>The resulting JSON has the following structure (the field name is {@code change_type}, in
 * snake case, matching the keys written by {@link #deserialize(LogRecord)}):
 *
 * <pre>{@code
 * {
 *   "offset": <record_offset>,
 *   "timestamp": <record_timestamp>,
 *   "change_type": <APPEND_ONLY|INSERT|UPDATE_BEFORE|UPDATE_AFTER|DELETE>,
 *   "row": <string_representation_of_row>
 * }
 * }</pre>
 *
 * <p>Usage example:
 *
 * <pre>{@code
 * FlussSource<String> source = FlussSource.builder()
 *     .setDeserializationSchema(new JsonStringDeserializationSchema())
 *     .build();
 * }</pre>
 *
 * <p>This class is not thread-safe: {@link #deserialize(LogRecord)} reuses a per-instance buffer
 * map. Each parallel subtask gets its own deserialized copy, so this is safe in normal use.
 *
 * @since 0.7
 */
@PublicEvolving
public class JsonStringDeserializationSchema implements FlussDeserializationSchema<String> {
    private static final long serialVersionUID = 1L;

    /**
     * Jackson ObjectMapper used for JSON serialization. Marked as transient because ObjectMapper is
     * not serializable; it is recreated in {@link #open}, and lazily in {@link #deserialize} as a
     * safety net in case this instance was Java-deserialized and used without {@code open()} being
     * called (the transient field would otherwise be {@code null}).
     */
    private transient ObjectMapper objectMapper = createObjectMapper();

    /**
     * Reusable map for building the record representation before serializing to JSON. This avoids
     * creating a new Map for each record. Using LinkedHashMap to ensure a stable order of fields in
     * the JSON output.
     */
    private final Map<String, Object> recordMap = new LinkedHashMap<>(4);

    /**
     * Initializes the JSON serialization mechanism by (re)creating the configured {@link
     * ObjectMapper}.
     *
     * @param context Contextual information for initialization (not used in this implementation)
     * @throws Exception if initialization fails
     */
    @Override
    public void open(InitializationContext context) throws Exception {
        objectMapper = createObjectMapper();
    }

    /**
     * Deserializes a {@link LogRecord} into a JSON {@link String}.
     *
     * <p>The method extracts key information from the record (offset, timestamp, change type, and
     * row data) and serializes it as a JSON string.
     *
     * @param record The Fluss LogRecord to deserialize
     * @return JSON string representation of the record
     * @throws Exception If JSON serialization fails
     */
    @Override
    public String deserialize(LogRecord record) throws Exception {
        if (objectMapper == null) {
            // Transient field is null after Java deserialization; recover instead of NPE-ing.
            objectMapper = createObjectMapper();
        }
        recordMap.put("offset", record.logOffset());
        recordMap.put("timestamp", record.timestamp());
        recordMap.put("change_type", record.getChangeType().toString());
        // TODO: convert row into JSON https://github.com/alibaba/fluss/issues/678
        recordMap.put("row", record.getRow().toString());

        return objectMapper.writeValueAsString(recordMap);
    }

    /**
     * Returns the TypeInformation for the produced {@link String} type.
     *
     * @return TypeInformation for String class
     */
    @Override
    public TypeInformation<String> getProducedType(RowType rowSchema) {
        return Types.STRING;
    }

    /**
     * Builds an {@link ObjectMapper} configured with:
     *
     * <ul>
     *   <li>JavaTimeModule for proper serialization of date/time objects
     *   <li>ISO-8601 rendering of dates rather than numeric timestamps
     * </ul>
     *
     * <p>Centralizing construction here guarantees the inline-initialized mapper, the one created
     * in {@link #open}, and the lazy fallback in {@link #deserialize} are configured identically.
     */
    private static ObjectMapper createObjectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        return mapper;
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.flink.source.deserializer;
18+
19+
import com.alibaba.fluss.annotation.PublicEvolving;
20+
import com.alibaba.fluss.client.table.scanner.ScanRecord;
21+
import com.alibaba.fluss.flink.utils.FlinkConversions;
22+
import com.alibaba.fluss.flink.utils.FlussRowToFlinkRowConverter;
23+
import com.alibaba.fluss.record.LogRecord;
24+
import com.alibaba.fluss.types.RowType;
25+
26+
import org.apache.flink.api.common.typeinfo.TypeInformation;
27+
import org.apache.flink.table.data.RowData;
28+
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
29+
30+
/**
31+
* A deserialization schema that converts {@link LogRecord} objects to Flink's {@link RowData}
32+
* format.
33+
*
34+
* <p>This implementation takes a {@link RowType} in its constructor and uses a {@link
35+
* FlussRowToFlinkRowConverter} to transform Fluss records into Flink's internal row representation.
36+
*
37+
* <p>Usage example:
38+
*
39+
* <pre>{@code
40+
* FlussSource<RowData> source = FlussSource.builder()
41+
* .setDeserializationSchema(new RowDataDeserializationSchema())
42+
* .build();
43+
* }</pre>
44+
*
45+
* @since 0.7
46+
*/
47+
@PublicEvolving
48+
public class RowDataDeserializationSchema implements FlussDeserializationSchema<RowData> {
49+
private static final long serialVersionUID = 1L;
50+
51+
/**
52+
* Converter responsible for transforming Fluss row data into Flink's {@link RowData} format.
53+
* Initialized during {@link #open(InitializationContext)}.
54+
*/
55+
private transient FlussRowToFlinkRowConverter converter;
56+
57+
/**
58+
* Initializes the deserialization schema.
59+
*
60+
* <p>This implementation doesn't require any initialization.
61+
*
62+
* @param context Contextual information for initialization
63+
* @throws Exception if initialization fails
64+
*/
65+
@Override
66+
public void open(InitializationContext context) throws Exception {
67+
if (converter == null) {
68+
this.converter = new FlussRowToFlinkRowConverter(context.getRowSchema());
69+
}
70+
}
71+
72+
/**
73+
* Deserializes a {@link LogRecord} into a Flink {@link RowData} object.
74+
*
75+
* @param record The Fluss LogRecord to deserialize
76+
* @return The deserialized RowData
77+
* @throws Exception If deserialization fails or if the record is not a valid {@link ScanRecord}
78+
*/
79+
@Override
80+
public RowData deserialize(LogRecord record) throws Exception {
81+
if (converter == null) {
82+
throw new IllegalStateException(
83+
"Converter not initialized. The open() method must be called before deserializing records.");
84+
}
85+
return converter.toFlinkRowData(record);
86+
}
87+
88+
/** Returns the TypeInformation for the produced {@link RowData} type. */
89+
@Override
90+
public TypeInformation<RowData> getProducedType(RowType rowSchema) {
91+
return InternalTypeInfo.of(FlinkConversions.toFlinkRowType(rowSchema));
92+
}
93+
}

Diff for: fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlussRowToFlinkRowConverter.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.alibaba.fluss.flink.utils;
1818

19-
import com.alibaba.fluss.client.table.scanner.ScanRecord;
19+
import com.alibaba.fluss.record.LogRecord;
2020
import com.alibaba.fluss.row.BinaryString;
2121
import com.alibaba.fluss.row.Decimal;
2222
import com.alibaba.fluss.row.InternalRow;
@@ -43,7 +43,6 @@
4343
* modify this class.
4444
*/
4545
public class FlussRowToFlinkRowConverter {
46-
4746
private final FlussDeserializationConverter[] toFlinkFieldConverters;
4847
private final InternalRow.FieldGetter[] flussFieldGetters;
4948

@@ -56,8 +55,8 @@ public FlussRowToFlinkRowConverter(RowType rowType) {
5655
}
5756
}
5857

59-
public RowData toFlinkRowData(ScanRecord scanRecord) {
60-
return toFlinkRowData(scanRecord.getRow(), toFlinkRowKind(scanRecord.getChangeType()));
58+
public RowData toFlinkRowData(LogRecord logRecord) {
59+
return toFlinkRowData(logRecord.getRow(), toFlinkRowKind(logRecord.getChangeType()));
6160
}
6261

6362
public RowData toFlinkRowData(InternalRow flussRow) {

0 commit comments

Comments
 (0)