/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import java.io.BufferedOutputStream;
import java.io.DataOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.FileDescriptor;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.StringTokenizer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

/****************************************************************
 * Implement the FileSystem API for the raw local filesystem.
 *
 * Paths are mapped onto {@link java.io.File} objects; relative paths are
 * resolved against this file system's working directory.
 *****************************************************************/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class RawLocalFileSystem extends FileSystem {
  static final URI NAME = URI.create("file:///");
  private Path workingDir;

  /** Creates the file system with its initial working directory. */
  public RawLocalFileSystem() {
    workingDir = getInitialWorkingDirectory();
  }

  /**
   * Resolves a path against the current working directory.
   *
   * @param f the path to resolve
   * @return {@code f} itself when it is already absolute, otherwise
   *         {@code f} appended to the working directory
   */
  private Path makeAbsolute(Path f) {
    if (f.isAbsolute()) {
      return f;
    } else {
      return new Path(workingDir, f);
    }
  }

  /**
   * Convert a path to a File.
   *
   * @param path the path to convert; it is passed through
   *             {@code checkPath} first, and a relative path is resolved
   *             against {@link #getWorkingDirectory()}
   * @return a File built from the path component of the path's URI
   */
  public File pathToFile(Path path) {
    checkPath(path);
    if (!path.isAbsolute()) {
      path = new Path(getWorkingDirectory(), path);
    }
    return new File(path.toUri().getPath());
  }

  /** @return the fixed URI of this file system, {@code file:///} */
  @Override
  public URI getUri() { return NAME; }

  /**
   * Initializes the parent class and stores the configuration.
   *
   * @param uri the URI this file system is being initialized for
   * @param conf the configuration to remember
   * @throws IOException if the parent initialization fails
   */
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    setConf(conf);
  }

  /**
   * A {@link FileInputStream} that adds the number of bytes returned by
   * each stream read to the enclosing file system's {@code statistics}.
   */
  class TrackingFileInputStream extends FileInputStream {
    public TrackingFileInputStream(File f) throws IOException {
      super(f);
    }

    @Override
    public int read() throws IOException {
      int result = super.read();
      if (result != -1) {
        statistics.incrementBytesRead(1);
      }
      return result;
    }

    @Override
    public int read(byte[] data) throws IOException {
      int result = super.read(data);
      if (result != -1) {
        statistics.incrementBytesRead(result);
      }
      return result;
    }

    @Override
    public int read(byte[] data, int offset, int length) throws IOException {
      int result = super.read(data, offset, length);
      if (result != -1) {
        statistics.incrementBytesRead(result);
      }
      return result;
    }
  }

  /*******************************************************
   * For open()'s FSInputStream.
   *
   * Wraps a TrackingFileInputStream and keeps its own record of the
   * current position, updated by seek, skip and the sequential reads.
   *******************************************************/
  class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor {
    private FileInputStream fis;
    private long position;

    public LocalFSFileInputStream(Path f) throws IOException {
      this.fis = new TrackingFileInputStream(pathToFile(f));
    }

    /** Moves the underlying channel to {@code pos} and records it. */
    @Override
    public void seek(long pos) throws IOException {
      fis.getChannel().position(pos);
      this.position = pos;
    }

    /** @return the position tracked by this stream */
    @Override
    public long getPos() throws IOException {
      return this.position;
    }

    /** Always returns false. */
    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      return false;
    }

    /*
     * Just forward to the fis
     */
    @Override
    public int available() throws IOException { return fis.available(); }
    @Override
    public void close() throws IOException { fis.close(); }
    @Override
    public boolean markSupported() { return false; }

    /**
     * Reads one byte and advances the tracked position when a byte was
     * returned. An IOException from the underlying stream is rethrown
     * wrapped in an {@link FSError}.
     */
    @Override
    public int read() throws IOException {
      try {
        int value = fis.read();
        if (value >= 0) {
          this.position++;
        }
        return value;
      } catch (IOException e) {                 // unexpected exception
        throw new FSError(e);                   // assume native fs error
      }
    }

    /**
     * Reads up to {@code len} bytes into {@code b} and advances the tracked
     * position by the number of bytes read. An IOException from the
     * underlying stream is rethrown wrapped in an {@link FSError}.
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      try {
        int value = fis.read(b, off, len);
        if (value > 0) {
          this.position += value;
        }
        return value;
      } catch (IOException e) {                 // unexpected exception
        throw new FSError(e);                   // assume native fs error
      }
    }

    /**
     * Positional read through the file channel. The position tracked by
     * this stream is not modified.
     *
     * @param position the file offset to read from
     * @param b the destination buffer
     * @param off the offset in {@code b} at which to start storing bytes
     * @param len the maximum number of bytes to read
     * @return the value returned by the channel read
     * @throws FSError wrapping any IOException raised by the channel
     */
    @Override
    public int read(long position, byte[] b, int off, int len)
      throws IOException {
      ByteBuffer bb = ByteBuffer.wrap(b, off, len);
      try {
        int value = fis.getChannel().read(bb, position);
        if (value > 0) {
          // Channel reads bypass TrackingFileInputStream.read(), so the
          // bytes have to be added to the statistics here.
          statistics.incrementBytesRead(value);
        }
        return value;
      } catch (IOException e) {
        throw new FSError(e);
      }
    }

    /** Skips via the underlying stream and advances the tracked position. */
    @Override
    public long skip(long n) throws IOException {
      long value = fis.skip(n);
      if (value > 0) {
        this.position += value;
      }
      return value;
    }

    @Override
    public FileDescriptor getFileDescriptor() throws IOException {
      return fis.getFD();
    }
  }

  /**
   * Opens an existing file for reading.
   *
   * @param f the file to open
   * @param bufferSize the size handed to the BufferedFSInputStream wrapper
   * @return a stream over the file
   * @throws FileNotFoundException if {@code f} does not exist
   */
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    if (!exists(f)) {
      throw new FileNotFoundException(f.toString());
    }
    return new FSDataInputStream(new BufferedFSInputStream(
        new LocalFSFileInputStream(f), bufferSize));
  }

  /*********************************************************
   * For create()'s FSOutputStream.
   *
   * Forwards to a FileOutputStream; IOExceptions raised while writing
   * are rethrown wrapped in an FSError.
   *********************************************************/
  class LocalFSFileOutputStream extends OutputStream {
    private FileOutputStream fos;

    private LocalFSFileOutputStream(Path f, boolean append) throws IOException {
      this.fos = new FileOutputStream(pathToFile(f), append);
    }

    /*
     * Just forward to the fos
     */
    @Override
    public void close() throws IOException { fos.close(); }
    @Override
    public void flush() throws IOException { fos.flush(); }
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      try {
        fos.write(b, off, len);
      } catch (IOException e) {                // unexpected exception
        throw new FSError(e);                  // assume native fs error
      }
    }

    @Override
    public void write(int b) throws IOException {
      try {
        fos.write(b);
      } catch (IOException e) {              // unexpected exception
        throw new FSError(e);                // assume native fs error
      }
    }
  }

  /**
   * Opens an existing regular file for appending.
   *
   * @param f the file to append to
   * @param bufferSize the size of the BufferedOutputStream wrapper
   * @param progress not used by this implementation
   * @return a stream positioned to append to the file
   * @throws FileNotFoundException if {@code f} does not exist
   * @throws IOException if {@code f} is a directory
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    if (!exists(f)) {
      throw new FileNotFoundException("File " + f + " not found");
    }
    if (getFileStatus(f).isDirectory()) {
      throw new IOException("Cannot append to a diretory (=" + f + " )");
    }
    return new FSDataOutputStream(new BufferedOutputStream(
        new LocalFSFileOutputStream(f, true), bufferSize), statistics);
  }

  /**
   * Creates a file, creating any missing parent directories.
   *
   * @param f the file to create
   * @param overwrite whether an existing file may be replaced
   * @param bufferSize the size of the BufferedOutputStream wrapper
   * @param replication not used by this implementation
   * @param blockSize not used by this implementation
   * @param progress not used by this implementation
   * @return a stream writing to the new file
   * @throws IOException if the file exists and {@code overwrite} is false,
   *         or the parent directories cannot be created
   */
  @Override
  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
    short replication, long blockSize, Progressable progress)
    throws IOException {
    return create(f, overwrite, true, bufferSize, replication, blockSize, progress);
  }

  /**
   * Shared implementation of the create variants.
   *
   * @param createParent when true, missing parent directories are created
   *        with {@link #mkdirs(Path)}; when false the parent is left
   *        untouched, so opening the file fails if the parent is missing
   *        (java.io.FileOutputStream reports that as FileNotFoundException)
   * @throws IOException if the file exists and {@code overwrite} is false,
   *         or {@code createParent} is true and mkdirs reports failure
   */
  private FSDataOutputStream create(Path f, boolean overwrite,
      boolean createParent, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    if (exists(f) && !overwrite) {
      throw new IOException("File already exists: "+f);
    }
    if (createParent) {
      Path parent = f.getParent();
      if (parent != null && !mkdirs(parent)) {
        throw new IOException("Mkdirs failed to create " + parent.toString());
      }
    }
    return new FSDataOutputStream(new BufferedOutputStream(
        new LocalFSFileOutputStream(f, false), bufferSize), statistics);
  }

  /**
   * Applies {@code permission} to {@code f}; if that fails, closes
   * {@code out} so the freshly opened stream is not leaked, and lets the
   * original failure propagate.
   */
  private void setPermissionOrClose(Path f, FsPermission permission,
      FSDataOutputStream out) throws IOException {
    boolean permissionSet = false;
    try {
      setPermission(f, permission);
      permissionSet = true;
    } finally {
      if (!permissionSet) {
        try {
          out.close();
        } catch (IOException ignored) {
          // Best effort: the setPermission failure is the one to report.
        }
      }
    }
  }

  /**
   * Creates a file (and missing parents) and then sets its permission.
   * The returned stream is closed again if setting the permission fails.
   */
  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {

    FSDataOutputStream out = create(f,
        overwrite, bufferSize, replication, blockSize, progress);
    setPermissionOrClose(f, permission, out);
    return out;
  }

  /**
   * Creates a file without creating missing parent directories, and then
   * sets its permission. The returned stream is closed again if setting
   * the permission fails.
   */
  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      boolean overwrite,
      int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    FSDataOutputStream out = create(f,
        overwrite, false, bufferSize, replication, blockSize, progress);
    setPermissionOrClose(f, permission, out);
    return out;
  }

  /**
   * Renames {@code src} to {@code dst}. When {@link File#renameTo(File)}
   * reports failure, falls back to
   * {@code FileUtil.copy(this, src, this, dst, true, getConf())}.
   *
   * @return true if the rename succeeded, otherwise the result of the copy
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    if (pathToFile(src).renameTo(pathToFile(dst))) {
      return true;
    }
    return FileUtil.copy(this, src, this, dst, true, getConf());
  }

  /**
   * Delete the given path to a file or directory.
   * @param p the path to delete
   * @param recursive to delete sub-directories
   * @return true if the file or directory and all its contents were deleted
   * @throws IOException if p is non-empty and recursive is false
   */
  @Override
  public boolean delete(Path p, boolean recursive) throws IOException {
    File f = pathToFile(p);
    if (f.isFile()) {
      return f.delete();
    } else if (!recursive && f.isDirectory() &&
        (FileUtil.listFiles(f).length != 0)) {
      throw new IOException("Directory " + f.toString() + " is not empty");
    }
    return FileUtil.fullyDelete(f);
  }

  /**
   * Lists the status of a file, or of the entries of a directory.
   *
   * @param f the path to list
   * @return a one-element array for a regular file; otherwise one status
   *         per directory entry that still exists when it is examined, or
   *         {@code null} when {@link File#list()} returns {@code null}
   * @throws FileNotFoundException if {@code f} does not exist
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    File localf = pathToFile(f);
    FileStatus[] results;

    if (!localf.exists()) {
      throw new FileNotFoundException("File " + f + " does not exist");
    }
    if (localf.isFile()) {
      return new FileStatus[] {
        new RawLocalFileStatus(localf, getDefaultBlockSize(f), this) };
    }

    String[] names = localf.list();
    if (names == null) {
      return null;
    }
    results = new FileStatus[names.length];
    int j = 0;
    for (int i = 0; i < names.length; i++) {
      try {
        results[j] = getFileStatus(new Path(f, names[i]));
        j++;
      } catch (FileNotFoundException e) {
        // ignore the files not found since the dir list may have changed
        // since the names[] list was generated.
      }
    }
    if (j == names.length) {
      return results;
    }
    // Some entries vanished: trim the unused tail of the array.
    return Arrays.copyOf(results, j);
  }

  /**
   * Creates the specified directory hierarchy. Does not
   * treat existence as an error.
   *
   * @param f the directory to create
   * @return true if the directory exists as a directory afterwards
   * @throws IllegalArgumentException if {@code f} is null
   * @throws FileAlreadyExistsException if the parent of {@code f} exists
   *         but is not a directory
   */
  @Override
  public boolean mkdirs(Path f) throws IOException {
    if(f == null) {
      throw new IllegalArgumentException("mkdirs path arg is null");
    }
    Path parent = f.getParent();
    File p2f = pathToFile(f);
    if(parent != null) {
      File parent2f = pathToFile(parent);
      if(parent2f.exists() && !parent2f.isDirectory()) {
        throw new FileAlreadyExistsException("Parent path is not a directory: "
            + parent);
      }
    }
    // Create the ancestors first, then this directory; an already existing
    // directory counts as success.
    return (parent == null || mkdirs(parent)) &&
      (p2f.mkdir() || p2f.isDirectory());
  }

  /**
   * Creates the directory hierarchy and, when that succeeded, sets the
   * permission of {@code f}.
   */
  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    boolean b = mkdirs(f);
    if(b) {
      setPermission(f, permission);
    }
    return b;
  }


  /**
   * Creates the directory hierarchy and then sets the permission of
   * {@code f}, regardless of the mkdirs result.
   */
  @Override
  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
    throws IOException {
    boolean b = mkdirs(f);
    setPermission(f, absolutePermission);
    return b;
  }


  /** @return the qualified path of the {@code user.home} system property */
  @Override
  public Path getHomeDirectory() {
    return this.makeQualified(new Path(System.getProperty("user.home")));
  }

  /**
   * Set the working directory to the given directory.
   */
  @Override
  public void setWorkingDirectory(Path newDir) {
    workingDir = makeAbsolute(newDir);
    checkPath(workingDir);

  }

  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  /** @return the qualified path of the {@code user.dir} system property */
  @Override
  protected Path getInitialWorkingDirectory() {
    return this.makeQualified(new Path(System.getProperty("user.dir")));
  }

  /**
   * Reports capacity, used and remaining space for the partition that
   * holds {@code p} (or "/" when {@code p} is null).
   */
  @Override
  public FsStatus getStatus(Path p) throws IOException {
    File partition = pathToFile(p == null ? new Path("/") : p);
    //File provides getUsableSpace() and getFreeSpace()
    //File provides no API to obtain used space, assume used = total - free
    // Sample each figure once so that used + free always equals total.
    long total = partition.getTotalSpace();
    long free = partition.getFreeSpace();
    return new FsStatus(total, total - free, free);
  }

  // In the case of the local filesystem, we can just rename the file.
  @Override
  public void moveFromLocalFile(Path src, Path dst) throws IOException {
    rename(src, dst);
  }

  // We can write output directly to the final location
  @Override
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    return fsOutputFile;
  }

  // It's in the right place - nothing to do.
  @Override
  public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
    throws IOException {
  }

  @Override
  public void close() throws IOException {
    super.close();
  }

  @Override
  public String toString() {
    return "LocalFS";
  }

  /**
   * Returns the status of an existing path.
   *
   * @param f the path to examine
   * @return a RawLocalFileStatus for the path
   * @throws FileNotFoundException if the path does not exist
   */
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    File path = pathToFile(f);
    if (path.exists()) {
      return new RawLocalFileStatus(path, getDefaultBlockSize(f), this);
    } else {
      throw new FileNotFoundException("File " + f + " does not exist");
    }
  }

  /**
   * FileStatus for a local file. Permission, owner and group are not read
   * in the constructor; they are loaded by running a shell command the
   * first time one of them is requested or the status is serialized.
   */
  static class RawLocalFileStatus extends FileStatus {
    /* No extra fields are kept here; per the original note, adding them
     * breaks at least CopyFiles.FilePair().
     * We recognize if the information is already loaded by checking if
     * owner.equals("").
     */
    private boolean isPermissionLoaded() {
      return !super.getOwner().equals("");
    }

    RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
      super(f.length(), f.isDirectory(), 1, defaultBlockSize,
            f.lastModified(), fs.makeQualified(new Path(f.getPath())));
    }

    @Override
    public FsPermission getPermission() {
      if (!isPermissionLoaded()) {
        loadPermissionInfo();
      }
      return super.getPermission();
    }

    @Override
    public String getOwner() {
      if (!isPermissionLoaded()) {
        loadPermissionInfo();
      }
      return super.getOwner();
    }

    @Override
    public String getGroup() {
      if (!isPermissionLoaded()) {
        loadPermissionInfo();
      }
      return super.getGroup();
    }

    /**
     * Loads permissions, owner, and group from {@code ls -ld} style output
     * produced by the command from {@code Shell.getGET_PERMISSION_COMMAND()}.
     *
     * @throws RuntimeException if the command fails with an exit code other
     *         than 1 or with any other IOException; the IOException is
     *         attached as the cause
     */
    private void loadPermissionInfo() {
      try {
        StringTokenizer t = new StringTokenizer(
            execCommand(new File(getPath().toUri()),
                        Shell.getGET_PERMISSION_COMMAND()));
        //expected format
        //-rw-------    1 username groupname ...
        String permission = t.nextToken();
        if (permission.length() > 10) { //files with ACLs might have a '+'
          permission = permission.substring(0, 10);
        }
        setPermission(FsPermission.valueOf(permission));
        t.nextToken();                  // skip the second column
        setOwner(t.nextToken());
        setGroup(t.nextToken());
      } catch (Shell.ExitCodeException ioe) {
        if (ioe.getExitCode() != 1) {
          throw permissionLoadFailure(ioe);
        }
        // Exit code 1 is tolerated: record that nothing could be loaded.
        setPermission(null);
        setOwner(null);
        setGroup(null);
      } catch (IOException ioe) {
        throw permissionLoadFailure(ioe);
      }
    }

    /**
     * Builds the unchecked exception reported when the permission command
     * fails, keeping the original exception as the cause.
     */
    private static RuntimeException permissionLoadFailure(IOException cause) {
      return new RuntimeException("Error while running command to get " +
                                 "file permissions : " +
                                 StringUtils.stringifyException(cause), cause);
    }

    /** Makes sure permission information is loaded before serializing. */
    @Override
    public void write(DataOutput out) throws IOException {
      if (!isPermissionLoaded()) {
        loadPermissionInfo();
      }
      super.write(out);
    }
  }

  /**
   * Use the command chown to set owner.
   *
   * @param p the path to change
   * @param username the new owner, or null to change only the group
   * @param groupname the new group, or null to change only the owner
   * @throws IOException if both names are null or the command fails
   */
  @Override
  public void setOwner(Path p, String username, String groupname)
    throws IOException {
    if (username == null && groupname == null) {
      throw new IOException("username == null && groupname == null");
    }

    if (username == null) {
      execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);
    } else {
      //OWNER[:[GROUP]]
      String s = username + (groupname == null? "": ":" + groupname);
      execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
    }
  }

  /**
   * Use the command chmod to set permission. When NativeIO is available
   * the native chmod is called on the canonical path instead of running a
   * shell command.
   */
  @Override
  public void setPermission(Path p, FsPermission permission)
    throws IOException {
    if (NativeIO.isAvailable()) {
      NativeIO.chmod(pathToFile(p).getCanonicalPath(),
                     permission.toShort());
    } else {
      execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND,
          String.format("%05o", permission.toShort()));
    }
  }

  /**
   * Runs {@code cmd} with the shell path of {@code f} appended as the last
   * argument.
   *
   * @param f the file the command operates on
   * @param cmd the command and its leading arguments
   * @return the output returned by {@code Shell.execCommand}
   * @throws IOException if the command fails
   */
  private static String execCommand(File f, String... cmd) throws IOException {
    String[] args = new String[cmd.length + 1];
    System.arraycopy(cmd, 0, args, 0, cmd.length);
    args[cmd.length] = FileUtil.makeShellPath(f, true);
    return Shell.execCommand(args);
  }

}