4 changes: 2 additions & 2 deletions Makefile
@@ -254,7 +254,7 @@ $(eval $(call install-sh,standard,operator-controller-standard.yaml))
.PHONY: test
test: manifests generate fmt lint test-unit test-e2e test-regression #HELP Run all tests.

-E2E_TIMEOUT ?= 15m
+E2E_TIMEOUT ?= 20m
GODOG_ARGS ?=
.PHONY: e2e
e2e: #EXHELP Run the e2e tests.
@@ -316,7 +316,7 @@ test-experimental-e2e: COVERAGE_NAME := experimental-e2e
test-experimental-e2e: export MANIFEST := $(EXPERIMENTAL_RELEASE_MANIFEST)
test-experimental-e2e: export INSTALL_DEFAULT_CATALOGS := false
test-experimental-e2e: PROMETHEUS_VALUES := helm/prom_experimental.yaml
-test-experimental-e2e: E2E_TIMEOUT := 20m
+test-experimental-e2e: E2E_TIMEOUT := 25m
test-experimental-e2e: run-internal prometheus e2e e2e-coverage kind-clean #HELP Run experimental e2e test suite on local kind cluster

.PHONY: prometheus
4 changes: 4 additions & 0 deletions helm/experimental.yaml
@@ -7,6 +7,8 @@
# to pull in resources or additions
options:
  operatorController:
+    deployment:
+      replicas: 2
    features:
      enabled:
        - SingleOwnNamespaceInstallSupport
@@ -20,6 +22,8 @@ options:
  # Use with {{- if has "FeatureGate" .Values.options.catalogd.features.enabled }}
  # to pull in resources or additions
  catalogd:
+    deployment:
+      replicas: 2
    features:
      enabled:
        - APIV1MetasHandler
123 changes: 96 additions & 27 deletions internal/catalogd/serverutil/serverutil.go
@@ -1,7 +1,9 @@
package serverutil

import (
+	"context"
	"crypto/tls"
+	"errors"
	"fmt"
	"io"
	"net"
@@ -13,7 +15,7 @@ import (
	"github.com/klauspost/compress/gzhttp"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/certwatcher"
-	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/healthz"

	catalogdmetrics "github.com/operator-framework/operator-controller/internal/catalogd/metrics"
	"github.com/operator-framework/operator-controller/internal/catalogd/storage"
@@ -27,49 +29,116 @@ type CatalogServerConfig struct {
	LocalStorage storage.Instance
}

-func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFileWatcher *certwatcher.CertWatcher) error {
-	listener, err := net.Listen("tcp", cfg.CatalogAddr)
+// AddCatalogServerToManager adds the catalog HTTP server to the manager and registers
+// a readiness check that passes once the server has started serving. Because
+// NeedLeaderElection returns false, Start() is called on every pod immediately, so all
+// replicas bind the catalog port and become ready. Non-leader pods serve requests but
+// return 404 (empty local cache); callers are expected to retry.
+func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, cw *certwatcher.CertWatcher) error {
+	shutdownTimeout := 30 * time.Second
+	r := &catalogServerRunnable{
+		cfg: cfg,
+		cw:  cw,
+		server: &http.Server{
+			Addr:         cfg.CatalogAddr,
+			Handler:      storageServerHandlerWrapped(mgr.GetLogger().WithName("catalogd-http-server"), cfg),
+			ReadTimeout:  5 * time.Second,
+			WriteTimeout: 5 * time.Minute,
+		},
+		shutdownTimeout: shutdownTimeout,
+		ready:           make(chan struct{}),
+	}
+
+	if err := mgr.Add(r); err != nil {
+		return fmt.Errorf("error adding catalog server to manager: %w", err)
+	}
+
+	// Register a readiness check that passes once Start() has been called and the
+	// server is actively serving. All pods reach Start() (NeedLeaderElection=false),
+	// so all replicas become ready and receive traffic; non-leaders return 404 until
+	// they win the leader lease and populate their local cache.
+	if err := mgr.AddReadyzCheck("catalog-server", r.readyzCheck()); err != nil {
+		return fmt.Errorf("error adding catalog server readiness check: %w", err)
+	}
+
+	return nil
+}

+// catalogServerRunnable is a Runnable that binds the catalog HTTP port on every pod.
+// Because NeedLeaderElection returns false, Start() is called on all replicas immediately;
+// non-leader pods serve the catalog port but return 404 (empty local cache).
+type catalogServerRunnable struct {
+	cfg             CatalogServerConfig
+	cw              *certwatcher.CertWatcher
+	server          *http.Server
+	shutdownTimeout time.Duration
+	// ready is closed by Start() once the server is about to begin serving.
+	ready chan struct{}
+}
+
+// NeedLeaderElection returns false so the catalog server starts on every pod
+// immediately, regardless of leadership. This is required for rolling updates:
+// if Start() were gated on leadership, a new pod could not win the leader lease
+// (held by the still-running old pod) and therefore could never pass the
+// catalog-server readiness check, deadlocking the rollout.
+//
+// Non-leader pods serve the catalog HTTP port but have an empty local cache
+// (only the leader's reconciler downloads catalog content), so requests to a
+// non-leader return 404. Callers are expected to retry.
+func (r *catalogServerRunnable) NeedLeaderElection() bool { return false }
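
For reference, these are the controller-runtime contracts the runnable satisfies — a paraphrase of sigs.k8s.io/controller-runtime/pkg/manager for context, not code from this PR. The manager starts runnables whose NeedLeaderElection() returns false as soon as it starts, without waiting to acquire the leader lease:

// Paraphrased from sigs.k8s.io/controller-runtime/pkg/manager.
package manager

import "context"

// Runnable is anything the manager can start.
type Runnable interface {
	Start(ctx context.Context) error
}

// LeaderElectionRunnable lets a Runnable opt out of leader-election gating:
// when NeedLeaderElection returns false, the manager starts the runnable
// immediately, before (and regardless of) winning the leader lease.
type LeaderElectionRunnable interface {
	NeedLeaderElection() bool
}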

+func (r *catalogServerRunnable) Start(ctx context.Context) error {
+	listener, err := net.Listen("tcp", r.cfg.CatalogAddr)
	if err != nil {
		return fmt.Errorf("error creating catalog server listener: %w", err)
	}

-	if cfg.CertFile != "" && cfg.KeyFile != "" {
+	// Use the passed certificate watcher instead of creating a new one
+	if r.cfg.CertFile != "" && r.cfg.KeyFile != "" {
		config := &tls.Config{
-			GetCertificate: tlsFileWatcher.GetCertificate,
+			GetCertificate: r.cw.GetCertificate,
			MinVersion:     tls.VersionTLS12,
		}
		listener = tls.NewListener(listener, config)
	}

-	shutdownTimeout := 30 * time.Second
-	catalogServer := manager.Server{
-		Name:                "catalogs",
-		OnlyServeWhenLeader: true,
-		Server: &http.Server{
-			Addr:        cfg.CatalogAddr,
-			Handler:     storageServerHandlerWrapped(mgr.GetLogger().WithName("catalogd-http-server"), cfg),
-			ReadTimeout: 5 * time.Second,
-			// TODO: Revert this to 10 seconds if/when the API
-			// evolves to have significantly smaller responses
-			WriteTimeout: 5 * time.Minute,
-		},
-		ShutdownTimeout: &shutdownTimeout,
-		Listener:        listener,
-	}
+	// Signal readiness before blocking on Serve so the readiness probe passes promptly.
+	close(r.ready)

-	err = mgr.Add(&catalogServer)
-	if err != nil {
-		return fmt.Errorf("error adding catalog server to manager: %w", err)
-	}
+	go func() {
+		<-ctx.Done()
+		shutdownCtx := context.Background()
+		if r.shutdownTimeout > 0 {
+			var cancel context.CancelFunc
+			shutdownCtx, cancel = context.WithTimeout(shutdownCtx, r.shutdownTimeout)
+			defer cancel()
+		}
+		if err := r.server.Shutdown(shutdownCtx); err != nil {
+			// Shutdown errors (e.g. context deadline exceeded) are not actionable;
+			// the process is terminating regardless.
+			_ = err
+		}
+	}()
+
+	if err := r.server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
+		return err
+	}
+	return nil
+}

+// readyzCheck returns a healthz.Checker that passes once Start() has been called.
+func (r *catalogServerRunnable) readyzCheck() healthz.Checker {
+	return func(_ *http.Request) error {
+		select {
+		case <-r.ready:
+			return nil
+		default:
+			return fmt.Errorf("catalog server not yet started")
+		}
+	}
+}

func logrLoggingHandler(l logr.Logger, handler http.Handler) http.Handler {
	return handlers.CustomLoggingHandler(nil, handler, func(_ io.Writer, params handlers.LogFormatterParams) {
		// extract parameters used in apache common log format, but then log using `logr` to remain consistent
		// with other loggers used in this codebase.
		username := "-"
		if params.URL.User != nil {
			if name := params.URL.User.Username(); name != "" {
6 changes: 4 additions & 2 deletions internal/operator-controller/catalogmetadata/client/client.go
@@ -106,8 +106,10 @@ func (c *Client) PopulateCache(ctx context.Context, catalog *ocv1.ClusterCatalog
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
-		errToCache := fmt.Errorf("error: received unexpected response status code %d", resp.StatusCode)
-		return c.cache.Put(catalog.Name, catalog.Status.ResolvedSource.Image.Ref, nil, errToCache)
+		// Do not cache non-200 responses (e.g. 404 from a non-leader catalogd pod).
+		// Returning the error directly lets the next reconcile retry a fresh HTTP
+		// request and eventually hit the leader.
+		return nil, fmt.Errorf("error: received unexpected response status code %d", resp.StatusCode)
	}
Comment on lines 108 to 113
⚠️ Potential issue | 🟠 Major

Non-200 responses are now hidden from cache consumers

On Line 112, returning directly skips cache.Put, so later reads can surface cache ... not found instead of the real upstream failure. That’s risky for persistent non-200s (e.g., 401/403/5xx). Consider bypassing cache only for the HA race case (404), and caching other non-200 errors.

Proposed scoped fix
 if resp.StatusCode != http.StatusOK {
-	// Do not cache non-200 responses (e.g. 404 from a non-leader catalogd pod).
-	// Returning the error directly lets the next reconcile retry a fresh HTTP
-	// request and eventually hit the leader.
-	return nil, fmt.Errorf("error: received unexpected response status code %d", resp.StatusCode)
+	err := fmt.Errorf("error: received unexpected response status code %d", resp.StatusCode)
+	// Avoid pinning transient HA misses, but preserve error visibility for other failures.
+	if resp.StatusCode == http.StatusNotFound {
+		return nil, err
+	}
+	return c.cache.Put(catalog.Name, catalog.Status.ResolvedSource.Image.Ref, nil, err)
 }

return c.cache.Put(catalog.Name, catalog.Status.ResolvedSource.Image.Ref, resp.Body, nil)
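The "callers are expected to retry" contract above is the reconcile loop itself. A hedged sketch — the reconciler, field names, and import paths here are illustrative, not the actual operator-controller types — of how the PopulateCache change plays out:

package example

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	// hypothetical import paths, shown for illustration only
	ocv1 "github.com/operator-framework/operator-controller/api/v1"
	catalogclient "github.com/operator-framework/operator-controller/internal/operator-controller/catalogmetadata/client"
)

type ClusterCatalogReconciler struct {
	client.Client
	CatalogCache *catalogclient.Client
}

func (r *ClusterCatalogReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	catalog := &ocv1.ClusterCatalog{}
	if err := r.Get(ctx, req.NamespacedName, catalog); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	// PopulateCache now surfaces non-200 responses as errors instead of caching
	// them; returning the error requeues this catalog with exponential backoff,
	// so a later attempt can reach the leader's populated cache.
	if _, err := r.CatalogCache.PopulateCache(ctx, catalog); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}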
@@ -234,13 +234,6 @@ func TestClientPopulateCache(t *testing.T) {
			}},
		}, nil
	},
-	putFuncConstructor: func(t *testing.T) func(source string, errToCache error) (fs.FS, error) {
-		return func(source string, errToCache error) (fs.FS, error) {
-			assert.Empty(t, source)
-			assert.Error(t, errToCache)
-			return nil, errToCache
-		}
-	},
	assert: func(t *testing.T, fs fs.FS, err error) {
		assert.Nil(t, fs)
		assert.ErrorContains(t, err, "received unexpected response status code 500")
4 changes: 2 additions & 2 deletions manifests/experimental-e2e.yaml
@@ -2621,7 +2621,7 @@ metadata:
  namespace: olmv1-system
spec:
  minReadySeconds: 5
-  replicas: 1
+  replicas: 2
⚠️ Potential issue | 🟠 Major

The shared e2e-coverage PVC does not scale with these replica bumps.

Both Deployments still mount the same e2e-coverage claim, but this file defines that PVC as ReadWriteOnce (lines 180-185). With 2 catalogd pods and 2 operator-controller pods in the 2-node experimental suite, a replica scheduled onto a different node can stay Pending on volume attach, which defeats the HA rollout this change is trying to exercise.

Also applies to: 2775-2775

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@manifests/experimental-e2e.yaml` at line 2624, The shared e2e-coverage PVC is
defined as ReadWriteOnce but both Deployments (catalogd and operator-controller)
mount it and replicas were increased to 2, causing pods on different nodes to
hang on attach; update the storage model to support multiple nodes by either
changing the PVC named e2e-coverage to accessMode: ReadWriteMany (if your
storage class supports RWX) or convert these Deployments to StatefulSets and use
volumeClaimTemplates to give each replica its own PVC; locate the e2e-coverage
PVC definition and the catalogd/operator-controller workload manifests and apply
the RWX change or the StatefulSet + volumeClaimTemplates approach accordingly.

strategy:
type: RollingUpdate
rollingUpdate:
@@ -2772,7 +2772,7 @@ metadata:
  name: operator-controller-controller-manager
  namespace: olmv1-system
spec:
-  replicas: 1
+  replicas: 2
strategy:
type: RollingUpdate
rollingUpdate:
4 changes: 2 additions & 2 deletions manifests/experimental.yaml
@@ -2541,7 +2541,7 @@ metadata:
  namespace: olmv1-system
spec:
  minReadySeconds: 5
-  replicas: 1
+  replicas: 2
strategy:
type: RollingUpdate
rollingUpdate:
@@ -2679,7 +2679,7 @@ metadata:
  name: operator-controller-controller-manager
  namespace: olmv1-system
spec:
-  replicas: 1
+  replicas: 2
strategy:
type: RollingUpdate
rollingUpdate:
19 changes: 18 additions & 1 deletion openshift/catalogd/manifests-experimental.yaml
@@ -65,6 +65,23 @@ spec:
- Ingress
- Egress
---
+# Source: olmv1/templates/poddisruptionbudget-olmv1-system-catalogd.yml
+apiVersion: policy/v1
+kind: PodDisruptionBudget
+metadata:
+  name: catalogd-controller-manager
+  namespace: openshift-catalogd
+  labels:
+    app.kubernetes.io/name: catalogd
+    app.kubernetes.io/part-of: olm
+  annotations:
+    olm.operatorframework.io/feature-set: experimental
+spec:
+  minAvailable: 1
+  selector:
+    matchLabels:
+      control-plane: catalogd-controller-manager
+---
# Source: olmv1/templates/serviceaccount-olmv1-system-common-controller-manager.yml
apiVersion: v1
kind: ServiceAccount
@@ -825,7 +842,7 @@ metadata:
  namespace: openshift-catalogd
spec:
  minReadySeconds: 5
-  replicas: 1
+  replicas: 2
strategy:
type: RollingUpdate
rollingUpdate:
19 changes: 18 additions & 1 deletion openshift/catalogd/manifests.yaml
@@ -65,6 +65,23 @@ spec:
- Ingress
- Egress
---
+# Source: olmv1/templates/poddisruptionbudget-olmv1-system-catalogd.yml
+apiVersion: policy/v1
+kind: PodDisruptionBudget
+metadata:
+  name: catalogd-controller-manager
+  namespace: openshift-catalogd
+  labels:
+    app.kubernetes.io/name: catalogd
+    app.kubernetes.io/part-of: olm
+  annotations:
+    olm.operatorframework.io/feature-set: standard
+spec:
+  minAvailable: 1
+  selector:
+    matchLabels:
+      control-plane: catalogd-controller-manager
+---
# Source: olmv1/templates/serviceaccount-olmv1-system-common-controller-manager.yml
apiVersion: v1
kind: ServiceAccount
@@ -825,7 +842,7 @@ metadata:
  namespace: openshift-catalogd
spec:
  minReadySeconds: 5
-  replicas: 1
+  replicas: 2
strategy:
type: RollingUpdate
rollingUpdate:
3 changes: 1 addition & 2 deletions openshift/helm/catalogd.yaml
@@ -8,8 +8,7 @@ options:
    enabled: true
    deployment:
      image: ${CATALOGD_IMAGE}
-    podDisruptionBudget:
-      enabled: false
+      replicas: 2
  operatorController:
    enabled: false
  openshift:
3 changes: 1 addition & 2 deletions openshift/helm/operator-controller.yaml
@@ -8,8 +8,7 @@ options:
    enabled: true
    deployment:
      image: ${OPERATOR_CONTROLLER_IMAGE}
-    podDisruptionBudget:
-      enabled: false
+      replicas: 2
  catalogd:
    enabled: false
  openshift:
19 changes: 18 additions & 1 deletion openshift/operator-controller/manifests-experimental.yaml
@@ -61,6 +61,23 @@ spec:
- Ingress
- Egress
---
+# Source: olmv1/templates/poddisruptionbudget-olmv1-system-operator-controller.yml
+apiVersion: policy/v1
+kind: PodDisruptionBudget
+metadata:
+  name: operator-controller-controller-manager
+  namespace: openshift-operator-controller
+  labels:
+    app.kubernetes.io/name: operator-controller
+    app.kubernetes.io/part-of: olm
+  annotations:
+    olm.operatorframework.io/feature-set: experimental
+spec:
+  minAvailable: 1
+  selector:
+    matchLabels:
+      control-plane: operator-controller-controller-manager
+---
# Source: olmv1/templates/serviceaccount-olmv1-system-common-controller-manager.yml
apiVersion: v1
kind: ServiceAccount
@@ -1184,7 +1201,7 @@ metadata:
  name: operator-controller-controller-manager
  namespace: openshift-operator-controller
spec:
-  replicas: 1
+  replicas: 2
strategy:
type: RollingUpdate
rollingUpdate: