Back to Repositories

Testing Workflow Status Publisher Integration in Conductor-OSS

This test suite validates the workflow status publisher functionality in Conductor, focusing on the integration between workflow execution events and message queues. It verifies how the system publishes workflow status updates to configured queues when workflows are terminated or completed.

Test Coverage Overview

The test suite provides comprehensive coverage of workflow status publishing mechanisms:

  • Workflow termination event publishing and validation
  • Workflow completion notification handling
  • Message queue integration for status updates
  • Correlation ID and workflow metadata verification
  • Queue acknowledgment and message payload validation

Implementation Analysis

The testing approach uses the Spring Boot test framework with JUnit 4 for integration testing. It configures the queue_publisher workflow status listener and verifies the published notifications by polling the target queues.

Key patterns include:
  • Spring test context configuration
  • In-memory database testing
  • Queue message polling and verification
  • Workflow execution state management

Technical Details

Testing tools and configuration:

  • SpringRunner for test execution
  • In-memory database configuration
  • QueueDAO for message queue operations
  • ObjectMapper for JSON payload handling
  • Custom test property sources
  • Workflow status listener configuration

Best Practices Demonstrated

The test suite exemplifies several testing best practices:

  • Proper test setup and cleanup
  • Isolation of test cases
  • Comprehensive assertion checks
  • Clear test method naming
  • Proper resource management
  • Thorough error case handling

conductor-oss/conductor

