Back to Repositories

Validating Workflow Creation and Execution Patterns in Conductor OSS

This test suite validates the workflow creation, registration, and execution capabilities of the Conductor OSS Java SDK. It focuses on testing various workflow components including dynamic forks, task definitions, and workflow execution patterns.

Test Coverage Overview

The test suite provides comprehensive coverage of workflow creation and execution scenarios.

Key areas tested include:
  • Workflow registration and definition validation
  • Inline workflow execution
  • Dynamic fork task generation
  • Worker task implementations
  • Switch case handling
Edge cases covered include workflow execution failures, such as attempting to execute a workflow that does not exist.

Implementation Analysis

The testing approach utilizes JUnit 5 with a custom WorkflowTestRunner for orchestrating tests. The implementation follows a builder pattern for workflow construction and employs CompletableFuture for asynchronous execution handling.

Key patterns include:
  • Builder pattern for workflow definition
  • Annotation-based worker task definitions
  • Async workflow execution validation

Technical Details

Testing tools and configuration:
  • JUnit Jupiter test framework
  • Custom WorkflowTestRunner for test orchestration
  • Test server managed by WorkflowTestRunner, configured for port 8080
  • Conductor version 3.7.3
  • SLF4J for logging
  • Custom annotations for worker task definitions

Best Practices Demonstrated

The test suite demonstrates several testing best practices for workflow applications.

Notable practices include:
  • Proper test setup and cleanup with @BeforeAll and @AfterAll
  • Timeout handling for async operations
  • Comprehensive error case validation
  • Clear test method naming conventions
  • Modular test organization

conductor-oss/conductor

conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/def/WorkflowCreationTests.java

            
/*
 * Copyright 2022 Conductor Authors.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.netflix.conductor.sdk.workflow.def;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.sdk.testing.WorkflowTestRunner;
import com.netflix.conductor.sdk.workflow.def.tasks.DynamicFork;
import com.netflix.conductor.sdk.workflow.def.tasks.DynamicForkInput;
import com.netflix.conductor.sdk.workflow.def.tasks.ForkJoin;
import com.netflix.conductor.sdk.workflow.def.tasks.Javascript;
import com.netflix.conductor.sdk.workflow.def.tasks.SimpleTask;
import com.netflix.conductor.sdk.workflow.def.tasks.Switch;
import com.netflix.conductor.sdk.workflow.def.tasks.Task;
import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor;
import com.netflix.conductor.sdk.workflow.task.InputParam;
import com.netflix.conductor.sdk.workflow.task.OutputParam;
import com.netflix.conductor.sdk.workflow.task.WorkerTask;
import com.netflix.conductor.sdk.workflow.testing.TestWorkflowInput;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
 * Integration tests that build a sample workflow with {@link WorkflowBuilder}, register it
 * through the shared {@link WorkflowExecutor} and execute it.
 *
 * <p>The class also hosts the {@link WorkerTask}-annotated worker methods ({@code get_user_info},
 * {@code task2}, {@code task3}, {@code fork_gen}) referenced by the sample workflow.
 *
 * <p>The suite is currently {@link Disabled}; the reason is not recorded in this file.
 */
