
Commit 5b1a643

freeznet authored and lhotari committed

[improve][io] Enhance Kafka connector logging with focused bootstrap server information (#24128)

(cherry picked from commit d5f126f)

1 parent cd7168d commit 5b1a643

File tree

4 files changed: +35 -33 lines
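All four diffs below make the same change: instead of logging the full Kafka client Properties object, which can be long and may include sensitive settings, each connector now logs only the bootstrap.servers entry. The following is a minimal standalone sketch of that before/after pattern; the class name, main method, and property values are hypothetical and are not part of the commit, which applies the change inside the open() and beforeCreate*() methods shown in the diffs below.

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BootstrapLogDemo {
    private static final Logger log = LoggerFactory.getLogger(BootstrapLogDemo.class);

    public static void main(String[] args) {
        Properties props = new Properties();
        // Hypothetical values: a broker address plus settings that are noisy
        // (or sensitive, e.g. sasl.jaas.config) when the whole Properties object is logged.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put("sasl.jaas.config", "<credentials omitted>");

        // Before this commit: the entire configuration ends up in the log line.
        log.info("Kafka sink started : {}.", props);

        // After this commit: only the bootstrap servers are logged, with an
        // empty-string fallback if the key is absent.
        log.info("Kafka sink started : {}.",
                props.getOrDefault(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ""));
    }
}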

Diff for: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java

+6 -5
@@ -44,6 +44,7 @@
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.connector.Task;

@@ -160,7 +161,7 @@ public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
         kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config);
         Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not set");
         Preconditions.checkArgument(ctx.getSubscriptionType() == SubscriptionType.Failover
-                || ctx.getSubscriptionType() == SubscriptionType.Exclusive,
+                || ctx.getSubscriptionType() == SubscriptionType.Exclusive,
                 "Source must run with Exclusive or Failover subscription type");
         topicName = kafkaSinkConfig.getTopic();
         unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();

@@ -220,7 +221,7 @@ public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
         scheduledExecutor.scheduleWithFixedDelay(() ->
                 this.flushIfNeeded(true), lingerMs, lingerMs, TimeUnit.MILLISECONDS);

-        log.info("Kafka sink started : {}.", props);
+        log.info("Kafka sink started : {}.", props.getOrDefault(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ""));
     }

     private void flushIfNeeded(boolean force) {

@@ -282,15 +283,15 @@ private static boolean areMapsEqual(Map<TopicPartition, OffsetAndMetadata> first

     @VisibleForTesting
     protected void ackUntil(Record<GenericObject> lastNotFlushed,
-                            Map<TopicPartition, OffsetAndMetadata> committedOffsets,
-                            java.util.function.Consumer<Record<GenericObject>> cb) {
+                            Map<TopicPartition, OffsetAndMetadata> committedOffsets,
+                            java.util.function.Consumer<Record<GenericObject>> cb) {
         // lastNotFlushed is needed in case of default preCommit() implementation
         // which calls flush() and returns currentOffsets passed to it.
         // We don't want to ack messages added to pendingFlushQueue after the preCommit/flush call

         // to avoid creation of new TopicPartition for each record in pendingFlushQueue
         Map<String, Map<Integer, Long>> topicOffsets = new HashMap<>();
-        for (Map.Entry<TopicPartition, OffsetAndMetadata> e: committedOffsets.entrySet()) {
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committedOffsets.entrySet()) {
            TopicPartition tp = e.getKey();
            if (!topicOffsets.containsKey(tp.topic())) {
                topicOffsets.put(tp.topic(), new HashMap<>());

Diff for: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java

+3 -3
@@ -81,11 +81,11 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
         kafkaSinkConfig = KafkaSinkConfig.load(config, sinkContext);
         if (kafkaSinkConfig.getBatchSize() <= 0) {
             throw new IllegalArgumentException("Invalid Kafka Producer batchSize : "
-                    + kafkaSinkConfig.getBatchSize());
+                    + kafkaSinkConfig.getBatchSize());
         }
         if (kafkaSinkConfig.getMaxRequestSize() <= 0) {
             throw new IllegalArgumentException("Invalid Kafka Producer maxRequestSize : "
-                    + kafkaSinkConfig.getMaxRequestSize());
+                    + kafkaSinkConfig.getMaxRequestSize());
         }
         if (kafkaSinkConfig.getProducerConfigProperties() != null) {
             props.putAll(kafkaSinkConfig.getProducerConfigProperties());

@@ -122,7 +122,7 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc

         producer = new KafkaProducer<>(beforeCreateProducer(props));

-        log.info("Kafka sink started : {}.", props);
+        log.info("Kafka sink started : {}.", props.getOrDefault(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ""));
     }

     public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> message);
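With SLF4J's {} placeholder substitution, the new statement in KafkaAbstractSink.open() produces a single short line such as the following (broker address hypothetical, logger timestamp/level prefix omitted):

Kafka sink started : localhost:9092.

whereas the previous statement printed the entire producer Properties map. The empty-string default from getOrDefault keeps the message well formed even if bootstrap.servers was never set.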

Diff for: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java

+5 -5
@@ -33,10 +33,10 @@
  * apply schema into it.
  */
 @Connector(
-        name = "kafka",
-        type = IOType.SINK,
-        help = "The KafkaBytesSink is used for moving messages from Pulsar to Kafka.",
-        configClass = KafkaSinkConfig.class
+        name = "kafka",
+        type = IOType.SINK,
+        help = "The KafkaBytesSink is used for moving messages from Pulsar to Kafka.",
+        configClass = KafkaSinkConfig.class
 )
 @Slf4j
 public class KafkaBytesSink extends KafkaAbstractSink<String, byte[]> {

@@ -45,7 +45,7 @@ public class KafkaBytesSink extends KafkaAbstractSink<String, byte[]> {
     protected Properties beforeCreateProducer(Properties props) {
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        log.info("Created kafka producer config : {}", props);
+        log.info("Created kafka producer on : {}", props.getOrDefault(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ""));
         return props;
     }

Diff for: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java

+21 -20
@@ -49,22 +49,22 @@
 import org.apache.pulsar.io.core.annotations.IOType;

 /**
- * Kafka Source that transfers the data from Kafka to Pulsar and sets the Schema type properly.
- * We use the key and the value deserializer in order to decide the type of Schema to be set on the topic on Pulsar.
- * In case of KafkaAvroDeserializer we use the Schema Registry to download the schema and apply it to the topic.
- * Please refer to {@link #getSchemaFromDeserializerAndAdaptConfiguration(String, Properties, boolean)} for the list
- * of supported Deserializers.
- * If you set StringDeserializer for the key then we use the raw key as key for the Pulsar message.
- * If you set another Deserializer for the key we use the KeyValue schema type in Pulsar with the SEPARATED encoding.
- * This way the Key is stored in the Pulsar key, encoded as base64 string and with a Schema, the Value of the message
- * is stored in the Pulsar value with a Schema.
- * This way there is a one-to-one mapping between Kafka key/value pair and the Pulsar data model.
+ * Kafka Source that transfers the data from Kafka to Pulsar and sets the Schema type properly.
+ * We use the key and the value deserializer in order to decide the type of Schema to be set on the topic on Pulsar.
+ * In case of KafkaAvroDeserializer we use the Schema Registry to download the schema and apply it to the topic.
+ * Please refer to {@link #getSchemaFromDeserializerAndAdaptConfiguration(String, Properties, boolean)} for the list
+ * of supported Deserializers.
+ * If you set StringDeserializer for the key then we use the raw key as key for the Pulsar message.
+ * If you set another Deserializer for the key we use the KeyValue schema type in Pulsar with the SEPARATED encoding.
+ * This way the Key is stored in the Pulsar key, encoded as base64 string and with a Schema, the Value of the message
+ * is stored in the Pulsar value with a Schema.
+ * This way there is a one-to-one mapping between Kafka key/value pair and the Pulsar data model.
  */
 @Connector(
-        name = "kafka",
-        type = IOType.SOURCE,
-        help = "Transfer data from Kafka to Pulsar.",
-        configClass = KafkaSourceConfig.class
+        name = "kafka",
+        type = IOType.SOURCE,
+        help = "Transfer data from Kafka to Pulsar.",
+        configClass = KafkaSourceConfig.class
 )
 @Slf4j
 public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> {

@@ -78,15 +78,15 @@ public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> {
     protected Properties beforeCreateConsumer(Properties props) {
         props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        log.info("Created kafka consumer config : {}", props);
+        log.info("Created kafka consumer on : {}", props.getOrDefault(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ""));

         keySchema = getSchemaFromDeserializerAndAdaptConfiguration(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                 props, true);
         valueSchema = getSchemaFromDeserializerAndAdaptConfiguration(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 props, false);

         boolean needsSchemaCache = keySchema == DeferredSchemaPlaceholder.INSTANCE
-                || valueSchema == DeferredSchemaPlaceholder.INSTANCE;
+                || valueSchema == DeferredSchemaPlaceholder.INSTANCE;

         if (needsSchemaCache) {
             initSchemaCache(props);

@@ -174,8 +174,8 @@ private static Schema<ByteBuffer> getSchemaFromDeserializerAndAdaptConfiguration
         Schema<?> result;
         if (ByteArrayDeserializer.class.getName().equals(kafkaDeserializerClass)
-                || ByteBufferDeserializer.class.getName().equals(kafkaDeserializerClass)
-                || BytesDeserializer.class.getName().equals(kafkaDeserializerClass)) {
+                || ByteBufferDeserializer.class.getName().equals(kafkaDeserializerClass)
+                || BytesDeserializer.class.getName().equals(kafkaDeserializerClass)) {
             result = Schema.BYTEBUFFER;
         } else if (StringDeserializer.class.getName().equals(kafkaDeserializerClass)) {
             if (isKey) {

@@ -193,7 +193,7 @@ private static Schema<ByteBuffer> getSchemaFromDeserializerAndAdaptConfiguration
             result = Schema.INT64;
         } else if (ShortDeserializer.class.getName().equals(kafkaDeserializerClass)) {
             result = Schema.INT16;
-        } else if (KafkaAvroDeserializer.class.getName().equals(kafkaDeserializerClass)){
+        } else if (KafkaAvroDeserializer.class.getName().equals(kafkaDeserializerClass)) {
             // in this case we have to inject our custom deserializer
             // that extracts Avro schema information
             props.put(key, ExtractKafkaAvroSchemaDeserializer.class.getName());

@@ -238,7 +238,7 @@ public BytesWithKafkaSchema deserialize(String topic, byte[] payload) {
         }
     }

-    static final class DeferredSchemaPlaceholder extends ByteBufferSchemaWrapper {
+    static final class DeferredSchemaPlaceholder extends ByteBufferSchemaWrapper {
         DeferredSchemaPlaceholder() {
             super(SchemaInfoImpl
                 .builder()

@@ -247,6 +247,7 @@ static final class DeferredSchemaPlaceholder extends ByteBufferSchemaWrapper {
                 .schema(new byte[0])
                 .build());
         }
+
         static final DeferredSchemaPlaceholder INSTANCE = new DeferredSchemaPlaceholder();
     }
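The producer- and consumer-side statements differ only in which *Config class supplies the bootstrap-servers key. Both ProducerConfig.BOOTSTRAP_SERVERS_CONFIG and ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG resolve to the same "bootstrap.servers" constant from CommonClientConfigs, so a shared lookup could serve sinks and sources alike. A hypothetical sketch of that idea, not part of this commit; the class and method names are invented for illustration:

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;

final class BootstrapServers {
    private BootstrapServers() {
    }

    // Single lookup covering both KafkaProducer and KafkaConsumer configurations,
    // since ProducerConfig and ConsumerConfig reuse CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG.
    static Object describe(Properties props) {
        return props.getOrDefault(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "");
    }
}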
