001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.mapreduce.lib.output;
019    
020    import org.apache.hadoop.classification.InterfaceAudience;
021    import org.apache.hadoop.classification.InterfaceStability;
022    import org.apache.hadoop.conf.Configuration;
023    import org.apache.hadoop.mapreduce.*;
024    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
025    import org.apache.hadoop.util.ReflectionUtils;
026    
027    import java.io.IOException;
028    import java.util.*;
029    
030    /**
031     * The MultipleOutputs class simplifies writing output data 
032     * to multiple outputs
033     * 
034     * <p> 
035     * Case one: writing to additional outputs other than the job default output.
036     *
037     * Each additional output, or named output, may be configured with its own
038     * <code>OutputFormat</code>, with its own key class and with its own value
039     * class.
040     * 
041     * <p>
042     * Case two: to write data to different files provided by user
043     * </p>
044     * 
045     * <p>
046     * MultipleOutputs supports counters, by default they are disabled. The 
047     * counters group is the {@link MultipleOutputs} class name. The names of the 
048     * counters are the same as the output name. These count the number records 
049     * written to each output name.
050     * </p>
051     * 
052     * Usage pattern for job submission:
053     * <pre>
054     *
055     * Job job = new Job();
056     *
057     * FileInputFormat.setInputPath(job, inDir);
058     * FileOutputFormat.setOutputPath(job, outDir);
059     *
060     * job.setMapperClass(MOMap.class);
061     * job.setReducerClass(MOReduce.class);
062     * ...
063     *
064     * // Defines additional single text based output 'text' for the job
065     * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
066     * LongWritable.class, Text.class);
067     *
068     * // Defines additional sequence-file based output 'sequence' for the job
069     * MultipleOutputs.addNamedOutput(job, "seq",
070     *   SequenceFileOutputFormat.class,
071     *   LongWritable.class, Text.class);
072     * ...
073     *
074     * job.waitForCompletion(true);
075     * ...
076     * </pre>
077     * <p>
078     * Usage in Reducer:
079     * <pre>
080     * <K, V> String generateFileName(K k, V v) {
081     *   return k.toString() + "_" + v.toString();
082     * }
083     * 
084     * public class MOReduce extends
085     *   Reducer&lt;WritableComparable, Writable,WritableComparable, Writable&gt; {
086     * private MultipleOutputs mos;
087     * public void setup(Context context) {
088     * ...
089     * mos = new MultipleOutputs(context);
090     * }
091     *
092     * public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
093     * Context context)
094     * throws IOException {
095     * ...
096     * mos.write("text", , key, new Text("Hello"));
097     * mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a");
098     * mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b");
099     * mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
100     * ...
101     * }
102     *
103     * public void cleanup(Context) throws IOException {
104     * mos.close();
105     * ...
106     * }
107     *
108     * }
109     * </pre>
110     */
111    @InterfaceAudience.Public
112    @InterfaceStability.Stable
113    public class MultipleOutputs<KEYOUT, VALUEOUT> {
114    
115      private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";
116    
117      private static final String MO_PREFIX = 
118        "mapreduce.multipleoutputs.namedOutput.";
119    
120      private static final String FORMAT = ".format";
121      private static final String KEY = ".key";
122      private static final String VALUE = ".value";
123      private static final String COUNTERS_ENABLED = 
124        "mapreduce.multipleoutputs.counters";
125    
126      /**
127       * Counters group used by the counters of MultipleOutputs.
128       */
129      private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
130    
131      /**
132       * Cache for the taskContexts
133       */
134      private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
135      /**
136       * Cached TaskAttemptContext which uses the job's configured settings
137       */
138      private TaskAttemptContext jobOutputFormatContext;
139    
140      /**
141       * Checks if a named output name is valid token.
142       *
143       * @param namedOutput named output Name
144       * @throws IllegalArgumentException if the output name is not valid.
145       */
146      private static void checkTokenName(String namedOutput) {
147        if (namedOutput == null || namedOutput.length() == 0) {
148          throw new IllegalArgumentException(
149            "Name cannot be NULL or emtpy");
150        }
151        for (char ch : namedOutput.toCharArray()) {
152          if ((ch >= 'A') && (ch <= 'Z')) {
153            continue;
154          }
155          if ((ch >= 'a') && (ch <= 'z')) {
156            continue;
157          }
158          if ((ch >= '0') && (ch <= '9')) {
159            continue;
160          }
161          throw new IllegalArgumentException(
162            "Name cannot be have a '" + ch + "' char");
163        }
164      }
165    
166      /**
167       * Checks if output name is valid.
168       *
169       * name cannot be the name used for the default output
170       * @param outputPath base output Name
171       * @throws IllegalArgumentException if the output name is not valid.
172       */
173      private static void checkBaseOutputPath(String outputPath) {
174        if (outputPath.equals(FileOutputFormat.PART)) {
175          throw new IllegalArgumentException("output name cannot be 'part'");
176        }
177      }
178      
179      /**
180       * Checks if a named output name is valid.
181       *
182       * @param namedOutput named output Name
183       * @throws IllegalArgumentException if the output name is not valid.
184       */
185      private static void checkNamedOutputName(JobContext job,
186          String namedOutput, boolean alreadyDefined) {
187        checkTokenName(namedOutput);
188        checkBaseOutputPath(namedOutput);
189        List<String> definedChannels = getNamedOutputsList(job);
190        if (alreadyDefined && definedChannels.contains(namedOutput)) {
191          throw new IllegalArgumentException("Named output '" + namedOutput +
192            "' already alreadyDefined");
193        } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
194          throw new IllegalArgumentException("Named output '" + namedOutput +
195            "' not defined");
196        }
197      }
198    
199      // Returns list of channel names.
200      private static List<String> getNamedOutputsList(JobContext job) {
201        List<String> names = new ArrayList<String>();
202        StringTokenizer st = new StringTokenizer(
203          job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
204        while (st.hasMoreTokens()) {
205          names.add(st.nextToken());
206        }
207        return names;
208      }
209    
210      // Returns the named output OutputFormat.
211      @SuppressWarnings("unchecked")
212      private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
213        JobContext job, String namedOutput) {
214        return (Class<? extends OutputFormat<?, ?>>)
215          job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
216          OutputFormat.class);
217      }
218    
219      // Returns the key class for a named output.
220      private static Class<?> getNamedOutputKeyClass(JobContext job,
221                                                    String namedOutput) {
222        return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
223          Object.class);
224      }
225    
226      // Returns the value class for a named output.
227      private static Class<?> getNamedOutputValueClass(
228          JobContext job, String namedOutput) {
229        return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
230          null, Object.class);
231      }
232    
233      /**
234       * Adds a named output for the job.
235       * <p/>
236       *
237       * @param job               job to add the named output
238       * @param namedOutput       named output name, it has to be a word, letters
239       *                          and numbers only, cannot be the word 'part' as
240       *                          that is reserved for the default output.
241       * @param outputFormatClass OutputFormat class.
242       * @param keyClass          key class
243       * @param valueClass        value class
244       */
245      @SuppressWarnings("unchecked")
246      public static void addNamedOutput(Job job, String namedOutput,
247          Class<? extends OutputFormat> outputFormatClass,
248          Class<?> keyClass, Class<?> valueClass) {
249        checkNamedOutputName(job, namedOutput, true);
250        Configuration conf = job.getConfiguration();
251        conf.set(MULTIPLE_OUTPUTS,
252          conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
253        conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
254          OutputFormat.class);
255        conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
256        conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
257      }
258    
259      /**
260       * Enables or disables counters for the named outputs.
261       * 
262       * The counters group is the {@link MultipleOutputs} class name.
263       * The names of the counters are the same as the named outputs. These
264       * counters count the number records written to each output name.
265       * By default these counters are disabled.
266       *
267       * @param job    job  to enable counters
268       * @param enabled indicates if the counters will be enabled or not.
269       */
270      public static void setCountersEnabled(Job job, boolean enabled) {
271        job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
272      }
273    
274      /**
275       * Returns if the counters for the named outputs are enabled or not.
276       * By default these counters are disabled.
277       *
278       * @param job    the job 
279       * @return TRUE if the counters are enabled, FALSE if they are disabled.
280       */
281      public static boolean getCountersEnabled(JobContext job) {
282        return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
283      }
284    
285      /**
286       * Wraps RecordWriter to increment counters. 
287       */
288      @SuppressWarnings("unchecked")
289      private static class RecordWriterWithCounter extends RecordWriter {
290        private RecordWriter writer;
291        private String counterName;
292        private TaskInputOutputContext context;
293    
294        public RecordWriterWithCounter(RecordWriter writer, String counterName,
295                                       TaskInputOutputContext context) {
296          this.writer = writer;
297          this.counterName = counterName;
298          this.context = context;
299        }
300    
301        @SuppressWarnings({"unchecked"})
302        public void write(Object key, Object value) 
303            throws IOException, InterruptedException {
304          context.getCounter(COUNTERS_GROUP, counterName).increment(1);
305          writer.write(key, value);
306        }
307    
308        public void close(TaskAttemptContext context) 
309            throws IOException, InterruptedException {
310          writer.close(context);
311        }
312      }
313    
314      // instance code, to be used from Mapper/Reducer code
315    
316      private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
317      private Set<String> namedOutputs;
318      private Map<String, RecordWriter<?, ?>> recordWriters;
319      private boolean countersEnabled;
320      
321      /**
322       * Creates and initializes multiple outputs support,
323       * it should be instantiated in the Mapper/Reducer setup method.
324       *
325       * @param context the TaskInputOutputContext object
326       */
327      public MultipleOutputs(
328          TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
329        this.context = context;
330        namedOutputs = Collections.unmodifiableSet(
331          new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
332        recordWriters = new HashMap<String, RecordWriter<?, ?>>();
333        countersEnabled = getCountersEnabled(context);
334      }
335    
336      /**
337       * Write key and value to the namedOutput.
338       *
339       * Output path is a unique file generated for the namedOutput.
340       * For example, {namedOutput}-(m|r)-{part-number}
341       * 
342       * @param namedOutput the named output name
343       * @param key         the key
344       * @param value       the value
345       */
346      @SuppressWarnings("unchecked")
347      public <K, V> void write(String namedOutput, K key, V value)
348          throws IOException, InterruptedException {
349        write(namedOutput, key, value, namedOutput);
350      }
351    
352      /**
353       * Write key and value to baseOutputPath using the namedOutput.
354       * 
355       * @param namedOutput    the named output name
356       * @param key            the key
357       * @param value          the value
358       * @param baseOutputPath base-output path to write the record to.
359       * Note: Framework will generate unique filename for the baseOutputPath
360       */
361      @SuppressWarnings("unchecked")
362      public <K, V> void write(String namedOutput, K key, V value,
363          String baseOutputPath) throws IOException, InterruptedException {
364        checkNamedOutputName(context, namedOutput, false);
365        checkBaseOutputPath(baseOutputPath);
366        if (!namedOutputs.contains(namedOutput)) {
367          throw new IllegalArgumentException("Undefined named output '" +
368            namedOutput + "'");
369        }
370        TaskAttemptContext taskContext = getContext(namedOutput);
371        getRecordWriter(taskContext, baseOutputPath).write(key, value);
372      }
373    
374      /**
375       * Write key value to an output file name.
376       * 
377       * Gets the record writer from job's output format.  
378       * Job's output format should be a FileOutputFormat.
379       * 
380       * @param key       the key
381       * @param value     the value
382       * @param baseOutputPath base-output path to write the record to.
383       * Note: Framework will generate unique filename for the baseOutputPath
384       */
385      @SuppressWarnings("unchecked")
386      public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 
387          throws IOException, InterruptedException {
388        checkBaseOutputPath(baseOutputPath);
389        if (jobOutputFormatContext == null) {
390          jobOutputFormatContext = 
391            new TaskAttemptContextImpl(context.getConfiguration(), 
392                                       context.getTaskAttemptID(),
393                                       new WrappedStatusReporter(context));
394        }
395        getRecordWriter(jobOutputFormatContext, baseOutputPath).write(key, value);
396      }
397    
398      // by being synchronized MultipleOutputTask can be use with a
399      // MultithreadedMapper.
400      @SuppressWarnings("unchecked")
401      private synchronized RecordWriter getRecordWriter(
402          TaskAttemptContext taskContext, String baseFileName) 
403          throws IOException, InterruptedException {
404        
405        // look for record-writer in the cache
406        RecordWriter writer = recordWriters.get(baseFileName);
407        
408        // If not in cache, create a new one
409        if (writer == null) {
410          // get the record writer from context output format
411          FileOutputFormat.setOutputName(taskContext, baseFileName);
412          try {
413            writer = ((OutputFormat) ReflectionUtils.newInstance(
414              taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
415              .getRecordWriter(taskContext);
416          } catch (ClassNotFoundException e) {
417            throw new IOException(e);
418          }
419     
420          // if counters are enabled, wrap the writer with context 
421          // to increment counters 
422          if (countersEnabled) {
423            writer = new RecordWriterWithCounter(writer, baseFileName, context);
424          }
425          
426          // add the record-writer to the cache
427          recordWriters.put(baseFileName, writer);
428        }
429        return writer;
430      }
431    
432       // Create a taskAttemptContext for the named output with 
433       // output format and output key/value types put in the context
434      private TaskAttemptContext getContext(String nameOutput) throws IOException {
435          
436        TaskAttemptContext taskContext = taskContexts.get(nameOutput);
437        
438        if (taskContext != null) {
439            return taskContext;
440        }
441        
442        // The following trick leverages the instantiation of a record writer via
443        // the job thus supporting arbitrary output formats.
444        Job job = new Job(context.getConfiguration());
445        job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
446        job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
447        job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
448        taskContext = new TaskAttemptContextImpl(job.getConfiguration(), context
449            .getTaskAttemptID(), new WrappedStatusReporter(context));
450    
451        taskContexts.put(nameOutput, taskContext);
452    
453        return taskContext;
454      }
455    
456      private static class WrappedStatusReporter extends StatusReporter {
457    
458        TaskAttemptContext context;
459    
460        public WrappedStatusReporter(TaskAttemptContext context) {
461          this.context = context;
462        }
463    
464        @Override
465        public Counter getCounter(Enum<?> name) {
466          return context.getCounter(name);
467        }
468    
469        @Override
470        public Counter getCounter(String group, String name) {
471          return context.getCounter(group, name);
472        }
473    
474        @Override
475        public void progress() {
476          context.progress();
477        }
478    
479        @Override
480        public float getProgress() {
481          return context.getProgress();
482        }
483        
484        @Override
485        public void setStatus(String status) {
486          context.setStatus(status);
487        }
488      }
489    
490      /**
491       * Closes all the opened outputs.
492       * 
493       * This should be called from cleanup method of map/reduce task.
494       * If overridden subclasses must invoke <code>super.close()</code> at the
495       * end of their <code>close()</code>
496       * 
497       */
498      @SuppressWarnings("unchecked")
499      public void close() throws IOException, InterruptedException {
500        for (RecordWriter writer : recordWriters.values()) {
501          writer.close(context);
502        }
503      }
504    }