Monthly Archives: October 2015

Sqoop and Hive

It is possible to use Sqoop to load data from an RDBMS directly into Hive. This opens up interesting possibilities. Data that are stored in an RDBMS and that need to be analysed on a cheaper platform can be migrated via Sqoop to a Hadoop platform. Sqoop is generally seen as a reliable medium for such a migration. The command is straightforward:

sqoop import --connect jdbc:mysql:// --table persons -m 1 --username thom --password thom24257  --hive-import

I noticed from the log that the import is done in three steps:

  • In the first step, the data are imported from the RDBMS and stored as HDFS datasets.
  • In the second step, the table is defined in the Hive metadata store.
  • In the third step, the data are moved from the HDFS platform into the Hive warehouse.

Hive – MapReduce extension

It is good to realise that Hive is built upon a MapReduce framework. Hive was developed by Facebook to facilitate analysis of Hadoop files. It allows you to use a SQL dialect instead of a Python or Java programme to do your analysis. When a Hive command is run, one clearly sees the map and reduce steps. See below.

[pivhdsne:~]$ hive -e "select count(*) from drink;"
15/10/19 22:14:48 INFO Configuration.deprecation: mapred.input.dir.recursive is deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive
15/10/19 22:14:48 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
2015-10-19 22:15:34,169 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.14 sec
2015-10-19 22:15:35,241 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.14 sec
2015-10-19 22:15:36,320 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.14 sec
2015-10-19 22:15:37,461 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 5.47 sec
2015-10-19 22:15:38,502 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 5.47 sec
2015-10-19 22:15:39,559 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 5.47 sec
MapReduce Total cumulative CPU time: 5 seconds 470 msec
Ended Job = job_1445283750993_0001
MapReduce Jobs Launched: 
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 5.47 sec   HDFS Read: 266 HDFS Write: 2 SUCCESS
Total MapReduce CPU Time Spent: 5 seconds 470 msec

This has also led to criticism of Hive: it is said that Hive remains limited by the bottlenecks within MapReduce. Therefore other parties, such as Cloudera with Impala, have developed engines to circumvent such bottlenecks.
But Hive doesn’t stand still. Hortonworks has worked on an improved version of Hive that seems to provide far better performance. A name that is often mentioned is Tez: the execution engine that is used within this context.

Python: yet another way to implement map/reduce

In this blog, I will discuss the word count problem as implemented in Python. It is often used to show how map/reduce works; in most examples it is written as a Java programme. The idea is that the programme is split into two stages. In the first stage, the mapper stage, calculations are made on a part of the total data set and the results are stored as name/value pairs. In the subsequent stage, the reduce stage, the name/value pairs from the mappers are taken together and summarised into a single solution. This reduce stage generates one final outcome.
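The two stages can be sketched without Hadoop in a few lines of plain Python. This is only a conceptual sketch: the sample input lines are made up, and the explicit sort stands in for the shuffle/sort step that Hadoop performs between the stages.

```python
from itertools import groupby
from operator import itemgetter

def mapper(lines):
    # emit a (word, 1) pair for every word, as the mapper stage does
    for line in lines:
        for word in line.strip().split():
            yield (word, 1)

def reducer(pairs):
    # pairs must be sorted by key, just as Hadoop sorts map output
    for word, group in groupby(sorted(pairs), key=itemgetter(0)):
        yield (word, sum(count for _, count in group))

# hypothetical input lines
lines = ["cola water cola", "water cola"]
print(dict(reducer(mapper(lines))))  # {'cola': 3, 'water': 2}
```

The sorted() call is the essential glue: it brings equal keys together so the reducer can sum per word.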
In Python, the mapper programme looks like:

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # reduce step; the output is tab-delimited and
        # the trivial word count is 1
        print '%s\t%s' % (word, 1)

The outcomes are stored as (word, 1). This is the name/value pair that the mapper emits as an intermediate outcome. The programme can also be run on its own via:

/usr/local/hadoop/bin/hadoop dfs -cat /user/drink/drink | /home/hduser/

leading to:

hduser@ubuntu:~$ /usr/local/hadoop/bin/hadoop dfs -cat /user/drink/drink | /home/hduser/
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

15/10/18 12:27:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
choco	1
sinas	1
cola	1
water	1

The programme works on streams; hence the input must be generated as a stream that is sent to STDIN.
The second part, the reduce stage, looks like:

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from the mapper
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
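The reducer logic can be checked locally by running the same algorithm over an in-memory stream. In the sketch below, io.StringIO stands in for STDIN and the tab-delimited input lines are made up; note that the input is sorted by word, as Hadoop would deliver it.

```python
import io

def reduce_stream(stream):
    # same per-key summing logic as the reducer above,
    # but collecting results in a list instead of printing
    results = []
    current_word, current_count = None, 0
    for line in stream:
        word, count = line.strip().split('\t', 1)
        try:
            count = int(count)
        except ValueError:
            continue  # skip lines where count is not a number
        if current_word == word:
            current_count += count
        else:
            if current_word is not None:
                results.append((current_word, current_count))
            current_word, current_count = word, count
    if current_word is not None:
        results.append((current_word, current_count))
    return results

# sorted, tab-delimited input, as Hadoop would deliver it
sample = io.StringIO("cola\t1\ncola\t1\nwater\t1\n")
print(reduce_stream(sample))  # [('cola', 2), ('water', 1)]
```

Feeding the stream unsorted would break the counts, which is exactly why Hadoop sorts the map output first.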

Both parts can be run via:

/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop-2.4.0/share/hadoop/tools/lib/hadoop-streaming-2.4.0.jar \
-file /home/hduser/  -mapper /home/hduser/ \
-file /home/hduser/  -reducer /home/hduser/ \
-input /user/drink/* \
-output /user/output91

The intermediate results are sent as a stream from mapper to reducer. To handle this on the Hadoop platform, the streaming jar is used. Roughly stated, the system works as follows: stream input > mapper > reducer > outcome. Thanks to the Hadoop streaming jar, we do not have to indicate that intermediate outcomes are generated on one server and sent to another server where the reduce part happens.

ODBC and Hive


In my view, the new development that we see now is the building of links to a Hadoop platform. One such development is ODBC drivers that allow Windows tools to access a Hadoop platform. As an example, one may think of Excel accessing tables on Hive. Think for a second about the possibilities: one may still use Excel whereas the actual processing takes place on Hadoop and the files are stored under Hive. One such example is a wonderful ODBC driver from MapR, with its 64-bit companion. These ODBC drivers are developed by MapR, but I was able to use them in connection with a Pivotal framework. It took a bit of trial and error, but I got it going. I had to find out whether my Office was 32-bit or 64-bit; I then remembered that the first 64-bit Office version was 2010 and mine was 2007, hence I took the 32-bit driver. Then I remembered that Pivotal uses an older version of Hive, so I used the Hive 1 version of the ODBC driver. All in all, the connection was successful and I could access the Pivotal environment from Excel. What a view!

Pig revisited

Recently, I revisited Pig. Pig is a language that allows you to analyse data sets in a Hadoop environment. It was created at Yahoo to circumvent the technicalities of creating a MapReduce Java job. Yahoo claims that most of its queries on a Hadoop platform can be replaced by a Pig script. As Pig is much easier to write, this leads to large time gains.
After some study, I realised that two concepts are central in Pig: the tuple, which corresponds roughly to a line, and the bag, which stands for a set of tuples within that line.
Let us analyse a Pig script. The script reads as:

A = load '/user/gpadmin/testtom/data-00000'  USING  PigStorage(',') as (id:int,name:chararray);
B = foreach A generate id, name, 1 as een; 
C = group B by een; 
D = foreach C generate group, AVG( as gem;
dump D;

The first line reads a dataset. It generates a set of tuples, where each tuple has two variables: id and name. One could think of different lines, where each line contains an id and a name.

The second line goes through the set of tuples from A and adds to each tuple a variable een that has one value: 1. Hence each tuple has three variables: id, name and een. In other words, we still have different lines, and each line now contains an id, a name and een.


The third line creates a new set of tuples. Each of these tuples is based on one value of een; hence each line contains data that relate to one value of een. Within the tuple, we have a bag in which the source tuples are collected.
Here, 1 is the value of een upon which the grouping is based. Within een=1, we have a bag that contains all source tuples that are related to een=1.
We may proceed by applying a summary function upon that bag. That is done in the fourth line, where for each value of een an average of id is calculated.
The end result is:

2015-10-04 21:34:59,868 [main] INFO - Key [pig.schematuple] was not set... will not generate code.
2015-10-04 21:34:59,872 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2015-10-04 21:34:59,872 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1

The end result is 1 (grouping value) and 3 (the average of id within the bag).
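The same four steps can be mimicked in plain Python. This is only an illustration of the semantics: the sample rows are hypothetical, since the actual contents of data-00000 are not shown (they are chosen so that the average of id comes out as 3, matching the result above).

```python
# Hypothetical rows standing in for data-00000: (id, name)
A = [(2, 'ann'), (3, 'bob'), (4, 'cas')]

# B = foreach A generate id, name, 1 as een
B = [(id_, name, 1) for id_, name in A]

# C = group B by een: one group per value of een, holding a bag of tuples
C = {}
for row in B:
    C.setdefault(row[2], []).append(row)

# D = foreach C generate group, AVG( as gem
D = [(een, sum(r[0] for r in bag) / len(bag)) for een, bag in C.items()]
print(D)  # [(1, 3.0)] with these sample rows
```

Since een is 1 for every row, the grouping produces a single bag holding all source tuples, and the average is taken over that bag.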

Another Pig program:

A = load '/user/gpadmin/name.txt'  USING  PigStorage(',') as (name:chararray,age:int);
B = filter A by age > 15;
C = load '/user/gpadmin/drink.txt'  USING  PigStorage(',') as (klant:chararray,drank:chararray);
D = join B by name, C by klant;
E = foreach D generate name, drank, 1 as een;
F = group E by name;
G = foreach F generate group, COUNT(E.een) as totaal;
dump G;

This programme reads two files from an HDFS platform and inner joins them on name and klant. Each joined record is then extended with a constant een. The records are grouped by name, which creates a bag per name; the fields within such a bag can be indicated by E.een and E.drank. Finally, COUNT(E.een) counts the tuples in each bag.
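This second script can also be mimicked in plain Python. Again, the sample rows for name.txt and drink.txt are hypothetical; they merely illustrate the filter, the inner join and the per-name count.

```python
# Hypothetical rows for name.txt (name, age) and drink.txt (klant, drank)
A = [('ann', 20), ('bob', 12), ('cas', 30)]
C = [('ann', 'cola'), ('ann', 'water'), ('cas', 'sinas')]

# B = filter A by age > 15
B = [(name, age) for name, age in A if age > 15]

# D = join B by name, C by klant (inner join)
D = [(name, drank) for name, _ in B for klant, drank in C if klant == name]

# E = foreach D generate name, drank, 1 as een; F = group E by name
F = {}
for name, drank in D:
    F.setdefault(name, []).append((name, drank, 1))

# G = foreach F generate group, COUNT(E.een) as totaal
G = [(name, len(bag)) for name, bag in F.items()]
print(G)  # [('ann', 2), ('cas', 1)] with these sample rows
```

Note that bob drops out in the filter step (age 12), so he never reaches the join, and the counts simply measure the size of each name's bag.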