In previous blog posts, we explained how to create a data pipeline that processes the raw data, generates a list of trending topics and exports it to the web app. In this post, we explain how to deploy that pipeline using the Qubole scheduler. The data pipeline shown in the image below runs every day.
The data pipeline is divided into four schedules:
- Schedule No. 463 downloads the data from the Wikimedia website using a shell command.
- Schedule No. 464 filters and normalizes the Pagecount table.
- Schedule No. 465 implodes the data, scores all the topics and exports it to the web app.
- Schedule No. 525 deletes data older than two months.
The deployed commands are incomplete by design: they contain placeholders that are filled in for each run. For example, consider the query that implodes the data.
-- Register the UDAF that collects values into an array.
add jar s3://dev.canopydata.com/rvenkatesh/collect_all.jar;
CREATE TEMPORARY FUNCTION collect_all AS 'com.qubole.udaf.CollectAll';
-- Rebuild daily_data from scratch on every run.
drop table if exists daily_data;
create table daily_data as
select page_id, collect_all(dt) as dates, collect_all(views) as views
from (
  select np.`date` as dt, np.page_id, sum(np.views) as views
  from normalized_pagecounts np
  where np.`date` >= '$thirtydaysago$' and np.`date` <= '$yesterday$'
  group by np.`date`, np.page_id
  order by dt
) daily_count
group by page_id;
The query implodes the last 30 days of page views for every topic. The range of dates changes every day, and the filter is expressed by the following where clause:
where np.`date` >= '$thirtydaysago$' and np.`date` <= '$yesterday$'
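The placeholders $yesterday$ and $thirtydaysago$ are macros. The scheduler evaluates them for each instance from that instance's nominal time, Qubole_nominal_time, using the following definitions:
yest = Qubole_nominal_time.clone().subtract('days', 1)
yesterday = yest.format('YYYY-MM-DD')
thirtydaysago = Qubole_nominal_time.clone().subtract('days', 30).format('YYYY-MM-DD')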
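To make the substitution concrete, here is a minimal sketch of how the where clause might look after the macros are expanded. It is not the scheduler's implementation: the scheduler evaluates Moment.js expressions, whereas this sketch uses the built-in Date object in UTC, and the function names (formatDate, subtractDays, evaluateMacros, expandQuery) and the example date are assumptions made for illustration.

```javascript
// Hypothetical stand-in for the scheduler's macro expansion. The real
// scheduler evaluates Moment.js expressions against Qubole_nominal_time;
// this sketch uses the built-in Date in UTC instead.

const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Format a Date as YYYY-MM-DD (UTC).
function formatDate(date) {
  return date.toISOString().slice(0, 10);
}

// Return a new Date that is `days` days before `date`.
function subtractDays(date, days) {
  return new Date(date.getTime() - days * MS_PER_DAY);
}

// Compute the macro values for one scheduled instance.
function evaluateMacros(nominalTime) {
  return {
    yesterday: formatDate(subtractDays(nominalTime, 1)),
    thirtydaysago: formatDate(subtractDays(nominalTime, 30)),
  };
}

// Replace every $name$ placeholder that has a macro value;
// unknown placeholders are left untouched.
function expandQuery(template, macros) {
  return template.replace(/\$(\w+)\$/g, (match, name) =>
    Object.prototype.hasOwnProperty.call(macros, name) ? macros[name] : match
  );
}

const whereClause =
  "where np.`date` >= '$thirtydaysago$' and np.`date` <= '$yesterday$'";

// Arbitrary example nominal time: 2014-03-31 (months are zero-based).
const nominalTime = new Date(Date.UTC(2014, 2, 31));
const expanded = expandQuery(whereClause, evaluateMacros(nominalTime));

console.log(expanded);
```

Because the values are derived from the instance's nominal time rather than from the wall clock, the same instance always expands to the same date range, no matter when it actually executes.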
Subsequent schedules should only run if a previous schedule successfully generated data.
For example, schedule no. 465 processes the latest partition of the normalized_pagecounts table, which is generated by schedule no. 464.
Such a dependency can be captured in the scheduler as shown below.
If there is no data for today’s partition, then the queries are not run and an error is generated.
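The behavior described above can be sketched as a simple gate in front of the downstream queries. This is only an illustration of the idea, not the scheduler's API: the function checkDependency, its return shape, and the partition values are hypothetical.

```javascript
// Hypothetical dependency gate; the names are illustrative, not a Qubole API.

// Decide whether a downstream instance may run, given the partitions
// that the upstream schedule has produced so far.
function checkDependency(availablePartitions, requiredPartition) {
  if (availablePartitions.includes(requiredPartition)) {
    return { run: true, error: null };
  }
  // The queries are skipped and an error is reported instead.
  return {
    run: false,
    error: `missing partition ${requiredPartition} in normalized_pagecounts`,
  };
}

// Example: the upstream schedule has produced two partitions so far.
const producedPartitions = ["2014-03-28", "2014-03-29"];
const readyInstance = checkDependency(producedPartitions, "2014-03-29");
const blockedInstance = checkDependency(producedPartitions, "2014-03-30");

console.log(readyInstance.run, blockedInstance.run, blockedInstance.error);
```

Reporting an error instead of silently skipping the run keeps a missing upstream partition visible in the schedule's history.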
History and reruns
The scheduler keeps track of every instance of a schedule. This history makes it easy to triage errors, fix the cause and rerun a particular instance. When an instance is rerun, the exact context is replicated. The screenshot shows an error in instance no. 14 and its rerun.
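One way to picture a rerun is as a new attempt attached to the same instance. The sketch below assumes that "context" includes the instance's original nominal time, so date-based values come out the same as in the failed attempt. That reading is an assumption, and the ScheduleHistory class and its methods are hypothetical rather than part of any Qubole API.

```javascript
// Hypothetical in-memory history of schedule instances; not a Qubole API.
class ScheduleHistory {
  constructor() {
    // instance number -> { nominalTime, attempts: [status, ...] }
    this.instances = new Map();
  }

  // Record one attempt ("success" or "error") for an instance.
  record(instanceNo, nominalTime, status) {
    if (!this.instances.has(instanceNo)) {
      this.instances.set(instanceNo, { nominalTime, attempts: [] });
    }
    this.instances.get(instanceNo).attempts.push(status);
  }

  // Rerun an instance with its ORIGINAL nominal time (assumed here to be
  // part of the replicated context) and append the new attempt's status.
  rerun(instanceNo, runFn) {
    const instance = this.instances.get(instanceNo);
    if (!instance) {
      throw new Error(`unknown instance ${instanceNo}`);
    }
    const status = runFn(instance.nominalTime);
    instance.attempts.push(status);
    return status;
  }
}

// Example: instance 14 failed once and is rerun after the fix.
const history = new ScheduleHistory();
history.record(14, "2014-03-31", "error");

let seenNominalTime = null;
history.rerun(14, (nominalTime) => {
  seenNominalTime = nominalTime; // same value as in the failed attempt
  return "success";
});

console.log(history.instances.get(14).attempts);
```

Keeping both attempts under one instance number preserves the failed run for later inspection while still recording the successful rerun.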
Finally, for lights-out operation, we set up notifications on all the schedules to send emails on failure. The schedules have generally been running smoothly and update the web app with fresh content every day.
This blog post concludes the series describing how we set up the Demotrends website using Qubole to process all the data. We hope you are inspired to build your own data-driven websites and would love to hear about them!