Storm Stream Groupings


Stream groupings

One of the most important things that you need to do when designing a topology is to define how data is exchanged between components (how streams are consumed by the bolts). A Stream Grouping specifies which streams are consumed by each bolt and how the stream will be consumed.

A stream grouping tells a topology how to send tuples between two components. Remember, spouts and bolts execute in parallel as many tasks across the cluster. If you look at how a topology is executing at the task level, it looks something like this:

[Figure: Apache Storm stream groupings]

When defining a topology, we create a graph of computation with a number of bolts processing streams. At a more granular level, each bolt executes as multiple tasks in the topology. A stream is split into partitions that are divided among the bolt's tasks. Thus, each task of a particular bolt gets only a subset of the tuples from the subscribed streams.

Stream grouping in Storm provides complete control over how tuples are partitioned among the tasks of a bolt subscribed to a stream. The grouping for a bolt is defined on the backtype.storm.topology.InputDeclarer instance returned by the backtype.storm.topology.TopologyBuilder.setBolt method. (From Storm 1.0 onward, these packages live under org.apache.storm instead of backtype.storm.)

The stream grouping is set when the topology is defined, as shown below:

...
builder.setBolt("word-counter", new WordCounter())
.shuffleGrouping("word-reader");
...

In the preceding code block, a bolt is set on the topology builder, and then a source is set using the shuffle stream grouping. A stream grouping normally takes the source component ID as a parameter, and optionally other parameters as well, depending on the kind of stream grouping.

Storm supports the following types of stream groupings:

  • Shuffle grouping
  • Fields grouping
  • All grouping
  • Global grouping
  • Direct grouping
  • Local or shuffle grouping
  • Custom grouping

Shuffle grouping

Shuffle Grouping is the most commonly used grouping. Shuffle grouping distributes tuples in a uniform, random way across the tasks. An equal number of tuples will be processed by each task. This grouping is ideal when you want to distribute your processing load uniformly across the tasks and where there is no requirement of any data-driven partitioning.
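The even distribution described above can be illustrated without Storm itself. The following is a minimal sketch, not Storm's actual implementation: it assumes a round-based scheme in which the list of target task IDs is shuffled, each task receives one tuple per round, and the list is reshuffled when a round ends. The class name ShuffleGroupingSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Simplified model of shuffle grouping; not Storm's actual implementation. */
final class ShuffleGroupingSketch {
    private final List<Integer> order;   // target task IDs in the current random order
    private final Random random;
    private int next = 0;                // position of the next task to use in this round

    ShuffleGroupingSketch(List<Integer> targetTasks, long seed) {
        this.order = new ArrayList<>(targetTasks);
        this.random = new Random(seed);
        Collections.shuffle(order, random);
    }

    /** Returns the ID of the task that receives the next tuple. */
    int chooseTask() {
        if (next == order.size()) {      // every task got one tuple: start a new random round
            Collections.shuffle(order, random);
            next = 0;
        }
        return order.get(next++);
    }

    /** Routes tupleCount tuples and returns how many each task received, indexed like targetTasks. */
    static int[] distribute(List<Integer> targetTasks, int tupleCount, long seed) {
        ShuffleGroupingSketch grouping = new ShuffleGroupingSketch(targetTasks, seed);
        int[] counts = new int[targetTasks.size()];
        for (int i = 0; i < tupleCount; i++) {
            counts[targetTasks.indexOf(grouping.chooseTask())]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = distribute(Arrays.asList(3, 4, 5, 6), 1000, 42L);
        for (int count : counts) {
            System.out.println(count);   // prints "250" four times
        }
    }
}
```

Because every round hands exactly one tuple to each task, the counts are equal whenever the number of tuples is a multiple of the number of tasks, while the order within a round stays random.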

Fields grouping

Fields Grouping allows you to control how tuples are sent to bolts, based on one or more fields of the tuple. It guarantees that a given set of values for a combination of fields is always sent to the same bolt task. For example, if you want all the tweets from a particular user to go to a single task, you can partition the tweet stream using fields grouping on the username field in the following manner:

builder.setSpout("1", new TweetSpout());
builder.setBolt("2", new TweetCounter())
    .fieldsGrouping("1", new Fields("username"));

Another common usage of fields grouping is to join streams. Since partitioning happens solely on the basis of field values and not the stream type, we can join two streams with any common join fields.
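To make the "same values, same task" guarantee concrete, here is a minimal, Storm-free sketch. It assumes the target task is picked from the hash of the grouping-field values modulo the number of tasks; Storm's real hashing may differ, and the class FieldsGroupingSketch is hypothetical. Because only the field values matter, tuples from two different streams that share a join key land on the same task.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of fields grouping; Storm's real hashing may differ. */
final class FieldsGroupingSketch {
    /** Picks a task from the hash of the grouping-field values, so equal values map to the same task. */
    static int chooseTask(List<Object> groupingValues, List<Integer> targetTasks) {
        int index = Math.floorMod(groupingValues.hashCode(), targetTasks.size());
        return targetTasks.get(index);
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(10, 11, 12, 13);
        // Two tweets by the same user, plus a tuple from another stream keyed by the same username.
        int tweet1 = chooseTask(Arrays.<Object>asList("alice"), tasks);
        int tweet2 = chooseTask(Arrays.<Object>asList("alice"), tasks);
        int profile = chooseTask(Arrays.<Object>asList("alice"), tasks);
        System.out.println(tweet1 == tweet2 && tweet2 == profile); // prints "true"
    }
}
```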

All grouping

All grouping is a special grouping that does not partition the tuples but sends a single copy of each tuple to all instances of the receiving bolt. This kind of grouping is used to send signals to bolts. For example, if you need to refresh a cache, you can send a refresh cache signal to all bolts.

As another example, if you are filtering the streams, you have to pass the filter parameters to all the bolt tasks. This can be achieved by sending those parameters over a stream that all of the bolt's tasks subscribe to with all grouping.

Example:
builder.setSpout("1", new TweetSpout());
builder.setSpout("signals", new SignalSpout());
builder.setBolt("2", new TweetCounter()).fieldsGrouping("1",
new Fields("username")).allGrouping("signals");
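The difference between the two subscriptions in this example can be sketched without Storm. This is a simplified model under stated assumptions: the class AllGroupingSketch is hypothetical, and the hash used for fields grouping is illustrative only. A tweet reaches exactly one TweetCounter task, while a signal reaches every task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Simplified model of a bolt that combines fields grouping with all grouping. */
final class AllGroupingSketch {
    /** All grouping: every target task receives its own copy of the tuple. */
    static List<Integer> allGrouping(List<Integer> targetTasks) {
        return new ArrayList<>(targetTasks);
    }

    /** Fields grouping (illustrative hash): exactly one task receives the tuple. */
    static List<Integer> fieldsGrouping(String username, List<Integer> targetTasks) {
        int index = Math.floorMod(username.hashCode(), targetTasks.size());
        return Collections.singletonList(targetTasks.get(index));
    }

    public static void main(String[] args) {
        List<Integer> counterTasks = Arrays.asList(1, 2, 3);
        System.out.println(fieldsGrouping("alice", counterTasks).size()); // prints "1"
        System.out.println(allGrouping(counterTasks));                    // prints "[1, 2, 3]"
    }
}
```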

Global grouping

Global Grouping sends tuples generated by all instances of the source to a single target instance (specifically, the task with lowest ID). A general use case of this is when there needs to be a reduce phase in your topology where you want to combine results from previous steps in the topology in a single bolt.
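As a rough illustration of such a reduce phase, the sketch below picks the lowest task ID and merges partial word counts as the single receiving task might. It does not use Storm; the class GlobalGroupingSketch and the word-count data are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of global grouping used as a reduce step. */
final class GlobalGroupingSketch {
    /** Global grouping: the whole stream goes to the target task with the lowest ID. */
    static int chooseTask(List<Integer> targetTasks) {
        return Collections.min(targetTasks);
    }

    /** What the single receiving task might do: merge partial counts from all upstream tasks. */
    static Map<String, Integer> merge(List<Map<String, Integer>> partialCounts) {
        Map<String, Integer> total = new TreeMap<>();
        for (Map<String, Integer> partial : partialCounts) {
            partial.forEach((word, count) -> total.merge(word, count, Integer::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(chooseTask(Arrays.asList(8, 5, 6)));  // prints "5"
        List<Map<String, Integer>> partials = Arrays.asList(
                Collections.singletonMap("storm", 2),
                Collections.singletonMap("storm", 3),
                Collections.singletonMap("bolt", 1));
        System.out.println(merge(partials));                     // prints "{bolt=1, storm=5}"
    }
}
```

Since a single task receives everything, this step does not scale with parallelism, so it suits small, already-aggregated streams.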

Direct grouping

In direct grouping, the emitter decides which task of the consuming bolt receives each tuple. For example, say we have a log stream and we want to process each log entry in a specific bolt task, chosen by the type of resource. In this case, we can use direct grouping. Tuples on a direct stream must be emitted with one of the collector's emitDirect methods, which take the target task ID.
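The log example can be sketched as a routing rule that lives in the emitter. This is a minimal, Storm-free illustration: the class DirectGroupingSketch, the resource types, and the mapping from type to task are all hypothetical. In a real topology, the returned ID is the value the emitter would hand to the collector when emitting.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of direct grouping: the emitter, not Storm, picks the consumer task. */
final class DirectGroupingSketch {
    /** Hypothetical routing rule: one consumer task per resource type, the last task for anything else. */
    static int chooseTask(String resourceType, List<Integer> consumerTasks) {
        switch (resourceType) {
            case "image":  return consumerTasks.get(0);
            case "script": return consumerTasks.get(1);
            default:       return consumerTasks.get(consumerTasks.size() - 1);
        }
    }

    public static void main(String[] args) {
        List<Integer> consumerTasks = Arrays.asList(7, 8, 9);
        System.out.println(chooseTask("image", consumerTasks));  // prints "7"
        System.out.println(chooseTask("script", consumerTasks)); // prints "8"
        System.out.println(chooseTask("html", consumerTasks));   // prints "9"
    }
}
```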

Local or shuffle grouping

If the target bolt has one or more tasks running in the same worker process as the tuple source, this grouping acts as a shuffle grouping among only those local tasks, which avoids network hops and improves performance. If no target bolt tasks are running in the source's worker process, this grouping behaves like the shuffle grouping described earlier.
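The selection rule can be expressed in a few lines. The sketch below is a simplified model, not Storm's implementation: it assumes the emitter knows which task IDs run in its own worker, and the class LocalOrShuffleSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

/** Simplified model of local-or-shuffle grouping. */
final class LocalOrShuffleSketch {
    /** Prefers target tasks in the emitter's own worker; falls back to all target tasks. */
    static int chooseTask(List<Integer> targetTasks, Set<Integer> tasksInThisWorker, Random random) {
        List<Integer> local = new ArrayList<>();
        for (Integer task : targetTasks) {
            if (tasksInThisWorker.contains(task)) {
                local.add(task);
            }
        }
        List<Integer> candidates = local.isEmpty() ? targetTasks : local;
        return candidates.get(random.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        List<Integer> targets = Arrays.asList(1, 2, 3, 4);
        Set<Integer> sameWorker = new HashSet<>(Arrays.asList(2, 4, 9));
        Random random = new Random();
        boolean alwaysLocal = true;
        for (int i = 0; i < 100; i++) {
            int task = chooseTask(targets, sameWorker, random);
            alwaysLocal &= (task == 2 || task == 4);
        }
        System.out.println(alwaysLocal); // prints "true"
    }
}
```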

Custom grouping

You can define your own custom grouping by implementing the backtype.storm.grouping.CustomStreamGrouping interface.
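The general shape of a custom grouping is a setup step that learns the target tasks, followed by a per-tuple decision. The sketch below uses a hypothetical stand-in interface, GroupingSketch, so that it runs without Storm; the real CustomStreamGrouping methods take additional Storm-specific arguments, so check the Storm Javadoc before porting it. The routing rule (first letter of the first field) is purely illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a custom grouping contract; this is not Storm's interface. */
interface GroupingSketch {
    void prepare(List<Integer> targetTasks);          // called once with the consumer's task IDs
    List<Integer> chooseTasks(List<Object> values);   // called per tuple; returns the receiving tasks
}

/** Routes each tuple by the first letter of its first field. prepare must be called first. */
final class FirstLetterGrouping implements GroupingSketch {
    private List<Integer> targetTasks = Collections.emptyList();

    @Override
    public void prepare(List<Integer> targetTasks) {
        this.targetTasks = new ArrayList<>(targetTasks);
    }

    @Override
    public List<Integer> chooseTasks(List<Object> values) {
        String word = String.valueOf(values.get(0));
        int letter = word.isEmpty() ? 0 : Character.toLowerCase(word.charAt(0)) - 'a';
        int index = Math.floorMod(letter, targetTasks.size());
        return Collections.singletonList(targetTasks.get(index));
    }

    public static void main(String[] args) {
        FirstLetterGrouping grouping = new FirstLetterGrouping();
        grouping.prepare(Arrays.asList(5, 6, 7));
        System.out.println(grouping.chooseTasks(Arrays.<Object>asList("apple")));   // prints "[5]"
        System.out.println(grouping.chooseTasks(Arrays.<Object>asList("banana")));  // prints "[6]"
        System.out.println(grouping.chooseTasks(Arrays.<Object>asList("Avocado"))); // prints "[5]"
    }
}
```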


[Figures: shuffle grouping, fields grouping, global grouping, all grouping]

Guaranteeing Message Processing

Storm guarantees that each message coming off a spout will be fully processed. What does it mean for a message to be "fully processed"? A tuple coming off a spout can trigger thousands of tuples to be created based on it. For example, consider the following topology:

[Figure: Apache Storm, guaranteeing message processing]

Here, Spout A emits a tuple T(A), which is processed by bolt B and bolt C, which in turn emit the tuples T(AB) and T(AC), respectively. When all the tuples produced due to tuple T(A) have been processed (that is, the whole tuple tree T(A), T(AB), and T(AC)), we say that the tuple has been processed completely.

When some of the tuples in a tuple tree fail to process, either due to a runtime error or a timeout (which is configurable for each topology), Storm considers the original spout tuple to have failed.

What happens if a message is fully processed or fails to be fully processed?

Let's take a look at the lifecycle of a tuple coming off a spout. For reference, here is the interface that spouts implement:

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

First, Storm requests a tuple from the Spout by calling the nextTuple method on the Spout. The Spout uses the SpoutOutputCollector provided in the open method to emit a tuple to one of its output streams. When emitting a tuple, the Spout provides a "message id" that will be used to identify the tuple later. Emitting a message to the SpoutOutputCollector looks like this:

_collector.emit(new Values("field1", "field2", 3), msgId);

Next, the tuple gets sent to consuming bolts, and Storm takes care of tracking the tree of messages that is created. If Storm detects that a tuple is fully processed, Storm will call the ack method on the originating Spout task with the message id that the Spout provided to Storm. Likewise, if the tuple times out, Storm will call the fail method on the Spout. Note that a tuple will be acked or failed by the exact same Spout task that created it.
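The lifecycle above suggests the bookkeeping a reliable spout typically needs: remember each emitted message under its message id until ack arrives, and put it back for replay on fail. The following is a minimal, Storm-free sketch of that bookkeeping; the class ReliableSpoutSketch and its in-memory queue are hypothetical, and a real spout would emit through its SpoutOutputCollector instead of returning the id.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Simplified, Storm-free model of a reliable spout's bookkeeping. */
final class ReliableSpoutSketch {
    private final Queue<String> source = new ArrayDeque<>();    // messages waiting to be emitted
    private final Map<Long, String> pending = new HashMap<>();  // emitted but not yet acked
    private long nextId = 0;

    void offer(String message) {
        source.add(message);
    }

    /** Like nextTuple: emits one message under a fresh message id. Returns the id, or -1 if idle. */
    long nextTuple() {
        String message = source.poll();
        if (message == null) {
            return -1;
        }
        long msgId = nextId++;
        pending.put(msgId, message);
        return msgId;
    }

    /** Like ack: the tuple tree completed, so the message can be forgotten. */
    void ack(long msgId) {
        pending.remove(msgId);
    }

    /** Like fail: the tuple tree failed or timed out, so the message is queued for replay. */
    void fail(long msgId) {
        String message = pending.remove(msgId);
        if (message != null) {
            source.add(message);
        }
    }

    int pendingCount() { return pending.size(); }
    int queuedCount() { return source.size(); }

    public static void main(String[] args) {
        ReliableSpoutSketch spout = new ReliableSpoutSketch();
        spout.offer("a");
        spout.offer("b");
        long first = spout.nextTuple();
        long second = spout.nextTuple();
        spout.ack(first);    // "a" is done
        spout.fail(second);  // "b" goes back to the queue
        System.out.println(spout.pendingCount()); // prints "0"
        System.out.println(spout.queuedCount());  // prints "1"
    }
}
```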

Storm's reliability API

The following are the three steps that are required by Storm in order to guarantee message processing:

  • Tag each tuple emitted by a spout with a unique message ID. This can be done by using the backtype.storm.spout.SpoutOutputCollector.emit method that takes a messageId argument as follows:
    spoutOutputCollector.emit(Collections.singletonList(
    (Object)tuple), generateMessageId(tuple));
    

    Storm uses this message ID to track the state of the tuple tree generated by this tuple. If you use one of the emit methods that don't take a messageId argument, Storm will not track it for complete processing. When the message is processed completely, Storm will send an acknowledgement with the same messageId argument that was used while emitting the tuple.

  • When one of the bolts in the topology needs to produce a new tuple in the course of processing a message (for example, bolt B in the preceding topology), it should emit the new tuple anchored to the original tuple that it got from the spout. This can be done by using the overloaded emit methods in the backtype.storm.task.OutputCollector class that take an anchor tuple as an argument. If you are emitting multiple tuples from the same input tuple, then anchor each outgoing tuple. The emit method is given in the following line of code:
    collector.emit(inputTuple, transform(inputTuple));
    
  • Whenever you are done with processing a tuple in the execute method of your bolt, send an acknowledgment using the backtype.storm.task.OutputCollector.ack method. When the acknowledgement reaches the emitting spout, you can safely mark the message as being processed.

    Similarly, if there is a problem in processing a tuple, a failure signal should be sent back using the backtype.storm.task.OutputCollector.fail method so that Storm can replay the failed message.
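The three steps above can be modeled with a simple counter per spout message: tagging starts a tuple tree, anchoring adds to it, and each ack removes one outstanding tuple until the tree is complete. The sketch below is a deliberately simplified illustration, not how Storm's acker is actually implemented; the class TupleTreeSketch and its method names are hypothetical. It replays the earlier topology, in which T(A) is delivered to bolts B and C.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of tuple-tree tracking; not Storm's actual acker implementation. */
final class TupleTreeSketch {
    private final Map<String, Integer> outstanding = new HashMap<>(); // unacked tuples per message ID
    final List<String> acked = new ArrayList<>();   // message IDs that would reach the spout's ack
    final List<String> failed = new ArrayList<>();  // message IDs that would reach the spout's fail

    /** Step 1: the spout emits a tuple tagged with msgId; one copy goes to each consuming bolt. */
    void spoutEmit(String msgId, int consumers) {
        outstanding.put(msgId, consumers);
    }

    /** Step 2: a bolt emits a new tuple anchored to the tree rooted at msgId. */
    void anchoredEmit(String msgId) {
        outstanding.computeIfPresent(msgId, (id, count) -> count + 1);
    }

    /** Step 3: a bolt acks one tuple; the tree is complete when nothing is outstanding. */
    void ack(String msgId) {
        Integer left = outstanding.computeIfPresent(msgId, (id, count) -> count - 1);
        if (left != null && left == 0) {
            outstanding.remove(msgId);
            acked.add(msgId);
        }
    }

    /** A bolt fails one tuple: the whole tree fails and the spout may replay the message. */
    void fail(String msgId) {
        if (outstanding.remove(msgId) != null) {
            failed.add(msgId);
        }
    }

    public static void main(String[] args) {
        TupleTreeSketch tracker = new TupleTreeSketch();
        tracker.spoutEmit("T(A)", 2);  // T(A) goes to bolt B and bolt C
        tracker.anchoredEmit("T(A)");  // bolt B emits T(AB) anchored to T(A)...
        tracker.ack("T(A)");           // ...and only then acks its copy of T(A)
        tracker.anchoredEmit("T(A)");  // bolt C emits T(AC) anchored to T(A)
        tracker.ack("T(A)");           // bolt C acks its copy of T(A)
        tracker.ack("T(A)");           // the consumer of T(AB) acks it
        tracker.ack("T(A)");           // the consumer of T(AC) acks it
        System.out.println(tracker.acked); // prints "[T(A)]"
    }
}
```

Note the ordering inside each bolt: the anchored emit happens before the ack, so the tree never looks complete while a child tuple is still about to be produced.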