Monthly Archives: January 2017

Getting a histogram from Big Data with Scala

Scala can be used as a tool to manipulate big data. When it is used in the Spark context, we can combine two strong tools: Spark, with its ability to bypass the MapReduce bottleneck, and Scala, with its short learning curve.

The close integration between Scala and Spark is already clear when Scala is started. It can be started with the command “spark-shell” from the terminal. After a few seconds the Spark banner is shown with the note “Using Scala version 2.10.4”. A few lines below, one sees “Spark context available as sc”. This sc object is what we will use.

We have quite a number of files in an HDFS directory. They can be made accessible as an RDD via:

val rawblocks = sc.textFile("/linkage")

Such an RDD is a dataset that is composed of a number of lines. The number of lines can be seen with rawblocks.count(), which yields: res63: Long = 5749142.
To do anything with this RDD, we must 1) remove the lines that are used as a header, 2) split the lines and 3) identify numbers as numbers. Let us do so.

Removal of lines that act as a header can be done with this function:

def isHeader(line: String): Boolean = {
      line.contains("id_1")
      }

followed by

val noheader = rawblocks.filter(x => !isHeader(x))

Splitting the lines and identifying numbers as numbers is a bit more work, but the code is pretty straightforward:

def toDouble(s:String) = {
      if ("?".equals(s)) Double.NaN else s.toDouble
      }

case class MatchData(id1: Int, id2: Int,
      scores:Array[Double],matched: Boolean)

def parse(line: String) = {
      val pieces = line.split(',')
      val id1 = pieces(0).toInt
      val id2 = pieces(1).toInt
      val scores = pieces.slice(2, 11).map(toDouble)
      val matched = pieces(11).toBoolean
      MatchData(id1, id2, scores, matched)
      }

followed by 

val parsed = noheader.map(line => parse(line))

Then we are ready to create the histogram:

val matchCounts = parsed.map(md => md.matched).countByValue()
val matchCountsSeq = matchCounts.toSeq
matchCountsSeq.foreach(println)

 
Finally, here is a less complicated script that calculates the mean of a column:

val famblocks = sc.textFile("/user/hdfs/fam")
case class MatchData(id1: Int, id2: String)
def parse(line: String) = {
      val pieces = line.split(',')
      val id1 = pieces(0).toInt
      val id2 = pieces(1).toString
      MatchData(id1,id2)
      }
val parsed = famblocks.map(line => parse(line))
val matchCounts = parsed.map(md => md.id1).mean()

where fam is an HDFS file containing lines with a number and a name, comma separated (1,tom etc.).

 

Scala

Scala is a general-purpose language. One may use it as a statistical tool, as a tool for pattern matching and so on, just like any other programming language such as Java, C++ or Fortran. On top of that, Scala can be used as a means to steer Big Data on a Hadoop platform. For me, being interested in Big Data, Scala is a worthwhile investment.
Let me first show a small Scala programme.

The programme is quite straightforward. A function is defined with the name codeer. It receives one variable. When this variable is an ‘a’ or a ‘b’, it is translated into a ‘1’ or a ‘2’. In all other cases, it is not translated. The variable ‘kijk’ that contains the resulting character is returned. Subsequently, one may use this function.

In the Big Data environment, one may start Scala with the command “spark-shell”. If everything is well installed, one sees the Spark welcome banner with the Scala version that is used.

It is then clear that the integration between the Big Data environment and Scala is strong. When the shell is started, one sees that a context variable, sc, is created that allows access to the Spark environment.

This then allows us to retrieve a Big Data dataset as an object in Scala:

val rawblocks = sc.textFile("linkage")

The nice thing is that we may grab all files that are in the directory "linkage" and access them as one object that is called rawblocks.

Network reaction from Python

I have a php script that runs as cgi on a webserver. The programme is quite simple. First it asks for a userid and a password. The userid and password are sent as parameters. If these values coincide with the expected values, the system returns a page where the user may click on a hyperlink to continue his journey.
We thus have a stream with two parameters that is sent from the user's browser to the webserver, upon which the system may react.
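
To make the idea concrete, the reaction of the webserver can be sketched in Python as follows. This is only a minimal sketch of the behaviour described above, not the actual php script: the expected password 'secret', the port 8080 and the returned pages are placeholders I chose for the illustration.

# minimal sketch of a server that checks the two parameters of the POST request
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs

class PasswordHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # read and decode the request body, which carries the two parameters
        length = int(self.headers.get('Content-Length', 0))
        params = parse_qs(self.rfile.read(length).decode('ascii'))
        user = params.get('txtUsername', [''])[0]
        pwd = params.get('txtPassword', [''])[0]
        # compare with the expected values; 'tom' and 'secret' are placeholders
        if user == 'tom' and pwd == 'secret':
            body = b'<html><body><a href="/next.html">continue</a></body></html>'
        else:
            body = b'<html><body>wrong userid or password</body></html>'
        self.send_response(200)
        self.send_header('Content-Type', 'text/html')
        self.end_headers()
        self.wfile.write(body)

HTTPServer(('', 8080), PasswordHandler).serve_forever()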

It is possible to mimic this with a Python programme that sends the same stream to the webserver. The programme looks like this:

from urllib import request, parse
# the cgi script that checks the userid and password
url='http://62.131.51.129/load_pr/password.php'
# the two parameters, as the browser would send them
parms = {
    'txtUsername' : 'tom',
    'txtPassword' : 'binvegni'
        }
# encode the parameters and send them in the body of a POST request
querystring = parse.urlencode(parms)
u = request.urlopen(url, querystring.encode('ascii'))
resp = u.read()
print (resp)

This stream is sent to the webserver that runs on http://62.131.51.129. The programme sends the same stream of bytes to the script http://62.131.51.129/load_pr/password.php as the browser would, since it contains the two parameter values (‘txtUsername’, ‘txtPassword’).
We call this Python programme with “D:\Users\tmaanen\Documents>python sendPost.py”. The answer received matches the stream that the webserver sends when the php script is called from a browser.

Finally, in the case of a GET instead of a POST, the programme is slightly different:

from urllib import request, parse
url='http://62.131.51.129/load_pr/password.php'
parms = {
    'txtUsername' : 'tom',
    'txtPassword' : 'bunvegni'
        }
# with a GET request, the parameters are appended to the URL as a query string
querystring = parse.urlencode(parms)
u = request.urlopen(url+'?'+querystring)
resp = u.read()
print (resp)

Another Pyspark script

In this note, I show yet another Pyspark script with slightly different methods to filter. The idea is that a file is read into an RDD. Subsequently, it is cleaned. That cleaning process involves the removal of lines that are too short. The remaining lines are split on the character that is at the twentieth position. Then the lines with a number of fields unequal to 14 are filtered out. After that, some columns are kept. Finally, the values are concatenated into a line and written to disk.
We have:

devstatus = sc.textFile("/loudacre/devicestatus.txt")

# Keep only lines longer than 20 characters, split each line on its 20th character,
# and keep only lines that split into exactly 14 fields
cleanstatus = devstatus. \
    filter(lambda line: len(line) > 20). \
    map(lambda line: line.split(line[19:20])). \
    filter(lambda values: len(values) == 14)

# Create a new RDD containing date, manufacturer, device ID, latitude and longitude
devicedata = cleanstatus. \
    map(lambda words: (words[0], words[1].split(' ')[0], words[2], words[12], words[13]))

# Concatenate the values into comma-delimited strings and save as text
devicedata. \
    map(lambda values: ','.join(values)). \
    saveAsTextFile("/loudacre/devicestatus_etl")

A python script with many steps

Pyspark is the Python language applied to Spark. It therefore allows a wonderful combination of Spark, with its possibilities to circumvent the limitations that are set by the MapReduce framework, and Python, which is relatively simple.

Below, some steps are described that might be used; a small sketch that puts them together follows after the descriptions.

sc.textFile allows us to read a file and process it as an RDD (resilient distributed dataset). This is a dataset that is distributed over the nodes and that can be recreated quickly.

flatMap allows us to create multiple elements from one line.

map processes one element. From one word, two fields are created: the original word and a field with the length of the word.

filter allows us to filter the lines.

groupByKey aggregates the lines by the first field, which acts as a key.

map then translates the aggregate into something that is human readable.

collect retrieves the results so that they can be displayed.
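
Since the original scheme is not reproduced here, a minimal sketch of such a chain of steps could look as follows. The file name and the exact transformations (splitting on commas, filtering on word length) are assumptions made for the illustration:

# read a file as an RDD; the path is only an example
lines = sc.textFile("/user/hdfs/fam")
# flatMap: one line becomes multiple elements (the words on that line)
words = lines.flatMap(lambda line: line.split(','))
# map: from one word, create two fields: the word itself and its length
pairs = words.map(lambda word: (word, len(word)))
# filter: keep only the words that are longer than two characters
filtered = pairs.filter(lambda pair: pair[1] > 2)
# groupByKey: aggregate by the first field, which acts as the key
grouped = filtered.groupByKey()
# map: translate the aggregate into something human readable
readable = grouped.map(lambda pair: (pair[0], list(pair[1])))
# collect: retrieve the results so that they can be displayed
print(readable.collect())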

 

The 1000th wordcount example

I just discovered the 1000th wordcount example. It is based on Pyspark. The idea is actually quite simple. One creates a script, which can be written in any editor. The programme can then be run from the terminal with spark-submit [programme]. As an example, one may start the programme below with: spark-submit --master yarn-cluster prog1.py /user/hdfs/fam /user/hdfs/prog1A. The option --master yarn-cluster indicates how the programme is run; in this case the cluster is used. It can also be run locally; in that case --master local is used.

import sys
from pyspark import SparkContext

if __name__ == "__main__":
    # the script expects an input file and an output directory
    if len(sys.argv) < 3:
        sys.stderr.write("Usage: WordCount <inputfile> <outputdir>\n")
        sys.exit(-1)

    sc = SparkContext()
    counts = sc.textFile(sys.argv[1]) \
        .flatMap(lambda line: line.split(',')) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda v1, v2: v1 + v2)
    counts.saveAsTextFile(sys.argv[2])
    sc.stop()

Joining files with Pyspark

Pyspark allows us to process files in a big data/ Hadoop environment. I showed in another post how Pyspark can be started and how it can be used.
The concept of Pyspark is very interesting. It allows us to circumvent the limitations of the MapReduce framework. MapReduce is somewhat limiting as we have two steps: a map phase where records are processed and a reduce phase where aggregates are processed.
Pyspark has more flexibility as we may mix both phases: first a mapping, then reduce, then going back to mapping.

To show how it works, we start with two files. One file is called “namen” and it has the following content:

tom,1
ine,2
paula,3
stella,4
bart,5

A second file is called leeftijd with the following content:

1,59
2,58
3,28
4,26
5,22

The two files both have a key (‘1’, ‘2’, ‘3’, etc.). In the first file, the key comes with a name. In the second file, the key comes with an age attached.
The purpose of my exercise is to join the two files on their key. The result should be a file that contains the name and the age. The programme reads like:

# read the namen file and turn each line into (key, (key, name))
namen=sc.textFile('/user/hdfs/namen')
namen1 = namen.map(lambda line: line.split(',')).map(lambda fields: (fields[1],(fields[1],fields[0])))
# read the leeftijd file and turn each line into (key, (key, age))
leeftijd = sc.textFile('/user/hdfs/leeftijd')
leeftijd1 = leeftijd.map(lambda line: line.split(',')).map(lambda fields: (fields[0],(fields[0],fields[1])))
# join on the key and keep only the name and the age
samen=leeftijd1.join(namen1)
samen1=samen.map(lambda x: (x[1][1][1],x[1][0][1]))
samen1.saveAsTextFile('/user/hdfs/naamleeftijd')

Pyspark works with RDDs. Very roughly stated, these are tables with rows that may contain data elements or structures of data elements. On these RDDs, we have map and filter functions that allow us to modify an RDD. There are several ways to create such an RDD. One possibility is to make use of the "sc" object. It has a method textFile that allows us to read a file and store the content as an RDD.

When a file is read, each line is an element. As we have a key and a name in the file namen, we would like to be able to access the key and the name as two separate elements. So the line is split into two fields. In a subsequent step, the key is separated from the remainder of the line.

The same exercise is repeated for the leeftijd file. There too, we would like to have a key and a structure that contains the remainder of the values.

We then have two RDDs, both with a key and additional information. They can then be joined into a new RDD.

Finally, we take the relevant items, the name and the age, and write the content to a file on Hadoop.
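
As a side note, the same join can also be written with plain (key, value) pairs instead of the nested tuples used above. The sketch below is only an illustration of that design choice, assuming the same two input files; the output directory naamleeftijd2 is a hypothetical name chosen so that it does not clash with the one above.

# read the namen file as (key, name) pairs
namen = sc.textFile('/user/hdfs/namen')
namen2 = namen.map(lambda line: line.split(',')).map(lambda fields: (fields[1], fields[0]))
# read the leeftijd file as (key, age) pairs
leeftijd = sc.textFile('/user/hdfs/leeftijd')
leeftijd2 = leeftijd.map(lambda line: line.split(',')).map(lambda fields: (fields[0], fields[1]))
# join on the key; each element becomes (key, (name, age))
samen = namen2.join(leeftijd2)
# keep only the (name, age) part and write it to a hypothetical output directory
samen.map(lambda x: x[1]).saveAsTextFile('/user/hdfs/naamleeftijd2')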