From 80eed7db4619028ad63998f48d0c1302462c26c4 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Wed, 11 Mar 2026 12:45:07 +0800 Subject: [PATCH 1/8] [improve][client] Extract OTel metrics into ProducerMetrics/ConsumerMetrics classes --- .../pulsar/client/impl/ConsumerImpl.java | 61 ++----- .../pulsar/client/impl/ProducerImpl.java | 47 +---- .../client/impl/metrics/ConsumerMetrics.java | 89 +++++++++ .../client/impl/metrics/ProducerMetrics.java | 95 ++++++++++ .../impl/metrics/ConsumerMetricsTest.java | 172 ++++++++++++++++++ .../impl/metrics/ProducerMetricsTest.java | 144 +++++++++++++++ 6 files changed, 523 insertions(+), 85 deletions(-) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ConsumerMetricsTest.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ProducerMetricsTest.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 03e6d6df38245..b2f72c7ec1809 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -34,7 +34,7 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.Timeout; import io.netty.util.concurrent.FastThreadLocal; -import io.opentelemetry.api.common.Attributes; + import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -98,10 +98,8 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; -import org.apache.pulsar.client.impl.metrics.Counter; +import org.apache.pulsar.client.impl.metrics.ConsumerMetrics; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; -import org.apache.pulsar.client.impl.metrics.Unit; -import org.apache.pulsar.client.impl.metrics.UpDownCounter; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema; import org.apache.pulsar.client.impl.transaction.TransactionImpl; @@ -228,16 +226,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final boolean createTopicIfDoesNotExist; private final boolean poolMessages; - private final Counter messagesReceivedCounter; - private final Counter bytesReceivedCounter; - private final UpDownCounter messagesPrefetchedGauge; - private final UpDownCounter bytesPrefetchedGauge; - private final Counter consumersOpenedCounter; - private final Counter consumersClosedCounter; - private final Counter consumerAcksCounter; - private final Counter consumerNacksCounter; - - private final Counter consumerDlqMessagesCounter; + private final ConsumerMetrics consumerMetrics; private final AtomicReference clientCnxUsedForConsumerRegistration = new AtomicReference<>(); private final AtomicInteger previousExceptionCount = new AtomicInteger(); @@ -421,29 +410,10 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat topicNameWithoutPartition = topicName.getPartitionedTopicName(); InstrumentProvider ip = client.instrumentProvider(); - Attributes attrs = Attributes.builder().put("pulsar.subscription", subscription).build(); - consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions, - "The number of consumer sessions opened", topic, attrs); - consumersClosedCounter = ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions, - "The number of consumer sessions closed", topic, attrs); - messagesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.count", Unit.Messages, - "The number of messages explicitly received by the consumer application", topic, attrs); - bytesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.size", Unit.Bytes, - "The number of bytes explicitly received by the consumer application", topic, attrs); - messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.count", Unit.Messages, - "The number of messages currently sitting in the consumer receive queue", topic, attrs); - bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.size", Unit.Bytes, - "The total size in bytes of messages currently sitting in the consumer receive queue", topic, attrs); - - consumerAcksCounter = ip.newCounter("pulsar.client.consumer.message.ack", Unit.Messages, - "The number of acknowledged messages", topic, attrs); - consumerNacksCounter = ip.newCounter("pulsar.client.consumer.message.nack", Unit.Messages, - "The number of negatively acknowledged messages", topic, attrs); - consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages, - "The number of messages sent to DLQ", topic, attrs); + consumerMetrics = new ConsumerMetrics(ip, topic, subscription); grabCnx(); - consumersOpenedCounter.increment(); + consumerMetrics.recordConsumerOpened(); } public ConnectionHandler getConnectionHandler() { @@ -607,7 +577,7 @@ protected CompletableFuture> internalBatchReceiveAsync() { protected CompletableFuture doAcknowledge(MessageId messageId, AckType ackType, Map properties, TransactionImpl txn) { - consumerAcksCounter.increment(); + consumerMetrics.recordAck(); if (getState() != State.Ready && getState() != State.Connecting) { stats.incrementNumAcksFailed(); @@ -630,7 +600,7 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack @Override protected CompletableFuture doAcknowledge(List messageIdList, AckType ackType, Map properties, TransactionImpl txn) { - consumerAcksCounter.increment(); + consumerMetrics.recordAck(); if (getState() != State.Ready && getState() != State.Connecting) { stats.incrementNumAcksFailed(); @@ -721,7 +691,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a copyMessageKeysIfNeeded(message, typedMessageBuilderNew); copyMessageEventTime(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { - consumerDlqMessagesCounter.increment(); + consumerMetrics.recordDlq(); doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> { result.complete(null); @@ -836,7 +806,7 @@ private static void copyMessageEventTime(Message message, @Override public void negativeAcknowledge(MessageId messageId) { - consumerNacksCounter.increment(); + consumerMetrics.recordNack(); negativeAcksTracker.add(messageId); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" @@ -845,7 +815,7 @@ public void negativeAcknowledge(MessageId messageId) { @Override public void negativeAcknowledge(Message message) { - consumerNacksCounter.increment(); + consumerMetrics.recordNack(); negativeAcksTracker.add(message); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" @@ -1186,7 +1156,7 @@ public synchronized CompletableFuture closeAsync() { return compositeCloseFuture; } - consumersClosedCounter.increment(); + consumerMetrics.recordConsumerClosed(); if (!isConnected()) { log.info("[{}] [{}] Closed Consumer (not connected)", topic, subscription); @@ -1370,8 +1340,7 @@ protected MessageImpl newMessage(final MessageIdImpl messageId, } private void executeNotifyCallback(final MessageImpl message) { - messagesPrefetchedGauge.increment(); - bytesPrefetchedGauge.add(message.size()); + consumerMetrics.recordMessagePrefetched(message.size()); // Enqueue the message so that it can be retrieved when application calls receive() // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. @@ -1870,11 +1839,7 @@ protected synchronized void messageProcessed(Message msg) { ClientCnx msgCnx = ((MessageImpl) msg).getCnx(); lastDequeuedMessageId = msg.getMessageId(); - messagesPrefetchedGauge.decrement(); - messagesReceivedCounter.increment(); - - bytesPrefetchedGauge.subtract(msg.size()); - bytesReceivedCounter.add(msg.size()); + consumerMetrics.recordMessageReceived(msg.size()); if (msgCnx != currentCnx) { // The processed message did belong to the old queue that was cleared after reconnection. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 8b7fec005b820..ec2e7e3bee6c9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -81,11 +81,9 @@ import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; -import org.apache.pulsar.client.impl.metrics.Counter; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.impl.metrics.LatencyHistogram; -import org.apache.pulsar.client.impl.metrics.Unit; -import org.apache.pulsar.client.impl.metrics.UpDownCounter; +import org.apache.pulsar.client.impl.metrics.ProducerMetrics; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.SchemaUtils; import org.apache.pulsar.client.impl.transaction.TransactionImpl; @@ -185,14 +183,9 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private boolean errorState; - private final LatencyHistogram latencyHistogram; + private final ProducerMetrics producerMetrics; + // rpcLatencyHistogram 需要传递给 OpSendMsg,保留包级别访问 final LatencyHistogram rpcLatencyHistogram; - private final Counter publishedBytesCounter; - private final UpDownCounter pendingMessagesUpDownCounter; - private final UpDownCounter pendingBytesUpDownCounter; - - private final Counter producersOpenedCounter; - private final Counter producersClosedCounter; private final boolean pauseSendingToPreservePublishOrderOnSchemaRegFailure; // This variable can be exposed as a metrics in the future, a PIP is needed. private final AtomicInteger pendingQueueFullCounter; @@ -296,30 +289,14 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration } InstrumentProvider ip = client.instrumentProvider(); - latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.message.send.duration", - "Publish latency experienced by the application, includes client batching time", topic, - Attributes.empty()); - rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.send.duration", - "Publish RPC latency experienced internally by the client when sending data to receiving an ack", topic, - Attributes.empty()); - publishedBytesCounter = ip.newCounter("pulsar.client.producer.message.send.size", - Unit.Bytes, "The number of bytes published", topic, Attributes.empty()); - pendingMessagesUpDownCounter = - ip.newUpDownCounter("pulsar.client.producer.message.pending.count", Unit.Messages, - "The number of messages in the producer internal send queue, waiting to be sent", topic, - Attributes.empty()); - pendingBytesUpDownCounter = ip.newUpDownCounter("pulsar.client.producer.message.pending.size", Unit.Bytes, - "The size of the messages in the producer internal queue, waiting to sent", topic, Attributes.empty()); - producersOpenedCounter = ip.newCounter("pulsar.client.producer.opened", Unit.Sessions, - "The number of producer sessions opened", topic, Attributes.empty()); - producersClosedCounter = ip.newCounter("pulsar.client.producer.closed", Unit.Sessions, - "The number of producer sessions closed", topic, Attributes.empty()); + producerMetrics = new ProducerMetrics(ip, topic); + rpcLatencyHistogram = producerMetrics.rpcLatencyHistogram; pendingQueueFullCounter = new AtomicInteger(); this.connectionHandler = initConnectionHandler(); setChunkMaxMessageSize(); grabCnx(); - producersOpenedCounter.increment(); + producerMetrics.recordProducerOpened(); } @VisibleForTesting @@ -400,8 +377,7 @@ CompletableFuture internalSendAsync(Message message) { } int msgSize = interceptorMessage.getDataBuffer().readableBytes(); - pendingMessagesUpDownCounter.increment(); - pendingBytesUpDownCounter.add(msgSize); + producerMetrics.recordPendingMessage(msgSize); sendAsync(interceptorMessage, new DefaultSendMessageCallback(future, interceptorMessage, msgSize)); return future; @@ -460,8 +436,6 @@ private void onSendComplete(Throwable e, SendCallback sendCallback, MessageImpl< long createdAt = (sendCallback instanceof ProducerImpl.DefaultSendMessageCallback) ? ((DefaultSendMessageCallback) sendCallback).createdAt : this.createdAt; long latencyNanos = System.nanoTime() - createdAt; - pendingMessagesUpDownCounter.decrement(); - pendingBytesUpDownCounter.subtract(msgSize); ByteBuf payload = msg.getDataBuffer(); if (payload == null) { log.error("[{}] [{}] Payload is null when calling onSendComplete, which is not expected.", @@ -469,13 +443,12 @@ private void onSendComplete(Throwable e, SendCallback sendCallback, MessageImpl< } try { if (e != null) { - latencyHistogram.recordFailure(latencyNanos); + producerMetrics.recordSendFailed(latencyNanos, msgSize); stats.incrementSendFailed(); onSendAcknowledgement(msg, null, e); sendCallback.getFuture().completeExceptionally(e); } else { - latencyHistogram.recordSuccess(latencyNanos); - publishedBytesCounter.add(msgSize); + producerMetrics.recordSendSuccess(latencyNanos, msgSize); stats.incrementNumAcksReceived(latencyNanos); onSendAcknowledgement(msg, msg.getMessageId(), null); sendCallback.getFuture().complete(msg.getMessageId()); @@ -1241,7 +1214,7 @@ public synchronized CompletableFuture closeAsync() { return CompletableFuture.completedFuture(null); } - producersClosedCounter.increment(); + producerMetrics.recordProducerClosed(); closeProducerTasks(); ClientCnx cnx = cnx(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java new file mode 100644 index 0000000000000..708dbb48faa77 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.metrics; + +import io.opentelemetry.api.common.Attributes; + +public class ConsumerMetrics { + + private final Counter messagesReceivedCounter; + private final Counter bytesReceivedCounter; + private final UpDownCounter messagesPrefetchedGauge; + private final UpDownCounter bytesPrefetchedGauge; + private final Counter consumersOpenedCounter; + private final Counter consumersClosedCounter; + private final Counter consumerAcksCounter; + private final Counter consumerNacksCounter; + private final Counter consumerDlqMessagesCounter; + + public ConsumerMetrics(InstrumentProvider ip, String topic, String subscription) { + Attributes attrs = Attributes.builder().put("pulsar.subscription", subscription).build(); + + consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions, + "The number of consumer sessions opened", topic, attrs); + consumersClosedCounter = ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions, + "The number of consumer sessions closed", topic, attrs); + messagesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.count", Unit.Messages, + "The number of messages explicitly received by the consumer application", topic, attrs); + bytesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.size", Unit.Bytes, + "The number of bytes explicitly received by the consumer application", topic, attrs); + messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.count", Unit.Messages, + "The number of messages currently sitting in the consumer receive queue", topic, attrs); + bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.size", Unit.Bytes, + "The total size in bytes of messages currently sitting in the consumer receive queue", topic, attrs); + consumerAcksCounter = ip.newCounter("pulsar.client.consumer.message.ack", Unit.Messages, + "The number of acknowledged messages", topic, attrs); + consumerNacksCounter = ip.newCounter("pulsar.client.consumer.message.nack", Unit.Messages, + "The number of negatively acknowledged messages", topic, attrs); + consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages, + "The number of messages sent to DLQ", topic, attrs); + } + + public void recordMessagePrefetched(int msgSize) { + messagesPrefetchedGauge.increment(); + bytesPrefetchedGauge.add(msgSize); + } + + public void recordMessageReceived(int msgSize) { + messagesPrefetchedGauge.decrement(); + bytesPrefetchedGauge.subtract(msgSize); + messagesReceivedCounter.increment(); + bytesReceivedCounter.add(msgSize); + } + + public void recordAck() { + consumerAcksCounter.increment(); + } + + public void recordNack() { + consumerNacksCounter.increment(); + } + + public void recordDlq() { + consumerDlqMessagesCounter.increment(); + } + + public void recordConsumerOpened() { + consumersOpenedCounter.increment(); + } + + public void recordConsumerClosed() { + consumersClosedCounter.increment(); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java new file mode 100644 index 0000000000000..5ec7d4dbc1156 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.metrics; + +import io.opentelemetry.api.common.Attributes; + +public class ProducerMetrics { + + private final LatencyHistogram sendLatencyHistogram; + public final LatencyHistogram rpcLatencyHistogram; + private final Counter publishedBytesCounter; + private final UpDownCounter pendingMessagesUpDownCounter; + private final UpDownCounter pendingBytesUpDownCounter; + private final Counter producersOpenedCounter; + private final Counter producersClosedCounter; + + public ProducerMetrics(InstrumentProvider ip, String topic) { + sendLatencyHistogram = ip.newLatencyHistogram( + "pulsar.client.producer.message.send.duration", + "Publish latency experienced by the application, includes client batching time", + topic, Attributes.empty()); + + rpcLatencyHistogram = ip.newLatencyHistogram( + "pulsar.client.producer.rpc.send.duration", + "Publish RPC latency experienced internally by the client when sending data to receiving an ack", + topic, Attributes.empty()); + + publishedBytesCounter = ip.newCounter( + "pulsar.client.producer.message.send.size", + Unit.Bytes, "The number of bytes published", + topic, Attributes.empty()); + + pendingMessagesUpDownCounter = ip.newUpDownCounter( + "pulsar.client.producer.message.pending.count", Unit.Messages, + "The number of messages in the producer internal send queue, waiting to be sent", + topic, Attributes.empty()); + + pendingBytesUpDownCounter = ip.newUpDownCounter( + "pulsar.client.producer.message.pending.size", Unit.Bytes, + "The size of the messages in the producer internal queue, waiting to sent", + topic, Attributes.empty()); + + producersOpenedCounter = ip.newCounter( + "pulsar.client.producer.opened", Unit.Sessions, + "The number of producer sessions opened", + topic, Attributes.empty()); + + producersClosedCounter = ip.newCounter( + "pulsar.client.producer.closed", Unit.Sessions, + "The number of producer sessions closed", + topic, Attributes.empty()); + } + + public void recordPendingMessage(int msgSize) { + pendingMessagesUpDownCounter.increment(); + pendingBytesUpDownCounter.add(msgSize); + } + + public void recordSendSuccess(long latencyNanos, int msgSize) { + pendingMessagesUpDownCounter.decrement(); + pendingBytesUpDownCounter.subtract(msgSize); + sendLatencyHistogram.recordSuccess(latencyNanos); + publishedBytesCounter.add(msgSize); + } + + public void recordSendFailed(long latencyNanos, int msgSize) { + pendingMessagesUpDownCounter.decrement(); + pendingBytesUpDownCounter.subtract(msgSize); + sendLatencyHistogram.recordFailure(latencyNanos); + } + + public void recordProducerOpened() { + producersOpenedCounter.increment(); + } + + public void recordProducerClosed() { + producersClosedCounter.increment(); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ConsumerMetricsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ConsumerMetricsTest.java new file mode 100644 index 0000000000000..c2ea2afd5f60f --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ConsumerMetricsTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.metrics; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Collection; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class ConsumerMetricsTest { + + private InMemoryMetricReader metricReader; + private InstrumentProvider instrumentProvider; + private ConsumerMetrics consumerMetrics; + + @BeforeMethod + public void setup() { + metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = SdkMeterProvider.builder() + .registerMetricReader(metricReader) + .build(); + OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() + .setMeterProvider(meterProvider) + .build(); + instrumentProvider = new InstrumentProvider(openTelemetry); + consumerMetrics = new ConsumerMetrics(instrumentProvider, + "persistent://public/default/test-topic", "test-sub"); + } + + @Test + public void testRecordConsumerOpenedAndClosed() { + consumerMetrics.recordConsumerOpened(); + consumerMetrics.recordConsumerOpened(); + consumerMetrics.recordConsumerClosed(); + + Collection metrics = metricReader.collectAllMetrics(); + + assertLongSumMetric(metrics, "pulsar.client.consumer.opened", 2); + assertLongSumMetric(metrics, "pulsar.client.consumer.closed", 1); + } + + @Test + public void testRecordMessagePrefetched() { + consumerMetrics.recordMessagePrefetched(100); + consumerMetrics.recordMessagePrefetched(200); + + Collection metrics = metricReader.collectAllMetrics(); + + // 2 messages in receive queue, 300 bytes total + assertLongSumMetric(metrics, "pulsar.client.consumer.receive_queue.count", 2); + assertLongSumMetric(metrics, "pulsar.client.consumer.receive_queue.size", 300); + } + + @Test + public void testRecordMessageReceived() { + consumerMetrics.recordMessagePrefetched(512); + consumerMetrics.recordMessagePrefetched(256); + + consumerMetrics.recordMessageReceived(512); + + Collection metrics = metricReader.collectAllMetrics(); + + // 1 message (256 bytes) remaining in receive queue + assertLongSumMetric(metrics, "pulsar.client.consumer.receive_queue.count", 1); + assertLongSumMetric(metrics, "pulsar.client.consumer.receive_queue.size", 256); + // 1 message (512 bytes) delivered to application + assertLongSumMetric(metrics, "pulsar.client.consumer.message.received.count", 1); + assertLongSumMetric(metrics, "pulsar.client.consumer.message.received.size", 512); + } + + @Test + public void testRecordAck() { + consumerMetrics.recordAck(); + consumerMetrics.recordAck(); + consumerMetrics.recordAck(); + + Collection metrics = metricReader.collectAllMetrics(); + + assertLongSumMetric(metrics, "pulsar.client.consumer.message.ack", 3); + } + + @Test + public void testRecordNack() { + consumerMetrics.recordNack(); + consumerMetrics.recordNack(); + + Collection metrics = metricReader.collectAllMetrics(); + + assertLongSumMetric(metrics, "pulsar.client.consumer.message.nack", 2); + } + + @Test + public void testRecordDlq() { + consumerMetrics.recordDlq(); + + Collection metrics = metricReader.collectAllMetrics(); + + assertLongSumMetric(metrics, "pulsar.client.consumer.message.dlq", 1); + } + + @Test + public void testSubscriptionAttributeIsAttached() { + consumerMetrics.recordAck(); + + Collection metrics = metricReader.collectAllMetrics(); + + MetricData ackMetric = findMetric(metrics, "pulsar.client.consumer.message.ack"); + assertNotNull(ackMetric); + boolean hasSubscriptionAttr = ackMetric.getLongSumData().getPoints().stream() + .anyMatch(p -> "test-sub".equals(p.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("pulsar.subscription")))); + assertEquals(hasSubscriptionAttr, true); + } + + @Test + public void testReceiveQueueDrainToZero() { + consumerMetrics.recordMessagePrefetched(100); + consumerMetrics.recordMessagePrefetched(200); + consumerMetrics.recordMessagePrefetched(300); + + consumerMetrics.recordMessageReceived(100); + consumerMetrics.recordMessageReceived(200); + consumerMetrics.recordMessageReceived(300); + + Collection metrics = metricReader.collectAllMetrics(); + + // receive queue drained to zero + assertLongSumMetric(metrics, "pulsar.client.consumer.receive_queue.count", 0); + assertLongSumMetric(metrics, "pulsar.client.consumer.receive_queue.size", 0); + // 3 messages (600 bytes) delivered to application + assertLongSumMetric(metrics, "pulsar.client.consumer.message.received.count", 3); + assertLongSumMetric(metrics, "pulsar.client.consumer.message.received.size", 600); + } + + private MetricData findMetric(Collection metrics, String name) { + return metrics.stream() + .filter(m -> m.getName().equals(name)) + .findFirst() + .orElse(null); + } + + private void assertLongSumMetric(Collection metrics, String name, long expectedValue) { + MetricData metric = findMetric(metrics, name); + assertNotNull(metric, "Metric should exist: " + name); + long actual = metric.getLongSumData().getPoints().stream() + .mapToLong(LongPointData::getValue).sum(); + assertEquals(actual, expectedValue, "Unexpected metric value: " + name); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ProducerMetricsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ProducerMetricsTest.java new file mode 100644 index 0000000000000..8e74e828d24ea --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ProducerMetricsTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.metrics; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class ProducerMetricsTest { + + private InMemoryMetricReader metricReader; + private InstrumentProvider instrumentProvider; + private ProducerMetrics producerMetrics; + + @BeforeMethod + public void setup() { + metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = SdkMeterProvider.builder() + .registerMetricReader(metricReader) + .build(); + OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() + .setMeterProvider(meterProvider) + .build(); + instrumentProvider = new InstrumentProvider(openTelemetry); + producerMetrics = new ProducerMetrics(instrumentProvider, "persistent://public/default/test-topic"); + } + + @Test + public void testRecordProducerOpenedAndClosed() { + producerMetrics.recordProducerOpened(); + producerMetrics.recordProducerOpened(); + producerMetrics.recordProducerClosed(); + + Collection metrics = metricReader.collectAllMetrics(); + + assertLongSumMetric(metrics, "pulsar.client.producer.opened", 2); + assertLongSumMetric(metrics, "pulsar.client.producer.closed", 1); + } + + @Test + public void testRecordPendingMessage() { + producerMetrics.recordPendingMessage(100); + producerMetrics.recordPendingMessage(200); + + Collection metrics = metricReader.collectAllMetrics(); + + // 2 messages pending, 300 bytes pending + assertLongSumMetric(metrics, "pulsar.client.producer.message.pending.count", 2); + assertLongSumMetric(metrics, "pulsar.client.producer.message.pending.size", 300); + } + + @Test + public void testRecordSendSuccess() { + producerMetrics.recordPendingMessage(512); + producerMetrics.recordPendingMessage(256); + + long latencyNanos = TimeUnit.MILLISECONDS.toNanos(5); + producerMetrics.recordSendSuccess(latencyNanos, 512); + + Collection metrics = metricReader.collectAllMetrics(); + + assertLongSumMetric(metrics, "pulsar.client.producer.message.pending.count", 1); + assertLongSumMetric(metrics, "pulsar.client.producer.message.pending.size", 256); + assertLongSumMetric(metrics, "pulsar.client.producer.message.send.size", 512); + assertHistogramCount(metrics, "pulsar.client.producer.message.send.duration", 1); + } + + @Test + public void testRecordSendFailed() { + producerMetrics.recordPendingMessage(128); + + long latencyNanos = TimeUnit.MILLISECONDS.toNanos(10); + producerMetrics.recordSendFailed(latencyNanos, 128); + + Collection metrics = metricReader.collectAllMetrics(); + + assertLongSumMetric(metrics, "pulsar.client.producer.message.pending.count", 0); + assertLongSumMetric(metrics, "pulsar.client.producer.message.pending.size", 0); + assertNull(findMetric(metrics, "pulsar.client.producer.message.send.size"), + "send.size should not be recorded on failure"); + assertHistogramCount(metrics, "pulsar.client.producer.message.send.duration", 1); + } + + @Test + public void testRpcLatencyHistogramIsAccessible() { + assertNotNull(producerMetrics.rpcLatencyHistogram); + + producerMetrics.rpcLatencyHistogram.recordSuccess(TimeUnit.MILLISECONDS.toNanos(3)); + producerMetrics.rpcLatencyHistogram.recordFailure(TimeUnit.MILLISECONDS.toNanos(1)); + + Collection metrics = metricReader.collectAllMetrics(); + assertHistogramCount(metrics, "pulsar.client.producer.rpc.send.duration", 2); + } + + private MetricData findMetric(Collection metrics, String name) { + return metrics.stream() + .filter(m -> m.getName().equals(name)) + .findFirst() + .orElse(null); + } + + private void assertLongSumMetric(Collection metrics, String name, long expectedValue) { + MetricData metric = findMetric(metrics, name); + assertNotNull(metric, "Metric should exist: " + name); + long actual = metric.getLongSumData().getPoints().stream() + .mapToLong(LongPointData::getValue).sum(); + assertEquals(actual, expectedValue, "Unexpected metric value: " + name); + } + + private void assertHistogramCount(Collection metrics, String name, long expectedCount) { + MetricData metric = findMetric(metrics, name); + assertNotNull(metric, "Histogram metric should exist: " + name); + long actual = metric.getHistogramData().getPoints().stream() + .mapToLong(HistogramPointData::getCount).sum(); + assertEquals(actual, expectedCount, "Unexpected histogram count: " + name); + } +} From a482e53fee2179ab8cae3bafbab2bb148e4100bb Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Wed, 11 Mar 2026 12:46:34 +0800 Subject: [PATCH 2/8] fix test --- .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 3 +-- .../apache/pulsar/client/impl/metrics/ProducerMetrics.java | 6 +++++- .../pulsar/client/impl/metrics/ProducerMetricsTest.java | 6 +++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index ec2e7e3bee6c9..4f255d835bd40 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -184,7 +184,6 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private boolean errorState; private final ProducerMetrics producerMetrics; - // rpcLatencyHistogram 需要传递给 OpSendMsg,保留包级别访问 final LatencyHistogram rpcLatencyHistogram; private final boolean pauseSendingToPreservePublishOrderOnSchemaRegFailure; // This variable can be exposed as a metrics in the future, a PIP is needed. @@ -290,7 +289,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration InstrumentProvider ip = client.instrumentProvider(); producerMetrics = new ProducerMetrics(ip, topic); - rpcLatencyHistogram = producerMetrics.rpcLatencyHistogram; + rpcLatencyHistogram = producerMetrics.getRpcLatencyHistogram(); pendingQueueFullCounter = new AtomicInteger(); this.connectionHandler = initConnectionHandler(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java index 5ec7d4dbc1156..aab9dbb53001e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java @@ -23,7 +23,7 @@ public class ProducerMetrics { private final LatencyHistogram sendLatencyHistogram; - public final LatencyHistogram rpcLatencyHistogram; + private final LatencyHistogram rpcLatencyHistogram; private final Counter publishedBytesCounter; private final UpDownCounter pendingMessagesUpDownCounter; private final UpDownCounter pendingBytesUpDownCounter; @@ -92,4 +92,8 @@ public void recordProducerOpened() { public void recordProducerClosed() { producersClosedCounter.increment(); } + + public LatencyHistogram getRpcLatencyHistogram() { + return rpcLatencyHistogram; + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ProducerMetricsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ProducerMetricsTest.java index 8e74e828d24ea..a67d785ab7a16 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ProducerMetricsTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ProducerMetricsTest.java @@ -110,10 +110,10 @@ public void testRecordSendFailed() { @Test public void testRpcLatencyHistogramIsAccessible() { - assertNotNull(producerMetrics.rpcLatencyHistogram); + assertNotNull(producerMetrics.getRpcLatencyHistogram()); - producerMetrics.rpcLatencyHistogram.recordSuccess(TimeUnit.MILLISECONDS.toNanos(3)); - producerMetrics.rpcLatencyHistogram.recordFailure(TimeUnit.MILLISECONDS.toNanos(1)); + producerMetrics.getRpcLatencyHistogram().recordSuccess(TimeUnit.MILLISECONDS.toNanos(3)); + producerMetrics.getRpcLatencyHistogram().recordFailure(TimeUnit.MILLISECONDS.toNanos(1)); Collection metrics = metricReader.collectAllMetrics(); assertHistogramCount(metrics, "pulsar.client.producer.rpc.send.duration", 2); From d2f115c6612cc15591524d4a4b46b52da8261908 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Wed, 11 Mar 2026 13:00:50 +0800 Subject: [PATCH 3/8] fix test --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 1 - .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 1 - 2 files changed, 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b2f72c7ec1809..a3ce56f509159 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -34,7 +34,6 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.Timeout; import io.netty.util.concurrent.FastThreadLocal; - import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 4f255d835bd40..7097b703bd31e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -44,7 +44,6 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.ScheduledFuture; -import io.opentelemetry.api.common.Attributes; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; From 893ee57a459d0ca519681a7218724ff61e6e1c02 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Wed, 11 Mar 2026 17:14:15 +0800 Subject: [PATCH 4/8] opti method name and log print --- .../impl/BatchMessageContainerImpl.java | 4 +-- .../pulsar/client/impl/ConsumerImpl.java | 2 +- .../pulsar/client/impl/ProducerImpl.java | 31 +++++++++---------- .../client/impl/metrics/ConsumerMetrics.java | 2 +- .../client/impl/metrics/ProducerMetrics.java | 16 ++++++---- 5 files changed, 29 insertions(+), 26 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 57946bd7a699e..88c94c0a03be4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -278,7 +278,7 @@ public OpSendMsg createOpSendMsg() throws IOException { // Because when invoke `ProducerImpl.processOpSendMsg` on flush, // if `op.msg != null && isBatchMessagingEnabled()` checks true, it will call `batchMessageAndSend` to flush // messageContainers before publishing this one-batch message. - op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(), + op = OpSendMsg.create(producer.producerMetrics, messages, cmd, messageMetadata.getSequenceId(), firstCallback, batchAllocatedSizeBytes); // NumMessagesInBatch and BatchSizeByte will not be serialized to the binary cmd. It's just useful for the @@ -332,7 +332,7 @@ public OpSendMsg createOpSendMsg() throws IOException { messageMetadata.getUncompressedSize(), encryptedPayload.readableBytes()); } - OpSendMsg op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(), + OpSendMsg op = OpSendMsg.create(producer.producerMetrics, messages, cmd, messageMetadata.getSequenceId(), messageMetadata.getHighestSequenceId(), firstCallback, batchAllocatedSizeBytes); op.setNumMessagesInBatch(numMessagesInBatch); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index a3ce56f509159..ece1f98f3ae9d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -690,7 +690,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a copyMessageKeysIfNeeded(message, typedMessageBuilderNew); copyMessageEventTime(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { - consumerMetrics.recordDlq(); + consumerMetrics.recordDlqMessageSent(); doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> { result.complete(null); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 7097b703bd31e..0015e574801a3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -81,7 +81,6 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; -import org.apache.pulsar.client.impl.metrics.LatencyHistogram; import org.apache.pulsar.client.impl.metrics.ProducerMetrics; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.SchemaUtils; @@ -182,8 +181,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private boolean errorState; - private final ProducerMetrics producerMetrics; - final LatencyHistogram rpcLatencyHistogram; + final ProducerMetrics producerMetrics; private final boolean pauseSendingToPreservePublishOrderOnSchemaRegFailure; // This variable can be exposed as a metrics in the future, a PIP is needed. private final AtomicInteger pendingQueueFullCounter; @@ -288,7 +286,6 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration InstrumentProvider ip = client.instrumentProvider(); producerMetrics = new ProducerMetrics(ip, topic); - rpcLatencyHistogram = producerMetrics.getRpcLatencyHistogram(); pendingQueueFullCounter = new AtomicInteger(); this.connectionHandler = initConnectionHandler(); @@ -797,9 +794,11 @@ private void serializeAndSendMessage(MessageImpl msg, if (msg.getSchemaState() == MessageImpl.SchemaState.Ready) { ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, messageId, msgMetadata, encryptedPayload); - op = OpSendMsg.create(rpcLatencyHistogram, msg, cmd, sequenceId, callback); + op = OpSendMsg.create(producerMetrics, msg, cmd, sequenceId, callback); + } else { - op = OpSendMsg.create(rpcLatencyHistogram, msg, null, sequenceId, callback); + op = OpSendMsg.create(producerMetrics, msg, null, sequenceId, callback); + final MessageMetadata finalMsgMetadata = msgMetadata; op.rePopulate = () -> { if (msgMetadata.hasChunkId()) { @@ -1549,7 +1548,7 @@ public ReferenceCounted touch(Object hint) { } protected static final class OpSendMsg { - LatencyHistogram rpcLatencyHistogram; + ProducerMetrics producerMetrics; MessageImpl msg; List> msgs; ByteBufPair cmd; @@ -1569,7 +1568,7 @@ protected static final class OpSendMsg { int chunkId = -1; void initialize() { - rpcLatencyHistogram = null; + producerMetrics = null; msg = null; msgs = null; cmd = null; @@ -1589,11 +1588,11 @@ void initialize() { chunkedMessageCtx = null; } - static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, MessageImpl msg, ByteBufPair cmd, + static OpSendMsg create(ProducerMetrics producerMetrics, MessageImpl msg, ByteBufPair cmd, long sequenceId, SendCallback callback) { OpSendMsg op = RECYCLER.get(); op.initialize(); - op.rpcLatencyHistogram = rpcLatencyHistogram; + op.producerMetrics = producerMetrics; op.msg = msg; op.cmd = cmd; op.callback = callback; @@ -1603,11 +1602,11 @@ static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, MessageImpl msg return op; } - static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, List> msgs, ByteBufPair cmd, + static OpSendMsg create(ProducerMetrics producerMetrics, List> msgs, ByteBufPair cmd, long sequenceId, SendCallback callback, int batchAllocatedSize) { OpSendMsg op = RECYCLER.get(); op.initialize(); - op.rpcLatencyHistogram = rpcLatencyHistogram; + op.producerMetrics = producerMetrics; op.msgs = msgs; op.cmd = cmd; op.callback = callback; @@ -1621,12 +1620,12 @@ static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, List> msgs, ByteBufPair cmd, + static OpSendMsg create(ProducerMetrics producerMetrics, List> msgs, ByteBufPair cmd, long lowestSequenceId, long highestSequenceId, SendCallback callback, int batchAllocatedSize) { OpSendMsg op = RECYCLER.get(); op.initialize(); - op.rpcLatencyHistogram = rpcLatencyHistogram; + op.producerMetrics = producerMetrics; op.msgs = msgs; op.cmd = cmd; op.callback = callback; @@ -1678,9 +1677,9 @@ void sendComplete(final Exception e) { } if (e == null) { - rpcLatencyHistogram.recordSuccess(now - this.lastSentAt); + producerMetrics.recordRpcLatencySuccess(now - this.lastSentAt); } else { - rpcLatencyHistogram.recordFailure(now - this.lastSentAt); + producerMetrics.recordRpcLatencyFailure(now - this.lastSentAt); } OpSendMsgStats opSendMsgStats = OpSendMsgStatsImpl.builder() diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java index 708dbb48faa77..d2380a8879881 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java @@ -75,7 +75,7 @@ public void recordNack() { consumerNacksCounter.increment(); } - public void recordDlq() { + public void recordDlqMessageSent() { consumerDlqMessagesCounter.increment(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java index aab9dbb53001e..83619c7770984 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java @@ -38,7 +38,7 @@ public ProducerMetrics(InstrumentProvider ip, String topic) { rpcLatencyHistogram = ip.newLatencyHistogram( "pulsar.client.producer.rpc.send.duration", - "Publish RPC latency experienced internally by the client when sending data to receiving an ack", + "Publish RPC latency experienced internally by the client when sending data and receiving an ack", topic, Attributes.empty()); publishedBytesCounter = ip.newCounter( @@ -53,7 +53,7 @@ public ProducerMetrics(InstrumentProvider ip, String topic) { pendingBytesUpDownCounter = ip.newUpDownCounter( "pulsar.client.producer.message.pending.size", Unit.Bytes, - "The size of the messages in the producer internal queue, waiting to sent", + "The size of the messages in the producer internal queue, waiting to be sent", topic, Attributes.empty()); producersOpenedCounter = ip.newCounter( @@ -85,6 +85,14 @@ public void recordSendFailed(long latencyNanos, int msgSize) { sendLatencyHistogram.recordFailure(latencyNanos); } + public void recordRpcLatencySuccess(long latencyNanos) { + rpcLatencyHistogram.recordSuccess(latencyNanos); + } + + public void recordRpcLatencyFailure(long latencyNanos) { + rpcLatencyHistogram.recordFailure(latencyNanos); + } + public void recordProducerOpened() { producersOpenedCounter.increment(); } @@ -92,8 +100,4 @@ public void recordProducerOpened() { public void recordProducerClosed() { producersClosedCounter.increment(); } - - public LatencyHistogram getRpcLatencyHistogram() { - return rpcLatencyHistogram; - } } From 59006b9b4e682be600917caab933735ea420bc30 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Wed, 11 Mar 2026 17:20:05 +0800 Subject: [PATCH 5/8] fix test --- .../pulsar/client/impl/metrics/ConsumerMetricsTest.java | 2 +- .../pulsar/client/impl/metrics/ProducerMetricsTest.java | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ConsumerMetricsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ConsumerMetricsTest.java index c2ea2afd5f60f..e0ebb77624248 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ConsumerMetricsTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ConsumerMetricsTest.java @@ -114,7 +114,7 @@ public void testRecordNack() { @Test public void testRecordDlq() { - consumerMetrics.recordDlq(); + consumerMetrics.recordDlqMessageSent(); Collection metrics = metricReader.collectAllMetrics(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ProducerMetricsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ProducerMetricsTest.java index a67d785ab7a16..3c32ec87e41c1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ProducerMetricsTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/ProducerMetricsTest.java @@ -109,11 +109,9 @@ public void testRecordSendFailed() { } @Test - public void testRpcLatencyHistogramIsAccessible() { - assertNotNull(producerMetrics.getRpcLatencyHistogram()); - - producerMetrics.getRpcLatencyHistogram().recordSuccess(TimeUnit.MILLISECONDS.toNanos(3)); - producerMetrics.getRpcLatencyHistogram().recordFailure(TimeUnit.MILLISECONDS.toNanos(1)); + public void testRpcLatency() { + producerMetrics.recordRpcLatencySuccess(TimeUnit.MILLISECONDS.toNanos(3)); + producerMetrics.recordRpcLatencyFailure(TimeUnit.MILLISECONDS.toNanos(1)); Collection metrics = metricReader.collectAllMetrics(); assertHistogramCount(metrics, "pulsar.client.producer.rpc.send.duration", 2); From 46bd0c05f5ba23f565705c88762d5074a1f064f2 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Wed, 11 Mar 2026 17:26:34 +0800 Subject: [PATCH 6/8] fix test --- .../org/apache/pulsar/client/impl/OpSendMsgQueueTest.java | 4 ++-- .../org/apache/pulsar/client/impl/ProducerImplTest.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java index 28787568dd1d5..d466abbf413b1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java @@ -24,7 +24,7 @@ import com.google.common.collect.Lists; import java.util.Arrays; import java.util.Iterator; -import org.apache.pulsar.client.impl.metrics.LatencyHistogram; +import org.apache.pulsar.client.impl.metrics.ProducerMetrics; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -41,7 +41,7 @@ public void createMockMessage() { } private ProducerImpl.OpSendMsg createDummyOpSendMsg() { - return ProducerImpl.OpSendMsg.create(LatencyHistogram.NOOP, message, null, 0L, null); + return ProducerImpl.OpSendMsg.create((ProducerMetrics) null, message, null, 0L, null); } @Test diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java index 0fe177b775cbd..c9315d00e31fb 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java @@ -29,7 +29,7 @@ import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.metrics.LatencyHistogram; +import org.apache.pulsar.client.impl.metrics.ProducerMetrics; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.ByteBufPair; import org.mockito.Mockito; @@ -46,7 +46,7 @@ public void testChunkedMessageCtxDeallocate() { for (int i = 0; i < totalChunks; i++) { ProducerImpl.OpSendMsg opSendMsg = ProducerImpl.OpSendMsg.create( - LatencyHistogram.NOOP, + (ProducerMetrics) null, MessageImpl.create(new MessageMetadata(), ByteBuffer.allocate(0), Schema.STRING, null), null, 0, null); opSendMsg.chunkedMessageCtx = ctx; @@ -100,7 +100,7 @@ public void testFailPendingMessagesSyncRetry() MessageImpl msg = Mockito.mock(MessageImpl.class); Mockito.when(msg.getUncompressedSize()).thenReturn(10); ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create( - Mockito.mock(LatencyHistogram.class), + (ProducerMetrics) null, msg, Mockito.mock(ByteBufPair.class), 1L, @@ -118,7 +118,7 @@ public void testFailPendingMessagesSyncRetry() Mockito.doAnswer(invocation -> { // Reentrant retry during callback ProducerImpl.OpSendMsg retryOp = ProducerImpl.OpSendMsg.create( - Mockito.mock(LatencyHistogram.class), + (ProducerMetrics) null, retryMsg, Mockito.mock(ByteBufPair.class), 2L, From 38c3355304b63a464148d778092cce20cc12b0bb Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Wed, 11 Mar 2026 23:48:56 +0800 Subject: [PATCH 7/8] opti test and add javadoc --- .../pulsar/client/impl/metrics/ConsumerMetrics.java | 5 +++++ .../pulsar/client/impl/metrics/ProducerMetrics.java | 5 +++++ .../apache/pulsar/client/impl/OpSendMsgQueueTest.java | 6 +++++- .../apache/pulsar/client/impl/ProducerImplTest.java | 10 +++++++--- 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java index d2380a8879881..ee02bc25478bb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java @@ -20,6 +20,11 @@ import io.opentelemetry.api.common.Attributes; +/** + * Encapsulates OpenTelemetry metrics for a Pulsar consumer, tracking message receives, + * prefetch queue size, acknowledgements, negative acknowledgements, dead-letter queue + * messages, and consumer session open/close events. + */ public class ConsumerMetrics { private final Counter messagesReceivedCounter; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java index 83619c7770984..09a8ac4f6a426 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java @@ -20,6 +20,11 @@ import io.opentelemetry.api.common.Attributes; +/** + * Encapsulates OpenTelemetry metrics for a Pulsar producer, tracking message send latency, + * RPC latency, published bytes, pending queue message count and size, and producer session + * open/close events. + */ public class ProducerMetrics { private final LatencyHistogram sendLatencyHistogram; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java index d466abbf413b1..f06820b9d8e76 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import java.util.Arrays; import java.util.Iterator; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.impl.metrics.ProducerMetrics; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -32,6 +33,9 @@ * Contains unit tests for ProducerImpl.OpSendMsgQueue inner class. */ public class OpSendMsgQueueTest { + private static final ProducerMetrics NOOP_PRODUCER_METRICS = + new ProducerMetrics(InstrumentProvider.NOOP, "test-topic"); + MessageImpl message; @BeforeClass @@ -41,7 +45,7 @@ public void createMockMessage() { } private ProducerImpl.OpSendMsg createDummyOpSendMsg() { - return ProducerImpl.OpSendMsg.create((ProducerMetrics) null, message, null, 0L, null); + return ProducerImpl.OpSendMsg.create(NOOP_PRODUCER_METRICS, message, null, 0L, null); } @Test diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java index c9315d00e31fb..8096fb333e285 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java @@ -29,6 +29,7 @@ import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.impl.metrics.ProducerMetrics; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.ByteBufPair; @@ -36,6 +37,9 @@ import org.testng.annotations.Test; public class ProducerImplTest { + private static final ProducerMetrics NOOP_PRODUCER_METRICS = + new ProducerMetrics(InstrumentProvider.NOOP, "test-topic"); + @Test public void testChunkedMessageCtxDeallocate() { int totalChunks = 3; @@ -46,7 +50,7 @@ public void testChunkedMessageCtxDeallocate() { for (int i = 0; i < totalChunks; i++) { ProducerImpl.OpSendMsg opSendMsg = ProducerImpl.OpSendMsg.create( - (ProducerMetrics) null, + NOOP_PRODUCER_METRICS, MessageImpl.create(new MessageMetadata(), ByteBuffer.allocate(0), Schema.STRING, null), null, 0, null); opSendMsg.chunkedMessageCtx = ctx; @@ -100,7 +104,7 @@ public void testFailPendingMessagesSyncRetry() MessageImpl msg = Mockito.mock(MessageImpl.class); Mockito.when(msg.getUncompressedSize()).thenReturn(10); ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create( - (ProducerMetrics) null, + NOOP_PRODUCER_METRICS, msg, Mockito.mock(ByteBufPair.class), 1L, @@ -118,7 +122,7 @@ public void testFailPendingMessagesSyncRetry() Mockito.doAnswer(invocation -> { // Reentrant retry during callback ProducerImpl.OpSendMsg retryOp = ProducerImpl.OpSendMsg.create( - (ProducerMetrics) null, + NOOP_PRODUCER_METRICS, retryMsg, Mockito.mock(ByteBufPair.class), 2L, From df550117cd3f8be884dc25b95c7a44746525ca04 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Wed, 25 Mar 2026 14:32:49 +0800 Subject: [PATCH 8/8] opti test and add javadoc --- .../org/apache/pulsar/client/impl/MessageChecksumTest.java | 4 ++-- .../org/apache/pulsar/client/impl/MessageChunkingTest.java | 4 ++-- .../apache/pulsar/client/impl/metrics/ProducerMetrics.java | 3 +++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java index 61e09664ea1e9..4ee6dded50cdc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java @@ -36,7 +36,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg; -import org.apache.pulsar.client.impl.metrics.LatencyHistogram; +import org.apache.pulsar.client.impl.metrics.ProducerMetrics; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.protocol.ByteBufPair; @@ -193,7 +193,7 @@ public void testTamperingMessageIsDetected() throws Exception { // WHEN // protocol message is created with checksum ByteBufPair cmd = Commands.newSend(1, 1, 1, ChecksumType.Crc32c, msgMetadata, payload); - OpSendMsg op = OpSendMsg.create(LatencyHistogram.NOOP, + OpSendMsg op = OpSendMsg.create(ProducerMetrics.NOOP, (MessageImpl) msgBuilder.getMessage(), cmd, 1, null); // THEN diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java index abe92d062544c..fa443d66850eb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java @@ -56,7 +56,7 @@ import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.impl.MessageImpl.SchemaState; import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg; -import org.apache.pulsar.client.impl.metrics.LatencyHistogram; +import org.apache.pulsar.client.impl.metrics.ProducerMetrics; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.PublisherStats; import org.apache.pulsar.common.protocol.ByteBufPair; @@ -465,7 +465,7 @@ public void testExpireIncompleteChunkMessage() throws Exception{ ByteBufPair cmd = Commands.newSend(producerId, 1, 1, ChecksumType.Crc32c, msgMetadata, payload); MessageImpl msgImpl = ((MessageImpl) msg.getMessage()); msgImpl.setSchemaState(SchemaState.Ready); - OpSendMsg op = OpSendMsg.create(LatencyHistogram.NOOP, msgImpl, cmd, 1, null); + OpSendMsg op = OpSendMsg.create(ProducerMetrics.NOOP, msgImpl, cmd, 1, null); producer.processOpSendMsg(op); retryStrategically((test) -> { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java index 09a8ac4f6a426..19bcb2137f3c6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java @@ -27,6 +27,9 @@ */ public class ProducerMetrics { + // Used for tests + public static final ProducerMetrics NOOP = new ProducerMetrics(InstrumentProvider.NOOP, null); + private final LatencyHistogram sendLatencyHistogram; private final LatencyHistogram rpcLatencyHistogram; private final Counter publishedBytesCounter;