Starting a topology begins with `storm jar topology-jar-path class ...`, which sets the `storm.jar` environment variable for use by StormSubmitter later. It runs the main method of `class` with the specified arguments. The Storm jars and the configs in `~/.storm` are put on the classpath, and the process is configured so that StormSubmitter will upload the jar at `topology-jar-path` when the topology is submitted.
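For concreteness, here is a minimal sketch of such a main class. The `WordSpout` and `CountBolt` classes are hypothetical stand-ins, and the `org.apache.storm` package names assume a recent release (older releases used `backtype.storm`):

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

// Run with: storm jar my-topology.jar com.example.MyTopology
public class MyTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new WordSpout(), 2);   // hypothetical spout
        builder.setBolt("counter", new CountBolt(), 4)   // hypothetical bolt
               .shuffleGrouping("words");

        Config conf = new Config();
        conf.setNumWorkers(3);

        // StormSubmitter finds the jar to upload via the path that
        // `storm jar` recorded in the storm.jar property.
        StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());
    }
}
```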
When your code calls StormSubmitter.submitTopology, StormSubmitter takes the following actions:
- First, StormSubmitter uploads the jar if it hasn't been uploaded before:
  - beginFileUpload returns a path in Nimbus's inbox
  - the jar is uploaded piece by piece through uploadChunk
  - finishFileUpload is called when the upload is finished
- Second, StormSubmitter calls submitTopology on the Nimbus Thrift interface (see the sketch below).
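A rough sketch of that upload handshake, assuming a generated Thrift `Nimbus.Client` named `client`; the chunk size and the wrapping method are illustrative:

```java
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.StormTopology;

class ThriftSubmitSketch {
    // Upload the jar through Nimbus's Thrift interface, then submit the
    // topology, handing over the inbox path the jar was uploaded to.
    void submit(Nimbus.Client client, String jarPath, String name,
                String jsonConf, StormTopology topology) throws Exception {
        String inbox = client.beginFileUpload();       // path in Nimbus's inbox
        try (FileInputStream in = new FileInputStream(jarPath)) {
            byte[] buf = new byte[15 * 1024];          // illustrative chunk size
            for (int n; (n = in.read(buf)) > 0; ) {
                client.uploadChunk(inbox, ByteBuffer.wrap(buf, 0, n));
            }
        }
        client.finishFileUpload(inbox);
        // jsonConf is the topology config serialized as JSON.
        client.submitTopology(name, inbox, jsonConf, topology);
    }
}
```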
Notice that the Thrift submitTopology call takes in the Nimbus inbox path where the jar was uploaded. Nimbus then sets up the static state for the topology and assigns it:

- setup-storm-static writes the task -> component mapping into Zookeeper
- setup-heartbeats creates a Zookeeper "directory" in which tasks can heartbeat
- Nimbus calls mk-assignment to assign tasks to machines. The assignment contains (a hypothetical Java rendering follows this list):
  - master-code-dir: used by supervisors to download the correct jars/configs for the topology from Nimbus
  - task->node+port: a map from a task id to the worker that task should be running on (a worker is identified by a node/port pair)
  - node->host: a map from node id to hostname, used so workers know which machines to connect to in order to communicate with other workers. Node ids identify supervisors so that multiple supervisors can be run on one machine; one place this is done is with the Mesos integration.
  - task->start-time-secs: a map from task id to the timestamp at which Nimbus launched that task. This is used by Nimbus when monitoring topologies, as tasks are given a longer timeout to heartbeat when they're first launched (the launch timeout is configured by the "nimbus.task.launch.secs" config).

Finally, start-storm writes data into Zookeeper so that the cluster knows the topology is active and can start emitting tuples from spouts.
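In Storm's source the assignment is a Clojure record; a hypothetical Java rendering of the same structure looks like this:

```java
import java.util.Map;

// Hypothetical rendering of the assignment Nimbus writes for a topology.
class Assignment {
    String masterCodeDir;                       // where supervisors fetch jars/configs from Nimbus
    Map<Integer, NodePort> taskToNodePort;      // task id -> worker the task runs on
    Map<String, String> nodeToHost;             // node (supervisor) id -> hostname
    Map<Integer, Integer> taskToStartTimeSecs;  // task id -> launch timestamp, in seconds
}

// A worker is identified by a node/port pair rather than a hostname,
// so that multiple supervisors can run on one machine.
class NodePort {
    String node;
    int port;
}
```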
The supervisor runs two functions in the background:

- synchronize-supervisor: called whenever assignments in Zookeeper change and also every 10 seconds. It downloads the code for newly assigned topologies and writes into the local filesystem what this node is supposed to be running.
- sync-processes: reads from the local filesystem what synchronize-supervisor wrote and compares it to what's actually running on the machine. It then starts/stops worker processes as necessary to synchronize (a conceptual sketch follows this list).
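Conceptually, sync-processes performs a converge step like the following hypothetical Java sketch; `Worker` and the read/launch helpers are stand-ins for supervisor internals, which are actually written in Clojure:

```java
import java.util.Map;

class SyncProcessesSketch {
    void syncProcesses() {
        Map<Integer, String> assigned = readLocalAssignment();  // port -> topology id
        Map<Integer, Worker> running = readWorkerHeartbeats();  // port -> live worker

        // Stop workers that are no longer assigned here or stopped heartbeating.
        for (Map.Entry<Integer, Worker> e : running.entrySet()) {
            Worker w = e.getValue();
            if (!w.topologyId.equals(assigned.get(e.getKey())) || w.timedOut()) {
                w.shutdown();
            }
        }
        // Start workers for assigned ports with no live process yet.
        for (Map.Entry<Integer, String> e : assigned.entrySet()) {
            if (!running.containsKey(e.getKey())) {
                launchWorker(e.getValue(), e.getKey());         // (topology id, port)
            }
        }
    }
}
```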
Worker processes start up through the mk-worker function. Each worker monitors whether its topology is active and stores that state in the storm-active-atom variable; this variable is used by tasks to determine whether or not to call nextTuple on the spouts (see the sketch below). Tasks are launched as threads within the worker and are set up through the mk-task function.
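A conceptual sketch of how a spout task consults that state, with storm-active-atom modeled as a Java AtomicBoolean (the real task loop is Clojure):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.spout.ISpout;

class SpoutLoopSketch {
    // Mirrors storm-active-atom: flipped when the topology is (de)activated.
    final AtomicBoolean stormActive = new AtomicBoolean(false);

    void spoutLoop(ISpout spout) throws InterruptedException {
        while (true) {
            if (stormActive.get()) {
                spout.nextTuple();   // topology active: let the spout emit
            } else {
                Thread.sleep(100);   // deactivated: don't pull new tuples
            }
        }
    }
}
```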
Nimbus monitors the topology during its lifetime. A recurring "monitor" event calls reassign-topology through reassign-transition. reassign-topology calls mk-assignments, the same function used to assign the topology the first time; mk-assignments is also capable of incrementally updating a topology.
mk-assignments checks task heartbeats and reassigns workers as necessary (a conceptual sketch of this aliveness check appears after the note below). When a topology is killed, the do-cleanup function cleans up the heartbeat dir and the jars/configs stored locally.

Note: the actual topology that runs is different than the topology the user specifies. The actual topology has implicit streams and an implicit "acker" bolt added to manage the acking framework (used to guarantee data processing). The implicit topology is created via the system-topology! function.
system-topology! is used in two places: when Nimbus creates tasks for the topology, and in the worker so it knows where it needs to route messages to.
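Returning to the aliveness check mentioned above: conceptually, mk-assignments decides whether a task is alive along these lines (hypothetical Java; the two timeouts correspond to the "nimbus.task.timeout.secs" and "nimbus.task.launch.secs" configs):

```java
class AlivenessSketch {
    boolean isTaskAlive(long nowSecs, long lastHeartbeatSecs, long startTimeSecs,
                        long taskTimeoutSecs, long launchTimeoutSecs) {
        // Freshly launched tasks get the longer launch timeout before the
        // normal task timeout applies.
        boolean justLaunched = nowSecs - startTimeSecs < launchTimeoutSecs;
        long timeout = justLaunched ? launchTimeoutSecs : taskTimeoutSecs;
        return nowSecs - lastHeartbeatSecs < timeout;
    }
}
```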
When a worker dies, the supervisor will restart it. If it continuously fails on startup and is unable to heartbeat to Nimbus, Nimbus will reassign the worker to another machine.
The Nimbus and Supervisor daemons are designed to be fail-fast (process self-destructs whenever any unexpected situation is encountered) and stateless (all state is kept in Zookeeper or on disk). As described in Setting up a Storm cluster, the Nimbus and Supervisor daemons must be run under supervision using a tool like daemontools or monit. So if the Nimbus or Supervisor daemons die, they restart like nothing happened. Most notably, no worker processes are affected by the death of Nimbus or the Supervisors. This is in contrast to Hadoop, where if the JobTracker dies, all the running jobs are lost.
If you lose the Nimbus node, the workers will still continue to function. Additionally, supervisors will continue to restart workers if they die. However, without Nimbus, workers won't be reassigned to other machines when necessary (such as when you lose a worker machine). So Nimbus is "sort of" a single point of failure (SPOF). In practice it's not a big deal, since nothing catastrophic happens when the Nimbus daemon dies. There are plans to make Nimbus highly available in the future.