Practical Introduction to Programming Deep Learning on a Supercomputer


A hands-on on how to scale a Deep Learning application on BSC's CTE-POWER cluster, as an activity of the ACM Summer School

In the first part of this seminar, we explained that supercomputers are a key component of the progress of Artificial Intelligence, and that what has driven the growth in effective compute over the last years is the increasing parallelization and distribution of the algorithms.

This post will show, for those attending the second part of this practical seminar, how these supercomputers can be used; specifically, the BSC's CTE-POWER cluster, in which each server has two IBM Power9 CPUs and four NVIDIA V100 GPUs. At the end of this hands-on, the student will know how to allocate resources and train a neural network in parallel.

6 — BSC’s CTE-POWER Cluster

6.1 System Overview

CTE-POWER is a cluster based on IBM Power9 processors, with a Linux operating system and an InfiniBand interconnection network. CTE-POWER has 54 compute servers, each of them with:

  • 2 x IBM Power9 8335-GTG @ 3.00GHz (20 cores and 4 threads/core, for a total of 160 threads per node)
  • 512GB of main memory distributed in 16 DIMMs x 32GB @ 2666MHz
  • 2 x SSD 1.9TB as local storage
  • 2 x 3.2TB NVMe
  • 4 x GPU NVIDIA V100 (Volta) with 16GB HBM2
  • Single-Port Mellanox EDR
  • GPFS via one 10 GBit fiber link
  • Operating system: Red Hat Enterprise Linux Server 7.4

CTE-Power9 computer server (Image from bsc.es)

Supercomputer Marenostrum — POWER-CTE cluster at Barcelona Supercomputing Center (Image by author)

 

More details about its characteristics can be found in the CTE-POWER user's guide and in the manufacturer's information on the AC922 servers.

Allocating cluster resources for the execution of our code starts with an ssh login to one of the login nodes using your account:

ssh -X nct01xxx@plogin1.bsc.es

Task 1:

Once you have a login username and its associated password, you can get into the cluster.


6.2 Warm-up example: MNIST classification


Task 2:

Write your MNIST classifier program with the file name MNIST.py.


For convenience, we will consider the same neural network that we used to classify MNIST digits in the previous part, programmed in the Jupyter notebook:

Note: beware of COPY&PASTE for the following code lines! Some symbols may be "converted" by the HTML rendering into non-standard characters. If a command does not work properly, retype it.

import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt

print(tf.__version__)

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Flatten
from tensorflow.keras.utils import to_categorical

model = Sequential()
model.add(Conv2D(32, (5, 5), activation='relu',
          input_shape=(28, 28, 1)))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(64, (5, 5), activation='relu'))
model.add(MaxPooling2D((2, 2)))
model.add(Flatten())
model.add(Dense(10, activation='softmax'))
model.summary()

mnist = tf.keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnist.load_data(path='/gpfs/projects/nct00/nct00002/basics-utils/mnist.npz')

train_images = train_images.reshape((60000, 28, 28, 1))
train_images = train_images.astype('float32') / 255
test_images = test_images.reshape((10000, 28, 28, 1))
test_images = test_images.astype('float32') / 255
train_labels = to_categorical(train_labels)
test_labels = to_categorical(test_labels)

model.compile(loss='categorical_crossentropy',
              optimizer='sgd',
              metrics=['accuracy'])
model.fit(train_images, train_labels, batch_size=100,
          epochs=5, verbose=1)

test_loss, test_acc = model.evaluate(test_images, test_labels)
print('Test accuracy:', test_acc)

This will be the code MNIST.py (available on GitHub), which we will use as a first case study to show how to launch programs on the CTE-POWER supercomputer.

We will use the TensorFlow framework; however, the equivalent PyTorch code does not differ much. We will use the Keras API because, since the release of TensorFlow 2.0, the tf.keras.Model API has become the primary way of building neural networks, particularly those not requiring custom training loops.

6.3 Software stack required for Deep Learning

It is important to remember that we need to load all the modules that build our application's software stack environment. On the CTE-POWER supercomputer, this is done with the module load command before running the corresponding .py code.

In our case study, we need the following modules that include the required libraries:

module load gcc/8.3.0 cuda/10.2 cudnn/7.6.4 nccl/2.4.8 tensorrt/6.0.1 openmpi/4.0.1 atlas/3.10.3 scalapack/2.0.2 fftw/3.3.8 szip/2.1.1 ffmpeg/4.2.1 opencv/4.1.1 python/3.7.4_ML
python MNIST.py

If we want to separate the standard output from the standard error messages, we can redirect the latter with 2>err.txt:

python MNIST.py 2>err.txt

Redirecting the standard error allows us to see the training results that Keras writes to the standard output, without the information related to the execution environment:

Epoch 1/5
600/600 [======] - 2s 3ms/step - loss: 0.9553 - accuracy: 0.7612
Epoch 2/5
600/600 [======] - 1s 2ms/step - loss: 0.2631 - accuracy: 0.9235
Epoch 3/5
600/600 [======] - 2s 3ms/step - loss: 0.1904 - accuracy: 0.9446
Epoch 4/5
600/600 [======] - 2s 3ms/step - loss: 0.1528 - accuracy: 0.9555
Epoch 5/5
600/600 [======] - 2s 3ms/step - loss: 0.1288 - accuracy: 0.9629
313/313 [======] - 1s 2ms/step - loss: 0.1096 - accuracy: 0.9671
Test accuracy: 0.9671000242233276

 


Task 3:

Launch your MNIST.py sequential program on the CTE-POWER supercomputer (separating the standard output and the standard error messages).


 

So far, our code has been executed on a login node, which is shared with other users who are trying to submit jobs to the SLURM system, but what we really need is to allocate resources for our code. How can we do it?

6.4 How to allocate computing resources with SLURM

To run code on CTE-POWER, we use the SLURM workload manager. An excellent Quick Start User Guide can be found here. We have two ways to use it: the sbatch and salloc commands.

The method for submitting jobs on which we will center today's hands-on is using the SLURM sbatch command directly. sbatch submits a batch script to SLURM. The batch script may be given to sbatch through a file name on the command line (a .sh file). The batch script may contain options preceded by #SBATCH before any executable commands in the script. sbatch will stop processing further #SBATCH directives once the first non-comment, non-whitespace line has been reached in the script.

sbatch exits immediately after the script is successfully transferred to the SLURM controller and assigned a SLURM job ID. The batch script is not necessarily granted resources immediately; it may sit in the queue of pending jobs for some time before its required resources become available.

By default, both standard output and standard error are directed to the files indicated by:

#SBATCH --output=MNIST_%j.out
#SBATCH --error=MNIST_%j.err

where “%j” is replaced with the job allocation number. The files will be generated on the first node of the job allocation. When the job allocation is finally granted for the batch script, SLURM runs a single copy of the batch script on the first node of the set of allocated nodes.

An example of a job script that allocates a node with 1 GPU for our case study looks like this (MNIST.sh):

#!/bin/bash
#SBATCH --job-name="MNIST"
#SBATCH -D .
#SBATCH --output=MNIST_%j.out
#SBATCH --error=MNIST_%j.err
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=40
#SBATCH --gres=gpu:1
#SBATCH --time=00:10:00
module load gcc/8.3.0 cuda/10.2 cudnn/7.6.4 nccl/2.4.8 tensorrt/6.0.1 openmpi/4.0.1 atlas/3.10.3 scalapack/2.0.2 fftw/3.3.8 szip/2.1.1 ffmpeg/4.2.1 opencv/4.1.1 python/3.7.4_ML
python MNIST.py

You can consult the official documentation page to learn about all the options that can be used in the batch script, preceded by #SBATCH.

These are the basic directives to submit and monitor jobs with SLURM that we will use in our case study:

  • sbatch <job_script> submits a job script to the queue system.
  • squeue shows all the submitted jobs with their <job_id>.
  • scancel <job_id> removes the job from the queue system, cancelling the execution of the processes if they were still running.

An alternative way to run a job in SLURM is the salloc command, which is used to allocate resources for a job in real time. Typically, this is used to allocate resources and spawn a shell; the shell is then used to execute srun commands to launch parallel tasks. More details on its use can be found here, in another post of mine.

In summary, this can be an example of a sequence of command lines, and the expected output of their execution will be:

[CTE-login-node ~]$ sbatch MNIST.sh
Submitted batch job 4910352
[CTE-login-node ~]$ squeue
JOBID    PARTITION  NAME    USER    ST TIME  NODES  NODELIST
4910352  main       MNIST   userid  R  0:01  1      p9r1n16
[CTE-login-node ~]$ ls
MNIST.py
MNIST.sh
MNIST_4910352.err
MNIST_4910352.out

The standard output and standard error are directed to the files MNIST_4910352.out and MNIST_4910352.err, respectively, where the number 4910352 is the job id assigned to the job by SLURM.

6.5 Resource reservation for ACM Summer School 2021

BSC has made a special reservation of supercomputer nodes to be used for this course. To use it, you must add this line to the SLURM script:

#SBATCH --reservation=SMMRSCHL

Task 4:

Execute your MNIST.py program with the SLURM workload manager using a job script that allocates a node with 1 GPU on CTE-POWER. Inspect the .out and .err files obtained.


 

7 — Basic concepts in distributed and parallel training Deep Learning

“Methods that scale with computation are the future of Artificial Intelligence”— Rich Sutton, father of reinforcement learning (video 4:49)

Deep Neural Networks (DNN) base their success on building high learning capacity models with millions of parameters that are tuned in a data-driven fashion. These models are trained by processing millions of examples so that the development of more accurate algorithms is usually limited by the throughput of the computing devices on which they are trained.

7.1 Performance metrics: Speedup, Throughput, and Scalability

In order to make the training process faster, we need some performance metrics to measure it. The term performance in these systems has a double interpretation: on the one hand, it refers to the predictive accuracy of the model; on the other, to the computational speed of the process.

Accuracy is independent of the platform and is the performance metric used to compare different models. In contrast, the computation speed depends on the platform on which the model is deployed. We will measure it with metrics such as Speedup, the ratio of the solution time of the sequential (or 1-GPU) algorithm to that of its parallel counterpart (with many GPUs). This is a prevalent concept in the daily jargon of the Supercomputing community.

Another important metric is Throughput. In general terms, throughput is the rate at which something is processed; for example, the number of images processed per unit of time. This can give us a good performance benchmark (although it depends on the network type).

Finally, a concept that we usually use is Scalability, a more generic concept that refers to the ability of a system to handle a growing amount of work efficiently. These metrics are highly dependent on the cluster configuration, the type of network used, and the framework's efficiency in using the libraries and managing resources. We will see an example of their use later.
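To make these definitions concrete, here is a minimal sketch (plain Python, just for reference) of how these two metrics can be computed from measured epoch times; the function names are ours, not part of any course script:

def speedup(t_baseline, t_parallel):
    # Ratio of the sequential (or 1-GPU) time to the parallel time
    return t_baseline / t_parallel

def throughput(n_images, t):
    # Images processed per unit of time (images/second)
    return n_images / t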

7.2 Parallel computer platforms

The parallel and distributed training approach is broadly used by Deep Learning practitioners. The main idea behind this computing paradigm is to run tasks in parallel instead of serially, as it would happen in a single machine.

DNNs are often compute-intensive, making them similar to traditional supercomputing (high-performance computing, HPC) applications. Thus, large learning workloads perform very well on accelerated systems such as general-purpose graphics processing units (GPUs), which have long been used in the Supercomputing field.

Multiple GPUs increase both the memory and the compute available for training a DNN. In a nutshell, given a minibatch of training data that we want to classify, we have several choices on how to distribute the work. In the next subsection, we will go into more detail about this.

7.3 Types of parallelism

To distribute the training step, there are two principal implementations, and which one performs better depends on the needs of the application; a mix of both approaches can even increase the performance.

For example, different layers in a Deep Learning model may be trained in parallel on different GPUs. This training procedure is commonly known as Model parallelism. Another approach is Data parallelism, where we use the same model for every execution unit, but train the model in each computing device using different training samples.

 

7.3.1 Data parallelism

In this mode, the training data is divided into multiple subsets, and each of them runs on the same replicated model on a different GPU (worker node). Each device independently computes the errors between its predictions for its training samples and the labeled outputs (the correct values for those training samples). Therefore, the replicas need to synchronize the model parameters (or their gradients) at the end of each batch computation to ensure they are training a consistent model, just as if the algorithm ran on a single processor; each device must share all of its changes with the models on all the other devices.

One interesting property of this setting is that it scales with the amount of data available and speeds up the rate at which the entire dataset contributes to the optimization. It also requires less communication between nodes, as it benefits from a high amount of computation per weight. On the other hand, the model has to fit entirely on each node, and it is mainly used for speeding up the computation of convolutional neural networks with large datasets.
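To make the idea more concrete, here is a toy NumPy sketch of one synchronous data-parallel update for a simple linear model; it only simulates the scheme on a single CPU, and the model and variable names are illustrative, not part of the course code:

import numpy as np

rng = np.random.default_rng(0)
n_devices, global_batch, n_features = 4, 64, 10
X = rng.normal(size=(global_batch, n_features))   # global minibatch
y = rng.normal(size=(global_batch, 1))            # labeled outputs
w = np.zeros((n_features, 1))                     # replicated model parameters

X_shards = np.split(X, n_devices)   # each "GPU" receives a subset of the batch
y_shards = np.split(y, n_devices)

local_grads = []
for Xs, ys in zip(X_shards, y_shards):
    err = Xs @ w - ys                             # local predictions vs labels
    local_grads.append(2 * Xs.T @ err / len(Xs))  # local gradient (MSE loss)

grad = np.mean(local_grads, axis=0)   # "all-reduce": average the local gradients
w -= 0.01 * grad                      # identical update applied on every replica

Averaging the per-device gradients gives exactly the gradient of the full minibatch, which is why the result is the same as if the algorithm ran on a single processor.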

7.3.2 Model parallelism

We could partition the network layers across multiple GPUs (we could even split the work required by individual layers). That is, each GPU takes as input the data flowing into a particular layer, processes the data across several subsequent layers of the neural network, and then sends the output to the next GPU.

In this case (also known as Network Parallelism), the model is segmented into different parts that can run concurrently, each running on the same data on different nodes. This method's scalability depends on the degree of task parallelization of the algorithm, and it is more complex to implement than the previous one. It may decrease the communication needs, as workers only need to synchronize the shared parameters (usually once for each forward or backward propagation step), and it works well for GPUs in a single server that share a high-speed bus.

In practice, model parallelism is only used with models that are too large to fit in any single device. In that case, the parallelization of the algorithm is more complex to implement than running the same model on different nodes with different subsets of data.
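As a purely illustrative sketch (not part of the course code), explicit device placement in TensorFlow can express this idea; it assumes at least two GPUs are visible:

import tensorflow as tf

# First "half" of the computation lives on GPU 0, second "half" on GPU 1;
# the intermediate activations are transferred between the two devices.
with tf.device('/gpu:0'):
    w1 = tf.Variable(tf.random.normal([784, 256]))
with tf.device('/gpu:1'):
    w2 = tf.Variable(tf.random.normal([256, 10]))

@tf.function
def forward(x):
    with tf.device('/gpu:0'):
        h = tf.nn.relu(tf.matmul(x, w1))   # runs on GPU 0
    with tf.device('/gpu:1'):
        return tf.matmul(h, w2)            # runs on GPU 1

logits = forward(tf.random.normal([32, 784]))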

In this hands-on, we will focus on the Data Parallelism approach and we won’t cover model parallelism.

8 — Case study: Image classification problem using a neural network model

In this section, we will explain how to scale a “real” neural network as a classifier (ResNet50V2 with 25,613,800 parameters). As a dataset, we will use the popular CIFAR10 dataset. We will start training this classifier sequentially and later we will parallelize it over many GPUs.

8.1 Dataset: CIFAR10

CIFAR-10 is an established computer-vision dataset used for object recognition. It is a subset of the 80 million tiny images dataset and consists of 60,000 32×32 color images containing 10 object classes, with 6000 images per class. It was collected by Alex Krizhevsky, Vinod Nair, and Geoffrey Hinton. There are 50,000 training images and 10,000 test images (Learning Multiple Layers of Features from Tiny Images, Alex Krizhevsky, 2009).

 

We have preloaded the CIFAR-10 dataset on the CTE-POWER supercomputer in the directory /gpfs/projects/nct00/nct00002/cifar-utils/cifar-10-batches-py, downloaded from http://www.cs.toronto.edu/~kriz/cifar.html.

For academic purposes, to make the training harder and to obtain longer training times for better comparison, we have applied a resize operation to make the images 128×128 in size. We created a custom load_cifar function (/gpfs/projects/nct00/nct00002/cifar-utils/load_cifar.py) that applies this resize operation and splits the data into training and test sets. We can use it as:

sys.path.append('/gpfs/projects/nct00/nct00002/cifar-utils')
from cifar import load_cifar

load_cifar.py can be obtained from the course GitHub repository for readers who want to review it (the students of this course do not need to download it).
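For reference, this is how the loader is invoked in the training scripts shown later in this hands-on (here with a batch size of 256; load_cifar presumably returns already-batched training and test sets):

import sys
sys.path.append('/gpfs/projects/nct00/nct00002/cifar-utils')
from cifar import load_cifar

train_ds, test_ds = load_cifar(256)   # resized 128x128 train/test sets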

8.2 Neural Networks architecture: ResNet

Now we are going to use a neural network with a specific architecture known as ResNet. In the scientific community, we find many networks with their own names. For instance, AlexNet, by Alex Krizhevsky, is the neural network architecture that won the ImageNet 2012 competition; GoogLeNet, with its inception module, drastically reduced the number of parameters of the network (15 times fewer than AlexNet); and others, such as VGGNet, helped demonstrate that the depth of the network is a critical component for good results. The interesting thing about many of these networks is that we can find them already preloaded in most Deep Learning frameworks.

Keras Applications are prebuilt deep learning models made available with the framework. These models differ in architecture and number of parameters; you can try some of them to see how larger models train more slowly than smaller ones and achieve different accuracy.

A list of all available models can be found here (the top-1 and top-5 accuracy refers to the model's performance on the ImageNet validation dataset). For this hands-on, we will consider one architecture from the ResNet family as a case study: ResNet50V2. ResNet is a family of extremely deep neural network architectures showing compelling accuracy and nice convergence behaviors, introduced by He et al. in their 2015 paper, Deep Residual Learning for Image Recognition. A few months later, the same authors published a new paper, Identity Mappings in Deep Residual Networks, with a new proposal for the basic component, the residual unit, which makes training easier and improves generalization. This led to the V2 versions:

tf.keras.applications.ResNet50V2(
    include_top=True,
    weights="imagenet",
    input_tensor=None,
    input_shape=None,
    pooling=None,
    classes=1000,
    classifier_activation="softmax",
)

The “50” stands for the number of weight layers in the network. The arguments of the network are:

  • include_top: whether to include the fully-connected layer at the top of the network.
  • weights: one of None (random initialization), ‘imagenet’ (pre-training on ImageNet), or the path to the weights file to be loaded.
  • input_tensor: optional Keras tensor (i.e. output of layers.Input()) to use as image input for the model.
  • input_shape: optional shape tuple, only to be specified if include_top is False (otherwise the input shape has to be (224, 224, 3) with 'channels_last' data format or (3, 224, 224) with 'channels_first' data format). It should have exactly 3 input channels, and width and height should be no smaller than 32. E.g. (200, 200, 3) would be one valid value.
  • pooling: Optional pooling mode for feature extraction when include_top is False. (a) None means that the output of the model will be the 4D tensor output of the last convolutional block. (b) avg means that global average pooling will be applied to the output of the last convolutional block, and thus the output of the model will be a 2D tensor. (c) max means that global max pooling will be applied.
  • classes: optional number of classes to classify images into, only to be specified if include_top is True, and if no weights argument is specified.
  • classifier_activation: A str or callable. The activation function to use on the “top” layer. Ignored unless include_top=True. Set classifier_activation=None to return the logits of the “top” layer.

Note that if weights="imagenet", the TensorFlow middleware requires an internet connection to download the ImageNet weights (pre-training on ImageNet). Since we are not focusing on accuracy here, we did not download the file with the ImageNet weights; therefore, weights=None must be used.
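For our case study, this means instantiating the network without pre-trained weights, adapted to the resized CIFAR10 images; the same call appears in the training scripts below:

model = tf.keras.applications.ResNet50V2(
    include_top=True,
    weights=None,              # random initialization: no internet access needed
    input_shape=(128, 128, 3), # resized CIFAR10 images
    classes=10)                # CIFAR10 has 10 classes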

8.3 Frameworks for parallel and distributed training

We can use frameworks like TensorFlow or PyTorch to program multi-GPU training on one server. To parallelize the training of the model, you only need to wrap the model with torch.nn.parallel.DistributedDataParallel if you use PyTorch, or with tf.distribute.MirroredStrategy if you use TensorFlow.

However, the number of GPUs that we can place in a single server is very limited, and the solution is to put many of these servers together, as we did at the BSC with the CTE-POWER supercomputer, where 54 servers are linked together by an InfiniBand network over optical fiber.

In this new scenario, we need an extension of the software stack to deal with multiple distributed GPUs in the neural network training process. There are several options, such as Horovod and Ray, both of which plug into TensorFlow or PyTorch. However, multi-server distribution is not included in this hands-on due to time constraints. If the reader is interested in using Horovod, they can follow this hands-on about Horovod.

In summary, in section 9 of this hands-on, we will present in detail how to scale the training of a DL neural network on multiple GPUs in one server using the TensorFlow tf.distribute.MirroredStrategy() API.

8.4 Sequential version of ResNet

Before showing how to train a neural network in parallel, let’s start with a sequential version of the training in order to get familiarized with the classifier.

The sequential code to train the previously described CIFAR10 classification problem using a ResNet50 neural network could be the following (we will refer to it as ResNet50_seq.py):

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import numpy as np
import argparse
import time
import sys

sys.path.append('/gpfs/projects/nct00/nct00002/cifar-utils')
from cifar import load_cifar

parser = argparse.ArgumentParser()
parser.add_argument('--epochs', type=int, default=5)
parser.add_argument('--batch_size', type=int, default=2048)
args = parser.parse_args()
batch_size = args.batch_size
epochs = args.epochs

train_ds, test_ds = load_cifar(batch_size)

model = tf.keras.applications.resnet_v2.ResNet50V2(
        include_top=True,
        weights=None,
        input_shape=(128, 128, 3),
        classes=10)

opt = tf.keras.optimizers.SGD(0.01)
model.compile(loss='sparse_categorical_crossentropy',
              optimizer=opt,
              metrics=['accuracy'])
model.fit(train_ds, epochs=epochs, verbose=2)

The ResNet50_seq.py file can be downloaded from the course GitHub repository.

Running it through the SLURM system can be done with the following job script (we will refer to it as ResNet50_seq.sh):

#!/bin/bash
#SBATCH --job-name="ResNet50_seq"
#SBATCH -D .
#SBATCH --output=RESNET50_seq_%j.out
#SBATCH --error=RESNET50_seq_%j.err
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=160
#SBATCH --time=00:15:00
module load gcc/8.3.0 cuda/10.2 cudnn/7.6.4 nccl/2.4.8 tensorrt/6.0.1 openmpi/4.0.1 atlas/3.10.3 scalapack/2.0.2 fftw/3.3.8 szip/2.1.1 ffmpeg/4.2.1 opencv/4.1.1 python/3.7.4_ML
python ResNet50_seq.py --epochs 20 --batch_size 256

ResNet50_seq.sh can be downloaded from the course GitHub repository.


Task 5:

Execute your ResNet50_seq.py program on CTE-POWER with the SLURM workload manager using the job script ResNet50_seq.sh presented in this section. Check the .out file.


If we check the .out file, we can see the training output that Keras gives us:

Epoch 1/20
196/196 - 41s - loss: 2.0176 - accuracy: 0.2584
Epoch 2/20
196/196 - 41s - loss: 1.7346 - accuracy: 0.3648
Epoch 3/20
196/196 - 41s - loss: 1.5624 - accuracy: 0.4271
Epoch 4/20
196/196 - 41s - loss: 1.4427 - accuracy: 0.4715
Epoch 5/20
196/196 - 41s - loss: 1.3523 - accuracy: 0.5090
Epoch 6/20
196/196 - 41s - loss: 1.2699 - accuracy: 0.5417
Epoch 7/20
196/196 - 41s - loss: 1.1894 - accuracy: 0.5719
Epoch 8/20
196/196 - 41s - loss: 1.1048 - accuracy: 0.6076
Epoch 9/20
196/196 - 41s - loss: 1.0136 - accuracy: 0.6439
Epoch 10/20
196/196 - 41s - loss: 0.9174 - accuracy: 0.6848
Epoch 11/20
196/196 - 41s - loss: 0.8117 - accuracy: 0.7256
Epoch 12/20
196/196 - 41s - loss: 0.6989 - accuracy: 0.7705
Epoch 13/20
196/196 - 41s - loss: 0.5858 - accuracy: 0.8117
Epoch 14/20
196/196 - 41s - loss: 0.4870 - accuracy: 0.8482
Epoch 15/20
196/196 - 41s - loss: 0.4003 - accuracy: 0.8749
Epoch 16/20
196/196 - 41s - loss: 0.3194 - accuracy: 0.9040
Epoch 17/20
196/196 - 41s - loss: 0.2620 - accuracy: 0.9227
Epoch 18/20
196/196 - 41s - loss: 0.2008 - accuracy: 0.9421
Epoch 19/20
196/196 - 41s - loss: 0.1441 - accuracy: 0.9615
Epoch 20/20
196/196 - 41s - loss: 0.0742 - accuracy: 0.9859

For the purpose of this post, to measure the time (which will be the metric that we will use to compare performance), we can use the time per epoch that Keras itself reports (sometimes we discard the first epoch, as it is different from the rest, since it has to create structures in memory and initialize them). Remember that this is a teaching example, and this approximate measure of time is enough for the course goals.

For any other network available in Keras, simply change the piece of code that identifies the network (resnet_v2.ResNet50V2 in our example) to the corresponding network.

9 — Parallel training with TensorFlow

tf.distribute.Strategy is a TensorFlow API to distribute training across multiple GPUs or TPUs with minimal code changes (from the sequential version presented in the previous section). This API can be used with a high-level API like Keras, and can also be used to distribute custom training loops.

tf.distribute.Strategy intends to cover a number of distribution use cases along different axes. The official web page of this feature presents all the currently supported combinations; however, in this post we will focus our attention on tf.distribute.MirroredStrategy.

9.1 “MirroredStrategy” strategy

In this hands-on, we will use tf.distribute.MirroredStrategy, which supports synchronous distributed training on multiple GPUs (multiple devices) on one server (single host). It creates one replica per GPU device. Each variable in the model is mirrored across all the replicas. These variables are kept in sync with each other by applying identical updates.

With this strategy, efficient all-reduce algorithms are used to communicate the variable updates across the GPUs. By default, it uses NVIDIA NCCL as the all-reduce implementation. More details can be found in this post.

Here is the simplest way of creating a MirroredStrategy:

mirrored_strategy = tf.distribute.MirroredStrategy()

This will create a MirroredStrategy instance that will use all the GPUs visible to TensorFlow. It is possible to see the list of available GPU devices as follows:

devices = tf.config.experimental.list_physical_devices("GPU")

It is also possible to use a subset of the available GPUs in the system by doing the following:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

To build and compile the model inside the MirroredStrategy scope, you can do it in the following way:

with mirrored_strategy.scope():
     model = tf.keras.applications.resnet_v2.ResNet50V2(
             include_top=True, weights=None,
             input_shape=(128, 128, 3), classes=10)

     opt = tf.keras.optimizers.SGD(learning_rate)

     model.compile(loss='sparse_categorical_crossentropy',
                   optimizer=opt, metrics=['accuracy'])

This allows us to create distributed variables instead of regular variables: each variable is mirrored across all the replicas and kept in sync by applying identical updates. It is important, during the coding phase, that the creation of variables happens under the strategy scope. In general, this is only during the model construction step and the compile step. Training can then be done as usual outside the strategy scope with:

dataset = load_data(batch_size)
model.fit(dataset, epochs=5, verbose=2)

 

Synchronous distributed training, supported by tf.distribute.MirroredStrategy, means that the state of the per-GPU model replicas stays the same at all times. When the code opens a MirroredStrategy scope and builds a model within it, the MirroredStrategy object creates one model copy (replica) on each available GPU.

9.2 Parallel performance measurement

In this post, we will consider the epoch time as the measure of the computation time for training a DNN. This approximate measure in seconds, provided by Keras during .fit, is enough for the purpose of this academic post. In our case, as already introduced, we will discard the first epoch as it includes creating and initializing structures. Obviously, for certain types of performance studies it is necessary to go into more detail, differentiating the data loading time, feed-forward time, loss function time, back-propagation time, etc., but that falls outside the scope of the case study we propose in this hands-on.
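For readers who prefer something slightly more explicit than reading the Keras progress output, a small optional callback can record the wall-clock time of each epoch; this is just a sketch we suggest here, not part of the course scripts:

import time
import tensorflow as tf

class EpochTimer(tf.keras.callbacks.Callback):
    # Records the wall-clock duration of every training epoch
    def on_train_begin(self, logs=None):
        self.epoch_times = []
    def on_epoch_begin(self, epoch, logs=None):
        self._start = time.time()
    def on_epoch_end(self, epoch, logs=None):
        self.epoch_times.append(time.time() - self._start)

# Usage: timer = EpochTimer()
#        model.fit(train_ds, epochs=5, verbose=2, callbacks=[timer])
#        print(timer.epoch_times)   # discard the first epoch, as discussed above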

9.3 Choose the Batch Size and Learning Rate

When training, memory must be allocated to store the training samples and the model itself. We have to keep this in mind in order to avoid an out-of-memory error.

Remember that the batch_size is the number of samples that the model sees at each training step, and in general, we want this number to be as large as possible (a power of 2). We can find it with a trial-and-error approach, testing different values until an error related to memory capacity appears:

python ResNet50.py --epochs 1 --batch_size 16
python ResNet50.py --epochs 1 --batch_size 32
python ResNet50.py --epochs 1 --batch_size 64
.
.
.

When using MirroredStrategy with multiple GPUs, the indicated batch size is divided by the number of replicas. Therefore, the batch_size that we should specify to TensorFlow is equal to the maximum value for one GPU multiplied by the number of GPUs we are using. That is, in our example, we use these flags in the Python program:

python ResNet50.py --epochs 5 --batch_size 256  --n_gpus 1
python ResNet50.py --epochs 5 --batch_size 512  --n_gpus 2
python ResNet50.py --epochs 5 --batch_size 1024 --n_gpus 4

Accordingly, with this batch_size, if we are using MirroredStrategy with multiple GPUs, we want the learning_rate to be:

learning_rate = learning_rate_base*number_of_gpus
opt = tf.keras.optimizers.SGD(learning_rate)

Because of the larger batch_size, we also want to take bigger steps in the direction of the minimum to preserve the number of epochs needed to converge.

9.4 Parallelization of the training step

We will use the same sequential example presented above (classifying the CIFAR10 dataset using the ResNet50 neural network) to show how we can parallelize the training step on the CTE-POWER cluster.

9.4.1 Parallel code for ResNet50 neural network

Following the steps presented in the previous section on how to apply MirroredStrategy, below we present the resulting parallel code for ResNet50:

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import numpy as np
import argparse
import time
import sys

sys.path.append('/gpfs/projects/nct00/nct00002/cifar-utils')
from cifar import load_cifar

parser = argparse.ArgumentParser()
parser.add_argument('--epochs', type=int, default=5)
parser.add_argument('--batch_size', type=int, default=2048)
parser.add_argument('--n_gpus', type=int, default=1)
args = parser.parse_args()
batch_size = args.batch_size
epochs = args.epochs
n_gpus = args.n_gpus

train_ds, test_ds = load_cifar(batch_size)

device_type = 'GPU'
devices = tf.config.experimental.list_physical_devices(
          device_type)
devices_names = [d.name.split('e:')[1] for d in devices]

strategy = tf.distribute.MirroredStrategy(
           devices=devices_names[:n_gpus])

with strategy.scope():
     model = tf.keras.applications.resnet_v2.ResNet50V2(
             include_top=True, weights=None,
             input_shape=(128, 128, 3), classes=10)
     opt = tf.keras.optimizers.SGD(0.01*n_gpus)
     model.compile(loss='sparse_categorical_crossentropy',
                   optimizer=opt, metrics=['accuracy'])

model.fit(train_ds, epochs=epochs, verbose=2)

9.4.2 SLURM script

The SLURM script that allocates the resources and executes the model for the different numbers of GPUs can be like the following one (ResNet50.sh):

#!/bin/bash
#SBATCH --job-name="ResNet50"
#SBATCH -D .
#SBATCH --output=ResNet50_%j.out
#SBATCH --error=ResNet50_%j.err
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=160
#SBATCH --gres=gpu:4
#SBATCH --time=00:30:00
module purge; module load gcc/8.3.0 cuda/10.2 cudnn/7.6.4 nccl/2.4.8 tensorrt/6.0.1 openmpi/4.0.1 atlas/3.10.3 scalapack/2.0.2 fftw/3.3.8 szip/2.1.1 ffmpeg/4.2.1 opencv/4.1.1 python/3.7.4_ML
python ResNet50.py --epochs 5 --batch_size 256 --n_gpus 1
python ResNet50.py --epochs 5 --batch_size 512 --n_gpus 2
python ResNet50.py --epochs 5 --batch_size 1024 --n_gpus 4

Because we use the same SLURM script for all three executions, pay attention to requesting the maximum number of GPUs needed with --gres=gpu:4.


Task 6:

Execute the parallel ResNet50.py program for 1, 2, and 4 GPUs on CTE-POWER using the job script ResNet50.sh presented in this section. Inspect the .out file.


Once we run the script, in the file that stores the standard output we find the following execution times:

python ResNet50.py --epochs 5 --batch_size 256 --n_gpus 1 
Epoch 1/5 
196/196 - 49s - loss: 2.0408 - accuracy: 0.2506
Epoch 2/5
196/196 - 45s - loss: 1.7626 - accuracy: 0.3536
Epoch 3/5
196/196 - 45s - loss: 1.5863 - accuracy: 0.4164
Epoch 4/5
196/196 - 45s - loss: 1.4550 - accuracy: 0.4668
Epoch 5/5
196/196 - 45s - loss: 1.3539 - accuracy: 0.5070
python ResNet50.py --epochs 5 --batch_size 512 --n_gpus 2
Epoch 1/5
98/98 - 26s - loss: 2.0314 - accuracy: 0.2498
Epoch 2/5
98/98 - 24s - loss: 1.7187 - accuracy: 0.3641
Epoch 3/5
98/98 - 24s - loss: 1.5731 - accuracy: 0.4207
Epoch 4/5
98/98 - 24s - loss: 1.4543 - accuracy: 0.4686
Epoch 5/5
98/98 - 24s - loss: 1.3609 - accuracy: 0.5049
python ResNet50.py --epochs 5 --batch_size 1024 --n_gpus 4 
Epoch 1/5
49/49 - 14s - loss: 2.0557 - accuracy: 0.2409
Epoch 2/5
49/49 - 12s - loss: 1.7348 - accuracy: 0.3577
Epoch 3/5
49/49 - 12s - loss: 1.5696 - accuracy: 0.4180
Epoch 4/5
49/49 - 12s - loss: 1.4609 - accuracy: 0.4625
Epoch 5/5
49/49 - 12s - loss: 1.3689 - accuracy: 0.5010

It is important to note that we center our interest on the computational speed of the process rather than the model's accuracy. For this reason, we execute only a few epochs during training because, as we can see, training times per epoch are approximately constant, and with 5 epochs for each experiment we achieve roughly the same accuracy in all three cases. That means 5 epochs are enough to compare the three options. The resulting times per epoch are:

1 GPU: 45 seconds
2 GPU: 24 seconds
4 GPU: 12 seconds

Obviously, if we wanted to do a deep, real performance analysis, we should carry out several tests and then take the average of the times obtained in each of them. But given the academic purpose of this post (and the cost of the supercomputing resources, which we have to save!), a single execution is enough.

9.5 Analysis of the results

In summary, the time required for doing one epoch is as shown in the following plot:

We can translate this to images per second (since we know there are 50,000 images), which gives us the throughput:

Finally, as I said, the speedup is the most relevant metric:

It is an almost linear speedup. We speak of linear speedup when training with n GPUs is n times faster than with one, that is, when the workload is divided equally among the GPUs.
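The numbers behind these plots can be reproduced with a few lines of Python from the epoch times measured above and the 50,000 training images of CIFAR10:

epoch_time = {1: 45, 2: 24, 4: 12}   # seconds per epoch, measured above
n_images = 50000                     # CIFAR10 training images

for n_gpus, t in epoch_time.items():
    print(f"{n_gpus} GPU(s): {n_images / t:6.0f} images/s, "
          f"speedup = {epoch_time[1] / t:.2f}x")

With 2 GPUs the speedup is about 1.9x and with 4 GPUs about 3.8x, hence "almost linear".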

10 — Bonus: Parallelization of the ResNet152V2 neural network


Task 7:

If you want to learn more and consolidate the knowledge acquired, now it's your turn to get your hands dirty and reproduce the above results for ResNet152V2, and then compare them with those presented in the previous section. Are you ready? Come on! You can generate the plots of epoch time, images/second, and Speedup.
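As a hint, the only change needed in the parallel ResNet50.py is the line that instantiates the network; everything else (arguments, MirroredStrategy scope, SLURM script) stays the same. A sketch of the replacement, assuming the same input shape and number of classes:

with strategy.scope():
     model = tf.keras.applications.resnet_v2.ResNet152V2(
             include_top=True, weights=None,
             input_shape=(128, 128, 3), classes=10)
     opt = tf.keras.optimizers.SGD(0.01*n_gpus)
     model.compile(loss='sparse_categorical_crossentropy',
                   optimizer=opt, metrics=['accuracy'])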


Remember that the first step is to find the best batch_size. If you plot the results of both case studies together, you should find results equivalent to those shown below:

A couple of relevant things can be observed. The first is that the time to run a ResNet152 epoch is much longer, and therefore the throughput in images per second is much lower than for the ResNet50 network. Why is this happening? The ResNet152 network is deeper, meaning that it has more layers; therefore, the training time will be higher.

It can also be seen that the speedup for ResNet152 is no longer linear; could you try to give a possible answer as to why this is happening? Obviously, it depends on many factors and a detailed analysis would be required; however, the larger size of the network, with more parameters to communicate between GPUs, adds extra synchronization latency.

Conclusions

In this hands-on, we have presented how to parallelize the training of a single Deep Neural Network over many GPUs in one server using the tf.distribute.MirroredStrategy() TensorFlow API.

As we can see in the results, the accuracy achieved by our model is more or less constant independently of the number of GPUs. In summary, using a distributed strategy, our model can learn the same but faster, thanks to parallelization!

Thanks to parallelization, Deep Learning can learn the same but faster!


Acknowledgment: Many thanks to Juan Luis Domínguez and Oriol Aranda from BSC-CNS, who wrote the first version of the codes that appear in this hands-on, and to Carlos Tripiana and Félix Ramos for the essential support using the POWER-CTE cluster. Also, many thanks to Alvaro Jover Alvarez, Miquel Escobar Castells, and Raul Garcia Fuentes for their contributions to the proofreading of this document. The code used in this post is based on the GitHub https://github.com/jorditorresBCN/PATC-2021