-
Notifications
You must be signed in to change notification settings - Fork 1.2k
perf: debounce pin flush #2634
perf: debounce pin flush #2634
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,7 +10,8 @@ const multicodec = require('multicodec') | |
const dagCborLinks = require('dag-cbor-links') | ||
const debug = require('debug') | ||
const { cidToString } = require('../../../utils/cid') | ||
|
||
const delay = require('delay') | ||
const AbortController = require('abort-controller') | ||
const createPinSet = require('./pin-set') | ||
|
||
const { Errors } = require('interface-datastore') | ||
|
@@ -106,43 +107,65 @@ class PinManager { | |
// Encode and write pin key sets to the datastore: | ||
// a DAGLink for each of the recursive and direct pinsets | ||
// a DAGNode holding those as DAGLinks, a kind of root pin | ||
async flushPins () { | ||
const [ | ||
dLink, | ||
rLink | ||
] = await Promise.all([ | ||
// create a DAGLink to the node with direct pins | ||
this.pinset.storeSet(this.directKeys()) | ||
.then((result) => { | ||
return new DAGLink(PinTypes.direct, result.node.size, result.cid) | ||
}), | ||
// create a DAGLink to the node with recursive pins | ||
this.pinset.storeSet(this.recursiveKeys()) | ||
.then((result) => { | ||
return new DAGLink(PinTypes.recursive, result.node.size, result.cid) | ||
}), | ||
// the pin-set nodes link to a special 'empty' node, so make sure it exists | ||
this.dag.put(new DAGNode(Buffer.alloc(0)), { | ||
version: 0, | ||
format: multicodec.DAG_PB, | ||
hashAlg: multicodec.SHA2_256, | ||
preload: false | ||
}) | ||
]) | ||
async flushPins () { // eslint-disable-line require-await | ||
if (this._flushingPins) { | ||
this._flushingPins.controller.abort() | ||
} | ||
|
||
// create a root node with DAGLinks to the direct and recursive DAGs | ||
const rootNode = new DAGNode(Buffer.alloc(0), [dLink, rLink]) | ||
const rootCid = await this.dag.put(rootNode, { | ||
version: 0, | ||
format: multicodec.DAG_PB, | ||
hashAlg: multicodec.SHA2_256, | ||
preload: false | ||
}) | ||
const controller = new AbortController() | ||
|
||
this._flushingPins = { | ||
controller, | ||
promise: delay(100) | ||
.then(async () => { | ||
if (controller.signal.aborted) { | ||
return | ||
} | ||
|
||
// save root to datastore under a consistent key | ||
await this.repo.datastore.put(PIN_DS_KEY, rootCid.buffer) | ||
this._flushingPins = null | ||
|
||
const [ | ||
dLink, | ||
rLink | ||
] = await Promise.all([ | ||
// create a DAGLink to the node with direct pins | ||
this.pinset.storeSet(this.directKeys()) | ||
.then((result) => { | ||
return new DAGLink(PinTypes.direct, result.node.size, result.cid) | ||
}), | ||
// create a DAGLink to the node with recursive pins | ||
this.pinset.storeSet(this.recursiveKeys()) | ||
.then((result) => { | ||
return new DAGLink(PinTypes.recursive, result.node.size, result.cid) | ||
}), | ||
// the pin-set nodes link to a special 'empty' node, so make sure it exists | ||
this.dag.put(new DAGNode(Buffer.alloc(0)), { | ||
version: 0, | ||
format: multicodec.DAG_PB, | ||
hashAlg: multicodec.SHA2_256, | ||
preload: false | ||
}) | ||
]) | ||
|
||
// create a root node with DAGLinks to the direct and recursive DAGs | ||
const rootNode = new DAGNode(Buffer.alloc(0), [dLink, rLink]) | ||
|
||
const rootCid = await this.dag.put(rootNode, { | ||
version: 0, | ||
format: multicodec.DAG_PB, | ||
hashAlg: multicodec.SHA2_256, | ||
preload: false | ||
}) | ||
|
||
// save root to datastore under a consistent key | ||
await this.repo.datastore.put(PIN_DS_KEY, rootCid.buffer) | ||
|
||
this.log(`Flushed pins with root: ${rootCid}`) | ||
}) | ||
.catch(err => this.log(`Could not flush pins: ${err}`)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please let us not smother the error! Secondly, if the flush does fail, any debounced calls succeed...and they succeed before the flush has been performed so there's no way to know if an error happens. Could we use something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so. If I am importing files A, B and C, and we block on flushing pins after importing A, then B will not start to import until A has finished and the pins have flushed, debounced or otherwise so there's no benefit to be gained. This PR works by doing the flushing in another async context but as you point out, there's no way to know if an error occurred, because the only way to know is to block until the operation has completed, which means the next file in the list will not get imported until the pins have flushed. |
||
} | ||
|
||
this.log(`Flushed pins with root: ${rootCid}`) | ||
return this._flushingPins.promise | ||
} | ||
|
||
async load () { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How come this call has a
.then().catch()
? We should stick toawait
as much as possible.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personal preference I guess? I've always found IIFEs a bit, well iffy to look at.