Scalding: a reason to move from Java to Scala

In this article I will talk about Twitter Scalding, a framework for describing data processing workflows in Apache Hadoop. I will begin from afar, with the history of frameworks on top of Hadoop. Then I will give an overview of Scalding's capabilities. Finally, I will show code samples understandable to those who know Java but are barely familiar with Scala.

Interested? Let's go!

Simpler than MapReduce


When the MapReduce paradigm first appeared, it was a disruptive step toward simpler development of distributed computations. But it soon became clear that writing mappers and reducers by hand is very tedious. To speed up development, high-level layers on top of Map/Reduce appeared: Pig, Hive and Cascading, and later others. Let's dwell on the last one.

Cascading is a Java framework for describing data processing workflows. Given such a description, Cascading analyzes the workflow much like a query parser in a DBMS, builds an execution plan as a sequence of map/reduce jobs, and submits them to a Hadoop cluster, managing their launch and intermediate data on its own. Unfortunately, Cascading operates with rather low-level abstractions, so for a long time it lost in popularity to other data processing tools.

A successful way out of this situation was found. Twitter adapted Cascading to its needs and wrapped its abstractions in constructs traditional for Scala. Thus Scalding, a Scala framework on top of Cascading, was born. Here it is worth making a lyrical digression and talking about Scala.

Lyrical digression about Scala
Scala is difficult. Fierce battles are fought over its suitability for industrial development. In this delicate question I would probably side with the conservative Java camp. But I have to admit that there are things Scala does better than other languages, namely processing data streams and working with strings. For Java developers unfamiliar with Scala, I should note that working with collections, string processing and functional programming are easy and natural in Scala. The Java Stream API and java.util.function you know are a sad, pale copy of Scala's standard facilities.

So, the attempt to apply standard Scala idioms to the description of data processing workflows was a success, and Scalding has a chance to catch up in popularity with Hive, Pig and their numerous modern analogs. It turned out so well that it is worth learning Scala for its sake alone, which is what we will do now.

Introduction to Scalding


For now I will deliberately skip everything concerning the internals of Scalding and Cascading. Let's treat it as a black box with a nice interface that computes something over our data for us. If all goes well, there will be another article about the internals of these frameworks.

For those who are not familiar with Scala
A type declaration goes after a colon, following the name of a variable or function.

A tuple is several objects fastened together. Classic examples of a tuple are Pair, Triple, etc. In Scala they are part of the language. Tuples are written in parentheses, separated by commas.

Generics are written in square brackets, not angle brackets.

val longConstant: Long = 0L // final long longConstant = 0L;
var list: List[Int] // List<Integer> list;
(String, Int) // Pair<String, Integer>


Flat operations


The basic concept in Scalding is the Pipe. A Pipe is a pipeline through which data flies toward the programmer. In essence, it is similar to Stream from Java 8. The first implementation of Pipe did not support typing, but that did not last long. Fans of strict typing came up with TypedPipe: a pipeline of objects of a strictly specified type, a generic in Java terms.

A number of standard operations over the stream are defined for TypedPipe: map, flatMap, filter, limit and others. These are all flat operations over the stream; in theory, they can be executed efficiently, with unlimited parallelism and on any volume of data.
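
To give a feel for these operations, here is a minimal sketch over hypothetical in-memory data (not part of the original example, just the general shape):

import com.twitter.scalding.typed.TypedPipe

// A pipe of raw log lines (hypothetical sample data).
val lines: TypedPipe[String] = TypedPipe.from(List("a,1", "b,2", "c,3"))

// Each flat operation transforms records independently of one another,
// which is what lets Scalding parallelize them freely.
val bigValues: TypedPipe[Int] =
  lines
    .map(line => line.split(",")(1).toInt) // parse the numeric column
    .filter(n => n > 1)                    // keep only values above 1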

The data in a TypedPipe has to be read from somewhere. For this purpose Scalding has the Source: a data source whose only job is to produce a Pipe or a TypedPipe. There are several ready-made sources, most of which read files in various formats, but it is also possible to read from any iterator (and therefore from an in-memory collection) and, of course, to define your own sources. Importantly, the same Cascading and Scalding code works both on a Hadoop cluster and on local data, which is very convenient for testing.
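
For example, a sketch of both kinds of sources; the file path and sample data are hypothetical:

import com.twitter.scalding._
import com.twitter.scalding.typed.TypedPipe

// A typed TSV file source: every row becomes a (String, Long) tuple.
val fromFile: TypedPipe[(String, Long)] =
  TypedPipe.from(TypedTsv[(String, Long)]("path/to/events.tsv"))

// An in-memory collection as a source -- handy for tests, because the
// same pipeline code runs unchanged over local data.
val fromMemory: TypedPipe[(String, Long)] =
  TypedPipe.from(List(("a", 1L), ("b", 2L)))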

When all the operations are done, it is time to save the data. The Sink, the terminating part of the pipeline, is responsible for writing to disk in Scalding. Sinks are similar to Sources; often it is the same class implementing both interfaces.
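
A minimal sketch, assuming we are inside a Scalding Job and reusing the fromMemory pipe from the sketch above (the output path is hypothetical). Note that TypedTsv acts here as a Sink, whereas a moment ago it acted as a Source:

// Write the pipe to disk as tab-separated values.
fromMemory.write(TypedTsv[(String, Long)]("path/to/output.tsv"))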

Grouping operations


MapReduce gives us the ability to reorganize the stream represented by a TypedPipe. First of all, there is the groupBy operation, which groups the records of the whole stream by a key, an analog of GROUP BY in SQL. After grouping, a TypedPipe[V] takes the special form Grouped[K, V], over which additional operations become available.
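
A small sketch of grouping, again over hypothetical data:

import com.twitter.scalding.typed.{Grouped, TypedPipe}

val words: TypedPipe[String] = TypedPipe.from(List("foo", "bar", "foo"))

// groupBy turns a TypedPipe[V] into a Grouped[K, V]; here the key is the word itself.
val grouped: Grouped[String, String] = words.groupBy(word => word)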

First, using the mapGroup and mapValuesStream methods, you can process the elements of a Grouped[K, V] as a pair of the key K on which the grouping was done and an iterator over all the values V that matched that key. Any function from the Scala collections is applicable to the iterator over values. But usually this is not even needed, since Grouped has many shortcut functions covering the most popular cases.
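
Continuing that sketch, here is the same per-key count written in both styles:

// Explicit style: we process an iterator over the values of each key.
val manualCounts: TypedPipe[(String, Long)] =
  grouped
    .mapValuesStream(values => Iterator(values.size.toLong))
    .toTypedPipe

// Shortcut style: Grouped already knows how to count.
val shortcutCounts: TypedPipe[(String, Long)] = grouped.size.toTypedPipe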

Second, a Grouped can be sorted with the sortBy operation. After that, mapGroup, mapValuesStream and all their derivatives remain applicable to it.

Third, a Grouped[K, V1] can be joined with another Grouped[K, V2]. The same rules apply as in relational databases: leftJoin, rightJoin, join (inner) and outerJoin (full) are available. The output is a Grouped[K, (V1, V2)].
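
A sketch with two hypothetical reference pipes, assuming the same imports as above:

val topics: TypedPipe[(String, String)] =
  TypedPipe.from(List(("example.com", "news"), ("shop.io", "retail")))
val owners: TypedPipe[(String, String)] =
  TypedPipe.from(List(("example.com", "Alice")))

// Inner join: only keys present on both sides survive,
// and their values are paired up as (V1, V2).
val joined: TypedPipe[(String, (String, String))] =
  topics.group.join(owners.group).toTypedPipe

// Left join: every left-hand key survives; a missing right value becomes None.
val withOwners: TypedPipe[(String, (String, Option[String]))] =
  topics.group.leftJoin(owners.group).toTypedPipe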

It should be noted that when an ungrouped stream contains pairs, i.e. a TypedPipe[(K, V)], the hashJoin operation can be applied to it. It is similar to an ordinary Grouped.join, but is performed in memory. It works well for enriching data from small reference tables, but on big tables it can lead to an OOM.

A Grouped can be transformed back into a TypedPipe with the operations toTypedPipe, keys or values. The first preserves both the key and the value; the other two return only one of them.
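
For example, continuing the counting sketch:

// keys keeps only the grouping keys, values keeps only the aggregated values.
val wordsOnly: TypedPipe[String] = grouped.size.keys
val totals: TypedPipe[Long] = grouped.size.values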

Scalding by example


Now, after this overview of the framework's main capabilities, let's look at how it works on an example.

Let's say we are an RTB platform, and we have a history of our users' clicks on URLs of the sites we observe. The history is stored in a huge TSV file with three columns: URL, Timestamp and UserId.

We also have a classification of sites by topic. We have at most a few thousand sites, not that many. The whole classification fits in a small TSV file with two columns: Domain and Topic.

We want to understand how often users switch between topics. To do this, we need to keep in the click history only those events where the user moves from a site of one topic to a site of another.

Let's write the code that will do this transformation for us. We won't cover the launch infrastructure here; if you are interested, the complete code of the example is available on github.

In Scala you can define aliases for types. This is convenient, since it lets us distinguish one String from another in type declarations.

type Domain = String
type UserId = String
type Topic = String
type Url = String
type Timestamp = Long

Now let's declare the domain model classes:

case class Click(url: Url, ts: Timestamp, userId: UserId)
case class SiteInfo(domain: Domain, topic: Topic)

A case class in Scala is a convenient way to describe classes for immutable values. The constructor, the getters and the rest of the boilerplate code are generated automatically.
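
For example (the values are made up), the compiler gives us construction, structural equality and non-destructive updates for free:

val click = Click("http://example.com/page", 1L, "user-1")
val isSame = click == Click("http://example.com/page", 1L, "user-1") // true: equals is generated
val other = click.copy(url = "http://example.com/other") // copy is generated too; we will use it below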

Let's read the clicks table:

val clicksPipe: TypedPipe[Click] =
  TypedPipe.from(TypedTsv[(Url, Timestamp, UserId)](pathToClicks))
    .map(tuple => Click.tupled(tuple))

Here we declared a source: a typed TSV with columns of types (Url, Timestamp, UserId). Then we wrapped this source in a TypedPipe. Finally, for convenience, we converted the three-column tuples (Url, Timestamp, UserId) into objects of the Click class.

The result is a TypedPipe[Click].

Let's keep only the domain part of each url.

import java.net.URL // needed for host extraction

def url2domain(url: Url): Domain = {
  return new URL(url).getHost
}
val domainsPipe: TypedPipe[Click] =
  clicksPipe
    .map(click => click.copy(url = url2domain(click.url)))

Let's read the reference table that splits domains by topic, and immediately group it into the form suitable for hashJoin.

val sitesGroupByDomain: Grouped[Domain, SiteInfo] =
  TypedPipe.from(TypedTsv[(Domain, Topic)](pathToSites))
    .map(tuple => SiteInfo.tupled(tuple))
    .groupBy(siteInfo => siteInfo.domain)

Let's add information about site topics to the click stream. To do this, we join the click stream with the domain reference table.

val clicksWithSiteInfo: TypedPipe[(Domain, (Click, SiteInfo))] =
  domainsPipe
    .map(click => (click.url, click))
    .hashJoin(sitesGroupByDomain)

Let's group the click stream by user and sort it by click timestamp. Also, we are no longer interested in the domain information; the site topic alone is enough. For this we introduce an auxiliary class that reflects the user's active interest in a topic at a point in time.

case class TopicActivity(topic: Topic, ts: Timestamp, userId: UserId)

val topicActivityStreamPerUser: SortedGrouped[UserId, TopicActivity] =
  clicksWithSiteInfo
    .map(tuple => {
      val (domain, (click, siteInfo)) = tuple
      TopicActivity(siteInfo.topic, click.ts, click.userId)
    })
    .groupBy(activity => activity.userId)
    .sortBy(activity => activity.ts)

The trickiest part: in the stream of user activities we need to catch the moments when the topic switches. To hunt for the switches we will write a Scala function in Java style. It accumulates the result in an ArrayBuffer (an analog of ArrayList), which can potentially lead to an OOM on very long histories.

import scala.collection.mutable.ArrayBuffer

def topicSwitches(userId: UserId, activities: Iterator[TopicActivity]): Iterator[TopicActivity] = {
  val result = ArrayBuffer[TopicActivity]()
  var firstTs = 0L
  var lastTopic = null.asInstanceOf[Topic]
  for (activity <- activities) {
    if (firstTs == 0L || lastTopic != activity.topic) {
      result.append(activity)
      firstTs = activity.ts
      lastTopic = activity.topic
    }
  }
  result.toIterator
}

val firstTopicActivitiesPipe: TypedPipe[TopicActivity] =
  topicActivityStreamPerUser
    .mapGroup((userId, activities) => topicSwitches(userId, activities))
    .values

Only the first activity of each interest remains in the stream. From these activities we can trace how the user's focus of interest changed over time. All that is left is to write the result to a file.

firstTopicActivitiesPipe
  .map(activity => (activity.topic, activity.ts, activity.userId))
  .write(TypedTsv(args.required("output")))

That's all. We described a nontrivial data transformation in about 40 lines.

The final code, scala-way


If we follow the canonical scala-way, the code becomes even better. Moreover, the topic-switch search function can be rewritten from the iterative approach to a functional one, removing the use of the buffer. Now the process will not fall over even on an infinite input. Theoretically…

// The Helper case class was not shown in the original snippet; a minimal
// definition: it carries the scan state -- the current topic, the timestamp
// of its first visit, and a flag marking whether this element opened a new topic.
case class Helper(topic: Option[Topic] = None, firstTs: Timestamp = 0L, firstVisit: Boolean = false)

def topicSwitches(userId: UserId, activities: Iterator[TopicActivity]): Iterator[TopicActivity] = {
  activities.scanLeft(Helper())((helper, activity) => {
    if (helper.topic.isEmpty || helper.topic.get != activity.topic) {
      Helper(Some(activity.topic), activity.ts, true)
    } else {
      Helper(helper.topic, helper.firstTs, false)
    }
  }).filter(_.firstVisit).map(helper => TopicActivity(helper.topic.get, helper.firstTs, userId))
}

TypedPipe.from(TypedTsv[(Url, Timestamp, UserId)](pathToClicks))
  .map(tuple => Click.tupled(tuple))
  .map(click => click.copy(url = new URL(click.url).getHost))
  .map(click => (click.url, click))
  .hashJoin(
    TypedPipe.from(TypedTsv[(Domain, Topic)](pathToSites))
      .map(tuple => SiteInfo.tupled(tuple))
      .groupBy(_.domain)
  )
  .map({case (_, (click, siteInfo)) => TopicActivity(siteInfo.topic, click.ts, click.userId)})
  .groupBy(_.userId)
  .sortBy(_.ts)
  .mapGroup(topicSwitches)
  .values
  .map(activity => (activity.topic, activity.ts, activity.userId)) // back to a plain tuple for TypedTsv
  .write(TypedTsv(outputPath))

In the following articles I will look at how to organize stream-processing code and how to test it. And in the end, I will tell how all this works from within.

Disclaimer
I wrote the example code to be as clear as possible to a Java programmer, avoiding any magical conversions and not scrimping on bytes. The goal was to show that you can add a little Scala to your ETL processes quickly and painlessly. The code is not optimal. If you know how to write it more efficiently, good for you.



Resources


Complete code of the example on github
Scalding Wiki
Book "Programming MapReduce with Scalding"

This article is a translation of the original post at habrahabr.ru/post/273611/