Link Search Menu Expand Document

Design for Horovod Based AllReduce

This document describes the design for supporting AllReduce-based distributed training based on Horovod in ElasticDL.

Motivation

We have developed elastic AllReduce based on FTlib in ElasticDL. From the benchmark report, we can find that the performance of FTlib for ResNet50 is worse than Horovod.

FTlib uses the gossip protocol to build consensus, which is not stable as described in this issue.

FTlib uses Gloo to implement elastic AllReduce because the worker process can catch the exception from Gloo and not exit if any collective communications fail. Horovod also can use Gloo as the backend. There are many small parameter tensors in the ResNet50 model. We have to launch an AllReduce operator to synchronize each tensor. It brings a lot of overhead. There are many optimizations like Tensor Fusion in Horovod to reduce the overhead. So, the performance of Horovod for ResNet50 is better than FTlib.

On the Kubernetes cluster, we usually use kubeflow/mpi-operator to submit a Horovod job. kubeflow/mpi-operator is not fault-tolerant and elastic. Horovod has supported elastic training which can scale up and down the number of workers dynamically at runtime. Elastic Horovod needs a shell script to discover worker hosts on the cluster. However, it is difficult for users to use Kubernetes API to discover worker pod hosts. What’s more, data access is a problem for data parallel training it the number of workers changes. Random sampling is a solution that may affect the training accuracy. There is a master process in ElasticDL. The master can get all worker hosts by Kubernetes API and dynamically assign data shards for workers to solve data access for elastic training. So, it is more user-friendly to run an elastic AllReduce-based training job using ElasticDL with Horovod.

The Worker Queries the Master for Rank to Initialize Horovod

When the job starts, the master will create a RendezvousServer, which has a KVStore. The master will query the worker status after the master uses Kubernetes API to launch worker pods. The master will set a worker host plan into the KVStore of RendezvousServer according to running workers. The host plan includes worker hosts and their assigned ranks which are required by Gloo.

import horovod
from horovod.run.http.http_server import  RendezvousServer
from horovod.runner.common.util.hosts import get_host_assignments

hosts = get_worker_hosts()

host_alloc_plan = get_host_assignments(hosts, num_proc)
global_rendezv_port = rendezvous.start()

# Set hosts into KVStore for Gloo
rendezvous.init(host_alloc_plan)

When the worker starts, it will query the master for the rank in the communication world by GRPC. Then, the master will send the rank according to the host plan. The GRPC protobuf to query ranks is

message GetRankRequest {
    int32 worker_id = 1;
}

message GetRankResponse {
    int32 rank_id = 1;
    int32 world_size = 2;
    int32 rendezvous_id = 3;
}

rpc get_comm_rank(ReportVersionRequest) returns (GetRankResponse);

After getting the rank, the worker will set HOROVOD_RANK and HOROVOD_SIZE. The the worker can call hvd.init() to initialize Horovod.

os.environ["HOROVOD_RANK"] = str(rank_id)
os.environ["HOROVOD_SIZE"] = str(size)
hvd.init()

Re-initialize Horovod When the Number of Workers Changes

To support elastic training, when the master detects the number of workers changes, it will create a new host plan according to running workers and put it in the KVStore. Then, the master will add 1 into rendezvous_id. In the Kubernetes cluster, the number of workers may change for the following reasons:

  1. Some workers fail because of preemption.
  2. A worker pod status becomes running.

In the first case, the Horovod AllReduce communicator will raise an exception. The worker can catch the exception and query the master for the new rank to re-initialize Horovod.

In the second case, the worker will query the master periodically. If the worker find the rendezovous_id of reponse is bigger then the current rendezvous_id, the worker will call hvd.init to re-initialize Horovod.

The Worker Averages Gradients Using Horovod

Using TensorFlow eager execution, we can use hvd.DistributedGradientTape to wrap tf.GradientTape to average gradients.

@tf.function
def training_process_with_horovod(self, features, labels):
    with tf.GradientTape() as tape:
        outputs = self._model.call(features, training=True)
        loss = self._loss(labels, outputs)
    tape = hvd.DistributedGradientTape(tape)
    grads = tape.gradient(loss, mnist_model.trainable_variables)
    return loss, grads

If some workers fail, the hvd.DistributedGradientTape will raise a tensorflow.python.framework.errors_impl.UnknownError. We can catch the error and re-initialize the Horovod context if the error contains HorovodAllreduce, HorovodAllgather, or HorovodBroadcast.

def training_process_horovod_fault_tolerance(self, freature, labels)
    from tensorflow.python.framework.errors_impl import UnknownError
    initialize_horovod = False

    hosts_update = query_worker_hosts_updated(master)
    if hosts_updated:
        initialize_horovod = True

    if not initialize_horovod:
        try:
            loss, grads = self.training_process_with_horovod(features, labels)
        except UnknownError as e:
            if ('HorovodAllreduce' in e.message or
                'HorovodAllgather' in e.message or
                'HorovodBroadcast' in e.message):
                initialize_horovod = True

    if initialize_horovod:
        hvd.shutdown()
        hvd.init()

After initializing Horovod, we should broadcast the model in alive workers to the new workers. The master can assign rank 0 to the oldest worker, as it will be used as the broadcast source to synchronize models among workers.

from horovod.tensorflow.functions import broadcast_variables
def _broadcast_model(model, optimizer, backend):
    broadcast_variables(model.variables, root_rank=0)
    broadcast_variables(optimizer.variables(), root_rank=0)