001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.fs;
019    
020    import java.io.Closeable;
021    import java.io.FileNotFoundException;
022    import java.io.IOException;
023    import java.net.URI;
024    import java.security.PrivilegedExceptionAction;
025    import java.util.ArrayList;
026    import java.util.Arrays;
027    import java.util.EnumSet;
028    import java.util.HashMap;
029    import java.util.HashSet;
030    import java.util.IdentityHashMap;
031    import java.util.Iterator;
032    import java.util.List;
033    import java.util.Map;
034    import java.util.NoSuchElementException;
035    import java.util.ServiceLoader;
036    import java.util.Set;
037    import java.util.Stack;
038    import java.util.TreeSet;
039    import java.util.concurrent.atomic.AtomicInteger;
040    import java.util.concurrent.atomic.AtomicLong;
041    
042    import org.apache.commons.logging.Log;
043    import org.apache.commons.logging.LogFactory;
044    import org.apache.hadoop.classification.InterfaceAudience;
045    import org.apache.hadoop.classification.InterfaceStability;
046    import org.apache.hadoop.conf.Configuration;
047    import org.apache.hadoop.conf.Configured;
048    import org.apache.hadoop.fs.Options.Rename;
049    import org.apache.hadoop.fs.permission.FsPermission;
050    import org.apache.hadoop.io.MultipleIOException;
051    import org.apache.hadoop.net.NetUtils;
052    import org.apache.hadoop.security.Credentials;
053    import org.apache.hadoop.security.SecurityUtil;
054    import org.apache.hadoop.security.UserGroupInformation;
055    import org.apache.hadoop.security.token.Token;
056    import org.apache.hadoop.util.Progressable;
057    import org.apache.hadoop.util.ReflectionUtils;
058    import org.apache.hadoop.util.ShutdownHookManager;
059    
060    /****************************************************************
061     * An abstract base class for a fairly generic filesystem.  It
062     * may be implemented as a distributed filesystem, or as a "local"
063     * one that reflects the locally-connected disk.  The local version
064     * exists for small Hadoop instances and for testing.
065     *
066     * <p>
067     *
068     * All user code that may potentially use the Hadoop Distributed
069     * File System should be written to use a FileSystem object.  The
070     * Hadoop DFS is a multi-machine system that appears as a single
071     * disk.  It's useful because of its fault tolerance and potentially
072     * very large capacity.
073     * 
074     * <p>
075     * The local implementation is {@link LocalFileSystem} and distributed
076     * implementation is DistributedFileSystem.
077     *****************************************************************/
078    @InterfaceAudience.Public
079    @InterfaceStability.Stable
080    public abstract class FileSystem extends Configured implements Closeable {
081      public static final String FS_DEFAULT_NAME_KEY = 
082                       CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
083      public static final String DEFAULT_FS = 
084                       CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;
085    
086      public static final Log LOG = LogFactory.getLog(FileSystem.class);
087    
088      /**
089       * Priority of the FileSystem shutdown hook.
090       */
091      public static final int SHUTDOWN_HOOK_PRIORITY = 10;
092    
093      /** FileSystem cache */
094      static final Cache CACHE = new Cache();
095    
096      /** The key this instance is stored under in the cache. */
097      private Cache.Key key;
098    
099      /** Recording statistics per a FileSystem class */
100      private static final Map<Class<? extends FileSystem>, Statistics> 
101        statisticsTable =
102          new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
103      
104      /**
105       * The statistics for this file system.
106       */
107      protected Statistics statistics;
108    
109      /**
110       * A cache of files that should be deleted when the filesystem is closed
111       * or the JVM is exited.
112       */
113      private Set<Path> deleteOnExit = new TreeSet<Path>();
114      
115      /**
116       * This method adds a file system for testing so that we can find it later. It
117       * is only for testing.
118       * @param uri the uri to store it under
119       * @param conf the configuration to store it under
120       * @param fs the file system to store
121       * @throws IOException
122       */
123      static void addFileSystemForTesting(URI uri, Configuration conf,
124          FileSystem fs) throws IOException {
125        CACHE.map.put(new Cache.Key(uri, conf), fs);
126      }
127    
128      /**
129       * Get a filesystem instance based on the uri, the passed
130       * configuration and the user
131       * @param uri of the filesystem
132       * @param conf the configuration to use
133       * @param user to perform the get as
134       * @return the filesystem instance
135       * @throws IOException
136       * @throws InterruptedException
137       */
138      public static FileSystem get(final URI uri, final Configuration conf,
139            final String user) throws IOException, InterruptedException {
140        String ticketCachePath =
141          conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
142        UserGroupInformation ugi =
143            UserGroupInformation.getBestUGI(ticketCachePath, user);
144        return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
145          public FileSystem run() throws IOException {
146            return get(uri, conf);
147          }
148        });
149      }
150    
151      /**
152       * Returns the configured filesystem implementation.
153       * @param conf the configuration to use
154       */
155      public static FileSystem get(Configuration conf) throws IOException {
156        return get(getDefaultUri(conf), conf);
157      }
158      
159      /** Get the default filesystem URI from a configuration.
160       * @param conf the configuration to use
161       * @return the uri of the default filesystem
162       */
163      public static URI getDefaultUri(Configuration conf) {
164        return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
165      }
166    
167      /** Set the default filesystem URI in a configuration.
168       * @param conf the configuration to alter
169       * @param uri the new default filesystem uri
170       */
171      public static void setDefaultUri(Configuration conf, URI uri) {
172        conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
173      }
174    
175      /** Set the default filesystem URI in a configuration.
176       * @param conf the configuration to alter
177       * @param uri the new default filesystem uri
178       */
179      public static void setDefaultUri(Configuration conf, String uri) {
180        setDefaultUri(conf, URI.create(fixName(uri)));
181      }
182    
183      /** Called after a new FileSystem instance is constructed.
184       * @param name a uri whose authority section names the host, port, etc.
185       *   for this FileSystem
186       * @param conf the configuration
187       */
188      public void initialize(URI name, Configuration conf) throws IOException {
189        statistics = getStatistics(name.getScheme(), getClass());    
190      }
191    
192      /**
193       * Return the protocol scheme for the FileSystem.
194       * <p/>
195       * This implementation throws an <code>UnsupportedOperationException</code>.
196       *
197       * @return the protocol scheme for the FileSystem.
198       */
199      public String getScheme() {
200        throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
201      }
202    
  /**
   * Returns a URI whose scheme and authority identify this FileSystem.
   *
   * @return the URI identifying this FileSystem
   */
  public abstract URI getUri();
205      
206      /**
207       * Resolve the uri's hostname and add the default port if not in the uri
208       * @return URI
209       * @see NetUtils#getCanonicalUri(URI, int)
210       */
211      protected URI getCanonicalUri() {
212        return NetUtils.getCanonicalUri(getUri(), getDefaultPort());
213      }
214      
  /**
   * Get the default port for this file system.
   * This base implementation always returns 0.
   *
   * @return the default port or 0 if there isn't one
   */
  protected int getDefaultPort() {
    return 0;
  }
222    
223      /**
224       * Get a canonical service name for this file system.  The token cache is
225       * the only user of this value, and uses it to lookup this filesystem's
226       * service tokens.  The token cache will not attempt to acquire tokens if the
227       * service is null.
228       * @return a service string that uniquely identifies this file system, null
229       *         if the filesystem does not implement tokens
230       * @see SecurityUtil#buildDTServiceName(URI, int) 
231       */
232      public String getCanonicalServiceName() {
233        return SecurityUtil.buildDTServiceName(getUri(), getDefaultPort());
234      }
235    
236      /** @deprecated call #getUri() instead.*/
237      @Deprecated
238      public String getName() { return getUri().toString(); }
239    
240      /** @deprecated call #get(URI,Configuration) instead. */
241      @Deprecated
242      public static FileSystem getNamed(String name, Configuration conf)
243        throws IOException {
244        return get(URI.create(fixName(name)), conf);
245      }
246      
247      /** Update old-format filesystem names, for back-compatibility.  This should
248       * eventually be replaced with a checkName() method that throws an exception
249       * for old-format names. */ 
250      private static String fixName(String name) {
251        // convert old-format name to new-format name
252        if (name.equals("local")) {         // "local" is now "file:///".
253          LOG.warn("\"local\" is a deprecated filesystem name."
254                   +" Use \"file:///\" instead.");
255          name = "file:///";
256        } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
257          LOG.warn("\""+name+"\" is a deprecated filesystem name."
258                   +" Use \"hdfs://"+name+"/\" instead.");
259          name = "hdfs://"+name;
260        }
261        return name;
262      }
263    
264      /**
265       * Get the local file system.
266       * @param conf the configuration to configure the file system with
267       * @return a LocalFileSystem
268       */
269      public static LocalFileSystem getLocal(Configuration conf)
270        throws IOException {
271        return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
272      }
273    
274      /** Returns the FileSystem for this URI's scheme and authority.  The scheme
275       * of the URI determines a configuration property name,
276       * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
277       * The entire URI is passed to the FileSystem instance's initialize method.
278       */
279      public static FileSystem get(URI uri, Configuration conf) throws IOException {
280        String scheme = uri.getScheme();
281        String authority = uri.getAuthority();
282    
283        if (scheme == null && authority == null) {     // use default FS
284          return get(conf);
285        }
286    
287        if (scheme != null && authority == null) {     // no authority
288          URI defaultUri = getDefaultUri(conf);
289          if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
290              && defaultUri.getAuthority() != null) {  // & default has authority
291            return get(defaultUri, conf);              // return default
292          }
293        }
294        
295        String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
296        if (conf.getBoolean(disableCacheName, false)) {
297          return createFileSystem(uri, conf);
298        }
299    
300        return CACHE.get(uri, conf);
301      }
302    
303      /**
304       * Returns the FileSystem for this URI's scheme and authority and the 
305       * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
306       * @param uri of the filesystem
307       * @param conf the configuration to use
308       * @param user to perform the get as
309       * @return filesystem instance
310       * @throws IOException
311       * @throws InterruptedException
312       */
313      public static FileSystem newInstance(final URI uri, final Configuration conf,
314          final String user) throws IOException, InterruptedException {
315        String ticketCachePath =
316          conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
317        UserGroupInformation ugi =
318            UserGroupInformation.getBestUGI(ticketCachePath, user);
319        return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
320          public FileSystem run() throws IOException {
321            return newInstance(uri,conf); 
322          }
323        });
324      }
325      /** Returns the FileSystem for this URI's scheme and authority.  The scheme
326       * of the URI determines a configuration property name,
327       * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
328       * The entire URI is passed to the FileSystem instance's initialize method.
329       * This always returns a new FileSystem object.
330       */
331      public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
332        String scheme = uri.getScheme();
333        String authority = uri.getAuthority();
334    
335        if (scheme == null) {                       // no scheme: use default FS
336          return newInstance(conf);
337        }
338    
339        if (authority == null) {                       // no authority
340          URI defaultUri = getDefaultUri(conf);
341          if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
342              && defaultUri.getAuthority() != null) {  // & default has authority
343            return newInstance(defaultUri, conf);              // return default
344          }
345        }
346        return CACHE.getUnique(uri, conf);
347      }
348    
349      /** Returns a unique configured filesystem implementation.
350       * This always returns a new FileSystem object.
351       * @param conf the configuration to use
352       */
353      public static FileSystem newInstance(Configuration conf) throws IOException {
354        return newInstance(getDefaultUri(conf), conf);
355      }
356    
357      /**
358       * Get a unique local file system object
359       * @param conf the configuration to configure the file system with
360       * @return a LocalFileSystem
361       * This always returns a new FileSystem object.
362       */
363      public static LocalFileSystem newInstanceLocal(Configuration conf)
364        throws IOException {
365        return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
366      }
367    
  /**
   * Close all cached filesystems. Be sure those filesystems are not
   * used anymore.
   * <p>
   * Delegates to the shared cache's <code>closeAll()</code>.
   *
   * @throws IOException
   */
  public static void closeAll() throws IOException {
    CACHE.closeAll();
  }
