Ooooh I’m Telling : Doing Swear Word Analysis with Storm on HDInsight

As promised, this is the first of three (maybe more) posts that will present an end-to-end example to showcase the distributed streaming capabilities of the Apache Storm project. This first post will provide an introduction to the project and an overview of all the moving pieces.

Please note that I will not be getting into line-by-line code examples outside of those that are relevant to the Storm deployment. Likewise, I will not get too deep into any of the peripheral technologies involved. I will leave those for either a later blog post or an exercise for you the reader. The complete project and source will be posted to GitHub (HERE).

The rationale for this project is pretty straight-forward. As a father to young kids, I became acutely aware of swear words and foul language when my kids reach the parroting stage. For those without kids, this is the stage where a two or three year old hears a word and randomly begins inserting it into their daily vocabulary (typically at the most inopportune times, like when mom is around). This offers many humorous and potentially embarrassing moments depending on the environment.

To play off of this and since Twitter is useful for all sorts of things beyond letting people know what you are doing minute by minute, I decided to do some seemingly simple swear word analysis on people’s tweets. This analysis would hopefully allow me to answer questions like:

  • Which parts of the world have the worst potty mouths?
  • In what context (sentiment) do people use swear words?
  • Can I identify topics or hashtags that people curse about the most?
  • Who are the top 10 worst potty mouths on Twitter?

Luckily, all of these question can be easily answered and since it would be nice to do it in real-time, Apache Storm is perfectly suited to the task given the volume and velocity of data in the Twitter stream.

Project Overview

The bulk of this demonstration project, which I will refer to as Twitter Tattletale revolves around common ETL types of activities you would expect to find in a non-streaming, traditional BI/batch processing solution. In fact for this pseudo phase one implementation I will ignore streaming analytics entirely (and a fancy, pretty UI) so we can even break it down into ETL steps. Please understand, that even though I am comparing this to a more traditional ETL process, these steps occur in an unbounded manner, meaning that instead of a fixed beginning and end like that of a SSIS of Hadoop job they run continuously until terminated.

Extract

The entry point for this project is an Azure Service Bus queue. An external application handles consuming the Twitter stream and publishing brokered messages to the queue so our solution will simply need to ingest these messages.

Transform

There are multiple cleaning, filtering, enrichment and transformation steps that occur within this project. The process starts by filtering tweets so that only those in English are processed. Next, external services are used to determine the tweet’s sentiment and to reverse geocode the location based on the latitude and longitude coordinates if provided. Then, the tweet text is split into words for cleaning and indexing. Noise words, special characters, user mentions and urls are all stripped away before determining if the tweet is vulgar by counting the instance/number of swear words.

Load

Rather than loading a dimensional data warehouse, the tattletale project stores data in multiple formats each with its own purpose. Raw tweet data is stored so that it can be process later in a more traditional batch process such as with HDInsight (Hadoop). Aggregated data from the stream is persisted for both the tweet and extracted topics in a database (HBase).

Now that the work has been defined, let’s move on.

Technologies and Considerations

There are some miscellaneous pieces that are ultimately external to Apache Storm but are important nonetheless.

First, is the event producer which is responsible for capturing and storing tweets to the Service Bus queue. The application uses the Tweetinvi API to capture streaming tweets. For the demo I have implemented this as an Azure Worker Role, though it couple be easily run as a console based application on your desktop. The important thing to remember is that this event producer could ultimately be anything and the source of the stream could come from virtually anywhere.

Next is Apache HBase. HBase is a distributed NoSQL database built on top of HDFS and based largely on Google Big Table. It is a columnar database, optimized for high volume, high velocity real-time random red/write access. Whew…that was a mouth full. Please note that the database you choose does not matter to Apache Storm. Outside of demo-land your choice should be driven by the requirements of whatever it is you are trying to build and implement.

SCP.Net

