Back to Repositories

Testing gRPC Integration Endpoints in Conductor-OSS

This test suite implements comprehensive integration testing for the Conductor OSS gRPC endpoints, validating workflow execution, task management, and metadata operations through gRPC communication channels. The suite ensures reliable communication between Conductor services and clients using gRPC protocols.

Test Coverage Overview

The test suite provides extensive coverage of Conductor’s gRPC functionality:
  • Workflow lifecycle management including creation, execution, and termination
  • Task definition registration and retrieval
  • Task polling and status updates
  • Workflow and task search capabilities
  • Event handler registration and management

Implementation Analysis

The testing approach utilizes Spring Boot test framework with JUnit, implementing a comprehensive end-to-end testing strategy. The suite employs abstract class patterns to provide reusable test infrastructure, with specific gRPC client implementations for different service endpoints (TaskClient, WorkflowClient, MetadataClient, EventClient).

The implementation leverages Spring’s TestPropertySource for configuration management and includes proper test isolation through SpringRunner.

Technical Details

Key technical components include:
  • Spring Boot Test framework with dedicated test properties
  • gRPC server configuration on port 8092
  • Custom gRPC clients for different service endpoints
  • JUnit assertions for validation
  • Integration with Conductor’s core services
  • Custom test property source configuration

Best Practices Demonstrated

The test suite exemplifies several testing best practices:
  • Proper separation of concerns through abstract base classes
  • Comprehensive assertion coverage for all operations
  • Systematic workflow state verification
  • Thorough error handling and validation
  • Clear test setup and teardown procedures
  • Efficient test data management

conductor-oss/conductor