377    
  /**
   * Close all cached filesystems for a given UGI. Be sure those filesystems
   * are not used anymore.
   * <p>
   * Delegates to the shared cache's <code>closeAll(ugi)</code>.
   *
   * @param ugi user group info to close
   * @throws IOException
   */
  public static void closeAllForUGI(UserGroupInformation ugi) 
  throws IOException {
    CACHE.closeAll(ugi);
  }
388    
389      /** 
390       * Make sure that a path specifies a FileSystem.
391       * @param path to use
392       */
393      public Path makeQualified(Path path) {
394        checkPath(path);
395        return path.makeQualified(this.getUri(), this.getWorkingDirectory());
396      }
397        
  /**
   * Get a new delegation token for this file system.
   * This base implementation returns null.
   *
   * @param renewer the account name that is allowed to renew the token.
   * @return a new delegation token
   * @throws IOException
   * @deprecated use {@link #getDelegationTokens(String)} instead.
   */
  @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
  @Deprecated
  public Token<?> getDelegationToken(String renewer) throws IOException {
    return null;
  }
410      
411      /**
412       * Get one or more delegation tokens associated with the filesystem. Normally
413       * a file system returns a single delegation token. A file system that manages
414       * multiple file systems underneath, could return set of delegation tokens for
415       * all the file systems it manages.
416       * 
417       * @param renewer the account name that is allowed to renew the token.
418       * @return list of new delegation tokens
419       *    If delegation tokens not supported then return a list of size zero.
420       * @throws IOException
421       */
422      @InterfaceAudience.LimitedPrivate( { "HDFS", "MapReduce" })
423      public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
424        return new ArrayList<Token<?>>(0);
425      }
426      
427      /**
428       * @see #getDelegationTokens(String)
429       * This is similar to getDelegationTokens, with the added restriction that if
430       * a token is already present in the passed Credentials object - that token
431       * is returned instead of a new delegation token. 
432       * 
433       * If the token is found to be cached in the Credentials object, this API does
434       * not verify the token validity or the passed in renewer. 
435       * 
436       * 
437       * @param renewer the account name that is allowed to renew the token.
438       * @param credentials a Credentials object containing already knowing 
439       *   delegationTokens.
440       * @return a list of delegation tokens.
441       * @throws IOException
442       */
443      @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
444      public List<Token<?>> getDelegationTokens(String renewer,
445          Credentials credentials) throws IOException {
446        List<Token<?>> allTokens = getDelegationTokens(renewer);
447        List<Token<?>> newTokens = new ArrayList<Token<?>>();
448        if (allTokens != null) {
449          for (Token<?> token : allTokens) {
450            Token<?> knownToken = credentials.getToken(token.getService());
451            if (knownToken == null) {
452              newTokens.add(token);
453            } else {
454              newTokens.add(knownToken);
455            }
456          }
457        }
458        return newTokens;
459      }
460    
461      /** create a file with the provided permission
462       * The permission of the file is set to be the provided permission as in
463       * setPermission, not permission&~umask
464       * 
465       * It is implemented using two RPCs. It is understood that it is inefficient,
466       * but the implementation is thread-safe. The other option is to change the
467       * value of umask in configuration to be 0, but it is not thread-safe.
468       * 
469       * @param fs file system handle
470       * @param file the name of the file to be created
471       * @param permission the permission of the file
472       * @return an output stream
473       * @throws IOException
474       */
475      public static FSDataOutputStream create(FileSystem fs,
476          Path file, FsPermission permission) throws IOException {
477        // create the file with default permission
478        FSDataOutputStream out = fs.create(file);
479        // set its permission to the supplied one
480        fs.setPermission(file, permission);
481        return out;
482      }
483    
484      /** create a directory with the provided permission
485       * The permission of the directory is set to be the provided permission as in
486       * setPermission, not permission&~umask
487       * 
488       * @see #create(FileSystem, Path, FsPermission)
489       * 
490       * @param fs file system handle
491       * @param dir the name of the directory to be created
492       * @param permission the permission of the directory
493       * @return true if the directory creation succeeds; false otherwise
494       * @throws IOException
495       */
496      public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
497      throws IOException {
498        // create the directory using the default permission
499        boolean result = fs.mkdirs(dir);
500        // set its permission to be the supplied one
501        fs.setPermission(dir, permission);
502        return result;
503      }
504    
505      ///////////////////////////////////////////////////////////////
506      // FileSystem
507      ///////////////////////////////////////////////////////////////
508    
  /**
   * Constructs a FileSystem, passing a null configuration to the
   * {@link Configured} superclass constructor.
   */
  protected FileSystem() {
    super(null);
  }
512    
513      /** 
514       * Check that a Path belongs to this FileSystem.
515       * @param path to check
516       */
517      protected void checkPath(Path path) {
518        URI uri = path.toUri();
519        String thatScheme = uri.getScheme();
520        if (thatScheme == null)                // fs is relative
521          return;
522        URI thisUri = getCanonicalUri();
523        String thisScheme = thisUri.getScheme();
524        //authority and scheme are not case sensitive
525        if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
526          String thisAuthority = thisUri.getAuthority();
527          String thatAuthority = uri.getAuthority();
528          if (thatAuthority == null &&                // path's authority is null
529              thisAuthority != null) {                // fs has an authority
530            URI defaultUri = getDefaultUri(getConf());
531            if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
532              uri = defaultUri; // schemes match, so use this uri instead
533            } else {
534              uri = null; // can't determine auth of the path
535            }
536          }
537          if (uri != null) {
538            // canonicalize uri before comparing with this fs
539            uri = NetUtils.getCanonicalUri(uri, getDefaultPort());
540            thatAuthority = uri.getAuthority();
541            if (thisAuthority == thatAuthority ||       // authorities match
542                (thisAuthority != null &&
543                 thisAuthority.equalsIgnoreCase(thatAuthority)))
544              return;
545          }
546        }
547        throw new IllegalArgumentException("Wrong FS: "+path+
548                                           ", expected: "+this.getUri());
549      }
550    
551      /**
552       * Return an array containing hostnames, offset and size of 
553       * portions of the given file.  For a nonexistent 
554       * file or regions, null will be returned.
555       *
556       * This call is most helpful with DFS, where it returns 
557       * hostnames of machines that contain the given file.
558       *
559       * The FileSystem will simply return an elt containing 'localhost'.
560       *
561       * @param file FilesStatus to get data from
562       * @param start offset into the given file
563       * @param len length for which to get locations for
564       */
565      public BlockLocation[] getFileBlockLocations(FileStatus file, 
566          long start, long len) throws IOException {
567        if (file == null) {
568          return null;
569        }
570    
571        if (start < 0 || len < 0) {
572          throw new IllegalArgumentException("Invalid start or len parameter");
573        }
574    
575        if (file.getLen() <= start) {
576          return new BlockLocation[0];
577    
578        }
579        String[] name = { "localhost:50010" };
580        String[] host = { "localhost" };
581        return new BlockLocation[] {
582          new BlockLocation(name, host, 0, file.getLen()) };
583      }
584     
585    
586      /**
587       * Return an array containing hostnames, offset and size of 
588       * portions of the given file.  For a nonexistent 
589       * file or regions, null will be returned.
590       *
591       * This call is most helpful with DFS, where it returns 
592       * hostnames of machines that contain the given file.
593       *
594       * The FileSystem will simply return an elt containing 'localhost'.
595       *
596       * @param p path is used to identify an FS since an FS could have
597       *          another FS that it could be delegating the call to
598       * @param start offset into the given file
599       * @param len length for which to get locations for
600       */
601      public BlockLocation[] getFileBlockLocations(Path p, 
602          long start, long len) throws IOException {
603        if (p == null) {
604          throw new NullPointerException();
605        }
606        FileStatus file = getFileStatus(p);
607        return getFileBlockLocations(file, start, len);
608      }
609      
610      /**
611       * Return a set of server default configuration values
612       * @return server default configuration values
613       * @throws IOException
614       * @deprecated use {@link #getServerDefaults(Path)} instead
615       */
616      @Deprecated
617      public FsServerDefaults getServerDefaults() throws IOException {
618        Configuration conf = getConf();
619        return new FsServerDefaults(getDefaultBlockSize(), 
620            conf.getInt("io.bytes.per.checksum", 512), 
621            64 * 1024, 
622            getDefaultReplication(),
623            conf.getInt("io.file.buffer.size", 4096),
624            false,
625            // NB: ignoring the client trash configuration
626            CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT);
627      }
628    
  /**
   * Return a set of server default configuration values.
   * <p>
   * This base implementation ignores <code>p</code> and returns the result
   * of the no-argument <code>getServerDefaults()</code>.
   *
   * @param p path is used to identify an FS since an FS could have
   *          another FS that it could be delegating the call to
   * @return server default configuration values
   * @throws IOException
   */
  public FsServerDefaults getServerDefaults(Path p) throws IOException {
    return getServerDefaults();
  }
639    
640      /**
641       * Return the fully-qualified path of path f resolving the path
642       * through any symlinks or mount point
643       * @param p path to be resolved
644       * @return fully qualified path 
645       * @throws FileNotFoundException
646       */
647       public Path resolvePath(final Path p) throws IOException {
648         checkPath(p);
649         return getFileStatus(p).getPath();
650       }
651    
  /**
   * Opens an FSDataInputStream at the indicated Path.
   *
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   * @return the opened input stream
   * @throws IOException
   */
  public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;
