/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;


import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;

/**
 * A map/reduce job configuration.
 *
 * <p><code>JobConf</code> is the primary interface for a user to describe a
 * map-reduce job to the Hadoop framework for execution. The framework tries to
 * faithfully execute the job as described by <code>JobConf</code>, however:
 * <ol>
 *   <li>
 *   Some configuration parameters might have been marked as
 *   <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
 *   final</a> by administrators and hence cannot be altered.
 *   </li>
 *   <li>
 *   While some job parameters are straightforward to set
 *   (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly
 *   with the rest of the framework and/or the job configuration and are
 *   relatively more complex for the user to control finely
 *   (e.g. {@link #setNumMapTasks(int)}).
 *   </li>
 * </ol></p>
 *
 * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner
 * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
 * {@link OutputFormat} implementations to be used etc.
 *
 * <p>Optionally <code>JobConf</code> is used to specify other advanced facets
 * of the job such as <code>Comparator</code>s to be used, files to be put in
 * the {@link DistributedCache}, whether or not intermediate and/or job outputs
 * are to be compressed (and how), debuggability via user-provided scripts
 * ({@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)})
 * for doing post-processing on task logs (the task's stdout, stderr, syslog),
 * and so on.</p>
 *
 * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setCombinerClass(MyJob.MyReducer.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     job.setInputFormat(SequenceFileInputFormat.class);
 *     job.setOutputFormat(SequenceFileOutputFormat.class);
 * </pre></blockquote></p>
 *
 * @see JobClient
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobConf extends Configuration {

  private static final Log LOG = LogFactory.getLog(JobConf.class);

  static {
    ConfigUtil.loadResources();
  }

  /**
   * @deprecated Use {@link #MAPRED_JOB_MAP_MEMORY_MB_PROPERTY} and
   * {@link #MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY}
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
      "mapred.task.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
      "mapred.task.limit.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
      "mapred.task.default.maxvmem";

  /**
   * @deprecated
   */
  @Deprecated
  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
      "mapred.task.maxpmem";

  /**
   * A value which, if set for memory-related configuration options,
   * indicates that the options are turned off.
   */
  public static final long DISABLED_MEMORY_LIMIT = -1L;

  /**
   * Property name for the configuration property mapreduce.cluster.local.dir
   */
  public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;

  /**
   * Name of the queue to which jobs will be submitted, if no queue
   * name is mentioned.
   */
  public static final String DEFAULT_QUEUE_NAME = "default";

  static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
      JobContext.MAP_MEMORY_MB;

  static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
      JobContext.REDUCE_MEMORY_MB;

  /** Pattern for the default unpacking behavior for job jars. */
  public static final Pattern UNPACK_JAR_PATTERN_DEFAULT =
      Pattern.compile("(?:classes/|lib/).*");

  /**
   * Configuration key to set the java command line options for the child
   * map and reduce tasks.
   *
   * Java opts for the task tracker child processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *       -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_TASK_ENV} can be used to pass
   * other environment variables to the child processes.
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_JAVA_OPTS} or
   *             {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}
   */
  @Deprecated
  public static final String MAPRED_TASK_JAVA_OPTS = "mapred.child.java.opts";

  /**
   * Configuration key to set the java command line options for the map tasks.
   *
   * Java opts for the task tracker child map processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *       -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_MAP_TASK_ENV} can be used to pass
   * other environment variables to the map processes.
   */
  public static final String MAPRED_MAP_TASK_JAVA_OPTS =
      JobContext.MAP_JAVA_OPTS;

  /**
   * Configuration key to set the java command line options for the reduce tasks.
   *
   * Java opts for the task tracker child reduce processes.
   * The following symbol, if present, will be interpolated: @taskid@.
   * It is replaced by the current TaskID. Any other occurrences of '@' will go
   * unchanged.
   * For example, to enable verbose gc logging to a file named for the taskid in
   * /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
   *       -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
   *
   * The configuration variable {@link #MAPRED_REDUCE_TASK_ENV} can be used to
   * pass process environment variables to the reduce processes.
   */
  public static final String MAPRED_REDUCE_TASK_JAVA_OPTS =
      JobContext.REDUCE_JAVA_OPTS;

  public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
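
  // Illustrative sketch (not part of the original class): setting per-task
  // java opts via the keys above. The @taskid@ token is interpolated by the
  // framework; the heap sizes and gc-log path are example values only.
  //
  //   JobConf conf = new JobConf();
  //   conf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
  //            "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc");
  //   conf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx2048m");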
  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the child
   * map and reduce tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_TASK_ULIMIT = "mapred.child.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * map tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_MAP_TASK_ULIMIT = "mapreduce.map.ulimit";

  /**
   * @deprecated
   * Configuration key to set the maximum virtual memory available to the
   * reduce tasks (in kilobytes). This has been deprecated and will no
   * longer have any effect.
   */
  @Deprecated
  public static final String MAPRED_REDUCE_TASK_ULIMIT =
      "mapreduce.reduce.ulimit";


  /**
   * Configuration key to set the environment of the child map/reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
   * reference existing environment variables via <code>$key</code>.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This inherits the tasktracker's X env variable. </li>
   * </ul>
   *
   * @deprecated Use {@link #MAPRED_MAP_TASK_ENV} or
   *             {@link #MAPRED_REDUCE_TASK_ENV}
   */
  @Deprecated
  public static final String MAPRED_TASK_ENV = "mapred.child.env";

  /**
   * Configuration key to set the environment of the map tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
   * reference existing environment variables via <code>$key</code>.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This inherits the tasktracker's X env variable. </li>
   * </ul>
   */
  public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;

  /**
   * Configuration key to set the environment of the reduce tasks.
   *
   * The format of the value is <code>k1=v1,k2=v2</code>. Further it can
   * reference existing environment variables via <code>$key</code>.
   *
   * Example:
   * <ul>
   *   <li> A=foo - This will set the env variable A to foo. </li>
   *   <li> B=$X:c - This inherits the tasktracker's X env variable. </li>
   * </ul>
   */
  public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;

  private Credentials credentials = new Credentials();

  /**
   * Configuration key to set the logging {@link Level} for the map task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_MAP_TASK_LOG_LEVEL =
      JobContext.MAP_LOG_LEVEL;

  /**
   * Configuration key to set the logging {@link Level} for the reduce task.
   *
   * The allowed logging levels are:
   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
   */
  public static final String MAPRED_REDUCE_TASK_LOG_LEVEL =
      JobContext.REDUCE_LOG_LEVEL;

  /**
   * Default logging level for map/reduce tasks.
   */
  public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
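
  // Illustrative sketch (values are examples only): passing environment
  // variables and a log level to the child tasks using the keys above.
  //
  //   JobConf conf = new JobConf();
  //   conf.set(JobConf.MAPRED_MAP_TASK_ENV, "A=foo,B=$X:c");
  //   conf.set(JobConf.MAPRED_MAP_TASK_LOG_LEVEL, Level.DEBUG.toString());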

  /**
   * Construct a map/reduce job configuration.
   */
  public JobConf() {
    checkAndWarnDeprecation();
  }

  /**
   * Construct a map/reduce job configuration.
   *
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Class exampleClass) {
    setJarByClass(exampleClass);
    checkAndWarnDeprecation();
  }

  /**
   * Construct a map/reduce job configuration.
   *
   * @param conf a Configuration whose settings will be inherited.
   */
  public JobConf(Configuration conf) {
    super(conf);

    if (conf instanceof JobConf) {
      JobConf that = (JobConf)conf;
      credentials = that.credentials;
    }

    checkAndWarnDeprecation();
  }


  /** Construct a map/reduce job configuration.
   *
   * @param conf a Configuration whose settings will be inherited.
   * @param exampleClass a class whose containing jar is used as the job's jar.
   */
  public JobConf(Configuration conf, Class exampleClass) {
    this(conf);
    setJarByClass(exampleClass);
  }


  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(String config) {
    this(new Path(config));
  }

  /** Construct a map/reduce configuration.
   *
   * @param config a Configuration-format XML job description file.
   */
  public JobConf(Path config) {
    super();
    addResource(config);
    checkAndWarnDeprecation();
  }

  /** A new map/reduce configuration where the behavior of reading from the
   * default resources can be turned off.
   * <p/>
   * If the parameter {@code loadDefaults} is false, the new instance
   * will not load resources from the default files.
   *
   * @param loadDefaults specifies whether to load from the default files
   */
  public JobConf(boolean loadDefaults) {
    super(loadDefaults);
    checkAndWarnDeprecation();
  }

  /**
   * Get credentials for the job.
   * @return credentials for the job
   */
  public Credentials getCredentials() {
    return credentials;
  }

  void setCredentials(Credentials credentials) {
    this.credentials = credentials;
  }

  /**
   * Get the user jar for the map-reduce job.
   *
   * @return the user jar for the map-reduce job.
   */
  public String getJar() { return get(JobContext.JAR); }

  /**
   * Set the user jar for the map-reduce job.
   *
   * @param jar the user jar for the map-reduce job.
   */
  public void setJar(String jar) { set(JobContext.JAR, jar); }

  /**
   * Get the pattern for jar contents to unpack on the tasktracker.
   */
  public Pattern getJarUnpackPattern() {
    return getPattern(JobContext.JAR_UNPACK_PATTERN, UNPACK_JAR_PATTERN_DEFAULT);
  }


  /**
   * Set the job's jar file by finding an example class location.
   *
   * @param cls the example class.
   */
  public void setJarByClass(Class cls) {
    String jar = findContainingJar(cls);
    if (jar != null) {
      setJar(jar);
    }
  }
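
  // Illustrative sketch: typical construction, inheriting settings from an
  // existing Configuration and locating the job jar from a user class.
  // MyJob is a hypothetical application class, not part of Hadoop.
  //
  //   Configuration base = new Configuration();
  //   JobConf conf = new JobConf(base, MyJob.class);  // also sets the job jar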
  public String[] getLocalDirs() throws IOException {
    return getTrimmedStrings(MRConfig.LOCAL_DIR);
  }

  /**
   * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
   */
  @Deprecated
  public void deleteLocalFiles() throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this).delete(new Path(localDirs[i]), true);
    }
  }

  public void deleteLocalFiles(String subdir) throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir), true);
    }
  }

  /**
   * Constructs a local file name. Files are distributed among configured
   * local directories.
   */
  public Path getLocalPath(String pathString) throws IOException {
    return getLocalPath(MRConfig.LOCAL_DIR, pathString);
  }

  /**
   * Get the reported username for this job.
   *
   * @return the username
   */
  public String getUser() {
    return get(JobContext.USER_NAME);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    set(JobContext.USER_NAME, user);
  }



  /**
   * Set whether the framework should keep the intermediate files for
   * failed tasks.
   *
   * @param keep <code>true</code> if the framework should keep the intermediate
   *             files for failed tasks, <code>false</code> otherwise.
   *
   */
  public void setKeepFailedTaskFiles(boolean keep) {
    setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
  }

  /**
   * Should the temporary files for failed tasks be kept?
   *
   * @return should the files be kept?
   */
  public boolean getKeepFailedTaskFiles() {
    return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
  }

  /**
   * Set a regular expression for task names that should be kept.
   * The regular expression ".*_m_000123_0" would keep the files
   * for the first instance of map 123 that ran.
   *
   * @param pattern the java.util.regex.Pattern to match against the
   *                task names.
   */
  public void setKeepTaskFilesPattern(String pattern) {
    set(JobContext.PRESERVE_FILES_PATTERN, pattern);
  }

  /**
   * Get the regular expression that is matched against the task names
   * to see if we need to keep the files.
   *
   * @return the pattern as a string, if it was set, otherwise null.
   */
  public String getKeepTaskFilesPattern() {
    return get(JobContext.PRESERVE_FILES_PATTERN);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   */
  public void setWorkingDirectory(Path dir) {
    dir = new Path(getWorkingDirectory(), dir);
    set(JobContext.WORKING_DIR, dir.toString());
  }

  /**
   * Get the current working directory for the default file system.
   *
   * @return the directory name.
   */
  public Path getWorkingDirectory() {
    String name = get(JobContext.WORKING_DIR);
    if (name != null) {
      return new Path(name);
    } else {
      try {
        Path dir = FileSystem.get(this).getWorkingDirectory();
        set(JobContext.WORKING_DIR, dir.toString());
        return dir;
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Sets the number of tasks that a spawned task JVM should run
   * before it exits.
   * @param numTasks the number of tasks to execute; defaults to 1;
   *                 -1 signifies no limit
   */
  public void setNumTasksToExecutePerJvm(int numTasks) {
    setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
  }

  /**
   * Get the number of tasks that a spawned JVM should execute.
   */
  public int getNumTasksToExecutePerJvm() {
    return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
  }
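
  // Illustrative sketch: keeping the intermediate files of one particular map
  // for post-mortem debugging, and reusing task JVMs without limit. The task
  // name pattern is the example from setKeepTaskFilesPattern's javadoc.
  //
  //   conf.setKeepTaskFilesPattern(".*_m_000123_0");
  //   conf.setNumTasksToExecutePerJvm(-1);  // -1 signifies no limit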

  /**
   * Get the {@link InputFormat} implementation for the map-reduce job,
   * defaults to {@link TextInputFormat} if not specified explicitly.
   *
   * @return the {@link InputFormat} implementation for the map-reduce job.
   */
  public InputFormat getInputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
                                                TextInputFormat.class,
                                                InputFormat.class),
                                       this);
  }

  /**
   * Set the {@link InputFormat} implementation for the map-reduce job.
   *
   * @param theClass the {@link InputFormat} implementation for the map-reduce
   *                 job.
   */
  public void setInputFormat(Class<? extends InputFormat> theClass) {
    setClass("mapred.input.format.class", theClass, InputFormat.class);
  }

  /**
   * Get the {@link OutputFormat} implementation for the map-reduce job,
   * defaults to {@link TextOutputFormat} if not specified explicitly.
   *
   * @return the {@link OutputFormat} implementation for the map-reduce job.
   */
  public OutputFormat getOutputFormat() {
    return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
                                                TextOutputFormat.class,
                                                OutputFormat.class),
                                       this);
  }

  /**
   * Get the {@link OutputCommitter} implementation for the map-reduce job,
   * defaults to {@link FileOutputCommitter} if not specified explicitly.
   *
   * @return the {@link OutputCommitter} implementation for the map-reduce job.
   */
  public OutputCommitter getOutputCommitter() {
    return (OutputCommitter)ReflectionUtils.newInstance(
        getClass("mapred.output.committer.class", FileOutputCommitter.class,
                 OutputCommitter.class), this);
  }

  /**
   * Set the {@link OutputCommitter} implementation for the map-reduce job.
   *
   * @param theClass the {@link OutputCommitter} implementation for the
   *                 map-reduce job.
   */
  public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
    setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
  }

  /**
   * Set the {@link OutputFormat} implementation for the map-reduce job.
   *
   * @param theClass the {@link OutputFormat} implementation for the map-reduce
   *                 job.
   */
  public void setOutputFormat(Class<? extends OutputFormat> theClass) {
    setClass("mapred.output.format.class", theClass, OutputFormat.class);
  }

  /**
   * Should the map outputs be compressed before transfer?
   * Uses the SequenceFile compression.
   *
   * @param compress should the map outputs be compressed?
   */
  public void setCompressMapOutput(boolean compress) {
    setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
  }

  /**
   * Are the outputs of the maps to be compressed?
   *
   * @return <code>true</code> if the outputs of the maps are to be compressed,
   *         <code>false</code> otherwise.
   */
  public boolean getCompressMapOutput() {
    return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
  }

  /**
   * Set the given class as the {@link CompressionCodec} for the map outputs.
   *
   * @param codecClass the {@link CompressionCodec} class that will compress
   *                   the map outputs.
   */
  public void
  setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
    setCompressMapOutput(true);
    setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass,
             CompressionCodec.class);
  }
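
  // Illustrative sketch: enabling map-output compression with the built-in
  // gzip codec; any CompressionCodec implementation on the classpath works.
  //
  //   conf.setCompressMapOutput(true);
  //   conf.setMapOutputCompressorClass(
  //       org.apache.hadoop.io.compress.GzipCodec.class);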

  /**
   * Get the {@link CompressionCodec} for compressing the map outputs.
   *
   * @param defaultValue the {@link CompressionCodec} to return if not set
   * @return the {@link CompressionCodec} class that should be used to compress
   *         the map outputs.
   * @throws IllegalArgumentException if the class was specified, but not found
   */
  public Class<? extends CompressionCodec>
  getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
    Class<? extends CompressionCodec> codecClass = defaultValue;
    String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
    if (name != null) {
      try {
        codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Compression codec " + name +
                                           " was not found.", e);
      }
    }
    return codecClass;
  }

  /**
   * Get the key class for the map output data. If it is not set, use the
   * (final) output key class. This allows the map output key class to be
   * different than the final output key class.
   *
   * @return the map output key class.
   */
  public Class<?> getMapOutputKeyClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
    if (retv == null) {
      retv = getOutputKeyClass();
    }
    return retv;
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
   * @param theClass the map output key class.
   */
  public void setMapOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
  }

  /**
   * Get the value class for the map output data. If it is not set, use the
   * (final) output value class. This allows the map output value class to be
   * different than the final output value class.
   *
   * @return the map output value class.
   */
  public Class<?> getMapOutputValueClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
        Object.class);
    if (retv == null) {
      retv = getOutputValueClass();
    }
    return retv;
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   */
  public void setMapOutputValueClass(Class<?> theClass) {
    setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the key class for the job output data.
   *
   * @return the key class for the job output data.
   */
  public Class<?> getOutputKeyClass() {
    return getClass(JobContext.OUTPUT_KEY_CLASS,
                    LongWritable.class, Object.class);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   */
  public void setOutputKeyClass(Class<?> theClass) {
    setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
  }
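
  // Illustrative sketch: letting the intermediate (map-output) types differ
  // from the final job-output types; the concrete classes are example choices.
  //
  //   conf.setMapOutputKeyClass(Text.class);
  //   conf.setMapOutputValueClass(LongWritable.class);
  //   conf.setOutputKeyClass(Text.class);
  //   conf.setOutputValueClass(Text.class);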

  /**
   * Get the {@link RawComparator} comparator used to compare keys.
   *
   * @return the {@link RawComparator} comparator used to compare keys.
   */
  public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
        JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(
        getMapOutputKeyClass().asSubclass(WritableComparable.class));
  }

  /**
   * Set the {@link RawComparator} comparator used to compare keys.
   *
   * @param theClass the {@link RawComparator} comparator used to
   *                 compare keys.
   * @see #setOutputValueGroupingComparator(Class)
   */
  public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
    setClass(JobContext.KEY_COMPARATOR,
             theClass, RawComparator.class);
  }

  /**
   * Set the {@link KeyFieldBasedComparator} options used to compare keys.
   *
   * @param keySpec the key specification of the form -k pos1[,pos2], where
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character positions are numbered
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0
   *  (the end of the field). opts are ordering options. The supported options
   *  are:
   *    -n, (Sort numerically)
   *    -r, (Reverse the result of comparison)
   */
  public void setKeyFieldComparatorOptions(String keySpec) {
    setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
    set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
  }

  /**
   * Get the {@link KeyFieldBasedComparator} options.
   */
  public String getKeyFieldComparatorOption() {
    return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
  }

  /**
   * Set the {@link KeyFieldBasedPartitioner} options used for
   * {@link Partitioner}.
   *
   * @param keySpec the key specification of the form -k pos1[,pos2], where
   *  pos is of the form f[.c][opts], where f is the number
   *  of the key field to use, and c is the number of the first character from
   *  the beginning of the field. Fields and character positions are numbered
   *  starting with 1; a character position of zero in pos2 indicates the
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0
   *  (the end of the field).
   */
  public void setKeyFieldPartitionerOptions(String keySpec) {
    setPartitionerClass(KeyFieldBasedPartitioner.class);
    set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
  }

  /**
   * Get the {@link KeyFieldBasedPartitioner} options.
   */
  public String getKeyFieldPartitionerOption() {
    return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
  }

  /**
   * Get the user defined {@link WritableComparable} comparator for
   * grouping keys of inputs to the reduce.
   *
   * @return comparator set by the user for grouping values.
   * @see #setOutputValueGroupingComparator(Class) for details.
   */
  public RawComparator getOutputValueGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
        JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
      return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
  }
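
  // Illustrative sketch of the Unix-sort-style key specifications described
  // above: "-k2,2nr" sorts on the second key field, numerically, in reverse,
  // while "-k1,1" partitions on the first key field.
  //
  //   conf.setKeyFieldComparatorOptions("-k2,2nr");
  //   conf.setKeyFieldPartitionerOptions("-k1,1");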

  /**
   * Set the user defined {@link RawComparator} comparator for
   * grouping keys in the input to the reduce.
   *
   * <p>This comparator should be provided if the equivalence rules for keys
   * for sorting the intermediates are different from those for grouping keys
   * before each call to
   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
   *
   * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
   * in a single call to the reduce function if K1 and K2 compare as equal.</p>
   *
   * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
   * how keys are sorted, this can be used in conjunction to simulate
   * <i>secondary sort on values</i>.</p>
   *
   * <p><i>Note</i>: This is not a guarantee of the reduce sort being
   * <i>stable</i> in any sense. (In any case, with the order of available
   * map-outputs to the reduce being non-deterministic, it wouldn't make
   * that much sense.)</p>
   *
   * @param theClass the comparator class to be used for grouping keys.
   *                 It should implement <code>RawComparator</code>.
   * @see #setOutputKeyComparatorClass(Class)
   */
  public void setOutputValueGroupingComparator(
      Class<? extends RawComparator> theClass) {
    setClass(JobContext.GROUP_COMPARATOR_CLASS,
             theClass, RawComparator.class);
  }

  /**
   * Should the framework use the new context-object code for running
   * the mapper?
   * @return true, if the new api should be used
   */
  public boolean getUseNewMapper() {
    return getBoolean("mapred.mapper.new-api", false);
  }

  /**
   * Set whether the framework should use the new api for the mapper.
   * This is the default for jobs submitted with the new Job api.
   * @param flag true, if the new api should be used
   */
  public void setUseNewMapper(boolean flag) {
    setBoolean("mapred.mapper.new-api", flag);
  }

  /**
   * Should the framework use the new context-object code for running
   * the reducer?
   * @return true, if the new api should be used
   */
  public boolean getUseNewReducer() {
    return getBoolean("mapred.reducer.new-api", false);
  }

  /**
   * Set whether the framework should use the new api for the reducer.
   * This is the default for jobs submitted with the new Job api.
   * @param flag true, if the new api should be used
   */
  public void setUseNewReducer(boolean flag) {
    setBoolean("mapred.reducer.new-api", flag);
  }

  /**
   * Get the value class for job outputs.
   *
   * @return the value class for job outputs.
   */
  public Class<?> getOutputValueClass() {
    return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   */
  public void setOutputValueClass(Class<?> theClass) {
    setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
  }

  /**
   * Get the {@link Mapper} class for the job.
   *
   * @return the {@link Mapper} class for the job.
   */
  public Class<? extends Mapper> getMapperClass() {
    return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
  }

  /**
   * Set the {@link Mapper} class for the job.
   *
   * @param theClass the {@link Mapper} class for the job.
   */
  public void setMapperClass(Class<? extends Mapper> theClass) {
    setClass("mapred.mapper.class", theClass, Mapper.class);
  }

  /**
   * Get the {@link MapRunnable} class for the job.
   *
   * @return the {@link MapRunnable} class for the job.
   */
  public Class<? extends MapRunnable> getMapRunnerClass() {
    return getClass("mapred.map.runner.class",
                    MapRunner.class, MapRunnable.class);
  }

  /**
   * Expert: Set the {@link MapRunnable} class for the job.
   *
   * Typically used to exert greater control on {@link Mapper}s.
   *
   * @param theClass the {@link MapRunnable} class for the job.
   */
  public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
    setClass("mapred.map.runner.class", theClass, MapRunnable.class);
  }

  /**
   * Get the {@link Partitioner} used to partition {@link Mapper}-outputs
   * to be sent to the {@link Reducer}s.
   *
   * @return the {@link Partitioner} used to partition map-outputs.
   */
  public Class<? extends Partitioner> getPartitionerClass() {
    return getClass("mapred.partitioner.class",
                    HashPartitioner.class, Partitioner.class);
  }

  /**
   * Set the {@link Partitioner} class used to partition
   * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
   *
   * @param theClass the {@link Partitioner} used to partition map-outputs.
   */
  public void setPartitionerClass(Class<? extends Partitioner> theClass) {
    setClass("mapred.partitioner.class", theClass, Partitioner.class);
  }

  /**
   * Get the {@link Reducer} class for the job.
   *
   * @return the {@link Reducer} class for the job.
   */
  public Class<? extends Reducer> getReducerClass() {
    return getClass("mapred.reducer.class",
                    IdentityReducer.class, Reducer.class);
  }

  /**
   * Set the {@link Reducer} class for the job.
   *
   * @param theClass the {@link Reducer} class for the job.
   */
  public void setReducerClass(Class<? extends Reducer> theClass) {
    setClass("mapred.reducer.class", theClass, Reducer.class);
  }

  /**
   * Get the user-defined <i>combiner</i> class used to combine map-outputs
   * before being sent to the reducers. Typically the combiner is the same as
   * the {@link Reducer} for the job, i.e. {@link #getReducerClass()}.
   *
   * @return the user-defined combiner class used to combine map-outputs.
   */
  public Class<? extends Reducer> getCombinerClass() {
    return getClass("mapred.combiner.class", null, Reducer.class);
  }
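
  // Illustrative sketch: reusing the job's Reducer as the combiner (see
  // setCombinerClass below). This is only safe when the reduce operation is
  // associative and commutative (e.g. summing counts) and its input and
  // output types match. MyJob is a hypothetical application class.
  //
  //   conf.setCombinerClass(MyJob.MyReducer.class);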

  /**
   * Set the user-defined <i>combiner</i> class used to combine map-outputs
   * before being sent to the reducers.
   *
   * <p>The combiner is an application-specified aggregation operation, which
   * can help cut down the amount of data transferred between the
   * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
   *
   * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
   * the mapper and reducer tasks. In general, the combiner is called as the
   * sort/merge result is written to disk. The combiner must:
   * <ul>
   *   <li> be side-effect free</li>
   *   <li> have the same input and output key types and the same input and
   *        output value types</li>
   * </ul></p>
   *
   * <p>Typically the combiner is the same as the <code>Reducer</code> for the
   * job, i.e. {@link #setReducerClass(Class)}.</p>
   *
   * @param theClass the user-defined combiner class used to combine
   *                 map-outputs.
   */
  public void setCombinerClass(Class<? extends Reducer> theClass) {
    setClass("mapred.combiner.class", theClass, Reducer.class);
  }

  /**
   * Should speculative execution be used for this job?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution should be used for
   *         this job, <code>false</code> otherwise.
   */
  public boolean getSpeculativeExecution() {
    return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    setMapSpeculativeExecution(speculativeExecution);
    setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Should speculative execution be used for this job for map tasks?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution should be used for
   *         this job for map tasks, <code>false</code> otherwise.
   */
  public boolean getMapSpeculativeExecution() {
    return getBoolean(JobContext.MAP_SPECULATIVE, true);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
  }

  /**
   * Should speculative execution be used for this job for reduce tasks?
   * Defaults to <code>true</code>.
   *
   * @return <code>true</code> if speculative execution should be used for
   *         reduce tasks for this job, <code>false</code> otherwise.
   */
  public boolean getReduceSpeculativeExecution() {
    return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    setBoolean(JobContext.REDUCE_SPECULATIVE,
               speculativeExecution);
  }
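
  // Illustrative sketch: disabling speculative execution for reduces only,
  // e.g. when the reduce tasks have side effects that must not run twice.
  //
  //   conf.setMapSpeculativeExecution(true);
  //   conf.setReduceSpeculativeExecution(false);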

  /**
   * Get the configured number of map tasks for this job.
   * Defaults to <code>1</code>.
   *
   * @return the number of map tasks for this job.
   */
  public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }

  /**
   * Set the number of map tasks for this job.
   *
   * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual
   * number of spawned map tasks depends on the number of {@link InputSplit}s
   * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
   *
   * A custom {@link InputFormat} is typically used to accurately control
   * the number of map tasks for the job.</p>
   *
   * <h4 id="NoOfMaps">How many maps?</h4>
   *
   * <p>The number of maps is usually driven by the total size of the inputs,
   * i.e. the total number of blocks of the input files.</p>
   *
   * <p>The right level of parallelism for maps seems to be around 10-100 maps
   * per-node, although it has been set up to 300 or so for very cpu-light map
   * tasks. Task setup takes a while, so it is best if the maps take at least a
   * minute to execute.</p>
   *
   * <p>The default behavior of file-based {@link InputFormat}s is to split the
   * input into <i>logical</i> {@link InputSplit}s based on the total size, in
   * bytes, of the input files. However, the {@link FileSystem} blocksize of the
   * input files is treated as an upper bound for input splits. A lower bound
   * on the split size can be set via
   * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
   * mapreduce.input.fileinputformat.split.minsize</a>.</p>
   *
   * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB,
   * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is
   * used to set it even higher.</p>
   *
   * @param n the number of map tasks for this job.
   * @see InputFormat#getSplits(JobConf, int)
   * @see FileInputFormat
   * @see FileSystem#getDefaultBlockSize()
   * @see FileStatus#getBlockSize()
   */
  public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }

  /**
   * Get the configured number of reduce tasks for this job. Defaults to
   * <code>1</code>.
   *
   * @return the number of reduce tasks for this job.
   */
  public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }

  /**
   * Set the requisite number of reduce tasks for this job.
   *
   * <h4 id="NoOfReduces">How many reduces?</h4>
   *
   * <p>The right number of reduces seems to be <code>0.95</code> or
   * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; *
   * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
   * mapreduce.tasktracker.reduce.tasks.maximum</a>).
   * </p>
   *
   * <p>With <code>0.95</code> all of the reduces can launch immediately and
   * start transferring map outputs as the maps finish. With <code>1.75</code>
   * the faster nodes will finish their first round of reduces and launch a
   * second wave of reduces, doing a much better job of load balancing.</p>
   *
   * <p>Increasing the number of reduces increases the framework overhead, but
   * improves load balancing and lowers the cost of failures.</p>
   *
   * <p>The scaling factors above are slightly less than whole numbers to
   * reserve a few reduce slots in the framework for speculative-tasks, failures
   * etc.</p>
   *
   * <h4 id="ReducerNone">Reducer NONE</h4>
   *
   * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
   *
   * <p>In this case the outputs of the map-tasks go directly to the distributed
   * file-system, to the path set by
   * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the
   * framework doesn't sort the map-outputs before writing them out to HDFS.</p>
   *
   * @param n the number of reduce tasks for this job.
   */
  public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
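
  // Illustrative sketch of the reduce-sizing rule above; the node and slot
  // counts are hypothetical cluster values.
  //
  //   int nodes = 20;              // assumed cluster size
  //   int reduceSlotsPerNode = 2;  // mapreduce.tasktracker.reduce.tasks.maximum
  //   conf.setNumReduceTasks((int) (0.95 * nodes * reduceSlotsPerNode));  // 38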

  /**
   * Get the configured number of maximum attempts that will be made to run a
   * map task, as specified by the <code>mapreduce.map.maxattempts</code>
   * property. If this property is not already set, the default is 4 attempts.
   *
   * @return the max number of attempts per map task.
   */
  public int getMaxMapAttempts() {
    return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    setInt(JobContext.MAP_MAX_ATTEMPTS, n);
  }

  /**
   * Get the configured number of maximum attempts that will be made to run a
   * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
   * property. If this property is not already set, the default is 4 attempts.
   *
   * @return the max number of attempts per reduce task.
   */
  public int getMaxReduceAttempts() {
    return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
  }

  /**
   * Get the user-specified job name. This is only used to identify the
   * job to the user.
   *
   * @return the job's name, defaulting to "".
   */
  public String getJobName() {
    return get(JobContext.JOB_NAME, "");
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   */
  public void setJobName(String name) {
    set(JobContext.JOB_NAME, name);
  }

  /**
   * Get the user-specified session identifier. The default is the empty string.
   *
   * The session identifier is used to tag metric data that is reported to some
   * performance metrics system via the org.apache.hadoop.metrics API. The
   * session identifier is intended, in particular, for use by Hadoop-On-Demand
   * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently.
   * HOD will set the session identifier by modifying the mapred-site.xml file
   * before starting the cluster.
   *
   * When not running under HOD, this identifier is expected to remain set to
   * the empty string.
   *
   * @return the session identifier, defaulting to "".
   */
  @Deprecated
  public String getSessionId() {
    return get("session.id", "");
  }

  /**
   * Set the user-specified session identifier.
   *
   * @param sessionId the new session id.
   */
  @Deprecated
  public void setSessionId(String sessionId) {
    set("session.id", sessionId);
  }

  /**
   * Set the maximum no. of failures of a given job per tasktracker.
   * If the no. of task failures exceeds <code>noFailures</code>, the
   * tasktracker is <i>blacklisted</i> for this job.
   *
   * @param noFailures maximum no. of failures of a given job per tasktracker.
   */
  public void setMaxTaskFailuresPerTracker(int noFailures) {
    setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
  }
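
  // Illustrative sketch: raising the per-task retry budget, e.g. for a flaky
  // input source, and blacklisting a tasktracker for the job after 6 failures.
  //
  //   conf.setMaxMapAttempts(8);
  //   conf.setMaxReduceAttempts(8);
  //   conf.setMaxTaskFailuresPerTracker(6);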

  /**
   * Expert: Get the maximum no. of failures of a given job per tasktracker.
   * If the no. of task failures exceeds this, the tasktracker is
   * <i>blacklisted</i> for this job.
   *
   * @return the maximum no. of failures of a given job per tasktracker.
   */
  public int getMaxTaskFailuresPerTracker() {
    return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 4);
  }

  /**
   * Get the maximum percentage of map tasks that can fail without
   * the job being aborted.
   *
   * Each map task is executed a minimum of {@link #getMaxMapAttempts()}
   * attempts before being declared as <i>failed</i>.
   *
   * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
   * the job being declared as {@link JobStatus#FAILED}.
   *
   * @return the maximum percentage of map tasks that can fail without
   *         the job being aborted.
   */
  public int getMaxMapTaskFailuresPercent() {
    return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
  }

  /**
   * Expert: Set the maximum percentage of map tasks that can fail without the
   * job being aborted.
   *
   * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts
   * before being declared as <i>failed</i>.
   *
   * @param percent the maximum percentage of map tasks that can fail without
   *                the job being aborted.
   */
  public void setMaxMapTaskFailuresPercent(int percent) {
    setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
  }

  /**
   * Get the maximum percentage of reduce tasks that can fail without
   * the job being aborted.
   *
   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
   * attempts before being declared as <i>failed</i>.
   *
   * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results
   * in the job being declared as {@link JobStatus#FAILED}.
   *
   * @return the maximum percentage of reduce tasks that can fail without
   *         the job being aborted.
   */
  public int getMaxReduceTaskFailuresPercent() {
    return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
  }

  /**
   * Set the maximum percentage of reduce tasks that can fail without the job
   * being aborted.
   *
   * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
   * attempts before being declared as <i>failed</i>.
   *
   * @param percent the maximum percentage of reduce tasks that can fail without
   *                the job being aborted.
   */
  public void setMaxReduceTaskFailuresPercent(int percent) {
    setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
  }

  /**
   * Set {@link JobPriority} for this job.
   *
   * @param prio the {@link JobPriority} for this job.
   */
  public void setJobPriority(JobPriority prio) {
    set(JobContext.PRIORITY, prio.toString());
  }

  /**
   * Get the {@link JobPriority} for this job.
   *
   * @return the {@link JobPriority} for this job.
   */
  public JobPriority getJobPriority() {
    String prio = get(JobContext.PRIORITY);
    if (prio == null) {
      return JobPriority.NORMAL;
    }

    return JobPriority.valueOf(prio);
  }
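
  // Illustrative sketch: tolerating a small fraction of failed maps, e.g. for
  // jobs over partially corrupt input, and raising the job's priority.
  //
  //   conf.setMaxMapTaskFailuresPercent(5);  // job survives <= 5% failed maps
  //   conf.setJobPriority(JobPriority.HIGH);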

  /**
   * Set the JobSubmitHostName for this job.
   *
   * @param hostname the JobSubmitHostName for this job.
   */
  void setJobSubmitHostName(String hostname) {
    set(MRJobConfig.JOB_SUBMITHOST, hostname);
  }

  /**
   * Get the JobSubmitHostName for this job.
   *
   * @return the JobSubmitHostName for this job.
   */
  String getJobSubmitHostName() {
    String hostname = get(MRJobConfig.JOB_SUBMITHOST);

    return hostname;
  }

  /**
   * Set the JobSubmitHostAddress for this job.
   *
   * @param hostadd the JobSubmitHostAddress for this job.
   */
  void setJobSubmitHostAddress(String hostadd) {
    set(MRJobConfig.JOB_SUBMITHOSTADDR, hostadd);
  }

  /**
   * Get the JobSubmitHostAddress for this job.
   *
   * @return the JobSubmitHostAddress for this job.
   */
  String getJobSubmitHostAddress() {
    String hostadd = get(MRJobConfig.JOB_SUBMITHOSTADDR);

    return hostadd;
  }

  /**
   * Get whether task profiling is enabled.
   * @return true if some tasks will be profiled
   */
  public boolean getProfileEnabled() {
    return getBoolean(JobContext.TASK_PROFILE, false);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    setBoolean(JobContext.TASK_PROFILE, newValue);
  }

  /**
   * Get the profiler configuration arguments.
   *
   * The default value for this property is
   * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
   *
   * @return the parameters to pass to the task child to configure profiling
   */
  public String getProfileParams() {
    return get(JobContext.TASK_PROFILE_PARAMS,
               "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
               "verbose=n,file=%s");
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    set(JobContext.TASK_PROFILE_PARAMS, value);
  }

  /**
   * Get the range of maps or reduces to profile.
   * @param isMap is the task a map?
   * @return the task ranges
   */
  public IntegerRanges getProfileTaskRange(boolean isMap) {
    return getRange((isMap ? JobContext.NUM_MAP_PROFILES :
                       JobContext.NUM_REDUCE_PROFILES), "0-2");
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
   * @param isMap is the task a map?
   * @param newValue a set of integer ranges of the map ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    // parse the value to make sure it is legal
    new Configuration.IntegerRanges(newValue);
    set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES),
        newValue);
  }
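
  // Illustrative sketch: profiling the first three map tasks with the default
  // hprof parameters described above.
  //
  //   conf.setProfileEnabled(true);
  //   conf.setProfileTaskRange(true, "0-2");  // map task ids 0, 1 and 2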

  /**
   * Set the debug script to run when the map tasks fail.
   *
   * <p>The debug script can aid debugging of failed map tasks. The script is
   * given the task's stdout, stderr, syslog and jobconf files as arguments.</p>
   *
   * <p>The debug command, run on the node where the map failed, is:</p>
   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
   * </pre></blockquote></p>
   *
   * <p>The script file is distributed through the {@link DistributedCache}
   * APIs. The script needs to be symlinked.</p>
   *
   * <p>Here is an example on how to submit a script:</p>
   * <p><blockquote><pre>
   * job.setMapDebugScript("./myscript");
   * DistributedCache.createSymlink(job);
   * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
   * </pre></blockquote></p>
   *
   * @param mDbgScript the script name
   */
  public void setMapDebugScript(String mDbgScript) {
    set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
  }

  /**
   * Get the map task's debug script.
   *
   * @return the debug script for the mapred job for failed map tasks.
   * @see #setMapDebugScript(String)
   */
  public String getMapDebugScript() {
    return get(JobContext.MAP_DEBUG_SCRIPT);
  }

  /**
   * Set the debug script to run when the reduce tasks fail.
   *
   * <p>The debug script can aid debugging of failed reduce tasks. The script
   * is given the task's stdout, stderr, syslog and jobconf files as
   * arguments.</p>
   *
   * <p>The debug command, run on the node where the reduce failed, is:</p>
   * <p><blockquote><pre>
   * $script $stdout $stderr $syslog $jobconf
   * </pre></blockquote></p>
   *
   * <p>The script file is distributed through the {@link DistributedCache}
   * APIs. The script file needs to be symlinked.</p>
   *
   * <p>Here is an example on how to submit a script:</p>
   * <p><blockquote><pre>
   * job.setReduceDebugScript("./myscript");
   * DistributedCache.createSymlink(job);
   * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
   * </pre></blockquote></p>
   *
   * @param rDbgScript the script name
   */
  public void setReduceDebugScript(String rDbgScript) {
    set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
  }

  /**
   * Get the reduce task's debug script.
   *
   * @return the debug script for the mapred job for failed reduce tasks.
   * @see #setReduceDebugScript(String)
   */
  public String getReduceDebugScript() {
    return get(JobContext.REDUCE_DEBUG_SCRIPT);
  }

  /**
   * Get the uri to be invoked in order to send a notification after the job
   * has completed (success/failure).
   *
   * @return the job end notification uri, <code>null</code> if it hasn't
   *         been set.
   * @see #setJobEndNotificationURI(String)
   */
  public String getJobEndNotificationURI() {
    return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
  }

  /**
   * Set the uri to be invoked in order to send a notification after the job
   * has completed (success/failure).
   *
   * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and
   * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's
   * identifier and completion-status respectively.</p>
   *
   * <p>This is typically used by application-writers to implement chaining of
   * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
   *
   * @param uri the job end notification uri
   * @see JobStatus
   * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#
   *       JobCompletionAndChaining">Job Completion and Chaining</a>
   */
  public void setJobEndNotificationURI(String uri) {
    set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
  }
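
  // Illustrative sketch: registering an end-of-job callback; the host and
  // path are hypothetical. $jobId and $jobStatus are expanded by the
  // framework as described above.
  //
  //   conf.setJobEndNotificationURI(
  //       "http://myhost:8080/notify?jobid=$jobId&status=$jobStatus");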
  /**
   * Get the job-specific shared directory for use as scratch space.
   *
   * <p>When a job starts, a shared directory is created at the location
   * <code>${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/</code>.
   * This directory is exposed to the users through
   * <code>mapreduce.job.local.dir</code>, so tasks can use it as scratch
   * space and share files among themselves.</p>
   *
   * <p>This value is also available as a system property.</p>
   *
   * @return the localized job-specific shared directory
   */
  public String getJobLocalDir() {
    return get(JobContext.JOB_LOCAL_DIR);
  }

  /**
   * Get memory required to run a map task of the job, in MB.
   *
   * <p>If a value is specified in the configuration, it is returned.
   * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.</p>
   *
   * <p>For backward compatibility, if the job configuration sets the
   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
   * after converting it from bytes to MB.</p>
   *
   * @return memory required to run a map task of the job, in MB,
   *         or {@link #DISABLED_MEMORY_LIMIT} if unset.
   */
  public long getMemoryForMapTask() {
    long value = getDeprecatedMemoryValue();
    if (value == DISABLED_MEMORY_LIMIT) {
      value = normalizeMemoryConfigValue(
          getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
              DISABLED_MEMORY_LIMIT));
    }
    return value;
  }

  /**
   * Set memory required to run a map task of the job, in MB.
   *
   * @param mem memory required for a map task, in MB
   */
  public void setMemoryForMapTask(long mem) {
    setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
  }

  /**
   * Get memory required to run a reduce task of the job, in MB.
   *
   * <p>If a value is specified in the configuration, it is returned.
   * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.</p>
   *
   * <p>For backward compatibility, if the job configuration sets the
   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
   * after converting it from bytes to MB.</p>
   *
   * @return memory required to run a reduce task of the job, in MB,
   *         or {@link #DISABLED_MEMORY_LIMIT} if unset.
   */
  public long getMemoryForReduceTask() {
    long value = getDeprecatedMemoryValue();
    if (value == DISABLED_MEMORY_LIMIT) {
      value = normalizeMemoryConfigValue(
          getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
              DISABLED_MEMORY_LIMIT));
    }
    return value;
  }

  // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
  // converted into MBs.
  // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative
  // value.
  private long getDeprecatedMemoryValue() {
    long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
        DISABLED_MEMORY_LIMIT);
    oldValue = normalizeMemoryConfigValue(oldValue);
    if (oldValue != DISABLED_MEMORY_LIMIT) {
      oldValue /= (1024 * 1024);
    }
    return oldValue;
  }

  /**
   * Set memory required to run a reduce task of the job, in MB.
   *
   * @param mem memory required for a reduce task, in MB
   */
  public void setMemoryForReduceTask(long mem) {
    setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
  }

  /**
   * Return the name of the queue to which this job is submitted.
   * Defaults to 'default'.
   *
   * @return name of the queue
   */
  public String getQueueName() {
    return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
  }

  /**
   * Set the name of the queue to which this job should be submitted.
   *
   * @param queueName Name of the queue
   */
  public void setQueueName(String queueName) {
    set(JobContext.QUEUE_NAME, queueName);
  }
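  /*
   * A short sketch (illustrative; the MB values are arbitrary) of the
   * per-task memory and queue knobs above: maps get 1 GB, reduces get 2 GB,
   * and the job goes to the default queue.
   *
   *   job.setMemoryForMapTask(1024);      // map-task limit, in MB
   *   job.setMemoryForReduceTask(2048);   // reduce-task limit, in MB
   *   job.setQueueName("default");
   */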
  /**
   * Normalize the negative values in configuration.
   *
   * @param val the memory configuration value to normalize
   * @return the normalized value: <code>val</code> if it is non-negative,
   *         {@link #DISABLED_MEMORY_LIMIT} otherwise
   */
  public static long normalizeMemoryConfigValue(long val) {
    if (val < 0) {
      val = DISABLED_MEMORY_LIMIT;
    }
    return val;
  }

  /**
   * Compute the number of slots required to run a single map task-attempt
   * of this job. For example, a map task requiring 1536 MB in a cluster
   * whose map slots are 512 MB each needs 3 slots.
   *
   * @param slotSizePerMap cluster-wide value of the amount of memory required
   *                       to run a map-task
   * @return the number of slots required to run a single map task-attempt,
   *         or 1 if memory parameters are disabled.
   */
  int computeNumSlotsPerMap(long slotSizePerMap) {
    if ((slotSizePerMap == DISABLED_MEMORY_LIMIT) ||
        (getMemoryForMapTask() == DISABLED_MEMORY_LIMIT)) {
      return 1;
    }
    return (int) (Math.ceil((float) getMemoryForMapTask() / (float) slotSizePerMap));
  }

  /**
   * Compute the number of slots required to run a single reduce task-attempt
   * of this job.
   *
   * @param slotSizePerReduce cluster-wide value of the amount of memory
   *                          required to run a reduce-task
   * @return the number of slots required to run a single reduce task-attempt,
   *         or 1 if memory parameters are disabled
   */
  int computeNumSlotsPerReduce(long slotSizePerReduce) {
    if ((slotSizePerReduce == DISABLED_MEMORY_LIMIT) ||
        (getMemoryForReduceTask() == DISABLED_MEMORY_LIMIT)) {
      return 1;
    }
    return
        (int) (Math.ceil((float) getMemoryForReduceTask() / (float) slotSizePerReduce));
  }

  /**
   * Find a jar that contains a class of the same name, if any.
   * It will return a jar file, even if that is not the first thing
   * on the class path that has a class with the same name.
   *
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  public static String findContainingJar(Class<?> my_class) {
    ClassLoader loader = my_class.getClassLoader();
    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
    try {
      for (Enumeration<URL> itr = loader.getResources(class_file);
           itr.hasMoreElements();) {
        URL url = itr.nextElement();
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          // URLDecoder is a misnamed class, since it actually decodes
          // x-www-form-urlencoded MIME type rather than actual
          // URL encoding (which the file path has). Therefore it would
          // decode +s to ' 's which is incorrect (spaces are actually
          // either unencoded or encoded as "%20"). Replace +s first, so
          // that they are kept sacred during the decoding process.
          toReturn = toReturn.replaceAll("\\+", "%2B");
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return null;
  }
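  /*
   * A small sketch (illustrative; "MyJob" is a hypothetical driver class):
   * ship the jar that contains the driver class with the job, mirroring
   * what setJarByClass() does internally.
   *
   *   String jar = JobConf.findContainingJar(MyJob.class);
   *   if (jar != null) {
   *     job.setJar(jar);
   *   }
   */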
  /**
   * Get the memory required to run a task of this job, in bytes. See
   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}.
   *
   * <p>This method is deprecated. Now, different memory limits can be
   * set for map and reduce tasks of a job, in MB.</p>
   *
   * <p>For backward compatibility, if the job configuration sets the
   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
   * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned.
   * Otherwise, this method will return the larger of the values returned by
   * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
   * after converting them into bytes.</p>
   *
   * @return Memory required to run a task of this job, in bytes,
   *         or {@link #DISABLED_MEMORY_LIMIT}, if unset.
   * @see #setMaxVirtualMemoryForTask(long)
   * @deprecated Use {@link #getMemoryForMapTask()} and
   *             {@link #getMemoryForReduceTask()}
   */
  @Deprecated
  public long getMaxVirtualMemoryForTask() {
    LOG.warn(
        "getMaxVirtualMemoryForTask() is deprecated. " +
        "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");

    long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
    value = normalizeMemoryConfigValue(value);
    if (value == DISABLED_MEMORY_LIMIT) {
      value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
      value = normalizeMemoryConfigValue(value);
      if (value != DISABLED_MEMORY_LIMIT) {
        value *= 1024 * 1024;
      }
    }
    return value;
  }

  /**
   * Set the maximum amount of memory any task of this job can use. See
   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}.
   *
   * <p>mapred.task.maxvmem is split into mapreduce.map.memory.mb
   * and mapreduce.reduce.memory.mb; each of the new keys is set
   * to mapred.task.maxvmem / (1024 * 1024), as the new values are in MB.</p>
   *
   * @param vmem Maximum amount of virtual memory in bytes any task of this job
   *             can use.
   * @see #getMaxVirtualMemoryForTask()
   * @deprecated Use {@link #setMemoryForMapTask(long mem)} and
   *             {@link #setMemoryForReduceTask(long mem)}
   */
  @Deprecated
  public void setMaxVirtualMemoryForTask(long vmem) {
    LOG.warn("setMaxVirtualMemoryForTask() is deprecated. " +
        "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
    if (vmem != DISABLED_MEMORY_LIMIT && vmem < 0) {
      // invalid negative value: disable the limits and ignore the input
      setMemoryForMapTask(DISABLED_MEMORY_LIMIT);
      setMemoryForReduceTask(DISABLED_MEMORY_LIMIT);
      return;
    }

    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
      setMemoryForMapTask(vmem / (1024 * 1024)); // converting bytes to MB
      setMemoryForReduceTask(vmem / (1024 * 1024)); // converting bytes to MB
    } else {
      this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
    }
  }

  /**
   * @deprecated this method is deprecated and no longer in use.
   */
  @Deprecated
  public long getMaxPhysicalMemoryForTask() {
    LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
        + " Refer to the APIs getMemoryForMapTask() and"
        + " getMemoryForReduceTask() for details.");
    return -1;
  }
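  /*
   * A worked sketch (illustrative) of the deprecated byte-based API above,
   * assuming mapred.task.maxvmem has not been set explicitly: 2 GB in bytes
   * becomes 2048 MB for both limits, since the shim divides by 1024 * 1024.
   *
   *   job.setMaxVirtualMemoryForTask(2L * 1024 * 1024 * 1024);  // 2 GB
   *   // job.getMemoryForMapTask() == 2048 and
   *   // job.getMemoryForReduceTask() == 2048 afterwards, both in MB.
   */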
  /**
   * @deprecated this method is deprecated and no longer in use.
   */
  @Deprecated
  public void setMaxPhysicalMemoryForTask(long mem) {
    LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
        + " The value set is ignored. Refer to"
        + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
  }

  static String deprecatedString(String key) {
    return "The variable " + key + " is no longer used.";
  }

  private void checkAndWarnDeprecation() {
    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
          + " Instead use " + JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY
          + " and " + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
    }
    if (get(JobConf.MAPRED_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_ULIMIT));
    }
    if (get(JobConf.MAPRED_MAP_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_MAP_TASK_ULIMIT));
    }
    if (get(JobConf.MAPRED_REDUCE_TASK_ULIMIT) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
    }
  }

}