class Fuzzer:
"""Base class for all fuzzers."""
def __init__(self, fuzzer_config: DictConfig)
def run() # Main entry point
def _run(self, start_time) # Override in subclass
def termination_check(start_time) -> bool
# Scenario execution
def execute_individual(individuals) -> list
def execute_evaluate(individuals) -> list
# Container management
def start_sandbox_container() -> str # Returns container name
def stop_sandbox_container()
def stop_apollo_containers()
# Checkpoint
def load_checkpoint()
def save_checkpoint()
# Results
def record_logbook(gen, pop)

@ENGINE_REGISTRY.register("fuzzer.random")
class RandomFuzzer(Fuzzer):
def random_sample() -> FuzzSeed # Generate random scenario
def random_mutation(parent: FuzzSeed) -> FuzzSeed

@dataclass
class FuzzSeed:
id: str
scenario: ScenarioConfig
oracle_result: dict # Violation detection results
feedback_result: dict # Fitness metric values
is_expected: bool # Has target violations
is_ignored: bool # Scenario was invalid
def set_id(id: str)
def to_dict() -> dict
def load_from_dict(d: dict) -> FuzzSeed

class OracleWrapper:
def __init__(self, config: DictConfig)
def evaluate(observations, runtime_results) -> dict
def register_check(name: str, check_fn: callable)

Return format:
{
"expected": True, # Has focus violations
"ignored": False, # Scenario was valid
"violations": {
"collision": {...},
"speeding": {...},
...
}
}

Built-in checks:

| Name | Function | Description |
|---|---|---|
| collision | check_collision() | Physics-based collision detection |
| speeding | check_speeding() | Speed vs. map limits |
| wrong_way | check_wrong_way() | Heading vs. lane direction |
| red_light | check_red_light() | Red light compliance |
| wrong_lane_type | check_wrong_lane() | Non-driving lane detection |
| crossing_solid_line | check_solid_line() | Solid boundary crossing |
| stuck_at_red_light | check_stuck() | Extended stop detection |

class FeedbackWrapper:
def __init__(self, config: DictConfig)
def evaluate(observations, runtime_results) -> dict
def register_metric(name: str, metric_fn: callable)

Return format:
{
"collision_feedback": 0.85,
"speeding": 0.0,
"wrong_way": 0.0,
...
}

class ScenarioODDSpace:
map_region_space: MapRegionSpace
ego_space: EGOSpace
npc_vehicle_space: NPCVehicleSpace
npc_pedestrian_space: NPCPedestrianSpace
npc_static_space: NPCStaticSpace
traffic_light_space: TrafficLightSpace

class MapRegionSpace:
map_name: str # e.g., "san_mateo"
region_points: List[List[float]] # Polygon defining test region
forbidden_region_points: List # No-spawn zones (e.g., junctions)

class NPCVehicleSpace:
num_range: List[int] # [min, max] NPC count
route_length_range: List[float] # [min, max] route length (m)
trigger_time_range: List[float] # [min, max] spawn delay (s)
speed_range: List[float] # [min, max] speed (m/s)

@dataclass
class ScenarioConfig:
id: str
map: MapConfig
ego: ApolloConfig
npc_vehicles: List[WaypointVehicleConfig]
npc_walkers: List[WaypointWalkerConfig]
npc_statics: List[StaticObstacleConfig]
traffic_lights: List[RuleLightConfig]

@dataclass
class ApolloConfig:
id: str = "0"
model: str = "vehicle.lincoln.mkz"
route: List[Waypoint] # Start and end waypoints

@dataclass
class WaypointVehicleConfig:
id: str
model: str = "vehicle.lincoln.mkz.perfect"
trigger_time: float # When to start moving
target_speed: float # Desired speed (m/s)
waypoints: List[Waypoint] # Route to follow

class SandboxOperator:
def __init__(self, container_name: str)
# Dot-style RPC access:
operator.sim.reset()
operator.sim.create_actor({...})
operator.sim.start_scenario()
operator.sim.get_snapshot()
operator.map.find_lane_id(x, y)
operator.map.lane.get_all(False, "CITY_DRIVING")
def connect()
def close()

class ScenarioManager:
def __init__(self, config, apollo_ctns, sandbox_container_name, scenario_dir)
def run() -> bool # Execute scenario
def stop() # Cleanup
def export_result(dir) # Save observations + results

class SubScenarioManager:
"""Override to create custom scenario types."""
def create_actors() # Create actors in sandbox
def create_agents() # Spawn agent processes
def create_criteria() # Return list of runtime criteria
def _get_agent_cls() # Return agent class
def _get_agent_config(config) # Return agent kwargs

class AgentBase:
"""Base class for all agents. Runs in a subprocess."""
def __init__(self, sandbox_container_name, actor_id, start_event, stop_event, ...)
def run() # Main loop (called by multiprocessing)
def _tick(snapshot) -> dict # Override: compute next action
def _apply_control(control) # Send control to sandbox

class CriteriaBase:
def __init__(self, name, actor_id, actor_config, terminate_on_failure=False)
def tick(self, snapshot) # Override: check for violations
def terminate() -> bool # Should scenario stop?
# Result
st_detail: dict = {"occurred": False, "details": {}}

from registry import ENGINE_REGISTRY
@ENGINE_REGISTRY.register("fuzzer.my_method")
class MyFuzzer(Fuzzer):
...

| Registry | Purpose | Example key |
|---|---|---|
| ENGINE_REGISTRY | Fuzzers | fuzzer.random |
| PUBLISHER_REGISTRY | Apollo publishers | publisher.chassis |
| SUBSCRIBER_REGISTRY | Apollo subscribers | subscriber.control |
| RUNNER_REGISTRY | Scenario runners | -- |
| MANAGER_REGISTRY | Scenario managers | -- |

run_tag: debug # Unique run identifier
debug: true # Debug logging
resume: true # Resume from checkpoint
save_record: true # Save scenario recordings
use_dreamview: true # Enable Apollo Dreamview UI
output_root: results # Output directory
fuzzer_dir: "fuzzer" # Fuzzer module directory
apollo_tag: "default" # Apollo container name prefix
apollo_root: "apollo" # Path to Apollo source
sandbox_image: 'drivora/sandbox:latest'
sandbox_fps: 20.0 # Simulation FPS
tester:
  type: 'random' # Fuzzer type (registry key)
  time_budget: 4 # Hours
  config_path: 'fuzzer/configs/s1.yaml'

feedback:
  skip_start_frame: 0
oracle:
  skip_start_frame: 0
  collision_speed_threshold_ego: 0.01
  collision_speed_threshold_npc: 1.0
  collision_degree: 90.0
  focus_violations:
    - "collision" # Which violations determine "expected"
scenario_space:
  map_region_space:
    map_name: "san_mateo"
    region_points: [...] # Test area polygon
    forbidden_region_points: [...] # No-spawn zones
  ego_space:
    route: [[start_x, start_y], [end_x, end_y]]
  npc_vehicle_space:
    num_range: [5, 5] # Exactly 5 NPCs
    route_length_range: [100.0, 500.0]
    trigger_time_range: [0.0, 5.0]
    speed_range: [0.2, 8.0]
  traffic_light_space:
    pattern_range: ["rule", "force_green"]
    green_duration_range: [5.0, 10.0]
    yellow_duration_range: [2.0, 3.0]
    red_duration_range: [5.0, 10.0]