@@ -161,17 +161,12 @@ public static void main(String[] args) throws Exception {
161
161
.setParallelism (parameterTool .getInt (STREAM_PARALLELISM_OPERATOR ))
162
162
.name ("broadcast alert notify" );
163
163
164
- DataStream <AlertEvent > levelMatchedAlertEventsWithNotify = alertEventsWithNotify
165
- .filter (new AlertEventLevelFilterFunction ())
166
- .setParallelism (parameterTool .getInt (STREAM_PARALLELISM_OPERATOR ))
167
- .name ("filter event notify level" );
168
-
169
164
DataStream <NotifyEvent > notifyEventWithTemplate = notifyEventDataStream .connect (allUniversalTemplates .broadcast (StateDescriptors .notifyTemplate ))
170
165
.process (new NotifyTemplateProcessFunction (parameterTool .getLong (METRIC_METADATA_TTL , 7500 ), StateDescriptors .notifyTemplate ))
171
166
.setParallelism (parameterTool .getInt (STREAM_PARALLELISM_OPERATOR ))
172
167
.name ("notify event with template" );
173
168
174
- DataStream <AlertEvent > alertEventsWithTemplate = levelMatchedAlertEventsWithNotify
169
+ DataStream <AlertEvent > alertEventsWithTemplate = alertEventsWithNotify
175
170
.connect (alertNotifyTemplateQuery .union (alertNotifyCustomTemplateQuery ).broadcast (StateDescriptors .alertNotifyTemplateStateDescriptor ))
176
171
.process (new AlertNotifyTemplateBroadcastProcessFunction (parameterTool .getLong (METRIC_METADATA_TTL ,
177
172
75000 ), StateDescriptors .alertNotifyTemplateStateDescriptor ))
@@ -243,6 +238,9 @@ public static void main(String[] args) throws Exception {
243
238
}
244
239
245
240
DataStream <AlertEvent > alertEventLevel = alertEventsWithTemplate
241
+ .filter (new AlertEventLevelFilterFunction ())
242
+ .setParallelism (parameterTool .getInt (STREAM_PARALLELISM_OPERATOR ))
243
+ .name ("filter event notify level" )
246
244
.assignTimestampsAndWatermarks (new AlertEventWatermarkExtractor ())
247
245
.keyBy (new AlertEventGroupFunction ())
248
246
.window (TumblingProcessingTimeWindows .of (Time .seconds (15 )))
0 commit comments