Back to Repositories

Testing Simulation Bridge Integration in OpenPilot

This test suite validates the simulation bridge functionality in OpenPilot’s testing infrastructure, focusing on driving simulation and process management. It ensures reliable communication between OpenPilot and various simulation environments while verifying control engagement and system stability.

Test Coverage Overview

The test suite provides comprehensive coverage of the simulation bridge functionality, focusing on process initialization, driving state management, and system monitoring.

  • Tests startup sequence and process management
  • Validates driving engagement and control states
  • Monitors system events and failure conditions
  • Verifies communication between OpenPilot and simulation environment

Implementation Analysis

The implementation uses a robust pytest-based approach with multiprocessing for handling concurrent operations.

Key patterns include:
  • Process management through subprocess module
  • Inter-process communication via Queue
  • State monitoring using messaging system
  • Graceful teardown and cleanup procedures

Technical Details

Testing infrastructure leverages:

  • pytest framework for test organization
  • Python multiprocessing for parallel execution
  • Custom messaging system for state management
  • Subprocess management for simulation control
  • Time-based assertion checks

Best Practices Demonstrated

The test implementation showcases several testing best practices and quality patterns.

  • Proper setup and teardown procedures
  • Robust error handling and assertion messages
  • Clear separation of concerns between test phases
  • Comprehensive state verification
  • Timeout management for long-running operations

commaai/openpilot

tools/sim/tests/test_sim_bridge.py

            
import os
import subprocess
import time
import pytest

from multiprocessing import Queue

from cereal import messaging
from openpilot.common.basedir import BASEDIR
from openpilot.tools.sim.bridge.common import QueueMessageType

SIM_DIR = os.path.join(BASEDIR, "tools/sim")

class TestSimBridgeBase:
  @classmethod
  def setup_class(cls):
    if cls is TestSimBridgeBase:
      raise pytest.skip("Don't run this base class, run test_metadrive_bridge.py instead")

  def setup_method(self):
    self.processes = []

  def test_driving(self):
    # Startup manager and bridge.py. Check processes are running, then engage and verify.
    p_manager = subprocess.Popen("./launch_openpilot.sh", cwd=SIM_DIR)
    self.processes.append(p_manager)

    sm = messaging.SubMaster(['selfdriveState', 'onroadEvents', 'managerState'])
    q = Queue()
    bridge = self.create_bridge()
    p_bridge = bridge.run(q, retries=10)
    self.processes.append(p_bridge)

    max_time_per_step = 60

    # Wait for bridge to startup
    start_waiting = time.monotonic()
    while not bridge.started.value and time.monotonic() < start_waiting + max_time_per_step:
      time.sleep(0.1)
    assert p_bridge.exitcode is None, f"Bridge process should be running, but exited with code {p_bridge.exitcode}"

    start_time = time.monotonic()
    no_car_events_issues_once = False
    car_event_issues = []
    not_running = []
    while time.monotonic() < start_time + max_time_per_step:
      sm.update()

      not_running = [p.name for p in sm['managerState'].processes if not p.running and p.shouldBeRunning]
      car_event_issues = [event.name for event in sm['onroadEvents'] if any([event.noEntry, event.softDisable, event.immediateDisable])]

      if sm.all_alive() and len(car_event_issues) == 0 and len(not_running) == 0:
        no_car_events_issues_once = True
        break

    assert no_car_events_issues_once, \
                    f"Failed because no messages received, or CarEvents '{car_event_issues}' or processes not running '{not_running}'"

    start_time = time.monotonic()
    min_counts_control_active = 100
    control_active = 0

    while time.monotonic() < start_time + max_time_per_step:
      sm.update()

      if sm.all_alive() and sm['selfdriveState'].active:
        control_active += 1

        if control_active == min_counts_control_active:
          break

    assert min_counts_control_active == control_active, f"Simulator did not engage a minimal of {min_counts_control_active} steps was {control_active}"

    failure_states = []
    while bridge.started.value:
      continue

    while not q.empty():
      state = q.get()
      if state.type == QueueMessageType.TERMINATION_INFO:
        done_info = state.info
        failure_states = [done_state for done_state in done_info if done_state != "timeout" and done_info[done_state]]
        break
    assert len(failure_states) == 0, f"Simulator fails to finish a loop. Failure states: {failure_states}"

  def teardown_method(self):
    print("Test shutting down. CommIssues are acceptable")
    for p in reversed(self.processes):
      p.terminate()

    for p in reversed(self.processes):
      p.kill()