This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Use cache to improve pin performance #2198

Closed · wants to merge 6 commits
12 changes: 6 additions & 6 deletions src/core/components/pin.js
@@ -41,7 +41,7 @@ module.exports = (self) => {
map(mhs, (multihash, cb) => {
const key = toB58String(multihash)
if (recursive) {
if (pinManager.recursivePins.has(key)) {
if (pinManager.pinsets.recursive.hasPin(key)) {
// it's already pinned recursively
return cb(null, null)
}
@@ -54,11 +54,11 @@ module.exports = (self) => {
return cb(null, key)
})
} else {
if (pinManager.recursivePins.has(key)) {
if (pinManager.pinsets.recursive.hasPin(key)) {
// recursive supersedes direct, can't have both
return cb(new Error(`${key} already pinned recursively`))
}
if (pinManager.directPins.has(key)) {
if (pinManager.pinsets.direct.hasPin(key)) {
// already directly pinned
return cb(null, null)
}
@@ -222,15 +222,15 @@ module.exports = (self) => {
let pins = []
if (type === PinTypes.direct || type === PinTypes.all) {
pins = pins.concat(
Array.from(pinManager.directPins).map(hash => ({
[...pinManager.pinsets.direct.pinKeys].map(hash => ({
type: PinTypes.direct,
hash
}))
)
}
if (type === PinTypes.recursive || type === PinTypes.all) {
pins = pins.concat(
Array.from(pinManager.recursivePins).map(hash => ({
[...pinManager.pinsets.recursive.pinKeys].map(hash => ({
type: PinTypes.recursive,
hash
}))
@@ -244,7 +244,7 @@ module.exports = (self) => {
// report the indirect entry
.filter(({ hash }) =>
!indirects.includes(hash) ||
(indirects.includes(hash) && !pinManager.directPins.has(hash))
(indirects.includes(hash) && !pinManager.pinsets.direct.hasPin(hash))
)
.concat(indirects.map(hash => ({
type: PinTypes.indirect,
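
The pin.js changes above all follow one pattern: the old directPins / recursivePins Sets on PinManager are replaced by per-type pin set objects under pinManager.pinsets, each exposing a hasPin(key) check and an iterable pinKeys. A minimal sketch of the new access pattern, using a hypothetical listPinnedKeys helper that is not part of this PR and assuming an already-initialized pinManager:

// Hypothetical helper; only pinManager.pinsets.direct / .recursive, their
// hasPin(key) check and their iterable pinKeys are taken from the diff above.
function listPinnedKeys (pinManager, type) {
  const pinset = type === 'direct'
    ? pinManager.pinsets.direct
    : pinManager.pinsets.recursive
  // Spreading pinKeys replaces Array.from() on the old plain Sets
  return [...pinset.pinKeys].map(hash => ({ type, hash }))
}

// e.g. listPinnedKeys(pinManager, 'recursive') mirrors what the updated
// pin.ls code path builds for recursive pins.
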
197 changes: 84 additions & 113 deletions src/core/components/pin/pin-manager.js
@@ -1,27 +1,27 @@
/* eslint max-nested-callbacks: ["error", 8] */
'use strict'

const { DAGNode, DAGLink, util } = require('ipld-dag-pb')
const { DAGNode, util } = require('ipld-dag-pb')
const CID = require('cids')
const map = require('async/map')
const series = require('async/series')
const parallel = require('async/parallel')
const eachLimit = require('async/eachLimit')
const waterfall = require('async/waterfall')
const detectLimit = require('async/detectLimit')
const { Key } = require('interface-datastore')
const errCode = require('err-code')
const multicodec = require('multicodec')

const createPinSet = require('./pin-set')
const PinStore = require('./pin-store')
const PinSet = require('./pin-set')
const Lock = require('./lock')

// arbitrary limit to the number of concurrent dag operations
const concurrencyLimit = 300
const PIN_DS_KEY = new Key('/local/pins')
const NO_PINS_ERR = 'No pins to load'

function toB58String (hash) {
return new CID(hash).toBaseEncodedString()
return new CID(hash).toBaseEncodedString('base58btc')
}

function invalidPinTypeErr (type) {
@@ -40,25 +40,20 @@ class PinManager {
constructor (repo, dag, repoOwner, log) {
this.repo = repo
this.dag = dag
this.store = new PinStore(dag)
this.log = log
this.pinset = createPinSet(dag)
this.directPins = new Set()
this.recursivePins = new Set()
this.pinsets = {
direct: new PinSet(PinTypes.direct, this.store),
recursive: new PinSet(PinTypes.recursive, this.store)
}
this._linkCache = {}
this._lock = new Lock(repoOwner, 'ipfs:pin-manager:lock')
}

directKeys () {
return Array.from(this.directPins).map(key => new CID(key).buffer)
}

recursiveKeys () {
return Array.from(this.recursivePins).map(key => new CID(key).buffer)
}

getIndirectKeys (callback) {
this._lock.readLock((lockCb) => {
const indirectKeys = new Set()
eachLimit(this.recursiveKeys(), concurrencyLimit, (multihash, cb) => {
eachLimit([...this.pinsets.recursive.pinKeys], concurrencyLimit, (multihash, cb) => {
this.dag._getRecursive(multihash, (err, nodes) => {
if (err) {
return cb(err)
@@ -74,7 +69,8 @@ class PinManager {
cids
.map(cid => cid.toString())
// recursive pins pre-empt indirect pins
.filter(key => !this.recursivePins.has(key))
// .filter(key => !this.recursivePins.has(key))
.filter(key => !this.pinsets.recursive.hasPin(key))
.forEach(key => indirectKeys.add(key))

cb()
@@ -88,122 +84,97 @@ class PinManager {
}

addRecursivePins (keys, callback) {
this._addPins(keys, this.recursivePins, callback)
this._addPins(keys, this.pinsets.recursive, callback)
}

addDirectPins (keys, callback) {
this._addPins(keys, this.directPins, callback)
this._addPins(keys, this.pinsets.direct, callback)
}

_addPins (keys, pinSet, callback) {
_addPins (keys, pinset, callback) {
this._lock.writeLock((lockCb) => {
keys = keys.filter(key => !pinSet.has(key))
if (!keys.length) return lockCb(null, [])

for (const key of keys) {
pinSet.add(key)
}
this._flushPins(lockCb)
// Add pins to the pin set (direct or recursive)
pinset.addPins(keys, (err, changed) => {
if (err) { return lockCb(err) }
// If the pin set was changed, update the root node to point at the
// changed pin set and write it out
if (changed) { return this._saveRootNode(lockCb) }
return lockCb()
})
}, callback)
}

rmPins (keys, recursive, callback) {
if (!keys.length) return callback(null, [])

this._lock.writeLock((lockCb) => {
const pins = {
direct: new Set(),
recursive: new Set()
}

for (const key of keys) {
if (recursive && this.recursivePins.has(key)) {
this.recursivePins.delete(key)
if (recursive && this.pinsets.recursive.hasPin(key)) {
pins.recursive.add(key)
} else {
this.directPins.delete(key)
pins.direct.add(key)
}
}

this._flushPins(lockCb)
waterfall([
(cb) => parallel([
// Remove pins from the pin sets
(pcb) => this.pinsets.direct.rmPins([...pins.direct], pcb),
(pcb) => this.pinsets.recursive.rmPins([...pins.recursive], pcb)
], cb),
(changed, cb) => {
// If either of the pin sets was changed, update the root node to
// point at the changed pin sets and write it out
if (changed[0] || changed[1]) {
return this._saveRootNode(cb)
}
cb()
}
], lockCb)
}, callback)
}

// Encode and write pin key sets to the datastore:
// a DAGLink for each of the recursive and direct pinsets
// a DAGNode holding those as DAGLinks, a kind of root pin
// Note: should only be called within a lock
_flushPins (callback) {
let dLink, rLink, root
series([
// create a DAGLink to the node with direct pins
cb => waterfall([
cb => this.pinset.storeSet(this.directKeys(), cb),
({ node, cid }, cb) => {
try {
cb(null, new DAGLink(PinTypes.direct, node.size, cid))
} catch (err) {
cb(err)
}
},
(link, cb) => { dLink = link; cb(null) }
_saveRootNode (callback) {
let root

waterfall([
// ensure that there are nodes for both the direct and recursive pin sets
(cb) => parallel([
pcb => this.pinsets.direct.saveSet(pcb),
pcb => this.pinsets.recursive.saveSet(pcb)
], cb),

// create a DAGLink to the node with recursive pins
cb => waterfall([
cb => this.pinset.storeSet(this.recursiveKeys(), cb),
({ node, cid }, cb) => {
try {
cb(null, new DAGLink(PinTypes.recursive, node.size, cid))
} catch (err) {
cb(err)
}
},
(link, cb) => { rLink = link; cb(null) }
], cb),

// the pin-set nodes link to a special 'empty' node, so make sure it exists
cb => {
let empty

try {
empty = DAGNode.create(Buffer.alloc(0))
} catch (err) {
return cb(err)
}

this.dag.put(empty, {
version: 0,
format: multicodec.DAG_PB,
hashAlg: multicodec.SHA2_256,
preload: false
}, cb)
},

// create a root node with DAGLinks to the direct and recursive DAGs
cb => {
let node

// create a root node with DAGLinks to the direct and recursive pin sets
(res, cb) => {
try {
node = DAGNode.create(Buffer.alloc(0), [dLink, rLink])
root = DAGNode.create(Buffer.alloc(0), res.map(r => r.link))
} catch (err) {
return cb(err)
}

root = node
this.dag.put(root, {
version: 0,
format: multicodec.DAG_PB,
hashAlg: multicodec.SHA2_256,
preload: false
}, (err, cid) => {
this.store.save(root, (err, cid) => {
if (!err) {
root.multihash = cid.buffer
}
cb(err)
return cb(err)
})
},

// hack for CLI tests
cb => this.repo.closed ? this.repo.open(cb) : cb(null, null),
cb => this.repo.closed ? this.repo.open(cb) : cb(),

// save root to datastore under a consistent key
cb => this.repo.datastore.put(PIN_DS_KEY, root.multihash, cb)
], (err, res) => {
], (err) => {
if (err) { return callback(err) }
this.log(`Flushed pins with root: ${root}`)
return callback(null, root)
@@ -215,34 +186,33 @@
waterfall([
// hack for CLI tests
(cb) => this.repo.closed ? this.repo.datastore.open(cb) : cb(null, null),
// Get root node CID from datastore
(_, cb) => this.repo.datastore.has(PIN_DS_KEY, cb),
(has, cb) => has ? cb() : cb(new Error('No pins to load')),
(has, cb) => has ? cb() : cb(new Error(NO_PINS_ERR)),
(cb) => this.repo.datastore.get(PIN_DS_KEY, cb),
// Load root node
(mh, cb) => {
this.dag.get(new CID(mh), '', { preload: false }, cb)
this.store.fetch(new CID(mh), cb)
}
], (err, pinRoot) => {
if (err) {
if (err.message === 'No pins to load') {
if (err.message === NO_PINS_ERR) {
this.log('No pins to load')
return lockCb()
} else {
return lockCb(err)
}
}

// Load the direct and recursive pin sets
parallel([
cb => this.pinset.loadSet(pinRoot.value, PinTypes.recursive, cb),
cb => this.pinset.loadSet(pinRoot.value, PinTypes.direct, cb)
], (err, keys) => {
if (err) { return lockCb(err) }
const [ rKeys, dKeys ] = keys

this.directPins = new Set(dKeys.map(toB58String))
this.recursivePins = new Set(rKeys.map(toB58String))

this.log('Loaded pins from the datastore')
return lockCb(null)
cb => this.pinsets.direct.loadSet(pinRoot.value, cb),
cb => this.pinsets.recursive.loadSet(pinRoot.value, cb)
], (err) => {
if (!err) {
this.log('Loaded pins from the datastore')
}
return lockCb(err)
})
})
}, callback)
Expand All @@ -253,7 +223,7 @@ class PinManager {
const { recursive, direct, all } = PinTypes

// recursive
if ((type === recursive || type === all) && this.recursivePins.has(key)) {
if ((type === recursive || type === all) && this.pinsets.recursive.hasPin(key)) {
return callback(null, {
key,
pinned: true,
@@ -269,7 +239,7 @@
}

// direct
if ((type === direct || type === all) && this.directPins.has(key)) {
if ((type === direct || type === all) && this.pinsets.direct.hasPin(key)) {
return callback(null, {
key,
pinned: true,
@@ -288,11 +258,12 @@
// indirect (default)
// check each recursive key to see if multihash is under it
// arbitrary limit, enables handling 1000s of pins.
detectLimit(this.recursiveKeys().map(key => new CID(key)), concurrencyLimit, (cid, cb) => {
const cids = [...this.pinsets.recursive.pinKeys].map(key => new CID(key))
detectLimit(cids, concurrencyLimit, (cid, cb) => {
waterfall([
(done) => this.dag.get(cid, '', { preload: false }, done),
(done) => this.store.fetch(cid, done),
(result, done) => done(null, result.value),
(node, done) => this.pinset.hasDescendant(node, key, done)
(node, done) => this.pinsets.recursive.hasDescendant(node, key, done)
], cb)
}, (err, cid) => lockCb(err, {
key,
@@ -315,7 +286,7 @@
}

const cid = new CID(mh)
this.dag.get(cid, '', { preload: false }, (err, obj) => {
this.store.fetch(cid, (err, obj) => {
if (err) {
return lockCb(new Error(`Could not get pin sets from store: ${err.message}`))
}
@@ -325,7 +296,7 @@
// 2. The recursively pinned CIDs
// If large enough, these pin sets may have links to buckets to hold
// the pins
this.pinset.getInternalCids(obj.value, (err, cids) => {
PinSet.getInternalCids(this.store, obj.value, (err, cids) => {
if (err) {
return lockCb(new Error(`Could not get pinner internal cids: ${err.message}`))
}
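
Throughout pin-manager.js, the inline this.dag.put / this.dag.get calls are replaced by this.store.save and this.store.fetch on a new PinStore instance. The new src/core/components/pin/pin-store.js and pin-set.js files are not part of this excerpt, so the following is only a sketch of what PinStore plausibly looks like, inferred from its call sites in the diff; the dag.put / dag.get options are copied from the inline calls this PR removes.

'use strict'

const multicodec = require('multicodec')

// Sketch only: the real pin-store.js is not shown in this diff.
class PinStore {
  constructor (dag) {
    this.dag = dag
  }

  // Persist a DAG-PB node with the same options the removed inline
  // dag.put calls used; the callback receives the resulting CID.
  save (node, callback) {
    this.dag.put(node, {
      version: 0,
      format: multicodec.DAG_PB,
      hashAlg: multicodec.SHA2_256,
      preload: false
    }, callback)
  }

  // Fetch a node by CID without preloading, mirroring the removed
  // inline dag.get calls.
  fetch (cid, callback) {
    this.dag.get(cid, '', { preload: false }, callback)
  }
}

module.exports = PinStore

Similarly, the PinSet class is only visible here through its call sites: the diff relies on it providing hasPin, pinKeys, addPins, rmPins, saveSet, loadSet, hasDescendant and a static getInternalCids(store, rootNode, callback), so whatever the real pin-set.js contains, it must expose at least that surface.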