Skip to content

Commit a3cdc08

Browse files
committed
chore: refactor connection manager and registrar
1 parent 7039cd0 commit a3cdc08

File tree

18 files changed

+424
-411
lines changed

18 files changed

+424
-411
lines changed

doc/API.md

+43-5
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111
* [`handle`](#handle)
1212
* [`unhandle`](#unhandle)
1313
* [`ping`](#ping)
14-
* [`peerRouting.findPeer`](#peerroutingfindpeer)
1514
* [`contentRouting.findProviders`](#contentroutingfindproviders)
1615
* [`contentRouting.provide`](#contentroutingprovide)
1716
* [`contentRouting.put`](#contentroutingput)
1817
* [`contentRouting.get`](#contentroutingget)
1918
* [`contentRouting.getMany`](#contentroutinggetmany)
19+
* [`peerRouting.findPeer`](#peerroutingfindpeer)
2020
* [`peerStore.addressBook.add`](#peerstoreaddressbookadd)
2121
* [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete)
2222
* [`peerStore.addressBook.get`](#peerstoreaddressbookget)
@@ -34,14 +34,17 @@
3434
* [`pubsub.publish`](#pubsubpublish)
3535
* [`pubsub.subscribe`](#pubsubsubscribe)
3636
* [`pubsub.unsubscribe`](#pubsubunsubscribe)
37+
* [`connectionManager.get`](#connectionmanagerget)
3738
* [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue)
39+
* [`connectionManager.size`](#connectionmanagersize)
3840
* [`metrics.global`](#metricsglobal)
3941
* [`metrics.peers`](#metricspeers)
4042
* [`metrics.protocols`](#metricsprotocols)
4143
* [`metrics.forPeer`](#metricsforpeer)
4244
* [`metrics.forProtocol`](#metricsforprotocol)
4345
* [Events](#events)
4446
* [`libp2p`](#libp2p)
47+
* [`libp2p.connectionManager`](#libp2pconnectionmanager)
4548
* [`libp2p.peerStore`](#libp2ppeerStore)
4649
* [Types](#types)
4750
* [`Stats`](#stats)
@@ -999,6 +1002,28 @@ const handler = (msg) => {
9991002
libp2p.pubsub.unsubscribe(topic, handler)
10001003
```
10011004

1005+
### connectionManager.get
1006+
1007+
Get a connection with a given peer, if it exists.
1008+
1009+
#### Parameters
1010+
1011+
| Name | Type | Description |
1012+
|------|------|-------------|
1013+
| peerId | [`PeerId`][peer-id] | The peer to find |
1014+
1015+
#### Returns
1016+
1017+
| Type | Description |
1018+
|------|-------------|
1019+
| [`Connection`][connection] | Connection with the given peer |
1020+
1021+
#### Example
1022+
1023+
```js
1024+
libp2p.connectionManager.get(peerId)
1025+
```
1026+
10021027
### connectionManager.setPeerValue
10031028

10041029
Enables users to change the value of certain peers in a range of 0 to 1. Peers with the lowest values will have their Connections pruned first, if any Connection Manager limits are exceeded. See [./CONFIGURATION.md#configuring-connection-manager](./CONFIGURATION.md#configuring-connection-manager) for details on how to configure these limits.
@@ -1025,6 +1050,17 @@ libp2p.connectionManager.setPeerValue(highPriorityPeerId, 1)
10251050
libp2p.connectionManager.setPeerValue(lowPriorityPeerId, 0)
10261051
```
10271052

1053+
### connectionManager.size
1054+
1055+
Getter for obtaining the current number of open connections.
1056+
1057+
#### Example
1058+
1059+
```js
1060+
libp2p.connectionManager.size
1061+
// 10
1062+
```
1063+
10281064
### metrics.global
10291065

10301066
A [`Stats`](#stats) object of tracking the global bandwidth of the libp2p node.
@@ -1126,21 +1162,23 @@ unless they are performing a specific action. See [peer discovery and auto dial]
11261162

11271163
- `peer`: instance of [`PeerId`][peer-id]
11281164

1165+
### libp2p.connectionManager
1166+
11291167
#### A new connection to a peer has been opened
11301168

11311169
This event will be triggered anytime a new Connection is established to another peer.
11321170

1133-
`libp2p.on('peer:connect', (peer) => {})`
1171+
`libp2p.on('peer:connect', (connection) => {})`
11341172

1135-
- `peer`: instance of [`PeerId`][peer-id]
1173+
- `connection`: instance of [`Connection`][connection]
11361174

11371175
#### An existing connection to a peer has been closed
11381176

11391177
This event will be triggered anytime we are disconnected from another peer, regardless of the circumstances of that disconnection. If we happen to have multiple connections to a peer, this event will **only** be triggered when the last connection is closed.
11401178

1141-
`libp2p.on('peer:disconnect', (peer) => {})`
1179+
`libp2p.on('peer:disconnect', (connection) => {})`
11421180

1143-
- `peer`: instance of [`PeerId`][peer-id]
1181+
- `connection`: instance of [`Connection`][connection]
11441182

11451183
### libp2p.peerStore
11461184

src/circuit/circuit/hop.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ module.exports.handleHop = async function handleHop ({
4141
// Get the connection to the destination (stop) peer
4242
const destinationPeer = new PeerId(request.dstPeer.id)
4343

44-
const destinationConnection = circuit._registrar.getConnection(destinationPeer)
44+
const destinationConnection = circuit._connectionManager.get(destinationPeer)
4545
if (!destinationConnection && !circuit._options.hop.active) {
4646
log('HOP request received but we are not connected to the destination peer')
4747
return streamHandler.end({

src/circuit/index.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class Circuit {
2929
constructor ({ libp2p, upgrader }) {
3030
this._dialer = libp2p.dialer
3131
this._registrar = libp2p.registrar
32+
this._connectionManager = libp2p.connectionManager
3233
this._upgrader = upgrader
3334
this._options = libp2p._config.relay
3435
this.addresses = libp2p.addresses
@@ -107,7 +108,7 @@ class Circuit {
107108
const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId())
108109

109110
let disconnectOnFailure = false
110-
let relayConnection = this._registrar.getConnection(relayPeer)
111+
let relayConnection = this._connectionManager.get(relayPeer)
111112
if (!relayConnection) {
112113
relayConnection = await this._dialer.connectToPeer(relayAddr, options)
113114
disconnectOnFailure = true

src/connection-manager/index.js

+107-15
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ const LatencyMonitor = require('latency-monitor').default
66
const debug = require('debug')('libp2p:connection-manager')
77
const retimer = require('retimer')
88

9+
const { EventEmitter } = require('events')
10+
11+
const PeerId = require('peer-id')
12+
const { Connection } = require('libp2p-interfaces/src/connection')
13+
914
const {
1015
ERR_INVALID_PARAMETERS
1116
} = require('../errors')
@@ -22,7 +27,12 @@ const defaultOptions = {
2227
defaultPeerValue: 1
2328
}
2429

25-
class ConnectionManager {
30+
/**
31+
* Responsible for managing known connections.
32+
* @fires ConnectionManager#peer:connect Emitted when a new peer is connected.
33+
* @fires ConnectionManager#peer:disconnect Emitted when a peer is disconnected.
34+
*/
35+
class ConnectionManager extends EventEmitter {
2636
/**
2737
* @constructor
2838
* @param {Libp2p} libp2p
@@ -38,30 +48,50 @@ class ConnectionManager {
3848
* @param {Number} options.defaultPeerValue The value of the peer. Default=1
3949
*/
4050
constructor (libp2p, options) {
51+
super()
52+
4153
this._libp2p = libp2p
42-
this._registrar = libp2p.registrar
4354
this._peerId = libp2p.peerId.toB58String()
55+
4456
this._options = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, options)
4557
if (this._options.maxConnections < this._options.minConnections) {
4658
throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS)
4759
}
4860

4961
debug('options: %j', this._options)
5062

51-
this._metrics = libp2p.metrics
63+
this._libp2p = libp2p
5264

65+
/**
66+
* Map of peer identifiers to their peer value for pruning connections.
67+
* @type {Map<string, number>}
68+
*/
5369
this._peerValues = new Map()
54-
this._connections = new Map()
70+
71+
/**
72+
* Map of connections per peer
73+
* @type {Map<string, Array<conn>>}
74+
*/
75+
this.connections = new Map()
76+
5577
this._timer = null
5678
this._checkMetrics = this._checkMetrics.bind(this)
5779
}
5880

81+
/**
82+
* Get current number of open connections.
83+
*/
84+
get size () {
85+
return Array.from(this.connections.values())
86+
.reduce((accumulator, value) => accumulator + value.length, 0)
87+
}
88+
5989
/**
6090
* Starts the Connection Manager. If Metrics are not enabled on libp2p
6191
* only event loop and connection limits will be monitored.
6292
*/
6393
start () {
64-
if (this._metrics) {
94+
if (this._libp2p.metrics) {
6595
this._timer = this._timer || retimer(this._checkMetrics, this._options.pollInterval)
6696
}
6797

@@ -77,13 +107,33 @@ class ConnectionManager {
77107

78108
/**
79109
* Stops the Connection Manager
110+
* @async
80111
*/
81-
stop () {
112+
async stop () {
82113
this._timer && this._timer.clear()
83114
this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure)
115+
116+
await this._close()
84117
debug('stopped')
85118
}
86119

120+
/**
121+
* Cleans up the connections
122+
* @async
123+
*/
124+
async _close () {
125+
// Close all connections we're tracking
126+
const tasks = []
127+
for (const connectionList of this.connections.values()) {
128+
for (const connection of connectionList) {
129+
tasks.push(connection.close())
130+
}
131+
}
132+
133+
await tasks
134+
this.connections.clear()
135+
}
136+
87137
/**
88138
* Sets the value of the given peer. Peers with lower values
89139
* will be disconnected first.
@@ -106,7 +156,7 @@ class ConnectionManager {
106156
* @private
107157
*/
108158
_checkMetrics () {
109-
const movingAverages = this._metrics.global.movingAverages
159+
const movingAverages = this._libp2p.metrics.global.movingAverages
110160
const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage()
111161
this._checkLimit('maxReceivedData', received)
112162
const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage()
@@ -122,21 +172,63 @@ class ConnectionManager {
122172
* @param {Connection} connection
123173
*/
124174
onConnect (connection) {
175+
if (!Connection.isConnection(connection)) {
176+
throw errcode(new Error('conn must be an instance of interface-connection'), ERR_INVALID_PARAMETERS)
177+
}
178+
125179
const peerId = connection.remotePeer.toB58String()
126-
this._connections.set(connection.id, connection)
180+
const storedConn = this.connections.get(peerId)
181+
182+
if (storedConn) {
183+
storedConn.push(connection)
184+
} else {
185+
this.connections.set(peerId, [connection])
186+
this.emit('peer:connect', connection)
187+
}
188+
127189
if (!this._peerValues.has(peerId)) {
128190
this._peerValues.set(peerId, this._options.defaultPeerValue)
129191
}
130-
this._checkLimit('maxConnections', this._connections.size)
192+
193+
this._checkLimit('maxConnections', this.size)
131194
}
132195

133196
/**
134197
* Removes the connection from tracking
135198
* @param {Connection} connection
136199
*/
137200
onDisconnect (connection) {
138-
this._connections.delete(connection.id)
139-
this._peerValues.delete(connection.remotePeer.toB58String())
201+
const peerId = connection.remotePeer.toB58String()
202+
let storedConn = this.connections.get(peerId)
203+
204+
if (storedConn && storedConn.length > 1) {
205+
storedConn = storedConn.filter((conn) => conn.id !== connection.id)
206+
this.connections.set(peerId, storedConn)
207+
} else if (storedConn) {
208+
this.connections.delete(peerId)
209+
this._peerValues.delete(connection.remotePeer.toB58String())
210+
this.emit('peer:disconnect', connection)
211+
}
212+
}
213+
214+
/**
215+
* Get a connection with a peer.
216+
* @param {PeerId} peerId
217+
* @returns {Connection}
218+
*/
219+
get (peerId) {
220+
if (!PeerId.isPeerId(peerId)) {
221+
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
222+
}
223+
224+
const id = peerId.toB58String()
225+
const connections = this.connections.get(id)
226+
227+
// Return the first, open connection
228+
if (connections) {
229+
return connections.find(connection => connection.stat.status === 'open')
230+
}
231+
return null
140232
}
141233

142234
/**
@@ -169,17 +261,17 @@ class ConnectionManager {
169261
* @private
170262
*/
171263
_maybeDisconnectOne () {
172-
if (this._options.minConnections < this._connections.size) {
264+
if (this._options.minConnections < this.connections.size) {
173265
const peerValues = Array.from(this._peerValues).sort(byPeerValue)
174266
debug('%s: sorted peer values: %j', this._peerId, peerValues)
175267
const disconnectPeer = peerValues[0]
176268
if (disconnectPeer) {
177269
const peerId = disconnectPeer[0]
178270
debug('%s: lowest value peer is %s', this._peerId, peerId)
179271
debug('%s: closing a connection to %j', this._peerId, peerId)
180-
for (const connection of this._connections.values()) {
181-
if (connection.remotePeer.toB58String() === peerId) {
182-
connection.close()
272+
for (const connections of this.connections.values()) {
273+
if (connections[0].remotePeer.toB58String() === peerId) {
274+
connections[0].close()
183275
break
184276
}
185277
}

0 commit comments

Comments
 (0)