Friday, October 10, 2014

[JAVA] Big Data(4) Hadoop Multiple Input

Multiple Input

In MapReduce, when different data sources have to be fed to different mappers and the results then combined in a single reducer, you need the MultipleInputs feature. As the figure below shows, three different data sources each first pass through their own mapper, and everything finally flows into the same reducer.

[Figure: three input sources → three mappers → one reducer]

First, import the class you need:
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

Then, in the main program (main), write a line like the one below for each input, telling Hadoop which dataset should be sent to the map function of which Mapper class:

MultipleInputs.addInputPath(job, input path, input format class, Mapper class);
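For example, the first of the three calls in the job we build below looks like this:

MultipleInputs.addInputPath(exampleJob, inPath1, TextInputFormat.class, Map1.class);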

Next, let's look at an almost brain-dead simple example.
Suppose we have three datasets on hand, all containing student IDs, subjects, and scores, but each in a different format. We now want to compute the average score for each subject.

First dataset
$HADOOP/hadoop fs -cat input_1 | head -3
WUJ-360100;math;56
WPY-802007;math;98
FKT-670008;science;67
Second dataset
$HADOOP/hadoop fs -cat input_2 | head -3
{Number:FJB-004150, Subject:math, Score:96}
{Number:QDG-300700, Subject:chinese, Score:90}
{Number:JVY-030140, Subject:chinese, Score:71}
Third dataset
$HADOOP/hadoop fs -cat input_3 | head -3
[Number=>ITM-501806; Subject=>science; Score=>82]
[Number=>QBE-003981; Subject=>math; Score=>85]
[Number=>EUJ-017009; Subject=>chinese; Score=>63]
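Before writing the mappers, it may help to sanity-check the field-splitting logic in plain Java. The following standalone sketch (my addition, not part of the original job) parses one sample line of each format with the same split calls the mappers below rely on:

public class ParseCheck
{
  public static void main(String[] args)
  {
        // format 1: plain semicolon-separated fields
        String l1 = "WUJ-360100;math;56";
        System.out.println(l1.split(";")[1] + " " + l1.split(";")[2]);    // math 56

        // format 2: strip the closing brace, then split on "," and ":"
        String l2 = "{Number:FJB-004150, Subject:math, Score:96}".replaceAll("}", "");
        System.out.println(l2.split(",")[1].split(":")[1] + " "
                         + l2.split(",")[2].split(":")[1]);               // math 96

        // format 3: strip the square brackets, then split on ";" and "=>"
        String l3 = "[Number=>ITM-501806; Subject=>science; Score=>82]"
                        .replaceAll("[\\[\\]]", "");
        System.out.println(l3.split(";")[1].split("=>")[1] + " "
                         + l3.split(";")[2].split("=>")[1]);              // science 82
  }
}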
The three formats above are each handed to a different Mapper, every Mapper emits (subject, score) pairs, and all the pairs are then sent into a single Reducer that computes the averages. So we need three Mappers:
public static class Map1 extends Mapper<LongWritable, Text, Text, IntWritable>
{
      @Override
      public void map(LongWritable key, Text value, Context con)
        throws IOException, InterruptedException
      {
              // field 1 is the subject (field 0 is the student number)
              String subject = value.toString().split(";")[1];

              // field 2 is the score
              int score = Integer.parseInt(value.toString().split(";")[2]);
              con.write(new Text(subject), new IntWritable(score));
      }
}
public static class Map2 extends Mapper<LongWritable, Text, Text, IntWritable>
{
      @Override
      public void map(LongWritable key, Text value, Context con)
        throws IOException, InterruptedException
      {
              // strip the closing brace from the input value
              String line = value.toString().replaceAll("}", "");

              if(line.contains(",")){
                      // the second comma-separated field is "Subject:<subject>"
                      String subject = line.split(",")[1].split(":")[1];

                      // the third field is "Score:<score>"
                      int score = Integer.parseInt(line.split(",")[2].split(":")[1]);
                      con.write(new Text(subject), new IntWritable(score));
              }
      }
}
public static class Map3 extends Mapper<LongWritable, Text, Text, IntWritable>
{
      @Override
      public void map(LongWritable key, Text value, Context con)
        throws IOException, InterruptedException
      {
              // strip the square brackets from the input value
              String line = value.toString().replaceAll("[\\[\\]]", "");

              if(line.contains(";")){
                // the second semicolon-separated field is "Subject=><subject>"
                String subject = line.split(";")[1].split("=>")[1];

                // the third field is "Score=><score>"
                int score = Integer.parseInt(line.split(";")[2].split("=>")[1]);
                con.write(new Text(subject), new IntWritable(score));
              }
      }
}
Only one Reducer is needed:
public static class Red extends Reducer<Text, IntWritable, Text, IntWritable>
{
     @Override
     public void reduce(Text subject, Iterable<IntWritable> scores, Context con)
      throws IOException, InterruptedException
      {
              int numerator = 0;
              int denominator = 0;
              for (IntWritable v : scores){
                  numerator += v.get();
                  denominator++;
              }
              // note: integer division, so the average is truncated
              int avg = numerator/denominator;
              con.write(subject, new IntWritable(avg));
      }
}
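Because numerator/denominator is integer division, the averages are truncated to whole numbers. If fractional averages are wanted, a variant reducer could emit a DoubleWritable instead (a sketch of mine, not part of the original post; RedDouble is a made-up name). It needs import org.apache.hadoop.io.DoubleWritable; and, in main, setMapOutputValueClass(IntWritable.class) plus setOutputValueClass(DoubleWritable.class):

public static class RedDouble extends Reducer<Text, IntWritable, Text, DoubleWritable>
{
     @Override
     public void reduce(Text subject, Iterable<IntWritable> scores, Context con)
      throws IOException, InterruptedException
      {
              double sum = 0;
              int count = 0;
              for (IntWritable v : scores){
                  sum += v.get();
                  count++;
              }
              // hypothetical variant: emit the exact average as a double
              con.write(subject, new DoubleWritable(sum / count));
      }
}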
Then comes the slightly more troublesome main program:
public static void main(String[] args) throws Exception
{
      Configuration conf = new Configuration();
      String[] files = new GenericOptionsParser(conf, args).getRemainingArgs();
      Path inPath1 = new Path(files[0]);
      Path inPath2 = new Path(files[1]);
      Path inPath3 = new Path(files[2]);
      Path outPath = new Path(files[3]);

      // delete the output directory if it already exists
      FileSystem hdfs = outPath.getFileSystem(conf);
      if (hdfs.exists(outPath)){
        hdfs.delete(outPath, true);
      }

      Job exampleJob = Job.getInstance(conf, "example");
      exampleJob.setJarByClass(MpInputExp.class);
      exampleJob.setReducerClass(Red.class);
      exampleJob.setOutputKeyClass(Text.class);
      exampleJob.setOutputValueClass(IntWritable.class);

      // no setMapperClass here: MultipleInputs assigns a mapper to each input path
      MultipleInputs.addInputPath(exampleJob, inPath1, TextInputFormat.class, Map1.class);
      MultipleInputs.addInputPath(exampleJob, inPath2, TextInputFormat.class, Map2.class);
      MultipleInputs.addInputPath(exampleJob, inPath3, TextInputFormat.class, Map3.class);

      FileOutputFormat.setOutputPath(exampleJob, outPath);
      System.exit(exampleJob.waitForCompletion(true) ? 0 : 1);
}
Be careful that each MultipleInputs.addInputPath call pairs the right input with the right Mapper.
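Assuming the job has been packed into a jar called MpInputExp.jar (the jar name is just an assumption here), it can be launched with the three input paths and the output path as arguments:

$HADOOP/hadoop jar MpInputExp.jar MpInputExp input_1 input_2 input_3 output_exp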

Finally, let's look at the result (the jar-packaging steps are omitted):
$HADOOP/hadoop fs -getmerge output_exp output_exp
cat output_exp

science 68
chinese 70
math    68
Here is the complete Java code to wrap up this whole annoying business:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MpInputExp
{
  public static class Map1 extends Mapper<LongWritable, Text, Text, IntWritable>
  {
        @Override
        public void map(LongWritable key, Text value, Context con)
          throws IOException, InterruptedException
        {
                // field 1 is the subject (field 0 is the student number)
                String subject = value.toString().split(";")[1];

                // field 2 is the score
                int score = Integer.parseInt(value.toString().split(";")[2]);
                con.write(new Text(subject), new IntWritable(score));
        }
  }
  public static class Map2 extends Mapper<LongWritable, Text, Text, IntWritable>
  {
        @Override
        public void map(LongWritable key, Text value, Context con)
          throws IOException, InterruptedException
        {
                // strip the closing brace from the input value
                String line = value.toString().replaceAll("}", "");

                if(line.contains(",")){
                        // the second comma-separated field is "Subject:<subject>"
                        String subject = line.split(",")[1].split(":")[1];

                        // the third field is "Score:<score>"
                        int score = Integer.parseInt(line.split(",")[2].split(":")[1]);
                        con.write(new Text(subject), new IntWritable(score));
                }
        }
  }
  public static class Map3 extends Mapper<LongWritable, Text, Text, IntWritable>
  {
        @Override
        public void map(LongWritable key, Text value, Context con)
          throws IOException, InterruptedException
        {
                // strip the square brackets from the input value
                String line = value.toString().replaceAll("[\\[\\]]", "");

                if(line.contains(";")){
                  // the second semicolon-separated field is "Subject=><subject>"
                  String subject = line.split(";")[1].split("=>")[1];

                  // the third field is "Score=><score>"
                  int score = Integer.parseInt(line.split(";")[2].split("=>")[1]);
                  con.write(new Text(subject), new IntWritable(score));
                }
        }
  }
  public static class Red extends Reducer<Text, IntWritable, Text, IntWritable>
  {
       @Override
       public void reduce(Text subject, Iterable<IntWritable> scores, Context con)
        throws IOException, InterruptedException
        {
                int numerator = 0;
                int denominator = 0;
                for (IntWritable v : scores){
                    numerator += v.get();
                    denominator++;
                }
                // note: integer division, so the average is truncated
                int avg = numerator/denominator;
                con.write(subject, new IntWritable(avg));
        }
   }
  public static void main(String[] args) throws Exception
  {
        Configuration conf = new Configuration();
        String[] files = new GenericOptionsParser(conf, args).getRemainingArgs();
        Path inPath1 = new Path(files[0]);
        Path inPath2 = new Path(files[1]);
        Path inPath3 = new Path(files[2]);
        Path outPath = new Path(files[3]);

        // delete the output directory if it already exists
        FileSystem hdfs = outPath.getFileSystem(conf);
        if (hdfs.exists(outPath)){
          hdfs.delete(outPath, true);
        }

        Job exampleJob = Job.getInstance(conf, "example");
        exampleJob.setJarByClass(MpInputExp.class);
        exampleJob.setReducerClass(Red.class);
        exampleJob.setOutputKeyClass(Text.class);
        exampleJob.setOutputValueClass(IntWritable.class);

        // no setMapperClass here: MultipleInputs assigns a mapper to each input path
        MultipleInputs.addInputPath(exampleJob, inPath1, TextInputFormat.class, Map1.class);
        MultipleInputs.addInputPath(exampleJob, inPath2, TextInputFormat.class, Map2.class);
        MultipleInputs.addInputPath(exampleJob, inPath3, TextInputFormat.class, Map3.class);

        FileOutputFormat.setOutputPath(exampleJob, outPath);
        System.exit(exampleJob.waitForCompletion(true) ? 0 : 1);
  }
}
