Back to Repositories

Testing HystrixUtilizationStream Metrics Collection in Netflix Hystrix

This test suite evaluates the functionality of HystrixUtilizationStream, focusing on stream behavior, subscription management, and data flow in Netflix’s Hystrix library. It validates metrics collection and utilization monitoring capabilities essential for circuit breaker implementation.

Test Coverage Overview

The test suite provides comprehensive coverage of HystrixUtilizationStream functionality, including:
  • Stream data validation and command execution monitoring
  • Multiple subscriber behavior and unsubscription scenarios
  • Concurrent stream processing with different consumption rates
  • Thread pool and command utilization tracking

Implementation Analysis

The testing approach employs JUnit framework with RxJava observables to validate stream behavior. Tests utilize multiple subscribers, timing controls, and atomic operations to verify concurrent processing and data consistency across different stream consumption patterns.

Key patterns include subscription lifecycle management, stream backpressure handling, and thread-safe metric collection.

Technical Details

Testing infrastructure includes:
  • JUnit test framework with custom HystrixRequestContextRule
  • RxJava for reactive stream processing
  • CountDownLatch for synchronization
  • AtomicBoolean/AtomicInteger for thread-safe state management
  • Custom test schedulers for controlling execution timing

Best Practices Demonstrated

The test suite exemplifies high-quality testing practices through:
  • Isolation of test scenarios using Before hooks
  • Comprehensive error handling and verification
  • Proper resource cleanup and unsubscription
  • Clear separation of test cases for different behavioral aspects
  • Effective use of assertions for validation

netflix/hystrix

