Skip to content

Commit 5cb17fd

Browse files
committed
feat: add support for message signing
BREAKING CHANGE: as .publish should now sign messages (via _buildMessage) it now requires a callback since signing is async. This also adds an options param to the pubsub constructor to allow for disabling signing. While this change shouldnt break things upstream, implementations need to be sure to call _buildMessage for each message they will publish.
1 parent dda1894 commit 5cb17fd

File tree

8 files changed

+170
-22
lines changed

8 files changed

+170
-22
lines changed

src/index.js

+30-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const errcode = require('err-code')
1010

1111
const Peer = require('./peer')
1212
const message = require('./message')
13+
const { signMessage } = require('./message/sign')
1314
const utils = require('./utils')
1415

1516
const nextTick = require('async/nextTick')
@@ -22,17 +23,28 @@ class PubsubBaseProtocol extends EventEmitter {
2223
* @param {String} debugName
2324
* @param {String} multicodec
2425
* @param {Object} libp2p libp2p implementation
26+
* @param {Object} options
27+
* @param {boolean} options.signMessages if messages should be signed, defaults to true
2528
* @constructor
2629
*/
27-
constructor (debugName, multicodec, libp2p) {
30+
constructor (debugName, multicodec, libp2p, options) {
2831
super()
2932

33+
options = {
34+
signMessages: true,
35+
...options
36+
}
37+
3038
this.log = debug(debugName)
3139
this.log.err = debug(`${debugName}:error`)
3240
this.multicodec = multicodec
3341
this.libp2p = libp2p
3442
this.started = false
3543

44+
if (options.signMessages) {
45+
this.peerId = this.libp2p.peerInfo.id
46+
}
47+
3648
/**
3749
* Map of topics to which peers are subscribed to
3850
*
@@ -225,16 +237,32 @@ class PubsubBaseProtocol extends EventEmitter {
225237
this._removePeer(peer)
226238
}
227239

240+
/**
241+
* Normalizes the message and signs it, if signing is enabled
242+
*
243+
* @param {Message} message
244+
* @param {function(Error, Message)} callback
245+
*/
246+
_buildMessage (message, callback) {
247+
const msg = utils.normalizeOutRpcMessage(message)
248+
if (this.peerId) {
249+
signMessage(this.peerId, msg, callback)
250+
} else {
251+
nextTick(callback, null, msg)
252+
}
253+
}
254+
228255
/**
229256
* Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation.
230257
* For example, a Floodsub implementation might simply publish each message to each topic for every peer
231258
* @abstract
232259
* @param {Array<string>|string} topics
233260
* @param {Array<any>|any} messages
261+
* @param {function(Error)} callback
234262
* @returns {undefined}
235263
*
236264
*/
237-
publish (topics, messages) {
265+
publish (topics, messages, callback) {
238266
throw errcode('publish must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED')
239267
}
240268

src/message/index.js

+4
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@
33
const protons = require('protons')
44

55
const rpcProto = protons(require('./rpc.proto.js'))
6+
const RPC = rpcProto.RPC
67
const topicDescriptorProto = protons(require('./topic-descriptor.proto.js'))
78

89
exports = module.exports
910
exports.rpc = rpcProto
1011
exports.td = topicDescriptorProto
12+
exports.RPC = RPC
13+
exports.Message = RPC.Message
14+
exports.SubOpts = RPC.SubOpts

src/message/rpc.proto.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ message RPC {
1313
optional bytes from = 1;
1414
optional bytes data = 2;
1515
optional bytes seqno = 3;
16-
repeated string topicIDs = 4;
16+
repeated string topicIDs = 4;
17+
optional bytes signature = 5;
18+
optional bytes key = 6;
1719
}
1820
}`

src/message/sign.js

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
'use strict'
2+
3+
const { Message } = require('./index')
4+
const SignPrefix = Buffer.from('libp2p-pubsub:')
5+
6+
module.exports.SignPrefix = SignPrefix
7+
8+
/**
9+
* Signs the provided message with the given `peerId`
10+
*
11+
* @param {PeerId} peerId
12+
* @param {Message} message
13+
* @param {function(Error, Message)} callback
14+
* @returns {void}
15+
*/
16+
module.exports.signMessage = function (peerId, message, callback) {
17+
// Get the message in bytes, and prepend with the pubsub prefix
18+
const bytes = Buffer.concat([
19+
SignPrefix,
20+
Message.encode(message)
21+
])
22+
23+
// Sign the bytes with the private key
24+
peerId.privKey.sign(bytes, (err, signature) => {
25+
if (err) return callback(err)
26+
27+
callback(null, {
28+
...message,
29+
signature: signature,
30+
key: peerId.pubKey.bytes
31+
})
32+
})
33+
}

src/peer.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const pull = require('pull-stream')
66
const setImmediate = require('async/setImmediate')
77
const EventEmitter = require('events')
88

9-
const rpc = require('./message').rpc.RPC
9+
const { RPC } = require('./message')
1010

1111
/**
1212
* The known state of a connected peer.
@@ -109,7 +109,7 @@ class Peer extends EventEmitter {
109109
})
110110
})
111111

112-
this.write(rpc.encode({
112+
this.write(RPC.encode({
113113
subscriptions: subs
114114
}))
115115
}
@@ -139,7 +139,7 @@ class Peer extends EventEmitter {
139139
* @returns {undefined}
140140
*/
141141
sendMessages (msgs) {
142-
this.write(rpc.encode({
142+
this.write(RPC.encode({
143143
msgs: msgs
144144
}))
145145
}

src/utils.js

+9-7
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,17 @@ exports.normalizeInRpcMessages = (messages) => {
8181
})
8282
}
8383

84+
exports.normalizeOutRpcMessage = (message) => {
85+
const m = Object.assign({}, message)
86+
if (typeof message.from === 'string' || message.from instanceof String) {
87+
m.from = bs58.decode(message.from)
88+
}
89+
return m
90+
}
91+
8492
exports.normalizeOutRpcMessages = (messages) => {
8593
if (!messages) {
8694
return messages
8795
}
88-
return messages.map((msg) => {
89-
const m = Object.assign({}, msg)
90-
if (typeof msg.from === 'string' || msg.from instanceof String) {
91-
m.from = bs58.decode(msg.from)
92-
}
93-
return m
94-
})
96+
return messages.map(exports.normalizeOutRpcMessage)
9597
}

test/pubsub.js

+35-9
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ const expect = chai.expect
88
const series = require('async/series')
99
const parallel = require('async/parallel')
1010

11+
const { Message } = require('../src/message')
12+
const { SignPrefix } = require('../src/message/sign')
1113
const PubsubBaseProtocol = require('../src')
14+
const { randomSeqno, normalizeOutRpcMessage } = require('../src/utils')
1215
const utils = require('./utils')
1316
const createNode = utils.createNode
1417

@@ -55,14 +58,7 @@ describe('pubsub base protocol', () => {
5558
})
5659
})
5760

58-
after((done) => {
59-
parallel([
60-
(cb) => nodeA.stop(cb),
61-
(cb) => nodeB.stop(cb)
62-
], done)
63-
})
64-
65-
it('mount the pubsub protocol', (done) => {
61+
before('mount the pubsub protocol', (done) => {
6662
psA = new PubsubImplementation(nodeA)
6763
psB = new PubsubImplementation(nodeB)
6864

@@ -73,13 +69,20 @@ describe('pubsub base protocol', () => {
7369
}, 50)
7470
})
7571

76-
it('start both Pubsub', (done) => {
72+
before('start both Pubsub', (done) => {
7773
parallel([
7874
(cb) => psA.start(cb),
7975
(cb) => psB.start(cb)
8076
], done)
8177
})
8278

79+
after((done) => {
80+
parallel([
81+
(cb) => nodeA.stop(cb),
82+
(cb) => nodeB.stop(cb)
83+
], done)
84+
})
85+
8386
it('Dial from nodeA to nodeB', (done) => {
8487
series([
8588
(cb) => nodeA.dial(nodeB.peerInfo, cb),
@@ -90,6 +93,29 @@ describe('pubsub base protocol', () => {
9093
}, 1000)
9194
], done)
9295
})
96+
97+
it('_buildMessage normalizes and signs messages', (done) => {
98+
const message = {
99+
from: 'QmABC',
100+
data: 'hello',
101+
seqno: randomSeqno(),
102+
topicIDs: ['test-topic']
103+
}
104+
105+
psA._buildMessage(message, (err, signedMessage) => {
106+
expect(err).to.not.exist()
107+
108+
const bytesToSign = Buffer.concat([
109+
SignPrefix,
110+
Message.encode(normalizeOutRpcMessage(message))
111+
])
112+
113+
psA.peerId.pubKey.verify(bytesToSign, signedMessage.signature, (err, verified) => {
114+
expect(verified).to.eql(true)
115+
done(err)
116+
})
117+
})
118+
})
93119
})
94120

95121
describe('dial the pubsub protocol on mount', () => {

test/sign.spec.js

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/* eslint-env mocha */
2+
/* eslint max-nested-callbacks: ["error", 5] */
3+
'use strict'
4+
5+
const chai = require('chai')
6+
chai.use(require('dirty-chai'))
7+
const expect = chai.expect
8+
9+
const { Message } = require('../src/message')
10+
const { signMessage, SignPrefix } = require('../src/message/sign')
11+
const PeerId = require('peer-id')
12+
const { randomSeqno } = require('../src/utils')
13+
14+
describe('message signing', () => {
15+
let peerId
16+
before((done) => {
17+
peerId = PeerId.create({
18+
bits: 1024
19+
}, (err, id) => {
20+
peerId = id
21+
done(err)
22+
})
23+
})
24+
25+
it('should be able to sign a message', (done) => {
26+
const message = {
27+
from: 'QmABC',
28+
data: 'hello',
29+
seqno: randomSeqno(),
30+
topicIDs: ['test-topic']
31+
}
32+
33+
const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)])
34+
35+
peerId.privKey.sign(bytesToSign, (err, expectedSignature) => {
36+
if (err) return done(err)
37+
38+
signMessage(peerId, message, (err, signedMessage) => {
39+
if (err) return done(err)
40+
41+
// Check the signature and public key
42+
expect(signedMessage.signature).to.eql(expectedSignature)
43+
expect(signedMessage.key).to.eql(peerId.pubKey.bytes)
44+
45+
// Verify the signature
46+
peerId.pubKey.verify(bytesToSign, signedMessage.signature, (err, verified) => {
47+
expect(verified).to.eql(true)
48+
done(err)
49+
})
50+
})
51+
})
52+
})
53+
})

0 commit comments

Comments
 (0)