Skip to content

Commit f153a5b

Browse files
ashwinsshetty and c2thorn
authored and committed
Add new resource for creating Whistle Mapping, Reconciliation and Backfill Pipeline Jobs for Healthcare Data Engine (GoogleCloudPlatform#11812)
Co-authored-by: Cameron Thornton <[email protected]>
1 parent 2f27758 commit f153a5b

5 files changed

+476
-0
lines changed
+257
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
# Copyright 2024 Google Inc.
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
---
15+
# Resource identity and top-level documentation.
name: 'PipelineJob'
kind: 'healthcare#pipelineJob'
description: |
  PipelineJobs are Long Running Operations on Healthcare API to Map or Reconcile
  incoming data into FHIR format
references:
  guides:
    'Creating a PipelineJob': 'https://cloud.google.com/healthcare-api/private/healthcare-data-engine/docs/reference/rest/v1/projects.locations.datasets.pipelineJobs#PipelineJob'
  api: 'https://cloud.google.com/healthcare-api/healthcare-data-engine/docs/reference/rest/v1/projects.locations.datasets.pipelineJobs'
# URL templates. Every path is built from the parent dataset path supplied in
# `dataset` plus the user-assigned job `name`.
base_url: '{{dataset}}/pipelineJobs?pipelineJobId={{name}}'
self_link: '{{dataset}}/pipelineJobs/{{name}}'
delete_url: '{{dataset}}/pipelineJobs/{{name}}'
exclude_sweeper: true
# Updates are sent as PATCH requests with an update mask.
update_verb: 'PATCH'
update_mask: true
id_format: '{{dataset}}/pipelineJobs/{{name}}'
# Accepted import ID shapes, listed in the same order as before.
import_format:
  - '{{%dataset}}/pipelineJobs/{{name}}'
  - '{{name}}'
  - '{{dataset}}/pipelineJobs?pipelineJobId={{name}}'
32+
# Example configurations for this resource. Each entry's `vars` supplies the
# values that the matching example template reads through `index $.Vars`.
examples:
  # Reconciliation pipeline example.
  - name: 'healthcare_pipeline_job_reconciliation'
    primary_resource_id: 'example-pipeline'
    vars:
      pipeline_name: 'example_pipeline_job'
      dataset_name: 'example_dataset'
      fhir_store_name: 'fhir_store'
      bucket_name: 'example_bucket_name'
  # Backfill pipeline example.
  - name: 'healthcare_pipeline_job_backfill'
    primary_resource_id: 'example-pipeline'
    vars:
      backfill_pipeline_name: 'example_backfill_pipeline'
      dataset_name: 'example_dataset'
      mapping_pipeline_name: 'example_mapping_pipeline'
  # Whistle mapping pipeline example.
  - name: 'healthcare_pipeline_job_whistle_mapping'
    primary_resource_id: 'example-mapping-pipeline'
    vars:
      pipeline_name: 'example_mapping_pipeline_job'
      dataset_name: 'example_dataset'
      source_fhirstore_name: 'source_fhir_store'
      dest_fhirstore_name: 'dest_fhir_store'
      bucket_name: 'example_bucket_name'
  # Mapping pipeline whose output goes to a reconciliation pipeline.
  - name: 'healthcare_pipeline_job_mapping_recon_dest'
    primary_resource_id: 'example-mapping-pipeline'
    vars:
      pipeline_name: 'example_mapping_pipeline_job'
      recon_pipeline_name: 'example_recon_pipeline_job'
      dataset_name: 'example_dataset'
      source_fhirstore_name: 'source_fhir_store'
      dest_fhirstore_name: 'dest_fhir_store'
      bucket_name: 'example_bucket_name'
63+
# Custom Go template registered as this resource's decoder.
custom_code:
  decoder: 'templates/terraform/decoders/long_name_to_self_link.go.tmpl'
65+
# URL-only parameters: both are required, immutable, and flagged
# `url_param_only`.
parameters:
  - name: 'location'
    type: String
    description: |
      Location where the Pipeline Job is to run
    required: true
    immutable: true
    url_param_only: true
  - name: 'dataset'
    type: String
    description: |
      Healthcare Dataset under which the Pipeline Job is to run
    required: true
    immutable: true
    url_param_only: true
80+
# Fields of a PipelineJob. Exactly one job flavour is expected per resource:
# the three top-level *PipelineJob objects are declared mutually exclusive via
# `conflicts`.
properties:
  - name: 'name'
    type: String
    description: |
      Specifies the name of the pipeline job. This field is user-assigned.
    required: true
  - name: 'disableLineage'
    type: Boolean
    description: |
      If true, disables writing lineage for the pipeline.
    required: false
    default_value: false
  - name: 'labels'
    type: KeyValueLabels
    description: |
      User-supplied key-value pairs used to organize Pipeline Jobs.
      Label keys must be between 1 and 63 characters long, have a UTF-8 encoding of
      maximum 128 bytes, and must conform to the following PCRE regular expression:
      [\p{Ll}\p{Lo}][\p{Ll}\p{Lo}\p{N}_-]{0,62}
      Label values are optional, must be between 1 and 63 characters long, have a
      UTF-8 encoding of maximum 128 bytes, and must conform to the following PCRE
      regular expression: [\p{Ll}\p{Lo}\p{N}_-]{0,63}
      No more than 64 labels can be associated with a given pipeline.
      An object containing a list of "key": value pairs.
      Example: { "name": "wrench", "mass": "1.3kg", "count": "3" }.
    required: false
  # Output-only field; marked `ignore_read`.
  - name: 'selfLink'
    type: String
    description: |
      The fully qualified name of this pipeline job
    output: true
    ignore_read: true
  - name: 'mappingPipelineJob'
    type: NestedObject
    description: |
      Specifies mapping configuration.
    required: false
    conflicts:
      - reconciliationPipelineJob
      - backfillPipelineJob
    properties:
      - name: 'mappingConfig'
        type: NestedObject
        description: |
          The location of the mapping configuration.
        required: true
        properties:
          - name: 'description'
            type: String
            description: |
              Describes the mapping configuration.
            required: false
          - name: 'whistleConfigSource'
            type: NestedObject
            description: |
              Specifies the path to the mapping configuration for harmonization pipeline.
            required: false
            properties:
              - name: 'uri'
                type: String
                description: |
                  Main configuration file which has the entrypoint or the root function.
                  Example: gs://{bucket-id}/{path/to/import-root/dir}/entrypoint-file-name.wstl.
                required: true
              - name: 'importUriPrefix'
                type: String
                description: |
                  Directory path where all the Whistle files are located.
                  Example: gs://{bucket-id}/{path/to/import-root/dir}
                required: true
      - name: 'fhirStreamingSource'
        type: NestedObject
        description: |
          A streaming FHIR data source.
        required: false
        properties:
          - name: 'fhirStore'
            type: String
            description: |
              The path to the FHIR store in the format projects/{projectId}/locations/{locationId}/datasets/{datasetId}/fhirStores/{fhirStoreId}.
            required: true
          - name: 'description'
            type: String
            description: |
              Describes the streaming FHIR data source.
            required: false
      # The two destination fields below are alternatives. Conflicts on nested
      # fields are written as a full path from the resource root.
      - name: 'fhirStoreDestination'
        type: String
        description: |
          If set, the mapping pipeline will write snapshots to this
          FHIR store without assigning stable IDs. You must
          grant your pipeline project's Cloud Healthcare Service
          Agent serviceaccount healthcare.fhirResources.executeBundle
          and healthcare.fhirResources.create permissions on the
          destination store. The destination store must set
          [disableReferentialIntegrity][FhirStore.disable_referential_integrity]
          to true. The destination store must use FHIR version R4.
          Format: projects/{projectID}/locations/{locationID}/datasets/{datasetName}/fhirStores/{fhirStoreID}.
        required: false
        conflicts:
          - mappingPipelineJob.0.reconciliationDestination
      - name: 'reconciliationDestination'
        type: Boolean
        description: |
          If set to true, a mapping pipeline will send output snapshots
          to the reconciliation pipeline in its dataset. A reconciliation
          pipeline must exist in this dataset before a mapping pipeline
          with a reconciliation destination can be created.
        required: false
        conflicts:
          - mappingPipelineJob.0.fhirStoreDestination
  - name: 'reconciliationPipelineJob'
    type: NestedObject
    description: |
      Specifies reconciliation configuration.
    required: false
    conflicts:
      - mappingPipelineJob
      - backfillPipelineJob
    properties:
      - name: 'mergeConfig'
        type: NestedObject
        description: |
          Specifies the location of the reconciliation configuration.
        required: true
        properties:
          - name: 'description'
            type: String
            description: |
              Describes the mapping configuration.
            required: false
          - name: 'whistleConfigSource'
            type: NestedObject
            description: |
              Specifies the path to the mapping configuration for harmonization pipeline.
            required: true
            properties:
              - name: 'uri'
                type: String
                description: |
                  Main configuration file which has the entrypoint or the root function.
                  Example: gs://{bucket-id}/{path/to/import-root/dir}/entrypoint-file-name.wstl.
                required: true
              - name: 'importUriPrefix'
                type: String
                description: |
                  Directory path where all the Whistle files are located.
                  Example: gs://{bucket-id}/{path/to/import-root/dir}
                required: true
      - name: 'matchingUriPrefix'
        type: String
        description: |
          Specifies the top level directory of the matching configs used
          in all mapping pipelines, which extract properties for resources
          to be matched on.
          Example: gs://{bucket-id}/{path/to/matching/configs}
        required: true
      - name: 'fhirStoreDestination'
        type: String
        description: |
          The harmonized FHIR store to write harmonized FHIR resources to,
          in the format of: projects/{projectID}/locations/{locationID}/datasets/{datasetName}/fhirStores/{id}
        required: false
  - name: 'backfillPipelineJob'
    type: NestedObject
    description: |
      Specifies the backfill configuration.
    required: false
    conflicts:
      - mappingPipelineJob
      - reconciliationPipelineJob
    properties:
      - name: 'mappingPipelineJob'
        type: String
        description: |
          Specifies the mapping pipeline job to backfill, the name format
          should follow: projects/{projectId}/locations/{locationId}/datasets/{datasetId}/pipelineJobs/{pipelineJobId}.
        required: false
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Backfill pipeline job created under the example dataset.
resource "google_healthcare_pipeline_job" "{{$.PrimaryResourceId}}" {
  name     = "{{index $.Vars "backfill_pipeline_name"}}"
  location = "us-central1"
  dataset  = google_healthcare_dataset.dataset.id

  backfill_pipeline_job {
    # Full resource name of the mapping pipeline to backfill. The collection
    # segment is `pipelineJobs` (camelCase), matching the REST resource path.
    mapping_pipeline_job = "${google_healthcare_dataset.dataset.id}/pipelineJobs/{{index $.Vars "mapping_pipeline_name"}}"
  }
}
9+
10+
# Parent dataset referenced by the pipeline job above.
resource "google_healthcare_dataset" "dataset" {
  location = "us-central1"
  name     = "{{index $.Vars "dataset_name"}}"
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# Current project; its number is used to build the service-account member
# string in the bucket IAM binding.
data "google_project" "project" {}
3+
4+
# Reconciliation pipeline. The mapping pipeline in this example depends on it.
resource "google_healthcare_pipeline_job" "recon" {
  # The merge config lives in the bucket, so wait until the bucket IAM grant
  # for the Healthcare service account is in place before creating the job.
  depends_on = [google_storage_bucket_iam_member.hsa]

  name            = "{{index $.Vars "recon_pipeline_name"}}"
  location        = "us-central1"
  dataset         = google_healthcare_dataset.dataset.id
  disable_lineage = true

  reconciliation_pipeline_job {
    merge_config {
      description = "sample description for reconciliation rules"
      whistle_config_source {
        uri               = "gs://${google_storage_bucket.bucket.name}/${google_storage_bucket_object.merge_file.name}"
        import_uri_prefix = "gs://${google_storage_bucket.bucket.name}"
      }
    }
    matching_uri_prefix    = "gs://${google_storage_bucket.bucket.name}"
    fhir_store_destination = "${google_healthcare_dataset.dataset.id}/fhirStores/${google_healthcare_fhir_store.dest_fhirstore.name}"
  }
}
21+
22+
# Mapping pipeline that streams from the source FHIR store and sends its
# output to the reconciliation pipeline (`reconciliation_destination = true`).
resource "google_healthcare_pipeline_job" "{{$.PrimaryResourceId}}" {
  # Created only after the reconciliation pipeline exists.
  depends_on = [google_healthcare_pipeline_job.recon]

  name            = "{{index $.Vars "pipeline_name"}}"
  location        = "us-central1"
  dataset         = google_healthcare_dataset.dataset.id
  disable_lineage = true

  labels = {
    example_label_key = "example_label_value"
  }

  mapping_pipeline_job {
    mapping_config {
      description = "example description for mapping configuration"
      whistle_config_source {
        uri               = "gs://${google_storage_bucket.bucket.name}/${google_storage_bucket_object.mapping_file.name}"
        import_uri_prefix = "gs://${google_storage_bucket.bucket.name}"
      }
    }

    fhir_streaming_source {
      description = "example description for streaming fhirstore"
      fhir_store  = "${google_healthcare_dataset.dataset.id}/fhirStores/${google_healthcare_fhir_store.source_fhirstore.name}"
    }

    reconciliation_destination = true
  }
}
46+
47+
# Dataset that holds both pipeline jobs and both FHIR stores of this example.
resource "google_healthcare_dataset" "dataset" {
  location = "us-central1"
  name     = "{{index $.Vars "dataset_name"}}"
}
51+
52+
# FHIR store used as the mapping pipeline's streaming source.
resource "google_healthcare_fhir_store" "source_fhirstore" {
  dataset = google_healthcare_dataset.dataset.id
  name    = "{{index $.Vars "source_fhirstore_name"}}"
  version = "R4"

  disable_referential_integrity = true
  enable_update_create          = true
}
59+
60+
# FHIR store the reconciliation pipeline writes to (its
# `fhir_store_destination`).
resource "google_healthcare_fhir_store" "dest_fhirstore" {
  dataset = google_healthcare_dataset.dataset.id
  name    = "{{index $.Vars "dest_fhirstore_name"}}"
  version = "R4"

  disable_referential_integrity = true
  enable_update_create          = true
}
67+
68+
# Bucket holding the Whistle mapping and merge files referenced by the
# pipeline jobs.
resource "google_storage_bucket" "bucket" {
  location                    = "us-central1"
  name                        = "{{index $.Vars "bucket_name"}}"
  uniform_bucket_level_access = true
}
73+
74+
# Placeholder Whistle file used as the mapping pipeline's entrypoint URI.
resource "google_storage_bucket_object" "mapping_file" {
  bucket  = google_storage_bucket.bucket.name
  name    = "mapping.wstl"
  content = " "
}
79+
80+
# Placeholder Whistle file used as the reconciliation merge-config URI.
resource "google_storage_bucket_object" "merge_file" {
  bucket  = google_storage_bucket.bucket.name
  name    = "merge.wstl"
  content = " "
}
85+
86+
# Grants the project's gcp-sa-healthcare service account the
# storage.objectUser role on the example bucket.
resource "google_storage_bucket_iam_member" "hsa" {
  member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-healthcare.iam.gserviceaccount.com"
  role   = "roles/storage.objectUser"
  bucket = google_storage_bucket.bucket.name
}

0 commit comments

Comments
 (0)