Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,15 @@ public class ClustersBase extends AdminResource {
})
public void getClusters(@Suspended AsyncResponse asyncResponse) {
clusterResources().listAsync()
.thenApply(HashSet::new)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("[{}] Failed to get clusters {}", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
.<Void>handleAsync((clusters, ex) -> {
if (ex != null) {
log.error("[{}] Failed to get clusters {}", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
}
asyncResponse.resume(clusters);
return null;
});
}, webExecutor());
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,18 @@ public void getTenants(@Suspended final AsyncResponse asyncResponse) {
final String clientAppId = clientAppId();
validateBothSuperUserAndTenantOperation(null, TenantOperation.LIST_TENANTS)
.thenCompose(__ -> tenantResources().listTenantsAsync())
.thenAccept(tenants -> {
.<Void>handleAsync((tenants, ex) -> {
if (ex != null) {
log.error("[{}] Failed to get tenants list", clientAppId, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
}
// deep copy the tenants to avoid concurrent sort exception
List<String> deepCopy = new ArrayList<>(tenants);
deepCopy.sort(null);
asyncResponse.resume(deepCopy);
}).exceptionally(ex -> {
log.error("[{}] Failed to get tenants list", clientAppId, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}, webExecutor());
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,15 @@ public class Namespaces extends NamespacesBase {
public void getTenantNamespaces(@Suspended final AsyncResponse response,
@PathParam("tenant") String tenant) {
internalGetTenantNamespaces(tenant)
.thenAccept(response::resume)
.exceptionally(ex -> {
log.error("[{}] Failed to get namespaces list: {}", clientAppId(), ex);
resumeAsyncResponseExceptionally(response, ex);
.<Void>handleAsync((namespaces, ex) -> {
if (ex != null) {
log.error("[{}] Failed to get namespaces list: {}", clientAppId(), ex);
resumeAsyncResponseExceptionally(response, ex);
return null;
}
response.resume(namespaces);
return null;
});
}, webExecutor());
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
Expand Down Expand Up @@ -133,6 +134,7 @@ public abstract class PulsarWebResource {
protected UriInfo uri;

private PulsarService pulsar;
private Executor webExecutor;

protected PulsarService pulsar() {
if (pulsar == null) {
Expand All @@ -142,6 +144,13 @@ protected PulsarService pulsar() {
return pulsar;
}

protected Executor webExecutor() {
if (webExecutor == null) {
webExecutor = pulsar().getWebService().getWebServiceExecutor();
}
return webExecutor;
}

protected ServiceConfiguration config() {
return pulsar().getConfiguration();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ public class WebService implements AutoCloseable {
@Deprecated
private final WebExecutorStats executorStats;
private final WebExecutorThreadPoolStats webExecutorThreadPoolStats;
@Getter
private final WebExecutorThreadPool webServiceExecutor;

private final ServerConnector httpConnector;
private final ServerConnector httpsConnector;
private final FilterInitializer filterInitializer;
Expand Down
Loading