diff --git a/CHANGELOG.md b/CHANGELOG.md index c9ef6b77a..b762086a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Improve `cache.NumPendingData` to not return empty data. Automatically bumps `LastSubmittedHeight` to reflect that. ([#3046](https://github.com/evstack/ev-node/pull/3046)) - **BREAKING** Make pending events cache and tx cache fully ephemeral. Those will be re-fetched on restart. DA Inclusion cache persists until cleared up after DA inclusion has been processed. Persist accross restart using store metadata. ([#3047](https://github.com/evstack/ev-node/pull/3047)) +- Replace LRU cache by standard mem cache with manual eviction in `store_adapter`. When P2P blocks were fetched too fast, they would be evicted before being executed [#3051](https://github.com/evstack/ev-node/pull/3051) ## v1.0.0-rc.2 @@ -25,7 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Improve cache handling when there is a significant backlog of pending headers and data. ([#3030](https://github.com/evstack/ev-node/pull/3030)) - Decrease MaxBytesSize to `5MB` to increase compatibility with public nodes. ([#3030](https://github.com/evstack/ev-node/pull/3030)) - Proper counting of `DASubmitterPendingBlobs` metrics. [#3038](https://github.com/evstack/ev-node/pull/3038) -- Replace `go-header` store by `ev-node` store. This avoid duplication of all blocks in `go-header` and `ev-node` store. Thanks to the cached store from #3030, this should improve p2p performance as well. +- Replace `go-header` store by `ev-node` store. This avoids duplication of all blocks in `go-header` and `ev-node` store. Thanks to the cached store from #3030, this should improve p2p performance as well. 
[#3036](https://github.com/evstack/ev-node/pull/3036) ## v1.0.0-rc.1 diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index 83d5c9653..c58cf6acf 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -4,11 +4,13 @@ import ( "context" "encoding/binary" "fmt" + "strings" "sync" "sync/atomic" lru "github.com/hashicorp/golang-lru/v2" + "github.com/rs/zerolog/log" "github.com/evstack/ev-node/pkg/store" ) @@ -215,32 +217,40 @@ func (c *Cache[T]) deleteAllForHeight(height uint64) { // RestoreFromStore loads DA inclusion data from the store into the in-memory cache. // This should be called during initialization to restore persisted state. -// It iterates through store metadata keys with the cache's prefix and populates the LRU cache. -func (c *Cache[T]) RestoreFromStore(ctx context.Context, hashes []string) error { - if c.store == nil { - return nil // No store configured, nothing to restore +// It directly queries store metadata keys with the cache's prefix, avoiding iteration through all blocks. 
+func (c *Cache[T]) RestoreFromStore(ctx context.Context) error { + if c.store == nil || c.storeKeyPrefix == "" { + return nil // No store configured or no prefix, nothing to restore } - for _, hash := range hashes { - value, err := c.store.GetMetadata(ctx, c.storeKey(hash)) - if err != nil { - // Key not found is not an error - the hash may not have been DA included yet + // Query all metadata entries with our prefix directly + entries, err := c.store.GetMetadataByPrefix(ctx, c.storeKeyPrefix) + if err != nil { + return fmt.Errorf("failed to query metadata by prefix %q: %w", c.storeKeyPrefix, err) + } + + for _, entry := range entries { + // Extract the hash from the key by removing the prefix + hash := strings.TrimPrefix(entry.Key, c.storeKeyPrefix) + if hash == entry.Key || hash == "" { + // Prefix not found or empty hash - skip invalid entry continue } - daHeight, blockHeight, ok := decodeDAInclusion(value) + daHeight, blockHeight, ok := decodeDAInclusion(entry.Value) if !ok { - continue // Invalid data, skip + log.Warn(). + Str("key", entry.Key). + Int("value_len", len(entry.Value)). 
+ Msg("skipping invalid DA inclusion entry during cache restore") + continue } c.daIncluded.Add(hash, daHeight) c.hashByHeight.Add(blockHeight, hash) // Update max DA height - current := c.maxDAHeight.Load() - if daHeight > current { - c.maxDAHeight.Store(daHeight) - } + c.setMaxDAHeight(daHeight) } return nil diff --git a/block/internal/cache/generic_cache_test.go b/block/internal/cache/generic_cache_test.go index b4e8c2b79..dbb945acb 100644 --- a/block/internal/cache/generic_cache_test.go +++ b/block/internal/cache/generic_cache_test.go @@ -67,8 +67,7 @@ func TestCache_MaxDAHeight_WithStore(t *testing.T) { // Create new cache and restore from store c2 := NewCache[testItem](st, "test/da-included/") - // Restore with the known hashes - err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2", "hash3"}) + err = c2.RestoreFromStore(ctx) require.NoError(t, err) if got := c2.daHeight(); got != 200 { @@ -106,7 +105,7 @@ func TestCache_WithStorePersistence(t *testing.T) { // Create new cache with same store and restore c2 := NewCache[testItem](st, "test/") - err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2", "hash3"}) + err = c2.RestoreFromStore(ctx) require.NoError(t, err) // hash1 and hash2 should be restored, hash3 should not exist @@ -263,7 +262,7 @@ func TestCache_SaveToStore(t *testing.T) { // Verify data is in store by creating new cache and restoring c2 := NewCache[testItem](st, "save-test/") - err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2"}) + err = c2.RestoreFromStore(ctx) require.NoError(t, err) daHeight, ok := c2.getDAIncluded("hash1") diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index 96d727baa..26b52491a 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -342,45 +342,17 @@ func (m *implementation) SaveToStore() error { } // RestoreFromStore restores the DA inclusion cache from the store. 
-// This iterates through blocks in the store and checks for persisted DA inclusion data. +// This uses prefix-based queries to directly load persisted DA inclusion data, +// avoiding expensive iteration through all blocks. func (m *implementation) RestoreFromStore() error { ctx := context.Background() - // Get current store height to know how many blocks to check - height, err := m.store.Height(ctx) - if err != nil { - return fmt.Errorf("failed to get store height: %w", err) - } - - if height == 0 { - return nil // No blocks to restore - } - - // Collect hashes from stored blocks - var headerHashes []string - var dataHashes []string - - for h := uint64(1); h <= height; h++ { - header, data, err := m.store.GetBlockData(ctx, h) - if err != nil { - m.logger.Warn().Uint64("height", h).Err(err).Msg("failed to get block data during cache restore") - continue - } - - if header != nil { - headerHashes = append(headerHashes, header.Hash().String()) - } - if data != nil { - dataHashes = append(dataHashes, data.DACommitment().String()) - } - } - // Restore DA inclusion data from store - if err := m.headerCache.RestoreFromStore(ctx, headerHashes); err != nil { + if err := m.headerCache.RestoreFromStore(ctx); err != nil { return fmt.Errorf("failed to restore header cache from store: %w", err) } - if err := m.dataCache.RestoreFromStore(ctx, dataHashes); err != nil { + if err := m.dataCache.RestoreFromStore(ctx); err != nil { return fmt.Errorf("failed to restore data cache from store: %w", err) } @@ -388,8 +360,8 @@ func (m *implementation) RestoreFromStore() error { m.initDAHeightFromStore(ctx) m.logger.Info(). - Int("header_hashes", len(headerHashes)). - Int("data_hashes", len(dataHashes)). + Int("header_entries", m.headerCache.daIncluded.Len()). + Int("data_entries", m.dataCache.daIncluded.Len()). Uint64("da_height", m.DaHeight()). 
Msg("restored DA inclusion cache from store") diff --git a/pkg/store/data_store_adapter_test.go b/pkg/store/data_store_adapter_test.go index c05db183c..63d888301 100644 --- a/pkg/store/data_store_adapter_test.go +++ b/pkg/store/data_store_adapter_test.go @@ -58,9 +58,9 @@ func TestDataStoreAdapter_NewDataStoreAdapter(t *testing.T) { // Initially, height should be 0 assert.Equal(t, uint64(0), adapter.Height()) - // Head should return ErrNotFound when empty + // Head should return ErrEmptyStore when empty _, err = adapter.Head(ctx) - assert.ErrorIs(t, err, header.ErrNotFound) + assert.ErrorIs(t, err, header.ErrEmptyStore) } func TestDataStoreAdapter_AppendAndRetrieve(t *testing.T) { @@ -289,9 +289,9 @@ func TestDataStoreAdapter_Tail(t *testing.T) { store := New(ds) adapter := NewDataStoreAdapter(store, testGenesisData()) - // Tail on empty store should return ErrNotFound + // Tail on empty store should return ErrEmptyStore _, err = adapter.Tail(ctx) - assert.ErrorIs(t, err, header.ErrNotFound) + assert.ErrorIs(t, err, header.ErrEmptyStore) _, d1 := types.GetRandomBlock(1, 1, "test-chain") _, d2 := types.GetRandomBlock(2, 1, "test-chain") @@ -512,9 +512,9 @@ func TestDataStoreAdapter_InitWithNil(t *testing.T) { err = adapter.Init(ctx, nil) require.NoError(t, err) - // Should still return ErrNotFound + // Should still return ErrEmptyStore _, err = adapter.Head(ctx) - assert.ErrorIs(t, err, header.ErrNotFound) + assert.ErrorIs(t, err, header.ErrEmptyStore) } func TestDataStoreAdapter_ContextTimeout(t *testing.T) { diff --git a/pkg/store/header_store_adapter_test.go b/pkg/store/header_store_adapter_test.go index 12635f097..c80374aa7 100644 --- a/pkg/store/header_store_adapter_test.go +++ b/pkg/store/header_store_adapter_test.go @@ -56,9 +56,9 @@ func TestHeaderStoreAdapter_NewHeaderStoreAdapter(t *testing.T) { // Initially, height should be 0 assert.Equal(t, uint64(0), adapter.Height()) - // Head should return ErrNotFound when empty + // Head should return ErrEmptyStore 
when empty _, err = adapter.Head(ctx) - assert.ErrorIs(t, err, header.ErrNotFound) + assert.ErrorIs(t, err, header.ErrEmptyStore) } func TestHeaderStoreAdapter_AppendAndRetrieve(t *testing.T) { @@ -287,9 +287,9 @@ func TestHeaderStoreAdapter_Tail(t *testing.T) { store := New(ds) adapter := NewHeaderStoreAdapter(store, testGenesis()) - // Tail on empty store should return ErrNotFound + // Tail on empty store should return ErrEmptyStore _, err = adapter.Tail(ctx) - assert.ErrorIs(t, err, header.ErrNotFound) + assert.ErrorIs(t, err, header.ErrEmptyStore) h1, _ := types.GetRandomBlock(1, 1, "test-chain") h2, _ := types.GetRandomBlock(2, 1, "test-chain") @@ -510,9 +510,9 @@ func TestHeaderStoreAdapter_InitWithNil(t *testing.T) { err = adapter.Init(ctx, nil) require.NoError(t, err) - // Should still return ErrNotFound + // Should still return ErrEmptyStore _, err = adapter.Head(ctx) - assert.ErrorIs(t, err, header.ErrNotFound) + assert.ErrorIs(t, err, header.ErrEmptyStore) } func TestHeaderStoreAdapter_ContextTimeout(t *testing.T) { diff --git a/pkg/store/store.go b/pkg/store/store.go index a821e6a8d..eafa47ae7 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -6,8 +6,10 @@ import ( "encoding/binary" "errors" "fmt" + "strings" ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" "google.golang.org/protobuf/proto" "github.com/evstack/ev-node/types" @@ -190,6 +192,40 @@ func (s *DefaultStore) GetMetadata(ctx context.Context, key string) ([]byte, err return data, nil } +// GetMetadataByPrefix returns all metadata entries whose keys have the given prefix. +// This is more efficient than iterating through known keys when the set of keys is unknown. 
+func (s *DefaultStore) GetMetadataByPrefix(ctx context.Context, prefix string) ([]MetadataEntry, error) { + // The full key in the datastore includes the meta prefix + fullPrefix := getMetaKey(prefix) + + results, err := s.db.Query(ctx, dsq.Query{Prefix: fullPrefix}) + if err != nil { + return nil, fmt.Errorf("failed to query metadata with prefix '%s': %w", prefix, err) + } + defer results.Close() + + var entries []MetadataEntry + for result := range results.Next() { + if result.Error != nil { + return nil, fmt.Errorf("error iterating metadata results: %w", result.Error) + } + + // Extract the original key by removing the meta prefix + // The key from datastore is like "/m/cache/header-da-included/hash" + // We want to return "cache/header-da-included/hash" + metaKeyPrefix := getMetaKey("") + key := strings.TrimPrefix(result.Key, metaKeyPrefix) + key = strings.TrimPrefix(key, "/") // Remove leading slash for consistency + + entries = append(entries, MetadataEntry{ + Key: key, + Value: result.Value, + }) + } + + return entries, nil +} + // DeleteMetadata removes a metadata key from the store. func (s *DefaultStore) DeleteMetadata(ctx context.Context, key string) error { err := s.db.Delete(ctx, ds.NewKey(getMetaKey(key))) diff --git a/pkg/store/store_adapter.go b/pkg/store/store_adapter.go index ecd4da7ab..08743c6eb 100644 --- a/pkg/store/store_adapter.go +++ b/pkg/store/store_adapter.go @@ -1,16 +1,15 @@ package store import ( - "bytes" "context" "encoding/binary" "errors" "fmt" "sync" "sync/atomic" + "time" "github.com/celestiaorg/go-header" - lru "github.com/hashicorp/golang-lru/v2" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/types" @@ -19,8 +18,14 @@ import ( // errElapsedHeight is returned when the requested height was already stored. var errElapsedHeight = errors.New("elapsed height") -// defaultPendingCacheSize is the default size for the pending headers/data LRU cache. 
-const defaultPendingCacheSize = 1000 +const ( + // maxPendingCacheSize is the maximum number of items in the pending cache. + // When this limit is reached, Append will block until items are pruned. + maxPendingCacheSize = 10_000 + + // pruneRetryInterval is how long to wait between prune attempts when cache is full. + pruneRetryInterval = 50 * time.Millisecond +) // StoreGetter abstracts the store access methods for different types (headers vs data). type StoreGetter[H header.Header[H]] interface { @@ -118,6 +123,159 @@ func (hs *heightSub) notifyUpTo(h uint64) { } } +// pendingCache holds all pending state under a single mutex. +// Items are stored here after P2P receipt until persisted to the store. +type pendingCache[H EntityWithDAHint[H]] struct { + mu sync.RWMutex + items map[uint64]H // height -> item + byHash map[string]uint64 // hash -> height (for O(1) lookups) + daHints map[uint64]uint64 // height -> DA hint + maxHeight uint64 // tracked for O(1) access +} + +func newPendingCache[H EntityWithDAHint[H]]() *pendingCache[H] { + return &pendingCache[H]{ + items: make(map[uint64]H), + byHash: make(map[string]uint64), + daHints: make(map[uint64]uint64), + } +} + +func (c *pendingCache[H]) len() int { + c.mu.RLock() + defer c.mu.RUnlock() + return len(c.items) +} + +func (c *pendingCache[H]) add(item H) { + c.mu.Lock() + defer c.mu.Unlock() + height := item.Height() + c.items[height] = item + c.byHash[string(item.Hash())] = height + if hint := item.DAHint(); hint > 0 { + c.daHints[height] = hint + } + if height > c.maxHeight { + c.maxHeight = height + } +} + +func (c *pendingCache[H]) get(height uint64) (H, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + item, ok := c.items[height] + return item, ok +} + +func (c *pendingCache[H]) getByHash(hash []byte) (H, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + var zero H + height, ok := c.byHash[string(hash)] + if !ok { + return zero, false + } + item, ok := c.items[height] + return item, ok +} + +func (c 
*pendingCache[H]) has(height uint64) bool { + c.mu.RLock() + defer c.mu.RUnlock() + _, ok := c.items[height] + return ok +} + +func (c *pendingCache[H]) hasByHash(hash []byte) bool { + c.mu.RLock() + defer c.mu.RUnlock() + _, ok := c.byHash[string(hash)] + return ok +} + +func (c *pendingCache[H]) getDAHint(height uint64) (uint64, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + hint, ok := c.daHints[height] + return hint, ok +} + +func (c *pendingCache[H]) setDAHint(height, hint uint64) { + c.mu.Lock() + defer c.mu.Unlock() + c.daHints[height] = hint +} + +func (c *pendingCache[H]) delete(height uint64) { + c.mu.Lock() + defer c.mu.Unlock() + if item, ok := c.items[height]; ok { + delete(c.byHash, string(item.Hash())) + delete(c.items, height) + if height == c.maxHeight { + c.recalcMaxHeight() + } + } + delete(c.daHints, height) +} + +func (c *pendingCache[H]) getMaxHeight() uint64 { + c.mu.RLock() + defer c.mu.RUnlock() + return c.maxHeight +} + +// head returns the highest item in the pending cache and its height. +// Returns zero value and 0 if pending cache is empty. +func (c *pendingCache[H]) head() (H, uint64) { + c.mu.RLock() + defer c.mu.RUnlock() + var zero H + if c.maxHeight == 0 { + return zero, 0 + } + item, ok := c.items[c.maxHeight] + if !ok { + return zero, 0 + } + return item, c.maxHeight +} + +// pruneIf removes items where the predicate returns true. +// Returns the number of items pruned. +func (c *pendingCache[H]) pruneIf(pred func(height uint64) bool) int { + c.mu.Lock() + defer c.mu.Unlock() + pruned := 0 + needsMaxRecalc := false + for height, item := range c.items { + if pred(height) { + delete(c.byHash, string(item.Hash())) + delete(c.items, height) + delete(c.daHints, height) + pruned++ + if height == c.maxHeight { + needsMaxRecalc = true + } + } + } + if needsMaxRecalc { + c.recalcMaxHeight() + } + return pruned +} + +// recalcMaxHeight recalculates maxHeight from items. Must be called with lock held. 
+func (c *pendingCache[H]) recalcMaxHeight() { + c.maxHeight = 0 + for h := range c.items { + if h > c.maxHeight { + c.maxHeight = h + } + } +} + // StoreAdapter is a generic adapter that wraps Store to implement header.Store[H]. // This allows the ev-node store to be used directly by go-header's P2P infrastructure, // eliminating the need for a separate go-header store and reducing data duplication. @@ -138,13 +296,9 @@ type StoreAdapter[H EntityWithDAHint[H]] struct { mu sync.RWMutex initialized bool - // pending is an LRU cache for items received via Append that haven't been - // written to the store yet. Keyed by height. Using LRU prevents unbounded growth. - pending *lru.Cache[uint64, H] - - // daHints caches DA height hints by block height for fast access. - // Hints are also persisted to disk via the getter. - daHints *lru.Cache[uint64, uint64] + // pending holds items received via Append that haven't been written to the store yet. + // Bounded by maxPendingCacheSize with backpressure when full. + pending *pendingCache[H] // onDeleteFn is called when items are deleted (for rollback scenarios) onDeleteFn func(context.Context, uint64) error @@ -153,10 +307,6 @@ type StoreAdapter[H EntityWithDAHint[H]] struct { // NewStoreAdapter creates a new StoreAdapter wrapping the given store getter. // The genesis is used to determine the initial height for efficient Tail lookups. 
func NewStoreAdapter[H EntityWithDAHint[H]](getter StoreGetter[H], gen genesis.Genesis) *StoreAdapter[H] { - // Create LRU cache for pending items - ignore error as size is constant and valid - pendingCache, _ := lru.New[uint64, H](defaultPendingCacheSize) - daHintsCache, _ := lru.New[uint64, uint64](defaultPendingCacheSize) - // Get actual current height from store (0 if empty) var storeHeight uint64 if h, err := getter.Height(context.Background()); err == nil { @@ -166,8 +316,7 @@ func NewStoreAdapter[H EntityWithDAHint[H]](getter StoreGetter[H], gen genesis.G adapter := &StoreAdapter[H]{ getter: getter, genesisInitialHeight: max(gen.InitialHeight, 1), - pending: pendingCache, - daHints: daHintsCache, + pending: newPendingCache[H](), heightSub: newHeightSub(storeHeight), } @@ -204,26 +353,12 @@ func (a *StoreAdapter[H]) Stop(ctx context.Context) error { return nil } -// pendingHead returns the highest item in the pending cache and its height. -// Returns zero value and 0 if pending cache is empty. -func (a *StoreAdapter[H]) pendingHead() (H, uint64) { - var maxHeight uint64 - var head H - for _, h := range a.pending.Keys() { - if item, ok := a.pending.Peek(h); ok && h > maxHeight { - maxHeight = h - head = item - } - } - return head, maxHeight -} - // Head returns the highest item in the store. func (a *StoreAdapter[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) { var zero H storeHeight, _ := a.getter.Height(ctx) - pendingHead, pendingHeight := a.pendingHead() + pendingHead, pendingHeight := a.pending.head() // Prefer pending if it's higher than store if pendingHeight > storeHeight { @@ -245,7 +380,7 @@ func (a *StoreAdapter[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) ( return pendingHead, nil } - return zero, header.ErrNotFound + return zero, header.ErrEmptyStore } // Tail returns the lowest item in the store. 
@@ -260,7 +395,7 @@ func (a *StoreAdapter[H]) Tail(ctx context.Context) (H, error) { // Check store h, err := a.getter.Height(ctx) if err != nil || h == 0 { - return zero, header.ErrNotFound + return zero, header.ErrEmptyStore } height = h } @@ -272,7 +407,7 @@ } // Check pending for genesisInitialHeight - if pendingItem, ok := a.pending.Peek(a.genesisInitialHeight); ok { + if pendingItem, ok := a.pending.get(a.genesisInitialHeight); ok { return pendingItem, nil } @@ -282,12 +417,13 @@ if err == nil { return item, nil } - if pendingItem, ok := a.pending.Peek(h); ok { + if pendingItem, ok := a.pending.get(h); ok { return pendingItem, nil } } - return zero, header.ErrNotFound + // should never happen + return zero, header.ErrEmptyStore } // Get returns an item by its hash. @@ -301,12 +437,10 @@ return item, nil } - // Check pending items - for _, h := range a.pending.Keys() { - if pendingItem, ok := a.pending.Peek(h); ok && !pendingItem.IsZero() && bytes.Equal(pendingItem.Hash(), hash) { - a.applyDAHint(pendingItem) - return pendingItem, nil - } + // Check pending items using hash index for O(1) lookup + if pendingItem, ok := a.pending.getByHash(hash); ok { + a.applyDAHint(pendingItem) + return pendingItem, nil + } return zero, header.ErrNotFound @@ -344,7 +478,7 @@ } // Check pending items - if pendingItem, ok := a.pending.Peek(height); ok { + if pendingItem, ok := a.pending.get(height); ok { a.applyDAHint(pendingItem) return pendingItem, nil } @@ -360,16 +494,16 @@ height := item.Height() - // Check cache first - if hint, found := a.daHints.Get(height); found { + // Check pending cache first + if hint, found := 
a.pending.getDAHint(height); found { item.SetDAHint(hint) return } // Try to load from disk - if hint, err := a.getter.GetDAHint(context.Background(), height); err == nil && hint > 0 { - a.daHints.Add(height, hint) - item.SetDAHint(hint) + if diskHint, err := a.getter.GetDAHint(context.Background(), height); err == nil && diskHint > 0 { + a.pending.setDAHint(height, diskHint) + item.SetDAHint(diskHint) } } @@ -419,14 +553,8 @@ func (a *StoreAdapter[H]) Has(ctx context.Context, hash header.Hash) (bool, erro return true, nil } - // Check pending items - for _, h := range a.pending.Keys() { - if pendingItem, ok := a.pending.Peek(h); ok && !pendingItem.IsZero() && bytes.Equal(pendingItem.Hash(), hash) { - return true, nil - } - } - - return false, nil + // Check pending items using hash index for O(1) lookup + return a.pending.hasByHash(hash), nil } // HasAt checks if an item exists at the given height. @@ -437,7 +565,7 @@ func (a *StoreAdapter[H]) HasAt(ctx context.Context, height uint64) bool { } // Check pending items - return a.pending.Contains(height) + return a.pending.has(height) } // Height returns the current height of the store. 
@@ -445,13 +573,7 @@ func (a *StoreAdapter[H]) Height() uint64 { // Check store first if h, err := a.getter.Height(context.Background()); err == nil && h > 0 { // Also check pending for higher heights - maxPending := uint64(0) - for _, height := range a.pending.Keys() { - if height > maxPending { - maxPending = height - } - } - + maxPending := a.pending.getMaxHeight() if maxPending > h { a.heightSub.SetHeight(maxPending) return maxPending @@ -466,10 +588,8 @@ func (a *StoreAdapter[H]) Height() uint64 { return height } - for _, h := range a.pending.Keys() { - if h > height { - height = h - } + if maxPending := a.pending.getMaxHeight(); maxPending > height { + return maxPending } return height } @@ -478,6 +598,7 @@ func (a *StoreAdapter[H]) Height() uint64 { // These items are received via P2P and will be available for retrieval // until the ev-node syncer processes and persists them to the store. // If items have a DA hint set, it will be cached for later retrieval. +// If the cache is full, this will block until space is available or context is canceled. 
func (a *StoreAdapter[H]) Append(ctx context.Context, items ...H) error { if len(items) == 0 { return nil @@ -490,21 +611,24 @@ func (a *StoreAdapter[H]) Append(ctx context.Context, items ...H) error { height := item.Height() - // Cache and persist DA hint if present - if hint := item.DAHint(); hint > 0 { - a.daHints.Add(height, hint) - // Persist to disk - _ = a.getter.SetDAHint(ctx, height, hint) - } - // Check if already in store if a.getter.HasAt(ctx, height) { // Already persisted, skip adding to pending continue } - // Add to pending cache (LRU will evict oldest if full) - a.pending.Add(height, item) + // Wait for space in the cache if full (backpressure) + if err := a.waitForSpace(ctx); err != nil { + return err + } + + // Add to pending cache (includes DA hint if present) + a.pending.add(item) + + // Persist DA hint to disk + if hint := item.DAHint(); hint > 0 { + _ = a.getter.SetDAHint(ctx, height, hint) + } // Update cached height and notify waiters if height > a.heightSub.Height() { @@ -515,6 +639,43 @@ func (a *StoreAdapter[H]) Append(ctx context.Context, items ...H) error { return nil } +// waitForSpace blocks until there's space in the pending cache or context is canceled. +// It actively prunes persisted items to make room. +func (a *StoreAdapter[H]) waitForSpace(ctx context.Context) error { + for { + if a.pending.len() < maxPendingCacheSize { + return nil + } + + // Cache is full, try to prune + a.prunePersisted(ctx) + + if a.pending.len() < maxPendingCacheSize { + return nil + } + + // Still full, wait a bit for executor to catch up + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(pruneRetryInterval): + // Retry + } + } +} + +// prunePersisted removes items that have already been persisted to the underlying store. 
+func (a *StoreAdapter[H]) prunePersisted(ctx context.Context) { + storeHeight, err := a.getter.Height(ctx) + if err != nil || storeHeight == 0 { + return + } + // Items at or below store height are definitely persisted + a.pending.pruneIf(func(height uint64) bool { + return height <= storeHeight + }) +} + // Init initializes the store with the first item. // This is called by go-header when bootstrapping the store with a trusted item. func (a *StoreAdapter[H]) Init(ctx context.Context, item H) error { @@ -529,8 +690,9 @@ func (a *StoreAdapter[H]) Init(ctx context.Context, item H) error { return nil } - // Add to pending cache (LRU will evict oldest if full) - a.pending.Add(item.Height(), item) + // Add to pending cache + a.pending.add(item) + a.heightSub.SetHeight(item.Height()) a.initialized = true @@ -543,15 +705,16 @@ func (a *StoreAdapter[H]) Sync(ctx context.Context) error { return nil } -// DeleteRange deletes items in the range [from, to). -// This is used for rollback operations. +// DeleteRange deletes all items in the range [from, to). 
func (a *StoreAdapter[H]) DeleteRange(ctx context.Context, from, to uint64) error { - // Remove from pending cache and DA hints cache + // Remove from pending cache for height := from; height < to; height++ { - a.pending.Remove(height) - a.daHints.Remove(height) + a.pending.delete(height) + } - if a.onDeleteFn != nil { + // Call onDeleteFn outside the lock + if a.onDeleteFn != nil { + for height := from; height < to; height++ { if err := a.onDeleteFn(ctx, height); err != nil { return err } diff --git a/pkg/store/store_adapter_test.go b/pkg/store/store_adapter_test.go new file mode 100644 index 000000000..21c42370b --- /dev/null +++ b/pkg/store/store_adapter_test.go @@ -0,0 +1,607 @@ +package store + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/evstack/ev-node/types" +) + +// TestPendingCache_BasicOperations tests add, get, has, delete operations +func TestPendingCache_BasicOperations(t *testing.T) { + t.Parallel() + + cache := newPendingCache[*types.P2PSignedHeader]() + + // Initially empty + assert.Equal(t, 0, cache.len()) + assert.Equal(t, uint64(0), cache.getMaxHeight()) + + // Create test items + h1, _ := types.GetRandomBlock(1, 1, "test-chain") + h2, _ := types.GetRandomBlock(2, 1, "test-chain") + h3, _ := types.GetRandomBlock(3, 1, "test-chain") + + p1 := &types.P2PSignedHeader{SignedHeader: h1} + p2 := &types.P2PSignedHeader{SignedHeader: h2} + p3 := &types.P2PSignedHeader{SignedHeader: h3} + + // Add items + cache.add(p1) + assert.Equal(t, 1, cache.len()) + assert.Equal(t, uint64(1), cache.getMaxHeight()) + + cache.add(p2) + assert.Equal(t, 2, cache.len()) + assert.Equal(t, uint64(2), cache.getMaxHeight()) + + cache.add(p3) + assert.Equal(t, 3, cache.len()) + assert.Equal(t, uint64(3), cache.getMaxHeight()) + + // Get by height + item, ok := cache.get(1) + assert.True(t, ok) + assert.Equal(t, uint64(1), item.Height()) + + item, ok = cache.get(2) + assert.True(t, 
ok) + assert.Equal(t, uint64(2), item.Height()) + + // Get non-existent + _, ok = cache.get(999) + assert.False(t, ok) + + // Has + assert.True(t, cache.has(1)) + assert.True(t, cache.has(2)) + assert.True(t, cache.has(3)) + assert.False(t, cache.has(999)) + + // Delete + cache.delete(2) + assert.Equal(t, 2, cache.len()) + assert.False(t, cache.has(2)) + assert.True(t, cache.has(1)) + assert.True(t, cache.has(3)) + // maxHeight should still be 3 + assert.Equal(t, uint64(3), cache.getMaxHeight()) +} + +// TestPendingCache_GetByHash tests hash-based lookups +func TestPendingCache_GetByHash(t *testing.T) { + t.Parallel() + + cache := newPendingCache[*types.P2PSignedHeader]() + + h1, _ := types.GetRandomBlock(1, 1, "test-chain") + h2, _ := types.GetRandomBlock(2, 1, "test-chain") + + p1 := &types.P2PSignedHeader{SignedHeader: h1} + p2 := &types.P2PSignedHeader{SignedHeader: h2} + + cache.add(p1) + cache.add(p2) + + // Get by hash + item, ok := cache.getByHash(p1.Hash()) + assert.True(t, ok) + assert.Equal(t, uint64(1), item.Height()) + + item, ok = cache.getByHash(p2.Hash()) + assert.True(t, ok) + assert.Equal(t, uint64(2), item.Height()) + + // Non-existent hash + _, ok = cache.getByHash([]byte("nonexistent")) + assert.False(t, ok) + + // Has by hash + assert.True(t, cache.hasByHash(p1.Hash())) + assert.True(t, cache.hasByHash(p2.Hash())) + assert.False(t, cache.hasByHash([]byte("nonexistent"))) + + // After delete, hash lookup should fail + cache.delete(1) + _, ok = cache.getByHash(p1.Hash()) + assert.False(t, ok) + assert.False(t, cache.hasByHash(p1.Hash())) +} + +// TestPendingCache_MaxHeightTracking tests O(1) maxHeight tracking +func TestPendingCache_MaxHeightTracking(t *testing.T) { + t.Parallel() + + cache := newPendingCache[*types.P2PSignedHeader]() + + // Add items out of order + h5, _ := types.GetRandomBlock(5, 1, "test-chain") + h3, _ := types.GetRandomBlock(3, 1, "test-chain") + h7, _ := types.GetRandomBlock(7, 1, "test-chain") + h1, _ := 
types.GetRandomBlock(1, 1, "test-chain") + + cache.add(&types.P2PSignedHeader{SignedHeader: h5}) + assert.Equal(t, uint64(5), cache.getMaxHeight()) + + cache.add(&types.P2PSignedHeader{SignedHeader: h3}) + assert.Equal(t, uint64(5), cache.getMaxHeight()) // Still 5 + + cache.add(&types.P2PSignedHeader{SignedHeader: h7}) + assert.Equal(t, uint64(7), cache.getMaxHeight()) // Now 7 + + cache.add(&types.P2PSignedHeader{SignedHeader: h1}) + assert.Equal(t, uint64(7), cache.getMaxHeight()) // Still 7 + + // Delete non-max should not change maxHeight + cache.delete(3) + assert.Equal(t, uint64(7), cache.getMaxHeight()) + + cache.delete(1) + assert.Equal(t, uint64(7), cache.getMaxHeight()) + + // Delete max should recalculate + cache.delete(7) + assert.Equal(t, uint64(5), cache.getMaxHeight()) + + // Delete remaining + cache.delete(5) + assert.Equal(t, uint64(0), cache.getMaxHeight()) +} + +// TestPendingCache_Head tests head() returns highest item +func TestPendingCache_Head(t *testing.T) { + t.Parallel() + + cache := newPendingCache[*types.P2PSignedHeader]() + + // Empty cache + head, height := cache.head() + assert.Nil(t, head) + assert.Equal(t, uint64(0), height) + + // Add items + h1, _ := types.GetRandomBlock(1, 1, "test-chain") + h3, _ := types.GetRandomBlock(3, 1, "test-chain") + h2, _ := types.GetRandomBlock(2, 1, "test-chain") + + cache.add(&types.P2PSignedHeader{SignedHeader: h1}) + cache.add(&types.P2PSignedHeader{SignedHeader: h3}) + cache.add(&types.P2PSignedHeader{SignedHeader: h2}) + + head, height = cache.head() + assert.NotNil(t, head) + assert.Equal(t, uint64(3), height) + assert.Equal(t, uint64(3), head.Height()) + + // After deleting max + cache.delete(3) + head, height = cache.head() + assert.NotNil(t, head) + assert.Equal(t, uint64(2), height) + assert.Equal(t, uint64(2), head.Height()) +} + +// TestPendingCache_DAHints tests DA hint storage and retrieval +func TestPendingCache_DAHints(t *testing.T) { + t.Parallel() + + cache := 
newPendingCache[*types.P2PSignedHeader]() + + // Set DA hint directly + cache.setDAHint(10, 100) + hint, ok := cache.getDAHint(10) + assert.True(t, ok) + assert.Equal(t, uint64(100), hint) + + // Non-existent + _, ok = cache.getDAHint(999) + assert.False(t, ok) + + // Add item with DA hint + h1, _ := types.GetRandomBlock(1, 1, "test-chain") + p1 := &types.P2PSignedHeader{SignedHeader: h1, DAHeightHint: 50} + cache.add(p1) + + hint, ok = cache.getDAHint(1) + assert.True(t, ok) + assert.Equal(t, uint64(50), hint) + + // Delete should remove DA hint too + cache.delete(1) + _, ok = cache.getDAHint(1) + assert.False(t, ok) +} + +// TestPendingCache_PruneIf tests conditional pruning +func TestPendingCache_PruneIf(t *testing.T) { + t.Parallel() + + cache := newPendingCache[*types.P2PSignedHeader]() + + // Add items at heights 1-10 + for i := uint64(1); i <= 10; i++ { + h, _ := types.GetRandomBlock(i, 1, "test-chain") + cache.add(&types.P2PSignedHeader{SignedHeader: h}) + } + assert.Equal(t, 10, cache.len()) + assert.Equal(t, uint64(10), cache.getMaxHeight()) + + // Prune items <= 5 + pruned := cache.pruneIf(func(height uint64) bool { + return height <= 5 + }) + assert.Equal(t, 5, pruned) + assert.Equal(t, 5, cache.len()) + + // Verify remaining items + for i := uint64(1); i <= 5; i++ { + assert.False(t, cache.has(i), "height %d should be pruned", i) + } + for i := uint64(6); i <= 10; i++ { + assert.True(t, cache.has(i), "height %d should exist", i) + } + + // maxHeight should still be 10 + assert.Equal(t, uint64(10), cache.getMaxHeight()) + + // Prune including max + pruned = cache.pruneIf(func(height uint64) bool { + return height >= 9 + }) + assert.Equal(t, 2, pruned) + assert.Equal(t, 3, cache.len()) + assert.Equal(t, uint64(8), cache.getMaxHeight()) +} + +// TestPendingCache_ConcurrentAccess tests thread safety +func TestPendingCache_ConcurrentAccess(t *testing.T) { + t.Parallel() + + cache := newPendingCache[*types.P2PSignedHeader]() + + const numGoroutines = 10 + 
const numOpsPerGoroutine = 100 + + var wg sync.WaitGroup + wg.Add(numGoroutines * 3) // readers, writers, deleters + + // Writers + for i := 0; i < numGoroutines; i++ { + go func(offset int) { + defer wg.Done() + for j := 0; j < numOpsPerGoroutine; j++ { + height := uint64(offset*numOpsPerGoroutine + j + 1) + h, _ := types.GetRandomBlock(height, 1, "test-chain") + cache.add(&types.P2PSignedHeader{SignedHeader: h}) + } + }(i) + } + + // Readers + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < numOpsPerGoroutine; j++ { + _ = cache.len() + _ = cache.getMaxHeight() + _, _ = cache.head() + _, _ = cache.get(uint64(j + 1)) + } + }() + } + + // Deleters (delete some items) + for i := 0; i < numGoroutines; i++ { + go func(offset int) { + defer wg.Done() + for j := 0; j < numOpsPerGoroutine/2; j++ { + height := uint64(offset*numOpsPerGoroutine + j + 1) + cache.delete(height) + } + }(i) + } + + wg.Wait() + + // Should not panic and should have some items remaining + assert.GreaterOrEqual(t, cache.len(), 0) +} + +// TestStoreAdapter_Backpressure tests that Append blocks when cache is full +func TestStoreAdapter_Backpressure(t *testing.T) { + t.Parallel() + ctx := context.Background() + + ds, err := NewTestInMemoryKVStore() + require.NoError(t, err) + store := New(ds) + adapter := NewHeaderStoreAdapter(store, testGenesis()) + + // Fill the cache close to max (we can't easily fill to exactly max without + // modifying maxPendingCacheSize, so we test the mechanism indirectly) + + // Create many items + const numItems = 100 + items := make([]*types.P2PSignedHeader, numItems) + for i := 0; i < numItems; i++ { + h, _ := types.GetRandomBlock(uint64(i+1), 1, "test-chain") + items[i] = &types.P2PSignedHeader{SignedHeader: h} + } + + // Append should work without blocking when there's space + err = adapter.Append(ctx, items...) 
+ require.NoError(t, err) + + // Verify all items were added + assert.Equal(t, uint64(numItems), adapter.Height()) +} + +// TestStoreAdapter_PrunePersistedOptimization tests height-based pruning +func TestStoreAdapter_PrunePersistedOptimization(t *testing.T) { + t.Parallel() + ctx := context.Background() + + ds, err := NewTestInMemoryKVStore() + require.NoError(t, err) + st := New(ds) + adapter := NewHeaderStoreAdapter(st, testGenesis()) + + // Add items to pending via Append + h1, d1 := types.GetRandomBlock(1, 1, "test-chain") + h2, d2 := types.GetRandomBlock(2, 1, "test-chain") + h3, d3 := types.GetRandomBlock(3, 1, "test-chain") + + err = adapter.Append(ctx, + &types.P2PSignedHeader{SignedHeader: h1}, + &types.P2PSignedHeader{SignedHeader: h2}, + &types.P2PSignedHeader{SignedHeader: h3}, + ) + require.NoError(t, err) + + // Verify items are in pending (adapter.Height() should return 3) + assert.Equal(t, uint64(3), adapter.Height()) + + // Now persist h1 and h2 to the store directly + batch, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(h1, d1, &types.Signature{})) + require.NoError(t, batch.SaveBlockData(h2, d2, &types.Signature{})) + require.NoError(t, batch.SetHeight(2)) + require.NoError(t, batch.Commit()) + + // Items should still be retrievable (from either pending or store) + item, err := adapter.GetByHeight(ctx, 1) + require.NoError(t, err) + assert.Equal(t, uint64(1), item.Height()) + + item, err = adapter.GetByHeight(ctx, 2) + require.NoError(t, err) + assert.Equal(t, uint64(2), item.Height()) + + item, err = adapter.GetByHeight(ctx, 3) + require.NoError(t, err) + assert.Equal(t, uint64(3), item.Height()) + + // Persist h3 too + batch, err = st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(h3, d3, &types.Signature{})) + require.NoError(t, batch.SetHeight(3)) + require.NoError(t, batch.Commit()) + + // All items should now be retrievable from store + item, err = 
adapter.GetByHeight(ctx, 3) + require.NoError(t, err) + assert.Equal(t, uint64(3), item.Height()) +} + +// TestStoreAdapter_AppendSkipsPersistedItems tests that Append doesn't add already-persisted items +func TestStoreAdapter_AppendSkipsPersistedItems(t *testing.T) { + t.Parallel() + ctx := context.Background() + + ds, err := NewTestInMemoryKVStore() + require.NoError(t, err) + st := New(ds) + adapter := NewHeaderStoreAdapter(st, testGenesis()) + + // Persist h1 directly to store + h1, d1 := types.GetRandomBlock(1, 1, "test-chain") + batch, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(h1, d1, &types.Signature{})) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.Commit()) + + // Now try to Append h1 - it should be skipped + err = adapter.Append(ctx, &types.P2PSignedHeader{SignedHeader: h1}) + require.NoError(t, err) + + // Height should be 1 (from store) + assert.Equal(t, uint64(1), adapter.Height()) + + // Item should be retrievable + item, err := adapter.GetByHeight(ctx, 1) + require.NoError(t, err) + assert.Equal(t, uint64(1), item.Height()) +} + +// TestStoreAdapter_DAHintPersistence tests that DA hints are persisted and restored +func TestStoreAdapter_DAHintPersistence(t *testing.T) { + t.Parallel() + ctx := context.Background() + + ds, err := NewTestInMemoryKVStore() + require.NoError(t, err) + st := New(ds) + adapter := NewHeaderStoreAdapter(st, testGenesis()) + + // Add item with DA hint + h1, _ := types.GetRandomBlock(1, 1, "test-chain") + p1 := &types.P2PSignedHeader{SignedHeader: h1, DAHeightHint: 100} + + err = adapter.Append(ctx, p1) + require.NoError(t, err) + + // Retrieve and check DA hint is applied + item, err := adapter.GetByHeight(ctx, 1) + require.NoError(t, err) + assert.Equal(t, uint64(100), item.DAHint()) +} + +// TestStoreAdapter_HeightUpdatesCorrectly tests height tracking through various operations +func TestStoreAdapter_HeightUpdatesCorrectly(t *testing.T) { + t.Parallel() 
+ ctx := context.Background() + + ds, err := NewTestInMemoryKVStore() + require.NoError(t, err) + st := New(ds) + adapter := NewHeaderStoreAdapter(st, testGenesis()) + + // Initially 0 + assert.Equal(t, uint64(0), adapter.Height()) + + // Add item at height 5 (gaps are allowed in pending) + h5, _ := types.GetRandomBlock(5, 1, "test-chain") + err = adapter.Append(ctx, &types.P2PSignedHeader{SignedHeader: h5}) + require.NoError(t, err) + assert.Equal(t, uint64(5), adapter.Height()) + + // Add item at height 3 - should not decrease height + h3, _ := types.GetRandomBlock(3, 1, "test-chain") + err = adapter.Append(ctx, &types.P2PSignedHeader{SignedHeader: h3}) + require.NoError(t, err) + assert.Equal(t, uint64(5), adapter.Height()) + + // Add item at height 10 - should increase height + h10, _ := types.GetRandomBlock(10, 1, "test-chain") + err = adapter.Append(ctx, &types.P2PSignedHeader{SignedHeader: h10}) + require.NoError(t, err) + assert.Equal(t, uint64(10), adapter.Height()) + + // Verify HasAt works + assert.True(t, adapter.HasAt(ctx, 3)) + assert.True(t, adapter.HasAt(ctx, 5)) + assert.True(t, adapter.HasAt(ctx, 10)) + assert.False(t, adapter.HasAt(ctx, 7)) // gap + + // Delete height 10 via DeleteRange + err = adapter.DeleteRange(ctx, 10, 11) + require.NoError(t, err) + + // After delete, HasAt should return false for deleted height + assert.False(t, adapter.HasAt(ctx, 10)) + assert.True(t, adapter.HasAt(ctx, 5)) + assert.True(t, adapter.HasAt(ctx, 3)) +} + +// TestStoreAdapter_DeleteRangeRemovesFromPending tests DeleteRange removes items from pending +func TestStoreAdapter_DeleteRangeRemovesFromPending(t *testing.T) { + t.Parallel() + ctx := context.Background() + + ds, err := NewTestInMemoryKVStore() + require.NoError(t, err) + st := New(ds) + adapter := NewHeaderStoreAdapter(st, testGenesis()) + + // Add items 1-5 + for i := uint64(1); i <= 5; i++ { + h, _ := types.GetRandomBlock(i, 1, "test-chain") + err = adapter.Append(ctx, 
&types.P2PSignedHeader{SignedHeader: h}) + require.NoError(t, err) + } + + // Verify all exist + for i := uint64(1); i <= 5; i++ { + assert.True(t, adapter.HasAt(ctx, i)) + } + + // Delete range [2, 4) + err = adapter.DeleteRange(ctx, 2, 4) + require.NoError(t, err) + + // Verify 2 and 3 are gone, 1, 4, 5 remain + assert.True(t, adapter.HasAt(ctx, 1)) + assert.False(t, adapter.HasAt(ctx, 2)) + assert.False(t, adapter.HasAt(ctx, 3)) + assert.True(t, adapter.HasAt(ctx, 4)) + assert.True(t, adapter.HasAt(ctx, 5)) +} + +// TestStoreAdapter_ConcurrentAppendAndRead tests concurrent access to the adapter +func TestStoreAdapter_ConcurrentAppendAndRead(t *testing.T) { + t.Parallel() + ctx := context.Background() + + ds, err := NewTestInMemoryKVStore() + require.NoError(t, err) + st := New(ds) + adapter := NewHeaderStoreAdapter(st, testGenesis()) + + const numWriters = 5 + const numReaders = 5 + const itemsPerWriter = 20 + + var wg sync.WaitGroup + wg.Add(numWriters + numReaders) + + // Writers + for w := 0; w < numWriters; w++ { + go func(writerID int) { + defer wg.Done() + for i := 0; i < itemsPerWriter; i++ { + height := uint64(writerID*itemsPerWriter + i + 1) + h, _ := types.GetRandomBlock(height, 1, "test-chain") + _ = adapter.Append(ctx, &types.P2PSignedHeader{SignedHeader: h}) + } + }(w) + } + + // Readers + for r := 0; r < numReaders; r++ { + go func() { + defer wg.Done() + for i := 0; i < itemsPerWriter*numWriters; i++ { + _ = adapter.Height() + _, _ = adapter.Head(ctx) + _ = adapter.HasAt(ctx, uint64(i+1)) + } + }() + } + + wg.Wait() + + // Should have all items + assert.Equal(t, uint64(numWriters*itemsPerWriter), adapter.Height()) +} + +// TestStoreAdapter_InitOnlyOnce tests that Init only works once +func TestStoreAdapter_InitOnlyOnce(t *testing.T) { + t.Parallel() + ctx := context.Background() + + ds, err := NewTestInMemoryKVStore() + require.NoError(t, err) + st := New(ds) + adapter := NewHeaderStoreAdapter(st, testGenesis()) + + h1, _ := 
types.GetRandomBlock(1, 1, "test-chain") + h2, _ := types.GetRandomBlock(2, 1, "test-chain") + + // First Init should work + err = adapter.Init(ctx, &types.P2PSignedHeader{SignedHeader: h1}) + require.NoError(t, err) + assert.Equal(t, uint64(1), adapter.Height()) + + // Second Init should be no-op + err = adapter.Init(ctx, &types.P2PSignedHeader{SignedHeader: h2}) + require.NoError(t, err) + // Height should still be 1, not 2 + assert.Equal(t, uint64(1), adapter.Height()) +} diff --git a/pkg/store/store_test.go b/pkg/store/store_test.go index 8636a0ad0..6a09db465 100644 --- a/pkg/store/store_test.go +++ b/pkg/store/store_test.go @@ -338,6 +338,61 @@ func TestMetadata(t *testing.T) { require.Nil(v) } +func TestGetMetadataByPrefix(t *testing.T) { + t.Parallel() + require := require.New(t) + + kv, err := NewTestInMemoryKVStore() + require.NoError(err) + s := New(kv) + + // Set metadata with a common prefix + prefix := "cache/test-prefix/" + require.NoError(s.SetMetadata(t.Context(), prefix+"hash1", []byte("value1"))) + require.NoError(s.SetMetadata(t.Context(), prefix+"hash2", []byte("value2"))) + require.NoError(s.SetMetadata(t.Context(), prefix+"hash3", []byte("value3"))) + // Set one with different prefix + require.NoError(s.SetMetadata(t.Context(), "other/prefix/key", []byte("other"))) + + // Query by prefix + entries, err := s.GetMetadataByPrefix(t.Context(), prefix) + require.NoError(err) + require.Len(entries, 3) + + // Verify keys have consistent format (no leading slash) + keyMap := make(map[string][]byte) + for _, entry := range entries { + // Key should start with the prefix, not "/prefix" + require.True(len(entry.Key) > 0, "key should not be empty") + require.NotEqual("/", string(entry.Key[0]), "key should not have leading slash: %s", entry.Key) + require.Contains(entry.Key, prefix, "key should contain prefix") + keyMap[entry.Key] = entry.Value + } + + // Verify all expected entries are present + require.Equal([]byte("value1"), keyMap[prefix+"hash1"]) + 
require.Equal([]byte("value2"), keyMap[prefix+"hash2"]) + require.Equal([]byte("value3"), keyMap[prefix+"hash3"]) + + // Other prefix should not be included + _, hasOther := keyMap["other/prefix/key"] + require.False(hasOther) +} + +func TestGetMetadataByPrefix_EmptyResult(t *testing.T) { + t.Parallel() + require := require.New(t) + + kv, err := NewTestInMemoryKVStore() + require.NoError(err) + s := New(kv) + + // Query non-existent prefix + entries, err := s.GetMetadataByPrefix(t.Context(), "nonexistent/prefix/") + require.NoError(err) + require.Len(entries, 0) +} + func TestGetBlockDataErrors(t *testing.T) { t.Parallel() chainID := "TestGetBlockDataErrors" diff --git a/pkg/store/tracing.go b/pkg/store/tracing.go index 04baf748f..33a21f93a 100644 --- a/pkg/store/tracing.go +++ b/pkg/store/tracing.go @@ -174,6 +174,23 @@ func (t *tracedStore) GetMetadata(ctx context.Context, key string) ([]byte, erro return data, nil } +func (t *tracedStore) GetMetadataByPrefix(ctx context.Context, prefix string) ([]MetadataEntry, error) { + ctx, span := t.tracer.Start(ctx, "Store.GetMetadataByPrefix", + trace.WithAttributes(attribute.String("prefix", prefix)), + ) + defer span.End() + + entries, err := t.inner.GetMetadataByPrefix(ctx, prefix) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return entries, err + } + + span.SetAttributes(attribute.Int("entries.count", len(entries))) + return entries, nil +} + func (t *tracedStore) SetMetadata(ctx context.Context, key string, value []byte) error { ctx, span := t.tracer.Start(ctx, "Store.SetMetadata", trace.WithAttributes( diff --git a/pkg/store/tracing_test.go b/pkg/store/tracing_test.go index 5c273f661..0c0b96760 100644 --- a/pkg/store/tracing_test.go +++ b/pkg/store/tracing_test.go @@ -17,19 +17,20 @@ import ( ) type tracingMockStore struct { - heightFn func(ctx context.Context) (uint64, error) - getBlockDataFn func(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) 
- getBlockByHashFn func(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) - getSignatureFn func(ctx context.Context, height uint64) (*types.Signature, error) - getSignatureByHash func(ctx context.Context, hash []byte) (*types.Signature, error) - getHeaderFn func(ctx context.Context, height uint64) (*types.SignedHeader, error) - getStateFn func(ctx context.Context) (types.State, error) - getStateAtHeightFn func(ctx context.Context, height uint64) (types.State, error) - getMetadataFn func(ctx context.Context, key string) ([]byte, error) - setMetadataFn func(ctx context.Context, key string, value []byte) error - deleteMetadataFn func(ctx context.Context, key string) error - rollbackFn func(ctx context.Context, height uint64, aggregator bool) error - newBatchFn func(ctx context.Context) (Batch, error) + heightFn func(ctx context.Context) (uint64, error) + getBlockDataFn func(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) + getBlockByHashFn func(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) + getSignatureFn func(ctx context.Context, height uint64) (*types.Signature, error) + getSignatureByHash func(ctx context.Context, hash []byte) (*types.Signature, error) + getHeaderFn func(ctx context.Context, height uint64) (*types.SignedHeader, error) + getStateFn func(ctx context.Context) (types.State, error) + getStateAtHeightFn func(ctx context.Context, height uint64) (types.State, error) + getMetadataFn func(ctx context.Context, key string) ([]byte, error) + getMetadataByPrefixFn func(ctx context.Context, prefix string) ([]MetadataEntry, error) + setMetadataFn func(ctx context.Context, key string, value []byte) error + deleteMetadataFn func(ctx context.Context, key string) error + rollbackFn func(ctx context.Context, height uint64, aggregator bool) error + newBatchFn func(ctx context.Context) (Batch, error) } func (m *tracingMockStore) Height(ctx context.Context) (uint64, error) { @@ -95,6 
+96,13 @@ func (m *tracingMockStore) GetMetadata(ctx context.Context, key string) ([]byte, return nil, nil } +func (m *tracingMockStore) GetMetadataByPrefix(ctx context.Context, prefix string) ([]MetadataEntry, error) { + if m.getMetadataByPrefixFn != nil { + return m.getMetadataByPrefixFn(ctx, prefix) + } + return nil, nil +} + func (m *tracingMockStore) SetMetadata(ctx context.Context, key string, value []byte) error { if m.setMetadataFn != nil { return m.setMetadataFn(ctx, key, value) diff --git a/pkg/store/types.go b/pkg/store/types.go index fa1ecdc92..13b62e2f1 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -33,20 +33,33 @@ type Batch interface { type Store interface { Rollback Reader + Metadata + // Close safely closes underlying data storage, to ensure that data is actually saved. + Close() error + + // NewBatch creates a new batch for atomic operations. + NewBatch(ctx context.Context) (Batch, error) +} + +// MetadataEntry represents a key-value pair from metadata storage. +type MetadataEntry struct { + Key string + Value []byte +} + +type Metadata interface { // SetMetadata saves arbitrary value in the store. // // This method enables evolve to safely persist any information. SetMetadata(ctx context.Context, key string, value []byte) error + // GetMetadataByPrefix returns all metadata entries whose keys have the given prefix. + // This is more efficient than iterating through known keys when the set of keys is unknown. + GetMetadataByPrefix(ctx context.Context, prefix string) ([]MetadataEntry, error) + // DeleteMetadata removes a metadata key from the store. DeleteMetadata(ctx context.Context, key string) error - - // Close safely closes underlying data storage, to ensure that data is actually saved. - Close() error - - // NewBatch creates a new batch for atomic operations. 
- NewBatch(ctx context.Context) (Batch, error) } type Reader interface { diff --git a/test/mocks/store.go b/test/mocks/store.go index 79b3ac6f5..353fa0a1b 100644 --- a/test/mocks/store.go +++ b/test/mocks/store.go @@ -428,6 +428,74 @@ func (_c *MockStore_GetMetadata_Call) RunAndReturn(run func(ctx context.Context, return _c } +// GetMetadataByPrefix provides a mock function for the type MockStore +func (_mock *MockStore) GetMetadataByPrefix(ctx context.Context, prefix string) ([]store.MetadataEntry, error) { + ret := _mock.Called(ctx, prefix) + + if len(ret) == 0 { + panic("no return value specified for GetMetadataByPrefix") + } + + var r0 []store.MetadataEntry + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, string) ([]store.MetadataEntry, error)); ok { + return returnFunc(ctx, prefix) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, string) []store.MetadataEntry); ok { + r0 = returnFunc(ctx, prefix) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]store.MetadataEntry) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = returnFunc(ctx, prefix) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockStore_GetMetadataByPrefix_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMetadataByPrefix' +type MockStore_GetMetadataByPrefix_Call struct { + *mock.Call +} + +// GetMetadataByPrefix is a helper method to define mock.On call +// - ctx context.Context +// - prefix string +func (_e *MockStore_Expecter) GetMetadataByPrefix(ctx interface{}, prefix interface{}) *MockStore_GetMetadataByPrefix_Call { + return &MockStore_GetMetadataByPrefix_Call{Call: _e.mock.On("GetMetadataByPrefix", ctx, prefix)} +} + +func (_c *MockStore_GetMetadataByPrefix_Call) Run(run func(ctx context.Context, prefix string)) *MockStore_GetMetadataByPrefix_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = 
args[0].(context.Context) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockStore_GetMetadataByPrefix_Call) Return(metadataEntrys []store.MetadataEntry, err error) *MockStore_GetMetadataByPrefix_Call { + _c.Call.Return(metadataEntrys, err) + return _c +} + +func (_c *MockStore_GetMetadataByPrefix_Call) RunAndReturn(run func(ctx context.Context, prefix string) ([]store.MetadataEntry, error)) *MockStore_GetMetadataByPrefix_Call { + _c.Call.Return(run) + return _c +} + // GetSignature provides a mock function for the type MockStore func (_mock *MockStore) GetSignature(ctx context.Context, height uint64) (*types.Signature, error) { ret := _mock.Called(ctx, height)