Skip to content

Commit 95ea39f

Browse files
authored
fix: 11507 Enabled logging exceptions processed by StandardWorkGroup into System.err. (#12262)
Signed-off-by: Ivan Malygin <[email protected]>
1 parent 653f57f commit 95ea39f

File tree

7 files changed

+150
-66
lines changed

7 files changed

+150
-66
lines changed

platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/LearningSynchronizer.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ private <T> MerkleNode receiveTree(final MerkleNode root) throws InterruptedExce
268268
return false;
269269
};
270270
final StandardWorkGroup workGroup =
271-
new StandardWorkGroup(threadManager, WORK_GROUP_NAME, breakConnection, reconnectExceptionListener);
271+
createStandardWorkGroup(threadManager, breakConnection, reconnectExceptionListener);
272272

273273
final LearnerTreeView<T> view;
274274
if (root == null || !root.hasCustomReconnectView()) {
@@ -329,6 +329,13 @@ private <T> MerkleNode receiveTree(final MerkleNode root) throws InterruptedExce
329329
return view.getMerkleRoot(reconstructedRoot.get());
330330
}
331331

332+
protected StandardWorkGroup createStandardWorkGroup(
333+
ThreadManager threadManager,
334+
Runnable breakConnection,
335+
Function<Throwable, Boolean> reconnectExceptionListener) {
336+
return new StandardWorkGroup(threadManager, WORK_GROUP_NAME, breakConnection, reconnectExceptionListener);
337+
}
338+
332339
/**
333340
* Build the output stream. Exposed to allow unit tests to override implementation to simulate latency.
334341
*/

platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/TeachingSynchronizer.java

+25-21
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.Objects;
4040
import java.util.Queue;
4141
import java.util.concurrent.atomic.AtomicReference;
42+
import java.util.function.Function;
4243
import org.apache.logging.log4j.LogManager;
4344
import org.apache.logging.log4j.Logger;
4445

@@ -151,28 +152,26 @@ private <T> void sendTree(final MerkleNode root, final TeacherTreeView<T> view)
151152

152153
final AtomicReference<Throwable> firstReconnectException = new AtomicReference<>();
153154
// A future improvement might be to reuse threads between subtrees.
154-
final StandardWorkGroup workGroup =
155-
new StandardWorkGroup(threadManager, WORK_GROUP_NAME, breakConnection, ex -> {
156-
Throwable cause = ex;
157-
while (cause != null) {
158-
if (cause instanceof SocketException socketEx) {
159-
if (socketEx.getMessage().equalsIgnoreCase("Connection reset by peer")) {
160-
// Connection issues during reconnects are expected and recoverable, just
161-
// log them as info. All other exceptions should be treated as real errors
162-
logger.info(
163-
RECONNECT.getMarker(),
164-
"Connection reset while sending tree at {} with route {}. Aborting",
165-
root == null ? null : root.getClass().getName(),
166-
root == null ? "[]" : root.getRoute());
167-
return true;
168-
}
169-
}
170-
cause = cause.getCause();
155+
final StandardWorkGroup workGroup = createStandardWorkGroup(threadManager, breakConnection, cause -> {
156+
while (cause != null) {
157+
if (cause instanceof SocketException socketEx) {
158+
if (socketEx.getMessage().equalsIgnoreCase("Connection reset by peer")) {
159+
// Connection issues during reconnects are expected and recoverable, just
160+
// log them as info. All other exceptions should be treated as real errors
161+
logger.info(
162+
RECONNECT.getMarker(),
163+
"Connection reset while sending tree at {} with route {}. Aborting",
164+
root == null ? null : root.getClass().getName(),
165+
root == null ? "[]" : root.getRoute());
166+
return true;
171167
}
172-
firstReconnectException.compareAndSet(null, ex);
173-
// Let StandardWorkGroup log it as an error using the EXCEPTION marker
174-
return false;
175-
});
168+
}
169+
cause = cause.getCause();
170+
}
171+
firstReconnectException.compareAndSet(null, cause);
172+
// Let StandardWorkGroup log it as an error using the EXCEPTION marker
173+
return false;
174+
});
176175

177176
view.startTeacherTasks(this, time, workGroup, inputStream, outputStream, subtrees);
178177

@@ -191,6 +190,11 @@ private <T> void sendTree(final MerkleNode root, final TeacherTreeView<T> view)
191190
logger.info(RECONNECT.getMarker(), "finished sending tree");
192191
}
193192

