/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;

/**
 * The job submitter's view of the Job.
 *
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted, afterwards they will throw an
 * IllegalStateException.</p>
 *
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, then submits the job and monitors its progress.</p>
 *
 * <p>Here is an example on how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance(new Configuration());
 *     job.setJarByClass(MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public static enum JobState {DEFINE, RUNNING};
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
    "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
    "mapreduce.client.submit.file.replication";
  private static final String TASKLOG_PULL_TIMEOUT_KEY =
    "mapreduce.client.tasklog.timeout";
  private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;

  @InterfaceStability.Evolving
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;

  @Deprecated
  public Job() throws IOException {
    this(new Configuration());
  }

  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * @param conf the configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

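  // Illustrative sketch (not part of the original source): because
  // getInstance(Configuration) copies the incoming Configuration, edits made
  // to the caller's conf after the call never reach the job, and the job's
  // internal modifications never leak back. "example.key" is a hypothetical
  // property name.
  //
  //   Configuration conf = new Configuration();
  //   Job job = Job.getInstance(conf);
  //   conf.set("example.key", "set-after-getInstance");
  //   // job.getConfiguration().get("example.key") is still null: the job
  //   // works on its own copy of the configuration.
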
  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @param jobName the job name to set
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
      throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
      throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored a {@link Cluster} parameter that is not used
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored a {@link Cluster} parameter that is not used
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job}, with no connection to a cluster yet.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state "+ this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object.  Refresh
   * it, if necessary.
   */
  synchronized void ensureFreshStatus()
      throws IOException, InterruptedException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

  /**
   * Some methods need an up-to-date status immediately, so refresh it
   * unconditionally.
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException, InterruptedException {
    this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      @Override
      public JobStatus run() throws IOException, InterruptedException {
        return cluster.getClient().getJobStatus(status.getJobID());
      }
    });
    if (this.status == null) {
      throw new IOException("Job status not available ");
    }
    this.statustime = System.currentTimeMillis();
  }

  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  private void setStatus(JobStatus status) {
    this.status = status;
  }

  /**
   * Returns the current state of the Job.
   *
   * @return the current {@link JobStatus.State} of the job
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   *
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

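  // Illustrative sketch (not part of the original source): the ensureState()
  // checks above mean most query methods are only legal once the job has been
  // submitted, while the setters below are only legal before.
  //
  //   Job job = Job.getInstance(new Configuration());
  //   // job.getTrackingURL();        // would throw IllegalStateException (DEFINE)
  //   job.submit();                   // DEFINE -> RUNNING
  //   String url = job.getTrackingURL();   // now legal
  //   // job.setJobName("too-late");  // would throw IllegalStateException (RUNNING)
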
  /**
   * Get start time of the job.
   *
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   *
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   *
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   *
   * @return the priority of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }

  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonforFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonforFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
      // ignore and report with the last known status
    } catch (InterruptedException ie) {
      // ignore and report with the last known status
    }
    StringBuffer sb = new StringBuffer();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonforFailure);
    return sb.toString();
  }

  /**
   * @return the task id which caused the job to fail
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 1;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
              InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "Job is failed due to some other reason and reason "
          + "can be found in the logs.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
        failCount + " times " + "For details check tasktracker at: " +
        lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   *
   * @param type Type of the task
   * @return the reports for the tasks of the given type
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

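  // Illustrative sketch (not part of the original source): getTaskReports()
  // gives a client-side snapshot per task type, e.g. to count tasks the same
  // way toString() does above.
  //
  //   int numMaps = job.getTaskReports(TaskType.MAP).length;
  //   int numReduces = job.getTaskReports(TaskType.REDUCE).length;
  //   System.out.println(numMaps + " map tasks, " + numReduces + " reduce tasks");
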
  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job.  Blocks until all job tasks have been
   * killed as well.  If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    cluster.getClient().killJob(getJobID());
  }

  /**
   * Set the priority of a running job.
   * @param priority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority priority)
      throws IOException, InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriority(
          org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
    } else {
      ensureState(JobState.RUNNING);
      final JobPriority tmpPriority = priority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
          return null;
        }
      });
    }
  }

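  // Illustrative sketch (not part of the original source): the progress
  // getters combine naturally into a lightweight poll loop; mapProgress()
  // and reduceProgress() return floats in [0.0, 1.0].
  //
  //   while (!job.isComplete()) {
  //     System.out.printf("map %.0f%% reduce %.0f%%%n",
  //         job.mapProgress() * 100, job.reduceProgress() * 100);
  //     Thread.sleep(1000);
  //   }
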
  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents);
      }
    });
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task to be killed.
   * @throws IOException
   */
  public boolean killTask(final TaskAttemptID taskId)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
      public Boolean run() throws IOException, InterruptedException {
        return cluster.getClient().killTask(taskId, false);
      }
    });
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task to be failed.
   * @throws IOException
   */
  public boolean failTask(final TaskAttemptID taskId)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
      @Override
      public Boolean run() throws IOException, InterruptedException {
        return cluster.getClient().killTask(taskId, true);
      }
    });
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   *
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters()
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
      @Override
      public Counters run() throws IOException, InterruptedException {
        return cluster.getClient().getJobCounters(getJobID());
      }
    });
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid the id of the task attempt
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid)
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

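  // Illustrative sketch (not part of the original source): completion events
  // are fetched in pages, so a client drains them by advancing the start
  // index by the size of each batch (monitorAndPrintJob() below uses the
  // same pattern with a page size of 10).
  //
  //   int from = 0;
  //   TaskCompletionEvent[] batch;
  //   do {
  //     batch = job.getTaskCompletionEvents(from, 10);
  //     from += batch.length;
  //     for (TaskCompletionEvent e : batch) {
  //       System.out.println(e);
  //     }
  //   } while (batch.length > 0);
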
  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
        InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
        OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar.
   * @param jar the path to the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
        Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
      ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

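  // Illustrative sketch (not part of the original source): a custom sort
  // comparator is usually written by extending WritableComparator. This
  // hypothetical DescendingLongComparator reverses the natural LongWritable
  // order so reducers see keys from largest to smallest.
  //
  //   public static class DescendingLongComparator extends WritableComparator {
  //     protected DescendingLongComparator() {
  //       super(LongWritable.class, true);   // true: instantiate keys for compare()
  //     }
  //     @Override
  //     @SuppressWarnings("rawtypes")
  //     public int compare(WritableComparable a, WritableComparable b) {
  //       return -super.compare(a, b);       // flip the natural order
  //     }
  //   }
  //
  //   job.setSortComparatorClass(DescendingLongComparator.class);
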
  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup is needed for the job.
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter},
   *               else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives.
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files.
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
   * method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to the cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
      throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }

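  // Illustrative sketch (not part of the original source): typical
  // distributed-cache usage during job setup. The paths are hypothetical;
  // the "#lookup" URI fragment names the symlink created in each task's
  // working directory.
  //
  //   job.addCacheFile(new URI("/user/me/lookup.txt#lookup"));
  //   job.addFileToClassPath(new Path("/user/me/lib/helper.jar"));   // stays packed
  //   job.addArchiveToClassPath(new Path("/user/me/deps.zip"));      // unpacked
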
  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
   * @param isMap true to set the range for map tasks, false for reduce tasks
   * @param newValue a set of integer ranges of the task ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

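  // Illustrative sketch (not part of the original source): enabling hprof
  // profiling for a few tasks. The parameter string shown is the conventional
  // hprof setting used with Hadoop (an assumption, adjust to taste); '%s'
  // becomes the per-task profile file name.
  //
  //   job.setProfileEnabled(true);
  //   job.setProfileParams("-agentlib:hprof=cpu=samples,heap=sites,force=n,"
  //       + "thread=y,verbose=n,file=%s");
  //   job.setProfileTaskRange(true, "0-2");   // profile map tasks 0 through 2
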
  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reduce attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  private synchronized void connect()
      throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
          public Cluster run()
              throws IOException, InterruptedException,
                     ClassNotFoundException {
            return new Cluster(getConfiguration());
          }
        });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  public JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit()
      throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
          ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
      ) throws IOException, InterruptedException,
               ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
          Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // keep polling; the job may still be running
        }
      }
    }
    return isSuccessful();
  }

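  // Illustrative sketch (not part of the original source): the conventional
  // driver main() built on waitForCompletion(). MyDriver is a hypothetical
  // class name.
  //
  //   public static void main(String[] args) throws Exception {
  //     Configuration conf = new Configuration();
  //     Job job = Job.getInstance(conf, "my-job");
  //     job.setJarByClass(MyDriver.class);
  //     // ... set mapper, reducer, input/output paths ...
  //     System.exit(job.waitForCompletion(true) ? 0 : 1);
  //   }
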
  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob()
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis =
        Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }
      String report =
          (" map " + StringUtils.formatPercent(mapProgress(), 0) +
           " reduce " +
           StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events =
          getTaskCompletionEvents(eventCounter, 10);
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() +
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

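  // Illustrative sketch (not part of the original source): both poll loops
  // are tunable through client-side configuration, using the keys defined at
  // the top of this class.
  //
  //   Configuration conf = new Configuration();
  //   conf.setInt(Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY, 2000);  // report every 2s
  //   conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 10000);       // check every 10s
  //   Job.setTaskOutputFilter(conf, Job.TaskStatusFilter.ALL);    // print all events
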
  /**
   * @return true if the profile parameters indicate that this is using
   * hprof, which generates profile files in a particular location
   * that we can retrieve to the client.
   */
  private boolean shouldDownloadProfile() {
    // Check the argument string that was used to initialize profiling.
    // If this indicates hprof and file-based output, then we're ok to
    // download.
    String profileParams = getProfileParams();

    if (null == profileParams) {
      return false;
    }

    // Split this on whitespace.
    String[] parts = profileParams.split("[ \\t]+");

    // If any of these indicate hprof, and the use of output files, return true.
    boolean hprofFound = false;
    boolean fileFound = false;
    for (String p : parts) {
      if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
        hprofFound = true;

        // This contains a number of comma-delimited components, one of which
        // may specify the file to write to. Make sure this is present and
        // not empty.
        String[] subparts = p.split(",");
        for (String sub : subparts) {
          if (sub.startsWith("file=") && sub.length() != "file=".length()) {
            fileFound = true;
          }
        }
      }
    }

    return hprofFound && fileFound;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break;
      case FAILED:
        if (event.getStatus() ==
            TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId);
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break;
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED) {
          LOG.info(event.toString());
        }
        break;
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
        PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
          " has been set to an invalid value; "
          + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
        COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
          " has been set to an invalid value; "
          + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   *
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   *
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf,
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

}