Cloud Control: Efficient Hadoop ETL Processing


Jorge Rodriguez: My name is Jorge Rodriguez, I’m with BloomReach. We make pretty heavy use of spot instances to run our various pipelines; in this case, we’re going to be talking mostly about our ETL processing. At BloomReach, we have achieved high levels of throughput with low latency by using spot instances and by optimizing bidding strategies. I’ll talk a bit about all those things.

I’ll start with telling you guys a little about our company, then I’ll talk about the goals of our infrastructure and what we’re aiming to do, and then some of the challenges that we face because we use a lot of spot instances. Then I’ll go into our infrastructure, both the Qubole side, where we use Qubole as a big data service, and some in-house technology that we built, which is what we call the Bloomstore Compute Cloud; it’s built on the Elastic Compute Cloud. Then I’ll take some questions from you guys.

At BloomReach, we’ve built a personalized discovery platform. It features applications that analyze big data and make our customers’ digital content more discoverable, relevant, and profitable. I’m a tech lead on our data platform team and basically, I’m in charge of running our multi-regional Cassandra database, scaling the back-end for our back-end pipelines, and our real-time data processing infrastructure.

A little bit more about BloomReach: we basically have three lines of products. The first line is about customer acquisition, the second one is about customer experience, and the last one is about content marketing and merchandising. Our customer acquisition product is essentially enhanced discoverability, improved organic search, and basically content management and content optimization on our customers’ websites. Our Snap product is a search navigation and personalization platform, so it’s search as a service on the cloud, and our Compass product is a marketing tool, so essentially it’s site analytics, analyzing user behavior, making recommendations on how you should improve your website. We run all of our infrastructure on AWS.

Some of the goals of our ETL infrastructure, in particular: we really need to process our customers’ data fast. We receive a lot of product feeds from our customers, and we need to reflect the latest information very quickly and basically across all of our products, especially the site navigation personalization product. We really need to achieve in-order feed processing. We receive delta feeds from our customers, sometimes every hour, sometimes every half hour, sometimes every 6 hours, basically however often they want to feed us their data. Then we produce the data, and we replicate it to a production serving infrastructure. The most important thing for us is linear scalability. We have right now about 125 customers and we’re hopefully trying to grow that very fast, so every time we bring in new customers, we really want to onboard them onto our system really quickly. We don’t really want to be expanding our main infrastructure, on-demand infrastructure, every time we’re trying to bring in one or two maybe larger customers. We find that with spot we can get the maximum processing capacity for a budget. In general, we found that spot instances give us about 10 times the resources for the same price.

So, what is the challenge of spot instances? Basically, and Mike touched on that quite a bit, they can be reclaimed by AWS at any time. When we first started, we didn’t even have the two minute warning. You’re running something, and all of a sudden you’re not running it anymore. Fluctuations in spot prices can affect cost quite a bit. Also, you want to be careful with hard-coding instance types and putting in high bid prices to make sure your instance is not reclaimed, because you actually wind up paying more. If your bid price exceeds the on-demand price, I think Amazon would be glad to charge you more than the on-demand price [crosstalk].

Participant 1: Yes, spot price can absolutely be more expensive than on-demand. That’s probably when you should choose on-demand.

Jorge: You kind of shoot yourself in the foot if you’re bidding more than the on-demand price. If you’re going to bid that much, launch on-demand.

Participant 2: Do you pay your bid price?

Jorge: No, you pay whatever the market price is.

Participant 1: You pay the market price for the spot at that time, and in turn, the instance would be terminated if the market price goes above your max price, so I think what you’re suggesting is don’t set the maximum too high.

Jorge: Exactly, yes, don’t go overboard on your max because-

Participant 2: ….even if you would set the price of a spot instance?

Jorge: It can still be reclaimed by AWS at any time, so now you have the unreliability of spot, and you’re paying the price of on-demand, it doesn’t necessarily make as much sense. It makes sense when you’re saving money or when you’re getting much higher throughput for the same amount of money, which is really what we’re trying to achieve.
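A minimal sketch of the pricing rule discussed in this exchange, under simplifying assumptions: the market price is sampled once per hour, each hour is billed at that hour's market price, and the instance is reclaimed as soon as the market price rises above the maximum bid. The function name and all prices are hypothetical and only illustrate why a very high maximum buys uptime at the risk of paying more than on-demand for some hours.

```python
from typing import List, Tuple


def simulate_spot(hourly_market_prices: List[float], max_bid: float) -> Tuple[int, float, bool]:
    """Return (hours_run, total_cost, reclaimed) for one spot instance.

    Simplified model: each hour is billed at that hour's market price,
    and the instance is reclaimed once the market price exceeds max_bid.
    """
    hours_run = 0
    total_cost = 0.0
    for price in hourly_market_prices:
        if price > max_bid:
            return hours_run, total_cost, True
        total_cost += price  # you pay the market price, not your bid
        hours_run += 1
    return hours_run, total_cost, False


# Hypothetical prices: the market spikes in the third hour.
prices = [0.10, 0.12, 0.50, 0.11]
on_demand = 0.30

capped = simulate_spot(prices, max_bid=on_demand)  # reclaimed at the spike
uncapped = simulate_spot(prices, max_bid=1.00)     # survives, but pays 0.50 for that hour
print(capped, uncapped)
```

With the maximum capped at the on-demand price, the instance is lost at the spike; with a maximum far above on-demand, it keeps running but the spike hour costs more than an on-demand hour would have.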

All right, I’ll go through our ETL infrastructure at a high level, and we actually use very similar infrastructure across many of our teams. A lot of back-end pipelines use this kind of infrastructure. For the ETL, we have FTP servers where customers will upload their files, and we have a polling client on those servers that will then launch our ETL jobs. The ETL jobs are running on Qubole, which is Qubole’s Hadoop as a service. They depend on something that we built in-house which, as I mentioned, is called the Bloomstore Compute Cloud. The Bloomstore Compute Cloud basically is an API, and you tell it, “I need this data, and I need these Solr collections, please launch me a cluster”. What we’ll do is launch you a cluster on the fly that will either have Solr, Cassandra or both. For real-time streaming applications, we’ve actually expanded to installing Spark and installing Kafka and some other technologies as well. For the most part, most of our jobs right now depend on Cassandra and Solr. What happens is, the job will run, it will create whatever data needs to be created on these spot clusters, EC2 clusters, which are running on spot, and then it will promote that data to the production infrastructure.

Our production infrastructure is Cassandra as a data store, and Solr for searching. We run all of our infrastructure on AWS, and we run across different regions, but all of our back-end pipelines are running in the east region right now. Front-end, we’re in the west, in the east, we’re in the EU, and we just recently expanded to APAC.

The Hadoop part of this, here we’re showing the ETL, runs on the Qubole data service which is on EC2. The biggest feature that we benefit from on Qubole is the auto-scaling. It’s really awesome how it works, it’s really saved us a lot of time and effort. What you do is, you set up a Hadoop cluster and you tell it how much spot utilization you want and how much on-demand utilization you want. For all of our production jobs, we’re right now running at about 90% spot and 10% on-demand, so basically, our master nodes run on-demand and some worker nodes as well. For staging and dev pipelines, we’re running 100% on spot. Another benefit of Qubole is whenever you get a job, it gets kicked off right away and it starts to run, and the cluster will expand or contract depending on what workload you’re giving it.

There’s a lot of user interface that we get from Qubole. We get a lot of job success/failure tracking, both as visual interfaces and as APIs which we can use to automate our tracking. It’s got simplified log access and even when your instances are taken away, for the most part, you get your logs persisted, which is really useful for when you’re trying to debug. Did you have a question here? Okay. This has really reduced our operational overhead. One other nice feature that we like on Qubole is the fact that it provides us with the Ganglia metrics. We have a manager on our team who’s all about utilization, so he’ll come by, and he’ll check out your metrics and if you’ve got some free memory, if you’ve got some free CPU, you better get on that. You better be using fewer instances, you better be maximizing utilization on those instances. He one time told me that, “If you’re stealing some resources from a neighbor VM, you’re doing a great job, you deserve a raise”.


Jorge: Some of the numbers for our ETL pipelines: we’re running about 1,000+ jobs per day, and we’re processing over 40 million products into our infrastructure. The numbers are much larger when you look at SKUs versus products, like different iterations of a product, but it’s a little bit harder to get those numbers. The job runtime, in general, is between 10 and 20 minutes for each of these jobs; there are a couple of outliers if customers have really large data or a really large number of products. The spot instance failure rate that we’re experiencing is about two over a 24-hour period, so for our use case, that’s actually just about 4%. The nice thing about Qubole is that even when spot instances are reclaimed, your job doesn’t necessarily have to fail because it will just spawn a new spot instance and your job will continue running.

All right, on to Bloomstore Compute Cloud. This is elastic infrastructure that we run on AWS. As I mentioned, we will provision a Cassandra Solr cluster on-the-fly and it will replicate whatever data you requested to that cluster, it supports both spot and on-demand instances. Mostly we use spot, I would say like 99% of the time we’ll use spot. There’s some very rare cases where, if a job will fail two times, then the third time we’re going to go to on-demand because we have to make sure that it succeeds quickly, we don’t want to let too much time run by so we’ll take the hit. It’s a create-use-terminate model so you’ll create the cluster, you will run your pipelines on it, when you’re done, you’ll publish your data to whichever place you want, for the most part, it’s going to be your production infrastructure, and then when that’s done, you’ll terminate your instances.
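The fallback rule described above (try spot, and after two failures go to on-demand for the third attempt) can be sketched as follows. This is an illustrative sketch only; `run_with_fallback` and the `run_job` callback are hypothetical names, not part of the Bloomstore Compute Cloud API.

```python
from typing import Callable


def run_with_fallback(run_job: Callable[[str], bool], spot_attempts: int = 2) -> str:
    """Run a job on spot first; fall back to on-demand after repeated failures.

    run_job(market) is a hypothetical callback that creates a cluster on the
    given market ("spot" or "on-demand"), runs the pipeline, terminates the
    cluster, and returns True on success.
    Returns the market on which the job finally succeeded.
    """
    for _ in range(spot_attempts):
        if run_job("spot"):
            return "spot"
    # Two spot failures: take the cost hit so the job succeeds quickly.
    if run_job("on-demand"):
        return "on-demand"
    raise RuntimeError("job failed on spot and on on-demand")


# Usage with a stand-in job that only succeeds on on-demand.
attempts = []


def flaky_job(market: str) -> bool:
    attempts.append(market)
    return market == "on-demand"


print(run_with_fallback(flaky_job), attempts)
```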

These are all things that we built in-house, so what is the technology behind it? Mainly it’s cluster management: dynamic cluster provisioning and resource allocation. It allocates instances according to requirements and costs. Mike was talking about building a model based on spot price history, and what the current market is. I’ll go into a little more detail on our optimal bidder in a second. Another thing that we do is we pool instances. This has helped with speed: the jobs will obviously run faster if you don’t have to wait the five to seven minutes of provisioning time. You also get reliability. We found that AWS has been very reliable for us, but I would say the least reliable piece for us has been the APIs, so the provisioning APIs. We’ve had a couple of issues over the last six months. We’ve had two, which isn’t necessarily that much, but if you’re really relying on that API, then that can actually hurt you. Neither of those was necessarily too long, but for us even an hour is kind of long. What we do is, when a job is finished and it’s done using an instance, we’ll reclaim it and we’ll put it back into a pool. Depending on the demand, we might terminate it once the hour is up. As Mike mentioned, with AWS, you pay for an hour. If an instance has run for an hour and five minutes, then it doesn’t necessarily make sense to terminate it because if they reclaim it, yes, you won’t have to pay for those five minutes, but if you’re the one terminating the instance, now you’re paying for two hours. You just lost 55 minutes of processing time on that instance. What we’ll do is we’ll put that instance back into a pool, and when the next job comes in, we’ll reuse that instance for whatever we need to use it for.
That saves us money and gives us additional reliability, because even when the AWS APIs go down, if they do, you can’t provision instances, but the instances that you already provisioned, they’re fine. In fact, the spot prices are going down because nobody can provision. If you’re keeping your instances around, then you’re making use of them and you’re saving money. In fact, take it down whenever you want guys. We’re good with that.
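The pooling arithmetic above can be sketched in a few lines, assuming the per-hour billing model described in the talk. The function names and the `grace_minutes` threshold are hypothetical; the point is only that an idle instance still has paid-for minutes left in its current billing hour, so it is worth returning to the pool rather than terminating.

```python
BILLING_MINUTES = 60  # billing granularity assumed from the talk: whole hours


def paid_minutes_left(uptime_minutes: int) -> int:
    """Minutes remaining in the billing hour the instance has already started."""
    used = uptime_minutes % BILLING_MINUTES
    return (BILLING_MINUTES - used) % BILLING_MINUTES


def should_terminate(uptime_minutes: int, pending_jobs: int, grace_minutes: int = 5) -> bool:
    """Terminate an idle pooled instance only when there is no demand
    and the hour that has already been paid for is nearly over."""
    if pending_jobs > 0:
        return False
    return paid_minutes_left(uptime_minutes) <= grace_minutes


# The example from the talk: a job finishes at one hour and five minutes.
print(paid_minutes_left(65))    # minutes already paid for and still usable
print(should_terminate(65, 0))  # keep it in the pool
print(should_terminate(118, 0)) # hour almost up and no demand
```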


Participant 3: I don’t know if I like that.


Jorge: The other two pieces of technology, essentially, are data management technology. We have a Solr HAFT API, which essentially manages Solr data: it can replicate from one Solr cluster to another, it can do verifications on data sanity, it can do data integrity checks. We actually use this for our production infrastructure as well. It’s an open source project. If you guys check out our blog, there’s a really cool post on the Solr HAFT technology; it may even be useful for some people. Then the other one is a Cassandra replication service. Solr does have a replication API which you can use, which was really handy; we had to build around it a little bit. On the Cassandra side, there really was no replication technology we could use, so we went ahead and we built our own, and it’s been working fairly well for us.

The way that our optimal bidder works, it’s just a simple API. You tell it what requirements you want. You tell it your disk requirements. Do you need an SSD or are you okay with a spinning disk? How much storage do you need for each individual node? How much storage do you need for the whole cluster together? Same with memory. How much memory do you need per node? How much memory minimum do you need for the entire cluster? Again with the cores, how much processing power do you need per node and per cluster? You can specify the optimization function, whether you want to optimize on the CPU per dollar cost, or on the memory per dollar cost. Internally, what the optimal bidder is doing is basically exactly what Mike said you should do if you want to use spot instances on a large scale. It’s just building a model: it’s looking at the history of the spot prices and it’s trying to make intelligent decisions about how much am I willing to pay for this instance. Then obviously, the output of the bidder is which instance type you should get, how many instances of it you should get, which availability zone, because the prices will vary depending on the availability zone, what the current spot price is, what it’s going for right now, and what you should bid. It always takes into account the on-demand cost: don’t bid more than the on-demand cost.
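A rough sketch of a bidder with the inputs and outputs described above. This is not BloomReach's actual model: the `Offer` and `Bid` types, the `headroom` factor, the instance names, and all prices are assumptions made for illustration, and storage requirements are left out for brevity. The "model" here is simply the recent price peak plus some headroom, capped at the on-demand price.

```python
import math
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Offer:
    instance_type: str
    zone: str
    cores: int
    memory_gb: float
    has_ssd: bool
    on_demand_price: float
    spot_history: List[float]  # recent hourly spot prices, newest last


@dataclass
class Bid:
    instance_type: str
    zone: str
    count: int
    current_price: float
    bid_price: float


def recommend(offers: List[Offer], node_cores: int, cluster_cores: int,
              node_mem_gb: float, cluster_mem_gb: float, need_ssd: bool,
              optimize: str = "cpu", headroom: float = 1.2) -> Optional[Bid]:
    """Pick the offer with the best CPU (or memory) per dollar at the
    current spot price, and suggest a bid that never exceeds on-demand."""
    best: Optional[Bid] = None
    best_score = 0.0
    for o in offers:
        if o.cores < node_cores or o.memory_gb < node_mem_gb:
            continue  # node too small
        if need_ssd and not o.has_ssd:
            continue
        current = o.spot_history[-1]
        if current >= o.on_demand_price:
            continue  # spot is no cheaper than on-demand right now
        # Enough nodes to satisfy both cluster-wide requirements.
        count = max(1, math.ceil(cluster_cores / o.cores),
                    math.ceil(cluster_mem_gb / o.memory_gb))
        resource = o.cores if optimize == "cpu" else o.memory_gb
        score = resource / current
        if score > best_score:
            bid = min(o.on_demand_price, max(o.spot_history) * headroom)
            best = Bid(o.instance_type, o.zone, count, current, bid)
            best_score = score
    return best


# Hypothetical offers; names and prices are made up.
offers = [
    Offer("small-a", "zone-1", 4, 16.0, True, 0.40, [0.05, 0.06, 0.05]),
    Offer("big-b", "zone-2", 8, 32.0, True, 0.80, [0.20, 0.90, 0.12]),
    Offer("small-a", "zone-2", 4, 16.0, False, 0.40, [0.04]),
]
print(recommend(offers, node_cores=4, cluster_cores=16,
                node_mem_gb=16, cluster_mem_gb=100, need_ssd=True))
```

In the second offer, the recent peak times the headroom would exceed the on-demand price, so the suggested bid is clamped to on-demand, which is the rule stated at the end of the paragraph above.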

The Bloomstore Compute Cloud also came with a lot of additional benefits for us. One of the biggest ones is, if you think of your Cassandra cluster or your Solr index, it’s a very fixed resource. Especially on the Cassandra side, it’s really hard to scale up and scale down a Cassandra cluster when you have a lot of data in it. For us, in our use case, if we want to add a node to our Cassandra cluster, it takes us six hours. If we launch it, and it starts to join, it’ll take six hours before that node is running. If you’re working on a peak workload, and a back-end load, you really don’t have six hours. By the time the six hours have passed, you already missed your SLA, and you’ve got upset customers, and it’s never a good thing.

Isolation is really a good benefit: pipelines get their own cluster, they cannot interrupt each other, and we don’t have one team arguing with another about how their scans are making their lookups run a lot slower. It’s multi-tenant, multi-application support, so we have various teams. We have, I think, seven teams now at BloomReach. All seven teams are actually using our Bloomstore Compute Cloud infrastructure. We use it not just for running these back-end jobs, we actually use it for dev purposes too. If a developer wants to do some experimentation on Solr or Cassandra, they’ll go into our API and they’ll launch a cluster and they will replicate the data that we already have, whatever data they want, and they can start to experiment, run their analytics, run their pipelines. It’s really good for a developer use case. Sometimes, they’re a little bit annoyed if the instance does get taken away, but we’ve done a good enough job with our bidding that that rarely happens.

It provides dynamic scaling: each pipeline can define its replication loading requirements. For example, on a Cassandra pipeline, if you’re doing a lot of scans, you’re going to need a lot of CPU basically, that’s what we’ve found, especially if you’re scanning a large amount of data. The disk I/O, on the SSD, is really fast but the CPU is actually really important as well. It’s a production safeguard because there is no direct access to our production back-end; it safeguards from bad clients and access patterns. When we first got started about three years ago, we used to have a lot of issues with developers kind of getting a little crazy, maybe they were putting too many nodes into their EMR job. Next thing we know, we’re getting paged left and right because they’re hitting our back-end clusters way too hard and things are crashing. This actually really helped us to safeguard the back-end from clients. Right now, we really don’t have any direct access to our production infrastructure from the back-end, only from front-end applications. Again, we’re saving on costs because we’re provisioning spot instances, we’re trying to be smart about how we bid for the spot instances, and this saves us a lot of money, gives us a lot more compute power for the same cost and it keeps our margins nice. When you’re a startup, that’s always a nice focus to have.

Some stats on our Bloomstore Compute Cloud infrastructure: right now, we’re launching up to about 250 Solr Cassandra clusters per hour. This is across multiple teams, not just the ETL infrastructure. On most days, between zero and five spot instances will get reclaimed for the whole day. For this presentation, I took a look at the last month to see what our worst day was. On our worst day, we lost 85 spot instances for the entire day. It’s about three and a half per hour. If you consider that we’re launching 250 clusters per hour, that’s a fairly low rate of instances getting taken away from us. We’re getting the benefit of the lower costs and we’re not really paying a price in losing out on instances. The nice thing about spot instances is, I think, that even if there’s a lot more competition, the spot fleet is something that’s going to bring a lot more users to the market. I actually think that’s probably going to be even better because AWS always has to stay ahead of the demand. Even if there is more spot usage, I really feel AWS is going to keep up and keep launching infrastructure so that we can continue to use it, at least, I hope, but if they don’t, we can always fall back to on-demand. It’ll hurt our pockets, but our stuff is still going to run just the same. We’re processing about 40 million documents per day for the ETL pipeline and across all of our teams, about 160 million documents per day.

In conclusion, you can find ways to work around the unreliability of the spot instances. It’s going to take some experimentation, it’s going to take some work, and it’s going to take some solving of the problems. Building your price model is going to take you some time but, in the end, it’s really going to be worth it because you’re going to save a lot of money. You’re going to get a lot more done for a lot less money, and that’s something that’s going to help your company in the long term. You build this tooling, you build it one time, you may make some iterative improvements here and there, but in the long term, you’re going to save a lot of money. One second. We have some good blog posts on our engineering blog about this subject and about our production infrastructure, which involves Cassandra and Solr. If you’re interested, I’d encourage you to take a look at that.

We’re hiring. Obligatory recruitment, if I didn’t put that in there, I’d get in trouble with the recruiting team. Feel free to reach out to me, my email is [email protected] .