Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1738,6 +1738,23 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
private String topicPoliciesServiceClassName =
"org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService";

@FieldContext(
category = CATEGORY_SERVER,
doc = "Timeout in seconds for initializing the topic policies cache from the system topic. "
+ "If the initialization does not complete within this time, it will be retried up to "
+ "topicPoliciesCacheInitMaxRetries times. Set to 0 to disable the timeout (not recommended). "
+ "Default is 300 seconds (5 minutes)."
)
private long topicPoliciesCacheInitTimeoutSeconds = 300;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Maximum number of retries for initializing the topic policies cache after a timeout. "
+ "After all retries are exhausted, the namespace bundles will be unloaded from this broker "
+ "so that they can be reassigned to a different broker. Default is 3."
)
private int topicPoliciesCacheInitMaxRetries = 3;

@FieldContext(
category = CATEGORY_SERVER,
doc = "List of interceptors for entry metadata.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,25 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.prometheus.client.Counter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.commons.lang3.mutable.Mutable;
Expand Down Expand Up @@ -78,6 +83,18 @@
*/
public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService {

private static final Counter TOPIC_POLICIES_CACHE_INIT_FAILURES = Counter.build(
"pulsar_topic_policies_cache_init_failures_total",
"Total number of topic policies cache initialization failures after all retries exhausted")
.labelNames("namespace")
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Labeling these counters by full namespace can create unbounded metric cardinality (potentially one time series per namespace per broker), which is a common source of Prometheus memory/CPU pressure. Consider reducing cardinality (e.g., remove the label, use tenant-only, or gate per-namespace labeling behind a config) while still meeting the “emit a metric on failure” requirement.

Copilot uses AI. Check for mistakes.
.register();

private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
"pulsar_topic_policies_cache_init_timeouts_total",
"Total number of topic policies cache initialization timeouts (including retried attempts)")
.labelNames("namespace")
Comment on lines +88 to +95
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Labeling these counters by full namespace can create unbounded metric cardinality (potentially one time series per namespace per broker), which is a common source of Prometheus memory/CPU pressure. Consider reducing cardinality (e.g., remove the label, use tenant-only, or gate per-namespace labeling behind a config) while still meeting the “emit a metric on failure” requirement.

Suggested change
"Total number of topic policies cache initialization failures after all retries exhausted")
.labelNames("namespace")
.register();
private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
"pulsar_topic_policies_cache_init_timeouts_total",
"Total number of topic policies cache initialization timeouts (including retried attempts)")
.labelNames("namespace")
"Total number of topic policies cache initialization failures after all retries exhausted, per tenant")
.labelNames("tenant")
.register();
private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
"pulsar_topic_policies_cache_init_timeouts_total",
"Total number of topic policies cache initialization timeouts (including retried attempts), per tenant")
.labelNames("tenant")

Copilot uses AI. Check for mistakes.
.register();
Comment on lines +90 to +96
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Static Counter.build(...).register() registers into the global default Prometheus registry at class-load time. In long-running brokers and especially in unit/integration tests that may load/reload components in the same JVM, this pattern can trigger duplicate-collector registration errors and makes metric lifecycle hard to control. Prefer wiring metrics through Pulsar’s existing metrics/registry facilities (or a registry owned by the broker instance) rather than static global registration in the service class.

Suggested change
.register();
private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
"pulsar_topic_policies_cache_init_timeouts_total",
"Total number of topic policies cache initialization timeouts (including retried attempts)")
.labelNames("namespace")
.register();
.create();
private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
"pulsar_topic_policies_cache_init_timeouts_total",
"Total number of topic policies cache initialization timeouts (including retried attempts)")
.labelNames("namespace")
.create();

Copilot uses AI. Check for mistakes.

