This post was written by Yogesh Garg and Sumit Maheshwari, who are Members of the Technical Staff at Qubole.
We are pleased to announce that Qubole has open sourced an Airflow extension to connect with Qubole Data Service (QDS). Using this extension, our customers will be able to use Airflow for creation and management of complex data pipelines while submitting jobs directly to QDS, thus taking advantage of Qubole’s advantages in auto-scaling, cluster management, and persisted jobs and results. In this post, we’ll walk through the customer use case, our reason for integrating with Airflow, and how to use the new operator in Airflow, QuboleOperator.
One of the most common use cases for our customers is large-scale ETL of data through complex data pipelines. Qubole’s auto-scaling clusters are ideal for ETL workloads, where the size of the data processed is not always predictable. And our automatic spot integration reduces the total cost of running these jobs.
We have seen many approaches to the development and management of these data pipelines, including the use of solutions such as Oozie, Luigi, and Pinball. In addition, some customers make use of Qubole’s own workflow command, for serial execution of jobs. Recently, we’ve noticed the adoption by some customers of Airflow, an open source data pipelines management platform. Since we’re always looking to bring the newest standards and best practices to our customers, we decided to investigate Airflow and see if we could easily integrate it with QDS.
One of the most distinguishing features of Airflow over other solutions like Oozie is the representation of a DAG. An Airflow DAG can be represented in a Python script. This allows a Data Engineer to represent complex workflows quite easily using an object oriented paradigm. Also, Python has a rich set of open source libraries for data analysis which makes it a go-to language for data engineers. Another important aspect is the ease of flexibility that Airflow offers to customize and plug in various external services. This is something we found missing in other solutions.
An example DAG-based workflow in Airflow
From an architectural point of view, Airflow is simple and scalable. One of our customers is driving their ETL data pipeline through Airflow, submitting more than 100,000 QDS commands per month through a 150+ node DAG workflow. Needless to say, Airflow is also quite easy to setup and maintain.
We were also impressed to see out of the box support for various important features like backfill, conditional branching, inter-communication of tasks and ability to visualize progress of the DAGs. There are other advanced features available as well which makes Airflow better than other scripting based solutions like Spotify’s Luigi.
We also found a recent blog post from Marton Trencseni that did a nice job of comparing and contrasting Airflow with Luigi and Oozie. Interestingly, he came to a similar solution as us in recommending Airflow over the other two.
We have introduced a new type of Airflow operator: QuboleOperator. The operator can be used just like any other existing Airflow operator. During the execution of the operator in the workflow, it will submit a command to QDS and wait until the command finishes. Any valid Qubole command can be executed from the QuboleOperator. Apart from required Airflow parameters such as ‘task_id’ and ‘dag’, there are other key value arguments needed to submit a command inside QDS. For example, to submit a Hive command inside QDS, QuboleOperator will be defined as below:
hive_operator = QuboleOperator(task_id='hive_show_table', command_type='hivecmd', query='show tables', cluster_label='default', fetch_logs=True, dag=dag)
To check the different command types and their required parameters that are supported, you can check the detailed documentation on QuboleOperator class inside the Airflow codebase. There is also a full-fledged example DAG for QuboleOperator with various use cases.
Here is an example operator in Airflow and what the corresponding command looks like in QDS
Setting up Airflow with Qubole
Airflow can be set up by following this guide. Following additional steps will be required to integrate with QDS.
- Install qds-sdk if it is not already installed. To install execute
pip install qds-sdkcommand. There is no upgrade required if qds-sdk is already installed. Alternatively you can execute
pip install airflow[qds]to achieve the same.
- Register Qubole connection inside Airflow. Within Airflow server, go to Admin->Connections page. Click on “Create” tab. Fill up the following values of fields in the form and click on Save.Conn Id: qubole_default
Conn Type: HTTP
Logs of the QDS command can be stored inside logs of the Airflow task by setting “fetch_logs=True” while instantiating QuboleOperator. There is a provision to fetch results and logs explicitly and save in a desired file using
get_log methods of QuboleOperator. As a fall back, you can always check results and logs from QDS Analyze Page for the corresponding command ID.
As a next step ahead, we are considering to provide Airflow as a Service to our customers. This will allow them to use Airflow inside QDS without any need to set up and maintain any Airflow instances. We are working on other improvements as well to improve the user experience.
You can email [email protected] if you have questions about our integration with Airflow, our future plans, or to leave us some feedback.