diff --git a/pip/pip-459.md b/pip/pip-459.md new file mode 100644 index 0000000000000..da5cd85f41ae9 --- /dev/null +++ b/pip/pip-459.md @@ -0,0 +1,375 @@ +# PIP-459: Batch Status Summary and Filtered Listing for Pulsar Functions + +# Background knowledge + +Pulsar Functions are managed by the Functions worker and exposed through the Pulsar Admin API and `pulsar-admin` CLI. Today, listing functions in a namespace returns only function names. To understand runtime health, operators must fetch function status for each function separately. That creates an N+1 request pattern: one request to list function names and one additional request per function to fetch status. In practice, this is slow, noisy, and hard to use in scripts or daily operations. + +Function runtime status is already represented by `FunctionStatus`, which contains aggregate fields such as the total configured instances and the number of running instances. This proposal introduces a lightweight namespace-level summary model built from those counts. The summary is intentionally smaller than the full status payload: it is designed for listing and filtering, not for per-instance inspection. + +The proposal also has to account for mixed-version deployments. A new client may talk to an older worker that does not expose the new summary endpoint yet. In that case, the client must degrade safely instead of failing outright. That compatibility requirement affects both the public admin interface and the client-side implementation strategy. + +# Motivation + +`pulsar-admin functions list` currently cannot answer a basic operational question: which functions in this namespace are actually running. Operators must either inspect each function one by one or write shell loops that issue many admin calls. The problem becomes more visible in namespaces containing dozens or hundreds of functions, where a simple health check becomes expensive and slow. + +The initial feature request was to add state-aware listing to the CLI, but the implementation uncovered three broader issues that should be addressed together: + +1. The current user experience requires N+1 admin calls for namespace-level health inspection. +2. A new summary endpoint must not break new clients when they talk to older workers during rolling upgrades. +3. Namespace-level summary generation can become slow if the worker builds results strictly serially. + +This PIP proposes a batch status summary API for Pulsar Functions and integrates it into the admin client, CLI, and worker implementation in a backward-compatible way. + +# Goals + +## In Scope + +- Add a namespace-level batch status summary API for Pulsar Functions. +- Add a lightweight public data model that returns function name, derived state, instance counts, and failure classification. +- Add admin client support for the new summary API, including fallback to legacy workers that do not implement it. +- Add CLI support for long-format listing and state-based filtering using the batch summary API. +- Add pagination support for namespace-level function summaries. +- Add an optional request parameter to include detailed per-function runtime metrics in the summary response. +- Improve worker-side summary generation latency by using controlled parallelism. +- Add a worker configuration knob to cap summary-query parallelism. +- Add a worker metric to observe summary-query execution time. + +## Out of Scope + +- Changing the existing `functions list` endpoint that returns only function names. +- Returning full per-instance function status from the new namespace-level endpoint. +- Adding equivalent summary endpoints for sources or sinks in this PIP. +- Adding server-side filtering by state in the REST endpoint. +- Reworking the underlying function runtime status model or scheduler behavior. + + +# High Level Design + +This proposal adds a new REST endpoint: + +`GET /admin/v3/functions/{tenant}/{namespace}/status/summary` + +The endpoint returns a list of `FunctionStatusSummary` objects. By default, each object contains the lightweight fields needed for namespace-level listing: + +- `name` +- `state`: `RUNNING`, `STOPPED`, `PARTIAL`, `UNKNOWN` +- `numInstances` +- `numRunning` +- `error` +- `errorType` + +The endpoint also accepts `detailed=true` to include additional per-function runtime metrics in each response element, such as message-processing counters, exception summaries, and last-invocation timing. When `detailed=false` (the default), those extra fields are omitted to keep the endpoint lightweight for large namespace listings. + +The server remains a generic summary provider. It does not apply state filtering. The CLI consumes the summary list and performs presentation concerns locally, such as `--state` filtering and `--long` formatting. This separation keeps the endpoint reusable for other clients and prevents coupling the REST contract to one CLI presentation format. + +For compatibility, the admin client first tries the new summary endpoint. If the server responds with `404 Not Found` or `405 Method Not Allowed`, the client falls back to the legacy flow: fetch the function names, apply name-based pagination, and then query each function status individually to build summaries client-side. This allows a new client to work against older workers during mixed-version upgrades. + +On the worker side, summary generation is executed only for the requested page and uses a bounded thread pool. A new worker configuration, `functionsStatusSummaryMaxParallelism`, limits how many function status lookups may run concurrently for a single summary request. + +# Detailed Design + +## Design & Implementation Details + +### Data model + +A new public model, `FunctionStatusSummary`, is added under the admin API data package. It intentionally returns only aggregate listing information: + +```java +public class FunctionStatusSummary { + public enum SummaryState { + RUNNING, + STOPPED, + PARTIAL, + UNKNOWN + } + + public enum ErrorType { + AUTHENTICATION_FAILED, + FUNCTION_NOT_FOUND, + NETWORK_ERROR, + INTERNAL_ERROR + } + + private String name; + private SummaryState state; + private int numInstances; + private int numRunning; + private String error; + private ErrorType errorType; + + // Present only when the request includes detailed=true. + private Long numReceived; + private Long numSuccessfullyProcessed; + private Long numUserExceptions; + private List latestUserExceptions; + private Long numSystemExceptions; + private List latestSystemExceptions; + private Double averageLatency; + private Long lastInvocationTime; +} +``` + +`state` is derived from aggregate instance counts: + +- `RUNNING`: `numRunning == numInstances` and `numInstances > 0` +- `STOPPED`: `numRunning == 0` and `numInstances > 0` +- `PARTIAL`: `0 < numRunning < numInstances` +- `UNKNOWN`: the status query failed or the instance counts are not meaningful + +### Admin API interface compatibility + +The public `Functions` admin interface gains namespace-level summary methods: + +- `getFunctionsWithStatus(String tenant, String namespace)` +- `getFunctionsWithStatusAsync(String tenant, String namespace)` +- `getFunctionsWithStatus(String tenant, String namespace, Integer limit, String startAfterFunctionName, Boolean detailed)` +- `getFunctionsWithStatusAsync(String tenant, String namespace, Integer limit, String startAfterFunctionName, Boolean detailed)` + +These methods are introduced as `default` methods. This is important because `Functions` is a public interface and can be implemented outside the Pulsar repository. Adding abstract methods would break source or binary compatibility for custom implementations. Using `default` methods preserves compatibility while still exposing the new capability. + +The default implementation also provides a compatibility fallback path by using the legacy list-plus-status flow if a server-side implementation is unavailable. + +### REST endpoint + +The worker exposes a new endpoint: + +`GET /admin/v3/functions/{tenant}/{namespace}/status/summary` + +The endpoint accepts three optional query parameters: + +- `limit`: maximum number of functions to return; must be greater than `0` when present +- `startAfterFunctionName`: exclusive function name cursor in lexicographical order +- `detailed`: boolean flag; when `true`, include detailed per-function runtime metrics in each summary element; default is `false` + +The server obtains the namespace function list, sorts names lexicographically, applies pagination using `startAfterFunctionName` and `limit`, and then builds one `FunctionStatusSummary` per function in the requested page. + +Per-function status retrieval failures are isolated. The endpoint should still return `200 OK` with a summary entry whose `state` is `UNKNOWN` and whose `error` and `errorType` explain the failure. This prevents one unhealthy function from failing the whole namespace-level listing request. + +When `detailed=true`, the server additionally populates the detailed metric fields for each function summary from the same underlying runtime status data used by the existing function status APIs. This keeps the response model additive while allowing callers to explicitly trade response size and server work for richer operational detail. + +### Worker implementation + +The worker implementation builds summaries using controlled parallelism: + +1. Validate worker availability and request parameters. +2. Reuse namespace authorization and list validation logic. +3. Fetch and sort function names. +4. Apply `startAfterFunctionName` and `limit`. +5. Create a bounded executor sized by `min(pageSize, functionsStatusSummaryMaxParallelism)`. +6. Query status for functions in the requested page concurrently. +7. Convert each result into `FunctionStatusSummary`. +8. Record request latency in worker metrics. + +The worker uses two status retrieval paths: + +- Fast path: query local worker status logic directly. +- Fallback path: if the local path fails with a recoverable server-side error, use the worker's internal admin client. + +This fallback is useful for redirect and topology edge cases and keeps worker behavior more robust. + +### Client fallback to legacy workers + +New clients may talk to workers that do not implement `/status/summary`. When the admin client receives `404` or `405` from the new endpoint, it falls back to: + +1. `getFunctions(tenant, namespace)` +2. lexicographical sorting +3. pagination using `limit` and `startAfterFunctionName` +4. `getFunctionStatus(...)` for each function in the selected page + +This preserves functionality during rolling upgrades and mixed-version environments. The fallback returns the same `FunctionStatusSummary` data model, so callers do not need special handling. + +### CLI behavior + +The proposal extends `pulsar-admin functions list` with: + +- `--state` +- `-l, --long` +- `--limit` +- `--start-after-function-name` +- `--detailed` + +Behavior: + +- With no new flags, the command keeps the existing behavior and prints only function names. +- With `--long`, the command prints summary rows that include state and running/instance counts. +- With `--state`, the CLI filters the summary list locally. +- With `--limit` and `--start-after-function-name`, the CLI paginates through the summary endpoint. +- With `--detailed`, the CLI requests the enriched response shape and can print or serialize the extra metrics. + +The CLI rejects combining `--state` with `--limit` or `--start-after-function-name`. + +That restriction is intentional. Pagination is applied on the server to the full ordered function list, while state filtering happens in the CLI after the page is returned. Allowing both together would make users think they are getting "a page of RUNNING functions" when they are actually getting "a page of all functions, then a local state filter." Rejecting the combination keeps command semantics honest. + +## Public-facing Changes + +### Public API + +New admin REST endpoint: + +- Path: `GET /admin/v3/functions/{tenant}/{namespace}/status/summary` +- Query parameters: + - `limit` optional integer, must be greater than `0` + - `startAfterFunctionName` optional string, exclusive function-name cursor + - `detailed` optional boolean, default `false`; when `true`, include detailed per-function runtime metrics +- Response body: JSON array of `FunctionStatusSummary` + +API response schema for each `FunctionStatusSummary` element: + +- `name`: function name +- `state`: `RUNNING`, `STOPPED`, `PARTIAL`, or `UNKNOWN` +- `numInstances`: configured instance count +- `numRunning`: currently running instance count +- `error`: human-readable error message when the summary could not retrieve function status +- `errorType`: categorized failure reason when `state` is `UNKNOWN` +- `numReceived`: total number of records received by the function across instances; present only when `detailed=true` +- `numSuccessfullyProcessed`: total number of records successfully processed across instances; present only when `detailed=true` +- `numUserExceptions`: total number of user exceptions across instances; present only when `detailed=true` +- `latestUserExceptions`: recent user exception entries; present only when `detailed=true` +- `numSystemExceptions`: total number of system exceptions across instances; present only when `detailed=true` +- `latestSystemExceptions`: recent system exception entries; present only when `detailed=true` +- `averageLatency`: average processing latency across instances; present only when `detailed=true` +- `lastInvocationTime`: timestamp in milliseconds of the most recent invocation across instances; present only when `detailed=true` + +Nested response schema: + +- `latestUserExceptions[]` and `latestSystemExceptions[]` use `ExceptionInformation` +- `ExceptionInformation.exceptionString`: exception message or summary text +- `ExceptionInformation.timestampMs`: epoch-millisecond timestamp for when the exception was recorded + +Response codes: + +- `200 OK`: request succeeded; individual function failures are represented in response elements rather than failing the whole request +- `400 Bad Request`: invalid tenant, namespace, or `limit` +- `403 Forbidden`: requester lacks permission to list functions in the namespace +- `503 Service Unavailable`: Functions worker service is unavailable + +Public admin client additions: + +- namespace-level `getFunctionsWithStatus(...)` methods, including asynchronous and paginated variants +- these methods are additive and implemented as `default` methods to preserve compatibility for external implementations + +### Binary protocol + +No Pulsar binary protocol changes are introduced by this proposal. + +### Configuration + +New worker configuration: + +```yaml +# Maximum parallelism used by function status-summary batch queries. +functionsStatusSummaryMaxParallelism: 4 +``` + +This configuration limits the number of concurrent status lookups used to build a summary page. A lower value reduces pressure on worker and admin dependencies. A higher value can reduce response latency for larger pages. + +### CLI + +New CLI options for `pulsar-admin functions list`: + +- `--state` +- `-l`, `--long` +- `--limit` +- `--start-after-function-name` +- `--detailed` + +Example: + +```bash +pulsar-admin functions list \ + --tenant public \ + --namespace default \ + --state RUNNING \ + --long +``` + +Example paginated call: + +```bash +pulsar-admin functions list \ + --tenant public \ + --namespace default \ + --limit 20 \ + --start-after-function-name my-function-020 +``` + +Example paginated detailed call: + +```bash +pulsar-admin functions list \ + --tenant public \ + --namespace default \ + --limit 20 \ + --start-after-function-name my-function-020 \ + --detailed +``` + +### Metrics + +- Full name: `pulsar_function_worker_functions_status_summary_query_time_ms` +- Description: execution time of a namespace-level functions status-summary batch query +- Attributes: the standard worker metric labels configured by `WorkerStatsManager` +- Unit: milliseconds + + +# Monitoring + +Operators should monitor the new summary query latency metric to detect namespaces or environments where namespace-level function inspection is becoming expensive. A practical first alert is sustained high p95 or max latency for `pulsar_function_worker_functions_status_summary_query_time_ms`, especially after enabling large page sizes or running many functions per namespace. + +Operators should also treat growth in `UNKNOWN` summaries as a signal that status retrieval is degraded. Since per-function failures are isolated into the response instead of failing the whole request, dashboards or scripts that consume the summary endpoint should count `state = UNKNOWN` and inspect `errorType` to distinguish authentication problems, missing functions, network failures, and internal errors. + +# Security Considerations + +The new endpoint uses the same namespace-level authorization model as existing function listing operations. A caller must have permission to list functions in the target namespace. The endpoint does not expose function data across tenant or namespace boundaries and continues to rely on existing request validation and authorization checks. + +Per-function failures are summarized as `UNKNOWN` entries with an error classification instead of returning stack traces. This keeps the API useful operationally while avoiding unnecessary disclosure of internal implementation details. As with other admin APIs, operators should review whether error messages returned to unprivileged callers are appropriate for their deployment's security posture. + +Because the endpoint supports pagination through `startAfterFunctionName`, which is derived from function names, the implementation must continue to validate tenant and namespace inputs before performing any listing or status lookup. + +# Backward & Forward Compatibility + +## Upgrade + +This proposal is additive. Existing calls to `getFunctions(...)` and `pulsar-admin functions list` without new flags continue to behave as before. + +During rolling upgrades: + +- new clients can talk to old workers because the admin client falls back when `/status/summary` returns `404` or `405` +- new workers can serve both old and new clients +- external `Functions` interface implementations remain compatible because the new methods are `default` methods + +No special upgrade sequencing is required beyond standard rolling upgrade practices. + +## Downgrade / Rollback + +If the cluster is downgraded to a version without `/status/summary`, new clients will continue to function through the fallback path, although summary requests will be slower because they revert to list-plus-status behavior. + +If the CLI or client is also downgraded, the new flags and endpoint are simply unavailable and users return to the legacy per-function workflow. + +The worker configuration `functionsStatusSummaryMaxParallelism` should be removed or ignored when running an older worker version that does not recognize it. + +## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations + +This proposal affects the Functions admin plane only. It does not change topic replication protocols, metadata formats for geo-replication, or data-plane message compatibility. No geo-replication-specific upgrade or rollback steps are required. + +# Alternatives + +Client-side only aggregation was considered. It would require changing only the CLI, but it preserves the N+1 request pattern and provides no reusable API for other clients. + +Server-side filtering by state was also considered. It was rejected because state filtering is a presentation concern and would make the endpoint less reusable. Returning the raw summary set keeps the API generic and allows clients to evolve their own filters without server-side API growth. + +Allowing `--state` together with pagination was considered, but rejected because filtering after pagination produces misleading semantics. The current proposal explicitly blocks that combination. + +Another alternative was overloading the existing list endpoint to optionally return enriched status objects. That was rejected because one endpoint returning different response types based on query flags complicates client typing, documentation, and compatibility. + +# General Notes + +This PIP intentionally solves the namespace-level operator workflow first. The same pattern may later be extended to sources and sinks, but that work is kept separate to avoid broadening the review scope. + +The worker-side implementation still computes summaries by querying per-function status and then aggregating results. Controlled parallelism improves latency, but it does not fundamentally change the cost model. Future work may explore more direct or cached aggregation paths if very large namespaces make the endpoint hot. + +# Links + +* relation issue: [issue-25235](https://github.com/apache/pulsar/issues/25235) +* Mailing List discussion thread: https://lists.apache.org/thread/k792434hgm237qmtbn4fsd62hdzdt0h7 +* Mailing List voting thread: TBD