Back to Repositories

Testing Cumulative Thread Pool Event Counting in Netflix Hystrix

This test suite validates the CumulativeThreadPoolEventCounterStream functionality in Netflix’s Hystrix library, focusing on thread pool metrics collection and event counting. The tests verify how different command execution scenarios affect thread pool counters and ensure proper event accumulation over time.

Test Coverage Overview

The test suite provides comprehensive coverage of thread pool event counting scenarios in Hystrix.

Key areas tested include:
  • Empty stream behavior
  • Success, failure, and timeout scenarios
  • Thread pool rejection handling
  • Semaphore rejection cases
  • Cache response handling
  • Circuit breaker interaction

Implementation Analysis

The testing approach uses JUnit to validate the CumulativeThreadPoolEventCounterStream implementation. Tests utilize mock commands and subscribers to verify event counting behavior, with particular attention to thread pool execution and rejection metrics. The implementation leverages RxJava observables for stream handling and employs CountDownLatch for synchronization.

Technical Details

Testing tools and configuration:
  • JUnit test framework
  • RxJava for reactive streams
  • HystrixRequestContext for request isolation
  • Mock command implementations
  • Configurable thread pools and semaphores
  • Stream observation windows of 10ms with 100ms intervals

Best Practices Demonstrated

The test suite exemplifies high-quality testing practices for concurrent systems.

Notable practices include:
  • Proper test isolation using setUp/tearDown
  • Comprehensive edge case coverage
  • Thorough verification of concurrent behavior
  • Clear test scenario organization
  • Effective use of assertions and timing controls

netflix/hystrix