659        
660      /**
661       * Opens an FSDataInputStream at the indicated Path.
662       * @param f the file to open
663       */
664      public FSDataInputStream open(Path f) throws IOException {
665        return open(f, getConf().getInt("io.file.buffer.size", 4096));
666      }
667    
668      /**
669       * Create an FSDataOutputStream at the indicated Path.
670       * Files are overwritten by default.
671       * @param f the file to create
672       */
673      public FSDataOutputStream create(Path f) throws IOException {
674        return create(f, true);
675      }
676    
677      /**
678       * Create an FSDataOutputStream at the indicated Path.
679       * @param f the file to create
680       * @param overwrite if a file with this name already exists, then if true,
681       *   the file will be overwritten, and if false an exception will be thrown.
682       */
683      public FSDataOutputStream create(Path f, boolean overwrite)
684          throws IOException {
685        return create(f, overwrite, 
686                      getConf().getInt("io.file.buffer.size", 4096),
687                      getDefaultReplication(f),
688                      getDefaultBlockSize(f));
689      }
690    
691      /**
692       * Create an FSDataOutputStream at the indicated Path with write-progress
693       * reporting.
694       * Files are overwritten by default.
695       * @param f the file to create
696       * @param progress to report progress
697       */
698      public FSDataOutputStream create(Path f, Progressable progress) 
699          throws IOException {
700        return create(f, true, 
701                      getConf().getInt("io.file.buffer.size", 4096),
702                      getDefaultReplication(f),
703                      getDefaultBlockSize(f), progress);
704      }
705    
706      /**
707       * Create an FSDataOutputStream at the indicated Path.
708       * Files are overwritten by default.
709       * @param f the file to create
710       * @param replication the replication factor
711       */
712      public FSDataOutputStream create(Path f, short replication)
713          throws IOException {
714        return create(f, true, 
715                      getConf().getInt("io.file.buffer.size", 4096),
716                      replication,
717                      getDefaultBlockSize(f));
718      }
719    
720      /**
721       * Create an FSDataOutputStream at the indicated Path with write-progress
722       * reporting.
723       * Files are overwritten by default.
724       * @param f the file to create
725       * @param replication the replication factor
726       * @param progress to report progress
727       */
728      public FSDataOutputStream create(Path f, short replication, 
729          Progressable progress) throws IOException {
730        return create(f, true, 
731                      getConf().getInt("io.file.buffer.size", 4096),
732                      replication,
733                      getDefaultBlockSize(f), progress);
734      }
735    
736        
737      /**
738       * Create an FSDataOutputStream at the indicated Path.
739       * @param f the file name to create
740       * @param overwrite if a file with this name already exists, then if true,
741       *   the file will be overwritten, and if false an error will be thrown.
742       * @param bufferSize the size of the buffer to be used.
743       */
744      public FSDataOutputStream create(Path f, 
745                                       boolean overwrite,
746                                       int bufferSize
747                                       ) throws IOException {
748        return create(f, overwrite, bufferSize, 
749                      getDefaultReplication(f),
750                      getDefaultBlockSize(f));
751      }
752        
753      /**
754       * Create an FSDataOutputStream at the indicated Path with write-progress
755       * reporting.
756       * @param f the path of the file to open
757       * @param overwrite if a file with this name already exists, then if true,
758       *   the file will be overwritten, and if false an error will be thrown.
759       * @param bufferSize the size of the buffer to be used.
760       */
761      public FSDataOutputStream create(Path f, 
762                                       boolean overwrite,
763                                       int bufferSize,
764                                       Progressable progress
765                                       ) throws IOException {
766        return create(f, overwrite, bufferSize, 
767                      getDefaultReplication(f),
768                      getDefaultBlockSize(f), progress);
769      }
770        
771        
772      /**
773       * Create an FSDataOutputStream at the indicated Path.
774       * @param f the file name to open
775       * @param overwrite if a file with this name already exists, then if true,
776       *   the file will be overwritten, and if false an error will be thrown.
777       * @param bufferSize the size of the buffer to be used.
778       * @param replication required block replication for the file. 
779       */
780      public FSDataOutputStream create(Path f, 
781                                       boolean overwrite,
782                                       int bufferSize,
783                                       short replication,
784                                       long blockSize
785                                       ) throws IOException {
786        return create(f, overwrite, bufferSize, replication, blockSize, null);
787      }
788    
789      /**
790       * Create an FSDataOutputStream at the indicated Path with write-progress
791       * reporting.
792       * @param f the file name to open
793       * @param overwrite if a file with this name already exists, then if true,
794       *   the file will be overwritten, and if false an error will be thrown.
795       * @param bufferSize the size of the buffer to be used.
796       * @param replication required block replication for the file. 
797       */
798      public FSDataOutputStream create(Path f,
799                                                boolean overwrite,
800                                                int bufferSize,
801                                                short replication,
802                                                long blockSize,
803                                                Progressable progress
804                                                ) throws IOException {
805        return this.create(f, FsPermission.getDefault().applyUMask(
806            FsPermission.getUMask(getConf())), overwrite, bufferSize,
807            replication, blockSize, progress);
808      }
809    
  /**
   * Create an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * This is the abstract primitive that the other overwrite-style
   * <code>create</code> overloads in this class delegate to.
   * @param f the file name to open
   * @param permission the permission for the new file
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file.
   * @param blockSize the block size to use for the file
   * @param progress to report progress; the overloads in this class may
   *   pass null
   * @return an output stream for the newly created file
   * @throws IOException see specific implementation
   * @see #setPermission(Path, FsPermission)
   */
  public abstract FSDataOutputStream create(Path f,
      FsPermission permission,
      boolean overwrite,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException;
831      
832      /**
833       * Create an FSDataOutputStream at the indicated Path with write-progress
834       * reporting.
835       * @param f the file name to open
836       * @param permission
837       * @param flags {@link CreateFlag}s to use for this stream.
838       * @param bufferSize the size of the buffer to be used.
839       * @param replication required block replication for the file.
840       * @param blockSize
841       * @param progress
842       * @throws IOException
843       * @see #setPermission(Path, FsPermission)
844       */
845      public FSDataOutputStream create(Path f,
846          FsPermission permission,
847          EnumSet<CreateFlag> flags,
848          int bufferSize,
849          short replication,
850          long blockSize,
851          Progressable progress) throws IOException {
852        // only DFS support this
853        return create(f, permission, flags.contains(CreateFlag.OVERWRITE), bufferSize, replication, blockSize, progress);
854      }
855      
856      
857      /*.
858       * This create has been added to support the FileContext that processes
859       * the permission
860       * with umask before calling this method.
861       * This a temporary method added to support the transition from FileSystem
862       * to FileContext for user applications.
863       */
864      @Deprecated
865      protected FSDataOutputStream primitiveCreate(Path f,
866         FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
867         short replication, long blockSize, Progressable progress,
868         int bytesPerChecksum) throws IOException {
869    
870        boolean pathExists = exists(f);
871        CreateFlag.validate(f, pathExists, flag);
872        
873        // Default impl  assumes that permissions do not matter and 
874        // nor does the bytesPerChecksum  hence
875        // calling the regular create is good enough.
876        // FSs that implement permissions should override this.
877    
878        if (pathExists && flag.contains(CreateFlag.APPEND)) {
879          return append(f, bufferSize, progress);
880        }
881        
882        return this.create(f, absolutePermission,
883            flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
884            blockSize, progress);
885      }
886      
887      /**
888       * This version of the mkdirs method assumes that the permission is absolute.
889       * It has been added to support the FileContext that processes the permission
890       * with umask before calling this method.
891       * This a temporary method added to support the transition from FileSystem
892       * to FileContext for user applications.
893       */
894      @Deprecated
895      protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
896        throws IOException {
897        // Default impl is to assume that permissions do not matter and hence
898        // calling the regular mkdirs is good enough.
899        // FSs that implement permissions should override this.
900       return this.mkdirs(f, absolutePermission);
901      }
902    
903    
904      /**
905       * This version of the mkdirs method assumes that the permission is absolute.
906       * It has been added to support the FileContext that processes the permission
907       * with umask before calling this method.
908       * This a temporary method added to support the transition from FileSystem
909       * to FileContext for user applications.
910       */
911      @Deprecated
912      protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
913                        boolean createParent)
914        throws IOException {
915        
916        if (!createParent) { // parent must exist.
917          // since the this.mkdirs makes parent dirs automatically
918          // we must throw exception if parent does not exist.
919          final FileStatus stat = getFileStatus(f.getParent());
920          if (stat == null) {
921            throw new FileNotFoundException("Missing parent:" + f);
922          }
923          if (!stat.isDirectory()) {
924            throw new ParentNotDirectoryException("parent is not a dir");
925          }
926          // parent does exist - go ahead with mkdir of leaf
927        }
928        // Default impl is to assume that permissions do not matter and hence
929        // calling the regular mkdirs is good enough.
930        // FSs that implement permissions should override this.
931        if (!this.mkdirs(f, absolutePermission)) {
932          throw new IOException("mkdir of "+ f + " failed");
933        }
934      }
935    
936      /**
937       * Opens an FSDataOutputStream at the indicated Path with write-progress
938       * reporting. Same as create(), except fails if parent directory doesn't
939       * already exist.
940       * @param f the file name to open
941       * @param overwrite if a file with this name already exists, then if true,
942       * the file will be overwritten, and if false an error will be thrown.
943       * @param bufferSize the size of the buffer to be used.
944       * @param replication required block replication for the file.
945       * @param blockSize
946       * @param progress
947       * @throws IOException
948       * @see #setPermission(Path, FsPermission)
949       * @deprecated API only for 0.20-append
950       */
951      @Deprecated
952      public FSDataOutputStream createNonRecursive(Path f,
953          boolean overwrite,
954          int bufferSize, short replication, long blockSize,
955          Progressable progress) throws IOException {
956        return this.createNonRecursive(f, FsPermission.getDefault(),
957            overwrite, bufferSize, replication, blockSize, progress);
958      }
959    
960      /**
961       * Opens an FSDataOutputStream at the indicated Path with write-progress
962       * reporting. Same as create(), except fails if parent directory doesn't
963       * already exist.
964       * @param f the file name to open
965       * @param permission
966       * @param overwrite if a file with this name already exists, then if true,
967       * the file will be overwritten, and if false an error will be thrown.
968       * @param bufferSize the size of the buffer to be used.
969       * @param replication required block replication for the file.
970       * @param blockSize
971       * @param progress
972       * @throws IOException
973       * @see #setPermission(Path, FsPermission)
974       * @deprecated API only for 0.20-append
975       */
976       @Deprecated
977       public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
978           boolean overwrite, int bufferSize, short replication, long blockSize,
979           Progressable progress) throws IOException {
980         return createNonRecursive(f, permission,
981             overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
982                 : EnumSet.of(CreateFlag.CREATE), bufferSize,
983                 replication, blockSize, progress);
984       }
985    
986       /**
987        * Opens an FSDataOutputStream at the indicated Path with write-progress
988        * reporting. Same as create(), except fails if parent directory doesn't
989        * already exist.
990        * @param f the file name to open
991        * @param permission
992        * @param flags {@link CreateFlag}s to use for this stream.
993        * @param bufferSize the size of the buffer to be used.
994        * @param replication required block replication for the file.
995        * @param blockSize
996        * @param progress
997        * @throws IOException
998        * @see #setPermission(Path, FsPermission)
999        * @deprecated API only for 0.20-append
1000        */
1001        @Deprecated
1002        public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1003            EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1004            Progressable progress) throws IOException {
1005          throw new IOException("createNonRecursive unsupported for this filesystem "
1006              + this.getClass());
1007        }
1008    
1009      /**
1010       * Creates the given Path as a brand-new zero-length file.  If
1011       * create fails, or if it already existed, return false.
1012       *
1013       * @param f path to use for create
1014       */
1015      public boolean createNewFile(Path f) throws IOException {
1016        if (exists(f)) {
1017          return false;
1018        } else {
1019          create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
1020          return true;
1021        }
1022      }
1023    
1024      /**
1025       * Append to an existing file (optional operation).
1026       * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
1027       * @param f the existing file to be appended.
1028       * @throws IOException
1029       */
1030      public FSDataOutputStream append(Path f) throws IOException {
1031        return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
1032      }
1033      /**
1034       * Append to an existing file (optional operation).
1035       * Same as append(f, bufferSize, null).
1036       * @param f the existing file to be appended.
1037       * @param bufferSize the size of the buffer to be used.
1038       * @throws IOException
1039       */
1040      public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1041        return append(f, bufferSize, null);
1042      }
1043    
  /**
   * Append to an existing file (optional operation).
   * This is the abstract primitive that the other <code>append</code>
   * overloads in this class delegate to.
   * @param f the existing file to be appended.
   * @param bufferSize the size of the buffer to be used.
   * @param progress for reporting progress if it is not null.
   * @return an output stream for appending to the file
   * @throws IOException see specific implementation
   */
  public abstract FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException;
