Skip to content

Commit d729d66

Browse files
authored
fix: dial relay when we are dialed via it but have no reservation (#2252)
Fixes an edge case where we receive an incoming connection attempt from a peer via a relay that we did not know we had a reservation on. This could happen if we restart or otherwise lose context. When this happens, attempt to listen on a p2p-circuit version of the incoming connection remote addr - this will cause us to refresh our reservation on the remote relay. Fixes recent interop test instability where the test configures a relay reservation on behalf of a node which then has no knowledge of it's reservations so tells identify peers that it has no listen addresses.
1 parent bb6ceb1 commit d729d66

File tree

8 files changed

+460
-365
lines changed

8 files changed

+460
-365
lines changed

packages/transport-circuit-relay-v2/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
"p-defer": "^4.0.0",
6464
"p-retry": "^6.1.0",
6565
"protons-runtime": "^5.0.0",
66+
"race-signal": "^1.0.2",
6667
"uint8arraylist": "^2.4.3",
6768
"uint8arrays": "^4.0.6"
6869
},

packages/transport-circuit-relay-v2/src/transport/index.ts

Lines changed: 6 additions & 335 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,16 @@
1-
import { CodeError } from '@libp2p/interface/errors'
2-
import { symbol, type Transport, type CreateListenerOptions, type Listener, type Upgrader } from '@libp2p/interface/transport'
3-
import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id'
4-
import { streamToMaConnection } from '@libp2p/utils/stream-to-ma-conn'
5-
import * as mafmt from '@multiformats/mafmt'
6-
import { multiaddr } from '@multiformats/multiaddr'
7-
import { pbStream } from 'it-protobuf-stream'
8-
import { CIRCUIT_PROTO_CODE, ERR_HOP_REQUEST_FAILED, ERR_RELAYED_DIAL, MAX_CONNECTIONS, RELAY_V2_HOP_CODEC, RELAY_V2_STOP_CODEC } from '../constants.js'
9-
import { StopMessage, HopMessage, Status } from '../pb/index.js'
10-
import { RelayDiscovery, type RelayDiscoveryComponents } from './discovery.js'
11-
import { createListener } from './listener.js'
12-
import { type RelayStoreInit, ReservationStore } from './reservation-store.js'
13-
import type { Libp2pEvents, AbortOptions, ComponentLogger, Logger } from '@libp2p/interface'
14-
import type { Connection, Stream } from '@libp2p/interface/connection'
1+
import { type Transport, type Upgrader } from '@libp2p/interface/transport'
2+
import { type RelayDiscoveryComponents } from './discovery.js'
3+
import { type RelayStoreInit } from './reservation-store.js'
4+
import { CircuitRelayTransport } from './transport.js'
5+
import type { Libp2pEvents, ComponentLogger } from '@libp2p/interface'
156
import type { ConnectionGater } from '@libp2p/interface/connection-gater'
167
import type { ContentRouting } from '@libp2p/interface/content-routing'
178
import type { TypedEventTarget } from '@libp2p/interface/events'
189
import type { PeerId } from '@libp2p/interface/peer-id'
1910
import type { PeerStore } from '@libp2p/interface/peer-store'
2011
import type { AddressManager } from '@libp2p/interface-internal/address-manager'
2112
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
22-
import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar'
23-
import type { Multiaddr } from '@multiformats/multiaddr'
24-
25-
const isValidStop = (request: StopMessage): request is Required<StopMessage> => {
26-
if (request.peer == null) {
27-
return false
28-
}
29-
30-
try {
31-
request.peer.addrs.forEach(multiaddr)
32-
} catch {
33-
return false
34-
}
35-
36-
return true
37-
}
13+
import type { Registrar } from '@libp2p/interface-internal/registrar'
3814

3915
export interface CircuitRelayTransportComponents extends RelayDiscoveryComponents {
4016
peerId: PeerId
@@ -49,16 +25,6 @@ export interface CircuitRelayTransportComponents extends RelayDiscoveryComponent
4925
logger: ComponentLogger
5026
}
5127