Now on to the good stuff. Before discussing the project specific topology, it is necessary to introduce SCP.Net (Stream Computing Platform). Storm, like most Apache projects is largely Java centric. Most of the documentation and/or examples that you’ll run across are written in either Java or Python. This abstraction allows you to build a Storm topology and deploy it a HDInsight Storm cluster either purely in .Net or in a hybrid deployment using a mix of Java and .Net.

The link referenced above does a good job of explaining SCP.Net in greater detail and you can get started using it by installing HDInsight tools for Visual Studio in the authors follow-up post.

The Twitter Tattletale Topology

Recalling from the prior post that a Storm Topology consists of streams, spouts and bolts. Let’s start with a picture (below) and then take a look at each piece of the topology for the Twitter Tattletale demo.

Topology

  1. Topology – the topology is a simple non-transactional topology, meaning that we do not necessarily care that data is processed in the order it was received.
  2. Twitter Spout –  reads brokered messages from the Azure Service Bus queue. The brokered messages consist on tweets serialized as JSON. The spout extracts the unique tweet id and tweet language and then filters out non-English tweets before emitting a tuple into the data stream. The tuple consists of the unique id and tweet JSON. Note that in an ideal (read non-demo) situation, the spout would be more generic than it is. The business of filtering should be done in a bolt, so that the spout could be potentially reused by other topologies.
  3. Blob Writer Bolt – as the name implies, this bolt runs independent of the primary processing stream to write raw tweets (JSON) to a configured container in a Azure Blob Storage account. The bolt writes one blob per tweet using blob delimiters to annotate year, month and day of the tweet so the data set can be easily partitioned in Hive for later processing. I intentionally did not perform any aggregation meaning you would potentially wind up with millions of 1k files. This is not ideal for Hadoop batch processing. A batch process could be added to handle this  and any compression (gzip/lzo).
  4. Sentiment Bolt – calls an external REST-based service to determine the tweet’s sentiment. Currently the bolt uses the Sentiment140 API to classify each tweet as positive, negative or neutral. In the future (i.e. when I have time), I will leverage an AzureML service. The sentiment bolt emits a new tuple that consist of the unique id, serialized tweet and sentiment.
  5. Geography Bolt – calls the Bing Maps API location service to reverse geo-code a tweets location when a latitude and longitude are provided. The Bing Maps API returns AdminDistrict, AdminDistrict2, CountryRegion, Locality and PostalCode all of which are emitted with the unique tweet id in a new data stream.
  6. Merge Bolt – unions or joins the split processing workflow back into a single unified workflow. Since the sentiment and geography bolts both depend on external services, the decision was made to perform both operations in parallel. The merge bolt use field grouping on the unique tweet id to ensure that tuples with same id are routed to same worker.
  7. Swear Word Bolt – handles most of the heavy lifting for this streaming process. This bolt prepares tweets by stripping out special characters, emojiis, urls and user mentions before splitting the tweet into unigrams and bigrams. Noise words are removed and a swear word dictionary identifies and counts all swear words. Two data streams are emitted from this bolt: tweets and topics.
  8. HBase Tweet Bolt – writes enriched tweets to a HBase tweets table. Internally it uses an queue to implement micro-batching that is trigger by either batch size or a system tick tuple that is fired by Storm at a configured interval. The decision to micro-batch writes was made due to the fact that the C# HBase API is implement using the HBASE REST API (formerly Stargate).
  9. HBase Topic Bolt – writes tweet topics with both sentiment, geography and a flag to indicate if swear words where found within the tweet context. Like the HBase Tweet Bolt this bolt also implements the micro-batching pattern, rather than individual writes.

Running the Demo

