/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * <code>OutputCommitter</code> describes the commit of task output for a
 * Map-Reduce job.
 *
 * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of
 * the job to:</p>
 * <ol>
 *   <li>
 *   Set up the job during initialization. For example, create the temporary
 *   output directory for the job.
 *   </li>
 *   <li>
 *   Clean up the job after job completion. For example, remove the
 *   temporary output directory.
 *   </li>
 *   <li>
 *   Set up the task's temporary output.
 *   </li>
 *   <li>
 *   Check whether a task needs a commit, so that the commit procedure can
 *   be skipped for tasks that do not need one.
 *   </li>
 *   <li>
 *   Commit the task output.
 *   </li>
 *   <li>
 *   Discard the task commit.
 *   </li>
 * </ol>
 *
 * <p>A minimal, illustrative committer sketch appears at the end of this
 * file.</p>
 *
 * @see FileOutputCommitter
 * @see JobContext
 * @see TaskAttemptContext
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputCommitter
    extends org.apache.hadoop.mapreduce.OutputCommitter {
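  // -------------------------------------------------------------------------
  // Lifecycle methods of the old (org.apache.hadoop.mapred) API. The
  // framework drives them in roughly this order: setupJob() once at job
  // initialization; per task attempt, setupTask(), then needsTaskCommit()
  // and, if it returns true, commitTask() (or abortTask() on failure); and
  // finally commitJob() on success or abortJob() on failure or kill.
  // -------------------------------------------------------------------------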

  /**
   * For the framework to set up the job output during initialization.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if temporary output could not be created
   */
  public abstract void setupJob(JobContext jobContext) throws IOException;

  /**
   * For cleaning up the job's output after job completion.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if cleanup fails
   * @deprecated Use {@link #commitJob(JobContext)} or
   *             {@link #abortJob(JobContext, int)} instead.
   */
  @Deprecated
  public void cleanupJob(JobContext jobContext) throws IOException { }

  /**
   * For committing the job's output after successful job completion. Note
   * that this is invoked for jobs with final run state as SUCCESSFUL.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if commit fails
   */
  public void commitJob(JobContext jobContext) throws IOException {
    cleanupJob(jobContext);
  }

  /**
   * For aborting an unsuccessful job's output. Note that this is invoked for
   * jobs with final run state as {@link JobStatus#FAILED} or
   * {@link JobStatus#KILLED}.
   *
   * @param jobContext Context of the job whose output is being written.
   * @param status final run state of the job, {@link JobStatus#FAILED} or
   *               {@link JobStatus#KILLED}
   * @throws IOException if abort fails
   */
  public void abortJob(JobContext jobContext, int status)
      throws IOException {
    cleanupJob(jobContext);
  }

  /**
   * Sets up output for the task.
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if task output setup fails
   */
  public abstract void setupTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * Check whether the task needs a commit.
   *
   * @param taskContext Context of the task whose output is being written.
   * @return <code>true</code> if the task needs a commit,
   *         <code>false</code> otherwise
   * @throws IOException if the check fails
   */
  public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * To promote the task's temporary output to the final output location.
   *
   * The task's output is moved to the job's output directory.
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if commit is not successful
   */
  public abstract void commitTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * Discard the task output.
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if the task output could not be discarded
   */
  public abstract void abortTask(TaskAttemptContext taskContext)
      throws IOException;

  /**
   * Is task output recovery supported for restarting jobs?
   *
   * If task output recovery is not supported, the job output must be
   * recomputed from scratch when a job restarts.
   *
   * @return <code>true</code> if task output recovery is supported,
   *         <code>false</code> otherwise
   */
  @Override
  public boolean isRecoverySupported() {
    return false;
  }

  /**
   * Recover the task output.
   *
   * The retry-count for the job will be passed via the
   * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in
   * {@link TaskAttemptContext#getConfiguration()} for the
   * <code>OutputCommitter</code>.
   *
   * If an exception is thrown, the task will be attempted again.
   *
   * @param taskContext Context of the task whose output is being recovered.
   * @throws IOException if the task output could not be recovered
   */
  public void recoverTask(TaskAttemptContext taskContext)
      throws IOException {
  }
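
  // -------------------------------------------------------------------------
  // Bridge methods. Each final override below adapts the new
  // (org.apache.hadoop.mapreduce) interface to the old API by down-casting
  // the context to its org.apache.hadoop.mapred subtype and delegating to
  // the corresponding method above.
  // -------------------------------------------------------------------------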

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext)
      throws IOException {
    setupJob((JobContext) jobContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   *
   * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)}
   *             or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext,
   *             org.apache.hadoop.mapreduce.JobStatus.State)} instead.
   */
  @Override
  @Deprecated
  public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context)
      throws IOException {
    cleanupJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void commitJob(org.apache.hadoop.mapreduce.JobContext context)
      throws IOException {
    commitJob((JobContext) context);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void abortJob(org.apache.hadoop.mapreduce.JobContext context,
      org.apache.hadoop.mapreduce.JobStatus.State runState)
      throws IOException {
    int state = JobStatus.getOldNewJobRunState(runState);
    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
      throw new IOException("Invalid job run state : " + runState.name());
    }
    abortJob((JobContext) context, state);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void setupTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    setupTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final boolean needsTaskCommit(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    return needsTaskCommit((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void commitTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    commitTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void abortTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    abortTask((TaskAttemptContext) taskContext);
  }

  /**
   * This method implements the new interface by calling the old method. Note
   * that the input types are different between the new and old APIs and this
   * is a bridge between the two.
   */
  @Override
  public final void recoverTask(
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
      throws IOException {
    recoverTask((TaskAttemptContext) taskContext);
  }

}
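
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the Hadoop API: a minimal committer for
// output that is written directly to its final location. The class name is
// hypothetical. It assumes there is no temporary output to set up, promote,
// or discard, so every hook is a no-op and no task ever needs a commit.
// Contrast this with FileOutputCommitter, which writes task output to a
// temporary directory and moves it into place on commit.
// ---------------------------------------------------------------------------
class DirectOutputCommitterSketch extends OutputCommitter {

  @Override
  public void setupJob(JobContext jobContext) throws IOException {
    // No temporary job output directory to create in this sketch.
  }

  @Override
  public void setupTask(TaskAttemptContext taskContext) throws IOException {
    // Task output is assumed to be written in place; nothing to set up.
  }

  @Override
  public boolean needsTaskCommit(TaskAttemptContext taskContext)
      throws IOException {
    // Output is already final, so the framework can skip commitTask().
    return false;
  }

  @Override
  public void commitTask(TaskAttemptContext taskContext) throws IOException {
    // Never invoked while needsTaskCommit() returns false.
  }

  @Override
  public void abortTask(TaskAttemptContext taskContext) throws IOException {
    // No temporary task output to discard.
  }
}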