001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce;
020    
021    import java.io.IOException;
022    import java.net.URI;
023    import java.security.PrivilegedExceptionAction;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.classification.InterfaceAudience.Private;
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.conf.Configuration.IntegerRanges;
032    import org.apache.hadoop.fs.FileSystem;
033    import org.apache.hadoop.fs.Path;
034    import org.apache.hadoop.io.RawComparator;
035    import org.apache.hadoop.mapred.JobConf;
036    import org.apache.hadoop.mapreduce.filecache.DistributedCache;
037    import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
038    import org.apache.hadoop.mapreduce.task.JobContextImpl;
039    import org.apache.hadoop.mapreduce.util.ConfigUtil;
040    import org.apache.hadoop.util.StringUtils;
041    
042    /**
043     * The job submitter's view of the Job.
044     * 
045     * <p>It allows the user to configure the
046     * job, submit it, control its execution, and query its state. The set methods
047     * only work until the job is submitted; afterwards they will throw an
048     * {@link IllegalStateException}.</p>
049     * 
050     * <p>
051     * Normally the user creates the application, describes various facets of the
052     * job via {@link Job}, then submits the job and monitors its progress.</p>
053     * 
054     * <p>Here is an example on how to submit a job:</p>
055     * <p><blockquote><pre>
056     *     // Create a new Job
057     *     Job job = Job.getInstance(new Configuration());
058     *     job.setJarByClass(MyJob.class);
059     *
060     *     // Specify various job-specific parameters
061     *     job.setJobName("myjob");
062     *
063     *     FileInputFormat.setInputPaths(job, new Path("in"));
064     *     FileOutputFormat.setOutputPath(job, new Path("out"));
065     *
066     *     job.setMapperClass(MyJob.MyMapper.class);
067     *     job.setReducerClass(MyJob.MyReducer.class);
068     *
069     *     // Submit the job, then poll for progress until the job is complete
070     *     job.waitForCompletion(true);
071     * </pre></blockquote></p>
072     */
075    @InterfaceAudience.Public
076    @InterfaceStability.Evolving
077    public class Job extends JobContextImpl implements JobContext {  
078      private static final Log LOG = LogFactory.getLog(Job.class);
079    
080      @InterfaceStability.Evolving
081      public static enum JobState {DEFINE, RUNNING}
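       /** Maximum age (ms) of a cached job status before it must be refreshed. */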
082      private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
083      public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
084      /** Key in mapred-*.xml that sets completionPollIntervalMillis */
085      public static final String COMPLETION_POLL_INTERVAL_KEY = 
086        "mapreduce.client.completion.pollinterval";
087      
088      /** Default completionPollIntervalMillis is 5000 ms. */
089      static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
090      /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
091      public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
092        "mapreduce.client.progressmonitor.pollinterval";
093      /** Default progMonitorPollIntervalMillis is 1000 ms. */
094      static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
095    
096      public static final String USED_GENERIC_PARSER = 
097        "mapreduce.client.genericoptionsparser.used";
098      public static final String SUBMIT_REPLICATION = 
099        "mapreduce.client.submit.file.replication";
100      private static final String TASKLOG_PULL_TIMEOUT_KEY =
101               "mapreduce.client.tasklog.timeout";
102      private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
103    
104      @InterfaceStability.Evolving
105      public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
106    
107      static {
108        ConfigUtil.loadResources();
109      }
110    
111      private JobState state = JobState.DEFINE;
112      private JobStatus status;
113      private long statustime;
114      private Cluster cluster;
115    
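         /**
          * @deprecated Use {@link #getInstance()} instead.
          */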
116      @Deprecated
117      public Job() throws IOException {
118        this(new Configuration());
119      }
120    
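         /**
          * @deprecated Use {@link #getInstance(Configuration)} instead.
          */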
121      @Deprecated
122      public Job(Configuration conf) throws IOException {
123        this(new JobConf(conf));
124      }
125    
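         /**
          * @deprecated Use {@link #getInstance(Configuration, String)} instead.
          */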
126      @Deprecated
127      public Job(Configuration conf, String jobName) throws IOException {
128        this(conf);
129        setJobName(jobName);
130      }
131    
132      Job(JobConf conf) throws IOException {
133        super(conf, null);
134        this.cluster = null;
135      }
136    
137      Job(JobStatus status, JobConf conf) throws IOException {
138        this(conf);
139        setJobID(status.getJobID());
140        this.status = status;
141        state = JobState.RUNNING;
142      }
143    
144          
145      /**
146       * Creates a new {@link Job} with no particular {@link Cluster}.
147       * A Cluster will be created with a generic {@link Configuration}.
148       * 
149       * @return the {@link Job}, with no connection to a cluster yet.
150       * @throws IOException
151       */
152      public static Job getInstance() throws IOException {
153        // create with a null Cluster
154        return getInstance(new Configuration());
155      }
156          
157      /**
158       * Creates a new {@link Job} with no particular {@link Cluster} and a 
159       * given {@link Configuration}.
160       * 
161       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
162       * that any necessary internal modifications do not reflect on the incoming 
163       * parameter.
164       * 
165       * A Cluster will be created from the conf parameter only when it's needed.
166       * 
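          * <p>Typical use, as a brief sketch:</p>
          * <pre>
          *   Job job = Job.getInstance(new Configuration());
          *   job.setJarByClass(MyDriver.class);   // MyDriver is illustrative
          * </pre>
          * 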
167       * @param conf the configuration
168       * @return the {@link Job}, with no connection to a cluster yet.
169       * @throws IOException
170       */
171      public static Job getInstance(Configuration conf) throws IOException {
172        // create with a null Cluster
173        JobConf jobConf = new JobConf(conf);
174        return new Job(jobConf);
175      }
176    
177          
178      /**
179       * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
180       * A Cluster will be created from the conf parameter only when it's needed.
181       *
182       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
183       * that any necessary internal modifications do not reflect on the incoming 
184       * parameter.
185       * 
186       * @param conf the configuration
          * @param jobName the job instance's name
187       * @return the {@link Job}, with no connection to a cluster yet.
188       * @throws IOException
189       */
190      public static Job getInstance(Configuration conf, String jobName)
191               throws IOException {
192        // create with a null Cluster
193        Job result = getInstance(conf);
194        result.setJobName(jobName);
195        return result;
196      }
197      
198      /**
199       * Creates a new {@link Job} with no particular {@link Cluster} and given
200       * {@link Configuration} and {@link JobStatus}.
201       * A Cluster will be created from the conf parameter only when it's needed.
202       * 
203       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
204       * that any necessary internal modifications do not reflect on the incoming 
205       * parameter.
206       * 
207       * @param status job status
208       * @param conf job configuration
209       * @return the {@link Job}, with no connection to a cluster yet.
210       * @throws IOException
211       */
212      public static Job getInstance(JobStatus status, Configuration conf)
213          throws IOException {
214        return new Job(status, new JobConf(conf));
215      }
216    
217      /**
218       * Creates a new {@link Job} with no particular {@link Cluster}.
219       * A Cluster will be created with a generic {@link Configuration} if needed.
220       *
221       * The <code>Job</code> makes a copy of the <code>Configuration</code> so
222       * that any necessary internal modifications do not reflect on the incoming
223       * parameter.
224       * 
225       * @param ignored cluster to be ignored
226       * @return the {@link Job}, with no connection to a cluster yet.
227       * @throws IOException
228       * @deprecated Use {@link #getInstance()}
229       */
230      @Deprecated
231      public static Job getInstance(Cluster ignored) throws IOException {
232        return getInstance();
233      }
234      
235      /**
236       * Creates a new {@link Job} with no particular {@link Cluster} and given
237       * {@link Configuration}.
238       * A Cluster will be created from the conf parameter only when it's needed.
239       * 
240       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
241       * that any necessary internal modifications do not reflect on the incoming 
242       * parameter.
243       * 
244       * @param ignored cluster to be ignored
245       * @param conf job configuration
246       * @return the {@link Job}, with no connection to a cluster yet.
247       * @throws IOException
248       * @deprecated Use {@link #getInstance(Configuration)}
249       */
250      @Deprecated
251      public static Job getInstance(Cluster ignored, Configuration conf) 
252          throws IOException {
253        return getInstance(conf);
254      }
255      
256      /**
257       * Creates a new {@link Job}, attached to the given {@link Cluster} and
258       * initialized with the given {@link Configuration} and {@link JobStatus}.
259       * 
260       * The <code>Job</code> makes a copy of the <code>Configuration</code> so
261       * that any necessary internal modifications do not reflect on the incoming
262       * parameter.
263       * 
264       * @param cluster cluster the job is attached to
265       * @param status job status
266       * @param conf job configuration
267       * @return the {@link Job}, attached to the given cluster
268       * @throws IOException
269       * @throws IOException
270       */
271      @Private
272      public static Job getInstance(Cluster cluster, JobStatus status, 
273          Configuration conf) throws IOException {
274        Job job = getInstance(status, conf);
275        job.setCluster(cluster);
276        return job;
277      }
278    
279      private void ensureState(JobState state) throws IllegalStateException {
280        if (state != this.state) {
281          throw new IllegalStateException("Job in state "+ this.state + 
282                                          " instead of " + state);
283        }
284    
285        if (state == JobState.RUNNING && cluster == null) {
286          throw new IllegalStateException
287            ("Job in state " + this.state
288             + ", but it isn't attached to any job tracker!");
289        }
290      }
291    
292      /**
293       * Some methods rely on having a recent job status object. Refresh it
294       * if necessary.
295       */
296      synchronized void ensureFreshStatus() 
297          throws IOException, InterruptedException {
298        if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
299          updateStatus();
300        }
301      }
302        
303      /** Some methods need the job status to be updated immediately, so
304       * refresh it unconditionally.
305       * @throws IOException
306       */
307      synchronized void updateStatus() throws IOException, InterruptedException {
308        this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
309          @Override
310          public JobStatus run() throws IOException, InterruptedException {
311            return cluster.getClient().getJobStatus(status.getJobID());
312          }
313        });
314        if (this.status == null) {
315          throw new IOException("Job status not available");
316        }
317        this.statustime = System.currentTimeMillis();
318      }
319      
320      public JobStatus getStatus() throws IOException, InterruptedException {
321        ensureState(JobState.RUNNING);
322        updateStatus();
323        return status;
324      }
325      
326      private void setStatus(JobStatus status) {
327        this.status = status;
328      }
329    
330      /**
331       * Returns the current state of the Job.
332       * 
333       * @return the current {@link JobStatus.State} of the job
334       * @throws IOException
335       * @throws InterruptedException
336       */
337      public JobStatus.State getJobState() 
338          throws IOException, InterruptedException {
339        ensureState(JobState.RUNNING);
340        updateStatus();
341        return status.getState();
342      }
343      
344      /**
345       * Get the URL where some job progress information will be displayed.
346       * 
347       * @return the URL where some job progress information will be displayed.
348       */
349      public String getTrackingURL() {
350        ensureState(JobState.RUNNING);
351        return status.getTrackingUrl();
352      }
353    
354      /**
355       * Get the path of the submitted job configuration.
356       * 
357       * @return the path of the submitted job configuration.
358       */
359      public String getJobFile() {
360        ensureState(JobState.RUNNING);
361        return status.getJobFile();
362      }
363    
364      /**
365       * Get start time of the job.
366       * 
367       * @return the start time of the job
368       */
369      public long getStartTime() {
370        ensureState(JobState.RUNNING);
371        return status.getStartTime();
372      }
373    
374      /**
375       * Get finish time of the job.
376       * 
377       * @return the finish time of the job
378       */
379      public long getFinishTime() throws IOException, InterruptedException {
380        ensureState(JobState.RUNNING);
381        updateStatus();
382        return status.getFinishTime();
383      }
384    
385      /**
386       * Get scheduling info of the job.
387       * 
388       * @return the scheduling info of the job
389       */
390      public String getSchedulingInfo() {
391        ensureState(JobState.RUNNING);
392        return status.getSchedulingInfo();
393      }
394    
395      /**
396       * Get the priority of the job.
397       * 
398       * @return the priority of the job
399       */
400      public JobPriority getPriority() throws IOException, InterruptedException {
401        ensureState(JobState.RUNNING);
402        updateStatus();
403        return status.getPriority();
404      }
405    
406      /**
407       * The user-specified job name.
408       */
409      public String getJobName() {
410        if (state == JobState.DEFINE) {
411          return super.getJobName();
412        }
413        ensureState(JobState.RUNNING);
414        return status.getJobName();
415      }
416    
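       /**
        * Get the location of the job's history file, as reported by the cluster.
        * @return the history file location
        */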
417      public String getHistoryUrl() throws IOException, InterruptedException {
418        ensureState(JobState.RUNNING);
419        updateStatus();
420        return status.getHistoryFile();
421      }
422    
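       /**
        * Whether the job has been retired, i.e. is no longer held in memory and
        * is only available from the completed job store.
        * @return <code>true</code> if the job is retired
        */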
423      public boolean isRetired() throws IOException, InterruptedException {
424        ensureState(JobState.RUNNING);
425        updateStatus();
426        return status.isRetired();
427      }
428      
429      @Private
430      public Cluster getCluster() {
431        return cluster;
432      }
433    
434      /** Only for mocks in unit tests. */
435      @Private
436      private void setCluster(Cluster cluster) {
437        this.cluster = cluster;
438      }
439    
440      /**
441       * Render the job's status details as a human-readable string.
442       */
443      @Override
444      public String toString() {
445        ensureState(JobState.RUNNING);
446        String reasonForFailure = " ";
447        int numMaps = 0;
448        int numReduces = 0;
449        try {
450          updateStatus();
451          if (status.getState().equals(JobStatus.State.FAILED))
452            reasonForFailure = getTaskFailureEventString();
453          numMaps = getTaskReports(TaskType.MAP).length;
454          numReduces = getTaskReports(TaskType.REDUCE).length;
455        } catch (IOException e) {
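             // ignore: fall back to the defaults initialized above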
456        } catch (InterruptedException ie) {
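             // ignore: report whatever status information is already cached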
457        }
458        StringBuilder sb = new StringBuilder();
459        sb.append("Job: ").append(status.getJobID()).append("\n");
460        sb.append("Job File: ").append(status.getJobFile()).append("\n");
461        sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
462        sb.append("\n");
463        sb.append("Uber job : ").append(status.isUber()).append("\n");
464        sb.append("Number of maps: ").append(numMaps).append("\n");
465        sb.append("Number of reduces: ").append(numReduces).append("\n");
466        sb.append("map() completion: ");
467        sb.append(status.getMapProgress()).append("\n");
468        sb.append("reduce() completion: ");
469        sb.append(status.getReduceProgress()).append("\n");
470        sb.append("Job state: ");
471        sb.append(status.getState()).append("\n");
472        sb.append("retired: ").append(status.isRetired()).append("\n");
473        sb.append("reason for failure: ").append(reasonForFailure);
474        return sb.toString();
475      }
476    
477      /**
478       * @return a description of the task attempt that caused the job to fail
479       * @throws IOException
480       * @throws InterruptedException
481       */
482      String getTaskFailureEventString() throws IOException,
483          InterruptedException {
484        int failCount = 0;
485        TaskCompletionEvent lastEvent = null;
486        TaskCompletionEvent[] events = ugi.doAs(new 
487            PrivilegedExceptionAction<TaskCompletionEvent[]>() {
488              @Override
489              public TaskCompletionEvent[] run() throws IOException,
490              InterruptedException {
491                return cluster.getClient().getTaskCompletionEvents(
492                    status.getJobID(), 0, 10);
493              }
494            });
495        for (TaskCompletionEvent event : events) {
496          if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
497            failCount++;
498            lastEvent = event;
499          }
500        }
501        if (lastEvent == null) {
502          return "There are no failed tasks for the job. "
503              + "The job failed for some other reason, which "
504              + "can be found in the logs.";
505        }
506        String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
507        String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
508        return (" task " + taskID + " failed " +
509          failCount + " times. " + "For details check tasktracker at: " +
510          lastEvent.getTaskTrackerHttp());
511      }
512    
513      /**
514       * Get information about the current state of the tasks of a job.
515       * 
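          * <p>Illustrative sketch ({@code job} is a submitted job):</p>
          * <pre>
          *   for (TaskReport report : job.getTaskReports(TaskType.MAP)) {
          *     System.out.println(report.getProgress());
          *   }
          * </pre>
          * 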
516       * @param type Type of the task
517       * @return the list of task reports for the given task type
518       * @throws IOException
519       */
520      public TaskReport[] getTaskReports(final TaskType type)
521          throws IOException, InterruptedException {
522        ensureState(JobState.RUNNING);
523        return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
524          public TaskReport[] run() throws IOException, InterruptedException {
525            return cluster.getClient().getTaskReports(getJobID(), type);
526          }
528        });
529      }
530    
531      /**
532       * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
533       * and 1.0.  When all map tasks have completed, the function returns 1.0.
534       * 
535       * @return the progress of the job's map-tasks.
536       * @throws IOException
537       */
538      public float mapProgress() throws IOException, InterruptedException {
539        ensureState(JobState.RUNNING);
540        ensureFreshStatus();
541        return status.getMapProgress();
542      }
543    
544      /**
545       * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
546       * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
547       * 
548       * @return the progress of the job's reduce-tasks.
549       * @throws IOException
550       */
551      public float reduceProgress() throws IOException, InterruptedException {
552        ensureState(JobState.RUNNING);
553        ensureFreshStatus();
554        return status.getReduceProgress();
555      }
556    
557      /**
558       * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 
559       * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
560       * 
561       * @return the progress of the job's cleanup-tasks.
562       * @throws IOException
563       */
564      public float cleanupProgress() throws IOException, InterruptedException {
565        ensureState(JobState.RUNNING);
566        ensureFreshStatus();
567        return status.getCleanupProgress();
568      }
569    
570      /**
571       * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 
572       * and 1.0.  When all setup tasks have completed, the function returns 1.0.
573       * 
574       * @return the progress of the job's setup-tasks.
575       * @throws IOException
576       */
577      public float setupProgress() throws IOException, InterruptedException {
578        ensureState(JobState.RUNNING);
579        ensureFreshStatus();
580        return status.getSetupProgress();
581      }
582    
583      /**
584       * Check if the job is finished or not. 
585       * This is a non-blocking call.
586       * 
587       * @return <code>true</code> if the job is complete, else <code>false</code>.
588       * @throws IOException
589       */
590      public boolean isComplete() throws IOException, InterruptedException {
591        ensureState(JobState.RUNNING);
592        updateStatus();
593        return status.isJobComplete();
594      }
595    
596      /**
597       * Check if the job completed successfully. 
598       * 
599       * @return <code>true</code> if the job succeeded, else <code>false</code>.
600       * @throws IOException
601       */
602      public boolean isSuccessful() throws IOException, InterruptedException {
603        ensureState(JobState.RUNNING);
604        updateStatus();
605        return status.getState() == JobStatus.State.SUCCEEDED;
606      }
607    
608      /**
609       * Kill the running job.  Blocks until all job tasks have been
610       * killed as well.  If the job is no longer running, it simply returns.
611       * 
612       * @throws IOException
613       */
614      public void killJob() throws IOException, InterruptedException {
615        ensureState(JobState.RUNNING);
616        cluster.getClient().killJob(getJobID());
617      }
618    
619      /**
620       * Set the priority of a running job.
621       * @param priority the new priority for the job.
622       * @throws IOException
623       */
624      public void setPriority(JobPriority priority) 
625          throws IOException, InterruptedException {
626        if (state == JobState.DEFINE) {
627          conf.setJobPriority(
628            org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
629        } else {
630          ensureState(JobState.RUNNING);
631          final JobPriority tmpPriority = priority;
632          ugi.doAs(new PrivilegedExceptionAction<Object>() {
633            @Override
634            public Object run() throws IOException, InterruptedException {
635              cluster.getClient().setJobPriority(getJobID(), tmpPriority.toString());
636              return null;
637            }
638          });
639        }
640      }
641    
642      /**
643       * Get events indicating completion (success/failure) of component tasks.
644       *  
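          * <p>A sketch that drains the currently available events, mirroring how
          * {@link #monitorAndPrintJob()} consumes them ({@code job} is a running
          * job):</p>
          * <pre>
          *   int from = 0;
          *   TaskCompletionEvent[] events;
          *   do {
          *     events = job.getTaskCompletionEvents(from, 10);
          *     from += events.length;
          *   } while (events.length > 0);
          * </pre>
          * 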
645       * @param startFrom index to start fetching events from
646       * @param numEvents number of events to fetch
647       * @return an array of {@link TaskCompletionEvent}s
648       * @throws IOException
649       */
650      public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
651          final int numEvents) throws IOException, InterruptedException {
652        ensureState(JobState.RUNNING);
653        return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
654          @Override
655          public TaskCompletionEvent[] run() throws IOException, InterruptedException {
656            return cluster.getClient().getTaskCompletionEvents(getJobID(),
657                startFrom, numEvents); 
658          }
659        });
660      }
661      
662      /**
663       * Kill indicated task attempt.
664       * 
665       * @param taskId the id of the task attempt to be terminated
          * @return <code>true</code> if the task attempt was killed
666       * @throws IOException
667       */
668      public boolean killTask(final TaskAttemptID taskId) 
669          throws IOException, InterruptedException {
670        ensureState(JobState.RUNNING);
671        return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
672          public Boolean run() throws IOException, InterruptedException {
673            return cluster.getClient().killTask(taskId, false);
674          }
675        });
676      }
677    
678      /**
679       * Fail indicated task attempt.
680       * 
681       * @param taskId the id of the task attempt to be failed
          * @return <code>true</code> if the task attempt was marked as failed
682       * @throws IOException
683       */
684      public boolean failTask(final TaskAttemptID taskId) 
685          throws IOException, InterruptedException {
686        ensureState(JobState.RUNNING);
687        return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
688          @Override
689          public Boolean run() throws IOException, InterruptedException {
690            return cluster.getClient().killTask(taskId, true);
691          }
692        });
693      }
694    
695      /**
696       * Gets the counters for this job. May return null if the job has been
697       * retired and the job is no longer in the completed job store.
698       * 
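          * <p>Illustrative use (the group and counter names are hypothetical):</p>
          * <pre>
          *   Counters counters = job.getCounters();
          *   if (counters != null) {
          *     long n = counters.findCounter("WordCount", "TotalWords").getValue();
          *   }
          * </pre>
          * 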
699       * @return the counters for this job.
700       * @throws IOException
701       */
702      public Counters getCounters() 
703          throws IOException, InterruptedException {
704        ensureState(JobState.RUNNING);
705        return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
706          @Override
707          public Counters run() throws IOException, InterruptedException {
708            return cluster.getClient().getJobCounters(getJobID());
709          }
710        });
711      }
712    
713      /**
714       * Gets the diagnostic messages for a given task attempt.
715       * @param taskid the id of the task attempt
716       * @return the list of diagnostic messages for the task
717       * @throws IOException
718       */
719      public String[] getTaskDiagnostics(final TaskAttemptID taskid) 
720          throws IOException, InterruptedException {
721        ensureState(JobState.RUNNING);
722        return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
723          @Override
724          public String[] run() throws IOException, InterruptedException {
725            return cluster.getClient().getTaskDiagnostics(taskid);
726          }
727        });
728      }
729    
730      /**
731       * Set the number of reduce tasks for the job.
732       * @param tasks the number of reduce tasks
733       * @throws IllegalStateException if the job is submitted
734       */
735      public void setNumReduceTasks(int tasks) throws IllegalStateException {
736        ensureState(JobState.DEFINE);
737        conf.setNumReduceTasks(tasks);
738      }
739    
740      /**
741       * Set the current working directory for the default file system.
742       * 
743       * @param dir the new current working directory.
744       * @throws IllegalStateException if the job is submitted
745       */
746      public void setWorkingDirectory(Path dir) throws IOException {
747        ensureState(JobState.DEFINE);
748        conf.setWorkingDirectory(dir);
749      }
750    
751      /**
752       * Set the {@link InputFormat} for the job.
753       * @param cls the <code>InputFormat</code> to use
754       * @throws IllegalStateException if the job is submitted
755       */
756      public void setInputFormatClass(Class<? extends InputFormat> cls
757                                      ) throws IllegalStateException {
758        ensureState(JobState.DEFINE);
759        conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, 
760                      InputFormat.class);
761      }
762    
763      /**
764       * Set the {@link OutputFormat} for the job.
765       * @param cls the <code>OutputFormat</code> to use
766       * @throws IllegalStateException if the job is submitted
767       */
768      public void setOutputFormatClass(Class<? extends OutputFormat> cls
769                                       ) throws IllegalStateException {
770        ensureState(JobState.DEFINE);
771        conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, 
772                      OutputFormat.class);
773      }
774    
775      /**
776       * Set the {@link Mapper} for the job.
777       * @param cls the <code>Mapper</code> to use
778       * @throws IllegalStateException if the job is submitted
779       */
780      public void setMapperClass(Class<? extends Mapper> cls
781                                 ) throws IllegalStateException {
782        ensureState(JobState.DEFINE);
783        conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
784      }
785    
786      /**
787       * Set the Jar by finding where a given class came from.
788       * @param cls the example class
789       */
790      public void setJarByClass(Class<?> cls) {
791        ensureState(JobState.DEFINE);
792        conf.setJarByClass(cls);
793      }
794    
795      /**
796       * Set the job jar.
          * @param jar the path to the job jar
797       */
798      public void setJar(String jar) {
799        ensureState(JobState.DEFINE);
800        conf.setJar(jar);
801      }
802    
803      /**
804       * Set the reported username for this job.
805       * 
806       * @param user the username for this job.
807       */
808      public void setUser(String user) {
809        ensureState(JobState.DEFINE);
810        conf.setUser(user);
811      }
812    
813      /**
814       * Set the combiner class for the job.
815       * @param cls the combiner to use
816       * @throws IllegalStateException if the job is submitted
817       */
818      public void setCombinerClass(Class<? extends Reducer> cls
819                                   ) throws IllegalStateException {
820        ensureState(JobState.DEFINE);
821        conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
822      }
823    
824      /**
825       * Set the {@link Reducer} for the job.
826       * @param cls the <code>Reducer</code> to use
827       * @throws IllegalStateException if the job is submitted
828       */
829      public void setReducerClass(Class<? extends Reducer> cls
830                                  ) throws IllegalStateException {
831        ensureState(JobState.DEFINE);
832        conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
833      }
834    
835      /**
836       * Set the {@link Partitioner} for the job.
837       * @param cls the <code>Partitioner</code> to use
838       * @throws IllegalStateException if the job is submitted
839       */
840      public void setPartitionerClass(Class<? extends Partitioner> cls
841                                      ) throws IllegalStateException {
842        ensureState(JobState.DEFINE);
843        conf.setClass(PARTITIONER_CLASS_ATTR, cls, 
844                      Partitioner.class);
845      }
846    
847      /**
848       * Set the key class for the map output data. This allows the user to
849       * specify the map output key class to be different than the final output
850       * key class.
851       * 
852       * @param theClass the map output key class.
853       * @throws IllegalStateException if the job is submitted
854       */
855      public void setMapOutputKeyClass(Class<?> theClass
856                                       ) throws IllegalStateException {
857        ensureState(JobState.DEFINE);
858        conf.setMapOutputKeyClass(theClass);
859      }
860    
861      /**
862       * Set the value class for the map output data. This allows the user to
863       * specify the map output value class to be different than the final output
864       * value class.
865       * 
866       * @param theClass the map output value class.
867       * @throws IllegalStateException if the job is submitted
868       */
869      public void setMapOutputValueClass(Class<?> theClass
870                                         ) throws IllegalStateException {
871        ensureState(JobState.DEFINE);
872        conf.setMapOutputValueClass(theClass);
873      }
874    
875      /**
876       * Set the key class for the job output data.
877       * 
878       * @param theClass the key class for the job output data.
879       * @throws IllegalStateException if the job is submitted
880       */
881      public void setOutputKeyClass(Class<?> theClass
882                                    ) throws IllegalStateException {
883        ensureState(JobState.DEFINE);
884        conf.setOutputKeyClass(theClass);
885      }
886    
887      /**
888       * Set the value class for job outputs.
889       * 
890       * @param theClass the value class for job outputs.
891       * @throws IllegalStateException if the job is submitted
892       */
893      public void setOutputValueClass(Class<?> theClass
894                                      ) throws IllegalStateException {
895        ensureState(JobState.DEFINE);
896        conf.setOutputValueClass(theClass);
897      }
898    
899      /**
900       * Define the comparator that controls how the keys are sorted before they
901       * are passed to the {@link Reducer}.
902       * @param cls the raw comparator
903       * @throws IllegalStateException if the job is submitted
904       */
905      public void setSortComparatorClass(Class<? extends RawComparator> cls
906                                         ) throws IllegalStateException {
907        ensureState(JobState.DEFINE);
908        conf.setOutputKeyComparatorClass(cls);
909      }
910    
911      /**
912       * Define the comparator that controls which keys are grouped together
913       * for a single call to 
914       * {@link Reducer#reduce(Object, Iterable, 
915       *                       org.apache.hadoop.mapreduce.Reducer.Context)}
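          * <p>A common secondary-sort wiring, shown as a sketch (both comparator
          * classes are hypothetical):</p>
          * <pre>
          *   job.setSortComparatorClass(CompositeKeyComparator.class);
          *   job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
          * </pre>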
916       * @param cls the raw comparator to use
917       * @throws IllegalStateException if the job is submitted
918       */
919      public void setGroupingComparatorClass(Class<? extends RawComparator> cls
920                                             ) throws IllegalStateException {
921        ensureState(JobState.DEFINE);
922        conf.setOutputValueGroupingComparator(cls);
923      }
924    
925      /**
926       * Set the user-specified job name.
927       * 
928       * @param name the job's new name.
929       * @throws IllegalStateException if the job is submitted
930       */
931      public void setJobName(String name) throws IllegalStateException {
932        ensureState(JobState.DEFINE);
933        conf.setJobName(name);
934      }
935    
936      /**
937       * Turn speculative execution on or off for this job. 
938       * 
939       * @param speculativeExecution <code>true</code> if speculative execution 
940       *                             should be turned on, else <code>false</code>.
941       */
942      public void setSpeculativeExecution(boolean speculativeExecution) {
943        ensureState(JobState.DEFINE);
944        conf.setSpeculativeExecution(speculativeExecution);
945      }
946    
947      /**
948       * Turn speculative execution on or off for this job for map tasks. 
949       * 
950       * @param speculativeExecution <code>true</code> if speculative execution 
951       *                             should be turned on for map tasks,
952       *                             else <code>false</code>.
953       */
954      public void setMapSpeculativeExecution(boolean speculativeExecution) {
955        ensureState(JobState.DEFINE);
956        conf.setMapSpeculativeExecution(speculativeExecution);
957      }
958    
959      /**
960       * Turn speculative execution on or off for this job for reduce tasks. 
961       * 
962       * @param speculativeExecution <code>true</code> if speculative execution 
963       *                             should be turned on for reduce tasks,
964       *                             else <code>false</code>.
965       */
966      public void setReduceSpeculativeExecution(boolean speculativeExecution) {
967        ensureState(JobState.DEFINE);
968        conf.setReduceSpeculativeExecution(speculativeExecution);
969      }
970    
971      /**
972       * Specify whether job-setup and job-cleanup are needed for the job.
973       * 
974       * @param needed If <code>true</code>, job-setup and job-cleanup will be
975       *               considered from {@link OutputCommitter} 
976       *               else ignored.
977       */
978      public void setJobSetupCleanupNeeded(boolean needed) {
979        ensureState(JobState.DEFINE);
980        conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
981      }
982    
983      /**
984       * Set the given set of archives
985       * @param archives The list of archives that need to be localized
986       */
987      public void setCacheArchives(URI[] archives) {
988        ensureState(JobState.DEFINE);
989        DistributedCache.setCacheArchives(archives, conf);
990      }
991    
992      /**
993       * Set the given set of files
994       * @param files The list of files that need to be localized
995       */
996      public void setCacheFiles(URI[] files) {
997        ensureState(JobState.DEFINE);
998        DistributedCache.setCacheFiles(files, conf);
999      }
1000    
1001      /**
1002       * Add an archive to be localized
1003       * @param uri The uri of the archive to be localized
1004       */
1005      public void addCacheArchive(URI uri) {
1006        ensureState(JobState.DEFINE);
1007        DistributedCache.addCacheArchive(uri, conf);
1008      }
1009      
1010      /**
1011       * Add a file to be localized
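           * <p>For example (the path is illustrative; the <code>#lookup</code>
           * fragment names the symlink created in the task working directory):</p>
           * <pre>
           *   job.addCacheFile(new URI("/user/me/lookup.dat#lookup"));
           * </pre>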
1012       * @param uri The uri of the cache to be localized
1013       */
1014      public void addCacheFile(URI uri) {
1015        ensureState(JobState.DEFINE);
1016        DistributedCache.addCacheFile(uri, conf);
1017      }
1018    
1019      /**
1020       * Add a file path to the current set of classpath entries. It adds the file
1021       * to the cache as well.
1022       * 
1023       * Files added with this method will not be unpacked while being added to the
1024       * classpath.
1025       * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
1026       * method instead.
1027       *
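           * <p>Sketch (the jar path is hypothetical):</p>
           * <pre>
           *   job.addFileToClassPath(new Path("/apps/libs/mylib.jar"));
           * </pre>
           *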
1028       * @param file Path of the file to be added
1029       */
1030      public void addFileToClassPath(Path file)
1031        throws IOException {
1032        ensureState(JobState.DEFINE);
1033        DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
1034      }
1035    
1036      /**
1037       * Add an archive path to the current set of classpath entries. It adds the
1038       * archive to cache as well.
1039       * 
1040       * Archive files will be unpacked and added to the classpath
1041       * when being distributed.
1042       *
1043       * @param archive Path of the archive to be added
1044       */
1045      public void addArchiveToClassPath(Path archive)
1046        throws IOException {
1047        ensureState(JobState.DEFINE);
1048        DistributedCache.addArchiveToClassPath(archive, conf,
                archive.getFileSystem(conf));
1049      }
1050    
1051      /**
1052       * Originally intended to enable symlinks, but currently symlinks cannot be
1053       * disabled.
1054       */
1055      @Deprecated
1056      public void createSymlink() {
1057        ensureState(JobState.DEFINE);
1058        DistributedCache.createSymlink(conf);
1059      }
1060      
1061      /** 
1062       * Expert: Set the maximum number of attempts that will be made to run a
1063       * map task.
1064       * 
1065       * @param n the number of attempts per map task.
1066       */
1067      public void setMaxMapAttempts(int n) {
1068        ensureState(JobState.DEFINE);
1069        conf.setMaxMapAttempts(n);
1070      }
1071    
1072      /** 
1073       * Expert: Set the maximum number of attempts that will be made to run a
1074       * reduce task.
1075       * 
1076       * @param n the number of attempts per reduce task.
1077       */
1078      public void setMaxReduceAttempts(int n) {
1079        ensureState(JobState.DEFINE);
1080        conf.setMaxReduceAttempts(n);
1081      }
1082    
1083      /**
1084       * Set whether the system should collect profiler information for some of 
1085       * the tasks in this job. The information is stored in the user log
1086       * directory.
1087       * @param newValue true means it should be gathered
1088       */
1089      public void setProfileEnabled(boolean newValue) {
1090        ensureState(JobState.DEFINE);
1091        conf.setProfileEnabled(newValue);
1092      }
1093    
1094      /**
1095       * Set the profiler configuration arguments. If the string contains a '%s' it
1096       * will be replaced with the name of the profiling output file when the task
1097       * runs.
1098       *
1099       * This value is passed to the task child JVM on the command line.
1100       *
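           * <p>For example, a classic hprof configuration (shown for
           * illustration):</p>
           * <pre>
           *   job.setProfileParams("-agentlib:hprof=cpu=samples,heap=sites," +
           *       "force=n,thread=y,verbose=n,file=%s");
           * </pre>
           *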
1101       * @param value the configuration string
1102       */
1103      public void setProfileParams(String value) {
1104        ensureState(JobState.DEFINE);
1105        conf.setProfileParams(value);
1106      }
1107    
1108      /**
1109       * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
1110       * must also be called.
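           * <p>For example, to profile the first three map tasks (a sketch):</p>
           * <pre>
           *   job.setProfileEnabled(true);
           *   job.setProfileTaskRange(true, "0-2");
           * </pre>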
1111       * @param isMap <code>true</code> to set the range for map tasks,
           *              <code>false</code> for reduce tasks
           * @param newValue a set of integer ranges of the task ids
1112       */
1113      public void setProfileTaskRange(boolean isMap, String newValue) {
1114        ensureState(JobState.DEFINE);
1115        conf.setProfileTaskRange(isMap, newValue);
1116      }
1117    
1118      private void ensureNotSet(String attr, String msg) throws IOException {
1119        if (conf.get(attr) != null) {
1120          throw new IOException(attr + " is incompatible with " + msg + " mode.");
1121        }    
1122      }
1123      
1124      /**
1125       * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
1126       * tokens upon job completion. Defaults to true.
1127       */
1128      public void setCancelDelegationTokenUponJobCompletion(boolean value) {
1129        ensureState(JobState.DEFINE);
1130        conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
1131      }
1132    
1133      /**
1134       * Default to the new APIs unless they are explicitly set or the old mapper or
1135       * reducer attributes are used.
1136       * @throws IOException if the configuration is inconsistent
1137       */
1138      private void setUseNewAPI() throws IOException {
1139        int numReduces = conf.getNumReduceTasks();
1140        String oldMapperClass = "mapred.mapper.class";
1141        String oldReduceClass = "mapred.reducer.class";
1142        conf.setBooleanIfUnset("mapred.mapper.new-api",
1143                               conf.get(oldMapperClass) == null);
1144        if (conf.getUseNewMapper()) {
1145          String mode = "new map API";
1146          ensureNotSet("mapred.input.format.class", mode);
1147          ensureNotSet(oldMapperClass, mode);
1148          if (numReduces != 0) {
1149            ensureNotSet("mapred.partitioner.class", mode);
1150        } else {
1151            ensureNotSet("mapred.output.format.class", mode);
1152          }      
1153        } else {
1154          String mode = "map compatibility";
1155          ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
1156          ensureNotSet(MAP_CLASS_ATTR, mode);
1157          if (numReduces != 0) {
1158            ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
1159        } else {
1160            ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
1161          }
1162        }
1163        if (numReduces != 0) {
1164          conf.setBooleanIfUnset("mapred.reducer.new-api",
1165                                 conf.get(oldReduceClass) == null);
1166          if (conf.getUseNewReducer()) {
1167            String mode = "new reduce API";
1168            ensureNotSet("mapred.output.format.class", mode);
1169            ensureNotSet(oldReduceClass, mode);   
1170          } else {
1171          String mode = "reduce compatibility";
1172            ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
1173            ensureNotSet(REDUCE_CLASS_ATTR, mode);   
1174          }
1175        }   
1176      }
1177    
1178      private synchronized void connect()
1179              throws IOException, InterruptedException, ClassNotFoundException {
1180        if (cluster == null) {
1181          cluster = 
1182            ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
1183                       public Cluster run()
1184                              throws IOException, InterruptedException, 
1185                                     ClassNotFoundException {
1186                         return new Cluster(getConfiguration());
1187                       }
1188                     });
1189        }
1190      }
1191    
1192      boolean isConnected() {
1193        return cluster != null;
1194      }
1195    
1196      /** Only for mocking via unit tests. */
1197      @Private
1198      public JobSubmitter getJobSubmitter(FileSystem fs, 
1199          ClientProtocol submitClient) throws IOException {
1200        return new JobSubmitter(fs, submitClient);
1201      }

1202      /**
1203       * Submit the job to the cluster and return immediately.
1204       * @throws IOException
1205       */
1206      public void submit() 
1207             throws IOException, InterruptedException, ClassNotFoundException {
1208        ensureState(JobState.DEFINE);
1209        setUseNewAPI();
1210        connect();
1211        final JobSubmitter submitter = 
1212            getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
1213        status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
1214          public JobStatus run() throws IOException, InterruptedException, 
1215          ClassNotFoundException {
1216            return submitter.submitJobInternal(Job.this, cluster);
1217          }
1218        });
1219        state = JobState.RUNNING;
1220        LOG.info("The url to track the job: " + getTrackingURL());
1221      }
1222      
1223      /**
1224       * Submit the job to the cluster and wait for it to finish.
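           * <p>The common driver idiom:</p>
           * <pre>
           *   System.exit(job.waitForCompletion(true) ? 0 : 1);
           * </pre>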
1225       * @param verbose print the progress to the user
1226       * @return true if the job succeeded
1227       * @throws IOException thrown if the communication with the 
1228       *         <code>JobTracker</code> is lost
1229       */
1230      public boolean waitForCompletion(boolean verbose
1231                                       ) throws IOException, InterruptedException,
1232                                                ClassNotFoundException {
1233        if (state == JobState.DEFINE) {
1234          submit();
1235        }
1236        if (verbose) {
1237          monitorAndPrintJob();
1238        } else {
1239          // get the completion poll interval from the client.
1240          int completionPollIntervalMillis = 
1241            Job.getCompletionPollInterval(cluster.getConf());
1242          while (!isComplete()) {
1243            try {
1244              Thread.sleep(completionPollIntervalMillis);
1245            } catch (InterruptedException ie) {
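              // ignore the interruption and poll again until the job completes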
1246            }
1247          }
1248        }
1249        return isSuccessful();
1250      }
1251      
1252      /**
1253       * Monitor a job and print status in real-time as progress is made and tasks 
1254       * fail.
1255       * @return true if the job succeeded
1256       * @throws IOException if communication to the JobTracker fails
1257       */
1258      public boolean monitorAndPrintJob() 
1259          throws IOException, InterruptedException {
1260        String lastReport = null;
1261        Job.TaskStatusFilter filter;
1262        Configuration clientConf = getConfiguration();
1263        filter = Job.getTaskOutputFilter(clientConf);
1264        JobID jobId = getJobID();
1265        LOG.info("Running job: " + jobId);
1266        int eventCounter = 0;
1267        boolean profiling = getProfileEnabled();
1268        IntegerRanges mapRanges = getProfileTaskRange(true);
1269        IntegerRanges reduceRanges = getProfileTaskRange(false);
1270        int progMonitorPollIntervalMillis = 
1271          Job.getProgressPollInterval(clientConf);
1272        /* make sure to report full progress after the job is done */
1273        boolean reportedAfterCompletion = false;
1274        boolean reportedUberMode = false;
1275        while (!isComplete() || !reportedAfterCompletion) {
1276          if (isComplete()) {
1277            reportedAfterCompletion = true;
1278          } else {
1279            Thread.sleep(progMonitorPollIntervalMillis);
1280          }
1281          if (status.getState() == JobStatus.State.PREP) {
1282            continue;
1283          }      
1284          if (!reportedUberMode) {
1285            reportedUberMode = true;
1286            LOG.info("Job " + jobId + " running in uber mode : " + isUber());
1287          }      
1288          String report = 
1289            (" map " + StringUtils.formatPercent(mapProgress(), 0)+
1290                " reduce " + 
1291                StringUtils.formatPercent(reduceProgress(), 0));
1292          if (!report.equals(lastReport)) {
1293            LOG.info(report);
1294            lastReport = report;
1295          }
1296    
1297          TaskCompletionEvent[] events = 
1298            getTaskCompletionEvents(eventCounter, 10); 
1299          eventCounter += events.length;
1300          printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
1301        }
1302        boolean success = isSuccessful();
1303        if (success) {
1304          LOG.info("Job " + jobId + " completed successfully");
1305        } else {
1306          LOG.info("Job " + jobId + " failed with state " + status.getState() + 
1307              " due to: " + status.getFailureInfo());
1308        }
1309        Counters counters = getCounters();
1310        if (counters != null) {
1311          LOG.info(counters.toString());
1312        }
1313        return success;
1314      }
1315    
1316      /**
1317       * @return true if the profile parameters indicate that this is using
1318       * hprof, which generates profile files in a particular location
1319       * that we can retrieve to the client.
1320       */
1321      private boolean shouldDownloadProfile() {
1322        // Check the argument string that was used to initialize profiling.
1323        // If this indicates hprof and file-based output, then we're ok to
1324        // download.
1325        String profileParams = getProfileParams();
1326    
1327        if (null == profileParams) {
1328          return false;
1329        }
1330    
1331        // Split this on whitespace.
1332        String [] parts = profileParams.split("[ \\t]+");
1333    
1334        // If any of these indicate hprof, and the use of output files, return true.
1335        boolean hprofFound = false;
1336        boolean fileFound = false;
1337        for (String p : parts) {
1338          if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
1339            hprofFound = true;
1340    
1341            // This contains a number of comma-delimited components, one of which
1342            // may specify the file to write to. Make sure this is present and
1343            // not empty.
1344            String [] subparts = p.split(",");
1345            for (String sub : subparts) {
1346              if (sub.startsWith("file=") && sub.length() != "file=".length()) {
1347                fileFound = true;
1348              }
1349            }
1350          }
1351        }
1352    
1353        return hprofFound && fileFound;
1354      }
1355    
1356      private void printTaskEvents(TaskCompletionEvent[] events,
1357          Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
1358          IntegerRanges reduceRanges) throws IOException, InterruptedException {
1359        for (TaskCompletionEvent event : events) {
1360          switch (filter) {
1361          case NONE:
1362            break;
1363          case SUCCEEDED:
1364            if (event.getStatus() == 
1365              TaskCompletionEvent.Status.SUCCEEDED) {
1366              LOG.info(event.toString());
1367            }
1368            break; 
1369          case FAILED:
1370            if (event.getStatus() == 
1371              TaskCompletionEvent.Status.FAILED) {
1372              LOG.info(event.toString());
1373              // Displaying the task diagnostic information
1374              TaskAttemptID taskId = event.getTaskAttemptId();
1375              String[] taskDiagnostics = getTaskDiagnostics(taskId); 
1376              if (taskDiagnostics != null) {
1377                for (String diagnostics : taskDiagnostics) {
1378                  System.err.println(diagnostics);
1379                }
1380              }
1381            }
1382            break; 
1383          case KILLED:
1384            if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
1385              LOG.info(event.toString());
1386            }
1387            break; 
1388          case ALL:
1389            LOG.info(event.toString());
1390            break;
1391          }
1392        }
1393      }
1394    
1395      /** The interval at which monitorAndPrintJob() prints status. */
1396      public static int getProgressPollInterval(Configuration conf) {
1397        // Read progress monitor poll interval from config. Default is 1 second.
1398        int progMonitorPollIntervalMillis = conf.getInt(
1399          PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
1400        if (progMonitorPollIntervalMillis < 1) {
1401        LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
1402          " has been set to an invalid value; "
1403          + "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
1404          progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
1405        }
1406        return progMonitorPollIntervalMillis;
1407      }
1408    
1409      /** The interval at which waitForCompletion() should check. */
1410      public static int getCompletionPollInterval(Configuration conf) {
1411        int completionPollIntervalMillis = conf.getInt(
1412          COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
1413        if (completionPollIntervalMillis < 1) { 
1414          LOG.warn(COMPLETION_POLL_INTERVAL_KEY + 
1415           " has been set to an invalid value; "
1416           + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
1417          completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
1418        }
1419        return completionPollIntervalMillis;
1420      }
1421    
1422      /**
1423       * Get the task output filter.
1424       * 
1425       * @param conf the configuration.
1426       * @return the filter level.
1427       */
1428      public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
1429        return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
1430      }
1431    
1432      /**
1433       * Modify the Configuration to set the task output filter.
1434       * 
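           * <p>For instance, to have {@link #monitorAndPrintJob()} log every task
           * completion event (a sketch):</p>
           * <pre>
           *   Job.setTaskOutputFilter(conf, TaskStatusFilter.ALL);
           * </pre>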
1435       * @param conf the Configuration to modify.
1436       * @param newValue the value to set.
1437       */
1438      public static void setTaskOutputFilter(Configuration conf, 
1439          TaskStatusFilter newValue) {
1440        conf.set(Job.OUTPUT_FILTER, newValue.toString());
1441      }
1442    
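          /**
           * Whether the job runs in uber mode, i.e. with all tasks run in the
           * MRAppMaster's JVM rather than in separate containers.
           * @return <code>true</code> if this is an uber job
           */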
1443      public boolean isUber() throws IOException, InterruptedException {
1444        ensureState(JobState.RUNNING);
1445        updateStatus();
1446        return status.isUber();
1447      }
1448      
1449    }