1053    
1054     /**
1055       * Get replication.
1056       * 
1057       * @deprecated Use getFileStatus() instead
1058       * @param src file name
1059       * @return file replication
1060       * @throws IOException
1061       */ 
1062      @Deprecated
1063      public short getReplication(Path src) throws IOException {
1064        return getFileStatus(src).getReplication();
1065      }
1066    
  /**
   * Set replication for an existing file.
   * <p>
   * This base implementation does nothing and always returns true; it does
   * not look at <code>src</code> or <code>replication</code>. The return
   * contract below describes what an overriding file system is expected to
   * report.
   * 
   * @param src file name
   * @param replication new replication
   * @throws IOException
   * @return true if successful;
   *         false if file does not exist or is a directory
   */
  public boolean setReplication(Path src, short replication)
    throws IOException {
    return true;
  }
1080    
  /**
   * Renames Path src to Path dst.  Can take place on local fs
   * or remote DFS.
   * The options-taking rename in this class calls this method to perform
   * the actual rename and treats a false return as a failure.
   * @param src path to be renamed
   * @param dst new path after rename
   * @throws IOException on failure
   * @return true if rename is successful
   */
  public abstract boolean rename(Path src, Path dst) throws IOException;
1090    
1091      /**
1092       * Renames Path src to Path dst
1093       * <ul>
1094       * <li
1095       * <li>Fails if src is a file and dst is a directory.
1096       * <li>Fails if src is a directory and dst is a file.
1097       * <li>Fails if the parent of dst does not exist or is a file.
1098       * </ul>
1099       * <p>
1100       * If OVERWRITE option is not passed as an argument, rename fails
1101       * if the dst already exists.
1102       * <p>
1103       * If OVERWRITE option is passed as an argument, rename overwrites
1104       * the dst if it is a file or an empty directory. Rename fails if dst is
1105       * a non-empty directory.
1106       * <p>
1107       * Note that atomicity of rename is dependent on the file system
1108       * implementation. Please refer to the file system documentation for
1109       * details. This default implementation is non atomic.
1110       * <p>
1111       * This method is deprecated since it is a temporary method added to 
1112       * support the transition from FileSystem to FileContext for user 
1113       * applications.
1114       * 
1115       * @param src path to be renamed
1116       * @param dst new path after rename
1117       * @throws IOException on failure
1118       */
1119      @Deprecated
1120      protected void rename(final Path src, final Path dst,
1121          final Rename... options) throws IOException {
1122        // Default implementation
1123        final FileStatus srcStatus = getFileStatus(src);
1124        if (srcStatus == null) {
1125          throw new FileNotFoundException("rename source " + src + " not found.");
1126        }
1127    
1128        boolean overwrite = false;
1129        if (null != options) {
1130          for (Rename option : options) {
1131            if (option == Rename.OVERWRITE) {
1132              overwrite = true;
1133            }
1134          }
1135        }
1136    
1137        FileStatus dstStatus;
1138        try {
1139          dstStatus = getFileStatus(dst);
1140        } catch (IOException e) {
1141          dstStatus = null;
1142        }
1143        if (dstStatus != null) {
1144          if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1145            throw new IOException("Source " + src + " Destination " + dst
1146                + " both should be either file or directory");
1147          }
1148          if (!overwrite) {
1149            throw new FileAlreadyExistsException("rename destination " + dst
1150                + " already exists.");
1151          }
1152          // Delete the destination that is a file or an empty directory
1153          if (dstStatus.isDirectory()) {
1154            FileStatus[] list = listStatus(dst);
1155            if (list != null && list.length != 0) {
1156              throw new IOException(
1157                  "rename cannot overwrite non empty destination directory " + dst);
1158            }
1159          }
1160          delete(dst, false);
1161        } else {
1162          final Path parent = dst.getParent();
1163          final FileStatus parentStatus = getFileStatus(parent);
1164          if (parentStatus == null) {
1165            throw new FileNotFoundException("rename destination parent " + parent
1166                + " not found.");
1167          }
1168          if (!parentStatus.isDirectory()) {
1169            throw new ParentNotDirectoryException("rename destination parent " + parent
1170                + " is a file.");
1171          }
1172        }
1173        if (!rename(src, dst)) {
1174          throw new IOException("rename from " + src + " to " + dst + " failed.");
1175        }
1176      }
1177      
1178      /**
1179       * Delete a file 
1180       * @deprecated Use {@link #delete(Path, boolean)} instead.
1181       */
1182      @Deprecated
1183      public boolean delete(Path f) throws IOException {
1184        return delete(f, true);
1185      }
1186      
  /** Delete a file or directory.
   *
   * @param f the path to delete.
   * @param recursive if path is a directory and set to 
   * true, the directory is deleted else throws an exception. In
   * case of a file the recursive can be set to either true or false. 
   * @return  true if delete is successful else false. 
   * @throws IOException see specific implementation
   */
  public abstract boolean delete(Path f, boolean recursive) throws IOException;
1197    
1198      /**
1199       * Mark a path to be deleted when FileSystem is closed.
1200       * When the JVM shuts down,
1201       * all FileSystem objects will be closed automatically.
1202       * Then,
1203       * the marked path will be deleted as a result of closing the FileSystem.
1204       *
1205       * The path has to exist in the file system.
1206       * 
1207       * @param f the path to delete.
1208       * @return  true if deleteOnExit is successful, otherwise false.
1209       * @throws IOException
1210       */
1211      public boolean deleteOnExit(Path f) throws IOException {
1212        if (!exists(f)) {
1213          return false;
1214        }
1215        synchronized (deleteOnExit) {
1216          deleteOnExit.add(f);
1217        }
1218        return true;
1219      }
1220      
1221      /**
1222       * Cancel the deletion of the path when the FileSystem is closed
1223       * @param f the path to cancel deletion
1224       */
1225      public boolean cancelDeleteOnExit(Path f) {
1226        synchronized (deleteOnExit) {
1227          return deleteOnExit.remove(f);
1228        }
1229      }
1230    
1231      /**
1232       * Delete all files that were marked as delete-on-exit. This recursively
1233       * deletes all files in the specified paths.
1234       */
1235      protected void processDeleteOnExit() {
1236        synchronized (deleteOnExit) {
1237          for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1238            Path path = iter.next();
1239            try {
1240              if (exists(path)) {
1241                delete(path, true);
1242              }
1243            }
1244            catch (IOException e) {
1245              LOG.info("Ignoring failure to deleteOnExit for path " + path);
1246            }
1247            iter.remove();
1248          }
1249        }
1250      }
1251      
1252      /** Check if exists.
1253       * @param f source file
1254       */
1255      public boolean exists(Path f) throws IOException {
1256        try {
1257          return getFileStatus(f) != null;
1258        } catch (FileNotFoundException e) {
1259          return false;
1260        }
1261      }
1262    
1263      /** True iff the named path is a directory.
1264       * Note: Avoid using this method. Instead reuse the FileStatus 
1265       * returned by getFileStatus() or listStatus() methods.
1266       * @param f path to check
1267       */
1268      public boolean isDirectory(Path f) throws IOException {
1269        try {
1270          return getFileStatus(f).isDirectory();
1271        } catch (FileNotFoundException e) {
1272          return false;               // f does not exist
1273        }
1274      }
1275    
1276      /** True iff the named path is a regular file.
1277       * Note: Avoid using this method. Instead reuse the FileStatus 
1278       * returned by getFileStatus() or listStatus() methods.
1279       * @param f path to check
1280       */
1281      public boolean isFile(Path f) throws IOException {
1282        try {
1283          return getFileStatus(f).isFile();
1284        } catch (FileNotFoundException e) {
1285          return false;               // f does not exist
1286        }
1287      }
1288      
1289      /** The number of bytes in a file. */
1290      /** @deprecated Use getFileStatus() instead */
1291      @Deprecated
1292      public long getLength(Path f) throws IOException {
1293        return getFileStatus(f).getLen();
1294      }
1295        
1296      /** Return the {@link ContentSummary} of a given {@link Path}.
1297      * @param f path to use
1298      */
1299      public ContentSummary getContentSummary(Path f) throws IOException {
1300        FileStatus status = getFileStatus(f);
1301        if (status.isFile()) {
1302          // f is a file
1303          return new ContentSummary(status.getLen(), 1, 0);
1304        }
1305        // f is a directory
1306        long[] summary = {0, 0, 1};
1307        for(FileStatus s : listStatus(f)) {
1308          ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1309                                         new ContentSummary(s.getLen(), 1, 0);
1310          summary[0] += c.getLength();
1311          summary[1] += c.getFileCount();
1312          summary[2] += c.getDirectoryCount();
1313        }
1314        return new ContentSummary(summary[0], summary[1], summary[2]);
1315      }
1316    
1317      final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1318          public boolean accept(Path file) {
1319            return true;
1320          }     
1321        };
1322        
  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory.
   * 
   * @param f given path
   * @return the statuses of the files/directories in the given path
   * @throws FileNotFoundException when the path does not exist;
   *         IOException see specific implementation
   */
  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
                                                         IOException;
