JDK delay queue


Last updated 1 year ago


PriorityQueue

DelayQueue implementation in JDK

  • Internal structure: DelayQueue is an unbounded blocking queue backed by a PriorityQueue that orders elements by their remaining delay, so the element that expires first sits at the head.

  • Characteristics: A consumer can take an element from the queue only after the delay for that particular element has expired.

  • Pros:

    • Introduces no external dependencies; it ships with the JDK in java.util.concurrent.

  • Cons:

    • It is only a data structure implementation, and all queue elements are stored in JVM memory. Building a scalable delay queue on top of it would take considerable effort.
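
To make the ordering and blocking behaviour concrete, here is a minimal sketch. The class DelayQueueOrderingDemo, its nested Task type, and the chosen delays are hypothetical names and values picked for illustration; only DelayQueue, Delayed, and TimeUnit come from the JDK. Tasks are inserted out of order, the non-blocking poll() is expected to find nothing ready at first, and take() should then release the tasks in order of expiry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original page.
class DelayQueueOrderingDemo {

    // Minimal Delayed element: a label plus an absolute expiration time.
    static final class Task implements Delayed {
        final String label;
        final long expiresAtNanos;

        Task(String label, long delayMillis) {
            this.label = label;
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Inserts tasks out of order and records the order in which the queue releases them.
    static List<String> drainInExpiryOrder() {
        DelayQueue<Task> queue = new DelayQueue<>();
        queue.put(new Task("third", 600));
        queue.put(new Task("first", 200));
        queue.put(new Task("second", 400));

        List<String> order = new ArrayList<>();
        // No delay has expired yet, so the non-blocking poll() should return null.
        if (queue.poll() == null) {
            order.add("nothing-ready");
        }
        try {
            for (int i = 0; i < 3; i++) {
                order.add(queue.take().label); // blocks until the head element expires
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainInExpiryOrder());
    }
}
```

Note that everything here lives in a single JVM heap, which is the limitation listed under Cons: the tasks are gone if the process exits.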

Delayed interface

  • Algorithm: When the consumer tries to take an element from the queue, the DelayQueue calls getDelay() on the head element to find out whether it may be returned. If getDelay() returns zero or a negative number, the delay has expired and the element can be retrieved from the queue.

  • Data structure:

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// JDK declaration of DelayQueue, shown for reference only:
// public class DelayQueue<E extends Delayed>
//                     extends AbstractQueue<E>
//                     implements BlockingQueue<E>

// Each element we want to put into the DelayQueue needs to implement the Delayed interface
public class DelayObject implements Delayed {
    private final String data;
    // Absolute expiration time in epoch milliseconds
    private final long startTime;

    public DelayObject(String data, long delayInMilliseconds) {
        this.data = data;
        this.startTime = System.currentTimeMillis() + delayInMilliseconds;
    }

    // Returns the remaining delay of this element in the given time unit.
    // Zero or a negative value means the delay has expired.
    @Override
    public long getDelay(TimeUnit unit) {
        long diff = startTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    // Elements are sorted by expiration time: the one that expires first is kept
    // at the head of the queue and the one that expires last at the tail.
    // Long.compare avoids the overflow of a plain subtraction and needs no Guava dependency.
    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.startTime, ((DelayObject) o).startTime);
    }

    @Override
    public String toString() {
        return "DelayObject{data='" + data + "', startTime=" + startTime + "}";
    }
}

Test with Producer/Consumer pattern

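import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// DelayQueue is a blocking queue. A call to take() returns only when there is
// an element whose delay has expired.
public class DelayQueueProducer implements Runnable 
{  
    private final BlockingQueue<DelayObject> queue;
    private final int numberOfElementsToProduce;
    private final long delayOfEachProducedMessageMilliseconds;

    public DelayQueueProducer(BlockingQueue<DelayObject> queue,
                              int numberOfElementsToProduce,
                              long delayOfEachProducedMessageMilliseconds) 
    {
        this.queue = queue;
        this.numberOfElementsToProduce = numberOfElementsToProduce;
        this.delayOfEachProducedMessageMilliseconds = delayOfEachProducedMessageMilliseconds;
    }

    @Override
    public void run() 
    {
        for (int i = 0; i < numberOfElementsToProduce; i++) 
        {
            DelayObject object
              = new DelayObject(
                UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
            System.out.println("Put object: " + object);
            try 
            {
                queue.put(object);
                Thread.sleep(500); // pace the producer
            } 
            catch (InterruptedException ie) 
            {
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

// Lives in its own file, DelayQueueConsumer.java
public class DelayQueueConsumer implements Runnable 
{
    private final BlockingQueue<DelayObject> queue;
    private final int numberOfElementsToTake;
    public final AtomicInteger numberOfConsumedElements = new AtomicInteger();

    public DelayQueueConsumer(BlockingQueue<DelayObject> queue, int numberOfElementsToTake) 
    {
        this.queue = queue;
        this.numberOfElementsToTake = numberOfElementsToTake;
    }

    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToTake; i++) 
        {
            try 
            {
                // Blocks until the head element's delay has expired
                DelayObject object = queue.take();
                numberOfConsumedElements.incrementAndGet();
                System.out.println("Consumer take: " + object);
            } 
            catch (InterruptedException e) 
            {
                // Restore the interrupt flag and stop consuming
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}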
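
The listing above defines the two roles but does not show them running together. The following self-contained sketch wires a producer and a consumer through an ExecutorService and measures how long the consumer had to wait. The class DelayQueueProducerConsumerDemo, its Message type, and the run() helper are assumptions made for this example; it uses lambdas rather than the classes above so that it compiles on its own.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical test harness; names are illustrative.
class DelayQueueProducerConsumerDemo {

    static final class Message implements Delayed {
        final String payload;
        final long expiresAtNanos;

        Message(String payload, long delayMillis) {
            this.payload = payload;
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Runs one producer and one consumer; returns {consumed count, elapsed milliseconds}.
    static long[] run(int messages, long delayMillis) {
        DelayQueue<Message> queue = new DelayQueue<>();
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        long start = System.nanoTime();

        // Producer: DelayQueue is unbounded, so put() never blocks.
        executor.submit(() -> {
            for (int i = 0; i < messages; i++) {
                queue.put(new Message("msg-" + i, delayMillis));
            }
        });

        // Consumer: take() blocks until the head message has expired.
        executor.submit(() -> {
            try {
                for (int i = 0; i < messages; i++) {
                    queue.take();
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return new long[] {consumed.get(), elapsedMillis};
    }

    public static void main(String[] args) {
        long[] result = run(3, 200);
        System.out.println("consumed=" + result[0] + ", elapsedMillis=" + result[1]);
    }
}
```

With a 200 ms delay, the elapsed time should be at least 200 ms even though the producer finishes almost immediately, because the consumer cannot take a message before it expires.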

Reference

Timer mechanism (Signaling)

Busy waiting

  • Def: Threads signal each other by setting a value in a shared object. For example, thread A sets the boolean member variable hasDataToProcess to true inside a synchronized block, and thread B reads hasDataToProcess, also inside a synchronized block.

  • Example: Thread B keeps calling hasDataToProcess() in a loop until thread A sets the flag and the call returns true. This is called busy waiting; thread B consumes CPU for as long as it waits.

// class definition
public class MySignal
{
  protected boolean hasDataToProcess = false;

  public synchronized boolean hasDataToProcess()
  {
    return this.hasDataToProcess;
  }

  public synchronized void setHasDataToProcess(boolean hasData)
  {
    this.hasDataToProcess = hasData;  
  }
}

...

// main program
protected MySignal sharedSignal = ...

// Thread B busy-waits until thread A sets hasDataToProcess to true

while(!sharedSignal.hasDataToProcess())
{
  //do nothing... busy waiting
}

Wait notify

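  • Pros:

    • The waiting thread blocks instead of spinning, which removes the CPU load that a busy-waiting thread causes.

  • Cons:

    • Missed signals: if notify() is called before wait(), the signal is lost and the waiting thread may block forever.

    • It can be unclear whether notify() and wait() are called on the same object.

    • Nothing in wait/notify requires a state change, yet a state change is needed in most cases so that the waiter knows why it was woken.

    • Spurious wakeups: wait() can return without any notify(), so it should be called in a loop that rechecks the condition.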
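
A common way to work around missed signals and spurious wakeups is to store the signal in a field and to wait in a loop on a private monitor. The sketch below shows that pattern; the class name WaitNotifySignal and its method names are assumptions made for this example.

```java
// Hypothetical helper class illustrating the "signal stored in a field" pattern.
class WaitNotifySignal {
    // Private monitor, so it is always clear which object wait() and notify() use.
    private final Object monitor = new Object();
    // Remembers a signal that arrives before anyone is waiting.
    private boolean signalled = false;

    void doNotify() {
        synchronized (monitor) {
            signalled = true;
            monitor.notify();
        }
    }

    void doWait() throws InterruptedException {
        synchronized (monitor) {
            // The loop rechecks the state, which guards against spurious wakeups.
            while (!signalled) {
                monitor.wait();
            }
            signalled = false; // consume the signal
        }
    }

    // Sends the signal first, then starts a waiter; returns true if the waiter still finished.
    static boolean notifyBeforeWaitIsNotLost() {
        WaitNotifySignal signal = new WaitNotifySignal();
        signal.doNotify(); // nobody is waiting yet

        Thread waiter = new Thread(() -> {
            try {
                signal.doWait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        try {
            waiter.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("waiter finished: " + notifyBeforeWaitIsNotLost());
    }
}
```

Because the flag is set inside the synchronized block, a waiter that arrives late sees signalled == true and never calls wait() at all.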

// Pseudocode. A dedicated monitor object is shared by the clients and the polling thread
// for wait()/notify(); delayTaskSortedSet is the key of the Redis sorted set.
Object lock = new Object()

// Clients: Insert a delayed task into the delay queue (Redis sorted set)
InsertDelayTask(String msg, long delayTime)
{
    // score = current time + delay time
    score = currentTime() + delayTime
    redis.zadd(delayTaskSortedSet, score, msg)

    // Notify the polling thread: the new task may expire earlier than the one it is waiting for
    synchronized(lock)
    {
        lock.notify()
    } 
}

// DelayQueue server polling thread: Scan the delay queue and move expired tasks to the ready queue
GetDelayMsg()
{   
    while(true)
    {
        // Wait until the sorted set is non-empty (ZCARD returns the number of elements).
        // The loop also guards against spurious wakeups.
        synchronized(lock)
        {
            while (0 == redis.zcard(delayTaskSortedSet))
            {
                lock.wait()
            }
        }

        // Peek the element with the smallest score, i.e. the earliest expiration time
        (msg, score) = redis.zrange(delayTaskSortedSet, 0, 0, WITHSCORES)
        waitTime = score - currentTime()

        if(waitTime > 0)
        {
            // Still need to wait; a notify() from a client wakes the thread up earlier
            synchronized(lock)
            {
                lock.wait(waitTime)
            }
        }
        else
        {
            // Remove the element from the sorted set and add it to the ready queue
            redis.zrem(delayTaskSortedSet, msg)
            readyQueue.put(msg)
        }
    }
}

// ReadyQueue server processing thread: Process ReadyQueue elements 
ProcessReady()
{
    while(true)
    {
        // Blocks until an element is available
        msg = readyQueue.take()
        mq.insert(msg)
    }
}
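
The polling loop above can be tried out without Redis. The sketch below is an in-memory stand-in, not the Redis-backed design itself: a PriorityQueue replaces the sorted set, and every name (InMemoryDelayScheduler, insertDelayTask, pollLoop, demo) is hypothetical. It keeps the same structure: wait while empty, wait with a timeout until the head is due, then move the task to the ready queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the Redis-backed scheduler; all names are hypothetical.
class InMemoryDelayScheduler {

    private static final class Entry implements Comparable<Entry> {
        final long dueAtMillis;
        final String msg;

        Entry(long dueAtMillis, String msg) {
            this.dueAtMillis = dueAtMillis;
            this.msg = msg;
        }

        @Override
        public int compareTo(Entry other) {
            return Long.compare(dueAtMillis, other.dueAtMillis);
        }
    }

    private final Object lock = new Object();
    // Stands in for the Redis sorted set: the entry with the smallest due time is the head.
    private final PriorityQueue<Entry> delayed = new PriorityQueue<>();
    final BlockingQueue<String> readyQueue = new LinkedBlockingQueue<>();

    void insertDelayTask(String msg, long delayMillis) {
        synchronized (lock) {
            delayed.add(new Entry(System.currentTimeMillis() + delayMillis, msg));
            lock.notify(); // the new task may be due earlier than the current head
        }
    }

    // Moves due tasks to readyQueue until the polling thread is interrupted.
    void pollLoop() {
        try {
            while (true) {
                synchronized (lock) {
                    while (delayed.isEmpty()) {
                        lock.wait();
                    }
                    long waitMillis = delayed.peek().dueAtMillis - System.currentTimeMillis();
                    if (waitMillis > 0) {
                        // Woken by the timeout, by a new insert, or spuriously; the loop rechecks.
                        lock.wait(waitMillis);
                    } else {
                        readyQueue.put(delayed.poll().msg);
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Inserts two tasks out of order and returns them in the order they become ready.
    static List<String> demo() {
        InMemoryDelayScheduler scheduler = new InMemoryDelayScheduler();
        Thread poller = new Thread(scheduler::pollLoop);
        poller.setDaemon(true);
        poller.start();

        scheduler.insertDelayTask("b", 300);
        scheduler.insertDelayTask("a", 100);

        List<String> released = new ArrayList<>();
        try {
            for (int i = 0; i < 2; i++) {
                String msg = scheduler.readyQueue.poll(2, TimeUnit.SECONDS);
                if (msg != null) {
                    released.add(msg);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            poller.interrupt();
        }
        return released;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Holding one lock around both the check and the wait keeps an insert from slipping in between them unnoticed. A Redis-backed version cannot rely on a JVM lock across processes in the same way, so this only illustrates the control flow.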

Reference

  • Single machine delayed scheduler

    • https://soulmachine.gitbooks.io/system-design/content/cn/task-scheduler.html

    • https://zhuanlan.zhihu.com/p/228420432

  • Naive impl in Java: https://medium.com/nerd-for-tech/distributed-task-scheduler-redis-329475df9dcf

  • JDK DelayQueue guide: https://www.baeldung.com/java-delay-queue