Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,9 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
return;
}

InFlightTask inFlightTask = (InFlightTask) ctx;
inFlightTask.setEntries(Collections.emptyList());

// Reduce read batch size to avoid flooding bookies with retries
readBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMinReadBatchSize();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -642,25 +640,40 @@ public void testReplicatePeekAndSkip() throws Exception {
@Test(timeOut = 30000)
public void testReplicatorClearBacklog() throws Exception {

// This test is to verify that reset cursor fails on global topic
SortedSet<String> testDests = new TreeSet<>();

final TopicName dest = TopicName
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic"));
testDests.add(dest.toString());

@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);

@Cleanup
MessageConsumer consumer1 = new MessageConsumer(url3, dest);

// Produce from cluster1 and consume from the rest
// Produce from cluster1 to trigger topic and replicator creation
producer1.produce(2);

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
PersistentReplicator replicator = (PersistentReplicator) spy(
topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0)));
replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators()
.get(topic.getReplicators().keySet().stream().toList().get(0));

// Wait until the first batch of messages is fully replicated (backlog = 0),
// so that disconnect() won't be rejected due to non-zero backlog
Awaitility.await().untilAsserted(() -> {
assertEquals(replicator.getNumberOfEntriesInBacklog(), 0);
});

// Disconnect replicator to stop geo-replication, so new messages will form backlog
pauseReplicator(replicator);

// Produce more messages while replication is paused, these will accumulate as backlog
producer1.produce(2);

// wait for backlog to accumulate
Awaitility.await().untilAsserted(() -> {
assertEquals(replicator.getNumberOfEntriesInBacklog(), 2);
});

// Clear the backlog
replicator.clearBacklog().get();
Thread.sleep(100);
replicator.updateRates(); // for code-coverage
Expand All @@ -672,25 +685,39 @@ public void testReplicatorClearBacklog() throws Exception {
@Test(timeOut = 30000)
public void testReplicatorExpireMsgAsync() throws Exception {

// This test is to verify that reset cursor fails on global topic
SortedSet<String> testDests = new TreeSet<>();

final TopicName dest = TopicName
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic"));
testDests.add(dest.toString());

@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);

@Cleanup
MessageConsumer consumer1 = new MessageConsumer(url3, dest);

// Produce from cluster1 and consume from the rest
// Produce from cluster1 to trigger topic and replicator creation
producer1.produce(2);

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
PersistentReplicator replicator = (PersistentReplicator) spy(
topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0)));
replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators()
.get(topic.getReplicators().keySet().stream().toList().get(0));

// Wait until the first batch of messages is fully replicated (backlog = 0),
// so that disconnect() won't be rejected due to non-zero backlog
Awaitility.await().untilAsserted(() -> {
assertEquals(replicator.getNumberOfEntriesInBacklog(), 0);
});

// Disconnect replicator to stop geo-replication, so new messages will form backlog
pauseReplicator(replicator);

// Produce more messages while replication is paused, these will accumulate as backlog
producer1.produce(2);

// wait for backlog to accumulate
Awaitility.await().untilAsserted(() -> {
assertEquals(replicator.getNumberOfEntriesInBacklog(), 2);
});

// Clear the backlog
replicator.clearBacklog().get();
Thread.sleep(100);
replicator.updateRates(); // for code-coverage
Expand Down Expand Up @@ -1653,7 +1680,7 @@ private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace,
@Test
public void testReplicatorWithFailedAck() throws Exception {

log.info("--- Starting ReplicatorTest::testReplication ---");
log.info("--- Starting ReplicatorTest::testReplicatorWithFailedAck ---");

String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");
admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
Expand Down Expand Up @@ -1686,14 +1713,28 @@ public void testReplicatorWithFailedAck() throws Exception {
}).when(spyCursor).asyncDelete(Mockito.any(Position.class), Mockito.any(AsyncCallbacks.DeleteCallback.class),
Mockito.any());

