Back to Repositories

Validating Camera Process Power Consumption in OpenPilot

This test suite validates power consumption and message processing for various camera-related processes in the OpenPilot system. It measures power draw and message counts for critical components like camerad, modeld, and encoderd while ensuring they meet specified performance thresholds.

Test Coverage Overview

The test suite provides comprehensive coverage of power consumption patterns across multiple OpenPilot processes.

Key areas tested include:
  • Power draw measurements for camera processes (camerad, modeld, dmonitoringmodeld, encoderd)
  • Message processing verification for system components
  • Baseline power consumption validation
  • Performance thresholds and tolerances for each process

Implementation Analysis

The testing approach utilizes pytest fixtures and mock messaging to isolate process behavior. It implements a sophisticated warmup period to ensure stable measurements, with configurable sampling times and tolerance levels for different processes.

Key implementation features:
  • Dynamic message counting and verification
  • Power draw sampling with configurable duration
  • Automated process management and cleanup
  • Tabulated results presentation

Technical Details

Testing infrastructure includes:
  • pytest framework for test organization
  • cereal messaging system for IPC
  • NumPy for numerical comparisons
  • Custom power monitoring utilities
  • Process management tools
  • Tabulate for results formatting

Best Practices Demonstrated

The test suite exemplifies several testing best practices:

  • Proper test setup and teardown management
  • Comprehensive error handling and reporting
  • Configurable tolerance levels for different processes
  • Clear separation of concerns between test cases
  • Detailed results logging and presentation

commaai/openpilot

system/hardware/tici/tests/test_power_draw.py

            
from collections import defaultdict, deque
import pytest
import time
import numpy as np
from dataclasses import dataclass
from tabulate import tabulate

import cereal.messaging as messaging
from cereal.services import SERVICE_LIST
from opendbc.car.car_helpers import get_demo_car_params
from openpilot.common.mock import mock_messages
from openpilot.common.params import Params
from openpilot.system.hardware.tici.power_monitor import get_power
from openpilot.system.manager.process_config import managed_processes
from openpilot.system.manager.manager import manager_cleanup

SAMPLE_TIME = 8       # seconds to sample power
MAX_WARMUP_TIME = 30  # seconds to wait for SAMPLE_TIME consecutive valid samples

@dataclass
class Proc:
  procs: list[str]
  power: float
  msgs: list[str]
  rtol: float = 0.05
  atol: float = 0.12

  @property
  def name(self):
    return '+'.join(self.procs)


PROCS = [
  Proc(['camerad'], 1.75, msgs=['roadCameraState', 'wideRoadCameraState', 'driverCameraState']),
  Proc(['modeld'], 1.12, atol=0.2, msgs=['modelV2']),
  Proc(['dmonitoringmodeld'], 0.6, msgs=['driverStateV2']),
  Proc(['encoderd'], 0.23, msgs=[]),
]


@pytest.mark.tici
class TestPowerDraw:

  def setup_method(self):
    Params().put("CarParams", get_demo_car_params().to_bytes())

    # wait a bit for power save to disable
    time.sleep(5)

  def teardown_method(self):
    manager_cleanup()

  def get_expected_messages(self, proc):
    return int(sum(SAMPLE_TIME * SERVICE_LIST[msg].frequency for msg in proc.msgs))

  def valid_msg_count(self, proc, msg_counts):
    msgs_received = sum(msg_counts[msg] for msg in proc.msgs)
    msgs_expected = self.get_expected_messages(proc)
    return np.core.numeric.isclose(msgs_expected, msgs_received, rtol=.02, atol=2)

  def valid_power_draw(self, proc, used):
    return np.core.numeric.isclose(used, proc.power, rtol=proc.rtol, atol=proc.atol)

  def tabulate_msg_counts(self, msgs_and_power):
    msg_counts = defaultdict(int)
    for _, counts in msgs_and_power:
      for msg, count in counts.items():
        msg_counts[msg] += count
    return msg_counts

  def get_power_with_warmup_for_target(self, proc, prev):
    socks = {msg: messaging.sub_sock(msg) for msg in proc.msgs}
    for sock in socks.values():
      messaging.drain_sock_raw(sock)

    msgs_and_power = deque([], maxlen=SAMPLE_TIME)

    start_time = time.monotonic()

    while (time.monotonic() - start_time) < MAX_WARMUP_TIME:
      power = get_power(1)
      iteration_msg_counts = {}
      for msg,sock in socks.items():
        iteration_msg_counts[msg] = len(messaging.drain_sock_raw(sock))
      msgs_and_power.append((power, iteration_msg_counts))

      if len(msgs_and_power) < SAMPLE_TIME:
        continue

      msg_counts = self.tabulate_msg_counts(msgs_and_power)
      now = np.mean([m[0] for m in msgs_and_power])

      if self.valid_msg_count(proc, msg_counts) and self.valid_power_draw(proc, now - prev):
        break

    return now, msg_counts, time.monotonic() - start_time - SAMPLE_TIME

  @mock_messages(['livePose'])
  def test_camera_procs(self, subtests):
    baseline = get_power()

    prev = baseline
    used = {}
    warmup_time = {}
    msg_counts = {}

    for proc in PROCS:
      for p in proc.procs:
        managed_processes[p].start()
      now, local_msg_counts, warmup_time[proc.name] = self.get_power_with_warmup_for_target(proc, prev)
      msg_counts.update(local_msg_counts)

      used[proc.name] = now - prev
      prev = now

    manager_cleanup()

    tab = [['process', 'expected (W)', 'measured (W)', '# msgs expected', '# msgs received', "warmup time (s)"]]
    for proc in PROCS:
      cur = used[proc.name]
      expected = proc.power
      msgs_received = sum(msg_counts[msg] for msg in proc.msgs)
      tab.append([proc.name, round(expected, 2), round(cur, 2), self.get_expected_messages(proc), msgs_received, round(warmup_time[proc.name], 2)])
      with subtests.test(proc=proc.name):
        assert self.valid_msg_count(proc, msg_counts), f"expected {self.get_expected_messages(proc)} msgs, got {msgs_received} msgs"
        assert self.valid_power_draw(proc, cur), f"expected {expected:.2f}W, got {cur:.2f}W"
    print(tabulate(tab))
    print(f"Baseline {baseline:.2f}W
")