
Validating Automated Update System Integration in commaai/openpilot

This test suite validates the software updater in commaai/openpilot, focusing on automated Git-based updates and NEOS version management. It checks that the update process behaves reliably by driving the real updater daemon with pytest against isolated, throwaway repositories.

Test Coverage Overview

The test suite provides extensive coverage of openpilot’s update system, including:

  • Basic update detection and application scenarios
  • Multiple instance handling and locking mechanisms (a lock-file sketch follows this list)
  • NEOS version management and cache clearing
  • Overlay filesystem initialization and consistency checks
  • Update state parameter validation
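
A hedged sketch of the kind of single-instance guard that the lock-file scenario exercises; the function name, lock path, and flock-based approach are assumptions for illustration, not the updater's actual implementation. The first process to take a non-blocking exclusive lock keeps running, and any later instance fails the lock and exits, which is what test_multiple_instances asserts from the outside.

import fcntl
import os
import sys

def acquire_single_instance_lock(path="/tmp/updater.lock"):
  # hypothetical helper: hold an exclusive, non-blocking lock on a well-known file
  fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o644)
  try:
    fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
  except OSError:
    os.close(fd)
    sys.exit("another updater instance is already running")
  return fd  # keep the descriptor open for the life of the process

if __name__ == "__main__":
  lock_fd = acquire_single_instance_lock()
  print("lock held; safe to proceed")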

Implementation Analysis

The tests rely on per-test setup/teardown methods and temporary directories to build isolated environments: each run clones a throwaway "remote" repository plus a working checkout, simulates updates by committing random changes to the remote, and exercises system/updated.py as a separate subprocess.
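
A minimal sketch of that isolation pattern, assuming plain git commands in a temporary directory; the paths and committer identity here are placeholders rather than the suite's exact setup, which clones the real openpilot checkout and its submodules.

import os
import subprocess
import tempfile

with tempfile.TemporaryDirectory() as tmp:
  remote = os.path.join(tmp, "openpilot_remote")
  checkout = os.path.join(tmp, "openpilot")

  # throwaway "remote" with one empty commit, then a working clone of it
  subprocess.check_call(["git", "init", "--quiet", remote])
  subprocess.check_call(["git", "-C", remote,
                         "-c", "user.name=tester", "-c", "user.email=tester@example.com",
                         "commit", "--allow-empty", "-m", "initial commit"])
  subprocess.check_call(["git", "clone", "--quiet", remote, checkout])

  # a test would now point the updater at `checkout` and commit new changes to `remote`;
  # everything is deleted when the TemporaryDirectory context exits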

Key patterns include environment-variable manipulation to redirect the updater's paths, signal handling (SIGHUP triggers an immediate update check), and parameter-store reads and writes through the Params class.
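
A minimal, self-contained sketch of the SIGHUP pattern (POSIX only, not openpilot code): a long-running child installs a SIGHUP handler that stands in for "run an update check now", and the parent forces a cycle with send_signal(), the same call the suite's _update_now() helper makes against updated.py.

import signal
import subprocess
import sys
import textwrap
import time

child_src = textwrap.dedent("""
  import signal, time
  signal.signal(signal.SIGHUP, lambda s, f: print("update check triggered", flush=True))
  while True:
    time.sleep(0.1)
""")

proc = subprocess.Popen([sys.executable, "-c", child_src])
time.sleep(0.5)                    # give the child time to install its handler
proc.send_signal(signal.SIGHUP)    # trigger one "update check"
time.sleep(0.5)
proc.terminate()
proc.wait(timeout=5)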

Technical Details

  • Testing Framework: pytest with custom markers (@pytest.mark.tici); a marker-registration sketch follows this list
  • Key Dependencies: standard library only (datetime, os, time, tempfile, shutil, signal, subprocess, random), plus openpilot’s BASEDIR and Params
  • Configuration: Custom environment variables for test paths and git settings
  • Setup: Automated git repository cloning and submodule initialization
  • Validation: Timestamp checks, parameter verification, and filesystem state validation
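
The tici marker flags tests intended for comma three (tici) hardware. A hedged sketch of how such a marker is typically registered and selected in pytest; openpilot's actual conftest.py may register it differently, so treat the wording below as an assumption.

# conftest.py
def pytest_configure(config):
  config.addinivalue_line("markers", "tici: tests that must run on comma three (tici) hardware")

# select only the hardware tests:        pytest -m tici selfdrive/test/test_updated.py
# skip them on a development machine:    pytest -m "not tici"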

Best Practices Demonstrated

The test suite demonstrates solid testing practices: thorough setup/teardown, defensive error handling during cleanup, and fully isolated test environments.

  • Proper resource cleanup in teardown methods (see the cleanup sketch after this list)
  • Timeout handling for async operations
  • Randomized test data generation
  • Systematic state verification
  • Clear separation of test helpers and test cases
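
A minimal sketch of the cleanup-with-timeout pattern, assuming a subprocess-based test like this one; stop_proc is a hypothetical helper, and the suite's actual teardown_method below is simpler, logging any failure and then removing its temporary directory.

import subprocess

def stop_proc(proc: subprocess.Popen, timeout: float = 30) -> None:
  # hypothetical helper: terminate a child, wait briefly, and escalate to SIGKILL
  if proc is None or proc.poll() is not None:
    return  # never started or already exited
  proc.terminate()
  try:
    proc.wait(timeout)
  except subprocess.TimeoutExpired:
    proc.kill()
    proc.wait()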

commaai/openpilot

selfdrive/test/test_updated.py

import datetime
import os
import pytest
import time
import tempfile
import shutil
import signal
import subprocess
import random

from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params


@pytest.mark.tici
class TestUpdated:

  def setup_method(self):
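    # build an isolated openpilot checkout, a local "remote" repo, and an overlay staging area under a temp dir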
    self.updated_proc = None

    self.tmp_dir = tempfile.TemporaryDirectory()
    org_dir = os.path.join(self.tmp_dir.name, "commaai")

    self.basedir = os.path.join(org_dir, "openpilot")
    self.git_remote_dir = os.path.join(org_dir, "openpilot_remote")
    self.staging_dir = os.path.join(org_dir, "safe_staging")
    for d in [org_dir, self.basedir, self.git_remote_dir, self.staging_dir]:
      os.mkdir(d)

    self.neos_version = os.path.join(org_dir, "neos_version")
    self.neosupdate_dir = os.path.join(org_dir, "neosupdate")
    with open(self.neos_version, "w") as f:
      v = subprocess.check_output(r"bash -c 'source launch_env.sh && echo $REQUIRED_NEOS_VERSION'",
                                  cwd=BASEDIR, shell=True, encoding='utf8').strip()
      f.write(v)

    self.upper_dir = os.path.join(self.staging_dir, "upper")
    self.merged_dir = os.path.join(self.staging_dir, "merged")
    self.finalized_dir = os.path.join(self.staging_dir, "finalized")

    # setup local submodule remotes
    submodules = subprocess.check_output("git submodule --quiet foreach 'echo $name'",
                                         shell=True, cwd=BASEDIR, encoding='utf8').split()
    for s in submodules:
      sub_path = os.path.join(org_dir, s.split("_repo")[0])
      self._run(f"git clone {s} {sub_path}.git", cwd=BASEDIR)

    # setup two git repos, a remote and one we'll run updated in
    self._run([
      f"git clone {BASEDIR} {self.git_remote_dir}",
      f"git clone {self.git_remote_dir} {self.basedir}",
      f"cd {self.basedir} && git submodule init && git submodule update",
      f"cd {self.basedir} && scons -j{os.cpu_count()} cereal/ common/"
    ])

    self.params = Params(os.path.join(self.basedir, "persist/params"))
    self.params.clear_all()
    os.sync()

  def teardown_method(self):
    try:
      if self.updated_proc is not None:
        self.updated_proc.terminate()
        self.updated_proc.wait(30)
    except Exception as e:
      print(e)
    self.tmp_dir.cleanup()


  # *** test helpers ***


  def _run(self, cmd, cwd=None):
    if not isinstance(cmd, list):
      cmd = (cmd,)

    for c in cmd:
      subprocess.check_output(c, cwd=cwd, shell=True)

  def _get_updated_proc(self):
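    # launch system/updated.py from the test checkout, with updater paths and git identity overridden via env vars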
    os.environ["PYTHONPATH"] = self.basedir
    os.environ["GIT_AUTHOR_NAME"] = "testy tester"
    os.environ["GIT_COMMITTER_NAME"] = "testy tester"
    os.environ["GIT_AUTHOR_EMAIL"] = "[email protected]"
    os.environ["GIT_COMMITTER_EMAIL"] = "[email protected]"
    os.environ["UPDATER_TEST_IP"] = "localhost"
    os.environ["UPDATER_LOCK_FILE"] = os.path.join(self.tmp_dir.name, "updater.lock")
    os.environ["UPDATER_STAGING_ROOT"] = self.staging_dir
    os.environ["UPDATER_NEOS_VERSION"] = self.neos_version
    os.environ["UPDATER_NEOSUPDATE_DIR"] = self.neosupdate_dir
    updated_path = os.path.join(self.basedir, "system/updated.py")
    return subprocess.Popen(updated_path, env=os.environ)

  def _start_updater(self, offroad=True, nosleep=False):
    self.params.put_bool("IsOffroad", offroad)
    self.updated_proc = self._get_updated_proc()
    if not nosleep:
      time.sleep(1)

  def _update_now(self):
    self.updated_proc.send_signal(signal.SIGHUP)

  # TODO: this should be implemented in params
  def _read_param(self, key, timeout=1):
    ret = None
    start_time = time.monotonic()
    while ret is None:
      ret = self.params.get(key, encoding='utf8')
      if time.monotonic() - start_time > timeout:
        break
      time.sleep(0.01)
    return ret

  def _wait_for_update(self, timeout=30, clear_param=False):
    if clear_param:
      self.params.remove("LastUpdateTime")

    self._update_now()
    t = self._read_param("LastUpdateTime", timeout=timeout)
    if t is None:
      raise Exception("timed out waiting for update to complete")

  def _make_commit(self):
    all_dirs, all_files = [], []
    for root, dirs, files in os.walk(self.git_remote_dir):
      if ".git" in root:
        continue
      for d in dirs:
        all_dirs.append(os.path.join(root, d))
      for f in files:
        all_files.append(os.path.join(root, f))

    # make a new dir and some new files
    new_dir = os.path.join(self.git_remote_dir, "this_is_a_new_dir")
    os.mkdir(new_dir)
    for _ in range(random.randrange(5, 30)):
      for d in (new_dir, random.choice(all_dirs)):
        with tempfile.NamedTemporaryFile(dir=d, delete=False) as f:
          f.write(os.urandom(random.randrange(1, 1000000)))

    # modify some files
    for f in random.sample(all_files, random.randrange(5, 50)):
      # open in binary read+write: "w+" would truncate the file before readlines() ran,
      # and binary mode avoids decode errors on non-text files in the checkout
      with open(f, "rb+") as ff:
        txt = ff.readlines()
        ff.seek(0)
        for line in txt:
          ff.write(line[::-1])

    # remove some files
    for f in random.sample(all_files, random.randrange(5, 50)):
      os.remove(f)

    # remove some dirs
    for d in random.sample(all_dirs, random.randrange(1, 10)):
      shutil.rmtree(d)

    # commit the changes
    self._run([
      "git add -A",
      "git commit -m 'an update'",
    ], cwd=self.git_remote_dir)

  def _check_update_state(self, update_available):
    # make sure LastUpdateTime is recent
    t = self._read_param("LastUpdateTime")
    last_update_time = datetime.datetime.fromisoformat(t)
    td = datetime.datetime.now(datetime.UTC).replace(tzinfo=None) - last_update_time
    assert td.total_seconds() < 10
    self.params.remove("LastUpdateTime")

    # wait a bit for the rest of the params to be written
    time.sleep(0.1)

    # check params
    update = self._read_param("UpdateAvailable")
    assert (update == "1") == update_available, f"UpdateAvailable: {repr(update)}"
    assert self._read_param("UpdateFailedCount") == "0"

    # TODO: check that the finalized update actually matches remote
    # check the .overlay_init and .overlay_consistent flags
    assert os.path.isfile(os.path.join(self.basedir, ".overlay_init"))
    assert os.path.isfile(os.path.join(self.finalized_dir, ".overlay_consistent")) == update_available


  # *** test cases ***


  # Run updated for 100 cycles with no update
  def test_no_update(self):
    self._start_updater()
    for _ in range(100):
      self._wait_for_update(clear_param=True)
      self._check_update_state(False)

  # Let the updater run with no update for a cycle, then write an update
  def test_update(self):
    self._start_updater()

    # run for a cycle with no update
    self._wait_for_update(clear_param=True)
    self._check_update_state(False)

    # write an update to our remote
    self._make_commit()

    # run for a cycle to get the update
    self._wait_for_update(timeout=60, clear_param=True)
    self._check_update_state(True)

    # run another cycle with no update
    self._wait_for_update(clear_param=True)
    self._check_update_state(True)

  # Let the updater run for 10 cycles, and write an update every cycle
  @pytest.mark.skip("need to make this faster")
  def test_update_loop(self):
    self._start_updater()

    # run for a cycle with no update
    self._wait_for_update(clear_param=True)
    for _ in range(10):
      time.sleep(0.5)
      self._make_commit()
      self._wait_for_update(timeout=90, clear_param=True)
      self._check_update_state(True)

  # Test overlay re-creation after tracking a new file in basedir's git
  def test_overlay_reinit(self):
    self._start_updater()

    overlay_init_fn = os.path.join(self.basedir, ".overlay_init")

    # run for a cycle with no update
    self._wait_for_update(clear_param=True)
    self.params.remove("LastUpdateTime")
    first_mtime = os.path.getmtime(overlay_init_fn)

    # touch a file in the basedir
    self._run("touch new_file && git add new_file", cwd=self.basedir)

    # run another cycle, should have a new mtime
    self._wait_for_update(clear_param=True)
    second_mtime = os.path.getmtime(overlay_init_fn)
    assert first_mtime != second_mtime

    # run another cycle, mtime should be same as last cycle
    self._wait_for_update(clear_param=True)
    new_mtime = os.path.getmtime(overlay_init_fn)
    assert second_mtime == new_mtime

  # Make sure updated exits if another instance is running
  def test_multiple_instances(self):
    # start updated and let it run for a cycle
    self._start_updater()
    time.sleep(1)
    self._wait_for_update(clear_param=True)

    # start another instance
    second_updated = self._get_updated_proc()
    ret_code = second_updated.wait(timeout=5)
    assert ret_code is not None


  # *** test cases with NEOS updates ***


  # Run updated with no update, make sure it clears the old NEOS update
  def test_clear_neos_cache(self):
    # make the dir and some junk files
    os.mkdir(self.neosupdate_dir)
    for _ in range(15):
      with tempfile.NamedTemporaryFile(dir=self.neosupdate_dir, delete=False) as f:
        f.write(os.urandom(random.randrange(1, 1000000)))

    self._start_updater()
    self._wait_for_update(clear_param=True)
    self._check_update_state(False)
    assert not os.path.isdir(self.neosupdate_dir)

  # Let the updater run with no update for a cycle, then write an update
  @pytest.mark.skip("TODO: only runs on device")
  def test_update_with_neos_update(self):
    # bump the NEOS version and commit it
    self._run([
      "echo 'export REQUIRED_NEOS_VERSION=3' >> launch_env.sh",
      "git -c user.name='testy' -c user.email='[email protected]' \
       commit -am 'a neos update'",
    ], cwd=self.git_remote_dir)

    # run for a cycle to get the update
    self._start_updater()
    self._wait_for_update(timeout=60, clear_param=True)
    self._check_update_state(True)

    # TODO: more comprehensive check
    assert os.path.isdir(self.neosupdate_dir)