From d1a13f2b8913747b7b007f80cab24f56d1dded37 Mon Sep 17 00:00:00 2001 From: Yahya Date: Wed, 27 Sep 2017 20:10:16 +0200 Subject: [PATCH 01/24] feature: ability to override default bitswap max message size License: MIT Signed-off-by: Yahya --- src/decision-engine/index.js | 9 ++++++--- src/index.js | 6 +++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/decision-engine/index.js b/src/decision-engine/index.js index ebd60346..562a2306 100644 --- a/src/decision-engine/index.js +++ b/src/decision-engine/index.js @@ -21,7 +21,7 @@ const logger = require('../utils').logger const MAX_MESSAGE_SIZE = 512 * 1024 class DecisionEngine { - constructor (peerId, blockstore, network) { + constructor (peerId, blockstore, network, options) { this._log = logger(peerId, 'engine') this.blockstore = blockstore this.network = network @@ -33,6 +33,9 @@ class DecisionEngine { // List of tasks to be processed this._tasks = [] + options = options || {} + this._maxMessageSize = options.maxMessageSize || MAX_MESSAGE_SIZE + this._outbox = debounce(this._processTasks.bind(this), 100) } @@ -42,7 +45,7 @@ class DecisionEngine { return acc + b.data.byteLength }, 0) - if (total < MAX_MESSAGE_SIZE) { + if (total < this._maxMessageSize) { return this._sendSafeBlocks(peer, blocks, cb) } @@ -55,7 +58,7 @@ class DecisionEngine { batch.push(b) size += b.data.byteLength - if (size >= MAX_MESSAGE_SIZE || + if (size >= this._maxMessageSize || // need to ensure the last remaining items get sent outstanding === 0) { const nextBatch = batch.slice() diff --git a/src/index.js b/src/index.js index f0e59be7..863d0653 100644 --- a/src/index.js +++ b/src/index.js @@ -21,17 +21,17 @@ const logger = require('./utils').logger * @param {Blockstore} blockstore */ class Bitswap { - constructor (libp2p, blockstore) { + constructor (libp2p, blockstore, options) { this._libp2p = libp2p this._log = logger(this.peerInfo.id) - + options = options || {} // the network delivers messages this.network = new Network(libp2p, this) // local database this.blockstore = blockstore - this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network) + this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network, options) // handle message sending this.wm = new WantManager(this.peerInfo.id, this.network) From bd50ee35773c04a250d87b6ad6ec90cc9bd51a11 Mon Sep 17 00:00:00 2001 From: Yahya Date: Thu, 12 Oct 2017 18:30:33 +0200 Subject: [PATCH 02/24] setting a minimum msg size, tests for go-interop and variable msg size nodes License: MIT Signed-off-by: Yahya --- src/decision-engine/index.js | 4 +++ test/bitswap.js | 69 +++++++++++++++++++++++++++++------- test/types/message.spec.js | 2 ++ 3 files changed, 63 insertions(+), 12 deletions(-) diff --git a/src/decision-engine/index.js b/src/decision-engine/index.js index 562a2306..b377450e 100644 --- a/src/decision-engine/index.js +++ b/src/decision-engine/index.js @@ -35,6 +35,10 @@ class DecisionEngine { options = options || {} this._maxMessageSize = options.maxMessageSize || MAX_MESSAGE_SIZE + // override Max size to 1kb bytes by minimum. + if (this._maxMessageSize < 1024) { + this._maxMessageSize = 1024 + } this._outbox = debounce(this._processTasks.bind(this), 100) } diff --git a/test/bitswap.js b/test/bitswap.js index 0ea43d7b..8442a73e 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -18,8 +18,15 @@ const createLibp2pNode = require('./utils/create-libp2p-node') const makeBlock = require('./utils/make-block') const orderedFinish = require('./utils/helpers').orderedFinish +const MAX_MESSAGE_SIZE = 512 * 1024 + // Creates a repo + libp2pNode + Bitswap with or without DHT -function createThing (dht, callback) { +function createThing (dht, msgSize, callback) { + if (!callback) { + callback = msgSize + msgSize = MAX_MESSAGE_SIZE + } + waterfall([ (cb) => createTempRepo(cb), (repo, cb) => { @@ -28,7 +35,7 @@ function createThing (dht, callback) { }, (err, node) => cb(err, repo, node)) }, (repo, libp2pNode, cb) => { - const bitswap = new Bitswap(libp2pNode, repo.blocks) + const bitswap = new Bitswap(libp2pNode, repo.blocks, {maxMessageSize: msgSize}) bitswap.start((err) => cb(err, repo, libp2pNode, bitswap)) } ], (err, repo, libp2pNode, bitswap) => { @@ -49,12 +56,13 @@ describe('bitswap without DHT', function () { before((done) => { parallel([ - (cb) => createThing(false, cb), - (cb) => createThing(false, cb), - (cb) => createThing(false, cb) + (cb) => createThing(false, MAX_MESSAGE_SIZE, cb), + (cb) => createThing(false, MAX_MESSAGE_SIZE, cb), + (cb) => createThing(false, MAX_MESSAGE_SIZE, cb), + (cb) => createThing(false, 32, cb) ], (err, results) => { expect(err).to.not.exist() - expect(results).to.have.length(3) + expect(results).to.have.length(4) nodes = results done() }) @@ -77,6 +85,13 @@ describe('bitswap without DHT', function () { ], done) }) + it('connect 1 -> 3 && 2 -> 3', (done) => { + parallel([ + (cb) => nodes[1].libp2pNode.dial(nodes[3].libp2pNode.peerInfo, cb), + (cb) => nodes[2].libp2pNode.dial(nodes[3].libp2pNode.peerInfo, cb) + ], done) + }) + it('put a block in 2, fail to get it in 0', (done) => { const finish = orderedFinish(2, done) @@ -104,12 +119,13 @@ describe('bitswap with DHT', () => { before((done) => { parallel([ - (cb) => createThing(true, cb), - (cb) => createThing(true, cb), - (cb) => createThing(true, cb) + (cb) => createThing(true, MAX_MESSAGE_SIZE, cb), + (cb) => createThing(true, MAX_MESSAGE_SIZE, cb), + (cb) => createThing(true, MAX_MESSAGE_SIZE, cb), + (cb) => createThing(true, 32, cb) ], (err, results) => { expect(err).to.not.exist() - expect(results).to.have.length(3) + expect(results).to.have.length(4) nodes = results done() }) @@ -125,10 +141,11 @@ describe('bitswap with DHT', () => { }, done) }) - it('connect 0 -> 1 && 1 -> 2', (done) => { + it('connect 0 -> 1 && 1 -> 2 && 2 -> 3', (done) => { parallel([ (cb) => nodes[0].libp2pNode.dial(nodes[1].libp2pNode.peerInfo, cb), - (cb) => nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo, cb) + (cb) => nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo, cb), + (cb) => nodes[2].libp2pNode.dial(nodes[3].libp2pNode.peerInfo, cb) ], done) }) @@ -145,4 +162,32 @@ describe('bitswap with DHT', () => { }) ], done) }) + + it('put a block in 2, get it in 3', (done) => { + waterfall([ + (cb) => makeBlock(cb), + (block, cb) => nodes[2].bitswap.put(block, () => cb(null, block)), + (block, cb) => setTimeout(() => cb(null, block), 400), + (block, cb) => nodes[3].bitswap.get(block.cid, (err, blockRetrieved) => { + expect(err).to.not.exist() + expect(block.data).to.eql(blockRetrieved.data) + expect(block.cid).to.eql(blockRetrieved.cid) + cb() + }) + ], done) + }) + + it('put a block in 3, get it in 2', (done) => { + waterfall([ + (cb) => makeBlock(cb), + (block, cb) => nodes[3].bitswap.put(block, () => cb(null, block)), + (block, cb) => setTimeout(() => cb(null, block), 400), + (block, cb) => nodes[2].bitswap.get(block.cid, (err, blockRetrieved) => { + expect(err).to.not.exist() + expect(block.data).to.eql(blockRetrieved.data) + expect(block.cid).to.eql(blockRetrieved.cid) + cb() + }) + ], done) + }) }) diff --git a/test/types/message.spec.js b/test/types/message.spec.js index 9e155f70..b4c4817b 100644 --- a/test/types/message.spec.js +++ b/test/types/message.spec.js @@ -266,6 +266,7 @@ describe('BitswapMessage', () => { expect(err).to.not.exist() // TODO // check the deserialised message + expect(message.serializeToBitswap110()).to.eql(rawMessageFullWantlist) done() }) }) @@ -275,6 +276,7 @@ describe('BitswapMessage', () => { expect(err).to.not.exist() // TODO // check the deserialised message + expect(message.serializeToBitswap110()).to.eql(rawMessageOneBlock) done() }) }) From 339503d421847dada3a52853a23ff338369233ca Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 24 Oct 2017 19:14:57 +0100 Subject: [PATCH 03/24] chore: update deps --- package.json | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/package.json b/package.json index 96715309..f29eb641 100644 --- a/package.json +++ b/package.json @@ -37,29 +37,29 @@ }, "homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme", "devDependencies": { - "aegir": "^12.0.6", + "aegir": "^12.1.0", "benchmark": "^2.1.4", "chai": "^4.1.2", "dirty-chai": "^2.0.1", "ipfs-repo": "~0.17.0", - "libp2p": "~0.12.3", - "libp2p-kad-dht": "~0.5.0", + "libp2p": "~0.12.4", + "libp2p-kad-dht": "~0.5.1", "libp2p-multiplex": "~0.5.0", "libp2p-secio": "~0.8.1", - "libp2p-tcp": "~0.11.0", + "libp2p-tcp": "~0.11.1", "lodash": "^4.17.4", "multiaddr": "^3.0.1", "ncp": "^2.0.0", - "peer-book": "~0.5.0", - "peer-id": "~0.10.0", + "peer-book": "~0.5.1", + "peer-id": "~0.10.2", "peer-info": "~0.11.0", - "rimraf": "^2.6.1", + "rimraf": "^2.6.2", "safe-buffer": "^5.1.1" }, "dependencies": { "async": "^2.5.0", - "cids": "~0.5.1", - "debug": "^3.0.1", + "cids": "~0.5.2", + "debug": "^3.1.0", "ipfs-block": "~0.6.0", "lodash.debounce": "^4.0.8", "lodash.find": "^4.6.0", @@ -70,12 +70,12 @@ "lodash.sortby": "^4.7.0", "lodash.uniqwith": "^4.5.0", "lodash.values": "^4.3.0", - "multihashing-async": "~0.4.6", + "multihashing-async": "~0.4.7", "protons": "^1.0.0", "pull-defer": "^0.2.2", "pull-length-prefixed": "^1.3.0", "pull-pushable": "^2.1.1", - "pull-stream": "^3.6.0", + "pull-stream": "^3.6.1", "safe-buffer": "^5.1.1", "varint-decoder": "^0.1.1" }, From 751d436fb64bfbadd58e649fa86f6ce8a8da5d6a Mon Sep 17 00:00:00 2001 From: Pedro Teixeira Date: Wed, 8 Nov 2017 16:03:13 +0000 Subject: [PATCH 04/24] fix: add missing multicodec dependency (#155) * added missing multicodec dependency * using ~ for multicodec dependency range because version < 1 --- package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package.json b/package.json index f29eb641..a5090c1a 100644 --- a/package.json +++ b/package.json @@ -70,6 +70,7 @@ "lodash.sortby": "^4.7.0", "lodash.uniqwith": "^4.5.0", "lodash.values": "^4.3.0", + "multicodec": "~0.2.5", "multihashing-async": "~0.4.7", "protons": "^1.0.0", "pull-defer": "^0.2.2", From fa797a0825888dacb3252d743dc03ce2fb6f3d05 Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 8 Nov 2017 16:04:11 +0000 Subject: [PATCH 05/24] chore: update deps --- package.json | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/package.json b/package.json index a5090c1a..fc181dfe 100644 --- a/package.json +++ b/package.json @@ -37,12 +37,12 @@ }, "homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme", "devDependencies": { - "aegir": "^12.1.0", + "aegir": "^12.1.3", "benchmark": "^2.1.4", "chai": "^4.1.2", "dirty-chai": "^2.0.1", - "ipfs-repo": "~0.17.0", - "libp2p": "~0.12.4", + "ipfs-repo": "~0.18.3", + "libp2p": "~0.13.0", "libp2p-kad-dht": "~0.5.1", "libp2p-multiplex": "~0.5.0", "libp2p-secio": "~0.8.1", @@ -57,10 +57,10 @@ "safe-buffer": "^5.1.1" }, "dependencies": { - "async": "^2.5.0", + "async": "^2.6.0", "cids": "~0.5.2", "debug": "^3.1.0", - "ipfs-block": "~0.6.0", + "ipfs-block": "~0.6.1", "lodash.debounce": "^4.0.8", "lodash.find": "^4.6.0", "lodash.groupby": "^4.6.0", From 1a62f496849fdf34bb4a0c728991b642f23e05e5 Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 8 Nov 2017 16:07:49 +0000 Subject: [PATCH 06/24] chore: update contributors From ab69719935f52ec4b54fb63425a09ee26db51e61 Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 8 Nov 2017 16:07:49 +0000 Subject: [PATCH 07/24] chore: release version v0.17.3 --- CHANGELOG.md | 10 ++++++++++ package.json | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1de261a9..4265e177 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ + +## [0.17.3](https://github.com/ipfs/js-ipfs-bitswap/compare/v0.17.2...v0.17.3) (2017-11-08) + + +### Bug Fixes + +* add missing multicodec dependency ([#155](https://github.com/ipfs/js-ipfs-bitswap/issues/155)) ([751d436](https://github.com/ipfs/js-ipfs-bitswap/commit/751d436)) + + + ## [0.17.2](https://github.com/ipfs/js-ipfs-bitswap/compare/v0.17.1...v0.17.2) (2017-09-07) diff --git a/package.json b/package.json index fc181dfe..b3b67aed 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ipfs-bitswap", - "version": "0.17.2", + "version": "0.17.3", "description": "Node.js implementation of the Bitswap data exchange protocol used by IPFS", "main": "src/index.js", "browser": { From a8b1e07b2f3476d3616ba66cb7a50f473f62069a Mon Sep 17 00:00:00 2001 From: Richard Schneider Date: Fri, 10 Nov 2017 23:06:04 +1300 Subject: [PATCH 08/24] feat: windows interop (#154) * test: close IPFSRepo before deleting its files * chore: run on appveyor * chore: upgrade packages * chore: debugging appveyor failure * test: more appveyor debugging * test: use os.tmpdir() not '/tmp' * test: timeout issue * test: fix error reporting * test: Math.Random => uuid Math.Random can create collisions. Using uuid should not create a collision. * test: all tests can timeout, need global timeout specified * test: bitswap-mock multi peer is just too long, skip it * fix package.json * test: ipfs@0.18.3 breaks us. reverting to 0.18.2 * chore: update repo version * chore: update deps * bring back the test --- appveyor.yml | 23 +++++++++++++++++++++++ package.json | 14 +++++++------- test/bitswap-mock-internals.js | 2 +- test/bitswap.js | 4 +++- test/fixtures/repo/version | 2 +- test/utils/create-temp-repo-nodejs.js | 15 +++++++++++---- test/utils/make-block.js | 3 ++- test/wantmanager/index.spec.js | 2 +- 8 files changed, 49 insertions(+), 16 deletions(-) create mode 100644 appveyor.yml diff --git a/appveyor.yml b/appveyor.yml new file mode 100644 index 00000000..de3e3780 --- /dev/null +++ b/appveyor.yml @@ -0,0 +1,23 @@ +environment: + matrix: + - nodejs_version: "6" + - nodejs_version: "8" + +# cache: +# - node_modules + +platform: + - x64 + +install: + - ps: Install-Product node $env:nodejs_version $env:platform + - npm install + +test_script: + - node --version + - npm --version + - npm test + +build: off + +version: "{build}" diff --git a/package.json b/package.json index b3b67aed..34055566 100644 --- a/package.json +++ b/package.json @@ -8,16 +8,16 @@ "./test/utils/create-temp-repo-nodejs.js": "./test/utils/create-temp-repo-browser.js" }, "scripts": { - "test": "aegir test --target node --target browser", - "test:browser": "aegir test --target browser", - "test:node": "aegir test --target node", + "test": "aegir test -t node -t browser", + "test:browser": "aegir test -t browser", + "test:node": "aegir test -t node", "lint": "aegir lint", "release": "aegir release", "release-minor": "aegir release --type minor", "release-major": "aegir release --type major", "bench": "node benchmarks/index", "build": "aegir build", - "coverage": "aegir coverage -u", + "coverage": "aegir coverage --provider codecov", "docs": "aegir docs" }, "repository": { @@ -43,7 +43,7 @@ "dirty-chai": "^2.0.1", "ipfs-repo": "~0.18.3", "libp2p": "~0.13.0", - "libp2p-kad-dht": "~0.5.1", + "libp2p-kad-dht": "~0.6.0", "libp2p-multiplex": "~0.5.0", "libp2p-secio": "~0.8.1", "libp2p-tcp": "~0.11.1", @@ -52,7 +52,7 @@ "ncp": "^2.0.0", "peer-book": "~0.5.1", "peer-id": "~0.10.2", - "peer-info": "~0.11.0", + "peer-info": "~0.11.1", "rimraf": "^2.6.2", "safe-buffer": "^5.1.1" }, @@ -73,7 +73,7 @@ "multicodec": "~0.2.5", "multihashing-async": "~0.4.7", "protons": "^1.0.0", - "pull-defer": "^0.2.2", + "pull-defer": "~0.2.2", "pull-length-prefixed": "^1.3.0", "pull-pushable": "^2.1.1", "pull-stream": "^3.6.1", diff --git a/test/bitswap-mock-internals.js b/test/bitswap-mock-internals.js index 47e03fa6..6cc96a7c 100644 --- a/test/bitswap-mock-internals.js +++ b/test/bitswap-mock-internals.js @@ -110,7 +110,7 @@ describe('bitswap with mocks', () => { }) it('multi peer', function (done) { - this.timeout(40 * 1000) + this.timeout(80 * 1000) const bs = new Bitswap(mockLibp2pNode(), repo.blocks) let others diff --git a/test/bitswap.js b/test/bitswap.js index 0ea43d7b..f3b21ceb 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -99,7 +99,9 @@ describe('bitswap without DHT', function () { }) }) -describe('bitswap with DHT', () => { +describe('bitswap with DHT', function () { + this.timeout(20 * 1000) + let nodes before((done) => { diff --git a/test/fixtures/repo/version b/test/fixtures/repo/version index 7ed6ff82..1e8b3149 100644 --- a/test/fixtures/repo/version +++ b/test/fixtures/repo/version @@ -1 +1 @@ -5 +6 diff --git a/test/utils/create-temp-repo-nodejs.js b/test/utils/create-temp-repo-nodejs.js index de34c326..b656bcf2 100644 --- a/test/utils/create-temp-repo-nodejs.js +++ b/test/utils/create-temp-repo-nodejs.js @@ -1,22 +1,29 @@ 'use strict' const IPFSRepo = require('ipfs-repo') -const path = require('path') +const pathJoin = require('path').join +const os = require('os') const ncp = require('ncp') const rimraf = require('rimraf') +const series = require('async/series') -const baseRepo = path.join(__dirname, '../fixtures/repo') +const baseRepo = pathJoin(__dirname, '../fixtures/repo') function createTempRepo (callback) { const date = Date.now().toString() - const path = `/tmp/bitswap-tests-${date}-${Math.random()}` + const path = pathJoin(os.tmpdir(), `bitswap-tests-${date}-${Math.random()}`) ncp(baseRepo, path, (err) => { if (err) { return callback(err) } const repo = new IPFSRepo(path) - repo.teardown = (callback) => rimraf(path, callback) + repo.teardown = (done) => { + series([ + (cb) => repo.close(cb), + (cb) => rimraf(path, cb) + ], (err) => done(err)) + } repo.open((err) => { if (err) { return callback(err) } diff --git a/test/utils/make-block.js b/test/utils/make-block.js index d125f880..66b69193 100644 --- a/test/utils/make-block.js +++ b/test/utils/make-block.js @@ -4,9 +4,10 @@ const multihashing = require('multihashing-async') const CID = require('cids') const Block = require('ipfs-block') const Buffer = require('safe-buffer').Buffer +const uuid = require('uuid/v4') module.exports = (callback) => { - const data = Buffer.from(`hello world ${Math.random()}`) + const data = Buffer.from(`hello world ${uuid()}`) multihashing(data, 'sha2-256', (err, hash) => { if (err) { return callback(err) } diff --git a/test/wantmanager/index.spec.js b/test/wantmanager/index.spec.js index c40590f9..539f71cc 100644 --- a/test/wantmanager/index.spec.js +++ b/test/wantmanager/index.spec.js @@ -67,7 +67,7 @@ describe('WantManager', () => { expect(m[0]).to.be.eql(calls.connects[i]) if (!m[1].equals(msgs[i])) { return done( - new Error(`expected ${m[1].toString()} to equal ${msgs[1].toString()}`) + new Error(`expected ${m[1].toString()} to equal ${msgs[i].toString()}`) ) } }) From 1eea3636c483ffb050118d740fa55f182127c741 Mon Sep 17 00:00:00 2001 From: David Dias Date: Fri, 10 Nov 2017 10:07:57 +0000 Subject: [PATCH 09/24] chore: update contributors --- package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package.json b/package.json index 34055566..af0baa69 100644 --- a/package.json +++ b/package.json @@ -86,6 +86,7 @@ "Friedel Ziegelmayer ", "Pedro Teixeira ", "Richard Littauer ", + "Richard Schneider ", "Stephen Whitmore ", "dmitriy ryajov ", "greenkeeper[bot] ", From 91aaedcde1fffbf409e72d88b984685f3efc407e Mon Sep 17 00:00:00 2001 From: David Dias Date: Fri, 10 Nov 2017 10:07:57 +0000 Subject: [PATCH 10/24] chore: release version v0.17.4 --- CHANGELOG.md | 10 ++++++++++ package.json | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4265e177..56187d4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ + +## [0.17.4](https://github.com/ipfs/js-ipfs-bitswap/compare/v0.17.3...v0.17.4) (2017-11-10) + + +### Features + +* windows interop ([#154](https://github.com/ipfs/js-ipfs-bitswap/issues/154)) ([a8b1e07](https://github.com/ipfs/js-ipfs-bitswap/commit/a8b1e07)) + + + ## [0.17.3](https://github.com/ipfs/js-ipfs-bitswap/compare/v0.17.2...v0.17.3) (2017-11-08) diff --git a/package.json b/package.json index af0baa69..d815f747 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ipfs-bitswap", - "version": "0.17.3", + "version": "0.17.4", "description": "Node.js implementation of the Bitswap data exchange protocol used by IPFS", "main": "src/index.js", "browser": { From f2c8ea28334ea8ab32509c8cf00d3e12d2aaf264 Mon Sep 17 00:00:00 2001 From: David Dias Date: Thu, 23 Nov 2017 10:56:26 +0000 Subject: [PATCH 11/24] chore: update deps --- package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index d815f747..ebd183c9 100644 --- a/package.json +++ b/package.json @@ -37,12 +37,12 @@ }, "homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme", "devDependencies": { - "aegir": "^12.1.3", + "aegir": "^12.2.0", "benchmark": "^2.1.4", "chai": "^4.1.2", "dirty-chai": "^2.0.1", "ipfs-repo": "~0.18.3", - "libp2p": "~0.13.0", + "libp2p": "~0.13.1", "libp2p-kad-dht": "~0.6.0", "libp2p-multiplex": "~0.5.0", "libp2p-secio": "~0.8.1", From 6886a595d1f657f526a0eeec7377d721f9e277b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=B4=A0=C9=AA=E1=B4=84=E1=B4=9B=E1=B4=8F=CA=80=20=CA=99?= =?UTF-8?q?=E1=B4=8A=E1=B4=87=CA=9F=E1=B4=8B=CA=9C=E1=B4=8F=CA=9F=E1=B4=8D?= Date: Mon, 27 Nov 2017 14:30:58 +0100 Subject: [PATCH 12/24] chore: updating CI files (#157) This commit updates all CI scripts to the latest version --- .travis.yml | 2 +- appveyor.yml | 28 +++++++++++++++++----------- ci/Jenkinsfile | 2 ++ circle.yml | 15 +++++++++++++++ 4 files changed, 35 insertions(+), 12 deletions(-) create mode 100644 ci/Jenkinsfile create mode 100644 circle.yml diff --git a/.travis.yml b/.travis.yml index 584f308f..5102ee5f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,4 @@ +# Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories. sudo: false language: node_js @@ -14,7 +15,6 @@ script: - npm run lint - npm run test - npm run coverage - - make test before_script: - export DISPLAY=:99.0 diff --git a/appveyor.yml b/appveyor.yml index de3e3780..046bf910 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,23 +1,29 @@ +# Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories. +version: "{build}" + environment: matrix: - nodejs_version: "6" - nodejs_version: "8" -# cache: -# - node_modules - -platform: - - x64 +matrix: + fast_finish: true install: - - ps: Install-Product node $env:nodejs_version $env:platform - - npm install + # Install Node.js + - ps: Install-Product node $env:nodejs_version -test_script: + # Upgrade npm + - npm install -g npm + + # Output our current versions for debugging - node --version - npm --version - - npm test -build: off + # Install our package dependencies + - npm install -version: "{build}" +test_script: + - npm run test:node + +build: off diff --git a/ci/Jenkinsfile b/ci/Jenkinsfile new file mode 100644 index 00000000..a7da2e54 --- /dev/null +++ b/ci/Jenkinsfile @@ -0,0 +1,2 @@ +// Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories. +javascript() diff --git a/circle.yml b/circle.yml new file mode 100644 index 00000000..00096937 --- /dev/null +++ b/circle.yml @@ -0,0 +1,15 @@ +# Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories. +machine: + node: + version: stable + +dependencies: + pre: + - google-chrome --version + - curl -L -o google-chrome.deb https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb + - sudo dpkg -i google-chrome.deb || true + - sudo apt-get update + - sudo apt-get install -f + - sudo apt-get install --only-upgrade lsb-base + - sudo dpkg -i google-chrome.deb + - google-chrome --version From 17e15d079a468d99442fc93554cb2a29a5c60bc8 Mon Sep 17 00:00:00 2001 From: Pedro Teixeira Date: Mon, 27 Nov 2017 13:43:27 +0000 Subject: [PATCH 13/24] feat: stats improvements (#158) * bitswap stats: async updater * linter happy * pre-commit hook for linting and tests * stats: support dataReceived * stats: data received * stats: blocks sent and data sent * stats: using bignum * stats: compute throttle with threshold * stats: update timeout is now dynamic based on queue size * stats: moving averages * stats: providesBufferLength * stats: support for providesBufferLength and wantListLength * stats: support for peerCount * stats: enable / disable * increased test timeout --- package.json | 7 + src/decision-engine/index.js | 6 +- src/index.js | 73 +++++++--- src/network.js | 11 +- src/stats.js | 154 +++++++++++++++++++++ src/types/wantlist/index.js | 9 +- src/want-manager/index.js | 5 +- test/bitswap-mock-internals.js | 22 +-- test/bitswap-stats.js | 244 +++++++++++++++++++++++++++++++++ test/node.js | 1 + test/utils/mocks.js | 6 +- 11 files changed, 493 insertions(+), 45 deletions(-) create mode 100644 src/stats.js create mode 100644 test/bitswap-stats.js diff --git a/package.json b/package.json index ebd183c9..d7d670be 100644 --- a/package.json +++ b/package.json @@ -53,11 +53,13 @@ "peer-book": "~0.5.1", "peer-id": "~0.10.2", "peer-info": "~0.11.1", + "pre-commit": "^1.2.2", "rimraf": "^2.6.2", "safe-buffer": "^5.1.1" }, "dependencies": { "async": "^2.6.0", + "big.js": "^5.0.3", "cids": "~0.5.2", "debug": "^3.1.0", "ipfs-block": "~0.6.1", @@ -70,6 +72,7 @@ "lodash.sortby": "^4.7.0", "lodash.uniqwith": "^4.5.0", "lodash.values": "^4.3.0", + "moving-average": "^1.0.0", "multicodec": "~0.2.5", "multihashing-async": "~0.4.7", "protons": "^1.0.0", @@ -80,6 +83,10 @@ "safe-buffer": "^5.1.1", "varint-decoder": "^0.1.1" }, + "pre-commit": [ + "lint", + "test" + ], "contributors": [ "David Dias ", "Dmitriy Ryajov ", diff --git a/src/decision-engine/index.js b/src/decision-engine/index.js index ebd60346..54acdfb2 100644 --- a/src/decision-engine/index.js +++ b/src/decision-engine/index.js @@ -21,10 +21,11 @@ const logger = require('../utils').logger const MAX_MESSAGE_SIZE = 512 * 1024 class DecisionEngine { - constructor (peerId, blockstore, network) { + constructor (peerId, blockstore, network, stats) { this._log = logger(peerId, 'engine') this.blockstore = blockstore this.network = network + this._stats = stats // A list of of ledgers by their partner id this.ledgerMap = new Map() @@ -267,6 +268,9 @@ class DecisionEngine { const l = new Ledger(peerId) this.ledgerMap.set(peerIdStr, l) + if (this._stats) { + this._stats.push('peerCount', 1) + } return l } diff --git a/src/index.js b/src/index.js index f0e59be7..522fbcf6 100644 --- a/src/index.js +++ b/src/index.js @@ -12,6 +12,24 @@ const Network = require('./network') const DecisionEngine = require('./decision-engine') const Notifications = require('./notifications') const logger = require('./utils').logger +const Stats = require('./stats') + +const defaultOptions = { + statsEnabled: false, + statsComputeThrottleTimeout: 1000, + statsComputeThrottleMaxQueueSize: 1000 +} +const statsKeys = [ + 'blocksReceived', + 'dataReceived', + 'dupBlksReceived', + 'dupDataReceived', + 'blocksSent', + 'dataSent', + 'providesBufferLength', + 'wantListLength', + 'peerCount' +] /** * JavaScript implementation of the Bitswap 'data exchange' protocol @@ -21,24 +39,29 @@ const logger = require('./utils').logger * @param {Blockstore} blockstore */ class Bitswap { - constructor (libp2p, blockstore) { + constructor (libp2p, blockstore, options) { this._libp2p = libp2p this._log = logger(this.peerInfo.id) + this._options = Object.assign({}, defaultOptions, options) + + // stats + this._stats = new Stats(statsKeys, { + enabled: this._options.statsEnabled, + computeThrottleTimeout: this._options.statsComputeThrottleTimeout, + computeThrottleMaxQueueSize: this._options.statsComputeThrottleMaxQueueSize + }) + // the network delivers messages - this.network = new Network(libp2p, this) + this.network = new Network(libp2p, this, {}, this._stats) // local database this.blockstore = blockstore - this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network) + this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network, this._stats) // handle message sending - this.wm = new WantManager(this.peerInfo.id, this.network) - - this.blocksRecvd = 0 - this.dupBlocksRecvd = 0 - this.dupDataRecvd = 0 + this.wm = new WantManager(this.peerInfo.id, this.network, this._stats) this.notifications = new Notifications(this.peerInfo.id) } @@ -95,11 +118,12 @@ class Bitswap { } _updateReceiveCounters (block, exists) { - this.blocksRecvd++ + this._stats.push('blocksReceived', 1) + this._stats.push('dataReceived', block.data.length) if (exists) { - this.dupBlocksRecvd++ - this.dupDataRecvd += block.data.length + this._stats.push('dupBlksReceived', 1) + this._stats.push('dupDataReceived', block.data.length) } } @@ -137,6 +161,14 @@ class Bitswap { }) } + enableStats () { + this._stats.enable() + } + + disableStats () { + this._stats.disable() + } + /** * Return the current wantlist for a given `peerId` * @@ -329,25 +361,28 @@ class Bitswap { /** * Get the current list of wants. * - * @returns {Array} + * @returns {Iterator} */ getWantlist () { return this.wm.wantlist.entries() } + /** + * Get the current list of partners. + * + * @returns {Array} + */ + peers () { + return this.engine.peers() + } + /** * Get stats about the bitswap node. * * @returns {Object} */ stat () { - return { - wantlist: this.getWantlist(), - blocksReceived: this.blocksRecvd, - dupBlksReceived: this.dupBlocksRecvd, - dupDataReceived: this.dupDataRecvd, - peers: this.engine.peers() - } + return this._stats } /** diff --git a/src/network.js b/src/network.js index 9ac85590..f3d410dd 100644 --- a/src/network.js +++ b/src/network.js @@ -14,13 +14,14 @@ const BITSWAP100 = '/ipfs/bitswap/1.0.0' const BITSWAP110 = '/ipfs/bitswap/1.1.0' class Network { - constructor (libp2p, bitswap, options) { + constructor (libp2p, bitswap, options, stats) { this._log = logger(libp2p.peerInfo.id, 'network') options = options || {} this.libp2p = libp2p this.bitswap = bitswap this.b100Only = options.b100Only || false + this._stats = stats this._running = false } @@ -149,6 +150,7 @@ class Network { } }) callback() + this._updateSentStats(msg.blocks) }) } @@ -176,6 +178,13 @@ class Network { callback(null, conn, BITSWAP110) }) } + + _updateSentStats (blocks) { + if (this._stats) { + blocks.forEach((block) => this._stats.push('dataSent', block.data.length)) + this._stats.push('blocksSent', blocks.size) + } + } } function writeMessage (conn, msg, callback) { diff --git a/src/stats.js b/src/stats.js new file mode 100644 index 00000000..bccd62cf --- /dev/null +++ b/src/stats.js @@ -0,0 +1,154 @@ +'use strict' + +const EventEmitter = require('events') +const Big = require('big.js') +const MovingAverage = require('moving-average') + +const defaultOptions = { + movingAverageIntervals: [ + 60 * 1000, // 1 minute + 5 * 60 * 1000, // 5 minutes + 15 * 60 * 1000 // 15 minutes + ] +} + +class Stats extends EventEmitter { + constructor (initialCounters, _options) { + super() + + const options = Object.assign({}, defaultOptions, _options) + + if (typeof options.computeThrottleTimeout !== 'number') { + throw new Error('need computeThrottleTimeout') + } + + if (typeof options.computeThrottleMaxQueueSize !== 'number') { + throw new Error('need computeThrottleMaxQueueSize') + } + + this._options = options + this._queue = [] + this._stats = {} + + this._frequencyLastTime = Date.now() + this._frequencyAccumulators = {} + this._movingAverages = {} + + this._update = this._update.bind(this) + + initialCounters.forEach((key) => { + this._stats[key] = Big(0) + this._movingAverages[key] = {} + this._options.movingAverageIntervals.forEach((interval) => { + const ma = this._movingAverages[key][interval] = MovingAverage(interval) + ma.push(this._frequencyLastTime, 0) + }) + }) + + this._enabled = this._options.enabled + } + + enable () { + this._enabled = true + } + + disable () { + this._disabled = true + } + + get snapshot () { + return Object.assign({}, this._stats) + } + + get movingAverages () { + return Object.assign({}, this._movingAverages) + } + + push (counter, inc) { + if (this._enabled) { + this._queue.push([counter, inc, Date.now()]) + this._resetComputeTimeout() + } + } + + _resetComputeTimeout () { + if (this._timeout) { + clearTimeout(this._timeout) + } + this._timeout = setTimeout(this._update, this._nextTimeout()) + } + + _nextTimeout () { + // calculate the need for an update, depending on the queue length + const urgency = this._queue.length / this._options.computeThrottleMaxQueueSize + return Math.max(this._options.computeThrottleTimeout * (1 - urgency), 0) + } + + _update () { + this._timeout = null + if (this._queue.length) { + let last + while (this._queue.length) { + const op = last = this._queue.shift() + this._applyOp(op) + } + + this._updateFrequency(last[2]) // contains timestamp of last op + + this.emit('update', this._stats) + } + } + + _updateFrequency (latestTime) { + const timeDiff = latestTime - this._frequencyLastTime + + Object.keys(this._stats).forEach((key) => { + this._updateFrequencyFor(key, timeDiff, latestTime) + }) + + this._frequencyLastTime = latestTime + } + + _updateFrequencyFor (key, timeDiffMS, latestTime) { + const count = this._frequencyAccumulators[key] || 0 + this._frequencyAccumulators[key] = 0 + const hz = (count / timeDiffMS) * 1000 + + let movingAverages = this._movingAverages[key] + if (!movingAverages) { + movingAverages = this._movingAverages[key] = {} + } + this._options.movingAverageIntervals.forEach((movingAverageInterval) => { + let movingAverage = movingAverages[movingAverageInterval] + if (!movingAverage) { + movingAverage = movingAverages[movingAverageInterval] = MovingAverage(movingAverageInterval) + } + movingAverage.push(latestTime, hz) + }) + } + + _applyOp (op) { + const key = op[0] + const inc = op[1] + + if (typeof inc !== 'number') { + throw new Error('invalid increment number:', inc) + } + + let n + + if (!this._stats.hasOwnProperty(key)) { + n = this._stats[key] = Big(0) + } else { + n = this._stats[key] + } + this._stats[key] = n.plus(inc) + + if (!this._frequencyAccumulators[key]) { + this._frequencyAccumulators[key] = 0 + } + this._frequencyAccumulators[key] += inc + } +} + +module.exports = Stats diff --git a/src/types/wantlist/index.js b/src/types/wantlist/index.js index a269375d..3459c8e3 100644 --- a/src/types/wantlist/index.js +++ b/src/types/wantlist/index.js @@ -4,8 +4,9 @@ const sort = require('lodash.sortby') const Entry = require('./entry') class Wantlist { - constructor () { + constructor (stats) { this.set = new Map() + this._stats = stats } get length () { @@ -21,6 +22,9 @@ class Wantlist { entry.priority = priority } else { this.set.set(cidStr, new Entry(cid, priority)) + if (this._stats) { + this._stats.push('wantListSize', 1) + } } } @@ -40,6 +44,9 @@ class Wantlist { } this.set.delete(cidStr) + if (this._stats) { + this._stats.push('wantListSize', -1) + } } removeForce (cidStr) { diff --git a/src/want-manager/index.js b/src/want-manager/index.js index db4c2c12..d7753dcd 100644 --- a/src/want-manager/index.js +++ b/src/want-manager/index.js @@ -9,11 +9,12 @@ const MsgQueue = require('./msg-queue') const logger = require('../utils').logger module.exports = class WantManager { - constructor (peerId, network) { + constructor (peerId, network, stats) { this.peers = new Map() - this.wantlist = new Wantlist() + this.wantlist = new Wantlist(stats) this.network = network + this._stats = stats this._peerId = peerId this._log = logger(peerId, 'want') diff --git a/test/bitswap-mock-internals.js b/test/bitswap-mock-internals.js index 6cc96a7c..f040a815 100644 --- a/test/bitswap-mock-internals.js +++ b/test/bitswap-mock-internals.js @@ -24,7 +24,9 @@ const storeHasBlocks = require('./utils/store-has-blocks') const makeBlock = require('./utils/make-block') const orderedFinish = require('./utils/helpers').orderedFinish -describe('bitswap with mocks', () => { +describe('bitswap with mocks', function () { + this.timeout(10 * 1000) + let repo let blocks let ids @@ -66,8 +68,6 @@ describe('bitswap with mocks', () => { bs._receiveMessage(other, msg, (err) => { expect(err).to.not.exist() - expect(bs.blocksRecvd).to.equal(2) - expect(bs.dupBlocksRecvd).to.equal(0) map([b1.cid, b2.cid], (cid, cb) => repo.blocks.get(cid, cb), (err, blocks) => { expect(err).to.not.exist() @@ -96,9 +96,6 @@ describe('bitswap with mocks', () => { bs._receiveMessage(other, msg, (err) => { expect(err).to.not.exist() - expect(bs.blocksRecvd).to.be.eql(0) - expect(bs.dupBlocksRecvd).to.be.eql(0) - const wl = bs.wantlistForPeer(other) expect(wl.has(b1.cid.buffer.toString())).to.eql(true) @@ -328,19 +325,6 @@ describe('bitswap with mocks', () => { }) }) - describe('stat', () => { - it('has initial stats', () => { - const bs = new Bitswap(mockLibp2pNode(), {}) - - const stats = bs.stat() - expect(stats).to.have.property('wantlist') - expect(stats).to.have.property('blocksReceived', 0) - expect(stats).to.have.property('dupBlksReceived', 0) - expect(stats).to.have.property('dupDataReceived', 0) - expect(stats).to.have.property('peers') - }) - }) - describe('unwant', () => { it('removes blocks that are wanted multiple times', (done) => { const bs = new Bitswap(mockLibp2pNode(), repo.blocks) diff --git a/test/bitswap-stats.js b/test/bitswap-stats.js new file mode 100644 index 00000000..44b6e28a --- /dev/null +++ b/test/bitswap-stats.js @@ -0,0 +1,244 @@ +/* eslint-env mocha */ +'use strict' + +const map = require('async/map') +const each = require('async/each') +const eachOf = require('async/eachOf') +const parallel = require('async/parallel') +const _ = require('lodash') +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect +const PeerId = require('peer-id') + +const Message = require('../src/types/message') +const Bitswap = require('../src') + +const createTempRepo = require('./utils/create-temp-repo-nodejs') +const createLibp2pNode = require('./utils/create-libp2p-node') +const makeBlock = require('./utils/make-block') +const orderedFinish = require('./utils/helpers').orderedFinish + +const expectedStats = [ + 'blocksReceived', + 'dataReceived', + 'dupBlksReceived', + 'dupDataReceived', + 'blocksSent', + 'dataSent', + 'providesBufferLength', + 'wantListLength' +] + +const expectedTimeWindows = [ + 1000 * 60, + 1000 * 60 * 5, + 1000 * 60 * 15 +] + +describe('bitswap stats', () => { + const nodes = [0, 1] + let libp2pNodes + let repos + let bitswaps + let bs + let blocks + let ids + + before((done) => { + parallel( + { + blocks: (cb) => map(_.range(2), (i, cb) => makeBlock(cb), cb), + ids: (cb) => map(_.range(2), (i, cb) => PeerId.create({bits: 1024}, cb), cb) + }, + (err, results) => { + if (err) { + return done(err) + } + + blocks = results.blocks + ids = results.ids + + done() + } + ) + }) + + before((done) => { + // create 2 temp repos + map(nodes, (n, cb) => createTempRepo(cb), (err, _repos) => { + expect(err).to.not.exist() + repos = _repos + done() + }) + }) + + before((done) => { + // create 2 libp2p nodes + map(nodes, (n, cb) => createLibp2pNode({ + DHT: repos[n].datastore + }, cb), (err, _libp2pNodes) => { + expect(err).to.not.exist() + libp2pNodes = _libp2pNodes + done() + }) + }) + + before(() => { + bitswaps = nodes.map((node, i) => new Bitswap(libp2pNodes[i], repos[i].blocks, { + statsEnabled: true, + statsComputeThrottleTimeout: 500 // fast update interval for so tests run fast + })) + bs = bitswaps[0] + }) + + // start the first bitswap + before((done) => bs.start(done)) + + after((done) => each(bitswaps, (bs, cb) => bs.stop(cb), done)) + + after((done) => each(repos, (repo, cb) => repo.teardown(cb), done)) + + after((done) => each(libp2pNodes, (n, cb) => n.stop(cb), done)) + + it('has initial stats', () => { + const stats = bs.stat() + const snapshot = stats.snapshot + + expectedStats.forEach((key) => { + expect(snapshot).to.have.property(key) + expect(snapshot[key].eq(0)).to.be.true() + }) + + const movingAverages = stats.movingAverages + expectedStats.forEach((key) => { + expectedTimeWindows.forEach((timeWindow) => { + expect(movingAverages).to.have.property(key) + expect(stats.movingAverages[key]).to.have.property(timeWindow) + const ma = stats.movingAverages[key][timeWindow] + expect(ma.movingAverage()).to.eql(0) + expect(ma.variance()).to.eql(0) + }) + }) + }) + + it('updates blocks received', (done) => { + bs.stat().once('update', (stats) => { + expect(stats.blocksReceived.eq(2)).to.be.true() + expect(stats.dataReceived.eq(96)).to.be.true() + expect(stats.dupBlksReceived.eq(0)).to.be.true() + expect(stats.dupDataReceived.eq(0)).to.be.true() + expect(stats.blocksSent.eq(0)).to.be.true() + expect(stats.dataSent.eq(0)).to.be.true() + expect(stats.providesBufferLength.eq(0)).to.be.true() + expect(stats.wantListLength.eq(0)).to.be.true() + expect(stats.peerCount.eq(1)).to.be.true() + + // test moving averages + const movingAverages = bs.stat().movingAverages + const blocksReceivedMA = movingAverages.blocksReceived + expectedTimeWindows.forEach((timeWindow) => { + expect(blocksReceivedMA).to.have.property(timeWindow) + const ma = blocksReceivedMA[timeWindow] + expect(ma.movingAverage()).to.be.above(0) + expect(ma.variance()).to.be.above(0) + }) + + const dataReceivedMA = movingAverages.dataReceived + expectedTimeWindows.forEach((timeWindow) => { + expect(dataReceivedMA).to.have.property(timeWindow) + const ma = dataReceivedMA[timeWindow] + expect(ma.movingAverage()).to.be.above(0) + expect(ma.variance()).to.be.above(0) + }) + done() + }) + + const other = ids[1] + + const msg = new Message(false) + blocks.forEach((block) => msg.addBlock(block)) + + bs._receiveMessage(other, msg, (err) => { + expect(err).to.not.exist() + }) + }) + + it('updates duplicate blocks counters', (done) => { + bs.stat().once('update', (stats) => { + expect(stats.blocksReceived.eq(4)).to.be.true() + expect(stats.dataReceived.eq(192)).to.be.true() + expect(stats.dupBlksReceived.eq(2)).to.be.true() + expect(stats.dupDataReceived.eq(96)).to.be.true() + expect(stats.blocksSent.eq(0)).to.be.true() + expect(stats.dataSent.eq(0)).to.be.true() + expect(stats.providesBufferLength.eq(0)).to.be.true() + done() + }) + + const other = ids[1] + + const msg = new Message(false) + blocks.forEach((block) => msg.addBlock(block)) + + bs._receiveMessage(other, msg, (err) => { + expect(err).to.not.exist() + }) + }) + + describe('connected to another bitswap', () => { + let bs2 + let block + + before((done) => { + eachOf( + libp2pNodes, + (node, i, cb) => node.dial(libp2pNodes[(i + 1) % nodes.length].peerInfo, cb), + done) + }) + + before((done) => { + bs2 = bitswaps[1] + bs2.start(done) + }) + + after((done) => { + bs2.stop(done) + }) + + before((done) => { + makeBlock((err, _block) => { + expect(err).to.not.exist() + expect(_block).to.exist() + block = _block + done() + }) + }) + + before((done) => { + bs.put(block, done) + }) + + it('updates stats on transfer', (done) => { + const finish = orderedFinish(2, done) + bs.stat().once('update', (stats) => { + expect(stats.blocksReceived.eq(4)).to.be.true() + expect(stats.dataReceived.eq(192)).to.be.true() + expect(stats.dupBlksReceived.eq(2)).to.be.true() + expect(stats.dupDataReceived.eq(96)).to.be.true() + expect(stats.blocksSent.eq(1)).to.be.true() + expect(stats.dataSent.eq(48)).to.be.true() + expect(stats.providesBufferLength.eq(0)).to.be.true() + expect(stats.wantListLength.eq(0)).to.be.true() + expect(stats.peerCount.eq(2)).to.be.true() + finish(2) + }) + + bs2.get(block.cid, (err, block) => { + expect(err).to.not.exist() + expect(block).to.exist() + finish(1) + }) + }) + }) +}) diff --git a/test/node.js b/test/node.js index 02daf18b..f4503726 100644 --- a/test/node.js +++ b/test/node.js @@ -2,6 +2,7 @@ require('./bitswap.js') require('./bitswap-mock-internals.js') +require('./bitswap-stats.js') require('./decision-engine/decision-engine') require('./network/network.node.js') require('./network/gen-bitswap-network.node.js') diff --git a/test/utils/mocks.js b/test/utils/mocks.js index eea8c36e..debb1124 100644 --- a/test/utils/mocks.js +++ b/test/utils/mocks.js @@ -13,6 +13,7 @@ const PeerBook = require('peer-book') const Node = require('./create-libp2p-node').bundle const os = require('os') const Repo = require('ipfs-repo') +const EventEmitter = require('events') const Bitswap = require('../../src') @@ -22,9 +23,10 @@ const Bitswap = require('../../src') exports.mockLibp2pNode = () => { const peerInfo = new PeerInfo(PeerId.createFromHexString('122019318b6e5e0cf93a2314bf01269a2cc23cd3dcd452d742cdb9379d8646f6e4a9')) - return { + return Object.assign(new EventEmitter(), { peerInfo: peerInfo, handle () {}, + unhandle () {}, contentRouting: { provide: (cid, callback) => callback(), findProviders: (cid, timeout, callback) => callback(null, []) @@ -38,7 +40,7 @@ exports.mockLibp2pNode = () => { setMaxListeners () {} }, peerBook: new PeerBook() - } + }) } /* From 519e2a9f05937ca6fbcede2b10a1a1e3adcc3eb3 Mon Sep 17 00:00:00 2001 From: Pedro Teixeira Date: Fri, 15 Dec 2017 09:47:53 +0000 Subject: [PATCH 14/24] test: port go tests (#159) * getMany: guaranteed ordering * tests: swarm, humble beginnings * chore: added test for when no cid is passed in * chore: more swarm tests * tests: serializing interconnections on swarm tests * more swarm tests enabled * tests: removed some onlys * tests: serializing interconnections on swarm tests * tests: added double get test * chore: added test for double get * one less callback function * chore: update libp2p version to latest * chore: update remaining deps --- package.json | 16 +++-- src/index.js | 112 +++++++++++++------------------- test/bitswap-mock-internals.js | 38 ++++++++++- test/node.js | 1 + test/swarms.js | 33 ++++++++++ test/utils/connect-all.js | 15 +++++ test/utils/create-bitswap.js | 26 ++++++++ test/utils/distribution-test.js | 75 +++++++++++++++++++++ 8 files changed, 240 insertions(+), 76 deletions(-) create mode 100644 test/swarms.js create mode 100644 test/utils/connect-all.js create mode 100644 test/utils/create-bitswap.js create mode 100644 test/utils/distribution-test.js diff --git a/package.json b/package.json index d7d670be..87733742 100644 --- a/package.json +++ b/package.json @@ -41,18 +41,20 @@ "benchmark": "^2.1.4", "chai": "^4.1.2", "dirty-chai": "^2.0.1", - "ipfs-repo": "~0.18.3", - "libp2p": "~0.13.1", + "ipfs-repo": "~0.18.5", + "libp2p": "~0.14.3", "libp2p-kad-dht": "~0.6.0", - "libp2p-multiplex": "~0.5.0", + "libp2p-multiplex": "~0.5.1", "libp2p-secio": "~0.8.1", "libp2p-tcp": "~0.11.1", "lodash": "^4.17.4", + "lodash.range": "^3.2.0", + "lodash.without": "^4.4.0", "multiaddr": "^3.0.1", "ncp": "^2.0.0", - "peer-book": "~0.5.1", - "peer-id": "~0.10.2", - "peer-info": "~0.11.1", + "peer-book": "~0.5.2", + "peer-id": "~0.10.3", + "peer-info": "~0.11.4", "pre-commit": "^1.2.2", "rimraf": "^2.6.2", "safe-buffer": "^5.1.1" @@ -75,7 +77,7 @@ "moving-average": "^1.0.0", "multicodec": "~0.2.5", "multihashing-async": "~0.4.7", - "protons": "^1.0.0", + "protons": "^1.0.1", "pull-defer": "~0.2.2", "pull-length-prefixed": "^1.3.0", "pull-pushable": "^2.1.1", diff --git a/src/index.js b/src/index.js index 522fbcf6..b253557d 100644 --- a/src/index.js +++ b/src/index.js @@ -5,7 +5,6 @@ const reject = require('async/reject') const each = require('async/each') const series = require('async/series') const map = require('async/map') -const once = require('once') const WantManager = require('./want-manager') const Network = require('./network') @@ -211,78 +210,57 @@ class Bitswap { * @returns {void} */ getMany (cids, callback) { - const retrieved = [] - const locals = [] - const missing = [] - const canceled = [] - - const finish = once(() => { - map(locals, (cid, cb) => { - this.blockstore.get(cid, cb) - }, (err, localBlocks) => { - if (err) { - return callback(err) + let pendingStart = cids.length + const wantList = [] + let promptedNetwork = false + + const getFromOutside = (cid, cb) => { + wantList.push(cid) + + this.notifications.wantBlock( + cid, + // called on block receive + (block) => { + this.wm.cancelWants([cid]) + cb(null, block) + }, + // called on unwant + () => { + this.wm.cancelWants([cid]) + cb(null, undefined) } + ) - callback(null, localBlocks.concat(retrieved)) - }) - }) - - this._log('getMany', cids.length) - - const addListeners = (cids) => { - cids.forEach((cid) => { - this.notifications.wantBlock( - cid, - // called on block receive - (block) => { - this.wm.cancelWants([cid]) - retrieved.push(block) - - if (retrieved.length === missing.length) { - finish() - } - }, - // called on unwant - () => { - this.wm.cancelWants([cid]) - canceled.push(cid) - if (canceled.length + retrieved.length === missing.length) { - finish() - } - } - ) - }) + if (!pendingStart) { + this.wm.wantBlocks(wantList) + } } - each(cids, (cid, cb) => { - this.blockstore.has(cid, (err, has) => { - if (err) { - return cb(err) - } - - if (has) { - locals.push(cid) - } else { - missing.push(cid) - } - cb() - }) - }, () => { - if (missing.length === 0) { - // already finished - finish() - } + map(cids, (cid, cb) => { + waterfall( + [ + (cb) => this.blockstore.has(cid, cb), + (has, cb) => { + pendingStart-- + if (has) { + return this.blockstore.get(cid, cb) + } - addListeners(missing) - this.wm.wantBlocks(missing) + if (!promptedNetwork) { + promptedNetwork = true + this.network.findAndConnect(cids[0], (err) => { + if (err) { + this._log.error(err) + } + }) + } - this.network.findAndConnect(cids[0], (err) => { - if (err) { - this._log.error(err) - } - }) - }) + // we don't have the block here + getFromOutside(cid, cb) + } + ], + cb) + }, callback) } // removes the given cids from the wantlist independent of any ref counts diff --git a/test/bitswap-mock-internals.js b/test/bitswap-mock-internals.js index f040a815..24f91c17 100644 --- a/test/bitswap-mock-internals.js +++ b/test/bitswap-mock-internals.js @@ -49,7 +49,9 @@ describe('bitswap with mocks', function () { }) }) - after((done) => repo.teardown(done)) + after((done) => { + repo.teardown(done) + }) describe('receive message', () => { it('simple block message', (done) => { @@ -151,6 +153,15 @@ describe('bitswap with mocks', function () { }) describe('get', () => { + it('fails on requesting empty block', (done) => { + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + bs.get(null, (err, res) => { + expect(err).to.exist() + expect(err.message).to.equal('Not a valid cid') + done() + }) + }) + it('block exists locally', (done) => { const block = blocks[4] @@ -323,6 +334,29 @@ describe('bitswap with mocks', function () { ], done) }) }) + + it('double get', (done) => { + const block = blocks[11] + + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + + parallel( + [ + (cb) => bs.get(block.cid, cb), + (cb) => bs.get(block.cid, cb) + ], + (err, res) => { + expect(err).to.not.exist() + expect(res[0]).to.eql(block) + expect(res[1]).to.eql(block) + done() + } + ) + + bs.put(block, (err) => { + expect(err).to.not.exist() + }) + }) }) describe('unwant', () => { @@ -330,7 +364,7 @@ describe('bitswap with mocks', function () { const bs = new Bitswap(mockLibp2pNode(), repo.blocks) bs.start((err) => { expect(err).to.not.exist() - const b = blocks[11] + const b = blocks[12] let counter = 0 const check = (err, res) => { diff --git a/test/node.js b/test/node.js index f4503726..458b2310 100644 --- a/test/node.js +++ b/test/node.js @@ -6,3 +6,4 @@ require('./bitswap-stats.js') require('./decision-engine/decision-engine') require('./network/network.node.js') require('./network/gen-bitswap-network.node.js') +require('./swarms.js') diff --git a/test/swarms.js b/test/swarms.js new file mode 100644 index 00000000..a19ff045 --- /dev/null +++ b/test/swarms.js @@ -0,0 +1,33 @@ +'use strict' + +/* eslint-env mocha */ + +const distributionTest = require('./utils/distribution-test') +const test = it + +describe('swarms', () => { + test('2 nodes, 2 blocks', function (done) { + this.timeout(10 * 1000) + distributionTest(2, 2, done) + }) + + test('10 nodes, 2 blocks', function (done) { + this.timeout(30 * 1000) + distributionTest(10, 2, done) + }) + + test('50 nodes, 2 blocks', function (done) { + this.timeout(300 * 1000) + distributionTest(50, 2, done) + }) + + test.skip('100 nodes, 2 blocks', function (done) { + this.timeout(300 * 1000) + distributionTest(100, 2, done) + }) + + test('10 nodes, 100 blocks', function (done) { + this.timeout(300 * 1000) + distributionTest(10, 100, done) + }) +}) diff --git a/test/utils/connect-all.js b/test/utils/connect-all.js new file mode 100644 index 00000000..cb9c7bc6 --- /dev/null +++ b/test/utils/connect-all.js @@ -0,0 +1,15 @@ +'use strict' + +const eachSeries = require('async/eachSeries') +const without = require('lodash.without') + +module.exports = (nodes, callback) => { + eachSeries(nodes, (node, cb) => { + eachSeries( + without(nodes, node), + (otherNode, cb) => { + node.libp2pNode.dial(otherNode.bitswap.peerInfo, cb) + }, + cb) + }, callback) +} diff --git a/test/utils/create-bitswap.js b/test/utils/create-bitswap.js new file mode 100644 index 00000000..332f529a --- /dev/null +++ b/test/utils/create-bitswap.js @@ -0,0 +1,26 @@ +'use strict' + +const waterfall = require('async/waterfall') + +const Bitswap = require('../..') +const createTempRepo = require('./create-temp-repo-nodejs') +const createLibp2pNode = require('./create-libp2p-node') + +module.exports = (callback) => { + waterfall([ + (cb) => createTempRepo(cb), + (repo, cb) => { + createLibp2pNode({ + DHT: repo.datastore + }, (err, node) => cb(err, repo, node)) + }, + (repo, libp2pNode, cb) => { + const bitswap = new Bitswap(libp2pNode, repo.blocks) + bitswap.start((err) => cb(err, { + bitswap: bitswap, + repo: repo, + libp2pNode: libp2pNode + })) + } + ], callback) +} diff --git a/test/utils/distribution-test.js b/test/utils/distribution-test.js new file mode 100644 index 00000000..dc9efc59 --- /dev/null +++ b/test/utils/distribution-test.js @@ -0,0 +1,75 @@ +'use strict' + +const range = require('lodash.range') +const map = require('async/map') +const each = require('async/each') +const parallel = require('async/parallel') +const series = require('async/series') +const waterfall = require('async/waterfall') +const chai = require('chai') +chai.use(require('dirty-chai')) +const expect = chai.expect + +const createBitswap = require('./create-bitswap') +const makeBlock = require('./make-block') +const connectAll = require('./connect-all') + +module.exports = (instanceCount, blockCount, callback) => { + let nodes + let blocks + + waterfall([ + (cb) => parallel( + { + nodes: (cb) => map(range(instanceCount), (_, cb) => createBitswap(cb), cb), + blocks: (cb) => map(range(blockCount), (_, cb) => makeBlock(cb), cb) + }, + cb), + (results, cb) => { + nodes = results.nodes + blocks = results.blocks + const first = nodes[0] + + parallel([ + (cb) => connectAll(results.nodes, cb), + (cb) => each(results.blocks, first.bitswap.put.bind(first.bitswap), cb) + ], cb) + }, + (results, cb) => { + const cids = blocks.map((block) => block.cid) + map(nodes, (node, cb) => node.bitswap.getMany(cids, cb), cb) + }, + (results, cb) => { + try { + expect(results).have.lengthOf(instanceCount) + results.forEach((nodeBlocks) => { + expect(nodeBlocks).to.have.lengthOf(blocks.length) + nodeBlocks.forEach((block, i) => { + expect(block.data).to.deep.equal(blocks[i].data) + }) + }) + } catch (err) { + return cb(err) + } + cb() + } + ], + (err) => { + each( + nodes, + (node, cb) => { + series( + [ + (cb) => node.bitswap.stop(cb), + (cb) => node.libp2pNode.stop(cb), + (cb) => node.repo.teardown(cb) + ], + cb + ) + }, + (err2) => { + callback(err) + } + ) + }) +} From 095f467be0a1da6956a88f0a23e60577276bb6cb Mon Sep 17 00:00:00 2001 From: David Dias Date: Fri, 15 Dec 2017 09:50:39 +0000 Subject: [PATCH 15/24] chore: update contributors --- package.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index 87733742..c1c9d68c 100644 --- a/package.json +++ b/package.json @@ -100,6 +100,7 @@ "dmitriy ryajov ", "greenkeeper[bot] ", "greenkeeperio-bot ", - "npmcdn-to-unpkg-bot " + "npmcdn-to-unpkg-bot ", + "ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ " ] } From 349b3bce459796845587c6334606803bd5570e46 Mon Sep 17 00:00:00 2001 From: David Dias Date: Fri, 15 Dec 2017 09:50:39 +0000 Subject: [PATCH 16/24] chore: release version v0.18.0 --- CHANGELOG.md | 10 ++++++++++ package.json | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 56187d4e..2eb8bea1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ + +# [0.18.0](https://github.com/ipfs/js-ipfs-bitswap/compare/v0.17.4...v0.18.0) (2017-12-15) + + +### Features + +* stats improvements ([#158](https://github.com/ipfs/js-ipfs-bitswap/issues/158)) ([17e15d0](https://github.com/ipfs/js-ipfs-bitswap/commit/17e15d0)) + + + ## [0.17.4](https://github.com/ipfs/js-ipfs-bitswap/compare/v0.17.3...v0.17.4) (2017-11-10) diff --git a/package.json b/package.json index c1c9d68c..2f6e73ea 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ipfs-bitswap", - "version": "0.17.4", + "version": "0.18.0", "description": "Node.js implementation of the Bitswap data exchange protocol used by IPFS", "main": "src/index.js", "browser": { From 8e91def24399c33ee9dc4a724d826546ee81f1d3 Mon Sep 17 00:00:00 2001 From: Pedro Teixeira Date: Sun, 28 Jan 2018 03:00:24 +0000 Subject: [PATCH 17/24] fix: getMany: ensuring we set the want list (#162) * fix: getMany: ensuring we set the want list independently of having all blocks or not * bitswap stats tests: not requiring specific order to finish * increasing the timeouts of swarm tests --- src/index.js | 3 +++ test/bitswap-stats.js | 8 ++++---- test/swarms.js | 6 +++--- test/utils/helpers.js | 13 +++++++++++++ 4 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/index.js b/src/index.js index b253557d..e516c9b8 100644 --- a/src/index.js +++ b/src/index.js @@ -243,6 +243,9 @@ class Bitswap { (has, cb) => { pendingStart-- if (has) { + if (!pendingStart) { + this.wm.wantBlocks(wantList) + } return this.blockstore.get(cid, cb) } diff --git a/test/bitswap-stats.js b/test/bitswap-stats.js index 44b6e28a..bdba8e62 100644 --- a/test/bitswap-stats.js +++ b/test/bitswap-stats.js @@ -17,7 +17,7 @@ const Bitswap = require('../src') const createTempRepo = require('./utils/create-temp-repo-nodejs') const createLibp2pNode = require('./utils/create-libp2p-node') const makeBlock = require('./utils/make-block') -const orderedFinish = require('./utils/helpers').orderedFinish +const countToFinish = require('./utils/helpers').countToFinish const expectedStats = [ 'blocksReceived', @@ -220,7 +220,7 @@ describe('bitswap stats', () => { }) it('updates stats on transfer', (done) => { - const finish = orderedFinish(2, done) + const finish = countToFinish(2, done) bs.stat().once('update', (stats) => { expect(stats.blocksReceived.eq(4)).to.be.true() expect(stats.dataReceived.eq(192)).to.be.true() @@ -231,13 +231,13 @@ describe('bitswap stats', () => { expect(stats.providesBufferLength.eq(0)).to.be.true() expect(stats.wantListLength.eq(0)).to.be.true() expect(stats.peerCount.eq(2)).to.be.true() - finish(2) + finish() }) bs2.get(block.cid, (err, block) => { expect(err).to.not.exist() expect(block).to.exist() - finish(1) + finish() }) }) }) diff --git a/test/swarms.js b/test/swarms.js index a19ff045..b88f511f 100644 --- a/test/swarms.js +++ b/test/swarms.js @@ -17,17 +17,17 @@ describe('swarms', () => { }) test('50 nodes, 2 blocks', function (done) { - this.timeout(300 * 1000) + this.timeout(600 * 1000) distributionTest(50, 2, done) }) test.skip('100 nodes, 2 blocks', function (done) { - this.timeout(300 * 1000) + this.timeout(600 * 1000) distributionTest(100, 2, done) }) test('10 nodes, 100 blocks', function (done) { - this.timeout(300 * 1000) + this.timeout(600 * 1000) distributionTest(10, 100, done) }) }) diff --git a/test/utils/helpers.js b/test/utils/helpers.js index 9e3d897e..98a1be5d 100644 --- a/test/utils/helpers.js +++ b/test/utils/helpers.js @@ -16,3 +16,16 @@ exports.orderedFinish = (n, callback) => { } } } + +exports.countToFinish = (n, callback) => { + let pending = n + + return () => { + pending-- + if (pending === 0) { + callback() + } else if (pending < 0) { + callback(new Error('too many finishes, expected only ' + n)) + } + } +} From ff978d0c5fd186a3f80432d53fd1df9d1ffc2ff2 Mon Sep 17 00:00:00 2001 From: Pedro Teixeira Date: Tue, 6 Feb 2018 11:21:22 +0100 Subject: [PATCH 18/24] feat: per-peer stats (#166) * docs: current stats API documented * per peer stats --- README.md | 88 +++++++++++++++++++++++++++++ src/decision-engine/index.js | 2 +- src/index.js | 14 +++-- src/network.js | 9 +-- src/stats/index.js | 99 +++++++++++++++++++++++++++++++++ src/{stats.js => stats/stat.js} | 26 +++------ src/types/wantlist/index.js | 4 +- test/bitswap-stats.js | 22 ++++++++ 8 files changed, 232 insertions(+), 32 deletions(-) create mode 100644 src/stats/index.js rename src/{stats.js => stats/stat.js} (86%) diff --git a/README.md b/README.md index 53dc3449..25758ebd 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,94 @@ Loading this module through a script tag will make the `IpfsBitswap` object avai See https://ipfs.github.io/js-ipfs-bitswap +### Stats + +```js +const bitswapNode = // ... + +const stats = bitswapNode.stat() +``` + +Stats contains a snapshot accessor, a moving average acessor and a peer accessor. + +Besides that, it emits "update" events every time it is updated. + +```js +stats.on('update', (stats) => { + console.log('latest stats snapshot: %j', stats) +}) +``` + +#### Peer accessor: + +You can get the stats for a specific peer by doing: + +```js +const peerStats = stats.forPeer(peerId) +``` + +The returned object behaves like the root stats accessor (has a snapshot, a moving average accessors and is an event emitter). + +#### Global snapshot accessor: + +```js +const snapshot = stats.snapshot +console.log('stats: %j', snapshot) +``` + +the snapshot will contain the following keys, with the values being [Big.js](https://github.com/MikeMcl/big.js#readme) instances: + +```js +// stats: { +// "dataReceived":"96", +// "blocksReceived":"2", +// "dataReceived":"96", +// "dupBlksReceived":"0", +// "dupDataReceived":"0", +// "blocksSent":"0", +// "dataSent":"0", +// "providesBufferLength":"0", +// "wantListLength":"0", +// "peerCount":"1" +// } +``` + +#### Moving average accessor: + +```js +const movingAverages = stats.movingAverages +``` + +This object contains these properties: + +* 'blocksReceived', +* 'dataReceived', +* 'dupBlksReceived', +* 'dupDataReceived', +* 'blocksSent', +* 'dataSent', +* 'providesBufferLength', +* 'wantListLength', +* 'peerCount' + +```js +const dataReceivedMovingAverages = movingAverages.dataReceived +``` + +Each one of these will contain one key per interval (miliseconds), being the default intervals defined: + +* 60000 (1 minute) +* 300000 (5 minutes) +* 900000 (15 minutes) + +You can then select one of them + +```js +const oneMinuteDataReceivedMovingAverages = dataReceivedMovingAverages[60000] +``` + +This object will be a [movingAverage](https://github.com/pgte/moving-average#readme) instance. + ## Development ### Structure diff --git a/src/decision-engine/index.js b/src/decision-engine/index.js index 54acdfb2..ffd0827b 100644 --- a/src/decision-engine/index.js +++ b/src/decision-engine/index.js @@ -269,7 +269,7 @@ class DecisionEngine { this.ledgerMap.set(peerIdStr, l) if (this._stats) { - this._stats.push('peerCount', 1) + this._stats.push(peerIdStr, 'peerCount', 1) } return l diff --git a/src/index.js b/src/index.js index e516c9b8..cee77cb8 100644 --- a/src/index.js +++ b/src/index.js @@ -106,7 +106,7 @@ class Bitswap { waterfall([ (cb) => this.blockstore.has(block.cid, cb), (has, cb) => { - this._updateReceiveCounters(block, has) + this._updateReceiveCounters(peerId.toB58String(), block, has) if (has) { return cb() } @@ -116,13 +116,13 @@ class Bitswap { ], callback) } - _updateReceiveCounters (block, exists) { - this._stats.push('blocksReceived', 1) - this._stats.push('dataReceived', block.data.length) + _updateReceiveCounters (peerId, block, exists) { + this._stats.push(peerId, 'blocksReceived', 1) + this._stats.push(peerId, 'dataReceived', block.data.length) if (exists) { - this._stats.push('dupBlksReceived', 1) - this._stats.push('dupDataReceived', block.data.length) + this._stats.push(peerId, 'dupBlksReceived', 1) + this._stats.push(peerId, 'dupDataReceived', block.data.length) } } @@ -140,6 +140,7 @@ class Bitswap { _onPeerDisconnected (peerId) { this.wm.disconnected(peerId) this.engine.peerDisconnected(peerId) + this._stats.disconnected(peerId) } _putBlock (block, callback) { @@ -389,6 +390,7 @@ class Bitswap { * @returns {void} */ stop (callback) { + this._stats.stop() series([ (cb) => this.wm.stop(cb), (cb) => this.network.stop(cb), diff --git a/src/network.js b/src/network.js index f3d410dd..6b603a25 100644 --- a/src/network.js +++ b/src/network.js @@ -150,7 +150,7 @@ class Network { } }) callback() - this._updateSentStats(msg.blocks) + this._updateSentStats(peer, msg.blocks) }) } @@ -179,10 +179,11 @@ class Network { }) } - _updateSentStats (blocks) { + _updateSentStats (peer, blocks) { + const peerId = peer.toB58String() if (this._stats) { - blocks.forEach((block) => this._stats.push('dataSent', block.data.length)) - this._stats.push('blocksSent', blocks.size) + blocks.forEach((block) => this._stats.push(peerId, 'dataSent', block.data.length)) + this._stats.push(peerId, 'blocksSent', blocks.size) } } } diff --git a/src/stats/index.js b/src/stats/index.js new file mode 100644 index 00000000..b364784d --- /dev/null +++ b/src/stats/index.js @@ -0,0 +1,99 @@ +'use strict' + +const EventEmitter = require('events') +const Stat = require('./stat') + +const defaultOptions = { + movingAverageIntervals: [ + 60 * 1000, // 1 minute + 5 * 60 * 1000, // 5 minutes + 15 * 60 * 1000 // 15 minutes + ] +} + +class Stats extends EventEmitter { + constructor (initialCounters, _options) { + super() + + const options = Object.assign({}, defaultOptions, _options) + + if (typeof options.computeThrottleTimeout !== 'number') { + throw new Error('need computeThrottleTimeout') + } + + if (typeof options.computeThrottleMaxQueueSize !== 'number') { + throw new Error('need computeThrottleMaxQueueSize') + } + + this._initialCounters = initialCounters + this._options = options + this._enabled = this._options.enabled + + this._global = new Stat(initialCounters, options) + this._global.on('update', (stats) => this.emit('update', stats)) + + this._peers = new Map() + } + + enable () { + this._enabled = true + this._options.enabled = true + this._global.enable() + } + + disable () { + this._enabled = false + this._options.enabled = false + this._global.disable() + } + + stop () { + this._enabled = false + this._global.stop() + for (let peerStat of this._peers) { + peerStat[1].stop() + } + } + + get snapshot () { + return this._global.snapshot + } + + get movingAverages () { + return this._global.movingAverages + } + + forPeer (peerId) { + if (peerId.toB58String) { + peerId = peerId.toB58String() + } + return this._peers.get(peerId) + } + + push (peer, counter, inc) { + if (this._enabled) { + this._global.push(counter, inc) + + if (peer) { + let peerStats = this._peers.get(peer) + if (!peerStats) { + peerStats = new Stat(this._initialCounters, this._options) + this._peers.set(peer, peerStats) + } + + peerStats.push(counter, inc) + } + } + } + + disconnected (peer) { + const peerId = peer.toB58String() + const peerStats = this._peers.get(peerId) + if (peerStats) { + peerStats.stop() + this._peers.delete(peerId) + } + } +} + +module.exports = Stats diff --git a/src/stats.js b/src/stats/stat.js similarity index 86% rename from src/stats.js rename to src/stats/stat.js index bccd62cf..66a22a71 100644 --- a/src/stats.js +++ b/src/stats/stat.js @@ -4,28 +4,10 @@ const EventEmitter = require('events') const Big = require('big.js') const MovingAverage = require('moving-average') -const defaultOptions = { - movingAverageIntervals: [ - 60 * 1000, // 1 minute - 5 * 60 * 1000, // 5 minutes - 15 * 60 * 1000 // 15 minutes - ] -} - class Stats extends EventEmitter { - constructor (initialCounters, _options) { + constructor (initialCounters, options) { super() - const options = Object.assign({}, defaultOptions, _options) - - if (typeof options.computeThrottleTimeout !== 'number') { - throw new Error('need computeThrottleTimeout') - } - - if (typeof options.computeThrottleMaxQueueSize !== 'number') { - throw new Error('need computeThrottleMaxQueueSize') - } - this._options = options this._queue = [] this._stats = {} @@ -56,6 +38,12 @@ class Stats extends EventEmitter { this._disabled = true } + stop () { + if (this._timeout) { + clearTimeout(this._timeout) + } + } + get snapshot () { return Object.assign({}, this._stats) } diff --git a/src/types/wantlist/index.js b/src/types/wantlist/index.js index 3459c8e3..f24682d0 100644 --- a/src/types/wantlist/index.js +++ b/src/types/wantlist/index.js @@ -23,7 +23,7 @@ class Wantlist { } else { this.set.set(cidStr, new Entry(cid, priority)) if (this._stats) { - this._stats.push('wantListSize', 1) + this._stats.push(null, 'wantListSize', 1) } } } @@ -45,7 +45,7 @@ class Wantlist { this.set.delete(cidStr) if (this._stats) { - this._stats.push('wantListSize', -1) + this._stats.push(null, 'wantListSize', -1) } } diff --git a/test/bitswap-stats.js b/test/bitswap-stats.js index bdba8e62..bd459118 100644 --- a/test/bitswap-stats.js +++ b/test/bitswap-stats.js @@ -240,5 +240,27 @@ describe('bitswap stats', () => { finish() }) }) + + it('has peer stats', (done) => { + const peerIds = libp2pNodes.map((node) => node.peerInfo.id.toB58String()) + const peerStats = bs2.stat().forPeer(peerIds[0]) + peerStats.once('update', (stats) => { + expect(stats.blocksReceived.eq(1)).to.be.true() + expect(stats.dataReceived.eq(48)).to.be.true() + expect(stats.dupBlksReceived.eq(0)).to.be.true() + expect(stats.dupDataReceived.eq(0)).to.be.true() + expect(stats.blocksSent.eq(0)).to.be.true() + expect(stats.dataSent.eq(0)).to.be.true() + expect(stats.providesBufferLength.eq(0)).to.be.true() + expect(stats.wantListLength.eq(0)).to.be.true() + expect(stats.peerCount.eq(1)).to.be.true() + + const ma = peerStats.movingAverages.dataReceived[60000] + expect(ma.movingAverage()).to.be.above(0) + expect(ma.variance()).to.be.above(0) + + done() + }) + }) }) }) From b3490852d53ac309abf5aeb202109e977f24ae84 Mon Sep 17 00:00:00 2001 From: Pedro Teixeira Date: Tue, 6 Feb 2018 11:22:29 +0100 Subject: [PATCH 19/24] feat: added getMany performance tests (#164) --- .gitignore | 2 + README.md | 29 +++++ package.json | 6 +- test/benchmarks/get-many.js | 12 +++ .../benchmarks/helpers/print-swarm-results.js | 36 +++++++ test/swarms.js | 59 +++++++++- test/utils/distribution-test.js | 102 +++++++++++------- 7 files changed, 203 insertions(+), 43 deletions(-) create mode 100644 test/benchmarks/get-many.js create mode 100644 test/benchmarks/helpers/print-swarm-results.js diff --git a/.gitignore b/.gitignore index 9547fedb..d23420d9 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,5 @@ test/test-repo-for* docs test/test-repo/datastore + +*.flamegraph \ No newline at end of file diff --git a/README.md b/README.md index 25758ebd..49f9f1b5 100644 --- a/README.md +++ b/README.md @@ -171,6 +171,35 @@ src └── index.js ``` +## Performance tests + +You can run performance tests like this: + +``` +$ npm run benchmarks +``` + +### Profiling + +You can run each of the individual performance tests with a profiler like 0x. + +To do that, you need to install 0x: + +```bash +$ npm install 0x --global +``` + +And then run the test: + +```bash +$ 0x test/benchmarks/get-many +``` + +This will output a flame graph and print the location for it. +Use the browser Chrome to open and inspect the generated graph. + +![Flame graph](https://ipfs.io/ipfs/QmVbyLgYfkLewNtzTAFwAEMmP2hTJgs8sSqsRTBNBjyQ1y) + ## Contribute Feel free to join in. All welcome. Open an [issue](https://github.com/ipfs/js-ipfs-bitswap/issues)! diff --git a/package.json b/package.json index 2f6e73ea..3d1bdcb1 100644 --- a/package.json +++ b/package.json @@ -18,7 +18,8 @@ "bench": "node benchmarks/index", "build": "aegir build", "coverage": "aegir coverage --provider codecov", - "docs": "aegir docs" + "docs": "aegir docs", + "benchmarks": "node test/benchmarks/get-many" }, "repository": { "type": "git", @@ -57,7 +58,8 @@ "peer-info": "~0.11.4", "pre-commit": "^1.2.2", "rimraf": "^2.6.2", - "safe-buffer": "^5.1.1" + "safe-buffer": "^5.1.1", + "stats-lite": "^2.1.0" }, "dependencies": { "async": "^2.6.0", diff --git a/test/benchmarks/get-many.js b/test/benchmarks/get-many.js new file mode 100644 index 00000000..40aa1c8d --- /dev/null +++ b/test/benchmarks/get-many.js @@ -0,0 +1,12 @@ +'use strict' + +const distributionTest = require('../utils/distribution-test') +const print = require('./helpers/print-swarm-results') + +print('10 nodes, 10 blocks, 5 iterations', distributionTest(10, 10, 5, (err) => { + if (err) { + throw err + } + + console.log('Finished. Can kill now...') +})) diff --git a/test/benchmarks/helpers/print-swarm-results.js b/test/benchmarks/helpers/print-swarm-results.js new file mode 100644 index 00000000..60ea7b1b --- /dev/null +++ b/test/benchmarks/helpers/print-swarm-results.js @@ -0,0 +1,36 @@ +'use strict' + +const stats = require('stats-lite') + +module.exports = (suite, emitter) => { + const elapseds = [] + emitter.once('start', () => { + console.log('\n------------------------') + console.log(suite) + console.log('started') + }) + emitter.once('all connected', () => { + console.log('all nodes connected to each other') + }) + emitter.on('getting many', () => { + process.stdout.write('.') + }) + emitter.once('stop', () => { + console.log('\nstopping') + }) + emitter.once('stopped', () => { + console.log('stopped') + console.log('stats:') + console.log('---------') + console.log('mean: %s', stats.mean(elapseds)) + console.log('median: %s', stats.median(elapseds)) + console.log('variance: %s', stats.variance(elapseds)) + console.log('standard deviation: %s', stats.stdev(elapseds)) + console.log('85th percentile: %s', stats.percentile(elapseds, 0.85)) + }) + + emitter.on('got block', (elapsed) => { + process.stdout.write('+') + elapseds.push(elapsed) + }) +} diff --git a/test/swarms.js b/test/swarms.js index b88f511f..074b90c1 100644 --- a/test/swarms.js +++ b/test/swarms.js @@ -2,32 +2,81 @@ /* eslint-env mocha */ +const stats = require('stats-lite') const distributionTest = require('./utils/distribution-test') const test = it describe('swarms', () => { + const print = Boolean(process.env.PRINT) + + after(() => { + process.exit() + }) + test('2 nodes, 2 blocks', function (done) { this.timeout(10 * 1000) - distributionTest(2, 2, done) + maybePrint('2 nodes, 2 blocks', distributionTest(2, 2, done)) }) test('10 nodes, 2 blocks', function (done) { this.timeout(30 * 1000) - distributionTest(10, 2, done) + maybePrint('10 nodes, 2 blocks', distributionTest(10, 2, done)) + }) + + test.only('10 nodes, 10 blocks', function (done) { + this.timeout(30 * 1000) + maybePrint('10 nodes, 10 blocks', distributionTest(10, 10, 1, done)) + }) + + test('10 nodes, 20 blocks', function (done) { + this.timeout(30 * 1000) + maybePrint('10 nodes, 20 blocks', distributionTest(10, 20, done)) }) test('50 nodes, 2 blocks', function (done) { this.timeout(600 * 1000) - distributionTest(50, 2, done) + maybePrint('50 nodes, 2 blocks', distributionTest(50, 2, done)) }) test.skip('100 nodes, 2 blocks', function (done) { this.timeout(600 * 1000) - distributionTest(100, 2, done) + maybePrint('100 nodes, 2 blocks', distributionTest(100, 2, done)) }) test('10 nodes, 100 blocks', function (done) { this.timeout(600 * 1000) - distributionTest(10, 100, done) + maybePrint('10 nodes, 100 blocks', distributionTest(10, 100, done)) }) + + function maybePrint (suite, emitter) { + if (!print) { + return + } + const elapseds = [] + emitter.once('start', () => { + console.log('\n------------------------') + console.log(suite) + console.log('started') + }) + emitter.once('all connected', () => { + console.log('all nodes connected to each other') + }) + emitter.once('stop', () => { + console.log('stopping') + }) + emitter.once('stopped', () => { + console.log('stopped') + console.log('stats:') + console.log('---------') + console.log('mean: %s', stats.mean(elapseds)) + console.log('median: %s', stats.median(elapseds)) + console.log('variance: %s', stats.variance(elapseds)) + console.log('standard deviation: %s', stats.stdev(elapseds)) + console.log('85th percentile: %s', stats.percentile(elapseds, 0.85)) + }) + + emitter.on('got block', (elapsed) => { + elapseds.push(elapsed) + }) + } }) diff --git a/test/utils/distribution-test.js b/test/utils/distribution-test.js index dc9efc59..3f27fc36 100644 --- a/test/utils/distribution-test.js +++ b/test/utils/distribution-test.js @@ -3,58 +3,86 @@ const range = require('lodash.range') const map = require('async/map') const each = require('async/each') -const parallel = require('async/parallel') +const whilst = require('async/whilst') const series = require('async/series') const waterfall = require('async/waterfall') const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect +const EventEmitter = require('events') const createBitswap = require('./create-bitswap') const makeBlock = require('./make-block') const connectAll = require('./connect-all') -module.exports = (instanceCount, blockCount, callback) => { +module.exports = (instanceCount, blockCount, repeats, callback) => { + let pendingRepeats = repeats let nodes - let blocks + const events = new EventEmitter() waterfall([ - (cb) => parallel( - { - nodes: (cb) => map(range(instanceCount), (_, cb) => createBitswap(cb), cb), - blocks: (cb) => map(range(blockCount), (_, cb) => makeBlock(cb), cb) - }, - cb), - (results, cb) => { - nodes = results.nodes - blocks = results.blocks - const first = nodes[0] - - parallel([ - (cb) => connectAll(results.nodes, cb), - (cb) => each(results.blocks, first.bitswap.put.bind(first.bitswap), cb) - ], cb) + (cb) => map(range(instanceCount), (_, cb) => createBitswap(cb), cb), + (_nodes, cb) => { + nodes = _nodes + events.emit('start') + cb() }, - (results, cb) => { - const cids = blocks.map((block) => block.cid) - map(nodes, (node, cb) => node.bitswap.getMany(cids, cb), cb) + (cb) => { + connectAll(nodes, cb) }, - (results, cb) => { - try { - expect(results).have.lengthOf(instanceCount) - results.forEach((nodeBlocks) => { - expect(nodeBlocks).to.have.lengthOf(blocks.length) - nodeBlocks.forEach((block, i) => { - expect(block.data).to.deep.equal(blocks[i].data) - }) - }) - } catch (err) { - return cb(err) - } - cb() + (cb) => { + events.emit('all connected') + whilst(() => pendingRepeats > 0, (cb) => { + const first = nodes[0] + let blocks + waterfall([ + (cb) => map(range(blockCount), (_, cb) => makeBlock(cb), cb), + (_blocks, cb) => { + blocks = _blocks + cb() + }, + (cb) => each(blocks, first.bitswap.put.bind(first.bitswap), (err) => { + events.emit('first put') + cb(err) + }), + (cb) => map(nodes, (node, cb) => { + events.emit('getting many') + const cids = blocks.map((block) => block.cid) + const start = Date.now() + node.bitswap.getMany(cids, (err, result) => { + if (err) { + cb(err) + } else { + const elapsed = Date.now() - start + events.emit('got block', elapsed) + cb(null, result) + } + }) + }, cb), + (results, cb) => { + try { + expect(results).have.lengthOf(instanceCount) + results.forEach((nodeBlocks) => { + expect(nodeBlocks).to.have.lengthOf(blocks.length) + nodeBlocks.forEach((block, i) => { + expect(block.data).to.deep.equal(blocks[i].data) + }) + }) + } catch (err) { + return cb(err) + } + cb() + }, + (cb) => { + pendingRepeats-- + cb() + } + ], cb) + }, cb) } ], (err) => { + events.emit('stop') each( nodes, (node, cb) => { @@ -64,12 +92,14 @@ module.exports = (instanceCount, blockCount, callback) => { (cb) => node.libp2pNode.stop(cb), (cb) => node.repo.teardown(cb) ], - cb - ) + cb) }, (err2) => { + events.emit('stopped') callback(err) } ) }) + + return events } From 89b263a695e2d592220fcb9807808d225e3a6576 Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 6 Feb 2018 10:23:59 +0000 Subject: [PATCH 20/24] chore: update deps --- package.json | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/package.json b/package.json index 3d1bdcb1..77ae4aee 100644 --- a/package.json +++ b/package.json @@ -38,24 +38,24 @@ }, "homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme", "devDependencies": { - "aegir": "^12.2.0", + "aegir": "^12.4.0", "benchmark": "^2.1.4", "chai": "^4.1.2", "dirty-chai": "^2.0.1", - "ipfs-repo": "~0.18.5", - "libp2p": "~0.14.3", - "libp2p-kad-dht": "~0.6.0", + "ipfs-repo": "~0.18.7", + "libp2p": "~0.15.2", + "libp2p-kad-dht": "~0.6.3", "libp2p-multiplex": "~0.5.1", - "libp2p-secio": "~0.8.1", - "libp2p-tcp": "~0.11.1", - "lodash": "^4.17.4", + "libp2p-secio": "~0.9.1", + "libp2p-tcp": "~0.11.2", + "lodash": "^4.17.5", "lodash.range": "^3.2.0", "lodash.without": "^4.4.0", - "multiaddr": "^3.0.1", + "multiaddr": "^3.0.2", "ncp": "^2.0.0", - "peer-book": "~0.5.2", - "peer-id": "~0.10.3", - "peer-info": "~0.11.4", + "peer-book": "~0.5.4", + "peer-id": "~0.10.5", + "peer-info": "~0.11.6", "pre-commit": "^1.2.2", "rimraf": "^2.6.2", "safe-buffer": "^5.1.1", @@ -77,12 +77,12 @@ "lodash.uniqwith": "^4.5.0", "lodash.values": "^4.3.0", "moving-average": "^1.0.0", - "multicodec": "~0.2.5", + "multicodec": "~0.2.6", "multihashing-async": "~0.4.7", "protons": "^1.0.1", "pull-defer": "~0.2.2", "pull-length-prefixed": "^1.3.0", - "pull-pushable": "^2.1.1", + "pull-pushable": "^2.1.2", "pull-stream": "^3.6.1", "safe-buffer": "^5.1.1", "varint-decoder": "^0.1.1" From 1fc73b40b1521a19605f2aa802df4fe6aefafe97 Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 6 Feb 2018 10:38:07 +0000 Subject: [PATCH 21/24] chore: update contributors From f0905906a3fe26105699a8342f8fd89f7d9cbe32 Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 6 Feb 2018 10:38:07 +0000 Subject: [PATCH 22/24] chore: release version v0.18.1 --- CHANGELOG.md | 16 ++++++++++++++++ package.json | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2eb8bea1..c3bd1577 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,19 @@ + +## [0.18.1](https://github.com/ipfs/js-ipfs-bitswap/compare/v0.18.0...v0.18.1) (2018-02-06) + + +### Bug Fixes + +* getMany: ensuring we set the want list ([#162](https://github.com/ipfs/js-ipfs-bitswap/issues/162)) ([8e91def](https://github.com/ipfs/js-ipfs-bitswap/commit/8e91def)) + + +### Features + +* added getMany performance tests ([#164](https://github.com/ipfs/js-ipfs-bitswap/issues/164)) ([b349085](https://github.com/ipfs/js-ipfs-bitswap/commit/b349085)) +* per-peer stats ([#166](https://github.com/ipfs/js-ipfs-bitswap/issues/166)) ([ff978d0](https://github.com/ipfs/js-ipfs-bitswap/commit/ff978d0)) + + + # [0.18.0](https://github.com/ipfs/js-ipfs-bitswap/compare/v0.17.4...v0.18.0) (2017-12-15) diff --git a/package.json b/package.json index 77ae4aee..921d387f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ipfs-bitswap", - "version": "0.18.0", + "version": "0.18.1", "description": "Node.js implementation of the Bitswap data exchange protocol used by IPFS", "main": "src/index.js", "browser": { From 37f19cfe9ac4d1047d14aac8868dcb23e82b0c65 Mon Sep 17 00:00:00 2001 From: Yahya Date: Wed, 27 Sep 2017 20:10:16 +0200 Subject: [PATCH 23/24] feature: ability to override default bitswap max message size License: MIT Signed-off-by: Yahya --- src/decision-engine/index.js | 11 ++++++----- src/index.js | 5 +++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/decision-engine/index.js b/src/decision-engine/index.js index ffd0827b..42bed88c 100644 --- a/src/decision-engine/index.js +++ b/src/decision-engine/index.js @@ -18,10 +18,8 @@ const Wantlist = require('../types/wantlist') const Ledger = require('./ledger') const logger = require('../utils').logger -const MAX_MESSAGE_SIZE = 512 * 1024 - class DecisionEngine { - constructor (peerId, blockstore, network, stats) { + constructor (peerId, blockstore, network, stats, options) { this._log = logger(peerId, 'engine') this.blockstore = blockstore this.network = network @@ -34,6 +32,9 @@ class DecisionEngine { // List of tasks to be processed this._tasks = [] + options = options || {} + this._maxMessageSize = options.maxMessageSize + this._outbox = debounce(this._processTasks.bind(this), 100) } @@ -43,7 +44,7 @@ class DecisionEngine { return acc + b.data.byteLength }, 0) - if (total < MAX_MESSAGE_SIZE) { + if (total < this._maxMessageSize) { return this._sendSafeBlocks(peer, blocks, cb) } @@ -56,7 +57,7 @@ class DecisionEngine { batch.push(b) size += b.data.byteLength - if (size >= MAX_MESSAGE_SIZE || + if (size >= this._maxMessageSize || // need to ensure the last remaining items get sent outstanding === 0) { const nextBatch = batch.slice() diff --git a/src/index.js b/src/index.js index cee77cb8..8546adcf 100644 --- a/src/index.js +++ b/src/index.js @@ -16,7 +16,8 @@ const Stats = require('./stats') const defaultOptions = { statsEnabled: false, statsComputeThrottleTimeout: 1000, - statsComputeThrottleMaxQueueSize: 1000 + statsComputeThrottleMaxQueueSize: 1000, + maxMessageSize: 512 * 1024 } const statsKeys = [ 'blocksReceived', @@ -57,7 +58,7 @@ class Bitswap { // local database this.blockstore = blockstore - this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network, this._stats) + this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network, this._stats, this._options) // handle message sending this.wm = new WantManager(this.peerInfo.id, this.network, this._stats) From 114c4cf0fc0ff4b6519539286061598f109c622e Mon Sep 17 00:00:00 2001 From: Yahya Date: Thu, 12 Oct 2017 18:30:33 +0200 Subject: [PATCH 24/24] setting a minimum msg size, tests for go-interop and variable msg size nodes License: MIT Signed-off-by: Yahya --- src/decision-engine/index.js | 8 ++++- test/bitswap.js | 69 +++++++++++++++++++++++++++++------- test/types/message.spec.js | 2 ++ 3 files changed, 66 insertions(+), 13 deletions(-) diff --git a/src/decision-engine/index.js b/src/decision-engine/index.js index 42bed88c..9a1306fc 100644 --- a/src/decision-engine/index.js +++ b/src/decision-engine/index.js @@ -18,6 +18,8 @@ const Wantlist = require('../types/wantlist') const Ledger = require('./ledger') const logger = require('../utils').logger +const MAX_MESSAGE_SIZE = 512 * 1024 + class DecisionEngine { constructor (peerId, blockstore, network, stats, options) { this._log = logger(peerId, 'engine') @@ -33,7 +35,11 @@ class DecisionEngine { this._tasks = [] options = options || {} - this._maxMessageSize = options.maxMessageSize + this._maxMessageSize = options.maxMessageSize || MAX_MESSAGE_SIZE + // make sure the maxMessageSize is at least 1024 + if (this._maxMessageSize < 1024) { + this._maxMessageSize = 1024 + } this._outbox = debounce(this._processTasks.bind(this), 100) } diff --git a/test/bitswap.js b/test/bitswap.js index f3b21ceb..1e3d1786 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -18,8 +18,15 @@ const createLibp2pNode = require('./utils/create-libp2p-node') const makeBlock = require('./utils/make-block') const orderedFinish = require('./utils/helpers').orderedFinish +const MAX_MESSAGE_SIZE = 512 * 1024 + // Creates a repo + libp2pNode + Bitswap with or without DHT -function createThing (dht, callback) { +function createThing (dht, msgSize, callback) { + if (!callback) { + callback = msgSize + msgSize = MAX_MESSAGE_SIZE + } + waterfall([ (cb) => createTempRepo(cb), (repo, cb) => { @@ -28,7 +35,7 @@ function createThing (dht, callback) { }, (err, node) => cb(err, repo, node)) }, (repo, libp2pNode, cb) => { - const bitswap = new Bitswap(libp2pNode, repo.blocks) + const bitswap = new Bitswap(libp2pNode, repo.blocks, {maxMessageSize: msgSize}) bitswap.start((err) => cb(err, repo, libp2pNode, bitswap)) } ], (err, repo, libp2pNode, bitswap) => { @@ -49,12 +56,13 @@ describe('bitswap without DHT', function () { before((done) => { parallel([ - (cb) => createThing(false, cb), - (cb) => createThing(false, cb), - (cb) => createThing(false, cb) + (cb) => createThing(false, MAX_MESSAGE_SIZE, cb), + (cb) => createThing(false, MAX_MESSAGE_SIZE, cb), + (cb) => createThing(false, MAX_MESSAGE_SIZE, cb), + (cb) => createThing(false, 32, cb) ], (err, results) => { expect(err).to.not.exist() - expect(results).to.have.length(3) + expect(results).to.have.length(4) nodes = results done() }) @@ -77,6 +85,13 @@ describe('bitswap without DHT', function () { ], done) }) + it('connect 1 -> 3 && 2 -> 3', (done) => { + parallel([ + (cb) => nodes[1].libp2pNode.dial(nodes[3].libp2pNode.peerInfo, cb), + (cb) => nodes[2].libp2pNode.dial(nodes[3].libp2pNode.peerInfo, cb) + ], done) + }) + it('put a block in 2, fail to get it in 0', (done) => { const finish = orderedFinish(2, done) @@ -106,12 +121,13 @@ describe('bitswap with DHT', function () { before((done) => { parallel([ - (cb) => createThing(true, cb), - (cb) => createThing(true, cb), - (cb) => createThing(true, cb) + (cb) => createThing(true, MAX_MESSAGE_SIZE, cb), + (cb) => createThing(true, MAX_MESSAGE_SIZE, cb), + (cb) => createThing(true, MAX_MESSAGE_SIZE, cb), + (cb) => createThing(true, 32, cb) ], (err, results) => { expect(err).to.not.exist() - expect(results).to.have.length(3) + expect(results).to.have.length(4) nodes = results done() }) @@ -127,10 +143,11 @@ describe('bitswap with DHT', function () { }, done) }) - it('connect 0 -> 1 && 1 -> 2', (done) => { + it('connect 0 -> 1 && 1 -> 2 && 2 -> 3', (done) => { parallel([ (cb) => nodes[0].libp2pNode.dial(nodes[1].libp2pNode.peerInfo, cb), - (cb) => nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo, cb) + (cb) => nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo, cb), + (cb) => nodes[2].libp2pNode.dial(nodes[3].libp2pNode.peerInfo, cb) ], done) }) @@ -147,4 +164,32 @@ describe('bitswap with DHT', function () { }) ], done) }) + + it('put a block in 2, get it in 3', (done) => { + waterfall([ + (cb) => makeBlock(cb), + (block, cb) => nodes[2].bitswap.put(block, () => cb(null, block)), + (block, cb) => setTimeout(() => cb(null, block), 400), + (block, cb) => nodes[3].bitswap.get(block.cid, (err, blockRetrieved) => { + expect(err).to.not.exist() + expect(block.data).to.eql(blockRetrieved.data) + expect(block.cid).to.eql(blockRetrieved.cid) + cb() + }) + ], done) + }) + + it('put a block in 3, get it in 2', (done) => { + waterfall([ + (cb) => makeBlock(cb), + (block, cb) => nodes[3].bitswap.put(block, () => cb(null, block)), + (block, cb) => setTimeout(() => cb(null, block), 400), + (block, cb) => nodes[2].bitswap.get(block.cid, (err, blockRetrieved) => { + expect(err).to.not.exist() + expect(block.data).to.eql(blockRetrieved.data) + expect(block.cid).to.eql(blockRetrieved.cid) + cb() + }) + ], done) + }) }) diff --git a/test/types/message.spec.js b/test/types/message.spec.js index 9e155f70..b4c4817b 100644 --- a/test/types/message.spec.js +++ b/test/types/message.spec.js @@ -266,6 +266,7 @@ describe('BitswapMessage', () => { expect(err).to.not.exist() // TODO // check the deserialised message + expect(message.serializeToBitswap110()).to.eql(rawMessageFullWantlist) done() }) }) @@ -275,6 +276,7 @@ describe('BitswapMessage', () => { expect(err).to.not.exist() // TODO // check the deserialised message + expect(message.serializeToBitswap110()).to.eql(rawMessageOneBlock) done() }) })