1334        
1335      /*
1336       * Filter files/directories in the given path using the user-supplied path
1337       * filter. Results are added to the given array <code>results</code>.
1338       */
1339      private void listStatus(ArrayList<FileStatus> results, Path f,
1340          PathFilter filter) throws FileNotFoundException, IOException {
1341        FileStatus listing[] = listStatus(f);
1342        if (listing == null) {
1343          throw new IOException("Error accessing " + f);
1344        }
1345    
1346        for (int i = 0; i < listing.length; i++) {
1347          if (filter.accept(listing[i].getPath())) {
1348            results.add(listing[i]);
1349          }
1350        }
1351      }
1352    
1353      /**
1354       * @return an iterator over the corrupt files under the given path
1355       * (may contain duplicates if a file has more than one corrupt block)
1356       * @throws IOException
1357       */
1358      public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1359        throws IOException {
1360        throw new UnsupportedOperationException(getClass().getCanonicalName() +
1361                                                " does not support" +
1362                                                " listCorruptFileBlocks");
1363      }
1364    
1365      /**
1366       * Filter files/directories in the given path using the user-supplied path
1367       * filter.
1368       * 
1369       * @param f
1370       *          a path name
1371       * @param filter
1372       *          the user-supplied path filter
1373       * @return an array of FileStatus objects for the files under the given path
1374       *         after applying the filter
1375       * @throws FileNotFoundException when the path does not exist;
1376       *         IOException see specific implementation   
1377       */
1378      public FileStatus[] listStatus(Path f, PathFilter filter) 
1379                                       throws FileNotFoundException, IOException {
1380        ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1381        listStatus(results, f, filter);
1382        return results.toArray(new FileStatus[results.size()]);
1383      }
1384    
1385      /**
1386       * Filter files/directories in the given list of paths using default
1387       * path filter.
1388       * 
1389       * @param files
1390       *          a list of paths
1391       * @return a list of statuses for the files under the given paths after
1392       *         applying the filter default Path filter
1393       * @throws FileNotFoundException when the path does not exist;
1394       *         IOException see specific implementation
1395       */
1396      public FileStatus[] listStatus(Path[] files)
1397          throws FileNotFoundException, IOException {
1398        return listStatus(files, DEFAULT_FILTER);
1399      }
1400    
1401      /**
1402       * Filter files/directories in the given list of paths using user-supplied
1403       * path filter.
1404       * 
1405       * @param files
1406       *          a list of paths
1407       * @param filter
1408       *          the user-supplied path filter
1409       * @return a list of statuses for the files under the given paths after
1410       *         applying the filter
1411       * @throws FileNotFoundException when the path does not exist;
1412       *         IOException see specific implementation
1413       */
1414      public FileStatus[] listStatus(Path[] files, PathFilter filter)
1415          throws FileNotFoundException, IOException {
1416        ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1417        for (int i = 0; i < files.length; i++) {
1418          listStatus(results, files[i], filter);
1419        }
1420        return results.toArray(new FileStatus[results.size()]);
1421      }
1422    
1423      /**
1424       * <p>Return all the files that match filePattern and are not checksum
1425       * files. Results are sorted by their names.
1426       * 
1427       * <p>
1428       * A filename pattern is composed of <i>regular</i> characters and
1429       * <i>special pattern matching</i> characters, which are:
1430       *
1431       * <dl>
1432       *  <dd>
1433       *   <dl>
1434       *    <p>
1435       *    <dt> <tt> ? </tt>
1436       *    <dd> Matches any single character.
1437       *
1438       *    <p>
1439       *    <dt> <tt> * </tt>
1440       *    <dd> Matches zero or more characters.
1441       *
1442       *    <p>
1443       *    <dt> <tt> [<i>abc</i>] </tt>
1444       *    <dd> Matches a single character from character set
1445       *     <tt>{<i>a,b,c</i>}</tt>.
1446       *
1447       *    <p>
1448       *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1449       *    <dd> Matches a single character from the character range
1450       *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1451       *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1452       *
1453       *    <p>
1454       *    <dt> <tt> [^<i>a</i>] </tt>
1455       *    <dd> Matches a single character that is not from character set or range
1456       *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1457       *     immediately to the right of the opening bracket.
1458       *
1459       *    <p>
1460       *    <dt> <tt> \<i>c</i> </tt>
1461       *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1462       *
1463       *    <p>
1464       *    <dt> <tt> {ab,cd} </tt>
1465       *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1466       *    
1467       *    <p>
1468       *    <dt> <tt> {ab,c{de,fh}} </tt>
1469       *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1470       *
1471       *   </dl>
1472       *  </dd>
1473       * </dl>
1474       *
   * @param pathPattern a regular expression specifying a path pattern
   *
1477       * @return an array of paths that match the path pattern
1478       * @throws IOException
1479       */
1480      public FileStatus[] globStatus(Path pathPattern) throws IOException {
1481        return globStatus(pathPattern, DEFAULT_FILTER);
1482      }
1483      
1484      /**
1485       * Return an array of FileStatus objects whose path names match pathPattern
1486       * and is accepted by the user-supplied path filter. Results are sorted by
1487       * their path names.
1488       * Return null if pathPattern has no glob and the path does not exist.
1489       * Return an empty array if pathPattern has a glob and no path matches it. 
1490       * 
1491       * @param pathPattern
1492       *          a regular expression specifying the path pattern
1493       * @param filter
1494       *          a user-supplied path filter
1495       * @return an array of FileStatus objects
1496       * @throws IOException if any I/O error occurs when fetching file status
1497       */
1498      public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1499          throws IOException {
1500        String filename = pathPattern.toUri().getPath();
1501        List<String> filePatterns = GlobExpander.expand(filename);
1502        if (filePatterns.size() == 1) {
1503          return globStatusInternal(pathPattern, filter);
1504        } else {
1505          List<FileStatus> results = new ArrayList<FileStatus>();
1506          for (String filePattern : filePatterns) {
1507            FileStatus[] files = globStatusInternal(new Path(filePattern), filter);
1508            for (FileStatus file : files) {
1509              results.add(file);
1510            }
1511          }
1512          return results.toArray(new FileStatus[results.size()]);
1513        }
1514      }
1515    
1516      private FileStatus[] globStatusInternal(Path pathPattern, PathFilter filter)
1517          throws IOException {
1518        Path[] parents = new Path[1];
1519        int level = 0;
1520        String filename = pathPattern.toUri().getPath();
1521        
1522        // path has only zero component
1523        if ("".equals(filename) || Path.SEPARATOR.equals(filename)) {
1524          return getFileStatus(new Path[]{pathPattern});
1525        }
1526    
1527        // path has at least one component
1528        String[] components = filename.split(Path.SEPARATOR);
1529        // get the first component
1530        if (pathPattern.isAbsolute()) {
1531          parents[0] = new Path(Path.SEPARATOR);
1532          level = 1;
1533        } else {
1534          parents[0] = new Path(Path.CUR_DIR);
1535        }
1536    
1537        // glob the paths that match the parent path, i.e., [0, components.length-1]
1538        boolean[] hasGlob = new boolean[]{false};
1539        Path[] parentPaths = globPathsLevel(parents, components, level, hasGlob);
1540        FileStatus[] results;
1541        if (parentPaths == null || parentPaths.length == 0) {
1542          results = null;
1543        } else {
1544          // Now work on the last component of the path
1545          GlobFilter fp = new GlobFilter(components[components.length - 1], filter);
1546          if (fp.hasPattern()) { // last component has a pattern
1547            // list parent directories and then glob the results
1548            try {
1549              results = listStatus(parentPaths, fp);
1550            } catch (FileNotFoundException e) {
1551              results = null;
1552            }
1553            hasGlob[0] = true;
1554          } else { // last component does not have a pattern
1555            // remove the quoting of metachars in a non-regexp expansion
1556            String name = unquotePathComponent(components[components.length - 1]);
1557            // get all the path names
1558            ArrayList<Path> filteredPaths = new ArrayList<Path>(parentPaths.length);
1559            for (int i = 0; i < parentPaths.length; i++) {
1560              parentPaths[i] = new Path(parentPaths[i], name);
1561              if (fp.accept(parentPaths[i])) {
1562                filteredPaths.add(parentPaths[i]);
1563              }
1564            }
1565            // get all their statuses
1566            results = getFileStatus(
1567                filteredPaths.toArray(new Path[filteredPaths.size()]));
1568          }
1569        }
1570    
1571        // Decide if the pathPattern contains a glob or not
1572        if (results == null) {
1573          if (hasGlob[0]) {
1574            results = new FileStatus[0];
1575          }
1576        } else {
1577          if (results.length == 0 ) {
1578            if (!hasGlob[0]) {
1579              results = null;
1580            }
1581          } else {
1582            Arrays.sort(results);
1583          }
1584        }
1585        return results;
1586      }
1587    
1588      /*
1589       * For a path of N components, return a list of paths that match the
1590       * components [<code>level</code>, <code>N-1</code>].
1591       */
1592      private Path[] globPathsLevel(Path[] parents, String[] filePattern,
1593          int level, boolean[] hasGlob) throws IOException {
1594        if (level == filePattern.length - 1)
1595          return parents;
1596        if (parents == null || parents.length == 0) {
1597          return null;
1598        }
1599        GlobFilter fp = new GlobFilter(filePattern[level]);
1600        if (fp.hasPattern()) {
1601          try {
1602            parents = FileUtil.stat2Paths(listStatus(parents, fp));
1603          } catch (FileNotFoundException e) {
1604            parents = null;
1605          }
1606          hasGlob[0] = true;
1607        } else { // the component does not have a pattern
1608          // remove the quoting of metachars in a non-regexp expansion
1609          String name = unquotePathComponent(filePattern[level]);
1610          for (int i = 0; i < parents.length; i++) {
1611            parents[i] = new Path(parents[i], name);
1612          }
1613        }
1614        return globPathsLevel(parents, filePattern, level + 1, hasGlob);
1615      }
1616    
1617      /**
1618       * The glob filter builds a regexp per path component.  If the component
1619       * does not contain a shell metachar, then it falls back to appending the
1620       * raw string to the list of built up paths.  This raw path needs to have
1621       * the quoting removed.  Ie. convert all occurances of "\X" to "X"
1622       * @param name of the path component
1623       * @return the unquoted path component
1624       */
1625      private String unquotePathComponent(String name) {
1626        return name.replaceAll("\\\\(.)", "$1");
1627      }
1628      
1629      /**
1630       * List the statuses of the files/directories in the given path if the path is
1631       * a directory. 
1632       * Return the file's status and block locations If the path is a file.
1633       * 
1634       * If a returned status is a file, it contains the file's block locations.
1635       * 
1636       * @param f is the path
1637       *
1638       * @return an iterator that traverses statuses of the files/directories 
1639       *         in the given path
1640       *
1641       * @throws FileNotFoundException If <code>f</code> does not exist
1642       * @throws IOException If an I/O error occurred
1643       */
1644      public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1645      throws FileNotFoundException, IOException {
1646        return listLocatedStatus(f, DEFAULT_FILTER);
1647      }
1648    
1649      /**
1650       * Listing a directory
1651       * The returned results include its block location if it is a file
1652       * The results are filtered by the given path filter
1653       * @param f a path
1654       * @param filter a path filter
1655       * @return an iterator that traverses statuses of the files/directories 
1656       *         in the given path
1657       * @throws FileNotFoundException if <code>f</code> does not exist
1658       * @throws IOException if any I/O error occurred
1659       */
1660      protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1661          final PathFilter filter)
1662      throws FileNotFoundException, IOException {
1663        return new RemoteIterator<LocatedFileStatus>() {
1664          private final FileStatus[] stats = listStatus(f, filter);
1665          private int i = 0;
1666    
1667          @Override
1668          public boolean hasNext() {
1669            return i<stats.length;
1670          }
1671    
1672          @Override
1673          public LocatedFileStatus next() throws IOException {
1674            if (!hasNext()) {
1675              throw new NoSuchElementException("No more entry in " + f);
1676            }
1677            FileStatus result = stats[i++];
1678            BlockLocation[] locs = result.isFile() ?
1679                getFileBlockLocations(result.getPath(), 0, result.getLen()) :
1680                null;
1681            return new LocatedFileStatus(result, locs);
1682          }
1683        };
1684      }
1685    
1686      /**
1687       * List the statuses and block locations of the files in the given path.
1688       * 
1689       * If the path is a directory, 
1690       *   if recursive is false, returns files in the directory;
1691       *   if recursive is true, return files in the subtree rooted at the path.
1692       * If the path is a file, return the file's status and block locations.
1693       * 
1694       * @param f is the path
1695       * @param recursive if the subdirectories need to be traversed recursively
1696       *
1697       * @return an iterator that traverses statuses of the files
1698       *
1699       * @throws FileNotFoundException when the path does not exist;
1700       *         IOException see specific implementation
1701       */
1702      public RemoteIterator<LocatedFileStatus> listFiles(
1703          final Path f, final boolean recursive)
1704      throws FileNotFoundException, IOException {
1705        return new RemoteIterator<LocatedFileStatus>() {
1706          private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1707            new Stack<RemoteIterator<LocatedFileStatus>>();
1708          private RemoteIterator<LocatedFileStatus> curItor =
1709            listLocatedStatus(f);
1710          private LocatedFileStatus curFile;
1711         
1712          @Override
1713          public boolean hasNext() throws IOException {
1714            while (curFile == null) {
1715              if (curItor.hasNext()) {
1716                handleFileStat(curItor.next());
1717              } else if (!itors.empty()) {
1718                curItor = itors.pop();
1719              } else {
1720                return false;
1721              }
1722            }
1723            return true;
1724          }
1725    
1726          /**
1727           * Process the input stat.
1728           * If it is a file, return the file stat.
1729           * If it is a directory, traverse the directory if recursive is true;
1730           * ignore it if recursive is false.
1731           * @param stat input status
1732           * @throws IOException if any IO error occurs
1733           */
1734          private void handleFileStat(LocatedFileStatus stat) throws IOException {
1735            if (stat.isFile()) { // file
1736              curFile = stat;
1737            } else if (recursive) { // directory
1738              itors.push(curItor);
1739              curItor = listLocatedStatus(stat.getPath());
1740            }
1741          }
1742    
1743          @Override
1744          public LocatedFileStatus next() throws IOException {
1745            if (hasNext()) {
1746              LocatedFileStatus result = curFile;
1747              curFile = null;
1748              return result;
1749            } 
1750            throw new java.util.NoSuchElementException("No more entry in " + f);
1751          }
1752        };
1753      }
1754      
1755      /** Return the current user's home directory in this filesystem.
1756       * The default implementation returns "/user/$USER/".
1757       */
1758      public Path getHomeDirectory() {
1759        return this.makeQualified(
1760            new Path("/user/"+System.getProperty("user.name")));
1761      }
1762    
1763    
  /**
   * Set the current working directory for the given file system. All relative
   * paths will be resolved relative to it.
   * 
   * @param new_dir the directory to use as the new working directory
   */
  public abstract void setWorkingDirectory(Path new_dir);
