/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.lib.service.hadoop;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.lib.server.BaseService;
import org.apache.hadoop.lib.server.ServiceException;
import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.service.FileSystemAccessException;
import org.apache.hadoop.lib.service.Instrumentation;
import org.apache.hadoop.lib.service.Scheduler;
import org.apache.hadoop.lib.util.Check;
import org.apache.hadoop.lib.util.ConfigurationUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

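/**
 * Service that provides per-user, reference-counted access to Hadoop
 * {@link FileSystem} instances. Instances are cached by user; idle instances
 * are closed by a background purger once they exceed a configurable timeout.
 * Authentication is either Kerberos (keytab login) or simple/pseudo,
 * selected through the service configuration.
 */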
@InterfaceAudience.Private
public class FileSystemAccessService extends BaseService implements FileSystemAccess {
  private static final Logger LOG = LoggerFactory.getLogger(FileSystemAccessService.class);

  public static final String PREFIX = "hadoop";

  private static final String INSTRUMENTATION_GROUP = "hadoop";

  public static final String AUTHENTICATION_TYPE = "authentication.type";
  public static final String KERBEROS_KEYTAB = "authentication.kerberos.keytab";
  public static final String KERBEROS_PRINCIPAL = "authentication.kerberos.principal";
  public static final String FS_CACHE_PURGE_FREQUENCY = "filesystem.cache.purge.frequency";
  public static final String FS_CACHE_PURGE_TIMEOUT = "filesystem.cache.purge.timeout";

  public static final String NAME_NODE_WHITELIST = "name.node.whitelist";

  public static final String HADOOP_CONF_DIR = "config.dir";

  private static final String[] HADOOP_CONF_FILES = {"core-site.xml", "hdfs-site.xml"};

  private static final String FILE_SYSTEM_SERVICE_CREATED = "FileSystemAccessService.created";

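  /**
   * Reference-counted holder for a single user's FileSystem. The FileSystem
   * is created lazily on first use; when the reference count drops to zero
   * it is either closed immediately (timeout == 0) or time-stamped so the
   * purger can close it once it has been idle longer than the timeout.
   */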
  private static class CachedFileSystem {
    private FileSystem fs;
    private long lastUse;
    private long timeout;
    private int count;

    public CachedFileSystem(long timeout) {
      this.timeout = timeout;
      lastUse = -1;
      count = 0;
    }

    synchronized FileSystem getFileSystem(Configuration conf)
      throws IOException {
      if (fs == null) {
        fs = FileSystem.get(conf);
      }
      lastUse = -1;
      count++;
      return fs;
    }

    synchronized void release() throws IOException {
      count--;
      if (count == 0) {
        if (timeout == 0) {
          fs.close();
          fs = null;
          lastUse = -1;
        }
        else {
          lastUse = System.currentTimeMillis();
        }
      }
    }

    // To avoid race conditions in the cache map when adding/removing entries,
    // an entry in the cache remains forever; it just closes/opens filesystems
    // based on their utilization. In the worst case, the penalty we'll
    // pay is that the number of entries in the cache will be the total
    // number of users in HDFS (which seems a reasonable overhead).
    synchronized boolean purgeIfIdle() throws IOException {
      boolean ret = false;
      if (count == 0 && lastUse != -1 &&
          (System.currentTimeMillis() - lastUse) > timeout) {
        fs.close();
        fs = null;
        lastUse = -1;
        ret = true;
      }
      return ret;
    }

  }

  public FileSystemAccessService() {
    super(PREFIX);
  }

  private Collection<String> nameNodeWhitelist;

  Configuration serviceHadoopConf;

  private AtomicInteger unmanagedFileSystems = new AtomicInteger();

  private ConcurrentHashMap<String, CachedFileSystem> fsCache =
    new ConcurrentHashMap<String, CachedFileSystem>();

  private long purgeTimeout;

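  /**
   * Initializes the service: configures Kerberos or simple authentication
   * according to the service configuration, loads the Hadoop configuration
   * files from the configured directory, and reads the NameNode whitelist.
   *
   * @throws ServiceException if the security setup or configuration loading fails.
   */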
  @Override
  protected void init() throws ServiceException {
    LOG.info("Using FileSystemAccess JARs version [{}]", VersionInfo.getVersion());
    String security = getServiceConfig().get(AUTHENTICATION_TYPE, "simple").trim();
    if (security.equals("kerberos")) {
      String defaultName = getServer().getName();
      String keytab = System.getProperty("user.home") + "/" + defaultName + ".keytab";
      keytab = getServiceConfig().get(KERBEROS_KEYTAB, keytab).trim();
      if (keytab.length() == 0) {
        throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_KEYTAB);
      }
      String principal = defaultName + "/localhost@LOCALHOST";
      principal = getServiceConfig().get(KERBEROS_PRINCIPAL, principal).trim();
      if (principal.length() == 0) {
        throw new ServiceException(FileSystemAccessException.ERROR.H01, KERBEROS_PRINCIPAL);
      }
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "kerberos");
      UserGroupInformation.setConfiguration(conf);
      try {
        UserGroupInformation.loginUserFromKeytab(principal, keytab);
      } catch (IOException ex) {
        throw new ServiceException(FileSystemAccessException.ERROR.H02, ex.getMessage(), ex);
      }
      LOG.info("Using FileSystemAccess Kerberos authentication, principal [{}] keytab [{}]", principal, keytab);
    } else if (security.equals("simple")) {
      Configuration conf = new Configuration();
      conf.set("hadoop.security.authentication", "simple");
      UserGroupInformation.setConfiguration(conf);
      LOG.info("Using FileSystemAccess simple/pseudo authentication, principal [{}]", System.getProperty("user.name"));
    } else {
      throw new ServiceException(FileSystemAccessException.ERROR.H09, security);
    }

    String hadoopConfDirProp = getServiceConfig().get(HADOOP_CONF_DIR, getServer().getConfigDir());
    File hadoopConfDir = new File(hadoopConfDirProp).getAbsoluteFile();
    if (!hadoopConfDir.exists()) {
      throw new ServiceException(FileSystemAccessException.ERROR.H10, hadoopConfDir);
    }
    try {
      serviceHadoopConf = loadHadoopConf(hadoopConfDir);
    } catch (IOException ex) {
      throw new ServiceException(FileSystemAccessException.ERROR.H11, ex.toString(), ex);
    }

    LOG.debug("FileSystemAccess FileSystem configuration:");
    for (Map.Entry<String, String> entry : serviceHadoopConf) {
      LOG.debug("  {} = {}", entry.getKey(), entry.getValue());
    }
    setRequiredServiceHadoopConf(serviceHadoopConf);

    nameNodeWhitelist = toLowerCase(getServiceConfig().getTrimmedStringCollection(NAME_NODE_WHITELIST));
  }

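  /**
   * Loads the Hadoop configuration from the given directory, adding only the
   * files listed in {@link #HADOOP_CONF_FILES} that actually exist.
   */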
  private Configuration loadHadoopConf(File dir) throws IOException {
    Configuration hadoopConf = new Configuration(false);
    for (String file : HADOOP_CONF_FILES) {
      File f = new File(dir, file);
      if (f.exists()) {
        hadoopConf.addResource(new Path(f.getAbsolutePath()));
      }
    }
    return hadoopConf;
  }

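  /**
   * Registers instrumentation for the number of unmanaged filesystems and,
   * if a positive purge timeout is configured, schedules the
   * {@link FileSystemCachePurger} to run at the configured frequency.
   */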
  @Override
  public void postInit() throws ServiceException {
    super.postInit();
    Instrumentation instrumentation = getServer().get(Instrumentation.class);
    instrumentation.addVariable(INSTRUMENTATION_GROUP, "unmanaged.fs", new Instrumentation.Variable<Integer>() {
      @Override
      public Integer getValue() {
        return unmanagedFileSystems.get();
      }
    });
    instrumentation.addSampler(INSTRUMENTATION_GROUP, "unmanaged.fs", 60, new Instrumentation.Variable<Long>() {
      @Override
      public Long getValue() {
        return (long) unmanagedFileSystems.get();
      }
    });
    Scheduler scheduler = getServer().get(Scheduler.class);
    int purgeInterval = getServiceConfig().getInt(FS_CACHE_PURGE_FREQUENCY, 60);
    purgeTimeout = getServiceConfig().getLong(FS_CACHE_PURGE_TIMEOUT, 60);
    purgeTimeout = (purgeTimeout > 0) ? purgeTimeout : 0;
    if (purgeTimeout > 0) {
      scheduler.schedule(new FileSystemCachePurger(),
                         purgeInterval, purgeInterval, TimeUnit.SECONDS);
    }
  }

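  /**
   * Background task that walks the filesystem cache and closes instances
   * that have been idle longer than the purge timeout.
   */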
  private class FileSystemCachePurger implements Runnable {

    @Override
    public void run() {
      int count = 0;
      for (CachedFileSystem cacheFs : fsCache.values()) {
        try {
          count += cacheFs.purgeIfIdle() ? 1 : 0;
        } catch (Throwable ex) {
          LOG.warn("Error while purging filesystem, " + ex.toString(), ex);
        }
      }
      LOG.debug("Purged [{}] filesystem instances", count);
    }
  }

  private Set<String> toLowerCase(Collection<String> collection) {
    Set<String> set = new HashSet<String>();
    for (String value : collection) {
      set.add(value.toLowerCase());
    }
    return set;
  }

  @Override
  public Class getInterface() {
    return FileSystemAccess.class;
  }

  @Override
  public Class[] getServiceDependencies() {
    return new Class[]{Instrumentation.class, Scheduler.class};
  }

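  /**
   * Returns a proxy-user UGI for the given user, impersonated by the
   * service's login user (the Kerberos principal, or the process user in
   * simple mode).
   */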
  protected UserGroupInformation getUGI(String user) throws IOException {
    return UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
  }

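  /**
   * Disables Hadoop's built-in FileSystem cache for the hdfs scheme; this
   * service maintains its own per-user cache, so {@code FileSystem.get()}
   * must return a fresh instance each time.
   */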
  protected void setRequiredServiceHadoopConf(Configuration conf) {
    conf.set("fs.hdfs.impl.disable.cache", "true");
  }

  private static final String HTTPFS_FS_USER = "httpfs.fs.user";

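  /**
   * Returns the cached FileSystem for the current (proxy) user, creating and
   * caching it on first access. {@code putIfAbsent} resolves the race when
   * two threads create an entry for the same user concurrently: the loser
   * discards its holder and uses the one already in the cache.
   */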
  protected FileSystem createFileSystem(Configuration namenodeConf)
    throws IOException {
    String user = UserGroupInformation.getCurrentUser().getShortUserName();
    CachedFileSystem newCachedFS = new CachedFileSystem(purgeTimeout);
    CachedFileSystem cachedFS = fsCache.putIfAbsent(user, newCachedFS);
    if (cachedFS == null) {
      cachedFS = newCachedFS;
    }
    Configuration conf = new Configuration(namenodeConf);
    conf.set(HTTPFS_FS_USER, user);
    return cachedFS.getFileSystem(conf);
  }

  protected void closeFileSystem(FileSystem fs) throws IOException {
    if (fsCache.containsKey(fs.getConf().get(HTTPFS_FS_USER))) {
      fsCache.get(fs.getConf().get(HTTPFS_FS_USER)).release();
    }
  }

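  /**
   * Validates the NameNode authority against the configured whitelist.
   * An empty whitelist or a "*" entry allows any NameNode.
   */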
  protected void validateNamenode(String namenode) throws FileSystemAccessException {
    if (!nameNodeWhitelist.isEmpty() && !nameNodeWhitelist.contains("*")) {
      if (!nameNodeWhitelist.contains(namenode.toLowerCase())) {
        throw new FileSystemAccessException(FileSystemAccessException.ERROR.H05, namenode, "not in whitelist");
      }
    }
  }

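  /**
   * Hook for subclasses to verify NameNode health before executing an
   * operation; the default implementation is a no-op.
   */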
  protected void checkNameNodeHealth(FileSystem fileSystem) throws FileSystemAccessException {
  }

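  /**
   * Runs the given executor with a FileSystem obtained for the specified
   * user, inside a {@code doAs} block so the operation runs as a proxy user.
   * The filesystem is released, and the operation timed via instrumentation,
   * when the executor finishes. The configuration must come from
   * {@link #getFileSystemConfiguration()}; otherwise an H04 error is thrown.
   *
   * <p>A minimal usage sketch (the user name and path are illustrative, and
   * {@code server} is assumed to be a handle to the hosting server):
   * <pre>
   *   FileSystemAccess fsAccess = server.get(FileSystemAccess.class);
   *   Configuration conf = fsAccess.getFileSystemConfiguration();
   *   boolean exists = fsAccess.execute("alice", conf,
   *     new FileSystemAccess.FileSystemExecutor&lt;Boolean&gt;() {
   *       public Boolean execute(FileSystem fs) throws IOException {
   *         return fs.exists(new Path("/user/alice"));
   *       }
   *     });
   * </pre>
   */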
  @Override
  public <T> T execute(String user, final Configuration conf, final FileSystemExecutor<T> executor)
    throws FileSystemAccessException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    Check.notNull(executor, "executor");
    if (!conf.getBoolean(FILE_SYSTEM_SERVICE_CREATED, false)) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H04);
    }
    if (conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) == null ||
        conf.getTrimmed(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY).length() == 0) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H06,
                                          CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
    }
    try {
      validateNamenode(
        new URI(conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)).
          getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<T>() {
        public T run() throws Exception {
          FileSystem fs = createFileSystem(conf);
          Instrumentation instrumentation = getServer().get(Instrumentation.class);
          Instrumentation.Cron cron = instrumentation.createCron();
          try {
            checkNameNodeHealth(fs);
            cron.start();
            return executor.execute(fs);
          } finally {
            cron.stop();
            instrumentation.addCron(INSTRUMENTATION_GROUP, executor.getClass().getSimpleName(), cron);
            closeFileSystem(fs);
          }
        }
      });
    } catch (FileSystemAccessException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H03, ex);
    }
  }

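  /**
   * Creates a FileSystem for the given user as a proxy-user {@code doAs}
   * operation, after validating the NameNode against the whitelist. The
   * caller is responsible for releasing the instance.
   */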
  public FileSystem createFileSystemInternal(String user, final Configuration conf)
    throws IOException, FileSystemAccessException {
    Check.notEmpty(user, "user");
    Check.notNull(conf, "conf");
    if (!conf.getBoolean(FILE_SYSTEM_SERVICE_CREATED, false)) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H04);
    }
    try {
      validateNamenode(
        new URI(conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)).getAuthority());
      UserGroupInformation ugi = getUGI(user);
      return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
        public FileSystem run() throws Exception {
          return createFileSystem(conf);
        }
      });
    } catch (IOException ex) {
      throw ex;
    } catch (FileSystemAccessException ex) {
      throw ex;
    } catch (Exception ex) {
      throw new FileSystemAccessException(FileSystemAccessException.ERROR.H08, ex.getMessage(), ex);
    }
  }

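  /**
   * Creates an unmanaged FileSystem for the given user; the instance is
   * counted in the {@code unmanaged.fs} instrumentation variable and must be
   * returned through {@link #releaseFileSystem(FileSystem)}.
   */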
  @Override
  public FileSystem createFileSystem(String user, final Configuration conf) throws IOException,
    FileSystemAccessException {
    unmanagedFileSystems.incrementAndGet();
    return createFileSystemInternal(user, conf);
  }

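  /**
   * Releases a FileSystem previously obtained from
   * {@link #createFileSystem(String, Configuration)}, decrementing the
   * unmanaged counter and the cache entry's reference count.
   */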
  @Override
  public void releaseFileSystem(FileSystem fs) throws IOException {
    unmanagedFileSystems.decrementAndGet();
    closeFileSystem(fs);
  }

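  /**
   * Returns a copy of the service's Hadoop configuration, flagged as created
   * by this service; {@code execute()} and {@code createFileSystem()} reject
   * configurations that do not carry this flag.
   */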
  @Override
  public Configuration getFileSystemConfiguration() {
    Configuration conf = new Configuration(true);
    ConfigurationUtils.copy(serviceHadoopConf, conf);
    conf.setBoolean(FILE_SYSTEM_SERVICE_CREATED, true);
    return conf;
  }

}