Skip to content

Commit a569c97

Browse files
cmnbroaddroazen
authored andcommitted
Bug fix for using a ReservoirDownsampler with a ReadsDownsamplingIterator. (#5594)
* Bug fix to allow a ReservoirDownsampler to be used within a ReadsDownsamplingIterator (a combination which previously didn't work correctly) * As part of this fix, don't consider items in a ReservoirDownsampler to be finalized until signalEndOfInput() is called, and update client code to call signalEndOfInput() appropriately. Resolves #4768
1 parent 19ddf36 commit a569c97

File tree

6 files changed

+161
-26
lines changed

6 files changed

+161
-26
lines changed

src/main/java/org/broadinstitute/hellbender/utils/downsampling/MutectDownsampler.java

+1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ private void finalizePendingReads() {
9494
// the desired coverage, but if the region is decently mappable the shortfall will be minor.
9595
final ReservoirDownsampler wellMappedDownsampler = new ReservoirDownsampler(maxCoverage, false);
9696
pendingReads.stream().filter(read -> read.getMappingQuality() > SUSPICIOUS_MAPPING_QUALITY).forEach(wellMappedDownsampler::submit);
97+
wellMappedDownsampler.signalEndOfInput();
9798
final List<GATKRead> readsToFinalize = wellMappedDownsampler.consumeFinalizedItems();
9899
if (stride > 1) {
99100
Collections.sort(readsToFinalize, Comparator.comparingInt(GATKRead::getAssignedStart));

src/main/java/org/broadinstitute/hellbender/utils/downsampling/PositionalDownsampler.java

+31-11
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package org.broadinstitute.hellbender.utils.downsampling;
22

33
import htsjdk.samtools.SAMFileHeader;
4+
import org.broadinstitute.hellbender.exceptions.GATKException;
45
import org.broadinstitute.hellbender.utils.Utils;
56
import org.broadinstitute.hellbender.utils.read.GATKRead;
67
import org.broadinstitute.hellbender.utils.read.ReadCoordinateComparator;
78
import org.broadinstitute.hellbender.utils.read.ReadUtils;
89

910
import java.util.ArrayList;
1011
import java.util.List;
12+
import java.util.Optional;
1113

1214

1315
/**
@@ -71,16 +73,29 @@ private void handlePositionalChange( final GATKRead newRead ) {
7173
// Use ReadCoordinateComparator to determine whether we've moved to a new start position.
7274
// ReadCoordinateComparator will correctly distinguish between purely unmapped reads and unmapped reads that
7375
// are assigned a nominal position.
74-
if ( previousRead != null && ReadCoordinateComparator.compareCoordinates(previousRead, newRead, header) != 0 ) {
75-
if ( reservoir.hasFinalizedItems() ) {
76-
finalizeReservoir();
76+
if ( previousRead != null) {
77+
final int cmpDiff = ReadCoordinateComparator.compareCoordinates(previousRead, newRead, header);
78+
if (cmpDiff == 1) {
79+
throw new IllegalStateException(
80+
String.format("Reads must be coordinate sorted (earlier %s later %s)", previousRead, newRead));
81+
} else if (cmpDiff != 0) {
82+
finalizeReservoir(true);
7783
}
7884
}
7985
}
8086

81-
private void finalizeReservoir() {
87+
private void finalizeReservoir(final boolean expectFinalizedItems) {
88+
// We can't consume finalized reads from the reservoir unless we first signal EOI.
89+
// Once signalEndOfInput has been called and propagated to the ReservoirDownsampler, consumeFinalizedItems
90+
// must be called on the ReservoirDownsampler before any new items can be submitted to it, to reset its
91+
// state so it can be recycled/reused for the next downsampling position.
92+
reservoir.signalEndOfInput();
93+
if (expectFinalizedItems && ! reservoir.hasFinalizedItems() ) {
94+
throw new GATKException.ShouldNeverReachHereException("Expected downsampled items to be present when none are");
95+
}
8296
finalizedReads.addAll(reservoir.consumeFinalizedItems());
8397
reservoir.resetStats();
98+
previousRead = null;
8499
}
85100

86101
@Override
@@ -97,9 +112,11 @@ public List<GATKRead> consumeFinalizedItems() {
97112

98113
@Override
99114
public boolean hasPendingItems() {
100-
// The finalized items in the ReservoirDownsampler are pending items from the perspective of the
101-
// enclosing PositionalDownsampler
102-
return reservoir.hasFinalizedItems();
115+
// The ReservoirDownsampler accumulates pending items until signalEndOfInput has been called, at which
116+
// point all items that have survived downsampling become finalized. From the perspective of the enclosing
117+
// PositionalDownsampler, both finalized items and pending items in the ReservoirDownsampler are considered
118+
// pending.
119+
return reservoir.hasFinalizedItems() || reservoir.hasPendingItems();
103120
}
104121

105122
@Override
@@ -109,9 +126,11 @@ public GATKRead peekFinalized() {
109126

110127
@Override
111128
public GATKRead peekPending() {
112-
// The finalized items in the ReservoirDownsampler are pending items from the perspective of the
113-
// enclosing PositionalDownsampler
114-
return reservoir.peekFinalized();
129+
// The ReservoirDownsampler accumulates pending items until signalEndOfInput has been called, at which
130+
// point all items that have survived downsampling become finalized. From the perspective of the enclosing
131+
// PositionalDownsampler, both finalized items and pending items in the ReservoirDownsampler are considered
132+
// pending.
133+
return Optional.ofNullable(reservoir.peekFinalized()).orElse(reservoir.peekPending());
115134
}
116135

117136
@Override
@@ -121,7 +140,7 @@ public int size() {
121140

122141
@Override
123142
public void signalEndOfInput() {
124-
finalizeReservoir();
143+
finalizeReservoir(false);
125144
}
126145

127146
@Override
@@ -139,6 +158,7 @@ public boolean requiresCoordinateSortOrder() {
139158

140159
@Override
141160
public void signalNoMoreReadsBefore( final GATKRead read ) {
161+
Utils.nonNull(read, "Positional downsampler requires non-null reads");
142162
handlePositionalChange(read);
143163
}
144164
}

src/main/java/org/broadinstitute/hellbender/utils/downsampling/ReservoirDownsampler.java

+33-6
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ public final class ReservoirDownsampler extends ReadsDownsampler {
5151
*/
5252
private int totalReadsSeen;
5353

54+
/**
55+
* In order to guarantee that all reads have equal probability of being discarded, we need to have consumed the
56+
* entire input stream before any items can become finalized. All submitted items (that survive downsampling)
57+
* remain pending until endOfInputStream is called, at which point they become finalized.
58+
*/
59+
private boolean endOfInputStream;
60+
5461

5562
/**
5663
* Construct a ReservoirDownsampler
@@ -88,6 +95,9 @@ public ReservoirDownsampler(final int targetSampleSize ) {
8895
@Override
8996
public void submit ( final GATKRead newRead ) {
9097
Utils.nonNull(newRead, "newRead");
98+
// Once the end of the input stream has been seen, consumeFinalizedItems or clearItems must be called to
99+
// reset the state of the ReservoirDownsampler before more items can be submitted
100+
Utils.validate(! endOfInputStream, "attempt to submit read after end of input stream has been signaled");
91101

92102
// Only count reads that are actually eligible for discarding for the purposes of the reservoir downsampling algorithm
93103
totalReadsSeen++;
@@ -110,35 +120,50 @@ public void submit ( final GATKRead newRead ) {
110120

111121
@Override
112122
public boolean hasFinalizedItems() {
113-
return ! reservoir.isEmpty();
123+
// All items in the reservoir are pending until endOfInputStream is seen, at which point all items become finalized
124+
return endOfInputStream && !reservoir.isEmpty();
114125
}
115126

116127
@Override
117128
public List<GATKRead> consumeFinalizedItems() {
129+
// This method clears state (including the end of input stream flag) when called after
130+
// end of input stream has been signaled, but has no side effects when called
131+
// before end of input stream has been signaled (since in that case, the downsampling
132+
// process is still ongoing and we shouldn't clear pending items).
133+
118134
if (hasFinalizedItems()) {
119135
// pass reservoir by reference rather than make a copy, for speed
120136
final List<GATKRead> downsampledItems = reservoir;
121137
clearItems();
122138
return downsampledItems;
139+
} else if ( ! endOfInputStream ) {
140+
// Don't call clearItems() here, since endOfInputStream is false and therefore the
141+
// downsampling process is still ongoing. We want to preserve existing pending items,
142+
// and return an empty List without side effects.
143+
return Collections.emptyList();
123144
} else {
124-
// if there's nothing here, don't bother allocating a new list
145+
// This is the case where endOfInputStream == true and our reservoir is empty. We return an empty
146+
// list, but we also call clearItems() here for consistency with the case above where we have
147+
// finalized items, so that in both cases we reset the endOfInputStream flag.
148+
clearItems();
125149
return Collections.emptyList();
126150
}
127151
}
128152

129153
@Override
130154
public boolean hasPendingItems() {
131-
return false;
155+
// All items in the reservoir are pending until endOfInputStream is seen, at which point all items become finalized
156+
return !endOfInputStream && !reservoir.isEmpty();
132157
}
133158

134159
@Override
135160
public GATKRead peekFinalized() {
136-
return reservoir.isEmpty() ? null : reservoir.get(0);
161+
return hasFinalizedItems() ? reservoir.get(0) : null;
137162
}
138163

139164
@Override
140165
public GATKRead peekPending() {
141-
return null;
166+
return hasPendingItems() ? reservoir.get(0) : null;
142167
}
143168

144169
@Override
@@ -148,7 +173,7 @@ public int size() {
148173

149174
@Override
150175
public void signalEndOfInput() {
151-
// NO-OP
176+
endOfInputStream = true;
152177
}
153178

154179
/**
@@ -164,6 +189,8 @@ public void clearItems() {
164189

165190
// an internal stat used by the downsampling process, so not cleared by resetStats() below
166191
totalReadsSeen = 0;
192+
193+
endOfInputStream = false;
167194
}
168195

169196
@Override

src/test/java/org/broadinstitute/hellbender/tools/HaplotypeCallerSparkIntegrationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public void testVCFModeIsConcordantWithGATK3_8Results() throws Exception {
145145
* Test that in VCF mode we're >= 99% concordant with GATK3.8 results
146146
* THIS TEST explodes with an exception because Allele-Specific annotations are not supported in vcf mode yet.
147147
* It's included to parallel the matching (also exploding) test for the non-spark HaplotypeCaller
148-
* {@link org.broadinstitute.hellbender.tools.walkers.haplotypecaller.HaplotypeCallerIntegrationTest#testVCFModeIsConcordantWithGATK3_8ResultsAlleleSpecificAnnotations()}
148+
* {@link org.broadinstitute.hellbender.tools.walkers.haplotypecaller.HaplotypeCallerIntegrationTest#testVCFModeIsConcordantWithGATK3_8ResultsAlleleSpecificAnnotations(String, String)}}
149149
*/
150150
@Test(expectedExceptions = UserException.class)
151151
public void testVCFModeIsConcordantWithGATK3_8ResultsAlleleSpecificAnnotations() throws Exception {

src/test/java/org/broadinstitute/hellbender/utils/downsampling/ReadsDownsamplingIteratorUnitTest.java

+17
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.testng.annotations.Test;
1010

1111
import java.util.*;
12+
import java.util.stream.IntStream;
1213

1314
public class ReadsDownsamplingIteratorUnitTest extends GATKBaseTest {
1415

@@ -132,6 +133,22 @@ public void testRemoveThrows() {
132133
iter.remove();
133134
}
134135

136+
@Test
137+
public void testReadsDownsamplingIteratorWithReservoirDownsampler() {
138+
final int TOTAL_READ_COUNT = 100;
139+
final int TARGET_COVERAGE = 45;
140+
final List<GATKRead> reads = new ArrayList<>(TOTAL_READ_COUNT);
141+
IntStream.range(1, TOTAL_READ_COUNT).forEach(i -> reads.add(readWithName(Integer.toString(i))));
142+
143+
final List<GATKRead> downsampledReads = new ArrayList<>();
144+
final ReadsDownsamplingIterator downsamplingIter = new ReadsDownsamplingIterator(reads.iterator(), new ReservoirDownsampler(TARGET_COVERAGE));
145+
for ( final GATKRead read : downsamplingIter ) {
146+
downsampledReads.add(read);
147+
}
148+
149+
Assert.assertEquals(downsampledReads.size(), TARGET_COVERAGE);
150+
}
151+
135152
private GATKRead readWithName( final String name ) {
136153
return ArtificialReadUtils.createArtificialRead(TextCigarCodec.decode("10M"), name);
137154
}

src/test/java/org/broadinstitute/hellbender/utils/downsampling/ReservoirDownsamplerUnitTest.java

+78-8
Original file line numberDiff line numberDiff line change
@@ -61,29 +61,29 @@ public void testReservoirDownsampler(final ReservoirDownsamplerTest test ) {
6161
logger.warn("Running test: " + test);
6262

6363
Utils.resetRandomGenerator();
64-
6564
final ReadsDownsampler downsampler = new ReservoirDownsampler(test.reservoirSize);
66-
6765
downsampler.submit(test.createReads());
6866

67+
// after submit, but before signalEndOfInput, all reads are pending, none are finalized
6968
if ( test.totalReads > 0 ) {
70-
Assert.assertTrue(downsampler.hasFinalizedItems());
71-
Assert.assertTrue(downsampler.peekFinalized() != null);
72-
Assert.assertFalse(downsampler.hasPendingItems());
73-
Assert.assertTrue(downsampler.peekPending() == null);
69+
Assert.assertFalse(downsampler.hasFinalizedItems());
70+
Assert.assertNull(downsampler.peekFinalized());
71+
Assert.assertTrue(downsampler.hasPendingItems());
72+
Assert.assertNotNull(downsampler.peekPending());
7473
}
7574
else {
7675
Assert.assertFalse(downsampler.hasFinalizedItems() || downsampler.hasPendingItems());
7776
Assert.assertTrue(downsampler.peekFinalized() == null && downsampler.peekPending() == null);
7877
}
7978

79+
// after signalEndOfInput, no reads are pending, all are finalized
8080
downsampler.signalEndOfInput();
8181

8282
if ( test.totalReads > 0 ) {
8383
Assert.assertTrue(downsampler.hasFinalizedItems());
84-
Assert.assertTrue(downsampler.peekFinalized() != null);
84+
Assert.assertNotNull(downsampler.peekFinalized());
8585
Assert.assertFalse(downsampler.hasPendingItems());
86-
Assert.assertTrue(downsampler.peekPending() == null);
86+
Assert.assertNull(downsampler.peekPending());
8787
}
8888
else {
8989
Assert.assertFalse(downsampler.hasFinalizedItems() || downsampler.hasPendingItems());
@@ -102,6 +102,19 @@ public void testReservoirDownsampler(final ReservoirDownsamplerTest test ) {
102102

103103
downsampler.resetStats();
104104
Assert.assertEquals(downsampler.getNumberOfDiscardedItems(), 0);
105+
106+
// use the same downsampling parameters, but this time consume the reads through a
107+
// ReadsDownsamplingIterator, and validate that we get the same results as using downsampler directly
108+
Utils.resetRandomGenerator();
109+
final ReadsDownsampler downsamplerForIterator = new ReservoirDownsampler(test.reservoirSize);
110+
final ReadsDownsamplingIterator downsamplingIterator = new ReadsDownsamplingIterator(
111+
test.createReads().iterator(),
112+
downsamplerForIterator);
113+
final List<GATKRead> downsampledReadsFromIterator = new ArrayList<>(test.reservoirSize);
114+
downsamplingIterator.forEach(downsampledReadsFromIterator::add);
115+
116+
Assert.assertEquals(downsamplerForIterator.getNumberOfDiscardedItems(), test.expectedNumDiscardedItems);
117+
Assert.assertEquals(downsampledReadsFromIterator, downsampledReads);
105118
}
106119

107120
@Test(expectedExceptions = IllegalArgumentException.class)
@@ -129,4 +142,61 @@ public void testNoNullSignalNoMoreReadsBefore() throws Exception {
129142
ReadsDownsampler rd = new ReservoirDownsampler(1, true);
130143
rd.signalNoMoreReadsBefore(null);
131144
}
145+
146+
// Calling consumeFinalizeItems() before end of input on a non-empty downsampler should
147+
// return an empty List and not change the state of the downsampler:
148+
@Test
149+
public void testConsumeFinalizedItemsBeforeEndOfInput() {
150+
final ReservoirDownsampler downsampler = new ReservoirDownsampler(10, true);
151+
final GATKRead read = ArtificialReadUtils.createArtificialRead("100M");
152+
downsampler.submit(read);
153+
154+
Assert.assertFalse(downsampler.hasFinalizedItems());
155+
Assert.assertTrue(downsampler.hasPendingItems());
156+
157+
final List<GATKRead> returnedReads = downsampler.consumeFinalizedItems();
158+
Assert.assertTrue(returnedReads.isEmpty());
159+
160+
// The downsampler should still have pending reads after the call to consumeFinalizedItems()
161+
// (ie., the downsampling process should still be ongoing, and the state of the downsampler
162+
// should have been preserved):
163+
Assert.assertFalse(downsampler.hasFinalizedItems());
164+
Assert.assertTrue(downsampler.hasPendingItems());
165+
}
166+
167+
// Calling consumeFinalizedItems() after end of input on an empty downsampler should
168+
// return an empty List and reset the end of input flag in the downsampler:
169+
@Test
170+
public void testConsumeFinalizedItemsAfterEndOfInputOnEmptyDownsampler() {
171+
final ReservoirDownsampler downsampler = new ReservoirDownsampler(10, true);
172+
173+
Assert.assertFalse(downsampler.hasFinalizedItems());
174+
Assert.assertFalse(downsampler.hasPendingItems());
175+
176+
downsampler.signalEndOfInput();
177+
178+
final List<GATKRead> returnedReads = downsampler.consumeFinalizedItems();
179+
Assert.assertTrue(returnedReads.isEmpty());
180+
181+
Assert.assertFalse(downsampler.hasFinalizedItems());
182+
Assert.assertFalse(downsampler.hasPendingItems());
183+
184+
// Submitting an additional read after both signalEndOfInput() and consumeFinalizedItems()
185+
// should succeed (not throw), since the call to consumeFinalizedItems() should reset the
186+
// end of input flag:
187+
final GATKRead read = ArtificialReadUtils.createArtificialRead("100M");
188+
downsampler.submit(read);
189+
190+
Assert.assertFalse(downsampler.hasFinalizedItems());
191+
Assert.assertTrue(downsampler.hasPendingItems());
192+
}
193+
194+
// Calling submit() directly after signalEndOfInput() should throw:
195+
@Test(expectedExceptions = IllegalStateException.class)
196+
public void testSubmitAfterEndOfInputThrows() {
197+
final ReservoirDownsampler downsampler = new ReservoirDownsampler(10, true);
198+
downsampler.signalEndOfInput();
199+
final GATKRead read = ArtificialReadUtils.createArtificialRead("100M");
200+
downsampler.submit(read);
201+
}
132202
}

0 commit comments

Comments
 (0)