@@ -6,6 +6,11 @@ const LatencyMonitor = require('latency-monitor').default
6
6
const debug = require ( 'debug' ) ( 'libp2p:connection-manager' )
7
7
const retimer = require ( 'retimer' )
8
8
9
+ const { EventEmitter } = require ( 'events' )
10
+
11
+ const PeerId = require ( 'peer-id' )
12
+ const { Connection } = require ( 'libp2p-interfaces/src/connection' )
13
+
9
14
const {
10
15
ERR_INVALID_PARAMETERS
11
16
} = require ( '../errors' )
@@ -22,7 +27,12 @@ const defaultOptions = {
22
27
defaultPeerValue : 1
23
28
}
24
29
25
- class ConnectionManager {
30
+ /**
31
+ * Responsible for managing known connections.
32
+ * @fires ConnectionManager#peer:connect Emitted when a new peer is connected.
33
+ * @fires ConnectionManager#peer:disconnect Emitted when a known peer supports a different set of protocols.
34
+ */
35
+ class ConnectionManager extends EventEmitter {
26
36
/**
27
37
* @constructor
28
38
* @param {Libp2p } libp2p
@@ -38,30 +48,50 @@ class ConnectionManager {
38
48
* @param {Number } options.defaultPeerValue The value of the peer. Default=1
39
49
*/
40
50
constructor ( libp2p , options ) {
51
+ super ( )
52
+
41
53
this . _libp2p = libp2p
42
- this . _registrar = libp2p . registrar
43
54
this . _peerId = libp2p . peerId . toB58String ( )
55
+
44
56
this . _options = mergeOptions . call ( { ignoreUndefined : true } , defaultOptions , options )
45
57
if ( this . _options . maxConnections < this . _options . minConnections ) {
46
58
throw errcode ( new Error ( 'Connection Manager maxConnections must be greater than minConnections' ) , ERR_INVALID_PARAMETERS )
47
59
}
48
60
49
61
debug ( 'options: %j' , this . _options )
50
62
51
- this . _metrics = libp2p . metrics
63
+ this . _libp2p = libp2p
52
64
65
+ /**
66
+ * Map of peer identifiers to their peer value for pruning connections.
67
+ * @type {Map<string, number> }
68
+ */
53
69
this . _peerValues = new Map ( )
54
- this . _connections = new Map ( )
70
+
71
+ /**
72
+ * Map of connections per peer
73
+ * @type {Map<string, Array<conn>> }
74
+ */
75
+ this . connections = new Map ( )
76
+
55
77
this . _timer = null
56
78
this . _checkMetrics = this . _checkMetrics . bind ( this )
57
79
}
58
80
81
+ /**
82
+ * Get current number of open connections.
83
+ */
84
+ get size ( ) {
85
+ return Array . from ( this . connections . values ( ) )
86
+ . reduce ( ( accumulator , value ) => accumulator + value . length , 0 )
87
+ }
88
+
59
89
/**
60
90
* Starts the Connection Manager. If Metrics are not enabled on libp2p
61
91
* only event loop and connection limits will be monitored.
62
92
*/
63
93
start ( ) {
64
- if ( this . _metrics ) {
94
+ if ( this . _libp2p . metrics ) {
65
95
this . _timer = this . _timer || retimer ( this . _checkMetrics , this . _options . pollInterval )
66
96
}
67
97
@@ -77,13 +107,33 @@ class ConnectionManager {
77
107
78
108
/**
79
109
* Stops the Connection Manager
110
+ * @async
80
111
*/
81
- stop ( ) {
112
+ async stop ( ) {
82
113
this . _timer && this . _timer . clear ( )
83
114
this . _latencyMonitor && this . _latencyMonitor . removeListener ( 'data' , this . _onLatencyMeasure )
115
+
116
+ await this . _close ( )
84
117
debug ( 'stopped' )
85
118
}
86
119
120
+ /**
121
+ * Cleans up the connections
122
+ * @async
123
+ */
124
+ async _close ( ) {
125
+ // Close all connections we're tracking
126
+ const tasks = [ ]
127
+ for ( const connectionList of this . connections . values ( ) ) {
128
+ for ( const connection of connectionList ) {
129
+ tasks . push ( connection . close ( ) )
130
+ }
131
+ }
132
+
133
+ await tasks
134
+ this . connections . clear ( )
135
+ }
136
+
87
137
/**
88
138
* Sets the value of the given peer. Peers with lower values
89
139
* will be disconnected first.
@@ -106,7 +156,7 @@ class ConnectionManager {
106
156
* @private
107
157
*/
108
158
_checkMetrics ( ) {
109
- const movingAverages = this . _metrics . global . movingAverages
159
+ const movingAverages = this . _libp2p . metrics . global . movingAverages
110
160
const received = movingAverages . dataReceived [ this . _options . movingAverageInterval ] . movingAverage ( )
111
161
this . _checkLimit ( 'maxReceivedData' , received )
112
162
const sent = movingAverages . dataSent [ this . _options . movingAverageInterval ] . movingAverage ( )
@@ -122,21 +172,65 @@ class ConnectionManager {
122
172
* @param {Connection } connection
123
173
*/
124
174
onConnect ( connection ) {
125
- const peerId = connection . remotePeer . toB58String ( )
126
- this . _connections . set ( connection . id , connection )
127
- if ( ! this . _peerValues . has ( peerId ) ) {
128
- this . _peerValues . set ( peerId , this . _options . defaultPeerValue )
175
+ if ( ! Connection . isConnection ( connection ) ) {
176
+ throw errcode ( new Error ( 'conn must be an instance of interface-connection' ) , ERR_INVALID_PARAMETERS )
129
177
}
130
- this . _checkLimit ( 'maxConnections' , this . _connections . size )
178
+
179
+ const peerId = connection . remotePeer
180
+ const peerIdStr = peerId . toB58String ( )
181
+ const storedConn = this . connections . get ( peerIdStr )
182
+
183
+ if ( storedConn ) {
184
+ storedConn . push ( connection )
185
+ } else {
186
+ this . connections . set ( peerIdStr , [ connection ] )
187
+ this . emit ( 'peer:connect' , peerId )
188
+ }
189
+
190
+ if ( ! this . _peerValues . has ( peerIdStr ) ) {
191
+ this . _peerValues . set ( peerIdStr , this . _options . defaultPeerValue )
192
+ }
193
+
194
+ this . _checkLimit ( 'maxConnections' , this . size )
131
195
}
132
196
133
197
/**
134
198
* Removes the connection from tracking
135
199
* @param {Connection } connection
136
200
*/
137
201
onDisconnect ( connection ) {
138
- this . _connections . delete ( connection . id )
139
- this . _peerValues . delete ( connection . remotePeer . toB58String ( ) )
202
+ const peerId = connection . remotePeer
203
+ const peerIdStr = peerId . toB58String ( )
204
+ let storedConn = this . connections . get ( peerIdStr )
205
+
206
+ if ( storedConn && storedConn . length > 1 ) {
207
+ storedConn = storedConn . filter ( ( conn ) => conn . id !== connection . id )
208
+ this . connections . set ( peerIdStr , storedConn )
209
+ } else if ( storedConn ) {
210
+ this . connections . delete ( peerIdStr )
211
+ this . _peerValues . delete ( connection . remotePeer . toB58String ( ) )
212
+ this . emit ( 'peer:disconnect' , peerId )
213
+ }
214
+ }
215
+
216
+ /**
217
+ * Get a connection with a peer.
218
+ * @param {PeerId } peerId
219
+ * @returns {Connection }
220
+ */
221
+ get ( peerId ) {
222
+ if ( ! PeerId . isPeerId ( peerId ) ) {
223
+ throw errcode ( new Error ( 'peerId must be an instance of peer-id' ) , ERR_INVALID_PARAMETERS )
224
+ }
225
+
226
+ const id = peerId . toB58String ( )
227
+ const connections = this . connections . get ( id )
228
+
229
+ // Return the first, open connection
230
+ if ( connections ) {
231
+ return connections . find ( connection => connection . stat . status === 'open' )
232
+ }
233
+ return null
140
234
}
141
235
142
236
/**
@@ -169,17 +263,17 @@ class ConnectionManager {
169
263
* @private
170
264
*/
171
265
_maybeDisconnectOne ( ) {
172
- if ( this . _options . minConnections < this . _connections . size ) {
266
+ if ( this . _options . minConnections < this . connections . size ) {
173
267
const peerValues = Array . from ( this . _peerValues ) . sort ( byPeerValue )
174
268
debug ( '%s: sorted peer values: %j' , this . _peerId , peerValues )
175
269
const disconnectPeer = peerValues [ 0 ]
176
270
if ( disconnectPeer ) {
177
271
const peerId = disconnectPeer [ 0 ]
178
272
debug ( '%s: lowest value peer is %s' , this . _peerId , peerId )
179
273
debug ( '%s: closing a connection to %j' , this . _peerId , peerId )
180
- for ( const connection of this . _connections . values ( ) ) {
181
- if ( connection . remotePeer . toB58String ( ) === peerId ) {
182
- connection . close ( )
274
+ for ( const connections of this . connections . values ( ) ) {
275
+ if ( connections [ 0 ] . remotePeer . toB58String ( ) === peerId ) {
276
+ connections [ 0 ] . close ( )
183
277
break
184
278
}
185
279
}
0 commit comments