Qubole Demo: Analysis and Export

September 26, 2013 | Updated January 8th, 2024


Introduction

In the first blog post of the series, we gave an overview of the data pipeline required to find the trending topics in Wikipedia. In the second blog post, we explained how to get the raw data and how to process it to create a filtered page-counts table and a page-lookup table. In this blog, we will talk about the analysis performed on these tables to compute daily and monthly trends. Finally, we will discuss the steps involved in exporting the results to the Rails web app.

Normalized Page Count

Once we have the page-lookup table, we move to the next step, where we normalize the filtered page-counts table. The filtered page counts contain the total page views per hour per day for each page title, while the lookup table maps the synonyms of a page title to the actual page served. We join the two tables on the page title to resolve the synonyms.

Let’s run the following Hive command in the Qubole composer:

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
CREATE TABLE normalized_pagecounts (`page_id` BIGINT, `page_title` STRING, `page_url` STRING, `views` BIGINT, `bytes_sent` BIGINT) PARTITIONED BY (`date` STRING);
insert overwrite table normalized_pagecounts partition(`date`)
select pl.page_id page_id, REGEXP_REPLACE(pl.true_title, '_', ' ') page_title, pl.true_title page_url, views, bytes_sent, `date`
  FROM page_lookup pl JOIN filtered_pagecounts fp on fp.page_title = pl.redirect_title;

The above command creates a new table, “normalized_pagecounts”, with dynamic partitions on date. The data in the table is now well-cleansed for our trend algorithm.


Trend Analysis

Algorithm

We used a simple baseline algorithm for our trend analysis; better algorithms are certainly possible.

Daily trend = slope * weighting factor

where slope = the difference in page views over the week, and weighting factor = log(1 + ∑ pageviews in the week)

Monthly trend = ∑ pageviews in the last 15 days – ∑ pageviews in the first 15 days
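
To make the formulas concrete, here is a minimal Python sketch of the two computations. It only illustrates the definitions above and is not the production UDF; the function names and the assumption that views arrives as one count per day over a 30-day month are ours.

import math

def daily_trend(views):
    # Daily trend = slope * weighting factor, computed over the last week of views.
    last_week = views[-7:]
    slope = last_week[-1] - last_week[0]      # difference of views over the week
    weight = math.log(1 + sum(last_week))     # log(1 + sum of page views in the week)
    return slope * weight

def monthly_trend(views):
    # Monthly trend = views in the last 15 days minus views in the first 15 days.
    return sum(views[-15:]) - sum(views[:15])

# Example: a hypothetical page whose daily views grow steadily over a 30-day month.
views = [100 + 10 * day for day in range(30)]
print(daily_trend(views), monthly_trend(views))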

Implode Data

The first step of the implementation is to implode the data in the table per page title. The normalized page-count table has hourly data partitioned by date, so we perform the following two operations:

  • Group the table by date and page_id, and sum the views to get total page views per day for each page title.
  • Collect all such (date, page-views) entries into an array. HQL does not currently have such an aggregation built in, so we have written a custom function in Java to serve our purpose. The jar file is referenced in the command below; Qubole supports adding UDFs in your HQL command.

Let’s run the HQL command for this:

add jar s3://dev.canopydata.com/rvenkatesh/collect_all.jar;
CREATE TEMPORARY FUNCTION collect_all AS 'com.qubole.udaf.CollectAll';
create table daily_data as
select page_id, collect_all(dt) as dates, collect_all(views) as views
from (select np.`date` as dt, np.page_id, sum(np.views) as views
      from normalized_pagecounts np
      group by np.`date`, np.page_id
      order by dt) daily_count
group by page_id;

 

Title | Dates | PageViews
India | [1 June 2013, 2 June 2013, …, 30 June 2013] | [400, …, 300]

Calculate Trends

The trend algorithm is implemented as a Python script that Hive streams each row through (via MAP … USING). The script can be found on GitHub. It reads the page-count data for each page title, applies the algorithm, and emits the daily and monthly trend values.
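
The script itself is not reproduced in this post. As a rough sketch of the streaming contract such a script follows (our assumption, not the original code), the snippet below reads tab-separated rows from stdin, recomputes the two trend values from the views array, and writes tab-separated rows matching the AS (...) column list to stdout. The array-item delimiter and helper names are assumptions; how Hive serializes array columns to a MAP/TRANSFORM script depends on the Hive version.

#!/usr/bin/env python
# Hypothetical sketch of a hive_trend_mapper-style streaming script (not the original).
# Hive's MAP ... USING pipes one tab-separated line per row to stdin and expects
# tab-separated output lines that match the AS (...) column list.
import math
import sys

ARRAY_SEP = '\x02'  # assumed delimiter between items of the streamed array columns

def compute_trends(views):
    # Daily trend = slope over the last week * log(1 + views in that week);
    # monthly trend = views in the last 15 days - views in the first 15 days.
    last_week = views[-7:]
    daily = (last_week[-1] - last_week[0]) * math.log(1 + sum(last_week))
    monthly = sum(views[-15:]) - sum(views[:15])
    return daily, monthly

for line in sys.stdin:
    fields = line.rstrip('\n').split('\t')
    page_id = fields[0]
    try:
        _, dates_str, views_str = fields
        views = [int(v) for v in views_str.split(ARRAY_SEP)]
        daily, monthly = compute_trends(views)
        # Output columns: page_id, total_pageviews, monthly_trend, daily_trend, error
        print('\t'.join([page_id, str(sum(views)), str(monthly), str(daily), '0']))
    except ValueError:
        # Flag malformed rows via the error column instead of failing the whole job.
        print('\t'.join([page_id, '0', '0', '0', '1']))

With the script added as a file resource, the query below maps the imploded rows through it and joins the result back to page_lookup_nonredirect to recover the page titles.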

add file s3://dev.canopydata.com/vagrawal/scripts/hive_trend_mapper.py;
DROP TABLE IF EXISTS daily_trends;
CREATE TABLE daily_trends as
 SELECT u.page_id, pl.true_title page_url, REGEXP_REPLACE(pl.true_title, '_', ' ') page_title, u.total_pageviews, u.monthly_trend, u.daily_trend, u.error 
 FROM ( FROM daily_data ndt
        MAP ndt.page_id, ndt.dates, ndt.views
        USING 'hive_trend_mapper.py'
        AS (page_id STRING, total_pageviews BIGINT, monthly_trend FLOAT, daily_trend FLOAT, error FLOAT)
      ) u
 JOIN page_lookup_nonredirect pl ON u.page_id = pl.page_id;

 

Title | As of Date | Daily Trend | Monthly Trend
India | 30 June 2013 | 100.45 | 2330

Export to WebApp

The web application has the following DB tables:

  • monthly_trends – to store top monthly trends
  • daily_trends – to store top daily trends
  • pages – information on the trending pages with total page views
  • daily_timelines – imploded data for each of the trending pages

To populate the web DB, we need to perform the following two steps:

  • HQL command: select the top 50 monthly trends and the top 50 daily trends from our final result table (daily_trends).
  • Data Export command: Once we have this data, we export the relevant values to different tables of the web app DB.

In Qubole, we can create a workflow where multiple steps are executed sequentially in one command. We also have a custom workflow template (query export) for running Hive queries followed by a data export.

Let’s run the query export commands to populate our web app tables.

Export to daily_trends table
insert overwrite directory 's3://your-bucket/wiki_pagecounts/tmp/daily/$yesterday$'
select cast(concat('$yesterday_trunc$', cast(page_id as string)) as bigint), page_id, daily_trend, error,
 '$yesterday_time$', '$yesterday_time$', '$yesterday$'
from daily_trends order by daily_trend desc limit 50;
Export to monthly_trends table
insert overwrite directory 's3://your-bucket/wiki_pagecounts/tmp/monthly/$yesterday$'
 select cast(concat('$yesterday_trunc$', cast(page_id as string)) as bigint), '$yesterday$', page_id, monthly_trend, 
 total_pageviews, '$yesterday_time$', '$yesterday_time$' 
 from daily_trends order by monthly_trend desc limit 50;
Export to pages table
insert overwrite directory 's3://your-bucket/wiki_pagecounts/tmp/monthly_pages/$yesterday$' select page_id, page_url, page_title, 0, 0, 0, 0 from
 (select page_id, page_url, page_title, monthly_trend from
 daily_trends order by monthly_trend desc limit 50
 ) u;
insert overwrite directory 's3://your-bucket/wiki_pagecounts/tmp/daily_pages/$yesterday$' select page_id, page_url, page_title, 0, 0, 0, 0 from
 (select page_id, page_url, page_title, daily_trend from
 daily_trends order by daily_trend desc limit 50
 ) u;
Export to daily_timelines table
insert overwrite directory 's3://your-bucket/wiki_pagecounts/tmp/daily_timelines/$yesterday$' 
select cast(concat('$yesterday_trunc$', cast(u1.page_id as string)) as bigint), u1.page_id, dd.dates, dd.views, 0, '$yesterday_time$', '$yesterday_time$', '$yesterday$' from ( 
 select distinct(page_id) from (
 select page_id, daily_trend as trend 
 from daily_trends order by trend desc limit 50
 union all
 select page_id, monthly_trend as trend 
 from daily_trends order by trend desc limit 50
 ) u
) u1 join daily_data dd on u1.page_id = dd.page_id;

Once the HQL step has written the results to S3, the Data Export command transfers them to the MySQL instance that drives the web app.
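
For illustration only, here is a minimal Python sketch of what that final hop amounts to: loading one exported file into a MySQL table with PyMySQL. The local file path, connection details, and the daily_trends column list are assumptions chosen to mirror the column order of the export query above; in practice, the Qubole Data Export command handles this step for you.

import csv
import pymysql  # assumed MySQL client library for this sketch

# Assumed column list, mirroring the column order of the daily_trends export query:
# id, page_id, trend, error, created_at, updated_at, date
COLUMNS = ['id', 'page_id', 'trend', 'error', 'created_at', 'updated_at', 'date']

conn = pymysql.connect(host='localhost', user='webapp',
                       password='secret', database='trends')
try:
    # Hive writes INSERT OVERWRITE DIRECTORY output as Ctrl-A ('\x01') delimited
    # text by default; 'daily_trends.txt' is a hypothetical local copy of one
    # exported file.
    with open('daily_trends.txt') as exported:
        rows = list(csv.reader(exported, delimiter='\x01'))

    placeholders = ', '.join(['%s'] * len(COLUMNS))
    sql = 'REPLACE INTO daily_trends ({}) VALUES ({})'.format(', '.join(COLUMNS), placeholders)
    with conn.cursor() as cur:
        cur.executemany(sql, rows)
    conn.commit()
finally:
    conn.close()

Using REPLACE INTO keeps a re-run of the load for the same day idempotent, assuming the first column is the table's primary key.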

Conclusion

In this demo, we have seen how Qubole helps in creating a data pipeline. Using different tools and connectors, we can import data into Qubole, run various analyses, and export the results to your favorite visualization engine. This lets data engineers focus on streamlining data pipelines instead of managing clusters. In the next post, we’ll use the Qubole Scheduler to deploy the complete data pipeline.
