001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.lib.service.instrumentation; 020 021 import org.apache.hadoop.classification.InterfaceAudience; 022 import org.apache.hadoop.lib.server.BaseService; 023 import org.apache.hadoop.lib.server.ServiceException; 024 import org.apache.hadoop.lib.service.Instrumentation; 025 import org.apache.hadoop.lib.service.Scheduler; 026 import org.apache.hadoop.util.Time; 027 import org.json.simple.JSONAware; 028 import org.json.simple.JSONObject; 029 import org.json.simple.JSONStreamAware; 030 031 import java.io.IOException; 032 import java.io.Writer; 033 import java.util.ArrayList; 034 import java.util.LinkedHashMap; 035 import java.util.List; 036 import java.util.Map; 037 import java.util.concurrent.ConcurrentHashMap; 038 import java.util.concurrent.TimeUnit; 039 import java.util.concurrent.atomic.AtomicLong; 040 import java.util.concurrent.locks.Lock; 041 import java.util.concurrent.locks.ReentrantLock; 042 043 @InterfaceAudience.Private 044 public class InstrumentationService extends BaseService implements Instrumentation { 045 public static final String PREFIX = "instrumentation"; 046 public static final String CONF_TIMERS_SIZE = "timers.size"; 047 048 private int timersSize; 049 private Lock counterLock; 050 private Lock timerLock; 051 private Lock variableLock; 052 private Lock samplerLock; 053 private Map<String, Map<String, AtomicLong>> counters; 054 private Map<String, Map<String, Timer>> timers; 055 private Map<String, Map<String, VariableHolder>> variables; 056 private Map<String, Map<String, Sampler>> samplers; 057 private List<Sampler> samplersList; 058 private Map<String, Map<String, ?>> all; 059 060 public InstrumentationService() { 061 super(PREFIX); 062 } 063 064 @Override 065 @SuppressWarnings("unchecked") 066 public void init() throws ServiceException { 067 timersSize = getServiceConfig().getInt(CONF_TIMERS_SIZE, 10); 068 counterLock = new ReentrantLock(); 069 timerLock = new ReentrantLock(); 070 variableLock = new ReentrantLock(); 071 samplerLock = new ReentrantLock(); 072 Map<String, VariableHolder> jvmVariables = new ConcurrentHashMap<String, VariableHolder>(); 073 counters = new ConcurrentHashMap<String, Map<String, AtomicLong>>(); 074 timers = new ConcurrentHashMap<String, Map<String, Timer>>(); 075 variables = new ConcurrentHashMap<String, Map<String, VariableHolder>>(); 076 samplers = new ConcurrentHashMap<String, Map<String, Sampler>>(); 077 samplersList = new ArrayList<Sampler>(); 078 all = new LinkedHashMap<String, Map<String, ?>>(); 079 all.put("os-env", System.getenv()); 080 all.put("sys-props", (Map<String, ?>) (Map) System.getProperties()); 081 all.put("jvm", jvmVariables); 082 all.put("counters", (Map) counters); 083 all.put("timers", (Map) timers); 084 all.put("variables", (Map) variables); 085 all.put("samplers", (Map) samplers); 086 087 jvmVariables.put("free.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() { 088 public Long getValue() { 089 return Runtime.getRuntime().freeMemory(); 090 } 091 })); 092 jvmVariables.put("max.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() { 093 public Long getValue() { 094 return Runtime.getRuntime().maxMemory(); 095 } 096 })); 097 jvmVariables.put("total.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() { 098 public Long getValue() { 099 return Runtime.getRuntime().totalMemory(); 100 } 101 })); 102 } 103 104 @Override 105 public void postInit() throws ServiceException { 106 Scheduler scheduler = getServer().get(Scheduler.class); 107 if (scheduler != null) { 108 scheduler.schedule(new SamplersRunnable(), 0, 1, TimeUnit.SECONDS); 109 } 110 } 111 112 @Override 113 public Class getInterface() { 114 return Instrumentation.class; 115 } 116 117 @SuppressWarnings("unchecked") 118 private <T> T getToAdd(String group, String name, Class<T> klass, Lock lock, Map<String, Map<String, T>> map) { 119 boolean locked = false; 120 try { 121 Map<String, T> groupMap = map.get(group); 122 if (groupMap == null) { 123 lock.lock(); 124 locked = true; 125 groupMap = map.get(group); 126 if (groupMap == null) { 127 groupMap = new ConcurrentHashMap<String, T>(); 128 map.put(group, groupMap); 129 } 130 } 131 T element = groupMap.get(name); 132 if (element == null) { 133 if (!locked) { 134 lock.lock(); 135 locked = true; 136 } 137 element = groupMap.get(name); 138 if (element == null) { 139 try { 140 if (klass == Timer.class) { 141 element = (T) new Timer(timersSize); 142 } else { 143 element = klass.newInstance(); 144 } 145 } catch (Exception ex) { 146 throw new RuntimeException(ex); 147 } 148 groupMap.put(name, element); 149 } 150 } 151 return element; 152 } finally { 153 if (locked) { 154 lock.unlock(); 155 } 156 } 157 } 158 159 static class Cron implements Instrumentation.Cron { 160 long start; 161 long lapStart; 162 long own; 163 long total; 164 165 public Cron start() { 166 if (total != 0) { 167 throw new IllegalStateException("Cron already used"); 168 } 169 if (start == 0) { 170 start = Time.now(); 171 lapStart = start; 172 } else if (lapStart == 0) { 173 lapStart = Time.now(); 174 } 175 return this; 176 } 177 178 public Cron stop() { 179 if (total != 0) { 180 throw new IllegalStateException("Cron already used"); 181 } 182 if (lapStart > 0) { 183 own += Time.now() - lapStart; 184 lapStart = 0; 185 } 186 return this; 187 } 188 189 void end() { 190 stop(); 191 total = Time.now() - start; 192 } 193 194 } 195 196 static class Timer implements JSONAware, JSONStreamAware { 197 static final int LAST_TOTAL = 0; 198 static final int LAST_OWN = 1; 199 static final int AVG_TOTAL = 2; 200 static final int AVG_OWN = 3; 201 202 Lock lock = new ReentrantLock(); 203 private long[] own; 204 private long[] total; 205 private int last; 206 private boolean full; 207 private int size; 208 209 public Timer(int size) { 210 this.size = size; 211 own = new long[size]; 212 total = new long[size]; 213 for (int i = 0; i < size; i++) { 214 own[i] = -1; 215 total[i] = -1; 216 } 217 last = -1; 218 } 219 220 long[] getValues() { 221 lock.lock(); 222 try { 223 long[] values = new long[4]; 224 values[LAST_TOTAL] = total[last]; 225 values[LAST_OWN] = own[last]; 226 int limit = (full) ? size : (last + 1); 227 for (int i = 0; i < limit; i++) { 228 values[AVG_TOTAL] += total[i]; 229 values[AVG_OWN] += own[i]; 230 } 231 values[AVG_TOTAL] = values[AVG_TOTAL] / limit; 232 values[AVG_OWN] = values[AVG_OWN] / limit; 233 return values; 234 } finally { 235 lock.unlock(); 236 } 237 } 238 239 void addCron(Cron cron) { 240 cron.end(); 241 lock.lock(); 242 try { 243 last = (last + 1) % size; 244 full = full || last == (size - 1); 245 total[last] = cron.total; 246 own[last] = cron.own; 247 } finally { 248 lock.unlock(); 249 } 250 } 251 252 @SuppressWarnings("unchecked") 253 private JSONObject getJSON() { 254 long[] values = getValues(); 255 JSONObject json = new JSONObject(); 256 json.put("lastTotal", values[0]); 257 json.put("lastOwn", values[1]); 258 json.put("avgTotal", values[2]); 259 json.put("avgOwn", values[3]); 260 return json; 261 } 262 263 @Override 264 public String toJSONString() { 265 return getJSON().toJSONString(); 266 } 267 268 @Override 269 public void writeJSONString(Writer out) throws IOException { 270 getJSON().writeJSONString(out); 271 } 272 273 } 274 275 @Override 276 public Cron createCron() { 277 return new Cron(); 278 } 279 280 @Override 281 public void incr(String group, String name, long count) { 282 AtomicLong counter = getToAdd(group, name, AtomicLong.class, counterLock, counters); 283 counter.addAndGet(count); 284 } 285 286 @Override 287 public void addCron(String group, String name, Instrumentation.Cron cron) { 288 Timer timer = getToAdd(group, name, Timer.class, timerLock, timers); 289 timer.addCron((Cron) cron); 290 } 291 292 static class VariableHolder<E> implements JSONAware, JSONStreamAware { 293 Variable<E> var; 294 295 public VariableHolder() { 296 } 297 298 public VariableHolder(Variable<E> var) { 299 this.var = var; 300 } 301 302 @SuppressWarnings("unchecked") 303 private JSONObject getJSON() { 304 JSONObject json = new JSONObject(); 305 json.put("value", var.getValue()); 306 return json; 307 } 308 309 @Override 310 public String toJSONString() { 311 return getJSON().toJSONString(); 312 } 313 314 @Override 315 public void writeJSONString(Writer out) throws IOException { 316 out.write(toJSONString()); 317 } 318 319 } 320 321 @Override 322 public void addVariable(String group, String name, Variable<?> variable) { 323 VariableHolder holder = getToAdd(group, name, VariableHolder.class, variableLock, variables); 324 holder.var = variable; 325 } 326 327 static class Sampler implements JSONAware, JSONStreamAware { 328 Variable<Long> variable; 329 long[] values; 330 private AtomicLong sum; 331 private int last; 332 private boolean full; 333 334 void init(int size, Variable<Long> variable) { 335 this.variable = variable; 336 values = new long[size]; 337 sum = new AtomicLong(); 338 last = 0; 339 } 340 341 void sample() { 342 int index = last; 343 long valueGoingOut = values[last]; 344 full = full || last == (values.length - 1); 345 last = (last + 1) % values.length; 346 values[index] = variable.getValue(); 347 sum.addAndGet(-valueGoingOut + values[index]); 348 } 349 350 double getRate() { 351 return ((double) sum.get()) / ((full) ? values.length : ((last == 0) ? 1 : last)); 352 } 353 354 @SuppressWarnings("unchecked") 355 private JSONObject getJSON() { 356 JSONObject json = new JSONObject(); 357 json.put("sampler", getRate()); 358 json.put("size", (full) ? values.length : last); 359 return json; 360 } 361 362 @Override 363 public String toJSONString() { 364 return getJSON().toJSONString(); 365 } 366 367 @Override 368 public void writeJSONString(Writer out) throws IOException { 369 out.write(toJSONString()); 370 } 371 } 372 373 @Override 374 public void addSampler(String group, String name, int samplingSize, Variable<Long> variable) { 375 Sampler sampler = getToAdd(group, name, Sampler.class, samplerLock, samplers); 376 samplerLock.lock(); 377 try { 378 sampler.init(samplingSize, variable); 379 samplersList.add(sampler); 380 } finally { 381 samplerLock.unlock(); 382 } 383 } 384 385 class SamplersRunnable implements Runnable { 386 387 @Override 388 public void run() { 389 samplerLock.lock(); 390 try { 391 for (Sampler sampler : samplersList) { 392 sampler.sample(); 393 } 394 } finally { 395 samplerLock.unlock(); 396 } 397 } 398 } 399 400 @Override 401 public Map<String, Map<String, ?>> getSnapshot() { 402 return all; 403 } 404 405 406 }