12. Accelerate the Learning with Parallel Training using a multi-GPU Parallel Server

Hands-on description

Have you ever wondered how to train models with millions of parameters that also require large amounts of data? In fact, the size of such models can become so large that they do not even fit in the memory of a single processor+accelerator. That is why training this type of model becomes impossible using only one GPU, and we need something else to support such a compute- and memory-intensive task. To address this, distributed training has been proposed in the supercomputing community as one of the solutions to this problem.

In today’s hands-on, we will present how we can parallelize the training of a single Deep Neural Network over many GPUs in one node of CTE-POWER using the tf.distribute.MirroredStrategy() TensorFlow API. We will learn by doing that, using a distributed strategy, our model can learn the same but faster, thanks to parallelization! Let’s go!

1 — Basic concepts

“Methods that scale with computation are the future of Artificial Intelligence”— Rich Sutton, father of reinforcement learning (video 4:49)

Deep Neural Networks (DNN) base their success on building high learning capacity models with millions of parameters that are tuned in a data-driven fashion. These models are trained by processing millions of examples so that the development of more accurate algorithms is usually limited by the throughput of the computing devices on which they are trained. In this case, we try to divide the training into subtasks and run them in parallel in a distributed training fashion.


1.1 Performance metrics: Speedup, Throughput, and Scalability

In order to make the training process faster, we are going to need some performance metrics to measure it. The term performance in these systems has a double interpretation. On the one hand, it refers to the predictive accuracy of the model. On the other, to the computational speed of the process.

Accuracy is independent of the computational resources, and it is the performance metric to compare different DNN models.

In contrast, the computation speed depends on the platform on which the model is deployed. We will measure it with metrics such as Speedup, the ratio of the solution time for the sequential algorithm (using one GPU in our hands-on exercises) to that of its parallel counterpart (using many GPUs). This is a prevalent concept in our daily argot in the supercomputing community.

Another important metric is Throughput. In general terms, throughput is the rate of production or the rate at which something is processed;  for example, the number of images per unit time that can be processed. This can give us a good benchmark of performance (although it depends on the neural network type).

Finally, a concept that we usually use is Scalability. It is a more generic concept that refers to the ability of a system to handle a growing amount of work efficiently. These metrics will be highly dependent on the computer cluster configuration, the type of network used, or the framework’s efficiency using the libraries and managing resources.

All these metrics will be used in the following hands-on exercises.

1.2 Parallel computer platforms

The parallel and distributed training approach is broadly used by Deep Learning practitioners. This is because DNNs are compute-intensive, making them similar to traditional supercomputing (high-performance computing, HPC) applications. Thus, large learning workloads perform very well on accelerated systems such as general-purpose graphics processing units (GPU) that have been used in the Supercomputing field.

The main idea behind this computing paradigm is to run tasks in parallel instead of serially, as it would happen in a single machine (or single GPU). Multiple GPUs increase both memory and compute available for training a DNN. In a nutshell, we have several choices, given a minibatch of training data that we want to classify. In the next subsection, we will go into an introduction of the main options.

1.3 Types of parallelism

To achieve the distribution of the training step, there are two principal implementations; which one performs better depends on the needs of the application, and sometimes a mix of both approaches can increase the performance.

For example, different layers in a Deep Learning model may be trained in parallel on different GPUs. This training procedure is commonly known as Model parallelism. Another approach is Data parallelism, where we use the same model for every execution unit, but train the model in each computing device using different training samples.


1.3.1 Data parallelism

In this mode, the training data is divided into multiple subsets, and each of them runs on the same replicated model on a different GPU (worker node). Each device independently computes the errors between its predictions for its training samples and the labeled outputs (the correct values for those training samples), so the workers need to synchronize the model parameters (or their “gradients”) at the end of the batch computation to ensure they are training a consistent model (just as if the algorithm ran on a single processor). Therefore, each device must send all of its changes to all of the models on all the other devices.

One interesting property of this setting is that it scales with the amount of data available, and it speeds up the rate at which the entire dataset contributes to the optimization. Also, it requires less communication between nodes, as it benefits from a high amount of computation per weight. On the other hand, the model has to fit on each node entirely, and it is mainly used for speeding up the computation of convolutional neural networks with large datasets.
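The consistency property just described can be verified with a tiny NumPy sketch (a toy linear model with made-up data, not the hands-on code): averaging the gradients that the workers compute on equal-sized splits reproduces the single-device gradient exactly.

```python
import numpy as np

# Toy data-parallel step: 4 "GPUs" each compute the gradient of a linear
# model on their slice of the mini-batch; the averaged gradient matches
# the gradient computed on the whole mini-batch by a single device.
rng = np.random.default_rng(0)
X, y = rng.normal(size=(64, 3)), rng.normal(size=64)
w = np.zeros(3)

def grad(Xb, yb, w):
    # gradient of the mean-squared error on the slice (Xb, yb)
    return 2 * Xb.T @ (Xb @ w - yb) / len(yb)

single_gpu = grad(X, y, w)                          # one device, full batch
shards = zip(np.split(X, 4), np.split(y, 4))        # 4 equal worker splits
averaged = np.mean([grad(Xb, yb, w) for Xb, yb in shards], axis=0)
print(np.allclose(single_gpu, averaged))            # True
```

This equality is exactly what lets the replicas stay consistent, as if the algorithm ran on a single processor.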

1.3.2 Model parallelism

We could partition the network layers across multiple GPUs (we could even split the work required by individual layers). That is, each GPU takes as input the data flowing into a particular layer, processes the data across several subsequent layers in the neural network, and then sends the data to the next GPU.

In this case (also known as Network Parallelism), the model is segmented into different parts that can run concurrently, and each one runs on the same data in different nodes. This method’s scalability depends on the degree of task parallelization of the algorithm, and it is more complex to implement than the previous one. It may decrease the communication needs, as workers need only synchronize the shared parameters (usually once for each forward or backward-propagation step), and it works well for GPUs in a single server that share a high-speed bus.

In practice, model parallelism is only used with models that are too large to fit on any single device. In this case, the parallelization of the algorithm is more complex to implement than running the same model in a different node with a subset of data.
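As a toy illustration (pure NumPy, with made-up layer sizes, not the hands-on code), model parallelism boils down to placing consecutive layers on different devices and sending the activations from one to the next:

```python
import numpy as np

# Two "devices", each holding half the layers of a small network; the
# activations flow between them as they would between two GPUs.
rng = np.random.default_rng(1)
W1 = rng.normal(size=(8, 16))   # layers living on "GPU 0"
W2 = rng.normal(size=(16, 4))   # layers living on "GPU 1"

def device0(x):
    return np.maximum(x @ W1, 0)    # first block: dense + ReLU

def device1(h):
    return h @ W2                   # second block: dense output layer

x = rng.normal(size=(2, 8))         # a micro-batch of 2 samples
out = device1(device0(x))           # activations "sent" from GPU 0 to GPU 1
print(out.shape)                    # (2, 4)
```

Note that device1 cannot start until device0 finishes its part of the batch, which is precisely the sequential dependency that pipeline parallelism (next subsection) tries to hide.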

1.3.3 Pipeline parallelism

Therefore, a natural approach for really large models is to split the model into layers so that small consecutive sets of layers are grouped onto one GPU. However, this approach can lead to under-utilization of resources, because running every data batch through multiple GPUs introduces a sequential dependency. A new strategy, called Pipeline parallelism, has recently been introduced to reduce these limitations by combining model parallelism with data parallelism. The idea behind this approach is to split each mini-batch into several micro-batches and let each GPU process a different micro-batch simultaneously. Nowadays, there are different approaches to pipeline parallelism depending on the inter-communication and the gradient aggregation used; however, this is out of the scope of this course.
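A minimal sketch of the schedule behind this idea (assuming a hypothetical 2-stage pipeline and 4 micro-batches; this is a GPipe-style forward-pass schedule for illustration, not the hands-on code): at every time step, each stage can work on a different micro-batch instead of sitting idle.

```python
# Forward-pass schedule of a 2-stage pipeline over 4 micro-batches:
# each entry is a (stage, micro_batch) pair active at that time step.
stages, micro_batches = 2, 4
schedule = []
for t in range(stages + micro_batches - 1):
    active = [(s, t - s) for s in range(stages) if 0 <= t - s < micro_batches]
    schedule.append(active)
print(schedule)
# at step 0 only stage 0 runs (micro-batch 0); from step 1 onward both
# stages run simultaneously on different micro-batches.
```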

Task 1:

Summarize the main advantages and disadvantages of the data parallelism and model parallelism approaches.

In this hands-on, we will focus on the Data Parallelism approach.

2 — Concurrency in data parallelism training

In distributed/parallel environments, there may be multiple instances of stochastic gradient descent (SGD) running independently. Thus, to parallelize the SGD training algorithm, the overall algorithm must be adapted to consider different model consistency and parameter distribution issues.

The SGD algorithm is an iterative algorithm that involves multiple rounds of training, where the results of each round are incorporated into the model in preparation for the next round. The rounds can be run on multiple devices, either synchronously or asynchronously.

Each SGD iteration runs on a mini-batch of training samples. In synchronous training, all the devices train their local model using different parts of data from a single (large) mini-batch. They then communicate their locally calculated gradients (directly or indirectly) to all devices. Only after all devices have successfully computed and sent their gradients is the model updated. The updated model is then sent to all nodes along with splits from the next mini-batch.

Synchronous training has the obvious drawback that the update time depends on the slowest worker.

In asynchronous training, no device waits for updates to the model from any other device. The devices can run independently and share results as peers or communicate through one or more central servers known as “parameter servers”. In the peer architecture, each device runs a loop that reads data, computes the gradients, sends them (directly or indirectly) to all devices, and updates the model to the latest version.

While asynchronous training solves the bottleneck introduced by the slowest worker in synchronous training, it suffers from the problem known as the delayed gradient, which can delay convergence of the training process. This occurs when a slow worker pushes its gradients to the global model after the model has already been updated by other workers. However, different techniques have been proposed to solve the delayed gradient problem.

In practical implementations, the approaches are synchronous for up to 32–50 nodes and asynchronous for larger clusters and heterogeneous environments, according to this survey from ETH (Zurich). In this hands-on exercise, we will focus on a synchronous training approach.

Because the updated global model is broadcast to all workers, the collective-communication techniques common in supercomputing are highly relevant for model gradient update and propagation. We will see this later.

For synchronous training, we can choose between two main schemes: centralized or decentralized. The choice between designing a centralized and a decentralized scheme for training depends on multiple factors, including the network topology, bandwidth, communication latency, parameter update frequency, or desired fault tolerance.

The centralized scheme would typically include a so-called Parameter Server strategy. The parameter server framework strategy, which distributes data and model parameters across multiple workers to spread the workload, is widely adopted as an efficient solution to scale the SGD training algorithms. 

(Image by author)

When parallel SGD uses parameter servers, the algorithm starts by broadcasting the model to the workers. In each training iteration, each worker reads its own split from the mini-batch, computes its own gradients, and sends those gradients to one or more parameter servers. The parameter servers wait until all workers have delivered their gradients, aggregate them, and calculate the new model for the next iteration, which is then broadcast to all workers.
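The iteration just described can be sketched as follows (a toy synchronous parameter-server step on a linear model; all names are illustrative and not part of the hands-on code):

```python
import numpy as np

# One synchronous parameter-server iteration: broadcast the model,
# let each worker compute gradients on its split, wait for all of
# them, aggregate, and update the global model.
rng = np.random.default_rng(3)
X, y = rng.normal(size=(64, 3)), rng.normal(size=64)
theta = np.zeros(3)            # model held by the parameter server
lr, n_workers = 0.1, 4

def local_grad(Xb, yb, th):
    # mean-squared-error gradient on one worker's split
    return 2 * Xb.T @ (Xb @ th - yb) / len(yb)

replicas = [theta.copy() for _ in range(n_workers)]       # broadcast
grads = [local_grad(Xb, yb, th) for Xb, yb, th in
         zip(np.split(X, n_workers), np.split(y, n_workers), replicas)]
theta = theta - lr * np.mean(grads, axis=0)               # aggregate + update
print(theta)                   # new model, broadcast at the next iteration
```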

The bottleneck created by sending data to a single parameter server in an all-to-one reduction can be alleviated with a ring-all-reduce architecture to communicate parameter updates among the nodes, based on the all-reduce introduced in this course (MPI topic).

In the ring-all-reduce architecture, there is no central server that aggregates gradients from workers. Instead, in a training iteration, each worker reads its own split for a mini-batch, calculates its gradients, sends its gradients to its successor neighbour on the ring, and receives gradients from its predecessor neighbour on the ring (each worker is assigned two neighbouring workers).

(Image by author)
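The ring-based exchange just described can be simulated in a few lines of plain NumPy (a toy model, not the hands-on code): a reduce-scatter phase followed by an all-gather phase, where each worker sends exactly one chunk to its successor per step and there is no central server.

```python
import numpy as np

# Ring-all-reduce with n simulated workers. Each worker's gradient
# vector is split into n chunks (here, 1 element per chunk).
n = 4
rng = np.random.default_rng(2)
grads = [rng.normal(size=n) for _ in range(n)]
total = np.sum(grads, axis=0)              # the result every worker wants
chunks = [g.copy() for g in grads]         # chunks[w]: worker w's local copy

# Phase 1, reduce-scatter: after n-1 steps, each worker owns one fully
# summed chunk.
for step in range(n - 1):
    sends = [chunks[w][(w - step) % n] for w in range(n)]     # one chunk each
    for w in range(n):
        chunks[w][(w - 1 - step) % n] += sends[(w - 1) % n]   # from predecessor

# Phase 2, all-gather: the owned chunks circulate around the ring until
# every worker holds the complete sum.
for step in range(n - 1):
    sends = [chunks[w][(w + 1 - step) % n] for w in range(n)]
    for w in range(n):
        chunks[w][(w - step) % n] = sends[(w - 1) % n]

print(all(np.allclose(c, total) for c in chunks))  # True
```

Each worker sends and receives 2(n-1) chunks in total, independently of n, which is why this pattern scales so well compared with the all-to-one reduction.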

Task 2:

(a) Define in one paragraph the main differences between synchronous versus asynchronous training approaches. (b) How can we alleviate the bottleneck created by using a central (single) parameter server scheme?

(Optional) If you are interested in delving into this topic, you can continue reading this post.

But luckily, there are software libraries, known as DL frameworks, that facilitate the parallelization or distribution schemes we saw in the previous section.

We can use frameworks such as TensorFlow or PyTorch to program multi-GPU training on one server. To parallelize the training of the model, you only need to wrap the model with torch.nn.parallel.DistributedDataParallel in PyTorch or with tf.distribute.MirroredStrategy in TensorFlow.

However, the number of GPUs that we can place in a server is very limited, and the solution is to put many of these servers together, as we did at the BSC with the CTE-POWER supercomputer, where 54 servers are linked together with an InfiniBand network over optical fiber.

In this new scenario, an extension of the software stack is needed to deal with multiple distributed GPUs in the neural network training process. There are many options, such as Horovod and Ray, and both plug into TensorFlow or PyTorch. However, in this post we will consider only one parallel server that includes 4 GPUs. Let’s see how we can take advantage of a server with 4 GPUs.

The code used in this post is based on the GitHub repository https://github.com/jorditorresBCN/Fundamentals-DL-CTE-POWER

3 — Parallel training with TensorFlow

If our server/node has more than one GPU, TensorFlow will select the GPU with the lowest ID by default. However, TensorFlow does not place operations onto multiple GPUs automatically; we need to add some code using a specific API.

3.1 TensorFlow for multiple GPUs

tf.distribute.Strategy is a TensorFlow API to distribute training across multiple GPUs or TPUs with minimal code changes (from the sequential version presented in the previous post). This API can be used with a high-level API like Keras, and can also be used to distribute custom training loops.

tf.distribute.Strategy intends to cover a number of distribution use cases along different axes. The official web page of this feature presents all the currently supported combinations; however, in this post we will focus our attention on tf.distribute.MirroredStrategy, one of the strategies included in tf.distribute.Strategy.

3.2 “MirroredStrategy”

In this hands-on exercise, we will use tf.distribute.MirroredStrategy, which supports the training process on multiple GPUs (multiple devices) on one server (single host). It creates one replica per GPU device. Each variable in the model is mirrored across all the replicas. These variables are kept in sync with each other by applying identical updates.

Let’s assume we are on a single machine that has multiple GPUs and we want to use more than one GPU for training. We can accomplish this by creating our MirroredStrategy:

mirrored_strategy = tf.distribute.MirroredStrategy()

This will create a MirroredStrategy instance that will use all the GPUs visible to TensorFlow. It is possible to see the list of available GPU devices by doing the following:

devices = tf.config.experimental.list_physical_devices("GPU")

It is also possible to use a subset of the available GPUs in the system by doing the following:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

We then need to declare our model architecture and compile it within the scope of the MirroredStrategy. To build the model and compile it inside the MirroredStrategy scope, we can do it in the following way:

with mirrored_strategy.scope():
     model = tf.keras.applications.resnet_v2.ResNet50V2(
             include_top=True, weights=None,
             input_shape=(128, 128, 3), classes=10)
     opt = tf.keras.optimizers.SGD(learning_rate)
     model.compile(loss='sparse_categorical_crossentropy',
                   optimizer=opt, metrics=['accuracy'])

This allows us to create distributed variables instead of regular variables: each variable is mirrored across all the replicas and kept in sync with the others by applying identical updates. It is important during the coding phase that the creation of variables happens under the strategy scope. In general, this applies only to the model construction step and the compile step. Training can be done as usual outside the strategy scope with:

dataset = load_data(batch_size)
model.fit(dataset, epochs=5, verbose=2)

3.3 Centralized synchronous distributed training

Centralized synchronous distributed training, supported by tf.distribute.MirroredStrategy, means that the state of the per-GPU model replicas stays the same at all times. When the code opens a MirroredStrategy scope and builds a model within it, the MirroredStrategy object creates one model copy (replica) on each available GPU.

MirroredStrategy schema for 2 GPUs (image source)


Task 3:

Summarize what it means to use MirroredStrategy for training a DL network in terms of parameter distribution and model consistency.

4 — Case study

We will use the same case study presented in the previous hands-on exercise (that classifies the CIFAR10 dataset using the ResNet50 neural network).

4.1 Parallel performance measurement

In this hands-on exercise, we will consider the epoch time as a measure of the computation time for training a Deep Neural Network (DNN). This approximate measure in seconds, provided by Keras’ .fit method, is enough for the purpose of this academic exercise. In our case, we suggest discarding the first epoch, as it includes creating and initializing structures. Obviously, for certain types of performance studies, it is necessary to go into more detail, differentiating the data loading time, feed-forward time, loss function time, backpropagation time, etc., but that falls outside the scope of the case study we propose in this hands-on exercise.

4.2 Choose the Batch Size and Learning Rate

When training, memory must be allocated to store both the samples used for training and the model itself. We have to keep this in mind in order to avoid an out-of-memory error.

Remember that the batch_size is the number of samples that the model will see at each training step, and in general, we want this number to be as large as possible (a power of 2). We can find it by a trial-and-error approach, testing different values until an error related to the memory capacity appears:

python ResNet50.py --epochs 1 --batch_size 16
python ResNet50.py --epochs 1 --batch_size 32
python ResNet50.py --epochs 1 --batch_size 64

When using MirroredStrategy with multiple GPUs, the indicated batch size is divided among the replicas. Therefore, the batch_size that we should specify to TensorFlow is equal to the maximum value for one GPU multiplied by the number of GPUs we are using. That is, in our example, use these flags in the Python program:

python ResNet50.py --epochs 5 --batch_size 256  --n_gpus 1
python ResNet50.py --epochs 5 --batch_size 512  --n_gpus 2
python ResNet50.py --epochs 5 --batch_size 1024 --n_gpus 4

Accordingly, with the batch_size, if we are using MirroredStrategy with multiple GPUs, we change the learning_rate to learning_rate*num_GPUs:

learning_rate = learning_rate_base*number_of_gpus
opt = tf.keras.optimizers.SGD(learning_rate)

We do this update of the learning_rate because, as researchers point out, with a larger batch_size we can also take bigger steps in the direction of the minimum, preserving the number of epochs needed to converge.
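Putting the two scaling rules together (a sketch using the single-GPU values from this hands-on, batch_size 256 and a base learning rate of 0.01):

```python
# Linear scaling of the global batch size and learning rate with the
# number of replicas; each GPU still sees the single-GPU maximum batch.
base_batch, base_lr = 256, 0.01
for n_gpus in (1, 2, 4):
    batch_size = base_batch * n_gpus     # value passed via --batch_size
    learning_rate = base_lr * n_gpus     # value passed to SGD
    per_replica = batch_size // n_gpus   # what each GPU actually processes
    print(n_gpus, batch_size, learning_rate, per_replica)
# per_replica stays at 256 in all three cases
```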

5—Parallelization of the training step

In this section, we will show how we can parallelize the training step on the CTE-POWER cluster.

5.1 Parallel code for ResNet50 neural network

Following the steps presented in the above section on how to apply MirroredStrategy, below we present the resulting parallel code for the ResNet50:

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import numpy as np
import argparse
import time
import sys
from cifar import load_cifar

parser = argparse.ArgumentParser()
parser.add_argument('--epochs', type=int, default=5)
parser.add_argument('--batch_size', type=int, default=2048)
parser.add_argument('--n_gpus', type=int, default=1)
args = parser.parse_args()
batch_size = args.batch_size
epochs = args.epochs
n_gpus = args.n_gpus

train_ds, test_ds = load_cifar(batch_size)

device_type = 'GPU'
devices = tf.config.experimental.list_physical_devices(device_type)
devices_names = [d.name.split("e:")[1] for d in devices]

strategy = tf.distribute.MirroredStrategy(devices=devices_names[:n_gpus])
with strategy.scope():
     model = tf.keras.applications.resnet_v2.ResNet50V2(
             include_top=True, weights=None,
             input_shape=(128, 128, 3), classes=10)
     opt = tf.keras.optimizers.SGD(0.01*n_gpus)
     model.compile(loss='sparse_categorical_crossentropy',
                   optimizer=opt, metrics=['accuracy'])
model.fit(train_ds, epochs=epochs, verbose=2)

5.2 SLURM script 

The SLURM script that allocates resources and executes the model for different numbers of GPUs can be as the following one (ResNet50.sh):

#SBATCH --job-name="ResNet50"
#SBATCH --output=ResNet50_%j.output
#SBATCH --error=ResNet50_%j.err
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=160
#SBATCH --gres=gpu:4
#SBATCH --time=00:30:00
module purge; module load gcc/8.3.0 cuda/10.2 cudnn/7.6.4 nccl/2.4.8 tensorrt/6.0.1 openmpi/4.0.1 atlas/3.10.3 scalapack/2.0.2 fftw/3.3.8 szip/2.1.1 ffmpeg/4.2.1 opencv/4.1.1 python/3.7.4_ML
python ResNet50.py --epochs 5 --batch_size 256 --n_gpus 1
python ResNet50.py --epochs 5 --batch_size 512 --n_gpus 2
python ResNet50.py --epochs 5 --batch_size 1024 --n_gpus 4

If we use the same SLURM script for all three executions, pay attention to indicating the maximum number of GPUs required with --gres=gpu:4 .

Some important notes from support@bsc.es specifically for CTE-POWER:

If you want <=2 GPUs, you need to indicate "#SBATCH -c" <=80.

If you want more than 2 GPUs, then you need to ask for at least "#SBATCH -c 81"
(which is actually 120, because if you order 3 GPUs, there are 40 cores for each GPU).

I’ll break it down a bit more to make it clearer:

number of GPUs = 1 –> 40 <= number of cores <= 80

number of GPUs = 2 –> number of cores = 80

number of GPUs = 3 –> 120 <= number of cores <= 160

number of GPUs = 4 –> number of cores = 160

Once we run the script, in the file that stores the standard output we find the following execution times:

python ResNet50.py --epochs 5 --batch_size 256 --n_gpus 1 
Epoch 1/5 
196/196 - 49s - loss: 2.0408 - accuracy: 0.2506
Epoch 2/5
196/196 - 45s - loss: 1.7626 - accuracy: 0.3536
Epoch 3/5
196/196 - 45s - loss: 1.5863 - accuracy: 0.4164
Epoch 4/5
196/196 - 45s - loss: 1.4550 - accuracy: 0.4668
Epoch 5/5
196/196 - 45s - loss: 1.3539 - accuracy: 0.5070
python ResNet50.py --epochs 5 --batch_size 512 --n_gpus 2
Epoch 1/5
98/98 - 26s - loss: 2.0314 - accuracy: 0.2498
Epoch 2/5
98/98 - 24s - loss: 1.7187 - accuracy: 0.3641
Epoch 3/5
98/98 - 24s - loss: 1.5731 - accuracy: 0.4207
Epoch 4/5
98/98 - 24s - loss: 1.4543 - accuracy: 0.4686
Epoch 5/5
98/98 - 24s - loss: 1.3609 - accuracy: 0.5049
python ResNet50.py --epochs 5 --batch_size 1024 --n_gpus 4 
Epoch 1/5
49/49 - 14s - loss: 2.0557 - accuracy: 0.2409
Epoch 2/5
49/49 - 12s - loss: 1.7348 - accuracy: 0.3577
Epoch 3/5
49/49 - 12s - loss: 1.5696 - accuracy: 0.4180
Epoch 4/5
49/49 - 12s - loss: 1.4609 - accuracy: 0.4625
Epoch 5/5
49/49 - 12s - loss: 1.3689 - accuracy: 0.5010

It is important to note that we center our interest on the computational speed of the process rather than the model’s accuracy. For this reason, we will only execute a few epochs during the training; as we can see, training times per epoch are approximately constant, and with 5 epochs for each experiment we achieve the same accuracy in all three cases. That means that only 5 epochs are enough to compare the three options.

1 GPU: 45 seconds
2 GPU: 24 seconds
4 GPU: 12 seconds
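From these epoch times we can already compute the speedup and throughput metrics introduced in section 1.1 (CIFAR10 has 50,000 training images per epoch):

```python
# Epoch times measured above, and the derived performance metrics.
times = {1: 45, 2: 24, 4: 12}          # n_gpus -> seconds per epoch
for n_gpus, t in sorted(times.items()):
    speedup = times[1] / t             # relative to the 1-GPU run
    throughput = 50_000 / t            # images per second
    print(f"{n_gpus} GPUs: speedup {speedup:.2f}x, {throughput:.0f} images/s")
```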

It is obvious that if we want to do a deep/real performance analysis, we should carry out several tests and then take the average of the times obtained in each one. But given the academic purpose of this exercise (and the cost of the supercomputing resources offered by BSC, which we have to save!), a single execution is enough.


Task 4:

Determine the maximum batch_size that we can use for training, following the approach presented in section 4.2. Explain the steps done and include in your answer the relevant information from the .err file.

Task 5:

Execute the parallel ResNet50.py program for 1,2, and 4 GPUs in CTE-POWER cluster using your SLURM job script with the results obtained in the previous Task (  ResNet50.sh presented in GitHub can help to write your code). Inspect the .out and .err files.  Include in the answer the codes  ResNet50.py , ResNet50.sh  and the relevant part of the  .out and .err files that justify your results.

(*) Pay attention to the learning_rate requirements presented in section 4.2 of this hands-on.

6—Analysis of the results

It is time to analyze the results obtained in Task 5. We expect that on average the time required for one epoch should be similar to the values shown in the following plot (results of Task 5 done by the teacher):

We can translate this to images per second (since we know there are 50,000 images), which gives us the throughput:

Finally, as I said, the speedup is the most relevant metric:

It’s an almost linear speedup! Remember that we speak of linear speedup when the workload is equally divided among the GPUs and the speedup grows proportionally to their number.

As we can see in the results, the accuracy achieved by our model is more or less constant independently of the number of GPUs. In summary, using a distributed strategy, our model can learn the same but faster, thanks to parallelization!

Task 6:

Obtain the data from your .out file and generate the plots of epoch time, images/second, and speedup for your ResNet50V2 classifier. Include in the answer the plots (choose your preferred tool to generate them) and the relevant part of the .out and .err files that justify your results, if they were not included in the previous Task 5.

7 —Parallelization of the ResNet152V2 neural network

Now it’s your turn to get your hands really dirty and reproduce the above results for a new neural network, the ResNet152V2 classifier.


Task 7:

Create a new .py file to do the parallelization experiments with the new ResNet152V2 classifier. Include the .py file in the answer to this Task.

Remember that the first step is to find the best batch_size.

Task 8:

Determine the maximum batch_size that we can use for training this classifier (following the approach presented in section 4.2). Include in your answer the relevant information from the .err file.

Task 9:

Execute the parallel ResNet152V2.py program for 1, 2, and 4 GPUs on the CTE-POWER cluster using your job script ResNet152V2.sh, based on the ResNet50.sh used previously. Inspect the .out and .err files. Include in the answer the codes ResNet152V2.py and ResNet152V2.sh used, and the relevant part of the .out and .err files that demonstrate your results.

(*) Pay attention to the learning_rate requirements presented in section 4.2.

Task 10:

Generate the plots of epoch time, images/second, and speedup, and compare them with those presented in the previous section. Include in the answer the relevant part of the .out and .err files that justify your results (if not included in Task 9).

As a hint, if you plot the results of both case studies together, you should find results comparable to those shown below (my executions):

A couple of relevant things can be observed. The first is that the time to run a ResNet152 epoch is much longer, and therefore the throughput in images per second is much lower than for the ResNet50 network. Why is this happening? The ResNet152 network is deeper, meaning that it has more layers; therefore, the training time will be higher.

It can be seen that the speedup for ResNet152 is no longer linear; could you try to give a possible answer as to why this is happening? Obviously, it depends on many things, and a detailed analysis would be required; however, the larger size of the network adds additional latency for synchronization.

Hands-on Report

Task 11:

Write a report for this hands-on exercise that includes all the tasks detailing the steps that are done, the code used, and the results. Once finished, generate a PDF version and submit it to the inbox “exercise 14” at “racó” intranet.

Acknowledgment: Many thanks to Juan Luis Domínguez and Oriol Aranda, who wrote the first version of the codes that appear in this hands-on, and to Carlos Tripiana and Félix Ramos for the essential support using the CTE-POWER cluster. Also, many thanks to Alvaro Jover Alvarez, Miquel Escobar Castells, and Raul Garcia Fuentes for their contributions to the proofreading of previous versions of this post.