Qubole Announces Open Source Spark Tooling to Improve Stability & Performance


Piero: My name is Piero, and I’m a data science product manager at Qubole. I’m responsible for improving our product for data scientists. Today I’m going to talk about our Spark tuning tool, which is an open source tool that we developed internally at Qubole to tune our Spark jobs. I want to give you a brief agenda for today. I’m just going to give you a couple of slides on who Qubole is and what we do.

Then, why you actually need Spark tuning, what our strategy is for tuning Spark jobs, and a use case. Then I’ll give you a final overview and the next steps. What is Qubole? Qubole is a big data, cloud-native activation platform. What does that mean? We make your life easier by spinning up clusters and managing clusters for you, so you only have to worry about doing your data ETL jobs or machine learning jobs and you don’t have to worry about infrastructure. We sit on top of AWS, GCP, and Microsoft Azure, and we do cluster management and configuration of Spark clusters, Hive, Presto, and TensorFlow, and we also support GPU-accelerated instances.

This is a little bit of our architecture. We cater to data scientists, data analysts, data engineers, and cloud admins, and we support these engines, like I said before. We have features like autoscaling, caching, spot buying for AWS, alerts, insights and recommendations for how to optimize your queries and reduce your cost, and a serverless architecture for SQL, a project that we call Aria.

We also have quite an open architecture. We support Airflow, we support open source projects. We have a bunch of integrations with Jupyter notebooks, IPython notebooks and the like. We work the way you do. We can take a lot on Spark. Spark is a multi-purpose engine but, a lot of the time, Spark jobs are failing, and this is what I’m referring to as stability tuning.

The jobs are not actually completing, and the tool doesn’t cover that part. We’re in the process of devising a tool for memory configuration and GC/CPU-aware task scheduling, and that’s coming up in the middle of 2018. To configure your job, you can look at the Spark UI: you’re shown the tasks, the jobs, the executors, the memory, the JVM settings, all that stuff.

That can be a little bit overwhelming. There’s no aggregate information, and there’s no counterfactual information on what type of actions need to be taken. That information is not readily available. There’s also a different type of tuning that you can do. Even if a job completes, there could be other ways to complete the job that cost a tenth of what you’re paying right now.

There’s been no good way for you to do that thus far. That’s where our tool comes into play. It’s about this cost-performance trade-off: suggesting improvements to your configuration or code changes, and giving you an insight into where you should make changes to your code. This is the traditional approach to distributed app tuning. I’m getting a little bit of feedback; can we turn down the volume a little bit, maybe?

CPU and data compression, memory tuning, network communication. You can look at all these indicators, but our approach is a little bit different: we’re actually focusing mostly on task scheduling. By focusing only on that, you’d be surprised, you can get huge improvements without even doing any configuration on the other elements of your architecture.

This is why distributed app tuning is so hard because, there’s so many elements that come into play. Really, the task scheduling part can give you a whole lot of benefit just by tuning that part. Here’s a Spark application example. This is very abstract. I’m not showing you any details. This is just a timeline of what’s happening. The blue boxes are the tasks that are happening in the driver.

If you will, the x-axis is time and the y-axis is whether you’re spending time in the driver or the executors. The blue box is the driver. I should say, the dark blue boxes are the driver, the light blue boxes are the executors. You will see in this example that all the orange boxes are where your application is not doing anything. There’s a lot of do-nothing time where your time is wasted.

You’ve got this part here because you have this skew in your tasks. This task lasts a lot longer than this one. Here, your application is not doing anything. There’s this executor that’s sitting there. There are three executors here. This one is sitting here idle, not doing anything. You’re also spending a lot of time in your driver. Here, this time is wasted when you should be parallelizing this task and moving it away from your driver into your executors.

These are two-liners. This is an example output of our report. What is the minimum time of the app based on the critical path? The critical path is the minimum time that your application has to spend to finish the task. The critical path, in this case, would be here. Without changing the critical path, even if you increase your number of resources, in this case, you’re still spending almost 128 minutes running your app.

What is the minimum possible time for the app with the same executors, if you’re parallelizing the tasks as much as possible and you’re decreasing the skew? I’ll explain in a second what those terms mean, but you can improve your application to this extent, from 127 to 43 minutes. The first principle that we’re embracing is: move tasks from the driver to the executors to work in parallel. So, essentially, shrink the time that you’re spending in your driver.
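As a rough illustration of the two numbers in that report, here is a minimal sketch under a simplified model that is an assumption, not the tool's actual implementation: the app is a block of driver time plus a sequence of stages that run one after another, and each stage is a list of task durations. The names `Stage`, `critical_path_minutes` and `ideal_minutes` are hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Stage:
    """One stage, described only by the duration of each task (assumed non-empty)."""
    task_minutes: List[float]


def critical_path_minutes(driver_minutes: float, stages: List[Stage]) -> float:
    # With unlimited cores, each stage still takes as long as its longest task,
    # and driver time is not parallelized at all.
    return driver_minutes + sum(max(s.task_minutes) for s in stages)


def ideal_minutes(driver_minutes: float, stages: List[Stage], total_cores: int) -> float:
    # Same cores, but work spread perfectly evenly (no skew, full parallelism):
    # each stage takes its total task time divided by the number of cores.
    return driver_minutes + sum(sum(s.task_minutes) for s in stages) / total_cores


# Illustrative numbers only, not taken from the talk.
example = [Stage([10, 2, 2]), Stage([6, 6])]
print(critical_path_minutes(5, example))
print(ideal_minutes(5, example, total_cores=3))
```

In this model the critical-path figure can be larger than the ideal figure, which matches the report: adding cores does not shorten a long task, while removing skew does.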

Here’s a driver-executor report. You’re spending 26% of your time in the driver, in this example: 41 minutes out of 158 are being spent in the driver, versus 73% in the executors. If you can bring that to 1% or 2%, you’re golden. You’re doing a good job at parallelizing your application as much as possible. What are the actions that are taken in the driver? These are some examples of things that you can do.
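One way such a split could be derived is sketched below. It assumes, as a simplification, that any wall-clock time not covered by a running Spark job is driver time. The function names and the job intervals are hypothetical; the intervals were chosen so that the result reproduces the 41 of 158 minutes quoted above.

```python
from typing import List, Tuple


def merge_intervals(intervals: List[Tuple[float, float]]) -> List[List[float]]:
    """Merge overlapping (start, end) intervals so shared time is counted once."""
    merged: List[List[float]] = []
    for start, end in sorted(intervals):
        if merged and start <= merged[-1][1]:
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])
    return merged


def driver_executor_split(app_start: float, app_end: float,
                          job_intervals: List[Tuple[float, float]]):
    """Return (driver_minutes, executor_minutes, driver_percent)."""
    total = app_end - app_start
    executor = sum(end - start for start, end in merge_intervals(job_intervals))
    driver = total - executor  # assumption: time outside any job is driver time
    return driver, executor, 100.0 * driver / total


# Hypothetical job timeline for a 158-minute app.
driver, executor, pct = driver_executor_split(0, 158, [(10, 60), (50, 90), (101, 138)])
print(f"driver {driver} min, executors {executor} min, driver share {pct:.1f}%")
```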

This could imply code changes, it could imply parameter tweaking but, essentially, you should keep this in mind. There are things that are being listed in the driver. If you’re querying your S3 bucket or your Blob Storage in Azure, you’ll be listing files in your query, and those things are happening in the driver. You’re splitting computations across the executors.

That job is mostly done in the driver. You’re loading the Hive tables. When loading, there’s a temp directory that’s being created in Hive. That’s a metadata operation in HDFS but, in S3, it could potentially mean copying the data from your finals to your staging and back to your finals. That can be very time-consuming. You’re collecting or taking data from your executors to your driver.

Essentially, you’re saying, bring your data back to your driver and then, depending on which algorithm you choose, that can be more or less heavy. You can look at these things and try to minimize the amount of time spent in the driver. The other principle is: increase the number of tasks. It should always be much, much greater than the number of cores. In this case, in this part here, we only have two tasks but we have three cores.

This guy is not doing anything, so this is bad. In order to change it, you can tweak these parameters. If you’re in HDFS, you can tweak the block size. You can change the min/max split size, you can tweak the spark.default.parallelism parameter, or you can increase the number of partitions that your data is operating on. You can repartition your data so you’re not doing as much shuffling of the data around and you’re using less network communication.

The third principle, and this is a little bit harder to do but it can still be done, is to decrease the skew so as to make the tasks as uniform as possible. Essentially, this happens because the keys in the data split the data into non-uniform parts. When you’re doing data operations, you’re spending a lot of time in some parts of the data because those data chunks are much larger than others. We can provide more detail, but there’s a lot of tweaking that can be done there.
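To make the skew point concrete, here is a small pure-Python sketch, not Spark code. It hash-partitions a list of keys where one "hot" key dominates, measures skew as the largest partition divided by the average partition, and then applies key salting. Salting is a common general technique for spreading a hot key; the talk does not name it, so treat it as an illustrative assumption. All function names and data are hypothetical.

```python
import zlib
from collections import Counter
from typing import List


def partition_of(key: str, num_partitions: int) -> int:
    # Deterministic stand-in for a hash partitioner.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_sizes(keys: List[str], num_partitions: int) -> List[int]:
    counts = Counter(partition_of(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


def skew_ratio(sizes: List[int]) -> float:
    # 1.0 means perfectly uniform partitions; larger means more skew.
    return max(sizes) / (sum(sizes) / len(sizes))


def salted(keys: List[str], num_salts: int) -> List[str]:
    # Append a rotating suffix so one hot key becomes num_salts distinct keys.
    return [f"{k}#{i % num_salts}" for i, k in enumerate(keys)]


# 90% of the records share a single key.
keys = ["hot"] * 9000 + [f"user{i}" for i in range(1000)]
before = skew_ratio(partition_sizes(keys, 8))
after = skew_ratio(partition_sizes(salted(keys, 8), 8))
print(f"skew before salting: {before:.2f}, after salting: {after:.2f}")
```

The trade-off is that any aggregation on a salted key has to be done in two steps, first per salted key and then per original key.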

Here is an example of the wasted time report. In this case, we’re wasting 98% of our time. If you calculate all the compute hours in all the executors, we’re wasting 98% of our resources. This is massive. It means you’re completing your job but it’s extremely inefficient, and you don’t want to do that. We want to try to optimize the use of resources. What does an ideal Spark application look like? Very little time is spent in the driver, there’s very little skew, and there’s lots of parallelism. So there are very few do-nothing areas.
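A minimal sketch of how a wasted-compute percentage like this could be computed, assuming that "available" compute is simply cores multiplied by wall-clock time and "used" compute is the sum of task durations. The function name and the numbers are hypothetical.

```python
from typing import List


def wasted_percent(task_minutes: List[float], total_cores: int,
                   wall_clock_minutes: float) -> float:
    """Share of available core-minutes that no task was using."""
    available = total_cores * wall_clock_minutes
    used = sum(task_minutes)
    return 100.0 * (available - used) / available


# Three cores held for 10 minutes, but only 14 core-minutes of actual task work.
print(f"{wasted_percent([10, 2, 2], total_cores=3, wall_clock_minutes=10):.1f}% wasted")
```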

Here’s a very simple, well, seemingly simple example use case. This is a large travel website, one of our clients in the US. They gave us a query, and it was taking 158 minutes with a cluster of 50 nodes. A huge job, and it was a puzzle why it was taking so long. With the help of our Spark tuning tool, we diagnosed where the problems were and we came to a conclusion. This took a few weeks of work; it wasn’t that simple, because it was a job that had almost 800 stages, so it was an extremely complex job.

But, first conclusion: there was not enough parallelism, and that was due to a very simple inefficiency here. On the table here, you’re doing a foreach and, within each partition, you’re writing back to your disk. So, it’s essentially wasting a lot of time and creating unnecessary tasks with that line of code. Just by changing this into this, we saved a ton of time. That was the first change.

The second change was to increase the number of default partitions that Spark operates on. The default number of partitions is usually 200 but, when your data is extremely large, that may not be enough, because you want to create more tasks than there are cores. So, we increased the number of partitions, the shuffle partitions parameter, to 800, and that improved the job. With exactly the same resources, the first change brought the run time down from 158 to 42 minutes and the second change from 42 to 10 minutes. Just with these two changes. You can see that sometimes, in the rush of doing things, you’re just getting your stuff done, but there are huge inefficiencies, and you can complete the task with just a fraction of the resources that are being used right now.

Next, I want to just give you a quick demo. This is what Qubole looks like. First, I’ll go to the Analyze tab, which is formatted a little bit nicer. Essentially, what we’re doing here: you’re submitting a Spark job and you’re adding this jar, which is open source, and we’re profiling the code by wrapping it in these curly braces. This is just a very silly example, but it gives you an idea of what the code is doing. The output looks something like this. This is the report and, by stage ID, it gives you how much time was spent in this particular stage, what the IO percentage was, how much shuffling there was, and how many compute resources were wasted.

Also, there’s the PR ratio, which is the number of tasks in the stage divided by the number of cores. Essentially, it tells you whether you’re doing enough parallelism in that particular stage. Then there’s a bunch of other very technical indicators here on garbage collection and the like; they aggregate and summarize what’s available in the Spark UI. But this is, I would say, the less interesting part. The more interesting part is that we also provide a counterfactual here: if you were to increase the number of executors from one to 10, we estimate the time for you.
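The PR ratio as defined here is simple to compute per stage. The sketch below uses hypothetical stage data and a hypothetical core count, and flags any stage with fewer tasks than cores, since some cores are then guaranteed to sit idle during that stage.

```python
from typing import Dict


def pr_ratio(num_tasks: int, total_cores: int) -> float:
    """Number of tasks in a stage divided by the number of cores."""
    return num_tasks / total_cores


def under_parallelized(tasks_per_stage: Dict[int, int], total_cores: int) -> Dict[int, float]:
    """Return {stage_id: ratio} for stages with fewer tasks than cores."""
    ratios = {sid: pr_ratio(n, total_cores) for sid, n in tasks_per_stage.items()}
    return {sid: r for sid, r in ratios.items() if r < 1.0}


# Hypothetical: stage id -> number of tasks, on a cluster with 400 cores.
stages = {0: 2, 1: 200, 2: 800}
for stage_id, ratio in under_parallelized(stages, total_cores=400).items():
    print(f"stage {stage_id}: PR ratio {ratio:.3f}, not enough tasks for the cores")
```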

Here, you can see that, by increasing the number of executors, you’re not gaining that much, because of that critical path. If you’re not creating enough parallel tasks, if you’re not changing those parameters, if your data is very skewed, then no matter how many cores you add to the job, it may still not improve your performance that much.
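A counterfactual like this can be approximated with a small scheduling simulation. The sketch below is an assumed model, not the tool's estimator: stages run one after another, driver time is fixed, and within a stage each task goes to whichever core frees up first. Function names and the example stage are hypothetical.

```python
import heapq
from typing import List


def stage_makespan(task_minutes: List[float], cores: int) -> float:
    """Greedy schedule: each task, in order, runs on the core that is free first."""
    free_at = [0.0] * cores
    heapq.heapify(free_at)
    for duration in task_minutes:
        start = heapq.heappop(free_at)
        heapq.heappush(free_at, start + duration)
    return max(free_at)


def estimate_app_minutes(driver_minutes: float, stages: List[List[float]],
                         executors: int, cores_per_executor: int = 1) -> float:
    cores = executors * cores_per_executor
    return driver_minutes + sum(stage_makespan(s, cores) for s in stages)


# One skewed stage: a single 30-minute task and seven 1-minute tasks.
skewed = [[30, 1, 1, 1, 1, 1, 1, 1]]
for n in (1, 2, 10):
    print(n, "executors ->", estimate_app_minutes(5, skewed, executors=n), "minutes")
```

With this input the estimate stops improving after the second executor, because the single long task sets the floor for the stage.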

Here, you have that simple little output I showed you, how much time is spent in the executors versus the driver. It gives you an idea of whether you want to make those changes around the driver. Then, here’s the report on time wasted on this task. This is less interesting in the example that I showed you, but it’s just to give you an idea of what the output looks like. This is our Analyze tab, where it keeps the history of commands. We also have a notebook interface, where you can look at the example the same way. Here, I configured the notebook to have that jar available and I did exactly the same thing, and it shows the report. It can also be used from the notebook.

We have a blog post about this. We also make the source code available for you, and we have a booth right in front here if you’d like to hear more details about this or talk about any of the things I’ve spoken about today. Is there time for some Q&A? I’m sure– how does that work? Three minutes? Are there any questions? All right. Thanks guys, thanks for listening.