Skip to content

Commit d7bbde6

Browse files
committed
chore: refactor connection manager and registrar
1 parent b283126 commit d7bbde6

File tree

16 files changed

+382
-374
lines changed

16 files changed

+382
-374
lines changed

doc/API.md

+39-1
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111
* [`handle`](#handle)
1212
* [`unhandle`](#unhandle)
1313
* [`ping`](#ping)
14-
* [`peerRouting.findPeer`](#peerroutingfindpeer)
1514
* [`contentRouting.findProviders`](#contentroutingfindproviders)
1615
* [`contentRouting.provide`](#contentroutingprovide)
1716
* [`contentRouting.put`](#contentroutingput)
1817
* [`contentRouting.get`](#contentroutingget)
1918
* [`contentRouting.getMany`](#contentroutinggetmany)
19+
* [`peerRouting.findPeer`](#peerroutingfindpeer)
2020
* [`peerStore.addressBook.add`](#peerstoreaddressbookadd)
2121
* [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete)
2222
* [`peerStore.addressBook.get`](#peerstoreaddressbookget)
@@ -34,14 +34,17 @@
3434
* [`pubsub.publish`](#pubsubpublish)
3535
* [`pubsub.subscribe`](#pubsubsubscribe)
3636
* [`pubsub.unsubscribe`](#pubsubunsubscribe)
37+
* [`connectionManager.get`](#connectionmanagerget)
3738
* [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue)
39+
* [`connectionManager.size`](#connectionmanagersize)
3840
* [`metrics.global`](#metricsglobal)
3941
* [`metrics.peers`](#metricspeers)
4042
* [`metrics.protocols`](#metricsprotocols)
4143
* [`metrics.forPeer`](#metricsforpeer)
4244
* [`metrics.forProtocol`](#metricsforprotocol)
4345
* [Events](#events)
4446
* [`libp2p`](#libp2p)
47+
* [`libp2p.connectionManager`](#libp2pconnectionmanager)
4548
* [`libp2p.peerStore`](#libp2ppeerStore)
4649
* [Types](#types)
4750
* [`Stats`](#stats)
@@ -999,6 +1002,28 @@ const handler = (msg) => {
9991002
libp2p.pubsub.unsubscribe(topic, handler)
10001003
```
10011004

1005+
### connectionManager.get
1006+
1007+
Get a connection with a given peer, if it exists.
1008+
1009+
#### Parameters
1010+
1011+
| Name | Type | Description |
1012+
|------|------|-------------|
1013+
| peerId | [`PeerId`][peer-id] | The peer to find |
1014+
1015+
#### Returns
1016+
1017+
| Type | Description |
1018+
|------|-------------|
1019+
| [`Connection`][connection] | Connection with the given peer |
1020+
1021+
#### Example
1022+
1023+
```js
1024+
libp2p.connectionManager.get(peerId)
1025+
```
1026+
10021027
### connectionManager.setPeerValue
10031028

10041029
Enables users to change the value of certain peers in a range of 0 to 1. Peers with the lowest values will have their Connections pruned first, if any Connection Manager limits are exceeded. See [./CONFIGURATION.md#configuring-connection-manager](./CONFIGURATION.md#configuring-connection-manager) for details on how to configure these limits.
@@ -1025,6 +1050,17 @@ libp2p.connectionManager.setPeerValue(highPriorityPeerId, 1)
10251050
libp2p.connectionManager.setPeerValue(lowPriorityPeerId, 0)
10261051
```
10271052

1053+
### connectionManager.size
1054+
1055+
Getter for obtaining the current number of open connections.
1056+
1057+
#### Example
1058+
1059+
```js
1060+
libp2p.connectionManager.size
1061+
// 10
1062+
```
1063+
10281064
### metrics.global
10291065

10301066
A [`Stats`](#stats) object of tracking the global bandwidth of the libp2p node.
@@ -1126,6 +1162,8 @@ unless they are performing a specific action. See [peer discovery and auto dial]
11261162

11271163
- `peer`: instance of [`PeerId`][peer-id]
11281164

1165+
### libp2p.connectionManager
1166+
11291167
#### A new connection to a peer has been opened
11301168

11311169
This event will be triggered anytime a new Connection is established to another peer.

src/circuit/circuit/hop.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ module.exports.handleHop = async function handleHop ({
4141
// Get the connection to the destination (stop) peer
4242
const destinationPeer = new PeerId(request.dstPeer.id)
4343

44-
const destinationConnection = circuit._registrar.getConnection(destinationPeer)
44+
const destinationConnection = circuit._connectionManager.get(destinationPeer)
4545
if (!destinationConnection && !circuit._options.hop.active) {
4646
log('HOP request received but we are not connected to the destination peer')
4747
return streamHandler.end({

src/circuit/index.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class Circuit {
2929
constructor ({ libp2p, upgrader }) {
3030
this._dialer = libp2p.dialer
3131
this._registrar = libp2p.registrar
32+
this._connectionManager = libp2p.connectionManager
3233
this._upgrader = upgrader
3334
this._options = libp2p._config.relay
3435
this.addresses = libp2p.addresses
@@ -107,7 +108,7 @@ class Circuit {
107108
const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId())
108109

109110
let disconnectOnFailure = false
110-
let relayConnection = this._registrar.getConnection(relayPeer)
111+
let relayConnection = this._connectionManager.get(relayPeer)
111112
if (!relayConnection) {
112113
relayConnection = await this._dialer.connectToPeer(relayAddr, options)
113114
disconnectOnFailure = true

src/connection-manager/index.js

+109-15
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ const LatencyMonitor = require('latency-monitor').default
66
const debug = require('debug')('libp2p:connection-manager')
77
const retimer = require('retimer')
88

9+
const { EventEmitter } = require('events')
10+
11+
const PeerId = require('peer-id')
12+
const { Connection } = require('libp2p-interfaces/src/connection')
13+
914
const {
1015
ERR_INVALID_PARAMETERS
1116
} = require('../errors')
@@ -22,7 +27,12 @@ const defaultOptions = {
2227
defaultPeerValue: 1
2328
}
2429

25-
class ConnectionManager {
30+
/**
31+
* Responsible for managing known connections.
32+
* @fires ConnectionManager#peer:connect Emitted when a new peer is connected.
33+
* @fires ConnectionManager#peer:disconnect Emitted when a known peer supports a different set of protocols.
34+
*/
35+
class ConnectionManager extends EventEmitter {
2636
/**
2737
* @constructor
2838
* @param {Libp2p} libp2p
@@ -38,9 +48,11 @@ class ConnectionManager {
3848
* @param {Number} options.defaultPeerValue The value of the peer. Default=1
3949
*/
4050
constructor (libp2p, options) {
51+
super()
52+
4153
this._libp2p = libp2p
42-
this._registrar = libp2p.registrar
4354
this._peerId = libp2p.peerId.toB58String()
55+
4456
this._options = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, options)
4557
if (this._options.maxConnections < this._options.minConnections) {
4658
throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS)
@@ -50,12 +62,30 @@ class ConnectionManager {
5062

5163
this._metrics = libp2p.metrics
5264

65+
/**
66+
* Map of peer identifiers to their peer value for pruning connections.
67+
* @type {Map<string, number>}
68+
*/
5369
this._peerValues = new Map()
54-
this._connections = new Map()
70+
71+
/**
72+
* Map of connections per peer
73+
* @type {Map<string, Array<conn>>}
74+
*/
75+
this.connections = new Map()
76+
5577
this._timer = null
5678
this._checkMetrics = this._checkMetrics.bind(this)
5779
}
5880

81+
/**
82+
* Get current number of open connections.
83+
*/
84+
get size () {
85+
return Array.from(this.connections.values())
86+
.reduce((accumulator, value) => accumulator + value.length, 0)
87+
}
88+
5989
/**
6090
* Starts the Connection Manager. If Metrics are not enabled on libp2p
6191
* only event loop and connection limits will be monitored.
@@ -77,13 +107,33 @@ class ConnectionManager {
77107

78108
/**
79109
* Stops the Connection Manager
110+
* @async
80111
*/
81-
stop () {
112+
async stop () {
82113
this._timer && this._timer.clear()
83114
this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure)
115+
116+
await this._close()
84117
debug('stopped')
85118
}
86119

120+
/**
121+
* Cleans up the connections
122+
* @async
123+
*/
124+
async _close () {
125+
// Close all connections we're tracking
126+
const tasks = []
127+
for (const connectionList of this.connections.values()) {
128+
for (const connection of connectionList) {
129+
tasks.push(connection.close())
130+
}
131+
}
132+
133+
await tasks
134+
this.connections.clear()
135+
}
136+
87137
/**
88138
* Sets the value of the given peer. Peers with lower values
89139
* will be disconnected first.
@@ -122,21 +172,65 @@ class ConnectionManager {
122172
* @param {Connection} connection
123173
*/
124174
onConnect (connection) {
125-
const peerId = connection.remotePeer.toB58String()
126-
this._connections.set(connection.id, connection)
127-
if (!this._peerValues.has(peerId)) {
128-
this._peerValues.set(peerId, this._options.defaultPeerValue)
175+
if (!Connection.isConnection(connection)) {
176+
throw errcode(new Error('conn must be an instance of interface-connection'), ERR_INVALID_PARAMETERS)
129177
}
130-
this._checkLimit('maxConnections', this._connections.size)
178+
179+
const peerId = connection.remotePeer
180+
const peerIdStr = peerId.toB58String()
181+
const storedConn = this.connections.get(peerIdStr)
182+
183+
if (storedConn) {
184+
storedConn.push(connection)
185+
} else {
186+
this.connections.set(peerIdStr, [connection])
187+
this.emit('peer:connect', peerId)
188+
}
189+
190+
if (!this._peerValues.has(peerIdStr)) {
191+
this._peerValues.set(peerIdStr, this._options.defaultPeerValue)
192+
}
193+
194+
this._checkLimit('maxConnections', this.size)
131195
}
132196

133197
/**
134198
* Removes the connection from tracking
135199
* @param {Connection} connection
136200
*/
137201
onDisconnect (connection) {
138-
this._connections.delete(connection.id)
139-
this._peerValues.delete(connection.remotePeer.toB58String())
202+
const peerId = connection.remotePeer
203+
const peerIdStr = peerId.toB58String()
204+
let storedConn = this.connections.get(peerIdStr)
205+
206+
if (storedConn && storedConn.length > 1) {
207+
storedConn = storedConn.filter((conn) => conn.id !== connection.id)
208+
this.connections.set(peerIdStr, storedConn)
209+
} else if (storedConn) {
210+
this.connections.delete(peerIdStr)
211+
this._peerValues.delete(connection.remotePeer.toB58String())
212+
this.emit('peer:disconnect', peerId)
213+
}
214+
}
215+
216+
/**
217+
* Get a connection with a peer.
218+
* @param {PeerId} peerId
219+
* @returns {Connection}
220+
*/
221+
get (peerId) {
222+
if (!PeerId.isPeerId(peerId)) {
223+
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
224+
}
225+
226+
const id = peerId.toB58String()
227+
const connections = this.connections.get(id)
228+
229+
// Return the first, open connection
230+
if (connections) {
231+
return connections.find(connection => connection.stat.status === 'open')
232+
}
233+
return null
140234
}
141235

142236
/**
@@ -169,17 +263,17 @@ class ConnectionManager {
169263
* @private
170264
*/
171265
_maybeDisconnectOne () {
172-
if (this._options.minConnections < this._connections.size) {
266+
if (this._options.minConnections < this.connections.size) {
173267
const peerValues = Array.from(this._peerValues).sort(byPeerValue)
174268
debug('%s: sorted peer values: %j', this._peerId, peerValues)
175269
const disconnectPeer = peerValues[0]
176270
if (disconnectPeer) {
177271
const peerId = disconnectPeer[0]
178272
debug('%s: lowest value peer is %s', this._peerId, peerId)
179273
debug('%s: closing a connection to %j', this._peerId, peerId)
180-
for (const connection of this._connections.values()) {
181-
if (connection.remotePeer.toB58String() === peerId) {
182-
connection.close()
274+
for (const connections of this.connections.values()) {
275+
if (connections[0].remotePeer.toB58String() === peerId) {
276+
connections[0].close()
183277
break
184278
}
185279
}

src/identify/index.js

+15-8
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,23 @@ class IdentifyService {
4545
/**
4646
* @constructor
4747
* @param {object} options
48-
* @param {Registrar} options.registrar
48+
* @param {PeerStore} options.peerStore
49+
* @param {ConnectionManager} options.connectionManager
4950
* @param {Map<string, handler>} options.protocols A reference to the protocols we support
5051
* @param {PeerId} options.peerId The peer running the identify service
5152
* @param {{ listen: Array<Multiaddr>}} options.addresses The peer aaddresses
5253
*/
5354
constructor (options) {
5455
/**
55-
* @property {Registrar}
56+
* @property {PeerStore}
5657
*/
57-
this.registrar = options.registrar
58+
this.peerStore = options.peerStore
59+
60+
/**
61+
* @property {ConnectionManager}
62+
*/
63+
this.connectionManager = options.connectionManager
64+
5865
/**
5966
* @property {PeerId}
6067
*/
@@ -103,7 +110,7 @@ class IdentifyService {
103110
const connections = []
104111
let connection
105112
for (const peer of peerStore.peers.values()) {
106-
if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.registrar.getConnection(peer.id))) {
113+
if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.connectionManager.get(peer.id))) {
107114
connections.push(connection)
108115
}
109116
}
@@ -159,8 +166,8 @@ class IdentifyService {
159166
observedAddr = IdentifyService.getCleanMultiaddr(observedAddr)
160167

161168
// Update peers data in PeerStore
162-
this.registrar.peerStore.addressBook.set(id, listenAddrs.map((addr) => multiaddr(addr)))
163-
this.registrar.peerStore.protoBook.set(id, protocols)
169+
this.peerStore.addressBook.set(id, listenAddrs.map((addr) => multiaddr(addr)))
170+
this.peerStore.protoBook.set(id, protocols)
164171

165172
// TODO: Track our observed address so that we can score it
166173
log('received observed address of %s', observedAddr)
@@ -244,13 +251,13 @@ class IdentifyService {
244251
// Update peers data in PeerStore
245252
const id = connection.remotePeer
246253
try {
247-
this.registrar.peerStore.addressBook.set(id, message.listenAddrs.map((addr) => multiaddr(addr)))
254+
this.peerStore.addressBook.set(id, message.listenAddrs.map((addr) => multiaddr(addr)))
248255
} catch (err) {
249256
return log.error('received invalid listen addrs', err)
250257
}
251258

252259
// Update the protocols
253-
this.registrar.peerStore.protoBook.set(id, message.protocols)
260+
this.peerStore.protoBook.set(id, message.protocols)
254261
}
255262
}
256263

0 commit comments

Comments
 (0)