Scala merging files

By | February 6, 2017

In a previous post, I showed how two files can be merged in Scala. The idea was that RDDs were translated as data frames and a join was undertaken on these.
In this post, the philosophy is slightly different. Now the RDD is rewritten as a key-value pair with a unique key. This then allows a merge on this unique key.
Let us first see how a RDD can be created with a unique key:

val counts = sc.textFile("/user/hdfs/keyvalue").flatMap(line => line.split(',')).map(fields => (fields,1)).reduceByKey((v1,v2) => v1+v2)

A file is read (“keyvalue”) that is subsequently split along their comma. Each word is then rewritten as an own record. If the original file contains 6 words, we end up having 6 records. We then create a new RDD with a “1” added to each record. Subsequently the word is seen as a key. The “1” is then aggregated over the records. This result could be used as a wordcount example.
I created a similar RDD (“counts1”) that also had the words as a key.

val counts1 = sc.textFile("/user/hdfs/keyvalue").flatMap(line => line.split(',')).map(fields => (fields,2)).reduceByKey((v1,v2) => v1+v2)

The join can then be undertaken as:

val pipo = counts1.join(counts)

The outcomes can be shown as pipo.foreach(println).

And a similar scripts runs as

val kbreqs = sc.textFile("/user/hdfs/keyuser").filter(line => line.contains("KBDOC")).keyBy(line => (line.split(' ')(5)))
val kblist = sc.textFile("/user/hdfs/keydoc").keyBy(line => (line.split(':')(0)))
val titlereqs = kbreqs.join(kblist)

A final script:

val logs=sc.textFile("/user/hdfs/naamtoev")
val userreqs = logs.map(line => line.split(' ')).map(words => (words(0),1)).reduceByKey((v1,v2) => v1 + v2)
val accountsdata="/user/hdfs/naamstraat"
val accounts = sc.textFile(accountsdata).keyBy(line =>  line.split(',')(0))  
val accounthits = accounts.join(userreqs)
for (pair <- accounthits) {printf("%s, %s, %s, %s\n",pair._1,pair._2._1," score= ",pair._2._2)}