Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

Commit 8ba4249

Browse files
committed
refactor: split transport dialer and circuit logic
1 parent 6304d55 commit 8ba4249

File tree

10 files changed

+499
-406
lines changed

10 files changed

+499
-406
lines changed

src/circuit/constants.js

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
'use strict'
2+
3+
module.exports = {
4+
PRIOIRY: 100,
5+
DIALER: {
6+
ONION: 'onion',
7+
TELESCOPE: 'telescope'
8+
},
9+
RESPONSE: {
10+
SUCCESS: 100,
11+
HOP: {
12+
SRC_ADDR_TOO_LONG: 220,
13+
DST_ADDR_TOO_LONG: 221,
14+
SRC_MULTIADDR_INVALID: 250,
15+
DST_MULTIADDR_INVALID: 251,
16+
NO_CONN_TO_DST: 260,
17+
CANT_DIAL_DST: 261,
18+
CANT_OPEN_DST_STREAM: 262,
19+
CANT_SPEAK_RELAY: 270,
20+
CANT_CONNECT_TO_SELF: 280
21+
},
22+
STOP: {
23+
SRC_ADDR_TOO_LONG: 320,
24+
DST_ADDR_TOO_LONG: 321,
25+
SRC_MULTIADDR_INVALID: 350,
26+
DST_MULTIADDR_INVALID: 351
27+
}
28+
}
29+
}

src/circuit/dialer.js

