/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;

/**
 * Provides a way to access information about the map/reduce cluster.
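 *
 * <p>A minimal usage sketch (exception handling omitted; this is just one
 * possible way to use the API):
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Cluster cluster = new Cluster(conf);
 * try {
 *   System.out.println("JobTracker status: " + cluster.getJobTrackerStatus());
 *   for (JobStatus status : cluster.getAllJobStatuses()) {
 *     System.out.println(status.getJobID());
 *   }
 * } finally {
 *   cluster.close();
 * }
 * }</pre>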
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cluster {

  @InterfaceStability.Evolving
  public static enum JobTrackerStatus {INITIALIZING, RUNNING};

  private ClientProtocolProvider clientProtocolProvider;
  private ClientProtocol client;
  private UserGroupInformation ugi;
  private Configuration conf;
  private FileSystem fs = null;
  private Path sysDir = null;
  private Path stagingAreaDir = null;
  private Path jobHistoryDir = null;
  private static final Log LOG = LogFactory.getLog(Cluster.class);

  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);

  static {
    ConfigUtil.loadResources();
  }

  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }

  /**
   * Locates a {@link ClientProtocolProvider} via {@link ServiceLoader} and
   * asks it to create the {@link ClientProtocol} client. Providers are tried
   * in the order the service loader returns them; the first provider that
   * returns a non-null protocol is used. If no provider succeeds, an
   * IOException is thrown.
   */
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider: "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null;
        try {
          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        }
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the corresponding server addresses.");
    }
  }

  ClientProtocol getClient() {
    return client;
  }

  Configuration getConf() {
    return conf;
  }

  /**
   * Close the <code>Cluster</code>.
   */
  public synchronized void close() throws IOException {
    clientProtocolProvider.close(client);
  }

  private Job[] getJobs(JobStatus[] stats) throws IOException {
    List<Job> jobs = new ArrayList<Job>();
    for (JobStatus stat : stats) {
      jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
    }
    return jobs.toArray(new Job[0]);
  }

  /**
   * Get the file system where job-specific files are stored.
   *
   * @return the {@link FileSystem} where job-specific files are stored
   * @throws IOException
   * @throws InterruptedException
   */
  public synchronized FileSystem getFileSystem()
      throws IOException, InterruptedException {
    if (this.fs == null) {
      try {
        this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
          public FileSystem run() throws IOException, InterruptedException {
            final Path sysDir = new Path(client.getSystemDir());
            return sysDir.getFileSystem(getConf());
          }
        });
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
    return fs;
  }

  /**
   * Get the job corresponding to the given job id.
   *
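   * <p>A possible lookup (the {@code cluster} instance and the job id string
   * below are illustrative):
   * <pre>{@code
   * Job job = cluster.getJob(JobID.forName("job_1234567890123_0001"));
   * if (job != null) {
   *   System.out.println(job.getJobName());
   * }
   * }</pre>
   *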
   * @param jobId the job id
   * @return the {@link Job}, or <code>null</code> if no such job exists
   * @throws IOException
   * @throws InterruptedException
   */
  public Job getJob(JobID jobId) throws IOException, InterruptedException {
    JobStatus status = client.getJobStatus(jobId);
    if (status != null) {
      return Job.getInstance(this, status, new JobConf(status.getJobFile()));
    }
    return null;
  }

  /**
   * Get all the queues in the cluster.
   *
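   * <p>A small sketch that prints each queue name (assumes
   * {@link QueueInfo#getQueueName()}; the {@code cluster} instance is
   * illustrative):
   * <pre>{@code
   * for (QueueInfo queue : cluster.getQueues()) {
   *   System.out.println(queue.getQueueName());
   * }
   * }</pre>
   *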
   * @return array of {@link QueueInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo[] getQueues() throws IOException, InterruptedException {
    return client.getQueues();
  }

  /**
   * Get queue information for the specified name.
   *
   * @param name the queue name
   * @return the {@link QueueInfo} for the named queue
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo getQueue(String name)
      throws IOException, InterruptedException {
    return client.getQueue(name);
  }

  /**
   * Get log parameters for the specified jobID or taskAttemptID.
   * @param jobID the job id.
   * @param taskAttemptID the task attempt id. Optional.
   * @return the LogParams
   * @throws IOException
   * @throws InterruptedException
   */
  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
      throws IOException, InterruptedException {
    return client.getLogFileParams(jobID, taskAttemptID);
  }

  /**
   * Get the current cluster status.
   *
   * @return the {@link ClusterMetrics} for the cluster
   * @throws IOException
   * @throws InterruptedException
   */
  public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
    return client.getClusterMetrics();
  }

  /**
   * Get all active trackers in the cluster.
   *
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public TaskTrackerInfo[] getActiveTaskTrackers()
      throws IOException, InterruptedException {
    return client.getActiveTrackers();
  }

  /**
   * Get blacklisted trackers.
   *
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public TaskTrackerInfo[] getBlackListedTaskTrackers()
      throws IOException, InterruptedException {
    return client.getBlacklistedTrackers();
  }

  /**
   * Get all the jobs in the cluster.
   *
   * @return array of {@link Job}
   * @throws IOException
   * @throws InterruptedException
   * @deprecated Use {@link #getAllJobStatuses()} instead.
   */
  @Deprecated
  public Job[] getAllJobs() throws IOException, InterruptedException {
    return getJobs(client.getAllJobs());
  }

  /**
   * Get job status for all jobs in the cluster.
   * @return job status for all jobs in the cluster
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
    return client.getAllJobs();
  }

  /**
   * Grab the jobtracker system directory path where
   * job-specific files will be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() throws IOException, InterruptedException {
    if (sysDir == null) {
      sysDir = new Path(client.getSystemDir());
    }
    return sysDir;
  }

  /**
   * Grab the jobtracker's view of the staging directory path where
   * job-specific files will be placed.
   *
   * @return the staging directory where job-specific files are to be placed.
   */
  public Path getStagingAreaDir() throws IOException, InterruptedException {
    if (stagingAreaDir == null) {
      stagingAreaDir = new Path(client.getStagingAreaDir());
    }
    return stagingAreaDir;
  }

  /**
   * Get the job history file path for a given job id. The job history file at
   * this path may or may not exist, depending on the job's completion state;
   * the file is present only for completed jobs.
   * @param jobId the JobID of the job submitted by the current user.
   * @return the file path of the job history file
   * @throws IOException
   * @throws InterruptedException
   */
  public String getJobHistoryUrl(JobID jobId) throws IOException,
      InterruptedException {
    if (jobHistoryDir == null) {
      jobHistoryDir = new Path(client.getJobHistoryDir());
    }
    return new Path(jobHistoryDir, jobId.toString() + "_"
                    + ugi.getShortUserName()).toString();
  }

  /**
   * Gets the queue ACLs for the current user.
   * @return array of {@link QueueAclsInfo} objects for the current user
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser()
      throws IOException, InterruptedException {
    return client.getQueueAclsForCurrentUser();
  }

  /**
   * Gets the root level queues.
   * @return array of {@link QueueInfo} objects
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
    return client.getRootQueues();
  }

  /**
   * Returns the immediate children of the given queue.
   * @param queueName the name of the parent queue
   * @return array of {@link QueueInfo} objects that are children of queueName
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo[] getChildQueues(String queueName)
      throws IOException, InterruptedException {
    return client.getChildQueues(queueName);
  }

  /**
   * Get the JobTracker's status.
   *
   * @return {@link JobTrackerStatus} of the JobTracker
   * @throws IOException
   * @throws InterruptedException
   */
  public JobTrackerStatus getJobTrackerStatus() throws IOException,
      InterruptedException {
    return client.getJobTrackerStatus();
  }

  /**
   * Get the tasktracker expiry interval for the cluster.
   * @return the expiry interval in msec
   * @throws IOException
   * @throws InterruptedException
   */
  public long getTaskTrackerExpiryInterval() throws IOException,
      InterruptedException {
    return client.getTaskTrackerExpiryInterval();
  }

  /**
   * Get a delegation token for the user from the JobTracker.
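   * <p>A minimal sketch ({@code "myRenewer"} is an illustrative renewer name);
   * {@link Token#renew} and {@link Token#cancel} are preferred over the
   * deprecated renew/cancel methods on this class:
   * <pre>{@code
   * Token<DelegationTokenIdentifier> token =
   *     cluster.getDelegationToken(new Text("myRenewer"));
   * long nextExpiry = token.renew(conf);
   * }</pre>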
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   * @throws InterruptedException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(Text renewer) throws IOException, InterruptedException {
    // client has already set the service
    return client.getDelegationToken(renewer);
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time
   * @throws InvalidToken
   * @throws IOException
   * @throws InterruptedException
   * @deprecated Use {@link Token#renew} instead
   */
  @Deprecated
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                   ) throws InvalidToken, IOException,
                                            InterruptedException {
    try {
      return client.renewDelegationToken(token);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(InvalidToken.class,
                                     AccessControlException.class);
    }
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException
   * @throws InterruptedException
   * @deprecated Use {@link Token#cancel} instead
   */
  @Deprecated
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws IOException,
                                             InterruptedException {
    try {
      client.cancelDelegationToken(token);
    } catch (RemoteException re) {
      throw re.unwrapRemoteException(InvalidToken.class,
                                     AccessControlException.class);
    }
  }

}