diff --git a/src/core/components/pin.js b/src/core/components/pin.js
index 40d661e9a0..87dcfedc3b 100644
--- a/src/core/components/pin.js
+++ b/src/core/components/pin.js
@@ -41,7 +41,7 @@ module.exports = (self) => {
       map(mhs, (multihash, cb) => {
         const key = toB58String(multihash)
         if (recursive) {
-          if (pinManager.recursivePins.has(key)) {
+          if (pinManager.pinsets.recursive.hasPin(key)) {
            // it's already pinned recursively
            return cb(null, null)
          }
@@ -54,11 +54,11 @@ module.exports = (self) => {
            return cb(null, key)
          })
        } else {
-          if (pinManager.recursivePins.has(key)) {
+          if (pinManager.pinsets.recursive.hasPin(key)) {
            // recursive supersedes direct, can't have both
            return cb(new Error(`${key} already pinned recursively`))
          }
-          if (pinManager.directPins.has(key)) {
+          if (pinManager.pinsets.direct.hasPin(key)) {
            // already directly pinned
            return cb(null, null)
          }
@@ -222,7 +222,7 @@ module.exports = (self) => {
      let pins = []
      if (type === PinTypes.direct || type === PinTypes.all) {
        pins = pins.concat(
-          Array.from(pinManager.directPins).map(hash => ({
+          [...pinManager.pinsets.direct.pinKeys].map(hash => ({
            type: PinTypes.direct,
            hash
          }))
@@ -230,7 +230,7 @@ module.exports = (self) => {
      }
      if (type === PinTypes.recursive || type === PinTypes.all) {
        pins = pins.concat(
-          Array.from(pinManager.recursivePins).map(hash => ({
+          [...pinManager.pinsets.recursive.pinKeys].map(hash => ({
            type: PinTypes.recursive,
            hash
          }))
@@ -244,7 +244,7 @@ module.exports = (self) => {
          // report the indirect entry
          .filter(({ hash }) =>
            !indirects.includes(hash) ||
-            (indirects.includes(hash) && !pinManager.directPins.has(hash))
+            (indirects.includes(hash) && !pinManager.pinsets.direct.hasPin(hash))
          )
          .concat(indirects.map(hash => ({
            type: PinTypes.indirect,
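The `pin.js` changes above swap raw `Set` membership checks for the `PinSet` API introduced later in this diff. For orientation, a minimal sketch of the surface `pin.js` now relies on — hedged, since `pinManager` and `key` here are just illustrative stand-ins:

```js
// Sketch (not part of the patch): the PinSet surface consumed by pin.js.
// hasPin() is a synchronous lookup in a Set of base58-encoded key strings;
// pinKeys is that Set itself, which is why callers spread it into an array.
const { direct, recursive } = pinManager.pinsets

if (recursive.hasPin(key)) {
  // recursive supersedes direct: a direct pin of the same key is rejected
}

const directPins = [...direct.pinKeys].map(hash => ({ type: 'direct', hash }))
```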
diff --git a/src/core/components/pin/pin-manager.js b/src/core/components/pin/pin-manager.js
index fba7aeaf49..e15c12e705 100644
--- a/src/core/components/pin/pin-manager.js
+++ b/src/core/components/pin/pin-manager.js
@@ -1,27 +1,27 @@
 /* eslint max-nested-callbacks: ["error", 8] */
 'use strict'
 
-const { DAGNode, DAGLink, util } = require('ipld-dag-pb')
+const { DAGNode, util } = require('ipld-dag-pb')
 const CID = require('cids')
 const map = require('async/map')
-const series = require('async/series')
 const parallel = require('async/parallel')
 const eachLimit = require('async/eachLimit')
 const waterfall = require('async/waterfall')
 const detectLimit = require('async/detectLimit')
 const { Key } = require('interface-datastore')
 const errCode = require('err-code')
-const multicodec = require('multicodec')
-const createPinSet = require('./pin-set')
+const PinStore = require('./pin-store')
+const PinSet = require('./pin-set')
 const Lock = require('./lock')
 
 // arbitrary limit to the number of concurrent dag operations
 const concurrencyLimit = 300
 const PIN_DS_KEY = new Key('/local/pins')
+const NO_PINS_ERR = 'No pins to load'
 
 function toB58String (hash) {
-  return new CID(hash).toBaseEncodedString()
+  return new CID(hash).toBaseEncodedString('base58btc')
 }
 
 function invalidPinTypeErr (type) {
@@ -40,25 +40,20 @@ class PinManager {
   constructor (repo, dag, repoOwner, log) {
     this.repo = repo
     this.dag = dag
+    this.store = new PinStore(dag)
     this.log = log
-    this.pinset = createPinSet(dag)
-    this.directPins = new Set()
-    this.recursivePins = new Set()
+    this.pinsets = {
+      direct: new PinSet(PinTypes.direct, this.store),
+      recursive: new PinSet(PinTypes.recursive, this.store)
+    }
+    this._linkCache = {}
     this._lock = new Lock(repoOwner, 'ipfs:pin-manager:lock')
   }
 
-  directKeys () {
-    return Array.from(this.directPins).map(key => new CID(key).buffer)
-  }
-
-  recursiveKeys () {
-    return Array.from(this.recursivePins).map(key => new CID(key).buffer)
-  }
-
   getIndirectKeys (callback) {
     this._lock.readLock((lockCb) => {
       const indirectKeys = new Set()
-      eachLimit(this.recursiveKeys(), concurrencyLimit, (multihash, cb) => {
+      eachLimit([...this.pinsets.recursive.pinKeys], concurrencyLimit, (multihash, cb) => {
        this.dag._getRecursive(multihash, (err, nodes) => {
          if (err) {
            return cb(err)
@@ -74,7 +69,7 @@ class PinManager {
          cids
            .map(cid => cid.toString())
            // recursive pins pre-empt indirect pins
-            .filter(key => !this.recursivePins.has(key))
+            .filter(key => !this.pinsets.recursive.hasPin(key))
            .forEach(key => indirectKeys.add(key))
 
          cb()
@@ -88,22 +84,23 @@ class PinManager {
   }
 
   addRecursivePins (keys, callback) {
-    this._addPins(keys, this.recursivePins, callback)
+    this._addPins(keys, this.pinsets.recursive, callback)
   }
 
   addDirectPins (keys, callback) {
-    this._addPins(keys, this.directPins, callback)
+    this._addPins(keys, this.pinsets.direct, callback)
   }
 
-  _addPins (keys, pinSet, callback) {
+  _addPins (keys, pinset, callback) {
    this._lock.writeLock((lockCb) => {
-      keys = keys.filter(key => !pinSet.has(key))
-      if (!keys.length) return lockCb(null, [])
-
-      for (const key of keys) {
-        pinSet.add(key)
-      }
-      this._flushPins(lockCb)
+      // Add pins to the pin set (direct or recursive)
+      pinset.addPins(keys, (err, changed) => {
+        if (err) { return lockCb(err) }
+        // If the pin set was changed, update the root node to point at the
+        // changed pin set and write it out
+        if (changed) { return this._saveRootNode(lockCb) }
+        return lockCb()
+      })
    }, callback)
  }
@@ -111,15 +108,34 @@ class PinManager {
    if (!keys.length) return callback(null, [])
 
    this._lock.writeLock((lockCb) => {
+      const pins = {
+        direct: new Set(),
+        recursive: new Set()
+      }
+
      for (const key of keys) {
-        if (recursive && this.recursivePins.has(key)) {
-          this.recursivePins.delete(key)
+        if (recursive && this.pinsets.recursive.hasPin(key)) {
+          pins.recursive.add(key)
        } else {
-          this.directPins.delete(key)
+          pins.direct.add(key)
        }
      }
 
-      this._flushPins(lockCb)
+      waterfall([
+        (cb) => parallel([
+          // Remove pins from the pin sets
+          (pcb) => this.pinsets.direct.rmPins([...pins.direct], pcb),
+          (pcb) => this.pinsets.recursive.rmPins([...pins.recursive], pcb)
+        ], cb),
+        (changed, cb) => {
+          // If either of the pin sets was changed, update the root node to
+          // point at the changed pin sets and write it out
+          if (changed[0] || changed[1]) {
+            return this._saveRootNode(cb)
+          }
+          cb()
+        }
+      ], lockCb)
    }, callback)
  }
 
@@ -127,83 +143,38 @@ class PinManager {
  // a DAGLink for each of the recursive and direct pinsets
  // a DAGNode holding those as DAGLinks, a kind of root pin
  // Note: should only be called within a lock
-  _flushPins (callback) {
-    let dLink, rLink, root
-    series([
-      // create a DAGLink to the node with direct pins
-      cb => waterfall([
-        cb => this.pinset.storeSet(this.directKeys(), cb),
-        ({ node, cid }, cb) => {
-          try {
-            cb(null, new DAGLink(PinTypes.direct, node.size, cid))
-          } catch (err) {
-            cb(err)
-          }
-        },
-        (link, cb) => { dLink = link; cb(null) }
-      ], cb),
-
-      // create a DAGLink to the node with recursive pins
-      cb => waterfall([
-        cb => this.pinset.storeSet(this.recursiveKeys(), cb),
-        ({ node, cid }, cb) => {
-          try {
-            cb(null, new DAGLink(PinTypes.recursive, node.size, cid))
-          } catch (err) {
-            cb(err)
-          }
-        },
-        (link, cb) => { rLink = link; cb(null) }
-      ], cb),
-
-      // the pin-set nodes link to a special 'empty' node, so make sure it exists
-      cb => {
-        let empty
-
-        try {
-          empty = DAGNode.create(Buffer.alloc(0))
-        } catch (err) {
-          return cb(err)
-        }
-
-        this.dag.put(empty, {
-          version: 0,
-          format: multicodec.DAG_PB,
-          hashAlg: multicodec.SHA2_256,
-          preload: false
-        }, cb)
-      },
-
-      // create a root node with DAGLinks to the direct and recursive DAGs
-      cb => {
-        let node
-
+  _saveRootNode (callback) {
+    let root
+
+    waterfall([
+      // ensure that there are nodes for both the direct and recursive pin sets
+      (cb) => parallel([
+        pcb => this.pinsets.direct.saveSet(pcb),
+        pcb => this.pinsets.recursive.saveSet(pcb)
+      ], cb),
+
+      // create a root node with DAGLinks to the direct and recursive pin sets
+      (res, cb) => {
        try {
-          node = DAGNode.create(Buffer.alloc(0), [dLink, rLink])
+          root = DAGNode.create(Buffer.alloc(0), res.map(r => r.link))
        } catch (err) {
          return cb(err)
        }
-        root = node
 
-        this.dag.put(root, {
-          version: 0,
-          format: multicodec.DAG_PB,
-          hashAlg: multicodec.SHA2_256,
-          preload: false
-        }, (err, cid) => {
+        this.store.save(root, (err, cid) => {
          if (!err) {
            root.multihash = cid.buffer
          }
-          cb(err)
+          return cb(err)
        })
      },
 
      // hack for CLI tests
-      cb => this.repo.closed ? this.repo.open(cb) : cb(null, null),
+      cb => this.repo.closed ? this.repo.open(cb) : cb(),
 
      // save root to datastore under a consistent key
      cb => this.repo.datastore.put(PIN_DS_KEY, root.multihash, cb)
-    ], (err, res) => {
+    ], (err) => {
      if (err) { return callback(err) }
      this.log(`Flushed pins with root: ${root}`)
      return callback(null, root)
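`_saveRootNode` replaces `_flushPins`, but the on-disk shape is unchanged: an empty root node carrying one named `DAGLink` per pin set, whose CID is written to the datastore under `/local/pins`. A minimal sketch of that shape, assuming the two pin-set nodes were already saved (`directSet`/`recursiveSet` stand in for the `{ cid, node, link }` results of `saveSet`):

```js
// Hedged sketch of the root pin node that _saveRootNode writes.
const { DAGNode, DAGLink } = require('ipld-dag-pb')

const root = DAGNode.create(Buffer.alloc(0), [
  new DAGLink('direct', directSet.node.size, directSet.cid),
  new DAGLink('recursive', recursiveSet.node.size, recursiveSet.cid)
])
// root is then saved via PinStore and its CID put under PIN_DS_KEY
```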
@@ -215,15 +186,17 @@ class PinManager {
      waterfall([
        // hack for CLI tests
        (cb) => this.repo.closed ? this.repo.datastore.open(cb) : cb(null, null),
+        // Get root node CID from datastore
        (_, cb) => this.repo.datastore.has(PIN_DS_KEY, cb),
-        (has, cb) => has ? cb() : cb(new Error('No pins to load')),
+        (has, cb) => has ? cb() : cb(new Error(NO_PINS_ERR)),
        (cb) => this.repo.datastore.get(PIN_DS_KEY, cb),
+        // Load root node
        (mh, cb) => {
-          this.dag.get(new CID(mh), '', { preload: false }, cb)
+          this.store.fetch(new CID(mh), cb)
        }
      ], (err, pinRoot) => {
        if (err) {
-          if (err.message === 'No pins to load') {
+          if (err.message === NO_PINS_ERR) {
            this.log('No pins to load')
            return lockCb()
          } else {
            return lockCb(err)
          }
        }
 
+        // Load the direct and recursive pin sets
        parallel([
-          cb => this.pinset.loadSet(pinRoot.value, PinTypes.recursive, cb),
-          cb => this.pinset.loadSet(pinRoot.value, PinTypes.direct, cb)
-        ], (err, keys) => {
-          if (err) { return lockCb(err) }
-          const [ rKeys, dKeys ] = keys
-
-          this.directPins = new Set(dKeys.map(toB58String))
-          this.recursivePins = new Set(rKeys.map(toB58String))
-
-          this.log('Loaded pins from the datastore')
-          return lockCb(null)
+          cb => this.pinsets.direct.loadSet(pinRoot.value, cb),
+          cb => this.pinsets.recursive.loadSet(pinRoot.value, cb)
+        ], (err) => {
+          if (!err) {
+            this.log('Loaded pins from the datastore')
+          }
+          return lockCb(err)
        })
      })
    }, callback)
  }
@@ -253,7 +223,7 @@ class PinManager {
    const { recursive, direct, all } = PinTypes
 
    // recursive
-    if ((type === recursive || type === all) && this.recursivePins.has(key)) {
+    if ((type === recursive || type === all) && this.pinsets.recursive.hasPin(key)) {
      return callback(null, {
        key,
        pinned: true,
@@ -269,7 +239,7 @@ class PinManager {
    }
 
    // direct
-    if ((type === direct || type === all) && this.directPins.has(key)) {
+    if ((type === direct || type === all) && this.pinsets.direct.hasPin(key)) {
      return callback(null, {
        key,
        pinned: true,
@@ -288,11 +258,12 @@ class PinManager {
    // indirect (default)
    // check each recursive key to see if multihash is under it
    // arbitrary limit, enables handling 1000s of pins.
-    detectLimit(this.recursiveKeys().map(key => new CID(key)), concurrencyLimit, (cid, cb) => {
+    const cids = [...this.pinsets.recursive.pinKeys].map(key => new CID(key))
+    detectLimit(cids, concurrencyLimit, (cid, cb) => {
      waterfall([
-        (done) => this.dag.get(cid, '', { preload: false }, done),
+        (done) => this.store.fetch(cid, done),
        (result, done) => done(null, result.value),
-        (node, done) => this.pinset.hasDescendant(node, key, done)
+        (node, done) => this.pinsets.recursive.hasDescendant(node, key, done)
      ], cb)
    }, (err, cid) => lockCb(err, {
      key,
@@ -315,7 +286,7 @@ class PinManager {
      }
 
      const cid = new CID(mh)
-      this.dag.get(cid, '', { preload: false }, (err, obj) => {
+      this.store.fetch(cid, (err, obj) => {
        if (err) {
          return lockCb(new Error(`Could not get pin sets from store: ${err.message}`))
        }
@@ -325,7 +296,7 @@ class PinManager {
        // 2. The recursively pinned CIDs
        // If large enough, these pin sets may have links to buckets to hold
        // the pins
-        this.pinset.getInternalCids(obj.value, (err, cids) => {
+        PinSet.getInternalCids(this.store, obj.value, (err, cids) => {
          if (err) {
            return lockCb(new Error(`Could not get pinner internal cids: ${err.message}`))
          }
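`getInternalCids` is now a static on `PinSet`, so callers only need a store and the loaded root node to enumerate the pinner's own bookkeeping blocks (e.g. to protect them from GC). A hedged usage sketch:

```js
// Sketch: enumerating the pinner's internal CIDs with the new static helper.
PinSet.getInternalCids(store, rootNode, (err, cids) => {
  if (err) { throw err } // sketch only — real callers pass the error on
  // cids holds the 'empty' block, the top-level set nodes and every
  // non-empty fanout bin — everything owned by the pinner itself
})
```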
diff --git a/src/core/components/pin/pin-set-cache.js b/src/core/components/pin/pin-set-cache.js
new file mode 100644
index 0000000000..ce692cbfab
--- /dev/null
+++ b/src/core/components/pin/pin-set-cache.js
@@ -0,0 +1,64 @@
+'use strict'
+
+const fnv1a = require('fnv1a')
+
+class PinSetCache {
+  constructor () {
+    this.fanoutLinks = []
+    this.subcaches = []
+  }
+
+  get (index, pins) {
+    if (!this.fanoutLinks[index]) return null
+
+    const cacheId = PinSetCache.getCacheId(pins)
+    if (this.fanoutLinks[index].id === cacheId) {
+      return this.fanoutLinks[index].link
+    }
+    return null
+  }
+
+  put (index, pins, link) {
+    this.fanoutLinks[index] = {
+      id: PinSetCache.getCacheId(pins),
+      link
+    }
+  }
+
+  getSubcache (index) {
+    if (!this.subcaches[index]) {
+      this.subcaches[index] = new PinSetCache()
+    }
+    return this.subcaches[index]
+  }
+
+  clearMissing (pins) {
+    for (const i of Object.keys(this.fanoutLinks)) {
+      if (!pins[i]) {
+        delete this.fanoutLinks[i]
+      }
+    }
+    for (const i of Object.keys(this.subcaches)) {
+      if (!pins[i]) {
+        delete this.subcaches[i]
+      }
+    }
+  }
+
+  walkItems (step, level = this) {
+    for (const i of Object.keys(level.fanoutLinks)) {
+      step(level.fanoutLinks[i].link, i)
+    }
+    for (const i of Object.keys(level.subcaches)) {
+      this.walkItems(step, level.subcaches[i])
+    }
+  }
+
+  static getCacheId (pins) {
+    const hashLen = pins[0].key.multihash.length
+    const buff = Buffer.concat(pins.map(p => p.key.multihash), hashLen * pins.length)
+    return fnv1a(buff.toString('binary'))
+  }
+}
+
+module.exports = PinSetCache
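`PinSetCache` identifies a bin by an FNV-1a hash over the concatenated multihashes of the pins that fell into it: same pins in the same order yield the same id, so an unchanged bin reuses its previously computed `DAGLink` instead of being re-serialized and re-hashed. A sketch of that identity check (note `getCacheId` reads `pins[0]`, so callers must pass non-empty arrays — which holds for the bins built by `storePins`):

```js
// Hedged sketch of the identity used by PinSetCache.getCacheId.
const fnv1a = require('fnv1a')

function cacheId (pins) {
  const buff = Buffer.concat(pins.map(p => p.key.multihash))
  return fnv1a(buff.toString('binary'))
}
```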
diff --git a/src/core/components/pin/pin-set.js b/src/core/components/pin/pin-set.js
index 04a389bf2c..4ee0c86273 100644
--- a/src/core/components/pin/pin-set.js
+++ b/src/core/components/pin/pin-set.js
@@ -6,21 +6,24 @@
 const protobuf = require('protons')
 const fnv1a = require('fnv1a')
 const varint = require('varint')
 const { DAGNode, DAGLink } = require('ipld-dag-pb')
-const multicodec = require('multicodec')
-const someSeries = require('async/someSeries')
-const eachSeries = require('async/eachSeries')
-const eachOfSeries = require('async/eachOfSeries')
+const some = require('async/some')
+const each = require('async/each')
+const eachOf = require('async/eachOf')
+const debug = require('debug')
+const log = debug('ipfs:pin:pin-set')
+const PinSetCache = require('./pin-set-cache')
 const pbSchema = require('./pin.proto')
 
 const emptyKeyHash = 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n'
 const emptyKey = multihashes.fromB58String(emptyKeyHash)
 const defaultFanout = 256
-const maxItems = 8192
+const defaultFanoutLink = new DAGLink('', 1, emptyKey)
+const defaultMaxItems = 8192
 const pb = protobuf(pbSchema)
 
 function toB58String (hash) {
-  return new CID(hash).toBaseEncodedString()
+  return new CID(hash).toBaseEncodedString('base58btc')
 }
 
 function readHeader (rootNode) {
@@ -58,234 +61,358 @@ function hash (seed, key) {
  return fnv1a(data.toString('binary'))
 }
 
-exports = module.exports = function (dag) {
-  const pinSet = {
-    // should this be part of `object` API?
-    hasDescendant: (root, childhash, callback) => {
-      const seen = {}
-
-      if (CID.isCID(childhash) || Buffer.isBuffer(childhash)) {
-        childhash = toB58String(childhash)
-      }
-
-      return searchChildren(root, callback)
-
-      function searchChildren (root, cb) {
-        someSeries(root.Links, (link, done) => {
-          const cid = link.Hash
-          const bs58Link = toB58String(cid)
-
-          if (bs58Link === childhash) {
-            return done(null, true)
-          }
-
-          if (bs58Link in seen) {
-            return done(null, false)
-          }
-
-          seen[bs58Link] = true
-
-          dag.get(cid, '', { preload: false }, (err, res) => {
-            if (err) {
-              return done(err)
-            }
-
-            searchChildren(res.value, done)
-          })
-        }, cb)
-      }
-    },
+class PinSet {
+  constructor (pinType, store, fanout = defaultFanout, maxItems = defaultMaxItems) {
+    this.pinType = pinType
+    this.store = store
+    this.fanout = fanout
+    this.maxItems = maxItems
+    this.pinKeys = new Set()
+    this.cache = new PinSetCache()
+  }
+
+  hasPin (key) {
+    return this.pinKeys.has(key)
+  }
+
+  // should this be part of `object` API?
+  hasDescendant (root, childhash, callback) {
+    const seen = {}
+
+    if (CID.isCID(childhash) || Buffer.isBuffer(childhash)) {
+      childhash = toB58String(childhash)
+    }
+
+    return this.searchChildren(root, childhash, seen, callback)
+  }
+
+  searchChildren (root, childhash, seen, cb) {
+    some(root.Links, (link, done) => {
+      const cid = link.Hash
+      const bs58Link = toB58String(cid)
+
+      if (bs58Link === childhash) {
+        return done(null, true)
+      }
+
+      if (bs58Link in seen) {
+        return done(null, false)
+      }
+
+      seen[bs58Link] = true
+
+      this.store.fetch(cid, (err, res) => {
+        if (err) {
+          return done(err)
+        }
+
+        this.searchChildren(res.value, childhash, seen, done)
+      })
+    }, cb)
+  }
-    storeSet: (keys, callback) => {
-      const pins = keys.map(key => {
-        if (typeof key === 'string' || Buffer.isBuffer(key)) {
-          key = new CID(key)
-        }
-
-        return {
-          key: key,
-          data: null
-        }
-      })
-
-      pinSet.storeItems(pins, (err, rootNode) => {
-        if (err) { return callback(err) }
-        dag.put(rootNode, {
-          version: 0,
-          format: multicodec.DAG_PB,
-          hashAlg: multicodec.SHA2_256,
-          preload: false
-        }, (err, cid) => {
-          if (err) { return callback(err, cid) }
-          callback(null, { node: rootNode, cid })
-        })
-      })
-    },
+  addPins (keys, callback) {
+    // Make sure there are some new pins to add
+    keys = keys.filter(key => !this.hasPin(key))
+    if (this.stored && !keys.length) return callback(null, false)
+
+    for (const key of keys) {
+      this.pinKeys.add(key)
+    }
+
+    this.storeSet(this.pinKeys, (err) => callback(err, true))
+  }
+
+  rmPins (keys, callback) {
+    // Make sure there are some pins to remove
+    keys = keys.filter(key => this.hasPin(key))
+    if (this.stored && !keys.length) return callback(null, false)
+
+    for (const key of keys) {
+      this.pinKeys.delete(key)
+    }
+
+    this.storeSet(this.pinKeys, (err) => callback(err, true))
+  }
+
+  // Store the current pin set if it hasn't already been stored
+  saveSet (callback) {
+    if (this.stored) {
+      return callback(null, this.stored)
+    }
+
+    this.storeSet(this.pinKeys, callback)
+  }
+
+  // Store the given set of keys
+  storeSet (keys, callback) {
+    const pins = [...keys].map(key => {
+      key = new CID(key)
+
+      return {
+        key: key,
+        data: null
+      }
+    })
+
+    log(`storing ${pins.length} ${this.pinType} pins`)
+    this.storeItems(pins, this.cache, (err, rootNode) => {
+      if (err) { return callback(err) }
+
+      log(`stored ${pins.length} ${this.pinType} pins`)
+      this.store.save(rootNode, (err, cid) => {
+        if (err) { return callback(err) }
+
+        this.stored = {
+          cid,
+          node: rootNode,
+          link: new DAGLink(this.pinType, rootNode.size, cid)
+        }
+
+        callback(null, this.stored)
+      })
+    })
+  }
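+
+  // Usage sketch (editor's note, not from the original patch):
+  //
+  //   pinset.saveSet((err, stored) => {
+  //     // stored.cid  - CID of the stored pin-set root node
+  //     // stored.node - the root DAGNode itself
+  //     // stored.link - a DAGLink named after the pin type ('direct' or
+  //     //               'recursive'), consumed by PinManager._saveRootNode
+  //   })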
 
-    storeItems: (items, callback) => {
-      return storePins(items, 0, callback)
-
-      function storePins (pins, depth, storePinsCb) {
-        const pbHeader = pb.Set.encode({
-          version: 1,
-          fanout: defaultFanout,
-          seed: depth
-        })
-        const headerBuf = Buffer.concat([
-          Buffer.from(varint.encode(pbHeader.length)), pbHeader
-        ])
-        const fanoutLinks = []
-        for (let i = 0; i < defaultFanout; i++) {
-          fanoutLinks.push(new DAGLink('', 1, emptyKey))
-        }
-
-        if (pins.length <= maxItems) {
-          const nodes = pins
-            .map(item => {
-              return ({
-                link: new DAGLink('', 1, item.key),
-                data: item.data || Buffer.alloc(0)
-              })
-            })
-            // sorting makes any ordering of `pins` produce the same DAGNode
-            .sort((a, b) => Buffer.compare(a.link.Hash.buffer, b.link.Hash.buffer))
-
-          const rootLinks = fanoutLinks.concat(nodes.map(item => item.link))
-          const rootData = Buffer.concat(
-            [headerBuf].concat(nodes.map(item => item.data))
-          )
-
-          let rootNode
-
-          try {
-            rootNode = DAGNode.create(rootData, rootLinks)
-          } catch (err) {
-            return storePinsCb(err)
-          }
-
-          return storePinsCb(null, rootNode)
-        } else {
-          // If the array of pins is > maxItems, we:
-          // - distribute the pins among `defaultFanout` bins
-          // - create a DAGNode for each bin
-          // - add each pin as a DAGLink to that bin
-          // - create a root DAGNode
-          // - add each bin as a DAGLink
-          // - send that root DAGNode via callback
-          // (using go-ipfs' "wasteful but simple" approach for consistency)
-          // https://github.com/ipfs/go-ipfs/blob/master/pin/set.go#L57
-
-          const bins = pins.reduce((bins, pin) => {
-            const n = hash(depth, pin.key) % defaultFanout
-            bins[n] = n in bins ? bins[n].concat([pin]) : [pin]
-            return bins
-          }, {})
-
-          eachOfSeries(bins, (bin, idx, eachCb) => {
-            storePins(
-              bin,
-              depth + 1,
-              (err, child) => storeChild(err, child, idx, eachCb)
-            )
-          }, err => {
-            if (err) { return storePinsCb(err) }
-
-            let rootNode
-
-            try {
-              rootNode = DAGNode.create(headerBuf, fanoutLinks)
-            } catch (err) {
-              return storePinsCb(err)
-            }
-
-            return storePinsCb(null, rootNode)
-          })
-        }
-
-        function storeChild (err, child, binIdx, cb) {
-          if (err) { return cb(err) }
-
-          const opts = {
-            version: 0,
-            format: multicodec.DAG_PB,
-            hashAlg: multicodec.SHA2_256,
-            preload: false
-          }
-
-          dag.put(child, opts, (err, cid) => {
-            if (err) { return cb(err) }
-            fanoutLinks[binIdx] = new DAGLink('', child.size, cid)
-            cb(null)
-          })
-        }
-      }
-    },
+  storeItems (items, cache, callback) {
+    if (this.emptyNode) {
+      return this.storePins(items, 0, cache, callback)
+    }
+
+    // The pin-set nodes link to a special 'empty' node, so make sure it exists
+    let empty
+    try {
+      empty = DAGNode.create(Buffer.alloc(0))
+    } catch (err) {
+      return callback(err)
+    }
+
+    this.store.save(empty, (err, cid) => {
+      if (err) { return callback(err) }
+
+      this.emptyNode = cid
+      this.storePins(items, 0, cache, callback)
+    })
+  }
+
+  storePins (pins, depth, psCache, storePinsCb) {
+    // A header with the version and the fanout (number of bins) at this depth
+    const pbHeader = pb.Set.encode({
+      version: 1,
+      fanout: this.fanout,
+      seed: depth
+    })
+    const headerBuf = Buffer.concat([
+      Buffer.from(varint.encode(pbHeader.length)), pbHeader
+    ])
+
+    // Initialize the fanout links (links to bins) to point to an empty DAGNode
+    const fanoutLinks = []
+    for (let i = 0; i < this.fanout; i++) {
+      fanoutLinks[i] = defaultFanoutLink
+    }
+
+    // If there are fewer than maxItems pins, just store them all in one DAGNode
+    if (pins.length <= this.maxItems) {
+      const nodes = pins
+        .map(item => {
+          return ({
+            link: new DAGLink('', 1, item.key),
+            data: item.data || Buffer.alloc(0)
+          })
+        })
+        // sorting makes any ordering of `pins` produce the same DAGNode
+        .sort((a, b) => Buffer.compare(a.link.Hash.buffer, b.link.Hash.buffer))
+
+      // Add the pin links to the (empty) bin links
+      const rootLinks = fanoutLinks.concat(nodes.map(item => item.link))
+      const rootData = Buffer.concat(
+        [headerBuf].concat(nodes.map(item => item.data))
+      )
+
+      let rootNode
+
+      // Create the DAGNode with all the links in it
+      try {
+        rootNode = DAGNode.create(rootData, rootLinks)
+      } catch (err) {
+        return storePinsCb(err)
+      }
+
+      return storePinsCb(null, rootNode)
+    } else {
+      // If the array of pins is > maxItems, we:
+      // - distribute the pins among `fanout` bins
+      // - create a DAGNode for each bin
+      // - add each pin as a DAGLink to that bin
+      // - create a root DAGNode
+      // - add each bin as a DAGLink
+      // - send that root DAGNode via callback
+      // (using go-ipfs' "wasteful but simple" approach for consistency)
+      // https://github.com/ipfs/go-ipfs/blob/master/pin/set.go#L57
+
+      const bins = pins.reduce((bins, pin) => {
+        const n = hash(depth, pin.key) % this.fanout
+        bins[n] = n in bins ? bins[n].concat([pin]) : [pin]
+        return bins
+      }, {})
+
+      // Clear any cache slots for removed pins
+      psCache.clearMissing(bins)
+
+      eachOf(bins, (bin, idx, eachCb) => {
+        // Check if the bin at this index is unchanged
+        const link = psCache.get(idx, bin)
+        if (link) {
+          // log(' cache hit')
+          fanoutLinks[idx] = link
+          return eachCb()
+        }
+        // log(' cache miss')
+
+        // Store the pins in this bin
+        this.storePins(
+          bin,
+          depth + 1,
+          psCache.getSubcache(idx),
+          (err, child) => {
+            if (err) { return eachCb(err) }
+
+            this.store.save(child, (err, cid) => {
+              if (err) { return eachCb(err) }
+
+              const link = new DAGLink('', child.size, cid)
+              fanoutLinks[idx] = link
+              psCache.put(idx, bin, link)
+
+              eachCb()
+            })
+          }
+        )
+      }, err => {
+        if (err) { return storePinsCb(err) }
+
+        let rootNode
+
+        // Create a DAGNode with links to all the bins
+        try {
+          rootNode = DAGNode.create(headerBuf, fanoutLinks)
+        } catch (err) {
+          return storePinsCb(err)
+        }
+
+        return storePinsCb(null, rootNode)
+      })
+    }
+  }
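+
+  // Worked example (editor's sketch, not from the original patch): with the
+  // defaults (fanout = 256, maxItems = 8192), storing ~100k pins works out as
+  //   100000 > 8192                => split into 256 bins via hash(depth, key) % fanout
+  //   ~100000 / 256 ≈ 390 per bin  => 390 <= 8192, so every bin is a leaf node
+  // i.e. a two-level structure; `depth` seeds the hash, so each level
+  // distributes keys differently.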
 
-    loadSet: (rootNode, name, callback) => {
-      const link = rootNode.Links.find(l => l.Name === name)
-      if (!link) {
-        return callback(new Error('No link found with name ' + name))
-      }
-
-      dag.get(link.Hash, '', { preload: false }, (err, res) => {
-        if (err) { return callback(err) }
-        const keys = []
-        const stepPin = link => keys.push(link.Hash.buffer)
-        pinSet.walkItems(res.value, { stepPin }, err => {
-          if (err) { return callback(err) }
-          return callback(null, keys)
-        })
-      })
-    },
+  // Load the pin set
+  loadSet (rootNode, callback) {
+    const index = rootNode.Links.findIndex(l => l.Name === this.pinType)
+    if (index === -1) {
+      return callback(new Error('No link found with name ' + this.pinType))
+    }
+
+    return this.loadSetAt(rootNode, index, callback)
+  }
+
+  // Load a pin set from the given node at the given index
+  loadSetAt (rootNode, index, callback) {
+    // Get the link at the given index
+    const link = rootNode.Links[index]
+    if (!link) {
+      return callback(new Error('No link found at index ' + index))
+    }
+
+    // Fetch the DAGNode pointed to by the link
+    this.store.fetch(link.Hash, (err, res) => {
+      if (err) { return callback(err) }
+
+      // Get the pins from the node
+      const keys = []
+      const stepPin = link => keys.push(link.Hash.buffer)
+      const cache = this.cache
+      this.walkItems(res.value, { stepPin, cache }, err => {
+        if (err) { return callback(err) }
+
+        // Keep the loaded keys in memory as base58 strings
+        this.pinKeys = new Set(keys.map(toB58String))
+        return callback(null, keys)
+      })
+    })
+  }
 
-    walkItems: (node, { stepPin = () => {}, stepBin = () => {} }, callback) => {
-      let pbh
-      try {
-        pbh = readHeader(node)
-      } catch (err) {
-        return callback(err)
-      }
-
-      eachOfSeries(node.Links, (link, idx, eachCb) => {
-        if (idx < pbh.header.fanout) {
-          // the first pbh.header.fanout links are fanout bins
-          // if a fanout bin is not 'empty', dig into and walk its DAGLinks
-          const linkHash = link.Hash.buffer
-
-          if (!emptyKey.equals(linkHash)) {
-            stepBin(link, idx, pbh.data)
-
-            // walk the links of this fanout bin
-            return dag.get(linkHash, '', { preload: false }, (err, res) => {
-              if (err) { return eachCb(err) }
-              pinSet.walkItems(res.value, { stepPin, stepBin }, eachCb)
-            })
-          }
-        } else {
-          // otherwise, the link is a pin
-          stepPin(link, idx, pbh.data)
-        }
-
-        eachCb(null)
-      }, callback)
-    },
+  // Walk items in this pin set's store
+  walkItems (node, opts, callback) {
+    PinSet.walkStoreItems(this.store, node, opts, callback)
+  }
+
+  // Walk items in the given store starting at the given node
+  // stepPin(link, index, data) - called each time a pin is encountered
+  // stepBin(link, index, data) - called each time a (non-empty) bin is encountered
+  static walkStoreItems (store, node, { stepPin = () => {}, stepBin = () => {}, cache }, callback) {
+    let pbh
+    try {
+      pbh = readHeader(node)
+    } catch (err) {
+      return callback(err)
+    }
+
+    let pins = []
+    eachOf(node.Links, (link, idx, eachCb) => {
+      if (idx < pbh.header.fanout) {
+        // the first pbh.header.fanout links are fanout bins
+        // if a fanout bin is not 'empty', dig into and walk its DAGLinks
+        const linkHash = link.Hash.buffer
+
+        if (!emptyKey.equals(linkHash)) {
+          stepBin(link, idx, pbh.data)
+
+          // walk the links of this fanout bin
+          return store.fetch(linkHash, (err, res) => {
+            if (err) { return eachCb(err) }
+
+            const opts = {
+              stepPin,
+              stepBin,
+              cache: cache && cache.getSubcache(idx)
+            }
+            PinSet.walkStoreItems(store, res.value, opts, (err, subPins) => {
+              if (err) { return eachCb(err) }
+
+              pins = pins.concat(subPins)
+              cache && cache.put(idx, subPins, link)
+
+              eachCb(null)
+            })
+          })
+        }
+      } else {
+        // otherwise, the link is a pin
+        stepPin(link, idx, pbh.data)
+        pins.push({ key: link.Hash, data: pbh.data })
+      }
+
+      eachCb(null)
+    }, (err) => callback(err, pins))
+  }
 
-    getInternalCids: (rootNode, callback) => {
-      // "Empty block" used by the pinner
-      const cids = [new CID(emptyKey)]
-
-      const stepBin = link => cids.push(link.Hash)
-      eachSeries(rootNode.Links, (topLevelLink, cb) => {
-        cids.push(topLevelLink.Hash)
-
-        dag.get(topLevelLink.Hash, '', { preload: false }, (err, res) => {
-          if (err) { return cb(err) }
-
-          pinSet.walkItems(res.value, { stepBin }, cb)
-        })
-      }, (err) => callback(err, cids))
-    }
-  }
-  return pinSet
-}
+  // Get CIDs used internally by the pinner
+  // - the empty block
+  // - all internal structural nodes, i.e. the "bins" that hold groups of pins
+  static getInternalCids (store, rootNode, callback) {
+    // "Empty block" used by the pinner
+    const cids = [new CID(emptyKey)]
+
+    const stepBin = link => cids.push(link.Hash)
+    each(rootNode.Links, (topLevelLink, cb) => {
+      cids.push(topLevelLink.Hash)
+
+      store.fetch(topLevelLink.Hash, (err, res) => {
+        if (err) { return cb(err) }
+
+        PinSet.walkStoreItems(store, res.value, { stepBin }, cb)
+      })
+    }, (err) => callback(err, cids))
+  }
+}
+
+module.exports = PinSet
+module.exports.EmptyKeyHash = emptyKeyHash
diff --git a/src/core/components/pin/pin-store.js b/src/core/components/pin/pin-store.js
new file mode 100644
index 0000000000..28603cfe0e
--- /dev/null
+++ b/src/core/components/pin/pin-store.js
@@ -0,0 +1,25 @@
+'use strict'
+
+const multicodec = require('multicodec')
+
+// Wrapper around dag to set preload to false and write with DAG_PB format
+class PinStore {
+  constructor (dag) {
+    this.dag = dag
+  }
+
+  fetch (cid, cb) {
+    this.dag.get(cid, '', { preload: false }, cb)
+  }
+
+  save (node, cb) {
+    this.dag.put(node, {
+      version: 0,
+      format: multicodec.DAG_PB,
+      hashAlg: multicodec.SHA2_256,
+      preload: false
+    }, cb)
+  }
+}
+
+module.exports = PinStore
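`PinStore` pins down the two dag calls the pinner makes — always `preload: false`, always a CIDv0 dag-pb write with sha2-256 — so the options live in one place. A usage sketch, assuming an `ipfs.dag` instance and a dag-pb `node`:

```js
// Hedged sketch: a PinStore round trip.
const store = new PinStore(ipfs.dag)

store.save(node, (err, cid) => {
  if (err) { throw err } // sketch only
  store.fetch(cid, (err, res) => {
    // res.value is the DAGNode that was saved
  })
})
```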
diff --git a/test/core/pin-set.js b/test/core/pin-set.js
index 73bfb69044..bb7effcbc4 100644
--- a/test/core/pin-set.js
+++ b/test/core/pin-set.js
@@ -9,6 +9,7 @@ chai.use(dirtyChai)
 
 const parallelLimit = require('async/parallelLimit')
 const series = require('async/series')
+const waterfall = require('async/waterfall')
 const {
   util: {
     cid,
@@ -19,7 +20,8 @@ const {
 const CID = require('cids')
 
 const IPFS = require('../../src/core')
-const createPinSet = require('../../src/core/components/pin/pin-set')
+const PinStore = require('../../src/core/components/pin/pin-store')
+const PinSet = require('../../src/core/components/pin/pin-set')
 const createTempRepo = require('../utils/create-repo-nodejs')
 
 const emptyKeyHash = 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n'
@@ -69,6 +71,7 @@ function createNode (data, links = [], callback) {
 describe('pinSet', function () {
   let ipfs
   let pinSet
+  let store
   let repo
 
   before(function (done) {
@@ -86,7 +89,8 @@ describe('pinSet', function () {
      }
    })
    ipfs.on('ready', () => {
-      pinSet = createPinSet(ipfs.dag)
+      store = new PinStore(ipfs.dag)
+      pinSet = new PinSet('recursive', store)
      done()
    })
  })
@@ -105,11 +109,17 @@ describe('pinSet', function () {
      createNode('data', (err, result) => {
        expect(err).to.not.exist()
        const nodeHash = result.cid.toBaseEncodedString()
+
+        // Store a single pin
        pinSet.storeSet([nodeHash], (err, rootNode) => {
          expect(err).to.not.exist()
+
+          // Should have the correct CID
          expect(rootNode.cid.toBaseEncodedString()).to.eql(expectedRootHash)
+          // Node should have links to `defaultFanout` empty bins + one pin
          expect(rootNode.node.Links).to.have.length(defaultFanout + 1)
 
+          // Last link should be link to the pin
          const lastLink = rootNode.node.Links[rootNode.node.Links.length - 1]
          const mhash = lastLink.Hash.toBaseEncodedString()
          expect(mhash).to.eql(nodeHash)
@@ -123,22 +133,34 @@ describe('pinSet', function () {
   it('handles storing items > maxItems', function (done) {
    this.timeout(90 * 1000)
    const expectedHash = 'QmbvhSy83QWfgLXDpYjDmLWBFfGc8utoqjcXHyj3gYuasT'
+
+    // Once there are more than maxItems pins, the pins should be distributed
+    // across `defaultFanout` bins
    const count = maxItems + 1
    createNodes(count, (err, cids) => {
      expect(err).to.not.exist()
+
      pinSet.storeSet(cids, (err, result) => {
        expect(err).to.not.exist()
 
        expect(result.node.size).to.eql(3184696)
+
+        // Pins should be distributed across `defaultFanout` bins
        expect(result.node.Links).to.have.length(defaultFanout)
        expect(result.cid.toBaseEncodedString()).to.eql(expectedHash)
 
-        pinSet.loadSet(result.node, '', (err, loaded) => {
+        // Load the pins in the first bin
+        pinSet.loadSetAt(result.node, 0, (err, loaded) => {
          expect(err).to.not.exist()
+
+          // Should have 30 pins in it (the way the pins are created for
+          // this test is deterministic and will result in 30 pins falling
+          // into the first bin)
          expect(loaded).to.have.length(30)
 
          const hashes = loaded.map(l => new CID(l).toBaseEncodedString())
 
-          // just check the first node, assume all are children if successful
+          // Make sure hasDescendant() finds a pin inside a bin.
+          // Just check the first node, assume all are children if successful
          pinSet.hasDescendant(result.node, hashes[0], (err, has) => {
            expect(err).to.not.exist()
            expect(has).to.eql(true)
@@ -181,6 +203,114 @@ describe('pinSet', function () {
    })
  })
 
+  describe('add and remove', function () {
+    function loadThenCheck (ps, ks1, ks2, cb) {
+      ps.saveSet((err, result) => {
+        expect(err).to.not.exist()
+
+        const rootNode = DAGNode.create('pins', [{ Hash: result.cid }])
+        ps.loadSetAt(rootNode, 0, (err, loaded) => {
+          expect(err).to.not.exist()
+
+          // both the in-memory keys and the freshly loaded keys should match
+          expect([...ks1].sort()).eql([...ks2].sort())
+          expect(loaded.map(l => new CID(l).toBaseEncodedString()).sort()).eql([...ks2].sort())
+
+          cb()
+        })
+      })
+    }
+
+    it('adds pins', function (done) {
+      const testPinSet = new PinSet('recursive', store)
+
+      createNodes(5, (err, keys) => {
+        expect(err).to.not.exist()
+
+        testPinSet.addPins(keys, (err) => {
+          expect(err).to.not.exist()
+
+          loadThenCheck(testPinSet, testPinSet.pinKeys, keys, done)
+        })
+      })
+    })
+
+    it('removes pins', function (done) {
+      const testPinSet = new PinSet('recursive', store)
+
+      createNodes(5, (err, keys) => {
+        expect(err).to.not.exist()
+
+        series([
+          (cb) => testPinSet.addPins(keys, cb),
+          (cb) => testPinSet.rmPins([keys[1], keys[3]], cb),
+          (cb) => loadThenCheck(testPinSet, testPinSet.pinKeys, [keys[0], keys[2], keys[4]], cb)
+        ], done)
+      })
+    })
+
+    it('add existing pins has no effect', function (done) {
+      const testPinSet = new PinSet('recursive', store)
+
+      createNodes(5, (err, keys) => {
+        expect(err).to.not.exist()
+
+        waterfall([
+          (cb) => testPinSet.addPins(keys, cb),
+          (changed, cb) => testPinSet.addPins(keys, cb)
+        ], (err, changed) => {
+          expect(err).to.not.exist()
+
+          expect(changed).equal(false)
+          done()
+        })
+      })
+    })
+
+    it('remove non-existent pins has no effect', function (done) {
+      const testPinSet = new PinSet('recursive', store)
+
+      createNodes(5, (err, keys) => {
+        expect(err).to.not.exist()
+
+        waterfall([
+          (cb) => testPinSet.addPins(keys, cb),
+          (changed, cb) => testPinSet.rmPins([keys[1], keys[3]], cb),
+          (changed, cb) => testPinSet.rmPins([keys[1], keys[3]], cb)
+        ], (err, changed) => {
+          expect(err).to.not.exist()
+
+          expect(changed).equal(false)
+          done()
+        })
+      })
+    })
+
+    it('several adds and removes results in correct pin set', function (done) {
+      const testPinSet = new PinSet('recursive', store)
+
+      createNodes(5, (err, keys) => {
+        expect(err).to.not.exist()
+
+        series([
+          // [0, 1, 2, 3, 4]
+          (cb) => testPinSet.addPins(keys, cb),
+
+          // [0, -, -, -, 4]
+          (cb) => testPinSet.rmPins([keys[1], keys[2], keys[3]], cb),
+
+          // [0, 1, -, 3, 4]
+          (cb) => testPinSet.addPins([keys[0], keys[1], keys[3]], cb),
+
+          // [0, -, -, 3, 4]
+          (cb) => testPinSet.rmPins([keys[1], keys[2]], cb)
+        ], (err) => {
+          expect(err).to.not.exist()
+
+          loadThenCheck(testPinSet, testPinSet.pinKeys, [keys[0], keys[3], keys[4]], done)
+        })
+      })
+    })
+  })
+
  describe('walkItems', function () {
    it(`fails if node doesn't have a pin-set protobuf header`, function (done) {
      createNode('datum', (err, node) => {
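The suites below shrink the tree with `fanout = 5` and `maxItems = 20` so that a few hundred pins already produce a multi-level structure. A hedged sketch of where the expected numbers come from (the exact 80/82 values depend on the deterministic test fixtures):

```js
const numPins = binsPerNode * itemsPerNode * 5 // = 5 * 20 * 5 = 500
// 500 > 20   => the root splits into 5 bins (~100 pins each)
// ~100 > 20  => each splits again into sub-bins; bins sitting at the 20-pin
//               limit spill over and split once more, and these fixtures
//               land on 80 non-empty bins overall.
// getInternalCids then reports 82 CIDs: 80 bins + set root + empty block.
```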
@@ -197,46 +327,76 @@ describe('pinSet', function () {
 
    it('visits all links of a root node', function (done) {
      this.timeout(90 * 1000)
 
-      const seenPins = []
-      const stepPin = (link, idx, data) => seenPins.push({ link, idx, data })
-      const seenBins = []
-      const stepBin = (link, idx, data) => seenBins.push({ link, idx, data })
-
-      createNodes(maxItems + 1, (err, nodes) => {
-        expect(err).to.not.exist()
-
-        pinSet.storeSet(nodes, (err, result) => {
-          expect(err).to.not.exist()
-          pinSet.walkItems(result.node, { stepPin, stepBin }, err => {
-            expect(err).to.not.exist()
-            expect(seenPins).to.have.length(maxItems + 1)
-            expect(seenBins).to.have.length(defaultFanout)
-            done()
-          })
-        })
-      })
-    })
-
-    it('visits all non-fanout links of a root node', function (done) {
-      const seen = []
-      const stepPin = (link, idx, data) => seen.push({ link, idx, data })
-
-      createNodes(defaultFanout, (err, nodes) => {
-        expect(err).to.not.exist()
-
-        pinSet.storeSet(nodes, (err, result) => {
-          expect(err).to.not.exist()
-
-          pinSet.walkItems(result.node, { stepPin }, err => {
-            expect(err).to.not.exist()
-            expect(seen).to.have.length(defaultFanout)
-            expect(seen[0].idx).to.eql(defaultFanout)
-            seen.forEach(item => {
-              expect(item.data).to.eql(Buffer.alloc(0))
-              expect(item.link).to.exist()
-            })
-            done()
-          })
-        })
-      })
-    })
+      const binsPerNode = 5
+      const itemsPerNode = 20
+      const testPinSet = new PinSet('recursive', store, binsPerNode, itemsPerNode)
+
+      // Generate enough pins that they will be distributed across a structure
+      // of bins that is a few levels deep
+      const numPins = binsPerNode * itemsPerNode * 5
+
+      // Checks that the structure is in the expected shape
+      function checkStructure (node, cache, cb) {
+        const seenPins = []
+        const stepPin = (link, idx, data) => seenPins.push({ link, idx, data })
+        const seenBins = []
+        const stepBin = (link, idx, data) => seenBins.push({ link, idx, data })
+
+        const cacheSeenBins = {}
+        const cacheStepBin = (link, idx) => {
+          cacheSeenBins[link.Hash.toString()] = { link, idx }
+        }
+
+        cache.walkItems(cacheStepBin)
+
+        testPinSet.walkItems(node, { stepPin, stepBin }, err => {
+          expect(err).to.not.exist()
+
+          // Walking the structure we should see every pin
+          expect(seenPins).to.have.length(numPins)
+
+          // The way pins are generated for the tests is deterministic, and
+          // will result in 80 bins being created
+          expect(seenBins).to.have.length(80)
+
+          // Expect the pins to have the correct fields
+          for (const item of seenPins) {
+            expect(item.data).to.eql(Buffer.alloc(0))
+            expect(item.link).to.exist()
+          }
+
+          // Expect the cached bin structure to match the actual structure
+          expect(Object.keys(cacheSeenBins)).to.have.length(seenBins.length)
+          for (const bin of seenBins) {
+            const cachedBin = cacheSeenBins[bin.link.Hash.toString()]
+            expect(cachedBin).to.exist()
+            expect(String(cachedBin.idx)).to.equal(String(bin.idx))
+          }
+
+          cb()
+        })
+      }
+
+      createNodes(numPins, (err, nodes) => {
+        expect(err).to.not.exist()
+
+        testPinSet.storeSet(nodes, (err, result) => {
+          expect(err).to.not.exist()
+
+          // Check that the structure has the correct shape
+          checkStructure(result.node, testPinSet.cache, () => {
+            // Load the pin set from storage
+            const rootNode = DAGNode.create('pins', [{ Hash: result.cid }])
+            testPinSet.loadSetAt(rootNode, 0, (err) => {
+              expect(err).to.not.exist()
+
+              // Check that the loaded set matches the original set
+              expect([...testPinSet.pinKeys].sort()).eql([...nodes].sort())
+
+              // Check that the structure still has the correct shape
+              checkStructure(testPinSet.stored.node, testPinSet.cache, done)
+            })
+          })
+        })
+      })
+    })
  })
@@ -245,19 +405,34 @@ describe('pinSet', function () {
 
  describe('getInternalCids', function () {
    it('gets all links and empty key CID', function (done) {
-      createNodes(defaultFanout, (err, nodes) => {
+      const binsPerNode = 5
+      const itemsPerNode = 20
+      const testPinSet = new PinSet('recursive', store, binsPerNode, itemsPerNode)
+
+      // Generate enough pins that they will be distributed across a structure
+      // of bins that is a few levels deep
+      const numPins = binsPerNode * itemsPerNode * 5
+
+      createNodes(numPins, (err, nodes) => {
        expect(err).to.not.exist()
 
-        pinSet.storeSet(nodes, (err, result) => {
+        testPinSet.storeSet(nodes, (err, result) => {
          expect(err).to.not.exist()
 
          const rootNode = DAGNode.create('pins', [{ Hash: result.cid }])
-          pinSet.getInternalCids(rootNode, (err, cids) => {
+          PinSet.getInternalCids(store, rootNode, (err, cids) => {
            expect(err).to.not.exist()
-            expect(cids.length).to.eql(2)
+
+            // The way pins are generated for the tests is deterministic, and
+            // will result in 82 internal cids being created
+            expect(cids.length).to.eql(82)
+
            const cidStrs = cids.map(c => c.toString())
+
+            // Should include the empty key hash
            expect(cidStrs).includes(emptyKeyHash)
+            // Should include the set root node
            expect(cidStrs).includes(result.cid.toString())
+
            done()
          })
        })
      })
    })
  })
diff --git a/test/perf/add.js b/test/perf/add.js
new file mode 100644
index 0000000000..4e9debe6d9
--- /dev/null
+++ b/test/perf/add.js
@@ -0,0 +1,71 @@
+/* eslint max-nested-callbacks: ["error", 8] */
+/* eslint-env mocha */
+'use strict'
+
+const chai = require('chai')
+const dirtyChai = require('dirty-chai')
+const expect = chai.expect
+chai.use(dirtyChai)
+const hat = require('hat')
+const IPFSFactory = require('ipfsd-ctl')
+const IPFS = require('../../src/core')
+
+describe('add', () => {
+  let ipfsd, ipfs
+
+  before(function (done) {
+    this.timeout(20 * 1000)
+
+    const factory = IPFSFactory.create({ type: 'proc' })
+
+    factory.spawn({
+      exec: IPFS,
+      initOptions: { bits: 512 },
+      config: { Bootstrap: [] }
+    }, (err, _ipfsd) => {
+      expect(err).to.not.exist()
+      ipfsd = _ipfsd
+      ipfs = _ipfsd.api
+      done()
+    })
+  })
+
+  after((done) => {
+    if (ipfsd) {
+      ipfsd.stop(done)
+    } else {
+      done()
+    }
+  })
+
+  const displayDuration = (t) => {
+    return Math.floor(t / 60000) + 'm' + Math.floor((t % 60000) / 1000) + 's'
+  }
+
+  describe('add-perf', () => {
+    it('timing', async function () {
+      this.timeout(60 * 60 * 1000)
+
+      const batchCount = 30
+      const batchSize = 300
+      // eslint-disable-next-line no-console
+      console.log(`Running ${batchCount} batches of size ${batchSize}:`)
+      let sum = 0
+      for (let i = 0; i < batchCount; i++) {
+        let batchSum = 0
+        for (let j = 0; j < batchSize; j++) {
+          const start = Date.now()
+          await ipfs.add(Buffer.from(hat()))
+          batchSum += Date.now() - start
+        }
+        sum += batchSum
+        // console.log('batch sum', displayDuration(batchSum))
+        // eslint-disable-next-line no-console
+        console.log(`batch ${i + 1} avg`, Math.round(batchSum / batchSize) + 'ms')
+      }
+      // eslint-disable-next-line no-console
+      console.log('total', displayDuration(sum))
+      // console.log('avg ', Math.round(sum / (batchSize * batchCount)) + 'ms')
+    })
+  })
+})