Back to Repositories

Testing PostgreSQL Queue Management Operations in Conductor OSS

This is a comprehensive test suite for the PostgreSQL queue operations in the Conductor OSS framework. The test class validates queue management functionality, including pushing, polling, and acknowledging messages as well as handling deferred messages.

Test Coverage Overview

The test suite provides extensive coverage of PostgreSQL queue operations with focus on message handling and queue state management.

  • Complex queue operations including push, pop, and ack
  • Message polling with priority handling
  • Deferred message processing
  • Unacknowledged message handling (these tests are present but currently disabled)
  • Queue size and state verification

Implementation Analysis

The tests use JUnit 4 together with the Spring Boot Test framework to run integration tests against a real database.

Key patterns include:
  • Database cleanup between tests using @Before
  • Spring context configuration for test environment
  • Flyway integration for database migrations
  • Transaction management and connection handling

Technical Details

Testing infrastructure leverages:

  • Spring Boot Test framework
  • JUnit 4 test runner
  • Flyway for database migrations
  • PostgreSQL datasource configuration
  • ObjectMapper for JSON handling
  • Custom Query utility class

Best Practices Demonstrated

The test suite showcases several testing best practices for database integration testing.

  • Proper test isolation through database cleanup
  • Comprehensive edge case handling
  • Issue-specific regression tests
  • Clear test method organization
  • Detailed assertion messages

conductor-oss/conductor

postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresQueueDAOTest.java

            
/*
 * Copyright 2023 Conductor Authors.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.netflix.conductor.postgres.dao;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import javax.sql.DataSource;

import org.flywaydb.core.Flyway;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.postgres.config.PostgresConfiguration;
import com.netflix.conductor.postgres.util.Query;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Integration tests for {@link PostgresQueueDAO}.
 *
 * <p>The tests run against the {@link DataSource} provided by the Spring test context and exercise
 * pushing, popping/polling, acknowledging, removing and flushing queue messages. The {@code queue}
 * and {@code queue_message} tables are truncated before every test so that each test starts from
 * an empty queue state.
 */
@ContextConfiguration(
        classes = {
            TestObjectMapperConfiguration.class,
            PostgresConfiguration.class,
            FlywayAutoConfiguration.class
        })
