DelayedQueue
Interfaces to be implemented
/**
 * Contract for a delayed task scheduler.
 */
public interface Scheduler
{
/**
 * Schedules a task for execution after a delay.
 *
 * @param t       the task to run
 * @param delayMs the delay before the task is run, in milliseconds
 */
void schedule( Task t, long delayMs );
}
/**
 * A unit of work that can be handed to a Scheduler.
 */
public interface Task
{
/**
 * Executes the work of this task.
 */
void run();
}
Single thread
The main thread is in the TIMED_WAITING state for delayMs on each call of schedule()
Only one thread, very low CPU utilization
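Also, this does not work correctly: a later call to schedule() with a shorter delay cannot start until the earlier call has finished sleeping and running its task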
How about sleeping in other threads
public class SchedulerImpl implements Scheduler
{
public void schedule( Task t, long delayMs )
{
try
{
// sleep for delayMs, and then execute the task
Thread.sleep( delayMs );
t.run();
}
catch ( InterruptedException e )
{
// ignore
}
}
public static void main( String[] args )
{
Scheduler scheduler = new SchedulerImpl();
Task t1 = new TaskImpl( 1 );
Task t2 = new TaskImpl( 2 );
// main thread in timedwaiting state for 10000 ms
scheduler.schedule( t1, 10000 );
scheduler.schedule( t2, 1 );
}
}
One thread for each task
No blocking when calling schedule
What happens if we call schedule many times
A lot of thread creation overhead
Can be alleviated by using a thread pool, but still not ideal
public class SchedulerImpl implements Scheduler
{
public void schedule( Task t, long delayMs )
{
Thread t = new Thread( new Runnable() {
public void run()
{
try
{
Thread.sleep( delayMs );
t.run();
}
catch ( InterruptedException e )
{
// ignore;
}
}
} );
t.start();
}
}
PriorityQueue + A background thread
package designThreadSafeEntity.delayedTaskScheduler;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Delayed task scheduler backed by a priority queue and one background thread.
 * Callers enqueue tasks with schedule(); the background thread waits until the
 * earliest task is due, removes it from the queue and runs it.
 */
public class Scheduler
{
    // Tasks ordered by time to run (earliest first); accessed only while holding the lock on "this"
    private final PriorityQueue<Task> tasks;
    // Single background thread that executes due tasks
    private final Thread taskRunnerThread;
    // State indicating the scheduler is running.
    // volatile: stop() writes it and the task runner reads it in its outer loop
    // without holding the lock, so the write must be visible across threads.
    private volatile boolean running;
    // Task id to assign to submitted tasks.
    // AtomicInteger: ids can be assigned without taking a lock.
    // final: the reference to the counter never changes.
    private final AtomicInteger taskId;

    /**
     * Creates the scheduler and immediately starts its background task runner thread.
     */
    public Scheduler()
    {
        tasks = new PriorityQueue<>();
        taskRunnerThread = new Thread( new TaskRunner() );
        running = true;
        taskId = new AtomicInteger( 0 );
        // start task runner thread
        taskRunnerThread.start();
    }

    /**
     * Enqueues a task to be run once delayMs milliseconds have elapsed.
     *
     * @param task    the task to run; its id and time to run are set by this call
     * @param delayMs delay, in milliseconds, added to the current time to get the time to run
     */
    public void schedule( Task task, long delayMs )
    {
        // Set time to run and assign task id
        long timeToRun = System.currentTimeMillis() + delayMs;
        task.setTimeToRun( timeToRun );
        task.setId( taskId.incrementAndGet() );
        // Put the task in the queue and wake the runner so it re-evaluates the head
        synchronized ( this )
        {
            tasks.offer( task );
            this.notify(); // only a single background thread waiting
        }
    }

    /**
     * Stops the scheduler and waits for the task runner thread to terminate.
     * Tasks still in the queue are not run.
     *
     * @throws InterruptedException if the calling thread is interrupted while joining
     */
    public void stop( ) throws InterruptedException
    {
        // Notify the task runner as it may be in wait()
        synchronized ( this )
        {
            running = false;
            this.notify();
        }
        // Wait for the task runner to terminate
        taskRunnerThread.join();
    }

    /**
     * Blocks until the earliest task is due or the scheduler is stopped.
     *
     * @return the due task, already removed from the queue, or null if the scheduler was stopped
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private Task awaitDueTask() throws InterruptedException
    {
        synchronized ( this )
        {
            while ( running )
            {
                Task head = tasks.peek();
                if ( head == null )
                {
                    // nothing queued: block until schedule() or stop() notifies
                    wait();
                    continue;
                }
                long remainingMs = head.getTimeToRun() - System.currentTimeMillis();
                if ( remainingMs <= 0 )
                {
                    // Due now. "<= 0" matters: wait( 0 ) would block indefinitely.
                    return tasks.poll();
                }
                // Not due yet: sleep until it is, or until an earlier task is scheduled
                wait( remainingMs );
            }
            // Stopped: the queue may be empty here, so never dereference its head
            return null;
        }
    }

    /**
     * Background loop: repeatedly takes the next due task and runs it.
     */
    private class TaskRunner implements Runnable
    {
        @Override
        public void run()
        {
            while ( running )
            {
                Task due;
                try
                {
                    due = awaitDueTask();
                }
                catch ( InterruptedException e )
                {
                    // Restore the flag and exit; looping again would make wait()
                    // throw immediately and spin.
                    Thread.currentThread().interrupt();
                    return;
                }
                if ( due != null )
                {
                    // Run outside the lock so schedule() and stop() are not blocked by the task
                    due.run();
                }
            }
        }
    }

    /**
     * Demo: schedules a far-future task and a 1-second task, then stops after 7 seconds.
     */
    public static void main( String[] args ) throws InterruptedException
    {
        Scheduler scheduler = new Scheduler();
        scheduler.schedule( new Task(), 1000000 );
        scheduler.schedule( new Task(), 1000 );
        Thread.sleep( 7000 );
        scheduler.stop();
    }
}
/**
 * A schedulable task carrying an id and the time at which it should run.
 * Tasks are ordered by their time to run, earliest first.
 */
class Task implements Comparable<Task>
{
    // When the task will be run (compared against System.currentTimeMillis() by the scheduler)
    private long timeToRun;
    // Identifier printed when the task runs
    private int id;

    /**
     * @param id the identifier to assign to this task
     */
    public void setId( int id )
    {
        this.id = id;
    }

    /**
     * @param timeToRun the time at which this task should run
     */
    public void setTimeToRun( long timeToRun )
    {
        this.timeToRun = timeToRun;
    }

    /**
     * Runs the task: prints a line containing the task id to standard output.
     */
    public void run()
    {
        System.out.println( "Running task " + id );
    }

    /**
     * Orders tasks by time to run, earliest first.
     *
     * @param other the task to compare against
     * @return a negative, zero or positive value if this task runs before,
     *         at the same time as, or after the other task
     */
    @Override
    public int compareTo( Task other )
    {
        // Long.compare avoids the sign flip that casting a long difference to int
        // produces when the two times are more than Integer.MAX_VALUE apart.
        return Long.compare( timeToRun, other.getTimeToRun() );
    }

    /**
     * @return the time at which this task should run
     */
    public long getTimeToRun()
    {
        return timeToRun;
    }
}
package designThreadSafeEntity.delayedTaskScheduler;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Delayed task scheduler backed by a priority queue and one background thread.
 * Callers enqueue tasks with schedule(); the background thread waits until the
 * earliest task is due, removes it from the queue and runs it.
 */
public class Scheduler
{
    // Tasks ordered by time to run (earliest first); accessed only while holding the lock on "this"
    private final PriorityQueue<Task> tasks;
    // Single background thread that executes due tasks
    private final Thread taskRunnerThread;
    // volatile: written by stop() and read by the runner's outer loop without the lock
    private volatile boolean running;
    // Source of task ids; atomic so ids can be assigned without taking a lock
    private final AtomicInteger taskId;

    /**
     * Creates the scheduler and immediately starts its background task runner thread.
     */
    public Scheduler()
    {
        tasks = new PriorityQueue<>();
        taskRunnerThread = new Thread( new TaskRunner() );
        running = true;
        taskId = new AtomicInteger( 0 );
        taskRunnerThread.start();
    }

    /**
     * Enqueues a task to be run once delayMs milliseconds have elapsed.
     *
     * @param task    the task to run; its id and time to run are set by this call
     * @param delayMs delay, in milliseconds, added to the current time to get the time to run
     */
    public void schedule( Task task, long delayMs )
    {
        long timeToRun = System.currentTimeMillis() + delayMs;
        task.setTimeToRun( timeToRun );
        task.setId( taskId.incrementAndGet() );
        synchronized ( this )
        {
            tasks.offer( task );
            // wake the single runner thread so it re-evaluates the head of the queue
            this.notify();
        }
    }

    /**
     * Stops the scheduler and waits for the task runner thread to terminate.
     * Tasks still in the queue are not run.
     *
     * @throws InterruptedException if the calling thread is interrupted while joining
     */
    public void stop( ) throws InterruptedException
    {
        synchronized ( this )
        {
            running = false;
            // the runner may be blocked in wait(); wake it so it can observe the flag
            this.notify();
        }
        taskRunnerThread.join();
    }

    /**
     * Blocks until the earliest task is due or the scheduler is stopped.
     *
     * @return the due task, already removed from the queue, or null if the scheduler was stopped
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private Task awaitDueTask() throws InterruptedException
    {
        synchronized ( this )
        {
            while ( running )
            {
                Task head = tasks.peek();
                if ( head == null )
                {
                    // nothing queued: block until schedule() or stop() notifies
                    wait();
                    continue;
                }
                long remainingMs = head.getTimeToRun() - System.currentTimeMillis();
                if ( remainingMs <= 0 )
                {
                    // Due now. "<= 0" matters: wait( 0 ) would block indefinitely.
                    return tasks.poll();
                }
                // Not due yet: sleep until it is, or until an earlier task is scheduled
                wait( remainingMs );
            }
            // Stopped: the queue may be empty here, so never dereference its head
            return null;
        }
    }

    /**
     * Background loop: repeatedly takes the next due task and runs it.
     */
    private class TaskRunner implements Runnable
    {
        @Override
        public void run()
        {
            while ( running )
            {
                Task due;
                try
                {
                    due = awaitDueTask();
                }
                catch ( InterruptedException e )
                {
                    // Restore the flag and exit; looping again would make wait()
                    // throw immediately and spin.
                    Thread.currentThread().interrupt();
                    return;
                }
                if ( due != null )
                {
                    // Run outside the lock so schedule() and stop() are not blocked by the task
                    due.run();
                }
            }
        }
    }

    /**
     * Demo: schedules a far-future task and a 1-second task, then stops after 7 seconds.
     */
    public static void main( String[] args ) throws InterruptedException
    {
        Scheduler scheduler = new Scheduler();
        scheduler.schedule( new Task(), 1000000 );
        scheduler.schedule( new Task(), 1000 );
        Thread.sleep( 7000 );
        scheduler.stop();
    }
}
/**
 * A schedulable task carrying an id and the time at which it should run.
 * Tasks are ordered by their time to run, earliest first.
 */
class Task implements Comparable<Task>
{
    // Time at which the task should run
    private long timeToRun;
    // Identifier printed when the task runs
    private int id;

    /**
     * @param id the identifier to assign to this task
     */
    public void setId( int id )
    {
        this.id = id;
    }

    /**
     * @param timeToRun the time at which this task should run
     */
    public void setTimeToRun( long timeToRun )
    {
        this.timeToRun = timeToRun;
    }

    /**
     * Runs the task: prints a line containing the task id to standard output.
     */
    public void run()
    {
        System.out.println( "Running task " + id );
    }

    /**
     * Orders tasks by time to run, earliest first.
     *
     * @param other the task to compare against
     * @return a negative, zero or positive value if this task runs before,
     *         at the same time as, or after the other task
     */
    @Override
    public int compareTo( Task other )
    {
        // Compare the long values directly: narrowing their difference to int
        // can overflow and report the wrong order.
        return Long.compare( timeToRun, other.getTimeToRun() );
    }

    /**
     * @return the time at which this task should run
     */
    public long getTimeToRun()
    {
        return timeToRun;
    }
}
Last updated