首页 > [转]自定义hadoop map/reduce输入文件切割InputFormat

[转]自定义hadoop map/reduce输入文件切割InputFormat

本文转载自:http://hi.baidu.com/lzpsky/blog/item/99d58738b08a68e7b311c70d.html  

  

  hadoop会对原始输入文件进行文件切割,然后把每个split传入mapper程序中进行处理,FileInputFormat是所有以文件作 为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方 法。至于获得记录的方法是有不同的子类进行实现的。

        那么,FileInputFormat是怎样将他们划分成splits的呢?FileInputFormat只划分比HDFS block大的文件,所以如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。 

       hadoop默认的InputFormat是TextInputFormat,重写了FileInputFormat中的createRecordReader和isSplitable方法。该类使用的reader是LineRecordReader,即以回车键(CR = 13)或换行符(LF = 10)为行分隔符。

      但大多数情况下,回车键或换行符作为输入文件的行分隔符并不能满足我们的需求,通常用户很有可能会输入回车键、换行符,所以通常我们会定义不可见字符(即用户无法输入的字符)为行分隔符,这种情况下,就需要新写一个InputFormat。

      又或者,一条记录的分隔符不是字符,而是字符串,这种情况相对麻烦;还有一种情况,输入文件的主键key已经是排好序的了,需要hadoop做的只是把相 同的key作为一个数据块进行逻辑处理,这种情况更麻烦,相当于免去了mapper的过程,直接进去reduce,那么InputFormat的逻辑就相 对较为复杂了,但并不是不能实现。

1、改变一条记录的分隔符,不用默认的回车或换行符作为记录分隔符,甚至可以采用字符串作为记录分隔符

     1)自定义一个InputFormat,继承FileInputFormat,重写createRecordReader方法,如果不需要分片或者需要改变分片的方式,则重写isSplitable方法,具体代码如下:

public class FileInputFormatB extends FileInputFormat {

   @Override

   public RecordReader createRecordReader( InputSplit split, TaskAttemptContext context) {

        return new SearchRecordReader("");

    }

    @Override

    protected boolean isSplitable(FileSystem fs, Path filename) {

         // 输入文件不分片

        return false;

     }

}

   2)关键在于定义一个新的SearchRecordReader继承RecordReader,支持自定义的行分隔符,即一条记录的分隔符。标红的地方为与hadoop默认的LineRecordReader不同的地方。

public class IsearchRecordReader extends RecordReader {

 private static final Log LOG = LogFactory.getLog(IsearchRecordReader.class);

 

 private CompressionCodecFactory compressionCodecs = null;

 private long start;

 private long pos;

 private long end;

 private LineReader in;

 private int maxLineLength;

 private LongWritable key = null;

 private Text value = null;

 //行分隔符,即一条记录的分隔符

 private byte[] separator = {''};

 private int sepLength = 1;

‍ public IsearchRecordReader(){

 }

 public IsearchRecordReader(String seps){

  this.separator = seps.getBytes(); 

  sepLength = separator.length;

 }

 public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {

  FileSplit split = (FileSplit) genericSplit;

  Configuration job = context.getConfiguration();

  this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

  this.start = split.getStart();

  this.end = (this.start + split.getLength());

  Path file = split.getPath();

  this.compressionCodecs = new CompressionCodecFactory(job);

  CompressionCodec codec = this.compressionCodecs.getCodec(file);

  // open the file and seek to the start of the split

  FileSystem fs = file.getFileSystem(job);

  FSDataInputStream fileIn = fs.open(split.getPath());

  boolean skipFirstLine = false;

  if (codec != null) {

   this.in = new LineReader(codec.createInputStream(fileIn), job);

   this.end = Long.MAX_VALUE;

  } else {

   if (this.start != 0L) {

    skipFirstLine = true;

    this.start -= sepLength;

    fileIn.seek(this.start);

   }

   this.in = new LineReader(fileIn, job);

  }

  if (skipFirstLine) { // skip first line and re-establish "start".

   int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start));

   

   if(newSize > 0){

    start += newSize;

   }

  }

  this.pos = this.start;

 }

 public boolean nextKeyValue() throws IOException {

  if (this.key == null) {

   this.key = new LongWritable();

  }

  this.key.set(this.pos);

  if (this.value == null) {

   this.value = new Text();

  }

  int newSize = 0;

  while (this.pos < this.end) {

   newSize = this.in.readLine(this.value, this.maxLineLength, Math.max(

 (int) Math.min(Integer.MAX_VALUE, this.end - this.pos), this.maxLineLength));

   if (newSize == 0) {

    break;

   }

   this.pos += newSize;

   if (newSize < this.maxLineLength) {

    break;

   }

   LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSize));

  }

  if (newSize == 0) {

   //读下一个buffer

   this.key = null;

   this.value = null;

   return false;

  }

  //读同一个buffer的下一个记录

  return true;

 }

 public LongWritable getCurrentKey() {

  return this.key;

 }

 public Text getCurrentValue() {

  return this.value;

 }

 public float getProgress() {

  if (this.start == this.end) {

   return 0.0F;

  }

  return Math.min(1.0F, (float) (this.pos - this.start) / (float) (this.end - this.start));

 }

 public synchronized void close() throws IOException {

  if (this.in != null)

   this.in.close();

 }

}

   3)重写SearchRecordReader需要的LineReader,可作为SearchRecordReader内部类。特别需要注意的地方就 是,读取文件的方式是按指定大小的buffer来读,必定就会遇到一条完整的记录被切成两半,甚至如果分隔符大于1个字符时分隔符也会被切成两半的情况, 这种情况一定要加以拼接处理。

