Amazon Managed Streaming for Kafka (MSK) with Apache Spark on Qubole

January 15, 2019 (updated April 4, 2024)

AWS recently announced Managed Streaming for Kafka (MSK) at AWS re:Invent 2018. Apache Kafka is one of the most popular open source streaming message queues. Kafka provides a high-throughput, low-latency technology for handling data streaming in real time. MSK allows developers to spin up Kafka as a managed service and offload operational overhead to AWS.

In this post, we will demonstrate how you can quickly build a stream data processing pipeline using Amazon MSK with Apache Spark Structured Streaming on Qubole. We will walk through the minimal setup steps and provide a working code example with detailed explanation.

Amazon MSK + Spark Streaming on Qubole

Prerequisite Steps:

First, you must execute a few steps to set up a Kafka cluster in AWS MSK:

  1. Create VPC for MSK cluster:
    https://docs.aws.amazon.com/msk/latest/developerguide/create-vpc.html
  2. Enable HA and fault tolerance for Kafka:
    https://docs.aws.amazon.com/msk/latest/developerguide/add-subnets.html
  3. Create Kafka cluster:
    https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html
  4. Create Kafka topic:
    https://docs.aws.amazon.com/msk/latest/developerguide/create-topic.html

See AWS documentation for MSK for detailed information.

After the above steps, we will have a Kafka cluster ready in Amazon VPC.

Important Information:

We need a few details from the above setup, which will be used to connect Spark on Qubole with MSK:

  • VPC name (e.g. quboleMskSampleVPC)
  • VPC ID (e.g. vpc-0e0e80………)
  • VPC security group ID (e.g. sg-0b08………..)
  • VPC security group name (e.g. default)
  • Kafka topic name (e.g. quboleMskSampleTopic)
  • Zookeeper info of the Kafka cluster (e.g. "ZookeeperConnectString": "10.0.0.254:2181,10.0.2.85:2181,10.0.1.110:2181").
    This can be fetched using the following command:
    aws kafka describe-cluster --cluster-arn <clusterArn>
    Refer to this AWS documentation for more detail.
  • Broker info of the Kafka cluster (e.g. "BootstrapBrokerString": "10.0.0.59:9092,10.0.1.86:9092,10.0.2.68:9092").
    This can be fetched using the following command:
    aws kafka get-bootstrap-brokers --cluster-arn <clusterArn>
    Refer to additional AWS documentation for more details.

Now that we have our environment configured and ready, use the following steps to build a Spark structured streaming application on top of MSK using Qubole Notebooks:

  1. Log in to your Qubole account.
  2. Go to the Clusters page. Create a new Spark cluster or edit an existing one, and under the "Advanced Configuration" tab -> "EC2 Settings", enter the MSK VPC (e.g. quboleMskSampleVPC) in the VPC field and choose any of that VPC's subnets in the Subnet field.
  3. Click the "Update" button to save the cluster settings.
  4. Get the security group name of the Spark cluster (e.g. sc-qa3_qbol_acc2345_……). Then go to "Security Groups" in the AWS console and find the security group of the Kafka VPC (search by its VPC ID).
  5. Edit its "Inbound Rules" and add a new entry for the Spark cluster's security group that allows all traffic.*

*This is required so that the Spark cluster can send data to and receive data from the Kafka cluster in the AWS VPC; it is similar to this step in the AWS guide.

  6. Start the cluster and wait until it is up and running.
  7. Create a new notebook in Qubole and associate it with the newly created cluster. Then add and run the following Scala paragraphs one by one in the notebook.

Define the MSK/Kafka parameters with values from the "Important Information" section above. We only need the Kafka broker and topic information to start processing.

1. AWS MSK params

val topics = "quboleMskSampleTopic"
val brokers = "10.0.1.86:9092,10.0.0.59:9092,10.0.2.68:9092"

Now, write Spark streaming code to process the data. Also, add a Kafka producer utility method to send sample data to Kafka in Amazon MSK and verify that it is being processed by the streaming query.

In our example, we have defined that incoming data from Kafka is in JSON format and contains three String type fields: time, stock, price.

We will extract these fields from the Kafka records and then aggregate over the stock field to find the number of records per stock.
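
Before wiring this up to Kafka, the parsing and aggregation logic can be sanity-checked on a static, in-memory Dataset. The snippet below is a minimal sketch (the sample records are made up for illustration, and it assumes the notebook's pre-created SparkSession spark is in scope) that applies the same schema and groupBy in batch mode:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType
import spark.implicits._

// Same schema as in the streaming code below
val schema = new StructType().add("time", "string").add("stock", "string").add("price", "string")

// Hypothetical sample records in the expected JSON format
val sample = Seq(
  """{"time":"1545342455623","stock":"fb","price":"400.0"}""",
  """{"time":"1545342455624","stock":"goog","price":"1100.0"}""",
  """{"time":"1545342455625","stock":"fb","price":"401.0"}"""
).toDS()

// Parse the JSON and count records per stock, exactly as the streaming query will
sample.select(from_json($"value", schema).as("data"))
  .select("data.*")
  .groupBy("stock")
  .count()
  .show()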

2. Code for Structured Streaming and Kafka Producer

import sys.process._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import java.util.HashMap

object QuboleKafka extends Serializable {
    
    val topicMap = topics.split(",").map((_, 4)).toMap
    val topicToSend = topics.split(",")(0)
    val group = "group"
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
      
    @transient var producer : KafkaProducer[String, String] = null
    var messageId : Long = 1
    @transient var query : StreamingQuery = null

    val spark = SparkSession.builder.appName("MSK streaming Example").getOrCreate()

    import spark.implicits._ 
 
    def start() = {
        //Start producer for kafka
        producer = new KafkaProducer[String, String](props)
        
        //Create a datastream from the Kafka topics.
        val df = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topics)
            .option("startingOffsets", "latest")
            .load()
        df.printSchema()  
        
        // define schema of data
        val schema: StructType = new StructType().add("time", "string").add("stock", "string").add("price", "string")
        
        // convert the datastream into a Dataset and apply the schema
        val ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
        val output = ds.select(from_json($"value", schema).as("data")).select("data.*").groupBy("stock").count()
        output.printSchema() 
        output.explain(true)
        
        // write the stream to the memory sink, keeping a handle to the query on the object
        query = output.writeStream.queryName("AggregationExample")
            .format("memory")
            .outputMode("complete")
            .option("checkpointLocation", "/tmp/test")
            .trigger(Trigger.ProcessingTime("1 seconds"))
            .start()     
    }
    
    // Send message to kafka
    def send(msg: String) = {
        val message = new ProducerRecord[String, String](topicToSend, null, s"$msg")
        messageId = messageId + 1
        producer.send(message)
    }
  }

Next, we will start the streaming query:

3. Start streaming

QuboleKafka.start

We can also send some sample data into Kafka brokers in the expected JSON schema format by running a Kafka producer in the notebook to verify the streaming query is processing Kafka data correctly:

4. Send data to Kafka

val input: String = "{\"time\":\"1545342455623\",\"stock\":\"fb\",\"price\":\"400.0\"}"
QuboleKafka.send(input)
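
To make the aggregation a little more interesting, we can also push a few more records before checking the output. The records below are made up for illustration:

// Send a few more made-up records so there is something to count per stock
val moreInput = Seq(
  "{\"time\":\"1545342455624\",\"stock\":\"goog\",\"price\":\"1100.0\"}",
  "{\"time\":\"1545342455625\",\"stock\":\"fb\",\"price\":\"401.0\"}",
  "{\"time\":\"1545342455626\",\"stock\":\"amzn\",\"price\":\"1500.0\"}"
)
moreInput.foreach(QuboleKafka.send)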

Verify the processed data:

5. Verify output of streaming query

spark.sql("select * from aggregationExample limit 10").show()

Lastly, stop the streaming query:

6. Stop streaming

QuboleKafka.query.stop()

We can restart the query by running the paragraphs again from Paragraph 2. Thanks to the checkpoint location, the aggregation will resume from the state where it left off.
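
For example, a restart could look like the following sketch (assuming Paragraph 2 has been re-run so that QuboleKafka is defined again):

// Start the query again; the checkpoint at /tmp/test lets the aggregation
// resume its state, so previously counted records should reappear in the table
QuboleKafka.start
spark.sql("select * from AggregationExample").show()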

The complete code for the above notebook example can be found here.

Conclusion

Qubole provides excellent support for running Spark Structured Streaming jobs on AWS. Learn more in our Spark Streaming on Qubole webinar. As shown above, it is easy to integrate Spark Structured Streaming on Qubole with AWS MSK to continuously process streaming data in near real time.

Start a free trial to get started.
