Monthly Archives: August 2016

Python in a MapReduce environment

I have written a very small Python programme that follows the mapper/reducer sequence. It works as a replacement for the more complicated set of Java programmes that would otherwise be needed to implement a mapper/reducer job. The idea is relatively simple. We create a stream from an input file. That stream is processed by a mapper programme (written in Python) that produces a series of name/value pairs. Those pairs are then sorted and streamed to a reducer programme (also written in Python), which produces the final outcome.
Such logic is used on a Hadoop platform. There, streaming data to a mapper translates into work that can run concurrently on different nodes. After that, the intermediate output is sent as a sorted stream of name/value pairs to a reducer, where the final calculations are made.
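On a single machine the same flow can be simulated with a simple shell pipeline, where sort plays the role of Hadoop's shuffle phase. A minimal sketch, assuming the mapper and reducer are saved as the executable scripts mapper.py and reducer.py shown below, and the input file is called test:

cat test | ./mapper.py | sort | ./reducer.py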
Let us first look at the input file that will be analysed. Its fields are tab-separated; note that the last line is corrupt. It looks like:

2013-10-09	13:22	Gouda	Wafels	2.98	Visa
2013-10-09	13:22	New York	Iphone	455.76	MasterCard
2013-10-09	13:22	New York	Rommel	354.76	MasterCard
2016-10-09	I/O error

This input set is processed by this Python programme:

#!/usr/bin/python
# Your task is to make sure that this mapper code does not fail on corrupt data lines,
# but instead just ignores them and continues working
import sys

def mapper():
    # read standard input line by line
    for line in sys.stdin:
        # strip off extra whitespace, split on tab and put the data in an array
        data = line.strip().split("\t")

        # This is the place you need to do some defensive programming
        # what if there are not exactly 6 fields in that line?
        if len(data) != 6:
            continue

        # this next line is called 'multiple assignment' in Python
        # this is not really necessary, we could access the data
        # with data[2] and data[4], but we do this for convenience
        # and to make the code easier to read
        date, time, store, item, cost, payment = data

        # Now print out the data that will be passed to the reducer
        print "{0}\t{1}".format(store, cost)


def main():
    mapper()
    sys.stdin = sys.__stdin__

if __name__ == "__main__":
    main()
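The defensive check on the number of fields is what keeps the corrupt line in the sample input from breaking the job: split on the tab character it yields only two fields rather than six, so it is silently skipped. A quick check in the Python interpreter illustrates this:

>>> "2016-10-09\tI/O error".strip().split("\t")
['2016-10-09', 'I/O error']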

One might test whether it actually works with the command cat test | ./mapper.py. This should produce a set of name/value pairs. I got this outcome:

[training@localhost ~]$ cat test | ./mapper.py
Gouda	2.98
New York	455.76
New York	354.76

The next programme is the reducer, which looks like this:

#!/usr/bin/python

import sys


salesTotal = 0
oldKey = None

# Loop around the data
# It will be in the format key\tval
# Where key is the store name, val is the sale amount
#
# All the sales for a particular store will be presented,
# then the key will change and we'll be dealing with the next store

for line in sys.stdin:
    data_mapped = line.strip().split("\t")
    if len(data_mapped) != 2:
        # Something has gone wrong. Skip this line.
        continue

    thisKey, thisSale = data_mapped

    if oldKey and oldKey != thisKey:
        print oldKey, "\t", salesTotal
        oldKey = thisKey
        salesTotal = 0

    oldKey = thisKey
    salesTotal += float(thisSale)

if oldKey is not None:
    print oldKey, "\t", salesTotal
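Tracing the reducer against the mapper output above shows how the key-change logic works: it reads Gouda 2.98 and starts accumulating; it then sees New York 455.76, notices the key has changed, prints Gouda 2.98 and resets the running total; it adds New York 354.76 to that total; and when the input is exhausted it prints New York 810.52.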

The two programmes can be chained on Linux. Strictly speaking the mapper output should first be piped through sort to mimic Hadoop's shuffle phase, but in this small example the keys already arrive grouped, so the result is the same:

[training@localhost ~]$ cat test | ./mapper.py | ./reducer.py
Gouda 	2.98
New York 	810.52

The next step is to run everything on Hadoop as a streaming job:

hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.1.jar -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -input /myinput/test -output joboutput10

which generates:

packageJobJar: [mapper.py, reducer.py, /tmp/hadoop-training/hadoop-unjar5879319460019186346/] [] /tmp/streamjob1136107035137838419.jar tmpDir=null
16/08/27 12:01:12 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
16/08/27 12:01:12 WARN snappy.LoadSnappy: Snappy native library is available
16/08/27 12:01:12 INFO snappy.LoadSnappy: Snappy native library loaded
16/08/27 12:01:12 INFO mapred.FileInputFormat: Total input paths to process : 1
16/08/27 12:01:12 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-hdfs/cache/training/mapred/local]
16/08/27 12:01:12 INFO streaming.StreamJob: Running job: job_201608271023_0015
16/08/27 12:01:12 INFO streaming.StreamJob: To kill this job, run:
16/08/27 12:01:12 INFO streaming.StreamJob: UNDEF/bin/hadoop job  -Dmapred.job.tracker=0.0.0.0:8021 -kill job_201608271023_0015
16/08/27 12:01:12 INFO streaming.StreamJob: Tracking URL: http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201608271023_0015
16/08/27 12:01:13 INFO streaming.StreamJob:  map 0%  reduce 0%
16/08/27 12:01:16 INFO streaming.StreamJob:  map 100%  reduce 0%
16/08/27 12:01:19 INFO streaming.StreamJob:  map 100%  reduce 100%
16/08/27 12:01:21 INFO streaming.StreamJob: Job complete: job_201608271023_0015
16/08/27 12:01:21 INFO streaming.StreamJob: Output: joboutput10
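Note that Hadoop Streaming refuses to start when the output directory already exists in HDFS, so before re-running the job the directory has to be removed or a new name chosen. On this CDH4 setup removing it should look roughly like this (directory name as used above; the older hadoop fs -rmr syntax also still works):

hadoop fs -rm -r joboutput10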

The output can be inspected as follows:

[training@localhost ~]$ hadoop fs -cat /user/training/joboutput10/part-00000
Gouda 	2.98
New York 	810.52
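If the result is wanted as a local file, the part files can also be pulled out of HDFS in one go; a sketch, with the local file name chosen arbitrarily:

hadoop fs -getmerge joboutput10 salestotals.txt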

Three handy little Oracle statements

I have three little Oracle statements that I use quite often. I realise I use them in most programmes I write, but at the same time I often forget the exact syntax. Hence this entry in this blog: it will act as a look-up whenever I once again forget the precise syntax.

The first retrieves the hour from a date field. It looks like:

select to_number(substr(numtodsinterval( sysdate - trunc(sysdate), 'day' ),12,2)) as hour from dual;
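An equivalent, somewhat shorter way to get the same hour is a plain format mask with to_char; this variant is my own shorthand rather than part of the original statement:

select to_number(to_char(sysdate, 'HH24')) as hour from dual;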

The second one assigns a row number to each row. The numbering restarts at 1 whenever the partitioning field changes value.

select tel1, tel2, ROW_NUMBER() OVER (PARTITION BY tel1 ORDER BY tel2) AS RN
from
 (select 1 as tel1, 1 as tel2 from dual union
  select 1 as tel1, 2 as tel2 from dual union
  select 2 as tel1, 1 as tel2 from dual union
  select 2 as tel1, 2 as tel2 from dual) A;
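Run against dual like this, the statement should return something along these lines (row order may vary):

TEL1  TEL2  RN
   1     1   1
   1     2   2
   2     1   1
   2     2   2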

A third handy Oracle statement generates a number that Excel recognises as a date. Excel starts its date range with 1 January 1900 equal to 1, 2 January 1900 equal to 2, and so forth; continuing that count, 42602 equals 20 August 2016. The statement is:

select to_date(sysdate) - to_date('19000101','yyyymmdd')+2 as datum from dual;

Why the “+2” in this statement? One day is added because Excel counts 1 January 1900 as day 1 rather than day 0. The other day is added because Excel knows a 29 February 1900 that never existed: in Excel, 59 is 28 Feb 1900, 60 is 29 Feb 1900 and 61 is 1 March 1900. Excel thus assumes 1900 is a leap year, whereas it is not one in the Gregorian calendar. This is a well-known issue with Excel; the bug is kept in place to preserve backward compatibility with Lotus 1-2-3.
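As a quick check: from 1 January 1900 to 20 August 2016 is 42,600 days, and 42,600 + 2 = 42,602, the serial number that Excel indeed displays as 20 August 2016.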

Reading XML in Oracle -3

I now have a project where I need to write the content of an XML file into an Oracle table. The idea is actually quite simple. An XML file can be seen as a document with a hierarchical structure. It is composed of several separate subdocuments, each of which can be seen as a tree. Each tree is a trunk with branches, where a branch maps onto a table and the content of one branch becomes one record. A subdocument may have one or more branches and thus generates one or more records, and since the document may contain more than one subdocument, we end up with one or more records in the table.
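As an illustration, such a document might look roughly like the fragment below. This is a hypothetical example, made up to match the XPath expressions used in the procedure further on; the real files contain many more elements per branch.

<Message>
  <PayLoad>
    <ShipmentAsn4>
      <GlnShipTo>8712345678906</GlnShipTo>
      <!-- further elements of this branch -->
    </ShipmentAsn4>
    <ShipmentAsn4>
      <GlnShipTo>8798765432109</GlnShipTo>
      <!-- further elements of this branch -->
    </ShipmentAsn4>
  </PayLoad>
</Message>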

I created two steps to process this XML file.

The first step is a SQL*Loader (SQLLDR) job that loads the XML document into an Oracle table, where it is stored as an XMLTYPE object.

The second step is a procedure that retrieves the information from the XML object and turns it into records.

Let me go in some detail here.

Loading the XML object

I created a table to hold the XML objects. Its DDL is:

 CREATE TABLE "SCOTT"."XML_TAB_CONTRACT" 
   (	"XML_DATA" "XMLTYPE", 
	"ID" NUMBER(*,0), 
	"CONTRACT" NUMBER
   )

The XML will be stored in XML_DATA, which is a column of type XMLTYPE.

The CTL file that actually loads the XML file reads as:

load data
infile *
append
into table XML_TAB_CONTRACT
fields terminated by ';' 
(
 filename FILLER char(52),
 XML_DATA lobfile(filename) terminated by eof,
 id,
 CONTRACT
)

begindata
C:\Users\Tom\Dropbox\Uitwisseling\Voorbeeld2078a.xml;201608131;2078

And the command to start the loading process is:

SQLLDR CONTROL=C:\Users\Tom\Dropbox\Uitwisseling\xml.ctl LOG=sample.log, BAD=baz.bad, USERID=scott/binvegni, ERRORS=999, LOAD=500000, DISCARD=toss.dsc, DISCARDMAX=5

This starts the SQLLDR process, which uses the CTL file to know which XML file must be loaded into which table.

Reading the XML objects

The code to read the XML objects is composed of two loops.
The outer loop reads the XML objects and looks for the branches that must be translated into records. The branches are indicated by the XPath “/Message/PayLoad/ShipmentAsn4”. A cursor is defined that retrieves the set of XML objects, and a loop then processes each object one by one.
The inner loop actually translates a branch into a record: it retrieves the values one by one and stores them as attributes of the loop record “r”.

create or replace PROCEDURE SEARCH_XML
AS
  v_waarde integer;
  name     varchar(100);
  flip     varchar(10);
  Lcntr    integer := 0;
  docu     varchar(9999);
  CURSOR c1 IS
    SELECT EXTRACT(SYS_MAKEXML(0,"A1"."SYS_NC00002$"),'/Message/PayLoad/ShipmentAsn4') "AAPJE"
    FROM "SCOTT"."XML_TAB_CONTRACT" "A1";
  l_aantal c1%ROWTYPE;
BEGIN
  dbms_output.enable;
  OPEN c1;
  LOOP
    FETCH c1 INTO l_aantal;
    EXIT WHEN c1%NOTFOUND;
    Lcntr := Lcntr + 1;
    FOR r IN (
      SELECT ExtractValue(Value(p),'/ShipmentAsn4/GlnShipTo/text()') as flip
      -- wrap the extracted fragments in a single <totaal> root so the clob is well-formed XML
      FROM TABLE(XMLSequence(Extract(XMLTYPE('<totaal>'||l_aantal.aapje.getClobVal()||'</totaal>'),'/totaal/ShipmentAsn4'))) p
    ) LOOP
      v_waarde := adresseq.nextval;
      dbms_output.put_line(Lcntr ||' '|| v_waarde ||' '|| r.flip);
    END LOOP;  -- rows within one XML branch
  END LOOP;    -- XML objects in the table
  CLOSE c1;
END SEARCH_XML;
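With a document like the hypothetical fragment shown earlier, the DBMS_OUTPUT buffer would end up containing something along these lines (the middle number depends on the current value of the adresseq sequence):

1 101 8712345678906
1 102 8798765432109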