private final PulsarService pulsarService;
private final HashSet<String> localCluster;
private final String clusterName;
Expand Down Expand Up @@ -581,29 +598,18 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
CompletableFuture<Void> existingFuture =
policyCacheInitMap.putIfAbsent(namespace, initNamespacePolicyFuture);
if (existingFuture == null) {
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
newReader(namespace);
readerCompletableFuture
.thenCompose(reader -> {
final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
initPolicesCache(reader, stageFuture);
return stageFuture
// Read policies in background
.thenAccept(__ -> readMorePoliciesAsync(reader));
}).thenApply(__ -> {
int maxRetries = pulsarService.getConfiguration()
.getTopicPoliciesCacheInitMaxRetries();
initPoliciesCacheWithTimeoutAndRetry(namespace, maxRetries)
.thenApply(__ -> {
initNamespacePolicyFuture.complete(null);
return null;
}).exceptionally(ex -> {
try {
if (readerCompletableFuture.isCompletedExceptionally()) {
log.error("[{}] Failed to create reader on __change_events topic",
namespace, ex);
initNamespacePolicyFuture.completeExceptionally(ex);
cleanPoliciesCacheInitMap(namespace, true);
} else {
initNamespacePolicyFuture.completeExceptionally(ex);
cleanPoliciesCacheInitMap(namespace, isAlreadyClosedException(ex));
}
log.error("[{}] Failed to initialize topic policies cache",
namespace, ex);
initNamespacePolicyFuture.completeExceptionally(ex);
cleanPoliciesCacheInitMap(namespace, true);
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes cleanup behavior from the prior logic that conditioned the boolean argument on the exception type (e.g., isAlreadyClosedException(ex)), and also previously treated reader-creation failures differently. Always passing true can alter shutdown/cleanup semantics and may cause incorrect cleanup decisions for non-close-related failures. Suggest restoring the prior decision logic (e.g., pass isAlreadyClosedException(ex) where appropriate, and keep the more specific branching for reader creation vs. later-stage failures if it impacts cleanup).

Suggested change
cleanPoliciesCacheInitMap(namespace, true);
cleanPoliciesCacheInitMap(namespace,
ex instanceof PulsarClientException.AlreadyClosedException
|| ex.getCause() instanceof PulsarClientException.AlreadyClosedException);

Copilot uses AI. Check for mistakes.
} catch (Throwable cleanupEx) {
// Adding this catch to avoid break callback chain
log.error("[{}] Failed to cleanup reader on __change_events topic",
Expand All @@ -619,6 +625,123 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
});
}

/**
* Initializes the topic policies cache with timeout and retry support.
* On each attempt, a new reader is created, and {@link #initPolicesCache} is called with a timeout.
* If the initialization times out, the reader is closed and a new attempt is made.
* After all retries are exhausted, namespace bundles are unloaded from this broker so they can be
* reassigned to a different broker.
*
* @param namespace the namespace to initialize policies for
* @param retriesLeft number of retries remaining
* @return a future that completes when initialization succeeds or fails after all retries
*/
private CompletableFuture<Void> initPoliciesCacheWithTimeoutAndRetry(NamespaceName namespace, int retriesLeft) {
if (closed.get()) {
return CompletableFuture.failedFuture(
new BrokerServiceException(getClass().getName() + " is closed."));
}

long timeoutSeconds = pulsarService.getConfiguration().getTopicPoliciesCacheInitTimeoutSeconds();
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = newReader(namespace);

CompletableFuture<Void> attempt = readerFuture.thenCompose(reader -> {
final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
initPolicesCache(reader, stageFuture);

CompletableFuture<Void> timedFuture = timeoutSeconds > 0
? stageFuture.orTimeout(timeoutSeconds, TimeUnit.SECONDS)
: stageFuture;

return timedFuture.thenAccept(__ -> readMorePoliciesAsync(reader));
});

return attempt
.thenApply(v -> CompletableFuture.completedFuture(v))
.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof TimeoutException) {
TOPIC_POLICIES_CACHE_INIT_TIMEOUTS.labels(namespace.toString()).inc();
// Close the stuck reader and remove from cache so a new one can be created
closeAndRemoveReaderForNamespace(namespace);

if (retriesLeft > 0) {
log.warn("[{}] Topic policies cache initialization timed out after {}s. "
+ "Retrying... ({} retries left)",
namespace, timeoutSeconds, retriesLeft);
return initPoliciesCacheWithTimeoutAndRetry(namespace, retriesLeft - 1);
} else {
log.error("[{}] Topic policies cache initialization failed after all retries "
+ "(timed out after {}s per attempt). Unloading namespace bundles "
+ "from this broker.",
namespace, timeoutSeconds);
TOPIC_POLICIES_CACHE_INIT_FAILURES.labels(namespace.toString()).inc();
unloadNamespaceBundlesAsync(namespace);
return CompletableFuture.<Void>failedFuture(
new BrokerServiceException(
"Topic policies cache initialization failed after all retries "
+ "for namespace " + namespace));
}
}
// For non-timeout exceptions (e.g. reader creation failure), propagate directly
return CompletableFuture.<Void>failedFuture(cause);
})
.thenCompose(Function.identity());
Comment on lines +659 to +689
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thenApply(v -> completedFuture(v)) + exceptionally(...) + thenCompose(identity()) pattern is harder to read and maintain than necessary. Consider rewriting using a single handle/whenComplete-style branch that returns either a value or a next-stage future (and then thenCompose once), or using exceptionallyCompose if the project’s Java target supports it—this will avoid nested futures and make the retry/failure flow clearer.

Copilot uses AI. Check for mistakes.
}

/**
* Closes and removes the reader for the given namespace from the reader cache.
* This is used during retry to ensure a fresh reader is created on the next attempt.
*/
private void closeAndRemoveReaderForNamespace(NamespaceName namespace) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = readerCaches.remove(namespace);
if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync)
.exceptionally(closeEx -> {
log.warn("[{}] Failed to close reader during retry cleanup", namespace, closeEx);
return null;
});
}
}

/**
* Unloads all namespace bundles belonging to the given namespace from this broker.
* This is called as a last resort when topic policies cache initialization fails after all retries,
* allowing the bundles to be reassigned to a different broker.
*/
private void unloadNamespaceBundlesAsync(NamespaceName namespace) {
try {
NamespaceService namespaceService = pulsarService.getNamespaceService();
if (namespaceService == null) {
log.warn("[{}] Cannot unload namespace bundles: NamespaceService is not available", namespace);
return;
}
Set<NamespaceBundle> ownedBundles = namespaceService.getOwnedServiceUnits();
List<NamespaceBundle> bundlesForNamespace = ownedBundles.stream()
.filter(bundle -> namespace.equals(bundle.getNamespaceObject()))
.collect(Collectors.toList());

if (bundlesForNamespace.isEmpty()) {
log.info("[{}] No owned bundles found to unload for namespace", namespace);
return;
}

log.warn("[{}] Unloading {} namespace bundles due to topic policies cache init failure",
namespace, bundlesForNamespace.size());
for (NamespaceBundle bundle : bundlesForNamespace) {
namespaceService.unloadNamespaceBundle(bundle)
.exceptionally(ex -> {
log.error("[{}] Failed to unload bundle {} after topic policies cache init failure",
namespace, bundle, ex);
return null;
});
}
} catch (Exception e) {
log.error("[{}] Error while attempting to unload namespace bundles after topic policies "
+ "cache init failure", namespace, e);
}
}

private CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> newReader(NamespaceName ns) {
return readerCaches.compute(ns, (__, existingFuture) -> {
if (existingFuture == null) {
Expand Down
Loading