Karthik Panel – Data Platforms 2017
Karthik Ramasamy: I’ll be talking about some of the Twitter infrastructure that we built, with a focus on real-time. I used to manage the real-time compute team. In March I left Twitter and, inspired by Ashish and Joy, I decided to start another company based on some of the technologies that we developed. I am a co-founder of a company called Streamlio. We are currently in stealth mode, but we will come out in a few months.
Why real-time? In the information age, data is coming at a high velocity, and there are a lot of data sources that are continuously generating data in real time, ranging from Twitter to IoT devices to games, where data is generated dynamically as you play.
There are a bunch of sources, and more and more we are going into a real-time connected world, ranging from autonomous cars and connected vehicles to the Internet of Things, digital assistants, health care and machine-generated data. There is a huge amount of data, as Ashish Thusoo mentioned, with over 44 exabytes of data expected by 2020. These are going to be among the bigger contributors to those numbers.
Why real-time? At Twitter we do real-time because the data that comes to us is in the form of streams. What we do with that stream of data is identify real-time trends: what is going on in the world right now. Then, during sports and other real-time events, millions and millions of conversations happen on the Twitter stream itself. In order to bubble up those conversations and exploit them for monetization, we do a lot of real-time processing.
Of course, our monetization strategy used to be, and still is, based on ads. We have to inject relevant ads at the relevant time into the relevant conversation. Again, this has to be done in real time. Think about ads going out during the Super Bowl, because that is a very active time when a lot of people are active on Twitter, and we need to inject the ad that has a higher likelihood of being clicked.
Then finally we have real-time search, where if a tweet lands in the Twitter infrastructure it has to be indexed within a couple of hundred milliseconds and it has to be searchable right away. All of these are a big challenge.
If we look at real-time, the value of data decreases over time. It has the highest value right when the data is produced, where you can make preventive and predictive decisions. As the data ages, it becomes actionable and slowly moves into more of the traditional batch intelligence. Our real-time focus is mainly on milliseconds as well as seconds.
What is real-time? If you go and ask a Wall Street broker or a Wall Street trader, they will say, “Give me a millisecond, or probably a nanosecond,” because every nanosecond gained is considered a gain in terms of the number of dollars that they earn. On the other hand, if you go to retail companies and ask what their real-time is, it’s probably a day. It’s all contextual.
In the real-time space, if you look at data analytics, you can broadly segment it into four categories: the real real-time, which is low latency; the latency-sensitive, which is more of the OLTP transactions that you’ve seen; the real-time, which has a budget of 10 milliseconds to one second and could be approximate; and finally batch, which is high throughput and takes probably anything more than an hour. I’ve given some examples of each one of them as well.
What does the real-time stack look like in general when you operate it? Again, as Ashish pointed out, there’s too much complexity in big data. The real-time stack has its own set of complexities. First are the data collectors, which collect data from where it is produced in the first place. Next is staging the data, so that people can share the same data across several jobs, each doing a different type of analytics on it.
You need some form of storage to store the results somewhere so that you can query it later or you can power the dashboards. Finally you need a compute engine that allows you to transform the data or do some aggregation on the data or do some machine learning on the data. Essentially there are multiple moving parts that you have to take care of.
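As a rough illustration of the four moving parts just listed (collectors, staging, compute, storage), here is a minimal single-process sketch. It is not the Twitter stack; the function names, the comma-separated event format and the in-memory `staging` and `results_store` structures are all assumptions made for the example.

```python
from collections import Counter, deque


def collect(raw_lines):
    """Data collector (hypothetical): parse raw 'user,action' lines into events."""
    for line in raw_lines:
        user, action = line.split(",")
        yield {"user": user, "action": action}


# Staging: a shared buffer so that several jobs can read the same data.
staging = deque()
for event in collect(["alice,click", "bob,view", "alice,view"]):
    staging.append(event)


def count_by(events, key):
    """Compute engine (hypothetical): a simple aggregation over staged events."""
    return Counter(event[key] for event in events)


# Storage: results kept somewhere queryable, e.g. to power a dashboard.
results_store = {
    "actions": count_by(staging, "action"),  # job 1 reads the staged data
    "users": count_by(staging, "user"),      # job 2 reads the same staged data
}
```

Both aggregation jobs read from the same staged events, which is the sharing that the staging layer is there to provide.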
Now let’s look at what was at Twitter before we joined. Twitter messaging used to be a confluence of several components, and that caused a lot of complexity. As you can see, there are multiple messaging components. One is something called Kestrel, which was developed in-house for scalability and simplicity of operations, but that was not enough.
In order to replicate logs at the database level, we had to go to BookKeeper, because that gave no loss of data, high durability and consistency. We ended up doing that for the use case. We also started using MySQL for the user profile. Similarly, at the time we also had Kafka. That was used for getting the web logs out of the Twitter web servers so that we could analyze them. Managing all of these was a big nightmare.
In fact, we had to have different teams for the individual components, developing those components, advancing features on them and satisfying those different use cases. We had to rethink: “Are we interested in spending so much money developing variants of the same pieces again and again, or do we want to unify all of them into one single piece that can support different use cases, different workloads and different performance characteristics?”
We wanted a unified stack for the data across various workloads, and we also wanted consistency across the board, so that our database systems and our key-value store systems can replicate data at a high level and also accurately. Then we wanted multi-tenancy, because we have a big compute cluster that is run and maintained by a single team, and we wanted to piggyback on that cluster rather than having our own cluster for each component. Again, the complexity. Another thing is that we wanted to scale resources independently.
You might need more storage at some point, whereas during the Super Bowl or even the Oscars you might want to scale the amount of data being served, because more online jobs might come along to analyze that period of time, and the data volume might grow as well. Finally, we wanted ease of manageability.
In the end we simplified the stack using something called event bus and distributed log. This was implemented in-house, and it has a simple notion of a publisher and a subscriber.
The publisher publishes the data, which is stored in the event bus / distributed log, and subscribers just consume the data out of this log. We have a metadata piece that tells you where the data is and where the partition is. What is inside is very simple.
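The publisher/subscriber notion described here can be sketched with a toy in-memory log. This is only an illustration under stated assumptions, not the Twitter event bus or the DistributedLog API: the class names `InMemoryLog` and `Subscriber`, and the methods `publish`, `read` and `poll`, are hypothetical, and there is no durability, partitioning or metadata service.

```python
from collections import defaultdict


class InMemoryLog:
    """Toy append-only log (hypothetical): one list of records per stream."""

    def __init__(self):
        self._streams = defaultdict(list)

    def publish(self, stream, record):
        """Append a record and return its offset within the stream."""
        records = self._streams[stream]
        records.append(record)
        return len(records) - 1

    def read(self, stream, offset, max_records=100):
        """Return up to max_records starting at offset; reading never removes data."""
        return self._streams[stream][offset:offset + max_records]


class Subscriber:
    """Keeps its own read position, so many subscribers can share one stream."""

    def __init__(self, log, stream):
        self._log = log
        self._stream = stream
        self._offset = 0

    def poll(self):
        """Fetch records published since the last poll and advance the position."""
        batch = self._log.read(self._stream, self._offset)
        self._offset += len(batch)
        return batch


log = InMemoryLog()
log.publish("tweets", "hello")
log.publish("tweets", "world")

# Two independent consumers of the same stream.
fraud_job = Subscriber(log, "tweets")
ads_job = Subscriber(log, "tweets")
fraud_batch = fraud_job.poll()
ads_batch = ads_job.poll()
```

Because each subscriber tracks its own offset, the data is written once and can be read any number of times, which is what makes a large read fan-out possible.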
Again, we use BookKeeper because BookKeeper has the notion of durability and consistency, and we built something called a write proxy layer and a read proxy layer that allow you to scale. For example, if you have one stream that you want to fan out probably 200 or 300 times, or even a million times, we just have to scale the read proxy layer and not the storage, because the storage is constant at that point.
On the other hand, if the data is increasing but the number of people consuming it is small, you can grow the BookKeeper layer to increase the storage. That gave a lot of flexibility in terms of scaling the individual layers independently. Distributed log has been running in production for the last three years, and we have been serving around 20 PB per day out even though the data coming in is 400 TB per day, and around two trillion events are processed.
The reason you see a huge fan-out is that there are very popular sources within Twitter itself, like the Twitter Firehose and the Twitter engagement source. This is all data that we collect based on users’ interactions with Twitter applications. That data is 400 TB, but since it is very popular, everybody wants to use it for different kinds of analytics, ranging from fraud detection and finding fake accounts to finding out how an ad is performing.
All these things are done using the same set of data. That’s why you see 20 PB of data. The cool thing is that the latency is just about 10 milliseconds. Once the data is in, within five milliseconds it is visible for any consumption that you’d like.
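The fan-out implied by these figures can be worked out directly. The short calculation below assumes decimal units (1 PB = 1000 TB); the variable names are just for illustration.

```python
TB_PER_PB = 1000  # assumption: decimal units, 1 PB = 1000 TB

data_in_tb_per_day = 400   # data written per day, from the talk
data_out_pb_per_day = 20   # data served per day, from the talk

# Average number of times each written byte is read by consumers.
fan_out = data_out_pb_per_day * TB_PER_PB / data_in_tb_per_day
print(fan_out)  # 50.0
```

So, on average, each piece of data written is consumed about 50 times.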
Now let’s look at the compute. Compute was essentially done by Apache Storm, which we open sourced in 2011. As we grew between 2011 and 2013, a lot of real-time jobs were coming in, because a lot of use cases had been coming up and people wanted to piggyback on the real-time infrastructure. As the amount of data crunched and the number of jobs increased, Storm started cracking in terms of scalability, the amount of data it was processing, and incidents.
The scale that we were running at was around 3600 nodes in Storm, and we had all kinds of issues. We were probably the only ones running streaming at this scale. So what were the issues, to be specific?
I don’t know how many of you have a good understanding of the Storm architecture, but I’ll quickly run through it. There is a notion of a Nimbus node, which is essentially the master node, and then there’s a bunch of other nodes, called the slave nodes, which actually do the work. The master node is a single point of failure, and it was also doing a lot of functions together, such as scheduling and monitoring, and there was no notion of resource reservation.
Because of the lack of these features, it was coming to a grinding halt: when you went to a web page served by Nimbus, it would take two or three minutes to even load that page because it was doing so much work. It was just absolutely killing developer productivity as well as the agile methods that we follow in terms of scaling very quickly.
Of course, another issue was that when multiple Storm jobs were running together on a single node, sharing that node, one job would tend to occupy more CPU and end up tripping the other job.
Take the example of some jobs during Oscars time, when a lot of data is being consumed. If two jobs are both expanding their CPU usage, then since the CPU is limited on a particular machine, one job will try to take more than the other. The other job will not perform and will drop data, and that leads to a lot of issues in terms of interference.
Finally, Storm was a threaded system, and threaded systems are not really ideal in scenarios where people are writing their own jobs at lower levels. It’s very hard to debug, and you can’t get the logs in one place so that you can see what’s going on in a particular task.
It is also difficult to tune: how many resources do you give to a thread-based process when a disparate set of threads is running in a single process, and how can we allocate resources predictably? And because multiple levels of scheduling happen, in the operating system, within the JVM, and in Storm itself, it was hard to tell when a particular task would be scheduled.
Finally, we had a bunch of other issues, such as an overloaded ZooKeeper. I don’t want to go into the details of those. If you want, I can point you to the paper. But again, these are some of the problems that we faced. It was not very efficient, and not flexible enough to support mixed workloads.
Some people in real time say, “Hey, I want low-latency jobs,” for example, “I want the end-to-end latency to be 20 milliseconds.” On the other hand, for some other workloads people say, “I don’t mind one second or a couple of seconds, but I want high throughput.” How do you support these mixed workloads within a single infrastructure, rather than putting yet another infrastructure into the whole place?
That’s when our team looked at it. We evaluated all our options and finally decided to build a new system. When we decided to write Heron, the only systems available were Samza, from LinkedIn, and Spark Streaming, which was in its infancy at that point. We evaluated Spark Streaming and couldn’t get it to work with 50 nodes at the time, so we decided it would not work well for us, because we were trying to run on 3,000 nodes at least.
That was our goal. Samza, when we evaluated it, was a good system, but the API did not match, because we had a lot of machinery running on our own API. Ultimately we wanted a system where we could pull the old system out from under the rug and put in the new system without the applications changing. All they have to do is just recompile the application, launch it and they get all the-