@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.flyway.clean-disabled=false")
public class PostgresQueueDAOTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresQueueDAOTest.class);

    @Autowired private PostgresQueueDAO queueDAO;

    @Qualifier("dataSource")
    @Autowired
    private DataSource dataSource;

    @Autowired private ObjectMapper objectMapper;

    @Rule public TestName name = new TestName();

    @Autowired Flyway flyway;

    /**
     * Cleans the database between tests by truncating the {@code queue} and {@code queue_message}
     * tables.
     *
     * @throws RuntimeException if a connection cannot be obtained or a truncate statement fails;
     *     the original exception is preserved as the cause
     */
    @Before
    public void before() {
        final String[] stmts = {"truncate table queue;", "truncate table queue_message;"};
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(true);
            for (String stmt : stmts) {
                // Close every statement explicitly instead of leaking it until the
                // connection is closed.
                try (PreparedStatement ps = conn.prepareStatement(stmt)) {
                    ps.executeUpdate();
                }
            }
        } catch (Exception e) {
            // The cause carries the full stack trace, so no separate printStackTrace() is needed.
            throw new RuntimeException("Failed to truncate queue tables before test", e);
        }
    }

    /**
     * Walks a single queue through push, pushIfNotExists, pop, ack, remove and flush, checking the
     * reported sizes after each step.
     */
    @Test
    public void complexQueueTest() {
        String queueName = "TestQueue";
        long offsetTimeInSecond = 0;

        for (int i = 0; i < 10; i++) {
            String messageId = "msg" + i;
            queueDAO.push(queueName, messageId, offsetTimeInSecond);
        }
        int size = queueDAO.getSize(queueName);
        assertEquals(10, size);
        Map<String, Long> details = queueDAO.queuesDetail();
        assertEquals(1, details.size());
        assertEquals(10L, details.get(queueName).longValue());

        // Re-pushing the same ids must not create duplicates: the pop below still expects 10.
        for (int i = 0; i < 10; i++) {
            String messageId = "msg" + i;
            queueDAO.pushIfNotExists(queueName, messageId, offsetTimeInSecond);
        }

        List<String> popped = queueDAO.pop(queueName, 10, 100);
        assertNotNull(popped);
        assertEquals(10, popped.size());

        // After popping everything, all 10 messages are reported as un-acked and none as queued.
        Map<String, Map<String, Map<String, Long>>> verbose = queueDAO.queuesDetailVerbose();
        assertEquals(1, verbose.size());
        long shardSize = verbose.get(queueName).get("a").get("size");
        long unackedSize = verbose.get(queueName).get("a").get("uacked");
        assertEquals(0, shardSize);
        assertEquals(10, unackedSize);

        popped.forEach(messageId -> queueDAO.ack(queueName, messageId));

        verbose = queueDAO.queuesDetailVerbose();
        assertEquals(1, verbose.size());
        shardSize = verbose.get(queueName).get("a").get("size");
        unackedSize = verbose.get(queueName).get("a").get("uacked");
        assertEquals(0, shardSize);
        assertEquals(0, unackedSize);

        popped = queueDAO.pop(queueName, 10, 100);
        assertNotNull(popped);
        assertEquals(0, popped.size());

        for (int i = 0; i < 10; i++) {
            String messageId = "msg" + i;
            queueDAO.pushIfNotExists(queueName, messageId, offsetTimeInSecond);
        }
        size = queueDAO.getSize(queueName);
        assertEquals(10, size);

        for (int i = 0; i < 10; i++) {
            String messageId = "msg" + i;
            assertTrue(queueDAO.containsMessage(queueName, messageId));
            queueDAO.remove(queueName, messageId);
        }

        size = queueDAO.getSize(queueName);
        assertEquals(0, size);

        for (int i = 0; i < 10; i++) {
            String messageId = "msg" + i;
            queueDAO.pushIfNotExists(queueName, messageId, offsetTimeInSecond);
        }
        queueDAO.flush(queueName);
        size = queueDAO.getSize(queueName);
        assertEquals(0, size);
    }

    /**
     * Test fix for https://github.com/Netflix/conductor/issues/399
     *
     * @since 1.8.2-rc5
     */
    @Test
    public void pollMessagesTest() {
        final List<Message> messages = new ArrayList<>();
        final String queueName = "issue399_testQueue";
        final int totalSize = 10;

        for (int i = 0; i < totalSize; i++) {
            String payload = "{\"id\": " + i + ", \"msg\":\"test " + i + "\"}";
            Message m = new Message("testmsg-" + i, payload, "");
            if (i % 2 == 0) {
                // Set a priority only on messages with an even id
                m.setPriority(99 - i);
            }
            messages.add(m);
        }

        // Populate the queue with our test message batch
        queueDAO.push(queueName, ImmutableList.copyOf(messages));

        // Assert that all messages were persisted and no extras are in there
        assertEquals("Queue size mismatch", totalSize, queueDAO.getSize(queueName));

        List<Message> zeroPoll = queueDAO.pollMessages(queueName, 0, 10_000);
        assertTrue("Zero poll should be empty", zeroPoll.isEmpty());

        final int firstPollSize = 3;
        List<Message> firstPoll = queueDAO.pollMessages(queueName, firstPollSize, 10_000);
        assertNotNull("First poll was null", firstPoll);
        assertFalse("First poll was empty", firstPoll.isEmpty());
        assertEquals("First poll size mismatch", firstPollSize, firstPoll.size());

        final int secondPollSize = 4;
        List<Message> secondPoll = queueDAO.pollMessages(queueName, secondPollSize, 10_000);
        assertNotNull("Second poll was null", secondPoll);
        assertFalse("Second poll was empty", secondPoll.isEmpty());
        assertEquals("Second poll size mismatch", secondPollSize, secondPoll.size());

        // Assert that the total queue size hasn't changed
        assertEquals(
                "Total queue size should have remained the same",
                totalSize,
                queueDAO.getSize(queueName));

        // Assert that our un-popped messages match our expected size
        final long expectedSize = totalSize - firstPollSize - secondPollSize;
        assertUnpoppedCount(queueName, expectedSize);
    }

    /** Test fix for https://github.com/Netflix/conductor/issues/1892 */
    @Test
    public void containsMessageTest() {
        String queueName = "TestQueue";
        long offsetTimeInSecond = 0;

        for (int i = 0; i < 10; i++) {
            String messageId = "msg" + i;
            queueDAO.push(queueName, messageId, offsetTimeInSecond);
        }
        int size = queueDAO.getSize(queueName);
        assertEquals(10, size);

        for (int i = 0; i < 10; i++) {
            String messageId = "msg" + i;
            assertTrue(queueDAO.containsMessage(queueName, messageId));
            queueDAO.remove(queueName, messageId);
        }
        for (int i = 0; i < 10; i++) {
            String messageId = "msg" + i;
            assertFalse(queueDAO.containsMessage(queueName, messageId));
        }
    }

    /**
     * Test fix for https://github.com/Netflix/conductor/issues/448
     *
     * @since 1.8.2-rc5
     */
    @Test
    public void pollDeferredMessagesTest() throws InterruptedException {
        final List<Message> messages = new ArrayList<>();
        final String queueName = "issue448_testQueue";
        final int totalSize = 10;
        // Delay, in seconds, for the messages that should only show up in the second poll.
        final int delayedOffsetSeconds = 5;

        for (int i = 0; i < totalSize; i++) {
            final int offset;
            if (i < 5) {
                // Immediately available
                offset = 0;
            } else if (i == 6 || i == 7) {
                // Purposefully skipping id:5 to test out of order deliveries
                // Set id:6 and id:7 for a 5s delay to be picked up in the second polling batch
                offset = delayedOffsetSeconds;
            } else {
                // Set all other queue messages to have enough of a delay that they won't
                // accidentally be picked up.
                offset = 10_000 + i;
            }

            String payload = "{\"id\": " + i + ",\"offset_time_seconds\":" + offset + "}";
            Message m = new Message("testmsg-" + i, payload, "");
            messages.add(m);
            queueDAO.push(queueName, "testmsg-" + i, offset);
        }

        // Assert that all messages were persisted and no extras are in there
        assertEquals("Queue size mismatch", totalSize, queueDAO.getSize(queueName));

        final int firstPollSize = 4;
        List<Message> firstPoll = queueDAO.pollMessages(queueName, firstPollSize, 100);
        assertNotNull("First poll was null", firstPoll);
        assertFalse("First poll was empty", firstPoll.isEmpty());
        assertEquals("First poll size mismatch", firstPollSize, firstPoll.size());

        // Five messages (ids 0-4) were pushed with a zero offset, so the candidate set is one
        // larger than the poll size; only membership is asserted, not which four came back.
        List<String> firstPollMessageIds =
                messages.stream()
                        .map(Message::getId)
                        .collect(Collectors.toList())
                        .subList(0, firstPollSize + 1);

        for (int i = 0; i < firstPollSize; i++) {
            String actual = firstPoll.get(i).getId();
            assertTrue("Unexpected Id: " + actual, firstPollMessageIds.contains(actual));
        }

        final int secondPollSize = 3;

        // Sleep for the delayed offset so the next batch of messages becomes available
        LOGGER.info("Sleeping for second poll...");
        Thread.sleep(delayedOffsetSeconds * 1_000L);

        // Poll for many more messages than expected
        List<Message> secondPoll = queueDAO.pollMessages(queueName, secondPollSize + 10, 100);
        assertNotNull("Second poll was null", secondPoll);
        assertFalse("Second poll was empty", secondPoll.isEmpty());
        assertEquals("Second poll size mismatch", secondPollSize, secondPoll.size());

        List<String> expectedIds = Arrays.asList("testmsg-4", "testmsg-6", "testmsg-7");
        for (int i = 0; i < secondPollSize; i++) {
            String actual = secondPoll.get(i).getId();
            assertTrue("Unexpected Id: " + actual, expectedIds.contains(actual));
        }

        // Assert that the total queue size hasn't changed
        assertEquals(
                "Total queue size should have remained the same",
                totalSize,
                queueDAO.getSize(queueName));

        // Assert that our un-popped messages match our expected size
        final long expectedSize = totalSize - firstPollSize - secondPollSize;
        assertUnpoppedCount(queueName, expectedSize);
    }

    /**
     * Runs the shared un-ack scenario using {@code processUnacks} for a single queue.
     *
     * <p>NOTE(review): currently disabled (the {@code @Test} annotation is commented out); the
     * reason is not recorded here - confirm before re-enabling.
     */
    // @Test
    public void processUnacksTest() {
        processUnacks(
                () -> {
                    // Process unacks
                    queueDAO.processUnacks("process_unacks_test");
                },
                "process_unacks_test");
    }

    /**
     * Runs the shared un-ack scenario using {@code processAllUnacks}.
     *
     * <p>NOTE(review): currently disabled (the {@code @Test} annotation is commented out); the
     * reason is not recorded here - confirm before re-enabling.
     */
    // @Test
    public void processAllUnacksTest() {
        processUnacks(
                () -> {
                    // Process all unacks
                    queueDAO.processAllUnacks();
                },
                "process_unacks_test");
    }

    /**
     * Shared scenario for the un-ack tests: fills {@code queueName} and a second queue, polls
     * both, acks one message, runs {@code unack} and then verifies the un-acked counts and the
     * remaining queue size.
     *
     * @param unack the un-ack processing call under test
     * @param queueName the queue the un-ack processing is expected to act on
     */
    private void processUnacks(Runnable unack, String queueName) {
        // Count of messages in the queue(s)
        final int count = 10;
        // Number of messages to process acks for
        final int unackedCount = 4;
        // A secondary queue to make sure we don't accidentally process other queues
        final String otherQueueName = "process_unacks_test_other_queue";

        // Create testing queue with some messages (but not all) that will be popped/acked.
        for (int i = 0; i < count; i++) {
            int offset = 0;
            if (i >= unackedCount) {
                offset = 1_000_000;
            }

            queueDAO.push(queueName, "unack-" + i, offset);
        }

        // Create a second queue to make sure that unacks don't occur for it
        for (int i = 0; i < count; i++) {
            queueDAO.push(otherQueueName, "other-" + i, 0);
        }

        // Poll for first batch of messages (should be equal to unackedCount)
        List<Message> polled = queueDAO.pollMessages(queueName, 100, 10_000);
        assertNotNull(polled);
        assertFalse(polled.isEmpty());
        assertEquals(unackedCount, polled.size());

        // Poll messages from the other queue so we know they don't get unacked later
        queueDAO.pollMessages(otherQueueName, 100, 10_000);

        // Ack one of the polled messages
        assertTrue(queueDAO.ack(queueName, "unack-1"));

        // Should have one less un-acked popped message in the queue
        Long uacked = queueDAO.queuesDetailVerbose().get(queueName).get("a").get("uacked");
        assertNotNull(uacked);
        assertEquals(unackedCount - 1, uacked.longValue());

        unack.run();

        // Check uacks for both queues after processing
        Map<String, Map<String, Map<String, Long>>> details = queueDAO.queuesDetailVerbose();
        uacked = details.get(queueName).get("a").get("uacked");
        assertNotNull(uacked);
        assertEquals(
                "The messages that were polled should be unacked still",
                unackedCount - 1,
                uacked.longValue());

        Long otherUacked = details.get(otherQueueName).get("a").get("uacked");
        assertNotNull(otherUacked);
        assertEquals(
                "Other queue should have all unacked messages", count, otherUacked.longValue());

        Long size = queueDAO.queuesDetail().get(queueName);
        assertNotNull(size);
        assertEquals(count - unackedCount, size.longValue());
    }

    /**
     * Asserts that the number of rows in {@code queue_message} for {@code queueName} with {@code
     * popped = false} equals {@code expectedSize}.
     *
     * <p>Any exception raised while querying is logged with its stack trace and then reported as
     * a test failure.
     *
     * @param queueName the queue whose un-popped messages are counted
     * @param expectedSize the expected number of un-popped messages
     */
    private void assertUnpoppedCount(String queueName, long expectedSize) {
        final String unpoppedSql =
                "SELECT COUNT(*) FROM queue_message WHERE queue_name = ? AND popped = false";
        try (Connection c = dataSource.getConnection();
                Query q = new Query(objectMapper, c, unpoppedSql)) {
            long count = q.addParameter(queueName).executeCount();
            assertEquals("Remaining queue size mismatch", expectedSize, count);
        } catch (Exception ex) {
            // fail() cannot carry a cause, so log the stack trace before failing the test.
            LOGGER.error("Unable to count un-popped messages for queue {}", queueName, ex);
            fail("Unable to count un-popped messages for queue '" + queueName + "': " + ex);
        }
    }
}