@@ -393,89 +393,103 @@ describe('createWebSocketStream', () => {
393
393
} ) ;
394
394
395
395
it ( 'handles backpressure (2/3)' , ( done ) => {
396
- const wss = new WebSocket . Server ( { port : 0 } , ( ) => {
397
- const called = [ ] ;
398
- const ws = new WebSocket ( `ws://localhost:${ wss . address ( ) . port } ` ) ;
399
- const duplex = createWebSocketStream ( ws ) ;
400
- const read = duplex . _read ;
396
+ const wss = new WebSocket . Server (
397
+ { port : 0 , perMessageDeflate : true } ,
398
+ ( ) => {
399
+ const called = [ ] ;
400
+ const ws = new WebSocket ( `ws://localhost:${ wss . address ( ) . port } ` ) ;
401
+ const duplex = createWebSocketStream ( ws ) ;
402
+ const read = duplex . _read ;
401
403
402
- duplex . _read = ( ) => {
403
- called . push ( 'read' ) ;
404
- assert . ok ( ws . _receiver . _writableState . needDrain ) ;
405
- read ( ) ;
406
- assert . ok ( ws . _socket . isPaused ( ) ) ;
407
- } ;
404
+ duplex . _read = ( ) => {
405
+ duplex . _read = read ;
406
+ called . push ( 'read' ) ;
407
+ assert . ok ( ws . _receiver . _writableState . needDrain ) ;
408
+ read ( ) ;
409
+ assert . ok ( ws . _socket . isPaused ( ) ) ;
410
+ } ;
408
411
409
- ws . on ( 'open' , ( ) => {
410
- ws . _socket . on ( 'pause' , ( ) => {
411
- duplex . resume ( ) ;
412
- } ) ;
412
+ ws . on ( 'open' , ( ) => {
413
+ ws . _socket . on ( 'pause' , ( ) => {
414
+ duplex . resume ( ) ;
415
+ } ) ;
413
416
414
- ws . _receiver . on ( 'drain' , ( ) => {
415
- called . push ( 'drain' ) ;
416
- assert . ok ( ! ws . _socket . isPaused ( ) ) ;
417
- duplex . end ( ) ;
418
- } ) ;
417
+ ws . _receiver . on ( 'drain' , ( ) => {
418
+ called . push ( 'drain' ) ;
419
+ assert . ok ( ! ws . _socket . isPaused ( ) ) ;
420
+ duplex . end ( ) ;
421
+ } ) ;
419
422
420
- const list = Sender . frame ( randomBytes ( 16 * 1024 ) , {
421
- fin : true ,
422
- rsv1 : false ,
423
- opcode : 0x02 ,
424
- mask : false ,
425
- readOnly : false
423
+ const opts = {
424
+ fin : true ,
425
+ opcode : 0x02 ,
426
+ mask : false ,
427
+ readOnly : false
428
+ } ;
429
+
430
+ const list = [
431
+ ...Sender . frame ( randomBytes ( 16 * 1024 ) , { rsv1 : false , ...opts } ) ,
432
+ ...Sender . frame ( Buffer . alloc ( 1 ) , { rsv1 : true , ...opts } )
433
+ ] ;
434
+
435
+ // This hack is used because there is no guarantee that more than
436
+ // 16 KiB will be sent as a single TCP packet.
437
+ ws . _socket . push ( Buffer . concat ( list ) ) ;
426
438
} ) ;
427
439
428
- // This hack is used because there is no guarantee that more than
429
- // 16KiB will be sent as a single TCP packet.
430
- ws . _socket . push ( Buffer . concat ( list ) ) ;
431
- } ) ;
432
-
433
- duplex . on ( 'close' , ( ) => {
434
- assert . deepStrictEqual ( called , [ 'read' , 'drain' ] ) ;
435
- wss . close ( done ) ;
436
- } ) ;
437
- } ) ;
440
+ duplex . on ( 'close' , ( ) => {
441
+ assert . deepStrictEqual ( called , [ 'read' , 'drain' ] ) ;
442
+ wss . close ( done ) ;
443
+ } ) ;
444
+ }
445
+ ) ;
438
446
} ) ;
439
447
440
448
it ( 'handles backpressure (3/3)' , ( done ) => {
441
- const wss = new WebSocket . Server ( { port : 0 } , ( ) => {
442
- const called = [ ] ;
443
- const ws = new WebSocket ( `ws://localhost:${ wss . address ( ) . port } ` ) ;
444
- const duplex = createWebSocketStream ( ws ) ;
449
+ const wss = new WebSocket . Server (
450
+ { port : 0 , perMessageDeflate : true } ,
451
+ ( ) => {
452
+ const called = [ ] ;
453
+ const ws = new WebSocket ( `ws://localhost:${ wss . address ( ) . port } ` ) ;
454
+ const duplex = createWebSocketStream ( ws ) ;
455
+ const read = duplex . _read ;
445
456
446
- const read = duplex . _read ;
457
+ duplex . _read = ( ) => {
458
+ called . push ( 'read' ) ;
459
+ assert . ok ( ! ws . _receiver . _writableState . needDrain ) ;
460
+ read ( ) ;
461
+ assert . ok ( ! ws . _socket . isPaused ( ) ) ;
462
+ duplex . end ( ) ;
463
+ } ;
447
464
448
- duplex . _read = ( ) => {
449
- called . push ( 'read' ) ;
450
- assert . ok ( ! ws . _receiver . _writableState . needDrain ) ;
451
- read ( ) ;
452
- assert . ok ( ! ws . _socket . isPaused ( ) ) ;
453
- duplex . end ( ) ;
454
- } ;
465
+ ws . on ( 'open' , ( ) => {
466
+ ws . _receiver . on ( 'drain' , ( ) => {
467
+ called . push ( 'drain' ) ;
468
+ assert . ok ( ws . _socket . isPaused ( ) ) ;
469
+ duplex . resume ( ) ;
470
+ } ) ;
455
471
456
- ws . on ( 'open' , ( ) => {
457
- ws . _receiver . on ( 'drain' , ( ) => {
458
- called . push ( 'drain' ) ;
459
- assert . ok ( ws . _socket . isPaused ( ) ) ;
460
- duplex . resume ( ) ;
461
- } ) ;
472
+ const opts = {
473
+ fin : true ,
474
+ opcode : 0x02 ,
475
+ mask : false ,
476
+ readOnly : false
477
+ } ;
462
478
463
- const list = Sender . frame ( randomBytes ( 16 * 1024 ) , {
464
- fin : true ,
465
- rsv1 : false ,
466
- opcode : 0x02 ,
467
- mask : false ,
468
- readOnly : false
469
- } ) ;
479
+ const list = [
480
+ ...Sender . frame ( randomBytes ( 16 * 1024 ) , { rsv1 : false , ...opts } ) ,
481
+ ...Sender . frame ( Buffer . alloc ( 1 ) , { rsv1 : true , ...opts } )
482
+ ] ;
470
483
471
- ws . _socket . push ( Buffer . concat ( list ) ) ;
472
- } ) ;
484
+ ws . _socket . push ( Buffer . concat ( list ) ) ;
485
+ } ) ;
473
486
474
- duplex . on ( 'close' , ( ) => {
475
- assert . deepStrictEqual ( called , [ 'drain' , 'read' ] ) ;
476
- wss . close ( done ) ;
477
- } ) ;
478
- } ) ;
487
+ duplex . on ( 'close' , ( ) => {
488
+ assert . deepStrictEqual ( called , [ 'drain' , 'read' ] ) ;
489
+ wss . close ( done ) ;
490
+ } ) ;
491
+ }
492
+ ) ;
479
493
} ) ;
480
494
481
495
it ( 'can be destroyed (1/2)' , ( done ) => {
0 commit comments