Skip to content

Commit 1cee660

Browse files
authored
feat: handle ResendBlock block node responses (#18747)
Signed-off-by: Petar Tonev <[email protected]>
1 parent df28487 commit 1cee660

File tree

3 files changed

+174
-8
lines changed

3 files changed

+174
-8
lines changed

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java

+38-5
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,42 @@ private void handleEndOfStream(@NonNull PublishStreamResponse.EndOfStream endOfS
309309
endStreamAndRestartAtBlock(restartBlockNumber);
310310
}
311311

312+
private void handleResendBlock(@NonNull PublishStreamResponse.ResendBlock resendBlock) {
313+
final var resendBlockNumber = resendBlock.blockNumber();
314+
315+
logger.debug(
316+
"[{}] Received ResendBlock from block node {} for block {}",
317+
Thread.currentThread().getName(),
318+
connectionDescriptor,
319+
resendBlockNumber);
320+
321+
if (blockNodeConnectionManager.isBlockAlreadyAcknowledged(resendBlockNumber)) {
322+
logger.debug(
323+
"[{}] Block {} already acknowledged, skipping resend for block node {}",
324+
Thread.currentThread().getName(),
325+
resendBlockNumber,
326+
connectionDescriptor);
327+
return;
328+
}
329+
330+
final var lastVerifiedBlockNumber = blockNodeConnectionManager.getLastVerifiedBlock(blockNodeConfig);
331+
// Check whether the resend block number is the next block after the last verified one
332+
if (resendBlockNumber == lastVerifiedBlockNumber + 1L) {
333+
logger.debug(
334+
"[{}] Restarting stream at the next block {} after the last verified one for block node {}",
335+
Thread.currentThread().getName(),
336+
resendBlockNumber,
337+
connectionDescriptor);
338+
endStreamAndRestartAtBlock(resendBlockNumber);
339+
} else {
340+
logger.warn(
341+
"[{}] Received ResendBlock for block {} but last verified block is {}",
342+
Thread.currentThread().getName(),
343+
resendBlockNumber,
344+
lastVerifiedBlockNumber);
345+
}
346+
}
347+
312348
private void removeFromActiveConnections(BlockNodeConfig node) {
313349
blockNodeConnectionManager.disconnectFromNode(node);
314350
}
@@ -506,14 +542,11 @@ public void onNext(@NonNull PublishStreamResponse response) {
506542
handleEndOfStream(response.endStream());
507543
} else if (response.hasSkipBlock()) {
508544
logger.debug(
509-
"Received SkipBlock from Block Node {} Block #{}",
545+
"Received SkipBlock from block node {} Block #{}",
510546
connectionDescriptor,
511547
response.skipBlock().blockNumber());
512548
} else if (response.hasResendBlock()) {
513-
logger.debug(
514-
"Received ResendBlock from Block Node {} Block #{}",
515-
connectionDescriptor,
516-
response.resendBlock().blockNumber());
549+
handleResendBlock(response.resendBlock());
517550
}
518551
}
519552

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ public void updateLastVerifiedBlock(
315315
@NonNull final BlockNodeConfig blockNodeConfig, @Nullable final Long blockNumber) {
316316
requireNonNull(blockNodeConfig);
317317

318-
final Long latestBlock = lastVerifiedBlockPerConnection.computeIfAbsent(blockNodeConfig, key -> -1L);
318+
final Long latestBlock = getLastVerifiedBlock(blockNodeConfig);
319319
if (blockNumber != null && blockNumber > latestBlock) {
320320
lastVerifiedBlockPerConnection.put(blockNodeConfig, blockNumber);
321321
} else {
@@ -326,4 +326,23 @@ public void updateLastVerifiedBlock(
326326
latestBlock);
327327
}
328328
}
329+
330+
/**
331+
* @param blockNodeConfig the configuration for the block node
332+
* @return the last verified block number by the given block node.
333+
*/
334+
public Long getLastVerifiedBlock(@NonNull final BlockNodeConfig blockNodeConfig) {
335+
requireNonNull(blockNodeConfig);
336+
return lastVerifiedBlockPerConnection.computeIfAbsent(blockNodeConfig, key -> -1L);
337+
}
338+
339+
/**
340+
* @param blockNumber the block number to check for acknowledgements
341+
* @return whether the block has been acknowledged by any connection.
342+
*/
343+
public boolean isBlockAlreadyAcknowledged(@NonNull final Long blockNumber) {
344+
requireNonNull(blockNumber);
345+
return lastVerifiedBlockPerConnection.values().stream()
346+
.anyMatch(verifiedBlock -> verifiedBlock.equals(blockNumber));
347+
}
329348
}

hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java

+116-2
Original file line numberDiff line numberDiff line change
@@ -826,16 +826,130 @@ void testOnNext_WithEndOfStream() {
826826
connectionSpy.onNext(response);
827827

828828
// Assert connection restarts after the last verified block number
829-
verify(connectionSpy).endStreamAndRestartAtBlock(endOfStream.blockNumber() + 1L);
829+
verify(connectionSpy, times(1)).endStreamAndRestartAtBlock(endOfStream.blockNumber() + 1L);
830+
verify(connectionSpy, times(1)).close();
831+
verify(connectionSpy, times(1)).setCurrentBlockNumber(endOfStream.blockNumber() + 1L);
832+
verify(connectionSpy, times(1)).establishStream();
833+
834+
assertEquals(endOfStream.blockNumber() + 1L, connection.getCurrentBlockNumber());
835+
assertEquals(0, connection.getCurrentRequestIndex());
830836

831837
// Verify log messages for end of stream
832-
final String expectedLog = "Received EndOfStream from block node";
838+
final String expectedLog =
839+
"Received EndOfStream from block node " + TEST_CONNECTION_DESCRIPTOR + " at block " + TEST_BLOCK_NUMBER;
833840
assertTrue(
834841
logCaptor.debugLogs().stream().anyMatch(log -> log.contains(expectedLog))
835842
|| logCaptor.errorLogs().stream().anyMatch(log -> log.contains(expectedLog)),
836843
"Expected log message not found: " + expectedLog);
837844
}
838845

