
No need to sort variants in HaplotypeCallerSpark. #5909

Merged: 2 commits merged on May 1, 2019
Changes from 1 commit
VariantsSparkSink.java
@@ -36,7 +36,7 @@ public final class VariantsSparkSink {
public static void writeVariants(
final JavaSparkContext ctx, final String outputFile, final JavaRDD<VariantContext> variants,
final VCFHeader header) throws IOException {
- writeVariants(ctx, outputFile, variants, header, false, null, 0, 0, true);
+ writeVariants(ctx, outputFile, variants, header, false, null, 0, 0, true, true);
}

/**
@@ -51,13 +51,19 @@ public static void writeVariants(
public static void writeVariants(
final JavaSparkContext ctx, final String outputFile, final JavaRDD<VariantContext> variants,
final VCFHeader header, final boolean writeTabixIndex) throws IOException {
- writeVariants(ctx, outputFile, variants, header, false, null, 0, 0, writeTabixIndex);
+ writeVariants(ctx, outputFile, variants, header, writeTabixIndex, true);
}

Collaborator: Javadoc this method

+ public static void writeVariants(
+ final JavaSparkContext ctx, final String outputFile, final JavaRDD<VariantContext> variants,
+ final VCFHeader header, final boolean writeTabixIndex, final boolean sort) throws IOException {
+ writeVariants(ctx, outputFile, variants, header, false, null, 0, 0, writeTabixIndex, sort);
+ }
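Picking up the review comment above, the missing Javadoc might read roughly as follows. This is only a sketch; the wording and parameter descriptions are assumptions based on the surrounding code, not text from the PR.

/**
 * Writes the variants to the given output file on Spark.
 *
 * @param ctx             the Spark context
 * @param outputFile      path of the (possibly block-compressed) VCF to write
 * @param variants        the variants to write
 * @param header          the VCF header to write
 * @param writeTabixIndex whether to write a tabix index for block-compressed output
 * @param sort            if true, sort the variants to match the header's sequence dictionary
 *                        before writing; if false, write them in their existing order
 */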

public static void writeVariants(
final JavaSparkContext ctx, final String outputFile, final JavaRDD<VariantContext> variants,
final VCFHeader header, final boolean writeGvcf, final List<Number> gqPartitions, final int defaultPloidy) throws IOException {
- writeVariants(ctx, outputFile, variants, header, writeGvcf, gqPartitions, defaultPloidy, 0, true);
+ writeVariants(ctx, outputFile, variants, header, writeGvcf, gqPartitions, defaultPloidy, 0, true, true);
}

/**
@@ -74,21 +80,21 @@ public static void writeVariants(
public static void writeVariants(
final JavaSparkContext ctx, final String outputFile, final JavaRDD<VariantContext> variants,
final VCFHeader header, final boolean writeGvcf, final List<Number> gqPartitions, final int defaultPloidy,
- final int numReducers, final boolean writeTabixIndex) throws IOException {
+ final int numReducers, final boolean writeTabixIndex, final boolean sort) throws IOException {
String absoluteOutputFile = BucketUtils.makeFilePathAbsolute(outputFile);
- writeVariantsSingle(ctx, absoluteOutputFile, variants, header, writeGvcf, gqPartitions, defaultPloidy, numReducers, writeTabixIndex);
+ writeVariantsSingle(ctx, absoluteOutputFile, variants, header, writeGvcf, gqPartitions, defaultPloidy, numReducers, writeTabixIndex, sort);
}

private static void writeVariantsSingle(
final JavaSparkContext ctx, final String outputFile, final JavaRDD<VariantContext> variants,
final VCFHeader header, final boolean writeGvcf, final List<Number> gqPartitions, final int defaultPloidy,
- final int numReducers, final boolean writeTabixIndex) throws IOException {
+ final int numReducers, final boolean writeTabixIndex, final boolean sortVariantsToHeader) throws IOException {

//TODO remove me when https://github.com/broadinstitute/gatk/issues/4303 is fixed
if (outputFile.endsWith(IOUtil.BCF_FILE_EXTENSION) || outputFile.endsWith(IOUtil.BCF_FILE_EXTENSION + ".gz")) {
throw new UserException.UnimplementedFeature("It is currently not possible to write a BCF file on spark. See https://github.com/broadinstitute/gatk/issues/4303 for more details .");
}
- final JavaRDD<VariantContext> sortedVariants = sortVariants(variants, header, numReducers);
+ final JavaRDD<VariantContext> sortedVariants = sortVariantsToHeader ? sortVariants(variants, header, numReducers) : variants;
final JavaRDD<VariantContext> variantsToSave;
if (writeGvcf) {
GVCFBlockCombiner gvcfBlockCombiner = new GVCFBlockCombiner(gqPartitions, defaultPloidy);
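Taken together, these changes let callers choose between sorting and writing as-is. A hypothetical usage sketch follows; the helper class and method names are invented for illustration, and the import path for VariantsSparkSink is assumed rather than taken from the PR.

import htsjdk.variant.variantcontext.VariantContext;
import htsjdk.variant.vcf.VCFHeader;
import java.io.IOException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.broadinstitute.hellbender.engine.spark.datasources.VariantsSparkSink; // assumed package

// Hypothetical callers illustrating the new boolean; not part of the PR.
final class VariantWritingSketch {

    // Output that is already in the header's coordinate order
    // (e.g. HaplotypeCallerSpark after this PR) can skip the shuffle entirely.
    static void writeAlreadyOrdered(final JavaSparkContext ctx, final String output,
                                    final JavaRDD<VariantContext> variants, final VCFHeader header) throws IOException {
        VariantsSparkSink.writeVariants(ctx, output, variants, header,
                true /* writeTabixIndex */, false /* sort */);
    }

    // Variants in unknown order are still sorted to the header's sequence dictionary first.
    static void writeUnordered(final JavaSparkContext ctx, final String output,
                               final JavaRDD<VariantContext> variants, final VCFHeader header) throws IOException {
        VariantsSparkSink.writeVariants(ctx, output, variants, header,
                true /* writeTabixIndex */, true /* sort */);
    }
}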
HaplotypeCallerSpark.java
@@ -184,11 +184,10 @@ private static void processAssemblyRegions(

final JavaRDD<VariantContext> variants = rdd.mapPartitions(assemblyFunction(header, referenceFileName, hcArgsBroadcast, annotatorEngineBroadcast));

- variants.cache(); // without caching, computations are run twice as a side effect of finding partition boundaries for sorting
Collaborator: Huh, does this have to do with the change in sorting? What sort of difference in runtime does this branch have?

Contributor (author): The cache call was added to avoid repeat work. I haven't tried re-running benchmarks yet, but I suspect it will give some speedup.
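
To make the trade-off concrete, here is a minimal, hypothetical Spark sketch (not GATK code): range-partitioning for a sort first samples the RDD to pick partition boundaries, so the upstream computation runs twice unless it is cached, while skipping the sort avoids both the extra pass and the memory cost of caching.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

// Minimal sketch of why the cache() call existed: sortBy() range-partitions the RDD,
// which samples it to choose partition boundaries, so the upstream map() is evaluated
// twice unless the RDD is cached. Class name and values are illustrative only.
public final class SortRecomputationSketch {
    public static void main(String[] args) {
        final SparkConf conf = new SparkConf().setAppName("sortRecomputationSketch").setMaster("local[2]");
        try (JavaSparkContext ctx = new JavaSparkContext(conf)) {
            final LongAccumulator evaluations = ctx.sc().longAccumulator("evaluations");

            final JavaRDD<Integer> expensive = ctx.parallelize(Arrays.asList(5, 3, 1, 4, 2))
                    .map(x -> {
                        evaluations.add(1); // stands in for the expensive per-region calling work
                        return x;
                    });

            // Without cache(), the sort evaluates "expensive" twice:
            // once while sampling partition boundaries and once during the shuffle.
            expensive.sortBy(x -> x, true, 2).collect();
            System.out.println("evaluations after sort without cache: " + evaluations.value());

            // Calling expensive.cache() before the sort (as the removed line did), or skipping
            // the sort for already-ordered output (as this PR does), avoids the second pass.
        }
    }
}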

try {
VariantsSparkSink.writeVariants(ctx, output, variants, hcEngine.makeVCFHeader(header.getSequenceDictionary(), new HashSet<>()),
hcArgs.emitReferenceConfidence == ReferenceConfidenceMode.GVCF, new ArrayList<Number>(hcArgs.GVCFGQBands), hcArgs.standardArgs.genotypeArgs.samplePloidy,
- 0, createOutputVariantIndex);
+ 0, createOutputVariantIndex, false);
} catch (IOException e) {
throw new UserException.CouldNotCreateOutputFile(output, "writing failed", e);
}
PrintVariantsSpark.java
@@ -58,7 +58,7 @@ public final class PrintVariantsSpark extends VariantWalkerSpark {
protected void processVariants(JavaRDD<VariantWalkerContext> rdd, JavaSparkContext ctx) {
try {
VariantsSparkSink.writeVariants(ctx, output, rdd.map(VariantWalkerContext::getVariant), getHeaderForVariants(),
- createOutputVariantIndex);
+ createOutputVariantIndex, false);
} catch (IOException e) {
throw new UserException.CouldNotCreateOutputFile(output, "writing failed", e);
}
VariantsSparkSinkUnitTest.java
@@ -115,7 +115,7 @@ public static Object[][] brokenCases() {
public void testBrokenGVCFCasesAreDisallowed(boolean writeGvcf, String extension) throws IOException {
JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
VariantsSparkSink.writeVariants(ctx, createTempFile("test", extension).toString(), null,
- new VCFHeader(), writeGvcf, Arrays.asList(1, 2, 4, 5), 2, 1, false);
+ new VCFHeader(), writeGvcf, Arrays.asList(1, 2, 4, 5), 2, 1, false, false);
}

@DataProvider
@@ -142,7 +142,7 @@ public void testEnableDisableGVCFWriting(boolean writeGvcf, String extension) th

final JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
final File output = createTempFile(outputFileName, extension);
- VariantsSparkSink.writeVariants(ctx, output.toString(), ctx.parallelize(vcs), getHeader(), writeGvcf, Arrays.asList(100), 2, 1, true);
+ VariantsSparkSink.writeVariants(ctx, output.toString(), ctx.parallelize(vcs), getHeader(), writeGvcf, Arrays.asList(100), 2, 1, true, true);

checkFileExtensionConsistentWithContents(output.toString(), true);
