Skip to content

Commit 08cb39d

Browse files
committed
fix: improve connection maintenance with circuit
1 parent a0abe0f commit 08cb39d

File tree

6 files changed

+137
-37
lines changed

6 files changed

+137
-37
lines changed

src/circuit/circuit/hop.js

+5-2
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ const { validateAddrs } = require('./utils')
77
const StreamHandler = require('./stream-handler')
88
const { CircuitRelay: CircuitPB } = require('../protocol')
99
const pipe = require('it-pipe')
10+
const errCode = require('err-code')
11+
const { codes: Errors } = require('../../errors')
1012

1113
const { stop } = require('./stop')
1214

1315
const multicodec = require('./../multicodec')
1416

1517
const log = debug('libp2p:circuit:hop')
16-
log.err = debug('libp2p:circuit:hop:error')
18+
log.error = debug('libp2p:circuit:hop:error')
1719

1820
module.exports.handleHop = async function handleHop ({
1921
connection,
@@ -112,7 +114,8 @@ module.exports.hop = async function hop ({
112114
}
113115

114116
log('hop request failed with code %d, closing stream', response.code)
115-
return streamHandler.close()
117+
streamHandler.close()
118+
throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED)
116119
}
117120

118121
/**

src/circuit/circuit/stream-handler.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class StreamHandler {
3232
const msg = await this.decoder.next()
3333
if (msg.value) {
3434
const value = CircuitPB.decode(msg.value.slice())
35-
log('read', value)
35+
log('read message type', value.type)
3636
return value
3737
}
3838

@@ -47,7 +47,7 @@ class StreamHandler {
4747
* @param {*} msg An unencoded CircuitRelay protobuf message
4848
*/
4949
write (msg) {
50-
log('write', msg)
50+
log('write message type %s', msg.type)
5151
this.shake.write(lp.encode.single(CircuitPB.encode(msg)))
5252
}
5353

src/circuit/index.js

+29-19
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
const mafmt = require('mafmt')
44
const multiaddr = require('multiaddr')
55
const PeerId = require('peer-id')
6+
const PeerInfo = require('peer-info')
67
const withIs = require('class-is')
78
const { CircuitRelay: CircuitPB } = require('./protocol')
89

@@ -103,26 +104,33 @@ class Circuit {
103104
const addrs = ma.toString().split('/p2p-circuit')
104105
const relayAddr = multiaddr(addrs[0])
105106
const destinationAddr = multiaddr(addrs[addrs.length - 1])
106-
107+
const relayPeer = PeerId.createFromCID(relayAddr.getPeerId())
107108
const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId())
108-
const relayConnection = await this._dialer.connectToMultiaddr(relayAddr, options)
109-
const virtualConnection = await hop({
110-
connection: relayConnection,
111-
circuit: this,
112-
request: {
113-
type: CircuitPB.Type.HOP,
114-
srcPeer: {
115-
id: this.peerInfo.id.toBytes(),
116-
addrs: this.peerInfo.multiaddrs.toArray().map(addr => addr.buffer)
117-
},
118-
dstPeer: {
119-
id: destinationPeer.toBytes(),
120-
addrs: [multiaddr(destinationAddr).buffer]
109+
110+
let disconnectOnFailure = false
111+
let relayConnection = this._registrar.getConnection(new PeerInfo(relayPeer))
112+
if (!relayConnection) {
113+
relayConnection = await this._dialer.connectToMultiaddr(relayAddr, options)
114+
disconnectOnFailure = true
115+
}
116+
117+
try {
118+
const virtualConnection = await hop({
119+
connection: relayConnection,
120+
circuit: this,
121+
request: {
122+
type: CircuitPB.Type.HOP,
123+
srcPeer: {
124+
id: this.peerInfo.id.toBytes(),
125+
addrs: this.peerInfo.multiaddrs.toArray().map(addr => addr.buffer)
126+
},
127+
dstPeer: {
128+
id: destinationPeer.toBytes(),
129+
addrs: [multiaddr(destinationAddr).buffer]
130+
}
121131
}
122-
}
123-
})
132+
})
124133

125-
if (virtualConnection) {
126134
const localAddr = relayAddr.encapsulate(`/p2p-circuit/p2p/${this.peerInfo.id.toB58String()}`)
127135
const maConn = toConnection({
128136
stream: virtualConnection,
@@ -132,8 +140,10 @@ class Circuit {
132140
log('new outbound connection %s', maConn.remoteAddr)
133141

134142
return this._upgrader.upgradeOutbound(maConn)
135-
} else {
136-
// TODO: throw an error
143+
} catch (err) {
144+
log.error('Circuit relay dial failed', err)
145+
disconnectOnFailure && await relayConnection.close()
146+
throw err
137147
}
138148
}
139149

src/circuit/listener.js

+5-8
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ const log = debug('libp2p:circuit:listener')
88
log.err = debug('libp2p:circuit:error:listener')
99

1010
/**
11-
* @param {object} properties
12-
* @param {Dialer} properties.dialer
13-
* @param {object} properties.options
11+
* @param {*} circuit
12+
* @returns {Listener} a transport listener
1413
*/
15-
module.exports = (circuit, options) => {
14+
module.exports = (circuit) => {
1615
const listener = new EventEmitter()
1716
const listeningAddrs = new Map()
1817

@@ -24,12 +23,10 @@ module.exports = (circuit, options) => {
2423
* @return {void}
2524
*/
2625
listener.listen = async (addr) => {
27-
let [addrString] = String(addr).split('/p2p-circuit').slice(-1)
26+
const [addrString] = String(addr).split('/p2p-circuit').slice(-1)
2827

2928
const relayConn = await circuit._dialer.connectToMultiaddr(multiaddr(addrString))
30-
const relayedAddr = relayConn.remoteAddr.encapsulate(`/p2p-circuit`)
31-
32-
console.log('Relayed addr %s', String(relayedAddr))
29+
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')
3330

3431
listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr)
3532
listener.emit('listening')

src/errors.js

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ exports.codes = {
1616
ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF',
1717
ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT',
1818
ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED',
19+
ERR_HOP_REQUEST_FAILED: 'ERR_HOP_REQUEST_FAILED',
1920
ERR_INVALID_KEY: 'ERR_INVALID_KEY',
2021
ERR_INVALID_MESSAGE: 'ERR_INVALID_MESSAGE',
2122
ERR_INVALID_PEER: 'ERR_INVALID_PEER',

test/dialing/relay.node.js

+95-6
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
const chai = require('chai')
55
chai.use(require('dirty-chai'))
66
const { expect } = chai
7+
const sinon = require('sinon')
78

89
const multiaddr = require('multiaddr')
910
const { collect } = require('streaming-iterables')
1011
const pipe = require('it-pipe')
1112
const { createPeerInfoFromFixture } = require('../utils/creators/peer')
1213
const baseOptions = require('../utils/base-options')
1314
const Libp2p = require('../../src')
15+
const { codes: Errors } = require('../../src/errors')
1416

1517
describe('Dialing (via relay, TCP)', () => {
1618
let srcLibp2p
@@ -51,16 +53,10 @@ describe('Dialing (via relay, TCP)', () => {
5153
.encapsulate(`/p2p/${relayIdString}`)
5254
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
5355

54-
// Connect the target peer and the relay, since the relay is not active
55-
const destToRelayConn = await dstLibp2p.dial(relayAddr)
56-
expect(destToRelayConn).to.exist()
57-
5856
const tcpAddrs = dstLibp2p.transportManager.getAddrs()
5957
await dstLibp2p.transportManager.listen([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
6058
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
6159

62-
dstLibp2p.transportManager.getAddrs().forEach(addr => console.log(String(addr)))
63-
6460
const connection = await srcLibp2p.dial(dialAddr)
6561
expect(connection).to.exist()
6662
expect(connection.remotePeer.toBytes()).to.eql(dstLibp2p.peerInfo.id.toBytes())
@@ -82,4 +78,97 @@ describe('Dialing (via relay, TCP)', () => {
8278
)
8379
expect(output.slice()).to.eql(input)
8480
})
81+
82+
it('should fail to connect to a peer over a relay with inactive connections', async () => {
83+
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
84+
const relayIdString = relayLibp2p.peerInfo.id.toString()
85+
86+
const dialAddr = relayAddr
87+
.encapsulate(`/p2p/${relayIdString}`)
88+
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
89+
90+
try {
91+
await srcLibp2p.dial(dialAddr)
92+
expect.fail('Dial should have failed')
93+
} catch (err) {
94+
expect(err).to.exist()
95+
expect(err).to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
96+
}
97+
})
98+
99+
it('should not stay connected to a relay when not already connected and HOP fails', async () => {
100+
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
101+
const relayIdString = relayLibp2p.peerInfo.id.toString()
102+
103+
const dialAddr = relayAddr
104+
.encapsulate(`/p2p/${relayIdString}`)
105+
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
106+
107+
try {
108+
await srcLibp2p.dial(dialAddr)
109+
expect.fail('Dial should have failed')
110+
} catch (err) {
111+
expect(err).to.exist()
112+
expect(err).to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
113+
}
114+
115+
// We should not be connected to the relay, because we weren't before the dial
116+
const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
117+
expect(srcToRelayConn).to.not.exist()
118+
})
119+
120+
it('dialer should stay connected to an already connected relay on hop failure', async () => {
121+
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
122+
const relayIdString = relayLibp2p.peerInfo.id.toString()
123+
124+
const dialAddr = relayAddr
125+
.encapsulate(`/p2p/${relayIdString}`)
126+
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
127+
128+
// Connect to the relay first
129+
await srcLibp2p.dial(relayAddr)
130+
131+
try {
132+
await srcLibp2p.dial(dialAddr)
133+
expect.fail('Dial should have failed')
134+
} catch (err) {
135+
expect(err).to.exist()
136+
expect(err).to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
137+
}
138+
139+
const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
140+
expect(srcToRelayConn).to.exist()
141+
expect(srcToRelayConn.stat.status).to.equal('open')
142+
})
143+
144+
it('destination peer should stay connected to an already connected relay on hop failure', async () => {
145+
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
146+
const relayIdString = relayLibp2p.peerInfo.id.toString()
147+
148+
const dialAddr = relayAddr
149+
.encapsulate(`/p2p/${relayIdString}`)
150+
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`)
151+
152+
// Connect the destination peer and the relay
153+
const tcpAddrs = dstLibp2p.transportManager.getAddrs()
154+
await dstLibp2p.transportManager.listen([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
155+
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
156+
157+
// Tamper with the our multiaddrs for the circuit message
158+
sinon.stub(srcLibp2p.peerInfo.multiaddrs, 'toArray').returns([{
159+
buffer: Buffer.from('an invalid multiaddr')
160+
}])
161+
162+
try {
163+
await srcLibp2p.dial(dialAddr)
164+
expect.fail('Dial should have failed')
165+
} catch (err) {
166+
expect(err).to.exist()
167+
expect(err).to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED)
168+
}
169+
170+
const dstToRelayConn = dstLibp2p.registrar.getConnection(relayLibp2p.peerInfo)
171+
expect(dstToRelayConn).to.exist()
172+
expect(dstToRelayConn.stat.status).to.equal('open')
173+
})
85174
})

0 commit comments

Comments
 (0)