52-
interface ConnectOptions {
53-
stream: Stream
54-
connection: Connection
55-
destinationPeer: PeerId
56-
destinationAddr: Multiaddr
57-
relayAddr: Multiaddr
58-
ma: Multiaddr
59-
disconnectOnFailure: boolean
60-
}
61-
6228
/**
6329
* RelayConfig configures the circuit v2 relay transport.
6430
*/
@@ -96,301 +62,6 @@ export interface CircuitRelayTransportInit extends RelayStoreInit {
9662
reservationCompletionTimeout?: number
9763
}
9864

99-
const defaults = {
100-
maxInboundStopStreams: MAX_CONNECTIONS,
101-
maxOutboundStopStreams: MAX_CONNECTIONS,
102-
stopTimeout: 30000
103-
}
104-
105-
class CircuitRelayTransport implements Transport {
106-
private readonly discovery?: RelayDiscovery
107-
private readonly registrar: Registrar
108-
private readonly peerStore: PeerStore
109-
private readonly connectionManager: ConnectionManager
110-
private readonly peerId: PeerId
111-
private readonly upgrader: Upgrader
112-
private readonly addressManager: AddressManager
113-
private readonly connectionGater: ConnectionGater
114-
private readonly reservationStore: ReservationStore
115-
private readonly logger: ComponentLogger
116-
private readonly maxInboundStopStreams: number
117-
private readonly maxOutboundStopStreams?: number
118-
private readonly stopTimeout: number
119-
private started: boolean
120-
private readonly log: Logger
121-
122-
constructor (components: CircuitRelayTransportComponents, init: CircuitRelayTransportInit) {
123-
this.log = components.logger.forComponent('libp2p:circuit-relay:transport')
124-
this.registrar = components.registrar
125-
this.peerStore = components.peerStore
126-
this.connectionManager = components.connectionManager
127-
this.logger = components.logger
128-
this.peerId = components.peerId
129-
this.upgrader = components.upgrader
130-
this.addressManager = components.addressManager
131-
this.connectionGater = components.connectionGater
132-
this.maxInboundStopStreams = init.maxInboundStopStreams ?? defaults.maxInboundStopStreams
133-
this.maxOutboundStopStreams = init.maxOutboundStopStreams ?? defaults.maxOutboundStopStreams
134-
this.stopTimeout = init.stopTimeout ?? defaults.stopTimeout
135-
136-
if (init.discoverRelays != null && init.discoverRelays > 0) {
137-
this.discovery = new RelayDiscovery(components)
138-
this.discovery.addEventListener('relay:discover', (evt) => {
139-
this.reservationStore.addRelay(evt.detail, 'discovered')
140-
.catch(err => {
141-
this.log.error('could not add discovered relay %p', evt.detail, err)
142-
})
143-
})
144-
}
145-
146-
this.reservationStore = new ReservationStore(components, init)
147-
this.reservationStore.addEventListener('relay:not-enough-relays', () => {
148-
this.discovery?.discover()
149-
.catch(err => {
150-
this.log.error('could not discover relays', err)
151-
})
152-
})
153-
154-
this.started = false
155-
}
156-
157-
isStarted (): boolean {
158-
return this.started
159-
}
160-
161-
async start (): Promise<void> {
162-
await this.reservationStore.start()
163-
await this.discovery?.start()
164-
165-
await this.registrar.handle(RELAY_V2_STOP_CODEC, (data) => {
166-
void this.onStop(data).catch(err => {
167-
this.log.error('error while handling STOP protocol', err)
168-
data.stream.abort(err)
169-
})
170-
}, {
171-
maxInboundStreams: this.maxInboundStopStreams,
172-
maxOutboundStreams: this.maxOutboundStopStreams,
173-
runOnTransientConnection: true
174-
})
175-
176-
this.started = true
177-
}
178-
179-
async stop (): Promise<void> {
180-
this.discovery?.stop()
181-
await this.reservationStore.stop()
182-
await this.registrar.unhandle(RELAY_V2_STOP_CODEC)
183-
184-
this.started = false
185-
}
186-
187-
readonly [symbol] = true
188-
189-
readonly [Symbol.toStringTag] = 'libp2p/circuit-relay-v2'
190-
191-
/**
192-
* Dial a peer over a relay
193-
*/
194-
async dial (ma: Multiaddr, options: AbortOptions = {}): Promise<Connection> {
195-
if (ma.protoCodes().filter(code => code === CIRCUIT_PROTO_CODE).length !== 1) {
196-
const errMsg = 'Invalid circuit relay address'
197-
this.log.error(errMsg, ma)
198-
throw new CodeError(errMsg, ERR_RELAYED_DIAL)
199-
}
200-
201-
// Check the multiaddr to see if it contains a relay and a destination peer
202-
const addrs = ma.toString().split('/p2p-circuit')
203-
const relayAddr = multiaddr(addrs[0])
204-
const destinationAddr = multiaddr(addrs[addrs.length - 1])
205-
const relayId = relayAddr.getPeerId()
206-
const destinationId = destinationAddr.getPeerId()
207-
208-
if (relayId == null || destinationId == null) {
209-
const errMsg = `Circuit relay dial to ${ma.toString()} failed as address did not have peer ids`
210-
this.log.error(errMsg)
211-
throw new CodeError(errMsg, ERR_RELAYED_DIAL)
212-
}
213-
214-
const relayPeer = peerIdFromString(relayId)
215-
const destinationPeer = peerIdFromString(destinationId)
216-
217-
let disconnectOnFailure = false
218-
const relayConnections = this.connectionManager.getConnections(relayPeer)
219-
let relayConnection = relayConnections[0]
220-
221-
if (relayConnection == null) {
222-
await this.peerStore.merge(relayPeer, {
223-
multiaddrs: [relayAddr]
224-
})
225-
relayConnection = await this.connectionManager.openConnection(relayPeer, options)
226-
disconnectOnFailure = true
227-
}
228-
229-
let stream: Stream | undefined
230-
231-
try {
232-
stream = await relayConnection.newStream(RELAY_V2_HOP_CODEC)
233-
234-
return await this.connectV2({
235-
stream,
236-
connection: relayConnection,
237-
destinationPeer,
238-
destinationAddr,
239-
relayAddr,
240-
ma,
241-
disconnectOnFailure
242-
})
243-
} catch (err: any) {
244-
this.log.error('circuit relay dial to destination %p via relay %p failed', destinationPeer, relayPeer, err)
245-
246-
if (stream != null) {
247-
stream.abort(err)
248-
}
249-
disconnectOnFailure && await relayConnection.close()
250-
throw err
251-
}
252-
}
253-
254-
async connectV2 (
255-
{
256-
stream, connection, destinationPeer,
257-
destinationAddr, relayAddr, ma,
258-
disconnectOnFailure
259-
}: ConnectOptions
260-
): Promise<Connection> {
261-
try {
262-
const pbstr = pbStream(stream)
263-
const hopstr = pbstr.pb(HopMessage)
264-
await hopstr.write({
265-
type: HopMessage.Type.CONNECT,
266-
peer: {
267-
id: destinationPeer.toBytes(),
268-
addrs: [multiaddr(destinationAddr).bytes]
269-
}
270-
})
271-
272-
const status = await hopstr.read()
273-
274-
if (status.status !== Status.OK) {
275-
throw new CodeError(`failed to connect via relay with status ${status?.status?.toString() ?? 'undefined'}`, ERR_HOP_REQUEST_FAILED)
276-
}
277-
278-
const maConn = streamToMaConnection({
279-
stream: pbstr.unwrap(),
280-
remoteAddr: ma,
281-
localAddr: relayAddr.encapsulate(`/p2p-circuit/p2p/${this.peerId.toString()}`),
282-
logger: this.logger
283-
})
284-
285-
this.log('new outbound transient connection %a', maConn.remoteAddr)
286-
return await this.upgrader.upgradeOutbound(maConn, {
287-
transient: true
288-
})
289-
} catch (err: any) {
290-
this.log.error(`Circuit relay dial to destination ${destinationPeer.toString()} via relay ${connection.remotePeer.toString()} failed`, err)
291-
disconnectOnFailure && await connection.close()
292-
throw err
293-
}
294-
}
295-
296-
/**
297-
* Create a listener
298-
*/
299-
createListener (options: CreateListenerOptions): Listener {
300-
return createListener({
301-
connectionManager: this.connectionManager,
302-
relayStore: this.reservationStore,
303-
logger: this.logger
304-
})
305-
}
306-
307-
/**
308-
* Filter check for all Multiaddrs that this transport can dial on
309-
*
310-
* @param {Multiaddr[]} multiaddrs
311-
* @returns {Multiaddr[]}
312-
*/
313-
filter (multiaddrs: Multiaddr[]): Multiaddr[] {
314-
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]
315-
316-
return multiaddrs.filter((ma) => {
317-
return mafmt.Circuit.matches(ma)
318-
})
319-
}
320-
321-
/**
322-
* An incoming STOP request means a remote peer wants to dial us via a relay
323-
*/
324-
async onStop ({ connection, stream }: IncomingStreamData): Promise<void> {
325-
const signal = AbortSignal.timeout(this.stopTimeout)
326-
const pbstr = pbStream(stream).pb(StopMessage)
327-
const request = await pbstr.read({
328-
signal
329-
})
330-
331-
this.log('new circuit relay v2 stop stream from %p with type %s', connection.remotePeer, request.type)
332-
333-
if (request?.type === undefined) {
334-
this.log.error('type was missing from circuit v2 stop protocol request from %s', connection.remotePeer)
335-
await pbstr.write({ type: StopMessage.Type.STATUS, status: Status.MALFORMED_MESSAGE }, {
336-
signal
337-
})
338-
await stream.close()
339-
return
340-
}
341-
342-
// Validate the STOP request has the required input
343-
if (request.type !== StopMessage.Type.CONNECT) {
344-
this.log.error('invalid stop connect request via peer %p', connection.remotePeer)
345-
await pbstr.write({ type: StopMessage.Type.STATUS, status: Status.UNEXPECTED_MESSAGE }, {
346-
signal
347-
})
348-
await stream.close()
349-
return
350-
}
351-
352-
if (!isValidStop(request)) {
353-
this.log.error('invalid stop connect request via peer %p', connection.remotePeer)
354-
await pbstr.write({ type: StopMessage.Type.STATUS, status: Status.MALFORMED_MESSAGE }, {
355-
signal
356-
})
357-
await stream.close()
358-
return
359-
}
360-
361-
const remotePeerId = peerIdFromBytes(request.peer.id)
362-
363-
if ((await this.connectionGater.denyInboundRelayedConnection?.(connection.remotePeer, remotePeerId)) === true) {
364-
this.log.error('connection gater denied inbound relayed connection from %p', connection.remotePeer)
365-
await pbstr.write({ type: StopMessage.Type.STATUS, status: Status.PERMISSION_DENIED }, {
366-
signal
367-
})
368-
await stream.close()
369-
return
370-
}
371-
372-
this.log.trace('sending success response to %p', connection.remotePeer)
373-
await pbstr.write({ type: StopMessage.Type.STATUS, status: Status.OK }, {
374-
signal
375-
})
376-
377-
const remoteAddr = connection.remoteAddr.encapsulate(`/p2p-circuit/p2p/${remotePeerId.toString()}`)
378-
const localAddr = this.addressManager.getAddresses()[0]
379-
const maConn = streamToMaConnection({
380-
stream: pbstr.unwrap().unwrap(),
381-
remoteAddr,
382-
localAddr,
383-
logger: this.logger
384-
})
385-
386-
this.log('new inbound transient connection %a', maConn.remoteAddr)
387-
await this.upgrader.upgradeInbound(maConn, {
388-
transient: true
389-
})
390-
this.log('%s connection %a upgraded', 'inbound', maConn.remoteAddr)
391-
}
392-
}
393-
39465
export function circuitRelayTransport (init: CircuitRelayTransportInit = {}): (components: CircuitRelayTransportComponents) => Transport {
39566
return (components) => {
39667
return new CircuitRelayTransport(components, init)

packages/transport-circuit-relay-v2/src/transport/listener.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class CircuitRelayTransportListener extends TypedEventEmitter<ListenerEvents> im
4545
const relayConn = await this.connectionManager.openConnection(relayAddr)
4646

4747
if (!this.relayStore.hasReservation(relayConn.remotePeer)) {
48+
this.log('making reservation on peer %p', relayConn.remotePeer)
4849
// addRelay calls transportManager.listen which calls this listen method
4950
await this.relayStore.addRelay(relayConn.remotePeer, 'configured')
5051
return

0 commit comments

Comments
 (0)