Testing State Machine Transitions in openpilot Self-Driving System
This test suite validates the state machine implementation in openpilot’s self-driving system. It thoroughly tests state transitions, event handling, and timing behaviors that are critical for safe autonomous operation. The tests ensure proper handling of different operational states like enabled, disabled, and various override conditions.
Test Coverage Overview
Implementation Analysis
Technical Details
Best Practices Demonstrated
commaai/openpilot
selfdrive/selfdrived/tests/test_state_machine.py
from cereal import log
from openpilot.common.realtime import DT_CTRL
from openpilot.selfdrive.selfdrived.state import StateMachine, SOFT_DISABLE_TIME
from openpilot.selfdrive.selfdrived.events import Events, ET, EVENTS, NormalPermanentAlert
State = log.SelfdriveState.OpenpilotState
# The event types that maintain the current state
MAINTAIN_STATES = {State.enabled: (None,), State.disabled: (None,), State.softDisabling: (ET.SOFT_DISABLE,),
State.preEnabled: (ET.PRE_ENABLE,), State.overriding: (ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL)}
ALL_STATES = tuple(State.schema.enumerants.values())
# The event types checked in DISABLED section of state machine
ENABLE_EVENT_TYPES = (ET.ENABLE, ET.PRE_ENABLE, ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL)
def make_event(event_types):
event = {}
for ev in event_types:
event[ev] = NormalPermanentAlert("alert")
EVENTS[0] = event
return 0
class TestStateMachine:
def setup_method(self):
self.events = Events()
self.state_machine = StateMachine()
self.state_machine.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
def test_immediate_disable(self):
for state in ALL_STATES:
for et in MAINTAIN_STATES[state]:
self.events.add(make_event([et, ET.IMMEDIATE_DISABLE]))
self.state_machine.state = state
self.state_machine.update(self.events)
assert State.disabled == self.state_machine.state
self.events.clear()
def test_user_disable(self):
for state in ALL_STATES:
for et in MAINTAIN_STATES[state]:
self.events.add(make_event([et, ET.USER_DISABLE]))
self.state_machine.state = state
self.state_machine.update(self.events)
assert State.disabled == self.state_machine.state
self.events.clear()
def test_soft_disable(self):
for state in ALL_STATES:
if state == State.preEnabled: # preEnabled considers NO_ENTRY instead
continue
for et in MAINTAIN_STATES[state]:
self.events.add(make_event([et, ET.SOFT_DISABLE]))
self.state_machine.state = state
self.state_machine.update(self.events)
assert self.state_machine.state == State.disabled if state == State.disabled else State.softDisabling
self.events.clear()
def test_soft_disable_timer(self):
self.state_machine.state = State.enabled
self.events.add(make_event([ET.SOFT_DISABLE]))
self.state_machine.update(self.events)
for _ in range(int(SOFT_DISABLE_TIME / DT_CTRL)):
assert self.state_machine.state == State.softDisabling
self.state_machine.update(self.events)
assert self.state_machine.state == State.disabled
def test_no_entry(self):
# Make sure noEntry keeps us disabled
for et in ENABLE_EVENT_TYPES:
self.events.add(make_event([ET.NO_ENTRY, et]))
self.state_machine.update(self.events)
assert self.state_machine.state == State.disabled
self.events.clear()
def test_no_entry_pre_enable(self):
# preEnabled with noEntry event
self.state_machine.state = State.preEnabled
self.events.add(make_event([ET.NO_ENTRY, ET.PRE_ENABLE]))
self.state_machine.update(self.events)
assert self.state_machine.state == State.preEnabled
def test_maintain_states(self):
# Given current state's event type, we should maintain state
for state in ALL_STATES:
for et in MAINTAIN_STATES[state]:
self.state_machine.state = state
self.events.add(make_event([et]))
self.state_machine.update(self.events)
assert self.state_machine.state == state
self.events.clear()