Running Topologies on a Cluster

Running Topologies on a Production Cluster

Running a topology on a production cluster is similar to running it in local mode. The first step is to install a Storm client on the machine you will submit from, because submitting and deploying topologies to a remote Storm cluster requires one. This client machine does not have to be one of the machines in the Storm cluster.

Once we have a running cluster, let's revisit our earlier word count example and modify it so we can deploy it to a cluster as well as run it in local mode. The previous example used Storm's LocalCluster class to run in local mode:

LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config,
		builder.createTopology());

Submitting a topology to a remote cluster is simply a matter of using Storm's StormSubmitter class, which exposes a method with the same name and signature:

StormSubmitter.submitTopology(TOPOLOGY_NAME, config,
builder.createTopology());

When developing Storm topologies, you usually don't want to change code and recompile just to switch between running in local mode and deploying to a cluster. The standard way to handle this is to add an if/else block that makes the determination based on a command-line argument. In our updated example, if there are no command-line arguments, we run the topology in local mode; otherwise, we use the first argument as the topology name and submit it to the cluster, as shown in the following code:

// Imports assume Storm 1.0 or later (org.apache.storm);
// older releases use the backtype.storm package instead.
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

public class WordCountTopology {
	private static final String SENTENCE_SPOUT_ID =
			"sentence-spout";
	private static final String SPLIT_BOLT_ID = "split-bolt";
	private static final String COUNT_BOLT_ID = "count-bolt";
	private static final String REPORT_BOLT_ID = "report-bolt";
	private static final String TOPOLOGY_NAME =
			"word-count-topology";

	public static void main(String[] args) throws Exception {
		// The spout and bolt classes come from the earlier word count example.
		SentenceSpout spout = new SentenceSpout();
		SplitSentenceBolt splitBolt = new SplitSentenceBolt();
		WordCountBolt countBolt = new WordCountBolt();
		ReportBolt reportBolt = new ReportBolt();

		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
		// SentenceSpout --> SplitSentenceBolt
		builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2)
				.setNumTasks(4)
				.shuffleGrouping(SENTENCE_SPOUT_ID);
		// SplitSentenceBolt --> WordCountBolt
		builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
				.fieldsGrouping(SPLIT_BOLT_ID,
						new Fields("word"));
		// WordCountBolt --> ReportBolt
		builder.setBolt(REPORT_BOLT_ID, reportBolt)
				.globalGrouping(COUNT_BOLT_ID);

		Config config = new Config();
		config.setNumWorkers(2);

		if (args.length == 0) {
			// No arguments: run in local mode for 10 seconds, then shut down.
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology(TOPOLOGY_NAME, config,
					builder.createTopology());
			Utils.sleep(10000); // stands in for the undefined waitForSeconds(10)
			cluster.killTopology(TOPOLOGY_NAME);
			cluster.shutdown();
		} else {
			// First argument is the topology name: submit to the remote cluster.
			StormSubmitter.submitTopology(args[0],
					config, builder.createTopology());
		}
	}
}
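
The if/else decision above can also be pulled into a small helper so that the argument handling can be unit-tested without a cluster. The following is a minimal sketch in plain Java with no Storm dependency; the LaunchMode class, the Mode enum, and both method names are hypothetical and exist only for illustration.

```java
/** Hypothetical helper that mirrors the argument check in WordCountTopology.main. */
final class LaunchMode {
    enum Mode { LOCAL, CLUSTER }

    /** No arguments means local mode; anything else means submit to the cluster. */
    static Mode modeFor(String[] args) {
        return args.length == 0 ? Mode.LOCAL : Mode.CLUSTER;
    }

    /** The first argument, when present, overrides the default topology name. */
    static String topologyNameFor(String[] args, String defaultName) {
        return args.length == 0 ? defaultName : args[0];
    }

    public static void main(String[] args) {
        // Prints the mode and topology name that the given arguments would select.
        System.out.println(modeFor(args) + " "
                + topologyNameFor(args, "word-count-topology"));
    }
}
```

Keeping the decision in one place means the topology wiring itself never has to change when you switch between local runs and cluster deployments.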

Now create a jar containing your code and all of its dependencies except Storm itself; the Storm jars are added to the classpath on the worker nodes. In Maven, declaring the Storm dependency with the provided scope keeps it out of the packaged jar.

If you're using Maven, the Maven Assembly Plugin can do the packaging for you. Just add this to your pom.xml:

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
  <descriptorRefs>  
	<descriptorRef>jar-with-dependencies</descriptorRef>
  </descriptorRefs>
  <archive>
	<manifest>
	  <mainClass>com.path.to.main.Class</mainClass>
	</manifest>
  </archive>
</configuration>
</plugin>

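Then perform a Maven build. The plugin configuration above does not bind the assembly goal to a lifecycle phase, so append assembly:single to the command if you need the jar-with-dependencies artifact: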

mvn clean install

Next, run the storm jar command using the Storm client, specifying the path to your jar, the fully qualified name of the class to run, and any arguments it will use:

storm jar ./target/Chapter1-1.0-SNAPSHOT.jar \
    storm.blueprints.chapter1.WordCountTopology wordcount-topology

When the command completes, you should see the topology become active in the Storm UI and be able to click on the topology name to drill down and view the topology statistics.

storm jar submits the jar to the cluster and configures the StormSubmitter class to talk to the right cluster. In this example, after uploading the jar, storm jar calls the main function of storm.blueprints.chapter1.WordCountTopology with the single argument "wordcount-topology", which the topology uses as its name.

Common configurations

There are a variety of configurations you can set per topology. The full list can be found in the Javadoc for Storm's Config class. The ones prefixed with "TOPOLOGY" can be overridden on a topology-specific basis (the others are cluster configurations and cannot be overridden). Here are some common ones that are set for a topology:

  1. Config.TOPOLOGY_WORKERS: This sets the number of worker processes to use to execute the topology. For example, if you set this to 25, there will be 25 Java processes across the cluster executing all the tasks. If the combined parallelism across all components in the topology is 150, each worker process will have 6 tasks (150 / 25) running within it as threads.
  2. Config.TOPOLOGY_ACKER_EXECUTORS: This sets the number of executors that will track tuple trees and detect when a spout tuple has been fully processed. If this variable is not set or is set to null, Storm sets the number of acker executors equal to the number of workers configured for this topology. If this variable is set to 0, Storm acks tuples as soon as they come off the spout, effectively disabling reliability.
  3. Config.TOPOLOGY_MAX_SPOUT_PENDING: This sets the maximum number of spout tuples that can be pending on a single spout task at once (pending means the tuple has not been acked or failed yet). It is highly recommended you set this config to prevent queue explosion.
  4. Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS: This is the maximum amount of time a spout tuple has to be fully completed before it is considered failed. This value defaults to 30 seconds, which is sufficient for most topologies.
  5. Config.TOPOLOGY_SERIALIZATIONS: You can register more serializers to Storm using this config so that you can use custom types within tuples.
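
The arithmetic behind the first three settings can be sketched in a few lines. This is plain Java with no Storm dependency; the TopologySizing class and its method names are hypothetical, the value 1000 for the spout pending limit is an arbitrary illustration, and the even spread of threads across workers is a simplifying assumption rather than a description of Storm's scheduler.

```java
/** Hypothetical sizing helper; it models the settings above and does not call Storm. */
final class TopologySizing {
    /** Threads per worker, assuming threads are spread evenly (rounded up). */
    static int threadsPerWorker(int totalParallelism, int workers) {
        if (workers <= 0) {
            throw new IllegalArgumentException("workers must be positive");
        }
        return (totalParallelism + workers - 1) / workers;
    }

    /** Acker executors in effect: an unset (null) value falls back to the worker count. */
    static int effectiveAckers(Integer configuredAckers, int workers) {
        return configuredAckers == null ? workers : configuredAckers;
    }

    /** Upper bound on pending spout tuples, since the limit applies per spout task. */
    static long maxPendingTuples(int spoutTasks, int maxSpoutPending) {
        return (long) spoutTasks * maxSpoutPending;
    }

    public static void main(String[] args) {
        System.out.println(threadsPerWorker(150, 25));  // 6, as in the example above
        System.out.println(effectiveAckers(null, 25));  // 25, one acker per worker
        System.out.println(effectiveAckers(0, 25));     // 0, reliability disabled
        System.out.println(maxPendingTuples(2, 1000));  // 2000
    }
}
```

The last figure is a reminder that the pending limit scales with the number of spout tasks, which matters when you raise spout parallelism.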

Killing a topology

To kill a topology, simply run:

storm kill {stormname}

Give the same name to storm kill as you used when submitting the topology.

Storm won't kill the topology immediately. Instead, it deactivates all the spouts so that they don't emit any more tuples, and then Storm waits Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS seconds before destroying all the workers. This gives the topology enough time to complete any tuples it was processing when it got killed.
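
The timing described above can be illustrated with a short calculation. This is a sketch in plain Java with no Storm dependency; the KillTimeline class and its method are hypothetical, and the timestamp is an arbitrary example.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model of the kill sequence: spouts deactivate first, workers go later. */
final class KillTimeline {
    /** Earliest moment the workers are destroyed, given when storm kill was issued. */
    static Instant workersDestroyedAt(Instant killIssued, int messageTimeoutSecs) {
        if (messageTimeoutSecs < 0) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        return killIssued.plus(Duration.ofSeconds(messageTimeoutSecs));
    }

    public static void main(String[] args) {
        Instant killIssued = Instant.parse("2024-01-01T12:00:00Z");
        // With the default 30-second message timeout:
        System.out.println(workersDestroyedAt(killIssued, 30)); // 2024-01-01T12:00:30Z
    }
}
```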

Monitoring topologies

The best place to monitor a topology is the Storm UI. It provides information about errors occurring in tasks, along with fine-grained statistics on the throughput and latency of each component of each running topology.

You can also look at the worker logs on the cluster machines.