Back to Repositories

Testing MySQL Queue Implementation in conductor-oss

This test suite validates the MySQL queue implementation in Conductor OSS, focusing on message queue operations, deferred message handling, and unacknowledged message processing. It ensures reliable queue behavior for the conductor-oss workflow engine.

Test Coverage Overview

The test suite provides comprehensive coverage of MySQL queue operations, with a focus on message handling, queue state management, and edge cases.

  • Complex queue operations including push, pop, and acknowledgment
  • Message existence verification and duplicate handling
  • Deferred message processing with time offsets
  • Unacknowledged message handling and queue cleanup

Implementation Analysis

The testing approach uses the Spring Boot test framework with JUnit for systematic validation of queue operations.

Key patterns include:
  • Database state verification using Flyway migrations
  • Transaction management with proper cleanup between tests
  • Asynchronous operation testing with time-based assertions
  • Integration with Spring’s dependency injection framework

Technical Details

Testing infrastructure leverages:

  • JUnit 4 test framework
  • Spring Boot Test for configuration
  • Flyway for database migration management
  • MySQL datasource configuration
  • Custom Query utility for database operations
  • ObjectMapper for JSON serialization

Best Practices Demonstrated

The test suite exemplifies high-quality testing practices through systematic validation and thorough error checking.

  • Proper test isolation with database cleanup
  • Comprehensive edge case coverage
  • Clear test method naming and organization
  • Detailed issue reproduction tests
  • Proper resource management and cleanup

conductor-oss/conductor

mysql-persistence/src/test/java/com/netflix/conductor/mysql/dao/MySQLQueueDAOTest.java

            
/*
 * Copyright 2023 Conductor Authors.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.netflix.conductor.mysql.dao;

import java.sql.Connection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import javax.sql.DataSource;

import org.flywaydb.core.Flyway;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.mysql.config.MySQLConfiguration;
import com.netflix.conductor.mysql.util.Query;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Integration tests for {@link MySQLQueueDAO}.
 *
 * <p>The tests run against the datasource wired by {@link MySQLConfiguration}; the schema is
 * wiped and re-migrated with Flyway before every test so that each test starts from an empty set
 * of queues.
 */
@ContextConfiguration(
        classes = {
            TestObjectMapperConfiguration.class,
            MySQLConfiguration.class,
            FlywayAutoConfiguration.class
        })
