Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

Commit fcbcccc

Browse files
authored
fix: improve dial queue and parallel dials (#315)
* feat: allow dialer queues to do many requests to a peer
* fix: parallel dials and validate cancelled conns
* feat: make dial timeout configurable
* fix: allow already connected peers to dial immediately
* refactor: add dial timeout to consts file
* fix: keep better track of in progress queues
* refactor: make dials race
1 parent 57146a0 commit fcbcccc

File tree

8 files changed

+60
-50
lines changed

8 files changed

+60
-50
lines changed

README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ const sw = new switch(peerInfo , peerBook [, options])
6262
If defined, `options` should be an object with the following keys and respective values:
6363

6464
- `blacklistTTL`: - number of ms a peer should not be dialable after it errors. Defaults to `120000` (120 seconds)
65-
- `maxParallelDials` - number of concurrent dials the switch should allow. Defaults to `50`
65+
- `maxParallelDials`: - number of concurrent dials the switch should allow. Defaults to `50`
66+
- `dialTimeout`: - number of ms a dial to a peer should be allowed to run. Defaults to `30000` (30 seconds)
6667
- `stats`: an object with the following keys and respective values:
6768
- `maxOldPeersRetention`: maximum number of old peers to retain. When peers disconnect, their stats are kept around in case they reconnect. Defaults to `100`.
6869
- `computeThrottleMaxQueueSize`: maximum queue size to perform stats computation throttling. Defaults to `1000`.

src/constants.js

+1
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22

33
module.exports = {
44
BLACK_LIST_TTL: 120e3, // How long before an errored peer can be dialed again
5+
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
56
MAX_PARALLEL_DIALS: 50 // Maximum allowed concurrent dials
67
}

src/dialer/queue.js

+3-7
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class Queue {
6363
* @constructor
6464
* @param {string} peerId
6565
* @param {Switch} _switch
66-
* @param {function} onStopped Called when the queue stops
66+
* @param {function(string)} onStopped Called when the queue stops
6767
*/
6868
constructor (peerId, _switch, onStopped) {
6969
this.id = peerId
@@ -78,20 +78,16 @@ class Queue {
7878
}
7979

8080
/**
81-
* Adds the dial request to the queue and starts the
82-
* queue if it is stopped
81+
* Adds the dial request to the queue. The queue is not automatically started
8382
* @param {string} protocol
8483
* @param {boolean} useFSM If callback should use a ConnectionFSM instead
8584
* @param {function(Error, Connection)} callback
86-
* @returns {boolean} whether or not the queue has been started
8785
*/
8886
add (protocol, useFSM, callback) {
8987
if (!this.isDialAllowed()) {
9088
nextTick(callback, ERR_BLACKLISTED())
91-
return false
9289
}
9390
this._queue.push({ protocol, useFSM, callback })
94-
return this.start()
9591
}
9692

9793
/**
@@ -133,7 +129,7 @@ class Queue {
133129
if (this.isRunning) {
134130
log('stopping dial queue to %s', this.id)
135131
this.isRunning = false
136-
this.onStopped()
132+
this.onStopped(this.id)
137133
}
138134
}
139135

src/dialer/queueManager.js

+29-22
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
const once = require('once')
44
const Queue = require('./queue')
5-
const { DIAL_ABORTED } = require('../errors')
65
const noop = () => {}
76

87
class DialQueueManager {
@@ -11,10 +10,10 @@ class DialQueueManager {
1110
* @param {Switch} _switch
1211
*/
1312
constructor (_switch) {
14-
this._queue = []
13+
this._queue = new Set()
14+
this._dialingQueues = new Set()
1515
this._queues = {}
1616
this.switch = _switch
17-
this.dials = 0
1817
}
1918

2019
/**
@@ -24,11 +23,8 @@ class DialQueueManager {
2423
* This causes the entire DialerQueue to be drained
2524
*/
2625
abort () {
27-
// Abort items in the general queue
28-
while (this._queue.length > 0) {
29-
let dial = this._queue.shift()
30-
dial.callback(DIAL_ABORTED())
31-
}
26+
// Clear the general queue
27+
this._queue.clear()
3228

3329
// Abort the individual peer queues
3430
const queues = Object.values(this._queues)
@@ -46,29 +42,39 @@ class DialQueueManager {
4642
add ({ peerInfo, protocol, useFSM, callback }) {
4743
callback = callback ? once(callback) : noop
4844

49-
// If the target queue is currently running, just add the dial
50-
// directly to it. This acts as a crude priority lane for multiple
51-
// calls to a peer.
45+
// Add the dial to its respective queue
5246
const targetQueue = this.getQueue(peerInfo)
53-
if (targetQueue.isRunning) {
54-
targetQueue.add(protocol, useFSM, callback)
47+
targetQueue.add(protocol, useFSM, callback)
48+
49+
// If we're already connected to the peer, start the queue now
50+
// While it might cause queues to go over the max parallel amount,
51+
// it avoids blocking peers we're already connected to
52+
if (peerInfo.isConnected()) {
53+
targetQueue.start()
5554
return
5655
}
5756

58-
this._queue.push({ peerInfo, protocol, useFSM, callback })
57+
// Add the id to the general queue set if the queue isn't running
58+
// and if the queue is allowed to dial
59+
if (!targetQueue.isRunning && targetQueue.isDialAllowed()) {
60+
this._queue.add(targetQueue.id)
61+
}
62+
5963
this.run()
6064
}
6165

6266
/**
6367
* Will execute up to `MAX_PARALLEL_DIALS` dials
6468
*/
6569
run () {
66-
if (this.dials < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.length > 0) {
67-
let { peerInfo, protocol, useFSM, callback } = this._queue.shift()
68-
let dialQueue = this.getQueue(peerInfo)
69-
if (dialQueue.add(protocol, useFSM, callback)) {
70-
this.dials++
71-
}
70+
if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.size > 0) {
71+
let nextQueue = this._queue.values().next()
72+
if (nextQueue.done) return
73+
74+
this._queue.delete(nextQueue.value)
75+
let targetQueue = this._queues[nextQueue.value]
76+
this._dialingQueues.add(targetQueue.id)
77+
targetQueue.start()
7278
}
7379
}
7480

@@ -84,9 +90,10 @@ class DialQueueManager {
8490
* A handler for when dialing queues stop. This will trigger
8591
* `run()` in order to keep the queue processing.
8692
* @private
93+
* @param {string} id peer id of the queue that stopped
8794
*/
88-
_onQueueStopped () {
89-
this.dials--
95+
_onQueueStopped (id) {
96+
this._dialingQueues.delete(id)
9097
this.run()
9198
}
9299

src/limit-dialer/index.js

+12-9
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
'use strict'
22

3-
const tryEach = require('async/tryEach')
3+
const race = require('async/race')
44
const debug = require('debug')
55
const once = require('once')
66

77
const log = debug('libp2p:switch:dialer')
88

99
const DialQueue = require('./queue')
10+
const { CONNECTION_FAILED } = require('../errors')
1011

1112
/**
1213
* Track dials per peer and limit them.
@@ -42,19 +43,21 @@ class LimitDialer {
4243

4344
let errors = []
4445
const tasks = addrs.map((m) => {
45-
return (cb) => this.dialSingle(peer, transport, m, token, (err, result) => {
46-
if (err) {
47-
errors.push(err)
48-
return cb(err)
46+
return (cb) => this.dialSingle(peer, transport, m, token, (err, res) => {
47+
if (res) return cb(null, res)
48+
49+
errors.push(err || CONNECTION_FAILED())
50+
51+
if (errors.length === tasks.length) {
52+
cb(errors)
4953
}
50-
return cb(null, result)
5154
})
5255
})
5356

54-
tryEach(tasks, (_, result) => {
55-
if (result && result.conn) {
57+
race(tasks, (_, successfulDial) => {
58+
if (successfulDial) {
5659
log('dialMany:success')
57-
return callback(null, result)
60+
return callback(null, successfulDial)
5861
}
5962

6063
log('dialMany:error')

src/limit-dialer/queue.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ class DialQueue {
5454
pull(empty(), conn)
5555
// If we can close the connection, do it
5656
if (typeof conn.close === 'function') {
57-
return conn.close((_) => callback(null, { cancel: true }))
57+
return conn.close((_) => callback(null))
5858
}
59-
return callback(null, { cancel: true })
59+
return callback(null)
6060
}
6161

6262
// one is enough

src/transport.js

+2-5
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,19 @@ const debug = require('debug')
88
const log = debug('libp2p:switch:transport')
99

1010
const LimitDialer = require('./limit-dialer')
11+
const { DIAL_TIMEOUT } = require('./constants')
1112

1213
// number of concurrent outbound dials to make per peer, same as go-libp2p-swtch
1314
const defaultPerPeerRateLimit = 8
1415

15-
// the amount of time a single dial has to succeed
16-
// TODO this should be exposed as a option
17-
const dialTimeout = 30 * 1000
18-
1916
/**
2017
* Manages the transports for the switch. This simplifies dialing and listening across
2118
* multiple transports.
2219
*/
2320
class TransportManager {
2421
constructor (_switch) {
2522
this.switch = _switch
26-
this.dialer = new LimitDialer(defaultPerPeerRateLimit, dialTimeout)
23+
this.dialer = new LimitDialer(defaultPerPeerRateLimit, this.switch._options.dialTimeout || DIAL_TIMEOUT)
2724
}
2825

2926
/**

test/limit-dialer.node.js

+9-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
const chai = require('chai')
55
chai.use(require('dirty-chai'))
6+
chai.use(require('chai-checkmark'))
67
const expect = chai.expect
78
const multiaddr = require('multiaddr')
89
const pull = require('pull-stream')
@@ -52,13 +53,15 @@ describe('LimitDialer', () => {
5253
it('two success', (done) => {
5354
const dialer = new LimitDialer(2, 10)
5455

56+
expect(2).checks(done)
57+
5558
// mock transport
5659
const t1 = {
5760
dial (addr, cb) {
5861
const as = addr.toString()
5962
if (as.match(/191/)) {
6063
setImmediate(() => cb(new Error('fail')))
61-
return {}
64+
return null
6265
} else if (as.match(/192/)) {
6366
setTimeout(cb, 2)
6467
return {
@@ -69,7 +72,10 @@ describe('LimitDialer', () => {
6972
setTimeout(cb, 8)
7073
return {
7174
source: pull.values([2]),
72-
sink: pull.drain()
75+
sink: pull.onEnd((err) => {
76+
// Verify the unused connection gets closed
77+
expect(err).to.not.exist().mark()
78+
})
7379
}
7480
}
7581
}
@@ -83,8 +89,7 @@ describe('LimitDialer', () => {
8389
conn,
8490
pull.collect((err, res) => {
8591
expect(err).to.not.exist()
86-
expect(res).to.be.eql([1])
87-
done()
92+
expect(res).to.be.eql([1]).mark()
8893
})
8994
)
9095
})

0 commit comments

Comments
 (0)