/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <code>JobClient</code> is the primary interface for the user-job to interact
 * with the cluster.
 *
 * <code>JobClient</code> provides facilities to submit jobs, track their
 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
 * status information etc.
 *
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setting up the requisite accounting information for the
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system directory
 *   on the distributed file-system.
 *   </li>
 *   <li>
 *   Submitting the job to the cluster and optionally monitoring
 *   its status.
 *   </li>
 * </ol></p>
 *
 * Normally the user creates the application, describes various facets of the
 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit
 * the job and monitor its progress.
 *
 * <p>Here is an example of how to use <code>JobClient</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     job.setInputPath(new Path("in"));
 *     job.setOutputPath(new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     JobClient.runJob(job);
 * </pre></blockquote></p>
 *
 * <h4 id="JobControl">Job Control</h4>
 *
 * <p>At times clients chain map-reduce jobs to accomplish complex tasks
 * which cannot be done via a single map-reduce job. This is fairly easy since
 * the output of a job typically goes to the distributed file-system, where it
 * can be used as the input for the next job.</p>
 *
 * <p>However, this also means that the onus of ensuring that jobs run to
 * completion (success/failure) lies squarely on the clients. In such
 * situations the various job-control options, one of which is sketched below,
 * are:
 * <ol>
 *   <li>
 *   {@link #runJob(JobConf)} : submits the job and returns only after
 *   the job has completed.
 *   </li>
 *   <li>
 *   {@link #submitJob(JobConf)} : only submits the job; the client then polls
 *   the returned handle to the {@link RunningJob} to query status and make
 *   scheduling decisions.
 *   </li>
 *   <li>
 *   {@link JobConf#setJobEndNotificationURI(String)} : sets up a notification
 *   on job-completion, thus avoiding polling.
 *   </li>
 * </ol></p>
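 *
 * <p>A minimal sketch of the submit-and-poll option above; the job
 * configuration and the five-second polling interval are illustrative:</p>
 * <p><blockquote><pre>
 *     JobClient client = new JobClient(job);
 *     RunningJob running = client.submitJob(job);
 *     while (!running.isComplete()) {
 *       Thread.sleep(5000);   // poll rather than block in runJob()
 *     }
 *     if (!running.isSuccessful()) {
 *       throw new IOException("Job " + running.getID() + " failed!");
 *     }
 * </pre></blockquote></p>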
 *
 * @see JobConf
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI {
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
  /* Notes that getDelegationToken was called. Again, this is a hack for Oozie
   * to make sure we add history server delegation tokens to the credentials
   * for the job. Since the API only allows one delegation token to be
   * returned, we have to resort to this flag.
   */
  private boolean getDelegationTokenCalled = false;
  /* Notes the renewer that will renew the delegation token. */
  private String dtRenewer = null;
  /* Do we need a HS delegation token for this client? */
  static final String HS_DELEGATION_TOKEN_REQUIRED
      = "mapreduce.history.server.delegationtoken.required";
  static final String HS_DELEGATION_TOKEN_RENEWER
      = "mapreduce.history.server.delegationtoken.renewer";

  static {
    ConfigUtil.loadResources();
  }

  /**
   * A NetworkedJob is an implementation of RunningJob. It holds
   * a JobProfile object to provide some info, and interacts with the
   * remote service to provide certain functionality.
   */
  static class NetworkedJob implements RunningJob {
    Job job;

    /**
     * We store a JobProfile and a timestamp for when we last
     * acquired the job profile. If the job is null, then we cannot
     * perform any of the tasks. The job might be null if the cluster
     * has completely forgotten about the job. (e.g., 24 hours after the
     * job completes.)
     */
    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
      job = Job.getInstance(cluster, status, new JobConf(status.getJobFile()));
    }

    public NetworkedJob(Job job) throws IOException {
      this.job = job;
    }

    public Configuration getConfiguration() {
      return job.getConfiguration();
    }

    /**
     * An identifier for the job.
     */
    public JobID getID() {
      return JobID.downgrade(job.getJobID());
    }

    /** @deprecated This method is deprecated and will be removed. Applications
     * should rather use {@link #getID()}. */
    @Deprecated
    public String getJobID() {
      return getID().toString();
    }

    /**
     * The user-specified job name.
     */
    public String getJobName() {
      return job.getJobName();
    }

    /**
     * The name of the job file.
     */
    public String getJobFile() {
      return job.getJobFile();
    }

    /**
     * A URL where the job's status can be seen.
     */
    public String getTrackingURL() {
      return job.getTrackingURL();
    }
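
    // The delegate methods below wrap the new API's InterruptedException in
    // an IOException, since the older RunningJob interface predates
    // interruptible cluster calls and declares only IOException.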

    /**
     * A float between 0.0 and 1.0, indicating the % of map work
     * completed.
     */
    public float mapProgress() throws IOException {
      try {
        return job.mapProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of reduce work
     * completed.
     */
    public float reduceProgress() throws IOException {
      try {
        return job.reduceProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of cleanup work
     * completed.
     */
    public float cleanupProgress() throws IOException {
      try {
        return job.cleanupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the % of setup work
     * completed.
     */
    public float setupProgress() throws IOException {
      try {
        return job.setupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Returns immediately whether the whole job is done yet or not.
     */
    public synchronized boolean isComplete() throws IOException {
      try {
        return job.isComplete();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * True iff the job completed successfully.
     */
    public synchronized boolean isSuccessful() throws IOException {
      try {
        return job.isSuccessful();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Blocks until the job is finished.
     */
    public void waitForCompletion() throws IOException {
      try {
        job.waitForCompletion(false);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      } catch (ClassNotFoundException ce) {
        throw new IOException(ce);
      }
    }

    /**
     * Tells the service to get the state of the current job.
     */
    public synchronized int getJobState() throws IOException {
      try {
        return job.getJobState().getValue();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Tells the service to terminate the current job.
     */
    public synchronized void killJob() throws IOException {
      try {
        job.killJob();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Set the priority of the job.
     * @param priority new priority of the job.
     */
    public synchronized void setJobPriority(String priority)
        throws IOException {
      try {
        job.setPriority(
            org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Kill the indicated task attempt.
     * @param taskId the id of the task to kill.
     * @param shouldFail if true the task is failed and added to the failed
     * tasks list, otherwise it is just killed, without affecting the job's
     * failure status.
     */
    public synchronized void killTask(TaskAttemptID taskId,
        boolean shouldFail) throws IOException {
      try {
        if (shouldFail) {
          job.failTask(taskId);
        } else {
          job.killTask(taskId);
        }
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /** @deprecated Applications should rather use
     * {@link #killTask(TaskAttemptID, boolean)}. */
    @Deprecated
    public synchronized void killTask(String taskId, boolean shouldFail)
        throws IOException {
      killTask(TaskAttemptID.forName(taskId), shouldFail);
    }

    /**
     * Fetch task completion events from the cluster for this job, in windows
     * of at most 10 events starting at the given offset.
     */
    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
        int startFrom) throws IOException {
      try {
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events =
            job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
        }
        return ret;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
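
    // A typical consumer keeps its own cursor and advances it by the number
    // of events returned; a sketch (the printing is illustrative):
    //
    //   int from = 0;
    //   TaskCompletionEvent[] events;
    //   while ((events = running.getTaskCompletionEvents(from)).length > 0) {
    //     for (TaskCompletionEvent event : events) {
    //       System.out.println(event);
    //     }
    //     from += events.length;
    //   }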

    /**
     * Dump stats to screen.
     */
    @Override
    public String toString() {
      return job.toString();
    }

    /**
     * Returns the counters for this job.
     */
    public Counters getCounters() throws IOException {
      try {
        Counters result = null;
        org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
        if (temp != null) {
          result = Counters.downgrade(temp);
        }
        return result;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
      try {
        return job.getTaskDiagnostics(id);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public String getHistoryUrl() throws IOException {
      try {
        return job.getHistoryUrl();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public boolean isRetired() throws IOException {
      try {
        return job.isRetired();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    boolean monitorAndPrintJob() throws IOException, InterruptedException {
      return job.monitorAndPrintJob();
    }

    @Override
    public String getFailureInfo() throws IOException {
      try {
        return job.getStatus().getFailureInfo();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public JobStatus getJobStatus() throws IOException {
      try {
        return JobStatus.downgrade(job.getStatus());
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
  }

  /**
   * UGI of the client. We store this UGI when the client is created and
   * then make sure that the same UGI is used to run the various protocols.
   */
  UserGroupInformation clientUgi;

  /**
   * Create a job client.
   */
  public JobClient() {
  }

  /**
   * Build a job client with the given {@link JobConf}, and connect to the
   * default cluster.
   *
   * @param conf the job configuration.
   * @throws IOException
   */
  public JobClient(JobConf conf) throws IOException {
    init(conf);
  }

  /**
   * Build a job client with the given {@link Configuration},
   * and connect to the default cluster.
   *
   * @param conf the configuration.
   * @throws IOException
   */
  public JobClient(Configuration conf) throws IOException {
    init(new JobConf(conf));
  }

  /**
   * Connect to the default cluster.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    setConf(conf);
    cluster = new Cluster(conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  @InterfaceAudience.Private
  public static class Renewer extends TokenRenewer {

    @Override
    public boolean handleKind(Text kind) {
      return DelegationTokenIdentifier.MAPREDUCE_DELEGATION_KIND.equals(kind);
    }

    @SuppressWarnings("unchecked")
    @Override
    public long renew(Token<?> token, Configuration conf
                      ) throws IOException, InterruptedException {
      return new Cluster(conf).
          renewDelegationToken((Token<DelegationTokenIdentifier>) token);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void cancel(Token<?> token, Configuration conf
                       ) throws IOException, InterruptedException {
      new Cluster(conf).
          cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
    }

    @Override
    public boolean isManaged(Token<?> token) throws IOException {
      return true;
    }
  }
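
  // Renewer is not invoked directly: Token.renew() and Token.cancel() locate
  // a renewer through Java's ServiceLoader by matching handleKind() against
  // the token's kind, which relies on this class being listed in
  // META-INF/services/org.apache.hadoop.security.token.TokenRenewer.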

  /**
   * Build a job client, connect to the indicated job tracker.
   *
   * @param jobTrackAddr the job tracker to connect to.
   * @param conf configuration.
   */
  public JobClient(InetSocketAddress jobTrackAddr,
                   Configuration conf) throws IOException {
    cluster = new Cluster(jobTrackAddr, conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Close the <code>JobClient</code>.
   */
  public synchronized void close() throws IOException {
    cluster.close();
  }

  /**
   * Get a filesystem handle. We need this to prepare jobs
   * for submission to the MapReduce system.
   *
   * @return the filesystem handle.
   */
  public synchronized FileSystem getFs() throws IOException {
    try {
      return cluster.getFileSystem();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a handle to the Cluster.
   */
  public Cluster getClusterHandle() {
    return cluster;
  }

  /**
   * Submit a job to the MR system.
   *
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
   * @param jobFile the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws InvalidJobConfException
   * @throws IOException
   */
  public RunningJob submitJob(String jobFile) throws FileNotFoundException,
                                                     InvalidJobConfException,
                                                     IOException {
    // Load in the submitted job details
    JobConf job = new JobConf(jobFile);
    return submitJob(job);
  }

  /**
   * Submit a job to the MR system.
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws IOException
   */
  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                         IOException {
    try {
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      if (getDelegationTokenCalled) {
        conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
        getDelegationTokenCalled = false;
        conf.set(HS_DELEGATION_TOKEN_RENEWER, dtRenewer);
        dtRenewer = null;
      }
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
        @Override
        public Job run() throws IOException, ClassNotFoundException,
            InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });
      // Update our Cluster instance with the one created by Job for submission
      // (we can't pass our Cluster instance to Job, since Job wraps the config
      // instance, and the two configs would then diverge).
      cluster = job.getCluster();
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }

  private Job getJobUsingCluster(final JobID jobid) throws IOException,
      InterruptedException {
    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
      public Job run() throws IOException, InterruptedException {
        return cluster.getJob(jobid);
      }
    });
  }

  /**
   * Get a {@link RunningJob} object to track an ongoing job. Returns
   * null if the id does not correspond to any known job.
   *
   * @param jobid the jobid of the job.
   * @return the {@link RunningJob} handle to track the job, null if the
   *         <code>jobid</code> doesn't correspond to any known job.
   * @throws IOException
   */
  public RunningJob getJob(final JobID jobid) throws IOException {
    try {
      Job job = getJobUsingCluster(jobid);
      if (job != null) {
        JobStatus status = JobStatus.downgrade(job.getStatus());
        if (status != null) {
          return new NetworkedJob(status, cluster);
        }
      }
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    return null;
  }

  /** @deprecated Applications should rather use {@link #getJob(JobID)}. */
  @Deprecated
  public RunningJob getJob(String jobid) throws IOException {
    return getJob(JobID.forName(jobid));
  }
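
  // Sketch of re-attaching to a previously submitted job by id; the id
  // string is illustrative:
  //
  //   RunningJob running =
  //       client.getJob(JobID.forName("job_201201010000_0001"));
  //   if (running == null) {
  //     // the cluster no longer knows about the job (e.g. it has retired)
  //   }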

  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];

  /**
   * Get the information of the current state of the map tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the map tips.
   * @throws IOException
   */
  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.MAP);
  }

  private TaskReport[] getTaskReports(final JobID jobId, TaskType type)
      throws IOException {
    try {
      Job j = getJobUsingCluster(jobId);
      if (j == null) {
        return EMPTY_TASK_REPORTS;
      }
      return TaskReport.downgradeArray(j.getTaskReports(type));
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /** @deprecated Applications should rather use
   * {@link #getMapTaskReports(JobID)}. */
  @Deprecated
  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
    return getMapTaskReports(JobID.forName(jobId));
  }

  /**
   * Get the information of the current state of the reduce tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the reduce tips.
   * @throws IOException
   */
  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.REDUCE);
  }

  /**
   * Get the information of the current state of the cleanup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the cleanup tips.
   * @throws IOException
   */
  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
  }

  /**
   * Get the information of the current state of the setup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the setup tips.
   * @throws IOException
   */
  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_SETUP);
  }

  /** @deprecated Applications should rather use
   * {@link #getReduceTaskReports(JobID)}. */
  @Deprecated
  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
    return getReduceTaskReports(JobID.forName(jobId));
  }

  /**
   * Display the information about a job's tasks of a particular type and
   * in a particular state.
   *
   * @param jobId the ID of the job
   * @param type the type of the task (map/reduce/setup/cleanup)
   * @param state the state of the task
   * (pending/running/completed/failed/killed)
   */
  public void displayTasks(final JobID jobId, String type, String state)
      throws IOException {
    try {
      Job job = getJobUsingCluster(jobId);
      super.displayTasks(job, type, state);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
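
  // Sketch of printing per-task progress from the reports above; the job id
  // is illustrative:
  //
  //   for (TaskReport report : client.getMapTaskReports(
  //       JobID.forName("job_201201010000_0001"))) {
  //     System.out.println(report.getTaskID() + " "
  //         + (report.getProgress() * 100) + "%");
  //   }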

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(metrics.getTaskTrackerCount(),
              metrics.getBlackListedTaskTrackerCount(),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus(),
              metrics.getDecommissionedTaskTrackerCount());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
    Collection<String> list = new ArrayList<String>();
    for (TaskTrackerInfo info : objs) {
      list.add(info.getTaskTrackerName());
    }
    return list;
  }

  private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
    for (TaskTrackerInfo info : objs) {
      BlackListInfo binfo = new BlackListInfo();
      binfo.setTrackerName(info.getTaskTrackerName());
      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
      binfo.setBlackListReport(info.getBlacklistReport());
      list.add(binfo);
    }
    return list;
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @param detailed if true then get a detailed status including the
   *                 tracker names
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(
              arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get the jobs that are not completed and not failed.
   *
   * @return array of {@link JobStatus} for the running/to-be-run jobs.
   * @throws IOException
   */
  public JobStatus[] jobsToComplete() throws IOException {
    List<JobStatus> stats = new ArrayList<JobStatus>();
    for (JobStatus stat : getAllJobs()) {
      if (!stat.isJobComplete()) {
        stats.add(stat);
      }
    }
    return stats.toArray(new JobStatus[0]);
  }
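
  // Sketch of listing the jobs that are still in flight:
  //
  //   for (JobStatus status : client.jobsToComplete()) {
  //     System.out.println(status.getJobID() + " is at "
  //         + (status.mapProgress() * 100) + "% map progress");
  //   }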

  /**
   * Get the jobs that are submitted.
   *
   * @return array of {@link JobStatus} for the submitted jobs.
   * @throws IOException
   */
  public JobStatus[] getAllJobs() throws IOException {
    try {
      org.apache.hadoop.mapreduce.JobStatus[] jobs =
          clientUgi.doAs(new PrivilegedExceptionAction<
              org.apache.hadoop.mapreduce.JobStatus[]>() {
            public org.apache.hadoop.mapreduce.JobStatus[] run()
                throws IOException, InterruptedException {
              return cluster.getAllJobStatuses();
            }
          });
      JobStatus[] stats = new JobStatus[jobs.length];
      for (int i = 0; i < jobs.length; i++) {
        stats[i] = JobStatus.downgrade(jobs[i]);
      }
      return stats;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Utility that submits a job, then polls for progress until the job is
   * complete.
   *
   * @param job the job configuration.
   * @throws IOException if the job fails
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @param conf the job's configuration
   * @param job the job to track
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob(JobConf conf,
                                    RunningJob job
                                    ) throws IOException, InterruptedException {
    return ((NetworkedJob) job).monitorAndPrintJob();
  }

  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }

  static Configuration getConfiguration(String jobTrackerSpec) {
    Configuration conf = new Configuration();
    if (jobTrackerSpec != null) {
      if (jobTrackerSpec.indexOf(":") >= 0) {
        conf.set("mapred.job.tracker", jobTrackerSpec);
      } else {
        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
        URL validate = conf.getResource(classpathFile);
        if (validate == null) {
          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
        }
        conf.addResource(classpathFile);
      }
    }
    return conf;
  }

  /**
   * Sets the output filter for tasks. Only those tasks whose output matches
   * the filter are printed.
   * @param newValue task filter.
   */
  @Deprecated
  public void setTaskOutputFilter(TaskStatusFilter newValue) {
    this.taskOutputFilter = newValue;
  }

  /**
   * Get the task output filter out of the JobConf.
   *
   * @param job the JobConf to examine.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter",
        "FAILED"));
  }

  /**
   * Modify the JobConf to set the task output filter.
   *
   * @param job the JobConf to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(JobConf job,
      TaskStatusFilter newValue) {
    job.set("jobclient.output.filter", newValue.toString());
  }
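
  // For example, to echo the output of every task to the client console
  // instead of only that of failed tasks (sketch; MyJob is illustrative):
  //
  //   JobConf job = new JobConf(MyJob.class);
  //   JobClient.setTaskOutputFilter(job, TaskStatusFilter.ALL);
  //   JobClient.runJob(job);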

  /**
   * Returns the task output filter.
   * @return task filter.
   */
  @Deprecated
  public TaskStatusFilter getTaskOutputFilter() {
    return this.taskOutputFilter;
  }

  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
      String counterGroupName, String counterName) throws IOException {
    Counters counters = Counters.downgrade(cntrs);
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * Get status information about the maximum available map slots in the
   * cluster.
   *
   * @return the maximum available map slots in the cluster
   * @throws IOException
   */
  public int getDefaultMaps() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getMapSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the maximum available reduce slots in the
   * cluster.
   *
   * @return the maximum available reduce slots in the cluster
   * @throws IOException
   */
  public int getDefaultReduces() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getReduceSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Grab the jobtracker system directory path where job-specific files are
   * to be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getSystemDir();
        }
      });
    } catch (IOException ioe) {
      return null;
    } catch (InterruptedException ie) {
      return null;
    }
  }

  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
    JobQueueInfo ret = new JobQueueInfo(queue);
    // make sure to convert any children
    if (queue.getQueueChildren().size() > 0) {
      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
          .getQueueChildren().size());
      for (QueueInfo child : queue.getQueueChildren()) {
        childQueues.add(getJobQueueInfo(child));
      }
      ret.setChildren(childQueues);
    }
    return ret;
  }

  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
      throws IOException {
    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
    for (int i = 0; i < queues.length; i++) {
      ret[i] = getJobQueueInfo(queues[i]);
    }
    return ret;
  }

  /**
   * Returns an array of queue information objects about the root-level queues
   * configured.
   *
   * @return the array of root-level JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getRootQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getRootQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
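
  // Combined with getChildQueues(String) below, this can be used to walk the
  // whole queue hierarchy; a sketch (the indentation scheme is illustrative):
  //
  //   void printQueues(JobClient client, JobQueueInfo[] queues, String indent)
  //       throws IOException {
  //     for (JobQueueInfo queue : queues) {
  //       System.out.println(indent + queue.getQueueName());
  //       printQueues(client, client.getChildQueues(queue.getQueueName()),
  //           indent + "  ");
  //     }
  //   }
  //   // printQueues(client, client.getRootQueues(), "");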

  /**
   * Returns an array of queue information objects about the immediate children
   * of queue queueName.
   *
   * @param queueName the name of the parent queue.
   * @return the array of immediate children JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getChildQueues(final String queueName)
      throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Return an array of queue information objects about all the Job Queues
   * configured.
   *
   * @return Array of JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets all the jobs which were added to a particular Job Queue.
   *
   * @param queueName name of the Job Queue
   * @return Array of jobs present in the job queue
   * @throws IOException
   */
  public JobStatus[] getJobsFromQueue(final String queueName)
      throws IOException {
    try {
      QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
        @Override
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queue == null) {
        return null;
      }
      org.apache.hadoop.mapreduce.JobStatus[] stats =
          queue.getJobStatuses();
      JobStatus[] ret = new JobStatus[stats.length];
      for (int i = 0; i < stats.length; i++) {
        ret[i] = JobStatus.downgrade(stats[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the queue information associated with a particular Job Queue.
   *
   * @param queueName name of the job queue.
   * @return Queue information associated with the particular queue.
   * @throws IOException
   */
  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
    try {
      QueueInfo queueInfo = clientUgi.doAs(new
          PrivilegedExceptionAction<QueueInfo>() {
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queueInfo != null) {
        return new JobQueueInfo(queueInfo);
      }
      return null;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
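
  // Sketch of inspecting a single queue and its jobs; the queue name is
  // illustrative:
  //
  //   JobQueueInfo info = client.getQueueInfo("default");
  //   if (info != null) {
  //     for (JobStatus status : client.getJobsFromQueue("default")) {
  //       System.out.println(status.getJobID());
  //     }
  //   }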

  /**
   * Gets the Queue ACLs for the current user.
   * @return array of QueueAclsInfo objects for the current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
    try {
      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls =
          clientUgi.doAs(new PrivilegedExceptionAction
              <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
            public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
                throws IOException, InterruptedException {
              return cluster.getQueueAclsForCurrentUser();
            }
          });
      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
      for (int i = 0; i < acls.length; i++) {
        ret[i] = QueueAclsInfo.downgrade(acls[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a delegation token for the user from the JobTracker.
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(final Text renewer) throws IOException,
          InterruptedException {
    getDelegationTokenCalled = true;
    dtRenewer = renewer.toString();
    return clientUgi.doAs(new
        PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
      public Token<DelegationTokenIdentifier> run() throws IOException,
          InterruptedException {
        return cluster.getDelegationToken(renewer);
      }
    });
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time of the token
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                   ) throws InvalidToken, IOException,
                                            InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws InvalidToken, IOException,
                                             InterruptedException {
    token.cancel(getConf());
  }

  /**
   * Run the <code>JobClient</code> as a command-line tool.
   */
  public static void main(String argv[]) throws Exception {
    int res = ToolRunner.run(new JobClient(), argv);
    System.exit(res);
  }
}