
Testing Locationd Sensor Processing and Kalman Filter Stability in OpenPilot

This test suite validates the locationd component of openpilot, comma.ai's open source driver-assistance system, focusing on sensor data processing and Kalman filter stability across a range of scenarios. The tests verify behavior under normal conditions as well as edge cases such as sensor failures, data spikes, and timing anomalies.

Test Coverage Overview

The suite exercises locationd's sensor processing path and checks that its Kalman filter stays stable across both nominal and degraded inputs; a short end-to-end sketch follows the list below.

Key areas tested include:
  • Base scenario validation with unchanged sensor data
  • Gyroscope and accelerometer failure scenarios
  • Sensor spike detection and handling
  • Timing anomaly management
  • System flag states (inputsOK, sensorsOK) verification
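
To make the structure concrete, here is a minimal sketch of how one scenario from this list is exercised end to end. It reuses Scenario, run_scenarios, and TEST_ROUTE from the test file reproduced further down (import path inferred from the file location); running it requires an openpilot checkout and access to the referenced log segment.

import numpy as np

from openpilot.tools.lib.logreader import LogReader
from openpilot.selfdrive.test.process_replay.migration import migrate_all
from openpilot.selfdrive.locationd.test.test_locationd_scenarios import (
  TEST_ROUTE, Scenario, run_scenarios,
)

# Replay the gyro-spike scenario and compare against the unmodified run.
logs = migrate_all(LogReader(TEST_ROUTE))
orig, replayed = run_scenarios(Scenario.GYRO_SPIKE_MIDWAY, logs)

# The filter should track the original run despite the injected spike,
# while inputsOK temporarily drops to False around the spike.
assert np.allclose(orig['yaw_rate'], replayed['yaw_rate'], atol=np.radians(0.35))
assert np.any(replayed['inputs_flag'] == 0.0)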

Implementation Analysis

The testing approach is scenario-based: recorded logs are modified in controlled ways and replayed through locationd. The tests are written as plain pytest-style classes and use numpy for numerical comparisons; a toy sketch of the midway-modification pattern follows the list below.

Key patterns include:
  • Scenario enumeration for different test cases
  • Log modification utilities for sensor data manipulation
  • Precise numerical comparisons with appropriate tolerances
  • Systematic validation of system state flags
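
The midway-modification pattern is small enough to show in isolation. The toy sketch below is not openpilot code: Msg and modify_midway are hypothetical stand-ins that mirror the modify_logs_midway helper in the file further down, which splits out the messages of one type, mutates a few at the midpoint, and merges everything back in timestamp order.

from dataclasses import dataclass

@dataclass
class Msg:
  kind: str     # message type, e.g. 'gyro' or 'accel'
  t: int        # monotonic timestamp
  value: float

def modify_midway(msgs, kind, count, fn):
  # Mutate `count` messages of the given kind starting at the midpoint,
  # then restore chronological order (same shape as modify_logs_midway below).
  same = [m for m in msgs if m.kind == kind]
  other = [m for m in msgs if m.kind != kind]
  mid = len(same) // 2
  for m in same[mid:mid + count]:
    fn(m)
  return sorted(other + same, key=lambda m: m.t)

# Inject a single +3.0 spike into the middle 'gyro' message of a toy stream.
stream = [Msg('gyro', t, 0.0) for t in range(10)] + [Msg('accel', t, 0.0) for t in range(10)]
spiked = modify_midway(stream, 'gyro', 1, lambda m: setattr(m, 'value', m.value + 3.0))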

Technical Details

Testing tools and configuration (a minimal replay sketch follows this list):
  • LogReader for data ingestion
  • Process replay framework for system simulation
  • Numpy for numerical analysis
  • Custom data modification utilities
  • Scenario-based test execution framework
  • Predefined test routes and comparison fields
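
The replay flow itself is short. The sketch below strings together the same APIs that appear in the test file (LogReader, migrate_all, replay_process_with_name); it assumes an openpilot checkout and access to the route referenced below.

from openpilot.tools.lib.logreader import LogReader
from openpilot.selfdrive.test.process_replay.migration import migrate_all
from openpilot.selfdrive.test.process_replay.process_replay import replay_process_with_name

ROUTE = "4019fff6e54cf1c7|00000123--4bc0d95ef6/5"  # TEST_ROUTE from the file below

logs = migrate_all(LogReader(ROUTE))                            # ingest and migrate the segment
replayed = replay_process_with_name(name='locationd', lr=logs)  # re-run locationd offline
live_pose = [m.livePose for m in replayed if m.which() == 'livePose']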

Best Practices Demonstrated

The test suite demonstrates solid practices for testing state-estimation code in a driver-assistance system; a small example of the tolerance pattern follows the list.

Notable practices include:
  • Comprehensive edge case coverage
  • Precise numerical tolerance specifications
  • Clear test case documentation
  • Systematic scenario organization
  • Robust data validation approaches
  • Modular test structure with reusable components
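
On the tolerance point: angle comparisons in the tests are made with absolute tolerances specified in degrees and converted to radians (0.35 degrees for yaw rate, 0.55 degrees for roll). A minimal illustration of that pattern, with made-up data:

import numpy as np

yaw_rate_orig = np.zeros(100)
yaw_rate_replayed = yaw_rate_orig + np.radians(0.3)  # 0.3 deg of drift, inside the 0.35 deg budget

assert np.allclose(yaw_rate_orig, yaw_rate_replayed, atol=np.radians(0.35))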

commaai/openpilot

selfdrive/locationd/test/test_locationd_scenarios.py

import numpy as np
from collections import defaultdict
from enum import Enum

from openpilot.tools.lib.logreader import LogReader
from openpilot.selfdrive.test.process_replay.migration import migrate_all
from openpilot.selfdrive.test.process_replay.process_replay import replay_process_with_name

# TODO find a new segment to test
TEST_ROUTE = "4019fff6e54cf1c7|00000123--4bc0d95ef6/5"
GPS_MESSAGES = ['gpsLocationExternal', 'gpsLocation']
SELECT_COMPARE_FIELDS = {
  'yaw_rate': ['angularVelocityDevice', 'z'],
  'roll': ['orientationNED', 'x'],
  'inputs_flag': ['inputsOK'],
  'sensors_flag': ['sensorsOK'],
}
JUNK_IDX = 100  # the first 100 livePose samples are dropped from all comparisons
CONSISTENT_SPIKES_COUNT = 10  # number of consecutive bad messages injected in the consistent-spikes scenario


class Scenario(Enum):
  BASE = 'base'
  GYRO_OFF = 'gyro_off'
  GYRO_SPIKE_MIDWAY = 'gyro_spike_midway'
  ACCEL_OFF = 'accel_off'
  ACCEL_SPIKE_MIDWAY = 'accel_spike_midway'
  SENSOR_TIMING_SPIKE_MIDWAY = 'timing_spikes'
  SENSOR_TIMING_CONSISTENT_SPIKES = 'timing_consistent_spikes'


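# Extract the SELECT_COMPARE_FIELDS values from every livePose message and
# stack them into float arrays, dropping the first JUNK_IDX samples.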
def get_select_fields_data(logs):
  def get_nested_keys(msg, keys):
    val = None
    for key in keys:
      val = getattr(msg if val is None else val, key) if isinstance(key, str) else val[key]
    return val
  lp = [x.livePose for x in logs if x.which() == 'livePose']
  data = defaultdict(list)
  for msg in lp:
    for key, fields in SELECT_COMPARE_FIELDS.items():
      data[key].append(get_nested_keys(msg, fields))
  for key in data:
    data[key] = np.array(data[key][JUNK_IDX:], dtype=float)
  return data


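# Apply `fn` to `count` consecutive messages of type `which` starting at the
# midpoint of the segment, then re-sort the merged log by logMonoTime.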
def modify_logs_midway(logs, which, count, fn):
  non_which = [x for x in logs if x.which() != which]
  which = [x for x in logs if x.which() == which]
  temps = which[len(which) // 2:len(which) // 2 + count]
  for i, temp in enumerate(temps):
    temp = temp.as_builder()
    fn(temp)
    which[len(which) // 2 + i] = temp.as_reader()
  return sorted(non_which + which, key=lambda x: x.logMonoTime)


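# Mutate the logs according to the scenario, replay locationd over them, and
# return the selected fields from both the original and the replayed logs.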
def run_scenarios(scenario, logs):
  if scenario == Scenario.BASE:
    pass

  elif scenario == Scenario.GYRO_OFF:
    logs = sorted([x for x in logs if x.which() != 'gyroscope'], key=lambda x: x.logMonoTime)

  elif scenario == Scenario.GYRO_SPIKE_MIDWAY:
    def gyro_spike(msg):
      msg.gyroscope.gyroUncalibrated.v[0] += 3.0
    logs = modify_logs_midway(logs, 'gyroscope', 1, gyro_spike)

  elif scenario == Scenario.ACCEL_OFF:
    logs = sorted([x for x in logs if x.which() != 'accelerometer'], key=lambda x: x.logMonoTime)

  elif scenario == Scenario.ACCEL_SPIKE_MIDWAY:
    def acc_spike(msg):
      msg.accelerometer.acceleration.v[0] += 10.0
    logs = modify_logs_midway(logs, 'accelerometer', 1, acc_spike)

  elif scenario == Scenario.SENSOR_TIMING_SPIKE_MIDWAY or scenario == Scenario.SENSOR_TIMING_CONSISTENT_SPIKES:
    def timing_spike(msg):
      msg.accelerometer.timestamp -= int(0.150 * 1e9)
    count = 1 if scenario == Scenario.SENSOR_TIMING_SPIKE_MIDWAY else CONSISTENT_SPIKES_COUNT
    logs = modify_logs_midway(logs, 'accelerometer', count, timing_spike)

  replayed_logs = replay_process_with_name(name='locationd', lr=logs)
  return get_select_fields_data(logs), get_select_fields_data(replayed_logs)


class TestLocationdScenarios:
  """
  Test locationd with different scenarios. In all these scenarios, we expect the following:
    - locationd kalman filter should never go unstable (we care mostly about yaw_rate, roll, gpsOK, inputsOK, sensorsOK)
    - faulty values should be ignored, with appropriate flags set
  """

  @classmethod
  def setup_class(cls):
    cls.logs = migrate_all(LogReader(TEST_ROUTE))

  def test_base(self):
    """
    Test: unchanged log
    Expected Result:
      - yaw_rate: unchanged
      - roll: unchanged
    """
    orig_data, replayed_data = run_scenarios(Scenario.BASE, self.logs)
    assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.35))
    assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.55))

  def test_gyro_off(self):
    """
    Test: no gyroscope message for the entire segment
    Expected Result:
      - yaw_rate: 0
      - roll: 0
      - sensorsOK: False
    """
    _, replayed_data = run_scenarios(Scenario.GYRO_OFF, self.logs)
    assert np.allclose(replayed_data['yaw_rate'], 0.0)
    assert np.allclose(replayed_data['roll'], 0.0)
    assert np.all(replayed_data['sensors_flag'] == 0.0)

  def test_gyro_spikes(self):
    """
    Test: a gyroscope spike in the middle of the segment
    Expected Result:
      - yaw_rate: unchanged
      - roll: unchanged
      - inputsOK: False for some time after the spike, True for the rest
    """
    orig_data, replayed_data = run_scenarios(Scenario.GYRO_SPIKE_MIDWAY, self.logs)
    assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.35))
    assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.55))
    assert np.diff(replayed_data['inputs_flag'])[499] == -1.0
    assert np.diff(replayed_data['inputs_flag'])[704] == 1.0

  def test_accel_off(self):
    """
    Test: no accelerometer message for the entire segment
    Expected Result:
      - yaw_rate: 0
      - roll: 0
      - sensorsOK: False
    """
    _, replayed_data = run_scenarios(Scenario.ACCEL_OFF, self.logs)
    assert np.allclose(replayed_data['yaw_rate'], 0.0)
    assert np.allclose(replayed_data['roll'], 0.0)
    assert np.all(replayed_data['sensors_flag'] == 0.0)

  def test_accel_spikes(self):
    """
    ToDo:
    Test: an accelerometer spike in the middle of the segment
    Expected Result: Right now, the kalman filter is not robust to small spikes like it is to gyroscope spikes.
    """
    orig_data, replayed_data = run_scenarios(Scenario.ACCEL_SPIKE_MIDWAY, self.logs)
    assert np.allclose(orig_data['yaw_rate'], replayed_data['yaw_rate'], atol=np.radians(0.35))
    assert np.allclose(orig_data['roll'], replayed_data['roll'], atol=np.radians(0.55))

  def test_single_timing_spike(self):
    """
    Test: timing of 150ms off for the single accelerometer message in the middle of the segment
    Expected Result: the message is ignored, and inputsOK is False for that time
    """
    orig_data, replayed_data = run_scenarios(Scenario.SENSOR_TIMING_SPIKE_MIDWAY, self.logs)
    assert np.all(replayed_data['inputs_flag'] == orig_data['inputs_flag'])
    assert np.all(replayed_data['sensors_flag'] == orig_data['sensors_flag'])

  def test_consistent_timing_spikes(self):
    """
    Test: consistent timing spikes for N accelerometer messages in the middle of the segment
    Expected Result: inputsOK becomes False after N consecutive bad measurements
    """
    orig_data, replayed_data = run_scenarios(Scenario.SENSOR_TIMING_CONSISTENT_SPIKES, self.logs)
    assert np.diff(replayed_data['inputs_flag'])[500] == -1.0
    assert np.diff(replayed_data['inputs_flag'])[787] == 1.0