🐝
Mess around software system design
  • README
  • ArchitectureTradeOffAnalysis
    • Estimation
    • Middleware
    • Network
    • Server
    • Storage
  • Conversion cheat sheet
  • Scenarios
    • TinyURL
      • Estimation
      • Flowchart
      • Shortening mechanisms
      • Rest API
      • Performance
      • Storage
      • Follow-up
    • TaskScheduler
      • JDK delay queue
      • Timer based
      • RabbitMQ based
      • Kafka-based fixed delay time
      • Redis-based customized delay time
      • MySQL-based customized delay time
      • Timer TimingWheel
      • Industrial Scheduler
      • Workflow Engine
      • Airflow Arch
    • GoogleDrive
      • Estimation
      • Flowchart
      • Storage
      • Follow-up
    • Youtube
      • Estimation
      • Flowchart
      • Performance
      • Storage
      • Follow-up
      • Netflix
    • Uber
      • Estimation
      • Rest api
      • Flowchart
      • KNN algorithms
      • Geohash-based KNN mechanism
      • Redis implementation
      • Storage
    • Twitter
      • Estimation
      • Flowchart
      • Storage
      • Scalability
      • Follow-up
    • Instant messenger
      • Architecture overview
      • Presence
      • Unread count
      • Notifications
      • Read receipt
      • Large group chat
      • Storage-Offline 1:1 Chat
      • Storage-Offline group chat
      • Storage-Message roaming
      • NonFunc-Realtime
      • NonFunc-Reliability
      • NonFunc-Ordering
      • NonFunc-Security
      • Livecast-LinkedIn
    • Distributed Lock
      • Single machine
      • AP model based
      • CP model based
      • Chubby-TODO
    • Payment system
      • Resilience
      • Consistency
      • Flash sale
    • Key value store
      • Master-slave KV
      • Peer-to-peer KV
      • Distributed cache
  • Time series scenarios
    • Observability
      • TimeSeries data
      • Distributed traces
      • Logs
      • Metrics
      • NonFunc requirments
  • Search engine
    • Typeahead
    • Search engine
    • Distributed crawler
      • Estimation
      • Flowchart
      • Efficiency
      • Robustness
      • Performance
      • Storage
      • Standalone implementation
      • Python Scrapy framework
    • Stream search
  • Big data
    • GFS/HDFS
      • Data flow
      • High availability
      • Consistency
    • Map reduce
    • Big table/Hbase
    • Haystack
    • TopK
    • Stateful stream
    • Lambda architecture
    • storm架构
    • Beam架构
    • Comparing stream frameworks
    • Instagram-[TODO]
  • MicroSvcs
    • Service Registry
      • Flowchart
      • Data model
      • High availability
      • Comparison
      • Implementation
    • Service governance
      • Load balancing
      • Circuit breaker
      • Bulkhead
      • Downgrade
      • Timeout
      • API gateway
      • RateLimiter
        • Config
        • Algorithm comparison
        • Sliding window
        • Industrial impl
    • MicroSvcs_ConfigCenter-[TODO]
    • MicroSvcs_Security
      • Authentication
      • Authorization
      • Privacy
  • Cache
    • Typical topics
      • Expiration algorithm
      • Access patterns
      • Cache penetration
      • Big key
      • Hot key
      • Distributed lock
      • Data consistency
      • High availability
    • Cache_Redis
      • Data structure
      • ACID
      • Performance
      • Availability
      • Cluster
      • Applications
    • Cache_Memcached
  • Message queue
    • Overview
    • Kafka
      • Ordering
      • At least once
      • Message backlog
      • Consumer idempotency
      • High performance
      • Internal leader election
    • MySQL-based msg queue
    • Other msg queues
      • ActiveMQ-TODO
      • RabbitMQ-TODO
      • RocketMQ-TODO
      • Comparison between MQ
  • Traditional DB
    • Index data structure
    • Index categories
    • Lock
    • MVCC
    • Redo & Undo logs
    • Binlog
    • Schema design
    • DB optimization
    • Distributed transactions
    • High availability
    • Scalability
    • DB migration
    • Partition
    • Sharding
      • Sharding strategies
      • Sharding ID generator overview
        • Auto-increment key
        • UUID
        • Snowflake
        • Implement example
      • Cross-shard pagination queries
      • Non-shard key queries
      • Capacity planning
  • Non-Traditional DB
    • NoSQL overview
    • Rum guess
    • Data structure
    • MySQL based key value
    • KeyValueStore
    • ObjectStore
    • ElasticSearch
    • TableStore-[TODO]
    • Time series DB
    • DistributedAcidDatabase-[TODO]
  • Java basics
    • IO
    • Exception handling
  • Java concurrency
    • Overview
      • Synchronized
      • Reentrant lock
      • Concurrent collections
      • CAS
      • Others
    • Codes
      • ThreadLocal
      • ThreadPool
      • ThreadLifeCycle
      • SingletonPattern
      • Future
      • BlockingQueue
      • Counter
      • ConcurrentHashmap
      • DelayedQueue
  • Java JVM
    • Overview
    • Dynamic proxy
    • Class loading
    • Garbage collection
    • Visibility
  • Server
    • Nginx-[TODO]
  • Distributed system theories
    • Elementary school with CAP
    • Consistency
      • Eventual with Gossip
      • Strong with Raft
      • Tunable with Quorum
      • Fault tolerant with BFT-TODO
      • AutoMerge with CRDT
    • Time in distributed system
      • Logical time
      • Physical time
    • DDIA_Studying-[TODO]
  • Protocols
    • ApiDesign
      • REST
      • RPC
    • Websockets
    • Serialization
      • Thrift
      • Avro
    • HTTP
    • HTTPS
    • Netty-TODO
  • Statistical data structure
    • BloomFilter
    • HyperLoglog
    • CountMinSketch
  • DevOps
    • Container_Docker
    • Container_Kubernetes-[TODO]
  • Network components
    • CDN
    • DNS
    • Load balancer
    • Reverse proxy
    • 云中网络-TODO
  • Templates
    • interviewRecord
  • TODO
    • RecommendationSystem-[TODO]
    • SessionServer-[TODO]
    • Disk
    • Unix philosophy and Kafka
    • Bitcoin
    • Design pattern
      • StateMachine
      • Factory
    • Akka
    • GoogleDoc
      • CRDT
