Skip to content

refactor: remove abortable-iterator usage #1785

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@
"@multiformats/mafmt": "^12.1.2",
"@multiformats/multiaddr": "^12.1.5",
"@multiformats/multiaddr-matcher": "^1.0.0",
"abortable-iterator": "^5.0.1",
"any-signal": "^4.1.1",
"datastore-core": "^9.0.1",
"delay": "^6.0.0",
Expand Down
27 changes: 20 additions & 7 deletions packages/libp2p/src/autonat/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@
*/

import { setMaxListeners } from 'events'
import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { multiaddr, protocols } from '@multiformats/multiaddr'
import { abortableDuplex } from 'abortable-iterator'
import first from 'it-first'
import * as lp from 'it-length-prefixed'
import map from 'it-map'
import parallel from 'it-parallel'
import { pipe } from 'it-pipe'
import isPrivateIp from 'private-ip'
import { codes } from '../errors.js'
import {
MAX_INBOUND_STREAMS,
MAX_OUTBOUND_STREAMS,
Expand Down Expand Up @@ -155,6 +156,12 @@ class DefaultAutoNATService implements Startable {
async handleIncomingAutonatStream (data: IncomingStreamData): Promise<void> {
const signal = AbortSignal.timeout(this.timeout)

const onAbort = (): void => {
data.stream.abort(new CodeError('handleIncomingAutonatStream timeout', codes.ERR_TIMEOUT))
}

signal.addEventListener('abort', onAbort, { once: true })

// this controller may be used while dialing lots of peers so prevent MaxListenersExceededWarning
// appearing in the console
try {
Expand All @@ -166,11 +173,10 @@ class DefaultAutoNATService implements Startable {
.map(ma => ma.toOptions().host)

try {
const source = abortableDuplex(data.stream, signal)
const self = this

await pipe(
source,
data.stream,
(source) => lp.decode(source),
async function * (stream) {
const buf = await first(stream)
Expand Down Expand Up @@ -379,12 +385,12 @@ class DefaultAutoNATService implements Startable {
})
},
(source) => lp.encode(source),
// pipe to the stream, not the abortable source other wise we
// can't tell the remote when a dial timed out..
data.stream
)
} catch (err) {
log.error('error handling incoming autonat stream', err)
} finally {
signal.removeEventListener('abort', onAbort)
}
}

Expand Down Expand Up @@ -453,6 +459,8 @@ class DefaultAutoNATService implements Startable {
const networkSegments: string[] = []

const verifyAddress = async (peer: PeerInfo): Promise<Message.DialResponse | undefined> => {
let onAbort = (): void => {}

try {
log('asking %p to verify multiaddr', peer.id)

Expand All @@ -463,12 +471,15 @@ class DefaultAutoNATService implements Startable {
const stream = await connection.newStream(this.protocol, {
signal
})
const source = abortableDuplex(stream, signal)

onAbort = () => { stream.abort(new CodeError('verifyAddress timeout', codes.ERR_TIMEOUT)) }

signal.addEventListener('abort', onAbort, { once: true })

const buf = await pipe(
[request],
(source) => lp.encode(source),
source,
stream,
(source) => lp.decode(source),
async (stream) => first(stream)
)
Expand Down Expand Up @@ -510,6 +521,8 @@ class DefaultAutoNATService implements Startable {
return response.dialResponse
} catch (err) {
log.error('error asking remote to verify multiaddr', err)
} finally {
signal.removeEventListener('abort', onAbort)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
options.signal?.throwIfAborted()

log('requesting reservation from %p', connection.remotePeer)
const stream = await connection.newStream(RELAY_V2_HOP_CODEC)
const stream = await connection.newStream(RELAY_V2_HOP_CODEC, options)
const pbstr = pbStream(stream)
const hopstr = pbstr.pb(HopMessage)
await hopstr.write({ type: HopMessage.Type.RESERVE }, options)
Expand Down
25 changes: 18 additions & 7 deletions packages/libp2p/src/circuit-relay/utils.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import { abortableSource } from 'abortable-iterator'
import { anySignal } from 'any-signal'
import { CID } from 'multiformats/cid'
import { sha256 } from 'multiformats/hashes/sha2'
import { codes } from '../errors.js'
import { DEFAULT_DATA_LIMIT, DEFAULT_DURATION_LIMIT } from './constants.js'
import type { Limit } from './pb/index.js'
import type { Stream } from '@libp2p/interface/connection'
Expand Down Expand Up @@ -58,9 +59,13 @@ const doRelay = (src: Stream, dst: Stream, abortSignal: AbortSignal, limit: Requ
}

queueMicrotask(() => {
void dst.sink(countStreamBytes(abortableSource(src.source, signal, {
abortMessage: 'duration limit exceeded'
}), dataLimit))
const onAbort = (): void => {
dst.abort(new CodeError('duration limit exceeded', codes.ERR_TIMEOUT))
}

signal.addEventListener('abort', onAbort, { once: true })

void dst.sink(countStreamBytes(src.source, dataLimit))
.catch(err => {
log.error('error while relaying streams src -> dst', err)
abortStreams(err)
Expand All @@ -69,16 +74,21 @@ const doRelay = (src: Stream, dst: Stream, abortSignal: AbortSignal, limit: Requ
srcDstFinished = true

if (dstSrcFinished) {
signal.removeEventListener('abort', onAbort)
signal.clear()
clearTimeout(timeout)
}
})
})

queueMicrotask(() => {
void src.sink(countStreamBytes(abortableSource(dst.source, signal, {
abortMessage: 'duration limit exceeded'
}), dataLimit))
const onAbort = (): void => {
src.abort(new CodeError('duration limit exceeded', codes.ERR_TIMEOUT))
}

signal.addEventListener('abort', onAbort, { once: true })

void src.sink(countStreamBytes(dst.source, dataLimit))
.catch(err => {
log.error('error while relaying streams dst -> src', err)
abortStreams(err)
Expand All @@ -87,6 +97,7 @@ const doRelay = (src: Stream, dst: Stream, abortSignal: AbortSignal, limit: Requ
dstSrcFinished = true

if (srcDstFinished) {
signal.removeEventListener('abort', onAbort)
signal.clear()
clearTimeout(timeout)
}
Expand Down
11 changes: 8 additions & 3 deletions packages/libp2p/src/fetch/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { setMaxListeners } from 'events'
import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import { abortableDuplex } from 'abortable-iterator'
import first from 'it-first'
import * as lp from 'it-length-prefixed'
import { pipe } from 'it-pipe'
Expand Down Expand Up @@ -140,6 +139,7 @@ class DefaultFetchService implements Startable, FetchService {
const connection = await this.components.connectionManager.openConnection(peer, options)
let signal = options.signal
let stream: Stream | undefined
let onAbort = (): void => {}

// create a timeout if no abort signal passed
if (signal == null) {
Expand All @@ -157,15 +157,19 @@ class DefaultFetchService implements Startable, FetchService {
signal
})

onAbort = () => {
stream?.abort(new CodeError('fetch timeout', codes.ERR_TIMEOUT))
}

// make stream abortable
const source = abortableDuplex(stream, signal)
signal.addEventListener('abort', onAbort, { once: true })

log('fetch %s', key)

const result = await pipe(
[FetchRequest.encode({ identifier: key })],
(source) => lp.encode(source),
source,
stream,
(source) => lp.decode(source),
async function (source) {
const buf = await first(source)
Expand Down Expand Up @@ -200,6 +204,7 @@ class DefaultFetchService implements Startable, FetchService {

return result ?? null
} finally {
signal.removeEventListener('abort', onAbort)
if (stream != null) {
await stream.close()
}
Expand Down
11 changes: 8 additions & 3 deletions packages/libp2p/src/ping/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { randomBytes } from '@libp2p/crypto'
import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import { abortableDuplex } from 'abortable-iterator'
import first from 'it-first'
import { pipe } from 'it-pipe'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
Expand Down Expand Up @@ -108,6 +107,7 @@ class DefaultPingService implements Startable, PingService {
const data = randomBytes(PING_LENGTH)
const connection = await this.components.connectionManager.openConnection(peer, options)
let stream: Stream | undefined
let onAbort = (): void => {}

options.signal = options.signal ?? AbortSignal.timeout(this.timeout)

Expand All @@ -117,12 +117,16 @@ class DefaultPingService implements Startable, PingService {
runOnTransientConnection: this.runOnTransientConnection
})

onAbort = () => {
stream?.abort(new CodeError('ping timeout', codes.ERR_TIMEOUT))
}

// make stream abortable
const source = abortableDuplex(stream, options.signal)
options.signal.addEventListener('abort', onAbort, { once: true })

const result = await pipe(
[data],
source,
stream,
async (source) => first(source)
)

Expand All @@ -146,6 +150,7 @@ class DefaultPingService implements Startable, PingService {

throw err
} finally {
options.signal.removeEventListener('abort', onAbort)
if (stream != null) {
await stream.close()
}
Expand Down
13 changes: 8 additions & 5 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import * as mss from '@libp2p/multistream-select'
import { peerIdFromString } from '@libp2p/peer-id'
import { abortableDuplex } from 'abortable-iterator'
import { createConnection } from './connection/index.js'
import { INBOUND_UPGRADE_TIMEOUT } from './connection-manager/constants.js'
import { codes } from './errors.js'
Expand Down Expand Up @@ -165,16 +164,18 @@ export class DefaultUpgrader implements Upgrader {

const signal = AbortSignal.timeout(this.inboundUpgradeTimeout)

const onAbort = (): void => {
maConn.abort(new CodeError('inbound upgrade timeout', codes.ERR_TIMEOUT))
}

signal.addEventListener('abort', onAbort, { once: true })

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch { }

try {
const abortableStream = abortableDuplex(maConn, signal)
maConn.source = abortableStream.source
maConn.sink = abortableStream.sink

if ((await this.components.connectionGater.denyInboundConnection?.(maConn)) === true) {
throw new CodeError('The multiaddr connection is blocked by gater.acceptConnection', codes.ERR_CONNECTION_INTERCEPTED)
}
Expand Down Expand Up @@ -255,6 +256,8 @@ export class DefaultUpgrader implements Upgrader {
transient: opts?.transient
})
} finally {
signal.removeEventListener('abort', onAbort)

this.components.connectionManager.afterUpgradeInbound()
}
}
Expand Down
3 changes: 3 additions & 0 deletions packages/libp2p/test/autonat/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ describe('autonat', () => {
}

sink.end()
},
abort: (err) => {
void stream.source.throw(err)
}
}
const connection = {
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/test/circuit-relay/utils.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ describe('circuit-relay utils', () => {

createLimitedRelay(localStream, remoteStream, controller.signal, limit)

expect(await toBuffer(received)).to.have.property('byteLength', 8)
expect(await toBuffer(received)).to.have.property('byteLength', 12)
expect(localStreamAbortSpy).to.have.property('called', true)
expect(remoteStreamAbortSpy).to.have.property('called', true)
})
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/test/fetch/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ describe('fetch', () => {
await expect(localFetch.fetch(remoteComponents.peerId, key, {
signal
}))
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
.to.eventually.be.rejected.with.property('code', 'ERR_TIMEOUT')

// should have closed stream
expect(newStreamSpy).to.have.property('callCount', 1)
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/test/ping/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ describe('ping', () => {
await expect(localPing.ping(remoteComponents.peerId, {
signal
}))
.to.eventually.be.rejected.with.property('code', 'ABORT_ERR')
.to.eventually.be.rejected.with.property('code', 'ERR_TIMEOUT')

// should have closed stream
expect(newStreamSpy).to.have.property('callCount', 1)
Expand Down
38 changes: 38 additions & 0 deletions packages/libp2p/test/upgrading/upgrader.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,44 @@ describe('Upgrader', () => {
})
})

it('should clear timeout if upgrade is successful', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })

localUpgrader = new DefaultUpgrader(localComponents, {
connectionEncryption: [
plaintext()()
],
muxers: [
yamux()()
],
inboundUpgradeTimeout: 1000
})
remoteUpgrader = new DefaultUpgrader(remoteComponents, {
connectionEncryption: [
plaintext()()
],
muxers: [
yamux()()
],
inboundUpgradeTimeout: 1000
})

const connections = await Promise.all([
localUpgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])

await delay(2000)

expect(connections).to.have.length(2)

connections.forEach(conn => {
conn.close().catch(() => {
throw new Error('Failed to close connection')
})
})
})

it('should fail if muxers do not match', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })

Expand Down
2 changes: 1 addition & 1 deletion packages/protocol-perf/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class DefaultPerfService implements Startable, PerfService {

const writeBlockSize = this.writeBlockSize

const stream = await connection.newStream([this.protocol])
const stream = await connection.newStream([this.protocol], options)

// Convert sendBytes to uint64 big endian buffer
const view = new DataView(this.databuf)
Expand Down