846+
/**
847+
* Tests the onNext method handling a PublishStreamResponse
848+
* with a ResendBlock for the next block after the last verified one.
849+
*/
850+
@Test
851+
void testOnNext_WithResendBlock() {
852+
// Arrange
853+
final BlockNodeConnection connectionSpy = spy(connection);
854+
final PublishStreamResponse.ResendBlock resendBlock = PublishStreamResponse.ResendBlock.newBuilder()
855+
.blockNumber(TEST_BLOCK_NUMBER)
856+
.build();
857+
final PublishStreamResponse response =
858+
PublishStreamResponse.newBuilder().resendBlock(resendBlock).build();
859+
860+
when(connectionManager.getLastVerifiedBlock(blockNodeConfig)).thenReturn(TEST_BLOCK_NUMBER - 1L);
861+
// Act
862+
connectionSpy.onNext(response);
863+
864+
// Assert connection restarts after the last verified block number
865+
verify(connectionSpy, times(1)).endStreamAndRestartAtBlock(resendBlock.blockNumber());
866+
verify(connectionSpy, times(1)).close();
867+
verify(connectionSpy, times(1)).setCurrentBlockNumber(resendBlock.blockNumber());
868+
verify(connectionSpy, times(1)).establishStream();
869+
870+
assertEquals(resendBlock.blockNumber(), connection.getCurrentBlockNumber());
871+
assertEquals(0, connection.getCurrentRequestIndex());
872+
873+
// Verify log messages for resend block
874+
final String expectedLog = "Received ResendBlock from block node " + TEST_CONNECTION_DESCRIPTOR + " for block "
875+
+ TEST_BLOCK_NUMBER;
876+
assertTrue(
877+
logCaptor.debugLogs().stream().anyMatch(log -> log.contains(expectedLog)),
878+
"Expected log message not found: " + expectedLog);
879+
final String expectedLogForCorrectResendBlock = "Restarting stream at the next block " + TEST_BLOCK_NUMBER
880+
+ " after the last verified one for block node " + TEST_CONNECTION_DESCRIPTOR;
881+
assertTrue(
882+
logCaptor.debugLogs().stream().anyMatch(log -> log.contains(expectedLogForCorrectResendBlock)),
883+
"Expected log message not found: " + expectedLogForCorrectResendBlock);
884+
}
885+
886+
/**
887+
* Tests the onNext method handling a PublishStreamResponse
888+
* with a ResendBlock for the next block after the last verified one.
889+
*/
890+
@Test
891+
void testOnNext_WithResendBlockDifferentThanExpected() {
892+
final var lastVerifiedBlockNumber = TEST_BLOCK_NUMBER * 2L;
893+
894+
// Arrange
895+
final BlockNodeConnection connectionSpy = spy(connection);
896+
final PublishStreamResponse.ResendBlock resendBlock = PublishStreamResponse.ResendBlock.newBuilder()
897+
.blockNumber(TEST_BLOCK_NUMBER)
898+
.build();
899+
final PublishStreamResponse response =
900+
PublishStreamResponse.newBuilder().resendBlock(resendBlock).build();
901+
902+
when(connectionManager.getLastVerifiedBlock(blockNodeConfig)).thenReturn(lastVerifiedBlockNumber);
903+
904+
// Act
905+
connectionSpy.onNext(response);
906+
907+
// Verify log messages for resend block
908+
final String expectedLog = "Received ResendBlock from block node " + TEST_CONNECTION_DESCRIPTOR + " for block "
909+
+ TEST_BLOCK_NUMBER;
910+
assertTrue(
911+
logCaptor.debugLogs().stream().anyMatch(log -> log.contains(expectedLog)),
912+
"Expected log message not found: " + expectedLog);
913+
final String expectedLogForDifferentResendBlock = "Received ResendBlock for block " + TEST_BLOCK_NUMBER
914+
+ " but last verified block is " + lastVerifiedBlockNumber;
915+
assertTrue(
916+
logCaptor.warnLogs().stream().anyMatch(log -> log.contains(expectedLogForDifferentResendBlock)),
917+
"Expected log message not found: " + expectedLogForDifferentResendBlock);
918+
}
919+
920+
/**
921+
* Tests the onNext method handling a PublishStreamResponse
922+
* with a ResendBlock for already acknowledged block.
923+
*/
924+
@Test
925+
void testOnNext_WithResendBlockForAlreadyAcknowledgedBlock() {
926+
// Arrange
927+
final BlockNodeConnection connectionSpy = spy(connection);
928+
final PublishStreamResponse.ResendBlock resendBlock = PublishStreamResponse.ResendBlock.newBuilder()
929+
.blockNumber(TEST_BLOCK_NUMBER)
930+
.build();
931+
final PublishStreamResponse response =
932+
PublishStreamResponse.newBuilder().resendBlock(resendBlock).build();
933+
934+
when(connectionManager.isBlockAlreadyAcknowledged(TEST_BLOCK_NUMBER)).thenReturn(true);
935+
936+
// Act
937+
connectionSpy.onNext(response);
938+
939+
// Verify log messages for resend block
940+
final String expectedLog = "Received ResendBlock from block node " + TEST_CONNECTION_DESCRIPTOR + " for block "
941+
+ TEST_BLOCK_NUMBER;
942+
assertTrue(
943+
logCaptor.debugLogs().stream().anyMatch(log -> log.contains(expectedLog)),
944+
"Expected log message not found: " + expectedLog);
945+
final String expectedLogForResendBlockAlreadyAcknowledged = "Block " + TEST_BLOCK_NUMBER
946+
+ " already acknowledged, skipping resend for block node " + TEST_CONNECTION_DESCRIPTOR;
947+
assertTrue(
948+
logCaptor.debugLogs().stream()
949+
.anyMatch(log -> log.contains(expectedLogForResendBlockAlreadyAcknowledged)),
950+
"Expected log message not found: " + expectedLogForResendBlockAlreadyAcknowledged);
951+
}
952+
839953
/**
840954
* Tests the onError method when an error is received from the stream.
841955
*/

0 commit comments

Comments
 (0)