DelayedQueue

Interfaces to be implemented

public interface Scheduler
{
    void schedule( Task t, long delayMs );
}

public interface Task
{
    void run();
}

Single thread

  • Main thread is in Timedwaiting state for delayMs for each call of schedule()

  • Only one thread, very low CPU utilization

  • Also, this is not working as later call

  • How about sleeping in other threads

public class SchedulerImpl implements Scheduler
{
    public void schedule( Task t, long delayMs )
    {
        try
        {
            // sleep for delayMs, and then execute the task
            Thread.sleep( delayMs );
            t.run();
        }
        catch ( InterruptedException e )
        {
            // ignore
        }
    }

    public static void main( String[] args )
    {
        Scheduler scheduler = new SchedulerImpl();
        Task t1 = new TaskImpl( 1 );
        Task t2 = new TaskImpl( 2 );

        // main thread in timedwaiting state for 10000 ms
        scheduler.schedule( t1, 10000 );
        scheduler.schedule( t2, 1 );
    }
}

One thread for each task

  • No blocking when calling schedule

  • What happens if we call schedule many times

    • A lot of thread creation overhead

  • Call be alleviated by using a thread pool, but still not ideal

public class SchedulerImpl implements Scheduler
{
    public void schedule( Task t, long delayMs )
    {
        Thread t = new Thread( new Runnable() {
            public void run()
            {
                try 
                {
                    Thread.sleep( delayMs );
                    t.run();
                }
                catch ( InterruptedException e )
                {
                    // ignore;
                }
            }
        } );
        t.start();
    }
}

PriorityQueue + A background thread

package designThreadSafeEntity.delayedTaskScheduler;

import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Scheduler
{
    // order task by time to run
    private PriorityQueue<Task> tasks;

    // 
    private final Thread taskRunnerThread;

    // State indicating the scheduler is running
    // Why volatile? As long as main thread stops, runner needs to has visibility.
    private volatile boolean running;

    // Task id to assign to submitted tasks
    // AtomicInteger: Threadsafe. Do not need to add locks when assigning task Ids
    // Final: Reference of atomicInteger could not be changed
    private final AtomicInteger taskId;

    public Scheduler()
    {
        tasks = new PriorityQueue<>();
        taskRunnerThread = new Thread( new TaskRunner() );
        running = true;
        taskId = new AtomicInteger( 0 );

        // start task runner thread
        taskRunnerThread.start();
    }

    public void schedule( Task task, long delayMs )
    {
        // Set time to run and assign task id
        long timeToRun = System.currentTimeMillis() + delayMs;
        task.setTimeToRun( timeToRun );
        task.setId( taskId.incrementAndGet() );

        // Put the task in queue
        synchronized ( this )
        {
            tasks.offer( task );
            this.notify(); // only a single background thread waiting
        }
    }

    public void stop( ) throws InterruptedException
    {
        // Notify the task runner as it may be in wait()
        synchronized ( this )
        {
            running = false;
            this.notify();
        }

        // Wait for the task runner to terminate
        taskRunnerThread.join();
    }

    private class TaskRunner implements Runnable
    {
        @Override
        public void run()
        {
            while ( running )
            {
                // Need to synchronize with main thread
                synchronized( Scheduler.this )
                {
                    try 
                    {
                        // task runner is blocked when no tasks in queue
                        while ( running && tasks.isEmpty() )
                        {
                            Scheduler.this.wait();
                        }

                        // check the first task in queue
                        long now = System.currentTimeMillis();
                        Task t = tasks.peek();

                        // delay exhausted, execute task
                        if ( t.getTimeToRun() < now )
                        {
                            tasks.poll();
                            t.run();
                        }            
                        else
                        {
                            // no task executable, wait
                            Scheduler.this.wait( t.getTimeToRun() - now );
                        }
                    }
                    catch ( InterruptedException e )
                    {
                        Thread.currentThread().interrupt();    
                    }
                }
            }
        }
    }

    public static void main( String[] args ) throws InterruptedException
    {
        Scheduler scheduler = new Scheduler();
        scheduler.schedule( new Task(), 1000000 );
        scheduler.schedule( new Task(), 1000 );
        Thread.sleep( 7000 );
        scheduler.stop();
    }
}

class Task implements Comparable<Task> 
{
    // When the task will be run
    private long timeToRun;
    private int id;

    public void setId( int id )
    {
        this.id = id;
    }

    public void setTimeToRun( long timeToRun )
    {
        this.timeToRun = timeToRun;
    }

    public void run()
    {
        System.out.println( "Running task " + id );
    }

    public int compareTo( Task other )
    {
        return (int) ( timeToRun - other.getTimeToRun() );
    }

    public long getTimeToRun()
    {
        return timeToRun;
    }
}
package designThreadSafeEntity.delayedTaskScheduler;

import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Scheduler
{
    private PriorityQueue<Task> tasks;
    private final Thread taskRunnerThread;
    private volatile boolean running;
    private final AtomicInteger taskId;

    public Scheduler()
    {
        tasks = new PriorityQueue<>();
        taskRunnerThread = new Thread( new TaskRunner() );
        running = true;
        taskId = new AtomicInteger( 0 );
        taskRunnerThread.start();
    }

    public void schedule( Task task, long delayMs )
    {
        long timeToRun = System.currentTimeMillis() + delayMs;
        task.setTimeToRun( timeToRun );
        task.setId( taskId.incrementAndGet() );
        synchronized ( this )
        {
            tasks.offer( task );
            this.notify();
        }
    }

    public void stop( ) throws InterruptedException
    {
        synchronized ( this )
        {
            running = false;
            this.notify();
        }
        taskRunnerThread.join();
    }

    private class TaskRunner implements Runnable
    {
        @Override
        public void run()
        {
            while ( running )
            {
                synchronized( Scheduler.this )
                {
                    try 
                    {
                        while ( running && tasks.isEmpty() )
                        {
                            Scheduler.this.wait();
                        }
                        long now = System.currentTimeMillis();
                        Task t = tasks.peek();
                        if ( t.getTimeToRun() < now )
                        {
                            tasks.poll();
                            t.run();
                        }            
                        else
                        {
                            Scheduler.this.wait( t.getTimeToRun() - now );
                        }
                    }
                    catch ( InterruptedException e )
                    {
                        Thread.currentThread().interrupt();    
                    }
                }
            }
        }
    }

    public static void main( String[] args ) throws InterruptedException
    {
        Scheduler scheduler = new Scheduler();
        scheduler.schedule( new Task(), 1000000 );
        scheduler.schedule( new Task(), 1000 );
        Thread.sleep( 7000 );
        scheduler.stop();
    }
}

class Task implements Comparable<Task> 
{
    private long timeToRun;
    private int id;

    public void setId( int id )
    {
        this.id = id;
    }

    public void setTimeToRun( long timeToRun )
    {
        this.timeToRun = timeToRun;
    }

    public void run()
    {
        System.out.println( "Running task " + id );
    }

    public int compareTo( Task other )
    {
        return (int) ( timeToRun - other.getTimeToRun() );
    }

    public long getTimeToRun()
    {
        return timeToRun;
    }
}

Last updated