001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.hadoop.fs.http.client; 019 020 import org.apache.hadoop.classification.InterfaceAudience; 021 import org.apache.hadoop.conf.Configuration; 022 import org.apache.hadoop.fs.ContentSummary; 023 import org.apache.hadoop.fs.DelegationTokenRenewer; 024 import org.apache.hadoop.fs.FSDataInputStream; 025 import org.apache.hadoop.fs.FSDataOutputStream; 026 import org.apache.hadoop.fs.FileChecksum; 027 import org.apache.hadoop.fs.FileStatus; 028 import org.apache.hadoop.fs.FileSystem; 029 import org.apache.hadoop.fs.Path; 030 import org.apache.hadoop.fs.PositionedReadable; 031 import org.apache.hadoop.fs.Seekable; 032 import org.apache.hadoop.fs.permission.FsPermission; 033 import org.apache.hadoop.hdfs.DFSConfigKeys; 034 import org.apache.hadoop.net.NetUtils; 035 import org.apache.hadoop.security.UserGroupInformation; 036 import org.apache.hadoop.security.authentication.client.AuthenticatedURL; 037 import org.apache.hadoop.security.authentication.client.Authenticator; 038 import org.apache.hadoop.security.token.Token; 039 import org.apache.hadoop.security.token.TokenIdentifier; 040 import org.apache.hadoop.util.Progressable; 041 import org.apache.hadoop.util.ReflectionUtils; 042 import org.apache.hadoop.util.StringUtils; 043 import org.json.simple.JSONArray; 044 import org.json.simple.JSONObject; 045 046 import java.io.BufferedInputStream; 047 import java.io.BufferedOutputStream; 048 import java.io.DataInput; 049 import java.io.DataOutput; 050 import java.io.FileNotFoundException; 051 import java.io.FilterInputStream; 052 import java.io.IOException; 053 import java.io.InputStream; 054 import java.io.OutputStream; 055 import java.net.HttpURLConnection; 056 import java.net.InetSocketAddress; 057 import java.net.URI; 058 import java.net.URISyntaxException; 059 import java.net.URL; 060 import java.security.PrivilegedExceptionAction; 061 import java.text.MessageFormat; 062 import java.util.HashMap; 063 import java.util.List; 064 import java.util.Map; 065 import java.util.concurrent.Callable; 066 067 /** 068 * HttpFSServer implementation of the FileSystemAccess FileSystem. 069 * <p/> 070 * This implementation allows a user to access HDFS over HTTP via a HttpFSServer server. 071 */ 072 @InterfaceAudience.Private 073 public class HttpFSFileSystem extends FileSystem 074 implements DelegationTokenRenewer.Renewable { 075 076 public static final String SERVICE_NAME = HttpFSUtils.SERVICE_NAME; 077 078 public static final String SERVICE_VERSION = HttpFSUtils.SERVICE_VERSION; 079 080 public static final String SCHEME = "webhdfs"; 081 082 public static final String OP_PARAM = "op"; 083 public static final String DO_AS_PARAM = "doas"; 084 public static final String OVERWRITE_PARAM = "overwrite"; 085 public static final String REPLICATION_PARAM = "replication"; 086 public static final String BLOCKSIZE_PARAM = "blocksize"; 087 public static final String PERMISSION_PARAM = "permission"; 088 public static final String DESTINATION_PARAM = "destination"; 089 public static final String RECURSIVE_PARAM = "recursive"; 090 public static final String OWNER_PARAM = "owner"; 091 public static final String GROUP_PARAM = "group"; 092 public static final String MODIFICATION_TIME_PARAM = "modificationtime"; 093 public static final String ACCESS_TIME_PARAM = "accesstime"; 094 095 public static final Short DEFAULT_PERMISSION = 0755; 096 097 public static final String RENAME_JSON = "boolean"; 098 099 public static final String DELETE_JSON = "boolean"; 100 101 public static final String MKDIRS_JSON = "boolean"; 102 103 public static final String HOME_DIR_JSON = "Path"; 104 105 public static final String SET_REPLICATION_JSON = "boolean"; 106 107 public static final String UPLOAD_CONTENT_TYPE= "application/octet-stream"; 108 109 public static enum FILE_TYPE { 110 FILE, DIRECTORY, SYMLINK; 111 112 public static FILE_TYPE getType(FileStatus fileStatus) { 113 if (fileStatus.isFile()) { 114 return FILE; 115 } 116 if (fileStatus.isDirectory()) { 117 return DIRECTORY; 118 } 119 if (fileStatus.isSymlink()) { 120 return SYMLINK; 121 } 122 throw new IllegalArgumentException("Could not determine filetype for: " + 123 fileStatus.getPath()); 124 } 125 } 126 127 public static final String FILE_STATUSES_JSON = "FileStatuses"; 128 public static final String FILE_STATUS_JSON = "FileStatus"; 129 public static final String PATH_SUFFIX_JSON = "pathSuffix"; 130 public static final String TYPE_JSON = "type"; 131 public static final String LENGTH_JSON = "length"; 132 public static final String OWNER_JSON = "owner"; 133 public static final String GROUP_JSON = "group"; 134 public static final String PERMISSION_JSON = "permission"; 135 public static final String ACCESS_TIME_JSON = "accessTime"; 136 public static final String MODIFICATION_TIME_JSON = "modificationTime"; 137 public static final String BLOCK_SIZE_JSON = "blockSize"; 138 public static final String REPLICATION_JSON = "replication"; 139 140 public static final String FILE_CHECKSUM_JSON = "FileChecksum"; 141 public static final String CHECKSUM_ALGORITHM_JSON = "algorithm"; 142 public static final String CHECKSUM_BYTES_JSON = "bytes"; 143 public static final String CHECKSUM_LENGTH_JSON = "length"; 144 145 public static final String CONTENT_SUMMARY_JSON = "ContentSummary"; 146 public static final String CONTENT_SUMMARY_DIRECTORY_COUNT_JSON = "directoryCount"; 147 public static final String CONTENT_SUMMARY_FILE_COUNT_JSON = "fileCount"; 148 public static final String CONTENT_SUMMARY_LENGTH_JSON = "length"; 149 public static final String CONTENT_SUMMARY_QUOTA_JSON = "quota"; 150 public static final String CONTENT_SUMMARY_SPACE_CONSUMED_JSON = "spaceConsumed"; 151 public static final String CONTENT_SUMMARY_SPACE_QUOTA_JSON = "spaceQuota"; 152 153 public static final String ERROR_JSON = "RemoteException"; 154 public static final String ERROR_EXCEPTION_JSON = "exception"; 155 public static final String ERROR_CLASSNAME_JSON = "javaClassName"; 156 public static final String ERROR_MESSAGE_JSON = "message"; 157 158 public static final int HTTP_TEMPORARY_REDIRECT = 307; 159 160 private static final String HTTP_GET = "GET"; 161 private static final String HTTP_PUT = "PUT"; 162 private static final String HTTP_POST = "POST"; 163 private static final String HTTP_DELETE = "DELETE"; 164 165 @InterfaceAudience.Private 166 public static enum Operation { 167 OPEN(HTTP_GET), GETFILESTATUS(HTTP_GET), LISTSTATUS(HTTP_GET), 168 GETHOMEDIRECTORY(HTTP_GET), GETCONTENTSUMMARY(HTTP_GET), 169 GETFILECHECKSUM(HTTP_GET), GETFILEBLOCKLOCATIONS(HTTP_GET), 170 INSTRUMENTATION(HTTP_GET), 171 APPEND(HTTP_POST), 172 CREATE(HTTP_PUT), MKDIRS(HTTP_PUT), RENAME(HTTP_PUT), SETOWNER(HTTP_PUT), 173 SETPERMISSION(HTTP_PUT), SETREPLICATION(HTTP_PUT), SETTIMES(HTTP_PUT), 174 DELETE(HTTP_DELETE); 175 176 private String httpMethod; 177 178 Operation(String httpMethod) { 179 this.httpMethod = httpMethod; 180 } 181 182 public String getMethod() { 183 return httpMethod; 184 } 185 186 } 187 188 189 private AuthenticatedURL.Token authToken = new AuthenticatedURL.Token(); 190 private URI uri; 191 private InetSocketAddress httpFSAddr; 192 private Path workingDir; 193 private UserGroupInformation realUser; 194 private String doAs; 195 private Token<?> delegationToken; 196 197 //This method enables handling UGI doAs with SPNEGO, we have to 198 //fallback to the realuser who logged in with Kerberos credentials 199 private <T> T doAsRealUserIfNecessary(final Callable<T> callable) 200 throws IOException { 201 try { 202 if (realUser.getShortUserName().equals(doAs)) { 203 return callable.call(); 204 } else { 205 return realUser.doAs(new PrivilegedExceptionAction<T>() { 206 @Override 207 public T run() throws Exception { 208 return callable.call(); 209 } 210 }); 211 } 212 } catch (Exception ex) { 213 throw new IOException(ex.toString(), ex); 214 } 215 } 216 217 /** 218 * Convenience method that creates a <code>HttpURLConnection</code> for the 219 * HttpFSServer file system operations. 220 * <p/> 221 * This methods performs and injects any needed authentication credentials 222 * via the {@link #getConnection(URL, String)} method 223 * 224 * @param method the HTTP method. 225 * @param params the query string parameters. 226 * @param path the file path 227 * @param makeQualified if the path should be 'makeQualified' 228 * 229 * @return a <code>HttpURLConnection</code> for the HttpFSServer server, 230 * authenticated and ready to use for the specified path and file system operation. 231 * 232 * @throws IOException thrown if an IO error occurrs. 233 */ 234 private HttpURLConnection getConnection(final String method, 235 Map<String, String> params, Path path, boolean makeQualified) 236 throws IOException { 237 if (!realUser.getShortUserName().equals(doAs)) { 238 params.put(DO_AS_PARAM, doAs); 239 } 240 HttpFSKerberosAuthenticator.injectDelegationToken(params, delegationToken); 241 if (makeQualified) { 242 path = makeQualified(path); 243 } 244 final URL url = HttpFSUtils.createHttpURL(path, params); 245 return doAsRealUserIfNecessary(new Callable<HttpURLConnection>() { 246 @Override 247 public HttpURLConnection call() throws Exception { 248 return getConnection(url, method); 249 } 250 }); 251 } 252 253 /** 254 * Convenience method that creates a <code>HttpURLConnection</code> for the specified URL. 255 * <p/> 256 * This methods performs and injects any needed authentication credentials. 257 * 258 * @param url url to connect to. 259 * @param method the HTTP method. 260 * 261 * @return a <code>HttpURLConnection</code> for the HttpFSServer server, authenticated and ready to use for 262 * the specified path and file system operation. 263 * 264 * @throws IOException thrown if an IO error occurrs. 265 */ 266 private HttpURLConnection getConnection(URL url, String method) throws IOException { 267 Class<? extends Authenticator> klass = 268 getConf().getClass("httpfs.authenticator.class", 269 HttpFSKerberosAuthenticator.class, Authenticator.class); 270 Authenticator authenticator = ReflectionUtils.newInstance(klass, getConf()); 271 try { 272 HttpURLConnection conn = new AuthenticatedURL(authenticator).openConnection(url, authToken); 273 conn.setRequestMethod(method); 274 if (method.equals(HTTP_POST) || method.equals(HTTP_PUT)) { 275 conn.setDoOutput(true); 276 } 277 return conn; 278 } catch (Exception ex) { 279 throw new IOException(ex); 280 } 281 } 282 283 /** 284 * Called after a new FileSystem instance is constructed. 285 * 286 * @param name a uri whose authority section names the host, port, etc. for this FileSystem 287 * @param conf the configuration 288 */ 289 @Override 290 public void initialize(URI name, Configuration conf) throws IOException { 291 UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); 292 293 //the real use is the one that has the Kerberos credentials needed for 294 //SPNEGO to work 295 realUser = ugi.getRealUser(); 296 if (realUser == null) { 297 realUser = UserGroupInformation.getLoginUser(); 298 } 299 doAs = ugi.getShortUserName(); 300 super.initialize(name, conf); 301 try { 302 uri = new URI(name.getScheme() + "://" + name.getAuthority()); 303 httpFSAddr = NetUtils.createSocketAddr(getCanonicalUri().toString()); 304 } catch (URISyntaxException ex) { 305 throw new IOException(ex); 306 } 307 } 308 309 @Override 310 public String getScheme() { 311 return SCHEME; 312 } 313 314 /** 315 * Returns a URI whose scheme and authority identify this FileSystem. 316 * 317 * @return the URI whose scheme and authority identify this FileSystem. 318 */ 319 @Override 320 public URI getUri() { 321 return uri; 322 } 323 324 /** 325 * Get the default port for this file system. 326 * @return the default port or 0 if there isn't one 327 */ 328 @Override 329 protected int getDefaultPort() { 330 return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, 331 DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT); 332 } 333 334 /** 335 * HttpFSServer subclass of the <code>FSDataInputStream</code>. 336 * <p/> 337 * This implementation does not support the 338 * <code>PositionReadable</code> and <code>Seekable</code> methods. 339 */ 340 private static class HttpFSDataInputStream extends FilterInputStream implements Seekable, PositionedReadable { 341 342 protected HttpFSDataInputStream(InputStream in, int bufferSize) { 343 super(new BufferedInputStream(in, bufferSize)); 344 } 345 346 @Override 347 public int read(long position, byte[] buffer, int offset, int length) throws IOException { 348 throw new UnsupportedOperationException(); 349 } 350 351 @Override 352 public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { 353 throw new UnsupportedOperationException(); 354 } 355 356 @Override 357 public void readFully(long position, byte[] buffer) throws IOException { 358 throw new UnsupportedOperationException(); 359 } 360 361 @Override 362 public void seek(long pos) throws IOException { 363 throw new UnsupportedOperationException(); 364 } 365 366 @Override 367 public long getPos() throws IOException { 368 throw new UnsupportedOperationException(); 369 } 370 371 @Override 372 public boolean seekToNewSource(long targetPos) throws IOException { 373 throw new UnsupportedOperationException(); 374 } 375 } 376 377 /** 378 * Opens an FSDataInputStream at the indicated Path. 379 * </p> 380 * IMPORTANT: the returned <code><FSDataInputStream/code> does not support the 381 * <code>PositionReadable</code> and <code>Seekable</code> methods. 382 * 383 * @param f the file name to open 384 * @param bufferSize the size of the buffer to be used. 385 */ 386 @Override 387 public FSDataInputStream open(Path f, int bufferSize) throws IOException { 388 Map<String, String> params = new HashMap<String, String>(); 389 params.put(OP_PARAM, Operation.OPEN.toString()); 390 HttpURLConnection conn = getConnection(Operation.OPEN.getMethod(), params, 391 f, true); 392 HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK); 393 return new FSDataInputStream( 394 new HttpFSDataInputStream(conn.getInputStream(), bufferSize)); 395 } 396 397 /** 398 * HttpFSServer subclass of the <code>FSDataOutputStream</code>. 399 * <p/> 400 * This implementation closes the underlying HTTP connection validating the Http connection status 401 * at closing time. 402 */ 403 private static class HttpFSDataOutputStream extends FSDataOutputStream { 404 private HttpURLConnection conn; 405 private int closeStatus; 406 407 public HttpFSDataOutputStream(HttpURLConnection conn, OutputStream out, int closeStatus, Statistics stats) 408 throws IOException { 409 super(out, stats); 410 this.conn = conn; 411 this.closeStatus = closeStatus; 412 } 413 414 @Override 415 public void close() throws IOException { 416 try { 417 super.close(); 418 } finally { 419 HttpFSUtils.validateResponse(conn, closeStatus); 420 } 421 } 422 423 } 424 425 /** 426 * Converts a <code>FsPermission</code> to a Unix octal representation. 427 * 428 * @param p the permission. 429 * 430 * @return the Unix string symbolic reprentation. 431 */ 432 public static String permissionToString(FsPermission p) { 433 return Integer.toString((p == null) ? DEFAULT_PERMISSION : p.toShort(), 8); 434 } 435 436 /* 437 * Common handling for uploading data for create and append operations. 438 */ 439 private FSDataOutputStream uploadData(String method, Path f, Map<String, String> params, 440 int bufferSize, int expectedStatus) throws IOException { 441 HttpURLConnection conn = getConnection(method, params, f, true); 442 conn.setInstanceFollowRedirects(false); 443 boolean exceptionAlreadyHandled = false; 444 try { 445 if (conn.getResponseCode() == HTTP_TEMPORARY_REDIRECT) { 446 exceptionAlreadyHandled = true; 447 String location = conn.getHeaderField("Location"); 448 if (location != null) { 449 conn = getConnection(new URL(location), method); 450 conn.setRequestProperty("Content-Type", UPLOAD_CONTENT_TYPE); 451 try { 452 OutputStream os = new BufferedOutputStream(conn.getOutputStream(), bufferSize); 453 return new HttpFSDataOutputStream(conn, os, expectedStatus, statistics); 454 } catch (IOException ex) { 455 HttpFSUtils.validateResponse(conn, expectedStatus); 456 throw ex; 457 } 458 } else { 459 HttpFSUtils.validateResponse(conn, HTTP_TEMPORARY_REDIRECT); 460 throw new IOException("Missing HTTP 'Location' header for [" + conn.getURL() + "]"); 461 } 462 } else { 463 throw new IOException( 464 MessageFormat.format("Expected HTTP status was [307], received [{0}]", 465 conn.getResponseCode())); 466 } 467 } catch (IOException ex) { 468 if (exceptionAlreadyHandled) { 469 throw ex; 470 } else { 471 HttpFSUtils.validateResponse(conn, HTTP_TEMPORARY_REDIRECT); 472 throw ex; 473 } 474 } 475 } 476 477 478 /** 479 * Opens an FSDataOutputStream at the indicated Path with write-progress 480 * reporting. 481 * <p/> 482 * IMPORTANT: The <code>Progressable</code> parameter is not used. 483 * 484 * @param f the file name to open. 485 * @param permission file permission. 486 * @param overwrite if a file with this name already exists, then if true, 487 * the file will be overwritten, and if false an error will be thrown. 488 * @param bufferSize the size of the buffer to be used. 489 * @param replication required block replication for the file. 490 * @param blockSize block size. 491 * @param progress progressable. 492 * 493 * @throws IOException 494 * @see #setPermission(Path, FsPermission) 495 */ 496 @Override 497 public FSDataOutputStream create(Path f, FsPermission permission, 498 boolean overwrite, int bufferSize, 499 short replication, long blockSize, 500 Progressable progress) throws IOException { 501 Map<String, String> params = new HashMap<String, String>(); 502 params.put(OP_PARAM, Operation.CREATE.toString()); 503 params.put(OVERWRITE_PARAM, Boolean.toString(overwrite)); 504 params.put(REPLICATION_PARAM, Short.toString(replication)); 505 params.put(BLOCKSIZE_PARAM, Long.toString(blockSize)); 506 params.put(PERMISSION_PARAM, permissionToString(permission)); 507 return uploadData(Operation.CREATE.getMethod(), f, params, bufferSize, 508 HttpURLConnection.HTTP_CREATED); 509 } 510 511 512 /** 513 * Append to an existing file (optional operation). 514 * <p/> 515 * IMPORTANT: The <code>Progressable</code> parameter is not used. 516 * 517 * @param f the existing file to be appended. 518 * @param bufferSize the size of the buffer to be used. 519 * @param progress for reporting progress if it is not null. 520 * 521 * @throws IOException 522 */ 523 @Override 524 public FSDataOutputStream append(Path f, int bufferSize, 525 Progressable progress) throws IOException { 526 Map<String, String> params = new HashMap<String, String>(); 527 params.put(OP_PARAM, Operation.APPEND.toString()); 528 return uploadData(Operation.APPEND.getMethod(), f, params, bufferSize, 529 HttpURLConnection.HTTP_OK); 530 } 531 532 /** 533 * Renames Path src to Path dst. Can take place on local fs 534 * or remote DFS. 535 */ 536 @Override 537 public boolean rename(Path src, Path dst) throws IOException { 538 Map<String, String> params = new HashMap<String, String>(); 539 params.put(OP_PARAM, Operation.RENAME.toString()); 540 params.put(DESTINATION_PARAM, dst.toString()); 541 HttpURLConnection conn = getConnection(Operation.RENAME.getMethod(), 542 params, src, true); 543 HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK); 544 JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn); 545 return (Boolean) json.get(RENAME_JSON); 546 } 547 548 /** 549 * Delete a file. 550 * 551 * @deprecated Use delete(Path, boolean) instead 552 */ 553 @SuppressWarnings({"deprecation"}) 554 @Deprecated 555 @Override 556 public boolean delete(Path f) throws IOException { 557 return delete(f, false); 558 } 559 560 /** 561 * Delete a file. 562 * 563 * @param f the path to delete. 564 * @param recursive if path is a directory and set to 565 * true, the directory is deleted else throws an exception. In 566 * case of a file the recursive can be set to either true or false. 567 * 568 * @return true if delete is successful else false. 569 * 570 * @throws IOException 571 */ 572 @Override 573 public boolean delete(Path f, boolean recursive) throws IOException { 574 Map<String, String> params = new HashMap<String, String>(); 575 params.put(OP_PARAM, Operation.DELETE.toString()); 576 params.put(RECURSIVE_PARAM, Boolean.toString(recursive)); 577 HttpURLConnection conn = getConnection(Operation.DELETE.getMethod(), 578 params, f, true); 579 HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK); 580 JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn); 581 return (Boolean) json.get(DELETE_JSON); 582 } 583 584 /** 585 * List the statuses of the files/directories in the given path if the path is 586 * a directory. 587 * 588 * @param f given path 589 * 590 * @return the statuses of the files/directories in the given patch 591 * 592 * @throws IOException 593 */ 594 @Override 595 public FileStatus[] listStatus(Path f) throws IOException { 596 Map<String, String> params = new HashMap<String, String>(); 597 params.put(OP_PARAM, Operation.LISTSTATUS.toString()); 598 HttpURLConnection conn = getConnection(Operation.LISTSTATUS.getMethod(), 599 params, f, true); 600 HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK); 601 JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn); 602 json = (JSONObject) json.get(FILE_STATUSES_JSON); 603 JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON); 604 FileStatus[] array = new FileStatus[jsonArray.size()]; 605 f = makeQualified(f); 606 for (int i = 0; i < jsonArray.size(); i++) { 607 array[i] = createFileStatus(f, (JSONObject) jsonArray.get(i)); 608 } 609 return array; 610 } 611 612 /** 613 * Set the current working directory for the given file system. All relative 614 * paths will be resolved relative to it. 615 * 616 * @param newDir new directory. 617 */ 618 @Override 619 public void setWorkingDirectory(Path newDir) { 620 workingDir = newDir; 621 } 622 623 /** 624 * Get the current working directory for the given file system 625 * 626 * @return the directory pathname 627 */ 628 @Override 629 public Path getWorkingDirectory() { 630 if (workingDir == null) { 631 workingDir = getHomeDirectory(); 632 } 633 return workingDir; 634 } 635 636 /** 637 * Make the given file and all non-existent parents into 638 * directories. Has the semantics of Unix 'mkdir -p'. 639 * Existence of the directory hierarchy is not an error. 640 */ 641 @Override 642 public boolean mkdirs(Path f, FsPermission permission) throws IOException { 643 Map<String, String> params = new HashMap<String, String>(); 644 params.put(OP_PARAM, Operation.MKDIRS.toString()); 645 params.put(PERMISSION_PARAM, permissionToString(permission)); 646 HttpURLConnection conn = getConnection(Operation.MKDIRS.getMethod(), 647 params, f, true); 648 HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK); 649 JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn); 650 return (Boolean) json.get(MKDIRS_JSON); 651 } 652 653 /** 654 * Return a file status object that represents the path. 655 * 656 * @param f The path we want information from 657 * 658 * @return a FileStatus object 659 * 660 * @throws FileNotFoundException when the path does not exist; 661 * IOException see specific implementation 662 */ 663 @Override 664 public FileStatus getFileStatus(Path f) throws IOException { 665 Map<String, String> params = new HashMap<String, String>(); 666 params.put(OP_PARAM, Operation.GETFILESTATUS.toString()); 667 HttpURLConnection conn = getConnection(Operation.GETFILESTATUS.getMethod(), 668 params, f, true); 669 HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK); 670 JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn); 671 json = (JSONObject) json.get(FILE_STATUS_JSON); 672 f = makeQualified(f); 673 return createFileStatus(f, json); 674 } 675 676 /** 677 * Return the current user's home directory in this filesystem. 678 * The default implementation returns "/user/$USER/". 679 */ 680 @Override 681 public Path getHomeDirectory() { 682 Map<String, String> params = new HashMap<String, String>(); 683 params.put(OP_PARAM, Operation.GETHOMEDIRECTORY.toString()); 684 try { 685 HttpURLConnection conn = 686 getConnection(Operation.GETHOMEDIRECTORY.getMethod(), params, 687 new Path(getUri().toString(), "/"), false); 688 HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK); 689 JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn); 690 return new Path((String) json.get(HOME_DIR_JSON)); 691 } catch (IOException ex) { 692 throw new RuntimeException(ex); 693 } 694 } 695 696 /** 697 * Set owner of a path (i.e. a file or a directory). 698 * The parameters username and groupname cannot both be null. 699 * 700 * @param p The path 701 * @param username If it is null, the original username remains unchanged. 702 * @param groupname If it is null, the original groupname remains unchanged. 703 */ 704 @Override 705 public void setOwner(Path p, String username, String groupname) 706 throws IOException { 707 Map<String, String> params = new HashMap<String, String>(); 708 params.put(OP_PARAM, Operation.SETOWNER.toString()); 709 params.put(OWNER_PARAM, username); 710 params.put(GROUP_PARAM, groupname); 711 HttpURLConnection conn = getConnection(Operation.SETOWNER.getMethod(), 712 params, p, true); 713 HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK); 714 } 715 716 /** 717 * Set permission of a path. 718 * 719 * @param p path. 720 * @param permission permission. 721 */ 722 @Override 723 public void setPermission(Path p, FsPermission permission) throws IOException { 724 Map<String, String> params = new HashMap<String, String>(); 725 params.put(OP_PARAM, Operation.SETPERMISSION.toString()); 726 params.put(PERMISSION_PARAM, permissionToString(permission)); 727 HttpURLConnection conn = getConnection(Operation.SETPERMISSION.getMethod(), params, p, true); 728 HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK); 729 } 730 731 /** 732 * Set access time of a file 733 * 734 * @param p The path 735 * @param mtime Set the modification time of this file. 736 * The number of milliseconds since Jan 1, 1970. 737 * A value of -1 means that this call should not set modification time. 738 * @param atime Set the access time of this file. 739 * The number of milliseconds since Jan 1, 1970. 740 * A value of -1 means that this call should not set access time. 741 */ 742 @Override 743 public void setTimes(Path p, long mtime, long atime) throws IOException { 744 Map<String, String> params = new HashMap<String, String>(); 745 params.put(OP_PARAM, Operation.SETTIMES.toString()); 746 params.put(MODIFICATION_TIME_PARAM, Long.toString(mtime)); 747 params.put(ACCESS_TIME_PARAM, Long.toString(atime)); 748 HttpURLConnection conn = getConnection(Operation.SETTIMES.getMethod(), 749 params, p, true); 750 HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK); 751 } 752 753 /** 754 * Set replication for an existing file. 755 * 756 * @param src file name 757 * @param replication new replication 758 * 759 * @return true if successful; 760 * false if file does not exist or is a directory 761 * 762 * @throws IOException 763 */ 764 @Override 765 public boolean setReplication(Path src, short replication) 766 throws IOException { 767 Map<String, String> params = new HashMap<String, String>(); 768 params.put(OP_PARAM, Operation.SETREPLICATION.toString()); 769 params.put(REPLICATION_PARAM, Short.toString(replication)); 770 HttpURLConnection conn = 771 getConnection(Operation.SETREPLICATION.getMethod(), params, src, true); 772 HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK); 773 JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn); 774 return (Boolean) json.get(SET_REPLICATION_JSON); 775 } 776 777 private FileStatus createFileStatus(Path parent, JSONObject json) { 778 String pathSuffix = (String) json.get(PATH_SUFFIX_JSON); 779 Path path = (pathSuffix.equals("")) ? parent : new Path(parent, pathSuffix); 780 FILE_TYPE type = FILE_TYPE.valueOf((String) json.get(TYPE_JSON)); 781 long len = (Long) json.get(LENGTH_JSON); 782 String owner = (String) json.get(OWNER_JSON); 783 String group = (String) json.get(GROUP_JSON); 784 FsPermission permission = 785 new FsPermission(Short.parseShort((String) json.get(PERMISSION_JSON), 8)); 786 long aTime = (Long) json.get(ACCESS_TIME_JSON); 787 long mTime = (Long) json.get(MODIFICATION_TIME_JSON); 788 long blockSize = (Long) json.get(BLOCK_SIZE_JSON); 789 short replication = ((Long) json.get(REPLICATION_JSON)).shortValue(); 790 FileStatus fileStatus = null; 791 792 switch (type) { 793 case FILE: 794 case DIRECTORY: 795 fileStatus = new FileStatus(len, (type == FILE_TYPE.DIRECTORY), 796 replication, blockSize, mTime, aTime, 797 permission, owner, group, path); 798 break; 799 case SYMLINK: 800 Path symLink = null; 801 fileStatus = new FileStatus(len, false, 802 replication, blockSize, mTime, aTime, 803 permission, owner, group, symLink, 804 path); 805 } 806 return fileStatus; 807 } 808 809 @Override 810 public ContentSummary getContentSummary(Path f) throws IOException { 811 Map<String, String> params = new HashMap<String, String>(); 812 params.put(OP_PARAM, Operation.GETCONTENTSUMMARY.toString()); 813 HttpURLConnection conn = 814 getConnection(Operation.GETCONTENTSUMMARY.getMethod(), params, f, true); 815 HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK); 816 JSONObject json = (JSONObject) ((JSONObject) 817 HttpFSUtils.jsonParse(conn)).get(CONTENT_SUMMARY_JSON); 818 return new ContentSummary((Long) json.get(CONTENT_SUMMARY_LENGTH_JSON), 819 (Long) json.get(CONTENT_SUMMARY_FILE_COUNT_JSON), 820 (Long) json.get(CONTENT_SUMMARY_DIRECTORY_COUNT_JSON), 821 (Long) json.get(CONTENT_SUMMARY_QUOTA_JSON), 822 (Long) json.get(CONTENT_SUMMARY_SPACE_CONSUMED_JSON), 823 (Long) json.get(CONTENT_SUMMARY_SPACE_QUOTA_JSON) 824 ); 825 } 826 827 @Override 828 public FileChecksum getFileChecksum(Path f) throws IOException { 829 Map<String, String> params = new HashMap<String, String>(); 830 params.put(OP_PARAM, Operation.GETFILECHECKSUM.toString()); 831 HttpURLConnection conn = 832 getConnection(Operation.GETFILECHECKSUM.getMethod(), params, f, true); 833 HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK); 834 final JSONObject json = (JSONObject) ((JSONObject) 835 HttpFSUtils.jsonParse(conn)).get(FILE_CHECKSUM_JSON); 836 return new FileChecksum() { 837 @Override 838 public String getAlgorithmName() { 839 return (String) json.get(CHECKSUM_ALGORITHM_JSON); 840 } 841 842 @Override 843 public int getLength() { 844 return ((Long) json.get(CHECKSUM_LENGTH_JSON)).intValue(); 845 } 846 847 @Override 848 public byte[] getBytes() { 849 return StringUtils.hexStringToByte((String) json.get(CHECKSUM_BYTES_JSON)); 850 } 851 852 @Override 853 public void write(DataOutput out) throws IOException { 854 throw new UnsupportedOperationException(); 855 } 856 857 @Override 858 public void readFields(DataInput in) throws IOException { 859 throw new UnsupportedOperationException(); 860 } 861 }; 862 } 863 864 865 @Override 866 @SuppressWarnings("deprecation") 867 public Token<?> getDelegationToken(final String renewer) 868 throws IOException { 869 return doAsRealUserIfNecessary(new Callable<Token<?>>() { 870 @Override 871 public Token<?> call() throws Exception { 872 return HttpFSKerberosAuthenticator. 873 getDelegationToken(uri, httpFSAddr, authToken, renewer); 874 } 875 }); 876 } 877 878 879 @Override 880 public List<Token<?>> getDelegationTokens(final String renewer) 881 throws IOException { 882 return doAsRealUserIfNecessary(new Callable<List<Token<?>>>() { 883 @Override 884 public List<Token<?>> call() throws Exception { 885 return HttpFSKerberosAuthenticator. 886 getDelegationTokens(uri, httpFSAddr, authToken, renewer); 887 } 888 }); 889 } 890 891 public long renewDelegationToken(final Token<?> token) throws IOException { 892 return doAsRealUserIfNecessary(new Callable<Long>() { 893 @Override 894 public Long call() throws Exception { 895 return HttpFSKerberosAuthenticator. 896 renewDelegationToken(uri, authToken, token); 897 } 898 }); 899 } 900 901 public void cancelDelegationToken(final Token<?> token) throws IOException { 902 HttpFSKerberosAuthenticator. 903 cancelDelegationToken(uri, authToken, token); 904 } 905 906 @Override 907 public Token<?> getRenewToken() { 908 return delegationToken; 909 } 910 911 @Override 912 public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) { 913 delegationToken = token; 914 } 915 916 }