@@ -6,68 +6,144 @@ const tcp = require('net')
6
6
const multiaddr = require ( 'multiaddr' )
7
7
const Address6 = require ( 'ip-address' ) . Address6
8
8
const mafmt = require ( 'mafmt' )
9
- const parallel = require ( 'run-parallel' )
9
+ // const parallel = require('run-parallel')
10
10
const contains = require ( 'lodash.contains' )
11
+ const os = require ( 'os' )
11
12
12
13
exports = module . exports = TCP
13
14
14
15
const IPFS_CODE = 421
15
- const CLOSE_TIMEOUT = 300
16
+ const CLOSE_TIMEOUT = 2000
16
17
17
18
function TCP ( ) {
18
19
if ( ! ( this instanceof TCP ) ) {
19
20
return new TCP ( )
20
21
}
21
22
22
- const listeners = [ ]
23
-
24
- this . dial = function ( multiaddr , options ) {
23
+ /*
24
+ this.dial = function (ma, options) {
25
25
if (!options) {
26
26
options = {}
27
27
}
28
28
options.ready = options.ready || function noop () {}
29
- const conn = tcp . connect ( multiaddr . toOptions ( ) , options . ready )
29
+ const conn = tcp.connect(ma .toOptions(), options.ready)
30
30
conn.getObservedAddrs = () => {
31
31
return [multiaddr]
32
32
}
33
33
return conn
34
34
}
35
+ */
35
36
36
- this . createListener = ( multiaddrs , handler , callback ) => {
37
- if ( ! Array . isArray ( multiaddrs ) ) {
38
- multiaddrs = [ multiaddrs ]
37
+ this . createListener = ( options , handler ) => {
38
+ if ( typeof options === 'function' ) {
39
+ handler = options
40
+ options = { }
39
41
}
40
42
41
- const freshMultiaddrs = [ ]
43
+ const listener = tcp . createServer ( ( socket ) => {
44
+ // TODO update to Connection
45
+ socket . getObservedAddrs = ( ) => {
46
+ return [ getMultiaddr ( socket ) ]
47
+ }
48
+ handler ( socket )
49
+ } )
42
50
43
- parallel ( multiaddrs . map ( ( m ) => ( cb ) => {
44
- let ipfsHashId
45
- if ( contains ( m . protoNames ( ) , 'ipfs' ) ) {
46
- ipfsHashId = m . stringTuples ( ) . filter ( ( tuple ) => {
51
+ let ipfsId
52
+ let listeningMultiaddr
53
+
54
+ listener . _listen = listener . listen
55
+ listener . listen = ( ma , callback ) => {
56
+ listeningMultiaddr = ma
57
+ if ( contains ( ma . protoNames ( ) , 'ipfs' ) ) {
58
+ ipfsId = ma . stringTuples ( ) . filter ( ( tuple ) => {
47
59
if ( tuple [ 0 ] === IPFS_CODE ) {
48
60
return true
49
61
}
50
62
} ) [ 0 ] [ 1 ]
51
- m = m . decapsulate ( 'ipfs' )
63
+ listeningMultiaddr = ma . decapsulate ( 'ipfs' )
52
64
}
53
65
54
- const listener = tcp . createServer ( ( conn ) => {
55
- conn . getObservedAddrs = ( ) => {
56
- return [ getMultiaddr ( conn ) ]
57
- }
58
- handler ( conn )
59
- } )
66
+ listener . _listen ( listeningMultiaddr . toOptions ( ) , callback )
67
+ }
60
68
61
- listener . __connections = { }
62
- listener . on ( 'connection' , ( conn ) => {
63
- const key = `${ conn . remoteAddress } :${ conn . remotePort } `
64
- listener . __connections [ key ] = conn
69
+ listener . _close = listener . close
70
+ listener . close = ( options , callback ) => {
71
+ if ( typeof options === 'function' ) {
72
+ callback = options
73
+ options = { }
74
+ }
75
+ if ( ! callback ) { callback = function noop ( ) { } }
76
+ if ( ! options ) { options = { } }
65
77
66
- conn . on ( 'close' , ( ) => {
67
- delete listener . __connections [ key ]
78
+ let closed = false
79
+ listener . _close ( callback )
80
+ listener . once ( 'close' , ( ) => {
81
+ closed = true
82
+ } )
83
+ setTimeout ( ( ) => {
84
+ if ( closed ) {
85
+ return
86
+ }
87
+ log ( 'unable to close graciously, destroying conns' )
88
+ Object . keys ( listener . __connections ) . forEach ( ( key ) => {
89
+ log ( 'destroying %s' , key )
90
+ listener . __connections [ key ] . destroy ( )
68
91
} )
92
+ } , options . timeout || CLOSE_TIMEOUT )
93
+ }
94
+
95
+ // Keep track of open connections to destroy in case of timeout
96
+ listener . __connections = { }
97
+ listener . on ( 'connection' , ( socket ) => {
98
+ const key = `${ socket . remoteAddress } :${ socket . remotePort } `
99
+ listener . __connections [ key ] = socket
100
+
101
+ socket . on ( 'close' , ( ) => {
102
+ delete listener . __connections [ key ]
69
103
} )
104
+ } )
70
105
106
+ listener . getAddrs = ( callback ) => {
107
+ const multiaddrs = [ ]
108
+ const address = listener . address ( )
109
+
110
+ // Because TCP will only return the IPv6 version
111
+ // we need to capture from the passed multiaddr
112
+ if ( listeningMultiaddr . toString ( ) . indexOf ( 'ip4' ) !== - 1 ) {
113
+ let m = listeningMultiaddr . decapsulate ( 'tcp' )
114
+ m = m . encapsulate ( '/tcp/' + address . port )
115
+ if ( ipfsId ) {
116
+ m = m . encapsulate ( '/ipfs/' + ipfsId )
117
+ }
118
+
119
+ if ( m . toString ( ) . indexOf ( '0.0.0.0' ) !== - 1 ) {
120
+ const netInterfaces = os . networkInterfaces ( )
121
+ Object . keys ( netInterfaces ) . forEach ( ( niKey ) => {
122
+ netInterfaces [ niKey ] . forEach ( ( ni ) => {
123
+ if ( ni . family === 'IPv4' ) {
124
+ multiaddrs . push ( multiaddr ( m . toString ( ) . replace ( '0.0.0.0' , ni . address ) ) )
125
+ }
126
+ } )
127
+ } )
128
+ } else {
129
+ multiaddrs . push ( m )
130
+ }
131
+ }
132
+
133
+ if ( address . family === 'IPv6' ) {
134
+ let ma = multiaddr ( '/ip6/' + address . address + '/tcp/' + address . port )
135
+ if ( ipfsId ) {
136
+ ma = ma . encapsulate ( '/ipfs/' + ipfsId )
137
+ }
138
+
139
+ multiaddrs . push ( ma )
140
+ }
141
+
142
+ callback ( null , multiaddrs )
143
+ }
144
+
145
+ return listener
146
+ /*
71
147
listener.listen(m.toOptions(), () => {
72
148
// Node.js likes to convert addr to IPv6 (when 0.0.0.0 for e.g)
73
149
const address = listener.address()
@@ -92,28 +168,7 @@ function TCP () {
92
168
cb()
93
169
})
94
170
listeners.push(listener)
95
- } ) , ( err ) => {
96
- callback ( err , freshMultiaddrs )
97
- } )
98
- }
99
-
100
- this . close = ( callback ) => {
101
- log ( 'closing' )
102
- if ( listeners . length === 0 ) {
103
- log ( 'Called close with no active listeners' )
104
- return callback ( )
105
- }
106
-
107
- parallel ( listeners . map ( ( listener ) => ( cb ) => {
108
- setTimeout ( ( ) => {
109
- Object . keys ( listener . __connections ) . forEach ( ( key ) => {
110
- log ( 'destroying %s' , key )
111
- listener . __connections [ key ] . destroy ( )
112
- } )
113
- } , CLOSE_TIMEOUT )
114
-
115
- listener . close ( cb )
116
- } ) , callback )
171
+ */
117
172
}
118
173
119
174
this . filter = ( multiaddrs ) => {
0 commit comments