diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 284e6a6892856..10919bd37c694 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -59,7 +59,6 @@ import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.impl.ConsumerImpl; -import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PartitionedProducerImpl; import org.apache.pulsar.client.impl.ProducerImpl; @@ -886,30 +885,31 @@ public void testMsgDropStat() throws Exception { ExecutorService executor = Executors.newFixedThreadPool(threads); byte[] msgData = "testData".getBytes(); + NonPersistentTopic topic = + (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); + /* - * Trigger at least one publisher drop through concurrent send() calls. + * Send concurrent bursts until publisher AND subscription drop rates are all > 0. + * + * Each burst uses a CyclicBarrier so all threads send simultaneously. With + * maxConcurrentNonPersistentMessagePerConnection = 0, ServerCnx drops overlapping + * sends (publisher drops). Once subscriber queues (size 1) are full, the dispatcher + * also drops delivered messages (subscription drops). * - * Uses CyclicBarrier to ensure all threads send simultaneously, creating overlap. - * With maxConcurrentNonPersistentMessagePerConnection = 0, ServerCnx#handleSend - * drops any send while another is in-flight, returning MessageId with entryId = -1. - * Awaitility repeats whole bursts (bounded to 20s) until a drop is observed. + * IMPORTANT: updateRates() calls Rate.calculateRate() which resets counters via + * sumThenReset(). We must keep sending fresh bursts so each updateRates() call + * sees new drops, rather than retrying with stale (reset) counters. */ - AtomicBoolean publisherDropSeen = new AtomicBoolean(false); - Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> { + Awaitility.await().atMost(Duration.ofSeconds(20)).pollInterval(Duration.ofMillis(100)).until(() -> { CyclicBarrier barrier = new CyclicBarrier(threads); CountDownLatch completionLatch = new CountDownLatch(threads); AtomicReference error = new AtomicReference<>(); - publisherDropSeen.set(false); for (int i = 0; i < threads; i++) { executor.submit(() -> { try { barrier.await(); - MessageId msgId = producer.send(msgData); - // Publisher drop is signaled by MessageIdImpl.entryId == -1 - if (msgId instanceof MessageIdImpl && ((MessageIdImpl) msgId).getEntryId() == -1) { - publisherDropSeen.set(true); - } + producer.send(msgData); } catch (Throwable t) { if (t instanceof InterruptedException) { Thread.currentThread().interrupt(); @@ -921,27 +921,23 @@ public void testMsgDropStat() throws Exception { }); } - // Wait for all sends to complete. - assertTrue(completionLatch.await(20, TimeUnit.SECONDS)); - - assertNull(error.get(), "Concurrent send encountered an exception"); - return publisherDropSeen.get(); - }); - - assertTrue(publisherDropSeen.get(), "Expected at least one publisher drop (entryId == -1)"); - - NonPersistentTopic topic = - (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); + completionLatch.await(20, TimeUnit.SECONDS); + if (error.get() != null) { + return false; + } - Awaitility.await().ignoreExceptions().untilAsserted(() -> { pulsar.getBrokerService().updateRates(); NonPersistentTopicStats stats = topic.getStats(false, false, false); + if (stats.getPublishers().isEmpty()) { + return false; + } NonPersistentPublisherStats npStats = stats.getPublishers().get(0); NonPersistentSubscriptionStats sub1Stats = stats.getSubscriptions().get("subscriber-1"); NonPersistentSubscriptionStats sub2Stats = stats.getSubscriptions().get("subscriber-2"); - assertTrue(npStats.getMsgDropRate() > 0); - assertTrue(sub1Stats.getMsgDropRate() > 0); - assertTrue(sub2Stats.getMsgDropRate() > 0); + return sub1Stats != null && sub2Stats != null + && npStats.getMsgDropRate() > 0 + && sub1Stats.getMsgDropRate() > 0 + && sub2Stats.getMsgDropRate() > 0; }); } finally {