Back to Repositories

Testing Sensor Data Collection and Timing Accuracy in OpenPilot

This test suite validates the functionality of the sensord module in OpenPilot, focusing on sensor data collection, timing accuracy, and interrupt handling. The tests ensure proper sensor configuration, data sampling rates, and value validation across multiple sensor types including accelerometers, gyroscopes, and magnetometers.

Test Coverage Overview

The test suite provides comprehensive coverage of sensor functionality including:

  • Sensor presence and configuration validation
  • Timing accuracy for LSM6DS3 sensor at 104Hz
  • Sampling frequency verification across all sensors
  • Timestamp synchronization and monotonicity checks
  • Sensor value range validation
  • Interrupt handling verification

Implementation Analysis

The testing approach utilizes pytest fixtures and class-level setup for efficient test organization. It implements sophisticated timing analysis using numpy for statistical calculations, and employs custom sensor data collection methods with timeout handling.

Key patterns include sensor event monitoring, interrupt verification, and comprehensive error checking with detailed assertion messages.

Technical Details

Testing tools and configuration:

  • pytest framework with TICI marker for platform-specific testing
  • numpy for statistical analysis
  • cereal messaging system for sensor data communication
  • Custom timeout handling and process management
  • Environment variable configuration for test parameters
  • IRQ monitoring through system interfaces

Best Practices Demonstrated

The test suite exemplifies high-quality testing practices including:

  • Proper test isolation and cleanup
  • Comprehensive error messages with detailed context
  • Efficient use of pytest subtests for granular testing
  • Robust timing validation with statistical analysis
  • Thorough setup and teardown procedures
  • Clear separation of test concerns

commaai/openpilot

system/sensord/tests/test_sensord.py

            
import os
import pytest
import time
import numpy as np
from collections import namedtuple, defaultdict

import cereal.messaging as messaging
from cereal import log
from cereal.services import SERVICE_LIST
from openpilot.common.gpio import get_irqs_for_action
from openpilot.common.timeout import Timeout
from openpilot.system.manager.process_config import managed_processes

BMX = {
  ('bmx055', 'acceleration'),
  ('bmx055', 'gyroUncalibrated'),
  ('bmx055', 'magneticUncalibrated'),
  ('bmx055', 'temperature'),
}

LSM = {
  ('lsm6ds3', 'acceleration'),
  ('lsm6ds3', 'gyroUncalibrated'),
  ('lsm6ds3', 'temperature'),
}
LSM_C = {(x[0]+'trc', x[1]) for x in LSM}

MMC = {
  ('mmc5603nj', 'magneticUncalibrated'),
}

SENSOR_CONFIGURATIONS = (
  (BMX | LSM),
  (MMC | LSM),
  (BMX | LSM_C),
  (MMC| LSM_C),
)

Sensor = log.SensorEventData.SensorSource
SensorConfig = namedtuple('SensorConfig', ['type', 'sanity_min', 'sanity_max'])
ALL_SENSORS = {
  Sensor.lsm6ds3: {
    SensorConfig("acceleration", 5, 15),
    SensorConfig("gyroUncalibrated", 0, .2),
    SensorConfig("temperature", 0, 60),
  },

  Sensor.lsm6ds3trc: {
    SensorConfig("acceleration", 5, 15),
    SensorConfig("gyroUncalibrated", 0, .2),
    SensorConfig("temperature", 0, 60),
  },

  Sensor.bmx055: {
    SensorConfig("acceleration", 5, 15),
    SensorConfig("gyroUncalibrated", 0, .2),
    SensorConfig("magneticUncalibrated", 0, 300),
    SensorConfig("temperature", 0, 60),
  },

  Sensor.mmc5603nj: {
    SensorConfig("magneticUncalibrated", 0, 300),
  }
}


def get_irq_count(irq: int):
  with open(f"/sys/kernel/irq/{irq}/per_cpu_count") as f:
    per_cpu = map(int, f.read().split(","))
    return sum(per_cpu)

def read_sensor_events(duration_sec):
  sensor_types = ['accelerometer', 'gyroscope', 'magnetometer', 'accelerometer2',
                  'gyroscope2', 'temperatureSensor', 'temperatureSensor2']
  socks = {}
  poller = messaging.Poller()
  events = defaultdict(list)
  for stype in sensor_types:
    socks[stype] = messaging.sub_sock(stype, poller=poller, timeout=100)

  # wait for sensors to come up
  with Timeout(int(os.environ.get("SENSOR_WAIT", "5")), "sensors didn't come up"):
    while len(poller.poll(250)) == 0:
      pass
  time.sleep(1)
  for s in socks.values():
    messaging.drain_sock_raw(s)

  st = time.monotonic()
  while time.monotonic() - st < duration_sec:
    for s in socks:
      events[s] += messaging.drain_sock(socks[s])
    time.sleep(0.1)

  assert sum(map(len, events.values())) != 0, "No sensor events collected!"

  return {k: v for k, v in events.items() if len(v) > 0}

