Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.

Commit 6f09061

Browse files
committed
chore: address review
1 parent 67a5f51 commit 6f09061

File tree

9 files changed

+53
-150
lines changed

9 files changed

+53
-150
lines changed

package.json

-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@
7575
"protons": "^2.0.0",
7676
"sinon": "^9.0.2",
7777
"streaming-iterables": "^5.0.2",
78-
"typescript": "^4.1.2",
7978
"uint8arrays": "^1.1.0"
8079
},
8180
"devDependencies": {

src/connection/connection.js

+7-16
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,15 @@ const connectionSymbol = Symbol.for('@libp2p/interface-connection/connection')
99

1010
/**
1111
* @typedef {import('../stream-muxer/types').MuxedStream} MuxedStream
12+
* @typedef {import('./status').Status} Status
1213
*/
1314

1415
/**
16+
* @typedef {Object} Timeline
17+
* @property {number} open - connection opening timestamp.
18+
* @property {number} [upgraded] - connection upgraded timestamp.
19+
* @property {number} [close]
20+
*
1521
* @typedef {Object} ConectionStat
1622
* @property {string} direction - connection establishment direction ("inbound" or "outbound").
1723
* @property {object} timeline - connection relevant events timestamp.
@@ -75,7 +81,7 @@ class Connection {
7581
/**
7682
* Connection metadata.
7783
*
78-
* @type {Stat & {status: Status}}
84+
* @type {ConectionStat & {status: Status}}
7985
*/
8086
this._stat = {
8187
...stat,
@@ -223,21 +229,6 @@ class Connection {
223229
}
224230
}
225231

226-
/**
227-
* @typedef {Object} Stat
228-
* @property {string} direction - connection establishment direction ("inbound" or "outbound").
229-
* @property {Timeline} timeline - connection relevant events timestamp.
230-
* @property {string} [multiplexer] - connection multiplexing identifier.
231-
* @property {string} [encryption] - connection encryption method identifier.
232-
*
233-
* @typedef {Object} Timeline
234-
* @property {number} open - connection opening timestamp.
235-
* @property {number} [upgraded] - connection upgraded timestamp.
236-
* @property {number} [close]
237-
*
238-
* @typedef {import('./status').Status} Status
239-
*/
240-
241232
module.exports = Connection
242233

243234
function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getStreams, stat) {

src/crypto/types.ts

+5-21
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,24 @@
1+
import PeerId from 'peer-id'
2+
import { MultiaddrConnection } from '../transport/types'
3+
14
/**
25
* A libp2p crypto module must be compliant to this interface
36
* to ensure all exchanged data between two peers is encrypted.
47
*/
5-
export interface CryptoInterface {
8+
export interface Crypto {
9+
protocol: string;
610
/**
711
* Encrypt outgoing data to the remote party.
8-
*
9-
* @param {PeerId} localPeer - PeerId of the receiving peer
10-
* @param {MultiaddrConnection} connection - streaming iterable duplex that will be encrypted
11-
* @param {PeerId} remotePeer - PeerId of the remote peer. Used to validate the integrity of the remote peer.
12-
* @returns {Promise<SecureOutbound>}
1312
*/
1413
secureOutbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer: PeerId): Promise<SecureOutbound>;
15-
1614
/**
1715
* Decrypt incoming data.
18-
*
19-
* @param {PeerId} localPeer - PeerId of the receiving peer.
20-
* @param {MultiaddrConnection} connection - streaming iterable duplex that will be encryption.
21-
* @param {PeerId} remotePeer - optional PeerId of the initiating peer, if known. This may only exist during transport upgrades.
22-
* @returns {Promise<SecureOutbound>}
2316
*/
2417
secureInbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer?: PeerId): Promise<SecureOutbound>;
2518
}
2619

27-
export declare class Crypto implements CryptoInterface {
28-
protocol: string;
29-
secureOutbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer: PeerId): Promise<SecureOutbound>;
30-
secureInbound(localPeer: PeerId, connection: MultiaddrConnection, remotePeer?: PeerId): Promise<SecureOutbound>;
31-
}
32-
3320
export type SecureOutbound = {
3421
conn: MultiaddrConnection;
3522
remoteEarlyData: Buffer;
3623
remotePeer: PeerId;
3724
}
38-
39-
type PeerId = import('peer-id');
40-
type MultiaddrConnection = import('../transport/types').MultiaddrConnection

