Back to Repositories

Testing Redis Queue Operations with DynoQueueDAO in Conductor OSS

A comprehensive test suite for validating Redis-based queue operations in Conductor OSS, focusing on the DynoQueueDAO implementation. This test suite verifies message queuing, acknowledgment, and queue management functionalities using JUnit and Redis mock objects.

Test Coverage Overview

The test suite provides extensive coverage of queue operations including:
  • Message pushing and popping operations
  • Queue size verification
  • Duplicate message handling
  • Message acknowledgment
  • Queue flushing and removal operations
Integration points include Redis command execution and queue sharding strategy implementation.

Implementation Analysis

The tests use JUnit 4, with Mockito stubbing RedisProperties and JedisMock standing in for the Redis command interface. A single test method exercises the queue operations sequentially, and a custom ShardSupplier pins every queue to one shard ("a") so that per-shard sizes can be asserted directly.

Key patterns include setup isolation through the @Before annotation and verification through multiple assertions after each queue operation.

Technical Details

Testing tools and configuration:
  • JUnit 4 testing framework
  • JedisMock for Redis command simulation
  • Custom ShardSupplier implementation
  • RedisQueues integration
  • DynoQueue implementation with sharding strategy

Best Practices Demonstrated

The test suite exemplifies several testing best practices:
  • Proper test initialization (a fresh mock-backed DAO is built before each test, so no explicit cleanup step is defined)
  • Comprehensive edge case coverage
  • Mock object usage for external dependencies
  • Clear test method organization
  • Effective assertion patterns

conductor-oss/conductor

redis-persistence/src/test/java/com/netflix/conductor/redis/dao/DynoQueueDAOTest.java

            
/*
 * Copyright 2020 Conductor Authors.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.netflix.conductor.redis.dao;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.redis.config.RedisProperties;
import com.netflix.conductor.redis.dynoqueue.RedisQueuesShardingStrategyProvider;
import com.netflix.conductor.redis.jedis.JedisMock;
import com.netflix.dyno.connectionpool.Host;
import com.netflix.dyno.queues.ShardSupplier;
import com.netflix.dyno.queues.redis.RedisQueues;
import com.netflix.dyno.queues.redis.sharding.ShardingStrategy;

import redis.clients.jedis.commands.JedisCommands;

import static com.netflix.conductor.redis.dynoqueue.RedisQueuesShardingStrategyProvider.LOCAL_ONLY_STRATEGY;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Exercises {@link DynoQueueDAO} through the {@link QueueDAO} interface, backed by a {@link
 * JedisMock} and a single fixed shard.
 *
 * <p>One sequential scenario covers push, conditional push, pop, ack, remove and flush, checking
 * the reported queue sizes after each step.
 */
public class DynoQueueDAOTest {

    /** Name of the only queue the scenario touches. */
    private static final String QUEUE_NAME = "TestQueue";

    /** The single shard name returned by the shard supplier used in this test. */
    private static final String SHARD = "a";

    /** Number of messages pushed in every batch (ids {@code msg0} .. {@code msg9}). */
    private static final int MESSAGE_COUNT = 10;

    /** Offset passed to every push call. */
    private static final long OFFSET_TIME_IN_SECOND = 0;

    /** Timeout argument passed to every pop call. */
    private static final int POP_TIMEOUT = 100;

    private QueueDAO queueDAO;

    @Rule public ExpectedException expected = ExpectedException.none();

    /** Shard supplier that answers every question with the one shard {@link #SHARD}. */
    private static final class SingleShardSupplier implements ShardSupplier {

        @Override
        public Set<String> getQueueShards() {
            Set<String> shards = new HashSet<>();
            shards.add(SHARD);
            return shards;
        }

        @Override
        public String getCurrentShard() {
            return SHARD;
        }

        @Override
        public String getShardForHost(Host host) {
            return SHARD;
        }
    }

