
Testing ZooKeeper Distributed Lock Implementation in spring-boot-demo

This test suite demonstrates distributed locking mechanisms using ZooKeeper in a Spring Boot application. It evaluates different approaches to handling concurrent operations through ZooKeeper-based locks, including manual locking, AOP-based locking, and a baseline test without locks.

Test Coverage Overview

The test suite covers three main scenarios for distributed locking:
  • Baseline concurrent operations without locks
  • AOP-based distributed locking using custom @ZooLock annotation
  • Manual distributed locking implementation using InterProcessMutex
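Each test submits 10,000 tasks to a thread pool, each of which decrements a shared counter that starts at 10,000, and then logs the final value. The tests contain no assertions: a final count of 0 indicates that no decrement was lost, while a value above 0 in the baseline run shows the effect of unsynchronized access.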
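The lost updates that the baseline scenario exposes can be reproduced without ZooKeeper. The following is a minimal sketch, assuming an in-process ReentrantLock as a stand-in for a distributed lock; the class CounterRaceDemo and its method names are hypothetical and not part of the repository.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; ReentrantLock stands in for a ZooKeeper lock.
class CounterRaceDemo {
    private int count = 10_000;
    private final ReentrantLock lock = new ReentrantLock();

    // Unsynchronized read-modify-write: concurrent calls can lose updates.
    void unsafeDecrement() {
        count--;
    }

    // Same decrement, but only one thread at a time may run it.
    void lockedDecrement() {
        lock.lock();
        try {
            count--;
        } finally {
            lock.unlock();
        }
    }

    // Runs 10,000 decrements on a thread pool and returns the final count.
    static int run(boolean useLock) {
        CounterRaceDemo demo = new CounterRaceDemo();
        Runnable task = useLock ? demo::lockedDecrement : demo::unsafeDecrement;
        ExecutorService pool = Executors.newFixedThreadPool(16);
        for (int i = 0; i < 10_000; i++) {
            pool.execute(task);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return demo.count;
    }

    public static void main(String[] args) {
        System.out.println("without lock: " + run(false)); // may be greater than 0
        System.out.println("with lock:    " + run(true));  // always 0
    }
}
```

The unlocked result varies from run to run, which is why the baseline test is useful only as a comparison point for the two locked variants.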

Implementation Analysis

The testing approach utilizes Spring’s test framework with JUnit4 integration.
  • Uses ExecutorService for concurrent operation simulation
  • Uses AspectJProxyFactory to apply ZooLockAspect to a manually created instance of the test class
  • Leverages CuratorFramework client for ZooKeeper operations
The tests demonstrate both declarative (AOP) and imperative (manual) locking patterns.
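The declarative pattern can be illustrated without Spring AOP or ZooKeeper. This sketch assumes a hypothetical @Locked annotation and a JDK dynamic proxy in place of @ZooLock and ZooLockAspect, with a ReentrantLock in place of the ZooKeeper lock. It is not how the repository's aspect is implemented; it only shows the idea of wrapping annotated methods in lock and unlock calls.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo; none of these names exist in the repository.
class DeclarativeLockDemo {

    // Marker annotation playing the role of @ZooLock.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Locked {}

    interface Shop {
        @Locked
        void buy();

        int remaining();
    }

    static class SimpleShop implements Shop {
        private int stock;

        SimpleShop(int stock) {
            this.stock = stock;
        }

        @Override
        public void buy() {
            stock--;
        }

        @Override
        public int remaining() {
            return stock;
        }
    }

    // Returns a proxy that wraps every @Locked interface method in lock()/unlock().
    static Shop withLocking(Shop target) {
        ReentrantLock lock = new ReentrantLock();
        return (Shop) Proxy.newProxyInstance(
            Shop.class.getClassLoader(),
            new Class<?>[] {Shop.class},
            (proxy, method, args) -> {
                boolean locked = method.isAnnotationPresent(Locked.class);
                if (locked) {
                    lock.lock();
                }
                try {
                    return method.invoke(target, args);
                } catch (InvocationTargetException e) {
                    throw e.getCause(); // rethrow the target method's own exception
                } finally {
                    if (locked) {
                        lock.unlock();
                    }
                }
            });
    }

    // Four threads call buy() 250 times each through the proxy; returns the remaining stock.
    static int run() {
        Shop shop = withLocking(new SimpleShop(1_000));
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 250; j++) {
                    shop.buy();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return shop.remaining();
    }
}
```

The caller of buy() contains no locking code at all, which is the trade-off of the declarative style: less clutter at the call site, but the locking only applies when the call goes through the proxy.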

Technical Details

Testing infrastructure includes:
  • Spring Boot Test context with @SpringBootTest
  • JUnit4 with SpringRunner
  • Curator Framework for ZooKeeper client operations
  • Custom ZooLock annotation and aspect
  • ExecutorService backed by a fixed pool of 1000 threads
  • A fixed one-minute sleep in each test to give the submitted tasks time to finish before the final count is logged
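A fixed sleep either wastes time or, on a slow machine, ends before the tasks do. One possible alternative, sketched here with JDK classes only, is to count completions with a CountDownLatch and wait on it with a timeout. AwaitCompletionDemo is a hypothetical name, and AtomicInteger is used so that the example stays correct without any lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the repository.
class AwaitCompletionDemo {

    // Submits `tasks` decrements and returns the counter once all have completed.
    static int run(int tasks) {
        AtomicInteger count = new AtomicInteger(tasks);
        CountDownLatch done = new CountDownLatch(tasks);
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        count.decrementAndGet();
                    } finally {
                        done.countDown(); // signal completion even if the task fails
                    }
                });
            }
            // Returns as soon as the last task finishes; one minute is only an upper bound.
            if (!done.await(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return count.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("final count: " + run(10_000));
    }
}
```

Shutting the pool down in the finally block also keeps worker threads from outliving the test.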

Best Practices Demonstrated

The test suite illustrates several distributed-locking practices:
  • Lock release in a finally block in the manual locking path
  • A one-minute timeout on lock acquisition in both the AOP and manual paths
  • Separation of the business operation (doBuy) from the locking mechanism wrapped around it
  • Logging of each decrement and of the final count
  • A single catch block that logs ZooKeeper failures in the manual path
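Timed acquisition and release belong together: the lock should be released only by a caller that actually obtained it. The sketch below shows that guard using ReentrantLock.tryLock as an assumed stand-in for a timed ZooKeeper lock acquisition; GuardedReleaseDemo and runWithLock are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; ReentrantLock stands in for a distributed lock.
class GuardedReleaseDemo {

    // Runs the action under the lock; returns false if the lock was not obtained in time.
    static boolean runWithLock(ReentrantLock lock, long timeout, TimeUnit unit, Runnable action) {
        boolean acquired = false;
        try {
            acquired = lock.tryLock(timeout, unit);
            if (acquired) {
                action.run();
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Unlocking a lock this thread does not hold would throw, so guard it.
            if (acquired) {
                lock.unlock();
            }
        }
    }

    // Returns {result with a free lock, result while another thread holds the lock}.
    static boolean[] demo() {
        ReentrantLock lock = new ReentrantLock();
        boolean free = runWithLock(lock, 1, TimeUnit.SECONDS, () -> {});

        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();
                release.await(); // keep the lock until the main thread is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        try {
            held.await();
            boolean contended = runWithLock(lock, 100, TimeUnit.MILLISECONDS, () -> {});
            return new boolean[] {free, contended};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
        }
    }
}
```

In the contended case the call simply reports false after the timeout instead of failing in the finally block.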

xkcoding/spring-boot-demo

demo-zookeeper/src/test/java/com/xkcoding/zookeeper/SpringBootDemoZookeeperApplicationTests.java

            
package com.xkcoding.zookeeper;

import com.xkcoding.zookeeper.annotation.ZooLock;
import com.xkcoding.zookeeper.aspectj.ZooLockAspect;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.aop.aspectj.annotation.AspectJProxyFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringBootDemoZookeeperApplicationTests {

    public Integer getCount() {
        return count;
    }

    private Integer count = 10000;
    private ExecutorService executorService = Executors.newFixedThreadPool(1000);

    @Autowired
    private CuratorFramework zkClient;

    /**
     * No distributed lock: when the run ends, check whether count is 0.
     */
    @Test
    public void test() throws InterruptedException {
        IntStream.range(0, 10000).forEach(i -> executorService.execute(this::doBuy));
        TimeUnit.MINUTES.sleep(1);
        log.error("count = {}", count);
    }

    /**
     * Tests the AOP-based distributed lock.
     */
    @Test
    public void testAopLock() throws InterruptedException {
        // AOP in a test class requires building the proxy manually
        SpringBootDemoZookeeperApplicationTests target = new SpringBootDemoZookeeperApplicationTests();
        AspectJProxyFactory factory = new AspectJProxyFactory(target);
        ZooLockAspect aspect = new ZooLockAspect(zkClient);
        factory.addAspect(aspect);
        SpringBootDemoZookeeperApplicationTests proxy = factory.getProxy();
        IntStream.range(0, 10000).forEach(i -> executorService.execute(() -> proxy.aopBuy(i)));
        TimeUnit.MINUTES.sleep(1);
        log.error("count = {}", proxy.getCount());
    }

    /**
     * Tests manual locking.
     */
    @Test
    public void testManualLock() throws InterruptedException {
        IntStream.range(0, 10000).forEach(i -> executorService.execute(this::manualBuy));
        TimeUnit.MINUTES.sleep(1);
        log.error("count = {}", count);
    }

    @ZooLock(key = "buy", timeout = 1, timeUnit = TimeUnit.MINUTES)
    public void aopBuy(int userId) {
        log.info("{} is checking out stock...", userId);
        doBuy();
        log.info("{} deducted stock successfully...", userId);
    }

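    public void manualBuy() {
        String lockPath = "/buy";
        log.info("try to buy sth.");
        try {
            InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
            boolean acquired = false;
            try {
                // acquire returns false if the lock is not obtained within the timeout
                acquired = lock.acquire(1, TimeUnit.MINUTES);
                if (acquired) {
                    doBuy();
                    log.info("buy successfully!");
                }
            } finally {
                // Release only if this thread holds the lock; releasing an unowned lock throws
                if (acquired) {
                    lock.release();
                }
            }
        } catch (Exception e) {
            log.error("zk error", e);
        }
    }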

    public void doBuy() {
        count--;
        log.info("count = {}", count);
    }

}