Production-ready orchestration for OpenAI Agents SDK with Redis-backed task queues, SQLAlchemy activity tracking, and multiprocessing worker pools.
You've built an AI agent. It works great in development. Now you need to run it in production:
Customer Support Automation - A user submits a ticket. Your agent needs to research their account, check previous interactions, and draft a response. This takes 2-3 minutes. You can't block the HTTP request.
Document Processing Pipeline - Users upload contracts for analysis. Each document needs OCR, entity extraction, clause identification, and risk scoring. You need to process dozens concurrently while tracking progress.
Research & Reporting - Your agent researches companies, gathers data from multiple sources, and generates reports. Users need to see "Gathering financials... 40%" not just a spinning loader.
Multi-Agent Workflows - One agent discovers leads, fans out to research each one, then a final agent aggregates results. You need coordination, not chaos.
Running AI agents in production requires:
- Background execution - Agents take minutes; users shouldn't wait
- Progress tracking - Know what your agents are doing in real-time
- Fault tolerance - Handle failures gracefully with full error traces
- Scalability - Process multiple tasks across worker processes
- Observability - Complete audit trail of agent activities
- User interfaces - Components to build status dashboards and CLI monitors
agentexec provides all of this out of the box.
uv add agentexecRequirements:
- Python 3.12+
- Redis
- SQLAlchemy-compatible database (PostgreSQL, MySQL, SQLite)
A typical agentexec application has a few files working together. Here's a complete working example showing each part:
# db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
import agentexec as ax
ax.CONF.redis_url = "redis://localhost:6379/0"
engine = create_engine("sqlite:///agents.db")
ax.Base.metadata.create_all(engine) # Creates activity tracking tables
def get_db():
with Session(engine) as db:
yield db# worker.py
from uuid import UUID
from pydantic import BaseModel
from agents import Agent
import agentexec as ax
from .db import engine
class ResearchContext(BaseModel):
company: str
pool = ax.Pool(engine=engine)
@pool.task("research_company")
async def research_company(agent_id: UUID, context: ResearchContext) -> str:
runner = ax.OpenAIRunner(agent_id)
agent = Agent(
name="Research Agent",
instructions=f"Research {context.company}. {runner.prompts.report_status}",
tools=[runner.tools.report_status], # Agent can report its own progress
model="gpt-4o",
)
result = await runner.run(agent, input="Begin research")
return result.final_output
if __name__ == "__main__":
pool.run()# views.py
from uuid import UUID
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
import agentexec as ax
from .worker import ResearchContext
from .db import get_db
router = APIRouter()
@router.post("/research")
async def start_research(company: str) -> dict:
task = await ax.enqueue("research_company", ResearchContext(company=company))
return {"agent_id": str(task.agent_id), "status": "queued"} # Return agent_id for status polling
@router.get("/research/{agent_id}")
async def get_status(agent_id: UUID) -> ax.activity.ActivityDetailSchema:
return await ax.activity.detail(agent_id=agent_id)python worker.pyThat's it. Tasks are queued to Redis, workers process them in parallel, progress is tracked in your database, and your API stays responsive.
Attach arbitrary metadata when enqueueing tasks for filtering and tenant isolation:
task = await ax.enqueue(
"process_document",
context,
metadata={"organization_id": "org-123", "user_id": "user-456"}
)
# Filter activities by metadata
activities = await ax.activity.list(metadata_filter={"organization_id": "org-123"})
detail = await ax.activity.detail(agent_id=agent_id, metadata_filter={"organization_id": "org-123"})
# Access metadata programmatically (excluded from API serialization by default)
org_id = detail.metadata["organization_id"]Every task gets full lifecycle tracking without manual updates:
runner = ax.OpenAIRunner(agent_id=agent_id)
result = await runner.run(agent, input="...")
# Activity automatically transitions:
# QUEUED → RUNNING → COMPLETE (or ERROR on failure)Agents can report their own progress:
agent = Agent(
instructions=f"Do research. {runner.prompts.report_status}",
tools=[runner.tools.report_status],
)
# Agent calls: report_status("Analyzing financials", 60)Update progress explicitly from your task:
await ax.activity.update(agent_id, "Processing batch 3 of 10", percentage=30)When multiple tasks of the same type are queued for the same user, they may need to run sequentially because each task reads and writes shared state. Use lock_key to ensure only one task with the same evaluated key runs at a time:
@pool.task("associate_observation", lock_key="user:{user_id}")
async def associate(agent_id: UUID, context: ObservationContext):
...
# Or with add_task()
pool.add_task("associate_observation", handler, lock_key="user:{user_id}")The lock_key is a string template evaluated against the task context fields. Tasks with the same evaluated lock key are routed to a dedicated partition queue ({prefix}:{lock_key}) where they execute one at a time. Workers skip locked partitions and move on to the next available one — no requeuing, no wasted cycles.
The lock is released automatically when a task completes or errors. The lock TTL (AGENTEXEC_LOCK_TTL, default 1800s) is a safety net for worker process death (OOM, SIGKILL) — under normal operation, locks are always explicitly released. Set this higher than your longest expected task duration.
Run tasks on a recurring interval using cron expressions:
# Decorator — registers the task and schedules it in one step
@pool.schedule("refresh_cache", "*/5 * * * *")
async def refresh(agent_id: UUID, context: RefreshContext):
...
# With context and repeat limit
@pool.schedule("sync_users", "0 * * * *", context=SyncContext(full=True), repeat=3)
async def sync(agent_id: UUID, context: SyncContext):
...For tasks registered separately, use pool.add_schedule():
pool.add_schedule("refresh_cache", "*/5 * * * *", RefreshContext(scope="all"))
pool.add_schedule("refresh_cache", "0 * * * *", RefreshContext(scope="users"), repeat=3)The scheduler runs automatically inside pool.run(). Cron expressions are evaluated in the configured timezone (AGENTEXEC_SCHEDULER_TIMEZONE, default UTC) so schedules read naturally regardless of server timezone. Next-run times are computed from the intended anchor time, not wall clock, to prevent cumulative drift.
Control task execution order:
await ax.enqueue("urgent_task", context, priority=ax.Priority.HIGH) # Front of queue
await ax.enqueue("batch_job", context, priority=ax.Priority.LOW) # Back of queueGracefully handle conversation limits:
runner = ax.OpenAIRunner(
agent_id=agent_id,
max_turns_recovery=True,
wrap_up_prompt="Please summarize your findings.",
)
result = await runner.run(agent, max_turns=15)
# If agent hits max turns, automatically continues with wrap-upOrchestrate complex workflows with parallel execution:
import asyncio
pipeline = ax.Pipeline(pool)
class ResearchPipeline(pipeline.Base):
@pipeline.step(0, "parallel research")
async def gather_data(self, context: InputContext) -> tuple[BrandResult, MarketResult]:
return await asyncio.gather(
research_brand(context),
research_market(context),
)
@pipeline.step(1, "analysis")
async def analyze(self, brand: BrandResult, market: MarketResult) -> FinalReport:
return await analyze_results(brand, market)
# Queue pipeline
task = await pipeline.enqueue(context=InputContext(company="Anthropic"))Coordinate dynamically-queued tasks:
tracker = ax.Tracker("research", batch_id)
@function_tool
async def queue_research(company: str) -> None:
"""Discovery agent calls this for each company found."""
tracker.incr()
await ax.enqueue("research", ResearchContext(company=company, batch_id=batch_id))
@function_tool
async def save_result(result: ResearchResult) -> None:
"""Research agent calls this when done."""
save_to_database(result)
tracker.decr()
if tracker.complete:
await ax.enqueue("aggregate", AggregateContext(batch_id=batch_id))If you have an existing FastAPI/Flask/Django backend, run the worker pool in a separate process:
# main.py - Your API server
from fastapi import FastAPI
import agentexec as ax
app = FastAPI()
@app.post("/process")
async def process(data: str) -> dict:
task = await ax.enqueue("process_data", ProcessContext(data=data))
return {"agent_id": task.agent_id}# worker.py - Run separately
from .tasks import pool
if __name__ == "__main__":
pool.run()Terminal 1: Start your API server
uvicorn main:appTerminal 2: Start the workers
python worker.py# worker.py
import os
from uuid import UUID
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
import agentexec as ax
engine = create_engine(os.environ["DATABASE_URL"])
pool = ax.Pool(engine=engine)
@pool.task("my_task")
async def my_task(agent_id: UUID, context: MyContext) -> None:
# Your task implementation
pass
if __name__ == "__main__":
try:
pool.run()
except KeyboardInterrupt:
asyncio.run(ax.activity.cancel_pending())1. Create your worker Dockerfile:
# Dockerfile.worker
FROM ghcr.io/agent-ci/agentexec-worker:latest
COPY ./src /app/src
ENV AGENTEXEC_WORKER_MODULE=src.worker2. Create your worker module:
# src/worker.py
import atexit
import os
from uuid import UUID
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
import agentexec as ax
engine = create_engine(os.environ["DATABASE_URL"])
pool = ax.Pool(engine=engine)
async def cleanup() -> None:
await ax.activity.cancel_pending()
atexit.register(lambda: asyncio.run(cleanup()))
@pool.task("my_task")
async def my_task(agent_id: UUID, context: MyContext) -> None:
pass3. Build and run:
docker build -f Dockerfile.worker -t my-worker .docker run -e DATABASE_URL=... -e REDIS_URL=... -e OPENAI_API_KEY=... my-workeragentexec uses Redis for task queuing, result storage, and coordination between workers. The queue uses a partitioned design where tasks with a lock_key go to dedicated partition queues ({prefix}:{lock_key}) and are serialized by a lock, while tasks without a lock key go to the default queue for concurrent processing.
Workers dequeue using Redis SCAN, which iterates keys in hash-table order — effectively random. This provides fair distribution across partitions without explicit round-robin. See examples/queue-fairness/ for benchmarks showing uniform distribution at 1000+ partitions.
AWS Compatible: Standard Redis features only — AWS ElastiCache works out of the box.
AGENTEXEC_REDIS_URL=redis://localhost:6379/0
# or
AGENTEXEC_REDIS_URL=redis://my-cluster.abc123.use1.cache.amazonaws.com:6379Kafka can be used as an alternative backend for task queuing and schedule storage. Activity tracking always uses PostgreSQL regardless of backend — Kafka is not a KV store, so state operations (get/set, counters) are not supported and will raise NotImplementedError.
pip install agentexec[kafka]
AGENTEXEC_STATE_BACKEND=agentexec.state.kafka
KAFKA_BOOTSTRAP_SERVERS=localhost:9092Kafka uses consumer groups for work distribution instead of Redis's scan-based dequeue. Topics are auto-created on first use. Schedule storage uses a compacted topic that is replayed on each poll.
When to consider Kafka:
- You already run Kafka and want to avoid adding Redis
- You need durable, replayable task queues with built-in replication
- You want partition-level ordering guarantees (tasks with the same key go to the same partition)
Limitations:
- No KV state —
backend.state.get/set/deleteand counters raiseNotImplementedError - No partition-level locking (Kafka partition assignment handles isolation instead)
- Schedule
get_due()replays the entire compacted topic on every poll lock_keyis used as a Kafka partition key (routing), not as a mutex
See Kafka configuration below for all available settings.
The state backend is pluggable. Implement BaseBackend with state, queue, and schedule sub-backends:
AGENTEXEC_STATE_BACKEND=agentexec.state.redis # Default
AGENTEXEC_STATE_BACKEND=agentexec.state.kafka # Experimental
AGENTEXEC_STATE_BACKEND=myapp.state.custom # Custom (must export Backend class)Activity tracking uses SQLAlchemy with two tables (always PostgreSQL/SQLite, independent of the state backend):
agentexec_activity - Main activity records
agent_id- Unique identifier (UUID)agent_type- Task namemetadata- JSON field for custom data (e.g., tenant info)created_at,updated_at- Timestamps
agentexec_activity_log - Status and progress
activity_id- Foreign keymessage- Log messagestatus- QUEUED, RUNNING, COMPLETE, ERROR, CANCELEDpercentage- Progress (0-100)
The activity tracking module exposes Pydantic schemas for building APIs:
from agentexec.activity.schemas import (
ActivityListSchema, # Paginated list response
ActivityListItemSchema, # Single item in list (lightweight)
ActivityDetailSchema, # Full activity with log history
ActivityLogSchema, # Single log entry
)List activities:
result = await ax.activity.list(page=1, page_size=20)
# Returns ActivityListSchema:
# {
# "items": [...], # List of ActivityListItemSchema
# "total": 150,
# "page": 1,
# "page_size": 20,
# "total_pages": 8
# }Get activity detail:
activity = await ax.activity.detail(agent_id=agent_id)
# Returns ActivityDetailSchema:
# {
# "agent_id": "...",
# "agent_type": "research_company",
# "created_at": "2024-01-15T10:30:00Z",
# "updated_at": "2024-01-15T10:32:45Z",
# "logs": [
# {"status": "queued", "message": "Waiting to start", "percentage": 0, ...},
# {"status": "running", "message": "Gathering data", "percentage": 30, ...},
# {"status": "complete", "message": "Done", "percentage": 100, ...}
# ]
# }Count active agents:
count = await ax.activity.count_active()
# Returns number of agents with status QUEUED or RUNNING# cli_monitor.py
from rich.live import Live
from rich.table import Table
from sqlalchemy import Engine
from sqlalchemy.orm import Session
import agentexec as ax
def build_table(db: Session) -> Table:
count = asyncio.run(ax.activity.count_active())
table = Table(title=f"Active Agents: {count}")
table.add_column("Status")
table.add_column("Task")
table.add_column("Message")
table.add_column("Progress")
activities = asyncio.run(ax.activity.list(page=1, page_size=10))
for item in activities.items:
table.add_row(
item.status,
item.agent_type,
item.latest_log_message or "",
f"{item.percentage}%",
)
return table
def monitor(engine: Engine) -> None:
with Live(refresh_per_second=1) as live:
while True:
with Session(engine) as db:
live.update(build_table(db))
if __name__ == "__main__":
from .db import engine
monitor(engine)The agentexec-ui package provides React components for building monitoring interfaces:
npm install agentexec-uiimport {
TaskList,
TaskDetail,
ActiveAgentsBadge,
StatusBadge,
ProgressBar,
} from 'agentexec-ui';
// Display paginated task list
<TaskList
items={activities.items}
loading={isLoading}
onTaskClick={(agentId) => setSelected(agentId)}
selectedAgentId={selectedId}
/>
// Full activity detail view
<TaskDetail
activity={activityDetail}
loading={isDetailLoading}
error={error}
onClose={() => setSelected(null)}
/>
// Active count badge
<ActiveAgentsBadge count={activeCount} loading={isLoading} />
// Individual status indicators
<StatusBadge status="running" />
<ProgressBar percentage={65} status="running" />import type {
Status, // 'queued' | 'running' | 'complete' | 'error' | 'canceled'
ActivityLog,
ActivityDetail,
ActivityListItem,
ActivityList,
} from 'agentexec-ui';These types mirror the Python API schemas (ActivityDetailSchema, ActivityListSchema, etc.), so your API responses integrate directly with the components.
The components are headless (no built-in styling) and work with any CSS framework. See examples/openai-agents-fastapi/ui/ for a complete React app with TanStack Query integration.
import agentexec as ax
task = await ax.enqueue(task_name, context, priority=ax.Priority.LOW)
result = await ax.get_result(task, timeout=300)
results = await ax.gather(task1, task2, task3)import agentexec as ax
pool = ax.Pool(engine=engine)
pool = ax.Pool(database_url="postgresql://...")
@pool.task("name")
async def handler(agent_id: UUID, context: MyContext) -> None: ...
@pool.task("name", lock_key="user:{user_id}") # Sequential per user
async def locked(agent_id: UUID, context: MyContext) -> None: ...
@pool.schedule("name", "*/5 * * * *") # Register + schedule in one step
async def scheduled(agent_id: UUID, context: MyContext) -> None: ...
pool.add_schedule("name", "0 * * * *", MyContext(), repeat=3) # Schedule separately
pool.run() # Blocking - runs workers + scheduler + retry handling
pool.start() # Non-blocking - starts workers in background
pool.shutdown() # Graceful shutdownimport agentexec as ax
# Create activity (returns agent_id for tracking)
agent_id = await ax.activity.create(task_name, message="Starting...")
# Update progress
await ax.activity.update(agent_id, message, percentage=50)
await ax.activity.complete(agent_id, message="Done")
await ax.activity.error(agent_id, message="Failed: ...")
# Query activities (uses database session)
activities = await ax.activity.list(page=1, page_size=20)
activity = await ax.activity.detail(agent_id=agent_id)
count = await ax.activity.count_active()
# Cleanup
canceled = await ax.activity.cancel_pending()import agentexec as ax
runner = ax.OpenAIRunner(
agent_id=agent_id,
max_turns_recovery=True,
wrap_up_prompt="Summarize...",
)
runner.prompts.report_status # Instruction text for agents
runner.tools.report_status # Pre-bound function tool
result = await runner.run(agent, input="...", max_turns=15)
result = await runner.run_streamed(agent, input="...", max_turns=15)
# Base class for custom runners
class MyRunner(ax.BaseAgentRunner):
async def run(self, agent: Agent, input: str) -> RunResult: ...import agentexec as ax
pipeline = ax.Pipeline(pool)
class MyPipeline(pipeline.Base):
@pipeline.step(0, "description")
async def step_one(self, context): ...import agentexec as ax
tracker = ax.Tracker("name", batch_id)
tracker.incr()
if tracker.complete: ... # All tasks doneimport agentexec as ax
ax.Base # SQLAlchemy declarative base for activity tablesAll settings via environment variables:
# Redis
AGENTEXEC_REDIS_URL=redis://localhost:6379/0 # Also accepts REDIS_URL
AGENTEXEC_REDIS_POOL_SIZE=10
AGENTEXEC_REDIS_POOL_TIMEOUT=5
# Workers
AGENTEXEC_NUM_WORKERS=4
AGENTEXEC_QUEUE_PREFIX=agentexec_tasks # Also accepts AGENTEXEC_QUEUE_NAME
AGENTEXEC_GRACEFUL_SHUTDOWN_TIMEOUT=300
AGENTEXEC_MAX_TASK_RETRIES=3 # 0 to disable retries
# Database
AGENTEXEC_TABLE_PREFIX=agentexec_
# Results
AGENTEXEC_RESULT_TTL=3600
# Task locking (Redis backend only)
AGENTEXEC_LOCK_TTL=1800
# Scheduling
AGENTEXEC_SCHEDULER_TIMEZONE=UTC
AGENTEXEC_SCHEDULER_POLL_INTERVAL=10
# State backend
AGENTEXEC_STATE_BACKEND=agentexec.state.redis # or agentexec.state.kafka
AGENTEXEC_KEY_PREFIX=agentexec
# Activity messages (customizable)
AGENTEXEC_ACTIVITY_MESSAGE_CREATE="Waiting to start."
AGENTEXEC_ACTIVITY_MESSAGE_STARTED="Task started."
AGENTEXEC_ACTIVITY_MESSAGE_COMPLETE="Task completed successfully."
AGENTEXEC_ACTIVITY_MESSAGE_ERROR="Task failed with error: {error}"These settings only apply when using the Kafka state backend (AGENTEXEC_STATE_BACKEND=agentexec.state.kafka):
KAFKA_BOOTSTRAP_SERVERS=localhost:9092 # Also accepts AGENTEXEC_KAFKA_BOOTSTRAP_SERVERS
AGENTEXEC_KAFKA_DEFAULT_PARTITIONS=6 # Partitions for auto-created topics
AGENTEXEC_KAFKA_REPLICATION_FACTOR=1 # Replication factor for auto-created topics
AGENTEXEC_KAFKA_MAX_BATCH_SIZE=16384 # Producer max batch size (bytes)
AGENTEXEC_KAFKA_LINGER_MS=5 # Producer linger time (ms)
AGENTEXEC_KAFKA_RETENTION_MS=-1 # Retention for compacted topics (-1 = forever)For single-node development, set KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 on your broker or consumer groups will hang.
# Clone repository
git clone https://github.com/Agent-CI/agentexec
cd agentexec
# Install dependencies
uv sync
# Run tests
uv run pytest
# Type checking
uv run ty check
# Linting
uv run ruff check src/
# Formatting
uv run ruff format src/- Fork the repository
- Create a feature branch
- Make your changes with tests
- Run
uv run pytestanduv run ty check - Submit a pull request
MIT License - see LICENSE for details.
- PyPI: agentexec
- npm: agentexec-ui
- Documentation: docs/
- Example App: examples/openai-agents-fastapi/
- Multi-Tenancy Example: examples/multi-tenancy/
- Queue Fairness Benchmark: examples/queue-fairness/
- Issues: GitHub Issues