/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <code>JobClient</code> is the primary interface for the user-job to interact
 * with the cluster.
 * 
 * <code>JobClient</code> provides facilities to submit jobs, track their 
 * progress, access component-tasks' reports and logs, and get Map-Reduce
 * cluster status information.
 * 
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setting up the requisite accounting information for the 
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system directory 
 *   on the distributed file-system. 
 *   </li>
 *   <li>
 *   Submitting the job to the cluster and optionally monitoring
 *   its status.
 *   </li>
 * </ol>
 *  
 * Normally the user creates the application, describes various facets of the
 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit 
 * the job and monitor its progress.
 * 
 * <p>Here is an example on how to use <code>JobClient</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *     
 *     // Specify various job-specific parameters     
 *     job.setJobName("myjob");
 *     
 *     job.setInputPath(new Path("in"));
 *     job.setOutputPath(new Path("out"));
 *     
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     JobClient.runJob(job);
 * </pre></blockquote></p>
 * 
 * <h4 id="JobControl">Job Control</h4>
 * 
 * <p>At times clients would chain map-reduce jobs to accomplish complex tasks 
 * which cannot be done via a single map-reduce job. This is fairly easy since 
 * the output of a job typically goes to the distributed file-system, where it
 * can be used as the input for the next job.</p>
 * 
 * <p>However, this also means that the onus of ensuring jobs are complete 
 * (success/failure) lies squarely on the clients. In such situations the 
 * various job-control options are:
 * <ol>
 *   <li>
 *   {@link #runJob(JobConf)} : submits the job and returns only after 
 *   the job has completed.
 *   </li>
 *   <li>
 *   {@link #submitJob(JobConf)} : only submits the job; the client then polls 
 *   the returned {@link RunningJob} handle to query status and make 
 *   scheduling decisions, as sketched in the example below.
 *   </li>
 *   <li>
 *   {@link JobConf#setJobEndNotificationURI(String)} : sets up a notification
 *   upon job-completion, thus avoiding polling.
 *   </li>
 * </ol>
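 *
 * <p>For instance, a minimal polling sketch for the second option (the
 * sleep interval here is illustrative, not prescribed by the API):</p>
 * <p><blockquote><pre>
 *     JobClient client = new JobClient(job);
 *     RunningJob running = client.submitJob(job);
 *     while (!running.isComplete()) {
 *       Thread.sleep(5000);   // poll every five seconds
 *     }
 *     if (!running.isSuccessful()) {
 *       throw new IOException("Job failed!");
 *     }
 * </pre></blockquote></p>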
 * 
 * @see JobConf
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI {
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
  /* Notes that getDelegationToken was called. Again, this is a hack for Oozie
   * to make sure we add history server delegation tokens to the credentials
   * for the job. Since the API only allows one delegation token to be
   * returned, we have to resort to this hack.
   */
  private boolean getDelegationTokenCalled = false;
  /* Notes the renewer that will renew the delegation token. */
  private String dtRenewer = null;
  /* Do we need a HistoryServer delegation token for this client? */
  static final String HS_DELEGATION_TOKEN_REQUIRED 
      = "mapreduce.history.server.delegationtoken.required";
  static final String HS_DELEGATION_TOKEN_RENEWER 
      = "mapreduce.history.server.delegationtoken.renewer";
  
  static {
    ConfigUtil.loadResources();
  }

  /**
   * A NetworkedJob is an implementation of RunningJob. It holds
   * a JobProfile object to provide some info, and interacts with the
   * remote service to provide certain functionality.
   */
  static class NetworkedJob implements RunningJob {
    Job job;
    /**
     * We store a JobProfile and a timestamp for when we last
     * acquired the job profile. If the job is null, then we cannot
     * perform any of the tasks. The job might be null if the cluster
     * has completely forgotten about the job (e.g., 24 hours after the
     * job completes).
     */
    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
      job = Job.getInstance(cluster, status, new JobConf(status.getJobFile()));
    }

    public NetworkedJob(Job job) throws IOException {
      this.job = job;
    }

    public Configuration getConfiguration() {
      return job.getConfiguration();
    }

    /**
     * An identifier for the job.
     */
    public JobID getID() {
      return JobID.downgrade(job.getJobID());
    }
    
    /** @deprecated This method is deprecated and will be removed. Applications should 
     * rather use {@link #getID()}. */
    @Deprecated
    public String getJobID() {
      return getID().toString();
    }
    
    /**
     * The user-specified job name.
     */
    public String getJobName() {
      return job.getJobName();
    }

    /**
     * The name of the job file.
     */
    public String getJobFile() {
      return job.getJobFile();
    }

    /**
     * A URL where the job's status can be seen.
     */
    public String getTrackingURL() {
      return job.getTrackingURL();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of map work
     * completed.
     */
    public float mapProgress() throws IOException {
      try {
        return job.mapProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of reduce work
     * completed.
     */
    public float reduceProgress() throws IOException {
      try {
        return job.reduceProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of cleanup work
     * completed.
     */
    public float cleanupProgress() throws IOException {
      try {
        return job.cleanupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of setup work
     * completed.
     */
    public float setupProgress() throws IOException {
      try {
        return job.setupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Returns immediately, without blocking, whether the whole job is done
     * yet or not.
     */
    public synchronized boolean isComplete() throws IOException {
      try {
        return job.isComplete();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * True iff the job completed successfully.
     */
    public synchronized boolean isSuccessful() throws IOException {
      try {
        return job.isSuccessful();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Blocks until the job is finished.
     */
    public void waitForCompletion() throws IOException {
      try {
        job.waitForCompletion(false);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      } catch (ClassNotFoundException ce) {
        throw new IOException(ce);
      }
    }

    /**
     * Tells the service to get the state of the current job.
     */
    public synchronized int getJobState() throws IOException {
      try {
        return job.getJobState().getValue();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    
    /**
     * Tells the service to terminate the current job.
     */
    public synchronized void killJob() throws IOException {
      try {
        job.killJob();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Set the priority of the job.
     * @param priority new priority of the job. 
     */
    public synchronized void setJobPriority(String priority) 
                                                throws IOException {
      try {
        job.setPriority(
          org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    
    /**
     * Kill the indicated task attempt.
     * @param taskId the id of the task to kill.
     * @param shouldFail if true the task is failed and added to the failed
     * tasks list; otherwise it is just killed, without affecting the job's
     * failure status.
     */
    public synchronized void killTask(TaskAttemptID taskId,
        boolean shouldFail) throws IOException {
      try {
        if (shouldFail) {
          job.failTask(taskId);
        } else {
          job.killTask(taskId);
        }
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}. */
    @Deprecated
    public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
      killTask(TaskAttemptID.forName(taskId), shouldFail);
    }
    
    /**
     * Fetch task completion events from the cluster for this job.
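     *
     * <p>A sketch of how a caller might page through all events; this
     * wrapper requests events in batches of 10 per call, and the loop
     * structure here is illustrative only:</p>
     * <pre>
     *     int from = 0;
     *     TaskCompletionEvent[] events;
     *     do {
     *       events = runningJob.getTaskCompletionEvents(from);
     *       from += events.length;
     *     } while (events.length &gt; 0);
     * </pre>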
     */
    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
        int startFrom) throws IOException {
      try {
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events = 
          job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
        }
        return ret;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Dump stats to screen.
     */
    @Override
    public String toString() {
      return job.toString();
    }
        
    /**
     * Returns the counters for this job.
     */
    public Counters getCounters() throws IOException {
      try { 
        Counters result = null;
        org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
        if (temp != null) {
          result = Counters.downgrade(temp);
        }
        return result;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    
    @Override
    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
      try { 
        return job.getTaskDiagnostics(id);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public String getHistoryUrl() throws IOException {
      try {
        return job.getHistoryUrl();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public boolean isRetired() throws IOException {
      try {
        return job.isRetired();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    
    boolean monitorAndPrintJob() throws IOException, InterruptedException {
      return job.monitorAndPrintJob();
    }
    
    @Override
    public String getFailureInfo() throws IOException {
      try {
        return job.getStatus().getFailureInfo();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public JobStatus getJobStatus() throws IOException {
      try {
        return JobStatus.downgrade(job.getStatus());
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
  }

  /**
   * Ugi of the client. We store this ugi when the client is created and 
   * then make sure that the same ugi is used to run the various protocols.
   */
  UserGroupInformation clientUgi;
  
  /**
   * Create a job client.
   */
  public JobClient() {
  }
    
  /**
   * Build a job client with the given {@link JobConf}, and connect to the 
   * default cluster.
   * 
   * @param conf the job configuration.
   * @throws IOException
   */
  public JobClient(JobConf conf) throws IOException {
    init(conf);
  }

  /**
   * Build a job client with the given {@link Configuration}, 
   * and connect to the default cluster.
   * 
   * @param conf the configuration.
   * @throws IOException
   */
  public JobClient(Configuration conf) throws IOException {
    init(new JobConf(conf));
  }

  /**
   * Connect to the default cluster.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    setConf(conf);
    cluster = new Cluster(conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  @InterfaceAudience.Private
  public static class Renewer extends TokenRenewer {

    @Override
    public boolean handleKind(Text kind) {
      return DelegationTokenIdentifier.MAPREDUCE_DELEGATION_KIND.equals(kind);
    }

    @SuppressWarnings("unchecked")
    @Override
    public long renew(Token<?> token, Configuration conf
                      ) throws IOException, InterruptedException {
      return new Cluster(conf).
        renewDelegationToken((Token<DelegationTokenIdentifier>) token);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void cancel(Token<?> token, Configuration conf
                       ) throws IOException, InterruptedException {
      new Cluster(conf).
        cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
    }

    @Override
    public boolean isManaged(Token<?> token) throws IOException {
      return true;
    }
  }

  /**
   * Build a job client, connect to the indicated job tracker.
   * 
   * @param jobTrackAddr the job tracker to connect to.
   * @param conf configuration.
   */
  public JobClient(InetSocketAddress jobTrackAddr, 
                   Configuration conf) throws IOException {
    cluster = new Cluster(jobTrackAddr, conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Close the <code>JobClient</code>.
   */
  public synchronized void close() throws IOException {
    cluster.close();
  }

  /**
   * Get a filesystem handle. We need this to prepare jobs
   * for submission to the MapReduce system.
   * 
   * @return the filesystem handle.
   */
  public synchronized FileSystem getFs() throws IOException {
    try { 
      return cluster.getFileSystem();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Get a handle to the Cluster.
   */
  public Cluster getClusterHandle() {
    return cluster;
  }
  
  /**
   * Submit a job to the MR system.
   * 
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   * 
   * @param jobFile the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws InvalidJobConfException
   * @throws IOException
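   *
   * <p>A minimal sketch, assuming <code>"job.xml"</code> is a serialized job
   * configuration accessible to the client (the path is hypothetical):</p>
   * <pre>
   *     JobClient client = new JobClient(new JobConf());
   *     RunningJob running = client.submitJob("job.xml");
   *     System.out.println("Submitted job " + running.getID());
   * </pre>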
   */
  public RunningJob submitJob(String jobFile) throws FileNotFoundException, 
                                                     InvalidJobConfException, 
                                                     IOException {
    // Load in the submitted job details
    JobConf job = new JobConf(jobFile);
    return submitJob(job);
  }
    
  /**
   * Submit a job to the MR system.
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   * 
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws IOException
   */
  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                  IOException {
    try {
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      if (getDelegationTokenCalled) {
        conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
        getDelegationTokenCalled = false;
        conf.set(HS_DELEGATION_TOKEN_RENEWER, dtRenewer);
        dtRenewer = null;
      }
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
        @Override
        public Job run() throws IOException, ClassNotFoundException, 
          InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });
      // update our Cluster instance with the one created by Job for submission
      // (we can't pass our Cluster instance to Job, since Job wraps the config
      // instance, and the two configs would then diverge)
      cluster = job.getCluster();
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }

  private Job getJobUsingCluster(final JobID jobid) throws IOException,
      InterruptedException {
    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
      public Job run() throws IOException, InterruptedException {
        return cluster.getJob(jobid);
      }
    });
  }

  /**
   * Get a {@link RunningJob} object to track an ongoing job. Returns
   * null if the id does not correspond to any known job.
   * 
   * @param jobid the jobid of the job.
   * @return the {@link RunningJob} handle to track the job, null if the 
   *         <code>jobid</code> doesn't correspond to any known job.
   * @throws IOException
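   *
   * <p>An illustrative null-safe lookup (the job id string is a
   * placeholder):</p>
   * <pre>
   *     RunningJob running =
   *         client.getJob(JobID.forName("job_201203270621_0001"));
   *     if (running == null) {
   *       System.err.println("job is no longer known to the cluster");
   *     }
   * </pre>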
   */
  public RunningJob getJob(final JobID jobid) throws IOException {
    try {
      Job job = getJobUsingCluster(jobid);
      if (job != null) {
        JobStatus status = JobStatus.downgrade(job.getStatus());
        if (status != null) {
          return new NetworkedJob(status, cluster);
        } 
      }
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    return null;
  }

  /** @deprecated Applications should rather use {@link #getJob(JobID)}. */
  @Deprecated
  public RunningJob getJob(String jobid) throws IOException {
    return getJob(JobID.forName(jobid));
  }
  
  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
  
  /**
   * Get the information of the current state of the map tasks of a job.
   * 
   * @param jobId the job to query.
   * @return the list of all of the map task reports.
   * @throws IOException
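   *
   * <p>For example, a sketch that prints each map task's progress
   * (illustrative only):</p>
   * <pre>
   *     for (TaskReport report : client.getMapTaskReports(jobId)) {
   *       System.out.println(report.getTaskID() + " " + report.getProgress());
   *     }
   * </pre>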
   */
  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.MAP);
  }
  
  private TaskReport[] getTaskReports(final JobID jobId, TaskType type) throws 
    IOException {
    try {
      Job j = getJobUsingCluster(jobId);
      if (j == null) {
        return EMPTY_TASK_REPORTS;
      }
      return TaskReport.downgradeArray(j.getTaskReports(type));
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /** @deprecated Applications should rather use {@link #getMapTaskReports(JobID)}. */
  @Deprecated
  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
    return getMapTaskReports(JobID.forName(jobId));
  }
  
  /**
   * Get the information of the current state of the reduce tasks of a job.
   * 
   * @param jobId the job to query.
   * @return the list of all of the reduce task reports.
   * @throws IOException
   */    
  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.REDUCE);
  }

  /**
   * Get the information of the current state of the cleanup tasks of a job.
   * 
   * @param jobId the job to query.
   * @return the list of all of the cleanup task reports.
   * @throws IOException
   */    
  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
  }

  /**
   * Get the information of the current state of the setup tasks of a job.
   * 
   * @param jobId the job to query.
   * @return the list of all of the setup task reports.
   * @throws IOException
   */    
  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_SETUP);
  }

  /** @deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}. */
  @Deprecated
  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
    return getReduceTaskReports(JobID.forName(jobId));
  }
  
  /**
   * Display the information about a job's tasks, of a particular type and
   * in a particular state.
   * 
   * @param jobId the ID of the job
   * @param type the type of the task (map/reduce/setup/cleanup)
   * @param state the state of the task 
   * (pending/running/completed/failed/killed)
   */
  public void displayTasks(final JobID jobId, String type, String state) 
      throws IOException {
    try {
      Job job = getJobUsingCluster(jobId);
      super.displayTasks(job, type, state);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Get status information about the Map-Reduce cluster.
   *  
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
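   *
   * <p>For instance, a small sketch that reads a few {@link ClusterStatus}
   * accessors (illustrative only):</p>
   * <pre>
   *     ClusterStatus status = client.getClusterStatus();
   *     System.out.println("trackers: " + status.getTaskTrackers());
   *     System.out.println("map slots: " + status.getMaxMapTasks());
   *     System.out.println("reduce slots: " + status.getMaxReduceTasks());
   * </pre>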
   */
  public ClusterStatus getClusterStatus() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(metrics.getTaskTrackerCount(),
              metrics.getBlackListedTaskTrackerCount(),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus(),
              metrics.getDecommissionedTaskTrackerCount());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
    Collection<String> list = new ArrayList<String>();
    for (TaskTrackerInfo info : objs) {
      list.add(info.getTaskTrackerName());
    }
    return list;
  }

  private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
    for (TaskTrackerInfo info : objs) {
      BlackListInfo binfo = new BlackListInfo();
      binfo.setTrackerName(info.getTaskTrackerName());
      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
      binfo.setBlackListReport(info.getBlacklistReport());
      list.add(binfo);
    }
    return list;
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *  
   * @param  detailed if true then get a detailed status including the
   *         tracker names
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(
              arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(), 
              cluster.getJobTrackerStatus());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /** 
   * Get the jobs that are not completed and not failed.
   * 
   * @return array of {@link JobStatus} for the running/to-be-run jobs.
   * @throws IOException
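   *
   * <p>A hedged sketch of listing the incomplete jobs (the output format is
   * illustrative):</p>
   * <pre>
   *     for (JobStatus status : client.jobsToComplete()) {
   *       System.out.println(status.getJobID() + " " + status.getRunState());
   *     }
   * </pre>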
   */
  public JobStatus[] jobsToComplete() throws IOException {
    List<JobStatus> stats = new ArrayList<JobStatus>();
    for (JobStatus stat : getAllJobs()) {
      if (!stat.isJobComplete()) {
        stats.add(stat);
      }
    }
    return stats.toArray(new JobStatus[0]);
  }

  /** 
   * Get the jobs that are submitted.
   * 
   * @return array of {@link JobStatus} for the submitted jobs.
   * @throws IOException
   */
  public JobStatus[] getAllJobs() throws IOException {
    try {
      org.apache.hadoop.mapreduce.JobStatus[] jobs = 
          clientUgi.doAs(new PrivilegedExceptionAction<
              org.apache.hadoop.mapreduce.JobStatus[]>() {
            public org.apache.hadoop.mapreduce.JobStatus[] run() 
                throws IOException, InterruptedException {
              return cluster.getAllJobStatuses();
            }
          });
      JobStatus[] stats = new JobStatus[jobs.length];
      for (int i = 0; i < jobs.length; i++) {
        stats[i] = JobStatus.downgrade(jobs[i]);
      }
      return stats;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /** 
   * Utility that submits a job, then polls for progress until the job is
   * complete.
   * 
   * @param job the job configuration.
   * @throws IOException if the job fails
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }
  
  /**
   * Monitor a job and print status in real-time as progress is made and tasks 
   * fail.
   * @param conf the job's configuration
   * @param job the job to track
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
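   *
   * <p>A typical pairing with {@link #submitJob(JobConf)}, sketched under the
   * assumption that the caller handles {@link InterruptedException}:</p>
   * <pre>
   *     RunningJob running = client.submitJob(conf);
   *     boolean succeeded = client.monitorAndPrintJob(conf, running);
   * </pre>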
   */
  public boolean monitorAndPrintJob(JobConf conf, 
                                    RunningJob job
                                    ) throws IOException, InterruptedException {
    return ((NetworkedJob) job).monitorAndPrintJob();
  }

  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
  }
  
  static Configuration getConfiguration(String jobTrackerSpec) {
    Configuration conf = new Configuration();
    if (jobTrackerSpec != null) {
      if (jobTrackerSpec.indexOf(":") >= 0) {
        conf.set("mapred.job.tracker", jobTrackerSpec);
      } else {
        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
        URL validate = conf.getResource(classpathFile);
        if (validate == null) {
          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
        }
        conf.addResource(classpathFile);
      }
    }
    return conf;
  }

  /**
   * Sets the output filter for tasks. Only those tasks are printed whose
   * output matches the filter. 
   * @param newValue task filter.
   */
  @Deprecated
  public void setTaskOutputFilter(TaskStatusFilter newValue) {
    this.taskOutputFilter = newValue;
  }
    
  /**
   * Get the task output filter out of the JobConf.
   * 
   * @param job the JobConf to examine.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", 
                                            "FAILED"));
  }
    
  /**
   * Modify the JobConf to set the task output filter.
   * 
   * @param job the JobConf to modify.
   * @param newValue the value to set.
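   *
   * <p>For example, to have the client print the output of all tasks rather
   * than only the failed ones (a sketch; <code>ALL</code> is one of the
   * {@link TaskStatusFilter} values declared above):</p>
   * <pre>
   *     JobClient.setTaskOutputFilter(job, TaskStatusFilter.ALL);
   * </pre>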
   */
  public static void setTaskOutputFilter(JobConf job, 
                                         TaskStatusFilter newValue) {
    job.set("jobclient.output.filter", newValue.toString());
  }
    
  /**
   * Returns the task output filter.
   * @return task filter. 
   */
  @Deprecated
  public TaskStatusFilter getTaskOutputFilter() {
    return this.taskOutputFilter; 
  }

  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
      String counterGroupName, String counterName) throws IOException {
    Counters counters = Counters.downgrade(cntrs);
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * Get status information about the max available Maps in the cluster.
   *  
   * @return the max available Maps in the cluster
   * @throws IOException
   */
  public int getDefaultMaps() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getMapSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the max available Reduces in the cluster.
   *  
   * @return the max available Reduces in the cluster
   * @throws IOException
   */
  public int getDefaultReduces() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getReduceSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Grab the jobtracker system directory path where job-specific files are to
   * be placed.
   * 
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getSystemDir();
        }
      });
    } catch (IOException ioe) {
      return null;
    } catch (InterruptedException ie) {
      return null;
    }
  }

  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
    JobQueueInfo ret = new JobQueueInfo(queue);
    // make sure to convert any children
    if (queue.getQueueChildren().size() > 0) {
      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
          .getQueueChildren().size());
      for (QueueInfo child : queue.getQueueChildren()) {
        childQueues.add(getJobQueueInfo(child));
      }
      ret.setChildren(childQueues);
    }
    return ret;
  }

  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
      throws IOException {
    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
    for (int i = 0; i < queues.length; i++) {
      ret[i] = getJobQueueInfo(queues[i]);
    }
    return ret;
  }

  /**
   * Returns an array of queue information objects about the root-level queues
   * configured.
   *
   * @return the array of root-level JobQueueInfo objects
   * @throws IOException
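   *
   * <p>A recursive walk over the queue hierarchy could look like the sketch
   * below (the <code>printQueue</code> helper is hypothetical):</p>
   * <pre>
   *     void printQueue(JobQueueInfo queue, String indent) {
   *       System.out.println(indent + queue.getQueueName());
   *       for (JobQueueInfo child : queue.getChildren()) {
   *         printQueue(child, indent + "  ");
   *       }
   *     }
   *     for (JobQueueInfo root : client.getRootQueues()) {
   *       printQueue(root, "");
   *     }
   * </pre>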
   */
  public JobQueueInfo[] getRootQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getRootQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Returns an array of queue information objects about the immediate children
   * of queue <code>queueName</code>.
   * 
   * @param queueName the name of the parent queue.
   * @return the array of immediate-children JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getChildQueues(final String queueName) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Return an array of queue information objects about all the Job Queues
   * configured.
   * 
   * @return Array of JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Gets all the jobs which were added to a particular Job Queue.
   * 
   * @param queueName name of the Job Queue
   * @return Array of jobs present in the job queue
   * @throws IOException
   */
  public JobStatus[] getJobsFromQueue(final String queueName) throws IOException {
    try {
      QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
        @Override
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queue == null) {
        return null;
      }
      org.apache.hadoop.mapreduce.JobStatus[] stats = 
        queue.getJobStatuses();
      JobStatus[] ret = new JobStatus[stats.length];
      for (int i = 0; i < stats.length; i++) {
        ret[i] = JobStatus.downgrade(stats[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Gets the queue information associated with a particular Job Queue.
   * 
   * @param queueName name of the job queue.
   * @return Queue information associated with the particular queue.
   * @throws IOException
   */
  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
    try {
      QueueInfo queueInfo = clientUgi.doAs(new 
          PrivilegedExceptionAction<QueueInfo>() {
        public QueueInfo run() throws IOException, InterruptedException {
          return cluster.getQueue(queueName);
        }
      });
      if (queueInfo != null) {
        return new JobQueueInfo(queueInfo);
      }
      return null;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
  
  /**
   * Gets the Queue ACLs for the current user.
   * @return array of QueueAclsInfo objects for the current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
    try {
      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls = 
        clientUgi.doAs(new 
            PrivilegedExceptionAction
            <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
              public org.apache.hadoop.mapreduce.QueueAclsInfo[] run() 
                  throws IOException, InterruptedException {
                return cluster.getQueueAclsForCurrentUser();
              }
        });
      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
      for (int i = 0; i < acls.length; i++) {
        ret[i] = QueueAclsInfo.downgrade(acls[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a delegation token for the user from the JobTracker.
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
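   *
   * <p>A hedged sketch of fetching a token on behalf of a renewer principal
   * and attaching it to a job's credentials (the principal name is a
   * placeholder):</p>
   * <pre>
   *     Token&lt;DelegationTokenIdentifier&gt; token =
   *         client.getDelegationToken(new Text("renewer-principal"));
   *     job.getCredentials().addToken(token.getService(), token);
   * </pre>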
   */
  public Token<DelegationTokenIdentifier> 
    getDelegationToken(final Text renewer) throws IOException, InterruptedException {
    getDelegationTokenCalled = true;
    dtRenewer = renewer.toString();
    return clientUgi.doAs(new 
        PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
      public Token<DelegationTokenIdentifier> run() throws IOException, 
          InterruptedException {
        return cluster.getDelegationToken(renewer);
      }
    });
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time if the renewal went well
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                   ) throws InvalidToken, IOException, 
                                            InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws InvalidToken, IOException, 
                                             InterruptedException {
    token.cancel(getConf());
  }

  /**
   * Runs the <code>JobClient</code> command-line interface.
   */
  public static void main(String[] argv) throws Exception {
    int res = ToolRunner.run(new JobClient(), argv);
    System.exit(res);
  }
}