hystrix-core/src/test/java/com/netflix/hystrix/metric/sample/HystrixUtilizationStreamTest.java

            
/**
 * Copyright 2016 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.netflix.hystrix.metric.sample;

import com.hystrix.junit.HystrixRequestContextRule;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.metric.CommandStreamTest;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class HystrixUtilizationStreamTest extends CommandStreamTest {

    @Rule
    public HystrixRequestContextRule ctx = new HystrixRequestContextRule();

    HystrixUtilizationStream stream;
    private final static HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Util");
    private final static HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("Command");

    @Before
    public void init() {
        stream = HystrixUtilizationStream.getNonSingletonInstanceOnlyUsedInUnitTests(10);
    }

    @Test
    public void testStreamHasData() throws Exception {
        final AtomicBoolean commandShowsUp = new AtomicBoolean(false);
        final AtomicBoolean threadPoolShowsUp = new AtomicBoolean(false);
        final CountDownLatch latch = new CountDownLatch(1);
        final int NUM = 10;

        for (int i = 0; i < 2; i++) {
            HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
            cmd.observe();
        }

        stream.observe().take(NUM).subscribe(
                new Subscriber<HystrixUtilization>() {
                    @Override
                    public void onCompleted() {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted");
                        latch.countDown();
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnError : " + e);
                        latch.countDown();
                    }

                    @Override
                    public void onNext(HystrixUtilization utilization) {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Received data with : " + utilization.getCommandUtilizationMap().size() + " commands");
                        if (utilization.getCommandUtilizationMap().containsKey(commandKey)) {
                            commandShowsUp.set(true);
                        }
                        if (!utilization.getThreadPoolUtilizationMap().isEmpty()) {
                            threadPoolShowsUp.set(true);
                        }
                    }
                });

        assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        assertTrue(commandShowsUp.get());
        assertTrue(threadPoolShowsUp.get());
    }

    @Test
    public void testTwoSubscribersOneUnsubscribes() throws Exception {
        final CountDownLatch latch1 = new CountDownLatch(1);
        final CountDownLatch latch2 = new CountDownLatch(1);
        final AtomicInteger payloads1 = new AtomicInteger(0);
        final AtomicInteger payloads2 = new AtomicInteger(0);

        Subscription s1 = stream
                .observe()
                .take(100)
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        latch1.countDown();
                    }
                })
                .subscribe(new Subscriber<HystrixUtilization>() {
                    @Override
                    public void onCompleted() {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted");
                        latch1.countDown();
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e);
                        latch1.countDown();
                    }

                    @Override
                    public void onNext(HystrixUtilization utilization) {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + utilization);
                        payloads1.incrementAndGet();
                    }
                });

        Subscription s2 = stream
                .observe()
                .take(100)
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        latch2.countDown();
                    }
                })
                .subscribe(new Subscriber<HystrixUtilization>() {
                    @Override
                    public void onCompleted() {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted");
                        latch2.countDown();
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e);
                        latch2.countDown();
                    }

                    @Override
                    public void onNext(HystrixUtilization utilization) {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + utilization);
                        payloads2.incrementAndGet();
                    }
                });
        //execute 1 command, then unsubscribe from first stream. then execute the rest
        for (int i = 0; i < 50; i++) {
            HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
            cmd.execute();
            if (i == 1) {
                s1.unsubscribe();
            }
        }

        assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS));
        assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS));
        System.out.println("s1 got : " + payloads1.get() + ", s2 got : " + payloads2.get());
        assertTrue("s1 got data", payloads1.get() > 0);
        assertTrue("s2 got data", payloads2.get() > 0);
        assertTrue("s1 got less data than s2", payloads2.get() > payloads1.get());
    }

    @Test
    public void testTwoSubscribersBothUnsubscribe() throws Exception {
        final CountDownLatch latch1 = new CountDownLatch(1);
        final CountDownLatch latch2 = new CountDownLatch(1);
        final AtomicInteger payloads1 = new AtomicInteger(0);
        final AtomicInteger payloads2 = new AtomicInteger(0);

        Subscription s1 = stream
                .observe()
                .take(100)
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        latch1.countDown();
                    }
                })
                .subscribe(new Subscriber<HystrixUtilization>() {
                    @Override
                    public void onCompleted() {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted");
                        latch1.countDown();
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e);
                        latch1.countDown();
                    }

                    @Override
                    public void onNext(HystrixUtilization utilization) {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + utilization);
                        payloads1.incrementAndGet();
                    }
                });

        Subscription s2 = stream
                .observe()
                .take(100)
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        latch2.countDown();
                    }
                })
                .subscribe(new Subscriber<HystrixUtilization>() {
                    @Override
                    public void onCompleted() {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted");
                        latch2.countDown();
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e);
                        latch2.countDown();
                    }

                    @Override
                    public void onNext(HystrixUtilization utilization) {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + utilization);
                        payloads2.incrementAndGet();
                    }
                });
        //execute 2 commands, then unsubscribe from both streams, then execute the rest
        for (int i = 0; i < 10; i++) {
            HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
            cmd.execute();
            if (i == 2) {
                s1.unsubscribe();
                s2.unsubscribe();
            }
        }
        assertFalse(stream.isSourceCurrentlySubscribed());  //both subscriptions have been cancelled - source should be too

        assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS));
        assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS));
        System.out.println("s1 got : " + payloads1.get() + ", s2 got : " + payloads2.get());
        assertTrue("s1 got data", payloads1.get() > 0);
        assertTrue("s2 got data", payloads2.get() > 0);
    }

    @Test
    public void testTwoSubscribersOneSlowOneFast() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicBoolean foundError = new AtomicBoolean(false);

        Observable<HystrixUtilization> fast = stream
                .observe()
                .observeOn(Schedulers.newThread());
        Observable<HystrixUtilization> slow = stream
                .observe()
                .observeOn(Schedulers.newThread())
                .map(new Func1<HystrixUtilization, HystrixUtilization>() {
                    @Override
                    public HystrixUtilization call(HystrixUtilization util) {
                        try {
                            Thread.sleep(100);
                            return util;
                        } catch (InterruptedException ex) {
                            return util;
                        }
                    }
                });

        Observable<Boolean> checkZippedEqual = Observable.zip(fast, slow, new Func2<HystrixUtilization, HystrixUtilization, Boolean>() {
            @Override
            public Boolean call(HystrixUtilization payload, HystrixUtilization payload2) {
                return payload == payload2;
            }
        });

        Subscription s1 = checkZippedEqual
                .take(10000)
                .subscribe(new Subscriber<Boolean>() {
                    @Override
                    public void onCompleted() {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnCompleted");
                        latch.countDown();
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnError : " + e);
                        e.printStackTrace();
                        foundError.set(true);
                        latch.countDown();
                    }

                    @Override
                    public void onNext(Boolean b) {
                        //System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnNext : " + b);
                    }
                });

        for (int i = 0; i < 50; i++) {
            HystrixCommand<Integer> cmd = Command.from(groupKey, commandKey, HystrixEventType.SUCCESS, 50);
            cmd.execute();
        }

        latch.await(10000, TimeUnit.MILLISECONDS);
        assertFalse(foundError.get());
    }
}