Machine Learning with XGBoost on Qubole Spark Cluster

By June 5, 2017

This is a guest post authored by Mikhail Stolpner, Solutions Architect, Qubole

Planning an XGBoost Cluster

This planning guide covers a scenario in which an XGBoost job must complete within a certain timeframe. In this scenario, we can assume that the load requirement is known ahead of time.

Before describing how to configure such a cluster, we will give an overview of Spark resource allocation.

Resource Allocation

This section explains the relationship between a Cluster Node, a Spark Executor, a Spark Task, and an XGBoost Worker.

The diagram below depicts the composition hierarchy in a single node of a Spark cluster running XGBoost. A Cluster Node runs one or more Spark Executors. Each Spark Executor runs in its own JVM and can run multiple Spark Tasks. XGBoost Workers are executed as Spark Tasks. The total number of XGBoost Workers in a single Cluster Node is the number of Executors N times the number of Tasks per Executor K, or N * K.

Yarn manages the resources of the Qubole Spark cluster. When we start a new job, Yarn tries to fit our vcore and memory requirements into the available nodes. If a node has 64 vcores and our tasks are configured to utilize 12 vcores each, Yarn will only be able to fit 5 of these tasks into a single node, leaving 4 vcores unused, as shown in the diagram below. The same logic applies to memory.
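The packing arithmetic above can be sketched in a few lines of Scala. This is only an illustration of the integer division involved, not Yarn's actual scheduler; the object and method names (VcorePacking, pack) are made up for this example.

```scala
// Minimal sketch: how many fixed-size vcore requests fit on one node.
// VcorePacking and pack are hypothetical names, not part of Yarn or Spark.
object VcorePacking {
  /** Returns (requests that fit on the node, vcores left unused). */
  def pack(nodeVcores: Int, vcoresPerRequest: Int): (Int, Int) = {
    require(nodeVcores >= 0 && vcoresPerRequest > 0, "vcore counts must be positive")
    (nodeVcores / vcoresPerRequest, nodeVcores % vcoresPerRequest)
  }

  def main(args: Array[String]): Unit = {
    // The example from the text: a 64-vcore node and 12-vcore requests.
    val (fit, unused) = pack(nodeVcores = 64, vcoresPerRequest = 12)
    println(s"fit=$fit unused=$unused") // prints "fit=5 unused=4"
  }
}
```

The same division applies to memory if you substitute megabytes for vcores.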

Yarn will try to start as many Executors as specified in spark.executor.instances (see the table in the Spark Configuration section below) and will try to scale out to spark.qubole.max.executors if needed.

It’s important to note that Yarn will start a single ApplicationMaster on one of the Slave Nodes. The ApplicationMaster process takes one vcore, leaving this Slave Node with one vcore fewer than all other Slave Nodes. This can reduce the number of Executors that Yarn is able to allocate on this node.

The Spark Driver runs on the Master Node.

Tuning Parameters

With systems as complex as XGBoost running on a Spark cluster, it’s very difficult to precisely predict the best environment configuration. The best approach is usually to try a few different configurations and pick the one on which your specific code performs best.

There are three areas of configuration variables that you can change to shape your environment: Cluster Configuration, Spark Configuration, and XGBoost Configuration. We go into detail on each of these areas below.

Cluster Configuration

Remember, each cluster always has a single Master Node and a specified number of Slave Nodes. In the Qubole Cluster Configuration, you can choose the node instance type for both the Master Node and the Slave Nodes. The node instance type determines how much memory and how many vcores a single node has. You can also choose a minimum and a maximum number of Slave Nodes. However, for this scenario we recommend using the same number for both min and max, as the load is known ahead of time and auto-scaling is not required. The combination of these factors determines how many vcores and how much memory are available on your Spark cluster.

It’s also important to note that you can decide on your Cluster Composition by choosing between “On-Demand”, “Spot”, and “Spot Block”. Choosing “Spot Block” over “On-Demand” will not impact performance, but it may significantly reduce your cost. While choosing “Spot” may help to reduce the cost further, we do not recommend it in this scenario because “Spot” nodes may be taken away in the middle of the process, which increases the overall run time.

Spark Configuration

Among many Spark parameters, we are mostly interested in the following:

Parameter | Description
spark.task.cpus | Defines the number of vcores allocated to each Spark Task. It can be set in the interpreter.
spark.executor.cores | Defines the number of vcores allocated to each Spark Executor. It can be set in the interpreter.
spark.executor.instances | Defines the number of Executor instances Spark will start. It can be set in the interpreter.
spark.qubole.max.executors | Defines the maximum number of Executor instances Spark will scale to. It can be set in the interpreter.
spark.executor.memory | Defines the amount of memory available to each Spark Executor. It can be set in the interpreter.
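When choosing spark.executor.memory, it helps to remember that the container Yarn allocates for an Executor is larger than the Executor memory itself. The sketch below assumes the stock Spark-on-Yarn default, where the per-Executor overhead is the larger of 384 MB and 10% of spark.executor.memory; your cluster may override this, and Yarn's rounding of container sizes is ignored. The names ExecutorMemory, containerMb, and executorsThatFit are hypothetical, as is the 240 GB node in the usage example.

```scala
// Rough sketch of Executor container sizing, under assumed Spark-on-Yarn defaults.
// All names here are illustrative; they are not Spark or Qubole APIs.
object ExecutorMemory {
  // Assumption: overhead = max(384 MB, 10% of executor memory).
  val MinOverheadMb: Long = 384L

  /** Approximate memory Yarn must reserve for one Executor, in MB. */
  def containerMb(executorMemoryMb: Long): Long =
    executorMemoryMb + math.max(MinOverheadMb, executorMemoryMb / 10)

  /** How many such containers fit into the memory Yarn manages on one node. */
  def executorsThatFit(nodeMemoryMb: Long, executorMemoryMb: Long): Long =
    nodeMemoryMb / containerMb(executorMemoryMb)

  def main(args: Array[String]): Unit = {
    // Hypothetical node with 245760 MB (240 GB) available to Yarn.
    // Naively dividing by 8 gives 30720 MB per Executor, but only 7 fit.
    println(executorsThatFit(nodeMemoryMb = 245760L, executorMemoryMb = 30720L)) // prints 7
  }
}
```

In this hypothetical case, lowering spark.executor.memory to about 27 GB would leave room for the eighth Executor.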

XGBoost Configuration

There are only two parameters we will configure in XGBoost.

Parameter | Description
nThread | Defines the number of cores utilized by each XGBoost Worker. It cannot be more than spark.executor.cores. Remember, an XGBoost Worker runs as a Spark Task.
numWorkers | Defines the total number of XGBoost Workers.

General Rules

Every XGBoost job is unique and may perform well on differently configured clusters. Below are some guidelines that will help you configure your environment.

  • Plan your cluster. Think about how many Executors you would like each node to run, how many Spark Tasks you would like each Executor to contain, and what that means in terms of vcores and memory on each node.
  • Remember that one vcore on one Slave Node will be taken by the ApplicationMaster. If you have 10 nodes in your cluster with 64 vcores each and you set spark.executor.cores to 64, Yarn will not be able to place any Executor container on the node running the ApplicationMaster, so the remaining 63 vcores of that node are lost to the actual job. However, if you configure a cluster with 1000 nodes, it may be worth wasting one node.
  • Yarn can only create as many Executors, Tasks, etc. as its resources allow. If it did not create as many Executors as you expected, that indicates there are not enough resources to support your desired configuration.
  • (vcores on a single node) / (spark.executor.cores) = number of executors that can be allocated on a single node.
  • (spark.executor.cores) / (spark.task.cpus) = number of tasks (or XGBoost workers) that can be allocated on a single executor.
  • Maximum number of XGBoost workers you can run on a cluster = number of nodes * number of executors run on a single node * number of tasks (or XGBoost workers) run on a single executor.
  • Keep nThread the same as spark.task.cpus. A good range for nThread is 4 to 8.
  • Set spark.executor.cores to twice nThread.
  • While it seems that better allocation and utilization of resources should yield better performance, that is not always the case. Different code benefits from different configurations, and a configuration that shows lower utilization than others can actually run your code faster. To summarize, it’s beneficial to start from a configuration that allocates as many resources as possible. However, do not assume it’s the best you can get; keep experimenting until you achieve your goal.
  • There is memory overhead at all levels: OS, Spark, and Yarn. You need to take this into consideration when allocating memory for Executors with spark.executor.memory. If you simply divide all available memory by the number of Executors you want to run on a single node, Yarn will not be able to start that many Executors because of the overhead.
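The three capacity formulas in the list above translate directly into integer arithmetic. The following sketch is an upper bound only: it ignores the ApplicationMaster vcore and all memory limits. The object and method names are hypothetical.

```scala
// Sketch of the vcore-based capacity formulas from the General Rules.
// WorkerCapacity and its methods are illustrative names, not a Spark or XGBoost API.
object WorkerCapacity {
  /** (vcores on a single node) / (spark.executor.cores) */
  def executorsPerNode(nodeVcores: Int, executorCores: Int): Int =
    nodeVcores / executorCores

  /** (spark.executor.cores) / (spark.task.cpus) */
  def tasksPerExecutor(executorCores: Int, taskCpus: Int): Int =
    executorCores / taskCpus

  /** nodes * executors per node * tasks (XGBoost Workers) per executor */
  def maxWorkers(nodes: Int, nodeVcores: Int, executorCores: Int, taskCpus: Int): Int =
    nodes * executorsPerNode(nodeVcores, executorCores) * tasksPerExecutor(executorCores, taskCpus)

  def main(args: Array[String]): Unit = {
    // 10 nodes with 64 vcores, nThread = spark.task.cpus = 4, spark.executor.cores = 8.
    println(maxWorkers(nodes = 10, nodeVcores = 64, executorCores = 8, taskCpus = 4)) // prints 160
  }
}
```

Because of the ApplicationMaster and memory overhead, the number of Workers you can actually start is usually somewhat lower than this bound.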

Setting Up XGBoost Cluster

Build XGBoost JARs:

You can build XGBoost JAR files with the following script:

wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo
yum install -y apache-maven
git clone --recursive https://github.com/dmlc/xgboost
cd xgboost
make -j4
export JAVA_HOME=/usr/lib/jvm/java-openjdk/
cd jvm-packages
mvn package

At this point, your XGBoost JARs are ready to deploy to the Spark cluster.

Operationalizing XGBoost with Bootstrap

Place the JARs (xgboost4j-0.7.jar, xgboost4j-spark-0.7.jar) in an S3 bucket.

Please refer to this document for more details.

Create a bootstrap script to copy the JARs to the Spark library directory:

hadoop dfs -get s3://bucket/xgboost4j-0.7.jar /usr/lib/spark/lib/
hadoop dfs -get s3://bucket/xgboost4j-spark-0.7.jar /usr/lib/spark/lib/

Note, you can also use s3cmd to copy files from S3.

Executing Notebooks

You will need to set all the parameters mentioned in the Spark Configuration section in an Interpreter bound to your notebook: spark.task.cpus, spark.executor.cores, spark.executor.instances, spark.qubole.max.executors, and spark.executor.memory.

You will also need to configure XGBoost in your code. Below is a quick example of how that can be done:

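import ml.dmlc.xgboost4j.scala.spark.XGBoost
// nthread is the number of cores used by each XGBoost Worker
val params = List(
 "nthread" -> 4).toMap
// rdd, numIterations, and numWorkers are assumed to be defined earlier in the notebook
val xgboostModel = XGBoost.train(rdd, params, numIterations, numWorkers, useExternalMemory=false)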
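Before starting a long training run, it can be useful to check that the XGBoost settings agree with the Spark settings of the Interpreter. The sketch below encodes the rules from this guide (nThread no larger than spark.executor.cores, nThread equal to spark.task.cpus, numWorkers no larger than the number of task slots). It is plain Scala with hypothetical names (XGBoostConfigCheck, SparkSettings, problems) and does not read anything from Spark; you pass in the values you configured.

```scala
// Sanity check of XGBoost settings against Spark settings, following this guide's rules.
// All names are illustrative; nothing here is part of the XGBoost or Spark API.
object XGBoostConfigCheck {
  final case class SparkSettings(taskCpus: Int, executorCores: Int, executorInstances: Int)

  /** Returns a list of human-readable problems; an empty list means the settings agree. */
  def problems(spark: SparkSettings, nThread: Int, numWorkers: Int): List[String] = {
    require(spark.taskCpus > 0, "spark.task.cpus must be positive")
    // Task slots = executors * tasks per executor.
    val slots = spark.executorInstances * (spark.executorCores / spark.taskCpus)
    List(
      (nThread > spark.executorCores) -> s"nThread ($nThread) exceeds spark.executor.cores (${spark.executorCores})",
      (nThread != spark.taskCpus) -> s"nThread ($nThread) differs from spark.task.cpus (${spark.taskCpus})",
      (numWorkers > slots) -> s"numWorkers ($numWorkers) exceeds available task slots ($slots)"
    ).collect { case (true, message) => message }
  }

  def main(args: Array[String]): Unit = {
    // 79 executors with 8 cores each and 4 cpus per task give 158 slots.
    val settings = SparkSettings(taskCpus = 4, executorCores = 8, executorInstances = 79)
    println(problems(settings, nThread = 4, numWorkers = 78)) // prints List()
  }
}
```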

Tools on Qubole Platform

Qubole offers an extensive set of tools that can help you monitor and tune your Spark jobs. We will review them in this section.

Resource Manager

Resource Manager is the Yarn UI, and it shows resource allocation. Note that resource allocation does not reflect actual load. However, you cannot achieve high utilization of your resources if you do not allocate them correctly.

The Nodes page in the Resource Manager is a great tool for finding out how resources have been allocated on your nodes. Remember, you need to start a Spark job before this tool reports anything.

To navigate to the Resource Manager, open the Clusters page, hover over Resources to the right of your cluster, and click Resource Manager. Then click the Nodes link on the left.

Ganglia

Ganglia is a monitoring tool that shows actual load, as opposed to resource allocation. You can start from the Utilization Heatmap and drill down into the many metrics available.

To navigate to Ganglia, open the Clusters page, hover over Resources to the right of your cluster, and click Ganglia.

Spark Application Master

The ApplicationMaster is available through a link in the Resource Manager. Once you are on that page, click the link to the right of your application. It will open the Spark Application Master UI.

Click Stages to see completed and active stages. This view is very helpful in combination with Ganglia, as it shows which step of your process utilizes resources and at what level. It is interesting to see that a different overall configuration can improve some parts of your process but not others.

Case Study

We experimented with a Notebook outlined below and measured performance of the last step – the training process.

sc.parallelize(Seq("")).map(x => {
 import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}
}).collect()

//Load data into RDD
val trainingRDD = spark.read.csv("s3://bucket/")

// Prepare parameters
val params = List("nthread" -> 4).toMap 
val iterations = 200
val workers = 78

// Train the model
val xgboostModel = XGBoost.train(trainingRDD, params, iterations, workers, useExternalMemory=true)

The notebook processed 1 TB of data. As you can see from the table below, the best runs (highlighted) have an average load of 75%, a relatively small number of workers, spark.task.cpus = 4 (equal to nThread), and spark.executor.cores set to twice nThread. This data can give you a starting point for your experiments. The cluster was configured to use AWS instances with 64 vcores.

Let’s examine the first configuration in this table. With 8 vcores per Executor, 64 vcores on a node, and 10 nodes in total, it may look as if we should have 80 Executors. However, spark.executor.instances is set to 79. That’s because we lose one vcore on one node to the ApplicationMaster, so Yarn can only allocate 7 Executors instead of 8 on that node.
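The count of 79 can be reproduced with a small calculation that treats the ApplicationMaster node separately. This is a sketch under the assumptions stated in this guide (one ApplicationMaster taking one vcore on one Slave Node, memory not being the limiting factor); the names ExecutorCount and maxExecutors are hypothetical.

```scala
// Sketch: vcore-based Executor count when one node also hosts the ApplicationMaster.
// ExecutorCount and maxExecutors are illustrative names, not a Yarn or Spark API.
object ExecutorCount {
  def maxExecutors(nodes: Int, nodeVcores: Int, executorCores: Int, amVcores: Int = 1): Int = {
    require(nodes >= 1 && executorCores > 0, "need at least one node and positive executor cores")
    // Nodes without the ApplicationMaster offer all of their vcores to Executors.
    val onRegularNodes = (nodes - 1) * (nodeVcores / executorCores)
    // The ApplicationMaster node has amVcores fewer vcores available.
    val onAmNode = (nodeVcores - amVcores) / executorCores
    onRegularNodes + onAmNode
  }

  def main(args: Array[String]): Unit = {
    println(maxExecutors(nodes = 10, nodeVcores = 64, executorCores = 8))  // prints 79
    println(maxExecutors(nodes = 10, nodeVcores = 64, executorCores = 64)) // prints 9
  }
}
```

The second call corresponds to the General Rules example in which setting spark.executor.cores to 64 leaves the ApplicationMaster node without any Executor.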

References

Explore the references below to dig deeper into various areas of Spark, Yarn, and Qubole.

http://docs.qubole.com/en/latest/user-guide/clusters/configuring-clusters.html

http://docs.qubole.com/en/latest/user-guide/clusters/node-bootstrap.html#nodebootstrapscript

http://docs.qubole.com/en/latest/user-guide/spark/index.html

http://docs.qubole.com/en/latest/user-guide/spark/debugging-spark.html

https://hadoop.apache.org/docs/r2.7.2/hadoop-yarn/hadoop-yarn-site/YARN.html

https://hortonworks.com/blog/apache-hadoop-yarn-concepts-and-applications/

http://xgboost.readthedocs.io/en/latest/tutorials/aws_yarn.html
