Getting a histogram from Big Data with Scala

Scala can be used as a tool to manipulate big data. Used within Spark, it lets us combine two strong tools: Spark, which avoids the MapReduce bottleneck by keeping intermediate results in memory instead of writing them to disk between steps, and Scala, with its short learning curve.

The close integration of Scala and Spark is apparent as soon as the shell is started. From a terminal, the command “spark-shell” opens a Scala interpreter with Spark already loaded. After a few seconds the Spark banner appears with the note “Using Scala version 2.10.4”. A few lines below, one sees “Spark context available as sc”. This SparkContext, sc, is the object we will use.

We have quite a number of files in an HDFS directory. They can be made accessible as an RDD (Resilient Distributed Dataset) via:

val rawblocks = sc.textFile("/linkage")

textFile turns every line of the files into one element of the RDD, so rawblocks is a dataset of lines. The number of lines can be obtained with rawblocks.count(), which yields: res63: Long = 5749142.
Before we can do anything with this RDD, we must 1/ remove the lines that serve as headers, 2/ split the lines into fields and 3/ convert the numeric fields into numbers. Let us do so.

Header lines can be recognised by the text id_1 they contain, using this function:

def isHeader(line: String): Boolean = {
  line.contains("id_1")
}

followed by

val noheader = rawblocks.filter(x => !isHeader(x))
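Because filter behaves the same way on an ordinary Scala collection as on an RDD, the header removal can be tried out locally before running it on the cluster. A minimal sketch, using a few invented lines (the sample content is hypothetical, not taken from /linkage):

```scala
// Hypothetical sample lines standing in for the files in /linkage;
// the first one plays the role of a header line containing "id_1".
val sampleLines = List("id_1,id_2,matched", "1,2,TRUE", "3,4,FALSE")

// Same predicate as above: a line is a header if it contains "id_1".
def isHeader(line: String): Boolean = line.contains("id_1")

// filter keeps the elements for which the predicate is true,
// so isHeader is negated to drop the header lines.
val noheaderLocal = sampleLines.filter(x => !isHeader(x))

println(noheaderLocal.size) // 2
```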

Splitting the lines and turning the numeric fields into numbers takes a bit more code, but it is straightforward. A missing value is written as “?”, so toDouble maps it to Double.NaN:

def toDouble(s: String): Double = {
  // "?" marks a missing value; represent it as NaN
  if ("?".equals(s)) Double.NaN else s.toDouble
}

case class MatchData(id1: Int, id2: Int,
  scores: Array[Double], matched: Boolean)

def parse(line: String): MatchData = {
  val pieces = line.split(',')
  val id1 = pieces(0).toInt
  val id2 = pieces(1).toInt
  // fields 2 to 10 hold the nine scores (slice excludes index 11)
  val scores = pieces.slice(2, 11).map(toDouble)
  // field 11 holds the match flag
  val matched = pieces(11).toBoolean
  MatchData(id1, id2, scores, matched)
}

followed by 

val parsed = noheader.map(line => parse(line))
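Because parse is an ordinary Scala function, it can be checked on a single line before it is mapped over millions of records. The sketch below repeats toDouble, MatchData and parse and applies them to one invented line with 12 comma-separated fields (two ids, nine scores, one match flag), where “?” marks a missing score; the sample values are hypothetical.

```scala
// Definitions repeated from above so the sketch is self-contained.
def toDouble(s: String): Double =
  if ("?".equals(s)) Double.NaN else s.toDouble

case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean)

def parse(line: String): MatchData = {
  val pieces = line.split(',')
  val id1 = pieces(0).toInt
  val id2 = pieces(1).toInt
  val scores = pieces.slice(2, 11).map(toDouble) // fields 2..10: nine scores
  val matched = pieces(11).toBoolean             // field 11: match flag
  MatchData(id1, id2, scores, matched)
}

// Hypothetical sample line: two ids, nine scores (the second missing), flag.
val md = parse("1,2,1,?,1,1,0,1,1,1,0,TRUE")

println(md.scores.length) // 9
// NaN is not equal to anything, itself included, so test it with isNaN.
println(md.scores(1).isNaN) // true
println(md.matched) // true
```

Note that a comparison such as md.scores(1) == Double.NaN would return false, which is why missing scores are best detected with isNaN.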

Now we are ready to create the histogram, which here counts how many lines are marked as a match and how many are not:

val matchCounts = parsed.map(md => md.matched).countByValue()
val matchCountsSeq = matchCounts.toSeq
matchCountsSeq.foreach(println)
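countByValue runs on the cluster but returns an ordinary Scala Map to the driver, with each distinct value as a key and its count (a Long) as the value. A minimal local sketch of the same computation on a plain List, using groupBy in place of countByValue (the sample flags are made up):

```scala
// Hypothetical match flags standing in for parsed.map(md => md.matched).
val matchedFlags = List(true, false, false, true, false)

// Local equivalent of countByValue: group equal values together
// and take the size of each group as a Long count.
val localCounts: Map[Boolean, Long] =
  matchedFlags.groupBy(identity).map { case (flag, group) => flag -> group.size.toLong }

// Sort by count, largest first, for a readable histogram.
localCounts.toSeq.sortBy(-_._2).foreach(println)
// (false,3)
// (true,2)
```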

 
Finally, a simpler script that calculates the mean of a column:

val famblocks = sc.textFile("/user/hdfs/fam")
// a separate case class, so it does not shadow MatchData above
case class FamRecord(id: Int, name: String)
def parseFam(line: String): FamRecord = {
  val pieces = line.split(',')
  FamRecord(pieces(0).toInt, pieces(1))
}
val famParsed = famblocks.map(line => parseFam(line))
// mean() works on an RDD of numeric values and returns a Double
val meanId = famParsed.map(fr => fr.id).mean()

Here fam is an HDFS file whose lines each contain a number and a name, separated by a comma (for example 1,tom).
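The mean itself is just the sum of the values divided by their count. A minimal local sketch of the same calculation on a plain List, assuming a few invented lines in the number,name format described above (FamRecord and parseFam are hypothetical names chosen for this sketch):

```scala
// Hypothetical contents of the fam file: "number,name" per line.
val famLines = List("1,tom", "2,ann", "6,bob")

// Hypothetical record type for one line of fam.
case class FamRecord(id: Int, name: String)

def parseFam(line: String): FamRecord = {
  val pieces = line.split(',')
  FamRecord(pieces(0).toInt, pieces(1))
}

val ids = famLines.map(parseFam).map(_.id)

// Mean = sum / count; convert to Double before dividing
// to avoid integer division.
val meanId = ids.sum.toDouble / ids.size
println(meanId) // 3.0
```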