18
18
import argparse
19
19
import contextlib
20
20
import json
21
+ import os
22
+ import sys
23
+ import unittest
21
24
22
25
import yaml
23
26
26
29
from apache_beam .transforms import resources
27
30
from apache_beam .typehints .schemas import LogicalType
28
31
from apache_beam .typehints .schemas import MillisInstant
32
+ from apache_beam .yaml import yaml_testing
29
33
from apache_beam .yaml import yaml_transform
34
+ from apache_beam .yaml import yaml_utils
30
35
31
36
32
37
def _preparse_jinja_flags (argv ):
@@ -90,6 +95,28 @@ def _parse_arguments(argv):
90
95
type = json .loads ,
91
96
help = 'A json dict of variables used when invoking the jinja preprocessor '
92
97
'on the provided yaml pipeline.' )
98
+ parser .add_argument (
99
+ '--test' ,
100
+ action = argparse .BooleanOptionalAction ,
101
+ help = 'Run the tests associated with the given pipeline, rather than the '
102
+ 'pipeline itself.' )
103
+ parser .add_argument (
104
+ '--fix_tests' ,
105
+ action = argparse .BooleanOptionalAction ,
106
+ help = 'Update failing test expectations to match the actual ouput. '
107
+ 'Requires --test_suite if the pipeline uses jinja formatting.' )
108
+ parser .add_argument (
109
+ '--create_test' ,
110
+ action = argparse .BooleanOptionalAction ,
111
+ help = 'Automatically creates a regression test for the given pipeline, '
112
+ 'adding it to the pipeline spec or test suite dependon on whether '
113
+ '--test_suite is given. '
114
+ 'Requires --test_suite if the pipeline uses jinja formatting.' )
115
+ parser .add_argument (
116
+ '--test_suite' ,
117
+ help = 'Run the given tests against the given pipeline, rather than the '
118
+ 'pipeline itself. '
119
+ 'Should be a file containing a list of yaml test specifications.' )
93
120
return parser .parse_known_args (argv )
94
121
95
122
@@ -130,12 +157,109 @@ def run(argv=None):
130
157
print ('Running pipeline...' )
131
158
132
159
133
- def build_pipeline_components_from_argv (argv ):
160
+ def run_tests (argv = None , exit = True ):
161
+ known_args , pipeline_args , _ , pipeline_yaml = _build_pipeline_yaml_from_argv (
162
+ argv )
163
+ pipeline_spec = yaml .load (pipeline_yaml , Loader = yaml_transform .SafeLineLoader )
164
+ options = _build_pipeline_options (pipeline_spec , pipeline_args )
165
+
166
+ if known_args .create_test and known_args .fix_tests :
167
+ raise ValueError (
168
+ 'At most one of --create_test and --fix_tests may be specified.' )
169
+ elif known_args .create_test :
170
+ result = unittest .TestResult ()
171
+ tests = []
172
+ else :
173
+ if known_args .test_suite :
174
+ with open (known_args .test_suite ) as fin :
175
+ test_suite_holder = yaml .load (
176
+ fin , Loader = yaml_transform .SafeLineLoader ) or {}
177
+ else :
178
+ test_suite_holder = pipeline_spec
179
+ test_specs = test_suite_holder .get ('tests' , [])
180
+ if not isinstance (test_specs , list ):
181
+ raise TypeError ('tests attribute must be a list of test specifications.' )
182
+ elif not test_specs :
183
+ raise RuntimeError (
184
+ 'No tests found. '
185
+ "If you haven't added a set of tests yet, you can get started by "
186
+ 'running your pipeline with the --create_test flag enabled.' )
187
+
188
+ with _fix_xlang_instant_coding ():
189
+ tests = [
190
+ yaml_testing .YamlTestCase (
191
+ pipeline_spec , test_spec , options , known_args .fix_tests )
192
+ for test_spec in test_specs
193
+ ]
194
+ suite = unittest .TestSuite (tests )
195
+ result = unittest .TextTestRunner ().run (suite )
196
+
197
+ if known_args .fix_tests or known_args .create_test :
198
+ update_tests (known_args , pipeline_yaml , pipeline_spec , options , tests )
199
+
200
+ if exit :
201
+ # emulates unittest.main()
202
+ sys .exit (0 if result .wasSuccessful () else 1 )
203
+ else :
204
+ if not result .wasSuccessful ():
205
+ raise RuntimeError (result )
206
+
207
+
208
+ def update_tests (known_args , pipeline_yaml , pipeline_spec , options , tests ):
209
+ if known_args .test_suite :
210
+ path = known_args .test_suite
211
+ if not os .path .exists (path ) and known_args .create_test :
212
+ with open (path , 'w' ) as fout :
213
+ fout .write ('tests: []' )
214
+ elif known_args .yaml_pipeline_file :
215
+ path = known_args .yaml_pipeline_file
216
+ else :
217
+ raise RuntimeError (
218
+ 'Test fixing only supported for file-backed tests. '
219
+ 'Please use the --test_suite flag.' )
220
+ with open (path ) as fin :
221
+ original_yaml = fin .read ()
222
+ if path == known_args .yaml_pipeline_file and pipeline_yaml .strip (
223
+ ) != original_yaml .strip ():
224
+ raise RuntimeError (
225
+ 'In-file test fixing not yet supported for templated pipelines. '
226
+ 'Please use the --test_suite flag.' )
227
+ updated_spec = yaml .load (original_yaml , Loader = yaml .SafeLoader ) or {}
228
+
229
+ if known_args .fix_tests :
230
+ updated_spec ['tests' ] = [test .fixed_test () for test in tests ]
231
+
232
+ if known_args .create_test :
233
+ if 'tests' not in updated_spec :
234
+ updated_spec ['tests' ] = []
235
+ updated_spec ['tests' ].append (
236
+ yaml_testing .create_test (pipeline_spec , options ))
237
+
238
+ updated_yaml = yaml_utils .patch_yaml (original_yaml , updated_spec )
239
+ with open (path , 'w' ) as fout :
240
+ fout .write (updated_yaml )
241
+
242
+
243
+ def _build_pipeline_yaml_from_argv (argv ):
134
244
argv = _preparse_jinja_flags (argv )
135
245
known_args , pipeline_args = _parse_arguments (argv )
136
246
pipeline_template = _pipeline_spec_from_args (known_args )
137
247
pipeline_yaml = yaml_transform .expand_jinja (
138
248
pipeline_template , known_args .jinja_variables or {})
249
+ return known_args , pipeline_args , pipeline_template , pipeline_yaml
250
+
251
+
252
+ def _build_pipeline_options (pipeline_spec , pipeline_args ):
253
+ return beam .options .pipeline_options .PipelineOptions (
254
+ pipeline_args ,
255
+ pickle_library = 'cloudpickle' ,
256
+ ** yaml_transform .SafeLineLoader .strip_metadata (
257
+ pipeline_spec .get ('options' , {})))
258
+
259
+
260
+ def build_pipeline_components_from_argv (argv ):
261
+ (known_args , pipeline_args , pipeline_template ,
262
+ pipeline_yaml ) = _build_pipeline_yaml_from_argv (argv )
139
263
display_data = {
140
264
'yaml' : pipeline_yaml ,
141
265
'yaml_jinja_template' : pipeline_template ,
@@ -154,11 +278,7 @@ def build_pipeline_components_from_yaml(
154
278
pipeline_yaml , pipeline_args , validate_schema = 'generic' , pipeline_path = '' ):
155
279
pipeline_spec = yaml .load (pipeline_yaml , Loader = yaml_transform .SafeLineLoader )
156
280
157
- options = beam .options .pipeline_options .PipelineOptions (
158
- pipeline_args ,
159
- pickle_library = 'cloudpickle' ,
160
- ** yaml_transform .SafeLineLoader .strip_metadata (
161
- pipeline_spec .get ('options' , {})))
281
+ options = _build_pipeline_options (pipeline_spec , pipeline_args )
162
282
163
283
def constructor (root ):
164
284
if 'resource_hints' in pipeline_spec .get ('pipeline' , {}):
@@ -180,4 +300,7 @@ def constructor(root):
180
300
if __name__ == '__main__' :
181
301
import logging
182
302
logging .getLogger ().setLevel (logging .INFO )
183
- run ()
303
+ if '--test' in sys .argv :
304
+ run_tests ()
305
+ else :
306
+ run ()
0 commit comments