1771        
  /**
   * Get the current working directory for the given file system.
   * Implementations must supply this; there is no default.
   * @return the directory pathname
   */
  public abstract Path getWorkingDirectory();
1777      
1778      
  /**
   * Note: with the new FilesContext class, getWorkingDirectory()
   * will be removed. 
   * The working directory is implemented in FilesContext.
   * 
   * Some file systems like LocalFileSystem have an initial workingDir
   * that we use as the starting workingDir. For other file systems
   * like HDFS there is no built in notion of an initial workingDir.
   * 
   * This base implementation always returns null.
   * 
   * @return if there is built in notion of workingDir then it
   * is returned; else a null is returned.
   */
  protected Path getInitialWorkingDirectory() {
    return null;
  }
1794    
1795      /**
1796       * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1797       */
1798      public boolean mkdirs(Path f) throws IOException {
1799        return mkdirs(f, FsPermission.getDefault());
1800      }
1801    
  /**
   * Make the given file and all non-existent parents into
   * directories. Has the semantics of Unix 'mkdir -p'.
   * Existence of the directory hierarchy is not an error.
   * @param f path to create
   * @param permission to apply to f
   * @return a boolean result defined by the implementation
   *         (NOTE(review): presumably true on success; confirm against
   *         the concrete file systems)
   * @throws IOException declared for implementations to report I/O errors
   */
  public abstract boolean mkdirs(Path f, FsPermission permission
      ) throws IOException;
1811    
1812      /**
1813       * The src file is on the local disk.  Add it to FS at
1814       * the given dst name and the source is kept intact afterwards
1815       * @param src path
1816       * @param dst path
1817       */
1818      public void copyFromLocalFile(Path src, Path dst)
1819        throws IOException {
1820        copyFromLocalFile(false, src, dst);
1821      }
1822    
1823      /**
1824       * The src files is on the local disk.  Add it to FS at
1825       * the given dst name, removing the source afterwards.
1826       * @param srcs path
1827       * @param dst path
1828       */
1829      public void moveFromLocalFile(Path[] srcs, Path dst)
1830        throws IOException {
1831        copyFromLocalFile(true, true, srcs, dst);
1832      }
1833    
1834      /**
1835       * The src file is on the local disk.  Add it to FS at
1836       * the given dst name, removing the source afterwards.
1837       * @param src path
1838       * @param dst path
1839       */
1840      public void moveFromLocalFile(Path src, Path dst)
1841        throws IOException {
1842        copyFromLocalFile(true, src, dst);
1843      }
1844    
1845      /**
1846       * The src file is on the local disk.  Add it to FS at
1847       * the given dst name.
1848       * delSrc indicates if the source should be removed
1849       * @param delSrc whether to delete the src
1850       * @param src path
1851       * @param dst path
1852       */
1853      public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1854        throws IOException {
1855        copyFromLocalFile(delSrc, true, src, dst);
1856      }
1857      
1858      /**
1859       * The src files are on the local disk.  Add it to FS at
1860       * the given dst name.
1861       * delSrc indicates if the source should be removed
1862       * @param delSrc whether to delete the src
1863       * @param overwrite whether to overwrite an existing file
1864       * @param srcs array of paths which are source
1865       * @param dst path
1866       */
1867      public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1868                                    Path[] srcs, Path dst)
1869        throws IOException {
1870        Configuration conf = getConf();
1871        FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1872      }
1873      
1874      /**
1875       * The src file is on the local disk.  Add it to FS at
1876       * the given dst name.
1877       * delSrc indicates if the source should be removed
1878       * @param delSrc whether to delete the src
1879       * @param overwrite whether to overwrite an existing file
1880       * @param src path
1881       * @param dst path
1882       */
1883      public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1884                                    Path src, Path dst)
1885        throws IOException {
1886        Configuration conf = getConf();
1887        FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
1888      }
1889        
1890      /**
1891       * The src file is under FS, and the dst is on the local disk.
1892       * Copy it from FS control to the local dst name.
1893       * @param src path
1894       * @param dst path
1895       */
1896      public void copyToLocalFile(Path src, Path dst) throws IOException {
1897        copyToLocalFile(false, src, dst);
1898      }
1899        
1900      /**
1901       * The src file is under FS, and the dst is on the local disk.
1902       * Copy it from FS control to the local dst name.
1903       * Remove the source afterwards
1904       * @param src path
1905       * @param dst path
1906       */
1907      public void moveToLocalFile(Path src, Path dst) throws IOException {
1908        copyToLocalFile(true, src, dst);
1909      }
1910    
1911      /**
1912       * The src file is under FS, and the dst is on the local disk.
1913       * Copy it from FS control to the local dst name.
1914       * delSrc indicates if the src will be removed or not.
1915       * @param delSrc whether to delete the src
1916       * @param src path
1917       * @param dst path
1918       */   
1919      public void copyToLocalFile(boolean delSrc, Path src, Path dst)
1920        throws IOException {
1921        copyToLocalFile(delSrc, src, dst, false);
1922      }
1923      
1924        /**
1925       * The src file is under FS, and the dst is on the local disk. Copy it from FS
1926       * control to the local dst name. delSrc indicates if the src will be removed
1927       * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
1928       * as local file system or not. RawLocalFileSystem is non crc file system.So,
1929       * It will not create any crc files at local.
1930       * 
1931       * @param delSrc
1932       *          whether to delete the src
1933       * @param src
1934       *          path
1935       * @param dst
1936       *          path
1937       * @param useRawLocalFileSystem
1938       *          whether to use RawLocalFileSystem as local file system or not.
1939       * 
1940       * @throws IOException
1941       *           - if any IO error
1942       */
1943      public void copyToLocalFile(boolean delSrc, Path src, Path dst,
1944          boolean useRawLocalFileSystem) throws IOException {
1945        Configuration conf = getConf();
1946        FileSystem local = null;
1947        if (useRawLocalFileSystem) {
1948          local = getLocal(conf).getRawFileSystem();
1949        } else {
1950          local = getLocal(conf);
1951        }
1952        FileUtil.copy(this, src, local, dst, delSrc, conf);
1953      }
1954    
  /**
   * Returns a local File that the user can write output to.  The caller
   * provides both the eventual FS target name and the local working
   * file.  If the FS is local, we write directly into the target.  If
   * the FS is remote, we write into the tmp local area.
   * This base implementation returns tmpLocalFile unchanged and does not
   * use fsOutputFile.
   * @param fsOutputFile path of output file
   * @param tmpLocalFile path of local tmp file
   * @return the path the caller should write to
   */
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    return tmpLocalFile;
  }
