Skip to content

Commit 529b2fe

Browse files
chore: Write marker files for sidecars (#18916)
Signed-off-by: Neeharika-Sompalli <[email protected]>
1 parent 30f623e commit 529b2fe

File tree

11 files changed

+145
-34
lines changed

11 files changed

+145
-34
lines changed

hedera-node/hapi-utils/src/main/java/com/hedera/node/app/hapi/utils/exports/recordstreaming/RecordStreamingUtils.java

+4
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ public static boolean isRecordFile(final String file) {
141141
return isRelevant(file) && !file.contains(SIDECAR_ONLY_TOKEN);
142142
}
143143

144+
public static boolean isSidecarMarkerFile(final String file) {
145+
return file.contains(SIDECAR_ONLY_TOKEN) && file.contains(".mf");
146+
}
147+
144148
public static boolean isSidecarFile(final String file) {
145149
return isRelevant(file) && file.contains(SIDECAR_ONLY_TOKEN);
146150
}

hedera-node/hapi-utils/src/test/resources/forensics/CaseOfTheAbsentResult/node0/sidecar/2022-12-05T14_23_46.192841556Z_01.mf

Whitespace-only changes.

hedera-node/hedera-app/src/main/java/com/hedera/node/app/records/impl/producers/formats/v6/BlockRecordWriterV6.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -376,12 +376,14 @@ private void closeSidecarFileWriter() {
376376
sidecarFileWriter.close();
377377
// get the sidecar hash
378378
final Bytes sidecarHash = sidecarFileWriter.fileHash();
379-
// create and add sidecar metadata to record file
379+
// create and add sidecar metadata to the record file
380380
if (sidecarMetadata == null) sidecarMetadata = new ArrayList<>();
381381
sidecarMetadata.add(new SidecarMetadata(
382382
new HashObject(HashAlgorithm.SHA_384, (int) sidecarHash.length(), sidecarHash),
383383
sidecarFileWriter.id(),
384384
sidecarFileWriter.types()));
385+
// Add a marker file for the sidecar when it is closed
386+
writeSidecarMarkerFile();
385387
}
386388
} catch (final IOException e) {
387389
// NOTE: Writing sidecar files really is best-effort, if it doesn't happen, we're OK with just logging the
@@ -390,6 +392,25 @@ private void closeSidecarFileWriter() {
390392
}
391393
}
392394

