Hadoop MapReduce输入

简单地说,一个MapReduce经历了输入MapReduce处理输出三个过程。输入作为重要的一层,为不同的数据形式提供了统一的输入形式,为MapReduce屏蔽了不同的输入形式,比如纯文本输入、特殊格式的文件输入(如SequenceFile或XML文件)、数据库输入等。

1. 基本概念/过程

输入过程主要经历了InputFormat的获取若干分片InputSplit和RecordReader读取分片InputSplit数据获取Key/Value两个过程。

InputSplit输入分片

输入分片是对一块数据的引用,并非数据本身,在旧版的Hadoop中是接口,新版Hadoop中是虚父类,它要求实现方法:

// 获得分片的长度,Hadoop将优先处理长度较大的输入分片
long getLength();
// 获得分片所在的主机的名称
String[] getLocations();

具体来说,HDFS文件的输入分片就对应于HDFS中的数据块,默认64M。一块数据包含了若干的Key/Value,这些由Reader负责读取。

InputFormat获得若干InputSplit

InputFormat负责提供从作业环境中获取所有InputSplit的列表和读取InputSplit的Reader。JobContext中包含了输入文件夹等全局属性。

// 获得输入分片的列表
List<InputSplit> getSplits(JobContext context);
// 获得读取分片的Reader
RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context);

RecordReader从InputSplit中读取Key/Value

一个RecordReader对应于一块数据InputSplit,以此保证线程安全。使用RecordReader之前必须调用initialize方法传入对应的InputSplit对象。然后循环地调用nextKeyValue、getCurrentKey、getCurrentValue方法完成InputSplit的读取并返回若干key/value。

// 初始化,必须在使用RecordReader之前调用
void initialize(InputSplit split, TaskAttemptContext context);
// 判断是否还有key/value可以读取
boolean nextKeyValue();
// 获得当前的key
KEYIN getCurrentKey();
// 获得当前的value
VALUEIN getCurrentValue();

关于RecordReader的设计,我感觉有些不妥,initialize方法既然必须调用,那就应该在构造对象时进行,createRecordReader传递的InputSplit对象并没有使用,这会给人造成迷惑。

2. 最常用的TextFileInputFormat

Hadoop默认提供的输入方式为TextFileInputFormat,将输入文件夹中的每一个文件的每一行作为map函数的value输入,每一行的偏移为key输入。

FileInputFormat

FileInputFormat是InputFormat的子类,TextFileInputFormat的父类,它是个虚类,实现了InputFormat的getSplits方法但没有提供RecordReader。它的主要功能:

  1. 主要提供setInputPaths和addInputPath方法允许添加输入文件夹,本质上是设置了全局配置对象configuration的mapred.input.dir属性。

  2. 通过listStatus方法将输入文件夹里的文件过滤(如去掉隐藏文件)后返回文件列表,以FileStatus对象表示一个文件。然后通过FileSystem的接口获得这些文件的分块BlockLocation。

  3. 通过getSplits方法获得若干FileSplit(继承自InputSplit),默认一个BlockLocation对应一个FileSplit。FileSplit里面记录分块数据的文件名、存放主机、起始位置、块长度等信息。

  4. 通过重写isSplitable方法,子类可以实现将整个文件作为key/value传递给map。当文件是压缩时,isSplitable应设置为false。

TextFileInputFormat

TextFileInputFormat继承自FileInputFormat,提供了RecordReader的一个实现LineRecordReader。如果文件是压缩的,则isSplitable为假,表明只能将整个文件交给LineRecordReader处理。

LineRecordReader

LineRecordReader负责读取FileInput提供的数据块内容并按每行作为一对key/value返回,主要功能:

初始化initialize方法时传入对应的FileInput对象并设置相关属性。如果文件是压缩的,则负责提供一个解压输入流;如果数据块不是文件的第一块,则忽略该块的第一行。
nextKeyValue方法负责读取下一行并作为key/value返回,这裡的实现难点是最后一行需要跨块读取,由LineReader负责。

关于分块于跨行的处理

TextFileInputFormat提供分行的输入,但是提供给它处理的FileSplit是HDFS中的分块,经常的,处于两块之间的一行将被分成两块交给两个不同的LineRecordReader处理。因此,在LineRecordReader中,对于每一块的最后一行(可能没结束),它将读取到下一块的第一行;而对于非第一块的数据块,它将直接忽略第一行。

3. 其他常用的InputFormt

文档更新时间: 2018-11-10 19:06   作者:nick