Parameter server (PS) based distributed training uses data parallelism to speed up training.
We have implemented synchronous SGD in ElasticDL. When PS accumulates
grads_to_wait gradients from workers, PS averages these gradients and updates
the model with the averaged gradients. PS also maintains
equals to the number of model updates. Each worker has a local copy of the
model. Before a minibatch training step starts, if there is a new
model_version on PS, the worker will get the new model from PS to replace the
local model. After computing the gradients with a minibatch, the worker reports
the gradients to PS together with the local model version. When PS receives
gradients from a worker, it only accepts the gradients with a model version
same as the current PS
This Synchronous SGD ensures model consistency in the price of wasted and blocked computation.
- Wasted computation: when a worker reports gradients with an outdated model version to PS, PS will reject these gradients. The worker have to get the current model from PS, reuse the minibatch data to train the model again.
- Blocked computation: PS has to use a lock for model update with gradients and model read by workers to ensure model consistency.
Asynchronous SGD can avoid the wasted and blocked computation mentioned above with a relaxed model consistency.
- PS will accept all gradients from workers.
- PS does not use locks and supports concurrent model reads and updates.
Let us recall how workers train the model in synchronous SGD. Below is the pseudocode:
for minibatch in training_data: accepted = False while not accepted: local_model，model_version = get_model_from_ps() gradients = compute_gradient(local_model, minibatch) accepted = report_gradient_to_ps(gradients, model_version)
In asynchronous SGD, each worker is training the model in nearly the same way as synchronous SGD. The only difference is that the worker does not need to retrain any minibatch data as PS accepts all gradients.
for minibatch in training_data: local_model, model_version = get_model_from_ps() gradients = compute_gradient(local_model, minibatch) report_gradient_to_ps(gradients, model_version)
PS does not need locks in
ReportGradient GRPC services for
def GetModel(): pb_model = Model() for variable in pb_model: assign_value(variable, current_variable_value_in_PS) return pb_model, PS_model_version def ReportGradient(gradients, version): grad_var = zip(gradients, model_variables) optimizer.apply_gradient(grad_var) PS_model_version.atomic_add(1)
PS can processes multiple GRPC calls
concurrently. Thus, there are two kinds of relaxed model consistency.
GetModel, during the variable assign loop, there may be
ReportGradientGRPC service running and updating the variables. Thus, variables in
local_modelin workers may contain values from different model versions.
get_model_from_psis just a proximate model version.
- There may be multiple
ReportGradientrunning concurrently. Different model variables may apply these gradients in different orders.
Also, the concurrent updates to variables in
ReportGradient may cause some
gradients are not applied, as the updates can be overwritten by other concurrent
running updates. TensorFlow optimizers have an argument
ReportGradient, the argument
version may be smaller than
Staleness value is the difference between
staleness = PS_model_version - version
According to some researches, this staleness affects the training convergence, and large staleness may result in poor training accuracy. The deeper the model, the more impact of the staleness. Some optimizers such as SGD and Adagrad are more robust to staleness, some optimizers such as other with momentum are very bad with staleness.
Staleness-aware asychronous SGD proposes a method to modulate learning rate by the staleness. If the staleness is not 0, this method modulates the learning rate used in the optimizer as:
if staleness > 0: learning_rate_used = learning_rate / staleness else: learning_rate_used = learning_rate
In the pseudocode for the asynchronous SGD worker, the worker pulls model from
PS in every minibatch step. Stale synchronous parallel (SSP)
method uses the strategy that the
fastest worker can exceed the slowest one within a predefined staleness
threshold. SSP can reduce the number of
get_model_from_ps calls. The worker
training process is:
get_model_frequency = predefined_staleness_threshold local_model, model_version = get_model_from_ps() local_update_count = 0 for minibatch in training_data: gradients = compute_gradient(local_model, minibatch) report_gradient_to_ps(gradients, model_version) local_update_count += 1 if local_update_count >= get_model_frequency: local_model, model_version = get_model_from_ps() local_update_count = 0 else: apply_gradient(local_model, gradients)
Although the original SSP method uses this strategy in synchronized SGD, we can
also adopt SSP strategy in asynchronized SGD to reduce
Note that in ElasticDL, local models only have non-embedding variables. So in
apply_gradient(local_model, gradients), ElasticDL workers only update
Also, worker can run
report_gradient_to_ps concurrently with
apply_gradient(local_model, gradients) when it does not need to
- No need to use locks in
- No need to accumulate gradients in
- Users decide if disabling concurrent variable update by set
use_lockingargument in the optimizer.
- To support Staleness-aware asychronous
SGD, PS need to modulate the learning
rate in the optimizer with the staleness value. PS may have multiple threads
running concurrently for model updates with a same optimizer instance. Thus,
we cannot modify the learning rate in the optimizer instance. We may modify
the learning rate as a callable method, and use a thread local storage
threading.local()to store the staleness. The callable method uses the staleness value to modulate the learning rate. The optimizer will call this callable method when it reads the learning rate hyperparameter.
- No need to retrain with the minibatch data.
- To support SSP strategy, the worker pulls the model from PS in every
get_model_frequencyminibatch step. Also, the worker needs to update the local model with the computed gradients. model pull/updates do not include embedding variables, as we directly access the embedding vectors in the embedding service.
--use_async, default=False, help="True for asynchronous SGD, False for synchronous SGD"
--lr_staleness_modulation, default=False, help="If True, master will modulate learning rate with staleness in asynchronous SGD"
--get_model_frequency, default=1, help="worker will get_model from PS every these steps."