001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.lib.service.instrumentation;
020    
021    import org.apache.hadoop.classification.InterfaceAudience;
022    import org.apache.hadoop.lib.server.BaseService;
023    import org.apache.hadoop.lib.server.ServiceException;
024    import org.apache.hadoop.lib.service.Instrumentation;
025    import org.apache.hadoop.lib.service.Scheduler;
026    import org.apache.hadoop.util.Time;
027    import org.json.simple.JSONAware;
028    import org.json.simple.JSONObject;
029    import org.json.simple.JSONStreamAware;
030    
031    import java.io.IOException;
032    import java.io.Writer;
033    import java.util.ArrayList;
034    import java.util.LinkedHashMap;
035    import java.util.List;
036    import java.util.Map;
037    import java.util.concurrent.ConcurrentHashMap;
038    import java.util.concurrent.TimeUnit;
039    import java.util.concurrent.atomic.AtomicLong;
040    import java.util.concurrent.locks.Lock;
041    import java.util.concurrent.locks.ReentrantLock;
042    
043    @InterfaceAudience.Private
044    public class InstrumentationService extends BaseService implements Instrumentation {
045      public static final String PREFIX = "instrumentation";
046      public static final String CONF_TIMERS_SIZE = "timers.size";
047    
048      private int timersSize;
049      private Lock counterLock;
050      private Lock timerLock;
051      private Lock variableLock;
052      private Lock samplerLock;
053      private Map<String, Map<String, AtomicLong>> counters;
054      private Map<String, Map<String, Timer>> timers;
055      private Map<String, Map<String, VariableHolder>> variables;
056      private Map<String, Map<String, Sampler>> samplers;
057      private List<Sampler> samplersList;
058      private Map<String, Map<String, ?>> all;
059    
060      public InstrumentationService() {
061        super(PREFIX);
062      }
063    
064      @Override
065      @SuppressWarnings("unchecked")
066      public void init() throws ServiceException {
067        timersSize = getServiceConfig().getInt(CONF_TIMERS_SIZE, 10);
068        counterLock = new ReentrantLock();
069        timerLock = new ReentrantLock();
070        variableLock = new ReentrantLock();
071        samplerLock = new ReentrantLock();
072        Map<String, VariableHolder> jvmVariables = new ConcurrentHashMap<String, VariableHolder>();
073        counters = new ConcurrentHashMap<String, Map<String, AtomicLong>>();
074        timers = new ConcurrentHashMap<String, Map<String, Timer>>();
075        variables = new ConcurrentHashMap<String, Map<String, VariableHolder>>();
076        samplers = new ConcurrentHashMap<String, Map<String, Sampler>>();
077        samplersList = new ArrayList<Sampler>();
078        all = new LinkedHashMap<String, Map<String, ?>>();
079        all.put("os-env", System.getenv());
080        all.put("sys-props", (Map<String, ?>) (Map) System.getProperties());
081        all.put("jvm", jvmVariables);
082        all.put("counters", (Map) counters);
083        all.put("timers", (Map) timers);
084        all.put("variables", (Map) variables);
085        all.put("samplers", (Map) samplers);
086    
087        jvmVariables.put("free.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
088          public Long getValue() {
089            return Runtime.getRuntime().freeMemory();
090          }
091        }));
092        jvmVariables.put("max.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
093          public Long getValue() {
094            return Runtime.getRuntime().maxMemory();
095          }
096        }));
097        jvmVariables.put("total.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
098          public Long getValue() {
099            return Runtime.getRuntime().totalMemory();
100          }
101        }));
102      }
103    
104      @Override
105      public void postInit() throws ServiceException {
106        Scheduler scheduler = getServer().get(Scheduler.class);
107        if (scheduler != null) {
108          scheduler.schedule(new SamplersRunnable(), 0, 1, TimeUnit.SECONDS);
109        }
110      }
111    
112      @Override
113      public Class getInterface() {
114        return Instrumentation.class;
115      }
116    
117      @SuppressWarnings("unchecked")
118      private <T> T getToAdd(String group, String name, Class<T> klass, Lock lock, Map<String, Map<String, T>> map) {
119        boolean locked = false;
120        try {
121          Map<String, T> groupMap = map.get(group);
122          if (groupMap == null) {
123            lock.lock();
124            locked = true;
125            groupMap = map.get(group);
126            if (groupMap == null) {
127              groupMap = new ConcurrentHashMap<String, T>();
128              map.put(group, groupMap);
129            }
130          }
131          T element = groupMap.get(name);
132          if (element == null) {
133            if (!locked) {
134              lock.lock();
135              locked = true;
136            }
137            element = groupMap.get(name);
138            if (element == null) {
139              try {
140                if (klass == Timer.class) {
141                  element = (T) new Timer(timersSize);
142                } else {
143                  element = klass.newInstance();
144                }
145              } catch (Exception ex) {
146                throw new RuntimeException(ex);
147              }
148              groupMap.put(name, element);
149            }
150          }
151          return element;
152        } finally {
153          if (locked) {
154            lock.unlock();
155          }
156        }
157      }
158    
159      static class Cron implements Instrumentation.Cron {
160        long start;
161        long lapStart;
162        long own;
163        long total;
164    
165        public Cron start() {
166          if (total != 0) {
167            throw new IllegalStateException("Cron already used");
168          }
169          if (start == 0) {
170            start = Time.now();
171            lapStart = start;
172          } else if (lapStart == 0) {
173            lapStart = Time.now();
174          }
175          return this;
176        }
177    
178        public Cron stop() {
179          if (total != 0) {
180            throw new IllegalStateException("Cron already used");
181          }
182          if (lapStart > 0) {
183            own += Time.now() - lapStart;
184            lapStart = 0;
185          }
186          return this;
187        }
188    
189        void end() {
190          stop();
191          total = Time.now() - start;
192        }
193    
194      }
195    
196      static class Timer implements JSONAware, JSONStreamAware {
197        static final int LAST_TOTAL = 0;
198        static final int LAST_OWN = 1;
199        static final int AVG_TOTAL = 2;
200        static final int AVG_OWN = 3;
201    
202        Lock lock = new ReentrantLock();
203        private long[] own;
204        private long[] total;
205        private int last;
206        private boolean full;
207        private int size;
208    
209        public Timer(int size) {
210          this.size = size;
211          own = new long[size];
212          total = new long[size];
213          for (int i = 0; i < size; i++) {
214            own[i] = -1;
215            total[i] = -1;
216          }
217          last = -1;
218        }
219    
220        long[] getValues() {
221          lock.lock();
222          try {
223            long[] values = new long[4];
224            values[LAST_TOTAL] = total[last];
225            values[LAST_OWN] = own[last];
226            int limit = (full) ? size : (last + 1);
227            for (int i = 0; i < limit; i++) {
228              values[AVG_TOTAL] += total[i];
229              values[AVG_OWN] += own[i];
230            }
231            values[AVG_TOTAL] = values[AVG_TOTAL] / limit;
232            values[AVG_OWN] = values[AVG_OWN] / limit;
233            return values;
234          } finally {
235            lock.unlock();
236          }
237        }
238    
239        void addCron(Cron cron) {
240          cron.end();
241          lock.lock();
242          try {
243            last = (last + 1) % size;
244            full = full || last == (size - 1);
245            total[last] = cron.total;
246            own[last] = cron.own;
247          } finally {
248            lock.unlock();
249          }
250        }
251    
252        @SuppressWarnings("unchecked")
253        private JSONObject getJSON() {
254          long[] values = getValues();
255          JSONObject json = new JSONObject();
256          json.put("lastTotal", values[0]);
257          json.put("lastOwn", values[1]);
258          json.put("avgTotal", values[2]);
259          json.put("avgOwn", values[3]);
260          return json;
261        }
262    
263        @Override
264        public String toJSONString() {
265          return getJSON().toJSONString();
266        }
267    
268        @Override
269        public void writeJSONString(Writer out) throws IOException {
270          getJSON().writeJSONString(out);
271        }
272    
273      }
274    
275      @Override
276      public Cron createCron() {
277        return new Cron();
278      }
279    
280      @Override
281      public void incr(String group, String name, long count) {
282        AtomicLong counter = getToAdd(group, name, AtomicLong.class, counterLock, counters);
283        counter.addAndGet(count);
284      }
285    
286      @Override
287      public void addCron(String group, String name, Instrumentation.Cron cron) {
288        Timer timer = getToAdd(group, name, Timer.class, timerLock, timers);
289        timer.addCron((Cron) cron);
290      }
291    
292      static class VariableHolder<E> implements JSONAware, JSONStreamAware {
293        Variable<E> var;
294    
295        public VariableHolder() {
296        }
297    
298        public VariableHolder(Variable<E> var) {
299          this.var = var;
300        }
301    
302        @SuppressWarnings("unchecked")
303        private JSONObject getJSON() {
304          JSONObject json = new JSONObject();
305          json.put("value", var.getValue());
306          return json;
307        }
308    
309        @Override
310        public String toJSONString() {
311          return getJSON().toJSONString();
312        }
313    
314        @Override
315        public void writeJSONString(Writer out) throws IOException {
316          out.write(toJSONString());
317        }
318    
319      }
320    
321      @Override
322      public void addVariable(String group, String name, Variable<?> variable) {
323        VariableHolder holder = getToAdd(group, name, VariableHolder.class, variableLock, variables);
324        holder.var = variable;
325      }
326    
327      static class Sampler implements JSONAware, JSONStreamAware {
328        Variable<Long> variable;
329        long[] values;
330        private AtomicLong sum;
331        private int last;
332        private boolean full;
333    
334        void init(int size, Variable<Long> variable) {
335          this.variable = variable;
336          values = new long[size];
337          sum = new AtomicLong();
338          last = 0;
339        }
340    
341        void sample() {
342          int index = last;
343          long valueGoingOut = values[last];
344          full = full || last == (values.length - 1);
345          last = (last + 1) % values.length;
346          values[index] = variable.getValue();
347          sum.addAndGet(-valueGoingOut + values[index]);
348        }
349    
350        double getRate() {
351          return ((double) sum.get()) / ((full) ? values.length : ((last == 0) ? 1 : last));
352        }
353    
354        @SuppressWarnings("unchecked")
355        private JSONObject getJSON() {
356          JSONObject json = new JSONObject();
357          json.put("sampler", getRate());
358          json.put("size", (full) ? values.length : last);
359          return json;
360        }
361    
362        @Override
363        public String toJSONString() {
364          return getJSON().toJSONString();
365        }
366    
367        @Override
368        public void writeJSONString(Writer out) throws IOException {
369          out.write(toJSONString());
370        }
371      }
372    
373      @Override
374      public void addSampler(String group, String name, int samplingSize, Variable<Long> variable) {
375        Sampler sampler = getToAdd(group, name, Sampler.class, samplerLock, samplers);
376        samplerLock.lock();
377        try {
378          sampler.init(samplingSize, variable);
379          samplersList.add(sampler);
380        } finally {
381          samplerLock.unlock();
382        }
383      }
384    
385      class SamplersRunnable implements Runnable {
386    
387        @Override
388        public void run() {
389          samplerLock.lock();
390          try {
391            for (Sampler sampler : samplersList) {
392              sampler.sample();
393            }
394          } finally {
395            samplerLock.unlock();
396          }
397        }
398      }
399    
400      @Override
401      public Map<String, Map<String, ?>> getSnapshot() {
402        return all;
403      }
404    
405    
406    }