001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapreduce.lib.partition; 020 021 import java.io.UnsupportedEncodingException; 022 import java.util.List; 023 024 import org.apache.commons.logging.Log; 025 import org.apache.commons.logging.LogFactory; 026 import org.apache.hadoop.classification.InterfaceAudience; 027 import org.apache.hadoop.classification.InterfaceStability; 028 import org.apache.hadoop.conf.Configurable; 029 import org.apache.hadoop.conf.Configuration; 030 import org.apache.hadoop.mapreduce.Job; 031 import org.apache.hadoop.mapreduce.JobContext; 032 import org.apache.hadoop.mapreduce.MRJobConfig; 033 import org.apache.hadoop.mapreduce.Partitioner; 034 import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription; 035 036 /** 037 * Defines a way to partition keys based on certain key fields (also see 038 * {@link KeyFieldBasedComparator}. 039 * The key specification supported is of the form -k pos1[,pos2], where, 040 * pos is of the form f[.c][opts], where f is the number 041 * of the key field to use, and c is the number of the first character from 042 * the beginning of the field. Fields and character posns are numbered 043 * starting with 1; a character position of zero in pos2 indicates the 044 * field's last character. If '.c' is omitted from pos1, it defaults to 1 045 * (the beginning of the field); if omitted from pos2, it defaults to 0 046 * (the end of the field). 047 * 048 */ 049 @InterfaceAudience.Public 050 @InterfaceStability.Stable 051 public class KeyFieldBasedPartitioner<K2, V2> extends Partitioner<K2, V2> 052 implements Configurable { 053 054 private static final Log LOG = LogFactory.getLog( 055 KeyFieldBasedPartitioner.class.getName()); 056 public static String PARTITIONER_OPTIONS = 057 "mapreduce.partition.keypartitioner.options"; 058 private int numOfPartitionFields; 059 060 private KeyFieldHelper keyFieldHelper = new KeyFieldHelper(); 061 062 private Configuration conf; 063 064 public void setConf(Configuration conf) { 065 this.conf = conf; 066 String keyFieldSeparator = 067 conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, "\t"); 068 keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator); 069 if (conf.get("num.key.fields.for.partition") != null) { 070 LOG.warn("Using deprecated num.key.fields.for.partition. " + 071 "Use mapreduce.partition.keypartitioner.options instead"); 072 this.numOfPartitionFields = conf.getInt("num.key.fields.for.partition",0); 073 keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields); 074 } else { 075 String option = conf.get(PARTITIONER_OPTIONS); 076 keyFieldHelper.parseOption(option); 077 } 078 } 079 080 public Configuration getConf() { 081 return conf; 082 } 083 084 public int getPartition(K2 key, V2 value, int numReduceTasks) { 085 byte[] keyBytes; 086 087 List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs(); 088 if (allKeySpecs.size() == 0) { 089 return getPartition(key.toString().hashCode(), numReduceTasks); 090 } 091 092 try { 093 keyBytes = key.toString().getBytes("UTF-8"); 094 } catch (UnsupportedEncodingException e) { 095 throw new RuntimeException("The current system does not " + 096 "support UTF-8 encoding!", e); 097 } 098 // return 0 if the key is empty 099 if (keyBytes.length == 0) { 100 return 0; 101 } 102 103 int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0, 104 keyBytes.length); 105 int currentHash = 0; 106 for (KeyDescription keySpec : allKeySpecs) { 107 int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, 108 keyBytes.length, lengthIndicesFirst, keySpec); 109 // no key found! continue 110 if (startChar < 0) { 111 continue; 112 } 113 int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length, 114 lengthIndicesFirst, keySpec); 115 currentHash = hashCode(keyBytes, startChar, endChar, 116 currentHash); 117 } 118 return getPartition(currentHash, numReduceTasks); 119 } 120 121 protected int hashCode(byte[] b, int start, int end, int currentHash) { 122 for (int i = start; i <= end; i++) { 123 currentHash = 31*currentHash + b[i]; 124 } 125 return currentHash; 126 } 127 128 protected int getPartition(int hash, int numReduceTasks) { 129 return (hash & Integer.MAX_VALUE) % numReduceTasks; 130 } 131 132 /** 133 * Set the {@link KeyFieldBasedPartitioner} options used for 134 * {@link Partitioner} 135 * 136 * @param keySpec the key specification of the form -k pos1[,pos2], where, 137 * pos is of the form f[.c][opts], where f is the number 138 * of the key field to use, and c is the number of the first character from 139 * the beginning of the field. Fields and character posns are numbered 140 * starting with 1; a character position of zero in pos2 indicates the 141 * field's last character. If '.c' is omitted from pos1, it defaults to 1 142 * (the beginning of the field); if omitted from pos2, it defaults to 0 143 * (the end of the field). 144 */ 145 public void setKeyFieldPartitionerOptions(Job job, String keySpec) { 146 job.getConfiguration().set(PARTITIONER_OPTIONS, keySpec); 147 } 148 149 /** 150 * Get the {@link KeyFieldBasedPartitioner} options 151 */ 152 public String getKeyFieldPartitionerOption(JobContext job) { 153 return job.getConfiguration().get(PARTITIONER_OPTIONS); 154 } 155 156 157 }