Back to Repositories

Testing PostgreSQL Distributed Locking Implementation in Conductor-OSS

This test suite validates the PostgreSQL-based locking mechanism in Conductor OSS, focusing on lock acquisition, release, and concurrent access patterns. It ensures reliable distributed locking functionality for workflow coordination.

Test Coverage Overview

The test suite provides comprehensive coverage of the PostgresLockDAO implementation.

Key areas tested include:
  • Lock acquisition and release mechanics
  • Lock expiration handling
  • Concurrent lock acquisition scenarios
  • Multiple distinct lock management
Integration points focus on PostgreSQL database interactions and Spring framework configuration.

Implementation Analysis

The testing approach uses Spring Boot test infrastructure with the JUnit 4 runner, while most assertions come from the JUnit 5 `Assertions` API. Tests combine direct database verification with behavioral assertions to validate locking operations.

Key patterns include:
  • Database state verification using JDBC
  • Concurrent execution testing with ExecutorService
  • Temporal testing for lock expiration
  • Spring context configuration with test properties

Technical Details

Testing tools and configuration:
  • Spring Boot Test framework
  • Flyway for database migration
  • JUnit 4 with SpringRunner (plus JUnit 5 `Assertions` for most checks)
  • PostgreSQL test database
  • Custom test property sources
  • DataSource autowiring for direct DB access

Best Practices Demonstrated

The test suite exemplifies robust testing practices for distributed systems.

Notable practices include:
  • Test isolation through a Flyway migration before each test and a unique (UUID) lock ID per test
  • Comprehensive concurrent operation testing
  • Explicit timing verification
  • Resource cleanup in concurrent tests
  • Clear test case organization
  • Thorough edge case coverage

conductor-oss/conductor

postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresLockDAOTest.java

            
/*
 * Copyright 2024 Conductor Authors.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.netflix.conductor.postgres.dao;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.*;

import javax.sql.DataSource;

import org.flywaydb.core.Flyway;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;

import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
import com.netflix.conductor.postgres.config.PostgresConfiguration;

import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Integration tests for {@link PostgresLockDAO}.
 *
 * <p>The tests run inside a Spring context configured with the Postgres lock type and verify lock
 * acquisition, release, lease expiration and contention. Where needed, the {@code locks} table is
 * inspected directly through JDBC. Every test uses a fresh random UUID as its lock id and
 * releases whatever it acquired in a {@code finally} block, so a failing assertion does not leave
 * lock rows behind.
 */
@RunWith(SpringRunner.class)
@ContextConfiguration(
        classes = {
            TestObjectMapperConfiguration.class,
            PostgresConfiguration.class,
            FlywayAutoConfiguration.class
        })
@TestPropertySource(
        properties = {
            "conductor.workflow-execution-lock.type=postgres",
            "spring.flyway.clean-disabled=false",
            "conductor.app.workflow.name-validation.enabled=true"
        })
@SpringBootTest
public class PostgresLockDAOTest {

    /** Query used to look up the row backing a single lock. */
    private static final String SELECT_LOCK_BY_ID = "SELECT * FROM locks WHERE lock_id = ?";

    @Autowired private PostgresLockDAO postgresLock;

    @Autowired private DataSource dataSource;

    @Autowired private Flyway flyway;

    /**
     * Brings the schema up to date before each test.
     *
     * <p>Only {@code migrate()} is invoked; the database is not cleaned here, so isolation between
     * tests relies on each test using its own random lock id.
     */
    @Before
    public void before() {
        flyway.migrate();
    }

