Storm Word Count topology

Introducing the word count topology

The sample word count topology shown in the following diagram illustrates how to create a basic Storm project, including a spout and bolts. Our word count topology will consist of a single spout connected to three downstream bolts.

[Diagram: Storm word count topology]

Sentence Spout

The SentenceSpout class will simply emit a stream of single-value tuples with the key name "sentence" and a string value (a sentence), as shown in the following example:

{ "sentence":"my dog has fleas" }

To keep things simple, the source of our data will be a static list of sentences that we loop over, emitting a tuple for every sentence. In a real-world application, a spout would typically connect to a dynamic source, such as tweets retrieved from the Twitter API, files being written to a directory, or a messaging queue.
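
For illustration only, here is a minimal sketch of what a queue-fed spout might look like. The QueueBackedSpout class and its in-memory BlockingQueue are hypothetical stand-ins for a connection to a real message broker; they are not part of the topology built in this chapter:

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class QueueBackedSpout extends BaseRichSpout {
	private SpoutOutputCollector collector;
	private BlockingQueue<String> queue;

	public void open(Map config, TopologyContext context,
			SpoutOutputCollector collector) {
		this.collector = collector;
		// stands in for a connection to an external queue or broker
		this.queue = new LinkedBlockingQueue<String>();
	}

	public void nextTuple() {
		// poll() returns null when no message is waiting, so this
		// method never blocks the thread Storm calls it from
		String sentence = queue.poll();
		if (sentence != null) {
			this.collector.emit(new Values(sentence));
		}
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}
}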

Split Sentence Bolt

The split sentence bolt will subscribe to the sentence spout's tuple stream. For each tuple received, it will look up the value of the "sentence" field, split the value into words, and emit a tuple for each word:

{ "word" : "my" }
{ "word" : "dog" }
{ "word" : "has" }
{ "word" : "fleas" }

Word Count Bolt

The word count bolt subscribes to the output of the SplitSentenceBolt class, keeping a running count of how many times it has seen a particular word. Whenever it receives a tuple, it will increment the counter associated with a word and emit a tuple containing the word and the current count:

{ "word" : "dog", "count" : 5 }

Report Bolt

The report bolt subscribes to the output of the WordCountBolt class and maintains a table of all words and their corresponding counts, just like WordCountBolt. When it receives a tuple, it updates the table and prints the contents to the console.

Implementing the word count topology

For now, we'll be developing and running a Storm topology in local mode. Storm's local mode simulates a Storm cluster within a single JVM instance, making it easy to develop and debug Storm topologies in a local development environment or IDE. Later, I'll show you how to take Storm topologies developed in local mode and deploy them to a fully clustered environment.

Setting up a development environment

In the previous chapter, you saw how to configure a Storm cluster. Deploying a Storm topology to a clustered environment requires special packaging of your compiled classes and their dependencies. For this reason, it is highly recommended that you use a build management tool such as Apache Maven, Gradle, or Leiningen.

Implementing the sentence spout

To keep things simple, our SentenceSpout implementation will simulate a data source by iterating over a static list of sentences. Each sentence is emitted as a single-field tuple.

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SentenceSpout extends BaseRichSpout {
	private SpoutOutputCollector collector;
	private String[] sentences = {
			"my dog has fleas",
			"i like cold beverages",
			"the dog ate my homework",
			"don't have a cow man",
			"i don't think i like fleas"
	};
	private int index = 0;

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}

	public void open(Map config, TopologyContext context,
			SpoutOutputCollector collector) {
		this.collector = collector;
	}

	public void nextTuple() {
		// emit the sentence at the current index, then advance,
		// wrapping around to the start of the list
		this.collector.emit(new Values(sentences[index]));
		index++;
		if (index >= sentences.length) {
			index = 0;
		}
		// waitForMillis() is a small sleep helper used throughout this
		// chapter (backtype.storm.utils.Utils.sleep(1) would do the same)
		Utils.waitForMillis(1);
	}
}

The BaseRichSpout class is a convenient implementation of the ISpout and IComponent interfaces and provides default implementations for methods we don't need in this example.
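
For example, BaseRichSpout supplies empty ack() and fail() implementations, so an unreliable spout like ours can ignore them entirely. A reliable spout would override them, roughly along these lines (the comments describe illustrative bookkeeping, not code from this chapter):

	// illustrative only: how a reliable spout might use the ack() and
	// fail() callbacks that BaseRichSpout stubs out as no-ops
	@Override
	public void ack(Object msgId) {
		// the tuple was fully processed; safe to discard it
	}

	@Override
	public void fail(Object msgId) {
		// processing failed; a reliable spout would re-emit the tuple
	}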

The declareOutputFields() method is defined in the IComponent interface that all Storm components (spouts and bolts) must implement and is used to tell Storm what streams a component will emit and the fields each stream's tuples will contain. In this case, we're declaring that our spout will emit a single (default) stream of tuples containing a single field ("sentence").
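
A component is not limited to the single default stream. Although our topology doesn't need it, a component could declare additional named streams and select one when emitting; the stream ID and field names below are purely illustrative:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
	// the default stream, as used throughout this chapter
	declarer.declare(new Fields("sentence"));
	// a second, named stream (ID and fields are hypothetical)
	declarer.declareStream("diagnostics", new Fields("source", "timestamp"));
}

A tuple is then directed to the named stream by passing the stream ID as the first argument to emit(), for example collector.emit("diagnostics", new Values(source, timestamp)).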

The open() method is defined in the ISpout interface and is called whenever a spout component is initialized. The open() method takes three parameters: a map containing the Storm configuration, a TopologyContext object that provides information about a component's placement in the topology, and a SpoutOutputCollector object that provides methods for emitting tuples. In this example, we don't need to perform much in terms of initialization, so the open() implementation simply stores a reference to the SpoutOutputCollector object in an instance variable.

The nextTuple() method represents the core of any spout implementation. Storm calls this method to request that the spout emit tuples to the output collector. Here, we just emit the sentence at the current index, and increment the index.

Implementing the split sentence bolt

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SplitSentenceBolt extends BaseRichBolt {
	private OutputCollector collector;

	public void prepare(Map config, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	public void execute(Tuple tuple) {
		String sentence = tuple.getStringByField("sentence");
		String[] words = sentence.split(" ");
		for (String word : words) {
			// one single-field tuple per word
			this.collector.emit(new Values(word));
		}
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}

The BaseRichBolt class is another convenience class that implements both the IComponent and IBolt interfaces. Extending this class frees us from having to implement methods we're not concerned with and lets us focus on the functionality we need.

The prepare() method defined by the IBolt interface is analogous to the open() method of ISpout. This is where you would prepare resources such as database connections during bolt initialization. Like the SentenceSpout class, the SplitSentenceBolt class does not require much in terms of initialization, so the prepare() method simply saves a reference to the OutputCollector object.

In the declareOutputFields() method, the SplitSentenceBolt class declares a single stream of tuples, each containing one field ("word").

The core functionality of the SplitSentenceBolt class is contained in the execute() method defined by IBolt. This method is called every time the bolt receives a tuple from a stream to which it subscribes. In this case, it looks up the value of the "sentence" field of the incoming tuple as a string, splits the value into individual words, and emits a new tuple for each word.

Implementing the word count bolt

The WordCountBolt class is the topology component that actually maintains the word count. In the bolt's prepare() method, we create an instance of HashMap<String, Long> that will store all the words and their corresponding counts. It is common practice to instantiate most instance variables in the prepare() method. The reason behind this pattern lies in the fact that when a topology is deployed, its component spouts and bolts are serialized and sent across the network. If a spout or bolt has any non-serializable instance variables instantiated before serialization (created in the constructor, for example), a NotSerializableException will be thrown and the topology will fail to deploy. In this case, since HashMap<String, Long> is serializable, we could have safely instantiated it in the constructor. However, in general, it is best to limit constructor arguments to primitives and serializable objects and to instantiate non-serializable objects in the prepare() method.
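
As a concrete sketch of this pattern, consider a bolt holding a JDBC connection. The DatabaseLookupBolt class and its connection URL are hypothetical, but the serialization reasoning is exactly as described above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class DatabaseLookupBolt extends BaseRichBolt {
	// WRONG: a java.sql.Connection is not serializable; initializing it
	// here (or in the constructor) would make the topology fail to
	// deploy with a NotSerializableException:
	// private Connection connection = DriverManager.getConnection(...);
	private Connection connection;

	public void prepare(Map config, TopologyContext context,
			OutputCollector collector) {
		try {
			// RIGHT: create non-serializable resources in prepare(),
			// which runs on the worker after deserialization
			// (the URL below is a placeholder)
			this.connection = DriverManager.getConnection("jdbc:example-url");
		} catch (SQLException e) {
			throw new RuntimeException(e);
		}
	}

	public void execute(Tuple tuple) {
		// look up something using the connection (omitted)
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// this sketch does not emit anything
	}
}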

In the declareOutputFields() method, the WordCountBolt class declares a stream of tuples that will contain both the word received and the corresponding count. In the execute() method, we look up the count for the word received (initializing it to 0 if necessary), increment and store the count, and then emit a new tuple consisting of the word and current count. Emitting the count as a stream allows other bolts in the topology to subscribe to the stream and perform additional processing.

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordCountBolt extends BaseRichBolt {
	private OutputCollector collector;
	private HashMap<String, Long> counts = null;

	public void prepare(Map config, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
		this.counts = new HashMap<String, Long>();
	}

	public void execute(Tuple tuple) {
		String word = tuple.getStringByField("word");
		Long count = this.counts.get(word);
		if (count == null) {
			count = 0L;
		}
		count++;
		this.counts.put(word, count);
		// pass the word and its updated running count downstream
		this.collector.emit(new Values(word, count));
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "count"));
	}
}

Implementing the report bolt

The purpose of the ReportBolt class is to produce a report of the counts for each word. Like the WordCountBolt class, it uses a HashMap<String, Long> object to record the counts, but in this case, it just stores the count received from the counter bolt. One difference between the report bolt and the other bolts we've written so far is that it is a terminal bolt—it only receives tuples. Because it does not emit any streams, the declareOutputFields() method is left empty.

The report bolt also introduces the cleanup() method defined in the IBolt interface. Storm calls this method when a bolt is about to be shut down. We exploit the cleanup() method here as a convenient way to output our final counts when the topology shuts down, but typically, the cleanup() method is used to release resources used by a bolt, such as open files or database connections.

One important thing to keep in mind about the IBolt.cleanup() method when writing bolts is that there is no guarantee that Storm will call it when a topology is running on a cluster.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class ReportBolt extends BaseRichBolt {
	private HashMap<String, Long> counts = null;

	public void prepare(Map config, TopologyContext context,
			OutputCollector collector) {
		// no collector reference is kept; this bolt never emits
		this.counts = new HashMap<String, Long>();
	}

	public void execute(Tuple tuple) {
		String word = tuple.getStringByField("word");
		Long count = tuple.getLongByField("count");
		this.counts.put(word, count);
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// this bolt does not emit anything
	}

	public void cleanup() {
		System.out.println("--- FINAL COUNTS ---");
		List<String> keys = new ArrayList<String>();
		keys.addAll(this.counts.keySet());
		Collections.sort(keys);
		for (String key : keys) {
			System.out.println(key + " : " + this.counts.get(key));
		}
		System.out.println("--------------");
	}
}

Implementing the word count topology

Now that we've defined the spout and bolts that will make up our computation, we're ready to wire them together into a runnable topology.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {
	private static final String SENTENCE_SPOUT_ID = "sentence-spout";
	private static final String SPLIT_BOLT_ID = "split-bolt";
	private static final String COUNT_BOLT_ID = "count-bolt";
	private static final String REPORT_BOLT_ID = "report-bolt";
	private static final String TOPOLOGY_NAME = "word-count-topology";

	public static void main(String[] args) throws Exception {
		SentenceSpout spout = new SentenceSpout();
		SplitSentenceBolt splitBolt = new SplitSentenceBolt();
		WordCountBolt countBolt = new WordCountBolt();
		ReportBolt reportBolt = new ReportBolt();

		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout(SENTENCE_SPOUT_ID, spout);
		// SentenceSpout --> SplitSentenceBolt
		builder.setBolt(SPLIT_BOLT_ID, splitBolt)
				.shuffleGrouping(SENTENCE_SPOUT_ID);
		// SplitSentenceBolt --> WordCountBolt
		builder.setBolt(COUNT_BOLT_ID, countBolt)
				.fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
		// WordCountBolt --> ReportBolt
		builder.setBolt(REPORT_BOLT_ID, reportBolt)
				.globalGrouping(COUNT_BOLT_ID);

		Config config = new Config();
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
		// waitForSeconds() is a small sleep helper used in this chapter
		waitForSeconds(10);
		cluster.killTopology(TOPOLOGY_NAME);
		cluster.shutdown();
	}
}

Storm topologies are typically defined and run in a Java main() method. We begin the main() method by instantiating our spout and bolts and creating an instance of TopologyBuilder. The TopologyBuilder class provides a fluent-style API for defining the data flow between components in a topology. We start by registering the sentence spout and assigning it a unique ID:

builder.setSpout(SENTENCE_SPOUT_ID, spout);

The next step is to register SplitSentenceBolt and establish a subscription to the stream emitted by the SentenceSpout class:

builder.setBolt(SPLIT_BOLT_ID, splitBolt)
		.shuffleGrouping(SENTENCE_SPOUT_ID);

The setBolt() method registers a bolt with the TopologyBuilder class and returns an instance of BoltDeclarer that exposes methods for defining the input source(s) for a bolt. Here, we pass the unique ID we defined for the SentenceSpout object to the shuffleGrouping() method, establishing the relationship. The shuffleGrouping() method tells Storm to shuffle tuples emitted by the SentenceSpout class and distribute them evenly among instances of the SplitSentenceBolt object.
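
That even distribution only matters once a bolt runs as more than one instance. Our example keeps the default parallelism of one, but setBolt() also accepts an optional parallelism hint; the count of four below is arbitrary:

// run four instances of SplitSentenceBolt, each receiving a
// shuffled share of the sentence spout's tuples
builder.setBolt(SPLIT_BOLT_ID, splitBolt, 4)
		.shuffleGrouping(SENTENCE_SPOUT_ID);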

The next line establishes the connection between the SplitSentenceBolt class and the WordCountBolt class:

builder.setBolt(COUNT_BOLT_ID, countBolt)
		.fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

Here, we use the fieldsGrouping() method of the BoltDeclarer class to ensure that all tuples containing the same "word" value get routed to the same WordCountBolt instance.

The last step in defining our data flow is to route the stream of tuples emitted by the WordCountBolt instance to the ReportBolt class. In this case, we want all tuples emitted by WordCountBolt routed to a single ReportBolt task. This behavior is provided by the globalGrouping() method, as follows:

builder.setBolt(REPORT_BOLT_ID, reportBolt)
		.globalGrouping(COUNT_BOLT_ID);

With our data flow defined, the final step in running our word count computation is to build the topology and submit it to a cluster:

Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
waitForSeconds(10);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();

Here, we're running Storm in local mode using Storm's LocalCluster class to simulate a full-blown Storm cluster within our local development environment. Local mode is a convenient way to develop and test Storm applications without the overhead of deploying to a distributed cluster. Local mode also allows you to run Storm topologies within an IDE, setting breakpoints, halting execution, inspecting variables, and profiling the application in ways that are much more time-consuming or nearly impossible when deploying to a Storm cluster.

In this example, we create a LocalCluster instance and call the submitTopology() method with the topology name, an instance of backtype.storm.Config, and the StormTopology object returned by the TopologyBuilder class's createTopology() method.

Storm's Config class is simply an extension of HashMap<String, Object>, which defines a number of Storm-specific constants and convenience methods for configuring a topology's runtime behavior. When a topology is submitted, Storm will merge its predefined default configuration values with the contents of the Config instance passed to the submitTopology() method, and the result will be passed to the open() and prepare() methods of the topology spouts and bolts respectively. In this sense, the Config object represents a set of configuration parameters that are global to all components in a topology.
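
For instance, before submitting the topology we could enable Storm's debug logging or stash an application-specific value that every open() and prepare() call can read back; both settings here are arbitrary examples:

Config config = new Config();
// log every emitted tuple (verbose, but handy in local mode)
config.setDebug(true);
// a custom, topology-wide entry; components read it from the
// configuration map passed to open() and prepare()
config.put("wordcount.source.description", "static sentence list");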

We're now ready to run the WordCountTopology class. The main() method will submit the topology, wait ten seconds while it runs, kill (undeploy) the topology, and finally shut down the local cluster. When the run completes, you should see console output similar to the following:

--- FINAL COUNTS ---
a : 1426
ate : 1426
beverages : 1426
cold : 1426
cow : 1426
dog : 2852
don't : 2851
fleas : 2851
has : 1426
have : 1426
homework : 1426
i : 4276
like : 2851
man : 1426
my : 2852
the : 1426
think : 1425
--------------