Back to Repositories

Testing SPI Communication and CAN Message Handling in Openpilot

A comprehensive test suite for validating SPI communication and CAN message handling in the Openpilot system’s panda interface. This test suite focuses on verifying data integrity, timing, and message transmission reliability between the system and panda device.

Test Coverage Overview

The test suite provides extensive coverage of SPI communication and CAN message handling:
  • CAN message transmission and reception validation
  • Message corruption detection and handling
  • Timing verification for multiple services
  • Panda state monitoring and validation
  • Peripheral state verification including voltage and current checks

Implementation Analysis

The implementation utilizes pytest’s robust testing framework with custom decorators and fixtures. The testing approach employs process isolation through @with_processes decorator and systematic validation of message integrity across multiple communication channels.

Key patterns include service subscription management, timing analysis using numpy, and comprehensive assertion checks for data validation.

Technical Details

Testing tools and configuration:
  • pytest framework with custom markers (@pytest.mark.tici)
  • cereal messaging system for communication
  • numpy for statistical analysis
  • Environment variable configuration for test control
  • Custom test helpers for process management

Best Practices Demonstrated

The test implementation showcases several testing best practices:
  • Proper test isolation and setup/teardown handling
  • Comprehensive error checking and validation
  • Performance timing analysis and verification
  • Modular test structure with subtests
  • Clear separation of test configuration and execution

commaai/openpilot

selfdrive/pandad/tests/test_pandad_spi.py

            
import os
import time
import numpy as np
import pytest
import random

import cereal.messaging as messaging
from cereal.services import SERVICE_LIST
from openpilot.system.hardware import HARDWARE
from openpilot.selfdrive.test.helpers import with_processes
from openpilot.selfdrive.pandad.tests.test_pandad_loopback import setup_pandad, send_random_can_messages

JUNGLE_SPAM = "JUNGLE_SPAM" in os.environ

@pytest.mark.tici
class TestBoarddSpi:
  @classmethod
  def setup_class(cls):
    if HARDWARE.get_device_type() == 'tici':
      pytest.skip("only for spi pandas")
    os.environ['STARTED'] = '1'
    os.environ['SPI_ERR_PROB'] = '0.001'
    if not JUNGLE_SPAM:
      os.environ['BOARDD_LOOPBACK'] = '1'

  @with_processes(['pandad'])
  def test_spi_corruption(self, subtests):
    setup_pandad(1)

    sendcan = messaging.pub_sock('sendcan')
    socks = {s: messaging.sub_sock(s, conflate=False, timeout=100) for s in ('can', 'pandaStates', 'peripheralState')}
    time.sleep(2)
    for s in socks.values():
      messaging.drain_sock_raw(s)

    total_recv_count = 0
    total_sent_count = 0
    sent_msgs = {bus: list() for bus in range(3)}

    st = time.monotonic()
    ts = {s: list() for s in socks.keys()}
    for _ in range(int(os.getenv("TEST_TIME", "20"))):
      # send some CAN messages
      if not JUNGLE_SPAM:
        sent = send_random_can_messages(sendcan, random.randrange(2, 20))
        for k, v in sent.items():
          sent_msgs[k].extend(list(v))
          total_sent_count += len(v)

      for service, sock in socks.items():
        for m in messaging.drain_sock(sock):
          ts[service].append(m.logMonoTime)

          # sanity check for corruption
          assert m.valid or (service == "can")
          if service == "can":
            for msg in m.can:
              if JUNGLE_SPAM:
                # PandaJungle.set_generated_can(True)
                i = msg.address - 0x200
                assert msg.address >= 0x200
                assert msg.src == (i%3)
                assert msg.dat == b"\xff"*(i%8)
                total_recv_count += 1
                continue

              if msg.src > 4:
                continue
              key = (msg.address, msg.dat)
              assert key in sent_msgs[msg.src], f"got unexpected msg: {msg.src=} {msg.address=} {msg.dat=}"
              # TODO: enable this
              #sent_msgs[msg.src].remove(key)
              total_recv_count += 1
          elif service == "pandaStates":
            assert len(m.pandaStates) == 1
            ps = m.pandaStates[0]
            assert ps.uptime < 1000
            assert ps.pandaType == "tres"
            assert ps.ignitionLine
            assert not ps.ignitionCan
            assert 4000 < ps.voltage < 14000
          elif service == "peripheralState":
            ps = m.peripheralState
            assert ps.pandaType == "tres"
            assert 4000 < ps.voltage < 14000
            assert 50 < ps.current < 1000
            assert ps.fanSpeedRpm < 10000

      time.sleep(0.5)
    et = time.monotonic() - st

    print("
======== timing report ========")
    for service, times in ts.items():
      dts = np.diff(times)/1e6
      print(service.ljust(17), f"{np.mean(dts):7.2f} {np.min(dts):7.2f} {np.max(dts):7.2f}")
      with subtests.test(msg="timing check", service=service):
        edt = 1e3 / SERVICE_LIST[service].frequency
        assert edt*0.9 < np.mean(dts) < edt*1.1
        assert np.max(dts) < edt*8
        assert np.min(dts) < edt
        assert len(dts) >= ((et-0.5)*SERVICE_LIST[service].frequency*0.8)

    with subtests.test(msg="CAN traffic"):
      print(f"Sent {total_sent_count} CAN messages, got {total_recv_count} back. {total_recv_count/(total_sent_count+1e-4):.2%} received")
      assert total_recv_count > 20