Skip to content

Commit 280be41

Browse files
committed
Add BatchIO and load balance on ports to UDPMux
Improve performance of UDPMux by BatchIO and load balance on ports
1 parent 0ec2333 commit 280be41

File tree

6 files changed

+277
-54
lines changed

6 files changed

+277
-54
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/pion/mdns v0.0.7
1111
github.com/pion/randutil v0.1.0
1212
github.com/pion/stun v0.6.1
13-
github.com/pion/transport/v2 v2.2.1
13+
github.com/pion/transport/v2 v2.2.2
1414
github.com/pion/turn/v2 v2.1.3
1515
github.com/stretchr/testify v1.8.4
1616
golang.org/x/net v0.13.0

go.sum

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TB
1919
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
2020
github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8=
2121
github.com/pion/transport/v2 v2.0.0/go.mod h1:HS2MEBJTwD+1ZI2eSXSvHJx/HnzQqRy2/LXxt6eVMHc=
22-
github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N7c=
2322
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
23+
github.com/pion/transport/v2 v2.2.2 h1:yv+EKSU2dpmInuCebQ1rsBFCYL7p+aV90xIlshSBO+A=
24+
github.com/pion/transport/v2 v2.2.2/go.mod h1:OJg3ojoBJopjEeECq2yJdXH9YVrUJ1uQ++NjXLOUorc=
2425
github.com/pion/turn/v2 v2.1.3 h1:pYxTVWG2gpC97opdRc5IGsQ1lJ9O/IlNhkzj7MMrGAA=
2526
github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
2627
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

udp_mux.go

+12
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ type UDPMux interface {
2525
GetListenAddresses() []net.Addr
2626
}
2727