workflow-event-listener/src/test/java/com/netflix/conductor/test/listener/WorkflowStatusPublisherIntegrationTest.java

            
/*
 * Copyright 2023 Conductor Authors.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.netflix.conductor.test.listener;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;

import com.netflix.conductor.ConductorTestApp;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.service.ExecutionService;
import com.netflix.conductor.service.MetadataService;
import com.netflix.conductor.service.WorkflowService;

import com.fasterxml.jackson.databind.ObjectMapper;

import static com.netflix.conductor.common.metadata.tasks.Task.Status.COMPLETED;

import static org.junit.Assert.assertEquals;

/**
 * Integration test for the {@code queue_publisher} workflow status listener.
 *
 * <p>The Spring context is started with an in-memory database and a listener configuration in
 * which the success and the failure queue are both named {@code dummy} and the finalize queue is
 * named {@code final}. Each test drives a single-task workflow to a terminal state and then polls
 * those queues to verify the published {@link WorkflowSummary}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(
        classes = ConductorTestApp.class,
        properties = {
            "conductor.db.type=memory",
            "conductor.workflow-status-listener.type=queue_publisher",
            "conductor.workflow-status-listener.queue-publisher.successQueue=dummy",
            "conductor.workflow-status-listener.queue-publisher.failureQueue=dummy",
            "conductor.workflow-status-listener.queue-publisher.finalizeQueue=final",
            "conductor.app.workflow.name-validation.enabled=true"
        })
@TestPropertySource(locations = "classpath:application-integrationtest.properties")
public class WorkflowStatusPublisherIntegrationTest {

    /** Must match the successQueue/failureQueue properties configured on the class. */
    private static final String CALLBACK_QUEUE = "dummy";

    /** Must match the finalizeQueue property configured on the class. */
    private static final String FINALIZED_QUEUE = "final";

    private static final String LINEAR_WORKFLOW_T1_T2 = "junit_test_wf";
    private static final String TASK_NAME = "junit_task_1";
    private static final int WORKFLOW_VERSION = 1;
    private static final String INCOMPLETION_REASON = "test reason";

    /** Owner e-mail attached to the task and workflow definitions registered by this test. */
    private static final String DEFAULT_OWNER_EMAIL = "test@harness.com";

    /** Maximum number of messages requested per poll; each test expects exactly one. */
    private static final int POLL_COUNT = 1;

    /** Poll timeout handed to {@link QueueDAO#pollMessages}. */
    private static final int POLL_TIMEOUT = 200;

    @Autowired private ObjectMapper objectMapper;

    @Autowired QueueDAO queueDAO;

    @Autowired protected MetadataService metadataService;

    @Autowired protected ExecutionService workflowExecutionService;

    @Autowired protected WorkflowService workflowExecutor;

    /** Registers the task definition used by the workflow under test. */
    @Before
    public void setUp() {
        TaskDef taskDef = new TaskDef();
        taskDef.setName(TASK_NAME);
        taskDef.setTimeoutSeconds(120);
        taskDef.setResponseTimeoutSeconds(120);
        taskDef.setRetryCount(1);
        taskDef.setOwnerEmail(DEFAULT_OWNER_EMAIL);
        metadataService.registerTaskDef(Collections.singletonList(taskDef));
    }

    /**
     * Terminates every workflow that is still running for any registered definition and then
     * flushes all queues, so messages produced by one test are not visible to the next.
     */
    @After
    public void cleanUp() {
        List<String> workflows =
                metadataService.getWorkflowDefs().stream()
                        .map(WorkflowDef::getName)
                        .collect(Collectors.toList());
        for (String wfName : workflows) {
            List<String> running =
                    workflowExecutionService.getRunningWorkflows(wfName, WORKFLOW_VERSION);
            for (String wfid : running) {
                workflowExecutor.terminateWorkflow(wfid, "cleanup");
            }
        }
        // Flush after terminating: the terminations above may themselves publish messages.
        queueDAO.queuesDetail().keySet().forEach(queueDAO::flush);
    }

    /**
     * Starts a workflow, terminates it, and verifies that a TERMINATED summary carrying the
     * termination reason is published to both the callback queue and the finalize queue.
     */
    @Test
    public void testListenerOnTerminatedWorkflow() throws IOException {
        // Register the definition here as well so this test does not rely on
        // testListenerOnCompletedWorkflow having run first.
        registerLinearWorkflowDef();

        String correlationId = "testWorkflowTerminatedListener";
        String id =
                startOrLoadWorkflowExecution(
                        LINEAR_WORKFLOW_T1_T2, WORKFLOW_VERSION, correlationId, new HashMap<>());
        workflowExecutor.terminateWorkflow(id, INCOMPLETION_REASON);

        WorkflowSummary payload =
                pollAndVerifySummary(
                        CALLBACK_QUEUE, id, correlationId, Workflow.WorkflowStatus.TERMINATED);
        assertEquals(INCOMPLETION_REASON, payload.getReasonForIncompletion());

        // check finalized queue
        payload =
                pollAndVerifySummary(
                        FINALIZED_QUEUE, id, correlationId, Workflow.WorkflowStatus.TERMINATED);
        assertEquals(INCOMPLETION_REASON, payload.getReasonForIncompletion());
    }

    /**
     * Starts a workflow, completes its only task, and verifies that a COMPLETED summary is
     * published to both the callback queue and the finalize queue.
     */
    @Test
    public void testListenerOnCompletedWorkflow() throws IOException, InterruptedException {
        WorkflowDef workflowDef = registerLinearWorkflowDef();

        String correlationId = "testWorkflowCompletedListener";
        String id =
                startOrLoadWorkflowExecution(
                        workflowDef.getName(), WORKFLOW_VERSION, correlationId, new HashMap<>());

        // Complete the single scheduled task so the workflow can reach COMPLETED.
        List<Task> tasks = workflowExecutionService.getTasks(TASK_NAME, null, 1);
        tasks.get(0).setStatus(COMPLETED);
        workflowExecutionService.updateTask(new TaskResult(tasks.get(0)));

        checkIfWorkflowIsCompleted(id);

        pollAndVerifySummary(CALLBACK_QUEUE, id, correlationId, Workflow.WorkflowStatus.COMPLETED);

        // check finalized queue
        pollAndVerifySummary(FINALIZED_QUEUE, id, correlationId, Workflow.WorkflowStatus.COMPLETED);
    }

    /**
     * Creates or updates the single-task workflow definition used by the tests, with the workflow
     * status listener enabled.
     *
     * @return the definition that was submitted to the metadata service
     */
    private WorkflowDef registerLinearWorkflowDef() {
        WorkflowDef workflowDef = new WorkflowDef();
        workflowDef.setName(LINEAR_WORKFLOW_T1_T2);
        workflowDef.setDescription(workflowDef.getName());
        workflowDef.setVersion(WORKFLOW_VERSION);
        workflowDef.setSchemaVersion(2);
        workflowDef.setOwnerEmail(DEFAULT_OWNER_EMAIL);
        workflowDef.setWorkflowStatusListenerEnabled(true);

        WorkflowTask wft1 = new WorkflowTask();
        wft1.setName(TASK_NAME);
        wft1.setTaskReferenceName("t1");

        LinkedList<WorkflowTask> wftasks = new LinkedList<>();
        wftasks.add(wft1);
        workflowDef.setTasks(wftasks);

        metadataService.updateWorkflowDef(Collections.singletonList(workflowDef));
        return workflowDef;
    }

    /**
     * Polls one message from {@code queueName}, acknowledges it on that same queue, and checks
     * that it is the status notification for the given workflow.
     *
     * @param queueName queue to poll and to acknowledge the message on
     * @param workflowId expected message id (the listener is expected to use the workflow id)
     * @param correlationId expected correlation id in the payload
     * @param expectedStatus expected workflow status in the payload
     * @return the deserialized payload, for any further test-specific assertions
     * @throws IOException if the message payload cannot be deserialized
     */
    private WorkflowSummary pollAndVerifySummary(
            String queueName,
            String workflowId,
            String correlationId,
            Workflow.WorkflowStatus expectedStatus)
            throws IOException {
        List<Message> messages = queueDAO.pollMessages(queueName, POLL_COUNT, POLL_TIMEOUT);
        // Fail with a readable message instead of an IndexOutOfBoundsException on get(0).
        assertEquals(
                "number of messages polled from queue '" + queueName + "'", 1, messages.size());

        Message message = messages.get(0);
        queueDAO.ack(queueName, message.getId());

        WorkflowSummary payload =
                objectMapper.readValue(message.getPayload(), WorkflowSummary.class);
        assertEquals(workflowId, message.getId());
        assertEquals(LINEAR_WORKFLOW_T1_T2, payload.getWorkflowType());
        assertEquals(correlationId, payload.getCorrelationId());
        assertEquals(expectedStatus, payload.getStatus());
        return payload;
    }

    /**
     * Waits for the workflow to reach COMPLETED, re-checking its status every 100 ms.
     *
     * <p>The wait is bounded: after the retry budget is exhausted the method returns without
     * failing, leaving it to the caller's subsequent assertions to detect a workflow that never
     * completed.
     *
     * @param id id of the workflow execution to watch
     * @throws InterruptedException if the thread is interrupted while sleeping between checks
     */
    @SuppressWarnings("BusyWait")
    private void checkIfWorkflowIsCompleted(String id) throws InterruptedException {
        int statusRetrieveAttempts = 0;
        while (workflowExecutor.getExecutionStatus(id, false).getStatus()
                != Workflow.WorkflowStatus.COMPLETED) {
            if (statusRetrieveAttempts > 5) {
                break;
            }
            Thread.sleep(100);
            statusRetrieveAttempts++;
        }
    }

    /**
     * Starts a new execution of the named workflow.
     *
     * @param workflowName name of the registered workflow definition
     * @param version version of the workflow definition
     * @param correlationId correlation id to attach to the execution
     * @param input workflow input
     * @return the value returned by {@code WorkflowService#startWorkflow}, used by the tests as
     *     the workflow id
     */
    private String startOrLoadWorkflowExecution(
            String workflowName, int version, String correlationId, Map<String, Object> input) {
        StartWorkflowRequest startWorkflowInput = new StartWorkflowRequest();
        startWorkflowInput.setName(workflowName);
        startWorkflowInput.setVersion(version);
        startWorkflowInput.setCorrelationId(correlationId);
        startWorkflowInput.setInput(input);
        return workflowExecutor.startWorkflow(startWorkflowInput);
    }
}