Back to Repositories

Validating Task Management Implementation in Conductor-OSS

This test suite validates core task functionality in the Conductor OSS workflow engine, focusing on task status management, task definition handling, queue wait time calculations, and deep copy operations.

Test Coverage Overview

The test suite provides comprehensive coverage of Task class functionality in Conductor:

  • Task status management and validation
  • Task definition association and retrieval
  • Queue wait time calculations
  • Deep copy operations with field verification

Implementation Analysis

The testing approach uses JUnit to validate core Task class behaviors. Tests employ systematic verification of task properties, status transitions, and timing calculations. The implementation leverages JUnit assertions for precise validation of expected outcomes.

Technical Details

Testing tools and configuration:

  • JUnit testing framework
  • Assert methods for validation
  • Timestamps derived from System.currentTimeMillis() for queue timing tests
  • Protobuf integration for message handling
  • Plain TaskDef and WorkflowTask instances used as fixtures (no mocking framework)

Best Practices Demonstrated

The test suite exemplifies several testing best practices:

  • Isolated test methods with clear objectives
  • Comprehensive property validation
  • Edge case handling for timing calculations
  • Thorough deep copy verification
  • Self-contained tests that build their own fixtures, so no shared setup or cleanup is required

conductor-oss/conductor

common/src/test/java/com/netflix/conductor/common/tasks/TaskTest.java

            
/*
 * Copyright 2020 Conductor Authors.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.netflix.conductor.common.tasks;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Set;
import java.util.stream.Collectors;

import org.junit.Test;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;

import com.google.protobuf.Any;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

/**
 * Unit tests for {@link Task}: status assignment, task-definition lookup through the attached
 * {@link WorkflowTask}, queue wait time, and {@link Task#deepCopy()}.
 */
public class TaskTest {

    /**
     * Number of non-synthetic fields {@link Task} is expected to declare. When a field is added
     * to {@code Task}, update this value and make sure the copy logic and
     * {@link #testDeepCopyTask()} cover the new field.
     */
    private static final long EXPECTED_TASK_FIELD_COUNT = 42;

    /**
     * Verifies that a status set on a task is read back unchanged, both for {@code FAILED} and
     * for every {@link Status} whose name also exists in {@link TaskResult.Status}.
     */
    @Test
    public void test() {

        Task task = new Task();
        task.setStatus(Status.FAILED);
        assertEquals(Status.FAILED, task.getStatus());

        Set<String> resultStatuses =
                Arrays.stream(TaskResult.Status.values())
                        .map(Enum::name)
                        .collect(Collectors.toSet());

        for (Status status : Status.values()) {
            // Only statuses shared by both enums are exercised; the rest are skipped.
            if (resultStatuses.contains(status.name())) {
                TaskResult.Status trStatus = TaskResult.Status.valueOf(status.name());
                assertEquals(status.name(), trStatus.name());

                task = new Task();
                task.setStatus(status);
                assertEquals(status, task.getStatus());
            }
        }
    }

    /**
     * Verifies that {@link Task#getTaskDefinition()} is empty while no workflow task is attached
     * and yields the attached workflow task's definition afterwards.
     */
    @Test
    public void testTaskDefinitionIfAvailable() {
        Task task = new Task();
        task.setStatus(Status.FAILED);
        assertEquals(Status.FAILED, task.getStatus());

        // A fresh task has no workflow task and therefore no definition to expose.
        assertNull(task.getWorkflowTask());
        assertFalse(task.getTaskDefinition().isPresent());

        WorkflowTask workflowTask = new WorkflowTask();
        TaskDef taskDefinition = new TaskDef();
        workflowTask.setTaskDefinition(taskDefinition);
        task.setWorkflowTask(workflowTask);

        assertTrue(task.getTaskDefinition().isPresent());
        assertEquals(taskDefinition, task.getTaskDefinition().get());
    }

    /**
     * Verifies {@link Task#getQueueWaitTime()}: with only scheduled and start times set the
     * result must equal their difference; once an update time and a callback delay are also set
     * the result must be positive.
     */
    @Test
    public void testTaskQueueWaitTime() {
        Task task = new Task();

        long currentTimeMillis = System.currentTimeMillis();
        task.setScheduledTime(currentTimeMillis - 30_000); // 30 seconds ago
        task.setStartTime(currentTimeMillis - 25_000); // 25 seconds ago

        long queueWaitTime = task.getQueueWaitTime();
        assertEquals(5000L, queueWaitTime);

        task.setUpdateTime(currentTimeMillis - 20_000); // 20 seconds ago
        task.setCallbackAfterSeconds(10);
        queueWaitTime = task.getQueueWaitTime();
        assertTrue(queueWaitTime > 0);
    }

    /**
     * Verifies that {@link Task#deepCopy()} produces a distinct task equal to the original, and
     * guards against new {@link Task} fields being added without the copy logic being updated.
     */
    @Test
    public void testDeepCopyTask() {
        final Task task = new Task();

        // Guard against forgetting to copy newly added fields: the declared field count must
        // match the expected one. Synthetic fields are ignored because instrumentation agents
        // (e.g. JaCoCo's $jacocoData) inject them at runtime, which would otherwise make this
        // check fail whenever the tests run with coverage enabled.
        final long declaredFieldsNumber =
                Arrays.stream(task.getClass().getDeclaredFields())
                        .filter(field -> !field.isSynthetic())
                        .count();

        assertEquals(EXPECTED_TASK_FIELD_COUNT, declaredFieldsNumber);

        task.setCallbackAfterSeconds(111L);
        task.setCallbackFromWorker(false);
        task.setCorrelationId("correlation_id");
        task.setInputData(new HashMap<>());
        task.setOutputData(new HashMap<>());
        task.setReferenceTaskName("ref_task_name");
        task.setStartDelayInSeconds(1);
        task.setTaskDefName("task_def_name");
        task.setTaskType("dummy_task_type");
        task.setWorkflowInstanceId("workflowInstanceId");
        task.setWorkflowType("workflowType");
        task.setResponseTimeoutSeconds(11L);
        task.setStatus(Status.COMPLETED);
        task.setRetryCount(0);
        task.setPollCount(0);
        task.setTaskId("taskId");
        task.setWorkflowTask(new WorkflowTask());
        task.setDomain("domain");
        task.setInputMessage(Any.getDefaultInstance());
        task.setOutputMessage(Any.getDefaultInstance());
        task.setRateLimitPerFrequency(11);
        task.setRateLimitFrequencyInSeconds(11);
        task.setExternalInputPayloadStoragePath("externalInputPayloadStoragePath");
        task.setExternalOutputPayloadStoragePath("externalOutputPayloadStoragePath");
        task.setWorkflowPriority(0);
        task.setIteration(1);
        task.setExecutionNameSpace("name_space");
        task.setIsolationGroupId("groupId");
        task.setStartTime(12L);
        task.setEndTime(20L);
        task.setScheduledTime(7L);
        task.setRetried(false);
        task.setReasonForIncompletion("");
        task.setWorkerId("");
        task.setSubWorkflowId("");
        task.setSubworkflowChanged(false);

        final Task copy = task.deepCopy();

        // Returning the same instance would trivially satisfy equals(), so check identity too.
        assertTrue("deepCopy() must return a new instance", copy != task);
        assertEquals(task, copy);
    }
}