Skip to content

Commit 11f9027

Browse files
committed
core/rawdb/eradb: improve file cache (WIP)
This changes the file cache to avoid some issues around file closing. With the new logic, a file will never be closed while it is still in use by any goroutine accessing the era store. To do this, we maintain a reference counter for each file. The new cache also explicitly tracks files which are being opened, so concurrent requests for the same epoch share a single cache entry instead of each opening the file. This ensures that 'replacing' a file in the cache cannot happen.
1 parent 9bfa519 commit 11f9027

File tree

2 files changed

+199
-65
lines changed

2 files changed

+199
-65
lines changed

core/rawdb/eradb/eradb.go

+144-65
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,35 @@ package eradb
1919
import (
2020
"errors"
2121
"fmt"
22+
"io/fs"
2223
"os"
2324
"path/filepath"
25+
"sync"
26+
"sync/atomic"
2427

2528
"github.com/ethereum/go-ethereum/common/lru"
2629
"github.com/ethereum/go-ethereum/internal/era"
2730
"github.com/ethereum/go-ethereum/log"
2831
)
2932

30-
const (
31-
openFileLimit = 64
32-
)
33+
const openFileLimit = 64
34+
35+
var errClosed = errors.New("era store is closed")
3336

3437
// EraDatabase manages read access to a directory of era1 files.
3538
// The getter methods are thread-safe.
3639
type EraDatabase struct {
3740
datadir string
38-
cache *lru.Cache[uint64, *era.Era]
41+
mu sync.Mutex
42+
lru lru.BasicLRU[uint64, *fileCacheEntry]
43+
opening map[uint64]*fileCacheEntry
44+
}
45+
46+
type fileCacheEntry struct {
47+
ref atomic.Int32
48+
opened chan struct{}
49+
file *era.Era
50+
err error
3951
}
4052

4153
// New creates a new EraDatabase instance.
@@ -56,93 +68,138 @@ func New(datadir string) (*EraDatabase, error) {
5668
}
5769
db := &EraDatabase{
5870
datadir: datadir,
59-
cache: lru.NewCache[uint64, *era.Era](openFileLimit),
60-
}
61-
closeEra := func(epoch uint64, e *era.Era) {
62-
if e == nil {
63-
log.Warn("Era1 cache contained nil value", "epoch", epoch)
64-
return
65-
}
66-
if err := e.Close(); err != nil {
67-
log.Warn("Error closing era1 file", "epoch", epoch, "err", err)
68-
}
71+
lru: lru.NewBasicLRU[uint64, *fileCacheEntry](openFileLimit),
72+
opening: make(map[uint64]*fileCacheEntry),
6973
}
70-
// Take care to close era1 files when they are evicted from cache.
71-
db.cache.OnEvicted(closeEra)
72-
73-
// Concurrently calling GetRaw* methods can cause the same era1 file to be
74-
// opened multiple times.
75-
db.cache.OnReplaced(closeEra)
76-
7774
log.Info("Opened Era store", "datadir", datadir)
7875
return db, nil
7976
}
8077

8178
// Close releases every era1 file held in the cache; a file is closed once its reference count drops to zero.
82-
func (db *EraDatabase) Close() error {
83-
// Close all open era1 files in the cache.
84-
keys := db.cache.Keys()
85-
errs := make([]error, len(keys))
86-
for _, key := range keys {
87-
if e, ok := db.cache.Get(key); ok {
88-
if err := e.Close(); err != nil {
89-
errs = append(errs, err)
90-
}
91-
}
79+
func (db *EraDatabase) Close() {
80+
db.mu.Lock()
81+
defer db.mu.Unlock()
82+
83+
keys := db.lru.Keys()
84+
for _, epoch := range keys {
85+
entry, _ := db.lru.Get(epoch)
86+
entry.done(epoch)
9287
}
93-
return errors.Join(errs...)
88+
db.opening = nil
9489
}
9590

9691
// GetRawBody returns the raw body for a given block number.
9792
func (db *EraDatabase) GetRawBody(number uint64) ([]byte, error) {
9893
// Lookup the table by epoch.
9994
epoch := number / uint64(era.MaxEra1Size)
100-
e, err := db.getEraByEpoch(epoch)
101-
if err != nil {
102-
return nil, err
103-
}
104-
// The era1 file for given epoch may not exist.
105-
if e == nil {
106-
return nil, nil
95+
entry := db.getEraByEpoch(epoch)
96+
if entry.err != nil {
97+
if errors.Is(entry.err, fs.ErrNotExist) {
98+
return nil, nil
99+
}
100+
return nil, entry.err
107101
}
108-
return e.GetRawBodyByNumber(number)
102+
defer entry.done(epoch)
103+
return entry.file.GetRawBodyByNumber(number)
109104
}
110105

111106
// GetRawReceipts returns the raw receipts for a given block number.
112107
func (db *EraDatabase) GetRawReceipts(number uint64) ([]byte, error) {
113108
epoch := number / uint64(era.MaxEra1Size)
114-
e, err := db.getEraByEpoch(epoch)
115-
if err != nil {
116-
return nil, err
109+
entry := db.getEraByEpoch(epoch)
110+
if entry.err != nil {
111+
if errors.Is(entry.err, fs.ErrNotExist) {
112+
return nil, nil
113+
}
114+
return nil, entry.err
117115
}
118-
// The era1 file for given epoch may not exist.
119-
if e == nil {
120-
return nil, nil
116+
defer entry.done(epoch)
117+
return entry.file.GetRawReceiptsByNumber(number)
118+
}
119+
120+
// getEraByEpoch opens an era file or gets it from the cache. The caller can access
121+
// entry.file and entry.err and must call entry.done when done reading the file.
122+
func (db *EraDatabase) getEraByEpoch(epoch uint64) *fileCacheEntry {
123+
// Add the requested epoch to the cache.
124+
entry := db.getCacheEntry(epoch)
125+
if entry == nil {
126+
return &fileCacheEntry{err: errClosed}
127+
}
128+
129+
// First goroutine to use the file has to open it.
130+
if entry.ref.Add(1) == 1 {
131+
e, err := db.openEraFile(epoch)
132+
if err != nil {
133+
db.fileFailedToOpen(epoch, entry, err)
134+
} else {
135+
db.fileOpened(epoch, entry, e)
136+
}
137+
close(entry.opened)
121138
}
122-
return e.GetRawReceiptsByNumber(number)
139+
140+
// Bump the refcount and wait for the file to be opened.
141+
entry.ref.Add(1)
142+
<-entry.opened
143+
return entry
123144
}
124145

125-
func (db *EraDatabase) openEra(name string) (*era.Era, error) {
126-
e, err := era.Open(name)
127-
if err != nil {
128-
return nil, err
146+
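// getCacheEntry returns the cache entry for epoch, taken from the 'opening' table or the LRU, or newly created in 'opening'. It returns nil if the store is closed.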
147+
func (db *EraDatabase) getCacheEntry(epoch uint64) *fileCacheEntry {
148+
db.mu.Lock()
149+
defer db.mu.Unlock()
150+
151+
// Check if this epoch is already being opened.
152+
if db.opening == nil {
153+
return nil
129154
}
130-
// Assign an epoch to the table.
131-
if e.Count() != uint64(era.MaxEra1Size) {
132-
return nil, fmt.Errorf("pre-merge era1 files must be full. Want: %d, have: %d", era.MaxEra1Size, e.Count())
155+
if entry, ok := db.opening[epoch]; ok {
156+
return entry
133157
}
134-
if e.Start()%uint64(era.MaxEra1Size) != 0 {
135-
return nil, fmt.Errorf("pre-merge era1 file has invalid boundary. %d %% %d != 0", e.Start(), era.MaxEra1Size)
158+
// Check if it's in the cache.
159+
if entry, ok := db.lru.Get(epoch); ok {
160+
return entry
136161
}
137-
return e, nil
162+
// It's a new file, create an entry in the 'opening' table.
163+
entry := &fileCacheEntry{opened: make(chan struct{})}
164+
db.opening[epoch] = entry
165+
return entry
138166
}
139167

140-
func (db *EraDatabase) getEraByEpoch(epoch uint64) (*era.Era, error) {
141-
// Check the cache first.
142-
if e, ok := db.cache.Get(epoch); ok {
143-
return e, nil
168+
// fileOpened is called after an era file has been successfully opened.
169+
func (db *EraDatabase) fileOpened(epoch uint64, entry *fileCacheEntry, file *era.Era) {
170+
db.mu.Lock()
171+
defer db.mu.Unlock()
172+
173+
// The database may have been closed while opening the file. When that happens,
174+
// db.opening will be set to nil, so we need to handle that here and ensure the caller
175+
// knows.
176+
if db.opening == nil {
177+
entry.err = errClosed
178+
return
144179
}
145-
// file name scheme is <network>-<epoch>-<root>.
180+
181+
// Remove from 'opening' table and add to the LRU.
182+
// This may evict an existing item, which we have to close.
183+
entry.file = file
184+
delete(db.opening, epoch)
185+
if _, evictedEntry, _ := db.lru.Add3(epoch, entry); evictedEntry != nil {
186+
evictedEntry.done(epoch)
187+
}
188+
}
189+
190+
// fileFailedToOpen is called when an era file could not be opened.
191+
func (db *EraDatabase) fileFailedToOpen(epoch uint64, entry *fileCacheEntry, err error) {
192+
entry.err = err
193+
194+
db.mu.Lock()
195+
defer db.mu.Unlock()
196+
if db.opening != nil {
197+
delete(db.opening, epoch)
198+
}
199+
}
200+
201+
func (db *EraDatabase) openEraFile(epoch uint64) (*era.Era, error) {
202+
// File name scheme is <network>-<epoch>-<root>.
146203
glob := fmt.Sprintf("*-%05d-*.era1", epoch)
147204
matches, err := filepath.Glob(filepath.Join(db.datadir, glob))
148205
if err != nil {
@@ -155,11 +212,33 @@ func (db *EraDatabase) getEraByEpoch(epoch uint64) (*era.Era, error) {
155212
return nil, nil
156213
}
157214
filename := matches[0]
158-
e, err := db.openEra(filename)
215+
216+
e, err := era.Open(filename)
159217
if err != nil {
160218
return nil, err
161219
}
162-
// Add the era to the cache.
163-
db.cache.Add(epoch, e)
220+
// Assign an epoch to the table.
221+
if e.Count() != uint64(era.MaxEra1Size) {
222+
return nil, fmt.Errorf("pre-merge era1 files must be full. Want: %d, have: %d", era.MaxEra1Size, e.Count())
223+
}
224+
if e.Start()%uint64(era.MaxEra1Size) != 0 {
225+
return nil, fmt.Errorf("pre-merge era1 file has invalid boundary. %d %% %d != 0", e.Start(), era.MaxEra1Size)
226+
}
164227
return e, nil
165228
}
229+
230+
// done signals that the caller has finished using a file.
231+
// This decrements the refcount and ensures the file is closed by the last user.
232+
func (f *fileCacheEntry) done(epoch uint64) {
233+
if f.err != nil {
234+
return
235+
}
236+
if f.ref.Add(-1) == 0 {
237+
err := f.file.Close()
238+
if err == nil {
239+
log.Debug("Closed era1 file", "epoch", epoch)
240+
} else {
241+
log.Warn("Error closing era1 file", "epoch", epoch, "err", err)
242+
}
243+
}
244+
}

core/rawdb/eradb/eradb_test.go

+55
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package eradb
1818

1919
import (
20+
"sync"
2021
"testing"
2122

2223
"github.com/ethereum/go-ethereum/core/types"
@@ -48,3 +49,57 @@ func TestEraDatabase(t *testing.T) {
4849
require.NotNil(t, receipts, "receipts not found")
4950
assert.Equal(t, 0, len(receipts), "receipts length mismatch")
5051
}
52+
53+
func TestEraDatabaseConcurrentOpen(t *testing.T) {
54+
db, err := New("testdata")
55+
require.NoError(t, err)
56+
defer db.Close()
57+
58+
const N = 25
59+
var wg sync.WaitGroup
60+
wg.Add(N)
61+
for range N {
62+
go func() {
63+
defer wg.Done()
64+
r, err := db.GetRawBody(1024)
65+
if err != nil {
66+
t.Error("err:", err)
67+
}
68+
if len(r) == 0 {
69+
t.Error("empty body")
70+
}
71+
}()
72+
}
73+
wg.Wait()
74+
}
75+
76+
func TestEraDatabaseConcurrentOpenClose(t *testing.T) {
77+
db, err := New("testdata")
78+
require.NoError(t, err)
79+
defer db.Close()
80+
81+
const N = 10
82+
var wg sync.WaitGroup
83+
wg.Add(N)
84+
for range N {
85+
go func() {
86+
defer wg.Done()
87+
r, err := db.GetRawBody(1024)
88+
if err == errClosed {
89+
return
90+
}
91+
if err != nil {
92+
t.Error("err:", err)
93+
}
94+
if len(r) == 0 {
95+
t.Error("empty body")
96+
}
97+
}()
98+
}
99+
wg.Add(1)
100+
go func() {
101+
defer wg.Done()
102+
db.Close()
103+
}()
104+
wg.Wait()
105+
}

0 commit comments

Comments
 (0)