# PIP-461: Add queued latency metrics for offloader executors

## Background knowledge

Apache Pulsar tiered storage offloaders move ledger data from BookKeeper into remote storage and read it back when
entries are no longer available locally. The jcloud-based implementation,
`BlobStoreManagedLedgerOffloader`, uses two `OrderedScheduler` instances:

- `scheduler` for offload, streaming offload, and delete tasks
- `readExecutor` for opening offloaded ledgers and serving subsequent read/close work

These tasks are not lightweight dispatch operations. They perform blocking remote-storage work such as multipart upload,
blob fetches, index reads, and stream reads. When many offload or read tasks are submitted concurrently, later tasks can
wait in the executor queue for a meaningful amount of time before they begin execution.

Pulsar already exposes a set of offloader metrics through `LedgerOffloaderStats`, such as offload bytes, read bytes,
read latency, and storage errors. Those metrics help identify remote-storage throughput and failures, but they do not
show whether tiered-storage requests are spending time waiting in the offloader executors before any I/O starts.

## Motivation

Today, an operator can observe that tiered storage reads or writes are slow, but cannot tell whether the delay is caused
by:

1. remote storage latency after a task starts running, or
2. queueing before the task gets a thread on the offloader executor.

This distinction matters in practice:

- long queueing on `scheduler` can delay offload completion and blob deletion
- long queueing on `readExecutor` can delay opening offloaded ledgers and serving reads from tiered storage
- the remediation is different for queue saturation than for remote storage slowness

Without dedicated queued-latency metrics, operators must infer saturation indirectly from logs or generic executor
behavior. That makes diagnosis slower and less reliable, especially when trying to distinguish write-path pressure from
read-path pressure in tiered storage.

## Goals

### In Scope

- Add one metric for queued latency on the offload/write executor path
- Add one metric for queued latency on the offload/read executor path
- Record the metrics in the jcloud offloader at the point where tasks are submitted to and begin running on the relevant
executors
- Keep the metrics aligned with the existing `LedgerOffloaderStats` topic/namespace labeling model

### Out of Scope

- Changing the thread counts or scheduling policy of offloader executors
- Reworking `OrderedScheduler`
- Adding queue-depth metrics or executor occupancy metrics
- Instrumenting every offloader implementation in the same PIP
- Measuring intentional timer delays introduced by `schedule(...)` calls

## High Level Design

This proposal adds two new summary metrics to `LedgerOffloaderStats`:

- `brk_ledgeroffloader_offload_executor_queue_latency`
- `brk_ledgeroffloader_read_offload_executor_queue_latency`

The metrics are recorded by capturing the enqueue time when a task is submitted and observing the elapsed time when the
task actually begins running on the executor thread.

At a high level:

1. When `BlobStoreManagedLedgerOffloader` submits blocking work to `scheduler`, Pulsar wraps the task and records the
queued time into the offload queue metric when execution starts.
2. When `BlobStoreManagedLedgerOffloader` submits blocking open/read work to `readExecutor`, Pulsar wraps the task and
records the queued time into the read queue metric when execution starts.
3. The offloaded read-handle implementations also wrap later `readAsync()` and `closeAsync()` tasks, so the metric
covers the full read lifecycle, not only the initial `readOffloaded()` open call.

This design intentionally measures only queue wait. Once execution begins, the existing offloader latency metrics remain
responsible for capturing remote-storage and ledger-read timing.

## Detailed Design

### Design & Implementation Details

The implementation adds two new methods to `LedgerOffloaderStats`:

- `recordOffloadExecutorQueueLatency`
- `recordReadOffloadExecutorQueueLatency`

`LedgerOffloaderStatsImpl` registers two new Prometheus `Summary` instances and records queued time in microseconds,
matching the unit conversion style used by existing offloader latency metrics.

The jcloud implementation introduces a small helper that wraps submitted tasks:

1. capture `System.nanoTime()` at submission time
2. when the wrapped task begins execution, compute `System.nanoTime() - enqueueTime`
3. record the value into the corresponding offloader queue metric
4. run the original task body

The helper is used in these paths:

- `BlobStoreManagedLedgerOffloader.offload(...)`
- `BlobStoreManagedLedgerOffloader.streamingOffload(...)`
- `BlobStoreManagedLedgerOffloader.readOffloaded(...)`
- `BlobStoreManagedLedgerOffloader.deleteOffloaded(...)`
- `BlobStoreBackedReadHandleImpl.readAsync(...)`
- `BlobStoreBackedReadHandleImpl.closeAsync(...)`
- `BlobStoreBackedReadHandleImplV2.readAsync(...)`
- `BlobStoreBackedReadHandleImplV2.closeAsync(...)`

Both metrics use the managed-ledger topic name when available. If a managed-ledger name is absent, they fall back
to the existing `"unknown"` label behavior already used by `LedgerOffloaderStatsImpl`.

Delayed tasks submitted with `schedule(...)` are intentionally excluded from this metric because their delay is part of
the scheduling logic, not executor saturation.

### Public-facing Changes

### Public API

No client-facing public API changes are introduced.

### Binary protocol

No binary protocol changes.

### Configuration

No configuration changes.

### CLI

No CLI changes.

### Metrics

1. `brk_ledgeroffloader_offload_executor_queue_latency`
- Description: Time that a blocking tiered-storage offload/write/delete task spends waiting in the offloader
executor queue before starting execution
- Attributes:
- `namespace`
- `topic` when topic-level offloader metrics are enabled
- Unit: microseconds

2. `brk_ledgeroffloader_read_offload_executor_queue_latency`
- Description: Time that a blocking tiered-storage read task spends waiting in the offloader read executor queue
before starting execution
- Attributes:

- `namespace`
- `topic` when topic-level offloader metrics are enabled
- Unit: microseconds

Both metrics are exposed as Prometheus summaries and therefore provide summary count/sum output and configured
quantiles for the observed queue latency.
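As an illustration only (exact label sets, quantile values, and sample values depend on broker configuration and traffic), the write-path summary would be exposed along these lines:

```
brk_ledgeroffloader_offload_executor_queue_latency{namespace="public/default", quantile="0.99"} 1250.0
brk_ledgeroffloader_offload_executor_queue_latency_count{namespace="public/default"} 42.0
brk_ledgeroffloader_offload_executor_queue_latency_sum{namespace="public/default"} 18700.0
```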

## Monitoring

Operators should use these metrics together with the existing offloader throughput and latency metrics:

- A rising `brk_ledgeroffloader_offload_executor_queue_latency` suggests the write/offload executor is saturated or has
too much blocking remote-storage work queued.
- A rising `brk_ledgeroffloader_read_offload_executor_queue_latency` suggests the read executor is saturated and remote
reads are waiting before they even begin.
- If queue latency remains low but the existing read/offload latency metrics are high, the bottleneck is more likely in
the remote storage operation itself rather than executor saturation.

Typical alerting strategies can use the summary quantiles or average latency from `_sum / _count`:

- warn when p95 or p99 queue latency remains elevated for several collection intervals
- investigate executor thread count, request concurrency, and remote-storage behavior when queue latency and error rates
rise together
- compare write-queue and read-queue behavior to determine whether pressure is isolated to one path
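For example, the average queue latency from `_sum / _count` mentioned above can be computed in PromQL roughly as follows (metric and label names as proposed in this PIP; the range window should be adjusted to the scrape interval):

```promql
rate(brk_ledgeroffloader_offload_executor_queue_latency_sum{namespace="public/default"}[5m])
  /
rate(brk_ledgeroffloader_offload_executor_queue_latency_count{namespace="public/default"}[5m])
```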

## Security Considerations

This proposal adds observability only. It does not introduce new endpoints, new permissions, or new cross-tenant data
access. The metric labels follow the existing offloader metrics model.

## Backward & Forward Compatibility

## Upgrade

No upgrade action is required.

After upgrading, operators may optionally add the new metrics to dashboards and alerts. Existing dashboards continue to
work unchanged.

## Downgrade / Rollback

If Pulsar is downgraded to a version without this proposal, the two new metrics will disappear. No data migration or
configuration rollback is needed.

Dashboards or alerts that depend on these metrics must be updated or disabled before rollback.

## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

No geo-replication-specific considerations apply. The proposal adds broker-local observability only and does not affect
replication semantics or metadata.

## Alternatives

1. Rely only on generic `OrderedScheduler` metrics

Generic scheduler metrics are useful but do not provide dedicated tiered-storage offloader observability through the
existing `LedgerOffloaderStats` surface. They also do not clearly separate offload queueing from read queueing at the
offloader component level.

2. Add queue-depth metrics instead of queued-latency metrics

Queue depth alone does not show how much delay requests actually experience. A latency metric is more directly useful
for operational diagnosis and alerting.

3. Measure total task duration only

Pulsar already has metrics for parts of remote-storage and ledger-read latency. The missing signal is the wait before
execution begins, which total duration does not isolate.

4. Instrument all offloader implementations in the same proposal

This would expand scope and delay delivery. The immediate problem is in `BlobStoreManagedLedgerOffloader`, where the
separate write and read executors run blocking remote-storage tasks.

## General Notes

This proposal keeps the metric names under the existing `LedgerOffloaderStats` namespace and does not introduce any new
configuration gate.

The initial implementation targets the jcloud offloader because that is where the observed scheduler and read-executor
queueing problem exists today.

## Links

* Mailing List discussion thread: https://lists.apache.org/thread/l3hyysy61yxwj3rcszqtghtf2y49kdpm
* Mailing List voting thread: