Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 27c128f

Browse files
achingbrainAlan Shaw
authored and
Alan Shaw
committed
fix: Do not load all of a DAG into memory when pinning (#2387)
Port of #2372 into gc branch to ease merging
1 parent 51febd3 commit 27c128f

File tree

4 files changed

+51
-60
lines changed

4 files changed

+51
-60
lines changed

package.json

-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@
120120
"is-stream": "^2.0.0",
121121
"iso-url": "~0.4.6",
122122
"joi": "^14.3.1",
123-
"just-flatten-it": "^2.1.0",
124123
"just-safe-set": "^2.1.0",
125124
"kind-of": "^6.0.2",
126125
"libp2p": "~0.25.4",

src/core/components/dag.js

-34
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ const promisify = require('promisify-es6')
44
const CID = require('cids')
55
const pull = require('pull-stream')
66
const iterToPull = require('async-iterator-to-pull-stream')
7-
const mapAsync = require('async/map')
87
const setImmediate = require('async/setImmediate')
9-
const flattenDeep = require('just-flatten-it')
108
const errCode = require('err-code')
119
const multicodec = require('multicodec')
1210

@@ -180,38 +178,6 @@ module.exports = function dag (self) {
180178
iterToPull(self._ipld.tree(cid, path, options)),
181179
pull.collect(callback)
182180
)
183-
}),
184-
185-
// TODO - use IPLD selectors once they are implemented
186-
_getRecursive: promisify((multihash, options, callback) => {
187-
// gets flat array of all DAGNodes in tree given by multihash
188-
189-
if (typeof options === 'function') {
190-
callback = options
191-
options = {}
192-
}
193-
194-
options = options || {}
195-
196-
let cid
197-
198-
try {
199-
cid = new CID(multihash)
200-
} catch (err) {
201-
return setImmediate(() => callback(errCode(err, 'ERR_INVALID_CID')))
202-
}
203-
204-
self.dag.get(cid, '', options, (err, res) => {
205-
if (err) { return callback(err) }
206-
207-
mapAsync(res.value.Links, (link, cb) => {
208-
self.dag._getRecursive(link.Hash, options, cb)
209-
}, (err, nodes) => {
210-
// console.log('nodes:', nodes)
211-
if (err) return callback(err)
212-
callback(null, flattenDeep([res.value, nodes]))
213-
})
214-
})
215181
})
216182
}
217183
}

src/core/components/pin.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ module.exports = (self) => {
4848

4949
// entire graph of nested links should be pinned,
5050
// so make sure we have all the objects
51-
dag._getRecursive(key, { preload: options.preload }, (err) => {
51+
pinManager.fetchCompleteDag(key, { preload: options.preload }, (err) => {
5252
if (err) { return cb(err) }
5353
// found all objects, we can add the pin
5454
return cb(null, key)
@@ -242,7 +242,7 @@ module.exports = (self) => {
242242
)
243243
}
244244
if (type === PinTypes.indirect || type === PinTypes.all) {
245-
pinManager.getIndirectKeys((err, indirects) => {
245+
pinManager.getIndirectKeys(options, (err, indirects) => {
246246
if (err) { return callback(err) }
247247
pins = pins
248248
// if something is pinned both directly and indirectly,

src/core/components/pin/pin-manager.js

+49-23
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
/* eslint max-nested-callbacks: ["error", 8] */
22
'use strict'
33

4-
const { DAGNode, DAGLink, util } = require('ipld-dag-pb')
4+
const { DAGNode, DAGLink } = require('ipld-dag-pb')
55
const CID = require('cids')
6-
const map = require('async/map')
76
const series = require('async/series')
87
const parallel = require('async/parallel')
98
const eachLimit = require('async/eachLimit')
109
const waterfall = require('async/waterfall')
1110
const detectLimit = require('async/detectLimit')
11+
const queue = require('async/queue')
1212
const { Key } = require('interface-datastore')
1313
const errCode = require('err-code')
1414
const multicodec = require('multicodec')
@@ -43,6 +43,34 @@ class PinManager {
4343
this.recursivePins = new Set()
4444
}
4545

46+
_walkDag ({ cid, preload = false, onCid = () => {} }, cb) {
47+
const q = queue(({ cid }, done) => {
48+
this.dag.get(cid, { preload }, (err, result) => {
49+
if (err) {
50+
return done(err)
51+
}
52+
53+
onCid(cid)
54+
55+
if (result.value.Links) {
56+
q.push(result.value.Links.map(link => ({
57+
cid: link.Hash
58+
})))
59+
}
60+
61+
done()
62+
})
63+
}, concurrencyLimit)
64+
q.drain = () => {
65+
cb()
66+
}
67+
q.error = (err) => {
68+
q.kill()
69+
cb(err)
70+
}
71+
q.push({ cid })
72+
}
73+
4674
directKeys () {
4775
return Array.from(this.directPins, key => new CID(key).buffer)
4876
}
@@ -51,30 +79,21 @@ class PinManager {
5179
return Array.from(this.recursivePins, key => new CID(key).buffer)
5280
}
5381

54-
getIndirectKeys (callback) {
82+
getIndirectKeys ({ preload }, callback) {
5583
const indirectKeys = new Set()
5684
eachLimit(this.recursiveKeys(), concurrencyLimit, (multihash, cb) => {
57-
this.dag._getRecursive(multihash, (err, nodes) => {
58-
if (err) {
59-
return cb(err)
60-
}
61-
62-
map(nodes, (node, cb) => util.cid(util.serialize(node), {
63-
cidVersion: 0
64-
}).then(cid => cb(null, cid), cb), (err, cids) => {
65-
if (err) {
66-
return cb(err)
85+
this._walkDag({
86+
cid: new CID(multihash),
87+
preload: preload || false,
88+
onCid: (cid) => {
89+
cid = cid.toString()
90+
91+
// recursive pins pre-empt indirect pins
92+
if (!this.recursivePins.has(cid)) {
93+
indirectKeys.add(cid)
6794
}
68-
69-
cids
70-
.map(cid => cid.toString())
71-
// recursive pins pre-empt indirect pins
72-
.filter(key => !this.recursivePins.has(key))
73-
.forEach(key => indirectKeys.add(key))
74-
75-
cb()
76-
})
77-
})
95+
}
96+
}, cb)
7897
}, (err) => {
7998
if (err) { return callback(err) }
8099
callback(null, Array.from(indirectKeys))
@@ -283,6 +302,13 @@ class PinManager {
283302
})
284303
}
285304

305+
fetchCompleteDag (cid, options, callback) {
306+
this._walkDag({
307+
cid,
308+
preload: options.preload
309+
}, callback)
310+
}
311+
286312
// Returns an error if the pin type is invalid
287313
static checkPinType (type) {
288314
if (typeof type !== 'string' || !Object.keys(PinTypes).includes(type)) {

0 commit comments

Comments
 (0)