Lifecycle of a Storm Topology

Starting a topology

  • The "storm jar" command executes your class with the specified arguments. The only special thing that "storm jar" does is set the "storm.jar" Java system property for use by StormSubmitter later.
     [storm jar topology-jar-path class ...]
    

    This runs the main method of the given class with the specified arguments. The Storm jars and the configs in ~/.storm are put on the classpath. The process is configured so that StormSubmitter will upload the jar at topology-jar-path when the topology is submitted.

  • When your code uses StormSubmitter.submitTopology, StormSubmitter takes the following actions:
    • First, StormSubmitter uploads the jar if it hasn't been uploaded before.
      • Jar uploading is done via Nimbus's Thrift interface.
      • beginFileUpload returns a path in Nimbus's inbox.
      • 15 kilobytes are uploaded at a time through uploadChunk.
      • finishFileUpload is called when the upload is finished.
    • Second, StormSubmitter calls submitTopology on the Nimbus Thrift interface.
      • The topology config is serialized using JSON (JSON is used so that writing DSLs in any language is as easy as possible).
      • Notice that the Thrift submitTopology call takes in the Nimbus inbox path where the jar was uploaded.
  • Nimbus receives the topology submission.
  • Nimbus normalizes the topology configuration. The main purpose of normalization is to ensure that every single task will have the same serialization registrations, which is critical for getting serialization working correctly.
  • Nimbus sets up the static state for the topology
    • Jars and configs are kept on the local filesystem because they're too big for Zookeeper. The jar and configs are copied into the path {nimbus local dir}/stormdist/{topology id}
    • setup-storm-static writes task -> component mapping into ZK
    • setup-heartbeats creates a ZK "directory" in which tasks can heartbeat
  • Nimbus calls mk-assignments to assign tasks to machines. An assignment contains:
    • master-code-dir: used by supervisors to download the correct jars/configs for the topology from Nimbus
    • task->node+port: Map from a task id to the worker that task should be running on. (A worker is identified by a node/port pair)
    • node->host: A map from node id to hostname. Workers use this to know which machines to connect to in order to communicate with other workers. Node ids are used to identify supervisors so that multiple supervisors can be run on one machine. One place this is done is with Mesos integration.
    • task->start-time-secs: Contains a map from task id to the timestamp at which Nimbus launched that task. This is used by Nimbus when monitoring topologies, as tasks are given a longer timeout to heartbeat when they're first launched (the launch timeout is configured by "nimbus.task.launch.secs" config)
  • Once topologies are assigned, they're initially in a deactivated mode. start-storm writes data into Zookeeper so that the cluster knows the topology is active and can start emitting tuples from spouts.
  • Supervisor runs two functions in the background:

    • synchronize-supervisor: This is called whenever assignments in Zookeeper change and also every 10 seconds.
      • Downloads code from Nimbus for topologies assigned to this machine for which it doesn't have the code yet.
      • Writes to the local filesystem what this node is supposed to be running. It writes a map from port -> LocalAssignment. LocalAssignment contains a topology id as well as the list of task ids for that worker.
    • sync-processes: Reads from the local filesystem what synchronize-supervisor wrote and compares that to what's actually running on the machine. It then starts/stops worker processes as necessary to synchronize.
  • Worker processes start up through the mk-worker function
    • The worker connects to other workers and starts a thread to monitor for changes. If another worker gets reassigned, this worker automatically reconnects to that worker's new location.
    • Monitors whether a topology is active or not and stores that state in the storm-active-atom variable. This variable is used by tasks to determine whether or not to call nextTuple on the spouts.
    • The worker launches the actual tasks as threads within it
  • Tasks are set up through the mk-task function
    • Tasks set up a routing function which takes in a stream and an output tuple and returns a list of task ids to send the tuple to
    • Tasks set up the spout-specific or bolt-specific code
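
The jar upload step in the list above (beginFileUpload, then repeated uploadChunk calls of 15 kilobytes, then finishFileUpload) can be sketched roughly as follows. This is only an illustration of the call sequence: FakeNimbus is a hypothetical in-memory stand-in, not Storm's Thrift client, and the inbox path format and method signatures are assumptions made for the example.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class ChunkedUploadSketch {
    // 15 KB per uploadChunk call, as described in the text.
    static final int CHUNK_SIZE = 15 * 1024;

    /** Hypothetical in-memory stand-in for the three upload calls on Nimbus. */
    static class FakeNimbus {
        private final Map<String, ByteArrayOutputStream> open = new HashMap<>();
        final Map<String, byte[]> inbox = new HashMap<>();
        int chunkCalls = 0;
        private int nextId = 0;

        // Returns a path in the (simulated) Nimbus inbox; the format is made up.
        String beginFileUpload() {
            String path = "inbox/stormjar-" + (nextId++) + ".jar";
            open.put(path, new ByteArrayOutputStream());
            return path;
        }

        // Appends one chunk to the file being uploaded at the given path.
        void uploadChunk(String path, byte[] chunk) {
            open.get(path).write(chunk, 0, chunk.length);
            chunkCalls++;
        }

        // Closes the upload and makes the complete file visible in the inbox.
        void finishFileUpload(String path) {
            inbox.put(path, open.remove(path).toByteArray());
        }
    }

    /** Uploads jarBytes in CHUNK_SIZE pieces and returns the inbox path. */
    static String uploadJar(FakeNimbus nimbus, byte[] jarBytes) {
        String path = nimbus.beginFileUpload();
        for (int offset = 0; offset < jarBytes.length; offset += CHUNK_SIZE) {
            int end = Math.min(offset + CHUNK_SIZE, jarBytes.length);
            nimbus.uploadChunk(path, Arrays.copyOfRange(jarBytes, offset, end));
        }
        nimbus.finishFileUpload(path);
        return path;
    }

    public static void main(String[] args) {
        FakeNimbus nimbus = new FakeNimbus();
        String path = uploadJar(nimbus, new byte[40_000]);
        System.out.println(path + " uploaded in " + nimbus.chunkCalls + " chunks");
    }
}
```

The path returned by uploadJar plays the role of the inbox path that is later passed along with the submitTopology call.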

Topology Monitoring

Nimbus monitors the topology during its lifetime

  • Nimbus schedules a recurring task on the timer thread to check the topologies
  • Nimbus's behavior is represented as a finite state machine
  • The "monitor" event is called on a topology every "nimbus.monitor.freq.secs" seconds, which calls reassign-topology through reassign-transition
  • reassign-topology calls mk-assignments, the same function used to assign the topology the first time. mk-assignments is also capable of incrementally updating a topology
    • mk-assignments checks heartbeats and reassigns workers as necessary
    • Any reassignments change the state in ZK, which will trigger supervisors to synchronize and start/stop workers
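
As a rough illustration of the heartbeat check, the sketch below decides which tasks look dead at a given moment. It follows the rule stated earlier that a freshly launched task gets the longer launch timeout before its first heartbeat. The class, the method, and the taskTimeoutSecs parameter (standing in for the regular heartbeat timeout, which the text does not name) are hypothetical, and the logic is a simplification rather than Nimbus's actual code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HeartbeatCheckSketch {
    /**
     * Returns the ids of tasks considered dead at nowSecs, in ascending order.
     * startTimeSecs mirrors the task->start-time-secs map from the assignment;
     * lastHeartbeatSecs holds the most recent heartbeat per task, if any.
     */
    static List<Integer> deadTasks(Map<Integer, Integer> startTimeSecs,
                                   Map<Integer, Integer> lastHeartbeatSecs,
                                   int nowSecs,
                                   int launchTimeoutSecs,
                                   int taskTimeoutSecs) {
        List<Integer> dead = new ArrayList<>();
        // TreeMap only to make the result order deterministic.
        for (Map.Entry<Integer, Integer> e : new TreeMap<>(startTimeSecs).entrySet()) {
            int task = e.getKey();
            Integer beat = lastHeartbeatSecs.get(task);
            boolean alive;
            if (beat == null) {
                // No heartbeat yet: allow the longer launch timeout since start.
                alive = nowSecs - e.getValue() <= launchTimeoutSecs;
            } else {
                // Has heartbeated before: apply the regular (assumed) timeout.
                alive = nowSecs - beat <= taskTimeoutSecs;
            }
            if (!alive) {
                dead.add(task);
            }
        }
        return dead;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> start = new HashMap<>();
        start.put(1, 0);
        start.put(2, 0);
        start.put(3, 100);
        Map<Integer, Integer> beats = new HashMap<>();
        beats.put(1, 95);
        System.out.println(deadTasks(start, beats, 110, 60, 30));
    }
}
```

Tasks returned by a check like this are the ones a reassignment would move to other workers.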

Killing a topology

  • The "storm kill" command calls the Nimbus Thrift interface to kill the topology
  • Nimbus receives the kill command
  • Nimbus applies the "kill" transition to the topology
  • The kill transition function changes the status of the topology to "killed" and schedules the "remove" event to run "wait time seconds" in the future.
    • The wait time defaults to the topology message timeout but can be overridden with the -w flag in the "storm kill" command
    • This causes the topology to be deactivated for the wait time before it's actually shut down. This gives the topology a chance to finish processing what it's currently processing before shutting down the workers
    • Changing the status during the kill transition ensures that the kill protocol is fault-tolerant to Nimbus crashing. On startup, if the status of the topology is "killed", Nimbus schedules the remove event to run "wait time seconds" in the future
  • Removing a topology cleans out the assignment and static information from ZK
  • A separate cleanup thread runs the do-cleanup function which will clean up the heartbeat dir and the jars/configs stored locally.
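
The kill protocol above can be sketched as a small state machine with a simulated clock. The point of the example is the fault-tolerance argument: because the "killed" status is persisted, a restarted Nimbus can reschedule the remove event even though its in-memory timer was lost. All names here are hypothetical, the maps merely stand in for Zookeeper and the timer thread, and storing the wait time alongside the status is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

class KillTransitionSketch {
    enum Status { ACTIVE, KILLED }

    // Persisted state (stand-in for what would live in Zookeeper).
    final Map<String, Status> status = new HashMap<>();
    final Map<String, Integer> killWaitSecs = new HashMap<>();

    // In-memory timer: topology id -> time at which "remove" fires.
    // This is lost when the process crashes.
    final Map<String, Integer> removeAt = new HashMap<>();

    void submit(String id) {
        status.put(id, Status.ACTIVE);
    }

    /** The "kill" transition: persist the status, then schedule "remove". */
    void kill(String id, int nowSecs, int waitSecs) {
        status.put(id, Status.KILLED);
        killWaitSecs.put(id, waitSecs);
        removeAt.put(id, nowSecs + waitSecs);
    }

    /** Simulates a crash and restart: timers vanish, persisted state remains. */
    void restart(int nowSecs) {
        removeAt.clear();
        for (Map.Entry<String, Status> e : status.entrySet()) {
            if (e.getValue() == Status.KILLED) {
                // Reschedule "remove" wait-time seconds in the future.
                removeAt.put(e.getKey(), nowSecs + killWaitSecs.get(e.getKey()));
            }
        }
    }

    /** Fires every "remove" event that is due at nowSecs. */
    void tick(int nowSecs) {
        removeAt.entrySet().removeIf(e -> {
            if (e.getValue() > nowSecs) {
                return false;
            }
            status.remove(e.getKey());
            killWaitSecs.remove(e.getKey());
            return true;
        });
    }

    public static void main(String[] args) {
        KillTransitionSketch nimbus = new KillTransitionSketch();
        nimbus.submit("demo-topology");
        nimbus.kill("demo-topology", 0, 30);
        nimbus.restart(10);   // crash at t=10: remove is rescheduled for t=40
        nimbus.tick(40);
        System.out.println("still known: " + nimbus.status.containsKey("demo-topology"));
    }
}
```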

Note: The actual topology that runs is different than the topology the user specifies. The actual topology has implicit streams and an implicit "acker" bolt added to manage the acking framework (used to guarantee data processing). The implicit topology is created via the system-topology! function.

system-topology! is used in two places:

  • when Nimbus is creating tasks for the topology
  • in the worker so it knows where it needs to route messages to

Storm Fault Tolerance

What happens when a worker dies?

When a worker dies, the supervisor will restart it. If it continuously fails on startup and is unable to heartbeat to Nimbus, Nimbus will reassign the worker to another machine.

What happens when a node dies?

The tasks assigned to that machine will time out and Nimbus will reassign those tasks to other machines.

What happens when Nimbus or Supervisor daemons die?

The Nimbus and Supervisor daemons are designed to be fail-fast (the process self-destructs whenever any unexpected situation is encountered) and stateless (all state is kept in Zookeeper or on disk). As described in Setting up a Storm cluster, the Nimbus and Supervisor daemons must be run under supervision using a tool like daemontools or monit. So if the Nimbus or Supervisor daemons die, they restart as if nothing had happened. Most notably, no worker processes are affected by the death of Nimbus or the Supervisors. This is in contrast to Hadoop, where if the JobTracker dies, all the running jobs are lost.

Is Nimbus a single point of failure?

If you lose the Nimbus node, the workers will still continue to function. Additionally, supervisors will continue to restart workers if they die. However, without Nimbus, workers won't be reassigned to other machines when necessary (like if you lose a worker machine). So the answer is that Nimbus is "sort of" a SPOF. In practice, it's not a big deal since nothing catastrophic happens when the Nimbus daemon dies. There are plans to make Nimbus highly available in the future.