001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.security;
020    
021    import java.io.IOException;
022    import java.util.HashSet;
023    import java.util.List;
024    import java.util.Set;
025    
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.apache.hadoop.classification.InterfaceAudience;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.fs.FileSystem;
032    import org.apache.hadoop.fs.Path;
033    import org.apache.hadoop.io.Text;
034    import org.apache.hadoop.mapred.JobConf;
035    import org.apache.hadoop.mapred.Master;
036    import org.apache.hadoop.mapreduce.MRJobConfig;
037    import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
038    import org.apache.hadoop.security.Credentials;
039    import org.apache.hadoop.security.UserGroupInformation;
040    import org.apache.hadoop.security.token.Token;
041    import org.apache.hadoop.security.token.TokenIdentifier;
042    
043    
044    /**
045     * This class provides user facing APIs for transferring secrets from
046     * the job client to the tasks.
047     * The secrets can be stored just before submission of jobs and read during
048     * the task execution.  
049     */
050    @InterfaceAudience.Public
051    @InterfaceStability.Evolving
052    public class TokenCache {
053      
054      private static final Log LOG = LogFactory.getLog(TokenCache.class);
055    
056      
057      /**
058       * auxiliary method to get user's secret keys..
059       * @param alias
060       * @return secret key from the storage
061       */
062      public static byte[] getSecretKey(Credentials credentials, Text alias) {
063        if(credentials == null)
064          return null;
065        return credentials.getSecretKey(alias);
066      }
067      
068      /**
069       * Convenience method to obtain delegation tokens from namenodes 
070       * corresponding to the paths passed.
071       * @param credentials
072       * @param ps array of paths
073       * @param conf configuration
074       * @throws IOException
075       */
076      public static void obtainTokensForNamenodes(Credentials credentials,
077          Path[] ps, Configuration conf) throws IOException {
078        if (!UserGroupInformation.isSecurityEnabled()) {
079          return;
080        }
081        obtainTokensForNamenodesInternal(credentials, ps, conf);
082      }
083    
084      /**
085       * Remove jobtoken referrals which don't make sense in the context
086       * of the task execution.
087       *
088       * @param conf
089       */
090      public static void cleanUpTokenReferral(Configuration conf) {
091        conf.unset(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
092      }
093    
094      static void obtainTokensForNamenodesInternal(Credentials credentials,
095          Path[] ps, Configuration conf) throws IOException {
096        Set<FileSystem> fsSet = new HashSet<FileSystem>();
097        for(Path p: ps) {
098          fsSet.add(p.getFileSystem(conf));
099        }
100        for (FileSystem fs : fsSet) {
101          obtainTokensForNamenodesInternal(fs, credentials, conf);
102        }
103      }
104    
105      /**
106       * get delegation token for a specific FS
107       * @param fs
108       * @param credentials
109       * @param p
110       * @param conf
111       * @throws IOException
112       */
113      @SuppressWarnings("deprecation")
114      static void obtainTokensForNamenodesInternal(FileSystem fs, 
115          Credentials credentials, Configuration conf) throws IOException {
116        String delegTokenRenewer = Master.getMasterPrincipal(conf);
117        if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
118          throw new IOException(
119              "Can't get Master Kerberos principal for use as renewer");
120        }
121        mergeBinaryTokens(credentials, conf);
122    
123        String fsName = fs.getCanonicalServiceName();
124        if (TokenCache.getDelegationToken(credentials, fsName) == null) {
125          List<Token<?>> tokens =
126              fs.getDelegationTokens(delegTokenRenewer, credentials);
127          if (tokens != null) {
128            for (Token<?> token : tokens) {
129              credentials.addToken(token.getService(), token);
130              LOG.info("Got dt for " + fs.getUri() + ";uri="+ fsName + 
131                  ";t.service="+token.getService());
132            }
133          }
134          //Call getDelegationToken as well for now - for FS implementations
135          // which may not have implmented getDelegationTokens (hftp)
136          if (tokens == null || tokens.size() == 0) {
137            Token<?> token = fs.getDelegationToken(delegTokenRenewer);
138            if (token != null) {
139              credentials.addToken(token.getService(), token);
140              LOG.info("Got dt for " + fs.getUri() + ";uri=" + fsName
141                  + ";t.service=" + token.getService());
142            }
143          }
144        }
145      }
146    
147      private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
148        String binaryTokenFilename =
149            conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
150        if (binaryTokenFilename != null) {
151          Credentials binary;
152          try {
153            binary = Credentials.readTokenStorageFile(
154                new Path("file:///" +  binaryTokenFilename), conf);
155          } catch (IOException e) {
156            throw new RuntimeException(e);
157          }
158          // supplement existing tokens with the tokens in the binary file
159          creds.mergeAll(binary);
160        }
161      }
162      
163      /**
164       * file name used on HDFS for generated job token
165       */
166      @InterfaceAudience.Private
167      public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
168    
169      /**
170       * conf setting for job tokens cache file name
171       */
172      @InterfaceAudience.Private
173      public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
174      private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken");
175      
176      /**
177       * 
178       * @param namenode
179       * @return delegation token
180       */
181      @InterfaceAudience.Private
182      public static Token<?> getDelegationToken(
183          Credentials credentials, String namenode) {
184        //No fs specific tokens issues by this fs. It may however issue tokens
185        // for other filesystems - which would be keyed by that filesystems name.
186        if (namenode == null)  
187          return null;
188        return (Token<?>) credentials.getToken(new Text(namenode));
189      }
190    
191      /**
192       * load job token from a file
193       * @param conf
194       * @throws IOException
195       */
196      @InterfaceAudience.Private
197      public static Credentials loadTokens(String jobTokenFile, JobConf conf) 
198      throws IOException {
199        Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
200    
201        Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
202    
203        if(LOG.isDebugEnabled()) {
204          LOG.debug("Task: Loaded jobTokenFile from: "+
205              localJobTokenFile.toUri().getPath() 
206              +"; num of sec keys  = " + ts.numberOfSecretKeys() +
207              " Number of tokens " +  ts.numberOfTokens());
208        }
209        return ts;
210      }
211      /**
212       * store job token
213       * @param t
214       */
215      @InterfaceAudience.Private
216      public static void setJobToken(Token<? extends TokenIdentifier> t, 
217          Credentials credentials) {
218        credentials.addToken(JOB_TOKEN, t);
219      }
220      /**
221       * 
222       * @return job token
223       */
224      @SuppressWarnings("unchecked")
225      @InterfaceAudience.Private
226      public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
227        return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
228      }
229    }