[fix] [broker] Remove the repl cursor if the creation fails and the replication is turned off #20025
…e cluster is absent
  if (topicLevelPolicy) {
-     admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster));
+     admin.topics().setReplicationClusters(topicName, Arrays.asList(remoteCluster, "test"));
There was a problem hiding this comment.
Why did you add another cluster "test" to this test? Even without the change of PersistentTopic, this test could still pass.
There was a problem hiding this comment.
- If replication is disabled, clusters will contain one cluster: "test". See: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java#L55-L56
- If replication is enabled, clusters will contain two clusters: the local cluster and the remote cluster.
If the clusters are not set correctly, it can mislead others into thinking that clusters is an empty collection when replication is disabled.
There was a problem hiding this comment.
I think it's another topic about the difference determined by whether the replication is enabled. It does not affect this test.
There was a problem hiding this comment.
I think it's another topic about the difference determined by whether the replication is enabled. It does not affect this test.
Correct.
There was a problem hiding this comment.
So, it's better to revert this unrelated change.
There was a problem hiding this comment.
So @poorbarcode, is there any other reason to add the local cluster here? It might mislead readers into thinking that this change makes the test better.
There was a problem hiding this comment.
Reverted these changes.
It's still flaky.
Could you also change the title? The PR title duplicates that of #19972.
Awaitility.await().atMost(10, TimeUnit.SECONDS)
        .until(() -> {
            topic.initialize().get(3, TimeUnit.SECONDS);
There was a problem hiding this comment.
It's meaningless because topic.initialize might throw an exception and it won't be retried.
java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.pulsar.metadata.api.MetadataStoreException$NotFoundException: remote
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
at org.apache.pulsar.broker.service.persistent.PersistentTopicTest.lambda$testCreateTopicWithZombieReplicatorCursor$10(PersistentTopicTest.java:602)
at org.awaitility.core.CallableCondition$ConditionEvaluationWrapper.eval(CallableCondition.java:99)
There was a problem hiding this comment.
perhaps .ignoreExceptionsInstanceOf(MetadataStoreException.NotFoundException.class) could help?
There was a problem hiding this comment.
BTW, the root cause is not here. Even after I added a try-catch block to swallow the exception, the test could still fail.
try {
    topic.initialize().get(3, TimeUnit.SECONDS);
} catch (ExecutionException e) {
    log.warn("Failed to initialize: {}", e.getCause().getMessage());
}
There was a problem hiding this comment.
BTW, the root cause is not here. Even after I added a try-catch block to swallow the exception, the test could still fail.
The root cause is as follows:
- A replication task starts.
- It fails to get the cluster metadata from ZK.
- It checks the replication policies.
- Since the policy is updated asynchronously, the cache has not been updated yet.
- Getting the clusters returns [local_cluster, remote_cluster].
- Since the replication check succeeds, the cursor will not be deleted. Bingo.
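The stale-cache step in the list above can be sketched in plain Java. This is only an illustration under assumptions: StalePolicyCacheSketch and shouldDeleteCursor are hypothetical names, not Pulsar code, and the sketch assumes the cleanup decision is driven by whatever cluster list the policy cache currently returns.

```java
import java.util.Set;

class StalePolicyCacheSketch {
    // Hypothetical rule: the cursor is deleted only when the cached policy
    // no longer lists the remote cluster.
    static boolean shouldDeleteCursor(Set<String> cachedClusters, String remoteCluster) {
        return !cachedClusters.contains(remoteCluster);
    }

    public static void main(String[] args) {
        // The policy was already changed, but the cache still holds the old value.
        Set<String> staleCache = Set.of("local_cluster", "remote_cluster");
        // What the cache holds once the asynchronous update has arrived.
        Set<String> refreshedCache = Set.of("local_cluster");

        // With the stale value the check passes, so the cursor is kept.
        System.out.println("stale cache, delete cursor? "
                + shouldDeleteCursor(staleCache, "remote_cluster"));
        // After the refresh the same check asks for the cursor to be deleted.
        System.out.println("refreshed cache, delete cursor? "
                + shouldDeleteCursor(refreshedCache, "remote_cluster"));
    }
}
```

The point of the sketch is that the same check gives different answers depending on when it runs, which is why a single attempt is not enough to guarantee the cleanup.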
There was a problem hiding this comment.
We don't need to guarantee that the cursor will be deleted the first time, because the client will try to reload the topic. We just need to guarantee that the cursor will be deleted eventually. So I made this change:
Awaitility.await().atMost(10, TimeUnit.SECONDS)
        .until(() -> {
            topic.initialize().get(3, TimeUnit.SECONDS);
            return !topic.getManagedLedger().getCursors().iterator().hasNext();
        });
topic.initialize().get(3, TimeUnit.SECONDS); just mocks the load operation by the client and verifies that the cursor will be deleted eventually.
There was a problem hiding this comment.
just mock the load operation by client,
Actually, the operation failed at topic.initialize in my local env. An exception will be thrown and it won't retry.
There was a problem hiding this comment.
See step 4 above: it goes into the else branch, and the error is thrown.
There was a problem hiding this comment.
I have changed the test to Awaitility.await().atMost(10, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(, so it will be fixed.
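For readers unfamiliar with the retry behaviour being discussed, here is a minimal plain-Java sketch of "poll until the check passes, treating exceptions as not ready yet". It is not Awaitility and not the test code from this PR; RetryUntilSketch and retryUntil are hypothetical names, and the failing condition in main only imitates an initialization that throws on its first attempts.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RetryUntilSketch {
    // Polls the condition until it returns true or the timeout elapses.
    // An exception thrown by the condition is swallowed and the poll repeats.
    static boolean retryUntil(Callable<Boolean> condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            try {
                if (Boolean.TRUE.equals(condition.call())) {
                    return true;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (Exception e) {
                // Treated as "not ready yet"; the next poll retries.
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Imitates an initialization that fails twice before the cursor is gone.
        boolean cursorGone = retryUntil(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("NotFoundException: remote");
            }
            return true;
        }, 2_000, 10);
        System.out.println(cursorGone + " after " + attempts.get() + " attempts");
    }
}
```

Without the exception handling, the first failure would end the wait immediately, which is the behaviour reported earlier in this thread.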
Codecov Report
@@ Coverage Diff @@
## master #20025 +/- ##
=============================================
+ Coverage 33.17% 72.81% +39.64%
- Complexity 12158 31601 +19443
=============================================
Files 1499 1860 +361
Lines 113832 137406 +23574
Branches 12368 15131 +2763
=============================================
+ Hits 37769 100059 +62290
+ Misses 71143 29381 -41762
- Partials 4920 7966 +3046
replicationStartFuture.complete(null);
return removeReplicator(remoteCluster).whenComplete((ignore2, remoteCursorEx) -> {
There was a problem hiding this comment.
why is the replicationStartFuture completed before removing the replicator?
There was a problem hiding this comment.
Already fixed. Thanks
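The ordering concern can be illustrated with a small CompletableFuture sketch. It is not the code merged in this PR: StartReplicatorSketch, its events list, and the bodies of startReplicator and removeReplicator are hypothetical, and the choice to fail the start future with the original exception is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class StartReplicatorSketch {
    // Records the order in which things happen, for inspection.
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical stand-in for the asynchronous cursor cleanup.
    CompletableFuture<Void> removeReplicator(String remoteCluster) {
        return CompletableFuture.runAsync(() -> events.add("removed " + remoteCluster));
    }

    CompletableFuture<Void> startReplicator(String remoteCluster, CompletableFuture<Void> createCursor) {
        CompletableFuture<Void> replicationStartFuture = new CompletableFuture<>();
        createCursor.whenComplete((ignore, ex) -> {
            if (ex == null) {
                events.add("started " + remoteCluster);
                replicationStartFuture.complete(null);
                return;
            }
            events.add("start failed: " + ex.getMessage());
            removeReplicator(remoteCluster).whenComplete((ignore2, removeEx) -> {
                // The start future is completed only after the cleanup attempt
                // has finished, so callers never observe a half-cleaned state.
                events.add("start future completed");
                replicationStartFuture.completeExceptionally(ex);
            });
        });
        return replicationStartFuture;
    }

    public static void main(String[] args) {
        StartReplicatorSketch sketch = new StartReplicatorSketch();
        CompletableFuture<Void> failedCreate = new CompletableFuture<>();
        failedCreate.completeExceptionally(new RuntimeException("remote"));
        sketch.startReplicator("remote", failedCreate)
                .exceptionally(e -> null) // swallow the failure for the demo
                .join();
        System.out.println(sketch.events);
    }
}
```

If the start future were completed before removeReplicator was called, a caller waiting on it could proceed while the cursor still exists.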
try {
    admin.topics().setReplicationClusters(topicName, Arrays.asList(remoteCluster, "test"));
    return true;
} catch (Exception ex) {
    return false;
}
There was a problem hiding this comment.
You could use Awaitility.await().ignoreExceptions().untilAsserted( instead of the try / ... / return true / catch Exception ... return false solution.
There was a problem hiding this comment.
Good idea, thanks. Already changed to Awaitility.await().ignoreExceptions().untilAsserted.
@poorbarcode Could you also change the title? The PR title duplicates that of #19972.
Done
Here is a failure from my local env: failure.log I see many
}).whenComplete((ignore, ex) -> {
    if (ex == null){
        replicationStartFuture.complete(null);
    } else {
There was a problem hiding this comment.
It's better to log ex as a warning here.
After discussing with @poorbarcode offline, let's move to #20037 to fix the flaky test.
As discussed on the mailing list https://lists.apache.org/thread/w4jzk27qhtosgsz7l9bmhf1t7o9mxjhp, there is no plan to release 2.9.6, so I am going to remove the release/2.9.6 label.



Fixes #20010
Motivation
The cursor repl is automatically created when replication is turned on, and it is deleted when replication is turned off. But there is a bug: if the topic is not online when you turn off replication, the cursor cannot be deleted. #19972 fixes this bug.
But #19972 cannot solve the following scenario, which causes the flaky test #20010:
- [local_cluster, remote_cluster])
- repl
- remote_cluster is removed, and the remote cluster is down now.
- start replicator will fail, but the cursor is still there.
Modifications
Unlike #19972, remove the repl cursor when the start replicator task fails.
Documentation
- doc
- doc-required
- doc-not-needed
- doc-complete
Matching PR in forked repository
PR in forked repository: