Back to Repositories

Testing Android Handler Scheduler Implementation in ReactiveX/RxAndroid

This test suite validates the HandlerScheduler implementation in RxAndroid, focusing on Android’s Handler-based scheduling mechanisms. It verifies both direct and worker-based scheduling operations, including immediate execution, delayed tasks, and periodic scheduling behaviors.

Test Coverage Overview

The test suite provides comprehensive coverage of HandlerScheduler functionality:
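  • Direct scheduling operations including immediate and delayed tasks (periodic direct-scheduling tests are present but marked @Ignore, because that behavior is delegated to RxJava's default implementation)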
  • Worker-based scheduling with disposal and cancellation scenarios
  • Edge cases like negative delays and error handling
  • Integration with Android’s Handler and Looper mechanisms

Implementation Analysis

The testing approach uses parameterized tests to run every case twice: once with the scheduler's async flag enabled and once with it disabled, checking that posted messages are marked asynchronous accordingly. It employs Robolectric for Android runtime simulation and implements detailed verification of scheduling behaviors through controlled task execution and timing on a paused main looper.

Key patterns include message queue manipulation, looper control, and systematic validation of disposal mechanics.

Technical Details

Testing tools and configuration:
  • JUnit with Robolectric for Android environment simulation
  • ShadowLooper for main thread task control
  • Custom CountingRunnable for execution verification
  • RxJavaPlugins integration for error handling and scheduling hooks

Best Practices Demonstrated

The test suite exemplifies high-quality testing practices through thorough setup/teardown management, comprehensive error case coverage, and careful state validation. Notable practices include isolated test cases, proper resource cleanup, and extensive validation of scheduling behaviors across different timing scenarios.

reactivex/rxandroid

rxandroid/src/test/java/io/reactivex/rxjava3/android/schedulers/HandlerSchedulerTest.java

            
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.reactivex.rxjava3.android.schedulers;

import android.os.Handler;
import android.os.Looper;
import android.os.Message;

import io.reactivex.rxjava3.android.testutil.CountingRunnable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.robolectric.ParameterizedRobolectricTestRunner;
import org.robolectric.annotation.Config;
import org.robolectric.shadows.ShadowLooper;
import org.robolectric.shadows.ShadowMessageQueue;

import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.robolectric.Shadows.shadowOf;
import static org.robolectric.shadows.ShadowLooper.idleMainLooper;
import static org.robolectric.shadows.ShadowLooper.pauseMainLooper;
import static org.robolectric.shadows.ShadowLooper.runUiThreadTasks;
import static org.robolectric.shadows.ShadowLooper.runUiThreadTasksIncludingDelayedTasks;
import static org.robolectric.shadows.ShadowLooper.unPauseMainLooper;

@RunWith(ParameterizedRobolectricTestRunner.class)
@Config(manifest=Config.NONE, sdk = 16)
public final class HandlerSchedulerTest {

    /**
     * Supplies the constructor arguments for the parameterized runs: one run with
     * {@code async = true} and one with {@code async = false}.
     */
    @ParameterizedRobolectricTestRunner.Parameters(name = "async = {0}")
    public static Collection<Object[]> data() {
        Object[] asyncEnabled = {true};
        Object[] asyncDisabled = {false};
        return Arrays.asList(asyncEnabled, asyncDisabled);
    }

    // Scheduler under test; built in the constructor for each parameterized run.
    private final Scheduler scheduler;
    // Parameter value of the current run; also handed to the HandlerScheduler constructor.
    private final boolean async;

    public HandlerSchedulerTest(boolean async) {
        this.scheduler = new HandlerScheduler(new Handler(Looper.getMainLooper()), async);
        this.async = async;
    }

    /** Runs before each test: pauses the main looper and clears any RxJava plugin hooks. */
    @Before
    public void setUp() {
        // Pausing hands manual control of the looper task queue to the test body.
        pauseMainLooper();
        RxJavaPlugins.reset();
    }

    // Runs after each test: clears RxJava plugin hooks, then releases the main looper.
    @After
    public void tearDown() {
        RxJavaPlugins.reset();
        unPauseMainLooper(); // Undo the pause applied in setUp().
    }

    /** Verifies that a directly scheduled task with no delay runs on the next queue drain. */
    @Test
    public void directScheduleOncePostsImmediately() {
        CountingRunnable task = new CountingRunnable();

        scheduler.scheduleDirect(task);
        // No time is advanced; draining the queue alone must execute the task.
        runUiThreadTasks();

        assertEquals(1, task.get());
    }

    /** Verifies that a negative delay does not postpone a directly scheduled task. */
    @Test
    public void directScheduleOnceWithNegativeDelayPostsImmediately() {
        CountingRunnable task = new CountingRunnable();

        scheduler.scheduleDirect(task, -1, MINUTES);
        // The clock is not advanced, so the task can only run if it was posted without delay.
        runUiThreadTasks();

        assertEquals(1, task.get());
    }

    /**
     * Verifies that direct scheduling passes the runnable through the RxJavaPlugins schedule
     * hook and executes whatever the hook returns instead of the original.
     */
    @Test
    public void directScheduleOnceUsesHook() {
        final CountingRunnable replacement = new CountingRunnable();
        final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
        Function<Runnable, Runnable> hook = new Function<Runnable, Runnable>() {
            @Override public Runnable apply(Runnable incoming) {
                seenByHook.set(incoming);
                return replacement;
            }
        };
        RxJavaPlugins.setScheduleHandler(hook);

        CountingRunnable original = new CountingRunnable();
        scheduler.scheduleDirect(original);

        // The hook must have received exactly the runnable we handed to the scheduler.
        assertSame(original, seenByHook.get());

        runUiThreadTasks();
        // Only the hook's replacement may have executed.
        assertEquals(1, replacement.get());
        assertEquals(0, original.get());
    }

    /** Verifies that disposing a directly scheduled task before the queue drains prevents it from running. */
    @Test
    public void directScheduleOnceDisposedDoesNotRun() {
        CountingRunnable task = new CountingRunnable();

        Disposable handle = scheduler.scheduleDirect(task);
        handle.dispose();
        runUiThreadTasks();

        assertEquals(0, task.get());
    }

    /** Verifies that a directly scheduled delayed task runs only after the delay has elapsed. */
    @Test
    public void directScheduleOnceWithDelayPostsWithDelay() {
        CountingRunnable task = new CountingRunnable();
        scheduler.scheduleDirect(task, 1, MINUTES);

        // Before the clock moves, nothing may have executed.
        runUiThreadTasks();
        assertEquals(0, task.get());

        // After advancing one minute the task becomes due.
        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        assertEquals(1, task.get());
    }

    /**
     * Verifies that delayed direct scheduling passes the runnable through the RxJavaPlugins
     * schedule hook and executes whatever the hook returns instead of the original.
     */
    @Test
    public void directScheduleOnceWithDelayUsesHook() {
        final CountingRunnable replacement = new CountingRunnable();
        final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
        Function<Runnable, Runnable> hook = new Function<Runnable, Runnable>() {
            @Override public Runnable apply(Runnable incoming) {
                seenByHook.set(incoming);
                return replacement;
            }
        };
        RxJavaPlugins.setScheduleHandler(hook);

        CountingRunnable original = new CountingRunnable();
        scheduler.scheduleDirect(original, 1, MINUTES);

        // The hook must have received exactly the runnable we handed to the scheduler.
        assertSame(original, seenByHook.get());

        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        // Only the hook's replacement may have executed.
        assertEquals(1, replacement.get());
        assertEquals(0, original.get());
    }

    /** Verifies that a delayed direct task disposed halfway through its delay never runs. */
    @Test
    public void directScheduleOnceWithDelayDisposedDoesNotRun() {
        CountingRunnable task = new CountingRunnable();
        Disposable handle = scheduler.scheduleDirect(task, 1, MINUTES);

        // Dispose after half of the delay has passed.
        idleMainLooper(30, SECONDS);
        handle.dispose();

        // Let the remaining half elapse; the task would be due now had it not been disposed.
        idleMainLooper(30, SECONDS);
        runUiThreadTasks();
        assertEquals(0, task.get());
    }

    /** Verifies that a periodic direct task runs once per elapsed period (currently ignored). */
    @Test @Ignore("Implementation delegated to default RxJava implementation")
    public void directSchedulePeriodicallyReschedulesItself() {
        CountingRunnable task = new CountingRunnable();
        scheduler.schedulePeriodicallyDirect(task, 1, 1, MINUTES);

        // Nothing is due until the initial delay has passed.
        runUiThreadTasks();
        assertEquals(0, task.get());

        // Each additional minute must yield exactly one more execution.
        for (int expectedRuns = 1; expectedRuns <= 3; expectedRuns++) {
            idleMainLooper(1, MINUTES);
            runUiThreadTasks();
            assertEquals(expectedRuns, task.get());
        }
    }

    /**
     * Verifies that periodic direct scheduling consults the schedule hook once, runs the hook's
     * replacement, and does not consult the hook again on re-scheduling (currently ignored).
     */
    @Test @Ignore("Implementation delegated to default RxJava implementation")
    public void directSchedulePeriodicallyUsesHookOnce() {
        final CountingRunnable replacement = new CountingRunnable();
        final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
        Function<Runnable, Runnable> hook = new Function<Runnable, Runnable>() {
            @Override public Runnable apply(Runnable incoming) {
                seenByHook.set(incoming);
                return replacement;
            }
        };
        RxJavaPlugins.setScheduleHandler(hook);

        CountingRunnable original = new CountingRunnable();
        scheduler.schedulePeriodicallyDirect(original, 1, 1, MINUTES);

        // The hook must have received our action; clear the slot to detect later hook calls.
        assertSame(original, seenByHook.get());
        seenByHook.set(null);

        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        // Only the hook's replacement may have executed.
        assertEquals(1, replacement.get());
        assertEquals(0, original.get());

        // The slot stays empty if the hook was not invoked again on re-scheduling.
        assertNull(seenByHook.get());
    }

    /** Verifies that a periodic direct task stops running once disposed (currently ignored). */
    @Test @Ignore("Implementation delegated to default RxJava implementation")
    public void directSchedulePeriodicallyDisposedDoesNotRun() {
        CountingRunnable task = new CountingRunnable();
        Disposable handle = scheduler.schedulePeriodicallyDirect(task, 1, 1, MINUTES);

        runUiThreadTasks();
        assertEquals(0, task.get());

        // Two periods elapse normally, one execution each.
        for (int expectedRuns = 1; expectedRuns <= 2; expectedRuns++) {
            idleMainLooper(1, MINUTES);
            runUiThreadTasks();
            assertEquals(expectedRuns, task.get());
        }

        handle.dispose();

        // A third period must not add another execution.
        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        assertEquals(2, task.get());
    }

    /**
     * Verifies that a periodic direct task which disposes itself during its second run is not
     * executed again (currently ignored).
     */
    @Test @Ignore("Implementation delegated to default RxJava implementation")
    public void directSchedulePeriodicallyDisposedDuringRunDoesNotReschedule() {
        final AtomicReference<Disposable> ownHandle = new AtomicReference<>();
        CountingRunnable task = new CountingRunnable() {
            @Override public void run() {
                super.run();
                // Self-dispose from inside the second execution.
                if (get() == 2) {
                    ownHandle.get().dispose();
                }
            }
        };
        ownHandle.set(scheduler.schedulePeriodicallyDirect(task, 1, 1, MINUTES));

        runUiThreadTasks();
        assertEquals(0, task.get());

        for (int expectedRuns = 1; expectedRuns <= 2; expectedRuns++) {
            idleMainLooper(1, MINUTES);
            runUiThreadTasks();
            assertEquals(expectedRuns, task.get());
        }

        // The dispose happened during the second run(); a further period must change nothing.
        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        assertEquals(2, task.get());
    }

    /**
     * Verifies that a periodic direct task which throws during its second run is not executed
     * again (currently ignored).
     */
    @Test @Ignore("Implementation delegated to default RxJava implementation")
    public void directSchedulePeriodicallyThrowingDoesNotReschedule() {
        CountingRunnable task = new CountingRunnable() {
            @Override public void run() {
                super.run();
                // Fail from inside the second execution.
                if (get() == 2) {
                    throw new RuntimeException("Broken!");
                }
            }
        };
        scheduler.schedulePeriodicallyDirect(task, 1, 1, MINUTES);

        runUiThreadTasks();
        assertEquals(0, task.get());

        for (int expectedRuns = 1; expectedRuns <= 2; expectedRuns++) {
            idleMainLooper(1, MINUTES);
            runUiThreadTasks();
            assertEquals(expectedRuns, task.get());
        }

        // The exception was thrown during the second run(); a further period must change nothing.
        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        assertEquals(2, task.get());
    }

    /** Verifies that a task scheduled on a worker with no delay runs on the next queue drain. */
    @Test
    public void workerScheduleOncePostsImmediately() {
        CountingRunnable task = new CountingRunnable();
        Worker worker = scheduler.createWorker();

        worker.schedule(task);
        // No time is advanced; draining the queue alone must execute the task.
        runUiThreadTasks();

        assertEquals(1, task.get());
    }

    /** Verifies that a negative delay does not postpone a task scheduled on a worker. */
    @Test
    public void workerScheduleOnceWithNegativeDelayPostsImmediately() {
        CountingRunnable task = new CountingRunnable();
        Worker worker = scheduler.createWorker();

        worker.schedule(task, -1, MINUTES);
        // The clock is not advanced, so the task can only run if it was posted without delay.
        runUiThreadTasks();

        assertEquals(1, task.get());
    }

    /**
     * Verifies that worker scheduling passes the runnable through the RxJavaPlugins schedule
     * hook and executes whatever the hook returns instead of the original.
     */
    @Test
    public void workerScheduleOnceUsesHook() {
        final CountingRunnable replacement = new CountingRunnable();
        final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
        Function<Runnable, Runnable> hook = new Function<Runnable, Runnable>() {
            @Override public Runnable apply(Runnable incoming) {
                seenByHook.set(incoming);
                return replacement;
            }
        };
        RxJavaPlugins.setScheduleHandler(hook);

        Worker worker = scheduler.createWorker();
        CountingRunnable original = new CountingRunnable();
        worker.schedule(original);

        // The hook must have received exactly the runnable we handed to the worker.
        assertSame(original, seenByHook.get());

        runUiThreadTasks();
        // Only the hook's replacement may have executed.
        assertEquals(1, replacement.get());
        assertEquals(0, original.get());
    }

    /** Verifies that disposing a worker task's disposable before the queue drains prevents it from running. */
    @Test
    public void workerScheduleOnceDisposedDoesNotRun() {
        CountingRunnable task = new CountingRunnable();
        Worker worker = scheduler.createWorker();

        Disposable handle = worker.schedule(task);
        handle.dispose();
        runUiThreadTasks();

        assertEquals(0, task.get());
    }

    /** Verifies that a delayed worker task runs only after the delay has elapsed. */
    @Test
    public void workerScheduleOnceWithDelayPostsWithDelay() {
        CountingRunnable task = new CountingRunnable();
        Worker worker = scheduler.createWorker();
        worker.schedule(task, 1, MINUTES);

        // Before the clock moves, nothing may have executed.
        runUiThreadTasks();
        assertEquals(0, task.get());

        // After advancing one minute the task becomes due.
        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        assertEquals(1, task.get());
    }

    /**
     * Verifies that delayed worker scheduling passes the runnable through the RxJavaPlugins
     * schedule hook and executes whatever the hook returns instead of the original.
     */
    @Test
    public void workerScheduleOnceWithDelayUsesHook() {
        final CountingRunnable replacement = new CountingRunnable();
        final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
        Function<Runnable, Runnable> hook = new Function<Runnable, Runnable>() {
            @Override public Runnable apply(Runnable incoming) {
                seenByHook.set(incoming);
                return replacement;
            }
        };
        RxJavaPlugins.setScheduleHandler(hook);

        Worker worker = scheduler.createWorker();
        CountingRunnable original = new CountingRunnable();
        worker.schedule(original, 1, MINUTES);

        // The hook must have received exactly the runnable we handed to the worker.
        assertSame(original, seenByHook.get());

        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        // Only the hook's replacement may have executed.
        assertEquals(1, replacement.get());
        assertEquals(0, original.get());
    }

    /** Verifies that a delayed worker task disposed halfway through its delay never runs. */
    @Test
    public void workerScheduleOnceWithDelayDisposedDoesNotRun() {
        CountingRunnable task = new CountingRunnable();
        Worker worker = scheduler.createWorker();
        Disposable handle = worker.schedule(task, 1, MINUTES);

        // Dispose after half of the delay has passed.
        idleMainLooper(30, SECONDS);
        handle.dispose();

        // Let the remaining half elapse; the task would be due now had it not been disposed.
        idleMainLooper(30, SECONDS);
        runUiThreadTasks();
        assertEquals(0, task.get());
    }

    /** Verifies that a periodic worker task runs once per elapsed period (currently ignored). */
    @Test @Ignore("Implementation delegated to default RxJava implementation")
    public void workerSchedulePeriodicallyReschedulesItself() {
        Worker worker = scheduler.createWorker();
        CountingRunnable task = new CountingRunnable();
        worker.schedulePeriodically(task, 1, 1, MINUTES);

        // Nothing is due until the initial delay has passed.
        runUiThreadTasks();
        assertEquals(0, task.get());

        // Each additional minute must yield exactly one more execution.
        for (int expectedRuns = 1; expectedRuns <= 3; expectedRuns++) {
            idleMainLooper(1, MINUTES);
            runUiThreadTasks();
            assertEquals(expectedRuns, task.get());
        }
    }

    /**
     * Verifies that periodic worker scheduling consults the schedule hook once, runs the hook's
     * replacement, and does not consult the hook again on re-scheduling (currently ignored).
     */
    @Test @Ignore("Implementation delegated to default RxJava implementation")
    public void workerSchedulePeriodicallyUsesHookOnce() {
        Worker worker = scheduler.createWorker();

        final CountingRunnable replacement = new CountingRunnable();
        final AtomicReference<Runnable> seenByHook = new AtomicReference<>();
        Function<Runnable, Runnable> hook = new Function<Runnable, Runnable>() {
            @Override public Runnable apply(Runnable incoming) {
                seenByHook.set(incoming);
                return replacement;
            }
        };
        RxJavaPlugins.setScheduleHandler(hook);

        CountingRunnable original = new CountingRunnable();
        worker.schedulePeriodically(original, 1, 1, MINUTES);

        // The hook must have received our action; clear the slot to detect later hook calls.
        assertSame(original, seenByHook.get());
        seenByHook.set(null);

        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        // Only the hook's replacement may have executed.
        assertEquals(1, replacement.get());
        assertEquals(0, original.get());

        // The slot stays empty if the hook was not invoked again on re-scheduling.
        assertNull(seenByHook.get());
    }

    /** Verifies that a periodic worker task stops running once its disposable is disposed (currently ignored). */
    @Test @Ignore("Implementation delegated to default RxJava implementation")
    public void workerSchedulePeriodicallyDisposedDoesNotRun() {
        Worker worker = scheduler.createWorker();
        CountingRunnable task = new CountingRunnable();
        Disposable handle = worker.schedulePeriodically(task, 1, 1, MINUTES);

        runUiThreadTasks();
        assertEquals(0, task.get());

        // Two periods elapse normally, one execution each.
        for (int expectedRuns = 1; expectedRuns <= 2; expectedRuns++) {
            idleMainLooper(1, MINUTES);
            runUiThreadTasks();
            assertEquals(expectedRuns, task.get());
        }

        handle.dispose();

        // A third period must not add another execution.
        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        assertEquals(2, task.get());
    }

    /**
     * Verifies that a periodic worker task which disposes itself during its second run is not
     * executed again (currently ignored).
     */
    @Test @Ignore("Implementation delegated to default RxJava implementation")
    public void workerSchedulePeriodicallyDisposedDuringRunDoesNotReschedule() {
        Worker worker = scheduler.createWorker();

        final AtomicReference<Disposable> ownHandle = new AtomicReference<>();
        CountingRunnable task = new CountingRunnable() {
            @Override public void run() {
                super.run();
                // Self-dispose from inside the second execution.
                if (get() == 2) {
                    ownHandle.get().dispose();
                }
            }
        };
        ownHandle.set(worker.schedulePeriodically(task, 1, 1, MINUTES));

        runUiThreadTasks();
        assertEquals(0, task.get());

        for (int expectedRuns = 1; expectedRuns <= 2; expectedRuns++) {
            idleMainLooper(1, MINUTES);
            runUiThreadTasks();
            assertEquals(expectedRuns, task.get());
        }

        // The dispose happened during the second run(); a further period must change nothing.
        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        assertEquals(2, task.get());
    }

    /**
     * Verifies that a periodic worker task which throws during its second run is not executed
     * again (currently ignored).
     */
    @Test @Ignore("Implementation delegated to default RxJava implementation")
    public void workerSchedulePeriodicallyThrowingDoesNotReschedule() {
        Worker worker = scheduler.createWorker();

        CountingRunnable task = new CountingRunnable() {
            @Override public void run() {
                super.run();
                // Fail from inside the second execution.
                if (get() == 2) {
                    throw new RuntimeException("Broken!");
                }
            }
        };
        worker.schedulePeriodically(task, 1, 1, MINUTES);

        runUiThreadTasks();
        assertEquals(0, task.get());

        for (int expectedRuns = 1; expectedRuns <= 2; expectedRuns++) {
            idleMainLooper(1, MINUTES);
            runUiThreadTasks();
            assertEquals(expectedRuns, task.get());
        }

        // The exception was thrown during the second run(); a further period must change nothing.
        idleMainLooper(1, MINUTES);
        runUiThreadTasks();
        assertEquals(2, task.get());
    }

    /** Verifies that the disposable returned by a worker reports its state before and after dispose(). */
    @Test
    public void workerDisposableTracksDisposedState() {
        Worker worker = scheduler.createWorker();
        Disposable handle = worker.schedule(new CountingRunnable());

        // Freshly scheduled: not yet disposed.
        assertFalse(handle.isDisposed());

        handle.dispose();
        assertTrue(handle.isDisposed());
    }

    /**
     * Verifies that a task does not run when its worker is disposed from inside the schedule
     * hook, i.e. while the schedule call is still in progress.
     */
    @Test
    public void workerUnsubscriptionDuringSchedulingCancelsScheduledAction() {
        final AtomicReference<Worker> workerHolder = new AtomicReference<>();
        Function<Runnable, Runnable> disposingHook = new Function<Runnable, Runnable>() {
            @Override public Runnable apply(Runnable incoming) {
                // Deliberately dispose at an awkward point, after the normal disposed check.
                workerHolder.get().dispose();
                return incoming;
            }
        };
        RxJavaPlugins.setScheduleHandler(disposingHook);

        Worker worker = scheduler.createWorker();
        workerHolder.set(worker);

        CountingRunnable task = new CountingRunnable();
        worker.schedule(task);

        runUiThreadTasks();
        assertEquals(0, task.get());
    }

    /**
     * Verifies that disposing a worker cancels a delayed task that is still pending on it.
     *
     * <p>The task is scheduled one minute out, so the queue is drained including delayed
     * tasks. Draining only the currently-due tasks would leave the counter at zero whether or
     * not the task had been cancelled, and the assertion would prove nothing.
     */
    @Test
    public void workerDisposeCancelsScheduled() {
        Worker worker = scheduler.createWorker();

        CountingRunnable counter = new CountingRunnable();
        worker.schedule(counter, 1, MINUTES);

        worker.dispose();

        // Run everything, delayed tasks included, so a task that survived dispose() would execute.
        runUiThreadTasksIncludingDelayedTasks();
        assertEquals(0, counter.get());
    }

    /** Verifies that disposing one worker leaves tasks scheduled on a different worker untouched. */
    @Test
    public void workerUnsubscriptionDoesNotAffectOtherWorkers() {
        Worker disposedWorker = scheduler.createWorker();
        CountingRunnable disposedTask = new CountingRunnable();
        disposedWorker.schedule(disposedTask, 1, MINUTES);

        Worker liveWorker = scheduler.createWorker();
        CountingRunnable liveTask = new CountingRunnable();
        liveWorker.schedule(liveTask, 1, MINUTES);

        disposedWorker.dispose();

        // Drain everything, delayed tasks included: only the live worker's task may run.
        runUiThreadTasksIncludingDelayedTasks();
        assertEquals(0, disposedTask.get());
        assertEquals(1, liveTask.get());
    }

    /** Verifies that a worker reports its disposed state before and after dispose(). */
    @Test
    public void workerTracksDisposedState() {
        Worker worker = scheduler.createWorker();

        // A freshly created worker is live.
        assertFalse(worker.isDisposed());

        worker.dispose();

        assertTrue(worker.isDisposed());
    }

    /** Verifies that scheduling on an already-disposed worker yields an already-disposed disposable. */
    @Test
    public void disposedWorkerReturnsDisposedDisposables() {
        Worker worker = scheduler.createWorker();
        worker.dispose();

        CountingRunnable task = new CountingRunnable();
        Disposable handle = worker.schedule(task);

        assertTrue(handle.isDisposed());
    }

    /**
     * Verifies that an exception thrown by a worker task is delivered to the RxJavaPlugins
     * error handler. The previously installed handler is restored afterwards.
     */
    @Test
    public void throwingActionRoutedToRxJavaPlugins() {
        Consumer<? super Throwable> previousHandler = RxJavaPlugins.getErrorHandler();

        try {
            // Capture whatever the plugin error handler receives.
            final AtomicReference<Throwable> captured = new AtomicReference<>();
            Consumer<Throwable> capturingHandler = new Consumer<Throwable>() {
                @Override
                public void accept(Throwable error) throws Exception {
                    captured.set(error);
                }
            };
            RxJavaPlugins.setErrorHandler(capturingHandler);

            Worker worker = scheduler.createWorker();

            final NullPointerException failure = new NullPointerException();
            Runnable throwingTask = new Runnable() {
                @Override
                public void run() {
                    throw failure;
                }
            };
            worker.schedule(throwingTask);

            runUiThreadTasks();
            // The very same exception instance must have reached the handler.
            assertSame(failure, captured.get());
        } finally {
            RxJavaPlugins.setErrorHandler(previousHandler);
        }
    }

    /** Verifies that scheduleDirect rejects a null runnable or null time unit with the expected message. */
    @Test
    public void directScheduleOnceInputValidation() {
        // Null runnable, no delay.
        try {
            scheduler.scheduleDirect(null);
            fail();
        } catch (NullPointerException expected) {
            assertEquals("run == null", expected.getMessage());
        }

        // Null runnable, with delay.
        try {
            scheduler.scheduleDirect(null, 1, MINUTES);
            fail();
        } catch (NullPointerException expected) {
            assertEquals("run == null", expected.getMessage());
        }

        // Null time unit.
        try {
            scheduler.scheduleDirect(new CountingRunnable(), 1, null);
            fail();
        } catch (NullPointerException expected) {
            assertEquals("unit == null", expected.getMessage());
        }
    }

    /**
     * Verifies that schedulePeriodicallyDirect rejects a null runnable, a negative period and a
     * null time unit with the expected messages (currently ignored).
     */
    @Test @Ignore("Implementation delegated to default RxJava implementation")
    public void directSchedulePeriodicallyInputValidation() {
        // Null runnable.
        try {
            scheduler.schedulePeriodicallyDirect(null, 1, 1, MINUTES);
            fail();
        } catch (NullPointerException expected) {
            assertEquals("run == null", expected.getMessage());
        }

        // Negative period.
        try {
            scheduler.schedulePeriodicallyDirect(new CountingRunnable(), 1, -1, MINUTES);
            fail();
        } catch (IllegalArgumentException expected) {
            assertEquals("period < 0: -1", expected.getMessage());
        }

        // Null time unit.
        try {
            scheduler.schedulePeriodicallyDirect(new CountingRunnable(), 1, 1, null);
            fail();
        } catch (NullPointerException expected) {
            assertEquals("unit == null", expected.getMessage());
        }
    }

    /** Verifies that Worker.schedule rejects a null runnable or null time unit with the expected message. */
    @Test
    public void workerScheduleOnceInputValidation() {
        Worker worker = scheduler.createWorker();

        // Null runnable, no delay.
        try {
            worker.schedule(null);
            fail();
        } catch (NullPointerException expected) {
            assertEquals("run == null", expected.getMessage());
        }

        // Null runnable, with delay.
        try {
            worker.schedule(null, 1, MINUTES);
            fail();
        } catch (NullPointerException expected) {
            assertEquals("run == null", expected.getMessage());
        }

        // Null time unit.
        try {
            worker.schedule(new CountingRunnable(), 1, null);
            fail();
        } catch (NullPointerException expected) {
            assertEquals("unit == null", expected.getMessage());
        }
    }

    /**
     * Verifies that Worker.schedulePeriodically rejects a null runnable, a negative period and a
     * null time unit with the expected messages (currently ignored).
     */
    @Test @Ignore("Implementation delegated to default RxJava implementation")
    public void workerSchedulePeriodicallyInputValidation() {
        Worker worker = scheduler.createWorker();

        // Null runnable.
        try {
            worker.schedulePeriodically(null, 1, 1, MINUTES);
            fail();
        } catch (NullPointerException expected) {
            assertEquals("run == null", expected.getMessage());
        }

        // Negative period.
        try {
            worker.schedulePeriodically(new CountingRunnable(), 1, -1, MINUTES);
            fail();
        } catch (IllegalArgumentException expected) {
            assertEquals("period < 0: -1", expected.getMessage());
        }

        // Null time unit.
        try {
            worker.schedulePeriodically(new CountingRunnable(), 1, 1, null);
            fail();
        } catch (NullPointerException expected) {
            assertEquals("unit == null", expected.getMessage());
        }
    }

    /**
     * Verifies that the message posted by scheduleDirect has its asynchronous flag equal to the
     * {@code async} parameter of this run.
     */
    @Test
    public void directScheduleSetAsync() {
        ShadowMessageQueue queue = shadowOf(Looper.getMainLooper().getQueue());

        Runnable noOp = new Runnable() {
            @Override public void run() {
            }
        };
        scheduler.scheduleDirect(noOp);

        // The looper is paused, so the freshly posted message is still at the head of the queue.
        Message head = queue.getHead();
        assertEquals(async, head.isAsynchronous());
    }

    /**
     * Verifies that the message posted by Worker.schedule has its asynchronous flag equal to the
     * {@code async} parameter of this run.
     */
    @Test
    public void workerScheduleSetAsync() {
        ShadowMessageQueue queue = shadowOf(Looper.getMainLooper().getQueue());

        Runnable noOp = new Runnable() {
            @Override public void run() {
            }
        };
        Worker worker = scheduler.createWorker();
        worker.schedule(noOp);

        // The looper is paused, so the freshly posted message is still at the head of the queue.
        Message head = queue.getHead();
        assertEquals(async, head.isAsynchronous());
    }

    /**
     * Verifies that the message posted by Worker.schedulePeriodically has its asynchronous flag
     * equal to the {@code async} parameter of this run.
     */
    @Test
    public void workerSchedulePeriodicallySetAsync() {
        ShadowMessageQueue queue = shadowOf(Looper.getMainLooper().getQueue());

        Runnable noOp = new Runnable() {
            @Override public void run() {
            }
        };
        Worker worker = scheduler.createWorker();
        worker.schedulePeriodically(noOp, 1, 1, MINUTES);

        // Inspect the first message in the queue without running it.
        Message head = queue.getHead();
        assertEquals(async, head.isAsynchronous());
    }
}