SparkSQL in the Cloud: Optimized Split Computation

August 30, 2016 (Updated April 24, 2024)

One of the fundamental differences between Big Data processing in the cloud and on-premise is how the data is stored and accessed. Not having a clear understanding of this underlying difference between, for example, AWS S3 in the cloud and HDFS on-prem leads to a suboptimal service, to say the least. At Qubole, we’ve made it part of our product development process to recognize these differences, and we have implemented significant performance enhancements in the leading Big Data processing engines supported by Qubole Data Service (QDS), such as Hadoop, Hive, and Presto.

In this post, we will look at one such optimization, which concerns AWS S3 listings. It makes split computation for SparkSQL queries significantly faster (more on this below) and is generally available in the Apache Spark as a Service offering through QDS.

Split Computation Optimization

Performance Boost

We’ve noticed a significant performance boost from this optimization, especially when a Hive table has a large number of partitions. For instance, we ran a simple SELECT query on a table with 5,462 partitions, each containing one file. Here are the results:

                  Without optimization (in secs)   With optimization (in secs)   Improvement
Query execution   782                              120                           6.5x
S3 listing        650                              8                             81x

As the numbers above show, the performance gain from the optimization is clear: the 81x speedup in S3 listing translates into an overall 6.5x faster query.

Enabling it in QDS

In Spark versions 1.5.1 and above, you can enable this optimization by adding the following to the Spark application’s command line:

--conf spark.sql.qubole.split.computation=true

Once enabled, QDS leverages the bulk S3 listing optimization to perform faster split computation on SparkSQL queries. This benefits partitioned Hive tables across all formats that depend on FileInputFormat. It does not affect unpartitioned tables or Parquet/ORC tables, because those have their own way of listing S3 files.
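For illustration, here is a minimal Scala sketch of enabling the flag and running a query against a partitioned Hive table. Setting the flag through SQLContext.setConf, rather than on the command line as documented above, is our assumption of an equivalent path, and the "orders" table is hypothetical:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Minimal sketch using Spark 1.x APIs. Setting the flag via setConf (instead
// of --conf on the command line, as documented above) is an assumption, and
// "orders" is a hypothetical partitioned Hive table.
val sc = new SparkContext(new SparkConf().setAppName("SplitComputationExample"))
val sqlContext = new HiveContext(sc)

// Enable Qubole's bulk S3 listing for split computation.
sqlContext.setConf("spark.sql.qubole.split.computation", "true")

// Queries over partitioned, FileInputFormat-based Hive tables now benefit
// from the faster split computation.
sqlContext.sql("SELECT COUNT(*) FROM orders WHERE ds = '2016-08-30'").show()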

How Does It Work?

Spark’s TableReader creates one HadoopRDD per input partition, and as each one is created, QDS adds its partition directory to a list. Later, the DAGScheduler calls getPartitions on each HadoopRDD as part of discovering dependencies. When getPartitions is called on the first HadoopRDD, QDS issues a single bulk S3 listing and populates a FileStatus cache; all subsequent getPartitions calls are served from this cache, thereby speeding up split computation.
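To make this concrete, the following is an illustrative sketch of that pattern. It is not Qubole’s actual implementation (which lives inside the QDS Spark distribution); the BulkListingCache object and its method names are hypothetical:

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import scala.collection.mutable

object BulkListingCache {
  // Partition directories registered as TableReader creates each HadoopRDD.
  private val pendingDirs = mutable.Buffer[Path]()
  // FileStatus cache populated by the first bulk listing.
  private val statusCache = mutable.Map[Path, Array[FileStatus]]()

  // Called once per partition while the HadoopRDDs are being created.
  def register(dir: Path): Unit = synchronized { pendingDirs += dir }

  // Called from getPartitions. The first call triggers the bulk listing;
  // every later call is served straight from the cache.
  def listStatus(fs: FileSystem, dir: Path): Array[FileStatus] = synchronized {
    if (statusCache.isEmpty && pendingDirs.nonEmpty) {
      // The real optimization issues a single bulk S3 listing over a common
      // prefix; this sketch approximates it with one pass over all directories.
      pendingDirs.foreach(d => statusCache(d) = fs.listStatus(d))
      pendingDirs.clear()
    }
    statusCache.getOrElseUpdate(dir, fs.listStatus(dir))
  }
}

The payoff is that thousands of per-partition listing round trips to S3 collapse into a single bulk listing, which is what drives the 81x improvement in listing time shown above.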

Stay Tuned

In the future, we will:

  • Post a more in-depth follow-up blog post describing the technical details of our implementation
  • Extend this optimization to Parquet and ORC tables
  • Improve performance of partition recovery in SparkSQL
  • Improve performance of INSERT-OVERWRITE queries