Back to Repositories

Validating Process Replay Testing in OpenPilot

This test suite implements comprehensive process replay testing for the openpilot system, focusing on validating message outputs across different vehicle interfaces and processing components. It enables regression testing by comparing process outputs against reference logs from various car segments.

Test Coverage Overview

The test suite covers multiple vehicle brands including Hyundai, Toyota, Honda, and others, validating process outputs against reference logs. Key functionality includes message validation, tolerance checking, and cross-brand compatibility testing. Edge cases are handled through specific car segments and integration points include various vehicle interfaces and processing modules.

Implementation Analysis

The testing approach uses process replay methodology to compare output logs against reference data. It implements parallel execution using ProcessPoolExecutor for efficiency and uses LogReader/FileReader for data handling. The framework supports selective testing through whitelisting/blacklisting of processes and car brands.

Technical Details

Testing tools and libraries:
  • concurrent.futures for parallel processing
  • LogReader/FileReader for log handling
  • Azure blob storage for reference data
  • Custom comparison tools for log validation
  • Configurable tolerance and field filtering

Best Practices Demonstrated

The test implementation shows strong testing practices including parallelized execution for performance, comprehensive error handling, and flexible configuration options. Notable practices include:
  • Modular test configuration
  • Efficient parallel processing
  • Detailed diff reporting
  • Version-controlled reference data
  • Selective test filtering

commaai/openpilot

selfdrive/test/process_replay/test_processes.py

            
#!/usr/bin/env python3
import argparse
import concurrent.futures
import os
import sys
from collections import defaultdict
from tqdm import tqdm
from typing import Any

from opendbc.car.car_helpers import interface_names
from openpilot.common.git import get_commit
from openpilot.tools.lib.openpilotci import get_url, upload_file
from openpilot.selfdrive.test.process_replay.compare_logs import compare_logs, format_diff
from openpilot.selfdrive.test.process_replay.process_replay import CONFIGS, PROC_REPLAY_DIR, FAKEDATA, replay_process, \
                                                                   check_most_messages_valid
from openpilot.tools.lib.filereader import FileReader
from openpilot.tools.lib.logreader import LogReader, save_log

source_segments = [
  ("BODY", "937ccb7243511b65|2022-05-24--16-03-09--1"),        # COMMA.COMMA_BODY
  ("HYUNDAI", "02c45f73a2e5c6e9|2021-01-01--19-08-22--1"),     # HYUNDAI.HYUNDAI_SONATA
  ("HYUNDAI2", "d545129f3ca90f28|2022-11-07--20-43-08--3"),    # HYUNDAI.HYUNDAI_KIA_EV6 (+ QCOM GPS)
  ("TOYOTA", "0982d79ebb0de295|2021-01-04--17-13-21--13"),     # TOYOTA.TOYOTA_PRIUS
  ("TOYOTA2", "0982d79ebb0de295|2021-01-03--20-03-36--6"),     # TOYOTA.TOYOTA_RAV4
  ("TOYOTA3", "f7d7e3538cda1a2a|2021-08-16--08-55-34--6"),     # TOYOTA.TOYOTA_COROLLA_TSS2
  ("HONDA", "eb140f119469d9ab|2021-06-12--10-46-24--27"),      # HONDA.HONDA_CIVIC (NIDEC)
  ("HONDA2", "7d2244f34d1bbcda|2021-06-25--12-25-37--26"),     # HONDA.HONDA_ACCORD (BOSCH)
  ("CHRYSLER", "4deb27de11bee626|2021-02-20--11-28-55--8"),    # CHRYSLER.CHRYSLER_PACIFICA_2018_HYBRID
  ("RAM", "17fc16d840fe9d21|2023-04-26--13-28-44--5"),         # CHRYSLER.RAM_1500_5TH_GEN
  ("SUBARU", "341dccd5359e3c97|2022-09-12--10-35-33--3"),      # SUBARU.SUBARU_OUTBACK
  ("GM", "0c58b6a25109da2b|2021-02-23--16-35-50--11"),         # GM.CHEVROLET_VOLT
  ("GM2", "376bf99325883932|2022-10-27--13-41-22--1"),         # GM.CHEVROLET_BOLT_EUV
  ("NISSAN", "35336926920f3571|2021-02-12--18-38-48--46"),     # NISSAN.NISSAN_XTRAIL
  ("VOLKSWAGEN", "de9592456ad7d144|2021-06-29--11-00-15--6"),  # VOLKSWAGEN.VOLKSWAGEN_GOLF
  ("MAZDA", "bd6a637565e91581|2021-10-30--15-14-53--4"),       # MAZDA.MAZDA_CX9_2021
  ("FORD", "54827bf84c38b14f|2023-01-26--21-59-07--4"),        # FORD.FORD_BRONCO_SPORT_MK1

  # Enable when port is tested and dashcamOnly is no longer set
  #("VOLKSWAGEN2", "3cfdec54aa035f3f|2022-07-19--23-45-10--2"),  # VOLKSWAGEN.VOLKSWAGEN_PASSAT_NMS
]

