Changeset 2827
- Timestamp:
- 02/17/2009 03:42:56 PM (1 year ago)
- Files:
-
- HadoopToys/src/edu/unl/TestMovie/TestMovie.java (modified) (7 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
HadoopToys/src/edu/unl/TestMovie/TestMovie.java
r2806 r2827 18 18 import org.apache.hadoop.mapred.TextOutputFormat; 19 19 import org.apache.hadoop.mapred.*; 20 20 import org.apache.hadoop.conf.*; 21 21 public class TestMovie{ 22 22 … … 25 25 public static double TimeStep=0.002; //Time step for each simulation 26 26 public static double mass=(double)18.0; //Mass of atoms 27 public static double Kai=(double) 100; //Boundary of the simulation box27 public static double Kai=(double)2000; //Boundary of the simulation box 28 28 public static double Rc=(double)10; //Long range force cutoff radius 29 29 public static int totalatom; //Total number of atoms in simulating system … … 260 260 } 261 261 } 262 // Reduce class263 264 262 public class MDADReduce extends MapReduceBase implements Reducer<LongWritable,Text,LongWritable,Text> { 265 public void reduce(LongWritable rkey,Text rvalue,OutputCollector<LongWritable,Text>routput, Reporter reporter)throws IOException{ 266 JobConf rconf=new JobConf(); 267 String res=null; 263 264 public void reduce(LongWritable rkey, Iterator<Text> values, 265 OutputCollector<LongWritable, Text> routput, Reporter reporter) 266 throws IOException { 267 Configuration rconf=new Configuration(); 268 String res=rkey.toString(); 269 int itt=0; 268 270 FileSystem rfs=FileSystem.get(rconf); 269 Path movie=new Path(outputPath+"result.xyz"); 270 FSDataOutputStream fsos; 271 FileStatus rfstatus=new FileStatus(rfs); 272 res=rkey.toString()+rvalue.toString(); 271 Path movie=new Path(moviePath+"result.xyz"); 272 String title1=" "+totalatom+"\n"; 273 String title2="Simulation Time: t ="+"\t"+(numIteration*TimeStep)+"\n"; 274 StringTokenizer rtoken=new StringTokenizer(values.toString()); 275 while(itt<3){ 276 res=res+rtoken.nextToken(); 277 itt++; 278 } 279 280 // FileStatus rfstatus=new FileStatus(rfs); 273 281 try{ 274 282 if(!(rfs.exists(movie))){ 275 fsos=fs.creat(movie); 283 FSDataOutputStream fsos=rfs.create(movie); 284 fsos.writeUTF(title1); 285 fsos.writeUTF(title2); 286 fsos.writeUTF(res); 287 fsos.flush(); 276 288 } 277 289 else { 278 fsos=fs.open(movie); 279 } 280 OutputStream os=fsos.getWrappedStream(); 281 os.write_string(res); 282 os.flush(); 283 // ObjectOutputStream oos=new ObjectOutputStream(os); 284 // oos.writeObject(); 285 os.close(); 290 FSDataOutputStream fsos=rfs.append(movie); 291 if ((rkey.get())==1){ 292 fsos.writeUTF(title1); 293 fsos.writeUTF(title2); 294 fsos.flush(); 295 } 296 fsos.writeUTF(res); 297 fsos.flush(); 298 fsos.close(); 299 } 286 300 }catch(Exception ex){ 287 301 System.out.println("Creating Moviefile failed"); 288 302 } 289 routput.collect(rkey, rvalue);303 routput.collect(rkey,(Text) values); 290 304 } 291 305 } 292 306 293 //main() method which control the iteration and configuration of hadoop job 294 295 public static void main(String[] args) throws Exception{ 307 public static void main(String[] args) throws Exception{ 296 308 int i=0; 297 309 TestMovie t1=new TestMovie(); 298 310 String inputPath="/user/che/mdad/in"; 299 311 String outputPath="/user/che/mdad/out"; 300 312 moviePath=outputPath; 301 313 //Parameter input 302 314 … … 309 321 } 310 322 // else if ("-m".equals(args[i])){ 323 311 324 // t1.moviePath=outputPath+args[++i]; 312 }325 // } 313 326 } 314 327 … … 323 336 // conf.setCombinerClass(); 324 337 conf.setReducerClass(MDADReduce.class); 338 conf.setNumReduceTasks(1); 339 // conf.setNumTasksToExcutePerJvm(-1); 325 340 326 341 conf.set("md.iteration",Integer.toString(i)); … … 340 355 Path fileToCache= filestat.getPath(); 341 356 DistributedCache.addCacheFile(fileToCache.toUri(),conf); 342 }343 FileInputFormat.setInputPaths(conf,new Path(inputPath));344 357 FileOutputFormat.setOutputPath(conf,new Path(outputPath + Integer.toString(i))); 345 } 358 } 359 } 346 360 else{ 347 361 … … 370 384 try{ 371 385 JobClient.runJob(conf); 372 Job jCheck=new Job(conf);373 int jState=jCheck.getState();374 if (jState==4){375 i--;386 // Job jCheck=new Job(conf); 387 // int jState=jCheck.getState(); 388 // if (jState==4){ 389 // i--; 376 390 //delete the output file directory 377 }391 // } 378 392 }catch(Exception e){ 379 393 e.printStackTrace();
