001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.mapreduce.tools;
019    
020    import java.io.IOException;
021    import java.io.PrintWriter;
022    import java.util.ArrayList;
023    import java.util.List;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.classification.InterfaceAudience.Private;
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.conf.Configured;
032    import org.apache.hadoop.ipc.RemoteException;
033    import org.apache.hadoop.mapred.JobConf;
034    import org.apache.hadoop.mapred.TIPStatus;
035    import org.apache.hadoop.mapreduce.Cluster;
036    import org.apache.hadoop.mapreduce.Counters;
037    import org.apache.hadoop.mapreduce.Job;
038    import org.apache.hadoop.mapreduce.JobID;
039    import org.apache.hadoop.mapreduce.JobPriority;
040    import org.apache.hadoop.mapreduce.JobStatus;
041    import org.apache.hadoop.mapreduce.TaskAttemptID;
042    import org.apache.hadoop.mapreduce.TaskCompletionEvent;
043    import org.apache.hadoop.mapreduce.TaskReport;
044    import org.apache.hadoop.mapreduce.TaskTrackerInfo;
045    import org.apache.hadoop.mapreduce.TaskType;
046    import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
047    import org.apache.hadoop.mapreduce.v2.LogParams;
048    import org.apache.hadoop.security.AccessControlException;
049    import org.apache.hadoop.util.Tool;
050    import org.apache.hadoop.util.ToolRunner;
051    import org.apache.hadoop.yarn.logaggregation.LogDumper;
052    
053    /**
054     * Interprets the map reduce cli options 
055     */
056    @InterfaceAudience.Public
057    @InterfaceStability.Stable
058    public class CLI extends Configured implements Tool {
059      private static final Log LOG = LogFactory.getLog(CLI.class);
060      protected Cluster cluster;
061    
062      public CLI() {
063      }
064      
065      public CLI(Configuration conf) {
066        setConf(conf);
067      }
068      
069      public int run(String[] argv) throws Exception {
070        int exitCode = -1;
071        if (argv.length < 1) {
072          displayUsage("");
073          return exitCode;
074        }    
075        // process arguments
076        String cmd = argv[0];
077        String submitJobFile = null;
078        String jobid = null;
079        String taskid = null;
080        String historyFile = null;
081        String counterGroupName = null;
082        String counterName = null;
083        JobPriority jp = null;
084        String taskType = null;
085        String taskState = null;
086        int fromEvent = 0;
087        int nEvents = 0;
088        boolean getStatus = false;
089        boolean getCounter = false;
090        boolean killJob = false;
091        boolean listEvents = false;
092        boolean viewHistory = false;
093        boolean viewAllHistory = false;
094        boolean listJobs = false;
095        boolean listAllJobs = false;
096        boolean listActiveTrackers = false;
097        boolean listBlacklistedTrackers = false;
098        boolean displayTasks = false;
099        boolean killTask = false;
100        boolean failTask = false;
101        boolean setJobPriority = false;
102        boolean logs = false;
103    
104        if ("-submit".equals(cmd)) {
105          if (argv.length != 2) {
106            displayUsage(cmd);
107            return exitCode;
108          }
109          submitJobFile = argv[1];
110        } else if ("-status".equals(cmd)) {
111          if (argv.length != 2) {
112            displayUsage(cmd);
113            return exitCode;
114          }
115          jobid = argv[1];
116          getStatus = true;
117        } else if("-counter".equals(cmd)) {
118          if (argv.length != 4) {
119            displayUsage(cmd);
120            return exitCode;
121          }
122          getCounter = true;
123          jobid = argv[1];
124          counterGroupName = argv[2];
125          counterName = argv[3];
126        } else if ("-kill".equals(cmd)) {
127          if (argv.length != 2) {
128            displayUsage(cmd);
129            return exitCode;
130          }
131          jobid = argv[1];
132          killJob = true;
133        } else if ("-set-priority".equals(cmd)) {
134          if (argv.length != 3) {
135            displayUsage(cmd);
136            return exitCode;
137          }
138          jobid = argv[1];
139          try {
140            jp = JobPriority.valueOf(argv[2]); 
141          } catch (IllegalArgumentException iae) {
142            LOG.info(iae);
143            displayUsage(cmd);
144            return exitCode;
145          }
146          setJobPriority = true; 
147        } else if ("-events".equals(cmd)) {
148          if (argv.length != 4) {
149            displayUsage(cmd);
150            return exitCode;
151          }
152          jobid = argv[1];
153          fromEvent = Integer.parseInt(argv[2]);
154          nEvents = Integer.parseInt(argv[3]);
155          listEvents = true;
156        } else if ("-history".equals(cmd)) {
157          if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
158             displayUsage(cmd);
159             return exitCode;
160          }
161          viewHistory = true;
162          if (argv.length == 3 && "all".equals(argv[1])) {
163            viewAllHistory = true;
164            historyFile = argv[2];
165          } else {
166            historyFile = argv[1];
167          }
168        } else if ("-list".equals(cmd)) {
169          if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
170            displayUsage(cmd);
171            return exitCode;
172          }
173          if (argv.length == 2 && "all".equals(argv[1])) {
174            listAllJobs = true;
175          } else {
176            listJobs = true;
177          }
178        } else if("-kill-task".equals(cmd)) {
179          if (argv.length != 2) {
180            displayUsage(cmd);
181            return exitCode;
182          }
183          killTask = true;
184          taskid = argv[1];
185        } else if("-fail-task".equals(cmd)) {
186          if (argv.length != 2) {
187            displayUsage(cmd);
188            return exitCode;
189          }
190          failTask = true;
191          taskid = argv[1];
192        } else if ("-list-active-trackers".equals(cmd)) {
193          if (argv.length != 1) {
194            displayUsage(cmd);
195            return exitCode;
196          }
197          listActiveTrackers = true;
198        } else if ("-list-blacklisted-trackers".equals(cmd)) {
199          if (argv.length != 1) {
200            displayUsage(cmd);
201            return exitCode;
202          }
203          listBlacklistedTrackers = true;
204        } else if ("-list-attempt-ids".equals(cmd)) {
205          if (argv.length != 4) {
206            displayUsage(cmd);
207            return exitCode;
208          }
209          jobid = argv[1];
210          taskType = argv[2];
211          taskState = argv[3];
212          displayTasks = true;
213        } else if ("-logs".equals(cmd)) {
214          if (argv.length == 2 || argv.length ==3) {
215            logs = true;
216            jobid = argv[1];
217            if (argv.length == 3) {
218              taskid = argv[2];
219            }  else {
220              taskid = null;
221            }
222          } else {
223            displayUsage(cmd);
224            return exitCode;
225          }
226        } else {
227          displayUsage(cmd);
228          return exitCode;
229        }
230    
231        // initialize cluster
232        cluster = new Cluster(getConf());
233            
234        // Submit the request
235        try {
236          if (submitJobFile != null) {
237            Job job = Job.getInstance(new JobConf(submitJobFile));
238            job.submit();
239            System.out.println("Created job " + job.getJobID());
240            exitCode = 0;
241          } else if (getStatus) {
242            Job job = cluster.getJob(JobID.forName(jobid));
243            if (job == null) {
244              System.out.println("Could not find job " + jobid);
245            } else {
246              Counters counters = job.getCounters();
247              System.out.println();
248              System.out.println(job);
249              if (counters != null) {
250                System.out.println(counters);
251              } else {
252                System.out.println("Counters not available. Job is retired.");
253              }
254              exitCode = 0;
255            }
256          } else if (getCounter) {
257            Job job = cluster.getJob(JobID.forName(jobid));
258            if (job == null) {
259              System.out.println("Could not find job " + jobid);
260            } else {
261              Counters counters = job.getCounters();
262              if (counters == null) {
263                System.out.println("Counters not available for retired job " + 
264                jobid);
265                exitCode = -1;
266              } else {
267                System.out.println(getCounter(counters,
268                  counterGroupName, counterName));
269                exitCode = 0;
270              }
271            }
272          } else if (killJob) {
273            Job job = cluster.getJob(JobID.forName(jobid));
274            if (job == null) {
275              System.out.println("Could not find job " + jobid);
276            } else {
277              job.killJob();
278              System.out.println("Killed job " + jobid);
279              exitCode = 0;
280            }
281          } else if (setJobPriority) {
282            Job job = cluster.getJob(JobID.forName(jobid));
283            if (job == null) {
284              System.out.println("Could not find job " + jobid);
285            } else {
286              job.setPriority(jp);
287              System.out.println("Changed job priority.");
288              exitCode = 0;
289            } 
290          } else if (viewHistory) {
291            viewHistory(historyFile, viewAllHistory);
292            exitCode = 0;
293          } else if (listEvents) {
294            listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents);
295            exitCode = 0;
296          } else if (listJobs) {
297            listJobs(cluster);
298            exitCode = 0;
299          } else if (listAllJobs) {
300            listAllJobs(cluster);
301            exitCode = 0;
302          } else if (listActiveTrackers) {
303            listActiveTrackers(cluster);
304            exitCode = 0;
305          } else if (listBlacklistedTrackers) {
306            listBlacklistedTrackers(cluster);
307            exitCode = 0;
308          } else if (displayTasks) {
309            displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
310          } else if(killTask) {
311            TaskAttemptID taskID = TaskAttemptID.forName(taskid);
312            Job job = cluster.getJob(taskID.getJobID());
313            if (job == null) {
314              System.out.println("Could not find job " + jobid);
315            } else if (job.killTask(taskID)) {
316              System.out.println("Killed task " + taskid);
317              exitCode = 0;
318            } else {
319              System.out.println("Could not kill task " + taskid);
320              exitCode = -1;
321            }
322          } else if(failTask) {
323            TaskAttemptID taskID = TaskAttemptID.forName(taskid);
324            Job job = cluster.getJob(taskID.getJobID());
325            if (job == null) {
326                System.out.println("Could not find job " + jobid);
327            } else if(job.failTask(taskID)) {
328              System.out.println("Killed task " + taskID + " by failing it");
329              exitCode = 0;
330            } else {
331              System.out.println("Could not fail task " + taskid);
332              exitCode = -1;
333            }
334          } else if (logs) {
335            try {
336            JobID jobID = JobID.forName(jobid);
337            TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
338            LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
339            LogDumper logDumper = new LogDumper();
340            logDumper.setConf(getConf());
341            exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(),
342                logParams.getContainerId(), logParams.getNodeId(),
343                logParams.getOwner());
344            } catch (IOException e) {
345              if (e instanceof RemoteException) {
346                throw e;
347              } 
348              System.out.println(e.getMessage());
349            }
350          }
351        } catch (RemoteException re) {
352          IOException unwrappedException = re.unwrapRemoteException();
353          if (unwrappedException instanceof AccessControlException) {
354            System.out.println(unwrappedException.getMessage());
355          } else {
356            throw re;
357          }
358        } finally {
359          cluster.close();
360        }
361        return exitCode;
362      }
363    
364      private String getJobPriorityNames() {
365        StringBuffer sb = new StringBuffer();
366        for (JobPriority p : JobPriority.values()) {
367          sb.append(p.name()).append(" ");
368        }
369        return sb.substring(0, sb.length()-1);
370      }
371    
372      private String getTaskTypess() {
373        StringBuffer sb = new StringBuffer();
374        for (TaskType t : TaskType.values()) {
375          sb.append(t.name()).append(" ");
376        }
377        return sb.substring(0, sb.length()-1);
378      }
379    
380      /**
381       * Display usage of the command-line tool and terminate execution.
382       */
383      private void displayUsage(String cmd) {
384        String prefix = "Usage: CLI ";
385        String jobPriorityValues = getJobPriorityNames();
386        String taskTypes = getTaskTypess();
387        String taskStates = "running, completed";
388        if ("-submit".equals(cmd)) {
389          System.err.println(prefix + "[" + cmd + " <job-file>]");
390        } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
391          System.err.println(prefix + "[" + cmd + " <job-id>]");
392        } else if ("-counter".equals(cmd)) {
393          System.err.println(prefix + "[" + cmd + 
394            " <job-id> <group-name> <counter-name>]");
395        } else if ("-events".equals(cmd)) {
396          System.err.println(prefix + "[" + cmd + 
397            " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
398        } else if ("-history".equals(cmd)) {
399          System.err.println(prefix + "[" + cmd + " <jobHistoryFile>]");
400        } else if ("-list".equals(cmd)) {
401          System.err.println(prefix + "[" + cmd + " [all]]");
402        } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
403          System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
404        } else if ("-set-priority".equals(cmd)) {
405          System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
406              "Valid values for priorities are: " 
407              + jobPriorityValues); 
408        } else if ("-list-active-trackers".equals(cmd)) {
409          System.err.println(prefix + "[" + cmd + "]");
410        } else if ("-list-blacklisted-trackers".equals(cmd)) {
411          System.err.println(prefix + "[" + cmd + "]");
412        } else if ("-list-attempt-ids".equals(cmd)) {
413          System.err.println(prefix + "[" + cmd + 
414              " <job-id> <task-type> <task-state>]. " +
415              "Valid values for <task-type> are " + taskTypes + ". " +
416              "Valid values for <task-state> are " + taskStates);
417        } else if ("-logs".equals(cmd)) {
418          System.err.println(prefix + "[" + cmd +
419              " <job-id> <task-attempt-id>]. " +
420              " <task-attempt-id> is optional to get task attempt logs.");      
421        } else {
422          System.err.printf(prefix + "<command> <args>\n");
423          System.err.printf("\t[-submit <job-file>]\n");
424          System.err.printf("\t[-status <job-id>]\n");
425          System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]\n");
426          System.err.printf("\t[-kill <job-id>]\n");
427          System.err.printf("\t[-set-priority <job-id> <priority>]. " +
428            "Valid values for priorities are: " + jobPriorityValues + "\n");
429          System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]\n");
430          System.err.printf("\t[-history <jobHistoryFile>]\n");
431          System.err.printf("\t[-list [all]]\n");
432          System.err.printf("\t[-list-active-trackers]\n");
433          System.err.printf("\t[-list-blacklisted-trackers]\n");
434          System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
435            "<task-state>]. " +
436            "Valid values for <task-type> are " + taskTypes + ". " +
437            "Valid values for <task-state> are " + taskStates);
438          System.err.printf("\t[-kill-task <task-attempt-id>]\n");
439          System.err.printf("\t[-fail-task <task-attempt-id>]\n");
440          System.err.printf("\t[-logs <job-id> <task-attempt-id>]\n\n");
441          ToolRunner.printGenericCommandUsage(System.out);
442        }
443      }
444        
445      private void viewHistory(String historyFile, boolean all) 
446        throws IOException {
447        HistoryViewer historyViewer = new HistoryViewer(historyFile,
448                                            getConf(), all);
449        historyViewer.print();
450      }
451    
452      protected long getCounter(Counters counters, String counterGroupName,
453          String counterName) throws IOException {
454        return counters.findCounter(counterGroupName, counterName).getValue();
455      }
456      
457      /**
458       * List the events for the given job
459       * @param jobId the job id for the job's events to list
460       * @throws IOException
461       */
462      private void listEvents(Job job, int fromEventId, int numEvents)
463          throws IOException, InterruptedException {
464        TaskCompletionEvent[] events = job.
465          getTaskCompletionEvents(fromEventId, numEvents);
466        System.out.println("Task completion events for " + job.getJobID());
467        System.out.println("Number of events (from " + fromEventId + ") are: " 
468          + events.length);
469        for(TaskCompletionEvent event: events) {
470          System.out.println(event.getStatus() + " " + 
471            event.getTaskAttemptId() + " " + 
472            getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
473        }
474      }
475    
476      protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
477        return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
478      }
479      
480    
481      /**
482       * Dump a list of currently running jobs
483       * @throws IOException
484       */
485      private void listJobs(Cluster cluster) 
486          throws IOException, InterruptedException {
487        List<JobStatus> runningJobs = new ArrayList<JobStatus>();
488        for (JobStatus job : cluster.getAllJobStatuses()) {
489          if (!job.isJobComplete()) {
490            runningJobs.add(job);
491          }
492        }
493        displayJobList(runningJobs.toArray(new JobStatus[0]));
494      }
495        
496      /**
497       * Dump a list of all jobs submitted.
498       * @throws IOException
499       */
500      private void listAllJobs(Cluster cluster) 
501          throws IOException, InterruptedException {
502        displayJobList(cluster.getAllJobStatuses());
503      }
504      
505      /**
506       * Display the list of active trackers
507       */
508      private void listActiveTrackers(Cluster cluster) 
509          throws IOException, InterruptedException {
510        TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
511        for (TaskTrackerInfo tracker : trackers) {
512          System.out.println(tracker.getTaskTrackerName());
513        }
514      }
515    
516      /**
517       * Display the list of blacklisted trackers
518       */
519      private void listBlacklistedTrackers(Cluster cluster) 
520          throws IOException, InterruptedException {
521        TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
522        if (trackers.length > 0) {
523          System.out.println("BlackListedNode \t Reason");
524        }
525        for (TaskTrackerInfo tracker : trackers) {
526          System.out.println(tracker.getTaskTrackerName() + "\t" + 
527            tracker.getReasonForBlacklist());
528        }
529      }
530    
531      private void printTaskAttempts(TaskReport report) {
532        if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
533          System.out.println(report.getSuccessfulTaskAttemptId());
534        } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
535          for (TaskAttemptID t : 
536            report.getRunningTaskAttemptIds()) {
537            System.out.println(t);
538          }
539        }
540      }
541    
542      /**
543       * Display the information about a job's tasks, of a particular type and
544       * in a particular state
545       * 
546       * @param job the job
547       * @param type the type of the task (map/reduce/setup/cleanup)
548       * @param state the state of the task 
549       * (pending/running/completed/failed/killed)
550       */
551      protected void displayTasks(Job job, String type, String state) 
552      throws IOException, InterruptedException {
553        TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type));
554        for (TaskReport report : reports) {
555          TIPStatus status = report.getCurrentStatus();
556          if ((state.equals("pending") && status ==TIPStatus.PENDING) ||
557              (state.equals("running") && status ==TIPStatus.RUNNING) ||
558              (state.equals("completed") && status == TIPStatus.COMPLETE) ||
559              (state.equals("failed") && status == TIPStatus.FAILED) ||
560              (state.equals("killed") && status == TIPStatus.KILLED)) {
561            printTaskAttempts(report);
562          }
563        }
564      }
565    
566      public void displayJobList(JobStatus[] jobs) 
567          throws IOException, InterruptedException {
568        displayJobList(jobs, new PrintWriter(System.out));
569      }
570    
571      @Private
572      public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
573      @Private
574      public static String dataPattern   = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
575      private static String memPattern   = "%dM";
576      private static String UNAVAILABLE  = "N/A";
577    
578      @Private
579      public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
580        writer.println("Total jobs:" + jobs.length);
581        writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
582          "Queue", "Priority", "UsedContainers",
583          "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
584        for (JobStatus job : jobs) {
585          int numUsedSlots = job.getNumUsedSlots();
586          int numReservedSlots = job.getNumReservedSlots();
587          int usedMem = job.getUsedMem();
588          int rsvdMem = job.getReservedMem();
589          int neededMem = job.getNeededMem();
590          writer.printf(dataPattern,
591              job.getJobID().toString(), job.getState(), job.getStartTime(),
592              job.getUsername(), job.getQueue(), 
593              job.getPriority().name(),
594              numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
595              numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
596              usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
597              rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
598              neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
599              job.getSchedulingInfo());
600        }
601        writer.flush();
602      }
603      
604      public static void main(String[] argv) throws Exception {
605        int res = ToolRunner.run(new CLI(), argv);
606        System.exit(res);
607      }
608    }