Deep Learning on Qubole Using BigDL for Apache Spark – Part 2

Start Free Trial
August 3, 2017 by Updated April 15th, 2024

In Part 1 you learned how to get started with installing distributed deep learning library BigDL on Qubole.

In this Part 2 of a two-part series, you will learn how to write a deep learning application on Qubole that uses BigDL to identify handwritten digits (0 to 9).

In this application, you will:

  1. Train and validate the LeNet-5 (Convolutional Neural Networks) model using the MNIST database and then save the model.
  2. Load the saved model and identify (“recognize”) digits in a given set of arbitrary images with >= 98% accuracy.

To write the application, you will use Qubole Notebooks which are great for developing applications in Scala, Python, and R, running ETL jobs in Spark, as well as visualizing results of SQL in a single, collaborative environment. Qubole Notebooks give data scientists and data analysts an easy way to interact with data stored in Cloud data stores such as Amazon S3, Microsoft Azure Blob Store, etc.

Steps

  • #1 Start the Spark cluster that you configured in Part 1.
  • #2 Switch over to Notebooks interface
  • #3 Create a new Notebook by entering respective values and selecting the Spark cluster you configured in Part 1.
  • #4 Set spark.executor.instances and spark.qubole.max.executors to 2. (Note: At the time of writing this, BigDL required these two variables to have the same value.) To do this, click on the Interpreter link located on the top right in the Notebook you just created. Then, click on edit on the top right within the Interpreter interface and scroll down to spark interpreter. After setting the above two parameters to 2, click on Save. Finally, click on the Back to Note button located at the top of the Interpreter interface.
  • #5 In the first paragraph, copy and paste the following Scala code. When executed, it will train, validate, and save the trained model.
import com.intel.analytics.bigdl.utils.Engine
import com.intel.analytics.bigdl.dataset._
import com.intel.analytics.bigdl.dataset.image._
import com.intel.analytics.bigdl.models.lenet.Utils
import com.intel.analytics.bigdl.DataSet
import com.intel.analytics.bigdl.models.lenet._
import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.optim._
import com.intel.analytics.bigdl.utils._
import com.intel.analytics.bigdl.optim.{LocalValidator, Top1Accuracy, Validator}

import java.nio.ByteBuffer
import java.nio.file.{Files, Path, Paths}

val numberOfMinNodes = 4
val numberOfCoresPerNode = 8
val mult = 64
val batchSize = numberOfMinNodes * numberOfCoresPerNode * mult

Engine.init

def load(featureFile: Path, labelFile: Path): Array[ByteRecord] = {
  val labelBuffer = ByteBuffer.wrap(Files.readAllBytes(labelFile))
  val featureBuffer = ByteBuffer.wrap(Files.readAllBytes(featureFile))
 
  val labelMagicNumber = labelBuffer.getInt()

  require(labelMagicNumber == 2049)
  val featureMagicNumber = featureBuffer.getInt()
  require(featureMagicNumber == 2051)

  val labelCount = labelBuffer.getInt()
  val featureCount = featureBuffer.getInt()
  require(labelCount == featureCount)

  val rowNum = featureBuffer.getInt()
  val colNum = featureBuffer.getInt()

  val result = new Array[ByteRecord](featureCount)
  var i = 0
  while (i < featureCount) {
     val img = new Array[Byte]((rowNum * colNum))
     var y = 0
     while (y < rowNum) {
       var x = 0
       while (x < colNum) {
          img(x + y * colNum) = featureBuffer.get()
          x += 1
       }
       y += 1
    }
    result(i) = ByteRecord(img, labelBuffer.get().toFloat + 1.0f)
    i += 1
  }
  result
}

val baseDir = "/media/ephemeral0/bigdl/mnist/data/"
val trainDataFile = "train-images-idx3-ubyte"
val trainLabelFile = "train-labels-idx1-ubyte"
val validationDataFile = "t10k-images-idx3-ubyte"
val validationLabelFile = "t10k-labels-idx1-ubyte"

val trainData = Paths.get(baseDir, trainDataFile)
val trainLabel = Paths.get(baseDir, trainLabelFile)
val validationData = Paths.get(baseDir, validationDataFile)
val validationLabel = Paths.get(baseDir, validationLabelFile)

val trainMean = 0.13066047740239506
val trainStd = 0.3081078
val trainSet = DataSet.array(load(trainData, trainLabel), sc) -> BytesToGreyImg(28, 28) -> GreyImgNormalizer(trainMean, trainStd) -> GreyImgToBatch(batchSize)

val testMean = 0.13251460696903547
val testStd = 0.31048024
val validationSet = DataSet.array(load(validationData, validationLabel), sc) -> BytesToGreyImg(28, 28) -> GreyImgNormalizer(testMean, testStd) -> GreyImgToBatch(batchSize)

val state = T("learningRate" -> 0.05 / 4 * mult)
val maxEpoch = 15

val initialModel = LeNet5(10) // 10 digit classes
val optimizer = Optimizer(
 model = initialModel, // training modes
 dataset = trainSet, // training dataset
 criterion = ClassNLLCriterion[Float]()) // loss function
 
val trainedModel = optimizer.setValidation(
 trigger = Trigger.everyEpoch,
 dataset = validationSet,
 vMethods = Array(new Top1Accuracy)).setState(state).setEndWhen(Trigger.maxEpoch(maxEpoch)).optimize()

val validator = Validator(trainedModel, validationSet)
val result = validator.test(Array(new Top1Accuracy[Float]))
result.foreach(r => {
  println(s"${r._2} is ${r._1}")
})

trainedModel.save("/media/ephemeral0/bigdl/mnist/model/trainedModel",true)

