MapReduce and YARN: this is how it works
Published on: Author: Gerard Simons Category: IT development and operations
Meet MapReduce: a programming model originally devised by Google for processing large datasets in parallel on a cluster of computers. One of its implementations is Apache Hadoop, whose current generation runs on YARN. Let’s discover how it works. I’ll show you a few of its use cases and give a small example application.
The name ‘MapReduce’ refers to the two phases of computation that a user defines. The map phase transforms the input data into intermediate values. The reduce phase then combines these values into the final desired result. An example is counting words in a collection of texts: the map function counts the words in each text separately, and the reduce function adds up the counts from all texts to get a single total per word. This allows us to split the data, run the map phase in parallel on each part independently, and have the reduce phase aggregate the individual results.
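To make the two phases concrete, here is a minimal sketch in plain Java, without Hadoop. The class name MapReduceSketch and its methods are made up for this illustration, and parallelStream() merely stands in for the cluster that would normally run the map phase on many machines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical plain-Java sketch of the two MapReduce phases (no Hadoop involved).
final class MapReduceSketch {

    // Map phase: turn one text into (word, 1) pairs, independently of all other texts.
    static List<Map.Entry<String, Integer>> map(String text) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : text.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(Map.entry(word, 1));
            }
        }
        return pairs;
    }

    // Reduce phase: add up all counts that share the same word.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            totals.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return totals;
    }

    static Map<String, Integer> wordCount(List<String> texts) {
        // Assumption: a parallel stream plays the role of the cluster here.
        List<Map.Entry<String, Integer>> pairs = texts.parallelStream()
                .flatMap(text -> map(text).stream())
                .collect(Collectors.toList());
        return reduce(pairs);
    }

    public static void main(String[] args) {
        // prints {cat=1, dog=1, down=1, sat=2, the=2}
        System.out.println(wordCount(List.of("the cat sat", "the dog sat down")));
    }
}
```

Because map only looks at a single text, the texts can be processed in any order and on any machine; only reduce needs to see the pairs for a given word together.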
YARN: de facto solution
Various implementations exist, such as Apache Hadoop. Its second generation, built on YARN, is now the de facto solution and is said to offer better performance than the original MapReduce v1 implementation.
A related project, Apache Spark, has branched away from the MapReduce programming model completely.
A very wide range of use cases
Because MapReduce is a model for massively parallel computation, the range of possible use cases is very wide: it covers basically any problem that involves large quantities of data. The use cases are similar to those of Apache Spark.
Text mining
MapReduce is often used for text mining. Parsing and indexing text files, creating word frequency histograms, and searching are some of the things that can easily be parallelized in MapReduce.
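Indexing is a good example of how such a task splits into the two phases. The sketch below builds a small inverted index (word to documents) in plain Java. The class InvertedIndexSketch, its method names, and the file names are invented for this illustration; a real job would run the same logic as a Hadoop Mapper and Reducer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical plain-Java sketch of indexing text in MapReduce style.
final class InvertedIndexSketch {

    // Map phase: emit a (word, documentName) pair for every word in one document.
    static List<Map.Entry<String, String>> map(String documentName, String text) {
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        for (String word : text.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                pairs.add(Map.entry(word, documentName));
            }
        }
        return pairs;
    }

    // Reduce phase: collect, per word, the set of documents it occurs in.
    static Map<String, SortedSet<String>> reduce(List<Map.Entry<String, String>> pairs) {
        Map<String, SortedSet<String>> index = new TreeMap<>();
        for (Map.Entry<String, String> pair : pairs) {
            index.computeIfAbsent(pair.getKey(), k -> new TreeSet<>()).add(pair.getValue());
        }
        return index;
    }

    static Map<String, SortedSet<String>> buildIndex(Map<String, String> documents) {
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        documents.forEach((name, text) -> pairs.addAll(map(name, text)));
        return reduce(pairs);
    }

    public static void main(String[] args) {
        Map<String, String> documents = Map.of(
                "a.txt", "Hadoop runs MapReduce jobs",
                "b.txt", "Spark and Hadoop");
        // prints [a.txt, b.txt]
        System.out.println(buildIndex(documents).get("hadoop"));
    }
}
```

Searching then becomes a lookup in the index instead of a scan over every file.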
Web Crawling
Google initially used MapReduce to crawl the web in parallel. Although Google has since moved on to other techniques, Nutch is a web crawler that still uses MapReduce, for example to find new (unvisited) pages and to deduplicate web pages.
Document Databases (NoSQL)
MapReduce is used as a query mechanism, in place of SQL, for NoSQL databases (such as CouchDB) that store semi-structured JSON documents as key-value pairs.
The non-relational nature of these documents makes SQL a tough fit, and so MapReduce is usually used to query the data.
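As a rough illustration of what such a query looks like, the sketch below sums an amount per customer over a handful of documents. It is not CouchDB's API (CouchDB views are normally written in JavaScript); the class DocumentViewSketch and the field names customer and amount are assumptions made up for this example, and the documents are modelled as plain Java maps.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch of a map/reduce query over semi-structured documents.
final class DocumentViewSketch {

    // Map function: inspect one document and, if it has the fields we need,
    // emit a (customer, amount) pair. Documents without them are skipped,
    // which is how the lack of a fixed schema is tolerated.
    static Optional<Map.Entry<String, Double>> map(Map<String, Object> doc) {
        Object customer = doc.get("customer");
        Object amount = doc.get("amount");
        if (customer instanceof String && amount instanceof Number) {
            return Optional.of(Map.entry((String) customer, ((Number) amount).doubleValue()));
        }
        return Optional.empty();
    }

    // Reduce function: sum the emitted amounts per customer.
    static Map<String, Double> query(List<Map<String, Object>> docs) {
        Map<String, Double> totals = new TreeMap<>();
        for (Map<String, Object> doc : docs) {
            map(doc).ifPresent(e -> totals.merge(e.getKey(), e.getValue(), Double::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> docs = List.of(
                Map.of("customer", "alice", "amount", 10.0),
                Map.of("customer", "bob", "amount", 5),
                Map.of("customer", "alice", "amount", 2.5),
                Map.of("type", "note", "text", "no amount here"));
        // prints {alice=12.5, bob=5.0}
        System.out.println(query(docs));
    }
}
```

In SQL terms this is roughly a SUM with GROUP BY, except that the map function decides per document whether it takes part at all.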
Hello World
I used this tutorial to get Hadoop up and running on Ubuntu (in single-node configuration). You should end up with a working Hadoop installation with a single namenode.
MapReduce v1
The usual ‘Hello World’ application in Hadoop is WordCount. We run this example in single-node mode. To get a realistic idea of the performance and of the installation process, a pseudo-distributed configuration (using virtual machines) or a fully distributed one is advised.
Roughly, the steps are: create a Java file, compile it using Hadoop (bin/hadoop), and wrap it in a JAR. Then start DFS, copy the input into it using the put command, and run the job. Find the full steps here.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Map phase: emit (word, 1) for every token in a line of input.
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Reduce phase: sum all counts emitted for the same word.
  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // The reducer doubles as a combiner to pre-sum counts on the map side.
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // args[0] is the input path, args[1] the output path.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
YARN: a faster alternative?
As explained, YARN is meant to replace the MapReduce v1 runtime as a faster alternative. Unfortunately, it was more difficult to find a good worked example for YARN than for MapReduce v1, so I did not attempt to run one. I did find the example code below, which is supposed to do the same WordCount as before, but on YARN.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

// Note: the declarations marked "restored" were missing from the code as found
// and have been filled in to match the MapReduce v1 listing above.
public class WordCount {

  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1); // restored
    private Text word = new Text();                            // restored

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString()); // restored
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable(); // restored

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();                                     // restored
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); // restored
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    // Restored; Job.getInstance(conf, "word count") is the non-deprecated form.
    @SuppressWarnings("deprecation")
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    // restored
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  // restored
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}