Since the demo consist of multiple pieces, there are prerequisites that have to be satisfied before running the Storm topology.

  1. Azure Storage Accounts need to be created for the various Azure services used.
  2. An Azure Service Bus namespace needs to be defined and a Queue to hold the tweets created.
  3. A Cloud Service needs to be provisioned and the application which captures the Twitter stream should be deployed as a Worker Role.
  4. Provision a HDInsight HBase cluster and Remote Desktop needs to be enabled if you want to use the HBase Shell to issue queries.
  5. Provision a HDInsight Storm cluster

You probably noticed that I left out compute or sizing specifications for some of the prerequisites. This is intentional since how much “horsepower” you need depends on what you are trying to do with the demo and we will discuss scaling in a later post.

After the prerequisites are in place, your topology can be deployed directly from Visual Studio using HDInsight Tools (‘Submit to Storm on HDInsight’ found on the project context menu). The add-in connects to your Azure account, finds your Storm cluster, builds the topology spec file (based on the TopologyDescriptor class marked as active), zips up the required files and finally deploys everything to the cluster.

image
image

One the topology has been submitted and is running, you can monitor it from the Storm Dashboard found in the Azure Portal or by using the Storm Topology Viewer in Visual Studio.

image
image

To see what Twitter Tattletale is actually doing, you can query HBase using the HBase shell, an HBase API or by using Hive. Creating a Hive table over HBase is by far the easiest approach (although least performant approach). The script below will create two Hive tables: one for tweets and one for topics.

DROP TABLE IF EXISTS hbase_tweets;
CREATE EXTERNAL TABLE hbase_tweets(
rowkey string, screenname string, country string, created_date string, sentiment string, swearwords int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
'hbase.columns.mapping' = ':key,user:screenname,loc:country_region,tweet:created_dt,
    tweet:sentiment,tweet:swear_word_count')
TBLPROPERTIES ('hbase.table.name' = 'tweets');

DROP TABLE IF EXISTS hbase_topics;
CREATE EXTERNAL TABLE hbase_topics(
rowkey string, country string, created_date string, sentiment string, vulgar_tweet boolean)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
'hbase.columns.mapping' = ':key,tp:country_region,tp:created_dt,tp:sentiment,tp:vulgar_tweet')
TBLPROPERTIES ('hbase.table.name' = 'topics');

We can now query the tables to browse the data being collected, scrubbed and enriched using simple Hive queries.

--Summarize tweets by country
SELECT
    COUNT(*) AS TotalTweets,
    SUM(IF(swearwords > 0, 1, 0)) AS VulgarTweets,
    SUM(swearwords) AS TotalSwearWords,
    country
FROM hbase_tweets
WHERE country IS NOT NULL
GROUP BY country
ORDER BY VulgarTweets DESC;
SNAGHTML14a7ea8b
--Top 15 negative bigrams by country
SELECT
    SPLIT(rowkey,'[_]')[0] AS key,
    country,
    COUNT(*) AS TopicCount
FROM hbase_topics
WHERE sentiment='Negative'
AND SIZE(SPLIT(SPLIT(rowkey,'[_]')[0],'[ ]')) > 1
GROUP BY SPLIT(rowkey,'[_]')[0], country
ORDER BY TopicCount DESC;
SNAGHTML14b6d3ae
--XBox tweets by country
SELECT
    country,
    COUNT(*) AS TopicCount
FROM hbase_topics
WHERE SPLIT(rowkey,'[_]')[0] = 'xbox'
GROUP BY country
ORDER BY TopicCount DESC
LIMIT 15;
SNAGHTML14c399f2

Credits

Before I wrap-up, I need to give credit where credit is due:

Analyzing real-time Twitter sentiment with HBase in HDInsight

Wrap-Up

In this post, I introduce the basis and topology for the my Storm demonstration project: Twitter Tattletale. Although the subject matter of the demo is meant to be light-hearted and humorous the goal of this post (and the next few) is to introduce you to this very powerful and flexible Azure offering that presents new options for handle real-time streaming data in a distributed processing environment .In my next post, we will dig into the code, talk testing and finally deployment.

Till next time!

 

Chris