001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.partition;
020    
021    import java.io.UnsupportedEncodingException;
022    import java.util.List;
023    
024    import org.apache.commons.logging.Log;
025    import org.apache.commons.logging.LogFactory;
026    import org.apache.hadoop.classification.InterfaceAudience;
027    import org.apache.hadoop.classification.InterfaceStability;
028    import org.apache.hadoop.conf.Configurable;
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.hadoop.mapreduce.Job;
031    import org.apache.hadoop.mapreduce.JobContext;
032    import org.apache.hadoop.mapreduce.MRJobConfig;
033    import org.apache.hadoop.mapreduce.Partitioner;
034    import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription;
035    
036     /**   
  *  Defines a way to partition keys based on certain key fields (also see
  *  {@link KeyFieldBasedComparator}).
  *  The key specification supported is of the form -k pos1[,pos2], where
  *  pos is of the form f[.c][opts], where f is the number
  *  of the key field to use, and c is the number of the first character from
  *  the beginning of the field. Fields and character positions are numbered 
  *  starting with 1; a character position of zero in pos2 indicates the
  *  field's last character. If '.c' is omitted from pos1, it defaults to 1
  *  (the beginning of the field); if omitted from pos2, it defaults to 0 
  *  (the end of the field).
047      * 
048      */
049    @InterfaceAudience.Public
050    @InterfaceStability.Stable
051    public class KeyFieldBasedPartitioner<K2, V2> extends Partitioner<K2, V2> 
052        implements Configurable {
053    
054      private static final Log LOG = LogFactory.getLog(
055                                       KeyFieldBasedPartitioner.class.getName());
056      public static String PARTITIONER_OPTIONS = 
057        "mapreduce.partition.keypartitioner.options";
058      private int numOfPartitionFields;
059      
060      private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
061      
062      private Configuration conf;
063    
064      public void setConf(Configuration conf) {
065        this.conf = conf;
066        String keyFieldSeparator = 
067          conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, "\t");
068        keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
069        if (conf.get("num.key.fields.for.partition") != null) {
070          LOG.warn("Using deprecated num.key.fields.for.partition. " +
071                    "Use mapreduce.partition.keypartitioner.options instead");
072          this.numOfPartitionFields = conf.getInt("num.key.fields.for.partition",0);
073          keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields);
074        } else {
075          String option = conf.get(PARTITIONER_OPTIONS);
076          keyFieldHelper.parseOption(option);
077        }
078      }
079    
080      public Configuration getConf() {
081        return conf;
082      }
083      
084      public int getPartition(K2 key, V2 value, int numReduceTasks) {
085        byte[] keyBytes;
086    
087        List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
088        if (allKeySpecs.size() == 0) {
089          return getPartition(key.toString().hashCode(), numReduceTasks);
090        }
091    
092        try {
093          keyBytes = key.toString().getBytes("UTF-8");
094        } catch (UnsupportedEncodingException e) {
095          throw new RuntimeException("The current system does not " +
096              "support UTF-8 encoding!", e);
097        }
098        // return 0 if the key is empty
099        if (keyBytes.length == 0) {
100          return 0;
101        }
102        
103        int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0, 
104            keyBytes.length);
105        int currentHash = 0;
106        for (KeyDescription keySpec : allKeySpecs) {
107          int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, 
108            keyBytes.length, lengthIndicesFirst, keySpec);
109           // no key found! continue
110          if (startChar < 0) {
111            continue;
112          }
113          int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length, 
114              lengthIndicesFirst, keySpec);
115          currentHash = hashCode(keyBytes, startChar, endChar, 
116              currentHash);
117        }
118        return getPartition(currentHash, numReduceTasks);
119      }
120      
121      protected int hashCode(byte[] b, int start, int end, int currentHash) {
122        for (int i = start; i <= end; i++) {
123          currentHash = 31*currentHash + b[i];
124        }
125        return currentHash;
126      }
127    
128      protected int getPartition(int hash, int numReduceTasks) {
129        return (hash & Integer.MAX_VALUE) % numReduceTasks;
130      }
131      
132      /**
133       * Set the {@link KeyFieldBasedPartitioner} options used for 
134       * {@link Partitioner}
135       * 
136       * @param keySpec the key specification of the form -k pos1[,pos2], where,
137       *  pos is of the form f[.c][opts], where f is the number
138       *  of the key field to use, and c is the number of the first character from
139       *  the beginning of the field. Fields and character posns are numbered 
140       *  starting with 1; a character position of zero in pos2 indicates the
141       *  field's last character. If '.c' is omitted from pos1, it defaults to 1
142       *  (the beginning of the field); if omitted from pos2, it defaults to 0 
143       *  (the end of the field).
144       */
145      public void setKeyFieldPartitionerOptions(Job job, String keySpec) {
146        job.getConfiguration().set(PARTITIONER_OPTIONS, keySpec);
147      }
148      
149      /**
150       * Get the {@link KeyFieldBasedPartitioner} options
151       */
152      public String getKeyFieldPartitionerOption(JobContext job) {
153        return job.getConfiguration().get(PARTITIONER_OPTIONS);
154      }
155    
156    
157    }