    /**
     * Acquires a lock, checks the persisted row and its lease expiration, then releases the lock
     * and checks that the row is gone.
     *
     * @throws SQLException if the direct JDBC verification fails
     */
    @Test
    public void testLockAcquisitionAndRelease() throws SQLException {
        String lockId = UUID.randomUUID().toString();
        Instant beforeAcquisitionTimeUtc = Instant.now();
        long leaseTime = 2000;

        try (var connection = dataSource.getConnection()) {
            assertTrue(
                    postgresLock.acquireLock(lockId, 500, leaseTime, TimeUnit.MILLISECONDS),
                    "Lock acquisition failed");
            Instant afterAcquisitionTimeUtc = Instant.now();

            try {
                try (var ps = connection.prepareStatement(SELECT_LOCK_BY_ID)) {
                    ps.setString(1, lockId);
                    try (var rs = ps.executeQuery()) {
                        assertTrue(rs.next(), "Lock not found in the database");
                        assertEquals(lockId, rs.getString("lock_id"));

                        // NOTE(review): this compares the stored timestamp with the JVM clock and
                        // presumably assumes both clocks/time zones agree - confirm if flaky.
                        long leaseExpirationTime = rs.getTimestamp("lease_expiration").getTime();
                        long earliestExpected =
                                beforeAcquisitionTimeUtc.plusMillis(leaseTime).toEpochMilli();
                        long latestExpected =
                                afterAcquisitionTimeUtc.plusMillis(leaseTime).toEpochMilli();
                        assertTrue(
                                leaseExpirationTime >= earliestExpected,
                                "Lease expiration is too early");
                        assertTrue(
                                leaseExpirationTime <= latestExpected,
                                "Lease expiration is too late");
                    }
                }
            } finally {
                // Release even when a verification above fails so the row does not linger.
                postgresLock.releaseLock(lockId);
            }

            try (PreparedStatement ps = connection.prepareStatement(SELECT_LOCK_BY_ID)) {
                ps.setString(1, lockId);
                try (var rs = ps.executeQuery()) {
                    Assertions.assertFalse(rs.next(), "Lock was not released properly");
                }
            }
        }
    }

    /**
     * Acquires a lock with a 500 ms lease, waits past the lease, and checks that the same lock id
     * can be acquired again.
     *
     * @throws InterruptedException if the wait for lease expiration is interrupted
     */
    @Test
    public void testExpiredLockCanBeAcquiredAgain() throws InterruptedException {
        String lockId = UUID.randomUUID().toString();
        assertTrue(
                postgresLock.acquireLock(lockId, 500, 500, TimeUnit.MILLISECONDS),
                "First lock acquisition failed");

        try {
            Thread.sleep(1000); // Ensure the lock has expired.

            assertTrue(
                    postgresLock.acquireLock(lockId, 500, 500, TimeUnit.MILLISECONDS),
                    "Lock acquisition after expiration failed");
        } finally {
            postgresLock.releaseLock(lockId);
        }
    }

    /**
     * Submits two competing acquisitions of the same lock id and checks that exactly one of them
     * succeeds.
     *
     * @throws ExecutionException if one of the acquisition tasks throws
     * @throws InterruptedException if waiting for the tasks or the executor is interrupted
     */
    @Test
    public void testConcurrentLockAcquisition() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        String lockId = UUID.randomUUID().toString();

        try {
            Future<Boolean> future1 =
                    executorService.submit(
                            () -> postgresLock.acquireLock(lockId, 2000, TimeUnit.MILLISECONDS));
            Future<Boolean> future2 =
                    executorService.submit(
                            () -> postgresLock.acquireLock(lockId, 2000, TimeUnit.MILLISECONDS));

            boolean firstAcquired = future1.get();
            boolean secondAcquired = future2.get();

            // One of the futures should hold the lock, the other should get rejected.
            assertTrue(
                    firstAcquired ^ secondAcquired,
                    "Expected exactly one acquisition to succeed but got "
                            + firstAcquired
                            + " and "
                            + secondAcquired);
        } finally {
            // Always stop the pool and wait for in-flight tasks before releasing, so a failed
            // assertion or task neither leaks threads nor leaves the lock behind.
            executorService.shutdown();
            try {
                executorService.awaitTermination(5, TimeUnit.SECONDS);
            } finally {
                postgresLock.releaseLock(lockId);
            }
        }
    }

    /**
     * Checks that two distinct lock ids can be held at the same time.
     *
     * <p>Both locks are released afterwards; they are acquired with a 10 s lease and would
     * otherwise remain in the {@code locks} table after the test finishes.
     */
    @Test
    public void testDifferentLockCanBeAcquiredConcurrently() {
        String lockId1 = UUID.randomUUID().toString();
        String lockId2 = UUID.randomUUID().toString();

        try {
            assertTrue(
                    postgresLock.acquireLock(lockId1, 2000, 10000, TimeUnit.MILLISECONDS),
                    "Acquisition of the first lock failed");
            assertTrue(
                    postgresLock.acquireLock(lockId2, 2000, 10000, TimeUnit.MILLISECONDS),
                    "Acquisition of the second lock failed");
        } finally {
            try {
                postgresLock.releaseLock(lockId1);
            } finally {
                postgresLock.releaseLock(lockId2);
            }
        }
    }
}