Setting up a Storm Cluster

Set up a Zookeeper cluster

Storm uses ZooKeeper for coordinating the cluster. ZooKeeper is not used for message passing, so the load Storm places on it is quite low. A single-node ZooKeeper cluster should be sufficient for most cases, but if you want failover or are deploying a large Storm cluster, you may want a larger ZooKeeper cluster.

ZooKeeper is written in Java and runs on Java release 1.6 or greater (JDK 6 or greater; FreeBSD support requires OpenJDK 7). For instructions on installing Java, see: Setting Java

To install and set up ZooKeeper, click here:
www.corejavaguru.com/zookeeper-getting-started-with-zookeeper

Install dependencies on Nimbus and worker machines

Before we jump into installing Storm, let's take a look at the technologies with which Storm and topologies are built.

Java and Clojure

Storm runs on the Java Virtual Machine and is written with a roughly equal combination of Java and Clojure. Storm's primary interfaces are defined in Java, with the core logic being implemented mostly in Clojure. In addition to JVM languages, Storm uses Python to implement the Storm executable. Beyond those languages, Storm is a highly polyglot-friendly technology due in part to the fact that a number of its interfaces use Apache Thrift.

The components of Storm topologies (spouts and bolts) can be written in virtually any programming language supported by the operating system on which Storm is installed. JVM-language implementations run natively, and other languages are supported through JNI and Storm's multilang protocol.
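
Since Storm's primary interfaces are defined in Java, a JVM-language bolt is just a class that extends one of Storm's base bolt classes. Below is a minimal sketch, assuming the backtype.storm packages used by the 0.9.x releases (later releases moved to org.apache.storm); the class name and the upstream "word" field are illustrative only.

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class UppercaseBolt extends BaseBasicBolt {

    // Called once for every tuple this bolt receives.
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");   // assumes an upstream "word" field
        collector.emit(new Values(word.toUpperCase())); // emit the transformed value downstream
    }

    // Declares the single output field this bolt emits.
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("upper-word"));
    }
}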

Python

All Storm daemons and management commands are run from a single executable file written in Python. This includes the nimbus and supervisor daemons, and as we'll see, all the commands to deploy and manage topologies.

For this reason, a properly configured Python interpreter must be installed on all machines participating in a Storm cluster, as well as on any workstation used for management purposes.

So Storm's dependencies on Nimbus and the worker machines are:

  1. Java 6
  2. Python 2.6.6

Both of these must be installed on all Storm nodes (machines).

Download and extract a Storm release to Nimbus and worker machines

NOTE: Storm was originally designed to run on Unix-style operating systems, but as of Version 0.9.1, it supports deployment on Windows as well.

Download a Storm release and extract the zip file somewhere on Nimbus and each of the worker machines. The Storm releases can be downloaded from here:
http://github.com/apache/storm/releases

Fill in mandatory configurations into storm.yaml

The Storm release contains a file at conf/storm.yaml that configures the Storm daemons. You can see the default configuration values in defaults.yaml (linked later on this page); storm.yaml overrides anything in defaults.yaml. There are a few configurations that are mandatory to get a working cluster:

1) storm.zookeeper.servers: This setting is a list of the hostnames in the ZooKeeper cluster. It should look something like:

storm.zookeeper.servers:
  - "111.222.333.444"
  - "555.666.777.888"

If the port that your ZooKeeper cluster uses is different from the default, you should set storm.zookeeper.port as well.

2) storm.local.dir: The Nimbus and Supervisor daemons require a directory on the local disk to store small amounts of state (like jars, confs, and things like that). You should create that directory on each machine, give it proper permissions, and then fill in the directory location using this config. For example:

storm.local.dir: "/mnt/storm"

3) nimbus.host: The worker nodes need to know which machine is the master in order to download topology jars and confs. For example:

nimbus.host: "111.222.333.44"

4) supervisor.slots.ports: For each worker machine, you configure how many workers run on that machine with this config. Each worker uses a single port for receiving messages, and this setting defines which ports are open for use. If you define five ports here, then Storm will allocate up to five workers to run on this machine. If you define three ports, Storm will only run up to three. By default, this setting is configured to run 4 workers on the ports 6700, 6701, 6702, and 6703. For example:

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

Optional settings

There are several other settings that you may find necessary to override in addition to the settings that are mandatory for an operational cluster. Storm configuration settings follow a dotted naming convention where the prefix identifies the category of the setting:

  Prefix         Category
  storm.*        General configuration
  nimbus.*       Nimbus configuration
  ui.*           Storm UI configuration
  drpc.*         DRPC server configuration
  supervisor.*   Supervisor configuration
  worker.*       Worker configuration
  zmq.*          ZeroMQ configuration
  topology.*     Topology configuration

For a complete list of the default configuration settings that are available, take a look at the defaults.yaml file in the Storm source code https://github.com/nathanmarz/storm/blob/master/conf/defaults.yaml. Some of the more frequently overridden settings are outlined as follows:

  • nimbus.childopts (default: "-Xmx1024m"): This setting is a list of JVM options that will be added to the Java command line when starting the nimbus daemon.
  • ui.port (default: 8080): This specifies the listening port for the Storm UI web server.
  • ui.childopts (default: "-Xmx1024m"): This specifies the JVM options that will be added to the Java command line when starting the Storm UI service.
  • supervisor.childopts (default: "-Xmx1024m"): This specifies the JVM options that will be added to the Java command line when starting the supervisor daemon.
  • worker.childopts (default: "-Xmx768m"): This specifies the JVM options that will be added to the Java command line when starting worker processes.
  • topology.message.timeout.secs (default: 30): This configures the maximum amount of time (in seconds) for a tuple's tree to be acknowledged (fully processed) before it is considered failed (timed out). Setting this value too low may cause tuples to be replayed repeatedly. For this setting to take effect, a spout must be configured to emit anchored tuples.
  • topology.max.spout.pending (default: null): With the default value of null, Storm will stream tuples from a spout as fast as the spout can produce them. Depending on the execute latency of downstream bolts, this can overwhelm the topology, leading to message timeouts. Setting this value to a number greater than 0 will cause Storm to pause streaming tuples from a spout until the number of outstanding (not yet acknowledged) tuples falls below that number, essentially throttling the spout. This setting and topology.message.timeout.secs are two of the most important parameters when tuning a topology for performance (see the sketch after this list).
  • topology.enable.message.timeouts (default: true): This sets the timeout behavior for anchored tuples. If false, anchored tuples will not time out. Use this setting with care. Consider altering topology.message.timeout.secs before setting this to false. For this setting to take effect, a spout must be configured to emit anchored tuples.
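
To give a feel for how the last two settings are applied per topology rather than cluster-wide, here is a minimal sketch using Storm's Config class. It assumes the backtype.storm packages of the 0.9.x releases (later releases use org.apache.storm); the class name and the values shown are illustrative only.

import backtype.storm.Config;

public class TopologyTuning {

    // Builds a per-topology configuration that overrides the two tuning
    // settings discussed above; the values shown are arbitrary examples.
    public static Config tunedConfig() {
        Config conf = new Config();
        conf.setMessageTimeoutSecs(60);  // topology.message.timeout.secs
        conf.setMaxSpoutPending(1000);   // topology.max.spout.pending
        return conf;
    }
}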

Launch daemons under supervision using "storm" script and a supervisor of your choice

The last step is to launch all the Storm daemons. It is critical that you run each of these daemons under supervision. Storm is a fail-fast system which means the processes will halt whenever an unexpected error is encountered. Storm is designed so that it can safely halt at any point and recover correctly when the process is restarted. This is why Storm keeps no state in-process -- if Nimbus or the Supervisors restart, the running topologies are unaffected. Here's how to run the Storm daemons:

  1. Nimbus: Run the command "bin/storm nimbus" under supervision on the master machine.
  2. Supervisor: Run the command "bin/storm supervisor" under supervision on each worker machine. The supervisor daemon is responsible for starting and stopping worker processes on that machine.
  3. UI: Run the Storm UI (a site you can access from the browser that gives diagnostics on the cluster and topologies) by running the command "bin/storm ui" under supervision. The UI can be accessed by navigating your web browser to http://{nimbus host}:8080.

The management commands

Storm's management commands are used to deploy and manage topologies running in a cluster. To read more about the management commands, see storm-management-commands
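
To connect this to the code side of deployment, here is a minimal, hedged sketch of a topology class that would be packaged into a jar and submitted with "bin/storm jar". It again assumes the 0.9.x backtype.storm packages, reuses the illustrative UppercaseBolt sketched earlier, and uses the TestWordSpout class that ships with storm-core purely as a stand-in data source.

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;

public class UppercaseTopology {
    public static void main(String[] args) throws Exception {
        // Wire a demo spout to the UppercaseBolt sketched earlier.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new TestWordSpout(), 1);
        builder.setBolt("upper", new UppercaseBolt(), 2).shuffleGrouping("words");

        Config conf = new Config();
        conf.setNumWorkers(2); // occupies two of the supervisor.slots.ports defined above

        // The Nimbus host is read from storm.yaml; the jar itself is supplied
        // by the "bin/storm jar" command at submission time.
        StormSubmitter.submitTopology("uppercase-topology", conf, builder.createTopology());
    }
}

Once the jar is built, it would be submitted with something like "bin/storm jar <your-topology-jar> UppercaseTopology", where the jar path is whatever your build produces.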