Skip to content

Commit 6fb8119

Browse files
committed
Merge remote-tracking branch 'origin/master' into issues/753
2 parents 5539128 + c2be45f commit 6fb8119

File tree

4 files changed

+18
-0
lines changed

4 files changed

+18
-0
lines changed

kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.apache.kafka.clients.consumer.ConsumerRecord;
1818
import org.apache.kafka.clients.consumer.KafkaConsumer;
1919
import org.apache.kafka.common.TopicPartition;
20+
import org.apache.kafka.common.errors.InterruptException;
2021
import org.apache.kafka.common.utils.Bytes;
2122
import reactor.core.publisher.FluxSink;
2223

@@ -85,6 +86,9 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
8586
}
8687
sendFinishStatsAndCompleteSink(sink);
8788
log.debug("Polling finished");
89+
} catch (InterruptException kafkaInterruptException) {
90+
log.debug("Polling finished due to thread interruption");
91+
sink.complete();
8892
} catch (Exception e) {
8993
log.error("Error occurred while consuming records", e);
9094
sink.error(e);

kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.kafka.clients.consumer.ConsumerRecord;
1010
import org.apache.kafka.clients.consumer.ConsumerRecords;
1111
import org.apache.kafka.clients.consumer.KafkaConsumer;
12+
import org.apache.kafka.common.errors.InterruptException;
1213
import org.apache.kafka.common.utils.Bytes;
1314
import reactor.core.publisher.FluxSink;
1415

@@ -59,6 +60,9 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
5960
}
6061
sendFinishStatsAndCompleteSink(sink);
6162
log.debug("Polling finished");
63+
} catch (InterruptException kafkaInterruptException) {
64+
log.debug("Polling finished due to thread interruption");
65+
sink.complete();
6266
} catch (Exception e) {
6367
log.error("Error occurred while consuming records", e);
6468
sink.error(e);

kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
4141
sink.complete();
4242
log.debug("Tailing finished");
4343
} catch (InterruptException kafkaInterruptException) {
44+
log.debug("Tailing finished due to thread interruption");
4445
sink.complete();
4546
} catch (Exception e) {
4647
log.error("Error consuming {}", consumerPosition, e);

kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/Filters.tsx

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,15 @@ const Filters: React.FC<FiltersProps> = ({
219219
default:
220220
props.seekType = currentSeekType;
221221
}
222+
223+
if (offset && currentSeekType === SeekType.OFFSET) {
224+
props.seekType = SeekType.OFFSET;
225+
}
226+
227+
if (timestamp && currentSeekType === SeekType.TIMESTAMP) {
228+
props.seekType = SeekType.TIMESTAMP;
229+
}
230+
222231
props.seekTo = selectedPartitions.map(({ value }) => {
223232
const offsetProperty =
224233
seekDirection === SeekDirection.FORWARD ? 'offsetMin' : 'offsetMax';

0 commit comments

Comments
 (0)