Back to Repositories

Testing PostgreSQL Queue Listener Operations in Conductor OSS

This test suite validates the PostgreSQL queue listener functionality in Conductor OSS, focusing on message handling, queue state notifications, and real-time queue operations monitoring. It ensures reliable message processing and queue management in a PostgreSQL-backed Conductor workflow system.

Test Coverage Overview

The test suite provides comprehensive coverage of PostgreSQL queue listener operations:

  • Message readiness verification through hasMessagesReady()
  • Queue size monitoring and updates
  • Future message delivery timing
  • Queue state notifications handling
  • Message lifecycle operations (create, pop, delete)

Implementation Analysis

The tests use the Spring Boot test framework with JUnit 4 (SpringRunner), with a separate, isolated test case for each queue operation. They leverage TestContainers for PostgreSQL integration and reset the database state before every test.

Key patterns include truncating the queue_message table before each test, running each SQL statement on an auto-commit connection, and simulating queue notifications with pg_notify on the conductor_queue_state channel.

Technical Details

Testing infrastructure includes:

  • Spring Boot Test framework with custom property sources
  • PostgreSQL database with Flyway migrations
  • Custom DataSource configuration
  • JSON payload handling for notifications
  • Connection pooling and transaction management

Best Practices Demonstrated

The test suite exemplifies several testing best practices:

  • Proper test isolation with @Before cleanup
  • Comprehensive edge case coverage
  • Resource cleanup and connection management
  • Clear test method naming conventions
  • Effective use of assertions for verification

conductor-oss/conductor

postgres-persistence/src/test/java/com/netflix/conductor/postgres/util/PostgresQueueListenerTest.java

            
/*
 * Copyright 2023 Conductor Authors.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.netflix.conductor.postgres.util;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.*;

import javax.sql.DataSource;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.node.ObjectNode;

import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import com.netflix.conductor.postgres.config.PostgresConfiguration;
import com.netflix.conductor.postgres.config.PostgresProperties;

import static org.junit.Assert.*;

/**
 * Integration tests for {@link PostgresQueueListener}.
 *
 * <p>Each test builds a fresh listener against the autowired {@link DataSource} and then either
 * publishes a hand-crafted payload on the {@code conductor_queue_state} channel via {@code
 * pg_notify}, or modifies rows of the {@code queue_message} table directly, and finally checks
 * what the listener reports through {@code hasMessagesReady} and {@code getSize}.
 */
@ContextConfiguration(
        classes = {
            TestObjectMapperConfiguration.class,
            PostgresConfiguration.class,
            FlywayAutoConfiguration.class
        })
@RunWith(SpringRunner.class)
@TestPropertySource(
        properties = {
            "conductor.elasticsearch.version=0",
            "spring.flyway.clean-disabled=false",
            "conductor.database.type=postgres",
            "conductor.postgres.experimentalQueueNotify=true",
            "conductor.postgres.experimentalQueueNotifyStalePeriod=5000"
        })
@SpringBootTest
public class PostgresQueueListenerTest {

    /** Name of the single queue exercised by every test in this class. */
    private static final String QUEUE_NAME = "dummy-task";

    private PostgresQueueListener listener;

    @Qualifier("dataSource")
    @Autowired
    private DataSource dataSource;

    @Autowired private PostgresProperties properties;

    /** Removes every row from {@code queue_message} so each test starts from an empty table. */
    private void clearDb() {
        // Both the connection and the statement are closed by try-with-resources.
        try (Connection conn = dataSource.getConnection();
                PreparedStatement stmt = conn.prepareStatement("truncate table queue_message")) {
            conn.setAutoCommit(true);
            stmt.executeUpdate();
        } catch (Exception e) {
            throw new RuntimeException("Failed to truncate queue_message", e);
        }
    }

    /**
     * Publishes a synthetic queue-state payload on the {@code conductor_queue_state} channel.
     *
     * <p>The payload is a JSON object with a {@code __now__} timestamp and one entry keyed by the
     * queue name holding {@code depth} and {@code nextDelivery}.
     *
     * @param queueName key under which the queue state is published
     * @param queueDepth value written to the {@code depth} field
     * @param nextDelivery value written to the {@code nextDelivery} field (the tests pass epoch
     *     milliseconds)
     */
    private void sendNotification(String queueName, int queueDepth, long nextDelivery) {
        JsonNodeFactory factory = JsonNodeFactory.instance;
        ObjectNode queueNode = factory.objectNode();
        queueNode.put("depth", queueDepth);
        queueNode.put("nextDelivery", nextDelivery);

        ObjectNode payload = factory.objectNode();
        payload.put("__now__", System.currentTimeMillis());
        // set(...) is the non-deprecated way to attach a child node.
        payload.set(queueName, queueNode);

        try (Connection conn = dataSource.getConnection();
                PreparedStatement stmt =
                        conn.prepareStatement("SELECT pg_notify('conductor_queue_state', ?)")) {
            conn.setAutoCommit(true);
            stmt.setString(1, payload.toString());
            stmt.execute();
        } catch (Exception e) {
            throw new RuntimeException("Failed to send notification for queue " + queueName, e);
        }
    }

