001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.fs.http.client;
019    
020    import org.apache.hadoop.classification.InterfaceAudience;
021    import org.apache.hadoop.conf.Configuration;
022    import org.apache.hadoop.fs.ContentSummary;
023    import org.apache.hadoop.fs.DelegationTokenRenewer;
024    import org.apache.hadoop.fs.FSDataInputStream;
025    import org.apache.hadoop.fs.FSDataOutputStream;
026    import org.apache.hadoop.fs.FileChecksum;
027    import org.apache.hadoop.fs.FileStatus;
028    import org.apache.hadoop.fs.FileSystem;
029    import org.apache.hadoop.fs.Path;
030    import org.apache.hadoop.fs.PositionedReadable;
031    import org.apache.hadoop.fs.Seekable;
032    import org.apache.hadoop.fs.permission.FsPermission;
033    import org.apache.hadoop.hdfs.DFSConfigKeys;
034    import org.apache.hadoop.net.NetUtils;
035    import org.apache.hadoop.security.UserGroupInformation;
036    import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
037    import org.apache.hadoop.security.authentication.client.Authenticator;
038    import org.apache.hadoop.security.token.Token;
039    import org.apache.hadoop.security.token.TokenIdentifier;
040    import org.apache.hadoop.util.Progressable;
041    import org.apache.hadoop.util.ReflectionUtils;
042    import org.apache.hadoop.util.StringUtils;
043    import org.json.simple.JSONArray;
044    import org.json.simple.JSONObject;
045    
046    import java.io.BufferedInputStream;
047    import java.io.BufferedOutputStream;
048    import java.io.DataInput;
049    import java.io.DataOutput;
050    import java.io.FileNotFoundException;
051    import java.io.FilterInputStream;
052    import java.io.IOException;
053    import java.io.InputStream;
054    import java.io.OutputStream;
055    import java.net.HttpURLConnection;
056    import java.net.InetSocketAddress;
057    import java.net.URI;
058    import java.net.URISyntaxException;
059    import java.net.URL;
060    import java.security.PrivilegedExceptionAction;
061    import java.text.MessageFormat;
062    import java.util.HashMap;
063    import java.util.List;
064    import java.util.Map;
065    import java.util.concurrent.Callable;
066    
067    /**
068     * HttpFSServer implementation of the FileSystemAccess FileSystem.
069     * <p/>
     * This implementation allows a user to access HDFS over HTTP via an HttpFSServer.
071     */
072    @InterfaceAudience.Private
073    public class HttpFSFileSystem extends FileSystem
074      implements DelegationTokenRenewer.Renewable {
075    
      /** Service name, taken from <code>HttpFSUtils</code>. */
      public static final String SERVICE_NAME = HttpFSUtils.SERVICE_NAME;

      /** Service version, taken from <code>HttpFSUtils</code>. */
      public static final String SERVICE_VERSION = HttpFSUtils.SERVICE_VERSION;

      /** URI scheme reported by {@link #getScheme()}. */
      public static final String SCHEME = "webhdfs";

      // Names of the query string parameters sent with the file system requests.
      // OP_PARAM carries the name of the requested Operation; DO_AS_PARAM is only
      // added when the calling user differs from the real (logged in) user.
      public static final String OP_PARAM = "op";
      public static final String DO_AS_PARAM = "doas";
      public static final String OVERWRITE_PARAM = "overwrite";
      public static final String REPLICATION_PARAM = "replication";
      public static final String BLOCKSIZE_PARAM = "blocksize";
      public static final String PERMISSION_PARAM = "permission";
      public static final String DESTINATION_PARAM = "destination";
      public static final String RECURSIVE_PARAM = "recursive";
      public static final String OWNER_PARAM = "owner";
      public static final String GROUP_PARAM = "group";
      public static final String MODIFICATION_TIME_PARAM = "modificationtime";
      public static final String ACCESS_TIME_PARAM = "accesstime";

      /**
       * Permission bits (octal literal) used by
       * {@link #permissionToString(FsPermission)} when no permission is given.
       */
      public static final Short DEFAULT_PERMISSION = 0755;

      // Keys under which the result is read from the JSON response of the
      // corresponding operation.

      /** JSON key of the rename result. */
      public static final String RENAME_JSON = "boolean";

      /** JSON key of the delete result. */
      public static final String DELETE_JSON = "boolean";

      /** JSON key of the mkdirs result. */
      public static final String MKDIRS_JSON = "boolean";

      /** JSON key of the home directory path. */
      public static final String HOME_DIR_JSON = "Path";

      /** JSON key of the set replication result. */
      public static final String SET_REPLICATION_JSON = "boolean";

      /** Value of the Content-Type header set on the upload (create/append) request. */
      public static final String UPLOAD_CONTENT_TYPE= "application/octet-stream";
108    
109      public static enum FILE_TYPE {
110        FILE, DIRECTORY, SYMLINK;
111    
112        public static FILE_TYPE getType(FileStatus fileStatus) {
113          if (fileStatus.isFile()) {
114            return FILE;
115          }
116          if (fileStatus.isDirectory()) {
117            return DIRECTORY;
118          }
119          if (fileStatus.isSymlink()) {
120            return SYMLINK;
121          }
122          throw new IllegalArgumentException("Could not determine filetype for: " +
123                                             fileStatus.getPath());
124        }
125      }
126    
      // Keys of the JSON returned for file status requests. FILE_STATUSES_JSON
      // wraps the FILE_STATUS_JSON array of a directory listing; FILE_STATUS_JSON
      // alone wraps the status of a single path. The remaining keys are
      // presumably the per-entry attributes read when building a FileStatus
      // (that code is not visible in this section -- confirm there).
      public static final String FILE_STATUSES_JSON = "FileStatuses";
      public static final String FILE_STATUS_JSON = "FileStatus";
      public static final String PATH_SUFFIX_JSON = "pathSuffix";
      public static final String TYPE_JSON = "type";
      public static final String LENGTH_JSON = "length";
      public static final String OWNER_JSON = "owner";
      public static final String GROUP_JSON = "group";
      public static final String PERMISSION_JSON = "permission";
      public static final String ACCESS_TIME_JSON = "accessTime";
      public static final String MODIFICATION_TIME_JSON = "modificationTime";
      public static final String BLOCK_SIZE_JSON = "blockSize";
      public static final String REPLICATION_JSON = "replication";

      // JSON keys for a file checksum (by name; their use is not visible in this section).
      public static final String FILE_CHECKSUM_JSON = "FileChecksum";
      public static final String CHECKSUM_ALGORITHM_JSON = "algorithm";
      public static final String CHECKSUM_BYTES_JSON = "bytes";
      public static final String CHECKSUM_LENGTH_JSON = "length";

      // JSON keys for a content summary (by name; their use is not visible in this section).
      public static final String CONTENT_SUMMARY_JSON = "ContentSummary";
      public static final String CONTENT_SUMMARY_DIRECTORY_COUNT_JSON = "directoryCount";
      public static final String CONTENT_SUMMARY_FILE_COUNT_JSON = "fileCount";
      public static final String CONTENT_SUMMARY_LENGTH_JSON = "length";
      public static final String CONTENT_SUMMARY_QUOTA_JSON = "quota";
      public static final String CONTENT_SUMMARY_SPACE_CONSUMED_JSON = "spaceConsumed";
      public static final String CONTENT_SUMMARY_SPACE_QUOTA_JSON = "spaceQuota";

      // JSON keys for an error payload (by name; presumably consumed by the
      // response validation helper -- confirm in HttpFSUtils).
      public static final String ERROR_JSON = "RemoteException";
      public static final String ERROR_EXCEPTION_JSON = "exception";
      public static final String ERROR_CLASSNAME_JSON = "javaClassName";
      public static final String ERROR_MESSAGE_JSON = "message";

      /** HTTP status code expected as the first response of an upload request. */
      public static final int HTTP_TEMPORARY_REDIRECT = 307;

      // HTTP method names associated with the Operation constants.
      private static final String HTTP_GET = "GET";
      private static final String HTTP_PUT = "PUT";
      private static final String HTTP_POST = "POST";
      private static final String HTTP_DELETE = "DELETE";
164    
165      @InterfaceAudience.Private
166      public static enum Operation {
167        OPEN(HTTP_GET), GETFILESTATUS(HTTP_GET), LISTSTATUS(HTTP_GET),
168        GETHOMEDIRECTORY(HTTP_GET), GETCONTENTSUMMARY(HTTP_GET),
169        GETFILECHECKSUM(HTTP_GET),  GETFILEBLOCKLOCATIONS(HTTP_GET),
170        INSTRUMENTATION(HTTP_GET),
171        APPEND(HTTP_POST),
172        CREATE(HTTP_PUT), MKDIRS(HTTP_PUT), RENAME(HTTP_PUT), SETOWNER(HTTP_PUT),
173        SETPERMISSION(HTTP_PUT), SETREPLICATION(HTTP_PUT), SETTIMES(HTTP_PUT),
174        DELETE(HTTP_DELETE);
175    
176        private String httpMethod;
177    
178        Operation(String httpMethod) {
179          this.httpMethod = httpMethod;
180        }
181    
182        public String getMethod() {
183          return httpMethod;
184        }
185    
186      }
187    
188    
      // Authentication token handed to AuthenticatedURL every time a connection is opened.
      private AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
      // scheme://authority of this file system, built in initialize().
      private URI uri;
      // Socket address derived from the canonical URI in initialize().
      private InetSocketAddress httpFSAddr;
      // Current working directory; resolved lazily to the home directory when still null.
      private Path workingDir;
      // Real user of the caller, or the login user when the caller has no real user.
      private UserGroupInformation realUser;
      // Short name of the user that created this file system instance.
      private String doAs;
      // Delegation token injected into the request parameters (not assigned in this section).
      private Token<?> delegationToken;
196    
197      //This method enables handling UGI doAs with SPNEGO, we have to
198      //fallback to the realuser who logged in with Kerberos credentials
199      private <T> T doAsRealUserIfNecessary(final Callable<T> callable)
200        throws IOException {
201        try {
202          if (realUser.getShortUserName().equals(doAs)) {
203            return callable.call();
204          } else {
205            return realUser.doAs(new PrivilegedExceptionAction<T>() {
206              @Override
207              public T run() throws Exception {
208                return callable.call();
209              }
210            });
211          }
212        } catch (Exception ex) {
213          throw new IOException(ex.toString(), ex);
214        }
215      }
216    
217      /**
218       * Convenience method that creates a <code>HttpURLConnection</code> for the
219       * HttpFSServer file system operations.
220       * <p/>
221       * This methods performs and injects any needed authentication credentials
222       * via the {@link #getConnection(URL, String)} method
223       *
224       * @param method the HTTP method.
225       * @param params the query string parameters.
226       * @param path the file path
227       * @param makeQualified if the path should be 'makeQualified'
228       *
229       * @return a <code>HttpURLConnection</code> for the HttpFSServer server,
230       *         authenticated and ready to use for the specified path and file system operation.
231       *
232       * @throws IOException thrown if an IO error occurrs.
233       */
234      private HttpURLConnection getConnection(final String method,
235          Map<String, String> params, Path path, boolean makeQualified)
236          throws IOException {
237        if (!realUser.getShortUserName().equals(doAs)) {
238          params.put(DO_AS_PARAM, doAs);
239        }
240        HttpFSKerberosAuthenticator.injectDelegationToken(params, delegationToken);
241        if (makeQualified) {
242          path = makeQualified(path);
243        }
244        final URL url = HttpFSUtils.createHttpURL(path, params);
245        return doAsRealUserIfNecessary(new Callable<HttpURLConnection>() {
246          @Override
247          public HttpURLConnection call() throws Exception {
248            return getConnection(url, method);
249          }
250        });
251      }
252    
253      /**
254       * Convenience method that creates a <code>HttpURLConnection</code> for the specified URL.
255       * <p/>
256       * This methods performs and injects any needed authentication credentials.
257       *
258       * @param url url to connect to.
259       * @param method the HTTP method.
260       *
261       * @return a <code>HttpURLConnection</code> for the HttpFSServer server, authenticated and ready to use for
262       *         the specified path and file system operation.
263       *
264       * @throws IOException thrown if an IO error occurrs.
265       */
266      private HttpURLConnection getConnection(URL url, String method) throws IOException {
267        Class<? extends Authenticator> klass =
268          getConf().getClass("httpfs.authenticator.class",
269                             HttpFSKerberosAuthenticator.class, Authenticator.class);
270        Authenticator authenticator = ReflectionUtils.newInstance(klass, getConf());
271        try {
272          HttpURLConnection conn = new AuthenticatedURL(authenticator).openConnection(url, authToken);
273          conn.setRequestMethod(method);
274          if (method.equals(HTTP_POST) || method.equals(HTTP_PUT)) {
275            conn.setDoOutput(true);
276          }
277          return conn;
278        } catch (Exception ex) {
279          throw new IOException(ex);
280        }
281      }
282    
283      /**
284       * Called after a new FileSystem instance is constructed.
285       *
286       * @param name a uri whose authority section names the host, port, etc. for this FileSystem
287       * @param conf the configuration
288       */
289      @Override
290      public void initialize(URI name, Configuration conf) throws IOException {
291        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
292    
293        //the real use is the one that has the Kerberos credentials needed for
294        //SPNEGO to work
295        realUser = ugi.getRealUser();
296        if (realUser == null) {
297          realUser = UserGroupInformation.getLoginUser();
298        }
299        doAs = ugi.getShortUserName();
300        super.initialize(name, conf);
301        try {
302          uri = new URI(name.getScheme() + "://" + name.getAuthority());
303          httpFSAddr = NetUtils.createSocketAddr(getCanonicalUri().toString());
304        } catch (URISyntaxException ex) {
305          throw new IOException(ex);
306        }
307      }
308    
309      @Override
310      public String getScheme() {
311        return SCHEME;
312      }
313    
314      /**
315       * Returns a URI whose scheme and authority identify this FileSystem.
316       *
317       * @return the URI whose scheme and authority identify this FileSystem.
318       */
319      @Override
320      public URI getUri() {
321        return uri;
322      }
323    
324      /**
325       * Get the default port for this file system.
326       * @return the default port or 0 if there isn't one
327       */
328      @Override
329      protected int getDefaultPort() {
330        return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
331            DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
332      }
333    
334      /**
335       * HttpFSServer subclass of the <code>FSDataInputStream</code>.
336       * <p/>
337       * This implementation does not support the
338       * <code>PositionReadable</code> and <code>Seekable</code> methods.
339       */
340      private static class HttpFSDataInputStream extends FilterInputStream implements Seekable, PositionedReadable {
341    
342        protected HttpFSDataInputStream(InputStream in, int bufferSize) {
343          super(new BufferedInputStream(in, bufferSize));
344        }
345    
346        @Override
347        public int read(long position, byte[] buffer, int offset, int length) throws IOException {
348          throw new UnsupportedOperationException();
349        }
350    
351        @Override
352        public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
353          throw new UnsupportedOperationException();
354        }
355    
356        @Override
357        public void readFully(long position, byte[] buffer) throws IOException {
358          throw new UnsupportedOperationException();
359        }
360    
361        @Override
362        public void seek(long pos) throws IOException {
363          throw new UnsupportedOperationException();
364        }
365    
366        @Override
367        public long getPos() throws IOException {
368          throw new UnsupportedOperationException();
369        }
370    
371        @Override
372        public boolean seekToNewSource(long targetPos) throws IOException {
373          throw new UnsupportedOperationException();
374        }
375      }
376    
377      /**
378       * Opens an FSDataInputStream at the indicated Path.
379       * </p>
380       * IMPORTANT: the returned <code><FSDataInputStream/code> does not support the
381       * <code>PositionReadable</code> and <code>Seekable</code> methods.
382       *
383       * @param f the file name to open
384       * @param bufferSize the size of the buffer to be used.
385       */
386      @Override
387      public FSDataInputStream open(Path f, int bufferSize) throws IOException {
388        Map<String, String> params = new HashMap<String, String>();
389        params.put(OP_PARAM, Operation.OPEN.toString());
390        HttpURLConnection conn = getConnection(Operation.OPEN.getMethod(), params,
391                                               f, true);
392        HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
393        return new FSDataInputStream(
394          new HttpFSDataInputStream(conn.getInputStream(), bufferSize));
395      }
396    
397      /**
398       * HttpFSServer subclass of the <code>FSDataOutputStream</code>.
399       * <p/>
400       * This implementation closes the underlying HTTP connection validating the Http connection status
401       * at closing time.
402       */
403      private static class HttpFSDataOutputStream extends FSDataOutputStream {
404        private HttpURLConnection conn;
405        private int closeStatus;
406    
407        public HttpFSDataOutputStream(HttpURLConnection conn, OutputStream out, int closeStatus, Statistics stats)
408          throws IOException {
409          super(out, stats);
410          this.conn = conn;
411          this.closeStatus = closeStatus;
412        }
413    
414        @Override
415        public void close() throws IOException {
416          try {
417            super.close();
418          } finally {
419            HttpFSUtils.validateResponse(conn, closeStatus);
420          }
421        }
422    
423      }
424    
425      /**
426       * Converts a <code>FsPermission</code> to a Unix octal representation.
427       *
428       * @param p the permission.
429       *
430       * @return the Unix string symbolic reprentation.
431       */
432      public static String permissionToString(FsPermission p) {
433        return  Integer.toString((p == null) ? DEFAULT_PERMISSION : p.toShort(), 8);
434      }
435    
      /*
       * Common handling for uploading data for create and append operations.
       *
       * The upload takes two requests. The first one goes to the URL built for
       * the given path and parameters, without following redirects, and must be
       * answered with a 307 (temporary redirect) status. The second one goes to
       * the URL of the 'Location' header of that response, using the same HTTP
       * method; the returned stream writes to its body.
       *
       * 'expectedStatus' is the status the second request must end with; it is
       * handed to the returned stream, which validates it when closed.
       *
       * An IOException is thrown if the first response is not a 307, if the
       * 'Location' header is missing, or if the output stream cannot be opened.
       * Before rethrowing, HttpFSUtils.validateResponse() is called on the
       * failing connection -- presumably to raise the error reported by the
       * server instead, when there is one (confirm in HttpFSUtils).
       */
      private FSDataOutputStream uploadData(String method, Path f, Map<String, String> params,
                                            int bufferSize, int expectedStatus) throws IOException {
        HttpURLConnection conn = getConnection(method, params, f, true);
        // the redirect is handled by hand, see below
        conn.setInstanceFollowRedirects(false);
        // set once the 307 has been received: from that point on the response
        // validation is done inline, the outer catch must not repeat it
        boolean exceptionAlreadyHandled = false;
        try {
          if (conn.getResponseCode() == HTTP_TEMPORARY_REDIRECT) {
            exceptionAlreadyHandled = true;
            String location = conn.getHeaderField("Location");
            if (location != null) {
              // second request, 'conn' now refers to the redirected connection
              conn = getConnection(new URL(location), method);
              conn.setRequestProperty("Content-Type", UPLOAD_CONTENT_TYPE);
              try {
                OutputStream os = new BufferedOutputStream(conn.getOutputStream(), bufferSize);
                return new HttpFSDataOutputStream(conn, os, expectedStatus, statistics);
              } catch (IOException ex) {
                // validate the redirected connection first, the original
                // exception is rethrown only if the validation does not throw
                HttpFSUtils.validateResponse(conn, expectedStatus);
                throw ex;
              }
            } else {
              HttpFSUtils.validateResponse(conn, HTTP_TEMPORARY_REDIRECT);
              throw new IOException("Missing HTTP 'Location' header for [" + conn.getURL() + "]");
            }
          } else {
            // not a redirect; this exception goes through the catch below
            // with exceptionAlreadyHandled still false
            throw new IOException(
              MessageFormat.format("Expected HTTP status was [307], received [{0}]",
                                   conn.getResponseCode()));
          }
        } catch (IOException ex) {
          if (exceptionAlreadyHandled) {
            throw ex;
          } else {
            // failure on the first request: validate its response before
            // rethrowing the original exception
            HttpFSUtils.validateResponse(conn, HTTP_TEMPORARY_REDIRECT);
            throw ex;
          }
        }
      }
