001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.jobcontrol;
020    
021    
022    import java.io.IOException;
023    import java.util.ArrayList;
024    import java.util.List;
025    
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.apache.hadoop.classification.InterfaceAudience;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.fs.FileSystem;
032    import org.apache.hadoop.fs.Path;
033    import org.apache.hadoop.mapreduce.Job;
034    import org.apache.hadoop.mapreduce.JobID;
035    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
036    import org.apache.hadoop.util.StringUtils;
037    
038    /** 
039     *  This class encapsulates a MapReduce job and its dependency. It monitors 
040     *  the states of the depending jobs and updates the state of this job.
041     *  A job starts in the WAITING state. If it does not have any depending jobs,
042     *  or all of the depending jobs are in SUCCESS state, then the job state 
043     *  will become READY. If any depending jobs fail, the job will fail too. 
044     *  When in READY state, the job can be submitted to Hadoop for execution, with
045     *  the state changing into RUNNING state. From RUNNING state, the job 
046     *  can get into SUCCESS or FAILED state, depending 
047     *  the status of the job execution.
048     */
049    @InterfaceAudience.Public
050    @InterfaceStability.Evolving
051    public class ControlledJob {
052      private static final Log LOG = LogFactory.getLog(ControlledJob.class);
053    
054      // A job will be in one of the following states
055      public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
056                                DEPENDENT_FAILED}; 
057      public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
058      private State state;
059      private String controlID;     // assigned and used by JobControl class
060      private Job job;               // mapreduce job to be executed.
061      // some info for human consumption, e.g. the reason why the job failed
062      private String message;
063      // the jobs the current job depends on
064      private List<ControlledJob> dependingJobs;
065            
066      /** 
067       * Construct a job.
068       * @param job a mapreduce job to be executed.
069       * @param dependingJobs an array of jobs the current job depends on
070       */
071      public ControlledJob(Job job, List<ControlledJob> dependingJobs) 
072          throws IOException {
073        this.job = job;
074        this.dependingJobs = dependingJobs;
075        this.state = State.WAITING;
076        this.controlID = "unassigned";
077        this.message = "just initialized";
078      }
079      
080      /**
081       * Construct a job.
082       * 
083       * @param conf mapred job configuration representing a job to be executed.
084       * @throws IOException
085       */
086      public ControlledJob(Configuration conf) throws IOException {
087        this(new Job(conf), null);
088      }
089            
090      @Override
091      public String toString() {
092        StringBuffer sb = new StringBuffer();
093        sb.append("job name:\t").append(this.job.getJobName()).append("\n");
094        sb.append("job id:\t").append(this.controlID).append("\n");
095        sb.append("job state:\t").append(this.state).append("\n");
096        sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
097        sb.append("job message:\t").append(this.message).append("\n");
098                    
099        if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
100          sb.append("job has no depending job:\t").append("\n");
101        } else {
102          sb.append("job has ").append(this.dependingJobs.size()).
103             append(" dependeng jobs:\n");
104          for (int i = 0; i < this.dependingJobs.size(); i++) {
105            sb.append("\t depending job ").append(i).append(":\t");
106            sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
107          }
108        }
109        return sb.toString();
110      }
111            
112      /**
113       * @return the job name of this job
114       */
115      public String getJobName() {
116        return job.getJobName();
117      }
118            
119      /**
120       * Set the job name for  this job.
121       * @param jobName the job name
122       */
123      public void setJobName(String jobName) {
124        job.setJobName(jobName);
125      }
126            
127      /**
128       * @return the job ID of this job assigned by JobControl
129       */
130      public String getJobID() {
131        return this.controlID;
132      }
133            
134      /**
135       * Set the job ID for  this job.
136       * @param id the job ID
137       */
138      public void setJobID(String id) {
139        this.controlID = id;
140      }
141            
142      /**
143       * @return the mapred ID of this job as assigned by the 
144       * mapred framework.
145       */
146      public JobID getMapredJobID() {
147        return this.job.getJobID();
148      }
149      
150      /**
151       * @return the mapreduce job 
152       */
153      public synchronized Job getJob() {
154        return this.job;
155      }
156    
157      /**
158       * Set the mapreduce job
159       * @param job the mapreduce job for this job.
160       */
161      public synchronized void setJob(Job job) {
162        this.job = job;
163      }
164    
165      /**
166       * @return the state of this job
167       */
168      public synchronized State getJobState() {
169        return this.state;
170      }
171            
172      /**
173       * Set the state for this job.
174       * @param state the new state for this job.
175       */
176      protected synchronized void setJobState(State state) {
177        this.state = state;
178      }
179            
180      /**
181       * @return the message of this job
182       */
183      public synchronized String getMessage() {
184        return this.message;
185      }
186    
187      /**
188       * Set the message for this job.
189       * @param message the message for this job.
190       */
191      public synchronized void setMessage(String message) {
192        this.message = message;
193      }
194    
195      /**
196       * @return the depending jobs of this job
197       */
198      public List<ControlledJob> getDependentJobs() {
199        return this.dependingJobs;
200      }
201      
202      /**
203       * Add a job to this jobs' dependency list. 
204       * Dependent jobs can only be added while a Job 
205       * is waiting to run, not during or afterwards.
206       * 
207       * @param dependingJob Job that this Job depends on.
208       * @return <tt>true</tt> if the Job was added.
209       */
210      public synchronized boolean addDependingJob(ControlledJob dependingJob) {
211        if (this.state == State.WAITING) { //only allowed to add jobs when waiting
212          if (this.dependingJobs == null) {
213            this.dependingJobs = new ArrayList<ControlledJob>();
214          }
215          return this.dependingJobs.add(dependingJob);
216        } else {
217          return false;
218        }
219      }
220            
221      /**
222       * @return true if this job is in a complete state
223       */
224      public synchronized boolean isCompleted() {
225        return this.state == State.FAILED || 
226          this.state == State.DEPENDENT_FAILED ||
227          this.state == State.SUCCESS;
228      }
229            
230      /**
231       * @return true if this job is in READY state
232       */
233      public synchronized boolean isReady() {
234        return this.state == State.READY;
235      }
236    
237      public void killJob() throws IOException, InterruptedException {
238        job.killJob();
239      }
240      
241      public synchronized void failJob(String message) throws IOException, InterruptedException {
242        try {
243          if(job != null && this.state == State.RUNNING) {
244            job.killJob();
245          }
246        } finally {
247          this.state = State.FAILED;
248          this.message = message;
249        }
250      }
251      
252      /**
253       * Check the state of this running job. The state may 
254       * remain the same, become SUCCESS or FAILED.
255       */
256      private void checkRunningState() throws IOException, InterruptedException {
257        try {
258          if (job.isComplete()) {
259            if (job.isSuccessful()) {
260              this.state = State.SUCCESS;
261            } else {
262              this.state = State.FAILED;
263              this.message = "Job failed!";
264            }
265          }
266        } catch (IOException ioe) {
267          this.state = State.FAILED;
268          this.message = StringUtils.stringifyException(ioe);
269          try {
270            if (job != null) {
271              job.killJob();
272            }
273          } catch (IOException e) {}
274        }
275      }
276            
277      /**
278       * Check and update the state of this job. The state changes  
279       * depending on its current state and the states of the depending jobs.
280       */
281       synchronized State checkState() throws IOException, InterruptedException {
282        if (this.state == State.RUNNING) {
283          checkRunningState();
284        }
285        if (this.state != State.WAITING) {
286          return this.state;
287        }
288        if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
289          this.state = State.READY;
290          return this.state;
291        }
292        ControlledJob pred = null;
293        int n = this.dependingJobs.size();
294        for (int i = 0; i < n; i++) {
295          pred = this.dependingJobs.get(i);
296          State s = pred.checkState();
297          if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
298            break; // a pred is still not completed, continue in WAITING
299            // state
300          }
301          if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
302            this.state = State.DEPENDENT_FAILED;
303            this.message = "depending job " + i + " with jobID "
304              + pred.getJobID() + " failed. " + pred.getMessage();
305            break;
306          }
307          // pred must be in success state
308          if (i == n - 1) {
309            this.state = State.READY;
310          }
311        }
312    
313        return this.state;
314      }
315            
316      /**
317       * Submit this job to mapred. The state becomes RUNNING if submission 
318       * is successful, FAILED otherwise.  
319       */
320      protected synchronized void submit() {
321        try {
322          Configuration conf = job.getConfiguration();
323          if (conf.getBoolean(CREATE_DIR, false)) {
324            FileSystem fs = FileSystem.get(conf);
325            Path inputPaths[] = FileInputFormat.getInputPaths(job);
326            for (int i = 0; i < inputPaths.length; i++) {
327              if (!fs.exists(inputPaths[i])) {
328                try {
329                  fs.mkdirs(inputPaths[i]);
330                } catch (IOException e) {
331    
332                }
333              }
334            }
335          }
336          job.submit();
337          this.state = State.RUNNING;
338        } catch (Exception ioe) {
339          LOG.info(getJobName()+" got an error while submitting ",ioe);
340          this.state = State.FAILED;
341          this.message = StringUtils.stringifyException(ioe);
342        }
343      }
344            
345    }