/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;

/**
 * Provides a way to access information about the map/reduce cluster.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cluster {

  /** State of the JobTracker: still starting up, or fully operational. */
  @InterfaceStability.Evolving
  public static enum JobTrackerStatus {INITIALIZING, RUNNING};

  private ClientProtocolProvider clientProtocolProvider;
  private ClientProtocol client;
  private UserGroupInformation ugi;
  private Configuration conf;
  private FileSystem fs = null;
  private Path sysDir = null;
  private Path stagingAreaDir = null;
  private Path jobHistoryDir = null;
  private static final Log LOG = LogFactory.getLog(Cluster.class);

  // Discovers ClientProtocolProvider implementations via
  // META-INF/services. ServiceLoader is not thread-safe, so iteration in
  // initialize() is guarded by synchronizing on this loader.
  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);

  static {
    ConfigUtil.loadResources();
  }

  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }

  /**
   * Picks the first {@link ClientProtocolProvider} that yields a non-null
   * {@link ClientProtocol} for the given configuration (and optional
   * explicit jobtracker address). Providers that throw or return null are
   * logged and skipped.
   *
   * @param jobTrackAddr explicit jobtracker address, or null to let each
   *                     provider derive one from the configuration
   * @param conf the cluster configuration
   * @throws IOException if no provider yields a usable client protocol
   */
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null;
        try {
          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          } else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } catch (Exception e) {
          // Log the full exception (not just the message) so provider
          // failures are diagnosable from the logs; then try the next one.
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: ", e);
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the corresponding server addresses.");
    }
  }

  ClientProtocol getClient() {
    return client;
  }

  Configuration getConf() {
    return conf;
  }

  /**
   * Close the <code>Cluster</code>.
   */
  public synchronized void close() throws IOException {
    clientProtocolProvider.close(client);
  }

  /** Wraps each JobStatus in a Job handle bound to this cluster. */
  private Job[] getJobs(JobStatus[] stats) throws IOException {
    List<Job> jobs = new ArrayList<Job>();
    for (JobStatus stat : stats) {
      jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
    }
    return jobs.toArray(new Job[0]);
  }

  /**
   * Get the file system where job-specific files are stored
   *
   * @return object of FileSystem
   * @throws IOException
   * @throws InterruptedException
   */
  public synchronized FileSystem getFileSystem()
      throws IOException, InterruptedException {
    if (this.fs == null) {
      try {
        // Resolve the filesystem as the current user; cached afterwards.
        this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
          public FileSystem run() throws IOException, InterruptedException {
            final Path sysDir = new Path(client.getSystemDir());
            return sysDir.getFileSystem(getConf());
          }
        });
      } catch (InterruptedException e) {
        // Restore the interrupt status before wrapping so callers higher
        // up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
    return fs;
  }

  /**
   * Get job corresponding to jobid.
   *
   * @param jobId
   * @return object of {@link Job}, or null if the job is unknown
   * @throws IOException
   * @throws InterruptedException
   */
  public Job getJob(JobID jobId) throws IOException, InterruptedException {
    JobStatus status = client.getJobStatus(jobId);
    if (status != null) {
      return Job.getInstance(this, status, new JobConf(status.getJobFile()));
    }
    return null;
  }

  /**
   * Get all the queues in cluster.
   *
   * @return array of {@link QueueInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo[] getQueues() throws IOException, InterruptedException {
    return client.getQueues();
  }

  /**
   * Get queue information for the specified name.
   *
   * @param name queuename
   * @return object of {@link QueueInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo getQueue(String name)
      throws IOException, InterruptedException {
    return client.getQueue(name);
  }

  /**
   * Get log parameters for the specified jobID or taskAttemptID
   * @param jobID the job id.
   * @param taskAttemptID the task attempt id. Optional.
   * @return the LogParams
   * @throws IOException
   * @throws InterruptedException
   */
  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
      throws IOException, InterruptedException {
    return client.getLogFileParams(jobID, taskAttemptID);
  }

  /**
   * Get current cluster status.
   *
   * @return object of {@link ClusterMetrics}
   * @throws IOException
   * @throws InterruptedException
   */
  public ClusterMetrics getClusterStatus() throws IOException,
      InterruptedException {
    return client.getClusterMetrics();
  }

  /**
   * Get all active trackers in the cluster.
   *
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public TaskTrackerInfo[] getActiveTaskTrackers()
      throws IOException, InterruptedException {
    return client.getActiveTrackers();
  }

  /**
   * Get blacklisted trackers.
   *
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public TaskTrackerInfo[] getBlackListedTaskTrackers()
      throws IOException, InterruptedException {
    return client.getBlacklistedTrackers();
  }

  /**
   * Get all the jobs in cluster.
   *
   * @return array of {@link Job}
   * @throws IOException
   * @throws InterruptedException
   * @deprecated Use {@link #getAllJobStatuses()} instead.
   */
  @Deprecated
  public Job[] getAllJobs() throws IOException, InterruptedException {
    return getJobs(client.getAllJobs());
  }

  /**
   * Get job status for all jobs in the cluster.
   * @return job status for all jobs in cluster
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus[] getAllJobStatuses() throws IOException,
      InterruptedException {
    return client.getAllJobs();
  }

  /**
   * Grab the jobtracker system directory path where
   * job-specific files will be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() throws IOException, InterruptedException {
    if (sysDir == null) {
      sysDir = new Path(client.getSystemDir());
    }
    return sysDir;
  }

  /**
   * Grab the jobtracker's view of the staging directory path where
   * job-specific files will be placed.
   *
   * @return the staging directory where job-specific files are to be placed.
   */
  public Path getStagingAreaDir() throws IOException, InterruptedException {
    if (stagingAreaDir == null) {
      stagingAreaDir = new Path(client.getStagingAreaDir());
    }
    return stagingAreaDir;
  }

  /**
   * Get the job history file path for a given job id. The job history file at
   * this path may or may not be existing depending on the job completion state.
   * The file is present only for the completed jobs.
   * @param jobId the JobID of the job submitted by the current user.
   * @return the file path of the job history file
   * @throws IOException
   * @throws InterruptedException
   */
  public String getJobHistoryUrl(JobID jobId) throws IOException,
      InterruptedException {
    if (jobHistoryDir == null) {
      jobHistoryDir = new Path(client.getJobHistoryDir());
    }
    return new Path(jobHistoryDir, jobId.toString() + "_"
        + ugi.getShortUserName()).toString();
  }

  /**
   * Gets the Queue ACLs for current user
   * @return array of QueueAclsInfo object for current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser()
      throws IOException, InterruptedException {
    return client.getQueueAclsForCurrentUser();
  }

  /**
   * Gets the root level queues.
   * @return array of JobQueueInfo object.
   * @throws IOException
   */
  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
    return client.getRootQueues();
  }

  /**
   * Returns immediate children of queueName.
   * @param queueName
   * @return array of JobQueueInfo which are children of queueName
   * @throws IOException
   */
  public QueueInfo[] getChildQueues(String queueName)
      throws IOException, InterruptedException {
    return client.getChildQueues(queueName);
  }

  /**
   * Get the JobTracker's status.
   *
   * @return {@link JobTrackerStatus} of the JobTracker
   * @throws IOException
   * @throws InterruptedException
   */
  public JobTrackerStatus getJobTrackerStatus() throws IOException,
      InterruptedException {
    return client.getJobTrackerStatus();
  }

  /**
   * Get the tasktracker expiry interval for the cluster
   * @return the expiry interval in msec
   */
  public long getTaskTrackerExpiryInterval() throws IOException,
      InterruptedException {
    return client.getTaskTrackerExpiryInterval();
  }

  /**
   * Get a delegation token for the user from the JobTracker.
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(Text renewer) throws IOException,
      InterruptedException {
    // client has already set the service
    return client.getDelegationToken(renewer);
  }

  /**
   * Renew a delegation token
   * @param token the token to renew
   * @return the new expiration time
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  @Deprecated
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                   ) throws InvalidToken, IOException,
                                            InterruptedException {
    try {
      return client.renewDelegationToken(token);
    } catch (RemoteException re) {
      // Unwrap the server-side exception into the type callers expect.
      throw re.unwrapRemoteException(InvalidToken.class,
                                     AccessControlException.class);
    }
  }

  /**
   * Cancel a delegation token from the JobTracker
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  @Deprecated
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws IOException,
                                             InterruptedException {
    try {
      client.cancelDelegationToken(token);
    } catch (RemoteException re) {
      // Unwrap the server-side exception into the type callers expect.
      throw re.unwrapRemoteException(InvalidToken.class,
                                     AccessControlException.class);
    }
  }

}