test-util/src/test/java/com/netflix/conductor/test/integration/grpc/AbstractGrpcEndToEndTest.java

            
/*
 * Copyright 2023 Conductor Authors.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.netflix.conductor.test.integration.grpc;

import java.util.*;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;

import com.netflix.conductor.ConductorTestApp;
import com.netflix.conductor.client.grpc.EventClient;
import com.netflix.conductor.client.grpc.MetadataClient;
import com.netflix.conductor.client.grpc.TaskClient;
import com.netflix.conductor.client.grpc.WorkflowClient;
import com.netflix.conductor.common.metadata.events.EventHandler;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskDef.TimeoutPolicy;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.Workflow.WorkflowStatus;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.test.integration.AbstractEndToEndTest;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

@RunWith(SpringRunner.class)
@SpringBootTest(
        classes = ConductorTestApp.class,
        properties = {"conductor.grpc-server.enabled=true", "conductor.grpc-server.port=8092"})
@TestPropertySource(locations = "classpath:application-integrationtest.properties")
public abstract class AbstractGrpcEndToEndTest extends AbstractEndToEndTest {

    protected static TaskClient taskClient;
    protected static WorkflowClient workflowClient;
    protected static MetadataClient metadataClient;
    protected static EventClient eventClient;

    @Override
    protected String startWorkflow(String workflowExecutionName, WorkflowDef workflowDefinition) {
        StartWorkflowRequest workflowRequest =
                new StartWorkflowRequest()
                        .withName(workflowExecutionName)
                        .withWorkflowDef(workflowDefinition);
        return workflowClient.startWorkflow(workflowRequest);
    }

    @Override
    protected Workflow getWorkflow(String workflowId, boolean includeTasks) {
        return workflowClient.getWorkflow(workflowId, includeTasks);
    }

    @Override
    protected TaskDef getTaskDefinition(String taskName) {
        return metadataClient.getTaskDef(taskName);
    }

    @Override
    protected void registerTaskDefinitions(List<TaskDef> taskDefinitionList) {
        metadataClient.registerTaskDefs(taskDefinitionList);
    }

    @Override
    protected void registerWorkflowDefinition(WorkflowDef workflowDefinition) {
        metadataClient.registerWorkflowDef(workflowDefinition);
    }

    @Override
    protected void registerEventHandler(EventHandler eventHandler) {
        eventClient.registerEventHandler(eventHandler);
    }

    @Override
    protected Iterator<EventHandler> getEventHandlers(String event, boolean activeOnly) {
        return eventClient.getEventHandlers(event, activeOnly);
    }

    @Test
    public void testAll() throws Exception {
        assertNotNull(taskClient);
        List<TaskDef> defs = new LinkedList<>();
        for (int i = 0; i < 5; i++) {
            TaskDef def = new TaskDef("t" + i, "task " + i, DEFAULT_EMAIL_ADDRESS, 3, 60, 60);
            def.setTimeoutPolicy(TimeoutPolicy.RETRY);
            defs.add(def);
        }
        metadataClient.registerTaskDefs(defs);

        for (int i = 0; i < 5; i++) {
            final String taskName = "t" + i;
            TaskDef def = metadataClient.getTaskDef(taskName);
            assertNotNull(def);
            assertEquals(taskName, def.getName());
        }

        WorkflowDef def = createWorkflowDefinition("test" + UUID.randomUUID());
        WorkflowTask t0 = createWorkflowTask("t0");
        WorkflowTask t1 = createWorkflowTask("t1");

        def.getTasks().add(t0);
        def.getTasks().add(t1);

        metadataClient.registerWorkflowDef(def);
        WorkflowDef found = metadataClient.getWorkflowDef(def.getName(), null);
        assertNotNull(found);
        assertEquals(def, found);

        String correlationId = "test_corr_id";
        StartWorkflowRequest startWf = new StartWorkflowRequest();
        startWf.setName(def.getName());
        startWf.setCorrelationId(correlationId);

        String workflowId = workflowClient.startWorkflow(startWf);
        assertNotNull(workflowId);

        Workflow workflow = workflowClient.getWorkflow(workflowId, false);
        assertEquals(0, workflow.getTasks().size());
        assertEquals(workflowId, workflow.getWorkflowId());

        workflow = workflowClient.getWorkflow(workflowId, true);
        assertNotNull(workflow);
        assertEquals(WorkflowStatus.RUNNING, workflow.getStatus());
        assertEquals(1, workflow.getTasks().size());
        assertEquals(t0.getTaskReferenceName(), workflow.getTasks().get(0).getReferenceTaskName());
        assertEquals(workflowId, workflow.getWorkflowId());

        List<String> runningIds =
                workflowClient.getRunningWorkflow(def.getName(), def.getVersion());
        assertNotNull(runningIds);
        assertEquals(1, runningIds.size());
        assertEquals(workflowId, runningIds.get(0));

        List<Task> polled =
                taskClient.batchPollTasksByTaskType("non existing task", "test", 1, 100);
        assertNotNull(polled);
        assertEquals(0, polled.size());

        polled = taskClient.batchPollTasksByTaskType(t0.getName(), "test", 1, 100);
        assertNotNull(polled);
        assertEquals(1, polled.size());
        assertEquals(t0.getName(), polled.get(0).getTaskDefName());
        Task task = polled.get(0);

        task.getOutputData().put("key1", "value1");
        task.setStatus(Status.COMPLETED);
        taskClient.updateTask(new TaskResult(task));

        polled = taskClient.batchPollTasksByTaskType(t0.getName(), "test", 1, 100);
        assertNotNull(polled);
        assertTrue(polled.toString(), polled.isEmpty());

        workflow = workflowClient.getWorkflow(workflowId, true);
        assertNotNull(workflow);
        assertEquals(WorkflowStatus.RUNNING, workflow.getStatus());
        assertEquals(2, workflow.getTasks().size());
        assertEquals(t0.getTaskReferenceName(), workflow.getTasks().get(0).getReferenceTaskName());
        assertEquals(t1.getTaskReferenceName(), workflow.getTasks().get(1).getReferenceTaskName());
        assertEquals(Status.COMPLETED, workflow.getTasks().get(0).getStatus());
        assertEquals(Status.SCHEDULED, workflow.getTasks().get(1).getStatus());

        Task taskById = taskClient.getTaskDetails(task.getTaskId());
        assertNotNull(taskById);
        assertEquals(task.getTaskId(), taskById.getTaskId());

        Thread.sleep(1000);
        SearchResult<WorkflowSummary> searchResult =
                workflowClient.search("workflowType='" + def.getName() + "'");
        assertNotNull(searchResult);
        assertEquals(1, searchResult.getTotalHits());
        assertEquals(workflow.getWorkflowId(), searchResult.getResults().get(0).getWorkflowId());

        SearchResult<Workflow> searchResultV2 =
                workflowClient.searchV2("workflowType='" + def.getName() + "'");
        assertNotNull(searchResultV2);
        assertEquals(1, searchResultV2.getTotalHits());
        assertEquals(workflow.getWorkflowId(), searchResultV2.getResults().get(0).getWorkflowId());

        SearchResult<WorkflowSummary> searchResultAdvanced =
                workflowClient.search(0, 1, null, null, "workflowType='" + def.getName() + "'");
        assertNotNull(searchResultAdvanced);
        assertEquals(1, searchResultAdvanced.getTotalHits());
        assertEquals(
                workflow.getWorkflowId(), searchResultAdvanced.getResults().get(0).getWorkflowId());

        SearchResult<Workflow> searchResultV2Advanced =
                workflowClient.searchV2(0, 1, null, null, "workflowType='" + def.getName() + "'");
        assertNotNull(searchResultV2Advanced);
        assertEquals(1, searchResultV2Advanced.getTotalHits());
        assertEquals(
                workflow.getWorkflowId(),
                searchResultV2Advanced.getResults().get(0).getWorkflowId());

        SearchResult<TaskSummary> taskSearchResult =
                taskClient.search("taskType='" + t0.getName() + "'");
        assertNotNull(taskSearchResult);
        assertEquals(1, searchResultV2Advanced.getTotalHits());
        assertEquals(t0.getName(), taskSearchResult.getResults().get(0).getTaskDefName());

        SearchResult<TaskSummary> taskSearchResultAdvanced =
                taskClient.search(0, 1, null, null, "taskType='" + t0.getName() + "'");
        assertNotNull(taskSearchResultAdvanced);
        assertEquals(1, taskSearchResultAdvanced.getTotalHits());
        assertEquals(t0.getName(), taskSearchResultAdvanced.getResults().get(0).getTaskDefName());

        SearchResult<Task> taskSearchResultV2 =
                taskClient.searchV2("taskType='" + t0.getName() + "'");
        assertNotNull(taskSearchResultV2);
        assertEquals(1, searchResultV2Advanced.getTotalHits());
        assertEquals(
                t0.getTaskReferenceName(),
                taskSearchResultV2.getResults().get(0).getReferenceTaskName());

        SearchResult<Task> taskSearchResultV2Advanced =
                taskClient.searchV2(0, 1, null, null, "taskType='" + t0.getName() + "'");
        assertNotNull(taskSearchResultV2Advanced);
        assertEquals(1, taskSearchResultV2Advanced.getTotalHits());
        assertEquals(
                t0.getTaskReferenceName(),
                taskSearchResultV2Advanced.getResults().get(0).getReferenceTaskName());

        workflowClient.terminateWorkflow(workflowId, "terminate reason");
        workflow = workflowClient.getWorkflow(workflowId, true);
        assertNotNull(workflow);
        assertEquals(WorkflowStatus.TERMINATED, workflow.getStatus());

        workflowClient.restart(workflowId, false);
        workflow = workflowClient.getWorkflow(workflowId, true);
        assertNotNull(workflow);
        assertEquals(WorkflowStatus.RUNNING, workflow.getStatus());
        assertEquals(1, workflow.getTasks().size());
    }
}