001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.hadoop.mapreduce.tools; 019 020 import java.io.IOException; 021 import java.io.PrintWriter; 022 import java.util.ArrayList; 023 import java.util.List; 024 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 import org.apache.hadoop.classification.InterfaceAudience; 028 import org.apache.hadoop.classification.InterfaceStability; 029 import org.apache.hadoop.classification.InterfaceAudience.Private; 030 import org.apache.hadoop.conf.Configuration; 031 import org.apache.hadoop.conf.Configured; 032 import org.apache.hadoop.ipc.RemoteException; 033 import org.apache.hadoop.mapred.JobConf; 034 import org.apache.hadoop.mapred.TIPStatus; 035 import org.apache.hadoop.mapreduce.Cluster; 036 import org.apache.hadoop.mapreduce.Counters; 037 import org.apache.hadoop.mapreduce.Job; 038 import org.apache.hadoop.mapreduce.JobID; 039 import org.apache.hadoop.mapreduce.JobPriority; 040 import org.apache.hadoop.mapreduce.JobStatus; 041 import org.apache.hadoop.mapreduce.TaskAttemptID; 042 import org.apache.hadoop.mapreduce.TaskCompletionEvent; 043 import org.apache.hadoop.mapreduce.TaskReport; 044 import org.apache.hadoop.mapreduce.TaskTrackerInfo; 045 import org.apache.hadoop.mapreduce.TaskType; 046 import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer; 047 import org.apache.hadoop.mapreduce.v2.LogParams; 048 import org.apache.hadoop.security.AccessControlException; 049 import org.apache.hadoop.util.Tool; 050 import org.apache.hadoop.util.ToolRunner; 051 import org.apache.hadoop.yarn.logaggregation.LogDumper; 052 053 /** 054 * Interprets the map reduce cli options 055 */ 056 @InterfaceAudience.Public 057 @InterfaceStability.Stable 058 public class CLI extends Configured implements Tool { 059 private static final Log LOG = LogFactory.getLog(CLI.class); 060 protected Cluster cluster; 061 062 public CLI() { 063 } 064 065 public CLI(Configuration conf) { 066 setConf(conf); 067 } 068 069 public int run(String[] argv) throws Exception { 070 int exitCode = -1; 071 if (argv.length < 1) { 072 displayUsage(""); 073 return exitCode; 074 } 075 // process arguments 076 String cmd = argv[0]; 077 String submitJobFile = null; 078 String jobid = null; 079 String taskid = null; 080 String historyFile = null; 081 String counterGroupName = null; 082 String counterName = null; 083 JobPriority jp = null; 084 String taskType = null; 085 String taskState = null; 086 int fromEvent = 0; 087 int nEvents = 0; 088 boolean getStatus = false; 089 boolean getCounter = false; 090 boolean killJob = false; 091 boolean listEvents = false; 092 boolean viewHistory = false; 093 boolean viewAllHistory = false; 094 boolean listJobs = false; 095 boolean listAllJobs = false; 096 boolean listActiveTrackers = false; 097 boolean listBlacklistedTrackers = false; 098 boolean displayTasks = false; 099 boolean killTask = false; 100 boolean failTask = false; 101 boolean setJobPriority = false; 102 boolean logs = false; 103 104 if ("-submit".equals(cmd)) { 105 if (argv.length != 2) { 106 displayUsage(cmd); 107 return exitCode; 108 } 109 submitJobFile = argv[1]; 110 } else if ("-status".equals(cmd)) { 111 if (argv.length != 2) { 112 displayUsage(cmd); 113 return exitCode; 114 } 115 jobid = argv[1]; 116 getStatus = true; 117 } else if("-counter".equals(cmd)) { 118 if (argv.length != 4) { 119 displayUsage(cmd); 120 return exitCode; 121 } 122 getCounter = true; 123 jobid = argv[1]; 124 counterGroupName = argv[2]; 125 counterName = argv[3]; 126 } else if ("-kill".equals(cmd)) { 127 if (argv.length != 2) { 128 displayUsage(cmd); 129 return exitCode; 130 } 131 jobid = argv[1]; 132 killJob = true; 133 } else if ("-set-priority".equals(cmd)) { 134 if (argv.length != 3) { 135 displayUsage(cmd); 136 return exitCode; 137 } 138 jobid = argv[1]; 139 try { 140 jp = JobPriority.valueOf(argv[2]); 141 } catch (IllegalArgumentException iae) { 142 LOG.info(iae); 143 displayUsage(cmd); 144 return exitCode; 145 } 146 setJobPriority = true; 147 } else if ("-events".equals(cmd)) { 148 if (argv.length != 4) { 149 displayUsage(cmd); 150 return exitCode; 151 } 152 jobid = argv[1]; 153 fromEvent = Integer.parseInt(argv[2]); 154 nEvents = Integer.parseInt(argv[3]); 155 listEvents = true; 156 } else if ("-history".equals(cmd)) { 157 if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) { 158 displayUsage(cmd); 159 return exitCode; 160 } 161 viewHistory = true; 162 if (argv.length == 3 && "all".equals(argv[1])) { 163 viewAllHistory = true; 164 historyFile = argv[2]; 165 } else { 166 historyFile = argv[1]; 167 } 168 } else if ("-list".equals(cmd)) { 169 if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) { 170 displayUsage(cmd); 171 return exitCode; 172 } 173 if (argv.length == 2 && "all".equals(argv[1])) { 174 listAllJobs = true; 175 } else { 176 listJobs = true; 177 } 178 } else if("-kill-task".equals(cmd)) { 179 if (argv.length != 2) { 180 displayUsage(cmd); 181 return exitCode; 182 } 183 killTask = true; 184 taskid = argv[1]; 185 } else if("-fail-task".equals(cmd)) { 186 if (argv.length != 2) { 187 displayUsage(cmd); 188 return exitCode; 189 } 190 failTask = true; 191 taskid = argv[1]; 192 } else if ("-list-active-trackers".equals(cmd)) { 193 if (argv.length != 1) { 194 displayUsage(cmd); 195 return exitCode; 196 } 197 listActiveTrackers = true; 198 } else if ("-list-blacklisted-trackers".equals(cmd)) { 199 if (argv.length != 1) { 200 displayUsage(cmd); 201 return exitCode; 202 } 203 listBlacklistedTrackers = true; 204 } else if ("-list-attempt-ids".equals(cmd)) { 205 if (argv.length != 4) { 206 displayUsage(cmd); 207 return exitCode; 208 } 209 jobid = argv[1]; 210 taskType = argv[2]; 211 taskState = argv[3]; 212 displayTasks = true; 213 } else if ("-logs".equals(cmd)) { 214 if (argv.length == 2 || argv.length ==3) { 215 logs = true; 216 jobid = argv[1]; 217 if (argv.length == 3) { 218 taskid = argv[2]; 219 } else { 220 taskid = null; 221 } 222 } else { 223 displayUsage(cmd); 224 return exitCode; 225 } 226 } else { 227 displayUsage(cmd); 228 return exitCode; 229 } 230 231 // initialize cluster 232 cluster = new Cluster(getConf()); 233 234 // Submit the request 235 try { 236 if (submitJobFile != null) { 237 Job job = Job.getInstance(new JobConf(submitJobFile)); 238 job.submit(); 239 System.out.println("Created job " + job.getJobID()); 240 exitCode = 0; 241 } else if (getStatus) { 242 Job job = cluster.getJob(JobID.forName(jobid)); 243 if (job == null) { 244 System.out.println("Could not find job " + jobid); 245 } else { 246 Counters counters = job.getCounters(); 247 System.out.println(); 248 System.out.println(job); 249 if (counters != null) { 250 System.out.println(counters); 251 } else { 252 System.out.println("Counters not available. Job is retired."); 253 } 254 exitCode = 0; 255 } 256 } else if (getCounter) { 257 Job job = cluster.getJob(JobID.forName(jobid)); 258 if (job == null) { 259 System.out.println("Could not find job " + jobid); 260 } else { 261 Counters counters = job.getCounters(); 262 if (counters == null) { 263 System.out.println("Counters not available for retired job " + 264 jobid); 265 exitCode = -1; 266 } else { 267 System.out.println(getCounter(counters, 268 counterGroupName, counterName)); 269 exitCode = 0; 270 } 271 } 272 } else if (killJob) { 273 Job job = cluster.getJob(JobID.forName(jobid)); 274 if (job == null) { 275 System.out.println("Could not find job " + jobid); 276 } else { 277 job.killJob(); 278 System.out.println("Killed job " + jobid); 279 exitCode = 0; 280 } 281 } else if (setJobPriority) { 282 Job job = cluster.getJob(JobID.forName(jobid)); 283 if (job == null) { 284 System.out.println("Could not find job " + jobid); 285 } else { 286 job.setPriority(jp); 287 System.out.println("Changed job priority."); 288 exitCode = 0; 289 } 290 } else if (viewHistory) { 291 viewHistory(historyFile, viewAllHistory); 292 exitCode = 0; 293 } else if (listEvents) { 294 listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents); 295 exitCode = 0; 296 } else if (listJobs) { 297 listJobs(cluster); 298 exitCode = 0; 299 } else if (listAllJobs) { 300 listAllJobs(cluster); 301 exitCode = 0; 302 } else if (listActiveTrackers) { 303 listActiveTrackers(cluster); 304 exitCode = 0; 305 } else if (listBlacklistedTrackers) { 306 listBlacklistedTrackers(cluster); 307 exitCode = 0; 308 } else if (displayTasks) { 309 displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState); 310 } else if(killTask) { 311 TaskAttemptID taskID = TaskAttemptID.forName(taskid); 312 Job job = cluster.getJob(taskID.getJobID()); 313 if (job == null) { 314 System.out.println("Could not find job " + jobid); 315 } else if (job.killTask(taskID)) { 316 System.out.println("Killed task " + taskid); 317 exitCode = 0; 318 } else { 319 System.out.println("Could not kill task " + taskid); 320 exitCode = -1; 321 } 322 } else if(failTask) { 323 TaskAttemptID taskID = TaskAttemptID.forName(taskid); 324 Job job = cluster.getJob(taskID.getJobID()); 325 if (job == null) { 326 System.out.println("Could not find job " + jobid); 327 } else if(job.failTask(taskID)) { 328 System.out.println("Killed task " + taskID + " by failing it"); 329 exitCode = 0; 330 } else { 331 System.out.println("Could not fail task " + taskid); 332 exitCode = -1; 333 } 334 } else if (logs) { 335 try { 336 JobID jobID = JobID.forName(jobid); 337 TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid); 338 LogParams logParams = cluster.getLogParams(jobID, taskAttemptID); 339 LogDumper logDumper = new LogDumper(); 340 logDumper.setConf(getConf()); 341 exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(), 342 logParams.getContainerId(), logParams.getNodeId(), 343 logParams.getOwner()); 344 } catch (IOException e) { 345 if (e instanceof RemoteException) { 346 throw e; 347 } 348 System.out.println(e.getMessage()); 349 } 350 } 351 } catch (RemoteException re) { 352 IOException unwrappedException = re.unwrapRemoteException(); 353 if (unwrappedException instanceof AccessControlException) { 354 System.out.println(unwrappedException.getMessage()); 355 } else { 356 throw re; 357 } 358 } finally { 359 cluster.close(); 360 } 361 return exitCode; 362 } 363 364 private String getJobPriorityNames() { 365 StringBuffer sb = new StringBuffer(); 366 for (JobPriority p : JobPriority.values()) { 367 sb.append(p.name()).append(" "); 368 } 369 return sb.substring(0, sb.length()-1); 370 } 371 372 private String getTaskTypess() { 373 StringBuffer sb = new StringBuffer(); 374 for (TaskType t : TaskType.values()) { 375 sb.append(t.name()).append(" "); 376 } 377 return sb.substring(0, sb.length()-1); 378 } 379 380 /** 381 * Display usage of the command-line tool and terminate execution. 382 */ 383 private void displayUsage(String cmd) { 384 String prefix = "Usage: CLI "; 385 String jobPriorityValues = getJobPriorityNames(); 386 String taskTypes = getTaskTypess(); 387 String taskStates = "running, completed"; 388 if ("-submit".equals(cmd)) { 389 System.err.println(prefix + "[" + cmd + " <job-file>]"); 390 } else if ("-status".equals(cmd) || "-kill".equals(cmd)) { 391 System.err.println(prefix + "[" + cmd + " <job-id>]"); 392 } else if ("-counter".equals(cmd)) { 393 System.err.println(prefix + "[" + cmd + 394 " <job-id> <group-name> <counter-name>]"); 395 } else if ("-events".equals(cmd)) { 396 System.err.println(prefix + "[" + cmd + 397 " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1."); 398 } else if ("-history".equals(cmd)) { 399 System.err.println(prefix + "[" + cmd + " <jobHistoryFile>]"); 400 } else if ("-list".equals(cmd)) { 401 System.err.println(prefix + "[" + cmd + " [all]]"); 402 } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) { 403 System.err.println(prefix + "[" + cmd + " <task-attempt-id>]"); 404 } else if ("-set-priority".equals(cmd)) { 405 System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " + 406 "Valid values for priorities are: " 407 + jobPriorityValues); 408 } else if ("-list-active-trackers".equals(cmd)) { 409 System.err.println(prefix + "[" + cmd + "]"); 410 } else if ("-list-blacklisted-trackers".equals(cmd)) { 411 System.err.println(prefix + "[" + cmd + "]"); 412 } else if ("-list-attempt-ids".equals(cmd)) { 413 System.err.println(prefix + "[" + cmd + 414 " <job-id> <task-type> <task-state>]. " + 415 "Valid values for <task-type> are " + taskTypes + ". " + 416 "Valid values for <task-state> are " + taskStates); 417 } else if ("-logs".equals(cmd)) { 418 System.err.println(prefix + "[" + cmd + 419 " <job-id> <task-attempt-id>]. " + 420 " <task-attempt-id> is optional to get task attempt logs."); 421 } else { 422 System.err.printf(prefix + "<command> <args>\n"); 423 System.err.printf("\t[-submit <job-file>]\n"); 424 System.err.printf("\t[-status <job-id>]\n"); 425 System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]\n"); 426 System.err.printf("\t[-kill <job-id>]\n"); 427 System.err.printf("\t[-set-priority <job-id> <priority>]. " + 428 "Valid values for priorities are: " + jobPriorityValues + "\n"); 429 System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]\n"); 430 System.err.printf("\t[-history <jobHistoryFile>]\n"); 431 System.err.printf("\t[-list [all]]\n"); 432 System.err.printf("\t[-list-active-trackers]\n"); 433 System.err.printf("\t[-list-blacklisted-trackers]\n"); 434 System.err.println("\t[-list-attempt-ids <job-id> <task-type> " + 435 "<task-state>]. " + 436 "Valid values for <task-type> are " + taskTypes + ". " + 437 "Valid values for <task-state> are " + taskStates); 438 System.err.printf("\t[-kill-task <task-attempt-id>]\n"); 439 System.err.printf("\t[-fail-task <task-attempt-id>]\n"); 440 System.err.printf("\t[-logs <job-id> <task-attempt-id>]\n\n"); 441 ToolRunner.printGenericCommandUsage(System.out); 442 } 443 } 444 445 private void viewHistory(String historyFile, boolean all) 446 throws IOException { 447 HistoryViewer historyViewer = new HistoryViewer(historyFile, 448 getConf(), all); 449 historyViewer.print(); 450 } 451 452 protected long getCounter(Counters counters, String counterGroupName, 453 String counterName) throws IOException { 454 return counters.findCounter(counterGroupName, counterName).getValue(); 455 } 456 457 /** 458 * List the events for the given job 459 * @param jobId the job id for the job's events to list 460 * @throws IOException 461 */ 462 private void listEvents(Job job, int fromEventId, int numEvents) 463 throws IOException, InterruptedException { 464 TaskCompletionEvent[] events = job. 465 getTaskCompletionEvents(fromEventId, numEvents); 466 System.out.println("Task completion events for " + job.getJobID()); 467 System.out.println("Number of events (from " + fromEventId + ") are: " 468 + events.length); 469 for(TaskCompletionEvent event: events) { 470 System.out.println(event.getStatus() + " " + 471 event.getTaskAttemptId() + " " + 472 getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp())); 473 } 474 } 475 476 protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) { 477 return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 478 } 479 480 481 /** 482 * Dump a list of currently running jobs 483 * @throws IOException 484 */ 485 private void listJobs(Cluster cluster) 486 throws IOException, InterruptedException { 487 List<JobStatus> runningJobs = new ArrayList<JobStatus>(); 488 for (JobStatus job : cluster.getAllJobStatuses()) { 489 if (!job.isJobComplete()) { 490 runningJobs.add(job); 491 } 492 } 493 displayJobList(runningJobs.toArray(new JobStatus[0])); 494 } 495 496 /** 497 * Dump a list of all jobs submitted. 498 * @throws IOException 499 */ 500 private void listAllJobs(Cluster cluster) 501 throws IOException, InterruptedException { 502 displayJobList(cluster.getAllJobStatuses()); 503 } 504 505 /** 506 * Display the list of active trackers 507 */ 508 private void listActiveTrackers(Cluster cluster) 509 throws IOException, InterruptedException { 510 TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers(); 511 for (TaskTrackerInfo tracker : trackers) { 512 System.out.println(tracker.getTaskTrackerName()); 513 } 514 } 515 516 /** 517 * Display the list of blacklisted trackers 518 */ 519 private void listBlacklistedTrackers(Cluster cluster) 520 throws IOException, InterruptedException { 521 TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers(); 522 if (trackers.length > 0) { 523 System.out.println("BlackListedNode \t Reason"); 524 } 525 for (TaskTrackerInfo tracker : trackers) { 526 System.out.println(tracker.getTaskTrackerName() + "\t" + 527 tracker.getReasonForBlacklist()); 528 } 529 } 530 531 private void printTaskAttempts(TaskReport report) { 532 if (report.getCurrentStatus() == TIPStatus.COMPLETE) { 533 System.out.println(report.getSuccessfulTaskAttemptId()); 534 } else if (report.getCurrentStatus() == TIPStatus.RUNNING) { 535 for (TaskAttemptID t : 536 report.getRunningTaskAttemptIds()) { 537 System.out.println(t); 538 } 539 } 540 } 541 542 /** 543 * Display the information about a job's tasks, of a particular type and 544 * in a particular state 545 * 546 * @param job the job 547 * @param type the type of the task (map/reduce/setup/cleanup) 548 * @param state the state of the task 549 * (pending/running/completed/failed/killed) 550 */ 551 protected void displayTasks(Job job, String type, String state) 552 throws IOException, InterruptedException { 553 TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type)); 554 for (TaskReport report : reports) { 555 TIPStatus status = report.getCurrentStatus(); 556 if ((state.equals("pending") && status ==TIPStatus.PENDING) || 557 (state.equals("running") && status ==TIPStatus.RUNNING) || 558 (state.equals("completed") && status == TIPStatus.COMPLETE) || 559 (state.equals("failed") && status == TIPStatus.FAILED) || 560 (state.equals("killed") && status == TIPStatus.KILLED)) { 561 printTaskAttempts(report); 562 } 563 } 564 } 565 566 public void displayJobList(JobStatus[] jobs) 567 throws IOException, InterruptedException { 568 displayJobList(jobs, new PrintWriter(System.out)); 569 } 570 571 @Private 572 public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n"; 573 @Private 574 public static String dataPattern = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n"; 575 private static String memPattern = "%dM"; 576 private static String UNAVAILABLE = "N/A"; 577 578 @Private 579 public void displayJobList(JobStatus[] jobs, PrintWriter writer) { 580 writer.println("Total jobs:" + jobs.length); 581 writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName", 582 "Queue", "Priority", "UsedContainers", 583 "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info"); 584 for (JobStatus job : jobs) { 585 int numUsedSlots = job.getNumUsedSlots(); 586 int numReservedSlots = job.getNumReservedSlots(); 587 int usedMem = job.getUsedMem(); 588 int rsvdMem = job.getReservedMem(); 589 int neededMem = job.getNeededMem(); 590 writer.printf(dataPattern, 591 job.getJobID().toString(), job.getState(), job.getStartTime(), 592 job.getUsername(), job.getQueue(), 593 job.getPriority().name(), 594 numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots, 595 numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots, 596 usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem), 597 rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem), 598 neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem), 599 job.getSchedulingInfo()); 600 } 601 writer.flush(); 602 } 603 604 public static void main(String[] argv) throws Exception { 605 int res = ToolRunner.run(new CLI(), argv); 606 System.exit(res); 607 } 608 }