Map-only jobs
As we recall, MapReduce consists of the Map, Shuffle and Reduce stages. In practical tasks the heaviest stage is usually Shuffle, because that is where the data gets sorted. There is, however, a class of tasks that can be handled by the Map stage alone. Here are some examples:
- Filtering data (for example, "find all records from IP address 188.8.131.52" in web-server logs);
- Transforming data ("delete a column in CSV logs");
- Loading data into or unloading it from an external source ("insert all records from the log into a database").
Such tasks are solved with a Map-Only job. When creating a Map-Only job, you need to tell Hadoop to use zero reducers:
Configuring a map-only job on Hadoop:

|Native interface|Hadoop Streaming interface|
|---|---|
|Set the number of reducers to zero when configuring the job (setNumReduceTasks(0)); a more detailed example is available via the link.|Simply do not specify a reducer command.|
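As an illustration of the first task on the list, here is a minimal sketch of a Hadoop Streaming mapper that keeps only the records coming from one IP address. The log format (<timestamp>\t<ip>\t<url>) matches the sample used later in this article; the target address and all names are illustrative:

```python
import sys

TARGET_IP = "188.8.131.52"  # illustrative address from the filtering example


def filter_line(line, target_ip=TARGET_IP):
    """Return the record unchanged if its IP field matches target_ip, else None.

    Assumes tab-separated records of the form <timestamp>\t<ip>\t<url>.
    """
    fields = line.rstrip("\n").split("\t")
    if len(fields) >= 2 and fields[1] == target_ip:
        return line.rstrip("\n")
    return None


if __name__ == "__main__":
    # Hadoop Streaming feeds records on stdin; matching ones are re-emitted as-is.
    for line in sys.stdin:
        record = filter_line(line)
        if record is not None:
            print(record)
```

Run with zero reducers (in streaming, simply without a `-reducer` option), and the mapper output becomes the final result.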
Map-only jobs can actually be very useful. For example, the Facetz.DCA platform derives user characteristics from user behavior with one big map-only job, in which each mapper receives a user as input and outputs that user's characteristics.
Combiner

As I already wrote, the heaviest stage of a MapReduce job is usually shuffle: the intermediate results (the mappers' output) are written to disk, sorted and transferred over the network. For some tasks, however, this behavior does not seem very reasonable. For example, in the same word-counting problem it is possible to pre-aggregate the outputs of several mappers on one node of the map-reduce job, and hand the reducer values that are already summed up per machine.
In Hadoop this is done with a combining function, which processes the output of part of the mappers. The combining function is very similar to reduce: it takes the output of some of the mappers as input and produces an aggregated result for them, so the reducer is very often used as the combiner too. An important difference from reduce is that the combining function does not receive all the values corresponding to one key.
Moreover, Hadoop does not even guarantee that the combining function will be executed for a given mapper's output at all. The combining function is therefore not applicable to every task; it cannot be used, for example, to find the median value for a key. Nevertheless, in tasks where it is applicable, its use can yield a significant speed-up of the MapReduce job.
Using a combiner on Hadoop:

|Native interface|Hadoop Streaming|
|---|---|
|Specify a combiner class (setCombinerClass) when configuring the job; as a rule, it matches the Reducer class.|Pass a command via the -combiner command-line option; as a rule, it matches the reducer command.|
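To make the "reducer as combiner" point concrete, here is a sketch of a WordCount reduce step that can also be registered as the combiner. It works in both roles because summation is associative and commutative; the function name and input handling are illustrative:

```python
import sys
from itertools import groupby


def sum_counts(pairs):
    """Sum partial counts per word.

    Usable both as a combiner (over one mapper's partial output) and as the
    final reducer, because the sums can be taken in any grouping order.
    """
    totals = []
    for word, group in groupby(sorted(pairs), key=lambda p: p[0]):
        totals.append((word, sum(count for _, count in group)))
    return totals


if __name__ == "__main__":
    # Hadoop Streaming: each stdin line looks like "word\tcount".
    pairs = []
    for line in sys.stdin:
        word, count = line.rstrip("\n").split("\t")
        pairs.append((word, int(count)))
    for word, total in sum_counts(pairs):
        print(f"{word}\t{total}")
```

Buffering the whole input is fine for a sketch; a production streaming reducer would exploit the fact that its input arrives sorted by key and process one group at a time.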
Chains of MapReduce jobs
There are situations when a single MapReduce is not enough to solve a task. For example, consider a slightly modified WordCount problem: given a set of text documents, count how many words occurred in the set from 1 to 1000 times, how many from 1001 to 2000, how many from 2001 to 3000, and so on.
To solve it we need two MapReduce jobs:
- A modified WordCount that, for each word, computes which of the intervals its count fell into;
- A MapReduce that counts how many times each interval occurred in the output of the first MapReduce.
A solution in pseudo-code:
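A minimal in-memory sketch of the two jobs' logic (the interval width and all names are illustrative; on a cluster each function would of course run as a separate MapReduce job):

```python
from collections import Counter


def word_interval(count, width=1000):
    """Map a word's total count to its interval label, e.g. 3 -> '1-1000'."""
    low = ((count - 1) // width) * width + 1
    return f"{low}-{low + width - 1}"


def job1(documents):
    """Modified WordCount: for every word, emit the interval its count falls into."""
    counts = Counter(word for doc in documents for word in doc.split())
    return [word_interval(count) for count in counts.values()]


def job2(intervals):
    """Count how many words fell into each interval of the first job's output."""
    return dict(Counter(intervals))
```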
To execute a sequence of MapReduce jobs on Hadoop, it is enough to specify the folder that served as the output of the first job as the input of the second one, and to run them one after another.
In practice, chains of MapReduce jobs can form rather complex sequences in which the jobs are connected both sequentially and in parallel with each other. To simplify the management of such execution plans there are dedicated tools, oozie and luigi, to which a separate article in this series will be devoted.
Distributed Cache

An important mechanism in Hadoop is the Distributed Cache. It allows you to add files (for example, text files, archives, jar files) to the environment in which a MapReduce task runs.
You can add files stored on HDFS as well as local files (local to the machine from which the job is launched). I have already implicitly shown how to use the Distributed Cache with Hadoop Streaming: the mapper.py and reducer.py files were added via the -file option. In fact, you can add not only mapper.py and reducer.py but arbitrary files, and then use them as if they were in the local folder.
Using the Distributed Cache:
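For instance, a streaming mapper can open a cached file directly by name, because Hadoop places files from the Distributed Cache into the task's working directory. A sketch (the file name and its <url>\t<topic> format are illustrative, chosen to match the Map Join example later in this article):

```python
def load_topics(path="url_topics.txt"):
    """Read a URL -> topic mapping from a file shipped via the Distributed Cache.

    With Hadoop Streaming the file would be added as `-file url_topics.txt`
    and opened here by its bare name; the assumed format is <url>\t<topic>,
    one pair per line.
    """
    topics = {}
    with open(path) as f:
        for line in f:
            url, topic = line.rstrip("\n").split("\t")
            topics[url] = topic
    return topics
```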
Reduce Join

Those used to working with relational databases often rely on the very convenient Join operation, which processes the contents of several tables jointly by joining them on some key. The same need sometimes arises when working with big data as well. Consider the following example:
There are logs of two web servers, and each log looks like this:
<timestamp>\t<ip>\t<url>. A sample fragment of a log:
1446792139	184.108.40.206	/sphingosine/unhurrying.css
1446792139	220.127.116.11	/accentually.js
1446792139	18.104.22.168	/pyroacid/unkemptly.jpg
1446792139	22.214.171.124	/Chawia.js
1446792139	126.96.36.199	/morphographical/dismain.css
1446792139	188.8.131.52	/phanerite.php
1446792139	184.108.40.206	/bisonant.css
For each IP address, we need to determine which of the two servers it visited more often. The result must be presented in the form:
<ip>\t<first or second>. A sample of the result:
220.127.116.11	first
18.104.22.168	second
22.214.171.124	second
126.96.36.199	first
Unfortunately, unlike in relational databases, joining two logs on a key (in this case, the IP address) is in general a rather heavy operation. It is solved with three MapReduce jobs and the Reduce Join pattern:
Reduce Join works as follows:
- Over each of the input logs a separate Map-only MapReduce is run, transforming the input data into the following form:
key -> (type, value)
where key is the key on which the tables are to be joined, type is the table type ("first" or "second" in our case), and value is any additional data attached to the key.
- The outputs of both MapReduce jobs are fed into a third MapReduce, which actually performs the join. This MapReduce contains an empty mapper that simply copies its input. Shuffle then splits the data by keys and feeds the reducer input of the form:
key -> [(type, value)]
It is important that at this point the reducer receives records from both logs, and that the type field identifies which of the two logs a particular value came from. That is enough data to solve the original problem. In our case, for each key the reducer simply has to count which type occurred more often among its records and output that type.
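The per-record tagging and the final reduce step can be sketched in Python like this (names are illustrative; on a cluster the tagging would happen in the two preliminary Map-only jobs, and the counting in the third job's reducer):

```python
from collections import defaultdict


def tag_mapper(line, source):
    """Preliminary Map-only step: tag a record with the log it came from.

    Assumes records of the form <timestamp>\t<ip>\t<url>; source is 'first'
    or 'second'. Emits key -> type; the optional value part of the pattern
    is omitted because only the source type is needed for this task.
    """
    _, ip, _ = line.rstrip("\n").split("\t")
    return ip, source


def join_reducer(tagged_pairs):
    """For each IP, report which server ('first' or 'second') it hit more often."""
    counts = defaultdict(lambda: {"first": 0, "second": 0})
    for ip, source in tagged_pairs:
        counts[ip][source] += 1
    return {ip: max(c, key=c.get) for ip, c in counts.items()}
```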
Map Join

The Reduce Join pattern describes the general case of joining two logs on a key. There is, however, a special case in which the task can be simplified and sped up significantly: when one of the logs is much smaller than the other. Consider the following task:
There are two logs. The first is a web-server log (the same as in the previous task); the second (a file about 100 KB in size) contains a URL -> Topic mapping. A sample of the second file:
/toyota.php	auto
/football/spartak.html	sport
/cars	auto
/finances/money	business
For each IP address, we need to determine which category of pages was loaded most often from that address.
In this case we also need to join the two logs, on URL. However, here we do not have to run three MapReduce jobs, since the second log fits entirely in memory. To solve the problem with a single MapReduce, we can load the second log into the Distributed Cache and, when the mapper is initialized, simply read it into memory as a url -> topic dictionary.
The problem is then solved as follows:
# find the topic of each page of the first log
input_line -> [ip, topic]
ip -> [topics] -> [ip, most_popular_topic]
The reducer receives an IP and the list of all its topics, and simply computes which topic occurred most often. The task is thus solved with a single MapReduce, and the join itself actually happens in map (so if no additional aggregation by key were needed, a Map-only job would suffice):
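A sketch of this Map Join in Python (names are illustrative; url_topics stands for the dictionary read from the Distributed Cache when the mapper is initialized):

```python
from collections import Counter


def map_join(log_lines, url_topics):
    """Map side of the join: look each URL up in the in-memory dictionary
    and emit (ip, topic) pairs. Records whose URL is missing from the
    dictionary are skipped.
    """
    pairs = []
    for line in log_lines:
        _, ip, url = line.rstrip("\n").split("\t")
        topic = url_topics.get(url)
        if topic is not None:
            pairs.append((ip, topic))
    return pairs


def most_popular_topic(pairs):
    """Reduce: for each IP, pick the topic requested most often."""
    per_ip = {}
    for ip, topic in pairs:
        per_ip.setdefault(ip, Counter())[topic] += 1
    return {ip: counter.most_common(1)[0][0] for ip, counter in per_ip.items()}
```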
Summary

In this article we looked at several patterns and techniques for solving tasks with MapReduce, showed how to combine MapReduce jobs into chains, and how to join logs on a key.
In the following articles we will take a closer look at the architecture of Hadoop, as well as at the tools that simplify working with MapReduce and work around its shortcomings.
Links to other articles in the series
- Big Data from A to Z. Part 1: Principles of working with big data, the MapReduce paradigm
- Big Data from A to Z. Part 2: Hadoop
This article is a translation of the original post at habrahabr.ru/post/270453/