Skip to content

Commit 15f7c2a

Browse files
committed
feat: coalescing dial support (#518)
* docs: fix spelling in api * fix: dont create peerstore twice * feat: add support for dial coalescing * doc(fix): add setPeerValue to API TOC * docs: add more jsdocs to dialer * chore: remove old comment * fix: ensure connections are closed * fix: registrar.getConnections returns first open conn * fix: directly set the closed status * chore: remove unneeded log * refactor: peerStore.put takes an options object
1 parent 4384d13 commit 15f7c2a

File tree

14 files changed

+325
-172
lines changed

14 files changed

+325
-172
lines changed

doc/API.md

+4-7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
* [`pubsub.publish`](#pubsubpublish)
2323
* [`pubsub.subscribe`](#pubsubsubscribe)
2424
* [`pubsub.unsubscribe`](#pubsubunsubscribe)
25+
* [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue)
2526
* [`metrics.global`](#metricsglobal)
2627
* [`metrics.peers`](#metricspeers)
2728
* [`metrics.protocols`](#metricsprotocols)
@@ -92,7 +93,7 @@ Required keys in the `options` object:
9293

9394
</details>
9495

95-
Once you have a libp2p instance, you are able to listen to several events it emmits, so that you can be noticed of relevant network events.
96+
Once you have a libp2p instance, you are able to listen to several events it emits, so that you can be noticed of relevant network events.
9697

9798
<details><summary>Events</summary>
9899

@@ -666,12 +667,8 @@ Enables users to change the value of certain peers in a range of 0 to 1. Peers w
666667
#### Example
667668

668669
```js
669-
const topic = 'topic'
670-
const handler = (msg) => {
671-
// msg.data - pubsub data received
672-
}
673-
674-
libp2p.pubsub.unsubscribe(topic, handler)
670+
libp2p.connectionManager.setPeerValue(highPriorityPeerId, 1)
671+
libp2p.connectionManager.setPeerValue(lowPriorityPeerId, 0)
675672
```
676673

677674
### metrics.global

doc/DIALER.md

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# js-libp2p Dialer
22

33
**Synopsis**
4+
* Parallel dials to the same peer will yield the same connection/error when the first dial settles.
45
* All Dial Requests in js-libp2p must request a token(s) from the Dialer.
56
* The number of tokens requested should be between 1 and the MAX_PER_PEER_DIALS max set in the Dialer.
67
* If the number of available tokens is less than requested, the Dialer may return less than requested.

src/circuit/index.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class Circuit {
109109
let disconnectOnFailure = false
110110
let relayConnection = this._registrar.getConnection(new PeerInfo(relayPeer))
111111
if (!relayConnection) {
112-
relayConnection = await this._dialer.connectToMultiaddr(relayAddr, options)
112+
relayConnection = await this._dialer.connectToPeer(relayAddr, options)
113113
disconnectOnFailure = true
114114
}
115115

src/circuit/listener.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ module.exports = (circuit) => {
2424
listener.listen = async (addr) => {
2525
const [addrString] = String(addr).split('/p2p-circuit').slice(-1)
2626

27-
const relayConn = await circuit._dialer.connectToMultiaddr(multiaddr(addrString))
27+
const relayConn = await circuit._dialer.connectToPeer(multiaddr(addrString))
2828
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')
2929

3030
listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr)

src/dialer/index.js

+115-43
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ const multiaddr = require('multiaddr')
44
const errCode = require('err-code')
55
const TimeoutController = require('timeout-abort-controller')
66
const anySignal = require('any-signal')
7+
const PeerId = require('peer-id')
8+
const PeerInfo = require('peer-info')
79
const debug = require('debug')
810
const log = debug('libp2p:dialer')
911
log.error = debug('libp2p:dialer:error')
@@ -38,7 +40,7 @@ class Dialer {
3840
this.timeout = timeout
3941
this.perPeerLimit = perPeerLimit
4042
this.tokens = [...new Array(concurrency)].map((_, index) => index)
41-
this._pendingDials = new Set()
43+
this._pendingDials = new Map()
4244
}
4345

4446
/**
@@ -56,72 +58,111 @@ class Dialer {
5658
}
5759

5860
/**
59-
* Connects to the first success of a given list of `Multiaddr`. `addrs` should
60-
* include the id of the peer being dialed, it will be used for encryption verification.
61+
* Connects to a given `PeerId` or `Multiaddr` by dialing all of its known addresses.
62+
* The dial to the first address that is successfully able to upgrade a connection
63+
* will be used.
6164
*
62-
* @param {Array<Multiaddr>|Multiaddr} addrs
65+
* @param {PeerInfo|Multiaddr} peer The peer to dial
6366
* @param {object} [options]
6467
* @param {AbortSignal} [options.signal] An AbortController signal
6568
* @returns {Promise<Connection>}
6669
*/
67-
async connectToMultiaddr (addrs, options = {}) {
68-
if (!Array.isArray(addrs)) addrs = [multiaddr(addrs)]
69-
70-
const dialAction = (addr, options) => {
71-
if (options.signal.aborted) throw errCode(new Error('already aborted'), 'ERR_ALREADY_ABORTED')
72-
return this.transportManager.dial(addr, options)
70+
async connectToPeer (peer, options = {}) {
71+
const dialTarget = this._createDialTarget(peer)
72+
if (dialTarget.addrs.length === 0) {
73+
throw errCode(new Error('The dial request has no addresses'), 'ERR_NO_DIAL_MULTIADDRS')
7374
}
74-
const dialRequest = new DialRequest({
75-
addrs,
76-
dialAction,
77-
dialer: this
78-
})
79-
80-
// Combine the timeout signal and options.signal, if provided
81-
const timeoutController = new TimeoutController(this.timeout)
82-
const signals = [timeoutController.signal]
83-
options.signal && signals.push(options.signal)
84-
const signal = anySignal(signals)
85-
86-
const dial = {
87-
dialRequest,
88-
controller: timeoutController
89-
}
90-
this._pendingDials.add(dial)
75+
const pendingDial = this._pendingDials.get(dialTarget.id) || this._createPendingDial(dialTarget, options)
9176

9277
try {
93-
const dialResult = await dialRequest.run({ ...options, signal })
94-
log('dial succeeded to %s', dialResult.remoteAddr)
95-
return dialResult
78+
const connection = await pendingDial.promise
79+
log('dial succeeded to %s', dialTarget.id)
80+
return connection
9681
} catch (err) {
9782
// Error is a timeout
98-
if (timeoutController.signal.aborted) {
83+
if (pendingDial.controller.signal.aborted) {
9984
err.code = codes.ERR_TIMEOUT
10085
}
10186
log.error(err)
10287
throw err
10388
} finally {
104-
timeoutController.clear()
105-
this._pendingDials.delete(dial)
89+
pendingDial.destroy()
10690
}
10791
}
10892

10993
/**
110-
* Connects to a given `PeerInfo` or `PeerId` by dialing all of its known addresses.
111-
* The dial to the first address that is successfully able to upgrade a connection
112-
* will be used.
113-
*
114-
* @param {PeerId} peerId The remote peer id to dial
94+
* @typedef DialTarget
95+
* @property {string} id
96+
* @property {Multiaddr[]} addrs
97+
*/
98+
99+
/**
100+
* Creates a DialTarget. The DialTarget is used to create and track
101+
* the DialRequest to a given peer.
102+
* @private
103+
* @param {PeerInfo|Multiaddr} peer A PeerId or Multiaddr
104+
* @returns {DialTarget}
105+
*/
106+
_createDialTarget (peer) {
107+
const dialable = Dialer.getDialable(peer)
108+
if (multiaddr.isMultiaddr(dialable)) {
109+
return {
110+
id: dialable.toString(),
111+
addrs: [dialable]
112+
}
113+
}
114+
const addrs = this.peerStore.multiaddrsForPeer(dialable)
115+
return {
116+
id: dialable.id.toString(),
117+
addrs
118+
}
119+
}
120+
121+
/**
122+
* @typedef PendingDial
123+
* @property {DialRequest} dialRequest
124+
* @property {TimeoutController} controller
125+
* @property {Promise} promise
126+
* @property {function():void} destroy
127+
*/
128+
129+
/**
130+
* Creates a PendingDial that wraps the underlying DialRequest
131+
* @private
132+
* @param {DialTarget} dialTarget
115133
* @param {object} [options]
116134
* @param {AbortSignal} [options.signal] An AbortController signal
117-
* @returns {Promise<Connection>}
135+
* @returns {PendingDial}
118136
*/
119-
connectToPeer (peerId, options = {}) {
120-
const addrs = this.peerStore.multiaddrsForPeer(peerId)
137+
_createPendingDial (dialTarget, options) {
138+
const dialAction = (addr, options) => {
139+
if (options.signal.aborted) throw errCode(new Error('already aborted'), 'ERR_ALREADY_ABORTED')
140+
return this.transportManager.dial(addr, options)
141+
}
121142

122-
// TODO: ensure the peer id is on the multiaddr
143+
const dialRequest = new DialRequest({
144+
addrs: dialTarget.addrs,
145+
dialAction,
146+
dialer: this
147+
})
148+
149+
// Combine the timeout signal and options.signal, if provided
150+
const timeoutController = new TimeoutController(this.timeout)
151+
const signals = [timeoutController.signal]
152+
options.signal && signals.push(options.signal)
153+
const signal = anySignal(signals)
123154

124-
return this.connectToMultiaddr(addrs, options)
155+
const pendingDial = {
156+
dialRequest,
157+
controller: timeoutController,
158+
promise: dialRequest.run({ ...options, signal }),
159+
destroy: () => {
160+
timeoutController.clear()
161+
this._pendingDials.delete(dialTarget.id)
162+
}
163+
}
164+
this._pendingDials.set(dialTarget.id, pendingDial)
165+
return pendingDial
125166
}
126167

127168
getTokens (num) {
@@ -137,6 +178,37 @@ class Dialer {
137178
log('token %d released', token)
138179
this.tokens.push(token)
139180
}
181+
182+
/**
183+
* Converts the given `peer` into a `PeerInfo` or `Multiaddr`.
184+
* @static
185+
* @param {PeerInfo|PeerId|Multiaddr|string} peer
186+
* @returns {PeerInfo|Multiaddr}
187+
*/
188+
static getDialable (peer) {
189+
if (PeerInfo.isPeerInfo(peer)) return peer
190+
if (typeof peer === 'string') {
191+
peer = multiaddr(peer)
192+
}
193+
194+
let addr
195+
if (multiaddr.isMultiaddr(peer)) {
196+
addr = peer
197+
try {
198+
peer = PeerId.createFromCID(peer.getPeerId())
199+
} catch (err) {
200+
// Couldn't get the PeerId, just use the address
201+
return peer
202+
}
203+
}
204+
205+
if (PeerId.isPeerId(peer)) {
206+
peer = new PeerInfo(peer)
207+
}
208+
209+
addr && peer.multiaddrs.add(addr)
210+
return peer
211+
}
140212
}
141213

142214
module.exports = Dialer

src/index.js

+14-23
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@ const log = debug('libp2p')
66
log.error = debug('libp2p:error')
77

88
const PeerInfo = require('peer-info')
9-
const multiaddr = require('multiaddr')
109

1110
const peerRouting = require('./peer-routing')
1211
const contentRouting = require('./content-routing')
1312
const pubsub = require('./pubsub')
14-
const { getPeerInfo, getPeerInfoRemote } = require('./get-peer-info')
13+
const { getPeerInfo } = require('./get-peer-info')
1514
const { validate: validateConfig } = require('./config')
1615
const { codes } = require('./errors')
1716

@@ -51,8 +50,6 @@ class Libp2p extends EventEmitter {
5150
this._transport = [] // Transport instances/references
5251
this._discovery = new Map() // Discovery service instances/references
5352

54-
this.peerStore = new PeerStore()
55-
5653
if (this._options.metrics.enabled) {
5754
this.metrics = new Metrics(this._options.metrics)
5855
}
@@ -62,7 +59,7 @@ class Libp2p extends EventEmitter {
6259
localPeer: this.peerInfo.id,
6360
metrics: this.metrics,
6461
onConnection: (connection) => {
65-
const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer))
62+
const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer), { silent: true })
6663
this.registrar.onConnect(peerInfo, connection)
6764
this.connectionManager.onConnect(connection)
6865
this.emit('peer:connect', peerInfo)
@@ -74,7 +71,7 @@ class Libp2p extends EventEmitter {
7471
}
7572
},
7673
onConnectionEnd: (connection) => {
77-
const peerInfo = getPeerInfo(connection.remotePeer)
74+
const peerInfo = Dialer.getDialable(connection.remotePeer)
7875
this.registrar.onDisconnect(peerInfo, connection)
7976
this.connectionManager.onDisconnect(connection)
8077

@@ -266,27 +263,22 @@ class Libp2p extends EventEmitter {
266263
* @returns {Promise<Connection|*>}
267264
*/
268265
async dialProtocol (peer, protocols, options) {
266+
const dialable = Dialer.getDialable(peer)
269267
let connection
270-
if (multiaddr.isMultiaddr(peer)) {
271-
connection = await this.dialer.connectToMultiaddr(peer, options)
272-
} else {
273-
peer = await getPeerInfoRemote(peer, this)
274-
connection = await this.dialer.connectToPeer(peer.id, options)
268+
if (PeerInfo.isPeerInfo(dialable)) {
269+
this.peerStore.put(dialable, { silent: true })
270+
connection = this.registrar.getConnection(dialable)
275271
}
276272

277-
const peerInfo = getPeerInfo(connection.remotePeer)
273+
if (!connection) {
274+
connection = await this.dialer.connectToPeer(dialable, options)
275+
}
278276

279277
// If a protocol was provided, create a new stream
280278
if (protocols) {
281-
const stream = await connection.newStream(protocols)
282-
283-
peerInfo.protocols.add(stream.protocol)
284-
this.peerStore.put(peerInfo)
285-
286-
return stream
279+
return connection.newStream(protocols)
287280
}
288281

289-
this.peerStore.put(peerInfo)
290282
return connection
291283
}
292284

@@ -428,11 +420,10 @@ class Libp2p extends EventEmitter {
428420
// If auto dialing is on and we have no connection to the peer, check if we should dial
429421
if (this._config.peerDiscovery.autoDial === true && !this.registrar.getConnection(peerInfo)) {
430422
const minPeers = this._options.connectionManager.minPeers || 0
431-
// TODO: This does not account for multiple connections to a peer
432-
if (minPeers > this.registrar.connections.size) {
433-
log('connecting to discovered peer')
423+
if (minPeers > this.connectionManager._connections.size) {
424+
log('connecting to discovered peer %s', peerInfo.id.toString())
434425
try {
435-
await this.dialer.connectToPeer(peerInfo.id)
426+
await this.dialer.connectToPeer(peerInfo)
436427
} catch (err) {
437428
log.error('could not connect to discovered peer', err)
438429
}

0 commit comments

Comments
 (0)