Step 1 - Creating a file
J$ cat > file.txt
(type the lines below, then press Ctrl+D to save the file)
hi how are you
how is your job
how is your family
how is your brother
how is your sister
what is the time now
what is the strength of hadoop
Step 2 - Loading file.txt from the local file system into HDFS
J$ hadoop fs -put file.txt file
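To confirm the upload, you can list the file and print it back out of HDFS:
J$ hadoop fs -ls file
J$ hadoop fs -cat file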
Step 3 - Writing the programs
- DriverCode (WordCount.java)
- MapperCode (WordMapper.java)
- ReducerCode (WordReducer.java)
Step 4 - Compiling the .java files above
J$ javac -classpath $HADOOP_HOME/hadoop-core.jar *.java
(the jar name depends on your Hadoop release; some versions ship a versioned jar such as hadoop-core-1.2.1.jar instead of hadoop-core.jar)
Step 5 - Creating the jar file
J$ jar cvf job.jar *.class
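To double-check that all three classes made it into the jar, list its contents:
J$ jar tvf job.jar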
Step 6 - Running job.jar on the file (which is now in HDFS)
J$ hadoop jar job.jar DriverCode file TestOutput
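When the job finishes, the counts are written to the TestOutput directory in HDFS; with a single reducer the results typically land in part-00000:
J$ hadoop fs -cat TestOutput/part-00000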
Let's start with the actual code for the steps above.
Hello World Job -> WordCountJob
1. DriverCode (WordCount.java)
package com.doj.hadoop.driver;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author Dinesh Rajput
 */
public class WordCount extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("Word Count");
        // Input and output paths come from the command line (steps 2 and 6 above)
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setMapperClass(WordMapper.class);
        // The reducer also serves as a combiner to pre-aggregate map output
        conf.setCombinerClass(WordReducer.class);
        conf.setReducerClass(WordReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        // The old mapred API submits jobs through JobClient
        // (JobConf has no waitForCompletion method)
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCount(), args);
        System.exit(exitCode);
    }
}
2. MapperCode (WordMapper.java)
package com.doj.hadoop.driver;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * @author Dinesh Rajput
 */
public class WordMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        // Split the line on whitespace and emit (word, 1) for every token
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}
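To see what the mapper emits without starting a cluster, here is a minimal smoke-test sketch. WordMapperTest and its stub OutputCollector are our own additions, not part of the original walkthrough; Reporter.NULL is Hadoop's built-in no-op reporter. It pushes the first line of file.txt through map() and prints each emitted pair:

package com.doj.hadoop.driver;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordMapperTest {
    public static void main(String[] args) throws IOException {
        // Stub collector that prints whatever the mapper emits
        OutputCollector<Text, IntWritable> printer = new OutputCollector<Text, IntWritable>() {
            public void collect(Text key, IntWritable value) {
                System.out.println(key + "\t" + value);
            }
        };
        // Feed the first line of file.txt through map(); Reporter.NULL is a no-op
        new WordMapper().map(new LongWritable(0), new Text("hi how are you"),
                printer, Reporter.NULL);
        // Prints: hi 1, how 1, are 1, you 1 (tab-separated, one pair per line)
    }
}

You can compile this alongside the other classes with the same javac command from step 4.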
3. ReducerCode (WordReducer.java)
package com.doj.hadoop.driver;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * @author Dinesh Rajput
 */
public class WordReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // Sum all the 1s emitted by the mappers (and combiner) for this word
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}
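A matching smoke-test sketch for the reducer (again, WordReducerTest and the stub collector are our own additions). Since "how" occurs in five lines of file.txt, we hand reduce() five 1s, as the mappers would emit them:

package com.doj.hadoop.driver;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordReducerTest {
    public static void main(String[] args) throws IOException {
        // Stub collector that prints whatever the reducer emits
        OutputCollector<Text, IntWritable> printer = new OutputCollector<Text, IntWritable>() {
            public void collect(Text key, IntWritable value) {
                System.out.println(key + "\t" + value);
            }
        };
        // "how" appears in five lines of file.txt, so simulate five map emissions
        new WordReducer().reduce(new Text("how"),
                Arrays.asList(new IntWritable(1), new IntWritable(1), new IntWritable(1),
                        new IntWritable(1), new IntWritable(1)).iterator(),
                printer, Reporter.NULL);
        // Prints: how 5
    }
}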