395+
/**
396+
* Write a marker file for the sidecar file.
397+
* This is used to indicate that the sidecar file has been closed and is ready to be uploaded.
398+
* The marker file is named the same as the sidecar file, but with a .mf extension.
399+
* The contents of the marker file are the hash of the sidecar file.
400+
*
401+
* @throws IOException if there was a problem writing the marker file
402+
*/
403+
private void writeSidecarMarkerFile() throws IOException {
404+
final Path sidecarPath = getSidecarFilePath(sidecarFileWriter.id());
405+
final Path markerPath =
406+
sidecarPath.resolveSibling(sidecarPath.getFileName().toString().replace(".rcd.gz", ".mf"));
407+
if (Files.exists(markerPath)) {
408+
logger.debug("Side‑car marker already exists: {}", markerPath);
409+
} else {
410+
Files.createFile(markerPath);
411+
}
412+
}
413+
393414
/**
394415
* Get the record file path for a record file with the given consensus time
395416
*

hedera-node/hedera-app/src/test/java/com/hedera/node/app/workflows/handle/record/impl/producers/formats/v6/BlockRecordWriterV6Test.java

+17
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,11 @@ void writingTest(final List<SingleTransactionRecord> singleTransactionRecords) t
360360

361361
// Check that the sidecar file exists (if the block records produced any sidecars)
362362
final var sidecarPath = recordPath.getParent().resolve("sidecar/2018-08-24T16_25_42.000000890Z_01.rcd.gz");
363+
final var sidecarMarker = recordPath.getParent().resolve("sidecar/2018-08-24T16_25_42.000000890Z_01.mf");
363364
assertThat(Files.exists(sidecarPath)).isEqualTo(hasSidecars);
365+
assertThat(Files.exists(sidecarMarker)).isEqualTo(hasSidecars);
366+
assertThat(anyMarkerFilesExist(recordPath.getParent().resolve("sidecar")))
367+
.isEqualTo(hasSidecars);
364368
}
365369

366370
@Test
@@ -413,6 +417,8 @@ void badSidecarsIsNotFatal() throws IOException {
413417

414418
assertThat(logCaptor.warnLogs()).hasSizeGreaterThan(0);
415419
assertThat(logCaptor.warnLogs()).allMatch(msg -> msg.contains("sidecar"));
420+
assertThat(anyMarkerFilesExist(recordPath.getParent().resolve("sidecar")))
421+
.isFalse();
416422
}
417423
}
418424

@@ -463,6 +469,17 @@ void cannotWriteToRecordFile() throws IOException {
463469
assertThat(logCaptor.warnLogs())
464470
.matches(logs -> logs.getFirst().contains("Error closing sidecar file")
465471
&& logs.getLast().contains("Error closing record file"));
472+
assertThat(anyMarkerFilesExist(recordPath.getParent().resolve("sidecar")))
473+
.isFalse();
474+
}
475+
}
476+
477+
private boolean anyMarkerFilesExist(Path dir) {
478+
if (!dir.getFileSystem().isOpen() || Files.notExists(dir)) return false;
479+
try (Stream<Path> paths = Files.walk(dir, 2)) {
480+
return paths.anyMatch(p -> p.toString().endsWith(".mf"));
481+
} catch (IOException e) {
482+
throw new UncheckedIOException(e);
466483
}
467484
}
468485
}

hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/support/BlockStreamAccess.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package com.hedera.services.bdd.junit.support;
33

4+
import static com.hedera.node.app.hapi.utils.exports.recordstreaming.RecordStreamingUtils.SIDECAR_ONLY_TOKEN;
45
import static java.util.Comparator.comparing;
56

67
import com.hedera.hapi.block.stream.Block;
@@ -275,6 +276,8 @@ public static long extractBlockNumber(@NonNull final String fileName) {
275276
* @return true if the file is a block marker file, false otherwise
276277
*/
277278
public static boolean isBlockMarkerFile(@NonNull final File file) {
278-
return file.isFile() && file.getName().endsWith(".mf");
279+
return file.isFile()
280+
&& file.getName().endsWith(".mf")
281+
&& !file.getName().contains(SIDECAR_ONLY_TOKEN);
279282
}
280283
}

hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/support/StreamFileAlterationListener.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
package com.hedera.services.bdd.junit.support;
33

44
import static com.hedera.node.app.hapi.utils.exports.recordstreaming.RecordStreamingUtils.isRecordFile;
5-
import static com.hedera.node.app.hapi.utils.exports.recordstreaming.RecordStreamingUtils.isSidecarFile;
5+
import static com.hedera.node.app.hapi.utils.exports.recordstreaming.RecordStreamingUtils.isSidecarMarkerFile;
66
import static com.hedera.services.bdd.junit.support.BlockStreamAccess.isBlockMarkerFile;
77
import static java.util.concurrent.TimeUnit.MILLISECONDS;
88

@@ -121,7 +121,14 @@ private void exposeBlock(@NonNull final File file) {
121121
}
122122

123123
private void exposeSidecars(final File file) {
124-
final var contents = StreamFileAccess.ensurePresentSidecarFile(file.getAbsolutePath());
124+
// Get Sidecar file path using marker file path
125+
final var markerPath = file.toPath();
126+
final var baseName = file.getName().replace(".mf", "");
127+
final var gzPath = markerPath.resolveSibling(baseName + ".rcd.gz");
128+
final var plainPath = markerPath.resolveSibling(baseName + ".rcd");
129+
final var sidecarPath = Files.exists(gzPath) ? gzPath : plainPath;
130+
131+
final var contents = StreamFileAccess.ensurePresentSidecarFile(sidecarPath.toString());
125132
contents.getSidecarRecordsList().forEach(sidecar -> listeners.forEach(l -> l.onNewSidecar(sidecar)));
126133
}
127134

