現在的位置: 首頁 > 黃專家專欄 > 正文

作業的提交和監控(二)

2014年11月03日 黃專家專欄 ⁄ 共 4632字 ⁄ 字號 評論關閉

文件分片

函數

1
2
3
4
5
6
7
8
9
10
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}

執行文件分片,并得到需要的 map 數目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public InputSplit[] getSplits(JobConf job, int numSplits)
  throws IOException {
  // 得到輸入文件的各種狀態
  FileStatus[] files = listStatus(job);

  // Save the number of input files in the job-conf
  // conf 中設置輸入文件的數目
  job.setLong(NUM_INPUT_FILES, files.length);

  // 計算總的大小
  long totalSize = 0;                           // compute total size
  for (FileStatus file: files) {                // check we have valid files
    if (file.isDir()) {
      throw new IOException("Not a file: "+ file.getPath());
    }
    totalSize += file.getLen();
  }

  // numSplits 傳進來的是 map 的數目
  // 獲得每一個分片的期望大小
  long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

  // 獲得最小的分片大小,這個可以在 mapred.min.split.size 中設置
  long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
                          minSplitSize);

  // generate splits
  // 以下是生成分片的計算
  ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  NetworkTopology clusterMap = new NetworkTopology();
  for (FileStatus file: files) {
    Path path = file.getPath();
    FileSystem fs = path.getFileSystem(job);
    long length = file.getLen();
    BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
    // isSplitable 是判斷該文件是否可以分片
    // 一般情況下都是可以的,但是如果是 stream compressed 的方式,那么是不可以的
    if ((length != 0) && isSplitable(fs, path)) { 
      long blockSize = file.getBlockSize();

      // 計算每一個分片大小的實際函數
      // 得到真實的分片大小
      long splitSize = computeSplitSize(goalSize, minSize, blockSize);

      long bytesRemaining = length;

      // 允許最后一個分片在 SPLIT_SLOP(默認 1.1) 比例之下
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        String[] splitHosts = getSplitHosts(blkLocations, 
            length-bytesRemaining, splitSize, clusterMap);
        // 加入分片
        splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
            splitHosts));
        bytesRemaining -= splitSize;
      }

      // 加入最后一個分片
      // 這個比例最大不超過期望分片的 1.1
      if (bytesRemaining != 0) {
        splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
                   blkLocations[blkLocations.length-1].getHosts()));
      }
    } else if (length != 0) {
      String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
      splits.add(new FileSplit(path, 0, length, splitHosts));
    } else { 
      //Create empty hosts array for zero length files
      splits.add(new FileSplit(path, 0, length, new String[0]));
    }
  }
  LOG.debug("Total # of splits: " + splits.size());
  return splits.toArray(new FileSplit[splits.size()]);
}

protected long computeSplitSize(long goalSize, long minSize,
                                     long blockSize) {
  // 計算分片大小,很明顯
  // 這里設定了最大最小值,每一個分片大小在 minSize 和 blockSize 之間
  return Math.max(minSize, Math.min(goalSize, blockSize));
}

這樣看,要想設置超過大于 block size 的也是可以的,只要將 minSize 設置很大即可 以上分片算法只是單純計算需要多少個 map ,根據設定的 mapred.map.tasks 計算出這個任務需要多少個 map 最終的 map 數目,可能和 mapred.map.tasks 不同

但是這樣仍然會有一個問題,就是這個只是按照輸入文件的大小做邏輯的切分,但是如果文件中含有邊界(比如 Text 文件就是以行作為邊界),那么實際的劃分就不一定是這樣的。

這個是由 RecordReader 實現的,它將某一個 split 解析成一個個 key 和 value 對

我們看看實際的 TextInputFormat 類,它其實生成了 LineRecordReader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public LineRecordReader(Configuration job, FileSplit split,
    byte[] recordDelimiter) throws IOException {
  this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                  Integer.MAX_VALUE);

  // 得到文件開始和結束的位置
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();
  compressionCodecs = new CompressionCodecFactory(job);
  final CompressionCodec codec = compressionCodecs.getCodec(file);

  // open the file and seek to the start of the split
  FileSystem fs = file.getFileSystem(job);
  FSDataInputStream fileIn = fs.open(split.getPath());

  // skipFirstLine 表示跳過第一行
  boolean skipFirstLine = false;
  if (codec != null) {
    in = new LineReader(codec.createInputStream(fileIn), job,
          recordDelimiter);
    end = Long.MAX_VALUE;
  } else {
    if (start != 0) {
      // 如果開始的位置不是整個文件的開始
      // 那么,有可能是在行的中間, LineRecordReader 的處理方式是跳過這行,從下一行處理起
      skipFirstLine = true;
      --start;
      fileIn.seek(start);
    }
    in = new LineReader(fileIn, job, recordDelimiter);
  }
  if (skipFirstLine) {  // skip first line and re-establish "start".
      // 跳過第一行
    start += in.readLine(new Text(), 0,
                         (int)Math.min((long)Integer.MAX_VALUE, end - start));
  }
  this.pos = start;
}

public synchronized boolean next(LongWritable key, Text value)
  throws IOException {

  while (pos < end) {
    key.set(pos);

    // 在這里, 會處理一個完整行
    // 但是有可能最后一行的另外一個部分在另一個 split 里面
    // 但是 FSDataInputStream fileIn 作為一個抽象,這樣的操作使得對 Reader 透明了
    int newSize = in.readLine(value, maxLineLength,
                              Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
                                       maxLineLength));
    if (newSize == 0) {
      return false;
    }
    pos += newSize;
    if (newSize < maxLineLength) {
      return true;
    }

    // line too long. try again
    LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
  }

  return false;
}

以上代碼我們可以知道,TextInputFormat 生成的 LineRecordReader 會根據行邊界來切分,避免了 split 邏輯分片不考慮邊界的情況。

其實 SequenceFileInputFormat 輸入也同樣有邊界問題,這是根據創建時候的序列點來實現的。 具體代碼可以看 SequenceFileRecordReader 里面的實現

抱歉!評論已關閉.

奔驰宝马破解版下载 买平码中平码怎么算钱 英超直播网站 炒股游戏app 三分彩是合法吗 捕鱼王者归来app 山西大唐麻将 上海15选5开奖走势图 网络上赚钱 龙兴山西麻将俱乐部 上海天天彩选4历史 大盘走势图怎么看 豪利棋牌天天送6元 李奎打鱼怎么玩才能赢钱 看股票软件 每天送6金币棋牌游 …? 兼职网赚联盟