001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.io.compress;
019    
020    import java.util.*;
021    
022    import org.apache.commons.logging.Log;
023    import org.apache.commons.logging.LogFactory;
024    import org.apache.hadoop.classification.InterfaceAudience;
025    import org.apache.hadoop.classification.InterfaceStability;
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.hadoop.fs.Path;
028    import org.apache.hadoop.util.ReflectionUtils;
029    
030    /**
031     * A factory that will find the correct codec for a given filename.
032     */
033    @InterfaceAudience.Public
034    @InterfaceStability.Evolving
035    public class CompressionCodecFactory {
036    
037      public static final Log LOG =
038        LogFactory.getLog(CompressionCodecFactory.class.getName());
039      
040      private static final ServiceLoader<CompressionCodec> CODEC_PROVIDERS =
041        ServiceLoader.load(CompressionCodec.class);
042    
043      /**
044       * A map from the reversed filename suffixes to the codecs.
045       * This is probably overkill, because the maps should be small, but it 
046       * automatically supports finding the longest matching suffix. 
047       */
048      private SortedMap<String, CompressionCodec> codecs = null;
049    
050        /**
051         * A map from the reversed filename suffixes to the codecs.
052         * This is probably overkill, because the maps should be small, but it
053         * automatically supports finding the longest matching suffix.
054         */
055        private Map<String, CompressionCodec> codecsByName = null;
056    
057      /**
058       * A map from class names to the codecs
059       */
060      private HashMap<String, CompressionCodec> codecsByClassName = null;
061    
062      private void addCodec(CompressionCodec codec) {
063        String suffix = codec.getDefaultExtension();
064        codecs.put(new StringBuilder(suffix).reverse().toString(), codec);
065        codecsByClassName.put(codec.getClass().getCanonicalName(), codec);
066    
067        String codecName = codec.getClass().getSimpleName();
068        codecsByName.put(codecName.toLowerCase(), codec);
069        if (codecName.endsWith("Codec")) {
070          codecName = codecName.substring(0, codecName.length() - "Codec".length());
071          codecsByName.put(codecName.toLowerCase(), codec);
072        }
073      }
074    
075      /**
076       * Print the extension map out as a string.
077       */
078      public String toString() {
079        StringBuilder buf = new StringBuilder();
080        Iterator<Map.Entry<String, CompressionCodec>> itr = 
081          codecs.entrySet().iterator();
082        buf.append("{ ");
083        if (itr.hasNext()) {
084          Map.Entry<String, CompressionCodec> entry = itr.next();
085          buf.append(entry.getKey());
086          buf.append(": ");
087          buf.append(entry.getValue().getClass().getName());
088          while (itr.hasNext()) {
089            entry = itr.next();
090            buf.append(", ");
091            buf.append(entry.getKey());
092            buf.append(": ");
093            buf.append(entry.getValue().getClass().getName());
094          }
095        }
096        buf.append(" }");
097        return buf.toString();
098      }
099    
100      /**
101       * Get the list of codecs discovered via a Java ServiceLoader, or
102       * listed in the configuration. Codecs specified in configuration come
103       * later in the returned list, and are considered to override those
104       * from the ServiceLoader.
105       * @param conf the configuration to look in
106       * @return a list of the {@link CompressionCodec} classes
107       */
108      public static List<Class<? extends CompressionCodec>> getCodecClasses(Configuration conf) {
109        List<Class<? extends CompressionCodec>> result
110          = new ArrayList<Class<? extends CompressionCodec>>();
111        // Add codec classes discovered via service loading
112        synchronized (CODEC_PROVIDERS) {
113          // CODEC_PROVIDERS is a lazy collection. Synchronize so it is
114          // thread-safe. See HADOOP-8406.
115          for (CompressionCodec codec : CODEC_PROVIDERS) {
116            result.add(codec.getClass());
117          }
118        }
119        // Add codec classes from configuration
120        String codecsString = conf.get("io.compression.codecs");
121        if (codecsString != null) {
122          StringTokenizer codecSplit = new StringTokenizer(codecsString, ",");
123          while (codecSplit.hasMoreElements()) {
124            String codecSubstring = codecSplit.nextToken();
125            if (codecSubstring.length() != 0) {
126              try {
127                Class<?> cls = conf.getClassByName(codecSubstring);
128                if (!CompressionCodec.class.isAssignableFrom(cls)) {
129                  throw new IllegalArgumentException("Class " + codecSubstring +
130                                                     " is not a CompressionCodec");
131                }
132                result.add(cls.asSubclass(CompressionCodec.class));
133              } catch (ClassNotFoundException ex) {
134                throw new IllegalArgumentException("Compression codec " + 
135                                                   codecSubstring + " not found.",
136                                                   ex);
137              }
138            }
139          }
140        }
141        return result;
142      }
143      
144      /**
145       * Sets a list of codec classes in the configuration. In addition to any
146       * classes specified using this method, {@link CompressionCodec} classes on
147       * the classpath are discovered using a Java ServiceLoader.
148       * @param conf the configuration to modify
149       * @param classes the list of classes to set
150       */
151      public static void setCodecClasses(Configuration conf,
152                                         List<Class> classes) {
153        StringBuilder buf = new StringBuilder();
154        Iterator<Class> itr = classes.iterator();
155        if (itr.hasNext()) {
156          Class cls = itr.next();
157          buf.append(cls.getName());
158          while(itr.hasNext()) {
159            buf.append(',');
160            buf.append(itr.next().getName());
161          }
162        }
163        conf.set("io.compression.codecs", buf.toString());   
164      }
165      
166      /**
167       * Find the codecs specified in the config value io.compression.codecs 
168       * and register them. Defaults to gzip and deflate.
169       */
170      public CompressionCodecFactory(Configuration conf) {
171        codecs = new TreeMap<String, CompressionCodec>();
172        codecsByClassName = new HashMap<String, CompressionCodec>();
173        codecsByName = new HashMap<String, CompressionCodec>();
174        List<Class<? extends CompressionCodec>> codecClasses = getCodecClasses(conf);
175        if (codecClasses == null || codecClasses.isEmpty()) {
176          addCodec(new GzipCodec());
177          addCodec(new DefaultCodec());      
178        } else {
179          for (Class<? extends CompressionCodec> codecClass : codecClasses) {
180            addCodec(ReflectionUtils.newInstance(codecClass, conf));
181          }
182        }
183      }
184      
185      /**
186       * Find the relevant compression codec for the given file based on its
187       * filename suffix.
188       * @param file the filename to check
189       * @return the codec object
190       */
191      public CompressionCodec getCodec(Path file) {
192        CompressionCodec result = null;
193        if (codecs != null) {
194          String filename = file.getName();
195          String reversedFilename = new StringBuilder(filename).reverse().toString();
196          SortedMap<String, CompressionCodec> subMap = 
197            codecs.headMap(reversedFilename);
198          if (!subMap.isEmpty()) {
199            String potentialSuffix = subMap.lastKey();
200            if (reversedFilename.startsWith(potentialSuffix)) {
201              result = codecs.get(potentialSuffix);
202            }
203          }
204        }
205        return result;
206      }
207      
208      /**
209       * Find the relevant compression codec for the codec's canonical class name.
210       * @param classname the canonical class name of the codec
211       * @return the codec object
212       */
213      public CompressionCodec getCodecByClassName(String classname) {
214        if (codecsByClassName == null) {
215          return null;
216        }
217        return codecsByClassName.get(classname);
218      }
219    
220        /**
221         * Find the relevant compression codec for the codec's canonical class name
222         * or by codec alias.
223         * <p/>
224         * Codec aliases are case insensitive.
225         * <p/>
226         * The code alias is the short class name (without the package name).
227         * If the short class name ends with 'Codec', then there are two aliases for
228         * the codec, the complete short class name and the short class name without
229         * the 'Codec' ending. For example for the 'GzipCodec' codec class name the
230         * alias are 'gzip' and 'gzipcodec'.
231         *
232         * @param codecName the canonical class name of the codec
233         * @return the codec object
234         */
235        public CompressionCodec getCodecByName(String codecName) {
236          if (codecsByClassName == null) {
237            return null;
238          }
239          CompressionCodec codec = getCodecByClassName(codecName);
240          if (codec == null) {
241            // trying to get the codec by name in case the name was specified instead a class
242            codec = codecsByName.get(codecName.toLowerCase());
243          }
244          return codec;
245        }
246    
247        /**
248         * Find the relevant compression codec for the codec's canonical class name
249         * or by codec alias and returns its implemetation class.
250         * <p/>
251         * Codec aliases are case insensitive.
252         * <p/>
253         * The code alias is the short class name (without the package name).
254         * If the short class name ends with 'Codec', then there are two aliases for
255         * the codec, the complete short class name and the short class name without
256         * the 'Codec' ending. For example for the 'GzipCodec' codec class name the
257         * alias are 'gzip' and 'gzipcodec'.
258         *
259         * @param codecName the canonical class name of the codec
260         * @return the codec class
261         */
262        public Class<? extends CompressionCodec> getCodecClassByName(String codecName) {
263          CompressionCodec codec = getCodecByName(codecName);
264          if (codec == null) {
265            return null;
266          }
267          return codec.getClass();
268        }
269    
270      /**
271       * Removes a suffix from a filename, if it has it.
272       * @param filename the filename to strip
273       * @param suffix the suffix to remove
274       * @return the shortened filename
275       */
276      public static String removeSuffix(String filename, String suffix) {
277        if (filename.endsWith(suffix)) {
278          return filename.substring(0, filename.length() - suffix.length());
279        }
280        return filename;
281      }
282      
283      /**
284       * A little test program.
285       * @param args
286       */
287      public static void main(String[] args) throws Exception {
288        Configuration conf = new Configuration();
289        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
290        boolean encode = false;
291        for(int i=0; i < args.length; ++i) {
292          if ("-in".equals(args[i])) {
293            encode = true;
294          } else if ("-out".equals(args[i])) {
295            encode = false;
296          } else {
297            CompressionCodec codec = factory.getCodec(new Path(args[i]));
298            if (codec == null) {
299              System.out.println("Codec for " + args[i] + " not found.");
300            } else { 
301              if (encode) {
302                CompressionOutputStream out = null;
303                java.io.InputStream in = null;
304                try {
305                  out = codec.createOutputStream(
306                      new java.io.FileOutputStream(args[i]));
307                  byte[] buffer = new byte[100];
308                  String inFilename = removeSuffix(args[i], 
309                      codec.getDefaultExtension());
310                  in = new java.io.FileInputStream(inFilename);
311                  int len = in.read(buffer);
312                  while (len > 0) {
313                    out.write(buffer, 0, len);
314                    len = in.read(buffer);
315                  }
316                } finally {
317                  if(out != null) { out.close(); }
318                  if(in  != null) { in.close(); }
319                }
320              } else {
321                CompressionInputStream in = null;
322                try {
323                  in = codec.createInputStream(
324                      new java.io.FileInputStream(args[i]));
325                  byte[] buffer = new byte[100];
326                  int len = in.read(buffer);
327                  while (len > 0) {
328                    System.out.write(buffer, 0, len);
329                    len = in.read(buffer);
330                  }
331                } finally {
332                  if(in != null) { in.close(); }
333                }
334              }
335            }
336          }
337        }
338      }
339    }