diff --git a/.gitignore b/.gitignore
index 9547fedb..d23420d9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,3 +36,5 @@ test/test-repo-for*
docs
test/test-repo/datastore
+
+*.flamegraph
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
index 584f308f..5102ee5f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,3 +1,4 @@
+# Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories.
sudo: false
language: node_js
@@ -14,7 +15,6 @@ script:
- npm run lint
- npm run test
- npm run coverage
- - make test
before_script:
- export DISPLAY=:99.0
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1de261a9..c3bd1577 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,49 @@
+
+## [0.18.1](https://github.com/ipfs/js-ipfs-bitswap/compare/v0.18.0...v0.18.1) (2018-02-06)
+
+
+### Bug Fixes
+
+* getMany: ensuring we set the want list ([#162](https://github.com/ipfs/js-ipfs-bitswap/issues/162)) ([8e91def](https://github.com/ipfs/js-ipfs-bitswap/commit/8e91def))
+
+
+### Features
+
+* added getMany performance tests ([#164](https://github.com/ipfs/js-ipfs-bitswap/issues/164)) ([b349085](https://github.com/ipfs/js-ipfs-bitswap/commit/b349085))
+* per-peer stats ([#166](https://github.com/ipfs/js-ipfs-bitswap/issues/166)) ([ff978d0](https://github.com/ipfs/js-ipfs-bitswap/commit/ff978d0))
+
+
+
+
+# [0.18.0](https://github.com/ipfs/js-ipfs-bitswap/compare/v0.17.4...v0.18.0) (2017-12-15)
+
+
+### Features
+
+* stats improvements ([#158](https://github.com/ipfs/js-ipfs-bitswap/issues/158)) ([17e15d0](https://github.com/ipfs/js-ipfs-bitswap/commit/17e15d0))
+
+
+
+
+## [0.17.4](https://github.com/ipfs/js-ipfs-bitswap/compare/v0.17.3...v0.17.4) (2017-11-10)
+
+
+### Features
+
+* windows interop ([#154](https://github.com/ipfs/js-ipfs-bitswap/issues/154)) ([a8b1e07](https://github.com/ipfs/js-ipfs-bitswap/commit/a8b1e07))
+
+
+
+
+## [0.17.3](https://github.com/ipfs/js-ipfs-bitswap/compare/v0.17.2...v0.17.3) (2017-11-08)
+
+
+### Bug Fixes
+
+* add missing multicodec dependency ([#155](https://github.com/ipfs/js-ipfs-bitswap/issues/155)) ([751d436](https://github.com/ipfs/js-ipfs-bitswap/commit/751d436))
+
+
+
## [0.17.2](https://github.com/ipfs/js-ipfs-bitswap/compare/v0.17.1...v0.17.2) (2017-09-07)
diff --git a/README.md b/README.md
index 53dc3449..49f9f1b5 100644
--- a/README.md
+++ b/README.md
@@ -54,6 +54,94 @@ Loading this module through a script tag will make the `IpfsBitswap` object avai
See https://ipfs.github.io/js-ipfs-bitswap
+### Stats
+
+```js
+const bitswapNode = // ...
+
+const stats = bitswapNode.stat()
+```
+
+Stats contains a snapshot accessor, a moving average acessor and a peer accessor.
+
+Besides that, it emits "update" events every time it is updated.
+
+```js
+stats.on('update', (stats) => {
+ console.log('latest stats snapshot: %j', stats)
+})
+```
+
+#### Peer accessor:
+
+You can get the stats for a specific peer by doing:
+
+```js
+const peerStats = stats.forPeer(peerId)
+```
+
+The returned object behaves like the root stats accessor (has a snapshot, a moving average accessors and is an event emitter).
+
+#### Global snapshot accessor:
+
+```js
+const snapshot = stats.snapshot
+console.log('stats: %j', snapshot)
+```
+
+the snapshot will contain the following keys, with the values being [Big.js](https://github.com/MikeMcl/big.js#readme) instances:
+
+```js
+// stats: {
+// "dataReceived":"96",
+// "blocksReceived":"2",
+// "dataReceived":"96",
+// "dupBlksReceived":"0",
+// "dupDataReceived":"0",
+// "blocksSent":"0",
+// "dataSent":"0",
+// "providesBufferLength":"0",
+// "wantListLength":"0",
+// "peerCount":"1"
+// }
+```
+
+#### Moving average accessor:
+
+```js
+const movingAverages = stats.movingAverages
+```
+
+This object contains these properties:
+
+* 'blocksReceived',
+* 'dataReceived',
+* 'dupBlksReceived',
+* 'dupDataReceived',
+* 'blocksSent',
+* 'dataSent',
+* 'providesBufferLength',
+* 'wantListLength',
+* 'peerCount'
+
+```js
+const dataReceivedMovingAverages = movingAverages.dataReceived
+```
+
+Each one of these will contain one key per interval (miliseconds), being the default intervals defined:
+
+* 60000 (1 minute)
+* 300000 (5 minutes)
+* 900000 (15 minutes)
+
+You can then select one of them
+
+```js
+const oneMinuteDataReceivedMovingAverages = dataReceivedMovingAverages[60000]
+```
+
+This object will be a [movingAverage](https://github.com/pgte/moving-average#readme) instance.
+
## Development
### Structure
@@ -83,6 +171,35 @@ src
└── index.js
```
+## Performance tests
+
+You can run performance tests like this:
+
+```
+$ npm run benchmarks
+```
+
+### Profiling
+
+You can run each of the individual performance tests with a profiler like 0x.
+
+To do that, you need to install 0x:
+
+```bash
+$ npm install 0x --global
+```
+
+And then run the test:
+
+```bash
+$ 0x test/benchmarks/get-many
+```
+
+This will output a flame graph and print the location for it.
+Use the browser Chrome to open and inspect the generated graph.
+
+
+
## Contribute
Feel free to join in. All welcome. Open an [issue](https://github.com/ipfs/js-ipfs-bitswap/issues)!
diff --git a/appveyor.yml b/appveyor.yml
new file mode 100644
index 00000000..046bf910
--- /dev/null
+++ b/appveyor.yml
@@ -0,0 +1,29 @@
+# Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories.
+version: "{build}"
+
+environment:
+ matrix:
+ - nodejs_version: "6"
+ - nodejs_version: "8"
+
+matrix:
+ fast_finish: true
+
+install:
+ # Install Node.js
+ - ps: Install-Product node $env:nodejs_version
+
+ # Upgrade npm
+ - npm install -g npm
+
+ # Output our current versions for debugging
+ - node --version
+ - npm --version
+
+ # Install our package dependencies
+ - npm install
+
+test_script:
+ - npm run test:node
+
+build: off
diff --git a/ci/Jenkinsfile b/ci/Jenkinsfile
new file mode 100644
index 00000000..a7da2e54
--- /dev/null
+++ b/ci/Jenkinsfile
@@ -0,0 +1,2 @@
+// Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories.
+javascript()
diff --git a/circle.yml b/circle.yml
new file mode 100644
index 00000000..00096937
--- /dev/null
+++ b/circle.yml
@@ -0,0 +1,15 @@
+# Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories.
+machine:
+ node:
+ version: stable
+
+dependencies:
+ pre:
+ - google-chrome --version
+ - curl -L -o google-chrome.deb https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
+ - sudo dpkg -i google-chrome.deb || true
+ - sudo apt-get update
+ - sudo apt-get install -f
+ - sudo apt-get install --only-upgrade lsb-base
+ - sudo dpkg -i google-chrome.deb
+ - google-chrome --version
diff --git a/package.json b/package.json
index 96715309..921d387f 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "ipfs-bitswap",
- "version": "0.17.2",
+ "version": "0.18.1",
"description": "Node.js implementation of the Bitswap data exchange protocol used by IPFS",
"main": "src/index.js",
"browser": {
@@ -8,17 +8,18 @@
"./test/utils/create-temp-repo-nodejs.js": "./test/utils/create-temp-repo-browser.js"
},
"scripts": {
- "test": "aegir test --target node --target browser",
- "test:browser": "aegir test --target browser",
- "test:node": "aegir test --target node",
+ "test": "aegir test -t node -t browser",
+ "test:browser": "aegir test -t browser",
+ "test:node": "aegir test -t node",
"lint": "aegir lint",
"release": "aegir release",
"release-minor": "aegir release --type minor",
"release-major": "aegir release --type major",
"bench": "node benchmarks/index",
"build": "aegir build",
- "coverage": "aegir coverage -u",
- "docs": "aegir docs"
+ "coverage": "aegir coverage --provider codecov",
+ "docs": "aegir docs",
+ "benchmarks": "node test/benchmarks/get-many"
},
"repository": {
"type": "git",
@@ -37,30 +38,35 @@
},
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
- "aegir": "^12.0.6",
+ "aegir": "^12.4.0",
"benchmark": "^2.1.4",
"chai": "^4.1.2",
"dirty-chai": "^2.0.1",
- "ipfs-repo": "~0.17.0",
- "libp2p": "~0.12.3",
- "libp2p-kad-dht": "~0.5.0",
- "libp2p-multiplex": "~0.5.0",
- "libp2p-secio": "~0.8.1",
- "libp2p-tcp": "~0.11.0",
- "lodash": "^4.17.4",
- "multiaddr": "^3.0.1",
+ "ipfs-repo": "~0.18.7",
+ "libp2p": "~0.15.2",
+ "libp2p-kad-dht": "~0.6.3",
+ "libp2p-multiplex": "~0.5.1",
+ "libp2p-secio": "~0.9.1",
+ "libp2p-tcp": "~0.11.2",
+ "lodash": "^4.17.5",
+ "lodash.range": "^3.2.0",
+ "lodash.without": "^4.4.0",
+ "multiaddr": "^3.0.2",
"ncp": "^2.0.0",
- "peer-book": "~0.5.0",
- "peer-id": "~0.10.0",
- "peer-info": "~0.11.0",
- "rimraf": "^2.6.1",
- "safe-buffer": "^5.1.1"
+ "peer-book": "~0.5.4",
+ "peer-id": "~0.10.5",
+ "peer-info": "~0.11.6",
+ "pre-commit": "^1.2.2",
+ "rimraf": "^2.6.2",
+ "safe-buffer": "^5.1.1",
+ "stats-lite": "^2.1.0"
},
"dependencies": {
- "async": "^2.5.0",
- "cids": "~0.5.1",
- "debug": "^3.0.1",
- "ipfs-block": "~0.6.0",
+ "async": "^2.6.0",
+ "big.js": "^5.0.3",
+ "cids": "~0.5.2",
+ "debug": "^3.1.0",
+ "ipfs-block": "~0.6.1",
"lodash.debounce": "^4.0.8",
"lodash.find": "^4.6.0",
"lodash.groupby": "^4.6.0",
@@ -70,25 +76,33 @@
"lodash.sortby": "^4.7.0",
"lodash.uniqwith": "^4.5.0",
"lodash.values": "^4.3.0",
- "multihashing-async": "~0.4.6",
- "protons": "^1.0.0",
- "pull-defer": "^0.2.2",
+ "moving-average": "^1.0.0",
+ "multicodec": "~0.2.6",
+ "multihashing-async": "~0.4.7",
+ "protons": "^1.0.1",
+ "pull-defer": "~0.2.2",
"pull-length-prefixed": "^1.3.0",
- "pull-pushable": "^2.1.1",
- "pull-stream": "^3.6.0",
+ "pull-pushable": "^2.1.2",
+ "pull-stream": "^3.6.1",
"safe-buffer": "^5.1.1",
"varint-decoder": "^0.1.1"
},
+ "pre-commit": [
+ "lint",
+ "test"
+ ],
"contributors": [
"David Dias ",
"Dmitriy Ryajov ",
"Friedel Ziegelmayer ",
"Pedro Teixeira ",
"Richard Littauer ",
+ "Richard Schneider ",
"Stephen Whitmore ",
"dmitriy ryajov ",
"greenkeeper[bot] ",
"greenkeeperio-bot ",
- "npmcdn-to-unpkg-bot "
+ "npmcdn-to-unpkg-bot ",
+ "ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ "
]
}
diff --git a/src/decision-engine/index.js b/src/decision-engine/index.js
index ebd60346..9a1306fc 100644
--- a/src/decision-engine/index.js
+++ b/src/decision-engine/index.js
@@ -21,10 +21,11 @@ const logger = require('../utils').logger
const MAX_MESSAGE_SIZE = 512 * 1024
class DecisionEngine {
- constructor (peerId, blockstore, network) {
+ constructor (peerId, blockstore, network, stats, options) {
this._log = logger(peerId, 'engine')
this.blockstore = blockstore
this.network = network
+ this._stats = stats
// A list of of ledgers by their partner id
this.ledgerMap = new Map()
@@ -33,6 +34,13 @@ class DecisionEngine {
// List of tasks to be processed
this._tasks = []
+ options = options || {}
+ this._maxMessageSize = options.maxMessageSize || MAX_MESSAGE_SIZE
+ // make sure the maxMessageSize is at least 1024
+ if (this._maxMessageSize < 1024) {
+ this._maxMessageSize = 1024
+ }
+
this._outbox = debounce(this._processTasks.bind(this), 100)
}
@@ -42,7 +50,7 @@ class DecisionEngine {
return acc + b.data.byteLength
}, 0)
- if (total < MAX_MESSAGE_SIZE) {
+ if (total < this._maxMessageSize) {
return this._sendSafeBlocks(peer, blocks, cb)
}
@@ -55,7 +63,7 @@ class DecisionEngine {
batch.push(b)
size += b.data.byteLength
- if (size >= MAX_MESSAGE_SIZE ||
+ if (size >= this._maxMessageSize ||
// need to ensure the last remaining items get sent
outstanding === 0) {
const nextBatch = batch.slice()
@@ -267,6 +275,9 @@ class DecisionEngine {
const l = new Ledger(peerId)
this.ledgerMap.set(peerIdStr, l)
+ if (this._stats) {
+ this._stats.push(peerIdStr, 'peerCount', 1)
+ }
return l
}
diff --git a/src/index.js b/src/index.js
index f0e59be7..8546adcf 100644
--- a/src/index.js
+++ b/src/index.js
@@ -5,13 +5,31 @@ const reject = require('async/reject')
const each = require('async/each')
const series = require('async/series')
const map = require('async/map')
-const once = require('once')
const WantManager = require('./want-manager')
const Network = require('./network')
const DecisionEngine = require('./decision-engine')
const Notifications = require('./notifications')
const logger = require('./utils').logger
+const Stats = require('./stats')
+
+const defaultOptions = {
+ statsEnabled: false,
+ statsComputeThrottleTimeout: 1000,
+ statsComputeThrottleMaxQueueSize: 1000,
+ maxMessageSize: 512 * 1024
+}
+const statsKeys = [
+ 'blocksReceived',
+ 'dataReceived',
+ 'dupBlksReceived',
+ 'dupDataReceived',
+ 'blocksSent',
+ 'dataSent',
+ 'providesBufferLength',
+ 'wantListLength',
+ 'peerCount'
+]
/**
* JavaScript implementation of the Bitswap 'data exchange' protocol
@@ -21,24 +39,29 @@ const logger = require('./utils').logger
* @param {Blockstore} blockstore
*/
class Bitswap {
- constructor (libp2p, blockstore) {
+ constructor (libp2p, blockstore, options) {
this._libp2p = libp2p
this._log = logger(this.peerInfo.id)
+ this._options = Object.assign({}, defaultOptions, options)
+
+ // stats
+ this._stats = new Stats(statsKeys, {
+ enabled: this._options.statsEnabled,
+ computeThrottleTimeout: this._options.statsComputeThrottleTimeout,
+ computeThrottleMaxQueueSize: this._options.statsComputeThrottleMaxQueueSize
+ })
+
// the network delivers messages
- this.network = new Network(libp2p, this)
+ this.network = new Network(libp2p, this, {}, this._stats)
// local database
this.blockstore = blockstore
- this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network)
+ this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network, this._stats, this._options)
// handle message sending
- this.wm = new WantManager(this.peerInfo.id, this.network)
-
- this.blocksRecvd = 0
- this.dupBlocksRecvd = 0
- this.dupDataRecvd = 0
+ this.wm = new WantManager(this.peerInfo.id, this.network, this._stats)
this.notifications = new Notifications(this.peerInfo.id)
}
@@ -84,7 +107,7 @@ class Bitswap {
waterfall([
(cb) => this.blockstore.has(block.cid, cb),
(has, cb) => {
- this._updateReceiveCounters(block, has)
+ this._updateReceiveCounters(peerId.toB58String(), block, has)
if (has) {
return cb()
}
@@ -94,12 +117,13 @@ class Bitswap {
], callback)
}
- _updateReceiveCounters (block, exists) {
- this.blocksRecvd++
+ _updateReceiveCounters (peerId, block, exists) {
+ this._stats.push(peerId, 'blocksReceived', 1)
+ this._stats.push(peerId, 'dataReceived', block.data.length)
if (exists) {
- this.dupBlocksRecvd++
- this.dupDataRecvd += block.data.length
+ this._stats.push(peerId, 'dupBlksReceived', 1)
+ this._stats.push(peerId, 'dupDataReceived', block.data.length)
}
}
@@ -117,6 +141,7 @@ class Bitswap {
_onPeerDisconnected (peerId) {
this.wm.disconnected(peerId)
this.engine.peerDisconnected(peerId)
+ this._stats.disconnected(peerId)
}
_putBlock (block, callback) {
@@ -137,6 +162,14 @@ class Bitswap {
})
}
+ enableStats () {
+ this._stats.enable()
+ }
+
+ disableStats () {
+ this._stats.disable()
+ }
+
/**
* Return the current wantlist for a given `peerId`
*
@@ -179,78 +212,60 @@ class Bitswap {
* @returns {void}
*/
getMany (cids, callback) {
- const retrieved = []
- const locals = []
- const missing = []
- const canceled = []
-
- const finish = once(() => {
- map(locals, (cid, cb) => {
- this.blockstore.get(cid, cb)
- }, (err, localBlocks) => {
- if (err) {
- return callback(err)
+ let pendingStart = cids.length
+ const wantList = []
+ let promptedNetwork = false
+
+ const getFromOutside = (cid, cb) => {
+ wantList.push(cid)
+
+ this.notifications.wantBlock(
+ cid,
+ // called on block receive
+ (block) => {
+ this.wm.cancelWants([cid])
+ cb(null, block)
+ },
+ // called on unwant
+ () => {
+ this.wm.cancelWants([cid])
+ cb(null, undefined)
}
+ )
- callback(null, localBlocks.concat(retrieved))
- })
- })
-
- this._log('getMany', cids.length)
-
- const addListeners = (cids) => {
- cids.forEach((cid) => {
- this.notifications.wantBlock(
- cid,
- // called on block receive
- (block) => {
- this.wm.cancelWants([cid])
- retrieved.push(block)
-
- if (retrieved.length === missing.length) {
- finish()
- }
- },
- // called on unwant
- () => {
- this.wm.cancelWants([cid])
- canceled.push(cid)
- if (canceled.length + retrieved.length === missing.length) {
- finish()
- }
- }
- )
- })
+ if (!pendingStart) {
+ this.wm.wantBlocks(wantList)
+ }
}
- each(cids, (cid, cb) => {
- this.blockstore.has(cid, (err, has) => {
- if (err) {
- return cb(err)
- }
-
- if (has) {
- locals.push(cid)
- } else {
- missing.push(cid)
- }
- cb()
- })
- }, () => {
- if (missing.length === 0) {
- // already finished
- finish()
- }
+ map(cids, (cid, cb) => {
+ waterfall(
+ [
+ (cb) => this.blockstore.has(cid, cb),
+ (has, cb) => {
+ pendingStart--
+ if (has) {
+ if (!pendingStart) {
+ this.wm.wantBlocks(wantList)
+ }
+ return this.blockstore.get(cid, cb)
+ }
- addListeners(missing)
- this.wm.wantBlocks(missing)
+ if (!promptedNetwork) {
+ promptedNetwork = true
+ this.network.findAndConnect(cids[0], (err) => {
+ if (err) {
+ this._log.error(err)
+ }
+ })
+ }
- this.network.findAndConnect(cids[0], (err) => {
- if (err) {
- this._log.error(err)
- }
- })
- })
+ // we don't have the block here
+ getFromOutside(cid, cb)
+ }
+ ],
+ cb)
+ }, callback)
}
// removes the given cids from the wantlist independent of any ref counts
@@ -329,25 +344,28 @@ class Bitswap {
/**
* Get the current list of wants.
*
- * @returns {Array}
+ * @returns {Iterator}
*/
getWantlist () {
return this.wm.wantlist.entries()
}
+ /**
+ * Get the current list of partners.
+ *
+ * @returns {Array}
+ */
+ peers () {
+ return this.engine.peers()
+ }
+
/**
* Get stats about the bitswap node.
*
* @returns {Object}
*/
stat () {
- return {
- wantlist: this.getWantlist(),
- blocksReceived: this.blocksRecvd,
- dupBlksReceived: this.dupBlocksRecvd,
- dupDataReceived: this.dupDataRecvd,
- peers: this.engine.peers()
- }
+ return this._stats
}
/**
@@ -373,6 +391,7 @@ class Bitswap {
* @returns {void}
*/
stop (callback) {
+ this._stats.stop()
series([
(cb) => this.wm.stop(cb),
(cb) => this.network.stop(cb),
diff --git a/src/network.js b/src/network.js
index 9ac85590..6b603a25 100644
--- a/src/network.js
+++ b/src/network.js
@@ -14,13 +14,14 @@ const BITSWAP100 = '/ipfs/bitswap/1.0.0'
const BITSWAP110 = '/ipfs/bitswap/1.1.0'
class Network {
- constructor (libp2p, bitswap, options) {
+ constructor (libp2p, bitswap, options, stats) {
this._log = logger(libp2p.peerInfo.id, 'network')
options = options || {}
this.libp2p = libp2p
this.bitswap = bitswap
this.b100Only = options.b100Only || false
+ this._stats = stats
this._running = false
}
@@ -149,6 +150,7 @@ class Network {
}
})
callback()
+ this._updateSentStats(peer, msg.blocks)
})
}
@@ -176,6 +178,14 @@ class Network {
callback(null, conn, BITSWAP110)
})
}
+
+ _updateSentStats (peer, blocks) {
+ const peerId = peer.toB58String()
+ if (this._stats) {
+ blocks.forEach((block) => this._stats.push(peerId, 'dataSent', block.data.length))
+ this._stats.push(peerId, 'blocksSent', blocks.size)
+ }
+ }
}
function writeMessage (conn, msg, callback) {
diff --git a/src/stats/index.js b/src/stats/index.js
new file mode 100644
index 00000000..b364784d
--- /dev/null
+++ b/src/stats/index.js
@@ -0,0 +1,99 @@
+'use strict'
+
+const EventEmitter = require('events')
+const Stat = require('./stat')
+
+const defaultOptions = {
+ movingAverageIntervals: [
+ 60 * 1000, // 1 minute
+ 5 * 60 * 1000, // 5 minutes
+ 15 * 60 * 1000 // 15 minutes
+ ]
+}
+
+class Stats extends EventEmitter {
+ constructor (initialCounters, _options) {
+ super()
+
+ const options = Object.assign({}, defaultOptions, _options)
+
+ if (typeof options.computeThrottleTimeout !== 'number') {
+ throw new Error('need computeThrottleTimeout')
+ }
+
+ if (typeof options.computeThrottleMaxQueueSize !== 'number') {
+ throw new Error('need computeThrottleMaxQueueSize')
+ }
+
+ this._initialCounters = initialCounters
+ this._options = options
+ this._enabled = this._options.enabled
+
+ this._global = new Stat(initialCounters, options)
+ this._global.on('update', (stats) => this.emit('update', stats))
+
+ this._peers = new Map()
+ }
+
+ enable () {
+ this._enabled = true
+ this._options.enabled = true
+ this._global.enable()
+ }
+
+ disable () {
+ this._enabled = false
+ this._options.enabled = false
+ this._global.disable()
+ }
+
+ stop () {
+ this._enabled = false
+ this._global.stop()
+ for (let peerStat of this._peers) {
+ peerStat[1].stop()
+ }
+ }
+
+ get snapshot () {
+ return this._global.snapshot
+ }
+
+ get movingAverages () {
+ return this._global.movingAverages
+ }
+
+ forPeer (peerId) {
+ if (peerId.toB58String) {
+ peerId = peerId.toB58String()
+ }
+ return this._peers.get(peerId)
+ }
+
+ push (peer, counter, inc) {
+ if (this._enabled) {
+ this._global.push(counter, inc)
+
+ if (peer) {
+ let peerStats = this._peers.get(peer)
+ if (!peerStats) {
+ peerStats = new Stat(this._initialCounters, this._options)
+ this._peers.set(peer, peerStats)
+ }
+
+ peerStats.push(counter, inc)
+ }
+ }
+ }
+
+ disconnected (peer) {
+ const peerId = peer.toB58String()
+ const peerStats = this._peers.get(peerId)
+ if (peerStats) {
+ peerStats.stop()
+ this._peers.delete(peerId)
+ }
+ }
+}
+
+module.exports = Stats
diff --git a/src/stats/stat.js b/src/stats/stat.js
new file mode 100644
index 00000000..66a22a71
--- /dev/null
+++ b/src/stats/stat.js
@@ -0,0 +1,142 @@
+'use strict'
+
+const EventEmitter = require('events')
+const Big = require('big.js')
+const MovingAverage = require('moving-average')
+
+class Stats extends EventEmitter {
+ constructor (initialCounters, options) {
+ super()
+
+ this._options = options
+ this._queue = []
+ this._stats = {}
+
+ this._frequencyLastTime = Date.now()
+ this._frequencyAccumulators = {}
+ this._movingAverages = {}
+
+ this._update = this._update.bind(this)
+
+ initialCounters.forEach((key) => {
+ this._stats[key] = Big(0)
+ this._movingAverages[key] = {}
+ this._options.movingAverageIntervals.forEach((interval) => {
+ const ma = this._movingAverages[key][interval] = MovingAverage(interval)
+ ma.push(this._frequencyLastTime, 0)
+ })
+ })
+
+ this._enabled = this._options.enabled
+ }
+
+ enable () {
+ this._enabled = true
+ }
+
+ disable () {
+ this._disabled = true
+ }
+
+ stop () {
+ if (this._timeout) {
+ clearTimeout(this._timeout)
+ }
+ }
+
+ get snapshot () {
+ return Object.assign({}, this._stats)
+ }
+
+ get movingAverages () {
+ return Object.assign({}, this._movingAverages)
+ }
+
+ push (counter, inc) {
+ if (this._enabled) {
+ this._queue.push([counter, inc, Date.now()])
+ this._resetComputeTimeout()
+ }
+ }
+
+ _resetComputeTimeout () {
+ if (this._timeout) {
+ clearTimeout(this._timeout)
+ }
+ this._timeout = setTimeout(this._update, this._nextTimeout())
+ }
+
+ _nextTimeout () {
+ // calculate the need for an update, depending on the queue length
+ const urgency = this._queue.length / this._options.computeThrottleMaxQueueSize
+ return Math.max(this._options.computeThrottleTimeout * (1 - urgency), 0)
+ }
+
+ _update () {
+ this._timeout = null
+ if (this._queue.length) {
+ let last
+ while (this._queue.length) {
+ const op = last = this._queue.shift()
+ this._applyOp(op)
+ }
+
+ this._updateFrequency(last[2]) // contains timestamp of last op
+
+ this.emit('update', this._stats)
+ }
+ }
+
+ _updateFrequency (latestTime) {
+ const timeDiff = latestTime - this._frequencyLastTime
+
+ Object.keys(this._stats).forEach((key) => {
+ this._updateFrequencyFor(key, timeDiff, latestTime)
+ })
+
+ this._frequencyLastTime = latestTime
+ }
+
+ _updateFrequencyFor (key, timeDiffMS, latestTime) {
+ const count = this._frequencyAccumulators[key] || 0
+ this._frequencyAccumulators[key] = 0
+ const hz = (count / timeDiffMS) * 1000
+
+ let movingAverages = this._movingAverages[key]
+ if (!movingAverages) {
+ movingAverages = this._movingAverages[key] = {}
+ }
+ this._options.movingAverageIntervals.forEach((movingAverageInterval) => {
+ let movingAverage = movingAverages[movingAverageInterval]
+ if (!movingAverage) {
+ movingAverage = movingAverages[movingAverageInterval] = MovingAverage(movingAverageInterval)
+ }
+ movingAverage.push(latestTime, hz)
+ })
+ }
+
+ _applyOp (op) {
+ const key = op[0]
+ const inc = op[1]
+
+ if (typeof inc !== 'number') {
+ throw new Error('invalid increment number:', inc)
+ }
+
+ let n
+
+ if (!this._stats.hasOwnProperty(key)) {
+ n = this._stats[key] = Big(0)
+ } else {
+ n = this._stats[key]
+ }
+ this._stats[key] = n.plus(inc)
+
+ if (!this._frequencyAccumulators[key]) {
+ this._frequencyAccumulators[key] = 0
+ }
+ this._frequencyAccumulators[key] += inc
+ }
+}
+
+module.exports = Stats
diff --git a/src/types/wantlist/index.js b/src/types/wantlist/index.js
index a269375d..f24682d0 100644
--- a/src/types/wantlist/index.js
+++ b/src/types/wantlist/index.js
@@ -4,8 +4,9 @@ const sort = require('lodash.sortby')
const Entry = require('./entry')
class Wantlist {
- constructor () {
+ constructor (stats) {
this.set = new Map()
+ this._stats = stats
}
get length () {
@@ -21,6 +22,9 @@ class Wantlist {
entry.priority = priority
} else {
this.set.set(cidStr, new Entry(cid, priority))
+ if (this._stats) {
+ this._stats.push(null, 'wantListSize', 1)
+ }
}
}
@@ -40,6 +44,9 @@ class Wantlist {
}
this.set.delete(cidStr)
+ if (this._stats) {
+ this._stats.push(null, 'wantListSize', -1)
+ }
}
removeForce (cidStr) {
diff --git a/src/want-manager/index.js b/src/want-manager/index.js
index db4c2c12..d7753dcd 100644
--- a/src/want-manager/index.js
+++ b/src/want-manager/index.js
@@ -9,11 +9,12 @@ const MsgQueue = require('./msg-queue')
const logger = require('../utils').logger
module.exports = class WantManager {
- constructor (peerId, network) {
+ constructor (peerId, network, stats) {
this.peers = new Map()
- this.wantlist = new Wantlist()
+ this.wantlist = new Wantlist(stats)
this.network = network
+ this._stats = stats
this._peerId = peerId
this._log = logger(peerId, 'want')
diff --git a/test/benchmarks/get-many.js b/test/benchmarks/get-many.js
new file mode 100644
index 00000000..40aa1c8d
--- /dev/null
+++ b/test/benchmarks/get-many.js
@@ -0,0 +1,12 @@
+'use strict'
+
+const distributionTest = require('../utils/distribution-test')
+const print = require('./helpers/print-swarm-results')
+
+print('10 nodes, 10 blocks, 5 iterations', distributionTest(10, 10, 5, (err) => {
+ if (err) {
+ throw err
+ }
+
+ console.log('Finished. Can kill now...')
+}))
diff --git a/test/benchmarks/helpers/print-swarm-results.js b/test/benchmarks/helpers/print-swarm-results.js
new file mode 100644
index 00000000..60ea7b1b
--- /dev/null
+++ b/test/benchmarks/helpers/print-swarm-results.js
@@ -0,0 +1,36 @@
+'use strict'
+
+const stats = require('stats-lite')
+
+module.exports = (suite, emitter) => {
+ const elapseds = []
+ emitter.once('start', () => {
+ console.log('\n------------------------')
+ console.log(suite)
+ console.log('started')
+ })
+ emitter.once('all connected', () => {
+ console.log('all nodes connected to each other')
+ })
+ emitter.on('getting many', () => {
+ process.stdout.write('.')
+ })
+ emitter.once('stop', () => {
+ console.log('\nstopping')
+ })
+ emitter.once('stopped', () => {
+ console.log('stopped')
+ console.log('stats:')
+ console.log('---------')
+ console.log('mean: %s', stats.mean(elapseds))
+ console.log('median: %s', stats.median(elapseds))
+ console.log('variance: %s', stats.variance(elapseds))
+ console.log('standard deviation: %s', stats.stdev(elapseds))
+ console.log('85th percentile: %s', stats.percentile(elapseds, 0.85))
+ })
+
+ emitter.on('got block', (elapsed) => {
+ process.stdout.write('+')
+ elapseds.push(elapsed)
+ })
+}
diff --git a/test/bitswap-mock-internals.js b/test/bitswap-mock-internals.js
index 47e03fa6..24f91c17 100644
--- a/test/bitswap-mock-internals.js
+++ b/test/bitswap-mock-internals.js
@@ -24,7 +24,9 @@ const storeHasBlocks = require('./utils/store-has-blocks')
const makeBlock = require('./utils/make-block')
const orderedFinish = require('./utils/helpers').orderedFinish
-describe('bitswap with mocks', () => {
+describe('bitswap with mocks', function () {
+ this.timeout(10 * 1000)
+
let repo
let blocks
let ids
@@ -47,7 +49,9 @@ describe('bitswap with mocks', () => {
})
})
- after((done) => repo.teardown(done))
+ after((done) => {
+ repo.teardown(done)
+ })
describe('receive message', () => {
it('simple block message', (done) => {
@@ -66,8 +70,6 @@ describe('bitswap with mocks', () => {
bs._receiveMessage(other, msg, (err) => {
expect(err).to.not.exist()
- expect(bs.blocksRecvd).to.equal(2)
- expect(bs.dupBlocksRecvd).to.equal(0)
map([b1.cid, b2.cid], (cid, cb) => repo.blocks.get(cid, cb), (err, blocks) => {
expect(err).to.not.exist()
@@ -96,9 +98,6 @@ describe('bitswap with mocks', () => {
bs._receiveMessage(other, msg, (err) => {
expect(err).to.not.exist()
- expect(bs.blocksRecvd).to.be.eql(0)
- expect(bs.dupBlocksRecvd).to.be.eql(0)
-
const wl = bs.wantlistForPeer(other)
expect(wl.has(b1.cid.buffer.toString())).to.eql(true)
@@ -110,7 +109,7 @@ describe('bitswap with mocks', () => {
})
it('multi peer', function (done) {
- this.timeout(40 * 1000)
+ this.timeout(80 * 1000)
const bs = new Bitswap(mockLibp2pNode(), repo.blocks)
let others
@@ -154,6 +153,15 @@ describe('bitswap with mocks', () => {
})
describe('get', () => {
+ it('fails on requesting empty block', (done) => {
+ const bs = new Bitswap(mockLibp2pNode(), repo.blocks)
+ bs.get(null, (err, res) => {
+ expect(err).to.exist()
+ expect(err.message).to.equal('Not a valid cid')
+ done()
+ })
+ })
+
it('block exists locally', (done) => {
const block = blocks[4]
@@ -326,18 +334,28 @@ describe('bitswap with mocks', () => {
], done)
})
})
- })
- describe('stat', () => {
- it('has initial stats', () => {
- const bs = new Bitswap(mockLibp2pNode(), {})
+ it('double get', (done) => {
+ const block = blocks[11]
- const stats = bs.stat()
- expect(stats).to.have.property('wantlist')
- expect(stats).to.have.property('blocksReceived', 0)
- expect(stats).to.have.property('dupBlksReceived', 0)
- expect(stats).to.have.property('dupDataReceived', 0)
- expect(stats).to.have.property('peers')
+ const bs = new Bitswap(mockLibp2pNode(), repo.blocks)
+
+ parallel(
+ [
+ (cb) => bs.get(block.cid, cb),
+ (cb) => bs.get(block.cid, cb)
+ ],
+ (err, res) => {
+ expect(err).to.not.exist()
+ expect(res[0]).to.eql(block)
+ expect(res[1]).to.eql(block)
+ done()
+ }
+ )
+
+ bs.put(block, (err) => {
+ expect(err).to.not.exist()
+ })
})
})
@@ -346,7 +364,7 @@ describe('bitswap with mocks', () => {
const bs = new Bitswap(mockLibp2pNode(), repo.blocks)
bs.start((err) => {
expect(err).to.not.exist()
- const b = blocks[11]
+ const b = blocks[12]
let counter = 0
const check = (err, res) => {
diff --git a/test/bitswap-stats.js b/test/bitswap-stats.js
new file mode 100644
index 00000000..bd459118
--- /dev/null
+++ b/test/bitswap-stats.js
@@ -0,0 +1,266 @@
+/* eslint-env mocha */
+'use strict'
+
+const map = require('async/map')
+const each = require('async/each')
+const eachOf = require('async/eachOf')
+const parallel = require('async/parallel')
+const _ = require('lodash')
+const chai = require('chai')
+chai.use(require('dirty-chai'))
+const expect = chai.expect
+const PeerId = require('peer-id')
+
+const Message = require('../src/types/message')
+const Bitswap = require('../src')
+
+const createTempRepo = require('./utils/create-temp-repo-nodejs')
+const createLibp2pNode = require('./utils/create-libp2p-node')
+const makeBlock = require('./utils/make-block')
+const countToFinish = require('./utils/helpers').countToFinish
+
+const expectedStats = [
+ 'blocksReceived',
+ 'dataReceived',
+ 'dupBlksReceived',
+ 'dupDataReceived',
+ 'blocksSent',
+ 'dataSent',
+ 'providesBufferLength',
+ 'wantListLength'
+]
+
+const expectedTimeWindows = [
+ 1000 * 60,
+ 1000 * 60 * 5,
+ 1000 * 60 * 15
+]
+
+describe('bitswap stats', () => {
+ const nodes = [0, 1]
+ let libp2pNodes
+ let repos
+ let bitswaps
+ let bs
+ let blocks
+ let ids
+
+ before((done) => {
+ parallel(
+ {
+ blocks: (cb) => map(_.range(2), (i, cb) => makeBlock(cb), cb),
+ ids: (cb) => map(_.range(2), (i, cb) => PeerId.create({bits: 1024}, cb), cb)
+ },
+ (err, results) => {
+ if (err) {
+ return done(err)
+ }
+
+ blocks = results.blocks
+ ids = results.ids
+
+ done()
+ }
+ )
+ })
+
+ before((done) => {
+ // create 2 temp repos
+ map(nodes, (n, cb) => createTempRepo(cb), (err, _repos) => {
+ expect(err).to.not.exist()
+ repos = _repos
+ done()
+ })
+ })
+
+ before((done) => {
+ // create 2 libp2p nodes
+ map(nodes, (n, cb) => createLibp2pNode({
+ DHT: repos[n].datastore
+ }, cb), (err, _libp2pNodes) => {
+ expect(err).to.not.exist()
+ libp2pNodes = _libp2pNodes
+ done()
+ })
+ })
+
+ before(() => {
+ bitswaps = nodes.map((node, i) => new Bitswap(libp2pNodes[i], repos[i].blocks, {
+ statsEnabled: true,
+ statsComputeThrottleTimeout: 500 // fast update interval for so tests run fast
+ }))
+ bs = bitswaps[0]
+ })
+
+ // start the first bitswap
+ before((done) => bs.start(done))
+
+ after((done) => each(bitswaps, (bs, cb) => bs.stop(cb), done))
+
+ after((done) => each(repos, (repo, cb) => repo.teardown(cb), done))
+
+ after((done) => each(libp2pNodes, (n, cb) => n.stop(cb), done))
+
+ it('has initial stats', () => {
+ const stats = bs.stat()
+ const snapshot = stats.snapshot
+
+ expectedStats.forEach((key) => {
+ expect(snapshot).to.have.property(key)
+ expect(snapshot[key].eq(0)).to.be.true()
+ })
+
+ const movingAverages = stats.movingAverages
+ expectedStats.forEach((key) => {
+ expectedTimeWindows.forEach((timeWindow) => {
+ expect(movingAverages).to.have.property(key)
+ expect(stats.movingAverages[key]).to.have.property(timeWindow)
+ const ma = stats.movingAverages[key][timeWindow]
+ expect(ma.movingAverage()).to.eql(0)
+ expect(ma.variance()).to.eql(0)
+ })
+ })
+ })
+
+ it('updates blocks received', (done) => {
+ bs.stat().once('update', (stats) => {
+ expect(stats.blocksReceived.eq(2)).to.be.true()
+ expect(stats.dataReceived.eq(96)).to.be.true()
+ expect(stats.dupBlksReceived.eq(0)).to.be.true()
+ expect(stats.dupDataReceived.eq(0)).to.be.true()
+ expect(stats.blocksSent.eq(0)).to.be.true()
+ expect(stats.dataSent.eq(0)).to.be.true()
+ expect(stats.providesBufferLength.eq(0)).to.be.true()
+ expect(stats.wantListLength.eq(0)).to.be.true()
+ expect(stats.peerCount.eq(1)).to.be.true()
+
+ // test moving averages
+ const movingAverages = bs.stat().movingAverages
+ const blocksReceivedMA = movingAverages.blocksReceived
+ expectedTimeWindows.forEach((timeWindow) => {
+ expect(blocksReceivedMA).to.have.property(timeWindow)
+ const ma = blocksReceivedMA[timeWindow]
+ expect(ma.movingAverage()).to.be.above(0)
+ expect(ma.variance()).to.be.above(0)
+ })
+
+ const dataReceivedMA = movingAverages.dataReceived
+ expectedTimeWindows.forEach((timeWindow) => {
+ expect(dataReceivedMA).to.have.property(timeWindow)
+ const ma = dataReceivedMA[timeWindow]
+ expect(ma.movingAverage()).to.be.above(0)
+ expect(ma.variance()).to.be.above(0)
+ })
+ done()
+ })
+
+ const other = ids[1]
+
+ const msg = new Message(false)
+ blocks.forEach((block) => msg.addBlock(block))
+
+ bs._receiveMessage(other, msg, (err) => {
+ expect(err).to.not.exist()
+ })
+ })
+
+ it('updates duplicate blocks counters', (done) => {
+ bs.stat().once('update', (stats) => {
+ expect(stats.blocksReceived.eq(4)).to.be.true()
+ expect(stats.dataReceived.eq(192)).to.be.true()
+ expect(stats.dupBlksReceived.eq(2)).to.be.true()
+ expect(stats.dupDataReceived.eq(96)).to.be.true()
+ expect(stats.blocksSent.eq(0)).to.be.true()
+ expect(stats.dataSent.eq(0)).to.be.true()
+ expect(stats.providesBufferLength.eq(0)).to.be.true()
+ done()
+ })
+
+ const other = ids[1]
+
+ const msg = new Message(false)
+ blocks.forEach((block) => msg.addBlock(block))
+
+ bs._receiveMessage(other, msg, (err) => {
+ expect(err).to.not.exist()
+ })
+ })
+
+ describe('connected to another bitswap', () => {
+ let bs2
+ let block
+
+ before((done) => {
+ eachOf(
+ libp2pNodes,
+ (node, i, cb) => node.dial(libp2pNodes[(i + 1) % nodes.length].peerInfo, cb),
+ done)
+ })
+
+ before((done) => {
+ bs2 = bitswaps[1]
+ bs2.start(done)
+ })
+
+ after((done) => {
+ bs2.stop(done)
+ })
+
+ before((done) => {
+ makeBlock((err, _block) => {
+ expect(err).to.not.exist()
+ expect(_block).to.exist()
+ block = _block
+ done()
+ })
+ })
+
+ before((done) => {
+ bs.put(block, done)
+ })
+
+ it('updates stats on transfer', (done) => {
+ const finish = countToFinish(2, done)
+ bs.stat().once('update', (stats) => {
+ expect(stats.blocksReceived.eq(4)).to.be.true()
+ expect(stats.dataReceived.eq(192)).to.be.true()
+ expect(stats.dupBlksReceived.eq(2)).to.be.true()
+ expect(stats.dupDataReceived.eq(96)).to.be.true()
+ expect(stats.blocksSent.eq(1)).to.be.true()
+ expect(stats.dataSent.eq(48)).to.be.true()
+ expect(stats.providesBufferLength.eq(0)).to.be.true()
+ expect(stats.wantListLength.eq(0)).to.be.true()
+ expect(stats.peerCount.eq(2)).to.be.true()
+ finish()
+ })
+
+ bs2.get(block.cid, (err, block) => {
+ expect(err).to.not.exist()
+ expect(block).to.exist()
+ finish()
+ })
+ })
+
+ it('has peer stats', (done) => {
+ const peerIds = libp2pNodes.map((node) => node.peerInfo.id.toB58String())
+ const peerStats = bs2.stat().forPeer(peerIds[0])
+ peerStats.once('update', (stats) => {
+ expect(stats.blocksReceived.eq(1)).to.be.true()
+ expect(stats.dataReceived.eq(48)).to.be.true()
+ expect(stats.dupBlksReceived.eq(0)).to.be.true()
+ expect(stats.dupDataReceived.eq(0)).to.be.true()
+ expect(stats.blocksSent.eq(0)).to.be.true()
+ expect(stats.dataSent.eq(0)).to.be.true()
+ expect(stats.providesBufferLength.eq(0)).to.be.true()
+ expect(stats.wantListLength.eq(0)).to.be.true()
+ expect(stats.peerCount.eq(1)).to.be.true()
+
+ const ma = peerStats.movingAverages.dataReceived[60000]
+ expect(ma.movingAverage()).to.be.above(0)
+ expect(ma.variance()).to.be.above(0)
+
+ done()
+ })
+ })
+ })
+})
diff --git a/test/bitswap.js b/test/bitswap.js
index 0ea43d7b..1e3d1786 100644
--- a/test/bitswap.js
+++ b/test/bitswap.js
@@ -18,8 +18,15 @@ const createLibp2pNode = require('./utils/create-libp2p-node')
const makeBlock = require('./utils/make-block')
const orderedFinish = require('./utils/helpers').orderedFinish
+const MAX_MESSAGE_SIZE = 512 * 1024
+
// Creates a repo + libp2pNode + Bitswap with or without DHT
-function createThing (dht, callback) {
+function createThing (dht, msgSize, callback) {
+ if (!callback) {
+ callback = msgSize
+ msgSize = MAX_MESSAGE_SIZE
+ }
+
waterfall([
(cb) => createTempRepo(cb),
(repo, cb) => {
@@ -28,7 +35,7 @@ function createThing (dht, callback) {
}, (err, node) => cb(err, repo, node))
},
(repo, libp2pNode, cb) => {
- const bitswap = new Bitswap(libp2pNode, repo.blocks)
+ const bitswap = new Bitswap(libp2pNode, repo.blocks, {maxMessageSize: msgSize})
bitswap.start((err) => cb(err, repo, libp2pNode, bitswap))
}
], (err, repo, libp2pNode, bitswap) => {
@@ -49,12 +56,13 @@ describe('bitswap without DHT', function () {
before((done) => {
parallel([
- (cb) => createThing(false, cb),
- (cb) => createThing(false, cb),
- (cb) => createThing(false, cb)
+ (cb) => createThing(false, MAX_MESSAGE_SIZE, cb),
+ (cb) => createThing(false, MAX_MESSAGE_SIZE, cb),
+ (cb) => createThing(false, MAX_MESSAGE_SIZE, cb),
+ (cb) => createThing(false, 32, cb)
], (err, results) => {
expect(err).to.not.exist()
- expect(results).to.have.length(3)
+ expect(results).to.have.length(4)
nodes = results
done()
})
@@ -77,6 +85,13 @@ describe('bitswap without DHT', function () {
], done)
})
+ it('connect 1 -> 3 && 2 -> 3', (done) => {
+ parallel([
+ (cb) => nodes[1].libp2pNode.dial(nodes[3].libp2pNode.peerInfo, cb),
+ (cb) => nodes[2].libp2pNode.dial(nodes[3].libp2pNode.peerInfo, cb)
+ ], done)
+ })
+
it('put a block in 2, fail to get it in 0', (done) => {
const finish = orderedFinish(2, done)
@@ -99,17 +114,20 @@ describe('bitswap without DHT', function () {
})
})
-describe('bitswap with DHT', () => {
+describe('bitswap with DHT', function () {
+ this.timeout(20 * 1000)
+
let nodes
before((done) => {
parallel([
- (cb) => createThing(true, cb),
- (cb) => createThing(true, cb),
- (cb) => createThing(true, cb)
+ (cb) => createThing(true, MAX_MESSAGE_SIZE, cb),
+ (cb) => createThing(true, MAX_MESSAGE_SIZE, cb),
+ (cb) => createThing(true, MAX_MESSAGE_SIZE, cb),
+ (cb) => createThing(true, 32, cb)
], (err, results) => {
expect(err).to.not.exist()
- expect(results).to.have.length(3)
+ expect(results).to.have.length(4)
nodes = results
done()
})
@@ -125,10 +143,11 @@ describe('bitswap with DHT', () => {
}, done)
})
- it('connect 0 -> 1 && 1 -> 2', (done) => {
+ it('connect 0 -> 1 && 1 -> 2 && 2 -> 3', (done) => {
parallel([
(cb) => nodes[0].libp2pNode.dial(nodes[1].libp2pNode.peerInfo, cb),
- (cb) => nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo, cb)
+ (cb) => nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo, cb),
+ (cb) => nodes[2].libp2pNode.dial(nodes[3].libp2pNode.peerInfo, cb)
], done)
})
@@ -145,4 +164,32 @@ describe('bitswap with DHT', () => {
})
], done)
})
+
+ it('put a block in 2, get it in 3', (done) => {
+ waterfall([
+ (cb) => makeBlock(cb),
+ (block, cb) => nodes[2].bitswap.put(block, () => cb(null, block)),
+ (block, cb) => setTimeout(() => cb(null, block), 400),
+ (block, cb) => nodes[3].bitswap.get(block.cid, (err, blockRetrieved) => {
+ expect(err).to.not.exist()
+ expect(block.data).to.eql(blockRetrieved.data)
+ expect(block.cid).to.eql(blockRetrieved.cid)
+ cb()
+ })
+ ], done)
+ })
+
+ it('put a block in 3, get it in 2', (done) => {
+ waterfall([
+ (cb) => makeBlock(cb),
+ (block, cb) => nodes[3].bitswap.put(block, () => cb(null, block)),
+ (block, cb) => setTimeout(() => cb(null, block), 400),
+ (block, cb) => nodes[2].bitswap.get(block.cid, (err, blockRetrieved) => {
+ expect(err).to.not.exist()
+ expect(block.data).to.eql(blockRetrieved.data)
+ expect(block.cid).to.eql(blockRetrieved.cid)
+ cb()
+ })
+ ], done)
+ })
})
diff --git a/test/fixtures/repo/version b/test/fixtures/repo/version
index 7ed6ff82..1e8b3149 100644
--- a/test/fixtures/repo/version
+++ b/test/fixtures/repo/version
@@ -1 +1 @@
-5
+6
diff --git a/test/node.js b/test/node.js
index 02daf18b..458b2310 100644
--- a/test/node.js
+++ b/test/node.js
@@ -2,6 +2,8 @@
require('./bitswap.js')
require('./bitswap-mock-internals.js')
+require('./bitswap-stats.js')
require('./decision-engine/decision-engine')
require('./network/network.node.js')
require('./network/gen-bitswap-network.node.js')
+require('./swarms.js')
diff --git a/test/swarms.js b/test/swarms.js
new file mode 100644
index 00000000..074b90c1
--- /dev/null
+++ b/test/swarms.js
@@ -0,0 +1,82 @@
+'use strict'
+
+/* eslint-env mocha */
+
+const stats = require('stats-lite')
+const distributionTest = require('./utils/distribution-test')
+const test = it
+
+describe('swarms', () => {
+ const print = Boolean(process.env.PRINT)
+
+ after(() => {
+ process.exit()
+ })
+
+ test('2 nodes, 2 blocks', function (done) {
+ this.timeout(10 * 1000)
+ maybePrint('2 nodes, 2 blocks', distributionTest(2, 2, done))
+ })
+
+ test('10 nodes, 2 blocks', function (done) {
+ this.timeout(30 * 1000)
+ maybePrint('10 nodes, 2 blocks', distributionTest(10, 2, done))
+ })
+
+ test.only('10 nodes, 10 blocks', function (done) {
+ this.timeout(30 * 1000)
+ maybePrint('10 nodes, 10 blocks', distributionTest(10, 10, 1, done))
+ })
+
+ test('10 nodes, 20 blocks', function (done) {
+ this.timeout(30 * 1000)
+ maybePrint('10 nodes, 20 blocks', distributionTest(10, 20, done))
+ })
+
+ test('50 nodes, 2 blocks', function (done) {
+ this.timeout(600 * 1000)
+ maybePrint('50 nodes, 2 blocks', distributionTest(50, 2, done))
+ })
+
+ test.skip('100 nodes, 2 blocks', function (done) {
+ this.timeout(600 * 1000)
+ maybePrint('100 nodes, 2 blocks', distributionTest(100, 2, done))
+ })
+
+ test('10 nodes, 100 blocks', function (done) {
+ this.timeout(600 * 1000)
+ maybePrint('10 nodes, 100 blocks', distributionTest(10, 100, done))
+ })
+
+ function maybePrint (suite, emitter) {
+ if (!print) {
+ return
+ }
+ const elapseds = []
+ emitter.once('start', () => {
+ console.log('\n------------------------')
+ console.log(suite)
+ console.log('started')
+ })
+ emitter.once('all connected', () => {
+ console.log('all nodes connected to each other')
+ })
+ emitter.once('stop', () => {
+ console.log('stopping')
+ })
+ emitter.once('stopped', () => {
+ console.log('stopped')
+ console.log('stats:')
+ console.log('---------')
+ console.log('mean: %s', stats.mean(elapseds))
+ console.log('median: %s', stats.median(elapseds))
+ console.log('variance: %s', stats.variance(elapseds))
+ console.log('standard deviation: %s', stats.stdev(elapseds))
+ console.log('85th percentile: %s', stats.percentile(elapseds, 0.85))
+ })
+
+ emitter.on('got block', (elapsed) => {
+ elapseds.push(elapsed)
+ })
+ }
+})
diff --git a/test/types/message.spec.js b/test/types/message.spec.js
index 9e155f70..b4c4817b 100644
--- a/test/types/message.spec.js
+++ b/test/types/message.spec.js
@@ -266,6 +266,7 @@ describe('BitswapMessage', () => {
expect(err).to.not.exist()
// TODO
// check the deserialised message
+ expect(message.serializeToBitswap110()).to.eql(rawMessageFullWantlist)
done()
})
})
@@ -275,6 +276,7 @@ describe('BitswapMessage', () => {
expect(err).to.not.exist()
// TODO
// check the deserialised message
+ expect(message.serializeToBitswap110()).to.eql(rawMessageOneBlock)
done()
})
})
diff --git a/test/utils/connect-all.js b/test/utils/connect-all.js
new file mode 100644
index 00000000..cb9c7bc6
--- /dev/null
+++ b/test/utils/connect-all.js
@@ -0,0 +1,15 @@
+'use strict'
+
+const eachSeries = require('async/eachSeries')
+const without = require('lodash.without')
+
+module.exports = (nodes, callback) => {
+ eachSeries(nodes, (node, cb) => {
+ eachSeries(
+ without(nodes, node),
+ (otherNode, cb) => {
+ node.libp2pNode.dial(otherNode.bitswap.peerInfo, cb)
+ },
+ cb)
+ }, callback)
+}
diff --git a/test/utils/create-bitswap.js b/test/utils/create-bitswap.js
new file mode 100644
index 00000000..332f529a
--- /dev/null
+++ b/test/utils/create-bitswap.js
@@ -0,0 +1,26 @@
+'use strict'
+
+const waterfall = require('async/waterfall')
+
+const Bitswap = require('../..')
+const createTempRepo = require('./create-temp-repo-nodejs')
+const createLibp2pNode = require('./create-libp2p-node')
+
+module.exports = (callback) => {
+ waterfall([
+ (cb) => createTempRepo(cb),
+ (repo, cb) => {
+ createLibp2pNode({
+ DHT: repo.datastore
+ }, (err, node) => cb(err, repo, node))
+ },
+ (repo, libp2pNode, cb) => {
+ const bitswap = new Bitswap(libp2pNode, repo.blocks)
+ bitswap.start((err) => cb(err, {
+ bitswap: bitswap,
+ repo: repo,
+ libp2pNode: libp2pNode
+ }))
+ }
+ ], callback)
+}
diff --git a/test/utils/create-temp-repo-nodejs.js b/test/utils/create-temp-repo-nodejs.js
index de34c326..b656bcf2 100644
--- a/test/utils/create-temp-repo-nodejs.js
+++ b/test/utils/create-temp-repo-nodejs.js
@@ -1,22 +1,29 @@
'use strict'
const IPFSRepo = require('ipfs-repo')
-const path = require('path')
+const pathJoin = require('path').join
+const os = require('os')
const ncp = require('ncp')
const rimraf = require('rimraf')
+const series = require('async/series')
-const baseRepo = path.join(__dirname, '../fixtures/repo')
+const baseRepo = pathJoin(__dirname, '../fixtures/repo')
function createTempRepo (callback) {
const date = Date.now().toString()
- const path = `/tmp/bitswap-tests-${date}-${Math.random()}`
+ const path = pathJoin(os.tmpdir(), `bitswap-tests-${date}-${Math.random()}`)
ncp(baseRepo, path, (err) => {
if (err) { return callback(err) }
const repo = new IPFSRepo(path)
- repo.teardown = (callback) => rimraf(path, callback)
+ repo.teardown = (done) => {
+ series([
+ (cb) => repo.close(cb),
+ (cb) => rimraf(path, cb)
+ ], (err) => done(err))
+ }
repo.open((err) => {
if (err) { return callback(err) }
diff --git a/test/utils/distribution-test.js b/test/utils/distribution-test.js
new file mode 100644
index 00000000..3f27fc36
--- /dev/null
+++ b/test/utils/distribution-test.js
@@ -0,0 +1,105 @@
+'use strict'
+
+const range = require('lodash.range')
+const map = require('async/map')
+const each = require('async/each')
+const whilst = require('async/whilst')
+const series = require('async/series')
+const waterfall = require('async/waterfall')
+const chai = require('chai')
+chai.use(require('dirty-chai'))
+const expect = chai.expect
+const EventEmitter = require('events')
+
+const createBitswap = require('./create-bitswap')
+const makeBlock = require('./make-block')
+const connectAll = require('./connect-all')
+
+module.exports = (instanceCount, blockCount, repeats, callback) => {
+ let pendingRepeats = repeats
+ let nodes
+ const events = new EventEmitter()
+
+ waterfall([
+ (cb) => map(range(instanceCount), (_, cb) => createBitswap(cb), cb),
+ (_nodes, cb) => {
+ nodes = _nodes
+ events.emit('start')
+ cb()
+ },
+ (cb) => {
+ connectAll(nodes, cb)
+ },
+ (cb) => {
+ events.emit('all connected')
+ whilst(() => pendingRepeats > 0, (cb) => {
+ const first = nodes[0]
+ let blocks
+ waterfall([
+ (cb) => map(range(blockCount), (_, cb) => makeBlock(cb), cb),
+ (_blocks, cb) => {
+ blocks = _blocks
+ cb()
+ },
+ (cb) => each(blocks, first.bitswap.put.bind(first.bitswap), (err) => {
+ events.emit('first put')
+ cb(err)
+ }),
+ (cb) => map(nodes, (node, cb) => {
+ events.emit('getting many')
+ const cids = blocks.map((block) => block.cid)
+ const start = Date.now()
+ node.bitswap.getMany(cids, (err, result) => {
+ if (err) {
+ cb(err)
+ } else {
+ const elapsed = Date.now() - start
+ events.emit('got block', elapsed)
+ cb(null, result)
+ }
+ })
+ }, cb),
+ (results, cb) => {
+ try {
+ expect(results).have.lengthOf(instanceCount)
+ results.forEach((nodeBlocks) => {
+ expect(nodeBlocks).to.have.lengthOf(blocks.length)
+ nodeBlocks.forEach((block, i) => {
+ expect(block.data).to.deep.equal(blocks[i].data)
+ })
+ })
+ } catch (err) {
+ return cb(err)
+ }
+ cb()
+ },
+ (cb) => {
+ pendingRepeats--
+ cb()
+ }
+ ], cb)
+ }, cb)
+ }
+ ],
+ (err) => {
+ events.emit('stop')
+ each(
+ nodes,
+ (node, cb) => {
+ series(
+ [
+ (cb) => node.bitswap.stop(cb),
+ (cb) => node.libp2pNode.stop(cb),
+ (cb) => node.repo.teardown(cb)
+ ],
+ cb)
+ },
+ (err2) => {
+ events.emit('stopped')
+ callback(err)
+ }
+ )
+ })
+
+ return events
+}
diff --git a/test/utils/helpers.js b/test/utils/helpers.js
index 9e3d897e..98a1be5d 100644
--- a/test/utils/helpers.js
+++ b/test/utils/helpers.js
@@ -16,3 +16,16 @@ exports.orderedFinish = (n, callback) => {
}
}
}
+
+exports.countToFinish = (n, callback) => {
+ let pending = n
+
+ return () => {
+ pending--
+ if (pending === 0) {
+ callback()
+ } else if (pending < 0) {
+ callback(new Error('too many finishes, expected only ' + n))
+ }
+ }
+}
diff --git a/test/utils/make-block.js b/test/utils/make-block.js
index d125f880..66b69193 100644
--- a/test/utils/make-block.js
+++ b/test/utils/make-block.js
@@ -4,9 +4,10 @@ const multihashing = require('multihashing-async')
const CID = require('cids')
const Block = require('ipfs-block')
const Buffer = require('safe-buffer').Buffer
+const uuid = require('uuid/v4')
module.exports = (callback) => {
- const data = Buffer.from(`hello world ${Math.random()}`)
+ const data = Buffer.from(`hello world ${uuid()}`)
multihashing(data, 'sha2-256', (err, hash) => {
if (err) { return callback(err) }
diff --git a/test/utils/mocks.js b/test/utils/mocks.js
index eea8c36e..debb1124 100644
--- a/test/utils/mocks.js
+++ b/test/utils/mocks.js
@@ -13,6 +13,7 @@ const PeerBook = require('peer-book')
const Node = require('./create-libp2p-node').bundle
const os = require('os')
const Repo = require('ipfs-repo')
+const EventEmitter = require('events')
const Bitswap = require('../../src')
@@ -22,9 +23,10 @@ const Bitswap = require('../../src')
exports.mockLibp2pNode = () => {
const peerInfo = new PeerInfo(PeerId.createFromHexString('122019318b6e5e0cf93a2314bf01269a2cc23cd3dcd452d742cdb9379d8646f6e4a9'))
- return {
+ return Object.assign(new EventEmitter(), {
peerInfo: peerInfo,
handle () {},
+ unhandle () {},
contentRouting: {
provide: (cid, callback) => callback(),
findProviders: (cid, timeout, callback) => callback(null, [])
@@ -38,7 +40,7 @@ exports.mockLibp2pNode = () => {
setMaxListeners () {}
},
peerBook: new PeerBook()
- }
+ })
}
/*
diff --git a/test/wantmanager/index.spec.js b/test/wantmanager/index.spec.js
index c40590f9..539f71cc 100644
--- a/test/wantmanager/index.spec.js
+++ b/test/wantmanager/index.spec.js
@@ -67,7 +67,7 @@ describe('WantManager', () => {
expect(m[0]).to.be.eql(calls.connects[i])
if (!m[1].equals(msgs[i])) {
return done(
- new Error(`expected ${m[1].toString()} to equal ${msgs[1].toString()}`)
+ new Error(`expected ${m[1].toString()} to equal ${msgs[i].toString()}`)
)
}
})