// Mock the readEntriesFailed scenario:
// Use AtomicBoolean to control whether to trigger read failure
// Initialized to true to ensure the first readMoreEntries after replicator startup is intercepted.
AtomicBoolean isMakeReadFail = new AtomicBoolean(true);
doAnswer(invocation -> {
if (isMakeReadFail.get()) {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(2);
Object ctx = invocation.getArgument(3);
log.info("asyncReadEntriesOrWait will be failed");
callback.readEntriesFailed(new ManagedLedgerException("Mocked read failure"), ctx);
return null;
} else {
log.info("asyncReadEntriesOrWait will proceed normally");
return invocation.callRealMethod();
}
}).when(spyCursor).asyncReadEntriesOrWait(Mockito.anyInt(), Mockito.anyLong(),
Mockito.any(AsyncCallbacks.ReadEntriesCallback.class), Mockito.any(), Mockito.any(Position.class));

Comment on lines +1716 to +1733
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add a new test case? These tests might not be the best possible example of good test style. If you are able to create a new test class with a different style and minimal mocking, that would be preferred (such as using pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor(...) to inject failures/delays).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this test case, mocking is mandatory: precise triggering of the failure scenario is needed to determine whether the problem has been resolved; otherwise, the system will continue running normally without ever exercising this code path.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean with "a precise triggering of the repair scenario is needed to determine if the problem has been resolved" exactly? What is the "precise triggering of the repair scenario"? What is required for triggering it?

If one of the challenges is to check logs in the test to validate behavior, it's possible to use org.apache.pulsar.utils.TestLogAppender for capturing logs in a test case.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need the test cases to call readEntriesFailed() to ensure that the changes are executed and to confirm that the tests are running as expected.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need the test cases to call readEntriesFailed() to ensure that the changes are executed and to confirm that the tests are running as expected.

That doesn't yet explain what triggers the problem in actual production code. With pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor(...) you can inject failures that would result in readEntriesFailed to be called. It's possible to then validate whether you can reproduce the issue by checking the logs for "Not scheduling read due to pending read or no permits." with TestLogAppender. This would be a true integration test that reproduces the actual problem. I would assume that Claude Code with Claude Opus 4.6 could achieve this just by providing the PR as context (instructed to fetch all PR details including comments) to create a representative test to reproduce the issue before creating a fix.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Copy Markdown
Author

@gosonzhang gosonzhang Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have reviewed your suggestion, but I still believe the current modification is more appropriate.

Although I was able to construct a getMockBookKeeper scenario to simulate exceptions while keeping the Pulsar dependencies unmocked, the required changes to the ReplicatorTestBase file are particularly extensive. From the perspective of addressing the issue, I think that absolutely avoiding mocking within Pulsar's logic deviates from the original intent of the test case.

This modification primarily addresses the issue where the PersistentReplicator.readEntriesFailed() implementation did not assign a value to its corresponding InFlightTask. This omission could affect the proper execution of geo-replication.

A Bookie exception is just one of many scenarios that could lead to the invocation of readEntriesFailed(). As long as some test case can simulate the execution of readEntriesFailed() and confirm that the modification is correct, that should be sufficient.

log.info("--- Starting producer --- " + url1);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"), false);
// Produce from cluster1 and consume from the rest
producer1.produce(2);

MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get();
Position lastPosition = PositionFactory.create(lastMessageId.getLedgerId(), lastMessageId.getEntryId());

// Wait for replicator to start
Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS)
.ignoreExceptions()
.untilAsserted(() -> {
Expand All @@ -1703,9 +1744,33 @@ public void testReplicatorWithFailedAck() throws Exception {
replicator.getState());
});

// Make sure all the data has replicated to the remote cluster before close the cursor.
Awaitility.await().untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), lastPosition));
// --- Test readEntriesFailed scenario ---
// isMakeReadFail is already true, replicator's readMoreEntries keeps failing
// Record current mark delete position
Position posBeforeReadFail = cursor.getMarkDeletedPosition();

// Produce messages; since reads keep failing, messages cannot be replicated
producer1.produce(2);

MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get();
Position lastPosition = PositionFactory.create(lastMessageId.getLedgerId(), lastMessageId.getEntryId());

// During 2 seconds of continuous read failure, mark delete position should not advance
Awaitility.await()
.during(2, TimeUnit.SECONDS)
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), posBeforeReadFail));

// Disable the read failure flag; replicator will read normally on retry, thus resuming replication
isMakeReadFail.set(false);

// Wait for replicator to recover from read failure and complete replication
// (mark delete catches up to the latest position)
Awaitility.await().timeout(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertEquals(cursor.getMarkDeletedPosition(), lastPosition);
});

// --- Test DeleteCallback scenario ---
isMakeAckFail.set(true);

producer1.produce(10);
Expand Down
Loading