001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.fs;
020    
021    import java.io.BufferedOutputStream;
022    import java.io.DataOutput;
023    import java.io.File;
024    import java.io.FileInputStream;
025    import java.io.FileNotFoundException;
026    import java.io.FileOutputStream;
027    import java.io.IOException;
028    import java.io.OutputStream;
029    import java.io.FileDescriptor;
030    import java.net.URI;
031    import java.nio.ByteBuffer;
032    import java.util.Arrays;
033    import java.util.StringTokenizer;
034    
035    import org.apache.hadoop.classification.InterfaceAudience;
036    import org.apache.hadoop.classification.InterfaceStability;
037    import org.apache.hadoop.conf.Configuration;
038    import org.apache.hadoop.fs.permission.FsPermission;
039    import org.apache.hadoop.io.nativeio.NativeIO;
040    import org.apache.hadoop.util.Progressable;
041    import org.apache.hadoop.util.Shell;
042    import org.apache.hadoop.util.StringUtils;
043    
044    /****************************************************************
045     * Implement the FileSystem API for the raw local filesystem.
046     *
047     *****************************************************************/
048    @InterfaceAudience.Public
049    @InterfaceStability.Stable
050    public class RawLocalFileSystem extends FileSystem {
051      static final URI NAME = URI.create("file:///");
052      private Path workingDir;
053      
054      public RawLocalFileSystem() {
055        workingDir = getInitialWorkingDirectory();
056      }
057      
058      private Path makeAbsolute(Path f) {
059        if (f.isAbsolute()) {
060          return f;
061        } else {
062          return new Path(workingDir, f);
063        }
064      }
065      
066      /** Convert a path to a File. */
067      public File pathToFile(Path path) {
068        checkPath(path);
069        if (!path.isAbsolute()) {
070          path = new Path(getWorkingDirectory(), path);
071        }
072        return new File(path.toUri().getPath());
073      }
074    
075      public URI getUri() { return NAME; }
076      
077      public void initialize(URI uri, Configuration conf) throws IOException {
078        super.initialize(uri, conf);
079        setConf(conf);
080      }
081      
082      class TrackingFileInputStream extends FileInputStream {
083        public TrackingFileInputStream(File f) throws IOException {
084          super(f);
085        }
086        
087        public int read() throws IOException {
088          int result = super.read();
089          if (result != -1) {
090            statistics.incrementBytesRead(1);
091          }
092          return result;
093        }
094        
095        public int read(byte[] data) throws IOException {
096          int result = super.read(data);
097          if (result != -1) {
098            statistics.incrementBytesRead(result);
099          }
100          return result;
101        }
102        
103        public int read(byte[] data, int offset, int length) throws IOException {
104          int result = super.read(data, offset, length);
105          if (result != -1) {
106            statistics.incrementBytesRead(result);
107          }
108          return result;
109        }
110      }
111    
112      /*******************************************************
113       * For open()'s FSInputStream.
114       *******************************************************/
115      class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor {
116        private FileInputStream fis;
117        private long position;
118    
119        public LocalFSFileInputStream(Path f) throws IOException {
120          this.fis = new TrackingFileInputStream(pathToFile(f));
121        }
122        
123        public void seek(long pos) throws IOException {
124          fis.getChannel().position(pos);
125          this.position = pos;
126        }
127        
128        public long getPos() throws IOException {
129          return this.position;
130        }
131        
132        public boolean seekToNewSource(long targetPos) throws IOException {
133          return false;
134        }
135        
136        /*
137         * Just forward to the fis
138         */
139        public int available() throws IOException { return fis.available(); }
140        public void close() throws IOException { fis.close(); }
141        @Override
142        public boolean markSupported() { return false; }
143        
144        public int read() throws IOException {
145          try {
146            int value = fis.read();
147            if (value >= 0) {
148              this.position++;
149            }
150            return value;
151          } catch (IOException e) {                 // unexpected exception
152            throw new FSError(e);                   // assume native fs error
153          }
154        }
155        
156        public int read(byte[] b, int off, int len) throws IOException {
157          try {
158            int value = fis.read(b, off, len);
159            if (value > 0) {
160              this.position += value;
161            }
162            return value;
163          } catch (IOException e) {                 // unexpected exception
164            throw new FSError(e);                   // assume native fs error
165          }
166        }
167        
168        public int read(long position, byte[] b, int off, int len)
169          throws IOException {
170          ByteBuffer bb = ByteBuffer.wrap(b, off, len);
171          try {
172            return fis.getChannel().read(bb, position);
173          } catch (IOException e) {
174            throw new FSError(e);
175          }
176        }
177        
178        public long skip(long n) throws IOException {
179          long value = fis.skip(n);
180          if (value > 0) {
181            this.position += value;
182          }
183          return value;
184        }
185    
186        @Override
187        public FileDescriptor getFileDescriptor() throws IOException {
188          return fis.getFD();
189        }
190      }
191      
192      public FSDataInputStream open(Path f, int bufferSize) throws IOException {
193        if (!exists(f)) {
194          throw new FileNotFoundException(f.toString());
195        }
196        return new FSDataInputStream(new BufferedFSInputStream(
197            new LocalFSFileInputStream(f), bufferSize));
198      }
199      
200      /*********************************************************
201       * For create()'s FSOutputStream.
202       *********************************************************/
203      class LocalFSFileOutputStream extends OutputStream {
204        private FileOutputStream fos;
205        
206        private LocalFSFileOutputStream(Path f, boolean append) throws IOException {
207          this.fos = new FileOutputStream(pathToFile(f), append);
208        }
209        
210        /*
211         * Just forward to the fos
212         */
213        public void close() throws IOException { fos.close(); }
214        public void flush() throws IOException { fos.flush(); }
215        public void write(byte[] b, int off, int len) throws IOException {
216          try {
217            fos.write(b, off, len);
218          } catch (IOException e) {                // unexpected exception
219            throw new FSError(e);                  // assume native fs error
220          }
221        }
222        
223        public void write(int b) throws IOException {
224          try {
225            fos.write(b);
226          } catch (IOException e) {              // unexpected exception
227            throw new FSError(e);                // assume native fs error
228          }
229        }
230      }
231    
232      /** {@inheritDoc} */
233      public FSDataOutputStream append(Path f, int bufferSize,
234          Progressable progress) throws IOException {
235        if (!exists(f)) {
236          throw new FileNotFoundException("File " + f + " not found");
237        }
238        if (getFileStatus(f).isDirectory()) {
239          throw new IOException("Cannot append to a diretory (=" + f + " )");
240        }
241        return new FSDataOutputStream(new BufferedOutputStream(
242            new LocalFSFileOutputStream(f, true), bufferSize), statistics);
243      }
244    
245      /** {@inheritDoc} */
246      @Override
247      public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
248        short replication, long blockSize, Progressable progress)
249        throws IOException {
250        return create(f, overwrite, true, bufferSize, replication, blockSize, progress);
251      }
252    
253      private FSDataOutputStream create(Path f, boolean overwrite,
254          boolean createParent, int bufferSize, short replication, long blockSize,
255          Progressable progress) throws IOException {
256        if (exists(f) && !overwrite) {
257          throw new IOException("File already exists: "+f);
258        }
259        Path parent = f.getParent();
260        if (parent != null && !mkdirs(parent)) {
261          throw new IOException("Mkdirs failed to create " + parent.toString());
262        }
263        return new FSDataOutputStream(new BufferedOutputStream(
264            new LocalFSFileOutputStream(f, false), bufferSize), statistics);
265      }
266    
267      /** {@inheritDoc} */
268      @Override
269      public FSDataOutputStream create(Path f, FsPermission permission,
270        boolean overwrite, int bufferSize, short replication, long blockSize,
271        Progressable progress) throws IOException {
272    
273        FSDataOutputStream out = create(f,
274            overwrite, bufferSize, replication, blockSize, progress);
275        setPermission(f, permission);
276        return out;
277      }
278    
279      /** {@inheritDoc} */
280      @Override
281      public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
282          boolean overwrite,
283          int bufferSize, short replication, long blockSize,
284          Progressable progress) throws IOException {
285        FSDataOutputStream out = create(f,
286            overwrite, false, bufferSize, replication, blockSize, progress);
287        setPermission(f, permission);
288        return out;
289      }
290    
291      public boolean rename(Path src, Path dst) throws IOException {
292        if (pathToFile(src).renameTo(pathToFile(dst))) {
293          return true;
294        }
295        return FileUtil.copy(this, src, this, dst, true, getConf());
296      }
297      
298      /**
299       * Delete the given path to a file or directory.
300       * @param p the path to delete
301       * @param recursive to delete sub-directories
302       * @return true if the file or directory and all its contents were deleted
303       * @throws IOException if p is non-empty and recursive is false 
304       */
305      public boolean delete(Path p, boolean recursive) throws IOException {
306        File f = pathToFile(p);
307        if (f.isFile()) {
308          return f.delete();
309        } else if (!recursive && f.isDirectory() && 
310            (FileUtil.listFiles(f).length != 0)) {
311          throw new IOException("Directory " + f.toString() + " is not empty");
312        }
313        return FileUtil.fullyDelete(f);
314      }
315     
316      public FileStatus[] listStatus(Path f) throws IOException {
317        File localf = pathToFile(f);
318        FileStatus[] results;
319    
320        if (!localf.exists()) {
321          throw new FileNotFoundException("File " + f + " does not exist");
322        }
323        if (localf.isFile()) {
324          return new FileStatus[] {
325            new RawLocalFileStatus(localf, getDefaultBlockSize(f), this) };
326        }
327    
328        String[] names = localf.list();
329        if (names == null) {
330          return null;
331        }
332        results = new FileStatus[names.length];
333        int j = 0;
334        for (int i = 0; i < names.length; i++) {
335          try {
336            results[j] = getFileStatus(new Path(f, names[i]));
337            j++;
338          } catch (FileNotFoundException e) {
339            // ignore the files not found since the dir list may have have changed
340            // since the names[] list was generated.
341          }
342        }
343        if (j == names.length) {
344          return results;
345        }
346        return Arrays.copyOf(results, j);
347      }
348    
349      /**
350       * Creates the specified directory hierarchy. Does not
351       * treat existence as an error.
352       */
353      public boolean mkdirs(Path f) throws IOException {
354        if(f == null) {
355          throw new IllegalArgumentException("mkdirs path arg is null");
356        }
357        Path parent = f.getParent();
358        File p2f = pathToFile(f);
359        if(parent != null) {
360          File parent2f = pathToFile(parent);
361          if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {
362            throw new FileAlreadyExistsException("Parent path is not a directory: " 
363                + parent);
364          }
365        }
366        return (parent == null || mkdirs(parent)) &&
367          (p2f.mkdir() || p2f.isDirectory());
368      }
369    
370      /** {@inheritDoc} */
371      @Override
372      public boolean mkdirs(Path f, FsPermission permission) throws IOException {
373        boolean b = mkdirs(f);
374        if(b) {
375          setPermission(f, permission);
376        }
377        return b;
378      }
379      
380    
381      @Override
382      protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
383        throws IOException {
384        boolean b = mkdirs(f);
385        setPermission(f, absolutePermission);
386        return b;
387      }
388      
389      
390      @Override
391      public Path getHomeDirectory() {
392        return this.makeQualified(new Path(System.getProperty("user.home")));
393      }
394    
395      /**
396       * Set the working directory to the given directory.
397       */
398      @Override
399      public void setWorkingDirectory(Path newDir) {
400        workingDir = makeAbsolute(newDir);
401        checkPath(workingDir);
402        
403      }
404      
405      @Override
406      public Path getWorkingDirectory() {
407        return workingDir;
408      }
409      
410      @Override
411      protected Path getInitialWorkingDirectory() {
412        return this.makeQualified(new Path(System.getProperty("user.dir")));
413      }
414    
415      /** {@inheritDoc} */
416      @Override
417      public FsStatus getStatus(Path p) throws IOException {
418        File partition = pathToFile(p == null ? new Path("/") : p);
419        //File provides getUsableSpace() and getFreeSpace()
420        //File provides no API to obtain used space, assume used = total - free
421        return new FsStatus(partition.getTotalSpace(), 
422          partition.getTotalSpace() - partition.getFreeSpace(),
423          partition.getFreeSpace());
424      }
425      
426      // In the case of the local filesystem, we can just rename the file.
427      public void moveFromLocalFile(Path src, Path dst) throws IOException {
428        rename(src, dst);
429      }
430      
431      // We can write output directly to the final location
432      public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
433        throws IOException {
434        return fsOutputFile;
435      }
436      
437      // It's in the right place - nothing to do.
438      public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
439        throws IOException {
440      }
441      
442      public void close() throws IOException {
443        super.close();
444      }
445      
446      public String toString() {
447        return "LocalFS";
448      }
449      
450      public FileStatus getFileStatus(Path f) throws IOException {
451        File path = pathToFile(f);
452        if (path.exists()) {
453          return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(f), this);
454        } else {
455          throw new FileNotFoundException("File " + f + " does not exist");
456        }
457      }
458    
459      static class RawLocalFileStatus extends FileStatus {
460        /* We can add extra fields here. It breaks at least CopyFiles.FilePair().
461         * We recognize if the information is already loaded by check if
462         * onwer.equals("").
463         */
464        private boolean isPermissionLoaded() {
465          return !super.getOwner().equals(""); 
466        }
467        
468        RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
469          super(f.length(), f.isDirectory(), 1, defaultBlockSize,
470                f.lastModified(), fs.makeQualified(new Path(f.getPath())));
471        }
472        
473        @Override
474        public FsPermission getPermission() {
475          if (!isPermissionLoaded()) {
476            loadPermissionInfo();
477          }
478          return super.getPermission();
479        }
480    
481        @Override
482        public String getOwner() {
483          if (!isPermissionLoaded()) {
484            loadPermissionInfo();
485          }
486          return super.getOwner();
487        }
488    
489        @Override
490        public String getGroup() {
491          if (!isPermissionLoaded()) {
492            loadPermissionInfo();
493          }
494          return super.getGroup();
495        }
496    
497        /// loads permissions, owner, and group from `ls -ld`
498        private void loadPermissionInfo() {
499          IOException e = null;
500          try {
501            StringTokenizer t = new StringTokenizer(
502                execCommand(new File(getPath().toUri()), 
503                            Shell.getGET_PERMISSION_COMMAND()));
504            //expected format
505            //-rw-------    1 username groupname ...
506            String permission = t.nextToken();
507            if (permission.length() > 10) { //files with ACLs might have a '+'
508              permission = permission.substring(0, 10);
509            }
510            setPermission(FsPermission.valueOf(permission));
511            t.nextToken();
512            setOwner(t.nextToken());
513            setGroup(t.nextToken());
514          } catch (Shell.ExitCodeException ioe) {
515            if (ioe.getExitCode() != 1) {
516              e = ioe;
517            } else {
518              setPermission(null);
519              setOwner(null);
520              setGroup(null);
521            }
522          } catch (IOException ioe) {
523            e = ioe;
524          } finally {
525            if (e != null) {
526              throw new RuntimeException("Error while running command to get " +
527                                         "file permissions : " + 
528                                         StringUtils.stringifyException(e));
529            }
530          }
531        }
532    
533        @Override
534        public void write(DataOutput out) throws IOException {
535          if (!isPermissionLoaded()) {
536            loadPermissionInfo();
537          }
538          super.write(out);
539        }
540      }
541    
542      /**
543       * Use the command chown to set owner.
544       */
545      @Override
546      public void setOwner(Path p, String username, String groupname)
547        throws IOException {
548        if (username == null && groupname == null) {
549          throw new IOException("username == null && groupname == null");
550        }
551    
552        if (username == null) {
553          execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname); 
554        } else {
555          //OWNER[:[GROUP]]
556          String s = username + (groupname == null? "": ":" + groupname);
557          execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
558        }
559      }
560    
561      /**
562       * Use the command chmod to set permission.
563       */
564      @Override
565      public void setPermission(Path p, FsPermission permission)
566        throws IOException {
567        if (NativeIO.isAvailable()) {
568          NativeIO.chmod(pathToFile(p).getCanonicalPath(),
569                         permission.toShort());
570        } else {
571          execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND,
572              String.format("%05o", permission.toShort()));
573        }
574      }
575    
576      private static String execCommand(File f, String... cmd) throws IOException {
577        String[] args = new String[cmd.length + 1];
578        System.arraycopy(cmd, 0, args, 0, cmd.length);
579        args[cmd.length] = FileUtil.makeShellPath(f, true);
580        String output = Shell.execCommand(args);
581        return output;
582      }
583    
584    }