Back to Repositories

Testing Annotated Worker Implementation in conductor-oss/conductor

This test suite validates the functionality of annotated workers in the Conductor Java SDK, focusing on input parameter handling, task execution, and worker configuration. The tests ensure proper behavior of worker annotations, parameter processing, and thread management.

Test Coverage Overview

The test suite provides comprehensive coverage of annotated worker functionality in Conductor.

Key areas tested include:
  • Input parameter handling for Lists, Maps, and Tasks
  • Null value processing
  • Multiple input parameter support
  • Worker configuration including polling intervals and thread counts
  • Type conversion and annotation processing

Implementation Analysis

The testing approach employs JUnit 5 with focused unit tests for each worker feature. Tests utilize mock objects and assertions to verify behavior patterns.

Notable patterns include:
  • Method-level worker task annotations
  • Input/Output parameter decorators
  • Worker configuration inheritance
  • Task execution verification

Technical Details

Testing tools and configuration:
  • JUnit Jupiter test framework
  • Mockito for TaskClient mocking
  • Custom annotations: @WorkerTask, @InputParam, @OutputParam
  • TaskRunnerConfigurer for worker execution
  • Custom test configuration classes

Best Practices Demonstrated

The test suite exemplifies high-quality testing practices through thorough coverage and clear organization.

Notable practices include:
  • Descriptive test names using @DisplayName
  • Isolated test cases for each feature
  • Comprehensive edge case handling
  • Clear separation of concerns
  • Proper test setup and teardown

conductor-oss/conductor

conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerTests.java

            
/*
 * Copyright 2022 Conductor Authors.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.netflix.conductor.sdk.workflow.executor.task;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import com.netflix.conductor.client.automator.TaskRunnerConfigurer;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.sdk.workflow.task.InputParam;
import com.netflix.conductor.sdk.workflow.task.OutputParam;
import com.netflix.conductor.sdk.workflow.task.WorkerTask;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

public class AnnotatedWorkerTests {

    @Test
    @DisplayName("it should handle null values when InputParam is a List")
    void nullListAsInputParam() throws NoSuchMethodException {
        var worker = new CarWorker();
        var annotatedWorker =
                new AnnotatedWorker(
                        "test_1", worker.getClass().getMethod("doWork", List.class), worker);

        var task = new Task();
        task.setStatus(Task.Status.IN_PROGRESS);

        var result0 = annotatedWorker.execute(task);
        var outputData = result0.getOutputData();
        assertNull(outputData.get("result"));
    }

    @Test
    @DisplayName("it should handle an empty List as InputParam")
    void emptyListAsInputParam() throws NoSuchMethodException {
        var worker = new CarWorker();
        var annotatedWorker =
                new AnnotatedWorker(
                        "test_1", worker.getClass().getMethod("doWork", List.class), worker);

        var task = new Task();
        task.setStatus(Task.Status.IN_PROGRESS);
        task.setInputData(Map.of("input", List.of()));

        var result0 = annotatedWorker.execute(task);
        var outputData = result0.getOutputData();

        @SuppressWarnings("unchecked")
        List<Car> result = (List<Car>) outputData.get("result");
        assertTrue(result.isEmpty());
    }

    @Test
    @DisplayName("it should handle a non empty List as InputParam")
    void nonEmptyListAsInputParam() throws NoSuchMethodException {
        var worker = new CarWorker();
        var annotatedWorker =
                new AnnotatedWorker(
                        "test_1", worker.getClass().getMethod("doWork", List.class), worker);

        var task = new Task();
        task.setStatus(Task.Status.IN_PROGRESS);
        task.setInputData(Map.of("input", List.of(Map.of("brand", "BMW"))));

        var result0 = annotatedWorker.execute(task);
        var outputData = result0.getOutputData();

        @SuppressWarnings("unchecked")
        List<Car> result = (List<Car>) outputData.get("result");
        assertEquals(1, result.size());

        Car car = result.get(0);
        assertEquals("BMW", car.getBrand());
    }

    @SuppressWarnings("rawtypes")
    static class RawListInput {
        @WorkerTask("test_1")
        public @OutputParam("result") List doWork(@InputParam("input") List input) {
            return input;
        }
    }

    @Test
    @DisplayName("it should handle a Raw List Type as InputParam")
    void rawListAsInputParam() throws NoSuchMethodException {
        var worker = new RawListInput();
        var annotatedWorker =
                new AnnotatedWorker(
                        "test_1", worker.getClass().getMethod("doWork", List.class), worker);

        var task = new Task();
        task.setStatus(Task.Status.IN_PROGRESS);
        task.setInputData(Map.of("input", List.of(Map.of("brand", "BMW"))));

        var result0 = annotatedWorker.execute(task);
        var outputData = result0.getOutputData();
        assertEquals(task.getInputData().get("input"), outputData.get("result"));
    }

    static class MapInput {
        @WorkerTask("test_1")
        public @OutputParam("result") Map<String, Object> doWork(Map<String, Object> input) {
            return input;
        }
    }

    @Test
    @DisplayName("it should accept a not annotated Map as input")
    void mapAsInputParam() throws NoSuchMethodException {
        var worker = new MapInput();
        var annotatedWorker =
                new AnnotatedWorker(
                        "test_1", worker.getClass().getMethod("doWork", Map.class), worker);

        var task = new Task();
        task.setStatus(Task.Status.IN_PROGRESS);
        task.setInputData(Map.of("input", List.of(Map.of("brand", "BMW"))));

        var result0 = annotatedWorker.execute(task);
        var outputData = result0.getOutputData();
        assertEquals(task.getInputData(), outputData.get("result"));
    }

    static class TaskInput {
        @WorkerTask("test_1")
        public @OutputParam("result") Task doWork(Task input) {
            return input;
        }
    }

    @Test
    @DisplayName("it should accept a Task as input")
    void taskAsInputParam() throws NoSuchMethodException {
        var task = new Task();
        task.setStatus(Task.Status.IN_PROGRESS);
        task.setTaskId(UUID.randomUUID().toString());
        task.setInputData(Map.of("input", List.of(Map.of("brand", "BMW"))));

        var worker = new TaskInput();
        var annotatedWorker =
                new AnnotatedWorker(
                        "test_1", worker.getClass().getMethod("doWork", Task.class), worker);

        var result0 = annotatedWorker.execute(task);
        var outputData = result0.getOutputData();
        var result = (Task) outputData.get("result");
        assertEquals(result.getTaskId(), task.getTaskId());
    }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    public @interface AnotherAnnotation {}

    static class AnotherAnnotationInput {
        @WorkerTask("test_2")
        public @OutputParam("result") Bike doWork(@AnotherAnnotation Bike input) {
            return input;
        }
    }

    @Test
    @DisplayName(
            "it should convert to the correct type even if there's no @InputParam and parameters are annotated with other annotations")
    void annotatedWithAnotherAnnotation() throws NoSuchMethodException {
        var worker = new AnotherAnnotationInput();
        var annotatedWorker =
                new AnnotatedWorker(
                        "test_1", worker.getClass().getMethod("doWork", Bike.class), worker);

        var task = new Task();
        task.setStatus(Task.Status.IN_PROGRESS);
        task.setTaskId(UUID.randomUUID().toString());
        task.setInputData(Map.of("brand", "Trek"));

        var result0 = annotatedWorker.execute(task);
        var outputData = result0.getOutputData();
        var bike = (Bike) outputData.get("result");
        assertEquals("Trek", bike.getBrand());
    }

    static class MultipleInputParams {
        @WorkerTask(value = "test_1", threadCount = 3, pollingInterval = 333)
        public Map<String, Object> doWork(
                @InputParam("bike") Bike bike, @InputParam("car") Car car) {
            return Map.of("bike", bike, "car", car);
        }
    }

    @Test
    @DisplayName("it should handle multiple input params")
    void multipleInputParams() throws NoSuchMethodException {
        var worker = new MultipleInputParams();
        var annotatedWorker =
                new AnnotatedWorker(
                        "test_1",
                        worker.getClass().getMethod("doWork", Bike.class, Car.class),
                        worker);

        var task = new Task();
        task.setStatus(Task.Status.IN_PROGRESS);
        task.setTaskId(UUID.randomUUID().toString());
        task.setInputData(Map.of("bike", Map.of("brand", "Trek"), "car", Map.of("brand", "BMW")));

        var result0 = annotatedWorker.execute(task);
        var outputData = result0.getOutputData();

        var bike = (Bike) outputData.get("bike");
        assertEquals("Trek", bike.getBrand());

        var car = (Car) outputData.get("car");
        assertEquals("BMW", car.getBrand());
    }

    @Test
    @DisplayName("it should honor the polling interval from annotations and config")
    void pollingIntervalTest() {
        var config = new TestWorkerConfig();

        var worker = new MultipleInputParams();

        AnnotatedWorkerExecutor annotatedWorkerExecutor = new AnnotatedWorkerExecutor(mock(TaskClient.class));
        annotatedWorkerExecutor.addBean(worker);
        annotatedWorkerExecutor.startPolling();
        List<Worker> workers = annotatedWorkerExecutor.getWorkers();
        assertNotNull(workers);
        assertEquals(1, workers.size());
        Worker taskWorker = workers.get(0);
        assertEquals(333, taskWorker.getPollingInterval());

        var worker2 = new AnotherAnnotationInput();
        annotatedWorkerExecutor = new AnnotatedWorkerExecutor(mock(TaskClient.class));
        annotatedWorkerExecutor.addBean(worker2);
        annotatedWorkerExecutor.startPolling();
        workers = annotatedWorkerExecutor.getWorkers();
        assertNotNull(workers);
        assertEquals(1, workers.size());
        taskWorker = workers.get(0);
        assertEquals(100, taskWorker.getPollingInterval());

        config.setPollingInterval("test_2", 123);
        annotatedWorkerExecutor = new AnnotatedWorkerExecutor(mock(TaskClient.class), config);
        annotatedWorkerExecutor.addBean(worker2);
        annotatedWorkerExecutor.startPolling();
        workers = annotatedWorkerExecutor.getWorkers();
        assertNotNull(workers);
        assertEquals(1, workers.size());
        taskWorker = workers.get(0);
        assertEquals(123, taskWorker.getPollingInterval());
    }

    @Test
    @DisplayName("it should return the correct thread test count")
    void threadCountTest() {
        var config = new TestWorkerConfig();

        var worker = new MultipleInputParams();
        var worker2 = new AnotherAnnotationInput();

        AnnotatedWorkerExecutor annotatedWorkerExecutor = new AnnotatedWorkerExecutor(mock(TaskClient.class), config);
        annotatedWorkerExecutor.addBean(worker);
        annotatedWorkerExecutor.addBean(worker2);

        annotatedWorkerExecutor.startPolling();
        TaskRunnerConfigurer runner = annotatedWorkerExecutor.getTaskRunner();
        assertNotNull(runner);
        Map<String, Integer> taskThreadCount = runner.getTaskThreadCount();

        assertNotNull(taskThreadCount);
        assertEquals(3, taskThreadCount.get("test_1"));
        assertEquals(1, taskThreadCount.get("test_2"));

        annotatedWorkerExecutor.shutdown();
        config.setThreadCount("test_2", 2);
        annotatedWorkerExecutor = new AnnotatedWorkerExecutor(mock(TaskClient.class), config);
        annotatedWorkerExecutor.addBean(worker);
        annotatedWorkerExecutor.addBean(worker2);

        annotatedWorkerExecutor.startPolling();
        runner = annotatedWorkerExecutor.getTaskRunner();

        taskThreadCount = runner.getTaskThreadCount();

        assertNotNull(taskThreadCount);
        assertEquals(3, taskThreadCount.get("test_1"));
        assertEquals(2, taskThreadCount.get("test_2"));
    }
}