001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred;
020    
021    
022    import java.io.IOException;
023    import java.net.URL;
024    import java.net.URLDecoder;
025    import java.util.Enumeration;
026    import java.util.regex.Pattern;
027    
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    import org.apache.hadoop.classification.InterfaceAudience;
031    import org.apache.hadoop.classification.InterfaceStability;
032    import org.apache.hadoop.conf.Configuration;
033    import org.apache.hadoop.fs.FileStatus;
034    import org.apache.hadoop.fs.FileSystem;
035    import org.apache.hadoop.fs.Path;
036    import org.apache.hadoop.io.LongWritable;
037    import org.apache.hadoop.io.RawComparator;
038    import org.apache.hadoop.io.Text;
039    import org.apache.hadoop.io.WritableComparable;
040    import org.apache.hadoop.io.WritableComparator;
041    import org.apache.hadoop.io.compress.CompressionCodec;
042    import org.apache.hadoop.mapred.lib.HashPartitioner;
043    import org.apache.hadoop.mapred.lib.IdentityMapper;
044    import org.apache.hadoop.mapred.lib.IdentityReducer;
045    import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
046    import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
047    import org.apache.hadoop.mapreduce.MRConfig;
048    import org.apache.hadoop.mapreduce.MRJobConfig;
049    import org.apache.hadoop.mapreduce.filecache.DistributedCache;
050    import org.apache.hadoop.mapreduce.util.ConfigUtil;
051    import org.apache.hadoop.security.Credentials;
052    import org.apache.hadoop.util.ReflectionUtils;
053    import org.apache.hadoop.util.Tool;
054    import org.apache.log4j.Level;
055    
056    /** 
057     * A map/reduce job configuration.
058     * 
059     * <p><code>JobConf</code> is the primary interface for a user to describe a 
060     * map-reduce job to the Hadoop framework for execution. The framework tries to
 * faithfully execute the job as described by <code>JobConf</code>; however:
062     * <ol>
063     *   <li>
064     *   Some configuration parameters might have been marked as 
065     *   <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
066     *   final</a> by administrators and hence cannot be altered.
067     *   </li>
068     *   <li>
 *   While some job parameters are straightforward to set 
 *   (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly with 
 *   the rest of the framework and/or the job configuration and are relatively 
 *   more complex for the user to control finely (e.g. {@link #setNumMapTasks(int)}).
073     *   </li>
074     * </ol></p>
075     * 
076     * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 
077     * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 
078     * {@link OutputFormat} implementations to be used etc.
079     *
 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 
 * of the job such as the <code>Comparator</code>s to be used, files to be put in 
 * the {@link DistributedCache}, whether intermediate and/or job outputs are to be 
 * compressed (and how), and debuggability via user-provided scripts 
 * ({@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}) 
 * for post-processing task logs and the tasks' stdout, stderr and syslog.</p>
087     * 
088     * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
089     * <p><blockquote><pre>
090     *     // Create a new JobConf
091     *     JobConf job = new JobConf(new Configuration(), MyJob.class);
092     *     
093     *     // Specify various job-specific parameters     
094     *     job.setJobName("myjob");
095     *     
096     *     FileInputFormat.setInputPaths(job, new Path("in"));
097     *     FileOutputFormat.setOutputPath(job, new Path("out"));
098     *     
099     *     job.setMapperClass(MyJob.MyMapper.class);
100     *     job.setCombinerClass(MyJob.MyReducer.class);
101     *     job.setReducerClass(MyJob.MyReducer.class);
102     *     
103     *     job.setInputFormat(SequenceFileInputFormat.class);
104     *     job.setOutputFormat(SequenceFileOutputFormat.class);
105     * </pre></blockquote></p>
106     * 
107     * @see JobClient
108     * @see ClusterStatus
109     * @see Tool
110     * @see DistributedCache
111     */
112    @InterfaceAudience.Public
113    @InterfaceStability.Stable
114    public class JobConf extends Configuration {
115      
116      private static final Log LOG = LogFactory.getLog(JobConf.class);
117    
118      static{
119        ConfigUtil.loadResources();
120      }
121    
122      /**
123       * @deprecated Use {@link #MAPRED_JOB_MAP_MEMORY_MB_PROPERTY} and
124       * {@link #MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY}
125       */
126      @Deprecated
127      public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
128        "mapred.task.maxvmem";
129    
130      /**
131       * @deprecated 
132       */
133      @Deprecated
134      public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
135        "mapred.task.limit.maxvmem";
136    
137      /**
138       * @deprecated
139       */
140      @Deprecated
141      public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
142        "mapred.task.default.maxvmem";
143    
144      /**
145       * @deprecated
146       */
147      @Deprecated
148      public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
149        "mapred.task.maxpmem";
150    
151      /**
   * A value which, if set for a memory-related configuration option,
   * indicates that the option is turned off.
154       */
155      public static final long DISABLED_MEMORY_LIMIT = -1L;
156    
157      /**
158       * Property name for the configuration property mapreduce.cluster.local.dir
159       */
160      public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;
161    
162      /**
163       * Name of the queue to which jobs will be submitted, if no queue
   * name is specified.
165       */
166      public static final String DEFAULT_QUEUE_NAME = "default";
167    
168      static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY = 
169          JobContext.MAP_MEMORY_MB;
170    
171      static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
172        JobContext.REDUCE_MEMORY_MB;
173    
174      /** Pattern for the default unpacking behavior for job jars */
175      public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
176        Pattern.compile("(?:classes/|lib/).*");
177    
178      /**
179       * Configuration key to set the java command line options for the child
180       * map and reduce tasks.
181       * 
182       * Java opts for the task tracker child processes.
183       * The following symbol, if present, will be interpolated: @taskid@. 
   * It is replaced by the current TaskID. Any other occurrences of '@' will go 
185       * unchanged.
186       * For example, to enable verbose gc logging to a file named for the taskid in
187       * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
188       *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
189       * 
190       * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass 
191       * other environment variables to the child processes.
192       * 
193       * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or 
194       *                 {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
195       */
196      @Deprecated
197      public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";
198      
199      /**
200       * Configuration key to set the java command line options for the map tasks.
201       * 
202       * Java opts for the task tracker child map processes.
203       * The following symbol, if present, will be interpolated: @taskid@. 
   * It is replaced by the current TaskID. Any other occurrences of '@' will go 
205       * unchanged.
206       * For example, to enable verbose gc logging to a file named for the taskid in
207       * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
208       *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
209       * 
210       * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass 
211       * other environment variables to the map processes.
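   * 
   * <p>For example, a minimal sketch based on the opts string documented
   * above (the exact options are illustrative):</p>
   * <p><blockquote><pre>
   *     JobConf conf = new JobConf();
   *     conf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
   *              "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc");
   * </pre></blockquote></p>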
212       */
213      public static final String MAPRED_MAP_TASK_JAVA_OPTS = 
214        JobContext.MAP_JAVA_OPTS;
215      
216      /**
217       * Configuration key to set the java command line options for the reduce tasks.
218       * 
219       * Java opts for the task tracker child reduce processes.
220       * The following symbol, if present, will be interpolated: @taskid@. 
   * It is replaced by the current TaskID. Any other occurrences of '@' will go 
222       * unchanged.
223       * For example, to enable verbose gc logging to a file named for the taskid in
224       * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
225       *          -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
226       * 
   * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to 
   * pass other environment variables to the reduce processes.
229       */
230      public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 
231        JobContext.REDUCE_JAVA_OPTS;
232      
233      public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
234      
235      /**
236       * @deprecated
237       * Configuration key to set the maximum virtual memory available to the child
238       * map and reduce tasks (in kilo-bytes). This has been deprecated and will no
239       * longer have any effect.
240       */
241      @Deprecated
242      public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";
243    
244      /**
245       * @deprecated
246       * Configuration key to set the maximum virtual memory available to the
247       * map tasks (in kilo-bytes). This has been deprecated and will no
248       * longer have any effect.
249       */
250      @Deprecated
251      public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";
252      
253      /**
254       * @deprecated
255       * Configuration key to set the maximum virtual memory available to the
256       * reduce tasks (in kilo-bytes). This has been deprecated and will no
257       * longer have any effect.
258       */
259      @Deprecated
260      public static final String MAPRED_REDUCE_TASK_ULIMIT =
261        "mapreduce.reduce.ulimit";
262    
263    
264      /**
265       * Configuration key to set the environment of the child map/reduce tasks.
266       * 
267       * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
268       * reference existing environment variables via <code>$key</code>.
269       * 
270       * Example:
271       * <ul>
272       *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will set the env variable B to the tasktracker's
   *        X env variable value, with ":c" appended. </li>
274       * </ul>
275       * 
276       * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or 
277       *                 {@link #MAPRED_REDUCE_TASK_ENV}
278       */
279      @Deprecated
280      public static final String MAPRED_TASK_ENV = "mapred.child.env";
281    
282      /**
   * Configuration key to set the environment of the child map tasks.
285       * 
286       * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
287       * reference existing environment variables via <code>$key</code>.
288       * 
289       * Example:
290       * <ul>
291       *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will set the env variable B to the tasktracker's
   *        X env variable value, with ":c" appended. </li>
293       * </ul>
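   * 
   * <p>A minimal sketch (the variable names are illustrative):</p>
   * <p><blockquote><pre>
   *     JobConf conf = new JobConf();
   *     conf.set(JobConf.MAPRED_MAP_TASK_ENV, "A=foo,B=$X:c");
   * </pre></blockquote></p>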
294       */
295      public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;
296      
297      /**
   * Configuration key to set the environment of the child reduce tasks.
300       * 
301       * The format of the value is <code>k1=v1,k2=v2</code>. Further it can 
302       * reference existing environment variables via <code>$key</code>.
303       * 
304       * Example:
305       * <ul>
306       *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This will set the env variable B to the tasktracker's
   *        X env variable value, with ":c" appended. </li>
308       * </ul>
309       */
310      public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
311    
312      private Credentials credentials = new Credentials();
313      
314      /**
315       * Configuration key to set the logging {@link Level} for the map task.
316       *
317       * The allowed logging levels are:
318       * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
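   *
   * <p>For example, a sketch (the level chosen is illustrative):</p>
   * <p><blockquote><pre>
   *     JobConf conf = new JobConf();
   *     conf.set(JobConf.MAPRED_MAP_TASK_LOG_LEVEL, "DEBUG");
   * </pre></blockquote></p>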
319       */
320      public static final String MAPRED_MAP_TASK_LOG_LEVEL = 
321        JobContext.MAP_LOG_LEVEL;
322      
323      /**
324       * Configuration key to set the logging {@link Level} for the reduce task.
325       *
326       * The allowed logging levels are:
327       * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
328       */
329      public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = 
330        JobContext.REDUCE_LOG_LEVEL;
331      
332      /**
333       * Default logging level for map/reduce tasks.
334       */
335      public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
336      
337      
338      /**
339       * Construct a map/reduce job configuration.
340       */
341      public JobConf() {
342        checkAndWarnDeprecation();
343      }
344    
345      /** 
346       * Construct a map/reduce job configuration.
347       * 
348       * @param exampleClass a class whose containing jar is used as the job's jar.
349       */
350      public JobConf(Class exampleClass) {
351        setJarByClass(exampleClass);
352        checkAndWarnDeprecation();
353      }
354      
355      /**
356       * Construct a map/reduce job configuration.
357       * 
358       * @param conf a Configuration whose settings will be inherited.
359       */
360      public JobConf(Configuration conf) {
361        super(conf);
362        
363        if (conf instanceof JobConf) {
364          JobConf that = (JobConf)conf;
365          credentials = that.credentials;
366        }
367        
368        checkAndWarnDeprecation();
369      }
370    
371    
372      /** Construct a map/reduce job configuration.
373       * 
374       * @param conf a Configuration whose settings will be inherited.
375       * @param exampleClass a class whose containing jar is used as the job's jar.
376       */
377      public JobConf(Configuration conf, Class exampleClass) {
378        this(conf);
379        setJarByClass(exampleClass);
380      }
381    
382    
383      /** Construct a map/reduce configuration.
384       *
385       * @param config a Configuration-format XML job description file.
386       */
387      public JobConf(String config) {
388        this(new Path(config));
389      }
390    
391      /** Construct a map/reduce configuration.
392       *
393       * @param config a Configuration-format XML job description file.
394       */
395      public JobConf(Path config) {
396        super();
397        addResource(config);
398        checkAndWarnDeprecation();
399      }
400    
401      /** A new map/reduce configuration where the behavior of reading from the
402       * default resources can be turned off.
403       * <p/>
404       * If the parameter {@code loadDefaults} is false, the new instance
405       * will not load resources from the default files.
406       *
407       * @param loadDefaults specifies whether to load from the default files
408       */
409      public JobConf(boolean loadDefaults) {
410        super(loadDefaults);
411        checkAndWarnDeprecation();
412      }
413    
414      /**
415       * Get credentials for the job.
416       * @return credentials for the job
417       */
418      public Credentials getCredentials() {
419        return credentials;
420      }
421      
422      void setCredentials(Credentials credentials) {
423        this.credentials = credentials;
424      }
425      
426      /**
427       * Get the user jar for the map-reduce job.
428       * 
429       * @return the user jar for the map-reduce job.
430       */
431      public String getJar() { return get(JobContext.JAR); }
432      
433      /**
434       * Set the user jar for the map-reduce job.
435       * 
436       * @param jar the user jar for the map-reduce job.
437       */
438      public void setJar(String jar) { set(JobContext.JAR, jar); }
439    
440      /**
   * Get the pattern for jar contents to unpack on the tasktracker.
   *
   * @return the pattern for unpacking the job jar's contents.
   */
443      public Pattern getJarUnpackPattern() {
444        return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
445      }
446    
447      
448      /**
449       * Set the job's jar file by finding an example class location.
450       * 
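   * <p>For example, a sketch where <code>MyJob</code> stands in for a user's
   * job class:</p>
   * <p><blockquote><pre>
   *     JobConf conf = new JobConf();
   *     conf.setJarByClass(MyJob.class);   // ship the jar containing MyJob
   * </pre></blockquote></p>
   * 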
451       * @param cls the example class.
452       */
453      public void setJarByClass(Class cls) {
454        String jar = findContainingJar(cls);
455        if (jar != null) {
456          setJar(jar);
457        }   
458      }
459    
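  /**
   * Get the list of local directories configured for the framework.
   *
   * @return the configured local directories, as trimmed strings.
   */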
460      public String[] getLocalDirs() throws IOException {
461        return getTrimmedStrings(MRConfig.LOCAL_DIR);
462      }
463    
464      /**
465       * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
466       */
467      @Deprecated
468      public void deleteLocalFiles() throws IOException {
469        String[] localDirs = getLocalDirs();
470        for (int i = 0; i < localDirs.length; i++) {
471          FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
472        }
473      }
474    
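  /**
   * Delete the given subdirectory from each of the configured local
   * directories.
   *
   * @param subdir the subdirectory to delete under every local directory.
   */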
475      public void deleteLocalFiles(String subdir) throws IOException {
476        String[] localDirs = getLocalDirs();
477        for (int i = 0; i < localDirs.length; i++) {
478          FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
479        }
480      }
481    
482      /** 
483       * Constructs a local file name. Files are distributed among configured
484       * local directories.
485       */
486      public Path getLocalPath(String pathString) throws IOException {
487        return getLocalPath(MRConfig.LOCAL_DIR, pathString);
488      }
489    
490      /**
491       * Get the reported username for this job.
492       * 
493       * @return the username
494       */
495      public String getUser() {
496        return get(JobContext.USER_NAME);
497      }
498      
499      /**
500       * Set the reported username for this job.
501       * 
502       * @param user the username for this job.
503       */
504      public void setUser(String user) {
505        set(JobContext.USER_NAME, user);
506      }
507    
508    
509      
510      /**
511       * Set whether the framework should keep the intermediate files for 
512       * failed tasks.
513       * 
514       * @param keep <code>true</code> if framework should keep the intermediate files 
515       *             for failed tasks, <code>false</code> otherwise.
516       * 
517       */
518      public void setKeepFailedTaskFiles(boolean keep) {
519        setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
520      }
521      
522      /**
523       * Should the temporary files for failed tasks be kept?
524       * 
525       * @return should the files be kept?
526       */
527      public boolean getKeepFailedTaskFiles() {
528        return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
529      }
530      
531      /**
532       * Set a regular expression for task names that should be kept. 
533       * The regular expression ".*_m_000123_0" would keep the files
534       * for the first instance of map 123 that ran.
535       * 
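   * <p>For example, a sketch using the pattern described above:</p>
   * <p><blockquote><pre>
   *     conf.setKeepTaskFilesPattern(".*_m_000123_0");
   * </pre></blockquote></p>
   * 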
536       * @param pattern the java.util.regex.Pattern to match against the 
537       *        task names.
538       */
539      public void setKeepTaskFilesPattern(String pattern) {
540        set(JobContext.PRESERVE_FILES_PATTERN, pattern);
541      }
542      
543      /**
544       * Get the regular expression that is matched against the task names
545       * to see if we need to keep the files.
546       * 
   * @return the pattern as a string, if it was set, otherwise null.
548       */
549      public String getKeepTaskFilesPattern() {
550        return get(JobContext.PRESERVE_FILES_PATTERN);
551      }
552      
553      /**
554       * Set the current working directory for the default file system.
555       * 
556       * @param dir the new current working directory.
557       */
558      public void setWorkingDirectory(Path dir) {
559        dir = new Path(getWorkingDirectory(), dir);
560        set(JobContext.WORKING_DIR, dir.toString());
561      }
562      
563      /**
564       * Get the current working directory for the default file system.
565       * 
566       * @return the directory name.
567       */
568      public Path getWorkingDirectory() {
569        String name = get(JobContext.WORKING_DIR);
570        if (name != null) {
571          return new Path(name);
572        } else {
573          try {
574            Path dir = FileSystem.get(this).getWorkingDirectory();
575            set(JobContext.WORKING_DIR, dir.toString());
576            return dir;
577          } catch (IOException e) {
578            throw new RuntimeException(e);
579          }
580        }
581      }
582      
583      /**
584       * Sets the number of tasks that a spawned task JVM should run
   * before it exits.
586       * @param numTasks the number of tasks to execute; defaults to 1;
587       * -1 signifies no limit
588       */
589      public void setNumTasksToExecutePerJvm(int numTasks) {
590        setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
591      }
592      
593      /**
   * Get the number of tasks that a spawned JVM should execute.
   *
   * @return the number of tasks to execute per JVM; -1 signifies no limit.
   */
596      public int getNumTasksToExecutePerJvm() {
597        return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
598      }
599      
600      /**
601       * Get the {@link InputFormat} implementation for the map-reduce job,
   * defaults to {@link TextInputFormat} if not specified explicitly.
603       * 
604       * @return the {@link InputFormat} implementation for the map-reduce job.
605       */
606      public InputFormat getInputFormat() {
607        return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
608                                                                 TextInputFormat.class,
609                                                                 InputFormat.class),
610                                                        this);
611      }
612      
613      /**
614       * Set the {@link InputFormat} implementation for the map-reduce job.
615       * 
616       * @param theClass the {@link InputFormat} implementation for the map-reduce 
617       *                 job.
618       */
619      public void setInputFormat(Class<? extends InputFormat> theClass) {
620        setClass("mapred.input.format.class", theClass, InputFormat.class);
621      }
622      
623      /**
624       * Get the {@link OutputFormat} implementation for the map-reduce job,
   * defaults to {@link TextOutputFormat} if not specified explicitly.
626       * 
627       * @return the {@link OutputFormat} implementation for the map-reduce job.
628       */
629      public OutputFormat getOutputFormat() {
630        return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
631                                                                  TextOutputFormat.class,
632                                                                  OutputFormat.class),
633                                                         this);
634      }
635    
636      /**
637       * Get the {@link OutputCommitter} implementation for the map-reduce job,
638       * defaults to {@link FileOutputCommitter} if not specified explicitly.
639       * 
640       * @return the {@link OutputCommitter} implementation for the map-reduce job.
641       */
642      public OutputCommitter getOutputCommitter() {
643        return (OutputCommitter)ReflectionUtils.newInstance(
644          getClass("mapred.output.committer.class", FileOutputCommitter.class,
645                   OutputCommitter.class), this);
646      }
647    
648      /**
649       * Set the {@link OutputCommitter} implementation for the map-reduce job.
650       * 
651       * @param theClass the {@link OutputCommitter} implementation for the map-reduce 
652       *                 job.
653       */
654      public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
655        setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
656      }
657      
658      /**
659       * Set the {@link OutputFormat} implementation for the map-reduce job.
660       * 
661       * @param theClass the {@link OutputFormat} implementation for the map-reduce 
662       *                 job.
663       */
664      public void setOutputFormat(Class<? extends OutputFormat> theClass) {
665        setClass("mapred.output.format.class", theClass, OutputFormat.class);
666      }
667    
668      /**
669       * Should the map outputs be compressed before transfer?
670       * Uses the SequenceFile compression.
671       * 
672       * @param compress should the map outputs be compressed?
673       */
674      public void setCompressMapOutput(boolean compress) {
675        setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
676      }
677      
678      /**
   * Are the outputs of the maps to be compressed?
680       * 
681       * @return <code>true</code> if the outputs of the maps are to be compressed,
682       *         <code>false</code> otherwise.
683       */
684      public boolean getCompressMapOutput() {
685        return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
686      }
687    
688      /**
689       * Set the given class as the  {@link CompressionCodec} for the map outputs.
690       * 
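   * <p>For example, a sketch using the built-in gzip codec:</p>
   * <p><blockquote><pre>
   *     JobConf conf = new JobConf();
   *     conf.setMapOutputCompressorClass(
   *         org.apache.hadoop.io.compress.GzipCodec.class);
   * </pre></blockquote></p>
   * 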
691       * @param codecClass the {@link CompressionCodec} class that will compress  
692       *                   the map outputs.
693       */
  public void
695      setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
696        setCompressMapOutput(true);
697        setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass, 
698                 CompressionCodec.class);
699      }
700      
701      /**
702       * Get the {@link CompressionCodec} for compressing the map outputs.
703       * 
704       * @param defaultValue the {@link CompressionCodec} to return if not set
705       * @return the {@link CompressionCodec} class that should be used to compress the 
706       *         map outputs.
707       * @throws IllegalArgumentException if the class was specified, but not found
708       */
709      public Class<? extends CompressionCodec> 
710      getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
711        Class<? extends CompressionCodec> codecClass = defaultValue;
712        String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
713        if (name != null) {
714          try {
715            codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
716          } catch (ClassNotFoundException e) {
717            throw new IllegalArgumentException("Compression codec " + name + 
718                                               " was not found.", e);
719          }
720        }
721        return codecClass;
722      }
723      
724      /**
725       * Get the key class for the map output data. If it is not set, use the
726       * (final) output key class. This allows the map output key class to be
727       * different than the final output key class.
728       *  
729       * @return the map output key class.
730       */
731      public Class<?> getMapOutputKeyClass() {
732        Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
733        if (retv == null) {
734          retv = getOutputKeyClass();
735        }
736        return retv;
737      }
738      
739      /**
740       * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different from the final output
   * key class.
743       * 
744       * @param theClass the map output key class.
745       */
746      public void setMapOutputKeyClass(Class<?> theClass) {
747        setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
748      }
749      
750      /**
751       * Get the value class for the map output data. If it is not set, use the
   * (final) output value class. This allows the map output value class to be
753       * different than the final output value class.
754       *  
755       * @return the map output value class.
756       */
757      public Class<?> getMapOutputValueClass() {
758        Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
759            Object.class);
760        if (retv == null) {
761          retv = getOutputValueClass();
762        }
763        return retv;
764      }
765      
766      /**
767       * Set the value class for the map output data. This allows the user to
768       * specify the map output value class to be different than the final output
769       * value class.
770       * 
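   * <p>A sketch of intermediate types differing from the final output types
   * (the concrete classes are illustrative):</p>
   * <p><blockquote><pre>
   *     conf.setMapOutputKeyClass(Text.class);
   *     conf.setMapOutputValueClass(LongWritable.class);
   *     conf.setOutputKeyClass(Text.class);
   *     conf.setOutputValueClass(Text.class);
   * </pre></blockquote></p>
   * 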
771       * @param theClass the map output value class.
772       */
773      public void setMapOutputValueClass(Class<?> theClass) {
774        setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
775      }
776      
777      /**
778       * Get the key class for the job output data.
779       * 
780       * @return the key class for the job output data.
781       */
782      public Class<?> getOutputKeyClass() {
783        return getClass(JobContext.OUTPUT_KEY_CLASS,
784                        LongWritable.class, Object.class);
785      }
786      
787      /**
788       * Set the key class for the job output data.
789       * 
790       * @param theClass the key class for the job output data.
791       */
792      public void setOutputKeyClass(Class<?> theClass) {
793        setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
794      }
795    
796      /**
797       * Get the {@link RawComparator} comparator used to compare keys.
798       * 
799       * @return the {@link RawComparator} comparator used to compare keys.
800       */
801      public RawComparator getOutputKeyComparator() {
802        Class<? extends RawComparator> theClass = getClass(
803          JobContext.KEY_COMPARATOR, null, RawComparator.class);
804        if (theClass != null)
805          return ReflectionUtils.newInstance(theClass, this);
806        return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
807      }
808    
809      /**
810       * Set the {@link RawComparator} comparator used to compare keys.
811       * 
812       * @param theClass the {@link RawComparator} comparator used to 
813       *                 compare keys.
814       * @see #setOutputValueGroupingComparator(Class)                 
815       */
816      public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
817        setClass(JobContext.KEY_COMPARATOR,
818                 theClass, RawComparator.class);
819      }
820    
821      /**
822       * Set the {@link KeyFieldBasedComparator} options used to compare keys.
823       * 
824       * @param keySpec the key specification of the form -k pos1[,pos2], where,
825       *  pos is of the form f[.c][opts], where f is the number
826       *  of the key field to use, and c is the number of the first character from
827       *  the beginning of the field. Fields and character posns are numbered 
828       *  starting with 1; a character position of zero in pos2 indicates the
829       *  field's last character. If '.c' is omitted from pos1, it defaults to 1
830       *  (the beginning of the field); if omitted from pos2, it defaults to 0 
831       *  (the end of the field). opts are ordering options. The supported options
832       *  are:
833       *    -n, (Sort numerically)
834       *    -r, (Reverse the result of comparison)                 
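   * 
   *  For example, a sketch that sorts on the second field, numerically and
   *  in reverse order (the spec string is illustrative):
   * <p><blockquote><pre>
   *     conf.setKeyFieldComparatorOptions("-k2,2nr");
   * </pre></blockquote></p>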
835       */
836      public void setKeyFieldComparatorOptions(String keySpec) {
837        setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
838        set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
839      }
840      
841      /**
   * Get the {@link KeyFieldBasedComparator} options.
   *
   * @return the key-field comparator options, or null if not set.
   */
844      public String getKeyFieldComparatorOption() {
845        return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
846      }
847    
848      /**
849       * Set the {@link KeyFieldBasedPartitioner} options used for 
850       * {@link Partitioner}
851       * 
852       * @param keySpec the key specification of the form -k pos1[,pos2], where,
853       *  pos is of the form f[.c][opts], where f is the number
854       *  of the key field to use, and c is the number of the first character from
855       *  the beginning of the field. Fields and character posns are numbered 
856       *  starting with 1; a character position of zero in pos2 indicates the
857       *  field's last character. If '.c' is omitted from pos1, it defaults to 1
858       *  (the beginning of the field); if omitted from pos2, it defaults to 0 
859       *  (the end of the field).
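   * 
   *  For example, a sketch that partitions on the first two key fields (the
   *  spec string is illustrative):
   * <p><blockquote><pre>
   *     conf.setKeyFieldPartitionerOptions("-k1,2");
   * </pre></blockquote></p>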
860       */
861      public void setKeyFieldPartitionerOptions(String keySpec) {
862        setPartitionerClass(KeyFieldBasedPartitioner.class);
863        set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
864      }
865      
866      /**
   * Get the {@link KeyFieldBasedPartitioner} options.
   *
   * @return the key-field partitioner options, or null if not set.
   */
869      public String getKeyFieldPartitionerOption() {
870        return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
871      }
872    
873      /** 
   * Get the user defined {@link RawComparator} comparator for 
875       * grouping keys of inputs to the reduce.
876       * 
877       * @return comparator set by the user for grouping values.
878       * @see #setOutputValueGroupingComparator(Class) for details.  
879       */
880      public RawComparator getOutputValueGroupingComparator() {
881        Class<? extends RawComparator> theClass = getClass(
882          JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
883        if (theClass == null) {
884          return getOutputKeyComparator();
885        }
886        
887        return ReflectionUtils.newInstance(theClass, this);
888      }
889    
890      /** 
891       * Set the user defined {@link RawComparator} comparator for 
892       * grouping keys in the input to the reduce.
893       * 
894       * <p>This comparator should be provided if the equivalence rules for keys
895       * for sorting the intermediates are different from those for grouping keys
896       * before each call to 
897       * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
898       *  
899       * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
900       * in a single call to the reduce function if K1 and K2 compare as equal.</p>
901       * 
902       * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 
903       * how keys are sorted, this can be used in conjunction to simulate 
904       * <i>secondary sort on values</i>.</p>
905       *  
906       * <p><i>Note</i>: This is not a guarantee of the reduce sort being 
907       * <i>stable</i> in any sense. (In any case, with the order of available 
908       * map-outputs to the reduce being non-deterministic, it wouldn't make 
909       * that much sense.)</p>
910       * 
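   * <p>A sketch of that pattern, where <code>CompositeKeyComparator</code>
   * and <code>NaturalKeyGroupingComparator</code> are hypothetical user
   * classes implementing <code>RawComparator</code>:</p>
   * <p><blockquote><pre>
   *     conf.setOutputKeyComparatorClass(CompositeKeyComparator.class);
   *     conf.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class);
   * </pre></blockquote></p>
   * 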
911       * @param theClass the comparator class to be used for grouping keys. 
912       *                 It should implement <code>RawComparator</code>.
913       * @see #setOutputKeyComparatorClass(Class)                 
914       */
915      public void setOutputValueGroupingComparator(
916          Class<? extends RawComparator> theClass) {
917        setClass(JobContext.GROUP_COMPARATOR_CLASS,
918                 theClass, RawComparator.class);
919      }
920    
921      /**
922       * Should the framework use the new context-object code for running
923       * the mapper?
924       * @return true, if the new api should be used
925       */
926      public boolean getUseNewMapper() {
927        return getBoolean("mapred.mapper.new-api", false);
928      }
929      /**
930       * Set whether the framework should use the new api for the mapper.
931       * This is the default for jobs submitted with the new Job api.
932       * @param flag true, if the new api should be used
933       */
934      public void setUseNewMapper(boolean flag) {
935        setBoolean("mapred.mapper.new-api", flag);
936      }
937    
938      /**
939       * Should the framework use the new context-object code for running
940       * the reducer?
941       * @return true, if the new api should be used
942       */
943      public boolean getUseNewReducer() {
944        return getBoolean("mapred.reducer.new-api", false);
945      }
946      /**
947       * Set whether the framework should use the new api for the reducer. 
948       * This is the default for jobs submitted with the new Job api.
949       * @param flag true, if the new api should be used
950       */
951      public void setUseNewReducer(boolean flag) {
952        setBoolean("mapred.reducer.new-api", flag);
953      }
954    
955      /**
956       * Get the value class for job outputs.
957       * 
958       * @return the value class for job outputs.
959       */
960      public Class<?> getOutputValueClass() {
961        return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
962      }
963      
964      /**
965       * Set the value class for job outputs.
966       * 
967       * @param theClass the value class for job outputs.
968       */
969      public void setOutputValueClass(Class<?> theClass) {
970        setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
971      }
972    
973      /**
974       * Get the {@link Mapper} class for the job.
975       * 
976       * @return the {@link Mapper} class for the job.
977       */
978      public Class<? extends Mapper> getMapperClass() {
979        return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
980      }
981      
982      /**
983       * Set the {@link Mapper} class for the job.
984       * 
985       * @param theClass the {@link Mapper} class for the job.
986       */
987      public void setMapperClass(Class<? extends Mapper> theClass) {
988        setClass("mapred.mapper.class", theClass, Mapper.class);
989      }
990    
991      /**
992       * Get the {@link MapRunnable} class for the job.
993       * 
994       * @return the {@link MapRunnable} class for the job.
995       */
996      public Class<? extends MapRunnable> getMapRunnerClass() {
997        return getClass("mapred.map.runner.class",
998                        MapRunner.class, MapRunnable.class);
999      }
1000      
1001      /**
1002       * Expert: Set the {@link MapRunnable} class for the job.
1003       * 
1004       * Typically used to exert greater control on {@link Mapper}s.
1005       * 
1006       * @param theClass the {@link MapRunnable} class for the job.
1007       */
1008      public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
1009        setClass("mapred.map.runner.class", theClass, MapRunnable.class);
1010      }
1011    
1012      /**
1013       * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 
1014       * to be sent to the {@link Reducer}s.
1015       * 
1016       * @return the {@link Partitioner} used to partition map-outputs.
1017       */
1018      public Class<? extends Partitioner> getPartitionerClass() {
1019        return getClass("mapred.partitioner.class",
1020                        HashPartitioner.class, Partitioner.class);
1021      }
1022      
1023      /**
1024       * Set the {@link Partitioner} class used to partition 
1025       * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
1026       * 
1027       * @param theClass the {@link Partitioner} used to partition map-outputs.
1028       */
1029      public void setPartitionerClass(Class<? extends Partitioner> theClass) {
1030        setClass("mapred.partitioner.class", theClass, Partitioner.class);
1031      }
1032    
1033      /**
1034       * Get the {@link Reducer} class for the job.
1035       * 
1036       * @return the {@link Reducer} class for the job.
1037       */
1038      public Class<? extends Reducer> getReducerClass() {
1039        return getClass("mapred.reducer.class",
1040                        IdentityReducer.class, Reducer.class);
1041      }
1042      
1043      /**
1044       * Set the {@link Reducer} class for the job.
1045       * 
1046       * @param theClass the {@link Reducer} class for the job.
1047       */
1048      public void setReducerClass(Class<? extends Reducer> theClass) {
1049        setClass("mapred.reducer.class", theClass, Reducer.class);
1050      }
1051    
1052      /**
1053       * Get the user-defined <i>combiner</i> class used to combine map-outputs 
   * before being sent to the reducers. Typically the combiner is the same as
   * the {@link Reducer} for the job, i.e. {@link #getReducerClass()}.
1056       * 
1057       * @return the user-defined combiner class used to combine map-outputs.
1058       */
1059      public Class<? extends Reducer> getCombinerClass() {
1060        return getClass("mapred.combiner.class", null, Reducer.class);
1061      }
1062    
1063      /**
1064       * Set the user-defined <i>combiner</i> class used to combine map-outputs 
1065       * before being sent to the reducers. 
1066       * 
1067       * <p>The combiner is an application-specified aggregation operation, which
1068       * can help cut down the amount of data transferred between the 
1069       * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
1070       * 
1071       * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
1072       * the mapper and reducer tasks. In general, the combiner is called as the
1073       * sort/merge result is written to disk. The combiner must:
1074       * <ul>
1075       *   <li> be side-effect free</li>
1076       *   <li> have the same input and output key types and the same input and 
1077       *        output value types</li>
1078       * </ul></p>
1079       * 
   * <p>Typically the combiner is the same as the <code>Reducer</code> for the 
   * job, i.e. {@link #setReducerClass(Class)}.</p>
1082       * 
1083       * @param theClass the user-defined combiner class used to combine 
1084       *                 map-outputs.
1085       */
1086      public void setCombinerClass(Class<? extends Reducer> theClass) {
1087        setClass("mapred.combiner.class", theClass, Reducer.class);
1088      }
1089      
1090      /**
1091       * Should speculative execution be used for this job? 
1092       * Defaults to <code>true</code>.
1093       * 
1094       * @return <code>true</code> if speculative execution be used for this job,
1095       *         <code>false</code> otherwise.
1096       */
1097      public boolean getSpeculativeExecution() { 
1098        return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
1099      }
1100      
1101      /**
1102       * Turn speculative execution on or off for this job. 
1103       * 
1104       * @param speculativeExecution <code>true</code> if speculative execution 
1105       *                             should be turned on, else <code>false</code>.
1106       */
1107      public void setSpeculativeExecution(boolean speculativeExecution) {
1108        setMapSpeculativeExecution(speculativeExecution);
1109        setReduceSpeculativeExecution(speculativeExecution);
1110      }
1111    
1112      /**
1113       * Should speculative execution be used for this job for map tasks? 
1114       * Defaults to <code>true</code>.
1115       * 
1116       * @return <code>true</code> if speculative execution be 
1117       *                           used for this job for map tasks,
1118       *         <code>false</code> otherwise.
1119       */
1120      public boolean getMapSpeculativeExecution() { 
1121        return getBoolean(JobContext.MAP_SPECULATIVE, true);
1122      }
1123      
1124      /**
1125       * Turn speculative execution on or off for this job for map tasks. 
1126       * 
1127       * @param speculativeExecution <code>true</code> if speculative execution 
1128       *                             should be turned on for map tasks,
1129       *                             else <code>false</code>.
1130       */
1131      public void setMapSpeculativeExecution(boolean speculativeExecution) {
1132        setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
1133      }
1134    
1135      /**
1136       * Should speculative execution be used for this job for reduce tasks? 
1137       * Defaults to <code>true</code>.
1138       * 
1139       * @return <code>true</code> if speculative execution be used 
1140       *                           for reduce tasks for this job,
1141       *         <code>false</code> otherwise.
1142       */
1143      public boolean getReduceSpeculativeExecution() { 
1144        return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
1145      }
1146      
1147      /**
1148       * Turn speculative execution on or off for this job for reduce tasks. 
1149       * 
1150       * @param speculativeExecution <code>true</code> if speculative execution 
1151       *                             should be turned on for reduce tasks,
1152       *                             else <code>false</code>.
1153       */
1154      public void setReduceSpeculativeExecution(boolean speculativeExecution) {
1155        setBoolean(JobContext.REDUCE_SPECULATIVE, 
1156                   speculativeExecution);
1157      }
1158    
1159      /**
   * Get the configured number of map tasks for this job.
   * Defaults to <code>1</code>.
   * 
   * @return the number of map tasks for this job.
1164       */
1165      public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }
1166      
1167      /**
1168       * Set the number of map tasks for this job.
1169       * 
1170       * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 
1171       * number of spawned map tasks depends on the number of {@link InputSplit}s 
1172       * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
1173       *  
1174       * A custom {@link InputFormat} is typically used to accurately control 
1175       * the number of map tasks for the job.</p>
1176       * 
1177       * <h4 id="NoOfMaps">How many maps?</h4>
1178       * 
1179       * <p>The number of maps is usually driven by the total size of the inputs 
1180       * i.e. total number of blocks of the input files.</p>
1181       *  
1182       * <p>The right level of parallelism for maps seems to be around 10-100 maps 
1183       * per-node, although it has been set up to 300 or so for very cpu-light map 
   * tasks. Task setup takes a while, so it is best if the maps take at least a 
1185       * minute to execute.</p>
1186       * 
1187       * <p>The default behavior of file-based {@link InputFormat}s is to split the 
1188       * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
1189       * bytes, of input files. However, the {@link FileSystem} blocksize of the 
1190       * input files is treated as an upper bound for input splits. A lower bound 
1191       * on the split size can be set via 
1192       * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
1193       * mapreduce.input.fileinputformat.split.minsize</a>.</p>
1194       *  
1195       * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 
1196       * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 
1197       * used to set it even higher.</p>
1198       * 
1199       * @param n the number of map tasks for this job.
1200       * @see InputFormat#getSplits(JobConf, int)
1201       * @see FileInputFormat
1202       * @see FileSystem#getDefaultBlockSize()
1203       * @see FileStatus#getBlockSize()
1204       */
1205      public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }
1206    
1207      /**
   * Get the configured number of reduce tasks for this job. Defaults to 
1209       * <code>1</code>.
1210       * 
1211       * @return the number of reduce tasks for this job.
1212       */
1213      public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
1214      
1215      /**
1216       * Set the requisite number of reduce tasks for this job.
1217       * 
1218       * <h4 id="NoOfReduces">How many reduces?</h4>
1219       * 
1220       * <p>The right number of reduces seems to be <code>0.95</code> or 
1221       * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; * 
1222       * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
1223       * mapreduce.tasktracker.reduce.tasks.maximum</a>).
1224       * </p>
1225       * 
1226       * <p>With <code>0.95</code> all of the reduces can launch immediately and 
   * start transferring map outputs as the maps finish. With <code>1.75</code> 
1228       * the faster nodes will finish their first round of reduces and launch a 
1229       * second wave of reduces doing a much better job of load balancing.</p>
1230       * 
   * <p>Increasing the number of reduces increases the framework overhead, but 
   * improves load balancing and lowers the cost of failures.</p>
1233       * 
1234       * <p>The scaling factors above are slightly less than whole numbers to 
1235       * reserve a few reduce slots in the framework for speculative-tasks, failures
1236       * etc.</p> 
1237       *
1238       * <h4 id="ReducerNone">Reducer NONE</h4>
1239       * 
1240       * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
1241       * 
   * <p>In this case the outputs of the map-tasks go directly to the distributed 
   * file-system, to the path set by 
   * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the 
   * framework doesn't sort the map-outputs before writing them out to HDFS.</p>
1246       * 
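   * <p>For example, with the <code>0.95</code> factor on a hypothetical
   * 10-node cluster offering 2 reduce slots per node:</p>
   * <p><blockquote><pre>
   *     conf.setNumReduceTasks((int) (0.95 * 10 * 2));   // 19 reduces
   * </pre></blockquote></p>
   * 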
1247       * @param n the number of reduce tasks for this job.
1248       */
1249      public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
1250      
1251      /** 
1252       * Get the configured number of maximum attempts that will be made to run a
1253       * map task, as specified by the <code>mapreduce.map.maxattempts</code>
1254       * property. If this property is not already set, the default is 4 attempts.
1255       *  
1256       * @return the max number of attempts per map task.
1257       */
1258      public int getMaxMapAttempts() {
1259        return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
1260      }
1261      
1262      /** 
1263       * Expert: Set the number of maximum attempts that will be made to run a
1264       * map task.
1265       * 
1266       * @param n the number of attempts per map task.
1267       */
1268      public void setMaxMapAttempts(int n) {
1269        setInt(JobContext.MAP_MAX_ATTEMPTS, n);
1270      }
1271    
1272      /** 
1273       * Get the configured number of maximum attempts  that will be made to run a
1274       * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
1275       * property. If this property is not already set, the default is 4 attempts.
1276       * 
1277       * @return the max number of attempts per reduce task.
1278       */
1279      public int getMaxReduceAttempts() {
1280        return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
1281      }
1282      /** 
1283       * Expert: Set the number of maximum attempts that will be made to run a
1284       * reduce task.
1285       * 
1286       * @param n the number of attempts per reduce task.
1287       */
1288      public void setMaxReduceAttempts(int n) {
1289        setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
1290      }
1291      
1292      /**
1293       * Get the user-specified job name. This is only used to identify the 
1294       * job to the user.
1295       * 
1296       * @return the job's name, defaulting to "".
1297       */
1298      public String getJobName() {
1299        return get(JobContext.JOB_NAME, "");
1300      }
1301      
1302      /**
1303       * Set the user-specified job name.
1304       * 
1305       * @param name the job's new name.
1306       */
1307      public void setJobName(String name) {
1308        set(JobContext.JOB_NAME, name);
1309      }
1310      
1311      /**
1312       * Get the user-specified session identifier. The default is the empty string.
1313       *
1314       * The session identifier is used to tag metric data that is reported to some
1315       * performance metrics system via the org.apache.hadoop.metrics API.  The 
1316       * session identifier is intended, in particular, for use by Hadoop-On-Demand 
1317       * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 
1318       * HOD will set the session identifier by modifying the mapred-site.xml file 
1319       * before starting the cluster.
1320       *
   * When not running under HOD, this identifier is expected to remain set to 
1322       * the empty string.
1323       *
1324       * @return the session identifier, defaulting to "".
1325       */
1326      @Deprecated
1327      public String getSessionId() {
1328          return get("session.id", "");
1329      }
1330      
1331      /**
1332       * Set the user-specified session identifier.  
1333       *
1334       * @param sessionId the new session id.
1335       */
1336      @Deprecated
1337      public void setSessionId(String sessionId) {
1338          set("session.id", sessionId);
1339      }
1340        
1341      /**
1342       * Set the maximum no. of failures of a given job per tasktracker.
1343       * If the no. of task failures exceeds <code>noFailures</code>, the 
1344       * tasktracker is <i>blacklisted</i> for this job. 
1345       * 
1346       * @param noFailures maximum no. of failures of a given job per tasktracker.
1347       */
1348      public void setMaxTaskFailuresPerTracker(int noFailures) {
1349        setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
1350      }
1351      
1352      /**
1353       * Expert: Get the maximum no. of failures of a given job per tasktracker.
1354       * If the no. of task failures exceeds this, the tasktracker is
1355       * <i>blacklisted</i> for this job. 
1356       * 
1357       * @return the maximum no. of failures of a given job per tasktracker.
1358       */
1359      public int getMaxTaskFailuresPerTracker() {
1360        return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 4); 
1361      }
1362    
1363      /**
1364       * Get the maximum percentage of map tasks that can fail without 
1365       * the job being aborted. 
1366       * 
1367       * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 
1368       * attempts before being declared as <i>failed</i>.
1369       *  
1370       * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
1371       * the job being declared as {@link JobStatus#FAILED}.
1372       * 
1373       * @return the maximum percentage of map tasks that can fail without
1374       *         the job being aborted.
1375       */
1376      public int getMaxMapTaskFailuresPercent() {
1377        return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
1378      }
1379    
1380      /**
1381       * Expert: Set the maximum percentage of map tasks that can fail without the
1382       * job being aborted. 
1383       * 
1384       * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 
1385       * before being declared as <i>failed</i>.
1386       * 
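   * <p>For example, to tolerate up to 5% of map tasks failing without
   * aborting the job (the percentage is illustrative):</p>
   * <p><blockquote><pre>
   * job.setMaxMapTaskFailuresPercent(5);
   * </pre></blockquote></p>
   * 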
1387       * @param percent the maximum percentage of map tasks that can fail without 
1388       *                the job being aborted.
1389       */
1390      public void setMaxMapTaskFailuresPercent(int percent) {
1391        setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
1392      }
1393      
1394      /**
1395       * Get the maximum percentage of reduce tasks that can fail without 
1396       * the job being aborted. 
1397       * 
1398       * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
1399       * attempts before being declared as <i>failed</i>.
1400       * 
1401       * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 
1402       * in the job being declared as {@link JobStatus#FAILED}.
1403       * 
1404       * @return the maximum percentage of reduce tasks that can fail without
1405       *         the job being aborted.
1406       */
1407      public int getMaxReduceTaskFailuresPercent() {
1408        return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
1409      }
1410      
1411      /**
1412       * Set the maximum percentage of reduce tasks that can fail without the job
1413       * being aborted.
1414       * 
1415       * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
1416       * attempts before being declared as <i>failed</i>.
1417       * 
1418       * @param percent the maximum percentage of reduce tasks that can fail without 
1419       *                the job being aborted.
1420       */
1421      public void setMaxReduceTaskFailuresPercent(int percent) {
1422        setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
1423      }
1424      
1425      /**
1426       * Set {@link JobPriority} for this job.
1427       * 
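   * <p>For example, to raise this job's priority:</p>
   * <p><blockquote><pre>
   * job.setJobPriority(JobPriority.HIGH);
   * </pre></blockquote></p>
   * 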
1428       * @param prio the {@link JobPriority} for this job.
1429       */
1430      public void setJobPriority(JobPriority prio) {
1431        set(JobContext.PRIORITY, prio.toString());
1432      }
1433      
1434      /**
1435       * Get the {@link JobPriority} for this job.
1436       * 
1437       * @return the {@link JobPriority} for this job.
1438       */
1439      public JobPriority getJobPriority() {
1440        String prio = get(JobContext.PRIORITY);
1441        if(prio == null) {
1442          return JobPriority.NORMAL;
1443        }
1444        
1445        return JobPriority.valueOf(prio);
1446      }
1447    
1448      /**
1449       * Set JobSubmitHostName for this job.
1450       * 
1451       * @param hostname the JobSubmitHostName for this job.
1452       */
1453      void setJobSubmitHostName(String hostname) {
1454        set(MRJobConfig.JOB_SUBMITHOST, hostname);
1455      }
1456      
1457      /**
   * Get the JobSubmitHostName for this job.
1459       * 
1460       * @return the JobSubmitHostName for this job.
1461       */
  String getJobSubmitHostName() {
    return get(MRJobConfig.JOB_SUBMITHOST);
  }
1467    
1468      /**
1469       * Set JobSubmitHostAddress for this job.
1470       * 
1471       * @param hostadd the JobSubmitHostAddress for this job.
1472       */
1473      void setJobSubmitHostAddress(String hostadd) {
1474        set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
1475      }
1476      
1477      /**
1478       * Get JobSubmitHostAddress for this job.
1479       * 
1480       * @return  JobSubmitHostAddress for this job.
1481       */
  String getJobSubmitHostAddress() {
    return get(MRJobConfig.JOB_SUBMITHOSTADDR);
  }
1487    
1488      /**
1489       * Get whether the task profiling is enabled.
1490       * @return true if some tasks will be profiled
1491       */
1492      public boolean getProfileEnabled() {
1493        return getBoolean(JobContext.TASK_PROFILE, false);
1494      }
1495    
1496      /**
   * Set whether the system should collect profiler information for some of 
   * the tasks in this job. The information is stored in the user log 
1499       * directory.
1500       * @param newValue true means it should be gathered
1501       */
1502      public void setProfileEnabled(boolean newValue) {
1503        setBoolean(JobContext.TASK_PROFILE, newValue);
1504      }
1505    
1506      /**
1507       * Get the profiler configuration arguments.
1508       *
1509       * The default value for this property is
1510       * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
1511       * 
1512       * @return the parameters to pass to the task child to configure profiling
1513       */
1514      public String getProfileParams() {
1515        return get(JobContext.TASK_PROFILE_PARAMS,
1516                   "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
1517                     "verbose=n,file=%s");
1518      }
1519    
1520      /**
1521       * Set the profiler configuration arguments. If the string contains a '%s' it
1522       * will be replaced with the name of the profiling output file when the task
1523       * runs.
1524       *
1525       * This value is passed to the task child JVM on the command line.
1526       *
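   * <p>For example (the hprof options shown are illustrative; any valid
   * agent arguments may be used):</p>
   * <p><blockquote><pre>
   * job.setProfileParams("-agentlib:hprof=heap=sites,force=n,file=%s");
   * </pre></blockquote></p>
   *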
1527       * @param value the configuration string
1528       */
1529      public void setProfileParams(String value) {
1530        set(JobContext.TASK_PROFILE_PARAMS, value);
1531      }
1532    
1533      /**
1534       * Get the range of maps or reduces to profile.
1535       * @param isMap is the task a map?
1536       * @return the task ranges
1537       */
1538      public IntegerRanges getProfileTaskRange(boolean isMap) {
1539        return getRange((isMap ? JobContext.NUM_MAP_PROFILES : 
1540                           JobContext.NUM_REDUCE_PROFILES), "0-2");
1541      }
1542    
1543      /**
1544       * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
1545       * must also be called.
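   * 
   * <p>For example, to profile the first three map tasks (the range shown is
   * illustrative):</p>
   * <p><blockquote><pre>
   * job.setProfileEnabled(true);
   * job.setProfileTaskRange(true, "0-2");
   * </pre></blockquote></p>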
   * @param isMap true to set the range for map tasks, false for reduce tasks
   * @param newValue a set of integer ranges of the task ids
1547       */
1548      public void setProfileTaskRange(boolean isMap, String newValue) {
    // parse the value to make sure it is legal
    new Configuration.IntegerRanges(newValue);
1551        set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES), 
1552              newValue);
1553      }
1554    
1555      /**
1556       * Set the debug script to run when the map tasks fail.
1557       * 
   * <p>The debug script can aid debugging of failed map tasks. The script is 
   * given the task's stdout, stderr, syslog and jobconf files as arguments.</p>
   * 
   * <p>The debug command, run on the node where the map failed, is:</p>
   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
   * </pre></blockquote></p>
1565       * 
1566       * <p> The script file is distributed through {@link DistributedCache} 
1567       * APIs. The script needs to be symlinked. </p>
1568       * 
   * <p>Here is an example of how to submit a script: 
1570       * <p><blockquote><pre>
1571       * job.setMapDebugScript("./myscript");
1572       * DistributedCache.createSymlink(job);
   * DistributedCache.addCacheFile(new URI("/debug/scripts/myscript#myscript"), job);
1574       * </pre></blockquote></p>
1575       * 
1576       * @param mDbgScript the script name
1577       */
1578      public void  setMapDebugScript(String mDbgScript) {
1579        set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
1580      }
1581      
1582      /**
1583       * Get the map task's debug script.
1584       * 
   * @return the debug script for the mapred job for failed map tasks.
1586       * @see #setMapDebugScript(String)
1587       */
1588      public String getMapDebugScript() {
1589        return get(JobContext.MAP_DEBUG_SCRIPT);
1590      }
1591      
1592      /**
1593       * Set the debug script to run when the reduce tasks fail.
1594       * 
   * <p>The debug script can aid debugging of failed reduce tasks. The script
   * is given the task's stdout, stderr, syslog and jobconf files as
   * arguments.</p>
   * 
   * <p>The debug command, run on the node where the reduce failed, is:</p>
   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
   * </pre></blockquote></p>
1602       * 
1603       * <p> The script file is distributed through {@link DistributedCache} 
   * APIs. The script file needs to be symlinked.</p>
1605       * 
   * <p>Here is an example of how to submit a script: 
1607       * <p><blockquote><pre>
1608       * job.setReduceDebugScript("./myscript");
1609       * DistributedCache.createSymlink(job);
   * DistributedCache.addCacheFile(new URI("/debug/scripts/myscript#myscript"), job);
1611       * </pre></blockquote></p>
1612       * 
1613       * @param rDbgScript the script name
1614       */
1615      public void  setReduceDebugScript(String rDbgScript) {
1616        set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
1617      }
1618      
1619      /**
   * Get the reduce task's debug script.
1621       * 
1622       * @return the debug script for the mapred job for failed reduce tasks.
1623       * @see #setReduceDebugScript(String)
1624       */
1625      public String getReduceDebugScript() {
1626        return get(JobContext.REDUCE_DEBUG_SCRIPT);
1627      }
1628    
1629      /**
   * Get the URI to be invoked in order to send a notification after the job 
1631       * has completed (success/failure). 
1632       * 
1633       * @return the job end notification uri, <code>null</code> if it hasn't
1634       *         been set.
1635       * @see #setJobEndNotificationURI(String)
1636       */
1637      public String getJobEndNotificationURI() {
1638        return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
1639      }
1640    
1641      /**
   * Set the URI to be invoked in order to send a notification after the job
1643       * has completed (success/failure).
1644       * 
   * <p>The URI may contain two special parameters: <tt>$jobId</tt> and 
1646       * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's 
1647       * identifier and completion-status respectively.</p>
1648       * 
1649       * <p>This is typically used by application-writers to implement chaining of 
1650       * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
1651       * 
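   * <p>For example (the host, port and path are illustrative):</p>
   * <p><blockquote><pre>
   * job.setJobEndNotificationURI(
   *   "http://myserver.example.com:8080/notify?jobId=$jobId&amp;status=$jobStatus");
   * </pre></blockquote></p>
   * 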
1652       * @param uri the job end notification uri
1653       * @see JobStatus
1654       * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#
1655       *       JobCompletionAndChaining">Job Completion and Chaining</a>
1656       */
1657      public void setJobEndNotificationURI(String uri) {
1658        set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
1659      }
1660    
1661      /**
   * Get the job-specific shared directory for use as scratch space.
   * 
   * <p>
   * When a job starts, a shared directory is created at
   * <code>${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/</code>.
   * This directory is exposed to users through
   * <code>mapreduce.job.local.dir</code>, so tasks can use it as scratch
   * space and share files among themselves.</p>
   * This value is also available as a system property.
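   * 
   * <p>For example, a task might use the directory to share a file with the
   * job's other tasks (the file name is illustrative):</p>
   * <p><blockquote><pre>
   * File shared = new File(job.getJobLocalDir(), "lookup.dat");
   * </pre></blockquote></p>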
1673       * 
1674       * @return The localized job specific shared directory
1675       */
1676      public String getJobLocalDir() {
1677        return get(JobContext.JOB_LOCAL_DIR);
1678      }
1679    
1680      /**
1681       * Get memory required to run a map task of the job, in MB.
1682       * 
1683       * If a value is specified in the configuration, it is returned.
1684       * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
   * <p>
1686       * For backward compatibility, if the job configuration sets the
1687       * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1688       * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1689       * after converting it from bytes to MB.
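   * For example, <code>mapred.task.maxvmem</code> set to
   * <code>1073741824</code> bytes is returned here as <code>1024</code> MB.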
1690       * @return memory required to run a map task of the job, in MB,
1691       *          or {@link #DISABLED_MEMORY_LIMIT} if unset.
1692       */
1693      public long getMemoryForMapTask() {
1694        long value = getDeprecatedMemoryValue();
1695        if (value == DISABLED_MEMORY_LIMIT) {
1696          value = normalizeMemoryConfigValue(
1697                    getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
1698                              DISABLED_MEMORY_LIMIT));
1699        }
1700        return value;
1701      }
1702    
  /**
   * Set the memory required to run a map task of the job, in MB.
   * 
   * @param mem the memory required for a map task of the job, in MB
   */
  public void setMemoryForMapTask(long mem) {
    setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
  }
1706    
1707      /**
1708       * Get memory required to run a reduce task of the job, in MB.
1709       * 
1710       * If a value is specified in the configuration, it is returned.
1711       * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
   * <p>
1713       * For backward compatibility, if the job configuration sets the
1714       * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1715       * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
1716       * after converting it from bytes to MB.
1717       * @return memory required to run a reduce task of the job, in MB,
1718       *          or {@link #DISABLED_MEMORY_LIMIT} if unset.
1719       */
1720      public long getMemoryForReduceTask() {
1721        long value = getDeprecatedMemoryValue();
1722        if (value == DISABLED_MEMORY_LIMIT) {
1723          value = normalizeMemoryConfigValue(
1724                    getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
1725                            DISABLED_MEMORY_LIMIT));
1726        }
1727        return value;
1728      }
1729      
1730      // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
1731      // converted into MBs.
1732      // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative
1733      // value.
1734      private long getDeprecatedMemoryValue() {
1735        long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 
1736            DISABLED_MEMORY_LIMIT);
1737        oldValue = normalizeMemoryConfigValue(oldValue);
1738        if (oldValue != DISABLED_MEMORY_LIMIT) {
1739          oldValue /= (1024*1024);
1740        }
1741        return oldValue;
1742      }
1743    
  /**
   * Set the memory required to run a reduce task of the job, in MB.
   * 
   * @param mem the memory required for a reduce task of the job, in MB
   */
  public void setMemoryForReduceTask(long mem) {
    setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
  }
1747    
1748      /**
1749       * Return the name of the queue to which this job is submitted.
1750       * Defaults to 'default'.
1751       * 
1752       * @return name of the queue
1753       */
1754      public String getQueueName() {
1755        return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
1756      }
1757      
1758      /**
1759       * Set the name of the queue to which this job should be submitted.
1760       * 
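   * <p>For example (the queue name is illustrative; the queue must exist on
   * the cluster):</p>
   * <p><blockquote><pre>
   * job.setQueueName("research");
   * </pre></blockquote></p>
   * 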
1761       * @param queueName Name of the queue
1762       */
1763      public void setQueueName(String queueName) {
1764        set(JobContext.QUEUE_NAME, queueName);
1765      }
1766      
  /**
   * Normalize a memory configuration value: any negative value is mapped to
   * {@link #DISABLED_MEMORY_LIMIT}.
   * 
   * @param val the raw configured memory value
   * @return the normalized value
   */
1773      public static long normalizeMemoryConfigValue(long val) {
1774        if (val < 0) {
1775          val = DISABLED_MEMORY_LIMIT;
1776        }
1777        return val;
1778      }
1779    
1780      /**
1781       * Compute the number of slots required to run a single map task-attempt
1782       * of this job.
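   * For example, a map task needing 2048 MB on a cluster whose map slots are
   * 512 MB each requires ceil(2048 / 512) = 4 slots.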
1783       * @param slotSizePerMap cluster-wide value of the amount of memory required
1784       *                       to run a map-task
   * @return the number of slots required to run a single map task-attempt,
   *          or 1 if memory parameters are disabled.
1787       */
1788      int computeNumSlotsPerMap(long slotSizePerMap) {
    if ((slotSizePerMap == DISABLED_MEMORY_LIMIT) ||
        (getMemoryForMapTask() == DISABLED_MEMORY_LIMIT)) {
      return 1;
    }
    return (int) Math.ceil((float) getMemoryForMapTask() / (float) slotSizePerMap);
1794      }
1795      
1796      /**
1797       * Compute the number of slots required to run a single reduce task-attempt
1798       * of this job.
1799       * @param slotSizePerReduce cluster-wide value of the amount of memory 
1800       *                          required to run a reduce-task
   * @return the number of slots required to run a single reduce task-attempt,
   *          or 1 if memory parameters are disabled.
1803       */
1804      int computeNumSlotsPerReduce(long slotSizePerReduce) {
    if ((slotSizePerReduce == DISABLED_MEMORY_LIMIT) ||
        (getMemoryForReduceTask() == DISABLED_MEMORY_LIMIT)) {
      return 1;
    }
    return (int) Math.ceil(
        (float) getMemoryForReduceTask() / (float) slotSizePerReduce);
1811      }
1812      
1813      /** 
1814       * Find a jar that contains a class of the same name, if any.
1815       * It will return a jar file, even if that is not the first thing
1816       * on the class path that has a class with the same name.
1817       * 
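   * <p>A typical use is to point a job at the jar containing its own classes
   * (<code>MyMapper</code> is a hypothetical user class):</p>
   * <p><blockquote><pre>
   * job.setJar(JobConf.findContainingJar(MyMapper.class));
   * </pre></blockquote></p>
   * 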
1818       * @param my_class the class to find.
   * @return a jar file that contains the class, or null if none is found.
   */
  public static String findContainingJar(Class<?> my_class) {
1823        ClassLoader loader = my_class.getClassLoader();
1824        String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
1825        try {
      for (Enumeration<URL> itr = loader.getResources(class_file);
          itr.hasMoreElements();) {
        URL url = itr.nextElement();
1829            if ("jar".equals(url.getProtocol())) {
1830              String toReturn = url.getPath();
1831              if (toReturn.startsWith("file:")) {
1832                toReturn = toReturn.substring("file:".length());
1833              }
1834              // URLDecoder is a misnamed class, since it actually decodes
1835              // x-www-form-urlencoded MIME type rather than actual
1836              // URL encoding (which the file path has). Therefore it would
1837              // decode +s to ' 's which is incorrect (spaces are actually
1838              // either unencoded or encoded as "%20"). Replace +s first, so
1839              // that they are kept sacred during the decoding process.
1840              toReturn = toReturn.replaceAll("\\+", "%2B");
1841              toReturn = URLDecoder.decode(toReturn, "UTF-8");
1842              return toReturn.replaceAll("!.*$", "");
1843            }
1844          }
1845        } catch (IOException e) {
1846          throw new RuntimeException(e);
1847        }
1848        return null;
1849      }
1850    
1851    
1852      /**
1853       * Get the memory required to run a task of this job, in bytes. See
1854       * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
   * <p>
1856       * This method is deprecated. Now, different memory limits can be
1857       * set for map and reduce tasks of a job, in MB. 
   * <p>
1859       * For backward compatibility, if the job configuration sets the
1860       * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
1861       * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned. 
1862       * Otherwise, this method will return the larger of the values returned by 
1863       * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
1864       * after converting them into bytes.
1865       *
1866       * @return Memory required to run a task of this job, in bytes,
1867       *          or {@link #DISABLED_MEMORY_LIMIT}, if unset.
1868       * @see #setMaxVirtualMemoryForTask(long)
1869       * @deprecated Use {@link #getMemoryForMapTask()} and
1870       *             {@link #getMemoryForReduceTask()}
1871       */
1872      @Deprecated
1873      public long getMaxVirtualMemoryForTask() {
1874        LOG.warn(
1875          "getMaxVirtualMemoryForTask() is deprecated. " +
1876          "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
1877    
1878        long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
1879        value = normalizeMemoryConfigValue(value);
1880        if (value == DISABLED_MEMORY_LIMIT) {
1881          value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
1882          value = normalizeMemoryConfigValue(value);
1883          if (value != DISABLED_MEMORY_LIMIT) {
1884            value *= 1024*1024;
1885          }
1886        }
1887        return value;
1888      }
1889    
1890      /**
1891       * Set the maximum amount of memory any task of this job can use. See
1892       * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
   * <p>
   * <code>mapred.task.maxvmem</code> is split into
   * <code>mapreduce.map.memory.mb</code> and
   * <code>mapreduce.reduce.memory.mb</code>; each of the new keys is set to
   * <code>mapred.task.maxvmem / (1024 * 1024)</code>, since the new values
   * are in MB.
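   * <p>For example, instead of calling
   * <code>setMaxVirtualMemoryForTask(1073741824L)</code> (bytes), set the
   * per-task limits directly in MB:</p>
   * <p><blockquote><pre>
   * job.setMemoryForMapTask(1024);
   * job.setMemoryForReduceTask(1024);
   * </pre></blockquote></p>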
1900       *
1901       * @param vmem Maximum amount of virtual memory in bytes any task of this job
1902       *             can use.
1903       * @see #getMaxVirtualMemoryForTask()
   * @deprecated
   *  Use {@link #setMemoryForMapTask(long mem)} and
   *  {@link #setMemoryForReduceTask(long mem)} instead.
1907       */
1908      @Deprecated
1909      public void setMaxVirtualMemoryForTask(long vmem) {
    LOG.warn("setMaxVirtualMemoryForTask() is deprecated. " +
        "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
    if (vmem != DISABLED_MEMORY_LIMIT && vmem < 0) {
      setMemoryForMapTask(DISABLED_MEMORY_LIMIT);
      setMemoryForReduceTask(DISABLED_MEMORY_LIMIT);
    }

    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
      setMemoryForMapTask(vmem / (1024 * 1024));    // converting bytes to MB
      setMemoryForReduceTask(vmem / (1024 * 1024)); // converting bytes to MB
    } else {
      this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
    }
1923      }
1924    
1925      /**
   * @deprecated this variable is deprecated and no longer in use.
1927       */
1928      @Deprecated
1929      public long getMaxPhysicalMemoryForTask() {
1930        LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
1931                  + " Refer to the APIs getMemoryForMapTask() and"
1932                  + " getMemoryForReduceTask() for details.");
1933        return -1;
1934      }
1935    
  /**
   * @deprecated this variable is deprecated and no longer in use; the value
   * set here is ignored.
   */
1939      @Deprecated
1940      public void setMaxPhysicalMemoryForTask(long mem) {
    LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
        + " The value set is ignored. Refer to"
        + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
1944      }
1945    
1946      static String deprecatedString(String key) {
1947        return "The variable " + key + " is no longer used.";
1948      }
1949    
1950      private void checkAndWarnDeprecation() {
1951        if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
1952          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
1953                    + " Instead use " + JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY
1954                    + " and " + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
1955        }
1956        if(get(JobConf.MAPRED_TASK_ULIMIT) != null ) {
1957          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
1958        }
1959        if(get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null ) {
1960          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
1961        }
1962        if(get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null ) {
1963          LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
1964        }
1965      }
1966      
1967    
1968    }