001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io;
020    
021    import java.io.*;
022    import java.util.*;
023    import java.rmi.server.UID;
024    import java.security.MessageDigest;
025    import org.apache.commons.logging.*;
026    import org.apache.hadoop.util.Options;
027    import org.apache.hadoop.fs.*;
028    import org.apache.hadoop.fs.Options.CreateOpts;
029    import org.apache.hadoop.io.compress.CodecPool;
030    import org.apache.hadoop.io.compress.CompressionCodec;
031    import org.apache.hadoop.io.compress.CompressionInputStream;
032    import org.apache.hadoop.io.compress.CompressionOutputStream;
033    import org.apache.hadoop.io.compress.Compressor;
034    import org.apache.hadoop.io.compress.Decompressor;
035    import org.apache.hadoop.io.compress.DefaultCodec;
036    import org.apache.hadoop.io.compress.GzipCodec;
037    import org.apache.hadoop.io.compress.zlib.ZlibFactory;
038    import org.apache.hadoop.io.serializer.Deserializer;
039    import org.apache.hadoop.io.serializer.Serializer;
040    import org.apache.hadoop.io.serializer.SerializationFactory;
041    import org.apache.hadoop.classification.InterfaceAudience;
042    import org.apache.hadoop.classification.InterfaceStability;
043    import org.apache.hadoop.conf.*;
044    import org.apache.hadoop.util.Progressable;
045    import org.apache.hadoop.util.Progress;
046    import org.apache.hadoop.util.ReflectionUtils;
047    import org.apache.hadoop.util.NativeCodeLoader;
048    import org.apache.hadoop.util.MergeSort;
049    import org.apache.hadoop.util.PriorityQueue;
050    import org.apache.hadoop.util.Time;
051    
052    /** 
053     * <code>SequenceFile</code>s are flat files consisting of binary key/value 
054     * pairs.
055     * 
056     * <p><code>SequenceFile</code> provides {@link Writer}, {@link Reader} and
057     * {@link Sorter} classes for writing, reading and sorting respectively.</p>
058     * 
059     * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 
060     * {@link CompressionType} used to compress key/value pairs:
061     * <ol>
062     *   <li>
063     *   <code>Writer</code> : Uncompressed records.
064     *   </li>
065     *   <li>
066     *   <code>RecordCompressWriter</code> : Record-compressed files, only compress 
067     *                                       values.
068     *   </li>
069     *   <li>
070     *   <code>BlockCompressWriter</code> : Block-compressed files, both keys & 
071     *                                      values are collected in 'blocks' 
072     *                                      separately and compressed. The size of 
073     *                                      the 'block' is configurable.
074     * </ol>
075     * 
076     * <p>The actual compression algorithm used to compress key and/or values can be
077     * specified by using the appropriate {@link CompressionCodec}.</p>
078     * 
079     * <p>The recommended way is to use the static <tt>createWriter</tt> methods
080     * provided by the <code>SequenceFile</code> to chose the preferred format.</p>
081     *
082     * <p>The {@link Reader} acts as the bridge and can read any of the above 
083     * <code>SequenceFile</code> formats.</p>
084     *
085     * <h4 id="Formats">SequenceFile Formats</h4>
086     * 
087     * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
088     * depending on the <code>CompressionType</code> specified. All of them share a
089     * <a href="#Header">common header</a> described below.
090     * 
091     * <h5 id="Header">SequenceFile Header</h5>
092     * <ul>
093     *   <li>
094     *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 
095     *             version number (e.g. SEQ4 or SEQ6)
096     *   </li>
097     *   <li>
098     *   keyClassName -key class
099     *   </li>
100     *   <li>
101     *   valueClassName - value class
102     *   </li>
103     *   <li>
104     *   compression - A boolean which specifies if compression is turned on for 
105     *                 keys/values in this file.
106     *   </li>
107     *   <li>
108     *   blockCompression - A boolean which specifies if block-compression is 
109     *                      turned on for keys/values in this file.
110     *   </li>
111     *   <li>
112     *   compression codec - <code>CompressionCodec</code> class which is used for  
113     *                       compression of keys and/or values (if compression is 
114     *                       enabled).
115     *   </li>
116     *   <li>
117     *   metadata - {@link Metadata} for this file.
118     *   </li>
119     *   <li>
120     *   sync - A sync marker to denote end of the header.
121     *   </li>
122     * </ul>
123     * 
124     * <h5 id="#UncompressedFormat">Uncompressed SequenceFile Format</h5>
125     * <ul>
126     * <li>
127     * <a href="#Header">Header</a>
128     * </li>
129     * <li>
130     * Record
131     *   <ul>
132     *     <li>Record length</li>
133     *     <li>Key length</li>
134     *     <li>Key</li>
135     *     <li>Value</li>
136     *   </ul>
137     * </li>
138     * <li>
139     * A sync-marker every few <code>100</code> bytes or so.
140     * </li>
141     * </ul>
142     *
143     * <h5 id="#RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
144     * <ul>
145     * <li>
146     * <a href="#Header">Header</a>
147     * </li>
148     * <li>
149     * Record
150     *   <ul>
151     *     <li>Record length</li>
152     *     <li>Key length</li>
153     *     <li>Key</li>
154     *     <li><i>Compressed</i> Value</li>
155     *   </ul>
156     * </li>
157     * <li>
158     * A sync-marker every few <code>100</code> bytes or so.
159     * </li>
160     * </ul>
161     * 
162     * <h5 id="#BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
163     * <ul>
164     * <li>
165     * <a href="#Header">Header</a>
166     * </li>
167     * <li>
168     * Record <i>Block</i>
169     *   <ul>
170     *     <li>Uncompressed number of records in the block</li>
171     *     <li>Compressed key-lengths block-size</li>
172     *     <li>Compressed key-lengths block</li>
173     *     <li>Compressed keys block-size</li>
174     *     <li>Compressed keys block</li>
175     *     <li>Compressed value-lengths block-size</li>
176     *     <li>Compressed value-lengths block</li>
177     *     <li>Compressed values block-size</li>
178     *     <li>Compressed values block</li>
179     *   </ul>
180     * </li>
181     * <li>
182     * A sync-marker every block.
183     * </li>
184     * </ul>
185     * 
186     * <p>The compressed blocks of key lengths and value lengths consist of the 
187     * actual lengths of individual keys/values encoded in ZeroCompressedInteger 
188     * format.</p>
189     * 
190     * @see CompressionCodec
191     */
192    @InterfaceAudience.Public
193    @InterfaceStability.Stable
194    public class SequenceFile {
195      private static final Log LOG = LogFactory.getLog(SequenceFile.class);
196    
197      private SequenceFile() {}                         // no public ctor
198    
199      private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
200      private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
201      private static final byte VERSION_WITH_METADATA = (byte)6;
202      private static byte[] VERSION = new byte[] {
203        (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
204      };
205    
206      private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
207      private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash 
208      private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
209    
210      /** The number of bytes between sync points.*/
211      public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 
212    
213      /** 
214       * The compression type used to compress key/value pairs in the 
215       * {@link SequenceFile}.
216       * 
217       * @see SequenceFile.Writer
218       */
219      public static enum CompressionType {
220        /** Do not compress records. */
221        NONE, 
222        /** Compress values only, each separately. */
223        RECORD,
224        /** Compress sequences of records together in blocks. */
225        BLOCK
226      }
227    
228      /**
229       * Get the compression type for the reduce outputs
230       * @param job the job config to look in
231       * @return the kind of compression to use
232       */
233      static public CompressionType getDefaultCompressionType(Configuration job) {
234        String name = job.get("io.seqfile.compression.type");
235        return name == null ? CompressionType.RECORD : 
236          CompressionType.valueOf(name);
237      }
238      
239      /**
240       * Set the default compression type for sequence files.
241       * @param job the configuration to modify
242       * @param val the new compression type (none, block, record)
243       */
244      static public void setDefaultCompressionType(Configuration job, 
245                                                   CompressionType val) {
246        job.set("io.seqfile.compression.type", val.toString());
247      }
248    
249      /**
250       * Create a new Writer with the given options.
251       * @param conf the configuration to use
252       * @param opts the options to create the file with
253       * @return a new Writer
254       * @throws IOException
255       */
256      public static Writer createWriter(Configuration conf, Writer.Option... opts
257                                        ) throws IOException {
258        Writer.CompressionOption compressionOption = 
259          Options.getOption(Writer.CompressionOption.class, opts);
260        CompressionType kind;
261        if (compressionOption != null) {
262          kind = compressionOption.getValue();
263        } else {
264          kind = getDefaultCompressionType(conf);
265          opts = Options.prependOptions(opts, Writer.compression(kind));
266        }
267        switch (kind) {
268          default:
269          case NONE:
270            return new Writer(conf, opts);
271          case RECORD:
272            return new RecordCompressWriter(conf, opts);
273          case BLOCK:
274            return new BlockCompressWriter(conf, opts);
275        }
276      }
277    
278      /**
279       * Construct the preferred type of SequenceFile Writer.
280       * @param fs The configured filesystem. 
281       * @param conf The configuration.
282       * @param name The name of the file. 
283       * @param keyClass The 'key' type.
284       * @param valClass The 'value' type.
285       * @return Returns the handle to the constructed SequenceFile Writer.
286       * @throws IOException
287       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
288       *     instead.
289       */
290      @Deprecated
291      public static Writer 
292        createWriter(FileSystem fs, Configuration conf, Path name, 
293                     Class keyClass, Class valClass) throws IOException {
294        return createWriter(conf, Writer.filesystem(fs),
295                            Writer.file(name), Writer.keyClass(keyClass),
296                            Writer.valueClass(valClass));
297      }
298      
299      /**
300       * Construct the preferred type of SequenceFile Writer.
301       * @param fs The configured filesystem. 
302       * @param conf The configuration.
303       * @param name The name of the file. 
304       * @param keyClass The 'key' type.
305       * @param valClass The 'value' type.
306       * @param compressionType The compression type.
307       * @return Returns the handle to the constructed SequenceFile Writer.
308       * @throws IOException
309       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
310       *     instead.
311       */
312      @Deprecated
313      public static Writer 
314        createWriter(FileSystem fs, Configuration conf, Path name, 
315                     Class keyClass, Class valClass, 
316                     CompressionType compressionType) throws IOException {
317        return createWriter(conf, Writer.filesystem(fs),
318                            Writer.file(name), Writer.keyClass(keyClass),
319                            Writer.valueClass(valClass), 
320                            Writer.compression(compressionType));
321      }
322      
323      /**
324       * Construct the preferred type of SequenceFile Writer.
325       * @param fs The configured filesystem. 
326       * @param conf The configuration.
327       * @param name The name of the file. 
328       * @param keyClass The 'key' type.
329       * @param valClass The 'value' type.
330       * @param compressionType The compression type.
331       * @param progress The Progressable object to track progress.
332       * @return Returns the handle to the constructed SequenceFile Writer.
333       * @throws IOException
334       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
335       *     instead.
336       */
337      @Deprecated
338      public static Writer
339        createWriter(FileSystem fs, Configuration conf, Path name, 
340                     Class keyClass, Class valClass, CompressionType compressionType,
341                     Progressable progress) throws IOException {
342        return createWriter(conf, Writer.file(name),
343                            Writer.filesystem(fs),
344                            Writer.keyClass(keyClass),
345                            Writer.valueClass(valClass), 
346                            Writer.compression(compressionType),
347                            Writer.progressable(progress));
348      }
349    
350      /**
351       * Construct the preferred type of SequenceFile Writer.
352       * @param fs The configured filesystem. 
353       * @param conf The configuration.
354       * @param name The name of the file. 
355       * @param keyClass The 'key' type.
356       * @param valClass The 'value' type.
357       * @param compressionType The compression type.
358       * @param codec The compression codec.
359       * @return Returns the handle to the constructed SequenceFile Writer.
360       * @throws IOException
361       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
362       *     instead.
363       */
364      @Deprecated
365      public static Writer 
366        createWriter(FileSystem fs, Configuration conf, Path name, 
367                     Class keyClass, Class valClass, CompressionType compressionType, 
368                     CompressionCodec codec) throws IOException {
369        return createWriter(conf, Writer.file(name),
370                            Writer.filesystem(fs),
371                            Writer.keyClass(keyClass),
372                            Writer.valueClass(valClass), 
373                            Writer.compression(compressionType, codec));
374      }
375      
376      /**
377       * Construct the preferred type of SequenceFile Writer.
378       * @param fs The configured filesystem. 
379       * @param conf The configuration.
380       * @param name The name of the file. 
381       * @param keyClass The 'key' type.
382       * @param valClass The 'value' type.
383       * @param compressionType The compression type.
384       * @param codec The compression codec.
385       * @param progress The Progressable object to track progress.
386       * @param metadata The metadata of the file.
387       * @return Returns the handle to the constructed SequenceFile Writer.
388       * @throws IOException
389       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
390       *     instead.
391       */
392      @Deprecated
393      public static Writer
394        createWriter(FileSystem fs, Configuration conf, Path name, 
395                     Class keyClass, Class valClass, 
396                     CompressionType compressionType, CompressionCodec codec,
397                     Progressable progress, Metadata metadata) throws IOException {
398        return createWriter(conf, Writer.file(name),
399                            Writer.filesystem(fs),
400                            Writer.keyClass(keyClass),
401                            Writer.valueClass(valClass),
402                            Writer.compression(compressionType, codec),
403                            Writer.progressable(progress),
404                            Writer.metadata(metadata));
405      }
406    
407      /**
408       * Construct the preferred type of SequenceFile Writer.
409       * @param fs The configured filesystem.
410       * @param conf The configuration.
411       * @param name The name of the file.
412       * @param keyClass The 'key' type.
413       * @param valClass The 'value' type.
414       * @param bufferSize buffer size for the underlaying outputstream.
415       * @param replication replication factor for the file.
416       * @param blockSize block size for the file.
417       * @param compressionType The compression type.
418       * @param codec The compression codec.
419       * @param progress The Progressable object to track progress.
420       * @param metadata The metadata of the file.
421       * @return Returns the handle to the constructed SequenceFile Writer.
422       * @throws IOException
423       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
424       *     instead.
425       */
426      @Deprecated
427      public static Writer
428        createWriter(FileSystem fs, Configuration conf, Path name,
429                     Class keyClass, Class valClass, int bufferSize,
430                     short replication, long blockSize,
431                     CompressionType compressionType, CompressionCodec codec,
432                     Progressable progress, Metadata metadata) throws IOException {
433        return createWriter(conf, Writer.file(name),
434                            Writer.filesystem(fs),
435                            Writer.keyClass(keyClass),
436                            Writer.valueClass(valClass), 
437                            Writer.bufferSize(bufferSize), 
438                            Writer.replication(replication),
439                            Writer.blockSize(blockSize),
440                            Writer.compression(compressionType, codec),
441                            Writer.progressable(progress),
442                            Writer.metadata(metadata));
443      }
444    
445      /**
446       * Construct the preferred type of SequenceFile Writer.
447       * @param fs The configured filesystem.
448       * @param conf The configuration.
449       * @param name The name of the file.
450       * @param keyClass The 'key' type.
451       * @param valClass The 'value' type.
452       * @param bufferSize buffer size for the underlaying outputstream.
453       * @param replication replication factor for the file.
454       * @param blockSize block size for the file.
455       * @param createParent create parent directory if non-existent
456       * @param compressionType The compression type.
457       * @param codec The compression codec.
458       * @param metadata The metadata of the file.
459       * @return Returns the handle to the constructed SequenceFile Writer.
460       * @throws IOException
461       */
462      @Deprecated
463      public static Writer
464      createWriter(FileSystem fs, Configuration conf, Path name,
465                   Class keyClass, Class valClass, int bufferSize,
466                   short replication, long blockSize, boolean createParent,
467                   CompressionType compressionType, CompressionCodec codec,
468                   Metadata metadata) throws IOException {
469        return createWriter(FileContext.getFileContext(fs.getUri(), conf),
470            conf, name, keyClass, valClass, compressionType, codec,
471            metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
472            CreateOpts.bufferSize(bufferSize),
473            createParent ? CreateOpts.createParent()
474                         : CreateOpts.donotCreateParent(),
475            CreateOpts.repFac(replication),
476            CreateOpts.blockSize(blockSize)
477          );
478      }
479    
480      /**
481       * Construct the preferred type of SequenceFile Writer.
482       * @param fc The context for the specified file.
483       * @param conf The configuration.
484       * @param name The name of the file.
485       * @param keyClass The 'key' type.
486       * @param valClass The 'value' type.
487       * @param compressionType The compression type.
488       * @param codec The compression codec.
489       * @param metadata The metadata of the file.
490       * @param createFlag gives the semantics of create: overwrite, append etc.
491       * @param opts file creation options; see {@link CreateOpts}.
492       * @return Returns the handle to the constructed SequenceFile Writer.
493       * @throws IOException
494       */
495      public static Writer
496      createWriter(FileContext fc, Configuration conf, Path name,
497                   Class keyClass, Class valClass,
498                   CompressionType compressionType, CompressionCodec codec,
499                   Metadata metadata,
500                   final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
501                   throws IOException {
502        return createWriter(conf, fc.create(name, createFlag, opts),
503              keyClass, valClass, compressionType, codec, metadata).ownStream();
504      }
505    
506      /**
507       * Construct the preferred type of SequenceFile Writer.
508       * @param fs The configured filesystem. 
509       * @param conf The configuration.
510       * @param name The name of the file. 
511       * @param keyClass The 'key' type.
512       * @param valClass The 'value' type.
513       * @param compressionType The compression type.
514       * @param codec The compression codec.
515       * @param progress The Progressable object to track progress.
516       * @return Returns the handle to the constructed SequenceFile Writer.
517       * @throws IOException
518       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
519       *     instead.
520       */
521      @Deprecated
522      public static Writer
523        createWriter(FileSystem fs, Configuration conf, Path name, 
524                     Class keyClass, Class valClass, 
525                     CompressionType compressionType, CompressionCodec codec,
526                     Progressable progress) throws IOException {
527        return createWriter(conf, Writer.file(name),
528                            Writer.filesystem(fs),
529                            Writer.keyClass(keyClass),
530                            Writer.valueClass(valClass),
531                            Writer.compression(compressionType, codec),
532                            Writer.progressable(progress));
533      }
534    
535      /**
536       * Construct the preferred type of 'raw' SequenceFile Writer.
537       * @param conf The configuration.
538       * @param out The stream on top which the writer is to be constructed.
539       * @param keyClass The 'key' type.
540       * @param valClass The 'value' type.
541       * @param compressionType The compression type.
542       * @param codec The compression codec.
543       * @param metadata The metadata of the file.
544       * @return Returns the handle to the constructed SequenceFile Writer.
545       * @throws IOException
546       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
547       *     instead.
548       */
549      @Deprecated
550      public static Writer
551        createWriter(Configuration conf, FSDataOutputStream out, 
552                     Class keyClass, Class valClass,
553                     CompressionType compressionType,
554                     CompressionCodec codec, Metadata metadata) throws IOException {
555        return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
556                            Writer.valueClass(valClass), 
557                            Writer.compression(compressionType, codec),
558                            Writer.metadata(metadata));
559      }
560      
561      /**
562       * Construct the preferred type of 'raw' SequenceFile Writer.
563       * @param conf The configuration.
564       * @param out The stream on top which the writer is to be constructed.
565       * @param keyClass The 'key' type.
566       * @param valClass The 'value' type.
567       * @param compressionType The compression type.
568       * @param codec The compression codec.
569       * @return Returns the handle to the constructed SequenceFile Writer.
570       * @throws IOException
571       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
572       *     instead.
573       */
574      @Deprecated
575      public static Writer
576        createWriter(Configuration conf, FSDataOutputStream out, 
577                     Class keyClass, Class valClass, CompressionType compressionType,
578                     CompressionCodec codec) throws IOException {
579        return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
580                            Writer.valueClass(valClass),
581                            Writer.compression(compressionType, codec));
582      }
583      
584    
585      /** The interface to 'raw' values of SequenceFiles. */
586      public static interface ValueBytes {
587    
588        /** Writes the uncompressed bytes to the outStream.
589         * @param outStream : Stream to write uncompressed bytes into.
590         * @throws IOException
591         */
592        public void writeUncompressedBytes(DataOutputStream outStream)
593          throws IOException;
594    
595        /** Write compressed bytes to outStream. 
596         * Note: that it will NOT compress the bytes if they are not compressed.
597         * @param outStream : Stream to write compressed bytes into.
598         */
599        public void writeCompressedBytes(DataOutputStream outStream) 
600          throws IllegalArgumentException, IOException;
601    
602        /**
603         * Size of stored data.
604         */
605        public int getSize();
606      }
607      
608      private static class UncompressedBytes implements ValueBytes {
609        private int dataSize;
610        private byte[] data;
611        
612        private UncompressedBytes() {
613          data = null;
614          dataSize = 0;
615        }
616        
617        private void reset(DataInputStream in, int length) throws IOException {
618          if (data == null) {
619            data = new byte[length];
620          } else if (length > data.length) {
621            data = new byte[Math.max(length, data.length * 2)];
622          }
623          dataSize = -1;
624          in.readFully(data, 0, length);
625          dataSize = length;
626        }
627        
628        public int getSize() {
629          return dataSize;
630        }
631        
632        public void writeUncompressedBytes(DataOutputStream outStream)
633          throws IOException {
634          outStream.write(data, 0, dataSize);
635        }
636    
637        public void writeCompressedBytes(DataOutputStream outStream) 
638          throws IllegalArgumentException, IOException {
639          throw 
640            new IllegalArgumentException("UncompressedBytes cannot be compressed!");
641        }
642    
643      } // UncompressedBytes
644      
645      private static class CompressedBytes implements ValueBytes {
646        private int dataSize;
647        private byte[] data;
648        DataInputBuffer rawData = null;
649        CompressionCodec codec = null;
650        CompressionInputStream decompressedStream = null;
651    
652        private CompressedBytes(CompressionCodec codec) {
653          data = null;
654          dataSize = 0;
655          this.codec = codec;
656        }
657    
658        private void reset(DataInputStream in, int length) throws IOException {
659          if (data == null) {
660            data = new byte[length];
661          } else if (length > data.length) {
662            data = new byte[Math.max(length, data.length * 2)];
663          } 
664          dataSize = -1;
665          in.readFully(data, 0, length);
666          dataSize = length;
667        }
668        
669        public int getSize() {
670          return dataSize;
671        }
672        
673        public void writeUncompressedBytes(DataOutputStream outStream)
674          throws IOException {
675          if (decompressedStream == null) {
676            rawData = new DataInputBuffer();
677            decompressedStream = codec.createInputStream(rawData);
678          } else {
679            decompressedStream.resetState();
680          }
681          rawData.reset(data, 0, dataSize);
682    
683          byte[] buffer = new byte[8192];
684          int bytesRead = 0;
685          while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
686            outStream.write(buffer, 0, bytesRead);
687          }
688        }
689    
690        public void writeCompressedBytes(DataOutputStream outStream) 
691          throws IllegalArgumentException, IOException {
692          outStream.write(data, 0, dataSize);
693        }
694    
695      } // CompressedBytes
696      
697      /**
698       * The class encapsulating with the metadata of a file.
699       * The metadata of a file is a list of attribute name/value
700       * pairs of Text type.
701       *
702       */
703      public static class Metadata implements Writable {
704    
705        private TreeMap<Text, Text> theMetadata;
706        
707        public Metadata() {
708          this(new TreeMap<Text, Text>());
709        }
710        
711        public Metadata(TreeMap<Text, Text> arg) {
712          if (arg == null) {
713            this.theMetadata = new TreeMap<Text, Text>();
714          } else {
715            this.theMetadata = arg;
716          }
717        }
718        
719        public Text get(Text name) {
720          return this.theMetadata.get(name);
721        }
722        
723        public void set(Text name, Text value) {
724          this.theMetadata.put(name, value);
725        }
726        
727        public TreeMap<Text, Text> getMetadata() {
728          return new TreeMap<Text, Text>(this.theMetadata);
729        }
730        
731        public void write(DataOutput out) throws IOException {
732          out.writeInt(this.theMetadata.size());
733          Iterator<Map.Entry<Text, Text>> iter =
734            this.theMetadata.entrySet().iterator();
735          while (iter.hasNext()) {
736            Map.Entry<Text, Text> en = iter.next();
737            en.getKey().write(out);
738            en.getValue().write(out);
739          }
740        }
741    
742        public void readFields(DataInput in) throws IOException {
743          int sz = in.readInt();
744          if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
745          this.theMetadata = new TreeMap<Text, Text>();
746          for (int i = 0; i < sz; i++) {
747            Text key = new Text();
748            Text val = new Text();
749            key.readFields(in);
750            val.readFields(in);
751            this.theMetadata.put(key, val);
752          }    
753        }
754    
755        public boolean equals(Object other) {
756          if (other == null) {
757            return false;
758          }
759          if (other.getClass() != this.getClass()) {
760            return false;
761          } else {
762            return equals((Metadata)other);
763          }
764        }
765        
766        public boolean equals(Metadata other) {
767          if (other == null) return false;
768          if (this.theMetadata.size() != other.theMetadata.size()) {
769            return false;
770          }
771          Iterator<Map.Entry<Text, Text>> iter1 =
772            this.theMetadata.entrySet().iterator();
773          Iterator<Map.Entry<Text, Text>> iter2 =
774            other.theMetadata.entrySet().iterator();
775          while (iter1.hasNext() && iter2.hasNext()) {
776            Map.Entry<Text, Text> en1 = iter1.next();
777            Map.Entry<Text, Text> en2 = iter2.next();
778            if (!en1.getKey().equals(en2.getKey())) {
779              return false;
780            }
781            if (!en1.getValue().equals(en2.getValue())) {
782              return false;
783            }
784          }
785          if (iter1.hasNext() || iter2.hasNext()) {
786            return false;
787          }
788          return true;
789        }
790    
791        public int hashCode() {
792          assert false : "hashCode not designed";
793          return 42; // any arbitrary constant will do 
794        }
795        
796        public String toString() {
797          StringBuilder sb = new StringBuilder();
798          sb.append("size: ").append(this.theMetadata.size()).append("\n");
799          Iterator<Map.Entry<Text, Text>> iter =
800            this.theMetadata.entrySet().iterator();
801          while (iter.hasNext()) {
802            Map.Entry<Text, Text> en = iter.next();
803            sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
804            sb.append("\n");
805          }
806          return sb.toString();
807        }
808      }
809      
810      /** Write key/value pairs to a sequence-format file. */
811      public static class Writer implements java.io.Closeable, Syncable {
812        private Configuration conf;
813        FSDataOutputStream out;
814        boolean ownOutputStream = true;
815        DataOutputBuffer buffer = new DataOutputBuffer();
816    
817        Class keyClass;
818        Class valClass;
819    
820        private final CompressionType compress;
821        CompressionCodec codec = null;
822        CompressionOutputStream deflateFilter = null;
823        DataOutputStream deflateOut = null;
824        Metadata metadata = null;
825        Compressor compressor = null;
826        
827        protected Serializer keySerializer;
828        protected Serializer uncompressedValSerializer;
829        protected Serializer compressedValSerializer;
830        
831        // Insert a globally unique 16-byte value every few entries, so that one
832        // can seek into the middle of a file and then synchronize with record
833        // starts and ends by scanning for this value.
834        long lastSyncPos;                     // position of last sync
835        byte[] sync;                          // 16 random bytes
836        {
837          try {                                       
838            MessageDigest digester = MessageDigest.getInstance("MD5");
839            long time = Time.now();
840            digester.update((new UID()+"@"+time).getBytes());
841            sync = digester.digest();
842          } catch (Exception e) {
843            throw new RuntimeException(e);
844          }
845        }
846    
847        public static interface Option {}
848        
849        static class FileOption extends Options.PathOption 
850                                        implements Option {
851          FileOption(Path path) {
852            super(path);
853          }
854        }
855    
856        /**
857         * @deprecated only used for backwards-compatibility in the createWriter methods
858         * that take FileSystem.
859         */
860        @Deprecated
861        private static class FileSystemOption implements Option {
862          private final FileSystem value;
863          protected FileSystemOption(FileSystem value) {
864            this.value = value;
865          }
866          public FileSystem getValue() {
867            return value;
868          }
869        }
870    
871        static class StreamOption extends Options.FSDataOutputStreamOption 
872                                  implements Option {
873          StreamOption(FSDataOutputStream stream) {
874            super(stream);
875          }
876        }
877    
878        static class BufferSizeOption extends Options.IntegerOption
879                                      implements Option {
880          BufferSizeOption(int value) {
881            super(value);
882          }
883        }
884        
885        static class BlockSizeOption extends Options.LongOption implements Option {
886          BlockSizeOption(long value) {
887            super(value);
888          }
889        }
890    
891        static class ReplicationOption extends Options.IntegerOption
892                                       implements Option {
893          ReplicationOption(int value) {
894            super(value);
895          }
896        }
897    
898        static class KeyClassOption extends Options.ClassOption implements Option {
899          KeyClassOption(Class<?> value) {
900            super(value);
901          }
902        }
903    
904        static class ValueClassOption extends Options.ClassOption
905                                              implements Option {
906          ValueClassOption(Class<?> value) {
907            super(value);
908          }
909        }
910    
911        static class MetadataOption implements Option {
912          private final Metadata value;
913          MetadataOption(Metadata value) {
914            this.value = value;
915          }
916          Metadata getValue() {
917            return value;
918          }
919        }
920    
921        static class ProgressableOption extends Options.ProgressableOption
922                                        implements Option {
923          ProgressableOption(Progressable value) {
924            super(value);
925          }
926        }
927    
928        private static class CompressionOption implements Option {
929          private final CompressionType value;
930          private final CompressionCodec codec;
931          CompressionOption(CompressionType value) {
932            this(value, null);
933          }
934          CompressionOption(CompressionType value, CompressionCodec codec) {
935            this.value = value;
936            this.codec = (CompressionType.NONE != value && null == codec)
937              ? new DefaultCodec()
938              : codec;
939          }
940          CompressionType getValue() {
941            return value;
942          }
943          CompressionCodec getCodec() {
944            return codec;
945          }
946        }
947        
948        public static Option file(Path value) {
949          return new FileOption(value);
950        }
951    
952        /**
953         * @deprecated only used for backwards-compatibility in the createWriter methods
954         * that take FileSystem.
955         */
956        @Deprecated
957        private static Option filesystem(FileSystem fs) {
958          return new SequenceFile.Writer.FileSystemOption(fs);
959        }
960        
961        public static Option bufferSize(int value) {
962          return new BufferSizeOption(value);
963        }
964        
965        public static Option stream(FSDataOutputStream value) {
966          return new StreamOption(value);
967        }
968        
969        public static Option replication(short value) {
970          return new ReplicationOption(value);
971        }
972        
973        public static Option blockSize(long value) {
974          return new BlockSizeOption(value);
975        }
976        
977        public static Option progressable(Progressable value) {
978          return new ProgressableOption(value);
979        }
980    
981        public static Option keyClass(Class<?> value) {
982          return new KeyClassOption(value);
983        }
984        
985        public static Option valueClass(Class<?> value) {
986          return new ValueClassOption(value);
987        }
988        
989        public static Option metadata(Metadata value) {
990          return new MetadataOption(value);
991        }
992    
993        public static Option compression(CompressionType value) {
994          return new CompressionOption(value);
995        }
996    
997        public static Option compression(CompressionType value,
998            CompressionCodec codec) {
999          return new CompressionOption(value, codec);
1000        }
1001        
1002        /**
1003         * Construct a uncompressed writer from a set of options.
1004         * @param conf the configuration to use
1005         * @param options the options used when creating the writer
1006         * @throws IOException if it fails
1007         */
1008        Writer(Configuration conf, 
1009               Option... opts) throws IOException {
1010          BlockSizeOption blockSizeOption = 
1011            Options.getOption(BlockSizeOption.class, opts);
1012          BufferSizeOption bufferSizeOption = 
1013            Options.getOption(BufferSizeOption.class, opts);
1014          ReplicationOption replicationOption = 
1015            Options.getOption(ReplicationOption.class, opts);
1016          ProgressableOption progressOption = 
1017            Options.getOption(ProgressableOption.class, opts);
1018          FileOption fileOption = Options.getOption(FileOption.class, opts);
1019          FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
1020          StreamOption streamOption = Options.getOption(StreamOption.class, opts);
1021          KeyClassOption keyClassOption = 
1022            Options.getOption(KeyClassOption.class, opts);
1023          ValueClassOption valueClassOption = 
1024            Options.getOption(ValueClassOption.class, opts);
1025          MetadataOption metadataOption = 
1026            Options.getOption(MetadataOption.class, opts);
1027          CompressionOption compressionTypeOption =
1028            Options.getOption(CompressionOption.class, opts);
1029          // check consistency of options
1030          if ((fileOption == null) == (streamOption == null)) {
1031            throw new IllegalArgumentException("file or stream must be specified");
1032          }
1033          if (fileOption == null && (blockSizeOption != null ||
1034                                     bufferSizeOption != null ||
1035                                     replicationOption != null ||
1036                                     progressOption != null)) {
1037            throw new IllegalArgumentException("file modifier options not " +
1038                                               "compatible with stream");
1039          }
1040    
1041          FSDataOutputStream out;
1042          boolean ownStream = fileOption != null;
1043          if (ownStream) {
1044            Path p = fileOption.getValue();
1045            FileSystem fs;
1046            if (fsOption != null) {
1047              fs = fsOption.getValue();
1048            } else {
1049              fs = p.getFileSystem(conf);
1050            }
1051            int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
1052              bufferSizeOption.getValue();
1053            short replication = replicationOption == null ? 
1054              fs.getDefaultReplication(p) :
1055              (short) replicationOption.getValue();
1056            long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
1057              blockSizeOption.getValue();
1058            Progressable progress = progressOption == null ? null :
1059              progressOption.getValue();
1060            out = fs.create(p, true, bufferSize, replication, blockSize, progress);
1061          } else {
1062            out = streamOption.getValue();
1063          }
1064          Class<?> keyClass = keyClassOption == null ?
1065              Object.class : keyClassOption.getValue();
1066          Class<?> valueClass = valueClassOption == null ?
1067              Object.class : valueClassOption.getValue();
1068          Metadata metadata = metadataOption == null ?
1069              new Metadata() : metadataOption.getValue();
1070          this.compress = compressionTypeOption.getValue();
1071          final CompressionCodec codec = compressionTypeOption.getCodec();
1072          if (codec != null &&
1073              (codec instanceof GzipCodec) &&
1074              !NativeCodeLoader.isNativeCodeLoaded() &&
1075              !ZlibFactory.isNativeZlibLoaded(conf)) {
1076            throw new IllegalArgumentException("SequenceFile doesn't work with " +
1077                                               "GzipCodec without native-hadoop " +
1078                                               "code!");
1079          }
1080          init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
1081        }
1082    
1083        /** Create the named file.
1084         * @deprecated Use 
1085         *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1086         *   instead.
1087         */
1088        @Deprecated
1089        public Writer(FileSystem fs, Configuration conf, Path name, 
1090                      Class keyClass, Class valClass) throws IOException {
1091          this.compress = CompressionType.NONE;
1092          init(conf, fs.create(name), true, keyClass, valClass, null, 
1093               new Metadata());
1094        }
1095        
1096        /** Create the named file with write-progress reporter.
1097         * @deprecated Use 
1098         *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1099         *   instead.
1100         */
1101        @Deprecated
1102        public Writer(FileSystem fs, Configuration conf, Path name, 
1103                      Class keyClass, Class valClass,
1104                      Progressable progress, Metadata metadata) throws IOException {
1105          this.compress = CompressionType.NONE;
1106          init(conf, fs.create(name, progress), true, keyClass, valClass,
1107               null, metadata);
1108        }
1109        
1110        /** Create the named file with write-progress reporter. 
1111         * @deprecated Use 
1112         *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1113         *   instead.
1114         */
1115        @Deprecated
1116        public Writer(FileSystem fs, Configuration conf, Path name,
1117                      Class keyClass, Class valClass,
1118                      int bufferSize, short replication, long blockSize,
1119                      Progressable progress, Metadata metadata) throws IOException {
1120          this.compress = CompressionType.NONE;
1121          init(conf,
1122               fs.create(name, true, bufferSize, replication, blockSize, progress),
1123               true, keyClass, valClass, null, metadata);
1124        }
1125    
1126        boolean isCompressed() { return compress != CompressionType.NONE; }
1127        boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
1128        
1129        Writer ownStream() { this.ownOutputStream = true; return this;  }
1130    
1131        /** Write and flush the file header. */
1132        private void writeFileHeader() 
1133          throws IOException {
1134          out.write(VERSION);
1135          Text.writeString(out, keyClass.getName());
1136          Text.writeString(out, valClass.getName());
1137          
1138          out.writeBoolean(this.isCompressed());
1139          out.writeBoolean(this.isBlockCompressed());
1140          
1141          if (this.isCompressed()) {
1142            Text.writeString(out, (codec.getClass()).getName());
1143          }
1144          this.metadata.write(out);
1145          out.write(sync);                       // write the sync bytes
1146          out.flush();                           // flush header
1147        }
1148        
1149        /** Initialize. */
1150        @SuppressWarnings("unchecked")
1151        void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
1152                  Class keyClass, Class valClass,
1153                  CompressionCodec codec, Metadata metadata) 
1154          throws IOException {
1155          this.conf = conf;
1156          this.out = out;
1157          this.ownOutputStream = ownStream;
1158          this.keyClass = keyClass;
1159          this.valClass = valClass;
1160          this.codec = codec;
1161          this.metadata = metadata;
1162          SerializationFactory serializationFactory = new SerializationFactory(conf);
1163          this.keySerializer = serializationFactory.getSerializer(keyClass);
1164          this.keySerializer.open(buffer);
1165          this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
1166          this.uncompressedValSerializer.open(buffer);
1167          if (this.codec != null) {
1168            ReflectionUtils.setConf(this.codec, this.conf);
1169            this.compressor = CodecPool.getCompressor(this.codec);
1170            this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
1171            this.deflateOut = 
1172              new DataOutputStream(new BufferedOutputStream(deflateFilter));
1173            this.compressedValSerializer = serializationFactory.getSerializer(valClass);
1174            this.compressedValSerializer.open(deflateOut);
1175          }
1176          writeFileHeader();
1177        }
1178        
1179        /** Returns the class of keys in this file. */
1180        public Class getKeyClass() { return keyClass; }
1181    
1182        /** Returns the class of values in this file. */
1183        public Class getValueClass() { return valClass; }
1184    
1185        /** Returns the compression codec of data in this file. */
1186        public CompressionCodec getCompressionCodec() { return codec; }
1187        
1188        /** create a sync point */
1189        public void sync() throws IOException {
1190          if (sync != null && lastSyncPos != out.getPos()) {
1191            out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
1192            out.write(sync);                          // write sync
1193            lastSyncPos = out.getPos();               // update lastSyncPos
1194          }
1195        }
1196    
1197        /**
1198         * flush all currently written data to the file system
1199         * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
1200         */
1201        @Deprecated
1202        public void syncFs() throws IOException {
1203          if (out != null) {
1204            out.sync();                               // flush contents to file system
1205          }
1206        }
1207    
1208        @Override
1209        public void hsync() throws IOException {
1210          if (out != null) {
1211            out.hsync();
1212          }
1213        }
1214    
1215        @Override
1216        public void hflush() throws IOException {
1217          if (out != null) {
1218            out.hflush();
1219          }
1220        }
1221        
1222        /** Returns the configuration of this file. */
1223        Configuration getConf() { return conf; }
1224        
1225        /** Close the file. */
1226        public synchronized void close() throws IOException {
1227          keySerializer.close();
1228          uncompressedValSerializer.close();
1229          if (compressedValSerializer != null) {
1230            compressedValSerializer.close();
1231          }
1232    
1233          CodecPool.returnCompressor(compressor);
1234          compressor = null;
1235          
1236          if (out != null) {
1237            
1238            // Close the underlying stream iff we own it...
1239            if (ownOutputStream) {
1240              out.close();
1241            } else {
1242              out.flush();
1243            }
1244            out = null;
1245          }
1246        }
1247    
1248        synchronized void checkAndWriteSync() throws IOException {
1249          if (sync != null &&
1250              out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
1251            sync();
1252          }
1253        }
1254    
1255        /** Append a key/value pair. */
1256        public void append(Writable key, Writable val)
1257          throws IOException {
1258          append((Object) key, (Object) val);
1259        }
1260    
1261        /** Append a key/value pair. */
1262        @SuppressWarnings("unchecked")
1263        public synchronized void append(Object key, Object val)
1264          throws IOException {
1265          if (key.getClass() != keyClass)
1266            throw new IOException("wrong key class: "+key.getClass().getName()
1267                                  +" is not "+keyClass);
1268          if (val.getClass() != valClass)
1269            throw new IOException("wrong value class: "+val.getClass().getName()
1270                                  +" is not "+valClass);
1271    
1272          buffer.reset();
1273    
1274          // Append the 'key'
1275          keySerializer.serialize(key);
1276          int keyLength = buffer.getLength();
1277          if (keyLength < 0)
1278            throw new IOException("negative length keys not allowed: " + key);
1279    
1280          // Append the 'value'
1281          if (compress == CompressionType.RECORD) {
1282            deflateFilter.resetState();
1283            compressedValSerializer.serialize(val);
1284            deflateOut.flush();
1285            deflateFilter.finish();
1286          } else {
1287            uncompressedValSerializer.serialize(val);
1288          }
1289    
1290          // Write the record out
1291          checkAndWriteSync();                                // sync
1292          out.writeInt(buffer.getLength());                   // total record length
1293          out.writeInt(keyLength);                            // key portion length
1294          out.write(buffer.getData(), 0, buffer.getLength()); // data
1295        }
1296    
1297        public synchronized void appendRaw(byte[] keyData, int keyOffset,
1298            int keyLength, ValueBytes val) throws IOException {
1299          if (keyLength < 0)
1300            throw new IOException("negative length keys not allowed: " + keyLength);
1301    
1302          int valLength = val.getSize();
1303    
1304          checkAndWriteSync();
1305          
1306          out.writeInt(keyLength+valLength);          // total record length
1307          out.writeInt(keyLength);                    // key portion length
1308          out.write(keyData, keyOffset, keyLength);   // key
1309          val.writeUncompressedBytes(out);            // value
1310        }
1311    
1312        /** Returns the current length of the output file.
1313         *
1314         * <p>This always returns a synchronized position.  In other words,
1315         * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
1316         * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
1317         * the key may be earlier in the file than key last written when this
1318         * method was called (e.g., with block-compression, it may be the first key
1319         * in the block that was being written when this method was called).
1320         */
1321        public synchronized long getLength() throws IOException {
1322          return out.getPos();
1323        }
1324    
1325      } // class Writer
1326    
1327      /** Write key/compressed-value pairs to a sequence-format file. */
1328      static class RecordCompressWriter extends Writer {
1329        
1330        RecordCompressWriter(Configuration conf, 
1331                             Option... options) throws IOException {
1332          super(conf, options);
1333        }
1334    
1335        /** Append a key/value pair. */
1336        @SuppressWarnings("unchecked")
1337        public synchronized void append(Object key, Object val)
1338          throws IOException {
1339          if (key.getClass() != keyClass)
1340            throw new IOException("wrong key class: "+key.getClass().getName()
1341                                  +" is not "+keyClass);
1342          if (val.getClass() != valClass)
1343            throw new IOException("wrong value class: "+val.getClass().getName()
1344                                  +" is not "+valClass);
1345    
1346          buffer.reset();
1347    
1348          // Append the 'key'
1349          keySerializer.serialize(key);
1350          int keyLength = buffer.getLength();
1351          if (keyLength < 0)
1352            throw new IOException("negative length keys not allowed: " + key);
1353    
1354          // Compress 'value' and append it
1355          deflateFilter.resetState();
1356          compressedValSerializer.serialize(val);
1357          deflateOut.flush();
1358          deflateFilter.finish();
1359    
1360          // Write the record out
1361          checkAndWriteSync();                                // sync
1362          out.writeInt(buffer.getLength());                   // total record length
1363          out.writeInt(keyLength);                            // key portion length
1364          out.write(buffer.getData(), 0, buffer.getLength()); // data
1365        }
1366    
1367        /** Append a key/value pair. */
1368        public synchronized void appendRaw(byte[] keyData, int keyOffset,
1369            int keyLength, ValueBytes val) throws IOException {
1370    
1371          if (keyLength < 0)
1372            throw new IOException("negative length keys not allowed: " + keyLength);
1373    
1374          int valLength = val.getSize();
1375          
1376          checkAndWriteSync();                        // sync
1377          out.writeInt(keyLength+valLength);          // total record length
1378          out.writeInt(keyLength);                    // key portion length
1379          out.write(keyData, keyOffset, keyLength);   // 'key' data
1380          val.writeCompressedBytes(out);              // 'value' data
1381        }
1382        
1383      } // RecordCompressionWriter
1384    
1385      /** Write compressed key/value blocks to a sequence-format file. */
1386      static class BlockCompressWriter extends Writer {
1387        
1388        private int noBufferedRecords = 0;
1389        
1390        private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
1391        private DataOutputBuffer keyBuffer = new DataOutputBuffer();
1392    
1393        private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
1394        private DataOutputBuffer valBuffer = new DataOutputBuffer();
1395    
1396        private final int compressionBlockSize;
1397        
1398        BlockCompressWriter(Configuration conf,
1399                            Option... options) throws IOException {
1400          super(conf, options);
1401          compressionBlockSize = 
1402            conf.getInt("io.seqfile.compress.blocksize", 1000000);
1403          keySerializer.close();
1404          keySerializer.open(keyBuffer);
1405          uncompressedValSerializer.close();
1406          uncompressedValSerializer.open(valBuffer);
1407        }
1408    
1409        /** Workhorse to check and write out compressed data/lengths */
1410        private synchronized 
1411          void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
1412          throws IOException {
1413          deflateFilter.resetState();
1414          buffer.reset();
1415          deflateOut.write(uncompressedDataBuffer.getData(), 0, 
1416                           uncompressedDataBuffer.getLength());
1417          deflateOut.flush();
1418          deflateFilter.finish();
1419          
1420          WritableUtils.writeVInt(out, buffer.getLength());
1421          out.write(buffer.getData(), 0, buffer.getLength());
1422        }
1423        
1424        /** Compress and flush contents to dfs */
1425        public synchronized void sync() throws IOException {
1426          if (noBufferedRecords > 0) {
1427            super.sync();
1428            
1429            // No. of records
1430            WritableUtils.writeVInt(out, noBufferedRecords);
1431            
1432            // Write 'keys' and lengths
1433            writeBuffer(keyLenBuffer);
1434            writeBuffer(keyBuffer);
1435            
1436            // Write 'values' and lengths
1437            writeBuffer(valLenBuffer);
1438            writeBuffer(valBuffer);
1439            
1440            // Flush the file-stream
1441            out.flush();
1442            
1443            // Reset internal states
1444            keyLenBuffer.reset();
1445            keyBuffer.reset();
1446            valLenBuffer.reset();
1447            valBuffer.reset();
1448            noBufferedRecords = 0;
1449          }
1450          
1451        }
1452        
1453        /** Close the file. */
1454        public synchronized void close() throws IOException {
1455          if (out != null) {
1456            sync();
1457          }
1458          super.close();
1459        }
1460    
1461        /** Append a key/value pair. */
1462        @SuppressWarnings("unchecked")
1463        public synchronized void append(Object key, Object val)
1464          throws IOException {
1465          if (key.getClass() != keyClass)
1466            throw new IOException("wrong key class: "+key+" is not "+keyClass);
1467          if (val.getClass() != valClass)
1468            throw new IOException("wrong value class: "+val+" is not "+valClass);
1469    
1470          // Save key/value into respective buffers 
1471          int oldKeyLength = keyBuffer.getLength();
1472          keySerializer.serialize(key);
1473          int keyLength = keyBuffer.getLength() - oldKeyLength;
1474          if (keyLength < 0)
1475            throw new IOException("negative length keys not allowed: " + key);
1476          WritableUtils.writeVInt(keyLenBuffer, keyLength);
1477    
1478          int oldValLength = valBuffer.getLength();
1479          uncompressedValSerializer.serialize(val);
1480          int valLength = valBuffer.getLength() - oldValLength;
1481          WritableUtils.writeVInt(valLenBuffer, valLength);
1482          
1483          // Added another key/value pair
1484          ++noBufferedRecords;
1485          
1486          // Compress and flush?
1487          int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1488          if (currentBlockSize >= compressionBlockSize) {
1489            sync();
1490          }
1491        }
1492        
1493        /** Append a key/value pair. */
1494        public synchronized void appendRaw(byte[] keyData, int keyOffset,
1495            int keyLength, ValueBytes val) throws IOException {
1496          
1497          if (keyLength < 0)
1498            throw new IOException("negative length keys not allowed");
1499    
1500          int valLength = val.getSize();
1501          
1502          // Save key/value data in relevant buffers
1503          WritableUtils.writeVInt(keyLenBuffer, keyLength);
1504          keyBuffer.write(keyData, keyOffset, keyLength);
1505          WritableUtils.writeVInt(valLenBuffer, valLength);
1506          val.writeUncompressedBytes(valBuffer);
1507    
1508          // Added another key/value pair
1509          ++noBufferedRecords;
1510    
1511          // Compress and flush?
1512          int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
1513          if (currentBlockSize >= compressionBlockSize) {
1514            sync();
1515          }
1516        }
1517      
1518      } // BlockCompressionWriter
1519    
1520      /** Get the configured buffer size */
1521      private static int getBufferSize(Configuration conf) {
1522        return conf.getInt("io.file.buffer.size", 4096);
1523      }
1524    
1525      /** Reads key/value pairs from a sequence-format file. */
1526      public static class Reader implements java.io.Closeable {
1527        private String filename;
1528        private FSDataInputStream in;
1529        private DataOutputBuffer outBuf = new DataOutputBuffer();
1530    
1531        private byte version;
1532    
1533        private String keyClassName;
1534        private String valClassName;
1535        private Class keyClass;
1536        private Class valClass;
1537    
1538        private CompressionCodec codec = null;
1539        private Metadata metadata = null;
1540        
1541        private byte[] sync = new byte[SYNC_HASH_SIZE];
1542        private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
1543        private boolean syncSeen;
1544    
1545        private long headerEnd;
1546        private long end;
1547        private int keyLength;
1548        private int recordLength;
1549    
1550        private boolean decompress;
1551        private boolean blockCompressed;
1552        
1553        private Configuration conf;
1554    
1555        private int noBufferedRecords = 0;
1556        private boolean lazyDecompress = true;
1557        private boolean valuesDecompressed = true;
1558        
1559        private int noBufferedKeys = 0;
1560        private int noBufferedValues = 0;
1561        
1562        private DataInputBuffer keyLenBuffer = null;
1563        private CompressionInputStream keyLenInFilter = null;
1564        private DataInputStream keyLenIn = null;
1565        private Decompressor keyLenDecompressor = null;
1566        private DataInputBuffer keyBuffer = null;
1567        private CompressionInputStream keyInFilter = null;
1568        private DataInputStream keyIn = null;
1569        private Decompressor keyDecompressor = null;
1570    
1571        private DataInputBuffer valLenBuffer = null;
1572        private CompressionInputStream valLenInFilter = null;
1573        private DataInputStream valLenIn = null;
1574        private Decompressor valLenDecompressor = null;
1575        private DataInputBuffer valBuffer = null;
1576        private CompressionInputStream valInFilter = null;
1577        private DataInputStream valIn = null;
1578        private Decompressor valDecompressor = null;
1579        
1580        private Deserializer keyDeserializer;
1581        private Deserializer valDeserializer;
1582    
1583        /**
1584         * A tag interface for all of the Reader options
1585         */
1586        public static interface Option {}
1587        
1588        /**
1589         * Create an option to specify the path name of the sequence file.
1590         * @param value the path to read
1591         * @return a new option
1592         */
1593        public static Option file(Path value) {
1594          return new FileOption(value);
1595        }
1596        
1597        /**
1598         * Create an option to specify the stream with the sequence file.
1599         * @param value the stream to read.
1600         * @return a new option
1601         */
1602        public static Option stream(FSDataInputStream value) {
1603          return new InputStreamOption(value);
1604        }
1605        
1606        /**
1607         * Create an option to specify the starting byte to read.
1608         * @param value the number of bytes to skip over
1609         * @return a new option
1610         */
1611        public static Option start(long value) {
1612          return new StartOption(value);
1613        }
1614        
1615        /**
1616         * Create an option to specify the number of bytes to read.
1617         * @param value the number of bytes to read
1618         * @return a new option
1619         */
1620        public static Option length(long value) {
1621          return new LengthOption(value);
1622        }
1623        
1624        /**
1625         * Create an option with the buffer size for reading the given pathname.
1626         * @param value the number of bytes to buffer
1627         * @return a new option
1628         */
1629        public static Option bufferSize(int value) {
1630          return new BufferSizeOption(value);
1631        }
1632    
1633        private static class FileOption extends Options.PathOption 
1634                                        implements Option {
1635          private FileOption(Path value) {
1636            super(value);
1637          }
1638        }
1639        
1640        private static class InputStreamOption
1641            extends Options.FSDataInputStreamOption 
1642            implements Option {
1643          private InputStreamOption(FSDataInputStream value) {
1644            super(value);
1645          }
1646        }
1647    
1648        private static class StartOption extends Options.LongOption
1649                                         implements Option {
1650          private StartOption(long value) {
1651            super(value);
1652          }
1653        }
1654    
1655        private static class LengthOption extends Options.LongOption
1656                                          implements Option {
1657          private LengthOption(long value) {
1658            super(value);
1659          }
1660        }
1661    
1662        private static class BufferSizeOption extends Options.IntegerOption
1663                                          implements Option {
1664          private BufferSizeOption(int value) {
1665            super(value);
1666          }
1667        }
1668    
1669        // only used directly
1670        private static class OnlyHeaderOption extends Options.BooleanOption 
1671                                              implements Option {
1672          private OnlyHeaderOption() {
1673            super(true);
1674          }
1675        }
1676    
1677        public Reader(Configuration conf, Option... opts) throws IOException {
1678          // Look up the options, these are null if not set
1679          FileOption fileOpt = Options.getOption(FileOption.class, opts);
1680          InputStreamOption streamOpt = 
1681            Options.getOption(InputStreamOption.class, opts);
1682          StartOption startOpt = Options.getOption(StartOption.class, opts);
1683          LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
1684          BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
1685          OnlyHeaderOption headerOnly = 
1686            Options.getOption(OnlyHeaderOption.class, opts);
1687          // check for consistency
1688          if ((fileOpt == null) == (streamOpt == null)) {
1689            throw new 
1690              IllegalArgumentException("File or stream option must be specified");
1691          }
1692          if (fileOpt == null && bufOpt != null) {
1693            throw new IllegalArgumentException("buffer size can only be set when" +
1694                                               " a file is specified.");
1695          }
1696          // figure out the real values
1697          Path filename = null;
1698          FSDataInputStream file;
1699          final long len;
1700          if (fileOpt != null) {
1701            filename = fileOpt.getValue();
1702            FileSystem fs = filename.getFileSystem(conf);
1703            int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
1704            len = null == lenOpt
1705              ? fs.getFileStatus(filename).getLen()
1706              : lenOpt.getValue();
1707            file = openFile(fs, filename, bufSize, len);
1708          } else {
1709            len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
1710            file = streamOpt.getValue();
1711          }
1712          long start = startOpt == null ? 0 : startOpt.getValue();
1713          // really set up
1714          initialize(filename, file, start, len, conf, headerOnly != null);
1715        }
1716    
1717        /**
1718         * Construct a reader by opening a file from the given file system.
1719         * @param fs The file system used to open the file.
1720         * @param file The file being read.
1721         * @param conf Configuration
1722         * @throws IOException
1723         * @deprecated Use Reader(Configuration, Option...) instead.
1724         */
1725        @Deprecated
1726        public Reader(FileSystem fs, Path file, 
1727                      Configuration conf) throws IOException {
1728          this(conf, file(file.makeQualified(fs)));
1729        }
1730    
1731        /**
1732         * Construct a reader by the given input stream.
1733         * @param in An input stream.
1734         * @param buffersize unused
1735         * @param start The starting position.
1736         * @param length The length being read.
1737         * @param conf Configuration
1738         * @throws IOException
1739         * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1740         */
1741        @Deprecated
1742        public Reader(FSDataInputStream in, int buffersize,
1743            long start, long length, Configuration conf) throws IOException {
1744          this(conf, stream(in), start(start), length(length));
1745        }
1746    
1747        /** Common work of the constructors. */
1748        private void initialize(Path filename, FSDataInputStream in,
1749                                long start, long length, Configuration conf,
1750                                boolean tempReader) throws IOException {
1751          if (in == null) {
1752            throw new IllegalArgumentException("in == null");
1753          }
1754          this.filename = filename == null ? "<unknown>" : filename.toString();
1755          this.in = in;
1756          this.conf = conf;
1757          boolean succeeded = false;
1758          try {
1759            seek(start);
1760            this.end = this.in.getPos() + length;
1761            // if it wrapped around, use the max
1762            if (end < length) {
1763              end = Long.MAX_VALUE;
1764            }
1765            init(tempReader);
1766            succeeded = true;
1767          } finally {
1768            if (!succeeded) {
1769              IOUtils.cleanup(LOG, this.in);
1770            }
1771          }
1772        }
1773    
1774        /**
1775         * Override this method to specialize the type of
1776         * {@link FSDataInputStream} returned.
1777         * @param fs The file system used to open the file.
1778         * @param file The file being read.
1779         * @param bufferSize The buffer size used to read the file.
1780         * @param length The length being read if it is >= 0.  Otherwise,
1781         *               the length is not available.
1782         * @return The opened stream.
1783         * @throws IOException
1784         */
1785        protected FSDataInputStream openFile(FileSystem fs, Path file,
1786            int bufferSize, long length) throws IOException {
1787          return fs.open(file, bufferSize);
1788        }
1789        
1790        /**
1791         * Initialize the {@link Reader}
1792         * @param tmpReader <code>true</code> if we are constructing a temporary
1793         *                  reader {@link SequenceFile.Sorter.cloneFileAttributes}, 
1794         *                  and hence do not initialize every component; 
1795         *                  <code>false</code> otherwise.
1796         * @throws IOException
1797         */
1798        private void init(boolean tempReader) throws IOException {
1799          byte[] versionBlock = new byte[VERSION.length];
1800          in.readFully(versionBlock);
1801    
1802          if ((versionBlock[0] != VERSION[0]) ||
1803              (versionBlock[1] != VERSION[1]) ||
1804              (versionBlock[2] != VERSION[2]))
1805            throw new IOException(this + " not a SequenceFile");
1806    
1807          // Set 'version'
1808          version = versionBlock[3];
1809          if (version > VERSION[3])
1810            throw new VersionMismatchException(VERSION[3], version);
1811    
1812          if (version < BLOCK_COMPRESS_VERSION) {
1813            UTF8 className = new UTF8();
1814    
1815            className.readFields(in);
1816            keyClassName = className.toString(); // key class name
1817    
1818            className.readFields(in);
1819            valClassName = className.toString(); // val class name
1820          } else {
1821            keyClassName = Text.readString(in);
1822            valClassName = Text.readString(in);
1823          }
1824    
1825          if (version > 2) {                          // if version > 2
1826            this.decompress = in.readBoolean();       // is compressed?
1827          } else {
1828            decompress = false;
1829          }
1830    
1831          if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
1832            this.blockCompressed = in.readBoolean();  // is block-compressed?
1833          } else {
1834            blockCompressed = false;
1835          }
1836          
1837          // if version >= 5
1838          // setup the compression codec
1839          if (decompress) {
1840            if (version >= CUSTOM_COMPRESS_VERSION) {
1841              String codecClassname = Text.readString(in);
1842              try {
1843                Class<? extends CompressionCodec> codecClass
1844                  = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
1845                this.codec = ReflectionUtils.newInstance(codecClass, conf);
1846              } catch (ClassNotFoundException cnfe) {
1847                throw new IllegalArgumentException("Unknown codec: " + 
1848                                                   codecClassname, cnfe);
1849              }
1850            } else {
1851              codec = new DefaultCodec();
1852              ((Configurable)codec).setConf(conf);
1853            }
1854          }
1855          
1856          this.metadata = new Metadata();
1857          if (version >= VERSION_WITH_METADATA) {    // if version >= 6
1858            this.metadata.readFields(in);
1859          }
1860          
1861          if (version > 1) {                          // if version > 1
1862            in.readFully(sync);                       // read sync bytes
1863            headerEnd = in.getPos();                  // record end of header
1864          }
1865          
1866          // Initialize... *not* if this we are constructing a temporary Reader
1867          if (!tempReader) {
1868            valBuffer = new DataInputBuffer();
1869            if (decompress) {
1870              valDecompressor = CodecPool.getDecompressor(codec);
1871              valInFilter = codec.createInputStream(valBuffer, valDecompressor);
1872              valIn = new DataInputStream(valInFilter);
1873            } else {
1874              valIn = valBuffer;
1875            }
1876    
1877            if (blockCompressed) {
1878              keyLenBuffer = new DataInputBuffer();
1879              keyBuffer = new DataInputBuffer();
1880              valLenBuffer = new DataInputBuffer();
1881    
1882              keyLenDecompressor = CodecPool.getDecompressor(codec);
1883              keyLenInFilter = codec.createInputStream(keyLenBuffer, 
1884                                                       keyLenDecompressor);
1885              keyLenIn = new DataInputStream(keyLenInFilter);
1886    
1887              keyDecompressor = CodecPool.getDecompressor(codec);
1888              keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
1889              keyIn = new DataInputStream(keyInFilter);
1890    
1891              valLenDecompressor = CodecPool.getDecompressor(codec);
1892              valLenInFilter = codec.createInputStream(valLenBuffer, 
1893                                                       valLenDecompressor);
1894              valLenIn = new DataInputStream(valLenInFilter);
1895            }
1896            
1897            SerializationFactory serializationFactory =
1898              new SerializationFactory(conf);
1899            this.keyDeserializer =
1900              getDeserializer(serializationFactory, getKeyClass());
1901            if (!blockCompressed) {
1902              this.keyDeserializer.open(valBuffer);
1903            } else {
1904              this.keyDeserializer.open(keyIn);
1905            }
1906            this.valDeserializer =
1907              getDeserializer(serializationFactory, getValueClass());
1908            this.valDeserializer.open(valIn);
1909          }
1910        }
1911        
1912        @SuppressWarnings("unchecked")
1913        private Deserializer getDeserializer(SerializationFactory sf, Class c) {
1914          return sf.getDeserializer(c);
1915        }
1916        
1917        /** Close the file. */
1918        public synchronized void close() throws IOException {
1919          // Return the decompressors to the pool
1920          CodecPool.returnDecompressor(keyLenDecompressor);
1921          CodecPool.returnDecompressor(keyDecompressor);
1922          CodecPool.returnDecompressor(valLenDecompressor);
1923          CodecPool.returnDecompressor(valDecompressor);
1924          keyLenDecompressor = keyDecompressor = null;
1925          valLenDecompressor = valDecompressor = null;
1926          
1927          if (keyDeserializer != null) {
1928            keyDeserializer.close();
1929          }
1930          if (valDeserializer != null) {
1931            valDeserializer.close();
1932          }
1933          
1934          // Close the input-stream
1935          in.close();
1936        }
1937    
1938        /** Returns the name of the key class. */
1939        public String getKeyClassName() {
1940          return keyClassName;
1941        }
1942    
1943        /** Returns the class of keys in this file. */
1944        public synchronized Class<?> getKeyClass() {
1945          if (null == keyClass) {
1946            try {
1947              keyClass = WritableName.getClass(getKeyClassName(), conf);
1948            } catch (IOException e) {
1949              throw new RuntimeException(e);
1950            }
1951          }
1952          return keyClass;
1953        }
1954    
1955        /** Returns the name of the value class. */
1956        public String getValueClassName() {
1957          return valClassName;
1958        }
1959    
1960        /** Returns the class of values in this file. */
1961        public synchronized Class<?> getValueClass() {
1962          if (null == valClass) {
1963            try {
1964              valClass = WritableName.getClass(getValueClassName(), conf);
1965            } catch (IOException e) {
1966              throw new RuntimeException(e);
1967            }
1968          }
1969          return valClass;
1970        }
1971    
1972        /** Returns true if values are compressed. */
1973        public boolean isCompressed() { return decompress; }
1974        
1975        /** Returns true if records are block-compressed. */
1976        public boolean isBlockCompressed() { return blockCompressed; }
1977        
1978        /** Returns the compression codec of data in this file. */
1979        public CompressionCodec getCompressionCodec() { return codec; }
1980        
1981        /**
1982         * Get the compression type for this file.
1983         * @return the compression type
1984         */
1985        public CompressionType getCompressionType() {
1986          if (decompress) {
1987            return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
1988          } else {
1989            return CompressionType.NONE;
1990          }
1991        }
1992    
1993        /** Returns the metadata object of the file */
1994        public Metadata getMetadata() {
1995          return this.metadata;
1996        }
1997        
1998        /** Returns the configuration used for this file. */
1999        Configuration getConf() { return conf; }
2000        
2001        /** Read a compressed buffer */
2002        private synchronized void readBuffer(DataInputBuffer buffer, 
2003                                             CompressionInputStream filter) throws IOException {
2004          // Read data into a temporary buffer
2005          DataOutputBuffer dataBuffer = new DataOutputBuffer();
2006    
2007          try {
2008            int dataBufferLength = WritableUtils.readVInt(in);
2009            dataBuffer.write(in, dataBufferLength);
2010          
2011            // Set up 'buffer' connected to the input-stream
2012            buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
2013          } finally {
2014            dataBuffer.close();
2015          }
2016    
2017          // Reset the codec
2018          filter.resetState();
2019        }
2020        
2021        /** Read the next 'compressed' block */
2022        private synchronized void readBlock() throws IOException {
2023          // Check if we need to throw away a whole block of 
2024          // 'values' due to 'lazy decompression' 
2025          if (lazyDecompress && !valuesDecompressed) {
2026            in.seek(WritableUtils.readVInt(in)+in.getPos());
2027            in.seek(WritableUtils.readVInt(in)+in.getPos());
2028          }
2029          
2030          // Reset internal states
2031          noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
2032          valuesDecompressed = false;
2033    
2034          //Process sync
2035          if (sync != null) {
2036            in.readInt();
2037            in.readFully(syncCheck);                // read syncCheck
2038            if (!Arrays.equals(sync, syncCheck))    // check it
2039              throw new IOException("File is corrupt!");
2040          }
2041          syncSeen = true;
2042    
2043          // Read number of records in this block
2044          noBufferedRecords = WritableUtils.readVInt(in);
2045          
2046          // Read key lengths and keys
2047          readBuffer(keyLenBuffer, keyLenInFilter);
2048          readBuffer(keyBuffer, keyInFilter);
2049          noBufferedKeys = noBufferedRecords;
2050          
2051          // Read value lengths and values
2052          if (!lazyDecompress) {
2053            readBuffer(valLenBuffer, valLenInFilter);
2054            readBuffer(valBuffer, valInFilter);
2055            noBufferedValues = noBufferedRecords;
2056            valuesDecompressed = true;
2057          }
2058        }
2059    
2060        /** 
2061         * Position valLenIn/valIn to the 'value' 
2062         * corresponding to the 'current' key 
2063         */
2064        private synchronized void seekToCurrentValue() throws IOException {
2065          if (!blockCompressed) {
2066            if (decompress) {
2067              valInFilter.resetState();
2068            }
2069            valBuffer.reset();
2070          } else {
2071            // Check if this is the first value in the 'block' to be read
2072            if (lazyDecompress && !valuesDecompressed) {
2073              // Read the value lengths and values
2074              readBuffer(valLenBuffer, valLenInFilter);
2075              readBuffer(valBuffer, valInFilter);
2076              noBufferedValues = noBufferedRecords;
2077              valuesDecompressed = true;
2078            }
2079            
2080            // Calculate the no. of bytes to skip
2081            // Note: 'current' key has already been read!
2082            int skipValBytes = 0;
2083            int currentKey = noBufferedKeys + 1;          
2084            for (int i=noBufferedValues; i > currentKey; --i) {
2085              skipValBytes += WritableUtils.readVInt(valLenIn);
2086              --noBufferedValues;
2087            }
2088            
2089            // Skip to the 'val' corresponding to 'current' key
2090            if (skipValBytes > 0) {
2091              if (valIn.skipBytes(skipValBytes) != skipValBytes) {
2092                throw new IOException("Failed to seek to " + currentKey + 
2093                                      "(th) value!");
2094              }
2095            }
2096          }
2097        }
2098    
2099        /**
2100         * Get the 'value' corresponding to the last read 'key'.
2101         * @param val : The 'value' to be read.
2102         * @throws IOException
2103         */
2104        public synchronized void getCurrentValue(Writable val) 
2105          throws IOException {
2106          if (val instanceof Configurable) {
2107            ((Configurable) val).setConf(this.conf);
2108          }
2109    
2110          // Position stream to 'current' value
2111          seekToCurrentValue();
2112    
2113          if (!blockCompressed) {
2114            val.readFields(valIn);
2115            
2116            if (valIn.read() > 0) {
2117              LOG.info("available bytes: " + valIn.available());
2118              throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2119                                    + " bytes, should read " +
2120                                    (valBuffer.getLength()-keyLength));
2121            }
2122          } else {
2123            // Get the value
2124            int valLength = WritableUtils.readVInt(valLenIn);
2125            val.readFields(valIn);
2126            
2127            // Read another compressed 'value'
2128            --noBufferedValues;
2129            
2130            // Sanity check
2131            if ((valLength < 0) && LOG.isDebugEnabled()) {
2132              LOG.debug(val + " is a zero-length value");
2133            }
2134          }
2135    
2136        }
2137        
2138        /**
2139         * Get the 'value' corresponding to the last read 'key'.
2140         * @param val : The 'value' to be read.
2141         * @throws IOException
2142         */
2143        public synchronized Object getCurrentValue(Object val) 
2144          throws IOException {
2145          if (val instanceof Configurable) {
2146            ((Configurable) val).setConf(this.conf);
2147          }
2148    
2149          // Position stream to 'current' value
2150          seekToCurrentValue();
2151    
2152          if (!blockCompressed) {
2153            val = deserializeValue(val);
2154            
2155            if (valIn.read() > 0) {
2156              LOG.info("available bytes: " + valIn.available());
2157              throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2158                                    + " bytes, should read " +
2159                                    (valBuffer.getLength()-keyLength));
2160            }
2161          } else {
2162            // Get the value
2163            int valLength = WritableUtils.readVInt(valLenIn);
2164            val = deserializeValue(val);
2165            
2166            // Read another compressed 'value'
2167            --noBufferedValues;
2168            
2169            // Sanity check
2170            if ((valLength < 0) && LOG.isDebugEnabled()) {
2171              LOG.debug(val + " is a zero-length value");
2172            }
2173          }
2174          return val;
2175    
2176        }
2177    
2178        @SuppressWarnings("unchecked")
2179        private Object deserializeValue(Object val) throws IOException {
2180          return valDeserializer.deserialize(val);
2181        }
2182        
2183        /** Read the next key in the file into <code>key</code>, skipping its
2184         * value.  True if another entry exists, and false at end of file. */
2185        public synchronized boolean next(Writable key) throws IOException {
2186          if (key.getClass() != getKeyClass())
2187            throw new IOException("wrong key class: "+key.getClass().getName()
2188                                  +" is not "+keyClass);
2189    
2190          if (!blockCompressed) {
2191            outBuf.reset();
2192            
2193            keyLength = next(outBuf);
2194            if (keyLength < 0)
2195              return false;
2196            
2197            valBuffer.reset(outBuf.getData(), outBuf.getLength());
2198            
2199            key.readFields(valBuffer);
2200            valBuffer.mark(0);
2201            if (valBuffer.getPosition() != keyLength)
2202              throw new IOException(key + " read " + valBuffer.getPosition()
2203                                    + " bytes, should read " + keyLength);
2204          } else {
2205            //Reset syncSeen
2206            syncSeen = false;
2207            
2208            if (noBufferedKeys == 0) {
2209              try {
2210                readBlock();
2211              } catch (EOFException eof) {
2212                return false;
2213              }
2214            }
2215            
2216            int keyLength = WritableUtils.readVInt(keyLenIn);
2217            
2218            // Sanity check
2219            if (keyLength < 0) {
2220              return false;
2221            }
2222            
2223            //Read another compressed 'key'
2224            key.readFields(keyIn);
2225            --noBufferedKeys;
2226          }
2227    
2228          return true;
2229        }
2230    
2231        /** Read the next key/value pair in the file into <code>key</code> and
2232         * <code>val</code>.  Returns true if such a pair exists and false when at
2233         * end of file */
2234        public synchronized boolean next(Writable key, Writable val)
2235          throws IOException {
2236          if (val.getClass() != getValueClass())
2237            throw new IOException("wrong value class: "+val+" is not "+valClass);
2238    
2239          boolean more = next(key);
2240          
2241          if (more) {
2242            getCurrentValue(val);
2243          }
2244    
2245          return more;
2246        }
2247        
2248        /**
2249         * Read and return the next record length, potentially skipping over 
2250         * a sync block.
2251         * @return the length of the next record or -1 if there is no next record
2252         * @throws IOException
2253         */
2254        private synchronized int readRecordLength() throws IOException {
2255          if (in.getPos() >= end) {
2256            return -1;
2257          }      
2258          int length = in.readInt();
2259          if (version > 1 && sync != null &&
2260              length == SYNC_ESCAPE) {              // process a sync entry
2261            in.readFully(syncCheck);                // read syncCheck
2262            if (!Arrays.equals(sync, syncCheck))    // check it
2263              throw new IOException("File is corrupt!");
2264            syncSeen = true;
2265            if (in.getPos() >= end) {
2266              return -1;
2267            }
2268            length = in.readInt();                  // re-read length
2269          } else {
2270            syncSeen = false;
2271          }
2272          
2273          return length;
2274        }
2275        
2276        /** Read the next key/value pair in the file into <code>buffer</code>.
2277         * Returns the length of the key read, or -1 if at end of file.  The length
2278         * of the value may be computed by calling buffer.getLength() before and
2279         * after calls to this method. */
2280        /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
2281        @Deprecated
2282        synchronized int next(DataOutputBuffer buffer) throws IOException {
2283          // Unsupported for block-compressed sequence files
2284          if (blockCompressed) {
2285            throw new IOException("Unsupported call for block-compressed" +
2286                                  " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
2287          }
2288          try {
2289            int length = readRecordLength();
2290            if (length == -1) {
2291              return -1;
2292            }
2293            int keyLength = in.readInt();
2294            buffer.write(in, length);
2295            return keyLength;
2296          } catch (ChecksumException e) {             // checksum failure
2297            handleChecksumException(e);
2298            return next(buffer);
2299          }
2300        }
2301    
2302        public ValueBytes createValueBytes() {
2303          ValueBytes val = null;
2304          if (!decompress || blockCompressed) {
2305            val = new UncompressedBytes();
2306          } else {
2307            val = new CompressedBytes(codec);
2308          }
2309          return val;
2310        }
2311    
2312        /**
2313         * Read 'raw' records.
2314         * @param key - The buffer into which the key is read
2315         * @param val - The 'raw' value
2316         * @return Returns the total record length or -1 for end of file
2317         * @throws IOException
2318         */
2319        public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
2320          throws IOException {
2321          if (!blockCompressed) {
2322            int length = readRecordLength();
2323            if (length == -1) {
2324              return -1;
2325            }
2326            int keyLength = in.readInt();
2327            int valLength = length - keyLength;
2328            key.write(in, keyLength);
2329            if (decompress) {
2330              CompressedBytes value = (CompressedBytes)val;
2331              value.reset(in, valLength);
2332            } else {
2333              UncompressedBytes value = (UncompressedBytes)val;
2334              value.reset(in, valLength);
2335            }
2336            
2337            return length;
2338          } else {
2339            //Reset syncSeen
2340            syncSeen = false;
2341            
2342            // Read 'key'
2343            if (noBufferedKeys == 0) {
2344              if (in.getPos() >= end) 
2345                return -1;
2346    
2347              try { 
2348                readBlock();
2349              } catch (EOFException eof) {
2350                return -1;
2351              }
2352            }
2353            int keyLength = WritableUtils.readVInt(keyLenIn);
2354            if (keyLength < 0) {
2355              throw new IOException("zero length key found!");
2356            }
2357            key.write(keyIn, keyLength);
2358            --noBufferedKeys;
2359            
2360            // Read raw 'value'
2361            seekToCurrentValue();
2362            int valLength = WritableUtils.readVInt(valLenIn);
2363            UncompressedBytes rawValue = (UncompressedBytes)val;
2364            rawValue.reset(valIn, valLength);
2365            --noBufferedValues;
2366            
2367            return (keyLength+valLength);
2368          }
2369          
2370        }
2371    
2372        /**
2373         * Read 'raw' keys.
2374         * @param key - The buffer into which the key is read
2375         * @return Returns the key length or -1 for end of file
2376         * @throws IOException
2377         */
2378        public synchronized int nextRawKey(DataOutputBuffer key) 
2379          throws IOException {
2380          if (!blockCompressed) {
2381            recordLength = readRecordLength();
2382            if (recordLength == -1) {
2383              return -1;
2384            }
2385            keyLength = in.readInt();
2386            key.write(in, keyLength);
2387            return keyLength;
2388          } else {
2389            //Reset syncSeen
2390            syncSeen = false;
2391            
2392            // Read 'key'
2393            if (noBufferedKeys == 0) {
2394              if (in.getPos() >= end) 
2395                return -1;
2396    
2397              try { 
2398                readBlock();
2399              } catch (EOFException eof) {
2400                return -1;
2401              }
2402            }
2403            int keyLength = WritableUtils.readVInt(keyLenIn);
2404            if (keyLength < 0) {
2405              throw new IOException("zero length key found!");
2406            }
2407            key.write(keyIn, keyLength);
2408            --noBufferedKeys;
2409            
2410            return keyLength;
2411          }
2412          
2413        }
2414    
2415        /** Read the next key in the file, skipping its
2416         * value.  Return null at end of file. */
2417        public synchronized Object next(Object key) throws IOException {
2418          if (key != null && key.getClass() != getKeyClass()) {
2419            throw new IOException("wrong key class: "+key.getClass().getName()
2420                                  +" is not "+keyClass);
2421          }
2422    
2423          if (!blockCompressed) {
2424            outBuf.reset();
2425            
2426            keyLength = next(outBuf);
2427            if (keyLength < 0)
2428              return null;
2429            
2430            valBuffer.reset(outBuf.getData(), outBuf.getLength());
2431            
2432            key = deserializeKey(key);
2433            valBuffer.mark(0);
2434            if (valBuffer.getPosition() != keyLength)
2435              throw new IOException(key + " read " + valBuffer.getPosition()
2436                                    + " bytes, should read " + keyLength);
2437          } else {
2438            //Reset syncSeen
2439            syncSeen = false;
2440            
2441            if (noBufferedKeys == 0) {
2442              try {
2443                readBlock();
2444              } catch (EOFException eof) {
2445                return null;
2446              }
2447            }
2448            
2449            int keyLength = WritableUtils.readVInt(keyLenIn);
2450            
2451            // Sanity check
2452            if (keyLength < 0) {
2453              return null;
2454            }
2455            
2456            //Read another compressed 'key'
2457            key = deserializeKey(key);
2458            --noBufferedKeys;
2459          }
2460    
2461          return key;
2462        }
2463    
2464        @SuppressWarnings("unchecked")
2465        private Object deserializeKey(Object key) throws IOException {
2466          return keyDeserializer.deserialize(key);
2467        }
2468    
2469        /**
2470         * Read 'raw' values.
2471         * @param val - The 'raw' value
2472         * @return Returns the value length
2473         * @throws IOException
2474         */
2475        public synchronized int nextRawValue(ValueBytes val) 
2476          throws IOException {
2477          
2478          // Position stream to current value
2479          seekToCurrentValue();
2480     
2481          if (!blockCompressed) {
2482            int valLength = recordLength - keyLength;
2483            if (decompress) {
2484              CompressedBytes value = (CompressedBytes)val;
2485              value.reset(in, valLength);
2486            } else {
2487              UncompressedBytes value = (UncompressedBytes)val;
2488              value.reset(in, valLength);
2489            }
2490             
2491            return valLength;
2492          } else {
2493            int valLength = WritableUtils.readVInt(valLenIn);
2494            UncompressedBytes rawValue = (UncompressedBytes)val;
2495            rawValue.reset(valIn, valLength);
2496            --noBufferedValues;
2497            return valLength;
2498          }
2499          
2500        }
2501    
2502        private void handleChecksumException(ChecksumException e)
2503          throws IOException {
2504          if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
2505            LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
2506            sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
2507          } else {
2508            throw e;
2509          }
2510        }
2511    
2512        /** disables sync. often invoked for tmp files */
2513        synchronized void ignoreSync() {
2514          sync = null;
2515        }
2516        
2517        /** Set the current byte position in the input file.
2518         *
2519         * <p>The position passed must be a position returned by {@link
2520         * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
2521         * position, use {@link SequenceFile.Reader#sync(long)}.
2522         */
2523        public synchronized void seek(long position) throws IOException {
2524          in.seek(position);
2525          if (blockCompressed) {                      // trigger block read
2526            noBufferedKeys = 0;
2527            valuesDecompressed = true;
2528          }
2529        }
2530    
2531        /** Seek to the next sync mark past a given position.*/
2532        public synchronized void sync(long position) throws IOException {
2533          if (position+SYNC_SIZE >= end) {
2534            seek(end);
2535            return;
2536          }
2537    
2538          if (position < headerEnd) {
2539            // seek directly to first record
2540            in.seek(headerEnd);
2541            // note the sync marker "seen" in the header
2542            syncSeen = true;
2543            return;
2544          }
2545    
2546          try {
2547            seek(position+4);                         // skip escape
2548            in.readFully(syncCheck);
2549            int syncLen = sync.length;
2550            for (int i = 0; in.getPos() < end; i++) {
2551              int j = 0;
2552              for (; j < syncLen; j++) {
2553                if (sync[j] != syncCheck[(i+j)%syncLen])
2554                  break;
2555              }
2556              if (j == syncLen) {
2557                in.seek(in.getPos() - SYNC_SIZE);     // position before sync
2558                return;
2559              }
2560              syncCheck[i%syncLen] = in.readByte();
2561            }
2562          } catch (ChecksumException e) {             // checksum failure
2563            handleChecksumException(e);
2564          }
2565        }
2566    
2567        /** Returns true iff the previous call to next passed a sync mark.*/
2568        public synchronized boolean syncSeen() { return syncSeen; }
2569    
2570        /** Return the current byte position in the input file. */
2571        public synchronized long getPosition() throws IOException {
2572          return in.getPos();
2573        }
2574    
2575        /** Returns the name of the file. */
2576        public String toString() {
2577          return filename;
2578        }
2579    
2580      }
2581    
2582      /** Sorts key/value pairs in a sequence-format file.
2583       *
2584       * <p>For best performance, applications should make sure that the {@link
2585       * Writable#readFields(DataInput)} implementation of their keys is
2586       * very efficient.  In particular, it should avoid allocating memory.
2587       */
2588      public static class Sorter {
2589    
2590        private RawComparator comparator;
2591    
2592        private MergeSort mergeSort; //the implementation of merge sort
2593        
2594        private Path[] inFiles;                     // when merging or sorting
2595    
2596        private Path outFile;
2597    
2598        private int memory; // bytes
2599        private int factor; // merged per pass
2600    
2601        private FileSystem fs = null;
2602    
2603        private Class keyClass;
2604        private Class valClass;
2605    
2606        private Configuration conf;
2607        private Metadata metadata;
2608        
2609        private Progressable progressable = null;
2610    
2611        /** Sort and merge files containing the named classes. */
2612        public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
2613                      Class valClass, Configuration conf)  {
2614          this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
2615        }
2616    
2617        /** Sort and merge using an arbitrary {@link RawComparator}. */
2618        public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
2619                      Class valClass, Configuration conf) {
2620          this(fs, comparator, keyClass, valClass, conf, new Metadata());
2621        }
2622    
2623        /** Sort and merge using an arbitrary {@link RawComparator}. */
2624        public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2625                      Class valClass, Configuration conf, Metadata metadata) {
2626          this.fs = fs;
2627          this.comparator = comparator;
2628          this.keyClass = keyClass;
2629          this.valClass = valClass;
2630          this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
2631          this.factor = conf.getInt("io.sort.factor", 100);
2632          this.conf = conf;
2633          this.metadata = metadata;
2634        }
2635    
2636        /** Set the number of streams to merge at once.*/
2637        public void setFactor(int factor) { this.factor = factor; }
2638    
2639        /** Get the number of streams to merge at once.*/
2640        public int getFactor() { return factor; }
2641    
2642        /** Set the total amount of buffer memory, in bytes.*/
2643        public void setMemory(int memory) { this.memory = memory; }
2644    
2645        /** Get the total amount of buffer memory, in bytes.*/
2646        public int getMemory() { return memory; }
2647    
2648        /** Set the progressable object in order to report progress. */
2649        public void setProgressable(Progressable progressable) {
2650          this.progressable = progressable;
2651        }
2652        
2653        /** 
2654         * Perform a file sort from a set of input files into an output file.
2655         * @param inFiles the files to be sorted
2656         * @param outFile the sorted output file
2657         * @param deleteInput should the input files be deleted as they are read?
2658         */
2659        public void sort(Path[] inFiles, Path outFile,
2660                         boolean deleteInput) throws IOException {
2661          if (fs.exists(outFile)) {
2662            throw new IOException("already exists: " + outFile);
2663          }
2664    
2665          this.inFiles = inFiles;
2666          this.outFile = outFile;
2667    
2668          int segments = sortPass(deleteInput);
2669          if (segments > 1) {
2670            mergePass(outFile.getParent());
2671          }
2672        }
2673    
2674        /** 
2675         * Perform a file sort from a set of input files and return an iterator.
2676         * @param inFiles the files to be sorted
2677         * @param tempDir the directory where temp files are created during sort
2678         * @param deleteInput should the input files be deleted as they are read?
2679         * @return iterator the RawKeyValueIterator
2680         */
2681        public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
2682                                                  boolean deleteInput) throws IOException {
2683          Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
2684          if (fs.exists(outFile)) {
2685            throw new IOException("already exists: " + outFile);
2686          }
2687          this.inFiles = inFiles;
2688          //outFile will basically be used as prefix for temp files in the cases
2689          //where sort outputs multiple sorted segments. For the single segment
2690          //case, the outputFile itself will contain the sorted data for that
2691          //segment
2692          this.outFile = outFile;
2693    
2694          int segments = sortPass(deleteInput);
2695          if (segments > 1)
2696            return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
2697                         tempDir);
2698          else if (segments == 1)
2699            return merge(new Path[]{outFile}, true, tempDir);
2700          else return null;
2701        }
2702    
2703        /**
2704         * The backwards compatible interface to sort.
2705         * @param inFile the input file to sort
2706         * @param outFile the sorted output file
2707         */
2708        public void sort(Path inFile, Path outFile) throws IOException {
2709          sort(new Path[]{inFile}, outFile, false);
2710        }
2711        
2712        private int sortPass(boolean deleteInput) throws IOException {
2713          if(LOG.isDebugEnabled()) {
2714            LOG.debug("running sort pass");
2715          }
2716          SortPass sortPass = new SortPass();         // make the SortPass
2717          sortPass.setProgressable(progressable);
2718          mergeSort = new MergeSort(sortPass.new SeqFileComparator());
2719          try {
2720            return sortPass.run(deleteInput);         // run it
2721          } finally {
2722            sortPass.close();                         // close it
2723          }
2724        }
2725    
2726        private class SortPass {
2727          private int memoryLimit = memory/4;
2728          private int recordLimit = 1000000;
2729          
2730          private DataOutputBuffer rawKeys = new DataOutputBuffer();
2731          private byte[] rawBuffer;
2732    
2733          private int[] keyOffsets = new int[1024];
2734          private int[] pointers = new int[keyOffsets.length];
2735          private int[] pointersCopy = new int[keyOffsets.length];
2736          private int[] keyLengths = new int[keyOffsets.length];
2737          private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
2738          
2739          private ArrayList segmentLengths = new ArrayList();
2740          
2741          private Reader in = null;
2742          private FSDataOutputStream out = null;
2743          private FSDataOutputStream indexOut = null;
2744          private Path outName;
2745    
2746          private Progressable progressable = null;
2747    
2748          public int run(boolean deleteInput) throws IOException {
2749            int segments = 0;
2750            int currentFile = 0;
2751            boolean atEof = (currentFile >= inFiles.length);
2752            CompressionType compressionType;
2753            CompressionCodec codec = null;
2754            segmentLengths.clear();
2755            if (atEof) {
2756              return 0;
2757            }
2758            
2759            // Initialize
2760            in = new Reader(fs, inFiles[currentFile], conf);
2761            compressionType = in.getCompressionType();
2762            codec = in.getCompressionCodec();
2763            
2764            for (int i=0; i < rawValues.length; ++i) {
2765              rawValues[i] = null;
2766            }
2767            
2768            while (!atEof) {
2769              int count = 0;
2770              int bytesProcessed = 0;
2771              rawKeys.reset();
2772              while (!atEof && 
2773                     bytesProcessed < memoryLimit && count < recordLimit) {
2774    
2775                // Read a record into buffer
2776                // Note: Attempt to re-use 'rawValue' as far as possible
2777                int keyOffset = rawKeys.getLength();       
2778                ValueBytes rawValue = 
2779                  (count == keyOffsets.length || rawValues[count] == null) ? 
2780                  in.createValueBytes() : 
2781                  rawValues[count];
2782                int recordLength = in.nextRaw(rawKeys, rawValue);
2783                if (recordLength == -1) {
2784                  in.close();
2785                  if (deleteInput) {
2786                    fs.delete(inFiles[currentFile], true);
2787                  }
2788                  currentFile += 1;
2789                  atEof = currentFile >= inFiles.length;
2790                  if (!atEof) {
2791                    in = new Reader(fs, inFiles[currentFile], conf);
2792                  } else {
2793                    in = null;
2794                  }
2795                  continue;
2796                }
2797    
2798                int keyLength = rawKeys.getLength() - keyOffset;
2799    
2800                if (count == keyOffsets.length)
2801                  grow();
2802    
2803                keyOffsets[count] = keyOffset;                // update pointers
2804                pointers[count] = count;
2805                keyLengths[count] = keyLength;
2806                rawValues[count] = rawValue;
2807    
2808                bytesProcessed += recordLength; 
2809                count++;
2810              }
2811    
2812              // buffer is full -- sort & flush it
2813              if(LOG.isDebugEnabled()) {
2814                LOG.debug("flushing segment " + segments);
2815              }
2816              rawBuffer = rawKeys.getData();
2817              sort(count);
2818              // indicate we're making progress
2819              if (progressable != null) {
2820                progressable.progress();
2821              }
2822              flush(count, bytesProcessed, compressionType, codec, 
2823                    segments==0 && atEof);
2824              segments++;
2825            }
2826            return segments;
2827          }
2828    
2829          public void close() throws IOException {
2830            if (in != null) {
2831              in.close();
2832            }
2833            if (out != null) {
2834              out.close();
2835            }
2836            if (indexOut != null) {
2837              indexOut.close();
2838            }
2839          }
2840    
2841          private void grow() {
2842            int newLength = keyOffsets.length * 3 / 2;
2843            keyOffsets = grow(keyOffsets, newLength);
2844            pointers = grow(pointers, newLength);
2845            pointersCopy = new int[newLength];
2846            keyLengths = grow(keyLengths, newLength);
2847            rawValues = grow(rawValues, newLength);
2848          }
2849    
2850          private int[] grow(int[] old, int newLength) {
2851            int[] result = new int[newLength];
2852            System.arraycopy(old, 0, result, 0, old.length);
2853            return result;
2854          }
2855          
2856          private ValueBytes[] grow(ValueBytes[] old, int newLength) {
2857            ValueBytes[] result = new ValueBytes[newLength];
2858            System.arraycopy(old, 0, result, 0, old.length);
2859            for (int i=old.length; i < newLength; ++i) {
2860              result[i] = null;
2861            }
2862            return result;
2863          }
2864    
2865          private void flush(int count, int bytesProcessed, 
2866                             CompressionType compressionType, 
2867                             CompressionCodec codec, 
2868                             boolean done) throws IOException {
2869            if (out == null) {
2870              outName = done ? outFile : outFile.suffix(".0");
2871              out = fs.create(outName);
2872              if (!done) {
2873                indexOut = fs.create(outName.suffix(".index"));
2874              }
2875            }
2876    
2877            long segmentStart = out.getPos();
2878            Writer writer = createWriter(conf, Writer.stream(out), 
2879                Writer.keyClass(keyClass), Writer.valueClass(valClass),
2880                Writer.compression(compressionType, codec),
2881                Writer.metadata(done ? metadata : new Metadata()));
2882            
2883            if (!done) {
2884              writer.sync = null;                     // disable sync on temp files
2885            }
2886    
2887            for (int i = 0; i < count; i++) {         // write in sorted order
2888              int p = pointers[i];
2889              writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
2890            }
2891            writer.close();
2892            
2893            if (!done) {
2894              // Save the segment length
2895              WritableUtils.writeVLong(indexOut, segmentStart);
2896              WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
2897              indexOut.flush();
2898            }
2899          }
2900    
2901          private void sort(int count) {
2902            System.arraycopy(pointers, 0, pointersCopy, 0, count);
2903            mergeSort.mergeSort(pointersCopy, pointers, 0, count);
2904          }
2905          class SeqFileComparator implements Comparator<IntWritable> {
2906            public int compare(IntWritable I, IntWritable J) {
2907              return comparator.compare(rawBuffer, keyOffsets[I.get()], 
2908                                        keyLengths[I.get()], rawBuffer, 
2909                                        keyOffsets[J.get()], keyLengths[J.get()]);
2910            }
2911          }
2912          
2913          /** set the progressable object in order to report progress */
2914          public void setProgressable(Progressable progressable)
2915          {
2916            this.progressable = progressable;
2917          }
2918          
2919        } // SequenceFile.Sorter.SortPass
2920    
2921        /** The interface to iterate over raw keys/values of SequenceFiles. */
2922        public static interface RawKeyValueIterator {
2923          /** Gets the current raw key
2924           * @return DataOutputBuffer
2925           * @throws IOException
2926           */
2927          DataOutputBuffer getKey() throws IOException; 
2928          /** Gets the current raw value
2929           * @return ValueBytes 
2930           * @throws IOException
2931           */
2932          ValueBytes getValue() throws IOException; 
2933          /** Sets up the current key and value (for getKey and getValue)
2934           * @return true if there exists a key/value, false otherwise 
2935           * @throws IOException
2936           */
2937          boolean next() throws IOException;
2938          /** closes the iterator so that the underlying streams can be closed
2939           * @throws IOException
2940           */
2941          void close() throws IOException;
2942          /** Gets the Progress object; this has a float (0.0 - 1.0) 
2943           * indicating the bytes processed by the iterator so far
2944           */
2945          Progress getProgress();
2946        }    
2947        
2948        /**
2949         * Merges the list of segments of type <code>SegmentDescriptor</code>
2950         * @param segments the list of SegmentDescriptors
2951         * @param tmpDir the directory to write temporary files into
2952         * @return RawKeyValueIterator
2953         * @throws IOException
2954         */
2955        public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
2956                                         Path tmpDir) 
2957          throws IOException {
2958          // pass in object to report progress, if present
2959          MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
2960          return mQueue.merge();
2961        }
2962    
2963        /**
2964         * Merges the contents of files passed in Path[] using a max factor value
2965         * that is already set
2966         * @param inNames the array of path names
2967         * @param deleteInputs true if the input files should be deleted when 
2968         * unnecessary
2969         * @param tmpDir the directory to write temporary files into
2970         * @return RawKeyValueIteratorMergeQueue
2971         * @throws IOException
2972         */
2973        public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
2974                                         Path tmpDir) 
2975          throws IOException {
2976          return merge(inNames, deleteInputs, 
2977                       (inNames.length < factor) ? inNames.length : factor,
2978                       tmpDir);
2979        }
2980    
2981        /**
2982         * Merges the contents of files passed in Path[]
2983         * @param inNames the array of path names
2984         * @param deleteInputs true if the input files should be deleted when 
2985         * unnecessary
2986         * @param factor the factor that will be used as the maximum merge fan-in
2987         * @param tmpDir the directory to write temporary files into
2988         * @return RawKeyValueIteratorMergeQueue
2989         * @throws IOException
2990         */
2991        public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
2992                                         int factor, Path tmpDir) 
2993          throws IOException {
2994          //get the segments from inNames
2995          ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
2996          for (int i = 0; i < inNames.length; i++) {
2997            SegmentDescriptor s = new SegmentDescriptor(0,
2998                fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
2999            s.preserveInput(!deleteInputs);
3000            s.doSync();
3001            a.add(s);
3002          }
3003          this.factor = factor;
3004          MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
3005          return mQueue.merge();
3006        }
3007    
3008        /**
3009         * Merges the contents of files passed in Path[]
3010         * @param inNames the array of path names
3011         * @param tempDir the directory for creating temp files during merge
3012         * @param deleteInputs true if the input files should be deleted when 
3013         * unnecessary
3014         * @return RawKeyValueIteratorMergeQueue
3015         * @throws IOException
3016         */
3017        public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
3018                                         boolean deleteInputs) 
3019          throws IOException {
3020          //outFile will basically be used as prefix for temp files for the
3021          //intermediate merge outputs           
3022          this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3023          //get the segments from inNames
3024          ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3025          for (int i = 0; i < inNames.length; i++) {
3026            SegmentDescriptor s = new SegmentDescriptor(0,
3027                fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3028            s.preserveInput(!deleteInputs);
3029            s.doSync();
3030            a.add(s);
3031          }
3032          factor = (inNames.length < factor) ? inNames.length : factor;
3033          // pass in object to report progress, if present
3034          MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
3035          return mQueue.merge();
3036        }
3037    
3038        /**
3039         * Clones the attributes (like compression of the input file and creates a 
3040         * corresponding Writer
3041         * @param inputFile the path of the input file whose attributes should be 
3042         * cloned
3043         * @param outputFile the path of the output file 
3044         * @param prog the Progressable to report status during the file write
3045         * @return Writer
3046         * @throws IOException
3047         */
3048        public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
3049                                          Progressable prog) throws IOException {
3050          Reader reader = new Reader(conf,
3051                                     Reader.file(inputFile),
3052                                     new Reader.OnlyHeaderOption());
3053          CompressionType compress = reader.getCompressionType();
3054          CompressionCodec codec = reader.getCompressionCodec();
3055          reader.close();
3056    
3057          Writer writer = createWriter(conf, 
3058                                       Writer.file(outputFile), 
3059                                       Writer.keyClass(keyClass), 
3060                                       Writer.valueClass(valClass), 
3061                                       Writer.compression(compress, codec), 
3062                                       Writer.progressable(prog));
3063          return writer;
3064        }
3065    
3066        /**
3067         * Writes records from RawKeyValueIterator into a file represented by the 
3068         * passed writer
3069         * @param records the RawKeyValueIterator
3070         * @param writer the Writer created earlier 
3071         * @throws IOException
3072         */
3073        public void writeFile(RawKeyValueIterator records, Writer writer) 
3074          throws IOException {
3075          while(records.next()) {
3076            writer.appendRaw(records.getKey().getData(), 0, 
3077                             records.getKey().getLength(), records.getValue());
3078          }
3079          writer.sync();
3080        }
3081            
3082        /** Merge the provided files.
3083         * @param inFiles the array of input path names
3084         * @param outFile the final output file
3085         * @throws IOException
3086         */
3087        public void merge(Path[] inFiles, Path outFile) throws IOException {
3088          if (fs.exists(outFile)) {
3089            throw new IOException("already exists: " + outFile);
3090          }
3091          RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
3092          Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
3093          
3094          writeFile(r, writer);
3095    
3096          writer.close();
3097        }
3098    
3099        /** sort calls this to generate the final merged output */
3100        private int mergePass(Path tmpDir) throws IOException {
3101          if(LOG.isDebugEnabled()) {
3102            LOG.debug("running merge pass");
3103          }
3104          Writer writer = cloneFileAttributes(
3105                                              outFile.suffix(".0"), outFile, null);
3106          RawKeyValueIterator r = merge(outFile.suffix(".0"), 
3107                                        outFile.suffix(".0.index"), tmpDir);
3108          writeFile(r, writer);
3109    
3110          writer.close();
3111          return 0;
3112        }
3113    
3114        /** Used by mergePass to merge the output of the sort
3115         * @param inName the name of the input file containing sorted segments
3116         * @param indexIn the offsets of the sorted segments
3117         * @param tmpDir the relative directory to store intermediate results in
3118         * @return RawKeyValueIterator
3119         * @throws IOException
3120         */
3121        private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
3122          throws IOException {
3123          //get the segments from indexIn
3124          //we create a SegmentContainer so that we can track segments belonging to
3125          //inName and delete inName as soon as we see that we have looked at all
3126          //the contained segments during the merge process & hence don't need 
3127          //them anymore
3128          SegmentContainer container = new SegmentContainer(inName, indexIn);
3129          MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
3130          return mQueue.merge();
3131        }
3132        
3133        /** This class implements the core of the merge logic */
3134        private class MergeQueue extends PriorityQueue 
3135          implements RawKeyValueIterator {
3136          private boolean compress;
3137          private boolean blockCompress;
3138          private DataOutputBuffer rawKey = new DataOutputBuffer();
3139          private ValueBytes rawValue;
3140          private long totalBytesProcessed;
3141          private float progPerByte;
3142          private Progress mergeProgress = new Progress();
3143          private Path tmpDir;
3144          private Progressable progress = null; //handle to the progress reporting object
3145          private SegmentDescriptor minSegment;
3146          
3147          //a TreeMap used to store the segments sorted by size (segment offset and
3148          //segment path name is used to break ties between segments of same sizes)
3149          private Map<SegmentDescriptor, Void> sortedSegmentSizes =
3150            new TreeMap<SegmentDescriptor, Void>();
3151                
3152          @SuppressWarnings("unchecked")
3153          public void put(SegmentDescriptor stream) throws IOException {
3154            if (size() == 0) {
3155              compress = stream.in.isCompressed();
3156              blockCompress = stream.in.isBlockCompressed();
3157            } else if (compress != stream.in.isCompressed() || 
3158                       blockCompress != stream.in.isBlockCompressed()) {
3159              throw new IOException("All merged files must be compressed or not.");
3160            } 
3161            super.put(stream);
3162          }
3163          
3164          /**
3165           * A queue of file segments to merge
3166           * @param segments the file segments to merge
3167           * @param tmpDir a relative local directory to save intermediate files in
3168           * @param progress the reference to the Progressable object
3169           */
3170          public MergeQueue(List <SegmentDescriptor> segments,
3171              Path tmpDir, Progressable progress) {
3172            int size = segments.size();
3173            for (int i = 0; i < size; i++) {
3174              sortedSegmentSizes.put(segments.get(i), null);
3175            }
3176            this.tmpDir = tmpDir;
3177            this.progress = progress;
3178          }
3179          protected boolean lessThan(Object a, Object b) {
3180            // indicate we're making progress
3181            if (progress != null) {
3182              progress.progress();
3183            }
3184            SegmentDescriptor msa = (SegmentDescriptor)a;
3185            SegmentDescriptor msb = (SegmentDescriptor)b;
3186            return comparator.compare(msa.getKey().getData(), 0, 
3187                                      msa.getKey().getLength(), msb.getKey().getData(), 0, 
3188                                      msb.getKey().getLength()) < 0;
3189          }
3190          public void close() throws IOException {
3191            SegmentDescriptor ms;                           // close inputs
3192            while ((ms = (SegmentDescriptor)pop()) != null) {
3193              ms.cleanup();
3194            }
3195            minSegment = null;
3196          }
3197          public DataOutputBuffer getKey() throws IOException {
3198            return rawKey;
3199          }
3200          public ValueBytes getValue() throws IOException {
3201            return rawValue;
3202          }
3203          public boolean next() throws IOException {
3204            if (size() == 0)
3205              return false;
3206            if (minSegment != null) {
3207              //minSegment is non-null for all invocations of next except the first
3208              //one. For the first invocation, the priority queue is ready for use
3209              //but for the subsequent invocations, first adjust the queue 
3210              adjustPriorityQueue(minSegment);
3211              if (size() == 0) {
3212                minSegment = null;
3213                return false;
3214              }
3215            }
3216            minSegment = (SegmentDescriptor)top();
3217            long startPos = minSegment.in.getPosition(); // Current position in stream
3218            //save the raw key reference
3219            rawKey = minSegment.getKey();
3220            //load the raw value. Re-use the existing rawValue buffer
3221            if (rawValue == null) {
3222              rawValue = minSegment.in.createValueBytes();
3223            }
3224            minSegment.nextRawValue(rawValue);
3225            long endPos = minSegment.in.getPosition(); // End position after reading value
3226            updateProgress(endPos - startPos);
3227            return true;
3228          }
3229          
3230          public Progress getProgress() {
3231            return mergeProgress; 
3232          }
3233    
3234          private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
3235            long startPos = ms.in.getPosition(); // Current position in stream
3236            boolean hasNext = ms.nextRawKey();
3237            long endPos = ms.in.getPosition(); // End position after reading key
3238            updateProgress(endPos - startPos);
3239            if (hasNext) {
3240              adjustTop();
3241            } else {
3242              pop();
3243              ms.cleanup();
3244            }
3245          }
3246    
3247          private void updateProgress(long bytesProcessed) {
3248            totalBytesProcessed += bytesProcessed;
3249            if (progPerByte > 0) {
3250              mergeProgress.set(totalBytesProcessed * progPerByte);
3251            }
3252          }
3253          
3254          /** This is the single level merge that is called multiple times 
3255           * depending on the factor size and the number of segments
3256           * @return RawKeyValueIterator
3257           * @throws IOException
3258           */
3259          public RawKeyValueIterator merge() throws IOException {
3260            //create the MergeStreams from the sorted map created in the constructor
3261            //and dump the final output to a file
3262            int numSegments = sortedSegmentSizes.size();
3263            int origFactor = factor;
3264            int passNo = 1;
3265            LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
3266            do {
3267              //get the factor for this pass of merge
3268              factor = getPassFactor(passNo, numSegments);
3269              List<SegmentDescriptor> segmentsToMerge =
3270                new ArrayList<SegmentDescriptor>();
3271              int segmentsConsidered = 0;
3272              int numSegmentsToConsider = factor;
3273              while (true) {
3274                //extract the smallest 'factor' number of segment pointers from the 
3275                //TreeMap. Call cleanup on the empty segments (no key/value data)
3276                SegmentDescriptor[] mStream = 
3277                  getSegmentDescriptors(numSegmentsToConsider);
3278                for (int i = 0; i < mStream.length; i++) {
3279                  if (mStream[i].nextRawKey()) {
3280                    segmentsToMerge.add(mStream[i]);
3281                    segmentsConsidered++;
3282                    // Count the fact that we read some bytes in calling nextRawKey()
3283                    updateProgress(mStream[i].in.getPosition());
3284                  }
3285                  else {
3286                    mStream[i].cleanup();
3287                    numSegments--; //we ignore this segment for the merge
3288                  }
3289                }
3290                //if we have the desired number of segments
3291                //or looked at all available segments, we break
3292                if (segmentsConsidered == factor || 
3293                    sortedSegmentSizes.size() == 0) {
3294                  break;
3295                }
3296                  
3297                numSegmentsToConsider = factor - segmentsConsidered;
3298              }
3299              //feed the streams to the priority queue
3300              initialize(segmentsToMerge.size()); clear();
3301              for (int i = 0; i < segmentsToMerge.size(); i++) {
3302                put(segmentsToMerge.get(i));
3303              }
3304              //if we have lesser number of segments remaining, then just return the
3305              //iterator, else do another single level merge
3306              if (numSegments <= factor) {
3307                //calculate the length of the remaining segments. Required for 
3308                //calculating the merge progress
3309                long totalBytes = 0;
3310                for (int i = 0; i < segmentsToMerge.size(); i++) {
3311                  totalBytes += segmentsToMerge.get(i).segmentLength;
3312                }
3313                if (totalBytes != 0) //being paranoid
3314                  progPerByte = 1.0f / (float)totalBytes;
3315                //reset factor to what it originally was
3316                factor = origFactor;
3317                return this;
3318              } else {
3319                //we want to spread the creation of temp files on multiple disks if 
3320                //available under the space constraints
3321                long approxOutputSize = 0; 
3322                for (SegmentDescriptor s : segmentsToMerge) {
3323                  approxOutputSize += s.segmentLength + 
3324                                      ChecksumFileSystem.getApproxChkSumLength(
3325                                      s.segmentLength);
3326                }
3327                Path tmpFilename = 
3328                  new Path(tmpDir, "intermediate").suffix("." + passNo);
3329    
3330                Path outputFile =  lDirAlloc.getLocalPathForWrite(
3331                                                    tmpFilename.toString(),
3332                                                    approxOutputSize, conf);
3333                if(LOG.isDebugEnabled()) { 
3334                  LOG.debug("writing intermediate results to " + outputFile);
3335                }
3336                Writer writer = cloneFileAttributes(
3337                                                    fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
3338                                                    fs.makeQualified(outputFile), null);
3339                writer.sync = null; //disable sync for temp files
3340                writeFile(this, writer);
3341                writer.close();
3342                
3343                //we finished one single level merge; now clean up the priority 
3344                //queue
3345                this.close();
3346                
3347                SegmentDescriptor tempSegment = 
3348                  new SegmentDescriptor(0,
3349                      fs.getFileStatus(outputFile).getLen(), outputFile);
3350                //put the segment back in the TreeMap
3351                sortedSegmentSizes.put(tempSegment, null);
3352                numSegments = sortedSegmentSizes.size();
3353                passNo++;
3354              }
3355              //we are worried about only the first pass merge factor. So reset the 
3356              //factor to what it originally was
3357              factor = origFactor;
3358            } while(true);
3359          }
3360      
3361          //Hadoop-591
3362          public int getPassFactor(int passNo, int numSegments) {
3363            if (passNo > 1 || numSegments <= factor || factor == 1) 
3364              return factor;
3365            int mod = (numSegments - 1) % (factor - 1);
3366            if (mod == 0)
3367              return factor;
3368            return mod + 1;
3369          }
3370          
3371          /** Return (& remove) the requested number of segment descriptors from the
3372           * sorted map.
3373           */
3374          public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
3375            if (numDescriptors > sortedSegmentSizes.size())
3376              numDescriptors = sortedSegmentSizes.size();
3377            SegmentDescriptor[] SegmentDescriptors = 
3378              new SegmentDescriptor[numDescriptors];
3379            Iterator iter = sortedSegmentSizes.keySet().iterator();
3380            int i = 0;
3381            while (i < numDescriptors) {
3382              SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
3383              iter.remove();
3384            }
3385            return SegmentDescriptors;
3386          }
3387        } // SequenceFile.Sorter.MergeQueue
3388    
3389        /** This class defines a merge segment. This class can be subclassed to 
3390         * provide a customized cleanup method implementation. In this 
3391         * implementation, cleanup closes the file handle and deletes the file 
3392         */
3393        public class SegmentDescriptor implements Comparable {
3394          
3395          long segmentOffset; //the start of the segment in the file
3396          long segmentLength; //the length of the segment
3397          Path segmentPathName; //the path name of the file containing the segment
3398          boolean ignoreSync = true; //set to true for temp files
3399          private Reader in = null; 
3400          private DataOutputBuffer rawKey = null; //this will hold the current key
3401          private boolean preserveInput = false; //delete input segment files?
3402          
3403          /** Constructs a segment
3404           * @param segmentOffset the offset of the segment in the file
3405           * @param segmentLength the length of the segment
3406           * @param segmentPathName the path name of the file containing the segment
3407           */
3408          public SegmentDescriptor (long segmentOffset, long segmentLength, 
3409                                    Path segmentPathName) {
3410            this.segmentOffset = segmentOffset;
3411            this.segmentLength = segmentLength;
3412            this.segmentPathName = segmentPathName;
3413          }
3414          
3415          /** Do the sync checks */
3416          public void doSync() {ignoreSync = false;}
3417          
3418          /** Whether to delete the files when no longer needed */
3419          public void preserveInput(boolean preserve) {
3420            preserveInput = preserve;
3421          }
3422    
3423          public boolean shouldPreserveInput() {
3424            return preserveInput;
3425          }
3426          
3427          public int compareTo(Object o) {
3428            SegmentDescriptor that = (SegmentDescriptor)o;
3429            if (this.segmentLength != that.segmentLength) {
3430              return (this.segmentLength < that.segmentLength ? -1 : 1);
3431            }
3432            if (this.segmentOffset != that.segmentOffset) {
3433              return (this.segmentOffset < that.segmentOffset ? -1 : 1);
3434            }
3435            return (this.segmentPathName.toString()).
3436              compareTo(that.segmentPathName.toString());
3437          }
3438    
3439          public boolean equals(Object o) {
3440            if (!(o instanceof SegmentDescriptor)) {
3441              return false;
3442            }
3443            SegmentDescriptor that = (SegmentDescriptor)o;
3444            if (this.segmentLength == that.segmentLength &&
3445                this.segmentOffset == that.segmentOffset &&
3446                this.segmentPathName.toString().equals(
3447                  that.segmentPathName.toString())) {
3448              return true;
3449            }
3450            return false;
3451          }
3452    
3453          public int hashCode() {
3454            return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
3455          }
3456    
3457          /** Fills up the rawKey object with the key returned by the Reader
3458           * @return true if there is a key returned; false, otherwise
3459           * @throws IOException
3460           */
3461          public boolean nextRawKey() throws IOException {
3462            if (in == null) {
3463              int bufferSize = getBufferSize(conf); 
3464              Reader reader = new Reader(conf,
3465                                         Reader.file(segmentPathName), 
3466                                         Reader.bufferSize(bufferSize),
3467                                         Reader.start(segmentOffset), 
3468                                         Reader.length(segmentLength));
3469            
3470              //sometimes we ignore syncs especially for temp merge files
3471              if (ignoreSync) reader.ignoreSync();
3472    
3473              if (reader.getKeyClass() != keyClass)
3474                throw new IOException("wrong key class: " + reader.getKeyClass() +
3475                                      " is not " + keyClass);
3476              if (reader.getValueClass() != valClass)
3477                throw new IOException("wrong value class: "+reader.getValueClass()+
3478                                      " is not " + valClass);
3479              this.in = reader;
3480              rawKey = new DataOutputBuffer();
3481            }
3482            rawKey.reset();
3483            int keyLength = 
3484              in.nextRawKey(rawKey);
3485            return (keyLength >= 0);
3486          }
3487    
3488          /** Fills up the passed rawValue with the value corresponding to the key
3489           * read earlier
3490           * @param rawValue
3491           * @return the length of the value
3492           * @throws IOException
3493           */
3494          public int nextRawValue(ValueBytes rawValue) throws IOException {
3495            int valLength = in.nextRawValue(rawValue);
3496            return valLength;
3497          }
3498          
3499          /** Returns the stored rawKey */
3500          public DataOutputBuffer getKey() {
3501            return rawKey;
3502          }
3503          
3504          /** closes the underlying reader */
3505          private void close() throws IOException {
3506            this.in.close();
3507            this.in = null;
3508          }
3509    
3510          /** The default cleanup. Subclasses can override this with a custom 
3511           * cleanup 
3512           */
3513          public void cleanup() throws IOException {
3514            close();
3515            if (!preserveInput) {
3516              fs.delete(segmentPathName, true);
3517            }
3518          }
3519        } // SequenceFile.Sorter.SegmentDescriptor
3520        
3521        /** This class provisions multiple segments contained within a single
3522         *  file
3523         */
3524        private class LinkedSegmentsDescriptor extends SegmentDescriptor {
3525    
3526          SegmentContainer parentContainer = null;
3527    
3528          /** Constructs a segment
3529           * @param segmentOffset the offset of the segment in the file
3530           * @param segmentLength the length of the segment
3531           * @param segmentPathName the path name of the file containing the segment
3532           * @param parent the parent SegmentContainer that holds the segment
3533           */
3534          public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
3535                                           Path segmentPathName, SegmentContainer parent) {
3536            super(segmentOffset, segmentLength, segmentPathName);
3537            this.parentContainer = parent;
3538          }
3539          /** The default cleanup. Subclasses can override this with a custom 
3540           * cleanup 
3541           */
3542          public void cleanup() throws IOException {
3543            super.close();
3544            if (super.shouldPreserveInput()) return;
3545            parentContainer.cleanup();
3546          }
3547          
3548          public boolean equals(Object o) {
3549            if (!(o instanceof LinkedSegmentsDescriptor)) {
3550              return false;
3551            }
3552            return super.equals(o);
3553          }
3554        } //SequenceFile.Sorter.LinkedSegmentsDescriptor
3555    
3556        /** The class that defines a container for segments to be merged. Primarily
3557         * required to delete temp files as soon as all the contained segments
3558         * have been looked at */
3559        private class SegmentContainer {
3560          private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
3561          private int numSegmentsContained; //# of segments contained
3562          private Path inName; //input file from where segments are created
3563          
3564          //the list of segments read from the file
3565          private ArrayList <SegmentDescriptor> segments = 
3566            new ArrayList <SegmentDescriptor>();
3567          /** This constructor is there primarily to serve the sort routine that 
3568           * generates a single output file with an associated index file */
3569          public SegmentContainer(Path inName, Path indexIn) throws IOException {
3570            //get the segments from indexIn
3571            FSDataInputStream fsIndexIn = fs.open(indexIn);
3572            long end = fs.getFileStatus(indexIn).getLen();
3573            while (fsIndexIn.getPos() < end) {
3574              long segmentOffset = WritableUtils.readVLong(fsIndexIn);
3575              long segmentLength = WritableUtils.readVLong(fsIndexIn);
3576              Path segmentName = inName;
3577              segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
3578                                                        segmentLength, segmentName, this));
3579            }
3580            fsIndexIn.close();
3581            fs.delete(indexIn, true);
3582            numSegmentsContained = segments.size();
3583            this.inName = inName;
3584          }
3585    
3586          public List <SegmentDescriptor> getSegmentList() {
3587            return segments;
3588          }
3589          public void cleanup() throws IOException {
3590            numSegmentsCleanedUp++;
3591            if (numSegmentsCleanedUp == numSegmentsContained) {
3592              fs.delete(inName, true);
3593            }
3594          }
3595        } //SequenceFile.Sorter.SegmentContainer
3596    
3597      } // SequenceFile.Sorter
3598    
3599    } // SequenceFile