Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
Expand Down Expand Up @@ -886,30 +885,31 @@ public void testMsgDropStat() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(threads);
byte[] msgData = "testData".getBytes();

NonPersistentTopic topic =
(NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

/*
* Trigger at least one publisher drop through concurrent send() calls.
* Send concurrent bursts until publisher AND subscription drop rates are all > 0.
*
* Each burst uses a CyclicBarrier so all threads send simultaneously. With
* maxConcurrentNonPersistentMessagePerConnection = 0, ServerCnx drops overlapping
* sends (publisher drops). Once subscriber queues (size 1) are full, the dispatcher
* also drops delivered messages (subscription drops).
*
* Uses CyclicBarrier to ensure all threads send simultaneously, creating overlap.
* With maxConcurrentNonPersistentMessagePerConnection = 0, ServerCnx#handleSend
* drops any send while another is in-flight, returning MessageId with entryId = -1.
* Awaitility repeats whole bursts (bounded to 20s) until a drop is observed.
* IMPORTANT: updateRates() calls Rate.calculateRate() which resets counters via
* sumThenReset(). We must keep sending fresh bursts so each updateRates() call
* sees new drops, rather than retrying with stale (reset) counters.
*/
AtomicBoolean publisherDropSeen = new AtomicBoolean(false);
Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofMillis(100)).until(() -> {
CyclicBarrier barrier = new CyclicBarrier(threads);
CountDownLatch completionLatch = new CountDownLatch(threads);
AtomicReference<Throwable> error = new AtomicReference<>();
publisherDropSeen.set(false);

for (int i = 0; i < threads; i++) {
executor.submit(() -> {
try {
barrier.await();
MessageId msgId = producer.send(msgData);
// Publisher drop is signaled by MessageIdImpl.entryId == -1
if (msgId instanceof MessageIdImpl && ((MessageIdImpl) msgId).getEntryId() == -1) {
publisherDropSeen.set(true);
}
producer.send(msgData);
} catch (Throwable t) {
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
Expand All @@ -921,27 +921,23 @@ public void testMsgDropStat() throws Exception {
});
}

// Wait for all sends to complete.
assertTrue(completionLatch.await(20, TimeUnit.SECONDS));

assertNull(error.get(), "Concurrent send encountered an exception");
return publisherDropSeen.get();
});

assertTrue(publisherDropSeen.get(), "Expected at least one publisher drop (entryId == -1)");

NonPersistentTopic topic =
(NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
completionLatch.await(20, TimeUnit.SECONDS);
if (error.get() != null) {
return false;
}

Awaitility.await().ignoreExceptions().untilAsserted(() -> {
pulsar.getBrokerService().updateRates();
NonPersistentTopicStats stats = topic.getStats(false, false, false);
if (stats.getPublishers().isEmpty()) {
return false;
}
NonPersistentPublisherStats npStats = stats.getPublishers().get(0);
NonPersistentSubscriptionStats sub1Stats = stats.getSubscriptions().get("subscriber-1");
NonPersistentSubscriptionStats sub2Stats = stats.getSubscriptions().get("subscriber-2");
assertTrue(npStats.getMsgDropRate() > 0);
assertTrue(sub1Stats.getMsgDropRate() > 0);
assertTrue(sub2Stats.getMsgDropRate() > 0);
return sub1Stats != null && sub2Stats != null
&& npStats.getMsgDropRate() > 0
&& sub1Stats.getMsgDropRate() > 0
&& sub2Stats.getMsgDropRate() > 0;
});

} finally {
Expand Down
Loading