/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * <code>OutputCommitter</code> describes the commit of task output for a
 * Map-Reduce job.
 *
 * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of
 * the job to:</p>
 * <ol>
 *   <li>
 *   Set up the job during initialization. For example, create the temporary
 *   output directory for the job.
 *   </li>
 *   <li>
 *   Clean up the job after completion. For example, remove the temporary
 *   output directory.
 *   </li>
 *   <li>
 *   Set up the task's temporary output.
 *   </li>
 *   <li>
 *   Check whether a task needs a commit. This avoids the commit procedure
 *   for tasks that have nothing to commit.
 *   </li>
 *   <li>
 *   Commit the task output.
 *   </li>
 *   <li>
 *   Discard the task output on abort.
 *   </li>
 * </ol>
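 *
 * <p>A minimal sketch of a custom committer is shown below. It assumes output
 * written through a Hadoop <code>FileSystem</code>; the class name and the
 * <code>_temporary</code> directory layout are illustrative, not part of the
 * framework:</p>
 *
 * <pre>
 * public class SketchOutputCommitter extends OutputCommitter {
 *   public void setupJob(JobContext jobContext) throws IOException {
 *     // Create a job-level scratch directory under the output directory.
 *     JobConf conf = jobContext.getJobConf();
 *     Path out = FileOutputFormat.getOutputPath(conf);
 *     out.getFileSystem(conf).mkdirs(new Path(out, "_temporary"));
 *   }
 *   public void setupTask(TaskAttemptContext taskContext) throws IOException {
 *     // Nothing to do here: task directories are created lazily on write.
 *   }
 *   public boolean needsTaskCommit(TaskAttemptContext taskContext)
 *   throws IOException {
 *     return true;  // in this sketch every task attempt produces output
 *   }
 *   public void commitTask(TaskAttemptContext taskContext) throws IOException {
 *     // Promote the attempt's scratch files into the job output directory.
 *   }
 *   public void abortTask(TaskAttemptContext taskContext) throws IOException {
 *     // Delete the attempt's scratch files.
 *   }
 * }
 * </pre>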
 *
 * @see FileOutputCommitter
 * @see JobContext
 * @see TaskAttemptContext
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputCommitter 
                extends org.apache.hadoop.mapreduce.OutputCommitter {
  /**
   * For the framework to set up the job output during initialization.
   * 
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if the temporary output directory could not be created
   */
  public abstract void setupJob(JobContext jobContext) throws IOException;

  /**
   * For cleaning up the job's output after job completion.
   * 
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if the job output could not be cleaned up
   * @deprecated Use {@link #commitJob(JobContext)} or 
   *                 {@link #abortJob(JobContext, int)} instead.
   */
  @Deprecated
  public void cleanupJob(JobContext jobContext) throws IOException { }

  /**
   * For committing the job's output after successful job completion. Note that
   * this is invoked only for jobs whose final run state is
   * {@link JobStatus#SUCCEEDED}.
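   *
   * <p>A sketch of an override that publishes a job-level completion marker
   * before cleaning up; the <code>_SUCCESS</code> file name is illustrative:</p>
   * <pre>
   * public void commitJob(JobContext jobContext) throws IOException {
   *   JobConf conf = jobContext.getJobConf();
   *   Path out = FileOutputFormat.getOutputPath(conf);
   *   FileSystem fs = out.getFileSystem(conf);
   *   fs.create(new Path(out, "_SUCCESS")).close();  // mark job completion
   *   cleanupJob(jobContext);                        // drop temporary state
   * }
   * </pre>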
   * 
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if the job output commit fails
   */
  public void commitJob(JobContext jobContext) throws IOException {
    cleanupJob(jobContext);
  }

  /**
   * For aborting an unsuccessful job's output. Note that this is invoked only
   * for jobs whose final run state is {@link JobStatus#FAILED} or 
   * {@link JobStatus#KILLED}.
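   *
   * <p>A sketch that discards the job's temporary state; the
   * <code>_temporary</code> directory layout is illustrative:</p>
   * <pre>
   * public void abortJob(JobContext jobContext, int status) throws IOException {
   *   JobConf conf = jobContext.getJobConf();
   *   Path out = FileOutputFormat.getOutputPath(conf);
   *   out.getFileSystem(conf).delete(new Path(out, "_temporary"), true);
   * }
   * </pre>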
   * 
   * @param jobContext Context of the job whose output is being written.
   * @param status final run state of the job
   * @throws IOException if the job output abort fails
   */
  public void abortJob(JobContext jobContext, int status) 
  throws IOException {
    cleanupJob(jobContext);
  }

  /**
   * Sets up output for the task.
   * 
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if the task output setup fails
   */
  public abstract void setupTask(TaskAttemptContext taskContext)
  throws IOException;

  /**
   * Check whether a task needs a commit.
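   *
   * <p>For example, a filesystem-based committer may skip the commit for task
   * attempts that produced no output (a sketch; the scratch-path layout is
   * illustrative):</p>
   * <pre>
   * public boolean needsTaskCommit(TaskAttemptContext taskContext)
   * throws IOException {
   *   JobConf conf = taskContext.getJobConf();
   *   Path scratch = new Path(FileOutputFormat.getOutputPath(conf),
   *       "_temporary/" + taskContext.getTaskAttemptID());
   *   return scratch.getFileSystem(conf).exists(scratch);
   * }
   * </pre>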
   * 
   * @param taskContext Context of the task whose output is being written.
   * @return <code>true</code> if the task output needs to be committed,
   *         <code>false</code> otherwise
   * @throws IOException if the check fails
   */
  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
  throws IOException;

  /**
   * To promote the task's temporary output to the final output location.
   * 
   * The task's output is moved to the job's output directory.
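   *
   * <p>A sketch of a rename-based promotion (the scratch-path layout is
   * illustrative):</p>
   * <pre>
   * public void commitTask(TaskAttemptContext taskContext) throws IOException {
   *   JobConf conf = taskContext.getJobConf();
   *   Path out = FileOutputFormat.getOutputPath(conf);
   *   Path scratch = new Path(out,
   *       "_temporary/" + taskContext.getTaskAttemptID());
   *   FileSystem fs = scratch.getFileSystem(conf);
   *   if (fs.exists(scratch)) {
   *     fs.rename(scratch, new Path(out,
   *         taskContext.getTaskAttemptID().getTaskID().toString()));
   *   }
   * }
   * </pre>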
   * 
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if the commit is not successful
   */
  public abstract void commitTask(TaskAttemptContext taskContext)
  throws IOException;

  /**
   * Discard the task output.
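   *
   * <p>A sketch that deletes the attempt's scratch directory (layout
   * illustrative):</p>
   * <pre>
   * public void abortTask(TaskAttemptContext taskContext) throws IOException {
   *   JobConf conf = taskContext.getJobConf();
   *   Path scratch = new Path(FileOutputFormat.getOutputPath(conf),
   *       "_temporary/" + taskContext.getTaskAttemptID());
   *   scratch.getFileSystem(conf).delete(scratch, true);  // recursive
   * }
   * </pre>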
   * 
   * @param taskContext Context of the task whose output is being discarded.
   * @throws IOException if the task output could not be discarded
   */
  public abstract void abortTask(TaskAttemptContext taskContext)
  throws IOException;

  /**
   * Returns whether task output recovery is supported across application
   * attempts. The old API does not support recovery by default, so this
   * returns <code>false</code>; override it together with
   * {@link #recoverTask(TaskAttemptContext)} to enable recovery.
   */
  @Override
  public boolean isRecoverySupported() {
    return false;
  }

  /**
   * Recover the task output.
   * 
   * The retry count for the job is passed via the 
   * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in 
   * {@link TaskAttemptContext#getConfiguration()} for the 
   * <code>OutputCommitter</code>.
   * 
   * If an exception is thrown, the task will be attempted again.
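   *
   * <p>A recovery sketch that branches on the application attempt; the
   * configuration lookup uses the key linked above, and the recovery action
   * itself is illustrative:</p>
   * <pre>
   * public void recoverTask(TaskAttemptContext taskContext) throws IOException {
   *   int attempt = taskContext.getJobConf()
   *       .getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0);
   *   if (attempt > 0) {
   *     // Locate output committed by the previous application attempt and
   *     // move or re-register it for this attempt.
   *   }
   * }
   * </pre>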
   * 
   * @param taskContext Context of the task whose output is being recovered
   * @throws IOException if the task output could not be recovered
   */
  public void recoverTask(TaskAttemptContext taskContext) 
  throws IOException {
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext
                             ) throws IOException {
    setupJob((JobContext) jobContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)}
   *             or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}
   *             instead.
   */
  @Override
  @Deprecated
  public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context
                               ) throws IOException {
    cleanupJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void commitJob(org.apache.hadoop.mapreduce.JobContext context
                             ) throws IOException {
    commitJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void abortJob(org.apache.hadoop.mapreduce.JobContext context, 
                             org.apache.hadoop.mapreduce.JobStatus.State runState) 
  throws IOException {
    int state = JobStatus.getOldNewJobRunState(runState);
    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
      throw new IOException("Invalid job run state: " + runState.name());
    }
    abortJob((JobContext) context, state);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final 
  void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                 ) throws IOException {
    setupTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final boolean 
    needsTaskCommit(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                    ) throws IOException {
    return needsTaskCommit((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final 
  void commitTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                  ) throws IOException {
    commitTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final 
  void abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                 ) throws IOException {
    abortTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final 
  void recoverTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
      ) throws IOException {
    recoverTask((TaskAttemptContext) taskContext);
  }

}