Storm Architecture
History
Storm is an improvement on Yahoo S4. It addresses the following pain points:
Yahoo S4 creates a huge number of PEs (Processing Elements), consuming a large amount of memory and incurring heavy GC cost.
Yahoo S4 requires data distribution logic to be embedded into the business logic layer.
Model
Spout: Data source.
Tuple: The minimum unit of data transmission; a key-value pair.
Streams: A stream is an unbounded sequence of tuples.
Bolts: Where business logic is executed.
Architecture
Nimbus
Master node in the cluster; acts as the resource manager and job scheduler.
Supervisor
Receives jobs from Nimbus.
Monitors whether workers are alive.
Assigns jobs to workers.
Worker
Each worker process is an independent JVM.
Zookeeper
Nimbus writes the corresponding task assignments to Zookeeper for durability and high availability.
Fault tolerant (At least once)
AckerBolt
When a Spout emits a tuple, it also notifies the AckerBolt.
Once a Bolt finishes processing a tuple, it notifies the AckerBolt.
A Bolt tells the AckerBolt two pieces of information:
It has finished processing a tuple.
Which derivative downstream tuples it has already sent out.
The last-layer bolt notifies the AckerBolt that it emitted no additional tuples.
XOR
The AckerBolt tracks each spout tuple with a single 64-bit value: every tuple ID is XORed in once when the tuple is anchored and once when it is acked, so the value returns to zero exactly when the whole tuple tree has been processed. This mechanism can only guarantee that each tuple sent out by the spout is processed at least once, not exactly once.
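The XOR bookkeeping can be sketched outside of Storm; the `AckerState` class below is illustrative, not the Storm API:

```python
import random

class AckerState:
    """Tracks one spout tuple's processing tree with a single XOR value.

    Every emitted tuple ID is XORed in once when it is anchored and once
    when it is acked, so the running value returns to 0 exactly when the
    whole tuple tree has been fully processed.
    """
    def __init__(self):
        self.ack_val = 0

    def update(self, tuple_id):
        self.ack_val ^= tuple_id

    @property
    def fully_processed(self):
        return self.ack_val == 0

# Simulate: spout emits a root tuple, a bolt acks it while anchoring two
# child tuples, then downstream bolts ack the children.
state = AckerState()
root = random.getrandbits(64)
state.update(root)                      # spout notifies the acker of the root tuple
child_a, child_b = random.getrandbits(64), random.getrandbits(64)
state.update(root ^ child_a ^ child_b)  # bolt acks root and anchors two children
assert not state.fully_processed        # children still outstanding
state.update(child_a)                   # downstream bolt acks child_a
state.update(child_b)                   # downstream bolt acks child_b
assert state.fully_processed            # XOR collapsed back to zero
```

Each ID cancels itself out after its pair of XORs, which is why the acker needs only 64 bits per spout tuple regardless of how large the tuple tree grows.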
Example ads stream architecture
Ads model
Each log entry represents an ad event: ad location + ad customer location + ad ID.
Event type: impression (the ad was displayed) or click.
UID:
Event ID: Unique ID for retry.
Last timestamp:
Tasks
Calculate the real-time ad spend for each customer.
Calculate the ad click-through rate; stop displaying ads with a low click-through rate.
Architecture
KafkaSpout pulls logs from Kafka, parses the fields, and sends them to downstream bolts.
Each log from the KafkaSpout is sent to the following two types of bolts: AdsCtrBolt and ClientSpentBolt.
AckerBolt guarantees that each message is processed at least once.
AdsCtrBolt
AdsCtrBolt: Calculates the click-through rate for each type of ad.
Maintains an in-memory map of Ads ID => (Num of display, Num of click, AD cost) and flushes it to HBase every minute.
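A minimal sketch of such a counting-and-flushing bolt, assuming a `store` callable that stands in for an HBase client; the class and method names are illustrative, not Storm's bolt API:

```python
import time
from collections import defaultdict

class AdsCtrBolt:
    """Keeps per-ad counters in memory and flushes them to an external
    store once per flush interval (sketch; `store` stands in for HBase)."""

    def __init__(self, store, flush_interval_s=60):
        # ad_id -> [impressions, clicks, cost]
        self.counts = defaultdict(lambda: [0, 0, 0.0])
        self.store = store
        self.flush_interval_s = flush_interval_s
        self.last_flush = time.monotonic()

    def execute(self, ad_id, event_type, cost):
        entry = self.counts[ad_id]
        if event_type == "impression":
            entry[0] += 1
        elif event_type == "click":
            entry[1] += 1
        entry[2] += cost
        # Flush periodically rather than on every tuple.
        if time.monotonic() - self.last_flush >= self.flush_interval_s:
            self.flush()

    def flush(self):
        for ad_id, (imps, clicks, cost) in self.counts.items():
            ctr = clicks / imps if imps else 0.0
            self.store(ad_id, imps, clicks, cost, ctr)
        self.counts.clear()
        self.last_flush = time.monotonic()

rows = []
bolt = AdsCtrBolt(store=lambda *r: rows.append(r), flush_interval_s=3600)
bolt.execute("ad1", "impression", 0.25)
bolt.execute("ad1", "click", 0.50)
bolt.flush()
assert rows == [("ad1", 1, 1, 0.75, 1.0)]
```

Batching the writes into a once-per-minute flush is what keeps the load on HBase low; the trade-off is that this in-memory state is lost on a worker crash, which is the fault-tolerance problem discussed below.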
ClientSpentBolt
ClientSpentBolt: Calculates the spend for each client and updates HBase.
Updates the cost data in HBase at a higher frequency, possibly on every single click.
NonFunc requirement
Exactly once delivery
Example conditions that will trigger duplicated messages:
A KafkaSpout has a hardware failure and needs to reboot.
The consumer offset for Kafka is stored inside Zookeeper.
Zookeeper is designed for coarse-grained distributed locking, not as a high-throughput KV store, so usually only batched offset updates are recorded in Zookeeper.
A KafkaSpout reboot therefore usually means a large number of messages must be resent.
A ClientSpentBolt has high latency when writing a message; the tuple times out in Storm's acking mechanism and is resent.
Dedupe with bloomfilter
Put every message ID into the bloom filter.
Partition the global bloom filter into multiple time windows.
Typically a resend won't span more than 30 minutes, so filters for older windows can be discarded.
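A sketch of time-windowed bloom-filter dedupe; the filter is hand-rolled and the window/retention parameters are hypothetical:

```python
import hashlib

class BloomFilter:
    """Simple bloom filter: k hash positions derived from one SHA-256 digest."""
    def __init__(self, size_bits=1 << 20, num_hashes=5):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits // 8)

    def _positions(self, key):
        digest = hashlib.sha256(key.encode()).digest()
        for i in range(self.num_hashes):
            yield int.from_bytes(digest[i * 4:(i + 1) * 4], "big") % self.size

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, key):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))

class WindowedDeduper:
    """One bloom filter per time window; windows older than the retention
    horizon are dropped, since a resend rarely spans 30 minutes."""
    def __init__(self, window_s=600, retention_windows=3):
        self.window_s = window_s
        self.retention = retention_windows
        self.windows = {}  # window index -> BloomFilter

    def seen_before(self, event_id, ts):
        idx = int(ts) // self.window_s
        # Evict filters outside the retention horizon.
        for old in [w for w in self.windows if w <= idx - self.retention]:
            del self.windows[old]
        hit = any(event_id in bf for bf in self.windows.values())
        self.windows.setdefault(idx, BloomFilter()).add(event_id)
        return hit

d = WindowedDeduper(window_s=600, retention_windows=3)
assert d.seen_before("evt-1", ts=0) is False   # first delivery
assert d.seen_before("evt-1", ts=60) is True   # duplicate within retention
```

Partitioning by window bounds both memory and the false-positive rate: each filter only ever sees the IDs of one window instead of the whole stream's history.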
Fault tolerant (In-memory machine reboot)
Some state is maintained in memory. In the example above, AdsCtrBolt maintains an in-memory map of Ads ID => (Num of display, Num of click, AD cost).
To make scaling and failure recovery easier, this in-memory state needs to be persisted in an external datastore.
Time window accuracy
Processing time instead of event time
Using the example of calculating the ad click rate every minute, Storm implements this with TickTuple.
Storm sends a tick signal to bolts and spouts at the configured interval.
However, the timestamp used here is the time when the message is transmitted to bolts/spouts, not the time when the ad click happened.
In other words, Storm uses processing time instead of event time.
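The difference can be shown with a small windowing function; `count_clicks` is a hypothetical helper, not Storm code:

```python
from collections import defaultdict

def count_clicks(events, window_s=60, use_event_time=True):
    """Bucket click events into fixed windows.

    Each event is (event_ts, arrival_ts). With processing time the window
    is chosen by when the tuple reaches the bolt; with event time it is
    chosen by when the click actually happened.
    """
    counts = defaultdict(int)
    for event_ts, arrival_ts in events:
        ts = event_ts if use_event_time else arrival_ts
        counts[int(ts) // window_s * window_s] += 1
    return dict(counts)

# A click at t=59 that arrives at t=61 lands in different windows
# depending on which clock is used:
events = [(59, 61)]
assert count_clicks(events, use_event_time=True) == {0: 1}    # event-time window [0, 60)
assert count_clicks(events, use_event_time=False) == {60: 1}  # processing-time window [60, 120)
```

Any tuple delayed across a window boundary, by queueing, retries, or replay, is counted in the wrong minute under processing time.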
Use cases need event time
Advertisement click: if a customer allocates no budget for December, an ad click that happens at 11:59:59 PM on 11/30 but is processed after midnight would be wrongly attributed to December under processing time.
Replay logs: some analytics cases require replaying Kafka logs, where processing time is meaningless.
Challenges in using the event time
If event time is used instead of processing time, several challenges arise.
Using the example of calculating the ad click rate every minute:
Maintain a two-level mapping instead of a one-level mapping
A mapping keyed by time window needs to be maintained.
Instead of Ads ID => (Num of display, Num of click, AD cost),
[TimeWindow1: Ads ID => (Num of display, Num of click, AD cost)], ..., [TimeWindowN: Ads ID => (Num of display, Num of click, AD cost)]
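The two-level mapping can be sketched as follows (the function names and window size are illustrative):

```python
from collections import defaultdict

def make_state():
    # time window -> ad_id -> [impressions, clicks, cost]
    return defaultdict(lambda: defaultdict(lambda: [0, 0, 0.0]))

def record(state, event_ts, ad_id, event_type, cost, window_s=60):
    """Route each record to the window of its *event* timestamp."""
    window = int(event_ts) // window_s * window_s
    entry = state[window][ad_id]
    entry[0 if event_type == "impression" else 1] += 1
    entry[2] += cost

state = make_state()
record(state, 30, "ad1", "impression", 0.0)
record(state, 90, "ad1", "click", 0.5)
# Each event lands in its own event-time window, whatever order it arrives in:
assert state[0]["ad1"] == [1, 0, 0.0]
assert state[60]["ad1"] == [0, 1, 0.5]
```

The extra window level is what makes memory management harder: windows accumulate until they are persisted and evicted, which leads directly to the questions below.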
When to persist memory data
The logs sent from upstream are not strictly ordered by timestamp.
After persisting the data for a time window, if another record for that window arrives late, how do we respond (discard it, or read the data back from the DB and update it)?
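The read-back-and-update option can be sketched as follows; the in-memory `store` dict stands in for HBase:

```python
# (window, ad_id) -> [impressions, clicks, cost]; stands in for an HBase table
store = {}

def apply_late_record(store, window, ad_id, event_type, cost):
    """Merge a late record into a window that was already persisted."""
    entry = store.get((window, ad_id), [0, 0, 0.0])  # read back persisted counters
    entry[0 if event_type == "impression" else 1] += 1
    entry[2] += cost
    store[(window, ad_id)] = entry                   # write the merged result

store[(0, "ad1")] = [10, 1, 0.5]          # window 0 was already flushed
apply_late_record(store, 0, "ad1", "click", 0.1)
assert store[(0, "ad1")] == [10, 2, 0.6]
```

The trade-off: read-modify-write keeps the counts accurate but costs an extra read per late record (and needs care under concurrent writers), while discarding is cheap but undercounts.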
Real world