Validating OpenCL Image Processing Pipeline in openpilot

This test suite validates the image processing pipeline in the openpilot system, focusing on raw camera frame processing and YUV-to-RGB conversion. It checks that image processing stays consistent across hardware configurations (PC and the TICI device) and verifies the output against a known reference hash.

Test Coverage Overview

The test suite covers the core stages of the image processing pipeline (a TODO in the script notes that not all camera sensors are checked yet):

Key functionality tested:
  • Raw camera frame processing
  • YUV to RGB color space conversion (sketched after this list)
  • OpenCL kernel initialization and execution
  • Frame buffer handling and memory management
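
As a rough illustration of the color-space step, the sketch below upsamples the half-resolution U/V planes and applies BT.601 coefficients with NumPy. The real test delegates this to openpilot's yuv_to_rgb helper, so the exact coefficients and range handling here are an assumption, not the pipeline's implementation.

import numpy as np

def yuv_to_rgb_sketch(y, u, v):
  # y: (H, W) uint8; u, v: (H/2, W/2) uint8, as returned by proc_frame() in the listing below
  uf = np.repeat(np.repeat(u, 2, axis=0), 2, axis=1).astype(np.float32) - 128.0
  vf = np.repeat(np.repeat(v, 2, axis=0), 2, axis=1).astype(np.float32) - 128.0
  yf = y.astype(np.float32)
  r = yf + 1.402 * vf
  g = yf - 0.344136 * uf - 0.714136 * vf
  b = yf + 1.772 * uf
  return np.clip(np.stack([r, g, b], axis=-1), 0.0, 255.0).astype(np.uint8)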

Edge cases include hardware-specific configurations (PC vs. TICI build options and work-group sizes) and different frame offset scenarios, as shown below.
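
The hardware-specific handling is small. Condensed from the test listing below, these are the PC-only build flags and the per-platform OpenCL work-group sizes:

from openpilot.system.hardware import PC, TICI

build_args = ' -cl-fast-relaxed-math -cl-denorms-are-zero -cl-single-precision-constant '
if PC:
  # PC builds emulate half precision as float and request OpenCL 2.0
  build_args += ' -DHALF_AS_FLOAT=1 -cl-std=CL2.0'

# larger work groups on the TICI device, small ones on PC
local_worksize = (20, 20) if TICI else (4, 4)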

Implementation Analysis

The testing approach uses replay-based verification: recorded camera frames are processed through the OpenCL pipeline and the result is compared against a stored reference. It employs PyOpenCL for GPU acceleration and hardware-specific kernel build options for the supported platforms.
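
Stripped of the camera-specific details, the PyOpenCL pattern the test follows looks roughly like this; a toy kernel stands in for process_raw.cl, so the kernel body and buffer sizes here are placeholders:

import numpy as np
import pyopencl as cl

ctx = cl.create_some_context(interactive=False)
queue = cl.CommandQueue(ctx)

# toy kernel standing in for process_raw.cl
src = """
__kernel void halve(__global const uchar *inp, __global uchar *out) {
  int i = get_global_id(0);
  out[i] = inp[i] / 2;
}
"""
prg = cl.Program(ctx, src).build(options=' -cl-fast-relaxed-math ')

data = np.arange(256, dtype=np.uint8)
out = np.empty_like(data)
inp_g = cl.Buffer(ctx, cl.mem_flags.READ_ONLY | cl.mem_flags.COPY_HOST_PTR, hostbuf=data)
out_g = cl.Buffer(ctx, cl.mem_flags.WRITE_ONLY, out.nbytes)

# enqueue the kernel, then copy the result back once it has finished
ev = prg.halve(queue, data.shape, None, inp_g, out_g)
cl.enqueue_copy(queue, out, out_g, wait_for=[ev]).wait()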

Key patterns include:
  • Frame buffer management with numpy arrays
  • OpenCL context and program initialization
  • Command queue execution for GPU processing
  • Hash-based output validation (see the sketch after this list)
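
The hash-based validation amounts to flattening every output plane into one byte stream and hashing it, mirroring the __main__ block of the test; frames_hash is a hypothetical helper name used only for this sketch:

import hashlib
import numpy as np

def frames_hash(frames):
  # frames: list of (y, u, v) plane tuples, e.g. as returned by proc_frame()
  all_pix = np.concatenate([np.concatenate([p.flatten() for p in planes]) for planes in frames])
  # the resulting digest is compared against the checked-in reference hash
  return hashlib.sha1(all_pix).hexdigest()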

Technical Details

Testing tools and configuration:
  • PyOpenCL for GPU-accelerated image processing
  • NumPy for array manipulation
  • Custom OpenCL kernels with hardware-specific build arguments
  • LogReader for test data replay (see the example after this list)
  • SHA1 hashing for output validation
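
Test data comes from a recorded route. LogReader yields log messages that are filtered down to road camera frames, roughly as in this condensed sketch of imgproc_replay:

import numpy as np
from openpilot.tools.lib.logreader import LogReader

TEST_ROUTE = "8345e3b82948d454|2022-05-04--13-45-33/0"

raw_frames = [
  np.frombuffer(m.roadCameraState.image, dtype=np.uint8)
  for m in LogReader(TEST_ROUTE)
  if m.which() == 'roadCameraState' and m.roadCameraState.image
]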

Best Practices Demonstrated

The test implementation showcases several testing best practices:

  • Deterministic output validation using hash comparison
  • Hardware abstraction for cross-platform testing
  • Efficient memory management with GPU buffers
  • Modular function design for processing pipeline stages
  • Clear separation of initialization and processing logic (illustrated after this list)
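
Assuming the init_kernels and proc_frame helpers from the listing below, and a raw_frame uint8 buffer taken from the replay, a caller builds the OpenCL context and program once and reuses them for every frame, which keeps one-time setup out of the per-frame path:

# build the OpenCL context and compile the kernels once...
ctx, imgproc_prg = init_kernels(frame_offset=0)

# ...then reuse them for each raw frame
y, u, v = proc_frame(ctx, imgproc_prg, raw_frame)          # planar YUV output
rgb = proc_frame(ctx, imgproc_prg, raw_frame, rgb=True)    # or converted to RGB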

commaai/openpilot

selfdrive/test/process_replay/test_imgproc.py

import os
import numpy as np
import hashlib

import pyopencl as cl  # install with `PYOPENCL_CL_PRETEND_VERSION=2.0 pip install pyopencl`

from openpilot.system.hardware import PC, TICI
from openpilot.common.basedir import BASEDIR
from openpilot.common.transformations.camera import DEVICE_CAMERAS
from openpilot.system.camerad.snapshot.snapshot import yuv_to_rgb
from openpilot.tools.lib.logreader import LogReader

# TODO: check all sensors
TEST_ROUTE = "8345e3b82948d454|2022-05-04--13-45-33/0"

cam = DEVICE_CAMERAS[("tici", "ar0231")]
FRAME_WIDTH, FRAME_HEIGHT = (cam.dcam.width, cam.dcam.height)
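# row stride of the 12-bit packed raw frame in bytes (12 bits per pixel, plus 4 extra bytes per row)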
FRAME_STRIDE = FRAME_WIDTH * 12 // 8 + 4

UV_WIDTH = FRAME_WIDTH // 2
UV_HEIGHT = FRAME_HEIGHT // 2
UV_SIZE = UV_WIDTH * UV_HEIGHT


def init_kernels(frame_offset=0):
  ctx = cl.create_some_context(interactive=False)

  with open(os.path.join(BASEDIR, 'system/camerad/cameras/process_raw.cl')) as f:
    build_args = f' -cl-fast-relaxed-math -cl-denorms-are-zero -cl-single-precision-constant -I{BASEDIR}/system/camerad/sensors ' + \
      f' -DFRAME_WIDTH={FRAME_WIDTH} -DFRAME_HEIGHT={FRAME_HEIGHT} -DFRAME_STRIDE={FRAME_STRIDE} -DFRAME_OFFSET={frame_offset} ' + \
      f' -DRGB_WIDTH={FRAME_WIDTH} -DRGB_HEIGHT={FRAME_HEIGHT} -DYUV_STRIDE={FRAME_WIDTH} -DUV_OFFSET={FRAME_WIDTH*FRAME_HEIGHT}' + \
      ' -DSENSOR_ID=1 -DVIGNETTING=0 '
    if PC:
      build_args += ' -DHALF_AS_FLOAT=1 -cl-std=CL2.0'
    imgproc_prg = cl.Program(ctx, f.read()).build(options=build_args)

  return ctx, imgproc_prg

def proc_frame(ctx, imgproc_prg, data, rgb=False):
  q = cl.CommandQueue(ctx)

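  # output buffer: full-resolution Y plane followed by interleaved U/V samples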
  yuv_buff = np.empty(FRAME_WIDTH * FRAME_HEIGHT + UV_SIZE * 2, dtype=np.uint8)

  cam_g = cl.Buffer(ctx, cl.mem_flags.READ_ONLY | cl.mem_flags.COPY_HOST_PTR, hostbuf=data)
  yuv_g = cl.Buffer(ctx, cl.mem_flags.WRITE_ONLY, FRAME_WIDTH * FRAME_HEIGHT + UV_SIZE * 2)

  krn = imgproc_prg.process_raw
  krn.set_scalar_arg_dtypes([None, None, np.int32])
  local_worksize = (20, 20) if TICI else (4, 4)

  ev1 = krn(q, (FRAME_WIDTH//2, FRAME_HEIGHT//2), local_worksize, cam_g, yuv_g, 1)
  cl.enqueue_copy(q, yuv_buff, yuv_g, wait_for=[ev1]).wait()
  cl.enqueue_barrier(q)

  y = yuv_buff[:FRAME_WIDTH*FRAME_HEIGHT].reshape((FRAME_HEIGHT, FRAME_WIDTH))
  u = yuv_buff[FRAME_WIDTH*FRAME_HEIGHT::2].reshape((UV_HEIGHT, UV_WIDTH))
  v = yuv_buff[FRAME_WIDTH*FRAME_HEIGHT+1::2].reshape((UV_HEIGHT, UV_WIDTH))

  if rgb:
    return yuv_to_rgb(y, u, v)
  else:
    return y, u, v


def imgproc_replay(lr):
  ctx, imgproc_prg = init_kernels()

  frames = []
  for m in lr:
    if m.which() == 'roadCameraState':
      cs = m.roadCameraState
      if cs.image:
        data = np.frombuffer(cs.image, dtype=np.uint8)
        img = proc_frame(ctx, imgproc_prg, data)

        frames.append(img)

  return frames


if __name__ == "__main__":
  # load logs
  lr = list(LogReader(TEST_ROUTE))
  # run replay
  out_frames = imgproc_replay(lr)

  all_pix = np.concatenate([np.concatenate([d.flatten() for d in f]) for f in out_frames])
  pix_hash = hashlib.sha1(all_pix).hexdigest()

  with open('imgproc_replay_ref_hash') as f:
    ref_hash = f.read()

  if pix_hash != ref_hash:
    print("result changed! please check kernel")
    print(f"ref: {ref_hash}")
    print(f"new: {pix_hash}")
  else:
    print("test passed")