Apache Storm Topology

The Storm data model

The basic unit of data that can be processed by a Storm application is called a tuple. Each tuple consists of a predefined list of fields. The value of each field can be a byte, char, integer, long, float, double, Boolean, or byte array. Storm also provides an API to define your own data types, which can be serialized as fields in a tuple. Tuples are dynamically typed: you only define the names of the fields in a tuple, not their data types. This choice of dynamic typing simplifies the API and makes it easy to use.
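
As a small illustration (the class and field names below are invented), a component declares only the names of its output fields; the values it emits can then be of any supported type:

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class SchemaExample {
        // Only field names are declared; no data types appear anywhere.
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }

        // Any supported values can fill those fields; the same field could
        // carry an Integer in one tuple and a Long in another.
        public void emitExample(SpoutOutputCollector collector) {
            collector.emit(new Values("storm", 1));
        }
    }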

Storm topology

To do realtime computation on Storm, you create what are called "topologies". A topology is a graph of computation. You create a Storm topology and deploy it on a Storm cluster to process the data. Each node in a topology contains processing logic, and links between nodes indicate how data should be passed around between nodes.

The logic for a realtime application is packaged into a Storm topology. A Storm topology is analogous to a MapReduce job. One key difference is that a MapReduce job eventually finishes, whereas a topology runs forever (or until you kill it, of course). A topology is a graph of spouts and bolts that are connected with stream groupings.
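
As a minimal sketch of what this looks like in code (the names are invented; RandomWordSpout and WordCountBolt stand in for the spout and bolt classes sketched later on this page, and LocalCluster runs the topology in-process for testing):

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;

    public class WordCountTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // Wire the graph: the bolt subscribes to the spout's stream.
            builder.setSpout("words", new RandomWordSpout());
            builder.setBolt("counter", new WordCountBolt()).shuffleGrouping("words");

            // Submit to an in-process cluster; the topology runs until killed.
            Config conf = new Config();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
        }
    }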

[Diagram: an Apache Storm topology, with spouts feeding streams of tuples into bolts]

Components of a Storm topology

Stream

The core abstraction in Storm is the "stream". A stream is an unbounded sequence of tuples. Storm provides the primitives for transforming a stream into a new stream in a distributed and reliable way. For example, you may transform a stream of tweets into a stream of trending topics. Thus, Storm can also be viewed as a platform to transform streams. In the above topology diagram, streams are represented by arrows.

The basic primitives Storm provides for doing stream transformations are "spouts" and "bolts". Spouts and bolts have interfaces that you implement to run your application-specific logic.

A spout is a source of streams. For example, a spout may read tuples off of a Kestrel queue and emit them as a stream. Or a spout may connect to the Twitter API and emit a stream of tweets.

A bolt consumes any number of input streams, does some processing, and possibly emits new streams. Complex stream transformations, like computing a stream of trending topics from a stream of tweets, require multiple steps and thus multiple bolts. Bolts can run functions, filter tuples, perform streaming aggregations and streaming joins, talk to databases, and more.

Networks of spouts and bolts are packaged into a "topology" which is the top-level abstraction that you submit to Storm clusters for execution. A topology is a graph of stream transformations where each node is a spout or bolt. Edges in the graph indicate which bolts are subscribing to which streams. When a spout or bolt emits a tuple to a stream, it sends the tuple to every bolt that subscribed to that stream.

Links between nodes in your topology indicate how tuples should be passed around. For example, if there is a link between Spout A and Bolt B, a link from Spout A to Bolt C, and a link from Bolt B to Bolt C, then every time Spout A emits a tuple, it will send the tuple to both Bolt B and Bolt C. All of Bolt B's output tuples will go to Bolt C as well.
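
In code, that wiring might look like this fragment (the component IDs and classes are hypothetical, and shuffleGrouping is just one of several available groupings):

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("A", new SpoutA());
    // Bolt B subscribes to Spout A's stream.
    builder.setBolt("B", new BoltB()).shuffleGrouping("A");
    // Bolt C subscribes to the streams of both Spout A and Bolt B.
    builder.setBolt("C", new BoltC())
           .shuffleGrouping("A")
           .shuffleGrouping("B");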

Tuple

The tuple is the main data structure in Storm. A tuple is a named list of values, where each value can be any type. Tuples are dynamically typed -- the types of the fields do not need to be declared. Tuples have helper methods like getInteger and getString to get field values without having to cast the result. Storm needs to know how to serialize all the values in a tuple. By default, Storm knows how to serialize the primitive types, strings, and byte arrays. If you want to use another type, you'll need to implement and register a serializer for that type.
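
For example, a bolt receiving tuples with the (hypothetical) schema ("word", "count") could read the fields by name or by position, with no casting required:

    import backtype.storm.tuple.Tuple;

    public class TupleAccessExample {
        // A sketch: reading fields from a tuple declared as ("word", "count").
        public void show(Tuple input) {
            String word   = input.getStringByField("word"); // by field name
            Integer count = input.getInteger(1);            // by position
            System.out.println(word + " -> " + count);
        }
    }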

Spouts

Spouts are the main entry point of data into a Storm topology. A spout acts as an adapter that connects to a source of data, transforms the data into tuples, and emits the tuples into the topology as a stream. Generally, a spout reads tuples from an external source and emits them into the topology.

Whenever a spout emits a tuple, Storm tracks all the tuples generated while processing it, and when every tuple in the graph rooted at this source tuple has been fully processed, it sends an acknowledgement back to the spout. This tracking happens only if a message ID was provided when the tuple was emitted; if null was used as the message ID, no tracking takes place.

A tuple-processing timeout can also be defined for a topology; if a tuple is not fully processed within the specified timeout, a fail message is sent back to the spout. Again, this happens only if you define a message ID. A small performance gain can be extracted from Storm, at the risk of some data loss, by disabling message acknowledgements, which is done by skipping the message ID when emitting tuples.
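
For illustration, the difference is visible at the emit call inside a spout (this is a fragment, not a complete class; collector, line, and msgId are assumed to be in scope):

    // Reliable: a message ID makes Storm track the tuple tree and
    // call ack() or fail() on this spout later.
    collector.emit(new Values(line), msgId);

    // Unreliable but slightly faster: no message ID, so no tracking,
    // no timeout, and no replay on failure.
    collector.emit(new Values(line));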

The important methods of a spout are listed below; a sketch putting them together follows the list.

  • nextTuple(): This method is called by Storm to get the next tuple from the input source. Inside this method, you write the logic for reading data from the external source and emitting it to an instance of backtype.storm.spout.ISpoutOutputCollector. The schema for streams can be declared by using the declareStream method of backtype.storm.topology.OutputFieldsDeclarer. If a spout wants to emit data to more than one stream, it can declare multiple streams using the declareStream method and specify a stream ID while emitting the tuple.
  • ack(Object msgId): This method is invoked by Storm when the tuple with the given message ID has been completely processed by the topology. At this point, the user should mark the message as processed and do any required cleanup.
  • fail(Object msgId): This method is invoked by Storm when it identifies that the tuple with the given message ID has not been processed successfully or has timed out. In such scenarios, the user should do the required processing so that the message can be emitted again by the nextTuple method.
  • open(): This method is called only once, when the spout is initialized. If a connection to an external source is required for the input data, define the logic for connecting to it in the open method, and then keep fetching data from that source in the nextTuple method to emit it further.
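
Putting these methods together, a minimal spout might look like the following sketch (RandomWordSpout and its word list are invented for illustration; BaseRichSpout provides no-op defaults for the lifecycle methods not shown):

    import java.util.Map;
    import java.util.UUID;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;

    public class RandomWordSpout extends BaseRichSpout {
        private static final String[] WORDS = {"storm", "topology", "spout", "bolt"};
        private SpoutOutputCollector collector;
        private int index = 0;

        // Called once when the spout is initialized; connect to external
        // sources here rather than in the constructor.
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        // Called repeatedly by Storm to pull the next tuple from the source.
        @Override
        public void nextTuple() {
            String word = WORDS[index++ % WORDS.length];
            // Emitting with a message ID enables ack/fail tracking.
            collector.emit(new Values(word), UUID.randomUUID().toString());
            Utils.sleep(100); // avoid busy-looping when there is no new data
        }

        // The tuple tree for msgId was fully processed; clean up bookkeeping.
        @Override
        public void ack(Object msgId) { }

        // Processing failed or timed out; arrange to re-emit the tuple.
        @Override
        public void fail(Object msgId) { }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }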

Bolts

All processing in topologies is done in bolts. Bolts can be thought of as the operators or functions of your computation. They take as input any number of streams, process the data, and optionally emit one or more streams. Bolts may subscribe to streams emitted by spouts or other bolts, making it possible to create a complex network of stream transformations.

Ideally, each bolt in the topology should perform a simple transformation of the tuples, with many such bolts coordinating to carry out a complex transformation. The important methods of a bolt are listed below, followed by a short sketch:

  • execute(Tuple input): This method is executed for each tuple that comes through the subscribed input streams. In this method, you can do whatever processing is required for the tuple and then produce output, either by emitting more tuples to the declared output streams or by doing other things such as persisting the results in a database.

    If a message ID is associated with a tuple, the execute method must acknowledge or fail the tuple using the bolt's OutputCollector; otherwise Storm will not know whether the tuple was processed successfully.

  • prepare(Map stormConf, TopologyContext context, OutputCollector collector): A bolt can be executed by multiple workers in a Storm topology. The instance of a bolt is created on the client machine, serialized, and submitted to Nimbus. When Nimbus creates the worker instances for the topology, it sends this serialized bolt to the workers. The worker will deserialize the bolt and call the prepare method. In this method, you should make sure the bolt is properly configured to execute tuples.
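
A minimal counting bolt might look like the sketch below (WordCountBolt is hypothetical and pairs with the spout sketch above; note the explicit ack call, which is required when input tuples carry message IDs):

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class WordCountBolt extends BaseRichBolt {
        private OutputCollector collector;
        private Map<String, Integer> counts;

        // Called on the worker after deserialization; initialize state here,
        // not in the constructor, because the bolt is serialized on the client.
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.counts = new HashMap<String, Integer>();
        }

        // Called for every tuple arriving on the subscribed input streams.
        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            Integer count = counts.get(word);
            count = (count == null) ? 1 : count + 1;
            counts.put(word, count);
            // Anchor the output tuple to the input so the tuple tree is tracked,
            // then ack the input so the spout can learn of its success.
            collector.emit(input, new Values(word, count));
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }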

Tasks

Each spout or bolt executes as a number of tasks across the cluster. Each task corresponds to one thread of execution, and stream groupings define how to send tuples from one set of tasks to another. You set the parallelism for each spout or bolt in the setSpout and setBolt methods of TopologyBuilder, as sketched below.
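
As a fragment reusing the hypothetical classes from the earlier sketches, parallelism is the optional third argument to setSpout and setBolt:

    TopologyBuilder builder = new TopologyBuilder();
    // Run the spout as 2 parallel tasks.
    builder.setSpout("words", new RandomWordSpout(), 2);
    // Run the bolt as 4 parallel tasks; the fields grouping sends tuples
    // with the same "word" value to the same task.
    builder.setBolt("counter", new WordCountBolt(), 4)
           .fieldsGrouping("words", new Fields("word"));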

Workers

Topologies execute across one or more worker processes. Each worker process is a physical JVM and executes a subset of all the tasks for the topology. For example, if the combined parallelism of the topology is 300 and 50 workers are allocated, then each worker will execute 6 tasks (as threads within the worker). Storm tries to spread the tasks evenly across all the workers.
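
As a final fragment (the topology name is hypothetical and builder is the one from the earlier sketches), the worker count is set through the topology configuration; StormSubmitter is the cluster-mode counterpart of LocalCluster:

    Config conf = new Config();
    // Request 50 worker processes (JVMs); the topology's tasks are
    // spread as threads across these workers.
    conf.setNumWorkers(50);
    StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());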