@@ -6,68 +6,169 @@ const tcp = require('net')
6
6
const multiaddr = require ( 'multiaddr' )
7
7
const Address6 = require ( 'ip-address' ) . Address6
8
8
const mafmt = require ( 'mafmt' )
9
- const parallel = require ( 'run-parallel' )
9
+ // const parallel = require('run-parallel')
10
10
const contains = require ( 'lodash.contains' )
11
+ const os = require ( 'os' )
12
+ const Connection = require ( './connection' )
11
13
12
14
exports = module . exports = TCP
13
15
14
16
const IPFS_CODE = 421
15
- const CLOSE_TIMEOUT = 300
17
+ const CLOSE_TIMEOUT = 2000
16
18
17
19
function TCP ( ) {
18
20
if ( ! ( this instanceof TCP ) ) {
19
21
return new TCP ( )
20
22
}
21
23
22
- const listeners = [ ]
23
-
24
- this . dial = function ( multiaddr , options ) {
25
- if ( ! options ) {
24
+ this . dial = function ( ma , options , callback ) {
25
+ if ( typeof options === 'function' ) {
26
+ callback = options
26
27
options = { }
27
28
}
28
- options . ready = options . ready || function noop ( ) { }
29
- const conn = tcp . connect ( multiaddr . toOptions ( ) , options . ready )
29
+
30
+ if ( ! callback ) {
31
+ callback = function noop ( ) { }
32
+ }
33
+
34
+ const socket = tcp . connect ( ma . toOptions ( ) )
35
+ const conn = new Connection ( socket )
30
36
conn . getObservedAddrs = ( ) => {
31
37
return [ multiaddr ]
32
38
}
39
+
40
+ socket . on ( 'timeout' , ( ) => {
41
+ conn . emit ( 'timeout' )
42
+ } )
43
+
44
+ socket . on ( 'error' , ( err ) => {
45
+ callback ( err )
46
+ conn . emit ( 'error' , err )
47
+ } )
48
+
49
+ socket . on ( 'connect' , ( ) => {
50
+ callback ( null , conn )
51
+ conn . emit ( 'connect' )
52
+ } )
53
+
54
+ conn . getObservedAddrs = ( cb ) => {
55
+ return cb ( null , [ ma ] )
56
+ }
57
+
33
58
return conn
34
59
}
35
60
36
- this . createListener = ( multiaddrs , handler , callback ) => {
37
- if ( ! Array . isArray ( multiaddrs ) ) {
38
- multiaddrs = [ multiaddrs ]
61
+ this . createListener = ( options , handler ) => {
62
+ if ( typeof options === 'function' ) {
63
+ handler = options
64
+ options = { }
39
65
}
40
66
41
- const freshMultiaddrs = [ ]
67
+ const listener = tcp . createServer ( ( socket ) => {
68
+ const conn = new Connection ( socket )
69
+
70
+ conn . getObservedAddrs = ( cb ) => {
71
+ return cb ( null , [ getMultiaddr ( socket ) ] )
72
+ }
73
+ handler ( conn )
74
+ } )
42
75
43
- parallel ( multiaddrs . map ( ( m ) => ( cb ) => {
44
- let ipfsHashId
45
- if ( contains ( m . protoNames ( ) , 'ipfs' ) ) {
46
- ipfsHashId = m . stringTuples ( ) . filter ( ( tuple ) => {
76
+ let ipfsId
77
+ let listeningMultiaddr
78
+
79
+ listener . _listen = listener . listen
80
+ listener . listen = ( ma , callback ) => {
81
+ listeningMultiaddr = ma
82
+ if ( contains ( ma . protoNames ( ) , 'ipfs' ) ) {
83
+ ipfsId = ma . stringTuples ( ) . filter ( ( tuple ) => {
47
84
if ( tuple [ 0 ] === IPFS_CODE ) {
48
85
return true
49
86
}
50
87
} ) [ 0 ] [ 1 ]
51
- m = m . decapsulate ( 'ipfs' )
88
+ listeningMultiaddr = ma . decapsulate ( 'ipfs' )
52
89
}
53
90
54
- const listener = tcp . createServer ( ( conn ) => {
55
- conn . getObservedAddrs = ( ) => {
56
- return [ getMultiaddr ( conn ) ]
57
- }
58
- handler ( conn )
59
- } )
91
+ listener . _listen ( listeningMultiaddr . toOptions ( ) , callback )
92
+ }
60
93
61
- listener . __connections = { }
62
- listener . on ( 'connection' , ( conn ) => {
63
- const key = `${ conn . remoteAddress } :${ conn . remotePort } `
64
- listener . __connections [ key ] = conn
94
+ listener . _close = listener . close
95
+ listener . close = ( options , callback ) => {
96
+ if ( typeof options === 'function' ) {
97
+ callback = options
98
+ options = { }
99
+ }
100
+ if ( ! callback ) { callback = function noop ( ) { } }
101
+ if ( ! options ) { options = { } }
65
102
66
- conn . on ( 'close' , ( ) => {
67
- delete listener . __connections [ key ]
103
+ let closed = false
104
+ listener . _close ( callback )
105
+ listener . once ( 'close' , ( ) => {
106
+ closed = true
107
+ } )
108
+ setTimeout ( ( ) => {
109
+ if ( closed ) {
110
+ return
111
+ }
112
+ log ( 'unable to close graciously, destroying conns' )
113
+ Object . keys ( listener . __connections ) . forEach ( ( key ) => {
114
+ log ( 'destroying %s' , key )
115
+ listener . __connections [ key ] . destroy ( )
68
116
} )
117
+ } , options . timeout || CLOSE_TIMEOUT )
118
+ }
119
+
120
+ // Keep track of open connections to destroy in case of timeout
121
+ listener . __connections = { }
122
+ listener . on ( 'connection' , ( socket ) => {
123
+ const key = `${ socket . remoteAddress } :${ socket . remotePort } `
124
+ listener . __connections [ key ] = socket
125
+
126
+ socket . on ( 'close' , ( ) => {
127
+ delete listener . __connections [ key ]
69
128
} )
129
+ } )
130
+
131
+ listener . getAddrs = ( callback ) => {
132
+ const multiaddrs = [ ]
133
+ const address = listener . address ( )
134
+
135
+ // Because TCP will only return the IPv6 version
136
+ // we need to capture from the passed multiaddr
137
+ if ( listeningMultiaddr . toString ( ) . indexOf ( 'ip4' ) !== - 1 ) {
138
+ let m = listeningMultiaddr . decapsulate ( 'tcp' )
139
+ m = m . encapsulate ( '/tcp/' + address . port )
140
+ if ( ipfsId ) {
141
+ m = m . encapsulate ( '/ipfs/' + ipfsId )
142
+ }
143
+
144
+ if ( m . toString ( ) . indexOf ( '0.0.0.0' ) !== - 1 ) {
145
+ const netInterfaces = os . networkInterfaces ( )
146
+ Object . keys ( netInterfaces ) . forEach ( ( niKey ) => {
147
+ netInterfaces [ niKey ] . forEach ( ( ni ) => {
148
+ if ( ni . family === 'IPv4' ) {
149
+ multiaddrs . push ( multiaddr ( m . toString ( ) . replace ( '0.0.0.0' , ni . address ) ) )
150
+ }
151
+ } )
152
+ } )
153
+ } else {
154
+ multiaddrs . push ( m )
155
+ }
156
+ }
157
+
158
+ if ( address . family === 'IPv6' ) {
159
+ let ma = multiaddr ( '/ip6/' + address . address + '/tcp/' + address . port )
160
+ if ( ipfsId ) {
161
+ ma = ma . encapsulate ( '/ipfs/' + ipfsId )
162
+ }
163
+
164
+ multiaddrs . push ( ma )
165
+ }
166
+
167
+ callback ( null , multiaddrs )
168
+ }
70
169
170
+ return listener
171
+ /*
71
172
listener.listen(m.toOptions(), () => {
72
173
// Node.js likes to convert addr to IPv6 (when 0.0.0.0 for e.g)
73
174
const address = listener.address()
@@ -92,28 +193,7 @@ function TCP () {
92
193
cb()
93
194
})
94
195
listeners.push(listener)
95
- } ) , ( err ) => {
96
- callback ( err , freshMultiaddrs )
97
- } )
98
- }
99
-
100
- this . close = ( callback ) => {
101
- log ( 'closing' )
102
- if ( listeners . length === 0 ) {
103
- log ( 'Called close with no active listeners' )
104
- return callback ( )
105
- }
106
-
107
- parallel ( listeners . map ( ( listener ) => ( cb ) => {
108
- setTimeout ( ( ) => {
109
- Object . keys ( listener . __connections ) . forEach ( ( key ) => {
110
- log ( 'destroying %s' , key )
111
- listener . __connections [ key ] . destroy ( )
112
- } )
113
- } , CLOSE_TIMEOUT )
114
-
115
- listener . close ( cb )
116
- } ) , callback )
196
+ */
117
197
}
118
198
119
199
this . filter = ( multiaddrs ) => {
@@ -129,19 +209,19 @@ function TCP () {
129
209
}
130
210
}
131
211
132
- function getMultiaddr ( conn ) {
212
+ function getMultiaddr ( socket ) {
133
213
var mh
134
214
135
- if ( conn . remoteFamily === 'IPv6' ) {
136
- var addr = new Address6 ( conn . remoteAddress )
215
+ if ( socket . remoteFamily === 'IPv6' ) {
216
+ var addr = new Address6 ( socket . remoteAddress )
137
217
if ( addr . v4 ) {
138
218
var ip4 = addr . to4 ( ) . correctForm ( )
139
- mh = multiaddr ( '/ip4/' + ip4 + '/tcp/' + conn . remotePort )
219
+ mh = multiaddr ( '/ip4/' + ip4 + '/tcp/' + socket . remotePort )
140
220
} else {
141
- mh = multiaddr ( '/ip6/' + conn . remoteAddress + '/tcp/' + conn . remotePort )
221
+ mh = multiaddr ( '/ip6/' + socket . remoteAddress + '/tcp/' + socket . remotePort )
142
222
}
143
223
} else {
144
- mh = multiaddr ( '/ip4/' + conn . remoteAddress + '/tcp/' + conn . remotePort )
224
+ mh = multiaddr ( '/ip4/' + socket . remoteAddress + '/tcp/' + socket . remotePort )
145
225
}
146
226
147
227
return mh
0 commit comments