    /**
     * Inserts an immediately deliverable row into {@code queue_message} with priority 0, offset 0
     * and a dummy payload.
     *
     * @param queue_name queue the message belongs to
     * @param message_id identifier of the new message
     */
    private void createQueueMessage(String queue_name, String message_id) {
        try (Connection conn = dataSource.getConnection();
                PreparedStatement stmt =
                        conn.prepareStatement(
                                "INSERT INTO queue_message (deliver_on, queue_name, message_id, priority, offset_time_seconds, payload) VALUES (current_timestamp, ?,?,?,?,?)")) {
            conn.setAutoCommit(true);
            stmt.setString(1, queue_name);
            stmt.setString(2, message_id);
            stmt.setInt(3, 0);
            stmt.setInt(4, 0);
            stmt.setString(5, "dummy-payload");
            stmt.execute();
        } catch (Exception e) {
            throw new RuntimeException("Failed to create queue message " + message_id, e);
        }
    }

    /**
     * Marks the message with the given id as popped.
     *
     * @param message_id identifier of the message to flag
     */
    private void popQueueMessage(String message_id) {
        try (Connection conn = dataSource.getConnection();
                PreparedStatement stmt =
                        conn.prepareStatement(
                                "UPDATE queue_message SET popped = TRUE where message_id = ?")) {
            conn.setAutoCommit(true);
            stmt.setString(1, message_id);
            stmt.execute();
        } catch (Exception e) {
            throw new RuntimeException("Failed to pop queue message " + message_id, e);
        }
    }

    /**
     * Deletes the message with the given id from {@code queue_message}.
     *
     * @param message_id identifier of the message to remove
     */
    private void deleteQueueMessage(String message_id) {
        try (Connection conn = dataSource.getConnection();
                PreparedStatement stmt =
                        conn.prepareStatement("DELETE FROM queue_message where message_id = ?")) {
            conn.setAutoCommit(true);
            stmt.setString(1, message_id);
            stmt.execute();
        } catch (Exception e) {
            throw new RuntimeException("Failed to delete queue message " + message_id, e);
        }
    }

    /** Creates a fresh listener and empties the queue table before every test. */
    @Before
    public void before() {
        listener = new PostgresQueueListener(dataSource, properties);
        clearDb();
    }

    /** A notification whose next delivery is already in the past makes the queue ready. */
    @Test
    public void testHasReadyMessages() {
        assertFalse(listener.hasMessagesReady(QUEUE_NAME));
        sendNotification(QUEUE_NAME, 3, System.currentTimeMillis() - 1);
        assertTrue(listener.hasMessagesReady(QUEUE_NAME));
    }

    /** A queue whose next delivery lies in the future only becomes ready once that time passes. */
    @Test
    public void testHasReadyMessagesInFuture() throws InterruptedException {
        assertFalse(listener.hasMessagesReady(QUEUE_NAME));
        sendNotification(QUEUE_NAME, 3, System.currentTimeMillis() + 100);
        assertFalse(listener.hasMessagesReady(QUEUE_NAME));
        // Wait just past the 100 ms delivery offset used above.
        Thread.sleep(101);
        assertTrue(listener.hasMessagesReady(QUEUE_NAME));
    }

    /** The reported size follows the {@code depth} carried by the notification. */
    @Test
    public void testGetSize() {
        assertEquals(0, listener.getSize(QUEUE_NAME).get().intValue());
        sendNotification(QUEUE_NAME, 3, System.currentTimeMillis() + 100);
        assertEquals(3, listener.getSize(QUEUE_NAME).get().intValue());
    }

    /**
     * Changes made directly to {@code queue_message} are reflected by the listener without any
     * explicit {@code pg_notify} call from the test.
     *
     * <p>NOTE(review): presumably a database trigger installed by the Flyway migrations publishes
     * the queue state — that mechanism is not visible in this file.
     */
    @Test
    public void testTrigger() throws InterruptedException {
        assertEquals(0, listener.getSize(QUEUE_NAME).get().intValue());
        assertFalse(listener.hasMessagesReady(QUEUE_NAME));

        createQueueMessage(QUEUE_NAME, "dummy-id1");
        createQueueMessage(QUEUE_NAME, "dummy-id2");
        assertEquals(2, listener.getSize(QUEUE_NAME).get().intValue());
        assertTrue(listener.hasMessagesReady(QUEUE_NAME));

        // A popped message no longer counts towards the queue size.
        popQueueMessage("dummy-id2");
        assertEquals(1, listener.getSize(QUEUE_NAME).get().intValue());
        assertTrue(listener.hasMessagesReady(QUEUE_NAME));

        // Deleting the already-popped message leaves the size unchanged.
        deleteQueueMessage("dummy-id2");
        assertEquals(1, listener.getSize(QUEUE_NAME).get().intValue());
        assertTrue(listener.hasMessagesReady(QUEUE_NAME));

        // Removing the last message empties the queue. The final check targets the queue under
        // test; it previously queried an unrelated name and therefore passed trivially.
        deleteQueueMessage("dummy-id1");
        assertEquals(0, listener.getSize(QUEUE_NAME).get().intValue());
        assertFalse(listener.hasMessagesReady(QUEUE_NAME));
    }
}