Skip to content

Latest commit

 

History

History
360 lines (274 loc) · 8.82 KB

File metadata and controls

360 lines (274 loc) · 8.82 KB

ApolloSimFuzz API Reference

Fuzzer API

Base Fuzzer (fuzzer/runner_base.py)

class Fuzzer:
    """Base class for all fuzzers."""

    def __init__(self, fuzzer_config: DictConfig)
    def run()                              # Main entry point
    def _run(self, start_time)             # Override in subclass
    def termination_check(start_time) -> bool

    # Scenario execution
    def execute_individual(individuals) -> list
    def execute_evaluate(individuals) -> list

    # Container management
    def start_sandbox_container() -> str    # Returns container name
    def stop_sandbox_container()
    def stop_apollo_containers()

    # Checkpoint
    def load_checkpoint()
    def save_checkpoint()

    # Results
    def record_logbook(gen, pop)

RandomFuzzer (fuzzer/runner_random.py)

@ENGINE_REGISTRY.register("fuzzer.random")
class RandomFuzzer(Fuzzer):

    def random_sample() -> FuzzSeed        # Generate random scenario
    def random_mutation(parent: FuzzSeed) -> FuzzSeed

FuzzSeed (fuzzer/runner_base.py)

@dataclass
class FuzzSeed:
    id: str
    scenario: ScenarioConfig
    oracle_result: dict          # Violation detection results
    feedback_result: dict        # Fitness metric values
    is_expected: bool            # Has target violations
    is_ignored: bool             # Scenario was invalid

    def set_id(id: str)
    def to_dict() -> dict
    def load_from_dict(d: dict) -> FuzzSeed

Oracle API (fuzzer/oracle/)

OracleWrapper

class OracleWrapper:
    def __init__(self, config: DictConfig)
    def evaluate(observations, runtime_results) -> dict
    def register_check(name: str, check_fn: callable)

Return format:

{
    "expected": True,           # Has focus violations
    "ignored": False,           # Scenario was valid
    "violations": {
        "collision": {...},
        "speeding": {...},
        ...
    }
}

Built-in checks:

Name Function Description
collision check_collision() Physics-based collision detection
speeding check_speeding() Speed vs. map limits
wrong_way check_wrong_way() Heading vs. lane direction
red_light check_red_light() Red light compliance
wrong_lane_type check_wrong_lane() Non-driving lane detection
crossing_solid_line check_solid_line() Solid boundary crossing
stuck_at_red_light check_stuck() Extended stop detection

Feedback API (fuzzer/feedback/)

FeedbackWrapper

class FeedbackWrapper:
    def __init__(self, config: DictConfig)
    def evaluate(observations, runtime_results) -> dict
    def register_metric(name: str, metric_fn: callable)

Return format:

{
    "collision_feedback": 0.85,
    "speeding": 0.0,
    "wrong_way": 0.0,
    ...
}

Scenario Space API (fuzzer/scenario_space.py)

ScenarioODDSpace

class ScenarioODDSpace:
    map_region_space: MapRegionSpace
    ego_space: EGOSpace
    npc_vehicle_space: NPCVehicleSpace
    npc_pedestrian_space: NPCPedestrianSpace
    npc_static_space: NPCStaticSpace
    traffic_light_space: TrafficLightSpace

MapRegionSpace

class MapRegionSpace:
    map_name: str                     # e.g., "san_mateo"
    region_points: List[List[float]]  # Polygon defining test region
    forbidden_region_points: List     # No-spawn zones (e.g., junctions)

NPCVehicleSpace

class NPCVehicleSpace:
    num_range: List[int]              # [min, max] NPC count
    route_length_range: List[float]   # [min, max] route length (m)
    trigger_time_range: List[float]   # [min, max] spawn delay (s)
    speed_range: List[float]          # [min, max] speed (m/s)

Scenario Config API (scenario_corpus/openscenario/config.py)

ScenarioConfig

@dataclass
class ScenarioConfig:
    id: str
    map: MapConfig
    ego: ApolloConfig
    npc_vehicles: List[WaypointVehicleConfig]
    npc_walkers: List[WaypointWalkerConfig]
    npc_statics: List[StaticObstacleConfig]
    traffic_lights: List[RuleLightConfig]

ApolloConfig (Ego)

@dataclass
class ApolloConfig:
    id: str = "0"
    model: str = "vehicle.lincoln.mkz"
    route: List[Waypoint]              # Start and end waypoints

WaypointVehicleConfig (NPC)

@dataclass
class WaypointVehicleConfig:
    id: str
    model: str = "vehicle.lincoln.mkz.perfect"
    trigger_time: float                # When to start moving
    target_speed: float                # Desired speed (m/s)
    waypoints: List[Waypoint]          # Route to follow

Scenario Runner API (scenario_runner/)

SandboxOperator

class SandboxOperator:
    def __init__(self, container_name: str)

    # Dot-style RPC access:
    operator.sim.reset()
    operator.sim.create_actor({...})
    operator.sim.start_scenario()
    operator.sim.get_snapshot()
    operator.map.find_lane_id(x, y)
    operator.map.lane.get_all(False, "CITY_DRIVING")

    def connect()
    def close()

ScenarioManager

class ScenarioManager:
    def __init__(self, config, apollo_ctns, sandbox_container_name, scenario_dir)

    def run() -> bool              # Execute scenario
    def stop()                     # Cleanup
    def export_result(dir)         # Save observations + results

SubScenarioManager

class SubScenarioManager:
    """Override to create custom scenario types."""

    def create_actors()            # Create actors in sandbox
    def create_agents()            # Spawn agent processes
    def create_criteria()          # Return list of runtime criteria
    def _get_agent_cls()           # Return agent class
    def _get_agent_config(config)  # Return agent kwargs

Agent API (scenario_elements/agents/)

AgentBase

class AgentBase:
    """Base class for all agents. Runs in a subprocess."""

    def __init__(self, sandbox_container_name, actor_id, start_event, stop_event, ...)
    def run()                      # Main loop (called by multiprocessing)
    def _tick(snapshot) -> dict     # Override: compute next action
    def _apply_control(control)    # Send control to sandbox

Criteria API (scenario_elements/criteria/)

CriteriaBase

class CriteriaBase:
    def __init__(self, name, actor_id, actor_config, terminate_on_failure=False)

    def tick(self, snapshot)        # Override: check for violations
    def terminate() -> bool        # Should scenario stop?

    # Result
    st_detail: dict = {"occurred": False, "details": {}}

Registry API (registry/)

Register a component

from registry import ENGINE_REGISTRY

@ENGINE_REGISTRY.register("fuzzer.my_method")
class MyFuzzer(Fuzzer):
    ...

Available registries

Registry Purpose Example key
ENGINE_REGISTRY Fuzzers fuzzer.random
PUBLISHER_REGISTRY Apollo publishers publisher.chassis
SUBSCRIBER_REGISTRY Apollo subscribers subscriber.control
RUNNER_REGISTRY Scenario runners --
MANAGER_REGISTRY Scenario managers --

Configuration Reference

config.yaml (Hydra)

run_tag: debug                    # Unique run identifier
debug: true                       # Debug logging
resume: true                      # Resume from checkpoint
save_record: true                 # Save scenario recordings
use_dreamview: true               # Enable Apollo Dreamview UI

output_root: results              # Output directory
fuzzer_dir: "fuzzer"              # Fuzzer module directory
apollo_tag: "default"             # Apollo container name prefix
apollo_root: "apollo"             # Path to Apollo source

sandbox_image: 'drivora/sandbox:latest'
sandbox_fps: 20.0                 # Simulation FPS

tester:
  type: 'random'                  # Fuzzer type (registry key)
  time_budget: 4                  # Hours
  config_path: 'fuzzer/configs/s1.yaml'

fuzzer/configs/s1.yaml

feedback:
  skip_start_frame: 0

oracle:
  skip_start_frame: 0
  collision_speed_threshold_ego: 0.01
  collision_speed_threshold_npc: 1.0
  collision_degree: 90.0
  focus_violations:
    - "collision"                  # Which violations determine "expected"

scenario_space:
  map_region_space:
    map_name: "san_mateo"
    region_points: [...]          # Test area polygon
    forbidden_region_points: [...] # No-spawn zones

  ego_space:
    route: [[start_x, start_y], [end_x, end_y]]

  npc_vehicle_space:
    num_range: [5, 5]             # Exact 5 NPCs
    route_length_range: [100.0, 500.0]
    trigger_time_range: [0.0, 5.0]
    speed_range: [0.2, 8.0]

  traffic_light_space:
    pattern_range: ["rule", "force_green"]
    green_duration_range: [5.0, 10.0]
    yellow_duration_range: [2.0, 3.0]
    red_duration_range: [5.0, 10.0]