Here’s what’s happening in code:

  • Import BigDL and other Java libraries needed for the application.
  • Call Engine.init
    • The engine is part of BigDL and it appropriately initializes executor environment variables and Spark properties in order to get the best performance for deep learning applications on Spark clusters.
  • Define load function
    • This function helps prepare our MNIST dataset. Recall that we downloaded the MNIST dataset in folder ‘/media/ephemeral0/bigdl/mnist/data/’ on the cluster via bootstrap script in Part 1. This function accepts file path and returns Array[ByteRecord].
  • Load data from MINIST by creating the BigDL DataSet and then applying a series of transformers to preprocess data into Mini-batch.
    • Preprocess data, convert byte records into a grey image, normalize it and convert a batch of labeled grey images into a mini-batch.
  • Train model
    • Before training the model, we need to set hyperparameters that determine how the network is trained. For more details on hyperparameters, refer to https://colinraffel.com/wiki/neural_network_hyperparameters.
    • We use LeNet-5 as our training model. LeNet-5 is a classical CNN model used in digital number classification. For detailed information, refer to http://yann.lecun.com/exdb/lenet/.
    • Then we create the Optimizer by specifying the DataSet, the Model, and the Criterion–which, given input and target, computes gradient per given loss function.
  • Validate model
    • BigDL provides a set of metrics to evaluate the model via Validator and ValidationMethod classes. We use Top1Accuracy as validationMethod which will calculate top 1 accuracy and should yield an accuracy of >= 98%.
  • Save model
    • Lastly, we save the trained and validated model so we can use it for our image recognition application.
  • #6 Run the above paragraph — it will take a few mins to train, validate and save the model. Once completed, proceed to the next step.
  • #7 In a new paragraph, copy and paste the following Scala code. When executed, it will load the saved trained model in the previous step to identify (“recognize”) digits in a given set of arbitrary images.
import com.intel.analytics.bigdl.utils.Engine
import com.intel.analytics.bigdl.dataset._
import com.intel.analytics.bigdl.dataset.image._
import com.intel.analytics.bigdl.models.lenet.Utils
import com.intel.analytics.bigdl.DataSet
import com.intel.analytics.bigdl.models.lenet._
import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.optim._
import com.intel.analytics.bigdl.utils._

import java.nio.ByteBuffer
import java.nio.file.{Files, Path, Paths}
import javax.imageio.ImageIO
import java.io.File
import java.awt.image.{BufferedImage, DataBufferByte}
import java.awt.Color

Engine.init

def image_to_byte_array(imageName: String): Array[Byte] = {
  val imgPath = new File(imageName)
  val bufferedImage = ImageIO.read(imgPath)
  val scaledImage = bufferedImage.getScaledInstance(28, 28, java.awt.Image.SCALE_SMOOTH)
  val scaledBufferedImage = new BufferedImage(28, 28, BufferedImage.TYPE_BYTE_GRAY)
  scaledBufferedImage.getGraphics.drawImage(scaledImage, 0, 0, new Color(0, 0, 0), null)
 
  // get DataBufferBytes from Raster
  val raster = scaledBufferedImage.getRaster
  val data = raster.getDataBuffer.asInstanceOf[DataBufferByte].getData
  val bytes = new Array[Byte](8 + data.length)
  val byteBuffer = ByteBuffer.wrap(bytes)
  byteBuffer.putInt(scaledBufferedImage.getWidth)
  byteBuffer.putInt(scaledBufferedImage.getHeight)
  System.arraycopy(data, 0, bytes, 8, data.length)

  bytes
}

def load_image(imageName: String): Array[ByteRecord] = {
  val byteArray = image_to_byte_array(imageName)
  val featureBuffer = ByteBuffer.wrap(byteArray)
  val rowNum = featureBuffer.getInt()
  val colNum = featureBuffer.getInt()
  val result = new Array[ByteRecord](1)
  val img = new Array[Byte]((rowNum * colNum))
  var y = 0
  while (y < rowNum) {
    var x = 0
    while (x < colNum) {
      img(x + y * colNum) = featureBuffer.get()
      x += 1
    }
    y += 1
  }
  result(0) = ByteRecord(img, 1.1f)
  result
}

val testMean = 0.13251460696903547
val testStd = 0.31048024

val trainedModel = Module.load[Float]("/media/ephemeral0/bigdl/mnist/model/trainedModel")

val dir = "/media/ephemeral0/bigdl/"
val images = Seq("zero_28x28.png", "two_28x28.png", "three.png", "four_28x28.png", "seven.png", "nine_28x28.png")

for (img <- images) { val load_result = load_image(dir + img) val rddData = sc.parallelize(load_result) val transformer = BytesToGreyImg(28, 28) -> GreyImgNormalizer(testMean, testStd) -> GreyImgToSample()
  val predictDataSet = transformer(rddData)
  val predictRDD = trainedModel.predictClass(predictDataSet)
  val predictedDigit = predictRDD.take(1)(0) - 1  // deduct 1 because the first label is always 0
  println("\r\n")
  println("***** Digit in file " + img + " is: " + predictedDigit)
}

Imp: Replace YOUR_S3_BUCKET with S3 bucket in your AWS account where you uploaded test images.

  • #8 Run the above paragraph — once completed, you should see output similar to the following:

As you can see from the results, the trained model was able to identify digits (0,2,3,4,7,9) in our test image files.

In Summary

BigDL is a great open source deep learning library built to run natively on Apache Spark and autonomous big data platform Qubole provides for a perfect deployment platform for BigDL as well as other deep learning and machine learning libraries.

To learn how you can use Qubole for various workload types, click here.

Start Free Trial
Read Processing Hierarchical Data using Spark Graphx Pregel API