feat: add a shardFanoutBits option to the importer #355

Merged · 7 commits · Aug 25, 2023
Changes from all commits
```diff
@@ -1,4 +1,6 @@
 import { decode, type PBNode } from '@ipld/dag-pb'
+import errCode from 'err-code'
+import { UnixFS } from 'ipfs-unixfs'
 import map from 'it-map'
 import parallel from 'it-parallel'
 import { pipe } from 'it-pipe'
@@ -20,11 +22,28 @@ const hamtShardedDirectoryContent: UnixfsV1Resolver = (cid, node, unixfs, path,
 async function * listDirectory (node: PBNode, path: string, resolve: Resolve, depth: number, blockstore: ReadableStorage, options: ExporterOptions): UnixfsV1DirectoryContent {
   const links = node.Links

+  if (node.Data == null) {
+    throw errCode(new Error('no data in PBNode'), 'ERR_NOT_UNIXFS')
+  }
+
+  let dir: UnixFS
+  try {
+    dir = UnixFS.unmarshal(node.Data)
+  } catch (err: any) {
+    throw errCode(err, 'ERR_NOT_UNIXFS')
+  }
+
+  if (dir.fanout == null) {
+    throw errCode(new Error('missing fanout'), 'ERR_NOT_UNIXFS')
+  }
+
+  const padLength = (dir.fanout - 1n).toString(16).length
+
   const results = pipe(
     links,
     source => map(source, link => {
       return async () => {
-        const name = link.Name != null ? link.Name.substring(2) : null
+        const name = link.Name != null ? link.Name.substring(padLength) : null

         if (name != null && name !== '') {
           const result = await resolve(link.Hash, name, `${path}/${name}`, [], depth + 1, blockstore, options)
```
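Note on the change above: the old hard-coded `substring(2)` assumed the default fanout of 256, i.e. two hex digits per child prefix, while the new `padLength` derives the width from the directory's `fanout` field. A minimal standalone sketch of that relationship (the helper name is illustrative, not part of the codebase):

```ts
// Width of a shard link-name prefix: the number of hex digits needed to
// write the largest possible child index (fanout - 1).
const padLengthFor = (fanout: bigint): number => (fanout - 1n).toString(16).length

console.log(padLengthFor(2n ** 8n))  // 2 - default fanout, the old hard-coded value
console.log(padLengthFor(2n ** 4n))  // 1 - e.g. shardFanoutBits: 4
console.log(padLengthFor(2n ** 10n)) // 3 - e.g. shardFanoutBits: 10
```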
44 changes: 34 additions & 10 deletions packages/ipfs-unixfs-exporter/src/utils/find-cid-in-shard.ts
```diff
@@ -1,6 +1,8 @@
 import { decode, type PBLink, type PBNode } from '@ipld/dag-pb'
 import { murmur3128 } from '@multiformats/murmur3'
+import errCode from 'err-code'
 import { Bucket, type BucketPosition, createHAMT } from 'hamt-sharding'
+import { UnixFS } from 'ipfs-unixfs'
 import type { ExporterOptions, ShardTraversalContext, ReadableStorage } from '../index.js'
 import type { CID } from 'multiformats/cid'
@@ -16,13 +18,14 @@ const hashFn = async function (buf: Uint8Array): Promise<Uint8Array> {
 }

 const addLinksToHamtBucket = async (links: PBLink[], bucket: Bucket<boolean>, rootBucket: Bucket<boolean>): Promise<void> => {
+  const padLength = (bucket.tableSize() - 1).toString(16).length
   await Promise.all(
     links.map(async link => {
       if (link.Name == null) {
         // TODO(@rvagg): what do? this is technically possible
         throw new Error('Unexpected Link without a Name')
       }
-      if (link.Name.length === 2) {
+      if (link.Name.length === padLength) {
         const pos = parseInt(link.Name, 16)

         bucket._putObjectAt(pos, new Bucket({
@@ -37,12 +40,12 @@ const addLinksToHamtBucket = async (links: PBLink[], bucket: Bucket<boolean>, ro
   )
 }

-const toPrefix = (position: number): string => {
+const toPrefix = (position: number, padLength: number): string => {
   return position
     .toString(16)
     .toUpperCase()
-    .padStart(2, '0')
-    .substring(0, 2)
+    .padStart(padLength, '0')
+    .substring(0, padLength)
 }

 const toBucketPath = (position: BucketPosition<boolean>): Array<Bucket<boolean>> => {
@@ -62,8 +65,27 @@ const toBucketPath = (position: BucketPosition<boolean>): Array<Bucket<boolean>>

 const findShardCid = async (node: PBNode, name: string, blockstore: ReadableStorage, context?: ShardTraversalContext, options?: ExporterOptions): Promise<CID | undefined> => {
   if (context == null) {
+    if (node.Data == null) {
+      throw errCode(new Error('no data in PBNode'), 'ERR_NOT_UNIXFS')
+    }
+
+    let dir: UnixFS
+    try {
+      dir = UnixFS.unmarshal(node.Data)
+    } catch (err: any) {
+      throw errCode(err, 'ERR_NOT_UNIXFS')
+    }
+
+    if (dir.type !== 'hamt-sharded-directory') {
+      throw errCode(new Error('not a HAMT'), 'ERR_NOT_UNIXFS')
+    }
+    if (dir.fanout == null) {
+      throw errCode(new Error('missing fanout'), 'ERR_NOT_UNIXFS')
+    }
+
     const rootBucket = createHAMT<boolean>({
-      hashFn
+      hashFn,
+      bits: Math.log2(Number(dir.fanout))
     })

     context = {
@@ -73,25 +95,27 @@
     }
   }

+  const padLength = (context.lastBucket.tableSize() - 1).toString(16).length
+
   await addLinksToHamtBucket(node.Links, context.lastBucket, context.rootBucket)

   const position = await context.rootBucket._findNewBucketAndPos(name)
-  let prefix = toPrefix(position.pos)
+  let prefix = toPrefix(position.pos, padLength)
  const bucketPath = toBucketPath(position)

   if (bucketPath.length > context.hamtDepth) {
     context.lastBucket = bucketPath[context.hamtDepth]

-    prefix = toPrefix(context.lastBucket._posAtParent)
+    prefix = toPrefix(context.lastBucket._posAtParent, padLength)
   }

   const link = node.Links.find(link => {
     if (link.Name == null) {
       return false
     }

-    const entryPrefix = link.Name.substring(0, 2)
-    const entryName = link.Name.substring(2)
+    const entryPrefix = link.Name.substring(0, padLength)
+    const entryName = link.Name.substring(padLength)

     if (entryPrefix !== prefix) {
       // not the entry or subshard we're looking for
@@ -110,7 +134,7 @@ const findShardCid = async (node: PBNode, name: string, blockstore: ReadableStor
       return
     }

-    if (link.Name != null && link.Name.substring(2) === name) {
+    if (link.Name != null && link.Name.substring(padLength) === name) {
       return link.Hash
     }
```
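The `bits` value passed to `createHAMT` above is the number of hash bits consumed per level, so it is recovered from the stored fanout as `log2(fanout)`. A short illustrative sketch of how the quantities in this file relate (plain arithmetic, assuming a valid power-of-two fanout):

```ts
// Relationship between the UnixFS fanout field, the HAMT bit width and
// the link-name prefix length used while traversing a shard.
const fanout = 16n                                    // dir.fanout read from the root block
const bits = Math.log2(Number(fanout))                // 4 - hash bits consumed per level
const tableSize = 2 ** bits                           // 16 - children per bucket (bucket.tableSize())
const padLength = (tableSize - 1).toString(16).length // 1 - hex digits per link-name prefix

console.log({ bits, tableSize, padLength }) // { bits: 4, tableSize: 16, padLength: 1 }
```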
64 changes: 62 additions & 2 deletions packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts
```diff
@@ -4,7 +4,7 @@ import * as dagPb from '@ipld/dag-pb'
 import { expect } from 'aegir/chai'
 import { MemoryBlockstore } from 'blockstore-core'
 import { UnixFS } from 'ipfs-unixfs'
-import { importer } from 'ipfs-unixfs-importer'
+import { importer, type ImportCandidate } from 'ipfs-unixfs-importer'
 import all from 'it-all'
 import randomBytes from 'it-buffer-stream'
 import last from 'it-last'
@@ -241,7 +241,7 @@ describe('exporter sharded', function () {
     await block.put(nodeBlockCid, nodeBlockBuf)

     const shardNodeBuf = dagPb.encode({
-      Data: new UnixFS({ type: 'hamt-sharded-directory' }).marshal(),
+      Data: new UnixFS({ type: 'hamt-sharded-directory', fanout: 2n ** 8n }).marshal(),
       Links: [{
         Name: '75normal-dir',
         Tsize: nodeBlockBuf.length,
@@ -255,4 +255,64 @@

     expect(exported.name).to.deep.equal('file-1')
   })
+
+  it('exports a shard with a different fanout size', async () => {
+    const files: ImportCandidate[] = [{
+      path: '/baz.txt',
+      content: Uint8Array.from([0, 1, 2, 3, 4])
+    }, {
+      path: '/foo.txt',
+      content: Uint8Array.from([0, 1, 2, 3, 4])
+    }, {
+      path: '/bar.txt',
+      content: Uint8Array.from([0, 1, 2, 3, 4])
+    }]
+
+    const result = await last(importer(files, block, {
+      shardSplitThresholdBytes: 0,
+      shardFanoutBits: 4, // 2**4 = 16 children max
+      wrapWithDirectory: true
+    }))
+
+    if (result == null) {
+      throw new Error('Import failed')
+    }
+
+    const { cid } = result
+    const dir = await exporter(cid, block)
+
+    expect(dir).to.have.nested.property('unixfs.fanout', 16n)
+
+    const contents = await all(dir.content())
+
+    expect(contents.map(entry => ({
+      path: `/${entry.name}`,
+      content: entry.node
+    })))
+      .to.deep.equal(files)
+  })
+
+  it('walks path of a HAMT with a different fanout size', async () => {
+    const files: ImportCandidate[] = [{
+      path: '/foo/bar/baz.txt',
+      content: Uint8Array.from([0, 1, 2, 3, 4])
+    }]
+
+    const result = await last(importer(files, block, {
+      shardSplitThresholdBytes: 0,
+      shardFanoutBits: 4, // 2**4 = 16 children max
+      wrapWithDirectory: true
+    }))
+
+    if (result == null) {
+      throw new Error('Import failed')
+    }
+
+    const { cid } = result
+    const file = await last(walkPath(`${cid}/foo/bar/baz.txt`, block))
+    expect([{
+      path: file?.path.replace(`${cid}`, ''),
+      content: file?.node
+    }]).to.deep.equal(files)
+  })
 })
```
15 changes: 11 additions & 4 deletions packages/ipfs-unixfs-importer/src/dir-sharded.ts
```diff
@@ -18,16 +18,21 @@ async function hamtHashFn (buf: Uint8Array): Promise<Uint8Array> {
 }

 const HAMT_HASH_CODE = BigInt(0x22)
+const DEFAULT_FANOUT_BITS = 8
+
+export interface DirShardedOptions extends PersistOptions {
+  shardFanoutBits: number
+}

 class DirSharded extends Dir {
   private readonly _bucket: Bucket<InProgressImportResult | Dir>

-  constructor (props: DirProps, options: PersistOptions) {
+  constructor (props: DirProps, options: DirShardedOptions) {
     super(props, options)

     this._bucket = createHAMT({
       hashFn: hamtHashFn,
-      bits: 8
+      bits: options.shardFanoutBits ?? DEFAULT_FANOUT_BITS
     })
   }

@@ -88,6 +93,7 @@ export default DirSharded

 async function * flush (bucket: Bucket<Dir | InProgressImportResult>, blockstore: Blockstore, shardRoot: DirSharded | null, options: PersistOptions): AsyncIterable<ImportResult> {
   const children = bucket._children
+  const padLength = (bucket.tableSize() - 1).toString(16).length
   const links: PBLink[] = []
   let childrenSize = 0n

@@ -98,7 +104,7 @@ async function * flush (bucket: Bucket<Dir | InProgressImportResult>, blockstore
       continue
     }

-    const labelPrefix = i.toString(16).toUpperCase().padStart(2, '0')
+    const labelPrefix = i.toString(16).toUpperCase().padStart(padLength, '0')

     if (child instanceof Bucket) {
       let shard
@@ -191,6 +197,7 @@ function isDir (obj: any): obj is Dir {

 function calculateSize (bucket: Bucket<any>, shardRoot: DirSharded | null, options: PersistOptions): number {
   const children = bucket._children
+  const padLength = (bucket.tableSize() - 1).toString(16).length
   const links: PBLink[] = []

   for (let i = 0; i < children.length; i++) {
@@ -200,7 +207,7 @@ function calculateSize (bucket: Bucket<any>, shardRoot: DirSharded | null, optio
       continue
     }

-    const labelPrefix = i.toString(16).toUpperCase().padStart(2, '0')
+    const labelPrefix = i.toString(16).toUpperCase().padStart(padLength, '0')

     if (child instanceof Bucket) {
       const size = calculateSize(child, null, options)
```
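Both `flush` and `calculateSize` now build child labels from the bucket's table size instead of assuming two hex digits. A sketch of the labels produced under different fanouts (the standalone helper simply mirrors the inline expression above):

```ts
// Shard link names start with the zero-padded, upper-case hex bucket index;
// leaf entries then append the entry name after this prefix.
const labelPrefix = (i: number, padLength: number): string =>
  i.toString(16).toUpperCase().padStart(padLength, '0')

console.log(labelPrefix(5, 2))   // '05' - default 256-way shard
console.log(labelPrefix(5, 1))   // '5'  - 16-way shard (shardFanoutBits: 4)
console.log(labelPrefix(255, 2)) // 'FF' - last slot of a 256-way shard
```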
7 changes: 3 additions & 4 deletions packages/ipfs-unixfs-importer/src/flat-to-shard.ts
```diff
@@ -1,9 +1,8 @@
 import { DirFlat } from './dir-flat.js'
-import DirSharded from './dir-sharded.js'
+import DirSharded, { type DirShardedOptions } from './dir-sharded.js'
 import type { Dir } from './dir.js'
-import type { PersistOptions } from './utils/persist.js'

-export async function flatToShard (child: Dir | null, dir: Dir, threshold: number, options: PersistOptions): Promise<DirSharded> {
+export async function flatToShard (child: Dir | null, dir: Dir, threshold: number, options: DirShardedOptions): Promise<DirSharded> {
   let newDir = dir as DirSharded

   if (dir instanceof DirFlat && dir.estimateNodeSize() > threshold) {
@@ -31,7 +30,7 @@ export async function flatToShard (child: Dir | null, dir: Dir, threshold: numbe
   return newDir
 }

-async function convertToShard (oldDir: DirFlat, options: PersistOptions): Promise<DirSharded> {
+async function convertToShard (oldDir: DirFlat, options: DirShardedOptions): Promise<DirSharded> {
   const newDir = new DirSharded({
     root: oldDir.root,
     dir: true,
```
9 changes: 9 additions & 0 deletions packages/ipfs-unixfs-importer/src/index.ts
```diff
@@ -123,6 +123,13 @@ export interface ImporterOptions extends ProgressOptions<ImporterProgressEvents>
    */
   shardSplitThresholdBytes?: number

+  /**
+   * The number of bits of a hash digest used at each level of sharding to
+   * the child index. 2**shardFanoutBits will dictate the maximum number of
+   * children for any shard in the HAMT. Default: 8
+   */
+  shardFanoutBits?: number
+
   /**
    * How many files to import concurrently. For large numbers of small files this
    * should be high (e.g. 50). Default: 10
@@ -241,6 +248,7 @@ export async function * importer (source: ImportCandidateStream, blockstore: Wri

   const wrapWithDirectory = options.wrapWithDirectory ?? false
   const shardSplitThresholdBytes = options.shardSplitThresholdBytes ?? 262144
+  const shardFanoutBits = options.shardFanoutBits ?? 8
   const cidVersion = options.cidVersion ?? 1
   const rawLeaves = options.rawLeaves ?? true
   const leafType = options.leafType ?? 'file'
@@ -269,6 +277,7 @@ export async function * importer (source: ImportCandidateStream, blockstore: Wri
   const buildTree: TreeBuilder = options.treeBuilder ?? defaultTreeBuilder({
     wrapWithDirectory,
     shardSplitThresholdBytes,
+    shardFanoutBits,
     cidVersion,
     onProgress: options.onProgress
   })
```
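A minimal usage sketch of the new option, mirroring the tests in this PR (the file path and contents are arbitrary):

```ts
import { importer } from 'ipfs-unixfs-importer'
import { MemoryBlockstore } from 'blockstore-core'
import last from 'it-last'

const blockstore = new MemoryBlockstore()

const result = await last(importer([{
  path: '/hello.txt',
  content: Uint8Array.from([0, 1, 2, 3, 4])
}], blockstore, {
  wrapWithDirectory: true,
  shardSplitThresholdBytes: 0, // force sharding even for a tiny directory
  shardFanoutBits: 4           // each shard holds at most 2**4 = 16 children
}))

console.log(result?.cid.toString())
```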
1 change: 1 addition & 0 deletions packages/ipfs-unixfs-importer/src/tree-builder.ts
```diff
@@ -7,6 +7,7 @@ import type { PersistOptions } from './utils/persist.js'

 export interface AddToTreeOptions extends PersistOptions {
   shardSplitThresholdBytes: number
+  shardFanoutBits: number
 }

 async function addToTree (elem: InProgressImportResult, tree: Dir, options: AddToTreeOptions): Promise<Dir> {
```
3 changes: 2 additions & 1 deletion packages/ipfs-unixfs/src/index.ts
```diff
@@ -52,7 +52,8 @@ class UnixFS {
           secs: message.mtime.Seconds ?? 0n,
           nsecs: message.mtime.FractionalNanoseconds
         }
-        : undefined
+        : undefined,
+      fanout: message.fanout
     })

     // make sure we honour the original mode
```
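With `fanout` now copied through in `unmarshal`, the field survives a marshal/unmarshal round trip; a small sketch of the behaviour this enables:

```ts
import { UnixFS } from 'ipfs-unixfs'

// Previously fanout was dropped when decoding; it is now preserved.
const dir = new UnixFS({ type: 'hamt-sharded-directory', fanout: 16n })
const roundTripped = UnixFS.unmarshal(dir.marshal())

console.log(roundTripped.fanout) // 16n
```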