-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][broker] Topic policies loading with timeout and retries #25295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -24,20 +24,25 @@ | |||||||||||||||||||||||||||||||||
| import com.github.benmanes.caffeine.cache.Caffeine; | ||||||||||||||||||||||||||||||||||
| import com.google.common.annotations.VisibleForTesting; | ||||||||||||||||||||||||||||||||||
| import com.google.common.collect.Sets; | ||||||||||||||||||||||||||||||||||
| import io.prometheus.client.Counter; | ||||||||||||||||||||||||||||||||||
| import java.util.ArrayList; | ||||||||||||||||||||||||||||||||||
| import java.util.HashSet; | ||||||||||||||||||||||||||||||||||
| import java.util.List; | ||||||||||||||||||||||||||||||||||
| import java.util.Map; | ||||||||||||||||||||||||||||||||||
| import java.util.Objects; | ||||||||||||||||||||||||||||||||||
| import java.util.Optional; | ||||||||||||||||||||||||||||||||||
| import java.util.PriorityQueue; | ||||||||||||||||||||||||||||||||||
| import java.util.Set; | ||||||||||||||||||||||||||||||||||
| import java.util.concurrent.CompletableFuture; | ||||||||||||||||||||||||||||||||||
| import java.util.concurrent.ConcurrentHashMap; | ||||||||||||||||||||||||||||||||||
| import java.util.concurrent.CopyOnWriteArrayList; | ||||||||||||||||||||||||||||||||||
| import java.util.concurrent.TimeUnit; | ||||||||||||||||||||||||||||||||||
| import java.util.concurrent.TimeoutException; | ||||||||||||||||||||||||||||||||||
| import java.util.concurrent.atomic.AtomicBoolean; | ||||||||||||||||||||||||||||||||||
| import java.util.concurrent.atomic.AtomicInteger; | ||||||||||||||||||||||||||||||||||
| import java.util.function.Consumer; | ||||||||||||||||||||||||||||||||||
| import java.util.function.Function; | ||||||||||||||||||||||||||||||||||
| import java.util.stream.Collectors; | ||||||||||||||||||||||||||||||||||
| import org.apache.commons.lang3.concurrent.ConcurrentInitializer; | ||||||||||||||||||||||||||||||||||
| import org.apache.commons.lang3.concurrent.LazyInitializer; | ||||||||||||||||||||||||||||||||||
| import org.apache.commons.lang3.mutable.Mutable; | ||||||||||||||||||||||||||||||||||
|
|
@@ -78,6 +83,18 @@ | |||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||
| public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService { | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| private static final Counter TOPIC_POLICIES_CACHE_INIT_FAILURES = Counter.build( | ||||||||||||||||||||||||||||||||||
| "pulsar_topic_policies_cache_init_failures_total", | ||||||||||||||||||||||||||||||||||
| "Total number of topic policies cache initialization failures after all retries exhausted") | ||||||||||||||||||||||||||||||||||
| .labelNames("namespace") | ||||||||||||||||||||||||||||||||||
| .register(); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build( | ||||||||||||||||||||||||||||||||||
| "pulsar_topic_policies_cache_init_timeouts_total", | ||||||||||||||||||||||||||||||||||
| "Total number of topic policies cache initialization timeouts (including retried attempts)") | ||||||||||||||||||||||||||||||||||
| .labelNames("namespace") | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+88
to
+95
|
||||||||||||||||||||||||||||||||||
| "Total number of topic policies cache initialization failures after all retries exhausted") | |
| .labelNames("namespace") | |
| .register(); | |
| private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build( | |
| "pulsar_topic_policies_cache_init_timeouts_total", | |
| "Total number of topic policies cache initialization timeouts (including retried attempts)") | |
| .labelNames("namespace") | |
| "Total number of topic policies cache initialization failures after all retries exhausted, per tenant") | |
| .labelNames("tenant") | |
| .register(); | |
| private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build( | |
| "pulsar_topic_policies_cache_init_timeouts_total", | |
| "Total number of topic policies cache initialization timeouts (including retried attempts), per tenant") | |
| .labelNames("tenant") |
Copilot
AI
Mar 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Static Counter.build(...).register() registers into the global default Prometheus registry at class-load time. In long-running brokers and especially in unit/integration tests that may load/reload components in the same JVM, this pattern can trigger duplicate-collector registration errors and makes metric lifecycle hard to control. Prefer wiring metrics through Pulsar’s existing metrics/registry facilities (or a registry owned by the broker instance) rather than static global registration in the service class.
| .register(); | |
| private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build( | |
| "pulsar_topic_policies_cache_init_timeouts_total", | |
| "Total number of topic policies cache initialization timeouts (including retried attempts)") | |
| .labelNames("namespace") | |
| .register(); | |
| .create(); | |
| private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build( | |
| "pulsar_topic_policies_cache_init_timeouts_total", | |
| "Total number of topic policies cache initialization timeouts (including retried attempts)") | |
| .labelNames("namespace") | |
| .create(); |
Copilot
AI
Mar 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This changes cleanup behavior from the prior logic that conditioned the boolean argument on the exception type (e.g., isAlreadyClosedException(ex)), and also previously treated reader-creation failures differently. Always passing true can alter shutdown/cleanup semantics and may cause incorrect cleanup decisions for non-close-related failures. Suggest restoring the prior decision logic (e.g., pass isAlreadyClosedException(ex) where appropriate, and keep the more specific branching for reader creation vs. later-stage failures if it impacts cleanup).
| cleanPoliciesCacheInitMap(namespace, true); | |
| cleanPoliciesCacheInitMap(namespace, | |
| ex instanceof PulsarClientException.AlreadyClosedException | |
| || ex.getCause() instanceof PulsarClientException.AlreadyClosedException); |
Copilot
AI
Mar 9, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thenApply(v -> completedFuture(v)) + exceptionally(...) + thenCompose(identity()) pattern is harder to read and maintain than necessary. Consider rewriting using a single handle/whenComplete-style branch that returns either a value or a next-stage future (and then thenCompose once), or using exceptionallyCompose if the project’s Java target supports it—this will avoid nested futures and make the retry/failure flow clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Labeling these counters by full
namespacecan create unbounded metric cardinality (potentially one time series per namespace per broker), which is a common source of Prometheus memory/CPU pressure. Consider reducing cardinality (e.g., remove the label, use tenant-only, or gate per-namespace labeling behind a config) while still meeting the “emit a metric on failure” requirement.