001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.yarn.applications.distributedshell; 020 021 import java.io.BufferedReader; 022 import java.io.IOException; 023 import java.io.InputStreamReader; 024 import java.net.InetSocketAddress; 025 import java.net.URI; 026 import java.net.URISyntaxException; 027 import java.util.ArrayList; 028 import java.util.HashMap; 029 import java.util.List; 030 import java.util.Map; 031 import java.util.Vector; 032 import java.util.concurrent.CopyOnWriteArrayList; 033 import java.util.concurrent.atomic.AtomicInteger; 034 035 import org.apache.commons.cli.CommandLine; 036 import org.apache.commons.cli.GnuParser; 037 import org.apache.commons.cli.HelpFormatter; 038 import org.apache.commons.cli.Options; 039 import org.apache.commons.cli.ParseException; 040 import org.apache.commons.logging.Log; 041 import org.apache.commons.logging.LogFactory; 042 043 import org.apache.hadoop.classification.InterfaceAudience; 044 import org.apache.hadoop.classification.InterfaceStability; 045 import org.apache.hadoop.conf.Configuration; 046 import org.apache.hadoop.net.NetUtils; 047 import org.apache.hadoop.yarn.api.AMRMProtocol; 048 import org.apache.hadoop.yarn.api.ApplicationConstants; 049 import org.apache.hadoop.yarn.api.ContainerManager; 050 051 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; 052 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 053 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; 054 //import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; 055 //import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; 056 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; 057 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 058 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; 059 060 import org.apache.hadoop.yarn.api.records.AMResponse; 061 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 062 import org.apache.hadoop.yarn.api.records.Container; 063 import org.apache.hadoop.yarn.api.records.ContainerId; 064 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 065 import org.apache.hadoop.yarn.api.records.ContainerState; 066 import org.apache.hadoop.yarn.api.records.ContainerStatus; 067 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 068 import org.apache.hadoop.yarn.api.records.LocalResource; 069 import org.apache.hadoop.yarn.api.records.LocalResourceType; 070 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 071 import org.apache.hadoop.yarn.api.records.Priority; 072 import org.apache.hadoop.yarn.api.records.Resource; 073 import org.apache.hadoop.yarn.api.records.ResourceRequest; 074 import org.apache.hadoop.yarn.conf.YarnConfiguration; 075 import org.apache.hadoop.yarn.exceptions.YarnRemoteException; 076 import org.apache.hadoop.yarn.ipc.YarnRPC; 077 import org.apache.hadoop.yarn.util.ConverterUtils; 078 import org.apache.hadoop.yarn.util.Records; 079 080 /** 081 * An ApplicationMaster for executing shell commands on a set of launched containers using the YARN framework. 082 * 083 * <p>This class is meant to act as an example on how to write yarn-based application masters. </p> 084 * 085 * <p> The ApplicationMaster is started on a container by the <code>ResourceManager</code>'s launcher. 086 * The first thing that the <code>ApplicationMaster</code> needs to do is to connect and register itself with 087 * the <code>ResourceManager</code>. The registration sets up information within the <code>ResourceManager</code> 088 * regarding what host:port the ApplicationMaster is listening on to provide any form of functionality to a client 089 * as well as a tracking url that a client can use to keep track of status/job history if needed. </p> 090 * 091 * <p> The <code>ApplicationMaster</code> needs to send a heartbeat to the <code>ResourceManager</code> at regular intervals 092 * to inform the <code>ResourceManager</code> that it is up and alive. The {@link AMRMProtocol#allocate} to the 093 * <code>ResourceManager</code> from the <code>ApplicationMaster</code> acts as a heartbeat. 094 * 095 * <p> For the actual handling of the job, the <code>ApplicationMaster</code> has to request the 096 * <code>ResourceManager</code> via {@link AllocateRequest} for the required no. of containers using {@link ResourceRequest} 097 * with the necessary resource specifications such as node location, computational (memory/disk/cpu) resource requirements. 098 * The <code>ResourceManager</code> responds with an {@link AllocateResponse} that informs the <code>ApplicationMaster</code> 099 * of the set of newly allocated containers, completed containers as well as current state of available resources. </p> 100 * 101 * <p> For each allocated container, the <code>ApplicationMaster</code> can then set up the necessary launch context via 102 * {@link ContainerLaunchContext} to specify the allocated container id, local resources required by the executable, 103 * the environment to be setup for the executable, commands to execute, etc. and submit a {@link StartContainerRequest} 104 * to the {@link ContainerManager} to launch and execute the defined commands on the given allocated container. </p> 105 * 106 * <p> The <code>ApplicationMaster</code> can monitor the launched container by either querying the <code>ResourceManager</code> 107 * using {@link AMRMProtocol#allocate} to get updates on completed containers or via the {@link ContainerManager} 108 * by querying for the status of the allocated container's {@link ContainerId}. 109 * 110 * <p> After the job has been completed, the <code>ApplicationMaster</code> has to send a {@link FinishApplicationMasterRequest} 111 * to the <code>ResourceManager</code> to inform it that the <code>ApplicationMaster</code> has been completed. 112 */ 113 @InterfaceAudience.Public 114 @InterfaceStability.Unstable 115 public class ApplicationMaster { 116 117 private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); 118 119 // Configuration 120 private Configuration conf; 121 // YARN RPC to communicate with the Resource Manager or Node Manager 122 private YarnRPC rpc; 123 124 // Handle to communicate with the Resource Manager 125 private AMRMProtocol resourceManager; 126 127 // Application Attempt Id ( combination of attemptId and fail count ) 128 private ApplicationAttemptId appAttemptID; 129 130 // TODO 131 // For status update for clients - yet to be implemented 132 // Hostname of the container 133 private String appMasterHostname = ""; 134 // Port on which the app master listens for status update requests from clients 135 private int appMasterRpcPort = 0; 136 // Tracking url to which app master publishes info for clients to monitor 137 private String appMasterTrackingUrl = ""; 138 139 // App Master configuration 140 // No. of containers to run shell command on 141 private int numTotalContainers = 1; 142 // Memory to request for the container on which the shell command will run 143 private int containerMemory = 10; 144 // Priority of the request 145 private int requestPriority; 146 147 // Incremental counter for rpc calls to the RM 148 private AtomicInteger rmRequestID = new AtomicInteger(); 149 150 // Simple flag to denote whether all works is done 151 private boolean appDone = false; 152 // Counter for completed containers ( complete denotes successful or failed ) 153 private AtomicInteger numCompletedContainers = new AtomicInteger(); 154 // Allocated container count so that we know how many containers has the RM 155 // allocated to us 156 private AtomicInteger numAllocatedContainers = new AtomicInteger(); 157 // Count of failed containers 158 private AtomicInteger numFailedContainers = new AtomicInteger(); 159 // Count of containers already requested from the RM 160 // Needed as once requested, we should not request for containers again and again. 161 // Only request for more if the original requirement changes. 162 private AtomicInteger numRequestedContainers = new AtomicInteger(); 163 164 // Shell command to be executed 165 private String shellCommand = ""; 166 // Args to be passed to the shell command 167 private String shellArgs = ""; 168 // Env variables to be setup for the shell command 169 private Map<String, String> shellEnv = new HashMap<String, String>(); 170 171 // Location of shell script ( obtained from info set in env ) 172 // Shell script path in fs 173 private String shellScriptPath = ""; 174 // Timestamp needed for creating a local resource 175 private long shellScriptPathTimestamp = 0; 176 // File length needed for local resource 177 private long shellScriptPathLen = 0; 178 179 // Hardcoded path to shell script in launch container's local env 180 private final String ExecShellStringPath = "ExecShellScript.sh"; 181 182 // Containers to be released 183 private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>(); 184 185 // Launch threads 186 private List<Thread> launchThreads = new ArrayList<Thread>(); 187 188 /** 189 * @param args Command line args 190 */ 191 public static void main(String[] args) { 192 boolean result = false; 193 try { 194 ApplicationMaster appMaster = new ApplicationMaster(); 195 LOG.info("Initializing ApplicationMaster"); 196 boolean doRun = appMaster.init(args); 197 if (!doRun) { 198 System.exit(0); 199 } 200 result = appMaster.run(); 201 } catch (Throwable t) { 202 LOG.fatal("Error running ApplicationMaster", t); 203 System.exit(1); 204 } 205 if (result) { 206 LOG.info("Application Master completed successfully. exiting"); 207 System.exit(0); 208 } 209 else { 210 LOG.info("Application Master failed. exiting"); 211 System.exit(2); 212 } 213 } 214 215 /** 216 * Dump out contents of $CWD and the environment to stdout for debugging 217 */ 218 private void dumpOutDebugInfo() { 219 220 LOG.info("Dump debug output"); 221 Map<String, String> envs = System.getenv(); 222 for (Map.Entry<String, String> env : envs.entrySet()) { 223 LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); 224 System.out.println("System env: key=" + env.getKey() + ", val=" + env.getValue()); 225 } 226 227 String cmd = "ls -al"; 228 Runtime run = Runtime.getRuntime(); 229 Process pr = null; 230 try { 231 pr = run.exec(cmd); 232 pr.waitFor(); 233 234 BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream())); 235 String line = ""; 236 while ((line=buf.readLine())!=null) { 237 LOG.info("System CWD content: " + line); 238 System.out.println("System CWD content: " + line); 239 } 240 buf.close(); 241 } catch (IOException e) { 242 e.printStackTrace(); 243 } catch (InterruptedException e) { 244 e.printStackTrace(); 245 } 246 } 247 248 public ApplicationMaster() throws Exception { 249 // Set up the configuration and RPC 250 conf = new Configuration(); 251 rpc = YarnRPC.create(conf); 252 } 253 /** 254 * Parse command line options 255 * @param args Command line args 256 * @return Whether init successful and run should be invoked 257 * @throws ParseException 258 * @throws IOException 259 */ 260 public boolean init(String[] args) throws ParseException, IOException { 261 262 Options opts = new Options(); 263 opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes"); 264 opts.addOption("shell_command", true, "Shell command to be executed by the Application Master"); 265 opts.addOption("shell_script", true, "Location of the shell script to be executed"); 266 opts.addOption("shell_args", true, "Command line args for the shell script"); 267 opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs"); 268 opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); 269 opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); 270 opts.addOption("priority", true, "Application Priority. Default 0"); 271 opts.addOption("debug", false, "Dump out debug information"); 272 273 opts.addOption("help", false, "Print usage"); 274 CommandLine cliParser = new GnuParser().parse(opts, args); 275 276 if (args.length == 0) { 277 printUsage(opts); 278 throw new IllegalArgumentException("No args specified for application master to initialize"); 279 } 280 281 if (cliParser.hasOption("help")) { 282 printUsage(opts); 283 return false; 284 } 285 286 if (cliParser.hasOption("debug")) { 287 dumpOutDebugInfo(); 288 } 289 290 Map<String, String> envs = System.getenv(); 291 292 appAttemptID = Records.newRecord(ApplicationAttemptId.class); 293 if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) { 294 if (cliParser.hasOption("app_attempt_id")) { 295 String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); 296 appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); 297 } 298 else { 299 throw new IllegalArgumentException("Application Attempt Id not set in the environment"); 300 } 301 } else { 302 ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)); 303 appAttemptID = containerId.getApplicationAttemptId(); 304 } 305 306 LOG.info("Application master for app" 307 + ", appId=" + appAttemptID.getApplicationId().getId() 308 + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp() 309 + ", attemptId=" + appAttemptID.getAttemptId()); 310 311 if (!cliParser.hasOption("shell_command")) { 312 throw new IllegalArgumentException("No shell command specified to be executed by application master"); 313 } 314 shellCommand = cliParser.getOptionValue("shell_command"); 315 316 if (cliParser.hasOption("shell_args")) { 317 shellArgs = cliParser.getOptionValue("shell_args"); 318 } 319 if (cliParser.hasOption("shell_env")) { 320 String shellEnvs[] = cliParser.getOptionValues("shell_env"); 321 for (String env : shellEnvs) { 322 env = env.trim(); 323 int index = env.indexOf('='); 324 if (index == -1) { 325 shellEnv.put(env, ""); 326 continue; 327 } 328 String key = env.substring(0, index); 329 String val = ""; 330 if (index < (env.length()-1)) { 331 val = env.substring(index+1); 332 } 333 shellEnv.put(key, val); 334 } 335 } 336 337 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { 338 shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); 339 340 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { 341 shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); 342 } 343 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { 344 shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); 345 } 346 347 if (!shellScriptPath.isEmpty() 348 && (shellScriptPathTimestamp <= 0 349 || shellScriptPathLen <= 0)) { 350 LOG.error("Illegal values in env for shell script path" 351 + ", path=" + shellScriptPath 352 + ", len=" + shellScriptPathLen 353 + ", timestamp=" + shellScriptPathTimestamp); 354 throw new IllegalArgumentException("Illegal values in env for shell script path"); 355 } 356 } 357 358 containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10")); 359 numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); 360 requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); 361 362 return true; 363 } 364 365 /** 366 * Helper function to print usage 367 * @param opts Parsed command line options 368 */ 369 private void printUsage(Options opts) { 370 new HelpFormatter().printHelp("ApplicationMaster", opts); 371 } 372 373 /** 374 * Main run function for the application master 375 * @throws YarnRemoteException 376 */ 377 public boolean run() throws YarnRemoteException { 378 LOG.info("Starting ApplicationMaster"); 379 380 // Connect to ResourceManager 381 resourceManager = connectToRM(); 382 383 // Setup local RPC Server to accept status requests directly from clients 384 // TODO need to setup a protocol for client to be able to communicate to the RPC server 385 // TODO use the rpc port info to register with the RM for the client to send requests to this app master 386 387 // Register self with ResourceManager 388 RegisterApplicationMasterResponse response = registerToRM(); 389 // Dump out information about cluster capability as seen by the resource manager 390 int minMem = response.getMinimumResourceCapability().getMemory(); 391 int maxMem = response.getMaximumResourceCapability().getMemory(); 392 LOG.info("Min mem capabililty of resources in this cluster " + minMem); 393 LOG.info("Max mem capabililty of resources in this cluster " + maxMem); 394 395 // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be 396 // a multiple of the min value and cannot exceed the max. 397 // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min 398 if (containerMemory < minMem) { 399 LOG.info("Container memory specified below min threshold of cluster. Using min value." 400 + ", specified=" + containerMemory 401 + ", min=" + minMem); 402 containerMemory = minMem; 403 } 404 else if (containerMemory > maxMem) { 405 LOG.info("Container memory specified above max threshold of cluster. Using max value." 406 + ", specified=" + containerMemory 407 + ", max=" + maxMem); 408 containerMemory = maxMem; 409 } 410 411 // Setup heartbeat emitter 412 // TODO poll RM every now and then with an empty request to let RM know that we are alive 413 // The heartbeat interval after which an AM is timed out by the RM is defined by a config setting: 414 // RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS 415 // The allocate calls to the RM count as heartbeats so, for now, this additional heartbeat emitter 416 // is not required. 417 418 // Setup ask for containers from RM 419 // Send request for containers to RM 420 // Until we get our fully allocated quota, we keep on polling RM for containers 421 // Keep looping until all the containers are launched and shell script executed on them 422 // ( regardless of success/failure). 423 424 int loopCounter = -1; 425 426 while (numCompletedContainers.get() < numTotalContainers 427 && !appDone) { 428 loopCounter++; 429 430 // log current state 431 LOG.info("Current application state: loop=" + loopCounter 432 + ", appDone=" + appDone 433 + ", total=" + numTotalContainers 434 + ", requested=" + numRequestedContainers 435 + ", completed=" + numCompletedContainers 436 + ", failed=" + numFailedContainers 437 + ", currentAllocated=" + numAllocatedContainers); 438 439 // Sleep before each loop when asking RM for containers 440 // to avoid flooding RM with spurious requests when it 441 // need not have any available containers 442 // Sleeping for 1000 ms. 443 try { 444 Thread.sleep(1000); 445 } catch (InterruptedException e) { 446 LOG.info("Sleep interrupted " + e.getMessage()); 447 } 448 449 // No. of containers to request 450 // For the first loop, askCount will be equal to total containers needed 451 // From that point on, askCount will always be 0 as current implementation 452 // does not change its ask on container failures. 453 int askCount = numTotalContainers - numRequestedContainers.get(); 454 numRequestedContainers.addAndGet(askCount); 455 456 // Setup request to be sent to RM to allocate containers 457 List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>(); 458 if (askCount > 0) { 459 ResourceRequest containerAsk = setupContainerAskForRM(askCount); 460 resourceReq.add(containerAsk); 461 } 462 463 // Send the request to RM 464 LOG.info("Asking RM for containers" 465 + ", askCount=" + askCount); 466 AMResponse amResp =sendContainerAskToRM(resourceReq); 467 468 // Retrieve list of allocated containers from the response 469 List<Container> allocatedContainers = amResp.getAllocatedContainers(); 470 LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); 471 numAllocatedContainers.addAndGet(allocatedContainers.size()); 472 for (Container allocatedContainer : allocatedContainers) { 473 LOG.info("Launching shell command on a new container." 474 + ", containerId=" + allocatedContainer.getId() 475 + ", containerNode=" + allocatedContainer.getNodeId().getHost() 476 + ":" + allocatedContainer.getNodeId().getPort() 477 + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() 478 + ", containerState" + allocatedContainer.getState() 479 + ", containerResourceMemory" + allocatedContainer.getResource().getMemory()); 480 //+ ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString()); 481 482 LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer); 483 Thread launchThread = new Thread(runnableLaunchContainer); 484 485 // launch and start the container on a separate thread to keep the main thread unblocked 486 // as all containers may not be allocated at one go. 487 launchThreads.add(launchThread); 488 launchThread.start(); 489 } 490 491 // Check what the current available resources in the cluster are 492 // TODO should we do anything if the available resources are not enough? 493 Resource availableResources = amResp.getAvailableResources(); 494 LOG.info("Current available resources in the cluster " + availableResources); 495 496 // Check the completed containers 497 List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses(); 498 LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size()); 499 for (ContainerStatus containerStatus : completedContainers) { 500 LOG.info("Got container status for containerID= " + containerStatus.getContainerId() 501 + ", state=" + containerStatus.getState() 502 + ", exitStatus=" + containerStatus.getExitStatus() 503 + ", diagnostics=" + containerStatus.getDiagnostics()); 504 505 // non complete containers should not be here 506 assert(containerStatus.getState() == ContainerState.COMPLETE); 507 508 // increment counters for completed/failed containers 509 int exitStatus = containerStatus.getExitStatus(); 510 if (0 != exitStatus) { 511 // container failed 512 if (-100 != exitStatus) { 513 // shell script failed 514 // counts as completed 515 numCompletedContainers.incrementAndGet(); 516 numFailedContainers.incrementAndGet(); 517 } 518 else { 519 // something else bad happened 520 // app job did not complete for some reason 521 // we should re-try as the container was lost for some reason 522 numAllocatedContainers.decrementAndGet(); 523 numRequestedContainers.decrementAndGet(); 524 // we do not need to release the container as it would be done 525 // by the RM/CM. 526 } 527 } 528 else { 529 // nothing to do 530 // container completed successfully 531 numCompletedContainers.incrementAndGet(); 532 LOG.info("Container completed successfully." 533 + ", containerId=" + containerStatus.getContainerId()); 534 } 535 536 } 537 if (numCompletedContainers.get() == numTotalContainers) { 538 appDone = true; 539 } 540 541 LOG.info("Current application state: loop=" + loopCounter 542 + ", appDone=" + appDone 543 + ", total=" + numTotalContainers 544 + ", requested=" + numRequestedContainers 545 + ", completed=" + numCompletedContainers 546 + ", failed=" + numFailedContainers 547 + ", currentAllocated=" + numAllocatedContainers); 548 549 // TODO 550 // Add a timeout handling layer 551 // for misbehaving shell commands 552 } 553 554 // Join all launched threads 555 // needed for when we time out 556 // and we need to release containers 557 for (Thread launchThread : launchThreads) { 558 try { 559 launchThread.join(10000); 560 } catch (InterruptedException e) { 561 LOG.info("Exception thrown in thread join: " + e.getMessage()); 562 e.printStackTrace(); 563 } 564 } 565 566 // When the application completes, it should send a finish application signal 567 // to the RM 568 LOG.info("Application completed. Signalling finish to RM"); 569 570 FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class); 571 finishReq.setAppAttemptId(appAttemptID); 572 boolean isSuccess = true; 573 if (numFailedContainers.get() == 0) { 574 finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED); 575 } 576 else { 577 finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED); 578 String diagnostics = "Diagnostics." 579 + ", total=" + numTotalContainers 580 + ", completed=" + numCompletedContainers.get() 581 + ", allocated=" + numAllocatedContainers.get() 582 + ", failed=" + numFailedContainers.get(); 583 finishReq.setDiagnostics(diagnostics); 584 isSuccess = false; 585 } 586 resourceManager.finishApplicationMaster(finishReq); 587 return isSuccess; 588 } 589 590 /** 591 * Thread to connect to the {@link ContainerManager} and 592 * launch the container that will execute the shell command. 593 */ 594 private class LaunchContainerRunnable implements Runnable { 595 596 // Allocated container 597 Container container; 598 // Handle to communicate with ContainerManager 599 ContainerManager cm; 600 601 /** 602 * @param lcontainer Allocated container 603 */ 604 public LaunchContainerRunnable(Container lcontainer) { 605 this.container = lcontainer; 606 } 607 608 /** 609 * Helper function to connect to CM 610 */ 611 private void connectToCM() { 612 LOG.debug("Connecting to ContainerManager for containerid=" + container.getId()); 613 String cmIpPortStr = container.getNodeId().getHost() + ":" 614 + container.getNodeId().getPort(); 615 InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr); 616 LOG.info("Connecting to ContainerManager at " + cmIpPortStr); 617 this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf)); 618 } 619 620 621 @Override 622 /** 623 * Connects to CM, sets up container launch context 624 * for shell command and eventually dispatches the container 625 * start request to the CM. 626 */ 627 public void run() { 628 // Connect to ContainerManager 629 connectToCM(); 630 631 LOG.info("Setting up container launch container for containerid=" + container.getId()); 632 ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class); 633 634 ctx.setContainerId(container.getId()); 635 ctx.setResource(container.getResource()); 636 637 String jobUserName = System.getenv(ApplicationConstants.Environment.USER 638 .name()); 639 ctx.setUser(jobUserName); 640 LOG.info("Setting user in ContainerLaunchContext to: " + jobUserName); 641 642 // Set the environment 643 ctx.setEnvironment(shellEnv); 644 645 // Set the local resources 646 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 647 648 // The container for the eventual shell commands needs its own local resources too. 649 // In this scenario, if a shell script is specified, we need to have it copied 650 // and made available to the container. 651 if (!shellScriptPath.isEmpty()) { 652 LocalResource shellRsrc = Records.newRecord(LocalResource.class); 653 shellRsrc.setType(LocalResourceType.FILE); 654 shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION); 655 try { 656 shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath))); 657 } catch (URISyntaxException e) { 658 LOG.error("Error when trying to use shell script path specified in env" 659 + ", path=" + shellScriptPath); 660 e.printStackTrace(); 661 662 // A failure scenario on bad input such as invalid shell script path 663 // We know we cannot continue launching the container 664 // so we should release it. 665 // TODO 666 numCompletedContainers.incrementAndGet(); 667 numFailedContainers.incrementAndGet(); 668 return; 669 } 670 shellRsrc.setTimestamp(shellScriptPathTimestamp); 671 shellRsrc.setSize(shellScriptPathLen); 672 localResources.put(ExecShellStringPath, shellRsrc); 673 } 674 ctx.setLocalResources(localResources); 675 676 // Set the necessary command to execute on the allocated container 677 Vector<CharSequence> vargs = new Vector<CharSequence>(5); 678 679 // Set executable command 680 vargs.add(shellCommand); 681 // Set shell script path 682 if (!shellScriptPath.isEmpty()) { 683 vargs.add(ExecShellStringPath); 684 } 685 686 // Set args for the shell command if any 687 vargs.add(shellArgs); 688 // Add log redirect params 689 // TODO 690 // We should redirect the output to hdfs instead of local logs 691 // so as to be able to look at the final output after the containers 692 // have been released. 693 // Could use a path suffixed with /AppId/AppAttempId/ContainerId/std[out|err] 694 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); 695 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); 696 697 // Get final commmand 698 StringBuilder command = new StringBuilder(); 699 for (CharSequence str : vargs) { 700 command.append(str).append(" "); 701 } 702 703 List<String> commands = new ArrayList<String>(); 704 commands.add(command.toString()); 705 ctx.setCommands(commands); 706 707 StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class); 708 startReq.setContainerLaunchContext(ctx); 709 try { 710 cm.startContainer(startReq); 711 } catch (YarnRemoteException e) { 712 LOG.info("Start container failed for :" 713 + ", containerId=" + container.getId()); 714 e.printStackTrace(); 715 // TODO do we need to release this container? 716 } 717 718 // Get container status? 719 // Left commented out as the shell scripts are short lived 720 // and we are relying on the status for completed containers from RM to detect status 721 722 // GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class); 723 // statusReq.setContainerId(container.getId()); 724 // GetContainerStatusResponse statusResp; 725 //try { 726 //statusResp = cm.getContainerStatus(statusReq); 727 // LOG.info("Container Status" 728 // + ", id=" + container.getId() 729 // + ", status=" +statusResp.getStatus()); 730 //} catch (YarnRemoteException e) { 731 //e.printStackTrace(); 732 //} 733 } 734 } 735 736 /** 737 * Connect to the Resource Manager 738 * @return Handle to communicate with the RM 739 */ 740 private AMRMProtocol connectToRM() { 741 YarnConfiguration yarnConf = new YarnConfiguration(conf); 742 InetSocketAddress rmAddress = yarnConf.getSocketAddr( 743 YarnConfiguration.RM_SCHEDULER_ADDRESS, 744 YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, 745 YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); 746 LOG.info("Connecting to ResourceManager at " + rmAddress); 747 return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf)); 748 } 749 750 /** 751 * Register the Application Master to the Resource Manager 752 * @return the registration response from the RM 753 * @throws YarnRemoteException 754 */ 755 private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException { 756 RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class); 757 758 // set the required info into the registration request: 759 // application attempt id, 760 // host on which the app master is running 761 // rpc port on which the app master accepts requests from the client 762 // tracking url for the app master 763 appMasterRequest.setApplicationAttemptId(appAttemptID); 764 appMasterRequest.setHost(appMasterHostname); 765 appMasterRequest.setRpcPort(appMasterRpcPort); 766 appMasterRequest.setTrackingUrl(appMasterTrackingUrl); 767 768 return resourceManager.registerApplicationMaster(appMasterRequest); 769 } 770 771 /** 772 * Setup the request that will be sent to the RM for the container ask. 773 * @param numContainers Containers to ask for from RM 774 * @return the setup ResourceRequest to be sent to RM 775 */ 776 private ResourceRequest setupContainerAskForRM(int numContainers) { 777 ResourceRequest request = Records.newRecord(ResourceRequest.class); 778 779 // setup requirements for hosts 780 // whether a particular rack/host is needed 781 // Refer to apis under org.apache.hadoop.net for more 782 // details on how to get figure out rack/host mapping. 783 // using * as any host will do for the distributed shell app 784 request.setHostName("*"); 785 786 // set no. of containers needed 787 request.setNumContainers(numContainers); 788 789 // set the priority for the request 790 Priority pri = Records.newRecord(Priority.class); 791 // TODO - what is the range for priority? how to decide? 792 pri.setPriority(requestPriority); 793 request.setPriority(pri); 794 795 // Set up resource type requirements 796 // For now, only memory is supported so we set memory requirements 797 Resource capability = Records.newRecord(Resource.class); 798 capability.setMemory(containerMemory); 799 request.setCapability(capability); 800 801 return request; 802 } 803 804 /** 805 * Ask RM to allocate given no. of containers to this Application Master 806 * @param requestedContainers Containers to ask for from RM 807 * @return Response from RM to AM with allocated containers 808 * @throws YarnRemoteException 809 */ 810 private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers) 811 throws YarnRemoteException { 812 AllocateRequest req = Records.newRecord(AllocateRequest.class); 813 req.setResponseId(rmRequestID.incrementAndGet()); 814 req.setApplicationAttemptId(appAttemptID); 815 req.addAllAsks(requestedContainers); 816 req.addAllReleases(releasedContainers); 817 req.setProgress((float)numCompletedContainers.get()/numTotalContainers); 818 819 LOG.info("Sending request to RM for containers" 820 + ", requestedSet=" + requestedContainers.size() 821 + ", releasedSet=" + releasedContainers.size() 822 + ", progress=" + req.getProgress()); 823 824 for (ResourceRequest rsrcReq : requestedContainers) { 825 LOG.info("Requested container ask: " + rsrcReq.toString()); 826 } 827 for (ContainerId id : releasedContainers) { 828 LOG.info("Released container, id=" + id.getId()); 829 } 830 831 AllocateResponse resp = resourceManager.allocate(req); 832 return resp.getAMResponse(); 833 } 834 }