Skip to content

Commit 850b5c9

Browse files
committed
fix: ensure identify streams are closed
1 parent 5608178 commit 850b5c9

File tree

2 files changed

+19
-8
lines changed

2 files changed

+19
-8
lines changed

src/identify/index.js

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ const debug = require('debug')
44
const pb = require('it-protocol-buffers')
55
const lp = require('it-length-prefixed')
66
const pipe = require('it-pipe')
7-
const { collect, take } = require('streaming-iterables')
7+
const { collect, take, consume } = require('streaming-iterables')
88

99
const PeerInfo = require('peer-info')
1010
const PeerId = require('peer-id')
@@ -114,7 +114,8 @@ class IdentifyService {
114114
protocols: Array.from(this._protocols.keys())
115115
}],
116116
pb.encode(Message),
117-
stream
117+
stream,
118+
consume
118119
)
119120
} catch (err) {
120121
// Just log errors
@@ -159,6 +160,8 @@ class IdentifyService {
159160
toBuffer,
160161
collect
161162
)
163+
// close the stream, no need to wait
164+
stream.sink([])
162165

163166
if (!data) {
164167
throw errCode(new Error('No data could be retrieved'), codes.ERR_CONNECTION_ENDED)
@@ -242,7 +245,8 @@ class IdentifyService {
242245
pipe(
243246
[message],
244247
lp.encode(),
245-
stream
248+
stream,
249+
consume
246250
)
247251
}
248252

@@ -261,6 +265,8 @@ class IdentifyService {
261265
toBuffer,
262266
collect
263267
)
268+
// close the stream, but no need to wait
269+
stream.sink([])
264270

265271
let message
266272
try {

test/identify/index.spec.js

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const PeerId = require('peer-id')
1212
const PeerInfo = require('peer-info')
1313
const duplexPair = require('it-pair/duplex')
1414
const multiaddr = require('multiaddr')
15+
const pWaitFor = require('p-wait-for')
1516

1617
const { codes: Errors } = require('../../src/errors')
1718
const { IdentifyService, multicodecs } = require('../../src/identify')
@@ -203,16 +204,17 @@ describe('Identify', () => {
203204
})
204205

205206
sinon.spy(libp2p.identifyService, 'identify')
206-
sinon.spy(libp2p.peerStore, 'replace')
207+
const peerStoreSpy = sinon.spy(libp2p.peerStore, 'replace')
207208

208209
const connection = await libp2p.dialer.connectToPeer(remoteAddr)
209210
expect(connection).to.exist()
210-
// Wait for nextTick to trigger the identify call
211-
await delay(1)
211+
212+
// Wait for peer store to be updated
213+
await pWaitFor(() => peerStoreSpy.callCount === 1)
212214
expect(libp2p.identifyService.identify.callCount).to.equal(1)
213-
await libp2p.identifyService.identify.firstCall.returnValue
214215

215-
expect(libp2p.peerStore.replace.callCount).to.equal(1)
216+
// The connection should have no open streams
217+
expect(connection.streams).to.have.length(0)
216218
await connection.close()
217219
})
218220

@@ -247,6 +249,9 @@ describe('Identify', () => {
247249
const results = await call.returnValue
248250
expect(results.length).to.equal(1)
249251
}
252+
253+
// The connection should have no open streams
254+
expect(connection.streams).to.have.length(0)
250255
})
251256
})
252257
})

0 commit comments

Comments
 (0)