The Qubole Data Service (QDS) is a Software-as-a-Service analytics platform that runs on leading cloud offerings such as AWS. Targeted at data analysts, data scientists, and ETL engineers, it helps users start analyzing data in a matter of minutes.
Summary
Here at Qubole, our core focus is on providing the best platform to analyze data in the Cloud. We are simplifying the complicated infrastructure necessary for analytics and making the tools accessible to users with various skill sets and experience levels. We have also continued to optimize Hadoop and Hive for use in the Cloud and are proud to report that we are faster than ever! In our latest direct comparison with Amazon's Elastic MapReduce (EMR), Qubole was up to:
- 2x faster in launching a cluster
- 5x faster in query execution against data in S3
- 2x faster in writing data to S3
For details on how we set up these tests, keep reading. And if you have feedback on scenarios that you'd like to see us explore and test in the future, let us know!
Setup
We used 3-node Hadoop clusters on Qubole and on Amazon's EMR distribution. All instances were on-demand instances of type m1.xlarge. As is standard in Hadoop, one instance (the master) was dedicated to running the Hadoop JobTracker and NameNode. In both cases, Hive queries were run from the master node. Qubole used a local (Derby) database as the Hive metastore, while EMR used a local MySQL instance. Hadoop and Hive are highly sensitive to configuration parameters. We used the default configuration settings for each platform, so the measurements here reflect the default experience for a user on these platforms. There were two exceptions. In EMR, we set the parameter hive.optimize.s3.query to true; per EMR documentation, this parameter optimizes queries against S3 datasets. In Qubole, we disabled our caching framework, which automatically caches S3 data in HDFS, since we wanted a fair comparison in which both systems access S3 data directly.
Cluster Bringup Time
Launching the cluster described above took 302 seconds in EMR and 147 seconds in Qubole, making Qubole 2x faster. In the case of Qubole, a cluster is considered launched when HDFS is up and running. In the case of EMR, we took the startup time to be the difference between StartDate and CreationDate in the AWS console. Launching an 11-node cluster showed similar characteristics.
Dataset and Queries
In our experiments, we used the popular TPC-H dataset. We generated a TPC-H scale 5 (5GB) dataset using the dbgen utility. Users tend to partition data by day. To simulate this, we split the lineitem table into 1,000 partitions (approximately 3 years of daily data), each with one file. None of the other tables were partitioned. The data was in delimited text format. We uploaded the dataset to S3 using the s3cmd utility and created external Hive tables against the datasets residing in S3. Here's a sample DDL for the lineitem table:
create external table lineitem (
L_ORDERKEY INT, L_PARTKEY INT, L_SUPPKEY INT, L_LINENUMBER INT,
L_QUANTITY INT, L_EXTENDEDPRICE DOUBLE, L_DISCOUNT DOUBLE, L_TAX DOUBLE,
L_RETURNFLAG STRING, L_LINESTATUS STRING, L_SHIPDATE STRING, L_COMMITDATE STRING,
L_RECEIPTDATE STRING, L_SHIPINSTRUCT STRING, L_SHIPMODE STRING, L_COMMENT STRING)
partitioned by (dummy STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION 's3://<bucket>/tpch/lineitem/';  -- substitute the S3 path of the dataset
alter table lineitem recover partitions;
Hive does not yet support the original forms of the TPC-H queries, so we used the modified versions contributed by Reynold Xin, available here. We've uploaded the data to a publicly accessible S3 location. The complete set of DDLs and queries is available in our public Bitbucket repository via the following git command:
git clone 'https://bitbucket.org/qubole/tpch.git'
For each query, we took the average of 3 runs on Qubole and on EMR and plotted the speedup, calculated as
Speedup = EMR_time / Qubole_time
Qubole shows an average speedup of ~4x over EMR. We repeated the experiment with a smaller 1GB dataset with 303 partitions, mimicking the use case of queries over data filtered by time (e.g., the last year of data). Qubole showed an average speedup of 5x on the smaller dataset.
Writing data to S3
The TPC-H queries tested above are read-heavy, so we also wanted to compare write performance to S3 on both platforms. We performed a table-loading test by creating a partitioned version of the orders table using the following statements:
create external table orders_part (O_ORDERKEY INT, O_CUSTKEY INT, O_ORDERSTATUS STRING, O_TOTALPRICE DOUBLE, O_ORDERPRIORITY STRING, O_CLERK STRING, O_SHIPPRIORITY INT, O_COMMENT STRING)
partitioned by (O_ORDERDATE STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 's3://<bucket>/tpch/orders_part/';  -- substitute the S3 path for the new table
-- dynamic partitioning must be enabled for the partitioned insert
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table orders_part partition(o_orderdate)
select O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT, O_ORDERDATE
from orders;
Qubole executed the above statement twice as fast as EMR.
Conclusion
Qubole greatly simplifies the analytics process with features like self-managing, auto-scaling Hadoop clusters, approximate query support, and easy ways to integrate data. Query performance also has an impact on user experience: cleansing and analyzing a few terabytes of semi-structured data such as social media data, web logs, and online gaming logs can be time consuming. With Qubole, in many cases, these processes finish in a fraction of the time and cost. Developers authoring queries for their pipelines, and business analysts preparing or analyzing data, can be much more productive. Of course, performance comparisons have their limitations, and the proof of the pudding is in the eating. So don't take our word for it: bring your datasets and workloads and sign up for a free trial!
Qubole offers Hive as a service. When a user logs in to Qubole, he/she sees the tables and functions associated with their account and can submit a HiveQL command via the composer pane. Qubole takes care of executing the HiveQL command, spawning a Hadoop cluster if necessary and saving results and logs. Now, multiple users belonging to different accounts may issue commands at the same time. Under the covers, these commands, along with contextual information, go into a common queue and are picked up in a FIFO manner. A wrapper script that does per-account housekeeping picks up a HiveQL command and fires up a Hive JVM with the right configuration parameters and passes the command as an argument. This Hive JVM compiles the command and runs it against the customer's cluster. This sequence of events is shown below.
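As a rough illustration of this original flow, here is a minimal sketch of the dispatch loop in Python. The queue implementation, command fields, and housekeeping are simplified stand-ins, not Qubole's actual code; the Hive CLI flags (--hiveconf, -e) are standard:

import subprocess

def dispatch_loop(command_queue):
    while True:
        cmd = command_queue.get()  # commands are picked up in FIFO order
        # per-account housekeeping (credentials, logging, etc.) happens here
        args = ["hive"]
        for key, value in cmd["hive_conf"].items():
            # pass the right configuration for this account's metastore and cluster
            args += ["--hiveconf", "%s=%s" % (key, value)]
        args += ["-e", cmd["hiveql"]]
        # a fresh Hive JVM is spawned for every command
        subprocess.run(args)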
Bringing up a new Hive JVM for every command has a performance impact: spawning a JVM with all the required jars and classes takes approximately 7 seconds. This overhead becomes significant for short-running commands (e.g., EXPLAIN or CREATE EXTERNAL TABLE), and the latency is especially problematic when a user is composing a new query and has to wait 7 seconds just to get a syntax error back from the system. So we started looking into what it would take to eliminate this latency and improve the user experience. The answer was to have a pre-warmed server that serves requests from lightweight clients. The new flow of control looks like this:
The devil, however, is often in the details. Hive is known to have issues related to multi-threading. Hive does come with a HiveServer, but we needed a server that could support different accounts, operate against different metastores, and talk to different Hadoop clusters. Hence, we started working on a multi-threaded, multi-tenant Qubole Hive Server (QHS). The first order of business was fixing thread-safety issues and any code that assumes Hive talks to a single metastore. A lot of this involved code hygiene: changing static variables to ThreadLocals, adding synchronization, and cleaning out or disabling internal caches. We're in the process of contributing this work back to the community.
We used Thrift to generate the client and server stubs. The API is relatively modest. The client (in Python) submits a HiveQL command along with metastore and cluster information and gets a command_id in response. The client then polls the server and receives a completed status when the command is done. If the user wishes to cancel a running query, the client can pass a cancel request to QHS with the appropriate command_id. QHS maintains per-client state in its internal data structures and uses one thread to handle each command. QHS keeps track of currently running MapReduce jobs and associates them with command_ids so that it can cancel a running MapReduce job if needed. We also implemented a graceful shutdown of QHS that waits for currently running queries to complete, making it easier to release new internal versions of Hive with minimal disruption to our users.
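In outline, the client-side flow looks something like the sketch below. The method and field names (submit, status, fetch_results, cancel) are illustrative assumptions, not the actual QHS Thrift IDL:

import time

def run_hiveql(client, hiveql, metastore_info, cluster_info):
    # submit returns immediately with an id for the queued command
    command_id = client.submit(hiveql, metastore_info, cluster_info)
    while True:
        status = client.status(command_id)  # poll the pre-warmed server
        if status == "completed":
            return client.fetch_results(command_id)
        if status == "failed":
            raise RuntimeError("command %s failed" % command_id)
        time.sleep(2)

# a running command can be cancelled by id:
#   client.cancel(command_id)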
The net effect of QHS was that the JVM startup overhead disappeared from the command path: short-running commands that previously paid a roughly 7-second penalty now return almost immediately, and a user composing a query gets syntax errors back right away.
There was one issue that was a little perplexing. After running for a week or so, QHS started throwing "too many files open" exceptions. A quick lsof call confirmed that there were numerous open file handles. Surprisingly, though, these all pointed to jar files. After some investigation, we found that URLClassLoader leaks file handles to the jars it opens (see this link for some of the dirty details), and these are never garbage collected. We ended up using the non-standard ClassLoaderUtil.releaseLoader to free up the resources. Java 7 has a nicer solution: URLClassLoader has a close method that performs the necessary cleanup.
If you're interested in giving Qubole a try, please sign up here for a free account.
This is a guest blog post written by Marc Rossen, a Qubole user and advocate. Team Qubole is grateful for Marc’s contribution to our blog.
About Marc and MediaMath
My name is Marc Rossen. I run the Client Facing Analytics & Insights team at MediaMath. I have been solving technical analytics problems in the digital marketing space for over a decade. I came across Qubole less than a year ago and have been using their service for the past five months. I am happy to write this guest blog post about my experiences with Qubole, at their request.
MediaMath, a 260-employee company based in New York City and founded in 2007, is the leading global digital media-buying platform. At MediaMath, we develop and sell tools for digital marketing managers under the TerminalOne brand. TerminalOne allows marketing managers to plan, execute, optimize, and analyze marketing programs.
The Analytics & Insights team is responsible for delivering decision-making infrastructure and advisory services to our clients. We do this by helping them answer complex business questions using analytics that produce actionable insights. Examples of our work include but are not limited to:
- Segmenting audiences based on their behavior, including topics such as user pathways and multi-dimensional recency analysis
- Building customer profiles (both univariate and multivariate) across thousands of first-party (i.e., client CRM files) and third-party (i.e., demographic) segments
- Simplified attribution insights showing the effects of upper-funnel prospecting on lower-funnel remarketing media strategies
Our flagship product captures all kinds of data generated when our customers run digital marketing campaigns on TerminalOne. This amounts to a few terabytes of structured and semi-structured data per day, consisting of information on marketing plans, ad campaigns, ad impressions served, clicks, conversions, revenue, audience behavior, audience profile data, and so on. At MediaMath, we are always looking to enhance our cutting-edge infrastructure, and we wanted to take our existing capabilities to the next level to handle new, innovative analytics tasks. Processing this raw data to segment audiences, optimize campaign yield, compute revenue attribution, and so on is a non-trivial problem, for some of the following reasons:
1. Complexity of transforming semi-structured data
Transforming session log data to construct user sessions and click paths for further analysis is a complex process. We knew that Apache Hadoop was an attractive option, but we wanted a solution that our analysts could start using quickly, without having to worry about operational management. We wanted analysts to focus on their data and transformations without thinking about cluster sizes, Apache Hadoop versions, machine types, and other elements of cluster operations.
2. Data Pipelines
We needed a service for developing data pipelines that repeat the same transformations, day after day and week after week, without much intervention from my team once set up. Automating the execution of the data pipeline while honoring the interdependencies between pipeline activities was a crucial requirement! Prior experiments with cron had taught us that it was not the best approach.
3. Low-risk Apache Hadoop
We needed something reliable and easy to learn, set up, use, and put into production, without the risk and high expectations that come with committing millions of dollars in upfront investment.
We evaluated a few Apache Hadoop-based offerings and decided to give Qubole a try:
1. Big Data Analytics Solution
During our trial, we quickly created an account on Qubole and the team helped us upload sample data. We started using the system and immediately saw its value. Within hours, we were able to re-use a number of very useful, business-critical, custom Python libraries that we had developed, matured, and stabilized. These libraries compute revenue attribution by customer and by campaign by mashing together semi-structured and relational data, along with other useful tricks.
2. Auto-Scaling Clusters
We also noticed that the cloud-based Qubole clusters automatically grew the number of compute nodes as we ran more queries, and scaled the cluster down as the query load dropped. This operational efficiency was a plus, as we didn't have to continually reach out to our partners in Engineering, who have the complex task of managing our mission-critical production systems.
3. Data Pipelines
Qubole's engineering team worked with our team to build a custom data collector from our Oracle database to my Amazon S3 account. Using their S3 loader and Sqoop-as-a-Service offering, they set up a pipeline that loaded the S3 data into Qubole's Big Data Analytics Solution, performed all kinds of processing, and pushed the resulting summaries into a MySQL instance that both our customers and we could query using our BI tools. We were set up and running in a few days.
4. Low Risk
Qubole's interfaces made it easy to learn: the GUI really simplifies big data, and the SQL support comes with easy ways to embed custom libraries. Using the GUI, setting up and tearing down clusters was totally transparent -- as an analyst, I did not have to take on that operations headache. We saved the company a few million dollars of upfront investment by going with Qubole. Also, the Qubole folks are a seasoned bunch who seem to know what they are doing, and have credible answers and solutions to the team's questions. They are a Skype chat or a phone call away whenever my team needs help with issues or change requests. I don't feel I am taking on a huge risk by going with Qubole. Over time, they have become a partner in my team's success, one to whom I delegate my big data platform needs.
I will conclude by saying that I am generally very happy with Qubole. Our goal at MediaMath was to take our existing industry-leading infrastructure to the next level, handling new and complex analytics tasks. Qubole has helped us achieve this goal with minimal risk. I wish the Qubole team the best, and wish you well in your journey of discovering a big data solution!
As the Qubole Data Service has gained adoption, many of our customers have asked for facilities to import and export data between their relational data sources and the Cloud (S3). Dimension data from such sources is an important part of data analysis: log files (a.k.a. fact tables) in S3 often need to be joined with dimension data as part of analyses and pipelines. In many cases, the end results of data analysis, whether from an ad-hoc query or a data pipeline, must be exported back into relational databases, where they drive reporting tools or power online applications. Fortunately, we found an excellent open source tool for moving data to and from relational databases: Apache Sqoop. This post describes our experiences using Sqoop and how we made it easy to integrate many different data sources in the Cloud.
Apache Sqoop can import and export data between relational databases (over JDBC) and HDFS. Sqoop allows importing a full table or selected columns, and even allows free-form queries for extracting data into HDFS; a sketch of a typical invocation appears after the list below. Moreover, it can use multiple parallel connections where required, taking advantage of the inherent parallelism of Hadoop. Hadoop is already one of the core offerings of QDS, and our self-managing, auto-scaling Hadoop clusters make adopting new Hadoop-based applications a breeze. However, we had to solve a number of limitations before we could offer Sqoop as a service to our users:
- S3 support: Sqoop can move data to and from HDFS, but the target for data import and export in QDS is frequently S3, and the Hadoop clusters and HDFS instances brought up by QDS on behalf of customers are ephemeral.
- Exporting Hive tables and partitions: Although Sqoop supports importing into a Hive table or partition, it does not allow exporting from one.
- Upsert support: Upserts, where existing rows are updated and new rows are inserted, are a common mode for exporting data to a database. Unfortunately, Sqoop does not support upserts for many databases, including MySQL.
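For reference, here is roughly what a basic Sqoop import looks like when driven from Python. The connection details are made up, but the flags are standard Sqoop options:

import subprocess

subprocess.run([
    "sqoop", "import",
    "--connect", "jdbc:mysql://db.example.com/sales",  # hypothetical source database
    "--username", "reporting", "-P",        # -P prompts for the password
    "--table", "customers",                 # or --query '...' for a free-form extract
    "--target-dir", "/user/qds/customers",  # HDFS staging directory
    "--num-mappers", "4",                   # number of parallel connections
    "--split-by", "id",                     # column used to divide work across mappers
], check=True)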
Loading data to S3
To solve the first problem, we first use Sqoop to copy data into HDFS and then generate Hive queries to load that data into tables in S3. Both of these activities currently run on a shared Hadoop cluster operated by Qubole. Fortunately, auto-scaling means we have little administrative overhead in running a continuously varying customer workload. In addition, the various optimizations we have made to improve Hive performance on S3 have been critical in making this arrangement work.
Upsert Support
We have added upsert support for MySQL over JDBC. Sqoop exports data by generating SQL queries, so we alter the generation of these queries to achieve the desired effect. Note that the notion of an upsert in MySQL is slightly different from the generic notion in databases: MySQL's INSERT ... ON DUPLICATE KEY UPDATE applies the update whenever any unique key collides, rather than a match condition the caller specifies.
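As an illustration (this is a hypothetical sketch, not Sqoop's actual code), the generated statement takes roughly the following shape:

def mysql_upsert_statement(table, columns):
    # Build a MySQL upsert of the form:
    #   INSERT INTO t (c1, c2) VALUES (?, ?)
    #   ON DUPLICATE KEY UPDATE c1 = VALUES(c1), c2 = VALUES(c2)
    cols = ", ".join(columns)
    placeholders = ", ".join(["?"] * len(columns))
    updates = ", ".join("%s = VALUES(%s)" % (c, c) for c in columns)
    return ("INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s"
            % (table, cols, placeholders, updates))

# e.g. mysql_upsert_statement("daily_stats", ["day", "clicks"])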
Exporting Hive Tables/Partitions
While Hive tables and partitions can always be dumped to HDFS and exported by Sqoop from there, an obvious optimization is to read data directly from the files backing the Hive tables where it is feasible to do so. We have made these enhancements where possible.
Integration into QDS
One of the best things about building a service is that we can integrate useful primitives like Sqoop throughout our service:
- Users can define database end points using the DbTap abstraction (see Documentation). These database endpoints can be used for various activities throughout our system - including for exporting and importing data via Sqoop.
- One-time export and import commands available via our REST API are implemented using Sqoop (a sample call is sketched after this list).
- Import and Export commands can be embedded inside workflows and invoked in our batch processing infrastructure called the Scheduler.
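As a rough sketch of what such a one-time export call might look like from Python. The endpoint, command type, and payload fields shown are assumptions based on the general shape of the QDS REST API; consult the API documentation for the exact names:

import requests

# All names below (URL, command_type, payload fields) are illustrative
# assumptions, not a documented contract.
response = requests.post(
    "https://api.qubole.com/api/v1.2/commands",
    headers={"X-AUTH-TOKEN": "<api-token>", "Content-Type": "application/json"},
    json={
        "command_type": "DbExportCommand",  # export a Hive table via Sqoop
        "db_tap_id": 123,                   # DbTap describing the target database
        "hive_table": "daily_summary",
        "db_table": "daily_summary",
    },
)
print(response.json())  # includes an id that can be polled for status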
Further integration points seem attractive. Users find it useful to run a query and later export its results to a relational database, so an export option can be displayed alongside historical queries visible via Qpal.
A persistent focus at Qubole has been making Big Data components accessible and relevant to analysts throughout an organization. Interfaces that make it easy to author and test uses of powerful constructs like Sqoop are essential. With respect to Sqoop, these priorities resulted in many back-end and front-end improvements:
It is easy to misconfigure a data import/export process, and simple errors (for example, mismatched schemas or bad table/column names) can take a long time to surface. To that end, we added a test mode to our Sqoop executions that short-circuits expensive steps (like running Hive queries or MapReduce jobs) while going through all the rest. This lets us validate commands quickly and give users early feedback on many errors.
Often a user may try to move too much data by mistake (an incorrect where clause on the source data, for example!). To catch these cases, we impose reasonable limits on the total data transferred by a single import/export command (which can always be overridden in genuine use cases).
We allow users to kill a running import/export command that might have been launched by mistake. This is generic functionality supported for all QDS commands; for Sqoop, we borrowed code from Hive to keep track of any MapReduce jobs spawned by the Sqoop command and to request that the JobTracker kill them on receiving a kill signal.
Sqoop's parallelism is a boon when dealing with large data volumes, but it can also make execution failures harder to debug: users must go through the failed tasks of any launched Hadoop jobs and pull up their logs to find the reason for the failure. Apache Hive short-circuits this process by automatically showing logs from the tasks with the most failures. We borrowed this idea (and code) from Hive and added it to Sqoop to make failed Sqoop runs easier to debug.
Making sophisticated tools like Sqoop as widely accessible as possible is always a challenge. A case in point is Sqoop import configuration. After looking at the configuration options carefully, we decided to categorize each configuration into one of two modes:
- Simple mode: the user specifies columns and where conditions for the data extraction
- Advanced mode: the user can specify a free-form query to extract data
Sophisticated configuration around parallel extracts was made available only in the advanced mode, and only if the user actually wanted a parallel extract. Some of these flows are shown below. We hope this makes Sqoop imports extremely accessible for the vanilla use case, while giving power users full flexibility and helping them avoid errors.
Conclusion and Roadmap
Apache Sqoop has been an excellent starting point for our data integration efforts. For the most part it just works, and the quality of the codebase is excellent (our sincere thanks to its developer community). The functionality described in this post is now generally available: users can sign up for QDS and create a free account. We are actively working on additional integration points for Sqoop in our browser application. Going forward, we will integrate support for importing and exporting data from other types of databases, such as MongoDB and CouchDB, into QDS. We are also working on integrating Sqoop's incremental extraction features into our product, and investigating closer integration between Sqoop and Hive to give users direct access to external data sources from Hive. Democratizing data, it turns out, creates interesting engineering problems, from creating intuitive user interfaces, to coming up with future-proof abstractions and APIs, to writing operational software that runs complex distributed systems without downtime, to building a rocking back-end data processing stack. If these sorts of problems interest you, ping us at email@example.com. We're hiring!
It started with an innocent tweet in response to a blog post on how to optimize top-k queries.
My colleague, Shrikanth, pointed out that Hive does not, in fact, have this optimization.
After a couple of months, I finally got a chance to implement the optimization, and that is the topic of this blog post.
Here's an example of a top-k query:
SELECT * FROM T ORDER BY a DESC LIMIT 10
The user is interested in knowing the rows of T with the top 10 values of column a. You can imagine other variants where T is a derived table or a view that could encompass other computations or aggregations.
How does Hive execute such queries?
In simple terms, the above query is executed using the following steps.
- A number of map tasks read parts of table T
- Each map task sorts its portion of the data on a and writes it to disk
- A single reducer reads data from all the mappers and merges the sorted streams in order of a
- The reducer invokes a limit operator on the merged stream, which allows only 10 rows to pass through
The query is interested in only the top 10 rows, yet all rows of T are sorted, written to disk, transmitted to the reducer, and merged. If T is very large, this can take a very long time.
Every row in the final top 10 must be among the top 10 rows of some mapper. This key observation allows us to restrict each mapper to sending only its top 10 rows, which substantially reduces the I/O cost of executing the query. The way we implemented this solution inside the Qubole Data Service was split between Hive and Hadoop. In Hadoop, we introduced a new parameter, "map.sort.limitrecords", which limits the number of records each mapper outputs. Hive creates a MapReduce job corresponding to this query; in Hive, we identify the top-k query pattern and set this parameter on the created job to the appropriate value. The advantage of this approach is that regular MapReduce jobs that want the top-k property can also take advantage of the optimization by setting the parameter.
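The core idea is easy to sketch outside Hadoop. The illustrative Python below is not the actual patch (which lives in Hadoop's map-side sort); it just shows why limiting each mapper to k rows preserves the correct global answer:

import heapq

def mapper_top_k(rows, k, key):
    # each mapper emits only its own k largest rows
    return heapq.nlargest(k, rows, key=key)

def reducer_top_k(per_mapper_outputs, k, key):
    # the single reducer merges num_mappers * k candidate rows
    # instead of every row of the table
    candidates = [row for rows in per_mapper_outputs for row in rows]
    return heapq.nlargest(k, candidates, key=key)

# e.g. with two mappers and k = 2:
# mapper_top_k([1, 9, 3], 2, key=lambda x: x)          ->  [9, 3]
# reducer_top_k([[9, 3], [8, 7]], 2, key=lambda x: x)  ->  [9, 8]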
We demonstrate the performance improvements using a synthetic dataset with 10 million rows that has 1 million unique entries for column v. We compare the performance of two queries:
Q1: SELECT v, COUNT(*) c FROM T GROUP BY v ORDER BY c DESC LIMIT 10
Q2: SELECT * FROM T ORDER BY v LIMIT 10
The first query is interested in the attribute that is repeated most frequently, whereas the second is interested in the top 10 rows. Both queries benefit from the optimization, albeit to different degrees: Q1 sees an improvement of about 10%, while Q2 is improved by 7.5x! The first query spends a long time in the grouping phase, so the effect of the optimization is subdued; the second benefits considerably.
We are helping get a similar optimization into Apache Hive, tracked here. This optimization is available as part of the Qubole Data Service. Sign up today to get an account and take it for a ride! If these sorts of problems interest you, ping us at firstname.lastname@example.org. We're hiring!