Back to Repositories

Testing WebRTC Stream Management and Service Bridging in OpenPilot

This test suite validates the WebRTC streaming functionality in OpenPilot’s webrtcd module, focusing on video/audio stream handling and service bridging capabilities. It ensures reliable real-time communication and proper resource management for the OpenPilot system.

Test Coverage Overview

The test suite provides comprehensive coverage of WebRTC streaming functionality in OpenPilot.

Key areas tested include:
  • Video stream initialization and reception
  • Audio stream handling
  • Service bridging with configurable input/output services
  • Connection establishment and termination
  • Resource cleanup and session management

Implementation Analysis

The testing approach utilizes pytest’s async capabilities with parameterized test cases to validate different service configurations.

Notable patterns include:
  • Async/await pattern for stream operations
  • Mock request handling for WebRTC session setup
  • Timeout-aware assertions for async operations
  • Parameterized testing for various service combinations

Technical Details

Testing infrastructure leverages:
  • pytest-asyncio for asynchronous testing
  • aiortc for WebRTC functionality
  • parameterized for test case variations
  • Custom WebRTCOfferBuilder for stream setup
  • Mock objects for request simulation
  • Custom timeout wrapper for async operations

Best Practices Demonstrated

The test suite exemplifies high-quality testing practices through comprehensive validation of the WebRTC implementation.

Notable practices include:
  • Systematic cleanup of resources
  • Parameterized test cases for coverage
  • Robust timeout handling
  • Clear separation of setup and verification steps
  • Mock usage for external dependencies

commaai/openpilot

system/webrtc/tests/test_webrtcd.py

            
import pytest
import asyncio
import json
# for aiortc and its dependencies
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning) # TODO: remove this when google-crc32c publish a python3.12 wheel

from openpilot.system.webrtc.webrtcd import get_stream

import aiortc
from teleoprtc import WebRTCOfferBuilder
from parameterized import parameterized_class


@parameterized_class(("in_services", "out_services"), [
  (["testJoystick"], ["carState"]),
  ([], ["carState"]),
  (["testJoystick"], []),
  ([], []),
])
@pytest.mark.asyncio
class TestWebrtcdProc:
  async def assertCompletesWithTimeout(self, awaitable, timeout=1):
    try:
      async with asyncio.timeout(timeout):
        await awaitable
    except TimeoutError:
      pytest.fail("Timeout while waiting for awaitable to complete")

  async def test_webrtcd(self, mocker):
    mock_request = mocker.MagicMock()
    async def connect(offer):
      body = {'sdp': offer.sdp, 'cameras': offer.video, 'bridge_services_in': self.in_services, 'bridge_services_out': self.out_services}
      mock_request.json.side_effect = mocker.AsyncMock(return_value=body)
      response = await get_stream(mock_request)
      response_json = json.loads(response.text)
      return aiortc.RTCSessionDescription(**response_json)

    builder = WebRTCOfferBuilder(connect)
    builder.offer_to_receive_video_stream("road")
    builder.offer_to_receive_audio_stream()
    if len(self.in_services) > 0 or len(self.out_services) > 0:
      builder.add_messaging()

    stream = builder.stream()

    await self.assertCompletesWithTimeout(stream.start())
    await self.assertCompletesWithTimeout(stream.wait_for_connection())

    assert stream.has_incoming_video_track("road")
    assert stream.has_incoming_audio_track()
    assert stream.has_messaging_channel() == (len(self.in_services) > 0 or len(self.out_services) > 0)

    video_track, audio_track = stream.get_incoming_video_track("road"), stream.get_incoming_audio_track()
    await self.assertCompletesWithTimeout(video_track.recv())
    await self.assertCompletesWithTimeout(audio_track.recv())

    await self.assertCompletesWithTimeout(stream.stop())

    # cleanup, very implementation specific, test may break if it changes
    assert mock_request.app["streams"].__setitem__.called, "Implementation changed, please update this test"
    _, session = mock_request.app["streams"].__setitem__.call_args.args
    await self.assertCompletesWithTimeout(session.post_run_cleanup())