@Disabled
public class WorkflowCreationTests {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowCreationTests.class);

    /** Name under which the sample workflow is registered and later looked up. */
    private static final String WORKFLOW_NAME = "sdk_workflow_example";

    /** Maximum time, in seconds, a test waits for a workflow execution to finish. */
    private static final long EXECUTION_TIMEOUT_SECONDS = 10;

    private static WorkflowExecutor executor;

    private static WorkflowTestRunner runner;

    /**
     * Creates the shared test runner and obtains the workflow executor used by every test.
     *
     * @throws IOException if the runner fails to initialise
     */
    @BeforeAll
    public static void init() throws IOException {
        // NOTE(review): arguments are presumably the server port and the Conductor version,
        // and the init() argument the package scanned for @WorkerTask methods - confirm.
        runner = new WorkflowTestRunner(8080, "3.7.3");
        runner.init("com.netflix.conductor.sdk");
        executor = runner.getWorkflowExecutor();
    }

    /** Shuts the shared runner down, if it was ever created. */
    @AfterAll
    public static void cleanUp() {
        // init() may have failed before assigning the runner; avoid masking that failure
        // with a NullPointerException here.
        if (runner != null) {
            runner.shutdown();
        }
    }

    /**
     * Worker for the {@code get_user_info} task.
     *
     * @param userName value of the {@code name} task input; not used by this stub
     * @return the fixed zip code {@code "95014"}, published as output {@code zipCode}
     */
    @WorkerTask("get_user_info")
    public @OutputParam("zipCode") String getZipCode(@InputParam("name") String userName) {
        return "95014";
    }

    /**
     * Worker for the {@code task2} task.
     *
     * @return the fixed greeting {@code "Hello World"}, published as output {@code greetings}
     */
    @WorkerTask("task2")
    public @OutputParam("greetings") String task2() {
        return "Hello World";
    }

    /**
     * Worker for the {@code task3} task.
     *
     * @return the fixed greeting {@code "Hello World-3"}, published as output {@code greetings}
     */
    @WorkerTask("task3")
    public @OutputParam("greetings") String task3() {
        return "Hello World-3";
    }

    /**
     * Worker for the {@code fork_gen} task: produces the input of the dynamic fork.
     *
     * @return a fork description holding three {@code task2} tasks ({@code fork_task_0} to
     *     {@code fork_task_2}) and, per task reference name, an input map with two entries
     */
    @WorkerTask("fork_gen")
    public DynamicForkInput generateDynamicFork() {
        DynamicForkInput forks = new DynamicForkInput();
        Map<String, Object> inputs = new HashMap<>();
        forks.setInputs(inputs);
        List<Task<?>> tasks = new ArrayList<>();
        forks.setTasks(tasks);

        for (int i = 0; i < 3; i++) {
            SimpleTask task = new SimpleTask("task2", "fork_task_" + i);
            tasks.add(task);
            Map<String, Object> taskInput = new HashMap<>();
            taskInput.put("key", "value");
            taskInput.put("key2", 101);
            inputs.put(task.getTaskReferenceName(), taskInput);
        }
        return forks;
    }

    /**
     * Builds the sample workflow and registers it, asserting that registration succeeded.
     *
     * <p>The workflow consists of a Javascript task loaded from the classpath resource
     * {@code /script.js}, a fork/join over four {@code task2} branches, the
     * {@code get_user_info} task, a switch on {@code ${workflow.input.zipCode}}, one more
     * {@code task2} and a dynamic fork fed by {@code fork_gen}.
     *
     * @return the registered workflow
     * @throws IOException if closing the script resource fails
     * @throws InterruptedException declared for callers; kept from the original signature
     */
    private ConductorWorkflow<TestWorkflowInput> registerTestWorkflow()
            throws IOException, InterruptedException {
        SimpleTask getUserInfo = new SimpleTask("get_user_info", "get_user_info");
        getUserInfo.input("name", ConductorWorkflow.input.get("name"));

        SimpleTask sendToCupertino = new SimpleTask("task2", "cupertino");
        SimpleTask sendToNYC = new SimpleTask("task2", "nyc");

        int len = 4;
        Task<?>[][] parallelTasks = new Task[len][1];
        for (int i = 0; i < len; i++) {
            parallelTasks[i][0] = new SimpleTask("task2", "task_parallel_" + i);
        }

        TestWorkflowInput defaultInput = new TestWorkflowInput();
        defaultInput.setName("defaultName");

        // The stream is kept open until registration has finished (in case the script is
        // read lazily) and is always closed afterwards.
        try (InputStream script = getClass().getResourceAsStream("/script.js")) {
            assertNotNull(script, "classpath resource /script.js not found");

            WorkflowBuilder<TestWorkflowInput> builder = new WorkflowBuilder<>(executor);
            builder.name(WORKFLOW_NAME)
                    .version(1)
                    .ownerEmail("hello@example.com")
                    .description("Example Workflow")
                    .restartable(true)
                    .variables(new WorkflowState())
                    .timeoutPolicy(WorkflowDef.TimeoutPolicy.TIME_OUT_WF, 100)
                    .defaultInput(defaultInput)
                    .add(new Javascript("js", script))
                    .add(new ForkJoin("parallel", parallelTasks))
                    .add(getUserInfo)
                    .add(
                            new Switch("decide2", "${workflow.input.zipCode}")
                                    .switchCase("95014", sendToCupertino)
                                    .switchCase("10121", sendToNYC))
                    // .add(new SubWorkflow("subflow", "sub_workflow_example", 5))
                    .add(new SimpleTask("task2", "task222"))
                    .add(new DynamicFork("dynamic_fork", new SimpleTask("fork_gen", "fork_gen")));

            ConductorWorkflow<TestWorkflowInput> workflow = builder.build();
            boolean registered = workflow.registerWorkflow(true, true);
            assertTrue(registered, "workflow registration failed");

            return workflow;
        }
    }

    /**
     * Verifies that the generated definition ends with a dynamic fork task followed by a join
     * task.
     */
    @Test
    public void verifyCreatedWorkflow() throws Exception {
        ConductorWorkflow<TestWorkflowInput> conductorWorkflow = registerTestWorkflow();
        WorkflowDef def = conductorWorkflow.toWorkflowDef();
        assertNotNull(def);

        int taskCount = def.getTasks().size();
        // assertEquals reports both values on failure and tolerates a null task type.
        assertEquals(
                TaskType.TASK_TYPE_FORK_JOIN_DYNAMIC,
                def.getTasks().get(taskCount - 2).getType());
        assertEquals(TaskType.TASK_TYPE_JOIN, def.getTasks().get(taskCount - 1).getType());
    }

    /**
     * Registers the sample workflow, executes it directly and expects it to reach
     * {@code COMPLETED} within the timeout.
     */
    @Test
    public void verifyInlineWorkflowExecution() throws ValidationError {
        TestWorkflowInput workflowInput = new TestWorkflowInput("username", "10121", "US");
        try {
            Workflow run =
                    registerTestWorkflow()
                            .execute(workflowInput)
                            .get(EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            assertEquals(
                    Workflow.WorkflowStatus.COMPLETED,
                    run.getStatus(),
                    run.getReasonForIncompletion());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            fail("Interrupted while waiting for workflow execution", e);
        } catch (Exception e) {
            LOGGER.error("Inline workflow execution failed", e);
            fail(e.getMessage(), e);
        }
    }

    /**
     * Registers the sample workflow, loads it again by name and expects an execution started
     * from that handle to finish within the timeout.
     */
    @Test
    public void testWorkflowExecutionByName()
            throws IOException, ExecutionException, InterruptedException {

        // Register the workflow first
        registerTestWorkflow();

        TestWorkflowInput input = new TestWorkflowInput("username", "10121", "US");

        ConductorWorkflow<TestWorkflowInput> conductorWorkflow =
                new ConductorWorkflow<TestWorkflowInput>(executor).from(WORKFLOW_NAME, null);

        CompletableFuture<Workflow> execution = conductorWorkflow.execute(input);
        try {
            execution.get(EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            fail("Interrupted while waiting for workflow execution", e);
        } catch (Exception e) {
            LOGGER.error("Execution of workflow '{}' by name failed", WORKFLOW_NAME, e);
            fail(e.getMessage(), e);
        }
    }

    /**
     * Expects that loading and executing a workflow under a name that was never registered
     * throws an exception.
     */
    @Test
    public void verifyWorkflowExecutionFailsIfNotExists()
            throws IOException, ExecutionException, InterruptedException {

        // Register the workflow first
        registerTestWorkflow();

        TestWorkflowInput input = new TestWorkflowInput("username", "10121", "US");

        try {
            ConductorWorkflow<TestWorkflowInput> conductorWorkflow =
                    new ConductorWorkflow<TestWorkflowInput>(executor)
                            .from("non_existent_workflow", null);
            conductorWorkflow.execute(input);
            // fail() throws an AssertionError, which the catch below does not intercept.
            fail("execution should have failed");
        } catch (Exception expected) {
            // An exception is the expected outcome for an unknown workflow name.
            LOGGER.debug("Unknown workflow was rejected as expected", expected);
        }
    }
}