diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 7e3520dda426b..586e5d83d2f08 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1738,6 +1738,23 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece private String topicPoliciesServiceClassName = "org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService"; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Timeout in seconds for initializing the topic policies cache from the system topic. " + + "If the initialization does not complete within this time, it will be retried up to " + + "topicPoliciesCacheInitMaxRetries times. Set to 0 to disable the timeout (not recommended). " + + "Default is 300 seconds (5 minutes)." + ) + private long topicPoliciesCacheInitTimeoutSeconds = 300; + + @FieldContext( + category = CATEGORY_SERVER, + doc = "Maximum number of retries for initializing the topic policies cache after a timeout. " + + "After all retries are exhausted, the namespace bundles will be unloaded from this broker " + + "so that they can be reassigned to a different broker. Default is 3." + ) + private int topicPoliciesCacheInitMaxRetries = 3; + @FieldContext( category = CATEGORY_SERVER, doc = "List of interceptors for entry metadata.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index c3d88b9c7237d..92bb58627e24e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -24,6 +24,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; +import io.prometheus.client.Counter; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -31,13 +32,17 @@ import java.util.Objects; import java.util.Optional; import java.util.PriorityQueue; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.commons.lang3.concurrent.ConcurrentInitializer; import org.apache.commons.lang3.concurrent.LazyInitializer; import org.apache.commons.lang3.mutable.Mutable; @@ -78,6 +83,18 @@ */ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService { + private static final Counter TOPIC_POLICIES_CACHE_INIT_FAILURES = Counter.build( + "pulsar_topic_policies_cache_init_failures_total", + "Total number of topic policies cache initialization failures after all retries exhausted") + .labelNames("namespace") + .register(); + + private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build( + "pulsar_topic_policies_cache_init_timeouts_total", + "Total number of topic policies cache initialization timeouts (including retried attempts)") + .labelNames("namespace") + .register(); + private final PulsarService pulsarService; private final HashSet localCluster; private final String clusterName; @@ -581,29 +598,18 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { CompletableFuture existingFuture = policyCacheInitMap.putIfAbsent(namespace, initNamespacePolicyFuture); if (existingFuture == null) { - final CompletableFuture> readerCompletableFuture = - newReader(namespace); - readerCompletableFuture - .thenCompose(reader -> { - final CompletableFuture stageFuture = new CompletableFuture<>(); - initPolicesCache(reader, stageFuture); - return stageFuture - // Read policies in background - .thenAccept(__ -> readMorePoliciesAsync(reader)); - }).thenApply(__ -> { + int maxRetries = pulsarService.getConfiguration() + .getTopicPoliciesCacheInitMaxRetries(); + initPoliciesCacheWithTimeoutAndRetry(namespace, maxRetries) + .thenApply(__ -> { initNamespacePolicyFuture.complete(null); return null; }).exceptionally(ex -> { try { - if (readerCompletableFuture.isCompletedExceptionally()) { - log.error("[{}] Failed to create reader on __change_events topic", - namespace, ex); - initNamespacePolicyFuture.completeExceptionally(ex); - cleanPoliciesCacheInitMap(namespace, true); - } else { - initNamespacePolicyFuture.completeExceptionally(ex); - cleanPoliciesCacheInitMap(namespace, isAlreadyClosedException(ex)); - } + log.error("[{}] Failed to initialize topic policies cache", + namespace, ex); + initNamespacePolicyFuture.completeExceptionally(ex); + cleanPoliciesCacheInitMap(namespace, true); } catch (Throwable cleanupEx) { // Adding this catch to avoid break callback chain log.error("[{}] Failed to cleanup reader on __change_events topic", @@ -619,6 +625,123 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { }); } + /** + * Initializes the topic policies cache with timeout and retry support. + * On each attempt, a new reader is created, and {@link #initPolicesCache} is called with a timeout. + * If the initialization times out, the reader is closed and a new attempt is made. + * After all retries are exhausted, namespace bundles are unloaded from this broker so they can be + * reassigned to a different broker. + * + * @param namespace the namespace to initialize policies for + * @param retriesLeft number of retries remaining + * @return a future that completes when initialization succeeds or fails after all retries + */ + private CompletableFuture initPoliciesCacheWithTimeoutAndRetry(NamespaceName namespace, int retriesLeft) { + if (closed.get()) { + return CompletableFuture.failedFuture( + new BrokerServiceException(getClass().getName() + " is closed.")); + } + + long timeoutSeconds = pulsarService.getConfiguration().getTopicPoliciesCacheInitTimeoutSeconds(); + final CompletableFuture> readerFuture = newReader(namespace); + + CompletableFuture attempt = readerFuture.thenCompose(reader -> { + final CompletableFuture stageFuture = new CompletableFuture<>(); + initPolicesCache(reader, stageFuture); + + CompletableFuture timedFuture = timeoutSeconds > 0 + ? stageFuture.orTimeout(timeoutSeconds, TimeUnit.SECONDS) + : stageFuture; + + return timedFuture.thenAccept(__ -> readMorePoliciesAsync(reader)); + }); + + return attempt + .thenApply(v -> CompletableFuture.completedFuture(v)) + .exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + if (cause instanceof TimeoutException) { + TOPIC_POLICIES_CACHE_INIT_TIMEOUTS.labels(namespace.toString()).inc(); + // Close the stuck reader and remove from cache so a new one can be created + closeAndRemoveReaderForNamespace(namespace); + + if (retriesLeft > 0) { + log.warn("[{}] Topic policies cache initialization timed out after {}s. " + + "Retrying... ({} retries left)", + namespace, timeoutSeconds, retriesLeft); + return initPoliciesCacheWithTimeoutAndRetry(namespace, retriesLeft - 1); + } else { + log.error("[{}] Topic policies cache initialization failed after all retries " + + "(timed out after {}s per attempt). Unloading namespace bundles " + + "from this broker.", + namespace, timeoutSeconds); + TOPIC_POLICIES_CACHE_INIT_FAILURES.labels(namespace.toString()).inc(); + unloadNamespaceBundlesAsync(namespace); + return CompletableFuture.failedFuture( + new BrokerServiceException( + "Topic policies cache initialization failed after all retries " + + "for namespace " + namespace)); + } + } + // For non-timeout exceptions (e.g. reader creation failure), propagate directly + return CompletableFuture.failedFuture(cause); + }) + .thenCompose(Function.identity()); + } + + /** + * Closes and removes the reader for the given namespace from the reader cache. + * This is used during retry to ensure a fresh reader is created on the next attempt. + */ + private void closeAndRemoveReaderForNamespace(NamespaceName namespace) { + CompletableFuture> readerFuture = readerCaches.remove(namespace); + if (readerFuture != null && !readerFuture.isCompletedExceptionally()) { + readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync) + .exceptionally(closeEx -> { + log.warn("[{}] Failed to close reader during retry cleanup", namespace, closeEx); + return null; + }); + } + } + + /** + * Unloads all namespace bundles belonging to the given namespace from this broker. + * This is called as a last resort when topic policies cache initialization fails after all retries, + * allowing the bundles to be reassigned to a different broker. + */ + private void unloadNamespaceBundlesAsync(NamespaceName namespace) { + try { + NamespaceService namespaceService = pulsarService.getNamespaceService(); + if (namespaceService == null) { + log.warn("[{}] Cannot unload namespace bundles: NamespaceService is not available", namespace); + return; + } + Set ownedBundles = namespaceService.getOwnedServiceUnits(); + List bundlesForNamespace = ownedBundles.stream() + .filter(bundle -> namespace.equals(bundle.getNamespaceObject())) + .collect(Collectors.toList()); + + if (bundlesForNamespace.isEmpty()) { + log.info("[{}] No owned bundles found to unload for namespace", namespace); + return; + } + + log.warn("[{}] Unloading {} namespace bundles due to topic policies cache init failure", + namespace, bundlesForNamespace.size()); + for (NamespaceBundle bundle : bundlesForNamespace) { + namespaceService.unloadNamespaceBundle(bundle) + .exceptionally(ex -> { + log.error("[{}] Failed to unload bundle {} after topic policies cache init failure", + namespace, bundle, ex); + return null; + }); + } + } catch (Exception e) { + log.error("[{}] Error while attempting to unload namespace bundles after topic policies " + + "cache init failure", namespace, e); + } + } + private CompletableFuture> newReader(NamespaceName ns) { return readerCaches.compute(ns, (__, existingFuture) -> { if (existingFuture == null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 6b050e8b421c6..5afe3c5deddb6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.reflect.FieldUtils; @@ -623,7 +624,7 @@ public void testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() thro // make sure not do cleanPoliciesCacheInitMap() twice // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap() is 1. boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> - logEvent.getMessage().toString().contains("Failed to create reader on __change_events topic")); + logEvent.getMessage().toString().contains("Failed to initialize topic policies cache")); assertTrue(logFound); boolean logFound2 = testLogAppender.getEvents().stream().anyMatch(logEvent -> logEvent.getMessage().toString().contains("Failed to check the move events for the system topic") @@ -631,4 +632,140 @@ public void testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() thro assertFalse(logFound2); verify(spyService, times(1)).cleanPoliciesCacheInitMap(any(), anyBoolean()); } + + @Test + public void testInitPoliciesCacheTimeoutWithSuccessfulRetry() throws Exception { + @Cleanup + TestLogAppender testLogAppender = TestLogAppender.create(log); + + pulsar.getTopicPoliciesService().close(); + // Set a very short timeout and allow 2 retries + conf.setTopicPoliciesCacheInitTimeoutSeconds(1); + conf.setTopicPoliciesCacheInitMaxRetries(2); + + SystemTopicBasedTopicPoliciesService spyService = + Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar)); + FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true); + + admin.namespaces().createNamespace(NAMESPACE5); + final String topic = "persistent://" + NAMESPACE5 + "/testTimeout" + UUID.randomUUID(); + admin.topics().createPartitionedTopic(topic, 1); + + // Create a reader that never completes hasMoreEventsAsync (simulates a stuck reader) + SystemTopicClient.Reader mockReader = Mockito.mock(SystemTopicClient.Reader.class); + SystemTopicClient mockSystemTopic = Mockito.mock(SystemTopicClient.class); + TopicName changeEventsTopic = TopicName.get("persistent://" + NAMESPACE5 + "/__change_events"); + Mockito.when(mockSystemTopic.getTopicName()).thenReturn(changeEventsTopic); + Mockito.when(mockReader.getSystemTopic()).thenReturn(mockSystemTopic); + // First call: never complete (will timeout). Second call: return false (no more events) + CompletableFuture neverCompleteFuture = new CompletableFuture<>(); + Mockito.when(mockReader.hasMoreEventsAsync()) + .thenReturn(neverCompleteFuture) + .thenReturn(CompletableFuture.completedFuture(false)); + Mockito.when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + + // Put the mock reader in reader cache + ConcurrentHashMap>> + spyReaderCaches = new ConcurrentHashMap<>(); + spyReaderCaches.put(NamespaceName.get(NAMESPACE5), CompletableFuture.completedFuture(mockReader)); + FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true); + + // On retry (after the stuck reader is removed), create a real reader + Mockito.doAnswer(invocation -> { + NamespaceName ns = invocation.getArgument(0); + // Return a real reader for the retry + return spyReaderCaches.compute(ns, (k, v) -> { + if (v == null) { + return CompletableFuture.completedFuture(mockReader); + } + return v; + }); + }).when(spyService).createSystemTopicClient(NamespaceName.get(NAMESPACE5)); + + CompletableFuture prepareFuture = + spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5)); + + // The first attempt times out, the second attempt should succeed (since hasMoreEventsAsync + // returns false on second call) + try { + prepareFuture.get(30, TimeUnit.SECONDS); + } catch (Exception e) { + // Retry may or may not succeed depending on mock setup; the important thing is + // the timeout was detected + } + + // Verify that the timeout was detected and retry was attempted + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + boolean timeoutLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> + logEvent.getMessage().toString().contains( + "Topic policies cache initialization timed out")); + assertTrue(timeoutLogFound); + }); + + // Reset config + conf.setTopicPoliciesCacheInitTimeoutSeconds(300); + conf.setTopicPoliciesCacheInitMaxRetries(3); + } + + @Test + public void testInitPoliciesCacheTimeoutExhaustsRetries() throws Exception { + @Cleanup + TestLogAppender testLogAppender = TestLogAppender.create(log); + + pulsar.getTopicPoliciesService().close(); + // Set a very short timeout and 0 retries so it fails immediately after first timeout + conf.setTopicPoliciesCacheInitTimeoutSeconds(1); + conf.setTopicPoliciesCacheInitMaxRetries(0); + + SystemTopicBasedTopicPoliciesService spyService = + Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar)); + FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true); + + admin.namespaces().createNamespace(NAMESPACE5); + + // Create a reader that never completes hasMoreEventsAsync (simulates a stuck reader) + SystemTopicClient.Reader mockReader = Mockito.mock(SystemTopicClient.Reader.class); + SystemTopicClient mockSystemTopic = Mockito.mock(SystemTopicClient.class); + TopicName changeEventsTopic = TopicName.get("persistent://" + NAMESPACE5 + "/__change_events"); + Mockito.when(mockSystemTopic.getTopicName()).thenReturn(changeEventsTopic); + Mockito.when(mockReader.getSystemTopic()).thenReturn(mockSystemTopic); + Mockito.when(mockReader.hasMoreEventsAsync()).thenReturn(new CompletableFuture<>()); + Mockito.when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + + ConcurrentHashMap>> + spyReaderCaches = new ConcurrentHashMap<>(); + spyReaderCaches.put(NamespaceName.get(NAMESPACE5), CompletableFuture.completedFuture(mockReader)); + FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true); + + CompletableFuture prepareFuture = + spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5)); + + try { + prepareFuture.get(30, TimeUnit.SECONDS); + Assert.fail("Should have failed after retries exhausted"); + } catch (ExecutionException e) { + assertTrue(e.getCause().getMessage().contains( + "Topic policies cache initialization failed after all retries")); + } + + // Verify the failure log was emitted + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + boolean failureLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> + logEvent.getMessage().toString().contains( + "Topic policies cache initialization failed after all retries")); + assertTrue(failureLogFound); + }); + + // Verify that the unloading log was emitted (may be "No owned bundles" or "Unloading") + boolean unloadLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> { + String msg = logEvent.getMessage().toString(); + return msg.contains("Unloading") && msg.contains("namespace bundles") + || msg.contains("No owned bundles found to unload"); + }); + assertTrue(unloadLogFound); + + // Reset config + conf.setTopicPoliciesCacheInitTimeoutSeconds(300); + conf.setTopicPoliciesCacheInitMaxRetries(3); + } }