Back to Repositories

Testing Camera Frame Synchronization System in openpilot

A comprehensive test suite for the camerad module in openpilot, focusing on camera synchronization and frame capture verification. This test suite ensures reliable operation of multiple camera sensors by validating frame timing, sync status, and capture consistency.

Test Coverage Overview

The test suite provides thorough coverage of the camerad module’s core functionality:

Key areas tested include:
  • Frame capture consistency across multiple cameras
  • Frame timing and synchronization verification
  • Camera sensor type validation
  • Frame skip detection
Edge cases covered include timing tolerances for different sensor types (AR0231 and OX03C10) and handling of frame synchronization anomalies.

Implementation Analysis

The testing approach utilizes pytest fixtures and custom retry logic to ensure reliable test execution. The implementation employs messaging-based communication to monitor camera outputs and sophisticated timing analysis using numpy for statistical calculations.

Notable patterns include:
  • Automated camera process management
  • Message subscription and monitoring
  • Frame timing analysis with configurable tolerances
  • Data collection over specified time windows

Technical Details

Testing infrastructure includes:
  • pytest framework for test organization
  • cereal messaging system for IPC
  • numpy for numerical analysis
  • Custom retry decorator for stability
  • Process management utilities
  • Configurable timing tolerances per sensor type
  • Frame synchronization validation logic

Best Practices Demonstrated

The test suite exemplifies several testing best practices:

  • Separation of setup and test logic
  • Comprehensive error reporting with detailed diagnostics
  • Configurable tolerances for different hardware
  • Robust retry mechanism for flaky conditions
  • Clear test organization and modularity
  • Hardware-specific test marking (@pytest.mark.tici)

commaai/openpilot

system/camerad/test/test_camerad.py

            
import pytest
import time
import numpy as np
from collections import defaultdict

import cereal.messaging as messaging
from cereal import log
from cereal.services import SERVICE_LIST
from openpilot.common.retry import retry
from openpilot.system.manager.process_config import managed_processes

TEST_TIMESPAN = 10
LAG_FRAME_TOLERANCE = {log.FrameData.ImageSensor.ar0231: 0.5,  # ARs use synced pulses for frame starts
                       log.FrameData.ImageSensor.ox03c10: 1.1} # OXs react to out-of-sync at next frame
FRAME_DELTA_TOLERANCE = {log.FrameData.ImageSensor.ar0231: 1.0,
                       log.FrameData.ImageSensor.ox03c10: 1.0}

CAMERAS = ('roadCameraState', 'driverCameraState', 'wideRoadCameraState')

@retry(attempts=3, delay=5.0)
def setup_camerad(cls):
  # run camerad and record logs
  managed_processes['camerad'].start()
  time.sleep(3)
  socks = {c: messaging.sub_sock(c, conflate=False, timeout=100) for c in CAMERAS}

  cls.logs = defaultdict(list)
  start_time = time.monotonic()
  while time.monotonic()- start_time < TEST_TIMESPAN:
    for cam, s in socks.items():
      cls.logs[cam] += messaging.drain_sock(s)
    time.sleep(0.2)
  managed_processes['camerad'].stop()

  cls.log_by_frame_id = defaultdict(list)
  cls.sensor_type = None
  for cam, msgs in cls.logs.items():
    if cls.sensor_type is None:
      cls.sensor_type = getattr(msgs[0], msgs[0].which()).sensor.raw
    expected_frames = SERVICE_LIST[cam].frequency * TEST_TIMESPAN
    assert expected_frames*0.95 < len(msgs) < expected_frames*1.05, f"unexpected frame count {cam}: {expected_frames=}, got {len(msgs)}"

    dts = np.abs(np.diff([getattr(m, m.which()).timestampSof/1e6 for m in msgs]) - 1000/SERVICE_LIST[cam].frequency)
    assert (dts < FRAME_DELTA_TOLERANCE[cls.sensor_type]).all(), f"{cam} dts(ms) out of spec: max diff {dts.max()}, 99 percentile {np.percentile(dts, 99)}"

    for m in msgs:
      cls.log_by_frame_id[getattr(m, m.which()).frameId].append(m)

  # strip beginning and end
  for _ in range(3):
    mn, mx = min(cls.log_by_frame_id.keys()), max(cls.log_by_frame_id.keys())
    del cls.log_by_frame_id[mn]
    del cls.log_by_frame_id[mx]

@pytest.mark.tici
class TestCamerad:
  @classmethod
  def setup_class(cls):
    setup_camerad(cls)

  def test_frame_skips(self):
    skips = {}
    frame_ids = self.log_by_frame_id.keys()
    for frame_id in range(min(frame_ids), max(frame_ids)):
      seen_cams = [msg.which() for msg in self.log_by_frame_id[frame_id]]
      skip_cams = set(CAMERAS) - set(seen_cams)
      if len(skip_cams):
        skips[frame_id] = skip_cams
    assert len(skips) == 0, f"Found frame skips, missing cameras for the following frames: {skips}"

  def test_frame_sync(self):
    frame_times = {frame_id: [getattr(m, m.which()).timestampSof for m in msgs] for frame_id, msgs in self.log_by_frame_id.items()}
    diffs = {frame_id: (max(ts) - min(ts))/1e6 for frame_id, ts in frame_times.items()}

    def get_desc(fid, diff):
      cam_times = [(m.which(), getattr(m, m.which()).timestampSof/1e6) for m in self.log_by_frame_id[fid]]
      return (diff, cam_times)
    laggy_frames = {k: get_desc(k, v) for k, v in diffs.items() if v > LAG_FRAME_TOLERANCE[self.sensor_type]}

    def in_tol(diff):
      return 50 - LAG_FRAME_TOLERANCE[self.sensor_type] < diff and diff < 50 + LAG_FRAME_TOLERANCE[self.sensor_type]
    if len(laggy_frames) != 0 and all( in_tol(laggy_frames[lf][0]) for lf in laggy_frames):
      print("TODO: handle camera out of sync")
    else:
      assert len(laggy_frames) == 0, f"Frames not synced properly: {laggy_frames=}"