+238
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
'use strict'
2+
3+
const pull = require('pull-stream')
4+
const handshake = require('pull-handshake')
5+
const Connection = require('interface-connection').Connection
6+
const mafmt = require('mafmt')
7+
const isFunction = require('lodash.isfunction')
8+
const multiaddr = require('multiaddr')
9+
const lp = require('pull-length-prefixed')
10+
const constants = require('./constants')
11+
const once = require('once')
12+
const utils = require('./utils')
13+
14+
const debug = require('debug')
15+
const log = debug('libp2p:circuit:dialer')
16+
log.err = debug('libp2p:circuit:error:dialer')
17+
18+
const multicodec = require('../multicodec')
19+
20+
class Dialer {
21+
/**
22+
* Creates an instance of Dialer.
23+
* @param {Swarm} swarm - the swarm
24+
* @param {any} options - config options
25+
*
26+
* @memberOf Dialer
27+
*/
28+
constructor (swarm, options) {
29+
this.swarm = swarm
30+
this.relayPeers = new Map()
31+
this.options = options
32+
// this.handler = handler // this handler is passed to the listener
33+
34+
// get all the relay addresses for this swarm
35+
const relays = this.filter(swarm._peerInfo.multiaddrs.toArray())
36+
37+
// if no explicit relays, add a default relay addr
38+
if (relays.length === 0) {
39+
this.swarm
40+
._peerInfo
41+
.multiaddrs
42+
.add(`/p2p-circuit/ipfs/${this.swarm._peerInfo.id.toB58String()}`)
43+
}
44+
45+
this.dialSwarmRelays(relays)
46+
47+
this.swarm.on('peer-mux-established', this.dialRelay.bind(this))
48+
this.swarm.on('peer-mux-closed', (peerInfo) => {
49+
this.relayPeers.delete(peerInfo.id.toB58String())
50+
})
51+
}
52+
53+
/**
54+
* Dial the relays in the Addresses.Swarm config
55+
*
56+
* @param {Array} relays
57+
* @return {void}
58+
*/
59+
dialSwarmRelays (relays) {
60+
// if we have relay addresses in swarm config, then dial those relays
61+
this.swarm.on('listening', () => {
62+
relays.forEach((relay) => {
63+
let relaySegments = relay
64+
.toString()
65+
.split('/p2p-circuit')
66+
.filter(segment => segment.length)
67+
68+
relaySegments.forEach((relaySegment) => {
69+
this.dialRelay(utils.peerInfoFromMa(relaySegment))
70+
})
71+
})
72+
})
73+
}
74+
75+
/**
76+
* Dial a peer over a relay
77+
*
78+
* @param {multiaddr} ma - the multiaddr of the peer to dial
79+
* @param {Object} options - dial options
80+
* @param {Function} cb - a callback called once dialed
81+
* @returns {Connection} - the connection
82+
*
83+
* @memberOf Dialer
84+
*/
85+
dial (ma, options, cb) {
86+
throw new Error('abstract class, method not implemented')
87+
}
88+
89+
/**
90+
* Dial the destination peer over a relay
91+
*
92+
* @param {multiaddr} dstMa
93+
* @param {Connection} relay
94+
* @param {Function} cb
95+
* @return {Function|void}
96+
* @private
97+
*/
98+
dialPeer (dstMa, relay, cb) {
99+
if (isFunction(relay)) {
100+
cb = relay
101+
relay = null
102+
}
103+
104+
if (!cb) {
105+
cb = () => {}
106+
}
107+
108+
dstMa = multiaddr(dstMa)
109+
// if no relay provided, dial on all available relays until one succeeds
110+
if (!relay) {
111+
const relays = Array.from(this.relayPeers.values())
112+
let next = (nextRelay) => {
113+
if (!nextRelay) {
114+
let err = `no relay peers were found`
115+
log.err(err)
116+
return cb(err)
117+
}
118+
119+
this.dialPeer(dstMa, nextRelay, (err, conn) => {
120+
if (err) {
121+
log.err(err)
122+
return next(relays.shift())
123+
}
124+
cb(null, new Connection(conn))
125+
})
126+
}
127+
next(relays.shift())
128+
} else {
129+
this.negotiateRelay(relay, dstMa, (err, conn) => {
130+
if (err) {
131+
log.err(`An error has occurred negotiating the relay connection`, err)
132+
return cb(err)
133+
}
134+
135+
return cb(null, conn)
136+
})
137+
}
138+
}
139+
140+
/**
141+
* Negotiate the relay connection
142+
*
143+
* @param {Connection} conn - a connection to the relay
144+
* @param {multiaddr} dstMa - the multiaddr of the peer to relay the connection for
145+
* @param {Function} cb - a callback with that return the negotiated relay connection
146+
* @returns {void}
147+
*
148+
* @memberOf Dialer
149+
*/
150+
negotiateRelay (conn, dstMa, cb) {
151+
dstMa = multiaddr(dstMa)
152+
153+
let stream = handshake({timeout: 1000 * 60}, cb)
154+
let shake = stream.handshake
155+
156+
log(`negotiating relay for peer ${dstMa.getPeerId()}`)
157+
const values = [new Buffer(dstMa.toString())]
158+
159+
pull(
160+
pull.values(values),
161+
lp.encode(),
162+
pull.collect((err, encoded) => {
163+
if (err) {
164+
return cb(err)
165+
}
166+
167+
shake.write(encoded[0])
168+
shake.read(3, (err, data) => {
169+
if (err) {
170+
log.err(err)
171+
return cb(err)
172+
}
173+
174+
if (Number(data.toString()) !== constants.RESPONSE.SUCCESS) {
175+
cb(new Error(`Got ${data.toString()} error code trying to dial over relay`))
176+
}
177+
178+
cb(null, shake.rest())
179+
})
180+
})
181+
)
182+
183+
pull(stream, conn, stream)
184+
}
185+
186+
/**
187+
* Dial a relay peer by its PeerInfo
188+
*
189+
* @param {PeerInfo} peer - the PeerInfo of the relay peer
190+
* @param {Function} cb - a callback with the connection to the relay peer
191+
* @returns {Function|void}
192+
*
193+
* @memberOf Dialer
194+
*/
195+
dialRelay (peer, cb) {
196+
cb = once(cb || (() => {}))
197+
198+
const b58Id = utils.getB58String(peer)
199+
const relay = this.relayPeers.get(b58Id)
200+
if (relay) {
201+
cb(null, relay)
202+
}
203+
204+
const relayConn = new Connection()
205+
relayConn.setPeerInfo(peer)
206+
// attempt to dia the relay so that we have a connection
207+
this.swarm.dial(peer, multicodec.hop, once((err, conn) => {
208+
if (err) {
209+
log.err(err)
210+
return cb(err)
211+
}
212+
213+
relayConn.setInnerConn(conn)
214+
this.relayPeers.set(b58Id, relayConn)
215+
cb(null, relayConn)
216+
}))
217+
}
218+
219+
/**
220+
* Filter check for all multiaddresses
221+
* that this transport can dial on
222+
*
223+
* @param {any} multiaddrs
224+
* @returns {Array<multiaddr>}
225+
*
226+
* @memberOf Dialer
227+
*/
228+
filter (multiaddrs) {
229+
if (!Array.isArray(multiaddrs)) {
230+
multiaddrs = [multiaddrs]
231+
}
232+
return multiaddrs.filter((ma) => {
233+
return mafmt.Circuit.matches(ma)
234+
})
235+
}
236+
}
237+
238+
module.exports = Dialer

0 commit comments

Comments
 (0)