Big Data and Football: Using Pig to Analyze Pigskin


With the 2015 NFL Draft complete and minicamps in full swing, it’s prime time to continue our NFL blog series. In this installment, we take a deeper dive into the technologies and processes used to stage the data for ingestion by SSAS and, ultimately, by Hashi using D3.

From the previous post, we have the following process flow, which outlines the overall technology integration path from beginning to end. What was generalized there as the Hadoop stage can be broken out into several sub-steps.


Staging the data for ingestion into Hadoop

To understand the end product, it’s important to first mention that a primary goal of this project was to test the performance and ease of integrating Hadoop with other systems, specifically relational databases. Because we decided to use Microsoft SQL Server as our target endpoint, we originally chose HDInsight, Microsoft’s deployment of Hadoop, as our big data platform. As a result, we stored our raw extract from the Twitter API as JSON strings in Azure Blobs. After the scope expanded to sentiment analysis and machine learning, however, we switched platforms to a cluster of Azure-hosted Linux virtual machines running a traditional Hortonworks deployment. This let us integrate with Spark and Python-based programming, which HDInsight did not support at the time.

Thus, to ingest the tweets and store them in Hadoop, we needed an extra step that transferred the data from the Azure blobs to the virtual machines’ drives before moving it into the Hadoop Distributed File System (HDFS). Below are the bash and Expect scripts, built around the Azure CLI, that move the data from Azure blobs into the Azure-hosted Hortonworks HDFS instance.

To summarize, the main script first retrieves a list of all blobs in the container. Then, for each blob, it downloads the file and automatically answers the prompt that appears if the file already exists in the target folder. Finally, it copies all files in the target folder into HDFS using the hadoop fs command.

We also explored alternative, message-based ingestion options, but after settling on a batch-style processing technique, we decided to stay consistent and use batching for ingestion as well.

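#!/bin/bash
echo "Starting import job"
# List every blob in the nfl container and keep only the numeric blob names
azure storage blob list --container nfl -a twitteraggregator -k [accesskey] | sed -e 's/data: \([0-9]*\).json.*/\1/' | grep '[0-9]' > blobs.out
# Skip the first line of the filtered listing
tail -n +2 "blobs.out" > blobs2.out
# Count the blobs once instead of on every loop iteration
total=$(wc -l < blobs2.out)
i=0
while read -r line
do
    ((i++))
    echo -en " Processing file $i/$total\r"
    # The download script reads the blob name from this exported variable
    currentline=$line
    export currentline
    # The Expect wrapper declines to overwrite files that already exist locally
    ./azureDownloadExpect.exp &> /dev/null
done < blobs2.out
echo -en " Copying the data to HDFS to be staged by pig\r"
hadoop fs -copyFromLocal /mnt/datadrive2/azuredownload/ hdfs://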


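#!/usr/bin/expect
# Suppress output from the spawned process
log_user 0
# Spawn the blob download script
spawn "./"
# If the CLI asks whether to overwrite an existing file, answer "no"
expect "Do you want to overwrite"
send "\rno\r"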

#!/bin/bash
# Download the blob named in $currentline, which the import script exports
azure storage blob download -b "$currentline.json" --container nfl -a twitteraggregator -k [accesskey]
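If you adapt the import script, the blob-name extraction is the easiest part to get wrong. The Python sketch below mirrors its sed and grep '[0-9]' steps so the pattern can be tested against sample listing lines. The function name is ours, and the listing lines are hypothetical because the exact output format of azure storage blob list is an assumption here. The script's tail -n +2 step, which drops the first surviving line, is not reproduced.

```python
import re

# Same pattern as the sed expression 's/data: \([0-9]*\).json.*/\1/'.
# The unescaped '.' before "json" matches any character, exactly as in sed.
SED_PATTERN = re.compile(r"data: ([0-9]*).json.*")


def extract_blob_ids(listing_lines):
    """Hypothetical helper: mimic `sed ... | grep '[0-9]'` on listing lines."""
    ids = []
    for line in listing_lines:
        # sed without the g flag rewrites only the first match on each line.
        line = SED_PATTERN.sub(r"\1", line, count=1)
        # grep '[0-9]' keeps any line that still contains a digit.
        if re.search(r"[0-9]", line):
            ids.append(line)
    return ids


if __name__ == "__main__":
    # Hypothetical listing lines; the real CLI output format is an assumption.
    sample = [
        "info: Executing command storage blob list",
        "data: 1000.json BlockBlob 512",
        "data: 1001.json BlockBlob 2048",
        "info: storage blob list command OK",
    ]
    print(extract_blob_ids(sample))  # ['1000', '1001']
```

Lines that do not match the sed pattern pass through unchanged, so the digit filter is what removes the informational lines.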

Using Pig to clean JSON strings

Our JSON blobs, now in HDFS, are actually JSON arrays: each blob holds a single large array containing many objects. To import natively into Hive tables, we need exactly one JSON object per row. For example, we need to convert the structure

[{"firstName":"John", "lastName":"Doe"},{"firstName":"Anna", "lastName":"Smith"},{"firstName":"Peter","lastName":"Jones"}]

into

{"firstName":"John", "lastName":"Doe"}
{"firstName":"Anna", "lastName":"Smith"}
{"firstName":"Peter","lastName":"Jones"}
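The same conversion can also be expressed with a JSON parser, which is a convenient way to compare against the string-replacement approach in the Pig script that follows. This Python sketch uses a function name of our own choosing. It is handy for producing expected output from a small sample locally, but it loads the whole array into memory and normalizes whitespace, so it is not a substitute for the distributed Pig job.

```python
import json


def array_to_json_lines(array_text):
    """Convert one JSON array string into newline-delimited JSON (one object per line)."""
    objects = json.loads(array_text)
    # Compact separators keep each object on a single line with no extra spaces.
    return "\n".join(json.dumps(obj, separators=(",", ":")) for obj in objects)


if __name__ == "__main__":
    blob = ('[{"firstName":"John", "lastName":"Doe"},'
            '{"firstName":"Anna", "lastName":"Smith"},'
            '{"firstName":"Peter","lastName":"Jones"}]')
    print(array_to_json_lines(blob))
    # {"firstName":"John","lastName":"Doe"}
    # {"firstName":"Anna","lastName":"Smith"}
    # {"firstName":"Peter","lastName":"Jones"}
```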

This can be done fairly simply with a Pig script that loads all of the files using wildcard notation and then reformats the text with a series of nested REPLACE calls:

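-- Load every downloaded blob as lines of text
A = LOAD '/user/azureimport/azuredownload/*.json' AS (twitter:chararray);
-- The innermost REPLACE runs first: remove the array brackets, start each
-- object on a new line, then strip escaped \n, \r and \" sequences
B = FOREACH A GENERATE
     REPLACE(
     REPLACE(
     REPLACE(
     REPLACE(
     REPLACE(
     REPLACE(twitter,
               '\\[\\{\\"Id\\"', '\\{\\"Id\\"'),
               '\\}\\}\\]', '\\}\\}'),
               '\\}\\}\\,\\{\\"', '\\}\\}\n\\{\\"'),
               '\\\\n',''),
               '\\\\r',''),
               '\\\\\\"','');
-- Remove any previous output, then write the cleaned records
rmf PigOutput
STORE B INTO 'PigOutput';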

Note that special characters in Pig regular expressions need an additional escaping backslash, because the Pig string literal removes one level of escaping before the pattern reaches the regular expression engine. The most striking example is the expression that matches a backslash followed by a double quote (\"): it requires six consecutive backslashes to be escaped correctly.
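To see the two levels of escaping at work, the sketch below replays the six REPLACE steps in Python on a small, made-up blob. It is an approximation, not Pig itself, and rests on several assumptions:

- pig_literal_to_regex() is a hypothetical helper that models only the collapsing of doubled backslashes, which is the only escape the script's regex literals use.
- Python's re module stands in for Java's regex engine.
- The replacements are written as the final text Java would insert.
- Like the Pig script, it assumes each tweet object ends with a nested object, so object boundaries look like }},{".

```python
import re


def pig_literal_to_regex(literal):
    """Hypothetical helper: model Pig's literal unescaping of doubled backslashes only."""
    return literal.replace("\\\\", "\\")


# (Pig string literal for the regex, text to insert), listed in the order the
# nested REPLACE calls run: the innermost call is applied first.
PIG_STEPS = [
    (r'\\[\\{\\"Id\\"', '{"Id"'),    # drop the array's opening bracket
    (r'\\}\\}\\]', '}}'),            # drop the array's closing bracket
    (r'\\}\\}\\,\\{\\"', '}}\n{"'),  # start each object on a new line
    (r'\\\\n', ''),                  # remove escaped \n sequences
    (r'\\\\r', ''),                  # remove escaped \r sequences
    (r'\\\\\\"', ''),                # remove escaped \" sequences
]


def clean_blob(text):
    """Apply the six replacements in order, as the Pig script does."""
    for literal, replacement in PIG_STEPS:
        text = re.sub(pig_literal_to_regex(literal), replacement, text)
    return text


if __name__ == "__main__":
    # Six backslashes in the Pig literal become three in the regex: \\ then \"
    print(pig_literal_to_regex(r'\\\\\\"'))  # \\\"
    blob = ('[{"Id":1,"Text":"Go \\"Hawks\\"\\n","User":{"Id":7}},'
            '{"Id":2,"Text":"TD!","User":{"Id":8}}]')
    print(clean_blob(blob))
    # {"Id":1,"Text":"Go Hawks","User":{"Id":7}}
    # {"Id":2,"Text":"TD!","User":{"Id":8}}
```

Note that the last step deletes escaped quotes inside tweet text rather than preserving them. This keeps each line valid JSON at the cost of losing those quote characters.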

Using Hive to natively import JSON into tables

With the cleaned file staged in HDFS, we are ready to begin ingestion in Hive. The first step is to load the file into a Hive staging table using this simple script:


DROP TABLE IF EXISTS tweets_raw;

CREATE TABLE tweets_raw
(
    value STRING
);

LOAD DATA INPATH '/user/root/PigOutput' OVERWRITE INTO TABLE tweets_raw;

The result is a single JSON object in every row of the table. With everything staged, we can use pure Hive code to import the data and, if desired, flatten it. The three key constructs for navigating JSON in Hive are the LATERAL VIEW clause, the json_tuple() function and, for arrays, the explode() function. To understand the code used to import the Twitter files, you first need to understand the JSON schema. A handy tool for discovering complex schemas is the hive-json-schema generator by quux00. For our raw data, it generated the following output:

CreatedDate string,
Entities struct <
   Hashtags: array <struct <
      indices: array <smallint>,
      text: string>>,
   Urls: array <struct <
      display_url: string,
      expanded_url: string,
      indices: array <smallint>,
      url: string>>>,
FavoriteCount INT,
Id bigint,
RetweetCount INT,
Text string,
User struct <
   FollowersCount: INT,
   Id: BIGINT,
   Language: INT,
   Location: string,
   Name: string,
   StatusesCount: INT>
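To make the drill-down through this schema concrete, here is a rough local stand-in for json_tuple() written in Python. It is a sketch under assumptions, not Hive's implementation:

- It returns one string per requested key, or None when the key is missing.
- It returns nested structs and arrays as compact JSON text, which is what allows a second json_tuple() call to go one level deeper (for example, from User to FollowersCount).
- Number formatting and other edge cases may differ from Hive.

```python
import json


def json_tuple(json_text, *keys):
    """Rough stand-in for Hive's json_tuple(): one string (or None) per key."""
    try:
        obj = json.loads(json_text)
    except (TypeError, ValueError):
        return tuple(None for _ in keys)  # unparseable input: all NULLs
    if not isinstance(obj, dict):
        return tuple(None for _ in keys)
    values = []
    for key in keys:
        value = obj.get(key)
        if value is None:
            values.append(None)  # missing key (or JSON null) -> NULL
        elif isinstance(value, str):
            values.append(value)
        else:
            # Structs, arrays and numbers come back as JSON text, so the result
            # can be fed to another json_tuple() call one level down.
            values.append(json.dumps(value, separators=(",", ":")))
    return tuple(values)


if __name__ == "__main__":
    row = '{"Id":42,"Text":"TD!","User":{"Name":"fan","FollowersCount":10}}'
    tweet_id, text, user = json_tuple(row, "Id", "Text", "User")
    print(tweet_id, text, user)  # 42 TD! {"Name":"fan","FollowersCount":10}
    print(json_tuple(user, "Name", "FollowersCount"))  # ('fan', '10')
```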

Now that we know our schema, we can write code to navigate the structure, map values to fields and flatten the data where needed. For a detailed walkthrough of LATERAL VIEW and json_tuple(), please refer to the excellent blog post “Hive Plays Well with JSON”. In summary, combining the two lets you drill down into structs and pull nested values up and out of the hierarchy. Flattening arrays, however, requires a somewhat less elegant approach. As in the Pig step, you must transform the square-bracket-enclosed array, which arrives as a single string, into separate strings that each hold one JSON object. The explode() function then turns each of those strings into its own row, duplicating the other fields and flattening the array. Here is the full code for importing the JSON rows with the schema above:

DROP TABLE IF EXISTS nflTweetsFromRaw;
CREATE TABLE nflTweetsFromRaw
(
Id BIGINT,
TweetText STRING,
CreatedDate STRING,
FavoriteCount INT,
RetweetCount INT,
UserFollowersCount INT,
UserID BIGINT,
UserName STRING,
UserLocation STRING,
HashTagText STRING,
DisplayURL STRING,
ExpandedURL STRING,
URL STRING
);

INSERT OVERWRITE TABLE nflTweetsFromRaw
SELECT b.Id, b.TweetText, b.CreatedDate, b.FavoriteCount, b.RetweetCount,
h.UserFollowersCount, h.UserID, h.UserName, h.UserLocation, e.HashtagText,
g.displayURL, g.expandedURL, g.URL
FROM tweets_raw a
-- Top-level fields, with User and Entities returned as JSON strings
LATERAL VIEW json_tuple(a.value, 'Id', 'FavoriteCount', 'CreatedDate', 'RetweetCount', 'Text', 'User', 'Entities') b
   AS Id, FavoriteCount, CreatedDate, RetweetCount, TweetText, User, Entities
-- The two arrays inside Entities, still as JSON strings
LATERAL VIEW json_tuple(b.Entities, 'Urls', 'Hashtags') c
   AS Urls, Hashtags
-- Strip the brackets, split into one string per object, and explode into rows
LATERAL VIEW
   explode(
      split(
         regexp_replace(
         regexp_replace(
            c.Hashtags, '^\\[|\\]$', '')
            , '\\},\\{', '\\}\\},\\{\\{')
         , '\\},\\{')
      ) d
   AS HashExplode
LATERAL VIEW json_tuple(d.HashExplode, 'text') e
   AS HashtagText
-- Same technique for the Urls array
LATERAL VIEW
   explode(
      split(
         regexp_replace(
         regexp_replace(
            c.Urls, '^\\[|\\]$', '')
            , '\\},\\{', '\\}\\},\\{\\{')
         , '\\},\\{')
      ) f
   AS URLExplode
LATERAL VIEW json_tuple(f.URLExplode, 'display_url', 'expanded_url', 'url') g
   AS displayURL, expandedURL, URL
-- Fields from the User struct
LATERAL VIEW json_tuple(b.User, 'FollowersCount', 'Id', 'Language', 'Location', 'Name', 'StatusesCount') h
   AS UserFollowersCount, UserID, Language, UserLocation, UserName, UserNumberOfTweets;
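The least obvious part of the query is the regexp_replace()/split() pair that turns an array string into pieces explode() can emit as rows. The Python sketch below mirrors those two steps with the re module so the behavior can be checked on a sample string. The function name is ours, and the Python raw strings correspond to the patterns Hive's regex engine receives once the doubled backslashes in the HiveQL literals are processed. Like the Hive version, it assumes no element contains the sequence },{ internally (for example, an element holding an array of nested objects), because that would be split as well.

```python
import re


def split_json_array(array_text):
    """Mirror the query's regexp_replace/split steps on one array string."""
    # regexp_replace(x, '^\\[|\\]$', ''): strip the enclosing square brackets.
    inner = re.sub(r"^\[|\]$", "", array_text)
    # regexp_replace(..., '\\},\\{', '\\}\\},\\{\\{'): double the braces at
    # each boundary between objects, turning },{ into }},{{.
    doubled = re.sub(r"\},\{", "}},{{", inner)
    # split(..., '\\},\\{'): each piece keeps one closing and one opening brace.
    return re.split(r"\},\{", doubled)


if __name__ == "__main__":
    hashtags = '[{"indices":[0,4],"text":"NFL"},{"indices":[5,11],"text":"Draft"}]'
    # explode() would emit one row per element of this list.
    for element in split_json_array(hashtags):
        print(element)
    # {"indices":[0,4],"text":"NFL"}
    # {"indices":[5,11],"text":"Draft"}
```

Doubling the braces before splitting is what keeps every piece a complete object. Splitting directly on },{ would strip the closing brace from one element and the opening brace from the next.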

As you can see, explode(), split() and regexp_replace() flatten the “Hashtags” and “Urls” arrays, and LATERAL VIEW with json_tuple() then returns the desired fields.
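Because the query chains two exploded LATERAL VIEWs, each tweet produces one output row per hashtag and URL combination (a Cartesian product), and the tweet and user fields repeat on every row. The sketch below reproduces that row multiplication in Python using field names from the schema above. The function name is ours. It parses with json rather than the regex split and returns only a few of the columns. It also makes no attempt to model how Hive handles a tweet whose arrays are empty or missing.

```python
import itertools
import json


def flatten_tweet(row_json):
    """Yield one tuple per (hashtag, URL) pair, like the chained LATERAL VIEWs."""
    tweet = json.loads(row_json)
    entities = tweet.get("Entities") or {}
    user = tweet.get("User") or {}
    hashtags = entities.get("Hashtags") or []
    urls = entities.get("Urls") or []
    # product() pairs every hashtag with every URL; if either list is empty it
    # yields nothing (Hive's handling of that case is not modeled here).
    for tag, url in itertools.product(hashtags, urls):
        yield (
            tweet.get("Id"),
            tweet.get("Text"),
            user.get("Name"),  # user fields repeat on every row
            tag.get("text"),
            url.get("expanded_url"),
        )


if __name__ == "__main__":
    row = json.dumps({
        "Id": 1,
        "Text": "Kickoff",
        "User": {"Name": "fan"},
        "Entities": {
            "Hashtags": [{"text": "NFL"}, {"text": "Draft"}],
            "Urls": [{"expanded_url": "http://a.example"},
                     {"expanded_url": "http://b.example"}],
        },
    })
    for flat in flatten_tweet(row):
        print(flat)  # 2 hashtags x 2 URLs -> 4 rows
```

This multiplication is the storage and processing cost of flattening that the summary below weighs against leaving the arrays intact.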

While somewhat complex, the primary advantage of this technique, besides not requiring third-party tools or a custom SerDe, is its tolerance of schema changes. Unlike many other solutions, this code will not fail if a field or structure is added to the JSON object. It will not, however, pull in the new fields without being edited, and it will not produce correct results if a referenced field is removed. The technique therefore offers a modest degree of future-proofing. This is especially useful if the raw data also feeds other ETL or native reporting processes that may need the additional fields.

Summing Up

As a result of our ETL, we now have a fully flattened Hive table, ready to be referenced and processed by downstream tools. We could have left the arrays intact, saving storage space and reducing processing time. However, because the final destination for this data is a relational database, flattening it is the easiest way to stage it for import. If you want to perform aggregations or mass calculations over the entire dataset, Hadoop will most likely perform better than document stores such as MongoDB. If instead you plan on doing updates, or can restrict your calculations and transformations to a smaller subset of the data, you are better off using MongoDB, or possibly the new jsonb type in Postgres 9.4+, as a document store.


Phone: 312-602-4000
222 W. Adams
Chicago, IL 60606