/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.pipes;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.StringTokenizer;

import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.Parser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.LazyOutputFormat;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;

/**
 * The main entry point and job submitter. It may be used either as a
 * command-line tool or programmatically to launch Pipes jobs.
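 *
 * <p>A minimal sketch of programmatic submission; the executable URI and
 * the input/output paths below are illustrative, not defaults:
 * <pre>{@code
 * JobConf conf = new JobConf();
 * conf.setJobName("pipes-example");
 * FileInputFormat.setInputPaths(conf, new Path("/user/me/in"));
 * FileOutputFormat.setOutputPath(conf, new Path("/user/me/out"));
 * Submitter.setExecutable(conf, "hdfs:/user/me/bin/my-pipes-binary");
 * Submitter.setIsJavaRecordReader(conf, true);
 * Submitter.setIsJavaRecordWriter(conf, true);
 * RunningJob job = Submitter.runJob(conf); // blocks until the job finishes
 * }</pre>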
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Submitter extends Configured implements Tool {

  protected static final Log LOG = LogFactory.getLog(Submitter.class);
  public static final String PRESERVE_COMMANDFILE =
      "mapreduce.pipes.commandfile.preserve";
  public static final String EXECUTABLE = "mapreduce.pipes.executable";
  public static final String INTERPRETOR =
      "mapreduce.pipes.executable.interpretor";
  public static final String IS_JAVA_MAP = "mapreduce.pipes.isjavamapper";
  public static final String IS_JAVA_RR = "mapreduce.pipes.isjavarecordreader";
  public static final String IS_JAVA_RW = "mapreduce.pipes.isjavarecordwriter";
  public static final String IS_JAVA_REDUCE = "mapreduce.pipes.isjavareducer";
  public static final String PARTITIONER = "mapreduce.pipes.partitioner";
  public static final String INPUT_FORMAT = "mapreduce.pipes.inputformat";
  public static final String PORT = "mapreduce.pipes.command.port";

  public Submitter() {
    this(new Configuration());
  }

  public Submitter(Configuration conf) {
    setConf(conf);
  }

  /**
   * Get the URI of the application's executable.
   * @param conf the configuration to read
   * @return the URI where the application's executable is located
   */
  public static String getExecutable(JobConf conf) {
    return conf.get(Submitter.EXECUTABLE);
  }

  /**
   * Set the URI for the application's executable. Normally this is an
   * hdfs: location.
   * @param conf the configuration to modify
   * @param executable the URI of the application's executable
   */
  public static void setExecutable(JobConf conf, String executable) {
    conf.set(Submitter.EXECUTABLE, executable);
  }

  /**
   * Set whether the job is using a Java RecordReader.
   * @param conf the configuration to modify
   * @param value the new value
   */
  public static void setIsJavaRecordReader(JobConf conf, boolean value) {
    conf.setBoolean(Submitter.IS_JAVA_RR, value);
  }

  /**
   * Check whether the job is using a Java RecordReader.
   * @param conf the configuration to check
   * @return true if the job uses a Java RecordReader
   */
  public static boolean getIsJavaRecordReader(JobConf conf) {
    return conf.getBoolean(Submitter.IS_JAVA_RR, false);
  }

  /**
   * Set whether the Mapper is written in Java.
   * @param conf the configuration to modify
   * @param value the new value
   */
  public static void setIsJavaMapper(JobConf conf, boolean value) {
    conf.setBoolean(Submitter.IS_JAVA_MAP, value);
  }

  /**
   * Check whether the job is using a Java Mapper.
   * @param conf the configuration to check
   * @return true if the job uses a Java Mapper
   */
  public static boolean getIsJavaMapper(JobConf conf) {
    return conf.getBoolean(Submitter.IS_JAVA_MAP, false);
  }

  /**
   * Set whether the Reducer is written in Java.
   * @param conf the configuration to modify
   * @param value the new value
   */
  public static void setIsJavaReducer(JobConf conf, boolean value) {
    conf.setBoolean(Submitter.IS_JAVA_REDUCE, value);
  }

  /**
   * Check whether the job is using a Java Reducer.
   * @param conf the configuration to check
   * @return true if the job uses a Java Reducer
   */
  public static boolean getIsJavaReducer(JobConf conf) {
    return conf.getBoolean(Submitter.IS_JAVA_REDUCE, false);
  }

  /**
   * Set whether the job will use a Java RecordWriter.
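   * <p>For example, to keep the map and reduce sides in C++ but have Java
   * write the final output (a sketch; {@code TextOutputFormat} is an
   * illustrative choice, and {@code conf} is assumed to be the job's
   * {@link JobConf}):
   * <pre>{@code
   * Submitter.setIsJavaRecordWriter(conf, true);
   * conf.setOutputFormat(TextOutputFormat.class);
   * }</pre>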
   * @param conf the configuration to modify
   * @param value the new value to set
   */
  public static void setIsJavaRecordWriter(JobConf conf, boolean value) {
    conf.setBoolean(Submitter.IS_JAVA_RW, value);
  }

  /**
   * Will the reduce use a Java RecordWriter?
   * @param conf the configuration to check
   * @return true if the output of the job will be written by Java
   */
  public static boolean getIsJavaRecordWriter(JobConf conf) {
    return conf.getBoolean(Submitter.IS_JAVA_RW, false);
  }

  /**
   * Set a configuration value, if it doesn't already have a value for the
   * given key.
   * @param conf the configuration to modify
   * @param key the key to set
   * @param value the new "default" value to set
   */
  private static void setIfUnset(JobConf conf, String key, String value) {
    if (conf.get(key) == null) {
      conf.set(key, value);
    }
  }

  /**
   * Save away the user's original partitioner before we override it.
   * @param conf the configuration to modify
   * @param cls the user's partitioner class
   */
  static void setJavaPartitioner(JobConf conf, Class<?> cls) {
    conf.set(Submitter.PARTITIONER, cls.getName());
  }

  /**
   * Get the user's original partitioner.
   * @param conf the configuration to look in
   * @return the class that the user submitted
   */
  static Class<? extends Partitioner> getJavaPartitioner(JobConf conf) {
    return conf.getClass(Submitter.PARTITIONER,
                         HashPartitioner.class,
                         Partitioner.class);
  }

  /**
   * Does the user want to keep the command file for debugging? If this is
   * true, pipes will write a copy of the command data to a file in the
   * task directory named "downlink.data", which may be used to run the C++
   * program under the debugger. You probably also want to set
   * JobConf.setKeepFailedTaskFiles(true) to keep the entire directory from
   * being deleted.
   * To run using the data file, set the environment variable
   * "mapreduce.pipes.commandfile" to point to the file.
   * @param conf the configuration to check
   * @return true if the framework should save the command file
   */
  public static boolean getKeepCommandFile(JobConf conf) {
    return conf.getBoolean(Submitter.PRESERVE_COMMANDFILE, false);
  }

  /**
   * Set whether to keep the command file for debugging.
   * @param conf the configuration to modify
   * @param keep the new value
   */
  public static void setKeepCommandFile(JobConf conf, boolean keep) {
    conf.setBoolean(Submitter.PRESERVE_COMMANDFILE, keep);
  }

  /**
   * Submit a job to the map/reduce cluster. All of the necessary
   * modifications to the job to run under pipes are made to the
   * configuration.
   * @param conf the job to submit to the cluster (MODIFIED)
   * @throws IOException if the job fails
   * @deprecated Use {@link Submitter#runJob(JobConf)}
   */
  @Deprecated
  public static RunningJob submitJob(JobConf conf) throws IOException {
    return runJob(conf);
  }

  /**
   * Submit a job to the map/reduce cluster. All of the necessary
   * modifications to the job to run under pipes are made to the
   * configuration.
   * @param conf the job to submit to the cluster (MODIFIED)
   * @throws IOException if the job fails
   */
  public static RunningJob runJob(JobConf conf) throws IOException {
    setupPipesJob(conf);
    return JobClient.runJob(conf);
  }

  /**
   * Submit a job to the Map-Reduce framework.
   * This returns a handle to the {@link RunningJob} which can be used to
   * track the running job.
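   * <p>Unlike {@link #runJob(JobConf)}, this call does not block. A sketch
   * of polling the returned handle (the five-second interval is arbitrary):
   * <pre>{@code
   * RunningJob job = Submitter.jobSubmit(conf);
   * while (!job.isComplete()) {
   *   Thread.sleep(5000);
   * }
   * if (!job.isSuccessful()) {
   *   throw new IOException("Pipes job " + job.getID() + " failed");
   * }
   * }</pre>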
   *
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track
   *         the running job
   * @throws IOException if the job cannot be submitted
   */
  public static RunningJob jobSubmit(JobConf conf) throws IOException {
    setupPipesJob(conf);
    return new JobClient(conf).submitJob(conf);
  }

  private static void setupPipesJob(JobConf conf) throws IOException {
    if (!getIsJavaMapper(conf)) {
      conf.setMapRunnerClass(PipesMapRunner.class);
      // Save the user's partitioner and hook in ours.
      setJavaPartitioner(conf, conf.getPartitionerClass());
      conf.setPartitionerClass(PipesPartitioner.class);
    }
    if (!getIsJavaReducer(conf)) {
      conf.setReducerClass(PipesReducer.class);
      if (!getIsJavaRecordWriter(conf)) {
        conf.setOutputFormat(NullOutputFormat.class);
      }
    }
    // default map and job output types to Text
    String textClassname = Text.class.getName();
    setIfUnset(conf, MRJobConfig.MAP_OUTPUT_KEY_CLASS, textClassname);
    setIfUnset(conf, MRJobConfig.MAP_OUTPUT_VALUE_CLASS, textClassname);
    setIfUnset(conf, MRJobConfig.OUTPUT_KEY_CLASS, textClassname);
    setIfUnset(conf, MRJobConfig.OUTPUT_VALUE_CLASS, textClassname);

    // Use PipesNonJavaInputFormat if necessary to handle progress reporting
    // from C++ RecordReaders ...
    if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
      conf.setClass(Submitter.INPUT_FORMAT,
                    conf.getInputFormat().getClass(), InputFormat.class);
      conf.setInputFormat(PipesNonJavaInputFormat.class);
    }

    String exec = getExecutable(conf);
    if (exec == null) {
      throw new IllegalArgumentException("No application program defined.");
    }
    // add default debug script only when executable is expressed as
    // <path>#<executable>
    if (exec.contains("#")) {
      // set default gdb commands for map and reduce task
      String defScript =
          "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
      setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT, defScript);
      setIfUnset(conf, MRJobConfig.REDUCE_DEBUG_SCRIPT, defScript);
    }
    // Prepend the executable's URI to the distributed cache file list so
    // that it is shipped to every task.
    URI[] fileCache = DistributedCache.getCacheFiles(conf);
    if (fileCache == null) {
      fileCache = new URI[1];
    } else {
      URI[] tmp = new URI[fileCache.length + 1];
      System.arraycopy(fileCache, 0, tmp, 1, fileCache.length);
      fileCache = tmp;
    }
    try {
      fileCache[0] = new URI(exec);
    } catch (URISyntaxException e) {
      throw new IOException("Problem parsing executable URI " + exec, e);
    }
    DistributedCache.setCacheFiles(fileCache, conf);
  }

  /**
   * A command line parser for the CLI-based Pipes job submitter.
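   * <p>A typical invocation it is meant to parse (paths and the reduce
   * count are illustrative):
   * <pre>{@code
   * bin/hadoop pipes -input /user/me/in -output /user/me/out \
   *     -program hdfs:/user/me/bin/my-pipes-binary -reduces 2
   * }</pre>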
   */
  static class CommandLineParser {
    private Options options = new Options();

    void addOption(String longName, boolean required, String description,
                   String paramName) {
      Option option = OptionBuilder.withArgName(paramName)
          .hasArgs(1)
          .withDescription(description)
          .isRequired(required)
          .create(longName);
      options.addOption(option);
    }

    void addArgument(String name, boolean required, String description) {
      Option option = OptionBuilder.withArgName(name)
          .hasArgs(1)
          .withDescription(description)
          .isRequired(required)
          .create();
      options.addOption(option);
    }

    Parser createParser() {
      Parser result = new BasicParser();
      return result;
    }

    void printUsage() {
      // The CLI package should do this for us, but I can't figure out how
      // to make it print something reasonable.
      System.out.println("bin/hadoop pipes");
      System.out.println("  [-input <path>]            // Input directory");
      System.out.println("  [-output <path>]           // Output directory");
      System.out.println("  [-jar <jar file>]          // jar filename");
      System.out.println("  [-inputformat <class>]     // InputFormat class");
      System.out.println("  [-map <class>]             // Java Map class");
      System.out.println("  [-partitioner <class>]     // Java Partitioner");
      System.out.println("  [-reduce <class>]          // Java Reduce class");
      System.out.println("  [-writer <class>]          // Java RecordWriter");
      System.out.println("  [-program <executable>]    // executable URI");
      System.out.println("  [-reduces <num>]           // number of reduces");
      System.out.println("  [-lazyOutput <true/false>] // createOutputLazily");
      System.out.println();
      GenericOptionsParser.printGenericCommandUsage(System.out);
    }
  }

  private static <InterfaceType>
  Class<? extends InterfaceType> getClass(CommandLine cl, String key,
                                          JobConf conf,
                                          Class<InterfaceType> cls
                                         ) throws ClassNotFoundException {
    return conf.getClassByName(cl.getOptionValue(key)).asSubclass(cls);
  }

  @Override
  public int run(String[] args) throws Exception {
    CommandLineParser cli = new CommandLineParser();
    if (args.length == 0) {
      cli.printUsage();
      return 1;
    }
    cli.addOption("input", false, "input path to the maps", "path");
    cli.addOption("output", false, "output path from the reduces", "path");

    cli.addOption("jar", false, "job jar file", "path");
    cli.addOption("inputformat", false, "java classname of InputFormat",
                  "class");
    //cli.addArgument("javareader", false, "is the RecordReader in Java");
    cli.addOption("map", false, "java classname of Mapper", "class");
    cli.addOption("partitioner", false, "java classname of Partitioner",
                  "class");
    cli.addOption("reduce", false, "java classname of Reducer", "class");
    cli.addOption("writer", false, "java classname of OutputFormat", "class");
    cli.addOption("program", false, "URI to application executable",
                  "executable");
    cli.addOption("reduces", false, "number of reduces", "num");
    cli.addOption("jobconf", false,
        "\"n1=v1,n2=v2,..\" (Deprecated) Optional. Add or override a " +
        "JobConf property.",
        "key=val");
    cli.addOption("lazyOutput", false, "Optional. Create output lazily",
                  "boolean");
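
    // Generic Hadoop options (-D, -fs, -files, -libjars, ...) are consumed
    // first by GenericOptionsParser; only the remaining pipes-specific
    // arguments are handed to the commons-cli parser below.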
    Parser parser = cli.createParser();
    try {

      GenericOptionsParser genericParser =
          new GenericOptionsParser(getConf(), args);
      CommandLine results =
          parser.parse(cli.options, genericParser.getRemainingArgs());

      JobConf job = new JobConf(getConf());

      if (results.hasOption("input")) {
        FileInputFormat.setInputPaths(job, results.getOptionValue("input"));
      }
      if (results.hasOption("output")) {
        FileOutputFormat.setOutputPath(job,
            new Path(results.getOptionValue("output")));
      }
      if (results.hasOption("jar")) {
        job.setJar(results.getOptionValue("jar"));
      }
      if (results.hasOption("inputformat")) {
        setIsJavaRecordReader(job, true);
        job.setInputFormat(getClass(results, "inputformat", job,
                                    InputFormat.class));
      }
      if (results.hasOption("javareader")) {
        setIsJavaRecordReader(job, true);
      }
      if (results.hasOption("map")) {
        setIsJavaMapper(job, true);
        job.setMapperClass(getClass(results, "map", job, Mapper.class));
      }
      if (results.hasOption("partitioner")) {
        job.setPartitionerClass(getClass(results, "partitioner", job,
                                         Partitioner.class));
      }
      if (results.hasOption("reduce")) {
        setIsJavaReducer(job, true);
        job.setReducerClass(getClass(results, "reduce", job, Reducer.class));
      }
      if (results.hasOption("reduces")) {
        job.setNumReduceTasks(Integer.parseInt(
            results.getOptionValue("reduces")));
      }
      if (results.hasOption("writer")) {
        setIsJavaRecordWriter(job, true);
        job.setOutputFormat(getClass(results, "writer", job,
                                     OutputFormat.class));
      }

      if (results.hasOption("lazyOutput")) {
        if (Boolean.parseBoolean(results.getOptionValue("lazyOutput"))) {
          LazyOutputFormat.setOutputFormatClass(job,
              job.getOutputFormat().getClass());
        }
      }

      if (results.hasOption("program")) {
        setExecutable(job, results.getOptionValue("program"));
      }
      if (results.hasOption("jobconf")) {
        LOG.warn("-jobconf option is deprecated, please use -D instead.");
        String options = results.getOptionValue("jobconf");
        StringTokenizer tokenizer = new StringTokenizer(options, ",");
        while (tokenizer.hasMoreTokens()) {
          String keyVal = tokenizer.nextToken().trim();
          // split on the first '=' only, so values may themselves contain '='
          String[] keyValSplit = keyVal.split("=", 2);
          job.set(keyValSplit[0], keyValSplit[1]);
        }
      }
      // if they gave us a jar file, include it into the class path
      String jarFile = job.getJar();
      if (jarFile != null) {
        final URL[] urls = new URL[]{ FileSystem.getLocal(job).
            pathToFile(new Path(jarFile)).toURI().toURL() };
        // FindBugs complains that creating a URLClassLoader should be
        // in a doPrivileged() block.
        ClassLoader loader =
            AccessController.doPrivileged(
                new PrivilegedAction<ClassLoader>() {
                  public ClassLoader run() {
                    return new URLClassLoader(urls);
                  }
                });
        job.setClassLoader(loader);
      }

      runJob(job);
      return 0;
    } catch (ParseException pe) {
      LOG.info("Error : " + pe);
      cli.printUsage();
      return 1;
    }

  }

  /**
   * Submit a pipes job based on the command line arguments.
   * @param args the command line arguments
   */
  public static void main(String[] args) throws Exception {
    int exitCode = new Submitter().run(args);
    System.exit(exitCode);
  }

}