Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

Commit 61340e3

Browse files
committed
Merge pull request #56 from diasdavid/fix/ws-ipfs
fix: handling of ipfs addresses in available transports
2 parents 163624c + 8e1413b commit 61340e3

File tree

7 files changed

+387
-328
lines changed

7 files changed

+387
-328
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,4 @@
7979
"Richard Littauer <[email protected]>",
8080
"dignifiedquire <[email protected]>"
8181
]
82-
}
82+
}

src/connection.js

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
'use strict'
2+
3+
const connHandler = require('./default-handler')
4+
const identify = require('./identify')
5+
6+
module.exports = function connection (swarm) {
7+
return {
8+
addUpgrade () {},
9+
10+
addStreamMuxer (muxer) {
11+
// for dialing
12+
swarm.muxers[muxer.multicodec] = muxer
13+
14+
// for listening
15+
swarm.handle(muxer.multicodec, (conn) => {
16+
const muxedConn = muxer(conn, true)
17+
18+
var peerIdForConn
19+
20+
muxedConn.on('stream', (conn) => {
21+
function gotId () {
22+
if (peerIdForConn) {
23+
conn.peerId = peerIdForConn
24+
connHandler(swarm.protocols, conn)
25+
} else {
26+
setTimeout(gotId, 100)
27+
}
28+
}
29+
30+
if (swarm.identify) {
31+
return gotId()
32+
}
33+
34+
connHandler(swarm.protocols, conn)
35+
})
36+
37+
// if identify is enabled, attempt to do it for muxer reuse
38+
if (swarm.identify) {
39+
identify.exec(conn, muxedConn, swarm._peerInfo, (err, pi) => {
40+
if (err) {
41+
return console.log('Identify exec failed', err)
42+
}
43+
44+
peerIdForConn = pi.id
45+
swarm.muxedConns[pi.id.toB58String()] = {}
46+
swarm.muxedConns[pi.id.toB58String()].muxer = muxedConn
47+
swarm.muxedConns[pi.id.toB58String()].conn = conn // to be able to extract addrs
48+
49+
swarm.emit('peer-mux-established', pi)
50+
51+
muxedConn.on('close', () => {
52+
delete swarm.muxedConns[pi.id.toB58String()]
53+
swarm.emit('peer-mux-closed', pi)
54+
})
55+
})
56+
}
57+
})
58+
},
59+
60+
reuse () {
61+
swarm.identify = true
62+
swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo, swarm))
63+
}
64+
}
65+
}

src/default-handler.js

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
'use strict'
2+
3+
const multistream = require('multistream-select')
4+
5+
// incomming connection handler
6+
module.exports = function connHandler (protocols, conn) {
7+
var msS = new multistream.Select()
8+
9+
Object.keys(protocols).forEach((protocol) => {
10+
if (!protocol) {
11+
return
12+
}
13+
14+
msS.addHandler(protocol, protocols[protocol])
15+
})
16+
17+
msS.handle(conn)
18+
}

