Back to Repositories

Testing Conductor Workflow Engine Integration in conductor-oss

A comprehensive integration test suite for the Conductor OSS workflow engine that validates ephemeral workflows, task definitions, and event handling. This abstract base class provides essential test infrastructure for verifying Conductor’s core functionality with Elasticsearch integration.

Test Coverage Overview

The test suite provides extensive coverage of Conductor’s workflow execution capabilities.

Key areas tested include:
  • Ephemeral workflow execution with stored tasks
  • Ephemeral workflow execution with ephemeral tasks
  • Mixed workflow scenarios with both stored and ephemeral tasks
  • Event handler registration and retrieval
Integration points focus on Elasticsearch persistence and workflow state management.

Implementation Analysis

The testing approach uses JUnit with Spring Test context for integration testing. The implementation leverages TestContainers for Elasticsearch setup and employs abstract methods to allow different concrete implementations of workflow operations.

Key patterns include:
  • Static container initialization for test efficiency
  • Flexible task definition creation
  • Workflow definition builders
  • Elasticsearch client management

Technical Details

Testing tools and configuration:
  • JUnit 4 test framework
  • TestContainers for Elasticsearch 7.17.16
  • Spring Test context configuration
  • Elasticsearch REST client
  • Custom task and workflow definition builders
  • Abstract method pattern for implementation flexibility

Best Practices Demonstrated

The test suite exemplifies several testing best practices for workflow systems.

Notable practices include:
  • Proper test isolation through container management
  • Comprehensive cleanup after test execution
  • Flexible test infrastructure through abstraction
  • Thorough validation of workflow states
  • Consistent test data generation
  • Clear separation of concerns in test organization

conductor-oss/conductor

