Skip to content

Fix ReservoirDownsampler, PositionalDownsampler, and ReadsDownsamplingIterator. #5594

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 29, 2019
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ private void finalizePendingReads() {
// the desired coverage, but if the region is decently mappable the shortfall will be minor.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SamplePartitioner also uses a ReservoirDownsampler directly -- did you check the usage there to ensure it's consistent with the new semantics?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did, and just did so again to make sure- it looks like it follows the protocol pretty well and reliably calls signalEndOfInput before consuming items.

final ReservoirDownsampler wellMappedDownsampler = new ReservoirDownsampler(maxCoverage, false);
pendingReads.stream().filter(read -> read.getMappingQuality() > SUSPICIOUS_MAPPING_QUALITY).forEach(wellMappedDownsampler::submit);
wellMappedDownsampler.signalEndOfInput();
final List<GATKRead> readsToFinalize = wellMappedDownsampler.consumeFinalizedItems();
if (stride > 1) {
Collections.sort(readsToFinalize, Comparator.comparingInt(GATKRead::getAssignedStart));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package org.broadinstitute.hellbender.utils.downsampling;

import htsjdk.samtools.SAMFileHeader;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.ReadCoordinateComparator;
import org.broadinstitute.hellbender.utils.read.ReadUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;


/**
Expand Down Expand Up @@ -71,16 +73,31 @@ private void handlePositionalChange( final GATKRead newRead ) {
// Use ReadCoordinateComparator to determine whether we've moved to a new start position.
// ReadCoordinateComparator will correctly distinguish between purely unmapped reads and unmapped reads that
// are assigned a nominal position.
if ( previousRead != null && ReadCoordinateComparator.compareCoordinates(previousRead, newRead, header) != 0 ) {
if ( reservoir.hasFinalizedItems() ) {
finalizeReservoir();
if ( previousRead != null) {
final int cmpDiff = ReadCoordinateComparator.compareCoordinates(previousRead, newRead, header);
if (cmpDiff == 1) {
throw new GATKException.ShouldNeverReachHereException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be a ShouldNeverReachHereException -- we can easily reach here if the client provides reads in the wrong order. Make it just a regular GATKException or an IllegalStateException

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. BTW, I originally added this check because the HaplotypeCallerSpark tests were getting into this state after I made my changes, but before I added the code that resets previousRead to null in finalizeReservoir (see my response to your comment about that below).

String.format("Reads must be coordinate sorted (earlier %s later %s)", previousRead, newRead));
}
if (cmpDiff != 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if -> else if

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

reservoir.signalEndOfInput();
if ( reservoir.hasFinalizedItems() ) {
finalizeReservoir();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the reservoir somehow had no finalized items after a positional change, wouldn't it get stuck in an "end of stream" state, since you're now calling signalEndOfInput() here? And wouldn't it then just explode on the subsequent reservoir.submit() call?

Perhaps we should just unconditionally call into finalizeReservoir() on every positional change, and get rid of the signalEndOfInput()/hasFinalizedItems() calls here (note that you're still calling signalEndOfInput() within finalizeReservoir() itself). That would ensure that the reservoir gets reset even if there were somehow no finalized items for a given position. It would also eliminate the need to call signalEndOfInput() twice as you're currently doing (you're calling it both here and in finalizeReservoir())

Copy link
Collaborator Author

@cmnbroad cmnbroad Jan 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it should be possible to have a position change and not have any finalized items, so the hasFinalizedItems, and thats what required the redundant call to signalEndOfInput. So I think what you propose is both simpler and less redundant. I'm going to add an arg to finalizeReservoir to control triggering an assert that there are always finalized items, unless you can think of a counter-example.

}
}
}
}

private void finalizeReservoir() {
// We can't consume finalized reads from the reservoir unless we first signal EOI.
// Once signalEndOfInput has been called and propagated to the ReservoirDownsampler, consumeFinalizedItems
// must be called on the ReservoirDownsampler before any new items can be submitted to it, to reset it's
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's -> its

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// state so it can be recycled/reused for the next downsampling position.
reservoir.signalEndOfInput();
finalizedReads.addAll(reservoir.consumeFinalizedItems());
reservoir.clearItems();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call above to reservoir.consumeFinalizedItems() calls clearItems() internally, so you don't need to call it again here. Each clearItems() call results in an extra array allocation, so we want to minimize calls to this method, since downsampling gets performed in some performance-critical sections of code.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

reservoir.resetStats();
previousRead = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to set previousRead to null here? Does this fix an actual problem?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, see my response comment above.

}

@Override
Expand All @@ -97,9 +114,11 @@ public List<GATKRead> consumeFinalizedItems() {

@Override
public boolean hasPendingItems() {
// The finalized items in the ReservoirDownsampler are pending items from the perspective of the
// enclosing PositionalDownsampler
return reservoir.hasFinalizedItems();
// The ReservoirDownsampler accumulates pending items until signalEndOfInput has been called, at which
// point all items that have survived downsampling become finalized. From the perspective of the enclosing
// PositionalDownsampler, both finalized items and pending items in the ReservoirDownsampler are considered
// pending.
return reservoir.hasFinalizedItems() || reservoir.hasPendingItems();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems right.

}

@Override
Expand All @@ -109,9 +128,11 @@ public GATKRead peekFinalized() {

@Override
public GATKRead peekPending() {
// The finalized items in the ReservoirDownsampler are pending items from the perspective of the
// enclosing PositionalDownsampler
return reservoir.peekFinalized();
// The ReservoirDownsampler accumulates pending items until signalEndOfInput has been called, at which
// point all items that have survived downsampling become finalized. From the perspective of the enclosing
// PositionalDownsampler, both finalized items and pending items in the ReservoirDownsampler are considered
// pending.
return Optional.ofNullable(reservoir.peekFinalized()).orElse(reservoir.peekPending());
}

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When signalEndOfInput() is called on a PositionalDownsampler, should it then forbid subsequent attempts to submit additional items, to mirror the new ReservoirDownsampler behavior?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it should, with the same protocol (either consumeFinalizedItems or clearItems) to reset the state. I'll make those changes in a separate commit.

Expand Down Expand Up @@ -139,6 +160,7 @@ public boolean requiresCoordinateSortOrder() {

@Override
public void signalNoMoreReadsBefore( final GATKRead read ) {
Utils.nonNull(read, "Positional downsampler requires non-null reads");
handlePositionalChange(read);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public final class ReservoirDownsampler extends ReadsDownsampler {
*/
private int totalReadsSeen;

/**
* In order to guarantee that all reads have equal probability of being discarded, we need to have consumed the
* entire input stream before any items can become finalized. All submitted items (that survive downsampling)
* remain pending until endOfInputStream is called, at which point they become finalized.
*/
private boolean endOfInputStream;


/**
* Construct a ReservoirDownsampler
Expand Down Expand Up @@ -88,6 +95,9 @@ public ReservoirDownsampler(final int targetSampleSize ) {
@Override
public void submit ( final GATKRead newRead ) {
Utils.nonNull(newRead, "newRead");
// Once the end of the input stream has been seen, consumeFinalizedItems must be called to reset the state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumeFinalizedItems -> consumeFinalizedItems() or clearItems()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// of the ReservoirDownsampler before more items can be submitted
Utils.validate(endOfInputStream == false, "attempt to submit read after end of input stream has been seen");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

endOfInputStream == false -> ! endOfInputStream

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

has been seen -> has been signaled

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


// Only count reads that are actually eligible for discarding for the purposes of the reservoir downsampling algorithm
totalReadsSeen++;
Expand All @@ -110,35 +120,39 @@ public void submit ( final GATKRead newRead ) {

@Override
public boolean hasFinalizedItems() {
return ! reservoir.isEmpty();
// All items in the reservoir are pending until endOfInputStream is seen, at which point all items become finalized
return endOfInputStream && !reservoir.isEmpty();
}

@Override
public List<GATKRead> consumeFinalizedItems() {
Utils.validate(endOfInputStream == true, "signalEndOfInput must be called before finalized items can be consumed");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If consumeFinalizedItems() is called when there are no finalized items, it should return an empty list, not throw an exception.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting that this validate call be removed ? It just validates that signalEndOfInput has been called, which is a precondition for having finalized items. To me, hitting this exception indicates a contract violation - the caller shouldn't expect there to be finalized items until eoi is called. This caught the case in MutectDownsampler where it was calling submit followed by consume with no intervening signalEndOfInput.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it should be removed. The contract of consumeFinalizedItems() is that it should return an empty list if there are no finalized items (without any side effects such as resetting the downsampler) -- this check violates that contract by throwing an exception instead.

if (hasFinalizedItems()) {
// pass reservoir by reference rather than make a copy, for speed
final List<GATKRead> downsampledItems = reservoir;
clearItems();
return downsampledItems;
} else {
// if there's nothing here, don't bother allocating a new list
clearItems();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above -- calling consumeFinalizedItems() when there are no finalized items should return an empty list, and not change the internal state of the downsampler (eg., by calling clearItems()).

Copy link
Collaborator Author

@cmnbroad cmnbroad Jan 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if-branch is the case where end of input has been signaled, but no items were ever submitted. Removing the clearItems call would require the caller to detect when consumeFinalizedItems returns 0 items, and then also call clearItems(), in order to reset the state to reuse the downsampler. Doesn't that unnecessarily complicate the contract ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point we should consider adding a method to the Downsampler interface called something like resetState to make the contract on how to recycle these more explicit.

Copy link
Contributor

@droazen droazen Jan 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most downsamplers don't get reused after signalEndOfInput() -- the ReservoirDownsampler within a downsampling iterator is an exception, so we shouldn't over-optimize for that case. It also doesn't make sense to me for consumeFinalizedItems() to have side effects like resetting the downsampler if there are no finalized items. Returning an empty list without any side effects seems much more logical.

return Collections.emptyList();
}
}

@Override
public boolean hasPendingItems() {
return false;
// All items in the reservoir are pending until endOfInputStream is seen, at which point all items become finalized
return !endOfInputStream && !reservoir.isEmpty();
}

@Override
public GATKRead peekFinalized() {
return reservoir.isEmpty() ? null : reservoir.get(0);
return hasFinalizedItems() ? reservoir.get(0) : null;
}

@Override
public GATKRead peekPending() {
return null;
return hasPendingItems() ? reservoir.get(0) : null;
}

@Override
Expand All @@ -148,7 +162,7 @@ public int size() {

@Override
public void signalEndOfInput() {
// NO-OP
endOfInputStream = true;
}

/**
Expand All @@ -164,6 +178,8 @@ public void clearItems() {

// an internal stat used by the downsampling process, so not cleared by resetStats() below
totalReadsSeen = 0;

endOfInputStream = false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void testVCFModeIsConcordantWithGATK3_8Results() throws Exception {
* Test that in VCF mode we're >= 99% concordant with GATK3.8 results
* THIS TEST explodes with an exception because Allele-Specific annotations are not supported in vcf mode yet.
* It's included to parallel the matching (also exploding) test for the non-spark HaplotypeCaller
* {@link org.broadinstitute.hellbender.tools.walkers.haplotypecaller.HaplotypeCallerIntegrationTest#testVCFModeIsConcordantWithGATK3_8ResultsAlleleSpecificAnnotations()}
* {@link org.broadinstitute.hellbender.tools.walkers.haplotypecaller.HaplotypeCallerIntegrationTest#testVCFModeIsConcordantWithGATK3_8ResultsAlleleSpecificAnnotations(String, String)}}
*/
@Test(expectedExceptions = UserException.class)
public void testVCFModeIsConcordantWithGATK3_8ResultsAlleleSpecificAnnotations() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.testng.annotations.Test;

import java.util.*;
import java.util.stream.IntStream;

public class ReadsDownsamplingIteratorUnitTest extends GATKBaseTest {

Expand Down Expand Up @@ -132,6 +133,22 @@ public void testRemoveThrows() {
iter.remove();
}

@Test
public void testReadsDownsamplingIteratorWithReservoirDownsampler() {
final int TOTAL_READ_COUNT = 100;
final int TARGET_COVERAGE = 45;
final List<GATKRead> reads = new ArrayList<>(TOTAL_READ_COUNT);
IntStream.range(1, TOTAL_READ_COUNT).forEach(i -> reads.add(readWithName(Integer.toString(i))));

final List<GATKRead> downsampledReads = new ArrayList<>();
final ReadsDownsamplingIterator downsamplingIter = new ReadsDownsamplingIterator(reads.iterator(), new ReservoirDownsampler(TARGET_COVERAGE));
for ( final GATKRead read : downsamplingIter ) {
downsampledReads.add(read);
}

Assert.assertEquals(downsampledReads.size(), TARGET_COVERAGE);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I suggest that you also modify the existing test ReservoirDownsamplerUnitTest.testReservoirDownsampler() to check, at the end, that the results from running with a ReservoirDownsampler wrapped in a ReadsDownsamplingIterator match the results from running standalone?

PositionalDownsamplerUnitTest.testPositionalDownsampler() has a check like this at the end, to illustrate what I mean:

        // Now test with a PositionalDownsampler wrapped in an iterator, and make sure we get the same results.
        // It's crucial to reset the random number generator again in order to match the selections made by the
        // first downsampling pass.
        Utils.resetRandomGenerator();
        final ReadsDownsamplingIterator downsamplingIter = new ReadsDownsamplingIterator(allReads.iterator(), new PositionalDownsampler(targetCoverage, header));
        final List<GATKRead> downsampledReadsFromIter = new ArrayList<>();
        for ( final GATKRead downsampledRead : downsamplingIter ) {
            downsampledReadsFromIter.add(downsampledRead);
        }

        Assert.assertEquals(downsampledReadsFromIter, downsampledReads, "Results from PositionalDownsampler wrapped in a ReadsDownsamplingIterator do not match results from standalone PositionalDownsampler");

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good idea. Done.


private GATKRead readWithName( final String name ) {
return ArtificialReadUtils.createArtificialRead(TextCigarCodec.decode("10M"), name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,26 @@ public void testReservoirDownsampler(final ReservoirDownsamplerTest test ) {

downsampler.submit(test.createReads());

// after submit, but before signalEndOfInput, all reads are pending, none are finalized
if ( test.totalReads > 0 ) {
Assert.assertTrue(downsampler.hasFinalizedItems());
Assert.assertTrue(downsampler.peekFinalized() != null);
Assert.assertFalse(downsampler.hasPendingItems());
Assert.assertTrue(downsampler.peekPending() == null);
Assert.assertFalse(downsampler.hasFinalizedItems());
Assert.assertNull(downsampler.peekFinalized());
Assert.assertTrue(downsampler.hasPendingItems());
Assert.assertNotNull(downsampler.peekPending());
}
else {
Assert.assertFalse(downsampler.hasFinalizedItems() || downsampler.hasPendingItems());
Assert.assertTrue(downsampler.peekFinalized() == null && downsampler.peekPending() == null);
}

// after signalEndOfInput, not reads are pending, all are finalized
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not -> no

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

downsampler.signalEndOfInput();

if ( test.totalReads > 0 ) {
Assert.assertTrue(downsampler.hasFinalizedItems());
Assert.assertTrue(downsampler.peekFinalized() != null);
Assert.assertNotNull(downsampler.peekFinalized());
Assert.assertFalse(downsampler.hasPendingItems());
Assert.assertTrue(downsampler.peekPending() == null);
Assert.assertNull(downsampler.peekPending());
}
else {
Assert.assertFalse(downsampler.hasFinalizedItems() || downsampler.hasPendingItems());
Expand Down