001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.fs;
019    
020    import java.io.*;
021    import java.nio.ByteBuffer;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    
026    /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
027     * and buffers input through a {@link BufferedInputStream}. */
028    @InterfaceAudience.Public
029    @InterfaceStability.Stable
030    public class FSDataInputStream extends DataInputStream
031        implements Seekable, PositionedReadable, Closeable, ByteBufferReadable, HasFileDescriptor {
032    
033      public FSDataInputStream(InputStream in)
034        throws IOException {
035        super(in);
036        if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
037          throw new IllegalArgumentException(
038              "In is not an instance of Seekable or PositionedReadable");
039        }
040      }
041      
042      /**
043       * Seek to the given offset.
044       *
045       * @param desired offset to seek to
046       */
047      public synchronized void seek(long desired) throws IOException {
048        ((Seekable)in).seek(desired);
049      }
050    
051      /**
052       * Get the current position in the input stream.
053       *
054       * @return current position in the input stream
055       */
056      public long getPos() throws IOException {
057        return ((Seekable)in).getPos();
058      }
059      
060      /**
061       * Read bytes from the given position in the stream to the given buffer.
062       *
063       * @param position  position in the input stream to seek
064       * @param buffer    buffer into which data is read
065       * @param offset    offset into the buffer in which data is written
066       * @param length    maximum number of bytes to read
067       * @return total number of bytes read into the buffer, or <code>-1</code>
068       *         if there is no more data because the end of the stream has been
069       *         reached
070       */
071      public int read(long position, byte[] buffer, int offset, int length)
072        throws IOException {
073        return ((PositionedReadable)in).read(position, buffer, offset, length);
074      }
075    
076      /**
077       * Read bytes from the given position in the stream to the given buffer.
078       * Continues to read until <code>length</code> bytes have been read.
079       *
080       * @param position  position in the input stream to seek
081       * @param buffer    buffer into which data is read
082       * @param offset    offset into the buffer in which data is written
083       * @param length    the number of bytes to read
084       * @throws EOFException If the end of stream is reached while reading.
085       *                      If an exception is thrown an undetermined number
086       *                      of bytes in the buffer may have been written. 
087       */
088      public void readFully(long position, byte[] buffer, int offset, int length)
089        throws IOException {
090        ((PositionedReadable)in).readFully(position, buffer, offset, length);
091      }
092      
093      /**
094       * See {@link #readFully(long, byte[], int, int)}.
095       */
096      public void readFully(long position, byte[] buffer)
097        throws IOException {
098        ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
099      }
100      
101      /**
102       * Seek to the given position on an alternate copy of the data.
103       *
104       * @param  targetPos  position to seek to
105       * @return true if a new source is found, false otherwise
106       */
107      public boolean seekToNewSource(long targetPos) throws IOException {
108        return ((Seekable)in).seekToNewSource(targetPos); 
109      }
110      
111      /**
112       * Get a reference to the wrapped input stream. Used by unit tests.
113       *
114       * @return the underlying input stream
115       */
116      @InterfaceAudience.LimitedPrivate({"HDFS"})
117      public InputStream getWrappedStream() {
118        return in;
119      }
120    
121      public int read(ByteBuffer buf) throws IOException {
122        if (in instanceof ByteBufferReadable) {
123          return ((ByteBufferReadable)in).read(buf);
124        }
125    
126        throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
127      }
128    
129      @Override
130      public FileDescriptor getFileDescriptor() throws IOException {
131        if (in instanceof HasFileDescriptor) {
132          return ((HasFileDescriptor) in).getFileDescriptor();
133        } else if (in instanceof FileInputStream) {
134          return ((FileInputStream) in).getFD();
135        } else {
136          return null;
137        }
138      }
139    }