Powered by GitBook
On this page
  • Interfaces to be implemented
  • Single thread
  • One thread for each task
  • PriorityQueue + A background thread

Was this helpful?

  1. Java concurrency
  2. Codes

DelayedQueue

PreviousConcurrentHashmapNextOverview

Last updated 3 years ago

Was this helpful?

Interfaces to be implemented

public interface Scheduler
{
    void schedule( Task t, long delayMs );
}

public interface Task
{
    void run();
}

Single thread

  • Main thread is in Timedwaiting state for delayMs for each call of schedule()

  • Only one thread, very low CPU utilization

  • Also, this is not working as later call

  • How about sleeping in other threads

public class SchedulerImpl implements Scheduler
{
    public void schedule( Task t, long delayMs )
    {
        try
        {
            // sleep for delayMs, and then execute the task
            Thread.sleep( delayMs );
            t.run();
        }
        catch ( InterruptedException e )
        {
            // ignore
        }
    }

    public static void main( String[] args )
    {
        Scheduler scheduler = new SchedulerImpl();
        Task t1 = new TaskImpl( 1 );
        Task t2 = new TaskImpl( 2 );

        // main thread in timedwaiting state for 10000 ms
        scheduler.schedule( t1, 10000 );
        scheduler.schedule( t2, 1 );
    }
}

One thread for each task

  • No blocking when calling schedule

  • What happens if we call schedule many times

    • A lot of thread creation overhead

  • Call be alleviated by using a thread pool, but still not ideal

public class SchedulerImpl implements Scheduler
{
    public void schedule( Task t, long delayMs )
    {
        Thread t = new Thread( new Runnable() {
            public void run()
            {
                try 
                {
                    Thread.sleep( delayMs );
                    t.run();
                }
                catch ( InterruptedException e )
                {
                    // ignore;
                }
            }
        } );
        t.start();
    }
}

PriorityQueue + A background thread

package designThreadSafeEntity.delayedTaskScheduler;

