001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapreduce.security; 020 021 import java.io.IOException; 022 import java.util.HashSet; 023 import java.util.List; 024 import java.util.Set; 025 026 import org.apache.commons.logging.Log; 027 import org.apache.commons.logging.LogFactory; 028 import org.apache.hadoop.classification.InterfaceAudience; 029 import org.apache.hadoop.classification.InterfaceStability; 030 import org.apache.hadoop.conf.Configuration; 031 import org.apache.hadoop.fs.FileSystem; 032 import org.apache.hadoop.fs.Path; 033 import org.apache.hadoop.io.Text; 034 import org.apache.hadoop.mapred.JobConf; 035 import org.apache.hadoop.mapred.Master; 036 import org.apache.hadoop.mapreduce.MRJobConfig; 037 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; 038 import org.apache.hadoop.security.Credentials; 039 import org.apache.hadoop.security.UserGroupInformation; 040 import org.apache.hadoop.security.token.Token; 041 import org.apache.hadoop.security.token.TokenIdentifier; 042 043 044 /** 045 * This class provides user facing APIs for transferring secrets from 046 * the job client to the tasks. 047 * The secrets can be stored just before submission of jobs and read during 048 * the task execution. 049 */ 050 @InterfaceAudience.Public 051 @InterfaceStability.Evolving 052 public class TokenCache { 053 054 private static final Log LOG = LogFactory.getLog(TokenCache.class); 055 056 057 /** 058 * auxiliary method to get user's secret keys.. 059 * @param alias 060 * @return secret key from the storage 061 */ 062 public static byte[] getSecretKey(Credentials credentials, Text alias) { 063 if(credentials == null) 064 return null; 065 return credentials.getSecretKey(alias); 066 } 067 068 /** 069 * Convenience method to obtain delegation tokens from namenodes 070 * corresponding to the paths passed. 071 * @param credentials 072 * @param ps array of paths 073 * @param conf configuration 074 * @throws IOException 075 */ 076 public static void obtainTokensForNamenodes(Credentials credentials, 077 Path[] ps, Configuration conf) throws IOException { 078 if (!UserGroupInformation.isSecurityEnabled()) { 079 return; 080 } 081 obtainTokensForNamenodesInternal(credentials, ps, conf); 082 } 083 084 /** 085 * Remove jobtoken referrals which don't make sense in the context 086 * of the task execution. 087 * 088 * @param conf 089 */ 090 public static void cleanUpTokenReferral(Configuration conf) { 091 conf.unset(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY); 092 } 093 094 static void obtainTokensForNamenodesInternal(Credentials credentials, 095 Path[] ps, Configuration conf) throws IOException { 096 Set<FileSystem> fsSet = new HashSet<FileSystem>(); 097 for(Path p: ps) { 098 fsSet.add(p.getFileSystem(conf)); 099 } 100 for (FileSystem fs : fsSet) { 101 obtainTokensForNamenodesInternal(fs, credentials, conf); 102 } 103 } 104 105 /** 106 * get delegation token for a specific FS 107 * @param fs 108 * @param credentials 109 * @param p 110 * @param conf 111 * @throws IOException 112 */ 113 @SuppressWarnings("deprecation") 114 static void obtainTokensForNamenodesInternal(FileSystem fs, 115 Credentials credentials, Configuration conf) throws IOException { 116 String delegTokenRenewer = Master.getMasterPrincipal(conf); 117 if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { 118 throw new IOException( 119 "Can't get Master Kerberos principal for use as renewer"); 120 } 121 mergeBinaryTokens(credentials, conf); 122 123 String fsName = fs.getCanonicalServiceName(); 124 if (TokenCache.getDelegationToken(credentials, fsName) == null) { 125 List<Token<?>> tokens = 126 fs.getDelegationTokens(delegTokenRenewer, credentials); 127 if (tokens != null) { 128 for (Token<?> token : tokens) { 129 credentials.addToken(token.getService(), token); 130 LOG.info("Got dt for " + fs.getUri() + ";uri="+ fsName + 131 ";t.service="+token.getService()); 132 } 133 } 134 //Call getDelegationToken as well for now - for FS implementations 135 // which may not have implmented getDelegationTokens (hftp) 136 if (tokens == null || tokens.size() == 0) { 137 Token<?> token = fs.getDelegationToken(delegTokenRenewer); 138 if (token != null) { 139 credentials.addToken(token.getService(), token); 140 LOG.info("Got dt for " + fs.getUri() + ";uri=" + fsName 141 + ";t.service=" + token.getService()); 142 } 143 } 144 } 145 } 146 147 private static void mergeBinaryTokens(Credentials creds, Configuration conf) { 148 String binaryTokenFilename = 149 conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY); 150 if (binaryTokenFilename != null) { 151 Credentials binary; 152 try { 153 binary = Credentials.readTokenStorageFile( 154 new Path("file:///" + binaryTokenFilename), conf); 155 } catch (IOException e) { 156 throw new RuntimeException(e); 157 } 158 // supplement existing tokens with the tokens in the binary file 159 creds.mergeAll(binary); 160 } 161 } 162 163 /** 164 * file name used on HDFS for generated job token 165 */ 166 @InterfaceAudience.Private 167 public static final String JOB_TOKEN_HDFS_FILE = "jobToken"; 168 169 /** 170 * conf setting for job tokens cache file name 171 */ 172 @InterfaceAudience.Private 173 public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile"; 174 private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken"); 175 176 /** 177 * 178 * @param namenode 179 * @return delegation token 180 */ 181 @InterfaceAudience.Private 182 public static Token<?> getDelegationToken( 183 Credentials credentials, String namenode) { 184 //No fs specific tokens issues by this fs. It may however issue tokens 185 // for other filesystems - which would be keyed by that filesystems name. 186 if (namenode == null) 187 return null; 188 return (Token<?>) credentials.getToken(new Text(namenode)); 189 } 190 191 /** 192 * load job token from a file 193 * @param conf 194 * @throws IOException 195 */ 196 @InterfaceAudience.Private 197 public static Credentials loadTokens(String jobTokenFile, JobConf conf) 198 throws IOException { 199 Path localJobTokenFile = new Path ("file:///" + jobTokenFile); 200 201 Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf); 202 203 if(LOG.isDebugEnabled()) { 204 LOG.debug("Task: Loaded jobTokenFile from: "+ 205 localJobTokenFile.toUri().getPath() 206 +"; num of sec keys = " + ts.numberOfSecretKeys() + 207 " Number of tokens " + ts.numberOfTokens()); 208 } 209 return ts; 210 } 211 /** 212 * store job token 213 * @param t 214 */ 215 @InterfaceAudience.Private 216 public static void setJobToken(Token<? extends TokenIdentifier> t, 217 Credentials credentials) { 218 credentials.addToken(JOB_TOKEN, t); 219 } 220 /** 221 * 222 * @return job token 223 */ 224 @SuppressWarnings("unchecked") 225 @InterfaceAudience.Private 226 public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) { 227 return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN); 228 } 229 }