test-harness/src/test/java/com/netflix/conductor/test/integration/AbstractEndToEndTest.java

            
/*
 * Copyright 2020 Conductor Authors.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.netflix.conductor.test.integration;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.test.context.TestPropertySource;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;

import com.netflix.conductor.common.metadata.events.EventHandler;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.run.Workflow;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;

@TestPropertySource(
        properties = {
            "conductor.indexing.enabled=true",
            "conductor.elasticsearch.version=7",
            "conductor.queue.type=xxx"
        })
public abstract class AbstractEndToEndTest {

    private static final Logger log = LoggerFactory.getLogger(AbstractEndToEndTest.class);

    private static final String TASK_DEFINITION_PREFIX = "task_";
    private static final String DEFAULT_DESCRIPTION = "description";
    // Represents null value deserialized from the redis in memory db
    private static final String DEFAULT_NULL_VALUE = "null";
    protected static final String DEFAULT_EMAIL_ADDRESS = "[email protected]";

    private static final ElasticsearchContainer container =
            new ElasticsearchContainer(
                    DockerImageName.parse("elasticsearch")
                            .withTag("7.17.16")); // this should match the client version

    private static RestClient restClient;

    // Initialization happens in a static block so the container is initialized
    // only once for all the sub-class tests in a CI environment
    // container is stopped when JVM exits
    // https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers
    static {
        container.start();
        String httpHostAddress = container.getHttpHostAddress();
        System.setProperty("conductor.elasticsearch.url", "http://" + httpHostAddress);
        log.info("Initialized Elasticsearch {}", container.getContainerId());
    }

    @BeforeClass
    public static void initializeEs() {
        String httpHostAddress = container.getHttpHostAddress();
        String host = httpHostAddress.split(":")[0];
        int port = Integer.parseInt(httpHostAddress.split(":")[1]);

        RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(host, port, "http"));
        restClient = restClientBuilder.build();
    }

    @AfterClass
    public static void cleanupEs() throws Exception {
        // deletes all indices
        Response beforeResponse = restClient.performRequest(new Request("GET", "/_cat/indices"));
        Reader streamReader = new InputStreamReader(beforeResponse.getEntity().getContent());
        BufferedReader bufferedReader = new BufferedReader(streamReader);

        String line;
        while ((line = bufferedReader.readLine()) != null) {
            String[] fields = line.split("\\s");
            String endpoint = String.format("/%s", fields[2]);

            restClient.performRequest(new Request("DELETE", endpoint));
        }

        if (restClient != null) {
            restClient.close();
        }
    }

    @Test
    public void testEphemeralWorkflowsWithStoredTasks() {
        String workflowExecutionName = "testEphemeralWorkflow";

        createAndRegisterTaskDefinitions("storedTaskDef", 5);
        WorkflowDef workflowDefinition = createWorkflowDefinition(workflowExecutionName);
        WorkflowTask workflowTask1 = createWorkflowTask("storedTaskDef1");
        WorkflowTask workflowTask2 = createWorkflowTask("storedTaskDef2");
        workflowDefinition.getTasks().addAll(Arrays.asList(workflowTask1, workflowTask2));

        String workflowId = startWorkflow(workflowExecutionName, workflowDefinition);
        assertNotNull(workflowId);

        Workflow workflow = getWorkflow(workflowId, true);
        WorkflowDef ephemeralWorkflow = workflow.getWorkflowDefinition();
        assertNotNull(ephemeralWorkflow);
        assertEquals(workflowDefinition, ephemeralWorkflow);
    }

    @Test
    public void testEphemeralWorkflowsWithEphemeralTasks() {
        String workflowExecutionName = "ephemeralWorkflowWithEphemeralTasks";

        WorkflowDef workflowDefinition = createWorkflowDefinition(workflowExecutionName);
        WorkflowTask workflowTask1 = createWorkflowTask("ephemeralTask1");
        TaskDef taskDefinition1 = createTaskDefinition("ephemeralTaskDef1");
        workflowTask1.setTaskDefinition(taskDefinition1);
        WorkflowTask workflowTask2 = createWorkflowTask("ephemeralTask2");
        TaskDef taskDefinition2 = createTaskDefinition("ephemeralTaskDef2");
        workflowTask2.setTaskDefinition(taskDefinition2);
        workflowDefinition.getTasks().addAll(Arrays.asList(workflowTask1, workflowTask2));

        String workflowId = startWorkflow(workflowExecutionName, workflowDefinition);

        assertNotNull(workflowId);

        Workflow workflow = getWorkflow(workflowId, true);
        WorkflowDef ephemeralWorkflow = workflow.getWorkflowDefinition();
        assertNotNull(ephemeralWorkflow);
        assertEquals(workflowDefinition, ephemeralWorkflow);

        List<WorkflowTask> ephemeralTasks = ephemeralWorkflow.getTasks();
        assertEquals(2, ephemeralTasks.size());
        for (WorkflowTask ephemeralTask : ephemeralTasks) {
            assertNotNull(ephemeralTask.getTaskDefinition());
        }
    }

    @Test
    public void testEphemeralWorkflowsWithEphemeralAndStoredTasks() {
        createAndRegisterTaskDefinitions("storedTask", 1);

        WorkflowDef workflowDefinition =
                createWorkflowDefinition("testEphemeralWorkflowsWithEphemeralAndStoredTasks");

        WorkflowTask workflowTask1 = createWorkflowTask("ephemeralTask1");
        TaskDef taskDefinition1 = createTaskDefinition("ephemeralTaskDef1");
        workflowTask1.setTaskDefinition(taskDefinition1);

        WorkflowTask workflowTask2 = createWorkflowTask("storedTask0");

        workflowDefinition.getTasks().add(workflowTask1);
        workflowDefinition.getTasks().add(workflowTask2);

        String workflowExecutionName = "ephemeralWorkflowWithEphemeralAndStoredTasks";

        String workflowId = startWorkflow(workflowExecutionName, workflowDefinition);
        assertNotNull(workflowId);

        Workflow workflow = getWorkflow(workflowId, true);
        WorkflowDef ephemeralWorkflow = workflow.getWorkflowDefinition();
        assertNotNull(ephemeralWorkflow);
        assertEquals(workflowDefinition, ephemeralWorkflow);

        TaskDef storedTaskDefinition = getTaskDefinition("storedTask0");
        List<WorkflowTask> tasks = ephemeralWorkflow.getTasks();
        assertEquals(2, tasks.size());
        assertEquals(workflowTask1, tasks.get(0));
        TaskDef currentStoredTaskDefinition = tasks.get(1).getTaskDefinition();
        assertNotNull(currentStoredTaskDefinition);
        assertEquals(storedTaskDefinition, currentStoredTaskDefinition);
    }

    @Test
    public void testEventHandler() {
        String eventName = "conductor:test_workflow:complete_task_with_event";
        EventHandler eventHandler = new EventHandler();
        eventHandler.setName("test_complete_task_event");
        EventHandler.Action completeTaskAction = new EventHandler.Action();
        completeTaskAction.setAction(EventHandler.Action.Type.complete_task);
        completeTaskAction.setComplete_task(new EventHandler.TaskDetails());
        completeTaskAction.getComplete_task().setTaskRefName("test_task");
        completeTaskAction.getComplete_task().setWorkflowId("test_id");
        completeTaskAction.getComplete_task().setOutput(new HashMap<>());
        eventHandler.getActions().add(completeTaskAction);
        eventHandler.setEvent(eventName);
        eventHandler.setActive(true);
        registerEventHandler(eventHandler);

        Iterator<EventHandler> it = getEventHandlers(eventName, true);
        EventHandler result = it.next();
        assertFalse(it.hasNext());
        assertEquals(eventHandler.getName(), result.getName());
    }

    protected WorkflowTask createWorkflowTask(String name) {
        WorkflowTask workflowTask = new WorkflowTask();
        workflowTask.setName(name);
        workflowTask.setWorkflowTaskType(TaskType.SIMPLE);
        workflowTask.setTaskReferenceName(name);
        workflowTask.setDescription(getDefaultDescription(name));
        workflowTask.setDynamicTaskNameParam(DEFAULT_NULL_VALUE);
        workflowTask.setCaseValueParam(DEFAULT_NULL_VALUE);
        workflowTask.setCaseExpression(DEFAULT_NULL_VALUE);
        workflowTask.setDynamicForkTasksParam(DEFAULT_NULL_VALUE);
        workflowTask.setDynamicForkTasksInputParamName(DEFAULT_NULL_VALUE);
        workflowTask.setSink(DEFAULT_NULL_VALUE);
        workflowTask.setEvaluatorType(DEFAULT_NULL_VALUE);
        workflowTask.setExpression(DEFAULT_NULL_VALUE);
        return workflowTask;
    }

    protected TaskDef createTaskDefinition(String name) {
        TaskDef taskDefinition = new TaskDef();
        taskDefinition.setName(name);
        return taskDefinition;
    }

    protected WorkflowDef createWorkflowDefinition(String workflowName) {
        WorkflowDef workflowDefinition = new WorkflowDef();
        workflowDefinition.setName(workflowName);
        workflowDefinition.setDescription(getDefaultDescription(workflowName));
        workflowDefinition.setFailureWorkflow(DEFAULT_NULL_VALUE);
        workflowDefinition.setOwnerEmail(DEFAULT_EMAIL_ADDRESS);
        return workflowDefinition;
    }

    protected List<TaskDef> createAndRegisterTaskDefinitions(
            String prefixTaskDefinition, int numberOfTaskDefinitions) {
        String prefix = Optional.ofNullable(prefixTaskDefinition).orElse(TASK_DEFINITION_PREFIX);
        List<TaskDef> definitions = new LinkedList<>();
        for (int i = 0; i < numberOfTaskDefinitions; i++) {
            TaskDef def =
                    new TaskDef(
                            prefix + i,
                            "task " + i + DEFAULT_DESCRIPTION,
                            DEFAULT_EMAIL_ADDRESS,
                            3,
                            60,
                            60);
            def.setTimeoutPolicy(TaskDef.TimeoutPolicy.RETRY);
            definitions.add(def);
        }
        this.registerTaskDefinitions(definitions);
        return definitions;
    }

    private String getDefaultDescription(String nameResource) {
        return nameResource + " " + DEFAULT_DESCRIPTION;
    }

    protected abstract String startWorkflow(
            String workflowExecutionName, WorkflowDef workflowDefinition);

    protected abstract Workflow getWorkflow(String workflowId, boolean includeTasks);

    protected abstract TaskDef getTaskDefinition(String taskName);

    protected abstract void registerTaskDefinitions(List<TaskDef> taskDefinitionList);

    protected abstract void registerWorkflowDefinition(WorkflowDef workflowDefinition);

    protected abstract void registerEventHandler(EventHandler eventHandler);

    protected abstract Iterator<EventHandler> getEventHandlers(String event, boolean activeOnly);
}