Skip to content

Commit 32f68de

Browse files
authored
fix: use abstract stream class from muxer interface module (libp2p#165)
Refactors the stream class in this module to use the abstract superclass from `@libp2p/interface-stream-muxer` as it handles all the various scenarios for closing streams which this module does not. Fixes libp2p#164
1 parent 0e0cc8c commit 32f68de

File tree

8 files changed

+522
-552
lines changed

8 files changed

+522
-552
lines changed

.aegir.js

+51
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,59 @@
1+
import { createLibp2p } from 'libp2p'
2+
import { circuitRelayServer } from 'libp2p/circuit-relay'
3+
import { identifyService } from 'libp2p/identify'
4+
import { webSockets } from '@libp2p/websockets'
5+
import { noise } from '@chainsafe/libp2p-noise'
6+
import { yamux } from '@chainsafe/libp2p-yamux'
7+
18
export default {
29
build: {
310
config: {
411
platform: 'node'
512
},
613
bundlesizeMax: '117KB'
14+
},
15+
test: {
16+
before: async () => {
17+
// start a relay node for use in the tests
18+
const relay = await createLibp2p({
19+
addresses: {
20+
listen: [
21+
'/ip4/127.0.0.1/tcp/0/ws'
22+
]
23+
},
24+
transports: [
25+
webSockets()
26+
],
27+
connectionEncryption: [
28+
noise()
29+
],
30+
streamMuxers: [
31+
yamux()
32+
],
33+
services: {
34+
relay: circuitRelayServer({
35+
reservations: {
36+
maxReservations: Infinity
37+
}
38+
}),
39+
identify: identifyService()
40+
},
41+
connectionManager: {
42+
minConnections: 0
43+
}
44+
})
45+
46+
const multiaddrs = relay.getMultiaddrs().map(ma => ma.toString())
47+
48+
return {
49+
relay,
50+
env: {
51+
RELAY_MULTIADDR: multiaddrs[0]
52+
}
53+
}
54+
},
55+
after: async (_, before) => {
56+
await before.relay.stop()
57+
}
758
}
859
}

package.json

+10-3
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@
140140
"@libp2p/interface-metrics": "^4.0.8",
141141
"@libp2p/interface-peer-id": "^2.0.2",
142142
"@libp2p/interface-registrar": "^2.0.12",
143-
"@libp2p/interface-stream-muxer": "^4.0.1",
143+
"@libp2p/interface-stream-muxer": "^4.1.2",
144144
"@libp2p/interface-transport": "^4.0.3",
145145
"@libp2p/interfaces": "^3.3.2",
146146
"@libp2p/logger": "^2.0.7",
@@ -150,7 +150,6 @@
150150
"abortable-iterator": "^5.0.1",
151151
"detect-browser": "^5.3.0",
152152
"it-length-prefixed": "^9.0.1",
153-
"it-merge": "^3.0.0",
154153
"it-pb-stream": "^4.0.1",
155154
"it-pipe": "^3.0.1",
156155
"it-pushable": "^3.1.3",
@@ -164,11 +163,19 @@
164163
"uint8arrays": "^4.0.3"
165164
},
166165
"devDependencies": {
166+
"@chainsafe/libp2p-yamux": "^4.0.1",
167+
"@libp2p/interface-libp2p": "^3.1.0",
167168
"@libp2p/interface-mocks": "^12.0.1",
168169
"@libp2p/peer-id-factory": "^2.0.3",
170+
"@libp2p/websockets": "^6.0.1",
169171
"@types/sinon": "^10.0.14",
170-
"aegir": "^39.0.6",
172+
"aegir": "^39.0.7",
173+
"delay": "^5.0.0",
174+
"it-all": "^3.0.2",
175+
"it-length": "^3.0.2",
176+
"it-map": "^3.0.3",
171177
"it-pair": "^2.0.6",
178+
"libp2p": "^0.45.0",
172179
"protons": "^7.0.2",
173180
"sinon": "^15.0.4",
174181
"sinon-ts": "^1.0.0"

src/muxer.ts

+69-66
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1-
import { type DataChannelOpts, WebRTCStream } from './stream.js'
1+
import { createStream } from './stream.js'
22
import { nopSink, nopSource } from './util.js'
3+
import type { DataChannelOpts } from './stream.js'
34
import type { Stream } from '@libp2p/interface-connection'
45
import type { CounterGroup } from '@libp2p/interface-metrics'
56
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface-stream-muxer'
67
import type { Source, Sink } from 'it-stream-types'
78
import type { Uint8ArrayList } from 'uint8arraylist'
89

10+
const PROTOCOL = '/webrtc'
11+
912
export interface DataChannelMuxerFactoryInit {
1013
/**
1114
* WebRTC Peer Connection
@@ -21,50 +24,73 @@ export interface DataChannelMuxerFactoryInit {
2124
* Data channel options
2225
*/
2326
dataChannelOptions?: Partial<DataChannelOpts>
27+
28+
/**
29+
* The protocol to use
30+
*/
31+
protocol?: string
2432
}
2533

2634
export class DataChannelMuxerFactory implements StreamMuxerFactory {
35+
public readonly protocol: string
36+
2737
/**
2838
* WebRTC Peer Connection
2939
*/
30-
private streamBuffer: WebRTCStream[] = []
40+
private readonly peerConnection: RTCPeerConnection
41+
private streamBuffer: Stream[] = []
42+
private readonly metrics?: CounterGroup
43+
private readonly dataChannelOptions?: Partial<DataChannelOpts>
44+
45+
constructor (init: DataChannelMuxerFactoryInit) {
46+
this.peerConnection = init.peerConnection
47+
this.metrics = init.metrics
48+
this.protocol = init.protocol ?? PROTOCOL
49+
this.dataChannelOptions = init.dataChannelOptions
3150

32-
constructor (readonly init: DataChannelMuxerFactoryInit, readonly protocol = '/webrtc') {
3351
// store any datachannels opened before upgrade has been completed
34-
this.init.peerConnection.ondatachannel = ({ channel }) => {
35-
const stream = new WebRTCStream({
52+
this.peerConnection.ondatachannel = ({ channel }) => {
53+
const stream = createStream({
3654
channel,
37-
stat: {
38-
direction: 'inbound',
39-
timeline: { open: 0 }
40-
},
55+
direction: 'inbound',
4156
dataChannelOptions: init.dataChannelOptions,
42-
closeCb: (_stream) => {
43-
this.streamBuffer = this.streamBuffer.filter(s => !_stream.eq(s))
57+
onEnd: () => {
58+
this.streamBuffer = this.streamBuffer.filter(s => s.id !== stream.id)
4459
}
4560
})
4661
this.streamBuffer.push(stream)
4762
}
4863
}
4964

50-
createStreamMuxer (init?: StreamMuxerInit | undefined): StreamMuxer {
51-
return new DataChannelMuxer(this.init, this.streamBuffer, this.protocol, init)
65+
createStreamMuxer (init?: StreamMuxerInit): StreamMuxer {
66+
return new DataChannelMuxer({
67+
...init,
68+
peerConnection: this.peerConnection,
69+
dataChannelOptions: this.dataChannelOptions,
70+
metrics: this.metrics,
71+
streams: this.streamBuffer,
72+
protocol: this.protocol
73+
})
5274
}
5375
}
5476

77+
export interface DataChannelMuxerInit extends DataChannelMuxerFactoryInit, StreamMuxerInit {
78+
streams: Stream[]
79+
}
80+
5581
/**
5682
* A libp2p data channel stream muxer
5783
*/
5884
export class DataChannelMuxer implements StreamMuxer {
5985
/**
6086
* Array of streams in the data channel
6187
*/
62-
streams: Stream[] = []
88+
public streams: Stream[]
89+
public protocol: string
6390

64-
/**
65-
* Initialized stream muxer
66-
*/
67-
init?: StreamMuxerInit
91+
private readonly peerConnection: RTCPeerConnection
92+
private readonly dataChannelOptions?: DataChannelOpts
93+
private readonly metrics?: CounterGroup
6894

6995
/**
7096
* Close or abort all tracked streams and stop the muxer
@@ -81,45 +107,37 @@ export class DataChannelMuxer implements StreamMuxer {
81107
*/
82108
sink: Sink<Source<Uint8Array | Uint8ArrayList>, Promise<void>> = nopSink
83109

84-
constructor (readonly dataChannelMuxer: DataChannelMuxerFactoryInit, streams: Stream[], readonly protocol: string = '/webrtc', init?: StreamMuxerInit) {
85-
/**
86-
* Initialized stream muxer
87-
*/
88-
this.init = init
110+
constructor (readonly init: DataChannelMuxerInit) {
111+
this.streams = init.streams
112+
this.peerConnection = init.peerConnection
113+
this.protocol = init.protocol ?? PROTOCOL
114+
this.metrics = init.metrics
89115

90116
/**
91117
* Fired when a data channel has been added to the connection has been
92118
* added by the remote peer.
93119
*
94120
* {@link https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/datachannel_event}
95121
*/
96-
this.dataChannelMuxer.peerConnection.ondatachannel = ({ channel }) => {
97-
const stream = new WebRTCStream({
122+
this.peerConnection.ondatachannel = ({ channel }) => {
123+
const stream = createStream({
98124
channel,
99-
stat: {
100-
direction: 'inbound',
101-
timeline: {
102-
open: 0
103-
}
104-
},
105-
dataChannelOptions: dataChannelMuxer.dataChannelOptions,
106-
closeCb: this.wrapStreamEnd(init?.onIncomingStream)
125+
direction: 'inbound',
126+
dataChannelOptions: this.dataChannelOptions,
127+
onEnd: () => {
128+
this.streams = this.streams.filter(s => s.id !== stream.id)
129+
this.metrics?.increment({ stream_end: true })
130+
init?.onStreamEnd?.(stream)
131+
}
107132
})
108133

109134
this.streams.push(stream)
110135
if ((init?.onIncomingStream) != null) {
111-
this.dataChannelMuxer.metrics?.increment({ incoming_stream: true })
136+
this.metrics?.increment({ incoming_stream: true })
112137
init.onIncomingStream(stream)
113138
}
114139
}
115140

116-
// wrap open streams with the onStreamEnd callback
117-
this.streams = streams
118-
.filter(stream => stream.stat.timeline.close == null)
119-
.map(stream => {
120-
(stream as WebRTCStream).closeCb = this.wrapStreamEnd(init?.onStreamEnd)
121-
return stream
122-
})
123141
const onIncomingStream = init?.onIncomingStream
124142
if (onIncomingStream != null) {
125143
this.streams.forEach(s => { onIncomingStream(s) })
@@ -128,35 +146,20 @@ export class DataChannelMuxer implements StreamMuxer {
128146

129147
newStream (): Stream {
130148
// The spec says the label SHOULD be an empty string: https://github.com/libp2p/specs/blob/master/webrtc/README.md#rtcdatachannel-label
131-
const channel = this.dataChannelMuxer.peerConnection.createDataChannel('')
132-
const closeCb = (stream: Stream): void => {
133-
this.dataChannelMuxer.metrics?.increment({ stream_end: true })
134-
this.init?.onStreamEnd?.(stream)
135-
}
136-
const stream = new WebRTCStream({
149+
const channel = this.peerConnection.createDataChannel('')
150+
const stream = createStream({
137151
channel,
138-
stat: {
139-
direction: 'outbound',
140-
timeline: {
141-
open: 0
142-
}
143-
},
144-
dataChannelOptions: this.dataChannelMuxer.dataChannelOptions,
145-
closeCb: this.wrapStreamEnd(closeCb)
152+
direction: 'outbound',
153+
dataChannelOptions: this.dataChannelOptions,
154+
onEnd: () => {
155+
this.streams = this.streams.filter(s => s.id !== stream.id)
156+
this.metrics?.increment({ stream_end: true })
157+
this.init?.onStreamEnd?.(stream)
158+
}
146159
})
147160
this.streams.push(stream)
148-
this.dataChannelMuxer.metrics?.increment({ outgoing_stream: true })
161+
this.metrics?.increment({ outgoing_stream: true })
149162

150163
return stream
151164
}
152-
153-
private wrapStreamEnd (onStreamEnd?: (s: Stream) => void): (stream: Stream) => void {
154-
const self = this
155-
return (_stream) => {
156-
self.streams = self.streams.filter(s => !(_stream instanceof WebRTCStream && (_stream).eq(s)))
157-
if (onStreamEnd != null) {
158-
onStreamEnd(_stream)
159-
}
160-
}
161-
}
162165
}

src/private-to-public/transport.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ import { fromString as uint8arrayFromString } from 'uint8arrays/from-string'
99
import { dataChannelError, inappropriateMultiaddr, unimplemented, invalidArgument } from '../error.js'
1010
import { WebRTCMultiaddrConnection } from '../maconn.js'
1111
import { DataChannelMuxerFactory } from '../muxer.js'
12-
import { type DataChannelOpts, WebRTCStream } from '../stream.js'
12+
import { createStream } from '../stream.js'
1313
import { isFirefox } from '../util.js'
1414
import * as sdp from './sdp.js'
1515
import { genUfrag } from './util.js'
1616
import type { WebRTCDialOptions } from './options.js'
17+
import type { DataChannelOpts } from '../stream.js'
1718
import type { Connection } from '@libp2p/interface-connection'
1819
import type { CounterGroup, Metrics } from '@libp2p/interface-metrics'
1920
import type { PeerId } from '@libp2p/interface-peer-id'
@@ -190,7 +191,7 @@ export class WebRTCDirectTransport implements Transport {
190191
// we pass in undefined for these parameters.
191192
const noise = Noise({ prologueBytes: fingerprintsPrologue })()
192193

193-
const wrappedChannel = new WebRTCStream({ channel: handshakeDataChannel, stat: { direction: 'inbound', timeline: { open: 1 } }, dataChannelOptions: this.init.dataChannel })
194+
const wrappedChannel = createStream({ channel: handshakeDataChannel, direction: 'inbound', dataChannelOptions: this.init.dataChannel })
194195
const wrappedDuplex = {
195196
...wrappedChannel,
196197
sink: wrappedChannel.sink.bind(wrappedChannel),

0 commit comments

Comments
 (0)