1967    
  /**
   * Called when we're all done writing to the target.  A local FS will
   * do nothing, because we've written to exactly the right place.  A remote
   * FS will copy the contents of tmpLocalFile to the correct target at
   * fsOutputFile.
   * This base implementation moves tmpLocalFile to fsOutputFile with
   * {@link #moveFromLocalFile(Path, Path)}.
   * @param fsOutputFile path of output file
   * @param tmpLocalFile path to local tmp file
   */
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    // tmpLocalFile is the source of the move, fsOutputFile the destination
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
  }
1980    
  /**
   * No more filesystem operations are needed.  Will
   * release any held locks.
   * This base implementation processes the delete-on-exit files and then
   * removes this instance from the cache. If the delete-on-exit processing
   * throws, the cache removal is not reached.
   */
  public void close() throws IOException {
    // delete all files that were marked as delete-on-exit.
    processDeleteOnExit();
    // drop the cache entry, but only if it still maps to this instance
    CACHE.remove(this.key, this);
  }
1990    
1991      /** Return the total size of all files in the filesystem.*/
1992      public long getUsed() throws IOException{
1993        long used = 0;
1994        FileStatus[] files = listStatus(new Path("/"));
1995        for(FileStatus file:files){
1996          used += file.getLen();
1997        }
1998        return used;
1999      }
2000      
2001      /**
2002       * Get the block size for a particular file.
2003       * @param f the filename
2004       * @return the number of bytes in a block
2005       */
2006      /** @deprecated Use getFileStatus() instead */
2007      @Deprecated
2008      public long getBlockSize(Path f) throws IOException {
2009        return getFileStatus(f).getBlockSize();
2010      }
2011    
2012      /**
2013       * Return the number of bytes that large input files should be optimally
2014       * be split into to minimize i/o time.
2015       * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2016       */
2017      @Deprecated
2018      public long getDefaultBlockSize() {
2019        // default to 32MB: large enough to minimize the impact of seeks
2020        return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2021      }
2022        
2023      /** Return the number of bytes that large input files should be optimally
2024       * be split into to minimize i/o time.  The given path will be used to
2025       * locate the actual filesystem.  The full path does not have to exist.
2026       * @param f path of file
2027       * @return the default block size for the path's filesystem
2028       */
2029      public long getDefaultBlockSize(Path f) {
2030        return getDefaultBlockSize();
2031      }
2032    
  /**
   * Get the default replication.
   * This base implementation always returns 1.
   * @return the default replication
   * @deprecated use {@link #getDefaultReplication(Path)} instead
   */
  @Deprecated
  public short getDefaultReplication() { return 1; }
2039    
2040      /**
2041       * Get the default replication for a path.   The given path will be used to
2042       * locate the actual filesystem.  The full path does not have to exist.
2043       * @param path of the file
2044       * @return default replication for the path's filesystem 
2045       */
2046      public short getDefaultReplication(Path path) {
2047        return getDefaultReplication();
2048      }
2049      
  /**
   * Return a file status object that represents the path.
   * Implementations must supply this; there is no default.
   * @param f The path we want information from
   * @return a FileStatus object
   * @throws FileNotFoundException when the path does not exist;
   *         IOException see specific implementation
   */
  public abstract FileStatus getFileStatus(Path f) throws IOException;
2058    
  /**
   * Get the checksum of a file.
   * This base implementation does not use the path and returns null.
   *
   * @param f The file path
   * @return The file checksum.  The default return value is null,
   *  which indicates that no checksum algorithm is implemented
   *  in the corresponding FileSystem.
   */
  public FileChecksum getFileChecksum(Path f) throws IOException {
    return null;
  }
2070      
  /**
   * Set the verify checksum flag. This is only applicable if the 
   * corresponding FileSystem supports checksum. By default doesn't do anything.
   * @param verifyChecksum the new value of the flag; ignored by this base
   *        implementation
   */
  public void setVerifyChecksum(boolean verifyChecksum) {
    //doesn't do anything
  }
2079    
  /**
   * Set the write checksum flag. This is only applicable if the 
   * corresponding FileSystem supports checksum. By default doesn't do anything.
   * @param writeChecksum the new value of the flag; ignored by this base
   *        implementation
   */
  public void setWriteChecksum(boolean writeChecksum) {
    //doesn't do anything
  }
2088    
2089      /**
2090       * Return a list of file status objects that corresponds to the list of paths
2091       * excluding those non-existent paths.
2092       * 
2093       * @param paths
2094       *          the list of paths we want information from
2095       * @return a list of FileStatus objects
2096       * @throws IOException
2097       *           see specific implementation
2098       */
2099      private FileStatus[] getFileStatus(Path[] paths) throws IOException {
2100        if (paths == null) {
2101          return null;
2102        }
2103        ArrayList<FileStatus> results = new ArrayList<FileStatus>(paths.length);
2104        for (int i = 0; i < paths.length; i++) {
2105          try {
2106            results.add(getFileStatus(paths[i]));
2107          } catch (FileNotFoundException e) { // do nothing
2108          }
2109        }
2110        return results.toArray(new FileStatus[results.size()]);
2111      }
2112      
2113      /**
2114       * Returns a status object describing the use and capacity of the
2115       * file system. If the file system has multiple partitions, the
2116       * use and capacity of the root partition is reflected.
2117       * 
2118       * @return a FsStatus object
2119       * @throws IOException
2120       *           see specific implementation
2121       */
2122      public FsStatus getStatus() throws IOException {
2123        return getStatus(null);
2124      }
2125    
  /**
   * Returns a status object describing the use and capacity of the
   * file system. If the file system has multiple partitions, the
   * use and capacity of the partition pointed to by the specified
   * path is reflected.
   * This base implementation does not use the path and returns a fixed
   * FsStatus built from (Long.MAX_VALUE, 0, Long.MAX_VALUE).
   * @param p Path for which status should be obtained. null means
   * the default partition. 
   * @return a FsStatus object
   * @throws IOException
   *           see specific implementation
   */
  public FsStatus getStatus(Path p) throws IOException {
    // NOTE(review): arguments are presumably (capacity, used, remaining);
    // confirm against the FsStatus constructor
    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
  }
2140    
  /**
   * Set permission of a path.
   * This base implementation does nothing.
   * @param p the path
   * @param permission the permission to set
   */
  public void setPermission(Path p, FsPermission permission
      ) throws IOException {
  }
2149    
  /**
   * Set owner of a path (i.e. a file or a directory).
   * The parameters username and groupname cannot both be null.
   * This base implementation does nothing and does not check its arguments.
   * @param p The path
   * @param username If it is null, the original username remains unchanged.
   * @param groupname If it is null, the original groupname remains unchanged.
   */
  public void setOwner(Path p, String username, String groupname
      ) throws IOException {
  }
2160    
  /**
   * Set the modification time and the access time of a file.
   * This base implementation does nothing.
   * @param p The path
   * @param mtime Set the modification time of this file.
   *              The number of milliseconds since Jan 1, 1970. 
   *              A value of -1 means that this call should not set modification time.
   * @param atime Set the access time of this file.
   *              The number of milliseconds since Jan 1, 1970. 
   *              A value of -1 means that this call should not set access time.
   */
  public void setTimes(Path p, long mtime, long atime
      ) throws IOException {
  }
2174    
  // Set to true once loadFileSystems() has filled SERVICE_FILE_SYSTEMS.
  // making it volatile to be able to do a double checked locking
  private volatile static boolean FILE_SYSTEMS_LOADED = false;

  // Maps a scheme, as reported by FileSystem.getScheme(), to the
  // FileSystem class discovered for it through ServiceLoader.
  private static final Map<String, Class<? extends FileSystem>>
    SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2180    
2181      private static void loadFileSystems() {
2182        synchronized (FileSystem.class) {
2183          if (!FILE_SYSTEMS_LOADED) {
2184            ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2185            for (FileSystem fs : serviceLoader) {
2186              SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2187            }
2188            FILE_SYSTEMS_LOADED = true;
2189          }
2190        }
2191      }
2192    
2193      public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2194          Configuration conf) throws IOException {
2195        if (!FILE_SYSTEMS_LOADED) {
2196          loadFileSystems();
2197        }
2198        Class<? extends FileSystem> clazz = null;
2199        if (conf != null) {
2200          clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2201        }
2202        if (clazz == null) {
2203          clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2204        }
2205        if (clazz == null) {
2206          throw new IOException("No FileSystem for scheme: " + scheme);
2207        }
2208        return clazz;
2209      }
2210    
2211      private static FileSystem createFileSystem(URI uri, Configuration conf
2212          ) throws IOException {
2213        Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2214        if (clazz == null) {
2215          throw new IOException("No FileSystem for scheme: " + uri.getScheme());
2216        }
2217        FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2218        fs.initialize(uri, conf);
2219        return fs;
2220      }
2221    
2222      /** Caching FileSystem objects */
2223      static class Cache {
2224        private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2225    
2226        private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2227        private final Set<Key> toAutoClose = new HashSet<Key>();
2228    
2229        /** A variable that makes all objects in the cache unique */
2230        private static AtomicLong unique = new AtomicLong(1);
2231    
2232        FileSystem get(URI uri, Configuration conf) throws IOException{
2233          Key key = new Key(uri, conf);
2234          return getInternal(uri, conf, key);
2235        }
2236    
2237        /** The objects inserted into the cache using this method are all unique */
2238        FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2239          Key key = new Key(uri, conf, unique.getAndIncrement());
2240          return getInternal(uri, conf, key);
2241        }
2242    
    /**
     * Return the cached FileSystem for the key, creating and caching a new
     * one if none is present. The new instance is created without holding
     * the cache lock, so two callers may both create one; the second to
     * re-acquire the lock closes its own instance and returns the cached one.
     * @param uri the URI passed to createFileSystem
     * @param conf the configuration passed to createFileSystem and consulted
     *        for <code>fs.automatic.close</code>
     * @param key the cache key to look up and store under
     * @return the cached or newly created FileSystem
     * @throws IOException if creating or closing a FileSystem fails
     */
    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
      FileSystem fs;
      // first look-up under the lock
      synchronized (this) {
        fs = map.get(key);
      }
      if (fs != null) {
        return fs;
      }

      // create the instance outside the lock
      fs = createFileSystem(uri, conf);
      synchronized (this) { // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }
        
        // now insert the new file system into the map
        // the shutdown hook is registered whenever the map is empty before
        // the insertion
        if (map.isEmpty() ) {
          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
        }
        // remember the key on the instance; close() uses it for removal
        fs.key = key;
        map.put(key, fs);
        // instances are marked for automatic closing unless the
        // configuration disables it
        if (conf.getBoolean("fs.automatic.close", true)) {
          toAutoClose.add(key);
        }
        return fs;
      }
    }