@@ -138,12 +145,12 @@ private FileType typeOf(final File file) {
138145
// Ignore empty files, which are likely to be in the process of being written
139146
if (isBlockMarkerFile(file)) {
140147
return FileType.BLOCK_FILE;
148+
} else if (isSidecarMarkerFile(file.getName())) {
149+
return FileType.SIDE_CAR_FILE;
141150
} else if (file.length() == 0L) {
142151
return FileType.OTHER;
143152
} else if (isRecordFile(file.getName())) {
144153
return FileType.RECORD_STREAM_FILE;
145-
} else if (isSidecarFile(file.getName())) {
146-
return FileType.SIDE_CAR_FILE;
147154
} else {
148155
return FileType.OTHER;
149156
}

hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/UtilVerbs.java

+37-2
Original file line numberDiff line numberDiff line change
@@ -1208,12 +1208,19 @@ public static HapiSpecOperation remembering(final Map<String, String> props, fin
12081208
/* Stream validation. */
12091209
public static EventualRecordStreamAssertion recordStreamMustIncludeNoFailuresFrom(
12101210
@NonNull final Function<HapiSpec, RecordStreamAssertion> assertion) {
1211+
return EventualRecordStreamAssertion.eventuallyAssertingNoFailures(assertion)
1212+
.withBackgroundTraffic();
1213+
}
1214+
1215+
public static EventualRecordStreamAssertion recordStreamMustIncludeNoFailuresWithoutBackgroundTrafficFrom(
1216+
@NonNull final Function<HapiSpec, RecordStreamAssertion> assertion) {
12111217
return EventualRecordStreamAssertion.eventuallyAssertingNoFailures(assertion);
12121218
}
12131219

12141220
public static EventualRecordStreamAssertion recordStreamMustIncludePassFrom(
12151221
@NonNull final Function<HapiSpec, RecordStreamAssertion> assertion) {
1216-
return EventualRecordStreamAssertion.eventuallyAssertingExplicitPass(assertion);
1222+
return EventualRecordStreamAssertion.eventuallyAssertingExplicitPass(assertion)
1223+
.withBackgroundTraffic();
12171224
}
12181225

12191226
/**
@@ -1225,9 +1232,37 @@ public static EventualRecordStreamAssertion recordStreamMustIncludePassFrom(
12251232
*/
12261233
public static EventualRecordStreamAssertion recordStreamMustIncludePassFrom(
12271234
@NonNull final Function<HapiSpec, RecordStreamAssertion> assertion, @NonNull final Duration timeout) {
1235+
return recordStreamMustIncludePassFrom(assertion, timeout, true);
1236+
}
1237+
1238+
/**
1239+
* Returns an operation that asserts that the record stream must include a pass from the given assertion
1240+
* before its timeout elapses, and that background traffic is running.
1241+
* @param assertion the assertion to apply to the record stream
1242+
* @param timeout the timeout for the assertion
1243+
* @return the operation that asserts a passing record stream
1244+
*/
1245+
public static EventualRecordStreamAssertion recordStreamMustIncludePassWithoutBackgroundTrafficFrom(
1246+
@NonNull final Function<HapiSpec, RecordStreamAssertion> assertion, @NonNull final Duration timeout) {
1247+
return recordStreamMustIncludePassFrom(assertion, timeout, false);
1248+
}
1249+
1250+
/**
1251+
* Returns an operation that asserts that the record stream must include a pass from the given assertion
1252+
* before its timeout elapses, and if the background traffic should be running.
1253+
* @param assertion the assertion to apply to the record stream
1254+
* @param timeout the timeout for the assertion
1255+
* @param needsBackgroundTraffic whether background traffic should be running
1256+
* @return the operation that asserts a passing record stream
1257+
*/
1258+
private static EventualRecordStreamAssertion recordStreamMustIncludePassFrom(
1259+
@NonNull final Function<HapiSpec, RecordStreamAssertion> assertion,
1260+
@NonNull final Duration timeout,
1261+
final boolean needsBackgroundTraffic) {
12281262
requireNonNull(assertion);
12291263
requireNonNull(timeout);
1230-
return EventualRecordStreamAssertion.eventuallyAssertingExplicitPass(assertion, timeout);
1264+
final var result = EventualRecordStreamAssertion.eventuallyAssertingExplicitPass(assertion, timeout);
1265+
return needsBackgroundTraffic ? result.withBackgroundTraffic() : result;
12311266
}
12321267

12331268
/**

hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/streams/assertions/EventualRecordStreamAssertion.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public class EventualRecordStreamAssertion extends AbstractEventualStreamAsserti
3535
@Nullable
3636
private RecordStreamAssertion assertion;
3737

38+
private boolean needsBackgroundTraffic = false;
39+
3840
/**
3941
* Returns an {@link EventualRecordStreamAssertion} that will pass as long as the given assertion does not
4042
* throw an {@link AssertionError} before its timeout.
@@ -43,7 +45,7 @@ public class EventualRecordStreamAssertion extends AbstractEventualStreamAsserti
4345
*/
4446
public static EventualRecordStreamAssertion eventuallyAssertingNoFailures(
4547
final Function<HapiSpec, RecordStreamAssertion> assertionFactory) {
46-
return new EventualRecordStreamAssertion(assertionFactory, true, false);
48+
return new EventualRecordStreamAssertion(assertionFactory, true, false).withBackgroundTraffic();
4749
}
4850

4951
/**
@@ -67,7 +69,21 @@ public static EventualRecordStreamAssertion eventuallyAssertingExplicitPassWithR
6769
@NonNull final Function<HapiSpec, RecordStreamAssertion> assertionFactory,
6870
@NonNull final Duration timeout) {
6971
requireNonNull(assertionFactory);
70-
return new EventualRecordStreamAssertion(assertionFactory, false, timeout, true);
72+
return new EventualRecordStreamAssertion(assertionFactory, false, timeout, true).withBackgroundTraffic();
73+
}
74+
75+
@Override
76+
public boolean needsBackgroundTraffic() {
77+
return needsBackgroundTraffic;
78+
}
79+
80+
/**
81+
* Returns an {@link EventualRecordStreamAssertion} enabling background traffic.
82+
* @return the eventual record stream assertion with background traffic
83+
*/
84+
public EventualRecordStreamAssertion withBackgroundTraffic() {
85+
this.needsBackgroundTraffic = true;
86+
return this;
7187
}
7288

7389
/**

hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/crypto/CryptoUpdateSuite.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.doingContextual;
3737
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.newKeyNamed;
3838
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.overridingTwo;
39-
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.recordStreamMustIncludePassFrom;
39+
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.recordStreamMustIncludePassWithoutBackgroundTrafficFrom;
4040
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.sourcing;
4141
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.submitModified;
4242
import static com.hedera.services.bdd.spec.utilops.UtilVerbs.validateChargedUsd;
@@ -158,7 +158,7 @@ final Stream<DynamicTest> keyRotationDoesNotChangeEvmAddress() {
158158
.toArray(String[]::new);
159159
return hapiTest(flatten(
160160
cryptoTransfer(tinyBarsFromTo(GENESIS, ADDRESS_BOOK_CONTROL, 1)),
161-
recordStreamMustIncludePassFrom(
161+
recordStreamMustIncludePassWithoutBackgroundTrafficFrom(
162162
visibleNonSyntheticItems(keyRotationsValidator(accountsToHaveKeysRotated), allTxnIds),
163163
Duration.ofSeconds(15)),
164164
// If the FileAlterationObserver just started the monitor, there's a chance we could miss the

0 commit comments

Comments
 (0)