Skip to content

Commit 6f72c08

Browse files
committed
chore: support multiple conns
1 parent a5dc7d1 commit 6f72c08

File tree

4 files changed

+139
-113
lines changed

4 files changed

+139
-113
lines changed

src/connection-manager/topology.js

+49-16
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,74 @@
11
'use strict'
22

3+
const assert = require('assert')
4+
35
class Topology {
46
/**
57
* @param {Object} props
68
* @param {number} props.min minimum needed connections (default: 0)
79
* @param {number} props.max maximum needed connections (default: Infinity)
8-
* @param {function} props.onConnect protocol "onConnect" handler
9-
* @param {function} props.onDisconnect protocol "onDisconnect" handler
1010
* @param {Array<string>} props.multicodecs protocol multicodecs
11-
* @param {Registrar} registrar
11+
* @param {Object} props.handlers
12+
* @param {function} props.handlers.onConnect protocol "onConnect" handler
13+
* @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler
1214
* @constructor
1315
*/
1416
constructor ({
1517
min = 0,
1618
max = Infinity,
17-
onConnect,
18-
onDisconnect,
1919
multicodecs,
20-
registrar,
21-
peerStore
20+
handlers
2221
}) {
23-
this.multicodecs = multicodecs
24-
this.registrar = registrar
22+
assert(multicodecs, 'one or more multicodec should be provided')
23+
assert(handlers, 'the handlers should be provided')
24+
assert(handlers.onConnect && typeof handlers.onConnect === 'function',
25+
'the \'onConnect\' handler must be provided')
26+
assert(handlers.onDisconnect && typeof handlers.onDisconnect === 'function',
27+
'the \'onDisconnect\' handler must be provided')
28+
29+
this.multicodecs = Array.isArray(multicodecs) ? multicodecs : [multicodecs]
2530
this.min = min
2631
this.max = max
27-
this.peers = new Map()
2832

2933
// Handlers
30-
this._onConnect = onConnect
31-
this._onDisconnect = onDisconnect
34+
this._onConnect = handlers.onConnect
35+
this._onDisconnect = handlers.onDisconnect
36+
37+
this.peers = new Map()
38+
this._registrar = undefined
3239

3340
this._onProtocolChange = this._onProtocolChange.bind(this)
41+
}
42+
43+
set registrar (registrar) {
44+
this._registrar = registrar
45+
this._registrar.peerStore.on('change:protocols', this._onProtocolChange)
3446

35-
// Set by the registrar
36-
this._peerStore = peerStore
47+
// Add connected peers to the topology
48+
this._addConnectedPeers()
49+
// TODO: remaining peers in the store
50+
}
3751

38-
this._peerStore.on('change:protocols', this._onProtocolChange)
52+
/**
53+
* Add connected peers to the topology.
54+
*/
55+
_addConnectedPeers () {
56+
const knownPeers = []
57+
58+
for (const [, peer] of this._registrar.peerStore.peers) {
59+
if (this.multicodecs.filter(multicodec => peer.protocols.has(multicodec))) {
60+
knownPeers.push(peer)
61+
}
62+
}
63+
64+
for (const [id, conn] of this._registrar.connections.entries()) {
65+
const targetPeer = knownPeers.find((peerInfo) => peerInfo.id.toB58String() === id)
66+
67+
if (targetPeer) {
68+
// TODO: what should we return
69+
this.tryToConnect(targetPeer, conn[0])
70+
}
71+
}
3972
}
4073

4174
/**
@@ -85,7 +118,7 @@ class Topology {
85118
// New to protocol support
86119
for (const protocol of protocols) {
87120
if (this.multicodecs.includes(protocol)) {
88-
this.tryToConnect(peerInfo, this.registrar.getConnection(peerInfo))
121+
this.tryToConnect(peerInfo, this._registrar.getConnection(peerInfo))
89122
return
90123
}
91124
}

src/index.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class Libp2p extends EventEmitter {
7878
onConnectionEnd: (connection) => {
7979
const peerInfo = getPeerInfo(connection.remotePeer)
8080

81-
this.registrar.onDisconnect(peerInfo)
81+
this.registrar.onDisconnect(peerInfo, connection)
8282
this.emit('peer:disconnect', peerInfo)
8383
}
8484
})

src/registrar.js

+34-59
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ const assert = require('assert')
44
const debug = require('debug')
55
const log = debug('libp2p:peer-store')
66
log.error = debug('libp2p:peer-store:error')
7-
const errCode = require('err-code')
87

98
const { Connection } = require('libp2p-interfaces/src/connection')
109
const PeerInfo = require('peer-info')
@@ -25,7 +24,7 @@ class Registrar {
2524
/**
2625
* Map of connections per peer
2726
* TODO: this should be handled by connectionManager
28-
* @type {Map<string, conn>}
27+
* @type {Map<string, Array<conn>>}
2928
*/
3029
this.connections = new Map()
3130

@@ -58,24 +57,39 @@ class Registrar {
5857
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
5958
assert(Connection.isConnection(conn), 'conn must be an instance of interface-connection')
6059

61-
this.connections.set(peerInfo.id.toB58String(), conn)
60+
const id = peerInfo.id.toB58String()
61+
const storedConn = this.connections.get(id)
62+
63+
if (storedConn) {
64+
storedConn.push(conn)
65+
} else {
66+
this.connections.set(id, [conn])
67+
}
6268
}
6369

6470
/**
6571
* Remove a disconnected peer from the record
6672
* TODO: this should live in the ConnectionManager
6773
* @param {PeerInfo} peerInfo
74+
* @param {Connection} connection
6875
* @param {Error} [error]
6976
* @returns {void}
7077
*/
71-
onDisconnect (peerInfo, error) {
78+
onDisconnect (peerInfo, connection, error) {
7279
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
7380

74-
for (const [, topology] of this.topologies) {
75-
topology.disconnect(peerInfo, error)
76-
}
81+
const id = peerInfo.id.toB58String()
82+
let storedConn = this.connections.get(id)
7783

78-
this.connections.delete(peerInfo.id.toB58String())
84+
if (storedConn && storedConn.length > 1) {
85+
storedConn = storedConn.filter((conn) => conn.id === connection.id)
86+
} else if (storedConn) {
87+
for (const [, topology] of this.topologies) {
88+
topology.disconnect(peerInfo, error)
89+
}
90+
91+
this.connections.delete(peerInfo.id.toB58String())
92+
}
7993
}
8094

8195
/**
@@ -86,71 +100,32 @@ class Registrar {
86100
getConnection (peerInfo) {
87101
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
88102

89-
return this.connections.get(peerInfo.id.toB58String())
103+
// TODO: what should we return
104+
return this.connections.get(peerInfo.id.toB58String())[0]
90105
}
91106

92107
/**
93108
* Register handlers for a set of multicodecs given
94-
* @param {Array<string>|string} multicodecs
95-
* @param {object} handlers
96-
* @param {function} handlers.onConnect
97-
* @param {function} handlers.onDisconnect
98-
* @param {object} topologyProps properties for topology
109+
* @param {Object} topologyProps properties for topology
110+
* @param {Array<string>|string} topologyProps.multicodecs
111+
* @param {Object} topologyProps.handlers
112+
* @param {function} topologyProps.handlers.onConnect
113+
* @param {function} topologyProps.handlers.onDisconnect
99114
* @return {string} registrar identifier
100115
*/
101-
register (multicodecs, handlers, topologyProps = {}) {
102-
if (!multicodecs) {
103-
throw errCode(new Error('one or more multicodec should be provided'), 'ERR_NO_MULTICODECS')
104-
} else if (!Array.isArray(multicodecs)) {
105-
multicodecs = [multicodecs]
106-
}
107-
108-
if (!handlers) {
109-
throw errCode(new Error('the handlers should be provided'), 'ERR_NO_HANDLERS')
110-
} else if (!handlers.onConnect || typeof handlers.onConnect !== 'function') {
111-
throw errCode(new Error('the \'onConnect\' handler must be provided'), 'ERR_NO_ONCONNECT_HANDLER')
112-
} else if (!handlers.onDisconnect || typeof handlers.onDisconnect !== 'function') {
113-
throw errCode(new Error('the \'onDisconnect\' handler must be provided'), 'ERR_NO_ONDISCONNECT_HANDLER')
114-
}
115-
116+
register (topologyProps) {
116117
// Create multicodec topology
117-
const topology = new Toplogy({
118-
onConnect: handlers.onConnect,
119-
onDisconnect: handlers.onDisconnect,
120-
registrar: this,
121-
multicodecs,
122-
peerStore: this.peerStore,
123-
...topologyProps
124-
})
125-
126118
const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
127-
this.topologies.set(id, topology)
119+
const topology = new Toplogy(topologyProps)
128120

129-
this._addConnectedPeers(multicodecs, topology)
121+
this.topologies.set(id, topology)
130122

131-
// TODO: try to connect to peers-store peers according to current topology
123+
// Set registrar
124+
topology.registrar = this
132125

133126
return id
134127
}
135128

136-
_addConnectedPeers (multicodecs, topology) {
137-
const knownPeers = []
138-
139-
for (const [, peer] of this.peerStore.peers) {
140-
if (multicodecs.filter(multicodec => peer.protocols.has(multicodec))) {
141-
knownPeers.push(peer)
142-
}
143-
}
144-
145-
for (const [id, conn] of this.connections.entries()) {
146-
const targetPeer = knownPeers.find((peerInfo) => peerInfo.id.toB58String() === id)
147-
148-
if (targetPeer) {
149-
topology.tryToConnect(targetPeer, conn)
150-
}
151-
}
152-
}
153-
154129
/**
155130
* Unregister topology.
156131
* @param {string} id registrar identifier

0 commit comments

Comments
 (0)