Monthly Archives: February 2017

Create a description of a system

From time to time, I must provide a scheme of how a system works. It is not easy to find a technique in which all details can be shown. I want to distinguish between processes (like sending mail), databases and applications. I also want to indicate which techniques are used. Moreover, I would like to indicate which elements belong together. For this, I misuse the SDL stencil in Visio, which allows such distinctions to be made. See the figure below:
Within the arrows, the techniques can be shown. I also use the separate process symbols to indicate which applications are used, together with the platform on which they run. Finally, this stencil has a symbol for databases, which is very convenient. Another example can be found here

ElasticSearch: Restful services

As we have seen in a previous post, we communicate with the ElasticSearch server via messages that are sent to the server. In turn, the server responds with messages that are received by the client. This system of messages is called a “RESTful” structure. This RESTful structure is based on messages that contain all the necessary information.

Such a message from a client is composed of 3 elements:

  1. The host that is addressed
  2. The action that is required from the host
  3. All information necessary to be processed by the host

This information is stored in one message only. The message is complete: from it, the system knows which node needs to do what, and it carries all the necessary information. As an example, let us see how data are stored in ElasticSearch. Within the message, we know which node governs the ElasticSearch database; we know what needs to be done (PUT indicates that data must be stored); and with the accompanying json file, we know the data that must be stored.

The server responds with a message that contains the result of the action: whether or not the action was successful, along with additional information if the client requested it.
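To make the three elements concrete, here is a minimal Scala sketch that models a client message as a small case class and renders it as a Linux-style curl command line. The class name `RestMessage`, the address `http://localhost:9200/films/film/1` (index `films`, type `film`, id `1`) and the document are assumptions for illustration, not part of the original setup. It can be pasted into the Scala REPL.

```scala
// Hypothetical model of a RESTful client message: host, action and payload.
final case class RestMessage(host: String, action: String, body: String) {
  // Render the message as the equivalent Linux curl command line.
  def toCurl: String = s"curl -X$action '$host' -d'$body'"
}

val store = RestMessage(
  host = "http://localhost:9200/films/film/1", // assumed node, index, type and id
  action = "PUT",                              // PUT: store the document
  body = """{"title":"TestTom","director":"Tom van Maanen","year":1957}"""
)

println(store.toCurl)
```

Because everything the server needs is inside this one value, the message can be sent, logged or replayed on its own; that is the stateless property discussed below.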


The advantage of this structure is that it is stateless: a client message does not depend on other client messages. This allows client messages to be handled in any order.

This also allows decoupling: if the server changes its programme, or if the programme is distributed over different nodes, no change in the messages is needed. This makes it possible to scale up the server without interruption on the client side. It also reduces complexity, as only the server layers need to be adjusted.






Curl and ElasticSearch

One of the most useful utilities is “curl”. This wonderful tool can be used to transfer data from one platform to another. It is relatively easy to install on Windows, whereas on Linux it is often already installed. It is run from the terminal on Linux or from the command line on Windows. A typical use is transferring data from a webserver, but other protocols can be used as well: FTP, FILE, POP3, etc.


An example is given below. It shows the command line used to invoke the curl utility: curl is invoked by typing “curl”, and the first parameter is the host that must be accessed. In this case, I access a webserver from which the data are retrieved: my own website. Doing so, I transfer the HTML that the webserver generates.



A similar statement can be used to retrieve data from an FTP server. Take as example: curl --user . This opens a connection to the FTP server.

We may use curl to add data to ElasticSearch. Doing so, we transfer data from a local platform to the ElasticSearch database. We use this command on the Windows platform: curl -XPUT ""  -d"{""title"":""TestTom"" , ""director"":""Tom van Maanen"",""year"": 1957}". Note: on Linux, the quoting is different. There the command should be curl -XPUT ''  -d'{"title":"TestTom" , "director":"Tom van Maanen","year": 1957}'. On Windows, we see:


The response from ElasticSearch ("created":true) indicates that the record was inserted correctly.
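For those who would rather issue the same PUT from a program than from the command line, a minimal Scala sketch using the JDK's `java.net.http` client (Java 11 or later) follows. The address `http://localhost:9200/films/film/1` is an assumption, since the original URL is not shown. The request is only built here; the `send` helper is defined but not called, because it needs a running server. The `Content-Type` header is not used in the curl commands above, but ElasticSearch 6.0 and later require it.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// The same document as in the curl example.
val doc = """{"title":"TestTom","director":"Tom van Maanen","year":1957}"""

// Host + path, action (PUT) and body: the three elements of the message.
val put: HttpRequest = HttpRequest
  .newBuilder(URI.create("http://localhost:9200/films/film/1")) // assumed address
  .header("Content-Type", "application/json")
  .PUT(HttpRequest.BodyPublishers.ofString(doc))
  .build()

// Sends a request and returns the response body (requires a running server).
def send(req: HttpRequest): String =
  HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString()).body()
```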


In a similar way, we may retrieve records from the ElasticSearch database. We use: curl -XGET "" -d "{""query"" : {""match"" : {""title"" : ""TestTom""}}}".


We get something like:



We see the same record that we inserted.
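The search can also be built from code. This Scala sketch (Java 11+ `java.net.http`) creates a GET request that carries the same match query as its body; `HttpRequest.Builder.method` accepts any method name together with a body publisher. The address `http://localhost:9200/films/film/_search` is an assumption, as the original URL is not shown, and the request is only built, not sent.

```scala
import java.net.URI
import java.net.http.HttpRequest

// Match query equivalent to the curl -XGET example.
val query = """{"query":{"match":{"title":"TestTom"}}}"""

// GET with a request body: method(...) takes the verb plus a body publisher.
val search: HttpRequest = HttpRequest
  .newBuilder(URI.create("http://localhost:9200/films/film/_search")) // assumed address
  .header("Content-Type", "application/json")
  .method("GET", HttpRequest.BodyPublishers.ofString(query))
  .build()
```

ElasticSearch also accepts POST for searches, which some HTTP clients handle more easily than a GET with a body.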


A new and popular NoSQL database is ElasticSearch. This database is easy to install and easy to run.
But is it easy to insert data and extract the results?
The principle of inserting data into ElasticSearch looks rather straightforward: one inserts json files. Conversely, with filters, one may retrieve the results as a json file.
This shows the strength of ElasticSearch. With json files, one can add a new category of data without having to change any structure. This is different from a traditional RDBMS, where one must modify the structure of a table in order to add a new type of data.
But is it easy?
Well, that depends on what you mean by “easy”. One must be able to address the webserver that comes with ElasticSearch, and one must be able to send a json file to an address on that webserver.
The script below provides a way to do this with PHP.



At the start of the script, the json file is set up; at the end of the script, it is inserted. The script also shows how data can be retrieved. One must use an address that contains the index and the doc_type. As an example, one may look at:,year . The index is inserted directly after the address, and the doc_type follows directly after it. After _search?, a field is provided that acts as a filter to limit the number of records retrieved. Here, only records are returned for which director:"Zoek".
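The layout of such a search address can be sketched in Scala, the language used elsewhere on this blog, rather than PHP. The helper name `searchUrl`, the base address `http://localhost:9200`, the index `films` and the doc_type `film` are assumptions; the `q` parameter is ElasticSearch's URI-search syntax, and URL-encoding turns the colon into `%3A`.

```scala
import java.net.URLEncoder

// Hypothetical helper: <address>/<index>/<doc_type>/_search?q=<field>:<value>
// The q value is URL-encoded so that special characters survive the trip.
def searchUrl(address: String, index: String, docType: String,
              field: String, value: String): String = {
  val q = URLEncoder.encode(s"$field:$value", "UTF-8")
  s"$address/$index/$docType/_search?q=$q"
}

println(searchUrl("http://localhost:9200", "films", "film", "director", "Zoek"))
```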

A final remark: HTML allows parameters to be passed that can be used to add dynamic content. See below: $Title and $Year are sent as parameters to the next page.


Scala merging files

In a previous post, I showed how two files can be merged in Scala. The idea was that RDDs were converted into data frames and a join was performed on these.
In this post, the philosophy is slightly different. Now the RDD is rewritten as key-value pairs with a unique key. This then allows a merge on this unique key.
Let us first see how an RDD can be created with a unique key:

val counts = sc.textFile("/user/hdfs/keyvalue").flatMap(line => line.split(',')).map(fields => (fields,1)).reduceByKey((v1,v2) => v1+v2)

A file is read (“keyvalue”) and each line is split at its commas. Each word then becomes a separate record: if the original file contains 6 words, we end up with 6 records. We then create a new RDD in which each word is paired with a “1”, so that the word acts as the key. reduceByKey then adds up the 1s per key, which leaves one record per unique word. This result could be used as a wordcount example.
I created a similar RDD (“counts1”) that also has the words as keys, this time with a 2 for each occurrence.

val counts1 = sc.textFile("/user/hdfs/keyvalue").flatMap(line => line.split(',')).map(fields => (fields,2)).reduceByKey((v1,v2) => v1+v2)

The join can then be undertaken as:

val pipo = counts1.join(counts)

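The outcomes can be shown with pipo.collect().foreach(println); on a cluster, pipo.foreach(println) would print on the executors rather than in the shell.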
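The wordcount-and-join idea can be tried out without a cluster. The sketch below uses plain Scala collections; `reduceByKey` and `join` here are hypothetical local helpers that only mimic the Spark methods of the same name, and the input lines are made up.

```scala
// Local stand-in for reduceByKey: group pairs by key, then combine the values.
def reduceByKey[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
  pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) }

// Local stand-in for an inner join: keep only keys present on both sides.
def join[K, V, W](left: Map[K, V], right: Map[K, W]): Map[K, (V, W)] =
  left.keySet.intersect(right.keySet).map(k => (k, (left(k), right(k)))).toMap

val lines   = Seq("apple,pear,apple", "pear,plum")      // made-up file contents
val words   = lines.flatMap(line => line.split(','))
val counts  = reduceByKey(words.map(w => (w, 1)))(_ + _) // like counts
val counts1 = reduceByKey(words.map(w => (w, 2)))(_ + _) // like counts1
val pipo    = join(counts1, counts)

pipo.toSeq.sortBy(_._1).foreach(println)
```

Because reduceByKey leaves each word exactly once, the join pairs each key with one value from each side. On pair RDDs with repeated keys, Spark's join instead returns every combination of matching values.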

A similar script runs as follows:

val kbreqs = sc.textFile("/user/hdfs/keyuser").filter(line => line.contains("KBDOC")).keyBy(line => (line.split(' ')(5)))
val kblist = sc.textFile("/user/hdfs/keydoc").keyBy(line => (line.split(':')(0)))
val titlereqs = kbreqs.join(kblist)

A final script:

val logs=sc.textFile("/user/hdfs/naamtoev")
val userreqs = logs.map(line => line.split(' ')).map(words => (words(0),1)).reduceByKey((v1,v2) => v1 + v2)
val accountsdata="/user/hdfs/naamstraat"
val accounts = sc.textFile(accountsdata).keyBy(line =>  line.split(',')(0))  
val accounthits = accounts.join(userreqs)
for (pair <- accounthits.collect()) {printf("%s, %s, %s, %s\n",pair._1,pair._2._1," score= ",pair._2._2)}
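The nested tuples in the last line (pair._1, pair._2._1, pair._2._2) are easier to follow on a small local example. This plain-Scala sketch uses made-up log and account lines; it mimics keyBy (which on an RDD is equivalent to map(x => (f(x), x))) and the join, and destructures the result by pattern matching instead of the ._1/._2 accessors.

```scala
val logs         = Seq("u1 GET /a", "u2 GET /b", "u1 GET /c")        // made-up log lines
val accountLines = Seq("u1,Main Street 1", "u2,Station Road 5")      // made-up account lines

// Count requests per user (first field of each log line).
val userreqs: Map[String, Int] =
  logs.map(line => line.split(' ')(0)).groupBy(identity).map { case (u, xs) => (u, xs.size) }

// keyBy: key each account line by its first field, keeping the whole line as value.
val accounts: Map[String, String] =
  accountLines.map(line => (line.split(',')(0), line)).toMap

// Join result has the shape (key, (accountLine, hits)).
val accounthits: Seq[(String, (String, Int))] =
  accounts.toSeq.sortBy(_._1).collect {
    case (k, acc) if userreqs.contains(k) => (k, (acc, userreqs(k)))
  }

for ((user, (account, hits)) <- accounthits) println(s"$user, $account, score= $hits")
```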

Scala and RDDs

RDDs are the basic unit of work in Spark with Scala. The abbreviation stands for Resilient Distributed Dataset: a complete data set that is partitioned over the nodes of a cluster and that can be rebuilt from its lineage if a node fails. So the unit of work is comparable to a table. I distinguish two kinds of operations on such an RDD: a filter, where some rows are left out, and a map, where the rows are transformed. Below, I collected both kinds of Scala statements on such an RDD. This may then be used as a cookbook for future use.

The mapping functions:

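val mydata_uc = mydata.map(line => line.toUpperCase())   // mydata: an RDD of text lines, as in the other examples
import scala.util.parsing.json.JSON   // needed for JSON.parseFull (scala-parser-combinators; deprecated in recent Scala versions)
val myrdd2 = jsonpairs.map(pair => JSON.parseFull(pair._2).get.asInstanceOf[Map[String,String]])   // jsonpairs: hypothetical name for an RDD of (key, JSON string) pairs
val myrdd2 = mydata.map(line => line.split(' '))
val myrdd2 = mydata.flatMap(line => line.split(' '))
val pipo = mydata.map(line => line.length)
val pipo = mydata.map(line => (line.split(' ')(0),line.split(' ')(2)))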
val users = sc.textFile("/user/hdfs/keyvalue").map(line => line.split(',')).map(fields => (fields(0),(fields(0),fields(1)))) 
val users = sc.textFile("/user/hdfs/keyvalue").keyBy(line => line.split(',')(0))
val counts = sc.textFile("/user/hdfs/keyvalue").flatMap(line => line.split(',')).map(fields => (fields,1))
val counts = sc.textFile("/user/hdfs/keyvalue").flatMap(line => line.split(',')).map(fields => (fields,1)).reduceByKey((v1,v2) => v1+v2)
val avglens = sc.textFile("/user/hdfs/naamtoev").flatMap(line => line.split(" ")).map(word => (word,word.length)).groupByKey().map(pair => (pair._1, pair._2.sum/pair._2.size.toDouble)) 

The filter functions:

val mydata_uc = mydata.filter(line => line.startsWith("I"))
var jpglogs = logs.filter(line => line.contains(".jpg"))
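To see the difference between map, flatMap and filter without starting Spark, the same lambdas can be applied to a local Scala Seq, which offers methods with the same names. The lines are made up; on a real RDD the operations are lazy and distributed, but the per-element results are the same.

```scala
// Local stand-in for an RDD of text lines (made-up data).
val mydata = Seq("I like Spark", "Spark is fast", "It runs on a cluster")

val upper  = mydata.map(line => line.toUpperCase())      // map: one output per line
val arrays = mydata.map(line => line.split(' '))         // map: one array per line
val words  = mydata.flatMap(line => line.split(' '))     // flatMap: arrays flattened into words
val lens   = mydata.map(line => line.length)             // map: line lengths
val startI = mydata.filter(line => line.startsWith("I")) // filter: drops non-matching lines

println(s"${arrays.size} arrays, ${words.size} words")
```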

Merging files in Scala

I understand that Scala may be used in an ETL context. In ETL, an important element is merging two files: data come from different sources and must be merged into one file only. As an example, we may think of two files, one containing a number and a name, the other containing a number and an age.
In a previous post, I showed how two files can be merged in PySpark. In this post, I will show how it can be undertaken within Scala.
In the first step, I read the two files. I then split the lines at the delimiter. Subsequently, I identify the different columns:

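val namen=sc.textFile("/user/hdfs/namen")
case class MatchData(id1: String, id2: Int)
def parseNamen(line: String) = {
      val pieces = line.split(',')
      val id1 = pieces(0)
      val id2 = pieces(1).toInt
      MatchData(id1, id2)   // one record per line
}
// toDF() relies on the implicits that spark-shell imports automatically (spark.implicits._ in Spark 2.x)
val parsedNamen = namen.map(line => parseNamen(line)).toDF()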

val leeftijd = sc.textFile("/user/hdfs/leeftijd")
case class MatchLeeftijd(id3: Int, id4: Int)
def parseLeef(line: String) = {
      val pieces = line.split(',')
      val id3 = pieces(0).toInt
      val id4 = pieces(1).toInt
      MatchLeeftijd(id3, id4)   // one record per line
}
val parsedLeef = leeftijd.map(line => parseLeef(line)).toDF()

I can then join the two objects with:

val samen=parsedLeef.join(parsedNamen,parsedNamen("id2") === parsedLeef("id3"))

The idea is that a column can be accessed with parsedNamen("id2") or parsedLeef("id3"). For me, the non-trivial part is how these columns are compared: the operator === is used for that purpose. It builds a Column expression that tests for equality, whereas == would return a plain Scala Boolean.
An alternative is a Cartesian product that is subsequently filtered:

val samen=parsedLeef.join(parsedNamen)   // on Spark 2.x, set spark.sql.crossJoin.enabled=true or use crossJoin (Spark 2.1+)
samen.filter("id2-id3 = 0").show()
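Both approaches produce an inner join. The sketch below, in plain Scala with made-up lines and the same record layout as above, contrasts them: the filtered Cartesian product compares every pair of rows, while a keyed lookup indexes one side once and touches each row of the other side once. In Spark, the Catalyst optimizer may turn the filtered product into an ordinary join anyway, so this illustrates the logic, not Spark's actual execution plan.

```scala
// Same layout as above: namen holds "name,number", leeftijd holds "number,age".
final case class MatchData(id1: String, id2: Int)
final case class MatchLeeftijd(id3: Int, id4: Int)

def parseNamen(line: String): MatchData = {
  val pieces = line.split(',')
  MatchData(pieces(0), pieces(1).trim.toInt)
}
def parseLeef(line: String): MatchLeeftijd = {
  val pieces = line.split(',')
  MatchLeeftijd(pieces(0).trim.toInt, pieces(1).trim.toInt)
}

val namen    = Seq("Tom,1", "Anna,2").map(parseNamen) // made-up data
val leeftijd = Seq("1,60", "3,25").map(parseLeef)     // made-up data

// Cartesian product followed by a filter (id2 == id3): compares every pair.
val viaProduct = for {
  l <- leeftijd
  n <- namen
  if n.id2 == l.id3
} yield (l.id3, l.id4, n.id1)

// Keyed join: index namen by number once, then look up each leeftijd row.
val byNumber = namen.map(n => (n.id2, n)).toMap
val viaJoin  = leeftijd.flatMap(l => byNumber.get(l.id3).map(n => (l.id3, l.id4, n.id1)))

println(viaProduct)
```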