
Commit 458d7a7

Fix new flaky postcommit python tests (#34638)
* Fix new flaky postcommit python test
* Formatting
* Change streaming wordcount pipeline's wait duration to 1200 seconds
1 parent: bebfcbc

4 files changed: +15 −9 lines

sdks/python/apache_beam/examples/streaming_wordcount_it_test.py

+1 −1

@@ -39,7 +39,7 @@
 OUTPUT_SUB = 'wc_subscription_output'

 DEFAULT_INPUT_NUMBERS = 500
-WAIT_UNTIL_FINISH_DURATION = 10 * 60 * 1000  # in milliseconds
+WAIT_UNTIL_FINISH_DURATION = 20 * 60 * 1000  # in milliseconds


 class StreamingWordCountIT(unittest.TestCase):
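
Doubling the constant gives the streaming pipeline 20 minutes (1200 seconds) to reach a terminal state before the test gives up. For context, a minimal sketch of how a constant like this is typically consumed in a Beam integration test; the exact option plumbing below is an illustrative assumption, not a quote of this file:

    from apache_beam.testing.test_pipeline import TestPipeline

    WAIT_UNTIL_FINISH_DURATION = 20 * 60 * 1000  # in milliseconds

    test_pipeline = TestPipeline(is_integration_test=True)
    # get_full_options_as_args folds extra options into the command-line
    # style argument list handed to the example pipeline, so the runner
    # stops waiting after 20 minutes rather than 10.
    args = test_pipeline.get_full_options_as_args(
        wait_until_finish_duration=WAIT_UNTIL_FINISH_DURATION)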

sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py

+3 −0
@@ -729,6 +729,8 @@ def test_iobase_source(self):


 class ReadAllBQTests(BigQueryReadIntegrationTests):
+  TABLE_DATA_AVAILABILITY_WAIT_SECONDS = 30
+
   TABLE_DATA_1 = [{
       'number': 1, 'str': 'abc'
   }, {
@@ -789,6 +791,7 @@ def create_table(cls, table_name, data, table_schema):
     _ = cls.bigquery_client.get_table(cls.project, cls.dataset_id, table_name)
     cls.bigquery_client.insert_rows(
         cls.project, cls.dataset_id, table_name, data)
+    time.sleep(cls.TABLE_DATA_AVAILABILITY_WAIT_SECONDS)
     return table_schema

   @classmethod
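
The added sleep addresses BigQuery streaming-insert visibility: rows written with insert_rows can take a short, unbounded time to become readable, so a test that queries immediately after insertion may see an empty table. A fixed 30-second wait is the simplest remedy; a polling variant would return as soon as the data is visible. A sketch of that alternative, assuming a hypothetical row_count() callable (not part of this commit):

    import time

    def wait_for_rows(row_count, expected, timeout=30, interval=2):
      # Polls the hypothetical row_count() callable until it reports at
      # least `expected` rows or `timeout` seconds elapse.
      deadline = time.time() + timeout
      while time.time() < deadline:
        if row_count() >= expected:
          return True
        time.sleep(interval)
      return False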

sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py

+2 −2
@@ -180,7 +180,7 @@ def test_big_query_write_schema_autodetect(self):
     if self.runner_name == 'TestDataflowRunner':
       self.skipTest('DataflowRunner does not support schema autodetection')

-    table_name = 'python_write_table'
+    table_name = 'python_write_table_schema_autodetect'
     table_id = '{}.{}'.format(self.dataset_id, table_name)

     input_data = [
@@ -383,7 +383,7 @@ def test_big_query_write_insert_errors_reporting(self):
     Test that errors returned by beam.io.WriteToBigQuery
     contain both the failed rows and the reason for it failing.
     """
-    table_name = 'python_write_table'
+    table_name = 'python_write_table_insert_errors'
     table_id = '{}.{}'.format(self.dataset_id, table_name)

     input_data = [{
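
Both tests previously wrote to the same table, 'python_write_table', so concurrent runs against one dataset could interfere with each other's rows and schemas, a classic source of flakiness. Giving each test its own table removes the collision. A stronger per-run variant, sketched here as an assumption rather than what the commit does, would append a random suffix:

    import uuid

    def unique_table_name(base):
      # e.g. 'python_write_table_insert_errors_3f9a2c1e'; this helper is
      # hypothetical, shown only to illustrate per-run isolation.
      return '%s_%s' % (base, uuid.uuid4().hex[:8])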

sdks/python/apache_beam/io/gcp/gcsio_integration_test.py

+9 −6
@@ -59,8 +59,8 @@
 except ImportError:
   NotFound = None

-# Number of seconds to wait for bucket deletion to propagate.
-WAIT_DELETE_BUCKET_PROPAGATION_SECONDS = 10
+# Number of seconds to wait for bucket deletion or creation to propagate.
+WAIT_BUCKET_PROPAGATION_SECONDS = 60


 @unittest.skipIf(gcsio is None, 'GCP dependencies are not installed')
@@ -208,15 +208,17 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name):
     google_cloud_options.dataflow_kms_key = None

     import random
-    from hashlib import md5
+    from hashlib import blake2b
     # Add a random number to avoid collision if multiple test instances
     # are run at the same time. To avoid too many dangling buckets if bucket
     # removal fails, we limit the max number of possible bucket names in this
     # test to 1000.
-    overridden_bucket_name = 'gcsio-it-%d-%s-%s' % (
+    overridden_bucket_name = 'gcsio-it-%d-%s-%s-%d' % (
         random.randint(0, 999),
         google_cloud_options.region,
-        md5(google_cloud_options.project.encode('utf8')).hexdigest())
+        blake2b(google_cloud_options.project.encode('utf8'),
+                digest_size=4).hexdigest(),
+        int(time.time()))

     mock_default_gcs_bucket_name.return_value = overridden_bucket_name

@@ -225,13 +227,14 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name):
     if existing_bucket:
       try:
         existing_bucket.delete()
-        time.sleep(WAIT_DELETE_BUCKET_PROPAGATION_SECONDS)
+        time.sleep(WAIT_BUCKET_PROPAGATION_SECONDS)
       except NotFound:
         # Bucket existence check from get_bucket may be inaccurate due to gcs
         # cache or delay
         pass

     bucket = gcsio.get_or_create_default_gcs_bucket(google_cloud_options)
+    time.sleep(WAIT_BUCKET_PROPAGATION_SECONDS)
     self.assertIsNotNone(bucket)
     self.assertEqual(bucket.name, overridden_bucket_name)
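
Two things change here: the propagation waits grow to 60 seconds and now also follow bucket creation, and the bucket name gains a timestamp so names are unique across runs, with the project hash shortened via blake2b(digest_size=4) from md5's 32 hex characters to 8, presumably to keep the name within GCS's 63-character bucket-name limit despite the extra component. A self-contained check of that length arithmetic, using a made-up project id and region:

    import time
    from hashlib import blake2b, md5

    project = 'my-example-project'  # hypothetical project id
    assert len(md5(project.encode('utf8')).hexdigest()) == 32
    assert len(blake2b(project.encode('utf8'), digest_size=4).hexdigest()) == 8

    # Mirrors the naming scheme in the diff above.
    name = 'gcsio-it-%d-%s-%s-%d' % (
        999, 'us-central1',
        blake2b(project.encode('utf8'), digest_size=4).hexdigest(),
        int(time.time()))
    print(name, len(name))  # ~45 characters, comfortably under 63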
