/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/**
 * A base class for file-based {@link InputFormat}s.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. It provides a generic implementation of
 * {@link #getSplits(JobContext)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(JobContext, Path)} method to ensure input files are
 * not split up and are processed as a whole by {@link Mapper}s.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  public static final String INPUT_DIR =
    "mapreduce.input.fileinputformat.inputdir";
  public static final String SPLIT_MAXSIZE =
    "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE =
    "mapreduce.input.fileinputformat.split.minsize";
  public static final String PATHFILTER_CLASS =
    "mapreduce.input.pathFilter.class";
  public static final String NUM_INPUT_FILES =
    "mapreduce.input.fileinputformat.numinputfiles";

  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

  private static final double SPLIT_SLOP = 1.1; // 10% slop

  /** Rejects files whose names mark them as hidden or internal ("_" or "."). */
  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };
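  /*
   * Illustrative sketch (not part of this class): a user-supplied PathFilter
   * that admits only ".log" files. The class name LogFileFilter is
   * hypothetical. Registered via setInputPathFilter() below, it is combined
   * with hiddenFileFilter through MultiPathFilter, so hidden files stay
   * excluded even when a custom filter is set. Note that listStatus() also
   * applies the filter to directories matched by the input pattern, so a
   * purely name-based filter like this may need to accept directories too.
   *
   *   public static class LogFileFilter implements PathFilter {
   *     public boolean accept(Path p) {
   *       return p.getName().endsWith(".log");
   *     }
   *   }
   *
   *   // FileInputFormat.setInputPathFilter(job, LogFileFilter.class);
   */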
  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }

  /**
   * Is the given file splittable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split
   * up, so that {@link Mapper}s process entire files.
   *
   * @param context the job context
   * @param filename the file name to check
   * @return is this file splittable?
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   * @param job the job to modify
   * @param filter the PathFilter class to use for filtering the input paths
   */
  public static void setInputPathFilter(Job job,
                                        Class<? extends PathFilter> filter) {
    job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
                                    PathFilter.class);
  }

  /**
   * Set the minimum input split size.
   * @param job the job to modify
   * @param size the minimum size
   */
  public static void setMinInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
  }

  /**
   * Get the minimum split size.
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

  /**
   * Set the maximum input split size.
   * @param job the job to modify
   * @param size the maximum split size
   */
  public static void setMaxInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
  }

  /**
   * Get the maximum split size.
   * @param context the job to look at
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,
                                              Long.MAX_VALUE);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @return the PathFilter instance set for the job, null if none has been set
   */
  public static PathFilter getInputPathFilter(JobContext context) {
    Configuration conf = context.getConfiguration();
    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
        PathFilter.class);
    return (filterClass != null) ?
        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  }
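  /*
   * Illustrative sketch: a subclass that keeps gzip-compressed inputs whole
   * by overriding isSplitable(). The class name NonSplittingInputFormat is
   * hypothetical; extending TextInputFormat (also in this package) inherits
   * its line-oriented createRecordReader(). A real text format would
   * typically consult the configured compression codec instead of the file
   * extension.
   *
   *   public class NonSplittingInputFormat extends TextInputFormat {
   *     @Override
   *     protected boolean isSplitable(JobContext context, Path file) {
   *       // Gzip streams cannot be split at arbitrary byte offsets.
   *       return !file.getName().endsWith(".gz");
   *     }
   *   }
   */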
  /**
   * List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return a list of FileStatus objects for the input files
   * @throws IOException if no input paths are specified, or if an input
   *         path does not exist or matches no files
   */
  protected List<FileStatus> listStatus(JobContext job
                                        ) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // Get delegation tokens for all the required FileSystems.
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                        job.getConfiguration());

    List<IOException> errors = new ArrayList<IOException>();

    // Create a MultiPathFilter from the hiddenFileFilter and the
    // user-provided filter (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (int i = 0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat : matches) {
          if (globStat.isDirectory()) {
            for (FileStatus stat : fs.listStatus(globStat.getPath(),
                inputFilter)) {
              result.add(stat);
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size());
    return result;
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }
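  /*
   * Worked example of the split arithmetic used by getSplits() below,
   * assuming the defaults (minSize = 1, maxSize = Long.MAX_VALUE) and a
   * 128 MB block size, so computeSplitSize() returns 128 MB:
   *
   *   - a 300 MB file: 300/128 = 2.34 > SPLIT_SLOP, carve off 128 MB;
   *     172/128 = 1.34 > SPLIT_SLOP, carve off 128 MB; 44/128 = 0.34 ends
   *     the loop, and the remaining 44 MB becomes the third split.
   *   - a 140 MB file: 140/128 = 1.09 <= SPLIT_SLOP, so the file stays a
   *     single (slightly oversized) 140 MB split rather than producing a
   *     tiny 12 MB remainder split.
   */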
  /**
   * Generate the list of files and make them into FileSplits.
   *
   * @param job the job context
   * @return the list of input splits for the job
   * @throws IOException if the input paths cannot be listed
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file : files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          // Keep carving off splitSize-byte splits while more than
          // SPLIT_SLOP * splitSize bytes remain; the final (possibly
          // slightly oversized) remainder becomes the last split.
          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                                 blkLocations[blkIndex].getHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                                 blkLocations[blkIndex].getHosts()));
          }
        } else { // not splittable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
  }

  /**
   * Compute the size of a split: the block size, clamped to the
   * [minSize, maxSize] range.
   */
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  /**
   * Find the index of the block containing the given file offset.
   * @throws IllegalArgumentException if the offset is past the end of the file
   */
  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param job the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    setInputPaths(job, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }
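  /*
   * Illustrative sketch: a comma separated input spec may contain curly-brace
   * globs whose embedded commas are not path separators; getPathStrings()
   * below splits accordingly. The paths are examples only.
   *
   *   // Two inputs: the glob "/data/2024/{01,02}" and "/logs/current".
   *   FileInputFormat.setInputPaths(job, "/data/2024/{01,02},/logs/current");
   */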
  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param job The job to modify
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(job, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param job The job to modify
   * @param inputPaths the {@link Path}s of the input directories/files
   *        for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   Path... inputPaths) throws IOException {
    Configuration conf = job.getConfiguration();
    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param job The {@link Job} to modify
   * @param path {@link Path} to be added to the list of inputs for
   *        the map-reduce job.
   */
  public static void addInputPath(Job job,
                                  Path path) throws IOException {
    Configuration conf = job.getConfiguration();
    path = path.getFileSystem(conf).makeQualified(path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(INPUT_DIR);
    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
  }

  // Splits the comma separated path list, treating commas that appear
  // inside curly-brace glob patterns (e.g. "{a,b}") as part of the
  // pattern rather than as path separators.
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch (ch) {
        case '{' : {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}' : {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',' : {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param context The job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get(INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

}
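/*
 * End-to-end usage sketch (illustrative, assuming a Hadoop release that
 * provides Job.getInstance(Configuration)). The paths and sizes below are
 * arbitrary examples.
 *
 *   Configuration conf = new Configuration();
 *   Job job = Job.getInstance(conf);
 *   job.setInputFormatClass(TextInputFormat.class);
 *   FileInputFormat.addInputPath(job, new Path("/data/in"));
 *   FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // 64 MB
 *   FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);  // 256 MB
 */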