diff --git a/CLAUDE.md b/CLAUDE.md index 4e5b2e7..7bf5b6c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -55,7 +55,10 @@ make docker-push IMG= # Push container image - **SeiNodeDeployment** creates and owns **SeiNode** resources. Groups orchestrate genesis ceremonies, manage deployments, and coordinate networking/monitoring. - **SeiNode** creates StatefulSets (replicas=1), headless Services, and PVCs via server-side apply (fieldOwner: `seinode-controller`). +- **Plan-driven reconciliation** — Both controllers use ordered task plans (stored in `.status.plan`) to drive lifecycle. Plans are built by `internal/planner/` (`ResolvePlan` for nodes, `ForGroup` for deployments), executed by `planner.Executor`, with individual tasks in `internal/task/`. The reconcile loop is: `ResolvePlan → persist plan → ExecutePlan`. See `internal/planner/doc.go` for the full plan lifecycle. +- **Init plans** transition nodes from Pending → Running. They include infrastructure tasks (`ensure-data-pvc`, `apply-statefulset`, `apply-service`) followed by sidecar tasks (`configure-genesis`, `config-apply`, etc.). +- **Convergence plans** keep Running nodes in sync. They contain only `apply-statefulset` + `apply-service` and are nilled from status after completion. +- **Resource generators** live in `internal/noderesource/` — pure functions that produce StatefulSets, Services, and PVCs from a SeiNode spec. Used by both the controller and plan tasks. - **Platform config** is fully environment-driven — all fields in `platform.Config` must be set via env vars (no defaults). See `internal/platform/platform.go` for the full list. - **Genesis resolution** is handled by the sidecar autonomously: embedded sei-config for well-known chains, S3 fallback at `{SEI_GENESIS_BUCKET}/{chainID}/genesis.json` for custom chains. -- Sidecar bootstrap progression is driven by the node controller polling the sidecar HTTP API and submitting tasks in sequence. 
- Config keys in seid's `config.toml` use **hyphens** (e.g., `persistent-peers`, `trust-height`), not underscores. diff --git a/internal/controller/node/controller.go b/internal/controller/node/controller.go index ed8595c..79b711a 100644 --- a/internal/controller/node/controller.go +++ b/internal/controller/node/controller.go @@ -30,7 +30,6 @@ const ( nodeFinalizerName = "sei.io/seinode-finalizer" seiNodeControllerName = "seinode" statusPollInterval = 30 * time.Second - fieldOwner = client.FieldOwner("seinode-controller") ) // PlatformConfig is an alias for platform.Config, used throughout the node @@ -78,116 +77,74 @@ func (r *SeiNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, err } - p, err := planner.ForNode(node) - if err != nil { - return ctrl.Result{}, fmt.Errorf("resolving planner: %w", err) - } - if err := p.Validate(node); err != nil { - return ctrl.Result{}, fmt.Errorf("validating spec: %w", err) + // Failed is terminal — nothing to do. + if node.Status.Phase == seiv1alpha1.PhaseFailed { + r.Recorder.Eventf(node, corev1.EventTypeWarning, "NodeFailed", + "SeiNode is in Failed state. Delete and recreate the resource to retry.") + return ctrl.Result{}, nil } - // TODO: reconcile peers should become a part of a plan if it needs to be performed. + // Pre-plan: resolve label-based peers so plan params have fresh data. if err := r.reconcilePeers(ctx, node); err != nil { return ctrl.Result{}, fmt.Errorf("reconciling peers: %w", err) } - // TODO: this should be a part of a plan and not part of the main reconciliation flow. - if err := r.ensureNodeDataPVC(ctx, node); err != nil { - return ctrl.Result{}, fmt.Errorf("ensuring data PVC: %w", err) + // Resolve or resume plan. ResolvePlan either resumes an active plan or + // builds a new one based on the node's phase, stamping it onto + // node.Status.Plan (and transitioning Pending → Initializing). 
+ observedPhase := node.Status.Phase + planAlreadyActive := node.Status.Plan != nil && node.Status.Plan.Phase == seiv1alpha1.TaskPlanActive + patch := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{}) + if err := planner.ResolvePlan(node); err != nil { + return ctrl.Result{}, fmt.Errorf("resolving plan: %w", err) } - // TODO: if the plan becomes the abstraction it should be, then p from planner.ForNode should just take an existing - // plan off the node if one exists, otherwise, create one based on the state it sees. - switch node.Status.Phase { - case "", seiv1alpha1.PhasePending: - return r.reconcilePending(ctx, node, p) - case seiv1alpha1.PhaseInitializing: - return r.reconcileInitializing(ctx, node) - case seiv1alpha1.PhaseRunning: - return r.reconcileRunning(ctx, node) - case seiv1alpha1.PhaseFailed: - r.Recorder.Eventf(node, corev1.EventTypeWarning, "NodeFailed", - "SeiNode is in Failed state. Delete and recreate the resource to retry.") - return ctrl.Result{}, nil - default: + if node.Status.Plan == nil { return ctrl.Result{}, nil } -} -// reconcilePending builds the unified Plan and transitions to Initializing. -// For genesis ceremony nodes the plan includes artifact generation and assembly -// steps. For bootstrap nodes the plan includes controller-side Job lifecycle -// tasks followed by production config. All orchestration lives in the plan. -func (r *SeiNodeReconciler) reconcilePending(ctx context.Context, node *seiv1alpha1.SeiNode, p planner.NodePlanner) (ctrl.Result, error) { - patch := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{}) - - plan, err := p.BuildPlan(node) - if err != nil { - return ctrl.Result{}, fmt.Errorf("building plan: %w", err) + // If ResolvePlan built a new plan, persist it before execution. 
+ if !planAlreadyActive { + if err := r.Status().Patch(ctx, node, patch); err != nil { + return ctrl.Result{}, fmt.Errorf("persisting new plan: %w", err) + } + return planner.ResultRequeueImmediate, nil } - node.Status.Plan = plan - node.Status.Phase = seiv1alpha1.PhaseInitializing - if err := r.Status().Patch(ctx, node, patch); err != nil { - return ctrl.Result{}, fmt.Errorf("initializing plans: %w", err) + // Execute the plan. The executor handles phase transitions via + // TargetPhase/FailedPhase and sets Conditions on failure. + result, err := r.PlanExecutor.ExecutePlan(ctx, node, node.Status.Plan) + if err != nil { + return result, err } - ns, name := node.Namespace, node.Name - nodePhaseTransitions.WithLabelValues(ns, string(seiv1alpha1.PhasePending), string(seiv1alpha1.PhaseInitializing)).Inc() - emitNodePhase(ns, name, seiv1alpha1.PhaseInitializing) - r.Recorder.Eventf(node, corev1.EventTypeNormal, "PhaseTransition", - "Phase changed from %s to %s", seiv1alpha1.PhasePending, seiv1alpha1.PhaseInitializing) + // Emit metrics/events if the phase changed. + if node.Status.Phase != observedPhase { + ns, name := node.Namespace, node.Name + nodePhaseTransitions.WithLabelValues(ns, string(observedPhase), string(node.Status.Phase)).Inc() + emitNodePhase(ns, name, node.Status.Phase) + r.Recorder.Eventf(node, corev1.EventTypeNormal, "PhaseTransition", + "Phase changed from %s to %s", observedPhase, node.Status.Phase) - return planner.ResultRequeueImmediate, nil -} - -// reconcileInitializing drives the unified Plan to completion. For -// bootstrap nodes the StatefulSet and Service are created only after -// bootstrap teardown is complete (to avoid RWO PVC conflicts). For -// non-bootstrap nodes they are created immediately. 
-func (r *SeiNodeReconciler) reconcileInitializing(ctx context.Context, node *seiv1alpha1.SeiNode) (ctrl.Result, error) { - plan := node.Status.Plan - - // TODO: This should be a part of the plan, not a special case we need to filter out for the reconciliation before we - // go ahead with the actual plan. - if !planner.NeedsBootstrap(node) || planner.IsBootstrapComplete(plan) { - if err := r.reconcileNodeStatefulSet(ctx, node); err != nil { - return ctrl.Result{}, fmt.Errorf("reconciling statefulset: %w", err) - } - if err := r.reconcileNodeService(ctx, node); err != nil { - return ctrl.Result{}, fmt.Errorf("reconciling service: %w", err) + if node.Status.Phase == seiv1alpha1.PhaseRunning { + dur := time.Since(node.CreationTimestamp.Time).Seconds() + nodeInitDuration.WithLabelValues(ns, node.Spec.ChainID).Observe(dur) + nodeLastInitDuration.WithLabelValues(ns, name).Set(dur) } } - result, err := r.PlanExecutor.ExecutePlan(ctx, node, plan) - if err != nil { - return result, err + // Running phase: after the convergence plan completes, handle + // runtime tasks (image observation, monitor task polling). + if node.Status.Phase == seiv1alpha1.PhaseRunning { + return r.reconcileRunningTasks(ctx, node) } - // TODO: transitioning should be a part of ExecutePlan, I would think. This way, when a plan is done, it has materialized - // the proper state and does some final patch to the node, basically leaving it in a state that other logic can observe and - // act on as well. For instance, if a plan succeeds, the plan knows what state to put it in. Same for if it fails. - if plan.Phase == seiv1alpha1.TaskPlanComplete { - return r.transitionPhase(ctx, node, seiv1alpha1.PhaseRunning) - } - if plan.Phase == seiv1alpha1.TaskPlanFailed { - return r.transitionPhase(ctx, node, seiv1alpha1.PhaseFailed) - } return result, nil } -// reconcileRunning converges owned resources and handles runtime tasks. 
-func (r *SeiNodeReconciler) reconcileRunning(ctx context.Context, node *seiv1alpha1.SeiNode) (ctrl.Result, error) { - - // TODO: ideally this becomes a task in a plan - if err := r.reconcileNodeStatefulSet(ctx, node); err != nil { - return ctrl.Result{}, fmt.Errorf("reconciling statefulset: %w", err) - } - // TODO: ideally this becomes a task in a plan - if err := r.reconcileNodeService(ctx, node); err != nil { - return ctrl.Result{}, fmt.Errorf("reconciling service: %w", err) - } - +// reconcileRunningTasks handles Running-phase work that is outside the plan: +// image observation and sidecar monitor task polling. +func (r *SeiNodeReconciler) reconcileRunningTasks(ctx context.Context, node *seiv1alpha1.SeiNode) (ctrl.Result, error) { if err := r.observeCurrentImage(ctx, node); err != nil { return ctrl.Result{}, fmt.Errorf("observing current image: %w", err) } @@ -226,36 +183,6 @@ func (r *SeiNodeReconciler) observeCurrentImage(ctx context.Context, node *seiv1 return nil } -// transitionPhase transitions the node to a new phase and emits the associated -// metric counter, phase gauge, and Kubernetes event. 
-func (r *SeiNodeReconciler) transitionPhase(ctx context.Context, node *seiv1alpha1.SeiNode, phase seiv1alpha1.SeiNodePhase) (ctrl.Result, error) { - prev := node.Status.Phase - if prev == "" { - prev = seiv1alpha1.PhasePending - } - - patch := client.MergeFromWithOptions(node.DeepCopy(), client.MergeFromWithOptimisticLock{}) - node.Status.Phase = phase - if err := r.Status().Patch(ctx, node, patch); err != nil { - return ctrl.Result{}, fmt.Errorf("setting phase to %s: %w", phase, err) - } - - ns, name := node.Namespace, node.Name - nodePhaseTransitions.WithLabelValues(ns, string(prev), string(phase)).Inc() - emitNodePhase(ns, name, phase) - - if phase == seiv1alpha1.PhaseRunning { - dur := time.Since(node.CreationTimestamp.Time).Seconds() - nodeInitDuration.WithLabelValues(ns, node.Spec.ChainID).Observe(dur) - nodeLastInitDuration.WithLabelValues(ns, name).Set(dur) - } - - r.Recorder.Eventf(node, corev1.EventTypeNormal, "PhaseTransition", - "Phase changed from %s to %s", prev, phase) - - return planner.ResultRequeueImmediate, nil -} - // SetupWithManager sets up the controller with the Manager. func (r *SeiNodeReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). 
@@ -308,37 +235,3 @@ func (r *SeiNodeReconciler) deleteNodeDataPVC(ctx context.Context, node *seiv1al } return r.Delete(ctx, pvc) } - -func (r *SeiNodeReconciler) ensureNodeDataPVC(ctx context.Context, node *seiv1alpha1.SeiNode) error { - desired := noderesource.GenerateDataPVC(node, r.Platform) - if err := ctrl.SetControllerReference(node, desired, r.Scheme); err != nil { - return fmt.Errorf("setting owner reference: %w", err) - } - - existing := &corev1.PersistentVolumeClaim{} - err := r.Get(ctx, types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing) - if apierrors.IsNotFound(err) { - return r.Create(ctx, desired) - } - return err -} - -func (r *SeiNodeReconciler) reconcileNodeStatefulSet(ctx context.Context, node *seiv1alpha1.SeiNode) error { - desired := noderesource.GenerateStatefulSet(node, r.Platform) - desired.SetGroupVersionKind(appsv1.SchemeGroupVersion.WithKind("StatefulSet")) - if err := ctrl.SetControllerReference(node, desired, r.Scheme); err != nil { - return fmt.Errorf("setting owner reference: %w", err) - } - //nolint:staticcheck // migrating to typed ApplyConfiguration is a separate effort - return r.Patch(ctx, desired, client.Apply, fieldOwner, client.ForceOwnership) -} - -func (r *SeiNodeReconciler) reconcileNodeService(ctx context.Context, node *seiv1alpha1.SeiNode) error { - desired := noderesource.GenerateHeadlessService(node) - desired.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Service")) - if err := ctrl.SetControllerReference(node, desired, r.Scheme); err != nil { - return fmt.Errorf("setting owner reference: %w", err) - } - //nolint:staticcheck // migrating to typed ApplyConfiguration is a separate effort - return r.Patch(ctx, desired, client.Apply, fieldOwner, client.ForceOwnership) -} diff --git a/internal/controller/node/monitor_test.go b/internal/controller/node/monitor_test.go index d931ed2..bbc5ad0 100644 --- a/internal/controller/node/monitor_test.go +++ 
b/internal/controller/node/monitor_test.go @@ -503,9 +503,9 @@ func TestReconcileRunning_MonitorMode_SubmitsMonitorTask(t *testing.T) { r, c := newProgressionReconciler(t, mock, node) - _, err := r.reconcileRunning(context.Background(), node) + _, err := r.reconcileRunningTasks(context.Background(), node) if err != nil { - t.Fatalf("reconcileRunning: %v", err) + t.Fatalf("reconcileRunningTasks: %v", err) } if len(mock.submitted) != 1 { diff --git a/internal/controller/node/plan_execution_integration_test.go b/internal/controller/node/plan_execution_integration_test.go index d415142..0114cb5 100644 --- a/internal/controller/node/plan_execution_integration_test.go +++ b/internal/controller/node/plan_execution_integration_test.go @@ -8,14 +8,17 @@ import ( . "github.com/onsi/gomega" sidecar "github.com/sei-protocol/seictl/sidecar/client" "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" seiv1alpha1 "github.com/sei-protocol/sei-k8s-controller/api/v1alpha1" "github.com/sei-protocol/sei-k8s-controller/internal/planner" + "github.com/sei-protocol/sei-k8s-controller/internal/task" ) // driveTask submits one task and completes it. For fire-and-forget tasks -// (config-validate, mark-ready), the task completes in a single ExecutePlan -// call. For normal tasks, mock results are set and a second call is made. +// (config-validate, mark-ready, infrastructure tasks), the task completes +// in a single ExecutePlan call. For normal tasks, mock results are set and +// a second call is made. 
func driveTask( t *testing.T, g Gomega, @@ -36,8 +39,6 @@ func driveTask( _, err = r.PlanExecutor.ExecutePlan(context.Background(), node, node.Status.Plan) g.Expect(err).NotTo(HaveOccurred()) - g.Expect(mock.submitted).To(HaveLen(1)) - g.Expect(mock.submitted[0].Type).To(Equal(taskType)) // Check if already complete (fire-and-forget tasks complete in one call) node = fetch() @@ -48,18 +49,27 @@ func driveTask( } // Task is still running — set mock results and drive to completion - mock.taskResults = map[uuid.UUID]*sidecar.TaskResult{ - taskUUID: completedResult(taskUUID, taskType, nil), + if len(mock.submitted) > 0 { + mock.taskResults = map[uuid.UUID]*sidecar.TaskResult{ + taskUUID: completedResult(taskUUID, taskType, nil), + } } node = fetch() _, err = r.PlanExecutor.ExecutePlan(context.Background(), node, node.Status.Plan) g.Expect(err).NotTo(HaveOccurred()) } +// reconcileOnce runs a single Reconcile call for the given node. +func reconcileOnce(t *testing.T, g Gomega, r *SeiNodeReconciler, name, namespace string) { + t.Helper() + req := ctrl.Request{NamespacedName: types.NamespacedName{Name: name, Namespace: namespace}} + _, err := r.Reconcile(context.Background(), req) + g.Expect(err).NotTo(HaveOccurred()) +} + func TestIntegrationFullProgressionSnapshotMode(t *testing.T) { g := NewGomegaWithT(t) node := snapshotNode() - p, _ := planner.ForNode(node) mock := &mockSidecarClient{} r, c := newProgressionReconciler(t, mock, node) ctx := context.Background() @@ -71,44 +81,36 @@ func TestIntegrationFullProgressionSnapshotMode(t *testing.T) { return n } - _, err := r.reconcilePending(ctx, node, p) - g.Expect(err).NotTo(HaveOccurred()) + // First Reconcile: ResolvePlan builds the plan and transitions to Initializing. 
+ reconcileOnce(t, g, r, node.Name, node.Namespace) node = fetch() g.Expect(node.Status.Plan).NotTo(BeNil()) g.Expect(node.Status.Phase).To(Equal(seiv1alpha1.PhaseInitializing)) - ct := planner.CurrentTask(node.Status.Plan) - g.Expect(ct).NotTo(BeNil()) - taskUUID, err := uuid.Parse(ct.ID) - g.Expect(err).NotTo(HaveOccurred()) - - _, err = r.PlanExecutor.ExecutePlan(ctx, node, node.Status.Plan) - g.Expect(err).NotTo(HaveOccurred()) - g.Expect(mock.submitted[0].Type).To(Equal(planner.TaskSnapshotRestore)) - - mock.taskResults = map[uuid.UUID]*sidecar.TaskResult{ - taskUUID: completedResult(taskUUID, planner.TaskSnapshotRestore, nil), - } - updated := fetch() - _, err = r.PlanExecutor.ExecutePlan(ctx, updated, updated.Status.Plan) - g.Expect(err).NotTo(HaveOccurred()) + // Drive infrastructure tasks (fire-and-forget, complete in one call each). + driveTask(t, g, r, mock, fetch, task.TaskTypeEnsureDataPVC) + driveTask(t, g, r, mock, fetch, task.TaskTypeApplyStatefulSet) + driveTask(t, g, r, mock, fetch, task.TaskTypeApplyService) + // Drive sidecar tasks. + driveTask(t, g, r, mock, fetch, planner.TaskSnapshotRestore) driveTask(t, g, r, mock, fetch, planner.TaskConfigureGenesis) driveTask(t, g, r, mock, fetch, planner.TaskConfigApply) driveTask(t, g, r, mock, fetch, planner.TaskConfigureStateSync) driveTask(t, g, r, mock, fetch, planner.TaskConfigValidate) driveTask(t, g, r, mock, fetch, planner.TaskMarkReady) - updated = fetch() - _, err = r.PlanExecutor.ExecutePlan(ctx, updated, updated.Status.Plan) + // Final ExecutePlan marks the plan complete and transitions to Running. 
+ updated := fetch() + _, err := r.PlanExecutor.ExecutePlan(ctx, updated, updated.Status.Plan) g.Expect(err).NotTo(HaveOccurred()) - g.Expect(fetch().Status.Plan.Phase).To(Equal(seiv1alpha1.TaskPlanComplete)) + updated = fetch() + g.Expect(updated.Status.Phase).To(Equal(seiv1alpha1.PhaseRunning)) } func TestIntegrationFullProgressionGenesisMode(t *testing.T) { g := NewGomegaWithT(t) node := genesisNode() - p, _ := planner.ForNode(node) mock := &mockSidecarClient{} r, c := newProgressionReconciler(t, mock, node) ctx := context.Background() @@ -120,41 +122,33 @@ func TestIntegrationFullProgressionGenesisMode(t *testing.T) { return n } - _, err := r.reconcilePending(ctx, node, p) - g.Expect(err).NotTo(HaveOccurred()) + // First Reconcile: ResolvePlan builds the plan and transitions to Initializing. + reconcileOnce(t, g, r, node.Name, node.Namespace) node = fetch() g.Expect(node.Status.Plan).NotTo(BeNil()) - ct := planner.CurrentTask(node.Status.Plan) - g.Expect(ct).NotTo(BeNil()) - taskUUID, err := uuid.Parse(ct.ID) - g.Expect(err).NotTo(HaveOccurred()) - - _, err = r.PlanExecutor.ExecutePlan(ctx, node, node.Status.Plan) - g.Expect(err).NotTo(HaveOccurred()) - g.Expect(mock.submitted[0].Type).To(Equal(planner.TaskConfigureGenesis)) - - mock.taskResults = map[uuid.UUID]*sidecar.TaskResult{ - taskUUID: completedResult(taskUUID, planner.TaskConfigureGenesis, nil), - } - updated := fetch() - _, err = r.PlanExecutor.ExecutePlan(ctx, updated, updated.Status.Plan) - g.Expect(err).NotTo(HaveOccurred()) + // Drive infrastructure tasks. + driveTask(t, g, r, mock, fetch, task.TaskTypeEnsureDataPVC) + driveTask(t, g, r, mock, fetch, task.TaskTypeApplyStatefulSet) + driveTask(t, g, r, mock, fetch, task.TaskTypeApplyService) + // Drive sidecar tasks. 
+ driveTask(t, g, r, mock, fetch, planner.TaskConfigureGenesis) driveTask(t, g, r, mock, fetch, planner.TaskConfigApply) driveTask(t, g, r, mock, fetch, planner.TaskConfigValidate) driveTask(t, g, r, mock, fetch, planner.TaskMarkReady) - updated = fetch() - _, err = r.PlanExecutor.ExecutePlan(ctx, updated, updated.Status.Plan) + // Final ExecutePlan marks the plan complete and transitions to Running. + updated := fetch() + _, err := r.PlanExecutor.ExecutePlan(ctx, updated, updated.Status.Plan) g.Expect(err).NotTo(HaveOccurred()) - g.Expect(fetch().Status.Plan.Phase).To(Equal(seiv1alpha1.TaskPlanComplete)) + updated = fetch() + g.Expect(updated.Status.Phase).To(Equal(seiv1alpha1.PhaseRunning)) } func TestIntegrationTaskFailure_FailsPlan(t *testing.T) { g := NewGomegaWithT(t) node := snapshotNode() - p, _ := planner.ForNode(node) mock := &mockSidecarClient{} r, c := newProgressionReconciler(t, mock, node) ctx := context.Background() @@ -166,19 +160,28 @@ func TestIntegrationTaskFailure_FailsPlan(t *testing.T) { return n } - _, err := r.reconcilePending(ctx, node, p) - g.Expect(err).NotTo(HaveOccurred()) + // First Reconcile: builds plan and transitions to Initializing. + reconcileOnce(t, g, r, node.Name, node.Namespace) node = fetch() g.Expect(node.Status.Plan).NotTo(BeNil()) + // Drive infrastructure tasks past the fire-and-forget phase. + driveTask(t, g, r, mock, fetch, task.TaskTypeEnsureDataPVC) + driveTask(t, g, r, mock, fetch, task.TaskTypeApplyStatefulSet) + driveTask(t, g, r, mock, fetch, task.TaskTypeApplyService) + + // Drive snapshot-restore: submit it. + node = fetch() ct := planner.CurrentTask(node.Status.Plan) g.Expect(ct).NotTo(BeNil()) + g.Expect(ct.Type).To(Equal(planner.TaskSnapshotRestore)) taskUUID, err := uuid.Parse(ct.ID) g.Expect(err).NotTo(HaveOccurred()) _, err = r.PlanExecutor.ExecutePlan(ctx, node, node.Status.Plan) g.Expect(err).NotTo(HaveOccurred()) + // Fail the snapshot-restore task. 
mock.taskResults = map[uuid.UUID]*sidecar.TaskResult{ taskUUID: completedResult(taskUUID, planner.TaskSnapshotRestore, strPtr("S3 access denied")), } @@ -188,9 +191,15 @@ func TestIntegrationTaskFailure_FailsPlan(t *testing.T) { updated = fetch() g.Expect(updated.Status.Plan.Phase).To(Equal(seiv1alpha1.TaskPlanFailed)) - g.Expect(updated.Status.Plan.Tasks[0].Status).To(Equal(seiv1alpha1.TaskFailed)) - g.Expect(updated.Status.Plan.Tasks[0].Error).To(Equal("S3 access denied")) + g.Expect(updated.Status.Phase).To(Equal(seiv1alpha1.PhaseFailed)) + + // Verify the failed task details. + failedTask := findPlannedTask(updated.Status.Plan, planner.TaskSnapshotRestore) + g.Expect(failedTask).NotTo(BeNil()) + g.Expect(failedTask.Status).To(Equal(seiv1alpha1.TaskFailed)) + g.Expect(failedTask.Error).To(Equal("S3 access denied")) + // Subsequent ExecutePlan is a no-op on failed plans. mock.submitted = nil _, err = r.PlanExecutor.ExecutePlan(ctx, updated, updated.Status.Plan) g.Expect(err).NotTo(HaveOccurred()) diff --git a/internal/controller/node/plan_execution_test.go b/internal/controller/node/plan_execution_test.go index dd11626..a981be1 100644 --- a/internal/controller/node/plan_execution_test.go +++ b/internal/controller/node/plan_execution_test.go @@ -18,6 +18,7 @@ import ( "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -33,13 +34,12 @@ const ( testSnapshotRegion = "eu-central-1" ) -func mustBuildPlan(t *testing.T, p planner.NodePlanner, node *seiv1alpha1.SeiNode) *seiv1alpha1.TaskPlan { +func mustBuildPlan(t *testing.T, node *seiv1alpha1.SeiNode) *seiv1alpha1.TaskPlan { t.Helper() - plan, err := p.BuildPlan(node) - if err != nil { - t.Fatalf("BuildPlan: %v", err) + if err := planner.ResolvePlan(node); err != nil { + t.Fatalf("ResolvePlan: %v", err) } - return plan + 
return node.Status.Plan } type mockSidecarClient struct { @@ -232,37 +232,12 @@ func replayerNode() *seiv1alpha1.SeiNode { } } -// --- Bootstrap mode tests --- - -func TestBootstrapMode(t *testing.T) { - tests := []struct { - name string - snap *seiv1alpha1.SnapshotSource - want string - }{ - {"snapshot", &seiv1alpha1.SnapshotSource{S3: &seiv1alpha1.S3SnapshotSource{TargetHeight: 1}}, "snapshot"}, - {"state-sync", &seiv1alpha1.SnapshotSource{StateSync: &seiv1alpha1.StateSyncSource{}}, "state-sync"}, - {"genesis", nil, "genesis"}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - p, _ := planner.ForNode(snapshotNode()) - plan := mustBuildPlan(t, p, snapshotNode()) - if plan == nil { - t.Fatal("expected non-nil plan") - } - _ = tt // bootstrap mode is now internal to planner - }) - } -} - // --- Plan building tests --- func TestBuildPlan_Snapshot(t *testing.T) { - p, _ := planner.ForNode(snapshotNode()) - plan := mustBuildPlan(t, p, snapshotNode()) + plan := mustBuildPlan(t, snapshotNode()) got := taskTypes(plan) - want := []string{planner.TaskSnapshotRestore, planner.TaskConfigureGenesis, planner.TaskConfigApply, planner.TaskConfigureStateSync, planner.TaskConfigValidate, planner.TaskMarkReady} + want := []string{task.TaskTypeEnsureDataPVC, task.TaskTypeApplyStatefulSet, task.TaskTypeApplyService, planner.TaskSnapshotRestore, planner.TaskConfigureGenesis, planner.TaskConfigApply, planner.TaskConfigureStateSync, planner.TaskConfigValidate, planner.TaskMarkReady} assertProgression(t, got, want) } @@ -271,26 +246,23 @@ func TestBuildPlan_SnapshotWithPeers(t *testing.T) { node.Spec.Peers = []seiv1alpha1.PeerSource{ {EC2Tags: &seiv1alpha1.EC2TagsPeerSource{Region: "eu-central-1", Tags: map[string]string{"Chain": "atlantic-2"}}}, } - p, _ := planner.ForNode(node) - plan := mustBuildPlan(t, p, node) + plan := mustBuildPlan(t, node) got := taskTypes(plan) - want := []string{planner.TaskSnapshotRestore, planner.TaskConfigureGenesis, 
planner.TaskConfigApply, planner.TaskDiscoverPeers, planner.TaskConfigureStateSync, planner.TaskConfigValidate, planner.TaskMarkReady} + want := []string{task.TaskTypeEnsureDataPVC, task.TaskTypeApplyStatefulSet, task.TaskTypeApplyService, planner.TaskSnapshotRestore, planner.TaskConfigureGenesis, planner.TaskConfigApply, planner.TaskDiscoverPeers, planner.TaskConfigureStateSync, planner.TaskConfigValidate, planner.TaskMarkReady} assertProgression(t, got, want) } func TestBuildPlan_StateSync(t *testing.T) { - p, _ := planner.ForNode(peerSyncNode()) - plan := mustBuildPlan(t, p, peerSyncNode()) + plan := mustBuildPlan(t, peerSyncNode()) got := taskTypes(plan) - want := []string{planner.TaskConfigureGenesis, planner.TaskConfigApply, planner.TaskDiscoverPeers, planner.TaskConfigureStateSync, planner.TaskConfigValidate, planner.TaskMarkReady} + want := []string{task.TaskTypeEnsureDataPVC, task.TaskTypeApplyStatefulSet, task.TaskTypeApplyService, planner.TaskConfigureGenesis, planner.TaskConfigApply, planner.TaskDiscoverPeers, planner.TaskConfigureStateSync, planner.TaskConfigValidate, planner.TaskMarkReady} assertProgression(t, got, want) } func TestBuildPlan_Genesis(t *testing.T) { - p, _ := planner.ForNode(genesisNode()) - plan := mustBuildPlan(t, p, genesisNode()) + plan := mustBuildPlan(t, genesisNode()) got := taskTypes(plan) - want := []string{planner.TaskConfigureGenesis, planner.TaskConfigApply, planner.TaskConfigValidate, planner.TaskMarkReady} + want := []string{task.TaskTypeEnsureDataPVC, task.TaskTypeApplyStatefulSet, task.TaskTypeApplyService, planner.TaskConfigureGenesis, planner.TaskConfigApply, planner.TaskConfigValidate, planner.TaskMarkReady} assertProgression(t, got, want) } @@ -299,39 +271,35 @@ func TestBuildPlan_GenesisWithPeers(t *testing.T) { node.Spec.Peers = []seiv1alpha1.PeerSource{ {EC2Tags: &seiv1alpha1.EC2TagsPeerSource{Region: "eu-central-1", Tags: map[string]string{"Chain": "arctic-1"}}}, } - p, _ := planner.ForNode(node) - plan := 
mustBuildPlan(t, p, node) + plan := mustBuildPlan(t, node) got := taskTypes(plan) - want := []string{planner.TaskConfigureGenesis, planner.TaskConfigApply, planner.TaskDiscoverPeers, planner.TaskConfigValidate, planner.TaskMarkReady} + want := []string{task.TaskTypeEnsureDataPVC, task.TaskTypeApplyStatefulSet, task.TaskTypeApplyService, planner.TaskConfigureGenesis, planner.TaskConfigApply, planner.TaskDiscoverPeers, planner.TaskConfigValidate, planner.TaskMarkReady} assertProgression(t, got, want) } func TestBuildPlan_Replayer(t *testing.T) { node := replayerNode() - p, _ := planner.ForNode(node) - plan := mustBuildPlan(t, p, node) + plan := mustBuildPlan(t, node) got := taskTypes(plan) - want := []string{planner.TaskSnapshotRestore, planner.TaskConfigureGenesis, planner.TaskConfigApply, planner.TaskDiscoverPeers, planner.TaskConfigureStateSync, planner.TaskConfigValidate, planner.TaskMarkReady} + want := []string{task.TaskTypeEnsureDataPVC, task.TaskTypeApplyStatefulSet, task.TaskTypeApplyService, planner.TaskSnapshotRestore, planner.TaskConfigureGenesis, planner.TaskConfigApply, planner.TaskDiscoverPeers, planner.TaskConfigureStateSync, planner.TaskConfigValidate, planner.TaskMarkReady} assertProgression(t, got, want) } func TestBuildPlan_Archive(t *testing.T) { node := snapshotterNode() - p, _ := planner.ForNode(node) - plan := mustBuildPlan(t, p, node) + plan := mustBuildPlan(t, node) got := taskTypes(plan) - want := []string{planner.TaskConfigureGenesis, planner.TaskConfigApply, planner.TaskDiscoverPeers, planner.TaskConfigValidate, planner.TaskMarkReady} + want := []string{task.TaskTypeEnsureDataPVC, task.TaskTypeApplyStatefulSet, task.TaskTypeApplyService, planner.TaskConfigureGenesis, planner.TaskConfigApply, planner.TaskDiscoverPeers, planner.TaskConfigValidate, planner.TaskMarkReady} assertProgression(t, got, want) } func TestBuildPlanPhaseAndTasks(t *testing.T) { - p, _ := planner.ForNode(snapshotNode()) - plan := mustBuildPlan(t, p, snapshotNode()) + 
plan := mustBuildPlan(t, snapshotNode()) if plan.Phase != seiv1alpha1.TaskPlanActive { t.Errorf("phase = %q, want Active", plan.Phase) } - if len(plan.Tasks) != 6 { - t.Fatalf("expected 6 tasks, got %d: %v", len(plan.Tasks), taskTypes(plan)) + if len(plan.Tasks) != 9 { + t.Fatalf("expected 9 tasks, got %d: %v", len(plan.Tasks), taskTypes(plan)) } for _, pt := range plan.Tasks { if pt.Status != seiv1alpha1.TaskPending { @@ -344,16 +312,17 @@ func TestBuildPlanPhaseAndTasks(t *testing.T) { t.Errorf("task %q has nil Params", pt.Type) } } - if plan.Tasks[0].Type != planner.TaskSnapshotRestore { - t.Errorf("first task = %q, want %q", plan.Tasks[0].Type, planner.TaskSnapshotRestore) + if plan.Tasks[0].Type != task.TaskTypeEnsureDataPVC { + t.Errorf("first task = %q, want %q", plan.Tasks[0].Type, task.TaskTypeEnsureDataPVC) } } func TestBuildPlan_UniqueIDsAcrossRebuilds(t *testing.T) { node := snapshotNode() - p, _ := planner.ForNode(node) - plan1 := mustBuildPlan(t, p, node) - plan2 := mustBuildPlan(t, p, node) + plan1 := mustBuildPlan(t, node) + // Clear the plan so ResolvePlan builds a fresh one. + node.Status.Plan = nil + plan2 := mustBuildPlan(t, node) if plan1.ID == plan2.ID { t.Errorf("plan IDs should differ across rebuilds: both %q", plan1.ID) } @@ -374,14 +343,21 @@ func TestBuildPlan_UniqueIDsAcrossRebuilds(t *testing.T) { func TestBuildPlan_ParamsRoundTrip(t *testing.T) { node := snapshotNode() - p, _ := planner.ForNode(node) - plan := mustBuildPlan(t, p, node) - firstTask := plan.Tasks[0] - if firstTask.Type != planner.TaskSnapshotRestore { - t.Fatalf("expected snapshot-restore, got %s", firstTask.Type) + plan := mustBuildPlan(t, node) + + // Find snapshot-restore task (now after infrastructure tasks). 
+ var snapshotTask *seiv1alpha1.PlannedTask + for i := range plan.Tasks { + if plan.Tasks[i].Type == planner.TaskSnapshotRestore { + snapshotTask = &plan.Tasks[i] + break + } + } + if snapshotTask == nil { + t.Fatal("expected snapshot-restore task in plan") } var params task.SnapshotRestoreParams - if err := json.Unmarshal(firstTask.Params.Raw, ¶ms); err != nil { + if err := json.Unmarshal(snapshotTask.Params.Raw, ¶ms); err != nil { t.Fatalf("unmarshal error: %v", err) } if params.TargetHeight != 100000000 { @@ -395,8 +371,7 @@ func TestConfigApply_ParamsFromPlan(t *testing.T) { node.Spec.Overrides = map[string]string{ "giga_executor.enabled": "true", } - p, _ := planner.ForNode(node) - plan := mustBuildPlan(t, p, node) + plan := mustBuildPlan(t, node) var configTask *seiv1alpha1.PlannedTask for i := range plan.Tasks { @@ -438,65 +413,56 @@ func taskTypes(plan *seiv1alpha1.TaskPlan) []string { func TestReconcile_CreatesPlanOnFirstRun(t *testing.T) { mock := &mockSidecarClient{} node := snapshotNode() - p, _ := planner.ForNode(node) r, c := newProgressionReconciler(t, mock, node) ctx := context.Background() + req := ctrl.Request{NamespacedName: types.NamespacedName{Name: node.Name, Namespace: node.Namespace}} - _, err := r.reconcilePending(ctx, node, p) - if err != nil { - t.Fatalf("error = %v", err) - } - node = fetchNode(t, c, node.Name, node.Namespace) - - _, err = r.PlanExecutor.ExecutePlan(ctx, node, node.Status.Plan) + // First Reconcile: ResolvePlan builds the plan, transitions to Initializing, + // and executes the first task (ensure-data-pvc, which is fire-and-forget). 
+ _, err := r.Reconcile(ctx, req) if err != nil { t.Fatalf("error = %v", err) } updated := fetchNode(t, c, node.Name, node.Namespace) if updated.Status.Plan == nil { - t.Fatal("expected InitPlan to be created") + t.Fatal("expected plan to be created") } if updated.Status.Plan.Phase != seiv1alpha1.TaskPlanActive { t.Errorf("phase = %q, want Active", updated.Status.Plan.Phase) } - if len(mock.submitted) != 1 { - t.Fatalf("expected 1 submitted task, got %d", len(mock.submitted)) - } - if mock.submitted[0].Type != planner.TaskSnapshotRestore { - t.Errorf("submitted task = %q, want %q", mock.submitted[0].Type, planner.TaskSnapshotRestore) + if updated.Status.Phase != seiv1alpha1.PhaseInitializing { + t.Errorf("node phase = %q, want Initializing", updated.Status.Phase) } } func TestReconcile_SubmitsFirstPendingTask(t *testing.T) { mock := &mockSidecarClient{} node := snapshotNode() - p, _ := planner.ForNode(node) - node.Status.Plan = mustBuildPlan(t, p, node) + mustBuildPlan(t, node) r, c := newProgressionReconciler(t, mock, node) ctx := context.Background() + // First task is ensure-data-pvc (controller-side, no sidecar submission). 
_, err := r.PlanExecutor.ExecutePlan(ctx, node, node.Status.Plan) if err != nil { t.Fatalf("error = %v", err) } - if len(mock.submitted) != 1 { - t.Fatalf("expected 1 submitted, got %d", len(mock.submitted)) - } - updated := fetchNode(t, c, node.Name, node.Namespace) firstTask := updated.Status.Plan.Tasks[0] - if firstTask.Status != seiv1alpha1.TaskPending && firstTask.Status != seiv1alpha1.TaskComplete { - t.Logf("task status = %q (submit succeeded, status depends on mock GetTask)", firstTask.Status) + if firstTask.Type != task.TaskTypeEnsureDataPVC { + t.Errorf("first task = %q, want %q", firstTask.Type, task.TaskTypeEnsureDataPVC) + } + if firstTask.Status != seiv1alpha1.TaskComplete { + t.Errorf("first task status = %q, want Complete", firstTask.Status) } } func TestReconcile_AllTasksComplete_MarksPlanComplete(t *testing.T) { mock := &mockSidecarClient{} node := genesisNode() - p, _ := planner.ForNode(node) - node.Status.Plan = mustBuildPlan(t, p, node) + mustBuildPlan(t, node) for i := range node.Status.Plan.Tasks { node.Status.Plan.Tasks[i].Status = seiv1alpha1.TaskComplete } @@ -521,8 +487,7 @@ func TestReconcile_AllTasksComplete_MarksPlanComplete(t *testing.T) { func TestReconcile_FailedPlan_NoOps(t *testing.T) { mock := &mockSidecarClient{} node := snapshotNode() - p, _ := planner.ForNode(node) - node.Status.Plan = mustBuildPlan(t, p, node) + mustBuildPlan(t, node) node.Status.Plan.Phase = seiv1alpha1.TaskPlanFailed r, _ := newProgressionReconciler(t, mock, node) @@ -550,7 +515,7 @@ func TestReconcile_CompletePlan_SubmitsSnapshotUploadMonitor(t *testing.T) { r, c := newProgressionReconciler(t, mock, node) ctx := context.Background() - _, err := r.reconcileRunning(ctx, node) + _, err := r.reconcileRunningTasks(ctx, node) if err != nil { t.Fatalf("error = %v", err) } @@ -585,7 +550,7 @@ func TestReconcile_CompletePlan_SkipsAlreadySubmittedMonitor(t *testing.T) { r, _ := newProgressionReconciler(t, mock, node) ctx := context.Background() - _, err := 
r.reconcileRunning(ctx, node) + _, err := r.reconcileRunningTasks(ctx, node) if err != nil { t.Fatalf("error = %v", err) } @@ -597,12 +562,21 @@ func TestReconcile_CompletePlan_SkipsAlreadySubmittedMonitor(t *testing.T) { func TestReconcile_SubmitError_RequeuesGracefully(t *testing.T) { mock := &mockSidecarClient{submitErr: fmt.Errorf("connection refused")} node := snapshotNode() - p, _ := planner.ForNode(node) - node.Status.Plan = mustBuildPlan(t, p, node) + mustBuildPlan(t, node) + + // Advance past the infrastructure tasks (they are controller-side, complete synchronously). + for i := range node.Status.Plan.Tasks { + if node.Status.Plan.Tasks[i].Type == task.TaskTypeEnsureDataPVC || + node.Status.Plan.Tasks[i].Type == task.TaskTypeApplyStatefulSet || + node.Status.Plan.Tasks[i].Type == task.TaskTypeApplyService { + node.Status.Plan.Tasks[i].Status = seiv1alpha1.TaskComplete + } + } r, c := newProgressionReconciler(t, mock, node) ctx := context.Background() + // The first sidecar task (snapshot-restore) will fail to submit. 
result, err := r.PlanExecutor.ExecutePlan(ctx, node, node.Status.Plan) if err != nil { t.Fatalf("error = %v", err) @@ -611,8 +585,12 @@ func TestReconcile_SubmitError_RequeuesGracefully(t *testing.T) { t.Errorf("RequeueAfter = %v, want %v", result.RequeueAfter, planner.TaskPollInterval) } updated := fetchNode(t, c, node.Name, node.Namespace) - if updated.Status.Plan.Tasks[0].Status != seiv1alpha1.TaskPending { - t.Errorf("task status = %q, want Pending after submit failure", updated.Status.Plan.Tasks[0].Status) + snapshotTask := findPlannedTask(updated.Status.Plan, planner.TaskSnapshotRestore) + if snapshotTask == nil { + t.Fatal("expected snapshot-restore task in plan") + } + if snapshotTask.Status != seiv1alpha1.TaskPending { + t.Errorf("snapshot task status = %q, want Pending after submit failure", snapshotTask.Status) } } @@ -629,79 +607,86 @@ func TestExecutePlan_NilPlan_ReturnsError(t *testing.T) { } } -// --- PlannerForNode dispatch tests --- +// --- ResolvePlan dispatch tests --- -func TestPlannerForNode_FullNode(t *testing.T) { +func TestResolvePlan_FullNode(t *testing.T) { node := snapshotNode() - p, err := planner.ForNode(node) - if err != nil { + if err := planner.ResolvePlan(node); err != nil { t.Fatal(err) } - if p.Mode() != string(seiconfig.ModeFull) { - t.Errorf("Mode() = %q, want %q", p.Mode(), string(seiconfig.ModeFull)) + if node.Status.Plan == nil { + t.Fatal("expected non-nil plan") } } -func TestPlannerForNode_Archive(t *testing.T) { +func TestResolvePlan_Archive(t *testing.T) { node := snapshotterNode() - p, err := planner.ForNode(node) - if err != nil { + if err := planner.ResolvePlan(node); err != nil { t.Fatal(err) } - if p.Mode() != string(seiconfig.ModeArchive) { - t.Errorf("Mode() = %q, want %q", p.Mode(), string(seiconfig.ModeArchive)) + if node.Status.Plan == nil { + t.Fatal("expected non-nil plan") } } -func TestPlannerForNode_Validator(t *testing.T) { +func TestResolvePlan_Validator(t *testing.T) { node := genesisNode() - p, err := 
planner.ForNode(node) - if err != nil { + if err := planner.ResolvePlan(node); err != nil { t.Fatal(err) } - if p.Mode() != string(seiconfig.ModeValidator) { - t.Errorf("Mode() = %q, want %q", p.Mode(), string(seiconfig.ModeValidator)) + if node.Status.Plan == nil { + t.Fatal("expected non-nil plan") } } -func TestPlannerForNode_Replayer(t *testing.T) { +func TestResolvePlan_Replayer(t *testing.T) { node := replayerNode() - p, err := planner.ForNode(node) - if err != nil { + if err := planner.ResolvePlan(node); err != nil { t.Fatal(err) } - if p.Mode() != string(seiconfig.ModeFull) { - t.Errorf("Mode() = %q, want %q", p.Mode(), string(seiconfig.ModeFull)) + if node.Status.Plan == nil { + t.Fatal("expected non-nil plan") } } -func TestPlannerForNode_NoSubSpec(t *testing.T) { +func TestResolvePlan_NoSubSpec(t *testing.T) { node := &seiv1alpha1.SeiNode{ Spec: seiv1alpha1.SeiNodeSpec{ ChainID: "test", Image: "sei:latest", }, } - _, err := planner.ForNode(node) + err := planner.ResolvePlan(node) if err == nil { t.Error("expected error for node with no sub-spec") } } +func TestResolvePlan_ResumesActivePlan(t *testing.T) { + node := snapshotNode() + node.Status.Plan = &seiv1alpha1.TaskPlan{ + ID: "existing-plan", + Phase: seiv1alpha1.TaskPlanActive, + } + if err := planner.ResolvePlan(node); err != nil { + t.Fatal(err) + } + if node.Status.Plan.ID != "existing-plan" { + t.Errorf("expected plan to be resumed, got new plan %q", node.Status.Plan.ID) + } +} + // --- Phase transition tests --- -func TestReconcilePending_NoBootstrap_SetsInitializingWithPlan(t *testing.T) { +func TestReconcile_Pending_SetsInitializingWithPlan(t *testing.T) { mock := &mockSidecarClient{} node := snapshotNode() r, c := newProgressionReconciler(t, mock, node) + req := ctrl.Request{NamespacedName: types.NamespacedName{Name: node.Name, Namespace: node.Namespace}} - p, _ := planner.ForNode(node) - result, err := r.reconcilePending(context.Background(), node, p) + _, err := 
r.Reconcile(context.Background(), req) if err != nil { - t.Fatalf("reconcilePending error: %v", err) - } - if result.RequeueAfter == 0 { - t.Error("expected requeue after reconcilePending") + t.Fatalf("Reconcile error: %v", err) } updated := fetchNode(t, c, node.Name, node.Namespace) @@ -709,23 +694,20 @@ func TestReconcilePending_NoBootstrap_SetsInitializingWithPlan(t *testing.T) { t.Errorf("Phase = %q, want %q", updated.Status.Phase, seiv1alpha1.PhaseInitializing) } if updated.Status.Plan == nil { - t.Fatal("expected InitPlan to be created") + t.Fatal("expected plan to be created") } } -func TestReconcilePending_WithBootstrap_SetsInitializing(t *testing.T) { +func TestReconcile_Pending_WithBootstrap_SetsInitializing(t *testing.T) { mock := &mockSidecarClient{} node := replayerNode() node.Spec.Replayer.Snapshot.BootstrapImage = testBootstrapImage r, c := newProgressionReconciler(t, mock, node) + req := ctrl.Request{NamespacedName: types.NamespacedName{Name: node.Name, Namespace: node.Namespace}} - p, _ := planner.ForNode(node) - result, err := r.reconcilePending(context.Background(), node, p) + _, err := r.Reconcile(context.Background(), req) if err != nil { - t.Fatalf("reconcilePending error: %v", err) - } - if result.RequeueAfter == 0 { - t.Error("expected requeue after reconcilePending") + t.Fatalf("Reconcile error: %v", err) } updated := fetchNode(t, c, node.Name, node.Namespace) @@ -733,54 +715,100 @@ func TestReconcilePending_WithBootstrap_SetsInitializing(t *testing.T) { t.Errorf("Phase = %q, want %q", updated.Status.Phase, seiv1alpha1.PhaseInitializing) } if updated.Status.Plan == nil { - t.Fatal("expected InitPlan to be created") + t.Fatal("expected plan to be created") } } -func TestReconcileInitializing_PlanComplete_TransitionsToRunning(t *testing.T) { +func TestExecutePlan_AllComplete_TransitionsToTargetPhase(t *testing.T) { + g := NewWithT(t) mock := &mockSidecarClient{} node := genesisNode() node.Status.Phase = seiv1alpha1.PhaseInitializing - 
node.Status.Plan = &seiv1alpha1.TaskPlan{ - Phase: seiv1alpha1.TaskPlanComplete, + mustBuildPlan(t, node) + // Pre-complete all tasks so ExecutePlan triggers plan completion. + for i := range node.Status.Plan.Tasks { + node.Status.Plan.Tasks[i].Status = seiv1alpha1.TaskComplete } + r, c := newProgressionReconciler(t, mock, node) + _, err := r.PlanExecutor.ExecutePlan(context.Background(), node, node.Status.Plan) + g.Expect(err).NotTo(HaveOccurred()) - result, err := r.reconcileInitializing(context.Background(), node) - if err != nil { - t.Fatalf("reconcileInitializing error: %v", err) + updated := fetchNode(t, c, node.Name, node.Namespace) + g.Expect(updated.Status.Phase).To(Equal(seiv1alpha1.PhaseRunning), "executor should transition to TargetPhase") + g.Expect(updated.Status.Plan.Phase).To(Equal(seiv1alpha1.TaskPlanComplete)) +} + +func TestExecutePlan_ConvergencePlan_NilsOnCompletion(t *testing.T) { + g := NewWithT(t) + mock := &mockSidecarClient{} + node := snapshotNode() + node.Status.Phase = seiv1alpha1.PhaseRunning + node.Status.Plan = nil + // Build a convergence plan for a Running node. + if err := planner.ResolvePlan(node); err != nil { + t.Fatal(err) } - if result.RequeueAfter == 0 { - t.Error("expected requeue after plan complete") + // Pre-complete all tasks. 
+ for i := range node.Status.Plan.Tasks { + node.Status.Plan.Tasks[i].Status = seiv1alpha1.TaskComplete } + r, c := newProgressionReconciler(t, mock, node) + _, err := r.PlanExecutor.ExecutePlan(context.Background(), node, node.Status.Plan) + g.Expect(err).NotTo(HaveOccurred()) + updated := fetchNode(t, c, node.Name, node.Namespace) - if updated.Status.Phase != seiv1alpha1.PhaseRunning { - t.Errorf("Phase = %q, want %q", updated.Status.Phase, seiv1alpha1.PhaseRunning) - } + g.Expect(updated.Status.Plan).To(BeNil(), "convergence plan should be nilled after completion") + g.Expect(updated.Status.Phase).To(Equal(seiv1alpha1.PhaseRunning), "phase should stay Running") } -func TestReconcileInitializing_PlanFailed_TransitionsToFailed(t *testing.T) { +func TestExecutePlan_TaskFailure_SetsPlanFailedCondition(t *testing.T) { + g := NewWithT(t) mock := &mockSidecarClient{} - node := genesisNode() - node.Status.Phase = seiv1alpha1.PhaseInitializing - node.Status.Plan = &seiv1alpha1.TaskPlan{ - Phase: seiv1alpha1.TaskPlanFailed, + node := snapshotNode() + mustBuildPlan(t, node) + // Advance past infrastructure tasks. + for i := range node.Status.Plan.Tasks { + if node.Status.Plan.Tasks[i].Type == task.TaskTypeEnsureDataPVC || + node.Status.Plan.Tasks[i].Type == task.TaskTypeApplyStatefulSet || + node.Status.Plan.Tasks[i].Type == task.TaskTypeApplyService { + node.Status.Plan.Tasks[i].Status = seiv1alpha1.TaskComplete + } } r, c := newProgressionReconciler(t, mock, node) + ctx := context.Background() - result, err := r.reconcileInitializing(context.Background(), node) - if err != nil { - t.Fatalf("reconcileInitializing error: %v", err) - } - if result.RequeueAfter == 0 { - t.Error("expected requeue after plan failed") + // Submit snapshot-restore. + _, err := r.PlanExecutor.ExecutePlan(ctx, node, node.Status.Plan) + g.Expect(err).NotTo(HaveOccurred()) + + // Fail the task via mock. 
+ ct := planner.CurrentTask(node.Status.Plan) + g.Expect(ct).NotTo(BeNil()) + taskUUID, parseErr := uuid.Parse(ct.ID) + g.Expect(parseErr).NotTo(HaveOccurred()) + mock.taskResults = map[uuid.UUID]*sidecar.TaskResult{ + taskUUID: completedResult(taskUUID, planner.TaskSnapshotRestore, strPtr("boom")), } + node = fetchNode(t, c, node.Name, node.Namespace) + _, err = r.PlanExecutor.ExecutePlan(ctx, node, node.Status.Plan) + g.Expect(err).NotTo(HaveOccurred()) updated := fetchNode(t, c, node.Name, node.Namespace) - if updated.Status.Phase != seiv1alpha1.PhaseFailed { - t.Errorf("Phase = %q, want %q", updated.Status.Phase, seiv1alpha1.PhaseFailed) + g.Expect(updated.Status.Plan.Phase).To(Equal(seiv1alpha1.TaskPlanFailed)) + + // Verify PlanFailed condition was set. + var found bool + for _, cond := range updated.Status.Conditions { + if cond.Type == planner.ConditionPlanFailed { + g.Expect(cond.Status).To(Equal(metav1.ConditionTrue)) + g.Expect(cond.Reason).To(Equal("TaskFailed")) + g.Expect(cond.Message).To(ContainSubstring("boom")) + found = true + } } + g.Expect(found).To(BeTrue(), "expected PlanFailed condition on node") } // --- Result export tests --- @@ -864,7 +892,7 @@ func TestReconcileInitializing_SidecarClientError_Requeues(t *testing.T) { }, } - result, err := r.reconcileInitializing(context.Background(), node) + result, err := r.Reconcile(context.Background(), ctrl.Request{NamespacedName: types.NamespacedName{Name: node.Name, Namespace: node.Namespace}}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -980,7 +1008,7 @@ func TestReconcile_ReplayerWithoutResultExport_NoMonitorTask(t *testing.T) { r, c := newProgressionReconciler(t, mock, node) - _, err := r.reconcileRunning(context.Background(), node) + _, err := r.reconcileRunningTasks(context.Background(), node) if err != nil { t.Fatalf("error = %v", err) } @@ -1007,7 +1035,7 @@ func TestReconcile_SnapshotterWithMonitorTask_BothSubmitted(t *testing.T) { r, c := newProgressionReconciler(t, mock, 
node) - _, err := r.reconcileRunning(context.Background(), node) + _, err := r.reconcileRunningTasks(context.Background(), node) if err != nil { t.Fatalf("error = %v", err) } @@ -1048,7 +1076,7 @@ func TestReconcileRunning_PollRequeue_ImmediateRequeue(t *testing.T) { r, _ := newProgressionReconciler(t, mock, node) - result, err := r.reconcileRunning(context.Background(), node) + result, err := r.reconcileRunningTasks(context.Background(), node) if err != nil { t.Fatalf("error = %v", err) } @@ -1066,7 +1094,7 @@ func TestReconcile_CompletePlan_SubmitsResultExportMonitorForReplayer(t *testing r, c := newProgressionReconciler(t, mock, node) - _, err := r.reconcileRunning(context.Background(), node) + _, err := r.reconcileRunningTasks(context.Background(), node) if err != nil { t.Fatalf("error = %v", err) } diff --git a/internal/controller/node/reconciler_test.go b/internal/controller/node/reconciler_test.go index 80cb038..e43485a 100644 --- a/internal/controller/node/reconciler_test.go +++ b/internal/controller/node/reconciler_test.go @@ -103,8 +103,10 @@ func TestNodeReconcile_ValidatorNode_CreateStatefulSetAndService(t *testing.T) { node := newGenesisNode("mynet-0", "default") r, c := newNodeReconciler(t, node) - // Drive through Pending -> Initializing. - for range 4 { + // Drive through Pending -> Initializing -> infrastructure tasks. + // Plan: ensure-data-pvc, apply-statefulset, apply-service, then sidecar tasks. + // Each reconcile drives one step forward. + for range 8 { _, err := r.Reconcile(ctx, nodeReqFor("mynet-0", "default")) g.Expect(err).NotTo(HaveOccurred()) } @@ -140,8 +142,8 @@ func TestNodeReconcile_StatefulSet_Idempotent(t *testing.T) { node := newSnapshotNode("snap-0", "default") r, c := newNodeReconciler(t, node) - // Drive through Pending -> Initializing, then one more for idempotency. - for range 5 { + // Drive through Pending -> Initializing -> infrastructure tasks, then one more for idempotency. 
+ for range 8 { _, err := r.Reconcile(ctx, nodeReqFor("snap-0", "default")) g.Expect(err).NotTo(HaveOccurred()) } @@ -158,8 +160,11 @@ func TestNodeReconcile_SnapshotNode_CreatesPVC(t *testing.T) { node := newSnapshotNode("snap-0", "default") r, c := newNodeReconciler(t, node) - _, err := r.Reconcile(ctx, nodeReqFor("snap-0", "default")) - g.Expect(err).NotTo(HaveOccurred()) + // Reconcile 1: finalizer + build plan. Reconcile 2: execute ensure-data-pvc. + for range 3 { + _, err := r.Reconcile(ctx, nodeReqFor("snap-0", "default")) + g.Expect(err).NotTo(HaveOccurred()) + } pvc := &corev1.PersistentVolumeClaim{} g.Expect(c.Get(ctx, types.NamespacedName{Name: "data-snap-0", Namespace: "default"}, pvc)).To(Succeed()) @@ -173,8 +178,8 @@ func TestNodeReconcile_SnapshotNode_StatefulSetHasInitContainers(t *testing.T) { node := newSnapshotNode("snap-0", "default") r, c := newNodeReconciler(t, node) - // Drive through Pending -> Initializing. - for range 4 { + // Drive through Pending -> Initializing -> infrastructure tasks. + for range 8 { _, err := r.Reconcile(ctx, nodeReqFor("snap-0", "default")) g.Expect(err).NotTo(HaveOccurred()) } @@ -211,9 +216,11 @@ func TestNodeReconcile_RunningPhase_UpdatesStatefulSetImage(t *testing.T) { node.Spec.Image = testImageV2 g.Expect(c.Update(ctx, node)).To(Succeed()) - // Reconcile — this enters reconcileRunning which should update the StatefulSet. - _, err := r.Reconcile(ctx, nodeReqFor("mynet-0", "default")) - g.Expect(err).NotTo(HaveOccurred()) + // Reconcile — this builds a convergence plan and drives apply-statefulset. + for range 4 { + _, err := r.Reconcile(ctx, nodeReqFor("mynet-0", "default")) + g.Expect(err).NotTo(HaveOccurred()) + } // StatefulSet should now reflect the new image. 
g.Expect(c.Get(ctx, types.NamespacedName{Name: "mynet-0", Namespace: "default"}, sts)).To(Succeed()) diff --git a/internal/controller/node/testhelpers_test.go b/internal/controller/node/testhelpers_test.go index abd105c..2fc24ba 100644 --- a/internal/controller/node/testhelpers_test.go +++ b/internal/controller/node/testhelpers_test.go @@ -50,3 +50,15 @@ func findContainer(containers []corev1.Container, name string) *corev1.Container } return nil } + +func findPlannedTask(plan *seiv1alpha1.TaskPlan, taskType string) *seiv1alpha1.PlannedTask { + if plan == nil { + return nil + } + for i := range plan.Tasks { + if plan.Tasks[i].Type == taskType { + return &plan.Tasks[i] + } + } + return nil +} diff --git a/internal/planner/archive.go b/internal/planner/archive.go index d6d623b..57b2e48 100644 --- a/internal/planner/archive.go +++ b/internal/planner/archive.go @@ -23,6 +23,9 @@ func (p *archiveNodePlanner) Validate(node *seiv1alpha1.SeiNode) error { } func (p *archiveNodePlanner) BuildPlan(node *seiv1alpha1.SeiNode) (*seiv1alpha1.TaskPlan, error) { + if node.Status.Phase == seiv1alpha1.PhaseRunning { + return buildRunningPlan(node) + } return buildBasePlan(node, node.Spec.Peers, nil, &task.ConfigApplyParams{ Mode: string(seiconfig.ModeArchive), Overrides: mergeOverrides(mergeOverrides(commonOverrides(node), p.controllerOverrides(node)), node.Spec.Overrides), diff --git a/internal/planner/bootstrap.go b/internal/planner/bootstrap.go index 0173cb9..87690c1 100644 --- a/internal/planner/bootstrap.go +++ b/internal/planner/bootstrap.go @@ -1,8 +1,6 @@ package planner import ( - "slices" - "github.com/google/uuid" seiconfig "github.com/sei-protocol/sei-config" sidecar "github.com/sei-protocol/seictl/sidecar/client" @@ -28,7 +26,10 @@ func buildBootstrapPlan( jobName := task.BootstrapJobName(node) serviceName := node.Name - bootstrapProg := buildBootstrapProgression(peers, snap) + bootstrapProg, err := buildSidecarProgression(snap, peers) + if err != nil { + return nil, err 
+ } postProg := buildPostBootstrapProgression(peers) tasks := make([]seiv1alpha1.PlannedTask, 0, 2+len(bootstrapProg)+2+len(postProg)) @@ -42,6 +43,12 @@ func buildBootstrapPlan( return nil } + // Phase 0: Ensure the data PVC exists (needed by both bootstrap Job and StatefulSet) + if err := appendTask(task.TaskTypeEnsureDataPVC, + &task.EnsureDataPVCParams{NodeName: node.Name, Namespace: node.Namespace}); err != nil { + return nil, err + } + // Phase 1: Deploy bootstrap infrastructure if err := appendTask(task.TaskTypeDeployBootstrapSvc, &task.DeployBootstrapServiceParams{ServiceName: serviceName, Namespace: node.Namespace}); err != nil { @@ -69,35 +76,37 @@ func buildBootstrapPlan( return nil, err } - // Phase 4: Post-bootstrap config on StatefulSet pod + // Phase 4: Create production StatefulSet and Service (after bootstrap teardown frees the PVC) + if err := appendTask(task.TaskTypeApplyStatefulSet, + &task.ApplyStatefulSetParams{NodeName: node.Name, Namespace: node.Namespace}); err != nil { + return nil, err + } + if err := appendTask(task.TaskTypeApplyService, + &task.ApplyServiceParams{NodeName: node.Name, Namespace: node.Namespace}); err != nil { + return nil, err + } + + // Phase 5: Post-bootstrap config on StatefulSet pod for _, taskType := range postProg { if err := appendTask(taskType, paramsForTaskType(node, taskType, nil, configApplyParams)); err != nil { return nil, err } } - return &seiv1alpha1.TaskPlan{ID: planID, Phase: seiv1alpha1.TaskPlanActive, Tasks: tasks}, nil -} - -// buildBootstrapProgression returns the sidecar task sequence for the -// bootstrap Job phase (everything except mark-ready). 
-func buildBootstrapProgression(peers []seiv1alpha1.PeerSource, snap *seiv1alpha1.SnapshotSource) []string { - mode := bootstrapMode(snap) - prog := slices.Clone(baseProgression[mode]) - - prog = insertBefore(prog, TaskConfigApply, TaskConfigureGenesis) - if len(peers) > 0 { - prog = insertBefore(prog, TaskConfigValidate, TaskDiscoverPeers) - } - if snap != nil { - prog = insertBefore(prog, TaskConfigValidate, TaskConfigureStateSync) - } - - return prog + return &seiv1alpha1.TaskPlan{ + ID: planID, + Phase: seiv1alpha1.TaskPlanActive, + Tasks: tasks, + TargetPhase: seiv1alpha1.PhaseRunning, + FailedPhase: seiv1alpha1.PhaseFailed, + }, nil } // buildPostBootstrapProgression returns the sidecar task sequence for the -// production StatefulSet after bootstrap teardown. +// production StatefulSet after bootstrap teardown. This is intentionally +// a separate, hand-written progression — it runs on the production pod +// after bootstrap teardown and only includes the config tasks needed to +// prepare the already-restored data directory for production use. func buildPostBootstrapProgression(peers []seiv1alpha1.PeerSource) []string { prog := []string{TaskConfigureGenesis, TaskConfigApply} if len(peers) > 0 { @@ -132,10 +141,15 @@ const genesisConfigureMaxRetries = 180 // configure-genesis retries until the group controller has assembled and // uploaded genesis.json to S3. 
func buildGenesisPlan(node *seiv1alpha1.SeiNode) (*seiv1alpha1.TaskPlan, error) { - gc := node.Spec.Validator.GenesisCeremony - planID := uuid.New().String() + configApplyParams := &task.ConfigApplyParams{ + Mode: string(seiconfig.ModeValidator), + Overrides: mergeOverrides(commonOverrides(node), node.Spec.Overrides), + } prog := []string{ + task.TaskTypeEnsureDataPVC, + task.TaskTypeApplyStatefulSet, + task.TaskTypeApplyService, TaskGenerateIdentity, TaskGenerateGentx, TaskUploadGenesisArtifacts, @@ -146,9 +160,10 @@ func buildGenesisPlan(node *seiv1alpha1.SeiNode) (*seiv1alpha1.TaskPlan, error) TaskMarkReady, } + planID := uuid.New().String() tasks := make([]seiv1alpha1.PlannedTask, len(prog)) for i, taskType := range prog { - t, err := buildPlannedTask(planID, taskType, i, genesisParamsForTaskType(node, gc, taskType)) + t, err := buildPlannedTask(planID, taskType, i, paramsForTaskType(node, taskType, nil, configApplyParams)) if err != nil { return nil, err } @@ -157,43 +172,13 @@ func buildGenesisPlan(node *seiv1alpha1.SeiNode) (*seiv1alpha1.TaskPlan, error) } tasks[i] = t } - return &seiv1alpha1.TaskPlan{ID: planID, Phase: seiv1alpha1.TaskPlanActive, Tasks: tasks}, nil -} - -func genesisParamsForTaskType(node *seiv1alpha1.SeiNode, gc *seiv1alpha1.GenesisCeremonyNodeConfig, taskType string) any { - switch taskType { - case TaskGenerateIdentity: - return &task.GenerateIdentityParams{ - ChainID: gc.ChainID, - Moniker: node.Name, - } - case TaskGenerateGentx: - return &task.GenerateGentxParams{ - ChainID: gc.ChainID, - StakingAmount: gc.StakingAmount, - AccountBalance: gc.AccountBalance, - GenesisParams: gc.GenesisParams, - } - case TaskUploadGenesisArtifacts: - return &task.UploadGenesisArtifactsParams{ - NodeName: node.Name, - } - case TaskConfigureGenesis: - return &task.ConfigureGenesisParams{} - case TaskConfigApply: - return &task.ConfigApplyParams{ - Mode: string(seiconfig.ModeValidator), - Overrides: mergeOverrides(commonOverrides(node), 
node.Spec.Overrides), - } - case TaskSetGenesisPeers: - return &task.SetGenesisPeersParams{} - case TaskConfigValidate: - return &task.ConfigValidateParams{} - case TaskMarkReady: - return &task.MarkReadyParams{} - default: - return nil - } + return &seiv1alpha1.TaskPlan{ + ID: planID, + Phase: seiv1alpha1.TaskPlanActive, + Tasks: tasks, + TargetPhase: seiv1alpha1.PhaseRunning, + FailedPhase: seiv1alpha1.PhaseFailed, + }, nil } // SnapshotUploadMonitorTask returns a snapshot-upload TaskRequest if applicable. diff --git a/internal/planner/doc.go b/internal/planner/doc.go new file mode 100644 index 0000000..4743cfe --- /dev/null +++ b/internal/planner/doc.go @@ -0,0 +1,39 @@ +// Package planner builds and executes ordered task plans that drive SeiNode +// and SeiNodeDeployment resources through their lifecycle. +// +// # Plan Lifecycle +// +// A plan is an ordered list of tasks stored in .status.plan on the owning +// resource. The lifecycle is: +// +// 1. Build: ResolvePlan (for nodes) or ForGroup (for deployments) inspects the +// resource's current phase and spec, then builds an appropriate plan. +// 2. Persist: The controller patches the plan into the resource's status. +// 3. Execute: Executor.ExecutePlan drives one task per reconcile, patching +// status after each task completes. +// 4. Complete: When all tasks finish, the executor sets TargetPhase on the +// resource and marks the plan Complete. Convergence plans (where the +// target phase equals the current phase) are nilled out to avoid stale +// data in etcd. +// 5. Fail: If a task fails terminally, the executor sets FailedPhase and a +// PlanFailed condition on the resource for operator observability. +// +// # Plan Types +// +// Init plans transition a node from Pending to Running. They include +// infrastructure tasks (ensure-data-pvc, apply-statefulset, apply-service) +// followed by sidecar tasks (configure-genesis, config-apply, etc.). 
+// Init plans set both TargetPhase (Running) and FailedPhase (Failed). +// +// Convergence plans keep a Running node's owned resources in sync with the +// spec. They contain only apply-statefulset and apply-service. FailedPhase +// is deliberately empty: a convergence failure retries on the next reconcile +// rather than transitioning the node to Failed. +// +// # Task Types +// +// Sidecar tasks are submitted to the sidecar HTTP API and polled for +// completion. Controller-side tasks (ensure-data-pvc, apply-statefulset, +// apply-service, deploy-bootstrap-job, etc.) execute inline against the +// Kubernetes API. +package planner diff --git a/internal/planner/executor.go b/internal/planner/executor.go index af43759..2e8564c 100644 --- a/internal/planner/executor.go +++ b/internal/planner/executor.go @@ -24,7 +24,8 @@ const ( ConditionPlanFailed = "PlanFailed" ) -// ResultRequeueImmediate requests an immediate re-enqueue. Uses a minimal +// ResultRequeueImmediate requests an immediate re-enqueue with a minimal +// delay to avoid busy-looping while still progressing the plan promptly. var ResultRequeueImmediate = ctrl.Result{RequeueAfter: 1 * time.Millisecond} // PlanExecutor drives a TaskPlan to completion for a given resource type. 
@@ -109,7 +110,7 @@ func executePlan( patch := client.MergeFromWithOptions(obj.DeepCopyObject().(client.Object), client.MergeFromWithOptimisticLock{}) plan.Phase = seiv1alpha1.TaskPlanComplete setTargetPhase(obj, plan.TargetPhase) - nilPlanIfConvergence(obj, prevPhase, plan.TargetPhase) + clearCompletedConvergencePlan(obj, prevPhase, plan.TargetPhase) if err := kc.Status().Patch(ctx, obj, patch); err != nil { return ctrl.Result{}, fmt.Errorf("marking plan complete: %w", err) } @@ -268,11 +269,11 @@ func currentPhase(obj client.Object) seiv1alpha1.SeiNodePhase { return "" } -// nilPlanIfConvergence nils the plan on the object's status when the plan's +// clearCompletedConvergencePlan nils the plan on the object's status when the plan's // target phase matches the phase the node was already in before the plan // completed (convergence — the node stays in the same phase). Init plans // that transition to a new phase keep their completed plan visible in status. -func nilPlanIfConvergence(obj client.Object, prevPhase, targetPhase seiv1alpha1.SeiNodePhase) { +func clearCompletedConvergencePlan(obj client.Object, prevPhase, targetPhase seiv1alpha1.SeiNodePhase) { if targetPhase == "" { return } diff --git a/internal/planner/full.go b/internal/planner/full.go index 5e4ed92..fe3a8f8 100644 --- a/internal/planner/full.go +++ b/internal/planner/full.go @@ -27,6 +27,9 @@ func (p *fullNodePlanner) Validate(node *seiv1alpha1.SeiNode) error { } func (p *fullNodePlanner) BuildPlan(node *seiv1alpha1.SeiNode) (*seiv1alpha1.TaskPlan, error) { + if node.Status.Phase == seiv1alpha1.PhaseRunning { + return buildRunningPlan(node) + } fn := node.Spec.FullNode params := &task.ConfigApplyParams{ Mode: string(seiconfig.ModeFull), diff --git a/internal/planner/planner.go b/internal/planner/planner.go index 776dd0d..6d334de 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -102,9 +102,45 @@ func hasCondition(group *seiv1alpha1.SeiNodeDeployment, condType string) 
bool { return false } -// ForNode returns the appropriate NodePlanner based on which mode sub-spec -// is populated on the SeiNode. -func ForNode(node *seiv1alpha1.SeiNode) (NodePlanner, error) { +// ResolvePlan ensures node.Status.Plan is set and ready for execution. +// If an active plan exists, it is left in place (resume). Otherwise a +// new plan is built from the node's current phase and spec. +// +// ResolvePlan mutates the node in place: it sets Status.Plan and may +// transition Status.Phase from empty or Pending to Initializing. The caller must +// capture a MergeFrom patch base before calling ResolvePlan, and persist +// the status change if a new plan was built (check planAlreadyActive). +func ResolvePlan(node *seiv1alpha1.SeiNode) error { + if node.Status.Plan != nil && node.Status.Plan.Phase == seiv1alpha1.TaskPlanActive { + return nil + } + + p, err := plannerForMode(node) + if err != nil { + return err + } + if err := p.Validate(node); err != nil { + return err + } + + plan, err := p.BuildPlan(node) + if err != nil { + return err + } + if plan == nil { + return nil + } + + node.Status.Plan = plan + if node.Status.Phase == "" || node.Status.Phase == seiv1alpha1.PhasePending { + node.Status.Phase = seiv1alpha1.PhaseInitializing + } + return nil +} + +// plannerForMode returns the appropriate NodePlanner based on which mode +// sub-spec is populated on the SeiNode. +func plannerForMode(node *seiv1alpha1.SeiNode) (NodePlanner, error) { switch { case node.Spec.FullNode != nil: return &fullNodePlanner{}, nil @@ -119,18 +155,45 @@ func ForNode(node *seiv1alpha1.SeiNode) (NodePlanner, error) { } } -// insertBefore inserts task into prog immediately before target, unless task -// is already present. -func insertBefore(prog []string, target, taskType string) []string { +// insertBefore inserts taskType into prog immediately before target.
+// Returns an error if the target is not found — this catches plan +// construction bugs rather than producing silently incomplete plans. +// No-op if taskType is already present. +func insertBefore(prog []string, target, taskType string) ([]string, error) { if slices.Contains(prog, taskType) { - return prog + return prog, nil } for i, t := range prog { if t == target { - return slices.Insert(prog, i, taskType) + return slices.Insert(prog, i, taskType), nil } } - return prog + return nil, fmt.Errorf("insertBefore: target %q not found in progression %v", target, prog) +} + +// buildSidecarProgression constructs the sidecar task sequence for the given +// bootstrap mode, inserting optional tasks (genesis, peers, state-sync) at +// the correct positions. Used by both buildBasePlan and buildBootstrapPlan +// to ensure they produce consistent sidecar progressions. +func buildSidecarProgression(snap *seiv1alpha1.SnapshotSource, peers []seiv1alpha1.PeerSource) ([]string, error) { + mode := bootstrapMode(snap) + prog := slices.Clone(baseProgression[mode]) + + var err error + if prog, err = insertBefore(prog, TaskConfigApply, TaskConfigureGenesis); err != nil { + return nil, err + } + if len(peers) > 0 { + if prog, err = insertBefore(prog, TaskConfigValidate, TaskDiscoverPeers); err != nil { + return nil, err + } + } + if snap != nil { + if prog, err = insertBefore(prog, TaskConfigValidate, TaskConfigureStateSync); err != nil { + return nil, err + } + } + return prog, nil } // NeedsBootstrap returns true when the node requires a bootstrap Job to @@ -159,23 +222,6 @@ func SnapshotGeneration(node *seiv1alpha1.SeiNode) *seiv1alpha1.SnapshotGenerati } } -// NeedsLongStartup returns true when the node's bootstrap strategy involves -// replaying blocks. 
-func NeedsLongStartup(node *seiv1alpha1.SeiNode) bool { - switch { - case node.Spec.FullNode != nil: - return node.Spec.FullNode.Snapshot != nil - case node.Spec.Validator != nil: - return node.Spec.Validator.Snapshot != nil - case node.Spec.Replayer != nil: - return true - case node.Spec.Archive != nil: - return true - default: - return false - } -} - func hasS3Snapshot(snap *seiv1alpha1.SnapshotSource) bool { return snap != nil && snap.S3 != nil } @@ -233,25 +279,24 @@ func buildPlannedTask(planID, taskType string, planIndex int, params any) (seiv1 }, nil } -// buildBasePlan builds a TaskPlan by starting with the base progression for the -// node's bootstrap mode and inserting optional tasks. +// buildBasePlan builds a TaskPlan by starting with infrastructure tasks, +// then the base sidecar progression for the node's bootstrap mode. func buildBasePlan( node *seiv1alpha1.SeiNode, peers []seiv1alpha1.PeerSource, snap *seiv1alpha1.SnapshotSource, configApplyParams *task.ConfigApplyParams, ) (*seiv1alpha1.TaskPlan, error) { - mode := bootstrapMode(snap) - prog := slices.Clone(baseProgression[mode]) - - prog = insertBefore(prog, TaskConfigApply, TaskConfigureGenesis) - if len(peers) > 0 { - prog = insertBefore(prog, TaskConfigValidate, TaskDiscoverPeers) - } - if snap != nil { - prog = insertBefore(prog, TaskConfigValidate, TaskConfigureStateSync) + sidecarProg, err := buildSidecarProgression(snap, peers) + if err != nil { + return nil, err } + // Infrastructure tasks run before sidecar tasks. + prog := make([]string, 0, 3+len(sidecarProg)) + prog = append(prog, task.TaskTypeEnsureDataPVC, task.TaskTypeApplyStatefulSet, task.TaskTypeApplyService) + prog = append(prog, sidecarProg...) 
+ planID := uuid.New().String() tasks := make([]seiv1alpha1.PlannedTask, len(prog)) for i, taskType := range prog { @@ -262,13 +307,16 @@ func buildBasePlan( tasks[i] = t } return &seiv1alpha1.TaskPlan{ - ID: planID, - Phase: seiv1alpha1.TaskPlanActive, - Tasks: tasks, + ID: planID, + Phase: seiv1alpha1.TaskPlanActive, + Tasks: tasks, + TargetPhase: seiv1alpha1.PhaseRunning, + FailedPhase: seiv1alpha1.PhaseFailed, }, nil } // paramsForTaskType constructs the appropriate params struct for a task type. +// This is the single factory for all task params — every plan builder uses it. func paramsForTaskType( node *seiv1alpha1.SeiNode, taskType string, @@ -276,10 +324,19 @@ func paramsForTaskType( configApplyParams *task.ConfigApplyParams, ) any { switch taskType { + // Infrastructure tasks + case task.TaskTypeEnsureDataPVC: + return &task.EnsureDataPVCParams{NodeName: node.Name, Namespace: node.Namespace} + case task.TaskTypeApplyStatefulSet: + return &task.ApplyStatefulSetParams{NodeName: node.Name, Namespace: node.Namespace} + case task.TaskTypeApplyService: + return &task.ApplyServiceParams{NodeName: node.Name, Namespace: node.Namespace} + + // Sidecar tasks case TaskSnapshotRestore: return snapshotRestoreParams(snap) case TaskConfigureGenesis: - return configureGenesisParams(node) + return &task.ConfigureGenesisParams{} case TaskConfigApply: if configApplyParams != nil { return configApplyParams @@ -293,6 +350,35 @@ func paramsForTaskType( return &task.ConfigValidateParams{} case TaskMarkReady: return &task.MarkReadyParams{} + + // Genesis ceremony tasks — only valid when Validator.GenesisCeremony is set. 
+ case TaskGenerateIdentity, TaskGenerateGentx, TaskUploadGenesisArtifacts, TaskSetGenesisPeers: + return genesisCeremonyTaskParams(node, taskType) + + default: + return nil + } +} + +func genesisCeremonyTaskParams(node *seiv1alpha1.SeiNode, taskType string) any { + if node.Spec.Validator == nil || node.Spec.Validator.GenesisCeremony == nil { + return nil + } + gc := node.Spec.Validator.GenesisCeremony + switch taskType { + case TaskGenerateIdentity: + return &task.GenerateIdentityParams{ChainID: gc.ChainID, Moniker: node.Name} + case TaskGenerateGentx: + return &task.GenerateGentxParams{ + ChainID: gc.ChainID, + StakingAmount: gc.StakingAmount, + AccountBalance: gc.AccountBalance, + GenesisParams: gc.GenesisParams, + } + case TaskUploadGenesisArtifacts: + return &task.UploadGenesisArtifactsParams{NodeName: node.Name} + case TaskSetGenesisPeers: + return &task.SetGenesisPeersParams{} default: return nil } @@ -307,10 +393,6 @@ func snapshotRestoreParams(snap *seiv1alpha1.SnapshotSource) *task.SnapshotResto } } -func configureGenesisParams(_ *seiv1alpha1.SeiNode) *task.ConfigureGenesisParams { - return &task.ConfigureGenesisParams{} -} - func discoverPeersParams(node *seiv1alpha1.SeiNode) *task.DiscoverPeersParams { if len(node.Spec.Peers) == 0 { return &task.DiscoverPeersParams{} @@ -363,6 +445,33 @@ func commonOverrides(node *seiv1alpha1.SeiNode) map[string]string { } } +// buildRunningPlan builds a convergence plan for a Running node. +// It ensures the StatefulSet and Service match the current spec. +// +// FailedPhase is deliberately empty: a convergence failure should not +// transition the node out of Running. The executor still sets a PlanFailed +// condition for observability, and the next reconcile will build a fresh +// convergence plan to retry. 
+func buildRunningPlan(node *seiv1alpha1.SeiNode) (*seiv1alpha1.TaskPlan, error) { + prog := []string{task.TaskTypeApplyStatefulSet, task.TaskTypeApplyService} + + planID := uuid.New().String() + tasks := make([]seiv1alpha1.PlannedTask, len(prog)) + for i, taskType := range prog { + t, err := buildPlannedTask(planID, taskType, i, paramsForTaskType(node, taskType, nil, nil)) + if err != nil { + return nil, err + } + tasks[i] = t + } + return &seiv1alpha1.TaskPlan{ + ID: planID, + Phase: seiv1alpha1.TaskPlanActive, + Tasks: tasks, + TargetPhase: seiv1alpha1.PhaseRunning, + }, nil +} + // mergeOverrides combines controller-generated overrides with user-specified // overrides. User overrides take precedence. func mergeOverrides(controllerOverrides, userOverrides map[string]string) map[string]string { diff --git a/internal/planner/replay.go b/internal/planner/replay.go index b5c641c..830492a 100644 --- a/internal/planner/replay.go +++ b/internal/planner/replay.go @@ -32,6 +32,9 @@ func (p *replayerPlanner) Validate(node *seiv1alpha1.SeiNode) error { } func (p *replayerPlanner) BuildPlan(node *seiv1alpha1.SeiNode) (*seiv1alpha1.TaskPlan, error) { + if node.Status.Phase == seiv1alpha1.PhaseRunning { + return buildRunningPlan(node) + } params := &task.ConfigApplyParams{ Mode: string(seiconfig.ModeFull), Overrides: mergeOverrides(mergeOverrides(commonOverrides(node), p.controllerOverrides()), node.Spec.Overrides), diff --git a/internal/planner/validator.go b/internal/planner/validator.go index 758cba8..ad8ed0e 100644 --- a/internal/planner/validator.go +++ b/internal/planner/validator.go @@ -36,6 +36,9 @@ func (p *validatorPlanner) Validate(node *seiv1alpha1.SeiNode) error { } func (p *validatorPlanner) BuildPlan(node *seiv1alpha1.SeiNode) (*seiv1alpha1.TaskPlan, error) { + if node.Status.Phase == seiv1alpha1.PhaseRunning { + return buildRunningPlan(node) + } if isGenesisCeremonyNode(node) { return buildGenesisPlan(node) } diff --git a/internal/task/apply_service.go 
b/internal/task/apply_service.go index 9495342..868dbbb 100644 --- a/internal/task/apply_service.go +++ b/internal/task/apply_service.go @@ -15,9 +15,9 @@ import ( const TaskTypeApplyService = "apply-service" -var serviceFieldOwner = client.FieldOwner("seinode-controller") - // ApplyServiceParams identifies the node whose headless Service should be applied. +// Fields are serialized into the plan for observability (the task itself +// reads the node from ExecutionConfig.Resource). type ApplyServiceParams struct { NodeName string `json:"nodeName"` Namespace string `json:"namespace"` @@ -56,7 +56,7 @@ func (e *applyServiceExecution) Execute(ctx context.Context) error { } //nolint:staticcheck // migrating to typed ApplyConfiguration is a separate effort - if err := e.cfg.KubeClient.Patch(ctx, desired, client.Apply, serviceFieldOwner, client.ForceOwnership); err != nil { + if err := e.cfg.KubeClient.Patch(ctx, desired, client.Apply, fieldOwner, client.ForceOwnership); err != nil { return fmt.Errorf("applying service: %w", err) } @@ -65,8 +65,5 @@ func (e *applyServiceExecution) Execute(ctx context.Context) error { } func (e *applyServiceExecution) Status(_ context.Context) ExecutionStatus { - if s, done := e.isTerminal(); done { - return s - } - return e.status + return e.DefaultStatus() } diff --git a/internal/task/apply_statefulset.go b/internal/task/apply_statefulset.go index 1421424..f2939ce 100644 --- a/internal/task/apply_statefulset.go +++ b/internal/task/apply_statefulset.go @@ -15,9 +15,9 @@ import ( const TaskTypeApplyStatefulSet = "apply-statefulset" -var statefulSetFieldOwner = client.FieldOwner("seinode-controller") - // ApplyStatefulSetParams identifies the node whose StatefulSet should be applied. +// Fields are serialized into the plan for observability (the task itself +// reads the node from ExecutionConfig.Resource). 
type ApplyStatefulSetParams struct { NodeName string `json:"nodeName"` Namespace string `json:"namespace"` @@ -56,7 +56,7 @@ func (e *applyStatefulSetExecution) Execute(ctx context.Context) error { } //nolint:staticcheck // migrating to typed ApplyConfiguration is a separate effort - if err := e.cfg.KubeClient.Patch(ctx, desired, client.Apply, statefulSetFieldOwner, client.ForceOwnership); err != nil { + if err := e.cfg.KubeClient.Patch(ctx, desired, client.Apply, fieldOwner, client.ForceOwnership); err != nil { return fmt.Errorf("applying statefulset: %w", err) } @@ -65,8 +65,5 @@ func (e *applyStatefulSetExecution) Execute(ctx context.Context) error { } func (e *applyStatefulSetExecution) Status(_ context.Context) ExecutionStatus { - if s, done := e.isTerminal(); done { - return s - } - return e.status + return e.DefaultStatus() } diff --git a/internal/task/ensure_pvc.go b/internal/task/ensure_pvc.go index 635bb36..34c0fdb 100644 --- a/internal/task/ensure_pvc.go +++ b/internal/task/ensure_pvc.go @@ -15,6 +15,8 @@ import ( const TaskTypeEnsureDataPVC = "ensure-data-pvc" // EnsureDataPVCParams identifies the node whose PVC should be ensured. +// Fields are serialized into the plan for observability (the task itself +// reads the node from ExecutionConfig.Resource). type EnsureDataPVCParams struct { NodeName string `json:"nodeName"` Namespace string `json:"namespace"` @@ -64,8 +66,5 @@ func (e *ensureDataPVCExecution) Execute(ctx context.Context) error { } func (e *ensureDataPVCExecution) Status(_ context.Context) ExecutionStatus { - if s, done := e.isTerminal(); done { - return s - } - return e.status + return e.DefaultStatus() } diff --git a/internal/task/task.go b/internal/task/task.go index 6538593..7f3d355 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -26,6 +26,10 @@ const ( TaskTypeTeardownBootstrap = "teardown-bootstrap" ) +// fieldOwner is the server-side apply field manager for resources +// owned by the seinode-controller. 
+var fieldOwner = client.FieldOwner("seinode-controller") + // ExecutionStatus represents the lifecycle state of a task execution. type ExecutionStatus string @@ -96,6 +100,16 @@ func (b *taskBase) setFailed(err error) { b.err = err } +// DefaultStatus returns the cached status, short-circuiting if terminal. +// Fire-and-forget tasks that complete entirely within Execute can use this +// as their Status implementation directly. +func (b *taskBase) DefaultStatus() ExecutionStatus { + if s, done := b.isTerminal(); done { + return s + } + return b.status +} + // Err returns the error that caused failure, or nil. func (b *taskBase) Err() error { return b.err } diff --git a/platform/clusters/prod/protocol/atlantic-2/.gitkeep b/platform/clusters/prod/protocol/atlantic-2/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/platform/clusters/prod/protocol/pacific-1/.gitkeep b/platform/clusters/prod/protocol/pacific-1/.gitkeep deleted file mode 100644 index e69de29..0000000