In the previous blog post we gave an overview of the data pipeline required to find the trending topics in Wikipedia. In this blog post, we’ll explain how to get the raw data, process it and get it ready for analysis.
Download Pagecount files
The first step is to download page count data feed.
In this demo, all the queries will process data at a granularity of days. Therefore, we’ll partition the table by date. To process data using Qubole, it has to be stored on AWS S3 service. We have written a python script to download data to S3 and partition by day. The script is available in Github. We ran the script using the Shell Command . Shell command will run the python script in one of the slaves on the hadoop cluster. After the command succeeds, one days data is available in a S3 directory.
We have made the data available in S3 at s3://paid-qubole/default-datasets/demotrends/pagecounts/.
Create a Hive Table
Apache Hive has a feature – External Tables – which is backed by data NOT managed by Apache Hive. Since Pagecount will be created and updated by external scripts, we should create an external table with the following command:
CREATE EXTERNAL TABLE pagecounts (`group` STRING, `page_title` STRING, `views` BIGINT, `bytes_sent` BIGINT)
PARTITIONED BY (`date` STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LOCATION 's3n://paid-qubole/default-datasets/demotrends/pagecounts/';
ALTER TABLE pagecounts recover partitions;
Filter the pagecounts table
The raw data contains rows from many groups. Since we only care about topics in the ‘en’ group, lets run a HQL command to filter rows.
set hive.exec.dynamic.partition.mode=nonstrict; set hive.exec.dynamic.partition=true; CREATE TABLE filtered_pagecounts (`group` STRING, `page_title` STRING, `views` BIGINT, `bytes_sent` BIGINT) PARTITIONED BY (`date` STRING);
FROM pagecounts pvs INSERT OVERWRITE TABLE filtered_pagecounts PARTITION(`date`) SELECT pvs.`group`, pvs.page_title, pvs.views, pvs.bytes_sent, pvs.`date` where
not pvs.page_title RLIKE '(MEDIA|SPECIAL||Talk|User|User_talk|Project|Project_talk|File|File_talk|MediaWiki|MediaWiki_talk|Template|Template_talk|Help|Help_talk|Category|Category_talk|Portal|Wikipedia|Wikipedia_talk)\:(.*)' and
pvs.`group` = 'en' and not pvs.page_title RLIKE '([a-z])(.*)' and not pvs.page_title RLIKE '(.*).(jpg|gif|png|JPG|GIF|PNG|txt|ico)' and
pvs.page_title <> '404_error/' and pvs.page_title <> 'Main_Page' and pvs.page_title <> 'Hypertext_Transfer_Protocol' and pvs.page_title <> 'Favicon.ico' and pvs.page_title <> 'Search';
Above Hive command creates another table ‘filtered_pagecounts’ which is also partitioned by date. The command will process many partitions. We have to specify ‘nonstrict’ partition mode and turn on Dynamic Partitioning because we are not inserting into a specific partition. Click here for more details on Dynamic Partitions.
Rest of the command filters out rows that match a set of criteria.
Create Synonym Lookup Table
Wikimedia provides Page and Redirect tables as mysql dumps. These tables are very big. Page table has 10+ million rows and redirect table has 6+ million rows and it took us 11+ hours to import the mysql dumps into a db. Once the data is in mysql db, transformation required on these tables is simple and can be done by writing some join queries. But the size of tables is huge making it a very time consuming task to do these transforms in mysql.
Instead of using mysql we can transform these tables in Apache Hive.
Import Data from MySql tables to Hive
Qubole provides connectors to pull data from many RDBMS and no sql dbs into Hive. Using Qubole we can pull all the data in pages and redirect table into Qubole. To import data from an external data source you first need to create a DbTap by giving us the basic Db credentials. This DbTap can be used to create a Data Import Command. The command accepts arguments for dbtap, parallelism, db query, hive table etc to load data into Page and Redirect table in hive. An example of the command to download the page table is shown below.
These command run for about 2-5 minutes (exact time depends on how much parallelism is used) each and import 10million+ rows from mysql into hive.
Page and Redirect Tables as Flat Files
Qubole Import Command by default generates flat files in s3 as output. We have provided the output data (for page and redirect hive tables) of db import commands described in last step in a “paid-qubole” s3 bucket. You can directly create the page and redirect hive tables on top of this data:
CREATE EXTERNAL TABLE page (`page_id` BIGINT, `page_latest` BIGINT, `page_title` STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LOCATION 's3n://paid-qubole/default-datasets/demotrends/page/';
CREATE EXTERNAL TABLE redirect (`rd_from` BIGINT, `page_title` STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LOCATION 's3n://paid-qubole/default-datasets/demotrends/redirect/';
This will save you 11+ hours of time for uploading mysql dumps to a database.
Transforming data in Hive using HQL
Once the data is in hive tables(page and redirect), we use a 3 step process to create the synonym lookup table.
- Generate a table (page_lookup_nonredirect) for canonical pages i.e. pages that are not redirected to another page:
CREATE TABLE page_lookup_nonredirect (redirect_id bigint, `redirect_title` STRING, `true_title` STRING, `page_id` BIGINT, page_version BIGINT); INSERT OVERWRITE TABLE page_lookup_nonredirect SELECT page.page_id as redircet_id, page.page_title as redirect_title, page.page_id, page.page_title true_title, page.page_latest FROM page LEFT OUTER JOIN redirect ON page.page_id = redirect.rd_from WHERE redirect.rd_from IS NULL;
- Generate a table (page_lookup_redirect) for titles that are redirected to a canonical title:
CREATE TABLE page_lookup_redirect (redirect_id bigint, `redirect_title` STRING, `true_title` STRING, `page_id` BIGINT, page_version BIGINT);
insert overwrite table page_lookup_redirect
select original_page.page_id redirect_id, original_page.page_title redirect_title, final_page.page_id, final_page.page_title as true_title, final_page.page_latest from page final_page join redirect on (redirect.page_title = final_page.page_title) join page original_page on (redirect.rd_from = original_page.page_id);
- Finally, union page_lookup_nonredirect and page_lookup_redirect
CREATE TABLE page_lookup (redirect_id bigint, `redirect_title` STRING, `true_title` STRING, `page_id` BIGINT, page_version BIGINT); INSERT OVERWRITE TABLE page_lookup select redirect_id, redirect_title, page_id, true_title, page_latest from ( select redirect_id, redirect_title, true_title, page_id, page_latest FROM page_lookup_nonredirect UNION ALL select redirect_id, redirect_title, true_title, page_id, page_latest FROM page_lookup_redirect ) u;
- Qubole makes it easy to Import data into Hive from Mysql, postgres, vertica, redshift and mongo db.
- Hive support many of complex sql constructs like join, left join, outer joins and union.
- Because hive runs everything in parallel all these queries complete very fast.
In the next section, we’ll use Apache Hive to process the data and score all the topics.