/**
 * Naive scheduler: the calling thread itself sleeps for the delay and then
 * runs the task, so {@link #schedule} does not return until the task is done.
 */
public class SchedulerImpl implements Scheduler
{
    /**
     * Sleeps for {@code delayMs} on the calling thread and then runs {@code t}
     * on that same thread.
     *
     * If the sleep is interrupted the task is not run and the thread's
     * interrupt status is restored so the caller can observe the interruption.
     *
     * @param t       task to execute after the delay
     * @param delayMs delay in milliseconds before the task is run
     */
    public void schedule( Task t, long delayMs )
    {
        try
        {
            // sleep for delayMs, and then execute the task
            Thread.sleep( delayMs );
            t.run();
        }
        catch ( InterruptedException e )
        {
            // Do not swallow the interruption: catching the exception clears
            // the flag, so set it again for code further up the call stack.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Demonstrates the drawback of the blocking approach: the second task is
     * only submitted after the first call to schedule() has returned.
     */
    public static void main( String[] args )
    {
        Scheduler scheduler = new SchedulerImpl();
        Task t1 = new TaskImpl( 1 );
        Task t2 = new TaskImpl( 2 );
        // main thread sleeps inside schedule() for 10000 ms before t1 runs
        scheduler.schedule( t1, 10000 );
        // reached only after t1 has finished, despite the 1 ms delay
        scheduler.schedule( t2, 1 );
    }
}
/**
 * Thread-per-task scheduler: every call to {@link #schedule} starts a new
 * thread that sleeps for the delay and then runs the task, so the caller is
 * not blocked.
 */
public class SchedulerImpl implements Scheduler
{
    /**
     * Starts a new thread that waits {@code delayMs} milliseconds and then
     * runs {@code t}. Returns as soon as the thread has been started.
     *
     * @param t       task to execute after the delay
     * @param delayMs delay in milliseconds before the task is run
     */
    public void schedule( final Task t, final long delayMs )
    {
        // The thread variable must not be called "t": that would redeclare the
        // task parameter (a compile error) and hide the task from the Runnable.
        Thread worker = new Thread( new Runnable() {
            @Override
            public void run()
            {
                try
                {
                    Thread.sleep( delayMs );
                    t.run();
                }
                catch ( InterruptedException e )
                {
                    // Task is skipped; keep the interrupt status visible
                    // instead of silently discarding it.
                    Thread.currentThread().interrupt();
                }
            }
        } );
        worker.start();
    }
}
package designThreadSafeEntity.delayedTaskScheduler;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Delayed-task scheduler that uses one background thread and a priority queue.
 *
 * Callers submit tasks with {@link #schedule}; the runner thread sleeps on this
 * object's monitor until the head of the queue is due, then executes it.
 * {@link #stop} shuts the runner down; tasks still queued at that point are
 * not executed.
 */
public class Scheduler
{
    // Pending tasks in their natural (Comparable) order.
    // Accessed only while holding this scheduler's monitor.
    private final PriorityQueue<Task> tasks;
    // Background thread that runs the TaskRunner loop
    private final Thread taskRunnerThread;
    // State indicating the scheduler is running.
    // volatile: written by stop() on the caller's thread and read by the
    // runner thread in its loop condition outside the synchronized block.
    private volatile boolean running;
    // Task id to assign to submitted tasks.
    // AtomicInteger: ids can be handed out without taking a lock.
    private final AtomicInteger taskId;

    /**
     * Creates the scheduler and immediately starts its runner thread.
     */
    public Scheduler()
    {
        tasks = new PriorityQueue<>();
        taskRunnerThread = new Thread( new TaskRunner() );
        running = true;
        taskId = new AtomicInteger( 0 );
        // Started last, after every field the runner reads has been assigned.
        taskRunnerThread.start();
    }

    /**
     * Queues {@code task} to be run roughly {@code delayMs} milliseconds from
     * now and assigns it the next task id.
     *
     * @param task    task to run; its time-to-run and id are overwritten here
     * @param delayMs delay in milliseconds, measured from the time of the call
     */
    public void schedule( Task task, long delayMs )
    {
        // Set time to run and assign task id
        long timeToRun = System.currentTimeMillis() + delayMs;
        task.setTimeToRun( timeToRun );
        task.setId( taskId.incrementAndGet() );
        // Put the task in queue and wake the runner: the new task may be due
        // earlier than the one it is currently waiting for.
        synchronized ( this )
        {
            tasks.offer( task );
            this.notify(); // only a single background thread waiting
        }
    }

    /**
     * Asks the runner thread to stop and blocks until it has terminated.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for the runner thread to finish
     */
    public void stop( ) throws InterruptedException
    {
        // Notify the task runner as it may be in wait()
        synchronized ( this )
        {
            running = false;
            this.notify();
        }
        // Wait for the task runner to terminate
        taskRunnerThread.join();
    }

    /**
     * Loop executed by the background thread: waits for the earliest task to
     * become due and runs it, until the scheduler is stopped or the thread is
     * interrupted.
     */
    private class TaskRunner implements Runnable
    {
        @Override
        public void run()
        {
            while ( running )
            {
                Task due = null;
                // Queue access and wait/notify must use the same monitor as
                // schedule() and stop().
                synchronized ( Scheduler.this )
                {
                    try
                    {
                        // task runner is blocked when no tasks in queue
                        while ( running && tasks.isEmpty() )
                        {
                            Scheduler.this.wait();
                        }
                        // Re-check under the lock. stop() may have run either
                        // while we waited (queue can still be empty, so peek()
                        // would return null) or just before we took the lock
                        // (its notify() would otherwise be missed).
                        if ( !running )
                        {
                            break;
                        }
                        long remaining = tasks.peek().getTimeToRun() - System.currentTimeMillis();
                        if ( remaining <= 0 )
                        {
                            // delay exhausted; "<=" matters because wait( 0 )
                            // means "wait until notified", not "don't wait"
                            due = tasks.poll();
                        }
                        else
                        {
                            // no task executable yet; wake up when the head is
                            // due or earlier if schedule()/stop() notifies
                            Scheduler.this.wait( remaining );
                        }
                    }
                    catch ( InterruptedException e )
                    {
                        // Restore the flag and exit. Looping again would make
                        // wait() throw immediately and spin.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                // Run the task outside the monitor so a slow task does not
                // block schedule() or stop().
                if ( due != null )
                {
                    due.run();
                }
            }
        }
    }

    /**
     * Demo: schedules a far-future task and a one-second task, lets the
     * scheduler run for seven seconds, then stops it.
     */
    public static void main( String[] args ) throws InterruptedException
    {
        Scheduler scheduler = new Scheduler();
        scheduler.schedule( new Task(), 1000000 );
        scheduler.schedule( new Task(), 1000 );
        Thread.sleep( 7000 );
        scheduler.stop();
    }
}
/**
 * A unit of work with an id and an absolute time at which it should run.
 * Tasks order themselves by that time, earliest first.
 */
class Task implements Comparable<Task>
{
    // When the task will be run (same clock/units as the value passed to setTimeToRun)
    private long timeToRun;
    // Identifier printed when the task runs
    private int id;

    /** Sets the identifier of this task. */
    public void setId( int id )
    {
        this.id = id;
    }

    /** Sets the time at which this task should be run. */
    public void setTimeToRun( long timeToRun )
    {
        this.timeToRun = timeToRun;
    }

    /** Executes the task: prints a line containing the task id. */
    public void run()
    {
        System.out.println( "Running task " + id );
    }

    /**
     * Orders tasks by ascending time-to-run.
     *
     * @return a negative value, zero, or a positive value when this task is
     *         due before, at the same time as, or after {@code other}
     */
    @Override
    public int compareTo( Task other )
    {
        // Compare the longs directly. Narrowing the difference to int drops
        // the high bits and can flip the sign or yield 0 for distinct times.
        return Long.compare( timeToRun, other.getTimeToRun() );
    }

    /** Returns the time at which this task should be run. */
    public long getTimeToRun()
    {
        return timeToRun;
    }
}
package designThreadSafeEntity.delayedTaskScheduler;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Delayed-task scheduler built on a priority queue and a single background
 * runner thread that waits on this object's monitor until the earliest task
 * is due. Tasks still queued when {@link #stop} is called are not executed.
 */
public class Scheduler
{
    // Pending tasks in natural order; accessed only under this object's monitor
    private final PriorityQueue<Task> tasks;
    // Thread executing the TaskRunner loop
    private final Thread taskRunnerThread;
    // Cleared by stop(); volatile because the runner also reads it outside the lock
    private volatile boolean running;
    // Source of task ids; atomic so no lock is needed to hand one out
    private final AtomicInteger taskId;

    /**
     * Creates the scheduler and starts the runner thread.
     */
    public Scheduler()
    {
        tasks = new PriorityQueue<>();
        taskRunnerThread = new Thread( new TaskRunner() );
        running = true;
        taskId = new AtomicInteger( 0 );
        // start only after all fields used by the runner are initialised
        taskRunnerThread.start();
    }

    /**
     * Queues {@code task} to run about {@code delayMs} milliseconds from now.
     *
     * @param task    task to run; receives a fresh id and time-to-run
     * @param delayMs delay in milliseconds from the moment of this call
     */
    public void schedule( Task task, long delayMs )
    {
        long timeToRun = System.currentTimeMillis() + delayMs;
        task.setTimeToRun( timeToRun );
        task.setId( taskId.incrementAndGet() );
        synchronized ( this )
        {
            tasks.offer( task );
            // wake the runner so it can re-evaluate which task is due first
            this.notify();
        }
    }

    /**
     * Signals the runner thread to stop and waits for it to terminate.
     *
     * @throws InterruptedException if the caller is interrupted while joining
     *                              the runner thread
     */
    public void stop( ) throws InterruptedException
    {
        synchronized ( this )
        {
            running = false;
            // the runner may be blocked in wait()
            this.notify();
        }
        taskRunnerThread.join();
    }

    /**
     * Background loop: picks the earliest task once it is due and runs it,
     * until the scheduler is stopped or the thread is interrupted.
     */
    private class TaskRunner implements Runnable
    {
        @Override
        public void run()
        {
            while ( running )
            {
                Task ready = null;
                synchronized ( Scheduler.this )
                {
                    try
                    {
                        // nothing queued: sleep until schedule() or stop() notifies
                        while ( running && tasks.isEmpty() )
                        {
                            Scheduler.this.wait();
                        }
                        // stop() may have emptied our reason to continue: the
                        // queue can still be empty here (peek() would be null),
                        // and checking under the lock avoids missing a notify
                        // sent before this block was entered.
                        if ( !running )
                        {
                            break;
                        }
                        long waitMs = tasks.peek().getTimeToRun() - System.currentTimeMillis();
                        if ( waitMs > 0 )
                        {
                            // head not due yet; timed wait, cut short by notify()
                            Scheduler.this.wait( waitMs );
                        }
                        else
                        {
                            // due now or overdue; never call wait( 0 ), which
                            // blocks until notified
                            ready = tasks.poll();
                        }
                    }
                    catch ( InterruptedException e )
                    {
                        // keep the interrupt status and stop; continuing would
                        // make the next wait() throw straight away
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                // execute without holding the monitor so callers of
                // schedule()/stop() are not blocked by the task body
                if ( ready != null )
                {
                    ready.run();
                }
            }
        }
    }

    /**
     * Demo: one far-future task, one task due after a second; the scheduler
     * is stopped after seven seconds.
     */
    public static void main( String[] args ) throws InterruptedException
    {
        Scheduler scheduler = new Scheduler();
        scheduler.schedule( new Task(), 1000000 );
        scheduler.schedule( new Task(), 1000 );
        Thread.sleep( 7000 );
        scheduler.stop();
    }
}
/**
 * Schedulable unit of work carrying an id and the time at which it should
 * run; tasks compare by that time, earliest first.
 */
class Task implements Comparable<Task>
{
    // Time at which the task should run
    private long timeToRun;
    // Id printed by run()
    private int id;

    /** Assigns the task id. */
    public void setId( int id )
    {
        this.id = id;
    }

    /** Assigns the time at which the task should run. */
    public void setTimeToRun( long timeToRun )
    {
        this.timeToRun = timeToRun;
    }

    /** Runs the task by printing its id. */
    public void run()
    {
        System.out.println( "Running task " + id );
    }

    /**
     * Compares by time-to-run in ascending order.
     *
     * @return negative, zero or positive as this task is due earlier than,
     *         at the same time as, or later than {@code other}
     */
    @Override
    public int compareTo( Task other )
    {
        // Long.compare avoids the truncation that occurs when a long
        // difference is cast to int (wrong sign or 0 for large gaps).
        return Long.compare( timeToRun, other.getTimeToRun() );
    }

    /** Returns the time at which the task should run. */
    public long getTimeToRun()
    {
        return timeToRun;
    }
}