R parallelization with mclapply

mclapply is a parallelized version of lapply: if mc.cores > 1, the input list is split and processed on multiple cores; for mc.cores == 1, it simply calls lapply. By default, mc.cores is taken from the option mc.cores (2 if the option is unset), so set it explicitly if you want to use more of the cores available on the node.

Here is an example:



library(parallel)

# pp is the worker function applied to each element; its definition is not shown here
x <- lapply(1:50000, pp)                    # serial version
x <- mclapply(1:50000, pp, mc.cores = 10L)  # parallel version using 10 cores

To run the program, the R module needs to be loaded first:
module load R/3.5.1

Then use the following command to run the program in an interactive session:

$ Rscript sample1.r
Loading required package: foreach
Loading required package: iterators
Loading required package: parallel
user system elapsed
0.148 0.004 0.152

user system elapsed
0.093 0.070 0.049

In this example, the non-parallel portion takes 0.152 seconds and the parallel portion takes 0.049 seconds.
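Since pp is not defined above, here is a self-contained sketch with a stand-in worker function; it also shows choosing the core count at run time with detectCores() rather than hard-coding it:

```r
library(parallel)

# stand-in worker function (pp's definition is not shown in the original)
square <- function(i) i * i

# use all but one of the cores the node reports, but at least one
n_cores <- max(1L, detectCores() - 1L)

serial_res   <- lapply(1:1000, square)
parallel_res <- mclapply(1:1000, square, mc.cores = n_cores)

identical(serial_res, parallel_res)  # the two versions produce the same result
```

Note that mclapply forks the current R process, so mc.cores > 1 only takes effect on Linux; on Windows it falls back to serial execution.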

R parallelization with foreach

The foreach package must be used in conjunction with a package such as doParallel in order to execute code in parallel. The user must register a parallel backend first; otherwise, foreach will execute tasks sequentially, even when the %dopar% operator is used.

The doParallel package acts as an interface between foreach and the parallel package of R 2.14.0 and later.
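A minimal backend-registration sketch (the worker count of 2 here is arbitrary; doParallel loads foreach and parallel as dependencies):

```r
library(doParallel)

cl <- makeCluster(2)    # start 2 worker processes
registerDoParallel(cl)  # register them as the foreach backend

# %dopar% now runs iterations on the workers instead of sequentially
r <- foreach(i = 1:4, .combine = c) %dopar% { i * i }

stopCluster(cl)

r  # 1 4 9 16
```

Without the registerDoParallel() call, the same %dopar% loop would still run, but sequentially, with a warning that no parallel backend is registered.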

Here is an example of an R parallel program with foreach:

library(doParallel)

no_cores <- max(1L, detectCores() %/% 2)  # use half of the available cores
cl <- makeCluster(no_cores)
registerDoParallel(cl)

x <- iris[which(iris[,5] != "setosa"), c(1,5)]
trials <- 10000

# parallel version: each iteration fits a logistic regression on a bootstrap sample
r <- foreach(icount(trials), .combine=rbind) %dopar% {
  ind <- sample(100, 100, replace=TRUE)
  result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
  coefficients(result1)
}

# non-parallel version of the same task
for(i in 1:trials) {
  ind <- sample(100, 100, replace=TRUE)
  result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
}

stopCluster(cl)

The same task was executed with both the parallel and the non-parallel version to demonstrate the advantage of parallelization, as shown in the execution result below:

$ Rscript sample2.r 
user system elapsed
0.026 0.001 0.028
user system elapsed
5.996 1.373 5.365

R parallelization with MPI

Message Passing Interface (MPI) is a library specification for message-passing, enabling large-scale parallelization across multiple nodes on an HPC cluster. Rmpi provides an R wrapper to the MPI API, making MPI functions accessible to R programmers.

Here is an example:


library(Rmpi)

ptm <- proc.time()

n_nodes = mpi.universe.size()-1
#n_nodes = mpi.comm.size(1)-1 # unlike C MPI, do not use this: comm.size() returns 1 because the program is launched with mpirun -np 1

# spawn n_nodes workers on the compute nodes allocated to the job
mpi.spawn.Rslaves(nslaves=n_nodes)

# execute built-in functions on the remote workers
mpi.remote.exec(paste("I am", mpi.comm.rank(), "of", mpi.comm.size(), "on", Sys.info()[c("nodename")]))

print(proc.time() - ptm)

mpi.close.Rslaves()
mpi.quit()

It is a bit tricky to run an MPI R program. Unlike C or Python MPI programs, where the number of processes is specified with -np, an R MPI program must be launched as a single process, which then spawns the workers itself:

mpirun -np 1 Rscript sampleMPI1.r

Here is the batch job script if you want to submit a batch job:

#!/bin/bash
#SBATCH --job-name=my_job
#SBATCH --output=my_output_file.txt # Delete this line to use the default slurm-<jobID>.out output file; its name changes with every submission.
#SBATCH --partition=testing # defq is the default partition, comparable to all.q in SGE scripts
#SBATCH --account=testing
#SBATCH --time=00:05:00 # Time limit hrs:min:sec; an estimate of how long the job will take. 72:00:00 is the maximum.
#SBATCH --nodes=1 # It should be 1 for all non-MPI jobs.
#SBATCH --ntasks=20 # It should be 1 for all non-MPI jobs; otherwise, the same application will run multiple times simultaneously.
#SBATCH --mail-type=ALL
#SBATCH --mail-user=youremailaddress@utsa.edu # your email address for receiving notices about your job status

. /etc/profile.d/modules.sh
# Load the required modules
module load shared R/3.5.1
module load openmpi/4.0.5
#Rscript sample1.r #use this for non-MPI program
mpirun -np 1 Rscript sampleMPI1.r # for MPI program
exit 0

As with all other types of batch jobs, use the following command to submit the job to the cluster:
sbatch myjobscript

-- Zhiwei - 13 May 2021
Topic revision: r4 - 14 May 2021, AdminUser