Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.client.impl.metrics.ProducerMetrics;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.protocol.ByteBufPair;
Expand Down Expand Up @@ -193,7 +193,7 @@ public void testTamperingMessageIsDetected() throws Exception {
// WHEN
// protocol message is created with checksum
ByteBufPair cmd = Commands.newSend(1, 1, 1, ChecksumType.Crc32c, msgMetadata, payload);
OpSendMsg op = OpSendMsg.create(LatencyHistogram.NOOP,
OpSendMsg op = OpSendMsg.create(ProducerMetrics.NOOP,
(MessageImpl<byte[]>) msgBuilder.getMessage(), cmd, 1, null);

// THEN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.client.impl.metrics.ProducerMetrics;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.protocol.ByteBufPair;
Expand Down Expand Up @@ -465,7 +465,7 @@ public void testExpireIncompleteChunkMessage() throws Exception{
ByteBufPair cmd = Commands.newSend(producerId, 1, 1, ChecksumType.Crc32c, msgMetadata, payload);
MessageImpl msgImpl = ((MessageImpl<byte[]>) msg.getMessage());
msgImpl.setSchemaState(SchemaState.Ready);
OpSendMsg op = OpSendMsg.create(LatencyHistogram.NOOP, msgImpl, cmd, 1, null);
OpSendMsg op = OpSendMsg.create(ProducerMetrics.NOOP, msgImpl, cmd, 1, null);
producer.processOpSendMsg(op);

retryStrategically((test) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public OpSendMsg createOpSendMsg() throws IOException {
// Because when invoke `ProducerImpl.processOpSendMsg` on flush,
// if `op.msg != null && isBatchMessagingEnabled()` checks true, it will call `batchMessageAndSend` to flush
// messageContainers before publishing this one-batch message.
op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(),
op = OpSendMsg.create(producer.producerMetrics, messages, cmd, messageMetadata.getSequenceId(),
firstCallback, batchAllocatedSizeBytes);

// NumMessagesInBatch and BatchSizeByte will not be serialized to the binary cmd. It's just useful for the
Expand Down Expand Up @@ -332,7 +332,7 @@ public OpSendMsg createOpSendMsg() throws IOException {
messageMetadata.getUncompressedSize(), encryptedPayload.readableBytes());
}

OpSendMsg op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(),
OpSendMsg op = OpSendMsg.create(producer.producerMetrics, messages, cmd, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), firstCallback, batchAllocatedSizeBytes);

op.setNumMessagesInBatch(numMessagesInBatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.concurrent.FastThreadLocal;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -98,10 +97,8 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.metrics.Counter;
import org.apache.pulsar.client.impl.metrics.ConsumerMetrics;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.impl.metrics.Unit;
import org.apache.pulsar.client.impl.metrics.UpDownCounter;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
Expand Down Expand Up @@ -228,16 +225,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final boolean createTopicIfDoesNotExist;
private final boolean poolMessages;

private final Counter messagesReceivedCounter;
private final Counter bytesReceivedCounter;
private final UpDownCounter messagesPrefetchedGauge;
private final UpDownCounter bytesPrefetchedGauge;
private final Counter consumersOpenedCounter;
private final Counter consumersClosedCounter;
private final Counter consumerAcksCounter;
private final Counter consumerNacksCounter;

private final Counter consumerDlqMessagesCounter;
private final ConsumerMetrics consumerMetrics;

private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final AtomicInteger previousExceptionCount = new AtomicInteger();
Expand Down Expand Up @@ -421,29 +409,10 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
topicNameWithoutPartition = topicName.getPartitionedTopicName();

InstrumentProvider ip = client.instrumentProvider();
Attributes attrs = Attributes.builder().put("pulsar.subscription", subscription).build();
consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
"The number of consumer sessions opened", topic, attrs);
consumersClosedCounter = ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions,
"The number of consumer sessions closed", topic, attrs);
messagesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.count", Unit.Messages,
"The number of messages explicitly received by the consumer application", topic, attrs);
bytesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.size", Unit.Bytes,
"The number of bytes explicitly received by the consumer application", topic, attrs);
messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.count", Unit.Messages,
"The number of messages currently sitting in the consumer receive queue", topic, attrs);
bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.size", Unit.Bytes,
"The total size in bytes of messages currently sitting in the consumer receive queue", topic, attrs);

consumerAcksCounter = ip.newCounter("pulsar.client.consumer.message.ack", Unit.Messages,
"The number of acknowledged messages", topic, attrs);
consumerNacksCounter = ip.newCounter("pulsar.client.consumer.message.nack", Unit.Messages,
"The number of negatively acknowledged messages", topic, attrs);
consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages,
"The number of messages sent to DLQ", topic, attrs);
consumerMetrics = new ConsumerMetrics(ip, topic, subscription);
grabCnx();

consumersOpenedCounter.increment();
consumerMetrics.recordConsumerOpened();
}

public ConnectionHandler getConnectionHandler() {
Expand Down Expand Up @@ -607,7 +576,7 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
Map<String, Long> properties,
TransactionImpl txn) {
consumerAcksCounter.increment();
consumerMetrics.recordAck();

if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
Expand All @@ -630,7 +599,7 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
@Override
protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, AckType ackType,
Map<String, Long> properties, TransactionImpl txn) {
consumerAcksCounter.increment();
consumerMetrics.recordAck();

if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
Expand Down Expand Up @@ -721,7 +690,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
copyMessageEventTime(message, typedMessageBuilderNew);
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
consumerDlqMessagesCounter.increment();
consumerMetrics.recordDlqMessageSent();

doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> {
result.complete(null);
Expand Down Expand Up @@ -836,7 +805,7 @@ private static void copyMessageEventTime(Message<?> message,

@Override
public void negativeAcknowledge(MessageId messageId) {
consumerNacksCounter.increment();
consumerMetrics.recordNack();
negativeAcksTracker.add(messageId);

// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
Expand All @@ -845,7 +814,7 @@ public void negativeAcknowledge(MessageId messageId) {

@Override
public void negativeAcknowledge(Message<?> message) {
consumerNacksCounter.increment();
consumerMetrics.recordNack();
negativeAcksTracker.add(message);

// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
Expand Down Expand Up @@ -1186,7 +1155,7 @@ public synchronized CompletableFuture<Void> closeAsync() {
return compositeCloseFuture;
}

consumersClosedCounter.increment();
consumerMetrics.recordConsumerClosed();

if (!isConnected()) {
log.info("[{}] [{}] Closed Consumer (not connected)", topic, subscription);
Expand Down Expand Up @@ -1370,8 +1339,7 @@ protected <V> MessageImpl<V> newMessage(final MessageIdImpl messageId,
}

private void executeNotifyCallback(final MessageImpl<T> message) {
messagesPrefetchedGauge.increment();
bytesPrefetchedGauge.add(message.size());
consumerMetrics.recordMessagePrefetched(message.size());

// Enqueue the message so that it can be retrieved when application calls receive()
// if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
Expand Down Expand Up @@ -1870,11 +1838,7 @@ protected synchronized void messageProcessed(Message<?> msg) {
ClientCnx msgCnx = ((MessageImpl<?>) msg).getCnx();
lastDequeuedMessageId = msg.getMessageId();

messagesPrefetchedGauge.decrement();
messagesReceivedCounter.increment();

bytesPrefetchedGauge.subtract(msg.size());
bytesReceivedCounter.add(msg.size());
consumerMetrics.recordMessageReceived(msg.size());

if (msgCnx != currentCnx) {
// The processed message did belong to the old queue that was cleared after reconnection.
Expand Down
Loading