001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.input;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.List;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.hadoop.fs.FileStatus;
031    import org.apache.hadoop.fs.FileSystem;
032    import org.apache.hadoop.fs.Path;
033    import org.apache.hadoop.fs.PathFilter;
034    import org.apache.hadoop.fs.BlockLocation;
035    import org.apache.hadoop.mapreduce.InputFormat;
036    import org.apache.hadoop.mapreduce.InputSplit;
037    import org.apache.hadoop.mapreduce.Job;
038    import org.apache.hadoop.mapreduce.JobContext;
039    import org.apache.hadoop.mapreduce.Mapper;
040    import org.apache.hadoop.mapreduce.security.TokenCache;
041    import org.apache.hadoop.util.ReflectionUtils;
042    import org.apache.hadoop.util.StringUtils;
043    
044    /** 
045     * A base class for file-based {@link InputFormat}s.
046     * 
047     * <p><code>FileInputFormat</code> is the base class for all file-based 
048     * <code>InputFormat</code>s. This provides a generic implementation of
049     * {@link #getSplits(JobContext)}.
050     * Subclasses of <code>FileInputFormat</code> can also override the 
051     * {@link #isSplitable(JobContext, Path)} method to ensure input-files are
052     * not split-up and are processed as a whole by {@link Mapper}s.
053     */
054    @InterfaceAudience.Public
055    @InterfaceStability.Stable
056    public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
057      public static final String INPUT_DIR = 
058        "mapreduce.input.fileinputformat.inputdir";
059      public static final String SPLIT_MAXSIZE = 
060        "mapreduce.input.fileinputformat.split.maxsize";
061      public static final String SPLIT_MINSIZE = 
062        "mapreduce.input.fileinputformat.split.minsize";
063      public static final String PATHFILTER_CLASS = 
064        "mapreduce.input.pathFilter.class";
065      public static final String NUM_INPUT_FILES =
066        "mapreduce.input.fileinputformat.numinputfiles";
067    
068      private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
069    
070      private static final double SPLIT_SLOP = 1.1;   // 10% slop
071    
072      private static final PathFilter hiddenFileFilter = new PathFilter(){
073          public boolean accept(Path p){
074            String name = p.getName(); 
075            return !name.startsWith("_") && !name.startsWith("."); 
076          }
077        }; 
078    
079      /**
080       * Proxy PathFilter that accepts a path only if all filters given in the
081       * constructor do. Used by the listPaths() to apply the built-in
082       * hiddenFileFilter together with a user provided one (if any).
083       */
084      private static class MultiPathFilter implements PathFilter {
085        private List<PathFilter> filters;
086    
087        public MultiPathFilter(List<PathFilter> filters) {
088          this.filters = filters;
089        }
090    
091        public boolean accept(Path path) {
092          for (PathFilter filter : filters) {
093            if (!filter.accept(path)) {
094              return false;
095            }
096          }
097          return true;
098        }
099      }
100    
101      /**
102       * Get the lower bound on split size imposed by the format.
103       * @return the number of bytes of the minimal split for this format
104       */
105      protected long getFormatMinSplitSize() {
106        return 1;
107      }
108    
109      /**
110       * Is the given filename splitable? Usually, true, but if the file is
111       * stream compressed, it will not be.
112       * 
113       * <code>FileInputFormat</code> implementations can override this and return
114       * <code>false</code> to ensure that individual input files are never split-up
115       * so that {@link Mapper}s process entire files.
116       * 
117       * @param context the job context
118       * @param filename the file name to check
119       * @return is this file splitable?
120       */
121      protected boolean isSplitable(JobContext context, Path filename) {
122        return true;
123      }
124    
125      /**
126       * Set a PathFilter to be applied to the input paths for the map-reduce job.
127       * @param job the job to modify
128       * @param filter the PathFilter class use for filtering the input paths.
129       */
130      public static void setInputPathFilter(Job job,
131                                            Class<? extends PathFilter> filter) {
132        job.getConfiguration().setClass(PATHFILTER_CLASS, filter, 
133                                        PathFilter.class);
134      }
135    
136      /**
137       * Set the minimum input split size
138       * @param job the job to modify
139       * @param size the minimum size
140       */
141      public static void setMinInputSplitSize(Job job,
142                                              long size) {
143        job.getConfiguration().setLong(SPLIT_MINSIZE, size);
144      }
145    
146      /**
147       * Get the minimum split size
148       * @param job the job
149       * @return the minimum number of bytes that can be in a split
150       */
151      public static long getMinSplitSize(JobContext job) {
152        return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
153      }
154    
155      /**
156       * Set the maximum split size
157       * @param job the job to modify
158       * @param size the maximum split size
159       */
160      public static void setMaxInputSplitSize(Job job,
161                                              long size) {
162        job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
163      }
164    
165      /**
166       * Get the maximum split size.
167       * @param context the job to look at.
168       * @return the maximum number of bytes a split can include
169       */
170      public static long getMaxSplitSize(JobContext context) {
171        return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
172                                                  Long.MAX_VALUE);
173      }
174    
175      /**
176       * Get a PathFilter instance of the filter set for the input paths.
177       *
178       * @return the PathFilter instance set for the job, NULL if none has been set.
179       */
180      public static PathFilter getInputPathFilter(JobContext context) {
181        Configuration conf = context.getConfiguration();
182        Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
183            PathFilter.class);
184        return (filterClass != null) ?
185            (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
186      }
187    
188      /** List input directories.
189       * Subclasses may override to, e.g., select only files matching a regular
190       * expression. 
191       * 
192       * @param job the job to list input paths for
193       * @return array of FileStatus objects
194       * @throws IOException if zero items.
195       */
196      protected List<FileStatus> listStatus(JobContext job
197                                            ) throws IOException {
198        List<FileStatus> result = new ArrayList<FileStatus>();
199        Path[] dirs = getInputPaths(job);
200        if (dirs.length == 0) {
201          throw new IOException("No input paths specified in job");
202        }
203        
204        // get tokens for all the required FileSystems..
205        TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, 
206                                            job.getConfiguration());
207    
208        List<IOException> errors = new ArrayList<IOException>();
209        
210        // creates a MultiPathFilter with the hiddenFileFilter and the
211        // user provided one (if any).
212        List<PathFilter> filters = new ArrayList<PathFilter>();
213        filters.add(hiddenFileFilter);
214        PathFilter jobFilter = getInputPathFilter(job);
215        if (jobFilter != null) {
216          filters.add(jobFilter);
217        }
218        PathFilter inputFilter = new MultiPathFilter(filters);
219        
220        for (int i=0; i < dirs.length; ++i) {
221          Path p = dirs[i];
222          FileSystem fs = p.getFileSystem(job.getConfiguration()); 
223          FileStatus[] matches = fs.globStatus(p, inputFilter);
224          if (matches == null) {
225            errors.add(new IOException("Input path does not exist: " + p));
226          } else if (matches.length == 0) {
227            errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
228          } else {
229            for (FileStatus globStat: matches) {
230              if (globStat.isDirectory()) {
231                for(FileStatus stat: fs.listStatus(globStat.getPath(),
232                    inputFilter)) {
233                  result.add(stat);
234                }          
235              } else {
236                result.add(globStat);
237              }
238            }
239          }
240        }
241    
242        if (!errors.isEmpty()) {
243          throw new InvalidInputException(errors);
244        }
245        LOG.info("Total input paths to process : " + result.size()); 
246        return result;
247      }
248      
249      /**
250       * A factory that makes the split for this class. It can be overridden
251       * by sub-classes to make sub-types
252       */
253      protected FileSplit makeSplit(Path file, long start, long length, 
254                                    String[] hosts) {
255        return new FileSplit(file, start, length, hosts);
256      }
257    
258      /** 
259       * Generate the list of files and make them into FileSplits.
260       * @param job the job context
261       * @throws IOException
262       */
263      public List<InputSplit> getSplits(JobContext job) throws IOException {
264        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
265        long maxSize = getMaxSplitSize(job);
266    
267        // generate splits
268        List<InputSplit> splits = new ArrayList<InputSplit>();
269        List<FileStatus> files = listStatus(job);
270        for (FileStatus file: files) {
271          Path path = file.getPath();
272          long length = file.getLen();
273          if (length != 0) {
274            FileSystem fs = path.getFileSystem(job.getConfiguration());
275            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
276            if (isSplitable(job, path)) {
277              long blockSize = file.getBlockSize();
278              long splitSize = computeSplitSize(blockSize, minSize, maxSize);
279    
280              long bytesRemaining = length;
281              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
282                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
283                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
284                                         blkLocations[blkIndex].getHosts()));
285                bytesRemaining -= splitSize;
286              }
287    
288              if (bytesRemaining != 0) {
289                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
290                splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
291                           blkLocations[blkIndex].getHosts()));
292              }
293            } else { // not splitable
294              splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
295            }
296          } else { 
297            //Create empty hosts array for zero length files
298            splits.add(makeSplit(path, 0, length, new String[0]));
299          }
300        }
301        // Save the number of input files for metrics/loadgen
302        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
303        LOG.debug("Total # of splits: " + splits.size());
304        return splits;
305      }
306    
307      protected long computeSplitSize(long blockSize, long minSize,
308                                      long maxSize) {
309        return Math.max(minSize, Math.min(maxSize, blockSize));
310      }
311    
312      protected int getBlockIndex(BlockLocation[] blkLocations, 
313                                  long offset) {
314        for (int i = 0 ; i < blkLocations.length; i++) {
315          // is the offset inside this block?
316          if ((blkLocations[i].getOffset() <= offset) &&
317              (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
318            return i;
319          }
320        }
321        BlockLocation last = blkLocations[blkLocations.length -1];
322        long fileLength = last.getOffset() + last.getLength() -1;
323        throw new IllegalArgumentException("Offset " + offset + 
324                                           " is outside of file (0.." +
325                                           fileLength + ")");
326      }
327    
328      /**
329       * Sets the given comma separated paths as the list of inputs 
330       * for the map-reduce job.
331       * 
332       * @param job the job
333       * @param commaSeparatedPaths Comma separated paths to be set as 
334       *        the list of inputs for the map-reduce job.
335       */
336      public static void setInputPaths(Job job, 
337                                       String commaSeparatedPaths
338                                       ) throws IOException {
339        setInputPaths(job, StringUtils.stringToPath(
340                            getPathStrings(commaSeparatedPaths)));
341      }
342    
343      /**
344       * Add the given comma separated paths to the list of inputs for
345       *  the map-reduce job.
346       * 
347       * @param job The job to modify
348       * @param commaSeparatedPaths Comma separated paths to be added to
349       *        the list of inputs for the map-reduce job.
350       */
351      public static void addInputPaths(Job job, 
352                                       String commaSeparatedPaths
353                                       ) throws IOException {
354        for (String str : getPathStrings(commaSeparatedPaths)) {
355          addInputPath(job, new Path(str));
356        }
357      }
358    
359      /**
360       * Set the array of {@link Path}s as the list of inputs
361       * for the map-reduce job.
362       * 
363       * @param job The job to modify 
364       * @param inputPaths the {@link Path}s of the input directories/files 
365       * for the map-reduce job.
366       */ 
367      public static void setInputPaths(Job job, 
368                                       Path... inputPaths) throws IOException {
369        Configuration conf = job.getConfiguration();
370        Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
371        StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
372        for(int i = 1; i < inputPaths.length;i++) {
373          str.append(StringUtils.COMMA_STR);
374          path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
375          str.append(StringUtils.escapeString(path.toString()));
376        }
377        conf.set(INPUT_DIR, str.toString());
378      }
379    
380      /**
381       * Add a {@link Path} to the list of inputs for the map-reduce job.
382       * 
383       * @param job The {@link Job} to modify
384       * @param path {@link Path} to be added to the list of inputs for 
385       *            the map-reduce job.
386       */
387      public static void addInputPath(Job job, 
388                                      Path path) throws IOException {
389        Configuration conf = job.getConfiguration();
390        path = path.getFileSystem(conf).makeQualified(path);
391        String dirStr = StringUtils.escapeString(path.toString());
392        String dirs = conf.get(INPUT_DIR);
393        conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
394      }
395      
396      // This method escapes commas in the glob pattern of the given paths.
397      private static String[] getPathStrings(String commaSeparatedPaths) {
398        int length = commaSeparatedPaths.length();
399        int curlyOpen = 0;
400        int pathStart = 0;
401        boolean globPattern = false;
402        List<String> pathStrings = new ArrayList<String>();
403        
404        for (int i=0; i<length; i++) {
405          char ch = commaSeparatedPaths.charAt(i);
406          switch(ch) {
407            case '{' : {
408              curlyOpen++;
409              if (!globPattern) {
410                globPattern = true;
411              }
412              break;
413            }
414            case '}' : {
415              curlyOpen--;
416              if (curlyOpen == 0 && globPattern) {
417                globPattern = false;
418              }
419              break;
420            }
421            case ',' : {
422              if (!globPattern) {
423                pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
424                pathStart = i + 1 ;
425              }
426              break;
427            }
428          }
429        }
430        pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
431        
432        return pathStrings.toArray(new String[0]);
433      }
434      
435      /**
436       * Get the list of input {@link Path}s for the map-reduce job.
437       * 
438       * @param context The job
439       * @return the list of input {@link Path}s for the map-reduce job.
440       */
441      public static Path[] getInputPaths(JobContext context) {
442        String dirs = context.getConfiguration().get(INPUT_DIR, "");
443        String [] list = StringUtils.split(dirs);
444        Path[] result = new Path[list.length];
445        for (int i = 0; i < list.length; i++) {
446          result[i] = new Path(StringUtils.unEscapeString(list[i]));
447        }
448        return result;
449      }
450    
451    }