From 9bd5f0ffe350f709619c8536c3e5e5187bb4412d Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 18 Mar 2026 19:06:11 -0300 Subject: [PATCH 1/4] Add per-table byte size metrics (lean_table_bytes) Track the byte size (key + value) of each storage table as a live Prometheus gauge. This provides visibility into storage growth and is a prerequisite for improving pruning decisions. The approach uses one AtomicU64 per table in the Store struct: - On startup, a full scan seeds the counters from existing data. - On insert, the SSZ-encoded key+value size is added. - On prune/delete, the size of removed entries is subtracted. Exposed as lean_table_bytes{table="states"}, lean_table_bytes{table="block_headers"}, etc. --- crates/blockchain/src/lib.rs | 5 +- crates/blockchain/src/metrics.rs | 15 +++ crates/storage/src/api/mod.rs | 2 +- crates/storage/src/api/tables.rs | 35 +++++- crates/storage/src/lib.rs | 2 +- crates/storage/src/store.rs | 198 ++++++++++++++++++++++++++----- 6 files changed, 221 insertions(+), 36 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 56a10d49..329b8501 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -3,7 +3,7 @@ use std::time::{Duration, SystemTime}; use ethlambda_network_api::{BlockChainToP2PRef, InitP2P}; use ethlambda_state_transition::is_proposer; -use ethlambda_storage::Store; +use ethlambda_storage::{ALL_TABLES, Store}; use ethlambda_types::{ ShortRoot, attestation::{Attestation, AttestationData, SignedAggregatedAttestation, SignedAttestation}, @@ -285,6 +285,9 @@ impl BlockChainServer { metrics::update_latest_justified_slot(self.store.latest_justified().slot); metrics::update_latest_finalized_slot(self.store.latest_finalized().slot); metrics::update_validators_count(self.key_manager.validator_ids().len() as u64); + for table in ALL_TABLES { + metrics::update_table_bytes(table.name(), self.store.table_bytes(table)); + } Ok(()) } diff --git 
a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index 88c5817f..33c53ce3 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -308,6 +308,21 @@ pub fn time_committee_signatures_aggregation() -> TimingGuard { TimingGuard::new(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS) } +/// Update a table byte size gauge. +pub fn update_table_bytes(table_name: &str, bytes: u64) { + static LEAN_TABLE_BYTES: std::sync::LazyLock = std::sync::LazyLock::new(|| { + register_int_gauge_vec!( + "lean_table_bytes", + "Byte size of a storage table (key + value bytes)", + &["table"] + ) + .unwrap() + }); + LEAN_TABLE_BYTES + .with_label_values(&[table_name]) + .set(bytes as i64); +} + /// Update the gossip signatures gauge. pub fn update_gossip_signatures(count: usize) { static LEAN_GOSSIP_SIGNATURES: std::sync::LazyLock = std::sync::LazyLock::new(|| { diff --git a/crates/storage/src/api/mod.rs b/crates/storage/src/api/mod.rs index 00755269..555d2288 100644 --- a/crates/storage/src/api/mod.rs +++ b/crates/storage/src/api/mod.rs @@ -16,5 +16,5 @@ mod tables; mod traits; -pub use tables::{ALL_TABLES, Table}; +pub use tables::{ALL_TABLES, TABLE_COUNT, Table}; pub use traits::{Error, PrefixResult, StorageBackend, StorageReadView, StorageWriteBatch}; diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs index 7f7d7a3c..1efc5f35 100644 --- a/crates/storage/src/api/tables.rs +++ b/crates/storage/src/api/tables.rs @@ -26,8 +26,11 @@ pub enum Table { LiveChain, } +/// Number of tables. +pub const TABLE_COUNT: usize = 8; + /// All table variants. -pub const ALL_TABLES: [Table; 8] = [ +pub const ALL_TABLES: [Table; TABLE_COUNT] = [ Table::BlockHeaders, Table::BlockBodies, Table::BlockSignatures, @@ -37,3 +40,33 @@ pub const ALL_TABLES: [Table; 8] = [ Table::Metadata, Table::LiveChain, ]; + +impl Table { + /// Index into a fixed-size array. Matches the order in [`ALL_TABLES`]. 
+ pub fn index(self) -> usize { + match self { + Table::BlockHeaders => 0, + Table::BlockBodies => 1, + Table::BlockSignatures => 2, + Table::States => 3, + Table::GossipSignatures => 4, + Table::AttestationDataByRoot => 5, + Table::Metadata => 6, + Table::LiveChain => 7, + } + } + + /// Human-readable name for metrics labels. + pub fn name(self) -> &'static str { + match self { + Table::BlockHeaders => "block_headers", + Table::BlockBodies => "block_bodies", + Table::BlockSignatures => "block_signatures", + Table::States => "states", + Table::GossipSignatures => "gossip_signatures", + Table::AttestationDataByRoot => "attestation_data_by_root", + Table::Metadata => "metadata", + Table::LiveChain => "live_chain", + } + } +} diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 5cc90805..01ac010f 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -3,6 +3,6 @@ pub mod backend; mod store; mod types; -pub use api::{StorageBackend, StorageReadView, StorageWriteBatch, Table}; +pub use api::{ALL_TABLES, StorageBackend, StorageReadView, StorageWriteBatch, Table}; pub use store::{ForkCheckpoints, SignatureKey, Store}; pub use types::{StoredAggregatedPayload, StoredSignature}; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 4dd57538..05a54323 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -1,5 +1,5 @@ use std::collections::{HashMap, HashSet, VecDeque}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, LazyLock, Mutex}; /// The tree hash root of an empty block body. @@ -8,7 +8,7 @@ use std::sync::{Arc, LazyLock, Mutex}; /// allowing us to skip storing empty bodies and reconstruct them on read. 
static EMPTY_BODY_ROOT: LazyLock = LazyLock::new(|| BlockBody::default().tree_hash_root()); -use crate::api::{StorageBackend, StorageWriteBatch, Table}; +use crate::api::{ALL_TABLES, StorageBackend, StorageWriteBatch, TABLE_COUNT, Table}; use crate::types::{StoredAggregatedPayload, StoredSignature}; use ethlambda_types::{ @@ -213,6 +213,8 @@ pub struct Store { new_payloads: Arc>, known_payloads: Arc>, gossip_signatures_count: Arc, + /// Byte size of each table (key + value bytes). One entry per [`Table`] variant. + table_bytes: Arc<[AtomicU64; TABLE_COUNT]>, } impl Store { @@ -349,11 +351,13 @@ impl Store { info!(%anchor_state_root, %anchor_block_root, "Initialized store"); let initial_gossip_count = Self::count_gossip_signatures(&*backend); + let table_bytes = Self::scan_table_bytes(&*backend); Self { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), gossip_signatures_count: Arc::new(AtomicUsize::new(initial_gossip_count)), + table_bytes: Arc::new(table_bytes), } } @@ -370,6 +374,22 @@ impl Store { .count() } + /// Scan all tables and sum key + value bytes. Used once at startup. 
+ fn scan_table_bytes(backend: &dyn StorageBackend) -> [AtomicU64; TABLE_COUNT] { + let view = backend.begin_read().expect("read view"); + let result: [AtomicU64; TABLE_COUNT] = std::array::from_fn(|_| AtomicU64::new(0)); + for table in ALL_TABLES { + let bytes: u64 = view + .prefix_iterator(table, &[]) + .expect("iterator") + .filter_map(|r| r.ok()) + .map(|(k, v)| (k.len() + v.len()) as u64) + .sum(); + result[table.index()].store(bytes, Ordering::Relaxed); + } + result + } + // ============ Metadata Helpers ============ fn get_metadata(&self, key: &[u8]) -> T { @@ -550,6 +570,7 @@ impl Store { // Collect keys to delete - stop once we hit finalized_slot // Keys are sorted by slot (big-endian encoding) so we can stop early + let mut deleted_bytes = 0u64; let keys_to_delete: Vec<_> = view .prefix_iterator(Table::LiveChain, &[]) .expect("iterator") @@ -558,7 +579,10 @@ impl Store { let (slot, _) = decode_live_chain_key(k); slot < finalized_slot }) - .map(|(k, _)| k.to_vec()) + .map(|(k, v)| { + deleted_bytes += (k.len() + v.len()) as u64; + k.to_vec() + }) .collect(); drop(view); @@ -572,6 +596,7 @@ impl Store { .delete_batch(Table::LiveChain, keys_to_delete) .expect("delete non-finalized chain entries"); batch.commit().expect("commit"); + self.sub_table_bytes(Table::LiveChain, deleted_bytes); count } @@ -579,7 +604,7 @@ impl Store { /// /// Returns the number of signatures pruned. pub fn prune_gossip_signatures(&mut self, finalized_slot: u64) -> usize { - let pruned = self.prune_by_slot(Table::GossipSignatures, finalized_slot, |bytes| { + let (pruned, _) = self.prune_by_slot(Table::GossipSignatures, finalized_slot, |bytes| { StoredSignature::from_ssz_bytes(bytes).ok().map(|s| s.slot) }); self.gossip_signatures_count @@ -594,9 +619,11 @@ impl Store { /// /// Returns the number of entries pruned. 
pub fn prune_attestation_data_by_root(&mut self, finalized_slot: u64) -> usize { - self.prune_by_slot(Table::AttestationDataByRoot, finalized_slot, |bytes| { - AttestationData::from_ssz_bytes(bytes).ok().map(|d| d.slot) - }) + let (pruned, _) = + self.prune_by_slot(Table::AttestationDataByRoot, finalized_slot, |bytes| { + AttestationData::from_ssz_bytes(bytes).ok().map(|d| d.slot) + }); + pruned } /// Prune old states beyond the retention window. @@ -640,11 +667,22 @@ impl Store { let count = keys_to_delete.len(); if count > 0 { + // Sum byte sizes of entries being deleted + let view = self.backend.begin_read().expect("read view"); + let mut deleted_bytes = 0u64; + for key in &keys_to_delete { + if let Some(value) = view.get(Table::States, key).expect("get") { + deleted_bytes += (key.len() + value.len()) as u64; + } + } + drop(view); + let mut batch = self.backend.begin_write().expect("write batch"); batch .delete_batch(Table::States, keys_to_delete) .expect("delete old states"); batch.commit().expect("commit"); + self.sub_table_bytes(Table::States, deleted_bytes); } count } @@ -689,6 +727,24 @@ impl Store { let count = keys_to_delete.len(); if count > 0 { + // Sum byte sizes across all three block tables + let view = self.backend.begin_read().expect("read view"); + let mut header_bytes = 0u64; + let mut body_bytes = 0u64; + let mut sig_bytes = 0u64; + for key in &keys_to_delete { + if let Some(v) = view.get(Table::BlockHeaders, key).expect("get") { + header_bytes += (key.len() + v.len()) as u64; + } + if let Some(v) = view.get(Table::BlockBodies, key).expect("get") { + body_bytes += (key.len() + v.len()) as u64; + } + if let Some(v) = view.get(Table::BlockSignatures, key).expect("get") { + sig_bytes += (key.len() + v.len()) as u64; + } + } + drop(view); + let mut batch = self.backend.begin_write().expect("write batch"); batch .delete_batch(Table::BlockHeaders, keys_to_delete.clone()) @@ -700,6 +756,9 @@ impl Store { .delete_batch(Table::BlockSignatures, 
keys_to_delete) .expect("delete old block signatures"); batch.commit().expect("commit"); + self.sub_table_bytes(Table::BlockHeaders, header_bytes); + self.sub_table_bytes(Table::BlockBodies, body_bytes); + self.sub_table_bytes(Table::BlockSignatures, sig_bytes); } count } @@ -725,8 +784,11 @@ impl Store { /// the same keys are overwritten (idempotent) and a `LiveChain` entry is added. pub fn insert_pending_block(&mut self, root: H256, signed_block: SignedBlockWithAttestation) { let mut batch = self.backend.begin_write().expect("write batch"); - write_signed_block(batch.as_mut(), &root, signed_block); + let (_, written) = write_signed_block(batch.as_mut(), &root, signed_block); batch.commit().expect("commit"); + self.add_table_bytes(Table::BlockHeaders, written.headers); + self.add_table_bytes(Table::BlockBodies, written.bodies); + self.add_table_bytes(Table::BlockSignatures, written.signatures); } /// Insert a signed block, storing the block and signatures separately. @@ -738,17 +800,21 @@ impl Store { /// Takes ownership to avoid cloning large signature data. 
pub fn insert_signed_block(&mut self, root: H256, signed_block: SignedBlockWithAttestation) { let mut batch = self.backend.begin_write().expect("write batch"); - let block = write_signed_block(batch.as_mut(), &root, signed_block); + let (block, written) = write_signed_block(batch.as_mut(), &root, signed_block); - let index_entries = vec![( - encode_live_chain_key(block.slot, &root), - block.parent_root.as_ssz_bytes(), - )]; + let index_key = encode_live_chain_key(block.slot, &root); + let index_value = block.parent_root.as_ssz_bytes(); + let index_bytes = (index_key.len() + index_value.len()) as u64; + let index_entries = vec![(index_key, index_value)]; batch .put_batch(Table::LiveChain, index_entries) .expect("put non-finalized chain index"); batch.commit().expect("commit"); + self.add_table_bytes(Table::BlockHeaders, written.headers); + self.add_table_bytes(Table::BlockBodies, written.bodies); + self.add_table_bytes(Table::BlockSignatures, written.signatures); + self.add_table_bytes(Table::LiveChain, index_bytes); } /// Get a signed block by combining header, body, and signatures. @@ -799,10 +865,14 @@ impl Store { /// Stores a state indexed by block root. pub fn insert_state(&mut self, root: H256, state: State) { + let key = root.as_ssz_bytes(); + let value = state.as_ssz_bytes(); + let bytes = (key.len() + value.len()) as u64; let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(root.as_ssz_bytes(), state.as_ssz_bytes())]; + let entries = vec![(key, value)]; batch.put_batch(Table::States, entries).expect("put state"); batch.commit().expect("commit"); + self.add_table_bytes(Table::States, bytes); } // ============ Attestation Data By Root ============ @@ -812,12 +882,16 @@ impl Store { /// Stores attestation data indexed by its tree hash root. 
pub fn insert_attestation_data_by_root(&mut self, root: H256, data: AttestationData) { + let key = root.as_ssz_bytes(); + let value = data.as_ssz_bytes(); + let bytes = (key.len() + value.len()) as u64; let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(root.as_ssz_bytes(), data.as_ssz_bytes())]; + let entries = vec![(key, value)]; batch .put_batch(Table::AttestationDataByRoot, entries) .expect("put attestation data"); batch.commit().expect("commit"); + self.add_table_bytes(Table::AttestationDataByRoot, bytes); } /// Batch-insert multiple attestation data entries in a single commit. @@ -826,14 +900,21 @@ impl Store { return; } let mut batch = self.backend.begin_write().expect("write batch"); - let ssz_entries = entries + let mut total_bytes = 0u64; + let ssz_entries: Vec<_> = entries .into_iter() - .map(|(root, data)| (root.as_ssz_bytes(), data.as_ssz_bytes())) + .map(|(root, data)| { + let key = root.as_ssz_bytes(); + let value = data.as_ssz_bytes(); + total_bytes += (key.len() + value.len()) as u64; + (key, value) + }) .collect(); batch .put_batch(Table::AttestationDataByRoot, ssz_entries) .expect("put attestation data batch"); batch.commit().expect("commit"); + self.add_table_bytes(Table::AttestationDataByRoot, total_bytes); } /// Returns attestation data for the given root hash. @@ -960,14 +1041,16 @@ impl Store { /// Prune entries from a table where the slot (extracted via `get_slot`) is <= `finalized_slot`. /// Returns the number of entries pruned. + /// Returns (count_pruned, bytes_pruned). 
fn prune_by_slot( &mut self, table: Table, finalized_slot: u64, get_slot: impl Fn(&[u8]) -> Option, - ) -> usize { + ) -> (usize, u64) { let view = self.backend.begin_read().expect("read view"); let mut to_delete = vec![]; + let mut deleted_bytes = 0u64; for (key_bytes, value_bytes) in view .prefix_iterator(table, &[]) @@ -977,6 +1060,7 @@ impl Store { if let Some(slot) = get_slot(&value_bytes) && slot <= finalized_slot { + deleted_bytes += (key_bytes.len() + value_bytes.len()) as u64; to_delete.push(key_bytes.to_vec()); } } @@ -987,8 +1071,9 @@ impl Store { let mut batch = self.backend.begin_write().expect("write batch"); batch.delete_batch(table, to_delete).expect("delete"); batch.commit().expect("commit"); + self.sub_table_bytes(table, deleted_bytes); } - count + (count, deleted_bytes) } /// Promotes all new aggregated payloads to known, making them active in fork choice. @@ -1014,6 +1099,19 @@ impl Store { self.gossip_signatures_count.load(Ordering::Relaxed) } + /// Returns the byte size of a table. + pub fn table_bytes(&self, table: Table) -> u64 { + self.table_bytes[table.index()].load(Ordering::Relaxed) + } + + fn add_table_bytes(&self, table: Table, bytes: u64) { + self.table_bytes[table.index()].fetch_add(bytes, Ordering::Relaxed); + } + + fn sub_table_bytes(&self, table: Table, bytes: u64) { + self.table_bytes[table.index()].fetch_sub(bytes, Ordering::Relaxed); + } + /// Delete specific gossip signatures by key. 
pub fn delete_gossip_signatures(&mut self, keys: &[SignatureKey]) { if keys.is_empty() { @@ -1021,6 +1119,16 @@ impl Store { } let count = keys.len(); let encoded_keys: Vec<_> = keys.iter().map(encode_signature_key).collect(); + + let view = self.backend.begin_read().expect("read view"); + let mut deleted_bytes = 0u64; + for key in &encoded_keys { + if let Some(v) = view.get(Table::GossipSignatures, key).expect("get") { + deleted_bytes += (key.len() + v.len()) as u64; + } + } + drop(view); + let mut batch = self.backend.begin_write().expect("write batch"); batch .delete_batch(Table::GossipSignatures, encoded_keys) @@ -1031,6 +1139,7 @@ impl Store { Some(current.saturating_sub(count)) }) .unwrap(); + self.sub_table_bytes(Table::GossipSignatures, deleted_bytes); } // ============ Gossip Signatures ============ @@ -1069,17 +1178,19 @@ impl Store { let encoded_key = encode_signature_key(&key); // Check if key already exists to avoid inflating the counter on upsert - let is_new = self + let old_value_len = self .backend .begin_read() .expect("read view") .get(Table::GossipSignatures, &encoded_key) - .expect("get") - .is_none(); + .expect("get"); + let is_new = old_value_len.is_none(); let stored = StoredSignature::new(slot, signature); + let value = stored.as_ssz_bytes(); + let new_bytes = (encoded_key.len() + value.len()) as u64; let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(encoded_key, stored.as_ssz_bytes())]; + let entries = vec![(encoded_key, value)]; batch .put_batch(Table::GossipSignatures, entries) .expect("put signature"); @@ -1087,6 +1198,7 @@ impl Store { if is_new { self.gossip_signatures_count.fetch_add(1, Ordering::Relaxed); + self.add_table_bytes(Table::GossipSignatures, new_bytes); } } @@ -1113,15 +1225,21 @@ impl Store { } } +/// Byte sizes written per block-related table. 
+struct BlockWriteBytes { + headers: u64, + bodies: u64, + signatures: u64, +} + /// Write block header, body, and signatures onto an existing batch. /// -/// Returns the deserialized [`Block`] so callers can access fields like -/// `slot` and `parent_root` without re-deserializing. +/// Returns the deserialized [`Block`] and per-table byte sizes written. fn write_signed_block( batch: &mut dyn StorageWriteBatch, root: &H256, signed_block: SignedBlockWithAttestation, -) -> Block { +) -> (Block, BlockWriteBytes) { let SignedBlockWithAttestation { message: BlockWithAttestation { @@ -1139,25 +1257,39 @@ fn write_signed_block( let header = block.header(); let root_bytes = root.as_ssz_bytes(); - let header_entries = vec![(root_bytes.clone(), header.as_ssz_bytes())]; + let header_value = header.as_ssz_bytes(); + let header_bytes = (root_bytes.len() + header_value.len()) as u64; + let header_entries = vec![(root_bytes.clone(), header_value)]; batch .put_batch(Table::BlockHeaders, header_entries) .expect("put block header"); // Skip storing empty bodies - they can be reconstructed from the header's body_root - if header.body_root != *EMPTY_BODY_ROOT { - let body_entries = vec![(root_bytes.clone(), block.body.as_ssz_bytes())]; + let body_bytes = if header.body_root != *EMPTY_BODY_ROOT { + let body_value = block.body.as_ssz_bytes(); + let bytes = (root_bytes.len() + body_value.len()) as u64; + let body_entries = vec![(root_bytes.clone(), body_value)]; batch .put_batch(Table::BlockBodies, body_entries) .expect("put block body"); - } + bytes + } else { + 0 + }; - let sig_entries = vec![(root_bytes, signatures.as_ssz_bytes())]; + let sig_value = signatures.as_ssz_bytes(); + let sig_bytes = (root_bytes.len() + sig_value.len()) as u64; + let sig_entries = vec![(root_bytes, sig_value)]; batch .put_batch(Table::BlockSignatures, sig_entries) .expect("put block signatures"); - block + let written = BlockWriteBytes { + headers: header_bytes, + bodies: body_bytes, + signatures: 
sig_bytes, + }; + (block, written) } #[cfg(test)] @@ -1234,6 +1366,7 @@ mod tests { new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), gossip_signatures_count: Arc::new(AtomicUsize::new(0)), + table_bytes: Arc::new(std::array::from_fn(|_| AtomicU64::new(0))), } } @@ -1245,6 +1378,7 @@ mod tests { new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), gossip_signatures_count: Arc::new(AtomicUsize::new(0)), + table_bytes: Arc::new(std::array::from_fn(|_| AtomicU64::new(0))), } } } From 459f2e1e50ab17c4a727cd55a55e3e48de1a8bd8 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 18 Mar 2026 19:16:04 -0300 Subject: [PATCH 2/4] Simplify table byte tracking after code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract sum_entry_bytes() helper to deduplicate byte-summing in prune_old_states, prune_old_blocks, and delete_gossip_signatures - Merge count_gossip_signatures into scan_tables to avoid scanning the GossipSignatures table twice at startup - Remove dead (usize, u64) return from prune_by_slot — callers never used the bytes value since sub_table_bytes is called inside - Fix double-counting bug: insert_pending_block no longer tracks bytes since the same keys are overwritten by insert_signed_block - Revert unnecessary old_value_len rename in insert_gossip_signature --- crates/storage/src/store.rs | 115 ++++++++++++++---------------------- 1 file changed, 43 insertions(+), 72 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 05a54323..5bbd0405 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -350,8 +350,7 @@ impl Store { info!(%anchor_state_root, %anchor_block_root, "Initialized store"); - let initial_gossip_count = 
Self::count_gossip_signatures(&*backend); - let table_bytes = Self::scan_table_bytes(&*backend); + let (table_bytes, initial_gossip_count) = Self::scan_tables(&*backend); Self { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), @@ -361,33 +360,26 @@ impl Store { } } - /// Count existing gossip signatures in the database. - /// - /// Used once at startup to seed the in-memory counter. - fn count_gossip_signatures(backend: &dyn StorageBackend) -> usize { - backend - .begin_read() - .expect("read view") - .prefix_iterator(Table::GossipSignatures, &[]) - .expect("iterator") - .filter_map(|r| r.ok()) - .count() - } - - /// Scan all tables and sum key + value bytes. Used once at startup. - fn scan_table_bytes(backend: &dyn StorageBackend) -> [AtomicU64; TABLE_COUNT] { + /// Scan all tables at startup: compute byte sizes and gossip signature count in one pass. + fn scan_tables(backend: &dyn StorageBackend) -> ([AtomicU64; TABLE_COUNT], usize) { let view = backend.begin_read().expect("read view"); let result: [AtomicU64; TABLE_COUNT] = std::array::from_fn(|_| AtomicU64::new(0)); + let mut gossip_count = 0usize; for table in ALL_TABLES { - let bytes: u64 = view + let mut table_total = 0u64; + for (k, v) in view .prefix_iterator(table, &[]) .expect("iterator") .filter_map(|r| r.ok()) - .map(|(k, v)| (k.len() + v.len()) as u64) - .sum(); - result[table.index()].store(bytes, Ordering::Relaxed); + { + table_total += (k.len() + v.len()) as u64; + if table == Table::GossipSignatures { + gossip_count += 1; + } + } + result[table.index()].store(table_total, Ordering::Relaxed); } - result + (result, gossip_count) } // ============ Metadata Helpers ============ @@ -604,7 +596,7 @@ impl Store { /// /// Returns the number of signatures pruned. 
pub fn prune_gossip_signatures(&mut self, finalized_slot: u64) -> usize { - let (pruned, _) = self.prune_by_slot(Table::GossipSignatures, finalized_slot, |bytes| { + let pruned = self.prune_by_slot(Table::GossipSignatures, finalized_slot, |bytes| { StoredSignature::from_ssz_bytes(bytes).ok().map(|s| s.slot) }); self.gossip_signatures_count @@ -619,11 +611,9 @@ impl Store { /// /// Returns the number of entries pruned. pub fn prune_attestation_data_by_root(&mut self, finalized_slot: u64) -> usize { - let (pruned, _) = - self.prune_by_slot(Table::AttestationDataByRoot, finalized_slot, |bytes| { - AttestationData::from_ssz_bytes(bytes).ok().map(|d| d.slot) - }); - pruned + self.prune_by_slot(Table::AttestationDataByRoot, finalized_slot, |bytes| { + AttestationData::from_ssz_bytes(bytes).ok().map(|d| d.slot) + }) } /// Prune old states beyond the retention window. @@ -667,15 +657,7 @@ impl Store { let count = keys_to_delete.len(); if count > 0 { - // Sum byte sizes of entries being deleted - let view = self.backend.begin_read().expect("read view"); - let mut deleted_bytes = 0u64; - for key in &keys_to_delete { - if let Some(value) = view.get(Table::States, key).expect("get") { - deleted_bytes += (key.len() + value.len()) as u64; - } - } - drop(view); + let deleted_bytes = self.sum_entry_bytes(Table::States, &keys_to_delete); let mut batch = self.backend.begin_write().expect("write batch"); batch @@ -727,23 +709,9 @@ impl Store { let count = keys_to_delete.len(); if count > 0 { - // Sum byte sizes across all three block tables - let view = self.backend.begin_read().expect("read view"); - let mut header_bytes = 0u64; - let mut body_bytes = 0u64; - let mut sig_bytes = 0u64; - for key in &keys_to_delete { - if let Some(v) = view.get(Table::BlockHeaders, key).expect("get") { - header_bytes += (key.len() + v.len()) as u64; - } - if let Some(v) = view.get(Table::BlockBodies, key).expect("get") { - body_bytes += (key.len() + v.len()) as u64; - } - if let Some(v) = 
view.get(Table::BlockSignatures, key).expect("get") { - sig_bytes += (key.len() + v.len()) as u64; - } - } - drop(view); + let header_bytes = self.sum_entry_bytes(Table::BlockHeaders, &keys_to_delete); + let body_bytes = self.sum_entry_bytes(Table::BlockBodies, &keys_to_delete); + let sig_bytes = self.sum_entry_bytes(Table::BlockSignatures, &keys_to_delete); let mut batch = self.backend.begin_write().expect("write batch"); batch @@ -784,11 +752,10 @@ impl Store { /// the same keys are overwritten (idempotent) and a `LiveChain` entry is added. pub fn insert_pending_block(&mut self, root: H256, signed_block: SignedBlockWithAttestation) { let mut batch = self.backend.begin_write().expect("write batch"); - let (_, written) = write_signed_block(batch.as_mut(), &root, signed_block); + // Don't track bytes here: pending blocks are re-inserted via insert_signed_block + // which overwrites the same keys, so tracking here would double-count. + write_signed_block(batch.as_mut(), &root, signed_block); batch.commit().expect("commit"); - self.add_table_bytes(Table::BlockHeaders, written.headers); - self.add_table_bytes(Table::BlockBodies, written.bodies); - self.add_table_bytes(Table::BlockSignatures, written.signatures); } /// Insert a signed block, storing the block and signatures separately. @@ -1041,13 +1008,12 @@ impl Store { /// Prune entries from a table where the slot (extracted via `get_slot`) is <= `finalized_slot`. /// Returns the number of entries pruned. - /// Returns (count_pruned, bytes_pruned). 
fn prune_by_slot( &mut self, table: Table, finalized_slot: u64, get_slot: impl Fn(&[u8]) -> Option, - ) -> (usize, u64) { + ) -> usize { let view = self.backend.begin_read().expect("read view"); let mut to_delete = vec![]; let mut deleted_bytes = 0u64; @@ -1073,7 +1039,7 @@ impl Store { batch.commit().expect("commit"); self.sub_table_bytes(table, deleted_bytes); } - (count, deleted_bytes) + count } /// Promotes all new aggregated payloads to known, making them active in fork choice. @@ -1112,6 +1078,18 @@ impl Store { self.table_bytes[table.index()].fetch_sub(bytes, Ordering::Relaxed); } + /// Sum key + value bytes for a set of keys in a table. + fn sum_entry_bytes(&self, table: Table, keys: &[Vec]) -> u64 { + let view = self.backend.begin_read().expect("read view"); + let mut bytes = 0u64; + for key in keys { + if let Some(v) = view.get(table, key).expect("get") { + bytes += (key.len() + v.len()) as u64; + } + } + bytes + } + /// Delete specific gossip signatures by key. pub fn delete_gossip_signatures(&mut self, keys: &[SignatureKey]) { if keys.is_empty() { @@ -1120,14 +1098,7 @@ impl Store { let count = keys.len(); let encoded_keys: Vec<_> = keys.iter().map(encode_signature_key).collect(); - let view = self.backend.begin_read().expect("read view"); - let mut deleted_bytes = 0u64; - for key in &encoded_keys { - if let Some(v) = view.get(Table::GossipSignatures, key).expect("get") { - deleted_bytes += (key.len() + v.len()) as u64; - } - } - drop(view); + let deleted_bytes = self.sum_entry_bytes(Table::GossipSignatures, &encoded_keys); let mut batch = self.backend.begin_write().expect("write batch"); batch @@ -1178,13 +1149,13 @@ impl Store { let encoded_key = encode_signature_key(&key); // Check if key already exists to avoid inflating the counter on upsert - let old_value_len = self + let is_new = self .backend .begin_read() .expect("read view") .get(Table::GossipSignatures, &encoded_key) - .expect("get"); - let is_new = old_value_len.is_none(); + .expect("get") 
+ .is_none(); let stored = StoredSignature::new(slot, signature); let value = stored.as_ssz_bytes(); From 235e81e7a3eaf4f524af39c9ff188530795310ec Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 18 Mar 2026 19:39:40 -0300 Subject: [PATCH 3/4] Use RocksDB estimate-live-data-size instead of manual byte tracking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the AtomicU64 bookkeeping approach with a direct query to RocksDB's per-column-family property. This eliminates: - Startup table scan (scan_tables) - Byte tracking on every insert and prune - BlockWriteBytes struct, sum_entry_bytes helper, add/sub_table_bytes The StorageBackend trait gains estimate_table_bytes(table) with a default returning 0. RocksDB implements it via property_int_value_cf on the "rocksdb.estimate-live-data-size" property — an O(1) metadata lookup, no scanning required. --- crates/blockchain/src/lib.rs | 2 +- crates/storage/src/api/mod.rs | 2 +- crates/storage/src/api/tables.rs | 19 +-- crates/storage/src/api/traits.rs | 7 ++ crates/storage/src/backend/rocksdb.rs | 11 ++ crates/storage/src/store.rs | 174 ++++++-------------------- 6 files changed, 58 insertions(+), 157 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 329b8501..d9652455 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -286,7 +286,7 @@ impl BlockChainServer { metrics::update_latest_finalized_slot(self.store.latest_finalized().slot); metrics::update_validators_count(self.key_manager.validator_ids().len() as u64); for table in ALL_TABLES { - metrics::update_table_bytes(table.name(), self.store.table_bytes(table)); + metrics::update_table_bytes(table.name(), self.store.estimate_table_bytes(table)); } Ok(()) } diff --git a/crates/storage/src/api/mod.rs b/crates/storage/src/api/mod.rs index 555d2288..00755269 100644 --- a/crates/storage/src/api/mod.rs +++ b/crates/storage/src/api/mod.rs @@ -16,5 +16,5 
@@ mod tables; mod traits; -pub use tables::{ALL_TABLES, TABLE_COUNT, Table}; +pub use tables::{ALL_TABLES, Table}; pub use traits::{Error, PrefixResult, StorageBackend, StorageReadView, StorageWriteBatch}; diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs index 1efc5f35..35fb687d 100644 --- a/crates/storage/src/api/tables.rs +++ b/crates/storage/src/api/tables.rs @@ -26,11 +26,8 @@ pub enum Table { LiveChain, } -/// Number of tables. -pub const TABLE_COUNT: usize = 8; - /// All table variants. -pub const ALL_TABLES: [Table; TABLE_COUNT] = [ +pub const ALL_TABLES: [Table; 8] = [ Table::BlockHeaders, Table::BlockBodies, Table::BlockSignatures, @@ -42,20 +39,6 @@ pub const ALL_TABLES: [Table; TABLE_COUNT] = [ ]; impl Table { - /// Index into a fixed-size array. Matches the order in [`ALL_TABLES`]. - pub fn index(self) -> usize { - match self { - Table::BlockHeaders => 0, - Table::BlockBodies => 1, - Table::BlockSignatures => 2, - Table::States => 3, - Table::GossipSignatures => 4, - Table::AttestationDataByRoot => 5, - Table::Metadata => 6, - Table::LiveChain => 7, - } - } - /// Human-readable name for metrics labels. pub fn name(self) -> &'static str { match self { diff --git a/crates/storage/src/api/traits.rs b/crates/storage/src/api/traits.rs index 5ab82601..b1b15d4c 100644 --- a/crates/storage/src/api/traits.rs +++ b/crates/storage/src/api/traits.rs @@ -13,6 +13,13 @@ pub trait StorageBackend: Send + Sync { /// Begin a write batch. fn begin_write(&self) -> Result, Error>; + + /// Estimated live data size in bytes for a table. + /// Returns 0 if the backend does not support this (e.g. in-memory). + fn estimate_table_bytes(&self, table: Table) -> u64 { + let _ = table; + 0 + } } /// A read-only view of the storage. 
diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index b1338052..77398ba8 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -61,6 +61,17 @@ impl StorageBackend for RocksDBBackend { batch: WriteBatch::default(), })) } + + fn estimate_table_bytes(&self, table: Table) -> u64 { + let Some(cf) = self.db.cf_handle(cf_name(table)) else { + return 0; + }; + self.db + .property_int_value_cf(&cf, "rocksdb.estimate-live-data-size") + .ok() + .flatten() + .unwrap_or(0) + } } /// Read-only view into RocksDB. diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 5bbd0405..2e3e6bec 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -1,5 +1,5 @@ use std::collections::{HashMap, HashSet, VecDeque}; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, LazyLock, Mutex}; /// The tree hash root of an empty block body. @@ -8,7 +8,7 @@ use std::sync::{Arc, LazyLock, Mutex}; /// allowing us to skip storing empty bodies and reconstruct them on read. static EMPTY_BODY_ROOT: LazyLock = LazyLock::new(|| BlockBody::default().tree_hash_root()); -use crate::api::{ALL_TABLES, StorageBackend, StorageWriteBatch, TABLE_COUNT, Table}; +use crate::api::{StorageBackend, StorageWriteBatch, Table}; use crate::types::{StoredAggregatedPayload, StoredSignature}; use ethlambda_types::{ @@ -213,8 +213,6 @@ pub struct Store { new_payloads: Arc>, known_payloads: Arc>, gossip_signatures_count: Arc, - /// Byte size of each table (key + value bytes). One entry per [`Table`] variant. 
- table_bytes: Arc<[AtomicU64; TABLE_COUNT]>, } impl Store { @@ -350,36 +348,26 @@ impl Store { info!(%anchor_state_root, %anchor_block_root, "Initialized store"); - let (table_bytes, initial_gossip_count) = Self::scan_tables(&*backend); + let initial_gossip_count = Self::count_gossip_signatures(&*backend); Self { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), gossip_signatures_count: Arc::new(AtomicUsize::new(initial_gossip_count)), - table_bytes: Arc::new(table_bytes), } } - /// Scan all tables at startup: compute byte sizes and gossip signature count in one pass. - fn scan_tables(backend: &dyn StorageBackend) -> ([AtomicU64; TABLE_COUNT], usize) { - let view = backend.begin_read().expect("read view"); - let result: [AtomicU64; TABLE_COUNT] = std::array::from_fn(|_| AtomicU64::new(0)); - let mut gossip_count = 0usize; - for table in ALL_TABLES { - let mut table_total = 0u64; - for (k, v) in view - .prefix_iterator(table, &[]) - .expect("iterator") - .filter_map(|r| r.ok()) - { - table_total += (k.len() + v.len()) as u64; - if table == Table::GossipSignatures { - gossip_count += 1; - } - } - result[table.index()].store(table_total, Ordering::Relaxed); - } - (result, gossip_count) + /// Count existing gossip signatures in the database. + /// + /// Used once at startup to seed the in-memory counter. 
+ fn count_gossip_signatures(backend: &dyn StorageBackend) -> usize { + backend + .begin_read() + .expect("read view") + .prefix_iterator(Table::GossipSignatures, &[]) + .expect("iterator") + .filter_map(|r| r.ok()) + .count() } // ============ Metadata Helpers ============ @@ -562,7 +550,6 @@ impl Store { // Collect keys to delete - stop once we hit finalized_slot // Keys are sorted by slot (big-endian encoding) so we can stop early - let mut deleted_bytes = 0u64; let keys_to_delete: Vec<_> = view .prefix_iterator(Table::LiveChain, &[]) .expect("iterator") @@ -571,10 +558,7 @@ impl Store { let (slot, _) = decode_live_chain_key(k); slot < finalized_slot }) - .map(|(k, v)| { - deleted_bytes += (k.len() + v.len()) as u64; - k.to_vec() - }) + .map(|(k, _)| k.to_vec()) .collect(); drop(view); @@ -588,7 +572,6 @@ impl Store { .delete_batch(Table::LiveChain, keys_to_delete) .expect("delete non-finalized chain entries"); batch.commit().expect("commit"); - self.sub_table_bytes(Table::LiveChain, deleted_bytes); count } @@ -657,14 +640,11 @@ impl Store { let count = keys_to_delete.len(); if count > 0 { - let deleted_bytes = self.sum_entry_bytes(Table::States, &keys_to_delete); - let mut batch = self.backend.begin_write().expect("write batch"); batch .delete_batch(Table::States, keys_to_delete) .expect("delete old states"); batch.commit().expect("commit"); - self.sub_table_bytes(Table::States, deleted_bytes); } count } @@ -709,10 +689,6 @@ impl Store { let count = keys_to_delete.len(); if count > 0 { - let header_bytes = self.sum_entry_bytes(Table::BlockHeaders, &keys_to_delete); - let body_bytes = self.sum_entry_bytes(Table::BlockBodies, &keys_to_delete); - let sig_bytes = self.sum_entry_bytes(Table::BlockSignatures, &keys_to_delete); - let mut batch = self.backend.begin_write().expect("write batch"); batch .delete_batch(Table::BlockHeaders, keys_to_delete.clone()) @@ -724,9 +700,6 @@ impl Store { .delete_batch(Table::BlockSignatures, keys_to_delete) .expect("delete old 
block signatures"); batch.commit().expect("commit"); - self.sub_table_bytes(Table::BlockHeaders, header_bytes); - self.sub_table_bytes(Table::BlockBodies, body_bytes); - self.sub_table_bytes(Table::BlockSignatures, sig_bytes); } count } @@ -752,8 +725,6 @@ impl Store { /// the same keys are overwritten (idempotent) and a `LiveChain` entry is added. pub fn insert_pending_block(&mut self, root: H256, signed_block: SignedBlockWithAttestation) { let mut batch = self.backend.begin_write().expect("write batch"); - // Don't track bytes here: pending blocks are re-inserted via insert_signed_block - // which overwrites the same keys, so tracking here would double-count. write_signed_block(batch.as_mut(), &root, signed_block); batch.commit().expect("commit"); } @@ -767,21 +738,17 @@ impl Store { /// Takes ownership to avoid cloning large signature data. pub fn insert_signed_block(&mut self, root: H256, signed_block: SignedBlockWithAttestation) { let mut batch = self.backend.begin_write().expect("write batch"); - let (block, written) = write_signed_block(batch.as_mut(), &root, signed_block); + let block = write_signed_block(batch.as_mut(), &root, signed_block); - let index_key = encode_live_chain_key(block.slot, &root); - let index_value = block.parent_root.as_ssz_bytes(); - let index_bytes = (index_key.len() + index_value.len()) as u64; - let index_entries = vec![(index_key, index_value)]; + let index_entries = vec![( + encode_live_chain_key(block.slot, &root), + block.parent_root.as_ssz_bytes(), + )]; batch .put_batch(Table::LiveChain, index_entries) .expect("put non-finalized chain index"); batch.commit().expect("commit"); - self.add_table_bytes(Table::BlockHeaders, written.headers); - self.add_table_bytes(Table::BlockBodies, written.bodies); - self.add_table_bytes(Table::BlockSignatures, written.signatures); - self.add_table_bytes(Table::LiveChain, index_bytes); } /// Get a signed block by combining header, body, and signatures. 
@@ -832,14 +799,10 @@ impl Store { /// Stores a state indexed by block root. pub fn insert_state(&mut self, root: H256, state: State) { - let key = root.as_ssz_bytes(); - let value = state.as_ssz_bytes(); - let bytes = (key.len() + value.len()) as u64; let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(key, value)]; + let entries = vec![(root.as_ssz_bytes(), state.as_ssz_bytes())]; batch.put_batch(Table::States, entries).expect("put state"); batch.commit().expect("commit"); - self.add_table_bytes(Table::States, bytes); } // ============ Attestation Data By Root ============ @@ -849,16 +812,12 @@ impl Store { /// Stores attestation data indexed by its tree hash root. pub fn insert_attestation_data_by_root(&mut self, root: H256, data: AttestationData) { - let key = root.as_ssz_bytes(); - let value = data.as_ssz_bytes(); - let bytes = (key.len() + value.len()) as u64; let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(key, value)]; + let entries = vec![(root.as_ssz_bytes(), data.as_ssz_bytes())]; batch .put_batch(Table::AttestationDataByRoot, entries) .expect("put attestation data"); batch.commit().expect("commit"); - self.add_table_bytes(Table::AttestationDataByRoot, bytes); } /// Batch-insert multiple attestation data entries in a single commit. 
@@ -867,21 +826,14 @@ impl Store { return; } let mut batch = self.backend.begin_write().expect("write batch"); - let mut total_bytes = 0u64; - let ssz_entries: Vec<_> = entries + let ssz_entries = entries .into_iter() - .map(|(root, data)| { - let key = root.as_ssz_bytes(); - let value = data.as_ssz_bytes(); - total_bytes += (key.len() + value.len()) as u64; - (key, value) - }) + .map(|(root, data)| (root.as_ssz_bytes(), data.as_ssz_bytes())) .collect(); batch .put_batch(Table::AttestationDataByRoot, ssz_entries) .expect("put attestation data batch"); batch.commit().expect("commit"); - self.add_table_bytes(Table::AttestationDataByRoot, total_bytes); } /// Returns attestation data for the given root hash. @@ -1016,7 +968,6 @@ impl Store { ) -> usize { let view = self.backend.begin_read().expect("read view"); let mut to_delete = vec![]; - let mut deleted_bytes = 0u64; for (key_bytes, value_bytes) in view .prefix_iterator(table, &[]) @@ -1026,7 +977,6 @@ impl Store { if let Some(slot) = get_slot(&value_bytes) && slot <= finalized_slot { - deleted_bytes += (key_bytes.len() + value_bytes.len()) as u64; to_delete.push(key_bytes.to_vec()); } } @@ -1037,7 +987,6 @@ impl Store { let mut batch = self.backend.begin_write().expect("write batch"); batch.delete_batch(table, to_delete).expect("delete"); batch.commit().expect("commit"); - self.sub_table_bytes(table, deleted_bytes); } count } @@ -1065,29 +1014,9 @@ impl Store { self.gossip_signatures_count.load(Ordering::Relaxed) } - /// Returns the byte size of a table. - pub fn table_bytes(&self, table: Table) -> u64 { - self.table_bytes[table.index()].load(Ordering::Relaxed) - } - - fn add_table_bytes(&self, table: Table, bytes: u64) { - self.table_bytes[table.index()].fetch_add(bytes, Ordering::Relaxed); - } - - fn sub_table_bytes(&self, table: Table, bytes: u64) { - self.table_bytes[table.index()].fetch_sub(bytes, Ordering::Relaxed); - } - - /// Sum key + value bytes for a set of keys in a table. 
- fn sum_entry_bytes(&self, table: Table, keys: &[Vec]) -> u64 { - let view = self.backend.begin_read().expect("read view"); - let mut bytes = 0u64; - for key in keys { - if let Some(v) = view.get(table, key).expect("get") { - bytes += (key.len() + v.len()) as u64; - } - } - bytes + /// Estimated live data size in bytes for a table, as reported by the backend. + pub fn estimate_table_bytes(&self, table: Table) -> u64 { + self.backend.estimate_table_bytes(table) } /// Delete specific gossip signatures by key. @@ -1097,9 +1026,6 @@ impl Store { } let count = keys.len(); let encoded_keys: Vec<_> = keys.iter().map(encode_signature_key).collect(); - - let deleted_bytes = self.sum_entry_bytes(Table::GossipSignatures, &encoded_keys); - let mut batch = self.backend.begin_write().expect("write batch"); batch .delete_batch(Table::GossipSignatures, encoded_keys) @@ -1110,7 +1036,6 @@ impl Store { Some(current.saturating_sub(count)) }) .unwrap(); - self.sub_table_bytes(Table::GossipSignatures, deleted_bytes); } // ============ Gossip Signatures ============ @@ -1158,10 +1083,8 @@ impl Store { .is_none(); let stored = StoredSignature::new(slot, signature); - let value = stored.as_ssz_bytes(); - let new_bytes = (encoded_key.len() + value.len()) as u64; let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(encoded_key, value)]; + let entries = vec![(encoded_key, stored.as_ssz_bytes())]; batch .put_batch(Table::GossipSignatures, entries) .expect("put signature"); @@ -1169,7 +1092,6 @@ impl Store { if is_new { self.gossip_signatures_count.fetch_add(1, Ordering::Relaxed); - self.add_table_bytes(Table::GossipSignatures, new_bytes); } } @@ -1196,21 +1118,15 @@ impl Store { } } -/// Byte sizes written per block-related table. -struct BlockWriteBytes { - headers: u64, - bodies: u64, - signatures: u64, -} - /// Write block header, body, and signatures onto an existing batch. /// -/// Returns the deserialized [`Block`] and per-table byte sizes written. 
+/// Returns the deserialized [`Block`] so callers can access fields like +/// `slot` and `parent_root` without re-deserializing. fn write_signed_block( batch: &mut dyn StorageWriteBatch, root: &H256, signed_block: SignedBlockWithAttestation, -) -> (Block, BlockWriteBytes) { +) -> Block { let SignedBlockWithAttestation { message: BlockWithAttestation { @@ -1228,39 +1144,25 @@ fn write_signed_block( let header = block.header(); let root_bytes = root.as_ssz_bytes(); - let header_value = header.as_ssz_bytes(); - let header_bytes = (root_bytes.len() + header_value.len()) as u64; - let header_entries = vec![(root_bytes.clone(), header_value)]; + let header_entries = vec![(root_bytes.clone(), header.as_ssz_bytes())]; batch .put_batch(Table::BlockHeaders, header_entries) .expect("put block header"); // Skip storing empty bodies - they can be reconstructed from the header's body_root - let body_bytes = if header.body_root != *EMPTY_BODY_ROOT { - let body_value = block.body.as_ssz_bytes(); - let bytes = (root_bytes.len() + body_value.len()) as u64; - let body_entries = vec![(root_bytes.clone(), body_value)]; + if header.body_root != *EMPTY_BODY_ROOT { + let body_entries = vec![(root_bytes.clone(), block.body.as_ssz_bytes())]; batch .put_batch(Table::BlockBodies, body_entries) .expect("put block body"); - bytes - } else { - 0 - }; + } - let sig_value = signatures.as_ssz_bytes(); - let sig_bytes = (root_bytes.len() + sig_value.len()) as u64; - let sig_entries = vec![(root_bytes, sig_value)]; + let sig_entries = vec![(root_bytes, signatures.as_ssz_bytes())]; batch .put_batch(Table::BlockSignatures, sig_entries) .expect("put block signatures"); - let written = BlockWriteBytes { - headers: header_bytes, - bodies: body_bytes, - signatures: sig_bytes, - }; - (block, written) + block } #[cfg(test)] @@ -1337,7 +1239,6 @@ mod tests { new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: 
Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), gossip_signatures_count: Arc::new(AtomicUsize::new(0)), - table_bytes: Arc::new(std::array::from_fn(|_| AtomicU64::new(0))), } } @@ -1349,7 +1250,6 @@ mod tests { new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), gossip_signatures_count: Arc::new(AtomicUsize::new(0)), - table_bytes: Arc::new(std::array::from_fn(|_| AtomicU64::new(0))), } } } From d595a96265ec03fe76379ccb5a5f6a02282463de Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Thu, 19 Mar 2026 16:58:51 -0300 Subject: [PATCH 4/4] Include memtable size in table bytes metric rocksdb.estimate-live-data-size is computed from SST file metadata only, so it reports 0 for data still in the memtables (and can underestimate until compaction runs). Add rocksdb.cur-size-all-mem-tables to capture in-memory data too. --- crates/storage/src/backend/rocksdb.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index 77398ba8..160ea4cc 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -66,11 +66,19 @@ impl StorageBackend for RocksDBBackend { let Some(cf) = self.db.cf_handle(cf_name(table)) else { return 0; }; - self.db + let sst_bytes = self + .db .property_int_value_cf(&cf, "rocksdb.estimate-live-data-size") .ok() .flatten() - .unwrap_or(0) + .unwrap_or(0); + let memtable_bytes = self + .db + .property_int_value_cf(&cf, "rocksdb.cur-size-all-mem-tables") + .ok() + .flatten() + .unwrap_or(0); + sst_bytes + memtable_bytes } }