@@ -6,68 +6,166 @@ const tcp = require('net')
6
6
const multiaddr = require ( 'multiaddr' )
7
7
const Address6 = require ( 'ip-address' ) . Address6
8
8
const mafmt = require ( 'mafmt' )
9
- const parallel = require ( 'run-parallel' )
9
+ // const parallel = require('run-parallel')
10
10
const contains = require ( 'lodash.contains' )
11
+ const os = require ( 'os' )
12
+ const Connection = require ( 'interface-connection' ) . Connection
11
13
12
14
exports = module . exports = TCP
13
15
14
16
const IPFS_CODE = 421
15
- const CLOSE_TIMEOUT = 300
17
+ const CLOSE_TIMEOUT = 2000
16
18
17
19
function TCP ( ) {
18
20
if ( ! ( this instanceof TCP ) ) {
19
21
return new TCP ( )
20
22
}
21
23
22
- const listeners = [ ]
23
-
24
- this . dial = function ( multiaddr , options ) {
25
- if ( ! options ) {
24
+ this . dial = function ( ma , options , callback ) {
25
+ if ( typeof options === 'function' ) {
26
+ callback = options
26
27
options = { }
27
28
}
28
- options . ready = options . ready || function noop ( ) { }
29
- const conn = tcp . connect ( multiaddr . toOptions ( ) , options . ready )
30
- conn . getObservedAddrs = ( ) => {
31
- return [ multiaddr ]
29
+
30
+ if ( ! callback ) {
31
+ callback = function noop ( ) { }
32
+ }
33
+
34
+ const socket = tcp . connect ( ma . toOptions ( ) )
35
+ const conn = new Connection ( socket )
36
+
37
+ socket . on ( 'timeout' , ( ) => {
38
+ conn . emit ( 'timeout' )
39
+ } )
40
+
41
+ socket . on ( 'error' , ( err ) => {
42
+ callback ( err )
43
+ conn . emit ( 'error' , err )
44
+ } )
45
+
46
+ socket . on ( 'connect' , ( ) => {
47
+ callback ( null , conn )
48
+ conn . emit ( 'connect' )
49
+ } )
50
+
51
+ conn . getObservedAddrs = ( cb ) => {
52
+ return cb ( null , [ ma ] )
32
53
}
54
+
33
55
return conn
34
56
}
35
57
36
- this . createListener = ( multiaddrs , handler , callback ) => {
37
- if ( ! Array . isArray ( multiaddrs ) ) {
38
- multiaddrs = [ multiaddrs ]
58
+ this . createListener = ( options , handler ) => {
59
+ if ( typeof options === 'function' ) {
60
+ handler = options
61
+ options = { }
39
62
}
40
63
41
- const freshMultiaddrs = [ ]
64
+ const listener = tcp . createServer ( ( socket ) => {
65
+ const conn = new Connection ( socket )
66
+
67
+ conn . getObservedAddrs = ( cb ) => {
68
+ return cb ( null , [ getMultiaddr ( socket ) ] )
69
+ }
70
+ handler ( conn )
71
+ } )
42
72
43
- parallel ( multiaddrs . map ( ( m ) => ( cb ) => {
44
- let ipfsHashId
45
- if ( contains ( m . protoNames ( ) , 'ipfs' ) ) {
46
- ipfsHashId = m . stringTuples ( ) . filter ( ( tuple ) => {
73
+ let ipfsId
74
+ let listeningMultiaddr
75
+
76
+ listener . _listen = listener . listen
77
+ listener . listen = ( ma , callback ) => {
78
+ listeningMultiaddr = ma
79
+ if ( contains ( ma . protoNames ( ) , 'ipfs' ) ) {
80
+ ipfsId = ma . stringTuples ( ) . filter ( ( tuple ) => {
47
81
if ( tuple [ 0 ] === IPFS_CODE ) {
48
82
return true
49
83
}
50
84
} ) [ 0 ] [ 1 ]
51
- m = m . decapsulate ( 'ipfs' )
85
+ listeningMultiaddr = ma . decapsulate ( 'ipfs' )
52
86
}
53
87
54
- const listener = tcp . createServer ( ( conn ) => {
55
- conn . getObservedAddrs = ( ) => {
56
- return [ getMultiaddr ( conn ) ]
57
- }
58
- handler ( conn )
59
- } )
88
+ listener . _listen ( listeningMultiaddr . toOptions ( ) , callback )
89
+ }
60
90
61
- listener . __connections = { }
62
- listener . on ( 'connection' , ( conn ) => {
63
- const key = `${ conn . remoteAddress } :${ conn . remotePort } `
64
- listener . __connections [ key ] = conn
91
+ listener . _close = listener . close
92
+ listener . close = ( options , callback ) => {
93
+ if ( typeof options === 'function' ) {
94
+ callback = options
95
+ options = { }
96
+ }
97
+ if ( ! callback ) { callback = function noop ( ) { } }
98
+ if ( ! options ) { options = { } }
65
99
66
- conn . on ( 'close' , ( ) => {
67
- delete listener . __connections [ key ]
100
+ let closed = false
101
+ listener . _close ( callback )
102
+ listener . once ( 'close' , ( ) => {
103
+ closed = true
104
+ } )
105
+ setTimeout ( ( ) => {
106
+ if ( closed ) {
107
+ return
108
+ }
109
+ log ( 'unable to close graciously, destroying conns' )
110
+ Object . keys ( listener . __connections ) . forEach ( ( key ) => {
111
+ log ( 'destroying %s' , key )
112
+ listener . __connections [ key ] . destroy ( )
68
113
} )
114
+ } , options . timeout || CLOSE_TIMEOUT )
115
+ }
116
+
117
+ // Keep track of open connections to destroy in case of timeout
118
+ listener . __connections = { }
119
+ listener . on ( 'connection' , ( socket ) => {
120
+ const key = `${ socket . remoteAddress } :${ socket . remotePort } `
121
+ listener . __connections [ key ] = socket
122
+
123
+ socket . on ( 'close' , ( ) => {
124
+ delete listener . __connections [ key ]
69
125
} )
126
+ } )
127
+
128
+ listener . getAddrs = ( callback ) => {
129
+ const multiaddrs = [ ]
130
+ const address = listener . address ( )
131
+
132
+ // Because TCP will only return the IPv6 version
133
+ // we need to capture from the passed multiaddr
134
+ if ( listeningMultiaddr . toString ( ) . indexOf ( 'ip4' ) !== - 1 ) {
135
+ let m = listeningMultiaddr . decapsulate ( 'tcp' )
136
+ m = m . encapsulate ( '/tcp/' + address . port )
137
+ if ( ipfsId ) {
138
+ m = m . encapsulate ( '/ipfs/' + ipfsId )
139
+ }
140
+
141
+ if ( m . toString ( ) . indexOf ( '0.0.0.0' ) !== - 1 ) {
142
+ const netInterfaces = os . networkInterfaces ( )
143
+ Object . keys ( netInterfaces ) . forEach ( ( niKey ) => {
144
+ netInterfaces [ niKey ] . forEach ( ( ni ) => {
145
+ if ( ni . family === 'IPv4' ) {
146
+ multiaddrs . push ( multiaddr ( m . toString ( ) . replace ( '0.0.0.0' , ni . address ) ) )
147
+ }
148
+ } )
149
+ } )
150
+ } else {
151
+ multiaddrs . push ( m )
152
+ }
153
+ }
154
+
155
+ if ( address . family === 'IPv6' ) {
156
+ let ma = multiaddr ( '/ip6/' + address . address + '/tcp/' + address . port )
157
+ if ( ipfsId ) {
158
+ ma = ma . encapsulate ( '/ipfs/' + ipfsId )
159
+ }
160
+
161
+ multiaddrs . push ( ma )
162
+ }
163
+
164
+ callback ( null , multiaddrs )
165
+ }
70
166
167
+ return listener
168
+ /*
71
169
listener.listen(m.toOptions(), () => {
72
170
// Node.js likes to convert addr to IPv6 (when 0.0.0.0 for e.g)
73
171
const address = listener.address()
@@ -92,28 +190,7 @@ function TCP () {
92
190
cb()
93
191
})
94
192
listeners.push(listener)
95
- } ) , ( err ) => {
96
- callback ( err , freshMultiaddrs )
97
- } )
98
- }
99
-
100
- this . close = ( callback ) => {
101
- log ( 'closing' )
102
- if ( listeners . length === 0 ) {
103
- log ( 'Called close with no active listeners' )
104
- return callback ( )
105
- }
106
-
107
- parallel ( listeners . map ( ( listener ) => ( cb ) => {
108
- setTimeout ( ( ) => {
109
- Object . keys ( listener . __connections ) . forEach ( ( key ) => {
110
- log ( 'destroying %s' , key )
111
- listener . __connections [ key ] . destroy ( )
112
- } )
113
- } , CLOSE_TIMEOUT )
114
-
115
- listener . close ( cb )
116
- } ) , callback )
193
+ */
117
194
}
118
195
119
196
this . filter = ( multiaddrs ) => {
@@ -129,19 +206,19 @@ function TCP () {
129
206
}
130
207
}
131
208
132
- function getMultiaddr ( conn ) {
209
+ function getMultiaddr ( socket ) {
133
210
var mh
134
211
135
- if ( conn . remoteFamily === 'IPv6' ) {
136
- var addr = new Address6 ( conn . remoteAddress )
212
+ if ( socket . remoteFamily === 'IPv6' ) {
213
+ var addr = new Address6 ( socket . remoteAddress )
137
214
if ( addr . v4 ) {
138
215
var ip4 = addr . to4 ( ) . correctForm ( )
139
- mh = multiaddr ( '/ip4/' + ip4 + '/tcp/' + conn . remotePort )
216
+ mh = multiaddr ( '/ip4/' + ip4 + '/tcp/' + socket . remotePort )
140
217
} else {
141
- mh = multiaddr ( '/ip6/' + conn . remoteAddress + '/tcp/' + conn . remotePort )
218
+ mh = multiaddr ( '/ip6/' + socket . remoteAddress + '/tcp/' + socket . remotePort )
142
219
}
143
220
} else {
144
- mh = multiaddr ( '/ip4/' + conn . remoteAddress + '/tcp/' + conn . remotePort )
221
+ mh = multiaddr ( '/ip4/' + socket . remoteAddress + '/tcp/' + socket . remotePort )
145
222
}
146
223
147
224
return mh
0 commit comments