Hi, Habr! In the previous articles we described the MapReduce paradigm and showed how to implement and run a MapReduce application on the Hadoop stack in practice. Now it is time to describe various techniques that make it possible to use MapReduce effectively for solving practical problems, and to show some Hadoop features that simplify development or significantly speed up the execution of a MapReduce job on a cluster.

Big Data from A to Z. Part 3: Techniques and strategies for developing MapReduce applications

Map only job


As we remember, MapReduce consists of the Map, Shuffle and Reduce stages. In practical problems the heaviest stage is usually Shuffle, since the data is sorted there. In fact, there is a whole class of tasks that can be handled with the Map stage alone. Here are examples of such tasks:

  • Data filtering (for example, "find all records from IP address 123.123.123.123" in web server logs);
  • Data transformation ("remove a column from CSV logs");
  • Loading and unloading data to or from an external source ("insert all records from the log into a database").

Such problems are solved with Map-Only jobs. When creating a Map-Only job in Hadoop you need to specify zero reducers:

[Figure: Map Only Job]

An example of configuring a map-only job in Hadoop:

Native interface:
Specify zero reduce tasks when configuring the job:

job.setNumReduceTasks(0);

A more detailed example is available at the link.

Hadoop Streaming interface:
Simply do not specify a reducer. Example:

hadoop jar hadoop-streaming.jar \
-input input_dir \
-output output_dir \
-mapper "python mapper.py" \
-file "mapper.py"

Map-only jobs can actually be very useful. For example, in the Facetz.DCA platform, user characteristics are detected from user behavior with one big map-only job, in which each mapper takes a user as input and outputs that user's characteristics.

Combine


As I already wrote, the heaviest stage in executing a MapReduce job is usually the shuffle stage. This happens because the intermediate results (the mapper's output) are written to disk, sorted, and transferred over the network. However, there are tasks where such behavior does not seem very reasonable. For example, in the same word-count problem it is possible to pre-aggregate the outputs of several mappers on one map node of the task, and pass to the reducer values that have already been summed up on each machine.

[Figure: Combine. Taken from the linked source.]

For this purpose, in Hadoop you can define a combiner function that will process the output of part of the mappers. The combiner is very similar to reduce: it takes the output of some of the mappers as input and produces an aggregated result for those mappers, which is why the reducer is very often used as the combiner as well. An important difference from reduce is that the combiner does not receive all the values corresponding to one key.

Moreover, Hadoop does not guarantee that the combiner will be executed at all for a mapper's output. Therefore the combiner is not always applicable: for example, it cannot be used when searching for the median value by key. Nevertheless, in those tasks where a combiner is applicable, using it can give a substantial gain in the execution speed of a MapReduce job.

Using a combiner in Hadoop:

Native interface:
When configuring the job, specify the combiner class. As a rule, it is the same as the Reducer:

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

Hadoop Streaming interface:
Specify the command in the -combiner command line option. As a rule, this command is the same as the reducer command. Example:

hadoop jar hadoop-streaming.jar \
-input input_dir \
-output output_dir \
-mapper "python mapper.py" \
-reducer "python reducer.py" \
-combiner "python reducer.py" \
-file "mapper.py" \
-file "reducer.py"

Chains of MapReduce tasks


There are situations when one MapReduce is not enough to solve a task. For example, consider a slightly modified WordCount problem: there is a set of text documents, and we need to count how many words occurred in the set from 1 to 1000 times, how many from 1001 to 2000, how many from 2001 to 3000, and so on.

To solve it we will need 2 MapReduce jobs:

  1. A modified WordCount that for each word computes which of the intervals it falls into;
  2. A MapReduce that counts how many times each of the intervals occurs in the output of the first MapReduce.

The solution in pseudocode:
#map1: emit (word, 1) for every word in the document
def map1(doc):
	for word in doc.split():
		yield word, 1

#reduce1: the total count of a word determines its interval number
def reduce1(word, values):
	yield sum(values) // 1000, 1

#map2: pass the (interval, count) pairs through unchanged
def map2(line):
	interval, cnt = line.split()
	yield int(interval), int(cnt)

#reduce2: count how many words fell into each interval
def reduce2(interval, values):
	yield interval * 1000, sum(values)


To execute a sequence of MapReduce jobs on Hadoop, it is enough to specify, as the input data for the second job, the folder that was specified as the output of the first one, and to run them one after the other.
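For example, here is a sketch of a small driver that launches the two streaming jobs one after the other, passing the output folder of the first job as the input of the second; the jar path and the mapper1.py / reducer1.py / mapper2.py / reducer2.py script names are illustrative assumptions.

# run_chain.py -- runs two Hadoop Streaming jobs in sequence (sketch)
import subprocess

STREAMING_JAR = "hadoop-streaming.jar"  # assumed location of the streaming jar

def run_streaming(input_dir, output_dir, mapper, reducer, files):
    cmd = ["hadoop", "jar", STREAMING_JAR,
           "-input", input_dir, "-output", output_dir,
           "-mapper", mapper, "-reducer", reducer]
    for f in files:
        cmd += ["-file", f]
    subprocess.run(cmd, check=True)  # raise if the job fails

# job 1: for each word, compute which interval its total count falls into
run_streaming("input_dir", "wordcount_intervals",
              "python mapper1.py", "python reducer1.py",
              ["mapper1.py", "reducer1.py"])

# job 2: count how many words fell into each interval, reading job 1's output
run_streaming("wordcount_intervals", "interval_counts",
              "python mapper2.py", "python reducer2.py",
              ["mapper2.py", "reducer2.py"])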

In practice, chains of MapReduce jobs can form rather complex sequences, in which the jobs can be connected both sequentially and in parallel with each other. To simplify the management of such execution plans there are dedicated tools such as Oozie and Luigi, to which a separate article in this series will be devoted.

[Figure: Example of a chain of MapReduce tasks]

Distributed cache


An important mechanism in Hadoop is the Distributed Cache. The Distributed Cache allows you to add files (for example, text files, archives, jar files) to the environment in which the MapReduce task is executed.

You can add files stored on HDFS as well as local files (local to the machine from which the task is launched). I have already implicitly shown how to use the Distributed Cache with Hadoop Streaming: by adding the mapper.py and reducer.py files via the -file option. In fact, you can add not only mapper.py and reducer.py, but arbitrary files in general, and then use them as if they were in the local folder.

Using the Distributed Cache:
Native API:
// job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), 
                             job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);

// usage example in a mapper:
public static class MapClass extends MapReduceBase  
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;
 
 public void configure(JobConf job) {
   // get the cached data from the archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }
 
 public void map(K key, V value, 
                 OutputCollector<K, V> output, Reporter reporter) 
 throws IOException {
   // use the data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop Streaming:
# list the files that need to be added to the distributed cache in the -files option.
# The -files option must come before the other options.

yarn jar hadoop-streaming.jar \
	-files mapper.py,reducer.py,some_cached_data.txt \
	-input '/some/input/path' \
	-output '/some/output/path' \
	-mapper 'python mapper.py' \
	-reducer 'python reducer.py'

Usage example:
import sys

# simply read the cached file from the local working directory
data = open('some_cached_data.txt').read()

for line in sys.stdin:
	# process the input line here, using data as needed
	pass

Reduce Join


Those who are used to working with relational databases often use the very convenient Join operation, which allows them to process the contents of several tables jointly by joining them on some key. When working with big data such a task sometimes arises as well. Consider the following example:

There are logs of two web servers, and each log looks as follows: <timestamp>\t<ip>\t<url>. An example fragment of a log:

1446792139	178.78.82.1	/sphingosine/unhurrying.css
1446792139	126.31.163.222	/accentually.js
1446792139	154.164.149.83	/pyroacid/unkemptly.jpg
1446792139	202.27.13.181	/Chawia.js
1446792139	67.123.248.174	/morphographical/dismain.css
1446792139	226.74.123.135	/phanerite.php
1446792139	157.109.106.104	/bisonant.css

For each IP address we need to determine which of the 2 servers it visited more often. The result has to be presented in the form: <ip>\t<first or second>. An example of part of the result:

178.78.82.1	first
126.31.163.222	second
154.164.149.83	second
226.74.123.135	first

Unfortunately, unlike relational databases, joining two logs on a key (in this case, the IP address) is in general a rather heavy operation; it is solved with 3 MapReduce jobs and the Reduce Join pattern:

[Figure: General scheme of Reduce Join]

Reduce Join works as follows:

  1. A separate MapReduce (Map only) job is started on each of the input logs, transforming the input data into the following form:

    key -> (type, value)

    where key is the key on which the tables are to be joined, type is the table type (first or second in our case), and value is any additional data attached to the key.

  2. The outputs of both MapReduce jobs are fed as input to a 3rd MapReduce job, which actually performs the join. This MapReduce contains an empty mapper that simply copies its input. Then shuffle partitions the data by key and feeds it to the reducer in the form:

    key -> [(type, value)]

It is important that at this point records from both logs reach the reducer, and at the same time the type field makes it possible to identify which of the two logs a particular value came from. So there is enough data to solve the original problem. In our case the reducer simply has to count, for each key, which type occurs more often among its records and to output that type.
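As an illustration, here is a minimal sketch of such a reducer for Hadoop Streaming; it assumes the two map-only jobs have already tagged every record as "ip<TAB>first" or "ip<TAB>second" and that the input arrives sorted by key (the script name is an assumption):

# join_reducer.py -- for each ip, decide which server ("first" or "second")
# occurs more often among its records
import sys
from itertools import groupby

def parse(stream):
    for line in stream:
        ip, source = line.rstrip("\n").split("\t", 1)
        yield ip, source

for ip, records in groupby(parse(sys.stdin), key=lambda kv: kv[0]):
    counts = {"first": 0, "second": 0}
    for _, source in records:
        counts[source] += 1
    winner = "first" if counts["first"] >= counts["second"] else "second"
    print("%s\t%s" % (ip, winner))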

MapJoin


The Reduce Join pattern describes the general case of joining two logs on a key. However, there is a special case in which the task can be significantly simplified and accelerated: the case in which one of the logs is much smaller than the other. Consider the following task:

There are 2 logs. The first one contains a web server log (the same as in the previous task); the second file (100 KB in size) contains a URL -> topic mapping. An example of the 2nd file:

/toyota.php 	auto
/football/spartak.html 	sport
/cars 	auto
/finances/money 	business

For each IP address we need to determine pages of which category were loaded from this IP address most often.

In this case we also need to perform a Join of the 2 logs on the URL. However, here we do not have to launch 3 MapReduce jobs, because the second log fits entirely in memory. To solve the problem with a single MapReduce, we can load the second log into the Distributed Cache and, when the mapper is initialized, simply read it into memory, putting it into a url -> topic dictionary.

The problem is then solved as follows:

Map:

# find the topic of each page from the first log
input_line -> [ip, topic]

Reduce:

Ip -> [topics] -> [ip, most_popular_topic]

The reducer receives an ip and the list of all topics as input, and simply calculates which of the topics occurred most often. Thus the task is solved with a single MapReduce, and the Join itself actually happens in map (so if no additional aggregation by key were needed, we could get by with a Map-Only job):
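A minimal sketch of such a mapper for Hadoop Streaming is shown below; it assumes the small URL -> topic file has been shipped to the task via the Distributed Cache (the -files option) under the illustrative name url_topics.txt.

# mapjoin_mapper.py -- the join happens right in the mapper
import sys

# read the small URL -> topic file from the local working directory,
# where the Distributed Cache has placed it
url_to_topic = {}
with open("url_topics.txt") as f:
    for line in f:
        url, topic = line.split()
        url_to_topic[url] = topic

# for each line of the big web server log, emit (ip, topic)
for line in sys.stdin:
    timestamp, ip, url = line.rstrip("\n").split("\t")
    topic = url_to_topic.get(url)
    if topic is not None:
        print("%s\t%s" % (ip, topic))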

[Figure: Scheme of MapJoin]

Summary


In this article we have covered several patterns and techniques for solving problems with MapReduce, shown how to combine MapReduce jobs into chains, and how to join logs on a key.

In the following articles we will take a more detailed look at the architecture of Hadoop, as well as at the tools that simplify working with MapReduce and allow you to work around its shortcomings.

Links to other articles in the series


" Big Data from And to I. Part 1: Principles of work with big data, MapReduce paradigm
" Big Data from And to I. Part 2: Hadoop

This article is a translation of the original post at habrahabr.ru/post/270453/