/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.lib.service.hadoop;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.lib.server.BaseService;
import org.apache.hadoop.lib.server.ServiceException;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.service.FileSystemAccessException;
import org.apache.hadoop.lib.service.Instrumentation;
import org.apache.hadoop.lib.service.Scheduler;
import org.apache.hadoop.lib.util.Check;
import org.apache.hadoop.lib.util.ConfigurationUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@InterfaceAudience.Private
public class FileSystemAccessService extends BaseService implements FileSystemAccess {
  private static final Logger LOG = LoggerFactory.getLogger(FileSystemAccessService.class);

  public static final String PREFIX = "hadoop";

  private static final String INSTRUMENTATION_GROUP = "hadoop";

  public static final String AUTHENTICATION_TYPE = "authentication.type";
  public static final String KERBEROS_KEYTAB = "authentication.kerberos.keytab";
  public static final String KERBEROS_PRINCIPAL = "authentication.kerberos.principal";
  public static final String FS_CACHE_PURGE_FREQUENCY = "filesystem.cache.purge.frequency";
  public static final String FS_CACHE_PURGE_TIMEOUT = "filesystem.cache.purge.timeout";

  public static final String NAME_NODE_WHITELIST = "name.node.whitelist";

  public static final String HADOOP_CONF_DIR = "config.dir";

  private static final String[] HADOOP_CONF_FILES = {"core-site.xml", "hdfs-site.xml"};

  private static final String FILE_SYSTEM_SERVICE_CREATED = "FileSystemAccessService.created";

  private static class CachedFileSystem {
    private FileSystem fs;
    private long lastUse;
    private long timeout;
    private int count;

    public CachedFileSystem(long timeout) {
      this.timeout = timeout;
      lastUse = -1;
      count = 0;
    }
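    /*
     * Reference-counting lifecycle of a cache entry (an illustrative sketch;
     * 'cached', 'conf' and the timeout value are hypothetical):
     *
     *   CachedFileSystem cached = new CachedFileSystem(60000L);
     *   FileSystem fs = cached.getFileSystem(conf); // count 0 -> 1, opens fs lazily
     *   // ... use fs ...
     *   cached.release();                           // count 1 -> 0, stamps lastUse
     *   // later, from the purger thread:
     *   cached.purgeIfIdle();                       // closes fs once idle longer than timeout
     */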
    synchronized FileSystem getFileSystem(Configuration conf)
      throws IOException {
      if (fs == null) {
        fs = FileSystem.get(conf);
      }
      lastUse = -1;
      count++;
      return fs;
    }

    synchronized void release() throws IOException {
      count--;
      if (count == 0) {
        if (timeout == 0) {
          fs.close();
          fs = null;
          lastUse = -1;
        }
        else {
          lastUse = System.currentTimeMillis();
        }
      }
    }

    // To avoid race conditions while adding/removing entries from the map
    // cache, an entry remains in the cache forever; only its filesystem is
    // closed/reopened based on utilization. Worst-case scenario, the penalty
    // we pay is that the number of entries in the cache equals the total
    // number of users in HDFS (which seems a reasonable overhead).
    synchronized boolean purgeIfIdle() throws IOException {
      boolean ret = false;
      if (count == 0 && lastUse != -1 &&
          (System.currentTimeMillis() - lastUse) > timeout) {
        fs.close();
        fs = null;
        lastUse = -1;
        ret = true;
      }
      return ret;
    }

  }

  public FileSystemAccessService() {
    super(PREFIX);
  }

  private Collection<String> nameNodeWhitelist;

  Configuration serviceHadoopConf;

  private AtomicInteger unmanagedFileSystems = new AtomicInteger();

  private ConcurrentHashMap<String, CachedFileSystem> fsCache =
    new ConcurrentHashMap<String, CachedFileSystem>();

  private long purgeTimeout;

  @Override
  protected void init() throws ServiceException {
    LOG.info("Using FileSystemAccess JARs version [{}]", VersionInfo.getVersion());
    String security = getServiceConfig().get(AUTHENTICATION_TYPE, "simple").trim();
    if (security.equals("kerberos")) {
      String defaultName = getServer().getName();
      String keytab = System.getProperty("user.home") + "/" + defaultName + ".keytab";
      keytab = getServiceConfig().get(KERBEROS_KEYTAB, keytab).trim();
      if (keytab.length() == 0) {
        throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_KEYTAB);
      }
      String principal = defaultName + "/localhost@LOCALHOST";
      principal = getServiceConfig().get(KERBEROS_PRINCIPAL, principal).trim();
      if (principal.length() == 0) {
        throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_PRINCIPAL);
      }
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "kerberos");
      UserGroupInformation.setConfiguration(conf);
      try {
        UserGroupInformation.loginUserFromKeytab(principal, keytab);
      } catch (IOException ex) {
        throw new ServiceException(FileSystemAccessException.ERROR.H02, ex.getMessage(), ex);
      }
      LOG.info("Using FileSystemAccess Kerberos authentication, principal [{}] keytab [{}]", principal, keytab);
    } else if (security.equals("simple")) {
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "simple");
      UserGroupInformation.setConfiguration(conf);
      LOG.info("Using FileSystemAccess simple/pseudo authentication, principal [{}]", System.getProperty("user.name"));
    } else {
      throw new ServiceException(FileSystemAccessException.ERROR.H09, security);
    }

    String hadoopConfDirProp = getServiceConfig().get(HADOOP_CONF_DIR, getServer().getConfigDir());
    // getAbsoluteFile() never returns null, so no null-check fallback is needed here.
    File hadoopConfDir = new File(hadoopConfDirProp).getAbsoluteFile();
    if (!hadoopConfDir.exists()) {
      throw new ServiceException(FileSystemAccessException.ERROR.H10, hadoopConfDir);
    }
    try {
      serviceHadoopConf = loadHadoopConf(hadoopConfDir);
    } catch (IOException ex) {
      throw new ServiceException(FileSystemAccessException.ERROR.H11, ex.toString(), ex);
    }

    LOG.debug("FileSystemAccess FileSystem configuration:");
    for (Map.Entry<String, String> entry : serviceHadoopConf) {
      LOG.debug(" {} = {}", entry.getKey(), entry.getValue());
    }
    setRequiredServiceHadoopConf(serviceHadoopConf);

    nameNodeWhitelist = toLowerCase(getServiceConfig().getTrimmedStringCollection(NAME_NODE_WHITELIST));
  }
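  /*
   * Illustrative service configuration consumed by init() above (a sketch;
   * paths, principal and realm are hypothetical, and the keys are shown
   * relative to the "hadoop" PREFIX -- the exact fully-qualified property
   * names depend on how BaseService/Server resolve them):
   *
   *   hadoop.authentication.type=kerberos
   *   hadoop.authentication.kerberos.keytab=/etc/security/keytabs/service.keytab
   *   hadoop.authentication.kerberos.principal=service/host.example.com@EXAMPLE.COM
   *   hadoop.config.dir=/etc/hadoop/conf
   */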
  private Configuration loadHadoopConf(File dir) throws IOException {
    Configuration hadoopConf = new Configuration(false);
    for (String file : HADOOP_CONF_FILES) {
      File f = new File(dir, file);
      if (f.exists()) {
        hadoopConf.addResource(new Path(f.getAbsolutePath()));
      }
    }
    return hadoopConf;
  }

  @Override
  public void postInit() throws ServiceException {
    super.postInit();
    Instrumentation instrumentation = getServer().get(Instrumentation.class);
    instrumentation.addVariable(INSTRUMENTATION_GROUP, "unmanaged.fs", new Instrumentation.Variable<Integer>() {
      @Override
      public Integer getValue() {
        return unmanagedFileSystems.get();
      }
    });
    instrumentation.addSampler(INSTRUMENTATION_GROUP, "unmanaged.fs", 60, new Instrumentation.Variable<Long>() {
      @Override
      public Long getValue() {
        return (long) unmanagedFileSystems.get();
      }
    });
    Scheduler scheduler = getServer().get(Scheduler.class);
    int purgeInterval = getServiceConfig().getInt(FS_CACHE_PURGE_FREQUENCY, 60);
    purgeTimeout = getServiceConfig().getLong(FS_CACHE_PURGE_TIMEOUT, 60);
    purgeTimeout = (purgeTimeout > 0) ? purgeTimeout : 0;
    if (purgeTimeout > 0) {
      scheduler.schedule(new FileSystemCachePurger(),
                         purgeInterval, purgeInterval, TimeUnit.SECONDS);
    }
  }

  private class FileSystemCachePurger implements Runnable {

    @Override
    public void run() {
      int count = 0;
      for (CachedFileSystem cacheFs : fsCache.values()) {
        try {
          count += cacheFs.purgeIfIdle() ? 1 : 0;
        } catch (Throwable ex) {
          LOG.warn("Error while purging filesystem, " + ex.toString(), ex);
        }
      }
      LOG.debug("Purged [{}] filesystem instances", count);
    }
  }
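  /*
   * Purger tuning consumed by postInit() above (a sketch; the values are
   * hypothetical). The frequency is in seconds (it is handed to the Scheduler
   * with TimeUnit.SECONDS), while the timeout is compared against
   * System.currentTimeMillis() deltas in CachedFileSystem.purgeIfIdle():
   *
   *   hadoop.filesystem.cache.purge.frequency=120
   *   hadoop.filesystem.cache.purge.timeout=60000
   */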
  private Set<String> toLowerCase(Collection<String> collection) {
    Set<String> set = new HashSet<String>();
    for (String value : collection) {
      set.add(value.toLowerCase());
    }
    return set;
  }

  @Override
  public Class getInterface() {
    return FileSystemAccess.class;
  }

  @Override
  public Class[] getServiceDependencies() {
    return new Class[]{Instrumentation.class, Scheduler.class};
  }

  protected UserGroupInformation getUGI(String user) throws IOException {
    return UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
  }

  // Hadoop's built-in FileSystem cache must be disabled so this service can
  // safely close per-user instances from its own cache.
  protected void setRequiredServiceHadoopConf(Configuration conf) {
    conf.set("fs.hdfs.impl.disable.cache", "true");
  }

  private static final String HTTPFS_FS_USER = "httpfs.fs.user";

  protected FileSystem createFileSystem(Configuration namenodeConf)
    throws IOException {
    String user = UserGroupInformation.getCurrentUser().getShortUserName();
    CachedFileSystem newCachedFS = new CachedFileSystem(purgeTimeout);
    // putIfAbsent resolves races between concurrent requests for the same user.
    CachedFileSystem cachedFS = fsCache.putIfAbsent(user, newCachedFS);
    if (cachedFS == null) {
      cachedFS = newCachedFS;
    }
    Configuration conf = new Configuration(namenodeConf);
    conf.set(HTTPFS_FS_USER, user);
    return cachedFS.getFileSystem(conf);
  }

  protected void closeFileSystem(FileSystem fs) throws IOException {
    if (fsCache.containsKey(fs.getConf().get(HTTPFS_FS_USER))) {
      fsCache.get(fs.getConf().get(HTTPFS_FS_USER)).release();
    }
  }

  protected void validateNamenode(String namenode) throws FileSystemAccessException {
    if (nameNodeWhitelist.size() > 0 && !nameNodeWhitelist.contains("*")) {
      if (!nameNodeWhitelist.contains(namenode.toLowerCase())) {
        throw new FileSystemAccessException(FileSystemAccessException.ERROR.H05, namenode, "not in whitelist");
      }
    }
  }

  protected void checkNameNodeHealth(FileSystem fileSystem) throws FileSystemAccessException {
  }

  @Override
  public <T> T execute(String user, final Configuration conf, final FileSystemExecutor<T> executor)
    throws FileSystemAccessException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    Check.notNull(executor, "executor");
    if (!conf.getBoolean(FILE_SYSTEM_SERVICE_CREATED, false)) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H04);
    }
    if (conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) == null ||
        conf.getTrimmed(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY).length() == 0) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H06,
                                          CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
    }
    try {
      validateNamenode(
        new URI(conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<T>() {
        @Override
        public T run() throws Exception {
          FileSystem fs = createFileSystem(conf);
          Instrumentation instrumentation = getServer().get(Instrumentation.class);
          Instrumentation.Cron cron = instrumentation.createCron();
          try {
            checkNameNodeHealth(fs);
            cron.start();
            return executor.execute(fs);
          } finally {
            cron.stop();
            instrumentation.addCron(INSTRUMENTATION_GROUP, executor.getClass().getSimpleName(), cron);
            closeFileSystem(fs);
          }
        }
      });
    } catch (FileSystemAccessException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H03, ex);
    }
  }
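  /*
   * Illustrative use of execute() above (a sketch; the user name, path and
   * FileStatus operation are hypothetical):
   *
   *   FileSystemAccess access = server.get(FileSystemAccess.class);
   *   Configuration conf = access.getFileSystemConfiguration();
   *   FileStatus status = access.execute("alice", conf,
   *     new FileSystemAccess.FileSystemExecutor<FileStatus>() {
   *       @Override
   *       public FileStatus execute(FileSystem fs) throws IOException {
   *         return fs.getFileStatus(new Path("/user/alice"));
   *       }
   *     });
   *
   * The FileSystem comes from the per-user cache, the operation is timed with
   * an Instrumentation.Cron, and the FileSystem is released in the finally
   * block even if the executor throws.
   */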
  public FileSystem createFileSystemInternal(String user, final Configuration conf)
    throws IOException, FileSystemAccessException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    if (!conf.getBoolean(FILE_SYSTEM_SERVICE_CREATED, false)) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H04);
    }
    try {
      validateNamenode(
        new URI(conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
        @Override
        public FileSystem run() throws Exception {
          return createFileSystem(conf);
        }
      });
    } catch (IOException ex) {
      throw ex;
    } catch (FileSystemAccessException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H08, ex.getMessage(), ex);
    }
  }

  @Override
  public FileSystem createFileSystem(String user, final Configuration conf) throws IOException,
    FileSystemAccessException {
    unmanagedFileSystems.incrementAndGet();
    return createFileSystemInternal(user, conf);
  }

  @Override
  public void releaseFileSystem(FileSystem fs) throws IOException {
    unmanagedFileSystems.decrementAndGet();
    closeFileSystem(fs);
  }

  @Override
  public Configuration getFileSystemConfiguration() {
    Configuration conf = new Configuration(true);
    ConfigurationUtils.copy(serviceHadoopConf, conf);
    conf.setBoolean(FILE_SYSTEM_SERVICE_CREATED, true);
    return conf;
  }

}
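/*
 * Illustrative use of the unmanaged FileSystem API above (a sketch; 'access'
 * and the user name are hypothetical). Unlike execute(), callers own the
 * lifecycle and must pair createFileSystem() with releaseFileSystem():
 *
 *   FileSystem fs = access.createFileSystem("alice",
 *       access.getFileSystemConfiguration());
 *   try {
 *     // long-lived use of fs, e.g. streaming a large file
 *   } finally {
 *     access.releaseFileSystem(fs);
 *   }
 */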