@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.flyway.clean-disabled=false")
public class MySQLQueueDAOTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(MySQLQueueDAOTest.class);

    /** Counts the rows of a single queue that have not been popped yet. */
    private static final String UNPOPPED_COUNT_SQL =
            "SELECT COUNT(*) FROM queue_message WHERE queue_name = ? AND popped = false";

    @Autowired private MySQLQueueDAO queueDAO;

    @Autowired private ObjectMapper objectMapper;

    @Qualifier("dataSource")
    @Autowired
    private DataSource dataSource;

    @Autowired private Flyway flyway;

    /** Cleans the database between tests by dropping everything and re-running the migrations. */
    @Before
    public void before() {
        flyway.clean();
        flyway.migrate();
    }

    /**
     * Walks a single queue through push, duplicate push, pop, ack, remove and flush, checking the
     * reported sizes after every step.
     */
    @Test
    public void complexQueueTest() {
        String queueName = "TestQueue";
        long offsetTimeInSecond = 0;
        int messageCount = 10;

        for (int i = 0; i < messageCount; i++) {
            queueDAO.push(queueName, "msg" + i, offsetTimeInSecond);
        }
        int size = queueDAO.getSize(queueName);
        assertEquals(10, size);
        Map<String, Long> details = queueDAO.queuesDetail();
        assertEquals(1, details.size());
        assertEquals(10L, details.get(queueName).longValue());

        // Re-pushing ids that are already queued; the pop below still expects exactly 10.
        pushIfNotExists(queueName, messageCount, offsetTimeInSecond);

        List<String> popped = queueDAO.pop(queueName, 10, 100);
        assertNotNull(popped);
        assertEquals(10, popped.size());

        // Everything has been popped but nothing acked yet.
        Map<String, Map<String, Map<String, Long>>> verbose = queueDAO.queuesDetailVerbose();
        assertEquals(1, verbose.size());
        long shardSize = verbose.get(queueName).get("a").get("size");
        long unackedSize = verbose.get(queueName).get("a").get("uacked");
        assertEquals(0, shardSize);
        assertEquals(10, unackedSize);

        popped.forEach(messageId -> queueDAO.ack(queueName, messageId));

        // After acking, both counters are back to zero.
        verbose = queueDAO.queuesDetailVerbose();
        assertEquals(1, verbose.size());
        shardSize = verbose.get(queueName).get("a").get("size");
        unackedSize = verbose.get(queueName).get("a").get("uacked");
        assertEquals(0, shardSize);
        assertEquals(0, unackedSize);

        popped = queueDAO.pop(queueName, 10, 100);
        assertNotNull(popped);
        assertEquals(0, popped.size());

        pushIfNotExists(queueName, messageCount, offsetTimeInSecond);
        size = queueDAO.getSize(queueName);
        assertEquals(10, size);

        for (int i = 0; i < messageCount; i++) {
            String messageId = "msg" + i;
            assertTrue(queueDAO.containsMessage(queueName, messageId));
            queueDAO.remove(queueName, messageId);
        }

        size = queueDAO.getSize(queueName);
        assertEquals(0, size);

        pushIfNotExists(queueName, messageCount, offsetTimeInSecond);
        queueDAO.flush(queueName);
        size = queueDAO.getSize(queueName);
        assertEquals(0, size);
    }

    /** Test fix for https://github.com/Netflix/conductor/issues/1892 */
    @Test
    public void containsMessageTest() {
        String queueName = "TestQueue";
        long offsetTimeInSecond = 0;

        for (int i = 0; i < 10; i++) {
            String messageId = "msg" + i;
            queueDAO.push(queueName, messageId, offsetTimeInSecond);
        }
        int size = queueDAO.getSize(queueName);
        assertEquals(10, size);

        for (int i = 0; i < 10; i++) {
            String messageId = "msg" + i;
            assertTrue(queueDAO.containsMessage(queueName, messageId));
            queueDAO.remove(queueName, messageId);
        }
        // Once removed, none of the ids may be reported as present.
        for (int i = 0; i < 10; i++) {
            String messageId = "msg" + i;
            assertFalse(queueDAO.containsMessage(queueName, messageId));
        }
    }

    /**
     * Test fix for https://github.com/Netflix/conductor/issues/399
     *
     * @throws Exception if the direct SQL verification of un-popped rows fails; propagated so the
     *     test report keeps the original stack trace
     * @since 1.8.2-rc5
     */
    @Test
    public void pollMessagesTest() throws Exception {
        final List<Message> messages = new ArrayList<>();
        final String queueName = "issue399_testQueue";
        final int totalSize = 10;

        for (int i = 0; i < totalSize; i++) {
            String payload = "{\"id\": " + i + ", \"msg\":\"test " + i + "\"}";
            Message m = new Message("testmsg-" + i, payload, "");
            if (i % 2 == 0) {
                // Set priority on messages with an even id
                m.setPriority(99 - i);
            }
            messages.add(m);
        }

        // Populate the queue with our test message batch
        queueDAO.push(queueName, ImmutableList.copyOf(messages));

        // Assert that all messages were persisted and no extras are in there
        assertEquals("Queue size mismatch", totalSize, queueDAO.getSize(queueName));

        final int firstPollSize = 3;
        List<Message> firstPoll = queueDAO.pollMessages(queueName, firstPollSize, 10_000);
        assertNotNull("First poll was null", firstPoll);
        assertFalse("First poll was empty", firstPoll.isEmpty());
        assertEquals("First poll size mismatch", firstPollSize, firstPoll.size());

        final int secondPollSize = 4;
        List<Message> secondPoll = queueDAO.pollMessages(queueName, secondPollSize, 10_000);
        assertNotNull("Second poll was null", secondPoll);
        assertFalse("Second poll was empty", secondPoll.isEmpty());
        assertEquals("Second poll size mismatch", secondPollSize, secondPoll.size());

        // Assert that the total queue size hasn't changed
        assertEquals(
                "Total queue size should have remained the same",
                totalSize,
                queueDAO.getSize(queueName));

        // Assert that our un-popped messages match our expected size
        final long expectedSize = totalSize - firstPollSize - secondPollSize;
        assertEquals(
                "Remaining queue size mismatch", expectedSize, countUnpoppedMessages(queueName));
    }

    /**
     * Test fix for https://github.com/Netflix/conductor/issues/448
     *
     * @throws InterruptedException if the wait between the two polls is interrupted
     * @throws Exception if the direct SQL verification of un-popped rows fails; propagated so the
     *     test report keeps the original stack trace
     * @since 1.8.2-rc5
     */
    @Test
    public void pollDeferredMessagesTest() throws Exception {
        final List<Message> messages = new ArrayList<>();
        final String queueName = "issue448_testQueue";
        final int totalSize = 10;
        // Delay for the messages meant to show up in the second poll; the sleep further down
        // waits for the same amount of time.
        final int deferredOffsetSeconds = 5;

        for (int i = 0; i < totalSize; i++) {
            final int offset;
            if (i < 5) {
                // Immediately available.
                offset = 0;
            } else if (i == 6 || i == 7) {
                // Purposefully skipping id:5 to test out of order deliveries.
                // id:6 and id:7 are delayed so they are picked up in the second polling batch.
                offset = deferredOffsetSeconds;
            } else {
                // Set all other queue messages to have enough of a delay that they won't
                // accidentally be picked up.
                offset = 10_000 + i;
            }

            String payload = "{\"id\": " + i + ",\"offset_time_seconds\":" + offset + "}";
            Message m = new Message("testmsg-" + i, payload, "");
            messages.add(m);
            queueDAO.push(queueName, "testmsg-" + i, offset);
        }

        // Assert that all messages were persisted and no extras are in there
        assertEquals("Queue size mismatch", totalSize, queueDAO.getSize(queueName));

        final int firstPollSize = 4;
        List<Message> firstPoll = queueDAO.pollMessages(queueName, firstPollSize, 100);
        assertNotNull("First poll was null", firstPoll);
        assertFalse("First poll was empty", firstPoll.isEmpty());
        assertEquals("First poll size mismatch", firstPollSize, firstPoll.size());

        // Five messages (id:0..4) are immediately available but only four are polled, so any of
        // those five ids is acceptable in the first batch.
        List<String> firstPollMessageIds =
                messages.stream()
                        .map(Message::getId)
                        .collect(Collectors.toList())
                        .subList(0, firstPollSize + 1);

        for (Message polled : firstPoll) {
            String actual = polled.getId();
            assertTrue("Unexpected Id: " + actual, firstPollMessageIds.contains(actual));
        }

        final int secondPollSize = 3;

        // Sleep a bit to get the next batch of messages
        LOGGER.debug("Sleeping for second poll...");
        Thread.sleep(deferredOffsetSeconds * 1_000L);

        // Poll for many more messages than expected
        List<Message> secondPoll = queueDAO.pollMessages(queueName, secondPollSize + 10, 100);
        assertNotNull("Second poll was null", secondPoll);
        assertFalse("Second poll was empty", secondPoll.isEmpty());
        assertEquals("Second poll size mismatch", secondPollSize, secondPoll.size());

        List<String> expectedIds = Arrays.asList("testmsg-4", "testmsg-6", "testmsg-7");
        for (Message polled : secondPoll) {
            String actual = polled.getId();
            assertTrue("Unexpected Id: " + actual, expectedIds.contains(actual));
        }

        // Assert that the total queue size hasn't changed
        assertEquals(
                "Total queue size should have remained the same",
                totalSize,
                queueDAO.getSize(queueName));

        // Assert that our un-popped messages match our expected size
        final long expectedSize = totalSize - firstPollSize - secondPollSize;
        assertEquals(
                "Remaining queue size mismatch", expectedSize, countUnpoppedMessages(queueName));
    }

    /**
     * Checks the un-acked counters around {@code processUnacks} for the targeted queue and for a
     * second, untargeted queue.
     */
    @Test
    public void processUnacksTest() {
        final String queueName = "process_unacks_test";
        // Count of messages in the queue(s)
        final int count = 10;
        // Number of messages to process acks for
        final int unackedCount = 4;
        // A secondary queue to make sure we don't accidentally process other queues
        final String otherQueueName = "process_unacks_test_other_queue";

        // Create testing queue with some messages (but not all) that will be popped/acked.
        for (int i = 0; i < count; i++) {
            // Messages beyond the first unackedCount get a large offset so the poll below
            // does not return them.
            int offset = i >= unackedCount ? 1_000_000 : 0;
            queueDAO.push(queueName, "unack-" + i, offset);
        }

        // Create a second queue to make sure that unacks don't occur for it
        for (int i = 0; i < count; i++) {
            queueDAO.push(otherQueueName, "other-" + i, 0);
        }

        // Poll for first batch of messages (should be equal to unackedCount)
        List<Message> polled = queueDAO.pollMessages(queueName, 100, 10_000);
        assertNotNull(polled);
        assertFalse(polled.isEmpty());
        assertEquals(unackedCount, polled.size());

        // Poll messages from the other queue so we know they don't get unacked later
        queueDAO.pollMessages(otherQueueName, 100, 10_000);

        // Ack one of the polled messages
        assertTrue(queueDAO.ack(queueName, "unack-1"));

        // Should have one less un-acked popped message in the queue
        Long uacked = queueDAO.queuesDetailVerbose().get(queueName).get("a").get("uacked");
        assertNotNull(uacked);
        assertEquals(unackedCount - 1, uacked.longValue());

        // Process unacks
        queueDAO.processUnacks(queueName);

        // Check uacks for both queues after processing
        Map<String, Map<String, Map<String, Long>>> details = queueDAO.queuesDetailVerbose();
        uacked = details.get(queueName).get("a").get("uacked");
        assertNotNull(uacked);
        assertEquals(
                "The messages that were polled should be unacked still",
                unackedCount - 1,
                uacked.longValue());

        Long otherUacked = details.get(otherQueueName).get("a").get("uacked");
        assertNotNull(otherUacked);
        assertEquals(
                "Other queue should have all unacked messages", count, otherUacked.longValue());

        Long size = queueDAO.queuesDetail().get(queueName);
        assertNotNull(size);
        assertEquals(count - unackedCount, size.longValue());
    }

    /**
     * Calls {@code pushIfNotExists} for the ids {@code msg0 .. msg(count-1)} on the given queue.
     *
     * @param queueName queue to push to
     * @param count number of consecutive message ids to push
     * @param offsetTimeInSecond offset passed through to the DAO for every message
     */
    private void pushIfNotExists(String queueName, int count, long offsetTimeInSecond) {
        for (int i = 0; i < count; i++) {
            queueDAO.pushIfNotExists(queueName, "msg" + i, offsetTimeInSecond);
        }
    }

    /**
     * Counts, directly in the {@code queue_message} table, the rows of a queue whose {@code
     * popped} flag is false.
     *
     * @param queueName queue whose un-popped rows are counted
     * @return number of matching rows
     * @throws Exception if obtaining the connection or running the query fails
     */
    private long countUnpoppedMessages(String queueName) throws Exception {
        try (Connection connection = dataSource.getConnection();
                Query query = new Query(objectMapper, connection, UNPOPPED_COUNT_SQL)) {
            return query.addParameter(queueName).executeCount();
        }
    }
}