segments = [
  ("BODY", "regenA67A128BCD8|2024-08-30--02-36-22--0"),
  ("HYUNDAI", "regen9CBD921E93E|2024-08-30--02-38-51--0"),
  ("HYUNDAI2", "regen306779F6870|2024-10-03--04-03-23--0"),
  ("TOYOTA", "regen1CA7A48E6F7|2024-08-30--02-45-08--0"),
  ("TOYOTA2", "regen6E484EDAB96|2024-08-30--02-47-37--0"),
  ("TOYOTA3", "regen4CE950B0267|2024-08-30--02-51-30--0"),
  ("HONDA", "regenC8F0D6ADC5C|2024-08-30--02-54-01--0"),
  ("HONDA2", "regen4B38A7428CD|2024-08-30--02-56-31--0"),
  ("CHRYSLER", "regenF3DBBA9E8DF|2024-08-30--02-59-03--0"),
  ("RAM", "regenDB02684E00A|2024-08-30--03-02-54--0"),
  ("SUBARU", "regenAA1FF48CF1F|2024-08-30--03-06-45--0"),
  ("GM", "regen720F2BA4CF6|2024-08-30--03-09-15--0"),
  ("GM2", "regen9ADBECBCD1C|2024-08-30--03-13-04--0"),
  ("NISSAN", "regen58464878D07|2024-08-30--03-15-31--0"),
  ("VOLKSWAGEN", "regenED976DEB757|2024-08-30--03-18-02--0"),
  ("MAZDA", "regenACF84CCF482|2024-08-30--03-21-55--0"),
  ("FORD", "regen756F8230C21|2024-11-07--00-08-24--0"),
]

# dashcamOnly makes don't need to be tested until a full port is done
excluded_interfaces = ["mock", "tesla"]

BASE_URL = "https://commadataci.blob.core.windows.net/openpilotci/"
REF_COMMIT_FN = os.path.join(PROC_REPLAY_DIR, "ref_commit")
EXCLUDED_PROCS = {"modeld", "dmonitoringmodeld"}


def run_test_process(data):
  segment, cfg, args, cur_log_fn, ref_log_path, lr_dat = data
  res = None
  if not args.upload_only:
    lr = LogReader.from_bytes(lr_dat)
    res, log_msgs = test_process(cfg, lr, segment, ref_log_path, cur_log_fn, args.ignore_fields, args.ignore_msgs)
    # save logs so we can upload when updating refs
    save_log(cur_log_fn, log_msgs)

  if args.update_refs or args.upload_only:
    print(f'Uploading: {os.path.basename(cur_log_fn)}')
    assert os.path.exists(cur_log_fn), f"Cannot find log to upload: {cur_log_fn}"
    upload_file(cur_log_fn, os.path.basename(cur_log_fn))
    os.remove(cur_log_fn)
  return (segment, cfg.proc_name, res)


def get_log_data(segment):
  r, n = segment.rsplit("--", 1)
  with FileReader(get_url(r, n, "rlog.zst")) as f:
    return (segment, f.read())


def test_process(cfg, lr, segment, ref_log_path, new_log_path, ignore_fields=None, ignore_msgs=None):
  if ignore_fields is None:
    ignore_fields = []
  if ignore_msgs is None:
    ignore_msgs = []

  ref_log_msgs = list(LogReader(ref_log_path))

  try:
    log_msgs = replay_process(cfg, lr, disable_progress=True)
  except Exception as e:
    raise Exception("failed on segment: " + segment) from e

  if not check_most_messages_valid(log_msgs):
    return f"Route did not have enough valid messages: {new_log_path}", log_msgs

  # skip this check if the segment is using qcom gps
  if cfg.proc_name != 'ubloxd' or any(m.which() in cfg.pubs for m in lr):
    seen_msgs = {m.which() for m in log_msgs}
    expected_msgs = set(cfg.subs)
    if seen_msgs != expected_msgs:
      return f"Expected messages: {expected_msgs}, but got: {seen_msgs}", log_msgs

  try:
    return compare_logs(ref_log_msgs, log_msgs, ignore_fields + cfg.ignore, ignore_msgs, cfg.tolerance), log_msgs
  except Exception as e:
    return str(e), log_msgs


