A Data Scientist’s Guide to Model Deployment on SageMaker Using MLeap and Qubole Notebooks

May 16, 2019 (updated April 1, 2024)

Our customers at Qubole use notebooks with Apache Spark as the back end to build machine learning pipelines. Often, data scientists develop the models while the infrastructure/engineering team deploys them. This handoff is time-consuming and error-prone, and the resulting friction often leads to critical business delays.

In this blog, we will walk through an example notebook that can do it all: train the model using Spark MLlib, serialize the models using MLeap, and deploy the model to Amazon SageMaker. The steps are simple enough for data scientists to deploy models on their own.

What Is MLeap?

MLeap provides a serialization format and execution engine for machine learning pipelines. It supports models trained in multiple frameworks, such as Spark MLlib, TensorFlow, and scikit-learn, and exports them as MLeap Bundles. It is an actively developed, easy-to-use open source tool.

Spark also provides its own model serialization, but it is better suited to batch prediction tasks and is not the preferred solution for low-latency online inference; for low-latency online predictions, MLeap is the ideal option. MLeap Bundles are also portable and can be deployed with the MLeap runtime on any cloud.
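
To make the difference concrete, here is a minimal sketch (not part of the original workflow) that contrasts Spark's native persistence with exporting an MLeap Bundle; it assumes pipeline_model is a fitted pyspark.ml PipelineModel and df is the DataFrame it was fitted on:

# Minimal sketch: Spark-native persistence vs. an MLeap Bundle export.
# Assumes `pipeline_model` is a fitted pyspark.ml PipelineModel and `df` is its training DataFrame.
import mleap.pyspark  # registers MLeap serialization methods on Spark models
from mleap.pyspark.spark_support import SimpleSparkSerializer

# Spark-native save: best reloaded from another Spark job for batch scoring.
pipeline_model.write().overwrite().save("hdfs:///tmp/zoo-pipeline-spark")

# MLeap Bundle: a single portable artifact that an MLeap runtime (or the
# SageMaker SparkML Serving container) can load for low-latency online scoring.
pipeline_model.serializeToBundle("jar:file:/tmp/zoo-pipeline.zip", pipeline_model.transform(df))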

What Is Amazon SageMaker?

Amazon SageMaker is a tool designed to support the entire data science workflow. It provides the infrastructure to build, train, and deploy models. It also supports A/B testing, which lets you experiment with different versions of a model at the same time. Deployed models run on autoscaling clusters of SageMaker instances spread across multiple Availability Zones to deliver both high performance and high availability.
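
For example, A/B testing is driven by endpoint configurations with multiple production variants. The sketch below (using boto3, with hypothetical model names, endpoint names, and weights that are not part of this walkthrough) splits traffic 80/20 between two versions of a model:

# Rough sketch: split traffic 80/20 between two model versions on one endpoint.
# 'zoo-model-v1' and 'zoo-model-v2' are hypothetical, already-created SageMaker models.
import boto3

sm = boto3.client('sagemaker', region_name='us-east-1')
sm.create_endpoint_config(
    EndpointConfigName='zoo-ab-test-config',
    ProductionVariants=[
        {'VariantName': 'variant-a', 'ModelName': 'zoo-model-v1',
         'InstanceType': 'ml.c4.xlarge', 'InitialInstanceCount': 1, 'InitialVariantWeight': 0.8},
        {'VariantName': 'variant-b', 'ModelName': 'zoo-model-v2',
         'InstanceType': 'ml.c4.xlarge', 'InitialInstanceCount': 1, 'InitialVariantWeight': 0.2},
    ])
sm.create_endpoint(EndpointName='zoo-ab-test-ep', EndpointConfigName='zoo-ab-test-config')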

In this blog, we will serialize a model trained with Spark MLlib into the MLeap format and deploy it to SageMaker; similar steps apply to TensorFlow and scikit-learn models. The deployed endpoint then uses the MLeap runtime to serve real-time predictions.

Now, let’s take a step-by-step look at how to do this!

Prerequisite Steps

  1. Install boto3 (1.9.103) in your cluster using Environments. You can also use node-bootstrap to install the Python packages; however, Qubole’s Environments feature provides an easy way to manage Python and R dependencies and allows you to install packages at runtime dynamically without having to restart the cluster. Run the following snippet in the notebook to install MLeap and SageMaker.
%sh
# This installs MLeap and SageMaker. After the next release, these can be installed via Environments and this paragraph will not be needed.
hadoop dfs -copyToLocal s3://paid-qubole/scripts/packaged/install_mleap_and_sagemaker.bash
source install_mleap_and_sagemaker.bash && install_packages py s3_path
  2. Add the following MLeap jars in the interpreter settings. The jars below are for Spark version 2.3; MLeap works with all Spark versions > 2.0 except 2.4. You can find the MLeap jar versions for the corresponding Spark versions here.

ml.combust.mleap:mleap-runtime_2.11:0.13.0
ml.combust.mleap:mleap-spark_2.11:0.13.0
ml.combust.mleap:mleap-spark-extension_2.11:0.13.0

  3. For deploying to SageMaker, we need to upload the serialized model to Amazon S3. Make sure that you have roles configured with policies that grant access to Amazon ECR as well as the SageMaker APIs. You will also need a policy that allows access to the S3 bucket where your model will be saved, such as the one below (a quick sanity-check sketch follows the policy).
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*"
            ]
        }
    ]
}
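
Before moving on, it can be worth confirming that the role attached to the cluster really has these permissions. A quick, optional sanity check (the bucket name is a placeholder):

# Optional sanity check that the attached role can reach STS, SageMaker, and S3.
import boto3

print(boto3.client('sts').get_caller_identity()['Arn'])                        # which role is in use
boto3.client('sagemaker', region_name='us-east-1').list_models(MaxResults=1)   # SageMaker API access
boto3.client('s3').list_objects_v2(Bucket='<your-s3-bucket>', MaxKeys=1)       # S3 access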

Train the Model Using MLlib

We will build a model on the Zoo data set from the UC Irvine Machine Learning Repository that predicts an animal's class from its characteristics. The data set groups animals into seven classes and describes each animal with sixteen features: fifteen Boolean attributes plus a numeric leg count.

%sh
# download the dataset to a known local path
wget -O /tmp/zoo.data https://archive.ics.uci.edu/ml/machine-learning-databases/zoo/zoo.data
# copy it to HDFS
hadoop dfs -copyFromLocal file:///tmp/zoo.data hdfs:///tmp/zoo.data

# Import libraries
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# load the dataset into a Spark DataFrame; the file has no header, so name the
# columns explicitly (order follows the UCI Zoo data description)
columns = ['animal_name', 'hair', 'feathers', 'eggs', 'milk', 'airborne', 'aquatic', 'predator',
           'toothed', 'backbone', 'breathes', 'venomous', 'fins', 'legs', 'tail', 'domestic',
           'catsize', 'type']
df = spark.read.format('csv').options(header='false', inferschema='true').load('/tmp/zoo.data').toDF(*columns)
df = df.drop('animal_name')

input_fields = ['hair', 'feathers', 'eggs', 'milk', 'airborne', 'aquatic', 'predator', 'toothed',
                'backbone', 'breathes', 'venomous', 'fins', 'tail', 'domestic', 'catsize', 'legs']
# VectorAssembler combines a list of columns into a single vector column
assembler = VectorAssembler(inputCols=input_fields, outputCol="features")

# To read more about decision trees, refer to this link
dt = DecisionTreeClassifier(labelCol="type")
# create a pipeline with data preprocessing steps and model
modelPipeline = Pipeline(stages = [assembler, dt])
(trainingData, testData) = df.randomSplit([0.8, 0.2])

evaluator = MulticlassClassificationEvaluator(labelCol="type", predictionCol="prediction", metricName="accuracy")
# Find the best model parameters using ParamGridBuilder.
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 4, 6])
             .addGrid(dt.impurity, ['gini', 'entropy'])
             .addGrid(dt.maxBins, [10, 20, 40])
             .build())
             
cv = CrossValidator(estimator=modelPipeline, estimatorParamMaps=paramGrid, evaluator=evaluator)

# train on the training dataframe
pipelineModel = cv.fit(trainingData)
# best model
final_model = pipelineModel.bestModel
# transform the test dataframe
predictions = final_model.transform(testData)
accuracy = evaluator.evaluate(predictions)

print("Test Error = {}".format(1.0 - accuracy))

Serialize the Trained Model Using MLeap

Serialize the trained model using MLeap. MLeap also stores the schema of the input DataFrame so that it can validate LeapFrames at inference time (runtime).

import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer
import os
file_name = '/tmp/mleap-zoo.zip'
if os.path.isfile(file_name):
    os.remove(file_name)
final_model.serializeToBundle("jar:file:{}".format(file_name), final_model.transform(testData))
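
As an optional check, the bundle can be deserialized back into a Spark pipeline to confirm it round-trips; a quick sketch:

# Optional: load the bundle back into Spark and confirm it still scores the test set.
from pyspark.ml import PipelineModel

reloaded = PipelineModel.deserializeFromBundle("jar:file:{}".format(file_name))
reloaded.transform(testData).select("type", "prediction").show(5)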

Convert the model to the tar.gz format required by SageMaker and upload it to Amazon S3.

import zipfile
import tarfile
import boto3

with zipfile.ZipFile(file_name) as zf:
    zf.extractall("/tmp/mleap-zoo")
    
with tarfile.open("/tmp/mleap-zoo.tar.gz", "w:gz") as tar:
    tar.add("/tmp/mleap-zoo/bundle.json", arcname='bundle.json')
    tar.add("/tmp/mleap-zoo/root", arcname='root')

s3 = boto3.client('s3')

filename = 'mleap-zoo.tar.gz'
bucket_name = 

# Uploads the given file using a managed uploader, which will split up large
# files automatically and upload parts in parallel.
s3.upload_file("/tmp/" + filename, bucket_name, filename)

Deploy Your Model to SageMaker

Initialize a SageMaker session and use the SparkMLModel helper to create the SageMaker model, endpoint configuration, and endpoint. SparkMLModel points the SageMaker model at the prebuilt SparkML Serving container image in Amazon ECR; you supply the S3 location of the MLeap bundle and the model's input/output schema.
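
The container reads that schema from the SAGEMAKER_SPARKML_SCHEMA environment variable set below. As a rough sketch (not taken from the original post), the schema for this zoo pipeline could look like the following; the field names follow input_fields above, the variable name zoo_schema is our own, and the types are assumptions that should match your DataFrame:

# Hedged sketch of the value to plug into `schema` below; adjust types to your data.
zoo_schema = {
    "input": [{"name": field, "type": "int"} for field in
              ["hair", "feathers", "eggs", "milk", "airborne", "aquatic", "predator", "toothed",
               "backbone", "breathes", "venomous", "fins", "tail", "domestic", "catsize", "legs"]],
    "output": {"name": "prediction", "type": "double"}
}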

import sagemaker
import boto3
import json
from sagemaker.sparkml.model import SparkMLModel

boto_session = boto3.Session(region_name='us-east-1')
sess = sagemaker.Session(boto_session=boto_session)
sagemaker_session = sess.boto_session.client('sagemaker')
account = boto3.client('sts', region_name="us-east-1").get_caller_identity()['Account']

model_name = "simple-mleap-zoo"
endpoint_name = 'simple-mleap-zoo-ep'

# Define the input and output schema of your model
schema = {"input":[....], "output": {.....}}
schema_json = json.dumps(schema)

role = 'arn:aws:iam::{}:role/sagemaker-access'.format(account)
model_location = 
# SparkMLModel uses the image built by https://github.com/aws/sagemaker-sparkml-serving-container
sparkml_model = SparkMLModel(model_data=model_location, 
                             role=role, 
                             sagemaker_session=sess, 
                             name=model_name,
                             env={'SAGEMAKER_SPARKML_SCHEMA' : schema_json})                      
print("Created model: {}".format(model_name))
sparkml_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)

Happy Inferencing!

Once the endpoint is active, invoke it to start making predictions.

# The data should be as per the schema specified in the above step
test_data = {"data": []}
from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON

predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess, serializer=json_serializer, content_type=CONTENT_TYPE_JSON, accept=CONTENT_TYPE_CSV)
print(predictor.predict(test_data))
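
For this model, the request carries the sixteen feature values in the same order as the schema. A hedged example payload (the values are illustrative, roughly describing a mid-sized land mammal):

# Illustrative payload; values follow the schema order:
# hair, feathers, eggs, milk, airborne, aquatic, predator, toothed,
# backbone, breathes, venomous, fins, tail, domestic, catsize, legs
test_data = {"data": [1, 0, 0, 1, 0, 0, 1, 1, 1, 1, 0, 0, 1, 0, 1, 4]}
print(predictor.predict(test_data))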

Next Steps

You can try this in a Qubole notebook by importing it using this link.

Conclusion

Qubole provides excellent integrations for data science workflows. As shown above, an entire pipeline to build, train, and deploy models can be developed quickly, which lets data scientists get fast feedback on their models without relying on engineering teams.