193+
protected StandardWorkGroup createStandardWorkGroup(
194+
ThreadManager threadManager, Runnable breakConnection, Function<Throwable, Boolean> exceptionListener) {
195+
return new StandardWorkGroup(threadManager, WORK_GROUP_NAME, breakConnection, exceptionListener);
196+
}
197+
194198
/**
195199
* Build the output stream. Exposed to allow unit tests to override implementation to simulate latency.
196200
*/

platform-sdk/swirlds-common/src/main/java/com/swirlds/common/threading/pool/StandardWorkGroup.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class StandardWorkGroup {
4545
private static final String DEFAULT_TASK_NAME = "IDLE";
4646

4747
private final String groupName;
48+
private final boolean logExceptionsToStdErr;
4849
private final ExecutorService executorService;
4950

5051
private final ConcurrentFuturePool<Void> futures;
@@ -56,7 +57,7 @@ public class StandardWorkGroup {
5657
private final Function<Throwable, Boolean> exceptionListener;
5758

5859
public StandardWorkGroup(final ThreadManager threadManager, final String groupName, final Runnable abortAction) {
59-
this(threadManager, groupName, abortAction, null);
60+
this(threadManager, groupName, abortAction, null, false);
6061
}
6162

6263
/**
@@ -83,7 +84,17 @@ public StandardWorkGroup(
8384
final String groupName,
8485
final Runnable abortAction,
8586
final Function<Throwable, Boolean> exceptionListener) {
87+
this(threadManager, groupName, abortAction, exceptionListener, false);
88+
}
89+
90+
public StandardWorkGroup(
91+
final ThreadManager threadManager,
92+
final String groupName,
93+
final Runnable abortAction,
94+
final Function<Throwable, Boolean> exceptionListener,
95+
final boolean logExceptionsToStdErr) {
8696
this.groupName = groupName;
97+
this.logExceptionsToStdErr = logExceptionsToStdErr;
8798
this.futures = new ConcurrentFuturePool<>(this::handleError);
8899

89100
this.abortAction = abortAction;
@@ -177,6 +188,10 @@ public void handleError(final Throwable ex) {
177188
}
178189
if (!exceptionHandled) {
179190
logger.error(EXCEPTION.getMarker(), "Work Group Exception [ groupName = {} ]", groupName, ex);
191+
// Log to stderr for testing purposes
192+
if (logExceptionsToStdErr) {
193+
ex.printStackTrace(System.err);
194+
}
180195
}
181196

182197
hasExceptions = true;

platform-sdk/swirlds-common/src/test/java/com/swirlds/common/threading/pool/StandardWorkGroupTest.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,15 @@ void executeTasksWithName() throws InterruptedException {
158158
@Test
159159
void exceptionsPropagatedToListener() throws InterruptedException {
160160
final AtomicReference<Throwable> caught = new AtomicReference<>();
161-
subject = new StandardWorkGroup(getStaticThreadManager(), "groupName", abortCount::incrementAndGet, ex -> {
162-
caught.set(ex);
163-
return true;
164-
});
161+
subject = new StandardWorkGroup(
162+
getStaticThreadManager(),
163+
"groupName",
164+
abortCount::incrementAndGet,
165+
ex -> {
166+
caught.set(ex);
167+
return true;
168+
},
169+
true);
165170

166171
final String exceptionMessage = "ExceptionMessage";
167172
subject.execute("task", () -> {

platform-sdk/swirlds-common/src/testFixtures/java/com/swirlds/common/test/fixtures/merkle/util/MerkleTestUtils.java

+91-33
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.swirlds.common.test.fixtures.merkle.dummy.DummyMerkleLeaf2;
4444
import com.swirlds.common.test.fixtures.merkle.dummy.DummyMerkleNode;
4545
import com.swirlds.common.test.fixtures.platform.TestPlatformContextBuilder;
46+
import com.swirlds.common.threading.manager.ThreadManager;
4647
import com.swirlds.common.threading.pool.StandardWorkGroup;
4748
import java.io.ByteArrayOutputStream;
4849
import java.io.IOException;
@@ -990,51 +991,108 @@ public static <T extends MerkleNode> T testSynchronization(
990991
final TeachingSynchronizer teacher;
991992

992993
if (latencyMilliseconds == 0) {
993-
learner = new LearningSynchronizer(
994-
getStaticThreadManager(),
995-
streams.getLearnerInput(),
996-
streams.getLearnerOutput(),
997-
startingTree,
998-
streams::disconnect,
999-
reconnectConfig);
994+
learner =
995+
new LearningSynchronizer(
996+
getStaticThreadManager(),
997+
streams.getLearnerInput(),
998+
streams.getLearnerOutput(),
999+
startingTree,
1000+
streams::disconnect,
1001+
reconnectConfig) {
1002+
1003+
@Override
1004+
protected StandardWorkGroup createStandardWorkGroup(
1005+
ThreadManager threadManager,
1006+
Runnable breakConnection,
1007+
Function<Throwable, Boolean> reconnectExceptionListener) {
1008+
return new StandardWorkGroup(
1009+
threadManager,
1010+
"test-learning-synchronizer",
1011+
breakConnection,
1012+
reconnectExceptionListener,
1013+
true);
1014+
}
1015+
};
10001016
final PlatformContext platformContext =
10011017
TestPlatformContextBuilder.create().build();
1002-
teacher = new TeachingSynchronizer(
1003-
platformContext.getConfiguration(),
1004-
Time.getCurrent(),
1005-
getStaticThreadManager(),
1006-
streams.getTeacherInput(),
1007-
streams.getTeacherOutput(),
1008-
desiredTree,
1009-
streams::disconnect,
1010-
reconnectConfig);
1018+
teacher =
1019+
new TeachingSynchronizer(
1020+
platformContext.getConfiguration(),
1021+
Time.getCurrent(),
1022+
getStaticThreadManager(),
1023+
streams.getTeacherInput(),
1024+
streams.getTeacherOutput(),
1025+
desiredTree,
1026+
streams::disconnect,
1027+
reconnectConfig) {
1028+
@Override
1029+
protected StandardWorkGroup createStandardWorkGroup(
1030+
ThreadManager threadManager,
1031+
Runnable breakConnection,
1032+
Function<Throwable, Boolean> exceptionListener) {
1033+
return new StandardWorkGroup(
1034+
threadManager,
1035+
"test-teaching-synchronizer",
1036+
breakConnection,
1037+
exceptionListener,
1038+
true);
1039+
}
1040+
};
10111041
} else {
1012-
learner = new LaggingLearningSynchronizer(
1013-
streams.getLearnerInput(),
1014-
streams.getLearnerOutput(),
1015-
startingTree,
1016-
latencyMilliseconds,
1017-
streams::disconnect,
1018-
reconnectConfig);
1042+
learner =
1043+
new LaggingLearningSynchronizer(
1044+
streams.getLearnerInput(),
1045+
streams.getLearnerOutput(),
1046+
startingTree,
1047+
latencyMilliseconds,
1048+
streams::disconnect,
1049+
reconnectConfig) {
1050+
@Override
1051+
protected StandardWorkGroup createStandardWorkGroup(
1052+
ThreadManager threadManager,
1053+
Runnable breakConnection,
1054+
Function<Throwable, Boolean> reconnectExceptionListener) {
1055+
return new StandardWorkGroup(
1056+
threadManager,
1057+
"test-learning-synchronizer",
1058+
breakConnection,
1059+
reconnectExceptionListener,
1060+
true);
1061+
}
1062+
};
10191063
final PlatformContext platformContext =
10201064
TestPlatformContextBuilder.create().build();
1021-
teacher = new LaggingTeachingSynchronizer(
1022-
platformContext,
1023-
streams.getTeacherInput(),
1024-
streams.getTeacherOutput(),
1025-
desiredTree,
1026-
latencyMilliseconds,
1027-
streams::disconnect,
1028-
reconnectConfig);
1065+
teacher =
1066+
new LaggingTeachingSynchronizer(
1067+
platformContext,
1068+
streams.getTeacherInput(),
1069+
streams.getTeacherOutput(),
1070+
desiredTree,
1071+
latencyMilliseconds,
1072+
streams::disconnect,
1073+
reconnectConfig) {
1074+
@Override
1075+
protected StandardWorkGroup createStandardWorkGroup(
1076+
ThreadManager threadManager,
1077+
Runnable breakConnection,
1078+
Function<Throwable, Boolean> reconnectExceptionListener) {
1079+
return new StandardWorkGroup(
1080+
threadManager,
1081+
"test-teaching-synchronizer",
1082+
breakConnection,
1083+
reconnectExceptionListener,
1084+
true);
1085+
}
1086+
};
10291087
}
10301088

10311089
final AtomicReference<Throwable> firstReconnectException = new AtomicReference<>();
10321090
final Function<Throwable, Boolean> exceptionListener = t -> {
10331091
firstReconnectException.compareAndSet(null, t);
10341092
return false;
10351093
};
1036-
final StandardWorkGroup workGroup =
1037-
new StandardWorkGroup(getStaticThreadManager(), "synchronization-test", null, exceptionListener);
1094+
final StandardWorkGroup workGroup = new StandardWorkGroup(
1095+
getStaticThreadManager(), "synchronization-test", null, exceptionListener, true);
10381096
workGroup.execute("teaching-synchronizer-main", () -> teachingSynchronizerThread(teacher));
10391097
workGroup.execute("learning-synchronizer-main", () -> learningSynchronizerThread(learner));
10401098

platform-sdk/swirlds-merkle/src/test/java/com/swirlds/virtual/merkle/reconnect/VirtualMapLargeReconnectTest.java

-5
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.List;
2727
import java.util.Random;
2828
import java.util.stream.Stream;
29-
import org.junit.jupiter.api.Disabled;
3029
import org.junit.jupiter.api.DisplayName;
3130
import org.junit.jupiter.api.Tag;
3231
import org.junit.jupiter.api.Tags;
@@ -44,8 +43,6 @@ class VirtualMapLargeReconnectTest extends VirtualMapReconnectTestBase {
4443
@Tags({@Tag("VirtualMerkle"), @Tag("Reconnect"), @Tag("VMAP-003"), @Tag("VMAP-003.14")})
4544
@Tag(TIME_CONSUMING)
4645
@DisplayName("Permutations of very large trees reconnecting")
47-
// FUTURE WORK: https://github.com/hashgraph/hedera-services/issues/11507
48-
@Disabled
4946
void largeTeacherLargerLearnerPermutations(int teacherStart, int teacherEnd, int learnerStart, int learnerEnd) {
5047

5148
for (int i = teacherStart; i < teacherEnd; i++) {
@@ -64,8 +61,6 @@ void largeTeacherLargerLearnerPermutations(int teacherStart, int teacherEnd, int
6461
@Tags({@Tag("VirtualMerkle"), @Tag("Reconnect"), @Tag("VMAP-005"), @Tag("VMAP-006")})
6562
@Tag(TIME_CONSUMING)
6663
@DisplayName("Reconnect aborts 3 times before success")
67-
// FUTURE WORK: https://github.com/hashgraph/hedera-services/issues/11507
68-
@Disabled
6964
void multipleAbortedReconnectsCanSucceed(int teacherStart, int teacherEnd, int learnerStart, int learnerEnd) {
7065
for (int i = teacherStart; i < teacherEnd; i++) {
7166
teacherMap.put(new TestKey(i), new TestValue(i));

platform-sdk/swirlds-merkle/src/test/java/com/swirlds/virtual/merkle/reconnect/VirtualMapReconnectTestBase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public class VirtualMapReconnectTestBase {
8585

8686
// Custom reconnect config to make tests with timeouts faster
8787
protected static ReconnectConfig reconnectConfig = new TestConfigBuilder()
88-
.withValue("reconnect.asyncStreamTimeout", "5s")
88+
.withValue("reconnect.asyncStreamTimeout", "20s")
8989
.withValue("reconnect.maxAckDelay", "1000ms")
9090
.getOrCreateConfig()
9191
.getConfigData(ReconnectConfig.class);

0 commit comments

Comments
 (0)