Implement a LMDB stream state store #24
base: main
aac3a46 to ac94124
ac94124 to cffbdff
The _is_duplicate_batch check was incorrectly skipping batches based only on comparing ranges to the previous batch. This is wrong because:
1. Proper duplicate detection is handled by state_store.is_processed() in load_stream_continuous, which checks against ALL processed batches
2. The range-only comparison does not check hashes, so batches with same ranges but different data (after reorg) would be incorrectly skipped
3. After crash/restart, prev_ranges_by_network is empty anyway
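Point 2 can be illustrated with a minimal sketch. The `Batch` and `ProcessedSet` types below are hypothetical stand-ins, not the project's classes; they only model the difference between comparing ranges against the previous batch and checking a hash-aware record of everything processed.

```python
from dataclasses import dataclass
from typing import Optional, Set


@dataclass(frozen=True)
class Batch:
    """Hypothetical stand-in for a streamed batch; not the project's real type."""
    network: str
    start: int
    end: int
    end_hash: str


def is_duplicate_by_range(prev: Optional[Batch], batch: Batch) -> bool:
    """Range-only comparison against the previous batch (the behaviour being removed)."""
    if prev is None:
        return False
    return (prev.network, prev.start, prev.end) == (batch.network, batch.start, batch.end)


class ProcessedSet:
    """Toy state store that remembers every processed batch, hash included."""

    def __init__(self) -> None:
        self._seen: Set[Batch] = set()

    def mark_processed(self, batch: Batch) -> None:
        self._seen.add(batch)

    def is_processed(self, batch: Batch) -> bool:
        return batch in self._seen


original = Batch("mainnet", 100, 110, end_hash="0xaaa")
after_reorg = Batch("mainnet", 100, 110, end_hash="0xbbb")  # same range, different data

store = ProcessedSet()
store.mark_processed(original)

range_only_skips = is_duplicate_by_range(original, after_reorg)  # skips the reorged batch
hash_aware_skips = store.is_processed(after_reorg)  # does not skip it
```

In this toy model the range-only check would drop the post-reorg batch, while the hash-aware lookup lets it through.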
cffbdff to 65a73f5
should_update = True

if should_update:
    meta_value = self._serialize_metadata(batch.end_block, batch.end_hash, batch.start_parent_hash)
No start block?
class LMDBStreamStateStore(StreamStateStore):
    env: lmdb.Environment
Not sure if this matters for systems that provide users with documentation using these comments, but I think standard style is to put the class attributes after the comment block.
    True only if ALL batches are already processed
"""
if not batch_ids:
    return True
In InMemoryStreamStateStore we return False when an empty batch_ids list is provided:
amp-python/src/amp/streaming/state.py, lines 248 to 249 in 8ce25f1
if not batch_ids:
    return False
It doesn't look like we check for an empty batch_ids before calling this, so this code path never gets hit; however, it would be good to align the implementations for consistency.
Do you have a preference, @incrypto32?
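For context on how the two stores could drift apart: Python's `all()` over an empty iterable is `True`, so "all batches processed" is vacuously true unless the empty case is handled explicitly. A minimal sketch, using a hypothetical `all_processed` helper (not code from this PR) that makes the empty-input answer an explicit choice:

```python
from typing import Iterable, Set


def all_processed(batch_ids: Iterable[str], processed: Set[str],
                  empty_result: bool = False) -> bool:
    """Return True only if every id is in `processed`.

    `empty_result` spells out the answer for empty input; the default of False
    mirrors the InMemoryStreamStateStore lines quoted above.
    Hypothetical helper for illustration only.
    """
    ids = list(batch_ids)
    if not ids:
        return empty_result
    return all(batch_id in processed for batch_id in ids)


processed = {"a", "b"}

# Python's built-in answer for empty input is True (vacuous truth).
vacuous = all(batch_id in processed for batch_id in [])

empty_default = all_processed([], processed)
```

Whichever value is chosen, sharing one explicit guard between both stores keeps them from disagreeing silently.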
ranges.append(
    BlockRange(
        network=network,
        start=meta_data['end_block'],
So this is an invalidation range (open ended) hence the start=end?
Let's leave a comment explaining why above, so this doesn't trip us up in the future. Maybe linking to crash recovery explanation or code.
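A sketch of what such a comment might look like, assuming the reviewer's reading is correct (the range is a single-block marker at the last processed block, not the span of the batch). The `BlockRange` dataclass and `resume_range` function here are simplified stand-ins whose field names follow the diff; they are not the project's definitions.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class BlockRange:
    """Simplified stand-in for the project's BlockRange; field names follow the diff."""
    network: str
    start: int
    end: int
    hash: Optional[str] = None
    prev_hash: Optional[str] = None


def resume_range(network: str, meta_data: dict) -> BlockRange:
    """Hypothetical helper that rebuilds a range from stored metadata."""
    # start == end on purpose: only the end of the last processed batch is stored,
    # so the rebuilt range marks a single block (the resume / invalidation point)
    # rather than the full span of the batch. This reading is an assumption.
    return BlockRange(
        network=network,
        start=meta_data["end_block"],
        end=meta_data["end_block"],
        hash=meta_data.get("end_hash"),
        prev_hash=meta_data.get("start_parent_hash"),
    )


marker = resume_range("mainnet", {"end_block": 110, "end_hash": "0xaaa"})
```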
        start=meta_data['end_block'],
        end=meta_data['end_block'],
        hash=meta_data.get('end_hash'),
        prev_hash=meta_data.get('start_parent_hash'),
This start_parent_hash value is (bear with me because this is a mouthful) the hash of the parent block of the start block of the last range that was consumed?
start_parent -> start_block --..--> end_block/end_hash/inval_range_start_block
This hash is disconnected from the other values in the invalidation range. Did you intend to use this start_parent_hash for something? If not, I'd suggest we don't set prev_hash at all.
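The mismatch can be made concrete with a toy chain in which block n has hash `h<n>`. This sketch assumes that prev_hash is meant to be the hash of the parent of the range's own start block; that assumption, the `block_hash` helper, and the block numbers are illustrative and not taken from the PR.

```python
def block_hash(number: int) -> str:
    """Toy chain: block n has hash 'h<n>' and its parent is block n - 1."""
    return f"h{number}"


# Metadata stored after consuming a batch that covered blocks 100..110.
meta_data = {
    "end_block": 110,
    "end_hash": block_hash(110),
    "start_parent_hash": block_hash(99),  # parent of the batch's start block (100)
}

# Range rebuilt as in the diff: start == end == end_block.
rebuilt_start = meta_data["end_block"]
rebuilt_prev_hash = meta_data["start_parent_hash"]

# Assumption: prev_hash should be the hash of the parent of the range's start block.
expected_prev_hash = block_hash(rebuilt_start - 1)
prev_hash_is_consistent = rebuilt_prev_hash == expected_prev_hash
```

Under that assumption the rebuilt range starts at block 110 but carries the parent hash of block 100, which is the disconnect described above.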