/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;

/**
 * <code>SequenceFile</code>s are flat files consisting of binary key/value
 * pairs.
 *
 * <p><code>SequenceFile</code> provides {@link Writer}, {@link Reader} and
 * {@link Sorter} classes for writing, reading and sorting respectively.</p>
 *
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp;
 *                                      values are collected in 'blocks'
 *                                      separately and compressed. The size of
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 *
 * <p>The actual compression algorithm used to compress key and/or values can
 * be specified by using the appropriate {@link CompressionCodec}.</p>
 *
 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
 * provided by the <code>SequenceFile</code> to choose the preferred format.</p>
 *
 * <p>The {@link Reader} acts as the bridge and can read any of the above
 * <code>SequenceFile</code> formats.</p>
 *
 * <h4 id="Formats">SequenceFile Formats</h4>
 *
 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
 * depending on the <code>CompressionType</code> specified. All of them share a
 * <a href="#Header">common header</a> described below.
 *
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for
 *                       compression of keys and/or values (if compression is
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
 *
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every few hundred bytes or so.
 *   </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every few hundred bytes or so.
 *   </li>
 * </ul>
 *
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every block.
 *   </li>
 * </ul>
 *
 * <p>The compressed blocks of key lengths and value lengths consist of the
 * actual lengths of individual keys/values encoded in ZeroCompressedInteger
 * format.</p>
 *
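 * <p>For illustration, a minimal write/read round-trip might look like the
 * following sketch. The path, key/value types and compression type here are
 * arbitrary choices, not requirements:</p>
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Path path = new Path("example.seq");            // illustrative path
 * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *     SequenceFile.Writer.file(path),
 *     SequenceFile.Writer.keyClass(LongWritable.class),
 *     SequenceFile.Writer.valueClass(Text.class),
 *     SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK));
 * try {
 *   writer.append(new LongWritable(1), new Text("first record"));
 * } finally {
 *   writer.close();
 * }
 *
 * SequenceFile.Reader reader =
 *     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
 * try {
 *   LongWritable key = new LongWritable();
 *   Text value = new Text();
 *   while (reader.next(key, value)) {
 *     // process key/value
 *   }
 * } finally {
 *   reader.close();
 * }
 * }</pre>
 *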
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points.*/
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

  /**
   * The compression type used to compress key/value pairs in the
   * {@link SequenceFile}.
   *
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE,
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the compression type for the reduce outputs
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  static public CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD :
      CompressionType.valueOf(name);
  }

  /**
   * Set the default compression type for sequence files.
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  static public void setDefaultCompressionType(Configuration job,
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }
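
  // Illustrative sketch: either line below makes BLOCK the default compression
  // type for writers subsequently created from this configuration (legal
  // values of the property are NONE, RECORD and BLOCK):
  //
  //   conf.set("io.seqfile.compression.type", "BLOCK");
  //   SequenceFile.setDefaultCompressionType(conf, CompressionType.BLOCK);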

  /**
   * Create a new Writer with the given options.
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption =
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }
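
  // Migration sketch (illustrative): a deprecated call such as
  //   createWriter(fs, conf, name, Text.class, IntWritable.class)
  // corresponds to the option-based form
  //   createWriter(conf, Writer.file(name),
  //                Writer.keyClass(Text.class),
  //                Writer.valueClass(IntWritable.class));
  // where, absent a filesystem option, the path is resolved against its own
  // filesystem as given by name.getFileSystem(conf).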

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.bufferSize(bufferSize),
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize, boolean createParent,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
    createWriter(FileContext fc, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata,
                 final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
                 throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
          keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }


  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream : Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Write compressed bytes to outStream.
     * Note that it will NOT compress the bytes if they are not already
     * compressed.
     * @param outStream : Stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException;

    /**
     * Size of stored data.
     */
    public int getSize();
  }
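
  // Note (illustrative): ValueBytes lets callers shuttle values in their
  // on-disk form. For example, records can be copied between files without
  // deserializing values, along the lines of:
  //
  //   ValueBytes rawValue = reader.createValueBytes();
  //   DataOutputBuffer rawKey = new DataOutputBuffer();
  //   while (reader.nextRaw(rawKey, rawValue) != -1) {
  //     writer.appendRaw(rawKey.getData(), 0, rawKey.getLength(), rawValue);
  //     rawKey.reset();
  //   }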

  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;

    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    public int getSize() {
      return dataSize;
    }

    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      throw
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes

  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    public int getSize() {
      return dataSize;
    }

    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes

  /**
   * The class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   *
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;

    public Metadata() {
      this(new TreeMap<Text, Text>());
    }

    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }

    public Text get(Text name) {
      return this.theMetadata.get(name);
    }

    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }

    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }

    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0)
        throw new IOException("Invalid size: " + sz + " for file metadata object");
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }
    }

    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }

    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do
    }

    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString()).append("\t")
          .append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }
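
  // Illustrative: user metadata travels in the file header, e.g.
  //
  //   SequenceFile.Metadata meta = new SequenceFile.Metadata();
  //   meta.set(new Text("created-by"), new Text("example-job")); // arbitrary names
  //   SequenceFile.createWriter(conf, Writer.file(path),
  //       Writer.keyClass(Text.class), Writer.valueClass(Text.class),
  //       Writer.metadata(meta));
  //
  // and can be recovered later via Reader#getMetadata().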

  /** Write key/value pairs to a sequence-format file. */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;

    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes());
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public static interface Option {}

    static class FileOption extends Options.PathOption
                            implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }

    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                  implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }

    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }

    public static Option replication(short value) {
      return new ReplicationOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }

    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }

    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }

    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }
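
    // Illustrative combination of the factory options above (names arbitrary):
    //
    //   SequenceFile.createWriter(conf,
    //       Writer.file(new Path("out.seq")),
    //       Writer.keyClass(Text.class),
    //       Writer.valueClass(IntWritable.class),
    //       Writer.compression(CompressionType.RECORD, new GzipCodec()));
    //
    // Note that GzipCodec requires the native-hadoop libraries (see the check
    // in the constructor below).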

    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param options the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf,
           Option... opts) throws IOException {
      BlockSizeOption blockSizeOption =
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption =
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption =
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption =
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption =
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption =
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption =
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException("file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ?
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();
        out = fs.create(p, true, bufferSize, replication, blockSize, progress);
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }

    /** Create the named file.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null,
           new Metadata());
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }

    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader()
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());

      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());

      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata)
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      this.uncompressedValSerializer.open(buffer);
      if (this.codec != null) {
        ReflectionUtils.setConf(this.codec, this.conf);
        this.compressor = CodecPool.getCompressor(this.codec);
        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
        this.deflateOut =
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
        this.compressedValSerializer.open(deflateOut);
      }
      writeFileHeader();
    }
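
    // For reference: the header written by writeFileHeader() above starts with
    // the 4 version bytes ("SEQ" plus the version number, currently
    // VERSION_WITH_METADATA), then the key and value class names, the two
    // compression booleans, the codec class name (only if compressed), the
    // metadata, and finally the 16-byte sync marker.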

    /** Returns the class of keys in this file. */
    public Class getKeyClass() { return keyClass; }

    /** Returns the class of values in this file. */
    public Class getValueClass() { return valClass; }

    /** Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }

    /** create a sync point */
    public void sync() throws IOException {
      if (sync != null && lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
        out.write(sync);                          // write sync
        lastSyncPos = out.getPos();               // update lastSyncPos
      }
    }

    /**
     * flush all currently written data to the file system
     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
     */
    @Deprecated
    public void syncFs() throws IOException {
      if (out != null) {
        out.sync();                               // flush contents to file system
      }
    }

    @Override
    public void hsync() throws IOException {
      if (out != null) {
        out.hsync();
      }
    }

    @Override
    public void hflush() throws IOException {
      if (out != null) {
        out.hflush();
      }
    }

    /** Returns the configuration of this file. */
    Configuration getConf() { return conf; }

    /** Close the file. */
    public synchronized void close() throws IOException {
      keySerializer.close();
      uncompressedValSerializer.close();
      if (compressedValSerializer != null) {
        compressedValSerializer.close();
      }

      CodecPool.returnCompressor(compressor);
      compressor = null;

      if (out != null) {

        // Close the underlying stream iff we own it...
        if (ownOutputStream) {
          out.close();
        } else {
          out.flush();
        }
        out = null;
      }
    }

    synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
        sync();
      }
    }

    /** Append a key/value pair. */
    public void append(Writable key, Writable val)
      throws IOException {
      append((Object) key, (Object) val);
    }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();

      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position.  In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
     * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
     * the key may be earlier in the file than the key last written when this
     * method was called (e.g., with block-compression, it may be the first key
     * in the block that was being written when this method was called).
     */
    public synchronized long getLength() throws IOException {
      return out.getPos();
    }
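
    // The contract above permits checkpointing, e.g. (sketch):
    //
    //   long pos = writer.getLength();   // after some appends
    //   ...
    //   reader.seek(pos);                // later, on a Reader over the file
    //   reader.next(key, value);         // resumes at a record boundary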

  } // class Writer

  /** Write key/compressed-value pairs to a sequence-format file. */
  static class RecordCompressWriter extends Writer {

    RecordCompressWriter(Configuration conf,
                         Option... options) throws IOException {
      super(conf, options);
    }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    /** Append a key/value pair. */
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();                        // sync
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // 'key' data
      val.writeCompressedBytes(out);              // 'value' data
    }

  } // RecordCompressionWriter

  /** Write compressed key/value blocks to a sequence-format file. */
  static class BlockCompressWriter extends Writer {

    private int noBufferedRecords = 0;

    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer keyBuffer = new DataOutputBuffer();

    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer valBuffer = new DataOutputBuffer();

    private final int compressionBlockSize;

    BlockCompressWriter(Configuration conf,
                        Option... options) throws IOException {
      super(conf, options);
      compressionBlockSize =
        conf.getInt("io.seqfile.compress.blocksize", 1000000);
      keySerializer.close();
      keySerializer.open(keyBuffer);
      uncompressedValSerializer.close();
      uncompressedValSerializer.open(valBuffer);
    }
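
    // The flush threshold is configurable; e.g. (illustrative) to buffer
    // roughly 4 MB of uncompressed key/value data per block:
    //
    //   conf.setInt("io.seqfile.compress.blocksize", 4 * 1000 * 1000);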

    /** Workhorse to check and write out compressed data/lengths */
    private synchronized
      void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
      throws IOException {
      deflateFilter.resetState();
      buffer.reset();
      deflateOut.write(uncompressedDataBuffer.getData(), 0,
                       uncompressedDataBuffer.getLength());
      deflateOut.flush();
      deflateFilter.finish();

      WritableUtils.writeVInt(out, buffer.getLength());
      out.write(buffer.getData(), 0, buffer.getLength());
    }

    /** Compress and flush contents to dfs */
    public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        super.sync();

        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);

        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);

        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);

        // Flush the file-stream
        out.flush();

        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }

    }

    /** Close the file. */
    public synchronized void close() throws IOException {
      if (out != null) {
        sync();
      }
      super.close();
    }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      // Save key/value into respective buffers
      int oldKeyLength = keyBuffer.getLength();
      keySerializer.serialize(key);
      int keyLength = keyBuffer.getLength() - oldKeyLength;
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);
      WritableUtils.writeVInt(keyLenBuffer, keyLength);

      int oldValLength = valBuffer.getLength();
      uncompressedValSerializer.serialize(val);
      int valLength = valBuffer.getLength() - oldValLength;
      WritableUtils.writeVInt(valLenBuffer, valLength);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

    /** Append a key/value pair. */
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed");

      int valLength = val.getSize();

      // Save key/value data in relevant buffers
      WritableUtils.writeVInt(keyLenBuffer, keyLength);
      keyBuffer.write(keyData, keyOffset, keyLength);
      WritableUtils.writeVInt(valLenBuffer, valLength);
      val.writeUncompressedBytes(valBuffer);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

  } // BlockCompressionWriter

  /** Get the configured buffer size */
  private static int getBufferSize(Configuration conf) {
    return conf.getInt("io.file.buffer.size", 4096);
  }

  /** Reads key/value pairs from a sequence-format file. */
  public static class Reader implements java.io.Closeable {
    private String filename;
    private FSDataInputStream in;
    private DataOutputBuffer outBuf = new DataOutputBuffer();

    private byte version;

    private String keyClassName;
    private String valClassName;
    private Class keyClass;
    private Class valClass;

    private CompressionCodec codec = null;
    private Metadata metadata = null;

    private byte[] sync = new byte[SYNC_HASH_SIZE];
    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
    private boolean syncSeen;

    private long headerEnd;
    private long end;
    private int keyLength;
    private int recordLength;

    private boolean decompress;
    private boolean blockCompressed;

    private Configuration conf;

    private int noBufferedRecords = 0;
    private boolean lazyDecompress = true;
    private boolean valuesDecompressed = true;

    private int noBufferedKeys = 0;
    private int noBufferedValues = 0;

    private DataInputBuffer keyLenBuffer = null;
    private CompressionInputStream keyLenInFilter = null;
    private DataInputStream keyLenIn = null;
    private Decompressor keyLenDecompressor = null;
    private DataInputBuffer keyBuffer = null;
    private CompressionInputStream keyInFilter = null;
    private DataInputStream keyIn = null;
    private Decompressor keyDecompressor = null;

    private DataInputBuffer valLenBuffer = null;
    private CompressionInputStream valLenInFilter = null;
    private DataInputStream valLenIn = null;
    private Decompressor valLenDecompressor = null;
    private DataInputBuffer valBuffer = null;
    private CompressionInputStream valInFilter = null;
    private DataInputStream valIn = null;
    private Decompressor valDecompressor = null;

    private Deserializer keyDeserializer;
    private Deserializer valDeserializer;

    /**
     * A tag interface for all of the Reader options
     */
    public static interface Option {}

    /**
     * Create an option to specify the path name of the sequence file.
     * @param value the path to read
     * @return a new option
     */
    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * Create an option to specify the stream with the sequence file.
     * @param value the stream to read.
     * @return a new option
     */
    public static Option stream(FSDataInputStream value) {
      return new InputStreamOption(value);
    }

    /**
     * Create an option to specify the starting byte to read.
     * @param value the number of bytes to skip over
     * @return a new option
     */
    public static Option start(long value) {
      return new StartOption(value);
    }

    /**
     * Create an option to specify the number of bytes to read.
     * @param value the number of bytes to read
     * @return a new option
     */
    public static Option length(long value) {
      return new LengthOption(value);
    }
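
    // Illustrative split-oriented use of the options above (offsets arbitrary):
    //
    //   SequenceFile.Reader r = new SequenceFile.Reader(conf,
    //       Reader.file(path), Reader.start(splitStart),
    //       Reader.length(splitLength));
    //   r.sync(splitStart);   // advance to the first record boundary in range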

    /**
     * Create an option with the buffer size for reading the given pathname.
     * @param value the number of bytes to buffer
     * @return a new option
     */
    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    private static class FileOption extends Options.PathOption
                                    implements Option {
      private FileOption(Path value) {
        super(value);
      }
    }

    private static class InputStreamOption
        extends Options.FSDataInputStreamOption
        implements Option {
      private InputStreamOption(FSDataInputStream value) {
        super(value);
      }
    }

    private static class StartOption extends Options.LongOption
                                     implements Option {
      private StartOption(long value) {
        super(value);
      }
    }

    private static class LengthOption extends Options.LongOption
                                      implements Option {
      private LengthOption(long value) {
        super(value);
      }
    }

    private static class BufferSizeOption extends Options.IntegerOption
                                          implements Option {
      private BufferSizeOption(int value) {
        super(value);
      }
    }

    // only used directly
    private static class OnlyHeaderOption extends Options.BooleanOption
                                          implements Option {
      private OnlyHeaderOption() {
        super(true);
      }
    }

    public Reader(Configuration conf, Option... opts) throws IOException {
      // Look up the options, these are null if not set
      FileOption fileOpt = Options.getOption(FileOption.class, opts);
      InputStreamOption streamOpt =
        Options.getOption(InputStreamOption.class, opts);
      StartOption startOpt = Options.getOption(StartOption.class, opts);
      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
      OnlyHeaderOption headerOnly =
        Options.getOption(OnlyHeaderOption.class, opts);
      // check for consistency
      if ((fileOpt == null) == (streamOpt == null)) {
        throw new
          IllegalArgumentException("File or stream option must be specified");
      }
      if (fileOpt == null && bufOpt != null) {
        throw new IllegalArgumentException("buffer size can only be set when" +
                                           " a file is specified.");
      }
      // figure out the real values
      Path filename = null;
      FSDataInputStream file;
      final long len;
      if (fileOpt != null) {
        filename = fileOpt.getValue();
        FileSystem fs = filename.getFileSystem(conf);
        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
        len = null == lenOpt
          ? fs.getFileStatus(filename).getLen()
          : lenOpt.getValue();
        file = openFile(fs, filename, bufSize, len);
      } else {
        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
        file = streamOpt.getValue();
      }
      long start = startOpt == null ? 0 : startOpt.getValue();
      // really set up
      initialize(filename, file, start, len, conf, headerOnly != null);
    }

    /**
     * Construct a reader by opening a file from the given file system.
     * @param fs The file system used to open the file.
     * @param file The file being read.
     * @param conf Configuration
     * @throws IOException
     * @deprecated Use Reader(Configuration, Option...) instead.
     */
    @Deprecated
    public Reader(FileSystem fs, Path file,
                  Configuration conf) throws IOException {
      this(conf, file(file.makeQualified(fs)));
    }
1734 * @param buffersize unused 1735 * @param start The starting position. 1736 * @param length The length being read. 1737 * @param conf Configuration 1738 * @throws IOException 1739 * @deprecated Use Reader(Configuration, Reader.Option...) instead. 1740 */ 1741 @Deprecated 1742 public Reader(FSDataInputStream in, int buffersize, 1743 long start, long length, Configuration conf) throws IOException { 1744 this(conf, stream(in), start(start), length(length)); 1745 } 1746 1747 /** Common work of the constructors. */ 1748 private void initialize(Path filename, FSDataInputStream in, 1749 long start, long length, Configuration conf, 1750 boolean tempReader) throws IOException { 1751 if (in == null) { 1752 throw new IllegalArgumentException("in == null"); 1753 } 1754 this.filename = filename == null ? "<unknown>" : filename.toString(); 1755 this.in = in; 1756 this.conf = conf; 1757 boolean succeeded = false; 1758 try { 1759 seek(start); 1760 this.end = this.in.getPos() + length; 1761 // if it wrapped around, use the max 1762 if (end < length) { 1763 end = Long.MAX_VALUE; 1764 } 1765 init(tempReader); 1766 succeeded = true; 1767 } finally { 1768 if (!succeeded) { 1769 IOUtils.cleanup(LOG, this.in); 1770 } 1771 } 1772 } 1773 1774 /** 1775 * Override this method to specialize the type of 1776 * {@link FSDataInputStream} returned. 1777 * @param fs The file system used to open the file. 1778 * @param file The file being read. 1779 * @param bufferSize The buffer size used to read the file. 1780 * @param length The length being read if it is >= 0. Otherwise, 1781 * the length is not available. 1782 * @return The opened stream. 1783 * @throws IOException 1784 */ 1785 protected FSDataInputStream openFile(FileSystem fs, Path file, 1786 int bufferSize, long length) throws IOException { 1787 return fs.open(file, bufferSize); 1788 } 1789 1790 /** 1791 * Initialize the {@link Reader}. 1792 * @param tempReader <code>true</code> if we are constructing a temporary 1793 * reader {@link SequenceFile.Sorter#cloneFileAttributes}, 1794 * and hence do not initialize every component; 1795 * <code>false</code> otherwise. 1796 * @throws IOException 1797 */ 1798 private void init(boolean tempReader) throws IOException { 1799 byte[] versionBlock = new byte[VERSION.length]; 1800 in.readFully(versionBlock); 1801 1802 if ((versionBlock[0] != VERSION[0]) || 1803 (versionBlock[1] != VERSION[1]) || 1804 (versionBlock[2] != VERSION[2])) 1805 throw new IOException(this + " not a SequenceFile"); 1806 1807 // Set 'version' 1808 version = versionBlock[3]; 1809 if (version > VERSION[3]) 1810 throw new VersionMismatchException(VERSION[3], version); 1811 1812 if (version < BLOCK_COMPRESS_VERSION) { 1813 UTF8 className = new UTF8(); 1814 1815 className.readFields(in); 1816 keyClassName = className.toString(); // key class name 1817 1818 className.readFields(in); 1819 valClassName = className.toString(); // val class name 1820 } else { 1821 keyClassName = Text.readString(in); 1822 valClassName = Text.readString(in); 1823 } 1824 1825 if (version > 2) { // if version > 2 1826 this.decompress = in.readBoolean(); // is compressed? 1827 } else { 1828 decompress = false; 1829 } 1830 1831 if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4 1832 this.blockCompressed = in.readBoolean(); // is block-compressed?
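        // Hedged recap of the header version gating in this method:
        // version >= 4 (BLOCK_COMPRESS_VERSION) adds the block-compression
        // flag read above, version >= 5 (CUSTOM_COMPRESS_VERSION) stores
        // the codec class name, and version >= 6 (VERSION_WITH_METADATA)
        // appends file metadata, as the branches below show.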
1833 } else { 1834 blockCompressed = false; 1835 } 1836 1837 // if version >= 5 1838 // setup the compression codec 1839 if (decompress) { 1840 if (version >= CUSTOM_COMPRESS_VERSION) { 1841 String codecClassname = Text.readString(in); 1842 try { 1843 Class<? extends CompressionCodec> codecClass 1844 = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class); 1845 this.codec = ReflectionUtils.newInstance(codecClass, conf); 1846 } catch (ClassNotFoundException cnfe) { 1847 throw new IllegalArgumentException("Unknown codec: " + 1848 codecClassname, cnfe); 1849 } 1850 } else { 1851 codec = new DefaultCodec(); 1852 ((Configurable)codec).setConf(conf); 1853 } 1854 } 1855 1856 this.metadata = new Metadata(); 1857 if (version >= VERSION_WITH_METADATA) { // if version >= 6 1858 this.metadata.readFields(in); 1859 } 1860 1861 if (version > 1) { // if version > 1 1862 in.readFully(sync); // read sync bytes 1863 headerEnd = in.getPos(); // record end of header 1864 } 1865 1866 // Initialize... *not* if we are constructing a temporary Reader 1867 if (!tempReader) { 1868 valBuffer = new DataInputBuffer(); 1869 if (decompress) { 1870 valDecompressor = CodecPool.getDecompressor(codec); 1871 valInFilter = codec.createInputStream(valBuffer, valDecompressor); 1872 valIn = new DataInputStream(valInFilter); 1873 } else { 1874 valIn = valBuffer; 1875 } 1876 1877 if (blockCompressed) { 1878 keyLenBuffer = new DataInputBuffer(); 1879 keyBuffer = new DataInputBuffer(); 1880 valLenBuffer = new DataInputBuffer(); 1881 1882 keyLenDecompressor = CodecPool.getDecompressor(codec); 1883 keyLenInFilter = codec.createInputStream(keyLenBuffer, 1884 keyLenDecompressor); 1885 keyLenIn = new DataInputStream(keyLenInFilter); 1886 1887 keyDecompressor = CodecPool.getDecompressor(codec); 1888 keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor); 1889 keyIn = new DataInputStream(keyInFilter); 1890 1891 valLenDecompressor = CodecPool.getDecompressor(codec); 1892 valLenInFilter = codec.createInputStream(valLenBuffer, 1893 valLenDecompressor); 1894 valLenIn = new DataInputStream(valLenInFilter); 1895 } 1896 1897 SerializationFactory serializationFactory = 1898 new SerializationFactory(conf); 1899 this.keyDeserializer = 1900 getDeserializer(serializationFactory, getKeyClass()); 1901 if (!blockCompressed) { 1902 this.keyDeserializer.open(valBuffer); 1903 } else { 1904 this.keyDeserializer.open(keyIn); 1905 } 1906 this.valDeserializer = 1907 getDeserializer(serializationFactory, getValueClass()); 1908 this.valDeserializer.open(valIn); 1909 } 1910 } 1911 1912 @SuppressWarnings("unchecked") 1913 private Deserializer getDeserializer(SerializationFactory sf, Class c) { 1914 return sf.getDeserializer(c); 1915 } 1916 1917 /** Close the file. */ 1918 public synchronized void close() throws IOException { 1919 // Return the decompressors to the pool 1920 CodecPool.returnDecompressor(keyLenDecompressor); 1921 CodecPool.returnDecompressor(keyDecompressor); 1922 CodecPool.returnDecompressor(valLenDecompressor); 1923 CodecPool.returnDecompressor(valDecompressor); 1924 keyLenDecompressor = keyDecompressor = null; 1925 valLenDecompressor = valDecompressor = null; 1926 1927 if (keyDeserializer != null) { 1928 keyDeserializer.close(); 1929 } 1930 if (valDeserializer != null) { 1931 valDeserializer.close(); 1932 } 1933 1934 // Close the input-stream 1935 in.close(); 1936 } 1937 1938 /** Returns the name of the key class.
*/ 1939 public String getKeyClassName() { 1940 return keyClassName; 1941 } 1942 1943 /** Returns the class of keys in this file. */ 1944 public synchronized Class<?> getKeyClass() { 1945 if (null == keyClass) { 1946 try { 1947 keyClass = WritableName.getClass(getKeyClassName(), conf); 1948 } catch (IOException e) { 1949 throw new RuntimeException(e); 1950 } 1951 } 1952 return keyClass; 1953 } 1954 1955 /** Returns the name of the value class. */ 1956 public String getValueClassName() { 1957 return valClassName; 1958 } 1959 1960 /** Returns the class of values in this file. */ 1961 public synchronized Class<?> getValueClass() { 1962 if (null == valClass) { 1963 try { 1964 valClass = WritableName.getClass(getValueClassName(), conf); 1965 } catch (IOException e) { 1966 throw new RuntimeException(e); 1967 } 1968 } 1969 return valClass; 1970 } 1971 1972 /** Returns true if values are compressed. */ 1973 public boolean isCompressed() { return decompress; } 1974 1975 /** Returns true if records are block-compressed. */ 1976 public boolean isBlockCompressed() { return blockCompressed; } 1977 1978 /** Returns the compression codec of data in this file. */ 1979 public CompressionCodec getCompressionCodec() { return codec; } 1980 1981 /** 1982 * Get the compression type for this file. 1983 * @return the compression type 1984 */ 1985 public CompressionType getCompressionType() { 1986 if (decompress) { 1987 return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; 1988 } else { 1989 return CompressionType.NONE; 1990 } 1991 } 1992 1993 /** Returns the metadata object of the file */ 1994 public Metadata getMetadata() { 1995 return this.metadata; 1996 } 1997 1998 /** Returns the configuration used for this file. */ 1999 Configuration getConf() { return conf; } 2000 2001 /** Read a compressed buffer */ 2002 private synchronized void readBuffer(DataInputBuffer buffer, 2003 CompressionInputStream filter) throws IOException { 2004 // Read data into a temporary buffer 2005 DataOutputBuffer dataBuffer = new DataOutputBuffer(); 2006 2007 try { 2008 int dataBufferLength = WritableUtils.readVInt(in); 2009 dataBuffer.write(in, dataBufferLength); 2010 2011 // Set up 'buffer' connected to the input-stream 2012 buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); 2013 } finally { 2014 dataBuffer.close(); 2015 } 2016 2017 // Reset the codec 2018 filter.resetState(); 2019 } 2020 2021 /** Read the next 'compressed' block */ 2022 private synchronized void readBlock() throws IOException { 2023 // Check if we need to throw away a whole block of 2024 // 'values' due to 'lazy decompression' 2025 if (lazyDecompress && !valuesDecompressed) { 2026 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2027 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2028 } 2029 2030 // Reset internal states 2031 noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0; 2032 valuesDecompressed = false; 2033 2034 //Process sync 2035 if (sync != null) { 2036 in.readInt(); 2037 in.readFully(syncCheck); // read syncCheck 2038 if (!Arrays.equals(sync, syncCheck)) // check it 2039 throw new IOException("File is corrupt!"); 2040 } 2041 syncSeen = true; 2042 2043 // Read number of records in this block 2044 noBufferedRecords = WritableUtils.readVInt(in); 2045 2046 // Read key lengths and keys 2047 readBuffer(keyLenBuffer, keyLenInFilter); 2048 readBuffer(keyBuffer, keyInFilter); 2049 noBufferedKeys = noBufferedRecords; 2050 2051 // Read value lengths and values 2052 if (!lazyDecompress) { 2053 readBuffer(valLenBuffer, 
valLenInFilter); 2054 readBuffer(valBuffer, valInFilter); 2055 noBufferedValues = noBufferedRecords; 2056 valuesDecompressed = true; 2057 } 2058 } 2059 2060 /** 2061 * Position valLenIn/valIn to the 'value' 2062 * corresponding to the 'current' key 2063 */ 2064 private synchronized void seekToCurrentValue() throws IOException { 2065 if (!blockCompressed) { 2066 if (decompress) { 2067 valInFilter.resetState(); 2068 } 2069 valBuffer.reset(); 2070 } else { 2071 // Check if this is the first value in the 'block' to be read 2072 if (lazyDecompress && !valuesDecompressed) { 2073 // Read the value lengths and values 2074 readBuffer(valLenBuffer, valLenInFilter); 2075 readBuffer(valBuffer, valInFilter); 2076 noBufferedValues = noBufferedRecords; 2077 valuesDecompressed = true; 2078 } 2079 2080 // Calculate the no. of bytes to skip 2081 // Note: 'current' key has already been read! 2082 int skipValBytes = 0; 2083 int currentKey = noBufferedKeys + 1; 2084 for (int i=noBufferedValues; i > currentKey; --i) { 2085 skipValBytes += WritableUtils.readVInt(valLenIn); 2086 --noBufferedValues; 2087 } 2088 2089 // Skip to the 'val' corresponding to 'current' key 2090 if (skipValBytes > 0) { 2091 if (valIn.skipBytes(skipValBytes) != skipValBytes) { 2092 throw new IOException("Failed to seek to " + currentKey + 2093 "(th) value!"); 2094 } 2095 } 2096 } 2097 } 2098 2099 /** 2100 * Get the 'value' corresponding to the last read 'key'. 2101 * @param val : The 'value' to be read. 2102 * @throws IOException 2103 */ 2104 public synchronized void getCurrentValue(Writable val) 2105 throws IOException { 2106 if (val instanceof Configurable) { 2107 ((Configurable) val).setConf(this.conf); 2108 } 2109 2110 // Position stream to 'current' value 2111 seekToCurrentValue(); 2112 2113 if (!blockCompressed) { 2114 val.readFields(valIn); 2115 2116 if (valIn.read() > 0) { 2117 LOG.info("available bytes: " + valIn.available()); 2118 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2119 + " bytes, should read " + 2120 (valBuffer.getLength()-keyLength)); 2121 } 2122 } else { 2123 // Get the value 2124 int valLength = WritableUtils.readVInt(valLenIn); 2125 val.readFields(valIn); 2126 2127 // Read another compressed 'value' 2128 --noBufferedValues; 2129 2130 // Sanity check 2131 if ((valLength < 0) && LOG.isDebugEnabled()) { 2132 LOG.debug(val + " is a zero-length value"); 2133 } 2134 } 2135 2136 } 2137 2138 /** 2139 * Get the 'value' corresponding to the last read 'key'. 2140 * @param val : The 'value' to be read. 
2141 * @throws IOException 2142 */ 2143 public synchronized Object getCurrentValue(Object val) 2144 throws IOException { 2145 if (val instanceof Configurable) { 2146 ((Configurable) val).setConf(this.conf); 2147 } 2148 2149 // Position stream to 'current' value 2150 seekToCurrentValue(); 2151 2152 if (!blockCompressed) { 2153 val = deserializeValue(val); 2154 2155 if (valIn.read() > 0) { 2156 LOG.info("available bytes: " + valIn.available()); 2157 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2158 + " bytes, should read " + 2159 (valBuffer.getLength()-keyLength)); 2160 } 2161 } else { 2162 // Get the value 2163 int valLength = WritableUtils.readVInt(valLenIn); 2164 val = deserializeValue(val); 2165 2166 // Read another compressed 'value' 2167 --noBufferedValues; 2168 2169 // Sanity check 2170 if ((valLength < 0) && LOG.isDebugEnabled()) { 2171 LOG.debug(val + " is a zero-length value"); 2172 } 2173 } 2174 return val; 2175 2176 } 2177 2178 @SuppressWarnings("unchecked") 2179 private Object deserializeValue(Object val) throws IOException { 2180 return valDeserializer.deserialize(val); 2181 } 2182 2183 /** Read the next key in the file into <code>key</code>, skipping its 2184 * value. True if another entry exists, and false at end of file. */ 2185 public synchronized boolean next(Writable key) throws IOException { 2186 if (key.getClass() != getKeyClass()) 2187 throw new IOException("wrong key class: "+key.getClass().getName() 2188 +" is not "+keyClass); 2189 2190 if (!blockCompressed) { 2191 outBuf.reset(); 2192 2193 keyLength = next(outBuf); 2194 if (keyLength < 0) 2195 return false; 2196 2197 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2198 2199 key.readFields(valBuffer); 2200 valBuffer.mark(0); 2201 if (valBuffer.getPosition() != keyLength) 2202 throw new IOException(key + " read " + valBuffer.getPosition() 2203 + " bytes, should read " + keyLength); 2204 } else { 2205 //Reset syncSeen 2206 syncSeen = false; 2207 2208 if (noBufferedKeys == 0) { 2209 try { 2210 readBlock(); 2211 } catch (EOFException eof) { 2212 return false; 2213 } 2214 } 2215 2216 int keyLength = WritableUtils.readVInt(keyLenIn); 2217 2218 // Sanity check 2219 if (keyLength < 0) { 2220 return false; 2221 } 2222 2223 //Read another compressed 'key' 2224 key.readFields(keyIn); 2225 --noBufferedKeys; 2226 } 2227 2228 return true; 2229 } 2230 2231 /** Read the next key/value pair in the file into <code>key</code> and 2232 * <code>val</code>. Returns true if such a pair exists and false when at 2233 * end of file */ 2234 public synchronized boolean next(Writable key, Writable val) 2235 throws IOException { 2236 if (val.getClass() != getValueClass()) 2237 throw new IOException("wrong value class: "+val+" is not "+valClass); 2238 2239 boolean more = next(key); 2240 2241 if (more) { 2242 getCurrentValue(val); 2243 } 2244 2245 return more; 2246 } 2247 2248 /** 2249 * Read and return the next record length, potentially skipping over 2250 * a sync block. 
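 * <p>A stored length equal to <code>SYNC_ESCAPE</code> marks an inline sync
 * entry: the sync marker that follows is verified against the one read from
 * the header, and the real record length is then re-read.</p>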
2251 * @return the length of the next record or -1 if there is no next record 2252 * @throws IOException 2253 */ 2254 private synchronized int readRecordLength() throws IOException { 2255 if (in.getPos() >= end) { 2256 return -1; 2257 } 2258 int length = in.readInt(); 2259 if (version > 1 && sync != null && 2260 length == SYNC_ESCAPE) { // process a sync entry 2261 in.readFully(syncCheck); // read syncCheck 2262 if (!Arrays.equals(sync, syncCheck)) // check it 2263 throw new IOException("File is corrupt!"); 2264 syncSeen = true; 2265 if (in.getPos() >= end) { 2266 return -1; 2267 } 2268 length = in.readInt(); // re-read length 2269 } else { 2270 syncSeen = false; 2271 } 2272 2273 return length; 2274 } 2275 2276 /** Read the next key/value pair in the file into <code>buffer</code>. 2277 * Returns the length of the key read, or -1 if at end of file. The length 2278 * of the value may be computed by calling buffer.getLength() before and 2279 * after calls to this method. 2280 * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */ 2281 @Deprecated 2282 synchronized int next(DataOutputBuffer buffer) throws IOException { 2283 // Unsupported for block-compressed sequence files 2284 if (blockCompressed) { 2285 throw new IOException("Unsupported call for block-compressed" + 2286 " SequenceFiles - use SequenceFile.Reader.nextRaw(DataOutputBuffer, ValueBytes)"); 2287 } 2288 try { 2289 int length = readRecordLength(); 2290 if (length == -1) { 2291 return -1; 2292 } 2293 int keyLength = in.readInt(); 2294 buffer.write(in, length); 2295 return keyLength; 2296 } catch (ChecksumException e) { // checksum failure 2297 handleChecksumException(e); 2298 return next(buffer); 2299 } 2300 } 2301 2302 public ValueBytes createValueBytes() { 2303 ValueBytes val = null; 2304 if (!decompress || blockCompressed) { 2305 val = new UncompressedBytes(); 2306 } else { 2307 val = new CompressedBytes(codec); 2308 } 2309 return val; 2310 } 2311 2312 /** 2313 * Read 'raw' records.
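 * <p>A hedged raw-copy sketch in the style of {@link Sorter#writeFile},
 * assuming a previously created <code>writer</code>: key bytes accumulate in
 * <code>key</code> and value bytes are captured by the {@link ValueBytes}
 * without being deserialized.</p>
 * <pre>
 *   DataOutputBuffer key = new DataOutputBuffer();
 *   ValueBytes val = reader.createValueBytes();
 *   while (reader.nextRaw(key, val) != -1) {
 *     writer.appendRaw(key.getData(), 0, key.getLength(), val);
 *     key.reset();
 *   }
 * </pre>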
2314 * @param key - The buffer into which the key is read 2315 * @param val - The 'raw' value 2316 * @return Returns the total record length or -1 for end of file 2317 * @throws IOException 2318 */ 2319 public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 2320 throws IOException { 2321 if (!blockCompressed) { 2322 int length = readRecordLength(); 2323 if (length == -1) { 2324 return -1; 2325 } 2326 int keyLength = in.readInt(); 2327 int valLength = length - keyLength; 2328 key.write(in, keyLength); 2329 if (decompress) { 2330 CompressedBytes value = (CompressedBytes)val; 2331 value.reset(in, valLength); 2332 } else { 2333 UncompressedBytes value = (UncompressedBytes)val; 2334 value.reset(in, valLength); 2335 } 2336 2337 return length; 2338 } else { 2339 //Reset syncSeen 2340 syncSeen = false; 2341 2342 // Read 'key' 2343 if (noBufferedKeys == 0) { 2344 if (in.getPos() >= end) 2345 return -1; 2346 2347 try { 2348 readBlock(); 2349 } catch (EOFException eof) { 2350 return -1; 2351 } 2352 } 2353 int keyLength = WritableUtils.readVInt(keyLenIn); 2354 if (keyLength < 0) { 2355 throw new IOException("zero length key found!"); 2356 } 2357 key.write(keyIn, keyLength); 2358 --noBufferedKeys; 2359 2360 // Read raw 'value' 2361 seekToCurrentValue(); 2362 int valLength = WritableUtils.readVInt(valLenIn); 2363 UncompressedBytes rawValue = (UncompressedBytes)val; 2364 rawValue.reset(valIn, valLength); 2365 --noBufferedValues; 2366 2367 return (keyLength+valLength); 2368 } 2369 2370 } 2371 2372 /** 2373 * Read 'raw' keys. 2374 * @param key - The buffer into which the key is read 2375 * @return Returns the key length or -1 for end of file 2376 * @throws IOException 2377 */ 2378 public synchronized int nextRawKey(DataOutputBuffer key) 2379 throws IOException { 2380 if (!blockCompressed) { 2381 recordLength = readRecordLength(); 2382 if (recordLength == -1) { 2383 return -1; 2384 } 2385 keyLength = in.readInt(); 2386 key.write(in, keyLength); 2387 return keyLength; 2388 } else { 2389 //Reset syncSeen 2390 syncSeen = false; 2391 2392 // Read 'key' 2393 if (noBufferedKeys == 0) { 2394 if (in.getPos() >= end) 2395 return -1; 2396 2397 try { 2398 readBlock(); 2399 } catch (EOFException eof) { 2400 return -1; 2401 } 2402 } 2403 int keyLength = WritableUtils.readVInt(keyLenIn); 2404 if (keyLength < 0) { 2405 throw new IOException("zero length key found!"); 2406 } 2407 key.write(keyIn, keyLength); 2408 --noBufferedKeys; 2409 2410 return keyLength; 2411 } 2412 2413 } 2414 2415 /** Read the next key in the file, skipping its 2416 * value. Return null at end of file. 
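 * <p>A minimal iteration sketch; note the returned object may be a fresh
 * instance from the deserializer rather than the one passed in:</p>
 * <pre>
 *   Object key = null;
 *   while ((key = reader.next(key)) != null) {
 *     // inspect key; call getCurrentValue(Object) if the value is needed
 *   }
 * </pre>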
*/ 2417 public synchronized Object next(Object key) throws IOException { 2418 if (key != null && key.getClass() != getKeyClass()) { 2419 throw new IOException("wrong key class: "+key.getClass().getName() 2420 +" is not "+keyClass); 2421 } 2422 2423 if (!blockCompressed) { 2424 outBuf.reset(); 2425 2426 keyLength = next(outBuf); 2427 if (keyLength < 0) 2428 return null; 2429 2430 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2431 2432 key = deserializeKey(key); 2433 valBuffer.mark(0); 2434 if (valBuffer.getPosition() != keyLength) 2435 throw new IOException(key + " read " + valBuffer.getPosition() 2436 + " bytes, should read " + keyLength); 2437 } else { 2438 //Reset syncSeen 2439 syncSeen = false; 2440 2441 if (noBufferedKeys == 0) { 2442 try { 2443 readBlock(); 2444 } catch (EOFException eof) { 2445 return null; 2446 } 2447 } 2448 2449 int keyLength = WritableUtils.readVInt(keyLenIn); 2450 2451 // Sanity check 2452 if (keyLength < 0) { 2453 return null; 2454 } 2455 2456 //Read another compressed 'key' 2457 key = deserializeKey(key); 2458 --noBufferedKeys; 2459 } 2460 2461 return key; 2462 } 2463 2464 @SuppressWarnings("unchecked") 2465 private Object deserializeKey(Object key) throws IOException { 2466 return keyDeserializer.deserialize(key); 2467 } 2468 2469 /** 2470 * Read 'raw' values. 2471 * @param val - The 'raw' value 2472 * @return Returns the value length 2473 * @throws IOException 2474 */ 2475 public synchronized int nextRawValue(ValueBytes val) 2476 throws IOException { 2477 2478 // Position stream to current value 2479 seekToCurrentValue(); 2480 2481 if (!blockCompressed) { 2482 int valLength = recordLength - keyLength; 2483 if (decompress) { 2484 CompressedBytes value = (CompressedBytes)val; 2485 value.reset(in, valLength); 2486 } else { 2487 UncompressedBytes value = (UncompressedBytes)val; 2488 value.reset(in, valLength); 2489 } 2490 2491 return valLength; 2492 } else { 2493 int valLength = WritableUtils.readVInt(valLenIn); 2494 UncompressedBytes rawValue = (UncompressedBytes)val; 2495 rawValue.reset(valIn, valLength); 2496 --noBufferedValues; 2497 return valLength; 2498 } 2499 2500 } 2501 2502 private void handleChecksumException(ChecksumException e) 2503 throws IOException { 2504 if (this.conf.getBoolean("io.skip.checksum.errors", false)) { 2505 LOG.warn("Bad checksum at "+getPosition()+". Skipping entries."); 2506 sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512)); 2507 } else { 2508 throw e; 2509 } 2510 } 2511 2512 /** disables sync. often invoked for tmp files */ 2513 synchronized void ignoreSync() { 2514 sync = null; 2515 } 2516 2517 /** Set the current byte position in the input file. 2518 * 2519 * <p>The position passed must be a position returned by {@link 2520 * SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary 2521 * position, use {@link SequenceFile.Reader#sync(long)}. 
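 *
 * <p>A hedged split-reading sketch, assuming pre-created key/value
 * <code>Writable</code>s and hypothetical offsets <code>splitStart</code>
 * and <code>splitEnd</code>: {@link #sync(long)} lands on a record
 * boundary, after which records are read until the split end is passed.</p>
 * <pre>
 *   reader.sync(splitStart);      // advance to the next record boundary
 *   while (reader.getPosition() &lt; splitEnd &amp;&amp; reader.next(key, value)) {
 *     // process key/value
 *   }
 * </pre>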
2522 */ 2523 public synchronized void seek(long position) throws IOException { 2524 in.seek(position); 2525 if (blockCompressed) { // trigger block read 2526 noBufferedKeys = 0; 2527 valuesDecompressed = true; 2528 } 2529 } 2530 2531 /** Seek to the next sync mark past a given position.*/ 2532 public synchronized void sync(long position) throws IOException { 2533 if (position+SYNC_SIZE >= end) { 2534 seek(end); 2535 return; 2536 } 2537 2538 if (position < headerEnd) { 2539 // seek directly to first record 2540 in.seek(headerEnd); 2541 // note the sync marker "seen" in the header 2542 syncSeen = true; 2543 return; 2544 } 2545 2546 try { 2547 seek(position+4); // skip escape 2548 in.readFully(syncCheck); 2549 int syncLen = sync.length; 2550 for (int i = 0; in.getPos() < end; i++) { 2551 int j = 0; 2552 for (; j < syncLen; j++) { 2553 if (sync[j] != syncCheck[(i+j)%syncLen]) 2554 break; 2555 } 2556 if (j == syncLen) { 2557 in.seek(in.getPos() - SYNC_SIZE); // position before sync 2558 return; 2559 } 2560 syncCheck[i%syncLen] = in.readByte(); 2561 } 2562 } catch (ChecksumException e) { // checksum failure 2563 handleChecksumException(e); 2564 } 2565 } 2566 2567 /** Returns true iff the previous call to next passed a sync mark.*/ 2568 public synchronized boolean syncSeen() { return syncSeen; } 2569 2570 /** Return the current byte position in the input file. */ 2571 public synchronized long getPosition() throws IOException { 2572 return in.getPos(); 2573 } 2574 2575 /** Returns the name of the file. */ 2576 public String toString() { 2577 return filename; 2578 } 2579 2580 } 2581 2582 /** Sorts key/value pairs in a sequence-format file. 2583 * 2584 * <p>For best performance, applications should make sure that the {@link 2585 * Writable#readFields(DataInput)} implementation of their keys is 2586 * very efficient. In particular, it should avoid allocating memory. 2587 */ 2588 public static class Sorter { 2589 2590 private RawComparator comparator; 2591 2592 private MergeSort mergeSort; //the implementation of merge sort 2593 2594 private Path[] inFiles; // when merging or sorting 2595 2596 private Path outFile; 2597 2598 private int memory; // bytes 2599 private int factor; // merged per pass 2600 2601 private FileSystem fs = null; 2602 2603 private Class keyClass; 2604 private Class valClass; 2605 2606 private Configuration conf; 2607 private Metadata metadata; 2608 2609 private Progressable progressable = null; 2610 2611 /** Sort and merge files containing the named classes. */ 2612 public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass, 2613 Class valClass, Configuration conf) { 2614 this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf); 2615 } 2616 2617 /** Sort and merge using an arbitrary {@link RawComparator}. */ 2618 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2619 Class valClass, Configuration conf) { 2620 this(fs, comparator, keyClass, valClass, conf, new Metadata()); 2621 } 2622 2623 /** Sort and merge using an arbitrary {@link RawComparator}. 
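 * <p>A hedged construction sketch (the comparator and key/value classes are
 * illustrative only):</p>
 * <pre>
 *   SequenceFile.Sorter sorter =
 *     new SequenceFile.Sorter(fs, new Text.Comparator(), Text.class,
 *                             Text.class, conf, new Metadata());
 *   sorter.sort(new Path[]{input}, output, false);
 * </pre>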
*/ 2624 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2625 Class valClass, Configuration conf, Metadata metadata) { 2626 this.fs = fs; 2627 this.comparator = comparator; 2628 this.keyClass = keyClass; 2629 this.valClass = valClass; 2630 this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024; 2631 this.factor = conf.getInt("io.sort.factor", 100); 2632 this.conf = conf; 2633 this.metadata = metadata; 2634 } 2635 2636 /** Set the number of streams to merge at once.*/ 2637 public void setFactor(int factor) { this.factor = factor; } 2638 2639 /** Get the number of streams to merge at once.*/ 2640 public int getFactor() { return factor; } 2641 2642 /** Set the total amount of buffer memory, in bytes.*/ 2643 public void setMemory(int memory) { this.memory = memory; } 2644 2645 /** Get the total amount of buffer memory, in bytes.*/ 2646 public int getMemory() { return memory; } 2647 2648 /** Set the progressable object in order to report progress. */ 2649 public void setProgressable(Progressable progressable) { 2650 this.progressable = progressable; 2651 } 2652 2653 /** 2654 * Perform a file sort from a set of input files into an output file. 2655 * @param inFiles the files to be sorted 2656 * @param outFile the sorted output file 2657 * @param deleteInput should the input files be deleted as they are read? 2658 */ 2659 public void sort(Path[] inFiles, Path outFile, 2660 boolean deleteInput) throws IOException { 2661 if (fs.exists(outFile)) { 2662 throw new IOException("already exists: " + outFile); 2663 } 2664 2665 this.inFiles = inFiles; 2666 this.outFile = outFile; 2667 2668 int segments = sortPass(deleteInput); 2669 if (segments > 1) { 2670 mergePass(outFile.getParent()); 2671 } 2672 } 2673 2674 /** 2675 * Perform a file sort from a set of input files and return an iterator. 2676 * @param inFiles the files to be sorted 2677 * @param tempDir the directory where temp files are created during sort 2678 * @param deleteInput should the input files be deleted as they are read? 2679 * @return a RawKeyValueIterator over the sorted output 2680 */ 2681 public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 2682 boolean deleteInput) throws IOException { 2683 Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2"); 2684 if (fs.exists(outFile)) { 2685 throw new IOException("already exists: " + outFile); 2686 } 2687 this.inFiles = inFiles; 2688 //outFile will be used as a prefix for temp files in the cases 2689 //where sort outputs multiple sorted segments. For the single segment 2690 //case, the outFile itself will contain the sorted data for that 2691 //segment 2692 this.outFile = outFile; 2693 2694 int segments = sortPass(deleteInput); 2695 if (segments > 1) 2696 return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 2697 tempDir); 2698 else if (segments == 1) 2699 return merge(new Path[]{outFile}, true, tempDir); 2700 else return null; 2701 } 2702 2703 /** 2704 * The backwards compatible interface to sort.
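 * <p>Equivalent to <code>sort(new Path[]{inFile}, outFile, false)</code>.</p>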
2705 * @param inFile the input file to sort 2706 * @param outFile the sorted output file 2707 */ 2708 public void sort(Path inFile, Path outFile) throws IOException { 2709 sort(new Path[]{inFile}, outFile, false); 2710 } 2711 2712 private int sortPass(boolean deleteInput) throws IOException { 2713 if(LOG.isDebugEnabled()) { 2714 LOG.debug("running sort pass"); 2715 } 2716 SortPass sortPass = new SortPass(); // make the SortPass 2717 sortPass.setProgressable(progressable); 2718 mergeSort = new MergeSort(sortPass.new SeqFileComparator()); 2719 try { 2720 return sortPass.run(deleteInput); // run it 2721 } finally { 2722 sortPass.close(); // close it 2723 } 2724 } 2725 2726 private class SortPass { 2727 private int memoryLimit = memory/4; 2728 private int recordLimit = 1000000; 2729 2730 private DataOutputBuffer rawKeys = new DataOutputBuffer(); 2731 private byte[] rawBuffer; 2732 2733 private int[] keyOffsets = new int[1024]; 2734 private int[] pointers = new int[keyOffsets.length]; 2735 private int[] pointersCopy = new int[keyOffsets.length]; 2736 private int[] keyLengths = new int[keyOffsets.length]; 2737 private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length]; 2738 2739 private ArrayList segmentLengths = new ArrayList(); 2740 2741 private Reader in = null; 2742 private FSDataOutputStream out = null; 2743 private FSDataOutputStream indexOut = null; 2744 private Path outName; 2745 2746 private Progressable progressable = null; 2747 2748 public int run(boolean deleteInput) throws IOException { 2749 int segments = 0; 2750 int currentFile = 0; 2751 boolean atEof = (currentFile >= inFiles.length); 2752 CompressionType compressionType; 2753 CompressionCodec codec = null; 2754 segmentLengths.clear(); 2755 if (atEof) { 2756 return 0; 2757 } 2758 2759 // Initialize 2760 in = new Reader(fs, inFiles[currentFile], conf); 2761 compressionType = in.getCompressionType(); 2762 codec = in.getCompressionCodec(); 2763 2764 for (int i=0; i < rawValues.length; ++i) { 2765 rawValues[i] = null; 2766 } 2767 2768 while (!atEof) { 2769 int count = 0; 2770 int bytesProcessed = 0; 2771 rawKeys.reset(); 2772 while (!atEof && 2773 bytesProcessed < memoryLimit && count < recordLimit) { 2774 2775 // Read a record into buffer 2776 // Note: Attempt to re-use 'rawValue' as far as possible 2777 int keyOffset = rawKeys.getLength(); 2778 ValueBytes rawValue = 2779 (count == keyOffsets.length || rawValues[count] == null) ? 
2780 in.createValueBytes() : 2781 rawValues[count]; 2782 int recordLength = in.nextRaw(rawKeys, rawValue); 2783 if (recordLength == -1) { 2784 in.close(); 2785 if (deleteInput) { 2786 fs.delete(inFiles[currentFile], true); 2787 } 2788 currentFile += 1; 2789 atEof = currentFile >= inFiles.length; 2790 if (!atEof) { 2791 in = new Reader(fs, inFiles[currentFile], conf); 2792 } else { 2793 in = null; 2794 } 2795 continue; 2796 } 2797 2798 int keyLength = rawKeys.getLength() - keyOffset; 2799 2800 if (count == keyOffsets.length) 2801 grow(); 2802 2803 keyOffsets[count] = keyOffset; // update pointers 2804 pointers[count] = count; 2805 keyLengths[count] = keyLength; 2806 rawValues[count] = rawValue; 2807 2808 bytesProcessed += recordLength; 2809 count++; 2810 } 2811 2812 // buffer is full -- sort & flush it 2813 if(LOG.isDebugEnabled()) { 2814 LOG.debug("flushing segment " + segments); 2815 } 2816 rawBuffer = rawKeys.getData(); 2817 sort(count); 2818 // indicate we're making progress 2819 if (progressable != null) { 2820 progressable.progress(); 2821 } 2822 flush(count, bytesProcessed, compressionType, codec, 2823 segments==0 && atEof); 2824 segments++; 2825 } 2826 return segments; 2827 } 2828 2829 public void close() throws IOException { 2830 if (in != null) { 2831 in.close(); 2832 } 2833 if (out != null) { 2834 out.close(); 2835 } 2836 if (indexOut != null) { 2837 indexOut.close(); 2838 } 2839 } 2840 2841 private void grow() { 2842 int newLength = keyOffsets.length * 3 / 2; 2843 keyOffsets = grow(keyOffsets, newLength); 2844 pointers = grow(pointers, newLength); 2845 pointersCopy = new int[newLength]; 2846 keyLengths = grow(keyLengths, newLength); 2847 rawValues = grow(rawValues, newLength); 2848 } 2849 2850 private int[] grow(int[] old, int newLength) { 2851 int[] result = new int[newLength]; 2852 System.arraycopy(old, 0, result, 0, old.length); 2853 return result; 2854 } 2855 2856 private ValueBytes[] grow(ValueBytes[] old, int newLength) { 2857 ValueBytes[] result = new ValueBytes[newLength]; 2858 System.arraycopy(old, 0, result, 0, old.length); 2859 for (int i=old.length; i < newLength; ++i) { 2860 result[i] = null; 2861 } 2862 return result; 2863 } 2864 2865 private void flush(int count, int bytesProcessed, 2866 CompressionType compressionType, 2867 CompressionCodec codec, 2868 boolean done) throws IOException { 2869 if (out == null) { 2870 outName = done ? outFile : outFile.suffix(".0"); 2871 out = fs.create(outName); 2872 if (!done) { 2873 indexOut = fs.create(outName.suffix(".index")); 2874 } 2875 } 2876 2877 long segmentStart = out.getPos(); 2878 Writer writer = createWriter(conf, Writer.stream(out), 2879 Writer.keyClass(keyClass), Writer.valueClass(valClass), 2880 Writer.compression(compressionType, codec), 2881 Writer.metadata(done ? 
metadata : new Metadata())); 2882 2883 if (!done) { 2884 writer.sync = null; // disable sync on temp files 2885 } 2886 2887 for (int i = 0; i < count; i++) { // write in sorted order 2888 int p = pointers[i]; 2889 writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]); 2890 } 2891 writer.close(); 2892 2893 if (!done) { 2894 // Save the segment length 2895 WritableUtils.writeVLong(indexOut, segmentStart); 2896 WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart)); 2897 indexOut.flush(); 2898 } 2899 } 2900 2901 private void sort(int count) { 2902 System.arraycopy(pointers, 0, pointersCopy, 0, count); 2903 mergeSort.mergeSort(pointersCopy, pointers, 0, count); 2904 } 2905 class SeqFileComparator implements Comparator<IntWritable> { 2906 public int compare(IntWritable I, IntWritable J) { 2907 return comparator.compare(rawBuffer, keyOffsets[I.get()], 2908 keyLengths[I.get()], rawBuffer, 2909 keyOffsets[J.get()], keyLengths[J.get()]); 2910 } 2911 } 2912 2913 /** set the progressable object in order to report progress */ 2914 public void setProgressable(Progressable progressable) 2915 { 2916 this.progressable = progressable; 2917 } 2918 2919 } // SequenceFile.Sorter.SortPass 2920 2921 /** The interface to iterate over raw keys/values of SequenceFiles. */ 2922 public static interface RawKeyValueIterator { 2923 /** Gets the current raw key 2924 * @return DataOutputBuffer 2925 * @throws IOException 2926 */ 2927 DataOutputBuffer getKey() throws IOException; 2928 /** Gets the current raw value 2929 * @return ValueBytes 2930 * @throws IOException 2931 */ 2932 ValueBytes getValue() throws IOException; 2933 /** Sets up the current key and value (for getKey and getValue) 2934 * @return true if there exists a key/value, false otherwise 2935 * @throws IOException 2936 */ 2937 boolean next() throws IOException; 2938 /** closes the iterator so that the underlying streams can be closed 2939 * @throws IOException 2940 */ 2941 void close() throws IOException; 2942 /** Gets the Progress object; this has a float (0.0 - 1.0) 2943 * indicating the bytes processed by the iterator so far 2944 */ 2945 Progress getProgress(); 2946 } 2947 2948 /** 2949 * Merges the list of segments of type <code>SegmentDescriptor</code> 2950 * @param segments the list of SegmentDescriptors 2951 * @param tmpDir the directory to write temporary files into 2952 * @return RawKeyValueIterator 2953 * @throws IOException 2954 */ 2955 public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 2956 Path tmpDir) 2957 throws IOException { 2958 // pass in object to report progress, if present 2959 MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable); 2960 return mQueue.merge(); 2961 } 2962 2963 /** 2964 * Merges the contents of files passed in Path[] using a max factor value 2965 * that is already set 2966 * @param inNames the array of path names 2967 * @param deleteInputs true if the input files should be deleted when 2968 * unnecessary 2969 * @param tmpDir the directory to write temporary files into 2970 * @return RawKeyValueIterator 2971 * @throws IOException 2972 */ 2973 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 2974 Path tmpDir) 2975 throws IOException { 2976 return merge(inNames, deleteInputs, 2977 (inNames.length < factor) ?
inNames.length : factor, 2978 tmpDir); 2979 } 2980 2981 /** 2982 * Merges the contents of files passed in Path[] 2983 * @param inNames the array of path names 2984 * @param deleteInputs true if the input files should be deleted when 2985 * unnecessary 2986 * @param factor the factor that will be used as the maximum merge fan-in 2987 * @param tmpDir the directory to write temporary files into 2988 * @return RawKeyValueIterator 2989 * @throws IOException 2990 */ 2991 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 2992 int factor, Path tmpDir) 2993 throws IOException { 2994 //get the segments from inNames 2995 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 2996 for (int i = 0; i < inNames.length; i++) { 2997 SegmentDescriptor s = new SegmentDescriptor(0, 2998 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 2999 s.preserveInput(!deleteInputs); 3000 s.doSync(); 3001 a.add(s); 3002 } 3003 this.factor = factor; 3004 MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable); 3005 return mQueue.merge(); 3006 } 3007 3008 /** 3009 * Merges the contents of files passed in Path[] 3010 * @param inNames the array of path names 3011 * @param tempDir the directory for creating temp files during merge 3012 * @param deleteInputs true if the input files should be deleted when 3013 * unnecessary 3014 * @return RawKeyValueIterator 3015 * @throws IOException 3016 */ 3017 public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 3018 boolean deleteInputs) 3019 throws IOException { 3020 //outFile will be used as a prefix for temp files for the 3021 //intermediate merge outputs 3022 this.outFile = new Path(tempDir + Path.SEPARATOR + "merged"); 3023 //get the segments from inNames 3024 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 3025 for (int i = 0; i < inNames.length; i++) { 3026 SegmentDescriptor s = new SegmentDescriptor(0, 3027 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 3028 s.preserveInput(!deleteInputs); 3029 s.doSync(); 3030 a.add(s); 3031 } 3032 factor = (inNames.length < factor) ?
inNames.length : factor; 3033 // pass in object to report progress, if present 3034 MergeQueue mQueue = new MergeQueue(a, tempDir, progressable); 3035 return mQueue.merge(); 3036 } 3037 3038 /** 3039 * Clones the attributes (like compression) of the input file and creates a 3040 * corresponding Writer 3041 * @param inputFile the path of the input file whose attributes should be 3042 * cloned 3043 * @param outputFile the path of the output file 3044 * @param prog the Progressable to report status during the file write 3045 * @return Writer 3046 * @throws IOException 3047 */ 3048 public Writer cloneFileAttributes(Path inputFile, Path outputFile, 3049 Progressable prog) throws IOException { 3050 Reader reader = new Reader(conf, 3051 Reader.file(inputFile), 3052 new Reader.OnlyHeaderOption()); 3053 CompressionType compress = reader.getCompressionType(); 3054 CompressionCodec codec = reader.getCompressionCodec(); 3055 reader.close(); 3056 3057 Writer writer = createWriter(conf, 3058 Writer.file(outputFile), 3059 Writer.keyClass(keyClass), 3060 Writer.valueClass(valClass), 3061 Writer.compression(compress, codec), 3062 Writer.progressable(prog)); 3063 return writer; 3064 } 3065 3066 /** 3067 * Writes records from RawKeyValueIterator into a file represented by the 3068 * passed writer 3069 * @param records the RawKeyValueIterator 3070 * @param writer the Writer created earlier 3071 * @throws IOException 3072 */ 3073 public void writeFile(RawKeyValueIterator records, Writer writer) 3074 throws IOException { 3075 while(records.next()) { 3076 writer.appendRaw(records.getKey().getData(), 0, 3077 records.getKey().getLength(), records.getValue()); 3078 } 3079 writer.sync(); 3080 } 3081 3082 /** Merge the provided files. 3083 * @param inFiles the array of input path names 3084 * @param outFile the final output file 3085 * @throws IOException 3086 */ 3087 public void merge(Path[] inFiles, Path outFile) throws IOException { 3088 if (fs.exists(outFile)) { 3089 throw new IOException("already exists: " + outFile); 3090 } 3091 RawKeyValueIterator r = merge(inFiles, false, outFile.getParent()); 3092 Writer writer = cloneFileAttributes(inFiles[0], outFile, null); 3093 3094 writeFile(r, writer); 3095 3096 writer.close(); 3097 } 3098 3099 /** sort calls this to generate the final merged output */ 3100 private int mergePass(Path tmpDir) throws IOException { 3101 if(LOG.isDebugEnabled()) { 3102 LOG.debug("running merge pass"); 3103 } 3104 Writer writer = cloneFileAttributes( 3105 outFile.suffix(".0"), outFile, null); 3106 RawKeyValueIterator r = merge(outFile.suffix(".0"), 3107 outFile.suffix(".0.index"), tmpDir); 3108 writeFile(r, writer); 3109 3110 writer.close(); 3111 return 0; 3112 } 3113 3114 /** Used by mergePass to merge the output of the sort 3115 * @param inName the name of the input file containing sorted segments 3116 * @param indexIn the offsets of the sorted segments 3117 * @param tmpDir the relative directory to store intermediate results in 3118 * @return RawKeyValueIterator 3119 * @throws IOException 3120 */ 3121 private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 3122 throws IOException { 3123 //get the segments from indexIn 3124 //we create a SegmentContainer so that we can track segments belonging to 3125 //inName and delete inName as soon as we see that we have looked at all 3126 //the contained segments during the merge process & hence don't need 3127 //them anymore 3128 SegmentContainer container = new SegmentContainer(inName, indexIn); 3129 MergeQueue mQueue = new
MergeQueue(container.getSegmentList(), tmpDir, progressable); 3130 return mQueue.merge(); 3131 } 3132 3133 /** This class implements the core of the merge logic */ 3134 private class MergeQueue extends PriorityQueue 3135 implements RawKeyValueIterator { 3136 private boolean compress; 3137 private boolean blockCompress; 3138 private DataOutputBuffer rawKey = new DataOutputBuffer(); 3139 private ValueBytes rawValue; 3140 private long totalBytesProcessed; 3141 private float progPerByte; 3142 private Progress mergeProgress = new Progress(); 3143 private Path tmpDir; 3144 private Progressable progress = null; //handle to the progress reporting object 3145 private SegmentDescriptor minSegment; 3146 3147 //a TreeMap used to store the segments sorted by size (segment offset and 3148 //segment path name is used to break ties between segments of same sizes) 3149 private Map<SegmentDescriptor, Void> sortedSegmentSizes = 3150 new TreeMap<SegmentDescriptor, Void>(); 3151 3152 @SuppressWarnings("unchecked") 3153 public void put(SegmentDescriptor stream) throws IOException { 3154 if (size() == 0) { 3155 compress = stream.in.isCompressed(); 3156 blockCompress = stream.in.isBlockCompressed(); 3157 } else if (compress != stream.in.isCompressed() || 3158 blockCompress != stream.in.isBlockCompressed()) { 3159 throw new IOException("All merged files must be compressed or not."); 3160 } 3161 super.put(stream); 3162 } 3163 3164 /** 3165 * A queue of file segments to merge 3166 * @param segments the file segments to merge 3167 * @param tmpDir a relative local directory to save intermediate files in 3168 * @param progress the reference to the Progressable object 3169 */ 3170 public MergeQueue(List <SegmentDescriptor> segments, 3171 Path tmpDir, Progressable progress) { 3172 int size = segments.size(); 3173 for (int i = 0; i < size; i++) { 3174 sortedSegmentSizes.put(segments.get(i), null); 3175 } 3176 this.tmpDir = tmpDir; 3177 this.progress = progress; 3178 } 3179 protected boolean lessThan(Object a, Object b) { 3180 // indicate we're making progress 3181 if (progress != null) { 3182 progress.progress(); 3183 } 3184 SegmentDescriptor msa = (SegmentDescriptor)a; 3185 SegmentDescriptor msb = (SegmentDescriptor)b; 3186 return comparator.compare(msa.getKey().getData(), 0, 3187 msa.getKey().getLength(), msb.getKey().getData(), 0, 3188 msb.getKey().getLength()) < 0; 3189 } 3190 public void close() throws IOException { 3191 SegmentDescriptor ms; // close inputs 3192 while ((ms = (SegmentDescriptor)pop()) != null) { 3193 ms.cleanup(); 3194 } 3195 minSegment = null; 3196 } 3197 public DataOutputBuffer getKey() throws IOException { 3198 return rawKey; 3199 } 3200 public ValueBytes getValue() throws IOException { 3201 return rawValue; 3202 } 3203 public boolean next() throws IOException { 3204 if (size() == 0) 3205 return false; 3206 if (minSegment != null) { 3207 //minSegment is non-null for all invocations of next except the first 3208 //one. For the first invocation, the priority queue is ready for use 3209 //but for the subsequent invocations, first adjust the queue 3210 adjustPriorityQueue(minSegment); 3211 if (size() == 0) { 3212 minSegment = null; 3213 return false; 3214 } 3215 } 3216 minSegment = (SegmentDescriptor)top(); 3217 long startPos = minSegment.in.getPosition(); // Current position in stream 3218 //save the raw key reference 3219 rawKey = minSegment.getKey(); 3220 //load the raw value. 
Re-use the existing rawValue buffer 3221 if (rawValue == null) { 3222 rawValue = minSegment.in.createValueBytes(); 3223 } 3224 minSegment.nextRawValue(rawValue); 3225 long endPos = minSegment.in.getPosition(); // End position after reading value 3226 updateProgress(endPos - startPos); 3227 return true; 3228 } 3229 3230 public Progress getProgress() { 3231 return mergeProgress; 3232 } 3233 3234 private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{ 3235 long startPos = ms.in.getPosition(); // Current position in stream 3236 boolean hasNext = ms.nextRawKey(); 3237 long endPos = ms.in.getPosition(); // End position after reading key 3238 updateProgress(endPos - startPos); 3239 if (hasNext) { 3240 adjustTop(); 3241 } else { 3242 pop(); 3243 ms.cleanup(); 3244 } 3245 } 3246 3247 private void updateProgress(long bytesProcessed) { 3248 totalBytesProcessed += bytesProcessed; 3249 if (progPerByte > 0) { 3250 mergeProgress.set(totalBytesProcessed * progPerByte); 3251 } 3252 } 3253 3254 /** This is the single level merge that is called multiple times 3255 * depending on the factor size and the number of segments 3256 * @return RawKeyValueIterator 3257 * @throws IOException 3258 */ 3259 public RawKeyValueIterator merge() throws IOException { 3260 //create the MergeStreams from the sorted map created in the constructor 3261 //and dump the final output to a file 3262 int numSegments = sortedSegmentSizes.size(); 3263 int origFactor = factor; 3264 int passNo = 1; 3265 LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir"); 3266 do { 3267 //get the factor for this pass of merge 3268 factor = getPassFactor(passNo, numSegments); 3269 List<SegmentDescriptor> segmentsToMerge = 3270 new ArrayList<SegmentDescriptor>(); 3271 int segmentsConsidered = 0; 3272 int numSegmentsToConsider = factor; 3273 while (true) { 3274 //extract the smallest 'factor' number of segment pointers from the 3275 //TreeMap. Call cleanup on the empty segments (no key/value data) 3276 SegmentDescriptor[] mStream = 3277 getSegmentDescriptors(numSegmentsToConsider); 3278 for (int i = 0; i < mStream.length; i++) { 3279 if (mStream[i].nextRawKey()) { 3280 segmentsToMerge.add(mStream[i]); 3281 segmentsConsidered++; 3282 // Count the fact that we read some bytes in calling nextRawKey() 3283 updateProgress(mStream[i].in.getPosition()); 3284 } 3285 else { 3286 mStream[i].cleanup(); 3287 numSegments--; //we ignore this segment for the merge 3288 } 3289 } 3290 //if we have the desired number of segments 3291 //or looked at all available segments, we break 3292 if (segmentsConsidered == factor || 3293 sortedSegmentSizes.size() == 0) { 3294 break; 3295 } 3296 3297 numSegmentsToConsider = factor - segmentsConsidered; 3298 } 3299 //feed the streams to the priority queue 3300 initialize(segmentsToMerge.size()); clear(); 3301 for (int i = 0; i < segmentsToMerge.size(); i++) { 3302 put(segmentsToMerge.get(i)); 3303 } 3304 //if we have lesser number of segments remaining, then just return the 3305 //iterator, else do another single level merge 3306 if (numSegments <= factor) { 3307 //calculate the length of the remaining segments. 
Required for 3308 //calculating the merge progress 3309 long totalBytes = 0; 3310 for (int i = 0; i < segmentsToMerge.size(); i++) { 3311 totalBytes += segmentsToMerge.get(i).segmentLength; 3312 } 3313 if (totalBytes != 0) //being paranoid 3314 progPerByte = 1.0f / (float)totalBytes; 3315 //reset factor to what it originally was 3316 factor = origFactor; 3317 return this; 3318 } else { 3319 //we want to spread the creation of temp files on multiple disks if 3320 //available under the space constraints 3321 long approxOutputSize = 0; 3322 for (SegmentDescriptor s : segmentsToMerge) { 3323 approxOutputSize += s.segmentLength + 3324 ChecksumFileSystem.getApproxChkSumLength( 3325 s.segmentLength); 3326 } 3327 Path tmpFilename = 3328 new Path(tmpDir, "intermediate").suffix("." + passNo); 3329 3330 Path outputFile = lDirAlloc.getLocalPathForWrite( 3331 tmpFilename.toString(), 3332 approxOutputSize, conf); 3333 if(LOG.isDebugEnabled()) { 3334 LOG.debug("writing intermediate results to " + outputFile); 3335 } 3336 Writer writer = cloneFileAttributes( 3337 fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 3338 fs.makeQualified(outputFile), null); 3339 writer.sync = null; //disable sync for temp files 3340 writeFile(this, writer); 3341 writer.close(); 3342 3343 //we finished one single level merge; now clean up the priority 3344 //queue 3345 this.close(); 3346 3347 SegmentDescriptor tempSegment = 3348 new SegmentDescriptor(0, 3349 fs.getFileStatus(outputFile).getLen(), outputFile); 3350 //put the segment back in the TreeMap 3351 sortedSegmentSizes.put(tempSegment, null); 3352 numSegments = sortedSegmentSizes.size(); 3353 passNo++; 3354 } 3355 //we are worried about only the first pass merge factor. So reset the 3356 //factor to what it originally was 3357 factor = origFactor; 3358 } while(true); 3359 } 3360 3361 //Hadoop-591 3362 public int getPassFactor(int passNo, int numSegments) { 3363 if (passNo > 1 || numSegments <= factor || factor == 1) 3364 return factor; 3365 int mod = (numSegments - 1) % (factor - 1); 3366 if (mod == 0) 3367 return factor; 3368 return mod + 1; 3369 } 3370 3371 /** Return (& remove) the requested number of segment descriptors from the 3372 * sorted map. 3373 */ 3374 public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) { 3375 if (numDescriptors > sortedSegmentSizes.size()) 3376 numDescriptors = sortedSegmentSizes.size(); 3377 SegmentDescriptor[] SegmentDescriptors = 3378 new SegmentDescriptor[numDescriptors]; 3379 Iterator iter = sortedSegmentSizes.keySet().iterator(); 3380 int i = 0; 3381 while (i < numDescriptors) { 3382 SegmentDescriptors[i++] = (SegmentDescriptor)iter.next(); 3383 iter.remove(); 3384 } 3385 return SegmentDescriptors; 3386 } 3387 } // SequenceFile.Sorter.MergeQueue 3388 3389 /** This class defines a merge segment. This class can be subclassed to 3390 * provide a customized cleanup method implementation. In this 3391 * implementation, cleanup closes the file handle and deletes the file 3392 */ 3393 public class SegmentDescriptor implements Comparable { 3394 3395 long segmentOffset; //the start of the segment in the file 3396 long segmentLength; //the length of the segment 3397 Path segmentPathName; //the path name of the file containing the segment 3398 boolean ignoreSync = true; //set to true for temp files 3399 private Reader in = null; 3400 private DataOutputBuffer rawKey = null; //this will hold the current key 3401 private boolean preserveInput = false; //delete input segment files? 
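      /* A hedged subclassing sketch (names illustrative): cleanup() may be
       * overridden, for instance to close the underlying reader while
       * preserving the backing file:
       *
       *   Sorter.SegmentDescriptor seg =
       *     sorter.new SegmentDescriptor(0, length, path) {
       *       public void cleanup() throws IOException {
       *         preserveInput(true);   // keep the segment file on disk
       *         super.cleanup();       // still closes the reader
       *       }
       *     };
       */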
3402 3403 /** Constructs a segment 3404 * @param segmentOffset the offset of the segment in the file 3405 * @param segmentLength the length of the segment 3406 * @param segmentPathName the path name of the file containing the segment 3407 */ 3408 public SegmentDescriptor (long segmentOffset, long segmentLength, 3409 Path segmentPathName) { 3410 this.segmentOffset = segmentOffset; 3411 this.segmentLength = segmentLength; 3412 this.segmentPathName = segmentPathName; 3413 } 3414 3415 /** Do the sync checks */ 3416 public void doSync() {ignoreSync = false;} 3417 3418 /** Whether to delete the files when no longer needed */ 3419 public void preserveInput(boolean preserve) { 3420 preserveInput = preserve; 3421 } 3422 3423 public boolean shouldPreserveInput() { 3424 return preserveInput; 3425 } 3426 3427 public int compareTo(Object o) { 3428 SegmentDescriptor that = (SegmentDescriptor)o; 3429 if (this.segmentLength != that.segmentLength) { 3430 return (this.segmentLength < that.segmentLength ? -1 : 1); 3431 } 3432 if (this.segmentOffset != that.segmentOffset) { 3433 return (this.segmentOffset < that.segmentOffset ? -1 : 1); 3434 } 3435 return (this.segmentPathName.toString()). 3436 compareTo(that.segmentPathName.toString()); 3437 } 3438 3439 public boolean equals(Object o) { 3440 if (!(o instanceof SegmentDescriptor)) { 3441 return false; 3442 } 3443 SegmentDescriptor that = (SegmentDescriptor)o; 3444 if (this.segmentLength == that.segmentLength && 3445 this.segmentOffset == that.segmentOffset && 3446 this.segmentPathName.toString().equals( 3447 that.segmentPathName.toString())) { 3448 return true; 3449 } 3450 return false; 3451 } 3452 3453 public int hashCode() { 3454 return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32)); 3455 } 3456 3457 /** Fills up the rawKey object with the key returned by the Reader 3458 * @return true if there is a key returned; false, otherwise 3459 * @throws IOException 3460 */ 3461 public boolean nextRawKey() throws IOException { 3462 if (in == null) { 3463 int bufferSize = getBufferSize(conf); 3464 Reader reader = new Reader(conf, 3465 Reader.file(segmentPathName), 3466 Reader.bufferSize(bufferSize), 3467 Reader.start(segmentOffset), 3468 Reader.length(segmentLength)); 3469 3470 //sometimes we ignore syncs especially for temp merge files 3471 if (ignoreSync) reader.ignoreSync(); 3472 3473 if (reader.getKeyClass() != keyClass) 3474 throw new IOException("wrong key class: " + reader.getKeyClass() + 3475 " is not " + keyClass); 3476 if (reader.getValueClass() != valClass) 3477 throw new IOException("wrong value class: "+reader.getValueClass()+ 3478 " is not " + valClass); 3479 this.in = reader; 3480 rawKey = new DataOutputBuffer(); 3481 } 3482 rawKey.reset(); 3483 int keyLength = 3484 in.nextRawKey(rawKey); 3485 return (keyLength >= 0); 3486 } 3487 3488 /** Fills up the passed rawValue with the value corresponding to the key 3489 * read earlier 3490 * @param rawValue 3491 * @return the length of the value 3492 * @throws IOException 3493 */ 3494 public int nextRawValue(ValueBytes rawValue) throws IOException { 3495 int valLength = in.nextRawValue(rawValue); 3496 return valLength; 3497 } 3498 3499 /** Returns the stored rawKey */ 3500 public DataOutputBuffer getKey() { 3501 return rawKey; 3502 } 3503 3504 /** closes the underlying reader */ 3505 private void close() throws IOException { 3506 this.in.close(); 3507 this.in = null; 3508 } 3509 3510 /** The default cleanup. 
Subclasses can override this with a custom 3511 * cleanup 3512 */ 3513 public void cleanup() throws IOException { 3514 close(); 3515 if (!preserveInput) { 3516 fs.delete(segmentPathName, true); 3517 } 3518 } 3519 } // SequenceFile.Sorter.SegmentDescriptor 3520 3521 /** This class provisions multiple segments contained within a single 3522 * file 3523 */ 3524 private class LinkedSegmentsDescriptor extends SegmentDescriptor { 3525 3526 SegmentContainer parentContainer = null; 3527 3528 /** Constructs a segment 3529 * @param segmentOffset the offset of the segment in the file 3530 * @param segmentLength the length of the segment 3531 * @param segmentPathName the path name of the file containing the segment 3532 * @param parent the parent SegmentContainer that holds the segment 3533 */ 3534 public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 3535 Path segmentPathName, SegmentContainer parent) { 3536 super(segmentOffset, segmentLength, segmentPathName); 3537 this.parentContainer = parent; 3538 } 3539 /** The default cleanup. Subclasses can override this with a custom 3540 * cleanup 3541 */ 3542 public void cleanup() throws IOException { 3543 super.close(); 3544 if (super.shouldPreserveInput()) return; 3545 parentContainer.cleanup(); 3546 } 3547 3548 public boolean equals(Object o) { 3549 if (!(o instanceof LinkedSegmentsDescriptor)) { 3550 return false; 3551 } 3552 return super.equals(o); 3553 } 3554 } //SequenceFile.Sorter.LinkedSegmentsDescriptor 3555 3556 /** The class that defines a container for segments to be merged. Primarily 3557 * required to delete temp files as soon as all the contained segments 3558 * have been looked at */ 3559 private class SegmentContainer { 3560 private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups 3561 private int numSegmentsContained; //# of segments contained 3562 private Path inName; //input file from where segments are created 3563 3564 //the list of segments read from the file 3565 private ArrayList <SegmentDescriptor> segments = 3566 new ArrayList <SegmentDescriptor>(); 3567 /** This constructor is there primarily to serve the sort routine that 3568 * generates a single output file with an associated index file */ 3569 public SegmentContainer(Path inName, Path indexIn) throws IOException { 3570 //get the segments from indexIn 3571 FSDataInputStream fsIndexIn = fs.open(indexIn); 3572 long end = fs.getFileStatus(indexIn).getLen(); 3573 while (fsIndexIn.getPos() < end) { 3574 long segmentOffset = WritableUtils.readVLong(fsIndexIn); 3575 long segmentLength = WritableUtils.readVLong(fsIndexIn); 3576 Path segmentName = inName; 3577 segments.add(new LinkedSegmentsDescriptor(segmentOffset, 3578 segmentLength, segmentName, this)); 3579 } 3580 fsIndexIn.close(); 3581 fs.delete(indexIn, true); 3582 numSegmentsContained = segments.size(); 3583 this.inName = inName; 3584 } 3585 3586 public List <SegmentDescriptor> getSegmentList() { 3587 return segments; 3588 } 3589 public void cleanup() throws IOException { 3590 numSegmentsCleanedUp++; 3591 if (numSegmentsCleanedUp == numSegmentsContained) { 3592 fs.delete(inName, true); 3593 } 3594 } 3595 } //SequenceFile.Sorter.SegmentContainer 3596 3597 } // SequenceFile.Sorter 3598 3599 } // SequenceFile
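// A hedged end-to-end sketch (paths illustrative): sort two SequenceFiles of
// Text keys/values into a single output file with the Sorter defined above.
//
//   SequenceFile.Sorter sorter =
//     new SequenceFile.Sorter(fs, Text.class, Text.class, conf);
//   sorter.sort(new Path[]{new Path("in-0"), new Path("in-1")},
//               new Path("sorted"), false);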