Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions app/core/clients/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -944,18 +944,34 @@ def _is_native_codex_originator(originator: str | None) -> bool:
return stripped in _NATIVE_CODEX_ORIGINATORS


def _payload_uses_image_generation_tool(payload: Mapping[str, JsonValue]) -> bool:
tools = payload.get("tools")
if not isinstance(tools, list):
return False
for tool in tools:
if not isinstance(tool, dict):
continue
tool_type = tool.get("type")
if tool_type == "image_generation":
return True
return False


def _resolve_stream_transport(
*,
transport: str,
transport_override: str | None,
model: str | None,
headers: Mapping[str, str],
has_image_generation_tool: bool = False,
) -> str:
configured = _configured_stream_transport(transport=transport, transport_override=transport_override)
if configured == "websocket":
return "websocket"
if configured == "http":
return "http"
if has_image_generation_tool:
return "http"
if _has_native_codex_transport_headers(headers):
return "websocket"

Expand Down Expand Up @@ -1598,6 +1614,7 @@ async def stream_responses(
transport_override=upstream_stream_transport_override,
model=payload.model,
headers=headers,
has_image_generation_tool=_payload_uses_image_generation_tool(payload_dict),
)
if transport == "websocket":
upstream_headers = _build_upstream_websocket_headers(headers, access_token, account_id)
Expand Down
6 changes: 5 additions & 1 deletion app/core/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ class Settings(BaseSettings):
compact_request_budget_seconds: float = Field(default=75.0, gt=0)
stream_idle_timeout_seconds: float = 300.0
proxy_downstream_websocket_idle_timeout_seconds: float = Field(default=120.0, gt=0)
max_sse_event_bytes: int = Field(default=2 * 1024 * 1024, gt=0)
# Applies to both upstream SSE event buffering and upstream websocket message
# frames. Keep the default aligned with the common 16 MiB websocket ceiling so
# large built-in tool payloads (for example image_generation outputs) do not
# fail locally with a 1009 before upstream completion.
max_sse_event_bytes: int = Field(default=16 * 1024 * 1024, gt=0)
auth_base_url: str = "https://auth.openai.com"
oauth_client_id: str = "app_EMoamEEZ73f0CkXaXp7hrann"
oauth_originator: str = "codex_chatgpt_desktop"
Expand Down
9 changes: 6 additions & 3 deletions app/core/openai/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def normalize_tool_choice(choice: JsonValue | None) -> JsonValue | None:
return choice


def validate_tool_types(tools: list[JsonValue]) -> list[JsonValue]:
def validate_tool_types(tools: list[JsonValue], *, allow_builtin_tools: bool = False) -> list[JsonValue]:
normalized_tools: list[JsonValue] = []
for tool in tools:
if not is_json_mapping(tool):
Expand All @@ -83,7 +83,7 @@ def validate_tool_types(tools: list[JsonValue]) -> list[JsonValue]:
tool = dict(tool_mapping)
tool["type"] = normalized_type
tool_type = normalized_type
if tool_type in UNSUPPORTED_TOOL_TYPES:
if not allow_builtin_tools and tool_type in UNSUPPORTED_TOOL_TYPES:
raise ValueError(f"Unsupported tool type: {tool_type}")
normalized_tools.append(tool)
return normalized_tools
Expand Down Expand Up @@ -379,7 +379,7 @@ def _normalize_previous_response_id(cls, value: str | None) -> str | None:
@field_validator("tools")
@classmethod
def _validate_tools(cls, value: list[JsonValue]) -> list[JsonValue]:
    # Built-in tool types (e.g. image_generation) are permitted on this
    # request model; validate_tool_types still normalizes each entry's
    # "type" field and rejects non-mapping tool entries.
    return validate_tool_types(value, allow_builtin_tools=True)

@field_validator("tool_choice")
@classmethod
Expand Down Expand Up @@ -511,6 +511,9 @@ def _sort_keys_recursive(value: JsonValue) -> JsonValue:
def _strip_compact_unsupported_fields(payload: MutableJsonObject) -> MutableJsonObject:
    """Remove fields the compact endpoint does not accept.

    Applies the generic unsupported-field stripping first, then drops
    ``store`` and the tool-related fields on top of that. Missing keys
    are ignored.
    """
    payload = _strip_unsupported_fields(payload)
    for unsupported_key in ("store", "tools", "tool_choice", "parallel_tool_calls"):
        payload.pop(unsupported_key, None)
    return payload


