import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
@@ -212,7 +213,11 @@ public void cleanUp() throws Exception {
  private static final String RANDOM = UUID.randomUUID().toString();
  @Rule public TestPipeline pipeline = TestPipeline.create();
  @Rule public TestName testName = new TestName();
-  @Rule public transient Timeout globalTimeout = Timeout.seconds(180);
+
+  @Rule
+  public transient Timeout globalTimeout =
+      Timeout.seconds(OPTIONS.getRunner().equals(DirectRunner.class) ? 180 : 20 * 60);
+
  private static final int NUM_SHARDS = 10;
  private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogBaseIT.class);
  private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
@@ -444,15 +449,18 @@ public void testStreamingReadBetweenTimestamps() throws Exception {
  public void testWriteRead() throws IOException {
    Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);
    List<Row> expectedRows = populateTable(table);
-    Map<String, Object> config = managedIcebergConfig(tableId());
+    Map<String, Object> readConfig = managedIcebergConfig(tableId());
+    String writeTableId = tableId() + "_2";
+    Map<String, Object> writeConfig = managedIcebergConfig(writeTableId);

    pipeline
-        .apply("read", Managed.read(ICEBERG).withConfig(config))
+        .apply("read", Managed.read(ICEBERG).withConfig(readConfig))
        .getSinglePCollection()
-        .apply("write", Managed.write(ICEBERG).withConfig(config));
+        .apply("write", Managed.write(ICEBERG).withConfig(writeConfig));
    pipeline.run().waitUntilFinish();

-    List<Record> returnedRecords = readRecords(table);
+    List<Record> returnedRecords =
+        readRecords(catalog.loadTable(TableIdentifier.parse(writeTableId)));
    assertThat(
        returnedRecords,
        containsInAnyOrder(expectedRows.stream().map(RECORD_FUNC::apply).toArray()));
@@ -469,16 +477,19 @@ public void testReadWriteStreaming() throws IOException {
    readConfig.put("to_timestamp", System.currentTimeMillis());
    readConfig.put("streaming", true);

+    String writeTableId = tableId() + "_2";
    Map<String, Object> writeConfig = new HashMap<>(config);
    writeConfig.put("triggering_frequency_seconds", 5);
+    writeConfig.put("table", writeTableId);

    pipeline
        .apply("streaming read", Managed.read(ICEBERG_CDC).withConfig(readConfig))
        .getSinglePCollection()
        .apply("streaming write", Managed.write(ICEBERG).withConfig(writeConfig));
    pipeline.run().waitUntilFinish();

-    List<Record> returnedRecords = readRecords(table);
+    List<Record> returnedRecords =
+        readRecords(catalog.loadTable(TableIdentifier.parse(writeTableId)));
    assertThat(
        returnedRecords,
        containsInAnyOrder(expectedRows.stream().map(RECORD_FUNC::apply).toArray()));
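
For orientation, below is a minimal, standalone sketch of the Managed Iceberg read-then-write pattern that the updated testWriteRead exercises: read rows from one table and write them to a second table using separate config maps. It is not part of this change; the catalog properties ("type", "warehouse") and the table names are placeholder assumptions, not values from the test.

// Standalone sketch (assumptions noted below), not part of this change.
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class ManagedIcebergRoundTrip {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Assumed catalog settings: a Hadoop catalog with a local warehouse path.
    Map<String, String> catalogProps = new HashMap<>();
    catalogProps.put("type", "hadoop");
    catalogProps.put("warehouse", "/tmp/warehouse");

    // Read config points at the source table; "db.source_table" is a placeholder.
    Map<String, Object> readConfig = new HashMap<>();
    readConfig.put("table", "db.source_table");
    readConfig.put("catalog_properties", catalogProps);

    // Write config targets a sibling table, mirroring the "_2" suffix the test now uses.
    Map<String, Object> writeConfig = new HashMap<>(readConfig);
    writeConfig.put("table", "db.source_table_2");

    // Read from the source table, then write the same rows to the second table.
    PCollection<Row> rows =
        pipeline
            .apply("read", Managed.read(Managed.ICEBERG).withConfig(readConfig))
            .getSinglePCollection();
    rows.apply("write", Managed.write(Managed.ICEBERG).withConfig(writeConfig));

    pipeline.run().waitUntilFinish();
  }
}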