Skip to content
This repository was archived by the owner on Jun 27, 2023. It is now read-only.

Push subscriptions on connect; subscriber example implementation #7

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions examples/pub-sub-1-topic/subscriber.js
Original file line number Diff line number Diff line change
@@ -1 +1,57 @@
'use strict'

const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const libp2pIPFS = require('libp2p-ipfs')
const series = require('run-series')

const PSG = require('../../src')

let nodeSubscriber
let psSubscriber

function bootNode (next) {
const idSubscriber = PeerId.create({ bits: 2048 })
const peerSubscriber = new PeerInfo(idSubscriber)
peerSubscriber.multiaddr.add(multiaddr('/ip4/0.0.0.0/tcp/10333'))
nodeSubscriber = new libp2pIPFS.Node(peerSubscriber)
nodeSubscriber.start((err) => {
if (err) {
throw err
}

console.log('Subscriber listening on:')

peerSubscriber.multiaddrs.forEach((ma) => {
console.log(ma.toString() + '/ipfs/' + idSubscriber.toB58String())
})

next(err)
})
}

function setUpPS (next) {
console.log('attaching pubsub')
psSubscriber = new PSG(nodeSubscriber)
next()
}

function listen () {
psSubscriber.subscribe('interop')

psSubscriber.on('interop', (data) => {
console.log(data.toString('utf8'))
})

const privateKey = 'CAASpgkwggSiAgEAAoIBAQC2SKo/HMFZeBml1AF3XijzrxrfQXdJzjePBZAbdxqKR1Mc6juRHXij6HXYPjlAk01BhF1S3Ll4Lwi0cAHhggf457sMg55UWyeGKeUv0ucgvCpBwlR5cQ020i0MgzjPWOLWq1rtvSbNcAi2ZEVn6+Q2EcHo3wUvWRtLeKz+DZSZfw2PEDC+DGPJPl7f8g7zl56YymmmzH9liZLNrzg/qidokUv5u1pdGrcpLuPNeTODk0cqKB+OUbuKj9GShYECCEjaybJDl9276oalL9ghBtSeEv20kugatTvYy590wFlJkkvyl+nPxIH0EEYMKK9XRWlu9XYnoSfboiwcv8M3SlsjAgMBAAECggEAZtju/bcKvKFPz0mkHiaJcpycy9STKphorpCT83srBVQi59CdFU6Mj+aL/xt0kCPMVigJw8P3/YCEJ9J+rS8BsoWE+xWUEsJvtXoT7vzPHaAtM3ci1HZd302Mz1+GgS8Epdx+7F5p80XAFLDUnELzOzKftvWGZmWfSeDnslwVONkL/1VAzwKy7Ce6hk4SxRE7l2NE2OklSHOzCGU1f78ZzVYKSnS5Ag9YrGjOAmTOXDbKNKN/qIorAQ1bovzGoCwx3iGIatQKFOxyVCyO1PsJYT7JO+kZbhBWRRE+L7l+ppPER9bdLFxs1t5CrKc078h+wuUr05S1P1JjXk68pk3+kQKBgQDeK8AR11373Mzib6uzpjGzgNRMzdYNuExWjxyxAzz53NAR7zrPHvXvfIqjDScLJ4NcRO2TddhXAfZoOPVH5k4PJHKLBPKuXZpWlookCAyENY7+Pd55S8r+a+MusrMagYNljb5WbVTgN8cgdpim9lbbIFlpN6SZaVjLQL3J8TWH6wKBgQDSChzItkqWX11CNstJ9zJyUE20I7LrpyBJNgG1gtvz3ZMUQCn3PxxHtQzN9n1P0mSSYs+jBKPuoSyYLt1wwe10/lpgL4rkKWU3/m1Myt0tveJ9WcqHh6tzcAbb/fXpUFT/o4SWDimWkPkuCb+8j//2yiXk0a/T2f36zKMuZvujqQKBgC6B7BAQDG2H2B/ijofp12ejJU36nL98gAZyqOfpLJ+FeMz4TlBDQ+phIMhnHXA5UkdDapQ+zA3SrFk+6yGk9Vw4Hf46B+82SvOrSbmnMa+PYqKYIvUzR4gg34rL/7AhwnbEyD5hXq4dHwMNsIDq+l2elPjwm/U9V0gdAl2+r50HAoGALtsKqMvhv8HucAMBPrLikhXP/8um8mMKFMrzfqZ+otxfHzlhI0L08Bo3jQrb0Z7ByNY6M8epOmbCKADsbWcVre/AAY0ZkuSZK/CaOXNX/AhMKmKJh8qAOPRY02LIJRBCpfS4czEdnfUhYV/TYiFNnKRj57PPYZdTzUsxa/yVTmECgYBr7slQEjb5Onn5mZnGDh+72BxLNdgwBkhO0OCdpdISqk0F0Pxby22DFOKXZEpiyI9XYP1C8wPiJsShGm2yEwBPWXnrrZNWczaVuCbXHrZkWQogBDG3HGXNdU4MAWCyiYlyinIBpPpoAJZSzpGLmWbMWh28+RJS6AQX6KHrK1o2uw=='
const idPublisher = PeerId.createFromPrivKey(privateKey)
const peerPublisher = new PeerInfo(idPublisher)
peerPublisher.multiaddr.add(multiaddr('/ip4/0.0.0.0/tcp/12345/'))
psSubscriber.connect(peerPublisher)
}

series([
bootNode,
setUpPS
], listen)
20 changes: 19 additions & 1 deletion src/dial-floodsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const log = config.log
const multicodec = config.multicodec
const stream = require('stream')
const PassThrough = stream.PassThrough
const pb = require('./message')
const toPull = require('stream-to-pull-stream')
const lp = require('pull-length-prefixed')
const pull = require('pull-stream')
Expand Down Expand Up @@ -51,7 +52,24 @@ module.exports = (libp2pNode, peerSet, subscriptions) => {
)

if (subscriptions.length > 0) {
// TODO send my subscriptions through the new conn
// send my subscriptions through the new conn
const peers = Object
.keys(peerSet)
.map((idB58Str) => peerSet[idB58Str])

peers.forEach((peer) => {
const subopts = subscriptions.map((topic) => {
return {
subscribe: true,
topicCID: topic
}
})
const rpc = pb.rpc.RPC.encode({
subscriptions: subopts
})

peer.stream.write(rpc)
})
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ function PubSubGossip (libp2pNode, dagService) {
})
}

this.connect = (peerInfo) => {
dial(peerInfo)
}

this.getPeerSet = () => {
return peerSet
}
Expand Down