diff --git a/.claude/skills/ev-node-explainer/block-architecture.md b/.claude/skills/ev-node-explainer/block-architecture.md index 8d77e8711..61941de4b 100644 --- a/.claude/skills/ev-node-explainer/block-architecture.md +++ b/.claude/skills/ev-node-explainer/block-architecture.md @@ -51,6 +51,7 @@ type Component interface { ``` Startup order: + 1. Cache Manager (loads persisted state) 2. Syncer (begins sync workers) 3. Executor (begins production loop) - Aggregator only @@ -476,10 +477,6 @@ var ( DARetrievalFailures = prometheus.NewCounter(...) DAInclusionHeight = prometheus.NewGauge(...) - // Cache metrics - PendingHeadersCount = prometheus.NewGauge(...) - PendingDataCount = prometheus.NewGauge(...) - // Forced inclusion ForcedInclusionTxsInGracePeriod = prometheus.NewGauge(...) ForcedInclusionTxsMalicious = prometheus.NewCounter(...) diff --git a/CHANGELOG.md b/CHANGELOG.md index d2b8e89a6..73ad6e939 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Improve cache handling when there is a significant backlog of pending headers and data. ([#3030](https://github.com/evstack/ev-node/pull/3030)) - Decrease MaxBytesSize to `5MB` to increase compatibility with public nodes. ([#3030](https://github.com/evstack/ev-node/pull/3030)) +- Proper counting of `DASubmitterPendingBlobs` metrics. 
([#3038](https://github.com/evstack/ev-node/pull/3038)) ## v1.0.0-rc.1 diff --git a/block/internal/common/metrics.go b/block/internal/common/metrics.go index 13a041943..cb5a1aa97 100644 --- a/block/internal/common/metrics.go +++ b/block/internal/common/metrics.go @@ -63,8 +63,6 @@ type Metrics struct { DARetrievalSuccesses metrics.Counter DARetrievalFailures metrics.Counter DAInclusionHeight metrics.Gauge - PendingHeadersCount metrics.Gauge - PendingDataCount metrics.Gauge // Forced inclusion metrics ForcedInclusionTxsInGracePeriod metrics.Gauge // Number of forced inclusion txs currently in grace period @@ -172,20 +170,6 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Help: "Height at which all blocks have been included in DA", }, labels).With(labelsAndValues...) - m.PendingHeadersCount = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "pending_headers_count", - Help: "Number of headers pending DA submission", - }, labels).With(labelsAndValues...) - - m.PendingDataCount = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "pending_data_count", - Help: "Number of data blocks pending DA submission", - }, labels).With(labelsAndValues...) 
- // Forced inclusion metrics m.ForcedInclusionTxsInGracePeriod = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, @@ -259,8 +243,6 @@ func NopMetrics() *Metrics { DARetrievalSuccesses: discard.NewCounter(), DARetrievalFailures: discard.NewCounter(), DAInclusionHeight: discard.NewGauge(), - PendingHeadersCount: discard.NewGauge(), - PendingDataCount: discard.NewGauge(), DASubmitterFailures: make(map[DASubmitterFailureReason]metrics.Counter), DASubmitterLastFailure: make(map[DASubmitterFailureReason]metrics.Gauge), DASubmitterPendingBlobs: discard.NewGauge(), diff --git a/block/internal/common/metrics_test.go b/block/internal/common/metrics_test.go index c72c481b3..a493f03c0 100644 --- a/block/internal/common/metrics_test.go +++ b/block/internal/common/metrics_test.go @@ -30,8 +30,6 @@ func TestMetrics(t *testing.T) { assert.NotNil(t, em.DARetrievalSuccesses) assert.NotNil(t, em.DARetrievalFailures) assert.NotNil(t, em.DAInclusionHeight) - assert.NotNil(t, em.PendingHeadersCount) - assert.NotNil(t, em.PendingDataCount) }) t.Run("NopMetrics", func(t *testing.T) { @@ -56,8 +54,6 @@ func TestMetrics(t *testing.T) { assert.NotNil(t, em.DARetrievalSuccesses) assert.NotNil(t, em.DARetrievalFailures) assert.NotNil(t, em.DAInclusionHeight) - assert.NotNil(t, em.PendingHeadersCount) - assert.NotNil(t, em.PendingDataCount) // Verify no-op metrics don't panic when used em.Height.Set(100) @@ -113,8 +109,6 @@ func TestMetricsIntegration(t *testing.T) { em.DARetrievalSuccesses.Add(9) em.DARetrievalFailures.Add(1) em.DAInclusionHeight.Set(995) - em.PendingHeadersCount.Set(3) - em.PendingDataCount.Set(2) } func TestMetricsSubsystem(t *testing.T) { diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 5a6e3d38f..dde6140c5 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -235,7 +235,6 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers 
[]*types.Signed "header", s.client.GetHeaderNamespace(), []byte(s.config.DA.SubmitOptions), - func() uint64 { return cache.NumPendingHeaders() }, ) } @@ -435,7 +434,6 @@ func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types. "data", s.client.GetDataNamespace(), []byte(s.config.DA.SubmitOptions), - func() uint64 { return cache.NumPendingData() }, ) } @@ -545,7 +543,6 @@ func submitToDA[T any]( itemType string, namespace []byte, options []byte, - getTotalPendingFn func() uint64, ) error { if len(items) != len(marshaled) { return fmt.Errorf("items length (%d) does not match marshaled length (%d)", len(items), len(marshaled)) @@ -570,11 +567,6 @@ func submitToDA[T any]( marshaled = batchMarshaled } - // Update pending blobs metric to track total backlog - if getTotalPendingFn != nil { - s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn())) - } - // Start the retry loop for rs.Attempt < pol.MaxAttempts { // Record resend metric for retry attempts (not the first attempt) @@ -615,20 +607,12 @@ func submitToDA[T any]( s.logger.Info().Str("itemType", itemType).Uint64("count", res.SubmittedCount).Msg("successfully submitted items to DA layer") if int(res.SubmittedCount) == len(items) { rs.Next(reasonSuccess, pol) - // Update pending blobs metric to reflect total backlog - if getTotalPendingFn != nil { - s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn())) - } return nil } // partial success: advance window items = items[res.SubmittedCount:] marshaled = marshaled[res.SubmittedCount:] rs.Next(reasonSuccess, pol) - // Update pending blobs count to reflect total backlog - if getTotalPendingFn != nil { - s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn())) - } case datypes.StatusTooBig: // Record failure metric @@ -649,10 +633,6 @@ func submitToDA[T any]( marshaled = marshaled[:half] s.logger.Debug().Int("newBatchSize", half).Msg("batch too big; halving and retrying") rs.Next(reasonTooBig, pol) - // Update 
pending blobs count to reflect total backlog - if getTotalPendingFn != nil { - s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn())) - } case datypes.StatusNotIncludedInBlock: // Record failure metric diff --git a/block/internal/submitting/da_submitter_mocks_test.go b/block/internal/submitting/da_submitter_mocks_test.go index 3b30a208c..2421b5aab 100644 --- a/block/internal/submitting/da_submitter_mocks_test.go +++ b/block/internal/submitting/da_submitter_mocks_test.go @@ -80,7 +80,6 @@ func TestSubmitToDA_MempoolRetry_IncreasesGasAndSucceeds(t *testing.T) { "item", nsBz, opts, - nil, ) assert.NoError(t, err) @@ -129,7 +128,6 @@ func TestSubmitToDA_UnknownError_RetriesSameGasThenSucceeds(t *testing.T) { "item", nsBz, opts, - nil, ) assert.NoError(t, err) assert.Equal(t, []float64{-1, -1}, usedGas) @@ -180,7 +178,6 @@ func TestSubmitToDA_TooBig_HalvesBatch(t *testing.T) { "item", nsBz, opts, - nil, ) assert.NoError(t, err) assert.Equal(t, []int{4, 2}, batchSizes) @@ -225,7 +222,6 @@ func TestSubmitToDA_SentinelNoGas_PreservesGasAcrossRetries(t *testing.T) { "item", nsBz, opts, - nil, ) assert.NoError(t, err) assert.Equal(t, []float64{-1, -1}, usedGas) @@ -269,7 +265,6 @@ func TestSubmitToDA_PartialSuccess_AdvancesWindow(t *testing.T) { "item", nsBz, opts, - nil, ) assert.NoError(t, err) assert.Equal(t, 3, totalSubmitted) diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 3e1d96777..68fadbef0 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -185,6 +185,7 @@ func (s *Submitter) daSubmissionLoop() { case <-ticker.C: // Check if we should submit headers based on batching strategy headersNb := s.cache.NumPendingHeaders() + if headersNb > 0 { lastSubmitNanos := s.lastHeaderSubmit.Load() timeSinceLastSubmit := time.Since(time.Unix(0, lastSubmitNanos)) @@ -305,6 +306,9 @@ func (s *Submitter) daSubmissionLoop() { }() } } + + // Update metrics with current pending 
counts + s.metrics.DASubmitterPendingBlobs.Set(float64(headersNb + dataNb)) } } } diff --git a/docs/learn/specs/block-manager.md b/docs/learn/specs/block-manager.md index c97171f90..0801bd541 100644 --- a/docs/learn/specs/block-manager.md +++ b/docs/learn/specs/block-manager.md @@ -682,6 +682,7 @@ The block components expose comprehensive metrics for monitoring through the sha ### DA Metrics (Submitter and Syncer Components) +- `da_submitter_pending_blobs`: Total number of header and data blobs pending DA submission - `da_submission_attempts_total`: Total DA submission attempts - `da_submission_success_total`: Successful DA submissions - `da_submission_failure_total`: Failed DA submissions @@ -689,8 +690,6 @@ The block components expose comprehensive metrics for monitoring through the sha - `da_retrieval_success_total`: Successful DA retrievals - `da_retrieval_failure_total`: Failed DA retrievals - `da_height`: Current DA retrieval height -- `pending_headers_count`: Number of headers pending DA submission -- `pending_data_count`: Number of data blocks pending DA submission ### Sync Metrics (Syncer Component) diff --git a/test/mocks/store.go b/test/mocks/store.go index 84424ea01..7f2aa180d 100644 --- a/test/mocks/store.go +++ b/test/mocks/store.go @@ -880,37 +880,3 @@ func (_c *MockStore_SetMetadata_Call) RunAndReturn(run func(ctx context.Context, _c.Call.Return(run) return _c } - -// MockStore_Sync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Sync' -type MockStore_Sync_Call struct { - *mock.Call -} - -// Sync is a helper method to define mock.On call -// - ctx context.Context -func (_e *MockStore_Expecter) Sync(ctx interface{}) *MockStore_Sync_Call { - return &MockStore_Sync_Call{Call: _e.mock.On("Sync", ctx)} -} - -func (_c *MockStore_Sync_Call) Run(run func(ctx context.Context)) *MockStore_Sync_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - run( - 
arg0, - ) - }) - return _c -} - -func (_c *MockStore_Sync_Call) Return(err error) *MockStore_Sync_Call { - _c.Call.Return(err) - return _c -} - -func (_c *MockStore_Sync_Call) RunAndReturn(run func(ctx context.Context) error) *MockStore_Sync_Call { - _c.Call.Return(run) - return _c -}