public class LineReader {

  //回车键(hadoop默认)

  //private static final byte CR = 13;

  //换行符(hadoop默认)

  //private static final byte LF = 10;

    

  //按buffer进行文件读取

  private static final int DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;

  private int bufferSize = DEFAULT_BUFFER_SIZE;

  private InputStream in;

  private byte[] buffer;

  private int bufferLength = 0;

  private int bufferPosn = 0;

  

  LineReader(InputStream in, int bufferSize) {

   this.bufferLength = 0;

    this.bufferPosn = 0;

     

   this.in = in;

   this.bufferSize = bufferSize;

   this.buffer = new byte[this.bufferSize];

  }

  public LineReader(InputStream in, Configuration conf) throws IOException {

   this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));

  }

  public void close() throws IOException {

   in.close();

  }

 public int readLine(Text str, int maxLineLength) throws IOException {

   return readLine(str, maxLineLength, Integer.MAX_VALUE);

  }

  public int readLine(Text str) throws IOException {

   return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);

  }

  //以下是需要改写的部分_start,核心代码

  public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{

   str.clear();

   Text record = new Text();

   int txtLength = 0;

   long bytesConsumed = 0L;

   boolean newline = false;

   int sepPosn = 0;

   

   do {

    //已经读到buffer的末尾了,读下一个buffer

    if (this.bufferPosn >= this.bufferLength) {

     bufferPosn = 0;

     bufferLength = in.read(buffer);

     

     //读到文件末尾了,则跳出,进行下一个文件的读取

     if (bufferLength <= 0) {

      break;

     }

    }

    

    int startPosn = this.bufferPosn;

    for (; bufferPosn < bufferLength; bufferPosn ++) {

     //处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)

     if(sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]){

      sepPosn = 0;

     }

     

     //遇到行分隔符的第一个字符

     if (buffer[bufferPosn] == separator[sepPosn]) {

      bufferPosn ++;

      int i = 0;

      

      //判断接下来的字符是否也是行分隔符中的字符

      for(++ sepPosn; sepPosn < sepLength; i ++, sepPosn ++){

       

       //buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半

       if(bufferPosn + i >= bufferLength){

        bufferPosn += i - 1;

        break;

       }

       

       //一旦其中有一个字符不相同,就判定为不是分隔符

       if(this.buffer[this.bufferPosn + i] != separator[sepPosn]){

        sepPosn = 0;

        break;

       }

      }

      

      //的确遇到了行分隔符

      if(sepPosn == sepLength){

       bufferPosn += i;

       newline = true;

       sepPosn = 0;

       break;

      }

     }

    }


    

    int readLength = this.bufferPosn - startPosn;

    bytesConsumed += readLength;

    //行分隔符不放入块中

    //int appendLength = readLength - newlineLength;

    if (readLength > maxLineLength - txtLength) {

     readLength = maxLineLength - txtLength;

    }

    if (readLength > 0) {

     record.append(this.buffer, startPosn, readLength);

     txtLength += readLength;

     

     //去掉记录的分隔符

     if(newline){

      str.set(record.getBytes(), 0, record.getLength() - sepLength);

     }

    }

   } while (!newline && (bytesConsumed < maxBytesToConsume));

   if (bytesConsumed > (long)Integer.MAX_VALUE) {

    throw new IOException("Too many bytes before newline: " + bytesConsumed);

   }

   

   return (int) bytesConsumed;

  }

  //以下是需要改写的部分_end

//以下是hadoop-core中LineReader的源码_start