@pytest.mark.tici
class TestSensord:
  @classmethod
  def setup_class(cls):
    # enable LSM self test
    os.environ["LSM_SELF_TEST"] = "1"

    # read initial sensor values every test case can use
    os.system("pkill -f \\\\./sensord")
    try:
      managed_processes["sensord"].start()
      cls.sample_secs = int(os.getenv("SAMPLE_SECS", "10"))
      cls.events = read_sensor_events(cls.sample_secs)

      # determine sensord's irq
      cls.sensord_irq = get_irqs_for_action("sensord")[0]
    finally:
      # teardown won't run if this doesn't succeed
      managed_processes["sensord"].stop()

  @classmethod
  def teardown_class(cls):
    managed_processes["sensord"].stop()

  def teardown_method(self):
    managed_processes["sensord"].stop()

  def test_sensors_present(self):
    # verify correct sensors configuration
    seen = set()
    for etype in self.events:
      for measurement in self.events[etype]:
        m = getattr(measurement, measurement.which())
        seen.add((str(m.source), m.which()))

    assert seen in SENSOR_CONFIGURATIONS

  def test_lsm6ds3_timing(self, subtests):
    # verify measurements are sampled and published at 104Hz

    sensor_t = {
      1: [], # accel
      5: [], # gyro
    }

    for measurement in self.events['accelerometer']:
      m = getattr(measurement, measurement.which())
      sensor_t[m.sensor].append(m.timestamp)

    for measurement in self.events['gyroscope']:
      m = getattr(measurement, measurement.which())
      sensor_t[m.sensor].append(m.timestamp)

    for s, vals in sensor_t.items():
      with subtests.test(sensor=s):
        assert len(vals) > 0
        tdiffs = np.diff(vals) / 1e6 # millis

        high_delay_diffs = list(filter(lambda d: d >= 20., tdiffs))
        assert len(high_delay_diffs) < 15, f"Too many large diffs: {high_delay_diffs}"

        avg_diff = sum(tdiffs)/len(tdiffs)
        avg_freq = 1. / (avg_diff * 1e-3)
        assert 92. < avg_freq < 114., f"avg freq {avg_freq}Hz wrong, expected 104Hz"

        stddev = np.std(tdiffs)
        assert stddev < 2.0, f"Standard-dev to big {stddev}"

  def test_sensor_frequency(self, subtests):
    for s, msgs in self.events.items():
      with subtests.test(sensor=s):
        freq = len(msgs) / self.sample_secs
        ef = SERVICE_LIST[s].frequency
        assert ef*0.85 <= freq <= ef*1.15

  def test_logmonottime_timestamp_diff(self):
    # ensure diff between the message logMonotime and sample timestamp is small

    tdiffs = list()
    for etype in self.events:
      for measurement in self.events[etype]:
        m = getattr(measurement, measurement.which())

        # check if gyro and accel timestamps are before logMonoTime
        if str(m.source).startswith("lsm6ds3") and m.which() != 'temperature':
          err_msg = f"Timestamp after logMonoTime: {m.timestamp} > {measurement.logMonoTime}"
          assert m.timestamp < measurement.logMonoTime, err_msg

        # negative values might occur, as non interrupt packages created
        # before the sensor is read
        tdiffs.append(abs(measurement.logMonoTime - m.timestamp) / 1e6)

    # some sensors have a read procedure that will introduce an expected diff on the order of 20ms
    high_delay_diffs = set(filter(lambda d: d >= 25., tdiffs))
    assert len(high_delay_diffs) < 20, f"Too many measurements published: {high_delay_diffs}"

    avg_diff = round(sum(tdiffs)/len(tdiffs), 4)
    assert avg_diff < 4, f"Avg packet diff: {avg_diff:.1f}ms"

  def test_sensor_values(self):
    sensor_values = dict()
    for etype in self.events:
      for measurement in self.events[etype]:
        m = getattr(measurement, measurement.which())
        key = (m.source.raw, m.which())
        values = getattr(m, m.which())

        if hasattr(values, 'v'):
          values = values.v
        values = np.atleast_1d(values)

        if key in sensor_values:
          sensor_values[key].append(values)
        else:
          sensor_values[key] = [values]

    # Sanity check sensor values
    for sensor, stype in sensor_values:
      for s in ALL_SENSORS[sensor]:
        if s.type != stype:
          continue

        key = (sensor, s.type)
        mean_norm = np.mean(np.linalg.norm(sensor_values[key], axis=1))
        err_msg = f"Sensor '{sensor} {s.type}' failed sanity checks {mean_norm} is not between {s.sanity_min} and {s.sanity_max}"
        assert s.sanity_min <= mean_norm <= s.sanity_max, err_msg

  def test_sensor_verify_no_interrupts_after_stop(self):
    managed_processes["sensord"].start()
    time.sleep(3)

    # read /proc/interrupts to verify interrupts are received
    state_one = get_irq_count(self.sensord_irq)
    time.sleep(1)
    state_two = get_irq_count(self.sensord_irq)

    error_msg = f"no interrupts received after sensord start!
{state_one} {state_two}"
    assert state_one != state_two, error_msg

    managed_processes["sensord"].stop()
    time.sleep(1)

    # read /proc/interrupts to verify no more interrupts are received
    state_one = get_irq_count(self.sensord_irq)
    time.sleep(1)
    state_two = get_irq_count(self.sensord_irq)
    assert state_one == state_two, "Interrupts received after sensord stop!"