Skip to content

Commit a09ead0

Browse files
committed
fix kafka plugin
Signed-off-by: Yupeng Fu <[email protected]>
1 parent f267858 commit a09ead0

File tree

2 files changed

+5
-14
lines changed

2 files changed

+5
-14
lines changed

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -125,25 +125,17 @@ public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(
125125
long maxMessages,
126126
int timeoutMillis
127127
) throws TimeoutException {
128-
List<ReadResult<KafkaOffset, KafkaMessage>> records =
129-
AccessController.doPrivileged((PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(offset.getOffset(),
128+
List<ReadResult<KafkaOffset, KafkaMessage>> records = AccessController.doPrivileged(
129+
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(
130+
offset.getOffset(),
130131
includeStart,
131132
maxMessages,
132133
timeoutMillis
133-
));
134+
)
135+
);
134136
return records;
135137
}
136138

137-
@Override
138-
public KafkaOffset nextPointer() {
139-
return new KafkaOffset(lastFetchedOffset + 1);
140-
}
141-
142-
@Override
143-
public KafkaOffset nextPointer(KafkaOffset pointer) {
144-
return new KafkaOffset(pointer.getOffset() + 1);
145-
}
146-
147139
@Override
148140
public IngestionShardPointer earliestPointer() {
149141
long startOffset = AccessController.doPrivileged(

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public void testReadNext() throws Exception {
6565

6666
assertEquals(1, result.size());
6767
assertEquals("message", new String(result.get(0).getMessage().getPayload(), StandardCharsets.UTF_8));
68-
assertEquals(1, consumer.nextPointer().getOffset());
6968
assertEquals(0, consumer.getShardId());
7069
assertEquals("client1", consumer.getClientId());
7170
}

0 commit comments

Comments
 (0)