import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Scheduler
{
    // order task by time to run
    private PriorityQueue<Task> tasks;

    // 
    private final Thread taskRunnerThread;

    // State indicating the scheduler is running
    // Why volatile? As long as main thread stops, runner needs to has visibility.
    private volatile boolean running;

    // Task id to assign to submitted tasks
    // AtomicInteger: Threadsafe. Do not need to add locks when assigning task Ids
    // Final: Reference of atomicInteger could not be changed
    private final AtomicInteger taskId;

    public Scheduler()
    {
        tasks = new PriorityQueue<>();
        taskRunnerThread = new Thread( new TaskRunner() );
        running = true;
        taskId = new AtomicInteger( 0 );

        // start task runner thread
        taskRunnerThread.start();
    }

    public void schedule( Task task, long delayMs )
    {
        // Set time to run and assign task id
        long timeToRun = System.currentTimeMillis() + delayMs;
        task.setTimeToRun( timeToRun );
        task.setId( taskId.incrementAndGet() );

        // Put the task in queue
        synchronized ( this )
        {
            tasks.offer( task );
            this.notify(); // only a single background thread waiting
        }
    }

    public void stop( ) throws InterruptedException
    {
        // Notify the task runner as it may be in wait()
        synchronized ( this )
        {
            running = false;
            this.notify();
        }

        // Wait for the task runner to terminate
        taskRunnerThread.join();
    }

    private class TaskRunner implements Runnable
    {
        @Override
        public void run()
        {
            while ( running )
            {
                // Need to synchronize with main thread
                synchronized( Scheduler.this )
                {
                    try 
                    {
                        // task runner is blocked when no tasks in queue
                        while ( running && tasks.isEmpty() )
                        {
                            Scheduler.this.wait();
                        }

                        // check the first task in queue
                        long now = System.currentTimeMillis();
                        Task t = tasks.peek();

                        // delay exhausted, execute task
                        if ( t.getTimeToRun() < now )
                        {
                            tasks.poll();
                            t.run();
                        }            
                        else
                        {
                            // no task executable, wait
                            Scheduler.this.wait( t.getTimeToRun() - now );
                        }
                    }
                    catch ( InterruptedException e )
                    {
                        Thread.currentThread().interrupt();    
                    }
                }
            }
        }
    }

    public static void main( String[] args ) throws InterruptedException
    {
        Scheduler scheduler = new Scheduler();
        scheduler.schedule( new Task(), 1000000 );
        scheduler.schedule( new Task(), 1000 );
        Thread.sleep( 7000 );
        scheduler.stop();
    }
}

class Task implements Comparable<Task> 
{
    // When the task will be run
    private long timeToRun;
    private int id;

    public void setId( int id )
    {
        this.id = id;
    }

    public void setTimeToRun( long timeToRun )
    {
        this.timeToRun = timeToRun;
    }

    public void run()
    {
        System.out.println( "Running task " + id );
    }

    public int compareTo( Task other )
    {
        return (int) ( timeToRun - other.getTimeToRun() );
    }

    public long getTimeToRun()
    {
        return timeToRun;
    }
}
package designThreadSafeEntity.delayedTaskScheduler;

import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Scheduler
{
    private PriorityQueue<Task> tasks;
    private final Thread taskRunnerThread;
    private volatile boolean running;
    private final AtomicInteger taskId;

    public Scheduler()
    {
        tasks = new PriorityQueue<>();
        taskRunnerThread = new Thread( new TaskRunner() );
        running = true;
        taskId = new AtomicInteger( 0 );
        taskRunnerThread.start();
    }

    public void schedule( Task task, long delayMs )
    {
        long timeToRun = System.currentTimeMillis() + delayMs;
        task.setTimeToRun( timeToRun );
        task.setId( taskId.incrementAndGet() );
        synchronized ( this )
        {
            tasks.offer( task );
            this.notify();
        }
    }

    public void stop( ) throws InterruptedException
    {
        synchronized ( this )
        {
            running = false;
            this.notify();
        }
        taskRunnerThread.join();
    }

    private class TaskRunner implements Runnable
    {
        @Override
        public void run()
        {
            while ( running )
            {
                synchronized( Scheduler.this )
                {
                    try 
                    {
                        while ( running && tasks.isEmpty() )
                        {
                            Scheduler.this.wait();
                        }
                        long now = System.currentTimeMillis();
                        Task t = tasks.peek();
                        if ( t.getTimeToRun() < now )
                        {
                            tasks.poll();
                            t.run();
                        }            
                        else
                        {
                            Scheduler.this.wait( t.getTimeToRun() - now );
                        }
                    }
                    catch ( InterruptedException e )
                    {
                        Thread.currentThread().interrupt();    
                    }
                }
            }
        }
    }

    public static void main( String[] args ) throws InterruptedException
    {
        Scheduler scheduler = new Scheduler();
        scheduler.schedule( new Task(), 1000000 );
        scheduler.schedule( new Task(), 1000 );
        Thread.sleep( 7000 );
        scheduler.stop();
    }
}

class Task implements Comparable<Task> 
{
    private long timeToRun;
    private int id;

    public void setId( int id )
    {
        this.id = id;
    }

    public void setTimeToRun( long timeToRun )
    {
        this.timeToRun = timeToRun;
    }

    public void run()
    {
        System.out.println( "Running task " + id );
    }

    public int compareTo( Task other )
    {
        return (int) ( timeToRun - other.getTimeToRun() );
    }

    public long getTimeToRun()
    {
        return timeToRun;
    }
}
Interfaces to be implemented
Single thread
One thread for each task
PriorityQueue + A background thread