Deep Learning on Qubole Using BigDL for Apache Spark – Part 2

August 3, 2017. Updated January 15, 2019.

In Part 1, you learned how to install BigDL, a distributed deep learning library, on Qubole.

In this Part 2 of a two-part series, you will learn how to write a deep learning application on Qubole that uses BigDL to identify handwritten digits (0 to 9).

In this application, you will:

  1. Train and validate a LeNet-5 model (a convolutional neural network) on the MNIST database, then save the model.
  2. Load the saved model and identify (“recognize”) digits in a given set of arbitrary images with >= 98% accuracy.

To write the application, you will use Qubole Notebooks, which are well suited to developing applications in Scala, Python, and R, running ETL jobs in Spark, and visualizing the results of SQL queries, all in a single collaborative environment. Qubole Notebooks give data scientists and data analysts an easy way to interact with data stored in cloud data stores such as Amazon S3, Microsoft Azure Blob Store, and Oracle OCI Object Storage.


  • #1 Start the Spark cluster that you configured in Part 1.
  • #2 Switch over to the Notebooks interface.
  • #3 Create a new Notebook by entering the respective values and selecting the Spark cluster you configured in Part 1.
  • #4 Set spark.executor.instances and spark.qubole.max.executors to 2. (Note: At the time of writing, BigDL required these two properties to have the same value.) To do this, click the Interpreter link at the top right of the Notebook you just created. Then click edit at the top right of the Interpreter interface and scroll down to the spark interpreter. After setting both properties to 2, click Save. Finally, click the Back to Note button at the top of the Interpreter interface.
  • #5 In the first paragraph, copy and paste the following Scala code. When executed, it trains and validates the model, then saves it.
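import com.intel.analytics.bigdl._
import com.intel.analytics.bigdl.dataset.{ByteRecord, DataSet}
import com.intel.analytics.bigdl.dataset.image.{BytesToGreyImg, GreyImgNormalizer, GreyImgToBatch}
import com.intel.analytics.bigdl.models.lenet.LeNet5
import com.intel.analytics.bigdl.nn.ClassNLLCriterion
import com.intel.analytics.bigdl.numeric.NumericFloat
import com.intel.analytics.bigdl.optim.{LocalValidator, Optimizer, Top1Accuracy, Trigger, Validator}
import com.intel.analytics.bigdl.utils.{Engine, T}

import java.nio.ByteBuffer
import java.nio.file.{Files, Path, Paths}

val numberOfMinNodes = 4
val numberOfCoresPerNode = 8
val mult = 64
val batchSize = numberOfMinNodes * numberOfCoresPerNode * mult

// Initialize the BigDL engine before building datasets or models.
// Assumption: this three-argument form matches the early BigDL release installed in Part 1;
// newer releases expose a no-argument Engine.init instead.
Engine.init(numberOfMinNodes, numberOfCoresPerNode, true)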


def load(featureFile: Path, labelFile: Path): Array[ByteRecord] = {
  val labelBuffer = ByteBuffer.wrap(Files.readAllBytes(labelFile))
  val featureBuffer = ByteBuffer.wrap(Files.readAllBytes(featureFile))
  val labelMagicNumber = labelBuffer.getInt()

  require(labelMagicNumber == 2049)
  val featureMagicNumber = featureBuffer.getInt()
  require(featureMagicNumber == 2051)

  val labelCount = labelBuffer.getInt()
  val featureCount = featureBuffer.getInt()
  require(labelCount == featureCount)

  val rowNum = featureBuffer.getInt()
  val colNum = featureBuffer.getInt()

  val result = new Array[ByteRecord](featureCount)
  var i = 0
  while (i < featureCount) {
    val img = new Array[Byte](rowNum * colNum)
    var y = 0
    while (y < rowNum) {
      var x = 0
      while (x < colNum) {
        img(x + y * colNum) = featureBuffer.get()
        x += 1
      }
      y += 1
    }
    // class labels are 1-based, so digit d is stored as d + 1
    result(i) = ByteRecord(img, labelBuffer.get().toFloat + 1.0f)
    i += 1
  }

  result
}

val baseDir = "/media/ephemeral0/bigdl/mnist/data/"
val trainDataFile = "train-images-idx3-ubyte"
val trainLabelFile = "train-labels-idx1-ubyte"
val validationDataFile = "t10k-images-idx3-ubyte"
val validationLabelFile = "t10k-labels-idx1-ubyte"

val trainData = Paths.get(baseDir, trainDataFile)
val trainLabel = Paths.get(baseDir, trainLabelFile)
val validationData = Paths.get(baseDir, validationDataFile)
val validationLabel = Paths.get(baseDir, validationLabelFile)

val trainMean = 0.13066047740239506
val trainStd = 0.3081078
val trainSet = DataSet.array(load(trainData, trainLabel), sc) -> BytesToGreyImg(28, 28) -> GreyImgNormalizer(trainMean, trainStd) -> GreyImgToBatch(batchSize)

val testMean = 0.13251460696903547
val testStd = 0.31048024
val validationSet = DataSet.array(load(validationData, validationLabel), sc) -> BytesToGreyImg(28, 28) -> GreyImgNormalizer(testMean, testStd) -> GreyImgToBatch(batchSize)

val state = T("learningRate" -> 0.05 / 4 * mult)
val maxEpoch = 15

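val initialModel = LeNet5(10) // 10 digit classes
val optimizer = Optimizer(
  model = initialModel, // model to train
  dataset = trainSet, // training dataset
  criterion = ClassNLLCriterion[Float]()) // loss function
val trainedModel = optimizer.setValidation(
  trigger = Trigger.everyEpoch,
  dataset = validationSet,
  vMethods = Array(new Top1Accuracy)).setState(state).setEndWhen(Trigger.maxEpoch(maxEpoch)).optimize()

// Save the trained model to the path that the recognition paragraph loads it from
// (assumes the target directory already exists; true means overwrite an existing file)
trainedModel.save("/media/ephemeral0/bigdl/mnist/model/trainedModel", true)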

val validator = Validator(trainedModel, validationSet)
val result = validator.test(Array(new Top1Accuracy[Float]))
// print each validation method together with its result
result.foreach(r => {
  println(s"${r._2} is ${r._1}")
})

Here’s what’s happening in the code:

  • Import BigDL and other Java libraries needed for the application.
  • Call Engine.init
    • Engine is part of BigDL and it appropriately initializes executor environment variables and Spark properties in order to get the best performance for deep learning applications on Spark clusters.
  • Define load function
    • This function helps prepare our MNIST dataset. Recall that in Part 1 we downloaded the MNIST dataset to the folder ‘/media/ephemeral0/bigdl/mnist/data/’ on the cluster via the bootstrap script. The function accepts the paths of an image file and its label file and returns an Array[ByteRecord].
  • Load the MNIST data by creating a BigDL DataSet, then apply a series of transformers to preprocess it into mini-batches.
    • The transformers convert the byte records into grey images, normalize them, and group the labeled grey images into mini-batches.
  • Train model
    • Before training the model, we need to set hyperparameters which determine how the network is trained. For more details on hyperparameters, refer to
    • We use LeNet-5 as our training model. LeNet-5 is a classical CNN model used for handwritten digit classification. For detailed information, refer to
    • Then we create the Optimizer by specifying the DataSet, the Model, and the Criterion, which, given an input and a target, computes the loss and its gradient according to the chosen loss function.
  • Validate model
    • BigDL provides a set of metrics to evaluate the model via the Validator and ValidationMethod classes. We use Top1Accuracy as the validation method; it calculates top-1 accuracy, which should come out at >= 98%.
  • Save model
    • Lastly, we save the trained and validated model so we can use it in our image recognition application.
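
To make the load function and its label handling concrete, here is a minimal, self-contained sketch that builds two tiny IDX buffers in memory and parses them the same way. The IdxSketch object, its helper names, and the 2x2 "images" are made up for illustration; only the magic numbers (2049 for labels, 2051 for images), the big-endian header layout, and the + 1 label shift come from the code in step #5.

```scala
import java.nio.ByteBuffer

// Hypothetical helper object for illustration; not part of BigDL or the article's code.
object IdxSketch {
  // Build a tiny IDX label file: magic 2049, label count, then one byte per label.
  def labelFile(labels: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.allocate(8 + labels.length)
    buf.putInt(2049).putInt(labels.length).put(labels)
    buf.array()
  }

  // Build a tiny IDX image file: magic 2051, image count, rows, cols, then row-major pixels.
  def imageFile(count: Int, rows: Int, cols: Int, pixels: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.allocate(16 + pixels.length)
    buf.putInt(2051).putInt(count).putInt(rows).putInt(cols).put(pixels)
    buf.array()
  }

  // Parse both buffers into (pixels, label) pairs, shifting labels to be 1-based.
  def parse(features: Array[Byte], labels: Array[Byte]): Array[(Array[Byte], Float)] = {
    val fb = ByteBuffer.wrap(features)
    val lb = ByteBuffer.wrap(labels)
    require(lb.getInt() == 2049, "not an IDX label file")
    require(fb.getInt() == 2051, "not an IDX image file")
    val labelCount = lb.getInt()
    val featureCount = fb.getInt()
    require(labelCount == featureCount, "image and label counts differ")
    val rows = fb.getInt()
    val cols = fb.getInt()
    Array.fill(featureCount) {
      val img = new Array[Byte](rows * cols)
      fb.get(img)                     // bulk read of one row-major image
      (img, lb.get().toFloat + 1.0f)  // digit d is stored as class d + 1
    }
  }
}

val pixels = Array[Byte](0, 1, 2, 3, 4, 5, 6, 7) // two 2x2 "images"
val records = IdxSketch.parse(
  IdxSketch.imageFile(2, 2, 2, pixels),
  IdxSketch.labelFile(Array[Byte](0, 9)))

// prints "pixels=0,1,2,3 label=1.0" and then "pixels=4,5,6,7 label=10.0"
records.foreach { case (img, label) => println(s"pixels=${img.mkString(",")} label=$label") }
```

The shift exists because BigDL's ClassNLLCriterion, like its Torch counterpart, expects class indices that start at 1. That is also why the recognition code in step #7 subtracts 1 from the predicted class to get back to the digit.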
  • #6 Run the paragraph from step #5. It will take a few minutes to train, validate, and save the model. Once it completes, proceed to the next step.
  • #7 In a new paragraph, copy and paste the following Scala code. When executed, it loads the model saved in the previous step and uses it to identify (“recognize”) digits in a given set of arbitrary images.

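import com.intel.analytics.bigdl.dataset.ByteRecord
import com.intel.analytics.bigdl.dataset.image.{BytesToGreyImg, GreyImgNormalizer, GreyImgToSample}
import com.intel.analytics.bigdl.nn.Module
import com.intel.analytics.bigdl.numeric.NumericFloat

import java.io.File
import java.nio.ByteBuffer
import java.nio.file.{Files, Path, Paths}
import javax.imageio.ImageIO
import java.awt.image.{BufferedImage, DataBufferByte}
import java.awt.Color

def image_to_byte_array(imageName: String): Array[Byte] = {
  val imgPath = new File(imageName)
  val bufferedImage = ImageIO.read(imgPath)
  val scaledImage = bufferedImage.getScaledInstance(28, 28, java.awt.Image.SCALE_SMOOTH)
  val scaledBufferedImage = new BufferedImage(28, 28, BufferedImage.TYPE_BYTE_GRAY)
  scaledBufferedImage.getGraphics.drawImage(scaledImage, 0, 0, new Color(0, 0, 0), null)
  // get the raw greyscale bytes from the raster
  val raster = scaledBufferedImage.getRaster
  val data = raster.getDataBuffer.asInstanceOf[DataBufferByte].getData
  // prefix the pixels with an 8-byte header (row count, column count) that load_image reads back
  val bytes = new Array[Byte](8 + data.length)
  val byteBuffer = ByteBuffer.wrap(bytes)
  byteBuffer.putInt(scaledBufferedImage.getHeight)
  byteBuffer.putInt(scaledBufferedImage.getWidth)
  System.arraycopy(data, 0, bytes, 8, data.length)
  bytes
}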

def load_image(imageName: String): Array[ByteRecord] = {
  val byteArray = image_to_byte_array(imageName)
  val featureBuffer = ByteBuffer.wrap(byteArray)
  val rowNum = featureBuffer.getInt()
  val colNum = featureBuffer.getInt()
  val result = new Array[ByteRecord](1)
  val img = new Array[Byte]((rowNum * colNum))
  var y = 0
  while (y < rowNum) {
    var x = 0
    while (x < colNum) {
      img(x + y * colNum) = featureBuffer.get()
      x += 1
    }
    y += 1
  }
  // placeholder label: the true digit is unknown at prediction time
  result(0) = ByteRecord(img, 1.1f)
  result
}

val testMean = 0.13251460696903547
val testStd = 0.31048024

val trainedModel = Module.load[Float]("/media/ephemeral0/bigdl/mnist/model/trainedModel")

val dir = "/media/ephemeral0/bigdl/"
val images = Seq("zero_28x28.png", "two_28x28.png", "three.png", "four_28x28.png", "seven.png", "nine_28x28.png")

for (img <- images) {
  val load_result = load_image(dir + img)
  val rddData = sc.parallelize(load_result)
  val transformer = BytesToGreyImg(28, 28) -> GreyImgNormalizer(testMean, testStd) -> GreyImgToSample()
  val predictDataSet = transformer(rddData)
  val predictRDD = trainedModel.predictClass(predictDataSet)
  val predictedDigit = predictRDD.take(1)(0) - 1  // subtract 1 because class labels are 1-based (digit 0 is class 1)
  println("***** Digit in file " + img + " is: " + predictedDigit)
}

Important: Replace YOUR_S3_BUCKET with the S3 bucket in your AWS account where you uploaded the test images.

  • #8 Run the paragraph from step #7. Once it completes, you should see output similar to the following:

As you can see from the results, the trained model was able to identify digits (0,2,3,4,7,9) in our test image files.
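
If you want to see what the step #7 helpers and the transformers do to an image without touching the cluster, the following sketch reproduces the idea on a synthetic image. Treat it as an approximation under stated assumptions: PackSketch, pack, and normalize are hypothetical names, the image is scaled with a plain Graphics2D draw rather than getScaledInstance, and normalize mirrors our understanding of BytesToGreyImg (scale unsigned bytes to [0, 1]) followed by GreyImgNormalizer (subtract the mean, divide by the standard deviation).

```scala
import java.awt.image.{BufferedImage, DataBufferByte}
import java.nio.ByteBuffer

// Hypothetical helper object for illustration; not the article's exact helpers.
object PackSketch {
  // Scale any image to 28x28 greyscale and prefix the pixels with two ints (rows, cols).
  def pack(src: BufferedImage): Array[Byte] = {
    val grey = new BufferedImage(28, 28, BufferedImage.TYPE_BYTE_GRAY)
    val g = grey.createGraphics()
    g.drawImage(src, 0, 0, 28, 28, null) // scaling draw of an in-memory image
    g.dispose()
    val data = grey.getRaster.getDataBuffer.asInstanceOf[DataBufferByte].getData
    val bytes = new Array[Byte](8 + data.length)
    ByteBuffer.wrap(bytes).putInt(grey.getHeight).putInt(grey.getWidth)
    System.arraycopy(data, 0, bytes, 8, data.length)
    bytes
  }

  // Unsigned byte -> [0, 1] -> centered and scaled by the dataset statistics.
  def normalize(pixel: Byte, mean: Double, std: Double): Double =
    ((pixel & 0xff) / 255.0 - mean) / std
}

// A synthetic 56x56 white square stands in for a test PNG.
val white = new BufferedImage(56, 56, BufferedImage.TYPE_INT_RGB)
val gfx = white.createGraphics()
gfx.setColor(java.awt.Color.WHITE)
gfx.fillRect(0, 0, 56, 56)
gfx.dispose()

val packed = PackSketch.pack(white)
val header = ByteBuffer.wrap(packed)
println(s"rows=${header.getInt()} cols=${header.getInt()} pixels=${packed.length - 8}")

// Normalized value of the first pixel, using the test-set statistics from step #7.
println(PackSketch.normalize(packed(8), 0.13251460696903547, 0.31048024))
```

The 8-byte header is what lets load_image recover the row and column counts before it reads the 784 pixel bytes. The mask with 0xff matters because JVM bytes are signed, so a bright pixel would otherwise be read as a negative number.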

In Summary

BigDL is a great open source deep learning library built to run natively on Apache Spark, and Qubole, an autonomous big data platform, provides an excellent deployment platform for BigDL as well as for other deep learning and machine learning libraries.

