ElasticDL Worker Training Performance Optimization
This document describes the design for performance optimization for ElasticDL worker training process.
Current worker training pipeline (as of July 24, 2019)
```
while True:
    task = get_task()
    if no task, break
    while True:
        minibatch = get_batch()
        if no batch, break
        features, labels = input_fn(minibatch)
        get_model()
        if task is training:
            loss = compute_loss(features, labels)
            report_gradient(loss)
        elif task is evaluation:
            metrics = compute_metrics(features, labels)
            report_evaluation_metrics(metrics)
        elif task is prediction:
            predict = compute_predict(features)
            report_prediction_outputs(predict)
    report_task_result()
```
There are 4 types of ElasticDL jobs:
- Training-only
- Training-with-evaluation
- Evaluation-only
- Prediction-only
Test setting
Here, we will concentrate on the performance of the training-only case.
All tests are done with CIFAR10 training data. The timing-related arguments used in the test are:
```
--num_epochs=1
--master_resource_request="cpu=2,memory=2048Mi,ephemeral-storage=5000Mi"
--worker_resource_request="cpu=4,memory=2048Mi,ephemeral-storage=5000Mi,gpu=1"
--minibatch_size=128
--num_minibatches_per_task=32
--num_workers=1
--grads_to_wait=1
--model_def=cifar10_functional_api.cifar10_functional_api.custom_model
```
Current perf for training-only job
The total training time and the breakdown timings (in seconds) are below:

| Total Time | get_batch | input_fn | compute_loss | get_model | report_gradient |
|---|---|---|---|---|---|
| 51.0 | 6.7 | 17.9 | 14.8 | 5.0 | 5.0 |
- (6.7 + 17.9) / 51.0 = 48% of the time is spent on training data read and preprocessing (`get_batch` + `input_fn`).
- 14.8 / 51.0 = 29% of the time is spent on model compute (`compute_loss`).
- (5.0 + 5.0) / 51.0 = 20% of the time is spent on PS-related ops (`get_model` + `report_gradient`).
Optimization Strategy
Data Read and Preprocessing
Data read and preprocessing (`get_batch` and `input_fn`) are serialized with the model compute and PS-related ops within one thread. They can instead be processed asynchronously and in parallel, so that their processing time overlaps with the other parts of the training pipeline.

We can use TensorFlow Dataset (`tf.data`) for its asynchronous execution and C++ level multithreading.
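As an illustration only (not ElasticDL code), the sketch below shows how a `tf.data` pipeline can overlap data preparation with the training loop: `map` with `num_parallel_calls` runs the per-record work in background C++ threads, and `prefetch` keeps the next batches ready while the current one is being consumed. The dataset and the preprocessing lambda are stand-ins.

```python
import tensorflow as tf

# Illustration only: a toy pipeline that prepares batches in the background.
raw_dataset = tf.data.Dataset.range(1024)  # stand-in for a dataset of records
dataset = (
    raw_dataset
    .map(
        lambda x: tf.cast(x, tf.float32) / 255.0,  # stand-in for decode/preprocess
        num_parallel_calls=tf.data.experimental.AUTOTUNE,
    )
    .batch(128)
    .prefetch(1)  # prepare the next batch while the current one is consumed
)

for batch in dataset:
    # Model compute for one minibatch would happen here; meanwhile tf.data
    # threads are already producing the next batch.
    pass
```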
One way is to create a new dataset for each task after `get_task`. But the pipeline would stall whenever `get_task` is needed, and there is overhead in creating a new dataset for every task.

A better way is to use a shared dataset, which calls `get_task` to fetch new training data whenever it runs out of data.
The user needs to provide `dataset_fn` instead of `input_fn`. `dataset_fn` takes a RecordIO-backed dataset as input and decodes and preprocesses the data as needed.
For example, instead of:
```python
import numpy as np
import tensorflow as tf


def input_fn(records):
    feature_description = {
        "image": tf.io.FixedLenFeature([32, 32, 3], tf.float32),
        "label": tf.io.FixedLenFeature([1], tf.int64),
    }
    image_list = []
    label_list = []
    for r in records:
        # deserialization
        r = tf.io.parse_single_example(r, feature_description)
        label = r["label"].numpy()
        image = r["image"].numpy()
        # processing data
        image = image.astype(np.float32)
        image /= 255
        label = label.astype(np.int32)
        image_list.append(image)
        label_list.append(label)
    # batching
    batch_size = len(image_list)
    images = np.concatenate(image_list, axis=0)
    images = np.reshape(images, (batch_size, 32, 32, 3))
    images = tf.convert_to_tensor(value=images)
    labels = np.array(label_list)
    return ({"image": images}, labels)
```
Define `dataset_fn` instead:
```python
def dataset_fn(dataset):
    def _parse_data(record):
        feature_description = {
            "image": tf.io.FixedLenFeature([32, 32, 3], tf.float32),
            "label": tf.io.FixedLenFeature([1], tf.int64),
        }
        r = tf.io.parse_single_example(record, feature_description)
        label = r["label"]
        image = r["image"]
        return image, label

    dataset = dataset.map(_parse_data)
    dataset = dataset.map(
        lambda x, y: (
            {"image": tf.math.divide(tf.cast(x, tf.float32), 255.0)},
            y,
        )
    )
    return dataset
```
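As a quick sanity check (hypothetical, not part of the ElasticDL worker; assumes TensorFlow 2.x eager execution), `dataset_fn` can be applied to a small in-memory dataset of serialized `tf.train.Example` records and iterated directly:

```python
import tensorflow as tf

# Build a one-element dataset of serialized records matching the feature spec above.
example = tf.train.Example(
    features=tf.train.Features(
        feature={
            "image": tf.train.Feature(
                float_list=tf.train.FloatList(value=[0.0] * (32 * 32 * 3))
            ),
            "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[1])),
        }
    )
)
raw_dataset = tf.data.Dataset.from_tensors(example.SerializeToString())

# Apply the user-defined dataset_fn, then batch, as the worker would.
dataset = dataset_fn(raw_dataset).batch(1)
for features, labels in dataset:
    print(features["image"].shape, labels.shape)  # (1, 32, 32, 3) (1, 1)
```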
Implementation
For training-only, we can use a shared dataset as below:
```
def data_generator(self):
    while True:
        task = get_task()
        if no task, break
        for r in recordio_data(task):
            yield r

# Create recordio dataset
recordio_dataset = tf.data.Dataset.from_generator(
    data_generator, (tf.string), (tf.TensorShape([]))
)
# apply dataset_fn
dataset = dataset_fn(recordio_dataset)
# batching
dataset = dataset.batch(minibatch_size)

# training loop
for d in dataset:
    features = d[0]
    labels = d[1]
    get_model()
    loss = compute_loss(features, labels)
    report_gradient(loss)
    if a task finishes:
        report_task_result()
```
To support the full functionality of the worker (training-only, training-with-evaluation, evaluation-only, prediction-only), we need to keep track of the pending tasks to know:

- the current task type (training, evaluation, or prediction), and
- when a task finishes, so that we can call `report_task_result` (see the bookkeeping sketch after this list).
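Below is a hypothetical sketch of this bookkeeping, assuming the worker can learn each task's record count when it pulls the task; the class and method names are illustrative, not the actual ElasticDL API. The data generator would call `add` for each task it receives, and the training loop would call `record_consumed` after each minibatch to find out which tasks have just completed and should be reported via `report_task_result`.

```python
from collections import deque


class PendingTasks:
    """Tracks tasks whose records have entered the shared dataset pipeline."""

    def __init__(self):
        self._tasks = deque()  # each entry: [task, records_remaining]

    def add(self, task, num_records):
        self._tasks.append([task, num_records])

    def current_task_type(self):
        # Type (training, evaluation, or prediction) of the oldest pending task.
        return self._tasks[0][0].type if self._tasks else None

    def record_consumed(self, count):
        """Mark `count` records as consumed; return the tasks that just finished."""
        finished = []
        while count > 0 and self._tasks:
            entry = self._tasks[0]
            used = min(count, entry[1])
            entry[1] -= used
            count -= used
            if entry[1] == 0:
                finished.append(self._tasks.popleft()[0])
        return finished
```

Note that a minibatch may straddle a task boundary, which is why `record_consumed` may return more than one finished task for a single call.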
Roadmap for this implementation
- Add prerequisites:
  - The master knows the type of the job from user-provided arguments. Add an extra argument for the job type in worker initialization.
  - Add `dataset_fn` to the model_zoo examples.
- Support training-only with a shared dataset.
  - If the job type is not training-only, keep using the legacy code.
- Add evaluation-only and prediction-only support.
- Support training-with-evaluation.
- Remove the legacy code, including `input_fn`.
Model Compute
TensorFlow eager mode comes at the expense of performance compared with graph mode. `tf.function` can be used to accelerate the model compute parts (`compute_loss`, `compute_metrics`, and `compute_predict`).

Define `compute_loss`, `compute_metrics`, and `compute_predict` as `tf.function`-decorated functions.
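For illustration only (the model and loss below are placeholders, not the ElasticDL implementation), a `tf.function`-decorated `compute_loss` might look like this; the first call traces the computation into a graph, and subsequent calls reuse it instead of paying per-op eager overhead:

```python
import tensorflow as tf

model = tf.keras.Sequential([tf.keras.layers.Dense(10)])  # placeholder model
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)


@tf.function
def compute_loss(features, labels):
    # Traced into a graph on the first call, then executed as a graph afterwards.
    outputs = model(features, training=True)
    return loss_fn(labels, outputs)


loss = compute_loss(tf.random.normal([128, 32]), tf.zeros([128], tf.int64))
```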
Expected perf after the optimization
The timings below (in seconds) are from a prototype test.

| Total Time | compute_loss | get_model | report_gradient |
|---|---|---|---|
| 23.8 | 4.2 | 5.4 | 7.6 |