476    
477    
478      /**
479       * Opens an FSDataOutputStream at the indicated Path with write-progress
480       * reporting.
481       * <p/>
482       * IMPORTANT: The <code>Progressable</code> parameter is not used.
483       *
484       * @param f the file name to open.
485       * @param permission file permission.
486       * @param overwrite if a file with this name already exists, then if true,
487       * the file will be overwritten, and if false an error will be thrown.
488       * @param bufferSize the size of the buffer to be used.
489       * @param replication required block replication for the file.
490       * @param blockSize block size.
491       * @param progress progressable.
492       *
493       * @throws IOException
494       * @see #setPermission(Path, FsPermission)
495       */
496      @Override
497      public FSDataOutputStream create(Path f, FsPermission permission,
498                                       boolean overwrite, int bufferSize,
499                                       short replication, long blockSize,
500                                       Progressable progress) throws IOException {
501        Map<String, String> params = new HashMap<String, String>();
502        params.put(OP_PARAM, Operation.CREATE.toString());
503        params.put(OVERWRITE_PARAM, Boolean.toString(overwrite));
504        params.put(REPLICATION_PARAM, Short.toString(replication));
505        params.put(BLOCKSIZE_PARAM, Long.toString(blockSize));
506        params.put(PERMISSION_PARAM, permissionToString(permission));
507        return uploadData(Operation.CREATE.getMethod(), f, params, bufferSize,
508                          HttpURLConnection.HTTP_CREATED);
509      }
510    
511    
512      /**
513       * Append to an existing file (optional operation).
514       * <p/>
515       * IMPORTANT: The <code>Progressable</code> parameter is not used.
516       *
517       * @param f the existing file to be appended.
518       * @param bufferSize the size of the buffer to be used.
519       * @param progress for reporting progress if it is not null.
520       *
521       * @throws IOException
522       */
523      @Override
524      public FSDataOutputStream append(Path f, int bufferSize,
525                                       Progressable progress) throws IOException {
526        Map<String, String> params = new HashMap<String, String>();
527        params.put(OP_PARAM, Operation.APPEND.toString());
528        return uploadData(Operation.APPEND.getMethod(), f, params, bufferSize,
529                          HttpURLConnection.HTTP_OK);
530      }
531    
532      /**
533       * Renames Path src to Path dst.  Can take place on local fs
534       * or remote DFS.
535       */
536      @Override
537      public boolean rename(Path src, Path dst) throws IOException {
538        Map<String, String> params = new HashMap<String, String>();
539        params.put(OP_PARAM, Operation.RENAME.toString());
540        params.put(DESTINATION_PARAM, dst.toString());
541        HttpURLConnection conn = getConnection(Operation.RENAME.getMethod(),
542                                               params, src, true);
543        HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
544        JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
545        return (Boolean) json.get(RENAME_JSON);
546      }
547    
548      /**
549       * Delete a file.
550       *
551       * @deprecated Use delete(Path, boolean) instead
552       */
553      @SuppressWarnings({"deprecation"})
554      @Deprecated
555      @Override
556      public boolean delete(Path f) throws IOException {
557        return delete(f, false);
558      }
559    
560      /**
561       * Delete a file.
562       *
563       * @param f the path to delete.
564       * @param recursive if path is a directory and set to
565       * true, the directory is deleted else throws an exception. In
566       * case of a file the recursive can be set to either true or false.
567       *
568       * @return true if delete is successful else false.
569       *
570       * @throws IOException
571       */
572      @Override
573      public boolean delete(Path f, boolean recursive) throws IOException {
574        Map<String, String> params = new HashMap<String, String>();
575        params.put(OP_PARAM, Operation.DELETE.toString());
576        params.put(RECURSIVE_PARAM, Boolean.toString(recursive));
577        HttpURLConnection conn = getConnection(Operation.DELETE.getMethod(),
578                                               params, f, true);
579        HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
580        JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
581        return (Boolean) json.get(DELETE_JSON);
582      }
583    
584      /**
585       * List the statuses of the files/directories in the given path if the path is
586       * a directory.
587       *
588       * @param f given path
589       *
590       * @return the statuses of the files/directories in the given patch
591       *
592       * @throws IOException
593       */
594      @Override
595      public FileStatus[] listStatus(Path f) throws IOException {
596        Map<String, String> params = new HashMap<String, String>();
597        params.put(OP_PARAM, Operation.LISTSTATUS.toString());
598        HttpURLConnection conn = getConnection(Operation.LISTSTATUS.getMethod(),
599                                               params, f, true);
600        HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
601        JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
602        json = (JSONObject) json.get(FILE_STATUSES_JSON);
603        JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
604        FileStatus[] array = new FileStatus[jsonArray.size()];
605        f = makeQualified(f);
606        for (int i = 0; i < jsonArray.size(); i++) {
607          array[i] = createFileStatus(f, (JSONObject) jsonArray.get(i));
608        }
609        return array;
610      }
611    
612      /**
613       * Set the current working directory for the given file system. All relative
614       * paths will be resolved relative to it.
615       *
616       * @param newDir new directory.
617       */
618      @Override
619      public void setWorkingDirectory(Path newDir) {
620        workingDir = newDir;
621      }
622    
623      /**
624       * Get the current working directory for the given file system
625       *
626       * @return the directory pathname
627       */
628      @Override
629      public Path getWorkingDirectory() {
630        if (workingDir == null) {
631          workingDir = getHomeDirectory();
632        }
633        return workingDir;
634      }
635    
636      /**
637       * Make the given file and all non-existent parents into
638       * directories. Has the semantics of Unix 'mkdir -p'.
639       * Existence of the directory hierarchy is not an error.
640       */
641      @Override
642      public boolean mkdirs(Path f, FsPermission permission) throws IOException {
643        Map<String, String> params = new HashMap<String, String>();
644        params.put(OP_PARAM, Operation.MKDIRS.toString());
645        params.put(PERMISSION_PARAM, permissionToString(permission));
646        HttpURLConnection conn = getConnection(Operation.MKDIRS.getMethod(),
647                                               params, f, true);
648        HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
649        JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
650        return (Boolean) json.get(MKDIRS_JSON);
651      }
652    
653      /**
654       * Return a file status object that represents the path.
655       *
656       * @param f The path we want information from
657       *
658       * @return a FileStatus object
659       *
660       * @throws FileNotFoundException when the path does not exist;
661       * IOException see specific implementation
662       */
663      @Override
664      public FileStatus getFileStatus(Path f) throws IOException {
665        Map<String, String> params = new HashMap<String, String>();
666        params.put(OP_PARAM, Operation.GETFILESTATUS.toString());
667        HttpURLConnection conn = getConnection(Operation.GETFILESTATUS.getMethod(),
668                                               params, f, true);
669        HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
670        JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
671        json = (JSONObject) json.get(FILE_STATUS_JSON);
672        f = makeQualified(f);
673        return createFileStatus(f, json);
674      }
675    
676      /**
677       * Return the current user's home directory in this filesystem.
678       * The default implementation returns "/user/$USER/".
679       */
680      @Override
681      public Path getHomeDirectory() {
682        Map<String, String> params = new HashMap<String, String>();
683        params.put(OP_PARAM, Operation.GETHOMEDIRECTORY.toString());
684        try {
685          HttpURLConnection conn =
686            getConnection(Operation.GETHOMEDIRECTORY.getMethod(), params,
687                          new Path(getUri().toString(), "/"), false);
688          HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
689          JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
690          return new Path((String) json.get(HOME_DIR_JSON));
691        } catch (IOException ex) {
692          throw new RuntimeException(ex);
693        }
694      }
695    
696      /**
697       * Set owner of a path (i.e. a file or a directory).
698       * The parameters username and groupname cannot both be null.
699       *
700       * @param p The path
701       * @param username If it is null, the original username remains unchanged.
702       * @param groupname If it is null, the original groupname remains unchanged.
703       */
704      @Override
705      public void setOwner(Path p, String username, String groupname)
706        throws IOException {
707        Map<String, String> params = new HashMap<String, String>();
708        params.put(OP_PARAM, Operation.SETOWNER.toString());
709        params.put(OWNER_PARAM, username);
710        params.put(GROUP_PARAM, groupname);
711        HttpURLConnection conn = getConnection(Operation.SETOWNER.getMethod(),
712                                               params, p, true);
713        HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
714      }
715    
716      /**
717       * Set permission of a path.
718       *
719       * @param p path.
720       * @param permission permission.
721       */
722      @Override
723      public void setPermission(Path p, FsPermission permission) throws IOException {
724        Map<String, String> params = new HashMap<String, String>();
725        params.put(OP_PARAM, Operation.SETPERMISSION.toString());
726        params.put(PERMISSION_PARAM, permissionToString(permission));
727        HttpURLConnection conn = getConnection(Operation.SETPERMISSION.getMethod(), params, p, true);
728        HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
729      }
730    
731      /**
732       * Set access time of a file
733       *
734       * @param p The path
735       * @param mtime Set the modification time of this file.
736       * The number of milliseconds since Jan 1, 1970.
737       * A value of -1 means that this call should not set modification time.
738       * @param atime Set the access time of this file.
739       * The number of milliseconds since Jan 1, 1970.
740       * A value of -1 means that this call should not set access time.
741       */
742      @Override
743      public void setTimes(Path p, long mtime, long atime) throws IOException {
744        Map<String, String> params = new HashMap<String, String>();
745        params.put(OP_PARAM, Operation.SETTIMES.toString());
746        params.put(MODIFICATION_TIME_PARAM, Long.toString(mtime));
747        params.put(ACCESS_TIME_PARAM, Long.toString(atime));
748        HttpURLConnection conn = getConnection(Operation.SETTIMES.getMethod(),
749                                               params, p, true);
750        HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
751      }
752    
753      /**
754       * Set replication for an existing file.
755       *
756       * @param src file name
757       * @param replication new replication
758       *
759       * @return true if successful;
760       *         false if file does not exist or is a directory
761       *
762       * @throws IOException
763       */
764      @Override
765      public boolean setReplication(Path src, short replication)
766        throws IOException {
767        Map<String, String> params = new HashMap<String, String>();
768        params.put(OP_PARAM, Operation.SETREPLICATION.toString());
769        params.put(REPLICATION_PARAM, Short.toString(replication));
770        HttpURLConnection conn =
771          getConnection(Operation.SETREPLICATION.getMethod(), params, src, true);
772        HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
773        JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
774        return (Boolean) json.get(SET_REPLICATION_JSON);
775      }
776    
777      private FileStatus createFileStatus(Path parent, JSONObject json) {
778        String pathSuffix = (String) json.get(PATH_SUFFIX_JSON);
779        Path path = (pathSuffix.equals("")) ? parent : new Path(parent, pathSuffix);
780        FILE_TYPE type = FILE_TYPE.valueOf((String) json.get(TYPE_JSON));
781        long len = (Long) json.get(LENGTH_JSON);
782        String owner = (String) json.get(OWNER_JSON);
783        String group = (String) json.get(GROUP_JSON);
784        FsPermission permission =
785          new FsPermission(Short.parseShort((String) json.get(PERMISSION_JSON), 8));
786        long aTime = (Long) json.get(ACCESS_TIME_JSON);
787        long mTime = (Long) json.get(MODIFICATION_TIME_JSON);
788        long blockSize = (Long) json.get(BLOCK_SIZE_JSON);
789        short replication = ((Long) json.get(REPLICATION_JSON)).shortValue();
790        FileStatus fileStatus = null;
791    
792        switch (type) {
793          case FILE:
794          case DIRECTORY:
795            fileStatus = new FileStatus(len, (type == FILE_TYPE.DIRECTORY),
796                                        replication, blockSize, mTime, aTime,
797                                        permission, owner, group, path);
798            break;
799          case SYMLINK:
800            Path symLink = null;
801            fileStatus = new FileStatus(len, false,
802                                        replication, blockSize, mTime, aTime,
803                                        permission, owner, group, symLink,
804                                        path);
805        }
806        return fileStatus;
807      }
808    
809      @Override
810      public ContentSummary getContentSummary(Path f) throws IOException {
811        Map<String, String> params = new HashMap<String, String>();
812        params.put(OP_PARAM, Operation.GETCONTENTSUMMARY.toString());
813        HttpURLConnection conn =
814          getConnection(Operation.GETCONTENTSUMMARY.getMethod(), params, f, true);
815        HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
816        JSONObject json = (JSONObject) ((JSONObject)
817          HttpFSUtils.jsonParse(conn)).get(CONTENT_SUMMARY_JSON);
818        return new ContentSummary((Long) json.get(CONTENT_SUMMARY_LENGTH_JSON),
819                                  (Long) json.get(CONTENT_SUMMARY_FILE_COUNT_JSON),
820                                  (Long) json.get(CONTENT_SUMMARY_DIRECTORY_COUNT_JSON),
821                                  (Long) json.get(CONTENT_SUMMARY_QUOTA_JSON),
822                                  (Long) json.get(CONTENT_SUMMARY_SPACE_CONSUMED_JSON),
823                                  (Long) json.get(CONTENT_SUMMARY_SPACE_QUOTA_JSON)
824        );
825      }
826    
827      @Override
828      public FileChecksum getFileChecksum(Path f) throws IOException {
829        Map<String, String> params = new HashMap<String, String>();
830        params.put(OP_PARAM, Operation.GETFILECHECKSUM.toString());
831        HttpURLConnection conn =
832          getConnection(Operation.GETFILECHECKSUM.getMethod(), params, f, true);
833        HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
834        final JSONObject json = (JSONObject) ((JSONObject)
835          HttpFSUtils.jsonParse(conn)).get(FILE_CHECKSUM_JSON);
836        return new FileChecksum() {
837          @Override
838          public String getAlgorithmName() {
839            return (String) json.get(CHECKSUM_ALGORITHM_JSON);
840          }
841    
842          @Override
843          public int getLength() {
844            return ((Long) json.get(CHECKSUM_LENGTH_JSON)).intValue();
845          }
846    
847          @Override
848          public byte[] getBytes() {
849            return StringUtils.hexStringToByte((String) json.get(CHECKSUM_BYTES_JSON));
850          }
851    
852          @Override
853          public void write(DataOutput out) throws IOException {
854            throw new UnsupportedOperationException();
855          }
856    
857          @Override
858          public void readFields(DataInput in) throws IOException {
859            throw new UnsupportedOperationException();
860          }
861        };
862      }
863    
864    
865      @Override
866      @SuppressWarnings("deprecation")
867      public Token<?> getDelegationToken(final String renewer)
868        throws IOException {
869        return doAsRealUserIfNecessary(new Callable<Token<?>>() {
870          @Override
871          public Token<?> call() throws Exception {
872            return HttpFSKerberosAuthenticator.
873              getDelegationToken(uri, httpFSAddr, authToken, renewer);
874          }
875        });
876      }
877    
878    
879      @Override
880      public List<Token<?>> getDelegationTokens(final String renewer)
881        throws IOException {
882        return doAsRealUserIfNecessary(new Callable<List<Token<?>>>() {
883          @Override
884          public List<Token<?>> call() throws Exception {
885            return HttpFSKerberosAuthenticator.
886              getDelegationTokens(uri, httpFSAddr, authToken, renewer);
887          }
888        });
889      }
890    
891      public long renewDelegationToken(final Token<?> token) throws IOException {
892        return doAsRealUserIfNecessary(new Callable<Long>() {
893          @Override
894          public Long call() throws Exception {
895            return HttpFSKerberosAuthenticator.
896              renewDelegationToken(uri,  authToken, token);
897          }
898        });
899      }
900    
901      public void cancelDelegationToken(final Token<?> token) throws IOException {
902        HttpFSKerberosAuthenticator.
903          cancelDelegationToken(uri, authToken, token);
904      }
905    
906      @Override
907      public Token<?> getRenewToken() {
908        return delegationToken;
909      }
910    
911      @Override
912      public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
913        delegationToken = token;
914      }
915    
916    }