2272    
2273        synchronized void remove(Key key, FileSystem fs) {
2274          if (map.containsKey(key) && fs == map.get(key)) {
2275            map.remove(key);
2276            toAutoClose.remove(key);
2277            }
2278        }
2279    
2280        synchronized void closeAll() throws IOException {
2281          closeAll(false);
2282        }
2283    
2284        /**
2285         * Close all FileSystem instances in the Cache.
2286         * @param onlyAutomatic only close those that are marked for automatic closing
2287         */
2288        synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2289          List<IOException> exceptions = new ArrayList<IOException>();
2290    
2291          // Make a copy of the keys in the map since we'll be modifying
2292          // the map while iterating over it, which isn't safe.
2293          List<Key> keys = new ArrayList<Key>();
2294          keys.addAll(map.keySet());
2295    
2296          for (Key key : keys) {
2297            final FileSystem fs = map.get(key);
2298    
2299            if (onlyAutomatic && !toAutoClose.contains(key)) {
2300              continue;
2301            }
2302    
2303            //remove from cache
2304            remove(key, fs);
2305    
2306            if (fs != null) {
2307              try {
2308                fs.close();
2309              }
2310              catch(IOException ioe) {
2311                exceptions.add(ioe);
2312              }
2313            }
2314          }
2315    
2316          if (!exceptions.isEmpty()) {
2317            throw MultipleIOException.createIOException(exceptions);
2318          }
2319        }
2320    
2321        private class ClientFinalizer implements Runnable {
2322          public synchronized void run() {
2323            try {
2324              closeAll(true);
2325            } catch (IOException e) {
2326              LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2327            }
2328          }
2329        }
2330    
2331        synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2332          List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2333          //Make a pass over the list and collect the filesystems to close
2334          //we cannot close inline since close() removes the entry from the Map
2335          for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2336            final Key key = entry.getKey();
2337            final FileSystem fs = entry.getValue();
2338            if (ugi.equals(key.ugi) && fs != null) {
2339              targetFSList.add(fs);   
2340            }
2341          }
2342          List<IOException> exceptions = new ArrayList<IOException>();
2343          //now make a pass over the target list and close each
2344          for (FileSystem fs : targetFSList) {
2345            try {
2346              fs.close();
2347            }
2348            catch(IOException ioe) {
2349              exceptions.add(ioe);
2350            }
2351          }
2352          if (!exceptions.isEmpty()) {
2353            throw MultipleIOException.createIOException(exceptions);
2354          }
2355        }
2356    
2357        /** FileSystem.Cache.Key */
2358        static class Key {
2359          final String scheme;
2360          final String authority;
2361          final UserGroupInformation ugi;
2362          final long unique;   // an artificial way to make a key unique
2363    
2364          Key(URI uri, Configuration conf) throws IOException {
2365            this(uri, conf, 0);
2366          }
2367    
2368          Key(URI uri, Configuration conf, long unique) throws IOException {
2369            scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
2370            authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
2371            this.unique = unique;
2372            
2373            this.ugi = UserGroupInformation.getCurrentUser();
2374          }
2375    
2376          /** {@inheritDoc} */
2377          public int hashCode() {
2378            return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2379          }
2380    
2381          static boolean isEqual(Object a, Object b) {
2382            return a == b || (a != null && a.equals(b));        
2383          }
2384    
2385          /** {@inheritDoc} */
2386          public boolean equals(Object obj) {
2387            if (obj == this) {
2388              return true;
2389            }
2390            if (obj != null && obj instanceof Key) {
2391              Key that = (Key)obj;
2392              return isEqual(this.scheme, that.scheme)
2393                     && isEqual(this.authority, that.authority)
2394                     && isEqual(this.ugi, that.ugi)
2395                     && (this.unique == that.unique);
2396            }
2397            return false;        
2398          }
2399    
2400          /** {@inheritDoc} */
2401          public String toString() {
2402            return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
2403          }
2404        }
2405      }
2406      
2407      public static final class Statistics {
2408        private final String scheme;
2409        private AtomicLong bytesRead = new AtomicLong();
2410        private AtomicLong bytesWritten = new AtomicLong();
2411        private AtomicInteger readOps = new AtomicInteger();
2412        private AtomicInteger largeReadOps = new AtomicInteger();
2413        private AtomicInteger writeOps = new AtomicInteger();
2414        
2415        public Statistics(String scheme) {
2416          this.scheme = scheme;
2417        }
2418    
2419        /**
2420         * Copy constructor.
2421         * 
2422         * @param st
2423         *          The input Statistics object which is cloned.
2424         */
2425        public Statistics(Statistics st) {
2426          this.scheme = st.scheme;
2427          this.bytesRead = new AtomicLong(st.bytesRead.longValue());
2428          this.bytesWritten = new AtomicLong(st.bytesWritten.longValue());
2429        }
2430    
2431        /**
2432         * Increment the bytes read in the statistics
2433         * @param newBytes the additional bytes read
2434         */
2435        public void incrementBytesRead(long newBytes) {
2436          bytesRead.getAndAdd(newBytes);
2437        }
2438        
2439        /**
2440         * Increment the bytes written in the statistics
2441         * @param newBytes the additional bytes written
2442         */
2443        public void incrementBytesWritten(long newBytes) {
2444          bytesWritten.getAndAdd(newBytes);
2445        }
2446        
2447        /**
2448         * Increment the number of read operations
2449         * @param count number of read operations
2450         */
2451        public void incrementReadOps(int count) {
2452          readOps.getAndAdd(count);
2453        }
2454    
2455        /**
2456         * Increment the number of large read operations
2457         * @param count number of large read operations
2458         */
2459        public void incrementLargeReadOps(int count) {
2460          largeReadOps.getAndAdd(count);
2461        }
2462    
2463        /**
2464         * Increment the number of write operations
2465         * @param count number of write operations
2466         */
2467        public void incrementWriteOps(int count) {
2468          writeOps.getAndAdd(count);
2469        }
2470    
2471        /**
2472         * Get the total number of bytes read
2473         * @return the number of bytes
2474         */
2475        public long getBytesRead() {
2476          return bytesRead.get();
2477        }
2478        
2479        /**
2480         * Get the total number of bytes written
2481         * @return the number of bytes
2482         */
2483        public long getBytesWritten() {
2484          return bytesWritten.get();
2485        }
2486        
2487        /**
2488         * Get the number of file system read operations such as list files
2489         * @return number of read operations
2490         */
2491        public int getReadOps() {
2492          return readOps.get() + largeReadOps.get();
2493        }
2494    
2495        /**
2496         * Get the number of large file system read operations such as list files
2497         * under a large directory
2498         * @return number of large read operations
2499         */
2500        public int getLargeReadOps() {
2501          return largeReadOps.get();
2502        }
2503    
2504        /**
2505         * Get the number of file system write operations such as create, append 
2506         * rename etc.
2507         * @return number of write operations
2508         */
2509        public int getWriteOps() {
2510          return writeOps.get();
2511        }
2512    
2513        public String toString() {
2514          return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
2515              + readOps + " read ops, " + largeReadOps + " large read ops, "
2516              + writeOps + " write ops";
2517        }
2518        
2519        /**
2520         * Reset the counts of bytes to 0.
2521         */
2522        public void reset() {
2523          bytesWritten.set(0);
2524          bytesRead.set(0);
2525        }
2526        
2527        /**
2528         * Get the uri scheme associated with this statistics object.
2529         * @return the schema associated with this set of statistics
2530         */
2531        public String getScheme() {
2532          return scheme;
2533        }
2534      }
2535      
2536      /**
2537       * Get the Map of Statistics object indexed by URI Scheme.
2538       * @return a Map having a key as URI scheme and value as Statistics object
2539       * @deprecated use {@link #getAllStatistics} instead
2540       */
2541      @Deprecated
2542      public static synchronized Map<String, Statistics> getStatistics() {
2543        Map<String, Statistics> result = new HashMap<String, Statistics>();
2544        for(Statistics stat: statisticsTable.values()) {
2545          result.put(stat.getScheme(), stat);
2546        }
2547        return result;
2548      }
2549    
2550      /**
2551       * Return the FileSystem classes that have Statistics
2552       */
2553      public static synchronized List<Statistics> getAllStatistics() {
2554        return new ArrayList<Statistics>(statisticsTable.values());
2555      }
2556      
2557      /**
2558       * Get the statistics for a particular file system
2559       * @param cls the class to lookup
2560       * @return a statistics object
2561       */
2562      public static synchronized 
2563      Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
2564        Statistics result = statisticsTable.get(cls);
2565        if (result == null) {
2566          result = new Statistics(scheme);
2567          statisticsTable.put(cls, result);
2568        }
2569        return result;
2570      }
2571      
2572      /**
2573       * Reset all statistics for all file systems
2574       */
2575      public static synchronized void clearStatistics() {
2576        for(Statistics stat: statisticsTable.values()) {
2577          stat.reset();
2578        }
2579      }
2580    
2581      /**
2582       * Print all statistics for all file systems
2583       */
2584      public static synchronized
2585      void printStatistics() throws IOException {
2586        for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
2587                statisticsTable.entrySet()) {
2588          System.out.println("  FileSystem " + pair.getKey().getName() + 
2589                             ": " + pair.getValue());
2590        }
2591      }
2592    }