src/dial.js

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
'use strict'
2+
3+
const multistream = require('multistream-select')
4+
const DuplexPassThrough = require('duplex-passthrough')
5+
6+
const connHandler = require('./default-handler')
7+
8+
module.exports = function dial (swarm) {
9+
return (pi, protocol, callback) => {
10+
if (typeof protocol === 'function') {
11+
callback = protocol
12+
protocol = null
13+
}
14+
15+
if (!callback) {
16+
callback = function noop () {}
17+
}
18+
19+
const pt = new DuplexPassThrough()
20+
21+
const b58Id = pi.id.toB58String()
22+
23+
if (!swarm.muxedConns[b58Id]) {
24+
if (!swarm.conns[b58Id]) {
25+
attemptDial(pi, (err, conn) => {
26+
if (err) {
27+
return callback(err)
28+
}
29+
gotWarmedUpConn(conn)
30+
})
31+
} else {
32+
const conn = swarm.conns[b58Id]
33+
swarm.conns[b58Id] = undefined
34+
gotWarmedUpConn(conn)
35+
}
36+
} else {
37+
if (!protocol) {
38+
return callback()
39+
}
40+
gotMuxer(swarm.muxedConns[b58Id].muxer)
41+
}
42+
43+
return pt
44+
45+
function gotWarmedUpConn (conn) {
46+
attemptMuxerUpgrade(conn, (err, muxer) => {
47+
if (!protocol) {
48+
if (err) {
49+
swarm.conns[b58Id] = conn
50+
}
51+
return callback()
52+
}
53+
54+
if (err) {
55+
// couldn't upgrade to Muxer, it is ok
56+
protocolHandshake(conn, protocol, callback)
57+
} else {
58+
gotMuxer(muxer)
59+
}
60+
})
61+
}
62+
63+
function gotMuxer (muxer) {
64+
openConnInMuxedConn(muxer, (conn) => {
65+
protocolHandshake(conn, protocol, callback)
66+
})
67+
}
68+
69+
function attemptDial (pi, cb) {
70+
const tKeys = swarm.availableTransports(pi)
71+
72+
if (tKeys.length === 0) {
73+
return cb(new Error('No available tranport to dial to'))
74+
}
75+
76+
nextTransport(tKeys.shift())
77+
78+
function nextTransport (key) {
79+
const multiaddrs = pi.multiaddrs.slice()
80+
swarm.transport.dial(key, multiaddrs, (err, conn) => {
81+
if (err) {
82+
if (tKeys.length === 0) {
83+
return cb(new Error('Could not dial in any of the transports'))
84+
}
85+
return nextTransport(tKeys.shift())
86+
}
87+
cb(null, conn)
88+
})
89+
}
90+
}
91+
92+
function attemptMuxerUpgrade (conn, cb) {
93+
const muxers = Object.keys(swarm.muxers)
94+
if (muxers.length === 0) {
95+
return cb(new Error('no muxers available'))
96+
}
97+
98+
// 1. try to handshake in one of the muxers available
99+
// 2. if succeeds
100+
// - add the muxedConn to the list of muxedConns
101+
// - add incomming new streams to connHandler
102+
103+
nextMuxer(muxers.shift())
104+
105+
function nextMuxer (key) {
106+
var msI = new multistream.Interactive()
107+
msI.handle(conn, function () {
108+
msI.select(key, (err, conn) => {
109+
if (err) {
110+
if (muxers.length === 0) {
111+
cb(new Error('could not upgrade to stream muxing'))
112+
} else {
113+
nextMuxer(muxers.shift())
114+
}
115+
return
116+
}
117+
118+
const muxedConn = swarm.muxers[key](conn, false)
119+
swarm.muxedConns[b58Id] = {}
120+
swarm.muxedConns[b58Id].muxer = muxedConn
121+
swarm.muxedConns[b58Id].conn = conn
122+
123+
swarm.emit('peer-mux-established', pi)
124+
125+
muxedConn.on('close', () => {
126+
delete swarm.muxedConns[pi.id.toB58String()]
127+
swarm.emit('peer-mux-closed', pi)
128+
})
129+
130+
// in case identify is on
131+
muxedConn.on('stream', (conn) => {
132+
conn.peerId = pi.id
133+
connHandler(swarm.protocols, conn)
134+
})
135+
136+
cb(null, muxedConn)
137+
})
138+
})
139+
}
140+
}
141+
142+
function openConnInMuxedConn (muxer, cb) {
143+
cb(muxer.newStream())
144+
}
145+
146+
function protocolHandshake (conn, protocol, cb) {
147+
var msI = new multistream.Interactive()
148+
msI.handle(conn, function () {
149+
msI.select(protocol, (err, conn) => {
150+
if (err) {
151+
return callback(err)
152+
}
153+
154+
pt.wrapStream(conn)
155+
pt.peerId = pi.id
156+
callback(null, pt)
157+
})
158+
})
159+
}
160+
}
161+
}

0 commit comments

Comments
 (0)