-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathraredisease.py
299 lines (267 loc) · 13.3 KB
/
raredisease.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
"""Module for Raredisease Analysis API."""
import logging
from itertools import permutations
from pathlib import Path
from typing import Any
from housekeeper.store.models import File
from cg.clients.chanjo2.models import (
CoverageMetrics,
CoveragePostRequest,
CoveragePostResponse,
CoverageSample,
)
from cg.constants import DEFAULT_CAPTURE_KIT, Workflow
from cg.constants.constants import GenomeVersion
from cg.constants.nf_analysis import (
RAREDISEASE_COVERAGE_FILE_TAGS,
RAREDISEASE_COVERAGE_INTERVAL_TYPE,
RAREDISEASE_COVERAGE_THRESHOLD,
RAREDISEASE_PARENT_PEDDY_METRIC_CONDITION,
RAREDISEASE_METRIC_CONDITIONS_WGS,
RAREDISEASE_METRIC_CONDITIONS_WES,
RAREDISEASE_ADAPTER_BASES_PERCENTAGE_THRESHOLD,
)
from cg.constants.scout import RAREDISEASE_CASE_TAGS, ScoutExportFileName
from cg.constants.sequencing import SeqLibraryPrepCategory, NOVASEQ_SEQUENCING_READ_LENGTH
from cg.constants.subject import PlinkPhenotypeStatus, PlinkSex
from cg.constants.tb import AnalysisType
from cg.meta.workflow.nf_analysis import NfAnalysisAPI
from cg.models.cg_config import CGConfig
from cg.models.deliverables.metric_deliverables import MetricsBase, MultiqcDataJson
from cg.models.raredisease.raredisease import (
RarediseaseParameters,
RarediseaseSampleSheetEntry,
RarediseaseSampleSheetHeaders,
)
from cg.resources import RAREDISEASE_BUNDLE_FILENAMES_PATH
from cg.store.models import CaseSample, Sample
LOG = logging.getLogger(__name__)
class RarediseaseAnalysisAPI(NfAnalysisAPI):
"""Handles communication between RAREDISEASE processes
and the rest of CG infrastructure."""
def __init__(
self,
config: CGConfig,
workflow: Workflow = Workflow.RAREDISEASE,
):
super().__init__(config=config, workflow=workflow)
self.root_dir: str = config.raredisease.root
self.workflow_bin_path: str = config.raredisease.workflow_bin_path
self.profile: str = config.raredisease.profile
self.conda_env: str = config.raredisease.conda_env
self.conda_binary: str = config.raredisease.conda_binary
self.platform: str = config.raredisease.platform
self.params: str = config.raredisease.params
self.workflow_config_path: str = config.raredisease.config
self.resources: str = config.raredisease.resources
self.tower_binary_path: str = config.tower_binary_path
self.tower_workflow: str = config.raredisease.tower_workflow
self.account: str = config.raredisease.slurm.account
self.email: str = config.raredisease.slurm.mail_user
self.compute_env_base: str = config.raredisease.compute_env
self.revision: str = config.raredisease.revision
self.nextflow_binary_path: str = config.raredisease.binary_path
@property
def sample_sheet_headers(self) -> list[str]:
"""Headers for sample sheet."""
return RarediseaseSampleSheetHeaders.list()
def get_sample_sheet_content_per_sample(self, case_sample: CaseSample) -> list[list[str]]:
"""Collect and format information required to build a sample sheet for a single sample."""
fastq_forward_read_paths, fastq_reverse_read_paths = self.get_paired_read_paths(
sample=case_sample.sample
)
sample_sheet_entry = RarediseaseSampleSheetEntry(
name=case_sample.sample.internal_id,
fastq_forward_read_paths=fastq_forward_read_paths,
fastq_reverse_read_paths=fastq_reverse_read_paths,
sex=self.get_sex_code(case_sample.sample.sex),
phenotype=self.get_phenotype_code(case_sample.status),
paternal_id=case_sample.get_paternal_sample_id,
maternal_id=case_sample.get_maternal_sample_id,
case_id=case_sample.case.internal_id,
)
return sample_sheet_entry.reformat_sample_content
@property
def is_gene_panel_required(self) -> bool:
"""Return True if a gene panel is needs to be created using the information in StatusDB and exporting it from Scout."""
return True
def get_target_bed(self, case_id: str, analysis_type: str) -> str:
"""
Return the target bed file from LIMS and use default capture kit for WHOLE_GENOME_SEQUENCING.
"""
target_bed_file: str = self.get_target_bed_from_lims(case_id=case_id)
if not target_bed_file:
if analysis_type == AnalysisType.WGS:
return DEFAULT_CAPTURE_KIT
raise ValueError("No capture kit was found in LIMS")
return target_bed_file
def get_germlinecnvcaller_flag(self, analysis_type: str) -> bool:
if analysis_type == AnalysisType.WGS:
return True
return False
def get_built_workflow_parameters(self, case_id: str) -> RarediseaseParameters:
"""Return parameters."""
analysis_type: AnalysisType = self.get_data_analysis_type(case_id=case_id)
target_bed_file: str = self.get_target_bed(case_id=case_id, analysis_type=analysis_type)
skip_germlinecnvcaller = self.get_germlinecnvcaller_flag(analysis_type=analysis_type)
outdir = self.get_case_path(case_id=case_id)
return RarediseaseParameters(
input=self.get_sample_sheet_path(case_id=case_id),
outdir=outdir,
analysis_type=analysis_type,
target_bed_file=target_bed_file,
save_mapped_as_cram=True,
skip_germlinecnvcaller=skip_germlinecnvcaller,
vcfanno_extra_resources=f"{outdir}/{ScoutExportFileName.MANAGED_VARIANTS}",
vep_filters_scout_fmt=f"{outdir}/{ScoutExportFileName.PANELS}",
)
@staticmethod
def get_phenotype_code(phenotype: str) -> int:
"""Return Raredisease phenotype code."""
LOG.debug("Translate phenotype to integer code")
try:
code = PlinkPhenotypeStatus[phenotype.upper()]
except KeyError:
raise ValueError(f"{phenotype} is not a valid phenotype")
return code
@staticmethod
def get_sex_code(sex: str) -> int:
"""Return Raredisease sex code."""
LOG.debug("Translate sex to integer code")
try:
code = PlinkSex[sex.upper()]
except KeyError:
raise ValueError(f"{sex} is not a valid sex")
return code
@staticmethod
def get_bundle_filenames_path() -> Path:
"""Return Raredisease bundle filenames path."""
return RAREDISEASE_BUNDLE_FILENAMES_PATH
@property
def is_managed_variants_required(self) -> bool:
"""Return True if a managed variants needs to be exported from Scout."""
return True
def write_managed_variants(self, case_id: str, content: list[str]) -> None:
self._write_managed_variants(out_dir=Path(self.root, case_id), content=content)
def get_managed_variants(self, case_id: str) -> list[str]:
"""Create and return the managed variants."""
return self._get_managed_variants(
genome_build=self.get_gene_panel_genome_build(case_id=case_id)
)
def get_workflow_metrics(self, sample_id: str) -> dict:
"""Return Raredisease workflow metric conditions for a sample."""
sample: Sample = self.status_db.get_sample_by_internal_id(internal_id=sample_id)
if "-" not in sample_id:
metric_conditions: dict[str, dict[str, Any]] = (
self.get_metric_conditions_by_prep_category(sample_id=sample.internal_id)
)
self.set_order_sex_for_sample(sample, metric_conditions)
self.set_adapter_bases_for_sample(sample, metric_conditions)
else:
metric_conditions = RAREDISEASE_PARENT_PEDDY_METRIC_CONDITION.copy()
return metric_conditions
def get_metric_conditions_by_prep_category(self, sample_id: str) -> dict:
sample: Sample = self.status_db.get_sample_by_internal_id(internal_id=sample_id)
if (
sample.application_version.application.analysis_type
== SeqLibraryPrepCategory.WHOLE_GENOME_SEQUENCING
):
return RAREDISEASE_METRIC_CONDITIONS_WGS.copy()
return RAREDISEASE_METRIC_CONDITIONS_WES.copy()
def _get_sample_pair_patterns(self, case_id: str) -> list[str]:
"""Return sample-pair patterns for searching in MultiQC."""
sample_ids: list[str] = list(self.status_db.get_sample_ids_by_case_id(case_id=case_id))
pairwise_patterns: list[str] = [
f"{sample1}-{sample2}" for sample1, sample2 in permutations(sample_ids, 2)
]
return pairwise_patterns
def get_parent_error_ped_check_metric(
self, pair_sample_ids: str, multiqc_raw_data: dict[dict]
) -> MetricsBase | None:
"""Return the parsed metrics for pedigree error given a concatenated pair of sample ids."""
metric_name: str = "parent_error_ped_check"
peddy_metrics: dict[str, dict] = multiqc_raw_data["multiqc_peddy"]
if sample_pair_metrics := peddy_metrics.get(pair_sample_ids, None):
return self.get_multiqc_metric(
metric_name=metric_name,
metric_value=sample_pair_metrics[metric_name],
metric_id=pair_sample_ids,
)
def get_raredisease_multiqc_json_metrics(self, case_id: str) -> list[MetricsBase]:
"""Return a list of the metrics specified in a MultiQC json file."""
multiqc_json: MultiqcDataJson = self.get_multiqc_data_json(case_id=case_id)
metrics = []
for search_pattern, metric_id in self.get_multiqc_search_patterns(case_id).items():
metrics_for_pattern: list[MetricsBase] = (
self.get_metrics_from_multiqc_json_with_pattern(
search_pattern=search_pattern,
multiqc_json=multiqc_json,
metric_id=metric_id,
exact_match=self.is_multiqc_pattern_search_exact,
)
)
metrics.extend(metrics_for_pattern)
for sample_pair in self._get_sample_pair_patterns(case_id):
if parent_error_metric := self.get_parent_error_ped_check_metric(
pair_sample_ids=sample_pair, multiqc_raw_data=multiqc_json.report_saved_raw_data
):
metrics.append(parent_error_metric)
metrics = self.get_deduplicated_metrics(metrics=metrics)
return metrics
def create_metrics_deliverables_content(self, case_id: str) -> dict[str, list[dict[str, Any]]]:
"""Create the content of a Raredisease metrics deliverables file."""
metrics: list[MetricsBase] = self.get_raredisease_multiqc_json_metrics(case_id=case_id)
self.ensure_mandatory_metrics_present(metrics=metrics)
return {"metrics": [metric.dict() for metric in metrics]}
@staticmethod
def set_order_sex_for_sample(sample: Sample, metric_conditions: dict) -> None:
metric_conditions["predicted_sex_sex_check"]["threshold"] = sample.sex
metric_conditions["gender"]["threshold"] = sample.sex
@staticmethod
def set_adapter_bases_for_sample(sample: Sample, metric_conditions: dict) -> None:
"""Calculate threshold for maximum number of adapter bases for a given sample"""
adapter_bases_threshold = (
sample.reads
* NOVASEQ_SEQUENCING_READ_LENGTH
* RAREDISEASE_ADAPTER_BASES_PERCENTAGE_THRESHOLD
)
metric_conditions["adapter_cutting_adapter_trimmed_reads"][
"threshold"
] = adapter_bases_threshold
def get_sample_coverage_file_path(self, bundle_name: str, sample_id: str) -> str | None:
"""Return the Raredisease d4 coverage file path."""
coverage_file_tags: list[str] = RAREDISEASE_COVERAGE_FILE_TAGS + [sample_id]
coverage_file: File | None = self.housekeeper_api.get_file_from_latest_version(
bundle_name=bundle_name, tags=coverage_file_tags
)
if coverage_file:
return coverage_file.full_path
LOG.warning(f"No coverage file found with the tags: {coverage_file_tags}")
return None
def get_sample_coverage(
self, case_id: str, sample_id: str, gene_ids: list[int]
) -> CoverageMetrics | None:
"""Return sample coverage metrics from Chanjo2."""
genome_version: GenomeVersion = self.get_genome_build(case_id)
coverage_file_path: str | None = self.get_sample_coverage_file_path(
bundle_name=case_id, sample_id=sample_id
)
try:
post_request = CoveragePostRequest(
build=self.translate_genome_reference(genome_version),
coverage_threshold=RAREDISEASE_COVERAGE_THRESHOLD,
hgnc_gene_ids=gene_ids,
interval_type=RAREDISEASE_COVERAGE_INTERVAL_TYPE,
samples=[CoverageSample(coverage_file_path=coverage_file_path, name=sample_id)],
)
post_response: CoveragePostResponse = self.chanjo2_api.get_coverage(post_request)
return post_response.get_sample_coverage_metrics(sample_id)
except Exception as error:
LOG.error(f"Error getting coverage for sample '{sample_id}', error: {error}")
return None
def get_scout_upload_case_tags(self) -> dict:
"""Return Raredisease Scout upload case tags."""
return RAREDISEASE_CASE_TAGS
def get_genome_build(self, case_id: str) -> GenomeVersion:
"""Return reference genome for a raredisease case. Currently fixed for hg19."""
return GenomeVersion.HG19