Expand Down
2 changes: 1 addition & 1 deletion app/core/openai/v1_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def _ensure_store_false(cls, value: bool | None) -> bool | None:
@field_validator("tools")
@classmethod
def _validate_tools(cls, value: list[JsonValue]) -> list[JsonValue]:
    # /v1 requests may carry built-in tool types (e.g. image_generation),
    # so pass allow_builtin_tools=True; validate_tool_types still
    # normalizes and validates every tool entry.
    return validate_tool_types(value, allow_builtin_tools=True)

@model_validator(mode="after")
def _validate_input(self) -> "V1ResponsesRequest":
Expand Down
72 changes: 56 additions & 16 deletions app/modules/proxy/load_balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ async def load_selection_inputs() -> _SelectionInputs:
runtime=self._runtime,
)

result = select_account(
result = _select_account_preferring_budget_safe(
states,
prefer_earlier_reset=prefer_earlier_reset_accounts,
routing_strategy=routing_strategy,
budget_threshold_pct=budget_threshold_pct,
)

selected_account_map = account_map
Expand Down Expand Up @@ -645,10 +646,11 @@ async def _select_with_stickiness(
sticky_repo: StickySessionsRepository | None,
) -> SelectionResult:
if not sticky_key or not sticky_repo:
return select_account(
return _select_account_preferring_budget_safe(
states,
prefer_earlier_reset=prefer_earlier_reset_accounts,
routing_strategy=routing_strategy,
budget_threshold_pct=budget_threshold_pct,
)
if sticky_kind is None:
raise ValueError("sticky_kind is required when sticky_key is provided")
Expand All @@ -670,22 +672,24 @@ async def _select_with_stickiness(
if existing:
pinned = next((state for state in states if state.account_id == existing), None)
if pinned is not None:
# Check if pinned account has insufficient budget (< 5% remaining)
# or rate limit is far away (reset_at more than 10 minutes away)
# Proactively rebind session affinity for prompt-cache and
# codex sessions once the pinned account is already above the
# configured budget threshold. That preserves continuity below
# the threshold while avoiding obvious short-window failures
# once the session is skating on the edge of exhaustion.
now = time.time()
budget_exhausted = (
sticky_kind == StickySessionKind.PROMPT_CACHE
budget_pressured = (
sticky_kind in (StickySessionKind.PROMPT_CACHE, StickySessionKind.CODEX_SESSION)
and pinned.status != AccountStatus.RATE_LIMITED
and pinned.used_percent is not None
and pinned.used_percent > budget_threshold_pct
and _state_above_budget_threshold(pinned, budget_threshold_pct)
)
Comment thread
mhughdo marked this conversation as resolved.
rate_limit_far_away = (
sticky_kind == StickySessionKind.PROMPT_CACHE
and pinned.status == AccountStatus.RATE_LIMITED
and pinned.reset_at is not None
and pinned.reset_at - now >= 600 # 10 minutes
)
if not (budget_exhausted or rate_limit_far_away):
if not (budget_pressured or rate_limit_far_away):
pinned_result = select_account(
[pinned],
prefer_earlier_reset=prefer_earlier_reset_accounts,
Expand All @@ -702,19 +706,17 @@ async def _select_with_stickiness(
# is above the budget threshold, reallocating just
# wastes DB writes and destroys prompt-cache locality
# (thrashing).
if budget_exhausted:
pool_best = select_account(
if budget_pressured:
pool_best = _select_account_preferring_budget_safe(
states,
prefer_earlier_reset=prefer_earlier_reset_accounts,
routing_strategy=routing_strategy,
deterministic_probe=True,
budget_threshold_pct=budget_threshold_pct,
)
pool_also_exhausted = pool_best.account is not None and (
pool_best.account.account_id == pinned.account_id
or (
pool_best.account.used_percent is not None
and pool_best.account.used_percent > budget_threshold_pct
)
or _state_above_budget_threshold(pool_best.account, budget_threshold_pct)
)
if pool_also_exhausted:
pinned_result = select_account(
Expand Down Expand Up @@ -769,10 +771,11 @@ async def _select_with_stickiness(
else:
await sticky_repo.delete(sticky_key, kind=sticky_kind)

chosen = select_account(
chosen = _select_account_preferring_budget_safe(
states,
prefer_earlier_reset=prefer_earlier_reset_accounts,
routing_strategy=routing_strategy,
budget_threshold_pct=budget_threshold_pct,
)
if persist_fallback and chosen.account is not None and chosen.account.account_id in account_map:
await sticky_repo.upsert(sticky_key, chosen.account.account_id, kind=sticky_kind)
Expand Down Expand Up @@ -1295,6 +1298,43 @@ def _additional_usage_is_exhausted(entry: AdditionalUsageHistory) -> bool:
return float(entry.used_percent) >= 100.0


def _state_above_budget_threshold(state: AccountState, budget_threshold_pct: float) -> bool:
return any(
used_percent is not None and used_percent > budget_threshold_pct
for used_percent in (state.used_percent, state.secondary_used_percent)
)


def _select_account_preferring_budget_safe(
    states: Iterable[AccountState],
    *,
    prefer_earlier_reset: bool,
    routing_strategy: RoutingStrategy,
    budget_threshold_pct: float,
    allow_backoff_fallback: bool = True,
    deterministic_probe: bool = False,
) -> SelectionResult:
    """Select an account, preferring candidates still below the budget threshold.

    When the pool mixes budget-safe and budget-pressured accounts, run
    selection over the safe subset first; only if that yields no account
    (or the pool is uniformly safe/pressured) fall back to selecting over
    the full pool with the same strategy.
    """
    all_states = list(states)
    safe_states = [
        candidate
        for candidate in all_states
        if not _state_above_budget_threshold(candidate, budget_threshold_pct)
    ]
    # Shared keyword arguments for both selection passes.
    selection_kwargs = dict(
        prefer_earlier_reset=prefer_earlier_reset,
        routing_strategy=routing_strategy,
        allow_backoff_fallback=allow_backoff_fallback,
        deterministic_probe=deterministic_probe,
    )
    # Only bother with a restricted pass when it actually filters something.
    if safe_states and len(safe_states) < len(all_states):
        safe_pick = select_account(safe_states, **selection_kwargs)
        if safe_pick.account is not None:
            return safe_pick
    return select_account(all_states, **selection_kwargs)


def _is_upstream_circuit_breaker_open() -> bool:
settings = get_settings()
if not getattr(settings, "circuit_breaker_enabled", False):
Expand Down
15 changes: 15 additions & 0 deletions openspec/changes/raise-upstream-event-size-limit/proposal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Proposal: raise-upstream-event-size-limit

## Why

Recent Codex Desktop builds can request built-in tools such as `image_generation`, which may produce large upstream Responses events. The proxy currently caps upstream SSE events and websocket message frames at 2 MiB, which is too low for legitimate image payloads and causes local websocket `1009 message too big` disconnects before `response.completed`.

## What Changes

- Raise the default upstream Responses event/message size limit from 2 MiB to 16 MiB.
- Keep the existing configuration knob (`max_sse_event_bytes`) so operators can still override the limit.

## Impact

- Prevents local `1009` disconnects for large but valid Responses tool outputs.
- Aligns the default limit with the common 16 MiB websocket ceiling already assumed by the proxy's `response.create` budget logic.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## MODIFIED Requirements
### Requirement: Upstream Responses event size budget
The service SHALL allow upstream Responses SSE events and upstream websocket message frames up to 16 MiB by default before treating them as oversized.

#### Scenario: built-in tool output exceeds the old 2 MiB limit
- **WHEN** upstream Responses traffic includes a single SSE event or websocket message frame larger than 2 MiB but not larger than 16 MiB
- **THEN** the proxy continues processing the event instead of closing the upstream websocket locally with `1009 message too big`
8 changes: 8 additions & 0 deletions openspec/changes/raise-upstream-event-size-limit/tasks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
## 1. Implementation

- [x] 1.1 Raise the default upstream Responses event/message size limit to 16 MiB.

## 2. Verification

- [x] 2.1 Add or update settings coverage for the new default.
- [x] 2.2 Run targeted pytest, ruff, and `openspec validate --specs`.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
## Why

Backend Codex routes use durable `codex_session` stickiness when the client sends a `session_id` header. Today that stickiness preserves continuity too aggressively: if the pinned account remains `ACTIVE` but its short-window usage is already above the sticky reallocation threshold, selection still keeps routing the session there until upstream starts returning `usage_limit_reached`.

Separately, fresh selection still lets near-exhausted active accounts compete with budget-safe accounts until a hard failure occurs. In production this surfaces as repeated compact and websocket failures on accounts whose latest local primary-window usage is already in the `97-99%` range, even while other active accounts still have healthy short-window budget.

## What Changes

- extend proactive sticky reallocation to durable backend `codex_session` mappings when the pinned account is above the configured budget threshold and a healthier candidate exists
- prefer budget-safe Responses routing candidates over already-pressured candidates whenever at least one budget-safe candidate exists
- keep existing durable `codex_session` behavior below that threshold
- add regression coverage for backend Codex responses + compact routing with `session_id`

## Impact

- backend Codex sessions may rebind to a different account slightly earlier, before the pinned account hard-fails upstream on short-window budget exhaustion
- fresh Responses requests will prefer accounts that are still below the configured budget threshold instead of spending more attempts on near-exhausted accounts first
- OpenAI `/v1` routes still do not create durable `codex_session` mappings from `session_id`, but they do benefit from the same budget-safe fresh-selection preference
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
## MODIFIED Requirements
### Requirement: Responses routing prefers budget-safe accounts
When serving Responses routes, the service MUST prefer eligible accounts that are still below the configured budget threshold over eligible accounts already above that threshold. If no below-threshold candidate exists, the service MAY fall back to the pressured candidates.

#### Scenario: Fresh Responses request avoids a near-exhausted account
- **WHEN** `/backend-api/codex/responses`, `/backend-api/codex/responses/compact`, `/v1/responses`, or `/v1/responses/compact` selects among multiple eligible active accounts
- **AND** one candidate is above the configured budget threshold
- **AND** another candidate remains below that threshold
- **THEN** the below-threshold candidate is chosen first
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
## MODIFIED Requirements
### Requirement: Sticky sessions are explicitly typed
The system SHALL persist each sticky-session mapping with an explicit kind so durable Codex backend affinity, durable dashboard sticky-thread routing, and bounded prompt-cache affinity can be managed independently.

#### Scenario: Backend Codex session affinity is stored as durable
- **WHEN** a backend Codex request creates or refreshes stickiness from `session_id`
- **THEN** the stored mapping kind is `codex_session`

#### Scenario: Backend Codex session rebinds under budget pressure
- **WHEN** a backend Codex request resolves an existing `codex_session` mapping
- **AND** the pinned account is above the configured sticky reallocation budget threshold
- **AND** another eligible account remains below that threshold
- **THEN** selection rebinds the durable `codex_session` mapping to the healthier account before sending the request upstream
11 changes: 11 additions & 0 deletions openspec/changes/reallocate-codex-session-budget-pressure/tasks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
## 1. Implementation

- [x] 1.1 Update sticky selection so backend `codex_session` mappings reallocate when the pinned account is above the sticky budget threshold and a healthier candidate exists
- [x] 1.2 Preserve existing durable `codex_session` behavior when the pinned account is still below the threshold
- [x] 1.3 Prefer budget-safe Responses routing candidates over pressured candidates when any budget-safe option exists

## 2. Verification

- [x] 2.1 Add integration coverage for backend Codex `session_id` routing that proves reallocation above threshold
- [x] 2.2 Run the affected sticky-session integration tests
- [x] 2.3 Add targeted selection coverage for fresh routing that proves a budget-safe account wins over a pressured one
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-04-17
Loading
Loading