Big Data from A to Z. Part 2: Hadoop

Hi, Habr! In the previous article we covered the MapReduce paradigm of parallel computing. In this article we will move from theory to practice and look at Hadoop, a powerful toolkit for working with big data from the Apache Software Foundation.

The article describes what tools and components Hadoop includes, how to install Hadoop yourself, and gives instructions and examples for developing MapReduce programs for Hadoop.


General information about Hadoop


As is well known, the MapReduce paradigm was proposed by Google in 2004 in the paper MapReduce: Simplified Data Processing on Large Clusters. That paper described the paradigm but provided no implementation, so several programmers at Yahoo developed one as part of their work on the Nutch web crawler. The history of Hadoop is covered in more detail in the article The history of Hadoop: From 4 nodes to the future of data.

Initially Hadoop was primarily a tool for storing data and running MapReduce jobs; today it is a large stack of technologies related in one way or another to processing big data (and not only with MapReduce).

The main (core) components of Hadoop are:

  • Hadoop Distributed File System (HDFS) – a distributed file system that can store data of practically unlimited volume.

  • Hadoop YARN – a framework for cluster resource management and job scheduling, which includes the MapReduce framework.

  • Hadoop Common – a set of shared utilities and libraries used by the other components.

There is also a large number of projects directly related to Hadoop but not part of the Hadoop core:

  • Hive – a tool for SQL-like queries over big data (it turns SQL queries into a series of MapReduce jobs);

  • Pig – a high-level language for data analysis. A single line of code in this language can turn into a sequence of MapReduce jobs;

  • HBase – a column-oriented database implementing the BigTable paradigm;

  • Cassandra – a high-performance distributed key-value database;

  • ZooKeeper – a service for distributed storage of configuration and for synchronizing configuration changes;

  • Mahout – a library and engine for machine learning on big data.

The Apache Spark project deserves a separate mention: it is an engine for distributed data processing. Apache Spark typically relies on Hadoop components such as HDFS and YARN for its work, while in recent years it has itself become more popular than Hadoop:

[Chart: popularity of Apache Spark compared to Hadoop]
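
To give a sense of what working with Spark looks like, here is a minimal PySpark word count sketch. It is an illustration only, not part of the original walkthrough: the input and output paths are hypothetical, and Spark is assumed to be available on the cluster.

# wordcount_spark.py – an illustrative PySpark sketch (not from the original article);
# assumes Spark is installed and the input directory exists on HDFS
from pyspark import SparkContext

sc = SparkContext(appName="WordCount")

counts = (sc.textFile("hdfs:///user/cloudera/lenta_articles")   # hypothetical input path
            .flatMap(lambda line: line.split())                 # split lines into words
            .map(lambda word: (word.lower(), 1))                # emit (word, 1) pairs
            .reduceByKey(lambda a, b: a + b))                   # sum the counts per word

counts.saveAsTextFile("hdfs:///user/cloudera/lenta_wordcount_spark")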

Some of the components listed above will be covered in separate articles of this series; for now, let's look at how to start working with Hadoop and put it into practice.

Installing Hadoop on a cluster with Cloudera Manager


Installing Hadoop used to be a rather painful job: every machine in the cluster had to be configured separately, you had to make sure nothing was forgotten, and monitoring had to be set up carefully. As Hadoop grew in popularity, companies appeared (such as Cloudera, Hortonworks, and MapR) that offer their own Hadoop distributions and powerful tools for managing a Hadoop cluster. In this series we will use the Hadoop distribution from Cloudera.

To install Hadoop on your cluster, only a few simple steps are needed:

  1. Download Cloudera Manager Express onto one of the machines of your cluster from here;
  2. Make it executable and run it;
  3. Follow the installation instructions.

The cluster must run one of the supported Linux-family operating systems: RHEL, Oracle Enterprise Linux, SLES, Debian, or Ubuntu.

After installation you get a cluster management console where you can see the installed services, add or remove services, monitor the cluster status, and edit the cluster configuration:

[Screenshot: the Cloudera Manager cluster management console]

The process of installing Hadoop on a cluster with Cloudera Manager is described in more detail at the link in the Quick Start section.

If you only plan to use Hadoop to "try it out", you don't need to bother buying expensive hardware and setting Hadoop up on it: simply download the preconfigured virtual machine at the link and use the Hadoop that is already configured there.

Running MapReduce programs on Hadoop


Now let's show how to run a MapReduce job on Hadoop. As the task we will use the classic WordCount example discussed in the previous article of the series. To experiment on real data, I have prepared an archive of random news articles from lenta.ru. The archive can be downloaded at the link.

Let me restate the task: we have a set of documents. For each word occurring in the set of documents, count how many times that word occurs in the set.

Solution:
Map splits the document into words and returns a set of (word, 1) pairs.
Reduce sums the occurrences of each word:
def map(doc):
    for word in doc.split():
        yield word, 1

def reduce(word, values):
    yield word, sum(values)
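
To make the pseudocode concrete, here is a small illustrative run in plain Python. The sample documents are made up for this sketch, and the grouping of values by key stands in for Hadoop's shuffle phase:

# an illustrative local run of the word count logic (sample data is made up)
from collections import defaultdict

def do_map(doc):
    for word in doc.split():
        yield word, 1

def do_reduce(word, values):
    yield word, sum(values)

docs = ["hadoop stores big data", "hadoop processes big data"]

# the "shuffle" step: gather all values emitted for the same key
groups = defaultdict(list)
for doc in docs:
    for word, count in do_map(doc):
        groups[word].append(count)

for word, values in groups.items():
    for result in do_reduce(word, values):
        print(result)   # ('hadoop', 2), ('stores', 1), ('big', 2), ...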


Now the task is to implement this solution in the form of code that can be executed on Hadoop, and to run it.

Method 1: Hadoop Streaming


The easiest way to run a MapReduce program on Hadoop is to use the Hadoop streaming interface. The streaming interface assumes that map and reduce are implemented as programs that read data from stdin and write results to stdout.

The program that executes the map function is called the mapper. The program that executes reduce is called, accordingly, the reducer.

By default, the streaming interface assumes that one input line of the mapper or reducer corresponds to one input record for map.

The output of the mapper reaches the input of the reducer as (key, value) pairs, and all pairs with the same key:

  • are guaranteed to be processed by a single invocation of the reducer;
  • are passed to its input consecutively (that is, if one reducer processes several different keys, its input is grouped by key).

So, let's implement the mapper and reducer in Python:

#mapper.py
import sys

def do_map(doc):
    for word in doc.split():
        yield word.lower(), 1

for line in sys.stdin:
    for key, value in do_map(line):
        print(key + "\t" + str(value))

#reducer.py
import sys

def do_reduce(word, values):
    return word, sum(values)

prev_key = None
values = []

# the input arrives sorted by key, so all values for one key come in consecutively
for line in sys.stdin:
    key, value = line.split("\t")
    if key != prev_key and prev_key is not None:
        # the key has changed: emit the result for the previous key
        result_key, result_value = do_reduce(prev_key, values)
        print(result_key + "\t" + str(result_value))
        values = []
    prev_key = key
    values.append(int(value))

# emit the result for the last key
if prev_key is not None:
    result_key, result_value = do_reduce(prev_key, values)
    print(result_key + "\t" + str(result_value))
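
Before submitting the job to the cluster, it can be useful to sanity-check the two scripts locally. The sketch below is a rough simulation of the streaming pipeline (mapper, then a sort by key standing in for the shuffle, then reducer); the sample input is made up for this illustration:

# local_test.py – a rough local simulation of "mapper | sort | reducer"
# (an illustrative sketch, not part of the original article)
import subprocess

sample_input = "hadoop stores data\nhadoop runs mapreduce\n"

# run the mapper on the sample input
mapped = subprocess.run(["python", "mapper.py"], input=sample_input,
                        capture_output=True, text=True).stdout

# emulate the shuffle: sort the tab-separated lines so equal keys become adjacent
shuffled = "".join(sorted(mapped.splitlines(keepends=True)))

# run the reducer on the sorted pairs
reduced = subprocess.run(["python", "reducer.py"], input=shuffled,
                         capture_output=True, text=True).stdout
print(reduced)   # expected: data 1, hadoop 2, mapreduce 1, runs 1, stores 1 (tab-separated)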

The data that Hadoop will process must be stored on HDFS. Let's upload our articles and put them on HDFS. For this we use the hadoop fs command:

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz 
tar xzvf lenta_articles.tar.gz 
hadoop fs -put lenta_articles

The hadoop fs utility supports a large number of operations for manipulating the file system, many of which exactly mirror the standard Linux utilities. Its capabilities are described in more detail at the link.

Now let's launch the streaming job:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\ 
 -input lenta_articles\ 
 -output lenta_wordcount\ 
 -file mapper.py\ 
 -file reducer.py\ 
 -mapper "python mapper.py"\ 
 -reducer "python reducer.py"

The yarn utility is used to launch and manage various applications (including MapReduce-based ones) on the cluster. hadoop-streaming.jar is just one example of such a YARN application.

Next come the launch parameters:

  • input – the directory with the source data on HDFS;
  • output – the directory on HDFS where the result should be placed;
  • file – files that are needed while the MapReduce job is running;
  • mapper – the console command that will be used for the map stage;
  • reducer – the console command that will be used for the reduce stage.

After launch, the console shows the progress of the job and a URL for viewing more detailed information about it.

[Screenshot: job progress and tracking URL in the console]

In the interface available at that URL you can see a more detailed status of the job and view the logs of each mapper and reducer (which is very useful for debugging failed jobs).

[Screenshot: the web interface with detailed job status and mapper/reducer logs]

After successful execution, the result of the job is placed on HDFS in the directory we specified in the output field. You can view its contents with the command "hadoop fs -ls lenta_wordcount".

The result itself can be obtained as follows:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
с	41
что	43
на	82
и	111
в	194

The "hadoop fs-text" team gives out contents of the folder in text form. I have sorted result on number of occurrences of words. As expected, the most frequent words in language – pretexts.

Method 2: The native Java API


Hadoop itself is written in Java, and Hadoop's native interface is Java-based as well. Let's see what a native Java application for WordCount looks like:

import java.io.IOException; 
import java.util.StringTokenizer; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
 
public class WordCount { 
 
    public static class TokenizerMapper 
            extends Mapper<Object, Text, Text, IntWritable>{ 
 
        private final static IntWritable one = new IntWritable(1); 
        private Text word = new Text(); 
 
        public void map(Object key, Text value, Context context 
        ) throws IOException, InterruptedException { 
            StringTokenizer itr = new StringTokenizer(value.toString()); 
            while (itr.hasMoreTokens()) { 
                word.set(itr.nextToken()); 
                context.write(word, one); 
            } 
        } 
    } 
 
    public static class IntSumReducer 
            extends Reducer<Text,IntWritable,Text,IntWritable> { 
        private IntWritable result = new IntWritable(); 
 
        public void reduce(Text key, Iterable<IntWritable> values, 
                           Context context 
        ) throws IOException, InterruptedException { 
            int sum = 0; 
            for (IntWritable val : values) { 
                sum += val.get(); 
            } 
            result.set(sum); 
            context.write(key, result); 
        } 
    } 
 
    public static void main(String[] args) throws Exception { 
        Configuration conf = new Configuration(); 
        Job job = Job.getInstance(conf, "word count"); 
        job.setJarByClass(WordCount.class); 
        job.setMapperClass(TokenizerMapper.class); 
        job.setReducerClass(IntSumReducer.class); 
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(IntWritable.class); 
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles")); 
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount")); 
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
}

This class does exactly the same thing as our Python example. We create the classes TokenizerMapper and IntSumReducer, inheriting from Mapper and Reducer respectively. The classes passed as generic type parameters specify the types of the input and output values. The native API assumes that the map function receives a key-value pair as input. Since in our case the key is empty, we simply define Object as the key type.

In the main method we create the MapReduce job and set its parameters: the name, the mapper and reducer, and the HDFS paths to the input data and to where the result should be placed.

For compilation we will need the Hadoop libraries. I use Maven for the build, and Cloudera has a repository for it. Instructions for setting it up can be found at the link. As a result, my pom.xml file (which Maven uses to describe the project build) came out as follows:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" 
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
 
    <repositories> 
        <repository> 
            <id>cloudera</id> 
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> 
        </repository> 
    </repositories> 
 
    <dependencies> 
        <dependency> 
            <groupId>org.apache.hadoop</groupId> 
            <artifactId>hadoop-common</artifactId> 
            <version>2.6.0-cdh5.4.2</version> 
        </dependency> 
 
        <dependency> 
            <groupId>org.apache.hadoop</groupId> 
            <artifactId>hadoop-auth</artifactId> 
            <version>2.6.0-cdh5.4.2</version> 
        </dependency> 
 
        <dependency> 
            <groupId>org.apache.hadoop</groupId> 
            <artifactId>hadoop-hdfs</artifactId> 
            <version>2.6.0-cdh5.4.2</version> 
        </dependency> 
 
        <dependency> 
            <groupId>org.apache.hadoop</groupId> 
            <artifactId>hadoop-mapreduce-client-app</artifactId> 
            <version>2.6.0-cdh5.4.2</version> 
        </dependency> 
 
    </dependencies> 
 
    <groupId>org.dca.examples</groupId> 
    <artifactId>wordcount</artifactId> 
    <version>1.0-SNAPSHOT</version> 

</project>

Let's build the project into a jar package:

mvn clean package

After the project is built into a jar file, it is launched in the same way as with the streaming interface:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount

We wait for it to finish and check the result:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
с	41
что	43
на	82
и	111
в	194

As you might easily guess, the result of running our native application matches the result of the streaming application we launched earlier.

Summary


In this article we looked at Hadoop, a software stack for working with big data, described the installation process using the Cloudera distribution as an example, and showed how to write MapReduce programs using both the streaming interface and Hadoop's native API.

In the following articles of the series we will look in more detail at the architecture of individual Hadoop and Hadoop-related components, show more complex variants of MapReduce programs, discuss ways to simplify working with MapReduce, as well as the limitations of MapReduce and how to work around them.

Thanks for your attention; we are ready to answer your questions.

Links to other articles of the series:


Part 1: Principles of working with big data, the MapReduce paradigm

This article is a translation of the original post at habrahabr.ru/post/268277/