28+
// MuxConnCount return count of working connections created by the mux.
29+
type MuxConnCount interface {
30+
ConnCount() int
31+
}
32+
2833
// UDPMuxDefault is an implementation of the interface
2934
type UDPMuxDefault struct {
3035
params UDPMuxParams
@@ -176,6 +181,13 @@ func (m *UDPMuxDefault) GetConn(ufrag string, addr net.Addr) (net.PacketConn, er
176181
return c, nil
177182
}
178183

184+
// ConnCount return count of working connections created by UDPMuxDefault
185+
func (m *UDPMuxDefault) ConnCount() int {
186+
m.mu.Lock()
187+
defer m.mu.Unlock()
188+
return len(m.connsIPv4) + len(m.connsIPv6)
189+
}
190+
179191
// RemoveConnByUfrag stops and removes the muxed packet connection
180192
func (m *UDPMuxDefault) RemoveConnByUfrag(ufrag string) {
181193
removedConns := make([]*udpMuxedConn, 0, 2)

udp_mux_multi.go

+155-24
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,19 @@
44
package ice
55

66
import (
7+
"errors"
78
"fmt"
89
"net"
10+
"time"
911

1012
"github.com/pion/logging"
1113
"github.com/pion/transport/v2"
1214
"github.com/pion/transport/v2/stdnet"
15+
tudp "github.com/pion/transport/v2/udp"
16+
)
17+
18+
var (
19+
errPortBalanceRequireConnCount = errors.New("Port balance requires UDPMux implements MuxConnCount interface")
1320
)
1421

1522
// MultiUDPMuxDefault implements both UDPMux and AllConnsGetter,
@@ -18,21 +25,108 @@ import (
1825
type MultiUDPMuxDefault struct {
1926
muxes []UDPMux
2027
localAddrToMux map[string]UDPMux
28+
29+
enablePortBalance bool
30+
// Manage port balance for mux that listen on multiple ports for same IP,
31+
// for each IP, only return one addr (one port) for each GetListenAddresses call to
32+
// avoid duplicate ip candidates be gathered for a single ice agent.
33+
multiPortsAddresses []*multiPortsAddress
34+
}
35+
36+
type addrMux struct {
37+
addr net.Addr
38+
mux MuxConnCount
39+
}
40+
41+
// each multiPortsAddress represents muxes listen on different ports of a same IP
42+
type multiPortsAddress struct {
43+
addresseMuxes []*addrMux
44+
}
45+
46+
func (mpa *multiPortsAddress) next() net.Addr {
47+
leastAddr, leastConns := mpa.addresseMuxes[0].addr, mpa.addresseMuxes[0].mux.ConnCount()
48+
for i := 1; i < len(mpa.addresseMuxes); i++ {
49+
am := mpa.addresseMuxes[i]
50+
if count := am.mux.ConnCount(); count < leastConns {
51+
leastConns = count
52+
leastAddr = am.addr
53+
}
54+
}
55+
return leastAddr
56+
}
57+
58+
// MultiUDPMuxOption provide options for NewMultiUDPMuxDefault
59+
type MultiUDPMuxOption func(*multipleUDPMuxDefaultParams)
60+
61+
// MultiUDPMuxOptionWithPortBalance enables load balancing traffic on multiple ports belonging to the same IP
62+
// When enabled, GetListenAddresses will return the port with the least number of connections for each corresponding IP
63+
func MultiUDPMuxOptionWithPortBalance() MultiUDPMuxOption {
64+
return func(params *multipleUDPMuxDefaultParams) {
65+
params.portBalance = true
66+
}
67+
}
68+
69+
type multipleUDPMuxDefaultParams struct {
70+
portBalance bool
2171
}
2272

2373
// NewMultiUDPMuxDefault creates an instance of MultiUDPMuxDefault that
2474
// uses the provided UDPMux instances.
2575
func NewMultiUDPMuxDefault(muxes ...UDPMux) *MultiUDPMuxDefault {
76+
mux, err := NewMultiUDPMuxDefaultWithOptions(muxes)
77+
if err != nil {
78+
panic(err)
79+
}
80+
return mux
81+
}
82+
83+
// NewMultiUDPMuxDefaultWithOptions creates an instance of MultiUDPMuxDefault that
84+
// uses the provided UDPMux instances and options.
85+
func NewMultiUDPMuxDefaultWithOptions(muxes []UDPMux, opts ...MultiUDPMuxOption) (*MultiUDPMuxDefault, error) {
86+
var params multipleUDPMuxDefaultParams
87+
for _, opt := range opts {
88+
opt(&params)
89+
}
90+
91+
if params.portBalance {
92+
for _, mux := range muxes {
93+
if _, ok := mux.(MuxConnCount); !ok {
94+
return nil, errPortBalanceRequireConnCount
95+
}
96+
}
97+
}
98+
2699
addrToMux := make(map[string]UDPMux)
100+
ipToAddrs := make(map[string]*multiPortsAddress)
27101
for _, mux := range muxes {
28102
for _, addr := range mux.GetListenAddresses() {
29103
addrToMux[addr.String()] = mux
104+
105+
if params.portBalance {
106+
muxCount, _ := mux.(MuxConnCount)
107+
udpAddr, _ := addr.(*net.UDPAddr)
108+
ip := udpAddr.IP.String()
109+
if mpa, ok := ipToAddrs[ip]; ok {
110+
mpa.addresseMuxes = append(mpa.addresseMuxes, &addrMux{addr, muxCount})
111+
} else {
112+
ipToAddrs[ip] = &multiPortsAddress{
113+
addresseMuxes: []*addrMux{{addr, muxCount}},
114+
}
115+
}
116+
}
30117
}
31118
}
32-
return &MultiUDPMuxDefault{
33-
muxes: muxes,
34-
localAddrToMux: addrToMux,
119+
120+
multiPortsAddresses := make([]*multiPortsAddress, 0, len(ipToAddrs))
121+
for _, mpa := range ipToAddrs {
122+
multiPortsAddresses = append(multiPortsAddresses, mpa)
35123
}
124+
return &MultiUDPMuxDefault{
125+
muxes: muxes,
126+
localAddrToMux: addrToMux,
127+
multiPortsAddresses: multiPortsAddresses,
128+
enablePortBalance: params.portBalance,
129+
}, nil
36130
}
37131

38132
// GetConn returns a PacketConn given the connection's ufrag and network
@@ -64,8 +158,18 @@ func (m *MultiUDPMuxDefault) Close() error {
64158
return err
65159
}
66160

67-
// GetListenAddresses returns the list of addresses that this mux is listening on
161+
// GetListenAddresses returns the list of addresses that this mux is listening on,
162+
// if port balance enabled and there are multiple muxes listening to different ports of the same IP addr,
163+
// it will return the mux that has the least number of connections.
68164
func (m *MultiUDPMuxDefault) GetListenAddresses() []net.Addr {
165+
if m.enablePortBalance {
166+
addrs := make([]net.Addr, 0, len(m.multiPortsAddresses))
167+
for _, mpa := range m.multiPortsAddresses {
168+
addrs = append(addrs, mpa.next())
169+
}
170+
return addrs
171+
}
172+
69173
addrs := make([]net.Addr, 0, len(m.localAddrToMux))
70174
for _, mux := range m.muxes {
71175
addrs = append(addrs, mux.GetListenAddresses()...)
@@ -76,6 +180,12 @@ func (m *MultiUDPMuxDefault) GetListenAddresses() []net.Addr {
76180
// NewMultiUDPMuxFromPort creates an instance of MultiUDPMuxDefault that
77181
// listen all interfaces on the provided port.
78182
func NewMultiUDPMuxFromPort(port int, opts ...UDPMuxFromPortOption) (*MultiUDPMuxDefault, error) {
183+
return NewMultiUDPMuxFromPorts([]int{port}, opts...)
184+
}
185+
186+
// NewMultiUDPMuxFromPorts creates an instance of MultiUDPMuxDefault that
187+
// listens to all interfaces and balances traffic on the provided ports.
188+
func NewMultiUDPMuxFromPorts(ports []int, opts ...UDPMuxFromPortOption) (*MultiUDPMuxDefault, error) {
79189
params := multiUDPMuxFromPortParam{
80190
networks: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
81191
}
@@ -95,20 +205,29 @@ func NewMultiUDPMuxFromPort(port int, opts ...UDPMuxFromPortOption) (*MultiUDPMu
95205
return nil, err
96206
}
97207

98-
conns := make([]net.PacketConn, 0, len(ips))
208+
conns := make([]net.PacketConn, 0, len(ports)*len(ips))
99209
for _, ip := range ips {
100-
conn, listenErr := params.net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: port})
101-
if listenErr != nil {
102-
err = listenErr
103-
break
210+
for _, port := range ports {
211+
conn, listenErr := params.net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: port})
212+
if listenErr != nil {
213+
err = listenErr
214+
break
215+
}
216+
if params.readBufferSize > 0 {
217+
_ = conn.SetReadBuffer(params.readBufferSize)
218+
}
219+
if params.writeBufferSize > 0 {
220+
_ = conn.SetWriteBuffer(params.writeBufferSize)
221+
}
222+
if params.batchWriteSize > 0 {
223+
conns = append(conns, tudp.NewBatchConn(conn, params.batchWriteSize, params.batchWriteInterval))
224+
} else {
225+
conns = append(conns, conn)
226+
}
104227
}
105-
if params.readBufferSize > 0 {
106-
_ = conn.SetReadBuffer(params.readBufferSize)
107-
}
108-
if params.writeBufferSize > 0 {
109-
_ = conn.SetWriteBuffer(params.writeBufferSize)
228+
if err != nil {
229+
break
110230
}
111-
conns = append(conns, conn)
112231
}
113232

114233
if err != nil {
@@ -128,7 +247,7 @@ func NewMultiUDPMuxFromPort(port int, opts ...UDPMuxFromPortOption) (*MultiUDPMu
128247
muxes = append(muxes, mux)
129248
}
130249

131-
return NewMultiUDPMuxDefault(muxes...), nil
250+
return NewMultiUDPMuxDefaultWithOptions(muxes, MultiUDPMuxOptionWithPortBalance())
132251
}
133252

134253
// UDPMuxFromPortOption provide options for NewMultiUDPMuxFromPort
@@ -137,14 +256,16 @@ type UDPMuxFromPortOption interface {
137256
}
138257

139258
type multiUDPMuxFromPortParam struct {
140-
ifFilter func(string) bool
141-
ipFilter func(ip net.IP) bool
142-
networks []NetworkType
143-
readBufferSize int
144-
writeBufferSize int
145-
logger logging.LeveledLogger
146-
includeLoopback bool
147-
net transport.Net
259+
ifFilter func(string) bool
260+
ipFilter func(ip net.IP) bool
261+
networks []NetworkType
262+
readBufferSize int
263+
writeBufferSize int
264+
logger logging.LeveledLogger
265+
includeLoopback bool
266+
net transport.Net
267+
batchWriteSize int
268+
batchWriteInterval time.Duration
148269
}
149270

150271
type udpMuxFromPortOption struct {
@@ -226,3 +347,13 @@ func UDPMuxFromPortWithNet(n transport.Net) UDPMuxFromPortOption {
226347
},
227348
}
228349
}
350+
351+
// UDPMuxFromPortWithBatchWrite enable batch write for UDPMux
352+
func UDPMuxFromPortWithBatchWrite(batchWriteSize int, batchWriteInterval time.Duration) UDPMuxFromPortOption {
353+
return &udpMuxFromPortOption{
354+
f: func(p *multiUDPMuxFromPortParam) {
355+
p.batchWriteSize = batchWriteSize
356+
p.batchWriteInterval = batchWriteInterval
357+
},
358+
}
359+
}

0 commit comments

Comments
 (0)