
Flume: sending data via stream

It is possible to capture streaming data in HDFS files. A tool to do this is Flume. The idea is that we have three elements: sources that provide a stream, a channel that transports the stream and a sink where the stream ends up in a file.
This can already be seen if we look at the config file:

agent1.sources = netcat-source
agent1.sinks = hdfs-sink
agent1.channels = memory-channel

# Describe/configure the source
agent1.sources.netcat-source.type = netcat
agent1.sources.netcat-source.bind = 192.168.2.60
agent1.sources.netcat-source.port = 12345
agent1.sources.netcat-source.channels = memory-channel
# Describe the sink
agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = /loudacre/webtom/
agent1.sinks.hdfs-sink.channel = memory-channel
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
agent1.channels.memory-channel.type = memory
agent1.channels.memory-channel.capacity = 10000
agent1.channels.memory-channel.transactionCapacity = 10000

It all starts with a source, a netcat stream that is sent to port 12345; this source is labelled “netcat-source”. Then we have a sink that is labelled “hdfs-sink”. Finally, we have the channel that is labelled “memory-channel”. This “memory-channel” is mentioned in the source as the channel into which the stream is sent, and it is mentioned in the sink as the channel that delivers the data to it.
On another machine, we start the netcat stream with:

type "C:\Program Files (x86)\netcat\readme.txt" |   "C:\Program Files (x86)\netcat\nc.exe"  192.168.2.60 12345

This sends the content of a file as a stream to a netcat process that sends the stream to host 192.168.2.60 on port 12345. Exactly this host and port were mentioned in the config file as the source of the stream.
The Flume agent is started with:

flume-ng agent --conf /etc/flume-ng/conf --conf-file /home/training/training_materials/dev1/exercises/flume/solution/bonus_netcat_tom.conf --name agent1 -Dflume.root.logger=INFO,console

We may then see the data arriving in files on HDFS.
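
A quick check could be to list and inspect the files that the HDFS sink writes, for instance along these lines (a sketch; FlumeData is the default file prefix of the HDFS sink, so the actual file names may differ):

hadoop dfs -ls /loudacre/webtom/
hadoop dfs -cat /loudacre/webtom/FlumeData.*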

Partitioned Table in Hive

It is possible to partition tables in Hive. Remember that the data are stored in files, so we expect the files to be partitioned. This is accomplished by splitting the files over different directories: one directory serves one partition, another directory serves another partition, and so on.
Let us take the example of 7 records that are split over 5 partitions according to their number. We have:

 

In the directory that corresponds to the table location, we see that 5 directories are present, each corresponding to one partition.
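
Listing that location would show something like the following (a sketch; the exact partition values depend on the data):

hadoop dfs -ls /loudacre/fam_by_nummer
# expected: one subdirectory per partition, for example
#   /loudacre/fam_by_nummer/nummer=1
#   /loudacre/fam_by_nummer/nummer=2
#   ...
#   /loudacre/fam_by_nummer/nummer=5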

Let us switch to partition 2, which stores (2, ine), (2, ellen) and (2, henkjan). When the directory with the data of partition 2 is investigated, we see a file with the data inside that directory:

In the data file, the nummer that is used to create the partitions is omitted; the nummer is only stored in the directory name.
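
Inspecting the file in that partition directory would therefore show only the names, something like this (a sketch; the file name generated by the load will differ):

hadoop dfs -cat /loudacre/fam_by_nummer/nummer=2/*
# expected output: only the naam column
#   ine
#   ellen
#   henkjan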

To create this table, we use this command:

CREATE EXTERNAL TABLE fam_by_nummer(
naam STRING)
PARTITIONED BY (nummer INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/loudacre/fam_by_nummer';

To load the table, we take a detour.
First, the data are loaded into HDFS:

sqoop import \
--connect "jdbc:oracle:thin:@(description=(address=(protocol=tcp)(host=192.168.2.2)(port=1521))(connect_data=(service_name=orcl)))" \
--username scott --password bunvegni \
--table fam \
--columns "NUMMER, NAAM" \
--m 1 \
--target-dir /loudacre/fam_by_nummer_temp1;

Then a table is created:

CREATE EXTERNAL TABLE fam_by_nummer_temp(
nummer INT, naam STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/loudacre/fam_by_nummer_temp1';

And the data are loaded from this table into the partitioned table:

insert into table fam_by_nummer
partition (nummer)
select naam, nummer from fam_by_nummer_temp;
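
Note that this INSERT uses dynamic partitioning; depending on the Hive configuration, it may be necessary to enable that first, along these lines (a sketch of the relevant settings):

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;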

The exact order of the fields in the SELECT wasn't directly clear to me (the partition column has to come last), but it could be derived from the Hue interface.
Adding data to an existing partition is easy.
First create a file and then upload it to the correct directory:

hadoop dfs -put /tmp/extra /loudacre/fam_by_nummer/nummer=2

Adding a new partition requires action on the Hive side:

ALTER TABLE fam_by_nummer
ADD PARTITION (nummer=6);

This must be complemented by creating the corresponding directory on HDFS:

hadoop dfs -mkdir /loudacre/fam_by_nummer/nummer=6

And by storing a file with data in that directory:

hadoop dfs -put /tmp/extra /loudacre/fam_by_nummer/nummer=6
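
A quick check that the new partition is known to Hive and returns the new data could look like this (a sketch):

SHOW PARTITIONS fam_by_nummer;
SELECT * FROM fam_by_nummer WHERE nummer=6;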

 

Manipulating Avro

Avro files are binary files that contain both the data and a description of that data (the schema). That makes it a very interesting file format. One may send such a file to any application that is able to read Avro files. Just as an example: one may write the file in (say) PHP and send it to (say) Java. In previous posts I showed how such a file could be written and read by PHP. See a post here.
In this note I show how one may use a jar file to create and to read an Avro file. The jar file is avro-tools-1.8.1.jar. This jar file enables us to create an Avro file from a schema definition and a JSON file. The schema file looks like:

{
  "type" : "record",
  "name" : "twitter_schema",
  "namespace" : "com.miguno.avro",
  "fields" : [ {
    "name" : "username",
    "type" : "string",
    "doc"  : "Name of the user account on Twitter.com"
  }, {
    "name" : "tweet",
    "type" : "string",
    "doc"  : "The content of the user's Twitter message"
  }, {
    "name" : "timestamp",
    "type" : "long",
    "doc"  : "Unix epoch time in seconds"
  } ],
  "doc" : "A basic schema for storing Twitter messages"
}

whereas the JSON data file looks like:

 {"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp":1366150681}
{"username":"BlizzardCS","tweet":"Works as intended.  Terran is IMBA.","timestamp":1366154481}

These can then be combined into an Avro file with:

java -jar "C:/Program Files/Java/avro-tools-1.8.1.jar" fromjson --schema-file D:\Users\tmaanen\CloudStation\java\avro2\user.avsc D:\Users\tmaanen\CloudStation\java\avro2\user.json > D:\Users\tmaanen\CloudStation\java\avro2\user.avro

We now have an Avro file, which is a binary file. It can be translated to a JSON file with:

java -jar "C:/Program Files/Java/avro-tools-1.8.1.jar" tojson D:\Users\tmaanen\CloudStation\java\avro2\user.avro > D:\Users\tmaanen\CloudStation\java\avro2\user2.json

Likewise, the schema can be derived with:

java -jar "C:/Program Files/Java/avro-tools-1.8.1.jar" getschema D:\Users\tmaanen\CloudStation\java\avro2\part-m-00000.avro > D:\Users\tmaanen\CloudStation\java\avro2\user2.avsc

For me, this utility is very handy to investigate the result of a sqoop command. Roughly stated, such a sqoop command imports the contents of a database table onto the HDFS platform. Such a command may look like:

sqoop import \
--connect "jdbc:oracle:thin:@(description=(address=(protocol=tcp)(host=192.168.2.2)(port=1521))(connect_data=(service_name=orcl)))" \
--username scott --password binvegni \
--table fam \
--columns "NUMMER, NAAM" \
--m 1 \
--target-dir /loudacre/fam_avro \
--null-non-string '\\N' \
--as-avrodatafile

The output of such a command is an Avro file, typically called something like part-m-00000.avro. The question is: how do I know that this file contains the correct data? I can copy the Avro file to Windows and translate it with:

java -jar "C:/Program Files/Java/avro-tools-1.8.1.jar" tojson D:\Users\tmaanen\CloudStation\java\avro2\part-m-00000.avro > D:\Users\tmaanen\CloudStation\java\avro2\part-m-00000.json

This gives me the confirmation that the Avro file is correct.
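
To get the file off HDFS in the first place, before transferring it to Windows, something like the following could be run on the cluster (a sketch; the local target path is illustrative):

hadoop dfs -get /loudacre/fam_avro/part-m-00000.avro /tmp/part-m-00000.avro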

Parquet format

As we know, we may store table definitions in the metastore. These table definitions refer to a location where the data are stored. The format of the data might be an ordinary text file, or it might be an Avro file. Another possibility is a Parquet file; Parquet is a compressed, column-oriented binary format.
Creating such a table is rather straightforward. First, we transfer a table to a Parquet file on HDFS:

sqoop import \
--connect "jdbc:oracle:thin:@(description=(address=(protocol=tcp)(host=192.168.2.2)(port=1521))(connect_data=(service_name=orcl)))" \
--username scott --password binvegni \
--table fam \
--columns "NUMMER, NAAM" \
--m 1 \
--target-dir /loudacre/fam_parquet \
--as-parquetfile;

This results in a file that can be found in the directory /loudacre/fam_parquet. The file gets a generated name, in this case 5fe8fcaa-6095-40ec-b499-d73d6d971b6f.parquet. From Impala, we may then define the table with:

CREATE EXTERNAL TABLE fam_parquet
LIKE PARQUET '/loudacre/fam_parquet/5fe8fcaa-6095-40ec-b499-d73d6d971b6f.parquet'
STORED AS PARQUET
LOCATION '/loudacre/fam_parquet/';
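
A quick check of the new table could then be done from the command line, for instance (a sketch; it assumes impala-shell connects to the local Impala daemon):

impala-shell -q "SELECT * FROM fam_parquet LIMIT 5;"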

Avro format

In Hive, a table definition is stored in a metastore. This table definition is linked to a directory where the data are stored. It is possible to use different formats here. One may think of a text format, but other formats are possible too. One example is the Avro format, which is characterised by a combination of the file definition (the schema) and the actual data.
Let us see how these data can be imported.
A possibility is to use the sqoop command to import data in Avro format:

sqoop import \
--connect "jdbc:oracle:thin:@(description=(address=(protocol=tcp)(host=192.168.2.2)(port=1521))(connect_data=(service_name=orcl)))" \
--username scott --password binvegni \
--table fam \
--columns "NUMMER, NAAM" \
--m 1 \
--target-dir /loudacre/fam_avro \
--null-non-string '\\N' \
--as-avrodatafile

The result is composed of two parts:
1: The directory where the table content is stored. This can be found on HDFS in the directory /loudacre/fam_avro, which can be verified with “hadoop dfs -ls /loudacre/fam_avro”. This provides an overview of the files where the data are stored. When such a file is opened, one sees:

2: The data definition. So, we have the data; the definition itself (the .avsc file) is written to the local directory from which the sqoop command was launched. To get it onto the HDFS platform, we issue “hadoop dfs -put sqoop_import_fam.avsc /loudacre/”. This stores the definition on HDFS, where it can be investigated.
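
For instance, the schema file can be inspected directly on HDFS with (a sketch):

hadoop dfs -cat /loudacre/sqoop_import_fam.avsc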

If we have the data (in directory /loudacre/fam_avro) and the definition (in file /loudacre/sqoop_import_fam.avsc), we may proceed with a table definition in Hive. Hive is started with “beeline -u jdbc:hive2://192.168.2.60:10000 -n training -p training”, followed by:

CREATE EXTERNAL TABLE fam_avro
STORED AS AVRO
LOCATION '/loudacre/fam_avro'
TBLPROPERTIES ('avro.schema.url'=
'hdfs:/loudacre/sqoop_import_fam.avsc');

A check with “select * from fam_avro” showed the data as expected.

 

Create a Hive table – 3 ways

In this little note, I want to show three different ways to create a table in Hive. The first one starts with a file that is already available on HDFS, and we create a table upon this file. This table is defined as an external table that exposes the file's content.

The code to be executed in Hive is:

CREATE EXTERNAL TABLE fam
(nummer SMALLINT,
name STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/hdfs';

And we may see the content of the table from, say, SQL Developer:

 

From the clause “LOCATION '/user/hdfs'”, we know that the files that deliver the data are stored in the directory /user/hdfs. If we want to add data to the table, we only have to copy files to that location. Let us start with two files:
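
As an illustration of what one of these files could contain (a sketch; the file name fam1 and its rows are hypothetical, the only requirement being that they match the comma-delimited definition of nummer and name):

hadoop dfs -cat /user/hdfs/fam1
# hypothetical output (nummer,name):
# 1,tom
# 2,ine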

We then copy an additional file to this environment ( hadoop dfs -put /tmp/fam3 /user/hdfs ). This shows:

 

When we then start the Hive environment ( beeline -u jdbc:hive2://192.168.2.60:10000 -n training -p training ), we may issue the select command: select * from fam; . This provides the content of the fam table, including the content of the newly added fam3 file. This shows clearly how easy it is to add data to a Hadoop platform. The only thing we have to do is add files to the correct location; these files will automatically be recognised as part of the table.

An alternative way of adding data to a table is to move data within the HDFS environment. Suppose we have a file on HDFS that contains data that should be added to the table. First, we start Hive:

beeline -u jdbc:hive2://192.168.2.60:10000 -n training -p training

I then moved data into the table:

LOAD DATA INPATH '/user/training/fam4' INTO TABLE fam;

I then saw the file being added to the table's directory:

 

 

Another way is a direct import from, say, Oracle, which creates a file on HDFS on the fly and adds the table definition to Hive. The code is:

sqoop import \
--connect "jdbc:oracle:thin:@(description=(address=(protocol=tcp)(host=192.168.2.2)(port=1521))(connect_data=(service_name=orcl)))" \
--username scott --password binvegni \
--table fam \
--columns "NUMMER, NAAM" \
--m 1 \
--fields-terminated-by ',' \
--hive-import

This creates a file on HDFS:

and a table in Hive that can be seen from, say, Excel:

 

 

It is good to realise that we must undertake two actions:

1: An action to store the data (in a file that is stored in an HDFS directory).

2: An action to create the table definition that is stored in a metastore.

 

A third way is to use HCatalog:

hcat -e "CREATE TABLE prut \
(id INT, company STRING) \
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' \
LOCATION '/user/prut'"
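
To verify that the definition has indeed landed in the metastore, it can be inspected from Hive, for example (a sketch):

beeline -u jdbc:hive2://192.168.2.60:10000 -n training -p training -e "DESCRIBE prut;"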