    /**
     * Builds a new {@link DynoQueueDAO} before each test, wired to a new {@link JedisMock}, the
     * single-shard supplier and the {@code LOCAL_ONLY_STRATEGY} sharding strategy.
     */
    @Before
    public void init() {
        RedisProperties redisProperties = mock(RedisProperties.class);
        when(redisProperties.getQueueShardingStrategy()).thenReturn(LOCAL_ONLY_STRATEGY);

        JedisCommands redis = new JedisMock();
        ShardSupplier singleShard = new SingleShardSupplier();
        ShardingStrategy strategy =
                new RedisQueuesShardingStrategyProvider(singleShard, redisProperties).get();

        // The same mock is passed for both Jedis arguments of RedisQueues.
        RedisQueues queues =
                new RedisQueues(redis, redis, "", singleShard, 60_000, 60_000, strategy);
        queueDAO = new DynoQueueDAO(queues);
    }

    /** Runs the full push / pop / ack / remove / flush scenario against one queue. */
    @Test
    public void test() {
        // Unconditional push of the whole batch.
        for (int index = 0; index < MESSAGE_COUNT; index++) {
            queueDAO.push(QUEUE_NAME, "msg" + index, OFFSET_TIME_IN_SECOND);
        }
        assertEquals(MESSAGE_COUNT, queueDAO.getSize(QUEUE_NAME));

        Map<String, Long> summary = queueDAO.queuesDetail();
        assertEquals(1, summary.size());
        assertEquals(10L, summary.get(QUEUE_NAME).longValue());

        // Same ids again; the pop below is expected to return exactly one batch.
        pushBatchIfNotExists();

        List<String> delivered = queueDAO.pop(QUEUE_NAME, MESSAGE_COUNT, POP_TIMEOUT);
        assertNotNull(delivered);
        assertEquals(MESSAGE_COUNT, delivered.size());

        // Popped but not yet acknowledged: nothing queued, everything unacked.
        assertShardCounts(0, 10);

        for (String messageId : delivered) {
            queueDAO.ack(QUEUE_NAME, messageId);
        }
        assertShardCounts(0, 0);

        List<String> secondPop = queueDAO.pop(QUEUE_NAME, MESSAGE_COUNT, POP_TIMEOUT);
        assertNotNull(secondPop);
        assertEquals(0, secondPop.size());

        // Refill, then remove each message individually.
        pushBatchIfNotExists();
        assertEquals(MESSAGE_COUNT, queueDAO.getSize(QUEUE_NAME));

        for (int index = 0; index < MESSAGE_COUNT; index++) {
            queueDAO.remove(QUEUE_NAME, "msg" + index);
        }
        assertEquals(0, queueDAO.getSize(QUEUE_NAME));

        // Refill, then drop everything with a single flush.
        pushBatchIfNotExists();
        queueDAO.flush(QUEUE_NAME);
        assertEquals(0, queueDAO.getSize(QUEUE_NAME));
    }

    /** Calls {@code pushIfNotExists} for every id {@code msg0} .. {@code msg9}. */
    private void pushBatchIfNotExists() {
        for (int index = 0; index < MESSAGE_COUNT; index++) {
            queueDAO.pushIfNotExists(QUEUE_NAME, "msg" + index, OFFSET_TIME_IN_SECOND);
        }
    }

    /**
     * Fetches the verbose queue details and checks that exactly one queue is reported and that
     * shard {@link #SHARD} of {@link #QUEUE_NAME} carries the given counts.
     *
     * @param expectedSize expected value under the {@code "size"} key
     * @param expectedUnacked expected value under the {@code "uacked"} key
     */
    private void assertShardCounts(long expectedSize, long expectedUnacked) {
        Map<String, Map<String, Map<String, Long>>> verbose = queueDAO.queuesDetailVerbose();
        assertEquals(1, verbose.size());

        // Both values are read (and unboxed) before either is asserted.
        Map<String, Long> shardDetails = verbose.get(QUEUE_NAME).get(SHARD);
        long shardSize = shardDetails.get("size");
        long unackedSize = shardDetails.get("uacked");
        assertEquals(expectedSize, shardSize);
        assertEquals(expectedUnacked, unackedSize);
    }
}