001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.lib.service.scheduler;
020    
021    import org.apache.hadoop.classification.InterfaceAudience;
022    import org.apache.hadoop.lib.lang.RunnableCallable;
023    import org.apache.hadoop.lib.server.BaseService;
024    import org.apache.hadoop.lib.server.Server;
025    import org.apache.hadoop.lib.server.ServiceException;
026    import org.apache.hadoop.lib.service.Instrumentation;
027    import org.apache.hadoop.lib.service.Scheduler;
028    import org.apache.hadoop.lib.util.Check;
029    import org.apache.hadoop.util.Time;
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    import java.text.MessageFormat;
034    import java.util.concurrent.Callable;
035    import java.util.concurrent.ScheduledExecutorService;
036    import java.util.concurrent.ScheduledThreadPoolExecutor;
037    import java.util.concurrent.TimeUnit;
038    
039    @InterfaceAudience.Private
040    public class SchedulerService extends BaseService implements Scheduler {
041      private static final Logger LOG = LoggerFactory.getLogger(SchedulerService.class);
042    
043      private static final String INST_GROUP = "scheduler";
044    
045      public static final String PREFIX = "scheduler";
046    
047      public static final String CONF_THREADS = "threads";
048    
049      private ScheduledExecutorService scheduler;
050    
051      public SchedulerService() {
052        super(PREFIX);
053      }
054    
055      @Override
056      public void init() throws ServiceException {
057        int threads = getServiceConfig().getInt(CONF_THREADS, 5);
058        scheduler = new ScheduledThreadPoolExecutor(threads);
059        LOG.debug("Scheduler started");
060      }
061    
062      @Override
063      public void destroy() {
064        try {
065          long limit = Time.now() + 30 * 1000;
066          scheduler.shutdownNow();
067          while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
068            LOG.debug("Waiting for scheduler to shutdown");
069            if (Time.now() > limit) {
070              LOG.warn("Gave up waiting for scheduler to shutdown");
071              break;
072            }
073          }
074          if (scheduler.isTerminated()) {
075            LOG.debug("Scheduler shutdown");
076          }
077        } catch (InterruptedException ex) {
078          LOG.warn(ex.getMessage(), ex);
079        }
080      }
081    
082      @Override
083      public Class[] getServiceDependencies() {
084        return new Class[]{Instrumentation.class};
085      }
086    
087      @Override
088      public Class getInterface() {
089        return Scheduler.class;
090      }
091    
092      @Override
093      public void schedule(final Callable<?> callable, long delay, long interval, TimeUnit unit) {
094        Check.notNull(callable, "callable");
095        if (!scheduler.isShutdown()) {
096          LOG.debug("Scheduling callable [{}], interval [{}] seconds, delay [{}] in [{}]",
097                    new Object[]{callable, delay, interval, unit});
098          Runnable r = new Runnable() {
099            public void run() {
100              String instrName = callable.getClass().getSimpleName();
101              Instrumentation instr = getServer().get(Instrumentation.class);
102              if (getServer().getStatus() == Server.Status.HALTED) {
103                LOG.debug("Skipping [{}], server status [{}]", callable, getServer().getStatus());
104                instr.incr(INST_GROUP, instrName + ".skips", 1);
105              } else {
106                LOG.debug("Executing [{}]", callable);
107                instr.incr(INST_GROUP, instrName + ".execs", 1);
108                Instrumentation.Cron cron = instr.createCron().start();
109                try {
110                  callable.call();
111                } catch (Exception ex) {
112                  instr.incr(INST_GROUP, instrName + ".fails", 1);
113                  LOG.error("Error executing [{}], {}", new Object[]{callable, ex.getMessage(), ex});
114                } finally {
115                  instr.addCron(INST_GROUP, instrName, cron.stop());
116                }
117              }
118            }
119          };
120          scheduler.scheduleWithFixedDelay(r, delay, interval, unit);
121        } else {
122          throw new IllegalStateException(
123            MessageFormat.format("Scheduler shutting down, ignoring scheduling of [{}]", callable));
124        }
125      }
126    
127      @Override
128      public void schedule(Runnable runnable, long delay, long interval, TimeUnit unit) {
129        schedule((Callable<?>) new RunnableCallable(runnable), delay, interval, unit);
130      }
131    
132    }