hystrix-core/src/test/java/com/netflix/hystrix/metric/consumer/CumulativeThreadPoolEventCounterStreamTest.java

            
/**
 * Copyright 2016 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.netflix.hystrix.metric.consumer;

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixRequestLog;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolMetrics;
import com.netflix.hystrix.metric.CommandStreamTest;
import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import rx.Subscriber;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;

public class CumulativeThreadPoolEventCounterStreamTest extends CommandStreamTest {
    HystrixRequestContext context;
    CumulativeThreadPoolEventCounterStream stream;

    private static Subscriber<long[]> getSubscriber(final CountDownLatch latch) {
        return new Subscriber<long[]>() {
            @Override
            public void onCompleted() {
                latch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                fail(e.getMessage());
            }

            @Override
            public void onNext(long[] eventCounts) {
                System.out.println("OnNext @ " + System.currentTimeMillis() + " : " + eventCounts[0] + " : " + eventCounts[1]);
            }
        };
    }

    @Before
    public void setUp() {
        context = HystrixRequestContext.initializeContext();
    }

    @After
    public void tearDown() {
        context.shutdown();
        stream.unsubscribe();
        RollingCommandEventCounterStream.reset();
    }

    @Test
    public void testEmptyStreamProducesZeros() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Cumulative-ThreadPool-A");
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("Cumulative-ThreadPool-A");
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("Cumulative-Counter-A");
        stream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();

        final CountDownLatch latch = new CountDownLatch(1);
        stream.observe().take(10).subscribe(getSubscriber(latch));

        //no writes

        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }
        assertEquals(2, stream.getLatest().length);
        assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
        assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
    }

    @Test
    public void testSingleSuccess() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Cumulative-ThreadPool-B");
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("Cumulative-ThreadPool-B");
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("Cumulative-Counter-B");
        stream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();

        final CountDownLatch latch = new CountDownLatch(1);
        stream.observe().take(10).subscribe(getSubscriber(latch));

        CommandStreamTest.Command cmd = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS, 20);

        cmd.observe();

        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }
        assertEquals(2, stream.getLatest().length);
        assertEquals(1, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
        assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
    }

    @Test
    public void testSingleFailure() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Cumulative-ThreadPool-C");
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("Cumulative-ThreadPool-C");
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("Cumulative-Counter-C");
        stream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();

        final CountDownLatch latch = new CountDownLatch(1);
        stream.observe().take(10).subscribe(getSubscriber(latch));

        CommandStreamTest.Command cmd = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.FAILURE, 20);

        cmd.observe();

        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }
        assertEquals(2, stream.getLatest().length);
        assertEquals(1, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
        assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
    }

    @Test
    public void testSingleTimeout() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Cumulative-ThreadPool-D");
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("Cumulative-ThreadPool-D");
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("Cumulative-Counter-D");
        stream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();

        final CountDownLatch latch = new CountDownLatch(1);
        stream.observe().take(10).subscribe(getSubscriber(latch));

        CommandStreamTest.Command cmd = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.TIMEOUT);

        cmd.observe();

        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }
        assertEquals(2, stream.getLatest().length);
        assertEquals(1, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
        assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
    }

    @Test
    public void testSingleBadRequest() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Cumulative-ThreadPool-E");
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("Cumulative-ThreadPool-E");
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("Cumulative-Counter-E");
        stream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();

        final CountDownLatch latch = new CountDownLatch(1);
        stream.observe().take(10).subscribe(getSubscriber(latch));

        CommandStreamTest.Command cmd = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.BAD_REQUEST);

        cmd.observe();

        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }
        assertEquals(2, stream.getLatest().length);
        assertEquals(1, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
        assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
    }

    @Test
    public void testRequestFromCache() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Cumulative-ThreadPool-F");
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("Cumulative-ThreadPool-F");
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("Cumulative-Counter-F");
        stream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();

        final CountDownLatch latch = new CountDownLatch(1);
        stream.observe().take(10).subscribe(getSubscriber(latch));

        CommandStreamTest.Command cmd1 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS, 20);
        CommandStreamTest.Command cmd2 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.RESPONSE_FROM_CACHE);
        CommandStreamTest.Command cmd3 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.RESPONSE_FROM_CACHE);

        cmd1.observe();
        cmd2.observe();
        cmd3.observe();

        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }

        System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());

        //RESPONSE_FROM_CACHE should not show up at all in thread pool counters - just the success
        assertEquals(2, stream.getLatest().length);
        assertEquals(1, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
        assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
    }

    @Test
    public void testShortCircuited() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Cumulative-ThreadPool-G");
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("Cumulative-ThreadPool-G");
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("Cumulative-Counter-G");
        stream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();

        final CountDownLatch latch = new CountDownLatch(1);
        stream.observe().take(10).subscribe(getSubscriber(latch));

        //3 failures in a row will trip circuit.  let bucket roll once then submit 2 requests.
        //should see 3 FAILUREs and 2 SHORT_CIRCUITs and each should see a FALLBACK_SUCCESS

        CommandStreamTest.Command failure1 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.FAILURE, 20);
        CommandStreamTest.Command failure2 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.FAILURE, 20);
        CommandStreamTest.Command failure3 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.FAILURE, 20);

        CommandStreamTest.Command shortCircuit1 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS);
        CommandStreamTest.Command shortCircuit2 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS);

        failure1.observe();
        failure2.observe();
        failure3.observe();

        try {
            Thread.sleep(100);
        } catch (InterruptedException ie) {
            fail(ie.getMessage());
        }

        shortCircuit1.observe();
        shortCircuit2.observe();

        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }

        System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
        assertTrue(shortCircuit1.isResponseShortCircuited());
        assertTrue(shortCircuit2.isResponseShortCircuited());

        //only the FAILUREs should show up in thread pool counters
        assertEquals(2, stream.getLatest().length);
        assertEquals(3, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
        assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
    }

    @Test
    public void testSemaphoreRejected() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Cumulative-ThreadPool-H");
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("Cumulative-ThreadPool-H");
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("Cumulative-Counter-H");
        stream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();

        final CountDownLatch latch = new CountDownLatch(1);
        stream.observe().take(10).subscribe(getSubscriber(latch));

        //10 commands will saturate semaphore when called from different threads.
        //submit 2 more requests and they should be SEMAPHORE_REJECTED
        //should see 10 SUCCESSes, 2 SEMAPHORE_REJECTED and 2 FALLBACK_SUCCESSes

        List<Command> saturators = new ArrayList<Command>();

        for (int i = 0; i < 10; i++) {
            saturators.add(CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS, 300, HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE));
        }

        CommandStreamTest.Command rejected1 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS, 0, HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE);
        CommandStreamTest.Command rejected2 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS, 0, HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE);

        for (final CommandStreamTest.Command saturator : saturators) {
            new Thread(new HystrixContextRunnable(new Runnable() {
                @Override
                public void run() {
                    saturator.observe();
                }
            })).start();
        }

        try {
            Thread.sleep(10);
        } catch (InterruptedException ie) {
            fail(ie.getMessage());
        }

        rejected1.observe();
        rejected2.observe();

        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }

        System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
        assertTrue(rejected1.isResponseSemaphoreRejected());
        assertTrue(rejected2.isResponseSemaphoreRejected());

        //none of these got executed on a thread-pool, so thread pool metrics should be 0
        assertEquals(2, stream.getLatest().length);
        assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
        assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
    }

    @Test
    public void testThreadPoolRejected() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Cumulative-ThreadPool-I");
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("Cumulative-ThreadPool-I");
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("Cumulative-Counter-I");
        stream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();

        final CountDownLatch latch = new CountDownLatch(1);
        stream.observe().take(10).subscribe(getSubscriber(latch));

        //10 commands will saturate threadpools when called concurrently.
        //submit 2 more requests and they should be THREADPOOL_REJECTED
        //should see 10 SUCCESSes, 2 THREADPOOL_REJECTED and 2 FALLBACK_SUCCESSes

        List<CommandStreamTest.Command> saturators = new ArrayList<CommandStreamTest.Command>();

        for (int i = 0; i < 10; i++) {
            saturators.add(CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS, 700));
        }

        CommandStreamTest.Command rejected1 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS, 0);
        CommandStreamTest.Command rejected2 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS, 0);

        for (final CommandStreamTest.Command saturator : saturators) {
            saturator.observe();
        }

        try {
            Thread.sleep(100);
        } catch (InterruptedException ie) {
            fail(ie.getMessage());
        }

        rejected1.observe();
        rejected2.observe();

        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }

        System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
        assertTrue(rejected1.isResponseThreadPoolRejected());
        assertTrue(rejected2.isResponseThreadPoolRejected());

        //all 12 commands got submitted to thread pool, 10 accepted, 2 rejected is expected
        assertEquals(2, stream.getLatest().length);
        assertEquals(10, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
        assertEquals(2, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
    }

    @Test
    public void testFallbackFailure() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Cumulative-ThreadPool-J");
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("Cumulative-ThreadPool-J");
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("Cumulative-Counter-J");
        stream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();

        final CountDownLatch latch = new CountDownLatch(1);
        stream.observe().take(10).subscribe(getSubscriber(latch));

        CommandStreamTest.Command cmd = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.FAILURE, 20, HystrixEventType.FALLBACK_FAILURE);

        cmd.observe();

        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }
        assertEquals(2, stream.getLatest().length);
        assertEquals(1, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
        assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
    }

    @Test
    public void testFallbackMissing() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Cumulative-ThreadPool-K");
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("Cumulative-ThreadPool-K");
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("Cumulative-Counter-K");
        stream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();

        final CountDownLatch latch = new CountDownLatch(1);
        stream.observe().take(10).subscribe(getSubscriber(latch));

        CommandStreamTest.Command cmd = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.FAILURE, 20, HystrixEventType.FALLBACK_MISSING);

        cmd.observe();

        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }
        assertEquals(2, stream.getLatest().length);
        assertEquals(1, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
        assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
    }

    @Test
    public void testFallbackRejection() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Cumulative-ThreadPool-L");
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("Cumulative-ThreadPool-L");
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("Cumulative-Counter-L");
        stream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();

        final CountDownLatch latch = new CountDownLatch(1);
        stream.observe().take(10).subscribe(getSubscriber(latch));

        //fallback semaphore size is 5.  So let 5 commands saturate that semaphore, then
        //let 2 more commands go to fallback.  they should get rejected by the fallback-semaphore

        List<CommandStreamTest.Command> fallbackSaturators = new ArrayList<CommandStreamTest.Command>();
        for (int i = 0; i < 5; i++) {
            fallbackSaturators.add(CommandStreamTest.Command.from(groupKey, key, HystrixEventType.FAILURE, 20, HystrixEventType.FALLBACK_SUCCESS, 400));
        }

        CommandStreamTest.Command rejection1 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.FAILURE, 20, HystrixEventType.FALLBACK_SUCCESS, 0);
        CommandStreamTest.Command rejection2 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.FAILURE, 20, HystrixEventType.FALLBACK_SUCCESS, 0);

        for (CommandStreamTest.Command saturator: fallbackSaturators) {
            saturator.observe();
        }

        try {
            Thread.sleep(70);
        } catch (InterruptedException ex) {
            fail(ex.getMessage());
        }

        rejection1.observe();
        rejection2.observe();

        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }

        //all 7 commands executed on-thread, so should be executed according to thread-pool metrics
        assertEquals(2, stream.getLatest().length);
        assertEquals(7, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
        assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
    }

    //in a rolling window, take(30) would age out all counters.  in the cumulative count, we expect them to remain non-zero forever
    @Test
    public void testMultipleEventsOverTimeGetStoredAndDoNotAgeOut() {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("Cumulative-ThreadPool-M");
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("Cumulative-ThreadPool-M");
        HystrixCommandKey key = HystrixCommandKey.Factory.asKey("Cumulative-Counter-M");
        stream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, 10, 100);
        stream.startCachingStreamValuesIfUnstarted();

        final CountDownLatch latch = new CountDownLatch(1);
        stream.observe().take(30).subscribe(getSubscriber(latch));

        CommandStreamTest.Command cmd1 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.SUCCESS, 20);
        CommandStreamTest.Command cmd2 = CommandStreamTest.Command.from(groupKey, key, HystrixEventType.FAILURE, 10);

        cmd1.observe();
        cmd2.observe();

        try {
            assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ex) {
            fail("Interrupted ex");
        }

        //all commands should have aged out
        assertEquals(2, stream.getLatest().length);
        assertEquals(2, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
        assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
    }
}