'use strict'

+ const defer = require('pull-defer')
const pull = require('pull-stream/pull')
+ const error = require('pull-stream/sources/error')
const values = require('pull-stream/sources/values')
const filter = require('pull-stream/throughs/filter')
const map = require('pull-stream/throughs/map')
const cat = require('pull-cat')
+ const Bucket = require('hamt-sharding/src/bucket')
+ const DirSharded = require('ipfs-unixfs-importer/src/importer/dir-sharded')
+ const waterfall = require('async/waterfall')

// Logic to export a unixfs directory.
module.exports = shardedDirExporter
@@ -27,41 +32,180 @@ function shardedDirExporter (cid, node, name, path, pathRest, resolve, size, dag
    return values([dir])
  }

-   const streams = [
-     pull(
-       values(node.links),
-       map((link) => {
-         // remove the link prefix (2 chars for the bucket index)
-         const p = link.name.substring(2)
-         const pp = p ? path + '/' + p : path
-         let accept = true
+   if (!pathRest.length) {
+     // return all children

-         if (p && pathRest.length) {
-           accept = (p === pathRest[0])
-         }
+     const streams = [
+       pull(
+         values(node.links),
+         map((link) => {
+           // remove the link prefix (2 chars for the bucket index)
+           const entryName = link.name.substring(2)
+           const entryPath = entryName ? path + '/' + entryName : path

-         if (accept) {
          return {
-             depth: p ? depth + 1 : depth,
-             name: p,
-             path: pp,
+             depth: entryName ? depth + 1 : depth,
+             name: entryName,
+             path: entryPath,
            multihash: link.cid.buffer,
-             pathRest: p ? pathRest.slice(1) : pathRest,
+             pathRest: entryName ? pathRest.slice(1) : pathRest,
            parent: dir || parent
          }
-         } else {
-           return ''
-         }
-       }),
-       filter(Boolean),
-       resolve
-     )
-   ]
-
-   // place dir before if not specifying subtree
-   if (!pathRest.length || options.fullPath) {
+         }),
+         resolve
+       )
+     ]
+
+     // place dir before if not specifying subtree
    streams.unshift(values([dir]))
+
+     return cat(streams)
+   }
+
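+   // a specific entry was requested - resolve this deferred source once the HAMT context below has been rebuilt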
+   const deferred = defer.source()
+   const targetFile = pathRest[0]
+
+   // recreate our level of the HAMT so we can load only the subshard in pathRest
+   waterfall([
+     (cb) => {
+       if (!options.rootBucket) {
+         options.rootBucket = new Bucket({
+           hashFn: DirSharded.hashFn
+         })
+         options.hamtDepth = 1
+
+         return addLinksToHamtBucket(node.links, options.rootBucket, options.rootBucket, cb)
+       }
+
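+       // we are already inside a subshard - add this node's links to the bucket created for it on the previous pass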
+       return addLinksToHamtBucket(node.links, options.lastBucket, options.rootBucket, cb)
+     },
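+     // hash the target name to find the bucket and position it maps to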
+     (cb) => findPosition(targetFile, options.rootBucket, cb),
+     ({ position }, cb) => {
+       let prefix = toPrefix(position.pos)
+       const bucketPath = toBucketPath(position)
+
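+       // the target sits in a deeper bucket than this level of the HAMT, so match the subshard's prefix rather than the entry's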
+       if (bucketPath.length > (options.hamtDepth)) {
+         options.lastBucket = bucketPath[options.hamtDepth]
+
+         prefix = toPrefix(options.lastBucket._posAtParent)
+       }
+
+       const streams = [
+         pull(
+           values(node.links),
+           map((link) => {
+             const entryPrefix = link.name.substring(0, 2)
+             const entryName = link.name.substring(2)
+             const entryPath = entryName ? path + '/' + entryName : path
+
+             if (entryPrefix !== prefix) {
+               // not the entry or subshard we're looking for
+               return false
+             }
+
+             if (entryName && entryName !== targetFile) {
+               // not the entry we're looking for
+               return false
+             }
+
+             if (!entryName) {
+               // we are going to descend into a subshard
+               options.hamtDepth++
+             } else {
+               // we've found the node we are looking for, remove the context
+               // so we don't affect further hamt traversals
+               delete options.rootBucket
+               delete options.lastBucket
+               delete options.hamtDepth
+             }
+
+             return {
+               depth: entryName ? depth + 1 : depth,
+               name: entryName,
+               path: entryPath,
+               multihash: link.cid.buffer,
+               pathRest: entryName ? pathRest.slice(1) : pathRest,
+               parent: dir || parent
+             }
+           }),
+           filter(Boolean),
+           resolve
+         )
+       ]
+
+       if (options.fullPath) {
+         streams.unshift(values([dir]))
+       }
+
+       cb(null, streams)
+     }
+   ], (err, streams) => {
+     if (err) {
+       return deferred.resolve(error(err))
+     }
+
+     deferred.resolve(cat(streams))
+   })
+
+   return deferred
+ }
+
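+ // two-character link names point at subshards, longer names are entries prefixed with their bucket position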
+ const addLinksToHamtBucket = (links, bucket, rootBucket, callback) => {
+   Promise.all(
+     links.map(link => {
+       if (link.name.length === 2) {
+         const pos = parseInt(link.name, 16)
+
+         return bucket._putObjectAt(pos, new Bucket({
+           hashFn: DirSharded.hashFn
+         }, bucket, pos))
+       }
+
+       return rootBucket.put(link.name.substring(2), true)
+     })
+   )
+     .catch(err => {
+       callback(err)
+       callback = null
+     })
+     .then(() => callback && callback())
+ }
+
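+ // convert a bucket position to the two-character uppercase hex prefix used in link names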
+ const toPrefix = (position) => {
+   return position
+     .toString('16')
+     .toUpperCase()
+     .padStart(2, '0')
+     .substring(0, 2)
+ }
+
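+ // find the bucket and position within the HAMT that the given file name hashes to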
+ const findPosition = (file, bucket, cb) => {
+   bucket._findNewBucketAndPos(file)
+     .catch(err => {
+       cb(err)
+       cb = null
+     })
+     .then(position => {
+       if (!cb) {
+         // would have errored in catch block above
+         return
+       }
+
+       cb(null, { position })
+     })
+ }
+
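+ // collect the chain of buckets from the root down to the one holding the target position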
+ const toBucketPath = (position) => {
+   let bucket = position.bucket
+   const path = []
+
+   while (bucket._parent) {
+     path.push(bucket)
+
+     bucket = bucket._parent
  }

-   return cat(streams)
+   path.push(bucket)
+
+   return path.reverse()
}