if __name__ == "__main__":
  all_cars = {car for car, _ in segments}
  all_procs = {cfg.proc_name for cfg in CONFIGS if cfg.proc_name not in EXCLUDED_PROCS}

  cpu_count = os.cpu_count() or 1

  parser = argparse.ArgumentParser(description="Regression test to identify changes in a process's output")
  parser.add_argument("--whitelist-procs", type=str, nargs="*", default=all_procs,
                      help="Whitelist given processes from the test (e.g. controlsd)")
  parser.add_argument("--whitelist-cars", type=str, nargs="*", default=all_cars,
                      help="Whitelist given cars from the test (e.g. HONDA)")
  parser.add_argument("--blacklist-procs", type=str, nargs="*", default=[],
                      help="Blacklist given processes from the test (e.g. controlsd)")
  parser.add_argument("--blacklist-cars", type=str, nargs="*", default=[],
                      help="Blacklist given cars from the test (e.g. HONDA)")
  parser.add_argument("--ignore-fields", type=str, nargs="*", default=[],
                      help="Extra fields or msgs to ignore (e.g. driverMonitoringState.events)")
  parser.add_argument("--ignore-msgs", type=str, nargs="*", default=[],
                      help="Msgs to ignore (e.g. carEvents)")
  parser.add_argument("--update-refs", action="store_true",
                      help="Updates reference logs using current commit")
  parser.add_argument("--upload-only", action="store_true",
                      help="Skips testing processes and uploads logs from previous test run")
  parser.add_argument("-j", "--jobs", type=int, default=max(cpu_count - 2, 1),
                      help="Max amount of parallel jobs")
  args = parser.parse_args()

  tested_procs = set(args.whitelist_procs) - set(args.blacklist_procs)
  tested_cars = set(args.whitelist_cars) - set(args.blacklist_cars)
  tested_cars = {c.upper() for c in tested_cars}

  full_test = (tested_procs == all_procs) and (tested_cars == all_cars) and all(len(x) == 0 for x in (args.ignore_fields, args.ignore_msgs))
  upload = args.update_refs or args.upload_only
  os.makedirs(os.path.dirname(FAKEDATA), exist_ok=True)

  if upload:
    assert full_test, "Need to run full test when updating refs"

  try:
    with open(REF_COMMIT_FN) as f:
      ref_commit = f.read().strip()
  except FileNotFoundError:
    print("Couldn't find reference commit")
    sys.exit(1)

  cur_commit = get_commit()
  if not cur_commit:
    raise Exception("Couldn't get current commit")

  print(f"***** testing against commit {ref_commit} *****")

  # check to make sure all car brands are tested
  if full_test:
    untested = (set(interface_names) - set(excluded_interfaces)) - {c.lower() for c in tested_cars}
    assert len(untested) == 0, f"Cars missing routes: {str(untested)}"

  log_paths: defaultdict[str, dict[str, dict[str, str]]] = defaultdict(lambda: defaultdict(dict))
  with concurrent.futures.ProcessPoolExecutor(max_workers=args.jobs) as pool:
    if not args.upload_only:
      download_segments = [seg for car, seg in segments if car in tested_cars]
      log_data: dict[str, LogReader] = {}
      p1 = pool.map(get_log_data, download_segments)
      for segment, lr in tqdm(p1, desc="Getting Logs", total=len(download_segments)):
        log_data[segment] = lr

    pool_args: Any = []
    for car_brand, segment in segments:
      if car_brand not in tested_cars:
        continue

      for cfg in CONFIGS:
        if cfg.proc_name not in tested_procs:
          continue

        # to speed things up, we only test all segments on card
        if cfg.proc_name != 'card' and car_brand not in ('HYUNDAI', 'TOYOTA', 'HONDA', 'SUBARU', 'FORD'):
          continue

        cur_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{cur_commit}.zst")
        if args.update_refs:  # reference logs will not exist if routes were just regenerated
          ref_log_path = get_url(*segment.rsplit("--", 1,), "rlog.zst")
        else:
          ref_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{ref_commit}.zst")
          ref_log_path = ref_log_fn if os.path.exists(ref_log_fn) else BASE_URL + os.path.basename(ref_log_fn)

        dat = None if args.upload_only else log_data[segment]
        pool_args.append((segment, cfg, args, cur_log_fn, ref_log_path, dat))

        log_paths[segment][cfg.proc_name]['ref'] = ref_log_path
        log_paths[segment][cfg.proc_name]['new'] = cur_log_fn

    results: Any = defaultdict(dict)
    p2 = pool.map(run_test_process, pool_args)
    for (segment, proc, result) in tqdm(p2, desc="Running Tests", total=len(pool_args)):
      if not args.upload_only:
        results[segment][proc] = result

  diff_short, diff_long, failed = format_diff(results, log_paths, ref_commit)
  if not upload:
    with open(os.path.join(PROC_REPLAY_DIR, "diff.txt"), "w") as f:
      f.write(diff_long)
    print(diff_short)

    if failed:
      print("TEST FAILED")
      print("

To push the new reference logs for this commit run:")
      print("./test_processes.py --upload-only")
    else:
      print("TEST SUCCEEDED")

  else:
    with open(REF_COMMIT_FN, "w") as f:
      f.write(cur_commit)
    print(f"

Updated reference logs for commit: {cur_commit}")

  sys.exit(int(failed))