public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{

    str.clear();

    int txtLength = 0;

    int newlineLength = 0;

    boolean prevCharCR = false;

    long bytesConsumed = 0L;

    do {

      int startPosn = this.bufferPosn;

      if (this.bufferPosn >= this.bufferLength) {

        startPosn = this.bufferPosn = 0;

        if (prevCharCR)  bytesConsumed ++;

        this.bufferLength = this.in.read(this.buffer);

        if (this.bufferLength <= 0)  break;

      }

      for (; this.bufferPosn < this.bufferLength; this.bufferPosn ++) {

        if (this.buffer[this.bufferPosn] == LF) {

          newlineLength = (prevCharCR) ? 2 : 1;

          this.bufferPosn ++;

          break;

        }

        if (prevCharCR) {

          newlineLength = 1;

          break;

        }

        prevCharCR = this.buffer[this.bufferPosn] == CR;

      }

      int readLength = this.bufferPosn - startPosn;

      if ((prevCharCR) && (newlineLength == 0))

        --readLength;

      bytesConsumed += readLength;

      int appendLength = readLength - newlineLength;

      if (appendLength > maxLineLength - txtLength) {

        appendLength = maxLineLength - txtLength;

      }

      if (appendLength > 0) {

        str.append(this.buffer, startPosn, appendLength);

        txtLength += appendLength; }

    }

    while ((newlineLength == 0) && (bytesConsumed < maxBytesToConsume));

    if (bytesConsumed > (long)Integer.MAX_VALUE) throw new IOException("Too many bytes before newline: " + bytesConsumed);

    return (int)bytesConsumed;

  }

//以下是hadoop-core中LineReader的源码_end

}

2、已经按主键key排好序了,并保证相同主键key一定是在一起的,假设每条记录的第一个字段为主键,那么如 果沿用上面的LineReader,需要在核心方法readLine中对前后两条记录的id进行equals判断,如果不同才进行split,如果相同继 续下一条记录的判断。代码就不再贴了,但需要注意的地方,依旧是前后两个buffer进行交接的时候,非常有可能一条记录被切成了两半,一半在前一个buffer中,一半在后一个buffer中。

     这种方式的好处在于少去了reduce操作,会大大地提高效率,其实mapper的过程相当的快,费时的通常是reduce。

转载于:https://www.cnblogs.com/Dreama/archive/2011/09/19/2181523.html

更多相关:

  •         Apache POI是一个开源的利用Java读写Excel,WORD等微软OLE2组件文档的项目。        我的需求是对Excel的数据进行导入或将数据以Excel的形式导出。先上简单的测试代码:package com.xing.studyTest.poi;import java.io.FileInputSt...

  • 要取得[a,b)的随机整数,使用(rand() % (b-a))+ a; 要取得[a,b]的随机整数,使用(rand() % (b-a+1))+ a; 要取得(a,b]的随机整数,使用(rand() % (b-a))+ a + 1; 通用公式:a + rand() % n;其中的a是起始值,n是整数的范围。 要取得a到b之间的...

  • 利用本征图像分解(Intrinsic Image Decomposition)算法,将图像分解为shading(illumination) image 和 reflectance(albedo) image,计算图像的reflectance image。 Reflectance Image 是指在变化的光照条件下能够维持不变的图像部分...

  • 题目:面试题39. 数组中出现次数超过一半的数字 数组中有一个数字出现的次数超过数组长度的一半,请找出这个数字。 你可以假设数组是非空的,并且给定的数组总是存在多数元素。 示例 1: 输入: [1, 2, 3, 2, 2, 2, 5, 4, 2] 输出: 2 限制: 1 <= 数组长度 <= 50000 解题: cl...

  • 题目:二叉搜索树的后序遍历序列 输入一个整数数组,判断该数组是不是某二叉搜索树的后序遍历结果。如果是则返回 true,否则返回 false。假设输入的数组的任意两个数字都互不相同。 参考以下这颗二叉搜索树:      5     /    2   6   /  1   3示例 1: 输入: [1,6,3,2,5] 输出...

  • #include #include #include #include #include #include #include

  • 题目:表示数值的字符串 请实现一个函数用来判断字符串是否表示数值(包括整数和小数)。例如,字符串"+100"、"5e2"、"-123"、"3.1416"、"0123"及"-1E-16"都表示数值,但"12e"、"1a3.14"、"1.2.3"、"+-5"及"12e+5.4"都不是。 解题: 数值错误的形式有多种多样,但是正确的...

  • 加法伺候  //超过20位数值相加---------------------------------------- function bigNumAdd(a, b) {if (!(typeof a === "string" && typeof b === "string")) return console.log("传入参数必...

  • 业务场景: 从中文字句中匹配出指定的中文子字符串 .这样的情况我在工作中遇到非常多, 特梳理总结如下. 难点: 处理GBK和utf8之类的字符编码, 同时正则匹配Pattern中包含汉字,要汉字正常发挥作用,必须非常谨慎.推荐最好统一为utf8编码,如果不是这种最优情况,也有酌情处理. 往往一个具有普适性的正则表达式会简化程...

  • 简单record 一下 #include // 'struct sockaddr_in' #include #include // 'struct ifreq' and 'struct if_nameindex' #include #inc...

  • 再次重申awk的语法 awk [options] ‘Pattern {Actions}’ file1,file2… awk默认分隔符是空格,分隔符有分为“输入分隔符”和“输出分隔符”。 输入分隔符:awk在处理每一行文本的时候,以默认的空格将文本分隔成一个个单词作为变量。 输出分隔符:awk处理完文本之后,输出显示的时候...