diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 25cc7221a9..d5d69a9b8a 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -60,15 +60,22 @@ type RefreshEvaluator interface { } const ( - refreshCheckInterval = 5 * time.Second - refreshMaxConcurrency = 16 - refreshPendingBackoff = time.Minute - refreshFailureBackoff = 5 * time.Minute - quotaBackoffBase = time.Second - quotaBackoffMax = 30 * time.Minute + refreshCheckInterval = 5 * time.Second + refreshMaxConcurrency = 16 + refreshPendingBackoff = time.Minute + refreshFailureBackoff = 5 * time.Minute + quotaBackoffBase = time.Second + quotaBackoffMax = 30 * time.Minute + mixedProviderWarmupTTL = time.Minute + mixedProviderWarmupTimeout = 10 * time.Second + mixedProviderWarmupPrompt = "hi" + mixedProviderWarmupMaxTok = 1 ) -var quotaCooldownDisabled atomic.Bool +var ( + quotaCooldownDisabled atomic.Bool + mixedProviderWarmups = map[string]struct{}{"openai": {}} +) // SetQuotaCooldownDisabled toggles quota cooldown scheduling globally. func SetQuotaCooldownDisabled(disable bool) { @@ -164,6 +171,10 @@ type Manager struct { // Auto refresh state refreshCancel context.CancelFunc refreshSemaphore chan struct{} + + mixedWarmMu sync.Mutex + mixedWarmUntil map[string]time.Time + mixedWarmInFlight map[string]struct{} } // NewManager constructs a manager with optional custom selector and hook. @@ -175,14 +186,16 @@ func NewManager(store Store, selector Selector, hook Hook) *Manager { hook = NoopHook{} } manager := &Manager{ - store: store, - executors: make(map[string]ProviderExecutor), - selector: selector, - hook: hook, - auths: make(map[string]*Auth), - providerOffsets: make(map[string]int), - modelPoolOffsets: make(map[string]int), - refreshSemaphore: make(chan struct{}, refreshMaxConcurrency), + store: store, + executors: make(map[string]ProviderExecutor), + selector: selector, + hook: hook, + auths: make(map[string]*Auth), + providerOffsets: make(map[string]int), + modelPoolOffsets: make(map[string]int), + refreshSemaphore: make(chan struct{}, refreshMaxConcurrency), + mixedWarmUntil: make(map[string]time.Time), + mixedWarmInFlight: make(map[string]struct{}), } // atomic.Value requires non-nil initial value. manager.runtimeConfig.Store(&internalconfig.Config{}) @@ -430,6 +443,20 @@ func openAICompatModelPoolKey(auth *Auth, requestedModel string) string { return strings.ToLower(strings.TrimSpace(auth.ID)) + "|" + openAICompatProviderKey(auth) + "|" + strings.ToLower(base) } +func (m *Manager) currentModelPoolOffset(key string, size int) int { + if m == nil || size <= 1 { + return 0 + } + key = strings.TrimSpace(key) + if key == "" { + return 0 + } + m.mu.RLock() + offset := m.modelPoolOffsets[key] + m.mu.RUnlock() + return offset % size +} + func (m *Manager) nextModelPoolOffset(key string, size int) int { if m == nil || size <= 1 { return 0 @@ -500,13 +527,27 @@ func preserveRequestedModelSuffix(requestedModel, resolved string) string { } func (m *Manager) executionModelCandidates(auth *Auth, routeModel string) []string { + return m.executionModelCandidatesWithPoolOffset(auth, routeModel, true) +} + +func (m *Manager) executionModelCandidatesWithoutAdvancing(auth *Auth, routeModel string) []string { + return m.executionModelCandidatesWithPoolOffset(auth, routeModel, false) +} + +func (m *Manager) executionModelCandidatesWithPoolOffset(auth *Auth, routeModel string, advance bool) []string { requestedModel := rewriteModelForAuth(routeModel, auth) requestedModel = m.applyOAuthModelAlias(auth, requestedModel) if pool := m.resolveOpenAICompatUpstreamModelPool(auth, requestedModel); len(pool) > 0 { if len(pool) == 1 { return pool } - offset := m.nextModelPoolOffset(openAICompatModelPoolKey(auth, requestedModel), len(pool)) + key := openAICompatModelPoolKey(auth, requestedModel) + offset := 0 + if advance { + offset = m.nextModelPoolOffset(key, len(pool)) + } else { + offset = m.currentModelPoolOffset(key, len(pool)) + } return rotateStrings(pool, offset) } resolved := m.applyAPIKeyModelAlias(auth, requestedModel) @@ -571,7 +612,15 @@ func (m *Manager) filterExecutionModels(auth *Auth, routeModel string, candidate } func (m *Manager) preparedExecutionModels(auth *Auth, routeModel string) ([]string, bool) { - candidates := m.executionModelCandidates(auth, routeModel) + return m.preparedExecutionModelsWithPoolOffset(auth, routeModel, true) +} + +func (m *Manager) preparedExecutionModelsWithoutAdvancing(auth *Auth, routeModel string) ([]string, bool) { + return m.preparedExecutionModelsWithPoolOffset(auth, routeModel, false) +} + +func (m *Manager) preparedExecutionModelsWithPoolOffset(auth *Auth, routeModel string, advance bool) ([]string, bool) { + candidates := m.executionModelCandidatesWithPoolOffset(auth, routeModel, advance) pooled := len(candidates) > 1 return m.filterExecutionModels(auth, routeModel, candidates, pooled), pooled } @@ -1250,6 +1299,7 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req if len(providers) == 0 { return cliproxyexecutor.Response{}, &Error{Code: "provider_not_found", Message: "no provider supplied"} } + ctx = withMixedWarmupEnabled(ctx) routeModel := req.Model opts = ensureRequestedModelMetadata(opts, routeModel) tried := make(map[string]struct{}) @@ -1273,6 +1323,7 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req entry := logEntryWithRequestID(ctx) debugLogAuthSelection(entry, auth, provider, req.Model) publishSelectedAuthMetadata(opts.Metadata, auth.ID) + m.maybeWarmOtherMixedProvidersAsync(ctx, providers, provider, routeModel, opts) tried[auth.ID] = struct{}{} execCtx := ctx @@ -1406,6 +1457,7 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string if len(providers) == 0 { return nil, &Error{Code: "provider_not_found", Message: "no provider supplied"} } + ctx = withMixedWarmupEnabled(ctx) routeModel := req.Model opts = ensureRequestedModelMetadata(opts, routeModel) tried := make(map[string]struct{}) @@ -1437,6 +1489,7 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string entry := logEntryWithRequestID(ctx) debugLogAuthSelection(entry, auth, provider, req.Model) publishSelectedAuthMetadata(opts.Metadata, auth.ID) + m.maybeWarmOtherMixedProvidersAsync(ctx, providers, provider, routeModel, opts) tried[auth.ID] = struct{}{} execCtx := ctx @@ -2863,6 +2916,274 @@ func (m *Manager) pickNextMixed(ctx context.Context, providers []string, model s return authCopy, executor, providerKey, nil } +type mixedWarmupEnabledContextKey struct{} + +func withMixedWarmupEnabled(ctx context.Context) context.Context { + if ctx == nil { + ctx = context.Background() + } + return context.WithValue(ctx, mixedWarmupEnabledContextKey{}, true) +} + +func mixedWarmupEnabled(ctx context.Context) bool { + if ctx == nil { + return false + } + enabled, _ := ctx.Value(mixedWarmupEnabledContextKey{}).(bool) + return enabled +} + +func (m *Manager) maybeWarmOtherMixedProvidersAsync(ctx context.Context, providers []string, selectedProvider, routeModel string, opts cliproxyexecutor.Options) { + if m == nil || !mixedWarmupEnabled(ctx) || !m.usesFillFirstSelector() || len(providers) < 2 { + return + } + selectedProvider = strings.ToLower(strings.TrimSpace(selectedProvider)) + routeModel = strings.TrimSpace(routeModel) + targets := make([]string, 0, len(providers)) + seen := make(map[string]struct{}, len(providers)) + for _, provider := range providers { + providerKey := strings.ToLower(strings.TrimSpace(provider)) + if providerKey == "" || providerKey == selectedProvider { + continue + } + if _, ok := seen[providerKey]; ok { + continue + } + seen[providerKey] = struct{}{} + if !shouldWarmMixedProvider(providerKey) { + continue + } + targets = append(targets, providerKey) + } + for _, providerKey := range targets { + auth, _, err := m.pickNext(ctx, providerKey, routeModel, opts, nil) + if err != nil { + logEntryWithRequestID(ctx).Debugf("mixed provider warmup skipped provider=%s model=%s error=%v", providerKey, routeModel, err) + continue + } + if auth == nil { + continue + } + if !m.tryStartMixedProviderWarmup(providerKey, auth.ID, time.Now()) { + continue + } + go func(requestCtx context.Context, auth *Auth, providerKey, model string) { + if err := m.warmMixedProvider(requestCtx, auth, providerKey, model); err != nil { + logEntryWithRequestID(requestCtx).Debugf("mixed provider warmup skipped provider=%s model=%s error=%v", providerKey, model, err) + } + }(ctx, auth, providerKey, routeModel) + } +} + +func shouldWarmMixedProvider(provider string) bool { + provider = strings.ToLower(strings.TrimSpace(provider)) + if provider == "" { + return false + } + _, ok := mixedProviderWarmups[provider] + return ok +} + +func (m *Manager) usesFillFirstSelector() bool { + if m == nil { + return false + } + m.mu.RLock() + defer m.mu.RUnlock() + _, ok := m.selector.(*FillFirstSelector) + return ok +} + +func (m *Manager) warmMixedProvider(ctx context.Context, auth *Auth, provider, routeModel string) error { + defer m.finishMixedProviderWarmup(provider) + + warmBaseCtx := context.Background() + if ctx != nil { + warmBaseCtx = context.WithoutCancel(ctx) + } + warmCtx, cancel := context.WithTimeout(warmBaseCtx, mixedProviderWarmupTimeout) + defer cancel() + + if auth == nil { + return &Error{Code: "auth_not_found", Message: "warmup auth is nil"} + } + models, _ := m.preparedExecutionModelsWithoutAdvancing(auth, routeModel) + if len(models) == 0 { + return &Error{Code: "model_not_found", Message: "warmup model is unavailable"} + } + warmReq, err := buildMixedProviderWarmupRequest(auth, provider, models[0]) + if err != nil { + return err + } + if !m.reserveMixedProviderWarmup(provider, auth.ID, time.Now()) { + return nil + } + reserved := true + clearReservation := func() { + if !reserved { + return + } + m.clearMixedProviderWarmup(provider, auth.ID) + reserved = false + } + if rt := m.roundTripperFor(auth); rt != nil { + warmCtx = context.WithValue(warmCtx, roundTripperContextKey{}, rt) + warmCtx = context.WithValue(warmCtx, "cliproxy.roundtripper", rt) + } + resp, err := m.HttpRequest(warmCtx, auth, warmReq.WithContext(warmCtx)) + if resp != nil && resp.Body != nil { + defer func() { + if errClose := resp.Body.Close(); errClose != nil { + logEntryWithRequestID(warmCtx).Debugf("mixed provider warmup close failed provider=%s auth=%s error=%v", provider, auth.ID, errClose) + } + }() + defer func() { + _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 4096)) + }() + } + if err != nil { + clearReservation() + return err + } + if resp == nil { + clearReservation() + return &Error{Code: "provider_request_failed", Message: "warmup response is nil"} + } + if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { + clearReservation() + message := strings.TrimSpace(resp.Status) + if message == "" { + message = "warmup request failed with status " + strconv.Itoa(resp.StatusCode) + } + return &Error{Code: "provider_request_failed", Message: message, HTTPStatus: resp.StatusCode} + } + return nil +} + +func buildMixedProviderWarmupRequest(auth *Auth, provider, model string) (*http.Request, error) { + provider = strings.ToLower(strings.TrimSpace(provider)) + model = strings.TrimSpace(model) + if parsed := thinking.ParseSuffix(model); parsed.ModelName != "" { + model = strings.TrimSpace(parsed.ModelName) + } + if auth == nil { + return nil, &Error{Code: "auth_not_found", Message: "warmup auth is nil"} + } + if model == "" { + return nil, &Error{Code: "model_not_found", Message: "warmup model is empty"} + } + switch provider { + case "openai": + baseURL := "" + if auth.Attributes != nil { + baseURL = strings.TrimSpace(auth.Attributes["base_url"]) + } + if baseURL == "" { + return nil, &Error{Code: "invalid_request", Message: "warmup base_url is empty"} + } + payload, errMarshal := json.Marshal(map[string]any{ + "model": model, + "messages": []map[string]any{{ + "role": "user", + "content": mixedProviderWarmupPrompt, + }}, + "max_tokens": mixedProviderWarmupMaxTok, + "stream": false, + }) + if errMarshal != nil { + return nil, errMarshal + } + url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" + req, errReq := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload)) + if errReq != nil { + return nil, errReq + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "cliproxy-fill-first-warmup") + return req, nil + default: + return nil, &Error{Code: "provider_not_found", Message: "warmup provider not supported"} + } +} + +func (m *Manager) tryStartMixedProviderWarmup(provider, authID string, now time.Time) bool { + if m == nil { + return false + } + provider = strings.ToLower(strings.TrimSpace(provider)) + authID = strings.TrimSpace(authID) + if provider == "" || authID == "" { + return false + } + key := provider + "|" + authID + m.mixedWarmMu.Lock() + defer m.mixedWarmMu.Unlock() + m.cleanupExpiredMixedProviderWarmupsLocked(now) + if _, ok := m.mixedWarmInFlight[provider]; ok { + return false + } + if until, ok := m.mixedWarmUntil[key]; ok && until.After(now) { + return false + } + m.mixedWarmInFlight[provider] = struct{}{} + return true +} + +func (m *Manager) finishMixedProviderWarmup(provider string) { + if m == nil { + return + } + provider = strings.ToLower(strings.TrimSpace(provider)) + if provider == "" { + return + } + m.mixedWarmMu.Lock() + defer m.mixedWarmMu.Unlock() + delete(m.mixedWarmInFlight, provider) +} + +func (m *Manager) cleanupExpiredMixedProviderWarmupsLocked(now time.Time) { + for key, until := range m.mixedWarmUntil { + if !until.After(now) { + delete(m.mixedWarmUntil, key) + } + } +} + +func (m *Manager) reserveMixedProviderWarmup(provider, authID string, now time.Time) bool { + if m == nil { + return false + } + provider = strings.ToLower(strings.TrimSpace(provider)) + authID = strings.TrimSpace(authID) + if provider == "" || authID == "" { + return false + } + key := provider + "|" + authID + m.mixedWarmMu.Lock() + defer m.mixedWarmMu.Unlock() + m.cleanupExpiredMixedProviderWarmupsLocked(now) + if until, ok := m.mixedWarmUntil[key]; ok && until.After(now) { + return false + } + m.mixedWarmUntil[key] = now.Add(mixedProviderWarmupTTL) + return true +} + +func (m *Manager) clearMixedProviderWarmup(provider, authID string) { + if m == nil { + return + } + provider = strings.ToLower(strings.TrimSpace(provider)) + authID = strings.TrimSpace(authID) + if provider == "" || authID == "" { + return + } + m.mixedWarmMu.Lock() + defer m.mixedWarmMu.Unlock() + delete(m.mixedWarmUntil, provider+"|"+authID) +} + func (m *Manager) persist(ctx context.Context, auth *Auth) error { if m.store == nil || auth == nil { return nil diff --git a/sdk/cliproxy/auth/conductor_overrides_test.go b/sdk/cliproxy/auth/conductor_overrides_test.go index 1b74aab17d..20a07dfbba 100644 --- a/sdk/cliproxy/auth/conductor_overrides_test.go +++ b/sdk/cliproxy/auth/conductor_overrides_test.go @@ -2,7 +2,10 @@ package auth import ( "context" + "encoding/json" + "io" "net/http" + "strings" "sync" "testing" "time" @@ -157,20 +160,28 @@ func (e *credentialRetryLimitExecutor) Calls() int { type authFallbackExecutor struct { id string - mu sync.Mutex - executeCalls []string - streamCalls []string - executeErrors map[string]error - streamFirstErrors map[string]error + mu sync.Mutex + executeCalls []string + executeModels []string + streamCalls []string + httpRequests []*http.Request + executeErrors map[string]error + streamFirstErrors map[string]error + httpRequestErr error + httpResponseStatus int + httpRequestSignal chan struct{} + httpRequestBlock chan struct{} + httpRequestDone chan struct{} } func (e *authFallbackExecutor) Identifier() string { return e.id } -func (e *authFallbackExecutor) Execute(_ context.Context, auth *Auth, _ cliproxyexecutor.Request, _ cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { +func (e *authFallbackExecutor) Execute(_ context.Context, auth *Auth, req cliproxyexecutor.Request, _ cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { e.mu.Lock() e.executeCalls = append(e.executeCalls, auth.ID) + e.executeModels = append(e.executeModels, req.Model) err := e.executeErrors[auth.ID] e.mu.Unlock() if err != nil { @@ -204,8 +215,54 @@ func (e *authFallbackExecutor) CountTokens(context.Context, *Auth, cliproxyexecu return cliproxyexecutor.Response{}, &Error{HTTPStatus: 500, Message: "not implemented"} } -func (e *authFallbackExecutor) HttpRequest(context.Context, *Auth, *http.Request) (*http.Response, error) { - return nil, nil +func (e *authFallbackExecutor) HttpRequest(_ context.Context, _ *Auth, req *http.Request) (*http.Response, error) { + e.mu.Lock() + if req != nil { + e.httpRequests = append(e.httpRequests, req.Clone(req.Context())) + } + err := e.httpRequestErr + status := e.httpResponseStatus + signal := e.httpRequestSignal + block := e.httpRequestBlock + done := e.httpRequestDone + e.mu.Unlock() + if signal != nil { + select { + case signal <- struct{}{}: + default: + } + } + if block != nil && req != nil { + select { + case <-block: + case <-req.Context().Done(): + if done != nil { + select { + case done <- struct{}{}: + default: + } + } + return nil, req.Context().Err() + } + } + if done != nil { + select { + case done <- struct{}{}: + default: + } + } + if err != nil { + return nil, err + } + if status == 0 { + status = http.StatusOK + } + return &http.Response{ + Status: http.StatusText(status), + StatusCode: status, + Body: io.NopCloser(strings.NewReader(`{"id":"warmup"}`)), + Header: make(http.Header), + }, nil } func (e *authFallbackExecutor) ExecuteCalls() []string { @@ -216,6 +273,14 @@ func (e *authFallbackExecutor) ExecuteCalls() []string { return out } +func (e *authFallbackExecutor) ExecuteModels() []string { + e.mu.Lock() + defer e.mu.Unlock() + out := make([]string, len(e.executeModels)) + copy(out, e.executeModels) + return out +} + func (e *authFallbackExecutor) StreamCalls() []string { e.mu.Lock() defer e.mu.Unlock() @@ -224,6 +289,20 @@ func (e *authFallbackExecutor) StreamCalls() []string { return out } +func (e *authFallbackExecutor) HTTPRequestCount() int { + e.mu.Lock() + defer e.mu.Unlock() + return len(e.httpRequests) +} + +func (e *authFallbackExecutor) HTTPRequests() []*http.Request { + e.mu.Lock() + defer e.mu.Unlock() + out := make([]*http.Request, len(e.httpRequests)) + copy(out, e.httpRequests) + return out +} + type retryAfterStatusError struct { status int message string @@ -851,3 +930,662 @@ func TestManager_RequestScopedNotFoundStopsRetryWithoutSuspendingAuth(t *testing t.Fatalf("expected request-scoped 404 to avoid bad auth model cooldown state, got %#v", state) } } + +func TestManager_FillFirstWarmsOtherOpenAIProviderAsync(t *testing.T) { + m := NewManager(nil, &FillFirstSelector{}, nil) + signal := make(chan struct{}, 4) + primaryExecutor := &authFallbackExecutor{id: "claude"} + warmExecutor := &authFallbackExecutor{id: "openai", httpRequestSignal: signal} + m.RegisterExecutor(primaryExecutor) + m.RegisterExecutor(warmExecutor) + + model := "gpt-4.1" + selectedAuth := &Auth{ID: "aa-selected", Provider: "claude"} + warmedAuth := &Auth{ID: "bb-warmed", Provider: "openai", Attributes: map[string]string{"base_url": "https://warmed.example"}} + + reg := registry.GetGlobalRegistry() + reg.RegisterClient(selectedAuth.ID, "claude", []*registry.ModelInfo{{ID: model}}) + reg.RegisterClient(warmedAuth.ID, "openai", []*registry.ModelInfo{{ID: model}}) + t.Cleanup(func() { + reg.UnregisterClient(selectedAuth.ID) + reg.UnregisterClient(warmedAuth.ID) + }) + + if _, errRegister := m.Register(context.Background(), selectedAuth); errRegister != nil { + t.Fatalf("register selected auth: %v", errRegister) + } + if _, errRegister := m.Register(context.Background(), warmedAuth); errRegister != nil { + t.Fatalf("register warmed auth: %v", errRegister) + } + + resp, errExecute := m.Execute(context.Background(), []string{"claude", "openai"}, cliproxyexecutor.Request{Model: model}, cliproxyexecutor.Options{}) + if errExecute != nil { + t.Fatalf("execute: %v", errExecute) + } + if string(resp.Payload) != selectedAuth.ID { + t.Fatalf("payload = %q, want %q", string(resp.Payload), selectedAuth.ID) + } + + select { + case <-signal: + case <-time.After(2 * time.Second): + t.Fatal("expected async warmup request") + } + + if warmExecutor.HTTPRequestCount() != 1 { + t.Fatalf("http request count = %d, want 1", warmExecutor.HTTPRequestCount()) + } + requests := warmExecutor.HTTPRequests() + if len(requests) != 1 { + t.Fatalf("http requests len = %d, want 1", len(requests)) + } + if got := requests[0].URL.String(); got != "https://warmed.example/chat/completions" { + t.Fatalf("warmup url = %q, want %q", got, "https://warmed.example/chat/completions") + } +} + +func TestManager_RoundRobinDoesNotWarmOtherProviders(t *testing.T) { + m := NewManager(nil, &RoundRobinSelector{}, nil) + signal := make(chan struct{}, 4) + primaryExecutor := &authFallbackExecutor{id: "claude"} + warmExecutor := &authFallbackExecutor{id: "openai", httpRequestSignal: signal} + m.RegisterExecutor(primaryExecutor) + m.RegisterExecutor(warmExecutor) + + model := "gpt-4.1" + selectedAuth := &Auth{ID: "aa-selected", Provider: "claude"} + warmedAuth := &Auth{ID: "bb-warmed", Provider: "openai", Attributes: map[string]string{"base_url": "https://warmed.example"}} + + reg := registry.GetGlobalRegistry() + reg.RegisterClient(selectedAuth.ID, "claude", []*registry.ModelInfo{{ID: model}}) + reg.RegisterClient(warmedAuth.ID, "openai", []*registry.ModelInfo{{ID: model}}) + t.Cleanup(func() { + reg.UnregisterClient(selectedAuth.ID) + reg.UnregisterClient(warmedAuth.ID) + }) + + if _, errRegister := m.Register(context.Background(), selectedAuth); errRegister != nil { + t.Fatalf("register selected auth: %v", errRegister) + } + if _, errRegister := m.Register(context.Background(), warmedAuth); errRegister != nil { + t.Fatalf("register warmed auth: %v", errRegister) + } + + resp, errExecute := m.Execute(context.Background(), []string{"claude", "openai"}, cliproxyexecutor.Request{Model: model}, cliproxyexecutor.Options{}) + if errExecute != nil { + t.Fatalf("execute: %v", errExecute) + } + if string(resp.Payload) != selectedAuth.ID { + t.Fatalf("payload = %q, want %q", string(resp.Payload), selectedAuth.ID) + } + select { + case <-signal: + t.Fatal("unexpected warmup request under round-robin") + case <-time.After(200 * time.Millisecond): + } + if warmExecutor.HTTPRequestCount() != 0 { + t.Fatalf("http request count = %d, want 0", warmExecutor.HTTPRequestCount()) + } +} + +func TestManager_WarmupFailureDoesNotAffectPrimaryRequest(t *testing.T) { + m := NewManager(nil, &FillFirstSelector{}, nil) + primaryExecutor := &authFallbackExecutor{id: "claude"} + warmExecutor := &authFallbackExecutor{id: "openai", httpRequestErr: context.DeadlineExceeded} + m.RegisterExecutor(primaryExecutor) + m.RegisterExecutor(warmExecutor) + + model := "gpt-4.1" + selectedAuth := &Auth{ID: "aa-selected", Provider: "claude"} + warmedAuth := &Auth{ID: "bb-warmed", Provider: "openai", Attributes: map[string]string{"base_url": "https://warmed.example"}} + + reg := registry.GetGlobalRegistry() + reg.RegisterClient(selectedAuth.ID, "claude", []*registry.ModelInfo{{ID: model}}) + reg.RegisterClient(warmedAuth.ID, "openai", []*registry.ModelInfo{{ID: model}}) + t.Cleanup(func() { + reg.UnregisterClient(selectedAuth.ID) + reg.UnregisterClient(warmedAuth.ID) + }) + + if _, errRegister := m.Register(context.Background(), selectedAuth); errRegister != nil { + t.Fatalf("register selected auth: %v", errRegister) + } + if _, errRegister := m.Register(context.Background(), warmedAuth); errRegister != nil { + t.Fatalf("register warmed auth: %v", errRegister) + } + + resp, errExecute := m.Execute(context.Background(), []string{"claude", "openai"}, cliproxyexecutor.Request{Model: model}, cliproxyexecutor.Options{}) + if errExecute != nil { + t.Fatalf("execute: %v", errExecute) + } + if string(resp.Payload) != selectedAuth.ID { + t.Fatalf("payload = %q, want %q", string(resp.Payload), selectedAuth.ID) + } +} + +func TestManager_WarmupRespectsPinnedAuthMetadata(t *testing.T) { + m := NewManager(nil, &FillFirstSelector{}, nil) + signal := make(chan struct{}, 4) + primaryExecutor := &authFallbackExecutor{id: "claude"} + warmExecutor := &authFallbackExecutor{id: "openai", httpRequestSignal: signal} + m.RegisterExecutor(primaryExecutor) + m.RegisterExecutor(warmExecutor) + + model := "gpt-4.1" + selectedAuth := &Auth{ID: "aa-selected", Provider: "claude"} + warmedAuth := &Auth{ID: "bb-warmed", Provider: "openai", Attributes: map[string]string{"base_url": "https://warmed.example"}} + + reg := registry.GetGlobalRegistry() + reg.RegisterClient(selectedAuth.ID, "claude", []*registry.ModelInfo{{ID: model}}) + reg.RegisterClient(warmedAuth.ID, "openai", []*registry.ModelInfo{{ID: model}}) + t.Cleanup(func() { + reg.UnregisterClient(selectedAuth.ID) + reg.UnregisterClient(warmedAuth.ID) + }) + + if _, errRegister := m.Register(context.Background(), selectedAuth); errRegister != nil { + t.Fatalf("register selected auth: %v", errRegister) + } + if _, errRegister := m.Register(context.Background(), warmedAuth); errRegister != nil { + t.Fatalf("register warmed auth: %v", errRegister) + } + + opts := cliproxyexecutor.Options{Metadata: map[string]any{cliproxyexecutor.PinnedAuthMetadataKey: selectedAuth.ID}} + resp, errExecute := m.Execute(context.Background(), []string{"claude", "openai"}, cliproxyexecutor.Request{Model: model}, opts) + if errExecute != nil { + t.Fatalf("execute: %v", errExecute) + } + if string(resp.Payload) != selectedAuth.ID { + t.Fatalf("payload = %q, want %q", string(resp.Payload), selectedAuth.ID) + } + select { + case <-signal: + t.Fatal("unexpected warmup request for pinned auth session") + case <-time.After(200 * time.Millisecond): + } + if warmExecutor.HTTPRequestCount() != 0 { + t.Fatalf("http request count = %d, want 0", warmExecutor.HTTPRequestCount()) + } + if len(m.mixedWarmInFlight) != 0 { + t.Fatalf("expected no in-flight warmups for pinned auth session, got %d", len(m.mixedWarmInFlight)) + } +} + +func TestManager_WarmupDeduplicatesWithinTTL(t *testing.T) { + m := NewManager(nil, &FillFirstSelector{}, nil) + signal := make(chan struct{}, 4) + primaryExecutor := &authFallbackExecutor{id: "claude"} + warmExecutor := &authFallbackExecutor{id: "openai", httpRequestSignal: signal} + m.RegisterExecutor(primaryExecutor) + m.RegisterExecutor(warmExecutor) + + model := "gpt-4.1" + selectedAuth := &Auth{ID: "aa-selected", Provider: "claude"} + warmedAuth := &Auth{ID: "bb-warmed", Provider: "openai", Attributes: map[string]string{"base_url": "https://warmed.example"}} + + reg := registry.GetGlobalRegistry() + reg.RegisterClient(selectedAuth.ID, "claude", []*registry.ModelInfo{{ID: model}}) + reg.RegisterClient(warmedAuth.ID, "openai", []*registry.ModelInfo{{ID: model}}) + t.Cleanup(func() { + reg.UnregisterClient(selectedAuth.ID) + reg.UnregisterClient(warmedAuth.ID) + }) + + if _, errRegister := m.Register(context.Background(), selectedAuth); errRegister != nil { + t.Fatalf("register selected auth: %v", errRegister) + } + if _, errRegister := m.Register(context.Background(), warmedAuth); errRegister != nil { + t.Fatalf("register warmed auth: %v", errRegister) + } + + if _, errExecute := m.Execute(context.Background(), []string{"claude", "openai"}, cliproxyexecutor.Request{Model: model}, cliproxyexecutor.Options{}); errExecute != nil { + t.Fatalf("first execute: %v", errExecute) + } + select { + case <-signal: + case <-time.After(2 * time.Second): + t.Fatal("expected first async warmup request") + } + + if _, errExecute := m.Execute(context.Background(), []string{"claude", "openai"}, cliproxyexecutor.Request{Model: model}, cliproxyexecutor.Options{}); errExecute != nil { + t.Fatalf("second execute: %v", errExecute) + } + select { + case <-signal: + t.Fatal("unexpected second warmup request within ttl") + case <-time.After(200 * time.Millisecond): + } + + if warmExecutor.HTTPRequestCount() != 1 { + t.Fatalf("http request count = %d, want 1", warmExecutor.HTTPRequestCount()) + } + if len(m.mixedWarmInFlight) != 0 { + t.Fatalf("expected no in-flight warmups after ttl dedupe, got %d", len(m.mixedWarmInFlight)) + } +} + +func TestManager_WarmupSingleFlightsPerProvider(t *testing.T) { + m := NewManager(nil, &FillFirstSelector{}, nil) + signal := make(chan struct{}, 4) + block := make(chan struct{}) + done := make(chan struct{}, 4) + primaryExecutor := &authFallbackExecutor{id: "claude"} + warmExecutor := &authFallbackExecutor{id: "openai", httpRequestSignal: signal, httpRequestBlock: block, httpRequestDone: done} + m.RegisterExecutor(primaryExecutor) + m.RegisterExecutor(warmExecutor) + + model := "gpt-4.1" + selectedAuth := &Auth{ID: "aa-selected", Provider: "claude"} + warmedAuth := &Auth{ID: "bb-warmed", Provider: "openai", Attributes: map[string]string{"base_url": "https://warmed.example"}} + + reg := registry.GetGlobalRegistry() + reg.RegisterClient(selectedAuth.ID, "claude", []*registry.ModelInfo{{ID: model}}) + reg.RegisterClient(warmedAuth.ID, "openai", []*registry.ModelInfo{{ID: model}}) + t.Cleanup(func() { + reg.UnregisterClient(selectedAuth.ID) + reg.UnregisterClient(warmedAuth.ID) + }) + + if _, errRegister := m.Register(context.Background(), selectedAuth); errRegister != nil { + t.Fatalf("register selected auth: %v", errRegister) + } + if _, errRegister := m.Register(context.Background(), warmedAuth); errRegister != nil { + t.Fatalf("register warmed auth: %v", errRegister) + } + + start := make(chan struct{}) + errCh := make(chan error, 2) + for i := 0; i < 2; i++ { + go func() { + <-start + _, err := m.Execute(context.Background(), []string{"claude", "openai"}, cliproxyexecutor.Request{Model: model}, cliproxyexecutor.Options{}) + errCh <- err + }() + } + close(start) + + select { + case <-signal: + case <-time.After(2 * time.Second): + t.Fatal("expected async warmup request") + } + select { + case <-signal: + t.Fatal("unexpected second concurrent warmup request") + case <-time.After(200 * time.Millisecond): + } + + close(block) + for range 2 { + if err := <-errCh; err != nil { + t.Fatalf("execute: %v", err) + } + } + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("expected blocked warmup request to finish") + } + + if warmExecutor.HTTPRequestCount() != 1 { + t.Fatalf("http request count = %d, want 1", warmExecutor.HTTPRequestCount()) + } +} + +func TestManager_WarmupUsesIndependentTimeout(t *testing.T) { + m := NewManager(nil, &FillFirstSelector{}, nil) + signal := make(chan struct{}, 4) + block := make(chan struct{}) + done := make(chan struct{}, 4) + primaryExecutor := &authFallbackExecutor{id: "claude"} + warmExecutor := &authFallbackExecutor{id: "openai", httpRequestSignal: signal, httpRequestBlock: block, httpRequestDone: done} + m.RegisterExecutor(primaryExecutor) + m.RegisterExecutor(warmExecutor) + + model := "gpt-4.1" + selectedAuth := &Auth{ID: "aa-selected", Provider: "claude"} + warmedAuth := &Auth{ID: "bb-warmed", Provider: "openai", Attributes: map[string]string{"base_url": "https://warmed.example"}} + + reg := registry.GetGlobalRegistry() + reg.RegisterClient(selectedAuth.ID, "claude", []*registry.ModelInfo{{ID: model}}) + reg.RegisterClient(warmedAuth.ID, "openai", []*registry.ModelInfo{{ID: model}}) + t.Cleanup(func() { + reg.UnregisterClient(selectedAuth.ID) + reg.UnregisterClient(warmedAuth.ID) + }) + + if _, errRegister := m.Register(context.Background(), selectedAuth); errRegister != nil { + t.Fatalf("register selected auth: %v", errRegister) + } + if _, errRegister := m.Register(context.Background(), warmedAuth); errRegister != nil { + t.Fatalf("register warmed auth: %v", errRegister) + } + + ctx, cancel := context.WithCancel(context.Background()) + if _, errExecute := m.Execute(ctx, []string{"claude", "openai"}, cliproxyexecutor.Request{Model: model}, cliproxyexecutor.Options{}); errExecute != nil { + t.Fatalf("execute: %v", errExecute) + } + cancel() + + select { + case <-signal: + case <-time.After(2 * time.Second): + t.Fatal("expected async warmup request") + } + requests := warmExecutor.HTTPRequests() + if len(requests) != 1 { + t.Fatalf("http requests len = %d, want 1", len(requests)) + } + deadline, ok := requests[0].Context().Deadline() + if !ok { + t.Fatal("expected warmup request context to have a deadline") + } + remaining := time.Until(deadline) + if remaining <= 0 || remaining > mixedProviderWarmupTimeout { + t.Fatalf("warmup deadline remaining = %v, want within (0, %v]", remaining, mixedProviderWarmupTimeout) + } + + select { + case <-done: + case <-time.After(mixedProviderWarmupTimeout + 2*time.Second): + t.Fatal("expected warmup request to finish after timeout") + } + deadlineWait := time.Now().Add(500 * time.Millisecond) + for len(m.mixedWarmInFlight) != 0 && time.Now().Before(deadlineWait) { + time.Sleep(10 * time.Millisecond) + } + if len(m.mixedWarmInFlight) != 0 { + t.Fatalf("expected no in-flight warmups after timeout, got %d", len(m.mixedWarmInFlight)) + } +} + +func TestManager_ReserveMixedProviderWarmupCleansExpiredEntries(t *testing.T) { + m := NewManager(nil, &FillFirstSelector{}, nil) + now := time.Now() + m.mixedWarmUntil["openai|expired"] = now.Add(-time.Second) + m.mixedWarmUntil["openai|active"] = now.Add(time.Second) + + if !m.reserveMixedProviderWarmup("openai", "fresh", now) { + t.Fatal("expected fresh warmup reservation to succeed") + } + if _, ok := m.mixedWarmUntil["openai|expired"]; ok { + t.Fatal("expected expired warmup entry to be removed") + } + if _, ok := m.mixedWarmUntil["openai|active"]; !ok { + t.Fatal("expected active warmup entry to remain") + } + if _, ok := m.mixedWarmUntil["openai|fresh"]; !ok { + t.Fatal("expected fresh warmup entry to be added") + } +} + +func TestManager_ExecuteCountDoesNotWarmOtherProviders(t *testing.T) { + m := NewManager(nil, &FillFirstSelector{}, nil) + signal := make(chan struct{}, 4) + primaryExecutor := &authFallbackExecutor{id: "claude"} + warmExecutor := &authFallbackExecutor{id: "openai", httpRequestSignal: signal} + m.RegisterExecutor(primaryExecutor) + m.RegisterExecutor(warmExecutor) + + model := "gpt-4.1" + selectedAuth := &Auth{ID: "aa-selected", Provider: "claude"} + warmedAuth := &Auth{ID: "bb-warmed", Provider: "openai", Attributes: map[string]string{"base_url": "https://warmed.example"}} + + reg := registry.GetGlobalRegistry() + reg.RegisterClient(selectedAuth.ID, "claude", []*registry.ModelInfo{{ID: model}}) + reg.RegisterClient(warmedAuth.ID, "openai", []*registry.ModelInfo{{ID: model}}) + t.Cleanup(func() { + reg.UnregisterClient(selectedAuth.ID) + reg.UnregisterClient(warmedAuth.ID) + }) + + if _, errRegister := m.Register(context.Background(), selectedAuth); errRegister != nil { + t.Fatalf("register selected auth: %v", errRegister) + } + if _, errRegister := m.Register(context.Background(), warmedAuth); errRegister != nil { + t.Fatalf("register warmed auth: %v", errRegister) + } + + if _, errExecute := m.ExecuteCount(context.Background(), []string{"claude", "openai"}, cliproxyexecutor.Request{Model: model}, cliproxyexecutor.Options{}); errExecute == nil { + t.Fatal("expected count tokens to use executor.CountTokens stub and return not implemented") + } + select { + case <-signal: + t.Fatal("unexpected warmup request during ExecuteCount") + case <-time.After(200 * time.Millisecond): + } + if warmExecutor.HTTPRequestCount() != 0 { + t.Fatalf("http request count = %d, want 0", warmExecutor.HTTPRequestCount()) + } +} + +func TestManager_TryStartMixedProviderWarmupDoesNotUseProviderWideTTL(t *testing.T) { + m := NewManager(nil, &FillFirstSelector{}, nil) + now := time.Now() + m.mixedWarmUntil["openai|auth-a"] = now.Add(time.Second) + + if !m.tryStartMixedProviderWarmup("openai", "auth-b", now) { + t.Fatal("expected provider warmup gate to allow another auth while no warmup is in flight") + } + if _, ok := m.mixedWarmInFlight["openai"]; !ok { + t.Fatal("expected in-flight gate to be set for provider") + } + m.finishMixedProviderWarmup("openai") + if len(m.mixedWarmInFlight) != 0 { + t.Fatalf("expected in-flight gate to clear, got %d entries", len(m.mixedWarmInFlight)) + } +} + +func TestManager_TryStartMixedProviderWarmupSkipsSameAuthWithinTTL(t *testing.T) { + m := NewManager(nil, &FillFirstSelector{}, nil) + now := time.Now() + m.mixedWarmUntil["openai|auth-a"] = now.Add(time.Second) + + if m.tryStartMixedProviderWarmup("openai", "auth-a", now) { + t.Fatal("expected same auth warmup to be skipped within ttl") + } + if _, ok := m.mixedWarmInFlight["openai"]; ok { + t.Fatal("expected in-flight gate to remain unset when ttl blocks warmup") + } +} + +func TestManager_WarmupDoesNotAdvanceModelPoolOffset(t *testing.T) { + m := NewManager(nil, &FillFirstSelector{}, nil) + cfg := &internalconfig.Config{ + OpenAICompatibility: []internalconfig.OpenAICompatibility{{ + Name: "openai", + Models: []internalconfig.OpenAICompatibilityModel{ + {Name: "qwen3.5-plus", Alias: "claude-opus-4.66"}, + {Name: "glm-5", Alias: "claude-opus-4.66"}, + }, + }}, + } + m.SetConfig(cfg) + + signal := make(chan struct{}, 4) + primaryExecutor := &authFallbackExecutor{id: "claude"} + warmExecutor := &authFallbackExecutor{id: "openai", httpRequestSignal: signal} + m.RegisterExecutor(primaryExecutor) + m.RegisterExecutor(warmExecutor) + + model := "claude-opus-4.66" + selectedAuth := &Auth{ID: "aa-selected", Provider: "claude"} + warmedAuth := &Auth{ + ID: "bb-warmed", + Provider: "openai", + Attributes: map[string]string{ + "api_key": "test-key", + "base_url": "https://warmed.example", + "compat_name": "openai", + "provider_key": "openai", + }, + } + + reg := registry.GetGlobalRegistry() + reg.RegisterClient(selectedAuth.ID, "claude", []*registry.ModelInfo{{ID: model}}) + reg.RegisterClient(warmedAuth.ID, "openai", []*registry.ModelInfo{{ID: model}}) + t.Cleanup(func() { + reg.UnregisterClient(selectedAuth.ID) + reg.UnregisterClient(warmedAuth.ID) + }) + + if _, errRegister := m.Register(context.Background(), selectedAuth); errRegister != nil { + t.Fatalf("register selected auth: %v", errRegister) + } + if _, errRegister := m.Register(context.Background(), warmedAuth); errRegister != nil { + t.Fatalf("register warmed auth: %v", errRegister) + } + + if _, errExecute := m.Execute(context.Background(), []string{"claude", "openai"}, cliproxyexecutor.Request{Model: model}, cliproxyexecutor.Options{}); errExecute != nil { + t.Fatalf("trigger warmup execute: %v", errExecute) + } + select { + case <-signal: + case <-time.After(2 * time.Second): + t.Fatal("expected async warmup request") + } + + if got := warmExecutor.HTTPRequests(); len(got) != 1 { + t.Fatalf("http requests len = %d, want 1", len(got)) + } else if reqModel := strings.TrimSpace(readJSONFieldFromBody(t, got[0], "model")); reqModel != "qwen3.5-plus" { + t.Fatalf("warmup model = %q, want %q", reqModel, "qwen3.5-plus") + } + + resp, errExecute := m.Execute(context.Background(), []string{"openai"}, cliproxyexecutor.Request{Model: model}, cliproxyexecutor.Options{}) + if errExecute != nil { + t.Fatalf("direct execute after warmup: %v", errExecute) + } + if string(resp.Payload) != warmedAuth.ID { + t.Fatalf("payload = %q, want %q", string(resp.Payload), warmedAuth.ID) + } + gotModels := warmExecutor.ExecuteModels() + if len(gotModels) != 1 { + t.Fatalf("execute models len = %d, want 1", len(gotModels)) + } + if gotModels[0] != "qwen3.5-plus" { + t.Fatalf("first real execute model = %q, want %q", gotModels[0], "qwen3.5-plus") + } + key := openAICompatModelPoolKey(warmedAuth, model) + if gotOffset := m.currentModelPoolOffset(key, 2); gotOffset != 1 { + t.Fatalf("model pool offset = %d, want 1 after first real execute", gotOffset) + } +} + +func TestManager_WarmupStripsThinkingSuffixFromPayload(t *testing.T) { + m := NewManager(nil, &FillFirstSelector{}, nil) + signal := make(chan struct{}, 4) + primaryExecutor := &authFallbackExecutor{id: "claude"} + warmExecutor := &authFallbackExecutor{id: "openai", httpRequestSignal: signal} + m.RegisterExecutor(primaryExecutor) + m.RegisterExecutor(warmExecutor) + + model := "gpt-4.1(8192)" + selectedAuth := &Auth{ID: "aa-selected", Provider: "claude"} + warmedAuth := &Auth{ID: "bb-warmed", Provider: "openai", Attributes: map[string]string{"base_url": "https://warmed.example"}} + + reg := registry.GetGlobalRegistry() + reg.RegisterClient(selectedAuth.ID, "claude", []*registry.ModelInfo{{ID: model}}) + reg.RegisterClient(warmedAuth.ID, "openai", []*registry.ModelInfo{{ID: model}}) + t.Cleanup(func() { + reg.UnregisterClient(selectedAuth.ID) + reg.UnregisterClient(warmedAuth.ID) + }) + + if _, errRegister := m.Register(context.Background(), selectedAuth); errRegister != nil { + t.Fatalf("register selected auth: %v", errRegister) + } + if _, errRegister := m.Register(context.Background(), warmedAuth); errRegister != nil { + t.Fatalf("register warmed auth: %v", errRegister) + } + + if _, errExecute := m.Execute(context.Background(), []string{"claude", "openai"}, cliproxyexecutor.Request{Model: model}, cliproxyexecutor.Options{}); errExecute != nil { + t.Fatalf("trigger warmup execute: %v", errExecute) + } + select { + case <-signal: + case <-time.After(2 * time.Second): + t.Fatal("expected async warmup request") + } + + requests := warmExecutor.HTTPRequests() + if len(requests) != 1 { + t.Fatalf("http requests len = %d, want 1", len(requests)) + } + if reqModel := strings.TrimSpace(readJSONFieldFromBody(t, requests[0], "model")); reqModel != "gpt-4.1" { + t.Fatalf("warmup model = %q, want %q", reqModel, "gpt-4.1") + } +} + +func TestManager_WarmupClearsTTLOnNon2xxResponse(t *testing.T) { + m := NewManager(nil, &FillFirstSelector{}, nil) + signal := make(chan struct{}, 4) + primaryExecutor := &authFallbackExecutor{id: "claude"} + warmExecutor := &authFallbackExecutor{id: "openai", httpRequestSignal: signal, httpResponseStatus: http.StatusUnauthorized} + m.RegisterExecutor(primaryExecutor) + m.RegisterExecutor(warmExecutor) + + model := "gpt-4.1" + selectedAuth := &Auth{ID: "aa-selected", Provider: "claude"} + warmedAuth := &Auth{ID: "bb-warmed", Provider: "openai", Attributes: map[string]string{"base_url": "https://warmed.example"}} + + reg := registry.GetGlobalRegistry() + reg.RegisterClient(selectedAuth.ID, "claude", []*registry.ModelInfo{{ID: model}}) + reg.RegisterClient(warmedAuth.ID, "openai", []*registry.ModelInfo{{ID: model}}) + t.Cleanup(func() { + reg.UnregisterClient(selectedAuth.ID) + reg.UnregisterClient(warmedAuth.ID) + }) + + if _, errRegister := m.Register(context.Background(), selectedAuth); errRegister != nil { + t.Fatalf("register selected auth: %v", errRegister) + } + if _, errRegister := m.Register(context.Background(), warmedAuth); errRegister != nil { + t.Fatalf("register warmed auth: %v", errRegister) + } + + if _, errExecute := m.Execute(context.Background(), []string{"claude", "openai"}, cliproxyexecutor.Request{Model: model}, cliproxyexecutor.Options{}); errExecute != nil { + t.Fatalf("trigger warmup execute: %v", errExecute) + } + select { + case <-signal: + case <-time.After(2 * time.Second): + t.Fatal("expected async warmup request") + } + + deadlineWait := time.Now().Add(500 * time.Millisecond) + for time.Now().Before(deadlineWait) { + m.mixedWarmMu.Lock() + _, ok := m.mixedWarmUntil["openai|bb-warmed"] + m.mixedWarmMu.Unlock() + if !ok { + break + } + time.Sleep(10 * time.Millisecond) + } + m.mixedWarmMu.Lock() + _, ok := m.mixedWarmUntil["openai|bb-warmed"] + m.mixedWarmMu.Unlock() + if ok { + t.Fatal("expected non-2xx warmup to clear TTL reservation") + } + if !m.reserveMixedProviderWarmup("openai", "bb-warmed", time.Now()) { + t.Fatal("expected auth to be warmable again after failed warmup") + } +} + +func readJSONFieldFromBody(t *testing.T, req *http.Request, field string) string { + t.Helper() + if req == nil || req.Body == nil { + t.Fatal("expected request body") + } + body, err := io.ReadAll(req.Body) + if err != nil { + t.Fatalf("read request body: %v", err) + } + var payload map[string]any + if err := json.Unmarshal(body, &payload); err != nil { + t.Fatalf("unmarshal request body: %v", err) + } + value, _ := payload[field].(string) + return value +}