001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.yarn.applications.distributedshell;
020    
021    import java.io.BufferedReader;
022    import java.io.IOException;
023    import java.io.InputStreamReader;
024    import java.net.InetSocketAddress;
025    import java.net.URI;
026    import java.net.URISyntaxException;
027    import java.util.ArrayList;
028    import java.util.HashMap;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.Vector;
032    import java.util.concurrent.CopyOnWriteArrayList;
033    import java.util.concurrent.atomic.AtomicInteger;
034    
035    import org.apache.commons.cli.CommandLine;
036    import org.apache.commons.cli.GnuParser;
037    import org.apache.commons.cli.HelpFormatter;
038    import org.apache.commons.cli.Options;
039    import org.apache.commons.cli.ParseException;
040    import org.apache.commons.logging.Log;
041    import org.apache.commons.logging.LogFactory;
042    
043    import org.apache.hadoop.classification.InterfaceAudience;
044    import org.apache.hadoop.classification.InterfaceStability;
045    import org.apache.hadoop.conf.Configuration;
046    import org.apache.hadoop.net.NetUtils;
047    import org.apache.hadoop.yarn.api.AMRMProtocol;
048    import org.apache.hadoop.yarn.api.ApplicationConstants;
049    import org.apache.hadoop.yarn.api.ContainerManager;
050    
051    import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
052    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
053    import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
054    //import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
055    //import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
056    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
057    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
058    import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
059    
060    import org.apache.hadoop.yarn.api.records.AMResponse;
061    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
062    import org.apache.hadoop.yarn.api.records.Container;
063    import org.apache.hadoop.yarn.api.records.ContainerId;
064    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
065    import org.apache.hadoop.yarn.api.records.ContainerState;
066    import org.apache.hadoop.yarn.api.records.ContainerStatus;
067    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
068    import org.apache.hadoop.yarn.api.records.LocalResource;
069    import org.apache.hadoop.yarn.api.records.LocalResourceType;
070    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
071    import org.apache.hadoop.yarn.api.records.Priority;
072    import org.apache.hadoop.yarn.api.records.Resource;
073    import org.apache.hadoop.yarn.api.records.ResourceRequest;
074    import org.apache.hadoop.yarn.conf.YarnConfiguration;
075    import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
076    import org.apache.hadoop.yarn.ipc.YarnRPC;
077    import org.apache.hadoop.yarn.util.ConverterUtils;
078    import org.apache.hadoop.yarn.util.Records;
079    
080    /**
081     * An ApplicationMaster for executing shell commands on a set of launched containers using the YARN framework. 
082     * 
083     * <p>This class is meant to act as an example on how to write yarn-based application masters. </p>
084     * 
085     * <p> The ApplicationMaster is started on a container by the <code>ResourceManager</code>'s launcher. 
086     * The first thing that the <code>ApplicationMaster</code> needs to do is to connect and register itself with 
087     * the <code>ResourceManager</code>. The registration sets up information within the <code>ResourceManager</code>
088     * regarding what host:port the ApplicationMaster is listening on to provide any form of functionality to a client
089     * as well as a tracking url that a client can use to keep track of status/job history if needed. </p>
090     * 
091     * <p> The <code>ApplicationMaster</code> needs to send a heartbeat to the <code>ResourceManager</code> at regular intervals
092     * to inform the <code>ResourceManager</code> that it is up and alive. The {@link AMRMProtocol#allocate} call to the
093     * <code>ResourceManager</code> from the <code>ApplicationMaster</code> acts as a heartbeat. </p>
094     * 
095     * <p> For the actual handling of the job, the <code>ApplicationMaster</code> has to request the 
096     * <code>ResourceManager</code> via {@link AllocateRequest} for the required no. of containers using {@link ResourceRequest}
097     * with the necessary resource specifications such as node location, computational (memory/disk/cpu) resource requirements.
098     * The <code>ResourceManager</code> responds with an {@link AllocateResponse} that informs the <code>ApplicationMaster</code> 
099     * of the set of newly allocated containers, completed containers as well as current state of available resources. </p>
100     * 
101     * <p> For each allocated container, the <code>ApplicationMaster</code> can then set up the necessary launch context via 
102     * {@link ContainerLaunchContext} to specify the allocated container id, local resources required by the executable, 
103     * the environment to be setup for the executable, commands to execute, etc. and submit a {@link StartContainerRequest} 
104     * to the {@link ContainerManager} to launch and execute the defined commands on the given allocated container. </p>
105     *  
106     * <p> The <code>ApplicationMaster</code> can monitor the launched container by either querying the <code>ResourceManager</code>
107     * using {@link AMRMProtocol#allocate} to get updates on completed containers or via the {@link ContainerManager}
108     * by querying for the status of the allocated container's {@link ContainerId}. </p>
109     * 
110     * <p> After the job has been completed, the <code>ApplicationMaster</code> has to send a {@link FinishApplicationMasterRequest}
111     * to the <code>ResourceManager</code> to inform it that the <code>ApplicationMaster</code> has been completed. </p>
112     */
113    @InterfaceAudience.Public
114    @InterfaceStability.Unstable
115    public class ApplicationMaster {
116    
117      private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
118    
119      // Configuration handed to YarnRPC.create and to the RPC proxies obtained from it
120      private Configuration conf;
121      // YARN RPC to communicate with the Resource Manager or Node Manager
122      private YarnRPC rpc;
123    
124      // Handle to communicate with the Resource Manager
125      private AMRMProtocol resourceManager;
126    
127      // Application Attempt Id ( combination of attemptId and fail count ), resolved in init()
128      private ApplicationAttemptId appAttemptID;
129    
130      // TODO
131      // For status update for clients - yet to be implemented
132      // Hostname of the container
133      private String appMasterHostname = "";
134      // Port on which the app master listens for status update requests from clients
135      private int appMasterRpcPort = 0;
136      // Tracking url to which app master publishes info for clients to monitor
137      private String appMasterTrackingUrl = "";
138    
139      // App Master configuration
140      // No. of containers to run shell command on ( option num_containers, default 1 )
141      private int numTotalContainers = 1;
142      // Memory in MB to request for the container on which the shell command will run ( option container_memory, default 10 )
143      private int containerMemory = 10;
144      // Priority of the request ( option priority, default 0 )
145      private int requestPriority; 
146    
147      // Incremental counter for rpc calls to the RM
148      private AtomicInteger rmRequestID = new AtomicInteger();
149    
150      // Simple flag to denote whether all work is done; set in run() once completed == total
151      private boolean appDone = false; 
152      // Counter for completed containers ( complete denotes successful or failed )
153      private AtomicInteger numCompletedContainers = new AtomicInteger();
154      // Allocated container count so that we know how many containers the RM
155      // has allocated to us
156      private AtomicInteger numAllocatedContainers = new AtomicInteger();
157      // Count of failed containers
158      private AtomicInteger numFailedContainers = new AtomicInteger();
159      // Count of containers already requested from the RM
160      // Needed as once requested, we should not request for containers again and again.
161      // Only request for more if the original requirement changes.
162      private AtomicInteger numRequestedContainers = new AtomicInteger();
163    
164      // Shell command to be executed ( option shell_command, required by init() )
165      private String shellCommand = ""; 
166      // Args to be passed to the shell command ( option shell_args )
167      private String shellArgs = "";
168      // Env variables to be setup for the shell command ( option shell_env, key=value pairs )
169      private Map<String, String> shellEnv = new HashMap<String, String>();
170    
171      // Location of shell script ( obtained from DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION in the env )
172      // Shell script path in fs
173      private String shellScriptPath = ""; 
174      // Timestamp needed for creating a local resource
175      private long shellScriptPathTimestamp = 0;
176      // File length needed for local resource
177      private long shellScriptPathLen = 0;
178    
179      // Hardcoded key under which the shell script is registered in the launched container's local resources
180      private final String ExecShellStringPath = "ExecShellScript.sh";
181    
182      // Containers to be released
183      private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
184    
185      // Threads started to launch containers; joined ( with a timeout ) at the end of run()
186      private List<Thread> launchThreads = new ArrayList<Thread>();
187    
188      /**
189       * @param args Command line args
190       */
191      public static void main(String[] args) {
192        boolean result = false;
193        try {
194          ApplicationMaster appMaster = new ApplicationMaster();
195          LOG.info("Initializing ApplicationMaster");
196          boolean doRun = appMaster.init(args);
197          if (!doRun) {
198            System.exit(0);
199          }
200          result = appMaster.run();
201        } catch (Throwable t) {
202          LOG.fatal("Error running ApplicationMaster", t);
203          System.exit(1);
204        }
205        if (result) {
206          LOG.info("Application Master completed successfully. exiting");
207          System.exit(0);
208        }
209        else {
210          LOG.info("Application Master failed. exiting");
211          System.exit(2);
212        }
213      }
214    
215      /**
216       * Dump out contents of $CWD and the environment to stdout for debugging
217       */
218      private void dumpOutDebugInfo() {
219    
220        LOG.info("Dump debug output");
221        Map<String, String> envs = System.getenv();
222        for (Map.Entry<String, String> env : envs.entrySet()) {
223          LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
224          System.out.println("System env: key=" + env.getKey() + ", val=" + env.getValue());
225        }
226    
227        String cmd = "ls -al";
228        Runtime run = Runtime.getRuntime();
229        Process pr = null;
230        try {
231          pr = run.exec(cmd);
232          pr.waitFor();
233    
234          BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
235          String line = "";
236          while ((line=buf.readLine())!=null) {
237            LOG.info("System CWD content: " + line);
238            System.out.println("System CWD content: " + line);
239          }
240          buf.close();
241        } catch (IOException e) {
242          e.printStackTrace();
243        } catch (InterruptedException e) {
244          e.printStackTrace();
245        } 
246      }
247    
/**
 * Creates the application master with a freshly constructed Configuration and the
 * YarnRPC instance created from that configuration.
 *
 * @throws Exception declared on the constructor signature; callers must handle it
 */
public ApplicationMaster() throws Exception {
  Configuration configuration = new Configuration();
  this.conf = configuration;
  this.rpc = YarnRPC.create(configuration);
}
253      /**
254       * Parse command line options
255       * @param args Command line args 
256       * @return Whether init successful and run should be invoked 
257       * @throws ParseException
258       * @throws IOException 
259       */
260      public boolean init(String[] args) throws ParseException, IOException {
261    
262        Options opts = new Options();
263        opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes");
264        opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
265        opts.addOption("shell_script", true, "Location of the shell script to be executed");
266        opts.addOption("shell_args", true, "Command line args for the shell script");
267        opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
268        opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
269        opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
270        opts.addOption("priority", true, "Application Priority. Default 0");
271        opts.addOption("debug", false, "Dump out debug information");
272    
273        opts.addOption("help", false, "Print usage");
274        CommandLine cliParser = new GnuParser().parse(opts, args);
275    
276        if (args.length == 0) {
277          printUsage(opts);
278          throw new IllegalArgumentException("No args specified for application master to initialize");
279        }
280    
281        if (cliParser.hasOption("help")) {
282          printUsage(opts);
283          return false;
284        }
285    
286        if (cliParser.hasOption("debug")) {
287          dumpOutDebugInfo();
288        }
289    
290        Map<String, String> envs = System.getenv();
291    
292        appAttemptID = Records.newRecord(ApplicationAttemptId.class);
293        if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
294          if (cliParser.hasOption("app_attempt_id")) {
295            String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
296            appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
297          } 
298          else {
299            throw new IllegalArgumentException("Application Attempt Id not set in the environment");
300          }
301        } else {
302          ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
303          appAttemptID = containerId.getApplicationAttemptId();
304        }
305    
306        LOG.info("Application master for app"
307            + ", appId=" + appAttemptID.getApplicationId().getId()
308            + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp()
309            + ", attemptId=" + appAttemptID.getAttemptId());
310    
311        if (!cliParser.hasOption("shell_command")) {
312          throw new IllegalArgumentException("No shell command specified to be executed by application master");
313        }
314        shellCommand = cliParser.getOptionValue("shell_command");
315    
316        if (cliParser.hasOption("shell_args")) {
317          shellArgs = cliParser.getOptionValue("shell_args");
318        }
319        if (cliParser.hasOption("shell_env")) { 
320          String shellEnvs[] = cliParser.getOptionValues("shell_env");
321          for (String env : shellEnvs) {
322            env = env.trim();
323            int index = env.indexOf('=');
324            if (index == -1) {
325              shellEnv.put(env, "");
326              continue;
327            }
328            String key = env.substring(0, index);
329            String val = "";
330            if (index < (env.length()-1)) {
331              val = env.substring(index+1);
332            }
333            shellEnv.put(key, val);
334          }
335        }
336    
337        if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
338          shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
339    
340          if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
341            shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
342          }
343          if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
344            shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
345          }
346    
347          if (!shellScriptPath.isEmpty()
348              && (shellScriptPathTimestamp <= 0 
349              || shellScriptPathLen <= 0)) {
350            LOG.error("Illegal values in env for shell script path"
351                + ", path=" + shellScriptPath
352                + ", len=" + shellScriptPathLen
353                + ", timestamp=" + shellScriptPathTimestamp);
354            throw new IllegalArgumentException("Illegal values in env for shell script path");
355          }
356        }
357    
358        containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
359        numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
360        requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
361    
362        return true;
363      }
364    
365      /**
366       * Helper function to print usage 
367       * @param opts Parsed command line options
368       */
369      private void printUsage(Options opts) {
370        new HelpFormatter().printHelp("ApplicationMaster", opts);
371      }
372    
373      /**
374       * Main run function for the application master
375       * @throws YarnRemoteException
376       */
377      public boolean run() throws YarnRemoteException {
378        LOG.info("Starting ApplicationMaster");
379    
380        // Connect to ResourceManager
381        resourceManager = connectToRM();
382    
383        // Setup local RPC Server to accept status requests directly from clients 
384        // TODO need to setup a protocol for client to be able to communicate to the RPC server 
385        // TODO use the rpc port info to register with the RM for the client to send requests to this app master
386    
387        // Register self with ResourceManager 
388        RegisterApplicationMasterResponse response = registerToRM();
389        // Dump out information about cluster capability as seen by the resource manager
390        int minMem = response.getMinimumResourceCapability().getMemory();
391        int maxMem = response.getMaximumResourceCapability().getMemory();
392        LOG.info("Min mem capabililty of resources in this cluster " + minMem);
393        LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
394    
395        // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be 
396        // a multiple of the min value and cannot exceed the max. 
397        // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
398        if (containerMemory < minMem) {
399          LOG.info("Container memory specified below min threshold of cluster. Using min value."
400              + ", specified=" + containerMemory
401              + ", min=" + minMem);
402          containerMemory = minMem; 
403        } 
404        else if (containerMemory > maxMem) {
405          LOG.info("Container memory specified above max threshold of cluster. Using max value."
406              + ", specified=" + containerMemory
407              + ", max=" + maxMem);
408          containerMemory = maxMem;
409        }
410    
411        // Setup heartbeat emitter
412        // TODO poll RM every now and then with an empty request to let RM know that we are alive
413        // The heartbeat interval after which an AM is timed out by the RM is defined by a config setting: 
414        // RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
415        // The allocate calls to the RM count as heartbeats so, for now, this additional heartbeat emitter 
416        // is not required.
417    
418        // Setup ask for containers from RM
419        // Send request for containers to RM
420        // Until we get our fully allocated quota, we keep on polling RM for containers
421        // Keep looping until all the containers are launched and shell script executed on them 
422        // ( regardless of success/failure). 
423    
424        int loopCounter = -1;
425    
426        while (numCompletedContainers.get() < numTotalContainers
427            && !appDone) {
428          loopCounter++;
429    
430          // log current state
431          LOG.info("Current application state: loop=" + loopCounter 
432              + ", appDone=" + appDone
433              + ", total=" + numTotalContainers
434              + ", requested=" + numRequestedContainers
435              + ", completed=" + numCompletedContainers
436              + ", failed=" + numFailedContainers
437              + ", currentAllocated=" + numAllocatedContainers);
438    
439          // Sleep before each loop when asking RM for containers
440          // to avoid flooding RM with spurious requests when it 
441          // need not have any available containers 
442          // Sleeping for 1000 ms.
443          try {
444            Thread.sleep(1000);
445          } catch (InterruptedException e) {
446            LOG.info("Sleep interrupted " + e.getMessage());
447          }
448    
449          // No. of containers to request 
450          // For the first loop, askCount will be equal to total containers needed 
451          // From that point on, askCount will always be 0 as current implementation 
452          // does not change its ask on container failures. 
453          int askCount = numTotalContainers - numRequestedContainers.get();
454          numRequestedContainers.addAndGet(askCount);
455    
456          // Setup request to be sent to RM to allocate containers
457          List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
458          if (askCount > 0) {
459            ResourceRequest containerAsk = setupContainerAskForRM(askCount);
460            resourceReq.add(containerAsk);
461          }
462    
463          // Send the request to RM 
464          LOG.info("Asking RM for containers"
465              + ", askCount=" + askCount);
466          AMResponse amResp =sendContainerAskToRM(resourceReq);
467    
468          // Retrieve list of allocated containers from the response 
469          List<Container> allocatedContainers = amResp.getAllocatedContainers();
470          LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
471          numAllocatedContainers.addAndGet(allocatedContainers.size());
472          for (Container allocatedContainer : allocatedContainers) {
473            LOG.info("Launching shell command on a new container."
474                + ", containerId=" + allocatedContainer.getId()
475                + ", containerNode=" + allocatedContainer.getNodeId().getHost() 
476                + ":" + allocatedContainer.getNodeId().getPort()
477                + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
478                + ", containerState" + allocatedContainer.getState()
479                + ", containerResourceMemory" + allocatedContainer.getResource().getMemory());
480            //+ ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
481    
482            LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer);
483            Thread launchThread = new Thread(runnableLaunchContainer);
484    
485            // launch and start the container on a separate thread to keep the main thread unblocked
486            // as all containers may not be allocated at one go.
487            launchThreads.add(launchThread);
488            launchThread.start();
489          }
490    
491          // Check what the current available resources in the cluster are
492          // TODO should we do anything if the available resources are not enough? 
493          Resource availableResources = amResp.getAvailableResources();
494          LOG.info("Current available resources in the cluster " + availableResources);
495    
496          // Check the completed containers
497          List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
498          LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
499          for (ContainerStatus containerStatus : completedContainers) {
500            LOG.info("Got container status for containerID= " + containerStatus.getContainerId()
501                + ", state=" + containerStatus.getState()
502                + ", exitStatus=" + containerStatus.getExitStatus() 
503                + ", diagnostics=" + containerStatus.getDiagnostics());
504    
505            // non complete containers should not be here 
506            assert(containerStatus.getState() == ContainerState.COMPLETE);
507    
508            // increment counters for completed/failed containers
509            int exitStatus = containerStatus.getExitStatus();
510            if (0 != exitStatus) {
511              // container failed 
512              if (-100 != exitStatus) {
513                // shell script failed
514                // counts as completed 
515                numCompletedContainers.incrementAndGet();
516                numFailedContainers.incrementAndGet();
517              }
518              else { 
519                // something else bad happened 
520                // app job did not complete for some reason 
521                // we should re-try as the container was lost for some reason
522                numAllocatedContainers.decrementAndGet();
523                numRequestedContainers.decrementAndGet();
524                // we do not need to release the container as it would be done
525                // by the RM/CM.
526              }
527            }
528            else { 
529              // nothing to do 
530              // container completed successfully 
531              numCompletedContainers.incrementAndGet();
532              LOG.info("Container completed successfully."
533                  + ", containerId=" + containerStatus.getContainerId());
534            }
535    
536          }
537          if (numCompletedContainers.get() == numTotalContainers) {
538            appDone = true;
539          }
540    
541          LOG.info("Current application state: loop=" + loopCounter
542              + ", appDone=" + appDone
543              + ", total=" + numTotalContainers
544              + ", requested=" + numRequestedContainers
545              + ", completed=" + numCompletedContainers
546              + ", failed=" + numFailedContainers
547              + ", currentAllocated=" + numAllocatedContainers);
548    
549          // TODO 
550          // Add a timeout handling layer 
551          // for misbehaving shell commands
552        }
553    
554        // Join all launched threads
555        // needed for when we time out 
556        // and we need to release containers
557        for (Thread launchThread : launchThreads) {
558          try {
559            launchThread.join(10000);
560          } catch (InterruptedException e) {
561            LOG.info("Exception thrown in thread join: " + e.getMessage());
562            e.printStackTrace();
563          }
564        }
565    
566        // When the application completes, it should send a finish application signal 
567        // to the RM
568        LOG.info("Application completed. Signalling finish to RM");
569    
570        FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
571        finishReq.setAppAttemptId(appAttemptID);
572        boolean isSuccess = true;
573        if (numFailedContainers.get() == 0) {
574          finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
575        }
576        else {
577          finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
578          String diagnostics = "Diagnostics."
579              + ", total=" + numTotalContainers
580              + ", completed=" + numCompletedContainers.get()
581              + ", allocated=" + numAllocatedContainers.get()
582              + ", failed=" + numFailedContainers.get();
583          finishReq.setDiagnostics(diagnostics);
584          isSuccess = false;
585        }
586        resourceManager.finishApplicationMaster(finishReq);
587        return isSuccess;
588      }
589    
590      /**
591       * Thread to connect to the {@link ContainerManager} and 
592       * launch the container that will execute the shell command. 
593       */
594      private class LaunchContainerRunnable implements Runnable {
595    
596        // Allocated container 
597        Container container;
598        // Handle to communicate with ContainerManager
599        ContainerManager cm;
600    
/**
 * Remembers the container this runnable is responsible for launching.
 * @param lcontainer Allocated container
 */
public LaunchContainerRunnable(Container lcontainer) {
  container = lcontainer;
}
607    
608        /**
609         * Helper function to connect to CM
610         */
611        private void connectToCM() {
612          LOG.debug("Connecting to ContainerManager for containerid=" + container.getId());
613          String cmIpPortStr = container.getNodeId().getHost() + ":"
614              + container.getNodeId().getPort();
615          InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
616          LOG.info("Connecting to ContainerManager at " + cmIpPortStr);
617          this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
618        }
619    
620    
621        @Override
622        /**
623         * Connects to CM, sets up container launch context 
624         * for shell command and eventually dispatches the container 
625         * start request to the CM. 
626         */
627        public void run() {
628          // Connect to ContainerManager 
629          connectToCM();
630    
631          LOG.info("Setting up container launch container for containerid=" + container.getId());
632          ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
633    
634          ctx.setContainerId(container.getId());
635          ctx.setResource(container.getResource());
636    
637          String jobUserName = System.getenv(ApplicationConstants.Environment.USER
638              .name());
639          ctx.setUser(jobUserName);
640          LOG.info("Setting user in ContainerLaunchContext to: " + jobUserName);
641    
642          // Set the environment 
643          ctx.setEnvironment(shellEnv);
644    
645          // Set the local resources 
646          Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
647    
648          // The container for the eventual shell commands needs its own local resources too. 
649          // In this scenario, if a shell script is specified, we need to have it copied 
650          // and made available to the container. 
651          if (!shellScriptPath.isEmpty()) {
652            LocalResource shellRsrc = Records.newRecord(LocalResource.class);
653            shellRsrc.setType(LocalResourceType.FILE);
654            shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
655            try {
656              shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
657            } catch (URISyntaxException e) {
658              LOG.error("Error when trying to use shell script path specified in env"
659                  + ", path=" + shellScriptPath);
660              e.printStackTrace();
661    
662              // A failure scenario on bad input such as invalid shell script path 
663              // We know we cannot continue launching the container 
664              // so we should release it.
665              // TODO
666              numCompletedContainers.incrementAndGet();
667              numFailedContainers.incrementAndGet();
668              return;
669            }
670            shellRsrc.setTimestamp(shellScriptPathTimestamp);
671            shellRsrc.setSize(shellScriptPathLen);
672            localResources.put(ExecShellStringPath, shellRsrc);
673          }
674          ctx.setLocalResources(localResources);
675    
676          // Set the necessary command to execute on the allocated container 
677          Vector<CharSequence> vargs = new Vector<CharSequence>(5);
678    
679          // Set executable command 
680          vargs.add(shellCommand);
681          // Set shell script path 
682          if (!shellScriptPath.isEmpty()) {
683            vargs.add(ExecShellStringPath);
684          }
685    
686          // Set args for the shell command if any
687          vargs.add(shellArgs);
688          // Add log redirect params
689          // TODO
690          // We should redirect the output to hdfs instead of local logs 
691          // so as to be able to look at the final output after the containers 
692          // have been released. 
693          // Could use a path suffixed with /AppId/AppAttempId/ContainerId/std[out|err] 
694          vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
695          vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
696    
697          // Get final commmand
698          StringBuilder command = new StringBuilder();
699          for (CharSequence str : vargs) {
700            command.append(str).append(" ");
701          }
702    
703          List<String> commands = new ArrayList<String>();
704          commands.add(command.toString());
705          ctx.setCommands(commands);
706    
707          StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
708          startReq.setContainerLaunchContext(ctx);
709          try {
710            cm.startContainer(startReq);
711          } catch (YarnRemoteException e) {
712            LOG.info("Start container failed for :"
713                + ", containerId=" + container.getId());
714            e.printStackTrace();
715            // TODO do we need to release this container? 
716          }
717    
718          // Get container status?
719          // Left commented out as the shell scripts are short lived 
720          // and we are relying on the status for completed containers from RM to detect status
721    
722          //    GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
723          //    statusReq.setContainerId(container.getId());
724          //    GetContainerStatusResponse statusResp;
725          //try {
726          //statusResp = cm.getContainerStatus(statusReq);
727          //    LOG.info("Container Status"
728          //    + ", id=" + container.getId()
729          //    + ", status=" +statusResp.getStatus());
730          //} catch (YarnRemoteException e) {
731          //e.printStackTrace();
732          //}
733        }
734      }
735    
736      /**
737       * Connect to the Resource Manager
738       * @return Handle to communicate with the RM
739       */
740      private AMRMProtocol connectToRM() {
741        YarnConfiguration yarnConf = new YarnConfiguration(conf);
742        InetSocketAddress rmAddress = yarnConf.getSocketAddr(
743            YarnConfiguration.RM_SCHEDULER_ADDRESS,
744            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
745            YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
746        LOG.info("Connecting to ResourceManager at " + rmAddress);
747        return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
748      }
749    
750      /** 
751       * Register the Application Master to the Resource Manager
752       * @return the registration response from the RM
753       * @throws YarnRemoteException
754       */
755      private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
756        RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
757    
758        // set the required info into the registration request: 
759        // application attempt id, 
760        // host on which the app master is running
761        // rpc port on which the app master accepts requests from the client 
762        // tracking url for the app master
763        appMasterRequest.setApplicationAttemptId(appAttemptID);
764        appMasterRequest.setHost(appMasterHostname);
765        appMasterRequest.setRpcPort(appMasterRpcPort);
766        appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
767    
768        return resourceManager.registerApplicationMaster(appMasterRequest);
769      }
770    
771      /**
772       * Setup the request that will be sent to the RM for the container ask.
773       * @param numContainers Containers to ask for from RM
774       * @return the setup ResourceRequest to be sent to RM
775       */
776      private ResourceRequest setupContainerAskForRM(int numContainers) {
777        ResourceRequest request = Records.newRecord(ResourceRequest.class);
778    
779        // setup requirements for hosts 
780        // whether a particular rack/host is needed 
781        // Refer to apis under org.apache.hadoop.net for more 
782        // details on how to get figure out rack/host mapping.
783        // using * as any host will do for the distributed shell app
784        request.setHostName("*");
785    
786        // set no. of containers needed
787        request.setNumContainers(numContainers);
788    
789        // set the priority for the request
790        Priority pri = Records.newRecord(Priority.class);
791        // TODO - what is the range for priority? how to decide? 
792        pri.setPriority(requestPriority);
793        request.setPriority(pri);
794    
795        // Set up resource type requirements
796        // For now, only memory is supported so we set memory requirements
797        Resource capability = Records.newRecord(Resource.class);
798        capability.setMemory(containerMemory);
799        request.setCapability(capability);
800    
801        return request;
802      }
803    
804      /**
805       * Ask RM to allocate given no. of containers to this Application Master
806       * @param requestedContainers Containers to ask for from RM
807       * @return Response from RM to AM with allocated containers 
808       * @throws YarnRemoteException
809       */
810      private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers)
811          throws YarnRemoteException {
812        AllocateRequest req = Records.newRecord(AllocateRequest.class);
813        req.setResponseId(rmRequestID.incrementAndGet());
814        req.setApplicationAttemptId(appAttemptID);
815        req.addAllAsks(requestedContainers);
816        req.addAllReleases(releasedContainers);
817        req.setProgress((float)numCompletedContainers.get()/numTotalContainers);
818    
819        LOG.info("Sending request to RM for containers"
820            + ", requestedSet=" + requestedContainers.size()
821            + ", releasedSet=" + releasedContainers.size()
822            + ", progress=" + req.getProgress());
823    
824        for (ResourceRequest  rsrcReq : requestedContainers) {
825          LOG.info("Requested container ask: " + rsrcReq.toString());
826        }
827        for (ContainerId id : releasedContainers) {
828          LOG.info("Released container, id=" + id.getId());
829        }
830    
831        AllocateResponse resp = resourceManager.allocate(req);
832        return resp.getAMResponse();
833      }
834    }