
Analyzing Twitter Data with Apache Hadoop

Problem statement:
Social media platforms like Twitter give the general public an easy way to voice their opinions. Positive or negative opinions about a company or its products and services can travel very quickly on social media and can have a significant impact on the company's brand value and market share. Companies therefore need effective methods and tools to monitor, in real time, the messages on Twitter that relate to them.

Motivations:
The primary purpose is to analyze streaming Twitter data, in particular retweets about a company's products and services, to understand what people are saying about them, so that the company can act quickly on negative publicity and also leverage social media for publicity and brand building.

Objectives:
The primary objective is to implement an efficient system to extract, load, and analyze streaming Twitter data in real time, and to analyze the messages in order to understand public opinion about a company's products and services.

Proposed solutions:
We will answer these questions using SQL queries: to find the influential users, we look at which users are responsible for the most retweets, in descending order of retweet count.
We will use time zones to get a sense of where in the world the users are.
We can also formulate more complex queries to answer questions like “which were the most common hashtags?”.
We use Apache Flume, Apache HDFS, Apache Oozie, and Apache Hive to design an end-to-end data pipeline that will enable us to analyze Twitter data.

Step 1: My working environment
1.1 Cloudera Manager and CDH4.4
Red Hat CentOS release 6.2 (Final) (64-bit) in the Cloudera QuickStart VM 4.4.0, with Hadoop 2.0.0-cdh4.4.0. Check it here:
cloudera_quickstart

step1
We need the following core components: a Hadoop 2.0.0 single-node cluster, Flume, Oozie, and Hive.

1.2 Install MySQL
MySQL is the recommended database for the Oozie database and the Hive metastore. See http://dev.mysql.com/doc/refman/5.1/en/linux-installation-native.html for the installation documentation.
Command to execute the MySQL installation:
step1-2
Starting our MySQL daemon:
step1-3
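The two screenshots correspond to installing and starting MySQL. On CentOS 6 the equivalent commands are roughly the following (a minimal sketch, assuming the stock mysql-server package from the distribution repositories):

[cloudera@localhost ~]$ sudo yum install mysql-server   # install the MySQL server
[cloudera@localhost ~]$ sudo service mysqld start       # start the MySQL daemon
[cloudera@localhost ~]$ sudo chkconfig mysqld on        # start it automatically at boot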
Step 2: Configuring Apache Flume:
2.1 Create twitter access token:
Create a Twitter account, then go to https://dev.twitter.com and open “My Applications”, as shown below.
step2
2.2 Create my application as shown below in screenshots.
step2-1
2.3 Details about my application
step2-2
2.4 Click the “Create My Access Token” button in the “Keys and access tokens” section.
The access token was generated after a short time; it is needed in the flume.conf file for the Flume configuration in the next few steps.
step2-4
Step 3: Setting up the Flume agent:
3.1 Build or Download the custom Flume Source
Download the code from https://github.com/cloudera/cdh-twitter-example.
step3
In our case, we use the JAR shared under the twitter-project folder, which we downloaded to the /home/cloudera/Downloads directory:
step3-1
3.2 Add the JAR to the Flume classpath
Copy the flume-sources JAR shared in the twitter-project folder to the Flume lib directory:
[cloudera@localhost flume-sources]$ sudo cp flume-sources-1.0-SNAPSHOT.jar /usr/lib/flume-ng/lib/
[cloudera@localhost flume-sources]$ sudo chmod +r /usr/lib/flume-ng/lib/flume-sources-1.0-SNAPSHOT.jar

step3-2
3.3 Create an HDFS directory “/user/cloudera/twitter/” where the Twitter input will be written.
[cloudera@localhost]$ hdfs dfs -mkdir /user/cloudera/twitter/
step3-3
3.4 Edit the Flume configuration “flume-sources/flume.conf” to set the Twitter keys and tokens and the HDFS path.

Use the access tokens from your Twitter application. Change the HDFS path to “/user/cloudera/twitter/%Y/%m/%d/%H/”.
step3-4

[cloudera@localhost flume-sources]$ pwd
/home/cloudera/Downloads/flume-sources
[cloudera@localhost flume-sources]$ cat flume.conf

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = M0zPfqY7IgI8oKRnZn0o6U6Me
TwitterAgent.sources.Twitter.consumerSecret = ARwzrT4kFe22S3OUPQZuXaHNYGlkyyLdMvMJDwJwXyb3gJQToi
TwitterAgent.sources.Twitter.accessToken = 2898574486-20SikvAYDykhFFEepxTVyM5yIAERWXq5KQJDXgO
TwitterAgent.sources.Twitter.accessTokenSecret = hvnVYZn07GT5DnVjJqC3YTG6Mwwv0wgEALoAuldaaKLrx
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientiest, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
#TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.path = /user/cloudera/twitter/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 0
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 100000
TwitterAgent.channels.MemChannel.transactionCapacity = 10000

Step 4: Apache Hive Setup:
4.1 Build or Download the JSON SerDe
A pre-built version of the JSON SerDe is available here: http://files.cloudera.com/samples/hive-serdes-1.0-SNAPSHOT.jar.
The hive-serdes directory contains a Maven project with a JSON SerDe which enables Hive to query raw JSON data.
To build the hive-serdes JAR, from the root of the git repository:

step4

[cloudera@localhost hive-serdes]$ cd hive-serdes
[cloudera@localhost hive-serdes]$ mvn package
[cloudera@localhost hive-serdes]$ cd ../..

This will generate a file called hive-serdes-1.0-SNAPSHOT.jar in the target directory.
4.2 Create the Hive directory hierarchy

[cloudera@localhost]$ sudo -u hdfs hadoop fs -mkdir /user/hive/warehouse
[cloudera@localhost]$ sudo -u hdfs hadoop fs -chown -R hive:hive /user/hive
[cloudera@localhost]$ sudo -u hdfs hadoop fs -chmod 750 /user/hive
[cloudera@localhost]$ sudo -u hdfs hadoop fs -chmod 770 /user/hive/warehouse
[cloudera@localhost]$ hdfs dfs -ls -R /user/hive

step4-1

You’ll also want to add whatever user you plan on executing Hive scripts with to the hive Unix group:

/home/cloudera$ sudo usermod -a -G hive <username>

4.3 Configure the Hive metastore
4.3.1 After the installation completes and the MySQL daemon has been started, verify its status.
The Hive metastore should be configured to use MySQL. Follow these instructions to install MySQL and then to configure the metastore.

step4-2

4.3.2 Configure the MySQL Service and Connector
Make sure to install the MySQL JDBC driver in /var/lib/hive/lib.

step4-3

[cloudera@localhost network-scripts]$ sudo yum install mysql-connector-java
Loaded plugins: fastestmirror
Setting up Install Process
Loading mirror speeds from cached hostfile
epel/metalink | 13 kB
* base: bay.uchicago.edu
* epel: fedora.mirrors.pair.com
* extras: centos.eecs.wsu.edu
* rpmforge: apt.sw.be
* updates: centos.sonn.com
base | 3.7 kB
cloudera-cdh4 | 951 B
cloudera-gplextras | 951 B 00:00
cloudera-impala| 951 B 00:00
cloudera-manager| 951 B 00:00
cloudera-search| 951 B 00:00
epel| 4.4 kB 00:00
epel/primary_db84% [===========epel/primary_db 88% [===========epel/primary_db 92% [===========epel/primary_db | 6.6 MB 00:11
extras | 3.4 kB 00:00
rpmforge| 1.9 kB 00:00
updates| 3.4 kB 00:00
updates/primary_db | 3.3 MB 00:08 vmware-tools | 951 B 00:00
Package 1:mysql-connector-java-5.1.17-6.el6.noarch already installed and latest version
Nothing to do

step4-4
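The yum output above shows that the connector package was already installed; on CentOS it places the driver in /usr/share/java. To make it visible to Hive as required above, a symlink is enough (a sketch; the source path is the package default and may differ on your system):

[cloudera@localhost ~]$ sudo ln -s /usr/share/java/mysql-connector-java.jar /var/lib/hive/lib/mysql-connector-java.jar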

4.3.3 Create the Database and User

[cloudera@localhost ~]$ mysql -u root -p
Enter password:
mysql> CREATE DATABASE metastore;
mysql> USE metastore;
mysql> SOURCE /usr/lib/hive/scripts/metastore/upgrade/mysql/hive-schema-0.10.0.mysql.sql;

step4-5
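This step also covers creating a database user for the metastore. A sketch based on the CDH4 documentation is shown below; the user name 'hive' and the password 'mypassword' are placeholders to adapt:

mysql> CREATE USER 'hive'@'localhost' IDENTIFIED BY 'mypassword';
mysql> REVOKE ALL PRIVILEGES, GRANT OPTION FROM 'hive'@'localhost';
mysql> GRANT SELECT,INSERT,UPDATE,DELETE,LOCK TABLES,EXECUTE ON metastore.* TO 'hive'@'localhost';
mysql> FLUSH PRIVILEGES;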

4.3.4 Configure the Metastore Service to Communicate with the MySQL Database
The hive-site.xml file is located in /etc/hive/conf.

step4-6

step4-7

step4-8
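The screenshots above illustrate the edits. A sketch of the relevant hive-site.xml properties, following the CDH4 metastore documentation (the connection URL, user name, and password are assumptions that must match the metastore database and user created above):

<!-- JDBC connection to the MySQL metastore database -->
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://localhost/metastore</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>mypassword</value>
</property>
<!-- The schema was created with the SQL script above, so do not auto-create it -->
<property>
  <name>datanucleus.autoCreateSchema</name>
  <value>false</value>
</property>
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://127.0.0.1:9083</value>
</property>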

Step 5: Create the tweets_partioned table:
5.1 Run the following commands to build the SerDe library.

[cloudera@localhost Downloads]$ cd hive-serdes
[cloudera@localhost hive-serdes]$ mvn clean install -DskipTests

In our case, we use the JAR which is shared under the twitter-project folder.

step4-9

5.2 Start Hive using the hive command.

step4-10
5.3 Now add the SerDe JAR using the ADD JAR command as shown below.
hive> ADD JAR /home/cloudera/Downloads/hive-serdes/hive-serdes-1.0-SNAPSHOT.jar;
step4-11
5.4 Create an external table in Hive as shown below.

step4-12
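The screenshot contains the DDL. Reconstructed from the DESCRIBE output shown below (table name, column names, partition columns, SerDe, and HDFS location), it looks roughly like this; the struct field lists are truncated in that output, so the ones written here follow the cdh-twitter-example schema and the queries used in Step 8, and should be treated as an approximation:

CREATE EXTERNAL TABLE tweets_partioned (
  seid BIGINT,
  created_at STRING,
  source STRING,
  favorited BOOLEAN,
  retweeted_status STRUCT<
    text:STRING,
    user:STRUCT<screen_name:STRING, name:STRING>,
    retweet_count:INT>,
  entities STRUCT<
    urls:ARRAY<STRUCT<expanded_url:STRING>>,
    user_mentions:ARRAY<STRUCT<screen_name:STRING, name:STRING>>,
    hashtags:ARRAY<STRUCT<text:STRING>>>,
  text STRING,
  user STRUCT<
    screen_name:STRING,
    name:STRING,
    followers_count:INT,
    time_zone:STRING>,
  in_reply_to_screen_name STRING
)
-- partition columns mirror the Flume HDFS path layout %Y/%m/%d/%H
PARTITIONED BY (year INT, month INT, dt INT, dthour INT)
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION '/user/cloudera/twitter';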

See below the screenshot for tweets_partioned:

hive> describe extended tweets_partioned;
OK
seid bigint from deserializer
created_at string from deserializer
source string from deserializer
favorited boolean from deserializer
retweeted_status struct from deserializer
entities struct<urls:array,user_mentions:array,hashtags:array> from deserializer
text string from deserializer
user struct from deserializer
in_reply_to_screen_name string from deserializer
year int
month int
dt int
dthour int
Detailed Table Information Table(tableName:tweets_partioned, dbName:default, owner:cloudera, createTime:1434769977, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:seid, type:bigint, comment:null), FieldSchema(name:created_at, type:string, comment:null), FieldSchema(name:source, type:string, comment:null), FieldSchema(name:favorited, type:boolean, comment:null), FieldSchema(name:retweeted_status, type:struct, comment:null), FieldSchema(name:entities, type:struct<urls:array,user_mentions:array,hashtags:array>, comment:null), FieldSchema(name:text, type:string, comment:null), FieldSchema(name:user, type:struct, comment:null), FieldSchema(name:in_reply_to_screen_name, type:string, comment:null)], location:hdfs://localhost.localdomain:8020/user/cloudera/twitter, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:com.cloudera.hive.serde.JSONSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[FieldSchema(name:year, type:int, comment:null), FieldSchema(name:month, type:int, comment:null), FieldSchema(name:dt, type:int, comment:null), FieldSchema(name:dthour, type:int, comment:null)], parameters:{totalSize=442522876, EXTERNAL=TRUE, numRows=0, rawDataSize=0, numFiles=9238, numPartitions=89, transient_lastDdlTime=1434776297}, viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)

Step 6: Apache Oozie Workflow Setup:

6.1 Configure Oozie to use MySQL
Since we use Cloudera Manager, we reconfigured Oozie to use MySQL via the service configuration page on the Databases tab and restarted the Oozie service after reconfiguring. We also installed the MySQL JDBC driver in /usr/lib/oozie/libext.

step5

See below the screenshot for Oozie Web console:

6-1

6.2 Create a lib directory and copy any necessary external JARs into it

External JARs are provided to Oozie through a lib directory in the workflow directory. The workflow will need a copy of the MySQL JDBC driver and the hive-serdes JAR.

step5-1

step5-2

step5-3

step5-4
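The screenshots above show this step. A minimal sketch of the equivalent commands, assuming the workflow directory is /home/cloudera/oozie and the JAR locations used earlier in this walkthrough:

[cloudera@localhost ~]$ mkdir -p /home/cloudera/oozie/lib
[cloudera@localhost ~]$ cp /home/cloudera/Downloads/hive-serdes/hive-serdes-1.0-SNAPSHOT.jar /home/cloudera/oozie/lib/
[cloudera@localhost ~]$ cp /usr/share/java/mysql-connector-java.jar /home/cloudera/oozie/lib/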

6.3 Copy hive-site.xml to the oozie directory
To execute the Hive action, Oozie needs a copy of hive-site.xml.

[cloudera@localhost ~]$ sudo cp /etc/hive/conf/hive-site.xml /home/cloudera/oozie
[cloudera@localhost ~]$ sudo chown oozie:oozie /home/cloudera/oozie/hive-site.xml

step5-5

6.4 Copy the oozie directory to HDFS

[cloudera@localhost ~]$ hdfs dfs -put oozie /user/cloudera/

step5-6

6.5 Install the Oozie ShareLib in HDFS

[cloudera@localhost ~]$ sudo -u hdfs hadoop fs -mkdir /user/oozie/share/lib
[cloudera@localhost ~]$ sudo -u hdfs hadoop fs -chown oozie:oozie /user/oozie/share/lib

step5-7

In order to use the Hive action, the Oozie ShareLib must be installed. Installation instructions can be found here: http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH4/4.2.1/CDH4-Installation-Guide/cdh4ig_topic_17_6.html.
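Following those instructions, the installation amounts to unpacking the ShareLib tarball shipped with the CDH4 Oozie package and pushing its contents into the HDFS directory created above (a sketch; the tarball path is the CDH4 package default and may differ):

[cloudera@localhost ~]$ cd /tmp
[cloudera@localhost tmp]$ tar xzf /usr/lib/oozie/oozie-sharelib.tar.gz
[cloudera@localhost tmp]$ sudo -u oozie hadoop fs -put share/lib/* /user/oozie/share/lib/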

Step 7: Starting the data pipeline:
7.1 Start the Flume agent
Create the HDFS directory hierarchy for the Flume sink. Make sure that it will be accessible by the user running the Oozie workflow.

[cloudera@localhost ~]$ hdfs dfs -mkdir /user/cloudera/twitter
[cloudera@localhost ~]$ cd Downloads/flume-sources
[cloudera@localhost flume-sources]$ flume-ng agent -n TwitterAgent -c conf -f flume.conf

See below the screenshot for the flume agent:
flume-agent1
flume-agent2
The screenshot has been truncated.
7.2 After some time, files will start arriving in HDFS. Check using the command below.

[cloudera@localhost flume-sources]$hdfs dfs -ls /user/cloudera/twitter

See the full results in the attached file.
7.3 Adjust the start time of the Oozie coordinator workflow in job.properties
You will need to modify the job.properties file, and change the jobStart, jobEnd, and initialDataset parameters. The start and end times are in UTC, because the version of Oozie packaged in CDH4 does not yet support custom timezones for workflows. The initial dataset should be set to something before the actual start time of your job in your local time zone. Additionally, the tzOffset parameter should be set to the difference between the server’s timezone and UTC. By default, it is set to -8, which is correct for US Pacific Time.

[cloudera@localhost ~]$ hdfs dfs -cat oozie/job.properties
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
nameNode=hdfs://localhost.localdomain:8020
jobTracker=localhost.localdomain:8021
workflowRoot=${nameNode}/user/${user.name}/oozie
# jobStart and jobEnd must be in UTC, because Oozie does not yet support
# custom timezones
jobStart=2015-06-16T21:00Z
jobEnd=2015-12-22T03:00Z
# This should be set to an hour boundary. In this case, it is set to 8 hours
# before the jobStart, since PST is GMT-8
#initialDataset=2012-08-23T09:00Z
initialDataset=2015-06-16T01:00Z
# Timezone offset between UTC and the server timezone
tzOffset=-8
oozie.use.system.libpath=true
oozie.coord.application.path=${nameNode}/user/${user.name}/oozie/coord-app.xml

See the full results in the attached file.

7.4 Start the Oozie coordinator workflow
In this case, we started the Oozie coordinator on 2015/06/16, after starting the Flume agent:
Screenshot for starting the oozie coordinator:

[cloudera@localhost ~]$ oozie job -oozie http://localhost:11000/oozie -config oozie/job.properties -run

Once this command was executed, the coordinator created and ran an instance of the workflow each hour, and the data automatically became queryable through Hive as the partitions were created.
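To follow the coordinator from the command line as well as from the web console, the standard Oozie CLI can be queried for its status; the job ID below is a placeholder for the ID printed by the run command:

[cloudera@localhost ~]$ oozie job -oozie http://localhost:11000/oozie -info <coordinator-job-id>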

Below is the information about the job ID:
Screenshot for Oozie Web Console showing the state of the jobs:
oozie1
oozie2
oozie3
Screenshot for Job DAG:
dag

Screenshot for file browser:

browser

Step 8: Finding influential celebrities and people by querying semi-structured data with Apache Hive:

8.1 Load the Twitter streaming data from HDFS into the Hive table using the LOAD command:
Load the Twitter streaming data collected in May 2015 (see Step 7) from HDFS into the Hive table, using the shell script below to generate all of the LOAD commands:

# Start with an empty load.hive script
if [ -f /home/cloudera/load.hive ]; then
echo > /home/cloudera/load.hive
fi
# Walk the /user/cloudera/twitter/2015/05/<day>/<hour> directories written by Flume
for dt in $(hdfs dfs -ls /user/cloudera/twitter/2015/05/|awk '{print $8}'|awk -F "/" '{print $7}')
do
for dthour in $(hdfs dfs -ls /user/cloudera/twitter/2015/05/$dt|awk -F "/" '{print $8}')
do
# Emit one LOAD DATA statement per hourly partition
echo "LOAD DATA INPATH '/user/cloudera/twitter/2015/05/$dt/$dthour/' OVERWRITE INTO TABLE tweets_partioned PARTITION (year=2015, month=05, dt=$dt, dthour=$dthour);" >> /home/cloudera/load.hive
done
done

We then obtained the file load.hive, which contains one LOAD command per hourly partition.
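As an illustration, each generated line has the following form (the dt and dthour values here are hypothetical):

LOAD DATA INPATH '/user/cloudera/twitter/2015/05/16/01/' OVERWRITE INTO TABLE tweets_partioned PARTITION (year=2015, month=05, dt=16, dthour=01);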

The generated commands are not reproduced in full here; see the attached file for all of them.
Then, we executed the command below to load the data into the Hive table tweets_partioned:

[cloudera@localhost ~]$ hive -i load.hive

Number of rows in the table: 116785
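This count can be reproduced with a simple Hive query against the table:

hive> SELECT COUNT(*) FROM tweets_partioned;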

8.2 Using Hive to find influential users:

The query below finds user names and the number of retweets they have generated, across all the tweets for which we have data:

select t.retweeted_screen_name, sum(retweets) as total_retweets, count(*) as tweet_count
from
(
select retweeted_status.user.screen_name as retweeted_screen_name, retweeted_status.text, max(retweeted_status.retweet_count) as retweets
from tweets_partioned
group by retweeted_status.user.screen_name,retweeted_status.text) t
group by t.retweeted_screen_name
order by total_retweets DESC, tweet_count ASC
limit 15;

Results in:

For the few days of data collected, these were the most retweeted users for this industry:

retweeted_screen_name total_retweets tweet_count
SoftLayer                  8448        8
tableau                    2905       31
DEVOPS_BORAT               2858        2
KirkDBorne                 2737      170
keen_io                    1925        8
hortonworks                1360       25
googleanalytics            1242       19
mhoye                      1225        1
garybernhardt              1083        3
analyticbridge             1045      132
charlesmilander             890        3
BetaList                    784        1
BigDataBlogs                662      159
sadserver                   643        1
VadymMelnyk                 642        2

Below is the screenshot:

We can see from these results whose tweets are being heard by the widest audience, and whether these people communicate on a regular basis. This information can be used to target our messaging more carefully in order to get them talking about these products, which, in turn, will get other people talking about them.
8.3 Geographic distributions of users:
Which time zones are the most active per day?

SELECT user.time_zone, SUBSTR (created_at, 0, 3), COUNT (*) AS total_count
FROM tweets_partioned
WHERE user.time_zone IS NOT NULL
GROUP BY user.time_zone,
SUBSTR (created_at, 0, 3)
ORDER BY total_count DESC
LIMIT 15;

Results in:

user.time_zone created_at total_count
Eastern Time (US & Canada) Mon 2856
Eastern Time (US & Canada) Sun 2170
Eastern Time (US & Canada) Wed 2157
Pacific Time (US & Canada) Mon 1894
Eastern Time (US & Canada) Sat 1728
Pacific Time (US & Canada) Wed 1576
Central Time (US & Canada) Mon 1498
London                     Mon 1463
London                     Sun 1395
Eastern Time (US & Canada) Tue 1330
Pacific Time (US & Canada) Sun 1265
Pacific Time (US & Canada) Sat 1174
Amsterdam                  Mon 1087
London                     Sat 1069
Amsterdam                  Sun 1028

Below is the screenshot:

Interestingly, more users are tweeting about the selected products in the Eastern time zone than in the Pacific time zone. Europe also seems to be quite interested in big data infrastructure.

8.4 Which were the most common hashtags?
We can formulate a more complex query to answer the question “which were the most common hashtags?”:

SELECT
LOWER(hashtags.text),COUNT(*) AS total_count
FROM tweets_partioned LATERAL VIEW EXPLODE(entities.hashtags) t1 AS hashtags
GROUP BY LOWER(hashtags.text)
ORDER BY total_count DESC
LIMIT 15;

Results in:

hashtags.text           total_count
bigdata                 42019
analytics               12344
iot                     11455
healthcare               5692
wearables                5430
nanotechnology           5243
datascience              4353
job                      3162
jobs                     2931
cloud                    2871
Hadoop                   2753
data                     2740
cloudcomputing           2330
tech                     2102
machinelearning          2053

See the screenshot below:

The most common hashtag is bigdata, which means that many people are tweeting about this topic, followed by analytics, iot, and so on.


Conclusion:
In this report we have seen how we can take some of the components of Cloudera CDH and combine them to create a data infrastructure that lets companies and organizations monitor what the general public is saying about their products. This same infrastructure could be used for a variety of applications designed to look at Twitter data, such as identifying spam accounts or identifying clusters of keywords.
Companies and organizations can use this information as a feedback mechanism for their products. In the future, the pipeline could be extended with sentiment analysis to capture feedback about their products in real time. Automatic sentiment analysis reduces human intervention and, thus, the complexity and cost of the whole process.