src/pubsub/index.js

+10-10
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ class PubsubBaseProtocol extends EventEmitter {
216216
* @private
217217
* @param {Object} props
218218
* @param {string} props.protocol
219-
* @param {DuplexIterableStream} props.stream
219+
* @param {MuxedStream} props.stream
220220
* @param {Connection} props.connection - connection
221221
*/
222222
_onIncomingStream ({ protocol, stream, connection }) {
@@ -225,8 +225,7 @@ class PubsubBaseProtocol extends EventEmitter {
225225
const peer = this._addPeer(peerId, protocol)
226226
peer.attachInboundStream(stream)
227227

228-
// @ts-ignore - peer.inboundStream maybe null
229-
this._processMessages(idB58Str, peer.inboundStream, peer)
228+
peer.inboundStream && this._processMessages(idB58Str, peer.inboundStream, peer)
230229
}
231230

232231
/**
@@ -243,7 +242,6 @@ class PubsubBaseProtocol extends EventEmitter {
243242
try {
244243
const { stream, protocol } = await conn.newStream(this.multicodecs)
245244
const peer = this._addPeer(peerId, protocol)
246-
// @ts-ignore MuxedStream is not DuplexIterableStream
247245
await peer.attachOutboundStream(stream)
248246
} catch (err) {
249247
this.log.err(err)
@@ -333,7 +331,7 @@ class PubsubBaseProtocol extends EventEmitter {
333331
* Responsible for processing each RPC message received by other peers.
334332
*
335333
* @param {string} idB58Str - peer id string in base58
336-
* @param {DuplexIterableStream} stream - inbound stream
334+
* @param {MuxedStream} stream - inbound stream
337335
* @param {PeerStreams} peerStreams - PubSub peer
338336
* @returns {Promise<void>}
339337
*/
@@ -342,8 +340,8 @@ class PubsubBaseProtocol extends EventEmitter {
342340
await pipe(
343341
stream,
344342
async (source) => {
345-
// @ts-ignore - DuplexIterableStream isn't defined as iterable
346343
for await (const data of source) {
344+
// @ts-ignore data slice from BufferList
347345
const rpcBytes = data instanceof Uint8Array ? data : data.slice()
348346
const rpcMsg = this._decodeRpc(rpcBytes)
349347

@@ -732,15 +730,17 @@ class PubsubBaseProtocol extends EventEmitter {
732730

733731
/**
734732
* @typedef {any} Libp2p
735-
* @typedef {import('./peer-streams').DuplexIterableStream} DuplexIterableStream
733+
* @typedef {object} MuxedStream
734+
* @type import('../stream-muxer/types').MuxedStream
736735
* @typedef {import('../connection/connection')} Connection
737736
* @typedef {import('./message').RPC} RPC
738737
* @typedef {import('./message').SubOpts} RPCSubOpts
739738
* @typedef {import('./message').Message} RPCMessage
740739
* @typedef {import('./signature-policy').SignaturePolicyType} SignaturePolicyType
741740
*/
742741

742+
PubsubBaseProtocol.message = message
743+
PubsubBaseProtocol.utils = utils
744+
PubsubBaseProtocol.SignaturePolicy = SignaturePolicy
745+
743746
module.exports = PubsubBaseProtocol
744-
module.exports.message = message
745-
module.exports.utils = utils
746-
module.exports.SignaturePolicy = SignaturePolicy

src/pubsub/message/sign.js

+6-5
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,12 @@ async function verifySignature (message) {
4040
const baseMessage = { ...message }
4141
delete baseMessage.signature
4242
delete baseMessage.key
43-
// @ts-ignore - from is optional
44-
baseMessage.from = PeerId.createFromCID(baseMessage.from).toBytes()
43+
4544
const bytes = uint8ArrayConcat([
4645
SignPrefix,
47-
Message.encode(baseMessage)
46+
Message.encode(Object.assign(baseMessage, {
47+
from: baseMessage.from && PeerId.createFromCID(baseMessage.from).toBytes()
48+
}))
4849
])
4950

5051
// Get the public key
@@ -64,14 +65,14 @@ async function verifySignature (message) {
6465
*/
6566
async function messagePublicKey (message) {
6667
// should be available in the from property of the message (peer id)
67-
// @ts-ignore - from is optional
68+
// @ts-ignore - from type changed
6869
const from = PeerId.createFromCID(message.from)
6970

7071
if (message.key) {
7172
const keyPeerId = await PeerId.createFromPubKey(message.key)
7273

7374
// the key belongs to the sender, return the key
74-
if (keyPeerId.isEqual(from)) return keyPeerId.pubKey
75+
if (keyPeerId.equals(from)) return keyPeerId.pubKey
7576
// We couldn't validate pubkey is from the originator, error
7677
throw new Error('Public Key does not match the originator')
7778
} else if (from.pubKey) {

src/pubsub/peer-streams.js

+17-22
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ log.error = debug('libp2p-pubsub:peer-streams:error')
2020
* @param {Uint8Array} source
2121
* @returns {Promise<Uint8Array>}
2222
*
23-
* @typedef {object} DuplexIterableStream
24-
* @property {Sink} sink
25-
* @property {AsyncIterator<Uint8Array>} source
23+
* @typedef {object} MuxedStream
24+
* @type import('../stream-muxer/types').MuxedStream
2625
*
2726
* @typedef PeerId
2827
* @type import('peer-id')
28+
*
29+
* @typedef PushableStream
30+
* @type import('it-pushable').Pushable<Uint8Array>
2931
*/
3032

3133
/**
@@ -54,33 +56,33 @@ class PeerStreams extends EventEmitter {
5456
* The raw outbound stream, as retrieved from conn.newStream
5557
*
5658
* @private
57-
* @type {null|DuplexIterableStream}
59+
* @type {null|MuxedStream}
5860
*/
5961
this._rawOutboundStream = null
6062
/**
6163
* The raw inbound stream, as retrieved from the callback from libp2p.handle
6264
*
6365
* @private
64-
* @type {null|DuplexIterableStream}
66+
* @type {null|MuxedStream}
6567
*/
6668
this._rawInboundStream = null
6769
/**
6870
* An AbortController for controlled shutdown of the inbound stream
6971
*
7072
* @private
71-
* @type {null|AbortController}
73+
* @type {AbortController}
7274
*/
73-
this._inboundAbortController = null
75+
this._inboundAbortController = new AbortController()
7476
/**
7577
* Write stream -- its preferable to use the write method
7678
*
77-
* @type {null|import('it-pushable').Pushable<Uint8Array>}
79+
* @type {null|PushableStream}
7880
*/
7981
this.outboundStream = null
8082
/**
8183
* Read stream
8284
*
83-
* @type {null|DuplexIterableStream}
85+
* @type {null|MuxedStream}
8486
*/
8587
this.inboundStream = null
8688
}
@@ -123,23 +125,21 @@ class PeerStreams extends EventEmitter {
123125
/**
124126
* Attach a raw inbound stream and setup a read stream
125127
*
126-
* @param {DuplexIterableStream} stream
128+
* @param {MuxedStream} stream
127129
* @returns {void}
128130
*/
129131
attachInboundStream (stream) {
130132
// Create and attach a new inbound stream
131133
// The inbound stream is:
132134
// - abortable, set to only return on abort, rather than throw
133135
// - transformed with length-prefix transform
134-
this._inboundAbortController = new AbortController()
135136
this._rawInboundStream = stream
136-
// @ts-ignore - abortable returns AsyncIterable and not a DuplexIterableStream
137+
// @ts-ignore - abortable returns AsyncIterable and not a MuxedStream
137138
this.inboundStream = abortable(
138139
pipe(
139140
this._rawInboundStream,
140141
lp.decode()
141142
),
142-
// @ts-ignore - possibly null
143143
this._inboundAbortController.signal,
144144
{ returnOnAbort: true }
145145
)
@@ -150,30 +150,26 @@ class PeerStreams extends EventEmitter {
150150
/**
151151
* Attach a raw outbound stream and setup a write stream
152152
*
153-
* @param {DuplexIterableStream} stream
153+
* @param {MuxedStream} stream
154154
* @returns {Promise<void>}
155155
*/
156156
async attachOutboundStream (stream) {
157157
// If an outbound stream already exists,
158158
// gently close it
159159
const _prevStream = this.outboundStream
160-
if (_prevStream) {
160+
if (this.outboundStream) {
161161
// End the stream without emitting a close event
162-
// @ts-ignore - outboundStream may be null
163-
await this.outboundStream.end(false)
162+
await this.outboundStream.end()
164163
}
165164

166165
this._rawOutboundStream = stream
167166
this.outboundStream = pushable({
168167
onEnd: (shouldEmit) => {
169168
// close writable side of the stream
170-
// @ts-ignore - DuplexIterableStream does not define reset
171169
this._rawOutboundStream && this._rawOutboundStream.reset && this._rawOutboundStream.reset()
172170
this._rawOutboundStream = null
173171
this.outboundStream = null
174-
// @ts-ignore - shouldEmit is `Error | undefined` so condition is
175-
// always false
176-
if (shouldEmit !== false) {
172+
if (shouldEmit) {
177173
this.emit('close')
178174
}
179175
}
@@ -205,7 +201,6 @@ class PeerStreams extends EventEmitter {
205201
}
206202
// End the inbound stream
207203
if (this.inboundStream) {
208-
// @ts-ignore - possibly null
209204
this._inboundAbortController.abort()
210205
}
211206

src/stream-muxer/tests/spawner.js

+1-3
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ const { expect } = require('chai')
44
const pair = require('it-pair/duplex')
55
const { pipe } = require('it-pipe')
66

7-
/** @type {typeof import('p-limit').default} */
8-
// @ts-ignore - wrong type defs
9-
const pLimit = require('p-limit')
7+
const pLimit = require('p-limit').default
108
const { collect, tap, consume } = require('streaming-iterables')
119

1210
module.exports = async (Muxer, nStreams, nMsg, limit) => {

src/stream-muxer/types.ts

+3-16
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,26 @@
11
/**
22
* A libp2p stream muxer
33
*/
4-
export interface StreamMuxerInterface {
4+
export interface Muxer {
5+
multicodec: string;
56
readonly streams: Array<MuxedStream>;
67
/**
78
* Initiate a new stream with the given name. If no name is
89
* provided, the id of th stream will be used.
9-
*
10-
* @param {string} [name] - If name is not a string it will be cast to one
11-
* @returns {Stream}
1210
*/
1311
newStream (name?: string): MuxedStream;
1412

1513
/**
1614
* A function called when receiving a new stream from the remote.
17-
*
18-
* @param {MuxedStream} stream
1915
*/
2016
onStream (stream: MuxedStream): void;
2117

2218
/**
2319
* A function called when a stream ends.
24-
*
25-
* @param {MuxedStream} stream
2620
*/
2721
onStreamEnd (stream: MuxedStream): void;
2822
}
2923

30-
export declare class Muxer implements StreamMuxerInterface {
31-
multicodec: string;
32-
readonly streams: Array<MuxedStream>;
33-
newStream (name?: string): MuxedStream;
34-
onStream(stream: MuxedStream): void;
35-
onStreamEnd(stream: MuxedStream): void;
36-
}
37-
3824
export type MuxedTimeline = {
3925
open: number;
4026
close?: number;
@@ -48,6 +34,7 @@ export type MuxedStream = {
4834
source: () => AsyncIterable<Uint8Array>;
4935
timeline: MuxedTimeline;
5036
id: string;
37+
[Symbol.asyncIterator](): AsyncIterator<Uint8Array>;
5138
}
5239

5340
type Sink = (source: Uint8Array) => Promise<Uint8Array>;

0 commit comments

Comments
 (0)