@@ -32,6 +32,11 @@ type trackEncoding struct {
32
32
rtxSrtpStream * srtpWriterFuture
33
33
rtxRtcpInterceptor interceptor.RTCPReader
34
34
rtxStreamInfo interceptor.StreamInfo
35
+
36
+ fecSsrc SSRC
37
+ fecSrtpStream * srtpWriterFuture
38
+ fecRtcpInterceptor interceptor.RTCPReader
39
+ fecStreamInfo interceptor.StreamInfo
35
40
}
36
41
37
42
// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer
@@ -125,6 +130,7 @@ func (r *RTPSender) getParameters() RTPSendParameters {
125
130
SSRC : trackEncoding .ssrc ,
126
131
PayloadType : r .payloadType ,
127
132
RTX : RTPRtxParameters {SSRC : trackEncoding .rtxSsrc },
133
+ FEC : RTPFecParameters {SSRC : trackEncoding .fecSsrc },
128
134
},
129
135
})
130
136
}
@@ -223,6 +229,13 @@ func (r *RTPSender) addEncoding(track TrackLocal) {
223
229
}
224
230
}
225
231
232
+ if r .api .settingEngine .trackLocalFlexfec {
233
+ codecs := r .api .mediaEngine .getCodecsByKind (track .Kind ())
234
+ if len (codecParametersSearchByMimeType (MimeTypeFlexFEC03 , codecs )) > 0 {
235
+ trackEncoding .fecSsrc = SSRC (randutil .NewMathRandomGenerator ().Uint32 ())
236
+ }
237
+ }
238
+
226
239
r .trackEncodings = append (r .trackEncodings , trackEncoding )
227
240
}
228
241
@@ -314,8 +327,14 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error {
314
327
return errRTPSenderTrackRemoved
315
328
}
316
329
317
- for idx , trackEncoding := range r .trackEncodings {
330
+ for idx := range r .trackEncodings {
331
+ trackEncoding := r .trackEncodings [idx ]
332
+ srtpStream := & srtpWriterFuture {ssrc : parameters .Encodings [idx ].SSRC , rtpSender : r }
318
333
writeStream := & interceptorToTrackLocalWriter {}
334
+ fecCodecs := codecParametersSearchByMimeType (MimeTypeFlexFEC03 , r .api .mediaEngine .getCodecsByKind (r .kind ))
335
+
336
+ trackEncoding .srtpStream = srtpStream
337
+ trackEncoding .ssrc = parameters .Encodings [idx ].SSRC
319
338
trackEncoding .context = & baseTrackLocalContext {
320
339
id : r .id ,
321
340
params : r .api .mediaEngine .getRTPParametersByKind (trackEncoding .track .Kind (), []RTPTransceiverDirection {RTPTransceiverDirectionSendonly }),
@@ -337,7 +356,18 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error {
337
356
codec .RTPCodecCapability ,
338
357
parameters .HeaderExtensions ,
339
358
)
340
- srtpStream := trackEncoding .srtpStream
359
+
360
+ if len (fecCodecs ) > 0 {
361
+ trackEncoding .streamInfo .Attributes .Set ("flexfec-03" , struct {}{})
362
+ }
363
+
364
+ trackEncoding .rtcpInterceptor = r .api .interceptor .BindRTCPReader (
365
+ interceptor .RTCPReaderFunc (func (in []byte , a interceptor.Attributes ) (n int , _ interceptor.Attributes , err error ) {
366
+ n , err = trackEncoding .srtpStream .Read (in )
367
+ return n , a , err
368
+ }),
369
+ )
370
+
341
371
rtpInterceptor := r .api .interceptor .BindLocalStream (
342
372
& trackEncoding .streamInfo ,
343
373
interceptor .RTPWriterFunc (func (header * rtp.Header , payload []byte , attributes interceptor.Attributes ) (int , error ) {
@@ -376,6 +406,37 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error {
376
406
}),
377
407
)
378
408
}
409
+
410
+ if len (fecCodecs ) > 0 &&
411
+ parameters .Encodings [idx ].FEC .SSRC != 0 {
412
+ fecSrtpStream := & srtpWriterFuture {ssrc : parameters .Encodings [idx ].FEC .SSRC , rtpSender : r }
413
+
414
+ trackEncoding .fecSrtpStream = fecSrtpStream
415
+ trackEncoding .fecSsrc = parameters .Encodings [idx ].FEC .SSRC
416
+
417
+ trackEncoding .fecStreamInfo = * createStreamInfo (
418
+ r .id + "_fec" ,
419
+ parameters .Encodings [idx ].FEC .SSRC ,
420
+ fecCodecs [0 ].PayloadType ,
421
+ fecCodecs [0 ].RTPCodecCapability ,
422
+ parameters .HeaderExtensions ,
423
+ )
424
+ trackEncoding .fecStreamInfo .Attributes .Set ("apt_ssrc" , uint32 (parameters .Encodings [idx ].SSRC ))
425
+
426
+ trackEncoding .fecRtcpInterceptor = r .api .interceptor .BindRTCPReader (
427
+ interceptor .RTCPReaderFunc (func (in []byte , a interceptor.Attributes ) (n int , attributes interceptor.Attributes , err error ) {
428
+ n , err = trackEncoding .fecSrtpStream .Read (in )
429
+ return n , a , err
430
+ }),
431
+ )
432
+
433
+ r .api .interceptor .BindLocalStream (
434
+ & trackEncoding .fecStreamInfo ,
435
+ interceptor .RTPWriterFunc (func (header * rtp.Header , payload []byte , _ interceptor.Attributes ) (int , error ) {
436
+ return fecSrtpStream .WriteRTP (header , payload )
437
+ }),
438
+ )
439
+ }
379
440
}
380
441
381
442
close (r .sendCalled )
@@ -415,6 +476,10 @@ func (r *RTPSender) Stop() error {
415
476
r .api .interceptor .UnbindLocalStream (& trackEncoding .rtxStreamInfo )
416
477
errs = append (errs , trackEncoding .rtxSrtpStream .Close ())
417
478
}
479
+ if trackEncoding .fecSrtpStream != nil {
480
+ r .api .interceptor .UnbindLocalStream (& trackEncoding .fecStreamInfo )
481
+ errs = append (errs , trackEncoding .fecSrtpStream .Close ())
482
+ }
418
483
}
419
484
420
485
return util .FlattenErrs (errs )
@@ -476,6 +541,36 @@ func (r *RTPSender) ReadRtxRTCP() ([]rtcp.Packet, interceptor.Attributes, error)
476
541
return pkts , attributes , nil
477
542
}
478
543
544
+ // ReadFec reads incoming FEC Stream RTCP for this RTPSender
545
+ func (r * RTPSender ) ReadFec (b []byte ) (n int , a interceptor.Attributes , err error ) {
546
+ if r .trackEncodings [0 ].fecRtcpInterceptor == nil {
547
+ return 0 , nil , io .ErrNoProgress
548
+ }
549
+
550
+ select {
551
+ case <- r .sendCalled :
552
+ return r .trackEncodings [0 ].fecRtcpInterceptor .Read (b , a )
553
+ case <- r .stopCalled :
554
+ return 0 , nil , io .ErrClosedPipe
555
+ }
556
+ }
557
+
558
+ // ReadFecRTCP is a convenience method that wraps ReadFec and unmarshals for you.
559
+ func (r * RTPSender ) ReadFecRTCP () ([]rtcp.Packet , interceptor.Attributes , error ) {
560
+ b := make ([]byte , r .api .settingEngine .getReceiveMTU ())
561
+ i , attributes , err := r .ReadFec (b )
562
+ if err != nil {
563
+ return nil , nil , err
564
+ }
565
+
566
+ pkts , err := rtcp .Unmarshal (b [:i ])
567
+ if err != nil {
568
+ return nil , nil , err
569
+ }
570
+
571
+ return pkts , attributes , nil
572
+ }
573
+
479
574
// ReadSimulcast reads incoming RTCP for this RTPSender for given rid
480
575
func (r * RTPSender ) ReadSimulcast (b []byte , rid string ) (n int , a interceptor.Attributes , err error ) {
481
576
select {
0 commit comments